From 816b3f95288e0c8e3a06dd653cb2e82a88c94647 Mon Sep 17 00:00:00 2001 From: Jonathon Jongsma Date: Fri, 20 Feb 2015 17:13:36 -0600 Subject: Convert Dispatcher and MainDispatcher to GObjects Allows more explicit inheritance relationship, and numerous other advantages. --- server/dispatcher.c | 242 ++++++++++++++++++++++++++++++++++++----------- server/dispatcher.h | 56 ++++++----- server/main-dispatcher.c | 151 +++++++++++++++++++++++------ server/main-dispatcher.h | 27 +++++- server/red-dispatcher.c | 76 +++++++-------- server/red-dispatcher.h | 3 +- 6 files changed, 405 insertions(+), 150 deletions(-) diff --git a/server/dispatcher.c b/server/dispatcher.c index cc34d2d2..7673e9ac 100644 --- a/server/dispatcher.c +++ b/server/dispatcher.c @@ -1,6 +1,5 @@ -/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* - Copyright (C) 2009-2012 Red Hat, Inc. + Copyright (C) 2009-2015 Red Hat, Inc. This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -39,6 +38,154 @@ #include #endif +G_DEFINE_TYPE(Dispatcher, dispatcher, G_TYPE_OBJECT) + +#define DISPATCHER_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TYPE_DISPATCHER, DispatcherPrivate)) + +struct _DispatcherPrivate { + SpiceCoreInterface *recv_core; + int recv_fd; + int send_fd; + pthread_t self; + pthread_mutex_t lock; + DispatcherMessage *messages; + int stage; /* message parser stage - sender has no stages */ + size_t max_message_type; + void *payload; /* allocated as max of message sizes */ + size_t payload_size; /* used to track realloc calls */ + void *opaque; + dispatcher_handle_async_done handle_async_done; + dispatcher_handle_message extra_handler; +}; + +enum { + PROP_0, + PROP_MAX_MESSAGE_TYPE, + PROP_OPAQUE +}; + +static void +dispatcher_get_property(GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + Dispatcher *self = DISPATCHER(object); + + switch (property_id) + { + case PROP_MAX_MESSAGE_TYPE: + g_value_set_uint(value, self->priv->max_message_type); + break; + case PROP_OPAQUE: + g_value_set_pointer(value, self->priv->opaque); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec); + } +} + +static void +dispatcher_set_property(GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + Dispatcher *self = DISPATCHER(object); + + switch (property_id) + { + case PROP_MAX_MESSAGE_TYPE: + self->priv->max_message_type = g_value_get_uint(value); + break; + case PROP_OPAQUE: + dispatcher_set_opaque(self, g_value_get_pointer(value)); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec); + } +} + +static void +dispatcher_finalize(GObject *object) +{ + Dispatcher *self = DISPATCHER(object); + g_free(self->priv->messages); + close(self->priv->send_fd); + close(self->priv->recv_fd); + free(self->priv->payload); + G_OBJECT_CLASS(dispatcher_parent_class)->finalize(object); +} + +static void dispatcher_constructed(GObject *object) +{ + Dispatcher *self = DISPATCHER(object); + int channels[2]; + +#ifdef DEBUG_DISPATCHER + setup_dummy_signal_handler(); +#endif + if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { + spice_error("socketpair failed %s", strerror(errno)); + return; + } + pthread_mutex_init(&self->priv->lock, NULL); + self->priv->recv_fd = channels[0]; + self->priv->send_fd = channels[1]; + self->priv->self = pthread_self(); + + self->priv->messages = g_new0(DispatcherMessage, + self->priv->max_message_type); +} + +static void +dispatcher_class_init(DispatcherClass *klass) +{ + GObjectClass *object_class = G_OBJECT_CLASS(klass); + + g_type_class_add_private(klass, sizeof (DispatcherPrivate)); + + object_class->get_property = dispatcher_get_property; + object_class->set_property = dispatcher_set_property; + object_class->constructed = dispatcher_constructed; + object_class->finalize = dispatcher_finalize; + + g_object_class_install_property(object_class, + PROP_MAX_MESSAGE_TYPE, + g_param_spec_uint("max-message-type", + "Maximum message type", + "Maximum message type", + 0, G_MAXUINT, 0, + G_PARAM_STATIC_STRINGS | + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY)); + g_object_class_install_property(object_class, + PROP_OPAQUE, + g_param_spec_pointer("opaque", + "opaque", + "User data to pass to callbacks", + G_PARAM_STATIC_STRINGS | + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT)); + +} + +static void +dispatcher_init(Dispatcher *self) +{ + self->priv = DISPATCHER_PRIVATE(self); +} + +Dispatcher * +dispatcher_new(size_t max_message_type, void *opaque) +{ + return g_object_new(TYPE_DISPATCHER, + "max-message-type", max_message_type, + "opaque", opaque, + NULL); +} + + #define ACK 0xffffffff /* @@ -118,10 +265,10 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher) int ret; uint32_t type; DispatcherMessage *msg = NULL; - uint8_t *payload = dispatcher->payload; + uint8_t *payload = dispatcher->priv->payload; uint32_t ack = ACK; - if ((ret = read_safe(dispatcher->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) { + if ((ret = read_safe(dispatcher->priv->recv_fd, (uint8_t*)&type, sizeof(type), 0)) == -1) { spice_printerr("error reading from dispatcher: %d", errno); return 0; } @@ -129,28 +276,28 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher) /* no messsage */ return 0; } - msg = &dispatcher->messages[type]; - if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) { + msg = &dispatcher->priv->messages[type]; + if (read_safe(dispatcher->priv->recv_fd, payload, msg->size, 1) == -1) { spice_printerr("error reading from dispatcher: %d", errno); /* TODO: close socketpair? */ return 0; } - if (dispatcher->extra_handler) { - dispatcher->extra_handler(dispatcher->opaque, type, (void *)payload); + if (dispatcher->priv->extra_handler) { + dispatcher->priv->extra_handler(dispatcher->priv->opaque, type, (void *)payload); } if (msg->handler) { - msg->handler(dispatcher->opaque, type, (void *)payload); + msg->handler(dispatcher->priv->opaque, type, (void *)payload); } else { spice_printerr("error: no handler for message type %d", type); } if (msg->ack == DISPATCHER_ACK) { - if (write_safe(dispatcher->recv_fd, + if (write_safe(dispatcher->priv->recv_fd, (uint8_t*)&ack, sizeof(ack)) == -1) { spice_printerr("error writing ack for message %d", type); /* TODO: close socketpair? */ } - } else if (msg->ack == DISPATCHER_ASYNC && dispatcher->handle_async_done) { - dispatcher->handle_async_done(dispatcher->opaque, type, + } else if (msg->ack == DISPATCHER_ASYNC && dispatcher->priv->handle_async_done) { + dispatcher->priv->handle_async_done(dispatcher->priv->opaque, type, (void *)payload); } return 1; @@ -171,12 +318,12 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type, { DispatcherMessage *msg; uint32_t ack; - int send_fd = dispatcher->send_fd; + int send_fd = dispatcher->priv->send_fd; - assert(dispatcher->max_message_type > message_type); - assert(dispatcher->messages[message_type].handler); - msg = &dispatcher->messages[message_type]; - pthread_mutex_lock(&dispatcher->lock); + assert(dispatcher->priv->max_message_type > message_type); + assert(dispatcher->priv->messages[message_type].handler); + msg = &dispatcher->priv->messages[message_type]; + pthread_mutex_lock(&dispatcher->priv->lock); if (write_safe(send_fd, (uint8_t*)&message_type, sizeof(message_type)) == -1) { spice_printerr("error: failed to send message type for message %d", message_type); @@ -197,7 +344,7 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type, } } unlock: - pthread_mutex_unlock(&dispatcher->lock); + pthread_mutex_unlock(&dispatcher->priv->lock); } uint32_t dispatcher_read_message(Dispatcher *dispatcher) @@ -205,9 +352,9 @@ uint32_t dispatcher_read_message(Dispatcher *dispatcher) uint32_t message = 0; spice_return_val_if_fail(dispatcher, 0); - spice_return_val_if_fail(dispatcher->send_fd != -1, 0); + spice_return_val_if_fail(dispatcher->priv->send_fd != -1, 0); - if (read_safe(dispatcher->send_fd, (uint8_t*)&message, sizeof(message), 1) == -1) + if (read_safe(dispatcher->priv->send_fd, (uint8_t*)&message, sizeof(message), 1) == -1) spice_warn_if_reached(); return message; @@ -217,8 +364,8 @@ void dispatcher_register_async_done_callback( Dispatcher *dispatcher, dispatcher_handle_async_done handler) { - assert(dispatcher->handle_async_done == NULL); - dispatcher->handle_async_done = handler; + assert(dispatcher->priv->handle_async_done == NULL); + dispatcher->priv->handle_async_done = handler; } void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type, @@ -227,15 +374,15 @@ void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type, { DispatcherMessage *msg; - assert(message_type < dispatcher->max_message_type); - assert(dispatcher->messages[message_type].handler == 0); - msg = &dispatcher->messages[message_type]; + assert(message_type < dispatcher->priv->max_message_type); + assert(dispatcher->priv->messages[message_type].handler == 0); + msg = &dispatcher->priv->messages[message_type]; msg->handler = handler; msg->size = size; msg->ack = ack; - if (msg->size > dispatcher->payload_size) { - dispatcher->payload = realloc(dispatcher->payload, msg->size); - dispatcher->payload_size = msg->size; + if (msg->size > dispatcher->priv->payload_size) { + dispatcher->priv->payload = realloc(dispatcher->priv->payload, msg->size); + dispatcher->priv->payload_size = msg->size; } } @@ -243,7 +390,7 @@ void dispatcher_register_extra_handler( Dispatcher *dispatcher, dispatcher_handle_message extra_handler) { - dispatcher->extra_handler = extra_handler; + dispatcher->priv->extra_handler = extra_handler; } #ifdef DEBUG_DISPATCHER @@ -270,37 +417,15 @@ static void setup_dummy_signal_handler(void) } #endif -void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type, - void *opaque) +void dispatcher_set_opaque(Dispatcher *self, void *opaque) { - int channels[2]; - -#ifdef DEBUG_DISPATCHER - setup_dummy_signal_handler(); -#endif - dispatcher->opaque = opaque; - if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { - spice_error("socketpair failed %s", strerror(errno)); - return; - } - pthread_mutex_init(&dispatcher->lock, NULL); - dispatcher->recv_fd = channels[0]; - dispatcher->send_fd = channels[1]; - dispatcher->self = pthread_self(); - - dispatcher->messages = spice_malloc0_n(max_message_type, - sizeof(dispatcher->messages[0])); - dispatcher->max_message_type = max_message_type; -} - -void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque) -{ - dispatcher->opaque = opaque; + self->priv->opaque = opaque; + g_object_notify(G_OBJECT(self), "opaque"); } int dispatcher_get_recv_fd(Dispatcher *dispatcher) { - return dispatcher->recv_fd; + return dispatcher->priv->recv_fd; } static gboolean dispatch_cb(GIOChannel *source, GIOCondition condition, @@ -320,10 +445,15 @@ void dispatcher_attach(Dispatcher *dispatcher, GMainContext *main_context) spice_return_if_fail(dispatcher != NULL); spice_return_if_fail(main_context != NULL); - GIOChannel *channel = g_io_channel_unix_new(dispatcher->recv_fd); + GIOChannel *channel = g_io_channel_unix_new(dispatcher->priv->recv_fd); GSource *source = g_io_create_watch(channel, G_IO_IN); g_source_set_callback(source, (GSourceFunc)dispatch_cb, dispatcher, NULL); g_source_attach(source, main_context); g_source_unref(source); } + +pthread_t dispatcher_get_thread_id(Dispatcher *self) +{ + return self->priv->self; +} diff --git a/server/dispatcher.h b/server/dispatcher.h index fd02bce4..968127e2 100644 --- a/server/dispatcher.h +++ b/server/dispatcher.h @@ -18,9 +18,37 @@ #ifndef DISPATCHER_H #define DISPATCHER_H +#include #include "common.h" -typedef struct Dispatcher Dispatcher; +#define TYPE_DISPATCHER dispatcher_get_type() + +#define DISPATCHER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), TYPE_DISPATCHER, Dispatcher)) +#define DISPATCHER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), TYPE_DISPATCHER, DispatcherClass)) +#define IS_DISPATCHER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), TYPE_DISPATCHER)) +#define IS_DISPATCHER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), TYPE_DISPATCHER)) +#define DISPATCHER_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), TYPE_DISPATCHER, DispatcherClass)) + +typedef struct _Dispatcher Dispatcher; +typedef struct _DispatcherClass DispatcherClass; +typedef struct _DispatcherPrivate DispatcherPrivate; + +struct _Dispatcher +{ + GObject parent; + + DispatcherPrivate *priv; +}; + +struct _DispatcherClass +{ + GObjectClass parent_class; +}; + +GType dispatcher_get_type(void) G_GNUC_CONST; + +Dispatcher *dispatcher_new(size_t max_message_type, void *opaque); + typedef void (*dispatcher_handle_message)(void *opaque, uint32_t message_type, @@ -37,21 +65,6 @@ typedef struct DispatcherMessage { dispatcher_handle_message handler; } DispatcherMessage; -struct Dispatcher { - SpiceCoreInterface *recv_core; - int recv_fd; - int send_fd; - pthread_t self; - pthread_mutex_t lock; - DispatcherMessage *messages; - int stage; /* message parser stage - sender has no stages */ - size_t max_message_type; - void *payload; /* allocated as max of message sizes */ - size_t payload_size; /* used to track realloc calls */ - void *opaque; - dispatcher_handle_async_done handle_async_done; - dispatcher_handle_message extra_handler; -}; /* * dispatcher_send_message @@ -62,15 +75,6 @@ void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type, void *payload); uint32_t dispatcher_read_message(Dispatcher *dispatcher); -/* - * dispatcher_init - * @max_message_type: number of message types. Allows upfront allocation - * of a DispatcherMessage list. - * up front, and registration in any order wanted. - */ -void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type, - void *opaque); - void dispatcher_attach(Dispatcher *dispatcher, GMainContext *main_context); enum { @@ -132,4 +136,6 @@ int dispatcher_get_recv_fd(Dispatcher *); */ void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque); +pthread_t dispatcher_get_thread_id(Dispatcher *self); + #endif //DISPATCHER_H diff --git a/server/main-dispatcher.c b/server/main-dispatcher.c index c419d7af..6ae0ac53 100644 --- a/server/main-dispatcher.c +++ b/server/main-dispatcher.c @@ -46,12 +46,96 @@ * seperate from self because it may send an ack or do other work in the future. */ -struct MainDispatcher { - Dispatcher base; - SpiceCoreInterface *core; - RedsState *reds; +G_DEFINE_TYPE(MainDispatcher, main_dispatcher, TYPE_DISPATCHER) + +#define MAIN_DISPATCHER_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE((o), TYPE_MAIN_DISPATCHER, MainDispatcherPrivate)) + +struct _MainDispatcherPrivate +{ + SpiceCoreInterface *core; /* weak */ + RedsState *reds; /* weak */ }; + +enum { + PROP0, + PROP_SPICE_SERVER, + PROP_CORE_INTERFACE +}; + +static void +main_dispatcher_get_property(GObject *object, + guint property_id, + GValue *value, + GParamSpec *pspec) +{ + MainDispatcher *self = MAIN_DISPATCHER(object); + + switch (property_id) { + case PROP_SPICE_SERVER: + g_value_set_pointer(value, self->priv->reds); + break; + case PROP_CORE_INTERFACE: + g_value_set_pointer(value, self->priv->core); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec); + } +} + +static void +main_dispatcher_set_property(GObject *object, + guint property_id, + const GValue *value, + GParamSpec *pspec) +{ + MainDispatcher *self = MAIN_DISPATCHER(object); + + switch (property_id) { + case PROP_SPICE_SERVER: + self->priv->reds = g_value_get_pointer(value); + break; + case PROP_CORE_INTERFACE: + self->priv->core = g_value_get_pointer(value); + break; + default: + G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec); + } +} + +static void +main_dispatcher_class_init(MainDispatcherClass *klass) +{ + GObjectClass *object_class = G_OBJECT_CLASS(klass); + + g_type_class_add_private(klass, sizeof(MainDispatcherPrivate)); + + object_class->get_property = main_dispatcher_get_property; + object_class->set_property = main_dispatcher_set_property; + + g_object_class_install_property(object_class, + PROP_SPICE_SERVER, + g_param_spec_pointer("spice-server", + "spice-server", + "The spice server associated with this dispatcher", + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY)); + + g_object_class_install_property(object_class, + PROP_CORE_INTERFACE, + g_param_spec_pointer("core-interface", + "core-interface", + "The SpiceCoreInterface server associated with this dispatcher", + G_PARAM_READWRITE | + G_PARAM_CONSTRUCT_ONLY)); +} + +static void +main_dispatcher_init(MainDispatcher *self) +{ + self->priv = MAIN_DISPATCHER_PRIVATE(self); +} + enum { MAIN_DISPATCHER_CHANNEL_EVENT = 0, MAIN_DISPATCHER_MIGRATE_SEAMLESS_DST_COMPLETE, @@ -84,7 +168,7 @@ static void main_dispatcher_self_handle_channel_event(MainDispatcher *self, int event, SpiceChannelEventInfo *info) { - reds_handle_channel_event(self->reds, event, info); + reds_handle_channel_event(self->priv->reds, event, info); } static void main_dispatcher_handle_channel_event(void *opaque, @@ -103,13 +187,13 @@ void main_dispatcher_channel_event(MainDispatcher *self, int event, SpiceChannel { MainDispatcherChannelEventMessage msg = {0,}; - if (pthread_self() == self->base.self) { + if (pthread_self() == dispatcher_get_thread_id(DISPATCHER(self))) { main_dispatcher_self_handle_channel_event(self, event, info); return; } msg.event = event; msg.info = info; - dispatcher_send_message(&self->base, MAIN_DISPATCHER_CHANNEL_EVENT, + dispatcher_send_message(DISPATCHER(self), MAIN_DISPATCHER_CHANNEL_EVENT, &msg); } @@ -121,7 +205,7 @@ static void main_dispatcher_handle_migrate_complete(void *opaque, MainDispatcher *self = opaque; MainDispatcherMigrateSeamlessDstCompleteMessage *mig_complete = payload; - reds_on_client_seamless_migrate_complete(self->reds, mig_complete->client); + reds_on_client_seamless_migrate_complete(self->priv->reds, mig_complete->client); red_client_unref(mig_complete->client); } @@ -131,7 +215,7 @@ static void main_dispatcher_handle_mm_time_latency(void *opaque, { MainDispatcher *self = opaque; MainDispatcherMmTimeLatencyMessage *msg = payload; - reds_set_client_mm_time_latency(self->reds, msg->client, msg->latency); + reds_set_client_mm_time_latency(self->priv->reds, msg->client, msg->latency); red_client_unref(msg->client); } @@ -143,7 +227,7 @@ static void main_dispatcher_handle_client_disconnect(void *opaque, MainDispatcherClientDisconnectMessage *msg = payload; spice_debug("client=%p", msg->client); - reds_client_disconnect(self->reds, msg->client); + reds_client_disconnect(self->priv->reds, msg->client); red_client_unref(msg->client); } @@ -152,13 +236,13 @@ void main_dispatcher_seamless_migrate_dst_complete(MainDispatcher *self, { MainDispatcherMigrateSeamlessDstCompleteMessage msg; - if (pthread_self() == self->base.self) { - reds_on_client_seamless_migrate_complete(self->reds, client); + if (pthread_self() == dispatcher_get_thread_id(DISPATCHER(self))) { + reds_on_client_seamless_migrate_complete(self->priv->reds, client); return; } msg.client = red_client_ref(client); - dispatcher_send_message(&self->base, MAIN_DISPATCHER_MIGRATE_SEAMLESS_DST_COMPLETE, + dispatcher_send_message(DISPATCHER(self), MAIN_DISPATCHER_MIGRATE_SEAMLESS_DST_COMPLETE, &msg); } @@ -166,14 +250,14 @@ void main_dispatcher_set_mm_time_latency(MainDispatcher *self, RedClient *client { MainDispatcherMmTimeLatencyMessage msg; - if (pthread_self() == self->base.self) { - reds_set_client_mm_time_latency(self->reds, client, latency); + if (pthread_self() == dispatcher_get_thread_id(DISPATCHER(self))) { + reds_set_client_mm_time_latency(self->priv->reds, client, latency); return; } msg.client = red_client_ref(client); msg.latency = latency; - dispatcher_send_message(&self->base, MAIN_DISPATCHER_SET_MM_TIME_LATENCY, + dispatcher_send_message(DISPATCHER(self), MAIN_DISPATCHER_SET_MM_TIME_LATENCY, &msg); } @@ -184,7 +268,7 @@ void main_dispatcher_client_disconnect(MainDispatcher *self, RedClient *client) if (!client->disconnecting) { spice_debug("client %p", client); msg.client = red_client_ref(client); - dispatcher_send_message(&self->base, MAIN_DISPATCHER_CLIENT_DISCONNECT, + dispatcher_send_message(DISPATCHER(self), MAIN_DISPATCHER_CLIENT_DISCONNECT, &msg); } else { spice_debug("client %p already during disconnection", client); @@ -195,7 +279,7 @@ static void dispatcher_handle_read(int fd, int event, void *opaque) { MainDispatcher *self = opaque; - dispatcher_handle_recv_read(&self->base); + dispatcher_handle_recv_read(DISPATCHER(self)); } /* @@ -205,23 +289,32 @@ static void dispatcher_handle_read(int fd, int event, void *opaque) */ MainDispatcher* main_dispatcher_new(RedsState *reds, SpiceCoreInterface *core) { - MainDispatcher *main_dispatcher = g_new0(MainDispatcher, 1); - main_dispatcher->core = core; - main_dispatcher->reds = reds; - dispatcher_init(&main_dispatcher->base, MAIN_DISPATCHER_NUM_MESSAGES, main_dispatcher); - core->watch_add(main_dispatcher->base.recv_fd, SPICE_WATCH_EVENT_READ, - dispatcher_handle_read, main_dispatcher); - dispatcher_register_handler(&main_dispatcher->base, MAIN_DISPATCHER_CHANNEL_EVENT, + MainDispatcher *self = g_object_new(TYPE_MAIN_DISPATCHER, + "spice-server", reds, + "core-interface", core, + "max-message-type", MAIN_DISPATCHER_NUM_MESSAGES, + NULL); + return self; +} + +void main_dispatcher_constructed(GObject *object) +{ + MainDispatcher *self = MAIN_DISPATCHER(object); + dispatcher_set_opaque(DISPATCHER(self), self); + + self->priv->core->watch_add(dispatcher_get_recv_fd(DISPATCHER(self)), + SPICE_WATCH_EVENT_READ, dispatcher_handle_read, + self); + dispatcher_register_handler(DISPATCHER(self), MAIN_DISPATCHER_CHANNEL_EVENT, main_dispatcher_handle_channel_event, sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */); - dispatcher_register_handler(&main_dispatcher->base, MAIN_DISPATCHER_MIGRATE_SEAMLESS_DST_COMPLETE, + dispatcher_register_handler(DISPATCHER(self), MAIN_DISPATCHER_MIGRATE_SEAMLESS_DST_COMPLETE, main_dispatcher_handle_migrate_complete, sizeof(MainDispatcherMigrateSeamlessDstCompleteMessage), 0 /* no ack */); - dispatcher_register_handler(&main_dispatcher->base, MAIN_DISPATCHER_SET_MM_TIME_LATENCY, + dispatcher_register_handler(DISPATCHER(self), MAIN_DISPATCHER_SET_MM_TIME_LATENCY, main_dispatcher_handle_mm_time_latency, sizeof(MainDispatcherMmTimeLatencyMessage), 0 /* no ack */); - dispatcher_register_handler(&main_dispatcher->base, MAIN_DISPATCHER_CLIENT_DISCONNECT, + dispatcher_register_handler(DISPATCHER(self), MAIN_DISPATCHER_CLIENT_DISCONNECT, main_dispatcher_handle_client_disconnect, sizeof(MainDispatcherClientDisconnectMessage), 0 /* no ack */); - return main_dispatcher; } diff --git a/server/main-dispatcher.h b/server/main-dispatcher.h index 306b4219..9f1c7729 100644 --- a/server/main-dispatcher.h +++ b/server/main-dispatcher.h @@ -19,9 +19,34 @@ #define MAIN_DISPATCHER_H #include +#include "dispatcher.h" #include "red-channel.h" -typedef struct MainDispatcher MainDispatcher; +#define TYPE_MAIN_DISPATCHER main_dispatcher_get_type() + +#define MAIN_DISPATCHER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), TYPE_MAIN_DISPATCHER, MainDispatcher)) +#define MAIN_DISPATCHER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), TYPE_MAIN_DISPATCHER, MainDispatcherClass)) +#define IS_MAIN_DISPATCHER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), TYPE_MAIN_DISPATCHER)) +#define IS_MAIN_DISPATCHER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), TYPE_MAIN_DISPATCHER)) +#define MAIN_DISPATCHER_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), TYPE_MAIN_DISPATCHER, MainDispatcherClass)) + +typedef struct _MainDispatcher MainDispatcher; +typedef struct _MainDispatcherClass MainDispatcherClass; +typedef struct _MainDispatcherPrivate MainDispatcherPrivate; + +struct _MainDispatcher +{ + Dispatcher parent; + + MainDispatcherPrivate *priv; +}; + +struct _MainDispatcherClass +{ + DispatcherClass parent_class; +}; + +GType main_dispatcher_get_type(void) G_GNUC_CONST; void main_dispatcher_channel_event(MainDispatcher *self, int event, SpiceChannelEventInfo *info); void main_dispatcher_seamless_migrate_dst_complete(MainDispatcher *self, RedClient *client); diff --git a/server/red-dispatcher.c b/server/red-dispatcher.c index 24b944d5..44b13343 100644 --- a/server/red-dispatcher.c +++ b/server/red-dispatcher.c @@ -51,7 +51,7 @@ struct AsyncCommand { struct RedDispatcher { QXLWorker base; QXLInstance *qxl; - Dispatcher dispatcher; + Dispatcher *dispatcher; uint32_t pending; int primary_active; int x_res; @@ -103,7 +103,7 @@ static void red_dispatcher_set_display_peer(RedChannel *channel, RedClient *clie memcpy(payload.common_caps, common_caps, sizeof(uint32_t)*num_common_caps); memcpy(payload.caps, caps, sizeof(uint32_t)*num_caps); - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_DISPLAY_CONNECT, &payload); } @@ -124,7 +124,7 @@ static void red_dispatcher_disconnect_display_peer(RedChannelClient *rcc) // TODO: we turned it to be sync, due to client_destroy . Should we support async? - for this we will need ref count // for channels - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_DISPLAY_DISCONNECT, &payload); } @@ -139,7 +139,7 @@ static void red_dispatcher_display_migrate(RedChannelClient *rcc) dispatcher = (RedDispatcher *)rcc->channel->data; spice_printerr("channel type %u id %u", rcc->channel->type, rcc->channel->id); payload.rcc = rcc; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_DISPLAY_MIGRATE, &payload); } @@ -164,7 +164,7 @@ static void red_dispatcher_set_cursor_peer(RedChannel *channel, RedClient *clien memcpy(payload.caps, caps, sizeof(uint32_t)*num_caps); /* TODO serialize it all, no dangling pointers */ - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_CURSOR_CONNECT, &payload); } @@ -182,7 +182,7 @@ static void red_dispatcher_disconnect_cursor_peer(RedChannelClient *rcc) spice_printerr(""); payload.rcc = rcc; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_CURSOR_DISCONNECT, &payload); } @@ -198,7 +198,7 @@ static void red_dispatcher_cursor_migrate(RedChannelClient *rcc) dispatcher = (RedDispatcher *)rcc->channel->data; spice_printerr("channel type %u id %u", rcc->channel->type, rcc->channel->id); payload.rcc = rcc; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_CURSOR_MIGRATE, &payload); } @@ -214,7 +214,7 @@ static void red_dispatcher_update_area(RedDispatcher *dispatcher, uint32_t surfa payload.qxl_dirty_rects = qxl_dirty_rects; payload.num_dirty_rects = num_dirty_rects; payload.clear_dirty_region = clear_dirty_region; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_UPDATE, &payload); } @@ -262,7 +262,7 @@ static void red_dispatcher_update_area_async(RedDispatcher *dispatcher, payload.surface_id = surface_id; payload.qxl_area = *qxl_area; payload.clear_dirty_region = clear_dirty_region; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, message, &payload); } @@ -280,7 +280,7 @@ static void red_dispatcher_add_memslot(RedDispatcher *dispatcher, QXLDevMemSlot RedWorkerMessageAddMemslot payload; payload.mem_slot = *mem_slot; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_ADD_MEMSLOT, &payload); } @@ -297,7 +297,7 @@ static void red_dispatcher_add_memslot_async(RedDispatcher *dispatcher, QXLDevMe payload.base.cmd = async_command_alloc(dispatcher, message, cookie); payload.mem_slot = *mem_slot; - dispatcher_send_message(&dispatcher->dispatcher, message, &payload); + dispatcher_send_message(dispatcher->dispatcher, message, &payload); } static void red_dispatcher_del_memslot(RedDispatcher *dispatcher, uint32_t slot_group_id, uint32_t slot_id) @@ -307,7 +307,7 @@ static void red_dispatcher_del_memslot(RedDispatcher *dispatcher, uint32_t slot_ payload.slot_group_id = slot_group_id; payload.slot_id = slot_id; - dispatcher_send_message(&dispatcher->dispatcher, message, &payload); + dispatcher_send_message(dispatcher->dispatcher, message, &payload); } static void qxl_worker_del_memslot(QXLWorker *qxl_worker, uint32_t slot_group_id, uint32_t slot_id) @@ -319,7 +319,7 @@ static void red_dispatcher_destroy_surfaces(RedDispatcher *dispatcher) { RedWorkerMessageDestroySurfaces payload; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_DESTROY_SURFACES, &payload); } @@ -335,7 +335,7 @@ static void red_dispatcher_destroy_surfaces_async(RedDispatcher *dispatcher, uin RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC; payload.base.cmd = async_command_alloc(dispatcher, message, cookie); - dispatcher_send_message(&dispatcher->dispatcher, message, &payload); + dispatcher_send_message(dispatcher->dispatcher, message, &payload); } static void red_dispatcher_destroy_primary_surface_complete(RedDispatcher *dispatcher) @@ -354,7 +354,7 @@ red_dispatcher_destroy_primary_surface_sync(RedDispatcher *dispatcher, { RedWorkerMessageDestroyPrimarySurface payload; payload.surface_id = surface_id; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE, &payload); red_dispatcher_destroy_primary_surface_complete(dispatcher); @@ -369,7 +369,7 @@ red_dispatcher_destroy_primary_surface_async(RedDispatcher *dispatcher, payload.base.cmd = async_command_alloc(dispatcher, message, cookie); payload.surface_id = surface_id; - dispatcher_send_message(&dispatcher->dispatcher, message, &payload); + dispatcher_send_message(dispatcher->dispatcher, message, &payload); } static void @@ -412,7 +412,7 @@ red_dispatcher_create_primary_surface_async(RedDispatcher *dispatcher, uint32_t payload.base.cmd = async_command_alloc(dispatcher, message, cookie); payload.surface_id = surface_id; payload.surface = *surface; - dispatcher_send_message(&dispatcher->dispatcher, message, &payload); + dispatcher_send_message(dispatcher->dispatcher, message, &payload); } static void @@ -424,7 +424,7 @@ red_dispatcher_create_primary_surface_sync(RedDispatcher *dispatcher, uint32_t s dispatcher->surface_create = *surface; payload.surface_id = surface_id; payload.surface = *surface; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE, &payload); red_dispatcher_create_primary_surface_complete(dispatcher); @@ -451,7 +451,7 @@ static void red_dispatcher_reset_image_cache(RedDispatcher *dispatcher) { RedWorkerMessageResetImageCache payload; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_RESET_IMAGE_CACHE, &payload); } @@ -465,7 +465,7 @@ static void red_dispatcher_reset_cursor(RedDispatcher *dispatcher) { RedWorkerMessageResetCursor payload; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_RESET_CURSOR, &payload); } @@ -481,7 +481,7 @@ static void red_dispatcher_destroy_surface_wait_sync(RedDispatcher *dispatcher, RedWorkerMessageDestroySurfaceWait payload; payload.surface_id = surface_id; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT, &payload); } @@ -495,7 +495,7 @@ static void red_dispatcher_destroy_surface_wait_async(RedDispatcher *dispatcher, payload.base.cmd = async_command_alloc(dispatcher, message, cookie); payload.surface_id = surface_id; - dispatcher_send_message(&dispatcher->dispatcher, message, &payload); + dispatcher_send_message(dispatcher->dispatcher, message, &payload); } static void red_dispatcher_destroy_surface_wait(RedDispatcher *dispatcher, @@ -518,7 +518,7 @@ static void red_dispatcher_reset_memslots(RedDispatcher *dispatcher) { RedWorkerMessageResetMemslots payload; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_RESET_MEMSLOTS, &payload); } @@ -546,7 +546,7 @@ static void red_dispatcher_wakeup(RedDispatcher *dispatcher) if (red_dispatcher_set_pending(dispatcher, RED_DISPATCHER_PENDING_WAKEUP)) return; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_WAKEUP, &payload); } @@ -563,7 +563,7 @@ static void red_dispatcher_oom(RedDispatcher *dispatcher) if (red_dispatcher_set_pending(dispatcher, RED_DISPATCHER_PENDING_OOM)) return; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_OOM, &payload); } @@ -577,7 +577,7 @@ void red_dispatcher_start(RedDispatcher *dispatcher) { RedWorkerMessageStart payload; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_START, &payload); } @@ -593,7 +593,7 @@ static void red_dispatcher_flush_surfaces_async(RedDispatcher *dispatcher, uint6 RedWorkerMessage message = RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC; payload.base.cmd = async_command_alloc(dispatcher, message, cookie); - dispatcher_send_message(&dispatcher->dispatcher, message, &payload); + dispatcher_send_message(dispatcher->dispatcher, message, &payload); } static void red_dispatcher_monitors_config_async(RedDispatcher *dispatcher, @@ -608,14 +608,14 @@ static void red_dispatcher_monitors_config_async(RedDispatcher *dispatcher, payload.monitors_config = monitors_config; payload.group_id = group_id; - dispatcher_send_message(&dispatcher->dispatcher, message, &payload); + dispatcher_send_message(dispatcher->dispatcher, message, &payload); } static void red_dispatcher_driver_unload(RedDispatcher *dispatcher) { RedWorkerMessageDriverUnload payload; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_DRIVER_UNLOAD, &payload); } @@ -624,7 +624,7 @@ void red_dispatcher_stop(RedDispatcher *dispatcher) { RedWorkerMessageStop payload; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_STOP, &payload); } @@ -643,7 +643,7 @@ static void red_dispatcher_loadvm_commands(RedDispatcher *dispatcher, spice_printerr(""); payload.count = count; payload.ext = ext; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_LOADVM_COMMANDS, &payload); } @@ -882,7 +882,7 @@ RedDispatcher *red_dispatcher_new(RedsState *reds, QXLInstance *qxl, int compres red_dispatcher->qxl = qxl; ring_init(&red_dispatcher->async_commands); spice_debug("red_dispatcher->async_commands.next %p", red_dispatcher->async_commands.next); - dispatcher_init(&red_dispatcher->dispatcher, RED_WORKER_MESSAGE_COUNT, NULL); + red_dispatcher->dispatcher = dispatcher_new(RED_WORKER_MESSAGE_COUNT, NULL); pthread_mutex_init(&red_dispatcher->async_lock, NULL); red_dispatcher->base.major_version = SPICE_INTERFACE_QXL_MAJOR; red_dispatcher->base.minor_version = SPICE_INTERFACE_QXL_MINOR; @@ -934,15 +934,15 @@ RedDispatcher *red_dispatcher_new(RedsState *reds, QXLInstance *qxl, int compres return red_dispatcher; } -struct Dispatcher *red_dispatcher_get_dispatcher(RedDispatcher *red_dispatcher) +Dispatcher *red_dispatcher_get_dispatcher(RedDispatcher *red_dispatcher) { - return &red_dispatcher->dispatcher; + return red_dispatcher->dispatcher; } void red_dispatcher_set_dispatcher_opaque(RedDispatcher *red_dispatcher, void *opaque) { - dispatcher_set_opaque(&red_dispatcher->dispatcher, opaque); + dispatcher_set_opaque(red_dispatcher->dispatcher, opaque); } void red_dispatcher_clear_pending(RedDispatcher *red_dispatcher, int pending) @@ -972,7 +972,7 @@ void red_dispatcher_on_ic_change(RedDispatcher *dispatcher, spice_image_compress { RedWorkerMessageSetCompression payload; payload.image_compression = ic; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_SET_COMPRESSION, &payload); } @@ -981,7 +981,7 @@ void red_dispatcher_on_sv_change(RedDispatcher *dispatcher, int sv) { RedWorkerMessageSetStreamingVideo payload; payload.streaming_video = sv; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_SET_STREAMING_VIDEO, &payload); } @@ -990,7 +990,7 @@ void red_dispatcher_set_mouse_mode(RedDispatcher *dispatcher, uint32_t mode) { RedWorkerMessageSetMouseMode payload; payload.mode = mode; - dispatcher_send_message(&dispatcher->dispatcher, + dispatcher_send_message(dispatcher->dispatcher, RED_WORKER_MESSAGE_SET_MOUSE_MODE, &payload); } diff --git a/server/red-dispatcher.h b/server/red-dispatcher.h index 4aeac179..35b0ab88 100644 --- a/server/red-dispatcher.h +++ b/server/red-dispatcher.h @@ -19,6 +19,7 @@ #define _H_RED_DISPATCHER #include "red-channel.h" +#include "dispatcher.h" typedef struct RedDispatcher RedDispatcher; typedef struct RedChannelClient RedChannelClient; @@ -37,7 +38,7 @@ void red_dispatcher_start(RedDispatcher *dispatcher); int red_dispatcher_add_renderer(const char *name); uint32_t red_dispatcher_qxl_ram_size(RedDispatcher *dispatcher); void red_dispatcher_async_complete(struct RedDispatcher *, AsyncCommand *); -struct Dispatcher *red_dispatcher_get_dispatcher(struct RedDispatcher *); +Dispatcher *red_dispatcher_get_dispatcher(struct RedDispatcher *); gboolean red_dispatcher_use_client_monitors_config(RedDispatcher *dispatcher); gboolean red_dispatcher_client_monitors_config(RedDispatcher *dispatcher, VDAgentMonitorsConfig *monitors_config); gboolean red_dispatcher_get_primary_active(RedDispatcher *dispatcher); -- cgit