author:    Stefan Metzmacher <metze@samba.org>   2013-11-13 14:17:32 +0100
committer: Michael Adam <obnox@samba.org>        2013-11-13 14:18:52 +0100
commit:    f087a8e2b81fae82fa571ef09d2d1cb682cc8ff8 (patch)
tree:      d4a6b14a1594958150b733180d995b5c32025017 /ctdb/lib/tevent/tevent_poll.c
parent:    e80a5aba3db8e81173fa443991e08ef4a300ea5c (diff)
parent:    25f3c8b5269863aadbe72e304da7012782ef5b25 (diff)
download:  samba-f087a8e2b81fae82fa571ef09d2d1cb682cc8ff8.tar.gz
           samba-f087a8e2b81fae82fa571ef09d2d1cb682cc8ff8.tar.xz
           samba-f087a8e2b81fae82fa571ef09d2d1cb682cc8ff8.zip
Merge branch 'master' of ctdb into 'master' of samba
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Signed-off-by: Michael Adam <obnox@samba.org>
Diffstat (limited to 'ctdb/lib/tevent/tevent_poll.c')
-rw-r--r--   ctdb/lib/tevent/tevent_poll.c | 725
1 file changed, 725 insertions(+), 0 deletions(-)
diff --git a/ctdb/lib/tevent/tevent_poll.c b/ctdb/lib/tevent/tevent_poll.c
new file mode 100644
index 0000000000..92fcc441ac
--- /dev/null
+++ b/ctdb/lib/tevent/tevent_poll.c
@@ -0,0 +1,725 @@
+/*
+   Unix SMB/CIFS implementation.
+   main select loop and event handling
+   Copyright (C) Andrew Tridgell 2003-2005
+   Copyright (C) Stefan Metzmacher 2005-2009
+
+   ** NOTE! The following LGPL license applies to the tevent
+   ** library. This does NOT imply that all of Samba is released
+   ** under the LGPL
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 3 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "system/select.h"
+#include "tevent.h"
+#include "tevent_util.h"
+#include "tevent_internal.h"
+
+struct poll_event_context {
+	/* a pointer back to the generic event_context */
+	struct tevent_context *ev;
+
+	/*
+	 * A DLIST for fresh fde's added by poll_event_add_fd but not
+	 * picked up yet by poll_event_loop_once
+	 */
+	struct tevent_fd *fresh;
+	/*
+	 * A DLIST for disabled fde's.
+	 */
+	struct tevent_fd *disabled;
+	/*
+	 * one or more events were deleted or disabled
+	 */
+	bool deleted;
+
+	/*
+	 * These two arrays are maintained together.
+	 */
+	struct pollfd *fds;
+	struct tevent_fd **fdes;
+	unsigned num_fds;
+
+	/*
+	 * Signal fd to wake the poll() thread
+	 */
+	int signal_fd;
+
+	/* information for exiting from the event loop */
+	int exit_code;
+};
+
+static int poll_event_context_destructor(struct poll_event_context *poll_ev)
+{
+	struct tevent_fd *fd, *fn;
+
+	for (fd = poll_ev->fresh; fd; fd = fn) {
+		fn = fd->next;
+		fd->event_ctx = NULL;
+		DLIST_REMOVE(poll_ev->fresh, fd);
+	}
+
+	for (fd = poll_ev->disabled; fd; fd = fn) {
+		fn = fd->next;
+		fd->event_ctx = NULL;
+		DLIST_REMOVE(poll_ev->disabled, fd);
+	}
+
+	if (poll_ev->signal_fd == -1) {
+		/*
+		 * Non-threaded, no signal pipe
+		 */
+		return 0;
+	}
+
+	close(poll_ev->signal_fd);
+	poll_ev->signal_fd = -1;
+
+	if (poll_ev->num_fds == 0) {
+		return 0;
+	}
+	if (poll_ev->fds[0].fd != -1) {
+		close(poll_ev->fds[0].fd);
+		poll_ev->fds[0].fd = -1;
+	}
+	return 0;
+}
+
+/*
+  create a poll_event_context structure.
+*/
+static int poll_event_context_init(struct tevent_context *ev)
+{
+	struct poll_event_context *poll_ev;
+
+	/*
+	 * we might be called during tevent_re_initialise()
+	 * which means we need to free our old additional_data
+	 * in order to detach old fd events from the
+	 * poll_ev->fresh list
+	 */
+	TALLOC_FREE(ev->additional_data);
+
+	poll_ev = talloc_zero(ev, struct poll_event_context);
+	if (poll_ev == NULL) {
+		return -1;
+	}
+	poll_ev->ev = ev;
+	poll_ev->signal_fd = -1;
+	ev->additional_data = poll_ev;
+	talloc_set_destructor(poll_ev, poll_event_context_destructor);
+	return 0;
+}
+
+static bool set_nonblock(int fd)
+{
+	int val;
+
+	val = fcntl(fd, F_GETFL, 0);
+	if (val == -1) {
+		return false;
+	}
+	val |= O_NONBLOCK;
+
+	return (fcntl(fd, F_SETFL, val) != -1);
+}
+
+static int poll_event_context_init_mt(struct tevent_context *ev)
+{
+	struct poll_event_context *poll_ev;
+	struct pollfd *pfd;
+	int fds[2];
+	int ret;
+
+	ret = poll_event_context_init(ev);
+	if (ret == -1) {
+		return ret;
+	}
+
+	poll_ev = talloc_get_type_abort(
+		ev->additional_data, struct poll_event_context);
+
+	poll_ev->fds = talloc_zero(poll_ev, struct pollfd);
+	if (poll_ev->fds == NULL) {
+		return -1;
+	}
+
+	ret = pipe(fds);
+	if (ret == -1) {
+		return -1;
+	}
+
+	if (!set_nonblock(fds[0]) || !set_nonblock(fds[1])) {
+		close(fds[0]);
+		close(fds[1]);
+		return -1;
+	}
+
+	poll_ev->signal_fd = fds[1];
+
+	pfd = &poll_ev->fds[0];
+	pfd->fd = fds[0];
+	pfd->events = (POLLIN|POLLHUP);
+
+	poll_ev->num_fds = 1;
+
+	talloc_set_destructor(poll_ev, poll_event_context_destructor);
+
+	return 0;
+}
+
+static void poll_event_wake_pollthread(struct poll_event_context *poll_ev)
+{
+	char c;
+	ssize_t ret;
+
+	if (poll_ev->signal_fd == -1) {
+		return;
+	}
+	c = 0;
+	do {
+		ret = write(poll_ev->signal_fd, &c, sizeof(c));
+	} while ((ret == -1) && (errno == EINTR));
+}
+
+static void poll_event_drain_signal_fd(struct poll_event_context *poll_ev)
+{
+	char buf[16];
+	ssize_t ret;
+	int fd;
+
+	if (poll_ev->signal_fd == -1) {
+		return;
+	}
+
+	if (poll_ev->num_fds < 1) {
+		return;
+	}
+	fd = poll_ev->fds[0].fd;
+
+	do {
+		ret = read(fd, buf, sizeof(buf));
+	} while (ret == sizeof(buf));
+}
+
+/*
+  destroy an fd_event
+*/
+static int poll_event_fd_destructor(struct tevent_fd *fde)
+{
+	struct tevent_context *ev = fde->event_ctx;
+	struct poll_event_context *poll_ev;
+	uint64_t del_idx = fde->additional_flags;
+
+	if (ev == NULL) {
+		goto done;
+	}
+
+	poll_ev = talloc_get_type_abort(
+		ev->additional_data, struct poll_event_context);
+
+	if (del_idx == UINT64_MAX) {
+		struct tevent_fd **listp =
+			(struct tevent_fd **)fde->additional_data;
+
+		DLIST_REMOVE((*listp), fde);
+		goto done;
+	}
+
+	poll_ev->fdes[del_idx] = NULL;
+	poll_ev->deleted = true;
+	poll_event_wake_pollthread(poll_ev);
+done:
+	return tevent_common_fd_destructor(fde);
+}
+
+static void poll_event_schedule_immediate(struct tevent_immediate *im,
+					  struct tevent_context *ev,
+					  tevent_immediate_handler_t handler,
+					  void *private_data,
+					  const char *handler_name,
+					  const char *location)
+{
+	struct poll_event_context *poll_ev = talloc_get_type_abort(
+		ev->additional_data, struct poll_event_context);
+
+	tevent_common_schedule_immediate(im, ev, handler, private_data,
+					 handler_name, location);
+	poll_event_wake_pollthread(poll_ev);
+}
+
+/*
+  Private function called by "standard" backend fallback.
+  Note this only allows fallback to "poll" backend, not "poll-mt".
+*/
+_PRIVATE_ void tevent_poll_event_add_fd_internal(struct tevent_context *ev,
+						 struct tevent_fd *fde)
+{
+	struct poll_event_context *poll_ev = talloc_get_type_abort(
+		ev->additional_data, struct poll_event_context);
+	struct tevent_fd **listp;
+
+	if (fde->flags != 0) {
+		listp = &poll_ev->fresh;
+	} else {
+		listp = &poll_ev->disabled;
+	}
+
+	fde->additional_flags = UINT64_MAX;
+	fde->additional_data = listp;
+
+	DLIST_ADD((*listp), fde);
+	talloc_set_destructor(fde, poll_event_fd_destructor);
+}
+
+/*
+  add a fd based event
+  return NULL on failure (memory allocation error)
+*/
+static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev,
+					   TALLOC_CTX *mem_ctx,
+					   int fd, uint16_t flags,
+					   tevent_fd_handler_t handler,
+					   void *private_data,
+					   const char *handler_name,
+					   const char *location)
+{
+	struct poll_event_context *poll_ev = talloc_get_type_abort(
+		ev->additional_data, struct poll_event_context);
+	struct tevent_fd *fde;
+
+	if (fd < 0) {
+		return NULL;
+	}
+
+	fde = talloc(mem_ctx ? mem_ctx : ev, struct tevent_fd);
+	if (fde == NULL) {
+		return NULL;
+	}
+	fde->event_ctx = ev;
+	fde->fd = fd;
+	fde->flags = flags;
+	fde->handler = handler;
+	fde->close_fn = NULL;
+	fde->private_data = private_data;
+	fde->handler_name = handler_name;
+	fde->location = location;
+	fde->additional_flags = UINT64_MAX;
+	fde->additional_data = NULL;
+
+	tevent_poll_event_add_fd_internal(ev, fde);
+	poll_event_wake_pollthread(poll_ev);
+
+	/*
+	 * poll_event_loop_poll will take care of the rest in
+	 * poll_event_setup_fresh
+	 */
+	return fde;
+}
+
+/*
+  set the fd event flags
+*/
+static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
+{
+	struct tevent_context *ev = fde->event_ctx;
+	struct poll_event_context *poll_ev;
+	uint64_t idx = fde->additional_flags;
+	uint16_t pollflags;
+
+	if (ev == NULL) {
+		return;
+	}
+	poll_ev = talloc_get_type_abort(
+		ev->additional_data, struct poll_event_context);
+
+	fde->flags = flags;
+
+	if (idx == UINT64_MAX) {
+		struct tevent_fd **listp =
+			(struct tevent_fd **)fde->additional_data;
+
+		/*
+		 * We move it between the fresh and disabled lists.
+		 */
+		DLIST_REMOVE((*listp), fde);
+		tevent_poll_event_add_fd_internal(ev, fde);
+		poll_event_wake_pollthread(poll_ev);
+		return;
+	}
+
+	if (fde->flags == 0) {
+		/*
+		 * We need to remove it from the array
+		 * and move it to the disabled list.
+		 */
+		poll_ev->fdes[idx] = NULL;
+		poll_ev->deleted = true;
+		DLIST_REMOVE(ev->fd_events, fde);
+		tevent_poll_event_add_fd_internal(ev, fde);
+		poll_event_wake_pollthread(poll_ev);
+		return;
+	}
+
+	pollflags = 0;
+
+	if (flags & TEVENT_FD_READ) {
+		pollflags |= (POLLIN|POLLHUP);
+	}
+	if (flags & TEVENT_FD_WRITE) {
+		pollflags |= (POLLOUT);
+	}
+	poll_ev->fds[idx].events = pollflags;
+
+	poll_event_wake_pollthread(poll_ev);
+}
+
+static bool poll_event_setup_fresh(struct tevent_context *ev,
+				   struct poll_event_context *poll_ev)
+{
+	struct tevent_fd *fde, *next;
+	unsigned num_fresh, num_fds;
+
+	if (poll_ev->deleted) {
+		unsigned first_fd = (poll_ev->signal_fd != -1) ? 1 : 0;
+		unsigned i;
+
+		for (i=first_fd; i < poll_ev->num_fds;) {
+			fde = poll_ev->fdes[i];
+			if (fde != NULL) {
+				i++;
+				continue;
+			}
+
+			/*
+			 * This fde was talloc_free()'ed. Delete it
+			 * from the arrays
+			 */
+			poll_ev->num_fds -= 1;
+			if (poll_ev->num_fds == i) {
+				break;
+			}
+			poll_ev->fds[i] = poll_ev->fds[poll_ev->num_fds];
+			poll_ev->fdes[i] = poll_ev->fdes[poll_ev->num_fds];
+			if (poll_ev->fdes[i] != NULL) {
+				poll_ev->fdes[i]->additional_flags = i;
+			}
+		}
+	}
+	poll_ev->deleted = false;
+
+	if (poll_ev->fresh == NULL) {
+		return true;
+	}
+
+	num_fresh = 0;
+	for (fde = poll_ev->fresh; fde; fde = fde->next) {
+		num_fresh += 1;
+	}
+	num_fds = poll_ev->num_fds + num_fresh;
+
+	/*
+	 * We check the length of fdes here. It is the last one
+	 * enlarged, so if the realloc for poll_fd->fdes fails,
+	 * poll_fd->fds will have at least the size of poll_fd->fdes
+	 */
+
+	if (num_fds >= talloc_array_length(poll_ev->fdes)) {
+		struct pollfd *tmp_fds;
+		struct tevent_fd **tmp_fdes;
+		unsigned array_length;
+
+		array_length = (num_fds + 15) & ~15; /* round up to 16 */
+
+		tmp_fds = talloc_realloc(
+			poll_ev, poll_ev->fds, struct pollfd, array_length);
+		if (tmp_fds == NULL) {
+			return false;
+		}
+		poll_ev->fds = tmp_fds;
+
+		tmp_fdes = talloc_realloc(
+			poll_ev, poll_ev->fdes, struct tevent_fd *,
+			array_length);
+		if (tmp_fdes == NULL) {
+			return false;
+		}
+		poll_ev->fdes = tmp_fdes;
+	}
+
+	for (fde = poll_ev->fresh; fde; fde = next) {
+		struct pollfd *pfd;
+
+		pfd = &poll_ev->fds[poll_ev->num_fds];
+
+		pfd->fd = fde->fd;
+		pfd->events = 0;
+		pfd->revents = 0;
+
+		if (fde->flags & TEVENT_FD_READ) {
+			pfd->events |= (POLLIN|POLLHUP);
+		}
+		if (fde->flags & TEVENT_FD_WRITE) {
+			pfd->events |= (POLLOUT);
+		}
+
+		fde->additional_flags = poll_ev->num_fds;
+		poll_ev->fdes[poll_ev->num_fds] = fde;
+
+		next = fde->next;
+		DLIST_REMOVE(poll_ev->fresh, fde);
+		DLIST_ADD(ev->fd_events, fde);
+
+		poll_ev->num_fds += 1;
+	}
+	return true;
+}
+
+/*
+  event loop handling using poll()
+*/
+static int poll_event_loop_poll(struct tevent_context *ev,
+				struct timeval *tvalp)
+{
+	struct poll_event_context *poll_ev = talloc_get_type_abort(
+		ev->additional_data, struct poll_event_context);
+	int pollrtn;
+	int timeout = -1;
+	int poll_errno;
+	struct tevent_fd *fde = NULL;
+	unsigned i;
+
+	if (ev->signal_events && tevent_common_check_signal(ev)) {
+		return 0;
+	}
+
+	if (tvalp != NULL) {
+		timeout = tvalp->tv_sec * 1000;
+		timeout += (tvalp->tv_usec + 999) / 1000;
+	}
+
+	poll_event_drain_signal_fd(poll_ev);
+
+	if (!poll_event_setup_fresh(ev, poll_ev)) {
+		return -1;
+	}
+
+	tevent_trace_point_callback(poll_ev->ev, TEVENT_TRACE_BEFORE_WAIT);
+	pollrtn = poll(poll_ev->fds, poll_ev->num_fds, timeout);
+	poll_errno = errno;
+	tevent_trace_point_callback(poll_ev->ev, TEVENT_TRACE_AFTER_WAIT);
+
+	if (pollrtn == -1 && poll_errno == EINTR && ev->signal_events) {
+		tevent_common_check_signal(ev);
+		return 0;
+	}
+
+	if (pollrtn == 0 && tvalp) {
+		/* we don't care about a possible delay here */
+		tevent_common_loop_timer_delay(ev);
+		return 0;
+	}
+
+	if (pollrtn <= 0) {
+		/*
+		 * No fd's ready
+		 */
+		return 0;
+	}
+
+	/* at least one file descriptor is ready - check
+	   which ones and call the handler, being careful to allow
+	   the handler to remove itself when called */
+
+	for (fde = ev->fd_events; fde; fde = fde->next) {
+		unsigned idx = fde->additional_flags;
+		struct pollfd *pfd;
+		uint16_t flags = 0;
+
+		if (idx == UINT64_MAX) {
+			continue;
+		}
+
+		pfd = &poll_ev->fds[idx];
+
+		if (pfd->revents & POLLNVAL) {
+			/*
+			 * the socket is dead! this should never
+			 * happen as the socket should have first been
+			 * made readable and that should have removed
+			 * the event, so this must be a bug.
+			 *
+			 * We ignore it here to match the epoll
+			 * behavior.
+			 */
+			tevent_debug(ev, TEVENT_DEBUG_ERROR,
+				     "POLLNVAL on fde[%p] fd[%d] - disabling\n",
+				     fde, pfd->fd);
+			poll_ev->fdes[idx] = NULL;
+			poll_ev->deleted = true;
+			DLIST_REMOVE(ev->fd_events, fde);
+			fde->event_ctx = NULL;
+			continue;
+		}
+
+		if (pfd->revents & (POLLHUP|POLLERR)) {
+			/* If we only wait for TEVENT_FD_WRITE, we
+			   should not tell the event handler about it,
+			   and remove the writable flag, as we only
+			   report errors when waiting for read events
+			   to match the select behavior. */
+			if (!(fde->flags & TEVENT_FD_READ)) {
+				TEVENT_FD_NOT_WRITEABLE(fde);
+				continue;
+			}
+			flags |= TEVENT_FD_READ;
+		}
+		if (pfd->revents & POLLIN) {
+			flags |= TEVENT_FD_READ;
+		}
+		if (pfd->revents & POLLOUT) {
+			flags |= TEVENT_FD_WRITE;
+		}
+		/*
+		 * Note that fde->flags could be changed when using
+		 * the poll_mt backend together with threads,
+		 * that why we need to check pfd->revents and fde->flags
+		 */
+		flags &= fde->flags;
+		if (flags != 0) {
+			DLIST_DEMOTE(ev->fd_events, fde, struct tevent_fd);
+			fde->handler(ev, fde, flags, fde->private_data);
+			return 0;
+		}
+	}
+
+	for (i = 0; i < poll_ev->num_fds; i++) {
+		if (poll_ev->fds[i].revents & POLLNVAL) {
+			/*
+			 * the socket is dead! this should never
+			 * happen as the socket should have first been
+			 * made readable and that should have removed
+			 * the event, so this must be a bug or
+			 * a race in the poll_mt usage.
+			 */
+			fde = poll_ev->fdes[i];
+			tevent_debug(ev, TEVENT_DEBUG_WARNING,
+				     "POLLNVAL on dangling fd[%d] fde[%p] - disabling\n",
+				     poll_ev->fds[i].fd, fde);
+			poll_ev->fdes[i] = NULL;
+			poll_ev->deleted = true;
+			if (fde != NULL) {
+				DLIST_REMOVE(ev->fd_events, fde);
+				fde->event_ctx = NULL;
+			}
+		}
+	}
+
+	return 0;
+}
+
+/*
+  do a single event loop using the events defined in ev
+*/
+static int poll_event_loop_once(struct tevent_context *ev,
+				const char *location)
+{
+	struct timeval tval;
+
+	if (ev->signal_events &&
+	    tevent_common_check_signal(ev)) {
+		return 0;
+	}
+
+	if (ev->immediate_events &&
+	    tevent_common_loop_immediate(ev)) {
+		return 0;
+	}
+
+	tval = tevent_common_loop_timer_delay(ev);
+	if (tevent_timeval_is_zero(&tval)) {
+		return 0;
+	}
+
+	return poll_event_loop_poll(ev, &tval);
+}
+
+static int poll_event_loop_wait(struct tevent_context *ev,
+				const char *location)
+{
+	struct poll_event_context *poll_ev = talloc_get_type_abort(
+		ev->additional_data, struct poll_event_context);
+
+	/*
+	 * loop as long as we have events pending
+	 */
+	while (ev->fd_events ||
+	       ev->timer_events ||
+	       ev->immediate_events ||
+	       ev->signal_events ||
+	       poll_ev->fresh ||
+	       poll_ev->disabled) {
+		int ret;
+		ret = _tevent_loop_once(ev, location);
+		if (ret != 0) {
+			tevent_debug(ev, TEVENT_DEBUG_FATAL,
+				     "_tevent_loop_once() failed: %d - %s\n",
+				     ret, strerror(errno));
+			return ret;
+		}
+	}
+
+	tevent_debug(ev, TEVENT_DEBUG_WARNING,
+		     "poll_event_loop_wait() out of events\n");
+	return 0;
+}
+
+static const struct tevent_ops poll_event_ops = {
+	.context_init		= poll_event_context_init,
+	.add_fd			= poll_event_add_fd,
+	.set_fd_close_fn	= tevent_common_fd_set_close_fn,
+	.get_fd_flags		= tevent_common_fd_get_flags,
+	.set_fd_flags		= poll_event_set_fd_flags,
+	.add_timer		= tevent_common_add_timer_v2,
+	.schedule_immediate	= tevent_common_schedule_immediate,
+	.add_signal		= tevent_common_add_signal,
+	.loop_once		= poll_event_loop_once,
+	.loop_wait		= poll_event_loop_wait,
+};
+
+_PRIVATE_ bool tevent_poll_init(void)
+{
+	return tevent_register_backend("poll", &poll_event_ops);
+}
+
+static const struct tevent_ops poll_event_mt_ops = {
+	.context_init		= poll_event_context_init_mt,
+	.add_fd			= poll_event_add_fd,
+	.set_fd_close_fn	= tevent_common_fd_set_close_fn,
+	.get_fd_flags		= tevent_common_fd_get_flags,
+	.set_fd_flags		= poll_event_set_fd_flags,
+	.add_timer		= tevent_common_add_timer_v2,
+	.schedule_immediate	= poll_event_schedule_immediate,
+	.add_signal		= tevent_common_add_signal,
+	.loop_once		= poll_event_loop_once,
+	.loop_wait		= poll_event_loop_wait,
+};
+
+_PRIVATE_ bool tevent_poll_mt_init(void)
+{
+	return tevent_register_backend("poll_mt", &poll_event_mt_ops);
+}
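For context, the sketch below shows how a consumer might select the "poll" backend registered above through tevent's public API and dispatch a single fd event. It is illustrative only and not part of this commit: the pipe, the echo_handler callback, and its behaviour are invented for the example, and it assumes libtevent and libtalloc headers and libraries are available. Passing "poll_mt" instead would select the threaded variant registered by tevent_poll_mt_init().

	/* Illustrative sketch only -- not part of this commit. */
	#include <stdio.h>
	#include <unistd.h>
	#include <talloc.h>
	#include <tevent.h>

	/* hypothetical handler: consume whatever made the fd readable */
	static void echo_handler(struct tevent_context *ev, struct tevent_fd *fde,
				 uint16_t flags, void *private_data)
	{
		int *readfd = (int *)private_data;
		char buf[16];

		if (flags & TEVENT_FD_READ) {
			ssize_t n = read(*readfd, buf, sizeof(buf));
			printf("read %zd byte(s)\n", n);
		}
	}

	int main(void)
	{
		TALLOC_CTX *mem_ctx = talloc_new(NULL);
		struct tevent_context *ev;
		struct tevent_fd *fde;
		int fds[2];

		if (pipe(fds) != 0) {
			return 1;
		}

		/* ask for the poll backend explicitly */
		ev = tevent_context_init_byname(mem_ctx, "poll");
		if (ev == NULL) {
			return 1;
		}

		fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
				    echo_handler, &fds[0]);
		if (fde == NULL) {
			return 1;
		}

		/* make the fd readable, then dispatch exactly one event */
		(void)write(fds[1], "x", 1);
		tevent_loop_once(ev);

		talloc_free(mem_ctx);
		close(fds[0]);
		close(fds[1]);
		return 0;
	}

From a caller's point of view the two backends behave the same; as the diff above shows, the poll_mt variant additionally installs a wake pipe so that poll_event_schedule_immediate() and flag changes issued from other threads can interrupt a blocking poll().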