summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-05-27 20:06:26 -0400
committerNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-05-27 20:06:26 -0400
commitb64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5 (patch)
tree5f57fd0d5074d411aa54a073e973033ac7c35ef6 /src
parentb595fe2039de58f655e226e4e427530884d11d3f (diff)
downloadslapi-nis-b64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5.tar.gz
slapi-nis-b64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5.tar.xz
slapi-nis-b64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5.zip
- rewrite dispatching code so that
* we multiplex servicing for connected clients in the same thread that handles datagram clients * nis query routines use a callback rather than a symbol, so that... * ...we can supply different callbacks for datagram and connected clients * nis query routines have the option of giving us a chunk of output and telling us to come back later for more, though for now only nis_all gets the option of doing that, and currently write those chunks immediately in a blocking way
Diffstat (limited to 'src')
-rw-r--r--src/dispatch.c721
-rw-r--r--src/dispatch.h21
-rw-r--r--src/nis.c92
-rw-r--r--src/nis.h6
4 files changed, 540 insertions, 300 deletions
diff --git a/src/dispatch.c b/src/dispatch.c
index 34932f1..38197cc 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -26,17 +26,48 @@
#define MAX_CLIENT_IDLE (60 * 1000)
-struct dispatch_stream_client_params {
- int client;
- struct plugin_state *state;
-};
-
struct dispatch_client_data {
int client;
struct sockaddr client_addr;
socklen_t client_addrlen;
};
+/* Send a reply, unbuffered datagram version. */
+static bool_t
+dispatch_reply_fragment_dgram(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf,
+ bool_t first_fragment, bool_t last_fragment)
+{
+ xdr_replymsg(reply_xdrs, reply);
+ if (!first_fragment || !last_fragment) {
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "Trying to sending datagram reply (%d bytes), "
+ "even though the reply is not suitable for "
+ "transmission as a datagram.\n",
+ xdr_getpos(reply_xdrs));
+ } else {
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "Sending datagram reply (%d bytes).\n",
+ xdr_getpos(reply_xdrs));
+ }
+ sendto(cdata->client, reply_buf, xdr_getpos(reply_xdrs),
+ 0, &cdata->client_addr, cdata->client_addrlen);
+ return TRUE;
+}
+static bool_t
+dispatch_reply_dgram(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf)
+{
+ return dispatch_reply_fragment_dgram(state, cdata,
+ reply, reply_xdrs, reply_buf,
+ TRUE, TRUE);
+}
+
+/* Send a reply, buffered-for-connected-clients version. */
static ssize_t
dispatch_write_with_retry(struct plugin_state *state, int fd,
const void *buffer1, ssize_t length1,
@@ -87,297 +118,449 @@ dispatch_write_with_retry(struct plugin_state *state, int fd,
}
return sent;
}
-
-/* Send a reply which _could_ be sent over a datagram. */
-void
-dispatch_reply_fragment(struct plugin_state *state,
- struct dispatch_client_data *cdata,
- struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf,
- bool_t first_fragment, bool_t last_fragment)
+static bool_t
+dispatch_reply_fragment_connected(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf,
+ bool_t first_fragment, bool_t last_fragment)
{
uint32_t len;
- if (cdata->client_addrlen != 0) {
- /* It's a datagram socket, so there's nothing to do but send
- * the data by itself. */
+ /* Record reply - first fragment. */
+ if (first_fragment) {
xdr_replymsg(reply_xdrs, reply);
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "Sending datagram reply (%d bytes).\n",
+ }
+ /* Calculate the fragment length bytes. */
+ len = htonl(xdr_getpos(reply_xdrs) |
+ (last_fragment ? 0x80000000 : 0));
+ /* Send the data to the client. */
+ if (dispatch_write_with_retry(state, cdata->client,
+ &len, 4,
+ reply_buf,
+ xdr_getpos(reply_xdrs)) ==
+ 4 + xdr_getpos(reply_xdrs)) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "Stream reply (4+%d bytes) sent.\n",
xdr_getpos(reply_xdrs));
- sendto(cdata->client, reply_buf, xdr_getpos(reply_xdrs),
- 0, &cdata->client_addr, cdata->client_addrlen);
+ return TRUE;
} else {
- /* Record reply - one fragment. */
- if (first_fragment) {
- xdr_replymsg(reply_xdrs, reply);
- }
- /* Calculate the fragment length bytes. */
- len = htonl(xdr_getpos(reply_xdrs) |
- (last_fragment ? 0x80000000 : 0));
- /* Send the data to the client. */
- if (dispatch_write_with_retry(state, cdata->client,
- &len, 4,
- reply_buf,
- xdr_getpos(reply_xdrs)) ==
- 4 + xdr_getpos(reply_xdrs)) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "Stream reply (4+%d bytes) sent.\n",
- xdr_getpos(reply_xdrs));
- } else {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "Stream reply (4+%d bytes) failed!\n",
- xdr_getpos(reply_xdrs));
- }
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "Stream reply (4+%d bytes) failed!\n",
+ xdr_getpos(reply_xdrs));
+ return FALSE;
}
}
-
-/* Send an entire reply datagram or record at once. */
-void
-dispatch_reply(struct plugin_state *state, struct dispatch_client_data *cdata,
- struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf)
+/* Send an entire reply record at once. */
+bool_t
+dispatch_reply_connected(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf)
{
- dispatch_reply_fragment(state, cdata,
- reply, reply_xdrs, reply_buf, TRUE, TRUE);
+ return dispatch_reply_fragment_connected(state, cdata,
+ reply, reply_xdrs, reply_buf,
+ TRUE, TRUE);
}
-/* Handle requests from one stream client. */
-static void *
-dispatch_stream_handler_thread(void *arg)
+/* Handle a datagram client -- read the request and handle it immediately. */
+static void
+dispatch_dgram(struct plugin_state *state, int fd)
{
- int flags, frag_len, record_len;
- ssize_t i;
- int32_t len, nlen;
- char fragment[65540], record[65536];
- struct pollfd pollfd;
- struct dispatch_stream_client_params *params;
struct dispatch_client_data cdata;
- struct plugin_state *state;
- bool_t last;
+ char dgram[65536];
+ int reqsize;
- /* Recover the full set of parameters. */
- params = arg;
- state = params->state;
- cdata.client = params->client;
- memset(&cdata.client_addr, 0, sizeof(cdata.client_addr));
- cdata.client_addrlen = 0;
- free(arg);
+ /* Read the request. */
+ cdata.client = fd;
+ cdata.client_addrlen = sizeof(cdata.client_addr);
+ reqsize = recvfrom(fd, dgram, sizeof(dgram), 0,
+ &cdata.client_addr, &cdata.client_addrlen);
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "datagram request (%d bytes)\n", reqsize);
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "opened client connection %d, thread started\n",
- cdata.client);
+ /* Handle the request. */
+ nis_process_request(state, dgram, reqsize,
+ &dispatch_reply_fragment_dgram,
+ &dispatch_reply_dgram,
+ &cdata, NULL);
+}
- /* Set the connection to be non-blocking. */
- flags = fcntl(cdata.client, F_GETFD);
- if ((flags == -1) ||
- (fcntl(cdata.client, F_SETFD, flags | O_NONBLOCK) == -1)) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "error setting new client connection to be "
- "non-blocking\n");
- close(cdata.client);
+/* Handle all incoming data. */
+struct dispatch_client {
+ /* The client socket and address. */
+ int client_fd;
+ struct sockaddr client_addr;
+ socklen_t client_addrlen;
+ /* The client state. */
+ enum {
+ client_invalid,
+ client_closing,
+ client_reading,
+ client_replying_with_more,
+ client_replying_final,
+ } client_state;
+ /* The client's request while we're reading it. */
+ char client_inbuf[8192];
+ ssize_t client_inbuf_used;
+ char *client_query;
+ ssize_t client_query_size;
+ void *client_query_cookie;
+ /* The reply to the client, when we're sending one. */
+ char client_outbuf[4096];
+ ssize_t client_outbuf_used;
+ /* This is a linked list. */
+ struct dispatch_client *client_next;
+};
+
+/* Set the client's record up to start reading a new query. */
+static void
+client_set_reading(struct plugin_state *state, struct dispatch_client *client)
+{
+ client->client_inbuf_used = 0;
+ free(client->client_query);
+ client->client_query = NULL;
+ client->client_query_size = 0;
+ client->client_outbuf_used = 0;
+ client->client_state = client_reading;
+}
+
+/* Set the client's record up to be cleaned up. */
+static void
+client_set_closing(struct plugin_state *state, struct dispatch_client *client)
+{
+ client->client_inbuf_used = 0;
+ free(client->client_query);
+ client->client_query = NULL;
+ client->client_query_size = 0;
+ client->client_outbuf_used = 0;
+ client->client_state = client_closing;
+}
+
+/* Set the client's record up. */
+static struct dispatch_client *
+dispatch_accept_client(struct plugin_state *state, int fd)
+{
+ struct dispatch_client *client;
+ client = malloc(sizeof(*client));
+ if (client == NULL) {
+ return NULL;
+ }
+ fd = accept(fd, &client->client_addr, &client->client_addrlen);
+ if (fd == -1) {
+ free(client);
return NULL;
}
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "set new client connection to be non-blocking\n");
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "new connected client on %d\n", fd);
+ memset(client, 0, sizeof(*client));
+ client->client_fd = fd;
+ client_set_reading(state, client);
+ return client;
+}
- /* Assemble fragments and responses. */
- frag_len = 0;
- record_len = 0;
- for (;;) {
- /* Wait for incoming data. */
- pollfd.fd = cdata.client;
- pollfd.events = POLLIN | POLLHUP;
- switch (poll(&pollfd, 1, MAX_CLIENT_IDLE)) {
- case -1:
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "communication error\n");
- goto done;
- break;
- case 0:
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "client timeout\n");
- goto done;
- break;
- default:
- break;
+/* Decide what to do next. */
+static void
+client_interpret_nis_result(struct plugin_state *state,
+ struct dispatch_client *client)
+{
+ if (client->client_query_cookie != NULL) {
+ client->client_state = client_replying_with_more;
+ } else {
+ if (client->client_outbuf_used > 0) {
+ client->client_state = client_replying_final;
+ } else {
+ client_set_reading(state, client);
}
- /* Try to read as much data as we have space to hold. */
- i = read(cdata.client, fragment + frag_len,
- sizeof(fragment) - frag_len);
- switch (i) {
- case -1:
- if (errno == EAGAIN) {
- continue;
+ }
+}
+
+/* Handle reading state. */
+static void
+client_read(struct plugin_state *state, struct dispatch_client *client)
+{
+ ssize_t count;
+ int32_t len, nlen;
+ int last;
+ char *query;
+ struct dispatch_client_data client_data;
+ /* Try to read some data. */
+ count = read(client->client_fd,
+ client->client_inbuf + client->client_inbuf_used,
+ sizeof(client->client_inbuf) - client->client_inbuf_used);
+ if (count <= 0) {
+ if ((count != -1) || (errno != EAGAIN)) {
+ /* Disconnect the client. */
+ if (count == 0) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "no more data from %d, "
+ "marking for closing\n",
+ client->client_fd);
+ } else {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "error reading from %d, "
+ "marking for closing\n",
+ client->client_fd);
}
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "communication error\n");
- goto done;
- break;
- case 0:
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "client closed connection\n");
- goto done;
- break;
- default:
- break;
+ client_set_closing(state, client);
}
- /* We managed to read some piece of a fragment. */
+ } else {
+ /* Record the data as added to the fragment buffer. */
+ client->client_inbuf_used += count;
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "read %d bytes\n", i);
- frag_len += i;
- /* If we can't know the fragment size yet, then wait for more
- * data. */
- if (frag_len < 4) {
- continue;
+ "have %d bytes from %d\n",
+ client->client_inbuf_used,
+ client->client_fd);
+ /* Check if we've got a complete fragment. */
+ if (client->client_inbuf_used < 4) {
+ /* We don't even have a length, so continue reading. */
+ return;
}
- /* Convert the fragment length information to local byte order,
- * and determine if this is the last fragment in the record. */
- memcpy(&nlen, fragment, 4);
+ /* Read the length of the first fragment in the buffer. */
+ memcpy(&nlen, client->client_inbuf, 4);
len = ntohl(nlen);
last = ((len & 0x80000000) != 0);
len &= 0x7fffffff;
- /* If we don't have a complete fragment, then wait for more
- * data. */
- if (frag_len < (len + 4)) {
- continue;
- }
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "expecting %d bytes\n", len);
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "in: frag_len=%d,record=%d\n",
- frag_len, record_len);
- /* Pull the fragment out of the buffer and append it to the
- * record buffer. */
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "got whole fragment (%d bytes)\n", len);
- memmove(record + record_len, fragment + 4, len);
- record_len += len;
- memmove(fragment, fragment + (len + 4), frag_len - (len + 4));
- frag_len -= (len + 4);
- /* If there are more fragments to come, then we need to wait
- * for more data. */
- if (!last) {
- continue;
+ "fragment is %d bytes long%s, "
+ "have %d bytes pending, on %d\n",
+ len, last ? " (last one)" : "",
+ client->client_inbuf_used, client->client_fd);
+ if ((len + 4) <= client->client_inbuf_used) {
+ /* Got at least one fragment! */
+ nlen = len + client->client_query_size;
+ query = malloc(nlen);
+ if (query == NULL) {
+ /* Out of memory, we'll have to try again
+ * later. */
+ return;
+ }
+ /* Copy any previously-received fragments and append
+ * this one. */
+ if (client->client_query_size > 0) {
+ memcpy(query,
+ client->client_query,
+ client->client_query_size);
+ }
+ memcpy(query + client->client_query_size,
+ client->client_inbuf + 4,
+ len);
+ /* Save the new query-in-progress. */
+ free(client->client_query);
+ client->client_query = query;
+ client->client_query_size = nlen;
+ /* Drop the fragment from the incoming
+ * buffer. */
+ memmove(client->client_inbuf,
+ client->client_inbuf + (len + 4),
+ client->client_inbuf_used - (len + 4));
+ client->client_inbuf_used -= (len + 4);
+ }
+ if (last) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "query is %d bytes long on %d\n",
+ client->client_query_size,
+ client->client_fd);
+ /* We have a complete query. Pass it on down. */
+ client_data.client = client->client_fd;
+ memset(&client_data.client_addr, 0,
+ sizeof(client_data.client_addr));
+ client_data.client_addrlen = 0;
+ nis_process_request(state,
+ client->client_query,
+ client->client_query_size,
+ &dispatch_reply_fragment_connected,
+ &dispatch_reply_connected,
+ &client_data,
+ &client->client_query_cookie);
+ /* Decide what to do next. */
+ client_interpret_nis_result(state, client);
}
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "got whole record (%d bytes)\n", record_len);
- nis_process_request(state, &cdata, record, record_len);
- record_len = 0;
- /* Note that we did it! */
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "out: frag_len=%d,record=%d\n",
- frag_len, record_len);
}
-done:
- close(cdata.client);
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "closed client connection %d, thread ending\n",
- cdata.client);
- return NULL;
}
-/* Handle a stream client -- answer the connection and spawn a thread to handle
- * its actual requests. */
+/* Handle replying states. */
static void
-dispatch_stream(struct plugin_state *state, int fd)
+client_write(struct plugin_state *state, struct dispatch_client *client)
{
- int flags;
- pthread_t thread;
- struct dispatch_client_data cdata;
- struct dispatch_stream_client_params *params;
+ ssize_t count;
+ int32_t len, nlen;
+ int last;
+ char *query;
+ struct dispatch_client_data client_data;
- /* Answer the connection request. */
- cdata.client = accept(fd, &cdata.client_addr, &cdata.client_addrlen);
- if (cdata.client == -1) {
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "failed to answer new stream request\n");
- return;
- } else {
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "new stream client on %d\n", cdata.client);
- flags = fcntl(cdata.client, F_GETFL);
- flags |= O_NONBLOCK;
- if (fcntl(cdata.client, F_SETFL, flags) != 0) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "error setting client to "
- "non-blocking\n");
+ /* Try to send some of the pending data. */
+ count = write(client->client_fd,
+ client->client_outbuf,
+ client->client_outbuf_used);
+ if (count <= 0) {
+ if ((count != -1) || (errno != EAGAIN)) {
+ /* Fail, disconnect because we're out of sync. */
+ client_set_closing(state, client);
}
- }
-
- /* Bundle up enough info for the thread to do its work. */
- params = malloc(sizeof(*params));
- if (params == NULL) {
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "out of memory\n");
- close(cdata.client);
return;
}
- params->client = cdata.client;
- params->state = state;
-
- /* Kick off the thread. */
- if (pthread_create(&thread, NULL,
- &dispatch_stream_handler_thread, params) != 0) {
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "error starting thread\n");
- close(cdata.client);
+ if (count == client->client_outbuf_used) {
+ /* There's no more data to send. */
+ if (client->client_state == client_replying_final) {
+ /* Done. Go back to reading next time. */
+ client_set_reading(state, client);
+ } else {
+ /* More to send, so ask for more reply data. */
+ client->client_outbuf_used = 0;
+ client_data.client = client->client_fd;
+ client_data.client_addr = client->client_addr;
+ client_data.client_addrlen = client->client_addrlen;
+ nis_process_request(state,
+ client->client_query,
+ client->client_query_size,
+ &dispatch_reply_fragment_connected,
+ &dispatch_reply_connected,
+ &client_data,
+ &client->client_query_cookie);
+ client_interpret_nis_result(state, client);
+ }
+ } else {
+ /* Partial write, adjust outgoing buffer for next time. */
+ memmove(client->client_outbuf,
+ client->client_outbuf + count,
+ client->client_outbuf_used - count);
+ client->client_outbuf_used -= count;
}
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "started client-specific thread for client on %d\n",
- cdata.client);
- return;
}
-/* Handle a datagram client -- read the request and handle it immediately. */
static void
-dispatch_dgram(struct plugin_state *state, int fd)
+dispatch_service_client(struct plugin_state *state,
+ struct dispatch_client *client,
+ struct pollfd *fd)
{
- struct dispatch_client_data cdata;
- char dgram[65536];
- int reqsize;
-
- /* Read the request. */
- cdata.client = fd;
- cdata.client_addrlen = sizeof(cdata.client_addr);
- reqsize = recvfrom(fd, dgram, sizeof(dgram), 0,
- &cdata.client_addr, &cdata.client_addrlen);
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "datagram request (%d bytes)\n", reqsize);
-
- /* Handle the request. */
- nis_process_request(state, &cdata, dgram, reqsize);
+ ssize_t count;
+ int32_t len, nlen;
+ int last;
+ char *query;
+ switch (client->client_state) {
+ case client_reading:
+ if (fd->revents & POLLIN) {
+ client_read(state, client);
+ } else {
+ client_set_closing(state, client);
+ }
+ break;
+ case client_replying_with_more:
+ case client_replying_final:
+ if (fd->revents & POLLOUT) {
+ client_write(state, client);
+ } else {
+ client_set_closing(state, client);
+ }
+ break;
+ case client_closing:
+ case client_invalid:
+ /* never reached */
+ assert(0);
+ break;
+ }
}
-
-/* Handle all incoming data. */
void *
dispatch_thread(void *p)
{
+ struct dispatch_client *clients, *client, *next, **list;
+ struct dispatch_client_data client_data;
struct plugin_state *state = p;
- struct pollfd fds[4];
- int i;
+ struct pollfd *fds;
+ int i, n_fds, client_count;
+ ssize_t count;
+
+ clients = NULL;
+ client_count = 0;
+ fds = NULL;
+
for (;;) {
- /* Set up for polling. */
- memset(&fds, 0, sizeof(fds));
+ /* Prune out recently-disconnected clients. */
+ list = &clients;
+ while (*list != NULL) {
+ client = *list;
+ next = client->client_next;
+ if (client->client_state == client_closing) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "pruning client %d\n",
+ client->client_fd);
+ if (client->client_fd != -1) {
+ close(client->client_fd);
+ }
+ free(client);
+ *list = next;
+ } else {
+ list = &(client->client_next);
+ }
+ }
+ /* Count the number of connected clients we have. */
+ client = clients;
+ i = 0;
+ while (client != NULL) {
+ next = client->client_next;
+ client = next;
+ i++;
+ }
+ /* If the "fds" block isn't big enough (or doesn't exist yet),
+ * reallocate it. */
+ if (i > client_count) {
+ free(fds);
+ fds = NULL;
+ client_count = i;
+ }
+ if (fds == NULL) {
+ fds = malloc((4 + client_count) * sizeof(fds[0]));
+ if (fds == NULL) {
+ /* Wait a bit, then try again? */
+ poll(NULL, 0, 10 * 1000);
+ continue;
+ }
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "%d connected clients\n", i);
+
+ /* Fill in the set of polling descriptors. */
+ memset(fds, 0, sizeof((4 + client_count) * sizeof(fds[0])));
for (i = 0; i < state->n_listeners; i++) {
fds[i].fd = state->listener[i].fd;
fds[i].events = POLLIN;
}
+ client = clients;
+ while (client != NULL) {
+ fds[i].fd = client->client_fd;
+ switch (client->client_state) {
+ case client_reading:
+ fds[i].events = POLLIN;
+ break;
+ case client_replying_with_more:
+ case client_replying_final:
+ fds[i].events = POLLOUT;
+ break;
+ case client_closing:
+ case client_invalid:
+ /* shouldn't happen */
+ assert(0);
+ break;
+ }
+ client = client->client_next;
+ i++;
+ }
+
+ /* Check for status updates. */
slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "listening for next request\n");
- switch (poll(fds, state->n_listeners, -1)) {
+ "listening\n");
+ n_fds = i;
+ switch (poll(fds, n_fds, -1)) {
case -1:
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
@@ -387,29 +570,51 @@ dispatch_thread(void *p)
case 0:
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "no request(?)\n");
+ "no request(timeout?)\n");
continue;
default:
- /* Iterate over listening sockets which have work for
- * us to do. */
- for (i = 0; i < state->n_listeners; i++) {
- if ((fds[i].revents & POLLIN) == 0) {
- continue;
- }
- switch (state->listener[i].type) {
- case SOCK_DGRAM:
- dispatch_dgram(state, fds[i].fd);
- break;
- case SOCK_STREAM:
- dispatch_stream(state, fds[i].fd);
- break;
- default:
- /* never reached */
- assert(0);
- break;
+ break;
+ }
+
+ /* Save the head of the existing clients list. */
+ client = clients;
+
+ /* Iterate over listening sockets which have work for us. */
+ for (i = 0; i < state->n_listeners; i++) {
+ if ((fds[i].revents & POLLIN) == 0) {
+ continue;
+ }
+ switch (state->listener[i].type) {
+ case SOCK_DGRAM:
+ /* Datagram requests we handle right
+ * here, right now. */
+ dispatch_dgram(state, fds[i].fd);
+ break;
+ case SOCK_STREAM:
+ /* Try to accept a new client. */
+ next = dispatch_accept_client(state, fds[i].fd);
+ if (next != NULL) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "new client on %d\n",
+ next->client_fd);
+ next->client_next = clients;
+ clients = next;
}
+ break;
+ default:
+ /* never reached */
+ assert(0);
+ break;
}
}
+
+ /* Service the already-connected clients. */
+ for (; i < n_fds; i++, client = client->client_next) {
+ assert(client != NULL);
+ assert(client->client_fd == fds[i].fd);
+ dispatch_service_client(state, client, &fds[i]);
+ }
}
return state;
}
diff --git a/src/dispatch.h b/src/dispatch.h
index 14ea795..2fc273e 100644
--- a/src/dispatch.h
+++ b/src/dispatch.h
@@ -1,11 +1,16 @@
+#ifndef dispatch_h
+#define dispatch_h
struct plugin_state;
struct dispatch_client_data;
void *dispatch_thread(void *p);
-void dispatch_reply_fragment(struct plugin_state *state,
- struct dispatch_client_data *cdata,
- struct rpc_msg *reply,
- XDR *reply_xdrs, char *reply_buf,
- bool_t first_fragment, bool_t last_fragment);
-void dispatch_reply(struct plugin_state *state,
- struct dispatch_client_data *cdata,
- struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf);
+typedef bool_t (dispatch_reply_fragment)(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf,
+ bool_t first_fragment,
+ bool_t last_fragment);
+typedef bool_t (dispatch_reply)(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf);
+#endif
diff --git a/src/nis.c b/src/nis.c
index 22d70a7..a1e7ff5 100644
--- a/src/nis.c
+++ b/src/nis.c
@@ -25,6 +25,8 @@
/* Indicate whether or not we serve the specified domain. */
static void
nis_domain(struct plugin_state *state,
+ dispatch_reply_fragment *reply_fragment_fn,
+ dispatch_reply *reply_fn,
struct dispatch_client_data *cdata,
XDR *request_xdrs, PRBool reply_on_failure,
struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf,
@@ -39,8 +41,7 @@ nis_domain(struct plugin_state *state,
state->plugin_desc->spd_id,
"domain(%s) -> %s\n",
domain, *reply_bool ? "TRUE" : "FALSE");
- dispatch_reply(state, cdata,
- reply, reply_xdrs, reply_buf);
+ (*reply_fn)(state, cdata, reply, reply_xdrs, reply_buf);
} else {
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
@@ -57,6 +58,8 @@ nis_domain(struct plugin_state *state,
static void
nis_match(struct plugin_state *state,
+ dispatch_reply_fragment *reply_fragment_fn,
+ dispatch_reply *reply_fn,
struct dispatch_client_data *cdata,
XDR *request_xdrs,
struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf,
@@ -84,7 +87,7 @@ nis_match(struct plugin_state *state,
} else {
reply_val->status = YP_NOKEY;
}
- dispatch_reply(state, cdata, reply, reply_xdrs, reply_buf);
+ (*reply_fn)(state, cdata, reply, reply_xdrs, reply_buf);
} else {
/* XXX */
}
@@ -92,6 +95,8 @@ nis_match(struct plugin_state *state,
static void
nis_first(struct plugin_state *state,
+ dispatch_reply_fragment *reply_fragment_fn,
+ dispatch_reply *reply_fn,
struct dispatch_client_data *cdata,
XDR *request_xdrs,
struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf,
@@ -122,7 +127,7 @@ nis_first(struct plugin_state *state,
reply_key_val->status = map_supported ? YP_NOKEY :
YP_NOMAP;
}
- dispatch_reply(state, cdata, reply, reply_xdrs, reply_buf);
+ (*reply_fn)(state, cdata, reply, reply_xdrs, reply_buf);
} else {
/* XXX */
}
@@ -130,6 +135,8 @@ nis_first(struct plugin_state *state,
static void
nis_next(struct plugin_state *state,
+ dispatch_reply_fragment *reply_fragment_fn,
+ dispatch_reply *reply_fn,
struct dispatch_client_data *cdata,
XDR *request_xdrs,
struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf,
@@ -175,7 +182,7 @@ nis_next(struct plugin_state *state,
req_key.domain,
req_key.map);
}
- dispatch_reply(state, cdata, reply, reply_xdrs, reply_buf);
+ (*reply_fn)(state, cdata, reply, reply_xdrs, reply_buf);
} else {
/* XXX */
}
@@ -183,6 +190,8 @@ nis_next(struct plugin_state *state,
static void
nis_master(struct plugin_state *state,
+ dispatch_reply_fragment *reply_fragment_fn,
+ dispatch_reply *reply_fn,
struct dispatch_client_data *cdata,
XDR *request_xdrs,
struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf,
@@ -204,7 +213,7 @@ nis_master(struct plugin_state *state,
req_nokey.domain,
req_nokey.map,
reply_master->master);
- dispatch_reply(state, cdata, reply, reply_xdrs, reply_buf);
+ (*reply_fn)(state, cdata, reply, reply_xdrs, reply_buf);
} else {
/* XXX */
}
@@ -212,6 +221,8 @@ nis_master(struct plugin_state *state,
static void
nis_order(struct plugin_state *state,
+ dispatch_reply_fragment *reply_fragment_fn,
+ dispatch_reply *reply_fn,
struct dispatch_client_data *cdata,
XDR *request_xdrs,
struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf,
@@ -239,7 +250,7 @@ nis_order(struct plugin_state *state,
req_nokey.domain,
req_nokey.map);
}
- dispatch_reply(state, cdata, reply, reply_xdrs, reply_buf);
+ (*reply_fn)(state, cdata, reply, reply_xdrs, reply_buf);
} else {
/* XXX */
}
@@ -278,6 +289,8 @@ nis_free_maplist_cb_result(struct ypmaplist **list)
}
static void
nis_maplist(struct plugin_state *state,
+ dispatch_reply_fragment *reply_fragment_fn,
+ dispatch_reply *reply_fn,
struct dispatch_client_data *cdata,
XDR *request_xdrs,
struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf,
@@ -306,7 +319,7 @@ nis_maplist(struct plugin_state *state,
domain, list->map);
}
}
- dispatch_reply(state, cdata, reply, reply_xdrs, reply_buf);
+ (*reply_fn)(state, cdata, reply, reply_xdrs, reply_buf);
nis_free_maplist_cb_result(&list);
} else {
/* XXX */
@@ -315,10 +328,12 @@ nis_maplist(struct plugin_state *state,
static void
nis_all(struct plugin_state *state,
+ dispatch_reply_fragment *reply_fragment_fn,
+ dispatch_reply *reply_fn,
struct dispatch_client_data *cdata,
XDR *request_xdrs,
struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf,
- struct ypresp_all *reply_all)
+ struct ypresp_all *reply_all, void **continuation_cookie)
{
struct ypreq_nokey req_nokey;
keydat_t *reply_key;
@@ -337,16 +352,16 @@ nis_all(struct plugin_state *state,
reply_all->ypresp_all_u.val.status = YP_NOMAP;
reply_key->keydat_len = 0;
reply_val->valdat_len = 0;
- dispatch_reply_fragment(state, cdata,
- reply, reply_xdrs, reply_buf,
- TRUE, FALSE);
+ (*reply_fragment_fn)(state, cdata,
+ reply, reply_xdrs, reply_buf,
+ TRUE, FALSE);
/* End of data. */
reply_all->more = FALSE;
xdr_setpos(reply_xdrs, 0);
xdr_ypresp_all(reply_xdrs, reply_all);
- dispatch_reply_fragment(state, cdata,
- reply, reply_xdrs, reply_buf,
- FALSE, TRUE);
+ (*reply_fragment_fn)(state, cdata,
+ reply, reply_xdrs, reply_buf,
+ FALSE, TRUE);
} else {
bool_t first;
reply_all->more = TRUE;
@@ -370,10 +385,10 @@ nis_all(struct plugin_state *state,
reply_key->keydat_len,
reply_key->keydat_val,
reply_all->more);
- dispatch_reply_fragment(state, cdata,
- reply,
- reply_xdrs, reply_buf,
- first, FALSE);
+ (*reply_fragment_fn)(state, cdata,
+ reply,
+ reply_xdrs, reply_buf,
+ first, FALSE);
first = FALSE;
/* Find the next entry. */
reply_all->more = map_next(state,
@@ -402,9 +417,9 @@ nis_all(struct plugin_state *state,
reply_all->more = FALSE;
xdr_ypresp_all(reply_xdrs, reply_all);
/* Bundle those two chunks into one reply. */
- dispatch_reply_fragment(state, cdata,
- reply, reply_xdrs, reply_buf,
- FALSE, TRUE);
+ (*reply_fragment_fn)(state, cdata,
+ reply, reply_xdrs, reply_buf,
+ FALSE, TRUE);
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
"all(%s/%s) done\n",
@@ -420,8 +435,11 @@ nis_all(struct plugin_state *state,
* stream client. */
void
nis_process_request(struct plugin_state *state,
+ char *request_buf, size_t request_buflen,
+ dispatch_reply_fragment *reply_fragment_fn,
+ dispatch_reply *reply_fn,
struct dispatch_client_data *cdata,
- char *request_buf, size_t request_buflen)
+ void **continuation_cookie)
{
XDR request_xdrs, reply_xdrs, auth_xdrs;
AUTH *request_auth, *reply_auth;
@@ -573,7 +591,8 @@ nis_process_request(struct plugin_state *state,
accepted->ar_results.where = (caddr_t) &reply_bool;
accepted->ar_results.proc = (xdrproc_t) xdr_bool;
/* Call the real function. */
- nis_domain(state, cdata, &request_xdrs,
+ nis_domain(state, reply_fragment_fn, reply_fn,
+ cdata, &request_xdrs,
request.rm_call.cb_proc == YPPROC_DOMAIN,
&reply, &reply_xdrs, reply_buf, &reply_bool);
goto sent_reply;
@@ -587,7 +606,8 @@ nis_process_request(struct plugin_state *state,
accepted->ar_results.where = (caddr_t) &reply_val;
accepted->ar_results.proc = (xdrproc_t) xdr_ypresp_val;
/* Call the real function. */
- nis_match(state, cdata, &request_xdrs,
+ nis_match(state, reply_fragment_fn, reply_fn,
+ cdata, &request_xdrs,
&reply, &reply_xdrs, reply_buf, &reply_val);
goto sent_reply;
break;
@@ -600,7 +620,8 @@ nis_process_request(struct plugin_state *state,
accepted->ar_results.where = (caddr_t) &reply_key_val;
accepted->ar_results.proc = (xdrproc_t) xdr_ypresp_key_val;
/* Call the real function. */
- nis_first(state, cdata, &request_xdrs,
+ nis_first(state, reply_fragment_fn, reply_fn,
+ cdata, &request_xdrs,
&reply, &reply_xdrs, reply_buf, &reply_key_val);
goto sent_reply;
break;
@@ -613,7 +634,8 @@ nis_process_request(struct plugin_state *state,
accepted->ar_results.where = (caddr_t) &reply_key_val;
accepted->ar_results.proc = (xdrproc_t) xdr_ypresp_key_val;
/* Call the real function. */
- nis_next(state, cdata, &request_xdrs,
+ nis_next(state, reply_fragment_fn, reply_fn,
+ cdata, &request_xdrs,
&reply, &reply_xdrs, reply_buf, &reply_key_val);
goto sent_reply;
break;
@@ -638,8 +660,10 @@ nis_process_request(struct plugin_state *state,
accepted->ar_results.where = (caddr_t) &reply_all;
accepted->ar_results.proc = (xdrproc_t) &xdr_ypresp_all;
/* Call the real function. */
- nis_all(state, cdata, &request_xdrs,
- &reply, &reply_xdrs, reply_buf, &reply_all);
+ nis_all(state, reply_fragment_fn, reply_fn,
+ cdata, &request_xdrs,
+ &reply, &reply_xdrs, reply_buf, &reply_all,
+ continuation_cookie);
goto sent_reply;
break;
case YPPROC_MASTER:
@@ -651,7 +675,8 @@ nis_process_request(struct plugin_state *state,
accepted->ar_results.where = (caddr_t) &reply_master;
accepted->ar_results.proc = (xdrproc_t) xdr_ypresp_master;
/* Call the real function. */
- nis_master(state, cdata, &request_xdrs,
+ nis_master(state, reply_fragment_fn, reply_fn,
+ cdata, &request_xdrs,
&reply, &reply_xdrs, reply_buf, &reply_master);
goto sent_reply;
break;
@@ -664,7 +689,7 @@ nis_process_request(struct plugin_state *state,
accepted->ar_results.where = (caddr_t) &reply_order;
accepted->ar_results.proc = (xdrproc_t) xdr_ypresp_order;
/* Call the real function. */
- nis_order(state, cdata,
+ nis_order(state, reply_fragment_fn, reply_fn, cdata,
&request_xdrs,
&reply, &reply_xdrs, reply_buf, &reply_order);
goto sent_reply;
@@ -678,7 +703,8 @@ nis_process_request(struct plugin_state *state,
accepted->ar_results.where = (caddr_t) &reply_maplist;
accepted->ar_results.proc = (xdrproc_t) xdr_ypresp_maplist;
/* Call the real function. */
- nis_maplist(state, cdata, &request_xdrs,
+ nis_maplist(state, reply_fragment_fn, reply_fn,
+ cdata, &request_xdrs,
&reply, &reply_xdrs, reply_buf, &reply_maplist);
goto sent_reply;
break;
@@ -692,7 +718,7 @@ nis_process_request(struct plugin_state *state,
}
send_reply:
- dispatch_reply(state, cdata, &reply, &reply_xdrs, reply_buf);
+ (*reply_fn)(state, cdata, &reply, &reply_xdrs, reply_buf);
sent_reply:
xdr_destroy(&reply_xdrs);
diff --git a/src/nis.h b/src/nis.h
index fd7f6d7..60eb6a2 100644
--- a/src/nis.h
+++ b/src/nis.h
@@ -4,10 +4,14 @@
#include <sys/types.h>
#include <sys/socket.h>
#include "plugin.h"
+#include "dispatch.h"
struct dispatch_client_data;
void nis_process_request(struct plugin_state *state,
+ char *request_buf, size_t request_buflen,
+ dispatch_reply_fragment *reply_fragment,
+ dispatch_reply *reply,
struct dispatch_client_data *cdata,
- char *request_buf, size_t request_buflen);
+ void **continuation_cookie);
#endif