diff options
-rw-r--r-- | server/red_channel.c | 3 | ||||
-rw-r--r-- | server/red_channel.h | 1 | ||||
-rw-r--r-- | server/red_worker.c | 106 | ||||
-rw-r--r-- | server/reds.c | 1 |
4 files changed, 69 insertions, 42 deletions
diff --git a/server/red_channel.c b/server/red_channel.c index 59e6af6d..5b8027cb 100644 --- a/server/red_channel.c +++ b/server/red_channel.c @@ -1191,6 +1191,9 @@ void red_channel_client_disconnect(RedChannelClient *rcc) return; } red_channel_client_pipe_clear(rcc); + if (rcc->channel->channel_cbs.pre_disconnect) { + rcc->channel->channel_cbs.pre_disconnect(rcc); + } reds_stream_free(rcc->stream); rcc->stream = NULL; red_channel_remove_client(rcc); diff --git a/server/red_channel.h b/server/red_channel.h index 543aec18..a360a302 100644 --- a/server/red_channel.h +++ b/server/red_channel.h @@ -182,6 +182,7 @@ typedef void (*channel_client_migrate_proc)(RedChannelClient *base); */ typedef struct { channel_configure_socket_proc config_socket; + channel_disconnect_proc pre_disconnect; channel_disconnect_proc on_disconnect; channel_send_pipe_item_proc send_item; channel_hold_pipe_item_proc hold_item; diff --git a/server/red_worker.c b/server/red_worker.c index 3973f3e9..e88dbc05 100644 --- a/server/red_worker.c +++ b/server/red_worker.c @@ -31,7 +31,6 @@ #include <stdio.h> #include <stdarg.h> -#include <sys/epoll.h> #include <fcntl.h> #include <sys/socket.h> #include <netinet/in.h> @@ -39,6 +38,7 @@ #include <errno.h> #include <string.h> #include <unistd.h> +#include <poll.h> #include <pthread.h> #include <netinet/tcp.h> #include <setjmp.h> @@ -231,12 +231,12 @@ double inline stat_byte_to_mega(uint64_t size) #define stat_compress_add(a, b, c, d) #endif -#define MAX_EPOLL_SOURCES 10 +#define MAX_EVENT_SOURCES 20 #define INF_EVENT_WAIT ~0 typedef struct EventListener EventListener; -typedef void (*event_listener_action_proc)(EventListener *ctx, uint32_t events); +typedef void (*event_listener_action_proc)(EventListener *ctx, struct pollfd *pfd); typedef void (*event_listener_free_proc)(EventListener *ctx); struct EventListener { uint32_t refs; @@ -877,7 +877,8 @@ typedef struct RedWorker { int id; int running; uint32_t *pending; - int epoll; + struct pollfd poll_fds[MAX_EVENT_SOURCES]; + EventListener *listeners[MAX_EVENT_SOURCES]; unsigned int event_timeout; uint32_t repoll_cmd_ring; uint32_t repoll_cursor_ring; @@ -8721,6 +8722,21 @@ void red_show_tree(RedWorker *worker) } } +static void poll_channel_client_pre_disconnect(RedChannelClient *rcc) +{ + CommonChannel *common; + int i; + + common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base); + for (i = 0; i < MAX_EVENT_SOURCES; i++) { + struct pollfd *pfd = common->worker->poll_fds + i; + if (pfd->fd == rcc->stream->socket) { + pfd->fd = -1; + break; + } + } +} + static void display_channel_client_on_disconnect(RedChannelClient *rcc) { DisplayChannel *display_channel; @@ -9612,7 +9628,7 @@ CursorChannelClient *cursor_channel_create_rcc(CommonChannel *common, static int listen_to_new_client_channel(CommonChannel *common, CommonChannelClient *common_cc, RedsStream *stream) { - struct epoll_event event; + int i; common_cc->listener.refs = 1; common_cc->listener.action = common->listener_action; @@ -9620,17 +9636,23 @@ static int listen_to_new_client_channel(CommonChannel *common, ASSERT(common->base.clients_num); common_cc->id = common->worker->id; red_printf("NEW ID = %d", common_cc->id); - event.events = EPOLLIN | EPOLLOUT | EPOLLET; - event.data.ptr = &common_cc->listener; - if (epoll_ctl(common->worker->epoll, EPOLL_CTL_ADD, stream->socket, &event) == -1) { - red_printf("epoll_ctl failed, %s", strerror(errno)); - return FALSE; + + for (i = 0; i < MAX_EVENT_SOURCES; i++) { + struct pollfd *pfd = common->worker->poll_fds + i; + if (pfd->fd < 0) { + red_printf("new poll event %d (fd %d)", i, stream->socket); + pfd->fd = stream->socket; + pfd->events = POLLIN | POLLOUT; + common->worker->listeners[i] = &common_cc->listener; + return TRUE; + } } - return TRUE; + return FALSE; } static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_type, int migrate, event_listener_action_proc handler, + channel_disconnect_proc pre_disconnect, channel_disconnect_proc on_disconnect, channel_send_pipe_item_proc send_item, channel_hold_pipe_item_proc hold_item, @@ -9645,6 +9667,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_t ChannelCbs channel_cbs; channel_cbs.config_socket = common_channel_config_socket; + channel_cbs.pre_disconnect = pre_disconnect; channel_cbs.on_disconnect = on_disconnect; channel_cbs.send_item = send_item; channel_cbs.hold_item = hold_item; @@ -9675,12 +9698,12 @@ error: return NULL; } -static void handle_channel_events(EventListener *in_listener, uint32_t events) +static void handle_channel_events(EventListener *in_listener, struct pollfd *pfd) { CommonChannelClient *common_cc = SPICE_CONTAINEROF(in_listener, CommonChannelClient, listener); RedChannelClient *rcc = &common_cc->base; - if ((events & EPOLLIN) && red_channel_client_is_connected(rcc)) { + if ((pfd->events & POLLIN) && red_channel_client_is_connected(rcc)) { red_channel_client_receive(rcc); } @@ -9835,6 +9858,7 @@ static void display_channel_create(RedWorker *worker, int migrate) worker, sizeof(*display_channel), SPICE_CHANNEL_DISPLAY, migrate, handle_channel_events, + poll_channel_client_pre_disconnect, display_channel_client_on_disconnect, display_channel_send_item, display_channel_hold_pipe_item, @@ -10049,6 +10073,7 @@ static void cursor_channel_create(RedWorker *worker, int migrate) worker, sizeof(*worker->cursor_channel), SPICE_CHANNEL_CURSOR, migrate, handle_channel_events, + poll_channel_client_pre_disconnect, cursor_channel_client_on_disconnect, cursor_channel_send_item, cursor_channel_hold_pipe_item, @@ -11027,7 +11052,7 @@ static void register_callbacks(Dispatcher *dispatcher) -static void handle_dev_input(EventListener *listener, uint32_t events) +static void handle_dev_input(EventListener *listener, struct pollfd *pfd) { RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener); @@ -11041,10 +11066,9 @@ static void handle_dev_free(EventListener *ctx) static void red_init(RedWorker *worker, WorkerInitData *init_data) { - struct epoll_event event; RedWorkerMessage message; - int epoll; Dispatcher *dispatcher; + int i; ASSERT(sizeof(CursorItem) <= QXL_CURSUR_DEVICE_DATA_SIZE); @@ -11086,17 +11110,13 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data) worker->wakeup_counter = stat_add_counter(worker->stat, "wakeups", TRUE); worker->command_counter = stat_add_counter(worker->stat, "commands", TRUE); #endif - if ((epoll = epoll_create(MAX_EPOLL_SOURCES)) == -1) { - red_error("epoll_create failed, %s", strerror(errno)); + for (i = 0; i < MAX_EVENT_SOURCES; i++) { + worker->poll_fds[i].fd = -1; } - worker->epoll = epoll; - event.events = EPOLLIN; - event.data.ptr = &worker->dev_listener; - - if (epoll_ctl(epoll, EPOLL_CTL_ADD, worker->channel, &event) == -1) { - red_error("add channel failed, %s", strerror(errno)); - } + worker->poll_fds[0].fd = worker->channel; + worker->poll_fds[0].events = POLLIN; + worker->listeners[0] = &worker->dev_listener; red_memslot_info_init(&worker->mem_slots, init_data->num_memslots_groups, @@ -11140,13 +11160,10 @@ void *red_worker_main(void *arg) red_init_zlib(&worker); worker.event_timeout = INF_EVENT_WAIT; for (;;) { - struct epoll_event events[MAX_EPOLL_SOURCES]; - int num_events; - struct epoll_event *event; - struct epoll_event *end; + int i, num_events; worker.event_timeout = MIN(red_get_streams_timout(&worker), worker.event_timeout); - num_events = epoll_wait(worker.epoll, events, MAX_EPOLL_SOURCES, worker.event_timeout); + num_events = poll(worker.poll_fds, MAX_EVENT_SOURCES, worker.event_timeout); red_handle_streams_timout(&worker); if (worker.display_channel) { @@ -11160,27 +11177,32 @@ void *red_worker_main(void *arg) worker.event_timeout = INF_EVENT_WAIT; if (num_events == -1) { if (errno != EINTR) { - red_error("poll_wait failed, %s", strerror(errno)); + red_error("poll failed, %s", strerror(errno)); } num_events = 0; } - for (event = events, end = event + num_events; event < end; event++) { - EventListener *evt_listener = (EventListener *)event->data.ptr; - evt_listener->refs++; + for (i = 0; i < MAX_EVENT_SOURCES; i++) { + struct pollfd *pfd = worker.poll_fds + i; + if (pfd->revents) { + worker.listeners[i]->refs++; + } } - for (event = events, end = event + num_events; event < end; event++) { - EventListener *evt_listener = (EventListener *)event->data.ptr; + for (i = 0; i < MAX_EVENT_SOURCES; i++) { + struct pollfd *pfd = worker.poll_fds + i; + if (pfd->revents) { + EventListener *evt_listener = worker.listeners[i]; - if (evt_listener->refs > 1) { - evt_listener->action(evt_listener, event->events); - if (--evt_listener->refs) { - continue; + if (evt_listener->refs > 1) { + evt_listener->action(evt_listener, pfd); + if (--evt_listener->refs) { + continue; + } } + red_printf("freeing event listener"); + evt_listener->free(evt_listener); } - red_printf("freeing event listener"); - evt_listener->free(evt_listener); } if (worker.running) { diff --git a/server/reds.c b/server/reds.c index daadb569..250e0ca4 100644 --- a/server/reds.c +++ b/server/reds.c @@ -4204,6 +4204,7 @@ void reds_stream_free(RedsStream *s) } reds_stream_remove_watch(s); + red_printf("close socket fd %d", s->socket); close(s->socket); free(s); |