summaryrefslogtreecommitdiffstats
path: root/server
diff options
context:
space:
mode:
authorHans de Goede <hdegoede@redhat.com>2012-03-10 15:22:49 +0100
committerHans de Goede <hdegoede@redhat.com>2012-03-12 12:10:51 +0100
commita7841325b22af56355ba353842d3c01b407f26d9 (patch)
treeb53207a33fd7d4237828c627bc476707c3792951 /server
parentb023f85ebda1e077d2456ce9e443ea6b3904d58f (diff)
downloadspice-a7841325b22af56355ba353842d3c01b407f26d9.tar.gz
spice-a7841325b22af56355ba353842d3c01b407f26d9.tar.xz
spice-a7841325b22af56355ba353842d3c01b407f26d9.zip
red_worker: Rework poll code to use the watch interface
Commit 143a1df24e83e9c1e173c16aeb76d61ffdce9598 changed red_worker_main from epoll to poll. But epoll has edge triggered semantics (when requested and we requested them), where as poll is always level triggered. And red_worker was relying on the edge triggered semantics, as it was always polling for POLLOUT, which, when edge triggered, would only cause poll to register an event after we had blocked on a write. But after the switch to regular poll, with its level triggered semantics, the POLLOUT condition would almost always be true, causing red_worker_main to not block on the poll and burn CPU as fast as it can as soon as a client was connected. Luckily we already have a mechanism to switch from polling for read only to polling for read+write and back again in the form of watches. So this patch changes the red_worker dummy watch implementation into a proper watch implementation, and drops the entire EventListener concept since that then is no longer needed. This fixes spice-server using 400% CPU on my quad core machine as soon as a client was connected to a multi head vm, and as an added bonus is a nice cleanup IMHO. Signed-off-by: Hans de Goede <hdegoede@redhat.com>
Diffstat (limited to 'server')
-rw-r--r--server/red_worker.c170
1 files changed, 79 insertions, 91 deletions
diff --git a/server/red_worker.c b/server/red_worker.c
index 3da51612..f7c6ccbe 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -234,12 +234,11 @@ double inline stat_byte_to_mega(uint64_t size)
#define MAX_EVENT_SOURCES 20
#define INF_EVENT_WAIT ~0
-
-typedef struct EventListener EventListener;
-typedef void (*event_listener_action_proc)(EventListener *ctx, struct pollfd *pfd);
-struct EventListener {
- event_listener_action_proc action;
-};
+typedef struct SpiceWatch {
+ struct RedWorker *worker;
+ SpiceWatchFunc watch_func;
+ void *watch_func_opaque;
+} SpiceWatch;
enum {
BUF_TYPE_RAW = 1,
@@ -577,7 +576,6 @@ typedef struct GlzSharedDictionary {
typedef struct CommonChannel {
RedChannel base; // Must be the first thing
- event_listener_action_proc listener_action;
struct RedWorker *worker;
uint8_t recv_buf[RECIVE_BUF_SIZE];
uint32_t id_alloc; // bitfield. TODO - use this instead of shift scheme.
@@ -585,7 +583,6 @@ typedef struct CommonChannel {
typedef struct CommonChannelClient {
RedChannelClient base;
- EventListener listener;
uint32_t id;
struct RedWorker *worker;
} CommonChannelClient;
@@ -864,7 +861,6 @@ typedef struct ItemTrace {
#define NUM_CURSORS 100
typedef struct RedWorker {
- EventListener dev_listener;
DisplayChannel *display_channel;
CursorChannel *cursor_channel;
QXLInstance *qxl;
@@ -875,7 +871,7 @@ typedef struct RedWorker {
int running;
uint32_t *pending;
struct pollfd poll_fds[MAX_EVENT_SOURCES];
- EventListener *listeners[MAX_EVENT_SOURCES];
+ struct SpiceWatch watches[MAX_EVENT_SOURCES];
unsigned int event_timeout;
uint32_t repoll_cmd_ring;
uint32_t repoll_cursor_ring;
@@ -8719,22 +8715,6 @@ void red_show_tree(RedWorker *worker)
}
}
-static void poll_channel_client_pre_disconnect(RedChannelClient *rcc)
-{
- CommonChannel *common;
- int i;
-
- common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base);
- for (i = 0; i < MAX_EVENT_SOURCES; i++) {
- struct pollfd *pfd = common->worker->poll_fds + i;
- if (pfd->fd == rcc->stream->socket) {
- pfd->fd = -1;
- common->worker->listeners[i] = NULL;
- break;
- }
- }
-}
-
static void display_channel_client_on_disconnect(RedChannelClient *rcc)
{
DisplayChannel *display_channel;
@@ -9534,15 +9514,61 @@ static int common_channel_config_socket(RedChannelClient *rcc)
static void worker_watch_update_mask(SpiceWatch *watch, int event_mask)
{
+ struct RedWorker *worker = watch->worker;
+ int i = watch - worker->watches;
+
+ worker->poll_fds[i].events = 0;
+ if (event_mask & SPICE_WATCH_EVENT_READ) {
+ worker->poll_fds[i].events |= POLLIN;
+ }
+ if (event_mask & SPICE_WATCH_EVENT_WRITE) {
+ worker->poll_fds[i].events |= POLLOUT;
+ }
}
static SpiceWatch *worker_watch_add(int fd, int event_mask, SpiceWatchFunc func, void *opaque)
{
- return NULL; // apparently allowed?
+ /* Since we are a channel core implementation, we always get called from
+ red_channel_client_create(), so opaque always is our rcc */
+ RedChannelClient *rcc = opaque;
+ struct RedWorker *worker;
+ int i;
+
+ /* Since we are called from red_channel_client_create()
+ CommonChannelClient->worker has not been set yet! */
+ worker = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base)->worker;
+
+ /* Search for a free slot in our poll_fds & watches arrays */
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ if (worker->poll_fds[i].fd == -1) {
+ break;
+ }
+ }
+ if (i == MAX_EVENT_SOURCES) {
+ red_printf("ERROR could not add a watch for channel type %u id %u",
+ rcc->channel->type, rcc->channel->id);
+ return NULL;
+ }
+
+ worker->poll_fds[i].fd = fd;
+ worker->watches[i].worker = worker;
+ worker->watches[i].watch_func = func;
+ worker->watches[i].watch_func_opaque = opaque;
+ worker_watch_update_mask(&worker->watches[i], event_mask);
+
+ return &worker->watches[i];
}
static void worker_watch_remove(SpiceWatch *watch)
{
+ /* Note we don't touch the poll_fd here, to avoid the
+ poll_fds/watches table entry getting re-used in the same
+ red_worker_main loop over the fds as it is removed.
+
+ This is done because re-using it while events were pending on
+ the fd previously occupying the slot would lead to incorrectly
+ calling the watch_func for the new fd. */
+ memset(watch, 0, sizeof(SpiceWatch));
}
SpiceCoreInterface worker_core = {
@@ -9566,6 +9592,7 @@ static CommonChannelClient *common_channel_client_create(int size,
num_common_caps, common_caps, num_caps, caps);
CommonChannelClient *common_cc = (CommonChannelClient*)rcc;
common_cc->worker = common->worker;
+ common_cc->id = common->worker->id;
// TODO: move wide/narrow ack setting to red_channel.
red_channel_client_ack_set_client_window(rcc,
@@ -9615,32 +9642,7 @@ CursorChannelClient *cursor_channel_create_rcc(CommonChannel *common,
return ccc;
}
-static int listen_to_new_client_channel(CommonChannel *common,
- CommonChannelClient *common_cc, RedsStream *stream)
-{
- int i;
-
- common_cc->listener.action = common->listener_action;
- ASSERT(common->base.clients_num);
- common_cc->id = common->worker->id;
- red_printf("NEW ID = %d", common_cc->id);
-
- for (i = 0; i < MAX_EVENT_SOURCES; i++) {
- struct pollfd *pfd = common->worker->poll_fds + i;
- if (pfd->fd < 0) {
- red_printf("new poll event %d (fd %d)", i, stream->socket);
- pfd->fd = stream->socket;
- pfd->events = POLLIN | POLLOUT;
- common->worker->listeners[i] = &common_cc->listener;
- return TRUE;
- }
- }
- return FALSE;
-}
-
static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_type, int migrate,
- event_listener_action_proc handler,
- channel_disconnect_proc pre_disconnect,
channel_disconnect_proc on_disconnect,
channel_send_pipe_item_proc send_item,
channel_hold_pipe_item_proc hold_item,
@@ -9655,7 +9657,6 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_t
ChannelCbs channel_cbs = { NULL, };
channel_cbs.config_socket = common_channel_config_socket;
- channel_cbs.pre_disconnect = pre_disconnect;
channel_cbs.on_disconnect = on_disconnect;
channel_cbs.send_item = send_item;
channel_cbs.hold_item = hold_item;
@@ -9678,7 +9679,6 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_t
goto error;
}
common->worker = worker;
- common->listener_action = handler;
return channel;
error:
@@ -9686,20 +9686,6 @@ error:
return NULL;
}
-static void handle_channel_events(EventListener *in_listener, struct pollfd *pfd)
-{
- CommonChannelClient *common_cc = SPICE_CONTAINEROF(in_listener, CommonChannelClient, listener);
- RedChannelClient *rcc = &common_cc->base;
-
- if ((pfd->events & POLLIN) && red_channel_client_is_connected(rcc)) {
- red_channel_client_receive(rcc);
- }
-
- if (rcc->send_data.blocked && red_channel_client_is_connected(rcc)) {
- red_channel_client_push(rcc);
- }
-}
-
static void display_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
{
ASSERT(item);
@@ -9845,8 +9831,6 @@ static void display_channel_create(RedWorker *worker, int migrate)
if (!(worker->display_channel = (DisplayChannel *)__new_channel(
worker, sizeof(*display_channel),
SPICE_CHANNEL_DISPLAY, migrate,
- handle_channel_events,
- poll_channel_client_pre_disconnect,
display_channel_client_on_disconnect,
display_channel_send_item,
display_channel_hold_pipe_item,
@@ -9934,15 +9918,8 @@ static void handle_new_display_channel(RedWorker *worker, RedClient *client, Red
// todo: tune level according to bandwidth
display_channel->zlib_level = ZLIB_DEFAULT_COMPRESSION_LEVEL;
- if (!listen_to_new_client_channel(&display_channel->common, &dcc->common, stream)) {
- goto error;
- }
red_display_client_init_streams(dcc);
on_new_display_channel_client(dcc);
- return;
-
-error:
- red_channel_client_destroy(&dcc->common.base);
}
static void cursor_channel_client_on_disconnect(RedChannelClient *rcc)
@@ -10060,8 +10037,6 @@ static void cursor_channel_create(RedWorker *worker, int migrate)
worker->cursor_channel = (CursorChannel *)__new_channel(
worker, sizeof(*worker->cursor_channel),
SPICE_CHANNEL_CURSOR, migrate,
- handle_channel_events,
- poll_channel_client_pre_disconnect,
cursor_channel_client_on_disconnect,
cursor_channel_send_item,
cursor_channel_hold_pipe_item,
@@ -10096,7 +10071,6 @@ static void red_connect_cursor(RedWorker *worker, RedClient *client, RedsStream
channel->stat = stat_add_node(worker->stat, "cursor_channel", TRUE);
channel->common.base.out_bytes_counter = stat_add_counter(channel->stat, "out_bytes", TRUE);
#endif
- listen_to_new_client_channel(&channel->common, &ccc->common, stream);
on_new_cursor_channel(worker, &ccc->common.base);
}
@@ -11040,9 +11014,9 @@ static void register_callbacks(Dispatcher *dispatcher)
-static void handle_dev_input(EventListener *listener, struct pollfd *pfd)
+static void handle_dev_input(int fd, int event, void *opaque)
{
- RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener);
+ RedWorker *worker = opaque;
dispatcher_handle_recv_read(red_dispatcher_get_dispatcher(worker->red_dispatcher));
}
@@ -11064,7 +11038,6 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data)
worker->channel = dispatcher_get_recv_fd(dispatcher);
register_callbacks(dispatcher);
worker->pending = init_data->pending;
- worker->dev_listener.action = handle_dev_input;
worker->cursor_visible = TRUE;
ASSERT(init_data->num_renderers > 0);
worker->num_renderers = init_data->num_renderers;
@@ -11097,7 +11070,9 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data)
worker->poll_fds[0].fd = worker->channel;
worker->poll_fds[0].events = POLLIN;
- worker->listeners[0] = &worker->dev_listener;
+ worker->watches[0].worker = worker;
+ worker->watches[0].watch_func = handle_dev_input;
+ worker->watches[0].watch_func_opaque = worker;
red_memslot_info_init(&worker->mem_slots,
init_data->num_memslots_groups,
@@ -11160,17 +11135,30 @@ void *red_worker_main(void *arg)
if (errno != EINTR) {
red_error("poll failed, %s", strerror(errno));
}
- num_events = 0;
}
for (i = 0; i < MAX_EVENT_SOURCES; i++) {
- struct pollfd *pfd = worker.poll_fds + i;
- if (pfd->revents) {
- EventListener *evt_listener = worker.listeners[i];
-
- if (evt_listener) {
- evt_listener->action(evt_listener, pfd);
+ /* The watch may have been removed by the watch-func from
+ another fd (ie a disconnect through the dispatcher),
+ in this case watch_func is NULL. */
+ if (worker.poll_fds[i].revents && worker.watches[i].watch_func) {
+ int events = 0;
+ if (worker.poll_fds[i].revents & POLLIN) {
+ events |= SPICE_WATCH_EVENT_READ;
+ }
+ if (worker.poll_fds[i].revents & POLLOUT) {
+ events |= SPICE_WATCH_EVENT_WRITE;
}
+ worker.watches[i].watch_func(worker.poll_fds[i].fd, events,
+ worker.watches[i].watch_func_opaque);
+ }
+ }
+
+ /* Clear the poll_fd for any removed watches, see the comment in
+ watch_remove for why we don't do this there. */
+ for (i = 0; i < MAX_EVENT_SOURCES; i++) {
+ if (!worker.watches[i].watch_func) {
+ worker.poll_fds[i].fd = -1;
}
}