summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-05-27 20:55:55 -0400
committerNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-05-27 20:55:55 -0400
commitd9d141d4bfef490d086372b195e71f5cdac6016c (patch)
treeb78e4aa0c38c63f4446b46ef482e3015dbb67b4d /src
parentbeaf6734f8a8589c9213d8c95e58d98a417c2aaf (diff)
downloadslapi-nis-d9d141d4bfef490d086372b195e71f5cdac6016c.tar.gz
slapi-nis-d9d141d4bfef490d086372b195e71f5cdac6016c.tar.xz
slapi-nis-d9d141d4bfef490d086372b195e71f5cdac6016c.zip
- start queuing data for connected clients instead of sending it immediately
Diffstat (limited to 'src')
-rw-r--r--src/dispatch.c73
1 files changed, 54 insertions, 19 deletions
diff --git a/src/dispatch.c b/src/dispatch.c
index d453d71..a802487 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -148,6 +148,7 @@ dispatch_write_with_retry(struct plugin_state *state, int fd,
}
return sent;
}
+
static bool_t
dispatch_reply_fragment_connected(struct plugin_state *state,
struct dispatch_client_data *cdata,
@@ -156,31 +157,38 @@ dispatch_reply_fragment_connected(struct plugin_state *state,
bool_t first_fragment, bool_t last_fragment)
{
uint32_t len;
+ size_t next_size;
/* Record reply - first fragment. */
if (first_fragment) {
xdr_replymsg(reply_xdrs, reply);
}
- /* Calculate the fragment length bytes. */
- len = htonl(xdr_getpos(reply_xdrs) |
- (last_fragment ? 0x80000000 : 0));
- /* Send the data to the client. */
- if (dispatch_write_with_retry(state, cdata->connected->client_fd,
- &len, 4,
- reply_buf,
- xdr_getpos(reply_xdrs)) ==
- 4 + xdr_getpos(reply_xdrs)) {
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "Stream reply (4+%d bytes) sent.\n",
- xdr_getpos(reply_xdrs));
- return TRUE;
- } else {
+ /* If we don't have space for the data, stop now. */
+ next_size = cdata->connected->client_outbuf_used + 4 +
+ xdr_getpos(reply_xdrs);
+ if (next_size > sizeof(cdata->connected->client_outbuf)) {
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "Stream reply (4+%d bytes) failed!\n",
+ "Failed to queue stream reply (4+%d bytes)!\n",
xdr_getpos(reply_xdrs));
return FALSE;
}
+ /* Calculate the fragment length bytes. */
+ len = htonl(xdr_getpos(reply_xdrs) | (last_fragment ? 0x80000000 : 0));
+ /* Queue the data. */
+ memcpy(cdata->connected->client_outbuf +
+ cdata->connected->client_outbuf_used,
+ &len, 4);
+ memcpy(cdata->connected->client_outbuf +
+ cdata->connected->client_outbuf_used + 4,
+ reply_buf,
+ xdr_getpos(reply_xdrs));
+ cdata->connected->client_outbuf_used += (4 + xdr_getpos(reply_xdrs));
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "Queued stream reply (4+%d bytes), %d total!\n",
+ xdr_getpos(reply_xdrs),
+ cdata->connected->client_outbuf_used);
+ return TRUE;
}
/* Send an entire reply record at once. */
static bool_t
@@ -247,6 +255,7 @@ static struct dispatch_client *
dispatch_accept_client(struct plugin_state *state, int fd)
{
struct dispatch_client *client;
+ int flags;
client = malloc(sizeof(*client));
if (client == NULL) {
return NULL;
@@ -256,6 +265,10 @@ dispatch_accept_client(struct plugin_state *state, int fd)
free(client);
return NULL;
}
+ flags = fcntl(fd, F_GETFL);
+ if ((flags & O_NONBLOCK) == 0) {
+ fcntl(fd, F_SETFL, flags | O_NONBLOCK);
+ }
slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
"new connected client on %d\n", fd);
memset(client, 0, sizeof(*client));
@@ -398,23 +411,41 @@ client_write(struct plugin_state *state, struct dispatch_client *client)
struct dispatch_client_data client_data;
/* Try to send some of the pending data. */
- count = write(client->client_fd,
- client->client_outbuf,
- client->client_outbuf_used);
+ len = client->client_outbuf_used;
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "Attempting to send %d bytes to %d.\n",
+ len, client->client_fd);
+ count = write(client->client_fd, client->client_outbuf, len);
if (count <= 0) {
if ((count != -1) || (errno != EAGAIN)) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "Error sending %d bytes to %d.\n",
+ client->client_outbuf_used,
+ client->client_fd);
/* Fail, disconnect because we're out of sync. */
client_set_closing(state, client);
}
return;
}
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "Sent %d bytes to %d.\n", count, client->client_fd);
if (count == client->client_outbuf_used) {
/* There's no more data to send. */
if (client->client_state == client_replying_final) {
/* Done. Go back to reading next time. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "Waiting for next query on %d.\n",
+ client->client_fd);
client_set_reading(state, client);
} else {
/* More to send, so ask for more reply data. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "Waiting for more data for %d.\n",
+ client->client_fd);
client->client_outbuf_used = 0;
memset(&client_data, 0, sizeof(client_data));
client_data.connected = client;
@@ -433,6 +464,10 @@ client_write(struct plugin_state *state, struct dispatch_client *client)
client->client_outbuf + count,
client->client_outbuf_used - count);
client->client_outbuf_used -= count;
+ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
+ "%d bytes to go for %d\n",
+ client->client_outbuf_used,
+ client->client_fd);
}
}