summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--server/inputs_channel.c115
-rw-r--r--server/main_channel.c107
-rw-r--r--server/main_channel.h4
-rw-r--r--server/red_channel.c278
-rw-r--r--server/red_channel.h70
-rw-r--r--server/red_dispatcher.c163
-rw-r--r--server/red_dispatcher.h5
-rw-r--r--server/red_tunnel_worker.c211
-rw-r--r--server/red_worker.c179
-rw-r--r--server/red_worker.h5
-rw-r--r--server/reds.c156
-rw-r--r--server/reds.h27
-rw-r--r--server/smartcard.c89
-rw-r--r--server/snd_worker.c157
-rw-r--r--server/usbredir.c102
15 files changed, 856 insertions, 812 deletions
diff --git a/server/inputs_channel.c b/server/inputs_channel.c
index e6af0d58..653910e8 100644
--- a/server/inputs_channel.c
+++ b/server/inputs_channel.c
@@ -434,37 +434,18 @@ static void inputs_relase_keys(void)
kbd_push_scan(keyboard, 0x38 | 0x80); //LALT
}
-static void inputs_channel_disconnect(RedChannelClient *rcc)
+static void inputs_channel_on_disconnect(RedChannelClient *rcc)
{
- inputs_relase_keys();
- red_channel_client_disconnect(rcc);
-}
-
-static void inputs_channel_on_error(RedChannelClient *rcc)
-{
- red_printf("");
- inputs_channel_disconnect(rcc);
-}
-
-static void inputs_shutdown(Channel *channel)
-{
- InputsChannel *inputs_channel = (InputsChannel *)channel->data;
- ASSERT(g_inputs_channel == inputs_channel);
-
- if (inputs_channel) {
- red_channel_shutdown(&inputs_channel->base);
- red_channel_destroy(&inputs_channel->base);
- channel->data = NULL;
- g_inputs_channel = NULL;
+ if (!rcc) {
+ return;
}
+ inputs_relase_keys();
}
-static void inputs_migrate(Channel *channel)
+static void inputs_migrate(RedChannelClient *rcc)
{
- InputsChannel *inputs_channel = channel->data;
-
- ASSERT(g_inputs_channel == (InputsChannel *)channel->data);
- red_channel_pipes_add_type(&inputs_channel->base, PIPE_ITEM_MIGRATE);
+ ASSERT(g_inputs_channel == (InputsChannel *)rcc->channel);
+ red_channel_client_pipe_add_type(rcc, PIPE_ITEM_MIGRATE);
}
static void inputs_pipe_add_init(RedChannelClient *rcc)
@@ -501,40 +482,21 @@ static void inputs_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
{
}
-static void inputs_link(Channel *channel, RedClient *client,
- RedsStream *stream, int migration,
- int num_common_caps, uint32_t *common_caps,
- int num_caps, uint32_t *caps)
+static void inputs_connect(RedChannel *channel, RedClient *client,
+ RedsStream *stream, int migration,
+ int num_common_caps, uint32_t *common_caps,
+ int num_caps, uint32_t *caps)
{
InputsChannelClient *icc;
- ASSERT(channel->data == g_inputs_channel);
- if (channel->data == NULL) {
- ChannelCbs channel_cbs;
-
- memset(&channel_cbs, sizeof(channel_cbs), 0);
-
- channel_cbs.config_socket = inputs_channel_config_socket;
- channel_cbs.disconnect = inputs_channel_disconnect;
- channel_cbs.send_item = inputs_channel_send_item;
- channel_cbs.hold_item = inputs_channel_hold_pipe_item;
- channel_cbs.release_item = inputs_channel_release_pipe_item;
- channel_cbs.alloc_recv_buf = inputs_channel_alloc_msg_rcv_buf;
- channel_cbs.release_recv_buf = inputs_channel_release_msg_rcv_buf;
-
- red_printf("inputs channel create");
- channel->data = g_inputs_channel = (InputsChannel*)red_channel_create_parser(
- sizeof(InputsChannel), core, migration, FALSE /* handle_acks */
- ,spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL)
- ,inputs_channel_handle_parsed
- ,inputs_channel_on_error
- ,inputs_channel_on_error
- ,&channel_cbs);
- ASSERT(channel->data);
- }
+ ASSERT(g_inputs_channel);
+ ASSERT(channel == &g_inputs_channel->base);
+
red_printf("inputs channel client create");
icc = (InputsChannelClient*)red_channel_client_create(sizeof(InputsChannelClient),
- channel->data, client, stream);
+ channel,
+ client,
+ stream);
icc->motion_count = 0;
inputs_pipe_add_init(&icc->base);
}
@@ -560,14 +522,41 @@ static void key_modifiers_sender(void *opaque)
void inputs_init(void)
{
- Channel *channel;
-
- channel = spice_new0(Channel, 1);
- channel->type = SPICE_CHANNEL_INPUTS;
- channel->link = inputs_link;
- channel->shutdown = inputs_shutdown;
- channel->migrate = inputs_migrate;
- reds_register_channel(channel);
+ ChannelCbs channel_cbs;
+ ClientCbs client_cbs = {0,};
+
+ ASSERT(!g_inputs_channel);
+
+ memset(&channel_cbs, sizeof(channel_cbs), 0);
+ memset(&client_cbs, sizeof(client_cbs), 0);
+
+ channel_cbs.config_socket = inputs_channel_config_socket;
+ channel_cbs.on_disconnect = inputs_channel_on_disconnect;
+ channel_cbs.send_item = inputs_channel_send_item;
+ channel_cbs.hold_item = inputs_channel_hold_pipe_item;
+ channel_cbs.release_item = inputs_channel_release_pipe_item;
+ channel_cbs.alloc_recv_buf = inputs_channel_alloc_msg_rcv_buf;
+ channel_cbs.release_recv_buf = inputs_channel_release_msg_rcv_buf;
+
+ g_inputs_channel = (InputsChannel *)red_channel_create_parser(
+ sizeof(InputsChannel),
+ core,
+ SPICE_CHANNEL_INPUTS, 0,
+ FALSE, // TODO: set migration?
+ FALSE, /* handle_acks */
+ spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL),
+ inputs_channel_handle_parsed,
+ &channel_cbs);
+
+ if (!g_inputs_channel) {
+ red_error("failed to allocate Inputs Channel");
+ }
+
+ client_cbs.connect = inputs_connect;
+ client_cbs.migrate = inputs_migrate;
+ red_channel_register_client_cbs(&g_inputs_channel->base, &client_cbs);
+
+ reds_register_channel(&g_inputs_channel->base);
if (!(key_modifiers_timer = core->timer_add(key_modifiers_sender, NULL))) {
red_error("key modifiers timer create failed");
diff --git a/server/main_channel.c b/server/main_channel.c
index 54b3dff4..6d77f711 100644
--- a/server/main_channel.c
+++ b/server/main_channel.c
@@ -155,14 +155,13 @@ enum NetTestStage {
NET_TEST_STAGE_RATE,
};
-static void main_channel_client_disconnect(RedChannelClient *rcc)
+// when disconnection occurs, let reds shutdown all channels. This will trigger the
+// real disconnection of main channel
+static void main_channel_client_on_disconnect(RedChannelClient *rcc)
{
- red_channel_client_disconnect(rcc);
-}
-
-static void main_disconnect(MainChannel *main_chan)
-{
- red_channel_destroy(&main_chan->base);
+ red_printf("rcc=%p", rcc);
+ reds_client_disconnect(rcc->client);
+// red_channel_client_disconnect(rcc);
}
RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t connection_id)
@@ -840,12 +839,6 @@ static int main_channel_handle_parsed(RedChannelClient *rcc, uint32_t size, uint
return TRUE;
}
-static void main_channel_on_error(RedChannelClient *rcc)
-{
- red_printf("");
- reds_client_disconnect(rcc->client);
-}
-
static uint8_t *main_channel_alloc_msg_rcv_buf(RedChannelClient *rcc, SpiceDataHeader *msg_header)
{
MainChannel *main_chan = SPICE_CONTAINEROF(rcc->channel, MainChannel, base);
@@ -912,11 +905,12 @@ uint32_t main_channel_client_get_link_id(MainChannelClient *mcc)
return mcc->connection_id;
}
-MainChannelClient *main_channel_client_create(MainChannel *main_chan,
- RedClient *client, RedsStream *stream, uint32_t connection_id)
+MainChannelClient *main_channel_client_create(MainChannel *main_chan, RedClient *client,
+ RedsStream *stream, uint32_t connection_id)
{
- MainChannelClient *mcc = (MainChannelClient*)red_channel_client_create(
- sizeof(MainChannelClient), &main_chan->base, client, stream);
+ MainChannelClient *mcc = (MainChannelClient*)red_channel_client_create(sizeof(MainChannelClient),
+ &main_chan->base,
+ client, stream);
mcc->connection_id = connection_id;
mcc->bitrate_per_sec = ~0;
@@ -929,42 +923,20 @@ MainChannelClient *main_channel_client_create(MainChannel *main_chan,
return mcc;
}
-MainChannelClient *main_channel_link(Channel *channel, RedClient *client,
- RedsStream *stream, uint32_t connection_id, int migration,
- int num_common_caps, uint32_t *common_caps, int num_caps,
- uint32_t *caps)
+MainChannelClient *main_channel_link(MainChannel *channel, RedClient *client,
+ RedsStream *stream, uint32_t connection_id, int migration,
+ int num_common_caps, uint32_t *common_caps, int num_caps,
+ uint32_t *caps)
{
MainChannelClient *mcc;
- if (channel->data == NULL) {
- ChannelCbs channel_cbs;
-
- channel_cbs.config_socket = main_channel_config_socket;
- channel_cbs.disconnect = main_channel_client_disconnect;
- channel_cbs.send_item = main_channel_send_item;
- channel_cbs.hold_item = main_channel_hold_pipe_item;
- channel_cbs.release_item = main_channel_release_pipe_item;
- channel_cbs.alloc_recv_buf = main_channel_alloc_msg_rcv_buf;
- channel_cbs.release_recv_buf = main_channel_release_msg_rcv_buf;
- channel_cbs.handle_migrate_flush_mark = main_channel_handle_migrate_flush_mark;
- channel_cbs.handle_migrate_data = main_channel_handle_migrate_data;
- channel_cbs.handle_migrate_data_get_serial = main_channel_handle_migrate_data_get_serial;
-
- red_printf("create main channel");
- channel->data = red_channel_create_parser(
- sizeof(MainChannel), core, migration, FALSE /* handle_acks */
- ,spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL)
- ,main_channel_handle_parsed
- ,main_channel_on_error
- ,main_channel_on_error
- ,&channel_cbs);
- ASSERT(channel->data);
- }
+ ASSERT(channel);
+
// TODO - migration - I removed it from channel creation, now put it
// into usage somewhere (not an issue until we return migration to it's
// former glory)
red_printf("add main channel client");
- mcc = main_channel_client_create(channel->data, client, stream, connection_id);
+ mcc = main_channel_client_create(channel, client, stream, connection_id);
return mcc;
}
@@ -978,6 +950,7 @@ int main_channel_getpeername(MainChannel *main_chan, struct sockaddr *sa, sockle
return main_chan ? getpeername(red_channel_get_first_socket(&main_chan->base), sa, salen) : -1;
}
+// TODO: ? shouldn't it disonnect all clients? or shutdown all main_channels?
void main_channel_close(MainChannel *main_chan)
{
int socketfd;
@@ -998,27 +971,31 @@ uint64_t main_channel_client_get_bitrate_per_sec(MainChannelClient *mcc)
return mcc->bitrate_per_sec;
}
-static void main_channel_shutdown(Channel *channel)
+MainChannel* main_channel_init(void)
{
- MainChannel *main_chan = channel->data;
+ RedChannel *channel;
+ ChannelCbs channel_cbs;
- if (main_chan != NULL) {
- main_disconnect(main_chan);
- }
-}
-static void main_channel_migrate()
-{
-}
+ channel_cbs.config_socket = main_channel_config_socket;
+ channel_cbs.on_disconnect = main_channel_client_on_disconnect;
+ channel_cbs.send_item = main_channel_send_item;
+ channel_cbs.hold_item = main_channel_hold_pipe_item;
+ channel_cbs.release_item = main_channel_release_pipe_item;
+ channel_cbs.alloc_recv_buf = main_channel_alloc_msg_rcv_buf;
+ channel_cbs.release_recv_buf = main_channel_release_msg_rcv_buf;
+ channel_cbs.handle_migrate_flush_mark = main_channel_handle_migrate_flush_mark;
+ channel_cbs.handle_migrate_data = main_channel_handle_migrate_data;
+ channel_cbs.handle_migrate_data_get_serial = main_channel_handle_migrate_data_get_serial;
-Channel* main_channel_init(void)
-{
- Channel *channel;
-
- channel = spice_new0(Channel, 1);
- channel->type = SPICE_CHANNEL_MAIN;
- channel->link = NULL; /* the main channel client is created by reds.c explicitly */
- channel->shutdown = main_channel_shutdown;
- channel->migrate = main_channel_migrate;
- return channel;
+ // TODO: set the migration flag of the channel
+ channel = red_channel_create_parser(sizeof(MainChannel), core,
+ SPICE_CHANNEL_MAIN, 0,
+ FALSE, FALSE, /* handle_acks */
+ spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL),
+ main_channel_handle_parsed,
+ &channel_cbs);
+ ASSERT(channel);
+
+ return (MainChannel *)channel;
}
diff --git a/server/main_channel.h b/server/main_channel.h
index 0dd37973..80475354 100644
--- a/server/main_channel.h
+++ b/server/main_channel.h
@@ -47,10 +47,10 @@ struct MainMigrateData {
typedef struct MainChannel MainChannel;
-Channel *main_channel_init(void);
+MainChannel *main_channel_init(void);
RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t link_id);
/* This is a 'clone' from the reds.h Channel.link callback to allow passing link_id */
-MainChannelClient *main_channel_link(struct Channel *, RedClient *client,
+MainChannelClient *main_channel_link(MainChannel *, RedClient *client,
RedsStream *stream, uint32_t link_id, int migration, int num_common_caps,
uint32_t *common_caps, int num_caps, uint32_t *caps);
void main_channel_close(MainChannel *main_chan); // not destroy, just socket close
diff --git a/server/red_channel.c b/server/red_channel.c
index 80aa667c..ae333aa7 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -32,10 +32,12 @@
#include "ring.h"
#include "stat.h"
#include "red_channel.h"
+#include "reds.h"
#include "generated_marshallers.h"
static void red_channel_client_event(int fd, int event, void *data);
static void red_client_add_channel(RedClient *client, RedChannelClient *rcc);
+static void red_client_remove_channel(RedChannelClient *rcc);
/* return the number of bytes read. -1 in case of error */
static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
@@ -138,10 +140,6 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
ret_handle = handler->cb->handle_message(handler->opaque, &handler->header,
handler->msg);
}
- if (handler->shut) {
- handler->cb->on_error(handler->opaque);
- return;
- }
handler->msg_pos = 0;
handler->msg = NULL;
handler->header_pos = 0;
@@ -221,22 +219,9 @@ static void red_channel_client_on_output(void *opaque, int n)
stat_inc_counter(rcc->channel->out_bytes_counter, n);
}
-void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
-{
- rcc->channel->channel_cbs.disconnect(rcc);
-}
-
-static void red_channel_peer_on_incoming_error(RedChannelClient *rcc)
-{
- if (rcc->stream->shutdown) {
- return; // assume error has already been handled which caused the shutdown.
- }
- rcc->channel->on_incoming_error(rcc);
-}
-
-static void red_channel_peer_on_outgoing_error(RedChannelClient *rcc)
+static void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
{
- rcc->channel->on_outgoing_error(rcc);
+ red_channel_client_disconnect(rcc);
}
static int red_channel_client_peer_get_out_msg_size(void *opaque)
@@ -417,6 +402,21 @@ error:
return NULL;
}
+
+RedChannelClient *red_channel_client_create_dummy(int size,
+ RedChannel *channel,
+ RedClient *client)
+{
+ RedChannelClient *rcc;
+
+ ASSERT(size >= sizeof(RedChannelClient));
+ rcc = spice_malloc0(size);
+ rcc->client = client;
+ rcc->channel = channel;
+ red_channel_add_client(channel, rcc);
+ return rcc;
+}
+
static void red_channel_client_default_connect(RedChannel *channel, RedClient *client,
RedsStream *stream,
int migration,
@@ -437,6 +437,7 @@ static void red_channel_client_default_migrate(RedChannelClient *base)
RedChannel *red_channel_create(int size,
SpiceCoreInterface *core,
+ uint32_t type, uint32_t id,
int migrate, int handle_acks,
channel_handle_message_proc handle_message,
ChannelCbs *channel_cbs)
@@ -445,11 +446,13 @@ RedChannel *red_channel_create(int size,
ClientCbs client_cbs;
ASSERT(size >= sizeof(*channel));
- ASSERT(channel_cbs->config_socket && channel_cbs->disconnect && handle_message &&
+ ASSERT(channel_cbs->config_socket && channel_cbs->on_disconnect && handle_message &&
channel_cbs->alloc_recv_buf && channel_cbs->release_item);
channel = spice_malloc0(size);
+ channel->type = type;
+ channel->id = id;
channel->handle_acks = handle_acks;
- channel->channel_cbs.disconnect = channel_cbs->disconnect;
+ channel->channel_cbs.on_disconnect = channel_cbs->on_disconnect;
channel->channel_cbs.send_item = channel_cbs->send_item;
channel->channel_cbs.release_item = channel_cbs->release_item;
channel->channel_cbs.hold_item = channel_cbs->hold_item;
@@ -484,8 +487,53 @@ RedChannel *red_channel_create(int size,
channel->thread_id = pthread_self();
- channel->shut = 0; // came here from inputs, perhaps can be removed? XXX
channel->out_bytes_counter = 0;
+
+ return channel;
+}
+
+// TODO: red_worker can use this one
+static void dummy_watch_update_mask(SpiceWatch *watch, int event_mask)
+{
+}
+
+static SpiceWatch *dummy_watch_add(int fd, int event_mask, SpiceWatchFunc func, void *opaque)
+{
+ return NULL; // apparently allowed?
+}
+
+static void dummy_watch_remove(SpiceWatch *watch)
+{
+}
+
+// TODO: actually, since I also use channel_client_dummym, no need for core. Can be NULL
+SpiceCoreInterface dummy_core = {
+ .watch_update_mask = dummy_watch_update_mask,
+ .watch_add = dummy_watch_add,
+ .watch_remove = dummy_watch_remove,
+};
+
+RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id)
+{
+ RedChannel *channel;
+ ClientCbs client_cbs;
+
+ ASSERT(size >= sizeof(*channel));
+ channel = spice_malloc0(size);
+ channel->type = type;
+ channel->id = id;
+ channel->core = &dummy_core;
+ ring_init(&channel->clients);
+ client_cbs.connect = red_channel_client_default_connect;
+ client_cbs.disconnect = red_channel_client_default_disconnect;
+ client_cbs.migrate = red_channel_client_default_migrate;
+
+ red_channel_register_client_cbs(channel, &client_cbs);
+
+ channel->thread_id = pthread_self();
+
+ channel->out_bytes_counter = 0;
+
return channel;
}
@@ -496,26 +544,22 @@ static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *hea
RedChannel *red_channel_create_parser(int size,
SpiceCoreInterface *core,
+ uint32_t type, uint32_t id,
int migrate, int handle_acks,
spice_parse_channel_func_t parser,
- channel_handle_parsed_proc handle_parsed,
- channel_on_incoming_error_proc incoming_error,
- channel_on_outgoing_error_proc outgoing_error,
+ channel_handle_parsed_proc handle_parsed,
ChannelCbs *channel_cbs)
{
- RedChannel *channel = red_channel_create(size,
- core, migrate, handle_acks, do_nothing_handle_message,
- channel_cbs);
+ RedChannel *channel = red_channel_create(size, core, type, id,
+ migrate, handle_acks,
+ do_nothing_handle_message,
+ channel_cbs);
if (channel == NULL) {
return NULL;
}
channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
channel->incoming_cb.parser = parser;
- channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
- channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
- channel->on_incoming_error = incoming_error;
- channel->on_outgoing_error = outgoing_error;
return channel;
}
@@ -533,10 +577,27 @@ void red_channel_register_client_cbs(RedChannel *channel, ClientCbs *client_cbs)
}
}
+void red_channel_set_caps(RedChannel *channel, int num_caps, uint32_t *caps)
+{
+ channel->num_caps = num_caps;
+ channel->caps = caps;
+}
+
+void red_channel_set_data(RedChannel *channel, void *data)
+{
+ ASSERT(channel);
+ channel->data = data;
+}
+
void red_channel_client_destroy(RedChannelClient *rcc)
{
- red_channel_client_disconnect(rcc);
- spice_marshaller_destroy(rcc->send_data.marshaller);
+ if (red_channel_client_is_connected(rcc)) {
+ red_channel_client_disconnect(rcc);
+ }
+ red_client_remove_channel(rcc);
+ if (rcc->send_data.marshaller) {
+ spice_marshaller_destroy(rcc->send_data.marshaller);
+ }
free(rcc);
}
@@ -548,11 +609,13 @@ void red_channel_destroy(RedChannel *channel)
if (!channel) {
return;
}
- red_channel_pipes_clear(channel);
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_destroy(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
+ if (channel->caps) {
+ free(channel->caps);
+ }
free(channel);
}
@@ -563,21 +626,7 @@ void red_channel_client_shutdown(RedChannelClient *rcc)
rcc->stream->watch = NULL;
shutdown(rcc->stream->socket, SHUT_RDWR);
rcc->stream->shutdown = TRUE;
- rcc->incoming.shut = TRUE;
}
- red_channel_client_release_sent_item(rcc);
-}
-
-void red_channel_shutdown(RedChannel *channel)
-{
- RingItem *link;
- RingItem *next;
-
- red_printf("%d", channel->clients_num);
- RING_FOREACH_SAFE(link, next, &channel->clients) {
- red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
- }
- red_channel_pipes_clear(channel);
}
void red_channel_client_send(RedChannelClient *rcc)
@@ -650,11 +699,7 @@ void red_channel_push(RedChannel *channel)
}
RING_FOREACH_SAFE(link, next, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- if (rcc->stream == NULL) {
- rcc->channel->channel_cbs.disconnect(rcc);
- } else {
- red_channel_client_push(rcc);
- }
+ red_channel_client_push(rcc);
}
}
@@ -861,25 +906,9 @@ int red_channel_client_is_connected(RedChannelClient *rcc)
return rcc->stream != NULL;
}
-void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item)
-{
- ring_remove(&item->link);
-}
-
int red_channel_is_connected(RedChannel *channel)
{
- RingItem *link;
-
- if (!channel || channel->clients_num == 0) {
- return FALSE;
- }
- RING_FOREACH(link, &channel->clients) {
- if (red_channel_client_is_connected(
- SPICE_CONTAINEROF(link, RedChannelClient, channel_link))) {
- return TRUE;
- }
- }
- return FALSE;
+ return channel && (channel->clients_num > 0);
}
void red_channel_client_clear_sent_item(RedChannelClient *rcc)
@@ -906,21 +935,6 @@ void red_channel_client_pipe_clear(RedChannelClient *rcc)
rcc->pipe_size = 0;
}
-void red_channel_pipes_clear(RedChannel *channel)
-{
- RingItem *link;
- RingItem *next;
- RedChannelClient *rcc;
-
- if (!channel) {
- return;
- }
- RING_FOREACH_SAFE(link, next, &channel->clients) {
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- red_channel_client_pipe_clear(rcc);
- }
-}
-
void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
{
rcc->ack_data.messages_window = 0;
@@ -931,27 +945,37 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_
rcc->ack_data.client_window = client_window;
}
-static void red_channel_client_remove(RedChannelClient *rcc)
+
+static void red_channel_remove_client(RedChannelClient *rcc)
{
- ring_remove(&rcc->client_link);
- rcc->client->channels_num--;
+ ASSERT(pthread_equal(pthread_self(), rcc->channel->thread_id));
ring_remove(&rcc->channel_link);
rcc->channel->clients_num--;
+ // TODO: should we set rcc->channel to NULL???
}
-void red_channel_client_disconnect(RedChannelClient *rcc)
+static void red_client_remove_channel(RedChannelClient *rcc)
{
- red_printf("%p (channel %p)", rcc, rcc->channel);
+ pthread_mutex_lock(&rcc->client->lock);
+ ring_remove(&rcc->client_link);
+ rcc->client->channels_num--;
+ pthread_mutex_unlock(&rcc->client->lock);
+}
- if (rcc->send_data.item) {
- rcc->channel->channel_cbs.release_item(rcc, rcc->send_data.item, FALSE);
+void red_channel_client_disconnect(RedChannelClient *rcc)
+{
+ red_printf("%p (channel %p type %d id %d)", rcc, rcc->channel,
+ rcc->channel->type, rcc->channel->id);
+ if (!red_channel_client_is_connected(rcc)) {
+ return;
}
red_channel_client_pipe_clear(rcc);
reds_stream_free(rcc->stream);
- rcc->send_data.item = NULL;
- rcc->send_data.blocked = FALSE;
- rcc->send_data.size = 0;
- red_channel_client_remove(rcc);
+ rcc->stream = NULL;
+ red_channel_remove_client(rcc);
+ // TODO: not do it till destroyed?
+// red_channel_client_remove(rcc);
+ rcc->channel->channel_cbs.on_disconnect(rcc);
}
void red_channel_disconnect(RedChannel *channel)
@@ -959,27 +983,12 @@ void red_channel_disconnect(RedChannel *channel)
RingItem *link;
RingItem *next;
- red_channel_pipes_clear(channel);
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_disconnect(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
}
-int red_channel_all_clients_serials_are_zero(RedChannel *channel)
-{
- RingItem *link;
- RedChannelClient *rcc;
-
- RING_FOREACH(link, &channel->clients) {
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- if (rcc->send_data.serial != 0) {
- return FALSE;
- }
- }
- return TRUE;
-}
-
void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb)
{
RingItem *link;
@@ -1004,17 +1013,6 @@ void red_channel_apply_clients_data(RedChannel *channel, channel_client_callback
}
}
-void red_channel_set_shut(RedChannel *channel)
-{
- RingItem *link;
- RedChannelClient *rcc;
-
- RING_FOREACH(link, &channel->clients) {
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- rcc->incoming.shut = TRUE;
- }
-}
-
int red_channel_all_blocked(RedChannel *channel)
{
RingItem *link;
@@ -1143,18 +1141,24 @@ RedClient *red_client_new()
client = spice_malloc0(sizeof(RedClient));
ring_init(&client->channels);
+ pthread_mutex_init(&client->lock, NULL);
client->thread_id = pthread_self();
return client;
}
-void red_client_shutdown(RedClient *client)
+void red_client_migrate(RedClient *client)
{
RingItem *link, *next;
+ RedChannelClient *rcc;
- red_printf("#channels %d", client->channels_num);
+ red_printf("migrate client with #channels %d", client->channels_num);
+ ASSERT(pthread_equal(pthread_self(), client->thread_id));
RING_FOREACH_SAFE(link, next, &client->channels) {
- red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, client_link));
+ rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link);
+ if (red_channel_client_is_connected(rcc)) {
+ rcc->channel->client_cbs.migrate(rcc);
+ }
}
}
@@ -1175,29 +1179,23 @@ void red_client_destroy(RedClient *client)
// TODO: should we go back to async. For this we need to use
// ref count for channel clients.
rcc->channel->client_cbs.disconnect(rcc);
+ ASSERT(ring_is_empty(&rcc->pipe));
+ ASSERT(rcc->pipe_size == 0);
+ ASSERT(rcc->send_data.size == 0);
+ red_channel_client_destroy(rcc);
}
- free(client);
-}
-void red_client_disconnect(RedClient *client)
-{
- RingItem *link, *next;
- RedChannelClient *rcc;
-
- red_printf("#channels %d", client->channels_num);
- RING_FOREACH_SAFE(link, next, &client->channels) {
- // some channels may be in other threads, so disconnection
- // is not synchronous.
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link);
- rcc->channel->client_cbs.disconnect(rcc);
- }
+ pthread_mutex_destroy(&client->lock);
+ free(client);
}
static void red_client_add_channel(RedClient *client, RedChannelClient *rcc)
{
ASSERT(rcc && client);
+ pthread_mutex_lock(&client->lock);
ring_add(&client->channels, &rcc->client_link);
client->channels_num++;
+ pthread_mutex_unlock(&client->lock);
}
MainChannelClient *red_client_get_main(RedClient *client) {
diff --git a/server/red_channel.h b/server/red_channel.h
index cb33fcb1..daee8bae 100644
--- a/server/red_channel.h
+++ b/server/red_channel.h
@@ -24,9 +24,9 @@
#include "red_common.h"
#include <pthread.h>
-#include "reds.h"
#include "spice.h"
#include "ring.h"
+#include "common/marshaller.h"
#include "server/demarshallers.h"
#define MAX_SEND_BUFS 1000
@@ -62,7 +62,6 @@ typedef struct IncomingHandler {
uint32_t header_pos;
uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
uint32_t msg_pos;
- int shut; // came here from inputs_channel. Not sure if it is really required or can be removed. XXX
} IncomingHandler;
typedef int (*get_outgoing_msg_size_proc)(void *opaque);
@@ -98,8 +97,11 @@ typedef struct BufDescriptor {
uint8_t *data;
} BufDescriptor;
+typedef struct RedsStream RedsStream;
typedef struct RedChannel RedChannel;
typedef struct RedChannelClient RedChannelClient;
+typedef struct RedClient RedClient;
+typedef struct MainChannelClient MainChannelClient;
/* Messages handled by red_channel
* SET_ACK - sent to client on channel connection
@@ -154,7 +156,7 @@ typedef void (*channel_client_migrate_proc)(RedChannelClient *base);
*/
typedef struct {
channel_configure_socket_proc config_socket;
- channel_disconnect_proc disconnect;
+ channel_disconnect_proc on_disconnect;
channel_send_pipe_item_proc send_item;
channel_hold_pipe_item_proc hold_item;
channel_release_pipe_item_proc release_item;
@@ -207,10 +209,20 @@ struct RedChannelClient {
};
struct RedChannel {
+ uint32_t type;
+ uint32_t id;
+
SpiceCoreInterface *core;
int migrate;
int handle_acks;
+ // RedChannel will hold only connected channel clients (logic - when pushing pipe item to all channel clients, there
+ // is no need to go over disconnect clients)
+ // . While client will hold the channel clients till it is destroyed
+ // and then it will destroy them as well.
+ // However RCC still holds a reference to the Channel.
+ // Maybe replace these logic with ref count?
+ // TODO: rename to 'connected_clients'?
Ring clients;
uint32_t clients_num;
@@ -220,11 +232,10 @@ struct RedChannel {
ChannelCbs channel_cbs;
ClientCbs client_cbs;
- /* Stuff below added for Main and Inputs channels switch to RedChannel
- * (might be removed later) */
- channel_on_incoming_error_proc on_incoming_error; /* alternative to disconnect */
- channel_on_outgoing_error_proc on_outgoing_error;
- int shut; /* signal channel is to be closed */
+ int num_caps;
+ uint32_t *caps;
+
+ void *data;
// TODO: when different channel_clients are in different threads from Channel -> need to protect!
pthread_t thread_id;
@@ -237,6 +248,7 @@ struct RedChannel {
* explicitly destroy the channel */
RedChannel *red_channel_create(int size,
SpiceCoreInterface *core,
+ uint32_t type, uint32_t id,
int migrate, int handle_acks,
channel_handle_message_proc handle_message,
ChannelCbs *channel_cbs);
@@ -245,21 +257,39 @@ RedChannel *red_channel_create(int size,
* will become default eventually */
RedChannel *red_channel_create_parser(int size,
SpiceCoreInterface *core,
+ uint32_t type, uint32_t id,
int migrate, int handle_acks,
spice_parse_channel_func_t parser,
channel_handle_parsed_proc handle_parsed,
- channel_on_incoming_error_proc incoming_error,
- channel_on_outgoing_error_proc outgoing_error,
ChannelCbs *channel_cbs);
void red_channel_register_client_cbs(RedChannel *channel, ClientCbs *client_cbs);
+// caps are freed when the channel is destroyed
+void red_channel_set_caps(RedChannel *channel, int num_caps, uint32_t *caps);
+void red_channel_set_data(RedChannel *channel, void *data);
RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client,
RedsStream *stream);
+// TODO: tmp, for channels that don't use RedChannel yet (e.g., snd channel), but
+// do use the client callbacks. So the channel clients are not connected (the channel doesn't
+// have list of them, but they do have a link to the channel, and the client has a list of them)
+RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id);
+RedChannelClient *red_channel_client_create_dummy(int size,
+ RedChannel *channel,
+ RedClient *client);
+
+
int red_channel_is_connected(RedChannel *channel);
int red_channel_client_is_connected(RedChannelClient *rcc);
+/*
+ * the disconnect callback is called from the channel's thread,
+ * i.e., for display channels - red worker thread, for all the other - from the main thread.
+ * RedClient is managed from the main thread. red_channel_client_destroy can be called only
+ * from red_client_destroy.
+ */
+
void red_channel_client_destroy(RedChannelClient *rcc);
void red_channel_destroy(RedChannel *channel);
@@ -267,7 +297,6 @@ void red_channel_destroy(RedChannel *channel);
* thread. It will not touch the rings, just shutdown the socket.
* It should be followed by some way to gurantee a disconnection. */
void red_channel_client_shutdown(RedChannelClient *rcc);
-void red_channel_shutdown(RedChannel *channel);
/* should be called when a new channel is ready to send messages */
void red_channel_init_outgoing_messages_window(RedChannel *channel);
@@ -276,9 +305,6 @@ void red_channel_init_outgoing_messages_window(RedChannel *channel);
int red_channel_client_handle_message(RedChannelClient *rcc, uint32_t size,
uint16_t type, void *message);
-/* default error handler that disconnects channel */
-void red_channel_client_default_peer_on_error(RedChannelClient *rcc);
-
/* when preparing send_data: should call init and then use marshaller */
void red_channel_client_init_send_data(RedChannelClient *rcc, uint16_t msg_type, PipeItem *item);
@@ -303,7 +329,6 @@ void red_channel_client_pipe_add_push(RedChannelClient *rcc, PipeItem *item);
void red_channel_client_pipe_add(RedChannelClient *rcc, PipeItem *item);
void red_channel_client_pipe_add_after(RedChannelClient *rcc, PipeItem *item, PipeItem *pos);
int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc, PipeItem *item);
-void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item);
void red_channel_client_pipe_remove_and_release(RedChannelClient *rcc, PipeItem *item);
void red_channel_client_pipe_add_tail(RedChannelClient *rcc, PipeItem *item);
/* for types that use this routine -> the pipe item should be freed */
@@ -315,9 +340,6 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_
void red_channel_client_push_set_ack(RedChannelClient *rcc);
void red_channel_push_set_ack(RedChannel *channel);
-/* TODO: This sets all clients to shut state - probably we want to close per channel */
-void red_channel_shutdown(RedChannel *channel);
-
int red_channel_get_first_socket(RedChannel *channel);
/* return TRUE if all of the connected clients to this channel are blocked */
@@ -354,7 +376,6 @@ void red_channel_client_push(RedChannelClient *rcc);
// TODO: again - what is the context exactly? this happens in channel disconnect. but our
// current red_channel_shutdown also closes the socket - is there a socket to close?
// are we reading from an fd here? arghh
-void red_channel_pipes_clear(RedChannel *channel);
void red_channel_client_pipe_clear(RedChannelClient *rcc);
// Again, used in various places outside of event handler context (or in other event handler
// contexts):
@@ -403,17 +424,22 @@ struct RedClient {
RingItem link;
Ring channels;
int channels_num;
- int disconnecting;
MainChannelClient *mcc;
+ pthread_mutex_t lock; // different channels can be in different threads
pthread_t thread_id;
+
+ int disconnecting;
};
RedClient *red_client_new();
MainChannelClient *red_client_get_main(RedClient *client);
+// main should be set once before all the other channels are created
void red_client_set_main(RedClient *client, MainChannelClient *mcc);
+
+
+void red_client_migrate(RedClient *client);
+// disconnects all the client's channels (should be called from the client's thread)
void red_client_destroy(RedClient *client);
-void red_client_disconnect(RedClient *client);
-void red_client_remove_channel(RedClient *client, RedChannelClient *rcc);
#endif
diff --git a/server/red_dispatcher.c b/server/red_dispatcher.c
index 8cbdec9d..c00dc580 100644
--- a/server/red_dispatcher.c
+++ b/server/red_dispatcher.c
@@ -76,10 +76,10 @@ extern spice_wan_compression_t zlib_glz_state;
static RedDispatcher *dispatchers = NULL;
-static void red_dispatcher_set_peer(Channel *channel, RedClient *client,
- RedsStream *stream, int migration,
- int num_common_caps, uint32_t *common_caps, int num_caps,
- uint32_t *caps)
+static void red_dispatcher_set_display_peer(RedChannel *channel, RedClient *client,
+ RedsStream *stream, int migration,
+ int num_common_caps, uint32_t *common_caps, int num_caps,
+ uint32_t *caps)
{
RedDispatcher *dispatcher;
@@ -92,23 +92,41 @@ static void red_dispatcher_set_peer(Channel *channel, RedClient *client,
send_data(dispatcher->channel, &migration, sizeof(int));
}
-static void red_dispatcher_shutdown_peer(Channel *channel)
+static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc)
{
- RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
+ RedDispatcher *dispatcher;
+
+ if (!rcc->channel) {
+ return;
+ }
+
+ dispatcher = (RedDispatcher *)rcc->channel->data;
+
red_printf("");
RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_DISCONNECT;
write_message(dispatcher->channel, &message);
+ send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+
+ // TODO: we turned it to be sync, due to client_destroy . Should we support async? - for this we will need ref count
+ // for channels
+ read_message(dispatcher->channel, &message);
+ ASSERT(message == RED_WORKER_MESSAGE_READY);
}
-static void red_dispatcher_migrate(Channel *channel)
+static void red_dispatcher_display_migrate(RedChannelClient *rcc)
{
- RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
- red_printf("channel type %u id %u", channel->type, channel->id);
+ RedDispatcher *dispatcher;
+ if (!rcc->channel) {
+ return;
+ }
+ dispatcher = (RedDispatcher *)rcc->channel->data;
+ red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id);
RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_MIGRATE;
write_message(dispatcher->channel, &message);
+ send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
}
-static void red_dispatcher_set_cursor_peer(Channel *channel, RedClient *client, RedsStream *stream,
+static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *client, RedsStream *stream,
int migration, int num_common_caps,
uint32_t *common_caps, int num_caps,
uint32_t *caps)
@@ -122,18 +140,33 @@ static void red_dispatcher_set_cursor_peer(Channel *channel, RedClient *client,
send_data(dispatcher->channel, &migration, sizeof(int));
}
-static void red_dispatcher_shutdown_cursor_peer(Channel *channel)
+static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc)
{
- RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
+ RedDispatcher *dispatcher;
+
+ if (!rcc->channel) {
+ return;
+ }
+
+ dispatcher = (RedDispatcher *)rcc->channel->data;
red_printf("");
RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_DISCONNECT;
write_message(dispatcher->channel, &message);
+ send_data(dispatcher->channel, &rcc, sizeof(RedChannelClient *));
+
+ read_message(dispatcher->channel, &message);
+ ASSERT(message == RED_WORKER_MESSAGE_READY);
}
-static void red_dispatcher_cursor_migrate(Channel *channel)
+static void red_dispatcher_cursor_migrate(RedChannelClient *rcc)
{
- RedDispatcher *dispatcher = (RedDispatcher *)channel->data;
- red_printf("channel type %u id %u", channel->type, channel->id);
+ RedDispatcher *dispatcher;
+
+ if (!rcc->channel) {
+ return;
+ }
+ dispatcher = (RedDispatcher *)rcc->channel->data;
+ red_printf("channel type %u id %u", rcc->channel->type, rcc->channel->id);
RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_MIGRATE;
write_message(dispatcher->channel, &message);
}
@@ -591,36 +624,6 @@ static void qxl_worker_loadvm_commands(QXLWorker *qxl_worker,
red_dispatcher_loadvm_commands((RedDispatcher*)qxl_worker, ext, count);
}
-static void red_dispatcher_send_disconnect(RedDispatcher *dispatcher,
- struct RedChannelClient *rcc, RedWorkerMessage message)
-{
- write_message(dispatcher->channel, &message);
- send_data(dispatcher->channel, &rcc, sizeof(struct RedChannelClient *));
-}
-
-void red_dispatcher_disconnect_display_client(RedDispatcher *dispatcher,
- struct RedChannelClient *rcc)
-{
- RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
-
- red_dispatcher_send_disconnect(dispatcher, rcc,
- RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT);
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
-}
-
-void red_dispatcher_disconnect_cursor_client(RedDispatcher *dispatcher,
- struct RedChannelClient *rcc)
-{
- RedWorkerMessage message = RED_WORKER_MESSAGE_STOP;
-
- red_dispatcher_send_disconnect(dispatcher, rcc,
- RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT);
- read_message(dispatcher->channel, &message);
- ASSERT(message == RED_WORKER_MESSAGE_READY);
-}
-
-
void red_dispatcher_set_mm_time(uint32_t mm_time)
{
RedDispatcher *now = dispatchers;
@@ -867,6 +870,32 @@ void red_dispatcher_async_complete(struct RedDispatcher *dispatcher, uint64_t co
dispatcher->qxl->st->qif->async_complete(dispatcher->qxl, cookie);
}
+static RedChannel *red_dispatcher_display_channel_create(RedDispatcher *dispatcher)
+{
+ RedWorkerMessage message = RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE;
+ RedChannel *display_channel;
+
+ write_message(dispatcher->channel, &message);
+
+ receive_data(dispatcher->channel, &display_channel, sizeof(RedChannel *));
+ read_message(dispatcher->channel, &message);
+ ASSERT(message == RED_WORKER_MESSAGE_READY);
+ return display_channel;
+}
+
+static RedChannel *red_dispatcher_cursor_channel_create(RedDispatcher *dispatcher)
+{
+ RedWorkerMessage message = RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE;
+ RedChannel *cursor_channel;
+
+ write_message(dispatcher->channel, &message);
+
+ receive_data(dispatcher->channel, &cursor_channel, sizeof(RedChannel *));
+ read_message(dispatcher->channel, &message);
+ ASSERT(message == RED_WORKER_MESSAGE_READY);
+ return cursor_channel;
+}
+
RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
{
RedDispatcher *dispatcher;
@@ -875,10 +904,11 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
WorkerInitData init_data;
QXLDevInitInfo init_info;
int r;
- Channel *reds_channel;
- Channel *cursor_channel;
+ RedChannel *display_channel;
+ RedChannel *cursor_channel;
sigset_t thread_sig_mask;
sigset_t curr_sig_mask;
+ ClientCbs client_cbs = {0,};
quic_init();
sw_canvas_init();
@@ -950,23 +980,28 @@ RedDispatcher *red_dispatcher_init(QXLInstance *qxl)
read_message(dispatcher->channel, &message);
ASSERT(message == RED_WORKER_MESSAGE_READY);
- reds_channel = spice_new0(Channel, 1);
- reds_channel->type = SPICE_CHANNEL_DISPLAY;
- reds_channel->id = qxl->id;
- reds_channel->link = red_dispatcher_set_peer;
- reds_channel->shutdown = red_dispatcher_shutdown_peer;
- reds_channel->migrate = red_dispatcher_migrate;
- reds_channel->data = dispatcher;
- reds_register_channel(reds_channel);
-
- cursor_channel = spice_new0(Channel, 1);
- cursor_channel->type = SPICE_CHANNEL_CURSOR;
- cursor_channel->id = qxl->id;
- cursor_channel->link = red_dispatcher_set_cursor_peer;
- cursor_channel->shutdown = red_dispatcher_shutdown_cursor_peer;
- cursor_channel->migrate = red_dispatcher_cursor_migrate;
- cursor_channel->data = dispatcher;
- reds_register_channel(cursor_channel);
+ display_channel = red_dispatcher_display_channel_create(dispatcher);
+
+ if (display_channel) {
+ client_cbs.connect = red_dispatcher_set_display_peer;
+ client_cbs.disconnect = red_dispatcher_disconnect_display_peer;
+ client_cbs.migrate = red_dispatcher_display_migrate;
+ red_channel_register_client_cbs(display_channel, &client_cbs);
+ red_channel_set_data(display_channel, dispatcher);
+ reds_register_channel(display_channel);
+ }
+
+ cursor_channel = red_dispatcher_cursor_channel_create(dispatcher);
+
+ if (cursor_channel) {
+ client_cbs.connect = red_dispatcher_set_cursor_peer;
+ client_cbs.disconnect = red_dispatcher_disconnect_cursor_peer;
+ client_cbs.migrate = red_dispatcher_cursor_migrate;
+ red_channel_register_client_cbs(cursor_channel, &client_cbs);
+ red_channel_set_data(cursor_channel, dispatcher);
+ reds_register_channel(cursor_channel);
+ }
+
qxl->st->qif->attache_worker(qxl, &dispatcher->base);
qxl->st->qif->set_compression_level(qxl, calc_compression_level());
diff --git a/server/red_dispatcher.h b/server/red_dispatcher.h
index 07a95aec..144a40e2 100644
--- a/server/red_dispatcher.h
+++ b/server/red_dispatcher.h
@@ -32,9 +32,4 @@ uint32_t red_dispatcher_qxl_ram_size(void);
int red_dispatcher_qxl_count(void);
void red_dispatcher_async_complete(struct RedDispatcher*, uint64_t);
-void red_dispatcher_disconnect_display_client(struct RedDispatcher *dispatcher,
- struct RedChannelClient *rcc);
-void red_dispatcher_disconnect_cursor_client(struct RedDispatcher *dispatcher,
- struct RedChannelClient *rcc);
-
#endif
diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c
index 79cb8546..6c36fecb 100644
--- a/server/red_tunnel_worker.c
+++ b/server/red_tunnel_worker.c
@@ -537,8 +537,8 @@ typedef struct TunnelPrintService {
} TunnelPrintService;
struct TunnelWorker {
- Channel channel_interface; // for reds
- TunnelChannelClient *channel;
+ RedChannel *channel;
+ TunnelChannelClient *channel_client;
SpiceCoreInterface *core_interface;
SpiceNetWireInstance *sin;
@@ -564,7 +564,7 @@ struct TunnelWorker {
/*********************************************************************
* Tunnel interface
*********************************************************************/
-static void tunnel_channel_disconnect(RedChannel *channel);
+static void tunnel_channel_on_disconnect(RedChannel *channel);
/* networking interface for slirp */
static int qemu_can_output(SlirpUsrNetworkInterface *usr_interface);
@@ -601,23 +601,23 @@ static UserTimer *create_timer(SlirpUsrNetworkInterface *usr_interface,
static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, uint32_t ms);
-/* reds interface */
-static void handle_tunnel_channel_link(Channel *channel, RedClient *client,
+/* RedChannel interface */
+
+static void handle_tunnel_channel_link(RedChannel *channel, RedClient *client,
RedsStream *stream, int migration,
int num_common_caps,
uint32_t *common_caps, int num_caps,
uint32_t *caps);
-static void handle_tunnel_channel_shutdown(struct Channel *channel);
-static void handle_tunnel_channel_migrate(struct Channel *channel);
-
+static void handle_tunnel_channel_client_migrate(RedChannelClient *rcc);
+static void red_tunnel_channel_create(TunnelWorker *worker);
static void tunnel_shutdown(TunnelWorker *worker)
{
int i;
red_printf("");
/* shutdown input from channel */
- if (worker->channel) {
- red_channel_shutdown(worker->channel->base.channel);
+ if (worker->channel_client) {
+ red_channel_client_shutdown(&worker->channel_client->base);
}
/* shutdown socket pipe items */
@@ -745,7 +745,7 @@ static void tunnel_socket_free_rcv_buf(RedSocket *sckt, RedSocketRawRcvBuf *rcv_
--sckt->in_data.num_buffers;
__tunnel_worker_free_socket_rcv_buf(sckt->worker, rcv_buf);
++sckt->in_data.num_tokens;
- __process_rcv_buf_tokens(sckt->worker->channel, sckt);
+ __process_rcv_buf_tokens(sckt->worker->channel_client, sckt);
}
static inline void __tunnel_worker_free_socket_rcv_buf(TunnelWorker *worker,
@@ -973,7 +973,7 @@ SPICE_GNUC_VISIBLE void spice_server_net_wire_recv_packet(SpiceNetWireInstance *
TunnelWorker *worker = sin->st->worker;
ASSERT(worker);
- if (worker->channel && worker->channel->base.channel->migrate) {
+ if (worker->channel_client && worker->channel_client->base.channel->migrate) {
return; // during migration and the tunnel state hasn't been restored yet.
}
@@ -1016,15 +1016,9 @@ void *red_tunnel_attach(SpiceCoreInterface *core_interface,
worker->null_interface.worker = worker;
- worker->channel_interface.type = SPICE_CHANNEL_TUNNEL;
- worker->channel_interface.id = 0;
- worker->channel_interface.link = handle_tunnel_channel_link;
- worker->channel_interface.shutdown = handle_tunnel_channel_shutdown;
- worker->channel_interface.migrate = handle_tunnel_channel_migrate;
- worker->channel_interface.data = worker;
+ red_tunnel_channel_create(worker);
- ring_init(&worker->services);
- reds_register_channel(&worker->channel_interface);
+ ring_init(&worker->services);
net_slirp_init(worker->sif->get_ip(worker->sin),
TRUE,
@@ -1096,7 +1090,7 @@ static inline TunnelService *__tunnel_worker_add_service(TunnelWorker *worker, u
#endif
if (!virt_ip) {
new_service->pipe_item.type = PIPE_ITEM_TYPE_SERVICE_IP_MAP;
- red_channel_client_pipe_add(&worker->channel->base, &new_service->pipe_item);
+ red_channel_client_pipe_add(&worker->channel_client->base, &new_service->pipe_item);
}
return new_service;
@@ -1292,24 +1286,24 @@ static RedSocket *tunnel_worker_create_socket(TunnelWorker *worker, uint16_t loc
static void tunnel_worker_free_socket(TunnelWorker *worker, RedSocket *sckt)
{
- if (worker->channel) {
- if (red_channel_client_pipe_item_is_linked(&worker->channel->base,
+ if (worker->channel_client) {
+ if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
&sckt->out_data.data_pipe_item)) {
- red_channel_client_pipe_remove_and_release(&worker->channel->base,
+ red_channel_client_pipe_remove_and_release(&worker->channel_client->base,
&sckt->out_data.data_pipe_item);
return;
}
- if (red_channel_client_pipe_item_is_linked(&worker->channel->base,
+ if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
&sckt->out_data.status_pipe_item)) {
- red_channel_client_pipe_remove_and_release(&worker->channel->base,
+ red_channel_client_pipe_remove_and_release(&worker->channel_client->base,
&sckt->out_data.status_pipe_item);
return;
}
- if (red_channel_client_pipe_item_is_linked(&worker->channel->base,
+ if (red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
&sckt->out_data.token_pipe_item)) {
- red_channel_client_pipe_remove_and_release(&worker->channel->base,
+ red_channel_client_pipe_remove_and_release(&worker->channel_client->base,
&sckt->out_data.token_pipe_item);
return;
}
@@ -1631,7 +1625,7 @@ static inline int __client_socket_can_receive(RedSocket *sckt)
{
return (((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) ||
(sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) &&
- !sckt->worker->channel->mig_inprogress);
+ !sckt->worker->channel_client->mig_inprogress);
}
static int tunnel_channel_handle_socket_token(TunnelChannelClient *channel, RedSocket *sckt,
@@ -1870,7 +1864,7 @@ static void restored_rcv_buf_release(RawTunneledBuffer *buf)
--sckt->in_data.num_buffers;
__tunnel_worker_free_socket_rcv_buf(sckt->worker, (RedSocketRawRcvBuf *)buf);
// for case that ready queue is empty and the client has no tokens
- __process_rcv_buf_tokens(sckt->worker->channel, sckt);
+ __process_rcv_buf_tokens(sckt->worker->channel_client, sckt);
}
RawTunneledBuffer *tunnel_socket_alloc_restored_rcv_buf(RedSocket *sckt)
@@ -1889,7 +1883,7 @@ static void restore_tokens_buf_release(RawTunneledBuffer *buf)
RedSocket *sckt = (RedSocket *)buf->usr_opaque;
sckt->in_data.num_tokens += tokens_buf->num_tokens;
- __process_rcv_buf_tokens(sckt->worker->channel, sckt);
+ __process_rcv_buf_tokens(sckt->worker->channel_client, sckt);
free(tokens_buf);
}
@@ -2182,7 +2176,7 @@ static uint64_t tunnel_channel_handle_migrate_data_get_serial(RedChannelClient *
static uint64_t tunnel_channel_handle_migrate_data(RedChannelClient *base,
uint32_t size, void *msg)
{
- TunnelChannelClient *channel = SPICE_CONTAINEROF(base->channel, TunnelChannelClient, base);
+ TunnelChannelClient *channel = SPICE_CONTAINEROF(base, TunnelChannelClient, base);
TunnelMigrateSocketList *sockets_list;
TunnelMigrateServicesList *services_list;
TunnelMigrateData *migrate_data = msg;
@@ -2790,22 +2784,22 @@ static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem
sckt_out_data->push_tail = NULL;
sckt_out_data->push_tail_size = 0;
- if (worker->channel) {
+ if (worker->channel_client) {
// can still send data to socket
if (__client_socket_can_receive(sckt)) {
if (sckt_out_data->ready_chunks_queue.head) {
// the pipe item may already be linked, if for example the send was
// blocked and before it finished and called release, tunnel_socket_send was called
if (!red_channel_client_pipe_item_is_linked(
- &worker->channel->base, &sckt_out_data->data_pipe_item)) {
+ &worker->channel_client->base, &sckt_out_data->data_pipe_item)) {
sckt_out_data->data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
- red_channel_client_pipe_add(&worker->channel->base, &sckt_out_data->data_pipe_item);
+ red_channel_client_pipe_add(&worker->channel_client->base, &sckt_out_data->data_pipe_item);
}
} else if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) ||
(sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) {
- __tunnel_socket_add_fin_to_pipe(worker->channel, sckt);
+ __tunnel_socket_add_fin_to_pipe(worker->channel_client, sckt);
} else if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
- __tunnel_socket_add_close_to_pipe(worker->channel, sckt);
+ __tunnel_socket_add_close_to_pipe(worker->channel_client, sckt);
}
}
}
@@ -2813,7 +2807,7 @@ static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem
if (((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) ||
(sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) &&
- !sckt->in_slirp_send && !worker->channel->mig_inprogress) {
+ !sckt->in_slirp_send && !worker->channel_client->mig_inprogress) {
// for cases that slirp couldn't write whole it data to our socket buffer
net_slirp_socket_can_send_notify(sckt->slirp_sckt);
}
@@ -2935,8 +2929,8 @@ static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface,
red_printf("TUNNEL_DBG");
#endif
worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
- ASSERT(worker->channel);
- ASSERT(!worker->channel->mig_inprogress);
+ ASSERT(worker->channel_client);
+ ASSERT(!worker->channel_client->mig_inprogress);
far_service = tunnel_worker_find_service_by_addr(worker, &dst_addr, (uint32_t)ntohs(dst_port));
@@ -2964,7 +2958,7 @@ static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface,
#endif
*o_usr_s = sckt;
sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_OPEN;
- red_channel_client_pipe_add(&worker->channel->base, &sckt->out_data.status_pipe_item);
+ red_channel_client_pipe_add(&worker->channel_client->base, &sckt->out_data.status_pipe_item);
errno = EINPROGRESS;
return -1;
@@ -2989,7 +2983,7 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke
worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
- ASSERT(!worker->channel->mig_inprogress);
+ ASSERT(!worker->channel_client->mig_inprogress);
sckt = (RedSocket *)opaque;
@@ -3014,7 +3008,7 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke
}
if (urgent) {
- SET_TUNNEL_ERROR(worker->channel, "urgent msgs not supported");
+ SET_TUNNEL_ERROR(worker->channel_client, "urgent msgs not supported");
tunnel_shutdown(worker);
errno = ECONNRESET;
return -1;
@@ -3037,7 +3031,7 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke
red_printf("socket out buffers overflow, socket will be closed"
" (local_port=%d, service_id=%d)",
ntohs(sckt->local_port), sckt->far_service->id);
- tunnel_socket_force_close(worker->channel, sckt);
+ tunnel_socket_force_close(worker->channel_client, sckt);
size_to_send = 0;
} else {
size_to_send = len;
@@ -3050,10 +3044,10 @@ static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocke
sckt->out_data.data_size += size_to_send;
if (sckt->out_data.ready_chunks_queue.head &&
- !red_channel_client_pipe_item_is_linked(&worker->channel->base,
+ !red_channel_client_pipe_item_is_linked(&worker->channel_client->base,
&sckt->out_data.data_pipe_item)) {
sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
- red_channel_client_pipe_add(&worker->channel->base, &sckt->out_data.data_pipe_item);
+ red_channel_client_pipe_add(&worker->channel_client->base, &sckt->out_data.data_pipe_item);
}
}
@@ -3093,7 +3087,7 @@ static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocke
ASSERT(opaque);
worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
- ASSERT(!worker->channel->mig_inprogress);
+ ASSERT(!worker->channel_client->mig_inprogress);
sckt = (RedSocket *)opaque;
@@ -3104,14 +3098,14 @@ static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocke
if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) ||
(sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) {
- SET_TUNNEL_ERROR(worker->channel, "receive was shutdown");
+ SET_TUNNEL_ERROR(worker->channel_client, "receive was shutdown");
tunnel_shutdown(worker);
errno = ECONNRESET;
return -1;
}
if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
- SET_TUNNEL_ERROR(worker->channel, "slirp socket not connected");
+ SET_TUNNEL_ERROR(worker->channel_client, "slirp socket not connected");
tunnel_shutdown(worker);
errno = ECONNRESET;
return -1;
@@ -3177,7 +3171,7 @@ static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface,
#ifdef DEBUG_NETWORK
PRINT_SCKT(sckt);
#endif
- ASSERT(!worker->channel->mig_inprogress);
+ ASSERT(!worker->channel_client->mig_inprogress);
if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) {
return;
@@ -3189,7 +3183,7 @@ static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface,
ASSERT(sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND);
sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
} else {
- SET_TUNNEL_ERROR(worker->channel, "unexpected tunnel_socket_shutdown_send slirp_status=%d",
+ SET_TUNNEL_ERROR(worker->channel_client, "unexpected tunnel_socket_shutdown_send slirp_status=%d",
sckt->slirp_status);
tunnel_shutdown(worker);
return;
@@ -3200,11 +3194,11 @@ static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface,
// check if there is still data to send. the fin will be sent after data is released
// channel is alive, otherwise the sockets would have been aborted
if (!sckt->out_data.ready_chunks_queue.head) {
- __tunnel_socket_add_fin_to_pipe(worker->channel, sckt);
+ __tunnel_socket_add_fin_to_pipe(worker->channel_client, sckt);
}
} else { // if client is closed, it means the connection was aborted since we didn't
// received fin from guest
- SET_TUNNEL_ERROR(worker->channel,
+ SET_TUNNEL_ERROR(worker->channel_client,
"unexpected tunnel_socket_shutdown_send client_status=%d",
sckt->client_status);
tunnel_shutdown(worker);
@@ -3229,12 +3223,12 @@ static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface,
#ifdef DEBUG_NETWORK
PRINT_SCKT(sckt);
#endif
- ASSERT(!worker->channel->mig_inprogress);
+ ASSERT(!worker->channel_client->mig_inprogress);
/* failure in recv can happen after the client sckt was shutdown
(after client sent FIN, or after slirp sent FIN and client socket was closed */
if (!__should_send_fin_to_guest(sckt)) {
- SET_TUNNEL_ERROR(worker->channel,
+ SET_TUNNEL_ERROR(worker->channel_client,
"unexpected tunnel_socket_shutdown_recv client_status=%d slirp_status=%d",
sckt->client_status, sckt->slirp_status);
tunnel_shutdown(worker);
@@ -3246,7 +3240,7 @@ static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface,
} else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) {
sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
} else {
- SET_TUNNEL_ERROR(worker->channel,
+ SET_TUNNEL_ERROR(worker->channel_client,
"unexpected tunnel_socket_shutdown_recv slirp_status=%d",
sckt->slirp_status);
tunnel_shutdown(worker);
@@ -3302,11 +3296,11 @@ static void tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSoc
// check if there is still data to send. the close will be sent after data is released.
// close may already been pushed if it is a forced close
if (!sckt->out_data.ready_chunks_queue.head && !sckt->pushed_close) {
- __tunnel_socket_add_close_to_pipe(worker->channel, sckt);
+ __tunnel_socket_add_close_to_pipe(worker->channel_client, sckt);
}
} else if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) {
if (sckt->client_waits_close_ack) {
- __tunnel_socket_add_close_ack_to_pipe(worker->channel, sckt);
+ __tunnel_socket_add_close_ack_to_pipe(worker->channel_client, sckt);
} else {
tunnel_worker_free_socket(worker, sckt);
}
@@ -3333,12 +3327,12 @@ static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer,
worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
#ifdef DEBUG_NETWORK
- if (!worker->channel) {
+ if (!worker->channel_client) {
red_printf("channel not connected");
}
#endif
- if (worker->channel && worker->channel->mig_inprogress) {
- SET_TUNNEL_ERROR(worker->channel, "during migration");
+ if (worker->channel_client && worker->channel_client->mig_inprogress) {
+ SET_TUNNEL_ERROR(worker->channel_client, "during migration");
tunnel_shutdown(worker);
return;
}
@@ -3398,27 +3392,25 @@ static void tunnel_worker_disconnect_slirp(TunnelWorker *worker)
/* don't call disconnect from functions that might be called by slirp
since it closes all its sockets and slirp is not aware of it */
-static void tunnel_channel_disconnect(RedChannel *channel)
+static void tunnel_channel_on_disconnect(RedChannel *channel)
{
- TunnelChannelClient *tunnel_channel = (TunnelChannelClient *)channel;
TunnelWorker *worker;
if (!channel) {
return;
}
red_printf("");
- worker = tunnel_channel->worker;
+ worker = (TunnelWorker *)channel->data;
tunnel_worker_disconnect_slirp(worker);
tunnel_worker_clear_routed_network(worker);
- red_channel_destroy(channel);
- worker->channel = NULL;
+ worker->channel_client = NULL;
}
// TODO - not MC friendly, remove
-static void tunnel_channel_disconnect_client(RedChannelClient *rcc)
+static void tunnel_channel_client_on_disconnect(RedChannelClient *rcc)
{
- tunnel_channel_disconnect(rcc->channel);
+ tunnel_channel_on_disconnect(rcc->channel);
}
/* interface for reds */
@@ -3439,7 +3431,7 @@ static void tunnel_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *item)
{
}
-static void handle_tunnel_channel_link(Channel *channel, RedClient *client,
+static void handle_tunnel_channel_link(RedChannel *channel, RedClient *client,
RedsStream *stream, int migration,
int num_common_caps,
uint32_t *common_caps, int num_caps,
@@ -3447,15 +3439,42 @@ static void handle_tunnel_channel_link(Channel *channel, RedClient *client,
{
TunnelChannelClient *tcc;
TunnelWorker *worker = (TunnelWorker *)channel->data;
- RedChannel *tunnel_channel;
- ChannelCbs channel_cbs = {0,};
- if (worker->channel) {
- tunnel_channel_disconnect(worker->channel->base.channel);
+ if (worker->channel_client) {
+ red_error("tunnel does not support multiple client");
}
+ tcc = (TunnelChannelClient*)red_channel_client_create(sizeof(TunnelChannelClient),
+ channel, client, stream);
+
+ tcc->worker = worker;
+ tcc->worker->channel_client = tcc;
+ net_slirp_set_net_interface(&worker->tunnel_interface.base);
+
+ on_new_tunnel_channel(tcc);
+}
+
+static void handle_tunnel_channel_client_migrate(RedChannelClient *rcc)
+{
+ TunnelChannelClient *tunnel_channel;
+#ifdef DEBUG_NETWORK
+ red_printf("TUNNEL_DBG: MIGRATE STARTED");
+#endif
+ tunnel_channel = (TunnelChannelClient *)rcc;
+ ASSERT(tunnel_channel == tunnel_channel->worker->channel_client);
+ tunnel_channel->mig_inprogress = TRUE;
+ net_slirp_freeze();
+ red_channel_client_pipe_add_type(rcc, PIPE_ITEM_TYPE_MIGRATE);
+}
+
+static void red_tunnel_channel_create(TunnelWorker *worker)
+{
+ RedChannel *channel;
+ ChannelCbs channel_cbs;
+ ClientCbs client_cbs = {0,};
+
channel_cbs.config_socket = tunnel_channel_config_socket;
- channel_cbs.disconnect = tunnel_channel_disconnect_client;
+ channel_cbs.on_disconnect = tunnel_channel_client_on_disconnect;
channel_cbs.alloc_recv_buf = tunnel_channel_alloc_msg_rcv_buf;
channel_cbs.release_recv_buf = tunnel_channel_release_msg_rcv_buf;
channel_cbs.hold_item = tunnel_channel_hold_pipe_item;
@@ -3465,39 +3484,23 @@ static void handle_tunnel_channel_link(Channel *channel, RedClient *client,
channel_cbs.handle_migrate_data = tunnel_channel_handle_migrate_data;
channel_cbs.handle_migrate_data_get_serial = tunnel_channel_handle_migrate_data_get_serial;
- tunnel_channel = red_channel_create(sizeof(RedChannel),
- worker->core_interface,
- migration, TRUE,
- tunnel_channel_handle_message,
- &channel_cbs);
-
- if (!tunnel_channel) {
+ channel = red_channel_create(sizeof(RedChannel),
+ worker->core_interface,
+ SPICE_CHANNEL_TUNNEL, 0,
+ FALSE, // TODO: handle migration=TRUE
+ TRUE,
+ tunnel_channel_handle_message,
+ &channel_cbs);
+ if (!channel) {
return;
}
- tcc = (TunnelChannelClient*)red_channel_client_create(
- sizeof(TunnelChannelClient),
- tunnel_channel, client, stream);
- tcc->worker = worker;
- tcc->worker->channel = tcc;
- net_slirp_set_net_interface(&worker->tunnel_interface.base);
-
- on_new_tunnel_channel(tcc);
-}
+ client_cbs.connect = handle_tunnel_channel_link;
+ client_cbs.migrate = handle_tunnel_channel_client_migrate;
+ red_channel_register_client_cbs(channel, &client_cbs);
-static void handle_tunnel_channel_shutdown(struct Channel *channel)
-{
- tunnel_channel_disconnect(((TunnelWorker *)channel->data)->channel->base.channel);
-}
-
-static void handle_tunnel_channel_migrate(struct Channel *channel)
-{
-#ifdef DEBUG_NETWORK
- red_printf("TUNNEL_DBG: MIGRATE STARTED");
-#endif
- TunnelChannelClient *tunnel_channel = ((TunnelWorker *)channel->data)->channel;
- tunnel_channel->mig_inprogress = TRUE;
- net_slirp_freeze();
- red_channel_client_pipe_add_type(&tunnel_channel->base, PIPE_ITEM_TYPE_MIGRATE);
+ worker->channel = channel;
+ red_channel_set_data(channel, worker);
+ reds_register_channel(worker->channel);
}
diff --git a/server/red_worker.c b/server/red_worker.c
index 4347d243..e767623e 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -8606,15 +8606,20 @@ static void red_disconnect_channel(RedChannel *channel)
static void display_channel_client_disconnect(RedChannelClient *rcc)
{
- // TODO: MC: right now we assume single channel
+ red_channel_client_disconnect(rcc);
+}
+
+static void display_channel_client_on_disconnect(RedChannelClient *rcc)
+{
DisplayChannel *display_channel;
DisplayChannelClient *dcc = RCC_TO_DCC(rcc);
CommonChannel *common;
RedWorker *worker;
- if (!rcc || !red_channel_is_connected(rcc->channel)) {
+ if (!rcc) {
return;
}
+ red_printf("");
common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base);
worker = common->worker;
display_channel = (DisplayChannel *)rcc->channel;
@@ -8629,8 +8634,6 @@ static void display_channel_client_disconnect(RedChannelClient *rcc)
red_display_reset_compress_buf(dcc);
free(dcc->send_data.free_list.res);
red_display_destroy_streams(dcc);
- red_channel_client_pipe_clear(rcc); // do this before deleting surfaces
- red_channel_client_disconnect(rcc);
// this was the last channel client
if (!red_channel_is_connected(rcc->channel)) {
@@ -8654,12 +8657,15 @@ void red_disconnect_all_display_TODO_remove_me(RedChannel *channel)
worker->display_channel = NULL;
}
-static void red_migrate_display(RedWorker *worker)
+static void red_migrate_display(RedWorker *worker, RedChannelClient *rcc)
{
- if (worker->display_channel) {
- red_pipes_add_verb(&worker->display_channel->common.base,
- SPICE_MSG_DISPLAY_STREAM_DESTROY_ALL);
- red_channel_pipes_add_type(&worker->display_channel->common.base, PIPE_ITEM_TYPE_MIGRATE);
+ // TODO: replace all worker->display_channel tests with
+ // is_connected
+ if (red_channel_client_is_connected(rcc)) {
+ red_pipe_add_verb(rcc, PIPE_ITEM_TYPE_MIGRATE);
+// red_pipes_add_verb(&worker->display_channel->common.base,
+// SPICE_MSG_DISPLAY_STREAM_DESTROY_ALL);
+// red_channel_pipes_add_type(&worker->display_channel->common.base, PIPE_ITEM_TYPE_MIGRATE);
}
}
@@ -8900,7 +8906,7 @@ static inline void flush_display_commands(RedWorker *worker)
for (;;) {
red_channel_push(&worker->display_channel->common.base);
if (!display_is_connected(worker) ||
- red_channel_min_pipe_size(display_red_channel) <= MAX_PIPE_SIZE) {
+ red_channel_max_pipe_size(display_red_channel) <= MAX_PIPE_SIZE) {
break;
}
RedChannel *channel = (RedChannel *)worker->display_channel;
@@ -8961,6 +8967,9 @@ static inline void flush_cursor_commands(RedWorker *worker)
}
}
+// TODO: on timeout, don't disconnect all channeld immeduiatly - try to disconnect the slowest ones first
+// and maybe turn timeouts to several timeouts in order to disconnect channels gradually.
+// Should use disconnect or shutdown?
static inline void flush_all_qxl_commands(RedWorker *worker)
{
flush_display_commands(worker);
@@ -9487,15 +9496,13 @@ static int listen_to_new_client_channel(CommonChannel *common,
return TRUE;
}
-static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id, int migrate,
+static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_type, int migrate,
event_listener_action_proc handler,
- channel_disconnect_proc disconnect,
+ channel_disconnect_proc on_disconnect,
channel_send_pipe_item_proc send_item,
channel_hold_pipe_item_proc hold_item,
channel_release_pipe_item_proc release_item,
channel_handle_parsed_proc handle_parsed,
- channel_on_incoming_error_proc on_incoming_error,
- channel_on_outgoing_error_proc on_outgoing_error,
channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
channel_handle_migrate_data_proc handle_migrate_data,
channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
@@ -9505,7 +9512,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
ChannelCbs channel_cbs;
channel_cbs.config_socket = common_channel_config_socket;
- channel_cbs.disconnect = disconnect;
+ channel_cbs.on_disconnect = on_disconnect;
channel_cbs.send_item = send_item;
channel_cbs.hold_item = hold_item;
channel_cbs.release_item = release_item;
@@ -9515,12 +9522,12 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
channel_cbs.handle_migrate_data = handle_migrate_data;
channel_cbs.handle_migrate_data_get_serial = handle_migrate_data_get_serial;
- channel = red_channel_create_parser(size, &worker_core, migrate,
+ channel = red_channel_create_parser(size, &worker_core,
+ channel_type, worker->id,
+ migrate,
TRUE /* handle_acks */,
- spice_get_client_channel_parser(channel_id, NULL),
+ spice_get_client_channel_parser(channel_type, NULL),
handle_parsed,
- on_incoming_error,
- on_outgoing_error,
&channel_cbs);
common = (CommonChannel *)channel;
if (!channel) {
@@ -9682,50 +9689,6 @@ static void display_channel_release_item(RedChannelClient *rcc, PipeItem *item,
}
}
-static void display_channel_on_incoming_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-static void display_channel_on_outgoing_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-static void cursor_channel_on_incoming_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-static void cursor_channel_on_outgoing_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-// call this from dispatcher thread context
-static void dispatch_display_channel_client_disconnect(RedChannelClient *rcc)
-{
- RedWorker *worker = ((DisplayChannel*)rcc->channel)->common.worker;
- struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
-
- red_printf("");
- red_dispatcher_disconnect_display_client(dispatcher, rcc);
-}
-
-// call this from dispatcher thread context
-static void dispatch_cursor_channel_client_disconnect(RedChannelClient *rcc)
-{
- RedWorker *worker = ((CursorChannel*)rcc->channel)->common.worker;
- struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
-
- red_printf("");
- red_dispatcher_disconnect_cursor_client(dispatcher, rcc);
-}
-
static void ensure_display_channel_created(RedWorker *worker, int migrate)
{
DisplayChannel *display_channel;
@@ -9739,17 +9702,16 @@ static void ensure_display_channel_created(RedWorker *worker, int migrate)
worker, sizeof(*display_channel),
SPICE_CHANNEL_DISPLAY, migrate,
handle_channel_events,
- dispatch_display_channel_client_disconnect,
+ display_channel_client_on_disconnect,
display_channel_send_item,
display_channel_hold_pipe_item,
display_channel_release_item,
display_channel_handle_message,
- display_channel_on_incoming_error,
- display_channel_on_outgoing_error,
display_channel_handle_migrate_mark,
display_channel_handle_migrate_data,
display_channel_handle_migrate_data_get_serial
))) {
+ red_printf("failed to create display channel");
return;
}
display_channel = worker->display_channel;
@@ -9791,7 +9753,7 @@ static void handle_new_display_channel(RedWorker *worker, RedClient *client, Red
if (!dcc) {
return;
}
-
+ red_printf("New display (client %p) dcc %p stream %p", client, dcc, stream);
stream_buf_size = 32*1024;
dcc->send_data.stream_outbuf = spice_malloc(stream_buf_size);
dcc->send_data.stream_outbuf_size = stream_buf_size;
@@ -9836,11 +9798,15 @@ error:
static void cursor_channel_client_disconnect(RedChannelClient *rcc)
{
- if (!red_channel_is_connected(rcc->channel)) {
+ red_channel_client_disconnect(rcc);
+}
+
+static void cursor_channel_client_on_disconnect(RedChannelClient *rcc)
+{
+ if (!rcc) {
return;
}
red_reset_cursor_cache(rcc);
- red_channel_client_disconnect(rcc);
}
static void red_disconnect_cursor(RedChannel *channel)
@@ -9857,13 +9823,14 @@ static void red_disconnect_cursor(RedChannel *channel)
red_disconnect_channel(channel);
}
-static void red_migrate_cursor(RedWorker *worker)
+static void red_migrate_cursor(RedWorker *worker, RedChannelClient *rcc)
{
- if (cursor_is_connected(worker)) {
- red_channel_pipes_add_type(&worker->cursor_channel->common.base,
- PIPE_ITEM_TYPE_INVAL_CURSOR_CACHE);
- red_channel_pipes_add_type(&worker->cursor_channel->common.base,
- PIPE_ITEM_TYPE_MIGRATE);
+// if (cursor_is_connected(worker)) {
+ if (red_channel_client_is_connected(rcc)) {
+ red_channel_client_pipe_add_type(rcc,
+ PIPE_ITEM_TYPE_INVAL_CURSOR_CACHE);
+ red_channel_client_pipe_add_type(rcc,
+ PIPE_ITEM_TYPE_MIGRATE);
}
}
@@ -9950,13 +9917,11 @@ static void ensure_cursor_channel_created(RedWorker *worker, int migrate)
worker, sizeof(*worker->cursor_channel),
SPICE_CHANNEL_CURSOR, migrate,
handle_channel_events,
- dispatch_cursor_channel_client_disconnect,
+ cursor_channel_client_on_disconnect,
cursor_channel_send_item,
cursor_channel_hold_pipe_item,
cursor_channel_release_item,
red_channel_client_handle_message,
- cursor_channel_on_incoming_error,
- cursor_channel_on_outgoing_error,
NULL,
NULL,
NULL);
@@ -10424,6 +10389,10 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE:
case RED_WORKER_MESSAGE_STOP:
case RED_WORKER_MESSAGE_LOADVM_COMMANDS:
+ case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE:
+ case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE:
+ case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
+ case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
write_ready = 1;
default:
break;
@@ -10476,6 +10445,14 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE:
handle_dev_destroy_primary_surface(worker);
break;
+ case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE: {
+ RedChannel *red_channel;
+ // TODO: handle seemless migration. Temp, setting migrate to FALSE
+ ensure_display_channel_created(worker, FALSE);
+ red_channel = &worker->display_channel->common.base;
+ send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+ break;
+ }
case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
RedsStream *stream;
RedClient *client;
@@ -10488,20 +10465,15 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
handle_new_display_channel(worker, client, stream, migrate);
break;
}
- case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT: {
+ case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: {
RedChannelClient *rcc;
red_printf("disconnect display client");
receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
display_channel_client_disconnect(rcc);
- message = RED_WORKER_MESSAGE_READY;
- write_message(worker->channel, &message);
break;
}
- case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
- red_printf("disconnect");
- red_disconnect_all_display_TODO_remove_me((RedChannel *)worker->display_channel);
- break;
case RED_WORKER_MESSAGE_STOP: {
red_printf("stop");
handle_dev_stop(worker);
@@ -10511,10 +10483,22 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
red_printf("start");
handle_dev_start(worker);
break;
- case RED_WORKER_MESSAGE_DISPLAY_MIGRATE:
- red_printf("migrate");
- red_migrate_display(worker);
+ case RED_WORKER_MESSAGE_DISPLAY_MIGRATE: {
+ RedChannelClient *rcc;
+ red_printf("migrate display client");
+ receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
+ red_migrate_display(worker, rcc);
+ break;
+ }
+ case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE: {
+ RedChannel *red_channel;
+ // TODO: handle seemless migration. Temp, setting migrate to FALSE
+ ensure_cursor_channel_created(worker, FALSE);
+ red_channel = &worker->cursor_channel->common.base;
+ send_data(worker->channel, &red_channel, sizeof(RedChannel *));
break;
+ }
case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
RedsStream *stream;
RedClient *client;
@@ -10527,24 +10511,23 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
red_connect_cursor(worker, client, stream, migrate);
break;
}
- case RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT: {
+ case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: {
RedChannelClient *rcc;
red_printf("disconnect cursor client");
receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
cursor_channel_client_disconnect(rcc);
- message = RED_WORKER_MESSAGE_READY;
- write_message(worker->channel, &message);
break;
}
- case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
- red_printf("cursor disconnect");
- red_disconnect_cursor((RedChannel *)worker->cursor_channel);
- break;
- case RED_WORKER_MESSAGE_CURSOR_MIGRATE:
- red_printf("cursor migrate");
- red_migrate_cursor(worker);
+ case RED_WORKER_MESSAGE_CURSOR_MIGRATE: {
+ RedChannelClient *rcc;
+ red_printf("migrate cursor client");
+ receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
+ red_migrate_cursor(worker, rcc);
break;
+ }
case RED_WORKER_MESSAGE_SET_COMPRESSION:
receive_data(worker->channel, &worker->image_compression,
sizeof(spice_image_compression_t));
diff --git a/server/red_worker.h b/server/red_worker.h
index 6fbe0611..26c43adb 100644
--- a/server/red_worker.h
+++ b/server/red_worker.h
@@ -53,13 +53,11 @@ enum {
RED_WORKER_MESSAGE_READY,
RED_WORKER_MESSAGE_DISPLAY_CONNECT,
RED_WORKER_MESSAGE_DISPLAY_DISCONNECT,
- RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT,
RED_WORKER_MESSAGE_DISPLAY_MIGRATE,
RED_WORKER_MESSAGE_START,
RED_WORKER_MESSAGE_STOP,
RED_WORKER_MESSAGE_CURSOR_CONNECT,
RED_WORKER_MESSAGE_CURSOR_DISCONNECT,
- RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT,
RED_WORKER_MESSAGE_CURSOR_MIGRATE,
RED_WORKER_MESSAGE_SET_COMPRESSION,
RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
@@ -83,6 +81,9 @@ enum {
RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC,
/* suspend/windows resolution change command */
RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC,
+
+ RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE,
+ RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE,
};
typedef uint32_t RedWorkerMessage;
diff --git a/server/reds.c b/server/reds.c
index b8b4d26f..0387a5cc 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -193,17 +193,23 @@ typedef struct RedsStatValue {
typedef struct RedsMigSpice RedsMigSpice;
+typedef struct RedsChannel {
+ struct RedsChannel *next;
+ RedChannel *base;
+
+ int num_common_caps;
+ uint32_t *common_caps;
+} RedsChannel;
+
typedef struct RedsState {
int listen_socket;
int secure_listen_socket;
SpiceWatch *listen_watch;
SpiceWatch *secure_listen_watch;
- int disconnecting;
VDIPortState agent_state;
int pending_mouse_event;
Ring clients;
int num_clients;
- Channel *main_channel_factory;
MainChannel *main_channel;
int mig_wait_connect;
@@ -212,7 +218,7 @@ typedef struct RedsState {
int mig_target;
RedsMigSpice *mig_spice;
int num_of_channels;
- Channel *channels;
+ RedsChannel *channels;
int mouse_mode;
int is_client_mouse_allowed;
int dispatcher_allows_client_mouse;
@@ -297,6 +303,8 @@ struct ChannelSecurityOptions {
ChannelSecurityOptions *next;
};
+static void reds_dispose_channel(RedsChannel *channel);
+
static ChannelSecurityOptions *channels_security = NULL;
static int default_channel_security =
SPICE_CHANNEL_SECURITY_NONE | SPICE_CHANNEL_SECURITY_SSL;
@@ -506,21 +514,29 @@ void reds_update_stat_value(uint32_t value)
#endif
-void reds_register_channel(Channel *channel)
+void reds_register_channel(RedChannel *channel)
{
+ RedsChannel *reds_channel;
+
ASSERT(reds);
- channel->next = reds->channels;
- reds->channels = channel;
+ // TODO: should channels be released upon some destructor?
+ reds_channel = spice_malloc0(sizeof(RedsChannel));
+ reds_channel->base = channel;
+ reds_channel->next = reds->channels;
+ reds->channels = reds_channel;
reds->num_of_channels++;
}
-void reds_unregister_channel(Channel *channel)
+void reds_unregister_channel(RedChannel *channel)
{
- Channel **now = &reds->channels;
+ RedsChannel **now = &reds->channels;
while (*now) {
- if (*now == channel) {
- *now = channel->next;
+ if ((*now)->base == channel) {
+ RedsChannel *free_channel = *now;
+ *now = free_channel->next;
+ reds_dispose_channel(free_channel);
+ free(free_channel);
reds->num_of_channels--;
return;
}
@@ -529,10 +545,10 @@ void reds_unregister_channel(Channel *channel)
red_printf("not found");
}
-static Channel *reds_find_channel(uint32_t type, uint32_t id)
+static RedsChannel *reds_find_channel(uint32_t type, uint32_t id)
{
- Channel *channel = reds->channels;
- while (channel && !(channel->type == type && channel->id == id)) {
+ RedsChannel *channel = reds->channels;
+ while (channel && !(channel->base->type == type && channel->base->id == id)) {
channel = channel->next;
}
return channel;
@@ -590,27 +606,42 @@ static int reds_main_channel_connected(void)
void reds_client_disconnect(RedClient *client)
{
- if (!reds_main_channel_connected() || client->disconnecting) {
+ // TODO: rename reds_main_channel_connected, or really set main_channel to NULL on disconnect,
+ // though still, it is not a reason not to disconnect the rest of the channels
+ if (!reds_main_channel_connected() || client->disconnecting) {
+ /* case of recursion (main_channel_client_on_disconnect->
+ * reds_client_disconnect->red_client_destroy-<main_channel...
+ */
return;
}
red_printf("");
+ // why is "disconnecting" even needed? it is synchronic, even in the dispatcher we are now waiting for disconnection
+ // Are there recursive calls? Maybe from main_channel?
client->disconnecting = TRUE;
- /* Reset write filter to start with clean state on client reconnect */
- agent_msg_filter_init(&reds->agent_state.write_filter, agent_copypaste,
- TRUE);
- /* Throw away pending chunks from the current (if any) and future
- messages read from the agent */
- reds->agent_state.read_filter.result = AGENT_MSG_FILTER_DISCARD;
- reds->agent_state.read_filter.discard_all = TRUE;
+ // TODO: we need to handle agent properly for all clients!!!! (e.g., cut and paste, how?)
+ // We shouldn't initialize the agent when there are still clients connected
ring_remove(&client->link);
reds->num_clients--;
red_client_destroy(client);
- reds_mig_cleanup();
- reds->disconnecting = FALSE;
+ // TODO: we need to handle agent properly for all clients!!!! (e.g., cut and paste, how? Maybe throw away messages
+ // if we are in the middle of one from another client)
+ // We shouldn't initialize the agent when there are still clients connected
+ if (reds->num_clients == 0) {
+ /* Reset write filter to start with clean state on client reconnect */
+ agent_msg_filter_init(&reds->agent_state.write_filter, agent_copypaste,
+ TRUE);
+
+ /* Throw away pending chunks from the current (if any) and future
+ * messages read from the agent */
+ reds->agent_state.read_filter.result = AGENT_MSG_FILTER_DISCARD;
+ reds->agent_state.read_filter.discard_all = TRUE;
+
+ reds_mig_cleanup();
+ }
}
// TODO: go over all usage of reds_disconnect, most/some of it should be converted to
@@ -623,6 +654,7 @@ static void reds_disconnect(void)
RING_FOREACH_SAFE(link, next, &reds->clients) {
reds_client_disconnect(SPICE_CONTAINEROF(link, RedClient, link));
}
+ reds_mig_cleanup();
}
static void reds_mig_disconnect()
@@ -949,11 +981,11 @@ int reds_num_of_channels()
static int secondary_channels[] = {
SPICE_CHANNEL_MAIN, SPICE_CHANNEL_DISPLAY, SPICE_CHANNEL_CURSOR, SPICE_CHANNEL_INPUTS};
-static int channel_is_secondary(Channel *channel)
+static int channel_is_secondary(RedsChannel *channel)
{
int i;
for (i = 0 ; i < sizeof(secondary_channels)/sizeof(secondary_channels[0]); ++i) {
- if (channel->type == secondary_channels[i]) {
+ if (channel->base->type == secondary_channels[i]) {
return TRUE;
}
}
@@ -962,7 +994,7 @@ static int channel_is_secondary(Channel *channel)
void reds_fill_channels(SpiceMsgChannels *channels_info)
{
- Channel *channel;
+ RedsChannel *channel;
int i;
int used_channels = 0;
@@ -973,8 +1005,8 @@ void reds_fill_channels(SpiceMsgChannels *channels_info)
if (reds->num_clients > 1 && !channel_is_secondary(channel)) {
continue;
}
- channels_info->channels[used_channels].type = channel->type;
- channels_info->channels[used_channels].id = channel->id;
+ channels_info->channels[used_channels].type = channel->base->type;
+ channels_info->channels[used_channels].id = channel->base->id;
used_channels++;
}
channels_info->num_of_channels = used_channels;
@@ -1350,7 +1382,7 @@ static int sync_write(RedsStream *stream, const void *in_buf, size_t n)
return TRUE;
}
-static void reds_channel_set_common_caps(Channel *channel, int cap, int active)
+static void reds_channel_set_common_caps(RedsChannel *channel, int cap, int active)
{
int nbefore, n;
@@ -1367,7 +1399,7 @@ static void reds_channel_set_common_caps(Channel *channel, int cap, int active)
}
}
-static void reds_channel_init_auth_caps(Channel *channel)
+static void reds_channel_init_auth_caps(RedsChannel *channel)
{
if (sasl_enabled) {
reds_channel_set_common_caps(channel, SPICE_COMMON_CAP_AUTH_SASL, TRUE);
@@ -1377,12 +1409,8 @@ static void reds_channel_init_auth_caps(Channel *channel)
reds_channel_set_common_caps(channel, SPICE_COMMON_CAP_PROTOCOL_AUTH_SELECTION, TRUE);
}
-void reds_channel_dispose(Channel *channel)
+static void reds_dispose_channel(RedsChannel *channel)
{
- free(channel->caps);
- channel->caps = NULL;
- channel->num_caps = 0;
-
free(channel->common_caps);
channel->common_caps = NULL;
channel->num_common_caps = 0;
@@ -1392,8 +1420,8 @@ static int reds_send_link_ack(RedLinkInfo *link)
{
SpiceLinkHeader header;
SpiceLinkReply ack;
- Channel caps = { 0, };
- Channel *channel;
+ RedsChannel common_caps = { 0, };
+ RedsChannel *channel;
BUF_MEM *bmBuf;
BIO *bio;
int ret = FALSE;
@@ -1407,13 +1435,13 @@ static int reds_send_link_ack(RedLinkInfo *link)
channel = reds_find_channel(link->link_mess->channel_type, 0);
if (!channel) {
- channel = &caps;
+ channel = &common_caps;
}
reds_channel_init_auth_caps(channel); /* make sure common caps are set */
ack.num_common_caps = channel->num_common_caps;
- ack.num_channel_caps = channel->num_caps;
+ ack.num_channel_caps = channel->base ? channel->base->num_caps : 0;
header.size += (ack.num_common_caps + ack.num_channel_caps) * sizeof(uint32_t);
ack.caps_offset = sizeof(SpiceLinkReply);
@@ -1441,13 +1469,15 @@ static int reds_send_link_ack(RedLinkInfo *link)
goto end;
if (!sync_write(link->stream, channel->common_caps, channel->num_common_caps * sizeof(uint32_t)))
goto end;
- if (!sync_write(link->stream, channel->caps, channel->num_caps * sizeof(uint32_t)))
- goto end;
+ if (channel->base) {
+ if (!sync_write(link->stream, channel->base->caps, channel->base->num_caps * sizeof(uint32_t)))
+ goto end;
+ }
ret = TRUE;
end:
- reds_channel_dispose(&caps);
+ reds_dispose_channel(&common_caps);
BIO_free(bio);
return ret;
}
@@ -1540,18 +1570,19 @@ static void reds_handle_main_link(RedLinkInfo *link)
link->link_mess = NULL;
reds_link_free(link);
caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
- if (!reds->main_channel_factory) {
- reds->main_channel_factory = main_channel_init();
+ if (!reds->main_channel) {
+ reds->main_channel = main_channel_init();
+ ASSERT(reds->main_channel);
}
client = red_client_new();
ring_add(&reds->clients, &client->link);
reds->num_clients++;
- mcc = main_channel_link(reds->main_channel_factory, client,
- stream, connection_id, reds->mig_target, link_mess->num_common_caps,
- link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
- link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
- reds->main_channel = (MainChannel*)reds->main_channel_factory->data;
- ASSERT(reds->main_channel);
+ mcc = main_channel_link(reds->main_channel, client,
+ stream, connection_id, reds->mig_target,
+ link_mess->num_common_caps,
+ link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
+ link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
+ red_printf("NEW Client %p mcc %p connect-id %d", client, mcc, connection_id);
free(link_mess);
red_client_set_main(client, mcc);
@@ -1608,7 +1639,7 @@ static void openssl_init(RedLinkInfo *link)
static void reds_handle_other_links(RedLinkInfo *link)
{
- Channel *channel;
+ RedsChannel *channel;
RedClient *client = NULL;
RedsStream *stream;
SpiceLinkMess *link_mess;
@@ -1617,7 +1648,7 @@ static void reds_handle_other_links(RedLinkInfo *link)
link_mess = link->link_mess;
if (reds->main_channel) {
client = main_channel_get_client_by_link_id(reds->main_channel,
- link_mess->connection_id);
+ link_mess->connection_id);
}
// TODO: MC: broke migration (at least for the dont-drop-connection kind).
@@ -1650,9 +1681,12 @@ static void reds_handle_other_links(RedLinkInfo *link)
link->link_mess = NULL;
reds_link_free(link);
caps = (uint32_t *)((uint8_t *)link_mess + link_mess->caps_offset);
- channel->link(channel, client, stream, reds->mig_target, link_mess->num_common_caps,
- link_mess->num_common_caps ? caps : NULL, link_mess->num_channel_caps,
- link_mess->num_channel_caps ? caps + link_mess->num_common_caps : NULL);
+ channel->base->client_cbs.connect(channel->base, client, stream, reds->mig_target,
+ link_mess->num_common_caps,
+ link_mess->num_common_caps ? caps : NULL,
+ link_mess->num_channel_caps,
+ link_mess->num_channel_caps ?
+ caps + link_mess->num_common_caps : NULL);
free(link_mess);
}
@@ -3077,7 +3111,7 @@ static void reds_mig_finished(int completed)
reds->mig_inprogress = TRUE;
if (completed) {
- Channel *channel;
+ RingItem *link, *next;
reds->mig_wait_disconnect = TRUE;
core->timer_start(reds->mig_timer, MIGRATE_TIMEOUT);
@@ -3086,14 +3120,16 @@ static void reds_mig_finished(int completed)
// - it can have an empty migrate - that seems ok
// - I can try to fill it's migrate, then move stuff from reds.c there, but a lot of data
// is in reds state right now.
+ // currently the migrate callback of main_channel does nothing
main_channel_push_migrate(reds->main_channel);
- channel = reds->channels;
- while (channel) {
- channel->migrate(channel);
- channel = channel->next;
+
+ RING_FOREACH_SAFE(link, next, &reds->clients) {
+ red_client_migrate(SPICE_CONTAINEROF(link, RedClient, link));
}
} else {
main_channel_push_migrate_cancel(reds->main_channel);
+ // TODO: all the seemless migration is broken. Before MC we waited for disconection of one client,
+ // no we need to wait to all the clients (see mig_timer)?
reds_mig_cleanup();
}
}
diff --git a/server/reds.h b/server/reds.h
index 6930fe6a..0bbda143 100644
--- a/server/reds.h
+++ b/server/reds.h
@@ -31,13 +31,10 @@
#include "common/marshaller.h"
#include "common/messages.h"
#include "spice.h"
+#include "red_channel.h"
#define SPICE_GNUC_VISIBLE __attribute__ ((visibility ("default")))
-typedef struct RedsStream RedsStream;
-typedef struct RedClient RedClient;
-typedef struct MainChannelClient MainChannelClient;
-
#if HAVE_SASL
typedef struct RedsSASL {
sasl_conn_t *conn;
@@ -88,22 +85,6 @@ struct RedsStream {
ssize_t (*writev)(RedsStream *s, const struct iovec *iov, int iovcnt);
};
-typedef struct Channel {
- struct Channel *next;
- uint32_t type;
- uint32_t id;
- int num_common_caps;
- uint32_t *common_caps;
- int num_caps;
- uint32_t *caps;
- void (*link)(struct Channel *, RedClient *client, RedsStream *stream,
- int migration, int num_common_caps,
- uint32_t *common_caps, int num_caps, uint32_t *caps);
- void (*shutdown)(struct Channel *);
- void (*migrate)(struct Channel *);
- void *data;
-} Channel;
-
struct QXLState {
QXLInterface *qif;
struct RedDispatcher *dispatcher;
@@ -114,8 +95,6 @@ struct SpiceNetWireState {
struct TunnelWorker *worker;
};
-void reds_channel_dispose(Channel *channel);
-
ssize_t reds_stream_read(RedsStream *s, void *buf, size_t nbyte);
ssize_t reds_stream_write(RedsStream *s, const void *buf, size_t nbyte);
ssize_t reds_stream_writev(RedsStream *s, const struct iovec *iov, int iovcnt);
@@ -127,8 +106,8 @@ void reds_update_mm_timer(uint32_t mm_time);
uint32_t reds_get_mm_time(void);
void reds_set_client_mouse_allowed(int is_client_mouse_allowed,
int x_res, int y_res);
-void reds_register_channel(Channel *channel);
-void reds_unregister_channel(Channel *channel);
+void reds_register_channel(RedChannel *channel);
+void reds_unregister_channel(RedChannel *channel);
int reds_get_mouse_mode(void); // used by inputs_channel
int reds_get_agent_mouse(void); // used by inputs_channel
int reds_has_vdagent(void); // used by inputs channel
diff --git a/server/smartcard.c b/server/smartcard.c
index 36004f75..2ff13101 100644
--- a/server/smartcard.c
+++ b/server/smartcard.c
@@ -22,6 +22,7 @@
#include <arpa/inet.h>
#include <vscard_common.h>
+#include "server/reds.h"
#include "server/char_device.h"
#include "server/red_channel.h"
#include "server/smartcard.h"
@@ -78,7 +79,7 @@ static void smartcard_on_message_from_device(
RedChannelClient *rcc, VSCMsgHeader *vheader);
static SmartCardDeviceState* smartcard_device_state_new();
static void smartcard_device_state_free(SmartCardDeviceState* st);
-static void smartcard_register_channel(void);
+static void smartcard_init(void);
void smartcard_char_device_wakeup(SpiceCharDeviceInstance *sin)
{
@@ -169,7 +170,7 @@ static int smartcard_char_device_add_to_readers(SpiceCharDeviceInstance *char_de
}
state->reader_id = g_smartcard_readers.num;
g_smartcard_readers.sin[g_smartcard_readers.num++] = char_device;
- smartcard_register_channel();
+ smartcard_init();
return 0;
}
@@ -274,7 +275,6 @@ static int smartcard_channel_client_config_socket(RedChannelClient *rcc)
static uint8_t *smartcard_channel_alloc_msg_rcv_buf(RedChannelClient *rcc,
SpiceDataHeader *msg_header)
{
- //red_printf("allocing %d bytes", msg_header->size);
return spice_malloc(msg_header->size);
}
@@ -337,10 +337,9 @@ static void smartcard_channel_release_pipe_item(RedChannelClient *rcc,
free(item);
}
-static void smartcard_channel_client_disconnect(RedChannelClient *rcc)
+static void smartcard_channel_on_disconnect(RedChannelClient *rcc)
{
smartcard_readers_detach_all(rcc);
- red_channel_client_destroy(rcc);
}
/* this is called from both device input and client input. since the device is
@@ -487,63 +486,55 @@ static void smartcard_channel_hold_pipe_item(RedChannelClient *rcc, PipeItem *it
{
}
-static void smartcard_link(Channel *channel, RedClient *client,
+static void smartcard_connect(RedChannel *channel, RedClient *client,
RedsStream *stream, int migration,
int num_common_caps, uint32_t *common_caps,
int num_caps, uint32_t *caps)
{
RedChannelClient *rcc;
- ChannelCbs channel_cbs;
- if (!channel->data) {
- memset(&channel_cbs, sizeof(channel_cbs), 0);
- channel_cbs.config_socket = smartcard_channel_client_config_socket;
- channel_cbs.disconnect = smartcard_channel_client_disconnect;
- channel_cbs.send_item = smartcard_channel_send_item;
- channel_cbs.hold_item = smartcard_channel_hold_pipe_item;
- channel_cbs.release_item = smartcard_channel_release_pipe_item;
- channel_cbs.alloc_recv_buf = smartcard_channel_alloc_msg_rcv_buf;
- channel_cbs.release_recv_buf = smartcard_channel_release_msg_rcv_buf;
- channel->data =
- red_channel_create(sizeof(SmartCardChannel),
- core, migration,
- FALSE /* handle_acks */,
- smartcard_channel_handle_message,
- &channel_cbs);
- if (channel->data) {
- red_channel_init_outgoing_messages_window((RedChannel*)channel->data);
- } else {
- red_printf("ERROR: smartcard channel creation failed");
- return;
- }
- }
- rcc = red_channel_client_create(sizeof(RedChannelClient),
- (RedChannel*)channel->data, client, stream);
+ rcc = red_channel_client_create(sizeof(RedChannelClient), channel, client, stream);
red_channel_client_ack_zero_messages_window(rcc);
}
-static void smartcard_shutdown(Channel *channel)
+static void smartcard_migrate(RedChannelClient *rcc)
{
}
-static void smartcard_migrate(Channel *channel)
-{
-}
+SmartCardChannel *g_smartcard_channel;
-static void smartcard_register_channel(void)
+static void smartcard_init(void)
{
- Channel *channel;
- static int registered = 0;
-
- if (registered) {
- return;
+ ChannelCbs channel_cbs;
+ ClientCbs client_cbs = {0,};
+
+ ASSERT(!g_smartcard_channel);
+
+ memset(&channel_cbs, sizeof(channel_cbs), 0);
+ memset(&client_cbs, sizeof(client_cbs), 0);
+
+ channel_cbs.config_socket = smartcard_channel_client_config_socket;
+ channel_cbs.on_disconnect = smartcard_channel_on_disconnect;
+ channel_cbs.send_item = smartcard_channel_send_item;
+ channel_cbs.hold_item = smartcard_channel_hold_pipe_item;
+ channel_cbs.release_item = smartcard_channel_release_pipe_item;
+ channel_cbs.alloc_recv_buf = smartcard_channel_alloc_msg_rcv_buf;
+ channel_cbs.release_recv_buf = smartcard_channel_release_msg_rcv_buf;
+
+ g_smartcard_channel = (SmartCardChannel*)red_channel_create(sizeof(SmartCardChannel),
+ core, SPICE_CHANNEL_SMARTCARD, 0,
+ FALSE /* migration - TODO?*/,
+ FALSE /* handle_acks */,
+ smartcard_channel_handle_message,
+ &channel_cbs);
+
+ if (!g_smartcard_channel) {
+ red_error("failed to allocate Inputs Channel");
}
- red_printf("registering smartcard channel");
- registered = 1;
- channel = spice_new0(Channel, 1);
- channel->type = SPICE_CHANNEL_SMARTCARD;
- channel->link = smartcard_link;
- channel->shutdown = smartcard_shutdown;
- channel->migrate = smartcard_migrate;
- reds_register_channel(channel);
+
+ client_cbs.connect = smartcard_connect;
+ client_cbs.migrate = smartcard_migrate;
+ red_channel_register_client_cbs(&g_smartcard_channel->base, &client_cbs);
+
+ reds_register_channel(&g_smartcard_channel->base);
}
diff --git a/server/snd_worker.c b/server/snd_worker.c
index 332612f5..4c9a6f01 100644
--- a/server/snd_worker.c
+++ b/server/snd_worker.c
@@ -28,6 +28,7 @@
#include "spice.h"
#include "red_common.h"
+#include "main_channel.h"
#include "reds.h"
#include "red_dispatcher.h"
#include "snd_worker.h"
@@ -35,12 +36,6 @@
#include "generated_marshallers.h"
#include "demarshallers.h"
-/* main_channel.h inclusion drags red_channel.h which has conflicting types.
- * until the channels here are defined in terms of red_channel.h we have some
- * duplicate declarations */
-MainChannelClient *red_client_get_main(RedClient *client);
-int main_channel_client_is_low_bandwidth(MainChannelClient *mcc);
-
#define MAX_SEND_VEC 100
#define RECIVE_BUF_SIZE (16 * 1024 * 2)
@@ -78,10 +73,10 @@ enum RecordCommand {
#define SND_RECORD_VOLUME_MASK (1 << SND_RECORD_VOLUME)
typedef struct SndChannel SndChannel;
-typedef void (*send_messages_proc)(void *in_channel);
-typedef int (*handle_message_proc)(SndChannel *channel, size_t size, uint32_t type, void *message);
-typedef void (*on_message_done_proc)(SndChannel *channel);
-typedef void (*cleanup_channel_proc)(SndChannel *channel);
+typedef void (*snd_channel_send_messages_proc)(void *in_channel);
+typedef int (*snd_channel_handle_message_proc)(SndChannel *channel, size_t size, uint32_t type, void *message);
+typedef void (*snd_channel_on_message_done_proc)(SndChannel *channel);
+typedef void (*snd_channel_cleanup_channel_proc)(SndChannel *channel);
typedef struct SndWorker SndWorker;
@@ -90,6 +85,8 @@ struct SndChannel {
SndWorker *worker;
spice_parse_channel_func_t parser;
+ RedChannelClient *channel_client;
+
int active;
int client_active;
int blocked;
@@ -116,10 +113,10 @@ struct SndChannel {
uint8_t *end;
} recive_data;
- send_messages_proc send_messages;
- handle_message_proc handle_message;
- on_message_done_proc on_message_done;
- cleanup_channel_proc cleanup;
+ snd_channel_send_messages_proc send_messages;
+ snd_channel_handle_message_proc handle_message;
+ snd_channel_on_message_done_proc on_message_done;
+ snd_channel_cleanup_channel_proc cleanup;
int num_caps;
uint32_t *caps;
};
@@ -146,7 +143,7 @@ typedef struct PlaybackChannel {
} PlaybackChannel;
struct SndWorker {
- Channel base;
+ RedChannel *base_channel;
SndChannel *connection;
SndWorker *next;
int active;
@@ -193,7 +190,7 @@ typedef struct RecordChannel {
uint32_t celt_buf[FRAME_SIZE];
} RecordChannel;
-static SndWorker *workers = NULL;
+static SndWorker *workers;
static uint32_t playback_compression = SPICE_AUDIO_DATA_MODE_CELT_0_5_1;
static void snd_receive(void* data);
@@ -863,10 +860,11 @@ static void snd_record_send(void* data)
static SndChannel *__new_channel(SndWorker *worker, int size, uint32_t channel_id,
RedClient *client,
RedsStream *stream,
- int migrate, send_messages_proc send_messages,
- handle_message_proc handle_message,
- on_message_done_proc on_message_done,
- cleanup_channel_proc cleanup,
+ int migrate,
+ snd_channel_send_messages_proc send_messages,
+ snd_channel_handle_message_proc handle_message,
+ snd_channel_on_message_done_proc on_message_done,
+ snd_channel_cleanup_channel_proc cleanup,
uint32_t *caps, int num_caps)
{
SndChannel *channel;
@@ -926,6 +924,10 @@ static SndChannel *__new_channel(SndWorker *worker, int size, uint32_t channel_i
channel->cleanup = cleanup;
channel->num_caps = num_caps;
channel->caps = spice_memdup(caps, num_caps * sizeof(uint32_t));
+
+ channel->channel_client = red_channel_client_create_dummy(sizeof(RedChannelClient),
+ worker->base_channel,
+ client);
return channel;
error2:
@@ -936,9 +938,15 @@ error1:
return NULL;
}
-static void snd_shutdown(Channel *channel)
+static void snd_disconnect_channel_client(RedChannelClient *rcc)
{
- SndWorker *worker = (SndWorker *)channel;
+ SndWorker *worker;
+
+ ASSERT(rcc->channel);
+ ASSERT(rcc->channel->data);
+ worker = (SndWorker *)rcc->channel->data;
+
+ ASSERT(worker->connection->channel_client == rcc);
snd_disconnect_channel(worker->connection);
}
@@ -1096,13 +1104,13 @@ static void snd_playback_cleanup(SndChannel *channel)
celt051_mode_destroy(playback_channel->celt_mode);
}
-static void snd_set_playback_peer(Channel *channel, RedClient *client, RedsStream *stream,
+static void snd_set_playback_peer(RedChannel *channel, RedClient *client, RedsStream *stream,
int migration, int num_common_caps, uint32_t *common_caps,
int num_caps, uint32_t *caps)
{
- SndWorker *worker = (SndWorker *)channel;
- SpicePlaybackState *st = SPICE_CONTAINEROF(worker, SpicePlaybackState, worker);
+ SndWorker *worker = channel->data;
PlaybackChannel *playback_channel;
+ SpicePlaybackState *st = SPICE_CONTAINEROF(worker, SpicePlaybackState, worker);
CELTEncoder *celt_encoder;
CELTMode *celt_mode;
int celt_error;
@@ -1158,10 +1166,16 @@ error_1:
celt051_mode_destroy(celt_mode);
}
-static void snd_record_migrate(Channel *channel)
+static void snd_record_migrate_channel_client(RedChannelClient *rcc)
{
- SndWorker *worker = (SndWorker *)channel;
+ SndWorker *worker;
+
+ ASSERT(rcc->channel);
+ ASSERT(rcc->channel->data);
+ worker = (SndWorker *)rcc->channel->data;
+
if (worker->connection) {
+ ASSERT(worker->connection->channel_client == rcc);
snd_set_command(worker->connection, SND_RECORD_MIGRATE_MASK);
snd_record_send(worker->connection);
}
@@ -1290,19 +1304,19 @@ static void on_new_record_channel(SndWorker *worker)
static void snd_record_cleanup(SndChannel *channel)
{
- RecordChannel *record_channel = (RecordChannel *)channel;
+ RecordChannel *record_channel = SPICE_CONTAINEROF(channel, RecordChannel, base);
celt051_decoder_destroy(record_channel->celt_decoder);
celt051_mode_destroy(record_channel->celt_mode);
}
-static void snd_set_record_peer(Channel *channel, RedClient *client, RedsStream *stream,
+static void snd_set_record_peer(RedChannel *channel, RedClient *client, RedsStream *stream,
int migration, int num_common_caps, uint32_t *common_caps,
int num_caps, uint32_t *caps)
{
- SndWorker *worker = (SndWorker *)channel;
- SpiceRecordState *st = SPICE_CONTAINEROF(worker, SpiceRecordState, worker);
+ SndWorker *worker = channel->data;
RecordChannel *record_channel;
+ SpiceRecordState *st = SPICE_CONTAINEROF(worker, SpiceRecordState, worker);
CELTDecoder *celt_decoder;
CELTMode *celt_mode;
int celt_error;
@@ -1354,11 +1368,16 @@ error_2:
celt051_mode_destroy(celt_mode);
}
-static void snd_playback_migrate(Channel *channel)
+static void snd_playback_migrate_channel_client(RedChannelClient *rcc)
{
- SndWorker *worker = (SndWorker *)channel;
+ SndWorker *worker;
+
+ ASSERT(rcc->channel);
+ ASSERT(rcc->channel->data);
+ worker = (SndWorker *)rcc->channel->data;
if (worker->connection) {
+ ASSERT(worker->connection->channel_client == rcc);
snd_set_command(worker->connection, SND_PLAYBACK_MIGRATE_MASK);
snd_playback_send(worker->connection);
}
@@ -1386,48 +1405,67 @@ static void remove_worker(SndWorker *worker)
void snd_attach_playback(SpicePlaybackInstance *sin)
{
SndWorker *playback_worker;
+ int num_caps;
+ uint32_t *caps;
+ RedChannel *channel;
+ ClientCbs client_cbs = {0,};
sin->st = spice_new0(SpicePlaybackState, 1);
sin->st->sin = sin;
playback_worker = &sin->st->worker;
- playback_worker->base.type = SPICE_CHANNEL_PLAYBACK;
- playback_worker->base.link = snd_set_playback_peer;
- playback_worker->base.shutdown = snd_shutdown;
- playback_worker->base.migrate = snd_playback_migrate;
- playback_worker->base.data = NULL;
+ // TODO: Make RedChannel base of worker? instead of assigning it to channel->data
+ channel = red_channel_create_dummy(sizeof(RedChannel), SPICE_CHANNEL_PLAYBACK, 0);
+
+ channel->data = playback_worker;
+ client_cbs.connect = snd_set_playback_peer;
+ client_cbs.disconnect = snd_disconnect_channel_client;
+ client_cbs.migrate = snd_playback_migrate_channel_client;
+ red_channel_register_client_cbs(channel, &client_cbs);
+ red_channel_set_data(channel, playback_worker);
- playback_worker->base.num_caps = 1;
- playback_worker->base.caps = spice_new(uint32_t, 1);
- playback_worker->base.caps[0] =
- (1 << SPICE_PLAYBACK_CAP_CELT_0_5_1) |
- (1 << SPICE_PLAYBACK_CAP_VOLUME);
+ num_caps = 1;
+ caps = spice_new(uint32_t, 1);
+ caps[0] = (1 << SPICE_PLAYBACK_CAP_CELT_0_5_1) |
+ (1 << SPICE_PLAYBACK_CAP_VOLUME);
+ red_channel_set_caps(channel, num_caps, caps);
+ playback_worker->base_channel = channel;
add_worker(playback_worker);
- reds_register_channel(&playback_worker->base);
+ reds_register_channel(playback_worker->base_channel);
}
void snd_attach_record(SpiceRecordInstance *sin)
{
SndWorker *record_worker;
+ int num_caps;
+ uint32_t *caps;
+ RedChannel *channel;
+ ClientCbs client_cbs = {0,};
sin->st = spice_new0(SpiceRecordState, 1);
sin->st->sin = sin;
record_worker = &sin->st->worker;
- record_worker->base.type = SPICE_CHANNEL_RECORD;
- record_worker->base.link = snd_set_record_peer;
- record_worker->base.shutdown = snd_shutdown;
- record_worker->base.migrate = snd_record_migrate;
- record_worker->base.data = NULL;
-
- record_worker->base.num_caps = 1;
- record_worker->base.caps = spice_new(uint32_t, 1);
- record_worker->base.caps[0] =
- (1 << SPICE_RECORD_CAP_CELT_0_5_1) |
- (1 << SPICE_RECORD_CAP_VOLUME);
+ // TODO: Make RedChannel base of worker? instead of assigning it to channel->data
+ channel = red_channel_create_dummy(sizeof(RedChannel), SPICE_CHANNEL_RECORD, 0);
+
+ channel->data = record_worker;
+ client_cbs.connect = snd_set_record_peer;
+ client_cbs.disconnect = snd_disconnect_channel_client;
+ client_cbs.migrate = snd_record_migrate_channel_client;
+ red_channel_register_client_cbs(channel, &client_cbs);
+ red_channel_set_data(channel, record_worker);
+
+ num_caps = 1;
+ caps = spice_new(uint32_t, 1);
+ caps[0] = (1 << SPICE_RECORD_CAP_CELT_0_5_1) |
+ (1 << SPICE_RECORD_CAP_VOLUME);
+ red_channel_set_caps(channel, num_caps, caps);
+
+ record_worker->base_channel = channel;
add_worker(record_worker);
- reds_register_channel(&record_worker->base);
+ reds_register_channel(record_worker->base_channel);
}
static void snd_detach_common(SndWorker *worker)
@@ -1437,9 +1475,8 @@ static void snd_detach_common(SndWorker *worker)
}
remove_worker(worker);
snd_disconnect_channel(worker->connection);
- reds_unregister_channel(&worker->base);
-
- reds_channel_dispose(&worker->base);
+ reds_unregister_channel(worker->base_channel);
+ red_channel_destroy(worker->base_channel);
}
static void spice_playback_state_free(SpicePlaybackState *st)
@@ -1472,7 +1509,7 @@ void snd_set_playback_compression(int on)
playback_compression = on ? SPICE_AUDIO_DATA_MODE_CELT_0_5_1 : SPICE_AUDIO_DATA_MODE_RAW;
for (; now; now = now->next) {
- if (now->base.type == SPICE_CHANNEL_PLAYBACK && now->connection) {
+ if (now->base_channel->type == SPICE_CHANNEL_PLAYBACK && now->connection) {
SndChannel* sndchannel = now->connection;
PlaybackChannel* playback = (PlaybackChannel*)now->connection;
if (!check_cap(sndchannel->caps, sndchannel->num_caps,
diff --git a/server/usbredir.c b/server/usbredir.c
index 50a60956..8daab7d4 100644
--- a/server/usbredir.c
+++ b/server/usbredir.c
@@ -24,6 +24,7 @@
#include "server/char_device.h"
#include "server/red_channel.h"
+#include "server/reds.h"
/* 64K should be enough for all but the largest bulk xfers + 32 bytes hdr */
#define BUF_SIZE (64 * 1024 + 32)
@@ -36,8 +37,7 @@ typedef struct UsbRedirPipeItem {
} UsbRedirPipeItem;
typedef struct UsbRedirState {
- Channel channel;
- RedChannel *red_channel;
+ RedChannel *channel;
RedChannelClient *rcc;
SpiceCharDeviceState chardev_st;
SpiceCharDeviceInstance *chardev_sin;
@@ -60,14 +60,14 @@ static void usbredir_chardev_wakeup(SpiceCharDeviceInstance *sin)
state = SPICE_CONTAINEROF(sin->st, UsbRedirState, chardev_st);
sif = SPICE_CONTAINEROF(sin->base.sif, SpiceCharDeviceInterface, base);
- if (!state->red_channel) {
+ if (!state->rcc) {
return;
}
do {
if (!state->pipe_item) {
state->pipe_item = spice_malloc(sizeof(UsbRedirPipeItem));
- red_channel_pipe_item_init(state->red_channel,
+ red_channel_pipe_item_init(state->rcc->channel,
&state->pipe_item->base, 0);
}
@@ -82,12 +82,12 @@ static void usbredir_chardev_wakeup(SpiceCharDeviceInstance *sin)
} while (n > 0);
}
-static int usbredir_red_channel_config_socket(RedChannelClient *rcc)
+static int usbredir_red_channel_client_config_socket(RedChannelClient *rcc)
{
return TRUE;
}
-static void usbredir_red_channel_disconnect(RedChannelClient *rcc)
+static void usbredir_red_channel_client_on_disconnect(RedChannelClient *rcc)
{
UsbRedirState *state;
SpiceCharDeviceInstance *sin;
@@ -177,16 +177,18 @@ static void usbredir_red_channel_release_pipe_item(RedChannelClient *rcc,
free(item);
}
-static void usbredir_link(Channel *channel, RedClient *client, RedsStream *stream, int migration,
- int num_common_caps, uint32_t *common_caps, int num_caps, uint32_t *caps)
+static void usbredir_connect(RedChannel *channel, RedClient *client,
+ RedsStream *stream, int migration, int num_common_caps,
+ uint32_t *common_caps, int num_caps, uint32_t *caps)
{
+ RedChannelClient *rcc;
UsbRedirState *state;
UsbRedirChannel *redir_chan;
SpiceCharDeviceInstance *sin;
SpiceCharDeviceInterface *sif;
- ChannelCbs channel_cbs;
- state = SPICE_CONTAINEROF(channel, UsbRedirState, channel);
+ redir_chan = SPICE_CONTAINEROF(channel, UsbRedirChannel, base);
+ state = redir_chan->state;
sin = state->chardev_sin;
sif = SPICE_CONTAINEROF(sin->base.sif, SpiceCharDeviceInterface, base);
@@ -199,86 +201,78 @@ static void usbredir_link(Channel *channel, RedClient *client, RedsStream *strea
return;
}
- if (!state->red_channel) {
- memset(&channel_cbs, sizeof(channel_cbs), 0);
- channel_cbs.config_socket = usbredir_red_channel_config_socket;
- channel_cbs.disconnect = usbredir_red_channel_disconnect;
- channel_cbs.send_item = usbredir_red_channel_send_item;
- channel_cbs.hold_item = usbredir_red_channel_hold_pipe_item;
- channel_cbs.release_item = usbredir_red_channel_release_pipe_item;
- channel_cbs.alloc_recv_buf = usbredir_red_channel_alloc_msg_rcv_buf;
- channel_cbs.release_recv_buf = usbredir_red_channel_release_msg_rcv_buf;
- state->red_channel = red_channel_create(sizeof(UsbRedirChannel),
- core, migration, FALSE /* handle_acks */,
- usbredir_red_channel_client_handle_message,
- &channel_cbs);
- }
- if (!state->red_channel) {
- return;
- }
- state->rcc = red_channel_client_create(sizeof(RedChannelClient), state->red_channel,
- client, stream);
- if (!state->rcc) {
- red_printf("failed to create usbredir channel client\n");
+ rcc = red_channel_client_create(sizeof(RedChannelClient), channel, client, stream);
+ if (!rcc) {
return;
}
- red_channel_init_outgoing_messages_window(state->red_channel);
- redir_chan = SPICE_CONTAINEROF(state->red_channel, UsbRedirChannel, base);
- redir_chan->state = state;
+ state->rcc = rcc;
+ red_channel_client_ack_zero_messages_window(rcc);
if (sif->state) {
sif->state(sin, 1);
}
}
-static void usbredir_shutdown(Channel *channel)
-{
- UsbRedirState *state = SPICE_CONTAINEROF(channel, UsbRedirState, channel);
-
- usbredir_red_channel_disconnect(state->rcc);
- red_channel_destroy(state->red_channel);
- state->red_channel = NULL;
-}
-
-static void usbredir_migrate(Channel *channel)
+static void usbredir_migrate(RedChannelClient *rcc)
{
/* NOOP */
}
int usbredir_device_connect(SpiceCharDeviceInstance *sin)
{
- UsbRedirState *state;
static int id = 0;
-
+ UsbRedirState *state;
+ UsbRedirChannel *redir_chan;
+ ChannelCbs channel_cbs = {0,};
+ ClientCbs client_cbs = {0,};
+
+ channel_cbs.config_socket = usbredir_red_channel_client_config_socket;
+ channel_cbs.on_disconnect = usbredir_red_channel_client_on_disconnect;
+ channel_cbs.send_item = usbredir_red_channel_send_item;
+ channel_cbs.hold_item = usbredir_red_channel_hold_pipe_item;
+ channel_cbs.release_item = usbredir_red_channel_release_pipe_item;
+ channel_cbs.alloc_recv_buf = usbredir_red_channel_alloc_msg_rcv_buf;
+ channel_cbs.release_recv_buf = usbredir_red_channel_release_msg_rcv_buf;
+
+ redir_chan = (UsbRedirChannel*)red_channel_create(sizeof(UsbRedirChannel),
+ core, SPICE_CHANNEL_USBREDIR, id++,
+ FALSE /* migration - TODO?*/,
+ FALSE /* handle_acks */,
+ usbredir_red_channel_client_handle_message,
+ &channel_cbs);
+ red_channel_init_outgoing_messages_window(&redir_chan->base);
state = spice_new0(UsbRedirState, 1);
- state->channel.type = SPICE_CHANNEL_USBREDIR;
- state->channel.id = id++;
- state->channel.link = usbredir_link;
- state->channel.shutdown = usbredir_shutdown;
- state->channel.migrate = usbredir_migrate;
+ state->channel = &redir_chan->base;
state->chardev_st.wakeup = usbredir_chardev_wakeup;
state->chardev_sin = sin;
state->rcv_buf = spice_malloc(BUF_SIZE);
state->rcv_buf_size = BUF_SIZE;
+ client_cbs.connect = usbredir_connect;
+ client_cbs.migrate = usbredir_migrate;
+ red_channel_register_client_cbs(&redir_chan->base, &client_cbs);
+
sin->st = &state->chardev_st;
- reds_register_channel(&state->channel);
+ reds_register_channel(&redir_chan->base);
return 0;
}
+/* Must be called from RedClient handling thread. */
void usbredir_device_disconnect(SpiceCharDeviceInstance *sin)
{
UsbRedirState *state;
state = SPICE_CONTAINEROF(sin->st, UsbRedirState, chardev_st);
- reds_unregister_channel(&state->channel);
+ reds_unregister_channel(state->channel);
- usbredir_red_channel_disconnect(state->rcc);
+ red_channel_client_destroy(state->rcc);
+ red_channel_disconnect(state->channel);
free(state->pipe_item);
free(state->rcv_buf);
+ free(state->channel);
free(state);
}