diff options
| author | Nalin Dahyabhai <nalin.dahyabhai@pobox.com> | 2008-05-27 20:55:55 -0400 |
|---|---|---|
| committer | Nalin Dahyabhai <nalin.dahyabhai@pobox.com> | 2008-05-27 20:55:55 -0400 |
| commit | d9d141d4bfef490d086372b195e71f5cdac6016c (patch) | |
| tree | b78e4aa0c38c63f4446b46ef482e3015dbb67b4d /src/dispatch.c | |
| parent | beaf6734f8a8589c9213d8c95e58d98a417c2aaf (diff) | |
| download | slapi-nis-d9d141d4bfef490d086372b195e71f5cdac6016c.tar.gz slapi-nis-d9d141d4bfef490d086372b195e71f5cdac6016c.tar.xz slapi-nis-d9d141d4bfef490d086372b195e71f5cdac6016c.zip | |
- start queuing data for connected clients instead of sending it immediately
Diffstat (limited to 'src/dispatch.c')
| -rw-r--r-- | src/dispatch.c | 73 |
1 files changed, 54 insertions, 19 deletions
diff --git a/src/dispatch.c b/src/dispatch.c index d453d71..a802487 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -148,6 +148,7 @@ dispatch_write_with_retry(struct plugin_state *state, int fd, } return sent; } + static bool_t dispatch_reply_fragment_connected(struct plugin_state *state, struct dispatch_client_data *cdata, @@ -156,31 +157,38 @@ dispatch_reply_fragment_connected(struct plugin_state *state, bool_t first_fragment, bool_t last_fragment) { uint32_t len; + size_t next_size; /* Record reply - first fragment. */ if (first_fragment) { xdr_replymsg(reply_xdrs, reply); } - /* Calculate the fragment length bytes. */ - len = htonl(xdr_getpos(reply_xdrs) | - (last_fragment ? 0x80000000 : 0)); - /* Send the data to the client. */ - if (dispatch_write_with_retry(state, cdata->connected->client_fd, - &len, 4, - reply_buf, - xdr_getpos(reply_xdrs)) == - 4 + xdr_getpos(reply_xdrs)) { - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "Stream reply (4+%d bytes) sent.\n", - xdr_getpos(reply_xdrs)); - return TRUE; - } else { + /* If we don't have space for the data, stop now. */ + next_size = cdata->connected->client_outbuf_used + 4 + + xdr_getpos(reply_xdrs); + if (next_size > sizeof(cdata->connected->client_outbuf)) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Stream reply (4+%d bytes) failed!\n", + "Failed to queue stream reply (4+%d bytes)!\n", xdr_getpos(reply_xdrs)); return FALSE; } + /* Calculate the fragment length bytes. */ + len = htonl(xdr_getpos(reply_xdrs) | (last_fragment ? 0x80000000 : 0)); + /* Queue the data. */ + memcpy(cdata->connected->client_outbuf + + cdata->connected->client_outbuf_used, + &len, 4); + memcpy(cdata->connected->client_outbuf + + cdata->connected->client_outbuf_used + 4, + reply_buf, + xdr_getpos(reply_xdrs)); + cdata->connected->client_outbuf_used += (4 + xdr_getpos(reply_xdrs)); + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "Queued stream reply (4+%d bytes), %d total!\n", + xdr_getpos(reply_xdrs), + cdata->connected->client_outbuf_used); + return TRUE; } /* Send an entire reply record at once. */ static bool_t @@ -247,6 +255,7 @@ static struct dispatch_client * dispatch_accept_client(struct plugin_state *state, int fd) { struct dispatch_client *client; + int flags; client = malloc(sizeof(*client)); if (client == NULL) { return NULL; @@ -256,6 +265,10 @@ dispatch_accept_client(struct plugin_state *state, int fd) free(client); return NULL; } + flags = fcntl(fd, F_GETFL); + if ((flags & O_NONBLOCK) == 0) { + fcntl(fd, F_SETFL, flags | O_NONBLOCK); + } slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "new connected client on %d\n", fd); memset(client, 0, sizeof(*client)); @@ -398,23 +411,41 @@ client_write(struct plugin_state *state, struct dispatch_client *client) struct dispatch_client_data client_data; /* Try to send some of the pending data. */ - count = write(client->client_fd, - client->client_outbuf, - client->client_outbuf_used); + len = client->client_outbuf_used; + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "Attempting to send %d bytes to %d.\n", + len, client->client_fd); + count = write(client->client_fd, client->client_outbuf, len); if (count <= 0) { if ((count != -1) || (errno != EAGAIN)) { + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "Error sending %d bytes to %d.\n", + client->client_outbuf_used, + client->client_fd); /* Fail, disconnect because we're out of sync. */ client_set_closing(state, client); } return; } + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "Sent %d bytes to %d.\n", count, client->client_fd); if (count == client->client_outbuf_used) { /* There's no more data to send. */ if (client->client_state == client_replying_final) { /* Done. Go back to reading next time. */ + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "Waiting for next query on %d.\n", + client->client_fd); client_set_reading(state, client); } else { /* More to send, so ask for more reply data. */ + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "Waiting for more data for %d.\n", + client->client_fd); client->client_outbuf_used = 0; memset(&client_data, 0, sizeof(client_data)); client_data.connected = client; @@ -433,6 +464,10 @@ client_write(struct plugin_state *state, struct dispatch_client *client) client->client_outbuf + count, client->client_outbuf_used - count); client->client_outbuf_used -= count; + slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, + "%d bytes to go for %d\n", + client->client_outbuf_used, + client->client_fd); } } |
