summaryrefslogtreecommitdiffstats
path: root/server/red_channel.c
diff options
context:
space:
mode:
Diffstat (limited to 'server/red_channel.c')
-rw-r--r--server/red_channel.c278
1 files changed, 138 insertions, 140 deletions
diff --git a/server/red_channel.c b/server/red_channel.c
index 80aa667c..ae333aa7 100644
--- a/server/red_channel.c
+++ b/server/red_channel.c
@@ -32,10 +32,12 @@
#include "ring.h"
#include "stat.h"
#include "red_channel.h"
+#include "reds.h"
#include "generated_marshallers.h"
static void red_channel_client_event(int fd, int event, void *data);
static void red_client_add_channel(RedClient *client, RedChannelClient *rcc);
+static void red_client_remove_channel(RedChannelClient *rcc);
/* return the number of bytes read. -1 in case of error */
static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
@@ -138,10 +140,6 @@ static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handle
ret_handle = handler->cb->handle_message(handler->opaque, &handler->header,
handler->msg);
}
- if (handler->shut) {
- handler->cb->on_error(handler->opaque);
- return;
- }
handler->msg_pos = 0;
handler->msg = NULL;
handler->header_pos = 0;
@@ -221,22 +219,9 @@ static void red_channel_client_on_output(void *opaque, int n)
stat_inc_counter(rcc->channel->out_bytes_counter, n);
}
-void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
-{
- rcc->channel->channel_cbs.disconnect(rcc);
-}
-
-static void red_channel_peer_on_incoming_error(RedChannelClient *rcc)
-{
- if (rcc->stream->shutdown) {
- return; // assume error has already been handled which caused the shutdown.
- }
- rcc->channel->on_incoming_error(rcc);
-}
-
-static void red_channel_peer_on_outgoing_error(RedChannelClient *rcc)
+static void red_channel_client_default_peer_on_error(RedChannelClient *rcc)
{
- rcc->channel->on_outgoing_error(rcc);
+ red_channel_client_disconnect(rcc);
}
static int red_channel_client_peer_get_out_msg_size(void *opaque)
@@ -417,6 +402,21 @@ error:
return NULL;
}
+
+RedChannelClient *red_channel_client_create_dummy(int size,
+ RedChannel *channel,
+ RedClient *client)
+{
+ RedChannelClient *rcc;
+
+ ASSERT(size >= sizeof(RedChannelClient));
+ rcc = spice_malloc0(size);
+ rcc->client = client;
+ rcc->channel = channel;
+ red_channel_add_client(channel, rcc);
+ return rcc;
+}
+
static void red_channel_client_default_connect(RedChannel *channel, RedClient *client,
RedsStream *stream,
int migration,
@@ -437,6 +437,7 @@ static void red_channel_client_default_migrate(RedChannelClient *base)
RedChannel *red_channel_create(int size,
SpiceCoreInterface *core,
+ uint32_t type, uint32_t id,
int migrate, int handle_acks,
channel_handle_message_proc handle_message,
ChannelCbs *channel_cbs)
@@ -445,11 +446,13 @@ RedChannel *red_channel_create(int size,
ClientCbs client_cbs;
ASSERT(size >= sizeof(*channel));
- ASSERT(channel_cbs->config_socket && channel_cbs->disconnect && handle_message &&
+ ASSERT(channel_cbs->config_socket && channel_cbs->on_disconnect && handle_message &&
channel_cbs->alloc_recv_buf && channel_cbs->release_item);
channel = spice_malloc0(size);
+ channel->type = type;
+ channel->id = id;
channel->handle_acks = handle_acks;
- channel->channel_cbs.disconnect = channel_cbs->disconnect;
+ channel->channel_cbs.on_disconnect = channel_cbs->on_disconnect;
channel->channel_cbs.send_item = channel_cbs->send_item;
channel->channel_cbs.release_item = channel_cbs->release_item;
channel->channel_cbs.hold_item = channel_cbs->hold_item;
@@ -484,8 +487,53 @@ RedChannel *red_channel_create(int size,
channel->thread_id = pthread_self();
- channel->shut = 0; // came here from inputs, perhaps can be removed? XXX
channel->out_bytes_counter = 0;
+
+ return channel;
+}
+
+// TODO: red_worker can use this one
+static void dummy_watch_update_mask(SpiceWatch *watch, int event_mask)
+{
+}
+
+static SpiceWatch *dummy_watch_add(int fd, int event_mask, SpiceWatchFunc func, void *opaque)
+{
+ return NULL; // apparently allowed?
+}
+
+static void dummy_watch_remove(SpiceWatch *watch)
+{
+}
+
+// TODO: actually, since I also use channel_client_dummym, no need for core. Can be NULL
+SpiceCoreInterface dummy_core = {
+ .watch_update_mask = dummy_watch_update_mask,
+ .watch_add = dummy_watch_add,
+ .watch_remove = dummy_watch_remove,
+};
+
+RedChannel *red_channel_create_dummy(int size, uint32_t type, uint32_t id)
+{
+ RedChannel *channel;
+ ClientCbs client_cbs;
+
+ ASSERT(size >= sizeof(*channel));
+ channel = spice_malloc0(size);
+ channel->type = type;
+ channel->id = id;
+ channel->core = &dummy_core;
+ ring_init(&channel->clients);
+ client_cbs.connect = red_channel_client_default_connect;
+ client_cbs.disconnect = red_channel_client_default_disconnect;
+ client_cbs.migrate = red_channel_client_default_migrate;
+
+ red_channel_register_client_cbs(channel, &client_cbs);
+
+ channel->thread_id = pthread_self();
+
+ channel->out_bytes_counter = 0;
+
return channel;
}
@@ -496,26 +544,22 @@ static int do_nothing_handle_message(RedChannelClient *rcc, SpiceDataHeader *hea
RedChannel *red_channel_create_parser(int size,
SpiceCoreInterface *core,
+ uint32_t type, uint32_t id,
int migrate, int handle_acks,
spice_parse_channel_func_t parser,
- channel_handle_parsed_proc handle_parsed,
- channel_on_incoming_error_proc incoming_error,
- channel_on_outgoing_error_proc outgoing_error,
+ channel_handle_parsed_proc handle_parsed,
ChannelCbs *channel_cbs)
{
- RedChannel *channel = red_channel_create(size,
- core, migrate, handle_acks, do_nothing_handle_message,
- channel_cbs);
+ RedChannel *channel = red_channel_create(size, core, type, id,
+ migrate, handle_acks,
+ do_nothing_handle_message,
+ channel_cbs);
if (channel == NULL) {
return NULL;
}
channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
channel->incoming_cb.parser = parser;
- channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
- channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
- channel->on_incoming_error = incoming_error;
- channel->on_outgoing_error = outgoing_error;
return channel;
}
@@ -533,10 +577,27 @@ void red_channel_register_client_cbs(RedChannel *channel, ClientCbs *client_cbs)
}
}
+void red_channel_set_caps(RedChannel *channel, int num_caps, uint32_t *caps)
+{
+ channel->num_caps = num_caps;
+ channel->caps = caps;
+}
+
+void red_channel_set_data(RedChannel *channel, void *data)
+{
+ ASSERT(channel);
+ channel->data = data;
+}
+
void red_channel_client_destroy(RedChannelClient *rcc)
{
- red_channel_client_disconnect(rcc);
- spice_marshaller_destroy(rcc->send_data.marshaller);
+ if (red_channel_client_is_connected(rcc)) {
+ red_channel_client_disconnect(rcc);
+ }
+ red_client_remove_channel(rcc);
+ if (rcc->send_data.marshaller) {
+ spice_marshaller_destroy(rcc->send_data.marshaller);
+ }
free(rcc);
}
@@ -548,11 +609,13 @@ void red_channel_destroy(RedChannel *channel)
if (!channel) {
return;
}
- red_channel_pipes_clear(channel);
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_destroy(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
+ if (channel->caps) {
+ free(channel->caps);
+ }
free(channel);
}
@@ -563,21 +626,7 @@ void red_channel_client_shutdown(RedChannelClient *rcc)
rcc->stream->watch = NULL;
shutdown(rcc->stream->socket, SHUT_RDWR);
rcc->stream->shutdown = TRUE;
- rcc->incoming.shut = TRUE;
}
- red_channel_client_release_sent_item(rcc);
-}
-
-void red_channel_shutdown(RedChannel *channel)
-{
- RingItem *link;
- RingItem *next;
-
- red_printf("%d", channel->clients_num);
- RING_FOREACH_SAFE(link, next, &channel->clients) {
- red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
- }
- red_channel_pipes_clear(channel);
}
void red_channel_client_send(RedChannelClient *rcc)
@@ -650,11 +699,7 @@ void red_channel_push(RedChannel *channel)
}
RING_FOREACH_SAFE(link, next, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- if (rcc->stream == NULL) {
- rcc->channel->channel_cbs.disconnect(rcc);
- } else {
- red_channel_client_push(rcc);
- }
+ red_channel_client_push(rcc);
}
}
@@ -861,25 +906,9 @@ int red_channel_client_is_connected(RedChannelClient *rcc)
return rcc->stream != NULL;
}
-void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item)
-{
- ring_remove(&item->link);
-}
-
int red_channel_is_connected(RedChannel *channel)
{
- RingItem *link;
-
- if (!channel || channel->clients_num == 0) {
- return FALSE;
- }
- RING_FOREACH(link, &channel->clients) {
- if (red_channel_client_is_connected(
- SPICE_CONTAINEROF(link, RedChannelClient, channel_link))) {
- return TRUE;
- }
- }
- return FALSE;
+ return channel && (channel->clients_num > 0);
}
void red_channel_client_clear_sent_item(RedChannelClient *rcc)
@@ -906,21 +935,6 @@ void red_channel_client_pipe_clear(RedChannelClient *rcc)
rcc->pipe_size = 0;
}
-void red_channel_pipes_clear(RedChannel *channel)
-{
- RingItem *link;
- RingItem *next;
- RedChannelClient *rcc;
-
- if (!channel) {
- return;
- }
- RING_FOREACH_SAFE(link, next, &channel->clients) {
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- red_channel_client_pipe_clear(rcc);
- }
-}
-
void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
{
rcc->ack_data.messages_window = 0;
@@ -931,27 +945,37 @@ void red_channel_client_ack_set_client_window(RedChannelClient *rcc, int client_
rcc->ack_data.client_window = client_window;
}
-static void red_channel_client_remove(RedChannelClient *rcc)
+
+static void red_channel_remove_client(RedChannelClient *rcc)
{
- ring_remove(&rcc->client_link);
- rcc->client->channels_num--;
+ ASSERT(pthread_equal(pthread_self(), rcc->channel->thread_id));
ring_remove(&rcc->channel_link);
rcc->channel->clients_num--;
+ // TODO: should we set rcc->channel to NULL???
}
-void red_channel_client_disconnect(RedChannelClient *rcc)
+static void red_client_remove_channel(RedChannelClient *rcc)
{
- red_printf("%p (channel %p)", rcc, rcc->channel);
+ pthread_mutex_lock(&rcc->client->lock);
+ ring_remove(&rcc->client_link);
+ rcc->client->channels_num--;
+ pthread_mutex_unlock(&rcc->client->lock);
+}
- if (rcc->send_data.item) {
- rcc->channel->channel_cbs.release_item(rcc, rcc->send_data.item, FALSE);
+void red_channel_client_disconnect(RedChannelClient *rcc)
+{
+ red_printf("%p (channel %p type %d id %d)", rcc, rcc->channel,
+ rcc->channel->type, rcc->channel->id);
+ if (!red_channel_client_is_connected(rcc)) {
+ return;
}
red_channel_client_pipe_clear(rcc);
reds_stream_free(rcc->stream);
- rcc->send_data.item = NULL;
- rcc->send_data.blocked = FALSE;
- rcc->send_data.size = 0;
- red_channel_client_remove(rcc);
+ rcc->stream = NULL;
+ red_channel_remove_client(rcc);
+ // TODO: not do it till destroyed?
+// red_channel_client_remove(rcc);
+ rcc->channel->channel_cbs.on_disconnect(rcc);
}
void red_channel_disconnect(RedChannel *channel)
@@ -959,27 +983,12 @@ void red_channel_disconnect(RedChannel *channel)
RingItem *link;
RingItem *next;
- red_channel_pipes_clear(channel);
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_disconnect(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
}
-int red_channel_all_clients_serials_are_zero(RedChannel *channel)
-{
- RingItem *link;
- RedChannelClient *rcc;
-
- RING_FOREACH(link, &channel->clients) {
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- if (rcc->send_data.serial != 0) {
- return FALSE;
- }
- }
- return TRUE;
-}
-
void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb)
{
RingItem *link;
@@ -1004,17 +1013,6 @@ void red_channel_apply_clients_data(RedChannel *channel, channel_client_callback
}
}
-void red_channel_set_shut(RedChannel *channel)
-{
- RingItem *link;
- RedChannelClient *rcc;
-
- RING_FOREACH(link, &channel->clients) {
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
- rcc->incoming.shut = TRUE;
- }
-}
-
int red_channel_all_blocked(RedChannel *channel)
{
RingItem *link;
@@ -1143,18 +1141,24 @@ RedClient *red_client_new()
client = spice_malloc0(sizeof(RedClient));
ring_init(&client->channels);
+ pthread_mutex_init(&client->lock, NULL);
client->thread_id = pthread_self();
return client;
}
-void red_client_shutdown(RedClient *client)
+void red_client_migrate(RedClient *client)
{
RingItem *link, *next;
+ RedChannelClient *rcc;
- red_printf("#channels %d", client->channels_num);
+ red_printf("migrate client with #channels %d", client->channels_num);
+ ASSERT(pthread_equal(pthread_self(), client->thread_id));
RING_FOREACH_SAFE(link, next, &client->channels) {
- red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, client_link));
+ rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link);
+ if (red_channel_client_is_connected(rcc)) {
+ rcc->channel->client_cbs.migrate(rcc);
+ }
}
}
@@ -1175,29 +1179,23 @@ void red_client_destroy(RedClient *client)
// TODO: should we go back to async. For this we need to use
// ref count for channel clients.
rcc->channel->client_cbs.disconnect(rcc);
+ ASSERT(ring_is_empty(&rcc->pipe));
+ ASSERT(rcc->pipe_size == 0);
+ ASSERT(rcc->send_data.size == 0);
+ red_channel_client_destroy(rcc);
}
- free(client);
-}
-void red_client_disconnect(RedClient *client)
-{
- RingItem *link, *next;
- RedChannelClient *rcc;
-
- red_printf("#channels %d", client->channels_num);
- RING_FOREACH_SAFE(link, next, &client->channels) {
- // some channels may be in other threads, so disconnection
- // is not synchronous.
- rcc = SPICE_CONTAINEROF(link, RedChannelClient, client_link);
- rcc->channel->client_cbs.disconnect(rcc);
- }
+ pthread_mutex_destroy(&client->lock);
+ free(client);
}
static void red_client_add_channel(RedClient *client, RedChannelClient *rcc)
{
ASSERT(rcc && client);
+ pthread_mutex_lock(&client->lock);
ring_add(&client->channels, &rcc->client_link);
client->channels_num++;
+ pthread_mutex_unlock(&client->lock);
}
MainChannelClient *red_client_get_main(RedClient *client) {