summaryrefslogtreecommitdiffstats
path: root/server/red_channel.h
diff options
context:
space:
mode:
Diffstat (limited to 'server/red_channel.h')
-rw-r--r--server/red_channel.h182
1 files changed, 182 insertions, 0 deletions
diff --git a/server/red_channel.h b/server/red_channel.h
new file mode 100644
index 00000000..1096ba70
--- /dev/null
+++ b/server/red_channel.h
@@ -0,0 +1,182 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+ Author:
+ yhalperi@redhat.com
+*/
+
+#ifndef _H_RED_CHANNEL
+#define _H_RED_CHANNEL
+
+#include "red_common.h"
+#include "reds.h"
+#include "vd_interface.h"
+#include "ring.h"
+
+#define MAX_SEND_BUFS 1000
+#define MAX_SEND_VEC 50
+#define CLIENT_ACK_WINDOW 20
+
+/* Basic interface for channels, without using the RedChannel interface.
+ The intention is to move towards one channel interface gradually.
+ At the final stage, this interface shouldn't be exposed. Only RedChannel will use it. */
+
+typedef int (*handle_message_proc)(void *opaque,
+ RedDataHeader *header, uint8_t *msg);
+typedef uint8_t *(*alloc_msg_recv_buf_proc)(void *opaque, RedDataHeader *msg_header);
+typedef void (*release_msg_recv_buf_proc)(void *opaque,
+ RedDataHeader *msg_header, uint8_t *msg);
+typedef void (*on_incoming_error_proc)(void *opaque);
+
+typedef struct IncomingHandler {
+ void *opaque;
+ RedDataHeader header;
+ uint32_t header_pos;
+ uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
+ uint32_t msg_pos;
+ handle_message_proc handle_message;
+ alloc_msg_recv_buf_proc alloc_msg_buf;
+ on_incoming_error_proc on_error; // recv error or handle_message error
+ release_msg_recv_buf_proc release_msg_buf; // for errors
+} IncomingHandler;
+
+typedef int (*get_outgoing_msg_size_proc)(void *opaque);
+typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size);
+typedef void (*on_outgoing_error_proc)(void *opaque);
+typedef void (*on_outgoing_block_proc)(void *opaque);
+typedef void (*on_outgoing_msg_done_proc)(void *opaque);
+typedef struct OutgoingHandler {
+ void *opaque;
+ struct iovec vec_buf[MAX_SEND_VEC];
+ int vec_size;
+ struct iovec *vec;
+ int pos;
+ int size;
+ get_outgoing_msg_size_proc get_msg_size;
+ prepare_outgoing_proc prepare;
+ on_outgoing_error_proc on_error;
+ on_outgoing_block_proc on_block;
+ on_outgoing_msg_done_proc on_msg_done;
+} OutgoingHandler;
+
+/* Red Channel interface */
+
+typedef struct BufDescriptor {
+ uint32_t size;
+ uint8_t *data;
+} BufDescriptor;
+
+typedef struct PipeItem {
+ RingItem link;
+ int type;
+} PipeItem;
+
+typedef struct RedChannel RedChannel;
+
+typedef uint8_t *(*channel_alloc_msg_recv_buf_proc)(RedChannel *channel,
+ RedDataHeader *msg_header);
+typedef int (*channel_handle_message_proc)(RedChannel *channel,
+ RedDataHeader *header, uint8_t *msg);
+typedef void (*channel_release_msg_recv_buf_proc)(RedChannel *channel,
+ RedDataHeader *msg_header, uint8_t *msg);
+typedef void (*channel_disconnect_proc)(RedChannel *channel);
+typedef int (*channel_configure_socket_proc)(RedChannel *channel);
+typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item);
+typedef void (*channel_release_pipe_item_proc)(RedChannel *channel,
+ PipeItem *item, int item_pushed);
+
+struct RedChannel {
+ RedsStreamContext *peer;
+ CoreInterface *core;
+ int migrate;
+ int handle_acks;
+
+ struct {
+ uint32_t generation;
+ uint32_t client_generation;
+ uint32_t messages_window;
+ } ack_data;
+
+ Ring pipe;
+ uint32_t pipe_size;
+
+ struct {
+ RedDataHeader header;
+ union {
+ RedSetAck ack;
+ RedMigrate migrate;
+ } u;
+ uint32_t n_bufs;
+ BufDescriptor bufs[MAX_SEND_BUFS];
+ uint32_t size;
+ uint32_t not_sent_buf_head;
+
+ PipeItem *item;
+ int blocked;
+ } send_data;
+
+ OutgoingHandler outgoing;
+ IncomingHandler incoming;
+
+ channel_disconnect_proc disconnect;
+ channel_send_pipe_item_proc send_item;
+ channel_release_pipe_item_proc release_item;
+
+ int during_send;
+};
+
+/* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
+ explicitly destroy the channel */
+RedChannel *red_channel_create(int size, RedsStreamContext *peer, CoreInterface *core,
+ int migrate, int handle_acks,
+ channel_configure_socket_proc config_socket,
+ channel_disconnect_proc disconnect,
+ channel_handle_message_proc handle_message,
+ channel_alloc_msg_recv_buf_proc alloc_recv_buf,
+ channel_release_msg_recv_buf_proc release_recv_buf,
+ channel_send_pipe_item_proc send_item,
+ channel_release_pipe_item_proc release_item);
+
+void red_channel_destroy(RedChannel *channel);
+
+void red_channel_shutdown(RedChannel *channel);
+/* should be called when a new channel is ready to send messages */
+void red_channel_init_outgoing_messages_window(RedChannel *channel);
+
+/* handles general channel msgs from the client */
+int red_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg);
+
+/* when preparing send_data: should call reset, then init and then add_buf per buffer that is
+ being sent */
+void red_channel_reset_send_data(RedChannel *channel);
+void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item);
+void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size);
+
+uint64_t red_channel_get_message_serial(RedChannel *channel);
+
+/* when sending a msg. should first call red_channel_begin_send_massage */
+void red_channel_begin_send_massage(RedChannel *channel);
+
+void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type);
+void red_channel_pipe_add(RedChannel *channel, PipeItem *item);
+int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item);
+void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item);
+void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item);
+/* for types that use this routine -> the pipe item should be freed */
+void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type);
+
+#endif