summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--server/red_channel.c3
-rw-r--r--server/red_channel.h1
-rw-r--r--server/red_worker.c106
-rw-r--r--server/reds.c1
4 files changed, 69 insertions, 42 deletions
diff --git a/server/red_channel.c b/server/red_channel.c
index 59e6af6d..5b8027cb 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -1191,6 +1191,9 @@ void red_channel_client_disconnect(RedChannelClient *rcc)
return;
}
red_channel_client_pipe_clear(rcc);
+ if (rcc->channel->channel_cbs.pre_disconnect) {
+ rcc->channel->channel_cbs.pre_disconnect(rcc);
+ }
reds_stream_free(rcc->stream);
rcc->stream = NULL;
red_channel_remove_client(rcc);
diff --git a/server/red_channel.h b/server/red_channel.h
index 543aec18..a360a302 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -182,6 +182,7 @@ typedef void (*channel_client_migrate_proc)(RedChannelClient *base);
*/
typedef struct {
channel_configure_socket_proc config_socket;
+ channel_disconnect_proc pre_disconnect;
channel_disconnect_proc on_disconnect;
channel_send_pipe_item_proc send_item;
channel_hold_pipe_item_proc hold_item;
diff --git a/server/red_worker.c b/server/red_worker.c
index 3973f3e9..e88dbc05 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -31,7 +31,6 @@
#include <stdio.h>
#include <stdarg.h>
-#include <sys/epoll.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
@@ -39,6 +38,7 @@
#include <errno.h>
#include <string.h>
#include <unistd.h>
+#include <poll.h>
#include <pthread.h>
#include <netinet/tcp.h>
#include <setjmp.h>
@@ -231,12 +231,12 @@ double inline stat_byte_to_mega(uint64_t size)
#define stat_compress_add(a, b, c, d)
#endif
-#define MAX_EPOLL_SOURCES 10
+#define MAX_EVENT_SOURCES 20
#define INF_EVENT_WAIT ~0
typedef struct EventListener EventListener;
-typedef void (*event_listener_action_proc)(EventListener *ctx, uint32_t events);
+typedef void (*event_listener_action_proc)(EventListener *ctx, struct pollfd *pfd);
typedef void (*event_listener_free_proc)(EventListener *ctx);
struct EventListener {
uint32_t refs;
@@ -877,7 +877,8 @@ typedef struct RedWorker {
int id;
int running;
uint32_t *pending;
- int epoll;
+ struct pollfd poll_fds[MAX_EVENT_SOURCES];
+ EventListener *listeners[MAX_EVENT_SOURCES];
unsigned int event_timeout;
uint32_t repoll_cmd_ring;
uint32_t repoll_cursor_ring;
@@ -8721,6 +8722,21 @@ void red_show_tree(RedWorker *worker)
}
}
+static void poll_channel_client_pre_disconnect(RedChannelClient *rcc)
+{
+ CommonChannel *common;
+ int i;
+
+ common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base);
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ struct pollfd *pfd = common->worker->poll_fds + i;
+ if (pfd->fd == rcc->stream->socket) {
+ pfd->fd = -1;
+ break;
+ }
+ }
+}
+
static void display_channel_client_on_disconnect(RedChannelClient *rcc)
{
DisplayChannel *display_channel;
@@ -9612,7 +9628,7 @@ CursorChannelClient *cursor_channel_create_rcc(CommonChannel *common,
static int listen_to_new_client_channel(CommonChannel *common,
CommonChannelClient *common_cc, RedsStream *stream)
{
- struct epoll_event event;
+ int i;
common_cc->listener.refs = 1;
common_cc->listener.action = common->listener_action;
@@ -9620,17 +9636,23 @@ static int listen_to_new_client_channel(CommonChannel *common,
ASSERT(common->base.clients_num);
common_cc->id = common->worker->id;
red_printf("NEW ID = %d", common_cc->id);
- event.events = EPOLLIN | EPOLLOUT | EPOLLET;
- event.data.ptr = &common_cc->listener;
- if (epoll_ctl(common->worker->epoll, EPOLL_CTL_ADD, stream->socket, &event) == -1) {
- red_printf("epoll_ctl failed, %s", strerror(errno));
- return FALSE;
+
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ struct pollfd *pfd = common->worker->poll_fds + i;
+ if (pfd->fd < 0) {
+ red_printf("new poll event %d (fd %d)", i, stream->socket);
+ pfd->fd = stream->socket;
+ pfd->events = POLLIN | POLLOUT;
+ common->worker->listeners[i] = &common_cc->listener;
+ return TRUE;
+ }
}
- return TRUE;
+ return FALSE;
}
static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_type, int migrate,
event_listener_action_proc handler,
+ channel_disconnect_proc pre_disconnect,
channel_disconnect_proc on_disconnect,
channel_send_pipe_item_proc send_item,
channel_hold_pipe_item_proc hold_item,
@@ -9645,6 +9667,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_t
ChannelCbs channel_cbs;
channel_cbs.config_socket = common_channel_config_socket;
+ channel_cbs.pre_disconnect = pre_disconnect;
channel_cbs.on_disconnect = on_disconnect;
channel_cbs.send_item = send_item;
channel_cbs.hold_item = hold_item;
@@ -9675,12 +9698,12 @@ error:
return NULL;
}
-static void handle_channel_events(EventListener *in_listener, uint32_t events)
+static void handle_channel_events(EventListener *in_listener, struct pollfd *pfd)
{
CommonChannelClient *common_cc = SPICE_CONTAINEROF(in_listener, CommonChannelClient, listener);
RedChannelClient *rcc = &common_cc->base;
- if ((events & EPOLLIN) && red_channel_client_is_connected(rcc)) {
+ if ((pfd->events & POLLIN) && red_channel_client_is_connected(rcc)) {
red_channel_client_receive(rcc);
}
@@ -9835,6 +9858,7 @@ static void display_channel_create(RedWorker *worker, int migrate)
worker, sizeof(*display_channel),
SPICE_CHANNEL_DISPLAY, migrate,
handle_channel_events,
+ poll_channel_client_pre_disconnect,
display_channel_client_on_disconnect,
display_channel_send_item,
display_channel_hold_pipe_item,
@@ -10049,6 +10073,7 @@ static void cursor_channel_create(RedWorker *worker, int migrate)
worker, sizeof(*worker->cursor_channel),
SPICE_CHANNEL_CURSOR, migrate,
handle_channel_events,
+ poll_channel_client_pre_disconnect,
cursor_channel_client_on_disconnect,
cursor_channel_send_item,
cursor_channel_hold_pipe_item,
@@ -11027,7 +11052,7 @@ static void register_callbacks(Dispatcher *dispatcher)
-static void handle_dev_input(EventListener *listener, uint32_t events)
+static void handle_dev_input(EventListener *listener, struct pollfd *pfd)
{
RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
@@ -11041,10 +11066,9 @@ static void handle_dev_free(EventListener *ctx)
static void red_init(RedWorker *worker, WorkerInitData *init_data)
{
- struct epoll_event event;
RedWorkerMessage message;
- int epoll;
Dispatcher *dispatcher;
+ int i;
ASSERT(sizeof(CursorItem) <= QXL_CURSUR_DEVICE_DATA_SIZE);
@@ -11086,17 +11110,13 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data)
worker->wakeup_counter = stat_add_counter(worker->stat, "wakeups", TRUE);
worker->command_counter = stat_add_counter(worker->stat, "commands", TRUE);
#endif
- if ((epoll = epoll_create(MAX_EPOLL_SOURCES)) == -1) {
- red_error("epoll_create failed, %s", strerror(errno));
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ worker->poll_fds[i].fd = -1;
}
- worker->epoll = epoll;
- event.events = EPOLLIN;
- event.data.ptr = &worker->dev_listener;
-
- if (epoll_ctl(epoll, EPOLL_CTL_ADD, worker->channel, &event) == -1) {
- red_error("add channel failed, %s", strerror(errno));
- }
+ worker->poll_fds[0].fd = worker->channel;
+ worker->poll_fds[0].events = POLLIN;
+ worker->listeners[0] = &worker->dev_listener;
red_memslot_info_init(&worker->mem_slots,
init_data->num_memslots_groups,
@@ -11140,13 +11160,10 @@ void *red_worker_main(void *arg)
red_init_zlib(&worker);
worker.event_timeout = INF_EVENT_WAIT;
for (;;) {
- struct epoll_event events[MAX_EPOLL_SOURCES];
- int num_events;
- struct epoll_event *event;
- struct epoll_event *end;
+ int i, num_events;
worker.event_timeout = MIN(red_get_streams_timout(&worker), worker.event_timeout);
- num_events = epoll_wait(worker.epoll, events, MAX_EPOLL_SOURCES, worker.event_timeout);
+ num_events = poll(worker.poll_fds, MAX_EVENT_SOURCES, worker.event_timeout);
red_handle_streams_timout(&worker);
if (worker.display_channel) {
@@ -11160,27 +11177,32 @@ void *red_worker_main(void *arg)
worker.event_timeout = INF_EVENT_WAIT;
if (num_events == -1) {
if (errno != EINTR) {
- red_error("poll_wait failed, %s", strerror(errno));
+ red_error("poll failed, %s", strerror(errno));
}
num_events = 0;
}
- for (event = events, end = event + num_events; event < end; event++) {
- EventListener *evt_listener = (EventListener *)event->data.ptr;
- evt_listener->refs++;
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ struct pollfd *pfd = worker.poll_fds + i;
+ if (pfd->revents) {
+ worker.listeners[i]->refs++;
+ }
}
- for (event = events, end = event + num_events; event < end; event++) {
- EventListener *evt_listener = (EventListener *)event->data.ptr;
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ struct pollfd *pfd = worker.poll_fds + i;
+ if (pfd->revents) {
+ EventListener *evt_listener = worker.listeners[i];
- if (evt_listener->refs > 1) {
- evt_listener->action(evt_listener, event->events);
- if (--evt_listener->refs) {
- continue;
+ if (evt_listener->refs > 1) {
+ evt_listener->action(evt_listener, pfd);
+ if (--evt_listener->refs) {
+ continue;
+ }
}
+ red_printf("freeing event listener");
+ evt_listener->free(evt_listener);
}
- red_printf("freeing event listener");
- evt_listener->free(evt_listener);
}
if (worker.running) {
diff --git a/server/reds.c b/server/reds.c
index daadb569..250e0ca4 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -4204,6 +4204,7 @@ void reds_stream_free(RedsStream *s)
}
reds_stream_remove_watch(s);
+ red_printf("close socket fd %d", s->socket);
close(s->socket);
free(s);