summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNathan Straz <nstraz@redhat.com>2013-08-28 14:16:30 -0400
committerNathan Straz <nstraz@redhat.com>2013-09-11 17:49:33 -0400
commitceaf5360d969f2507018f51876f64aaae767e367 (patch)
tree07d3db73a4186ce2dcad660393e9265b0ff254ca
parent604b053a1e710f22226fcb86c34e737df1058f92 (diff)
downloadqarsh-ceaf5360d969f2507018f51876f64aaae767e367.tar.gz
qarsh-ceaf5360d969f2507018f51876f64aaae767e367.tar.xz
qarsh-ceaf5360d969f2507018f51876f64aaae767e367.zip
Get commands running over one socket
Added a new packet to limit data sent from the other side.
-rw-r--r--qarsh.c224
-rw-r--r--qarsh_packet.c57
-rw-r--r--qarsh_packet.h16
-rw-r--r--qarshd.c254
4 files changed, 355 insertions, 196 deletions
diff --git a/qarsh.c b/qarsh.c
index 62dd60d..a3fa325 100644
--- a/qarsh.c
+++ b/qarsh.c
@@ -183,87 +183,21 @@ run_remote_cmd(char *cmdline)
{
struct qa_packet *qp;
int rc;
- int p_in, p_out, p_err; /* Port numbers */
- int l_in, l_out, l_err; /* listening sockets */
- int c_in, c_out, c_err; /* client sockets */
- char *b_in, *b_out, *b_err; /* Buffers */
- int z_in, z_out, z_err; /* size in buffer */
- fd_set readfds, testfds, testwds;
+ int allowed_in = 0; /* bytes we can send to qarshd */
+ char b_out[QARSH_BUFSIZE], b_err[QARSH_BUFSIZE]; /* Buffers */
+ char buf[QARSH_BUFSIZE];
+ int z_out = 0, z_err = 0; /* size in buffer */
+ int eof_out = 0, eof_err = 0;
+ fd_set rfds, wfds;
int nset;
- struct sockaddr_in caddr;
- socklen_t clen;
+ int nbytes;
struct timespec timeout;
short cmd_finished;
- l_in = bind_any(QARSH_MINPORT, qarsh_ss_family);
- p_in = getsockport(l_in);
- l_out = bind_any(QARSH_MINPORT, qarsh_ss_family);
- p_out = getsockport(l_out);
- l_err = bind_any(QARSH_MINPORT, qarsh_ss_family);
- p_err = getsockport(l_err);
-
- qp = make_qp_runcmd(cmdline, p_in, p_out, p_err);
+ qp = make_qp_runcmd(cmdline);
send_packet(qarsh_fd, qp);
qpfree(qp);
- /* Get the stdin, stdout, and stderr connections up before we do work */
- FD_ZERO(&readfds);
- FD_SET(l_in, &readfds);
- FD_SET(l_out, &readfds);
- FD_SET(l_err, &readfds);
- c_in = c_out = c_err = 0;
-
- do {
- testfds = readfds;
- nset = select(FD_SETSIZE, &testfds, NULL, NULL, NULL);
-
- if (FD_ISSET(l_in, &testfds)) {
- clen = sizeof caddr;
- c_in = accept(l_in, (struct sockaddr *)&caddr, &clen);
- if (c_in == -1) {
- fprintf(stderr,
- "accept of l_in failed, %d: %s\n",
- errno, strerror(errno));
- continue;
- }
- }
- if (FD_ISSET(l_out, &testfds)) {
- clen = sizeof caddr;
- c_out = accept(l_out, (struct sockaddr *)&caddr, &clen);
- if (c_out == -1) {
- fprintf(stderr,
- "accept of l_out failed, %d: %s\n",
- errno, strerror(errno));
- continue;
- }
- }
- if (FD_ISSET(l_err, &testfds)) {
- clen = sizeof caddr;
- c_err = accept(l_err, (struct sockaddr *)&caddr, &clen);
- if (c_err == -1) {
- fprintf(stderr,
- "accept of l_err failed, %d: %s\n",
- errno, strerror(errno));
- continue;
- }
- }
- } while (c_in == 0 || c_out == 0 || c_err == 0);
- close(l_in);
- close(l_out);
- close(l_err);
- l_in = l_out = l_err = -1;
-
- /* Now we can start doing some real work */
-
- FD_ZERO(&readfds);
- FD_SET(qarsh_fd, &readfds);
- FD_SET(c_out, &readfds);
- FD_SET(c_err, &readfds);
- /* Ignore stdin if it's a tty, we don't want to deal with
- * tty i/o if we're a background process */
- if (!isatty(fileno(stdin))) {
- FD_SET(fileno(stdin), &readfds);
- }
/* Setup signal handling stuff so we can propogate signals */
setup_signals();
@@ -272,10 +206,14 @@ run_remote_cmd(char *cmdline)
"fcntl stdin O_NONBLOCK failed, %d: %s\n",
errno, strerror(errno));
}
- /* allocate one buffer for each stream */
- b_in = malloc(QARSH_BUFSIZE); memset(b_in, 0, QARSH_BUFSIZE); z_in = 0;
- b_out = malloc(QARSH_BUFSIZE); memset(b_out, 0, QARSH_BUFSIZE); z_out = 0;
- b_err = malloc(QARSH_BUFSIZE); memset(b_err, 0, QARSH_BUFSIZE); z_err = 0;
+
+ /* Tell qarshd how much data it can send on stdout and stderr */
+ qp = make_qp_data_allow(1, QARSH_BUFSIZE);
+ send_packet(qarsh_fd, qp);
+ qpfree(qp);
+ qp = make_qp_data_allow(2, QARSH_BUFSIZE);
+ send_packet(qarsh_fd, qp);
+ qpfree(qp);
hbeat(qarsh_hb);
cmd_finished = 0;
@@ -287,17 +225,19 @@ run_remote_cmd(char *cmdline)
timeout.tv_sec = 5;
timeout.tv_nsec = 0;
}
- testfds = readfds;
- FD_ZERO(&testwds);
- /* If there are things to write, set the write bit and clear the read */
- /* We use one buffer per channel and we don't want to overwrite it */
- if (z_in) { FD_CLR(fileno(stdin), &testfds); FD_SET(c_in, &testwds); }
- if (z_out) { FD_CLR(c_out, &testfds); FD_SET(fileno(stdout), &testwds); }
- if (z_err) { FD_CLR(c_err, &testfds); FD_SET(fileno(stderr), &testwds); }
-
- nset = pselect(FD_SETSIZE, &testfds, &testwds, NULL, &timeout,
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
+ FD_SET(qarsh_fd, &rfds);
+ /* Ignore stdin if it's a tty, we don't want to deal with
+ * tty i/o if we're a background process */
+ if (!isatty(fileno(stdin)) && allowed_in) { FD_SET(fileno(stdin), &rfds); }
+ if (z_out) { FD_SET(fileno(stdout), &wfds); }
+ if (z_err) { FD_SET(fileno(stderr), &wfds); }
+
+ nset = pselect(FD_SETSIZE, &rfds, &wfds, NULL, &timeout,
&pselect_sigmask);
+ /* Timeout hit, send a heartbeat */
if (nset == 0) {
if (!hbeat(qarsh_hb)) {
/* If the heartbeat fails, we should exit now.
@@ -322,76 +262,98 @@ run_remote_cmd(char *cmdline)
*/
hbeat_setstate(qarsh_hb, HOST_ALIVE);
- if (nset && FD_ISSET(fileno(stdin), &testfds)) {
- z_in = read(fileno(stdin), b_in, QARSH_BUFSIZE);
- if (z_in == 0) {
- FD_CLR(fileno(stdin), &readfds);
+ if (nset && FD_ISSET(fileno(stdin), &rfds)) {
+ nbytes = read(fileno(stdin), buf, allowed_in);
+ qp = make_qp_data(0, 0, nbytes, buf);
+ send_packet(qarsh_fd, qp);
+ qpfree(qp);
+ allowed_in -= nbytes;
+ if (nbytes == 0) {
close(fileno(stdin));
- close(c_in);
- c_in = 0;
+ allowed_in = 0;
}
nset--;
}
- if (nset && c_in && FD_ISSET(c_in, &testwds)) {
- write(c_in, b_in, z_in);
- z_in = 0;
- memset(b_in, 0, QARSH_BUFSIZE);
- }
- if (nset && c_out && FD_ISSET(c_out, &testfds)) {
- z_out = read(c_out, b_out, QARSH_BUFSIZE);
- if (z_out == 0) {
- FD_CLR(c_out, &readfds);
- close(c_out);
- c_out = 0;
+ if (nset && FD_ISSET(fileno(stdout), &wfds)) {
+ nbytes = write(fileno(stdout), b_out, z_out);
+ if (nbytes == z_out) {
+ z_out = 0;
+ if (eof_out) close(fileno(stdout));
+ } else {
+ memmove(b_out, b_out+nbytes, z_out - nbytes);
+ z_out -= nbytes;
+ }
+ if (!eof_out) {
+ qp = make_qp_data_allow(1, nbytes);
+ send_packet(qarsh_fd, qp);
+ qpfree(qp);
}
nset--;
}
- if (nset && FD_ISSET(fileno(stdout), &testwds)) {
- write(fileno(stdout), b_out, z_out);
- z_out = 0;
- }
- if (nset && c_err && FD_ISSET(c_err, &testfds)) {
- z_err = read(c_err, b_err, QARSH_BUFSIZE);
- if (z_err == 0) {
- FD_CLR(c_err, &readfds);
- close(c_err);
- c_err = 0;
+ if (nset && FD_ISSET(fileno(stderr), &wfds)) {
+ nbytes = write(fileno(stderr), b_err, z_err);
+ if (nbytes == z_err) {
+ z_err = 0;
+ if (eof_err) close(fileno(stderr));
+ } else {
+ memmove(b_err, b_err+nbytes, z_err - nbytes);
+ z_err -= nbytes;
+ }
+ if (!eof_err) {
+ qp = make_qp_data_allow(2, nbytes);
+ send_packet(qarsh_fd, qp);
+ qpfree(qp);
}
nset--;
}
- if (nset && FD_ISSET(fileno(stderr), &testwds)) {
- write(fileno(stderr), b_err, z_err);
- z_err = 0;
- }
- if (nset && FD_ISSET(qarsh_fd, &testfds)) {
+ if (nset && FD_ISSET(qarsh_fd, &rfds)) {
qp = recv_packet(qarsh_fd);
if (qp == NULL) {
fprintf(stderr, "recv_packet() returned NULL!\n:");
break;
}
- /* dump_qp(qp); */
- if (qp && qp->qp_type == QP_CMDEXIT) {
+ if (qp->qp_type == QP_CMDEXIT) {
cmd_finished = 1;
rc = qp->qp_cmdexit.qp_status;
- qpfree(qp);
/* Don't break yet, we need to make
* sure all output is read. */
+ } else if (qp->qp_type == QP_DATA) {
+ if (qp->qp_data.qp_remfd == 1 && qp->qp_data.qp_count <= (QARSH_BUFSIZE - z_out)) {
+ if (qp->qp_data.qp_count == 0) eof_out = 1;
+ memcpy(b_out+z_out, qp->qp_data.qp_blob, qp->qp_data.qp_count);
+ z_out += qp->qp_data.qp_count;
+ } else if (qp->qp_data.qp_remfd == 2 && qp->qp_data.qp_count <= (QARSH_BUFSIZE - z_err)) {
+ if (qp->qp_data.qp_count == 0) eof_err = 1;
+ memcpy(b_err+z_err, qp->qp_data.qp_blob, qp->qp_data.qp_count);
+ z_err += qp->qp_data.qp_count;
+ } else {
+ fprintf(stderr, "ERROR: Bad data packet: fd %d, cnt: %d\n, bufleft: %d, %d",
+ qp->qp_data.qp_remfd, qp->qp_data.qp_count,
+ QARSH_BUFSIZE - z_out, QARSH_BUFSIZE - z_err);
+ }
+ } else if (qp->qp_type == QP_DALLOW) {
+ if (qp->qp_dallow.qp_remfd == 0) {
+ allowed_in += qp->qp_dallow.qp_count;
+ } else {
+ fprintf(stderr, "ERROR: Received data allow for fd %d\n",
+ qp->qp_dallow.qp_remfd);
+ }
}
+ qpfree(qp);
nset--;
}
}
- if (cmd_finished && c_out == 0 && c_err == 0) {
- /* If the command is complete and both output
- * sockets are closed, we can exit now. We need
- * to test all conditions at once so if none are
- * true, we'll still check for heartbeat. */
+ if (cmd_finished
+ && eof_out && z_out == 0
+ && eof_err && z_err == 0) {
+ /* If the command is complete, we've seen EOF
+ * on outputs and both output buffers are empty we can
+ * exit now. We need to test all conditions at once so
+ * if none are true, we'll still check for heartbeat. */
break;
}
}
- if (c_out) close(c_out);
- if (c_err) close(c_err);
- free(b_in); free(b_out); free(b_err);
if (hbeat_getstate(qarsh_hb) == HOST_TIMEOUT) {
fprintf(stderr, "Didn't receive heartbeat for %d seconds\n",
hbeat_getmaxtimeout(qarsh_hb));
@@ -539,7 +501,9 @@ again:
ret = run_remote_cmd(args);
close(qarsh_fd);
+ hbeat_free(qarsh_hb);
free(args);
+ free(remuser);
/* If the remote cmd was killed, we need to be killed too */
if (WIFSIGNALED(ret)) {
reset_signals();
diff --git a/qarsh_packet.c b/qarsh_packet.c
index 269675f..945db6b 100644
--- a/qarsh_packet.c
+++ b/qarsh_packet.c
@@ -49,6 +49,7 @@ void parse_qp_recvfile(char *buf, int *buflen, struct qa_packet *qp);
void parse_qp_sendfile(char *buf, int *buflen, struct qa_packet *qp);
void parse_qp_rstat(char *buf, int *buflen, struct qa_packet *qp);
void parse_qp_data(char *buf, int *buflen, struct qa_packet *qp);
+void parse_qp_data_allow(char *buf, int *buflen, struct qa_packet *qp);
char *store_qp_hello(char *buf, struct qa_packet *qp);
char *store_qp_returncode(char *buf, struct qa_packet *qp);
@@ -61,6 +62,7 @@ char *store_qp_recvfile(char *buf, struct qa_packet *qp);
char *store_qp_sendfile(char *buf, struct qa_packet *qp);
char *store_qp_rstat(char *buf, struct qa_packet *qp);
char *store_qp_data(char *buf, struct qa_packet *qp);
+char *store_qp_data_allow(char *buf, struct qa_packet *qp);
void free_qp_hello(struct qa_packet *qp);
void free_qp_returncode(struct qa_packet *qp);
@@ -81,6 +83,7 @@ void dump_qp_recvfile(struct qa_packet *qp);
void dump_qp_sendfile(struct qa_packet *qp);
void dump_qp_rstat(struct qa_packet *qp);
void dump_qp_data(struct qa_packet *qp);
+void dump_qp_data_allow(struct qa_packet *qp);
struct packet_internals {
@@ -162,6 +165,12 @@ struct packet_internals {
.pi_store = store_qp_data,
.pi_free = free_qp_data,
.pi_dump = dump_qp_data
+ }, {
+ .pi_name = "dallow",
+ .pi_parse = parse_qp_data_allow,
+ .pi_store = store_qp_data_allow,
+ .pi_free = NULL,
+ .pi_dump = dump_qp_data_allow
}
};
@@ -268,9 +277,6 @@ void
parse_qp_runcmd(char *buf, int *buflen, struct qa_packet *qp)
{
buf = fetch_string(buf, buflen, &(qp->qp_runcmd.qp_cmdline));
- buf = fetch_int(buf, buflen, &(qp->qp_runcmd.qp_stdin_port));
- buf = fetch_int(buf, buflen, &(qp->qp_runcmd.qp_stdout_port));
- buf = fetch_int(buf, buflen, &(qp->qp_runcmd.qp_stderr_port));
}
void
@@ -344,6 +350,13 @@ parse_qp_data(char *buf, int *buflen, struct qa_packet *qp)
}
}
+void
+parse_qp_data_allow(char *buf, int *buflen, struct qa_packet *qp)
+{
+ buf = fetch_int(buf, buflen, (int *)&(qp->qp_dallow.qp_remfd));
+ buf = fetch_int(buf, buflen, (int *)&(qp->qp_dallow.qp_count));
+}
+
struct qa_packet *
parse_packet(char *buf, int buflen)
{
@@ -424,9 +437,6 @@ char *
store_qp_runcmd(char *buf, struct qa_packet *qp)
{
buf = store_string(buf, qp->qp_runcmd.qp_cmdline);
- buf = store_int(buf, qp->qp_runcmd.qp_stdin_port);
- buf = store_int(buf, qp->qp_runcmd.qp_stdout_port);
- buf = store_int(buf, qp->qp_runcmd.qp_stderr_port);
return buf;
}
@@ -499,6 +509,14 @@ store_qp_data(char *buf, struct qa_packet *qp)
return buf;
}
+char *
+store_qp_data_allow(char *buf, struct qa_packet *qp)
+{
+ buf = store_int(buf, qp->qp_dallow.qp_remfd);
+ buf = store_int(buf, qp->qp_dallow.qp_count);
+ return buf;
+}
+
int
qptostr(struct qa_packet *qp, char *qpstr, int maxsize)
{
@@ -568,7 +586,7 @@ make_qp_ack(enum qa_packet_type t, int i)
}
struct qa_packet *
-make_qp_runcmd(char *cmdline, int p_in, int p_out, int p_err)
+make_qp_runcmd(char *cmdline)
{
struct qa_packet *qp;
qp = malloc(sizeof *qp);
@@ -577,9 +595,6 @@ make_qp_runcmd(char *cmdline, int p_in, int p_out, int p_err)
qp->qp_type = QP_RUNCMD;
qp->qp_runcmd.qp_cmdline = strdup(cmdline);
- qp->qp_runcmd.qp_stdin_port = p_in;
- qp->qp_runcmd.qp_stdout_port = p_out;
- qp->qp_runcmd.qp_stderr_port = p_err;
return qp;
}
@@ -698,6 +713,21 @@ make_qp_data(const int remfd, const off_t offset, const int count, void *blob)
return qp;
}
+struct qa_packet *
+make_qp_data_allow(const int remfd, const int count)
+{
+ struct qa_packet *qp;
+ qp = malloc(sizeof *qp);
+ assert(qp);
+ memset(qp, 0, sizeof *qp);
+
+ qp->qp_type = QP_DALLOW;
+ qp->qp_dallow.qp_remfd = remfd;
+ qp->qp_dallow.qp_count = count;
+
+ return qp;
+}
+
/*
* Packet deallocation functions
*/
@@ -843,6 +873,13 @@ dump_qp_data(struct qa_packet *qp)
}
void
+dump_qp_data_allow(struct qa_packet *qp)
+{
+ fprintf(stderr, "remfd: %d count: %d",
+ qp->qp_dallow.qp_remfd, qp->qp_dallow.qp_count);
+}
+
+void
dump_qp(struct qa_packet *qp)
{
fprintf(stderr, "#%d %s ", qp->qp_seq, QP_NAME(qp->qp_type));
diff --git a/qarsh_packet.h b/qarsh_packet.h
index 908a6da..8a74791 100644
--- a/qarsh_packet.h
+++ b/qarsh_packet.h
@@ -37,7 +37,8 @@ enum qa_packet_type {
QP_RECVFILE = 8,
QP_SENDFILE = 9,
QP_RSTAT = 10,
- QP_DATA = 11
+ QP_DATA = 11,
+ QP_DALLOW = 12
};
struct qp_hello_pkt {
@@ -52,9 +53,6 @@ struct qp_returncode_pkt {
struct qp_runcmd_pkt {
char *qp_cmdline;
- int qp_stdin_port;
- int qp_stdout_port;
- int qp_stderr_port;
};
/* General packet for acknowledging a command worked */
@@ -104,6 +102,11 @@ struct qp_data_pkt {
void *qp_blob;
};
+struct qp_data_allow_pkt {
+ int qp_remfd; /* fd we're allowing data on */
+ int qp_count; /* How much data the receiver is allowed to send */
+};
+
struct qa_packet {
enum qa_packet_type qp_type;
int qp_seq; /* Sequence number for this packet */
@@ -119,6 +122,7 @@ struct qa_packet {
struct qp_sendfile_pkt sendfile;
struct qp_rstat_pkt rstat;
struct qp_data_pkt data;
+ struct qp_data_allow_pkt dallow;
} qp_u;
};
@@ -133,6 +137,7 @@ struct qa_packet {
#define qp_sendfile qp_u.sendfile
#define qp_rstat qp_u.rstat
#define qp_data qp_u.data
+#define qp_dallow qp_u.dallow
/* Prototypes */
char *qp_packet_type(enum qa_packet_type t);
@@ -140,7 +145,7 @@ struct qa_packet *parse_packet(char *buf, int buflen);
struct qa_packet *make_qp_hello(char *greeting);
struct qa_packet *make_qp_returncode(int rc, int eno, char *strerr);
struct qa_packet *make_qp_ack(enum qa_packet_type t, int i);
-struct qa_packet *make_qp_runcmd(char *cmdline, int p_in, int p_out, int p_err);
+struct qa_packet *make_qp_runcmd(char *cmdline);
struct qa_packet *make_qp_cmdexit(pid_t pid, int status);
struct qa_packet *make_qp_setuser(char *user, char *group);
struct qa_packet *make_qp_kill(int sig);
@@ -148,6 +153,7 @@ struct qa_packet *make_qp_recvfile(const char *path, off_t count, mode_t mode);
struct qa_packet *make_qp_sendfile(const char *path, int remfd);
struct qa_packet *make_qp_rstat(const char *path, const struct stat *sb);
struct qa_packet *make_qp_data(const int remfd, const off_t offset, const int count, void *blob);
+struct qa_packet *make_qp_data_allow(const int remfd, const int count);
int qptostr(struct qa_packet *qp, char *qpstr, int maxsize);
void qpfree(struct qa_packet *qp);
void dump_qp(struct qa_packet *qp);
diff --git a/qarshd.c b/qarshd.c
index 91e2501..6cb52e0 100644
--- a/qarshd.c
+++ b/qarshd.c
@@ -41,6 +41,7 @@
#include "sockutil.h"
#include "qarsh_packet.h"
+#define QARSHD_BUFSIZE 4096
/*
* QA Remote Shell Daemon
*/
@@ -49,9 +50,12 @@ int debug = 0;
int dopause = 0;
/* Globals */
-struct sockaddr_storage peername;
+const int qinfd = 0; /* qarshd in file descriptor */
+const int qoutfd = 1; /* qarshd out file descriptor */
int child_exitted = 0;
+pid_t child_pid;
sigset_t orig_sigmask;
+int childfds[3] = { -1, -1, -1 }; /* pipes to child for stdin/stdout/stderr */
/* A mini cache for rstat so we can check it in pushfile */
char *saved_path = NULL;
@@ -98,10 +102,18 @@ sig_handler(int sig)
}
pid_t
-run_cmd(const char *cmd, int p_in, int p_out, int p_err)
+run_cmd(const char *cmd)
{
pid_t pid;
- int new_in, new_out, new_err;
+ int pipefds[2], parentfds[3];
+
+ pipe(pipefds);
+ childfds[0] = pipefds[1]; parentfds[0] = pipefds[0]; /* stdin */
+ pipe(pipefds);
+ childfds[1] = pipefds[0]; parentfds[1] = pipefds[1]; /* stdout */
+ pipe(pipefds);
+ childfds[2] = pipefds[0]; parentfds[2] = pipefds[1]; /* stderr */
+
syslog(LOG_INFO, "Running cmdline: %s\n", cmd);
if ((pid = fork()) < 0) {
@@ -114,21 +126,22 @@ run_cmd(const char *cmd, int p_in, int p_out, int p_err)
setpgrp();
sigprocmask(SIG_SETMASK, &orig_sigmask, NULL);
- /* Connect stdin, stdout, and stderr to qarsh */
- new_in = connect_to_peer(&peername, p_in);
- if (new_in == -1) syslog(LOG_WARNING, "connect to new_in failed");
- dup2(new_in, fileno(stdin));
- new_out = connect_to_peer(&peername, p_out);
- if (new_out == -1) syslog(LOG_WARNING, "connect to new_out failed");
- dup2(new_out, fileno(stdout));
- new_err = connect_to_peer(&peername, p_err);
- if (new_err == -1) syslog(LOG_WARNING, "connect to new_err failed");
- dup2(new_err, fileno(stderr));
+ /* Connect stdin, stdout, and stderr to parent's pipes */
+ dup2(parentfds[0], fileno(stdin));
+ dup2(parentfds[1], fileno(stdout));
+ dup2(parentfds[2], fileno(stderr));
+ /* close end of pipes we're not using */
+ close(childfds[0]);
+ close(childfds[1]);
+ close(childfds[2]);
execlp("sh", "sh", "-c", cmd, NULL);
fprintf(stderr, "exec of %s failed: %d, %s\n", cmd, errno, strerror(errno));
exit(127);
}
+ close(parentfds[0]);
+ close(parentfds[1]);
+ close(parentfds[2]);
return pid;
}
@@ -225,7 +238,7 @@ pushfile(const char *path, int remfd)
} else {
qp = make_qp_data(remfd, offset, nbytes, buf);
}
- send_packet(fileno(stdout), qp);
+ send_packet(qoutfd, qp);
offset += nbytes;
qpfree(qp);
} while (nbytes > 0);
@@ -259,19 +272,22 @@ rstat(const char *path)
return rp;
}
+/* Handle only qarsh related packets */
void
-handle_packets(int infd)
+handle_qarsh()
{
- fd_set rfds;
- int nfd;
+ fd_set rfds, wfds;
+ int nfd, maxfd;
struct timespec timeout;
- struct qa_packet *qp = NULL, *rp = NULL;
sigset_t sigmask;
struct sigaction sa;
-
- pid_t child_pid = 0;
int child_status;
- off_t nbytes;
+ struct qa_packet *qp = NULL, *rp = NULL;
+ int allowed_out = 0, allowed_err = 0; /* number of bytes we can send to client */
+ char buf[4096], buf_in[4096];
+ int eof_in = 0;
+ int z_in = 0;
+ int nbytes;
sigemptyset(&sigmask);
sigaddset(&sigmask, SIGCHLD);
@@ -280,28 +296,171 @@ handle_packets(int infd)
sa.sa_mask = sigmask;
sa.sa_flags = SA_RESTART;
sigaction(SIGCHLD, &sa, NULL);
- if (dopause) {
- signal(SIGALRM, sig_handler);
- pause();
- signal(SIGALRM, SIG_DFL);
- }
+ qp = make_qp_data_allow(0, 4096);
+ send_packet(qoutfd, qp);
+ qpfree(qp);
+
for (;;) {
- FD_SET(infd, &rfds);
- timeout.tv_sec = 3;
- timeout.tv_nsec = 0;
- nfd = pselect(infd+1, &rfds, NULL, NULL, &timeout, &orig_sigmask);
+ FD_ZERO(&rfds); FD_ZERO(&wfds);
+ FD_SET(qinfd, &rfds);
+ maxfd = qinfd;
+ if (childfds[0] > -1 && z_in) {
+ FD_SET(childfds[0], &wfds);
+ maxfd = childfds[0] > maxfd ? childfds[0] : maxfd;
+ }
+ if (childfds[1] > -1 && allowed_out) {
+ FD_SET(childfds[1], &rfds);
+ maxfd = childfds[1] > maxfd ? childfds[1] : maxfd;
+ }
+ if (childfds[2] > -1 && allowed_err) {
+ FD_SET(childfds[2], &rfds);
+ maxfd = childfds[2] > maxfd ? childfds[2] : maxfd;
+ }
+ nfd = pselect(maxfd+1, &rfds, &wfds, NULL, &timeout, &orig_sigmask);
if (child_exitted) {
waitpid(child_pid, &child_status, 0);
child_exitted--;
child_pid = 0;
rp = make_qp_cmdexit(child_pid, child_status);
- send_packet(fileno(stdout), rp);
+ send_packet(qoutfd, rp);
qpfree(rp);
}
+ if (nfd < 0) {
+ if (errno == EINTR) {
+ /* signals handled above here */
+ continue;
+ } else {
+ syslog(LOG_ERR, "select errno %d, %s\n",
+ errno, strerror(errno));
+ }
+ } else if (nfd > 0) {
+ if (nfd && FD_ISSET(qinfd, &rfds)) {
+ qp = recv_packet(qinfd);
+ if (qp == NULL) {
+ if (debug) syslog(LOG_DEBUG, "That's enough\n");
+ break;
+ }
+ if (debug) dump_qp(qp);
+ switch(qp->qp_type) {
+ case QP_KILL:
+ if (child_pid) {
+ syslog(LOG_INFO, "Sending child %d signal %d",
+ child_pid, qp->qp_kill.qp_sig);
+ kill(child_pid * -1, qp->qp_kill.qp_sig);
+ }
+ break;
+ case QP_RUNCMD:
+ syslog(LOG_ERR, "Only one command allowed per connection");
+ break;
+ case QP_DATA:
+ if (qp->qp_data.qp_remfd != 0) {
+ syslog(LOG_ERR, "Received data for fd %d\n", qp->qp_data.qp_remfd);
+ break;
+ }
+ if (qp->qp_data.qp_count > (QARSHD_BUFSIZE - z_in)) {
+ syslog(LOG_ERR, "Received too much data for fd %d, %d > %d\n",
+ qp->qp_data.qp_remfd, qp->qp_data.qp_count, QARSHD_BUFSIZE - z_in);
+ break;
+ }
+ if (qp->qp_data.qp_count == 0) eof_in = 1;
+ memcpy(buf_in+z_in, qp->qp_data.qp_blob, qp->qp_data.qp_count);
+ z_in += qp->qp_data.qp_count;
+ break;
+ case QP_DALLOW:
+ if (qp->qp_dallow.qp_remfd == 1) {
+ allowed_out += qp->qp_dallow.qp_count;
+ } else if (qp->qp_dallow.qp_remfd == 2) {
+ allowed_err += qp->qp_dallow.qp_count;
+ } else {
+ syslog(LOG_ERR, "Received data allow for fd %d\n", qp->qp_dallow.qp_remfd);
+ }
+ break;
+ default:
+ syslog(LOG_ERR, "Expected message type %s", qp_packet_type(qp->qp_type));
+ break;
+ }
+ nfd--;
+ }
+ if (nfd && FD_ISSET(childfds[0], &wfds)) {
+ /* Child is ready for data on stdin */
+ nbytes = write(childfds[0], buf_in, z_in);
+ if (nbytes == z_in) {
+ z_in = 0;
+ if (eof_in) {
+ close(childfds[0]);
+ childfds[0] = -1;
+ }
+ } else {
+ memmove(buf_in, buf_in+nbytes, z_in - nbytes);
+ z_in -= nbytes;
+ }
+ if (!eof_in) {
+ qp = make_qp_data_allow(0, nbytes);
+ send_packet(qoutfd, qp);
+ qpfree(qp);
+ }
+ nfd--;
+ }
+ if (nfd && FD_ISSET(childfds[1], &rfds)) {
+ /* Child has something to send to stdout */
+ nbytes = read(childfds[1], buf, allowed_out);
+ qp = make_qp_data(1, 0, nbytes, buf);
+ send_packet(qoutfd, qp);
+ qpfree(qp);
+ allowed_out -= nbytes;
+ if (nbytes == 0) {
+ close(childfds[1]);
+ childfds[1] = -1;
+ allowed_out = 0;
+ }
+ nfd--;
+ }
+ if (nfd && FD_ISSET(childfds[2], &rfds)) {
+ /* Child has something to send to stderr */
+ nbytes = read(childfds[2], buf, allowed_err);
+ qp = make_qp_data(2, 0, nbytes, buf);
+ send_packet(qoutfd, qp);
+ qpfree(qp);
+ allowed_err -= nbytes;
+ if (nbytes == 0) {
+ close(childfds[2]);
+ childfds[2] = -1;
+ allowed_err = 0;
+ }
+ nfd--;
+ }
+
+ } else {
+ if (debug) syslog(LOG_DEBUG, "Nothing to do in handle_qarsh\n");
+ }
+ }
+}
+void
+handle_packets()
+{
+ fd_set rfds;
+ int nfd;
+ struct timeval timeout;
+ struct qa_packet *qp = NULL, *rp = NULL;
+
+ off_t nbytes;
+
+ if (dopause) {
+ signal(SIGALRM, sig_handler);
+ pause();
+ signal(SIGALRM, SIG_DFL);
+ }
+
+ for (;;) {
+ FD_SET(qinfd, &rfds);
+ timeout.tv_sec = 3;
+ timeout.tv_usec = 0;
+
+ nfd = select(qinfd+1, &rfds, NULL, NULL, &timeout);
if (nfd < 0) {
if (errno == EINTR) {
/* signals handled above here */
@@ -311,21 +470,13 @@ handle_packets(int infd)
errno, strerror(errno));
}
} else if (nfd > 0) {
- qp = recv_packet(infd);
+ qp = recv_packet(qinfd);
if (qp == NULL) {
if (debug) syslog(LOG_DEBUG, "That's enough\n");
break;
}
if (debug) dump_qp(qp);
switch (qp->qp_type) {
- case QP_KILL:
- if (child_pid) {
-
- syslog(LOG_INFO, "Sending child %d signal %d",
- child_pid, qp->qp_kill.qp_sig);
- kill(child_pid * -1, qp->qp_kill.qp_sig);
- }
- break;
case QP_SETUSER:
if (setup_user(qp->qp_setuser.qp_user,
qp->qp_setuser.qp_group) == 0) {
@@ -333,25 +484,24 @@ handle_packets(int infd)
} else {
rp = make_qp_ack(QP_SETUSER, 1);
}
- send_packet(fileno(stdout), rp);
+ send_packet(qoutfd, rp);
qpfree(rp);
break;
case QP_RUNCMD:
- child_pid = run_cmd(qp->qp_runcmd.qp_cmdline,
- qp->qp_runcmd.qp_stdin_port,
- qp->qp_runcmd.qp_stdout_port,
- qp->qp_runcmd.qp_stderr_port);
+ child_pid = run_cmd(qp->qp_runcmd.qp_cmdline);
+ qpfree(qp);
+ handle_qarsh(); /* continue processing qarsh separately */
break;
case QP_RECVFILE: /* Setup file descriptors to handle incoming data */
rp = prepare_recvfile(qp);
- send_packet(fileno(stdout), rp);
+ send_packet(qoutfd, rp);
qpfree(rp);
break;
case QP_DATA:
assert(qp->qp_data.qp_remfd == remotefds[0].fd);
rp = receive_data(&remotefds[0], qp);
if (rp) {
- send_packet(fileno(stdout), rp);
+ send_packet(qoutfd, rp);
qpfree(rp);
}
break;
@@ -366,15 +516,16 @@ handle_packets(int infd)
} else {
rp = make_qp_returncode(0, 0, "Transfer Complete");
}
- send_packet(fileno(stdout), rp);
+ send_packet(qoutfd, rp);
qpfree(rp);
break;
case QP_RSTAT:
syslog(LOG_INFO, "Got a QP_RSTAT with path = %s\n",
qp->qp_rstat.qp_path);
rp = rstat(qp->qp_rstat.qp_path);
- send_packet(fileno(stdout), rp);
+ send_packet(qoutfd, rp);
qpfree(rp);
+ qpfree(qp);
break;
default:
syslog(LOG_WARNING,
@@ -383,7 +534,7 @@ handle_packets(int infd)
}
qpfree(qp);
} else {
- if (debug) syslog(LOG_DEBUG, "Nothing to do\n");
+ if (debug) syslog(LOG_DEBUG, "Nothing to do in handle_packets\n");
}
}
@@ -394,6 +545,7 @@ main(int argc, char *argv[])
{
int ch;
socklen_t peerlen;
+ struct sockaddr_storage peername;
char peer_hoststr[NI_MAXHOST];
char peer_portstr[NI_MAXSERV];
@@ -424,7 +576,7 @@ main(int argc, char *argv[])
peername.ss_family == AF_INET ? "IPv4" : "IPv6");
/* Start reading packets from stdin */
- handle_packets(0);
+ handle_packets();
return 0;
}