diff options
| author | Nalin Dahyabhai <nalin.dahyabhai@pobox.com> | 2008-05-27 20:06:26 -0400 |
|---|---|---|
| committer | Nalin Dahyabhai <nalin.dahyabhai@pobox.com> | 2008-05-27 20:06:26 -0400 |
| commit | b64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5 (patch) | |
| tree | 5f57fd0d5074d411aa54a073e973033ac7c35ef6 /src/dispatch.c | |
| parent | b595fe2039de58f655e226e4e427530884d11d3f (diff) | |
| download | slapi-nis-b64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5.tar.gz slapi-nis-b64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5.tar.xz slapi-nis-b64ae5cb2f3adb40da0a61ddba1eb0d71f12b2b5.zip | |
- rewrite dispatching code so that
* we multiplex servicing for connected clients in the same thread that
handles datagram clients
* nis query routines use a callback rather than a symbol, so that...
* ...we can supply different callbacks for datagram and connected clients
* nis query routines have the option of giving us a chunk of output and
telling us to come back later for more, though for now only nis_all gets
the option of doing that, and currently write those chunks immediately in
a blocking way
Diffstat (limited to 'src/dispatch.c')
| -rw-r--r-- | src/dispatch.c | 721 |
1 files changed, 463 insertions, 258 deletions
diff --git a/src/dispatch.c b/src/dispatch.c index 34932f1..38197cc 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -26,17 +26,48 @@ #define MAX_CLIENT_IDLE (60 * 1000) -struct dispatch_stream_client_params { - int client; - struct plugin_state *state; -}; - struct dispatch_client_data { int client; struct sockaddr client_addr; socklen_t client_addrlen; }; +/* Send a reply, unbuffered datagram version. */ +static bool_t +dispatch_reply_fragment_dgram(struct plugin_state *state, + struct dispatch_client_data *cdata, + struct rpc_msg *reply, + XDR *reply_xdrs, char *reply_buf, + bool_t first_fragment, bool_t last_fragment) +{ + xdr_replymsg(reply_xdrs, reply); + if (!first_fragment || !last_fragment) { + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "Trying to sending datagram reply (%d bytes), " + "even though the reply is not suitable for " + "transmission as a datagram.\n", + xdr_getpos(reply_xdrs)); + } else { + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "Sending datagram reply (%d bytes).\n", + xdr_getpos(reply_xdrs)); + } + sendto(cdata->client, reply_buf, xdr_getpos(reply_xdrs), + 0, &cdata->client_addr, cdata->client_addrlen); + return TRUE; +} +static bool_t +dispatch_reply_dgram(struct plugin_state *state, + struct dispatch_client_data *cdata, + struct rpc_msg *reply, + XDR *reply_xdrs, char *reply_buf) +{ + return dispatch_reply_fragment_dgram(state, cdata, + reply, reply_xdrs, reply_buf, + TRUE, TRUE); +} + +/* Send a reply, buffered-for-connected-clients version. */ static ssize_t dispatch_write_with_retry(struct plugin_state *state, int fd, const void *buffer1, ssize_t length1, @@ -87,297 +118,449 @@ dispatch_write_with_retry(struct plugin_state *state, int fd, } return sent; } - -/* Send a reply which _could_ be sent over a datagram. */ -void -dispatch_reply_fragment(struct plugin_state *state, - struct dispatch_client_data *cdata, - struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf, - bool_t first_fragment, bool_t last_fragment) +static bool_t +dispatch_reply_fragment_connected(struct plugin_state *state, + struct dispatch_client_data *cdata, + struct rpc_msg *reply, + XDR *reply_xdrs, char *reply_buf, + bool_t first_fragment, bool_t last_fragment) { uint32_t len; - if (cdata->client_addrlen != 0) { - /* It's a datagram socket, so there's nothing to do but send - * the data by itself. */ + /* Record reply - first fragment. */ + if (first_fragment) { xdr_replymsg(reply_xdrs, reply); - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Sending datagram reply (%d bytes).\n", + } + /* Calculate the fragment length bytes. */ + len = htonl(xdr_getpos(reply_xdrs) | + (last_fragment ? 0x80000000 : 0)); + /* Send the data to the client. */ + if (dispatch_write_with_retry(state, cdata->client, + &len, 4, + reply_buf, + xdr_getpos(reply_xdrs)) == + 4 + xdr_getpos(reply_xdrs)) { + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "Stream reply (4+%d bytes) sent.\n", xdr_getpos(reply_xdrs)); - sendto(cdata->client, reply_buf, xdr_getpos(reply_xdrs), - 0, &cdata->client_addr, cdata->client_addrlen); + return TRUE; } else { - /* Record reply - one fragment. */ - if (first_fragment) { - xdr_replymsg(reply_xdrs, reply); - } - /* Calculate the fragment length bytes. */ - len = htonl(xdr_getpos(reply_xdrs) | - (last_fragment ? 0x80000000 : 0)); - /* Send the data to the client. */ - if (dispatch_write_with_retry(state, cdata->client, - &len, 4, - reply_buf, - xdr_getpos(reply_xdrs)) == - 4 + xdr_getpos(reply_xdrs)) { - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "Stream reply (4+%d bytes) sent.\n", - xdr_getpos(reply_xdrs)); - } else { - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "Stream reply (4+%d bytes) failed!\n", - xdr_getpos(reply_xdrs)); - } + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "Stream reply (4+%d bytes) failed!\n", + xdr_getpos(reply_xdrs)); + return FALSE; } } - -/* Send an entire reply datagram or record at once. */ -void -dispatch_reply(struct plugin_state *state, struct dispatch_client_data *cdata, - struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf) +/* Send an entire reply record at once. */ +bool_t +dispatch_reply_connected(struct plugin_state *state, + struct dispatch_client_data *cdata, + struct rpc_msg *reply, + XDR *reply_xdrs, char *reply_buf) { - dispatch_reply_fragment(state, cdata, - reply, reply_xdrs, reply_buf, TRUE, TRUE); + return dispatch_reply_fragment_connected(state, cdata, + reply, reply_xdrs, reply_buf, + TRUE, TRUE); } -/* Handle requests from one stream client. */ -static void * -dispatch_stream_handler_thread(void *arg) +/* Handle a datagram client -- read the request and handle it immediately. */ +static void +dispatch_dgram(struct plugin_state *state, int fd) { - int flags, frag_len, record_len; - ssize_t i; - int32_t len, nlen; - char fragment[65540], record[65536]; - struct pollfd pollfd; - struct dispatch_stream_client_params *params; struct dispatch_client_data cdata; - struct plugin_state *state; - bool_t last; + char dgram[65536]; + int reqsize; - /* Recover the full set of parameters. */ - params = arg; - state = params->state; - cdata.client = params->client; - memset(&cdata.client_addr, 0, sizeof(cdata.client_addr)); - cdata.client_addrlen = 0; - free(arg); + /* Read the request. */ + cdata.client = fd; + cdata.client_addrlen = sizeof(cdata.client_addr); + reqsize = recvfrom(fd, dgram, sizeof(dgram), 0, + &cdata.client_addr, &cdata.client_addrlen); + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "datagram request (%d bytes)\n", reqsize); - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "opened client connection %d, thread started\n", - cdata.client); + /* Handle the request. */ + nis_process_request(state, dgram, reqsize, + &dispatch_reply_fragment_dgram, + &dispatch_reply_dgram, + &cdata, NULL); +} - /* Set the connection to be non-blocking. */ - flags = fcntl(cdata.client, F_GETFD); - if ((flags == -1) || - (fcntl(cdata.client, F_SETFD, flags | O_NONBLOCK) == -1)) { - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "error setting new client connection to be " - "non-blocking\n"); - close(cdata.client); +/* Handle all incoming data. */ +struct dispatch_client { + /* The client socket and address. */ + int client_fd; + struct sockaddr client_addr; + socklen_t client_addrlen; + /* The client state. */ + enum { + client_invalid, + client_closing, + client_reading, + client_replying_with_more, + client_replying_final, + } client_state; + /* The client's request while we're reading it. */ + char client_inbuf[8192]; + ssize_t client_inbuf_used; + char *client_query; + ssize_t client_query_size; + void *client_query_cookie; + /* The reply to the client, when we're sending one. */ + char client_outbuf[4096]; + ssize_t client_outbuf_used; + /* This is a linked list. */ + struct dispatch_client *client_next; +}; + +/* Set the client's record up to start reading a new query. */ +static void +client_set_reading(struct plugin_state *state, struct dispatch_client *client) +{ + client->client_inbuf_used = 0; + free(client->client_query); + client->client_query = NULL; + client->client_query_size = 0; + client->client_outbuf_used = 0; + client->client_state = client_reading; +} + +/* Set the client's record up to be cleaned up. */ +static void +client_set_closing(struct plugin_state *state, struct dispatch_client *client) +{ + client->client_inbuf_used = 0; + free(client->client_query); + client->client_query = NULL; + client->client_query_size = 0; + client->client_outbuf_used = 0; + client->client_state = client_closing; +} + +/* Set the client's record up. */ +static struct dispatch_client * +dispatch_accept_client(struct plugin_state *state, int fd) +{ + struct dispatch_client *client; + client = malloc(sizeof(*client)); + if (client == NULL) { + return NULL; + } + fd = accept(fd, &client->client_addr, &client->client_addrlen); + if (fd == -1) { + free(client); return NULL; } - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "set new client connection to be non-blocking\n"); + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "new connected client on %d\n", fd); + memset(client, 0, sizeof(*client)); + client->client_fd = fd; + client_set_reading(state, client); + return client; +} - /* Assemble fragments and responses. */ - frag_len = 0; - record_len = 0; - for (;;) { - /* Wait for incoming data. */ - pollfd.fd = cdata.client; - pollfd.events = POLLIN | POLLHUP; - switch (poll(&pollfd, 1, MAX_CLIENT_IDLE)) { - case -1: - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "communication error\n"); - goto done; - break; - case 0: - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "client timeout\n"); - goto done; - break; - default: - break; +/* Decide what to do next. */ +static void +client_interpret_nis_result(struct plugin_state *state, + struct dispatch_client *client) +{ + if (client->client_query_cookie != NULL) { + client->client_state = client_replying_with_more; + } else { + if (client->client_outbuf_used > 0) { + client->client_state = client_replying_final; + } else { + client_set_reading(state, client); } - /* Try to read as much data as we have space to hold. */ - i = read(cdata.client, fragment + frag_len, - sizeof(fragment) - frag_len); - switch (i) { - case -1: - if (errno == EAGAIN) { - continue; + } +} + +/* Handle reading state. */ +static void +client_read(struct plugin_state *state, struct dispatch_client *client) +{ + ssize_t count; + int32_t len, nlen; + int last; + char *query; + struct dispatch_client_data client_data; + /* Try to read some data. */ + count = read(client->client_fd, + client->client_inbuf + client->client_inbuf_used, + sizeof(client->client_inbuf) - client->client_inbuf_used); + if (count <= 0) { + if ((count != -1) || (errno != EAGAIN)) { + /* Disconnect the client. */ + if (count == 0) { + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "no more data from %d, " + "marking for closing\n", + client->client_fd); + } else { + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "error reading from %d, " + "marking for closing\n", + client->client_fd); } - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "communication error\n"); - goto done; - break; - case 0: - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "client closed connection\n"); - goto done; - break; - default: - break; + client_set_closing(state, client); } - /* We managed to read some piece of a fragment. */ + } else { + /* Record the data as added to the fragment buffer. */ + client->client_inbuf_used += count; slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "read %d bytes\n", i); - frag_len += i; - /* If we can't know the fragment size yet, then wait for more - * data. */ - if (frag_len < 4) { - continue; + "have %d bytes from %d\n", + client->client_inbuf_used, + client->client_fd); + /* Check if we've got a complete fragment. */ + if (client->client_inbuf_used < 4) { + /* We don't even have a length, so continue reading. */ + return; } - /* Convert the fragment length information to local byte order, - * and determine if this is the last fragment in the record. */ - memcpy(&nlen, fragment, 4); + /* Read the length of the first fragment in the buffer. */ + memcpy(&nlen, client->client_inbuf, 4); len = ntohl(nlen); last = ((len & 0x80000000) != 0); len &= 0x7fffffff; - /* If we don't have a complete fragment, then wait for more - * data. */ - if (frag_len < (len + 4)) { - continue; - } - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "expecting %d bytes\n", len); - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "in: frag_len=%d,record=%d\n", - frag_len, record_len); - /* Pull the fragment out of the buffer and append it to the - * record buffer. */ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "got whole fragment (%d bytes)\n", len); - memmove(record + record_len, fragment + 4, len); - record_len += len; - memmove(fragment, fragment + (len + 4), frag_len - (len + 4)); - frag_len -= (len + 4); - /* If there are more fragments to come, then we need to wait - * for more data. */ - if (!last) { - continue; + "fragment is %d bytes long%s, " + "have %d bytes pending, on %d\n", + len, last ? " (last one)" : "", + client->client_inbuf_used, client->client_fd); + if ((len + 4) <= client->client_inbuf_used) { + /* Got at least one fragment! */ + nlen = len + client->client_query_size; + query = malloc(nlen); + if (query == NULL) { + /* Out of memory, we'll have to try again + * later. */ + return; + } + /* Copy any previously-received fragments and append + * this one. */ + if (client->client_query_size > 0) { + memcpy(query, + client->client_query, + client->client_query_size); + } + memcpy(query + client->client_query_size, + client->client_inbuf + 4, + len); + /* Save the new query-in-progress. */ + free(client->client_query); + client->client_query = query; + client->client_query_size = nlen; + /* Drop the fragment from the incoming + * buffer. */ + memmove(client->client_inbuf, + client->client_inbuf + (len + 4), + client->client_inbuf_used - (len + 4)); + client->client_inbuf_used -= (len + 4); + } + if (last) { + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "query is %d bytes long on %d\n", + client->client_query_size, + client->client_fd); + /* We have a complete query. Pass it on down. */ + client_data.client = client->client_fd; + memset(&client_data.client_addr, 0, + sizeof(client_data.client_addr)); + client_data.client_addrlen = 0; + nis_process_request(state, + client->client_query, + client->client_query_size, + &dispatch_reply_fragment_connected, + &dispatch_reply_connected, + &client_data, + &client->client_query_cookie); + /* Decide what to do next. */ + client_interpret_nis_result(state, client); } - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "got whole record (%d bytes)\n", record_len); - nis_process_request(state, &cdata, record, record_len); - record_len = 0; - /* Note that we did it! */ - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "out: frag_len=%d,record=%d\n", - frag_len, record_len); } -done: - close(cdata.client); - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "closed client connection %d, thread ending\n", - cdata.client); - return NULL; } -/* Handle a stream client -- answer the connection and spawn a thread to handle - * its actual requests. */ +/* Handle replying states. */ static void -dispatch_stream(struct plugin_state *state, int fd) +client_write(struct plugin_state *state, struct dispatch_client *client) { - int flags; - pthread_t thread; - struct dispatch_client_data cdata; - struct dispatch_stream_client_params *params; + ssize_t count; + int32_t len, nlen; + int last; + char *query; + struct dispatch_client_data client_data; - /* Answer the connection request. */ - cdata.client = accept(fd, &cdata.client_addr, &cdata.client_addrlen); - if (cdata.client == -1) { - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "failed to answer new stream request\n"); - return; - } else { - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "new stream client on %d\n", cdata.client); - flags = fcntl(cdata.client, F_GETFL); - flags |= O_NONBLOCK; - if (fcntl(cdata.client, F_SETFL, flags) != 0) { - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "error setting client to " - "non-blocking\n"); + /* Try to send some of the pending data. */ + count = write(client->client_fd, + client->client_outbuf, + client->client_outbuf_used); + if (count <= 0) { + if ((count != -1) || (errno != EAGAIN)) { + /* Fail, disconnect because we're out of sync. */ + client_set_closing(state, client); } - } - - /* Bundle up enough info for the thread to do its work. */ - params = malloc(sizeof(*params)); - if (params == NULL) { - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "out of memory\n"); - close(cdata.client); return; } - params->client = cdata.client; - params->state = state; - - /* Kick off the thread. */ - if (pthread_create(&thread, NULL, - &dispatch_stream_handler_thread, params) != 0) { - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "error starting thread\n"); - close(cdata.client); + if (count == client->client_outbuf_used) { + /* There's no more data to send. */ + if (client->client_state == client_replying_final) { + /* Done. Go back to reading next time. */ + client_set_reading(state, client); + } else { + /* More to send, so ask for more reply data. */ + client->client_outbuf_used = 0; + client_data.client = client->client_fd; + client_data.client_addr = client->client_addr; + client_data.client_addrlen = client->client_addrlen; + nis_process_request(state, + client->client_query, + client->client_query_size, + &dispatch_reply_fragment_connected, + &dispatch_reply_connected, + &client_data, + &client->client_query_cookie); + client_interpret_nis_result(state, client); + } + } else { + /* Partial write, adjust outgoing buffer for next time. */ + memmove(client->client_outbuf, + client->client_outbuf + count, + client->client_outbuf_used - count); + client->client_outbuf_used -= count; } - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "started client-specific thread for client on %d\n", - cdata.client); - return; } -/* Handle a datagram client -- read the request and handle it immediately. */ static void -dispatch_dgram(struct plugin_state *state, int fd) +dispatch_service_client(struct plugin_state *state, + struct dispatch_client *client, + struct pollfd *fd) { - struct dispatch_client_data cdata; - char dgram[65536]; - int reqsize; - - /* Read the request. */ - cdata.client = fd; - cdata.client_addrlen = sizeof(cdata.client_addr); - reqsize = recvfrom(fd, dgram, sizeof(dgram), 0, - &cdata.client_addr, &cdata.client_addrlen); - slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "datagram request (%d bytes)\n", reqsize); - - /* Handle the request. */ - nis_process_request(state, &cdata, dgram, reqsize); + ssize_t count; + int32_t len, nlen; + int last; + char *query; + switch (client->client_state) { + case client_reading: + if (fd->revents & POLLIN) { + client_read(state, client); + } else { + client_set_closing(state, client); + } + break; + case client_replying_with_more: + case client_replying_final: + if (fd->revents & POLLOUT) { + client_write(state, client); + } else { + client_set_closing(state, client); + } + break; + case client_closing: + case client_invalid: + /* never reached */ + assert(0); + break; + } } - -/* Handle all incoming data. */ void * dispatch_thread(void *p) { + struct dispatch_client *clients, *client, *next, **list; + struct dispatch_client_data client_data; struct plugin_state *state = p; - struct pollfd fds[4]; - int i; + struct pollfd *fds; + int i, n_fds, client_count; + ssize_t count; + + clients = NULL; + client_count = 0; + fds = NULL; + for (;;) { - /* Set up for polling. */ - memset(&fds, 0, sizeof(fds)); + /* Prune out recently-disconnected clients. */ + list = &clients; + while (*list != NULL) { + client = *list; + next = client->client_next; + if (client->client_state == client_closing) { + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "pruning client %d\n", + client->client_fd); + if (client->client_fd != -1) { + close(client->client_fd); + } + free(client); + *list = next; + } else { + list = &(client->client_next); + } + } + /* Count the number of connected clients we have. */ + client = clients; + i = 0; + while (client != NULL) { + next = client->client_next; + client = next; + i++; + } + /* If the "fds" block isn't big enough (or doesn't exist yet), + * reallocate it. */ + if (i > client_count) { + free(fds); + fds = NULL; + client_count = i; + } + if (fds == NULL) { + fds = malloc((4 + client_count) * sizeof(fds[0])); + if (fds == NULL) { + /* Wait a bit, then try again? */ + poll(NULL, 0, 10 * 1000); + continue; + } + } + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "%d connected clients\n", i); + + /* Fill in the set of polling descriptors. */ + memset(fds, 0, sizeof((4 + client_count) * sizeof(fds[0]))); for (i = 0; i < state->n_listeners; i++) { fds[i].fd = state->listener[i].fd; fds[i].events = POLLIN; } + client = clients; + while (client != NULL) { + fds[i].fd = client->client_fd; + switch (client->client_state) { + case client_reading: + fds[i].events = POLLIN; + break; + case client_replying_with_more: + case client_replying_final: + fds[i].events = POLLOUT; + break; + case client_closing: + case client_invalid: + /* shouldn't happen */ + assert(0); + break; + } + client = client->client_next; + i++; + } + + /* Check for status updates. */ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "listening for next request\n"); - switch (poll(fds, state->n_listeners, -1)) { + "listening\n"); + n_fds = i; + switch (poll(fds, n_fds, -1)) { case -1: slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, @@ -387,29 +570,51 @@ dispatch_thread(void *p) case 0: slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "no request(?)\n"); + "no request(timeout?)\n"); continue; default: - /* Iterate over listening sockets which have work for - * us to do. */ - for (i = 0; i < state->n_listeners; i++) { - if ((fds[i].revents & POLLIN) == 0) { - continue; - } - switch (state->listener[i].type) { - case SOCK_DGRAM: - dispatch_dgram(state, fds[i].fd); - break; - case SOCK_STREAM: - dispatch_stream(state, fds[i].fd); - break; - default: - /* never reached */ - assert(0); - break; + break; + } + + /* Save the head of the existing clients list. */ + client = clients; + + /* Iterate over listening sockets which have work for us. */ + for (i = 0; i < state->n_listeners; i++) { + if ((fds[i].revents & POLLIN) == 0) { + continue; + } + switch (state->listener[i].type) { + case SOCK_DGRAM: + /* Datagram requests we handle right + * here, right now. */ + dispatch_dgram(state, fds[i].fd); + break; + case SOCK_STREAM: + /* Try to accept a new client. */ + next = dispatch_accept_client(state, fds[i].fd); + if (next != NULL) { + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "new client on %d\n", + next->client_fd); + next->client_next = clients; + clients = next; } + break; + default: + /* never reached */ + assert(0); + break; } } + + /* Service the already-connected clients. */ + for (; i < n_fds; i++, client = client->client_next) { + assert(client != NULL); + assert(client->client_fd == fds[i].fd); + dispatch_service_client(state, client, &fds[i]); + } } return state; } |
