diff options
-rw-r--r-- | server/inputs_channel.c | 115 | ||||
-rw-r--r-- | server/main_channel.c | 107 | ||||
-rw-r--r-- | server/main_channel.h | 4 | ||||
-rw-r--r-- | server/red_channel.c | 278 | ||||
-rw-r--r-- | server/red_channel.h | 70 | ||||
-rw-r--r-- | server/red_dispatcher.c | 163 | ||||
-rw-r--r-- | server/red_dispatcher.h | 5 | ||||
-rw-r--r-- | server/red_tunnel_worker.c | 211 | ||||
-rw-r--r-- | server/red_worker.c | 179 | ||||
-rw-r--r-- | server/red_worker.h | 5 | ||||
-rw-r--r-- | server/reds.c | 156 | ||||
-rw-r--r-- | server/reds.h | 27 | ||||
-rw-r--r-- | server/smartcard.c | 89 | ||||
-rw-r--r-- | server/snd_worker.c | 157 | ||||
-rw-r--r-- | server/usbredir.c | 102 |
15 files changed, 856 insertions, 812 deletions
diff --git a/server/inputs_channel.c b/server/inputs_channel.c index e6af0d58..653910e8 100644 --- a/server/inputs_channel.c +++ b/server/inputs_channel.c @@ -434,37 +434,18 @@ static void inputs_relase_keys(void) kbd_push_scan(keyboard, 0x38 | 0x80); //LALT } -static void inputs_channel_disconnect(RedChannelClient *rcc) +static void inputs_channel_on_disconnect(RedChannelClient *rcc) { - inputs_relase_keys(); - red_channel_client_disconnect(rcc); -} - -static void inputs_channel_on_error(RedChannelClient *rcc) -{ - red_printf(""); - inputs_channel_disconnect(rcc); -} - -static void inputs_shutdown(Channel *channel) -{ - InputsChannel *inputs_channel = (InputsChannel *)channel->data; - ASSERT(g_inputs_channel == inputs_channel); - - if (inputs_channel) { - red_channel_shutdown(&inputs_channel->base); - red_channel_destroy(&inputs_channel->base); - channel->data = NULL; - g_inputs_channel = NULL; + if (!rcc) { + return; } + inputs_relase_keys(); } -static void inputs_migrate(Channel *channel) +static void inputs_migrate(RedChannelClient *rcc) { - InputsChannel *inputs_channel = channel->data; - - ASSERT(g_inputs_channel == (InputsChannel *)channel->data); - red_channel_pipes_add_type(&inputs_channel->base, PIPE_ITEM_MIGRATE); + ASSERT(g_inputs_channel == (InputsChannel *)rcc->channel); + red_channel_client_pipe_add_type(rcc, PIPE_ITEM_MIGRATE); } static void inputs_pipe_add_init(RedChannelClient *rcc) @@ -501,40 +482,21 @@ static void inputs_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item) { } -static void inputs_link(Channel *channel, RedClient *client, - RedsStream *stream, int migration, - int num_common_caps, uint32_t *common_caps, - int num_caps, uint32_t *caps) +static void inputs_connect(RedChannel *channel, RedClient *client, + RedsStream *stream, int migration, + int num_common_caps, uint32_t *common_caps, + int num_caps, uint32_t *caps) { InputsChannelClient *icc; - ASSERT(channel->data == g_inputs_channel); - if (channel->data == NULL) { - ChannelCbs channel_cbs; - - memset(&channel_cbs, sizeof(channel_cbs), 0); - - channel_cbs.config_socket = inputs_channel_config_socket; - channel_cbs.disconnect = inputs_channel_disconnect; - channel_cbs.send_item = inputs_channel_send_item; - channel_cbs.hold_item = inputs_channel_hold_pipe_item; - channel_cbs.release_item = inputs_channel_release_pipe_item; - channel_cbs.alloc_recv_buf = inputs_channel_alloc_msg_rcv_buf; - channel_cbs.release_recv_buf = inputs_channel_release_msg_rcv_buf; - - red_printf("inputs channel create"); - channel->data = g_inputs_channel = (InputsChannel*)red_channel_create_parser( - sizeof(InputsChannel), core, migration, FALSE /* handle_acks */ - ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL) - ,inputs_channel_handle_parsed - ,inputs_channel_on_error - ,inputs_channel_on_error - ,&channel_cbs); - ASSERT(channel->data); - } + ASSERT(g_inputs_channel); + ASSERT(channel == &g_inputs_channel->base); + red_printf("inputs channel client create"); icc = (InputsChannelClient*)red_channel_client_create(sizeof(InputsChannelClient), - channel->data, client, stream); + channel, + client, + stream); icc->motion_count = 0; inputs_pipe_add_init(&icc->base); } @@ -560,14 +522,41 @@ static void key_modifiers_sender(void *opaque) void inputs_init(void) { - Channel *channel; - - channel = spice_new0(Channel, 1); - channel->type = SPICE_CHANNEL_INPUTS; - channel->link = inputs_link; - channel->shutdown = inputs_shutdown; - channel->migrate = inputs_migrate; - reds_register_channel(channel); + ChannelCbs channel_cbs; + ClientCbs client_cbs = {0,}; + + ASSERT(!g_inputs_channel); + + memset(&channel_cbs, sizeof(channel_cbs), 0); + memset(&client_cbs, sizeof(client_cbs), 0); + + channel_cbs.config_socket = inputs_channel_config_socket; + channel_cbs.on_disconnect = inputs_channel_on_disconnect; + channel_cbs.send_item = inputs_channel_send_item; + channel_cbs.hold_item = inputs_channel_hold_pipe_item; + channel_cbs.release_item = inputs_channel_release_pipe_item; + channel_cbs.alloc_recv_buf = inputs_channel_alloc_msg_rcv_buf; + channel_cbs.release_recv_buf = inputs_channel_release_msg_rcv_buf; + + g_inputs_channel = (InputsChannel *)red_channel_create_parser( + sizeof(InputsChannel), + core, + SPICE_CHANNEL_INPUTS, 0, + FALSE, // TODO: set migration? + FALSE, /* handle_acks */ + spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL), + inputs_channel_handle_parsed, + &channel_cbs); + + if (!g_inputs_channel) { + red_error("failed to allocate Inputs Channel"); + } + + client_cbs.connect = inputs_connect; + client_cbs.migrate = inputs_migrate; + red_channel_register_client_cbs(&g_inputs_channel->base, &client_cbs); + + reds_register_channel(&g_inputs_channel->base); if (!(key_modifiers_timer = core->timer_add(key_modifiers_sender, NULL))) { red_error("key modifiers timer create failed"); diff --git a/server/main_channel.c b/server/main_channel.c index 54b3dff4..6d77f711 100644 --- a/server/main_channel.c +++ b/server/main_channel.c @@ -155,14 +155,13 @@ enum NetTestStage { NET_TEST_STAGE_RATE, }; -static void main_channel_client_disconnect(RedChannelClient *rcc) +// when disconnection occurs, let reds shutdown all channels. This will trigger the +// real disconnection of main channel +static void main_channel_client_on_disconnect(RedChannelClient *rcc) { - red_channel_client_disconnect(rcc); -} - -static void main_disconnect(MainChannel *main_chan) -{ - red_channel_destroy(&main_chan->base); + red_printf("rcc=%p", rcc); + reds_client_disconnect(rcc->client); +// red_channel_client_disconnect(rcc); } RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t connection_id) @@ -840,12 +839,6 @@ static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint return TRUE; } -static void main_channel_on_error(RedChannelClient *rcc) -{ - red_printf(""); - reds_client_disconnect(rcc->client); -} - static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header) { MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base); @@ -912,11 +905,12 @@ uint32_t main_channel_client_get_link_id(MainChannelClient *mcc) return mcc->connection_id; } -MainChannelClient *main_channel_client_create(MainChannel *main_chan, - RedClient *client, RedsStream *stream, uint32_t connection_id) +MainChannelClient *main_channel_client_create(MainChannel *main_chan, RedClient *client, + RedsStream *stream, uint32_t connection_id) { - MainChannelClient *mcc = (MainChannelClient*)red_channel_client_create( - sizeof(MainChannelClient), &main_chan->base, client, stream); + MainChannelClient *mcc = (MainChannelClient*)red_channel_client_create(sizeof(MainChannelClient), + &main_chan->base, + client, stream); mcc->connection_id = connection_id; mcc->bitrate_per_sec = ~0; @@ -929,42 +923,20 @@ MainChannelClient *main_channel_client_create(MainChannel *main_chan, return mcc; } -MainChannelClient *main_channel_link(Channel *channel, RedClient *client, - RedsStream *stream, uint32_t connection_id, int migration, - int num_common_caps, uint32_t *common_caps, int num_caps, - uint32_t *caps) +MainChannelClient *main_channel_link(MainChannel *channel, RedClient *client, + RedsStream *stream, uint32_t connection_id, int migration, + int num_common_caps, uint32_t *common_caps, int num_caps, + uint32_t *caps) { MainChannelClient *mcc; - if (channel->data == NULL) { - ChannelCbs channel_cbs; - - channel_cbs.config_socket = main_channel_config_socket; - channel_cbs.disconnect = main_channel_client_disconnect; - channel_cbs.send_item = main_channel_send_item; - channel_cbs.hold_item = main_channel_hold_pipe_item; - channel_cbs.release_item = main_channel_release_pipe_item; - channel_cbs.alloc_recv_buf = main_channel_alloc_msg_rcv_buf; - channel_cbs.release_recv_buf = main_channel_release_msg_rcv_buf; - channel_cbs.handle_migrate_flush_mark = main_channel_handle_migrate_flush_mark; - channel_cbs.handle_migrate_data = main_channel_handle_migrate_data; - channel_cbs.handle_migrate_data_get_serial = main_channel_handle_migrate_data_get_serial; - - red_printf("create main channel"); - channel->data = red_channel_create_parser( - sizeof(MainChannel), core, migration, FALSE /* handle_acks */ - ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL) - ,main_channel_handle_parsed - ,main_channel_on_error - ,main_channel_on_error - ,&channel_cbs); - ASSERT(channel->data); - } + ASSERT(channel); + // TODO - migration - I removed it from channel creation, now put it // into usage somewhere (not an issue until we return migration to it's // former glory) red_printf("add main channel client"); - mcc = main_channel_client_create(channel->data, client, stream, connection_id); + mcc = main_channel_client_create(channel, client, stream, connection_id); return mcc; } @@ -978,6 +950,7 @@ int main_channel_getpeername(MainChannel *main_chan, struct sockaddr *sa, sockle return main_chan ? getpeername(red_channel_get_first_socket(&main_chan->base), sa, salen) : -1; } +// TODO: ? shouldn't it disonnect all clients? or shutdown all main_channels? void main_channel_close(MainChannel *main_chan) { int socketfd; @@ -998,27 +971,31 @@ uint64_t main_channel_client_get_bitrate_per_sec(MainChannelClient *mcc) return mcc->bitrate_per_sec; } -static void main_channel_shutdown(Channel *channel) +MainChannel* main_channel_init(void) { - MainChannel *main_chan = channel->data; + RedChannel *channel; + ChannelCbs channel_cbs; - if (main_chan != NULL) { - main_disconnect(main_chan); - } -} -static void main_channel_migrate() -{ -} + channel_cbs.config_socket = main_channel_config_socket; + channel_cbs.on_disconnect = main_channel_client_on_disconnect; + channel_cbs.send_item = main_channel_send_item; + channel_cbs.hold_item = main_channel_hold_pipe_item; + channel_cbs.release_item = main_channel_release_pipe_item; + channel_cbs.alloc_recv_buf = main_channel_alloc_msg_rcv_buf; + channel_cbs.release_recv_buf = main_channel_release_msg_rcv_buf; + channel_cbs.handle_migrate_flush_mark = main_channel_handle_migrate_flush_mark; + channel_cbs.handle_migrate_data = main_channel_handle_migrate_data; + channel_cbs.handle_migrate_data_get_serial = main_channel_handle_migrate_data_get_serial; -Channel* main_channel_init(void) -{ - Channel *channel; - - channel = spice_new0(Channel, 1); - channel->type = SPICE_CHANNEL_MAIN; - channel->link = NULL; /* the main channel client is created by reds.c explicitly */ - channel->shutdown = main_channel_shutdown; - channel->migrate = main_channel_migrate; - return channel; + // TODO: set the migration flag of the channel + channel = red_channel_create_parser(sizeof(MainChannel), core, + SPICE_CHANNEL_MAIN, 0, + FALSE, FALSE, /* handle_acks */ + spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL), + main_channel_handle_parsed, + &channel_cbs); + ASSERT(channel); + + return (MainChannel *)channel; } diff --git a/server/main_channel.h b/server/main_channel.h index 0dd37973..80475354 100644 --- a/server/main_channel.h +++ b/server/main_channel.h @@ -47,10 +47,10 @@ struct MainMigrateData { typedef struct MainChannel MainChannel; -Channel *main_channel_init(void); +MainChannel *main_channel_init(void); RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t link_id); /* This is a 'clone' from the reds.h Channel.link callback to allow passing link_id */ -MainChannelClient *main_channel_link(struct Channel *, RedClient *client, +MainChannelClient *main_channel_link(MainChannel *, RedClient *client, RedsStream *stream, uint32_t link_id, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps); void main_channel_close(MainChannel *main_chan); // not destroy, just socket close diff --git a/server/red_channel.c b/server/red_channel.c index 80aa667c..ae333aa7 100644 --- a/server/red_channel.c +++ b/server/red_channel.c @@ -32,10 +32,12 @@ #include "ring.h" #include "stat.h" #include "red_channel.h" +#include "reds.h" #include "generated_marshallers.h" static void red_channel_client_event(int fd, int event, void *data); static void red_client_add_channel(RedClient *client, RedChannelClient *rcc); +static void red_client_remove_channel(RedChannelClient *rcc); /* return the number of bytes read. -1 in case of error */ static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size) @@ -138,10 +140,6 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle ret_handle = handler->cb->handle_message(handler->opaque, &handler->header, handler->msg); } - if (handler->shut) { - handler->cb->on_error(handler->opaque); - return; - } handler->msg_pos = 0; handler->msg = NULL; handler->header_pos = 0; @@ -221,22 +219,9 @@ static void red_channel_client_on_output(void *opaque, int n) stat_inc_counter(rcc->channel->out_bytes_counter, n); } -void red_channel_client_default_peer_on_error(RedChannelClient *rcc) -{ - rcc->channel->channel_cbs.disconnect(rcc); -} - -static void red_channel_peer_on_incoming_error(RedChannelClient *rcc) -{ - if (rcc->stream->shutdown) { - return; // assume error has already been handled which caused the shutdown. - } - rcc->channel->on_incoming_error(rcc); -} - -static void red_channel_peer_on_outgoing_error(RedChannelClient *rcc) +static void red_channel_client_default_peer_on_error(RedChannelClient *rcc) { - rcc->channel->on_outgoing_error(rcc); + red_channel_client_disconnect(rcc); } static int red_channel_client_peer_get_out_msg_size(void *opaque) @@ -417,6 +402,21 @@ error: return NULL; } + +RedChannelClient *red_channel_client_create_dummy(int size, + RedChannel *channel, + RedClient *client) +{ + RedChannelClient *rcc; + + ASSERT(size >= sizeof(RedChannelClient)); + rcc = spice_malloc0(size); + rcc->client = client; + rcc->channel = channel; + red_channel_add_client(channel, rcc); + return rcc; +} + static void red_channel_client_default_connect(RedChannel *channel, RedClient *client, RedsStream *stream, int migration, @@ -437,6 +437,7 @@ static void red_channel_client_default_migrate(RedChannelClient *base) RedChannel *red_channel_create(int size, SpiceCoreInterface *core, + uint32_t type, uint32_t id, int migrate, int handle_acks, channel_handle_message_proc handle_message, ChannelCbs *channel_cbs) @@ -445,11 +446,13 @@ RedChannel *red_channel_create(int size, ClientCbs client_cbs; ASSERT(size >= sizeof(*channel)); - ASSERT(channel_cbs->config_socket && channel_cbs->disconnect && handle_message && + ASSERT(channel_cbs->config_socket && channel_cbs->on_disconnect && handle_message && channel_cbs->alloc_recv_buf && channel_cbs->release_item); channel = spice_malloc0(size); + channel->type = type; + channel->id = id; channel->handle_acks = handle_acks; - channel->channel_cbs.disconnect = channel_cbs->disconnect; + channel->channel_cbs.on_disconnect = channel_cbs->on_disconnect; channel->channel_cbs.send_item = channel_cbs->send_item; channel->channel_cbs.release_item = channel_cbs->release_item; channel->channel_cbs.hold_item = channel_cbs->hold_item; @@ -484,8 +487,53 @@ RedChannel *red_channel_create(int size, channel->thread_id = pthread_self(); - channel->shut = 0; // came here from inputs, perhaps can be removed? XXX channel->out_bytes_counter = 0; + + return channel; +} + +// TODO: red_worker can use this one +static void dummy_watch_update_mask(SpiceWatch *watch, int event_mask) +{ +} + +static SpiceWatch *dummy_watch_add(int fd, int event_mask, SpiceWatchFunc func, void *opaque) +{ + return NULL; // apparently allowed? +} + +static void dummy_watch_remove(SpiceWatch *watch) +{ +} + +// TODO: actually, since I also use channel_client_dummym, no need for core. Can be NULL +SpiceCoreInterface dummy_core = { + .watch_update_mask = dummy_watch_update_mask, + .watch_add = dummy_watch_add, + .watch_remove = dummy_watch_remove, +}; + +RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id) +{ + RedChannel *channel; + ClientCbs client_cbs; + + ASSERT(size >= sizeof(*channel)); + channel = spice_malloc0(size); + channel->type = type; + channel->id = id; + channel->core = &dummy_core; + ring_init(&channel->clients); + client_cbs.connect = red_channel_client_default_connect; + client_cbs.disconnect = red_channel_client_default_disconnect; + client_cbs.migrate = red_channel_client_default_migrate; + + red_channel_register_client_cbs(channel, &client_cbs); + + channel->thread_id = pthread_self(); + + channel->out_bytes_counter = 0; + return channel; } @@ -496,26 +544,22 @@ static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *hea RedChannel *red_channel_create_parser(int size, SpiceCoreInterface *core, + uint32_t type, uint32_t id, int migrate, int handle_acks, spice_parse_channel_func_t parser, - channel_handle_parsed_proc handle_parsed, - channel_on_incoming_error_proc incoming_error, - channel_on_outgoing_error_proc outgoing_error, + channel_handle_parsed_proc handle_parsed, ChannelCbs *channel_cbs) { - RedChannel *channel = red_channel_create(size, - core, migrate, handle_acks, do_nothing_handle_message, - channel_cbs); + RedChannel *channel = red_channel_create(size, core, type, id, + migrate, handle_acks, + do_nothing_handle_message, + channel_cbs); if (channel == NULL) { return NULL; } channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed; channel->incoming_cb.parser = parser; - channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error; - channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error; - channel->on_incoming_error = incoming_error; - channel->on_outgoing_error = outgoing_error; return channel; } @@ -533,10 +577,27 @@ void red_channel_register_client_cbs(RedChannel *channel, ClientCbs *client_cbs) } } +void red_channel_set_caps(RedChannel *channel, int num_caps, uint32_t *caps) +{ + channel->num_caps = num_caps; + channel->caps = caps; +} + +void red_channel_set_data(RedChannel *channel, void *data) +{ + ASSERT(channel); + channel->data = data; +} + void red_channel_client_destroy(RedChannelClient *rcc) { - red_channel_client_disconnect(rcc); - spice_marshaller_destroy(rcc->send_data.marshaller); + if (red_channel_client_is_connected(rcc)) { + red_channel_client_disconnect(rcc); + } + red_client_remove_channel(rcc); + if (rcc->send_data.marshaller) { + spice_marshaller_destroy(rcc->send_data.marshaller); + } free(rcc); } @@ -548,11 +609,13 @@ void red_channel_destroy(RedChannel *channel) if (!channel) { return; } - red_channel_pipes_clear(channel); RING_FOREACH_SAFE(link, next, &channel->clients) { red_channel_client_destroy( SPICE_CONTAINEROF(link, RedChannelClient, channel_link)); } + if (channel->caps) { + free(channel->caps); + } free(channel); } @@ -563,21 +626,7 @@ void red_channel_client_shutdown(RedChannelClient *rcc) rcc->stream->watch = NULL; shutdown(rcc->stream->socket, SHUT_RDWR); rcc->stream->shutdown = TRUE; - rcc->incoming.shut = TRUE; } - red_channel_client_release_sent_item(rcc); -} - -void red_channel_shutdown(RedChannel *channel) -{ - RingItem *link; - RingItem *next; - - red_printf("%d", channel->clients_num); - RING_FOREACH_SAFE(link, next, &channel->clients) { - red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, channel_link)); - } - red_channel_pipes_clear(channel); } void red_channel_client_send(RedChannelClient *rcc) @@ -650,11 +699,7 @@ void red_channel_push(RedChannel *channel) } RING_FOREACH_SAFE(link, next, &channel->clients) { rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link); - if (rcc->stream == NULL) { - rcc->channel->channel_cbs.disconnect(rcc); - } else { - red_channel_client_push(rcc); - } + red_channel_client_push(rcc); } } @@ -861,25 +906,9 @@ int red_channel_client_is_connected(RedChannelClient *rcc) return rcc->stream != NULL; } -void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item) -{ - ring_remove(&item->link); -} - int red_channel_is_connected(RedChannel *channel) { - RingItem *link; - - if (!channel || channel->clients_num == 0) { - return FALSE; - } - RING_FOREACH(link, &channel->clients) { - if (red_channel_client_is_connected( - SPICE_CONTAINEROF(link, RedChannelClient, channel_link))) { - return TRUE; - } - } - return FALSE; + return channel && (channel->clients_num > 0); } void red_channel_client_clear_sent_item(RedChannelClient *rcc) @@ -906,21 +935,6 @@ void red_channel_client_pipe_clear(RedChannelClient *rcc) rcc->pipe_size = 0; } -void red_channel_pipes_clear(RedChannel *channel) -{ - RingItem *link; - RingItem *next; - RedChannelClient *rcc; - - if (!channel) { - return; - } - RING_FOREACH_SAFE(link, next, &channel->clients) { - rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link); - red_channel_client_pipe_clear(rcc); - } -} - void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc) { rcc->ack_data.messages_window = 0; @@ -931,27 +945,37 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_ rcc->ack_data.client_window = client_window; } -static void red_channel_client_remove(RedChannelClient *rcc) + +static void red_channel_remove_client(RedChannelClient *rcc) { - ring_remove(&rcc->client_link); - rcc->client->channels_num--; + ASSERT(pthread_equal(pthread_self(), rcc->channel->thread_id)); ring_remove(&rcc->channel_link); rcc->channel->clients_num--; + // TODO: should we set rcc->channel to NULL??? } -void red_channel_client_disconnect(RedChannelClient *rcc) +static void red_client_remove_channel(RedChannelClient *rcc) { - red_printf("%p (channel %p)", rcc, rcc->channel); + pthread_mutex_lock(&rcc->client->lock); + ring_remove(&rcc->client_link); + rcc->client->channels_num--; + pthread_mutex_unlock(&rcc->client->lock); +} - if (rcc->send_data.item) { - rcc->channel->channel_cbs.release_item(rcc, rcc->send_data.item, FALSE); +void red_channel_client_disconnect(RedChannelClient *rcc) +{ + red_printf("%p (channel %p type %d id %d)", rcc, rcc->channel, + rcc->channel->type, rcc->channel->id); + if (!red_channel_client_is_connected(rcc)) { + return; } red_channel_client_pipe_clear(rcc); reds_stream_free(rcc->stream); - rcc->send_data.item = NULL; - rcc->send_data.blocked = FALSE; - rcc->send_data.size = 0; - red_channel_client_remove(rcc); + rcc->stream = NULL; + red_channel_remove_client(rcc); + // TODO: not do it till destroyed? +// red_channel_client_remove(rcc); + rcc->channel->channel_cbs.on_disconnect(rcc); } void red_channel_disconnect(RedChannel *channel) @@ -959,27 +983,12 @@ void red_channel_disconnect(RedChannel *channel) RingItem *link; RingItem *next; - red_channel_pipes_clear(channel); RING_FOREACH_SAFE(link, next, &channel->clients) { red_channel_client_disconnect( SPICE_CONTAINEROF(link, RedChannelClient, channel_link)); } } -int red_channel_all_clients_serials_are_zero(RedChannel *channel) -{ - RingItem *link; - RedChannelClient *rcc; - - RING_FOREACH(link, &channel->clients) { - rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link); - if (rcc->send_data.serial != 0) { - return FALSE; - } - } - return TRUE; -} - void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb) { RingItem *link; @@ -1004,17 +1013,6 @@ void red_channel_apply_clients_data(RedChannel *channel, channel_client_callback } } -void red_channel_set_shut(RedChannel *channel) -{ - RingItem *link; - RedChannelClient *rcc; - - RING_FOREACH(link, &channel->clients) { - rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link); - rcc->incoming.shut = TRUE; - } -} - int red_channel_all_blocked(RedChannel *channel) { RingItem *link; @@ -1143,18 +1141,24 @@ RedClient *red_client_new() client = spice_malloc0(sizeof(RedClient)); ring_init(&client->channels); + pthread_mutex_init(&client->lock, NULL); client->thread_id = pthread_self(); return client; } -void red_client_shutdown(RedClient *client) +void red_client_migrate(RedClient *client) { RingItem *link, *next; + RedChannelClient *rcc; - red_printf("#channels %d", client->channels_num); + red_printf("migrate client with #channels %d", client->channels_num); + ASSERT(pthread_equal(pthread_self(), client->thread_id)); RING_FOREACH_SAFE(link, next, &client->channels) { - red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, client_link)); + rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link); + if (red_channel_client_is_connected(rcc)) { + rcc->channel->client_cbs.migrate(rcc); + } } } @@ -1175,29 +1179,23 @@ void red_client_destroy(RedClient *client) // TODO: should we go back to async. For this we need to use // ref count for channel clients. rcc->channel->client_cbs.disconnect(rcc); + ASSERT(ring_is_empty(&rcc->pipe)); + ASSERT(rcc->pipe_size == 0); + ASSERT(rcc->send_data.size == 0); + red_channel_client_destroy(rcc); } - free(client); -} -void red_client_disconnect(RedClient *client) -{ - RingItem *link, *next; - RedChannelClient *rcc; - - red_printf("#channels %d", client->channels_num); - RING_FOREACH_SAFE(link, next, &client->channels) { - // some channels may be in other threads, so disconnection - // is not synchronous. - rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link); - rcc->channel->client_cbs.disconnect(rcc); - } + pthread_mutex_destroy(&client->lock); + free(client); } static void red_client_add_channel(RedClient *client, RedChannelClient *rcc) { ASSERT(rcc && client); + pthread_mutex_lock(&client->lock); ring_add(&client->channels, &rcc->client_link); client->channels_num++; + pthread_mutex_unlock(&client->lock); } MainChannelClient *red_client_get_main(RedClient *client) { diff --git a/server/red_channel.h b/server/red_channel.h index cb33fcb1..daee8bae 100644 --- a/server/red_channel.h +++ b/server/red_channel.h @@ -24,9 +24,9 @@ #include "red_common.h" #include <pthread.h> -#include "reds.h" #include "spice.h" #include "ring.h" +#include "common/marshaller.h" #include "server/demarshallers.h" #define MAX_SEND_BUFS 1000 @@ -62,7 +62,6 @@ typedef struct IncomingHandler { uint32_t header_pos; uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. uint32_t msg_pos; - int shut; // came here from inputs_channel. Not sure if it is really required or can be removed. XXX } IncomingHandler; typedef int (*get_outgoing_msg_size_proc)(void *opaque); @@ -98,8 +97,11 @@ typedef struct BufDescriptor { uint8_t *data; } BufDescriptor; +typedef struct RedsStream RedsStream; typedef struct RedChannel RedChannel; typedef struct RedChannelClient RedChannelClient; +typedef struct RedClient RedClient; +typedef struct MainChannelClient MainChannelClient; /* Messages handled by red_channel * SET_ACK - sent to client on channel connection @@ -154,7 +156,7 @@ typedef void (*channel_client_migrate_proc)(RedChannelClient *base); */ typedef struct { channel_configure_socket_proc config_socket; - channel_disconnect_proc disconnect; + channel_disconnect_proc on_disconnect; channel_send_pipe_item_proc send_item; channel_hold_pipe_item_proc hold_item; channel_release_pipe_item_proc release_item; @@ -207,10 +209,20 @@ struct RedChannelClient { }; struct RedChannel { + uint32_t type; + uint32_t id; + SpiceCoreInterface *core; int migrate; int handle_acks; + // RedChannel will hold only connected channel clients (logic - when pushing pipe item to all channel clients, there + // is no need to go over disconnect clients) + // . While client will hold the channel clients till it is destroyed + // and then it will destroy them as well. + // However RCC still holds a reference to the Channel. + // Maybe replace these logic with ref count? + // TODO: rename to 'connected_clients'? Ring clients; uint32_t clients_num; @@ -220,11 +232,10 @@ struct RedChannel { ChannelCbs channel_cbs; ClientCbs client_cbs; - /* Stuff below added for Main and Inputs channels switch to RedChannel - * (might be removed later) */ - channel_on_incoming_error_proc on_incoming_error; /* alternative to disconnect */ - channel_on_outgoing_error_proc on_outgoing_error; - int shut; /* signal channel is to be closed */ + int num_caps; + uint32_t *caps; + + void *data; // TODO: when different channel_clients are in different threads from Channel -> need to protect! pthread_t thread_id; @@ -237,6 +248,7 @@ struct RedChannel { * explicitly destroy the channel */ RedChannel *red_channel_create(int size, SpiceCoreInterface *core, + uint32_t type, uint32_t id, int migrate, int handle_acks, channel_handle_message_proc handle_message, ChannelCbs *channel_cbs); @@ -245,21 +257,39 @@ RedChannel *red_channel_create(int size, * will become default eventually */ RedChannel *red_channel_create_parser(int size, SpiceCoreInterface *core, + uint32_t type, uint32_t id, int migrate, int handle_acks, spice_parse_channel_func_t parser, channel_handle_parsed_proc handle_parsed, - channel_on_incoming_error_proc incoming_error, - channel_on_outgoing_error_proc outgoing_error, ChannelCbs *channel_cbs); void red_channel_register_client_cbs(RedChannel *channel, ClientCbs *client_cbs); +// caps are freed when the channel is destroyed +void red_channel_set_caps(RedChannel *channel, int num_caps, uint32_t *caps); +void red_channel_set_data(RedChannel *channel, void *data); RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client, RedsStream *stream); +// TODO: tmp, for channels that don't use RedChannel yet (e.g., snd channel), but +// do use the client callbacks. So the channel clients are not connected (the channel doesn't +// have list of them, but they do have a link to the channel, and the client has a list of them) +RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id); +RedChannelClient *red_channel_client_create_dummy(int size, + RedChannel *channel, + RedClient *client); + + int red_channel_is_connected(RedChannel *channel); int red_channel_client_is_connected(RedChannelClient *rcc); +/* + * the disconnect callback is called from the channel's thread, + * i.e., for display channels - red worker thread, for all the other - from the main thread. + * RedClient is managed from the main thread. red_channel_client_destroy can be called only + * from red_client_destroy. + */ + void red_channel_client_destroy(RedChannelClient *rcc); void red_channel_destroy(RedChannel *channel); @@ -267,7 +297,6 @@ void red_channel_destroy(RedChannel *channel); * thread. It will not touch the rings, just shutdown the socket. * It should be followed by some way to gurantee a disconnection. */ void red_channel_client_shutdown(RedChannelClient *rcc); -void red_channel_shutdown(RedChannel *channel); /* should be called when a new channel is ready to send messages */ void red_channel_init_outgoing_messages_window(RedChannel *channel); @@ -276,9 +305,6 @@ void red_channel_init_outgoing_messages_window(RedChannel *channel); int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size, uint16_t type, void *message); -/* default error handler that disconnects channel */ -void red_channel_client_default_peer_on_error(RedChannelClient *rcc); - /* when preparing send_data: should call init and then use marshaller */ void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, PipeItem *item); @@ -303,7 +329,6 @@ void red_channel_client_pipe_add_push(RedChannelClient *rcc, PipeItem *item); void red_channel_client_pipe_add(RedChannelClient *rcc, PipeItem *item); void red_channel_client_pipe_add_after(RedChannelClient *rcc, PipeItem *item, PipeItem *pos); int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, PipeItem *item); -void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item); void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, PipeItem *item); void red_channel_client_pipe_add_tail(RedChannelClient *rcc, PipeItem *item); /* for types that use this routine -> the pipe item should be freed */ @@ -315,9 +340,6 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_ void red_channel_client_push_set_ack(RedChannelClient *rcc); void red_channel_push_set_ack(RedChannel *channel); -/* TODO: This sets all clients to shut state - probably we want to close per channel */ -void red_channel_shutdown(RedChannel *channel); - int red_channel_get_first_socket(RedChannel *channel); /* return TRUE if all of the connected clients to this channel are blocked */ @@ -354,7 +376,6 @@ void red_channel_client_push(RedChannelClient *rcc); // TODO: again - what is the context exactly? this happens in channel disconnect. but our // current red_channel_shutdown also closes the socket - is there a socket to close? // are we reading from an fd here? arghh -void red_channel_pipes_clear(RedChannel *channel); void red_channel_client_pipe_clear(RedChannelClient *rcc); // Again, used in various places outside of event handler context (or in other event handler // contexts): @@ -403,17 +424,22 @@ struct RedClient { RingItem link; Ring channels; int channels_num; - int disconnecting; MainChannelClient *mcc; + pthread_mutex_t lock; // different channels can be in different threads pthread_t thread_id; + + int disconnecting; }; RedClient *red_client_new(); MainChannelClient *red_client_get_main(RedClient *client); +// main should be set once before all the other channels are created void red_client_set_main(RedClient *client, MainChannelClient *mcc); + + +void red_client_migrate(RedClient *client); +// disconnects all the client's channels (should be called from the client's thread) void red_client_destroy(RedClient *client); -void red_client_disconnect(RedClient *client); -void red_client_remove_channel(RedClient *client, RedChannelClient *rcc); #endif diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c index 8cbdec9d..c00dc580 100644 --- a/server/red_dispatcher.c +++ b/server/red_dispatcher.c @@ -76,10 +76,10 @@ extern spice_wan_compression_t zlib_glz_state; static RedDispatcher *dispatchers = NULL; -static void red_dispatcher_set_peer(Channel *channel, RedClient *client, - RedsStream *stream, int migration, - int num_common_caps, uint32_t *common_caps, int num_caps, - uint32_t *caps) +static void red_dispatcher_set_display_peer(RedChannel *channel, RedClient *client, + RedsStream *stream, int migration, + int num_common_caps, uint32_t *common_caps, int num_caps, + uint32_t *caps) { RedDispatcher *dispatcher; @@ -92,23 +92,41 @@ static void red_dispatcher_set_peer(Channel *channel, RedClient *client, send_data(dispatcher->channel, &migration, sizeof(int)); } -static void red_dispatcher_shutdown_peer(Channel *channel) +static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc) { - RedDispatcher *dispatcher = (RedDispatcher *)channel->data; + RedDispatcher *dispatcher; + + if (!rcc->channel) { + return; + } + + dispatcher = (RedDispatcher *)rcc->channel->data; + red_printf(""); RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_DISCONNECT; write_message(dispatcher->channel, &message); + send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *)); + + // TODO: we turned it to be sync, due to client_destroy . Should we support async? - for this we will need ref count + // for channels + read_message(dispatcher->channel, &message); + ASSERT(message == RED_WORKER_MESSAGE_READY); } -static void red_dispatcher_migrate(Channel *channel) +static void red_dispatcher_display_migrate(RedChannelClient *rcc) { - RedDispatcher *dispatcher = (RedDispatcher *)channel->data; - red_printf("channel type %u id %u", channel->type, channel->id); + RedDispatcher *dispatcher; + if (!rcc->channel) { + return; + } + dispatcher = (RedDispatcher *)rcc->channel->data; + red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id); RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_MIGRATE; write_message(dispatcher->channel, &message); + send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *)); } -static void red_dispatcher_set_cursor_peer(Channel *channel, RedClient *client, RedsStream *stream, +static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *client, RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) @@ -122,18 +140,33 @@ static void red_dispatcher_set_cursor_peer(Channel *channel, RedClient *client, send_data(dispatcher->channel, &migration, sizeof(int)); } -static void red_dispatcher_shutdown_cursor_peer(Channel *channel) +static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc) { - RedDispatcher *dispatcher = (RedDispatcher *)channel->data; + RedDispatcher *dispatcher; + + if (!rcc->channel) { + return; + } + + dispatcher = (RedDispatcher *)rcc->channel->data; red_printf(""); RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_DISCONNECT; write_message(dispatcher->channel, &message); + send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *)); + + read_message(dispatcher->channel, &message); + ASSERT(message == RED_WORKER_MESSAGE_READY); } -static void red_dispatcher_cursor_migrate(Channel *channel) +static void red_dispatcher_cursor_migrate(RedChannelClient *rcc) { - RedDispatcher *dispatcher = (RedDispatcher *)channel->data; - red_printf("channel type %u id %u", channel->type, channel->id); + RedDispatcher *dispatcher; + + if (!rcc->channel) { + return; + } + dispatcher = (RedDispatcher *)rcc->channel->data; + red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id); RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_MIGRATE; write_message(dispatcher->channel, &message); } @@ -591,36 +624,6 @@ static void qxl_worker_loadvm_commands(QXLWorker *qxl_worker, red_dispatcher_loadvm_commands((RedDispatcher*)qxl_worker, ext, count); } -static void red_dispatcher_send_disconnect(RedDispatcher *dispatcher, - struct RedChannelClient *rcc, RedWorkerMessage message) -{ - write_message(dispatcher->channel, &message); - send_data(dispatcher->channel, &rcc, sizeof(struct RedChannelClient *)); -} - -void red_dispatcher_disconnect_display_client(RedDispatcher *dispatcher, - struct RedChannelClient *rcc) -{ - RedWorkerMessage message = RED_WORKER_MESSAGE_STOP; - - red_dispatcher_send_disconnect(dispatcher, rcc, - RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); -} - -void red_dispatcher_disconnect_cursor_client(RedDispatcher *dispatcher, - struct RedChannelClient *rcc) -{ - RedWorkerMessage message = RED_WORKER_MESSAGE_STOP; - - red_dispatcher_send_disconnect(dispatcher, rcc, - RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT); - read_message(dispatcher->channel, &message); - ASSERT(message == RED_WORKER_MESSAGE_READY); -} - - void red_dispatcher_set_mm_time(uint32_t mm_time) { RedDispatcher *now = dispatchers; @@ -867,6 +870,32 @@ void red_dispatcher_async_complete(struct RedDispatcher *dispatcher, uint64_t co dispatcher->qxl->st->qif->async_complete(dispatcher->qxl, cookie); } +static RedChannel *red_dispatcher_display_channel_create(RedDispatcher *dispatcher) +{ + RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE; + RedChannel *display_channel; + + write_message(dispatcher->channel, &message); + + receive_data(dispatcher->channel, &display_channel, sizeof(RedChannel *)); + read_message(dispatcher->channel, &message); + ASSERT(message == RED_WORKER_MESSAGE_READY); + return display_channel; +} + +static RedChannel *red_dispatcher_cursor_channel_create(RedDispatcher *dispatcher) +{ + RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE; + RedChannel *cursor_channel; + + write_message(dispatcher->channel, &message); + + receive_data(dispatcher->channel, &cursor_channel, sizeof(RedChannel *)); + read_message(dispatcher->channel, &message); + ASSERT(message == RED_WORKER_MESSAGE_READY); + return cursor_channel; +} + RedDispatcher *red_dispatcher_init(QXLInstance *qxl) { RedDispatcher *dispatcher; @@ -875,10 +904,11 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl) WorkerInitData init_data; QXLDevInitInfo init_info; int r; - Channel *reds_channel; - Channel *cursor_channel; + RedChannel *display_channel; + RedChannel *cursor_channel; sigset_t thread_sig_mask; sigset_t curr_sig_mask; + ClientCbs client_cbs = {0,}; quic_init(); sw_canvas_init(); @@ -950,23 +980,28 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl) read_message(dispatcher->channel, &message); ASSERT(message == RED_WORKER_MESSAGE_READY); - reds_channel = spice_new0(Channel, 1); - reds_channel->type = SPICE_CHANNEL_DISPLAY; - reds_channel->id = qxl->id; - reds_channel->link = red_dispatcher_set_peer; - reds_channel->shutdown = red_dispatcher_shutdown_peer; - reds_channel->migrate = red_dispatcher_migrate; - reds_channel->data = dispatcher; - reds_register_channel(reds_channel); - - cursor_channel = spice_new0(Channel, 1); - cursor_channel->type = SPICE_CHANNEL_CURSOR; - cursor_channel->id = qxl->id; - cursor_channel->link = red_dispatcher_set_cursor_peer; - cursor_channel->shutdown = red_dispatcher_shutdown_cursor_peer; - cursor_channel->migrate = red_dispatcher_cursor_migrate; - cursor_channel->data = dispatcher; - reds_register_channel(cursor_channel); + display_channel = red_dispatcher_display_channel_create(dispatcher); + + if (display_channel) { + client_cbs.connect = red_dispatcher_set_display_peer; + client_cbs.disconnect = red_dispatcher_disconnect_display_peer; + client_cbs.migrate = red_dispatcher_display_migrate; + red_channel_register_client_cbs(display_channel, &client_cbs); + red_channel_set_data(display_channel, dispatcher); + reds_register_channel(display_channel); + } + + cursor_channel = red_dispatcher_cursor_channel_create(dispatcher); + + if (cursor_channel) { + client_cbs.connect = red_dispatcher_set_cursor_peer; + client_cbs.disconnect = red_dispatcher_disconnect_cursor_peer; + client_cbs.migrate = red_dispatcher_cursor_migrate; + red_channel_register_client_cbs(cursor_channel, &client_cbs); + red_channel_set_data(cursor_channel, dispatcher); + reds_register_channel(cursor_channel); + } + qxl->st->qif->attache_worker(qxl, &dispatcher->base); qxl->st->qif->set_compression_level(qxl, calc_compression_level()); diff --git a/server/red_dispatcher.h b/server/red_dispatcher.h index 07a95aec..144a40e2 100644 --- a/server/red_dispatcher.h +++ b/server/red_dispatcher.h @@ -32,9 +32,4 @@ uint32_t red_dispatcher_qxl_ram_size(void); int red_dispatcher_qxl_count(void); void red_dispatcher_async_complete(struct RedDispatcher*, uint64_t); -void red_dispatcher_disconnect_display_client(struct RedDispatcher *dispatcher, - struct RedChannelClient *rcc); -void red_dispatcher_disconnect_cursor_client(struct RedDispatcher *dispatcher, - struct RedChannelClient *rcc); - #endif diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c index 79cb8546..6c36fecb 100644 --- a/server/red_tunnel_worker.c +++ b/server/red_tunnel_worker.c @@ -537,8 +537,8 @@ typedef struct TunnelPrintService { } TunnelPrintService; struct TunnelWorker { - Channel channel_interface; // for reds - TunnelChannelClient *channel; + RedChannel *channel; + TunnelChannelClient *channel_client; SpiceCoreInterface *core_interface; SpiceNetWireInstance *sin; @@ -564,7 +564,7 @@ struct TunnelWorker { /********************************************************************* * Tunnel interface *********************************************************************/ -static void tunnel_channel_disconnect(RedChannel *channel); +static void tunnel_channel_on_disconnect(RedChannel *channel); /* networking interface for slirp */ static int qemu_can_output(SlirpUsrNetworkInterface *usr_interface); @@ -601,23 +601,23 @@ static UserTimer *create_timer(SlirpUsrNetworkInterface *usr_interface, static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, uint32_t ms); -/* reds interface */ -static void handle_tunnel_channel_link(Channel *channel, RedClient *client, +/* RedChannel interface */ + +static void handle_tunnel_channel_link(RedChannel *channel, RedClient *client, RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps); -static void handle_tunnel_channel_shutdown(struct Channel *channel); -static void handle_tunnel_channel_migrate(struct Channel *channel); - +static void handle_tunnel_channel_client_migrate(RedChannelClient *rcc); +static void red_tunnel_channel_create(TunnelWorker *worker); static void tunnel_shutdown(TunnelWorker *worker) { int i; red_printf(""); /* shutdown input from channel */ - if (worker->channel) { - red_channel_shutdown(worker->channel->base.channel); + if (worker->channel_client) { + red_channel_client_shutdown(&worker->channel_client->base); } /* shutdown socket pipe items */ @@ -745,7 +745,7 @@ static void tunnel_socket_free_rcv_buf(RedSocket *sckt, RedSocketRawRcvBuf *rcv_ --sckt->in_data.num_buffers; __tunnel_worker_free_socket_rcv_buf(sckt->worker, rcv_buf); ++sckt->in_data.num_tokens; - __process_rcv_buf_tokens(sckt->worker->channel, sckt); + __process_rcv_buf_tokens(sckt->worker->channel_client, sckt); } static inline void __tunnel_worker_free_socket_rcv_buf(TunnelWorker *worker, @@ -973,7 +973,7 @@ SPICE_GNUC_VISIBLE void spice_server_net_wire_recv_packet(SpiceNetWireInstance * TunnelWorker *worker = sin->st->worker; ASSERT(worker); - if (worker->channel && worker->channel->base.channel->migrate) { + if (worker->channel_client && worker->channel_client->base.channel->migrate) { return; // during migration and the tunnel state hasn't been restored yet. } @@ -1016,15 +1016,9 @@ void *red_tunnel_attach(SpiceCoreInterface *core_interface, worker->null_interface.worker = worker; - worker->channel_interface.type = SPICE_CHANNEL_TUNNEL; - worker->channel_interface.id = 0; - worker->channel_interface.link = handle_tunnel_channel_link; - worker->channel_interface.shutdown = handle_tunnel_channel_shutdown; - worker->channel_interface.migrate = handle_tunnel_channel_migrate; - worker->channel_interface.data = worker; + red_tunnel_channel_create(worker); - ring_init(&worker->services); - reds_register_channel(&worker->channel_interface); + ring_init(&worker->services); net_slirp_init(worker->sif->get_ip(worker->sin), TRUE, @@ -1096,7 +1090,7 @@ static inline TunnelService *__tunnel_worker_add_service(TunnelWorker *worker, u #endif if (!virt_ip) { new_service->pipe_item.type = PIPE_ITEM_TYPE_SERVICE_IP_MAP; - red_channel_client_pipe_add(&worker->channel->base, &new_service->pipe_item); + red_channel_client_pipe_add(&worker->channel_client->base, &new_service->pipe_item); } return new_service; @@ -1292,24 +1286,24 @@ static RedSocket *tunnel_worker_create_socket(TunnelWorker *worker, uint16_t loc static void tunnel_worker_free_socket(TunnelWorker *worker, RedSocket *sckt) { - if (worker->channel) { - if (red_channel_client_pipe_item_is_linked(&worker->channel->base, + if (worker->channel_client) { + if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base, &sckt->out_data.data_pipe_item)) { - red_channel_client_pipe_remove_and_release(&worker->channel->base, + red_channel_client_pipe_remove_and_release(&worker->channel_client->base, &sckt->out_data.data_pipe_item); return; } - if (red_channel_client_pipe_item_is_linked(&worker->channel->base, + if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base, &sckt->out_data.status_pipe_item)) { - red_channel_client_pipe_remove_and_release(&worker->channel->base, + red_channel_client_pipe_remove_and_release(&worker->channel_client->base, &sckt->out_data.status_pipe_item); return; } - if (red_channel_client_pipe_item_is_linked(&worker->channel->base, + if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base, &sckt->out_data.token_pipe_item)) { - red_channel_client_pipe_remove_and_release(&worker->channel->base, + red_channel_client_pipe_remove_and_release(&worker->channel_client->base, &sckt->out_data.token_pipe_item); return; } @@ -1631,7 +1625,7 @@ static inline int __client_socket_can_receive(RedSocket *sckt) { return (((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) || (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) && - !sckt->worker->channel->mig_inprogress); + !sckt->worker->channel_client->mig_inprogress); } static int tunnel_channel_handle_socket_token(TunnelChannelClient *channel, RedSocket *sckt, @@ -1870,7 +1864,7 @@ static void restored_rcv_buf_release(RawTunneledBuffer *buf) --sckt->in_data.num_buffers; __tunnel_worker_free_socket_rcv_buf(sckt->worker, (RedSocketRawRcvBuf *)buf); // for case that ready queue is empty and the client has no tokens - __process_rcv_buf_tokens(sckt->worker->channel, sckt); + __process_rcv_buf_tokens(sckt->worker->channel_client, sckt); } RawTunneledBuffer *tunnel_socket_alloc_restored_rcv_buf(RedSocket *sckt) @@ -1889,7 +1883,7 @@ static void restore_tokens_buf_release(RawTunneledBuffer *buf) RedSocket *sckt = (RedSocket *)buf->usr_opaque; sckt->in_data.num_tokens += tokens_buf->num_tokens; - __process_rcv_buf_tokens(sckt->worker->channel, sckt); + __process_rcv_buf_tokens(sckt->worker->channel_client, sckt); free(tokens_buf); } @@ -2182,7 +2176,7 @@ static uint64_t tunnel_channel_handle_migrate_data_get_serial(RedChannelClient * static uint64_t tunnel_channel_handle_migrate_data(RedChannelClient *base, uint32_t size, void *msg) { - TunnelChannelClient *channel = SPICE_CONTAINEROF(base->channel, TunnelChannelClient, base); + TunnelChannelClient *channel = SPICE_CONTAINEROF(base, TunnelChannelClient, base); TunnelMigrateSocketList *sockets_list; TunnelMigrateServicesList *services_list; TunnelMigrateData *migrate_data = msg; @@ -2790,22 +2784,22 @@ static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem sckt_out_data->push_tail = NULL; sckt_out_data->push_tail_size = 0; - if (worker->channel) { + if (worker->channel_client) { // can still send data to socket if (__client_socket_can_receive(sckt)) { if (sckt_out_data->ready_chunks_queue.head) { // the pipe item may already be linked, if for example the send was // blocked and before it finished and called release, tunnel_socket_send was called if (!red_channel_client_pipe_item_is_linked( - &worker->channel->base, &sckt_out_data->data_pipe_item)) { + &worker->channel_client->base, &sckt_out_data->data_pipe_item)) { sckt_out_data->data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA; - red_channel_client_pipe_add(&worker->channel->base, &sckt_out_data->data_pipe_item); + red_channel_client_pipe_add(&worker->channel_client->base, &sckt_out_data->data_pipe_item); } } else if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) || (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) { - __tunnel_socket_add_fin_to_pipe(worker->channel, sckt); + __tunnel_socket_add_fin_to_pipe(worker->channel_client, sckt); } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { - __tunnel_socket_add_close_to_pipe(worker->channel, sckt); + __tunnel_socket_add_close_to_pipe(worker->channel_client, sckt); } } } @@ -2813,7 +2807,7 @@ static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem if (((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) && - !sckt->in_slirp_send && !worker->channel->mig_inprogress) { + !sckt->in_slirp_send && !worker->channel_client->mig_inprogress) { // for cases that slirp couldn't write whole it data to our socket buffer net_slirp_socket_can_send_notify(sckt->slirp_sckt); } @@ -2935,8 +2929,8 @@ static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface, red_printf("TUNNEL_DBG"); #endif worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; - ASSERT(worker->channel); - ASSERT(!worker->channel->mig_inprogress); + ASSERT(worker->channel_client); + ASSERT(!worker->channel_client->mig_inprogress); far_service = tunnel_worker_find_service_by_addr(worker, &dst_addr, (uint32_t)ntohs(dst_port)); @@ -2964,7 +2958,7 @@ static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface, #endif *o_usr_s = sckt; sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_OPEN; - red_channel_client_pipe_add(&worker->channel->base, &sckt->out_data.status_pipe_item); + red_channel_client_pipe_add(&worker->channel_client->base, &sckt->out_data.status_pipe_item); errno = EINPROGRESS; return -1; @@ -2989,7 +2983,7 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; - ASSERT(!worker->channel->mig_inprogress); + ASSERT(!worker->channel_client->mig_inprogress); sckt = (RedSocket *)opaque; @@ -3014,7 +3008,7 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke } if (urgent) { - SET_TUNNEL_ERROR(worker->channel, "urgent msgs not supported"); + SET_TUNNEL_ERROR(worker->channel_client, "urgent msgs not supported"); tunnel_shutdown(worker); errno = ECONNRESET; return -1; @@ -3037,7 +3031,7 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke red_printf("socket out buffers overflow, socket will be closed" " (local_port=%d, service_id=%d)", ntohs(sckt->local_port), sckt->far_service->id); - tunnel_socket_force_close(worker->channel, sckt); + tunnel_socket_force_close(worker->channel_client, sckt); size_to_send = 0; } else { size_to_send = len; @@ -3050,10 +3044,10 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke sckt->out_data.data_size += size_to_send; if (sckt->out_data.ready_chunks_queue.head && - !red_channel_client_pipe_item_is_linked(&worker->channel->base, + !red_channel_client_pipe_item_is_linked(&worker->channel_client->base, &sckt->out_data.data_pipe_item)) { sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA; - red_channel_client_pipe_add(&worker->channel->base, &sckt->out_data.data_pipe_item); + red_channel_client_pipe_add(&worker->channel_client->base, &sckt->out_data.data_pipe_item); } } @@ -3093,7 +3087,7 @@ static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocke ASSERT(opaque); worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; - ASSERT(!worker->channel->mig_inprogress); + ASSERT(!worker->channel_client->mig_inprogress); sckt = (RedSocket *)opaque; @@ -3104,14 +3098,14 @@ static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocke if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) || (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) { - SET_TUNNEL_ERROR(worker->channel, "receive was shutdown"); + SET_TUNNEL_ERROR(worker->channel_client, "receive was shutdown"); tunnel_shutdown(worker); errno = ECONNRESET; return -1; } if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { - SET_TUNNEL_ERROR(worker->channel, "slirp socket not connected"); + SET_TUNNEL_ERROR(worker->channel_client, "slirp socket not connected"); tunnel_shutdown(worker); errno = ECONNRESET; return -1; @@ -3177,7 +3171,7 @@ static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, #ifdef DEBUG_NETWORK PRINT_SCKT(sckt); #endif - ASSERT(!worker->channel->mig_inprogress); + ASSERT(!worker->channel_client->mig_inprogress); if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) { return; @@ -3189,7 +3183,7 @@ static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, ASSERT(sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND); sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; } else { - SET_TUNNEL_ERROR(worker->channel, "unexpected tunnel_socket_shutdown_send slirp_status=%d", + SET_TUNNEL_ERROR(worker->channel_client, "unexpected tunnel_socket_shutdown_send slirp_status=%d", sckt->slirp_status); tunnel_shutdown(worker); return; @@ -3200,11 +3194,11 @@ static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, // check if there is still data to send. the fin will be sent after data is released // channel is alive, otherwise the sockets would have been aborted if (!sckt->out_data.ready_chunks_queue.head) { - __tunnel_socket_add_fin_to_pipe(worker->channel, sckt); + __tunnel_socket_add_fin_to_pipe(worker->channel_client, sckt); } } else { // if client is closed, it means the connection was aborted since we didn't // received fin from guest - SET_TUNNEL_ERROR(worker->channel, + SET_TUNNEL_ERROR(worker->channel_client, "unexpected tunnel_socket_shutdown_send client_status=%d", sckt->client_status); tunnel_shutdown(worker); @@ -3229,12 +3223,12 @@ static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, #ifdef DEBUG_NETWORK PRINT_SCKT(sckt); #endif - ASSERT(!worker->channel->mig_inprogress); + ASSERT(!worker->channel_client->mig_inprogress); /* failure in recv can happen after the client sckt was shutdown (after client sent FIN, or after slirp sent FIN and client socket was closed */ if (!__should_send_fin_to_guest(sckt)) { - SET_TUNNEL_ERROR(worker->channel, + SET_TUNNEL_ERROR(worker->channel_client, "unexpected tunnel_socket_shutdown_recv client_status=%d slirp_status=%d", sckt->client_status, sckt->slirp_status); tunnel_shutdown(worker); @@ -3246,7 +3240,7 @@ static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) { sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; } else { - SET_TUNNEL_ERROR(worker->channel, + SET_TUNNEL_ERROR(worker->channel_client, "unexpected tunnel_socket_shutdown_recv slirp_status=%d", sckt->slirp_status); tunnel_shutdown(worker); @@ -3302,11 +3296,11 @@ static void tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSoc // check if there is still data to send. the close will be sent after data is released. // close may already been pushed if it is a forced close if (!sckt->out_data.ready_chunks_queue.head && !sckt->pushed_close) { - __tunnel_socket_add_close_to_pipe(worker->channel, sckt); + __tunnel_socket_add_close_to_pipe(worker->channel_client, sckt); } } else if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) { if (sckt->client_waits_close_ack) { - __tunnel_socket_add_close_ack_to_pipe(worker->channel, sckt); + __tunnel_socket_add_close_ack_to_pipe(worker->channel_client, sckt); } else { tunnel_worker_free_socket(worker, sckt); } @@ -3333,12 +3327,12 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; #ifdef DEBUG_NETWORK - if (!worker->channel) { + if (!worker->channel_client) { red_printf("channel not connected"); } #endif - if (worker->channel && worker->channel->mig_inprogress) { - SET_TUNNEL_ERROR(worker->channel, "during migration"); + if (worker->channel_client && worker->channel_client->mig_inprogress) { + SET_TUNNEL_ERROR(worker->channel_client, "during migration"); tunnel_shutdown(worker); return; } @@ -3398,27 +3392,25 @@ static void tunnel_worker_disconnect_slirp(TunnelWorker *worker) /* don't call disconnect from functions that might be called by slirp since it closes all its sockets and slirp is not aware of it */ -static void tunnel_channel_disconnect(RedChannel *channel) +static void tunnel_channel_on_disconnect(RedChannel *channel) { - TunnelChannelClient *tunnel_channel = (TunnelChannelClient *)channel; TunnelWorker *worker; if (!channel) { return; } red_printf(""); - worker = tunnel_channel->worker; + worker = (TunnelWorker *)channel->data; tunnel_worker_disconnect_slirp(worker); tunnel_worker_clear_routed_network(worker); - red_channel_destroy(channel); - worker->channel = NULL; + worker->channel_client = NULL; } // TODO - not MC friendly, remove -static void tunnel_channel_disconnect_client(RedChannelClient *rcc) +static void tunnel_channel_client_on_disconnect(RedChannelClient *rcc) { - tunnel_channel_disconnect(rcc->channel); + tunnel_channel_on_disconnect(rcc->channel); } /* interface for reds */ @@ -3439,7 +3431,7 @@ static void tunnel_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item) { } -static void handle_tunnel_channel_link(Channel *channel, RedClient *client, +static void handle_tunnel_channel_link(RedChannel *channel, RedClient *client, RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, @@ -3447,15 +3439,42 @@ static void handle_tunnel_channel_link(Channel *channel, RedClient *client, { TunnelChannelClient *tcc; TunnelWorker *worker = (TunnelWorker *)channel->data; - RedChannel *tunnel_channel; - ChannelCbs channel_cbs = {0,}; - if (worker->channel) { - tunnel_channel_disconnect(worker->channel->base.channel); + if (worker->channel_client) { + red_error("tunnel does not support multiple client"); } + tcc = (TunnelChannelClient*)red_channel_client_create(sizeof(TunnelChannelClient), + channel, client, stream); + + tcc->worker = worker; + tcc->worker->channel_client = tcc; + net_slirp_set_net_interface(&worker->tunnel_interface.base); + + on_new_tunnel_channel(tcc); +} + +static void handle_tunnel_channel_client_migrate(RedChannelClient *rcc) +{ + TunnelChannelClient *tunnel_channel; +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG: MIGRATE STARTED"); +#endif + tunnel_channel = (TunnelChannelClient *)rcc; + ASSERT(tunnel_channel == tunnel_channel->worker->channel_client); + tunnel_channel->mig_inprogress = TRUE; + net_slirp_freeze(); + red_channel_client_pipe_add_type(rcc, PIPE_ITEM_TYPE_MIGRATE); +} + +static void red_tunnel_channel_create(TunnelWorker *worker) +{ + RedChannel *channel; + ChannelCbs channel_cbs; + ClientCbs client_cbs = {0,}; + channel_cbs.config_socket = tunnel_channel_config_socket; - channel_cbs.disconnect = tunnel_channel_disconnect_client; + channel_cbs.on_disconnect = tunnel_channel_client_on_disconnect; channel_cbs.alloc_recv_buf = tunnel_channel_alloc_msg_rcv_buf; channel_cbs.release_recv_buf = tunnel_channel_release_msg_rcv_buf; channel_cbs.hold_item = tunnel_channel_hold_pipe_item; @@ -3465,39 +3484,23 @@ static void handle_tunnel_channel_link(Channel *channel, RedClient *client, channel_cbs.handle_migrate_data = tunnel_channel_handle_migrate_data; channel_cbs.handle_migrate_data_get_serial = tunnel_channel_handle_migrate_data_get_serial; - tunnel_channel = red_channel_create(sizeof(RedChannel), - worker->core_interface, - migration, TRUE, - tunnel_channel_handle_message, - &channel_cbs); - - if (!tunnel_channel) { + channel = red_channel_create(sizeof(RedChannel), + worker->core_interface, + SPICE_CHANNEL_TUNNEL, 0, + FALSE, // TODO: handle migration=TRUE + TRUE, + tunnel_channel_handle_message, + &channel_cbs); + if (!channel) { return; } - tcc = (TunnelChannelClient*)red_channel_client_create( - sizeof(TunnelChannelClient), - tunnel_channel, client, stream); - tcc->worker = worker; - tcc->worker->channel = tcc; - net_slirp_set_net_interface(&worker->tunnel_interface.base); - - on_new_tunnel_channel(tcc); -} + client_cbs.connect = handle_tunnel_channel_link; + client_cbs.migrate = handle_tunnel_channel_client_migrate; + red_channel_register_client_cbs(channel, &client_cbs); -static void handle_tunnel_channel_shutdown(struct Channel *channel) -{ - tunnel_channel_disconnect(((TunnelWorker *)channel->data)->channel->base.channel); -} - -static void handle_tunnel_channel_migrate(struct Channel *channel) -{ -#ifdef DEBUG_NETWORK - red_printf("TUNNEL_DBG: MIGRATE STARTED"); -#endif - TunnelChannelClient *tunnel_channel = ((TunnelWorker *)channel->data)->channel; - tunnel_channel->mig_inprogress = TRUE; - net_slirp_freeze(); - red_channel_client_pipe_add_type(&tunnel_channel->base, PIPE_ITEM_TYPE_MIGRATE); + worker->channel = channel; + red_channel_set_data(channel, worker); + reds_register_channel(worker->channel); } diff --git a/server/red_worker.c b/server/red_worker.c index 4347d243..e767623e 100644 --- a/server/red_worker.c +++ b/server/red_worker.c @@ -8606,15 +8606,20 @@ static void red_disconnect_channel(RedChannel *channel) static void display_channel_client_disconnect(RedChannelClient *rcc) { - // TODO: MC: right now we assume single channel + red_channel_client_disconnect(rcc); +} + +static void display_channel_client_on_disconnect(RedChannelClient *rcc) +{ DisplayChannel *display_channel; DisplayChannelClient *dcc = RCC_TO_DCC(rcc); CommonChannel *common; RedWorker *worker; - if (!rcc || !red_channel_is_connected(rcc->channel)) { + if (!rcc) { return; } + red_printf(""); common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base); worker = common->worker; display_channel = (DisplayChannel *)rcc->channel; @@ -8629,8 +8634,6 @@ static void display_channel_client_disconnect(RedChannelClient *rcc) red_display_reset_compress_buf(dcc); free(dcc->send_data.free_list.res); red_display_destroy_streams(dcc); - red_channel_client_pipe_clear(rcc); // do this before deleting surfaces - red_channel_client_disconnect(rcc); // this was the last channel client if (!red_channel_is_connected(rcc->channel)) { @@ -8654,12 +8657,15 @@ void red_disconnect_all_display_TODO_remove_me(RedChannel *channel) worker->display_channel = NULL; } -static void red_migrate_display(RedWorker *worker) +static void red_migrate_display(RedWorker *worker, RedChannelClient *rcc) { - if (worker->display_channel) { - red_pipes_add_verb(&worker->display_channel->common.base, - SPICE_MSG_DISPLAY_STREAM_DESTROY_ALL); - red_channel_pipes_add_type(&worker->display_channel->common.base, PIPE_ITEM_TYPE_MIGRATE); + // TODO: replace all worker->display_channel tests with + // is_connected + if (red_channel_client_is_connected(rcc)) { + red_pipe_add_verb(rcc, PIPE_ITEM_TYPE_MIGRATE); +// red_pipes_add_verb(&worker->display_channel->common.base, +// SPICE_MSG_DISPLAY_STREAM_DESTROY_ALL); +// red_channel_pipes_add_type(&worker->display_channel->common.base, PIPE_ITEM_TYPE_MIGRATE); } } @@ -8900,7 +8906,7 @@ static inline void flush_display_commands(RedWorker *worker) for (;;) { red_channel_push(&worker->display_channel->common.base); if (!display_is_connected(worker) || - red_channel_min_pipe_size(display_red_channel) <= MAX_PIPE_SIZE) { + red_channel_max_pipe_size(display_red_channel) <= MAX_PIPE_SIZE) { break; } RedChannel *channel = (RedChannel *)worker->display_channel; @@ -8961,6 +8967,9 @@ static inline void flush_cursor_commands(RedWorker *worker) } } +// TODO: on timeout, don't disconnect all channeld immeduiatly - try to disconnect the slowest ones first +// and maybe turn timeouts to several timeouts in order to disconnect channels gradually. +// Should use disconnect or shutdown? static inline void flush_all_qxl_commands(RedWorker *worker) { flush_display_commands(worker); @@ -9487,15 +9496,13 @@ static int listen_to_new_client_channel(CommonChannel *common, return TRUE; } -static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id, int migrate, +static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_type, int migrate, event_listener_action_proc handler, - channel_disconnect_proc disconnect, + channel_disconnect_proc on_disconnect, channel_send_pipe_item_proc send_item, channel_hold_pipe_item_proc hold_item, channel_release_pipe_item_proc release_item, channel_handle_parsed_proc handle_parsed, - channel_on_incoming_error_proc on_incoming_error, - channel_on_outgoing_error_proc on_outgoing_error, channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark, channel_handle_migrate_data_proc handle_migrate_data, channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial) @@ -9505,7 +9512,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i ChannelCbs channel_cbs; channel_cbs.config_socket = common_channel_config_socket; - channel_cbs.disconnect = disconnect; + channel_cbs.on_disconnect = on_disconnect; channel_cbs.send_item = send_item; channel_cbs.hold_item = hold_item; channel_cbs.release_item = release_item; @@ -9515,12 +9522,12 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i channel_cbs.handle_migrate_data = handle_migrate_data; channel_cbs.handle_migrate_data_get_serial = handle_migrate_data_get_serial; - channel = red_channel_create_parser(size, &worker_core, migrate, + channel = red_channel_create_parser(size, &worker_core, + channel_type, worker->id, + migrate, TRUE /* handle_acks */, - spice_get_client_channel_parser(channel_id, NULL), + spice_get_client_channel_parser(channel_type, NULL), handle_parsed, - on_incoming_error, - on_outgoing_error, &channel_cbs); common = (CommonChannel *)channel; if (!channel) { @@ -9682,50 +9689,6 @@ static void display_channel_release_item(RedChannelClient *rcc, PipeItem *item, } } -static void display_channel_on_incoming_error(RedChannelClient *rcc) -{ - red_printf(""); - red_channel_client_shutdown(rcc); -} - -static void display_channel_on_outgoing_error(RedChannelClient *rcc) -{ - red_printf(""); - red_channel_client_shutdown(rcc); -} - -static void cursor_channel_on_incoming_error(RedChannelClient *rcc) -{ - red_printf(""); - red_channel_client_shutdown(rcc); -} - -static void cursor_channel_on_outgoing_error(RedChannelClient *rcc) -{ - red_printf(""); - red_channel_client_shutdown(rcc); -} - -// call this from dispatcher thread context -static void dispatch_display_channel_client_disconnect(RedChannelClient *rcc) -{ - RedWorker *worker = ((DisplayChannel*)rcc->channel)->common.worker; - struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher; - - red_printf(""); - red_dispatcher_disconnect_display_client(dispatcher, rcc); -} - -// call this from dispatcher thread context -static void dispatch_cursor_channel_client_disconnect(RedChannelClient *rcc) -{ - RedWorker *worker = ((CursorChannel*)rcc->channel)->common.worker; - struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher; - - red_printf(""); - red_dispatcher_disconnect_cursor_client(dispatcher, rcc); -} - static void ensure_display_channel_created(RedWorker *worker, int migrate) { DisplayChannel *display_channel; @@ -9739,17 +9702,16 @@ static void ensure_display_channel_created(RedWorker *worker, int migrate) worker, sizeof(*display_channel), SPICE_CHANNEL_DISPLAY, migrate, handle_channel_events, - dispatch_display_channel_client_disconnect, + display_channel_client_on_disconnect, display_channel_send_item, display_channel_hold_pipe_item, display_channel_release_item, display_channel_handle_message, - display_channel_on_incoming_error, - display_channel_on_outgoing_error, display_channel_handle_migrate_mark, display_channel_handle_migrate_data, display_channel_handle_migrate_data_get_serial ))) { + red_printf("failed to create display channel"); return; } display_channel = worker->display_channel; @@ -9791,7 +9753,7 @@ static void handle_new_display_channel(RedWorker *worker, RedClient *client, Red if (!dcc) { return; } - + red_printf("New display (client %p) dcc %p stream %p", client, dcc, stream); stream_buf_size = 32*1024; dcc->send_data.stream_outbuf = spice_malloc(stream_buf_size); dcc->send_data.stream_outbuf_size = stream_buf_size; @@ -9836,11 +9798,15 @@ error: static void cursor_channel_client_disconnect(RedChannelClient *rcc) { - if (!red_channel_is_connected(rcc->channel)) { + red_channel_client_disconnect(rcc); +} + +static void cursor_channel_client_on_disconnect(RedChannelClient *rcc) +{ + if (!rcc) { return; } red_reset_cursor_cache(rcc); - red_channel_client_disconnect(rcc); } static void red_disconnect_cursor(RedChannel *channel) @@ -9857,13 +9823,14 @@ static void red_disconnect_cursor(RedChannel *channel) red_disconnect_channel(channel); } -static void red_migrate_cursor(RedWorker *worker) +static void red_migrate_cursor(RedWorker *worker, RedChannelClient *rcc) { - if (cursor_is_connected(worker)) { - red_channel_pipes_add_type(&worker->cursor_channel->common.base, - PIPE_ITEM_TYPE_INVAL_CURSOR_CACHE); - red_channel_pipes_add_type(&worker->cursor_channel->common.base, - PIPE_ITEM_TYPE_MIGRATE); +// if (cursor_is_connected(worker)) { + if (red_channel_client_is_connected(rcc)) { + red_channel_client_pipe_add_type(rcc, + PIPE_ITEM_TYPE_INVAL_CURSOR_CACHE); + red_channel_client_pipe_add_type(rcc, + PIPE_ITEM_TYPE_MIGRATE); } } @@ -9950,13 +9917,11 @@ static void ensure_cursor_channel_created(RedWorker *worker, int migrate) worker, sizeof(*worker->cursor_channel), SPICE_CHANNEL_CURSOR, migrate, handle_channel_events, - dispatch_cursor_channel_client_disconnect, + cursor_channel_client_on_disconnect, cursor_channel_send_item, cursor_channel_hold_pipe_item, cursor_channel_release_item, red_channel_client_handle_message, - cursor_channel_on_incoming_error, - cursor_channel_on_outgoing_error, NULL, NULL, NULL); @@ -10424,6 +10389,10 @@ static void handle_dev_input(EventListener *listener, uint32_t events) case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE: case RED_WORKER_MESSAGE_STOP: case RED_WORKER_MESSAGE_LOADVM_COMMANDS: + case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE: + case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE: + case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: + case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: write_ready = 1; default: break; @@ -10476,6 +10445,14 @@ static void handle_dev_input(EventListener *listener, uint32_t events) case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE: handle_dev_destroy_primary_surface(worker); break; + case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE: { + RedChannel *red_channel; + // TODO: handle seemless migration. Temp, setting migrate to FALSE + ensure_display_channel_created(worker, FALSE); + red_channel = &worker->display_channel->common.base; + send_data(worker->channel, &red_channel, sizeof(RedChannel *)); + break; + } case RED_WORKER_MESSAGE_DISPLAY_CONNECT: { RedsStream *stream; RedClient *client; @@ -10488,20 +10465,15 @@ static void handle_dev_input(EventListener *listener, uint32_t events) handle_new_display_channel(worker, client, stream, migrate); break; } - case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT: { + case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: { RedChannelClient *rcc; red_printf("disconnect display client"); receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); + ASSERT(rcc); display_channel_client_disconnect(rcc); - message = RED_WORKER_MESSAGE_READY; - write_message(worker->channel, &message); break; } - case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: - red_printf("disconnect"); - red_disconnect_all_display_TODO_remove_me((RedChannel *)worker->display_channel); - break; case RED_WORKER_MESSAGE_STOP: { red_printf("stop"); handle_dev_stop(worker); @@ -10511,10 +10483,22 @@ static void handle_dev_input(EventListener *listener, uint32_t events) red_printf("start"); handle_dev_start(worker); break; - case RED_WORKER_MESSAGE_DISPLAY_MIGRATE: - red_printf("migrate"); - red_migrate_display(worker); + case RED_WORKER_MESSAGE_DISPLAY_MIGRATE: { + RedChannelClient *rcc; + red_printf("migrate display client"); + receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); + ASSERT(rcc); + red_migrate_display(worker, rcc); + break; + } + case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE: { + RedChannel *red_channel; + // TODO: handle seemless migration. Temp, setting migrate to FALSE + ensure_cursor_channel_created(worker, FALSE); + red_channel = &worker->cursor_channel->common.base; + send_data(worker->channel, &red_channel, sizeof(RedChannel *)); break; + } case RED_WORKER_MESSAGE_CURSOR_CONNECT: { RedsStream *stream; RedClient *client; @@ -10527,24 +10511,23 @@ static void handle_dev_input(EventListener *listener, uint32_t events) red_connect_cursor(worker, client, stream, migrate); break; } - case RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT: { + case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: { RedChannelClient *rcc; red_printf("disconnect cursor client"); receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); + ASSERT(rcc); cursor_channel_client_disconnect(rcc); - message = RED_WORKER_MESSAGE_READY; - write_message(worker->channel, &message); break; } - case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: - red_printf("cursor disconnect"); - red_disconnect_cursor((RedChannel *)worker->cursor_channel); - break; - case RED_WORKER_MESSAGE_CURSOR_MIGRATE: - red_printf("cursor migrate"); - red_migrate_cursor(worker); + case RED_WORKER_MESSAGE_CURSOR_MIGRATE: { + RedChannelClient *rcc; + red_printf("migrate cursor client"); + receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); + ASSERT(rcc); + red_migrate_cursor(worker, rcc); break; + } case RED_WORKER_MESSAGE_SET_COMPRESSION: receive_data(worker->channel, &worker->image_compression, sizeof(spice_image_compression_t)); diff --git a/server/red_worker.h b/server/red_worker.h index 6fbe0611..26c43adb 100644 --- a/server/red_worker.h +++ b/server/red_worker.h @@ -53,13 +53,11 @@ enum { RED_WORKER_MESSAGE_READY, RED_WORKER_MESSAGE_DISPLAY_CONNECT, RED_WORKER_MESSAGE_DISPLAY_DISCONNECT, - RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT, RED_WORKER_MESSAGE_DISPLAY_MIGRATE, RED_WORKER_MESSAGE_START, RED_WORKER_MESSAGE_STOP, RED_WORKER_MESSAGE_CURSOR_CONNECT, RED_WORKER_MESSAGE_CURSOR_DISCONNECT, - RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT, RED_WORKER_MESSAGE_CURSOR_MIGRATE, RED_WORKER_MESSAGE_SET_COMPRESSION, RED_WORKER_MESSAGE_SET_STREAMING_VIDEO, @@ -83,6 +81,9 @@ enum { RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC, /* suspend/windows resolution change command */ RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC, + + RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE, + RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE, }; typedef uint32_t RedWorkerMessage; diff --git a/server/reds.c b/server/reds.c index b8b4d26f..0387a5cc 100644 --- a/server/reds.c +++ b/server/reds.c @@ -193,17 +193,23 @@ typedef struct RedsStatValue { typedef struct RedsMigSpice RedsMigSpice; +typedef struct RedsChannel { + struct RedsChannel *next; + RedChannel *base; + + int num_common_caps; + uint32_t *common_caps; +} RedsChannel; + typedef struct RedsState { int listen_socket; int secure_listen_socket; SpiceWatch *listen_watch; SpiceWatch *secure_listen_watch; - int disconnecting; VDIPortState agent_state; int pending_mouse_event; Ring clients; int num_clients; - Channel *main_channel_factory; MainChannel *main_channel; int mig_wait_connect; @@ -212,7 +218,7 @@ typedef struct RedsState { int mig_target; RedsMigSpice *mig_spice; int num_of_channels; - Channel *channels; + RedsChannel *channels; int mouse_mode; int is_client_mouse_allowed; int dispatcher_allows_client_mouse; @@ -297,6 +303,8 @@ struct ChannelSecurityOptions { ChannelSecurityOptions *next; }; +static void reds_dispose_channel(RedsChannel *channel); + static ChannelSecurityOptions *channels_security = NULL; static int default_channel_security = SPICE_CHANNEL_SECURITY_NONE | SPICE_CHANNEL_SECURITY_SSL; @@ -506,21 +514,29 @@ void reds_update_stat_value(uint32_t value) #endif -void reds_register_channel(Channel *channel) +void reds_register_channel(RedChannel *channel) { + RedsChannel *reds_channel; + ASSERT(reds); - channel->next = reds->channels; - reds->channels = channel; + // TODO: should channels be released upon some destructor? + reds_channel = spice_malloc0(sizeof(RedsChannel)); + reds_channel->base = channel; + reds_channel->next = reds->channels; + reds->channels = reds_channel; reds->num_of_channels++; } -void reds_unregister_channel(Channel *channel) +void reds_unregister_channel(RedChannel *channel) { - Channel **now = &reds->channels; + RedsChannel **now = &reds->channels; while (*now) { - if (*now == channel) { - *now = channel->next; + if ((*now)->base == channel) { + RedsChannel *free_channel = *now; + *now = free_channel->next; + reds_dispose_channel(free_channel); + free(free_channel); reds->num_of_channels--; return; } @@ -529,10 +545,10 @@ void reds_unregister_channel(Channel *channel) red_printf("not found"); } -static Channel *reds_find_channel(uint32_t type, uint32_t id) +static RedsChannel *reds_find_channel(uint32_t type, uint32_t id) { - Channel *channel = reds->channels; - while (channel && !(channel->type == type && channel->id == id)) { + RedsChannel *channel = reds->channels; + while (channel && !(channel->base->type == type && channel->base->id == id)) { channel = channel->next; } return channel; @@ -590,27 +606,42 @@ static int reds_main_channel_connected(void) void reds_client_disconnect(RedClient *client) { - if (!reds_main_channel_connected() || client->disconnecting) { + // TODO: rename reds_main_channel_connected, or really set main_channel to NULL on disconnect, + // though still, it is not a reason not to disconnect the rest of the channels + if (!reds_main_channel_connected() || client->disconnecting) { + /* case of recursion (main_channel_client_on_disconnect-> + * reds_client_disconnect->red_client_destroy-<main_channel... + */ return; } red_printf(""); + // why is "disconnecting" even needed? it is synchronic, even in the dispatcher we are now waiting for disconnection + // Are there recursive calls? Maybe from main_channel? client->disconnecting = TRUE; - /* Reset write filter to start with clean state on client reconnect */ - agent_msg_filter_init(&reds->agent_state.write_filter, agent_copypaste, - TRUE); - /* Throw away pending chunks from the current (if any) and future - messages read from the agent */ - reds->agent_state.read_filter.result = AGENT_MSG_FILTER_DISCARD; - reds->agent_state.read_filter.discard_all = TRUE; + // TODO: we need to handle agent properly for all clients!!!! (e.g., cut and paste, how?) + // We shouldn't initialize the agent when there are still clients connected ring_remove(&client->link); reds->num_clients--; red_client_destroy(client); - reds_mig_cleanup(); - reds->disconnecting = FALSE; + // TODO: we need to handle agent properly for all clients!!!! (e.g., cut and paste, how? Maybe throw away messages + // if we are in the middle of one from another client) + // We shouldn't initialize the agent when there are still clients connected + if (reds->num_clients == 0) { + /* Reset write filter to start with clean state on client reconnect */ + agent_msg_filter_init(&reds->agent_state.write_filter, agent_copypaste, + TRUE); + + /* Throw away pending chunks from the current (if any) and future + * messages read from the agent */ + reds->agent_state.read_filter.result = AGENT_MSG_FILTER_DISCARD; + reds->agent_state.read_filter.discard_all = TRUE; + + reds_mig_cleanup(); + } } // TODO: go over all usage of reds_disconnect, most/some of it should be converted to @@ -623,6 +654,7 @@ static void reds_disconnect(void) RING_FOREACH_SAFE(link, next, &reds->clients) { reds_client_disconnect(SPICE_CONTAINEROF(link, RedClient, link)); } + reds_mig_cleanup(); } static void reds_mig_disconnect() @@ -949,11 +981,11 @@ int reds_num_of_channels() static int secondary_channels[] = { SPICE_CHANNEL_MAIN, SPICE_CHANNEL_DISPLAY, SPICE_CHANNEL_CURSOR, SPICE_CHANNEL_INPUTS}; -static int channel_is_secondary(Channel *channel) +static int channel_is_secondary(RedsChannel *channel) { int i; for (i = 0 ; i < sizeof(secondary_channels)/sizeof(secondary_channels[0]); ++i) { - if (channel->type == secondary_channels[i]) { + if (channel->base->type == secondary_channels[i]) { return TRUE; } } @@ -962,7 +994,7 @@ static int channel_is_secondary(Channel *channel) void reds_fill_channels(SpiceMsgChannels *channels_info) { - Channel *channel; + RedsChannel *channel; int i; int used_channels = 0; @@ -973,8 +1005,8 @@ void reds_fill_channels(SpiceMsgChannels *channels_info) if (reds->num_clients > 1 && !channel_is_secondary(channel)) { continue; } - channels_info->channels[used_channels].type = channel->type; - channels_info->channels[used_channels].id = channel->id; + channels_info->channels[used_channels].type = channel->base->type; + channels_info->channels[used_channels].id = channel->base->id; used_channels++; } channels_info->num_of_channels = used_channels; @@ -1350,7 +1382,7 @@ static int sync_write(RedsStream *stream, const void *in_buf, size_t n) return TRUE; } -static void reds_channel_set_common_caps(Channel *channel, int cap, int active) +static void reds_channel_set_common_caps(RedsChannel *channel, int cap, int active) { int nbefore, n; @@ -1367,7 +1399,7 @@ static void reds_channel_set_common_caps(Channel *channel, int cap, int active) } } -static void reds_channel_init_auth_caps(Channel *channel) +static void reds_channel_init_auth_caps(RedsChannel *channel) { if (sasl_enabled) { reds_channel_set_common_caps(channel, SPICE_COMMON_CAP_AUTH_SASL, TRUE); @@ -1377,12 +1409,8 @@ static void reds_channel_init_auth_caps(Channel *channel) reds_channel_set_common_caps(channel, SPICE_COMMON_CAP_PROTOCOL_AUTH_SELECTION, TRUE); } -void reds_channel_dispose(Channel *channel) +static void reds_dispose_channel(RedsChannel *channel) { - free(channel->caps); - channel->caps = NULL; - channel->num_caps = 0; - free(channel->common_caps); channel->common_caps = NULL; channel->num_common_caps = 0; @@ -1392,8 +1420,8 @@ static int reds_send_link_ack(RedLinkInfo *link) { SpiceLinkHeader header; SpiceLinkReply ack; - Channel caps = { 0, }; - Channel *channel; + RedsChannel common_caps = { 0, }; + RedsChannel *channel; BUF_MEM *bmBuf; BIO *bio; int ret = FALSE; @@ -1407,13 +1435,13 @@ static int reds_send_link_ack(RedLinkInfo *link) channel = reds_find_channel(link->link_mess->channel_type, 0); if (!channel) { - channel = ∩︀ + channel = &common_caps; } reds_channel_init_auth_caps(channel); /* make sure common caps are set */ ack.num_common_caps = channel->num_common_caps; - ack.num_channel_caps = channel->num_caps; + ack.num_channel_caps = channel->base ? channel->base->num_caps : 0; header.size += (ack.num_common_caps + ack.num_channel_caps) * sizeof(uint32_t); ack.caps_offset = sizeof(SpiceLinkReply); @@ -1441,13 +1469,15 @@ static int reds_send_link_ack(RedLinkInfo *link) goto end; if (!sync_write(link->stream, channel->common_caps, channel->num_common_caps * sizeof(uint32_t))) goto end; - if (!sync_write(link->stream, channel->caps, channel->num_caps * sizeof(uint32_t))) - goto end; + if (channel->base) { + if (!sync_write(link->stream, channel->base->caps, channel->base->num_caps * sizeof(uint32_t))) + goto end; + } ret = TRUE; end: - reds_channel_dispose(&caps); + reds_dispose_channel(&common_caps); BIO_free(bio); return ret; } @@ -1540,18 +1570,19 @@ static void reds_handle_main_link(RedLinkInfo *link) link->link_mess = NULL; reds_link_free(link); caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset); - if (!reds->main_channel_factory) { - reds->main_channel_factory = main_channel_init(); + if (!reds->main_channel) { + reds->main_channel = main_channel_init(); + ASSERT(reds->main_channel); } client = red_client_new(); ring_add(&reds->clients, &client->link); reds->num_clients++; - mcc = main_channel_link(reds->main_channel_factory, client, - stream, connection_id, reds->mig_target, link_mess->num_common_caps, - link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps, - link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL); - reds->main_channel = (MainChannel*)reds->main_channel_factory->data; - ASSERT(reds->main_channel); + mcc = main_channel_link(reds->main_channel, client, + stream, connection_id, reds->mig_target, + link_mess->num_common_caps, + link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps, + link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL); + red_printf("NEW Client %p mcc %p connect-id %d", client, mcc, connection_id); free(link_mess); red_client_set_main(client, mcc); @@ -1608,7 +1639,7 @@ static void openssl_init(RedLinkInfo *link) static void reds_handle_other_links(RedLinkInfo *link) { - Channel *channel; + RedsChannel *channel; RedClient *client = NULL; RedsStream *stream; SpiceLinkMess *link_mess; @@ -1617,7 +1648,7 @@ static void reds_handle_other_links(RedLinkInfo *link) link_mess = link->link_mess; if (reds->main_channel) { client = main_channel_get_client_by_link_id(reds->main_channel, - link_mess->connection_id); + link_mess->connection_id); } // TODO: MC: broke migration (at least for the dont-drop-connection kind). @@ -1650,9 +1681,12 @@ static void reds_handle_other_links(RedLinkInfo *link) link->link_mess = NULL; reds_link_free(link); caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset); - channel->link(channel, client, stream, reds->mig_target, link_mess->num_common_caps, - link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps, - link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL); + channel->base->client_cbs.connect(channel->base, client, stream, reds->mig_target, + link_mess->num_common_caps, + link_mess->num_common_caps ? caps : NULL, + link_mess->num_channel_caps, + link_mess->num_channel_caps ? + caps + link_mess->num_common_caps : NULL); free(link_mess); } @@ -3077,7 +3111,7 @@ static void reds_mig_finished(int completed) reds->mig_inprogress = TRUE; if (completed) { - Channel *channel; + RingItem *link, *next; reds->mig_wait_disconnect = TRUE; core->timer_start(reds->mig_timer, MIGRATE_TIMEOUT); @@ -3086,14 +3120,16 @@ static void reds_mig_finished(int completed) // - it can have an empty migrate - that seems ok // - I can try to fill it's migrate, then move stuff from reds.c there, but a lot of data // is in reds state right now. + // currently the migrate callback of main_channel does nothing main_channel_push_migrate(reds->main_channel); - channel = reds->channels; - while (channel) { - channel->migrate(channel); - channel = channel->next; + + RING_FOREACH_SAFE(link, next, &reds->clients) { + red_client_migrate(SPICE_CONTAINEROF(link, RedClient, link)); } } else { main_channel_push_migrate_cancel(reds->main_channel); + // TODO: all the seemless migration is broken. Before MC we waited for disconection of one client, + // no we need to wait to all the clients (see mig_timer)? reds_mig_cleanup(); } } diff --git a/server/reds.h b/server/reds.h index 6930fe6a..0bbda143 100644 --- a/server/reds.h +++ b/server/reds.h @@ -31,13 +31,10 @@ #include "common/marshaller.h" #include "common/messages.h" #include "spice.h" +#include "red_channel.h" #define SPICE_GNUC_VISIBLE __attribute__ ((visibility ("default"))) -typedef struct RedsStream RedsStream; -typedef struct RedClient RedClient; -typedef struct MainChannelClient MainChannelClient; - #if HAVE_SASL typedef struct RedsSASL { sasl_conn_t *conn; @@ -88,22 +85,6 @@ struct RedsStream { ssize_t (*writev)(RedsStream *s, const struct iovec *iov, int iovcnt); }; -typedef struct Channel { - struct Channel *next; - uint32_t type; - uint32_t id; - int num_common_caps; - uint32_t *common_caps; - int num_caps; - uint32_t *caps; - void (*link)(struct Channel *, RedClient *client, RedsStream *stream, - int migration, int num_common_caps, - uint32_t *common_caps, int num_caps, uint32_t *caps); - void (*shutdown)(struct Channel *); - void (*migrate)(struct Channel *); - void *data; -} Channel; - struct QXLState { QXLInterface *qif; struct RedDispatcher *dispatcher; @@ -114,8 +95,6 @@ struct SpiceNetWireState { struct TunnelWorker *worker; }; -void reds_channel_dispose(Channel *channel); - ssize_t reds_stream_read(RedsStream *s, void *buf, size_t nbyte); ssize_t reds_stream_write(RedsStream *s, const void *buf, size_t nbyte); ssize_t reds_stream_writev(RedsStream *s, const struct iovec *iov, int iovcnt); @@ -127,8 +106,8 @@ void reds_update_mm_timer(uint32_t mm_time); uint32_t reds_get_mm_time(void); void reds_set_client_mouse_allowed(int is_client_mouse_allowed, int x_res, int y_res); -void reds_register_channel(Channel *channel); -void reds_unregister_channel(Channel *channel); +void reds_register_channel(RedChannel *channel); +void reds_unregister_channel(RedChannel *channel); int reds_get_mouse_mode(void); // used by inputs_channel int reds_get_agent_mouse(void); // used by inputs_channel int reds_has_vdagent(void); // used by inputs channel diff --git a/server/smartcard.c b/server/smartcard.c index 36004f75..2ff13101 100644 --- a/server/smartcard.c +++ b/server/smartcard.c @@ -22,6 +22,7 @@ #include <arpa/inet.h> #include <vscard_common.h> +#include "server/reds.h" #include "server/char_device.h" #include "server/red_channel.h" #include "server/smartcard.h" @@ -78,7 +79,7 @@ static void smartcard_on_message_from_device( RedChannelClient *rcc, VSCMsgHeader *vheader); static SmartCardDeviceState* smartcard_device_state_new(); static void smartcard_device_state_free(SmartCardDeviceState* st); -static void smartcard_register_channel(void); +static void smartcard_init(void); void smartcard_char_device_wakeup(SpiceCharDeviceInstance *sin) { @@ -169,7 +170,7 @@ static int smartcard_char_device_add_to_readers(SpiceCharDeviceInstance *char_de } state->reader_id = g_smartcard_readers.num; g_smartcard_readers.sin[g_smartcard_readers.num++] = char_device; - smartcard_register_channel(); + smartcard_init(); return 0; } @@ -274,7 +275,6 @@ static int smartcard_channel_client_config_socket(RedChannelClient *rcc) static uint8_t *smartcard_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header) { - //red_printf("allocing %d bytes", msg_header->size); return spice_malloc(msg_header->size); } @@ -337,10 +337,9 @@ static void smartcard_channel_release_pipe_item(RedChannelClient *rcc, free(item); } -static void smartcard_channel_client_disconnect(RedChannelClient *rcc) +static void smartcard_channel_on_disconnect(RedChannelClient *rcc) { smartcard_readers_detach_all(rcc); - red_channel_client_destroy(rcc); } /* this is called from both device input and client input. since the device is @@ -487,63 +486,55 @@ static void smartcard_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *it { } -static void smartcard_link(Channel *channel, RedClient *client, +static void smartcard_connect(RedChannel *channel, RedClient *client, RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) { RedChannelClient *rcc; - ChannelCbs channel_cbs; - if (!channel->data) { - memset(&channel_cbs, sizeof(channel_cbs), 0); - channel_cbs.config_socket = smartcard_channel_client_config_socket; - channel_cbs.disconnect = smartcard_channel_client_disconnect; - channel_cbs.send_item = smartcard_channel_send_item; - channel_cbs.hold_item = smartcard_channel_hold_pipe_item; - channel_cbs.release_item = smartcard_channel_release_pipe_item; - channel_cbs.alloc_recv_buf = smartcard_channel_alloc_msg_rcv_buf; - channel_cbs.release_recv_buf = smartcard_channel_release_msg_rcv_buf; - channel->data = - red_channel_create(sizeof(SmartCardChannel), - core, migration, - FALSE /* handle_acks */, - smartcard_channel_handle_message, - &channel_cbs); - if (channel->data) { - red_channel_init_outgoing_messages_window((RedChannel*)channel->data); - } else { - red_printf("ERROR: smartcard channel creation failed"); - return; - } - } - rcc = red_channel_client_create(sizeof(RedChannelClient), - (RedChannel*)channel->data, client, stream); + rcc = red_channel_client_create(sizeof(RedChannelClient), channel, client, stream); red_channel_client_ack_zero_messages_window(rcc); } -static void smartcard_shutdown(Channel *channel) +static void smartcard_migrate(RedChannelClient *rcc) { } -static void smartcard_migrate(Channel *channel) -{ -} +SmartCardChannel *g_smartcard_channel; -static void smartcard_register_channel(void) +static void smartcard_init(void) { - Channel *channel; - static int registered = 0; - - if (registered) { - return; + ChannelCbs channel_cbs; + ClientCbs client_cbs = {0,}; + + ASSERT(!g_smartcard_channel); + + memset(&channel_cbs, sizeof(channel_cbs), 0); + memset(&client_cbs, sizeof(client_cbs), 0); + + channel_cbs.config_socket = smartcard_channel_client_config_socket; + channel_cbs.on_disconnect = smartcard_channel_on_disconnect; + channel_cbs.send_item = smartcard_channel_send_item; + channel_cbs.hold_item = smartcard_channel_hold_pipe_item; + channel_cbs.release_item = smartcard_channel_release_pipe_item; + channel_cbs.alloc_recv_buf = smartcard_channel_alloc_msg_rcv_buf; + channel_cbs.release_recv_buf = smartcard_channel_release_msg_rcv_buf; + + g_smartcard_channel = (SmartCardChannel*)red_channel_create(sizeof(SmartCardChannel), + core, SPICE_CHANNEL_SMARTCARD, 0, + FALSE /* migration - TODO?*/, + FALSE /* handle_acks */, + smartcard_channel_handle_message, + &channel_cbs); + + if (!g_smartcard_channel) { + red_error("failed to allocate Inputs Channel"); } - red_printf("registering smartcard channel"); - registered = 1; - channel = spice_new0(Channel, 1); - channel->type = SPICE_CHANNEL_SMARTCARD; - channel->link = smartcard_link; - channel->shutdown = smartcard_shutdown; - channel->migrate = smartcard_migrate; - reds_register_channel(channel); + + client_cbs.connect = smartcard_connect; + client_cbs.migrate = smartcard_migrate; + red_channel_register_client_cbs(&g_smartcard_channel->base, &client_cbs); + + reds_register_channel(&g_smartcard_channel->base); } diff --git a/server/snd_worker.c b/server/snd_worker.c index 332612f5..4c9a6f01 100644 --- a/server/snd_worker.c +++ b/server/snd_worker.c @@ -28,6 +28,7 @@ #include "spice.h" #include "red_common.h" +#include "main_channel.h" #include "reds.h" #include "red_dispatcher.h" #include "snd_worker.h" @@ -35,12 +36,6 @@ #include "generated_marshallers.h" #include "demarshallers.h" -/* main_channel.h inclusion drags red_channel.h which has conflicting types. - * until the channels here are defined in terms of red_channel.h we have some - * duplicate declarations */ -MainChannelClient *red_client_get_main(RedClient *client); -int main_channel_client_is_low_bandwidth(MainChannelClient *mcc); - #define MAX_SEND_VEC 100 #define RECIVE_BUF_SIZE (16 * 1024 * 2) @@ -78,10 +73,10 @@ enum RecordCommand { #define SND_RECORD_VOLUME_MASK (1 << SND_RECORD_VOLUME) typedef struct SndChannel SndChannel; -typedef void (*send_messages_proc)(void *in_channel); -typedef int (*handle_message_proc)(SndChannel *channel, size_t size, uint32_t type, void *message); -typedef void (*on_message_done_proc)(SndChannel *channel); -typedef void (*cleanup_channel_proc)(SndChannel *channel); +typedef void (*snd_channel_send_messages_proc)(void *in_channel); +typedef int (*snd_channel_handle_message_proc)(SndChannel *channel, size_t size, uint32_t type, void *message); +typedef void (*snd_channel_on_message_done_proc)(SndChannel *channel); +typedef void (*snd_channel_cleanup_channel_proc)(SndChannel *channel); typedef struct SndWorker SndWorker; @@ -90,6 +85,8 @@ struct SndChannel { SndWorker *worker; spice_parse_channel_func_t parser; + RedChannelClient *channel_client; + int active; int client_active; int blocked; @@ -116,10 +113,10 @@ struct SndChannel { uint8_t *end; } recive_data; - send_messages_proc send_messages; - handle_message_proc handle_message; - on_message_done_proc on_message_done; - cleanup_channel_proc cleanup; + snd_channel_send_messages_proc send_messages; + snd_channel_handle_message_proc handle_message; + snd_channel_on_message_done_proc on_message_done; + snd_channel_cleanup_channel_proc cleanup; int num_caps; uint32_t *caps; }; @@ -146,7 +143,7 @@ typedef struct PlaybackChannel { } PlaybackChannel; struct SndWorker { - Channel base; + RedChannel *base_channel; SndChannel *connection; SndWorker *next; int active; @@ -193,7 +190,7 @@ typedef struct RecordChannel { uint32_t celt_buf[FRAME_SIZE]; } RecordChannel; -static SndWorker *workers = NULL; +static SndWorker *workers; static uint32_t playback_compression = SPICE_AUDIO_DATA_MODE_CELT_0_5_1; static void snd_receive(void* data); @@ -863,10 +860,11 @@ static void snd_record_send(void* data) static SndChannel *__new_channel(SndWorker *worker, int size, uint32_t channel_id, RedClient *client, RedsStream *stream, - int migrate, send_messages_proc send_messages, - handle_message_proc handle_message, - on_message_done_proc on_message_done, - cleanup_channel_proc cleanup, + int migrate, + snd_channel_send_messages_proc send_messages, + snd_channel_handle_message_proc handle_message, + snd_channel_on_message_done_proc on_message_done, + snd_channel_cleanup_channel_proc cleanup, uint32_t *caps, int num_caps) { SndChannel *channel; @@ -926,6 +924,10 @@ static SndChannel *__new_channel(SndWorker *worker, int size, uint32_t channel_i channel->cleanup = cleanup; channel->num_caps = num_caps; channel->caps = spice_memdup(caps, num_caps * sizeof(uint32_t)); + + channel->channel_client = red_channel_client_create_dummy(sizeof(RedChannelClient), + worker->base_channel, + client); return channel; error2: @@ -936,9 +938,15 @@ error1: return NULL; } -static void snd_shutdown(Channel *channel) +static void snd_disconnect_channel_client(RedChannelClient *rcc) { - SndWorker *worker = (SndWorker *)channel; + SndWorker *worker; + + ASSERT(rcc->channel); + ASSERT(rcc->channel->data); + worker = (SndWorker *)rcc->channel->data; + + ASSERT(worker->connection->channel_client == rcc); snd_disconnect_channel(worker->connection); } @@ -1096,13 +1104,13 @@ static void snd_playback_cleanup(SndChannel *channel) celt051_mode_destroy(playback_channel->celt_mode); } -static void snd_set_playback_peer(Channel *channel, RedClient *client, RedsStream *stream, +static void snd_set_playback_peer(RedChannel *channel, RedClient *client, RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) { - SndWorker *worker = (SndWorker *)channel; - SpicePlaybackState *st = SPICE_CONTAINEROF(worker, SpicePlaybackState, worker); + SndWorker *worker = channel->data; PlaybackChannel *playback_channel; + SpicePlaybackState *st = SPICE_CONTAINEROF(worker, SpicePlaybackState, worker); CELTEncoder *celt_encoder; CELTMode *celt_mode; int celt_error; @@ -1158,10 +1166,16 @@ error_1: celt051_mode_destroy(celt_mode); } -static void snd_record_migrate(Channel *channel) +static void snd_record_migrate_channel_client(RedChannelClient *rcc) { - SndWorker *worker = (SndWorker *)channel; + SndWorker *worker; + + ASSERT(rcc->channel); + ASSERT(rcc->channel->data); + worker = (SndWorker *)rcc->channel->data; + if (worker->connection) { + ASSERT(worker->connection->channel_client == rcc); snd_set_command(worker->connection, SND_RECORD_MIGRATE_MASK); snd_record_send(worker->connection); } @@ -1290,19 +1304,19 @@ static void on_new_record_channel(SndWorker *worker) static void snd_record_cleanup(SndChannel *channel) { - RecordChannel *record_channel = (RecordChannel *)channel; + RecordChannel *record_channel = SPICE_CONTAINEROF(channel, RecordChannel, base); celt051_decoder_destroy(record_channel->celt_decoder); celt051_mode_destroy(record_channel->celt_mode); } -static void snd_set_record_peer(Channel *channel, RedClient *client, RedsStream *stream, +static void snd_set_record_peer(RedChannel *channel, RedClient *client, RedsStream *stream, int migration, int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) { - SndWorker *worker = (SndWorker *)channel; - SpiceRecordState *st = SPICE_CONTAINEROF(worker, SpiceRecordState, worker); + SndWorker *worker = channel->data; RecordChannel *record_channel; + SpiceRecordState *st = SPICE_CONTAINEROF(worker, SpiceRecordState, worker); CELTDecoder *celt_decoder; CELTMode *celt_mode; int celt_error; @@ -1354,11 +1368,16 @@ error_2: celt051_mode_destroy(celt_mode); } -static void snd_playback_migrate(Channel *channel) +static void snd_playback_migrate_channel_client(RedChannelClient *rcc) { - SndWorker *worker = (SndWorker *)channel; + SndWorker *worker; + + ASSERT(rcc->channel); + ASSERT(rcc->channel->data); + worker = (SndWorker *)rcc->channel->data; if (worker->connection) { + ASSERT(worker->connection->channel_client == rcc); snd_set_command(worker->connection, SND_PLAYBACK_MIGRATE_MASK); snd_playback_send(worker->connection); } @@ -1386,48 +1405,67 @@ static void remove_worker(SndWorker *worker) void snd_attach_playback(SpicePlaybackInstance *sin) { SndWorker *playback_worker; + int num_caps; + uint32_t *caps; + RedChannel *channel; + ClientCbs client_cbs = {0,}; sin->st = spice_new0(SpicePlaybackState, 1); sin->st->sin = sin; playback_worker = &sin->st->worker; - playback_worker->base.type = SPICE_CHANNEL_PLAYBACK; - playback_worker->base.link = snd_set_playback_peer; - playback_worker->base.shutdown = snd_shutdown; - playback_worker->base.migrate = snd_playback_migrate; - playback_worker->base.data = NULL; + // TODO: Make RedChannel base of worker? instead of assigning it to channel->data + channel = red_channel_create_dummy(sizeof(RedChannel), SPICE_CHANNEL_PLAYBACK, 0); + + channel->data = playback_worker; + client_cbs.connect = snd_set_playback_peer; + client_cbs.disconnect = snd_disconnect_channel_client; + client_cbs.migrate = snd_playback_migrate_channel_client; + red_channel_register_client_cbs(channel, &client_cbs); + red_channel_set_data(channel, playback_worker); - playback_worker->base.num_caps = 1; - playback_worker->base.caps = spice_new(uint32_t, 1); - playback_worker->base.caps[0] = - (1 << SPICE_PLAYBACK_CAP_CELT_0_5_1) | - (1 << SPICE_PLAYBACK_CAP_VOLUME); + num_caps = 1; + caps = spice_new(uint32_t, 1); + caps[0] = (1 << SPICE_PLAYBACK_CAP_CELT_0_5_1) | + (1 << SPICE_PLAYBACK_CAP_VOLUME); + red_channel_set_caps(channel, num_caps, caps); + playback_worker->base_channel = channel; add_worker(playback_worker); - reds_register_channel(&playback_worker->base); + reds_register_channel(playback_worker->base_channel); } void snd_attach_record(SpiceRecordInstance *sin) { SndWorker *record_worker; + int num_caps; + uint32_t *caps; + RedChannel *channel; + ClientCbs client_cbs = {0,}; sin->st = spice_new0(SpiceRecordState, 1); sin->st->sin = sin; record_worker = &sin->st->worker; - record_worker->base.type = SPICE_CHANNEL_RECORD; - record_worker->base.link = snd_set_record_peer; - record_worker->base.shutdown = snd_shutdown; - record_worker->base.migrate = snd_record_migrate; - record_worker->base.data = NULL; - - record_worker->base.num_caps = 1; - record_worker->base.caps = spice_new(uint32_t, 1); - record_worker->base.caps[0] = - (1 << SPICE_RECORD_CAP_CELT_0_5_1) | - (1 << SPICE_RECORD_CAP_VOLUME); + // TODO: Make RedChannel base of worker? instead of assigning it to channel->data + channel = red_channel_create_dummy(sizeof(RedChannel), SPICE_CHANNEL_RECORD, 0); + + channel->data = record_worker; + client_cbs.connect = snd_set_record_peer; + client_cbs.disconnect = snd_disconnect_channel_client; + client_cbs.migrate = snd_record_migrate_channel_client; + red_channel_register_client_cbs(channel, &client_cbs); + red_channel_set_data(channel, record_worker); + + num_caps = 1; + caps = spice_new(uint32_t, 1); + caps[0] = (1 << SPICE_RECORD_CAP_CELT_0_5_1) | + (1 << SPICE_RECORD_CAP_VOLUME); + red_channel_set_caps(channel, num_caps, caps); + + record_worker->base_channel = channel; add_worker(record_worker); - reds_register_channel(&record_worker->base); + reds_register_channel(record_worker->base_channel); } static void snd_detach_common(SndWorker *worker) @@ -1437,9 +1475,8 @@ static void snd_detach_common(SndWorker *worker) } remove_worker(worker); snd_disconnect_channel(worker->connection); - reds_unregister_channel(&worker->base); - - reds_channel_dispose(&worker->base); + reds_unregister_channel(worker->base_channel); + red_channel_destroy(worker->base_channel); } static void spice_playback_state_free(SpicePlaybackState *st) @@ -1472,7 +1509,7 @@ void snd_set_playback_compression(int on) playback_compression = on ? SPICE_AUDIO_DATA_MODE_CELT_0_5_1 : SPICE_AUDIO_DATA_MODE_RAW; for (; now; now = now->next) { - if (now->base.type == SPICE_CHANNEL_PLAYBACK && now->connection) { + if (now->base_channel->type == SPICE_CHANNEL_PLAYBACK && now->connection) { SndChannel* sndchannel = now->connection; PlaybackChannel* playback = (PlaybackChannel*)now->connection; if (!check_cap(sndchannel->caps, sndchannel->num_caps, diff --git a/server/usbredir.c b/server/usbredir.c index 50a60956..8daab7d4 100644 --- a/server/usbredir.c +++ b/server/usbredir.c @@ -24,6 +24,7 @@ #include "server/char_device.h" #include "server/red_channel.h" +#include "server/reds.h" /* 64K should be enough for all but the largest bulk xfers + 32 bytes hdr */ #define BUF_SIZE (64 * 1024 + 32) @@ -36,8 +37,7 @@ typedef struct UsbRedirPipeItem { } UsbRedirPipeItem; typedef struct UsbRedirState { - Channel channel; - RedChannel *red_channel; + RedChannel *channel; RedChannelClient *rcc; SpiceCharDeviceState chardev_st; SpiceCharDeviceInstance *chardev_sin; @@ -60,14 +60,14 @@ static void usbredir_chardev_wakeup(SpiceCharDeviceInstance *sin) state = SPICE_CONTAINEROF(sin->st, UsbRedirState, chardev_st); sif = SPICE_CONTAINEROF(sin->base.sif, SpiceCharDeviceInterface, base); - if (!state->red_channel) { + if (!state->rcc) { return; } do { if (!state->pipe_item) { state->pipe_item = spice_malloc(sizeof(UsbRedirPipeItem)); - red_channel_pipe_item_init(state->red_channel, + red_channel_pipe_item_init(state->rcc->channel, &state->pipe_item->base, 0); } @@ -82,12 +82,12 @@ static void usbredir_chardev_wakeup(SpiceCharDeviceInstance *sin) } while (n > 0); } -static int usbredir_red_channel_config_socket(RedChannelClient *rcc) +static int usbredir_red_channel_client_config_socket(RedChannelClient *rcc) { return TRUE; } -static void usbredir_red_channel_disconnect(RedChannelClient *rcc) +static void usbredir_red_channel_client_on_disconnect(RedChannelClient *rcc) { UsbRedirState *state; SpiceCharDeviceInstance *sin; @@ -177,16 +177,18 @@ static void usbredir_red_channel_release_pipe_item(RedChannelClient *rcc, free(item); } -static void usbredir_link(Channel *channel, RedClient *client, RedsStream *stream, int migration, - int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps) +static void usbredir_connect(RedChannel *channel, RedClient *client, + RedsStream *stream, int migration, int num_common_caps, + uint32_t *common_caps, int num_caps, uint32_t *caps) { + RedChannelClient *rcc; UsbRedirState *state; UsbRedirChannel *redir_chan; SpiceCharDeviceInstance *sin; SpiceCharDeviceInterface *sif; - ChannelCbs channel_cbs; - state = SPICE_CONTAINEROF(channel, UsbRedirState, channel); + redir_chan = SPICE_CONTAINEROF(channel, UsbRedirChannel, base); + state = redir_chan->state; sin = state->chardev_sin; sif = SPICE_CONTAINEROF(sin->base.sif, SpiceCharDeviceInterface, base); @@ -199,86 +201,78 @@ static void usbredir_link(Channel *channel, RedClient *client, RedsStream *strea return; } - if (!state->red_channel) { - memset(&channel_cbs, sizeof(channel_cbs), 0); - channel_cbs.config_socket = usbredir_red_channel_config_socket; - channel_cbs.disconnect = usbredir_red_channel_disconnect; - channel_cbs.send_item = usbredir_red_channel_send_item; - channel_cbs.hold_item = usbredir_red_channel_hold_pipe_item; - channel_cbs.release_item = usbredir_red_channel_release_pipe_item; - channel_cbs.alloc_recv_buf = usbredir_red_channel_alloc_msg_rcv_buf; - channel_cbs.release_recv_buf = usbredir_red_channel_release_msg_rcv_buf; - state->red_channel = red_channel_create(sizeof(UsbRedirChannel), - core, migration, FALSE /* handle_acks */, - usbredir_red_channel_client_handle_message, - &channel_cbs); - } - if (!state->red_channel) { - return; - } - state->rcc = red_channel_client_create(sizeof(RedChannelClient), state->red_channel, - client, stream); - if (!state->rcc) { - red_printf("failed to create usbredir channel client\n"); + rcc = red_channel_client_create(sizeof(RedChannelClient), channel, client, stream); + if (!rcc) { return; } - red_channel_init_outgoing_messages_window(state->red_channel); - redir_chan = SPICE_CONTAINEROF(state->red_channel, UsbRedirChannel, base); - redir_chan->state = state; + state->rcc = rcc; + red_channel_client_ack_zero_messages_window(rcc); if (sif->state) { sif->state(sin, 1); } } -static void usbredir_shutdown(Channel *channel) -{ - UsbRedirState *state = SPICE_CONTAINEROF(channel, UsbRedirState, channel); - - usbredir_red_channel_disconnect(state->rcc); - red_channel_destroy(state->red_channel); - state->red_channel = NULL; -} - -static void usbredir_migrate(Channel *channel) +static void usbredir_migrate(RedChannelClient *rcc) { /* NOOP */ } int usbredir_device_connect(SpiceCharDeviceInstance *sin) { - UsbRedirState *state; static int id = 0; - + UsbRedirState *state; + UsbRedirChannel *redir_chan; + ChannelCbs channel_cbs = {0,}; + ClientCbs client_cbs = {0,}; + + channel_cbs.config_socket = usbredir_red_channel_client_config_socket; + channel_cbs.on_disconnect = usbredir_red_channel_client_on_disconnect; + channel_cbs.send_item = usbredir_red_channel_send_item; + channel_cbs.hold_item = usbredir_red_channel_hold_pipe_item; + channel_cbs.release_item = usbredir_red_channel_release_pipe_item; + channel_cbs.alloc_recv_buf = usbredir_red_channel_alloc_msg_rcv_buf; + channel_cbs.release_recv_buf = usbredir_red_channel_release_msg_rcv_buf; + + redir_chan = (UsbRedirChannel*)red_channel_create(sizeof(UsbRedirChannel), + core, SPICE_CHANNEL_USBREDIR, id++, + FALSE /* migration - TODO?*/, + FALSE /* handle_acks */, + usbredir_red_channel_client_handle_message, + &channel_cbs); + red_channel_init_outgoing_messages_window(&redir_chan->base); state = spice_new0(UsbRedirState, 1); - state->channel.type = SPICE_CHANNEL_USBREDIR; - state->channel.id = id++; - state->channel.link = usbredir_link; - state->channel.shutdown = usbredir_shutdown; - state->channel.migrate = usbredir_migrate; + state->channel = &redir_chan->base; state->chardev_st.wakeup = usbredir_chardev_wakeup; state->chardev_sin = sin; state->rcv_buf = spice_malloc(BUF_SIZE); state->rcv_buf_size = BUF_SIZE; + client_cbs.connect = usbredir_connect; + client_cbs.migrate = usbredir_migrate; + red_channel_register_client_cbs(&redir_chan->base, &client_cbs); + sin->st = &state->chardev_st; - reds_register_channel(&state->channel); + reds_register_channel(&redir_chan->base); return 0; } +/* Must be called from RedClient handling thread. */ void usbredir_device_disconnect(SpiceCharDeviceInstance *sin) { UsbRedirState *state; state = SPICE_CONTAINEROF(sin->st, UsbRedirState, chardev_st); - reds_unregister_channel(&state->channel); + reds_unregister_channel(state->channel); - usbredir_red_channel_disconnect(state->rcc); + red_channel_client_destroy(state->rcc); + red_channel_disconnect(state->channel); free(state->pipe_item); free(state->rcv_buf); + free(state->channel); free(state); } |