summaryrefslogtreecommitdiffstats
path: root/server/red_worker.c
diff options
context:
space:
mode:
authorAlon Levy <alevy@redhat.com>2011-04-11 12:44:00 +0300
committerAlon Levy <alevy@redhat.com>2011-08-23 17:56:44 +0300
commit448ed75bd6c8db7ca48cab8aa1256a262e87fcc0 (patch)
tree653fa73b47966052cc0cec9184090c0b3349fc89 /server/red_worker.c
parent22084c4703282699a34dfb72f3c6318159ddcedf (diff)
downloadspice-448ed75bd6c8db7ca48cab8aa1256a262e87fcc0.tar.gz
spice-448ed75bd6c8db7ca48cab8aa1256a262e87fcc0.tar.xz
spice-448ed75bd6c8db7ca48cab8aa1256a262e87fcc0.zip
server: Add RedClient
That means RedClient tracks a ring of channels. Right now there will be only a single client because of the disconnection mechanism - whenever a new client comes we disconnect all existing clients. But this patch adds already a ring of clients to reds.c (stored in RedServer). There is a known problem handling many connections and disconnections at the same time, trigerrable easily by the following script: export NEW_DISPLAY=:3.0 Xephyr $NEW_DISPLAY -noreset & for ((i = 0 ; i < 5; ++i)); do for ((j = 0 ; j < 10; ++j)); do DISPLAY=$NEW_DISPLAY c_win7x86_qxl_tests & done sleep 2; done I fixed a few of the problems resulting from this in the same patch. This required already introducing a few other changes: * make sure all removal of channels happens in the main thread, for that two additional dispatcher calls are added to remove a specific channel client (RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT and RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT). * change some asserts in input channel. * make main channel disconnect not recursive * introduce disconnect call back to red_channel_create_parser The remaining abort is from a double free in the main channel, still can't find it (doesn't happen when running under valgrind - probably due to the slowness resulting from that), but is easy to see when running under gdb.
Diffstat (limited to 'server/red_worker.c')
-rw-r--r--server/red_worker.c146
1 files changed, 110 insertions, 36 deletions
diff --git a/server/red_worker.c b/server/red_worker.c
index 6ff84f2e..afde6d73 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -8848,6 +8848,7 @@ static void free_common_channel_from_listener(EventListener *ctx)
free(common);
}
+
static void worker_watch_update_mask(SpiceWatch *watch, int event_mask)
{
}
@@ -8868,13 +8869,15 @@ SpiceCoreInterface worker_core = {
};
static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
- RedsStream *stream, int migrate,
+ RedClient *client, RedsStream *stream, int migrate,
event_listener_action_proc handler,
channel_disconnect_proc disconnect,
channel_send_pipe_item_proc send_item,
channel_hold_pipe_item_proc hold_item,
channel_release_pipe_item_proc release_item,
channel_handle_parsed_proc handle_parsed,
+ channel_on_incoming_error_proc on_incoming_error,
+ channel_on_outgoing_error_proc on_outgoing_error,
channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
channel_handle_migrate_data_proc handle_migrate_data,
channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial)
@@ -8886,6 +8889,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
channel = red_channel_create_parser(size, &worker_core, migrate,
TRUE /* handle_acks */,
common_channel_config_socket,
+ disconnect,
spice_get_client_channel_parser(channel_id, NULL),
handle_parsed,
common_alloc_recv_buf,
@@ -8893,8 +8897,8 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
hold_item,
send_item,
release_item,
- red_channel_client_default_peer_on_error,
- red_channel_client_default_peer_on_error,
+ on_incoming_error,
+ on_outgoing_error,
handle_migrate_flush_mark,
handle_migrate_data,
handle_migrate_data_get_serial);
@@ -8902,7 +8906,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
if (!channel) {
goto error;
}
- red_channel_client_create(sizeof(RedChannelClient), channel, stream);
+ red_channel_client_create(sizeof(RedChannelClient), channel, client, stream);
common->id = worker->id;
common->listener.refs = 1;
common->listener.action = handler;
@@ -9058,25 +9062,72 @@ static void display_channel_release_item(RedChannelClient *rcc, PipeItem *item,
}
}
-static void handle_new_display_channel(RedWorker *worker, RedsStream *stream, int migrate)
+static void display_channel_on_incoming_error(RedChannelClient *rcc)
+{
+ red_printf("");
+ red_channel_client_shutdown(rcc);
+}
+
+static void display_channel_on_outgoing_error(RedChannelClient *rcc)
+{
+ red_printf("");
+ red_channel_client_shutdown(rcc);
+}
+
+static void cursor_channel_on_incoming_error(RedChannelClient *rcc)
+{
+ red_printf("");
+ red_channel_client_shutdown(rcc);
+}
+
+static void cursor_channel_on_outgoing_error(RedChannelClient *rcc)
+{
+ red_printf("");
+ red_channel_client_shutdown(rcc);
+}
+
+// call this from dispatcher thread context
+static void dispatch_display_channel_client_disconnect(RedChannelClient *rcc)
+{
+ RedWorker *worker = ((DisplayChannel*)rcc->channel)->common.worker;
+ struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
+
+ red_printf("");
+ red_dispatcher_disconnect_display_client(dispatcher, rcc);
+}
+
+// call this from dispatcher thread context
+static void dispatch_cursor_channel_client_disconnect(RedChannelClient *rcc)
+{
+ RedWorker *worker = ((CursorChannel*)rcc->channel)->common.worker;
+ struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher;
+
+ red_printf("");
+ red_dispatcher_disconnect_cursor_client(dispatcher, rcc);
+}
+
+static void handle_new_display_channel(RedWorker *worker, RedClient *client, RedsStream *stream,
+ int migrate)
{
DisplayChannel *display_channel;
size_t stream_buf_size;
red_disconnect_all_display_TODO_remove_me((RedChannel *)worker->display_channel);
- if (!(display_channel = (DisplayChannel *)__new_channel(worker, sizeof(*display_channel),
- SPICE_CHANNEL_DISPLAY, stream,
- migrate, handle_channel_events,
- red_disconnect_display,
- display_channel_send_item,
- display_channel_hold_pipe_item,
- display_channel_release_item,
- display_channel_handle_message,
- display_channel_handle_migrate_mark,
- display_channel_handle_migrate_data,
- display_channel_handle_migrate_data_get_serial
- ))) {
+ if (!(display_channel = (DisplayChannel *)__new_channel(
+ worker, sizeof(*display_channel),
+ SPICE_CHANNEL_DISPLAY, client, stream,
+ migrate, handle_channel_events,
+ dispatch_display_channel_client_disconnect,
+ display_channel_send_item,
+ display_channel_hold_pipe_item,
+ display_channel_release_item,
+ display_channel_handle_message,
+ display_channel_on_incoming_error,
+ display_channel_on_outgoing_error,
+ display_channel_handle_migrate_mark,
+ display_channel_handle_migrate_data,
+ display_channel_handle_migrate_data_get_serial))) {
return;
}
#ifdef RED_STATISTICS
@@ -9140,11 +9191,6 @@ static void handle_new_display_channel(RedWorker *worker, RedsStream *stream, in
stat_compress_init(&display_channel->jpeg_alpha_stat, jpeg_alpha_stat_name);
}
-static void red_disconnect_cursor_client(RedChannelClient *rcc)
-{
- red_disconnect_cursor(rcc->channel);
-}
-
static void red_disconnect_cursor(RedChannel *channel)
{
CommonChannel *common;
@@ -9228,23 +9274,27 @@ static void cursor_channel_release_item(RedChannelClient *rcc, PipeItem *item, i
}
}
-static void red_connect_cursor(RedWorker *worker, RedsStream *stream, int migrate)
+static void red_connect_cursor(RedWorker *worker, RedClient *client, RedsStream *stream,
+ int migrate)
{
CursorChannel *channel;
red_disconnect_cursor((RedChannel *)worker->cursor_channel);
- if (!(channel = (CursorChannel *)__new_channel(worker, sizeof(*channel),
- SPICE_CHANNEL_CURSOR, stream, migrate,
- handle_channel_events,
- red_disconnect_cursor_client,
- cursor_channel_send_item,
- cursor_channel_hold_pipe_item,
- cursor_channel_release_item,
- red_channel_client_handle_message,
- NULL,
- NULL,
- NULL))) {
+ if (!(channel = (CursorChannel *)__new_channel(
+ worker, sizeof(*channel),
+ SPICE_CHANNEL_CURSOR, client, stream, migrate,
+ handle_channel_events,
+ dispatch_cursor_channel_client_disconnect,
+ cursor_channel_send_item,
+ cursor_channel_hold_pipe_item,
+ cursor_channel_release_item,
+ red_channel_client_handle_message,
+ cursor_channel_on_incoming_error,
+ cursor_channel_on_outgoing_error,
+ NULL,
+ NULL,
+ NULL))) {
return;
}
#ifdef RED_STATISTICS
@@ -9722,12 +9772,24 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
break;
case RED_WORKER_MESSAGE_DISPLAY_CONNECT: {
RedsStream *stream;
+ RedClient *client;
int migrate;
red_printf("connect");
+ receive_data(worker->channel, &client, sizeof(RedClient *));
receive_data(worker->channel, &stream, sizeof(RedsStream *));
receive_data(worker->channel, &migrate, sizeof(int));
- handle_new_display_channel(worker, stream, migrate);
+ handle_new_display_channel(worker, client, stream, migrate);
+ break;
+ }
+ case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT: {
+ RedChannelClient *rcc;
+
+ red_printf("disconnect display client");
+ receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ red_disconnect_display(rcc);
+ message = RED_WORKER_MESSAGE_READY;
+ write_message(worker->channel, &message);
break;
}
case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT:
@@ -9749,12 +9811,24 @@ static void handle_dev_input(EventListener *listener, uint32_t events)
break;
case RED_WORKER_MESSAGE_CURSOR_CONNECT: {
RedsStream *stream;
+ RedClient *client;
int migrate;
red_printf("cursor connect");
+ receive_data(worker->channel, &client, sizeof(RedClient *));
receive_data(worker->channel, &stream, sizeof(RedsStream *));
receive_data(worker->channel, &migrate, sizeof(int));
- red_connect_cursor(worker, stream, migrate);
+ red_connect_cursor(worker, client, stream, migrate);
+ break;
+ }
+ case RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT: {
+ RedChannelClient *rcc;
+
+ red_printf("disconnect cursor client");
+ receive_data(worker->channel, &rcc, sizeof(RedChannelClient *));
+ red_disconnect_cursor(rcc->channel); /* TODO - assumes a single client */
+ message = RED_WORKER_MESSAGE_READY;
+ write_message(worker->channel, &message);
break;
}
case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: