diff options
author | Nathan Straz <nstraz@redhat.com> | 2013-08-28 14:16:30 -0400 |
---|---|---|
committer | Nathan Straz <nstraz@redhat.com> | 2013-09-11 17:49:33 -0400 |
commit | ceaf5360d969f2507018f51876f64aaae767e367 (patch) | |
tree | 07d3db73a4186ce2dcad660393e9265b0ff254ca | |
parent | 604b053a1e710f22226fcb86c34e737df1058f92 (diff) | |
download | qarsh-ceaf5360d969f2507018f51876f64aaae767e367.tar.gz qarsh-ceaf5360d969f2507018f51876f64aaae767e367.tar.xz qarsh-ceaf5360d969f2507018f51876f64aaae767e367.zip |
Get commands running over one socket
Added a new packet to limit data sent from the other side.
-rw-r--r-- | qarsh.c | 224 | ||||
-rw-r--r-- | qarsh_packet.c | 57 | ||||
-rw-r--r-- | qarsh_packet.h | 16 | ||||
-rw-r--r-- | qarshd.c | 254 |
4 files changed, 355 insertions, 196 deletions
@@ -183,87 +183,21 @@ run_remote_cmd(char *cmdline) { struct qa_packet *qp; int rc; - int p_in, p_out, p_err; /* Port numbers */ - int l_in, l_out, l_err; /* listening sockets */ - int c_in, c_out, c_err; /* client sockets */ - char *b_in, *b_out, *b_err; /* Buffers */ - int z_in, z_out, z_err; /* size in buffer */ - fd_set readfds, testfds, testwds; + int allowed_in = 0; /* bytes we can send to qarshd */ + char b_out[QARSH_BUFSIZE], b_err[QARSH_BUFSIZE]; /* Buffers */ + char buf[QARSH_BUFSIZE]; + int z_out = 0, z_err = 0; /* size in buffer */ + int eof_out = 0, eof_err = 0; + fd_set rfds, wfds; int nset; - struct sockaddr_in caddr; - socklen_t clen; + int nbytes; struct timespec timeout; short cmd_finished; - l_in = bind_any(QARSH_MINPORT, qarsh_ss_family); - p_in = getsockport(l_in); - l_out = bind_any(QARSH_MINPORT, qarsh_ss_family); - p_out = getsockport(l_out); - l_err = bind_any(QARSH_MINPORT, qarsh_ss_family); - p_err = getsockport(l_err); - - qp = make_qp_runcmd(cmdline, p_in, p_out, p_err); + qp = make_qp_runcmd(cmdline); send_packet(qarsh_fd, qp); qpfree(qp); - /* Get the stdin, stdout, and stderr connections up before we do work */ - FD_ZERO(&readfds); - FD_SET(l_in, &readfds); - FD_SET(l_out, &readfds); - FD_SET(l_err, &readfds); - c_in = c_out = c_err = 0; - - do { - testfds = readfds; - nset = select(FD_SETSIZE, &testfds, NULL, NULL, NULL); - - if (FD_ISSET(l_in, &testfds)) { - clen = sizeof caddr; - c_in = accept(l_in, (struct sockaddr *)&caddr, &clen); - if (c_in == -1) { - fprintf(stderr, - "accept of l_in failed, %d: %s\n", - errno, strerror(errno)); - continue; - } - } - if (FD_ISSET(l_out, &testfds)) { - clen = sizeof caddr; - c_out = accept(l_out, (struct sockaddr *)&caddr, &clen); - if (c_out == -1) { - fprintf(stderr, - "accept of l_out failed, %d: %s\n", - errno, strerror(errno)); - continue; - } - } - if (FD_ISSET(l_err, &testfds)) { - clen = sizeof caddr; - c_err = accept(l_err, (struct sockaddr *)&caddr, &clen); - if (c_err == -1) { - fprintf(stderr, - "accept of l_err failed, %d: %s\n", - errno, strerror(errno)); - continue; - } - } - } while (c_in == 0 || c_out == 0 || c_err == 0); - close(l_in); - close(l_out); - close(l_err); - l_in = l_out = l_err = -1; - - /* Now we can start doing some real work */ - - FD_ZERO(&readfds); - FD_SET(qarsh_fd, &readfds); - FD_SET(c_out, &readfds); - FD_SET(c_err, &readfds); - /* Ignore stdin if it's a tty, we don't want to deal with - * tty i/o if we're a background process */ - if (!isatty(fileno(stdin))) { - FD_SET(fileno(stdin), &readfds); - } /* Setup signal handling stuff so we can propogate signals */ setup_signals(); @@ -272,10 +206,14 @@ run_remote_cmd(char *cmdline) "fcntl stdin O_NONBLOCK failed, %d: %s\n", errno, strerror(errno)); } - /* allocate one buffer for each stream */ - b_in = malloc(QARSH_BUFSIZE); memset(b_in, 0, QARSH_BUFSIZE); z_in = 0; - b_out = malloc(QARSH_BUFSIZE); memset(b_out, 0, QARSH_BUFSIZE); z_out = 0; - b_err = malloc(QARSH_BUFSIZE); memset(b_err, 0, QARSH_BUFSIZE); z_err = 0; + + /* Tell qarshd how much data it can send on stdout and stderr */ + qp = make_qp_data_allow(1, QARSH_BUFSIZE); + send_packet(qarsh_fd, qp); + qpfree(qp); + qp = make_qp_data_allow(2, QARSH_BUFSIZE); + send_packet(qarsh_fd, qp); + qpfree(qp); hbeat(qarsh_hb); cmd_finished = 0; @@ -287,17 +225,19 @@ run_remote_cmd(char *cmdline) timeout.tv_sec = 5; timeout.tv_nsec = 0; } - testfds = readfds; - FD_ZERO(&testwds); - /* If there are things to write, set the write bit and clear the read */ - /* We use one buffer per channel and we don't want to overwrite it */ - if (z_in) { FD_CLR(fileno(stdin), &testfds); FD_SET(c_in, &testwds); } - if (z_out) { FD_CLR(c_out, &testfds); FD_SET(fileno(stdout), &testwds); } - if (z_err) { FD_CLR(c_err, &testfds); FD_SET(fileno(stderr), &testwds); } - - nset = pselect(FD_SETSIZE, &testfds, &testwds, NULL, &timeout, + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_SET(qarsh_fd, &rfds); + /* Ignore stdin if it's a tty, we don't want to deal with + * tty i/o if we're a background process */ + if (!isatty(fileno(stdin)) && allowed_in) { FD_SET(fileno(stdin), &rfds); } + if (z_out) { FD_SET(fileno(stdout), &wfds); } + if (z_err) { FD_SET(fileno(stderr), &wfds); } + + nset = pselect(FD_SETSIZE, &rfds, &wfds, NULL, &timeout, &pselect_sigmask); + /* Timeout hit, send a heartbeat */ if (nset == 0) { if (!hbeat(qarsh_hb)) { /* If the heartbeat fails, we should exit now. @@ -322,76 +262,98 @@ run_remote_cmd(char *cmdline) */ hbeat_setstate(qarsh_hb, HOST_ALIVE); - if (nset && FD_ISSET(fileno(stdin), &testfds)) { - z_in = read(fileno(stdin), b_in, QARSH_BUFSIZE); - if (z_in == 0) { - FD_CLR(fileno(stdin), &readfds); + if (nset && FD_ISSET(fileno(stdin), &rfds)) { + nbytes = read(fileno(stdin), buf, allowed_in); + qp = make_qp_data(0, 0, nbytes, buf); + send_packet(qarsh_fd, qp); + qpfree(qp); + allowed_in -= nbytes; + if (nbytes == 0) { close(fileno(stdin)); - close(c_in); - c_in = 0; + allowed_in = 0; } nset--; } - if (nset && c_in && FD_ISSET(c_in, &testwds)) { - write(c_in, b_in, z_in); - z_in = 0; - memset(b_in, 0, QARSH_BUFSIZE); - } - if (nset && c_out && FD_ISSET(c_out, &testfds)) { - z_out = read(c_out, b_out, QARSH_BUFSIZE); - if (z_out == 0) { - FD_CLR(c_out, &readfds); - close(c_out); - c_out = 0; + if (nset && FD_ISSET(fileno(stdout), &wfds)) { + nbytes = write(fileno(stdout), b_out, z_out); + if (nbytes == z_out) { + z_out = 0; + if (eof_out) close(fileno(stdout)); + } else { + memmove(b_out, b_out+nbytes, z_out - nbytes); + z_out -= nbytes; + } + if (!eof_out) { + qp = make_qp_data_allow(1, nbytes); + send_packet(qarsh_fd, qp); + qpfree(qp); } nset--; } - if (nset && FD_ISSET(fileno(stdout), &testwds)) { - write(fileno(stdout), b_out, z_out); - z_out = 0; - } - if (nset && c_err && FD_ISSET(c_err, &testfds)) { - z_err = read(c_err, b_err, QARSH_BUFSIZE); - if (z_err == 0) { - FD_CLR(c_err, &readfds); - close(c_err); - c_err = 0; + if (nset && FD_ISSET(fileno(stderr), &wfds)) { + nbytes = write(fileno(stderr), b_err, z_err); + if (nbytes == z_err) { + z_err = 0; + if (eof_err) close(fileno(stderr)); + } else { + memmove(b_err, b_err+nbytes, z_err - nbytes); + z_err -= nbytes; + } + if (!eof_err) { + qp = make_qp_data_allow(2, nbytes); + send_packet(qarsh_fd, qp); + qpfree(qp); } nset--; } - if (nset && FD_ISSET(fileno(stderr), &testwds)) { - write(fileno(stderr), b_err, z_err); - z_err = 0; - } - if (nset && FD_ISSET(qarsh_fd, &testfds)) { + if (nset && FD_ISSET(qarsh_fd, &rfds)) { qp = recv_packet(qarsh_fd); if (qp == NULL) { fprintf(stderr, "recv_packet() returned NULL!\n:"); break; } - /* dump_qp(qp); */ - if (qp && qp->qp_type == QP_CMDEXIT) { + if (qp->qp_type == QP_CMDEXIT) { cmd_finished = 1; rc = qp->qp_cmdexit.qp_status; - qpfree(qp); /* Don't break yet, we need to make * sure all output is read. */ + } else if (qp->qp_type == QP_DATA) { + if (qp->qp_data.qp_remfd == 1 && qp->qp_data.qp_count <= (QARSH_BUFSIZE - z_out)) { + if (qp->qp_data.qp_count == 0) eof_out = 1; + memcpy(b_out+z_out, qp->qp_data.qp_blob, qp->qp_data.qp_count); + z_out += qp->qp_data.qp_count; + } else if (qp->qp_data.qp_remfd == 2 && qp->qp_data.qp_count <= (QARSH_BUFSIZE - z_err)) { + if (qp->qp_data.qp_count == 0) eof_err = 1; + memcpy(b_err+z_err, qp->qp_data.qp_blob, qp->qp_data.qp_count); + z_err += qp->qp_data.qp_count; + } else { + fprintf(stderr, "ERROR: Bad data packet: fd %d, cnt: %d\n, bufleft: %d, %d", + qp->qp_data.qp_remfd, qp->qp_data.qp_count, + QARSH_BUFSIZE - z_out, QARSH_BUFSIZE - z_err); + } + } else if (qp->qp_type == QP_DALLOW) { + if (qp->qp_dallow.qp_remfd == 0) { + allowed_in += qp->qp_dallow.qp_count; + } else { + fprintf(stderr, "ERROR: Received data allow for fd %d\n", + qp->qp_dallow.qp_remfd); + } } + qpfree(qp); nset--; } } - if (cmd_finished && c_out == 0 && c_err == 0) { - /* If the command is complete and both output - * sockets are closed, we can exit now. We need - * to test all conditions at once so if none are - * true, we'll still check for heartbeat. */ + if (cmd_finished + && eof_out && z_out == 0 + && eof_err && z_err == 0) { + /* If the command is complete, we've seen EOF + * on outputs and both output buffers are empty we can + * exit now. We need to test all conditions at once so + * if none are true, we'll still check for heartbeat. */ break; } } - if (c_out) close(c_out); - if (c_err) close(c_err); - free(b_in); free(b_out); free(b_err); if (hbeat_getstate(qarsh_hb) == HOST_TIMEOUT) { fprintf(stderr, "Didn't receive heartbeat for %d seconds\n", hbeat_getmaxtimeout(qarsh_hb)); @@ -539,7 +501,9 @@ again: ret = run_remote_cmd(args); close(qarsh_fd); + hbeat_free(qarsh_hb); free(args); + free(remuser); /* If the remote cmd was killed, we need to be killed too */ if (WIFSIGNALED(ret)) { reset_signals(); diff --git a/qarsh_packet.c b/qarsh_packet.c index 269675f..945db6b 100644 --- a/qarsh_packet.c +++ b/qarsh_packet.c @@ -49,6 +49,7 @@ void parse_qp_recvfile(char *buf, int *buflen, struct qa_packet *qp); void parse_qp_sendfile(char *buf, int *buflen, struct qa_packet *qp); void parse_qp_rstat(char *buf, int *buflen, struct qa_packet *qp); void parse_qp_data(char *buf, int *buflen, struct qa_packet *qp); +void parse_qp_data_allow(char *buf, int *buflen, struct qa_packet *qp); char *store_qp_hello(char *buf, struct qa_packet *qp); char *store_qp_returncode(char *buf, struct qa_packet *qp); @@ -61,6 +62,7 @@ char *store_qp_recvfile(char *buf, struct qa_packet *qp); char *store_qp_sendfile(char *buf, struct qa_packet *qp); char *store_qp_rstat(char *buf, struct qa_packet *qp); char *store_qp_data(char *buf, struct qa_packet *qp); +char *store_qp_data_allow(char *buf, struct qa_packet *qp); void free_qp_hello(struct qa_packet *qp); void free_qp_returncode(struct qa_packet *qp); @@ -81,6 +83,7 @@ void dump_qp_recvfile(struct qa_packet *qp); void dump_qp_sendfile(struct qa_packet *qp); void dump_qp_rstat(struct qa_packet *qp); void dump_qp_data(struct qa_packet *qp); +void dump_qp_data_allow(struct qa_packet *qp); struct packet_internals { @@ -162,6 +165,12 @@ struct packet_internals { .pi_store = store_qp_data, .pi_free = free_qp_data, .pi_dump = dump_qp_data + }, { + .pi_name = "dallow", + .pi_parse = parse_qp_data_allow, + .pi_store = store_qp_data_allow, + .pi_free = NULL, + .pi_dump = dump_qp_data_allow } }; @@ -268,9 +277,6 @@ void parse_qp_runcmd(char *buf, int *buflen, struct qa_packet *qp) { buf = fetch_string(buf, buflen, &(qp->qp_runcmd.qp_cmdline)); - buf = fetch_int(buf, buflen, &(qp->qp_runcmd.qp_stdin_port)); - buf = fetch_int(buf, buflen, &(qp->qp_runcmd.qp_stdout_port)); - buf = fetch_int(buf, buflen, &(qp->qp_runcmd.qp_stderr_port)); } void @@ -344,6 +350,13 @@ parse_qp_data(char *buf, int *buflen, struct qa_packet *qp) } } +void +parse_qp_data_allow(char *buf, int *buflen, struct qa_packet *qp) +{ + buf = fetch_int(buf, buflen, (int *)&(qp->qp_dallow.qp_remfd)); + buf = fetch_int(buf, buflen, (int *)&(qp->qp_dallow.qp_count)); +} + struct qa_packet * parse_packet(char *buf, int buflen) { @@ -424,9 +437,6 @@ char * store_qp_runcmd(char *buf, struct qa_packet *qp) { buf = store_string(buf, qp->qp_runcmd.qp_cmdline); - buf = store_int(buf, qp->qp_runcmd.qp_stdin_port); - buf = store_int(buf, qp->qp_runcmd.qp_stdout_port); - buf = store_int(buf, qp->qp_runcmd.qp_stderr_port); return buf; } @@ -499,6 +509,14 @@ store_qp_data(char *buf, struct qa_packet *qp) return buf; } +char * +store_qp_data_allow(char *buf, struct qa_packet *qp) +{ + buf = store_int(buf, qp->qp_dallow.qp_remfd); + buf = store_int(buf, qp->qp_dallow.qp_count); + return buf; +} + int qptostr(struct qa_packet *qp, char *qpstr, int maxsize) { @@ -568,7 +586,7 @@ make_qp_ack(enum qa_packet_type t, int i) } struct qa_packet * -make_qp_runcmd(char *cmdline, int p_in, int p_out, int p_err) +make_qp_runcmd(char *cmdline) { struct qa_packet *qp; qp = malloc(sizeof *qp); @@ -577,9 +595,6 @@ make_qp_runcmd(char *cmdline, int p_in, int p_out, int p_err) qp->qp_type = QP_RUNCMD; qp->qp_runcmd.qp_cmdline = strdup(cmdline); - qp->qp_runcmd.qp_stdin_port = p_in; - qp->qp_runcmd.qp_stdout_port = p_out; - qp->qp_runcmd.qp_stderr_port = p_err; return qp; } @@ -698,6 +713,21 @@ make_qp_data(const int remfd, const off_t offset, const int count, void *blob) return qp; } +struct qa_packet * +make_qp_data_allow(const int remfd, const int count) +{ + struct qa_packet *qp; + qp = malloc(sizeof *qp); + assert(qp); + memset(qp, 0, sizeof *qp); + + qp->qp_type = QP_DALLOW; + qp->qp_dallow.qp_remfd = remfd; + qp->qp_dallow.qp_count = count; + + return qp; +} + /* * Packet deallocation functions */ @@ -843,6 +873,13 @@ dump_qp_data(struct qa_packet *qp) } void +dump_qp_data_allow(struct qa_packet *qp) +{ + fprintf(stderr, "remfd: %d count: %d", + qp->qp_dallow.qp_remfd, qp->qp_dallow.qp_count); +} + +void dump_qp(struct qa_packet *qp) { fprintf(stderr, "#%d %s ", qp->qp_seq, QP_NAME(qp->qp_type)); diff --git a/qarsh_packet.h b/qarsh_packet.h index 908a6da..8a74791 100644 --- a/qarsh_packet.h +++ b/qarsh_packet.h @@ -37,7 +37,8 @@ enum qa_packet_type { QP_RECVFILE = 8, QP_SENDFILE = 9, QP_RSTAT = 10, - QP_DATA = 11 + QP_DATA = 11, + QP_DALLOW = 12 }; struct qp_hello_pkt { @@ -52,9 +53,6 @@ struct qp_returncode_pkt { struct qp_runcmd_pkt { char *qp_cmdline; - int qp_stdin_port; - int qp_stdout_port; - int qp_stderr_port; }; /* General packet for acknowledging a command worked */ @@ -104,6 +102,11 @@ struct qp_data_pkt { void *qp_blob; }; +struct qp_data_allow_pkt { + int qp_remfd; /* fd we're allowing data on */ + int qp_count; /* How much data the receiver is allowed to send */ +}; + struct qa_packet { enum qa_packet_type qp_type; int qp_seq; /* Sequence number for this packet */ @@ -119,6 +122,7 @@ struct qa_packet { struct qp_sendfile_pkt sendfile; struct qp_rstat_pkt rstat; struct qp_data_pkt data; + struct qp_data_allow_pkt dallow; } qp_u; }; @@ -133,6 +137,7 @@ struct qa_packet { #define qp_sendfile qp_u.sendfile #define qp_rstat qp_u.rstat #define qp_data qp_u.data +#define qp_dallow qp_u.dallow /* Prototypes */ char *qp_packet_type(enum qa_packet_type t); @@ -140,7 +145,7 @@ struct qa_packet *parse_packet(char *buf, int buflen); struct qa_packet *make_qp_hello(char *greeting); struct qa_packet *make_qp_returncode(int rc, int eno, char *strerr); struct qa_packet *make_qp_ack(enum qa_packet_type t, int i); -struct qa_packet *make_qp_runcmd(char *cmdline, int p_in, int p_out, int p_err); +struct qa_packet *make_qp_runcmd(char *cmdline); struct qa_packet *make_qp_cmdexit(pid_t pid, int status); struct qa_packet *make_qp_setuser(char *user, char *group); struct qa_packet *make_qp_kill(int sig); @@ -148,6 +153,7 @@ struct qa_packet *make_qp_recvfile(const char *path, off_t count, mode_t mode); struct qa_packet *make_qp_sendfile(const char *path, int remfd); struct qa_packet *make_qp_rstat(const char *path, const struct stat *sb); struct qa_packet *make_qp_data(const int remfd, const off_t offset, const int count, void *blob); +struct qa_packet *make_qp_data_allow(const int remfd, const int count); int qptostr(struct qa_packet *qp, char *qpstr, int maxsize); void qpfree(struct qa_packet *qp); void dump_qp(struct qa_packet *qp); @@ -41,6 +41,7 @@ #include "sockutil.h" #include "qarsh_packet.h" +#define QARSHD_BUFSIZE 4096 /* * QA Remote Shell Daemon */ @@ -49,9 +50,12 @@ int debug = 0; int dopause = 0; /* Globals */ -struct sockaddr_storage peername; +const int qinfd = 0; /* qarshd in file descriptor */ +const int qoutfd = 1; /* qarshd out file descriptor */ int child_exitted = 0; +pid_t child_pid; sigset_t orig_sigmask; +int childfds[3] = { -1, -1, -1 }; /* pipes to child for stdin/stdout/stderr */ /* A mini cache for rstat so we can check it in pushfile */ char *saved_path = NULL; @@ -98,10 +102,18 @@ sig_handler(int sig) } pid_t -run_cmd(const char *cmd, int p_in, int p_out, int p_err) +run_cmd(const char *cmd) { pid_t pid; - int new_in, new_out, new_err; + int pipefds[2], parentfds[3]; + + pipe(pipefds); + childfds[0] = pipefds[1]; parentfds[0] = pipefds[0]; /* stdin */ + pipe(pipefds); + childfds[1] = pipefds[0]; parentfds[1] = pipefds[1]; /* stdout */ + pipe(pipefds); + childfds[2] = pipefds[0]; parentfds[2] = pipefds[1]; /* stderr */ + syslog(LOG_INFO, "Running cmdline: %s\n", cmd); if ((pid = fork()) < 0) { @@ -114,21 +126,22 @@ run_cmd(const char *cmd, int p_in, int p_out, int p_err) setpgrp(); sigprocmask(SIG_SETMASK, &orig_sigmask, NULL); - /* Connect stdin, stdout, and stderr to qarsh */ - new_in = connect_to_peer(&peername, p_in); - if (new_in == -1) syslog(LOG_WARNING, "connect to new_in failed"); - dup2(new_in, fileno(stdin)); - new_out = connect_to_peer(&peername, p_out); - if (new_out == -1) syslog(LOG_WARNING, "connect to new_out failed"); - dup2(new_out, fileno(stdout)); - new_err = connect_to_peer(&peername, p_err); - if (new_err == -1) syslog(LOG_WARNING, "connect to new_err failed"); - dup2(new_err, fileno(stderr)); + /* Connect stdin, stdout, and stderr to parent's pipes */ + dup2(parentfds[0], fileno(stdin)); + dup2(parentfds[1], fileno(stdout)); + dup2(parentfds[2], fileno(stderr)); + /* close end of pipes we're not using */ + close(childfds[0]); + close(childfds[1]); + close(childfds[2]); execlp("sh", "sh", "-c", cmd, NULL); fprintf(stderr, "exec of %s failed: %d, %s\n", cmd, errno, strerror(errno)); exit(127); } + close(parentfds[0]); + close(parentfds[1]); + close(parentfds[2]); return pid; } @@ -225,7 +238,7 @@ pushfile(const char *path, int remfd) } else { qp = make_qp_data(remfd, offset, nbytes, buf); } - send_packet(fileno(stdout), qp); + send_packet(qoutfd, qp); offset += nbytes; qpfree(qp); } while (nbytes > 0); @@ -259,19 +272,22 @@ rstat(const char *path) return rp; } +/* Handle only qarsh related packets */ void -handle_packets(int infd) +handle_qarsh() { - fd_set rfds; - int nfd; + fd_set rfds, wfds; + int nfd, maxfd; struct timespec timeout; - struct qa_packet *qp = NULL, *rp = NULL; sigset_t sigmask; struct sigaction sa; - - pid_t child_pid = 0; int child_status; - off_t nbytes; + struct qa_packet *qp = NULL, *rp = NULL; + int allowed_out = 0, allowed_err = 0; /* number of bytes we can send to client */ + char buf[4096], buf_in[4096]; + int eof_in = 0; + int z_in = 0; + int nbytes; sigemptyset(&sigmask); sigaddset(&sigmask, SIGCHLD); @@ -280,28 +296,171 @@ handle_packets(int infd) sa.sa_mask = sigmask; sa.sa_flags = SA_RESTART; sigaction(SIGCHLD, &sa, NULL); - if (dopause) { - signal(SIGALRM, sig_handler); - pause(); - signal(SIGALRM, SIG_DFL); - } + qp = make_qp_data_allow(0, 4096); + send_packet(qoutfd, qp); + qpfree(qp); + for (;;) { - FD_SET(infd, &rfds); - timeout.tv_sec = 3; - timeout.tv_nsec = 0; - nfd = pselect(infd+1, &rfds, NULL, NULL, &timeout, &orig_sigmask); + FD_ZERO(&rfds); FD_ZERO(&wfds); + FD_SET(qinfd, &rfds); + maxfd = qinfd; + if (childfds[0] > -1 && z_in) { + FD_SET(childfds[0], &wfds); + maxfd = childfds[0] > maxfd ? childfds[0] : maxfd; + } + if (childfds[1] > -1 && allowed_out) { + FD_SET(childfds[1], &rfds); + maxfd = childfds[1] > maxfd ? childfds[1] : maxfd; + } + if (childfds[2] > -1 && allowed_err) { + FD_SET(childfds[2], &rfds); + maxfd = childfds[2] > maxfd ? childfds[2] : maxfd; + } + nfd = pselect(maxfd+1, &rfds, &wfds, NULL, &timeout, &orig_sigmask); if (child_exitted) { waitpid(child_pid, &child_status, 0); child_exitted--; child_pid = 0; rp = make_qp_cmdexit(child_pid, child_status); - send_packet(fileno(stdout), rp); + send_packet(qoutfd, rp); qpfree(rp); } + if (nfd < 0) { + if (errno == EINTR) { + /* signals handled above here */ + continue; + } else { + syslog(LOG_ERR, "select errno %d, %s\n", + errno, strerror(errno)); + } + } else if (nfd > 0) { + if (nfd && FD_ISSET(qinfd, &rfds)) { + qp = recv_packet(qinfd); + if (qp == NULL) { + if (debug) syslog(LOG_DEBUG, "That's enough\n"); + break; + } + if (debug) dump_qp(qp); + switch(qp->qp_type) { + case QP_KILL: + if (child_pid) { + syslog(LOG_INFO, "Sending child %d signal %d", + child_pid, qp->qp_kill.qp_sig); + kill(child_pid * -1, qp->qp_kill.qp_sig); + } + break; + case QP_RUNCMD: + syslog(LOG_ERR, "Only one command allowed per connection"); + break; + case QP_DATA: + if (qp->qp_data.qp_remfd != 0) { + syslog(LOG_ERR, "Received data for fd %d\n", qp->qp_data.qp_remfd); + break; + } + if (qp->qp_data.qp_count > (QARSHD_BUFSIZE - z_in)) { + syslog(LOG_ERR, "Received too much data for fd %d, %d > %d\n", + qp->qp_data.qp_remfd, qp->qp_data.qp_count, QARSHD_BUFSIZE - z_in); + break; + } + if (qp->qp_data.qp_count == 0) eof_in = 1; + memcpy(buf_in+z_in, qp->qp_data.qp_blob, qp->qp_data.qp_count); + z_in += qp->qp_data.qp_count; + break; + case QP_DALLOW: + if (qp->qp_dallow.qp_remfd == 1) { + allowed_out += qp->qp_dallow.qp_count; + } else if (qp->qp_dallow.qp_remfd == 2) { + allowed_err += qp->qp_dallow.qp_count; + } else { + syslog(LOG_ERR, "Received data allow for fd %d\n", qp->qp_dallow.qp_remfd); + } + break; + default: + syslog(LOG_ERR, "Expected message type %s", qp_packet_type(qp->qp_type)); + break; + } + nfd--; + } + if (nfd && FD_ISSET(childfds[0], &wfds)) { + /* Child is ready for data on stdin */ + nbytes = write(childfds[0], buf_in, z_in); + if (nbytes == z_in) { + z_in = 0; + if (eof_in) { + close(childfds[0]); + childfds[0] = -1; + } + } else { + memmove(buf_in, buf_in+nbytes, z_in - nbytes); + z_in -= nbytes; + } + if (!eof_in) { + qp = make_qp_data_allow(0, nbytes); + send_packet(qoutfd, qp); + qpfree(qp); + } + nfd--; + } + if (nfd && FD_ISSET(childfds[1], &rfds)) { + /* Child has something to send to stdout */ + nbytes = read(childfds[1], buf, allowed_out); + qp = make_qp_data(1, 0, nbytes, buf); + send_packet(qoutfd, qp); + qpfree(qp); + allowed_out -= nbytes; + if (nbytes == 0) { + close(childfds[1]); + childfds[1] = -1; + allowed_out = 0; + } + nfd--; + } + if (nfd && FD_ISSET(childfds[2], &rfds)) { + /* Child has something to send to stderr */ + nbytes = read(childfds[2], buf, allowed_err); + qp = make_qp_data(2, 0, nbytes, buf); + send_packet(qoutfd, qp); + qpfree(qp); + allowed_err -= nbytes; + if (nbytes == 0) { + close(childfds[2]); + childfds[2] = -1; + allowed_err = 0; + } + nfd--; + } + + } else { + if (debug) syslog(LOG_DEBUG, "Nothing to do in handle_qarsh\n"); + } + } +} +void +handle_packets() +{ + fd_set rfds; + int nfd; + struct timeval timeout; + struct qa_packet *qp = NULL, *rp = NULL; + + off_t nbytes; + + if (dopause) { + signal(SIGALRM, sig_handler); + pause(); + signal(SIGALRM, SIG_DFL); + } + + for (;;) { + FD_SET(qinfd, &rfds); + timeout.tv_sec = 3; + timeout.tv_usec = 0; + + nfd = select(qinfd+1, &rfds, NULL, NULL, &timeout); if (nfd < 0) { if (errno == EINTR) { /* signals handled above here */ @@ -311,21 +470,13 @@ handle_packets(int infd) errno, strerror(errno)); } } else if (nfd > 0) { - qp = recv_packet(infd); + qp = recv_packet(qinfd); if (qp == NULL) { if (debug) syslog(LOG_DEBUG, "That's enough\n"); break; } if (debug) dump_qp(qp); switch (qp->qp_type) { - case QP_KILL: - if (child_pid) { - - syslog(LOG_INFO, "Sending child %d signal %d", - child_pid, qp->qp_kill.qp_sig); - kill(child_pid * -1, qp->qp_kill.qp_sig); - } - break; case QP_SETUSER: if (setup_user(qp->qp_setuser.qp_user, qp->qp_setuser.qp_group) == 0) { @@ -333,25 +484,24 @@ handle_packets(int infd) } else { rp = make_qp_ack(QP_SETUSER, 1); } - send_packet(fileno(stdout), rp); + send_packet(qoutfd, rp); qpfree(rp); break; case QP_RUNCMD: - child_pid = run_cmd(qp->qp_runcmd.qp_cmdline, - qp->qp_runcmd.qp_stdin_port, - qp->qp_runcmd.qp_stdout_port, - qp->qp_runcmd.qp_stderr_port); + child_pid = run_cmd(qp->qp_runcmd.qp_cmdline); + qpfree(qp); + handle_qarsh(); /* continue processing qarsh separately */ break; case QP_RECVFILE: /* Setup file descriptors to handle incoming data */ rp = prepare_recvfile(qp); - send_packet(fileno(stdout), rp); + send_packet(qoutfd, rp); qpfree(rp); break; case QP_DATA: assert(qp->qp_data.qp_remfd == remotefds[0].fd); rp = receive_data(&remotefds[0], qp); if (rp) { - send_packet(fileno(stdout), rp); + send_packet(qoutfd, rp); qpfree(rp); } break; @@ -366,15 +516,16 @@ handle_packets(int infd) } else { rp = make_qp_returncode(0, 0, "Transfer Complete"); } - send_packet(fileno(stdout), rp); + send_packet(qoutfd, rp); qpfree(rp); break; case QP_RSTAT: syslog(LOG_INFO, "Got a QP_RSTAT with path = %s\n", qp->qp_rstat.qp_path); rp = rstat(qp->qp_rstat.qp_path); - send_packet(fileno(stdout), rp); + send_packet(qoutfd, rp); qpfree(rp); + qpfree(qp); break; default: syslog(LOG_WARNING, @@ -383,7 +534,7 @@ handle_packets(int infd) } qpfree(qp); } else { - if (debug) syslog(LOG_DEBUG, "Nothing to do\n"); + if (debug) syslog(LOG_DEBUG, "Nothing to do in handle_packets\n"); } } @@ -394,6 +545,7 @@ main(int argc, char *argv[]) { int ch; socklen_t peerlen; + struct sockaddr_storage peername; char peer_hoststr[NI_MAXHOST]; char peer_portstr[NI_MAXSERV]; @@ -424,7 +576,7 @@ main(int argc, char *argv[]) peername.ss_family == AF_INET ? "IPv4" : "IPv6"); /* Start reading packets from stdin */ - handle_packets(0); + handle_packets(); return 0; } |