summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-03-27 15:24:05 -0400
committerNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-03-27 15:24:05 -0400
commit8f884257d95ea9ebec742b59dc83d7defdae117d (patch)
tree8d49b573e9f96e2d4fddcb8fa01031265e03934f /src
parentbdd74a017f4f1456b63e3b621064adde8e739ac8 (diff)
downloadslapi-nis-8f884257d95ea9ebec742b59dc83d7defdae117d.tar.gz
slapi-nis-8f884257d95ea9ebec742b59dc83d7defdae117d.tar.xz
slapi-nis-8f884257d95ea9ebec742b59dc83d7defdae117d.zip
more cleanup of client-handling code
Diffstat (limited to 'src')
-rw-r--r--src/Makefile2
-rw-r--r--src/dispatch.c212
-rw-r--r--src/plugin.c1
-rw-r--r--src/stream.c171
-rw-r--r--src/stream.h8
5 files changed, 197 insertions, 197 deletions
diff --git a/src/Makefile b/src/Makefile
index ce610e9..884afc4 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -3,7 +3,7 @@ LDFLAGS = -lnsl -lpthread
all:: plugin.so portmap
-plugin.so: dispatch.c plugin.c portmap.c schema.c stream.c nis.c
+plugin.so: dispatch.c plugin.c portmap.c schema.c nis.c
$(CC) $(CFLAGS) -shared -o $@ $^ $(LDFLAGS)
portmap: portmap.c
$(CC) $(CFLAGS) -o $@ -DPORTMAP_MAIN $^ $(LDFLAGS)
diff --git a/src/dispatch.c b/src/dispatch.c
index 9435193..c61b9c3 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -3,6 +3,7 @@
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
+#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
@@ -24,7 +25,200 @@
#include "plugin.h"
#include "portmap.h"
#include "schema.h"
-#include "stream.h"
+
+#define MAX_CLIENT_IDLE (60 * 1000)
+
+struct dispatch_stream_client_params {
+ int client;
+ struct plugin_state *state;
+};
+
+/* Handle requests from one stream client. */
+static void *
+dispatch_stream_handler_thread(void *arg)
+{
+ int fraglen, flags, ret, frag_len, record_len, client;
+ ssize_t i;
+ int32_t len, nlen;
+ char fragment[65540], record[65536];
+ struct pollfd pollfd;
+ struct dispatch_stream_client_params *params;
+ struct plugin_state *state;
+ bool_t done, last;
+
+ /* Recover the full set of parameters. */
+ params = arg;
+ state = params->state;
+ client = params->client;
+ free(arg);
+
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "opened client connection %d, thread started\n",
+ client);
+
+ /* Set the connection to be non-blocking. */
+ flags = fcntl(client, F_GETFD);
+ if ((flags == -1) ||
+ (fcntl(client, F_SETFD, flags | O_NONBLOCK) == -1)) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "error setting new client connection to be "
+ "non-blocking\n");
+ close(client);
+ return NULL;
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "set new client connection to be non-blocking\n");
+
+ /* Assemble fragments and responses. */
+ frag_len = 0;
+ record_len = 0;
+ for (;;) {
+ /* Wait for incoming data. */
+ pollfd.fd = client;
+ pollfd.events = POLLIN | POLLHUP;
+ switch (poll(&pollfd, 1, MAX_CLIENT_IDLE)) {
+ case -1:
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "communication error\n");
+ goto done;
+ break;
+ case 0:
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "client timeout\n");
+ goto done;
+ break;
+ default:
+ break;
+ }
+ /* Try to read as much data as we have space to hold. */
+ i = read(client, fragment + frag_len,
+ sizeof(fragment) - frag_len);
+ switch (i) {
+ case -1:
+ if (errno == EAGAIN) {
+ continue;
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "communication error\n");
+ goto done;
+ break;
+ case 0:
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "client closed connection\n");
+ goto done;
+ break;
+ default:
+ break;
+ }
+ /* We managed to read some piece of a fragment. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "read %d bytes\n", i);
+ frag_len += i;
+ /* If we can't know the fragment size yet, then wait for more
+ * data. */
+ if (frag_len < 4) {
+ continue;
+ }
+ /* Convert the fragment length information to local byte order,
+ * and check if this is the last fragment in the record. */
+ memcpy(&nlen, fragment, 4);
+ len = ntohl(nlen);
+ last = ((len & 0x80000000) != 0);
+ len &= 0x7fffffff;
+ /* If we don't have a complete fragment, then wait for more
+ * data. */
+ if (frag_len < (len + 4)) {
+ continue;
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "expecting %d bytes\n", len);
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "in: frag_len=%d,record=%d\n",
+ frag_len, record_len);
+ /* Pull the fragment out of the buffer. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "got whole fragment (%d bytes)\n", len);
+ /* Append fragment to the record buffer. */
+ memmove(record + record_len, fragment + 4, len);
+ record_len += len;
+ /* Remove the fragment from the fragment buffer. */
+ memmove(fragment, fragment + (len + 4), frag_len - (len + 4));
+ frag_len -= (len + 4);
+ /* If this is the last fragment in the record, then we can
+ * process the record and reset the record buffer. */
+ if (last) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "got whole record (%d bytes)\n",
+ record_len);
+ nis_process_request(state, client, NULL, 0,
+ record, record_len);
+ record_len = 0;
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "out: frag_len=%d,record=%d\n",
+ frag_len, record_len);
+ }
+done:
+ close(client);
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "closed client connection %d, thread ending\n",
+ client);
+ return NULL;
+}
+
+/* Handle a stream client -- answer the connection and spawn a thread to handle
+ * its actual requests. */
+static void
+dispatch_stream(struct plugin_state *state, int fd)
+{
+ struct sockaddr_storage client_addr;
+ socklen_t client_addrlen;
+ int client;
+ pthread_t thread;
+ struct dispatch_stream_client_params *params;
+
+ /* Answer the connection request. */
+ client = accept(fd, (struct sockaddr *) &client_addr, &client_addrlen);
+ if (client == -1) {
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "failed to answer new stream request\n");
+ return;
+ }
+
+ /* Bundle up enough info for the thread to do its work. */
+ params = malloc(sizeof(*params));
+ if (params == NULL) {
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "out of memory\n");
+ close(client);
+ return;
+ }
+ params->client = client;
+ params->state = state;
+
+ /* Kick off the thread. */
+ if (pthread_create(&thread, NULL,
+ &dispatch_stream_handler_thread, params) != 0) {
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "error starting thread\n");
+ close(client);
+ }
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "started client-specific thread\n");
+ return;
+}
/* Handle a datagram client -- read the request and handle it immediately. */
static void
@@ -48,22 +242,8 @@ dispatch_dgram(struct plugin_state *state, int fd)
dgram, reqsize);
}
-/* Handle a stream client -- answer the connection and spawn a thread to handle
- * its requests. */
-static void
-dispatch_stream(struct plugin_state *state, int fd)
-{
- struct sockaddr_storage client_addr;
- socklen_t client_addrlen;
- int client;
- client = accept(fd, (struct sockaddr *) &client_addr, &client_addrlen);
- if (client != -1) {
- slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "new stream request\n");
- stream_client_start(state, client);
- }
-}
+/* Handle all incoming data. */
void *
dispatch_thread(void *p)
{
diff --git a/src/plugin.c b/src/plugin.c
index 7741593..4ceb98d 100644
--- a/src/plugin.c
+++ b/src/plugin.c
@@ -28,7 +28,6 @@
#include "plugin.h"
#include "portmap.h"
#include "schema.h"
-#include "stream.h"
#define PACKAGE_VERSION "0.0"
diff --git a/src/stream.c b/src/stream.c
deleted file mode 100644
index dbb53bf..0000000
--- a/src/stream.c
+++ /dev/null
@@ -1,171 +0,0 @@
-#ifdef HAVE_CONFIG_H
-#include "../config.h"
-#endif
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <poll.h>
-#include <pthread.h>
-#include <unistd.h>
-
-#include "nis.h"
-#include "stream.h"
-#define MAX_CLIENT_IDLE (60 * 1000)
-
-struct stream_client_thread_parms {
- int client;
- struct plugin_state *state;
-};
-
-static void *
-stream_client_thread(void *arg)
-{
- int i, fraglen, last, ret, frag_len, record_len;
- int32_t len, nlen;
- char fragment[65536], record[65536];
- struct pollfd pollfd;
-
- struct stream_client_thread_parms *parms = arg;
- struct plugin_state *state = parms->state;
- int done, client = parms->client;
- free(parms);
-
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "opened client connection %d, thread started\n",
- client);
- last = fcntl(client, F_GETFD);
- if ((last == -1) ||
- (fcntl(client, F_SETFD, last | O_NONBLOCK) == -1)) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "error setting new client connection to be "
- "non-blocking\n");
- close(client);
- return NULL;
- }
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "set new client connection to be non-blocking\n");
- frag_len = 0;
- record_len = 0;
- done = 0;
- while (!done) {
- pollfd.fd = client;
- pollfd.events = POLLIN | POLLHUP;
- if (poll(&pollfd, 1, MAX_CLIENT_IDLE) > 0) {
- i = read(client, fragment + frag_len,
- sizeof(fragment) - frag_len);
- switch (i) {
- case -1:
- if (errno == EAGAIN) {
- continue;
- }
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "communication error\n");
- done = 1;
- break;
- case 0:
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "client closed connection\n");
- done = 1;
- break;
- default:
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "read %d bytes\n", i);
- frag_len += i;
- break;
- }
- if (frag_len > 4) {
- memcpy(&nlen, fragment, 4);
- len = ntohl(nlen);
- last = ((len & 0x80000000) != 0);
- len &= 0x7fffffff;
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "expecting %d bytes\n", len);
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "in: frag_len=%d,record=%d\n",
- frag_len, record_len);
- if (frag_len >= len + 4) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "got whole fragment "
- "(%d bytes)\n", len);
- /* we have a whole fragment */
- memmove(record + record_len,
- fragment + 4, len);
- record_len += len;
- if (last) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "got whole "
- "record (%d "
- "bytes)\n",
- record_len);
- /* we have a whole record */
- nis_process_request(state,
- client,
- NULL, 0,
- record,
- record_len);
- record_len = 0;
- }
- memmove(fragment,
- fragment + (len + 4),
- frag_len - (len + 4));
- frag_len -= (len + 4);
- }
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "out: frag_len=%d,record=%d\n",
- frag_len, record_len);
- }
- } else {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "connection timeout\n");
- done = 1;
- }
- }
- close(client);
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "closed client connection %d, thread ending\n", client);
- return NULL;
-}
-
-void
-stream_client_start(struct plugin_state *state, int client)
-{
- pthread_t thread;
- struct stream_client_thread_parms *parms;
-
- parms = malloc(sizeof(*parms));
- if (parms == NULL) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "out of memory\n");
- close(client);
- return;
- }
- parms->client = client;
- parms->state = state;
-
- if (pthread_create(&thread, NULL, &stream_client_thread, parms) != 0) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "error starting thread\n");
- close(client);
- }
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "started client-specific thread\n");
- return;
-}
diff --git a/src/stream.h b/src/stream.h
deleted file mode 100644
index a5de9b9..0000000
--- a/src/stream.h
+++ /dev/null
@@ -1,8 +0,0 @@
-#ifndef stream_h
-#define stream_h
-
-#include "plugin.h"
-
-void stream_client_start(struct plugin_state *state, int client);
-
-#endif