diff options
author | Yonit Halperin <yhalperi@redhat.com> | 2012-01-08 09:20:55 +0200 |
---|---|---|
committer | Yonit Halperin <yhalperi@redhat.com> | 2012-01-12 16:33:36 +0200 |
commit | 65c859ba819fdc70ebc3ba5208bb994d06174873 (patch) | |
tree | 0156288ab282012b7fdf9e8ad0b1bf66ada00e58 /server/red_channel.c | |
parent | ec0bf2488f2ac0b7fb5102fd3d8822fd2883bd0a (diff) | |
download | spice-65c859ba819fdc70ebc3ba5208bb994d06174873.tar.gz spice-65c859ba819fdc70ebc3ba5208bb994d06174873.tar.xz spice-65c859ba819fdc70ebc3ba5208bb994d06174873.zip |
server: add support for SPICE_COMMON_CAP_MINI_HEADER
Support for a header without a serial and without sub list.
red_channel: Support the two types of headers.
Keep a consistent consecutive messages serial.
red_worker: use urgent marshaller instead of sub list.
snd_worker: Sound channels need special support since they still don't use
red_channel for sending & receiving.
Diffstat (limited to 'server/red_channel.c')
-rw-r--r-- | server/red_channel.c | 219 |
1 files changed, 164 insertions, 55 deletions
diff --git a/server/red_channel.c b/server/red_channel.c index 06b4ef0f..e526179e 100644 --- a/server/red_channel.c +++ b/server/red_channel.c @@ -40,6 +40,82 @@ static void red_client_add_channel(RedClient *client, RedChannelClient *rcc); static void red_client_remove_channel(RedChannelClient *rcc); static void red_channel_client_restore_main_sender(RedChannelClient *rcc); +static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header) +{ + return ((SpiceDataHeader *)header->data)->size; +} + +static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header) +{ + return ((SpiceMiniDataHeader *)header->data)->size; +} + +static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header) +{ + return ((SpiceDataHeader *)header->data)->type; +} + +static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header) +{ + return ((SpiceMiniDataHeader *)header->data)->type; +} + +static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) +{ + ((SpiceDataHeader *)header->data)->type = type; +} + +static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type) +{ + ((SpiceMiniDataHeader *)header->data)->type = type; +} + +static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) +{ + ((SpiceDataHeader *)header->data)->size = size; +} + +static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size) +{ + ((SpiceMiniDataHeader *)header->data)->size = size; +} + +static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) +{ + ((SpiceDataHeader *)header->data)->serial = serial; +} + +static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial) +{ + red_error("attempt to set header serial on mini header"); +} + +static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) +{ + ((SpiceDataHeader *)header->data)->sub_list = sub_list; +} + +static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list) +{ + red_error("attempt to set header sub list on mini header"); +} + +static SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader), + full_header_set_msg_type, + full_header_set_msg_size, + full_header_set_msg_serial, + full_header_set_msg_sub_list, + full_header_get_msg_type, + full_header_get_msg_size}; + +static SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader), + mini_header_set_msg_type, + mini_header_set_msg_size, + mini_header_set_msg_serial, + mini_header_set_msg_sub_list, + mini_header_get_msg_type, + mini_header_get_msg_size}; + /* return the number of bytes read. -1 in case of error */ static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) { @@ -83,29 +159,31 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle uint8_t *parsed; size_t parsed_size; message_destructor_t parsed_free; + uint16_t msg_type; + uint32_t msg_size; for (;;) { int ret_handle; - if (handler->header_pos < sizeof(SpiceDataHeader)) { + if (handler->header_pos < handler->header.header_size) { bytes_read = red_peer_receive(stream, - ((uint8_t *)&handler->header) + handler->header_pos, - sizeof(SpiceDataHeader) - handler->header_pos); + handler->header.data + handler->header_pos, + handler->header.header_size - handler->header_pos); if (bytes_read == -1) { handler->cb->on_error(handler->opaque); return; } handler->header_pos += bytes_read; - if (handler->header_pos != sizeof(SpiceDataHeader)) { + if (handler->header_pos != handler->header.header_size) { return; } } - if (handler->msg_pos < handler->header.size) { + msg_size = handler->header.get_msg_size(&handler->header); + msg_type = handler->header.get_msg_type(&handler->header); + if (handler->msg_pos < msg_size) { if (!handler->msg) { - handler->msg = handler->cb->alloc_msg_buf(handler->opaque, - handler->header.type, - handler->header.size); + handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size); if (handler->msg == NULL) { red_printf("ERROR: channel refused to allocate buffer."); handler->cb->on_error(handler->opaque); @@ -115,47 +193,37 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle bytes_read = red_peer_receive(stream, handler->msg + handler->msg_pos, - handler->header.size - handler->msg_pos); + msg_size - handler->msg_pos); if (bytes_read == -1) { - handler->cb->release_msg_buf(handler->opaque, - handler->header.type, - handler->header.size, - handler->msg); + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); handler->cb->on_error(handler->opaque); return; } handler->msg_pos += bytes_read; - if (handler->msg_pos != handler->header.size) { + if (handler->msg_pos != msg_size) { return; } } if (handler->cb->parser) { parsed = handler->cb->parser(handler->msg, - handler->msg + handler->header.size, handler->header.type, + handler->msg + msg_size, msg_type, SPICE_VERSION_MINOR, &parsed_size, &parsed_free); if (parsed == NULL) { - red_printf("failed to parse message type %d", handler->header.type); - handler->cb->release_msg_buf(handler->opaque, handler->header.type, - handler->header.size, - handler->msg); + red_printf("failed to parse message type %d", msg_type); + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); handler->cb->on_error(handler->opaque); return; } ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size, - handler->header.type, parsed); + msg_type, parsed); parsed_free(parsed); } else { - ret_handle = handler->cb->handle_message(handler->opaque, - handler->header.type, - handler->header.size, + ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size, handler->msg); } handler->msg_pos = 0; - handler->cb->release_msg_buf(handler->opaque, - handler->header.type, - handler->header.size, - handler->msg); + handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg); handler->msg = NULL; handler->header_pos = 0; @@ -276,21 +344,32 @@ static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClien static void red_channel_client_reset_send_data(RedChannelClient *rcc) { spice_marshaller_reset(rcc->send_data.marshaller); - rcc->send_data.header = (SpiceDataHeader *) - spice_marshaller_reserve_space(rcc->send_data.marshaller, sizeof(SpiceDataHeader)); - spice_marshaller_set_base(rcc->send_data.marshaller, sizeof(SpiceDataHeader)); - rcc->send_data.header->type = 0; - rcc->send_data.header->size = 0; - rcc->send_data.header->sub_list = 0; - - if (!red_channel_client_urgent_marshaller_is_active(rcc)) { - rcc->send_data.header->serial = ++rcc->send_data.serial; - } else { - /* The serial was incremented by the call to reset_send_data - * that was done for the main marshaller. The urgent msg should - * receive this serial, and the main msg serial should be - * the following one. */ - rcc->send_data.header->serial = rcc->send_data.serial++; + rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller, + rcc->send_data.header.header_size); + spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size); + rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0); + rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0); + + /* Keeping the serial consecutive: reseting it if reset_send_data + * has been called before, but no message has been sent since then. + */ + if (rcc->send_data.last_sent_serial != rcc->send_data.serial) { + ASSERT(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1); + /* When the urgent marshaller is active, the serial was incremented by + * the call to reset_send_data that was made for the main marshaller. + * The urgent msg receives this serial, and the main msg serial is + * the following one. Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial) + * should be 1 in this case*/ + if (!red_channel_client_urgent_marshaller_is_active(rcc)) { + rcc->send_data.serial = rcc->send_data.last_sent_serial; + } + } + rcc->send_data.serial++; + + if (!rcc->is_mini_header) { + ASSERT(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller); + rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0); + rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); } } @@ -376,7 +455,7 @@ static void red_channel_peer_on_out_msg_done(void *opaque) if (red_channel_client_urgent_marshaller_is_active(rcc)) { red_channel_client_restore_main_sender(rcc); - ASSERT(rcc->send_data.header != NULL); + ASSERT(rcc->send_data.header.data != NULL); red_channel_client_begin_send_message(rcc); } } @@ -457,6 +536,18 @@ RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedCl rcc->outgoing.size = 0; red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); + if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { + rcc->incoming.header = mini_header_wrapper; + rcc->send_data.header = mini_header_wrapper; + rcc->is_mini_header = TRUE; + } else { + rcc->incoming.header = full_header_wrapper; + rcc->send_data.header = full_header_wrapper; + rcc->is_mini_header = FALSE; + } + + rcc->incoming.header.data = rcc->incoming.header_buf; + rcc->incoming.serial = 1; if (!channel->channel_cbs.config_socket(rcc)) { goto error; @@ -545,6 +636,7 @@ RedChannel *red_channel_create(int size, client_cbs.migrate = red_channel_client_default_migrate; red_channel_register_client_cbs(channel, &client_cbs); + red_channel_set_common_cap(channel, SPICE_COMMON_CAP_MINI_HEADER); channel->thread_id = pthread_self(); @@ -590,6 +682,7 @@ RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id) client_cbs.migrate = red_channel_client_default_migrate; red_channel_register_client_cbs(channel, &client_cbs); + red_channel_set_common_cap(channel, SPICE_COMMON_CAP_MINI_HEADER); channel->thread_id = pthread_self(); @@ -850,7 +943,7 @@ static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size } int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, - uint16_t type, void *message) + uint16_t type, void *message) { switch (type) { case SPICE_MSGC_ACK_SYNC: @@ -897,7 +990,7 @@ void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, { ASSERT(red_channel_client_no_item_being_sent(rcc)); ASSERT(msg_type != 0); - rcc->send_data.header->type = msg_type; + rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type); rcc->send_data.item = item; if (item) { rcc->channel->channel_cbs.hold_item(rcc, item); @@ -909,23 +1002,25 @@ void red_channel_client_begin_send_message(RedChannelClient *rcc) SpiceMarshaller *m = rcc->send_data.marshaller; // TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state) - if (rcc->send_data.header->type == 0) { + if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) { red_printf("BUG: header->type == 0"); return; } spice_marshaller_flush(m); rcc->send_data.size = spice_marshaller_get_total_size(m); - rcc->send_data.header->size = rcc->send_data.size - sizeof(SpiceDataHeader); + rcc->send_data.header.set_msg_size(&rcc->send_data.header, + rcc->send_data.size - rcc->send_data.header.header_size); rcc->ack_data.messages_window++; - rcc->send_data.header = NULL; /* avoid writing to this until we have a new message */ + rcc->send_data.last_sent_serial = rcc->send_data.serial; + rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */ red_channel_client_send(rcc); } SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc) { ASSERT(red_channel_client_no_item_being_sent(rcc)); - ASSERT(rcc->send_data.header != NULL); - rcc->send_data.main.header = rcc->send_data.header; + ASSERT(rcc->send_data.header.data != NULL); + rcc->send_data.main.header_data = rcc->send_data.header.data; rcc->send_data.main.item = rcc->send_data.item; rcc->send_data.marshaller = rcc->send_data.urgent.marshaller; @@ -938,8 +1033,10 @@ static void red_channel_client_restore_main_sender(RedChannelClient *rcc) { spice_marshaller_reset(rcc->send_data.urgent.marshaller); rcc->send_data.marshaller = rcc->send_data.main.marshaller; - rcc->send_data.header = rcc->send_data.main.header; - rcc->send_data.header->serial = rcc->send_data.serial; + rcc->send_data.header.data = rcc->send_data.main.header_data; + if (!rcc->is_mini_header) { + rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial); + } rcc->send_data.item = rcc->send_data.main.item; } @@ -1069,7 +1166,6 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_ rcc->ack_data.client_window = client_window; } - static void red_channel_remove_client(RedChannelClient *rcc) { ASSERT(pthread_equal(pthread_self(), rcc->channel->thread_id)); @@ -1127,6 +1223,19 @@ RedChannelClient *red_channel_client_create_dummy(int size, rcc->client = client; rcc->channel = channel; red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps); + if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) { + rcc->incoming.header = mini_header_wrapper; + rcc->send_data.header = mini_header_wrapper; + rcc->is_mini_header = TRUE; + } else { + rcc->incoming.header = full_header_wrapper; + rcc->send_data.header = full_header_wrapper; + rcc->is_mini_header = FALSE; + } + + rcc->incoming.header.data = rcc->incoming.header_buf; + rcc->incoming.serial = 1; + red_channel_add_client(channel, rcc); return rcc; } @@ -1200,7 +1309,7 @@ int red_channel_client_blocked(RedChannelClient *rcc) int red_channel_client_send_message_pending(RedChannelClient *rcc) { - return rcc->send_data.header->type != 0; + return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0; } /* accessors for RedChannelClient */ @@ -1221,7 +1330,7 @@ RedClient *red_channel_client_get_client(RedChannelClient *rcc) void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list) { - rcc->send_data.header->sub_list = sub_list; + rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list); } /* end of accessors */ |