diff options
-rw-r--r-- | qacp.c | 88 | ||||
-rw-r--r-- | qarsh_packet.c | 59 | ||||
-rw-r--r-- | qarsh_packet.h | 9 | ||||
-rw-r--r-- | qarshd.c | 137 | ||||
-rw-r--r-- | sockutil.c | 82 |
5 files changed, 170 insertions, 205 deletions
@@ -123,17 +123,13 @@ void qacp_sendonefile(const char *host, const char *srcfile, const char *destfile) { struct qa_packet *qp; - struct sockaddr_in daddr; - socklen_t dlen; int fd; - int sd; - int outsd; - int port; + int remfd; ssize_t nbytes; - off_t offset = 0; + off_t offset; struct stat sb; - char *fallbackbuf; - int bufsize; + const int bufsize = 16384; + char buf[bufsize]; if ((fd = open(srcfile, O_RDONLY)) <0) { @@ -148,56 +144,51 @@ qacp_sendonefile(const char *host, const char *srcfile, const char *destfile) exit(errno); } - sd = bind_any(QARSH_MINPORT, qarsh_ss_family); - port = getsockport(sd); + if (S_ISDIR(sb.st_mode)) { + fprintf(stderr, "Skipping directory %s, recursive not supported, use rsync.\n", srcfile); + return; + } - /* Recall that the packet types are qarshd-centric, so if we want - * to send a file to the host running qarshd we have to tell - * qarshd to recv a file. */ + /* Packet types are qarshd-centric, so if we want to send a file to the + * host running qarshd we have to tell qarshd to recv a file. */ - qp = make_qp_recvfile(destfile, port, sb.st_size, sb.st_mode); - qp->qp_seq = packet_seq++; + qp = make_qp_recvfile(destfile, sb.st_size, sb.st_mode); send_packet(qacp_fd, qp); qpfree(qp); - dlen = sizeof daddr; - outsd = accept(sd, (struct sockaddr *) &daddr, &dlen); + /* Await our return code from qarshd */ + qp = recv_packet(qacp_fd); + if (qp && qp->qp_type == QP_RETURNCODE) { + if (qp->qp_returncode.qp_rc == 0) { + remfd = qp->qp_returncode.qp_errno; + } else if (qp->qp_returncode.qp_rc == -1) { + fprintf(stderr, "Remote side failed: %s\n", + qp->qp_returncode.qp_strerror); + goto sendone_error; + } else { + fprintf(stderr, "Unexpected return code %d\n", + qp->qp_returncode.qp_rc); + goto sendone_error; + } + } else { + fprintf(stderr, "Did not receive response to recvfile\n"); + goto sendone_failure; + } + offset = 0; do { - nbytes = sendfile(outsd, fd, &offset, sb.st_size); - } while (nbytes >= 0 && offset < sb.st_size); - - if (nbytes == -1 && errno == EINVAL) { - fprintf(stderr, "Falling back to reads\n"); - - if (sb.st_size < 1024000) { - bufsize = 8192; + nbytes = read(fd, buf, bufsize); + if (nbytes < 0) { + fprintf(stderr, "read() error: %s\n", strerror(errno)); + qp = make_qp_returncode(-1, errno, strerror(errno)); } else { - bufsize = 1024000; - } - fallbackbuf = malloc(bufsize); - if (fallbackbuf == NULL) { - fprintf(stderr, "Could not allocate transfer buffer\n"); - goto sendone_failure; + qp = make_qp_data(remfd, offset, nbytes, buf); } - - do { - nbytes = read(fd, fallbackbuf, bufsize); - if (nbytes < 0) { - fprintf(stderr, "read() error: %s\n", strerror(errno)); - goto sendone_failure; - } else if (nbytes == 0) { /* EOF */ - break; - } - write(outsd, fallbackbuf, nbytes); - } while (nbytes > 0); - } else if (nbytes == -1) { - fprintf(stderr, "error: %s\n", strerror(errno)); - } + send_packet(qacp_fd, qp); + offset += nbytes; + } while (nbytes > 0); sendone_failure: - close(sd); - close(outsd); close(fd); /* Await our return code from qarshd */ @@ -218,6 +209,9 @@ sendone_failure: } return; +sendone_error: + close(qacp_fd); + exit(125); } diff --git a/qarsh_packet.c b/qarsh_packet.c index eb4eac0..269675f 100644 --- a/qarsh_packet.c +++ b/qarsh_packet.c @@ -305,7 +305,6 @@ void parse_qp_recvfile(char *buf, int *buflen, struct qa_packet *qp) { buf = fetch_string(buf, buflen, &(qp->qp_recvfile.qp_path)); - buf = fetch_int(buf, buflen, &(qp->qp_recvfile.qp_if_port)); buf = fetch_off_t(buf, buflen, &(qp->qp_recvfile.qp_count)); buf = fetch_int(buf, buflen, (int *)&(qp->qp_recvfile.qp_mode)); } @@ -314,7 +313,7 @@ void parse_qp_sendfile(char *buf, int *buflen, struct qa_packet *qp) { buf = fetch_string(buf, buflen, &(qp->qp_sendfile.qp_path)); - buf = fetch_int(buf, buflen, &(qp->qp_sendfile.qp_of_port)); + buf = fetch_int(buf, buflen, &(qp->qp_sendfile.qp_remfd)); } void @@ -467,7 +466,6 @@ char * store_qp_recvfile(char *buf, struct qa_packet *qp) { buf = store_string(buf, qp->qp_recvfile.qp_path); - buf = store_int(buf, qp->qp_recvfile.qp_if_port); buf = store_off_t(buf, qp->qp_recvfile.qp_count); buf = store_int(buf, qp->qp_recvfile.qp_mode); return buf; @@ -477,7 +475,7 @@ char * store_qp_sendfile(char *buf, struct qa_packet *qp) { buf = store_string(buf, qp->qp_sendfile.qp_path); - buf = store_int(buf, qp->qp_sendfile.qp_of_port); + buf = store_int(buf, qp->qp_sendfile.qp_remfd); return buf; } @@ -631,7 +629,7 @@ make_qp_kill(int sig) } struct qa_packet * -make_qp_recvfile(const char *path, int if_port, off_t count, mode_t mode) +make_qp_recvfile(const char *path, off_t count, mode_t mode) { struct qa_packet *qp; qp = malloc(sizeof *qp); @@ -640,7 +638,6 @@ make_qp_recvfile(const char *path, int if_port, off_t count, mode_t mode) qp->qp_type = QP_RECVFILE; qp->qp_recvfile.qp_path = strdup(path); - qp->qp_recvfile.qp_if_port = if_port; qp->qp_recvfile.qp_count = count; qp->qp_recvfile.qp_mode = mode; @@ -648,7 +645,7 @@ make_qp_recvfile(const char *path, int if_port, off_t count, mode_t mode) } struct qa_packet * -make_qp_sendfile(const char *path, int of_port) +make_qp_sendfile(const char *path, int remfd) { struct qa_packet *qp; qp = malloc(sizeof *qp); @@ -657,7 +654,7 @@ make_qp_sendfile(const char *path, int of_port) qp->qp_type = QP_SENDFILE; qp->qp_sendfile.qp_path = strdup(path); - qp->qp_sendfile.qp_of_port = of_port; + qp->qp_sendfile.qp_remfd = remfd; return qp; } @@ -773,85 +770,85 @@ qpfree(struct qa_packet *qp) void dump_qp_ack(struct qa_packet *qp) { - fprintf(stderr, "\t%s #%d\n", QP_NAME(qp->qp_ack.qp_ack_type), + fprintf(stderr, "%s #%d", QP_NAME(qp->qp_ack.qp_ack_type), qp->qp_ack.qp_ack_seq); } void dump_qp_runcmd(struct qa_packet *qp) { - fprintf(stderr, "\tcmdline: %s\n", qp->qp_runcmd.qp_cmdline); + fprintf(stderr, "cmdline: %s", qp->qp_runcmd.qp_cmdline); } void dump_qp_returncode(struct qa_packet *qp) { - fprintf(stderr, "\trc: %d\n", qp->qp_returncode.qp_rc); + fprintf(stderr, "rc: %d", qp->qp_returncode.qp_rc); } void dump_qp_cmdexit(struct qa_packet *qp) { if (WIFEXITED(qp->qp_cmdexit.qp_status)) { - fprintf(stderr, "\texited: %d\n", WEXITSTATUS(qp->qp_cmdexit.qp_status)); + fprintf(stderr, "exited: %d", WEXITSTATUS(qp->qp_cmdexit.qp_status)); } else if (WIFSIGNALED(qp->qp_cmdexit.qp_status)) { - fprintf(stderr, "\tsignaled: %d\n", WTERMSIG(qp->qp_cmdexit.qp_status)); + fprintf(stderr, "signaled: %d", WTERMSIG(qp->qp_cmdexit.qp_status)); } else { - fprintf(stderr, "\tstatus: %d\n", qp->qp_cmdexit.qp_status); + fprintf(stderr, "status: %d", qp->qp_cmdexit.qp_status); } } void dump_qp_setuser(struct qa_packet *qp) { - fprintf(stderr, "\tuser: %s\n", qp->qp_setuser.qp_user); - fprintf(stderr, "\tgroup: %s\n", qp->qp_setuser.qp_group); + fprintf(stderr, "user: %s group: %s", + qp->qp_setuser.qp_user, qp->qp_setuser.qp_group); } void dump_qp_kill(struct qa_packet *qp) { - fprintf(stderr, "\tsig: %d\n", qp->qp_kill.qp_sig); + fprintf(stderr, "sig: %d", qp->qp_kill.qp_sig); } void dump_qp_recvfile(struct qa_packet *qp) { - fprintf(stderr, "\tpath: %s\n", qp->qp_recvfile.qp_path); - fprintf(stderr, "\tmode: %o\n", qp->qp_recvfile.qp_mode); - fprintf(stderr, "\tcount: %lld\n", (long long int)qp->qp_recvfile.qp_count); + fprintf(stderr, "path: %s mode: %o count: %lld", + qp->qp_recvfile.qp_path, qp->qp_recvfile.qp_mode, + (long long int)qp->qp_recvfile.qp_count); } void dump_qp_sendfile(struct qa_packet *qp) { - fprintf(stderr, "\tpath: %s\n", qp->qp_sendfile.qp_path); - fprintf(stderr, "\tremfd: %d\n", qp->qp_sendfile.qp_remfd); + fprintf(stderr, "path: %s remfd: %d", + qp->qp_sendfile.qp_path, qp->qp_sendfile.qp_remfd); } void dump_qp_rstat(struct qa_packet *qp) { - fprintf(stderr, "\tpath: %s\n", qp->qp_rstat.qp_path); - fprintf(stderr, "\tst_mode: %o\n", qp->qp_rstat.qp_st_mode); - fprintf(stderr, "\tst_uid: %d\n", qp->qp_rstat.qp_st_uid); - fprintf(stderr, "\tst_gid: %d\n", qp->qp_rstat.qp_st_gid); - fprintf(stderr, "\tst_size: %lld\n", (long long int)qp->qp_rstat.qp_st_size); + fprintf(stderr, "path: %s st_mode: %o st_uid: %d st_gid: %d st_size: %lld", + qp->qp_rstat.qp_path, qp->qp_rstat.qp_st_mode, + qp->qp_rstat.qp_st_uid, qp->qp_rstat.qp_st_gid, + (long long int)qp->qp_rstat.qp_st_size); } void dump_qp_data(struct qa_packet *qp) { - fprintf(stderr, "\tremfd: %d\n", qp->qp_data.qp_remfd); - fprintf(stderr, "\toffset: %lld\n", (long long int)qp->qp_data.qp_offset); - fprintf(stderr, "\tcount: %d\n", qp->qp_data.qp_count); + fprintf(stderr, "remfd: %d offset: %lld count: %d", + qp->qp_data.qp_remfd, (long long int)qp->qp_data.qp_offset, qp->qp_data.qp_count); } void dump_qp(struct qa_packet *qp) { - fprintf(stderr, "%s #%d\n", QP_NAME(qp->qp_type), qp->qp_seq); + fprintf(stderr, "#%d %s ", qp->qp_seq, QP_NAME(qp->qp_type)); if (qa_pi[qp->qp_type].pi_dump) { qa_pi[qp->qp_type].pi_dump(qp); } + fprintf(stderr, "\n"); + fflush(stderr); } diff --git a/qarsh_packet.h b/qarsh_packet.h index 42f4b61..908a6da 100644 --- a/qarsh_packet.h +++ b/qarsh_packet.h @@ -80,13 +80,12 @@ struct qp_kill_pkt { struct qp_recvfile_pkt { char *qp_path; mode_t qp_mode; - int qp_if_port; off_t qp_count; }; struct qp_sendfile_pkt { char *qp_path; - int qp_of_port; + int qp_remfd; /* file descriptor on client side */ }; struct qp_rstat_pkt { @@ -105,8 +104,6 @@ struct qp_data_pkt { void *qp_blob; }; -#define QP_VERSION 1 - struct qa_packet { enum qa_packet_type qp_type; int qp_seq; /* Sequence number for this packet */ @@ -147,8 +144,8 @@ struct qa_packet *make_qp_runcmd(char *cmdline, int p_in, int p_out, int p_err); struct qa_packet *make_qp_cmdexit(pid_t pid, int status); struct qa_packet *make_qp_setuser(char *user, char *group); struct qa_packet *make_qp_kill(int sig); -struct qa_packet *make_qp_recvfile(const char *path, int if_port, off_t count, mode_t mode); -struct qa_packet *make_qp_sendfile(const char *path, int of_port); +struct qa_packet *make_qp_recvfile(const char *path, off_t count, mode_t mode); +struct qa_packet *make_qp_sendfile(const char *path, int remfd); struct qa_packet *make_qp_rstat(const char *path, const struct stat *sb); struct qa_packet *make_qp_data(const int remfd, const off_t offset, const int count, void *blob); int qptostr(struct qa_packet *qp, char *qpstr, int maxsize); @@ -47,7 +47,6 @@ int debug = 0; - /* Globals */ struct sockaddr_storage peername; int child_exitted = 0; @@ -57,6 +56,12 @@ sigset_t orig_sigmask; char *saved_path = NULL; struct stat saved_stat; +struct remotefd { + int fd; + int direction; /* 0 incoming, 1 outgoing */ + off_t expectedsize; + off_t received; +} remotefds[8]; int setup_user(char *user, char *group) @@ -126,58 +131,53 @@ run_cmd(const char *cmd, int p_in, int p_out, int p_err) return pid; } -off_t -recvfile(const char *path, int if_port, off_t count, mode_t mode) +struct qa_packet * +prepare_recvfile(struct qa_packet *qp) { - int sd; - int ofd; - char buf[BUFSIZ]; - ssize_t nread, nwrote; - off_t nleft; - - /* Read count bytes from ifd (sd after we connect), - * write into file @ path - */ - - sd = connect_to_peer(&peername, if_port); - if (sd == -1) { - syslog(LOG_WARNING, "connect to if_port failed\n"); - return -1; + int fd; + + syslog(LOG_INFO, "Receive file %s, size = %lld, mode = %o\n", + qp->qp_recvfile.qp_path, + (long long int)qp->qp_recvfile.qp_count, + qp->qp_recvfile.qp_mode); + + if ((fd = open(qp->qp_recvfile.qp_path, O_TRUNC|O_CREAT|O_WRONLY, + qp->qp_recvfile.qp_mode)) < 0) { + syslog(LOG_WARNING, "Could not open %s to receive file: %s\n", + qp->qp_recvfile.qp_path, strerror(errno)); + return make_qp_returncode(-1, errno, strerror(errno)); } - if ((ofd = open(path, O_TRUNC|O_CREAT|O_WRONLY, mode)) < 0) { - syslog(LOG_WARNING, "Could not open %s to recv file: %s\n", - path, strerror(errno)); - return -1; - } - - fchmod(ofd, mode); - - nleft = count; - while (nleft > 0) { - nread = read(sd, buf, BUFSIZ); - if (nread < 0) { - return nread; - } else if (nread == 0) { /* EOF */ - break; - } + /* Set permissions again to override umask */ + fchmod(fd, qp->qp_recvfile.qp_mode); - nwrote = write(ofd, buf, nread); - nleft -= nread; - } + /* Store fd to check data packets against */ + remotefds[0].fd = fd; + remotefds[0].direction = 0; + remotefds[0].expectedsize = qp->qp_recvfile.qp_count; + remotefds[0].received = 0; - if (nleft != 0) { - unlink(path); - syslog(LOG_WARNING, "Short file transfer in recvfile(), " - "%lld bytes lost, wanted %lld\n", - (long long int)nleft, - (long long int)count); - } + return make_qp_returncode(0, fd, "Ready to receive"); +} - close(sd); - close(ofd); +struct qa_packet * +receive_data(struct remotefd *rfd, struct qa_packet *qp) +{ + ssize_t nwrote; + + if (qp->qp_data.qp_count == 0) { /* EOF */ + close(rfd->fd); + syslog(LOG_DEBUG, "Transfer complete\n"); + return make_qp_returncode(0, 0, "Transfer complete"); + } + /* syslog(LOG_DEBUG, "Data for %d, %d@%ld\n", rfd->fd, qp->qp_data.qp_count, qp->qp_data.qp_offset); */ + nwrote = write(rfd->fd, qp->qp_data.qp_blob, qp->qp_data.qp_count); - return count - nleft; + if (nwrote < 0) { + return make_qp_returncode(-1, errno, strerror(errno)); + } + rfd->received += nwrote; + return NULL; } ssize_t @@ -267,6 +267,10 @@ handle_packets(int infd) sa.sa_mask = sigmask; sa.sa_flags = SA_RESTART; sigaction(SIGCHLD, &sa, NULL); +#if 0 + signal(SIGALRM, sig_handler); + pause(); +#endif for (;;) { FD_SET(infd, &rfds); @@ -298,7 +302,7 @@ handle_packets(int infd) if (debug) syslog(LOG_DEBUG, "That's enough\n"); break; } - dump_qp(qp); + if (debug) dump_qp(qp); switch (qp->qp_type) { case QP_KILL: if (child_pid) { @@ -324,38 +328,26 @@ handle_packets(int infd) qp->qp_runcmd.qp_stdout_port, qp->qp_runcmd.qp_stderr_port); break; - case QP_RECVFILE: - syslog(LOG_INFO, "Got a QP_RECVFILE with path = %s, " - "ifd = %d, count = %lld, mode = %o\n", - qp->qp_recvfile.qp_path, - qp->qp_recvfile.qp_if_port, - (long long int)qp->qp_recvfile.qp_count, - qp->qp_recvfile.qp_mode); - nbytes = recvfile(qp->qp_recvfile.qp_path, - qp->qp_recvfile.qp_if_port, - qp->qp_recvfile.qp_count, - qp->qp_recvfile.qp_mode); - if (nbytes < 0) { - rp = make_qp_returncode(-1, errno, strerror(errno)); - } else if (nbytes < qp->qp_recvfile.qp_count) { - char tmpstr[512]; - sprintf(tmpstr, "Excpected %lld, wrote %lld\n", - (long long int)qp->qp_recvfile.qp_count, - (long long int)nbytes); - rp = make_qp_returncode(-1, 0, tmpstr); - } else { - rp = make_qp_returncode(0, 0, "Transfer Complete"); - } + case QP_RECVFILE: /* Setup file descriptors to handle incoming data */ + rp = prepare_recvfile(qp); send_packet(fileno(stdout), rp); qpfree(rp); break; + case QP_DATA: + assert(qp->qp_data.qp_remfd == remotefds[0].fd); + rp = receive_data(&remotefds[0], qp); + if (rp) { + send_packet(fileno(stdout), rp); + qpfree(rp); + } + break; case QP_SENDFILE: syslog(LOG_INFO, "Got a QP_SENDFILE with path = %s, " - "ofd = %d\n", + "remfd = %d\n", qp->qp_sendfile.qp_path, - qp->qp_sendfile.qp_of_port); + qp->qp_sendfile.qp_remfd); nbytes = pushfile(qp->qp_sendfile.qp_path, - qp->qp_sendfile.qp_of_port); + qp->qp_sendfile.qp_remfd); if (nbytes < 0) { rp = make_qp_returncode(-1, errno, strerror(errno)); } else { @@ -376,6 +368,7 @@ handle_packets(int infd) "Packet type %s unimplemented", qp_packet_type(qp->qp_type)); } + qpfree(qp); } else { if (debug) syslog(LOG_DEBUG, "Nothing to do\n"); } @@ -202,75 +202,59 @@ connect_to_peer(struct sockaddr_storage *peer, int port) * All incoming and outgoing packets go through this function. * Caller should make sure the fd is ready to read. */ -typedef struct { - char buf[QARSH_MAX_PACKET_SIZE]; - int start; - int end; -} Buffer; - struct qa_packet * recv_packet(int fd) { - static Buffer pb = { "", 0, 0 }; + char buf[QARSH_MAX_PACKET_SIZE]; uint32_t packetsize; - struct qa_packet *qp = NULL; - int ret; - - if (pb.start == pb.end && pb.start) - pb.start = pb.end = 0; + int ret = 0; -recv_read: - ret = read(fd, pb.buf, QARSH_MAX_PACKET_SIZE - pb.start); - if (ret > 0) { - pb.end = pb.start + ret; - /* get packet size from the packet */ - packetsize = ntohl(*(uint32_t *)(pb.buf+pb.start)); - if (packetsize > QARSH_MAX_PACKET_SIZE) { - printf("Packet size too large, %d > %d\n", packetsize, QARSH_MAX_PACKET_SIZE); - return NULL; - } else if (packetsize <= (ret - sizeof packetsize)) { - pb.start += sizeof packetsize; - qp = parse_packet(pb.buf + pb.start, pb.end - pb.start); - } else { /* packetsize > what we read, we need to read more */ - printf("Compacting buffer to fit rest of packet, %d read, %d left\n", ret, packetsize - ret); - memmove(pb.buf, pb.buf + pb.start, pb.end - pb.start); - pb.end -= pb.start; - pb.start = 0; - goto recv_read; - } - - /* Get the packet buffer ready for the next call */ - if (packetsize == (ret - sizeof packetsize)) { /* The read was a complete packet */ - pb.start = pb.end = 0; - printf("Complete read used\n"); - } else { - pb.start += packetsize; - memmove(pb.buf, pb.buf + pb.start, pb.end - pb.start); - pb.end -= pb.start; - pb.start = 0; - } + if ((ret = read(fd, &packetsize, sizeof packetsize)) < 0) { + fprintf(stderr, "Read error while reading packet size: %s\n", strerror(errno)); + return NULL; + } else if (ret == 0) { + return NULL; + } else if (ret != sizeof packetsize) { + fprintf(stderr, "Did not read packet size"); + return NULL; + } + packetsize = ntohl(packetsize); + if (packetsize > QARSH_MAX_PACKET_SIZE) { + fprintf(stderr, "Packet size too large, %d > %d\n", packetsize, QARSH_MAX_PACKET_SIZE); + return NULL; + } + if ((ret = read(fd, buf, packetsize)) < 0) { + fprintf(stderr, "Read error while reading packet data: %s\n", strerror(errno)); + return NULL; } - return qp; + if (ret != packetsize) { + fprintf(stderr, "Read something other than packetsize bytes, %d != %d\n", + ret, packetsize); + return NULL; + } + + return parse_packet(buf, packetsize); } int send_packet(int fd, struct qa_packet *qp) { - Buffer pb = { "", 0, 0 }; + char buf[QARSH_MAX_PACKET_SIZE]; uint32_t netsize; + int len; ssize_t ret = -1; struct iovec iovs[2]; qp->qp_seq = packet_seq++; - pb.end = qptostr(qp, pb.buf, QARSH_MAX_PACKET_SIZE - sizeof netsize); + len = qptostr(qp, buf, QARSH_MAX_PACKET_SIZE - sizeof netsize); - if (pb.end > 0) { - netsize = htonl(pb.end); + if (len > 0) { + netsize = htonl(len); iovs[0].iov_base = &netsize; iovs[0].iov_len = sizeof netsize; - iovs[1].iov_base = pb.buf; - iovs[1].iov_len = pb.end; + iovs[1].iov_base = buf; + iovs[1].iov_len = len; ret = writev(fd, iovs, 2); } |