summaryrefslogtreecommitdiffstats
path: root/src/dispatch.c
diff options
context:
space:
mode:
authorNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-05-27 20:06:26 -0400
committerNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-05-27 20:06:26 -0400
commitb64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5 (patch)
tree5f57fd0d5074d411aa54a073e973033ac7c35ef6 /src/dispatch.c
parentb595fe2039de58f655e226e4e427530884d11d3f (diff)
downloadslapi-nis-b64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5.tar.gz
slapi-nis-b64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5.tar.xz
slapi-nis-b64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5.zip
- rewrite dispatching code so that
* we multiplex servicing for connected clients in the same thread that handles datagram clients * nis query routines use a callback rather than a symbol, so that... * ...we can supply different callbacks for datagram and connected clients * nis query routines have the option of giving us a chunk of output and telling us to come back later for more, though for now only nis_all gets the option of doing that, and currently write those chunks immediately in a blocking way
Diffstat (limited to 'src/dispatch.c')
-rw-r--r--src/dispatch.c721
1 files changed, 463 insertions, 258 deletions
diff --git a/src/dispatch.c b/src/dispatch.c
index 34932f1..38197cc 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -26,17 +26,48 @@
#define MAX_CLIENT_IDLE (60 * 1000)
-struct dispatch_stream_client_params {
- int client;
- struct plugin_state *state;
-};
-
struct dispatch_client_data {
int client;
struct sockaddr client_addr;
socklen_t client_addrlen;
};
+/* Send a reply, unbuffered datagram version. */
+static bool_t
+dispatch_reply_fragment_dgram(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf,
+ bool_t first_fragment, bool_t last_fragment)
+{
+ xdr_replymsg(reply_xdrs, reply);
+ if (!first_fragment || !last_fragment) {
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "Trying to sending datagram reply (%d bytes), "
+ "even though the reply is not suitable for "
+ "transmission as a datagram.\n",
+ xdr_getpos(reply_xdrs));
+ } else {
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "Sending datagram reply (%d bytes).\n",
+ xdr_getpos(reply_xdrs));
+ }
+ sendto(cdata->client, reply_buf, xdr_getpos(reply_xdrs),
+ 0, &cdata->client_addr, cdata->client_addrlen);
+ return TRUE;
+}
+static bool_t
+dispatch_reply_dgram(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf)
+{
+ return dispatch_reply_fragment_dgram(state, cdata,
+ reply, reply_xdrs, reply_buf,
+ TRUE, TRUE);
+}
+
+/* Send a reply, buffered-for-connected-clients version. */
static ssize_t
dispatch_write_with_retry(struct plugin_state *state, int fd,
const void *buffer1, ssize_t length1,
@@ -87,297 +118,449 @@ dispatch_write_with_retry(struct plugin_state *state, int fd,
}
return sent;
}
-
-/* Send a reply which _could_ be sent over a datagram. */
-void
-dispatch_reply_fragment(struct plugin_state *state,
- struct dispatch_client_data *cdata,
- struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf,
- bool_t first_fragment, bool_t last_fragment)
+static bool_t
+dispatch_reply_fragment_connected(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf,
+ bool_t first_fragment, bool_t last_fragment)
{
uint32_t len;
- if (cdata->client_addrlen != 0) {
- /* It's a datagram socket, so there's nothing to do but send
- * the data by itself. */
+ /* Record reply - first fragment. */
+ if (first_fragment) {
xdr_replymsg(reply_xdrs, reply);
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "Sending datagram reply (%d bytes).\n",
+ }
+ /* Calculate the fragment length bytes. */
+ len = htonl(xdr_getpos(reply_xdrs) |
+ (last_fragment ? 0x80000000 : 0));
+ /* Send the data to the client. */
+ if (dispatch_write_with_retry(state, cdata->client,
+ &len, 4,
+ reply_buf,
+ xdr_getpos(reply_xdrs)) ==
+ 4 + xdr_getpos(reply_xdrs)) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "Stream reply (4+%d bytes) sent.\n",
xdr_getpos(reply_xdrs));
- sendto(cdata->client, reply_buf, xdr_getpos(reply_xdrs),
- 0, &cdata->client_addr, cdata->client_addrlen);
+ return TRUE;
} else {
- /* Record reply - one fragment. */
- if (first_fragment) {
- xdr_replymsg(reply_xdrs, reply);
- }
- /* Calculate the fragment length bytes. */
- len = htonl(xdr_getpos(reply_xdrs) |
- (last_fragment ? 0x80000000 : 0));
- /* Send the data to the client. */
- if (dispatch_write_with_retry(state, cdata->client,
- &len, 4,
- reply_buf,
- xdr_getpos(reply_xdrs)) ==
- 4 + xdr_getpos(reply_xdrs)) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "Stream reply (4+%d bytes) sent.\n",
- xdr_getpos(reply_xdrs));
- } else {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "Stream reply (4+%d bytes) failed!\n",
- xdr_getpos(reply_xdrs));
- }
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "Stream reply (4+%d bytes) failed!\n",
+ xdr_getpos(reply_xdrs));
+ return FALSE;
}
}
-
-/* Send an entire reply datagram or record at once. */
-void
-dispatch_reply(struct plugin_state *state, struct dispatch_client_data *cdata,
- struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf)
+/* Send an entire reply record at once. */
+bool_t
+dispatch_reply_connected(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf)
{
- dispatch_reply_fragment(state, cdata,
- reply, reply_xdrs, reply_buf, TRUE, TRUE);
+ return dispatch_reply_fragment_connected(state, cdata,
+ reply, reply_xdrs, reply_buf,
+ TRUE, TRUE);
}
-/* Handle requests from one stream client. */
-static void *
-dispatch_stream_handler_thread(void *arg)
+/* Handle a datagram client -- read the request and handle it immediately. */
+static void
+dispatch_dgram(struct plugin_state *state, int fd)
{
- int flags, frag_len, record_len;
- ssize_t i;
- int32_t len, nlen;
- char fragment[65540], record[65536];
- struct pollfd pollfd;
- struct dispatch_stream_client_params *params;
struct dispatch_client_data cdata;
- struct plugin_state *state;
- bool_t last;
+ char dgram[65536];
+ int reqsize;
- /* Recover the full set of parameters. */
- params = arg;
- state = params->state;
- cdata.client = params->client;
- memset(&cdata.client_addr, 0, sizeof(cdata.client_addr));
- cdata.client_addrlen = 0;
- free(arg);
+ /* Read the request. */
+ cdata.client = fd;
+ cdata.client_addrlen = sizeof(cdata.client_addr);
+ reqsize = recvfrom(fd, dgram, sizeof(dgram), 0,
+ &cdata.client_addr, &cdata.client_addrlen);
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "datagram request (%d bytes)\n", reqsize);
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "opened client connection %d, thread started\n",
- cdata.client);
+ /* Handle the request. */
+ nis_process_request(state, dgram, reqsize,
+ &dispatch_reply_fragment_dgram,
+ &dispatch_reply_dgram,
+ &cdata, NULL);
+}
- /* Set the connection to be non-blocking. */
- flags = fcntl(cdata.client, F_GETFD);
- if ((flags == -1) ||
- (fcntl(cdata.client, F_SETFD, flags | O_NONBLOCK) == -1)) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "error setting new client connection to be "
- "non-blocking\n");
- close(cdata.client);
+/* Handle all incoming data. */
+struct dispatch_client {
+ /* The client socket and address. */
+ int client_fd;
+ struct sockaddr client_addr;
+ socklen_t client_addrlen;
+ /* The client state. */
+ enum {
+ client_invalid,
+ client_closing,
+ client_reading,
+ client_replying_with_more,
+ client_replying_final,
+ } client_state;
+ /* The client's request while we're reading it. */
+ char client_inbuf[8192];
+ ssize_t client_inbuf_used;
+ char *client_query;
+ ssize_t client_query_size;
+ void *client_query_cookie;
+ /* The reply to the client, when we're sending one. */
+ char client_outbuf[4096];
+ ssize_t client_outbuf_used;
+ /* This is a linked list. */
+ struct dispatch_client *client_next;
+};
+
+/* Set the client's record up to start reading a new query. */
+static void
+client_set_reading(struct plugin_state *state, struct dispatch_client *client)
+{
+ client->client_inbuf_used = 0;
+ free(client->client_query);
+ client->client_query = NULL;
+ client->client_query_size = 0;
+ client->client_outbuf_used = 0;
+ client->client_state = client_reading;
+}
+
+/* Set the client's record up to be cleaned up. */
+static void
+client_set_closing(struct plugin_state *state, struct dispatch_client *client)
+{
+ client->client_inbuf_used = 0;
+ free(client->client_query);
+ client->client_query = NULL;
+ client->client_query_size = 0;
+ client->client_outbuf_used = 0;
+ client->client_state = client_closing;
+}
+
+/* Set the client's record up. */
+static struct dispatch_client *
+dispatch_accept_client(struct plugin_state *state, int fd)
+{
+ struct dispatch_client *client;
+ client = malloc(sizeof(*client));
+ if (client == NULL) {
+ return NULL;
+ }
+ fd = accept(fd, &client->client_addr, &client->client_addrlen);
+ if (fd == -1) {
+ free(client);
return NULL;
}
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "set new client connection to be non-blocking\n");
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "new connected client on %d\n", fd);
+ memset(client, 0, sizeof(*client));
+ client->client_fd = fd;
+ client_set_reading(state, client);
+ return client;
+}
- /* Assemble fragments and responses. */
- frag_len = 0;
- record_len = 0;
- for (;;) {
- /* Wait for incoming data. */
- pollfd.fd = cdata.client;
- pollfd.events = POLLIN | POLLHUP;
- switch (poll(&pollfd, 1, MAX_CLIENT_IDLE)) {
- case -1:
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "communication error\n");
- goto done;
- break;
- case 0:
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "client timeout\n");
- goto done;
- break;
- default:
- break;
+/* Decide what to do next. */
+static void
+client_interpret_nis_result(struct plugin_state *state,
+ struct dispatch_client *client)
+{
+ if (client->client_query_cookie != NULL) {
+ client->client_state = client_replying_with_more;
+ } else {
+ if (client->client_outbuf_used > 0) {
+ client->client_state = client_replying_final;
+ } else {
+ client_set_reading(state, client);
}
- /* Try to read as much data as we have space to hold. */
- i = read(cdata.client, fragment + frag_len,
- sizeof(fragment) - frag_len);
- switch (i) {
- case -1:
- if (errno == EAGAIN) {
- continue;
+ }
+}
+
+/* Handle reading state. */
+static void
+client_read(struct plugin_state *state, struct dispatch_client *client)
+{
+ ssize_t count;
+ int32_t len, nlen;
+ int last;
+ char *query;
+ struct dispatch_client_data client_data;
+ /* Try to read some data. */
+ count = read(client->client_fd,
+ client->client_inbuf + client->client_inbuf_used,
+ sizeof(client->client_inbuf) - client->client_inbuf_used);
+ if (count <= 0) {
+ if ((count != -1) || (errno != EAGAIN)) {
+ /* Disconnect the client. */
+ if (count == 0) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "no more data from %d, "
+ "marking for closing\n",
+ client->client_fd);
+ } else {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "error reading from %d, "
+ "marking for closing\n",
+ client->client_fd);
}
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "communication error\n");
- goto done;
- break;
- case 0:
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "client closed connection\n");
- goto done;
- break;
- default:
- break;
+ client_set_closing(state, client);
}
- /* We managed to read some piece of a fragment. */
+ } else {
+ /* Record the data as added to the fragment buffer. */
+ client->client_inbuf_used += count;
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "read %d bytes\n", i);
- frag_len += i;
- /* If we can't know the fragment size yet, then wait for more
- * data. */
- if (frag_len < 4) {
- continue;
+ "have %d bytes from %d\n",
+ client->client_inbuf_used,
+ client->client_fd);
+ /* Check if we've got a complete fragment. */
+ if (client->client_inbuf_used < 4) {
+ /* We don't even have a length, so continue reading. */
+ return;
}
- /* Convert the fragment length information to local byte order,
- * and determine if this is the last fragment in the record. */
- memcpy(&nlen, fragment, 4);
+ /* Read the length of the first fragment in the buffer. */
+ memcpy(&nlen, client->client_inbuf, 4);
len = ntohl(nlen);
last = ((len & 0x80000000) != 0);
len &= 0x7fffffff;
- /* If we don't have a complete fragment, then wait for more
- * data. */
- if (frag_len < (len + 4)) {
- continue;
- }
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "expecting %d bytes\n", len);
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "in: frag_len=%d,record=%d\n",
- frag_len, record_len);
- /* Pull the fragment out of the buffer and append it to the
- * record buffer. */
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "got whole fragment (%d bytes)\n", len);
- memmove(record + record_len, fragment + 4, len);
- record_len += len;
- memmove(fragment, fragment + (len + 4), frag_len - (len + 4));
- frag_len -= (len + 4);
- /* If there are more fragments to come, then we need to wait
- * for more data. */
- if (!last) {
- continue;
+ "fragment is %d bytes long%s, "
+ "have %d bytes pending, on %d\n",
+ len, last ? " (last one)" : "",
+ client->client_inbuf_used, client->client_fd);
+ if ((len + 4) <= client->client_inbuf_used) {
+ /* Got at least one fragment! */
+ nlen = len + client->client_query_size;
+ query = malloc(nlen);
+ if (query == NULL) {
+ /* Out of memory, we'll have to try again
+ * later. */
+ return;
+ }
+ /* Copy any previously-received fragments and append
+ * this one. */
+ if (client->client_query_size > 0) {
+ memcpy(query,
+ client->client_query,
+ client->client_query_size);
+ }
+ memcpy(query + client->client_query_size,
+ client->client_inbuf + 4,
+ len);
+ /* Save the new query-in-progress. */
+ free(client->client_query);
+ client->client_query = query;
+ client->client_query_size = nlen;
+ /* Drop the fragment from the incoming
+ * buffer. */
+ memmove(client->client_inbuf,
+ client->client_inbuf + (len + 4),
+ client->client_inbuf_used - (len + 4));
+ client->client_inbuf_used -= (len + 4);
+ }
+ if (last) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "query is %d bytes long on %d\n",
+ client->client_query_size,
+ client->client_fd);
+ /* We have a complete query. Pass it on down. */
+ client_data.client = client->client_fd;
+ memset(&client_data.client_addr, 0,
+ sizeof(client_data.client_addr));
+ client_data.client_addrlen = 0;
+ nis_process_request(state,
+ client->client_query,
+ client->client_query_size,
+ &dispatch_reply_fragment_connected,
+ &dispatch_reply_connected,
+ &client_data,
+ &client->client_query_cookie);
+ /* Decide what to do next. */
+ client_interpret_nis_result(state, client);
}
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "got whole record (%d bytes)\n", record_len);
- nis_process_request(state, &cdata, record, record_len);
- record_len = 0;
- /* Note that we did it! */
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "out: frag_len=%d,record=%d\n",
- frag_len, record_len);
}
-done:
- close(cdata.client);
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "closed client connection %d, thread ending\n",
- cdata.client);
- return NULL;
}
-/* Handle a stream client -- answer the connection and spawn a thread to handle
- * its actual requests. */
+/* Handle replying states. */
static void
-dispatch_stream(struct plugin_state *state, int fd)
+client_write(struct plugin_state *state, struct dispatch_client *client)
{
- int flags;
- pthread_t thread;
- struct dispatch_client_data cdata;
- struct dispatch_stream_client_params *params;
+ ssize_t count;
+ int32_t len, nlen;
+ int last;
+ char *query;
+ struct dispatch_client_data client_data;
- /* Answer the connection request. */
- cdata.client = accept(fd, &cdata.client_addr, &cdata.client_addrlen);
- if (cdata.client == -1) {
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "failed to answer new stream request\n");
- return;
- } else {
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "new stream client on %d\n", cdata.client);
- flags = fcntl(cdata.client, F_GETFL);
- flags |= O_NONBLOCK;
- if (fcntl(cdata.client, F_SETFL, flags) != 0) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "error setting client to "
- "non-blocking\n");
+ /* Try to send some of the pending data. */
+ count = write(client->client_fd,
+ client->client_outbuf,
+ client->client_outbuf_used);
+ if (count <= 0) {
+ if ((count != -1) || (errno != EAGAIN)) {
+ /* Fail, disconnect because we're out of sync. */
+ client_set_closing(state, client);
}
- }
-
- /* Bundle up enough info for the thread to do its work. */
- params = malloc(sizeof(*params));
- if (params == NULL) {
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "out of memory\n");
- close(cdata.client);
return;
}
- params->client = cdata.client;
- params->state = state;
-
- /* Kick off the thread. */
- if (pthread_create(&thread, NULL,
- &dispatch_stream_handler_thread, params) != 0) {
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "error starting thread\n");
- close(cdata.client);
+ if (count == client->client_outbuf_used) {
+ /* There's no more data to send. */
+ if (client->client_state == client_replying_final) {
+ /* Done. Go back to reading next time. */
+ client_set_reading(state, client);
+ } else {
+ /* More to send, so ask for more reply data. */
+ client->client_outbuf_used = 0;
+ client_data.client = client->client_fd;
+ client_data.client_addr = client->client_addr;
+ client_data.client_addrlen = client->client_addrlen;
+ nis_process_request(state,
+ client->client_query,
+ client->client_query_size,
+ &dispatch_reply_fragment_connected,
+ &dispatch_reply_connected,
+ &client_data,
+ &client->client_query_cookie);
+ client_interpret_nis_result(state, client);
+ }
+ } else {
+ /* Partial write, adjust outgoing buffer for next time. */
+ memmove(client->client_outbuf,
+ client->client_outbuf + count,
+ client->client_outbuf_used - count);
+ client->client_outbuf_used -= count;
}
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "started client-specific thread for client on %d\n",
- cdata.client);
- return;
}
-/* Handle a datagram client -- read the request and handle it immediately. */
static void
-dispatch_dgram(struct plugin_state *state, int fd)
+dispatch_service_client(struct plugin_state *state,
+ struct dispatch_client *client,
+ struct pollfd *fd)
{
- struct dispatch_client_data cdata;
- char dgram[65536];
- int reqsize;
-
- /* Read the request. */
- cdata.client = fd;
- cdata.client_addrlen = sizeof(cdata.client_addr);
- reqsize = recvfrom(fd, dgram, sizeof(dgram), 0,
- &cdata.client_addr, &cdata.client_addrlen);
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "datagram request (%d bytes)\n", reqsize);
-
- /* Handle the request. */
- nis_process_request(state, &cdata, dgram, reqsize);
+ ssize_t count;
+ int32_t len, nlen;
+ int last;
+ char *query;
+ switch (client->client_state) {
+ case client_reading:
+ if (fd->revents & POLLIN) {
+ client_read(state, client);
+ } else {
+ client_set_closing(state, client);
+ }
+ break;
+ case client_replying_with_more:
+ case client_replying_final:
+ if (fd->revents & POLLOUT) {
+ client_write(state, client);
+ } else {
+ client_set_closing(state, client);
+ }
+ break;
+ case client_closing:
+ case client_invalid:
+ /* never reached */
+ assert(0);
+ break;
+ }
}
-
-/* Handle all incoming data. */
void *
dispatch_thread(void *p)
{
+ struct dispatch_client *clients, *client, *next, **list;
+ struct dispatch_client_data client_data;
struct plugin_state *state = p;
- struct pollfd fds[4];
- int i;
+ struct pollfd *fds;
+ int i, n_fds, client_count;
+ ssize_t count;
+
+ clients = NULL;
+ client_count = 0;
+ fds = NULL;
+
for (;;) {
- /* Set up for polling. */
- memset(&fds, 0, sizeof(fds));
+ /* Prune out recently-disconnected clients. */
+ list = &clients;
+ while (*list != NULL) {
+ client = *list;
+ next = client->client_next;
+ if (client->client_state == client_closing) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "pruning client %d\n",
+ client->client_fd);
+ if (client->client_fd != -1) {
+ close(client->client_fd);
+ }
+ free(client);
+ *list = next;
+ } else {
+ list = &(client->client_next);
+ }
+ }
+ /* Count the number of connected clients we have. */
+ client = clients;
+ i = 0;
+ while (client != NULL) {
+ next = client->client_next;
+ client = next;
+ i++;
+ }
+ /* If the "fds" block isn't big enough (or doesn't exist yet),
+ * reallocate it. */
+ if (i > client_count) {
+ free(fds);
+ fds = NULL;
+ client_count = i;
+ }
+ if (fds == NULL) {
+ fds = malloc((4 + client_count) * sizeof(fds[0]));
+ if (fds == NULL) {
+ /* Wait a bit, then try again? */
+ poll(NULL, 0, 10 * 1000);
+ continue;
+ }
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "%d connected clients\n", i);
+
+ /* Fill in the set of polling descriptors. */
+ memset(fds, 0, sizeof((4 + client_count) * sizeof(fds[0])));
for (i = 0; i < state->n_listeners; i++) {
fds[i].fd = state->listener[i].fd;
fds[i].events = POLLIN;
}
+ client = clients;
+ while (client != NULL) {
+ fds[i].fd = client->client_fd;
+ switch (client->client_state) {
+ case client_reading:
+ fds[i].events = POLLIN;
+ break;
+ case client_replying_with_more:
+ case client_replying_final:
+ fds[i].events = POLLOUT;
+ break;
+ case client_closing:
+ case client_invalid:
+ /* shouldn't happen */
+ assert(0);
+ break;
+ }
+ client = client->client_next;
+ i++;
+ }
+
+ /* Check for status updates. */
slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "listening for next request\n");
- switch (poll(fds, state->n_listeners, -1)) {
+ "listening\n");
+ n_fds = i;
+ switch (poll(fds, n_fds, -1)) {
case -1:
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
@@ -387,29 +570,51 @@ dispatch_thread(void *p)
case 0:
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "no request(?)\n");
+ "no request(timeout?)\n");
continue;
default:
- /* Iterate over listening sockets which have work for
- * us to do. */
- for (i = 0; i < state->n_listeners; i++) {
- if ((fds[i].revents & POLLIN) == 0) {
- continue;
- }
- switch (state->listener[i].type) {
- case SOCK_DGRAM:
- dispatch_dgram(state, fds[i].fd);
- break;
- case SOCK_STREAM:
- dispatch_stream(state, fds[i].fd);
- break;
- default:
- /* never reached */
- assert(0);
- break;
+ break;
+ }
+
+ /* Save the head of the existing clients list. */
+ client = clients;
+
+ /* Iterate over listening sockets which have work for us. */
+ for (i = 0; i < state->n_listeners; i++) {
+ if ((fds[i].revents & POLLIN) == 0) {
+ continue;
+ }
+ switch (state->listener[i].type) {
+ case SOCK_DGRAM:
+ /* Datagram requests we handle right
+ * here, right now. */
+ dispatch_dgram(state, fds[i].fd);
+ break;
+ case SOCK_STREAM:
+ /* Try to accept a new client. */
+ next = dispatch_accept_client(state, fds[i].fd);
+ if (next != NULL) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "new client on %d\n",
+ next->client_fd);
+ next->client_next = clients;
+ clients = next;
}
+ break;
+ default:
+ /* never reached */
+ assert(0);
+ break;
}
}
+
+ /* Service the already-connected clients. */
+ for (; i < n_fds; i++, client = client->client_next) {
+ assert(client != NULL);
+ assert(client->client_fd == fds[i].fd);
+ dispatch_service_client(state, client, &fds[i]);
+ }
}
return state;
}