diff options
author | Alon Levy <alevy@redhat.com> | 2011-04-11 12:44:00 +0300 |
---|---|---|
committer | Alon Levy <alevy@redhat.com> | 2011-08-23 17:56:44 +0300 |
commit | 448ed75bd6c8db7ca48cab8aa1256a262e87fcc0 (patch) | |
tree | 653fa73b47966052cc0cec9184090c0b3349fc89 /server/red_worker.c | |
parent | 22084c4703282699a34dfb72f3c6318159ddcedf (diff) | |
download | spice-448ed75bd6c8db7ca48cab8aa1256a262e87fcc0.tar.gz spice-448ed75bd6c8db7ca48cab8aa1256a262e87fcc0.tar.xz spice-448ed75bd6c8db7ca48cab8aa1256a262e87fcc0.zip |
server: Add RedClient
That means RedClient tracks a ring of channels. Right now there will be only
a single client because of the disconnection mechanism - whenever a new
client comes we disconnect all existing clients. But this patch adds already
a ring of clients to reds.c (stored in RedServer).
There is a known problem handling many connections and disconnections at the
same time, trigerrable easily by the following script:
export NEW_DISPLAY=:3.0
Xephyr $NEW_DISPLAY -noreset &
for ((i = 0 ; i < 5; ++i)); do
for ((j = 0 ; j < 10; ++j)); do
DISPLAY=$NEW_DISPLAY c_win7x86_qxl_tests &
done
sleep 2;
done
I fixed a few of the problems resulting from this in the same patch. This
required already introducing a few other changes:
* make sure all removal of channels happens in the main thread, for that
two additional dispatcher calls are added to remove a specific channel
client (RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT and
RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT).
* change some asserts in input channel.
* make main channel disconnect not recursive
* introduce disconnect call back to red_channel_create_parser
The remaining abort is from a double free in the main channel, still can't
find it (doesn't happen when running under valgrind - probably due to the
slowness resulting from that), but is easy to see when running under gdb.
Diffstat (limited to 'server/red_worker.c')
-rw-r--r-- | server/red_worker.c | 146 |
1 files changed, 110 insertions, 36 deletions
diff --git a/server/red_worker.c b/server/red_worker.c index 6ff84f2e..afde6d73 100644 --- a/server/red_worker.c +++ b/server/red_worker.c @@ -8848,6 +8848,7 @@ static void free_common_channel_from_listener(EventListener *ctx) free(common); } + static void worker_watch_update_mask(SpiceWatch *watch, int event_mask) { } @@ -8868,13 +8869,15 @@ SpiceCoreInterface worker_core = { }; static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id, - RedsStream *stream, int migrate, + RedClient *client, RedsStream *stream, int migrate, event_listener_action_proc handler, channel_disconnect_proc disconnect, channel_send_pipe_item_proc send_item, channel_hold_pipe_item_proc hold_item, channel_release_pipe_item_proc release_item, channel_handle_parsed_proc handle_parsed, + channel_on_incoming_error_proc on_incoming_error, + channel_on_outgoing_error_proc on_outgoing_error, channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark, channel_handle_migrate_data_proc handle_migrate_data, channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial) @@ -8886,6 +8889,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i channel = red_channel_create_parser(size, &worker_core, migrate, TRUE /* handle_acks */, common_channel_config_socket, + disconnect, spice_get_client_channel_parser(channel_id, NULL), handle_parsed, common_alloc_recv_buf, @@ -8893,8 +8897,8 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i hold_item, send_item, release_item, - red_channel_client_default_peer_on_error, - red_channel_client_default_peer_on_error, + on_incoming_error, + on_outgoing_error, handle_migrate_flush_mark, handle_migrate_data, handle_migrate_data_get_serial); @@ -8902,7 +8906,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i if (!channel) { goto error; } - red_channel_client_create(sizeof(RedChannelClient), channel, stream); + red_channel_client_create(sizeof(RedChannelClient), channel, client, stream); common->id = worker->id; common->listener.refs = 1; common->listener.action = handler; @@ -9058,25 +9062,72 @@ static void display_channel_release_item(RedChannelClient *rcc, PipeItem *item, } } -static void handle_new_display_channel(RedWorker *worker, RedsStream *stream, int migrate) +static void display_channel_on_incoming_error(RedChannelClient *rcc) +{ + red_printf(""); + red_channel_client_shutdown(rcc); +} + +static void display_channel_on_outgoing_error(RedChannelClient *rcc) +{ + red_printf(""); + red_channel_client_shutdown(rcc); +} + +static void cursor_channel_on_incoming_error(RedChannelClient *rcc) +{ + red_printf(""); + red_channel_client_shutdown(rcc); +} + +static void cursor_channel_on_outgoing_error(RedChannelClient *rcc) +{ + red_printf(""); + red_channel_client_shutdown(rcc); +} + +// call this from dispatcher thread context +static void dispatch_display_channel_client_disconnect(RedChannelClient *rcc) +{ + RedWorker *worker = ((DisplayChannel*)rcc->channel)->common.worker; + struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher; + + red_printf(""); + red_dispatcher_disconnect_display_client(dispatcher, rcc); +} + +// call this from dispatcher thread context +static void dispatch_cursor_channel_client_disconnect(RedChannelClient *rcc) +{ + RedWorker *worker = ((CursorChannel*)rcc->channel)->common.worker; + struct RedDispatcher *dispatcher = worker->qxl->st->dispatcher; + + red_printf(""); + red_dispatcher_disconnect_cursor_client(dispatcher, rcc); +} + +static void handle_new_display_channel(RedWorker *worker, RedClient *client, RedsStream *stream, + int migrate) { DisplayChannel *display_channel; size_t stream_buf_size; red_disconnect_all_display_TODO_remove_me((RedChannel *)worker->display_channel); - if (!(display_channel = (DisplayChannel *)__new_channel(worker, sizeof(*display_channel), - SPICE_CHANNEL_DISPLAY, stream, - migrate, handle_channel_events, - red_disconnect_display, - display_channel_send_item, - display_channel_hold_pipe_item, - display_channel_release_item, - display_channel_handle_message, - display_channel_handle_migrate_mark, - display_channel_handle_migrate_data, - display_channel_handle_migrate_data_get_serial - ))) { + if (!(display_channel = (DisplayChannel *)__new_channel( + worker, sizeof(*display_channel), + SPICE_CHANNEL_DISPLAY, client, stream, + migrate, handle_channel_events, + dispatch_display_channel_client_disconnect, + display_channel_send_item, + display_channel_hold_pipe_item, + display_channel_release_item, + display_channel_handle_message, + display_channel_on_incoming_error, + display_channel_on_outgoing_error, + display_channel_handle_migrate_mark, + display_channel_handle_migrate_data, + display_channel_handle_migrate_data_get_serial))) { return; } #ifdef RED_STATISTICS @@ -9140,11 +9191,6 @@ static void handle_new_display_channel(RedWorker *worker, RedsStream *stream, in stat_compress_init(&display_channel->jpeg_alpha_stat, jpeg_alpha_stat_name); } -static void red_disconnect_cursor_client(RedChannelClient *rcc) -{ - red_disconnect_cursor(rcc->channel); -} - static void red_disconnect_cursor(RedChannel *channel) { CommonChannel *common; @@ -9228,23 +9274,27 @@ static void cursor_channel_release_item(RedChannelClient *rcc, PipeItem *item, i } } -static void red_connect_cursor(RedWorker *worker, RedsStream *stream, int migrate) +static void red_connect_cursor(RedWorker *worker, RedClient *client, RedsStream *stream, + int migrate) { CursorChannel *channel; red_disconnect_cursor((RedChannel *)worker->cursor_channel); - if (!(channel = (CursorChannel *)__new_channel(worker, sizeof(*channel), - SPICE_CHANNEL_CURSOR, stream, migrate, - handle_channel_events, - red_disconnect_cursor_client, - cursor_channel_send_item, - cursor_channel_hold_pipe_item, - cursor_channel_release_item, - red_channel_client_handle_message, - NULL, - NULL, - NULL))) { + if (!(channel = (CursorChannel *)__new_channel( + worker, sizeof(*channel), + SPICE_CHANNEL_CURSOR, client, stream, migrate, + handle_channel_events, + dispatch_cursor_channel_client_disconnect, + cursor_channel_send_item, + cursor_channel_hold_pipe_item, + cursor_channel_release_item, + red_channel_client_handle_message, + cursor_channel_on_incoming_error, + cursor_channel_on_outgoing_error, + NULL, + NULL, + NULL))) { return; } #ifdef RED_STATISTICS @@ -9722,12 +9772,24 @@ static void handle_dev_input(EventListener *listener, uint32_t events) break; case RED_WORKER_MESSAGE_DISPLAY_CONNECT: { RedsStream *stream; + RedClient *client; int migrate; red_printf("connect"); + receive_data(worker->channel, &client, sizeof(RedClient *)); receive_data(worker->channel, &stream, sizeof(RedsStream *)); receive_data(worker->channel, &migrate, sizeof(int)); - handle_new_display_channel(worker, stream, migrate); + handle_new_display_channel(worker, client, stream, migrate); + break; + } + case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT_CLIENT: { + RedChannelClient *rcc; + + red_printf("disconnect display client"); + receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); + red_disconnect_display(rcc); + message = RED_WORKER_MESSAGE_READY; + write_message(worker->channel, &message); break; } case RED_WORKER_MESSAGE_DISPLAY_DISCONNECT: @@ -9749,12 +9811,24 @@ static void handle_dev_input(EventListener *listener, uint32_t events) break; case RED_WORKER_MESSAGE_CURSOR_CONNECT: { RedsStream *stream; + RedClient *client; int migrate; red_printf("cursor connect"); + receive_data(worker->channel, &client, sizeof(RedClient *)); receive_data(worker->channel, &stream, sizeof(RedsStream *)); receive_data(worker->channel, &migrate, sizeof(int)); - red_connect_cursor(worker, stream, migrate); + red_connect_cursor(worker, client, stream, migrate); + break; + } + case RED_WORKER_MESSAGE_CURSOR_DISCONNECT_CLIENT: { + RedChannelClient *rcc; + + red_printf("disconnect cursor client"); + receive_data(worker->channel, &rcc, sizeof(RedChannelClient *)); + red_disconnect_cursor(rcc->channel); /* TODO - assumes a single client */ + message = RED_WORKER_MESSAGE_READY; + write_message(worker->channel, &message); break; } case RED_WORKER_MESSAGE_CURSOR_DISCONNECT: |