summaryrefslogtreecommitdiffstats
path: root/server/red_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'server/red_channel.c')
-rw-r--r--server/red_channel.c253
1 files changed, 216 insertions, 37 deletions
diff --git a/server/red_channel.c b/server/red_channel.c
index 2ce0094c..e526179e 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -38,6 +38,83 @@
static void red_channel_client_event(int fd, int event, void *data);
static void red_client_add_channel(RedClient *client, RedChannelClient *rcc);
static void red_client_remove_channel(RedChannelClient *rcc);
+static void red_channel_client_restore_main_sender(RedChannelClient *rcc);
+
+static uint32_t full_header_get_msg_size(SpiceDataHeaderOpaque *header)
+{
+ return ((SpiceDataHeader *)header->data)->size;
+}
+
+static uint32_t mini_header_get_msg_size(SpiceDataHeaderOpaque *header)
+{
+ return ((SpiceMiniDataHeader *)header->data)->size;
+}
+
+static uint16_t full_header_get_msg_type(SpiceDataHeaderOpaque *header)
+{
+ return ((SpiceDataHeader *)header->data)->type;
+}
+
+static uint16_t mini_header_get_msg_type(SpiceDataHeaderOpaque *header)
+{
+ return ((SpiceMiniDataHeader *)header->data)->type;
+}
+
+static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
+{
+ ((SpiceDataHeader *)header->data)->type = type;
+}
+
+static void mini_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type)
+{
+ ((SpiceMiniDataHeader *)header->data)->type = type;
+}
+
+static void full_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
+{
+ ((SpiceDataHeader *)header->data)->size = size;
+}
+
+static void mini_header_set_msg_size(SpiceDataHeaderOpaque *header, uint32_t size)
+{
+ ((SpiceMiniDataHeader *)header->data)->size = size;
+}
+
+static void full_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
+{
+ ((SpiceDataHeader *)header->data)->serial = serial;
+}
+
+static void mini_header_set_msg_serial(SpiceDataHeaderOpaque *header, uint64_t serial)
+{
+ red_error("attempt to set header serial on mini header");
+}
+
+static void full_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
+{
+ ((SpiceDataHeader *)header->data)->sub_list = sub_list;
+}
+
+static void mini_header_set_msg_sub_list(SpiceDataHeaderOpaque *header, uint32_t sub_list)
+{
+ red_error("attempt to set header sub list on mini header");
+}
+
+static SpiceDataHeaderOpaque full_header_wrapper = {NULL, sizeof(SpiceDataHeader),
+ full_header_set_msg_type,
+ full_header_set_msg_size,
+ full_header_set_msg_serial,
+ full_header_set_msg_sub_list,
+ full_header_get_msg_type,
+ full_header_get_msg_size};
+
+static SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMiniDataHeader),
+ mini_header_set_msg_type,
+ mini_header_set_msg_size,
+ mini_header_set_msg_serial,
+ mini_header_set_msg_sub_list,
+ mini_header_get_msg_type,
+ mini_header_get_msg_size};
/* return the number of bytes read. -1 in case of error */
static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
@@ -82,27 +159,31 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
uint8_t *parsed;
size_t parsed_size;
message_destructor_t parsed_free;
+ uint16_t msg_type;
+ uint32_t msg_size;
for (;;) {
int ret_handle;
- if (handler->header_pos < sizeof(SpiceDataHeader)) {
+ if (handler->header_pos < handler->header.header_size) {
bytes_read = red_peer_receive(stream,
- ((uint8_t *)&handler->header) + handler->header_pos,
- sizeof(SpiceDataHeader) - handler->header_pos);
+ handler->header.data + handler->header_pos,
+ handler->header.header_size - handler->header_pos);
if (bytes_read == -1) {
handler->cb->on_error(handler->opaque);
return;
}
handler->header_pos += bytes_read;
- if (handler->header_pos != sizeof(SpiceDataHeader)) {
+ if (handler->header_pos != handler->header.header_size) {
return;
}
}
- if (handler->msg_pos < handler->header.size) {
+ msg_size = handler->header.get_msg_size(&handler->header);
+ msg_type = handler->header.get_msg_type(&handler->header);
+ if (handler->msg_pos < msg_size) {
if (!handler->msg) {
- handler->msg = handler->cb->alloc_msg_buf(handler->opaque, &handler->header);
+ handler->msg = handler->cb->alloc_msg_buf(handler->opaque, msg_type, msg_size);
if (handler->msg == NULL) {
red_printf("ERROR: channel refused to allocate buffer.");
handler->cb->on_error(handler->opaque);
@@ -112,37 +193,37 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
bytes_read = red_peer_receive(stream,
handler->msg + handler->msg_pos,
- handler->header.size - handler->msg_pos);
+ msg_size - handler->msg_pos);
if (bytes_read == -1) {
- handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg);
+ handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
handler->cb->on_error(handler->opaque);
return;
}
handler->msg_pos += bytes_read;
- if (handler->msg_pos != handler->header.size) {
+ if (handler->msg_pos != msg_size) {
return;
}
}
if (handler->cb->parser) {
parsed = handler->cb->parser(handler->msg,
- handler->msg + handler->header.size, handler->header.type,
+ handler->msg + msg_size, msg_type,
SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
if (parsed == NULL) {
- red_printf("failed to parse message type %d", handler->header.type);
- handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg);
+ red_printf("failed to parse message type %d", msg_type);
+ handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
handler->cb->on_error(handler->opaque);
return;
}
ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size,
- handler->header.type, parsed);
+ msg_type, parsed);
parsed_free(parsed);
} else {
- ret_handle = handler->cb->handle_message(handler->opaque, &handler->header,
- handler->msg);
+ ret_handle = handler->cb->handle_message(handler->opaque, msg_type, msg_size,
+ handler->msg);
}
handler->msg_pos = 0;
- handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg);
+ handler->cb->release_msg_buf(handler->opaque, msg_type, msg_size, handler->msg);
handler->msg = NULL;
handler->header_pos = 0;
@@ -204,10 +285,13 @@ static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handle
handler->pos += n;
handler->cb->on_output(handler->opaque, n);
if (handler->pos == handler->size) { // finished writing data
- handler->cb->on_msg_done(handler->opaque);
+ /* reset handler before calling on_msg_done, since it
+ * can trigger another call to red_peer_handle_outgoing (when
+ * switching from the urgent marshaller to the main one */
handler->vec = handler->vec_buf;
handler->pos = 0;
handler->size = 0;
+ handler->cb->on_msg_done(handler->opaque);
return;
}
}
@@ -252,16 +336,41 @@ static void red_channel_client_peer_on_out_block(void *opaque)
SPICE_WATCH_EVENT_WRITE);
}
+static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc)
+{
+ return (rcc->send_data.marshaller == rcc->send_data.urgent.marshaller);
+}
+
static void red_channel_client_reset_send_data(RedChannelClient *rcc)
{
spice_marshaller_reset(rcc->send_data.marshaller);
- rcc->send_data.header = (SpiceDataHeader *)
- spice_marshaller_reserve_space(rcc->send_data.marshaller, sizeof(SpiceDataHeader));
- spice_marshaller_set_base(rcc->send_data.marshaller, sizeof(SpiceDataHeader));
- rcc->send_data.header->type = 0;
- rcc->send_data.header->size = 0;
- rcc->send_data.header->sub_list = 0;
- rcc->send_data.header->serial = ++rcc->send_data.serial;
+ rcc->send_data.header.data = spice_marshaller_reserve_space(rcc->send_data.marshaller,
+ rcc->send_data.header.header_size);
+ spice_marshaller_set_base(rcc->send_data.marshaller, rcc->send_data.header.header_size);
+ rcc->send_data.header.set_msg_type(&rcc->send_data.header, 0);
+ rcc->send_data.header.set_msg_size(&rcc->send_data.header, 0);
+
+ /* Keeping the serial consecutive: reseting it if reset_send_data
+ * has been called before, but no message has been sent since then.
+ */
+ if (rcc->send_data.last_sent_serial != rcc->send_data.serial) {
+ ASSERT(rcc->send_data.serial - rcc->send_data.last_sent_serial == 1);
+ /* When the urgent marshaller is active, the serial was incremented by
+ * the call to reset_send_data that was made for the main marshaller.
+ * The urgent msg receives this serial, and the main msg serial is
+ * the following one. Thus, (rcc->send_data.serial - rcc->send_data.last_sent_serial)
+ * should be 1 in this case*/
+ if (!red_channel_client_urgent_marshaller_is_active(rcc)) {
+ rcc->send_data.serial = rcc->send_data.last_sent_serial;
+ }
+ }
+ rcc->send_data.serial++;
+
+ if (!rcc->is_mini_header) {
+ ASSERT(rcc->send_data.marshaller != rcc->send_data.urgent.marshaller);
+ rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, 0);
+ rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial);
+ }
}
void red_channel_client_push_set_ack(RedChannelClient *rcc)
@@ -343,6 +452,12 @@ static void red_channel_peer_on_out_msg_done(void *opaque)
rcc->channel->core->watch_update_mask(rcc->stream->watch,
SPICE_WATCH_EVENT_READ);
}
+
+ if (red_channel_client_urgent_marshaller_is_active(rcc)) {
+ red_channel_client_restore_main_sender(rcc);
+ ASSERT(rcc->send_data.header.data != NULL);
+ red_channel_client_begin_send_message(rcc);
+ }
}
static void red_channel_client_pipe_remove(RedChannelClient *rcc, PipeItem *item)
@@ -407,7 +522,10 @@ RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedCl
// block flags)
rcc->ack_data.client_generation = ~0;
rcc->ack_data.client_window = CLIENT_ACK_WINDOW;
- rcc->send_data.marshaller = spice_marshaller_new();
+ rcc->send_data.main.marshaller = spice_marshaller_new();
+ rcc->send_data.urgent.marshaller = spice_marshaller_new();
+
+ rcc->send_data.marshaller = rcc->send_data.main.marshaller;
rcc->incoming.opaque = rcc;
rcc->incoming.cb = &channel->incoming_cb;
@@ -418,6 +536,18 @@ RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedCl
rcc->outgoing.size = 0;
red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps);
+ if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) {
+ rcc->incoming.header = mini_header_wrapper;
+ rcc->send_data.header = mini_header_wrapper;
+ rcc->is_mini_header = TRUE;
+ } else {
+ rcc->incoming.header = full_header_wrapper;
+ rcc->send_data.header = full_header_wrapper;
+ rcc->is_mini_header = FALSE;
+ }
+
+ rcc->incoming.header.data = rcc->incoming.header_buf;
+ rcc->incoming.serial = 1;
if (!channel->channel_cbs.config_socket(rcc)) {
goto error;
@@ -506,6 +636,7 @@ RedChannel *red_channel_create(int size,
client_cbs.migrate = red_channel_client_default_migrate;
red_channel_register_client_cbs(channel, &client_cbs);
+ red_channel_set_common_cap(channel, SPICE_COMMON_CAP_MINI_HEADER);
channel->thread_id = pthread_self();
@@ -551,6 +682,7 @@ RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id)
client_cbs.migrate = red_channel_client_default_migrate;
red_channel_register_client_cbs(channel, &client_cbs);
+ red_channel_set_common_cap(channel, SPICE_COMMON_CAP_MINI_HEADER);
channel->thread_id = pthread_self();
@@ -559,7 +691,10 @@ RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id)
return channel;
}
-static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *header, uint8_t *msg)
+static int do_nothing_handle_message(RedChannelClient *rcc,
+ uint16_t type,
+ uint32_t size,
+ uint8_t *msg)
{
return TRUE;
}
@@ -643,9 +778,14 @@ void red_channel_client_destroy(RedChannelClient *rcc)
red_channel_client_disconnect(rcc);
}
red_client_remove_channel(rcc);
- if (rcc->send_data.marshaller) {
- spice_marshaller_destroy(rcc->send_data.marshaller);
+ if (rcc->send_data.main.marshaller) {
+ spice_marshaller_destroy(rcc->send_data.main.marshaller);
+ }
+
+ if (rcc->send_data.urgent.marshaller) {
+ spice_marshaller_destroy(rcc->send_data.urgent.marshaller);
}
+
red_channel_client_destroy_remote_caps(rcc);
free(rcc);
}
@@ -803,7 +943,7 @@ static void red_channel_handle_migrate_data(RedChannelClient *rcc, uint32_t size
}
int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
- uint16_t type, void *message)
+ uint16_t type, void *message)
{
switch (type) {
case SPICE_MSGC_ACK_SYNC:
@@ -850,7 +990,7 @@ void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type,
{
ASSERT(red_channel_client_no_item_being_sent(rcc));
ASSERT(msg_type != 0);
- rcc->send_data.header->type = msg_type;
+ rcc->send_data.header.set_msg_type(&rcc->send_data.header, msg_type);
rcc->send_data.item = item;
if (item) {
rcc->channel->channel_cbs.hold_item(rcc, item);
@@ -862,18 +1002,44 @@ void red_channel_client_begin_send_message(RedChannelClient *rcc)
SpiceMarshaller *m = rcc->send_data.marshaller;
// TODO - better check: type in channel_allowed_types. Better: type in channel_allowed_types(channel_state)
- if (rcc->send_data.header->type == 0) {
+ if (rcc->send_data.header.get_msg_type(&rcc->send_data.header) == 0) {
red_printf("BUG: header->type == 0");
return;
}
spice_marshaller_flush(m);
rcc->send_data.size = spice_marshaller_get_total_size(m);
- rcc->send_data.header->size = rcc->send_data.size - sizeof(SpiceDataHeader);
+ rcc->send_data.header.set_msg_size(&rcc->send_data.header,
+ rcc->send_data.size - rcc->send_data.header.header_size);
rcc->ack_data.messages_window++;
- rcc->send_data.header = NULL; /* avoid writing to this until we have a new message */
+ rcc->send_data.last_sent_serial = rcc->send_data.serial;
+ rcc->send_data.header.data = NULL; /* avoid writing to this until we have a new message */
red_channel_client_send(rcc);
}
+SpiceMarshaller *red_channel_client_switch_to_urgent_sender(RedChannelClient *rcc)
+{
+ ASSERT(red_channel_client_no_item_being_sent(rcc));
+ ASSERT(rcc->send_data.header.data != NULL);
+ rcc->send_data.main.header_data = rcc->send_data.header.data;
+ rcc->send_data.main.item = rcc->send_data.item;
+
+ rcc->send_data.marshaller = rcc->send_data.urgent.marshaller;
+ rcc->send_data.item = NULL;
+ red_channel_client_reset_send_data(rcc);
+ return rcc->send_data.marshaller;
+}
+
+static void red_channel_client_restore_main_sender(RedChannelClient *rcc)
+{
+ spice_marshaller_reset(rcc->send_data.urgent.marshaller);
+ rcc->send_data.marshaller = rcc->send_data.main.marshaller;
+ rcc->send_data.header.data = rcc->send_data.main.header_data;
+ if (!rcc->is_mini_header) {
+ rcc->send_data.header.set_msg_serial(&rcc->send_data.header, rcc->send_data.serial);
+ }
+ rcc->send_data.item = rcc->send_data.main.item;
+}
+
uint64_t red_channel_client_get_message_serial(RedChannelClient *rcc)
{
return rcc->send_data.serial;
@@ -1000,7 +1166,6 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_
rcc->ack_data.client_window = client_window;
}
-
static void red_channel_remove_client(RedChannelClient *rcc)
{
ASSERT(pthread_equal(pthread_self(), rcc->channel->thread_id));
@@ -1058,6 +1223,19 @@ RedChannelClient *red_channel_client_create_dummy(int size,
rcc->client = client;
rcc->channel = channel;
red_channel_client_set_remote_caps(rcc, num_common_caps, common_caps, num_caps, caps);
+ if (red_channel_client_test_remote_common_cap(rcc, SPICE_COMMON_CAP_MINI_HEADER)) {
+ rcc->incoming.header = mini_header_wrapper;
+ rcc->send_data.header = mini_header_wrapper;
+ rcc->is_mini_header = TRUE;
+ } else {
+ rcc->incoming.header = full_header_wrapper;
+ rcc->send_data.header = full_header_wrapper;
+ rcc->is_mini_header = FALSE;
+ }
+
+ rcc->incoming.header.data = rcc->incoming.header_buf;
+ rcc->incoming.serial = 1;
+
red_channel_add_client(channel, rcc);
return rcc;
}
@@ -1131,7 +1309,7 @@ int red_channel_client_blocked(RedChannelClient *rcc)
int red_channel_client_send_message_pending(RedChannelClient *rcc)
{
- return rcc->send_data.header->type != 0;
+ return rcc->send_data.header.get_msg_type(&rcc->send_data.header) != 0;
}
/* accessors for RedChannelClient */
@@ -1150,10 +1328,11 @@ RedClient *red_channel_client_get_client(RedChannelClient *rcc)
return rcc->client;
}
-SpiceDataHeader *red_channel_client_get_header(RedChannelClient *rcc)
+void red_channel_client_set_header_sub_list(RedChannelClient *rcc, uint32_t sub_list)
{
- return rcc->send_data.header;
+ rcc->send_data.header.set_msg_sub_list(&rcc->send_data.header, sub_list);
}
+
/* end of accessors */
int red_channel_get_first_socket(RedChannel *channel)