summaryrefslogtreecommitdiffstats
path: root/server/red_worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'server/red_worker.c')
-rw-r--r--server/red_worker.c179
1 files changed, 81 insertions, 98 deletions
diff --git a/server/red_worker.c b/server/red_worker.c
index 4347d243..e767623e 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -8606,15 +8606,20 @@ static void red_disconnect_channel(RedChannel *channel)
static void display_channel_client_disconnect(RedChannelClient *rcc)
{
- // TODO: MC: right now we assume single channel
+ red_channel_client_disconnect(rcc);
+}
+
+static void display_channel_client_on_disconnect(RedChannelClient *rcc)
+{
DisplayChannel *display_channel;
DisplayChannelClient *dcc = RCC_TO_DCC(rcc);
CommonChannel *common;
RedWorker *worker;
- if (!rcc || !red_channel_is_connected(rcc->channel)) {
+ if (!rcc) {
return;
}
+ red_printf("");
common = SPICE_CONTAINEROF(rcc->channel, CommonChannel, base);
worker = common->worker;
display_channel = (DisplayChannel *)rcc->channel;
@@ -8629,8 +8634,6 @@ static void display_channel_client_disconnect(RedChannelClient *rcc)
red_display_reset_compress_buf(dcc);
free(dcc->send_data.free_list.res);
red_display_destroy_streams(dcc);
- red_channel_client_pipe_clear(rcc); // do this before deleting surfaces
- red_channel_client_disconnect(rcc);
// this was the last channel client
if (!red_channel_is_connected(rcc->channel)) {
@@ -8654,12 +8657,15 @@ void red_disconnect_all_display_TODO_remove_me(RedChannel *channel)
worker->display_channel = NULL;
}
-static void red_migrate_display(RedWorker *worker)
+static void red_migrate_display(RedWorker *worker, RedChannelClient *rcc)
{
- if (worker->display_channel) {
- red_pipes_add_verb(&worker->display_channel->common.base,
- SPICE_MSG_DISPLAY_STREAM_DESTROY_ALL);
- red_channel_pipes_add_type(&worker->display_channel->common.base, PIPE_ITEM_TYPE_MIGRATE);
+ // TODO: replace all worker->display_channel tests with
+ // is_connected
+ if (red_channel_client_is_connected(rcc)) {
+ red_pipe_add_verb(rcc, PIPE_ITEM_TYPE_MIGRATE);
+// red_pipes_add_verb(&worker->display_channel->common.base,
+// SPICE_MSG_DISPLAY_STREAM_DESTROY_ALL);
+// red_channel_pipes_add_type(&worker->display_channel->common.base, PIPE_ITEM_TYPE_MIGRATE);
}
}
@@ -8900,7 +8906,7 @@ static inline void flush_display_commands(RedWorker *worker)
for (;;) {
red_channel_push(&worker->display_channel->common.base);
if (!display_is_connected(worker) ||
- red_channel_min_pipe_size(display_red_channel) <= MAX_PIPE_SIZE) {
+ red_channel_max_pipe_size(display_red_channel) <= MAX_PIPE_SIZE) {
break;
}
RedChannel *channel = (RedChannel *)worker->display_channel;
@@ -8961,6 +8967,9 @@ static inline void flush_cursor_commands(RedWorker *worker)
}
}
+// TODO: on timeout, don't disconnect all channeld immeduiatly - try to disconnect the slowest ones first
+// and maybe turn timeouts to several timeouts in order to disconnect channels gradually.
+// Should use disconnect or shutdown?
static inline void flush_all_qxl_commands(RedWorker *worker)
{
flush_display_commands(worker);
@@ -9487,15 +9496,13 @@ static int listen_to_new_client_channel(CommonChannel *common,
return TRUE;
}
-static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id, int migrate,
+static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_type, int migrate,
event_listener_action_proc handler,
- channel_disconnect_proc disconnect,
+ channel_disconnect_proc on_disconnect,
channel_send_pipe_item_proc send_item,
channel_hold_pipe_item_proc hold_item,
channel_release_pipe_item_proc release_item,
channel_handle_parsed_proc handle_parsed,
- channel_on_incoming_error_proc on_incoming_error,
- channel_on_outgoing_error_proc on_outgoing_error,
channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
channel_handle_migrate_data_proc handle_migrate_data,
channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
@@ -9505,7 +9512,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
ChannelCbs channel_cbs;
channel_cbs.config_socket = common_channel_config_socket;
- channel_cbs.disconnect = disconnect;
+ channel_cbs.on_disconnect = on_disconnect;
channel_cbs.send_item = send_item;
channel_cbs.hold_item = hold_item;
channel_cbs.release_item = release_item;
@@ -9515,12 +9522,12 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
channel_cbs.handle_migrate_data = handle_migrate_data;
channel_cbs.handle_migrate_data_get_serial = handle_migrate_data_get_serial;
- channel = red_channel_create_parser(size, &worker_core, migrate,
+ channel = red_channel_create_parser(size, &worker_core,
+ channel_type, worker->id,
+ migrate,
TRUE /* handle_acks */,
- spice_get_client_channel_parser(channel_id, NULL),
+ spice_get_client_channel_parser(channel_type, NULL),
handle_parsed,
- on_incoming_error,
- on_outgoing_error,
&channel_cbs);
common = (CommonChannel *)channel;
if (!channel) {
@@ -9682,50 +9689,6 @@ static void display_channel_release_item(RedChannelClient *rcc, PipeItem *item,
}
}
-static void display_channel_on_incoming_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-static void display_channel_on_outgoing_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-static void cursor_channel_on_incoming_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-static void cursor_channel_on_outgoing_error(RedChannelClient *rcc)
-{
- red_printf("");
- red_channel_client_shutdown(rcc);
-}
-
-// call this from dispatcher thread context
-static void dispatch_display_channel_client_disconnect(RedChannelClient *rcc)
-{
- RedWorker *worker = ((DisplayChannel*)rcc->channel)->common.worker;
- struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
-
- red_printf("");
- red_dispatcher_disconnect_display_client(dispatcher, rcc);
-}
-
-// call this from dispatcher thread context
-static void dispatch_cursor_channel_client_disconnect(RedChannelClient *rcc)
-{
- RedWorker *worker = ((CursorChannel*)rcc->channel)->common.worker;
- struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
-
- red_printf("");
- red_dispatcher_disconnect_cursor_client(dispatcher, rcc);
-}
-
static void ensure_display_channel_created(RedWorker *worker, int migrate)
{
DisplayChannel *display_channel;
@@ -9739,17 +9702,16 @@ static void ensure_display_channel_created(RedWorker *worker, int migrate)
worker, sizeof(*display_channel),
SPICE_CHANNEL_DISPLAY, migrate,
handle_channel_events,
- dispatch_display_channel_client_disconnect,
+ display_channel_client_on_disconnect,
display_channel_send_item,
display_channel_hold_pipe_item,
display_channel_release_item,
display_channel_handle_message,
- display_channel_on_incoming_error,
- display_channel_on_outgoing_error,
display_channel_handle_migrate_mark,
display_channel_handle_migrate_data,
display_channel_handle_migrate_data_get_serial
))) {
+ red_printf("failed to create display channel");
return;
}
display_channel = worker->display_channel;
@@ -9791,7 +9753,7 @@ static void handle_new_display_channel(RedWorker *worker, RedClient *client, Red
if (!dcc) {
return;
}
-
+ red_printf("New display (client %p) dcc %p stream %p", client, dcc, stream);
stream_buf_size = 32*1024;
dcc->send_data.stream_outbuf = spice_malloc(stream_buf_size);
dcc->send_data.stream_outbuf_size = stream_buf_size;
@@ -9836,11 +9798,15 @@ error:
static void cursor_channel_client_disconnect(RedChannelClient *rcc)
{
- if (!red_channel_is_connected(rcc->channel)) {
+ red_channel_client_disconnect(rcc);
+}
+
+static void cursor_channel_client_on_disconnect(RedChannelClient *rcc)
+{
+ if (!rcc) {
return;
}
red_reset_cursor_cache(rcc);
- red_channel_client_disconnect(rcc);
}
static void red_disconnect_cursor(RedChannel *channel)
@@ -9857,13 +9823,14 @@ static void red_disconnect_cursor(RedChannel *channel)
red_disconnect_channel(channel);
}
-static void red_migrate_cursor(RedWorker *worker)
+static void red_migrate_cursor(RedWorker *worker, RedChannelClient *rcc)
{
- if (cursor_is_connected(worker)) {
- red_channel_pipes_add_type(&worker->cursor_channel->common.base,
- PIPE_ITEM_TYPE_INVAL_CURSOR_CACHE);
- red_channel_pipes_add_type(&worker->cursor_channel->common.base,
- PIPE_ITEM_TYPE_MIGRATE);
+// if (cursor_is_connected(worker)) {
+ if (red_channel_client_is_connected(rcc)) {
+ red_channel_client_pipe_add_type(rcc,
+ PIPE_ITEM_TYPE_INVAL_CURSOR_CACHE);
+ red_channel_client_pipe_add_type(rcc,
+ PIPE_ITEM_TYPE_MIGRATE);
}
}
@@ -9950,13 +9917,11 @@ static void ensure_cursor_channel_created(RedWorker *worker, int migrate)
worker, sizeof(*worker->cursor_channel),
SPICE_CHANNEL_CURSOR, migrate,
handle_channel_events,
- dispatch_cursor_channel_client_disconnect,
+ cursor_channel_client_on_disconnect,
cursor_channel_send_item,
cursor_channel_hold_pipe_item,
cursor_channel_release_item,
red_channel_client_handle_message,
- cursor_channel_on_incoming_error,
- cursor_channel_on_outgoing_error,
NULL,
NULL,
NULL);
@@ -10424,6 +10389,10 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
case RED_WORKER_MESSAGE_RESET_IMAGE_CACHE:
case RED_WORKER_MESSAGE_STOP:
case RED_WORKER_MESSAGE_LOADVM_COMMANDS:
+ case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE:
+ case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE:
+ case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
+ case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
write_ready = 1;
default:
break;
@@ -10476,6 +10445,14 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
case RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE:
handle_dev_destroy_primary_surface(worker);
break;
+ case RED_WORKER_MESSAGE_DISPLAY_CHANNEL_CREATE: {
+ RedChannel *red_channel;
+ // TODO: handle seemless migration. Temp, setting migrate to FALSE
+ ensure_display_channel_created(worker, FALSE);
+ red_channel = &worker->display_channel->common.base;
+ send_data(worker->channel, &red_channel, sizeof(RedChannel *));
+ break;
+ }
case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
RedsStream *stream;
RedClient *client;
@@ -10488,20 +10465,15 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
handle_new_display_channel(worker, client, stream, migrate);
break;
}
- case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT: {
+ case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: {
RedChannelClient *rcc;
red_printf("disconnect display client");
receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
display_channel_client_disconnect(rcc);
- message = RED_WORKER_MESSAGE_READY;
- write_message(worker->channel, &message);
break;
}
- case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
- red_printf("disconnect");
- red_disconnect_all_display_TODO_remove_me((RedChannel *)worker->display_channel);
- break;
case RED_WORKER_MESSAGE_STOP: {
red_printf("stop");
handle_dev_stop(worker);
@@ -10511,10 +10483,22 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
red_printf("start");
handle_dev_start(worker);
break;
- case RED_WORKER_MESSAGE_DISPLAY_MIGRATE:
- red_printf("migrate");
- red_migrate_display(worker);
+ case RED_WORKER_MESSAGE_DISPLAY_MIGRATE: {
+ RedChannelClient *rcc;
+ red_printf("migrate display client");
+ receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
+ red_migrate_display(worker, rcc);
+ break;
+ }
+ case RED_WORKER_MESSAGE_CURSOR_CHANNEL_CREATE: {
+ RedChannel *red_channel;
+ // TODO: handle seemless migration. Temp, setting migrate to FALSE
+ ensure_cursor_channel_created(worker, FALSE);
+ red_channel = &worker->cursor_channel->common.base;
+ send_data(worker->channel, &red_channel, sizeof(RedChannel *));
break;
+ }
case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
RedsStream *stream;
RedClient *client;
@@ -10527,24 +10511,23 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
red_connect_cursor(worker, client, stream, migrate);
break;
}
- case RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT: {
+ case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: {
RedChannelClient *rcc;
red_printf("disconnect cursor client");
receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
cursor_channel_client_disconnect(rcc);
- message = RED_WORKER_MESSAGE_READY;
- write_message(worker->channel, &message);
break;
}
- case RED_WORKER_MESSAGE_CURSOR_DISCONNECT:
- red_printf("cursor disconnect");
- red_disconnect_cursor((RedChannel *)worker->cursor_channel);
- break;
- case RED_WORKER_MESSAGE_CURSOR_MIGRATE:
- red_printf("cursor migrate");
- red_migrate_cursor(worker);
+ case RED_WORKER_MESSAGE_CURSOR_MIGRATE: {
+ RedChannelClient *rcc;
+ red_printf("migrate cursor client");
+ receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ ASSERT(rcc);
+ red_migrate_cursor(worker, rcc);
break;
+ }
case RED_WORKER_MESSAGE_SET_COMPRESSION:
receive_data(worker->channel, &worker->image_compression,
sizeof(spice_image_compression_t));