From 8e049ce3b03c5e0034702d2a4b49b7ca36aaff92 Mon Sep 17 00:00:00 2001 From: Alon Levy Date: Mon, 31 Oct 2011 17:35:30 +0200 Subject: server/red_worker: reuse dispatcher This patch reuses Dispatcher in RedDispatcher. It adds two helpers to red_worker to keep RedWorker opaque to the outside. The dispatcher is abused in three places that use the underlying socket directly: once sending a READY after red_init completes once for each channel creation, replying with the RedChannel instance for cursor and display. FDO Bugzilla: 42463 rfc->v1: * move callbacks to red_worker.c including registration (Yonit) * rename dispatcher to red_dispatcher in red_worker.c and red_dispatcher.c * add accessor red_dispatcher_get_dispatcher * s/dispatcher_handle_recv/dispatcher_handle_recv_read/ and change sig to just Dispatcher *dispatcher (was the SpiceCoreInterface one) * remove SpiceCoreInterface parameter from dispatcher_init (Yonit) * main_dispatcher needed it for channel_event so it has it in struct MainDispatcher * add dispatcher_get_recv_fd for red_worker --- server/red_dispatcher.c | 487 +++++++++++++++----------- server/red_dispatcher.h | 149 ++++++++ server/red_worker.c | 903 ++++++++++++++++++++++++++++++------------------ server/red_worker.h | 5 +- 4 files changed, 997 insertions(+), 547 deletions(-) diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c index 0c7a8754..17b469e1 100644 --- a/server/red_dispatcher.c +++ b/server/red_dispatcher.c @@ -37,6 +37,7 @@ #include "reds_gl_canvas.h" #endif // USE_OPENGL #include "reds.h" +#include "dispatcher.h" #include "red_dispatcher.h" #include "red_parse_qxl.h" @@ -58,7 +59,7 @@ struct AsyncCommand { struct RedDispatcher { QXLWorker base; QXLInstance *qxl; - int channel; + Dispatcher dispatcher; pthread_t worker_thread; uint32_t pending; int primary_active; @@ -93,19 +94,22 @@ static void red_dispatcher_set_display_peer(RedChannel *channel, RedClient *clie int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) { + RedWorkerMessageDisplayConnect payload; RedDispatcher *dispatcher; red_printf(""); dispatcher = (RedDispatcher *)channel->data; - RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CONNECT; - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &client, sizeof(RedClient *)); - send_data(dispatcher->channel, &stream, sizeof(RedsStream *)); - send_data(dispatcher->channel, &migration, sizeof(int)); + payload.client = client; + payload.stream = stream; + payload.migration = migration; + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_DISPLAY_CONNECT, + &payload); } static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc) { + RedWorkerMessageDisplayDisconnect payload; RedDispatcher *dispatcher; if (!rcc->channel) { @@ -115,27 +119,28 @@ static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc) dispatcher = (RedDispatcher *)rcc->channel->data; red_printf(""); - RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_DISCONNECT; - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *)); + payload.rcc = rcc; // TODO: we turned it to be sync, due to client_destroy . Should we support async? - for this we will need ref count // for channels - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_DISPLAY_DISCONNECT, + &payload); } static void red_dispatcher_display_migrate(RedChannelClient *rcc) { + RedWorkerMessageDisplayMigrate payload; RedDispatcher *dispatcher; if (!rcc->channel) { return; } dispatcher = (RedDispatcher *)rcc->channel->data; red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id); - RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_MIGRATE; - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *)); + payload.rcc = rcc; + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_DISPLAY_MIGRATE, + &payload); } static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *client, RedsStream *stream, @@ -143,17 +148,20 @@ static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *clien uint32_t *common_caps, int num_caps, uint32_t *caps) { + RedWorkerMessageCursorConnect payload; RedDispatcher *dispatcher = (RedDispatcher *)channel->data; red_printf(""); - RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CONNECT; - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &client, sizeof(RedClient *)); - send_data(dispatcher->channel, &stream, sizeof(RedsStream *)); - send_data(dispatcher->channel, &migration, sizeof(int)); + payload.client = client; + payload.stream = stream; + payload.migration = migration; + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_CURSOR_CONNECT, + &payload); } static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc) { + RedWorkerMessageCursorDisconnect payload; RedDispatcher *dispatcher; if (!rcc->channel) { @@ -162,16 +170,16 @@ static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc) dispatcher = (RedDispatcher *)rcc->channel->data; red_printf(""); - RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_DISCONNECT; - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *)); + payload.rcc = rcc; - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_CURSOR_DISCONNECT, + &payload); } static void red_dispatcher_cursor_migrate(RedChannelClient *rcc) { + RedWorkerMessageCursorMigrate payload; RedDispatcher *dispatcher; if (!rcc->channel) { @@ -179,8 +187,10 @@ static void red_dispatcher_cursor_migrate(RedChannelClient *rcc) } dispatcher = (RedDispatcher *)rcc->channel->data; red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id); - RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_MIGRATE; - write_message(dispatcher->channel, &message); + payload.rcc = rcc; + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_CURSOR_MIGRATE, + &payload); } typedef struct RendererInfo { @@ -263,16 +273,16 @@ static void red_dispatcher_update_area(RedDispatcher *dispatcher, uint32_t surfa QXLRect *qxl_area, QXLRect *qxl_dirty_rects, uint32_t num_dirty_rects, uint32_t clear_dirty_region) { - RedWorkerMessage message = RED_WORKER_MESSAGE_UPDATE; + RedWorkerMessageUpdate payload; - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &surface_id, sizeof(uint32_t)); - send_data(dispatcher->channel, &qxl_area, sizeof(QXLRect *)); - send_data(dispatcher->channel, &qxl_dirty_rects, sizeof(QXLRect *)); - send_data(dispatcher->channel, &num_dirty_rects, sizeof(uint32_t)); - send_data(dispatcher->channel, &clear_dirty_region, sizeof(uint32_t)); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + payload.surface_id = surface_id; + payload.qxl_area = qxl_area; + payload.qxl_dirty_rects = qxl_dirty_rects; + payload.num_dirty_rects = num_dirty_rects; + payload.clear_dirty_region = clear_dirty_region; + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_UPDATE, + &payload); } static AsyncCommand *async_command_alloc(RedDispatcher *dispatcher, @@ -297,13 +307,15 @@ static void red_dispatcher_update_area_async(RedDispatcher *dispatcher, uint64_t cookie) { RedWorkerMessage message = RED_WORKER_MESSAGE_UPDATE_ASYNC; - AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie); + RedWorkerMessageUpdateAsync payload; - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &cmd, sizeof(cmd)); - send_data(dispatcher->channel, &surface_id, sizeof(uint32_t)); - send_data(dispatcher->channel, qxl_area, sizeof(QXLRect)); - send_data(dispatcher->channel, &clear_dirty_region, sizeof(uint32_t)); + payload.base.cmd = async_command_alloc(dispatcher, message, cookie); + payload.surface_id = surface_id; + payload.qxl_area = *qxl_area; + payload.clear_dirty_region = clear_dirty_region; + dispatcher_send_message(&dispatcher->dispatcher, + message, + &payload); } static void qxl_worker_update_area(QXLWorker *qxl_worker, uint32_t surface_id, @@ -316,12 +328,12 @@ static void qxl_worker_update_area(QXLWorker *qxl_worker, uint32_t surface_id, static void red_dispatcher_add_memslot(RedDispatcher *dispatcher, QXLDevMemSlot *mem_slot) { - RedWorkerMessage message = RED_WORKER_MESSAGE_ADD_MEMSLOT; + RedWorkerMessageAddMemslot payload; - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, mem_slot, sizeof(QXLDevMemSlot)); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + payload.mem_slot = *mem_slot; + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_ADD_MEMSLOT, + &payload); } static void qxl_worker_add_memslot(QXLWorker *qxl_worker, QXLDevMemSlot *mem_slot) @@ -331,21 +343,22 @@ static void qxl_worker_add_memslot(QXLWorker *qxl_worker, QXLDevMemSlot *mem_slo static void red_dispatcher_add_memslot_async(RedDispatcher *dispatcher, QXLDevMemSlot *mem_slot, uint64_t cookie) { + RedWorkerMessageAddMemslotAsync payload; RedWorkerMessage message = RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC; - AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie); - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &cmd, sizeof(cmd)); - send_data(dispatcher->channel, mem_slot, sizeof(QXLDevMemSlot)); + payload.base.cmd = async_command_alloc(dispatcher, message, cookie); + payload.mem_slot = *mem_slot; + dispatcher_send_message(&dispatcher->dispatcher, message, &payload); } static void red_dispatcher_del_memslot(RedDispatcher *dispatcher, uint32_t slot_group_id, uint32_t slot_id) { + RedWorkerMessageDelMemslot payload; RedWorkerMessage message = RED_WORKER_MESSAGE_DEL_MEMSLOT; - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &slot_group_id, sizeof(uint32_t)); - send_data(dispatcher->channel, &slot_id, sizeof(uint32_t)); + payload.slot_group_id = slot_group_id; + payload.slot_id = slot_id; + dispatcher_send_message(&dispatcher->dispatcher, message, &payload); } static void qxl_worker_del_memslot(QXLWorker *qxl_worker, uint32_t slot_group_id, uint32_t slot_id) @@ -355,11 +368,11 @@ static void qxl_worker_del_memslot(QXLWorker *qxl_worker, uint32_t slot_group_id static void red_dispatcher_destroy_surfaces(RedDispatcher *dispatcher) { - RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACES; + RedWorkerMessageDestroySurfaces payload; - write_message(dispatcher->channel, &message); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_DESTROY_SURFACES, + &payload); } static void qxl_worker_destroy_surfaces(QXLWorker *qxl_worker) @@ -369,11 +382,11 @@ static void qxl_worker_destroy_surfaces(QXLWorker *qxl_worker) static void red_dispatcher_destroy_surfaces_async(RedDispatcher *dispatcher, uint64_t cookie) { + RedWorkerMessageDestroySurfacesAsync payload; RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC; - AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie); - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &cmd, sizeof(cmd)); + payload.base.cmd = async_command_alloc(dispatcher, message, cookie); + dispatcher_send_message(&dispatcher->dispatcher, message, &payload); } static void red_dispatcher_destroy_primary_surface_complete(RedDispatcher *dispatcher) @@ -386,29 +399,38 @@ static void red_dispatcher_destroy_primary_surface_complete(RedDispatcher *dispa update_client_mouse_allowed(); } +static void +red_dispatcher_destroy_primary_surface_sync(RedDispatcher *dispatcher, + uint32_t surface_id) +{ + RedWorkerMessageDestroyPrimarySurface payload; + payload.surface_id = surface_id; + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE, + &payload); + red_dispatcher_destroy_primary_surface_complete(dispatcher); +} + +static void +red_dispatcher_destroy_primary_surface_async(RedDispatcher *dispatcher, + uint32_t surface_id, uint64_t cookie) +{ + RedWorkerMessageDestroyPrimarySurfaceAsync payload; + RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC; + + payload.base.cmd = async_command_alloc(dispatcher, message, cookie); + payload.surface_id = surface_id; + dispatcher_send_message(&dispatcher->dispatcher, message, &payload); +} + static void red_dispatcher_destroy_primary_surface(RedDispatcher *dispatcher, uint32_t surface_id, int async, uint64_t cookie) { - RedWorkerMessage message; - AsyncCommand *cmd = NULL; - if (async) { - message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC; - cmd = async_command_alloc(dispatcher, message, cookie); + red_dispatcher_destroy_primary_surface_async(dispatcher, surface_id, cookie); } else { - message = RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE; - } - - write_message(dispatcher->channel, &message); - if (async) { - send_data(dispatcher->channel, &cmd, sizeof(cmd)); - } - send_data(dispatcher->channel, &surface_id, sizeof(uint32_t)); - if (!async) { - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); - red_dispatcher_destroy_primary_surface_complete(dispatcher); + red_dispatcher_destroy_primary_surface_sync(dispatcher, surface_id); } } @@ -431,30 +453,42 @@ static void red_dispatcher_create_primary_surface_complete(RedDispatcher *dispat } static void -red_dispatcher_create_primary_surface(RedDispatcher *dispatcher, uint32_t surface_id, - QXLDevSurfaceCreate *surface, int async, uint64_t cookie) +red_dispatcher_create_primary_surface_async(RedDispatcher *dispatcher, uint32_t surface_id, + QXLDevSurfaceCreate *surface, uint64_t cookie) { - RedWorkerMessage message; - AsyncCommand *cmd = NULL; + RedWorkerMessageCreatePrimarySurfaceAsync payload; + RedWorkerMessage message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC; + + dispatcher->surface_create = *surface; + payload.base.cmd = async_command_alloc(dispatcher, message, cookie); + payload.surface_id = surface_id; + payload.surface = *surface; + dispatcher_send_message(&dispatcher->dispatcher, message, &payload); +} + +static void +red_dispatcher_create_primary_surface_sync(RedDispatcher *dispatcher, uint32_t surface_id, + QXLDevSurfaceCreate *surface) +{ + RedWorkerMessageCreatePrimarySurface payload; - if (async) { - message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC; - cmd = async_command_alloc(dispatcher, message, cookie); - } else { - message = RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE; - } dispatcher->surface_create = *surface; + payload.surface_id = surface_id; + payload.surface = *surface; + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE, + &payload); + red_dispatcher_create_primary_surface_complete(dispatcher); +} - write_message(dispatcher->channel, &message); +static void +red_dispatcher_create_primary_surface(RedDispatcher *dispatcher, uint32_t surface_id, + QXLDevSurfaceCreate *surface, int async, uint64_t cookie) +{ if (async) { - send_data(dispatcher->channel, &cmd, sizeof(cmd)); - } - send_data(dispatcher->channel, &surface_id, sizeof(uint32_t)); - send_data(dispatcher->channel, surface, sizeof(QXLDevSurfaceCreate)); - if (!async) { - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); - red_dispatcher_create_primary_surface_complete(dispatcher); + red_dispatcher_create_primary_surface_async(dispatcher, surface_id, surface, cookie); + } else { + red_dispatcher_create_primary_surface_sync(dispatcher, surface_id, surface); } } @@ -466,11 +500,11 @@ static void qxl_worker_create_primary_surface(QXLWorker *qxl_worker, uint32_t su static void red_dispatcher_reset_image_cache(RedDispatcher *dispatcher) { - RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_IMAGE_CACHE; + RedWorkerMessageResetImageCache payload; - write_message(dispatcher->channel, &message); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_RESET_IMAGE_CACHE, + &payload); } static void qxl_worker_reset_image_cache(QXLWorker *qxl_worker) @@ -480,11 +514,11 @@ static void qxl_worker_reset_image_cache(QXLWorker *qxl_worker) static void red_dispatcher_reset_cursor(RedDispatcher *dispatcher) { - RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_CURSOR; + RedWorkerMessageResetCursor payload; - write_message(dispatcher->channel, &message); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_RESET_CURSOR, + &payload); } static void qxl_worker_reset_cursor(QXLWorker *qxl_worker) @@ -492,29 +526,38 @@ static void qxl_worker_reset_cursor(QXLWorker *qxl_worker) red_dispatcher_reset_cursor((RedDispatcher*)qxl_worker); } -static void red_dispatcher_destroy_surface_wait(RedDispatcher *dispatcher, uint32_t surface_id, - int async, uint64_t cookie) +static void red_dispatcher_destroy_surface_wait_sync(RedDispatcher *dispatcher, + uint32_t surface_id) { - RedWorkerMessage message; - AsyncCommand *cmd = NULL; + RedWorkerMessageDestroySurfaceWait payload; - if (async ) { - message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC; - cmd = async_command_alloc(dispatcher, message, cookie); - } else { - message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT; - } + payload.surface_id = surface_id; + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT, + &payload); +} - write_message(dispatcher->channel, &message); - if (async) { - send_data(dispatcher->channel, &cmd, sizeof(cmd)); - } - send_data(dispatcher->channel, &surface_id, sizeof(uint32_t)); +static void red_dispatcher_destroy_surface_wait_async(RedDispatcher *dispatcher, + uint32_t surface_id, + uint64_t cookie) +{ + RedWorkerMessageDestroySurfaceWaitAsync payload; + RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC; + + payload.base.cmd = async_command_alloc(dispatcher, message, cookie); + payload.surface_id = surface_id; + dispatcher_send_message(&dispatcher->dispatcher, message, &payload); +} + +static void red_dispatcher_destroy_surface_wait(RedDispatcher *dispatcher, + uint32_t surface_id, + int async, uint64_t cookie) +{ if (async) { - return; + red_dispatcher_destroy_surface_wait_async(dispatcher, surface_id, cookie); + } else { + red_dispatcher_destroy_surface_wait_sync(dispatcher, surface_id); } - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); } static void qxl_worker_destroy_surface_wait(QXLWorker *qxl_worker, uint32_t surface_id) @@ -524,9 +567,11 @@ static void qxl_worker_destroy_surface_wait(QXLWorker *qxl_worker, uint32_t surf static void red_dispatcher_reset_memslots(RedDispatcher *dispatcher) { - RedWorkerMessage message = RED_WORKER_MESSAGE_RESET_MEMSLOTS; + RedWorkerMessageResetMemslots payload; - write_message(dispatcher->channel, &message); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_RESET_MEMSLOTS, + &payload); } static void qxl_worker_reset_memslots(QXLWorker *qxl_worker) @@ -536,11 +581,15 @@ static void qxl_worker_reset_memslots(QXLWorker *qxl_worker) static void red_dispatcher_wakeup(RedDispatcher *dispatcher) { - if (!test_bit(RED_WORKER_PENDING_WAKEUP, dispatcher->pending)) { - RedWorkerMessage message = RED_WORKER_MESSAGE_WAKEUP; - set_bit(RED_WORKER_PENDING_WAKEUP, &dispatcher->pending); - write_message(dispatcher->channel, &message); + RedWorkerMessageWakeup payload; + + if (test_bit(RED_WORKER_PENDING_WAKEUP, dispatcher->pending)) { + return; } + set_bit(RED_WORKER_PENDING_WAKEUP, &dispatcher->pending); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_WAKEUP, + &payload); } static void qxl_worker_wakeup(QXLWorker *qxl_worker) @@ -550,11 +599,15 @@ static void qxl_worker_wakeup(QXLWorker *qxl_worker) static void red_dispatcher_oom(RedDispatcher *dispatcher) { - if (!test_bit(RED_WORKER_PENDING_OOM, dispatcher->pending)) { - RedWorkerMessage message = RED_WORKER_MESSAGE_OOM; - set_bit(RED_WORKER_PENDING_OOM, &dispatcher->pending); - write_message(dispatcher->channel, &message); + RedWorkerMessageOom payload; + + if (test_bit(RED_WORKER_PENDING_OOM, dispatcher->pending)) { + return; } + set_bit(RED_WORKER_PENDING_OOM, &dispatcher->pending); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_OOM, + &payload); } static void qxl_worker_oom(QXLWorker *qxl_worker) @@ -564,9 +617,11 @@ static void qxl_worker_oom(QXLWorker *qxl_worker) static void red_dispatcher_start(RedDispatcher *dispatcher) { - RedWorkerMessage message = RED_WORKER_MESSAGE_START; + RedWorkerMessageStart payload; - write_message(dispatcher->channel, &message); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_START, + &payload); } static void qxl_worker_start(QXLWorker *qxl_worker) @@ -576,20 +631,20 @@ static void qxl_worker_start(QXLWorker *qxl_worker) static void red_dispatcher_flush_surfaces_async(RedDispatcher *dispatcher, uint64_t cookie) { + RedWorkerMessageFlushSurfacesAsync payload; RedWorkerMessage message = RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC; - AsyncCommand *cmd = async_command_alloc(dispatcher, message, cookie); - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &cmd, sizeof(cmd)); + payload.base.cmd = async_command_alloc(dispatcher, message, cookie); + dispatcher_send_message(&dispatcher->dispatcher, message, &payload); } static void red_dispatcher_stop(RedDispatcher *dispatcher) { - RedWorkerMessage message = RED_WORKER_MESSAGE_STOP; + RedWorkerMessageStop payload; - write_message(dispatcher->channel, &message); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_STOP, + &payload); } static void qxl_worker_stop(QXLWorker *qxl_worker) @@ -601,14 +656,14 @@ static void red_dispatcher_loadvm_commands(RedDispatcher *dispatcher, struct QXLCommandExt *ext, uint32_t count) { - RedWorkerMessage message = RED_WORKER_MESSAGE_LOADVM_COMMANDS; + RedWorkerMessageLoadvmCommands payload; red_printf(""); - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &count, sizeof(uint32_t)); - send_data(dispatcher->channel, ext, sizeof(QXLCommandExt) * count); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + payload.count = count; + payload.ext = ext; + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_LOADVM_COMMANDS, + &payload); } static void qxl_worker_loadvm_commands(QXLWorker *qxl_worker, @@ -640,37 +695,44 @@ static inline int calc_compression_level(void) void red_dispatcher_on_ic_change(void) { + RedWorkerMessageSetCompression payload; int compression_level = calc_compression_level(); RedDispatcher *now = dispatchers; + while (now) { - RedWorkerMessage message = RED_WORKER_MESSAGE_SET_COMPRESSION; now->qxl->st->qif->set_compression_level(now->qxl, compression_level); - write_message(now->channel, &message); - send_data(now->channel, &image_compression, sizeof(spice_image_compression_t)); + payload.image_compression = image_compression; + dispatcher_send_message(&now->dispatcher, + RED_WORKER_MESSAGE_SET_COMPRESSION, + &payload); now = now->next; } } void red_dispatcher_on_sv_change(void) { + RedWorkerMessageSetStreamingVideo payload; int compression_level = calc_compression_level(); RedDispatcher *now = dispatchers; while (now) { - RedWorkerMessage message = RED_WORKER_MESSAGE_SET_STREAMING_VIDEO; now->qxl->st->qif->set_compression_level(now->qxl, compression_level); - write_message(now->channel, &message); - send_data(now->channel, &streaming_video, sizeof(uint32_t)); + payload.streaming_video = streaming_video; + dispatcher_send_message(&now->dispatcher, + RED_WORKER_MESSAGE_SET_STREAMING_VIDEO, + &payload); now = now->next; } } void red_dispatcher_set_mouse_mode(uint32_t mode) { + RedWorkerMessageSetMouseMode payload; RedDispatcher *now = dispatchers; while (now) { - RedWorkerMessage message = RED_WORKER_MESSAGE_SET_MOUSE_MODE; - write_message(now->channel, &message); - send_data(now->channel, &mode, sizeof(uint32_t)); + payload.mode = mode; + dispatcher_send_message(&now->dispatcher, + RED_WORKER_MESSAGE_SET_MOUSE_MODE, + &payload); now = now->next; } } @@ -873,34 +935,31 @@ void red_dispatcher_async_complete(struct RedDispatcher *dispatcher, static RedChannel *red_dispatcher_display_channel_create(RedDispatcher *dispatcher) { - RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE; + RedWorkerMessageDisplayChannelCreate payload; RedChannel *display_channel; - write_message(dispatcher->channel, &message); - - receive_data(dispatcher->channel, &display_channel, sizeof(RedChannel *)); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE, + &payload); + receive_data(dispatcher->dispatcher.send_fd, &display_channel, sizeof(RedChannel *)); return display_channel; } static RedChannel *red_dispatcher_cursor_channel_create(RedDispatcher *dispatcher) { - RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE; + RedWorkerMessageCursorChannelCreate payload; RedChannel *cursor_channel; - write_message(dispatcher->channel, &message); - - receive_data(dispatcher->channel, &cursor_channel, sizeof(RedChannel *)); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); + dispatcher_send_message(&dispatcher->dispatcher, + RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE, + &payload); + receive_data(dispatcher->dispatcher.send_fd, &cursor_channel, sizeof(RedChannel *)); return cursor_channel; } RedDispatcher *red_dispatcher_init(QXLInstance *qxl) { - RedDispatcher *dispatcher; - int channels[2]; + RedDispatcher *red_dispatcher; RedWorkerMessage message; WorkerInitData init_data; QXLDevInitInfo init_info; @@ -917,45 +976,41 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl) gl_canvas_init(); #endif // USE_OPENGL - if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { - red_error("socketpair failed %s", strerror(errno)); - } - - dispatcher = spice_new0(RedDispatcher, 1); - dispatcher->channel = channels[0]; - ring_init(&dispatcher->async_commands); - DBG_ASYNC("dispatcher->async_commands.next %p", dispatcher->async_commands.next); - init_data.qxl = dispatcher->qxl = qxl; + red_dispatcher = spice_new0(RedDispatcher, 1); + ring_init(&red_dispatcher->async_commands); + DBG_ASYNC("red_dispatcher->async_commands.next %p", red_dispatcher->async_commands.next); + dispatcher_init(&red_dispatcher->dispatcher, RED_WORKER_MESSAGE_COUNT, NULL); + init_data.qxl = red_dispatcher->qxl = qxl; init_data.id = qxl->id; - init_data.channel = channels[1]; - init_data.pending = &dispatcher->pending; + init_data.red_dispatcher = red_dispatcher; + init_data.pending = &red_dispatcher->pending; init_data.num_renderers = num_renderers; memcpy(init_data.renderers, renderers, sizeof(init_data.renderers)); - pthread_mutex_init(&dispatcher->async_lock, NULL); + pthread_mutex_init(&red_dispatcher->async_lock, NULL); init_data.image_compression = image_compression; init_data.jpeg_state = jpeg_state; init_data.zlib_glz_state = zlib_glz_state; init_data.streaming_video = streaming_video; - dispatcher->base.major_version = SPICE_INTERFACE_QXL_MAJOR; - dispatcher->base.minor_version = SPICE_INTERFACE_QXL_MINOR; - dispatcher->base.wakeup = qxl_worker_wakeup; - dispatcher->base.oom = qxl_worker_oom; - dispatcher->base.start = qxl_worker_start; - dispatcher->base.stop = qxl_worker_stop; - dispatcher->base.update_area = qxl_worker_update_area; - dispatcher->base.add_memslot = qxl_worker_add_memslot; - dispatcher->base.del_memslot = qxl_worker_del_memslot; - dispatcher->base.reset_memslots = qxl_worker_reset_memslots; - dispatcher->base.destroy_surfaces = qxl_worker_destroy_surfaces; - dispatcher->base.create_primary_surface = qxl_worker_create_primary_surface; - dispatcher->base.destroy_primary_surface = qxl_worker_destroy_primary_surface; - - dispatcher->base.reset_image_cache = qxl_worker_reset_image_cache; - dispatcher->base.reset_cursor = qxl_worker_reset_cursor; - dispatcher->base.destroy_surface_wait = qxl_worker_destroy_surface_wait; - dispatcher->base.loadvm_commands = qxl_worker_loadvm_commands; + red_dispatcher->base.major_version = SPICE_INTERFACE_QXL_MAJOR; + red_dispatcher->base.minor_version = SPICE_INTERFACE_QXL_MINOR; + red_dispatcher->base.wakeup = qxl_worker_wakeup; + red_dispatcher->base.oom = qxl_worker_oom; + red_dispatcher->base.start = qxl_worker_start; + red_dispatcher->base.stop = qxl_worker_stop; + red_dispatcher->base.update_area = qxl_worker_update_area; + red_dispatcher->base.add_memslot = qxl_worker_add_memslot; + red_dispatcher->base.del_memslot = qxl_worker_del_memslot; + red_dispatcher->base.reset_memslots = qxl_worker_reset_memslots; + red_dispatcher->base.destroy_surfaces = qxl_worker_destroy_surfaces; + red_dispatcher->base.create_primary_surface = qxl_worker_create_primary_surface; + red_dispatcher->base.destroy_primary_surface = qxl_worker_destroy_primary_surface; + + red_dispatcher->base.reset_image_cache = qxl_worker_reset_image_cache; + red_dispatcher->base.reset_cursor = qxl_worker_reset_cursor; + red_dispatcher->base.destroy_surface_wait = qxl_worker_destroy_surface_wait; + red_dispatcher->base.loadvm_commands = qxl_worker_loadvm_commands; qxl->st->qif->get_init_info(qxl, &init_info); @@ -965,7 +1020,6 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl) init_data.num_memslots_groups = init_info.num_memslots_groups; init_data.internal_groupslot_id = init_info.internal_groupslot_id; init_data.n_surfaces = init_info.n_surfaces; - init_data.dispatcher = dispatcher; num_active_workers = 1; @@ -974,40 +1028,51 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl) sigdelset(&thread_sig_mask, SIGFPE); sigdelset(&thread_sig_mask, SIGSEGV); pthread_sigmask(SIG_SETMASK, &thread_sig_mask, &curr_sig_mask); - if ((r = pthread_create(&dispatcher->worker_thread, NULL, red_worker_main, &init_data))) { + if ((r = pthread_create(&red_dispatcher->worker_thread, NULL, red_worker_main, &init_data))) { red_error("create thread failed %d", r); } pthread_sigmask(SIG_SETMASK, &curr_sig_mask, NULL); - read_message(dispatcher->channel, &message); + read_message(red_dispatcher->dispatcher.send_fd, &message); ASSERT(message == RED_WORKER_MESSAGE_READY); - display_channel = red_dispatcher_display_channel_create(dispatcher); + display_channel = red_dispatcher_display_channel_create(red_dispatcher); if (display_channel) { client_cbs.connect = red_dispatcher_set_display_peer; client_cbs.disconnect = red_dispatcher_disconnect_display_peer; client_cbs.migrate = red_dispatcher_display_migrate; red_channel_register_client_cbs(display_channel, &client_cbs); - red_channel_set_data(display_channel, dispatcher); + red_channel_set_data(display_channel, red_dispatcher); reds_register_channel(display_channel); } - cursor_channel = red_dispatcher_cursor_channel_create(dispatcher); + cursor_channel = red_dispatcher_cursor_channel_create(red_dispatcher); if (cursor_channel) { client_cbs.connect = red_dispatcher_set_cursor_peer; client_cbs.disconnect = red_dispatcher_disconnect_cursor_peer; client_cbs.migrate = red_dispatcher_cursor_migrate; red_channel_register_client_cbs(cursor_channel, &client_cbs); - red_channel_set_data(cursor_channel, dispatcher); + red_channel_set_data(cursor_channel, red_dispatcher); reds_register_channel(cursor_channel); } - qxl->st->qif->attache_worker(qxl, &dispatcher->base); + qxl->st->qif->attache_worker(qxl, &red_dispatcher->base); qxl->st->qif->set_compression_level(qxl, calc_compression_level()); - dispatcher->next = dispatchers; - dispatchers = dispatcher; - return dispatcher; + red_dispatcher->next = dispatchers; + dispatchers = red_dispatcher; + return red_dispatcher; +} + +struct Dispatcher *red_dispatcher_get_dispatcher(RedDispatcher *red_dispatcher) +{ + return &red_dispatcher->dispatcher; +} + +void red_dispatcher_set_dispatcher_opaque(struct RedDispatcher *red_dispatcher, + void *opaque) +{ + dispatcher_set_opaque(&red_dispatcher->dispatcher, opaque); } diff --git a/server/red_dispatcher.h b/server/red_dispatcher.h index c2582f43..7417aac1 100644 --- a/server/red_dispatcher.h +++ b/server/red_dispatcher.h @@ -32,5 +32,154 @@ int red_dispatcher_add_renderer(const char *name); uint32_t red_dispatcher_qxl_ram_size(void); int red_dispatcher_qxl_count(void); void red_dispatcher_async_complete(struct RedDispatcher *, AsyncCommand *); +struct Dispatcher *red_dispatcher_get_dispatcher(struct RedDispatcher *); + +typedef struct RedWorkerMessageDisplayConnect { + RedClient * client; + RedsStream * stream; + int migration; +} RedWorkerMessageDisplayConnect; + +typedef struct RedWorkerMessageDisplayDisconnect { + RedChannelClient *rcc; +} RedWorkerMessageDisplayDisconnect; + +typedef struct RedWorkerMessageDisplayMigrate { + RedChannelClient *rcc; +} RedWorkerMessageDisplayMigrate; + +typedef struct RedWorkerMessageCursorConnect { + RedClient *client; + RedsStream *stream; + int migration; +} RedWorkerMessageCursorConnect; + +typedef struct RedWorkerMessageCursorDisconnect { + RedChannelClient *rcc; +} RedWorkerMessageCursorDisconnect; + +typedef struct RedWorkerMessageCursorMigrate { + RedChannelClient *rcc; +} RedWorkerMessageCursorMigrate; + +typedef struct RedWorkerMessageUpdate { + uint32_t surface_id; + QXLRect * qxl_area; + QXLRect * qxl_dirty_rects; + uint32_t num_dirty_rects; + uint32_t clear_dirty_region; +} RedWorkerMessageUpdate; + +typedef struct RedWorkerMessageAsync { + AsyncCommand *cmd; +} RedWorkerMessageAsync; + +typedef struct RedWorkerMessageUpdateAsync { + RedWorkerMessageAsync base; + uint32_t surface_id; + QXLRect qxl_area; + uint32_t clear_dirty_region; +} RedWorkerMessageUpdateAsync; + +typedef struct RedWorkerMessageAddMemslot { + QXLDevMemSlot mem_slot; +} RedWorkerMessageAddMemslot; + +typedef struct RedWorkerMessageAddMemslotAsync { + RedWorkerMessageAsync base; + QXLDevMemSlot mem_slot; +} RedWorkerMessageAddMemslotAsync; + +typedef struct RedWorkerMessageDelMemslot { + uint32_t slot_group_id; + uint32_t slot_id; +} RedWorkerMessageDelMemslot; + +typedef struct RedWorkerMessageDestroySurfaces { +} RedWorkerMessageDestroySurfaces; + +typedef struct RedWorkerMessageDestroySurfacesAsync { + RedWorkerMessageAsync base; +} RedWorkerMessageDestroySurfacesAsync; + + +typedef struct RedWorkerMessageDestroyPrimarySurface { + uint32_t surface_id; +} RedWorkerMessageDestroyPrimarySurface; + +typedef struct RedWorkerMessageDestroyPrimarySurfaceAsync { + RedWorkerMessageAsync base; + uint32_t surface_id; +} RedWorkerMessageDestroyPrimarySurfaceAsync; + +typedef struct RedWorkerMessageCreatePrimarySurfaceAsync { + RedWorkerMessageAsync base; + uint32_t surface_id; + QXLDevSurfaceCreate surface; +} RedWorkerMessageCreatePrimarySurfaceAsync; + +typedef struct RedWorkerMessageCreatePrimarySurface { + uint32_t surface_id; + QXLDevSurfaceCreate surface; +} RedWorkerMessageCreatePrimarySurface; + +typedef struct RedWorkerMessageResetImageCache { +} RedWorkerMessageResetImageCache; + +typedef struct RedWorkerMessageResetCursor { +} RedWorkerMessageResetCursor; + +typedef struct RedWorkerMessageWakeup { +} RedWorkerMessageWakeup; + +typedef struct RedWorkerMessageOom { +} RedWorkerMessageOom; + +typedef struct RedWorkerMessageStart { +} RedWorkerMessageStart; + +typedef struct RedWorkerMessageFlushSurfacesAsync { + RedWorkerMessageAsync base; +} RedWorkerMessageFlushSurfacesAsync; + +typedef struct RedWorkerMessageStop { +} RedWorkerMessageStop; + +/* this command is sync, so it's ok to pass a pointer */ +typedef struct RedWorkerMessageLoadvmCommands { + uint32_t count; + QXLCommandExt *ext; +} RedWorkerMessageLoadvmCommands; + +typedef struct RedWorkerMessageSetCompression { + spice_image_compression_t image_compression; +} RedWorkerMessageSetCompression; + +typedef struct RedWorkerMessageSetStreamingVideo { + uint32_t streaming_video; +} RedWorkerMessageSetStreamingVideo; + +typedef struct RedWorkerMessageSetMouseMode { + uint32_t mode; +} RedWorkerMessageSetMouseMode; + +typedef struct RedWorkerMessageDisplayChannelCreate { +} RedWorkerMessageDisplayChannelCreate; + +typedef struct RedWorkerMessageCursorChannelCreate { +} RedWorkerMessageCursorChannelCreate; + +typedef struct RedWorkerMessageDestroySurfaceWait { + uint32_t surface_id; +} RedWorkerMessageDestroySurfaceWait; + +typedef struct RedWorkerMessageDestroySurfaceWaitAsync { + RedWorkerMessageAsync base; + uint32_t surface_id; +} RedWorkerMessageDestroySurfaceWaitAsync; + +typedef struct RedWorkerMessageResetMemslots { +} RedWorkerMessageResetMemslots; + #endif diff --git a/server/red_worker.c b/server/red_worker.c index de8a8202..cb48f09e 100644 --- a/server/red_worker.c +++ b/server/red_worker.c @@ -72,6 +72,7 @@ #include "zlib_encoder.h" #include "red_channel.h" #include "red_dispatcher.h" +#include "dispatcher.h" #include "main_channel.h" //#define COMPRESS_STAT @@ -865,9 +866,10 @@ typedef struct RedWorker { DisplayChannel *display_channel; CursorChannel *cursor_channel; QXLInstance *qxl; - RedDispatcher *dispatcher; - int id; + RedDispatcher *red_dispatcher; + int channel; + int id; int running; uint32_t *pending; int epoll; @@ -10103,21 +10105,19 @@ static void surface_dirty_region_to_rects(RedSurface *surface, free(dirty_rects); } -static inline void handle_dev_update_async(RedWorker *worker) +void handle_dev_update_async(void *opaque, void *payload) { - QXLRect qxl_rect; + RedWorker *worker = opaque; + RedWorkerMessageUpdateAsync *msg = payload; SpiceRect rect; - uint32_t surface_id; - uint32_t clear_dirty_region; QXLRect *qxl_dirty_rects; uint32_t num_dirty_rects; RedSurface *surface; + uint32_t surface_id = msg->surface_id; + QXLRect qxl_area = msg->qxl_area; + uint32_t clear_dirty_region = msg->clear_dirty_region; - receive_data(worker->channel, &surface_id, sizeof(uint32_t)); - receive_data(worker->channel, &qxl_rect, sizeof(QXLRect)); - receive_data(worker->channel, &clear_dirty_region, sizeof(uint32_t)); - - red_get_rect_ptr(&rect, &qxl_rect); + red_get_rect_ptr(&rect, &qxl_area); flush_display_commands(worker); ASSERT(worker->running); @@ -10140,24 +10140,20 @@ static inline void handle_dev_update_async(RedWorker *worker) free(qxl_dirty_rects); } -static inline void handle_dev_update(RedWorker *worker) +void handle_dev_update(void *opaque, void *payload) { - const QXLRect *qxl_rect; + RedWorker *worker = opaque; + RedWorkerMessageUpdate *msg = payload; SpiceRect *rect = spice_new0(SpiceRect, 1); - QXLRect *qxl_dirty_rects; RedSurface *surface; - uint32_t num_dirty_rects; - uint32_t surface_id; - uint32_t clear_dirty_region; - - receive_data(worker->channel, &surface_id, sizeof(uint32_t)); - receive_data(worker->channel, &qxl_rect, sizeof(QXLRect *)); - receive_data(worker->channel, &qxl_dirty_rects, sizeof(QXLRect *)); - receive_data(worker->channel, &num_dirty_rects, sizeof(uint32_t)); - receive_data(worker->channel, &clear_dirty_region, sizeof(uint32_t)); + uint32_t surface_id = msg->surface_id; + const QXLRect *qxl_area = msg->qxl_area; + uint32_t num_dirty_rects = msg->num_dirty_rects; + QXLRect *qxl_dirty_rects = msg->qxl_dirty_rects; + uint32_t clear_dirty_region = msg->clear_dirty_region; surface = &worker->surfaces[surface_id]; - red_get_rect_ptr(rect, qxl_rect); + red_get_rect_ptr(rect, qxl_area); flush_display_commands(worker); ASSERT(worker->running); @@ -10170,28 +10166,36 @@ static inline void handle_dev_update(RedWorker *worker) clear_dirty_region); } -static inline void handle_dev_add_memslot(RedWorker *worker) +static void dev_add_memslot(RedWorker *worker, QXLDevMemSlot mem_slot) { - QXLDevMemSlot dev_slot; + red_memslot_info_add_slot(&worker->mem_slots, mem_slot.slot_group_id, mem_slot.slot_id, + mem_slot.addr_delta, mem_slot.virt_start, mem_slot.virt_end, + mem_slot.generation); +} - receive_data(worker->channel, &dev_slot, sizeof(QXLDevMemSlot)); +void handle_dev_add_memslot(void *opaque, void *payload) +{ + RedWorker *worker = opaque; + RedWorkerMessageAddMemslot *msg = payload; + QXLDevMemSlot mem_slot = msg->mem_slot; - red_memslot_info_add_slot(&worker->mem_slots, dev_slot.slot_group_id, dev_slot.slot_id, - dev_slot.addr_delta, dev_slot.virt_start, dev_slot.virt_end, - dev_slot.generation); + red_memslot_info_add_slot(&worker->mem_slots, mem_slot.slot_group_id, mem_slot.slot_id, + mem_slot.addr_delta, mem_slot.virt_start, mem_slot.virt_end, + mem_slot.generation); } -static inline void handle_dev_del_memslot(RedWorker *worker) +void handle_dev_del_memslot(void *opaque, void *payload) { - uint32_t slot_id; - uint32_t slot_group_id; - - receive_data(worker->channel, &slot_group_id, sizeof(uint32_t)); - receive_data(worker->channel, &slot_id, sizeof(uint32_t)); + RedWorker *worker = opaque; + RedWorkerMessageDelMemslot *msg = payload; + uint32_t slot_id = msg->slot_id; + uint32_t slot_group_id = msg->slot_group_id; red_memslot_info_del_slot(&worker->mem_slots, slot_group_id, slot_id); } +/* TODO: destroy_surface_wait, dev_destroy_surface_wait - confusing. one asserts + * surface_id == 0, maybe move the assert upward and merge the two functions? */ static inline void destroy_surface_wait(RedWorker *worker, int surface_id) { if (!worker->surfaces[surface_id].context.canvas) { @@ -10206,12 +10210,8 @@ static inline void destroy_surface_wait(RedWorker *worker, int surface_id) red_clear_surface_drawables_from_pipes(worker, surface_id, TRUE, TRUE); } -static inline void handle_dev_destroy_surface_wait(RedWorker *worker) +static void dev_destroy_surface_wait(RedWorker *worker, uint32_t surface_id) { - uint32_t surface_id; - - receive_data(worker->channel, &surface_id, sizeof(uint32_t)); - ASSERT(surface_id == 0); flush_all_qxl_commands(worker); @@ -10221,6 +10221,14 @@ static inline void handle_dev_destroy_surface_wait(RedWorker *worker) } } +void handle_dev_destroy_surface_wait(void *opaque, void *payload) +{ + RedWorkerMessageDestroySurfaceWait *msg = payload; + RedWorker *worker = opaque; + + dev_destroy_surface_wait(worker, msg->surface_id); +} + static inline void red_cursor_reset(RedWorker *worker) { if (worker->cursor) { @@ -10244,7 +10252,7 @@ static inline void red_cursor_reset(RedWorker *worker) /* called upon device reset */ /* TODO: split me*/ -static inline void handle_dev_destroy_surfaces(RedWorker *worker) +static inline void dev_destroy_surfaces(RedWorker *worker) { int i; @@ -10273,14 +10281,17 @@ static inline void handle_dev_destroy_surfaces(RedWorker *worker) red_cursor_reset(worker); } -static inline void handle_dev_create_primary_surface(RedWorker *worker) +void handle_dev_destroy_surfaces(void *opaque, void *payload) { - uint32_t surface_id; - QXLDevSurfaceCreate surface; - uint8_t *line_0; + RedWorker *worker = opaque; - receive_data(worker->channel, &surface_id, sizeof(uint32_t)); - receive_data(worker->channel, &surface, sizeof(QXLDevSurfaceCreate)); + dev_destroy_surfaces(worker); +} + +static void dev_create_primary_surface(RedWorker *worker, uint32_t surface_id, + QXLDevSurfaceCreate surface) +{ + uint8_t *line_0; PANIC_ON(surface_id != 0); PANIC_ON(surface.height == 0); @@ -10308,12 +10319,16 @@ static inline void handle_dev_create_primary_surface(RedWorker *worker) } } -static inline void handle_dev_destroy_primary_surface(RedWorker *worker) +void handle_dev_create_primary_surface(void *opaque, void *payload) { - uint32_t surface_id; + RedWorkerMessageCreatePrimarySurface *msg = payload; + RedWorker *worker = opaque; - receive_data(worker->channel, &surface_id, sizeof(uint32_t)); + dev_create_primary_surface(worker, msg->surface_id, msg->surface); +} +static void dev_destroy_primary_surface(RedWorker *worker, uint32_t surface_id) +{ PANIC_ON(surface_id != 0); if (!worker->surfaces[surface_id].context.canvas) { @@ -10322,7 +10337,7 @@ static inline void handle_dev_destroy_primary_surface(RedWorker *worker) } flush_all_qxl_commands(worker); - destroy_surface_wait(worker, 0); + dev_destroy_surface_wait(worker, 0); red_destroy_surface(worker, 0); ASSERT(ring_is_empty(&worker->streams)); @@ -10331,6 +10346,24 @@ static inline void handle_dev_destroy_primary_surface(RedWorker *worker) red_cursor_reset(worker); } +void handle_dev_destroy_primary_surface(void *opaque, void *payload) +{ + RedWorkerMessageDestroyPrimarySurface *msg = payload; + RedWorker *worker = opaque; + uint32_t surface_id = msg->surface_id; + + dev_destroy_primary_surface(worker, surface_id); +} + +void handle_dev_destroy_primary_surface_async(void *opaque, void *payload) +{ + RedWorkerMessageDestroyPrimarySurfaceAsync *msg = payload; + RedWorker *worker = opaque; + uint32_t surface_id = msg->surface_id; + + dev_destroy_primary_surface(worker, surface_id); +} + static void flush_all_surfaces(RedWorker *worker) { int x; @@ -10342,7 +10375,7 @@ static void flush_all_surfaces(RedWorker *worker) } } -static void handle_dev_flush_surfaces(RedWorker *worker) +static void dev_flush_surfaces(RedWorker *worker) { flush_all_qxl_commands(worker); flush_all_surfaces(worker); @@ -10350,8 +10383,25 @@ static void handle_dev_flush_surfaces(RedWorker *worker) red_wait_outgoing_items(&worker->cursor_channel->common.base); } -static void handle_dev_stop(RedWorker *worker) +void handle_dev_flush_surfaces(void *opaque, void *payload) +{ + RedWorker *worker = opaque; + + dev_flush_surfaces(worker); +} + +void handle_dev_flush_surfaces_async(void *opaque, void *payload) +{ + RedWorker *worker = opaque; + + dev_flush_surfaces(worker); +} + +void handle_dev_stop(void *opaque, void *payload) { + RedWorker *worker = opaque; + + red_printf("stop"); ASSERT(worker->running); worker->running = FALSE; red_display_clear_glz_drawables(worker->display_channel); @@ -10360,8 +10410,9 @@ static void handle_dev_stop(RedWorker *worker) red_wait_outgoing_items(&worker->cursor_channel->common.base); } -static void handle_dev_start(RedWorker *worker) +void handle_dev_start(void *opaque, void *payload) { + RedWorker *worker = opaque; RedChannel *cursor_red_channel = &worker->cursor_channel->common.base; RedChannel *display_red_channel = &worker->display_channel->common.base; @@ -10375,308 +10426,488 @@ static void handle_dev_start(RedWorker *worker) worker->running = TRUE; } -static void handle_dev_input(EventListener *listener, uint32_t events) +void handle_dev_wakeup(void *opaque, void *payload) { - RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener); - RedWorkerMessage message; + RedWorker *worker = opaque; + + clear_bit(RED_WORKER_PENDING_WAKEUP, worker->pending); + stat_inc_counter(worker->wakeup_counter, 1); +} + +void handle_dev_oom(void *opaque, void *payload) +{ + RedWorker *worker = opaque; + RedChannel *display_red_channel = &worker->display_channel->common.base; int ring_is_empty; - int call_async_complete = 0; - int write_ready = 0; - AsyncCommand *cmd; - - read_message(worker->channel, &message); - - /* for async messages we do the common work in the handler, and - * send a ready or call async_complete from here, hence the added switch. */ - switch (message) { - case RED_WORKER_MESSAGE_UPDATE_ASYNC: - case RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC: - case RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC: - case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC: - case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC: - case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC: - case RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC: - call_async_complete = 1; - receive_data(worker->channel, &cmd, sizeof(cmd)); - break; - case RED_WORKER_MESSAGE_UPDATE: - case RED_WORKER_MESSAGE_ADD_MEMSLOT: - case RED_WORKER_MESSAGE_DESTROY_SURFACES: - case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE: - case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE: - case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT: - case RED_WORKER_MESSAGE_RESET_CURSOR: - case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE: - case RED_WORKER_MESSAGE_STOP: - case RED_WORKER_MESSAGE_LOADVM_COMMANDS: - case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE: - case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE: - case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: - case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: - write_ready = 1; - default: - break; - } - switch (message) { - case RED_WORKER_MESSAGE_UPDATE_ASYNC: - handle_dev_update_async(worker); - break; - case RED_WORKER_MESSAGE_UPDATE: - handle_dev_update(worker); - break; - case RED_WORKER_MESSAGE_WAKEUP: - clear_bit(RED_WORKER_PENDING_WAKEUP, worker->pending); - stat_inc_counter(worker->wakeup_counter, 1); - break; - case RED_WORKER_MESSAGE_OOM: - ASSERT(worker->running); - // streams? but without streams also leak - red_printf_debug(1, "WORKER", - "OOM1 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u", - worker->drawable_count, - worker->red_drawable_count, - worker->glz_drawable_count, - worker->current_size, - worker->display_channel ? - red_channel_sum_pipes_size(display_red_channel) : 0); - while (red_process_commands(worker, MAX_PIPE_SIZE, &ring_is_empty)) { - red_channel_push(&worker->display_channel->common.base); - } - if (worker->qxl->st->qif->flush_resources(worker->qxl) == 0) { - red_free_some(worker); - worker->qxl->st->qif->flush_resources(worker->qxl); - } - red_printf_debug(1, "WORKER", - "OOM2 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u", - worker->drawable_count, - worker->red_drawable_count, - worker->glz_drawable_count, - worker->current_size, - worker->display_channel ? - red_channel_sum_pipes_size(display_red_channel) : 0); - clear_bit(RED_WORKER_PENDING_OOM, worker->pending); - break; - case RED_WORKER_MESSAGE_RESET_CURSOR: - red_cursor_reset(worker); - break; - case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE: - image_cache_reset(&worker->image_cache); - break; - case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC: - case RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT: - handle_dev_destroy_surface_wait(worker); - break; - case RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC: - case RED_WORKER_MESSAGE_DESTROY_SURFACES: - handle_dev_destroy_surfaces(worker); - break; - case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC: - case RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE: - handle_dev_create_primary_surface(worker); - break; - case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC: - case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE: - handle_dev_destroy_primary_surface(worker); - break; - case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE: { - RedChannel *red_channel; - // TODO: handle seemless migration. Temp, setting migrate to FALSE - ensure_display_channel_created(worker, FALSE); - red_channel = &worker->display_channel->common.base; - send_data(worker->channel, &red_channel, sizeof(RedChannel *)); - break; + ASSERT(worker->running); + // streams? but without streams also leak + red_printf_debug(1, "WORKER", + "OOM1 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u", + worker->drawable_count, + worker->red_drawable_count, + worker->glz_drawable_count, + worker->current_size, + worker->display_channel ? + red_channel_sum_pipes_size(display_red_channel) : 0); + while (red_process_commands(worker, MAX_PIPE_SIZE, &ring_is_empty)) { + red_channel_push(&worker->display_channel->common.base); } - case RED_WORKER_MESSAGE_DISPLAY_CONNECT: { - RedsStream *stream; - RedClient *client; - int migrate; - red_printf("connect"); - - receive_data(worker->channel, &client, sizeof(RedClient *)); - receive_data(worker->channel, &stream, sizeof(RedsStream *)); - receive_data(worker->channel, &migrate, sizeof(int)); - handle_new_display_channel(worker, client, stream, migrate); - break; + if (worker->qxl->st->qif->flush_resources(worker->qxl) == 0) { + red_free_some(worker); + worker->qxl->st->qif->flush_resources(worker->qxl); } - case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: { - RedChannelClient *rcc; + red_printf_debug(1, "WORKER", + "OOM2 #draw=%u, #red_draw=%u, #glz_draw=%u current %u pipes %u", + worker->drawable_count, + worker->red_drawable_count, + worker->glz_drawable_count, + worker->current_size, + worker->display_channel ? + red_channel_sum_pipes_size(display_red_channel) : 0); + clear_bit(RED_WORKER_PENDING_OOM, worker->pending); +} - red_printf("disconnect display client"); - receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); - ASSERT(rcc); - display_channel_client_disconnect(rcc); - break; - } - case RED_WORKER_MESSAGE_STOP: { - red_printf("stop"); - handle_dev_stop(worker); +void handle_dev_reset_cursor(void *opaque, void *payload) +{ + red_cursor_reset((RedWorker *)opaque); +} + +void handle_dev_reset_image_cache(void *opaque, void *payload) +{ + image_cache_reset(&((RedWorker *)opaque)->image_cache); +} + +void handle_dev_destroy_surface_wait_async(void *opaque, void *payload) +{ + RedWorkerMessageDestroySurfaceWaitAsync *msg = payload; + RedWorker *worker = opaque; + + dev_destroy_surface_wait(worker, msg->surface_id); +} + +void handle_dev_destroy_surfaces_async(void *opaque, void *payload) +{ + RedWorker *worker = opaque; + + dev_destroy_surfaces(worker); +} + +void handle_dev_create_primary_surface_async(void *opaque, void *payload) +{ + RedWorkerMessageCreatePrimarySurfaceAsync *msg = payload; + RedWorker *worker = opaque; + + dev_create_primary_surface(worker, msg->surface_id, msg->surface); +} + +/* exception for Dispatcher, data going from red_worker to main thread, + * TODO: use a different dispatcher? + * TODO: leave direct usage of channel(fd)? It's only used right after the + * pthread is created, since the channel duration is the lifetime of the spice + * server. */ + +void handle_dev_display_channel_create(void *opaque, void *payload) +{ + RedWorker *worker = opaque; + + RedChannel *red_channel; + // TODO: handle seemless migration. Temp, setting migrate to FALSE + ensure_display_channel_created(worker, FALSE); + red_channel = &worker->display_channel->common.base; + send_data(worker->channel, &red_channel, sizeof(RedChannel *)); +} + +void handle_dev_display_connect(void *opaque, void *payload) +{ + RedWorkerMessageDisplayConnect *msg = payload; + RedWorker *worker = opaque; + RedsStream *stream = msg->stream; + RedClient *client = msg->client; + int migration = msg->migration; + + red_printf("connect"); + handle_new_display_channel(worker, client, stream, migration); +} + +void handle_dev_display_disconnect(void *opaque, void *payload) +{ + RedWorkerMessageDisplayDisconnect *msg = payload; + RedChannelClient *rcc = msg->rcc; + + red_printf("disconnect display client"); + ASSERT(rcc); + display_channel_client_disconnect(rcc); +} + +void handle_dev_display_migrate(void *opaque, void *payload) +{ + RedWorkerMessageDisplayMigrate *msg = payload; + RedWorker *worker = opaque; + + RedChannelClient *rcc = msg->rcc; + red_printf("migrate display client"); + ASSERT(rcc); + red_migrate_display(worker, rcc); +} + +/* TODO: special, perhaps use another dispatcher? */ +void handle_dev_cursor_channel_create(void *opaque, void *payload) +{ + RedWorker *worker = opaque; + RedChannel *red_channel; + + // TODO: handle seemless migration. Temp, setting migrate to FALSE + ensure_cursor_channel_created(worker, FALSE); + red_channel = &worker->cursor_channel->common.base; + send_data(worker->channel, &red_channel, sizeof(RedChannel *)); +} + +void handle_dev_cursor_connect(void *opaque, void *payload) +{ + RedWorkerMessageCursorConnect *msg = payload; + RedWorker *worker = opaque; + RedsStream *stream = msg->stream; + RedClient *client = msg->client; + int migration = msg->migration; + + red_printf("cursor connect"); + red_connect_cursor(worker, client, stream, migration); +} + +void handle_dev_cursor_disconnect(void *opaque, void *payload) +{ + RedWorkerMessageCursorDisconnect *msg = payload; + RedChannelClient *rcc = msg->rcc; + + red_printf("disconnect cursor client"); + ASSERT(rcc); + cursor_channel_client_disconnect(rcc); +} + +void handle_dev_cursor_migrate(void *opaque, void *payload) +{ + RedWorkerMessageCursorMigrate *msg = payload; + RedWorker *worker = opaque; + RedChannelClient *rcc = msg->rcc; + + red_printf("migrate cursor client"); + ASSERT(rcc); + red_migrate_cursor(worker, rcc); +} + +void handle_dev_set_compression(void *opaque, void *payload) +{ + RedWorkerMessageSetCompression *msg = payload; + RedWorker *worker = opaque; + + worker->image_compression = msg->image_compression; + switch (worker->image_compression) { + case SPICE_IMAGE_COMPRESS_AUTO_LZ: + red_printf("ic auto_lz"); break; - } - case RED_WORKER_MESSAGE_START: - red_printf("start"); - handle_dev_start(worker); + case SPICE_IMAGE_COMPRESS_AUTO_GLZ: + red_printf("ic auto_glz"); break; - case RED_WORKER_MESSAGE_DISPLAY_MIGRATE: { - RedChannelClient *rcc; - red_printf("migrate display client"); - receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); - ASSERT(rcc); - red_migrate_display(worker, rcc); + case SPICE_IMAGE_COMPRESS_QUIC: + red_printf("ic quic"); break; - } - case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE: { - RedChannel *red_channel; - // TODO: handle seemless migration. Temp, setting migrate to FALSE - ensure_cursor_channel_created(worker, FALSE); - red_channel = &worker->cursor_channel->common.base; - send_data(worker->channel, &red_channel, sizeof(RedChannel *)); + case SPICE_IMAGE_COMPRESS_LZ: + red_printf("ic lz"); break; - } - case RED_WORKER_MESSAGE_CURSOR_CONNECT: { - RedsStream *stream; - RedClient *client; - int migrate; - - red_printf("cursor connect"); - receive_data(worker->channel, &client, sizeof(RedClient *)); - receive_data(worker->channel, &stream, sizeof(RedsStream *)); - receive_data(worker->channel, &migrate, sizeof(int)); - red_connect_cursor(worker, client, stream, migrate); + case SPICE_IMAGE_COMPRESS_GLZ: + red_printf("ic glz"); break; - } - case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: { - RedChannelClient *rcc; - - red_printf("disconnect cursor client"); - receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); - ASSERT(rcc); - cursor_channel_client_disconnect(rcc); + case SPICE_IMAGE_COMPRESS_OFF: + red_printf("ic off"); break; + default: + red_printf("ic invalid"); } - case RED_WORKER_MESSAGE_CURSOR_MIGRATE: { - RedChannelClient *rcc; - red_printf("migrate cursor client"); - receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); - ASSERT(rcc); - red_migrate_cursor(worker, rcc); - break; +#ifdef COMPRESS_STAT + print_compress_stats(worker->display_channel); + if (worker->display_channel) { + stat_reset(&worker->display_channel->quic_stat); + stat_reset(&worker->display_channel->lz_stat); + stat_reset(&worker->display_channel->glz_stat); + stat_reset(&worker->display_channel->jpeg_stat); + stat_reset(&worker->display_channel->zlib_glz_stat); + stat_reset(&worker->display_channel->jpeg_alpha_stat); } - case RED_WORKER_MESSAGE_SET_COMPRESSION: - receive_data(worker->channel, &worker->image_compression, - sizeof(spice_image_compression_t)); - switch (worker->image_compression) { - case SPICE_IMAGE_COMPRESS_AUTO_LZ: - red_printf("ic auto_lz"); - break; - case SPICE_IMAGE_COMPRESS_AUTO_GLZ: - red_printf("ic auto_glz"); +#endif +} + +void handle_dev_set_streaming_video(void *opaque, void *payload) +{ + RedWorkerMessageSetStreamingVideo *msg = payload; + RedWorker *worker = opaque; + + worker->streaming_video = msg->streaming_video; + ASSERT(worker->streaming_video != STREAM_VIDEO_INVALID); + switch(worker->streaming_video) { + case STREAM_VIDEO_ALL: + red_printf("sv all"); break; - case SPICE_IMAGE_COMPRESS_QUIC: - red_printf("ic quic"); + case STREAM_VIDEO_FILTER: + red_printf("sv filter"); break; - case SPICE_IMAGE_COMPRESS_LZ: - red_printf("ic lz"); + case STREAM_VIDEO_OFF: + red_printf("sv off"); break; - case SPICE_IMAGE_COMPRESS_GLZ: - red_printf("ic glz"); + default: + red_printf("sv invalid"); + } +} + +void handle_dev_set_mouse_mode(void *opaque, void *payload) +{ + RedWorkerMessageSetMouseMode *msg = payload; + RedWorker *worker = opaque; + + worker->mouse_mode = msg->mode; + red_printf("mouse mode %u", worker->mouse_mode); +} + +void handle_dev_add_memslot_async(void *opaque, void *payload) +{ + RedWorkerMessageAddMemslotAsync *msg = payload; + RedWorker *worker = opaque; + + dev_add_memslot(worker, msg->mem_slot); +} + +void handle_dev_reset_memslots(void *opaque, void *payload) +{ + RedWorker *worker = opaque; + + red_memslot_info_reset(&worker->mem_slots); +} + +void handle_dev_loadvm_commands(void *opaque, void *payload) +{ + RedWorkerMessageLoadvmCommands *msg = payload; + RedWorker *worker = opaque; + uint32_t i; + RedCursorCmd *cursor_cmd; + RedSurfaceCmd *surface_cmd; + uint32_t count = msg->count; + QXLCommandExt *ext = msg->ext; + + red_printf("loadvm_commands"); + for (i = 0 ; i < count ; ++i) { + switch (ext[i].cmd.type) { + case QXL_CMD_CURSOR: + cursor_cmd = spice_new0(RedCursorCmd, 1); + red_get_cursor_cmd(&worker->mem_slots, ext[i].group_id, + cursor_cmd, ext[i].cmd.data); + qxl_process_cursor(worker, cursor_cmd, ext[i].group_id); break; - case SPICE_IMAGE_COMPRESS_OFF: - red_printf("ic off"); + case QXL_CMD_SURFACE: + surface_cmd = spice_new0(RedSurfaceCmd, 1); + red_get_surface_cmd(&worker->mem_slots, ext[i].group_id, + surface_cmd, ext[i].cmd.data); + red_process_surface(worker, surface_cmd, ext[i].group_id, TRUE); break; default: - red_printf("ic invalid"); - } -#ifdef COMPRESS_STAT - print_compress_stats(worker->display_channel); - if (worker->display_channel) { - stat_reset(&worker->display_channel->quic_stat); - stat_reset(&worker->display_channel->lz_stat); - stat_reset(&worker->display_channel->glz_stat); - stat_reset(&worker->display_channel->jpeg_stat); - stat_reset(&worker->display_channel->zlib_glz_stat); - stat_reset(&worker->display_channel->jpeg_alpha_stat); - } -#endif - break; - case RED_WORKER_MESSAGE_SET_STREAMING_VIDEO: - receive_data(worker->channel, &worker->streaming_video, sizeof(uint32_t)); - ASSERT(worker->streaming_video != STREAM_VIDEO_INVALID); - switch(worker->streaming_video) { - case STREAM_VIDEO_ALL: - red_printf("sv all"); - break; - case STREAM_VIDEO_FILTER: - red_printf("sv filter"); - break; - case STREAM_VIDEO_OFF: - red_printf("sv off"); - break; - default: - red_printf("sv invalid"); - } - break; - case RED_WORKER_MESSAGE_SET_MOUSE_MODE: - receive_data(worker->channel, &worker->mouse_mode, sizeof(uint32_t)); - red_printf("mouse mode %u", worker->mouse_mode); - break; - case RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC: - case RED_WORKER_MESSAGE_ADD_MEMSLOT: - handle_dev_add_memslot(worker); - break; - case RED_WORKER_MESSAGE_DEL_MEMSLOT: - handle_dev_del_memslot(worker); - break; - case RED_WORKER_MESSAGE_RESET_MEMSLOTS: - red_memslot_info_reset(&worker->mem_slots); - break; - case RED_WORKER_MESSAGE_LOADVM_COMMANDS: { - uint32_t count; - QXLCommandExt ext; - RedCursorCmd *cursor_cmd; - RedSurfaceCmd *surface_cmd; - - red_printf("loadvm_commands"); - receive_data(worker->channel, &count, sizeof(uint32_t)); - while (count > 0) { - receive_data(worker->channel, &ext, sizeof(QXLCommandExt)); - switch (ext.cmd.type) { - case QXL_CMD_CURSOR: - cursor_cmd = spice_new0(RedCursorCmd, 1); - red_get_cursor_cmd(&worker->mem_slots, ext.group_id, - cursor_cmd, ext.cmd.data); - qxl_process_cursor(worker, cursor_cmd, ext.group_id); - break; - case QXL_CMD_SURFACE: - surface_cmd = spice_new0(RedSurfaceCmd, 1); - red_get_surface_cmd(&worker->mem_slots, ext.group_id, - surface_cmd, ext.cmd.data); - red_process_surface(worker, surface_cmd, ext.group_id, TRUE); - break; - default: - red_printf("unhandled loadvm command type (%d)", ext.cmd.type); - break; - } - count--; + red_printf("unhandled loadvm command type (%d)", ext[i].cmd.type); + break; } - break; - } - case RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC: - handle_dev_flush_surfaces(worker); - break; - default: - red_error("message error"); - } - if (call_async_complete) { - red_dispatcher_async_complete(worker->dispatcher, cmd); - } - if (write_ready) { - message = RED_WORKER_MESSAGE_READY; - write_message(worker->channel, &message); } } +static void worker_handle_dispatcher_async_done(void *opaque, + uint32_t message_type, + void *payload) +{ + RedWorker *worker = opaque; + RedWorkerMessageAsync *msg_async = payload; + + red_printf_debug(2, "WORKER", ""); + red_dispatcher_async_complete(worker->red_dispatcher, msg_async->cmd); +} + +static void register_callbacks(Dispatcher *dispatcher) +{ + dispatcher_register_async_done_callback( + dispatcher, + worker_handle_dispatcher_async_done); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DISPLAY_CONNECT, + handle_dev_display_connect, + sizeof(RedWorkerMessageDisplayConnect), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DISPLAY_DISCONNECT, + handle_dev_display_disconnect, + sizeof(RedWorkerMessageDisplayDisconnect), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DISPLAY_MIGRATE, + handle_dev_display_migrate, + sizeof(RedWorkerMessageDisplayMigrate), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_CURSOR_CONNECT, + handle_dev_cursor_connect, + sizeof(RedWorkerMessageCursorConnect), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_CURSOR_DISCONNECT, + handle_dev_cursor_disconnect, + sizeof(RedWorkerMessageCursorDisconnect), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_CURSOR_MIGRATE, + handle_dev_cursor_migrate, + sizeof(RedWorkerMessageCursorMigrate), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_UPDATE, + handle_dev_update, + sizeof(RedWorkerMessageUpdate), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_UPDATE_ASYNC, + handle_dev_update_async, + sizeof(RedWorkerMessageUpdateAsync), + DISPATCHER_ASYNC); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_ADD_MEMSLOT, + handle_dev_add_memslot, + sizeof(RedWorkerMessageAddMemslot), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC, + handle_dev_add_memslot_async, + sizeof(RedWorkerMessageAddMemslotAsync), + DISPATCHER_ASYNC); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DEL_MEMSLOT, + handle_dev_del_memslot, + sizeof(RedWorkerMessageDelMemslot), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DESTROY_SURFACES, + handle_dev_destroy_surfaces, + sizeof(RedWorkerMessageDestroySurfaces), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC, + handle_dev_destroy_surfaces_async, + sizeof(RedWorkerMessageDestroySurfacesAsync), + DISPATCHER_ASYNC); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE, + handle_dev_destroy_primary_surface, + sizeof(RedWorkerMessageDestroyPrimarySurface), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC, + handle_dev_destroy_primary_surface_async, + sizeof(RedWorkerMessageDestroyPrimarySurfaceAsync), + DISPATCHER_ASYNC); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC, + handle_dev_create_primary_surface_async, + sizeof(RedWorkerMessageCreatePrimarySurfaceAsync), + DISPATCHER_ASYNC); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE, + handle_dev_create_primary_surface, + sizeof(RedWorkerMessageCreatePrimarySurface), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_RESET_IMAGE_CACHE, + handle_dev_reset_image_cache, + sizeof(RedWorkerMessageResetImageCache), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_RESET_CURSOR, + handle_dev_reset_cursor, + sizeof(RedWorkerMessageResetCursor), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_WAKEUP, + handle_dev_wakeup, + sizeof(RedWorkerMessageWakeup), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_OOM, + handle_dev_oom, + sizeof(RedWorkerMessageOom), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_START, + handle_dev_start, + sizeof(RedWorkerMessageStart), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC, + handle_dev_flush_surfaces_async, + sizeof(RedWorkerMessageFlushSurfacesAsync), + DISPATCHER_ASYNC); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_STOP, + handle_dev_stop, + sizeof(RedWorkerMessageStop), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_LOADVM_COMMANDS, + handle_dev_loadvm_commands, + sizeof(RedWorkerMessageLoadvmCommands), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_SET_COMPRESSION, + handle_dev_set_compression, + sizeof(RedWorkerMessageSetCompression), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_SET_STREAMING_VIDEO, + handle_dev_set_streaming_video, + sizeof(RedWorkerMessageSetStreamingVideo), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_SET_MOUSE_MODE, + handle_dev_set_mouse_mode, + sizeof(RedWorkerMessageSetMouseMode), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE, + handle_dev_display_channel_create, + sizeof(RedWorkerMessageDisplayChannelCreate), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE, + handle_dev_cursor_channel_create, + sizeof(RedWorkerMessageCursorChannelCreate), + DISPATCHER_NONE); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT, + handle_dev_destroy_surface_wait, + sizeof(RedWorkerMessageDestroySurfaceWait), + DISPATCHER_ACK); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC, + handle_dev_destroy_surface_wait_async, + sizeof(RedWorkerMessageDestroySurfaceWaitAsync), + DISPATCHER_ASYNC); + dispatcher_register_handler(dispatcher, + RED_WORKER_MESSAGE_RESET_MEMSLOTS, + handle_dev_reset_memslots, + sizeof(RedWorkerMessageResetMemslots), + DISPATCHER_NONE); +} + + + +static void handle_dev_input(EventListener *listener, uint32_t events) +{ + RedWorker *worker = SPICE_CONTAINEROF(listener, RedWorker, dev_listener); + + dispatcher_handle_recv_read(red_dispatcher_get_dispatcher(worker->red_dispatcher)); +} + static void handle_dev_free(EventListener *ctx) { free(ctx); @@ -10687,14 +10918,18 @@ static void red_init(RedWorker *worker, WorkerInitData *init_data) struct epoll_event event; RedWorkerMessage message; int epoll; + Dispatcher *dispatcher; ASSERT(sizeof(CursorItem) <= QXL_CURSUR_DEVICE_DATA_SIZE); memset(worker, 0, sizeof(RedWorker)); - worker->dispatcher = init_data->dispatcher; + dispatcher = red_dispatcher_get_dispatcher(init_data->red_dispatcher); + dispatcher_set_opaque(dispatcher, worker); + worker->red_dispatcher = init_data->red_dispatcher; worker->qxl = init_data->qxl; worker->id = init_data->id; - worker->channel = init_data->channel; + worker->channel = dispatcher_get_recv_fd(dispatcher); + register_callbacks(dispatcher); worker->pending = init_data->pending; worker->dev_listener.refs = 1; worker->dev_listener.action = handle_dev_input; diff --git a/server/red_worker.h b/server/red_worker.h index 26c43adb..08c7b223 100644 --- a/server/red_worker.h +++ b/server/red_worker.h @@ -84,6 +84,8 @@ enum { RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE, RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE, + + RED_WORKER_MESSAGE_COUNT // LAST }; typedef uint32_t RedWorkerMessage; @@ -102,7 +104,6 @@ typedef struct RedDispatcher RedDispatcher; typedef struct WorkerInitData { struct QXLInstance *qxl; int id; - int channel; uint32_t *pending; uint32_t num_renderers; uint32_t renderers[RED_MAX_RENDERERS]; @@ -116,7 +117,7 @@ typedef struct WorkerInitData { uint8_t memslot_id_bits; uint8_t internal_groupslot_id; uint32_t n_surfaces; - RedDispatcher *dispatcher; + RedDispatcher *red_dispatcher; } WorkerInitData; void *red_worker_main(void *arg); -- cgit