diff options
Diffstat (limited to 'server/red_channel.c')
-rw-r--r-- | server/red_channel.c | 278 |
1 files changed, 138 insertions, 140 deletions
diff --git a/server/red_channel.c b/server/red_channel.c index 80aa667c..ae333aa7 100644 --- a/server/red_channel.c +++ b/server/red_channel.c @@ -32,10 +32,12 @@ #include "ring.h" #include "stat.h" #include "red_channel.h" +#include "reds.h" #include "generated_marshallers.h" static void red_channel_client_event(int fd, int event, void *data); static void red_client_add_channel(RedClient *client, RedChannelClient *rcc); +static void red_client_remove_channel(RedChannelClient *rcc); /* return the number of bytes read. -1 in case of error */ static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) @@ -138,10 +140,6 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle ret_handle = handler->cb->handle_message(handler->opaque, &handler->header, handler->msg); } - if (handler->shut) { - handler->cb->on_error(handler->opaque); - return; - } handler->msg_pos = 0; handler->msg = NULL; handler->header_pos = 0; @@ -221,22 +219,9 @@ static void red_channel_client_on_output(void *opaque, int n) stat_inc_counter(rcc->channel->out_bytes_counter, n); } -void red_channel_client_default_peer_on_error(RedChannelClient *rcc) -{ - rcc->channel->channel_cbs.disconnect(rcc); -} - -static void red_channel_peer_on_incoming_error(RedChannelClient *rcc) -{ - if (rcc->stream->shutdown) { - return; // assume error has already been handled which caused the shutdown. - } - rcc->channel->on_incoming_error(rcc); -} - -static void red_channel_peer_on_outgoing_error(RedChannelClient *rcc) +static void red_channel_client_default_peer_on_error(RedChannelClient *rcc) { - rcc->channel->on_outgoing_error(rcc); + red_channel_client_disconnect(rcc); } static int red_channel_client_peer_get_out_msg_size(void *opaque) @@ -417,6 +402,21 @@ error: return NULL; } + +RedChannelClient *red_channel_client_create_dummy(int size, + RedChannel *channel, + RedClient *client) +{ + RedChannelClient *rcc; + + ASSERT(size >= sizeof(RedChannelClient)); + rcc = spice_malloc0(size); + rcc->client = client; + rcc->channel = channel; + red_channel_add_client(channel, rcc); + return rcc; +} + static void red_channel_client_default_connect(RedChannel *channel, RedClient *client, RedsStream *stream, int migration, @@ -437,6 +437,7 @@ static void red_channel_client_default_migrate(RedChannelClient *base) RedChannel *red_channel_create(int size, SpiceCoreInterface *core, + uint32_t type, uint32_t id, int migrate, int handle_acks, channel_handle_message_proc handle_message, ChannelCbs *channel_cbs) @@ -445,11 +446,13 @@ RedChannel *red_channel_create(int size, ClientCbs client_cbs; ASSERT(size >= sizeof(*channel)); - ASSERT(channel_cbs->config_socket && channel_cbs->disconnect && handle_message && + ASSERT(channel_cbs->config_socket && channel_cbs->on_disconnect && handle_message && channel_cbs->alloc_recv_buf && channel_cbs->release_item); channel = spice_malloc0(size); + channel->type = type; + channel->id = id; channel->handle_acks = handle_acks; - channel->channel_cbs.disconnect = channel_cbs->disconnect; + channel->channel_cbs.on_disconnect = channel_cbs->on_disconnect; channel->channel_cbs.send_item = channel_cbs->send_item; channel->channel_cbs.release_item = channel_cbs->release_item; channel->channel_cbs.hold_item = channel_cbs->hold_item; @@ -484,8 +487,53 @@ RedChannel *red_channel_create(int size, channel->thread_id = pthread_self(); - channel->shut = 0; // came here from inputs, perhaps can be removed? XXX channel->out_bytes_counter = 0; + + return channel; +} + +// TODO: red_worker can use this one +static void dummy_watch_update_mask(SpiceWatch *watch, int event_mask) +{ +} + +static SpiceWatch *dummy_watch_add(int fd, int event_mask, SpiceWatchFunc func, void *opaque) +{ + return NULL; // apparently allowed? +} + +static void dummy_watch_remove(SpiceWatch *watch) +{ +} + +// TODO: actually, since I also use channel_client_dummym, no need for core. Can be NULL +SpiceCoreInterface dummy_core = { + .watch_update_mask = dummy_watch_update_mask, + .watch_add = dummy_watch_add, + .watch_remove = dummy_watch_remove, +}; + +RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id) +{ + RedChannel *channel; + ClientCbs client_cbs; + + ASSERT(size >= sizeof(*channel)); + channel = spice_malloc0(size); + channel->type = type; + channel->id = id; + channel->core = &dummy_core; + ring_init(&channel->clients); + client_cbs.connect = red_channel_client_default_connect; + client_cbs.disconnect = red_channel_client_default_disconnect; + client_cbs.migrate = red_channel_client_default_migrate; + + red_channel_register_client_cbs(channel, &client_cbs); + + channel->thread_id = pthread_self(); + + channel->out_bytes_counter = 0; + return channel; } @@ -496,26 +544,22 @@ static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *hea RedChannel *red_channel_create_parser(int size, SpiceCoreInterface *core, + uint32_t type, uint32_t id, int migrate, int handle_acks, spice_parse_channel_func_t parser, - channel_handle_parsed_proc handle_parsed, - channel_on_incoming_error_proc incoming_error, - channel_on_outgoing_error_proc outgoing_error, + channel_handle_parsed_proc handle_parsed, ChannelCbs *channel_cbs) { - RedChannel *channel = red_channel_create(size, - core, migrate, handle_acks, do_nothing_handle_message, - channel_cbs); + RedChannel *channel = red_channel_create(size, core, type, id, + migrate, handle_acks, + do_nothing_handle_message, + channel_cbs); if (channel == NULL) { return NULL; } channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; channel->incoming_cb.parser = parser; - channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error; - channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error; - channel->on_incoming_error = incoming_error; - channel->on_outgoing_error = outgoing_error; return channel; } @@ -533,10 +577,27 @@ void red_channel_register_client_cbs(RedChannel *channel, ClientCbs *client_cbs) } } +void red_channel_set_caps(RedChannel *channel, int num_caps, uint32_t *caps) +{ + channel->num_caps = num_caps; + channel->caps = caps; +} + +void red_channel_set_data(RedChannel *channel, void *data) +{ + ASSERT(channel); + channel->data = data; +} + void red_channel_client_destroy(RedChannelClient *rcc) { - red_channel_client_disconnect(rcc); - spice_marshaller_destroy(rcc->send_data.marshaller); + if (red_channel_client_is_connected(rcc)) { + red_channel_client_disconnect(rcc); + } + red_client_remove_channel(rcc); + if (rcc->send_data.marshaller) { + spice_marshaller_destroy(rcc->send_data.marshaller); + } free(rcc); } @@ -548,11 +609,13 @@ void red_channel_destroy(RedChannel *channel) if (!channel) { return; } - red_channel_pipes_clear(channel); RING_FOREACH_SAFE(link, next, &channel->clients) { red_channel_client_destroy( SPICE_CONTAINEROF(link, RedChannelClient, channel_link)); } + if (channel->caps) { + free(channel->caps); + } free(channel); } @@ -563,21 +626,7 @@ void red_channel_client_shutdown(RedChannelClient *rcc) rcc->stream->watch = NULL; shutdown(rcc->stream->socket, SHUT_RDWR); rcc->stream->shutdown = TRUE; - rcc->incoming.shut = TRUE; } - red_channel_client_release_sent_item(rcc); -} - -void red_channel_shutdown(RedChannel *channel) -{ - RingItem *link; - RingItem *next; - - red_printf("%d", channel->clients_num); - RING_FOREACH_SAFE(link, next, &channel->clients) { - red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, channel_link)); - } - red_channel_pipes_clear(channel); } void red_channel_client_send(RedChannelClient *rcc) @@ -650,11 +699,7 @@ void red_channel_push(RedChannel *channel) } RING_FOREACH_SAFE(link, next, &channel->clients) { rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link); - if (rcc->stream == NULL) { - rcc->channel->channel_cbs.disconnect(rcc); - } else { - red_channel_client_push(rcc); - } + red_channel_client_push(rcc); } } @@ -861,25 +906,9 @@ int red_channel_client_is_connected(RedChannelClient *rcc) return rcc->stream != NULL; } -void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item) -{ - ring_remove(&item->link); -} - int red_channel_is_connected(RedChannel *channel) { - RingItem *link; - - if (!channel || channel->clients_num == 0) { - return FALSE; - } - RING_FOREACH(link, &channel->clients) { - if (red_channel_client_is_connected( - SPICE_CONTAINEROF(link, RedChannelClient, channel_link))) { - return TRUE; - } - } - return FALSE; + return channel && (channel->clients_num > 0); } void red_channel_client_clear_sent_item(RedChannelClient *rcc) @@ -906,21 +935,6 @@ void red_channel_client_pipe_clear(RedChannelClient *rcc) rcc->pipe_size = 0; } -void red_channel_pipes_clear(RedChannel *channel) -{ - RingItem *link; - RingItem *next; - RedChannelClient *rcc; - - if (!channel) { - return; - } - RING_FOREACH_SAFE(link, next, &channel->clients) { - rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link); - red_channel_client_pipe_clear(rcc); - } -} - void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc) { rcc->ack_data.messages_window = 0; @@ -931,27 +945,37 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_ rcc->ack_data.client_window = client_window; } -static void red_channel_client_remove(RedChannelClient *rcc) + +static void red_channel_remove_client(RedChannelClient *rcc) { - ring_remove(&rcc->client_link); - rcc->client->channels_num--; + ASSERT(pthread_equal(pthread_self(), rcc->channel->thread_id)); ring_remove(&rcc->channel_link); rcc->channel->clients_num--; + // TODO: should we set rcc->channel to NULL??? } -void red_channel_client_disconnect(RedChannelClient *rcc) +static void red_client_remove_channel(RedChannelClient *rcc) { - red_printf("%p (channel %p)", rcc, rcc->channel); + pthread_mutex_lock(&rcc->client->lock); + ring_remove(&rcc->client_link); + rcc->client->channels_num--; + pthread_mutex_unlock(&rcc->client->lock); +} - if (rcc->send_data.item) { - rcc->channel->channel_cbs.release_item(rcc, rcc->send_data.item, FALSE); +void red_channel_client_disconnect(RedChannelClient *rcc) +{ + red_printf("%p (channel %p type %d id %d)", rcc, rcc->channel, + rcc->channel->type, rcc->channel->id); + if (!red_channel_client_is_connected(rcc)) { + return; } red_channel_client_pipe_clear(rcc); reds_stream_free(rcc->stream); - rcc->send_data.item = NULL; - rcc->send_data.blocked = FALSE; - rcc->send_data.size = 0; - red_channel_client_remove(rcc); + rcc->stream = NULL; + red_channel_remove_client(rcc); + // TODO: not do it till destroyed? +// red_channel_client_remove(rcc); + rcc->channel->channel_cbs.on_disconnect(rcc); } void red_channel_disconnect(RedChannel *channel) @@ -959,27 +983,12 @@ void red_channel_disconnect(RedChannel *channel) RingItem *link; RingItem *next; - red_channel_pipes_clear(channel); RING_FOREACH_SAFE(link, next, &channel->clients) { red_channel_client_disconnect( SPICE_CONTAINEROF(link, RedChannelClient, channel_link)); } } -int red_channel_all_clients_serials_are_zero(RedChannel *channel) -{ - RingItem *link; - RedChannelClient *rcc; - - RING_FOREACH(link, &channel->clients) { - rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link); - if (rcc->send_data.serial != 0) { - return FALSE; - } - } - return TRUE; -} - void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb) { RingItem *link; @@ -1004,17 +1013,6 @@ void red_channel_apply_clients_data(RedChannel *channel, channel_client_callback } } -void red_channel_set_shut(RedChannel *channel) -{ - RingItem *link; - RedChannelClient *rcc; - - RING_FOREACH(link, &channel->clients) { - rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link); - rcc->incoming.shut = TRUE; - } -} - int red_channel_all_blocked(RedChannel *channel) { RingItem *link; @@ -1143,18 +1141,24 @@ RedClient *red_client_new() client = spice_malloc0(sizeof(RedClient)); ring_init(&client->channels); + pthread_mutex_init(&client->lock, NULL); client->thread_id = pthread_self(); return client; } -void red_client_shutdown(RedClient *client) +void red_client_migrate(RedClient *client) { RingItem *link, *next; + RedChannelClient *rcc; - red_printf("#channels %d", client->channels_num); + red_printf("migrate client with #channels %d", client->channels_num); + ASSERT(pthread_equal(pthread_self(), client->thread_id)); RING_FOREACH_SAFE(link, next, &client->channels) { - red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, client_link)); + rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link); + if (red_channel_client_is_connected(rcc)) { + rcc->channel->client_cbs.migrate(rcc); + } } } @@ -1175,29 +1179,23 @@ void red_client_destroy(RedClient *client) // TODO: should we go back to async. For this we need to use // ref count for channel clients. rcc->channel->client_cbs.disconnect(rcc); + ASSERT(ring_is_empty(&rcc->pipe)); + ASSERT(rcc->pipe_size == 0); + ASSERT(rcc->send_data.size == 0); + red_channel_client_destroy(rcc); } - free(client); -} -void red_client_disconnect(RedClient *client) -{ - RingItem *link, *next; - RedChannelClient *rcc; - - red_printf("#channels %d", client->channels_num); - RING_FOREACH_SAFE(link, next, &client->channels) { - // some channels may be in other threads, so disconnection - // is not synchronous. - rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link); - rcc->channel->client_cbs.disconnect(rcc); - } + pthread_mutex_destroy(&client->lock); + free(client); } static void red_client_add_channel(RedClient *client, RedChannelClient *rcc) { ASSERT(rcc && client); + pthread_mutex_lock(&client->lock); ring_add(&client->channels, &rcc->client_link); client->channels_num++; + pthread_mutex_unlock(&client->lock); } MainChannelClient *red_client_get_main(RedClient *client) { |