summaryrefslogtreecommitdiffstats
path: root/qarsh.c
diff options
context:
space:
mode:
authorNathan Straz <nstraz@redhat.com>2013-08-28 14:16:30 -0400
committerNathan Straz <nstraz@redhat.com>2013-09-11 17:49:33 -0400
commitceaf5360d969f2507018f51876f64aaae767e367 (patch)
tree07d3db73a4186ce2dcad660393e9265b0ff254ca /qarsh.c
parent604b053a1e710f22226fcb86c34e737df1058f92 (diff)
downloadqarsh-ceaf5360d969f2507018f51876f64aaae767e367.tar.gz
qarsh-ceaf5360d969f2507018f51876f64aaae767e367.tar.xz
qarsh-ceaf5360d969f2507018f51876f64aaae767e367.zip
Get commands running over one socket
Added a new packet to limit data sent from the other side.
Diffstat (limited to 'qarsh.c')
-rw-r--r--qarsh.c224
1 files changed, 94 insertions, 130 deletions
diff --git a/qarsh.c b/qarsh.c
index 62dd60d..a3fa325 100644
--- a/qarsh.c
+++ b/qarsh.c
@@ -183,87 +183,21 @@ run_remote_cmd(char *cmdline)
{
struct qa_packet *qp;
int rc;
- int p_in, p_out, p_err; /* Port numbers */
- int l_in, l_out, l_err; /* listening sockets */
- int c_in, c_out, c_err; /* client sockets */
- char *b_in, *b_out, *b_err; /* Buffers */
- int z_in, z_out, z_err; /* size in buffer */
- fd_set readfds, testfds, testwds;
+ int allowed_in = 0; /* bytes we can send to qarshd */
+ char b_out[QARSH_BUFSIZE], b_err[QARSH_BUFSIZE]; /* Buffers */
+ char buf[QARSH_BUFSIZE];
+ int z_out = 0, z_err = 0; /* size in buffer */
+ int eof_out = 0, eof_err = 0;
+ fd_set rfds, wfds;
int nset;
- struct sockaddr_in caddr;
- socklen_t clen;
+ int nbytes;
struct timespec timeout;
short cmd_finished;
- l_in = bind_any(QARSH_MINPORT, qarsh_ss_family);
- p_in = getsockport(l_in);
- l_out = bind_any(QARSH_MINPORT, qarsh_ss_family);
- p_out = getsockport(l_out);
- l_err = bind_any(QARSH_MINPORT, qarsh_ss_family);
- p_err = getsockport(l_err);
-
- qp = make_qp_runcmd(cmdline, p_in, p_out, p_err);
+ qp = make_qp_runcmd(cmdline);
send_packet(qarsh_fd, qp);
qpfree(qp);
- /* Get the stdin, stdout, and stderr connections up before we do work */
- FD_ZERO(&readfds);
- FD_SET(l_in, &readfds);
- FD_SET(l_out, &readfds);
- FD_SET(l_err, &readfds);
- c_in = c_out = c_err = 0;
-
- do {
- testfds = readfds;
- nset = select(FD_SETSIZE, &testfds, NULL, NULL, NULL);
-
- if (FD_ISSET(l_in, &testfds)) {
- clen = sizeof caddr;
- c_in = accept(l_in, (struct sockaddr *)&caddr, &clen);
- if (c_in == -1) {
- fprintf(stderr,
- "accept of l_in failed, %d: %s\n",
- errno, strerror(errno));
- continue;
- }
- }
- if (FD_ISSET(l_out, &testfds)) {
- clen = sizeof caddr;
- c_out = accept(l_out, (struct sockaddr *)&caddr, &clen);
- if (c_out == -1) {
- fprintf(stderr,
- "accept of l_out failed, %d: %s\n",
- errno, strerror(errno));
- continue;
- }
- }
- if (FD_ISSET(l_err, &testfds)) {
- clen = sizeof caddr;
- c_err = accept(l_err, (struct sockaddr *)&caddr, &clen);
- if (c_err == -1) {
- fprintf(stderr,
- "accept of l_err failed, %d: %s\n",
- errno, strerror(errno));
- continue;
- }
- }
- } while (c_in == 0 || c_out == 0 || c_err == 0);
- close(l_in);
- close(l_out);
- close(l_err);
- l_in = l_out = l_err = -1;
-
- /* Now we can start doing some real work */
-
- FD_ZERO(&readfds);
- FD_SET(qarsh_fd, &readfds);
- FD_SET(c_out, &readfds);
- FD_SET(c_err, &readfds);
- /* Ignore stdin if it's a tty, we don't want to deal with
- * tty i/o if we're a background process */
- if (!isatty(fileno(stdin))) {
- FD_SET(fileno(stdin), &readfds);
- }
/* Setup signal handling stuff so we can propogate signals */
setup_signals();
@@ -272,10 +206,14 @@ run_remote_cmd(char *cmdline)
"fcntl stdin O_NONBLOCK failed, %d: %s\n",
errno, strerror(errno));
}
- /* allocate one buffer for each stream */
- b_in = malloc(QARSH_BUFSIZE); memset(b_in, 0, QARSH_BUFSIZE); z_in = 0;
- b_out = malloc(QARSH_BUFSIZE); memset(b_out, 0, QARSH_BUFSIZE); z_out = 0;
- b_err = malloc(QARSH_BUFSIZE); memset(b_err, 0, QARSH_BUFSIZE); z_err = 0;
+
+ /* Tell qarshd how much data it can send on stdout and stderr */
+ qp = make_qp_data_allow(1, QARSH_BUFSIZE);
+ send_packet(qarsh_fd, qp);
+ qpfree(qp);
+ qp = make_qp_data_allow(2, QARSH_BUFSIZE);
+ send_packet(qarsh_fd, qp);
+ qpfree(qp);
hbeat(qarsh_hb);
cmd_finished = 0;
@@ -287,17 +225,19 @@ run_remote_cmd(char *cmdline)
timeout.tv_sec = 5;
timeout.tv_nsec = 0;
}
- testfds = readfds;
- FD_ZERO(&testwds);
- /* If there are things to write, set the write bit and clear the read */
- /* We use one buffer per channel and we don't want to overwrite it */
- if (z_in) { FD_CLR(fileno(stdin), &testfds); FD_SET(c_in, &testwds); }
- if (z_out) { FD_CLR(c_out, &testfds); FD_SET(fileno(stdout), &testwds); }
- if (z_err) { FD_CLR(c_err, &testfds); FD_SET(fileno(stderr), &testwds); }
-
- nset = pselect(FD_SETSIZE, &testfds, &testwds, NULL, &timeout,
+ FD_ZERO(&rfds);
+ FD_ZERO(&wfds);
+ FD_SET(qarsh_fd, &rfds);
+ /* Ignore stdin if it's a tty, we don't want to deal with
+ * tty i/o if we're a background process */
+ if (!isatty(fileno(stdin)) && allowed_in) { FD_SET(fileno(stdin), &rfds); }
+ if (z_out) { FD_SET(fileno(stdout), &wfds); }
+ if (z_err) { FD_SET(fileno(stderr), &wfds); }
+
+ nset = pselect(FD_SETSIZE, &rfds, &wfds, NULL, &timeout,
&pselect_sigmask);
+ /* Timeout hit, send a heartbeat */
if (nset == 0) {
if (!hbeat(qarsh_hb)) {
/* If the heartbeat fails, we should exit now.
@@ -322,76 +262,98 @@ run_remote_cmd(char *cmdline)
*/
hbeat_setstate(qarsh_hb, HOST_ALIVE);
- if (nset && FD_ISSET(fileno(stdin), &testfds)) {
- z_in = read(fileno(stdin), b_in, QARSH_BUFSIZE);
- if (z_in == 0) {
- FD_CLR(fileno(stdin), &readfds);
+ if (nset && FD_ISSET(fileno(stdin), &rfds)) {
+ nbytes = read(fileno(stdin), buf, allowed_in);
+ qp = make_qp_data(0, 0, nbytes, buf);
+ send_packet(qarsh_fd, qp);
+ qpfree(qp);
+ allowed_in -= nbytes;
+ if (nbytes == 0) {
close(fileno(stdin));
- close(c_in);
- c_in = 0;
+ allowed_in = 0;
}
nset--;
}
- if (nset && c_in && FD_ISSET(c_in, &testwds)) {
- write(c_in, b_in, z_in);
- z_in = 0;
- memset(b_in, 0, QARSH_BUFSIZE);
- }
- if (nset && c_out && FD_ISSET(c_out, &testfds)) {
- z_out = read(c_out, b_out, QARSH_BUFSIZE);
- if (z_out == 0) {
- FD_CLR(c_out, &readfds);
- close(c_out);
- c_out = 0;
+ if (nset && FD_ISSET(fileno(stdout), &wfds)) {
+ nbytes = write(fileno(stdout), b_out, z_out);
+ if (nbytes == z_out) {
+ z_out = 0;
+ if (eof_out) close(fileno(stdout));
+ } else {
+ memmove(b_out, b_out+nbytes, z_out - nbytes);
+ z_out -= nbytes;
+ }
+ if (!eof_out) {
+ qp = make_qp_data_allow(1, nbytes);
+ send_packet(qarsh_fd, qp);
+ qpfree(qp);
}
nset--;
}
- if (nset && FD_ISSET(fileno(stdout), &testwds)) {
- write(fileno(stdout), b_out, z_out);
- z_out = 0;
- }
- if (nset && c_err && FD_ISSET(c_err, &testfds)) {
- z_err = read(c_err, b_err, QARSH_BUFSIZE);
- if (z_err == 0) {
- FD_CLR(c_err, &readfds);
- close(c_err);
- c_err = 0;
+ if (nset && FD_ISSET(fileno(stderr), &wfds)) {
+ nbytes = write(fileno(stderr), b_err, z_err);
+ if (nbytes == z_err) {
+ z_err = 0;
+ if (eof_err) close(fileno(stderr));
+ } else {
+ memmove(b_err, b_err+nbytes, z_err - nbytes);
+ z_err -= nbytes;
+ }
+ if (!eof_err) {
+ qp = make_qp_data_allow(2, nbytes);
+ send_packet(qarsh_fd, qp);
+ qpfree(qp);
}
nset--;
}
- if (nset && FD_ISSET(fileno(stderr), &testwds)) {
- write(fileno(stderr), b_err, z_err);
- z_err = 0;
- }
- if (nset && FD_ISSET(qarsh_fd, &testfds)) {
+ if (nset && FD_ISSET(qarsh_fd, &rfds)) {
qp = recv_packet(qarsh_fd);
if (qp == NULL) {
fprintf(stderr, "recv_packet() returned NULL!\n:");
break;
}
- /* dump_qp(qp); */
- if (qp && qp->qp_type == QP_CMDEXIT) {
+ if (qp->qp_type == QP_CMDEXIT) {
cmd_finished = 1;
rc = qp->qp_cmdexit.qp_status;
- qpfree(qp);
/* Don't break yet, we need to make
* sure all output is read. */
+ } else if (qp->qp_type == QP_DATA) {
+ if (qp->qp_data.qp_remfd == 1 && qp->qp_data.qp_count <= (QARSH_BUFSIZE - z_out)) {
+ if (qp->qp_data.qp_count == 0) eof_out = 1;
+ memcpy(b_out+z_out, qp->qp_data.qp_blob, qp->qp_data.qp_count);
+ z_out += qp->qp_data.qp_count;
+ } else if (qp->qp_data.qp_remfd == 2 && qp->qp_data.qp_count <= (QARSH_BUFSIZE - z_err)) {
+ if (qp->qp_data.qp_count == 0) eof_err = 1;
+ memcpy(b_err+z_err, qp->qp_data.qp_blob, qp->qp_data.qp_count);
+ z_err += qp->qp_data.qp_count;
+ } else {
+ fprintf(stderr, "ERROR: Bad data packet: fd %d, cnt: %d\n, bufleft: %d, %d",
+ qp->qp_data.qp_remfd, qp->qp_data.qp_count,
+ QARSH_BUFSIZE - z_out, QARSH_BUFSIZE - z_err);
+ }
+ } else if (qp->qp_type == QP_DALLOW) {
+ if (qp->qp_dallow.qp_remfd == 0) {
+ allowed_in += qp->qp_dallow.qp_count;
+ } else {
+ fprintf(stderr, "ERROR: Received data allow for fd %d\n",
+ qp->qp_dallow.qp_remfd);
+ }
}
+ qpfree(qp);
nset--;
}
}
- if (cmd_finished && c_out == 0 && c_err == 0) {
- /* If the command is complete and both output
- * sockets are closed, we can exit now. We need
- * to test all conditions at once so if none are
- * true, we'll still check for heartbeat. */
+ if (cmd_finished
+ && eof_out && z_out == 0
+ && eof_err && z_err == 0) {
+ /* If the command is complete, we've seen EOF
+ * on outputs and both output buffers are empty we can
+ * exit now. We need to test all conditions at once so
+ * if none are true, we'll still check for heartbeat. */
break;
}
}
- if (c_out) close(c_out);
- if (c_err) close(c_err);
- free(b_in); free(b_out); free(b_err);
if (hbeat_getstate(qarsh_hb) == HOST_TIMEOUT) {
fprintf(stderr, "Didn't receive heartbeat for %d seconds\n",
hbeat_getmaxtimeout(qarsh_hb));
@@ -539,7 +501,9 @@ again:
ret = run_remote_cmd(args);
close(qarsh_fd);
+ hbeat_free(qarsh_hb);
free(args);
+ free(remuser);
/* If the remote cmd was killed, we need to be killed too */
if (WIFSIGNALED(ret)) {
reset_signals();