/* Define NO_SS_LEN if ss_len doesn't exist in struct sockaddr_storage. */ /* Define NO_VERBOSE to get rid of the verbose progress code; in particular, this will elimiate dependence on a Torek stdio (the usual symptom is a missing fwopen()). */ /* * Protocol: * * Sender sends PKOP_STARTSEND packet, retransmitting until it gets a * PKOP_STARTRECV reply. * * Receiver listens until receiving PKOP_STARTSEND, at which point it * replies with PKOP_STARTRECV. To help deal with receiving files * through NAT firewalls, it sends a PKOP_NOOP packet every fifteen * seconds while waiting. * * While there are blocks that have not been verified sent: * * Sender sends one or more PKOP_BLKHASH packets containing hashes * of file blocks. * * Receiver replies with PKOP_IHAVE and/or PKOP_SENDME packets * listing the blocks to be sent. * * Sender sends PKOP_DATA packets for blocks requested by SENDME * packets. * * Receiver sends a PKOP_GOTIT packet for each PKOP_DATA packet it * receives. * * Sender will eventually time out and resend BLKHASH packets if * it doesn't get an IHAVE or SENDME listing block numbers; if it * doesn't receive a GOTIT for a DATA, it will resend the DATA. * * Once all data transferred OK, sender sends one PKOP_ALLDONE packet * per second until it exits; the receiver replies to ALLDONE packets * with EXITING packets. The sender exits upon receiving the third * EXITING packet; the receiver exits five seconds after it last * receives an ALLDONE packet, or after receiving ten ALLDONE packets. * * Packets with incorrect signatures are ignored. * Out-of-order packets are ignored. * Ignored packets produce a warning message. * * Files are conceptually divided into 512-byte blocks. If the file * size is not a multiple of 512 bytes, it is padded with 0x00 bytes * for purposes of block contents; truncation to actual size is done * by the receiver. * * Every packet consists of: * * Four-byte signature, which is computed by taking the MD5 hash * of the shared key, the rest of the packet, and the shared key * again, then folding this hash in half with XOR, then folding * the result in half again with XOR. * * One-byte type field, which can be * PKOP_STARTSEND 1 * PKOP_STARTRECV 2 * PKOP_BLKHASH 3 * PKOP_IHAVE 4 * PKOP_SENDME 5 * PKOP_DATA 6 * PKOP_GOTIT 7 * PKOP_ALLDONE 8 * PKOP_EXITING 9 * PKOP_NOOP 10 * * Type-specific data. All multi-byte numbers are sent big-endian. * * For STARTSEND: * Eight bytes holding the total file size in bytes. * * For STARTRECV: * Nothing. * * For BLKHASH: * One byte containing the number (N) of pairs * following (N will be at most 22). * N pairs, each sent as * 7 bytes of block number * 16 bytes holding the MD5 hash of that block * * For IHAVE and SENDME: * One byte containing the number (N) of block numbers * following. * N block numbers, each 7 bytes long. * * For DATA: * 7 bytes of block number * 512 bytes of block data * * For GOTIT: * 7 bytes of block number * 16 bytes holding the MD5 hash of the block * * For ALLDONE: * Nothing. * * For EXITING: * Nothing. * * For NOOP: * Nothing. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include extern const char *__progname; #define PKTTIMEOUT 10000 #define PKOP_NULL 0 #define PKOP_STARTSEND 1 #define PKOP_STARTRECV 2 #define PKOP_BLKHASH 3 #define PKOP_IHAVE 4 #define PKOP_SENDME 5 #define PKOP_DATA 6 #define PKOP_GOTIT 7 #define PKOP_ALLDONE 8 #define PKOP_EXITING 9 #define PKOP_NOOP 10 static int debugging = 0; static unsigned short int my_port; static unsigned short int peer_port; static const char *key; static int keylen; static const char *filename; static int filefd; static off_t filesize; static char *filemap; #ifndef NO_VERBOSE static int verbose; #endif static int startdelay = 0; static int timeout = 0; #define MAXPENDING 64 static int pendovf = 0; typedef unsigned long long int uquad; #define UQFMT "%qu" typedef struct packet PACKET; typedef struct timer TIMER; typedef struct pending PENDING; typedef struct range RANGE; struct range { RANGE *link; uquad lo; uquad hi; } ; struct packet { unsigned char *data; int len; } ; struct timer { unsigned int fired : 1; unsigned int armed : 1; struct timeval when; int x; } ; struct pending { int x; int state; #define PS_FREE 1 /* unused slot */ #define PS_UNTRIED 2 /* not yet queried about */ #define PS_QUERIED 3 /* BLKHASH sent, awaiting IHAVE/SENDME */ #define PS_OKAY 4 /* block over there OK */ #define PS_NEED 5 /* block needs to be sent */ #define PS_SENT 6 /* DATA sent, awaiting GOTIT */ #define PS_RESV 7 /* reserved to avoid ENOBUFS */ #define PS__N 8 uquad block; unsigned char hash[16]; TIMER timer; } ; static const char zblk[512] = { 0 }; static TIMER **timers; static TIMER exit_timer; static TIMER startdelay_timer; static TIMER timeout_timer; static int timers_alloc; static int timers_n; static int s; static uquad maxblock; static uquad cutblock; static uquad floor; static uquad ceiling; static uquad blksxf; static PENDING pend[MAXPENDING]; static int pendfree[MAXPENDING]; static int pendnfree; static int pendnresv; static int pscount[PS__N]; static struct sockaddr_storage sendss; #ifdef NO_SS_LEN static int sendsslen; #else #define sendsslen sendss.ss_len #endif static int exitcount; static int nowvalid; static struct timeval nowtv; static RANGE *unsent; #ifndef NO_VERBOSE static TIMER progress_timer; static FILE *progress; static int progress_len; static int progress_alloc; static char *progress_buf; static int progress_displen; #endif static void range_add(RANGE **root, uquad lo, uquad hi) { RANGE *r; RANGE *r2; RANGE **rp; rp = root; while ((r = *rp)) { if (r->hi+1 < lo) { rp = &r->link; continue; } if (r->hi >= hi) return; if (r->lo > hi+1) break; if (lo < r->lo) r->lo = lo; if (hi > r->hi) { while ((r2=r->link) && (hi+1 >= r2->lo)) { if (r2->hi > hi) hi = r2->hi; r->link = r2->link; free(r2); } } } r = malloc(sizeof(RANGE)); r->link = *rp; *rp = r; r->lo = lo; r->hi = hi; } static uquad range_remove(RANGE **root, uquad lo, uquad hi) { RANGE *r; RANGE *r2; RANGE **rp; uquad rv; rv = 0; rp = root; while ((r = *rp)) { if (r->hi < lo) { rp = &r->link; continue; } if (r->lo > hi) return(rv); if (lo > r->lo) { if (hi < r->hi) { r2 = malloc(sizeof(RANGE)); r2->link = r->link; r->link = r2; r2->hi = r->hi; r2->lo = hi + 1; r->hi = lo - 1; return(rv+(1+hi-lo)); } rv += r->hi - (lo-1); r->hi = lo - 1; rp = &r->link; continue; } if (hi >= r->hi) { rv += 1 + r->hi - r->lo; *rp = r->link; free(r); continue; } rv += hi+1 - r->lo; r->lo = hi + 1; return(rv); } return(rv); } static void range_print(RANGE *r, FILE *to) { const char *pref; pref = ""; for (;r;r=r->link) { if (r->lo == r->hi) { fprintf(to,"%s"UQFMT,pref,r->lo); } else { fprintf(to,"%s"UQFMT"-"UQFMT,pref,r->lo,r->hi); } pref = " "; } } static void openfile(int how) { filefd = open(filename,how,0666); if (filefd < 0) { fprintf(stderr,"%s: can't open %s: %s\n",__progname,filename,strerror(errno)); exit(1); } } static int tvlt(struct timeval a, struct timeval b) { return( (a.tv_sec < b.tv_sec) || ( (a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec) ) ); } static struct timeval tv_add_ms_tv(struct timeval a, unsigned int ms) { a.tv_sec += ms / 1000; a.tv_usec += (ms % 1000) * 1000; if (a.tv_usec >= 1000000) { a.tv_sec ++; a.tv_usec -= 1000000; } return(a); } static unsigned int tv_sub_tv_ms_chop(struct timeval a, struct timeval b) { uquad d; if (tvlt(a,b)) return(0); d = ((a.tv_sec - b.tv_sec) * (uquad)1000000) + (a.tv_usec - b.tv_usec); return(d/1000); } static struct timeval now(void) { if (! nowvalid) { gettimeofday(&nowtv,0); nowvalid = 1; } return(nowtv); } static void timer_reheap_down(int x) { int l; int r; int s; TIMER *tx; TIMER *ts; tx = timers[x]; while (1) { l = x + x + 1; r = l + 1; if ((l < timers_n) && tvlt(timers[l]->when,tx->when)) { if ((r < timers_n) && tvlt(timers[r]->when,tx->when)) { s = tvlt(timers[l]->when,timers[r]->when) ? l : r; } else { s = l; } } else { if ((r < timers_n) && tvlt(timers[r]->when,tx->when)) { s = r; } else { break; } } ts = timers[s]; ts->x = x; timers[x] = ts; x = s; } tx->x = x; timers[x] = tx; } static void timer_reheap_up(int x) { int u; TIMER *tx; TIMER *tu; tx = timers[x]; while (x > 0) { u = (x - 1) >> 1; tu = timers[u]; if (! tvlt(tx->when,tu->when)) break; tu->x = x; timers[x] = tu; x = u; } tx->x = x; timers[x] = tx; } static void timer_init(TIMER *t) { t->fired = 0; t->armed = 0; } static void timer_disarm(TIMER *t) { if (! t->armed) return; if (timers_n < 1) abort(); timers_n --; if (t->x < timers_n) { timers[t->x] = timers[timers_n]; timer_reheap_down(t->x); } t->armed = 0; } static void timer_arm(TIMER *t) { if (timers_alloc <= timers_n) timers = realloc(timers,(timers_alloc=timers_n+10)*sizeof(*timers)); timers[timers_n++] = t; timer_reheap_up(timers_n-1); t->armed = 1; t->fired = 0; } static void timer_set(TIMER *t, int ms) { if (t->armed) timer_disarm(t); t->when = tv_add_ms_tv(now(),ms); timer_arm(t); } static int timer_fired(TIMER *t) { return(t->fired); } static void timer_fire(TIMER *t) { timer_disarm(t); t->fired = 1; } static void timer_check(void) { while ((timers_n > 0) && !tvlt(now(),timers[0]->when)) timer_fire(timers[0]); } static void setup_read(void) { struct stat stb; void *mrv; int i; openfile(O_RDONLY); fstat(filefd,&stb); filesize = stb.st_size; mrv = mmap(0,filesize,PROT_READ,MAP_SHARED,filefd,0); if (mrv == MAP_FAILED) { fprintf(stderr,"%s: can't mmap %s: %s\n",__progname,filename,strerror(errno)); exit(1); } filemap = mrv; maxblock = (filesize + 511) >> 9; cutblock = filesize >> 9; floor = 0; ceiling = 0; pendnfree = MAXPENDING; pendnresv = 0; for (i=0;idata = (void *) (p+1); p->len = payload_len; } else { p = malloc(sizeof(PACKET)+4+1+payload_len); p->data = (void *) (p+1); p->len = 4 + 1 + payload_len; p->data[4] = type; } return(p); } static void compute_checksum(const void *data, int len, unsigned char *into) { void *m; unsigned char res[16]; m = md5_init(); md5_process_bytes(m,key,keylen); md5_process_bytes(m,data,len); md5_process_bytes(m,key,keylen); md5_result(m,&res[0]); into[0] = res[ 0] ^ res[ 4] ^ res[ 8] ^ res[12]; into[1] = res[ 1] ^ res[ 5] ^ res[ 9] ^ res[13]; into[2] = res[ 2] ^ res[ 6] ^ res[10] ^ res[14]; into[3] = res[ 3] ^ res[ 7] ^ res[11] ^ res[15]; } static uquad getmbyte(const void *at, int len) { uquad rv; const unsigned char *p; rv = 0; p = at; for (;len>0;len--) rv = (rv << 8) | *p++; return(rv); } static void putmbyte(void *at, uquad val, int len) { unsigned char *p; p = at; for (len--;len>=0;len--) *p++ = (val >> (len<<3)) & 0xff; } static PACKET *make_packet(int type, ...) { va_list ap; PACKET *p; va_start(ap,type); switch (type) { default: abort(); break; case PKOP_STARTSEND: p = init_packet(8,PKOP_STARTSEND); putmbyte(p->data+4+1,filesize,8); break; case PKOP_STARTRECV: case PKOP_ALLDONE: case PKOP_EXITING: case PKOP_NOOP: p = init_packet(0,type); break; case PKOP_SENDME: case PKOP_IHAVE: { int n; uquad *list; int i; n = va_arg(ap,int); list = va_arg(ap,uquad *); p = init_packet(1+(7*n),type); p->data[4+1] = n; for (i=0;idata+4+1+1+(7*i),list[i],7); } break; case PKOP_GOTIT: { int datalen; uquad blkno; const unsigned char *data; datalen = 16; if (0) { case PKOP_DATA: datalen = 512; } blkno = va_arg(ap,uquad); data = va_arg(ap,const unsigned char *); p = init_packet(7+datalen,type); putmbyte(p->data+4+1,blkno,7); bcopy(data,p->data+4+1+7,datalen); } break; case PKOP_BLKHASH: { int n; uquad *blks; const unsigned char **hashes; int i; n = va_arg(ap,int); blks = va_arg(ap,uquad *); hashes = va_arg(ap,const unsigned char **); p = init_packet(1+(n*(7+16)),PKOP_BLKHASH); p->data[4+1] = n; for (i=0;idata+4+1+1+(i*(7+16)),blks[i],7); bcopy(hashes[i],p->data+4+1+1+(i*(7+16))+7,16); } } break; } compute_checksum(p->data+4,p->len-4,p->data); return(p); } static void send_packet(PACKET *p) { struct sockaddr_storage ss; if (debugging > 1) { int i; printf("send: %d:",p->len); for (i=0;ilen;i++) printf(" %02x",p->data[i]); printf("\n"); } ss = sendss; if (sendto(s,p->data,p->len,0,(struct sockaddr *)&ss,sendsslen) < 0) { if (errno == ENOBUFS) { pendovf ++; } else { fprintf(stderr,"%s: sendto: %s\n",__progname,strerror(errno)); } } } static PACKET *recv_packet(int tmo) { struct pollfd pfd[1]; int n; PACKET *p; unsigned char check[4]; unsigned char recvbuf[1024]; if (debugging > 1) { printf("recv: "); if (tmo == INFTIM) printf("INFTIM: "); else printf("%d: ",tmo); fflush(stdout); } while (1) { pfd[0].fd = s; pfd[0].events = POLLIN | POLLRDNORM; nowvalid = 0; n = poll(&pfd[0],1,tmo); if (n < 0) { if (errno == EINTR) continue; fprintf(stderr,"%s: poll: %s\n",__progname,strerror(errno)); exit(1); } if (n == 0) { if (debugging) printf("timeout\n"); return(0); } n = recv(s,&recvbuf[0],sizeof(recvbuf),0); if (n < 0) { fprintf(stderr,"%s: recv: %s\n",__progname,strerror(errno)); exit(1); } if (n < 5) continue; compute_checksum(&recvbuf[4],n-4,&check[0]); if (bcmp(&check[0],&recvbuf[0],4)) { fprintf(stderr,"%s: checksum wrong (packet %02x%02x%02x%02x, wanted %02x%02x%02x%02x)\n",__progname,recvbuf[0],recvbuf[1],recvbuf[2],recvbuf[3],check[0],check[1],check[2],check[3]); continue; } p = init_packet(n,PKOP_NULL); if (n > 0) bcopy(&recvbuf[0],p->data,n); if (debugging > 1) { int i; printf("got %d:",p->len); for (i=0;ilen;i++) printf(" %02x",p->data[i]); printf("\n"); } return(p); } } static void free_packet(PACKET *p) { free(p); } static const char *type_name(int type) { static char badbuf[64]; switch (type) { case PKOP_STARTSEND: return("STARTSEND"); break; case PKOP_STARTRECV: return("STARTRECV"); break; case PKOP_BLKHASH: return("BLKHASH"); break; case PKOP_IHAVE: return("IHAVE"); break; case PKOP_SENDME: return("SENDME"); break; case PKOP_DATA: return("DATA"); break; case PKOP_GOTIT: return("GOTIT"); break; case PKOP_ALLDONE: return("ALLDONE"); break; case PKOP_EXITING: return("EXITING"); break; } sprintf(&badbuf[0],"",type); return(&badbuf[0]); } static void send_block_list(int op, int n, uquad *list) { PACKET *r; r = make_packet(op,n,list); send_packet(r); free_packet(r); } static void get_block_hash(uquad bno, void *rvp) { void *m; m = md5_init(); if (bno == cutblock) { md5_process_bytes(m,filemap+(bno<<9),filesize&511); md5_process_bytes(m,&zblk[0],512-(int)(filesize&511)); } else { md5_process_bytes(m,filemap+(bno<<9),512); } md5_result(m,rvp); } static void blkhash_packet(PACKET *p) { int n; int i; unsigned char *dp; uquad blkno; unsigned char realhash[16]; uquad goodblks[22]; int ngood; uquad badblks[22]; int nbad; if (p->len < 4+1+1+7+16) { fprintf(stderr,"%s: too-small BLKHASH packet (len %d)\n",__progname,p->len); return; } n = p->data[4+1]; if ((n > 22) || (p->len < 4+1+1+(n*(7+16)))) { fprintf(stderr,"%s: BLKHASH packet claims count %d (len %d -> max count %d)\n",__progname,n,p->len,(p->len-(4+1+1))/(7+16)); return; } if (p->len > 4+1+1+(n*(7+16))) { fprintf(stderr,"%s: BLKHASH packet count %d, len %d > %d (dropping %d)\n",__progname,n,p->len,4+1+1+(n*(7+16)),p->len-(4+1+1+(n*(7+16)))); } dp = p->data + 4+1+1; ngood = 0; nbad = 0; for (i=0;i maxblock) { fprintf(stderr,"%s: BLKHASH packet blkno "UQFMT" past end of file, ignoring\n",__progname,blkno); continue; } get_block_hash(blkno,&realhash[0]); if (bcmp(&realhash[0],dp+7,16)) { badblks[nbad++] = blkno; } else { goodblks[ngood++] = blkno; blksxf += range_remove(&unsent,blkno,blkno); } dp += 7 + 16; } if (nbad > 0) send_block_list(PKOP_SENDME,nbad,&badblks[0]); if (ngood > 0) send_block_list(PKOP_IHAVE,ngood,&goodblks[0]); } static void data_packet(PACKET *p) { uquad blkno; PACKET *r; unsigned char hash[16]; if (p->len != 4+1+7+512) { fprintf(stderr,"%s: DATA packet with wrong length %d (ignored)\n",__progname,p->len); return; } blkno = getmbyte(p->data+4+1,7); if (blkno > maxblock) { fprintf(stderr,"%s: DATA packet blkno "UQFMT" past end of file, ignoring\n",__progname,blkno); return; } bcopy(p->data+4+1+7,filemap+(blkno<<9),(blkno==cutblock)?filesize&511:512); blksxf += range_remove(&unsent,blkno,blkno); get_block_hash(blkno,&hash[0]); r = make_packet(PKOP_GOTIT,blkno,&hash[0]); send_packet(r); free_packet(r); } static void receiver_exit(void) { msync(filemap,filesize,MS_SYNC); printf("Receiver all done\n"); exit(0); } static void sender_exit(void) { printf("Sender all done\n"); exit(0); } static void alldone_packet(PACKET *p) { PACKET *r; if (p->len != 5) { fprintf(stderr,"%s: ALLDONE packet with wrong length %d (ignored)\n",__progname,p->len); return; } r = make_packet(PKOP_EXITING); send_packet(r); free_packet(r); exitcount ++; timer_set(&exit_timer,5000); } static void setpendstate(PENDING *p, int new) { if (new != p->state) { pscount[p->state] --; p->state = new; pscount[new] ++; } } static void weed_pending(void) { int i; PENDING *p; for (i=0;istate) { case PS_OKAY: timer_disarm(&p->timer); if ((pendovf > 0) && (pscount[PS_RESV] < MAXPENDING-1)) { setpendstate(p,PS_RESV); pendovf = 0; pendnresv ++; } else { setpendstate(p,PS_FREE); pendfree[pendnfree++] = p->x; } blksxf += range_remove(&unsent,p->block,p->block); break; } } } static void send_need(void) { int i; PENDING *p; PACKET *q; for (i=0;istate) { case PS_SENT: if (timer_fired(&p->timer)) { case PS_NEED: q = make_packet(PKOP_DATA,p->block,filemap+(p->block<<9),(p->block==cutblock)?filesize&511:512); send_packet(q); free_packet(q); setpendstate(p,PS_SENT); timer_set(&p->timer,PKTTIMEOUT); } break; } } } static void send_hash(void) { uquad blocks[MAXPENDING]; const unsigned char *hashes[MAXPENDING]; int nb; int i; PACKET *q; PENDING *p; static void send_it(void) { q = make_packet(PKOP_BLKHASH,nb,&blocks[0],&hashes[0]); send_packet(q); free_packet(q); nb = 0; } nb = 0; for (i=0;istate) { case PS_QUERIED: if (timer_fired(&p->timer)) { case PS_UNTRIED: if (nb >= 22) send_it(); blocks[nb] = p->block; hashes[nb] = &p->hash[0]; nb ++; setpendstate(p,PS_QUERIED); timer_set(&p->timer,PKTTIMEOUT); } break; } } if (nb > 0) send_it(); } static void ihave_sendme_packet(PACKET *p, int state) { int n; unsigned char *dp; uquad blkno; int i; int j; PENDING *pd; if (p->len < 4+1+1+7) { fprintf(stderr,"%s: too-small %s packet (len %d)\n",__progname,type_name(p->data[4]),p->len); return; } n = p->data[4+1]; if (p->len < 4+1+1+(n*7)) { fprintf(stderr,"%s: %s packet claims count %d (len %d -> max count %d)\n",__progname,type_name(p->data[4]),n,p->len,(p->len-(4+1+1))/(7)); return; } if (p->len > 4+1+1+(n*7)) { fprintf(stderr,"%s: %s packet count %d, len %d > %d (dropping %d)\n",__progname,type_name(p->data[4]),n,p->len,4+1+1+(n*7),p->len-(4+1+1+(n*7))); } dp = p->data + 4+1+1; for (i=0;i maxblock) { fprintf(stderr,"%s: %s packet blkno "UQFMT" past end of file, ignoring\n",__progname,type_name(p->data[4]),blkno); continue; } for (j=0;jstate == PS_QUERIED) && (pd->block == blkno)) { setpendstate(pd,state); break; } } if (j >= MAXPENDING) { fprintf(stderr,"%s: %s packet blkno "UQFMT" not pending, ignoring\n",__progname,type_name(p->data[4]),blkno); continue; } } } static void ihave_packet(PACKET *p) { ihave_sendme_packet(p,PS_OKAY); } static void sendme_packet(PACKET *p) { ihave_sendme_packet(p,PS_NEED); } static void gotit_packet(PACKET *p) { uquad blkno; PENDING *pd; int i; int j; if (p->len != 4+1+7+16) { fprintf(stderr,"%s: wrong-size GOTIT packet (len %d)\n",__progname,p->len); return; } blkno = getmbyte(p->data+4+1,7); for (i=0;istate == PS_SENT) && (pd->block == blkno)) { if (bcmp(p->data+4+1+7,&pd->hash[0],16)) { fprintf(stderr,"%s: GOTIT packet hash wrong, blkno "UQFMT": wanted ",__progname,blkno); for (j=0;j<16;j++) fprintf(stderr,"%02x",pd->hash[j]); fprintf(stderr,", got "); for (j=0;j<16;j++) fprintf(stderr,"%02x",p->data[4+1+7+j]); fprintf(stderr,"\n"); } setpendstate(pd,PS_OKAY); break; } } if (i >= MAXPENDING) { fprintf(stderr,"%s: GOTIT packet blkno "UQFMT" not pending, ignoring\n",__progname,blkno); } } static void exiting_packet(PACKET *p __attribute__((__unused__))) { exitcount ++; } #ifndef NO_VERBOSE static int progress_write(void *arg __attribute__((__unused__)), const char *data, int len) { if (progress_len+len >= progress_alloc) { progress_alloc = progress_len + len + 1; progress_buf = realloc(progress_buf,progress_alloc); } bcopy(data,progress_buf+progress_len,len); progress_len += len; return(len); } #endif #ifndef NO_VERBOSE static void progress_endline(void) { int n; int i; fflush(progress); printf("%.*s",progress_len,progress_buf); n = progress_displen - progress_len; if (n > 0) { printf("%*s",n,""); for (i=n;i>0;i--) putchar('\b'); } for (i=progress_len;i>0;i--) putchar('\b'); printf("\r"); progress_displen = progress_len; progress_len = 0; fflush(stdout); } #endif static void timeout_check(void) { if (timeout_timer.fired) { fprintf(stderr,"%s: timeout expired\n",__progname); exit(1); } } static void startdelay_check(void) { if (startdelay_timer.fired) { fprintf(stderr,"%s: startdelay expired\n",__progname); exit(1); } } static void do_send(void) { int i; PACKET *q; nowvalid = 0; setup_read(); while (1) { timer_check(); startdelay_check(); timeout_check(); q = make_packet(PKOP_STARTSEND,filesize); send_packet(q); free_packet(q); q = recv_packet(5000); if (! q) continue; if (q->data[4] == PKOP_STARTRECV) break; if (q->data[4] != PKOP_NOOP) fprintf(stderr,"%s: sequence error (got %s, wanted STARTRECV)\n",__progname,type_name(q->data[4])); free_packet(q); } free_packet(q); timer_disarm(&startdelay_timer); blksxf = 0; unsent = 0; range_add(&unsent,0,maxblock-1); #ifndef NO_VERBOSE if (verbose) { progress = fwopen(0,progress_write); progress_len = 0; progress_alloc = 0; progress_buf = 0; progress_displen = 0; timer_init(&progress_timer); timer_set(&progress_timer,1000); } #endif while (1) { timer_check(); timeout_check(); weed_pending(); if (debugging) { const char *pref; int i; pref = "pend ["; for (i=1;i 0) && (ceiling < maxblock)) { uquad b; for (b=ceiling;(b0);b++) { PENDING *pd; i = pendfree[--pendnfree]; pd = &pend[i]; if (pd->state != PS_FREE) abort(); setpendstate(pd,PS_UNTRIED); pd->block = b; get_block_hash(b,&pd->hash[0]); } ceiling = b; } send_need(); send_hash(); if (pendnfree+pendnresv == MAXPENDING) break; q = recv_packet( (timers_n > 0) ? (tv_sub_tv_ms_chop(timers[0]->when,now())?:1) : INFTIM ); if (q) { switch (q->data[4]) { case PKOP_IHAVE: ihave_packet(q); break; case PKOP_SENDME: sendme_packet(q); break; case PKOP_GOTIT: gotit_packet(q); break; default: fprintf(stderr,"%s: sender ignoring %s packet (data)\n",__progname,type_name(q->data[4])); break; } free_packet(q); } } #ifndef NO_VERBOSE if (verbose) progress_endline(); #endif fflush(stdout); while (1) { if (exitcount >= 3) sender_exit(); q = make_packet(PKOP_ALLDONE); send_packet(q); free_packet(q); q = recv_packet(1000); if (q) { switch (q->data[4]) { case PKOP_EXITING: exiting_packet(q); break; default: fprintf(stderr,"%s: sender ignoring %s packet (exiting)\n",__progname,type_name(q->data[4])); break; } } } } static void set_size(uquad size) { void *mrv; if (size == filesize) return; if (filemap) munmap(filemap,filesize); filesize = size; ftruncate(filefd,size); mrv = mmap(0,filesize,PROT_READ|PROT_WRITE,MAP_SHARED,filefd,0); if (mrv == MAP_FAILED) { fprintf(stderr,"%s: can't mmap %s: %s\n",__progname,filename,strerror(errno)); exit(1); } filemap = mrv; maxblock = (filesize + 511) >> 9; cutblock = filesize >> 9; } static void do_recv(void) { PACKET *p; nowvalid = 0; setup_write(); while (1) { timer_check(); startdelay_check(); timeout_check(); p = recv_packet(15000); if (! p) { p = make_packet(PKOP_NOOP); send_packet(p); free_packet(p); continue; } if (p->data[4] == PKOP_STARTSEND) break; fprintf(stderr,"%s: sequence error (got type %s, wanted STARTSEND)\n",__progname,type_name(p->data[4])); free_packet(p); } do { if (p->len == 4+1+8) { PACKET *r; set_size(getmbyte(p->data+4+1,8)); r = make_packet(PKOP_STARTRECV); send_packet(r); free_packet(r); } free_packet(p); do { timer_check(); startdelay_check(); timeout_check(); p = recv_packet(5000); } while (! p); } while (p->data[4] == PKOP_STARTSEND); timer_disarm(&startdelay_timer); blksxf = 0; unsent = 0; range_add(&unsent,0,maxblock-1); #ifndef NO_VERBOSE if (verbose) { progress = fwopen(0,progress_write); progress_len = 0; progress_alloc = 0; progress_buf = 0; progress_displen = 0; timer_init(&progress_timer); timer_set(&progress_timer,1000); } #endif timer_init(&exit_timer); while (1) { timer_check(); timeout_check(); #ifndef NO_VERBOSE if (verbose && progress_timer.fired) { timer_set(&progress_timer,1000); fprintf(progress,UQFMT"/"UQFMT" ",blksxf,maxblock); range_print(unsent,progress); progress_endline(); } #endif fflush(stdout); if (p) { switch (p->data[4]) { case PKOP_BLKHASH: blkhash_packet(p); break; case PKOP_DATA: data_packet(p); break; case PKOP_ALLDONE: alldone_packet(p); break; default: fprintf(stderr,"%s: receiver ignoring %s packet\n",__progname,type_name(p->data[4])); break; } free_packet(p); } if ( (exitcount >= 10) || ((exitcount > 0) && exit_timer.fired) ) { #ifndef NO_VERBOSE if (verbose) progress_endline(); #endif fflush(stdout); receiver_exit(); } p = recv_packet(0); if (! p) { p = recv_packet( (timers_n > 0) ? tv_sub_tv_ms_chop(timers[0]->when,now()) : INFTIM ); } } } static void setup(const char *s_my_port, const char *s_peer_ip, const char *s_peer_port, const char *s_key, const char *s_filename) { struct sockaddr_storage ss; #ifdef NO_SS_LEN int sslen; #else #define sslen ss.ss_len #endif union { struct in_addr v4; struct in6_addr v6; } u; my_port = atoi(s_my_port); peer_port = atoi(s_peer_port); bzero(&ss,sizeof(ss)); bzero(&sendss,sizeof(sendss)); if (inet_pton(AF_INET6,s_peer_ip,&u.v6)) { ss.ss_family = AF_INET6; sslen = sizeof(struct sockaddr_in6); ((struct sockaddr_in6 *)&ss)->sin6_addr = in6addr_any; ((struct sockaddr_in6 *)&ss)->sin6_port = htons(my_port); sendss.ss_family = AF_INET6; sendsslen = sizeof(struct sockaddr_in6); ((struct sockaddr_in6 *)&sendss)->sin6_addr = u.v6; ((struct sockaddr_in6 *)&sendss)->sin6_port = htons(peer_port); } else if (inet_pton(AF_INET,s_peer_ip,&u.v4)) { ss.ss_family = AF_INET; sslen = sizeof(struct sockaddr_in); ((struct sockaddr_in *)&ss)->sin_addr.s_addr = INADDR_ANY; ((struct sockaddr_in *)&ss)->sin_port = htons(my_port); sendss.ss_family = AF_INET; sendsslen = sizeof(struct sockaddr_in); ((struct sockaddr_in *)&sendss)->sin_addr = u.v4; ((struct sockaddr_in *)&sendss)->sin_port = htons(peer_port); } else { fprintf(stderr,"%s: %s: invalid address\n",__progname,s_peer_ip); exit(1); } key = s_key; keylen = strlen(key); filename = s_filename; filemap = 0; s = socket(ss.ss_family,SOCK_DGRAM,0); if (bind(s,(struct sockaddr *)&ss,sslen) < 0) { fprintf(stderr,"%s: bind: %s\n",__progname,strerror(errno)); exit(1); } exitcount = 0; timers = 0; timers_alloc = 0; timers_n = 0; timer_init(&timeout_timer); timer_init(&startdelay_timer); if (timeout) timer_set(&timeout_timer,timeout*1000); if (startdelay) timer_set(&startdelay_timer,startdelay*1000); #undef sslen /* no harm if not already defined */ } int main(int, char **); int main(int ac, char **av) { void (*fn)(void); while ((ac > 1) && (av[1][0] == '-')) { if (! strcmp(av[1],"-startdelay")) { startdelay = atoi(av[2]); av += 2; ac -= 2; continue; } if (! strcmp(av[1],"-timeout")) { timeout = atoi(av[2]); av += 2; ac -= 2; continue; } fprintf(stderr,"%s: unrecognized flag `%s'\n",__progname,av[1]); ac = 0; break; } if (ac != 7) { fprintf(stderr,"Usage: %s [qv]{send|put|recv|get} my-port peer-ip peer-port key filename\n",__progname); fprintf(stderr,"Options:\n"); fprintf(stderr,"\t-startdelay NNN\n"); fprintf(stderr,"\t\tIf communication hasn't been established after NNN seconds,\n"); fprintf(stderr,"\t\texit with an error.\n"); fprintf(stderr,"\t-timeout NNN\n"); fprintf(stderr,"\t\tIf the transfer still isn't done after NNN seconds,\n"); fprintf(stderr,"\t\texit with an error.\n"); fprintf(stderr,"Times longer than about three weeks may not work right.\n"); exit(1); } while (av[1][0] == 'D') { debugging ++; av[1] ++; } if (debugging) setbuf(stdout,0); switch (av[1][0]) { case 'q': #ifndef NO_VERBOSE verbose = 0; #endif if (0) { case 'v': #ifdef NO_VERBOSE fprintf(stderr,"%s: this build doesn't support verbosity\n",__progname); fprintf(stderr,"%s: using `q%s' instead\n",__progname,av[1]+1); #else verbose = 1; #endif } if ( !strcmp(av[1]+1,"send") || !strcmp(av[1]+1,"put") ) fn = do_send; else if ( !strcmp(av[1]+1,"recv") || !strcmp(av[1]+1,"get") ) fn = do_recv; else { default: fprintf(stderr,"%s: first arg must be operation [qv]{send|recv|get|put}\n",__progname); exit(1); } break; } setup(av[2],av[3],av[4],av[5],av[6]); (*fn)(); exit(0); }