From 776bdd6c95715dcd8e609dc3ff647d0ad73fd339 Mon Sep 17 00:00:00 2001 From: Alon Levy Date: Sun, 30 Oct 2011 17:04:59 +0200 Subject: server: introduce dispatcher used for main_dispatcher only in this patch. Dispatcher is meant to be used for Main<->any low frequency messages. It's interface is meant to include the red_dispatcher usage: fixed size messages per message type some messages require an ack Some methods are added to be used by RedDispatcher later: dispatcher_handle_read - to be called directly by RedDispatcher epoll based loop dispatcher_set_opaque - to be set from red_worker pthread dispatcher_init - allow NULL core as used by red_worker Read and Write behavior: Sender: blocking write, blocking read for ack (if any). Reader: poll for any data, if such then blocking read for a message_type and following message. repeat until poll returns with no pending data to read. FDO Bugzilla: 42463 --- server/main_dispatcher.c | 102 +++++++++++++---------------------------------- 1 file changed, 27 insertions(+), 75 deletions(-) (limited to 'server/main_dispatcher.c') diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c index 73856bfe..a5967fa4 100644 --- a/server/main_dispatcher.c +++ b/server/main_dispatcher.c @@ -5,6 +5,7 @@ #include #include "red_common.h" +#include "dispatcher.h" #include "main_dispatcher.h" /* @@ -28,11 +29,8 @@ */ typedef struct { + Dispatcher base; SpiceCoreInterface *core; - int main_fd; - int other_fd; - pthread_t self; - pthread_mutex_t lock; } MainDispatcher; MainDispatcher main_dispatcher; @@ -43,15 +41,10 @@ enum { MAIN_DISPATCHER_NUM_MESSAGES }; -typedef struct MainDispatcherMessage { - uint32_t type; - union { - struct { - int event; - SpiceChannelEventInfo *info; - } channel_event; - } data; -} MainDispatcherMessage; +typedef struct MainDispatcherChannelEventMessage { + int event; + SpiceChannelEventInfo *info; +} MainDispatcherChannelEventMessage; /* channel_event - calls core->channel_event, must be done in main thread */ static void main_dispatcher_self_handle_channel_event( @@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event( main_dispatcher.core->channel_event(event, info); } -static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg) +static void main_dispatcher_handle_channel_event(void *opaque, + void *payload) { - main_dispatcher_self_handle_channel_event(msg->data.channel_event.event, - msg->data.channel_event.info); + MainDispatcherChannelEventMessage *channel_event = payload; + + main_dispatcher_self_handle_channel_event(channel_event->event, + channel_event->info); } void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info) { - MainDispatcherMessage msg; - ssize_t written = 0; - ssize_t ret; + MainDispatcherChannelEventMessage msg; - if (pthread_self() == main_dispatcher.self) { + if (pthread_self() == main_dispatcher.base.self) { main_dispatcher_self_handle_channel_event(event, info); return; } - msg.type = MAIN_DISPATCHER_CHANNEL_EVENT; - msg.data.channel_event.event = event; - msg.data.channel_event.info = info; - pthread_mutex_lock(&main_dispatcher.lock); - while (written < sizeof(msg)) { - ret = write(main_dispatcher.other_fd, &msg + written, - sizeof(msg) - written); - if (ret == -1) { - assert(errno == EINTR); - continue; - } - written += ret; - } - pthread_mutex_unlock(&main_dispatcher.lock); + msg.event = event; + msg.info = info; + dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT, + &msg); } - -static void main_dispatcher_handle_read(int fd, int event, void *opaque) +static void dispatcher_handle_read(int fd, int event, void *opaque) { - int ret; - MainDispatcher *md = opaque; - MainDispatcherMessage msg; - int read_size = 0; + Dispatcher *dispatcher = opaque; - while (read_size < sizeof(msg)) { - /* blocks until sizeof(msg) is read */ - ret = read(md->main_fd, &msg + read_size, sizeof(msg) - read_size); - if (ret == -1) { - if (errno != EINTR) { - red_printf("error reading from main dispatcher: %d\n", errno); - /* TODO: close channel? */ - return; - } - continue; - } - read_size += ret; - } - switch (msg.type) { - case MAIN_DISPATCHER_CHANNEL_EVENT: - main_dispatcher_handle_channel_event(&msg); - break; - default: - red_printf("error: unhandled main dispatcher message type %d\n", - msg.type); - } + dispatcher_handle_recv_read(dispatcher); } void main_dispatcher_init(SpiceCoreInterface *core) { - int channels[2]; - memset(&main_dispatcher, 0, sizeof(main_dispatcher)); main_dispatcher.core = core; - - if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { - red_error("socketpair failed %s", strerror(errno)); - return; - } - pthread_mutex_init(&main_dispatcher.lock, NULL); - main_dispatcher.main_fd = channels[0]; - main_dispatcher.other_fd = channels[1]; - main_dispatcher.self = pthread_self(); - - core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ, - main_dispatcher_handle_read, &main_dispatcher); + dispatcher_init(&main_dispatcher.base, MAIN_DISPATCHER_NUM_MESSAGES, &main_dispatcher.base); + core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ, + dispatcher_handle_read, &main_dispatcher.base); + dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT, + main_dispatcher_handle_channel_event, + sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */); } -- cgit