diff options
Diffstat (limited to 'server/red_channel.c')
-rw-r--r-- | server/red_channel.c | 253 |
1 files changed, 216 insertions, 37 deletions
diff --git a/server/red_channel.c b/server/red_channel.c index 2ce0094c..e526179e 100644 --- a/server/red_channel.c +++ b/server/red_channel.c @@ -38,6 +38,83 @@ static void red_channel_client_event(int fd, int event, void *data); static void red_client_add_channel(RedClient *client, RedChannelClient *rcc); static void red_client_remove_channel(RedChannelClient *rcc); +static void red_channel_client_restore_main_sender(RedChannelClient *rcc); + +static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header) +{ + return ((SpiceDataHeader *)header->data)->size; +} + +static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header) +{ + return ((SpiceMiniDataHeader *)header->data)->size; +} + +static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header) +{ + return ((SpiceDataHeader *)header->data)->type; +} + +static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header) +{ + return ((SpiceMiniDataHeader *)header->data)->type; +} + +static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) +{ + ((SpiceDataHeader *)header->data)->type = type; +} + +static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) +{ + ((SpiceMiniDataHeader *)header->data)->type = type; +} + +static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) +{ + ((SpiceDataHeader *)header->data)->size = size; +} + +static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) +{ + ((SpiceMiniDataHeader *)header->data)->size = size; +} + +static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) +{ + ((SpiceDataHeader *)header->data)->serial = serial; +} + +static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) +{ + red_error("attempt to set header serial on mini header"); +} + +static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) +{ + ((SpiceDataHeader *)header->data)->sub_list = sub_list; +} + +static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) +{ + red_error("attempt to set header sub list on mini header"); +} + +static SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader), + full_header_set_msg_type, + full_header_set_msg_size, + full_header_set_msg_serial, + full_header_set_msg_sub_list, + full_header_get_msg_type, + full_header_get_msg_size}; + +static SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader), + mini_header_set_msg_type, + mini_header_set_msg_size, + mini_header_set_msg_serial, + mini_header_set_msg_sub_list, + mini_header_get_msg_type, + mini_header_get_msg_size}; /* return the number of bytes read. -1 in case of error */ static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) @@ -82,27 +159,31 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle uint8_t *parsed; size_t parsed_size; message_destructor_t parsed_free; + uint16_t msg_type; + uint32_t msg_size; for (;;) { int ret_handle; - if (handler->header_pos < sizeof(SpiceDataHeader)) { + if (handler->header_pos < handler->header.header_size) { bytes_read = red_peer_receive(stream, - ((uint8_t *)&handler->header) + handler->header_pos, - sizeof(SpiceDataHeader) - handler->header_pos); + handler->header.data + handler->header_pos, + handler->header.header_size - handler->header_pos); if (bytes_read == -1) { handler->cb->on_error(handler->opaque); return; } handler->header_pos += bytes_read; - if (handler->header_pos != sizeof(SpiceDataHeader)) { + if (handler->header_pos != handler->header.header_size) { return; } } - if (handler->msg_pos < handler->header.size) { + msg_size = handler->header.get_msg_size(&handler->header); + msg_type = handler->header.get_msg_type(&handler->header); + if (handler->msg_pos < msg_size) { if (!handler->msg) { - handler->msg = handler->cb->alloc_msg_buf(handler->opaque, &handler->header); + handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size); if (handler->msg == NULL) { red_printf("ERROR: channel refused to allocate buffer."); handler->cb->on_error(handler->opaque); @@ -112,37 +193,37 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle bytes_read = red_peer_receive(stream, handler->msg + handler->msg_pos, - handler->header.size - handler->msg_pos); + msg_size - handler->msg_pos); if (bytes_read == -1) { - handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg); + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); handler->cb->on_error(handler->opaque); return; } handler->msg_pos += bytes_read; - if (handler->msg_pos != handler->header.size) { + if (handler->msg_pos != msg_size) { return; } } if (handler->cb->parser) { parsed = handler->cb->parser(handler->msg, - handler->msg + handler->header.size, handler->header.type, + handler->msg + msg_size, msg_type, SPICE_VERSION_MINOR, &parsed_size, &parsed_free); if (parsed == NULL) { - red_printf("failed to parse message type %d", handler->header.type); - handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg); + red_printf("failed to parse message type %d", msg_type); + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); handler->cb->on_error(handler->opaque); return; } ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size, - handler->header.type, parsed); + msg_type, parsed); parsed_free(parsed); } else { - ret_handle = handler->cb->handle_message(handler->opaque, &handler->header, - handler->msg); + ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size, + handler->msg); } handler->msg_pos = 0; - handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg); + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); handler->msg = NULL; handler->header_pos = 0; @@ -204,10 +285,13 @@ static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handle handler->pos += n; handler->cb->on_output(handler->opaque, n); if (handler->pos == handler->size) { // finished writing data - handler->cb->on_msg_done(handler->opaque); + /* reset handler before calling on_msg_done, since it + * can trigger another call to red_peer_handle_outgoing (when + * switching from the urgent marshaller to the main one */ handler->vec = handler->vec_buf; handler->pos = 0; handler->size = 0; + handler->cb->on_msg_done(handler->opaque); return; } } @@ -252,16 +336,41 @@ static void red_channel_client_peer_on_out_block(void *opaque) SPICE_WATCH_EVENT_WRITE); } +static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc) +{ + return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller); +} + static void red_channel_client_reset_send_data(RedChannelClient *rcc) { spice_marshaller_reset(rcc->send_data.marshaller); - rcc->send_data.header = (SpiceDataHeader *) - spice_marshaller_reserve_space(rcc->send_data.marshaller, sizeof(SpiceDataHeader)); - spice_marshaller_set_base(rcc->send_data.marshaller, sizeof(SpiceDataHeader)); - rcc->send_data.header->type = 0; - rcc->send_data.header->size = 0; - rcc->send_data.header->sub_list = 0; - rcc->send_data.header->serial = ++rcc->send_data.serial; + rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller, + rcc->send_data.header.header_size); + spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size); + rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0); + rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0); + + /* Keeping the serial consecutive: reseting it if reset_send_data + * has been called before, but no message has been sent since then. + */ + if (rcc->send_data.last_sent_serial != rcc->send_data.serial) { + ASSERT(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1); + /* When the urgent marshaller is active, the serial was incremented by + * the call to reset_send_data that was made for the main marshaller. + * The urgent msg receives this serial, and the main msg serial is + * the following one. Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial) + * should be 1 in this case*/ + if (!red_channel_client_urgent_marshaller_is_active(rcc)) { + rcc->send_data.serial = rcc->send_data.last_sent_serial; + } + } + rcc->send_data.serial++; + + if (!rcc->is_mini_header) { + ASSERT(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller); + rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0); + rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); + } } void red_channel_client_push_set_ack(RedChannelClient *rcc) @@ -343,6 +452,12 @@ static void red_channel_peer_on_out_msg_done(void *opaque) rcc->channel->core->watch_update_mask(rcc->stream->watch, SPICE_WATCH_EVENT_READ); } + + if (red_channel_client_urgent_marshaller_is_active(rcc)) { + red_channel_client_restore_main_sender(rcc); + ASSERT(rcc->send_data.header.data != NULL); + red_channel_client_begin_send_message(rcc); + } } static void red_channel_client_pipe_remove(RedChannelClient *rcc, PipeItem *item) @@ -407,7 +522,10 @@ RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedCl // block flags) rcc->ack_data.client_generation = ~0; rcc->ack_data.client_window = CLIENT_ACK_WINDOW; - rcc->send_data.marshaller = spice_marshaller_new(); + rcc->send_data.main.marshaller = spice_marshaller_new(); + rcc->send_data.urgent.marshaller = spice_marshaller_new(); + + rcc->send_data.marshaller = rcc->send_data.main.marshaller; rcc->incoming.opaque = rcc; rcc->incoming.cb = &channel->incoming_cb; @@ -418,6 +536,18 @@ RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedCl rcc->outgoing.size = 0; red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); + if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { + rcc->incoming.header = mini_header_wrapper; + rcc->send_data.header = mini_header_wrapper; + rcc->is_mini_header = TRUE; + } else { + rcc->incoming.header = full_header_wrapper; + rcc->send_data.header = full_header_wrapper; + rcc->is_mini_header = FALSE; + } + + rcc->incoming.header.data = rcc->incoming.header_buf; + rcc->incoming.serial = 1; if (!channel->channel_cbs.config_socket(rcc)) { goto error; @@ -506,6 +636,7 @@ RedChannel *red_channel_create(int size, client_cbs.migrate = red_channel_client_default_migrate; red_channel_register_client_cbs(channel, &client_cbs); + red_channel_set_common_cap(channel, SPICE_COMMON_CAP_MINI_HEADER); channel->thread_id = pthread_self(); @@ -551,6 +682,7 @@ RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id) client_cbs.migrate = red_channel_client_default_migrate; red_channel_register_client_cbs(channel, &client_cbs); + red_channel_set_common_cap(channel, SPICE_COMMON_CAP_MINI_HEADER); channel->thread_id = pthread_self(); @@ -559,7 +691,10 @@ RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id) return channel; } -static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg) +static int do_nothing_handle_message(RedChannelClient *rcc, + uint16_t type, + uint32_t size, + uint8_t *msg) { return TRUE; } @@ -643,9 +778,14 @@ void red_channel_client_destroy(RedChannelClient *rcc) red_channel_client_disconnect(rcc); } red_client_remove_channel(rcc); - if (rcc->send_data.marshaller) { - spice_marshaller_destroy(rcc->send_data.marshaller); + if (rcc->send_data.main.marshaller) { + spice_marshaller_destroy(rcc->send_data.main.marshaller); + } + + if (rcc->send_data.urgent.marshaller) { + spice_marshaller_destroy(rcc->send_data.urgent.marshaller); } + red_channel_client_destroy_remote_caps(rcc); free(rcc); } @@ -803,7 +943,7 @@ static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size } int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, - uint16_t type, void *message) + uint16_t type, void *message) { switch (type) { case SPICE_MSGC_ACK_SYNC: @@ -850,7 +990,7 @@ void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, { ASSERT(red_channel_client_no_item_being_sent(rcc)); ASSERT(msg_type != 0); - rcc->send_data.header->type = msg_type; + rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type); rcc->send_data.item = item; if (item) { rcc->channel->channel_cbs.hold_item(rcc, item); @@ -862,18 +1002,44 @@ void red_channel_client_begin_send_message(RedChannelClient *rcc) SpiceMarshaller *m = rcc->send_data.marshaller; // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state) - if (rcc->send_data.header->type == 0) { + if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) { red_printf("BUG: header->type == 0"); return; } spice_marshaller_flush(m); rcc->send_data.size = spice_marshaller_get_total_size(m); - rcc->send_data.header->size = rcc->send_data.size - sizeof(SpiceDataHeader); + rcc->send_data.header.set_msg_size(&rcc->send_data.header, + rcc->send_data.size - rcc->send_data.header.header_size); rcc->ack_data.messages_window++; - rcc->send_data.header = NULL; /* avoid writing to this until we have a new message */ + rcc->send_data.last_sent_serial = rcc->send_data.serial; + rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */ red_channel_client_send(rcc); } +SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc) +{ + ASSERT(red_channel_client_no_item_being_sent(rcc)); + ASSERT(rcc->send_data.header.data != NULL); + rcc->send_data.main.header_data = rcc->send_data.header.data; + rcc->send_data.main.item = rcc->send_data.item; + + rcc->send_data.marshaller = rcc->send_data.urgent.marshaller; + rcc->send_data.item = NULL; + red_channel_client_reset_send_data(rcc); + return rcc->send_data.marshaller; +} + +static void red_channel_client_restore_main_sender(RedChannelClient *rcc) +{ + spice_marshaller_reset(rcc->send_data.urgent.marshaller); + rcc->send_data.marshaller = rcc->send_data.main.marshaller; + rcc->send_data.header.data = rcc->send_data.main.header_data; + if (!rcc->is_mini_header) { + rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); + } + rcc->send_data.item = rcc->send_data.main.item; +} + uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc) { return rcc->send_data.serial; @@ -1000,7 +1166,6 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_ rcc->ack_data.client_window = client_window; } - static void red_channel_remove_client(RedChannelClient *rcc) { ASSERT(pthread_equal(pthread_self(), rcc->channel->thread_id)); @@ -1058,6 +1223,19 @@ RedChannelClient *red_channel_client_create_dummy(int size, rcc->client = client; rcc->channel = channel; red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); + if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { + rcc->incoming.header = mini_header_wrapper; + rcc->send_data.header = mini_header_wrapper; + rcc->is_mini_header = TRUE; + } else { + rcc->incoming.header = full_header_wrapper; + rcc->send_data.header = full_header_wrapper; + rcc->is_mini_header = FALSE; + } + + rcc->incoming.header.data = rcc->incoming.header_buf; + rcc->incoming.serial = 1; + red_channel_add_client(channel, rcc); return rcc; } @@ -1131,7 +1309,7 @@ int red_channel_client_blocked(RedChannelClient *rcc) int red_channel_client_send_message_pending(RedChannelClient *rcc) { - return rcc->send_data.header->type != 0; + return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0; } /* accessors for RedChannelClient */ @@ -1150,10 +1328,11 @@ RedClient *red_channel_client_get_client(RedChannelClient *rcc) return rcc->client; } -SpiceDataHeader *red_channel_client_get_header(RedChannelClient *rcc) +void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list) { - return rcc->send_data.header; + rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list); } + /* end of accessors */ int red_channel_get_first_socket(RedChannel *channel) |