diff options
-rw-r--r-- | server/red_worker.c | 446 |
1 files changed, 40 insertions, 406 deletions
diff --git a/server/red_worker.c b/server/red_worker.c index 3614eb89..aeaaa8b1 100644 --- a/server/red_worker.c +++ b/server/red_worker.c @@ -57,6 +57,7 @@ #include "demarshallers.h" #include "generated_marshallers.h" #include "zlib_encoder.h" +#include "red_channel.h" //#define COMPRESS_STAT //#define DUMP_BITMAP @@ -252,11 +253,6 @@ enum { PIPE_ITEM_TYPE_DESTROY_SURFACE, }; -typedef struct PipeItem { - RingItem link; - int type; -} PipeItem; - typedef struct VerbItem { PipeItem base; uint16_t verb; @@ -347,81 +343,6 @@ typedef struct LocalCursor { #define PALETTE_CACHE_HASH_MASK (PALETTE_CACHE_HASH_SIZE - 1) #define PALETTE_CACHE_HASH_KEY(id) ((id) & PALETTE_CACHE_HASH_MASK) -typedef struct RedChannel RedChannel; -typedef void (*channel_disconnect_proc)(RedChannel *channel); -typedef void (*channel_hold_pipe_item_proc)(RedChannel *channel, PipeItem *item); -typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item); -typedef void (*channel_release_pipe_item_proc)(RedChannel *channel, PipeItem *item, int item_pushed); -typedef int (*channel_handle_parsed_proc)(RedChannel *channel, uint32_t size, uint16_t type, void *message); - -#define MAX_SEND_VEC 100 - -typedef int (*get_outgoing_msg_size_proc)(void *opaque); -typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos); -typedef void (*on_outgoing_error_proc)(void *opaque); -typedef void (*on_outgoing_block_proc)(void *opaque); -typedef void (*on_outgoing_msg_done_proc)(void *opaque); - -typedef struct OutgoingHandler { - void *opaque; - struct iovec vec_buf[MAX_SEND_VEC]; - int vec_size; - struct iovec *vec; - int pos; - int size; - get_outgoing_msg_size_proc get_msg_size; - prepare_outgoing_proc prepare; - on_outgoing_error_proc on_error; - on_outgoing_block_proc on_block; - on_outgoing_msg_done_proc on_msg_done; -#ifdef RED_STATISTICS - uint64_t *out_bytes_counter; -#endif -} OutgoingHandler; - -struct RedChannel { - spice_parse_channel_func_t parser; - RedsStream *stream; - int migrate; - - Ring pipe; - uint32_t pipe_size; - - struct { - uint32_t client_window; - uint32_t generation; - uint32_t client_generation; - uint32_t messages_window; - } ack_data; - - struct { - int blocked; - uint64_t serial; - SpiceDataHeader *header; - SpiceMarshaller *marshaller; - uint32_t size; - uint32_t pos; - void *item; - } send_data; - - struct { - uint8_t buf[RECIVE_BUF_SIZE]; - SpiceDataHeader *message; - uint8_t *now; - uint8_t *end; - } incoming; - - OutgoingHandler outgoing; - - channel_disconnect_proc disconnect; - channel_hold_pipe_item_proc hold_item; - channel_release_pipe_item_proc release_item; - channel_handle_parsed_proc handle_parsed; - channel_send_pipe_item_proc send_item; - - int during_send; -}; - typedef struct ImageItem { PipeItem link; int refs; @@ -637,6 +558,7 @@ typedef struct CommonChannel { EventListener listener; uint32_t id; struct RedWorker *worker; + uint8_t recv_buf[RECIVE_BUF_SIZE]; } CommonChannel; @@ -990,7 +912,6 @@ typedef struct BitmapData { static void red_draw_qxl_drawable(RedWorker *worker, Drawable *drawable); static void red_current_flush(RedWorker *worker, int surface_id); -static void red_channel_push(RedChannel *channel); #ifdef DRAW_ALL #define red_update_area(worker, rect, surface_id) #define red_draw_drawable(worker, item) @@ -1004,9 +925,7 @@ static void red_display_release_stream(DisplayChannel *display, StreamAgent *age static inline void red_detach_stream(RedWorker *worker, Stream *stream); static void red_stop_stream(RedWorker *worker, Stream *stream); static inline void red_stream_maintenance(RedWorker *worker, Drawable *candidate, Drawable *sect); -static inline void red_channel_begin_send_message(RedChannel *channel); static inline void display_begin_send_message(DisplayChannel *channel); -static void red_channel_receive(RedChannel *channel); static void red_release_pixmap_cache(DisplayChannel *channel); static void red_release_glz(DisplayChannel *channel); static void red_freeze_glz(DisplayChannel *channel); @@ -1204,36 +1123,12 @@ static void show_draw_item(RedWorker *worker, DrawItem *draw_item, const char *p draw_item->base.rgn.extents.y2); } -static void red_channel_init_send_data(RedChannel *channel, uint16_t type, PipeItem *item) -{ - if (item) { - channel->hold_item(channel, item); - ASSERT(channel->send_data.item == NULL); - channel->send_data.item = item; - } - channel->send_data.header->type = type; -} - static inline void red_pipe_item_init(PipeItem *item, int type) { ring_item_init(&item->link); item->type = type; } -static inline void red_channel_pipe_add(RedChannel *channel, PipeItem *item) -{ - ASSERT(channel); - channel->pipe_size++; - ring_add(&channel->pipe, &item->link); -} - -static inline void red_channel_pipe_add_after(RedChannel *channel, PipeItem *item, PipeItem *pos) -{ - ASSERT(channel && pos); - channel->pipe_size++; - ring_add_after(&item->link, &pos->link); -} - static inline int pipe_item_is_linked(PipeItem *item) { return ring_item_is_linked(&item->link); @@ -1259,13 +1154,6 @@ static void red_pipe_add_verb(RedChannel* channel, uint16_t verb) red_channel_pipe_add(channel, &item->base); } -static void red_channel_pipe_add_type(RedChannel* channel, int pipe_item_type) -{ - PipeItem *item = spice_new(PipeItem, 1); - red_pipe_item_init(item, pipe_item_type); - red_channel_pipe_add(channel, item); -} - static inline void red_create_surface_item(RedWorker *worker, int surface_id); static void red_add_surface_image(RedWorker *worker, int surface_id); @@ -1392,19 +1280,16 @@ static void release_upgrade_item(RedWorker* worker, UpgradeItem *item) } } -static void red_channel_pipe_clear(RedChannel *channel) +static uint8_t *common_alloc_recv_buf(RedChannel *channel, SpiceDataHeader *msg_header) { - PipeItem *item; + CommonChannel *common = SPICE_CONTAINEROF(channel, CommonChannel, base); - ASSERT(channel); - if (channel->send_data.item) { - channel->release_item(channel, channel->send_data.item, TRUE); - } - while ((item = (PipeItem *)ring_get_head(&channel->pipe))) { - ring_remove(&item->link); - channel->release_item(channel, item, FALSE); - } - channel->pipe_size = 0; + return common->recv_buf; +} + +static void common_release_recv_buf(RedChannel *channel, SpiceDataHeader *msg_header, uint8_t* msg) +{ + return; } #define CLIENT_PIXMAPS_CACHE @@ -6041,19 +5926,6 @@ static void fill_cursor(CursorChannel *cursor_channel, SpiceCursor *red_cursor, } } -static inline void red_channel_reset_send_data(RedChannel *channel) -{ - spice_marshaller_reset(channel->send_data.marshaller); - channel->send_data.header = (SpiceDataHeader *) - spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader)); - spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader)); - channel->send_data.pos = 0; - channel->send_data.header->type = 0; - channel->send_data.header->size = 0; - channel->send_data.header->sub_list = 0; - channel->send_data.header->serial = ++channel->send_data.serial; -} - static inline void red_display_reset_send_data(DisplayChannel *channel) { red_channel_reset_send_data((RedChannel *)channel); @@ -7307,83 +7179,6 @@ static void inline channel_release_res(RedChannel *channel) channel->send_data.item = NULL; } -static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, - int *vec_size, int pos) -{ - RedChannel *channel = (RedChannel *)opaque; - - *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller, - vec, MAX_SEND_VEC, pos); -} - -static void red_channel_peer_on_out_block(void *opaque) -{ - RedChannel *channel = (RedChannel *)opaque; - - channel->send_data.blocked = TRUE; -} - -static void red_channel_peer_on_out_msg_done(void *opaque) -{ - RedChannel *channel = (RedChannel *)opaque; - - channel->send_data.size = 0; - if (channel->send_data.item) { - channel->release_item(channel, channel->send_data.item, TRUE); - channel->send_data.item = NULL; - } - channel->send_data.blocked = FALSE; -} - -static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler) -{ - int n; - - ASSERT(stream); - if (handler->size == 0) { - handler->vec = handler->vec_buf; - handler->size = handler->get_msg_size(handler->opaque); - if (!handler->size) { // nothing to be sent - return; - } - } - for (;;) { - handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); - n = reds_stream_writev(stream, handler->vec, handler->vec_size); - if (n == -1) { - switch (errno) { - case EAGAIN: - handler->on_block(handler->opaque); - return; - case EINTR: - break; - case EPIPE: - handler->on_error(handler->opaque); - return; - default: - red_printf("%s", strerror(errno)); - handler->on_error(handler->opaque); - return; - } - } else { - handler->pos += n; - stat_inc_counter(handler->out_bytes_counter, n); - if (handler->pos == handler->size) { // finished writing data - handler->on_msg_done(handler->opaque); - handler->vec = handler->vec_buf; - handler->pos = 0; - handler->size = 0; - return; - } - } - } -} - -static void red_channel_send(RedChannel *channel) -{ - red_peer_handle_outgoing(channel->stream, &channel->outgoing); -} - static void display_channel_push_release(DisplayChannel *channel, uint8_t type, uint64_t id, uint64_t* sync_data) { @@ -7408,16 +7203,6 @@ static void display_channel_push_release(DisplayChannel *channel, uint8_t type, free_list->res->resources[free_list->res->count++].id = id; } -static inline void red_channel_begin_send_message(RedChannel *channel) -{ - spice_marshaller_flush(channel->send_data.marshaller); - channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller); - channel->send_data.header->size = channel->send_data.size - sizeof(SpiceDataHeader); - channel->ack_data.messages_window++; - channel->send_data.header = NULL; /* avoid writing to this until we have a new message */ - red_channel_send(channel); -} - static inline void display_begin_send_message(DisplayChannel *channel) { FreeList *free_list = &channel->send_data.free_list; @@ -8299,25 +8084,6 @@ static void red_send_surface_destroy(DisplayChannel *display, uint32_t surface_i red_channel_begin_send_message(channel); } -static inline int red_channel_waiting_for_ack(RedChannel *channel) -{ - return (channel->ack_data.messages_window > channel->ack_data.client_window * 2); -} - -static inline PipeItem *red_channel_pipe_get(RedChannel *channel) -{ - PipeItem *item; - if (!channel || channel->send_data.blocked || - red_channel_waiting_for_ack(channel) || - !(item = (PipeItem *)ring_get_tail(&channel->pipe))) { - return NULL; - } - - --channel->pipe_size; - ring_remove(&item->link); - return item; -} - static void display_channel_send_item(RedChannel *base, PipeItem *pipe_item) { DisplayChannel *display_channel = (DisplayChannel *)red_ref_channel(base); @@ -8411,29 +8177,6 @@ static void display_channel_send_item(RedChannel *base, PipeItem *pipe_item) red_unref_channel(&display_channel->common.base); } -void red_channel_push(RedChannel *channel) -{ - PipeItem *pipe_item; - - if (!channel) { - return; - } - if (!channel->during_send) { - channel->during_send = TRUE; - } else { - return; - } - - if (channel->send_data.blocked) { - red_channel_send(channel); - } - - while ((pipe_item = red_channel_pipe_get(channel))) { - channel->send_item(channel, pipe_item); - } - channel->during_send = FALSE; -} - static void cursor_channel_send_item(RedChannel *channel, PipeItem *pipe_item) { CursorChannel *cursor_channel = SPICE_CONTAINEROF(channel, CursorChannel, common.base); @@ -8557,11 +8300,7 @@ void red_show_tree(RedWorker *worker) } } -static inline int red_channel_is_connected(RedChannel *channel) -{ - return !!channel->stream; -} - +// TODO: move to red_channel static void red_disconnect_channel(RedChannel *channel) { channel_release_res(channel); @@ -8569,7 +8308,7 @@ static void red_disconnect_channel(RedChannel *channel) reds_stream_free(channel->stream); channel->stream = NULL; channel->send_data.blocked = FALSE; - channel->send_data.size = channel->send_data.pos = 0; + channel->send_data.size = 0; spice_marshaller_reset(channel->send_data.marshaller); red_unref_channel(channel); } @@ -8962,26 +8701,6 @@ static void on_new_display_channel(RedWorker *worker) } } -static int red_channel_handle_message(RedChannel *channel, uint32_t size, uint16_t type, void *message) -{ - switch (type) { - case SPICE_MSGC_ACK_SYNC: - channel->ack_data.client_generation = *(uint32_t *)message; - break; - case SPICE_MSGC_ACK: - if (channel->ack_data.client_generation == channel->ack_data.generation) { - channel->ack_data.messages_window -= channel->ack_data.client_window; - } - break; - case SPICE_MSGC_DISCONNECTING: - break; - default: - red_printf("invalid message type %u", type); - return FALSE; - } - return TRUE; -} - static GlzSharedDictionary *_red_find_glz_dictionary(uint8_t dict_id) { RingItem *now; @@ -9288,79 +9007,6 @@ static int display_channel_handle_message(RedChannel *channel, uint32_t size, ui } } -static void red_channel_receive(RedChannel *channel) -{ - for (;;) { - ssize_t n; - n = channel->incoming.end - channel->incoming.now; - ASSERT(n); - ASSERT(channel->stream); - n = reds_stream_read(channel->stream, channel->incoming.now, n); - if (n <= 0) { - if (n == 0) { - channel->disconnect(channel); - return; - } - ASSERT(n == -1); - switch (errno) { - case EAGAIN: - return; - case EINTR: - break; - case EPIPE: - channel->disconnect(channel); - return; - default: - red_printf("%s", strerror(errno)); - channel->disconnect(channel); - return; - } - } else { - channel->incoming.now += n; - for (;;) { - SpiceDataHeader *header = channel->incoming.message; - uint8_t *data = (uint8_t *)(header+1); - size_t parsed_size; - uint8_t *parsed; - message_destructor_t parsed_free; - - n = channel->incoming.now - (uint8_t *)header; - if (n < sizeof(SpiceDataHeader) || - n < sizeof(SpiceDataHeader) + header->size) { - break; - } - parsed = channel->parser((void *)data, data + header->size, header->type, - SPICE_VERSION_MINOR, &parsed_size, &parsed_free); - - if (parsed == NULL) { - red_printf("failed to parse message type %d", header->type); - channel->disconnect(channel); - return; - } - - if (!channel->handle_parsed(channel, parsed_size, header->type, parsed)) { - free(parsed); - channel->disconnect(channel); - return; - } - parsed_free(parsed); - channel->incoming.message = (SpiceDataHeader *)((uint8_t *)header + - sizeof(SpiceDataHeader) + - header->size); - } - - if (channel->incoming.now == (uint8_t *)channel->incoming.message) { - channel->incoming.now = channel->incoming.buf; - channel->incoming.message = (SpiceDataHeader *)channel->incoming.buf; - } else if (channel->incoming.now == channel->incoming.end) { - memcpy(channel->incoming.buf, channel->incoming.message, n); - channel->incoming.now = channel->incoming.buf + n; - channel->incoming.message = (SpiceDataHeader *)channel->incoming.buf; - } - } - } -} - int common_channel_config_socket(RedChannel *channel) { int flags; @@ -9391,19 +9037,25 @@ static void free_common_channel_from_listener(EventListener *ctx) free(common); } - -static void red_channel_default_peer_on_error(RedChannel *channel) +void worker_watch_update_mask(SpiceWatch *watch, int event_mask) { - channel->disconnect(channel); } -static int red_channel_peer_get_out_msg_size(void *opaque) +SpiceWatch *worker_watch_add(int fd, int event_mask, SpiceWatchFunc func, void *opaque) { - RedChannel *channel = (RedChannel *)opaque; + return NULL; // apparently allowed? +} - return channel->send_data.size; +void worker_watch_remove(SpiceWatch *watch) +{ } +SpiceCoreInterface worker_core = { + .watch_update_mask = worker_watch_update_mask, + .watch_add = worker_watch_add, + .watch_remove = worker_watch_remove, +}; + static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id, RedsStream *stream, int migrate, event_listener_action_proc handler, @@ -9417,47 +9069,31 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i RedChannel *channel; CommonChannel *common; - ASSERT(size >= sizeof(*channel)); - common = spice_malloc0(size); - channel = &common->base; - ASSERT(common == (CommonChannel*)channel); - channel->stream = stream; - if (!common_channel_config_socket(channel)) { + channel = red_channel_create_parser(size, stream, &worker_core, migrate, + TRUE /* handle_acks */, + common_channel_config_socket, + spice_get_client_channel_parser(channel_id, NULL), + handle_parsed, + common_alloc_recv_buf, + common_release_recv_buf, + hold_item, + send_item, + release_item, + red_channel_default_peer_on_error, + red_channel_default_peer_on_error); + common = (CommonChannel *)channel; + if (!channel) { goto error; } common->id = worker->id; - channel->parser = spice_get_client_channel_parser(channel_id, NULL); common->listener.refs = 1; common->listener.action = handler; common->listener.free = free_common_channel_from_listener; - channel->disconnect = disconnect; - channel->send_item = send_item; - channel->hold_item = hold_item; - channel->release_item = release_item; - channel->handle_parsed = handle_parsed; - channel->stream = stream; common->worker = worker; - channel->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + - // block flags) + // TODO: Should this be distinctive for the Display/Cursor channels? doesn't + // make sense, does it? channel->ack_data.client_window = IS_LOW_BANDWIDTH() ? WIDE_CLIENT_ACK_WINDOW : NARROW_CLIENT_ACK_WINDOW; - channel->ack_data.client_generation = ~0; - channel->incoming.message = (SpiceDataHeader *)channel->incoming.buf; - channel->incoming.now = channel->incoming.buf; - channel->incoming.end = channel->incoming.buf + sizeof(channel->incoming.buf); - ring_init(&channel->pipe); - channel->send_data.marshaller = spice_marshaller_new(); - - channel->outgoing.opaque = channel; - channel->outgoing.pos = 0; - channel->outgoing.size = 0; - channel->outgoing.out_bytes_counter = 0; - - channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size; - channel->outgoing.prepare = red_channel_peer_prepare_out_msg; - channel->outgoing.on_block = red_channel_peer_on_out_block; - channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error; - channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done; event.events = EPOLLIN | EPOLLOUT | EPOLLET; event.data.ptr = &common->listener; @@ -9466,8 +9102,6 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i goto error; } - channel->migrate = migrate; - return channel; error: |