diff options
Diffstat (limited to 'server/red_channel.c')
-rw-r--r-- | server/red_channel.c | 520 |
1 files changed, 520 insertions, 0 deletions
diff --git a/server/red_channel.c b/server/red_channel.c new file mode 100644 index 00000000..48ace448 --- /dev/null +++ b/server/red_channel.c @@ -0,0 +1,520 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#include <stdio.h> +#include <stdint.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> +#include "red_channel.h" + +static void red_channel_receive(void *data); +static void red_channel_push(RedChannel *channel); +static void red_channel_opaque_push(void *data); +static PipeItem *red_channel_pipe_get(RedChannel *channel); +static void red_channel_pipe_clear(RedChannel *channel); + +/* return the number of bytes read. -1 in case of error */ +static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size) +{ + uint8_t *pos = buf; + while (size) { + int now; + if (peer->shutdown) { + return -1; + } + if ((now = peer->cb_read(peer->ctx, pos, size)) <= 0) { + if (now == 0) { + return -1; + } + ASSERT(now == -1); + if (errno == EAGAIN) { + break; + } else if (errno == EINTR) { + continue; + } else if (errno == EPIPE) { + return -1; + } else { + red_printf("%s", strerror(errno)); + return -1; + } + } else { + size -= now; + pos += now; + } + } + return pos - buf; +} + +static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *handler) +{ + int bytes_read; + + for (;;) { + int ret_handle; + if (handler->header_pos < sizeof(RedDataHeader)) { + bytes_read = red_peer_receive(peer, + ((uint8_t *)&handler->header) + handler->header_pos, + sizeof(RedDataHeader) - handler->header_pos); + if (bytes_read == -1) { + handler->on_error(handler->opaque); + return; + } + handler->header_pos += bytes_read; + + if (handler->header_pos != sizeof(RedDataHeader)) { + return; + } + } + + if (handler->msg_pos < handler->header.size) { + if (!handler->msg) { + handler->msg = handler->alloc_msg_buf(handler->opaque, &handler->header); + } + + bytes_read = red_peer_receive(peer, + handler->msg + handler->msg_pos, + handler->header.size - handler->msg_pos); + if (bytes_read == -1) { + handler->release_msg_buf(handler->opaque, &handler->header, handler->msg); + handler->on_error(handler->opaque); + return; + } + handler->msg_pos += bytes_read; + if (handler->msg_pos != handler->header.size) { + return; + } + } + + ret_handle = handler->handle_message(handler->opaque, &handler->header, + handler->msg); + handler->msg_pos = 0; + handler->msg = NULL; + handler->header_pos = 0; + + if (!ret_handle) { + handler->on_error(handler->opaque); + return; + } + } +} + +static struct iovec *__iovec_skip(struct iovec vec[], int skip, int *vec_size) +{ + struct iovec *now = vec; + + while ((skip) && (skip >= now->iov_len)) { + skip -= now->iov_len; + --*vec_size; + now++; + } + + now->iov_base = (uint8_t *)now->iov_base + skip; + now->iov_len -= skip; + return now; +} + +static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler) +{ + int n; + if (handler->size == 0) { + handler->vec = handler->vec_buf; + handler->size = handler->get_msg_size(handler->opaque); + if (!handler->size) { // nothing to be sent + return; + } + handler->prepare(handler->opaque, handler->vec, &handler->vec_size); + } + for (;;) { + if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) { + switch (errno) { + case EAGAIN: + handler->on_block(handler->opaque); + return; + case EINTR: + continue; + case EPIPE: + handler->on_error(handler->opaque); + return; + default: + red_printf("%s", strerror(errno)); + handler->on_error(handler->opaque); + return; + } + } else { + handler->pos += n; + handler->vec = __iovec_skip(handler->vec, n, &handler->vec_size); + if (!handler->vec_size) { + if (handler->pos == handler->size) { // finished writing data + handler->on_msg_done(handler->opaque); + handler->vec = handler->vec_buf; + handler->pos = 0; + handler->size = 0; + return; + } else { + // There wasn't enough place for all the outgoing data in one iovec array. + // Filling the rest of the data. + handler->vec = handler->vec_buf; + handler->prepare(handler->opaque, handler->vec, &handler->vec_size); + } + } + } + } +} + +static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size); + + +static void red_channel_peer_on_error(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + channel->disconnect(channel); +} + +static int red_channel_peer_get_out_msg_size(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + return channel->send_data.size; +} + +static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size) +{ + RedChannel *channel = (RedChannel *)opaque; + red_channel_fill_iovec(channel, vec, vec_size); +} + +static void red_channel_peer_on_out_block(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + channel->send_data.blocked = TRUE; + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, red_channel_opaque_push, + channel); +} + +static void red_channel_peer_on_out_msg_done(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + channel->send_data.size = 0; + channel->send_data.n_bufs = 0; + channel->send_data.not_sent_buf_head = 0; + if (channel->send_data.item) { + channel->release_item(channel, channel->send_data.item, TRUE); + channel->send_data.item = NULL; + } + if (channel->send_data.blocked) { + channel->send_data.blocked = FALSE; + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, NULL, + channel); + } +} + +RedChannel *red_channel_create(int size, RedsStreamContext *peer, CoreInterface *core, + int migrate, int handle_acks, + channel_configure_socket_proc config_socket, + channel_disconnect_proc disconnect, + channel_handle_message_proc handle_message, + channel_alloc_msg_recv_buf_proc alloc_recv_buf, + channel_release_msg_recv_buf_proc release_recv_buf, + channel_send_pipe_item_proc send_item, + channel_release_pipe_item_proc release_item) +{ + RedChannel *channel; + + ASSERT(size >= sizeof(*channel)); + ASSERT(config_socket && disconnect && handle_message && alloc_recv_buf && + release_item); + if (!(channel = malloc(size))) { + red_printf("malloc failed"); + goto error1; + } + memset(channel, 0, size); + + channel->handle_acks = handle_acks; + channel->disconnect = disconnect; + channel->send_item = send_item; + channel->release_item = release_item; + + channel->peer = peer; + channel->core = core; + channel->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + + // block flags) + channel->ack_data.client_generation = ~0; + + channel->migrate = migrate; + ring_init(&channel->pipe); + + channel->incoming.opaque = channel; + channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf; + channel->incoming.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf; + channel->incoming.handle_message = (handle_message_proc)handle_message; + channel->incoming.on_error = red_channel_peer_on_error; + + channel->outgoing.opaque = channel; + channel->outgoing.pos = 0; + channel->outgoing.size = 0; + + channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size; + channel->outgoing.prepare = red_channel_peer_prepare_out_msg; + channel->outgoing.on_block = red_channel_peer_on_out_block; + channel->outgoing.on_error = red_channel_peer_on_error; + channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done; + + if (!config_socket(channel)) { + goto error2; + } + + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, NULL, channel); + + return channel; + +error2: + free(channel); +error1: + peer->cb_free(peer); + + return NULL; +} + +void red_channel_destroy(RedChannel *channel) +{ + if (!channel) { + return; + } + red_channel_pipe_clear(channel); + channel->core->set_file_handlers(channel->core, channel->peer->socket, + NULL, NULL, NULL); + channel->peer->cb_free(channel->peer); + free(channel); +} + +void red_channel_shutdown(RedChannel *channel) +{ + red_printf(""); + if (!channel->peer->shutdown) { + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, NULL, channel); + red_channel_pipe_clear(channel); + shutdown(channel->peer->socket, SHUT_RDWR); + channel->peer->shutdown = TRUE; + } +} + +void red_channel_init_outgoing_messages_window(RedChannel *channel) +{ + channel->ack_data.messages_window = 0; + red_channel_push(channel); +} + +int red_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg) +{ + switch (header->type) { + case REDC_ACK_SYNC: + if (header->size != sizeof(uint32_t)) { + red_printf("bad message size"); + return FALSE; + } + channel->ack_data.client_generation = *(uint32_t *)(msg); + break; + case REDC_ACK: + if (channel->ack_data.client_generation == channel->ack_data.generation) { + channel->ack_data.messages_window -= CLIENT_ACK_WINDOW; + red_channel_push(channel); + } + break; + default: + red_printf("invalid message type %u", header->type); + return FALSE; + } + return TRUE; +} + +static void red_channel_receive(void *data) +{ + RedChannel *channel = (RedChannel *)data; + red_peer_handle_incoming(channel->peer, &channel->incoming); +} + +static void inline __red_channel_add_buf(RedChannel *channel, void *data, uint32_t size) +{ + int pos = channel->send_data.n_bufs++; + ASSERT(pos < MAX_SEND_BUFS); + channel->send_data.bufs[pos].size = size; + channel->send_data.bufs[pos].data = data; +} + +void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size) +{ + __red_channel_add_buf(channel, data, size); + channel->send_data.header.size += size; +} + +void red_channel_reset_send_data(RedChannel *channel) +{ + channel->send_data.n_bufs = 0; + channel->send_data.header.size = 0; + channel->send_data.header.sub_list = 0; + ++channel->send_data.header.serial; + __red_channel_add_buf(channel, (void *)&channel->send_data.header, sizeof(RedDataHeader)); +} + +void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item) +{ + channel->send_data.header.type = msg_type; + channel->send_data.item = item; +} + +static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size) +{ + BufDescriptor *buf = channel->send_data.bufs + channel->send_data.not_sent_buf_head; + ASSERT(channel->send_data.not_sent_buf_head < channel->send_data.n_bufs); + *vec_size = 0; + do { + vec[*vec_size].iov_base = buf->data; + vec[*vec_size].iov_len = buf->size; + (*vec_size)++; + buf++; + channel->send_data.not_sent_buf_head++; + } while (((*vec_size) < MAX_SEND_VEC) && + (channel->send_data.not_sent_buf_head != channel->send_data.n_bufs)); +} + +static void red_channel_send(RedChannel *channel) +{ + red_peer_handle_outgoing(channel->peer, &channel->outgoing); +} + +void red_channel_begin_send_massage(RedChannel *channel) +{ + channel->send_data.size = channel->send_data.header.size + sizeof(RedDataHeader); + channel->ack_data.messages_window++; + red_channel_send(channel); +} + +static void red_channel_push(RedChannel *channel) +{ + PipeItem *pipe_item; + + if (!channel->during_send) { + channel->during_send = TRUE; + } else { + return; + } + + if (channel->send_data.blocked) { + red_channel_send(channel); + } + + while ((pipe_item = red_channel_pipe_get(channel))) { + channel->send_item(channel, pipe_item); + } + channel->during_send = FALSE; +} + +static void red_channel_opaque_push(void *data) +{ + red_channel_push((RedChannel *)data); +} + +uint64_t red_channel_get_message_serial(RedChannel *channel) +{ + return channel->send_data.header.serial; +} + +void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type) +{ + ring_item_init(&item->link); + item->type = type; +} + +void red_channel_pipe_add(RedChannel *channel, PipeItem *item) +{ + ASSERT(channel); + + channel->pipe_size++; + ring_add(&channel->pipe, &item->link); + + red_channel_push(channel); +} + +int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item) +{ + return ring_item_is_linked(&item->link); +} + +void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item) +{ + ring_remove(&item->link); +} + +void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item) +{ + ASSERT(channel); + channel->pipe_size++; + ring_add_before(&item->link, &channel->pipe); + + red_channel_push(channel); +} + +void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type) +{ + PipeItem *item = malloc(sizeof(*item)); + if (!item) { + red_error("malloc failed"); + } + red_channel_pipe_item_init(channel, item, pipe_item_type); + red_channel_pipe_add(channel, item); + + red_channel_push(channel); +} + +static PipeItem *red_channel_pipe_get(RedChannel *channel) +{ + PipeItem *item; + + if (!channel || channel->send_data.blocked || + (channel->handle_acks && (channel->ack_data.messages_window > CLIENT_ACK_WINDOW * 2)) || + !(item = (PipeItem *)ring_get_tail(&channel->pipe))) { + return NULL; + } + + --channel->pipe_size; + ring_remove(&item->link); + return item; +} + +static void red_channel_pipe_clear(RedChannel *channel) +{ + PipeItem *item; + if (channel->send_data.item) { + channel->release_item(channel, channel->send_data.item, TRUE); + } + + while ((item = (PipeItem *)ring_get_head(&channel->pipe))) { + ring_remove(&item->link); + channel->release_item(channel, item, FALSE); + } +} + |