/*
 * Copyright 2008 Red Hat, Inc.
 *
 * This Program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; version 2 of the License.
 *
 * This Program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this Program; if not, write to the
 *
 *   Free Software Foundation, Inc.
 *   59 Temple Place, Suite 330
 *   Boston, MA 02111-1307 USA
 *
 */

#ifdef HAVE_CONFIG_H
#include "../config.h"
#endif

/* NOTE(review): the directives below that consist of a bare "#include" have
 * no header name in this copy of the file.  The names were presumably
 * angle-bracket system headers that got stripped; restore them before
 * building. */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#ifdef HAVE_DIRSRV_SLAPI_PLUGIN_H
#include
#include
#include
#else
#include
#endif

#include
#include
#include

#include "plugin.h"
#include "dispatch.h"
#include "nis.h"
#include "portmap.h"

/* Not referenced by any code visible in this file.  Presumably an idle limit
 * in milliseconds (60 seconds) -- TODO confirm against the callers. */
#define MAX_CLIENT_IDLE (60 * 1000)

/* Handle all incoming data. */
/* Per-connection record for a stream (connected) client.  Records are
 * allocated by dispatch_accept_client(), chained through client_next, and
 * released by dispatch_thread() once they reach the client_closing state. */
struct dispatch_client {
	/* The client socket and address. */
	int client_fd;
	struct sockaddr client_addr;
	socklen_t client_addrlen;
	/* The client state.  dispatch_thread() polls for input while the
	 * state is client_reading and for output while it is either of the
	 * client_replying_* states; client_closing records are pruned. */
	enum {
		client_invalid,
		client_closing,
		client_reading,
		client_replying_with_more,
		client_replying_final,
	} client_state;
	/* The client's request while we're reading it.  client_inbuf holds
	 * the raw bytes read from the socket (4-byte fragment headers
	 * included); client_query holds the heap-allocated concatenation of
	 * the fragment payloads received so far.  The address of
	 * client_query_cookie is handed to nis_process_request(); a non-NULL
	 * value afterwards is treated as "more reply data to fetch". */
	char client_inbuf[4096];
	ssize_t client_inbuf_used;
	char *client_query;
	ssize_t client_query_size;
	void *client_query_cookie;
	/* The reply to the client, when we're sending one.  Holds queued
	 * reply fragments, each preceded by its 4-byte length word. */
	char client_outbuf[4096];
	ssize_t client_outbuf_used;
	/* This is a linked list. */
	struct dispatch_client *client_next;
};

/* Context handed to the reply callbacks.  The stream code path sets only
 * "connected"; the datagram code path sets only "dgram". */
struct dispatch_client_data {
	struct dispatch_client *connected;
	struct {
		int client_fd;
		struct sockaddr client_addr;
		socklen_t client_addrlen;
	} dgram;
};

/* Send a reply, unbuffered datagram version.
*/ static bool_t dispatch_reply_fragment_dgram(struct plugin_state *state, struct dispatch_client_data *cdata, struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf, bool_t first_fragment, bool_t last_fragment) { xdr_replymsg(reply_xdrs, reply); if (!first_fragment || !last_fragment) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "trying to sending datagram reply (%d bytes), " "even though the reply is not suitable for " "transmission as a datagram\n", xdr_getpos(reply_xdrs)); } else { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "sending datagram reply (%d bytes)\n", xdr_getpos(reply_xdrs)); } sendto(cdata->dgram.client_fd, reply_buf, xdr_getpos(reply_xdrs), 0, &cdata->dgram.client_addr, cdata->dgram.client_addrlen); return TRUE; } static void dispatch_reply_dgram(struct plugin_state *state, struct dispatch_client_data *cdata, struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf) { dispatch_reply_fragment_dgram(state, cdata, reply, reply_xdrs, reply_buf, TRUE, TRUE); } static bool_t dispatch_reply_fragment_connected(struct plugin_state *state, struct dispatch_client_data *cdata, struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf, bool_t first_fragment, bool_t last_fragment) { uint32_t len; size_t next_size; /* Record reply - first fragment. */ if (first_fragment) { xdr_replymsg(reply_xdrs, reply); } /* If we don't have space for the data, stop now. */ next_size = cdata->connected->client_outbuf_used + 4 + xdr_getpos(reply_xdrs); if (next_size > sizeof(cdata->connected->client_outbuf)) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "failed to queue stream reply (4+%d bytes)!n", xdr_getpos(reply_xdrs)); return FALSE; } /* Calculate the fragment length bytes. */ len = htonl(xdr_getpos(reply_xdrs) | (last_fragment ? 0x80000000 : 0)); /* Queue the data. 
*/ memcpy(cdata->connected->client_outbuf + cdata->connected->client_outbuf_used, &len, 4); memcpy(cdata->connected->client_outbuf + cdata->connected->client_outbuf_used + 4, reply_buf, xdr_getpos(reply_xdrs)); cdata->connected->client_outbuf_used += (4 + xdr_getpos(reply_xdrs)); slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "queued stream reply (4+%d bytes), %d total in queue\n", xdr_getpos(reply_xdrs), cdata->connected->client_outbuf_used); return TRUE; } /* Send an entire reply record at once. */ static void dispatch_reply_connected(struct plugin_state *state, struct dispatch_client_data *cdata, struct rpc_msg *reply, XDR *reply_xdrs, char *reply_buf) { dispatch_reply_fragment_connected(state, cdata, reply, reply_xdrs, reply_buf, TRUE, TRUE); } /* Handle a datagram client -- read the request and handle it immediately. */ static void dispatch_dgram(struct plugin_state *state, int fd) { struct dispatch_client_data cdata; char dgram[65536]; int reqsize; /* Read the request. */ cdata.dgram.client_fd = fd; cdata.dgram.client_addrlen = sizeof(cdata.dgram.client_addr); reqsize = recvfrom(cdata.dgram.client_fd, dgram, sizeof(dgram), 0, &cdata.dgram.client_addr, &cdata.dgram.client_addrlen); slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "datagram request (%d bytes)\n", reqsize); /* Handle the request. */ nis_process_request(state, dgram, reqsize, &dispatch_reply_fragment_dgram, &dispatch_reply_dgram, &cdata, NULL); } /* Set the client's record up to start reading a new query. */ static void client_set_reading(struct plugin_state *state, struct dispatch_client *client) { client->client_inbuf_used = 0; free(client->client_query); client->client_query = NULL; client->client_query_size = 0; client->client_outbuf_used = 0; client->client_state = client_reading; } /* Set the client's record up to be cleaned up. 
*/ static void client_set_closing(struct plugin_state *state, struct dispatch_client *client) { client->client_inbuf_used = 0; free(client->client_query); client->client_query = NULL; client->client_query_size = 0; client->client_outbuf_used = 0; client->client_state = client_closing; } /* Set the client's record up. */ static struct dispatch_client * dispatch_accept_client(struct plugin_state *state, int fd) { struct dispatch_client *client; int flags; client = malloc(sizeof(*client)); if (client == NULL) { return NULL; } fd = accept(fd, &client->client_addr, &client->client_addrlen); if (fd == -1) { free(client); return NULL; } flags = fcntl(fd, F_GETFL); if ((flags & O_NONBLOCK) == 0) { fcntl(fd, F_SETFL, flags | O_NONBLOCK); } slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "new connected client on %d\n", fd); memset(client, 0, sizeof(*client)); client->client_fd = fd; client_set_reading(state, client); return client; } /* Decide what to do next. */ static void client_interpret_nis_result(struct plugin_state *state, struct dispatch_client *client) { if (client->client_query_cookie != NULL) { client->client_state = client_replying_with_more; } else { if (client->client_outbuf_used > 0) { client->client_state = client_replying_final; } else { client_set_reading(state, client); } } } /* Handle reading state. */ static void client_read(struct plugin_state *state, struct dispatch_client *client) { ssize_t count; int32_t len, nlen; int last; char *query; struct dispatch_client_data client_data; /* Try to read some data. */ count = read(client->client_fd, client->client_inbuf + client->client_inbuf_used, sizeof(client->client_inbuf) - client->client_inbuf_used); if (count <= 0) { if ((count != -1) || (errno != EAGAIN)) { /* Disconnect the client. 
*/ if (count == 0) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "no more data from %d, " "marking for closing\n", client->client_fd); } else { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "error reading from %d, " "marking for closing\n", client->client_fd); } client_set_closing(state, client); } } else { /* Record the data as added to the fragment buffer. */ client->client_inbuf_used += count; slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "have %d bytes from %d\n", client->client_inbuf_used, client->client_fd); /* Check if we've got a complete fragment. */ if (client->client_inbuf_used < 4) { /* We don't even have a length, so continue reading. */ return; } /* Read the length of the first fragment in the buffer. */ memcpy(&nlen, client->client_inbuf, 4); len = ntohl(nlen); last = ((len & 0x80000000) != 0); len &= 0x7fffffff; slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "fragment is %d bytes long%s, " "have %d bytes pending, on %d\n", len, last ? " (last one)" : "", client->client_inbuf_used, client->client_fd); if ((len + 4) <= client->client_inbuf_used) { /* Got at least one fragment! */ nlen = len + client->client_query_size; query = malloc(nlen); if (query == NULL) { /* Out of memory, we'll have to try again * later. */ return; } /* Copy any previously-received fragments and append * this one. */ if (client->client_query_size > 0) { memcpy(query, client->client_query, client->client_query_size); } memcpy(query + client->client_query_size, client->client_inbuf + 4, len); /* Save the new query-in-progress. */ free(client->client_query); client->client_query = query; client->client_query_size = nlen; /* Drop the fragment from the incoming * buffer. 
*/ memmove(client->client_inbuf, client->client_inbuf + (len + 4), client->client_inbuf_used - (len + 4)); client->client_inbuf_used -= (len + 4); } if (last) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "query is %d bytes long on %d\n", client->client_query_size, client->client_fd); /* We have a complete query. Pass it on down. */ memset(&client_data, 0, sizeof(client_data)); client_data.connected = client; nis_process_request(state, client->client_query, client->client_query_size, &dispatch_reply_fragment_connected, &dispatch_reply_connected, &client_data, &client->client_query_cookie); /* Decide what to do next. */ client_interpret_nis_result(state, client); } } } /* Handle replying states. */ static void client_write(struct plugin_state *state, struct dispatch_client *client) { ssize_t count; int32_t len, nlen; int last; char *query; struct dispatch_client_data client_data; /* Try to send some of the pending data. */ len = client->client_outbuf_used; slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "attempting to send %d bytes to %d\n", len, client->client_fd); count = write(client->client_fd, client->client_outbuf, len); if (count <= 0) { if ((count != -1) || (errno != EAGAIN)) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "error sending %d bytes to %d\n", client->client_outbuf_used, client->client_fd); /* Fail, disconnect because we're out of sync. */ client_set_closing(state, client); } return; } slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "sent %d bytes to %d\n", count, client->client_fd); if (count == client->client_outbuf_used) { /* There's no more data to send. */ if (client->client_state == client_replying_final) { /* Done. Go back to reading next time. */ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "waiting for next query on %d\n", client->client_fd); client_set_reading(state, client); } else { /* More to send, so ask for more reply data. 
*/ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "waiting for more data for %d\n", client->client_fd); client->client_outbuf_used = 0; memset(&client_data, 0, sizeof(client_data)); client_data.connected = client; nis_process_request(state, client->client_query, client->client_query_size, &dispatch_reply_fragment_connected, &dispatch_reply_connected, &client_data, &client->client_query_cookie); client_interpret_nis_result(state, client); } } else { /* Partial write, adjust outgoing buffer for next time. */ memmove(client->client_outbuf, client->client_outbuf + count, client->client_outbuf_used - count); client->client_outbuf_used -= count; slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "%d bytes to go for %d\n", client->client_outbuf_used, client->client_fd); } } static void dispatch_service_client(struct plugin_state *state, struct dispatch_client *client, struct pollfd *fd) { ssize_t count; int32_t len, nlen; int last; char *query; switch (client->client_state) { case client_reading: if (fd->revents & POLLIN) { client_read(state, client); } else { client_set_closing(state, client); } break; case client_replying_with_more: case client_replying_final: if (fd->revents & POLLOUT) { client_write(state, client); } else { client_set_closing(state, client); } break; case client_closing: case client_invalid: /* never reached */ assert(0); break; } } void * dispatch_thread(struct wrapped_thread *t) { struct dispatch_client *clients, *client, *next, **list; struct dispatch_client_data client_data; struct plugin_state *state = wrap_thread_arg(t); struct pollfd *fds; int i, n_fds, client_count; ssize_t count; bool_t stop; clients = NULL; client_count = 0; fds = NULL; for (stop = FALSE; !stop;) { /* Prune out recently-disconnected clients. 
*/ list = &clients; while (*list != NULL) { client = *list; next = client->client_next; if (client->client_state == client_closing) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "pruning client %d\n", client->client_fd); if (client->client_fd != -1) { close(client->client_fd); } free(client); *list = next; } else { list = &(client->client_next); } } /* Count the number of connected clients we have. */ client = clients; i = 0; while (client != NULL) { next = client->client_next; client = next; i++; } /* If the "fds" block isn't big enough (or doesn't exist yet), * reallocate it. */ if (i > client_count) { free(fds); fds = NULL; client_count = i; } if (fds == NULL) { fds = malloc((state->n_listeners + client_count + 1) * sizeof(fds[0])); if (fds == NULL) { /* Wait a bit, then try again? */ poll(NULL, 0, 10000); continue; } } slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "%d connected clients\n", i); /* Fill in the set of polling descriptors. */ memset(fds, 0, sizeof((state->n_listeners + client_count + 1) * sizeof(fds[0]))); for (i = 0; i < state->n_listeners; i++) { fds[i].fd = state->listener[i].fd; fds[i].events = POLLIN; } /* Add the shutdown pipe reader. */ fds[i].fd = wrap_thread_stopfd(t); fds[i].events = POLLIN; if (fds[i].fd != -1) { i++; } /* Add the client list. */ client = clients; while (client != NULL) { fds[i].fd = client->client_fd; switch (client->client_state) { case client_reading: fds[i].events = POLLIN; break; case client_replying_with_more: case client_replying_final: fds[i].events = POLLOUT; break; case client_closing: case client_invalid: /* shouldn't happen */ assert(0); break; } client = client->client_next; i++; } /* Check for status updates. 
*/ slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "listening\n"); n_fds = i; switch (poll(fds, n_fds, -1)) { case -1: slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "done waiting\n"); return NULL; break; case 0: slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "no request(timeout?)\n"); continue; default: break; } /* Save the head of the existing clients list. */ client = clients; /* Iterate over listening sockets which have work for us. */ for (i = 0; i < state->n_listeners; i++) { if ((fds[i].revents & POLLIN) == 0) { continue; } switch (state->listener[i].type) { case SOCK_DGRAM: /* Datagram requests we handle right * here, right now. */ dispatch_dgram(state, fds[i].fd); break; case SOCK_STREAM: /* Try to accept a new client. */ next = dispatch_accept_client(state, fds[i].fd); if (next != NULL) { slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "new client on %d\n", next->client_fd); next->client_next = clients; clients = next; } break; default: /* never reached */ assert(0); break; } } /* Add the shutdown pipe reader. */ if (fds[i].revents & POLLIN) { stop = TRUE; } i++; /* Service the already-connected clients. */ for (; i < n_fds; i++, client = client->client_next) { assert(client != NULL); assert(client->client_fd == fds[i].fd); dispatch_service_client(state, client, &fds[i]); } } slapi_log_error(SLAPI_LOG_PLUGIN, state->plugin_desc->spd_id, "listening thread stopping\n"); return state; }