summaryrefslogtreecommitdiffstats
path: root/src/dispatch.c
diff options
context:
space:
mode:
authorNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-03-27 15:24:05 -0400
committerNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-03-27 15:24:05 -0400
commit8f884257d95ea9ebec742b59dc83d7defdae117d (patch)
tree8d49b573e9f96e2d4fddcb8fa01031265e03934f /src/dispatch.c
parentbdd74a017f4f1456b63e3b621064adde8e739ac8 (diff)
downloadslapi-nis-8f884257d95ea9ebec742b59dc83d7defdae117d.tar.gz
slapi-nis-8f884257d95ea9ebec742b59dc83d7defdae117d.tar.xz
slapi-nis-8f884257d95ea9ebec742b59dc83d7defdae117d.zip
more cleanup of client-handling code
Diffstat (limited to 'src/dispatch.c')
-rw-r--r--src/dispatch.c212
1 files changed, 196 insertions, 16 deletions
diff --git a/src/dispatch.c b/src/dispatch.c
index 9435193..c61b9c3 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -3,6 +3,7 @@
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
+#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
@@ -24,7 +25,200 @@
#include "plugin.h"
#include "portmap.h"
#include "schema.h"
-#include "stream.h"
+
+#define MAX_CLIENT_IDLE (60 * 1000)
+
+struct dispatch_stream_client_params {
+ int client;
+ struct plugin_state *state;
+};
+
+/* Handle requests from one stream client. */
+static void *
+dispatch_stream_handler_thread(void *arg)
+{
+ int fraglen, flags, ret, frag_len, record_len, client;
+ ssize_t i;
+ int32_t len, nlen;
+ char fragment[65540], record[65536];
+ struct pollfd pollfd;
+ struct dispatch_stream_client_params *params;
+ struct plugin_state *state;
+ bool_t done, last;
+
+ /* Recover the full set of parameters. */
+ params = arg;
+ state = params->state;
+ client = params->client;
+ free(arg);
+
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "opened client connection %d, thread started\n",
+ client);
+
+ /* Set the connection to be non-blocking. */
+ flags = fcntl(client, F_GETFD);
+ if ((flags == -1) ||
+ (fcntl(client, F_SETFD, flags | O_NONBLOCK) == -1)) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "error setting new client connection to be "
+ "non-blocking\n");
+ close(client);
+ return NULL;
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "set new client connection to be non-blocking\n");
+
+ /* Assemble fragments and responses. */
+ frag_len = 0;
+ record_len = 0;
+ for (;;) {
+ /* Wait for incoming data. */
+ pollfd.fd = client;
+ pollfd.events = POLLIN | POLLHUP;
+ switch (poll(&pollfd, 1, MAX_CLIENT_IDLE)) {
+ case -1:
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "communication error\n");
+ goto done;
+ break;
+ case 0:
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "client timeout\n");
+ goto done;
+ break;
+ default:
+ break;
+ }
+ /* Try to read as much data as we have space to hold. */
+ i = read(client, fragment + frag_len,
+ sizeof(fragment) - frag_len);
+ switch (i) {
+ case -1:
+ if (errno == EAGAIN) {
+ continue;
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "communication error\n");
+ goto done;
+ break;
+ case 0:
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "client closed connection\n");
+ goto done;
+ break;
+ default:
+ break;
+ }
+ /* We managed to read some piece of a fragment. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "read %d bytes\n", i);
+ frag_len += i;
+ /* If we can't know the fragment size yet, then wait for more
+ * data. */
+ if (frag_len < 4) {
+ continue;
+ }
+ /* Convert the fragment length information to local byte order,
+ * and check if this is the last fragment in the record. */
+ memcpy(&nlen, fragment, 4);
+ len = ntohl(nlen);
+ last = ((len & 0x80000000) != 0);
+ len &= 0x7fffffff;
+ /* If we don't have a complete fragment, then wait for more
+ * data. */
+ if (frag_len < (len + 4)) {
+ continue;
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "expecting %d bytes\n", len);
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "in: frag_len=%d,record=%d\n",
+ frag_len, record_len);
+ /* Pull the fragment out of the buffer. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "got whole fragment (%d bytes)\n", len);
+ /* Append fragment to the record buffer. */
+ memmove(record + record_len, fragment + 4, len);
+ record_len += len;
+ /* Remove the fragment from the fragment buffer. */
+ memmove(fragment, fragment + (len + 4), frag_len - (len + 4));
+ frag_len -= (len + 4);
+ /* If this is the last fragment in the record, then we can
+ * process the record and reset the record buffer. */
+ if (last) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "got whole record (%d bytes)\n",
+ record_len);
+ nis_process_request(state, client, NULL, 0,
+ record, record_len);
+ record_len = 0;
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "out: frag_len=%d,record=%d\n",
+ frag_len, record_len);
+ }
+done:
+ close(client);
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "closed client connection %d, thread ending\n",
+ client);
+ return NULL;
+}
+
+/* Handle a stream client -- answer the connection and spawn a thread to handle
+ * its actual requests. */
+static void
+dispatch_stream(struct plugin_state *state, int fd)
+{
+ struct sockaddr_storage client_addr;
+ socklen_t client_addrlen;
+ int client;
+ pthread_t thread;
+ struct dispatch_stream_client_params *params;
+
+ /* Answer the connection request. */
+ client = accept(fd, (struct sockaddr *) &client_addr, &client_addrlen);
+ if (client == -1) {
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "failed to answer new stream request\n");
+ return;
+ }
+
+ /* Bundle up enough info for the thread to do its work. */
+ params = malloc(sizeof(*params));
+ if (params == NULL) {
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "out of memory\n");
+ close(client);
+ return;
+ }
+ params->client = client;
+ params->state = state;
+
+ /* Kick off the thread. */
+ if (pthread_create(&thread, NULL,
+ &dispatch_stream_handler_thread, params) != 0) {
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "error starting thread\n");
+ close(client);
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "started client-specific thread\n");
+ return;
+}
/* Handle a datagram client -- read the request and handle it immediately. */
static void
@@ -48,22 +242,8 @@ dispatch_dgram(struct plugin_state *state, int fd)
dgram, reqsize);
}
-/* Handle a stream client -- answer the connection and spawn a thread to handle
- * its requests. */
-static void
-dispatch_stream(struct plugin_state *state, int fd)
-{
- struct sockaddr_storage client_addr;
- socklen_t client_addrlen;
- int client;
- client = accept(fd, (struct sockaddr *) &client_addr, &client_addrlen);
- if (client != -1) {
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "new stream request\n");
- stream_client_start(state, client);
- }
-}
+/* Handle all incoming data. */
void *
dispatch_thread(void *p)
{