diff options
author | Alon Levy <alevy@redhat.com> | 2011-10-30 17:04:59 +0200 |
---|---|---|
committer | Alon Levy <alevy@redhat.com> | 2011-11-08 14:59:49 +0200 |
commit | 776bdd6c95715dcd8e609dc3ff647d0ad73fd339 (patch) | |
tree | d39addbae110ce4d421f4ad968adafbc1c38987b /server/main_dispatcher.c | |
parent | 9174b67160157f74cc00faf0b42cb7c5d2b710a1 (diff) | |
download | spice-776bdd6c95715dcd8e609dc3ff647d0ad73fd339.tar.gz spice-776bdd6c95715dcd8e609dc3ff647d0ad73fd339.tar.xz spice-776bdd6c95715dcd8e609dc3ff647d0ad73fd339.zip |
server: introduce dispatcher
used for main_dispatcher only in this patch.
Dispatcher is meant to be used for Main<->any low frequency messages.
It's interface is meant to include the red_dispatcher usage:
fixed size messages per message type
some messages require an ack
Some methods are added to be used by RedDispatcher later:
dispatcher_handle_read - to be called directly by RedDispatcher epoll
based loop
dispatcher_set_opaque - to be set from red_worker pthread
dispatcher_init - allow NULL core as used by red_worker
Read and Write behavior:
Sender: blocking write, blocking read for ack (if any).
Reader: poll for any data, if such then blocking read for a
message_type and following message. repeat until poll returns
with no pending data to read.
FDO Bugzilla: 42463
Diffstat (limited to 'server/main_dispatcher.c')
-rw-r--r-- | server/main_dispatcher.c | 102 |
1 files changed, 27 insertions, 75 deletions
diff --git a/server/main_dispatcher.c b/server/main_dispatcher.c index 73856bfe..a5967fa4 100644 --- a/server/main_dispatcher.c +++ b/server/main_dispatcher.c @@ -5,6 +5,7 @@ #include <assert.h> #include "red_common.h" +#include "dispatcher.h" #include "main_dispatcher.h" /* @@ -28,11 +29,8 @@ */ typedef struct { + Dispatcher base; SpiceCoreInterface *core; - int main_fd; - int other_fd; - pthread_t self; - pthread_mutex_t lock; } MainDispatcher; MainDispatcher main_dispatcher; @@ -43,15 +41,10 @@ enum { MAIN_DISPATCHER_NUM_MESSAGES }; -typedef struct MainDispatcherMessage { - uint32_t type; - union { - struct { - int event; - SpiceChannelEventInfo *info; - } channel_event; - } data; -} MainDispatcherMessage; +typedef struct MainDispatcherChannelEventMessage { + int event; + SpiceChannelEventInfo *info; +} MainDispatcherChannelEventMessage; /* channel_event - calls core->channel_event, must be done in main thread */ static void main_dispatcher_self_handle_channel_event( @@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event( main_dispatcher.core->channel_event(event, info); } -static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg) +static void main_dispatcher_handle_channel_event(void *opaque, + void *payload) { - main_dispatcher_self_handle_channel_event(msg->data.channel_event.event, - msg->data.channel_event.info); + MainDispatcherChannelEventMessage *channel_event = payload; + + main_dispatcher_self_handle_channel_event(channel_event->event, + channel_event->info); } void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info) { - MainDispatcherMessage msg; - ssize_t written = 0; - ssize_t ret; + MainDispatcherChannelEventMessage msg; - if (pthread_self() == main_dispatcher.self) { + if (pthread_self() == main_dispatcher.base.self) { main_dispatcher_self_handle_channel_event(event, info); return; } - msg.type = MAIN_DISPATCHER_CHANNEL_EVENT; - msg.data.channel_event.event = event; - msg.data.channel_event.info = info; - pthread_mutex_lock(&main_dispatcher.lock); - while (written < sizeof(msg)) { - ret = write(main_dispatcher.other_fd, &msg + written, - sizeof(msg) - written); - if (ret == -1) { - assert(errno == EINTR); - continue; - } - written += ret; - } - pthread_mutex_unlock(&main_dispatcher.lock); + msg.event = event; + msg.info = info; + dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT, + &msg); } - -static void main_dispatcher_handle_read(int fd, int event, void *opaque) +static void dispatcher_handle_read(int fd, int event, void *opaque) { - int ret; - MainDispatcher *md = opaque; - MainDispatcherMessage msg; - int read_size = 0; + Dispatcher *dispatcher = opaque; - while (read_size < sizeof(msg)) { - /* blocks until sizeof(msg) is read */ - ret = read(md->main_fd, &msg + read_size, sizeof(msg) - read_size); - if (ret == -1) { - if (errno != EINTR) { - red_printf("error reading from main dispatcher: %d\n", errno); - /* TODO: close channel? */ - return; - } - continue; - } - read_size += ret; - } - switch (msg.type) { - case MAIN_DISPATCHER_CHANNEL_EVENT: - main_dispatcher_handle_channel_event(&msg); - break; - default: - red_printf("error: unhandled main dispatcher message type %d\n", - msg.type); - } + dispatcher_handle_recv_read(dispatcher); } void main_dispatcher_init(SpiceCoreInterface *core) { - int channels[2]; - memset(&main_dispatcher, 0, sizeof(main_dispatcher)); main_dispatcher.core = core; - - if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) { - red_error("socketpair failed %s", strerror(errno)); - return; - } - pthread_mutex_init(&main_dispatcher.lock, NULL); - main_dispatcher.main_fd = channels[0]; - main_dispatcher.other_fd = channels[1]; - main_dispatcher.self = pthread_self(); - - core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ, - main_dispatcher_handle_read, &main_dispatcher); + dispatcher_init(&main_dispatcher.base, MAIN_DISPATCHER_NUM_MESSAGES, &main_dispatcher.base); + core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ, + dispatcher_handle_read, &main_dispatcher.base); + dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT, + main_dispatcher_handle_channel_event, + sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */); } |