summaryrefslogtreecommitdiffstats
path: root/src/dispatch.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dispatch.c')
-rw-r--r--src/dispatch.c87
1 files changed, 28 insertions, 59 deletions
diff --git a/src/dispatch.c b/src/dispatch.c
index f72aceb..8a9c981 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -30,7 +30,6 @@
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
-#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <syslog.h>
@@ -126,58 +125,6 @@ dispatch_reply_dgram(struct plugin_state *state,
TRUE, TRUE);
}
-/* Send a reply, buffered-for-connected-clients version. */
-static ssize_t
-dispatch_write_with_retry(struct plugin_state *state, int fd,
- const void *buffer1, ssize_t length1,
- const void *buffer2, ssize_t length2)
-{
- ssize_t sent, i;
- struct iovec iov[2];
- int iovc;
- sent = 0;
- while ((sent != -1) && (sent < (length1 + length2))) {
- if (sent < length1) {
- iov[0].iov_base = ((char *) buffer1) + sent;
- iov[0].iov_len = length1 - sent;
- iov[1].iov_base = (char *) buffer2;
- iov[1].iov_len = length2;
- iovc = 2;
- } else {
- iov[0].iov_base = ((char *) buffer2) + (sent - length1);
- iov[0].iov_len = length2 - (sent - length1);
- iovc = 1;
- }
- i = writev(fd, &iov[0], iovc);
- switch (i) {
- case 0:
- sent = -1;
- break;
- case -1:
- switch (errno) {
- case EAGAIN:
- continue;
- break;
- default:
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "got error %s sending "
- "%d bytes (at %d) to %d.\n",
- strerror(errno),
- length1 + length2,
- sent, fd);
- sent = -1;
- break;
- }
- break;
- default:
- sent += i;
- break;
- }
- }
- return sent;
-}
-
static bool_t
dispatch_reply_fragment_connected(struct plugin_state *state,
struct dispatch_client_data *cdata,
@@ -534,20 +481,21 @@ dispatch_service_client(struct plugin_state *state,
}
void *
-dispatch_thread(void *p)
+dispatch_thread(struct wrapped_thread *t)
{
struct dispatch_client *clients, *client, *next, **list;
struct dispatch_client_data client_data;
- struct plugin_state *state = p;
+ struct plugin_state *state = wrap_thread_arg(t);
struct pollfd *fds;
int i, n_fds, client_count;
ssize_t count;
+ bool_t stop;
clients = NULL;
client_count = 0;
fds = NULL;
- for (;;) {
+ for (stop = FALSE; !stop;) {
/* Prune out recently-disconnected clients. */
list = &clients;
while (*list != NULL) {
@@ -583,10 +531,11 @@ dispatch_thread(void *p)
client_count = i;
}
if (fds == NULL) {
- fds = malloc((4 + client_count) * sizeof(fds[0]));
+ fds = malloc((state->n_listeners + client_count + 1) *
+ sizeof(fds[0]));
if (fds == NULL) {
/* Wait a bit, then try again? */
- poll(NULL, 0, 10 * 1000);
+ poll(NULL, 0, 10000);
continue;
}
}
@@ -595,11 +544,22 @@ dispatch_thread(void *p)
"%d connected clients\n", i);
/* Fill in the set of polling descriptors. */
- memset(fds, 0, sizeof((4 + client_count) * sizeof(fds[0])));
+ memset(fds, 0,
+ sizeof((state->n_listeners + client_count + 1) *
+ sizeof(fds[0])));
for (i = 0; i < state->n_listeners; i++) {
fds[i].fd = state->listener[i].fd;
fds[i].events = POLLIN;
}
+
+ /* Add the shutdown pipe reader. */
+ fds[i].fd = wrap_thread_stopfd(t);
+ fds[i].events = POLLIN;
+ if (fds[i].fd != -1) {
+ i++;
+ }
+
+ /* Add the client list. */
client = clients;
while (client != NULL) {
fds[i].fd = client->client_fd;
@@ -674,6 +634,12 @@ dispatch_thread(void *p)
}
}
+ /* Add the shutdown pipe reader. */
+ if (fds[i].revents & POLLIN) {
+ stop = TRUE;
+ }
+ i++;
+
/* Service the already-connected clients. */
for (; i < n_fds; i++, client = client->client_next) {
assert(client != NULL);
@@ -681,5 +647,8 @@ dispatch_thread(void *p)
dispatch_service_client(state, client, &fds[i]);
}
}
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "listening thread stopping\n");
return state;
}