From 45afaadb739b75f4ef2ce84650ff05f5a21cbf6c Mon Sep 17 00:00:00 2001 From: Nate Straz Date: Thu, 3 Jul 2008 14:48:52 +0000 Subject: Add write file descriptors to main select() call. When running rsync on an existing directory structure, rsync may be too busy to read everything that qarsh is writing to it from the remote rsync daemon. Create a buffer for each of stdin, stdout, and stderr and keep it around until we are able to write it, holding off further reads until it can be written. We still don't handle partial writes. --- qarsh.c | 56 +++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/qarsh.c b/qarsh.c index 404a7fa..e1380c3 100644 --- a/qarsh.c +++ b/qarsh.c @@ -174,13 +174,13 @@ int run_remote_cmd(char *cmdline) { struct qa_packet *qp; - char *buf; - int bufsize; int rc; int p_in, p_out, p_err; /* Port numbers */ int l_in, l_out, l_err; /* listening sockets */ int c_in, c_out, c_err; /* client sockets */ - fd_set readfds, testfds; + char *b_in, *b_out, *b_err; /* Buffers */ + int z_in, z_out, z_err; /* size in buffer */ + fd_set readfds, testfds, testwds; int nset; struct sockaddr_in caddr; socklen_t clen; @@ -265,8 +265,10 @@ run_remote_cmd(char *cmdline) "fcntl stdin O_NONBLOCK failed, %d: %s\n", errno, strerror(errno)); } - buf = malloc(QARSH_BUFSIZE); - memset(buf, 0, QARSH_BUFSIZE); + /* allocate one buffer for each stream */ + b_in = malloc(QARSH_BUFSIZE); memset(b_in, 0, QARSH_BUFSIZE); z_in = 0; + b_out = malloc(QARSH_BUFSIZE); memset(b_out, 0, QARSH_BUFSIZE); z_out = 0; + b_err = malloc(QARSH_BUFSIZE); memset(b_err, 0, QARSH_BUFSIZE); z_err = 0; hbeat(qarsh_hb); cmd_finished = 0; @@ -279,9 +281,14 @@ run_remote_cmd(char *cmdline) timeout.tv_nsec = 0; } testfds = readfds; - memset(buf, 0, QARSH_BUFSIZE); - - nset = pselect(FD_SETSIZE, &testfds, NULL, NULL, &timeout, + FD_ZERO(&testwds); + /* If there are things to write, set the write bit and clear the read */ + /* We use one buffer per channel and we don't want to overwrite it */ + if (z_in) { FD_CLR(fileno(stdin), &testfds); FD_SET(c_in, &testwds); } + if (z_out) { FD_CLR(c_out, &testfds); FD_SET(fileno(stdout), &testwds); } + if (z_err) { FD_CLR(c_err, &testfds); FD_SET(fileno(stderr), &testwds); } + + nset = pselect(FD_SETSIZE, &testfds, &testwds, NULL, &timeout, &pselect_sigmask); if (nset == 0) { @@ -313,10 +320,8 @@ run_remote_cmd(char *cmdline) hbeat_setstate(qarsh_hb, HOST_ALIVE); if (nset && FD_ISSET(fileno(stdin), &testfds)) { - bufsize = read(fileno(stdin), buf, QARSH_BUFSIZE); - if (bufsize > 0) { - write(c_in, buf, bufsize); - } else if (bufsize == 0) { + z_in = read(fileno(stdin), b_in, QARSH_BUFSIZE); + if (z_in == 0) { FD_CLR(fileno(stdin), &readfds); close(fileno(stdin)); close(c_in); @@ -324,28 +329,37 @@ run_remote_cmd(char *cmdline) } nset--; } + if (nset && c_in && FD_ISSET(c_in, &testwds)) { + write(c_in, b_in, z_in); + z_in = 0; + memset(b_in, 0, QARSH_BUFSIZE); + } if (nset && c_out && FD_ISSET(c_out, &testfds)) { - bufsize = read(c_out, buf, QARSH_BUFSIZE); - if (bufsize > 0) { - write(fileno(stdout), buf, bufsize); - } else if (bufsize == 0) { + z_out = read(c_out, b_out, QARSH_BUFSIZE); + if (z_out == 0) { FD_CLR(c_out, &readfds); close(c_out); c_out = 0; } nset--; } + if (nset && FD_ISSET(fileno(stdout), &testwds)) { + write(fileno(stdout), b_out, z_out); + z_out = 0; + } if (nset && c_err && FD_ISSET(c_err, &testfds)) { - bufsize = read(c_err, buf, QARSH_BUFSIZE); - if (bufsize > 0) { - write(fileno(stderr), buf, bufsize); - } else if (bufsize == 0) { + z_err = read(c_err, b_err, QARSH_BUFSIZE); + if (z_err == 0) { FD_CLR(c_err, &readfds); close(c_err); c_err = 0; } nset--; } + if (nset && FD_ISSET(fileno(stderr), &testwds)) { + write(fileno(stderr), b_err, z_err); + z_err = 0; + } if (nset && FD_ISSET(qarsh_fd, &testfds)) { qp = recv_packet(qarsh_fd); if (qp == NULL) { @@ -364,7 +378,7 @@ run_remote_cmd(char *cmdline) } if (c_out) close(c_out); if (c_err) close(c_err); - free(buf); + free(b_in); free(b_out); free(b_err); if (hbeat_getstate(qarsh_hb) == HOST_TIMEOUT) { fprintf(stderr, "Didn't receive heartbeat for %d seconds\n", hbeat_getmaxtimeout(qarsh_hb)); -- cgit