summaryrefslogtreecommitdiffstats
path: root/server/main_dispatcher.c
diff options
context:
space:
mode:
authorAlon Levy <alevy@redhat.com>2011-10-30 17:04:59 +0200
committerAlon Levy <alevy@redhat.com>2011-11-08 14:59:49 +0200
commit776bdd6c95715dcd8e609dc3ff647d0ad73fd339 (patch)
treed39addbae110ce4d421f4ad968adafbc1c38987b /server/main_dispatcher.c
parent9174b67160157f74cc00faf0b42cb7c5d2b710a1 (diff)
downloadspice-776bdd6c95715dcd8e609dc3ff647d0ad73fd339.tar.gz
spice-776bdd6c95715dcd8e609dc3ff647d0ad73fd339.tar.xz
spice-776bdd6c95715dcd8e609dc3ff647d0ad73fd339.zip
server: introduce dispatcher
used for main_dispatcher only in this patch. Dispatcher is meant to be used for Main<->any low frequency messages. It's interface is meant to include the red_dispatcher usage: fixed size messages per message type some messages require an ack Some methods are added to be used by RedDispatcher later: dispatcher_handle_read - to be called directly by RedDispatcher epoll based loop dispatcher_set_opaque - to be set from red_worker pthread dispatcher_init - allow NULL core as used by red_worker Read and Write behavior: Sender: blocking write, blocking read for ack (if any). Reader: poll for any data, if such then blocking read for a message_type and following message. repeat until poll returns with no pending data to read. FDO Bugzilla: 42463
Diffstat (limited to 'server/main_dispatcher.c')
-rw-r--r--server/main_dispatcher.c102
1 files changed, 27 insertions, 75 deletions
diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c
index 73856bfe..a5967fa4 100644
--- a/server/main_dispatcher.c
+++ b/server/main_dispatcher.c
@@ -5,6 +5,7 @@
#include <assert.h>
#include "red_common.h"
+#include "dispatcher.h"
#include "main_dispatcher.h"
/*
@@ -28,11 +29,8 @@
*/
typedef struct {
+ Dispatcher base;
SpiceCoreInterface *core;
- int main_fd;
- int other_fd;
- pthread_t self;
- pthread_mutex_t lock;
} MainDispatcher;
MainDispatcher main_dispatcher;
@@ -43,15 +41,10 @@ enum {
MAIN_DISPATCHER_NUM_MESSAGES
};
-typedef struct MainDispatcherMessage {
- uint32_t type;
- union {
- struct {
- int event;
- SpiceChannelEventInfo *info;
- } channel_event;
- } data;
-} MainDispatcherMessage;
+typedef struct MainDispatcherChannelEventMessage {
+ int event;
+ SpiceChannelEventInfo *info;
+} MainDispatcherChannelEventMessage;
/* channel_event - calls core->channel_event, must be done in main thread */
static void main_dispatcher_self_handle_channel_event(
@@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event(
main_dispatcher.core->channel_event(event, info);
}
-static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
+static void main_dispatcher_handle_channel_event(void *opaque,
+ void *payload)
{
- main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
- msg->data.channel_event.info);
+ MainDispatcherChannelEventMessage *channel_event = payload;
+
+ main_dispatcher_self_handle_channel_event(channel_event->event,
+ channel_event->info);
}
void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
{
- MainDispatcherMessage msg;
- ssize_t written = 0;
- ssize_t ret;
+ MainDispatcherChannelEventMessage msg;
- if (pthread_self() == main_dispatcher.self) {
+ if (pthread_self() == main_dispatcher.base.self) {
main_dispatcher_self_handle_channel_event(event, info);
return;
}
- msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
- msg.data.channel_event.event = event;
- msg.data.channel_event.info = info;
- pthread_mutex_lock(&main_dispatcher.lock);
- while (written < sizeof(msg)) {
- ret = write(main_dispatcher.other_fd, &msg + written,
- sizeof(msg) - written);
- if (ret == -1) {
- assert(errno == EINTR);
- continue;
- }
- written += ret;
- }
- pthread_mutex_unlock(&main_dispatcher.lock);
+ msg.event = event;
+ msg.info = info;
+ dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
+ &msg);
}
-
-static void main_dispatcher_handle_read(int fd, int event, void *opaque)
+static void dispatcher_handle_read(int fd, int event, void *opaque)
{
- int ret;
- MainDispatcher *md = opaque;
- MainDispatcherMessage msg;
- int read_size = 0;
+ Dispatcher *dispatcher = opaque;
- while (read_size < sizeof(msg)) {
- /* blocks until sizeof(msg) is read */
- ret = read(md->main_fd, &msg + read_size, sizeof(msg) - read_size);
- if (ret == -1) {
- if (errno != EINTR) {
- red_printf("error reading from main dispatcher: %d\n", errno);
- /* TODO: close channel? */
- return;
- }
- continue;
- }
- read_size += ret;
- }
- switch (msg.type) {
- case MAIN_DISPATCHER_CHANNEL_EVENT:
- main_dispatcher_handle_channel_event(&msg);
- break;
- default:
- red_printf("error: unhandled main dispatcher message type %d\n",
- msg.type);
- }
+ dispatcher_handle_recv_read(dispatcher);
}
void main_dispatcher_init(SpiceCoreInterface *core)
{
- int channels[2];
-
memset(&main_dispatcher, 0, sizeof(main_dispatcher));
main_dispatcher.core = core;
-
- if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
- red_error("socketpair failed %s", strerror(errno));
- return;
- }
- pthread_mutex_init(&main_dispatcher.lock, NULL);
- main_dispatcher.main_fd = channels[0];
- main_dispatcher.other_fd = channels[1];
- main_dispatcher.self = pthread_self();
-
- core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
- main_dispatcher_handle_read, &main_dispatcher);
+ dispatcher_init(&main_dispatcher.base, MAIN_DISPATCHER_NUM_MESSAGES, &main_dispatcher.base);
+ core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ,
+ dispatcher_handle_read, &main_dispatcher.base);
+ dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
+ main_dispatcher_handle_channel_event,
+ sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */);
}