/*
 * Pipe buffer: copy stdin to stdout, holding data in two 1MiB in-core
 * FIFOs and spilling to a chain of temporary files when those fill up.
 *
 * Options (numbers are parsed with strtoull base 0, so 0x.. and 0..
 * prefixes work, and may carry a suffix: t/g/m/k for 2^40/2^30/2^20/2^10,
 * or b for 512):
 *   -tmpdir DIR   directory for spill files (default /tmp)
 *   -size N       bytes per spill file (default 10485760)
 *   -batch N      don't start writing until N bytes are buffered (or EOF);
 *                 once started, keep writing until nothing is buffered
 *   -max N        stop reading stdin while N or more bytes are buffered
 *   -prefix STR   spill file name prefix (default ".buffer.")
 *   -verbose, -v  trace activity on stderr
 *
 * Data ordering: the oldest bytes are in wbuf, then in the spill files
 * (oldest file at df_tail), and the newest are in rbuf.  When wbuf and the
 * disk are both empty, output is written straight from rbuf (wrbuf != 0).
 */

/* glibc hides sigaction(), fd_set etc. under strict -std=c11 unless a
   feature-test macro is defined before the first include; BSDs ignore it. */
#ifndef _DEFAULT_SOURCE
#define _DEFAULT_SOURCE 1
#endif

#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/types.h>
#include <sys/uio.h>

extern const char *__progname;

#define COREBSIZE 1048576

typedef struct diskfile DISKFILE;
/* One spill file.  The list runs from df_head (newest, being written)
   via flink to df_tail (oldest, being read back); blink goes the other way. */
struct diskfile {
	DISKFILE *flink;
	DISKFILE *blink;
	int rfd;			/* read fd; open only on df_tail, else -1 */
	int wfd;			/* write fd; open only on df_head, else -1 */
	unsigned long int fx;		/* index encoded into the file name */
	unsigned long long int size;	/* bytes written to the file */
	unsigned long long int roff;	/* bytes read back from the file */
};

typedef struct fifo FIFO;
/* Circular byte buffer: `size` bytes of data start at `tail`; new data is
   added at `head`.  Both indices wrap modulo COREBSIZE. */
struct fifo {
	int head;
	int tail;
	int size;
	char buf[COREBSIZE];
};

static const char *tmpdir = NULL;			/* -tmpdir */
static unsigned long long int fragsize = 10485760;	/* -size */
static unsigned long long int batchsize = 1;		/* -batch */
static unsigned long long int maxbuf = ~0ULL;		/* -max */
static const char *filepfx = ".buffer.";		/* -prefix */
static char *filename;		/* "tmpdir/prefix" plus room for a suffix */
static char *filesuf;		/* where makefn() writes the suffix */
static unsigned long int nextfx;	/* next spill-file index to try */
static unsigned long long int bytesbuffered;
static unsigned long long int bytesread;
static unsigned long long int byteswritten;
static int dumping;		/* currently draining to stdout */
static int verbose = 0;
static volatile sig_atomic_t got_siginfo;
static int ateof;		/* stdin hit EOF or a hard read error */
static FIFO rbuf;		/* newest data, filled from stdin */
static FIFO wbuf;		/* oldest data, drained to stdout */
static int wrbuf;		/* output is taken directly from rbuf */
static DISKFILE *df_head;
static DISKFILE *df_tail;

/*
 * Parse an unsigned number with an optional size suffix into *valp.
 * Returns 0 on success; on failure prints a message and returns 1,
 * leaving *valp untouched.
 */
static int numbervalue(const char *s, unsigned long long int *valp)
{
	char *sp;
	unsigned long long int v;
	int shift;

	errno = 0;
	v = strtoull(s, &sp, 0);
	/* Test for "no digits" before a suffix is consumed, so that a bare
	   "k" is rejected instead of silently reading as 0. */
	if ((sp == s) || (errno == ERANGE))
		goto bad;
	switch (*sp) {
	case 't': case 'T': shift = 40; sp++; break;
	case 'g': case 'G': shift = 30; sp++; break;
	case 'm': case 'M': shift = 20; sp++; break;
	case 'k': case 'K': shift = 10; sp++; break;
	case 'b': case 'B': shift = 9; sp++; break;
	default: shift = 0; break;
	}
	if (*sp)
		goto bad;
	/* Reject values that the suffix would shift out of range. */
	if (v > (ULLONG_MAX >> shift))
		goto bad;
	*valp = v << shift;
	return 0;

bad:
	fprintf(stderr, "%s: invalid number `%s'\n", __progname, s);
	return 1;
}

/*
 * Parse the command line into the option globals, then build the
 * spill-file name template in `filename`/`filesuf`.  Exits with status 1
 * on any usage error.
 */
static void handleargs(int ac, char **av)
{
	int errs = 0;
	int i;
	size_t l;

	for (i = 1; i < ac; i++) {
		const char *opt = av[i];
		const char *arg;

		if (opt[0] != '-') {
			fprintf(stderr, "%s: unrecognized argument `%s'\n", __progname, opt);
			errs++;
			continue;
		}
		if (!strcmp(opt, "-verbose") || !strcmp(opt, "-v")) {
			verbose = 1;
			continue;
		}
		if (strcmp(opt, "-tmpdir") && strcmp(opt, "-size") &&
		    strcmp(opt, "-batch") && strcmp(opt, "-max") &&
		    strcmp(opt, "-prefix")) {
			fprintf(stderr, "%s: unrecognized option `%s'\n", __progname, opt);
			errs++;
			continue;
		}
		/* Every remaining option takes the next word as its value. */
		if (i + 1 >= ac) {
			fprintf(stderr, "%s: %s needs a following argument\n", __progname, opt);
			errs++;
			continue;
		}
		arg = av[++i];
		if (!strcmp(opt, "-tmpdir"))
			tmpdir = arg;
		else if (!strcmp(opt, "-size"))
			errs += numbervalue(arg, &fragsize);
		else if (!strcmp(opt, "-batch"))
			errs += numbervalue(arg, &batchsize);
		else if (!strcmp(opt, "-max"))
			errs += numbervalue(arg, &maxbuf);
		else
			filepfx = arg;
	}
	if (errs)
		exit(1);
	if (fragsize < 1) {
		fprintf(stderr, "%s: invalid fragment size %llu (must be >= 1)\n", __progname, fragsize);
		errs++;
	}
	/* With -max 0 stdin would never be read and select() would wait forever. */
	if (maxbuf < 1) {
		fprintf(stderr, "%s: invalid maximum buffer size %llu (must be >= 1)\n", __progname, maxbuf);
		errs++;
	}
	if (tmpdir == NULL)
		tmpdir = "/tmp";
	if (errs)
		exit(1);
	l = strlen(tmpdir) + 1 + strlen(filepfx);
	/* 64 spare bytes hold the base-64 suffix written by makefn(). */
	filename = malloc(l + 64);
	if (filename == NULL) {
		fprintf(stderr, "%s: out of memory\n", __progname);
		exit(1);
	}
	snprintf(filename, l + 64, "%s/%s", tmpdir, filepfx);
	filesuf = filename + l;
}

/* Put fd into non-blocking mode; warn (but carry on) if that fails. */
static void set_nonblocking(int fd)
{
	int fl;

	fl = fcntl(fd, F_GETFL, 0);
	if ((fl < 0) || (fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0)) {
		fprintf(stderr, "%s: warning: can't make fd %d non-blocking: %s\n",
			__progname, fd, strerror(errno));
	}
}

/* Discard all data and rewind both indices to the start of the buffer. */
static void fifo_empty(FIFO *f)
{
	f->head = 0;
	f->tail = 0;
	f->size = 0;
}

static void fifo_init(FIFO *f)
{
	fifo_empty(f);
}

/* Drop n bytes from the front (tail) of f; aborts if fewer are present. */
static void fifo_consume(FIFO *f, int n)
{
	if (n > f->size)
		abort();
	f->size -= n;
	f->tail += n;
	if (f->tail >= COREBSIZE)
		f->tail -= COREBSIZE;
	if (f->size == 0)
		fifo_empty(f);
}

/* Account for n bytes just stored at f->head; aborts on overflow. */
static void fifo_produce(FIFO *f, int n)
{
	f->size += n;
	if (f->size > COREBSIZE)
		abort();
	f->head += n;
	if (f->head >= COREBSIZE)
		f->head -= COREBSIZE;
}

/*
 * Write up to nb bytes from the front of f to fd with a single
 * write/writev (two iovecs when the data wraps), consuming what was
 * written.  Returns the byte count, or -1 with errno set.
 */
static int fifo_write(FIFO *f, int fd, int nb)
{
	ssize_t n;

	if (f->tail + nb <= COREBSIZE) {
		n = write(fd, &f->buf[f->tail], nb);
	} else {
		struct iovec iov[2];

		iov[0].iov_base = &f->buf[f->tail];
		iov[0].iov_len = COREBSIZE - f->tail;
		iov[1].iov_base = &f->buf[0];
		iov[1].iov_len = nb - iov[0].iov_len;
		n = writev(fd, iov, 2);
	}
	if (n < 0)
		return -1;
	fifo_consume(f, (int)n);
	return (int)n;
}

/*
 * Read up to nb bytes (nb must not exceed the free space) from fd into f
 * at its head with a single read/readv.  Returns the byte count (0 at
 * EOF), or -1 with errno set.
 */
static int fifo_read(FIFO *f, int fd, int nb)
{
	ssize_t n;

	if (f->head + nb <= COREBSIZE) {
		n = read(fd, &f->buf[f->head], nb);
	} else {
		struct iovec iov[2];

		iov[0].iov_base = &f->buf[f->head];
		iov[0].iov_len = COREBSIZE - f->head;
		iov[1].iov_base = &f->buf[0];
		iov[1].iov_len = nb - iov[0].iov_len;
		n = readv(fd, iov, 2);
	}
	if (n < 0)
		return -1;
	fifo_produce(f, (int)n);
	return (int)n;
}

/*
 * Write the spill-file suffix for index x (most significant base-64
 * digit first) at filesuf, completing the path in `filename`.
 */
static void makefn(unsigned long int x)
{
	static const char digits[] =
	    "0123456789@ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz";
	char tmp[sizeof(unsigned long int) * CHAR_BIT / 6 + 2];
	int n = 0;
	int i;

	/* Digits come out least-significant first; shifting by a fixed 6
	   avoids the out-of-range shift the digit-counting form could hit. */
	do {
		tmp[n++] = digits[x & 63];
		x >>= 6;
	} while (x > 0);
	for (i = 0; i < n; i++)
		filesuf[i] = tmp[n - 1 - i];
	filesuf[n] = '\0';
}

static void die(int) __attribute__((__noreturn__));

/* Close the open spill fds, unlink every spill file, and exit. */
static void die(int exitstat)
{
	if (df_head)
		close(df_head->wfd);
	if (df_tail)
		close(df_tail->rfd);
	while (df_head) {
		makefn(df_head->fx);
		if ((unlink(filename) < 0) && (errno != ENOENT)) {
			fprintf(stderr, "%s: can't remove %s: %s\n", __progname, filename, strerror(errno));
		}
		df_head = df_head->flink;
	}
	exit(exitstat);
}

/*
 * Start a new spill file at the head of the list.  The previous head's
 * write fd is closed.  If the new file is also the tail, it is opened for
 * reading as well.
 */
static void new_buffile(void)
{
	DISKFILE *df;

	if (df_head) {
		close(df_head->wfd);
		df_head->wfd = -1;
	}
	df = malloc(sizeof *df);
	if (df == NULL) {
		fprintf(stderr, "%s: out of memory\n", __progname);
		die(1);
	}
	/* Initialise both fds so die() never closes a garbage descriptor. */
	df->size = 0;
	df->roff = 0;
	df->rfd = -1;
	df->wfd = -1;
	df->flink = df_head;
	df->blink = NULL;
	if (df_head)
		df_head->blink = df;
	else
		df_tail = df;
	df_head = df;
	/* O_EXCL: skip over names that already exist rather than reuse them. */
	for (;;) {
		df->fx = nextfx++;
		makefn(df->fx);
		df->wfd = open(filename, O_WRONLY | O_CREAT | O_EXCL, 0600);
		if (df->wfd >= 0)
			break;
		if (errno != EEXIST) {
			fprintf(stderr, "%s: can't open temporary file %s: %s\n", __progname, filename, strerror(errno));
			die(1);
		}
	}
	if (df_tail == df) {
		df->rfd = open(filename, O_RDONLY, 0);
		if (df->rfd < 0) {
			fprintf(stderr, "%s: can't reopen %s for read: %s\n", __progname, filename, strerror(errno));
			die(1);
		}
		df->roff = 0;
	}
	if (verbose)
		fprintf(stderr, "%s: new buffer file %s\n", __progname, filename);
}

/*
 * Remove the tail spill file (fully read back) and open its successor,
 * if any, for reading.
 */
static void old_buffile(void)
{
	DISKFILE *f = df_tail;

	if (f->blink) {
		f->blink->flink = NULL;
	} else {
		/* It was the only file, hence also the head being written. */
		close(f->wfd);
		df_head = NULL;
	}
	close(f->rfd);
	df_tail = f->blink;
	makefn(f->fx);
	unlink(filename);
	if (verbose)
		fprintf(stderr, "%s: removed buffer file %s [size %llu roff %llu]\n",
			__progname, filename, f->size, f->roff);
	free(f);
	if (df_tail) {
		makefn(df_tail->fx);
		df_tail->rfd = open(filename, O_RDONLY, 0);
		if (df_tail->rfd < 0) {
			fprintf(stderr, "%s: can't reopen %s for read: %s\n", __progname, filename, strerror(errno));
			die(1);
		}
		df_tail->roff = 0;
	}
}

/*
 * Write exactly nb bytes from the front of rbuf to the head spill file,
 * dying on error.  write(2) to a file may return a short count (e.g. when
 * the filesystem fills up).  Callers account for all nb bytes, so every
 * one of them must actually reach the disk.
 */
static void spill_to_buffile(int nb)
{
	while (nb > 0) {
		int n = fifo_write(&rbuf, df_head->wfd, nb);

		if (n < 0) {
			if (errno == EINTR)
				continue;
			fprintf(stderr, "%s: buffer file write error: %s\n", __progname, strerror(errno));
			die(1);
		}
		if (n == 0) {
			fprintf(stderr, "%s: buffer file write error: no progress\n", __progname);
			die(1);
		}
		nb -= n;
	}
}

/* Move all of rbuf to the spill files, starting new files as each fills. */
static void core_to_disk(void)
{
	int n;

	if (!df_head || (df_head->size >= fragsize)) {
		if (verbose)
			fprintf(stderr, "%s: new buffile (no room)\n", __progname);
		new_buffile();
	}
	if (verbose)
		fprintf(stderr, "%s: core_to_disk, rbuf.size %d\n", __progname, rbuf.size);
	while ((unsigned long long int)rbuf.size > fragsize - df_head->size) {
		/* Top off this file and start another.  The room left is smaller
		   than rbuf.size, so it fits in an int. */
		spill_to_buffile((int)(fragsize - df_head->size));
		if (verbose)
			fprintf(stderr, "%s: filled buffile, rbuf.size now %d\n", __progname, rbuf.size);
		df_head->size = fragsize;
		new_buffile();
	}
	n = rbuf.size;
	spill_to_buffile(n);
	if (verbose)
		fprintf(stderr, "%s: wrote %d to buffile\n", __progname, n);
	df_head->size += n;
	fifo_empty(&rbuf);
	wrbuf = 0;
}

/* Move as much of rbuf into wbuf as wbuf has room for (memcpy in at most
   a few contiguous chunks, splitting wherever either ring wraps). */
static void core_to_core(void)
{
	int ncopy = rbuf.size;

	if (wbuf.size + ncopy > COREBSIZE)
		ncopy = COREBSIZE - wbuf.size;
	while (ncopy > 0) {
		int n = ncopy;

		if (n > COREBSIZE - rbuf.tail)
			n = COREBSIZE - rbuf.tail;
		if (n > COREBSIZE - wbuf.head)
			n = COREBSIZE - wbuf.head;
		memcpy(&wbuf.buf[wbuf.head], &rbuf.buf[rbuf.tail], n);
		fifo_consume(&rbuf, n);
		fifo_produce(&wbuf, n);
		ncopy -= n;
	}
	wrbuf = 0;
}

/*
 * Refill wbuf from the spill files, oldest first, removing each file once
 * it has been read back completely.  Stops when wbuf is full or no spill
 * files remain.
 */
static void disk_to_core(void)
{
	while (df_tail) {
		/* Keep the unread count 64-bit: with -size above INT_MAX it would
		   not fit in an int. */
		unsigned long long int left = df_tail->size - df_tail->roff;
		int room = COREBSIZE - wbuf.size;
		int n;
		int r;

		if (left < 1) {
			old_buffile();
			continue;
		}
		if (room < 1)
			return;
		n = (left < (unsigned long long int)room) ? (int)left : room;
		r = fifo_read(&wbuf, df_tail->rfd, n);
		if (r < 0) {
			if (errno == EINTR)
				continue;
			fprintf(stderr, "%s: read error on temporary file: %s\n", __progname, strerror(errno));
			die(1);
		}
		if (r == 0) {
			fprintf(stderr, "%s: unexpected EOF on temporary file\n", __progname);
			die(1);
		}
		if (r < n) {
			fprintf(stderr, "%s: warning: short read from temporary file (wanted %d, got %d)\n",
				__progname, n, r);
		}
		df_tail->roff += r;
	}
}

/*
 * Read from stdin into rbuf.  A full rbuf is first moved to wbuf (if
 * nothing is on disk and wbuf has room) or to the spill files.
 */
static void readsome(void)
{
	int n;

	if (rbuf.size >= COREBSIZE) {
		if (!df_head && (wbuf.size < COREBSIZE))
			core_to_core();
		else
			core_to_disk();
	}
	n = fifo_read(&rbuf, 0, COREBSIZE - rbuf.size);
	if (n < 0) {
		/* stdin is non-blocking, so these are transient, not EOF. */
		if ((errno == EAGAIN) || (errno == EWOULDBLOCK) || (errno == EINTR))
			return;
		fprintf(stderr, "%s: read from stdin: %s\n", __progname, strerror(errno));
		ateof = 1;
		return;
	}
	if (n == 0) {
		ateof = 1;
		return;
	}
	/* Everything was empty, so this data can go straight out of rbuf. */
	if (bytesbuffered == 0)
		wrbuf = 1;
	bytesread += n;
	bytesbuffered += n;
	if (verbose)
		fprintf(stderr, "%s: read %d\n", __progname, n);
}

/* Write the oldest buffered data to stdout. */
static void writesome(void)
{
	int n;
	FIFO *b;

	if (wrbuf) {
		b = &rbuf;
	} else {
		b = &wbuf;
		if ((wbuf.size < COREBSIZE) && df_head)
			disk_to_core();
		/* wbuf still empty: the disk is drained too, so rbuf is oldest. */
		if (wbuf.size == 0) {
			wrbuf = 1;
			b = &rbuf;
		}
	}
	if (b->size == 0) {
		fprintf(stderr, "%s: bytesbuffered = %llu but nothing buffered\n", __progname, bytesbuffered);
		abort();
	}
	n = fifo_write(b, 1, b->size);
	if (n < 0) {
		if ((errno == EWOULDBLOCK) || (errno == EAGAIN) || (errno == EINTR))
			return;
		/* The reader went away (SIGPIPE is ignored in main): just clean up
		   the spill files quietly, as the default SIGPIPE death would have
		   been quiet too. */
		if (errno == EPIPE)
			die(1);
		fprintf(stderr, "%s: output write error: %s\n", __progname, strerror(errno));
		die(1);
	}
	byteswritten += n;
	bytesbuffered -= n;
	if (verbose)
		fprintf(stderr, "%s: wrote %d\n", __progname, n);
}

#ifdef SIGINFO
/* Only sets a flag; the status line is printed from the main loop. */
static void handle_siginfo(int sig)
{
	(void)sig;
	got_siginfo = 1;
}
#endif

/* Arrange for SIGINFO (where the platform has it) to request a status line. */
static void setup_siginfo(void)
{
	got_siginfo = 0;
#ifdef SIGINFO
	{
		struct sigaction sa;

		sa.sa_handler = &handle_siginfo;
		sigemptyset(&sa.sa_mask);
		sa.sa_flags = SA_RESTART;
		if (sigaction(SIGINFO, &sa, 0) < 0) {
			fprintf(stderr, "%s: warning: couldn't establish SIGINFO handler: %s\n",
				__progname, strerror(errno));
		}
	}
#endif
}

static void print_status(void)
{
	fprintf(stderr, "%s: read %llu%s, wrote %llu, buffered %llu\n",
		__progname, bytesread, ateof ? "[EOF]" : "", byteswritten, bytesbuffered);
}

int main(int ac, char **av)
{
	fd_set rfds;
	fd_set wfds;

	handleargs(ac, av);
	nextfx = 1;
	bytesbuffered = 0;
	bytesread = 0;
	byteswritten = 0;
	ateof = 0;
	set_nonblocking(0);
	set_nonblocking(1);
	fifo_init(&rbuf);
	fifo_init(&wbuf);
	df_head = NULL;
	df_tail = NULL;
	if (verbose) {
		fprintf(stderr, "%s: tmpdir = %s\n", __progname, tmpdir);
		fprintf(stderr, "%s: filepfx = %s\n", __progname, filepfx);
		fprintf(stderr, "%s: fragsize = %llu\n", __progname, fragsize);
	}
	/* Turn a closed stdout into EPIPE so die() can remove the spill files. */
	signal(SIGPIPE, SIG_IGN);
	setup_siginfo();
	while ((bytesbuffered > 0) || !ateof) {
		if (got_siginfo) {
			got_siginfo = 0;
			print_status();
		}
		FD_ZERO(&rfds);
		FD_ZERO(&wfds);
		if ((bytesbuffered < maxbuf) && !ateof)
			FD_SET(0, &rfds);
		/* Hysteresis: start draining at the batch size (any amount after
		   EOF), or once reading has stopped at -max so we can't stall.
		   Keep going until empty.  Never dump with nothing buffered:
		   with -batch 0 that used to reach abort() in writesome(). */
		if (bytesbuffered < 1)
			dumping = 0;
		else if ((bytesbuffered >= (ateof ? 1ULL : batchsize)) || (bytesbuffered >= maxbuf))
			dumping = 1;
		if (dumping)
			FD_SET(1, &wfds);
		if (select(2, &rfds, &wfds, 0, 0) < 0) {
			if (errno == EINTR)
				continue;
			fprintf(stderr, "%s: select: %s\n", __progname, strerror(errno));
			die(1);
		}
		if (FD_ISSET(1, &wfds))
			writesome();
		if (FD_ISSET(0, &rfds))
			readsome();
	}
	die(0);
}