diff options
author | Alon Levy <alevy@redhat.com> | 2010-11-08 13:58:10 +0200 |
---|---|---|
committer | Alon Levy <alevy@redhat.com> | 2011-03-02 17:27:50 +0200 |
commit | 73858b93dccf595652f1b1e86fea1d0e7e9e153b (patch) | |
tree | d1a96c96c32b730dffe82cde43fcda7404b94c13 /server | |
parent | 29a7bcd5964539bfdddc2489b33471038cf18477 (diff) | |
download | spice-73858b93dccf595652f1b1e86fea1d0e7e9e153b.tar.gz spice-73858b93dccf595652f1b1e86fea1d0e7e9e153b.tar.xz spice-73858b93dccf595652f1b1e86fea1d0e7e9e153b.zip |
server/red_worker: introduce red_peer_handle_outgoing and OutgoingHandler
From red_channel.
Diffstat (limited to 'server')
-rw-r--r-- | server/red_worker.c | 151 |
1 files changed, 115 insertions, 36 deletions
diff --git a/server/red_worker.c b/server/red_worker.c index 683f8ad4..3614eb89 100644 --- a/server/red_worker.c +++ b/server/red_worker.c @@ -354,6 +354,31 @@ typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item) typedef void (*channel_release_pipe_item_proc)(RedChannel *channel, PipeItem *item, int item_pushed); typedef int (*channel_handle_parsed_proc)(RedChannel *channel, uint32_t size, uint16_t type, void *message); +#define MAX_SEND_VEC 100 + +typedef int (*get_outgoing_msg_size_proc)(void *opaque); +typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos); +typedef void (*on_outgoing_error_proc)(void *opaque); +typedef void (*on_outgoing_block_proc)(void *opaque); +typedef void (*on_outgoing_msg_done_proc)(void *opaque); + +typedef struct OutgoingHandler { + void *opaque; + struct iovec vec_buf[MAX_SEND_VEC]; + int vec_size; + struct iovec *vec; + int pos; + int size; + get_outgoing_msg_size_proc get_msg_size; + prepare_outgoing_proc prepare; + on_outgoing_error_proc on_error; + on_outgoing_block_proc on_block; + on_outgoing_msg_done_proc on_msg_done; +#ifdef RED_STATISTICS + uint64_t *out_bytes_counter; +#endif +} OutgoingHandler; + struct RedChannel { spice_parse_channel_func_t parser; RedsStream *stream; @@ -386,6 +411,8 @@ struct RedChannel { uint8_t *end; } incoming; + OutgoingHandler outgoing; + channel_disconnect_proc disconnect; channel_hold_pipe_item_proc hold_item; channel_release_pipe_item_proc release_item; @@ -393,12 +420,6 @@ struct RedChannel { channel_send_pipe_item_proc send_item; int during_send; - -#ifdef RED_STATISTICS - struct { - uint64_t *out_bytes_counter; - } outgoing; -#endif }; typedef struct ImageItem { @@ -7277,8 +7298,6 @@ static inline void red_send_qxl_drawable(RedWorker *worker, DisplayChannel *disp display_begin_send_message(display_channel); } -#define MAX_SEND_VEC 100 - static void inline channel_release_res(RedChannel *channel) { if (!channel->send_data.item) { @@ -7288,47 +7307,83 @@ static void inline channel_release_res(RedChannel *channel) channel->send_data.item = NULL; } -static void red_channel_send(RedChannel *channel) +static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, + int *vec_size, int pos) { - for (;;) { - ssize_t n = channel->send_data.size - channel->send_data.pos; - struct iovec vec[MAX_SEND_VEC]; - size_t vec_size; - - if (!n) { - channel->send_data.blocked = FALSE; - if (channel->send_data.item) { - channel->release_item(channel, channel->send_data.item, FALSE); - channel->send_data.item = NULL; - } - break; + RedChannel *channel = (RedChannel *)opaque; + + *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller, + vec, MAX_SEND_VEC, pos); +} + +static void red_channel_peer_on_out_block(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + + channel->send_data.blocked = TRUE; +} + +static void red_channel_peer_on_out_msg_done(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + + channel->send_data.size = 0; + if (channel->send_data.item) { + channel->release_item(channel, channel->send_data.item, TRUE); + channel->send_data.item = NULL; + } + channel->send_data.blocked = FALSE; +} + +static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler) +{ + int n; + + ASSERT(stream); + if (handler->size == 0) { + handler->vec = handler->vec_buf; + handler->size = handler->get_msg_size(handler->opaque); + if (!handler->size) { // nothing to be sent + return; } - vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller, - vec, MAX_SEND_VEC, channel->send_data.pos); - ASSERT(channel->stream); - n = reds_stream_writev(channel->stream, vec, vec_size); + } + for (;;) { + handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos); + n = reds_stream_writev(stream, handler->vec, handler->vec_size); if (n == -1) { switch (errno) { case EAGAIN: - channel->send_data.blocked = TRUE; + handler->on_block(handler->opaque); return; case EINTR: break; case EPIPE: - channel->disconnect(channel); + handler->on_error(handler->opaque); return; default: red_printf("%s", strerror(errno)); - channel->disconnect(channel); + handler->on_error(handler->opaque); return; } } else { - channel->send_data.pos += n; - stat_inc_counter(channel->outgoing.out_bytes_counter, n); + handler->pos += n; + stat_inc_counter(handler->out_bytes_counter, n); + if (handler->pos == handler->size) { // finished writing data + handler->on_msg_done(handler->opaque); + handler->vec = handler->vec_buf; + handler->pos = 0; + handler->size = 0; + return; + } } } } +static void red_channel_send(RedChannel *channel) +{ + red_peer_handle_outgoing(channel->stream, &channel->outgoing); +} + static void display_channel_push_release(DisplayChannel *channel, uint8_t type, uint64_t id, uint64_t* sync_data) { @@ -8244,16 +8299,17 @@ static void red_send_surface_destroy(DisplayChannel *display, uint32_t surface_i red_channel_begin_send_message(channel); } +static inline int red_channel_waiting_for_ack(RedChannel *channel) +{ + return (channel->ack_data.messages_window > channel->ack_data.client_window * 2); +} + static inline PipeItem *red_channel_pipe_get(RedChannel *channel) { PipeItem *item; if (!channel || channel->send_data.blocked || - !(item = (PipeItem *)ring_get_tail(&channel->pipe))) { - return NULL; - } - - if (channel->ack_data.messages_window > channel->ack_data.client_window * 2) { - channel->send_data.blocked = TRUE; + red_channel_waiting_for_ack(channel) || + !(item = (PipeItem *)ring_get_tail(&channel->pipe))) { return NULL; } @@ -9336,6 +9392,18 @@ static void free_common_channel_from_listener(EventListener *ctx) free(common); } +static void red_channel_default_peer_on_error(RedChannel *channel) +{ + channel->disconnect(channel); +} + +static int red_channel_peer_get_out_msg_size(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + + return channel->send_data.size; +} + static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id, RedsStream *stream, int migrate, event_listener_action_proc handler, @@ -9380,6 +9448,17 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i ring_init(&channel->pipe); channel->send_data.marshaller = spice_marshaller_new(); + channel->outgoing.opaque = channel; + channel->outgoing.pos = 0; + channel->outgoing.size = 0; + channel->outgoing.out_bytes_counter = 0; + + channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size; + channel->outgoing.prepare = red_channel_peer_prepare_out_msg; + channel->outgoing.on_block = red_channel_peer_on_out_block; + channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error; + channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done; + event.events = EPOLLIN | EPOLLOUT | EPOLLET; event.data.ptr = &common->listener; if (epoll_ctl(worker->epoll, EPOLL_CTL_ADD, stream->socket, &event) == -1) { |