/*
Copyright (C) 2009 Red Hat, Inc.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, see .
Author:
yhalperi@redhat.com
*/
#ifdef HAVE_CONFIG_H
#include
#endif
#include
#include
#include
#include
#include
#include
#include
#include "stat.h"
#include "red_channel.h"
#include "generated_marshallers.h"
static PipeItem *red_channel_pipe_get(RedChannel *channel);
static void red_channel_event(int fd, int event, void *data);
/* return the number of bytes read. -1 in case of error */
static int red_peer_receive(RedsStream *stream, uint8_t *buf, uint32_t size)
{
uint8_t *pos = buf;
while (size) {
int now;
if (stream->shutdown) {
return -1;
}
now = reds_stream_read(stream, pos, size);
if (now <= 0) {
if (now == 0) {
return -1;
}
ASSERT(now == -1);
if (errno == EAGAIN) {
break;
} else if (errno == EINTR) {
continue;
} else if (errno == EPIPE) {
return -1;
} else {
red_printf("%s", strerror(errno));
return -1;
}
} else {
size -= now;
pos += now;
}
}
return pos - buf;
}
// TODO: this implementation, as opposed to the old implementation in red_worker,
// does many calls to red_peer_receive and through it cb_read, and thus avoids pointer
// arithmetic for the case where a single cb_read could return multiple messages. But
// this is suboptimal potentially. Profile and consider fixing.
static void red_peer_handle_incoming(RedsStream *stream, IncomingHandler *handler)
{
int bytes_read;
uint8_t *parsed;
size_t parsed_size;
message_destructor_t parsed_free;
for (;;) {
int ret_handle;
if (handler->header_pos < sizeof(SpiceDataHeader)) {
bytes_read = red_peer_receive(stream,
((uint8_t *)&handler->header) + handler->header_pos,
sizeof(SpiceDataHeader) - handler->header_pos);
if (bytes_read == -1) {
handler->cb->on_error(handler->opaque);
return;
}
handler->header_pos += bytes_read;
if (handler->header_pos != sizeof(SpiceDataHeader)) {
return;
}
}
if (handler->msg_pos < handler->header.size) {
if (!handler->msg) {
handler->msg = handler->cb->alloc_msg_buf(handler->opaque, &handler->header);
if (handler->msg == NULL) {
red_printf("ERROR: channel refused to allocate buffer.");
handler->cb->on_error(handler->opaque);
return;
}
}
bytes_read = red_peer_receive(stream,
handler->msg + handler->msg_pos,
handler->header.size - handler->msg_pos);
if (bytes_read == -1) {
handler->cb->release_msg_buf(handler->opaque, &handler->header, handler->msg);
handler->cb->on_error(handler->opaque);
return;
}
handler->msg_pos += bytes_read;
if (handler->msg_pos != handler->header.size) {
return;
}
}
if (handler->cb->parser) {
parsed = handler->cb->parser(handler->msg,
handler->msg + handler->header.size, handler->header.type,
SPICE_VERSION_MINOR, &parsed_size, &parsed_free);
if (parsed == NULL) {
red_printf("failed to parse message type %d", handler->header.type);
handler->cb->on_error(handler->opaque);
return;
}
ret_handle = handler->cb->handle_parsed(handler->opaque, parsed_size,
handler->header.type, parsed);
parsed_free(parsed);
} else {
ret_handle = handler->cb->handle_message(handler->opaque, &handler->header,
handler->msg);
}
if (handler->shut) {
handler->cb->on_error(handler->opaque);
return;
}
handler->msg_pos = 0;
handler->msg = NULL;
handler->header_pos = 0;
if (!ret_handle) {
handler->cb->on_error(handler->opaque);
return;
}
}
}
void red_channel_receive(RedChannel *channel)
{
red_peer_handle_incoming(channel->stream, &channel->incoming);
}
static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
{
ssize_t n;
if (handler->size == 0) {
handler->vec = handler->vec_buf;
handler->size = handler->cb->get_msg_size(handler->opaque);
if (!handler->size) { // nothing to be sent
return;
}
}
for (;;) {
handler->cb->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
n = reds_stream_writev(stream, handler->vec, handler->vec_size);
if (n == -1) {
switch (errno) {
case EAGAIN:
handler->cb->on_block(handler->opaque);
return;
case EINTR:
continue;
case EPIPE:
handler->cb->on_error(handler->opaque);
return;
default:
red_printf("%s", strerror(errno));
handler->cb->on_error(handler->opaque);
return;
}
} else {
handler->pos += n;
handler->cb->on_output(handler->opaque, n);
if (handler->pos == handler->size) { // finished writing data
handler->cb->on_msg_done(handler->opaque);
handler->vec = handler->vec_buf;
handler->pos = 0;
handler->size = 0;
return;
}
}
}
}
static void red_channel_on_output(void *opaque, int n)
{
RedChannel *channel = opaque;
stat_inc_counter(channel->out_bytes_counter, n);
}
void red_channel_default_peer_on_error(RedChannel *channel)
{
channel->disconnect(channel);
}
static void red_channel_peer_on_incoming_error(RedChannel *channel)
{
channel->on_incoming_error(channel);
}
static void red_channel_peer_on_outgoing_error(RedChannel *channel)
{
channel->on_outgoing_error(channel);
}
static int red_channel_peer_get_out_msg_size(void *opaque)
{
RedChannel *channel = (RedChannel *)opaque;
return channel->send_data.size;
}
static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size, int pos)
{
RedChannel *channel = (RedChannel *)opaque;
*vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
vec, MAX_SEND_VEC, pos);
}
static void red_channel_peer_on_out_block(void *opaque)
{
RedChannel *channel = (RedChannel *)opaque;
channel->send_data.blocked = TRUE;
channel->core->watch_update_mask(channel->stream->watch,
SPICE_WATCH_EVENT_READ |
SPICE_WATCH_EVENT_WRITE);
}
static void red_channel_reset_send_data(RedChannel *channel)
{
spice_marshaller_reset(channel->send_data.marshaller);
channel->send_data.header = (SpiceDataHeader *)
spice_marshaller_reserve_space(channel->send_data.marshaller, sizeof(SpiceDataHeader));
spice_marshaller_set_base(channel->send_data.marshaller, sizeof(SpiceDataHeader));
channel->send_data.header->type = 0;
channel->send_data.header->size = 0;
channel->send_data.header->sub_list = 0;
channel->send_data.header->serial = ++channel->send_data.serial;
}
void red_channel_push_set_ack(RedChannel *channel)
{
red_channel_pipe_add_type(channel, PIPE_ITEM_TYPE_SET_ACK);
}
static void red_channel_send_set_ack(RedChannel *channel)
{
SpiceMsgSetAck ack;
ASSERT(channel);
red_channel_init_send_data(channel, SPICE_MSG_SET_ACK, NULL);
ack.generation = ++channel->ack_data.generation;
ack.window = channel->ack_data.client_window;
channel->ack_data.messages_window = 0;
spice_marshall_msg_set_ack(channel->send_data.marshaller, &ack);
red_channel_begin_send_message(channel);
}
static void red_channel_send_item(RedChannel *channel, PipeItem *item)
{
red_channel_reset_send_data(channel);
switch (item->type) {
case PIPE_ITEM_TYPE_SET_ACK:
red_channel_send_set_ack(channel);
return;
}
/* only reached if not handled here */
channel->send_item(channel, item);
}
static void red_channel_release_item(RedChannel *channel, PipeItem *item, int item_pushed)
{
switch (item->type) {
case PIPE_ITEM_TYPE_SET_ACK:
free(item);
return;
}
/* only reached if not handled here */
channel->release_item(channel, item, item_pushed);
}
static void red_channel_peer_on_out_msg_done(void *opaque)
{
RedChannel *channel = (RedChannel *)opaque;
channel->send_data.size = 0;
if (channel->send_data.item) {
red_channel_release_item(channel, channel->send_data.item, TRUE);
channel->send_data.item = NULL;
}
if (channel->send_data.blocked) {
channel->send_data.blocked = FALSE;
channel->core->watch_update_mask(channel->stream->watch,
SPICE_WATCH_EVENT_READ);
}
}
RedChannel *red_channel_create(int size, RedsStream *stream,
SpiceCoreInterface *core,
int migrate, int handle_acks,
channel_configure_socket_proc config_socket,
channel_disconnect_proc disconnect,
channel_handle_message_proc handle_message,
channel_alloc_msg_recv_buf_proc alloc_recv_buf,
channel_release_msg_recv_buf_proc release_recv_buf,
channel_hold_pipe_item_proc hold_item,
channel_send_pipe_item_proc send_item,
channel_release_pipe_item_proc release_item,
channel_handle_migrate_flush_mark handle_migrate_flush_mark,
channel_handle_migrate_data handle_migrate_data,
channel_handle_migrate_data_get_serial handle_migrate_data_get_serial)
{
RedChannel *channel;
ASSERT(size >= sizeof(*channel));
ASSERT(config_socket && disconnect && handle_message && alloc_recv_buf &&
release_item);
channel = spice_malloc0(size);
channel->handle_acks = handle_acks;
channel->disconnect = disconnect;
channel->send_item = send_item;
channel->release_item = release_item;
channel->hold_item = hold_item;
channel->handle_migrate_flush_mark = handle_migrate_flush_mark;
channel->handle_migrate_data = handle_migrate_data;
channel->handle_migrate_data_get_serial = handle_migrate_data_get_serial;
channel->stream = stream;
channel->core = core;
channel->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked +
// block flags)
channel->ack_data.client_generation = ~0;
channel->ack_data.client_window = CLIENT_ACK_WINDOW;
channel->migrate = migrate;
ring_init(&channel->pipe);
channel->send_data.marshaller = spice_marshaller_new();
channel->incoming.opaque = channel;
channel->incoming_cb.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
channel->incoming_cb.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
channel->incoming_cb.handle_message = (handle_message_proc)handle_message;
channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_default_peer_on_error;
channel->outgoing.opaque = channel;
channel->outgoing.pos = 0;
channel->outgoing.size = 0;
channel->outgoing_cb.get_msg_size = red_channel_peer_get_out_msg_size;
channel->outgoing_cb.prepare = red_channel_peer_prepare_out_msg;
channel->outgoing_cb.on_block = red_channel_peer_on_out_block;
channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
channel->outgoing_cb.on_msg_done = red_channel_peer_on_out_msg_done;
channel->outgoing_cb.on_output = red_channel_on_output;
channel->incoming.cb = &channel->incoming_cb;
channel->outgoing.cb = &channel->outgoing_cb;
channel->shut = 0; // came here from inputs, perhaps can be removed? XXX
channel->out_bytes_counter = 0;
if (!config_socket(channel)) {
goto error;
}
channel->stream->watch = channel->core->watch_add(channel->stream->socket,
SPICE_WATCH_EVENT_READ,
red_channel_event, channel);
return channel;
error:
spice_marshaller_destroy(channel->send_data.marshaller);
free(channel);
reds_stream_free(stream);
return NULL;
}
static void do_nothing_disconnect(RedChannel *red_channel)
{
}
static int do_nothing_handle_message(RedChannel *red_channel, SpiceDataHeader *header, uint8_t *msg)
{
return TRUE;
}
RedChannel *red_channel_create_parser(int size, RedsStream *stream,
SpiceCoreInterface *core,
int migrate, int handle_acks,
channel_configure_socket_proc config_socket,
spice_parse_channel_func_t parser,
channel_handle_parsed_proc handle_parsed,
channel_alloc_msg_recv_buf_proc alloc_recv_buf,
channel_release_msg_recv_buf_proc release_recv_buf,
channel_hold_pipe_item_proc hold_item,
channel_send_pipe_item_proc send_item,
channel_release_pipe_item_proc release_item,
channel_on_incoming_error_proc incoming_error,
channel_on_outgoing_error_proc outgoing_error,
channel_handle_migrate_flush_mark handle_migrate_flush_mark,
channel_handle_migrate_data handle_migrate_data,
channel_handle_migrate_data_get_serial handle_migrate_data_get_serial)
{
RedChannel *channel = red_channel_create(size, stream,
core, migrate, handle_acks, config_socket, do_nothing_disconnect,
do_nothing_handle_message, alloc_recv_buf, release_recv_buf, hold_item,
send_item, release_item, handle_migrate_flush_mark, handle_migrate_data,
handle_migrate_data_get_serial);
if (channel == NULL) {
return NULL;
}
channel->incoming_cb.handle_parsed = (handle_parsed_proc)handle_parsed;
channel->incoming_cb.parser = parser;
channel->on_incoming_error = incoming_error;
channel->on_outgoing_error = outgoing_error;
channel->incoming_cb.on_error = (on_incoming_error_proc)red_channel_peer_on_incoming_error;
channel->outgoing_cb.on_error = (on_outgoing_error_proc)red_channel_peer_on_outgoing_error;
return channel;
}
void red_channel_destroy(RedChannel *channel)
{
if (!channel) {
return;
}
red_channel_pipe_clear(channel);
reds_stream_free(channel->stream);
spice_marshaller_destroy(channel->send_data.marshaller);
free(channel);
}
void red_channel_shutdown(RedChannel *channel)
{
red_printf("");
if (channel->stream && !channel->stream->shutdown) {
channel->core->watch_update_mask(channel->stream->watch,
SPICE_WATCH_EVENT_READ);
red_channel_pipe_clear(channel);
shutdown(channel->stream->socket, SHUT_RDWR);
channel->stream->shutdown = TRUE;
channel->incoming.shut = TRUE;
}
}
void red_channel_init_outgoing_messages_window(RedChannel *channel)
{
channel->ack_data.messages_window = 0;
red_channel_push(channel);
}
static void red_channel_handle_migrate_flush_mark(RedChannel *channel)
{
if (channel->handle_migrate_flush_mark) {
channel->handle_migrate_flush_mark(channel);
}
}
static void red_channel_handle_migrate_data(RedChannel *channel, uint32_t size, void *message)
{
if (!channel->handle_migrate_data) {
return;
}
ASSERT(red_channel_get_message_serial(channel) == 0);
red_channel_set_message_serial(channel,
channel->handle_migrate_data_get_serial(channel, size, message));
channel->handle_migrate_data(channel, size, message);
}
int red_channel_handle_message(RedChannel *channel, uint32_t size,
uint16_t type, void *message)
{
switch (type) {
case SPICE_MSGC_ACK_SYNC:
if (size != sizeof(uint32_t)) {
red_printf("bad message size");
return FALSE;
}
channel->ack_data.client_generation = *(uint32_t *)(message);
break;
case SPICE_MSGC_ACK:
if (channel->ack_data.client_generation == channel->ack_data.generation) {
channel->ack_data.messages_window -= channel->ack_data.client_window;
red_channel_push(channel);
}
break;
case SPICE_MSGC_DISCONNECTING:
break;
case SPICE_MSGC_MIGRATE_FLUSH_MARK:
red_channel_handle_migrate_flush_mark(channel);
break;
case SPICE_MSGC_MIGRATE_DATA:
red_channel_handle_migrate_data(channel, size, message);
break;
default:
red_printf("invalid message type %u", type);
return FALSE;
}
return TRUE;
}
static void red_channel_event(int fd, int event, void *data)
{
RedChannel *channel = (RedChannel *)data;
if (event & SPICE_WATCH_EVENT_READ) {
red_channel_receive(channel);
}
if (event & SPICE_WATCH_EVENT_WRITE) {
red_channel_push(channel);
}
}
void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
{
ASSERT(channel->send_data.item == NULL);
channel->send_data.header->type = msg_type;
channel->send_data.item = item;
if (item) {
channel->hold_item(channel, item);
}
}
void red_channel_send(RedChannel *channel)
{
red_peer_handle_outgoing(channel->stream, &channel->outgoing);
}
void red_channel_begin_send_message(RedChannel *channel)
{
spice_marshaller_flush(channel->send_data.marshaller);
channel->send_data.size = spice_marshaller_get_total_size(channel->send_data.marshaller);
channel->send_data.header->size = channel->send_data.size - sizeof(SpiceDataHeader);
channel->ack_data.messages_window++;
channel->send_data.header = NULL; /* avoid writing to this until we have a new message */
red_channel_send(channel);
}
void red_channel_push(RedChannel *channel)
{
PipeItem *pipe_item;
if (!channel) {
return;
}
if (!channel->during_send) {
channel->during_send = TRUE;
} else {
return;
}
if (channel->send_data.blocked) {
red_channel_send(channel);
}
while ((pipe_item = red_channel_pipe_get(channel))) {
red_channel_send_item(channel, pipe_item);
}
channel->during_send = FALSE;
}
uint64_t red_channel_get_message_serial(RedChannel *channel)
{
return channel->send_data.serial;
}
void red_channel_set_message_serial(RedChannel *channel, uint64_t serial)
{
channel->send_data.serial = serial;
}
void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
{
ring_item_init(&item->link);
item->type = type;
}
void red_channel_pipe_add(RedChannel *channel, PipeItem *item)
{
ASSERT(channel);
channel->pipe_size++;
ring_add(&channel->pipe, &item->link);
}
void red_channel_pipe_add_push(RedChannel *channel, PipeItem *item)
{
ASSERT(channel);
channel->pipe_size++;
ring_add(&channel->pipe, &item->link);
red_channel_push(channel);
}
void red_channel_pipe_add_after(RedChannel *channel, PipeItem *item, PipeItem *pos)
{
ASSERT(channel);
ASSERT(pos);
ASSERT(item);
channel->pipe_size++;
ring_add_after(&item->link, &pos->link);
}
int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item)
{
return ring_item_is_linked(&item->link);
}
void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item)
{
ring_remove(&item->link);
}
void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item)
{
ASSERT(channel);
channel->pipe_size++;
ring_add_before(&item->link, &channel->pipe);
red_channel_push(channel);
}
void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type)
{
PipeItem *item = spice_new(PipeItem, 1);
red_channel_pipe_item_init(channel, item, pipe_item_type);
red_channel_pipe_add(channel, item);
red_channel_push(channel);
}
static inline int red_channel_waiting_for_ack(RedChannel *channel)
{
return (channel->handle_acks && (channel->ack_data.messages_window > channel->ack_data.client_window * 2));
}
static inline PipeItem *red_channel_pipe_get(RedChannel *channel)
{
PipeItem *item;
if (!channel || channel->send_data.blocked ||
red_channel_waiting_for_ack(channel) ||
!(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
return NULL;
}
--channel->pipe_size;
ring_remove(&item->link);
return item;
}
int red_channel_is_connected(RedChannel *channel)
{
return !!channel->stream;
}
void red_channel_pipe_clear(RedChannel *channel)
{
PipeItem *item;
ASSERT(channel);
if (channel->send_data.item) {
red_channel_release_item(channel, channel->send_data.item, TRUE);
channel->send_data.item = NULL;
}
while ((item = (PipeItem *)ring_get_head(&channel->pipe))) {
ring_remove(&item->link);
red_channel_release_item(channel, item, FALSE);
}
channel->pipe_size = 0;
}
void red_channel_ack_zero_messages_window(RedChannel *channel)
{
channel->ack_data.messages_window = 0;
}
void red_channel_ack_set_client_window(RedChannel *channel, int client_window)
{
channel->ack_data.client_window = client_window;
}
int red_channel_all_blocked(RedChannel *channel)
{
return channel->send_data.blocked;
}
int red_channel_any_blocked(RedChannel *channel)
{
return channel->send_data.blocked;
}
int red_channel_send_message_pending(RedChannel *channel)
{
return channel->send_data.header->type != 0;
}
/* accessors for RedChannel */
SpiceMarshaller *red_channel_get_marshaller(RedChannel *channel)
{
return channel->send_data.marshaller;
}
RedsStream *red_channel_get_stream(RedChannel *channel)
{
return channel->stream;
}
SpiceDataHeader *red_channel_get_header(RedChannel *channel)
{
return channel->send_data.header;
}
/* end of accessors */
int red_channel_get_first_socket(RedChannel *channel)
{
if (!channel->stream) {
return -1;
}
return channel->stream->socket;
}
int red_channel_item_being_sent(RedChannel *channel, PipeItem *item)
{
return channel->send_data.item == item;
}
int red_channel_no_item_being_sent(RedChannel *channel)
{
return channel->send_data.item == NULL;
}
void red_channel_disconnect(RedChannel *channel)
{
red_channel_pipe_clear(channel);
reds_stream_free(channel->stream);
channel->stream = NULL;
channel->send_data.blocked = FALSE;
channel->send_data.size = 0;
}