From 6c316e9ae234a97aa0cf6c240bcf38e35a53ae7c Mon Sep 17 00:00:00 2001 From: Nalin Dahyabhai Date: Fri, 30 May 2008 17:47:28 -0400 Subject: - sort out the threading start/stop functions, and add rwlock functions --- src/Makefile.am | 3 +- src/dispatch.c | 87 +++++++++++++++++++-------------------------------------- src/dispatch.h | 3 +- src/map.c | 25 ++++++++++++++++- src/map.h | 3 ++ src/plugin.c | 28 ++----------------- src/plugin.h | 10 ++----- 7 files changed, 65 insertions(+), 94 deletions(-) (limited to 'src') diff --git a/src/Makefile.am b/src/Makefile.am index 95734e1..6eb8724 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -12,7 +12,8 @@ nis_plugin_la_SOURCES = \ map.c \ nis.c \ plugin.c \ - portmap.c + portmap.c \ + wrap.c nis_plugin_la_LIBADD = $(RUNTIME_LIBS) -lnsl -lpthread noinst_LTLIBRARIES = dummy-nis-plugin.la diff --git a/src/dispatch.c b/src/dispatch.c index f72aceb..8a9c981 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -30,7 +30,6 @@ #include #include #include -#include #include #include #include @@ -126,58 +125,6 @@ dispatch_reply_dgram(struct plugin_state *state, TRUE, TRUE); } -/* Send a reply, buffered-for-connected-clients version. */ -static ssize_t -dispatch_write_with_retry(struct plugin_state *state, int fd, - const void *buffer1, ssize_t length1, - const void *buffer2, ssize_t length2) -{ - ssize_t sent, i; - struct iovec iov[2]; - int iovc; - sent = 0; - while ((sent != -1) && (sent < (length1 + length2))) { - if (sent < length1) { - iov[0].iov_base = ((char *) buffer1) + sent; - iov[0].iov_len = length1 - sent; - iov[1].iov_base = (char *) buffer2; - iov[1].iov_len = length2; - iovc = 2; - } else { - iov[0].iov_base = ((char *) buffer2) + (sent - length1); - iov[0].iov_len = length2 - (sent - length1); - iovc = 1; - } - i = writev(fd, &iov[0], iovc); - switch (i) { - case 0: - sent = -1; - break; - case -1: - switch (errno) { - case EAGAIN: - continue; - break; - default: - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "got error %s sending " - "%d bytes (at %d) to %d.\n", - strerror(errno), - length1 + length2, - sent, fd); - sent = -1; - break; - } - break; - default: - sent += i; - break; - } - } - return sent; -} - static bool_t dispatch_reply_fragment_connected(struct plugin_state *state, struct dispatch_client_data *cdata, @@ -534,20 +481,21 @@ dispatch_service_client(struct plugin_state *state, } void * -dispatch_thread(void *p) +dispatch_thread(struct wrapped_thread *t) { struct dispatch_client *clients, *client, *next, **list; struct dispatch_client_data client_data; - struct plugin_state *state = p; + struct plugin_state *state = wrap_thread_arg(t); struct pollfd *fds; int i, n_fds, client_count; ssize_t count; + bool_t stop; clients = NULL; client_count = 0; fds = NULL; - for (;;) { + for (stop = FALSE; !stop;) { /* Prune out recently-disconnected clients. */ list = &clients; while (*list != NULL) { @@ -583,10 +531,11 @@ dispatch_thread(void *p) client_count = i; } if (fds == NULL) { - fds = malloc((4 + client_count) * sizeof(fds[0])); + fds = malloc((state->n_listeners + client_count + 1) * + sizeof(fds[0])); if (fds == NULL) { /* Wait a bit, then try again? */ - poll(NULL, 0, 10 * 1000); + poll(NULL, 0, 10000); continue; } } @@ -595,11 +544,22 @@ dispatch_thread(void *p) "%d connected clients\n", i); /* Fill in the set of polling descriptors. */ - memset(fds, 0, sizeof((4 + client_count) * sizeof(fds[0]))); + memset(fds, 0, + sizeof((state->n_listeners + client_count + 1) * + sizeof(fds[0]))); for (i = 0; i < state->n_listeners; i++) { fds[i].fd = state->listener[i].fd; fds[i].events = POLLIN; } + + /* Add the shutdown pipe reader. */ + fds[i].fd = wrap_thread_stopfd(t); + fds[i].events = POLLIN; + if (fds[i].fd != -1) { + i++; + } + + /* Add the client list. */ client = clients; while (client != NULL) { fds[i].fd = client->client_fd; @@ -674,6 +634,12 @@ dispatch_thread(void *p) } } + /* Add the shutdown pipe reader. */ + if (fds[i].revents & POLLIN) { + stop = TRUE; + } + i++; + /* Service the already-connected clients. */ for (; i < n_fds; i++, client = client->client_next) { assert(client != NULL); @@ -681,5 +647,8 @@ dispatch_thread(void *p) dispatch_service_client(state, client, &fds[i]); } } + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "listening thread stopping\n"); return state; } diff --git a/src/dispatch.h b/src/dispatch.h index 97ade14..ea4813e 100644 --- a/src/dispatch.h +++ b/src/dispatch.h @@ -22,8 +22,9 @@ #ifndef dispatch_h #define dispatch_h struct plugin_state; +struct wrapped_thread; struct dispatch_client_data; -void *dispatch_thread(void *p); +void *dispatch_thread(struct wrapped_thread *t); typedef bool_t (dispatch_reply_fragment)(struct plugin_state *state, struct dispatch_client_data *cdata, struct rpc_msg *reply, diff --git a/src/map.c b/src/map.c index 4b936c2..bdde29c 100644 --- a/src/map.c +++ b/src/map.c @@ -24,7 +24,6 @@ #endif #include -#include #include #include #include @@ -45,6 +44,7 @@ #include "dispatch.h" #include "map.h" #include "portmap.h" +#include "wrap.h" /* The singleton for the cache. */ struct { @@ -80,6 +80,7 @@ struct { int n_maps; } *domains; int n_domains; + struct wrapped_rwlock *lock; } map_data; static void * @@ -813,6 +814,28 @@ int map_init(struct slapi_pblock *pb, struct plugin_state *state) { memset(&map_data, 0, sizeof(map_data)); + map_data.lock = wrap_new_rwlock(); + if (map_data.lock == NULL) { + return -1; + } backend_init(pb, state); return 0; } + +void +map_rdlock(void) +{ + wrap_rwlock_rdlock(map_data.lock); +} + +void +map_wrlock(void) +{ + wrap_rwlock_wrlock(map_data.lock); +} + +void +map_unlock(void) +{ + wrap_rwlock_unlock(map_data.lock); +} diff --git a/src/map.h b/src/map.h index 0f712c6..017cb72 100644 --- a/src/map.h +++ b/src/map.h @@ -95,4 +95,7 @@ bool_t map_data_foreach_map(struct plugin_state *state, const char *domain_name, void *backend_data, void *cbdata), void *cbdata); +void map_rdlock(void); +void map_wrlock(void); +void map_unlock(void); #endif diff --git a/src/plugin.c b/src/plugin.c index 3219f4b..1b54d81 100644 --- a/src/plugin.c +++ b/src/plugin.c @@ -47,18 +47,13 @@ #include #endif -#ifdef USE_PTHREADS -#include -#endif - +#include "wrap.h" #include "dispatch.h" #include "map.h" #include "nis.h" #include "plugin.h" #include "portmap.h" -#define PACKAGE_VERSION "0.0" - /* the module initialization function */ static Slapi_PluginDesc plugin_description = { @@ -115,32 +110,14 @@ plugin_startup(Slapi_PBlock *pb) } } } -#if defined(USE_PTHREADS) /* Start a new listening thread to handle incoming traffic. */ - if (pthread_create(&state->tid, NULL, &dispatch_thread, state) != 0) { - slapi_log_error(SLAPI_LOG_PLUGIN, - plugin_description.spd_id, - "error starting listener thread\n"); - return -1; - } -#elif defined(USE_NSPR_THREADS) - /* Start a new listening thread to handle incoming traffic. */ - state->tid = PR_CreateThread(PR_USER_THREAD, - &dispatch_thread, - state, - PR_PRIORITY_NORMAL, - PR_GLOBAL_THREAD, - PR_JOINABLE_THREAD, - 0); + state->tid = wrap_start_thread(&dispatch_thread, state); if (state->tid == NULL) { slapi_log_error(SLAPI_LOG_PLUGIN, plugin_description.spd_id, "error starting listener thread\n"); return -1; } -#else -#error "Don't know how to start a thread for your server!" -#endif slapi_log_error(SLAPI_LOG_PLUGIN, plugin_description.spd_id, "plugin startup completed\n"); return 0; @@ -152,6 +129,7 @@ plugin_shutdown(Slapi_PBlock *pb) { struct plugin_state *state; slapi_pblock_get(pb, SLAPI_PLUGIN_PRIVATE, &state); + wrap_stop_thread(state->tid); if (state->pmap_client_socket != -1) { /* Clear our registration with the portmapper. */ portmap_unregister(plugin_description.spd_id, diff --git a/src/plugin.h b/src/plugin.h index bce8112..13c06c0 100644 --- a/src/plugin.h +++ b/src/plugin.h @@ -22,14 +22,10 @@ #ifndef plugin_h #define plugin_h +#include "wrap.h" + struct plugin_state { -#ifdef USE_PTHREADS - pthread_t tid; -#elif defined(USE_NSPR_THREADS) - PRThread *tid; -#else -#error "Don't know which threading model to use!" -#endif + struct wrapped_thread *tid; char *plugin_base; Slapi_ComponentId *plugin_identity; Slapi_PluginDesc *plugin_desc; -- cgit