summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-05-30 14:59:11 -0400
committerNalin Dahyabhai <nalin.dahyabhai@pobox.com>2008-05-30 14:59:11 -0400
commit3d207eb9f9f4b3bdc7e4775622fe75d318d054ae (patch)
treeb1a93be89a1a54915c81bc3c2194c6c04a99c67e /src
parent32dd26f4052b57bf1cdc7fad2cc47874ff989cc1 (diff)
- move nis_all processing to a works-in-chunks state machine
Diffstat (limited to 'src')
-rw-r--r--src/dispatch.c40
-rw-r--r--src/dispatch.h8
-rw-r--r--src/nis.c394
3 files changed, 351 insertions, 91 deletions
diff --git a/src/dispatch.c b/src/dispatch.c
index a9f6e98..155d407 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -76,7 +76,7 @@ struct dispatch_client {
ssize_t client_query_size;
void *client_query_cookie;
/* The reply to the client, when we're sending one. */
- char client_outbuf[4096];
+ char client_outbuf[8192];
ssize_t client_outbuf_used;
/* This is a linked list. */
struct dispatch_client *client_next;
@@ -102,28 +102,28 @@ dispatch_reply_fragment_dgram(struct plugin_state *state,
xdr_replymsg(reply_xdrs, reply);
if (!first_fragment || !last_fragment) {
slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "Trying to sending datagram reply (%d bytes), "
+ "trying to sending datagram reply (%d bytes), "
"even though the reply is not suitable for "
- "transmission as a datagram.\n",
+ "transmission as a datagram\n",
xdr_getpos(reply_xdrs));
} else {
slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "Sending datagram reply (%d bytes).\n",
+ "sending datagram reply (%d bytes)\n",
xdr_getpos(reply_xdrs));
}
sendto(cdata->dgram.client_fd, reply_buf, xdr_getpos(reply_xdrs),
0, &cdata->dgram.client_addr, cdata->dgram.client_addrlen);
return TRUE;
}
-static bool_t
+static void
dispatch_reply_dgram(struct plugin_state *state,
struct dispatch_client_data *cdata,
struct rpc_msg *reply,
XDR *reply_xdrs, char *reply_buf)
{
- return dispatch_reply_fragment_dgram(state, cdata,
- reply, reply_xdrs, reply_buf,
- TRUE, TRUE);
+ dispatch_reply_fragment_dgram(state, cdata,
+ reply, reply_xdrs, reply_buf,
+ TRUE, TRUE);
}
/* Send a reply, buffered-for-connected-clients version. */
@@ -161,7 +161,7 @@ dispatch_write_with_retry(struct plugin_state *state, int fd,
default:
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "Got error %s sending "
+ "got error %s sending "
"%d bytes (at %d) to %d.\n",
strerror(errno),
length1 + length2,
@@ -197,7 +197,7 @@ dispatch_reply_fragment_connected(struct plugin_state *state,
if (next_size > sizeof(cdata->connected->client_outbuf)) {
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "Failed to queue stream reply (4+%d bytes)!\n",
+ "failed to queue stream reply (4+%d bytes)!n",
xdr_getpos(reply_xdrs));
return FALSE;
}
@@ -214,21 +214,21 @@ dispatch_reply_fragment_connected(struct plugin_state *state,
cdata->connected->client_outbuf_used += (4 + xdr_getpos(reply_xdrs));
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "Queued stream reply (4+%d bytes), %d total!\n",
+ "queued stream reply (4+%d bytes), %d total in queue\n",
xdr_getpos(reply_xdrs),
cdata->connected->client_outbuf_used);
return TRUE;
}
/* Send an entire reply record at once. */
-static bool_t
+static void
dispatch_reply_connected(struct plugin_state *state,
struct dispatch_client_data *cdata,
struct rpc_msg *reply,
XDR *reply_xdrs, char *reply_buf)
{
- return dispatch_reply_fragment_connected(state, cdata,
- reply, reply_xdrs, reply_buf,
- TRUE, TRUE);
+ dispatch_reply_fragment_connected(state, cdata,
+ reply, reply_xdrs, reply_buf,
+ TRUE, TRUE);
}
/* Handle a datagram client -- read the request and handle it immediately. */
@@ -443,14 +443,14 @@ client_write(struct plugin_state *state, struct dispatch_client *client)
len = client->client_outbuf_used;
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "Attempting to send %d bytes to %d.\n",
+ "attempting to send %d bytes to %d\n",
len, client->client_fd);
count = write(client->client_fd, client->client_outbuf, len);
if (count <= 0) {
if ((count != -1) || (errno != EAGAIN)) {
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "Error sending %d bytes to %d.\n",
+ "error sending %d bytes to %d\n",
client->client_outbuf_used,
client->client_fd);
/* Fail, disconnect because we're out of sync. */
@@ -459,21 +459,21 @@ client_write(struct plugin_state *state, struct dispatch_client *client)
return;
}
slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id,
- "Sent %d bytes to %d.\n", count, client->client_fd);
+ "sent %d bytes to %d\n", count, client->client_fd);
if (count == client->client_outbuf_used) {
/* There's no more data to send. */
if (client->client_state == client_replying_final) {
/* Done. Go back to reading next time. */
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "Waiting for next query on %d.\n",
+ "waiting for next query on %d\n",
client->client_fd);
client_set_reading(state, client);
} else {
/* More to send, so ask for more reply data. */
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "Waiting for more data for %d.\n",
+ "waiting for more data for %d\n",
client->client_fd);
client->client_outbuf_used = 0;
memset(&client_data, 0, sizeof(client_data));
diff --git a/src/dispatch.h b/src/dispatch.h
index 6ebb079..97ade14 100644
--- a/src/dispatch.h
+++ b/src/dispatch.h
@@ -30,8 +30,8 @@ typedef bool_t (dispatch_reply_fragment)(struct plugin_state *state,
XDR *reply_xdrs, char *reply_buf,
bool_t first_fragment,
bool_t last_fragment);
-typedef bool_t (dispatch_reply)(struct plugin_state *state,
- struct dispatch_client_data *cdata,
- struct rpc_msg *reply,
- XDR *reply_xdrs, char *reply_buf);
+typedef void (dispatch_reply)(struct plugin_state *state,
+ struct dispatch_client_data *cdata,
+ struct rpc_msg *reply,
+ XDR *reply_xdrs, char *reply_buf);
#endif
diff --git a/src/nis.c b/src/nis.c
index a12b60b..b61163a 100644
--- a/src/nis.c
+++ b/src/nis.c
@@ -353,6 +353,55 @@ nis_maplist(struct plugin_state *state,
}
}
+/* Enumeration, if we want to break it down into chunks, happens in a few
+ * phases (given the protocol):
+ * 1. we're sending the first entry in a map
+ * 2. we're sending a not-the-first entry in a map
+ * 3. we're sending an end-of-map
+ */
+struct nis_all_cookie {
+ enum nis_all_cookie_state {
+ cookie_bad,
+ cookie_first,
+ cookie_next,
+ cookie_this,
+ cookie_end1,
+ cookie_end2,
+ } state;
+ unsigned int key_length;
+ char key[1];
+};
+static void
+nis_all_free_cookie(struct nis_all_cookie *cookie)
+{
+ free(cookie);
+}
+static struct nis_all_cookie *
+nis_all_make_cookie(enum nis_all_cookie_state state,
+ unsigned int length, const char *value)
+{
+ struct nis_all_cookie *cookie;
+ cookie = malloc(sizeof(*cookie) + ((length > 0) ? length : 0));
+ if (cookie != NULL) {
+ cookie->state = state;
+ switch (cookie->state) {
+ case cookie_bad:
+ case cookie_first:
+ case cookie_end1:
+ case cookie_end2:
+ break;
+ case cookie_this:
+ case cookie_next:
+ cookie->key_length = length;
+ if ((length > 0) && (value != NULL)) {
+ memcpy(&cookie->key, value, length);
+ }
+ break;
+ }
+ }
+ return cookie;
+}
+
static void
nis_all(struct plugin_state *state,
dispatch_reply_fragment *reply_fragment_fn,
@@ -365,92 +414,303 @@ nis_all(struct plugin_state *state,
struct ypreq_nokey req_nokey;
keydat_t *reply_key;
valdat_t *reply_val;
- bool_t supported;
+ struct nis_all_cookie *cookie;
+ enum nis_all_cookie_state next_state;
+ bool_t supported, stop;
memset(&req_nokey, 0, sizeof(req_nokey));
reply_key = &reply_all->ypresp_all_u.val.keydat;
reply_val = &reply_all->ypresp_all_u.val.valdat;
if (xdr_ypreq_nokey(request_xdrs, &req_nokey)) {
+ /* Take ownership of the cookie data. */
+ if (continuation_cookie) {
+ if (*continuation_cookie != NULL) {
+ cookie = *continuation_cookie;
+ } else {
+ cookie = nis_all_make_cookie(cookie_bad,
+ 0, NULL);
+ }
+ *continuation_cookie = NULL;
+ } else {
+ cookie = nis_all_make_cookie(cookie_bad,
+ 0, NULL);
+ }
+ /* Check if we even support the map. */
if (!map_supports_map(state, req_nokey.domain, req_nokey.map,
&supported) ||
!supported) {
- /* No entries? No such map final status. */
+ /* No entries? No-such-map final status. */
reply_all->more = TRUE;
reply_all->ypresp_all_u.val.status = YP_NOMAP;
reply_key->keydat_len = 0;
reply_val->valdat_len = 0;
- (*reply_fragment_fn)(state, cdata,
- reply, reply_xdrs, reply_buf,
- TRUE, FALSE);
+ /* Encode the reply header so that we can queue the
+ * entire reply as one block. */
+ xdr_replymsg(reply_xdrs, reply);
/* End of data. */
reply_all->more = FALSE;
- xdr_setpos(reply_xdrs, 0);
xdr_ypresp_all(reply_xdrs, reply_all);
- (*reply_fragment_fn)(state, cdata,
- reply, reply_xdrs, reply_buf,
- FALSE, TRUE);
- } else {
- bool_t first;
- reply_all->more = TRUE;
- reply_all->ypresp_all_u.val.status = YP_TRUE;
- reply_all->more = map_first(state,
- req_nokey.domain,
- req_nokey.map,
- &reply_key->keydat_len,
- &reply_key->keydat_val,
- &reply_val->valdat_len,
- &reply_val->valdat_val);
- first = TRUE;
- do {
- /* Send back a result. If it's the first
- * entry, also send back the reply header. */
+ /* Queue the entire response. */
+ if (!(*reply_fragment_fn)(state, cdata,
+ reply, reply_xdrs, reply_buf,
+ FALSE, TRUE)) {
slapi_log_error(SLAPI_LOG_PLUGIN,
state->plugin_desc->spd_id,
- "all(%s/%s) -> (%.*s),%d\n",
+ "all(%s/%s) - error queueing "
+ "error response\n",
req_nokey.domain,
- req_nokey.map,
- reply_key->keydat_len,
- reply_key->keydat_val,
- reply_all->more);
- (*reply_fragment_fn)(state, cdata,
- reply,
- reply_xdrs, reply_buf,
- first, FALSE);
- first = FALSE;
- /* Find the next entry. */
- reply_all->more = map_next(state,
- req_nokey.domain,
- req_nokey.map,
- reply_key->keydat_len,
- reply_key->keydat_val,
- &reply_key->keydat_len,
- &reply_key->keydat_val,
- &reply_val->valdat_len,
- &reply_val->valdat_val);
- /* If we got an entry, encode it. */
- if (reply_all->more) {
- xdr_setpos(reply_xdrs, 0);
- xdr_ypresp_all(reply_xdrs, reply_all);
- }
- } while (reply_all->more);
- /* Send the end-of-map marker. */
- reply_all->ypresp_all_u.val.status = YP_NOMORE;
- reply_key->keydat_len = 0;
- reply_val->valdat_len = 0;
+ req_nokey.map);
+ }
+ /* Don't return a cookie, if one was passed to us. */
+ nis_all_free_cookie(cookie);
+ cookie = NULL;
+ } else
+ for (stop = FALSE; stop == FALSE;) {
+ bool_t found, skip;
xdr_setpos(reply_xdrs, 0);
- reply_all->more = TRUE;
- xdr_ypresp_all(reply_xdrs, reply_all);
- /* End of data. */
- reply_all->more = FALSE;
- xdr_ypresp_all(reply_xdrs, reply_all);
- /* Bundle those two chunks into one reply. */
- (*reply_fragment_fn)(state, cdata,
- reply, reply_xdrs, reply_buf,
- FALSE, TRUE);
- slapi_log_error(SLAPI_LOG_PLUGIN,
- state->plugin_desc->spd_id,
- "all(%s/%s) done\n",
- req_nokey.domain, req_nokey.map);
+ memset(reply_all, 0, sizeof(reply_all));
+ /* Follow any instructions we left for this iteration.
+ */
+ switch (cookie->state) {
+ case cookie_bad:
+ /* fall through */
+ case cookie_first:
+ /* Read the first key in the map, and make the
+ * next state either be queuing the first item
+ * or queueing the end-of-map reply. */
+ found = map_first(state,
+ req_nokey.domain,
+ req_nokey.map,
+ &reply_key->keydat_len,
+ &reply_key->keydat_val,
+ &reply_val->valdat_len,
+ &reply_val->valdat_val);
+ if (found) {
+ /* Next time grab the entry after this
+ * one. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "all(%s/%s) \"%.*s\"\n",
+ req_nokey.domain,
+ req_nokey.map,
+ reply_key->keydat_len,
+ reply_key->keydat_val);
+ skip = FALSE;
+ reply_all->more = TRUE;
+ reply_all->ypresp_all_u.val.status = YP_TRUE;
+ next_state = cookie_next;
+ } else {
+ /* Don't reply, just move to end-of-map
+ * state. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "all(%s/%s) no-first\n",
+ req_nokey.domain,
+ req_nokey.map);
+ skip = TRUE;
+ next_state = cookie_end1;
+ }
+ /* Try to queue the packet. */
+ nis_all_free_cookie(cookie);
+ if (skip ||
+ (*reply_fragment_fn)(state, cdata,
+ reply,
+ reply_xdrs, reply_buf,
+ TRUE, FALSE)) {
+ /* Leave a note to choose the next
+ * entry or send end1, whichever is
+ * appropriate. */
+ cookie = nis_all_make_cookie(next_state,
+ reply_key->keydat_len,
+ reply_key->keydat_val);
+ } else {
+ /* Leave a note to try sending the
+ * first entry again. */
+ cookie = nis_all_make_cookie(cookie_first,
+ 0, NULL);
+ stop = TRUE;
+ }
+ break;
+ case cookie_next:
+ /* Read the next key in the map, and set up the
+ * cookie to note that we're queuing a not-
+ * first item. */
+ found = map_next(state,
+ req_nokey.domain,
+ req_nokey.map,
+ cookie->key_length,
+ cookie->key,
+ &reply_key->keydat_len,
+ &reply_key->keydat_val,
+ &reply_val->valdat_len,
+ &reply_val->valdat_val);
+ if (found) {
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "all(%s/%s) \"%.*s\"\n",
+ req_nokey.domain,
+ req_nokey.map,
+ reply_key->keydat_len,
+ reply_key->keydat_val);
+ /* Next time grab the entry after this
+ * one. */
+ skip = FALSE;
+ reply_all->more = TRUE;
+ reply_all->ypresp_all_u.val.status = YP_TRUE;
+ next_state = cookie_next;
+ } else {
+ /* Don't reply, just move to end-of-map
+ * state. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "all(%s/%s) no-next\n",
+ req_nokey.domain,
+ req_nokey.map);
+ skip = TRUE;
+ next_state = cookie_end1;
+ }
+ /* Try to queue the packet. */
+ if (skip ||
+ (xdr_ypresp_all(reply_xdrs, reply_all) &&
+ (*reply_fragment_fn)(state, cdata,
+ reply,
+ reply_xdrs, reply_buf,
+ FALSE, FALSE))) {
+ /* Leave a note to choose the next
+ * entry or send end1, whichever is
+ * appropriate. */
+ nis_all_free_cookie(cookie);
+ cookie = nis_all_make_cookie(next_state,
+ reply_key->keydat_len,
+ reply_key->keydat_val);
+ } else {
+ /* Leave a note to retry sending this
+ * entry the next time. */
+ nis_all_free_cookie(cookie);
+ cookie = nis_all_make_cookie(cookie_this,
+ reply_key->keydat_len,
+ reply_key->keydat_val);
+ stop = TRUE;
+ }
+ break;
+ case cookie_this:
+ /* Read the matching key in the map, and set up
+ * the cookie to note that we're queuing a not-
+ * first item. */
+ reply_key->keydat_len = cookie->key_length;
+ reply_key->keydat_val = cookie->key;
+ found = map_match(state,
+ req_nokey.domain,
+ req_nokey.map,
+ reply_key->keydat_len,
+ reply_key->keydat_val,
+ &reply_val->valdat_len,
+ &reply_val->valdat_val);
+ if (found) {
+ /* Next time grab the entry after this
+ * one. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "all(%s/%s) \"%.*s\" "
+ "(retry)\n",
+ req_nokey.domain,
+ req_nokey.map,
+ cookie->key_length,
+ cookie->key);
+ skip = FALSE;
+ reply_all->more = TRUE;
+ reply_all->ypresp_all_u.val.status = YP_TRUE;
+ next_state = cookie_next;
+ } else {
+ /* Don't reply, just move to end-of-map
+ * state. */
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "all(%s/%s) \"%.*s\" "
+ "(disappeared?)\n",
+ req_nokey.domain,
+ req_nokey.map,
+ cookie->key_length,
+ cookie->key);
+ skip = TRUE;
+ next_state = cookie_end1;
+ }
+ /* Try to queue the packet. */
+ if (skip ||
+ (xdr_ypresp_all(reply_xdrs, reply_all) &&
+ (*reply_fragment_fn)(state, cdata,
+ reply,
+ reply_xdrs, reply_buf,
+ FALSE, FALSE))) {
+ /* Leave a note to choose the next
+ * entry or send end1, whichever is
+ * appropriate. */
+ nis_all_free_cookie(cookie);
+ cookie = nis_all_make_cookie(next_state,
+ reply_key->keydat_len,
+ reply_key->keydat_val);
+ } else {
+ /* Leave a note to retry sending this
+ * entry the next time. But that's how
+ * we got here, so do nothing. */
+ stop = TRUE;
+ }
+ break;
+ case cookie_end1:
+ /* Send the end-of-map message. */
+ memset(reply_key, 0, sizeof(*reply_key));
+ memset(reply_val, 0, sizeof(*reply_key));
+ reply_all->more = TRUE;
+ reply_all->ypresp_all_u.val.status = YP_NOMORE;
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "all(%s/%s) end-of-map\n",
+ req_nokey.domain,
+ req_nokey.map);
+ if (xdr_ypresp_all(reply_xdrs, reply_all) &&
+ (*reply_fragment_fn)(state, cdata,
+ reply,
+ reply_xdrs, reply_buf,
+ FALSE, FALSE)) {
+ /* Leave a note to finish the reply. */
+ nis_all_free_cookie(cookie);
+ cookie = nis_all_make_cookie(cookie_end2,
+ 0, NULL);
+ } else {
+ /* Leave the note alone, so that we'll
+ * have to try again. */
+ stop = TRUE;
+ }
+ break;
+ case cookie_end2:
+ /* Send the final message. */
+ reply_all->more = FALSE;
+ slapi_log_error(SLAPI_LOG_PLUGIN,
+ state->plugin_desc->spd_id,
+ "all(%s/%s) done\n",
+ req_nokey.domain,
+ req_nokey.map);
+ if (xdr_ypresp_all(reply_xdrs, reply_all) &&
+ (*reply_fragment_fn)(state, cdata,
+ reply,
+ reply_xdrs, reply_buf,
+ FALSE, TRUE)) {
+ /* We're done. */
+ nis_all_free_cookie(cookie);
+ cookie = NULL;
+ } else {
+ /* Leave the note alone, so that we'll
+ * have to try again. */
+ }
+ stop = TRUE;
+ break;
+ }
+ }
+ /* Return the cookie if we can, else destroy it. */
+ if (continuation_cookie) {
+ *continuation_cookie = cookie;
+ } else {
+ nis_all_free_cookie(cookie);
}
} else {
/* XXX */