summaryrefslogtreecommitdiffstats
path: root/server
diff options
context:
space:
mode:
authorAlon Levy <alevy@redhat.com>2010-11-08 13:58:10 +0200
committerAlon Levy <alevy@redhat.com>2011-03-02 17:27:50 +0200
commit73858b93dccf595652f1b1e86fea1d0e7e9e153b (patch)
treed1a96c96c32b730dffe82cde43fcda7404b94c13 /server
parent29a7bcd5964539bfdddc2489b33471038cf18477 (diff)
downloadspice-73858b93dccf595652f1b1e86fea1d0e7e9e153b.tar.gz
spice-73858b93dccf595652f1b1e86fea1d0e7e9e153b.tar.xz
spice-73858b93dccf595652f1b1e86fea1d0e7e9e153b.zip
server/red_worker: introduce red_peer_handle_outgoing and OutgoingHandler
From red_channel.
Diffstat (limited to 'server')
-rw-r--r--server/red_worker.c151
1 files changed, 115 insertions, 36 deletions
diff --git a/server/red_worker.c b/server/red_worker.c
index 683f8ad4..3614eb89 100644
--- a/server/red_worker.c
+++ b/server/red_worker.c
@@ -354,6 +354,31 @@ typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item)
typedef void (*channel_release_pipe_item_proc)(RedChannel *channel, PipeItem *item, int item_pushed);
typedef int (*channel_handle_parsed_proc)(RedChannel *channel, uint32_t size, uint16_t type, void *message);
+#define MAX_SEND_VEC 100
+
+typedef int (*get_outgoing_msg_size_proc)(void *opaque);
+typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos);
+typedef void (*on_outgoing_error_proc)(void *opaque);
+typedef void (*on_outgoing_block_proc)(void *opaque);
+typedef void (*on_outgoing_msg_done_proc)(void *opaque);
+
+typedef struct OutgoingHandler {
+ void *opaque;
+ struct iovec vec_buf[MAX_SEND_VEC];
+ int vec_size;
+ struct iovec *vec;
+ int pos;
+ int size;
+ get_outgoing_msg_size_proc get_msg_size;
+ prepare_outgoing_proc prepare;
+ on_outgoing_error_proc on_error;
+ on_outgoing_block_proc on_block;
+ on_outgoing_msg_done_proc on_msg_done;
+#ifdef RED_STATISTICS
+ uint64_t *out_bytes_counter;
+#endif
+} OutgoingHandler;
+
struct RedChannel {
spice_parse_channel_func_t parser;
RedsStream *stream;
@@ -386,6 +411,8 @@ struct RedChannel {
uint8_t *end;
} incoming;
+ OutgoingHandler outgoing;
+
channel_disconnect_proc disconnect;
channel_hold_pipe_item_proc hold_item;
channel_release_pipe_item_proc release_item;
@@ -393,12 +420,6 @@ struct RedChannel {
channel_send_pipe_item_proc send_item;
int during_send;
-
-#ifdef RED_STATISTICS
- struct {
- uint64_t *out_bytes_counter;
- } outgoing;
-#endif
};
typedef struct ImageItem {
@@ -7277,8 +7298,6 @@ static inline void red_send_qxl_drawable(RedWorker *worker, DisplayChannel *disp
display_begin_send_message(display_channel);
}
-#define MAX_SEND_VEC 100
-
static void inline channel_release_res(RedChannel *channel)
{
if (!channel->send_data.item) {
@@ -7288,47 +7307,83 @@ static void inline channel_release_res(RedChannel *channel)
channel->send_data.item = NULL;
}
-static void red_channel_send(RedChannel *channel)
+static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec,
+ int *vec_size, int pos)
{
- for (;;) {
- ssize_t n = channel->send_data.size - channel->send_data.pos;
- struct iovec vec[MAX_SEND_VEC];
- size_t vec_size;
-
- if (!n) {
- channel->send_data.blocked = FALSE;
- if (channel->send_data.item) {
- channel->release_item(channel, channel->send_data.item, FALSE);
- channel->send_data.item = NULL;
- }
- break;
+ RedChannel *channel = (RedChannel *)opaque;
+
+ *vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
+ vec, MAX_SEND_VEC, pos);
+}
+
+static void red_channel_peer_on_out_block(void *opaque)
+{
+ RedChannel *channel = (RedChannel *)opaque;
+
+ channel->send_data.blocked = TRUE;
+}
+
+static void red_channel_peer_on_out_msg_done(void *opaque)
+{
+ RedChannel *channel = (RedChannel *)opaque;
+
+ channel->send_data.size = 0;
+ if (channel->send_data.item) {
+ channel->release_item(channel, channel->send_data.item, TRUE);
+ channel->send_data.item = NULL;
+ }
+ channel->send_data.blocked = FALSE;
+}
+
+static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
+{
+ int n;
+
+ ASSERT(stream);
+ if (handler->size == 0) {
+ handler->vec = handler->vec_buf;
+ handler->size = handler->get_msg_size(handler->opaque);
+ if (!handler->size) { // nothing to be sent
+ return;
}
- vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
- vec, MAX_SEND_VEC, channel->send_data.pos);
- ASSERT(channel->stream);
- n = reds_stream_writev(channel->stream, vec, vec_size);
+ }
+ for (;;) {
+ handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
+ n = reds_stream_writev(stream, handler->vec, handler->vec_size);
if (n == -1) {
switch (errno) {
case EAGAIN:
- channel->send_data.blocked = TRUE;
+ handler->on_block(handler->opaque);
return;
case EINTR:
break;
case EPIPE:
- channel->disconnect(channel);
+ handler->on_error(handler->opaque);
return;
default:
red_printf("%s", strerror(errno));
- channel->disconnect(channel);
+ handler->on_error(handler->opaque);
return;
}
} else {
- channel->send_data.pos += n;
- stat_inc_counter(channel->outgoing.out_bytes_counter, n);
+ handler->pos += n;
+ stat_inc_counter(handler->out_bytes_counter, n);
+ if (handler->pos == handler->size) { // finished writing data
+ handler->on_msg_done(handler->opaque);
+ handler->vec = handler->vec_buf;
+ handler->pos = 0;
+ handler->size = 0;
+ return;
+ }
}
}
}
+static void red_channel_send(RedChannel *channel)
+{
+ red_peer_handle_outgoing(channel->stream, &channel->outgoing);
+}
+
static void display_channel_push_release(DisplayChannel *channel, uint8_t type, uint64_t id,
uint64_t* sync_data)
{
@@ -8244,16 +8299,17 @@ static void red_send_surface_destroy(DisplayChannel *display, uint32_t surface_i
red_channel_begin_send_message(channel);
}
+static inline int red_channel_waiting_for_ack(RedChannel *channel)
+{
+ return (channel->ack_data.messages_window > channel->ack_data.client_window * 2);
+}
+
static inline PipeItem *red_channel_pipe_get(RedChannel *channel)
{
PipeItem *item;
if (!channel || channel->send_data.blocked ||
- !(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
- return NULL;
- }
-
- if (channel->ack_data.messages_window > channel->ack_data.client_window * 2) {
- channel->send_data.blocked = TRUE;
+ red_channel_waiting_for_ack(channel) ||
+ !(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
return NULL;
}
@@ -9336,6 +9392,18 @@ static void free_common_channel_from_listener(EventListener *ctx)
free(common);
}
+static void red_channel_default_peer_on_error(RedChannel *channel)
+{
+ channel->disconnect(channel);
+}
+
+static int red_channel_peer_get_out_msg_size(void *opaque)
+{
+ RedChannel *channel = (RedChannel *)opaque;
+
+ return channel->send_data.size;
+}
+
static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
RedsStream *stream, int migrate,
event_listener_action_proc handler,
@@ -9380,6 +9448,17 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
ring_init(&channel->pipe);
channel->send_data.marshaller = spice_marshaller_new();
+ channel->outgoing.opaque = channel;
+ channel->outgoing.pos = 0;
+ channel->outgoing.size = 0;
+ channel->outgoing.out_bytes_counter = 0;
+
+ channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size;
+ channel->outgoing.prepare = red_channel_peer_prepare_out_msg;
+ channel->outgoing.on_block = red_channel_peer_on_out_block;
+ channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
+ channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done;
+
event.events = EPOLLIN | EPOLLOUT | EPOLLET;
event.data.ptr = &common->listener;
if (epoll_ctl(worker->epoll, EPOLL_CTL_ADD, stream->socket, &event) == -1) {