summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYonit Halperin <yhalperi@redhat.com>2011-08-03 18:36:04 +0300
committerAlon Levy <alevy@redhat.com>2011-08-23 18:27:46 +0300
commitf84dfeb0aac4b6b49bc447a2140cbc7c2882de51 (patch)
tree67797cd53349553b101bf1f9c5ea4057b123bacb
parent1db936e64cfe955a757d9d77302f104f68a58bfd (diff)
downloadspice-f84dfeb0aac4b6b49bc447a2140cbc7c2882de51.tar.gz
spice-f84dfeb0aac4b6b49bc447a2140cbc7c2882de51.tar.xz
spice-f84dfeb0aac4b6b49bc447a2140cbc7c2882de51.zip
server: registering RedChannel in reds, instead of Channel
Merging the functionality of reds::channel, into RedChannel. In addition, cleanup and fix disconnection code: before this patch, red_dispatcher_disconnect_display_client could have been called from the red_worker thread (and it must be called only from the io thread). RedChannel holds only connected channel clients. RedClient holds all the channel clients that were created till it is destroyed (and then it destroys them as well). Note: snd_channel still doesn't use red_channel, however it creates dummy channel and channel clients, in order to register itself in reds. server/red_channel.c: a channel is connected if it holds at least one channel client Previously I changed RedChannel to hold only connected channel clients and RedClient, to hold all the channel clients as long as it is not destroyed. usbredir: multichannel has not been tested, it just compiles.
-rw-r--r--server/inputs_channel.c115
-rw-r--r--server/main_channel.c107
-rw-r--r--server/main_channel.h4
-rw-r--r--server/red_channel.c278
-rw-r--r--server/red_channel.h70
-rw-r--r--server/red_dispatcher.c163
-rw-r--r--server/red_dispatcher.h5
-rw-r--r--server/red_tunnel_worker.c211
-rw-r--r--server/red_worker.c179
-rw-r--r--server/red_worker.h5
-rw-r--r--server/reds.c156
-rw-r--r--server/reds.h27
-rw-r--r--server/smartcard.c89
-rw-r--r--server/snd_worker.c157
-rw-r--r--server/usbredir.c102
15 files changed, 856 insertions, 812 deletions
diff --git a/server/inputs_channel.c b/server/inputs_channel.c
index e6af0d58..653910e8 100644
--- a/server/inputs_channel.c
+++ b/server/inputs_channel.c
@@ -434,37 +434,18 @@ static void inputs_relase_keys(void)
kbd_push_scan(keyboard, 0x38 | 0x80); //LALT
}
-static void inputs_channel_disconnect(RedChannelClient *rcc)
+static void inputs_channel_on_disconnect(RedChannelClient *rcc)
{
- inputs_relase_keys();
- red_channel_client_disconnect(rcc);
-}
-
-static void inputs_channel_on_error(RedChannelClient *rcc)
-{
- red_printf("");
- inputs_channel_disconnect(rcc);
-}
-
-static void inputs_shutdown(Channel *channel)
-{
- InputsChannel *inputs_channel = (InputsChannel *)channel->data;
- ASSERT(g_inputs_channel == inputs_channel);
-
- if (inputs_channel) {
- red_channel_shutdown(&inputs_channel->base);
- red_channel_destroy(&inputs_channel->base);
- channel->data = NULL;
- g_inputs_channel = NULL;
+ if (!rcc) {
+ return;
}
+ inputs_relase_keys();
}
-static void inputs_migrate(Channel *channel)
+static void inputs_migrate(RedChannelClient *rcc)
{
- InputsChannel *inputs_channel = channel->data;
-
- ASSERT(g_inputs_channel == (InputsChannel *)channel->data);
- red_channel_pipes_add_type(&inputs_channel->base, PIPE_ITEM_MIGRATE);
+ ASSERT(g_inputs_channel == (InputsChannel *)rcc->channel);
+ red_channel_client_pipe_add_type(rcc, PIPE_ITEM_MIGRATE);
}
static void inputs_pipe_add_init(RedChannelClient *rcc)
@@ -501,40 +482,21 @@ static void inputs_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
{
}
-static void inputs_link(Channel *channel, RedClient *client,
- RedsStream *stream, int migration,
- int num_common_caps, uint32_t *common_caps,
- int num_caps, uint32_t *caps)
+static void inputs_connect(RedChannel *channel, RedClient *client,
+ RedsStream *stream, int migration,
+ int num_common_caps, uint32_t *common_caps,
+ int num_caps, uint32_t *caps)
{
InputsChannelClient *icc;
- ASSERT(channel->data == g_inputs_channel);
- if (channel->data == NULL) {
- ChannelCbs channel_cbs;
-
- memset(&channel_cbs, sizeof(channel_cbs), 0);
-
- channel_cbs.config_socket = inputs_channel_config_socket;
- channel_cbs.disconnect = inputs_channel_disconnect;
- channel_cbs.send_item = inputs_channel_send_item;
- channel_cbs.hold_item = inputs_channel_hold_pipe_item;
- channel_cbs.release_item = inputs_channel_release_pipe_item;
- channel_cbs.alloc_recv_buf = inputs_channel_alloc_msg_rcv_buf;
- channel_cbs.release_recv_buf = inputs_channel_release_msg_rcv_buf;
-
- red_printf("inputs channel create");
- channel->data = g_inputs_channel = (InputsChannel*)red_channel_create_parser(
- sizeof(InputsChannel), core, migration, FALSE /* handle_acks */
- ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL)
- ,inputs_channel_handle_parsed
- ,inputs_channel_on_error
- ,inputs_channel_on_error
- ,&channel_cbs);
- ASSERT(channel->data);
- }
+ ASSERT(g_inputs_channel);
+ ASSERT(channel == &g_inputs_channel->base);
+
red_printf("inputs channel client create");
icc = (InputsChannelClient*)red_channel_client_create(sizeof(InputsChannelClient),
- channel->data, client, stream);
+ channel,
+ client,
+ stream);
icc->motion_count = 0;
inputs_pipe_add_init(&icc->base);
}
@@ -560,14 +522,41 @@ static void key_modifiers_sender(void *opaque)
void inputs_init(void)
{
- Channel *channel;
-
- channel = spice_new0(Channel, 1);
- channel->type = SPICE_CHANNEL_INPUTS;
- channel->link = inputs_link;
- channel->shutdown = inputs_shutdown;
- channel->migrate = inputs_migrate;
- reds_register_channel(channel);
+ ChannelCbs channel_cbs;
+ ClientCbs client_cbs = {0,};
+
+ ASSERT(!g_inputs_channel);
+
+ memset(&channel_cbs, sizeof(channel_cbs), 0);
+ memset(&client_cbs, sizeof(client_cbs), 0);
+
+ channel_cbs.config_socket = inputs_channel_config_socket;
+ channel_cbs.on_disconnect = inputs_channel_on_disconnect;
+ channel_cbs.send_item = inputs_channel_send_item;
+ channel_cbs.hold_item = inputs_channel_hold_pipe_item;
+ channel_cbs.release_item = inputs_channel_release_pipe_item;
+ channel_cbs.alloc_recv_buf = inputs_channel_alloc_msg_rcv_buf;
+ channel_cbs.release_recv_buf = inputs_channel_release_msg_rcv_buf;
+
+ g_inputs_channel = (InputsChannel *)red_channel_create_parser(
+ sizeof(InputsChannel),
+ core,
+ SPICE_CHANNEL_INPUTS, 0,
+ FALSE, // TODO: set migration?
+ FALSE, /* handle_acks */
+ spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL),
+ inputs_channel_handle_parsed,
+ &channel_cbs);
+
+ if (!g_inputs_channel) {
+ red_error("failed to allocate Inputs Channel");
+ }
+
+ client_cbs.connect = inputs_connect;
+ client_cbs.migrate = inputs_migrate;
+ red_channel_register_client_cbs(&g_inputs_channel->base, &client_cbs);
+
+ reds_register_channel(&g_inputs_channel->base);
if (!(key_modifiers_timer = core->timer_add(key_modifiers_sender, NULL))) {
red_error("key modifiers timer create failed");
diff --git a/server/main_channel.c b/server/main_channel.c
index 54b3dff4..6d77f711 100644
--- a/server/main_channel.c
+++ b/server/main_channel.c
@@ -155,14 +155,13 @@ enum NetTestStage {
NET_TEST_STAGE_RATE,
};
-static void main_channel_client_disconnect(RedChannelClient *rcc)
+// when disconnection occurs, let reds shutdown all channels. This will trigger the
+// real disconnection of main channel
+static void main_channel_client_on_disconnect(RedChannelClient *rcc)
{
- red_channel_client_disconnect(rcc);
-}
-
-static void main_disconnect(MainChannel *main_chan)
-{
- red_channel_destroy(&main_chan->base);
+ red_printf("rcc=%p", rcc);
+ reds_client_disconnect(rcc->client);
+// red_channel_client_disconnect(rcc);
}
RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t connection_id)
@@ -840,12 +839,6 @@ static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint
return TRUE;
}
-static void main_channel_on_error(RedChannelClient *rcc)
-{
- red_printf("");
- reds_client_disconnect(rcc->client);
-}
-
static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header)
{
MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
@@ -912,11 +905,12 @@ uint32_t main_channel_client_get_link_id(MainChannelClient *mcc)
return mcc->connection_id;
}
-MainChannelClient *main_channel_client_create(MainChannel *main_chan,
- RedClient *client, RedsStream *stream, uint32_t connection_id)
+MainChannelClient *main_channel_client_create(MainChannel *main_chan, RedClient *client,
+ RedsStream *stream, uint32_t connection_id)
{
- MainChannelClient *mcc = (MainChannelClient*)red_channel_client_create(
- sizeof(MainChannelClient), &main_chan->base, client, stream);
+ MainChannelClient *mcc = (MainChannelClient*)red_channel_client_create(sizeof(MainChannelClient),
+ &main_chan->base,
+ client, stream);
mcc->connection_id = connection_id;
mcc->bitrate_per_sec = ~0;
@@ -929,42 +923,20 @@ MainChannelClient *main_channel_client_create(MainChannel *main_chan,
return mcc;
}
-MainChannelClient *main_channel_link(Channel *channel, RedClient *client,
- RedsStream *stream, uint32_t connection_id, int migration,
- int num_common_caps, uint32_t *common_caps, int num_caps,
- uint32_t *caps)
+MainChannelClient *main_channel_link(MainChannel *channel, RedClient *client,
+ RedsStream *stream, uint32_t connection_id, int migration,
+ int num_common_caps, uint32_t *common_caps, int num_caps,
+ uint32_t *caps)
{
MainChannelClient *mcc;
- if (channel->data == NULL) {
- ChannelCbs channel_cbs;
-
- channel_cbs.config_socket = main_channel_config_socket;
- channel_cbs.disconnect = main_channel_client_disconnect;
- channel_cbs.send_item = main_channel_send_item;
- channel_cbs.hold_item = main_channel_hold_pipe_item;
- channel_cbs.release_item = main_channel_release_pipe_item;
- channel_cbs.alloc_recv_buf = main_channel_alloc_msg_rcv_buf;
- channel_cbs.release_recv_buf = main_channel_release_msg_rcv_buf;
- channel_cbs.handle_migrate_flush_mark = main_channel_handle_migrate_flush_mark;
- channel_cbs.handle_migrate_data = main_channel_handle_migrate_data;
- channel_cbs.handle_migrate_data_get_serial = main_channel_handle_migrate_data_get_serial;
-
- red_printf("create main channel");
- channel->data = red_channel_create_parser(
- sizeof(MainChannel), core, migration, FALSE /* handle_acks */
- ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
- ,main_channel_handle_parsed
- ,main_channel_on_error
- ,main_channel_on_error
- ,&channel_cbs);
- ASSERT(channel->data);
- }
+ ASSERT(channel);
+
// TODO - migration - I removed it from channel creation, now put it
// into usage somewhere (not an issue until we return migration to it's
// former glory)
red_printf("add main channel client");
- mcc = main_channel_client_create(channel->data, client, stream, connection_id);
+ mcc = main_channel_client_create(channel, client, stream, connection_id);
return mcc;
}
@@ -978,6 +950,7 @@ int main_channel_getpeername(MainChannel *main_chan, struct sockaddr *sa, sockle
return main_chan ? getpeername(red_channel_get_first_socket(&main_chan->base), sa, salen) : -1;
}
+// TODO: ? shouldn't it disonnect all clients? or shutdown all main_channels?
void main_channel_close(MainChannel *main_chan)
{
int socketfd;
@@ -998,27 +971,31 @@ uint64_t main_channel_client_get_bitrate_per_sec(MainChannelClient *mcc)
return mcc->bitrate_per_sec;
}
-static void main_channel_shutdown(Channel *channel)
+MainChannel* main_channel_init(void)
{
- MainChannel *main_chan = channel->data;
+ RedChannel *channel;
+ ChannelCbs channel_cbs;
- if (main_chan != NULL) {
- main_disconnect(main_chan);
- }
-}
-static void main_channel_migrate()
-{
-}
+ channel_cbs.config_socket = main_channel_config_socket;
+ channel_cbs.on_disconnect = main_channel_client_on_disconnect;
+ channel_cbs.send_item = main_channel_send_item;
+ channel_cbs.hold_item = main_channel_hold_pipe_item;
+ channel_cbs.release_item = main_channel_release_pipe_item;
+ channel_cbs.alloc_recv_buf = main_channel_alloc_msg_rcv_buf;
+ channel_cbs.release_recv_buf = main_channel_release_msg_rcv_buf;
+ channel_cbs.handle_migrate_flush_mark = main_channel_handle_migrate_flush_mark;
+ channel_cbs.handle_migrate_data = main_channel_handle_migrate_data;
+ channel_cbs.handle_migrate_data_get_serial = main_channel_handle_migrate_data_get_serial;
-Channel* main_channel_init(void)
-{
- Channel *channel;
-
- channel = spice_new0(Channel, 1);
- channel->type = SPICE_CHANNEL_MAIN;
- channel->link = NULL; /* the main channel client is created by reds.c explicitly */
- channel->shutdown = main_channel_shutdown;
- channel->migrate = main_channel_migrate;
- return channel;
+ // TODO: set the migration flag of the channel
+ channel = red_channel_create_parser(sizeof(MainChannel), core,
+ SPICE_CHANNEL_MAIN, 0,
+ FALSE, FALSE, /* handle_acks */
+ spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL),
+ main_channel_handle_parsed,
+ &channel_cbs);
+ ASSERT(channel);
+
+ return (MainChannel *)channel;
}
diff --git a/server/main_channel.h b/server/main_channel.h
index 0dd37973..80475354 100644
--- a/server/main_channel.h
+++ b/server/main_channel.h
@@ -47,10 +47,10 @@ struct MainMigrateData {
typedef struct MainChannel MainChannel;
-Channel *main_channel_init(void);
+MainChannel *main_channel_init(void);
RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t link_id);
/* This is a 'clone' from the reds.h Channel.link callback to allow passing link_id */
-MainChannelClient *main_channel_link(struct Channel *, RedClient *client,
+MainChannelClient *main_channel_link(MainChannel *, RedClient *client,
RedsStream *stream, uint32_t link_id, int migration, int num_common_caps,
uint32_t *common_caps, int num_caps, uint32_t *caps);
void main_channel_close(MainChannel *main_chan); // not destroy, just socket close
diff --git a/server/red_channel.c b/server/red_channel.c
index 80aa667c..ae333aa7 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -32,10 +32,12 @@
#include "ring.h"
#include "stat.h"
#include "red_channel.h"
+#include "reds.h"
#include "generated_marshallers.h"
static void red_channel_client_event(int fd, int event, void *data);
static void red_client_add_channel(RedClient *client, RedChannelClient *rcc);
+static void red_client_remove_channel(RedChannelClient *rcc);
/* return the number of bytes read. -1 in case of error */
static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
@@ -138,10 +140,6 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
ret_handle = handler->cb->handle_message(handler->opaque, &handler->header,
handler->msg);
}
- if (handler->shut) {
- handler->cb->on_error(handler->opaque);
- return;
- }
handler->msg_pos = 0;
handler->msg = NULL;
handler->header_pos = 0;
@@ -221,22 +219,9 @@ static void red_channel_client_on_output(void *opaque, int n)
stat_inc_counter(rcc->channel->out_bytes_counter, n);
}
-void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
-{
- rcc->channel->channel_cbs.disconnect(rcc);
-}
-
-static void red_channel_peer_on_incoming_error(RedChannelClient *rcc)
-{
- if (rcc->stream->shutdown) {
- return; // assume error has already been handled which caused the shutdown.
- }
- rcc->channel->on_incoming_error(rcc);
-}
-
-static void red_channel_peer_on_outgoing_error(RedChannelClient *rcc)
+static void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
{
- rcc->channel->on_outgoing_error(rcc);
+ red_channel_client_disconnect(rcc);
}
static int red_channel_client_peer_get_out_msg_size(void *opaque)
@@ -417,6 +402,21 @@ error:
return NULL;
}
+
+RedChannelClient *red_channel_client_create_dummy(int size,
+ RedChannel *channel,
+ RedClient *client)
+{
+ RedChannelClient *rcc;
+
+ ASSERT(size >= sizeof(RedChannelClient));
+ rcc = spice_malloc0(size);
+ rcc->client = client;
+ rcc->channel = channel;
+ red_channel_add_client(channel, rcc);
+ return rcc;
+}
+
static void red_channel_client_default_connect(RedChannel *channel, RedClient *client,
RedsStream *stream,
int migration,
@@ -437,6 +437,7 @@ static void red_channel_client_default_migrate(RedChannelClient *base)
RedChannel *red_channel_create(int size,
SpiceCoreInterface *core,
+ uint32_t type, uint32_t id,
int migrate, int handle_acks,
channel_handle_message_proc handle_message,
ChannelCbs *channel_cbs)
@@ -445,11 +446,13 @@ RedChannel *red_channel_create(int size,
ClientCbs client_cbs;
ASSERT(size >= sizeof(*channel));
- ASSERT(channel_cbs->config_socket && channel_cbs->disconnect && handle_message &&
+ ASSERT(channel_cbs->config_socket && channel_cbs->on_disconnect && handle_message &&
channel_cbs->alloc_recv_buf && channel_cbs->release_item);
channel = spice_malloc0(size);
+ channel->type = type;
+ channel->id = id;
channel->handle_acks = handle_acks;
- channel->channel_cbs.disconnect = channel_cbs->disconnect;
+ channel->channel_cbs.on_disconnect = channel_cbs->on_disconnect;
channel->channel_cbs.send_item = channel_cbs->send_item;
channel->channel_cbs.release_item = channel_cbs->release_item;
channel->channel_cbs.hold_item = channel_cbs->hold_item;
@@ -484,8 +487,53 @@ RedChannel *red_channel_create(int size,
channel->thread_id = pthread_self();
- channel->shut = 0; // came here from inputs, perhaps can be removed? XXX
channel->out_bytes_counter = 0;
+
+ return channel;
+}
+
+// TODO: red_worker can use this one
+static void dummy_watch_update_mask(SpiceWatch *watch, int event_mask)
+{
+}
+
+static SpiceWatch *dummy_watch_add(int fd, int event_mask, SpiceWatchFunc func, void *opaque)
+{
+ return NULL; // apparently allowed?
+}
+
+static void dummy_watch_remove(SpiceWatch *watch)
+{
+}
+
+// TODO: actually, since I also use channel_client_dummym, no need for core. Can be NULL
+SpiceCoreInterface dummy_core = {
+ .watch_update_mask = dummy_watch_update_mask,
+ .watch_add = dummy_watch_add,
+ .watch_remove = dummy_watch_remove,
+};
+
+RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id)
+{
+ RedChannel *channel;
+ ClientCbs client_cbs;
+
+ ASSERT(size >= sizeof(*channel));
+ channel = spice_malloc0(size);
+ channel->type = type;
+ channel->id = id;
+ channel->core = &dummy_core;
+ ring_init(&channel->clients);
+ client_cbs.connect = red_channel_client_default_connect;
+ client_cbs.disconnect = red_channel_client_default_disconnect;
+ client_cbs.migrate = red_channel_client_default_migrate;
+
+ red_channel_register_client_cbs(channel, &client_cbs);
+
+ channel->thread_id = pthread_self();
+
+ channel->out_bytes_counter = 0;
+
return channel;
}
@@ -496,26 +544,22 @@ static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *hea
RedChannel *red_channel_create_parser(int size,
SpiceCoreInterface *core,
+ uint32_t type, uint32_t id,
int migrate, int handle_acks,
spice_parse_channel_func_t parser,
- channel_handle_parsed_proc handle_parsed,
- channel_on_incoming_error_proc incoming_error,
- channel_on_outgoing_error_proc outgoing_error,
+ channel_handle_parsed_proc handle_parsed,
ChannelCbs *channel_cbs)
{
- RedChannel *channel = red_channel_create(size,
- core, migrate, handle_acks, do_nothing_handle_message,
- channel_cbs);
+ RedChannel *channel = red_channel_create(size, core, type, id,
+ migrate, handle_acks,
+ do_nothing_handle_message,
+ channel_cbs);
if (channel == NULL) {
return NULL;
}
channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
channel->incoming_cb.parser = parser;
- channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
- channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
- channel->on_incoming_error = incoming_error;
- channel->on_outgoing_error = outgoing_error;
return channel;
}
@@ -533,10 +577,27 @@ void red_channel_register_client_cbs(RedChannel *channel, ClientCbs *client_cbs)
}
}
+void red_channel_set_caps(RedChannel *channel, int num_caps, uint32_t *caps)
+{
+ channel->num_caps = num_caps;
+ channel->caps = caps;
+}
+
+void red_channel_set_data(RedChannel *channel, void *data)
+{
+ ASSERT(channel);
+ channel->data = data;
+}
+
void red_channel_client_destroy(RedChannelClient *rcc)
{
- red_channel_client_disconnect(rcc);
- spice_marshaller_destroy(rcc->send_data.marshaller);
+ if (red_channel_client_is_connected(rcc)) {
+ red_channel_client_disconnect(rcc);
+ }
+ red_client_remove_channel(rcc);
+ if (rcc->send_data.marshaller) {
+ spice_marshaller_destroy(rcc->send_data.marshaller);
+ }
free(rcc);
}
@@ -548,11 +609,13 @@ void red_channel_destroy(RedChannel *channel)
if (!channel) {
return;
}
- red_channel_pipes_clear(channel);
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_destroy(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
+ if (channel->caps) {
+ free(channel->caps);
+ }
free(channel);
}
@@ -563,21 +626,7 @@ void red_channel_client_shutdown(RedChannelClient *rcc)
rcc->stream->watch = NULL;
shutdown(rcc->stream->socket, SHUT_RDWR);
rcc->stream->shutdown = TRUE;
- rcc->incoming.shut = TRUE;
}
- red_channel_client_release_sent_item(rcc);
-}
-
-void red_channel_shutdown(RedChannel *channel)
-{
- RingItem *link;
- RingItem *next;
-
- red_printf("%d", channel->clients_num);
- RING_FOREACH_SAFE(link, next, &channel->clients) {
- red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
- }
- red_channel_pipes_clear(channel);
}
void red_channel_client_send(RedChannelClient *rcc)
@@ -650,11 +699,7 @@ void red_channel_push(RedChannel *channel)
}
RING_FOREACH_SAFE(link, next, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- if (rcc->stream == NULL) {
- rcc->channel->channel_cbs.disconnect(rcc);
- } else {
- red_channel_client_push(rcc);
- }
+ red_channel_client_push(rcc);
}
}
@@ -861,25 +906,9 @@ int red_channel_client_is_connected(RedChannelClient *rcc)
return rcc->stream != NULL;
}
-void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item)
-{
- ring_remove(&item->link);
-}
-
int red_channel_is_connected(RedChannel *channel)
{
- RingItem *link;
-
- if (!channel || channel->clients_num == 0) {
- return FALSE;
- }
- RING_FOREACH(link, &channel->clients) {
- if (red_channel_client_is_connected(
- SPICE_CONTAINEROF(link, RedChannelClient, channel_link))) {
- return TRUE;
- }
- }
- return FALSE;
+ return channel && (channel->clients_num > 0);
}
void red_channel_client_clear_sent_item(RedChannelClient *rcc)
@@ -906,21 +935,6 @@ void red_channel_client_pipe_clear(RedChannelClient *rcc)
rcc->pipe_size = 0;
}
-void red_channel_pipes_clear(RedChannel *channel)
-{
- RingItem *link;
- RingItem *next;
- RedChannelClient *rcc;
-
- if (!channel) {
- return;
- }
- RING_FOREACH_SAFE(link, next, &channel->clients) {
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- red_channel_client_pipe_clear(rcc);
- }
-}
-
void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
{
rcc->ack_data.messages_window = 0;
@@ -931,27 +945,37 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_
rcc->ack_data.client_window = client_window;
}
-static void red_channel_client_remove(RedChannelClient *rcc)
+
+static void red_channel_remove_client(RedChannelClient *rcc)
{
- ring_remove(&rcc->client_link);
- rcc->client->channels_num--;
+ ASSERT(pthread_equal(pthread_self(), rcc->channel->thread_id));
ring_remove(&rcc->channel_link);
rcc->channel->clients_num--;
+ // TODO: should we set rcc->channel to NULL???
}
-void red_channel_client_disconnect(RedChannelClient *rcc)
+static void red_client_remove_channel(RedChannelClient *rcc)
{
- red_printf("%p (channel %p)", rcc, rcc->channel);
+ pthread_mutex_lock(&rcc->client->lock);
+ ring_remove(&rcc->client_link);
+ rcc->client->channels_num--;
+ pthread_mutex_unlock(&rcc->client->lock);
+}
- if (rcc->send_data.item) {
- rcc->channel->channel_cbs.release_item(rcc, rcc->send_data.item, FALSE);
+void red_channel_client_disconnect(RedChannelClient *rcc)
+{
+ red_printf("%p (channel %p type %d id %d)", rcc, rcc->channel,
+ rcc->channel->type, rcc->channel->id);
+ if (!red_channel_client_is_connected(rcc)) {
+ return;
}
red_channel_client_pipe_clear(rcc);
reds_stream_free(rcc->stream);
- rcc->send_data.item = NULL;
- rcc->send_data.blocked = FALSE;
- rcc->send_data.size = 0;
- red_channel_client_remove(rcc);
+ rcc->stream = NULL;
+ red_channel_remove_client(rcc);
+ // TODO: not do it till destroyed?
+// red_channel_client_remove(rcc);
+ rcc->channel->channel_cbs.on_disconnect(rcc);
}
void red_channel_disconnect(RedChannel *channel)
@@ -959,27 +983,12 @@ void red_channel_disconnect(RedChannel *channel)
RingItem *link;
RingItem *next;
- red_channel_pipes_clear(channel);
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_disconnect(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
}
-int red_channel_all_clients_serials_are_zero(RedChannel *channel)
-{
- RingItem *link;
- RedChannelClient *rcc;
-
- RING_FOREACH(link, &channel->clients) {
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- if (rcc->send_data.serial != 0) {
- return FALSE;
- }
- }
- return TRUE;
-}
-
void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb)
{
RingItem *link;
@@ -1004,17 +1013,6 @@ void red_channel_apply_clients_data(RedChannel *channel, channel_client_callback
}
}
-void red_channel_set_shut(RedChannel *channel)
-{
- RingItem *link;
- RedChannelClient *rcc;
-
- RING_FOREACH(link, &channel->clients) {
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- rcc->incoming.shut = TRUE;
- }
-}
-
int red_channel_all_blocked(RedChannel *channel)
{
RingItem *link;
@@ -1143,18 +1141,24 @@ RedClient *red_client_new()
client = spice_malloc0(sizeof(RedClient));
ring_init(&client->channels);
+ pthread_mutex_init(&client->lock, NULL);
client->thread_id = pthread_self();
return client;
}
-void red_client_shutdown(RedClient *client)
+void red_client_migrate(RedClient *client)
{
RingItem *link, *next;
+ RedChannelClient *rcc;
- red_printf("#channels %d", client->channels_num);
+ red_printf("migrate client with #channels %d", client->channels_num);
+ ASSERT(pthread_equal(pthread_self(), client->thread_id));
RING_FOREACH_SAFE(link, next, &client->channels) {
- red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, client_link));
+ rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link);
+ if (red_channel_client_is_connected(rcc)) {
+ rcc->channel->client_cbs.migrate(rcc);
+ }
}
}
@@ -1175,29 +1179,23 @@ void red_client_destroy(RedClient *client)
// TODO: should we go back to async. For this we need to use
// ref count for channel clients.
rcc->channel->client_cbs.disconnect(rcc);
+ ASSERT(ring_is_empty(&rcc->pipe));
+ ASSERT(rcc->pipe_size == 0);
+ ASSERT(rcc->send_data.size == 0);
+ red_channel_client_destroy(rcc);
}
- free(client);
-}
-void red_client_disconnect(RedClient *client)
-{
- RingItem *link, *next;
- RedChannelClient *rcc;
-
- red_printf("#channels %d", client->channels_num);
- RING_FOREACH_SAFE(link, next, &client->channels) {
- // some channels may be in other threads, so disconnection
- // is not synchronous.
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link);
- rcc->channel->client_cbs.disconnect(rcc);
- }
+ pthread_mutex_destroy(&client->lock);
+ free(client);
}
static void red_client_add_channel(RedClient *client, RedChannelClient *rcc)
{
ASSERT(rcc && client);
+ pthread_mutex_lock(&client->lock);
ring_add(&client->channels, &rcc->client_link);
client->channels_num++;
+ pthread_mutex_unlock(&client->lock);
}
MainChannelClient *red_client_get_main(RedClient *client) {
diff --git a/server/red_channel.h b/server/red_channel.h
index cb33fcb1..daee8bae 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -24,9 +24,9 @@
#include "red_common.h"
#include <pthread.h>
-#include "reds.h"
#include "spice.h"
#include "ring.h"
+#include "common/marshaller.h"
#include "server/demarshallers.h"
#define MAX_SEND_BUFS 1000
@@ -62,7 +62,6 @@ typedef struct IncomingHandler {
uint32_t header_pos;
uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
uint32_t msg_pos;
- int shut; // came here from inputs_channel. Not sure if it is really required or can be removed. XXX
} IncomingHandler;
typedef int (*get_outgoing_msg_size_proc)(void *opaque);
@@ -98,8 +97,11 @@ typedef struct BufDescriptor {
uint8_t *data;
} BufDescriptor;
+typedef struct RedsStream RedsStream;
typedef struct RedChannel RedChannel;
typedef struct RedChannelClient RedChannelClient;
+typedef struct RedClient RedClient;
+typedef struct MainChannelClient MainChannelClient;
/* Messages handled by red_channel
* SET_ACK - sent to client on channel connection
@@ -154,7 +156,7 @@ typedef void (*channel_client_migrate_proc)(RedChannelClient *base);
*/
typedef struct {
channel_configure_socket_proc config_socket;
- channel_disconnect_proc disconnect;
+ channel_disconnect_proc on_disconnect;
channel_send_pipe_item_proc send_item;
channel_hold_pipe_item_proc hold_item;
channel_release_pipe_item_proc release_item;
@@ -207,10 +209,20 @@ struct RedChannelClient {
};
struct RedChannel {
+ uint32_t type;
+ uint32_t id;
+
SpiceCoreInterface *core;
int migrate;
int handle_acks;
+ // RedChannel will hold only connected channel clients (logic - when pushing pipe item to all channel clients, there
+ // is no need to go over disconnect clients)
+ // . While client will hold the channel clients till it is destroyed
+ // and then it will destroy them as well.
+ // However RCC still holds a reference to the Channel.
+ // Maybe replace these logic with ref count?
+ // TODO: rename to 'connected_clients'?
Ring clients;
uint32_t clients_num;
@@ -220,11 +232,10 @@ struct RedChannel {
ChannelCbs channel_cbs;
ClientCbs client_cbs;
- /* Stuff below added for Main and Inputs channels switch to RedChannel
- * (might be removed later) */
- channel_on_incoming_error_proc on_incoming_error; /* alternative to disconnect */
- channel_on_outgoing_error_proc on_outgoing_error;
- int shut; /* signal channel is to be closed */
+ int num_caps;
+ uint32_t *caps;
+
+ void *data;
// TODO: when different channel_clients are in different threads from Channel -> need to protect!
pthread_t thread_id;
@@ -237,6 +248,7 @@ struct RedChannel {
* explicitly destroy the channel */
RedChannel *red_channel_create(int size,
SpiceCoreInterface *core,
+ uint32_t type, uint32_t id,
int migrate, int handle_acks,
channel_handle_message_proc handle_message,
ChannelCbs *channel_cbs);
@@ -245,21 +257,39 @@ RedChannel *red_channel_create(int size,
* will become default eventually */
RedChannel *red_channel_create_parser(int size,
SpiceCoreInterface *core,
+ uint32_t type, uint32_t id,
int migrate, int handle_acks,
spice_parse_channel_func_t parser,
channel_handle_parsed_proc handle_parsed,
- channel_on_incoming_error_proc incoming_error,
- channel_on_outgoing_error_proc outgoing_error,
ChannelCbs *channel_cbs);
void red_channel_register_client_cbs(RedChannel *channel, ClientCbs *client_cbs);
+// caps are freed when the channel is destroyed
+void red_channel_set_caps(RedChannel *channel, int num_caps, uint32_t *caps);
+void red_channel_set_data(RedChannel *channel, void *data);
RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client,
RedsStream *stream);
+// TODO: tmp, for channels that don't use RedChannel yet (e.g., snd channel), but
+// do use the client callbacks. So the channel clients are not connected (the channel doesn't
+// have list of them, but they do have a link to the channel, and the client has a list of them)
+RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id);
+RedChannelClient *red_channel_client_create_dummy(int size,
+ RedChannel *channel,
+ RedClient *client);
+
+
int red_channel_is_connected(RedChannel *channel);
int red_channel_client_is_connected(RedChannelClient *rcc);
+/*
+ * the disconnect callback is called from the channel's thread,
+ * i.e., for display channels - red worker thread, for all the other - from the main thread.
+ * RedClient is managed from the main thread. red_channel_client_destroy can be called only
+ * from red_client_destroy.
+ */
+
void red_channel_client_destroy(RedChannelClient *rcc);
void red_channel_destroy(RedChannel *channel);
@@ -267,7 +297,6 @@ void red_channel_destroy(RedChannel *channel);
* thread. It will not touch the rings, just shutdown the socket.
* It should be followed by some way to gurantee a disconnection. */
void red_channel_client_shutdown(RedChannelClient *rcc);
-void red_channel_shutdown(RedChannel *channel);
/* should be called when a new channel is ready to send messages */
void red_channel_init_outgoing_messages_window(RedChannel *channel);
@@ -276,9 +305,6 @@ void red_channel_init_outgoing_messages_window(RedChannel *channel);
int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
uint16_t type, void *message);
-/* default error handler that disconnects channel */
-void red_channel_client_default_peer_on_error(RedChannelClient *rcc);
-
/* when preparing send_data: should call init and then use marshaller */
void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, PipeItem *item);
@@ -303,7 +329,6 @@ void red_channel_client_pipe_add_push(RedChannelClient *rcc, PipeItem *item);
void red_channel_client_pipe_add(RedChannelClient *rcc, PipeItem *item);
void red_channel_client_pipe_add_after(RedChannelClient *rcc, PipeItem *item, PipeItem *pos);
int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, PipeItem *item);
-void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item);
void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, PipeItem *item);
void red_channel_client_pipe_add_tail(RedChannelClient *rcc, PipeItem *item);
/* for types that use this routine -> the pipe item should be freed */
@@ -315,9 +340,6 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_
void red_channel_client_push_set_ack(RedChannelClient *rcc);
void red_channel_push_set_ack(RedChannel *channel);
-/* TODO: This sets all clients to shut state - probably we want to close per channel */
-void red_channel_shutdown(RedChannel *channel);
-
int red_channel_get_first_socket(RedChannel *channel);
/* return TRUE if all of the connected clients to this channel are blocked */
@@ -354,7 +376,6 @@ void red_channel_client_push(RedChannelClient *rcc);
// TODO: again - what is the context exactly? this happens in channel disconnect. but our
// current red_channel_shutdown also closes the socket - is there a socket to close?
// are we reading from an fd here? arghh
-void red_channel_pipes_clear(RedChannel *channel);
void red_channel_client_pipe_clear(RedChannelClient *rcc);
// Again, used in various places outside of event handler context (or in other event handler
// contexts):
@@ -403,17 +424,22 @@ struct RedClient {
RingItem link;
Ring channels;
int channels_num;
- int disconnecting;
MainChannelClient *mcc;
+ pthread_mutex_t lock; // different channels can be in different threads
pthread_t thread_id;
+
+ int disconnecting;
};
RedClient *red_client_new();
MainChannelClient *red_client_get_main(RedClient *client);
+// main should be set once before all the other channels are created
void red_client_set_main(RedClient *client, MainChannelClient *mcc);
+
+
+void red_client_migrate(RedClient *client);
+// disconnects all the client's channels (should be called from the client's thread)
void red_client_destroy(RedClient *client);
-void red_client_disconnect(RedClient *client);
-void red_client_remove_channel(RedClient *client, RedChannelClient *rcc);
#endif
diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 8cbdec9d..c00dc580 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -76,10 +76,10 @@ extern spice_wan_compression_t zlib_glz_state;
static RedDispatcher *dispatchers = NULL;
-static void red_dispatcher_set_peer(Channel *channel, RedClient *client,
- RedsStream *stream, int migration,
- int num_common_caps, uint32_t *common_caps, int num_caps,
- uint32_t *caps)
+static void red_dispatcher_set_display_peer(RedChannel *channel, RedClient *client,
+ RedsStream *stream, int migration,
+ int num_common_caps, uint32_t *common_caps, int num_caps,
+ uint32_t *caps)
{
RedDispatcher *dispatcher;
@@ -92,23 +92,41 @@ static void red_dispatcher_set_peer(Channel *channel, RedClient *client,
send_data(dispatcher->channel, &migration, sizeof(int));
}
-static void red_dispatcher_shutdown_peer(Channel *channel)
+static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc)
{
- RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
+ RedDispatcher *dispatcher;
+
+ if (!rcc->channel) {
+ return;
+ }
+
+ dispatcher = (RedDispatcher *)rcc->channel->data;
+
red_printf("");
RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_DISCONNECT;
write_message(dispatcher->channel, &message);
+ send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+
+ // TODO: we turned it to be sync, due to client_destroy . Should we support async? - for this we will need ref count
+ // for channels
+ read_message(dispatcher->channel, &message);
+ ASSERT(message == RED_WORKER_MESSAGE_READY);
}
-static void red_dispatcher_migrate(Channel *channel)
+static void red_dispatcher_display_migrate(RedChannelClient *rcc)
{
- RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
- red_printf("channel type %u id %u", channel->type, channel->id);
+ RedDispatcher *dispatcher;
+ if (!rcc->channel) {
+ return;
+ }
+ dispatcher = (RedDispatcher *)rcc->channel->data;
+ red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id);
RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_MIGRATE;
write_message(dispatcher->channel, &message);
+ send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
}
-static void red_dispatcher_set_cursor_peer(Channel *channel, RedClient *client, RedsStream *stream,
+static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *client, RedsStream *stream,
int migration, int num_common_caps,
uint32_t *common_caps, int num_caps,
uint32_t *caps)
@@ -122,18 +140,33 @@ static void red_dispatcher_set_cursor_peer(Channel *channel, RedClient *client,
send_data(dispatcher->channel, &migration, sizeof(int));
}
-static void red_dispatcher_shutdown_cursor_peer(Channel *channel)
+static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc)
{
- RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
+ RedDispatcher *dispatcher;
+
+ if (!rcc->channel) {
+ return;
+ }
+
+ dispatcher = (RedDispatcher *)rcc->channel->data;
red_printf("");
RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_DISCONNECT;
write_message(dispatcher->channel, &message);
+ send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+
+ read_message(dispatcher->channel, &message);
+ ASSERT(message == RED_WORKER_MESSAGE_READY);
}
-static void red_dispatcher_cursor_migrate(Channel *channel)
+static void red_dispatcher_cursor_migrate(RedChannelClient *rcc)
{
- RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
- red_printf("channel type %u id %u", channel->type, channel->id);
+ RedDispatcher *dispatcher;
+
+ if (!rcc->channel) {
+ return;
+ }
+ dispatcher = (RedDispatcher *)rcc->channel->data;
+ red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id);
RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_MIGRATE;
write_message(dispatcher->channel, &message);
}
@@ -591,36 +624,6 @@ static void qxl_worker_loadvm_commands(QXLWorker *qxl_worker,
red_dispatcher_loadvm_commands((RedDispatcher*)qxl_worker, ext, count);
}
-static void red_dispatcher_send_disconnect(RedDispatcher *dispatcher,
- struct RedChannelClient *rcc, RedWorkerMessage message)
-{
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &rcc, sizeof(struct RedChannelClient *));
-}
-
-void red_dispatcher_disconnect_display_client(RedDispatcher *dispatcher,
- struct RedChannelClient *rcc)
-{
- RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
-
- red_dispatcher_send_disconnect(dispatcher, rcc,
- RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT);
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
-}
-
-void red_dispatcher_disconnect_cursor_client(RedDispatcher *dispatcher,
- struct RedChannelClient *rcc)
-{
- RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
-
- red_dispatcher_send_disconnect(dispatcher, rcc,
- RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT);
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
-}
-
-
void red_dispatcher_set_mm_time(uint32_t mm_time)
{
RedDispatcher *now = dispatchers;
@@ -867,6 +870,32 @@ void red_dispatcher_async_complete(struct RedDispatcher *dispatcher, uint64_t co
dispatcher->qxl->st->qif->async_complete(dispatcher->qxl, cookie);
}
+static RedChannel *red_dispatcher_display_channel_create(RedDispatcher *dispatcher)
+{
+ RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE;
+ RedChannel *display_channel;
+
+ write_message(dispatcher->channel, &message);
+
+ receive_data(dispatcher->channel, &display_channel, sizeof(RedChannel *));
+ read_message(dispatcher->channel, &message);
+ ASSERT(message == RED_WORKER_MESSAGE_READY);
+ return display_channel;
+}
+
+static RedChannel *red_dispatcher_cursor_channel_create(RedDispatcher *dispatcher)
+{
+ RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE;
+ RedChannel *cursor_channel;
+
+ write_message(dispatcher->channel, &message);
+
+ receive_data(dispatcher->channel, &cursor_channel, sizeof(RedChannel *));
+ read_message(dispatcher->channel, &message);
+ ASSERT(message == RED_WORKER_MESSAGE_READY);
+ return cursor_channel;
+}
+
RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
{
RedDispatcher *dispatcher;
@@ -875,10 +904,11 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
WorkerInitData init_data;
QXLDevInitInfo init_info;
int r;
- Channel *reds_channel;
- Channel *cursor_channel;
+ RedChannel *display_channel;
+ RedChannel *cursor_channel;
sigset_t thread_sig_mask;
sigset_t curr_sig_mask;
+ ClientCbs client_cbs = {0,};
quic_init();
sw_canvas_init();
@@ -950,23 +980,28 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
read_message(dispatcher->channel, &message);
ASSERT(message == RED_WORKER_MESSAGE_READY);
- reds_channel = spice_new0(Channel, 1);
- reds_channel->type = SPICE_CHANNEL_DISPLAY;
- reds_channel->id = qxl->id;
- reds_channel->link = red_dispatcher_set_peer;
- reds_channel->shutdown = red_dispatcher_shutdown_peer;
- reds_channel->migrate = red_dispatcher_migrate;
- reds_channel->data = dispatcher;
- reds_register_channel(reds_channel);
-
- cursor_channel = spice_new0(Channel, 1);
- cursor_channel->type = SPICE_CHANNEL_CURSOR;
- cursor_channel->id = qxl->id;
- cursor_channel->link = red_dispatcher_set_cursor_peer;
- cursor_channel->shutdown = red_dispatcher_shutdown_cursor_peer;
- cursor_channel->migrate = red_dispatcher_cursor_migrate;
- cursor_channel->data = dispatcher;
- reds_register_channel(cursor_channel);
+ display_channel = red_dispatcher_display_channel_create(dispatcher);
+
+ if (display_channel) {
+ client_cbs.connect = red_dispatcher_set_display_peer;
+ client_cbs.disconnect = red_dispatcher_disconnect_display_peer;
+ client_cbs.migrate = red_dispatcher_display_migrate;
+ red_channel_register_client_cbs(display_channel, &client_cbs);
+ red_channel_set_data(display_channel, dispatcher);
+ reds_register_channel(display_channel);
+ }
+
+ cursor_channel = red_dispatcher_cursor_channel_create(dispatcher);
+
+ if (cursor_channel) {
+ client_cbs.connect = red_dispatcher_set_cursor_peer;
+ client_cbs.disconnect = red_dispatcher_disconnect_cursor_peer;
+ client_cbs.migrate = red_dispatcher_cursor_migrate;
+ red_channel_register_client_cbs(cursor_channel, &client_cbs);
+ red_channel_set_data(cursor_channel, dispatcher);
+ reds_register_channel(cursor_channel);
+ }
+
qxl->st->qif->attache_worker(qxl, &dispatcher->base);
qxl->st->qif->set_compression_level(qxl, calc_compression_level());
diff --git a/server/red_dispatcher.h b/server/red_dispatcher.h
index 07a95aec..144a40e2 100644
--- a/server/red_dispatcher.h
+++ b/server/red_dispatcher.h
@@ -32,9 +32,4 @@ uint32_t red_dispatcher_qxl_ram_size(void);
int red_dispatcher_qxl_count(void);
void red_dispatcher_async_complete(struct RedDispatcher*, uint64_t);
-void red_dispatcher_disconnect_display_client(struct RedDispatcher *dispatcher,
- struct RedChannelClient *rcc);
-void red_dispatcher_disconnect_cursor_client(struct RedDispatcher *dispatcher,
- struct RedChannelClient *rcc);
-
#endif
diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c
index 79cb8546..6c36fecb 100644
--- a/server/red_tunnel_worker.c
+++ b/server/red_tunnel_worker.c
@@ -537,8 +537,8 @@ typedef struct TunnelPrintService {
} TunnelPrintService;
struct TunnelWorker {
- Channel channel_interface; // for reds
- TunnelChannelClient *channel;
+ RedChannel *channel;
+ TunnelChannelClient *channel_client;
SpiceCoreInterface *core_interface;
SpiceNetWireInstance *sin;
@@ -564,7 +564,7 @@ struct TunnelWorker {
/*********************************************************************
* Tunnel interface
*********************************************************************/
-static void tunnel_channel_disconnect(RedChannel *channel);
+static void tunnel_channel_on_disconnect(RedChannel *channel);
/* networking interface for slirp */
static int qemu_can_output(SlirpUsrNetworkInterface *usr_interface);
@@ -601,23 +601,23 @@ static UserTimer *create_timer(SlirpUsrNetworkInterface *usr_interface,
static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, uint32_t ms);
-/* reds interface */
-static void handle_tunnel_channel_link(Channel *channel, RedClient *client,
+/* RedChannel interface */
+
+static void handle_tunnel_channel_link(RedChannel *channel, RedClient *client,
RedsStream *stream, int migration,
int num_common_caps,
uint32_t *common_caps, int num_caps,
uint32_t *caps);
-static void handle_tunnel_channel_shutdown(struct Channel *channel);
-static void handle_tunnel_channel_migrate(struct Channel *channel);
-
+static void handle_tunnel_channel_client_migrate(RedChannelClient *rcc);
+static void red_tunnel_channel_create(TunnelWorker *worker);
static void tunnel_shutdown(TunnelWorker *worker)
{
int i;
red_printf("");
/* shutdown input from channel */
- if (worker->channel) {
- red_channel_shutdown(worker->channel->base.channel);
+ if (worker->channel_client) {
+ red_channel_client_shutdown(&worker->channel_client->base);
}
/* shutdown socket pipe items */
@@ -745,7 +745,7 @@ static void tunnel_socket_free_rcv_buf(RedSocket *sckt, RedSocketRawRcvBuf *rcv_
--sckt->in_data.num_buffers;
__tunnel_worker_free_socket_rcv_buf(sckt->worker, rcv_buf);
++sckt->in_data.num_tokens;
- __process_rcv_buf_tokens(sckt->worker->channel, sckt);
+ __process_rcv_buf_tokens(sckt->worker->channel_client, sckt);
}
static inline void __tunnel_worker_free_socket_rcv_buf(TunnelWorker *worker,
@@ -973,7 +973,7 @@ SPICE_GNUC_VISIBLE void spice_server_net_wire_recv_packet(SpiceNetWireInstance *
TunnelWorker *worker = sin->st->worker;
ASSERT(worker);
- if (worker->channel && worker->channel->base.channel->migrate) {
+ if (worker->channel_client && worker->channel_client->base.channel->migrate) {
return; // during migration and the tunnel state hasn't been restored yet.
}
@@ -1016,15 +1016,9 @@ void *red_tunnel_attach(SpiceCoreInterface *core_interface,
worker->null_interface.worker = worker;
- worker->channel_interface.type = SPICE_CHANNEL_TUNNEL;
- worker->channel_interface.id = 0;
- worker->channel_interface.link = handle_tunnel_channel_link;
- worker->channel_interface.shutdown = handle_tunnel_channel_shutdown;
- worker->channel_interface.migrate = handle_tunnel_channel_migrate;
- worker->channel_interface.data = worker;
+ red_tunnel_channel_create(worker);
- ring_init(&worker->services);
- reds_register_channel(&worker->channel_interface);
+ ring_init(&worker->services);
net_slirp_init(worker->sif->get_ip(worker->sin),
TRUE,
@@ -1096,7 +1090,7 @@ static inline TunnelService *__tunnel_worker_add_service(TunnelWorker *worker, u
#endif
if (!virt_ip) {
new_service->pipe_item.type = PIPE_ITEM_TYPE_SERVICE_IP_MAP;
- red_channel_client_pipe_add(&worker->channel->base, &new_service->pipe_item);
+ red_channel_client_pipe_add(&worker->channel_client->base, &new_service->pipe_item);
}
return new_service;
@@ -1292,24 +1286,24 @@ static RedSocket *tunnel_worker_create_socket(TunnelWorker *worker, uint16_t loc
static void tunnel_worker_free_socket(TunnelWorker *worker, RedSocket *sckt)
{
- if (worker->channel) {
- if (red_channel_client_pipe_item_is_linked(&worker->channel->base,
+ if (worker->channel_client) {
+ if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
&sckt->out_data.data_pipe_item)) {
- red_channel_client_pipe_remove_and_release(&worker->channel->base,
+ red_channel_client_pipe_remove_and_release(&worker->channel_client->base,
&sckt->out_data.data_pipe_item);
return;
}
- if (red_channel_client_pipe_item_is_linked(&worker->channel->base,
+ if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
&sckt->out_data.status_pipe_item)) {
- red_channel_client_pipe_remove_and_release(&worker->channel->base,
+ red_channel_client_pipe_remove_and_release(&worker->channel_client->base,
&sckt->out_data.status_pipe_item);
return;
}
- if (red_channel_client_pipe_item_is_linked(&worker->channel->base,
+ if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
&sckt->out_data.token_pipe_item)) {
- red_channel_client_pipe_remove_and_release(&worker->channel->base,
+ red_channel_client_pipe_remove_and_release(&worker->channel_client->base,
&sckt->out_data.token_pipe_item);
return;
}
@@ -1631,7 +1625,7 @@ static inline int __client_socket_can_receive(RedSocket *sckt)
{
return (((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) ||
(sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) &&
- !sckt->worker->channel->mig_inprogress);
+ !sckt->worker->channel_client->mig_inprogress);
}
static int tunnel_channel_handle_socket_token(TunnelChannelClient *channel, RedSocket *sckt,
@@ -1870,7 +1864,7 @@ static void restored_rcv_buf_release(RawTunneledBuffer *buf)
--sckt->in_data.num_buffers;
__tunnel_worker_free_socket_rcv_buf(sckt->worker, (RedSocketRawRcvBuf *)buf);
// for case that ready queue is empty and the client has no tokens
- __process_rcv_buf_tokens(sckt->worker->channel, sckt);
+ __process_rcv_buf_tokens(sckt->worker->channel_client, sckt);
}
RawTunneledBuffer *tunnel_socket_alloc_restored_rcv_buf(RedSocket *sckt)
@@ -1889,7 +1883,7 @@ static void restore_tokens_buf_release(RawTunneledBuffer *buf)
RedSocket *sckt = (RedSocket *)buf->usr_opaque;
sckt->in_data.num_tokens += tokens_buf->num_tokens;
- __process_rcv_buf_tokens(sckt->worker->channel, sckt);
+ __process_rcv_buf_tokens(sckt->worker->channel_client, sckt);
free(tokens_buf);
}
@@ -2182,7 +2176,7 @@ static uint64_t tunnel_channel_handle_migrate_data_get_serial(RedChannelClient *
static uint64_t tunnel_channel_handle_migrate_data(RedChannelClient *base,
uint32_t size, void *msg)
{
- TunnelChannelClient *channel = SPICE_CONTAINEROF(base->channel, TunnelChannelClient, base);
+ TunnelChannelClient *channel = SPICE_CONTAINEROF(base, TunnelChannelClient, base);
TunnelMigrateSocketList *sockets_list;
TunnelMigrateServicesList *services_list;
TunnelMigrateData *migrate_data = msg;
@@ -2790,22 +2784,22 @@ static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem
sckt_out_data->push_tail = NULL;
sckt_out_data->push_tail_size = 0;
- if (worker->channel) {
+ if (worker->channel_client) {
// can still send data to socket
if (__client_socket_can_receive(sckt)) {
if (sckt_out_data->ready_chunks_queue.head) {
// the pipe item may already be linked, if for example the send was
// blocked and before it finished and called release, tunnel_socket_send was called
if (!red_channel_client_pipe_item_is_linked(
- &worker->channel->base, &sckt_out_data->data_pipe_item)) {
+ &worker->channel_client->base, &sckt_out_data->data_pipe_item)) {
sckt_out_data->data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
- red_channel_client_pipe_add(&worker->channel->base, &sckt_out_data->data_pipe_item);
+ red_channel_client_pipe_add(&worker->channel_client->base, &sckt_out_data->data_pipe_item);
}
} else if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) ||
(sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) {
- __tunnel_socket_add_fin_to_pipe(worker->channel, sckt);
+ __tunnel_socket_add_fin_to_pipe(worker->channel_client, sckt);
} else if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
- __tunnel_socket_add_close_to_pipe(worker->channel, sckt);
+ __tunnel_socket_add_close_to_pipe(worker->channel_client, sckt);
}
}
}
@@ -2813,7 +2807,7 @@ static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem
if (((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) ||
(sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) &&
- !sckt->in_slirp_send && !worker->channel->mig_inprogress) {
+ !sckt->in_slirp_send && !worker->channel_client->mig_inprogress) {
// for cases that slirp couldn't write whole it data to our socket buffer
net_slirp_socket_can_send_notify(sckt->slirp_sckt);
}
@@ -2935,8 +2929,8 @@ static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface,
red_printf("TUNNEL_DBG");
#endif
worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
- ASSERT(worker->channel);
- ASSERT(!worker->channel->mig_inprogress);
+ ASSERT(worker->channel_client);
+ ASSERT(!worker->channel_client->mig_inprogress);
far_service = tunnel_worker_find_service_by_addr(worker, &dst_addr, (uint32_t)ntohs(dst_port));
@@ -2964,7 +2958,7 @@ static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface,
#endif
*o_usr_s = sckt;
sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_OPEN;
- red_channel_client_pipe_add(&worker->channel->base, &sckt->out_data.status_pipe_item);
+ red_channel_client_pipe_add(&worker->channel_client->base, &sckt->out_data.status_pipe_item);
errno = EINPROGRESS;
return -1;
@@ -2989,7 +2983,7 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke
worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
- ASSERT(!worker->channel->mig_inprogress);
+ ASSERT(!worker->channel_client->mig_inprogress);
sckt = (RedSocket *)opaque;
@@ -3014,7 +3008,7 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke
}
if (urgent) {
- SET_TUNNEL_ERROR(worker->channel, "urgent msgs not supported");
+ SET_TUNNEL_ERROR(worker->channel_client, "urgent msgs not supported");
tunnel_shutdown(worker);
errno = ECONNRESET;
return -1;
@@ -3037,7 +3031,7 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke
red_printf("socket out buffers overflow, socket will be closed"
" (local_port=%d, service_id=%d)",
ntohs(sckt->local_port), sckt->far_service->id);
- tunnel_socket_force_close(worker->channel, sckt);
+ tunnel_socket_force_close(worker->channel_client, sckt);
size_to_send = 0;
} else {
size_to_send = len;
@@ -3050,10 +3044,10 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke
sckt->out_data.data_size += size_to_send;
if (sckt->out_data.ready_chunks_queue.head &&
- !red_channel_client_pipe_item_is_linked(&worker->channel->base,
+ !red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
&sckt->out_data.data_pipe_item)) {
sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
- red_channel_client_pipe_add(&worker->channel->base, &sckt->out_data.data_pipe_item);
+ red_channel_client_pipe_add(&worker->channel_client->base, &sckt->out_data.data_pipe_item);
}
}
@@ -3093,7 +3087,7 @@ static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocke
ASSERT(opaque);
worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
- ASSERT(!worker->channel->mig_inprogress);
+ ASSERT(!worker->channel_client->mig_inprogress);
sckt = (RedSocket *)opaque;
@@ -3104,14 +3098,14 @@ static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocke
if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) ||
(sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) {
- SET_TUNNEL_ERROR(worker->channel, "receive was shutdown");
+ SET_TUNNEL_ERROR(worker->channel_client, "receive was shutdown");
tunnel_shutdown(worker);
errno = ECONNRESET;
return -1;
}
if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
- SET_TUNNEL_ERROR(worker->channel, "slirp socket not connected");
+ SET_TUNNEL_ERROR(worker->channel_client, "slirp socket not connected");
tunnel_shutdown(worker);
errno = ECONNRESET;
return -1;
@@ -3177,7 +3171,7 @@ static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface,
#ifdef DEBUG_NETWORK
PRINT_SCKT(sckt);
#endif
- ASSERT(!worker->channel->mig_inprogress);
+ ASSERT(!worker->channel_client->mig_inprogress);
if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) {
return;
@@ -3189,7 +3183,7 @@ static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface,
ASSERT(sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND);
sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
} else {
- SET_TUNNEL_ERROR(worker->channel, "unexpected tunnel_socket_shutdown_send slirp_status=%d",
+ SET_TUNNEL_ERROR(worker->channel_client, "unexpected tunnel_socket_shutdown_send slirp_status=%d",
sckt->slirp_status);
tunnel_shutdown(worker);
return;
@@ -3200,11 +3194,11 @@ static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface,
// check if there is still data to send. the fin will be sent after data is released
// channel is alive, otherwise the sockets would have been aborted
if (!sckt->out_data.ready_chunks_queue.head) {
- __tunnel_socket_add_fin_to_pipe(worker->channel, sckt);
+ __tunnel_socket_add_fin_to_pipe(worker->channel_client, sckt);
}
} else { // if client is closed, it means the connection was aborted since we didn't
// received fin from guest
- SET_TUNNEL_ERROR(worker->channel,
+ SET_TUNNEL_ERROR(worker->channel_client,
"unexpected tunnel_socket_shutdown_send client_status=%d",
sckt->client_status);
tunnel_shutdown(worker);
@@ -3229,12 +3223,12 @@ static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface,
#ifdef DEBUG_NETWORK
PRINT_SCKT(sckt);
#endif
- ASSERT(!worker->channel->mig_inprogress);
+ ASSERT(!worker->channel_client->mig_inprogress);
/* failure in recv can happen after the client sckt was shutdown
(after client sent FIN, or after slirp sent FIN and client socket was closed */
if (!__should_send_fin_to_guest(sckt)) {
- SET_TUNNEL_ERROR(worker->channel,
+ SET_TUNNEL_ERROR(worker->channel_client,
"unexpected tunnel_socket_shutdown_recv client_status=%d slirp_status=%d",
sckt->client_status, sckt->slirp_status);
tunnel_shutdown(worker);
@@ -3246,7 +3240,7 @@ static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface,
} else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) {
sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
} else {
- SET_TUNNEL_ERROR(worker->channel,
+ SET_TUNNEL_ERROR(worker->channel_client,
"unexpected tunnel_socket_shutdown_recv slirp_status=%d",
sckt->slirp_status);
tunnel_shutdown(worker);
@@ -3302,11 +3296,11 @@ static void tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSoc
// check if there is still data to send. the close will be sent after data is released.
// close may already been pushed if it is a forced close
if (!sckt->out_data.ready_chunks_queue.head && !sckt->pushed_close) {
- __tunnel_socket_add_close_to_pipe(worker->channel, sckt);
+ __tunnel_socket_add_close_to_pipe(worker->channel_client, sckt);
}
} else if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) {
if (sckt->client_waits_close_ack) {
- __tunnel_socket_add_close_ack_to_pipe(worker->channel, sckt);
+ __tunnel_socket_add_close_ack_to_pipe(worker->channel_client, sckt);
} else {
tunnel_worker_free_socket(worker, sckt);
}
@@ -3333,12 +3327,12 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer,
worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
#ifdef DEBUG_NETWORK
- if (!worker->channel) {
+ if (!worker->channel_client) {
red_printf("channel not connected");
}
#endif
- if (worker->channel && worker->channel->mig_inprogress) {
- SET_TUNNEL_ERROR(worker->channel, "during migration");
+ if (worker->channel_client && worker->channel_client->mig_inprogress) {
+ SET_TUNNEL_ERROR(worker->channel_client, "during migration");
tunnel_shutdown(worker);
return;
}
@@ -3398,27 +3392,25 @@ static void tunnel_worker_disconnect_slirp(TunnelWorker *worker)
/* don't call disconnect from functions that might be called by slirp
since it closes all its sockets and slirp is not aware of it */
-static void tunnel_channel_disconnect(RedChannel *channel)
+static void tunnel_channel_on_disconnect(RedChannel *channel)
{
- TunnelChannelClient *tunnel_channel = (TunnelChannelClient *)channel;
TunnelWorker *worker;
if (!channel) {
return;
}
red_printf("");
- worker = tunnel_channel->worker;
+ worker = (TunnelWorker *)channel->data;
tunnel_worker_disconnect_slirp(worker);
tunnel_worker_clear_routed_network(worker);
- red_channel_destroy(channel);
- worker->channel = NULL;
+ worker->channel_client = NULL;
}
// TODO - not MC friendly, remove
-static void tunnel_channel_disconnect_client(RedChannelClient *rcc)
+static void tunnel_channel_client_on_disconnect(RedChannelClient *rcc)
{
- tunnel_channel_disconnect(rcc->channel);
+ tunnel_channel_on_disconnect(rcc->channel);
}
/* interface for reds */
@@ -3439,7 +3431,7 @@ static void tunnel_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
{
}
-static void handle_tunnel_channel_link(Channel *channel, RedClient *client,
+static void handle_tunnel_channel_link(RedChannel *channel, RedClient *client,
RedsStream *stream, int migration,
int num_common_caps,
uint32_t *common_caps, int num_caps,
@@ -3447,15 +3439,42 @@ static void handle_tunnel_channel_link(Channel *channel, RedClient *client,
{
TunnelChannelClient *tcc;
TunnelWorker *worker = (TunnelWorker *)channel->data;
- RedChannel *tunnel_channel;
- ChannelCbs channel_cbs = {0,};
- if (worker->channel) {
- tunnel_channel_disconnect(worker->channel->base.channel);
+ if (worker->channel_client) {
+ red_error("tunnel does not support multiple client");
}
+ tcc = (TunnelChannelClient*)red_channel_client_create(sizeof(TunnelChannelClient),
+ channel, client, stream);
+
+ tcc->worker = worker;
+ tcc->worker->channel_client = tcc;
+ net_slirp_set_net_interface(&worker->tunnel_interface.base);
+
+ on_new_tunnel_channel(tcc);
+}
+
+static void handle_tunnel_channel_client_migrate(RedChannelClient *rcc)
+{
+ TunnelChannelClient *tunnel_channel;
+#ifdef DEBUG_NETWORK
+ red_printf("TUNNEL_DBG: MIGRATE STARTED");
+#endif
+ tunnel_channel = (TunnelChannelClient *)rcc;
+ ASSERT(tunnel_channel == tunnel_channel->worker->channel_client);
+ tunnel_channel->mig_inprogress = TRUE;
+ net_slirp_freeze();
+ red_channel_client_pipe_add_type(rcc, PIPE_ITEM_TYPE_MIGRATE);
+}
+
+static void red_tunnel_channel_create(TunnelWorker *worker)
+{
+ RedChannel *channel;
+ ChannelCbs channel_cbs;
+ ClientCbs client_cbs = {0,};
+
channel_cbs.config_socket = tunnel_channel_config_socket;
- channel_cbs.disconnect = tunnel_channel_disconnect_client;
+ channel_cbs.on_disconnect = tunnel_channel_client_on_disconnect;
channel_cbs.alloc_recv_buf = tunnel_channel_alloc_msg_rcv_buf;
channel_cbs.release_recv_buf = tunnel_channel_release_msg_rcv_buf;
channel_cbs.hold_item = tunnel_channel_hold_pipe_item;
@@ -3465,39 +3484,23 @@ static void handle_tunnel_channel_link(Channel *channel, RedClient *client,
channel_cbs.handle_migrate_data = tunnel_channel_handle_migrate_data;
channel_cbs.handle_migrate_data_get_serial = tunnel_channel_handle_migrate_data_get_serial;
- tunnel_channel = red_channel_create(sizeof(RedChannel),
- worker->core_interface,
- migration, TRUE,
- tunnel_channel_handle_message,
- &channel_cbs);
-
- if (!tunnel_channel) {
+ channel = red_channel_create(sizeof(RedChannel),
+ worker->core_interface,
+ SPICE_CHANNEL_TUNNEL, 0,
+ FALSE, // TODO: handle migration=TRUE
+ TRUE,
+ tunnel_channel_handle_message,
+ &channel_cbs);
+ if (!channel) {
return;
}
- tcc = (TunnelChannelClient*)red_channel_client_create(
- sizeof(TunnelChannelClient),
- tunnel_channel, client, stream);
- tcc->worker = worker;
- tcc->worker->channel = tcc;
- net_slirp_set_net_interface(&worker->tunnel_interface.base);
-
- on_new_tunnel_channel(tcc);
-}
+ client_cbs.connect = handle_tunnel_channel_link;
+ client_cbs.migrate = handle_tunnel_channel_client_migrate;
+ red_channel_register_client_cbs(channel, &client_cbs);
-static void handle_tunnel_channel_shutdown(struct Channel *channel)
-{
- tunnel_channel_disconnect(((TunnelWorker *)channel->data)->channel->base.channel);
-}
-
-static void handle_tunnel_channel_migrate(struct Channel *channel)
-{
-#ifdef DEBUG_NETWORK
- red_printf("TUNNEL_DBG: MIGRATE STARTED");
-#endif
- TunnelChannelClient *tunnel_channel = ((TunnelWorker *)channel->data)->channel;
- tunnel_channel->mig_inprogress = TRUE;
- net_slirp_freeze();
- red_channel_client_pipe_add_type(&tunnel_channel->base, PIPE_ITEM_TYPE_MIGRATE);
+ worker->channel = channel;
+ red_channel_set_data(channel, worker);
+ reds_register_channel(worker->channel);
}
diff --git a/server/red_worker.c b/server/red_worker.c
index 4347d243..e767623e 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -8606,15 +8606,20 @@ static void red_disconnect_channel(RedChannel *channel)
static void display_channel_client_disconnect(RedChannelClient *rcc)
{
- // TODO: MC: right now we assume single channel
+ red_channel_client_disconnect(rcc);
+}
+
+static void display_channel_client_on_disconnect(RedChannelClient *rcc)
+{
DisplayChannel *display_channel;
DisplayChannelClient *dcc = RCC_TO_DCC(rcc);
CommonChannel *common;
RedWorker *worker;
- if (!rcc || !red_channel_is_connected(rcc->channel)) {
+ if (!rcc) {
return;
}
+ red_printf("");
common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base);
worker = common->worker;
display_channel = (DisplayChannel *)rcc->channel;
@@ -8629,8 +8634,6 @@ static void display_channel_client_disconnect(RedChannelClient *rcc)
red_display_reset_compress_buf(dcc);
free(dcc->send_data.free_list.res);
red_display_destroy_streams(dcc);
- red_channel_client_pipe_clear(rcc); // do this before deleting surfaces
- red_channel_client_disconnect(rcc);
// this was the last channel client
if (!red_channel_is_connected(rcc->channel)) {
@@ -8654,12 +8657,15 @@ void red_disconnect_all_display_TODO_remove_me(RedChannel *channel)
worker->display_channel = NULL;
}
-static void red_migrate_display(RedWorker *worker)
+static void red_migrate_display(RedWorker *worker, RedChannelClient *rcc)
{
- if (worker->display_channel) {
- red_pipes_add_verb(&worker->display_channel->common.base,
- SPICE_MSG_DISPLAY_STREAM_DESTROY_ALL);
- red_channel_pipes_add_type(&worker->display_channel->common.base, PIPE_ITEM_TYPE_MIGRATE);
+ // TODO: replace all worker->display_channel tests with
+ // is_connected
+ if (red_channel_client_is_connected(rcc)) {
+ red_pipe_add_verb(rcc, PIPE_ITEM_TYPE_MIGRATE);
+// red_pipes_add_verb(&worker->display_channel->common.base,
+// SPICE_MSG_DISPLAY_STREAM_DESTROY_ALL);
+// red_channel_pipes_add_type(&worker->display_channel->common.base, PIPE_ITEM_TYPE_MIGRATE);
}
}
@@ -8900,7 +8906,7 @@ static inline void flush_display_commands(RedWorker *worker)
for (;;) {
red_channel_push(&worker->display_channel->common.base);
if (!display_is_connected(worker) ||
- red_channel_min_pipe_size(display_red_channel) <= MAX_PIPE_SIZE) {
+ red_channel_max_pipe_size(display_red_channel) <= MAX_PIPE_SIZE) {
break;
}
RedChannel *channel = (RedChannel *)worker->display_channel;
@@ -8961,6 +8967,9 @@ static inline void flush_cursor_commands(RedWorker *worker)
}
}
+// TODO: on timeout, don't disconnect all channeld immeduiatly - try to disconnect the slowest ones first
+// and maybe turn timeouts to several timeouts in order to disconnect channels gradually.
+// Should use disconnect or shutdown?
static inline void flush_all_qxl_commands(RedWorker *worker)
{
flush_display_commands(worker);
@@ -9487,15 +9496,13 @@ static int listen_to_new_client_channel(CommonChannel *common,
return TRUE;
}
-static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id, int migrate,
+static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_type, int migrate,
event_listener_action_proc handler,
- channel_disconnect_proc disconnect,
+ channel_disconnect_proc on_disconnect,
channel_send_pipe_item_proc send_item,
channel_hold_pipe_item_proc hold_item,
channel_release_pipe_item_proc release_item,
channel_handle_parsed_proc handle_parsed,
- channel_on_incoming_error_proc on_incoming_error,
- channel_on_outgoing_error_proc on_outgoing_error,
channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
channel_handle_migrate_data_proc handle_migrate_data,
channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
@@ -9505,7 +9512,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
ChannelCbs channel_cbs;
channel_cbs.config_socket = common_channel_config_socket;
- channel_cbs.disconnect = disconnect;
+ channel_cbs.on_disconnect = on_disconnect;
channel_cbs.send_item = send_item;
channel_cbs.hold_item = hold_item;
channel_cbs.release_item = release_item;
@@ -9515,12 +9522,12 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
channel_cbs.handle_migrate_data = handle_migrate_data;
channel_cbs.handle_migrate_data_get_serial = handle_migrate_data_get_serial;
- channel = red_channel_create_parser(size, &worker_core, migrate,
+ channel = red_channel_create_parser(size, &worker_core,
+ channel_type, worker->id,
+ migrate,
TRUE /* handle_acks */,
- spice_get_client_channel_parser(channel_id, NULL),
+ spice_get_client_channel_parser(channel_type, NULL),
handle_parsed,
- on_incoming_error,
- on_outgoing_error,
&channel_cbs);
common = (CommonChannel *)channel;
if (!channel) {
@@ -9682,50 +9689,6 @@ static void display_channel_release_item(RedChannelClient *rcc, PipeItem *item,
}
}
-static void display_channel_on_incoming_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-static void display_channel_on_outgoing_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-static void cursor_channel_on_incoming_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-static void cursor_channel_on_outgoing_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-// call this from dispatcher thread context
-static void dispatch_display_channel_client_disconnect(RedChannelClient *rcc)
-{
- RedWorker *worker = ((DisplayChannel*)rcc->channel)->common.worker;
- struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
-
- red_printf("");
- red_dispatcher_disconnect_display_client(dispatcher, rcc);
-}
-
-// call this from dispatcher thread context
-static void dispatch_cursor_channel_client_disconnect(RedChannelClient *rcc)
-{
- RedWorker *worker = ((CursorChannel*)rcc->channel)->common.worker;
- struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
-
- red_printf("");
- red_dispatcher_disconnect_cursor_client(dispatcher, rcc);
-}
-
static void ensure_display_channel_created(RedWorker *worker, int migrate)
{
DisplayChannel *display_channel;
@@ -9739,17 +9702,16 @@ static void ensure_display_channel_created(RedWorker *worker, int migrate)
worker, sizeof(*display_channel),
SPICE_CHANNEL_DISPLAY, migrate,
handle_channel_events,
- dispatch_display_channel_client_disconnect,
+ display_channel_client_on_disconnect,
display_channel_send_item,
display_channel_hold_pipe_item,
display_channel_release_item,
display_channel_handle_message,
- display_channel_on_incoming_error,
- display_channel_on_outgoing_error,
display_channel_handle_migrate_mark,
display_channel_handle_migrate_data,
display_channel_handle_migrate_data_get_serial
))) {
+ red_printf("failed to create display channel");
return;
}
display_channel = worker->display_channel;
@@ -9791,7 +9753,7 @@ static void handle_new_display_channel(RedWorker *worker, RedClient *client, Red
if (!dcc) {
return;
}
-
+ red_printf("New display (client %p) dcc %p stream %p", client, dcc, stream);
stream_buf_size = 32*1024;
dcc->send_data.stream_outbuf = spice_malloc(stream_buf_size);
dcc->send_data.stream_outbuf_size = stream_buf_size;
@@ -9836,11 +9798,15 @@ error:
static void cursor_channel_client_disconnect(RedChannelClient *rcc)
{
- if (!red_channel_is_connected(rcc->channel)) {
+ red_channel_client_disconnect(rcc);
+}
+
+static void cursor_channel_client_on_disconnect(RedChannelClient *rcc)
+{
+ if (!rcc) {
return;
}
red_reset_cursor_cache(rcc);
- red_channel_client_disconnect(rcc);
}
static void red_disconnect_cursor(RedChannel *channel)
@@ -9857,13 +9823,14 @@ static void red_disconnect_cursor(RedChannel *channel)
red_disconnect_channel(channel);
}
-static void red_migrate_cursor(RedWorker *worker)
+static void red_migrate_cursor(RedWorker *worker, RedChannelClient *rcc)
{
- if (cursor_is_connected(worker)) {
- red_channel_pipes_add_type(&worker->cursor_channel->common.base,
- PIPE_ITEM_TYPE_INVAL_CURSOR_CACHE);
- red_channel_pipes_add_type(&worker->cursor_channel->common.base,
- PIPE_ITEM_TYPE_MIGRATE);
+// if (cursor_is_connected(worker)) {
+ if (red_channel_client_is_connected(rcc)) {
+ red_channel_client_pipe_add_type(rcc,
+ PIPE_ITEM_TYPE_INVAL_CURSOR_CACHE);
+ red_channel_client_pipe_add_type(rcc,
+ PIPE_ITEM_TYPE_MIGRATE);
}
}
@@ -9950,13 +9917,11 @@ static void ensure_cursor_channel_created(RedWorker *worker, int migrate)
worker, sizeof(*worker->cursor_channel),
SPICE_CHANNEL_CURSOR, migrate,
handle_channel_events,
- dispatch_cursor_channel_client_disconnect,
+ cursor_channel_client_on_disconnect,
cursor_channel_send_item,
cursor_channel_hold_pipe_item,
cursor_channel_release_item,
red_channel_client_handle_message,
- cursor_channel_on_incoming_error,
- cursor_channel_on_outgoing_error,
NULL,
NULL,
NULL);
@@ -10424,6 +10389,10 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE:
case RED_WORKER_MESSAGE_STOP:
case RED_WORKER_MESSAGE_LOADVM_COMMANDS:
+ case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE:
+ case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE:
+ case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
+ case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
write_ready = 1;
default:
break;
@@ -10476,6 +10445,14 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE:
handle_dev_destroy_primary_surface(worker);
break;
+ case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE: {
+ RedChannel *red_channel;
+ // TODO: handle seemless migration. Temp, setting migrate to FALSE
+ ensure_display_channel_created(worker, FALSE);
+ red_channel = &worker->display_channel->common.base;
+ send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+ break;
+ }
case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
RedsStream *stream;
RedClient *client;
@@ -10488,20 +10465,15 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
handle_new_display_channel(worker, client, stream, migrate);
break;
}
- case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT: {
+ case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: {
RedChannelClient *rcc;
red_printf("disconnect display client");
receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
display_channel_client_disconnect(rcc);
- message = RED_WORKER_MESSAGE_READY;
- write_message(worker->channel, &message);
break;
}
- case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
- red_printf("disconnect");
- red_disconnect_all_display_TODO_remove_me((RedChannel *)worker->display_channel);
- break;
case RED_WORKER_MESSAGE_STOP: {
red_printf("stop");
handle_dev_stop(worker);
@@ -10511,10 +10483,22 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
red_printf("start");
handle_dev_start(worker);
break;
- case RED_WORKER_MESSAGE_DISPLAY_MIGRATE:
- red_printf("migrate");
- red_migrate_display(worker);
+ case RED_WORKER_MESSAGE_DISPLAY_MIGRATE: {
+ RedChannelClient *rcc;
+ red_printf("migrate display client");
+ receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
+ red_migrate_display(worker, rcc);
+ break;
+ }
+ case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE: {
+ RedChannel *red_channel;
+ // TODO: handle seemless migration. Temp, setting migrate to FALSE
+ ensure_cursor_channel_created(worker, FALSE);
+ red_channel = &worker->cursor_channel->common.base;
+ send_data(worker->channel, &red_channel, sizeof(RedChannel *));
break;
+ }
case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
RedsStream *stream;
RedClient *client;
@@ -10527,24 +10511,23 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
red_connect_cursor(worker, client, stream, migrate);
break;
}
- case RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT: {
+ case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: {
RedChannelClient *rcc;
red_printf("disconnect cursor client");
receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
cursor_channel_client_disconnect(rcc);
- message = RED_WORKER_MESSAGE_READY;
- write_message(worker->channel, &message);
break;
}
- case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
- red_printf("cursor disconnect");
- red_disconnect_cursor((RedChannel *)worker->cursor_channel);
- break;
- case RED_WORKER_MESSAGE_CURSOR_MIGRATE:
- red_printf("cursor migrate");
- red_migrate_cursor(worker);
+ case RED_WORKER_MESSAGE_CURSOR_MIGRATE: {
+ RedChannelClient *rcc;
+ red_printf("migrate cursor client");
+ receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
+ red_migrate_cursor(worker, rcc);
break;
+ }
case RED_WORKER_MESSAGE_SET_COMPRESSION:
receive_data(worker->channel, &worker->image_compression,
sizeof(spice_image_compression_t));
diff --git a/server/red_worker.h b/server/red_worker.h
index 6fbe0611..26c43adb 100644
--- a/server/red_worker.h
+++ b/server/red_worker.h
@@ -53,13 +53,11 @@ enum {
RED_WORKER_MESSAGE_READY,
RED_WORKER_MESSAGE_DISPLAY_CONNECT,
RED_WORKER_MESSAGE_DISPLAY_DISCONNECT,
- RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT,
RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
RED_WORKER_MESSAGE_START,
RED_WORKER_MESSAGE_STOP,
RED_WORKER_MESSAGE_CURSOR_CONNECT,
RED_WORKER_MESSAGE_CURSOR_DISCONNECT,
- RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT,
RED_WORKER_MESSAGE_CURSOR_MIGRATE,
RED_WORKER_MESSAGE_SET_COMPRESSION,
RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
@@ -83,6 +81,9 @@ enum {
RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC,
/* suspend/windows resolution change command */
RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC,
+
+ RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
+ RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
};
typedef uint32_t RedWorkerMessage;
diff --git a/server/reds.c b/server/reds.c
index b8b4d26f..0387a5cc 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -193,17 +193,23 @@ typedef struct RedsStatValue {
typedef struct RedsMigSpice RedsMigSpice;
+typedef struct RedsChannel {
+ struct RedsChannel *next;
+ RedChannel *base;
+
+ int num_common_caps;
+ uint32_t *common_caps;
+} RedsChannel;
+
typedef struct RedsState {
int listen_socket;
int secure_listen_socket;
SpiceWatch *listen_watch;
SpiceWatch *secure_listen_watch;
- int disconnecting;
VDIPortState agent_state;
int pending_mouse_event;
Ring clients;
int num_clients;
- Channel *main_channel_factory;
MainChannel *main_channel;
int mig_wait_connect;
@@ -212,7 +218,7 @@ typedef struct RedsState {
int mig_target;
RedsMigSpice *mig_spice;
int num_of_channels;
- Channel *channels;
+ RedsChannel *channels;
int mouse_mode;
int is_client_mouse_allowed;
int dispatcher_allows_client_mouse;
@@ -297,6 +303,8 @@ struct ChannelSecurityOptions {
ChannelSecurityOptions *next;
};
+static void reds_dispose_channel(RedsChannel *channel);
+
static ChannelSecurityOptions *channels_security = NULL;
static int default_channel_security =
SPICE_CHANNEL_SECURITY_NONE | SPICE_CHANNEL_SECURITY_SSL;
@@ -506,21 +514,29 @@ void reds_update_stat_value(uint32_t value)
#endif
-void reds_register_channel(Channel *channel)
+void reds_register_channel(RedChannel *channel)
{
+ RedsChannel *reds_channel;
+
ASSERT(reds);
- channel->next = reds->channels;
- reds->channels = channel;
+ // TODO: should channels be released upon some destructor?
+ reds_channel = spice_malloc0(sizeof(RedsChannel));
+ reds_channel->base = channel;
+ reds_channel->next = reds->channels;
+ reds->channels = reds_channel;
reds->num_of_channels++;
}
-void reds_unregister_channel(Channel *channel)
+void reds_unregister_channel(RedChannel *channel)
{
- Channel **now = &reds->channels;
+ RedsChannel **now = &reds->channels;
while (*now) {
- if (*now == channel) {
- *now = channel->next;
+ if ((*now)->base == channel) {
+ RedsChannel *free_channel = *now;
+ *now = free_channel->next;
+ reds_dispose_channel(free_channel);
+ free(free_channel);
reds->num_of_channels--;
return;
}
@@ -529,10 +545,10 @@ void reds_unregister_channel(Channel *channel)
red_printf("not found");
}
-static Channel *reds_find_channel(uint32_t type, uint32_t id)
+static RedsChannel *reds_find_channel(uint32_t type, uint32_t id)
{
- Channel *channel = reds->channels;
- while (channel && !(channel->type == type && channel->id == id)) {
+ RedsChannel *channel = reds->channels;
+ while (channel && !(channel->base->type == type && channel->base->id == id)) {
channel = channel->next;
}
return channel;
@@ -590,27 +606,42 @@ static int reds_main_channel_connected(void)
void reds_client_disconnect(RedClient *client)
{
- if (!reds_main_channel_connected() || client->disconnecting) {
+ // TODO: rename reds_main_channel_connected, or really set main_channel to NULL on disconnect,
+ // though still, it is not a reason not to disconnect the rest of the channels
+ if (!reds_main_channel_connected() || client->disconnecting) {
+ /* case of recursion (main_channel_client_on_disconnect->
+ * reds_client_disconnect->red_client_destroy-<main_channel...
+ */
return;
}
red_printf("");
+ // why is "disconnecting" even needed? it is synchronic, even in the dispatcher we are now waiting for disconnection
+ // Are there recursive calls? Maybe from main_channel?
client->disconnecting = TRUE;
- /* Reset write filter to start with clean state on client reconnect */
- agent_msg_filter_init(&reds->agent_state.write_filter, agent_copypaste,
- TRUE);
- /* Throw away pending chunks from the current (if any) and future
- messages read from the agent */
- reds->agent_state.read_filter.result = AGENT_MSG_FILTER_DISCARD;
- reds->agent_state.read_filter.discard_all = TRUE;
+ // TODO: we need to handle agent properly for all clients!!!! (e.g., cut and paste, how?)
+ // We shouldn't initialize the agent when there are still clients connected
ring_remove(&client->link);
reds->num_clients--;
red_client_destroy(client);
- reds_mig_cleanup();
- reds->disconnecting = FALSE;
+ // TODO: we need to handle agent properly for all clients!!!! (e.g., cut and paste, how? Maybe throw away messages
+ // if we are in the middle of one from another client)
+ // We shouldn't initialize the agent when there are still clients connected
+ if (reds->num_clients == 0) {
+ /* Reset write filter to start with clean state on client reconnect */
+ agent_msg_filter_init(&reds->agent_state.write_filter, agent_copypaste,
+ TRUE);
+
+ /* Throw away pending chunks from the current (if any) and future
+ * messages read from the agent */
+ reds->agent_state.read_filter.result = AGENT_MSG_FILTER_DISCARD;
+ reds->agent_state.read_filter.discard_all = TRUE;
+
+ reds_mig_cleanup();
+ }
}
// TODO: go over all usage of reds_disconnect, most/some of it should be converted to
@@ -623,6 +654,7 @@ static void reds_disconnect(void)
RING_FOREACH_SAFE(link, next, &reds->clients) {
reds_client_disconnect(SPICE_CONTAINEROF(link, RedClient, link));
}
+ reds_mig_cleanup();
}
static void reds_mig_disconnect()
@@ -949,11 +981,11 @@ int reds_num_of_channels()
static int secondary_channels[] = {
SPICE_CHANNEL_MAIN, SPICE_CHANNEL_DISPLAY, SPICE_CHANNEL_CURSOR, SPICE_CHANNEL_INPUTS};
-static int channel_is_secondary(Channel *channel)
+static int channel_is_secondary(RedsChannel *channel)
{
int i;
for (i = 0 ; i < sizeof(secondary_channels)/sizeof(secondary_channels[0]); ++i) {
- if (channel->type == secondary_channels[i]) {
+ if (channel->base->type == secondary_channels[i]) {
return TRUE;
}
}
@@ -962,7 +994,7 @@ static int channel_is_secondary(Channel *channel)
void reds_fill_channels(SpiceMsgChannels *channels_info)
{
- Channel *channel;
+ RedsChannel *channel;
int i;
int used_channels = 0;
@@ -973,8 +1005,8 @@ void reds_fill_channels(SpiceMsgChannels *channels_info)
if (reds->num_clients > 1 && !channel_is_secondary(channel)) {
continue;
}
- channels_info->channels[used_channels].type = channel->type;
- channels_info->channels[used_channels].id = channel->id;
+ channels_info->channels[used_channels].type = channel->base->type;
+ channels_info->channels[used_channels].id = channel->base->id;
used_channels++;
}
channels_info->num_of_channels = used_channels;
@@ -1350,7 +1382,7 @@ static int sync_write(RedsStream *stream, const void *in_buf, size_t n)
return TRUE;
}
-static void reds_channel_set_common_caps(Channel *channel, int cap, int active)
+static void reds_channel_set_common_caps(RedsChannel *channel, int cap, int active)
{
int nbefore, n;
@@ -1367,7 +1399,7 @@ static void reds_channel_set_common_caps(Channel *channel, int cap, int active)
}
}
-static void reds_channel_init_auth_caps(Channel *channel)
+static void reds_channel_init_auth_caps(RedsChannel *channel)
{
if (sasl_enabled) {
reds_channel_set_common_caps(channel, SPICE_COMMON_CAP_AUTH_SASL, TRUE);
@@ -1377,12 +1409,8 @@ static void reds_channel_init_auth_caps(Channel *channel)
reds_channel_set_common_caps(channel, SPICE_COMMON_CAP_PROTOCOL_AUTH_SELECTION, TRUE);
}
-void reds_channel_dispose(Channel *channel)
+static void reds_dispose_channel(RedsChannel *channel)
{
- free(channel->caps);
- channel->caps = NULL;
- channel->num_caps = 0;
-
free(channel->common_caps);
channel->common_caps = NULL;
channel->num_common_caps = 0;
@@ -1392,8 +1420,8 @@ static int reds_send_link_ack(RedLinkInfo *link)
{
SpiceLinkHeader header;
SpiceLinkReply ack;
- Channel caps = { 0, };
- Channel *channel;
+ RedsChannel common_caps = { 0, };
+ RedsChannel *channel;
BUF_MEM *bmBuf;
BIO *bio;
int ret = FALSE;
@@ -1407,13 +1435,13 @@ static int reds_send_link_ack(RedLinkInfo *link)
channel = reds_find_channel(link->link_mess->channel_type, 0);
if (!channel) {
- channel = &caps;
+ channel = &common_caps;
}
reds_channel_init_auth_caps(channel); /* make sure common caps are set */
ack.num_common_caps = channel->num_common_caps;
- ack.num_channel_caps = channel->num_caps;
+ ack.num_channel_caps = channel->base ? channel->base->num_caps : 0;
header.size += (ack.num_common_caps + ack.num_channel_caps) * sizeof(uint32_t);
ack.caps_offset = sizeof(SpiceLinkReply);
@@ -1441,13 +1469,15 @@ static int reds_send_link_ack(RedLinkInfo *link)
goto end;
if (!sync_write(link->stream, channel->common_caps, channel->num_common_caps * sizeof(uint32_t)))
goto end;
- if (!sync_write(link->stream, channel->caps, channel->num_caps * sizeof(uint32_t)))
- goto end;
+ if (channel->base) {
+ if (!sync_write(link->stream, channel->base->caps, channel->base->num_caps * sizeof(uint32_t)))
+ goto end;
+ }
ret = TRUE;
end:
- reds_channel_dispose(&caps);
+ reds_dispose_channel(&common_caps);
BIO_free(bio);
return ret;
}
@@ -1540,18 +1570,19 @@ static void reds_handle_main_link(RedLinkInfo *link)
link->link_mess = NULL;
reds_link_free(link);
caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
- if (!reds->main_channel_factory) {
- reds->main_channel_factory = main_channel_init();
+ if (!reds->main_channel) {
+ reds->main_channel = main_channel_init();
+ ASSERT(reds->main_channel);
}
client = red_client_new();
ring_add(&reds->clients, &client->link);
reds->num_clients++;
- mcc = main_channel_link(reds->main_channel_factory, client,
- stream, connection_id, reds->mig_target, link_mess->num_common_caps,
- link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
- link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
- reds->main_channel = (MainChannel*)reds->main_channel_factory->data;
- ASSERT(reds->main_channel);
+ mcc = main_channel_link(reds->main_channel, client,
+ stream, connection_id, reds->mig_target,
+ link_mess->num_common_caps,
+ link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
+ link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
+ red_printf("NEW Client %p mcc %p connect-id %d", client, mcc, connection_id);
free(link_mess);
red_client_set_main(client, mcc);
@@ -1608,7 +1639,7 @@ static void openssl_init(RedLinkInfo *link)
static void reds_handle_other_links(RedLinkInfo *link)
{
- Channel *channel;
+ RedsChannel *channel;
RedClient *client = NULL;
RedsStream *stream;
SpiceLinkMess *link_mess;
@@ -1617,7 +1648,7 @@ static void reds_handle_other_links(RedLinkInfo *link)
link_mess = link->link_mess;
if (reds->main_channel) {
client = main_channel_get_client_by_link_id(reds->main_channel,
- link_mess->connection_id);
+ link_mess->connection_id);
}
// TODO: MC: broke migration (at least for the dont-drop-connection kind).
@@ -1650,9 +1681,12 @@ static void reds_handle_other_links(RedLinkInfo *link)
link->link_mess = NULL;
reds_link_free(link);
caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
- channel->link(channel, client, stream, reds->mig_target, link_mess->num_common_caps,
- link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
- link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
+ channel->base->client_cbs.connect(channel->base, client, stream, reds->mig_target,
+ link_mess->num_common_caps,
+ link_mess->num_common_caps ? caps : NULL,
+ link_mess->num_channel_caps,
+ link_mess->num_channel_caps ?
+ caps + link_mess->num_common_caps : NULL);
free(link_mess);
}
@@ -3077,7 +3111,7 @@ static void reds_mig_finished(int completed)
reds->mig_inprogress = TRUE;
if (completed) {
- Channel *channel;
+ RingItem *link, *next;
reds->mig_wait_disconnect = TRUE;
core->timer_start(reds->mig_timer, MIGRATE_TIMEOUT);
@@ -3086,14 +3120,16 @@ static void reds_mig_finished(int completed)
// - it can have an empty migrate - that seems ok
// - I can try to fill it's migrate, then move stuff from reds.c there, but a lot of data
// is in reds state right now.
+ // currently the migrate callback of main_channel does nothing
main_channel_push_migrate(reds->main_channel);
- channel = reds->channels;
- while (channel) {
- channel->migrate(channel);
- channel = channel->next;
+
+ RING_FOREACH_SAFE(link, next, &reds->clients) {
+ red_client_migrate(SPICE_CONTAINEROF(link, RedClient, link));
}
} else {
main_channel_push_migrate_cancel(reds->main_channel);
+ // TODO: all the seemless migration is broken. Before MC we waited for disconection of one client,
+ // no we need to wait to all the clients (see mig_timer)?
reds_mig_cleanup();
}
}
diff --git a/server/reds.h b/server/reds.h
index 6930fe6a..0bbda143 100644
--- a/server/reds.h
+++ b/server/reds.h
@@ -31,13 +31,10 @@
#include "common/marshaller.h"
#include "common/messages.h"
#include "spice.h"
+#include "red_channel.h"
#define SPICE_GNUC_VISIBLE __attribute__ ((visibility ("default")))
-typedef struct RedsStream RedsStream;
-typedef struct RedClient RedClient;
-typedef struct MainChannelClient MainChannelClient;
-
#if HAVE_SASL
typedef struct RedsSASL {
sasl_conn_t *conn;
@@ -88,22 +85,6 @@ struct RedsStream {
ssize_t (*writev)(RedsStream *s, const struct iovec *iov, int iovcnt);
};
-typedef struct Channel {
- struct Channel *next;
- uint32_t type;
- uint32_t id;
- int num_common_caps;
- uint32_t *common_caps;
- int num_caps;
- uint32_t *caps;
- void (*link)(struct Channel *, RedClient *client, RedsStream *stream,
- int migration, int num_common_caps,
- uint32_t *common_caps, int num_caps, uint32_t *caps);
- void (*shutdown)(struct Channel *);
- void (*migrate)(struct Channel *);
- void *data;
-} Channel;
-
struct QXLState {
QXLInterface *qif;
struct RedDispatcher *dispatcher;
@@ -114,8 +95,6 @@ struct SpiceNetWireState {
struct TunnelWorker *worker;
};
-void reds_channel_dispose(Channel *channel);
-
ssize_t reds_stream_read(RedsStream *s, void *buf, size_t nbyte);
ssize_t reds_stream_write(RedsStream *s, const void *buf, size_t nbyte);
ssize_t reds_stream_writev(RedsStream *s, const struct iovec *iov, int iovcnt);
@@ -127,8 +106,8 @@ void reds_update_mm_timer(uint32_t mm_time);
uint32_t reds_get_mm_time(void);
void reds_set_client_mouse_allowed(int is_client_mouse_allowed,
int x_res, int y_res);
-void reds_register_channel(Channel *channel);
-void reds_unregister_channel(Channel *channel);
+void reds_register_channel(RedChannel *channel);
+void reds_unregister_channel(RedChannel *channel);
int reds_get_mouse_mode(void); // used by inputs_channel
int reds_get_agent_mouse(void); // used by inputs_channel
int reds_has_vdagent(void); // used by inputs channel
diff --git a/server/smartcard.c b/server/smartcard.c
index 36004f75..2ff13101 100644
--- a/server/smartcard.c
+++ b/server/smartcard.c
@@ -22,6 +22,7 @@
#include <arpa/inet.h>
#include <vscard_common.h>
+#include "server/reds.h"
#include "server/char_device.h"
#include "server/red_channel.h"
#include "server/smartcard.h"
@@ -78,7 +79,7 @@ static void smartcard_on_message_from_device(
RedChannelClient *rcc, VSCMsgHeader *vheader);
static SmartCardDeviceState* smartcard_device_state_new();
static void smartcard_device_state_free(SmartCardDeviceState* st);
-static void smartcard_register_channel(void);
+static void smartcard_init(void);
void smartcard_char_device_wakeup(SpiceCharDeviceInstance *sin)
{
@@ -169,7 +170,7 @@ static int smartcard_char_device_add_to_readers(SpiceCharDeviceInstance *char_de
}
state->reader_id = g_smartcard_readers.num;
g_smartcard_readers.sin[g_smartcard_readers.num++] = char_device;
- smartcard_register_channel();
+ smartcard_init();
return 0;
}
@@ -274,7 +275,6 @@ static int smartcard_channel_client_config_socket(RedChannelClient *rcc)
static uint8_t *smartcard_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
SpiceDataHeader *msg_header)
{
- //red_printf("allocing %d bytes", msg_header->size);
return spice_malloc(msg_header->size);
}
@@ -337,10 +337,9 @@ static void smartcard_channel_release_pipe_item(RedChannelClient *rcc,
free(item);
}
-static void smartcard_channel_client_disconnect(RedChannelClient *rcc)
+static void smartcard_channel_on_disconnect(RedChannelClient *rcc)
{
smartcard_readers_detach_all(rcc);
- red_channel_client_destroy(rcc);
}
/* this is called from both device input and client input. since the device is
@@ -487,63 +486,55 @@ static void smartcard_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *it
{
}
-static void smartcard_link(Channel *channel, RedClient *client,
+static void smartcard_connect(RedChannel *channel, RedClient *client,
RedsStream *stream, int migration,
int num_common_caps, uint32_t *common_caps,
int num_caps, uint32_t *caps)
{
RedChannelClient *rcc;
- ChannelCbs channel_cbs;
- if (!channel->data) {
- memset(&channel_cbs, sizeof(channel_cbs), 0);
- channel_cbs.config_socket = smartcard_channel_client_config_socket;
- channel_cbs.disconnect = smartcard_channel_client_disconnect;
- channel_cbs.send_item = smartcard_channel_send_item;
- channel_cbs.hold_item = smartcard_channel_hold_pipe_item;
- channel_cbs.release_item = smartcard_channel_release_pipe_item;
- channel_cbs.alloc_recv_buf = smartcard_channel_alloc_msg_rcv_buf;
- channel_cbs.release_recv_buf = smartcard_channel_release_msg_rcv_buf;
- channel->data =
- red_channel_create(sizeof(SmartCardChannel),
- core, migration,
- FALSE /* handle_acks */,
- smartcard_channel_handle_message,
- &channel_cbs);
- if (channel->data) {
- red_channel_init_outgoing_messages_window((RedChannel*)channel->data);
- } else {
- red_printf("ERROR: smartcard channel creation failed");
- return;
- }
- }
- rcc = red_channel_client_create(sizeof(RedChannelClient),
- (RedChannel*)channel->data, client, stream);
+ rcc = red_channel_client_create(sizeof(RedChannelClient), channel, client, stream);
red_channel_client_ack_zero_messages_window(rcc);
}
-static void smartcard_shutdown(Channel *channel)
+static void smartcard_migrate(RedChannelClient *rcc)
{
}
-static void smartcard_migrate(Channel *channel)
-{
-}
+SmartCardChannel *g_smartcard_channel;
-static void smartcard_register_channel(void)
+static void smartcard_init(void)
{
- Channel *channel;
- static int registered = 0;
-
- if (registered) {
- return;
+ ChannelCbs channel_cbs;
+ ClientCbs client_cbs = {0,};
+
+ ASSERT(!g_smartcard_channel);
+
+ memset(&channel_cbs, sizeof(channel_cbs), 0);
+ memset(&client_cbs, sizeof(client_cbs), 0);
+
+ channel_cbs.config_socket = smartcard_channel_client_config_socket;
+ channel_cbs.on_disconnect = smartcard_channel_on_disconnect;
+ channel_cbs.send_item = smartcard_channel_send_item;
+ channel_cbs.hold_item = smartcard_channel_hold_pipe_item;
+ channel_cbs.release_item = smartcard_channel_release_pipe_item;
+ channel_cbs.alloc_recv_buf = smartcard_channel_alloc_msg_rcv_buf;
+ channel_cbs.release_recv_buf = smartcard_channel_release_msg_rcv_buf;
+
+ g_smartcard_channel = (SmartCardChannel*)red_channel_create(sizeof(SmartCardChannel),
+ core, SPICE_CHANNEL_SMARTCARD, 0,
+ FALSE /* migration - TODO?*/,
+ FALSE /* handle_acks */,
+ smartcard_channel_handle_message,
+ &channel_cbs);
+
+ if (!g_smartcard_channel) {
+ red_error("failed to allocate Inputs Channel");
}
- red_printf("registering smartcard channel");
- registered = 1;
- channel = spice_new0(Channel, 1);
- channel->type = SPICE_CHANNEL_SMARTCARD;
- channel->link = smartcard_link;
- channel->shutdown = smartcard_shutdown;
- channel->migrate = smartcard_migrate;
- reds_register_channel(channel);
+
+ client_cbs.connect = smartcard_connect;
+ client_cbs.migrate = smartcard_migrate;
+ red_channel_register_client_cbs(&g_smartcard_channel->base, &client_cbs);
+
+ reds_register_channel(&g_smartcard_channel->base);
}
diff --git a/server/snd_worker.c b/server/snd_worker.c
index 332612f5..4c9a6f01 100644
--- a/server/snd_worker.c
+++ b/server/snd_worker.c
@@ -28,6 +28,7 @@
#include "spice.h"
#include "red_common.h"
+#include "main_channel.h"
#include "reds.h"
#include "red_dispatcher.h"
#include "snd_worker.h"
@@ -35,12 +36,6 @@
#include "generated_marshallers.h"
#include "demarshallers.h"
-/* main_channel.h inclusion drags red_channel.h which has conflicting types.
- * until the channels here are defined in terms of red_channel.h we have some
- * duplicate declarations */
-MainChannelClient *red_client_get_main(RedClient *client);
-int main_channel_client_is_low_bandwidth(MainChannelClient *mcc);
-
#define MAX_SEND_VEC 100
#define RECIVE_BUF_SIZE (16 * 1024 * 2)
@@ -78,10 +73,10 @@ enum RecordCommand {
#define SND_RECORD_VOLUME_MASK (1 << SND_RECORD_VOLUME)
typedef struct SndChannel SndChannel;
-typedef void (*send_messages_proc)(void *in_channel);
-typedef int (*handle_message_proc)(SndChannel *channel, size_t size, uint32_t type, void *message);
-typedef void (*on_message_done_proc)(SndChannel *channel);
-typedef void (*cleanup_channel_proc)(SndChannel *channel);
+typedef void (*snd_channel_send_messages_proc)(void *in_channel);
+typedef int (*snd_channel_handle_message_proc)(SndChannel *channel, size_t size, uint32_t type, void *message);
+typedef void (*snd_channel_on_message_done_proc)(SndChannel *channel);
+typedef void (*snd_channel_cleanup_channel_proc)(SndChannel *channel);
typedef struct SndWorker SndWorker;
@@ -90,6 +85,8 @@ struct SndChannel {
SndWorker *worker;
spice_parse_channel_func_t parser;
+ RedChannelClient *channel_client;
+
int active;
int client_active;
int blocked;
@@ -116,10 +113,10 @@ struct SndChannel {
uint8_t *end;
} recive_data;
- send_messages_proc send_messages;
- handle_message_proc handle_message;
- on_message_done_proc on_message_done;
- cleanup_channel_proc cleanup;
+ snd_channel_send_messages_proc send_messages;
+ snd_channel_handle_message_proc handle_message;
+ snd_channel_on_message_done_proc on_message_done;
+ snd_channel_cleanup_channel_proc cleanup;
int num_caps;
uint32_t *caps;
};
@@ -146,7 +143,7 @@ typedef struct PlaybackChannel {
} PlaybackChannel;
struct SndWorker {
- Channel base;
+ RedChannel *base_channel;
SndChannel *connection;
SndWorker *next;
int active;
@@ -193,7 +190,7 @@ typedef struct RecordChannel {
uint32_t celt_buf[FRAME_SIZE];
} RecordChannel;
-static SndWorker *workers = NULL;
+static SndWorker *workers;
static uint32_t playback_compression = SPICE_AUDIO_DATA_MODE_CELT_0_5_1;
static void snd_receive(void* data);
@@ -863,10 +860,11 @@ static void snd_record_send(void* data)
static SndChannel *__new_channel(SndWorker *worker, int size, uint32_t channel_id,
RedClient *client,
RedsStream *stream,
- int migrate, send_messages_proc send_messages,
- handle_message_proc handle_message,
- on_message_done_proc on_message_done,
- cleanup_channel_proc cleanup,
+ int migrate,
+ snd_channel_send_messages_proc send_messages,
+ snd_channel_handle_message_proc handle_message,
+ snd_channel_on_message_done_proc on_message_done,
+ snd_channel_cleanup_channel_proc cleanup,
uint32_t *caps, int num_caps)
{
SndChannel *channel;
@@ -926,6 +924,10 @@ static SndChannel *__new_channel(SndWorker *worker, int size, uint32_t channel_i
channel->cleanup = cleanup;
channel->num_caps = num_caps;
channel->caps = spice_memdup(caps, num_caps * sizeof(uint32_t));
+
+ channel->channel_client = red_channel_client_create_dummy(sizeof(RedChannelClient),
+ worker->base_channel,
+ client);
return channel;
error2:
@@ -936,9 +938,15 @@ error1:
return NULL;
}
-static void snd_shutdown(Channel *channel)
+static void snd_disconnect_channel_client(RedChannelClient *rcc)
{
- SndWorker *worker = (SndWorker *)channel;
+ SndWorker *worker;
+
+ ASSERT(rcc->channel);
+ ASSERT(rcc->channel->data);
+ worker = (SndWorker *)rcc->channel->data;
+
+ ASSERT(worker->connection->channel_client == rcc);
snd_disconnect_channel(worker->connection);
}
@@ -1096,13 +1104,13 @@ static void snd_playback_cleanup(SndChannel *channel)
celt051_mode_destroy(playback_channel->celt_mode);
}
-static void snd_set_playback_peer(Channel *channel, RedClient *client, RedsStream *stream,
+static void snd_set_playback_peer(RedChannel *channel, RedClient *client, RedsStream *stream,
int migration, int num_common_caps, uint32_t *common_caps,
int num_caps, uint32_t *caps)
{
- SndWorker *worker = (SndWorker *)channel;
- SpicePlaybackState *st = SPICE_CONTAINEROF(worker, SpicePlaybackState, worker);
+ SndWorker *worker = channel->data;
PlaybackChannel *playback_channel;
+ SpicePlaybackState *st = SPICE_CONTAINEROF(worker, SpicePlaybackState, worker);
CELTEncoder *celt_encoder;
CELTMode *celt_mode;
int celt_error;
@@ -1158,10 +1166,16 @@ error_1:
celt051_mode_destroy(celt_mode);
}
-static void snd_record_migrate(Channel *channel)
+static void snd_record_migrate_channel_client(RedChannelClient *rcc)
{
- SndWorker *worker = (SndWorker *)channel;
+ SndWorker *worker;
+
+ ASSERT(rcc->channel);
+ ASSERT(rcc->channel->data);
+ worker = (SndWorker *)rcc->channel->data;
+
if (worker->connection) {
+ ASSERT(worker->connection->channel_client == rcc);
snd_set_command(worker->connection, SND_RECORD_MIGRATE_MASK);
snd_record_send(worker->connection);
}
@@ -1290,19 +1304,19 @@ static void on_new_record_channel(SndWorker *worker)
static void snd_record_cleanup(SndChannel *channel)
{
- RecordChannel *record_channel = (RecordChannel *)channel;
+ RecordChannel *record_channel = SPICE_CONTAINEROF(channel, RecordChannel, base);
celt051_decoder_destroy(record_channel->celt_decoder);
celt051_mode_destroy(record_channel->celt_mode);
}
-static void snd_set_record_peer(Channel *channel, RedClient *client, RedsStream *stream,
+static void snd_set_record_peer(RedChannel *channel, RedClient *client, RedsStream *stream,
int migration, int num_common_caps, uint32_t *common_caps,
int num_caps, uint32_t *caps)
{
- SndWorker *worker = (SndWorker *)channel;
- SpiceRecordState *st = SPICE_CONTAINEROF(worker, SpiceRecordState, worker);
+ SndWorker *worker = channel->data;
RecordChannel *record_channel;
+ SpiceRecordState *st = SPICE_CONTAINEROF(worker, SpiceRecordState, worker);
CELTDecoder *celt_decoder;
CELTMode *celt_mode;
int celt_error;
@@ -1354,11 +1368,16 @@ error_2:
celt051_mode_destroy(celt_mode);
}
-static void snd_playback_migrate(Channel *channel)
+static void snd_playback_migrate_channel_client(RedChannelClient *rcc)
{
- SndWorker *worker = (SndWorker *)channel;
+ SndWorker *worker;
+
+ ASSERT(rcc->channel);
+ ASSERT(rcc->channel->data);
+ worker = (SndWorker *)rcc->channel->data;
if (worker->connection) {
+ ASSERT(worker->connection->channel_client == rcc);
snd_set_command(worker->connection, SND_PLAYBACK_MIGRATE_MASK);
snd_playback_send(worker->connection);
}
@@ -1386,48 +1405,67 @@ static void remove_worker(SndWorker *worker)
void snd_attach_playback(SpicePlaybackInstance *sin)
{
SndWorker *playback_worker;
+ int num_caps;
+ uint32_t *caps;
+ RedChannel *channel;
+ ClientCbs client_cbs = {0,};
sin->st = spice_new0(SpicePlaybackState, 1);
sin->st->sin = sin;
playback_worker = &sin->st->worker;
- playback_worker->base.type = SPICE_CHANNEL_PLAYBACK;
- playback_worker->base.link = snd_set_playback_peer;
- playback_worker->base.shutdown = snd_shutdown;
- playback_worker->base.migrate = snd_playback_migrate;
- playback_worker->base.data = NULL;
+ // TODO: Make RedChannel base of worker? instead of assigning it to channel->data
+ channel = red_channel_create_dummy(sizeof(RedChannel), SPICE_CHANNEL_PLAYBACK, 0);
+
+ channel->data = playback_worker;
+ client_cbs.connect = snd_set_playback_peer;
+ client_cbs.disconnect = snd_disconnect_channel_client;
+ client_cbs.migrate = snd_playback_migrate_channel_client;
+ red_channel_register_client_cbs(channel, &client_cbs);
+ red_channel_set_data(channel, playback_worker);
- playback_worker->base.num_caps = 1;
- playback_worker->base.caps = spice_new(uint32_t, 1);
- playback_worker->base.caps[0] =
- (1 << SPICE_PLAYBACK_CAP_CELT_0_5_1) |
- (1 << SPICE_PLAYBACK_CAP_VOLUME);
+ num_caps = 1;
+ caps = spice_new(uint32_t, 1);
+ caps[0] = (1 << SPICE_PLAYBACK_CAP_CELT_0_5_1) |
+ (1 << SPICE_PLAYBACK_CAP_VOLUME);
+ red_channel_set_caps(channel, num_caps, caps);
+ playback_worker->base_channel = channel;
add_worker(playback_worker);
- reds_register_channel(&playback_worker->base);
+ reds_register_channel(playback_worker->base_channel);
}
void snd_attach_record(SpiceRecordInstance *sin)
{
SndWorker *record_worker;
+ int num_caps;
+ uint32_t *caps;
+ RedChannel *channel;
+ ClientCbs client_cbs = {0,};
sin->st = spice_new0(SpiceRecordState, 1);
sin->st->sin = sin;
record_worker = &sin->st->worker;
- record_worker->base.type = SPICE_CHANNEL_RECORD;
- record_worker->base.link = snd_set_record_peer;
- record_worker->base.shutdown = snd_shutdown;
- record_worker->base.migrate = snd_record_migrate;
- record_worker->base.data = NULL;
-
- record_worker->base.num_caps = 1;
- record_worker->base.caps = spice_new(uint32_t, 1);
- record_worker->base.caps[0] =
- (1 << SPICE_RECORD_CAP_CELT_0_5_1) |
- (1 << SPICE_RECORD_CAP_VOLUME);
+ // TODO: Make RedChannel base of worker? instead of assigning it to channel->data
+ channel = red_channel_create_dummy(sizeof(RedChannel), SPICE_CHANNEL_RECORD, 0);
+
+ channel->data = record_worker;
+ client_cbs.connect = snd_set_record_peer;
+ client_cbs.disconnect = snd_disconnect_channel_client;
+ client_cbs.migrate = snd_record_migrate_channel_client;
+ red_channel_register_client_cbs(channel, &client_cbs);
+ red_channel_set_data(channel, record_worker);
+
+ num_caps = 1;
+ caps = spice_new(uint32_t, 1);
+ caps[0] = (1 << SPICE_RECORD_CAP_CELT_0_5_1) |
+ (1 << SPICE_RECORD_CAP_VOLUME);
+ red_channel_set_caps(channel, num_caps, caps);
+
+ record_worker->base_channel = channel;
add_worker(record_worker);
- reds_register_channel(&record_worker->base);
+ reds_register_channel(record_worker->base_channel);
}
static void snd_detach_common(SndWorker *worker)
@@ -1437,9 +1475,8 @@ static void snd_detach_common(SndWorker *worker)
}
remove_worker(worker);
snd_disconnect_channel(worker->connection);
- reds_unregister_channel(&worker->base);
-
- reds_channel_dispose(&worker->base);
+ reds_unregister_channel(worker->base_channel);
+ red_channel_destroy(worker->base_channel);
}
static void spice_playback_state_free(SpicePlaybackState *st)
@@ -1472,7 +1509,7 @@ void snd_set_playback_compression(int on)
playback_compression = on ? SPICE_AUDIO_DATA_MODE_CELT_0_5_1 : SPICE_AUDIO_DATA_MODE_RAW;
for (; now; now = now->next) {
- if (now->base.type == SPICE_CHANNEL_PLAYBACK && now->connection) {
+ if (now->base_channel->type == SPICE_CHANNEL_PLAYBACK && now->connection) {
SndChannel* sndchannel = now->connection;
PlaybackChannel* playback = (PlaybackChannel*)now->connection;
if (!check_cap(sndchannel->caps, sndchannel->num_caps,
diff --git a/server/usbredir.c b/server/usbredir.c
index 50a60956..8daab7d4 100644
--- a/server/usbredir.c
+++ b/server/usbredir.c
@@ -24,6 +24,7 @@
#include "server/char_device.h"
#include "server/red_channel.h"
+#include "server/reds.h"
/* 64K should be enough for all but the largest bulk xfers + 32 bytes hdr */
#define BUF_SIZE (64 * 1024 + 32)
@@ -36,8 +37,7 @@ typedef struct UsbRedirPipeItem {
} UsbRedirPipeItem;
typedef struct UsbRedirState {
- Channel channel;
- RedChannel *red_channel;
+ RedChannel *channel;
RedChannelClient *rcc;
SpiceCharDeviceState chardev_st;
SpiceCharDeviceInstance *chardev_sin;
@@ -60,14 +60,14 @@ static void usbredir_chardev_wakeup(SpiceCharDeviceInstance *sin)
state = SPICE_CONTAINEROF(sin->st, UsbRedirState, chardev_st);
sif = SPICE_CONTAINEROF(sin->base.sif, SpiceCharDeviceInterface, base);
- if (!state->red_channel) {
+ if (!state->rcc) {
return;
}
do {
if (!state->pipe_item) {
state->pipe_item = spice_malloc(sizeof(UsbRedirPipeItem));
- red_channel_pipe_item_init(state->red_channel,
+ red_channel_pipe_item_init(state->rcc->channel,
&state->pipe_item->base, 0);
}
@@ -82,12 +82,12 @@ static void usbredir_chardev_wakeup(SpiceCharDeviceInstance *sin)
} while (n > 0);
}
-static int usbredir_red_channel_config_socket(RedChannelClient *rcc)
+static int usbredir_red_channel_client_config_socket(RedChannelClient *rcc)
{
return TRUE;
}
-static void usbredir_red_channel_disconnect(RedChannelClient *rcc)
+static void usbredir_red_channel_client_on_disconnect(RedChannelClient *rcc)
{
UsbRedirState *state;
SpiceCharDeviceInstance *sin;
@@ -177,16 +177,18 @@ static void usbredir_red_channel_release_pipe_item(RedChannelClient *rcc,
free(item);
}
-static void usbredir_link(Channel *channel, RedClient *client, RedsStream *stream, int migration,
- int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps)
+static void usbredir_connect(RedChannel *channel, RedClient *client,
+ RedsStream *stream, int migration, int num_common_caps,
+ uint32_t *common_caps, int num_caps, uint32_t *caps)
{
+ RedChannelClient *rcc;
UsbRedirState *state;
UsbRedirChannel *redir_chan;
SpiceCharDeviceInstance *sin;
SpiceCharDeviceInterface *sif;
- ChannelCbs channel_cbs;
- state = SPICE_CONTAINEROF(channel, UsbRedirState, channel);
+ redir_chan = SPICE_CONTAINEROF(channel, UsbRedirChannel, base);
+ state = redir_chan->state;
sin = state->chardev_sin;
sif = SPICE_CONTAINEROF(sin->base.sif, SpiceCharDeviceInterface, base);
@@ -199,86 +201,78 @@ static void usbredir_link(Channel *channel, RedClient *client, RedsStream *strea
return;
}
- if (!state->red_channel) {
- memset(&channel_cbs, sizeof(channel_cbs), 0);
- channel_cbs.config_socket = usbredir_red_channel_config_socket;
- channel_cbs.disconnect = usbredir_red_channel_disconnect;
- channel_cbs.send_item = usbredir_red_channel_send_item;
- channel_cbs.hold_item = usbredir_red_channel_hold_pipe_item;
- channel_cbs.release_item = usbredir_red_channel_release_pipe_item;
- channel_cbs.alloc_recv_buf = usbredir_red_channel_alloc_msg_rcv_buf;
- channel_cbs.release_recv_buf = usbredir_red_channel_release_msg_rcv_buf;
- state->red_channel = red_channel_create(sizeof(UsbRedirChannel),
- core, migration, FALSE /* handle_acks */,
- usbredir_red_channel_client_handle_message,
- &channel_cbs);
- }
- if (!state->red_channel) {
- return;
- }
- state->rcc = red_channel_client_create(sizeof(RedChannelClient), state->red_channel,
- client, stream);
- if (!state->rcc) {
- red_printf("failed to create usbredir channel client\n");
+ rcc = red_channel_client_create(sizeof(RedChannelClient), channel, client, stream);
+ if (!rcc) {
return;
}
- red_channel_init_outgoing_messages_window(state->red_channel);
- redir_chan = SPICE_CONTAINEROF(state->red_channel, UsbRedirChannel, base);
- redir_chan->state = state;
+ state->rcc = rcc;
+ red_channel_client_ack_zero_messages_window(rcc);
if (sif->state) {
sif->state(sin, 1);
}
}
-static void usbredir_shutdown(Channel *channel)
-{
- UsbRedirState *state = SPICE_CONTAINEROF(channel, UsbRedirState, channel);
-
- usbredir_red_channel_disconnect(state->rcc);
- red_channel_destroy(state->red_channel);
- state->red_channel = NULL;
-}
-
-static void usbredir_migrate(Channel *channel)
+static void usbredir_migrate(RedChannelClient *rcc)
{
/* NOOP */
}
int usbredir_device_connect(SpiceCharDeviceInstance *sin)
{
- UsbRedirState *state;
static int id = 0;
-
+ UsbRedirState *state;
+ UsbRedirChannel *redir_chan;
+ ChannelCbs channel_cbs = {0,};
+ ClientCbs client_cbs = {0,};
+
+ channel_cbs.config_socket = usbredir_red_channel_client_config_socket;
+ channel_cbs.on_disconnect = usbredir_red_channel_client_on_disconnect;
+ channel_cbs.send_item = usbredir_red_channel_send_item;
+ channel_cbs.hold_item = usbredir_red_channel_hold_pipe_item;
+ channel_cbs.release_item = usbredir_red_channel_release_pipe_item;
+ channel_cbs.alloc_recv_buf = usbredir_red_channel_alloc_msg_rcv_buf;
+ channel_cbs.release_recv_buf = usbredir_red_channel_release_msg_rcv_buf;
+
+ redir_chan = (UsbRedirChannel*)red_channel_create(sizeof(UsbRedirChannel),
+ core, SPICE_CHANNEL_USBREDIR, id++,
+ FALSE /* migration - TODO?*/,
+ FALSE /* handle_acks */,
+ usbredir_red_channel_client_handle_message,
+ &channel_cbs);
+ red_channel_init_outgoing_messages_window(&redir_chan->base);
state = spice_new0(UsbRedirState, 1);
- state->channel.type = SPICE_CHANNEL_USBREDIR;
- state->channel.id = id++;
- state->channel.link = usbredir_link;
- state->channel.shutdown = usbredir_shutdown;
- state->channel.migrate = usbredir_migrate;
+ state->channel = &redir_chan->base;
state->chardev_st.wakeup = usbredir_chardev_wakeup;
state->chardev_sin = sin;
state->rcv_buf = spice_malloc(BUF_SIZE);
state->rcv_buf_size = BUF_SIZE;
+ client_cbs.connect = usbredir_connect;
+ client_cbs.migrate = usbredir_migrate;
+ red_channel_register_client_cbs(&redir_chan->base, &client_cbs);
+
sin->st = &state->chardev_st;
- reds_register_channel(&state->channel);
+ reds_register_channel(&redir_chan->base);
return 0;
}
+/* Must be called from RedClient handling thread. */
void usbredir_device_disconnect(SpiceCharDeviceInstance *sin)
{
UsbRedirState *state;
state = SPICE_CONTAINEROF(sin->st, UsbRedirState, chardev_st);
- reds_unregister_channel(&state->channel);
+ reds_unregister_channel(state->channel);
- usbredir_red_channel_disconnect(state->rcc);
+ red_channel_client_destroy(state->rcc);
+ red_channel_disconnect(state->channel);
free(state->pipe_item);
free(state->rcv_buf);
+ free(state->channel);
free(state);
}