diff options
| author | Nalin Dahyabhai <nalin.dahyabhai@pobox.com> | 2008-05-30 14:59:11 -0400 |
|---|---|---|
| committer | Nalin Dahyabhai <nalin.dahyabhai@pobox.com> | 2008-05-30 14:59:11 -0400 |
| commit | 3d207eb9f9f4b3bdc7e4775622fe75d318d054ae (patch) | |
| tree | b1a93be89a1a54915c81bc3c2194c6c04a99c67e /src | |
| parent | 32dd26f4052b57bf1cdc7fad2cc47874ff989cc1 (diff) | |
- move nis_all processing to a works-in-chunks state machine
Diffstat (limited to 'src')
| -rw-r--r-- | src/dispatch.c | 40 | ||||
| -rw-r--r-- | src/dispatch.h | 8 | ||||
| -rw-r--r-- | src/nis.c | 394 |
3 files changed, 351 insertions, 91 deletions
diff --git a/src/dispatch.c b/src/dispatch.c index a9f6e98..155d407 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -76,7 +76,7 @@ struct dispatch_client { ssize_t client_query_size; void *client_query_cookie; /* The reply to the client, when we're sending one. */ - char client_outbuf[4096]; + char client_outbuf[8192]; ssize_t client_outbuf_used; /* This is a linked list. */ struct dispatch_client *client_next; @@ -102,28 +102,28 @@ dispatch_reply_fragment_dgram(struct plugin_state *state, xdr_replymsg(reply_xdrs, reply); if (!first_fragment || !last_fragment) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Trying to sending datagram reply (%d bytes), " + "trying to sending datagram reply (%d bytes), " "even though the reply is not suitable for " - "transmission as a datagram.\n", + "transmission as a datagram\n", xdr_getpos(reply_xdrs)); } else { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Sending datagram reply (%d bytes).\n", + "sending datagram reply (%d bytes)\n", xdr_getpos(reply_xdrs)); } sendto(cdata->dgram.client_fd, reply_buf, xdr_getpos(reply_xdrs), 0, &cdata->dgram.client_addr, cdata->dgram.client_addrlen); return TRUE; } -static bool_t +static void dispatch_reply_dgram(struct plugin_state *state, struct dispatch_client_data *cdata, struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf) { - return dispatch_reply_fragment_dgram(state, cdata, - reply, reply_xdrs, reply_buf, - TRUE, TRUE); + dispatch_reply_fragment_dgram(state, cdata, + reply, reply_xdrs, reply_buf, + TRUE, TRUE); } /* Send a reply, buffered-for-connected-clients version. */ @@ -161,7 +161,7 @@ dispatch_write_with_retry(struct plugin_state *state, int fd, default: slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Got error %s sending " + "got error %s sending " "%d bytes (at %d) to %d.\n", strerror(errno), length1 + length2, @@ -197,7 +197,7 @@ dispatch_reply_fragment_connected(struct plugin_state *state, if (next_size > sizeof(cdata->connected->client_outbuf)) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Failed to queue stream reply (4+%d bytes)!\n", + "failed to queue stream reply (4+%d bytes)!n", xdr_getpos(reply_xdrs)); return FALSE; } @@ -214,21 +214,21 @@ dispatch_reply_fragment_connected(struct plugin_state *state, cdata->connected->client_outbuf_used += (4 + xdr_getpos(reply_xdrs)); slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Queued stream reply (4+%d bytes), %d total!\n", + "queued stream reply (4+%d bytes), %d total in queue\n", xdr_getpos(reply_xdrs), cdata->connected->client_outbuf_used); return TRUE; } /* Send an entire reply record at once. */ -static bool_t +static void dispatch_reply_connected(struct plugin_state *state, struct dispatch_client_data *cdata, struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf) { - return dispatch_reply_fragment_connected(state, cdata, - reply, reply_xdrs, reply_buf, - TRUE, TRUE); + dispatch_reply_fragment_connected(state, cdata, + reply, reply_xdrs, reply_buf, + TRUE, TRUE); } /* Handle a datagram client -- read the request and handle it immediately. */ @@ -443,14 +443,14 @@ client_write(struct plugin_state *state, struct dispatch_client *client) len = client->client_outbuf_used; slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Attempting to send %d bytes to %d.\n", + "attempting to send %d bytes to %d\n", len, client->client_fd); count = write(client->client_fd, client->client_outbuf, len); if (count <= 0) { if ((count != -1) || (errno != EAGAIN)) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Error sending %d bytes to %d.\n", + "error sending %d bytes to %d\n", client->client_outbuf_used, client->client_fd); /* Fail, disconnect because we're out of sync. */ @@ -459,21 +459,21 @@ client_write(struct plugin_state *state, struct dispatch_client *client) return; } slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Sent %d bytes to %d.\n", count, client->client_fd); + "sent %d bytes to %d\n", count, client->client_fd); if (count == client->client_outbuf_used) { /* There's no more data to send. */ if (client->client_state == client_replying_final) { /* Done. Go back to reading next time. */ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Waiting for next query on %d.\n", + "waiting for next query on %d\n", client->client_fd); client_set_reading(state, client); } else { /* More to send, so ask for more reply data. */ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "Waiting for more data for %d.\n", + "waiting for more data for %d\n", client->client_fd); client->client_outbuf_used = 0; memset(&client_data, 0, sizeof(client_data)); diff --git a/src/dispatch.h b/src/dispatch.h index 6ebb079..97ade14 100644 --- a/src/dispatch.h +++ b/src/dispatch.h @@ -30,8 +30,8 @@ typedef bool_t (dispatch_reply_fragment)(struct plugin_state *state, XDR *reply_xdrs, char *reply_buf, bool_t first_fragment, bool_t last_fragment); -typedef bool_t (dispatch_reply)(struct plugin_state *state, - struct dispatch_client_data *cdata, - struct rpc_msg *reply, - XDR *reply_xdrs, char *reply_buf); +typedef void (dispatch_reply)(struct plugin_state *state, + struct dispatch_client_data *cdata, + struct rpc_msg *reply, + XDR *reply_xdrs, char *reply_buf); #endif @@ -353,6 +353,55 @@ nis_maplist(struct plugin_state *state, } } +/* Enumeration, if we want to break it down into chunks, happens in a few + * phases (given the protocol): + * 1. we're sending the first entry in a map + * 2. we're sending a not-the-first entry in a map + * 3. we're sending an end-of-map + */ +struct nis_all_cookie { + enum nis_all_cookie_state { + cookie_bad, + cookie_first, + cookie_next, + cookie_this, + cookie_end1, + cookie_end2, + } state; + unsigned int key_length; + char key[1]; +}; +static void +nis_all_free_cookie(struct nis_all_cookie *cookie) +{ + free(cookie); +} +static struct nis_all_cookie * +nis_all_make_cookie(enum nis_all_cookie_state state, + unsigned int length, const char *value) +{ + struct nis_all_cookie *cookie; + cookie = malloc(sizeof(*cookie) + ((length > 0) ? length : 0)); + if (cookie != NULL) { + cookie->state = state; + switch (cookie->state) { + case cookie_bad: + case cookie_first: + case cookie_end1: + case cookie_end2: + break; + case cookie_this: + case cookie_next: + cookie->key_length = length; + if ((length > 0) && (value != NULL)) { + memcpy(&cookie->key, value, length); + } + break; + } + } + return cookie; +} + static void nis_all(struct plugin_state *state, dispatch_reply_fragment *reply_fragment_fn, @@ -365,92 +414,303 @@ nis_all(struct plugin_state *state, struct ypreq_nokey req_nokey; keydat_t *reply_key; valdat_t *reply_val; - bool_t supported; + struct nis_all_cookie *cookie; + enum nis_all_cookie_state next_state; + bool_t supported, stop; memset(&req_nokey, 0, sizeof(req_nokey)); reply_key = &reply_all->ypresp_all_u.val.keydat; reply_val = &reply_all->ypresp_all_u.val.valdat; if (xdr_ypreq_nokey(request_xdrs, &req_nokey)) { + /* Take ownership of the cookie data. */ + if (continuation_cookie) { + if (*continuation_cookie != NULL) { + cookie = *continuation_cookie; + } else { + cookie = nis_all_make_cookie(cookie_bad, + 0, NULL); + } + *continuation_cookie = NULL; + } else { + cookie = nis_all_make_cookie(cookie_bad, + 0, NULL); + } + /* Check if we even support the map. */ if (!map_supports_map(state, req_nokey.domain, req_nokey.map, &supported) || !supported) { - /* No entries? No such map final status. */ + /* No entries? No-such-map final status. */ reply_all->more = TRUE; reply_all->ypresp_all_u.val.status = YP_NOMAP; reply_key->keydat_len = 0; reply_val->valdat_len = 0; - (*reply_fragment_fn)(state, cdata, - reply, reply_xdrs, reply_buf, - TRUE, FALSE); + /* Encode the reply header so that we can queue the + * entire reply as one block. */ + xdr_replymsg(reply_xdrs, reply); /* End of data. */ reply_all->more = FALSE; - xdr_setpos(reply_xdrs, 0); xdr_ypresp_all(reply_xdrs, reply_all); - (*reply_fragment_fn)(state, cdata, - reply, reply_xdrs, reply_buf, - FALSE, TRUE); - } else { - bool_t first; - reply_all->more = TRUE; - reply_all->ypresp_all_u.val.status = YP_TRUE; - reply_all->more = map_first(state, - req_nokey.domain, - req_nokey.map, - &reply_key->keydat_len, - &reply_key->keydat_val, - &reply_val->valdat_len, - &reply_val->valdat_val); - first = TRUE; - do { - /* Send back a result. If it's the first - * entry, also send back the reply header. */ + /* Queue the entire response. */ + if (!(*reply_fragment_fn)(state, cdata, + reply, reply_xdrs, reply_buf, + FALSE, TRUE)) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, - "all(%s/%s) -> (%.*s),%d\n", + "all(%s/%s) - error queueing " + "error response\n", req_nokey.domain, - req_nokey.map, - reply_key->keydat_len, - reply_key->keydat_val, - reply_all->more); - (*reply_fragment_fn)(state, cdata, - reply, - reply_xdrs, reply_buf, - first, FALSE); - first = FALSE; - /* Find the next entry. */ - reply_all->more = map_next(state, - req_nokey.domain, - req_nokey.map, - reply_key->keydat_len, - reply_key->keydat_val, - &reply_key->keydat_len, - &reply_key->keydat_val, - &reply_val->valdat_len, - &reply_val->valdat_val); - /* If we got an entry, encode it. */ - if (reply_all->more) { - xdr_setpos(reply_xdrs, 0); - xdr_ypresp_all(reply_xdrs, reply_all); - } - } while (reply_all->more); - /* Send the end-of-map marker. */ - reply_all->ypresp_all_u.val.status = YP_NOMORE; - reply_key->keydat_len = 0; - reply_val->valdat_len = 0; + req_nokey.map); + } + /* Don't return a cookie, if one was passed to us. */ + nis_all_free_cookie(cookie); + cookie = NULL; + } else + for (stop = FALSE; stop == FALSE;) { + bool_t found, skip; xdr_setpos(reply_xdrs, 0); - reply_all->more = TRUE; - xdr_ypresp_all(reply_xdrs, reply_all); - /* End of data. */ - reply_all->more = FALSE; - xdr_ypresp_all(reply_xdrs, reply_all); - /* Bundle those two chunks into one reply. */ - (*reply_fragment_fn)(state, cdata, - reply, reply_xdrs, reply_buf, - FALSE, TRUE); - slapi_log_error(SLAPI_LOG_PLUGIN, - state->plugin_desc->spd_id, - "all(%s/%s) done\n", - req_nokey.domain, req_nokey.map); + memset(reply_all, 0, sizeof(reply_all)); + /* Follow any instructions we left for this iteration. + */ + switch (cookie->state) { + case cookie_bad: + /* fall through */ + case cookie_first: + /* Read the first key in the map, and make the + * next state either be queuing the first item + * or queueing the end-of-map reply. */ + found = map_first(state, + req_nokey.domain, + req_nokey.map, + &reply_key->keydat_len, + &reply_key->keydat_val, + &reply_val->valdat_len, + &reply_val->valdat_val); + if (found) { + /* Next time grab the entry after this + * one. */ + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "all(%s/%s) \"%.*s\"\n", + req_nokey.domain, + req_nokey.map, + reply_key->keydat_len, + reply_key->keydat_val); + skip = FALSE; + reply_all->more = TRUE; + reply_all->ypresp_all_u.val.status = YP_TRUE; + next_state = cookie_next; + } else { + /* Don't reply, just move to end-of-map + * state. */ + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "all(%s/%s) no-first\n", + req_nokey.domain, + req_nokey.map); + skip = TRUE; + next_state = cookie_end1; + } + /* Try to queue the packet. */ + nis_all_free_cookie(cookie); + if (skip || + (*reply_fragment_fn)(state, cdata, + reply, + reply_xdrs, reply_buf, + TRUE, FALSE)) { + /* Leave a note to choose the next + * entry or send end1, whichever is + * appropriate. */ + cookie = nis_all_make_cookie(next_state, + reply_key->keydat_len, + reply_key->keydat_val); + } else { + /* Leave a note to try sending the + * first entry again. */ + cookie = nis_all_make_cookie(cookie_first, + 0, NULL); + stop = TRUE; + } + break; + case cookie_next: + /* Read the next key in the map, and set up the + * cookie to note that we're queuing a not- + * first item. */ + found = map_next(state, + req_nokey.domain, + req_nokey.map, + cookie->key_length, + cookie->key, + &reply_key->keydat_len, + &reply_key->keydat_val, + &reply_val->valdat_len, + &reply_val->valdat_val); + if (found) { + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "all(%s/%s) \"%.*s\"\n", + req_nokey.domain, + req_nokey.map, + reply_key->keydat_len, + reply_key->keydat_val); + /* Next time grab the entry after this + * one. */ + skip = FALSE; + reply_all->more = TRUE; + reply_all->ypresp_all_u.val.status = YP_TRUE; + next_state = cookie_next; + } else { + /* Don't reply, just move to end-of-map + * state. */ + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "all(%s/%s) no-next\n", + req_nokey.domain, + req_nokey.map); + skip = TRUE; + next_state = cookie_end1; + } + /* Try to queue the packet. */ + if (skip || + (xdr_ypresp_all(reply_xdrs, reply_all) && + (*reply_fragment_fn)(state, cdata, + reply, + reply_xdrs, reply_buf, + FALSE, FALSE))) { + /* Leave a note to choose the next + * entry or send end1, whichever is + * appropriate. */ + nis_all_free_cookie(cookie); + cookie = nis_all_make_cookie(next_state, + reply_key->keydat_len, + reply_key->keydat_val); + } else { + /* Leave a note to retry sending this + * entry the next time. */ + nis_all_free_cookie(cookie); + cookie = nis_all_make_cookie(cookie_this, + reply_key->keydat_len, + reply_key->keydat_val); + stop = TRUE; + } + break; + case cookie_this: + /* Read the matching key in the map, and set up + * the cookie to note that we're queuing a not- + * first item. */ + reply_key->keydat_len = cookie->key_length; + reply_key->keydat_val = cookie->key; + found = map_match(state, + req_nokey.domain, + req_nokey.map, + reply_key->keydat_len, + reply_key->keydat_val, + &reply_val->valdat_len, + &reply_val->valdat_val); + if (found) { + /* Next time grab the entry after this + * one. */ + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "all(%s/%s) \"%.*s\" " + "(retry)\n", + req_nokey.domain, + req_nokey.map, + cookie->key_length, + cookie->key); + skip = FALSE; + reply_all->more = TRUE; + reply_all->ypresp_all_u.val.status = YP_TRUE; + next_state = cookie_next; + } else { + /* Don't reply, just move to end-of-map + * state. */ + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "all(%s/%s) \"%.*s\" " + "(disappeared?)\n", + req_nokey.domain, + req_nokey.map, + cookie->key_length, + cookie->key); + skip = TRUE; + next_state = cookie_end1; + } + /* Try to queue the packet. */ + if (skip || + (xdr_ypresp_all(reply_xdrs, reply_all) && + (*reply_fragment_fn)(state, cdata, + reply, + reply_xdrs, reply_buf, + FALSE, FALSE))) { + /* Leave a note to choose the next + * entry or send end1, whichever is + * appropriate. */ + nis_all_free_cookie(cookie); + cookie = nis_all_make_cookie(next_state, + reply_key->keydat_len, + reply_key->keydat_val); + } else { + /* Leave a note to retry sending this + * entry the next time. But that's how + * we got here, so do nothing. */ + stop = TRUE; + } + break; + case cookie_end1: + /* Send the end-of-map message. */ + memset(reply_key, 0, sizeof(*reply_key)); + memset(reply_val, 0, sizeof(*reply_key)); + reply_all->more = TRUE; + reply_all->ypresp_all_u.val.status = YP_NOMORE; + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "all(%s/%s) end-of-map\n", + req_nokey.domain, + req_nokey.map); + if (xdr_ypresp_all(reply_xdrs, reply_all) && + (*reply_fragment_fn)(state, cdata, + reply, + reply_xdrs, reply_buf, + FALSE, FALSE)) { + /* Leave a note to finish the reply. */ + nis_all_free_cookie(cookie); + cookie = nis_all_make_cookie(cookie_end2, + 0, NULL); + } else { + /* Leave the note alone, so that we'll + * have to try again. */ + stop = TRUE; + } + break; + case cookie_end2: + /* Send the final message. */ + reply_all->more = FALSE; + slapi_log_error(SLAPI_LOG_PLUGIN, + state->plugin_desc->spd_id, + "all(%s/%s) done\n", + req_nokey.domain, + req_nokey.map); + if (xdr_ypresp_all(reply_xdrs, reply_all) && + (*reply_fragment_fn)(state, cdata, + reply, + reply_xdrs, reply_buf, + FALSE, TRUE)) { + /* We're done. */ + nis_all_free_cookie(cookie); + cookie = NULL; + } else { + /* Leave the note alone, so that we'll + * have to try again. */ + } + stop = TRUE; + break; + } + } + /* Return the cookie if we can, else destroy it. */ + if (continuation_cookie) { + *continuation_cookie = cookie; + } else { + nis_all_free_cookie(cookie); } } else { /* XXX */ |
