diff options
author | Nathan Straz <nstraz@redhat.com> | 2013-08-28 14:16:30 -0400 |
---|---|---|
committer | Nathan Straz <nstraz@redhat.com> | 2013-09-11 17:49:33 -0400 |
commit | ceaf5360d969f2507018f51876f64aaae767e367 (patch) | |
tree | 07d3db73a4186ce2dcad660393e9265b0ff254ca /qarsh.c | |
parent | 604b053a1e710f22226fcb86c34e737df1058f92 (diff) | |
download | qarsh-ceaf5360d969f2507018f51876f64aaae767e367.tar.gz qarsh-ceaf5360d969f2507018f51876f64aaae767e367.tar.xz qarsh-ceaf5360d969f2507018f51876f64aaae767e367.zip |
Get commands running over one socket
Added a new packet to limit data sent from the other side.
Diffstat (limited to 'qarsh.c')
-rw-r--r-- | qarsh.c | 224 |
1 files changed, 94 insertions, 130 deletions
@@ -183,87 +183,21 @@ run_remote_cmd(char *cmdline) { struct qa_packet *qp; int rc; - int p_in, p_out, p_err; /* Port numbers */ - int l_in, l_out, l_err; /* listening sockets */ - int c_in, c_out, c_err; /* client sockets */ - char *b_in, *b_out, *b_err; /* Buffers */ - int z_in, z_out, z_err; /* size in buffer */ - fd_set readfds, testfds, testwds; + int allowed_in = 0; /* bytes we can send to qarshd */ + char b_out[QARSH_BUFSIZE], b_err[QARSH_BUFSIZE]; /* Buffers */ + char buf[QARSH_BUFSIZE]; + int z_out = 0, z_err = 0; /* size in buffer */ + int eof_out = 0, eof_err = 0; + fd_set rfds, wfds; int nset; - struct sockaddr_in caddr; - socklen_t clen; + int nbytes; struct timespec timeout; short cmd_finished; - l_in = bind_any(QARSH_MINPORT, qarsh_ss_family); - p_in = getsockport(l_in); - l_out = bind_any(QARSH_MINPORT, qarsh_ss_family); - p_out = getsockport(l_out); - l_err = bind_any(QARSH_MINPORT, qarsh_ss_family); - p_err = getsockport(l_err); - - qp = make_qp_runcmd(cmdline, p_in, p_out, p_err); + qp = make_qp_runcmd(cmdline); send_packet(qarsh_fd, qp); qpfree(qp); - /* Get the stdin, stdout, and stderr connections up before we do work */ - FD_ZERO(&readfds); - FD_SET(l_in, &readfds); - FD_SET(l_out, &readfds); - FD_SET(l_err, &readfds); - c_in = c_out = c_err = 0; - - do { - testfds = readfds; - nset = select(FD_SETSIZE, &testfds, NULL, NULL, NULL); - - if (FD_ISSET(l_in, &testfds)) { - clen = sizeof caddr; - c_in = accept(l_in, (struct sockaddr *)&caddr, &clen); - if (c_in == -1) { - fprintf(stderr, - "accept of l_in failed, %d: %s\n", - errno, strerror(errno)); - continue; - } - } - if (FD_ISSET(l_out, &testfds)) { - clen = sizeof caddr; - c_out = accept(l_out, (struct sockaddr *)&caddr, &clen); - if (c_out == -1) { - fprintf(stderr, - "accept of l_out failed, %d: %s\n", - errno, strerror(errno)); - continue; - } - } - if (FD_ISSET(l_err, &testfds)) { - clen = sizeof caddr; - c_err = accept(l_err, (struct sockaddr *)&caddr, &clen); - if (c_err == -1) { - fprintf(stderr, - "accept of l_err failed, %d: %s\n", - errno, strerror(errno)); - continue; - } - } - } while (c_in == 0 || c_out == 0 || c_err == 0); - close(l_in); - close(l_out); - close(l_err); - l_in = l_out = l_err = -1; - - /* Now we can start doing some real work */ - - FD_ZERO(&readfds); - FD_SET(qarsh_fd, &readfds); - FD_SET(c_out, &readfds); - FD_SET(c_err, &readfds); - /* Ignore stdin if it's a tty, we don't want to deal with - * tty i/o if we're a background process */ - if (!isatty(fileno(stdin))) { - FD_SET(fileno(stdin), &readfds); - } /* Setup signal handling stuff so we can propogate signals */ setup_signals(); @@ -272,10 +206,14 @@ run_remote_cmd(char *cmdline) "fcntl stdin O_NONBLOCK failed, %d: %s\n", errno, strerror(errno)); } - /* allocate one buffer for each stream */ - b_in = malloc(QARSH_BUFSIZE); memset(b_in, 0, QARSH_BUFSIZE); z_in = 0; - b_out = malloc(QARSH_BUFSIZE); memset(b_out, 0, QARSH_BUFSIZE); z_out = 0; - b_err = malloc(QARSH_BUFSIZE); memset(b_err, 0, QARSH_BUFSIZE); z_err = 0; + + /* Tell qarshd how much data it can send on stdout and stderr */ + qp = make_qp_data_allow(1, QARSH_BUFSIZE); + send_packet(qarsh_fd, qp); + qpfree(qp); + qp = make_qp_data_allow(2, QARSH_BUFSIZE); + send_packet(qarsh_fd, qp); + qpfree(qp); hbeat(qarsh_hb); cmd_finished = 0; @@ -287,17 +225,19 @@ run_remote_cmd(char *cmdline) timeout.tv_sec = 5; timeout.tv_nsec = 0; } - testfds = readfds; - FD_ZERO(&testwds); - /* If there are things to write, set the write bit and clear the read */ - /* We use one buffer per channel and we don't want to overwrite it */ - if (z_in) { FD_CLR(fileno(stdin), &testfds); FD_SET(c_in, &testwds); } - if (z_out) { FD_CLR(c_out, &testfds); FD_SET(fileno(stdout), &testwds); } - if (z_err) { FD_CLR(c_err, &testfds); FD_SET(fileno(stderr), &testwds); } - - nset = pselect(FD_SETSIZE, &testfds, &testwds, NULL, &timeout, + FD_ZERO(&rfds); + FD_ZERO(&wfds); + FD_SET(qarsh_fd, &rfds); + /* Ignore stdin if it's a tty, we don't want to deal with + * tty i/o if we're a background process */ + if (!isatty(fileno(stdin)) && allowed_in) { FD_SET(fileno(stdin), &rfds); } + if (z_out) { FD_SET(fileno(stdout), &wfds); } + if (z_err) { FD_SET(fileno(stderr), &wfds); } + + nset = pselect(FD_SETSIZE, &rfds, &wfds, NULL, &timeout, &pselect_sigmask); + /* Timeout hit, send a heartbeat */ if (nset == 0) { if (!hbeat(qarsh_hb)) { /* If the heartbeat fails, we should exit now. @@ -322,76 +262,98 @@ run_remote_cmd(char *cmdline) */ hbeat_setstate(qarsh_hb, HOST_ALIVE); - if (nset && FD_ISSET(fileno(stdin), &testfds)) { - z_in = read(fileno(stdin), b_in, QARSH_BUFSIZE); - if (z_in == 0) { - FD_CLR(fileno(stdin), &readfds); + if (nset && FD_ISSET(fileno(stdin), &rfds)) { + nbytes = read(fileno(stdin), buf, allowed_in); + qp = make_qp_data(0, 0, nbytes, buf); + send_packet(qarsh_fd, qp); + qpfree(qp); + allowed_in -= nbytes; + if (nbytes == 0) { close(fileno(stdin)); - close(c_in); - c_in = 0; + allowed_in = 0; } nset--; } - if (nset && c_in && FD_ISSET(c_in, &testwds)) { - write(c_in, b_in, z_in); - z_in = 0; - memset(b_in, 0, QARSH_BUFSIZE); - } - if (nset && c_out && FD_ISSET(c_out, &testfds)) { - z_out = read(c_out, b_out, QARSH_BUFSIZE); - if (z_out == 0) { - FD_CLR(c_out, &readfds); - close(c_out); - c_out = 0; + if (nset && FD_ISSET(fileno(stdout), &wfds)) { + nbytes = write(fileno(stdout), b_out, z_out); + if (nbytes == z_out) { + z_out = 0; + if (eof_out) close(fileno(stdout)); + } else { + memmove(b_out, b_out+nbytes, z_out - nbytes); + z_out -= nbytes; + } + if (!eof_out) { + qp = make_qp_data_allow(1, nbytes); + send_packet(qarsh_fd, qp); + qpfree(qp); } nset--; } - if (nset && FD_ISSET(fileno(stdout), &testwds)) { - write(fileno(stdout), b_out, z_out); - z_out = 0; - } - if (nset && c_err && FD_ISSET(c_err, &testfds)) { - z_err = read(c_err, b_err, QARSH_BUFSIZE); - if (z_err == 0) { - FD_CLR(c_err, &readfds); - close(c_err); - c_err = 0; + if (nset && FD_ISSET(fileno(stderr), &wfds)) { + nbytes = write(fileno(stderr), b_err, z_err); + if (nbytes == z_err) { + z_err = 0; + if (eof_err) close(fileno(stderr)); + } else { + memmove(b_err, b_err+nbytes, z_err - nbytes); + z_err -= nbytes; + } + if (!eof_err) { + qp = make_qp_data_allow(2, nbytes); + send_packet(qarsh_fd, qp); + qpfree(qp); } nset--; } - if (nset && FD_ISSET(fileno(stderr), &testwds)) { - write(fileno(stderr), b_err, z_err); - z_err = 0; - } - if (nset && FD_ISSET(qarsh_fd, &testfds)) { + if (nset && FD_ISSET(qarsh_fd, &rfds)) { qp = recv_packet(qarsh_fd); if (qp == NULL) { fprintf(stderr, "recv_packet() returned NULL!\n:"); break; } - /* dump_qp(qp); */ - if (qp && qp->qp_type == QP_CMDEXIT) { + if (qp->qp_type == QP_CMDEXIT) { cmd_finished = 1; rc = qp->qp_cmdexit.qp_status; - qpfree(qp); /* Don't break yet, we need to make * sure all output is read. */ + } else if (qp->qp_type == QP_DATA) { + if (qp->qp_data.qp_remfd == 1 && qp->qp_data.qp_count <= (QARSH_BUFSIZE - z_out)) { + if (qp->qp_data.qp_count == 0) eof_out = 1; + memcpy(b_out+z_out, qp->qp_data.qp_blob, qp->qp_data.qp_count); + z_out += qp->qp_data.qp_count; + } else if (qp->qp_data.qp_remfd == 2 && qp->qp_data.qp_count <= (QARSH_BUFSIZE - z_err)) { + if (qp->qp_data.qp_count == 0) eof_err = 1; + memcpy(b_err+z_err, qp->qp_data.qp_blob, qp->qp_data.qp_count); + z_err += qp->qp_data.qp_count; + } else { + fprintf(stderr, "ERROR: Bad data packet: fd %d, cnt: %d\n, bufleft: %d, %d", + qp->qp_data.qp_remfd, qp->qp_data.qp_count, + QARSH_BUFSIZE - z_out, QARSH_BUFSIZE - z_err); + } + } else if (qp->qp_type == QP_DALLOW) { + if (qp->qp_dallow.qp_remfd == 0) { + allowed_in += qp->qp_dallow.qp_count; + } else { + fprintf(stderr, "ERROR: Received data allow for fd %d\n", + qp->qp_dallow.qp_remfd); + } } + qpfree(qp); nset--; } } - if (cmd_finished && c_out == 0 && c_err == 0) { - /* If the command is complete and both output - * sockets are closed, we can exit now. We need - * to test all conditions at once so if none are - * true, we'll still check for heartbeat. */ + if (cmd_finished + && eof_out && z_out == 0 + && eof_err && z_err == 0) { + /* If the command is complete, we've seen EOF + * on outputs and both output buffers are empty we can + * exit now. We need to test all conditions at once so + * if none are true, we'll still check for heartbeat. */ break; } } - if (c_out) close(c_out); - if (c_err) close(c_err); - free(b_in); free(b_out); free(b_err); if (hbeat_getstate(qarsh_hb) == HOST_TIMEOUT) { fprintf(stderr, "Didn't receive heartbeat for %d seconds\n", hbeat_getmaxtimeout(qarsh_hb)); @@ -539,7 +501,9 @@ again: ret = run_remote_cmd(args); close(qarsh_fd); + hbeat_free(qarsh_hb); free(args); + free(remuser); /* If the remote cmd was killed, we need to be killed too */ if (WIFSIGNALED(ret)) { reset_signals(); |