diff options
| author | Nalin Dahyabhai <nalin.dahyabhai@pobox.com> | 2008-03-27 15:24:05 -0400 |
|---|---|---|
| committer | Nalin Dahyabhai <nalin.dahyabhai@pobox.com> | 2008-03-27 15:24:05 -0400 |
| commit | 8f884257d95ea9ebec742b59dc83d7defdae117d (patch) | |
| tree | 8d49b573e9f96e2d4fddcb8fa01031265e03934f /src/dispatch.c | |
| parent | bdd74a017f4f1456b63e3b621064adde8e739ac8 (diff) | |
| download | slapi-nis-8f884257d95ea9ebec742b59dc83d7defdae117d.tar.gz slapi-nis-8f884257d95ea9ebec742b59dc83d7defdae117d.tar.xz slapi-nis-8f884257d95ea9ebec742b59dc83d7defdae117d.zip | |
more cleanup of client-handling code
Diffstat (limited to 'src/dispatch.c')
| -rw-r--r-- | src/dispatch.c | 212 |
1 files changed, 196 insertions, 16 deletions
diff --git a/src/dispatch.c b/src/dispatch.c index 9435193..c61b9c3 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -3,6 +3,7 @@ #include <arpa/inet.h> #include <assert.h> #include <errno.h> +#include <fcntl.h> #include <poll.h> #include <pthread.h> #include <stdlib.h> @@ -24,7 +25,200 @@ #include "plugin.h" #include "portmap.h" #include "schema.h" -#include "stream.h" + +#define MAX_CLIENT_IDLE (60 * 1000) + +struct dispatch_stream_client_params { + int client; + struct plugin_state *state; +}; + +/* Handle requests from one stream client. */ +static void * +dispatch_stream_handler_thread(void *arg) +{ + int fraglen, flags, ret, frag_len, record_len, client; + ssize_t i; + int32_t len, nlen; + char fragment[65540], record[65536]; + struct pollfd pollfd; + struct dispatch_stream_client_params *params; + struct plugin_state *state; + bool_t done, last; + + /* Recover the full set of parameters. */ + params = arg; + state = params->state; + client = params->client; + free(arg); + + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "opened client connection %d, thread started\n", + client); + + /* Set the connection to be non-blocking. */ + flags = fcntl(client, F_GETFD); + if ((flags == -1) || + (fcntl(client, F_SETFD, flags | O_NONBLOCK) == -1)) { + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "error setting new client connection to be " + "non-blocking\n"); + close(client); + return NULL; + } + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "set new client connection to be non-blocking\n"); + + /* Assemble fragments and responses. */ + frag_len = 0; + record_len = 0; + for (;;) { + /* Wait for incoming data. */ + pollfd.fd = client; + pollfd.events = POLLIN | POLLHUP; + switch (poll(&pollfd, 1, MAX_CLIENT_IDLE)) { + case -1: + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "communication error\n"); + goto done; + break; + case 0: + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "client timeout\n"); + goto done; + break; + default: + break; + } + /* Try to read as much data as we have space to hold. */ + i = read(client, fragment + frag_len, + sizeof(fragment) - frag_len); + switch (i) { + case -1: + if (errno == EAGAIN) { + continue; + } + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "communication error\n"); + goto done; + break; + case 0: + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "client closed connection\n"); + goto done; + break; + default: + break; + } + /* We managed to read some piece of a fragment. */ + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "read %d bytes\n", i); + frag_len += i; + /* If we can't know the fragment size yet, then wait for more + * data. */ + if (frag_len < 4) { + continue; + } + /* Convert the fragment length information to local byte order, + * and check if this is the last fragment in the record. */ + memcpy(&nlen, fragment, 4); + len = ntohl(nlen); + last = ((len & 0x80000000) != 0); + len &= 0x7fffffff; + /* If we don't have a complete fragment, then wait for more + * data. */ + if (frag_len < (len + 4)) { + continue; + } + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "expecting %d bytes\n", len); + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "in: frag_len=%d,record=%d\n", + frag_len, record_len); + /* Pull the fragment out of the buffer. */ + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "got whole fragment (%d bytes)\n", len); + /* Append fragment to the record buffer. */ + memmove(record + record_len, fragment + 4, len); + record_len += len; + /* Remove the fragment from the fragment buffer. */ + memmove(fragment, fragment + (len + 4), frag_len - (len + 4)); + frag_len -= (len + 4); + /* If this is the last fragment in the record, then we can + * process the record and reset the record buffer. */ + if (last) { + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "got whole record (%d bytes)\n", + record_len); + nis_process_request(state, client, NULL, 0, + record, record_len); + record_len = 0; + } + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "out: frag_len=%d,record=%d\n", + frag_len, record_len); + } +done: + close(client); + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "closed client connection %d, thread ending\n", + client); + return NULL; +} + +/* Handle a stream client -- answer the connection and spawn a thread to handle + * its actual requests. */ +static void +dispatch_stream(struct plugin_state *state, int fd) +{ + struct sockaddr_storage client_addr; + socklen_t client_addrlen; + int client; + pthread_t thread; + struct dispatch_stream_client_params *params; + + /* Answer the connection request. */ + client = accept(fd, (struct sockaddr *) &client_addr, &client_addrlen); + if (client == -1) { + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "failed to answer new stream request\n"); + return; + } + + /* Bundle up enough info for the thread to do its work. */ + params = malloc(sizeof(*params)); + if (params == NULL) { + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "out of memory\n"); + close(client); + return; + } + params->client = client; + params->state = state; + + /* Kick off the thread. */ + if (pthread_create(&thread, NULL, + &dispatch_stream_handler_thread, params) != 0) { + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "error starting thread\n"); + close(client); + } + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "started client-specific thread\n"); + return; +} /* Handle a datagram client -- read the request and handle it immediately. */ static void @@ -48,22 +242,8 @@ dispatch_dgram(struct plugin_state *state, int fd) dgram, reqsize); } -/* Handle a stream client -- answer the connection and spawn a thread to handle - * its requests. */ -static void -dispatch_stream(struct plugin_state *state, int fd) -{ - struct sockaddr_storage client_addr; - socklen_t client_addrlen; - int client; - client = accept(fd, (struct sockaddr *) &client_addr, &client_addrlen); - if (client != -1) { - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "new stream request\n"); - stream_client_start(state, client); - } -} +/* Handle all incoming data. */ void * dispatch_thread(void *p) { |
