summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--client/Makefile.am4
-rw-r--r--client/application.cpp10
-rw-r--r--client/client_net_socket.cpp386
-rw-r--r--client/client_net_socket.h154
-rw-r--r--client/red_channel.cpp38
-rw-r--r--client/red_peer.cpp46
-rw-r--r--client/red_peer.h17
-rw-r--r--client/tunnel_channel.cpp792
-rw-r--r--client/tunnel_channel.h138
-rw-r--r--client/windows/platform_utils.cpp17
-rw-r--r--client/windows/platform_utils.h22
-rw-r--r--client/windows/redc.vcproj16
-rw-r--r--client/x11/Makefile.am6
-rw-r--r--client/x11/platform_utils.h21
-rw-r--r--common/red.h142
-rw-r--r--configure.ac5
-rw-r--r--server/Makefile.am6
-rw-r--r--server/red_channel.c520
-rw-r--r--server/red_channel.h182
-rw-r--r--server/red_tunnel_worker.c3510
-rwxr-xr-xserver/red_tunnel_worker.h29
-rw-r--r--server/reds.c17
-rw-r--r--server/reds.h3
-rw-r--r--server/vd_interface.h18
24 files changed, 6016 insertions, 83 deletions
diff --git a/client/Makefile.am b/client/Makefile.am
index bb78ba49..51d4857e 100644
--- a/client/Makefile.am
+++ b/client/Makefile.am
@@ -15,6 +15,8 @@ RED_COMMON_SRCS = \
canvas_utils.cpp \
red_cairo_canvas.cpp \
red_cairo_canvas.h \
+ client_net_socket.cpp \
+ client_net_socket.h \
cmd_line_parser.cpp \
cmd_line_parser.h \
common.h \
@@ -71,6 +73,8 @@ RED_COMMON_SRCS = \
screen_layer.cpp \
screen_layer.h \
shared_cache.hpp \
+ tunnel_channel.cpp \
+ tunnel_channel.h \
hot_keys.cpp \
hot_keys.h \
threads.cpp \
diff --git a/client/application.cpp b/client/application.cpp
index b30baa84..65937156 100644
--- a/client/application.cpp
+++ b/client/application.cpp
@@ -40,6 +40,7 @@
#include "quic.h"
#include "mutex.h"
#include "cmd_line_parser.h"
+#include "tunnel_channel.h"
#include <log4cpp/BasicConfigurator.hh>
#include <log4cpp/FileAppender.hh>
@@ -236,7 +237,7 @@ enum AppCommands {
Application::Application()
: _client (*this)
- , _enabled_channels(RED_CHANNEL_END, true)
+ , _enabled_channels (RED_CHANNEL_END, true)
, _main_screen (NULL)
, _quitting (false)
, _active (false)
@@ -1323,6 +1324,7 @@ bool Application::set_channels_security(CmdLineParser& parser, bool on, char *va
channels_names["cursor"] = RED_CHANNEL_CURSOR;
channels_names["playback"] = RED_CHANNEL_PLAYBACK;
channels_names["record"] = RED_CHANNEL_RECORD;
+ channels_names["tunnel"] = RED_CHANNEL_TUNNEL;
if (!strcmp(val, "all")) {
if ((val = parser.next_argument())) {
@@ -1382,6 +1384,7 @@ bool Application::set_enable_channels(CmdLineParser& parser, bool enable, char *
channels_names["cursor"] = RED_CHANNEL_CURSOR;
channels_names["playback"] = RED_CHANNEL_PLAYBACK;
channels_names["record"] = RED_CHANNEL_RECORD;
+ channels_names["tunnel"] = RED_CHANNEL_TUNNEL;
if (!strcmp(val, "all")) {
if ((val = parser.next_argument())) {
@@ -1460,6 +1463,7 @@ bool Application::process_cmd_line(int argc, char** argv)
_peer_con_opt[RED_CHANNEL_CURSOR] = RedPeer::ConnectionOptions::CON_OP_INVALID;
_peer_con_opt[RED_CHANNEL_PLAYBACK] = RedPeer::ConnectionOptions::CON_OP_INVALID;
_peer_con_opt[RED_CHANNEL_RECORD] = RedPeer::ConnectionOptions::CON_OP_INVALID;
+ _peer_con_opt[RED_CHANNEL_TUNNEL] = RedPeer::ConnectionOptions::CON_OP_INVALID;
parser.begin(argc, argv);
@@ -1595,6 +1599,10 @@ bool Application::process_cmd_line(int argc, char** argv)
_client.register_channel_factory(RecordChannel::Factory());
}
+ if (_enabled_channels[RED_CHANNEL_TUNNEL]) {
+ _client.register_channel_factory(TunnelChannel::Factory());
+ }
+
_client.init(host.c_str(), port, sport, password.c_str(), auto_display_res);
if (auto_display_res) {
Monitor* mon = find_monitor(0);
diff --git a/client/client_net_socket.cpp b/client/client_net_socket.cpp
new file mode 100644
index 00000000..50174795
--- /dev/null
+++ b/client/client_net_socket.cpp
@@ -0,0 +1,386 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+ Author:
+ yhalperi@redhat.com
+*/
+
+#include "common.h"
+#include "client_net_socket.h"
+#include "debug.h"
+#include "red_error_codes.h"
+#include "utils.h"
+
+ClientNetSocket::ClientNetSocket(uint16_t id, const struct in_addr& dst_addr, uint16_t dst_port,
+ EventsLoop& events_loop, EventHandler& event_handler)
+ : _id (id)
+ , _local_addr (dst_addr)
+ , _local_port (dst_port)
+ , _peer (INVALID_SOCKET)
+ , _events_loop (events_loop)
+ , _event_handler (event_handler)
+ , _num_recv_tokens (0)
+ , _send_message (NULL)
+ , _send_pos (0)
+ , _status (SOCKET_STATUS_CLOSED)
+ , _fin_pending (false)
+ , _close_pending (false)
+{
+}
+
+ClientNetSocket::~ClientNetSocket()
+{
+ close();
+}
+
+bool ClientNetSocket::connect(uint32_t recv_tokens)
+{
+ struct sockaddr_in addr;
+ int no_delay;
+
+
+ ASSERT(_peer == INVALID_SOCKET && _status == SOCKET_STATUS_CLOSED);
+
+ addr.sin_port = _local_port;
+ addr.sin_addr.s_addr = _local_addr.s_addr;
+ addr.sin_family = AF_INET;
+
+ if ((_peer = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) {
+ int err = sock_error();
+ THROW("%s: failed to create socket: %s", __FUNCTION__, sock_err_message(err));
+ }
+
+ no_delay = 1;
+ if (setsockopt(_peer, IPPROTO_TCP, TCP_NODELAY,
+ (const char*)&no_delay, sizeof(no_delay)) == SOCKET_ERROR) {
+ LOG_WARN("set TCP_NODELAY failed");
+ }
+
+ LOG_INFO("connect to ip=%s port=%d (connection_id=%d)",
+ inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), _id);
+
+ if (::connect(_peer, (struct sockaddr*)&addr, sizeof(sockaddr_in)) == SOCKET_ERROR) {
+ int err = sock_error();
+ closesocket(_peer);
+ _peer = INVALID_SOCKET;
+ LOG_INFO("connect to ip=%s port=%d failed %s (connection_id=%d)",
+ inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), sock_err_message(err), _id);
+ return false;
+ }
+
+ _events_loop.add_socket(*this);
+ _status = SOCKET_STATUS_OPEN;
+ _num_recv_tokens = recv_tokens;
+ return true;
+}
+
+void ClientNetSocket::push_disconnect()
+{
+ if ((_status == SOCKET_STATUS_CLOSED) || _close_pending) {
+ THROW("%s: disconnect attempt for disconnected socket %s %d", __FUNCTION__,
+ inet_ntoa(_local_addr), ntohs(_local_port));
+ }
+
+ _close_pending = true;
+
+ if (!during_send()) {
+ close_and_tell();
+ }
+}
+
+void ClientNetSocket::push_fin()
+{
+ if ((_status == SOCKET_STATUS_OPEN) || (_status == SOCKET_STATUS_RECEIVED_FIN)) {
+ _fin_pending = true;
+ if (!during_send()) {
+ try {
+ apply_guest_fin();
+ } catch (ClientNetSocket::ShutdownExcpetion&) {
+ close_and_tell();
+ }
+ }
+ } else {
+ THROW("%s: unexpected fin connection_id=%d (status=%d)", __FUNCTION__,
+ _id, _status);
+ }
+}
+
+void ClientNetSocket::add_recv_tokens(uint32_t num_tokens)
+{
+ if ((_status == SOCKET_STATUS_CLOSED) || _close_pending) {
+ THROW("%s: ack attempt for disconnected socket connection_id=%d", __FUNCTION__,
+ _id);
+ }
+
+ _num_recv_tokens += num_tokens;
+
+ // recv might have not been called because tokens weren't available
+ if (_num_recv_tokens && (_num_recv_tokens == num_tokens)) {
+ if (can_receive()) {
+ receive();
+ }
+ }
+}
+
+void ClientNetSocket::push_send(SendBuffer& buf)
+{
+ if (!can_send()) {
+ THROW("%s: unexpected send attempt for connection_id=%d (status = %d)",
+ __FUNCTION__, _id, _status);
+ }
+
+ if (_fin_pending || _close_pending) {
+ THROW("%s: unexpected send attempt for connection_id=% - shutdown send pending",
+ __FUNCTION__, _id);
+ }
+
+ _send_messages.push_back(buf.ref());
+ send();
+}
+
+void ClientNetSocket::on_event()
+{
+ if (can_send()) {
+ send();
+ }
+
+ if (!during_send()) {
+ if (_close_pending) {
+ close_and_tell();
+ } else if (_fin_pending) {
+ apply_guest_fin();
+ }
+ }
+
+ if (can_receive()) {
+ receive();
+ }
+}
+
+void ClientNetSocket::apply_guest_fin()
+{
+ if (_status == SOCKET_STATUS_OPEN) {
+ if (shutdown(_peer, SHUT_WR) == SOCKET_ERROR) {
+ int err = sock_error();
+ LOG_INFO("shutdown in connection_id=%d failed %s", _id, sock_err_message(err));
+ throw ClientNetSocket::ShutdownExcpetion();
+ }
+
+ _fin_pending = false;
+ _status = SOCKET_STATUS_SENT_FIN;
+ } else if (_status == SOCKET_STATUS_RECEIVED_FIN) {
+ close_and_tell();
+ }
+}
+
+void ClientNetSocket::handle_client_fin()
+{
+ if (_status == SOCKET_STATUS_OPEN) {
+ _status = SOCKET_STATUS_RECEIVED_FIN;
+ _event_handler.on_socket_fin_recv(*this);
+ } else if (_status == SOCKET_STATUS_SENT_FIN) {
+ close_and_tell();
+ }
+}
+
+inline bool ClientNetSocket::during_send()
+{
+ return ((!_send_messages.empty()) || _send_message);
+}
+
+inline bool ClientNetSocket::can_send()
+{
+ return ((_status == SOCKET_STATUS_OPEN) || (_status == SOCKET_STATUS_RECEIVED_FIN));
+}
+
+inline bool ClientNetSocket::can_receive()
+{
+ return ((_status == SOCKET_STATUS_OPEN) || (_status == SOCKET_STATUS_SENT_FIN));
+}
+
+void ClientNetSocket::send_message_done()
+{
+ _send_message->unref();
+ _send_message = NULL;
+ _send_pos = 0;
+ _event_handler.on_socket_message_send_done(*this);
+}
+
+void ClientNetSocket::send()
+{
+ ASSERT(_peer != INVALID_SOCKET);
+ try {
+ if (_send_message) {
+ _send_pos += send_buf(_send_message->data() + _send_pos,
+ _send_message->size() - _send_pos);
+
+ if (_send_pos != _send_message->size()) {
+ return;
+ } else {
+ send_message_done();
+ }
+ }
+
+ while (!_send_messages.empty()) {
+ _send_message = _send_messages.front();
+ _send_messages.pop_front();
+ _send_pos = send_buf(_send_message->data(), _send_message->size());
+ if (_send_pos != _send_message->size()) {
+ return;
+ } else {
+ send_message_done();
+ }
+ }
+ } catch (ClientNetSocket::SendException&) {
+ close_and_tell();
+ }
+}
+
+uint32_t ClientNetSocket::send_buf(const uint8_t* buf, uint32_t size)
+{
+ const uint8_t* pos = buf;
+ ASSERT(_peer != INVALID_SOCKET);
+ while (size) {
+ int now;
+ if ((now = ::send(_peer, (char*)pos, size, MSG_NOSIGNAL)) == SOCKET_ERROR) {
+ int err = sock_error();
+ if (err == WOULDBLOCK_ERR) {
+ break;
+ }
+
+ if (err == INTERRUPTED_ERR) {
+ continue;
+ }
+
+ LOG_INFO("send in connection_id=%d failed %s", _id, sock_err_message(err));
+ throw ClientNetSocket::SendException();
+ }
+ size -= now;
+ pos += now;
+ }
+ return pos - buf;
+}
+
+void ClientNetSocket::receive()
+{
+ ASSERT(_peer != INVALID_SOCKET);
+ bool shutdown;
+ while (_num_recv_tokens) {
+ ReceiveBuffer& rcv_buf = alloc_receive_buffer();
+ uint32_t size;
+
+ try {
+ size = receive_buf(rcv_buf.buf(), rcv_buf.buf_max_size(), shutdown);
+ } catch (ClientNetSocket::ReceiveException&) {
+ rcv_buf.release_buf();
+ close_and_tell();
+ return;
+ }
+
+ if (size) {
+ rcv_buf.set_buf_size(size);
+ _num_recv_tokens--;
+ _event_handler.on_socket_message_recv_done(*this, rcv_buf);
+ } else {
+ rcv_buf.release_buf();
+ }
+
+ if (shutdown) {
+ handle_client_fin();
+ return;
+ }
+
+ if (size < rcv_buf.buf_max_size()) {
+ return;
+ }
+ }
+}
+
+uint32_t ClientNetSocket::receive_buf(uint8_t* buf, uint32_t max_size, bool& shutdown)
+{
+ uint8_t* pos = buf;
+ ASSERT(_peer != INVALID_SOCKET);
+
+ shutdown = false;
+
+ while (max_size) {
+ int now;
+ if ((now = ::recv(_peer, (char*)pos, max_size, 0)) <= 0) {
+ if (now == 0) {
+ shutdown = true;
+ break; // a case where fin is received, but before that, there is a msg
+ }
+
+ int err = sock_error();
+
+ if (err == WOULDBLOCK_ERR) {
+ break;
+ }
+
+ if (err == INTERRUPTED_ERR) {
+ continue;
+ }
+
+ LOG_INFO("receive in connection_id=%d failed errno=%s", _id, sock_err_message(err));
+ throw ClientNetSocket::ReceiveException();
+ }
+ max_size -= now;
+ pos += now;
+ }
+
+ return (pos - buf);
+}
+
+void ClientNetSocket::release_wait_send_messages()
+{
+ if (_send_message) {
+ _send_message->unref();
+ _send_message = NULL;
+ _send_pos = 0;
+ }
+
+ while (!_send_messages.empty()) {
+ _send_messages.front()->unref();
+ _send_messages.pop_front();
+ }
+}
+
+void ClientNetSocket::close()
+{
+ release_wait_send_messages();
+ apply_disconnect();
+}
+
+void ClientNetSocket::close_and_tell()
+{
+ close();
+ _event_handler.on_socket_disconnect(*this);
+}
+
+void ClientNetSocket::apply_disconnect()
+{
+ if (_peer != INVALID_SOCKET) {
+ _events_loop.remove_socket(*this);
+ closesocket(_peer);
+ _peer = INVALID_SOCKET;
+ LOG_INFO("closing connection_id=%d", _id);
+ }
+ _status = SOCKET_STATUS_CLOSED;
+ _close_pending = false;
+ _fin_pending = false;
+}
+
diff --git a/client/client_net_socket.h b/client/client_net_socket.h
new file mode 100644
index 00000000..8ca1cfcf
--- /dev/null
+++ b/client/client_net_socket.h
@@ -0,0 +1,154 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+ Author:
+ yhalperi@redhat.com
+*/
+
+#ifndef _H_CLIENT_NET_SOCKET
+#define _H_CLIENT_NET_SOCKET
+
+#include "platform_utils.h"
+#include "common.h"
+#include "events_loop.h"
+
+/* interface for connections inside client LAN */
+
+typedef enum {
+ SOCKET_STATUS_OPEN,
+ SOCKET_STATUS_SENT_FIN,
+ SOCKET_STATUS_RECEIVED_FIN,
+ SOCKET_STATUS_CLOSED,
+} SocketStatus;
+
+class ClientNetSocket: public EventsLoop::Socket {
+public:
+ class ReceiveBuffer;
+ class SendBuffer;
+
+ class EventHandler;
+
+ class SendException {};
+ class ReceiveException {};
+ class ShutdownExcpetion {};
+
+ ClientNetSocket(uint16_t id, const struct in_addr& dst_addr, uint16_t dst_port,
+ EventsLoop& events_loop, ClientNetSocket::EventHandler& event_handler);
+ virtual ~ClientNetSocket();
+
+ bool connect(uint32_t recv_tokens);
+ void push_disconnect();
+ void push_fin();
+ void push_send(SendBuffer& buf);
+ void add_recv_tokens(uint32_t num_tokens);
+
+ bool is_connected() {return _status != SOCKET_STATUS_CLOSED;}
+
+ inline uint16_t id() {return _id;}
+ inline const struct in_addr& local_addr() {return _local_addr;}
+ inline uint16_t local_port() {return _local_port;}
+
+ /* EventsLoop::Socket interface */
+ void on_event();
+ int get_socket() {return _peer;}
+
+protected:
+ virtual ReceiveBuffer& alloc_receive_buffer() = 0;
+
+private:
+ void send();
+ void receive();
+
+ uint32_t send_buf(const uint8_t* buf, uint32_t size);
+ uint32_t receive_buf(uint8_t* buf, uint32_t max_size, bool& shutdown);
+
+ bool can_receive();
+ bool can_send();
+
+ void apply_disconnect();
+ void apply_guest_fin();
+
+ void close();
+ void close_and_tell();
+
+ void handle_client_fin();
+
+ bool during_send();
+ void release_wait_send_messages();
+ void clear();
+ void send_message_done();
+
+private:
+ uint16_t _id;
+ struct in_addr _local_addr;
+ uint16_t _local_port;
+
+ SOCKET _peer;
+
+ EventsLoop& _events_loop;
+
+ EventHandler& _event_handler;
+
+ uint32_t _num_recv_tokens;
+
+ std::list<SendBuffer*> _send_messages;
+ SendBuffer* _send_message;
+ uint32_t _send_pos;
+
+ SocketStatus _status;
+ bool _fin_pending;
+ bool _close_pending;
+};
+
+class ClientNetSocket::ReceiveBuffer {
+public:
+ ReceiveBuffer() {}
+
+ virtual uint8_t* buf() = 0;
+ virtual uint32_t buf_max_size() = 0;
+ virtual void set_buf_size(uint32_t size) = 0;
+ virtual void release_buf() = 0;
+
+protected:
+ virtual ~ReceiveBuffer() {}
+};
+
+class ClientNetSocket::SendBuffer {
+public:
+ SendBuffer() {};
+
+ virtual const uint8_t* data() = 0;
+ virtual uint32_t size() = 0;
+ virtual ClientNetSocket::SendBuffer* ref() = 0;
+ virtual void unref() = 0;
+
+protected:
+ virtual ~SendBuffer() {}
+};
+
+class ClientNetSocket::EventHandler {
+public:
+ EventHandler() {}
+ virtual ~EventHandler() {}
+ virtual void on_socket_message_recv_done(ClientNetSocket& sckt, ReceiveBuffer& buf) = 0;
+ virtual void on_socket_message_send_done(ClientNetSocket& sckt) = 0;
+ virtual void on_socket_disconnect(ClientNetSocket& sckt) = 0;
+ virtual void on_socket_fin_recv(ClientNetSocket& sckt) = 0;
+};
+
+
+
+#endif
diff --git a/client/red_channel.cpp b/client/red_channel.cpp
index 4c6f1f8f..a82d9f77 100644
--- a/client/red_channel.cpp
+++ b/client/red_channel.cpp
@@ -133,7 +133,7 @@ void RedChannelBase::link(uint32_t connection_id, const std::string& password)
*/
if (RSA_public_encrypt(password.length() + 1, (unsigned char *)password.c_str(),
(uint8_t *)bufEncrypted.get(),
- rsa, RSA_PKCS1_OAEP_PADDING) > 0 ) {
+ rsa, RSA_PKCS1_OAEP_PADDING) > 0) {
send((uint8_t*)bufEncrypted.get(), nRSASize);
} else {
THROW("could not encrypt password");
@@ -425,8 +425,10 @@ void RedChannel::run()
_outgoing_message = NULL;
}
_incomming_header_pos = 0;
- delete _incomming_message;
- _incomming_message = NULL;
+ if (_incomming_message) {
+ _incomming_message->unref();
+ _incomming_message = NULL;
+ }
case DISCONNECT_ACTION:
close();
on_disconnect();
@@ -525,19 +527,19 @@ void RedChannel::recive_messages()
_incomming_header_pos = n;
return;
}
- std::auto_ptr<CompundInMessage> message(new CompundInMessage(_incomming_header.serial,
- _incomming_header.type,
- _incomming_header.size,
- _incomming_header.sub_list));
- n = RedPeer::recive(message->data(), message->compund_size());
- if (n != message->compund_size()) {
+ AutoRef<CompundInMessage> message(new CompundInMessage(_incomming_header.serial,
+ _incomming_header.type,
+ _incomming_header.size,
+ _incomming_header.sub_list));
+ n = RedPeer::recive((*message)->data(), (*message)->compund_size());
+ if (n != (*message)->compund_size()) {
_incomming_message = message.release();
_incomming_message_pos = n;
return;
}
on_message_recived();
- _message_handler->handle_message(*message.get());
- on_message_complition(message->serial());
+ _message_handler->handle_message(*(*message));
+ on_message_complition((*message)->serial());
}
}
@@ -577,11 +579,11 @@ void RedChannel::on_event()
if (_incomming_message_pos != _incomming_message->compund_size()) {
return;
}
- std::auto_ptr<CompundInMessage> message(_incomming_message);
+ AutoRef<CompundInMessage> message(_incomming_message);
_incomming_message = NULL;
on_message_recived();
- _message_handler->handle_message(*message.get());
- on_message_complition(message->serial());
+ _message_handler->handle_message(*(*message));
+ on_message_complition((*message)->serial());
}
recive_messages();
}
@@ -616,18 +618,18 @@ void RedChannel::handle_migrate(RedPeer::InMessage* message)
if (migrate->flags & RED_MIGRATE_NEED_FLUSH) {
send_migrate_flush_mark();
}
- std::auto_ptr<RedPeer::CompundInMessage> data_message;
+ AutoRef<CompundInMessage> data_message;
if (migrate->flags & RED_MIGRATE_NEED_DATA_TRANSFER) {
data_message.reset(recive());
}
_client.migrate_channel(*this);
if (migrate->flags & RED_MIGRATE_NEED_DATA_TRANSFER) {
- if (data_message->type() != RED_MIGRATE_DATA) {
+ if ((*data_message)->type() != RED_MIGRATE_DATA) {
THROW("expect RED_MIGRATE_DATA");
}
std::auto_ptr<RedPeer::OutMessage> message(new RedPeer::OutMessage(REDC_MIGRATE_DATA,
- data_message->size()));
- memcpy(message->data(), data_message->data(), data_message->size());
+ (*data_message)->size()));
+ memcpy(message->data(), (*data_message)->data(), (*data_message)->size());
send(*message);
}
_loop.add_socket(*this);
diff --git a/client/red_peer.cpp b/client/red_peer.cpp
index e20d5ca6..dad035d4 100644
--- a/client/red_peer.cpp
+++ b/client/red_peer.cpp
@@ -16,54 +16,12 @@
*/
#include "common.h"
-#ifdef _WIN32
-#include <winsock2.h>
-#include <ws2tcpip.h>
-
-#define SHUT_RDWR SD_BOTH
-#else
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <netinet/tcp.h>
-#include <netdb.h>
-
-#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-#define closesocket(sock) ::close(sock)
-#endif
#include "red.h"
#include "red_peer.h"
#include "utils.h"
#include "debug.h"
#include "platform_utils.h"
-#ifdef _WIN32
-
-int inet_aton(const char *ip, struct in_addr *in_addr)
-{
- unsigned long addr = inet_addr(ip);
-
- if (addr == INADDR_NONE) {
- return 0;
- }
- in_addr->S_un.S_addr = addr;
- return 1;
-}
-
-#define SHUTDOWN_ERR WSAESHUTDOWN
-#define INTERRUPTED_ERR WSAEINTR
-#define WOULDBLOCK_ERR WSAEWOULDBLOCK
-#define sock_error() WSAGetLastError()
-#define sock_err_message(err) sys_err_to_str(err)
-#else
-#define SHUTDOWN_ERR EPIPE
-#define INTERRUPTED_ERR EINTR
-#define WOULDBLOCK_ERR EAGAIN
-#define sock_error() errno
-#define sock_err_message(err) strerror(err)
-#endif
-
static void ssl_error()
{
ERR_print_errors_fp(stderr);
@@ -326,11 +284,11 @@ uint32_t RedPeer::recive(uint8_t *buf, uint32_t size)
RedPeer::CompundInMessage* RedPeer::recive()
{
RedDataHeader header;
- std::auto_ptr<CompundInMessage> message;
+ AutoRef<CompundInMessage> message;
recive((uint8_t*)&header, sizeof(RedDataHeader));
message.reset(new CompundInMessage(header.serial, header.type, header.size, header.sub_list));
- recive(message->data(), message->compund_size());
+ recive((*message)->data(), (*message)->compund_size());
return message.release();
}
diff --git a/client/red_peer.h b/client/red_peer.h
index f1db1814..eb0597d8 100644
--- a/client/red_peer.h
+++ b/client/red_peer.h
@@ -18,12 +18,6 @@
#ifndef _H_REDPEER
#define _H_REDPEER
-#ifdef _WIN32
-#include <winsock.h>
-#else
-typedef int SOCKET;
-#endif
-
#include <openssl/ssl.h>
#include <openssl/err.h>
@@ -31,6 +25,7 @@ typedef int SOCKET;
#include "red.h"
#include "events_loop.h"
#include "threads.h"
+#include "platform_utils.h"
class RedPeer: protected EventsLoop::Socket {
public:
@@ -116,7 +111,7 @@ private:
class RedPeer::InMessage {
public:
- InMessage(uint16_t type, uint32_t size, uint8_t* data)
+ InMessage(uint16_t type, uint32_t size, uint8_t * data)
: _type (type)
, _size (size)
, _data (data)
@@ -139,12 +134,14 @@ class RedPeer::CompundInMessage: public RedPeer::InMessage {
public:
CompundInMessage(uint64_t _serial, uint16_t type, uint32_t size, uint32_t sub_list)
: InMessage(type, size, new uint8_t[size])
+ , _refs (1)
, _serial (_serial)
, _sub_list (sub_list)
{
}
- virtual ~CompundInMessage() { delete[] _data;}
+ RedPeer::InMessage* ref() { _refs++; return this;}
+ void unref() {if (!--_refs) delete this;}
uint64_t serial() { return _serial;}
uint32_t sub_list() { return _sub_list;}
@@ -152,7 +149,11 @@ public:
virtual uint32_t size() { return _sub_list ? _sub_list : _size;}
uint32_t compund_size() {return _size;}
+protected:
+ virtual ~CompundInMessage() { delete[] _data;}
+
private:
+ int _refs;
uint64_t _serial;
uint32_t _sub_list;
};
diff --git a/client/tunnel_channel.cpp b/client/tunnel_channel.cpp
new file mode 100644
index 00000000..1d28ee48
--- /dev/null
+++ b/client/tunnel_channel.cpp
@@ -0,0 +1,792 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+ Author:
+ yhalperi@redhat.com
+*/
+
+#include "common.h"
+#include "tunnel_channel.h"
+#include "red.h"
+
+#define SOCKET_WINDOW_SIZE 60
+#define SOCKET_TOKENS_TO_SEND 20
+
+/* classes for tunneling msgs without reallocations and memcpy */
+
+class InSocketMessage;
+class OutSocketMessage;
+
+class InSocketMessage: public ClientNetSocket::SendBuffer {
+public:
+ InSocketMessage(RedChannel::CompundInMessage& full_msg);
+
+ const uint8_t* data();
+ uint32_t size();
+ ClientNetSocket::SendBuffer* ref();
+ void unref();
+
+protected:
+ virtual ~InSocketMessage() {}
+
+private:
+ int _refs;
+ RedChannel::CompundInMessage& _full_msg;
+ RedTunnelSocketData* _sckt_msg;
+ uint32_t _buf_size;
+};
+
+InSocketMessage::InSocketMessage(RedChannel::CompundInMessage& full_msg)
+ : _refs (1)
+ , _full_msg (full_msg)
+{
+ ASSERT(full_msg.type() == RED_TUNNEL_SOCKET_DATA);
+ _full_msg.ref();
+ _sckt_msg = (RedTunnelSocketData*)(_full_msg.data());
+ _buf_size = _full_msg.size() - sizeof(RedTunnelSocketData);
+}
+
+const uint8_t* InSocketMessage::data()
+{
+ return _sckt_msg->data;
+}
+
+uint32_t InSocketMessage::size()
+{
+ return _buf_size;
+}
+
+ClientNetSocket::SendBuffer* InSocketMessage::ref()
+{
+ _full_msg.ref();
+ _refs++;
+ return this;
+}
+
+void InSocketMessage::unref()
+{
+ _full_msg.unref();
+ if (!--_refs) {
+ delete this;
+ }
+}
+
+class OutSocketMessage: public RedPeer::OutMessage,
+ public RedChannel::OutMessage,
+ public ClientNetSocket::ReceiveBuffer {
+public:
+
+ virtual RedPeer::OutMessage& peer_message() { return *this;}
+ virtual void release();
+
+ virtual uint8_t* buf();
+ virtual uint32_t buf_max_size() {return _max_data_size;}
+ virtual void set_buf_size(uint32_t size);
+ virtual void release_buf();
+
+ static void init(uint32_t max_data_size);
+ static OutSocketMessage& alloc_message();
+ static void clear_free_messages();
+
+protected:
+ OutSocketMessage();
+ virtual ~OutSocketMessage() {}
+
+private:
+ static std::list<OutSocketMessage*> _free_messages;
+ static uint32_t _max_data_size;
+};
+
+std::list<OutSocketMessage*> OutSocketMessage::_free_messages;
+uint32_t OutSocketMessage::_max_data_size;
+
+OutSocketMessage::OutSocketMessage()
+ : RedPeer::OutMessage(REDC_TUNNEL_SOCKET_DATA, sizeof(RedcTunnelSocketData) + _max_data_size)
+ , RedChannel::OutMessage()
+ , ClientNetSocket::ReceiveBuffer()
+{
+}
+
+uint8_t* OutSocketMessage::buf()
+{
+ return ((RedcTunnelSocketData*)RedPeer::OutMessage::data())->data;
+}
+
+void OutSocketMessage::set_buf_size(uint32_t size)
+{
+ RedPeer::OutMessage::header().size = size + sizeof(RedcTunnelSocketData);
+}
+
+void OutSocketMessage::release()
+{
+ OutSocketMessage::_free_messages.push_front(this);
+}
+
+void OutSocketMessage::release_buf()
+{
+ release();
+}
+
+void OutSocketMessage::init(uint32_t max_data_size)
+{
+ _max_data_size = max_data_size;
+}
+
+OutSocketMessage& OutSocketMessage::alloc_message()
+{
+ OutSocketMessage* ret;
+ if (!_free_messages.empty()) {
+ ret = _free_messages.front();
+ _free_messages.pop_front();
+ } else {
+ ret = new OutSocketMessage();
+ }
+
+ return *ret;
+}
+
+void OutSocketMessage::clear_free_messages()
+{
+ while (!_free_messages.empty()) {
+ OutSocketMessage* message = _free_messages.front();
+ _free_messages.pop_front();
+ delete message;
+ }
+}
+
+struct TunnelService {
+ uint32_t type;
+ uint32_t id;
+ uint32_t group;
+ struct in_addr ip;
+ uint32_t port;
+ std::string name;
+ std::string description;
+
+ struct in_addr virtual_ip;
+#ifdef TUNNEL_CONFIG
+ TunnelConfigConnectionIfc* service_src;
+#endif
+};
+
+class TunnelChannel::TunnelSocket: public ClientNetSocket {
+public:
+ TunnelSocket(uint16_t id, TunnelService & dst_service, EventsLoop & events_loop,
+ EventHandler & event_handler);
+ virtual ~TunnelSocket() {}
+
+ void set_num_tokens(uint32_t tokens) {_num_tokens = tokens;}
+ void set_server_num_tokens(uint32_t tokens) {_server_num_tokens = tokens;}
+ void set_guest_closed() {_guest_closed = true;}
+
+ uint32_t get_num_tokens() {return _num_tokens;}
+ uint32_t get_server_num_tokens() {return _server_num_tokens;}
+ bool get_guest_closed() {return _guest_closed;}
+
+protected:
+ virtual ReceiveBuffer& alloc_receive_buffer() {return OutSocketMessage::alloc_message();}
+
+private:
+ uint32_t _num_tokens;
+ uint32_t _server_num_tokens;
+ uint32_t _service_id;
+ bool _guest_closed;
+};
+
+TunnelChannel::TunnelSocket::TunnelSocket(uint16_t id, TunnelService& dst_service,
+ EventsLoop& events_loop,
+ ClientNetSocket::EventHandler& event_handler)
+ : ClientNetSocket(id, dst_service.ip, htons((uint16_t)dst_service.port),
+ events_loop, event_handler)
+ , _num_tokens (0)
+ , _server_num_tokens (0)
+ , _service_id (dst_service.id)
+ , _guest_closed (false)
+{
+}
+
+class TunnelHandler: public MessageHandlerImp<TunnelChannel, RED_TUNNEL_MESSAGES_END> {
+public:
+ TunnelHandler(TunnelChannel& channel)
+ : MessageHandlerImp<TunnelChannel, RED_TUNNEL_MESSAGES_END>(channel) {}
+};
+
+TunnelChannel::TunnelChannel(RedClient& client, uint32_t id)
+ : RedChannel(client, RED_CHANNEL_TUNNEL, id, new TunnelHandler(*this))
+ , _max_socket_data_size(0)
+ , _service_id(0)
+ , _service_group(0)
+{
+ TunnelHandler* handler = static_cast<TunnelHandler*>(get_message_handler());
+
+ handler->set_handler(RED_MIGRATE, &TunnelChannel::handle_migrate, 0);
+ handler->set_handler(RED_SET_ACK, &TunnelChannel::handle_set_ack, sizeof(RedSetAck));
+ handler->set_handler(RED_PING, &TunnelChannel::handle_ping, sizeof(RedPing));
+ handler->set_handler(RED_WAIT_FOR_CHANNELS, &TunnelChannel::handle_wait_for_channels,
+ sizeof(RedWaitForChannels));
+
+ handler->set_handler(RED_TUNNEL_INIT,
+ &TunnelChannel::handle_init, sizeof(RedTunnelInit));
+ handler->set_handler(RED_TUNNEL_SERVICE_IP_MAP,
+ &TunnelChannel::handle_service_ip_map, sizeof(RedTunnelServiceIpMap));
+ handler->set_handler(RED_TUNNEL_SOCKET_OPEN,
+ &TunnelChannel::handle_socket_open, sizeof(RedTunnelSocketOpen));
+ handler->set_handler(RED_TUNNEL_SOCKET_CLOSE,
+ &TunnelChannel::handle_socket_close, sizeof(RedTunnelSocketClose));
+ handler->set_handler(RED_TUNNEL_SOCKET_FIN,
+ &TunnelChannel::handle_socket_fin, sizeof(RedTunnelSocketFin));
+ handler->set_handler(RED_TUNNEL_SOCKET_TOKEN,
+ &TunnelChannel::handle_socket_token, sizeof(RedTunnelSocketTokens));
+ handler->set_handler(RED_TUNNEL_SOCKET_CLOSED_ACK,
+ &TunnelChannel::handle_socket_closed_ack,
+ sizeof(RedTunnelSocketClosedAck));
+ handler->set_handler(RED_TUNNEL_SOCKET_DATA,
+ &TunnelChannel::handle_socket_data, sizeof(RedTunnelSocketData));
+#ifdef TUNNEL_CONFIG
+ _config_listener = new TunnelConfigListenerIfc(*this);
+#endif
+}
+
+TunnelChannel::~TunnelChannel()
+{
+ destroy_sockets();
+ OutSocketMessage::clear_free_messages();
+#ifdef TUNNEL_CONFIG
+ delete _config_listener;
+#endif
+}
+
+void TunnelChannel::handle_init(RedPeer::InMessage* message)
+{
+ RedTunnelInit* init_msg = (RedTunnelInit*)message->data();
+ _max_socket_data_size = init_msg->max_socket_data_size;
+ OutSocketMessage::init(_max_socket_data_size);
+ _sockets.resize(init_msg->max_num_of_sockets);
+}
+
+void TunnelChannel::send_service(TunnelService& service)
+{
+ int msg_size = 0;
+ msg_size += service.name.length() + 1;
+ msg_size += service.description.length() + 1;
+
+ if (service.type == RED_TUNNEL_SERVICE_TYPE_IPP) {
+ msg_size += sizeof(RedcTunnelAddPrintService) + sizeof(RedTunnelIPv4);
+ } else if (service.type == RED_TUNNEL_SERVICE_TYPE_GENERIC) {
+ msg_size += sizeof(RedcTunnelAddGenericService);
+ } else {
+ THROW("%s: invalid service type", __FUNCTION__);
+ }
+ Message* service_msg = new Message(REDC_TUNNEL_SERVICE_ADD, msg_size);
+ RedcTunnelAddGenericService* out_service = (RedcTunnelAddGenericService*)service_msg->data();
+ out_service->id = service.id;
+ out_service->group = service.group;
+ out_service->type = service.type;
+ out_service->port = service.port;
+
+ int cur_offset;
+ if (service.type == RED_TUNNEL_SERVICE_TYPE_IPP) {
+ cur_offset = sizeof(RedcTunnelAddPrintService);
+ ((RedcTunnelAddPrintService*)out_service)->ip.type = RED_TUNNEL_IP_TYPE_IPv4;
+ memcpy(((RedcTunnelAddPrintService*)out_service)->ip.data, &(service.ip.s_addr),
+ sizeof(RedTunnelIPv4));
+ cur_offset += sizeof(RedTunnelIPv4);
+ } else {
+ cur_offset = sizeof(RedcTunnelAddGenericService);
+ }
+
+ out_service->name = cur_offset;
+ service.name.copy((char*)(service_msg->data() + cur_offset), service.name.length());
+ (service_msg->data() + cur_offset)[service.name.length()] = '\0';
+ cur_offset += service.name.length() + 1;
+
+ out_service->description = cur_offset;
+ service.description.copy((char*)(service_msg->data() + cur_offset),
+ service.description.length());
+ (service_msg->data() + cur_offset)[service.description.length()] = '\0';
+ cur_offset += service.description.length() + 1;
+
+ post_message(service_msg);
+}
+
+void TunnelChannel::handle_service_ip_map(RedPeer::InMessage* message)
+{
+ RedTunnelServiceIpMap* service_ip_msg = (RedTunnelServiceIpMap*)message->data();
+ TunnelService* service = find_service(service_ip_msg->service_id);
+ if (!service) {
+ THROW("%s: attempt to map non-existing service id=%d", __FUNCTION__,
+ service_ip_msg->service_id);
+ }
+
+ if (service_ip_msg->virtual_ip.type == RED_TUNNEL_IP_TYPE_IPv4) {
+ memcpy(&service->virtual_ip.s_addr, service_ip_msg->virtual_ip.data,
+ sizeof(RedTunnelIPv4));
+ } else {
+ THROW("unexpected ip type %d", service_ip_msg->virtual_ip.type);
+ }
+ DBG(0, "service_id=%d (%s), virtual_ip=%s", service->id, service->name.c_str(),
+ inet_ntoa(service->virtual_ip));
+#ifdef TUNNEL_CONFIG
+ service->service_src->send_virtual_ip(service->virtual_ip);
+#endif
+}
+
+void TunnelChannel::handle_socket_open(RedPeer::InMessage* message)
+{
+ RedTunnelSocketOpen* open_msg = (RedTunnelSocketOpen*)message->data();
+ TunnelSocket* sckt;
+ Message* out_msg;
+
+ if (_sockets[open_msg->connection_id]) {
+ THROW("%s: attempt to open an already opened connection id=%d", __FUNCTION__,
+ open_msg->connection_id);
+ }
+
+ TunnelService* service = find_service(open_msg->service_id);
+ if (!service) {
+ THROW("%s: attempt to access non-existing service id=%d", __FUNCTION__,
+ open_msg->service_id);
+ }
+
+ sckt = new TunnelSocket(open_msg->connection_id, *service, get_events_loop(), *this);
+
+ if (sckt->connect(open_msg->tokens)) {
+ _sockets[open_msg->connection_id] = sckt;
+ out_msg = new Message(REDC_TUNNEL_SOCKET_OPEN_ACK, sizeof(RedcTunnelSocketOpenAck));
+ sckt->set_num_tokens(0);
+ sckt->set_server_num_tokens(SOCKET_WINDOW_SIZE);
+
+ ((RedcTunnelSocketOpenAck*)out_msg->data())->connection_id = open_msg->connection_id;
+ ((RedcTunnelSocketOpenAck*)out_msg->data())->tokens = SOCKET_WINDOW_SIZE;
+ } else {
+ out_msg = new Message(REDC_TUNNEL_SOCKET_OPEN_NACK, sizeof(RedcTunnelSocketOpenNack));
+ ((RedcTunnelSocketOpenNack*)out_msg->data())->connection_id = open_msg->connection_id;
+ delete sckt;
+ }
+
+ post_message(out_msg);
+}
+
+void TunnelChannel::handle_socket_fin(RedPeer::InMessage* message)
+{
+ RedTunnelSocketFin* fin_msg = (RedTunnelSocketFin*)message->data();
+ TunnelSocket* sckt = _sockets[fin_msg->connection_id];
+
+ if (!sckt) {
+ THROW("%s: fin connection that doesn't exist id=%d", __FUNCTION__, fin_msg->connection_id);
+ }
+
+ DBG(0, "guest fin connection_id=%d", fin_msg->connection_id);
+ if (sckt->is_connected()) {
+ sckt->push_fin();
+ }
+}
+
+void TunnelChannel::handle_socket_close(RedPeer::InMessage* message)
+{
+ RedTunnelSocketClose* close_msg = (RedTunnelSocketClose*)message->data();
+ TunnelSocket* sckt = _sockets[close_msg->connection_id];
+
+ if (!sckt) {
+ THROW("%s: closing connection that doesn't exist id=%d", __FUNCTION__,
+ close_msg->connection_id);
+ }
+ DBG(0, "guest closed connection_id=%d", close_msg->connection_id);
+
+ sckt->set_guest_closed();
+
+ if (sckt->is_connected()) {
+ sckt->push_disconnect();
+ } else {
+        // close happened on the server side before it received the client
+        // close msg. we should ack the server and free the socket
+ on_socket_disconnect(*sckt);
+ }
+}
+
+void TunnelChannel::handle_socket_closed_ack(RedPeer::InMessage* message)
+{
+ RedTunnelSocketClosedAck* close_ack_msg = (RedTunnelSocketClosedAck*)message->data();
+ TunnelSocket* sckt = _sockets[close_ack_msg->connection_id];
+ if (!sckt) {
+ THROW("%s: close ack to connection that doesn't exist id=%d", __FUNCTION__,
+ close_ack_msg->connection_id);
+ }
+
+ if (sckt->is_connected()) {
+ THROW("%s: close ack to connection that is not closed id=%d",
+ __FUNCTION__, close_ack_msg->connection_id);
+ }
+ _sockets[sckt->id()] = NULL;
+ DBG(0, "guest Acked closed connection_id=%d", close_ack_msg->connection_id);
+ delete sckt;
+}
+
+void TunnelChannel::handle_socket_data(RedPeer::InMessage* message)
+{
+ RedTunnelSocketData* send_msg = (RedTunnelSocketData*)message->data();
+ TunnelSocket* sckt = _sockets[send_msg->connection_id];
+
+ if (!sckt) {
+ THROW("%s: sending data to connection that doesn't exist id=%d", __FUNCTION__,
+ send_msg->connection_id);
+ }
+
+ if (!sckt->get_server_num_tokens()) {
+ THROW("%s: token violation connectio_id=%d", __FUNCTION__, sckt->id());
+ }
+
+ sckt->set_server_num_tokens(sckt->get_server_num_tokens() - 1);
+
+ if (!sckt->is_connected()) {
+ // server hasn't handled the close msg yet
+ return;
+ }
+
+ InSocketMessage* sckt_msg = new InSocketMessage(*(
+ static_cast<RedChannel::CompundInMessage*>(message)));
+ if (sckt_msg->size() > _max_socket_data_size) {
+ THROW("%s: socket data exceeds size limit %d > %d connection_id=%d", __FUNCTION__,
+ sckt_msg->size(), _max_socket_data_size, sckt->id());
+ }
+ sckt->push_send(*sckt_msg);
+ sckt_msg->unref();
+}
+
+void TunnelChannel::handle_socket_token(RedPeer::InMessage* message)
+{
+ RedTunnelSocketTokens* token_msg = (RedTunnelSocketTokens*)message->data();
+ TunnelSocket* sckt = _sockets[token_msg->connection_id];
+
+ if (!sckt) {
+ THROW("%s: ack connection that doesn't exist id=%d", __FUNCTION__,
+ token_msg->connection_id);
+ }
+ if (!sckt->is_connected()) {
+ return;
+ }
+ sckt->add_recv_tokens(token_msg->num_tokens);
+}
+
+void TunnelChannel::on_socket_message_recv_done(ClientNetSocket& sckt,
+ ClientNetSocket::ReceiveBuffer& buf)
+{
+ TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt);
+ OutSocketMessage* out_msg = static_cast<OutSocketMessage*>(&buf);
+
+ ((RedcTunnelSocketData*)(out_msg->data()))->connection_id = tunnel_sckt->id();
+ post_message(out_msg);
+}
+
+void TunnelChannel::on_socket_fin_recv(ClientNetSocket& sckt)
+{
+ TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt);
+ Message* out_msg = new Message(REDC_TUNNEL_SOCKET_FIN, sizeof(RedcTunnelSocketFin));
+ DBG(0, "FIN from client coonection id=%d", tunnel_sckt->id());
+ ((RedcTunnelSocketFin*)out_msg->data())->connection_id = tunnel_sckt->id();
+ post_message(out_msg);
+}
+
+void TunnelChannel::on_socket_disconnect(ClientNetSocket& sckt)
+{
+ TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt);
+ Message* out_msg;
+    // close initiated by server -> needs ack
+ if (tunnel_sckt->get_guest_closed()) {
+ DBG(0, "send close ack connection_id=%d", tunnel_sckt->id());
+ out_msg = new Message(REDC_TUNNEL_SOCKET_CLOSED_ACK, sizeof(RedcTunnelSocketClosedAck));
+ ((RedcTunnelSocketClosedAck*)out_msg->data())->connection_id = tunnel_sckt->id();
+ _sockets[tunnel_sckt->id()] = NULL;
+ delete &sckt;
+ } else { // close initiated by client
+ DBG(0, "send close coonection_id=%d", tunnel_sckt->id());
+ out_msg = new Message(REDC_TUNNEL_SOCKET_CLOSED, sizeof(RedcTunnelSocketClosed));
+ ((RedcTunnelSocketClosed*)out_msg->data())->connection_id = tunnel_sckt->id();
+ }
+
+ post_message(out_msg);
+}
+
+void TunnelChannel::on_socket_message_send_done(ClientNetSocket& sckt)
+{
+ TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt);
+ uint32_t num_tokens = tunnel_sckt->get_num_tokens();
+ num_tokens++;
+
+ if (num_tokens == SOCKET_TOKENS_TO_SEND) {
+ Message* out_msg = new Message(REDC_TUNNEL_SOCKET_TOKEN, sizeof(RedcTunnelSocketTokens));
+ RedcTunnelSocketTokens* tokens_msg = (RedcTunnelSocketTokens*)out_msg->data();
+ tokens_msg->connection_id = tunnel_sckt->id();
+ tokens_msg->num_tokens = num_tokens;
+ post_message(out_msg);
+
+ tunnel_sckt->set_num_tokens(0);
+ tunnel_sckt->set_server_num_tokens(tunnel_sckt->get_server_num_tokens() + num_tokens);
+
+ ASSERT(tunnel_sckt->get_server_num_tokens() <= SOCKET_WINDOW_SIZE);
+ } else {
+ tunnel_sckt->set_num_tokens(num_tokens);
+ }
+}
+
+TunnelService* TunnelChannel::find_service(uint32_t id)
+{
+ for (std::list<TunnelService*>::iterator iter = _services.begin();
+ iter != _services.end(); iter++) {
+ if ((*iter)->id == id) {
+ return *iter;
+ }
+ }
+ return NULL;
+}
+
+/* returns the first service with the same ip */
+TunnelService* TunnelChannel::find_service(struct in_addr& ip)
+{
+ for (std::list<TunnelService*>::iterator iter = _services.begin();
+ iter != _services.end(); iter++) {
+ if ((*iter)->ip.s_addr == ip.s_addr) {
+ return *iter;
+ }
+ }
+ return NULL;
+}
+
+TunnelService* TunnelChannel::find_service(struct in_addr& ip, uint32_t port)
+{
+ for (std::list<TunnelService*>::iterator iter = _services.begin();
+ iter != _services.end(); iter++) {
+ if (((*iter)->ip.s_addr == ip.s_addr) && ((*iter)->port == port)) {
+ return *iter;
+ }
+ }
+ return NULL;
+}
+
+void TunnelChannel::destroy_sockets()
+{
+ for (unsigned int i = 0; i < _sockets.size(); i++) {
+ if (_sockets[i]) {
+ delete _sockets[i];
+ _sockets[i] = NULL;
+ }
+ }
+}
+
+void TunnelChannel::on_disconnect()
+{
+ destroy_sockets();
+ OutSocketMessage::clear_free_messages();
+}
+
+#ifdef TUNNEL_CONFIG
+void TunnelChannel::add_service(TunnelConfigConnectionIfc& source,
+ uint32_t type, struct in_addr& ip, uint32_t port,
+ std::string& name, std::string& description)
+{
+ if (find_service(ip, port)) {
+ LOG_WARN("service ip=%s port=%d was already added",
+ inet_ntoa(ip), port);
+ return;
+ }
+ TunnelService* new_service = new TunnelService;
+ TunnelService* service_group = find_service(ip);
+ new_service->type = type;
+ new_service->id = _service_id++;
+ if (service_group) {
+ if (name != service_group->name) {
+ LOG_WARN("service ip=%s port=%d was not added because of inconsistent name for ip",
+ inet_ntoa(ip), port);
+ delete new_service;
+ return;
+ }
+ new_service->group = service_group->group;
+ } else {
+ new_service->group = _service_group++;
+ }
+ new_service->ip.s_addr = ip.s_addr;
+ new_service->port = port;
+ new_service->name = name;
+ new_service->description = description;
+ new_service->service_src = &source;
+ _services.push_back(new_service);
+ send_service(*new_service);
+}
+
+#endif
+
+class TunnelFactory: public ChannelFactory {
+public:
+ TunnelFactory() : ChannelFactory(RED_CHANNEL_TUNNEL) {}
+ virtual RedChannel* construct(RedClient& client, uint32_t id)
+ {
+ return new TunnelChannel(client, id);
+ }
+};
+
+static TunnelFactory factory;
+
+ChannelFactory& TunnelChannel::Factory()
+{
+ return factory;
+}
+
+#ifdef TUNNEL_CONFIG
+TunnelConfigListenerIfc::TunnelConfigListenerIfc(TunnelChannel& tunnel)
+ : _tunnel(tunnel)
+{
+ _listener_ref = NamedPipe::create(TUNNEL_CONFIG_PIPE_NAME, *this);
+}
+
+TunnelConfigListenerIfc::~TunnelConfigListenerIfc()
+{
+ for (std::list<TunnelConfigConnectionIfc*>::iterator it = _connections.begin();
+ it != _connections.end(); ++it) {
+ if ((*it)->get_ref() != NamedPipe::INVALID_CONNECTION) {
+ NamedPipe::destroy_connection((*it)->get_ref());
+ }
+ delete (*it);
+ }
+
+ NamedPipe::destroy(_listener_ref);
+}
+
+NamedPipe::ConnectionInterface& TunnelConfigListenerIfc::create()
+{
+ DBG(0, "new_connection");
+ TunnelConfigConnectionIfc* new_conn = new TunnelConfigConnectionIfc(_tunnel, *this);
+ _connections.push_back(new_conn);
+ return *new_conn;
+}
+
+void TunnelConfigListenerIfc::destroy_connection(TunnelConfigConnectionIfc* conn)
+{
+ if (conn->get_ref() != NamedPipe::INVALID_CONNECTION) {
+ NamedPipe::destroy_connection(conn->get_ref());
+ }
+ _connections.remove(conn);
+ delete conn;
+}
+
+TunnelConfigConnectionIfc::TunnelConfigConnectionIfc(TunnelChannel& tunnel,
+ TunnelConfigListenerIfc& listener)
+ : _tunnel(tunnel)
+ , _listener(listener)
+ , _in_msg_len(0)
+ , _out_msg("")
+ , _out_msg_pos(0)
+{
+}
+
+void TunnelConfigConnectionIfc::bind(NamedPipe::ConnectionRef conn_ref)
+{
+ _opaque = conn_ref;
+ on_data();
+}
+
+void TunnelConfigConnectionIfc::on_data()
+{
+ if (!_out_msg.empty()) {
+ int ret = NamedPipe::write(_opaque, (uint8_t*)_out_msg.c_str() + _out_msg_pos,
+ _out_msg.length() - _out_msg_pos);
+ if (ret == -1) {
+ _listener.destroy_connection(this);
+ return;
+ }
+ _out_msg_pos += ret;
+ if (_out_msg_pos == _out_msg.length()) {
+ _out_msg = "";
+ _out_msg_pos = 0;
+ }
+ } else {
+ int ret = NamedPipe::read(_opaque, (uint8_t*)_in_msg + _in_msg_len,
+ TUNNEL_CONFIG_MAX_MSG_LEN - _in_msg_len);
+
+ if (ret == -1) {
+ _listener.destroy_connection(this);
+ return;
+ }
+ _in_msg_len += ret;
+
+ if (_in_msg[_in_msg_len - 1] != '\n') {
+ return;
+ }
+ handle_msg();
+ _in_msg_len = 0;
+ }
+}
+
+void TunnelConfigConnectionIfc::send_virtual_ip(struct in_addr& ip)
+{
+ _out_msg = inet_ntoa(ip);
+ _out_msg += "\n";
+ _out_msg_pos = 0;
+ on_data();
+}
+
+void TunnelConfigConnectionIfc::handle_msg()
+{
+ std::string space = " \t";
+ _in_msg[_in_msg_len - 1] = '\0';
+ std::string msg(_in_msg);
+
+ uint32_t service_type;
+ struct in_addr ip;
+ uint32_t port;
+ std::string name;
+ std::string desc;
+
+ DBG(0, "msg=%s", _in_msg);
+ size_t start_token = 0;
+ size_t end_token;
+
+ start_token = msg.find_first_not_of(space);
+ end_token = msg.find_first_of(space, start_token);
+
+ if ((end_token - start_token) != 1) {
+ THROW("unexpected service type length");
+ }
+ if (msg[start_token] == '0') {
+ service_type = RED_TUNNEL_SERVICE_TYPE_GENERIC;
+ } else if (msg[start_token] == '1') {
+ service_type = RED_TUNNEL_SERVICE_TYPE_IPP;
+ } else {
+ THROW("unexpected service type");
+ }
+
+ start_token = msg.find_first_not_of(space, end_token);
+ end_token = msg.find_first_of(space, start_token);
+
+ inet_aton(msg.substr(start_token, end_token - start_token).c_str(), &ip);
+
+ start_token = msg.find_first_not_of(space, end_token);
+ end_token = msg.find_first_of(space, start_token);
+
+ port = atoi(msg.substr(start_token, end_token - start_token).c_str());
+
+ start_token = msg.find_first_not_of(space, end_token);
+ end_token = msg.find_first_of(space, start_token);
+
+ name = msg.substr(start_token, end_token - start_token);
+
+ start_token = msg.find_first_not_of(space, end_token);
+ desc = msg.substr(start_token);
+
+ _tunnel.add_service(*this, service_type, ip, port, name, desc);
+}
+
+#endif
diff --git a/client/tunnel_channel.h b/client/tunnel_channel.h
new file mode 100644
index 00000000..4fd3465c
--- /dev/null
+++ b/client/tunnel_channel.h
@@ -0,0 +1,138 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+ Author:
+ yhalperi@redhat.com
+*/
+
+#ifndef _H_TUNNEL_CHANNEL
+#define _H_TUNNEL_CHANNEL
+
+#include "common.h"
+#include "red_channel.h"
+#include "red_client.h"
+#include "client_net_socket.h"
+#include "platform.h"
+
+#define TUNNEL_CONFIG
+
+#ifdef TUNNEL_CONFIG
+class TunnelConfigConnectionIfc;
+class TunnelConfigListenerIfc;
+#endif
+
+/* channel for tunneling tcp from guest to client network */
+typedef struct TunnelService TunnelService;
+class TunnelChannel: public RedChannel,
+ public ClientNetSocket::EventHandler {
+public:
+
+ TunnelChannel(RedClient& client, uint32_t id);
+ virtual ~TunnelChannel();
+
+ virtual void on_socket_message_recv_done(ClientNetSocket& sckt,
+ ClientNetSocket::ReceiveBuffer& buf);
+ virtual void on_socket_message_send_done(ClientNetSocket& sckt);
+ virtual void on_socket_fin_recv(ClientNetSocket& sckt);
+ virtual void on_socket_disconnect(ClientNetSocket& sckt);
+
+#ifdef TUNNEL_CONFIG
+ void add_service(TunnelConfigConnectionIfc& source,
+ uint32_t type, struct in_addr& ip, uint32_t port,
+ std::string& name, std::string& description);
+#endif
+ static ChannelFactory& Factory();
+
+protected:
+ class TunnelSocket;
+
+ virtual void on_disconnect();
+
+private:
+ void handle_init(RedPeer::InMessage* message);
+ void handle_service_ip_map(RedPeer::InMessage* message);
+
+ void handle_socket_open(RedPeer::InMessage* message);
+ void handle_socket_fin(RedPeer::InMessage* message);
+ void handle_socket_close(RedPeer::InMessage* message);
+ void handle_socket_closed_ack(RedPeer::InMessage* message);
+ void handle_socket_data(RedPeer::InMessage* message);
+ void handle_socket_token(RedPeer::InMessage* message);
+
+ TunnelService* find_service(uint32_t id);
+ TunnelService* find_service(struct in_addr& ip);
+ TunnelService* find_service(struct in_addr& ip, uint32_t port);
+
+ void send_service(TunnelService& service);
+ void destroy_sockets();
+
+private:
+ std::vector<TunnelSocket*> _sockets;
+ std::list<TunnelService*> _services;
+ uint32_t _max_socket_data_size;
+ uint32_t _service_id;
+ uint32_t _service_group;
+#ifdef TUNNEL_CONFIG
+ TunnelConfigListenerIfc* _config_listener;
+#endif
+};
+
+#ifdef TUNNEL_CONFIG
+#ifdef _WIN32
+#define TUNNEL_CONFIG_PIPE_NAME "tunnel-config.pipe"
+#else
+#define TUNNEL_CONFIG_PIPE_NAME "/tmp/tunnel-config.pipe"
+#endif
+
+class TunnelConfigConnectionIfc;
+
+class TunnelConfigListenerIfc: public NamedPipe::ListenerInterface {
+public:
+ TunnelConfigListenerIfc(TunnelChannel& tunnel);
+ virtual ~TunnelConfigListenerIfc();
+ virtual NamedPipe::ConnectionInterface& create();
+ virtual void destroy_connection(TunnelConfigConnectionIfc* conn);
+
+private:
+ TunnelChannel& _tunnel;
+ NamedPipe::ListenerRef _listener_ref;
+ std::list<TunnelConfigConnectionIfc*> _connections;
+};
+
+#define TUNNEL_CONFIG_MAX_MSG_LEN 2048
+class TunnelConfigConnectionIfc: public NamedPipe::ConnectionInterface {
+public:
+ TunnelConfigConnectionIfc(TunnelChannel& tunnel,
+ TunnelConfigListenerIfc& listener);
+ virtual void bind(NamedPipe::ConnectionRef conn_ref);
+ virtual void on_data();
+ void send_virtual_ip(struct in_addr& ip);
+ NamedPipe::ConnectionRef get_ref() {return _opaque;}
+ void handle_msg();
+
+private:
+ TunnelChannel& _tunnel;
+ TunnelConfigListenerIfc& _listener;
+ char _in_msg[TUNNEL_CONFIG_MAX_MSG_LEN]; // <service_type> <ip> <port> <name> <desc>\n
+ int _in_msg_len;
+
+ std::string _out_msg; // <virtual ip>\n
+ int _out_msg_pos;
+};
+#endif
+
+#endif
diff --git a/client/windows/platform_utils.cpp b/client/windows/platform_utils.cpp
index 6b9049a0..9b809c87 100644
--- a/client/windows/platform_utils.cpp
+++ b/client/windows/platform_utils.cpp
@@ -90,7 +90,7 @@ HBITMAP get_alpha_bitmap_res(int id)
AutoDC auto_dc(create_compatible_dc());
BITMAPINFO dest_info;
- uint8_t *dest;
+ uint8_t* dest;
dest_info.bmiHeader.biSize = sizeof(dest_info.bmiHeader);
dest_info.bmiHeader.biWidth = src_info.bmWidth;
dest_info.bmiHeader.biHeight = -src_info.bmHeight;
@@ -102,7 +102,7 @@ HBITMAP get_alpha_bitmap_res(int id)
dest_info.bmiHeader.biClrUsed = 0;
dest_info.bmiHeader.biClrImportant = 0;
- HBITMAP ret = CreateDIBSection(auto_dc.get(), &dest_info, 0, (VOID **)&dest, NULL, 0);
+ HBITMAP ret = CreateDIBSection(auto_dc.get(), &dest_info, 0, (VOID**)&dest, NULL, 0);
if (!ret) {
THROW("create bitmap failed, %u", GetLastError());
}
@@ -139,7 +139,7 @@ const char* sys_err_to_str(int error)
msg = new char[BUF_SIZE];
_snprintf(msg, BUF_SIZE, "errno %d", error);
} else {
- char *new_line;
+ char* new_line;
if ((new_line = strrchr(msg, '\r'))) {
*new_line = 0;
}
@@ -149,3 +149,14 @@ const char* sys_err_to_str(int error)
return errors_map[error];
}
+int inet_aton(const char* ip, struct in_addr* in_addr)
+{
+ unsigned long addr = inet_addr(ip);
+
+ if (addr == INADDR_NONE) {
+ return 0;
+ }
+ in_addr->S_un.S_addr = addr;
+ return 1;
+}
+
diff --git a/client/windows/platform_utils.h b/client/windows/platform_utils.h
index 95b04fcb..299f70b1 100644
--- a/client/windows/platform_utils.h
+++ b/client/windows/platform_utils.h
@@ -18,13 +18,17 @@
#ifndef _H_PLATFORM_UTILS
#define _H_PLATFORM_UTILS
+#include <winsock.h>
+#include <winsock2.h>
+#include <ws2tcpip.h>
+
#define mb() __asm {lock add [esp], 0}
template<class T, class FreeRes = FreeObject<T>, T invalid = NULL >
class AutoRes {
public:
- AutoRes() : res (invalid) {}
- AutoRes(T inRes) : res (inRes) {}
+ AutoRes() : res(invalid) {}
+ AutoRes(T inRes) : res(inRes) {}
~AutoRes() { set(invalid); }
void set(T inRes) {if (res != invalid) free_res(res); res = inRes; }
@@ -67,7 +71,7 @@ HBITMAP get_alpha_bitmap_res(int id);
class WindowDC {
public:
- WindowDC(HWND window) : _window (window), _dc (GetDC(window)) {}
+ WindowDC(HWND window): _window (window), _dc (GetDC(window)) {}
~WindowDC() { ReleaseDC(_window, _dc);}
HDC operator * () { return _dc;}
@@ -80,5 +84,17 @@ typedef AutoRes<HDC, Delete_DC> AutoReleaseDC;
const char* sys_err_to_str(int error);
+#define SHUT_WR SD_SEND
+#define SHUT_RD SD_RECEIVE
+#define SHUT_RDWR SD_BOTH
+#define MSG_NOSIGNAL 0
+
+#define SHUTDOWN_ERR WSAESHUTDOWN
+#define INTERRUPTED_ERR WSAEINTR
+#define WOULDBLOCK_ERR WSAEWOULDBLOCK
+#define sock_error() WSAGetLastError()
+#define sock_err_message(err) sys_err_to_str(err)
+int inet_aton(const char* ip, struct in_addr* in_addr);
+
#endif
diff --git a/client/windows/redc.vcproj b/client/windows/redc.vcproj
index 25ff3259..90d235ca 100644
--- a/client/windows/redc.vcproj
+++ b/client/windows/redc.vcproj
@@ -205,6 +205,10 @@
>
</File>
<File
+ RelativePath="..\client_net_socket.cpp"
+ >
+ </File>
+ <File
RelativePath="..\cmd_line_parser.cpp"
>
</File>
@@ -379,6 +383,10 @@
>
</File>
<File
+ RelativePath="..\tunnel_channel.cpp"
+ >
+ </File>
+ <File
RelativePath="..\utils.cpp"
>
</File>
@@ -405,6 +413,10 @@
>
</File>
<File
+ RelativePath="..\client_net_socket.h"
+ >
+ </File>
+ <File
RelativePath="..\common.h"
>
</File>
@@ -557,6 +569,10 @@
>
</File>
<File
+ RelativePath="..\tunnel_channel.h"
+ >
+ </File>
+ <File
RelativePath="..\utils.h"
>
</File>
diff --git a/client/x11/Makefile.am b/client/x11/Makefile.am
index 2bb1c853..02ee8717 100644
--- a/client/x11/Makefile.am
+++ b/client/x11/Makefile.am
@@ -39,6 +39,8 @@ RED_COMMON_SRCS = \
$(top_srcdir)/client/red_cairo_canvas.h \
$(top_srcdir)/client/cmd_line_parser.cpp \
$(top_srcdir)/client/cmd_line_parser.h \
+ $(top_srcdir)/client/client_net_socket.cpp \
+ $(top_srcdir)/client/client_net_socket.h \
$(top_srcdir)/client/common.h \
$(top_srcdir)/client/cursor_channel.cpp \
$(top_srcdir)/client/cursor_channel.h \
@@ -94,9 +96,11 @@ RED_COMMON_SRCS = \
$(top_srcdir)/client/hot_keys.cpp \
$(top_srcdir)/client/hot_keys.h \
$(top_srcdir)/client/threads.cpp \
+ $(top_srcdir)/client/tunnel_channel.cpp \
+ $(top_srcdir)/client/tunnel_channel.h \
$(top_srcdir)/client/utils.cpp \
$(top_srcdir)/client/utils.h \
- $(top_srcdir)/client/icon.h \
+ $(top_srcdir)/client/icon.h \
$(NULL)
bin_PROGRAMS = spicec
diff --git a/client/x11/platform_utils.h b/client/x11/platform_utils.h
index 763a70e5..1e86f72e 100644
--- a/client/x11/platform_utils.h
+++ b/client/x11/platform_utils.h
@@ -18,11 +18,28 @@
#ifndef _H_PLATFORM_UTILS
#define _H_PLATFORM_UTILS
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+
#ifdef __i386__
-#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%esp)": : : "memory")
+#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%esp)" : : : "memory")
#else
-#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%rsp)": : : "memory")
+#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%rsp)" : : : "memory")
#endif
+typedef int SOCKET;
+
+#define INVALID_SOCKET -1
+#define SOCKET_ERROR -1
+#define closesocket(sock) ::close(sock)
+#define SHUTDOWN_ERR EPIPE
+#define INTERRUPTED_ERR EINTR
+#define WOULDBLOCK_ERR EAGAIN
+#define sock_error() errno
+#define sock_err_message(err) strerror(err)
+
#endif
diff --git a/common/red.h b/common/red.h
index 4b0a3642..dfd85126 100644
--- a/common/red.h
+++ b/common/red.h
@@ -45,8 +45,8 @@
#endif
#define RED_MAGIC (*(uint32_t*)"REDQ")
-#define RED_VERSION_MAJOR 1
-#define RED_VERSION_MINOR 0
+#define RED_VERSION_MAJOR (~(uint32_t)0 - 1)
+#define RED_VERSION_MINOR 1
// Encryption & Ticketing Parameters
#define RED_MAX_PASSWORD_LENGTH 60
@@ -60,6 +60,7 @@ enum {
RED_CHANNEL_CURSOR,
RED_CHANNEL_PLAYBACK,
RED_CHANNEL_RECORD,
+ RED_CHANNEL_TUNNEL,
RED_CHANNEL_END
};
@@ -675,6 +676,143 @@ typedef struct ATTR_PACKED RedcRecordStartMark {
uint32_t time;
} RedcRecordStartMark;
+enum {
+ RED_TUNNEL_SERVICE_TYPE_INVALID,
+ RED_TUNNEL_SERVICE_TYPE_GENERIC,
+ RED_TUNNEL_SERVICE_TYPE_IPP,
+};
+
+enum {
+ RED_TUNNEL_INIT = RED_FIRST_AVAIL_MESSAGE,
+ RED_TUNNEL_SERVICE_IP_MAP,
+ RED_TUNNEL_SOCKET_OPEN,
+ RED_TUNNEL_SOCKET_FIN,
+ RED_TUNNEL_SOCKET_CLOSE,
+ RED_TUNNEL_SOCKET_DATA,
+ RED_TUNNEL_SOCKET_CLOSED_ACK,
+ RED_TUNNEL_SOCKET_TOKEN,
+
+ RED_TUNNEL_MESSAGES_END,
+};
+
+typedef struct ATTR_PACKED RedTunnelInit {
+ uint16_t max_num_of_sockets;
+ uint32_t max_socket_data_size;
+} RedTunnelInit;
+
+enum {
+ RED_TUNNEL_IP_TYPE_INVALID,
+ RED_TUNNEL_IP_TYPE_IPv4,
+};
+
+typedef struct ATTR_PACKED RedTunnelIpInfo {
+ uint16_t type;
+ uint8_t data[0];
+} RedTunnelIpInfo;
+
+typedef uint8_t RedTunnelIPv4[4];
+
+typedef struct ATTR_PACKED RedTunnelServiceIpMap {
+ uint32_t service_id;
+ RedTunnelIpInfo virtual_ip;
+} RedTunnelServiceIpMap;
+
+typedef struct ATTR_PACKED RedTunnelSocketOpen {
+ uint16_t connection_id;
+ uint32_t service_id;
+ uint32_t tokens;
+} RedTunnelSocketOpen;
+
+/* connection id must be the first field in msgs directed to a specific connection */
+
+typedef struct ATTR_PACKED RedTunnelSocketFin {
+ uint16_t connection_id;
+} RedTunnelSocketFin;
+
+typedef struct ATTR_PACKED RedTunnelSocketClose {
+ uint16_t connection_id;
+} RedTunnelSocketClose;
+
+typedef struct ATTR_PACKED RedTunnelSocketData {
+ uint16_t connection_id;
+ uint8_t data[0];
+} RedTunnelSocketData;
+
+typedef struct ATTR_PACKED RedTunnelSocketTokens {
+ uint16_t connection_id;
+ uint32_t num_tokens;
+} RedTunnelSocketTokens;
+
+typedef struct ATTR_PACKED RedTunnelSocketClosedAck {
+ uint16_t connection_id;
+} RedTunnelSocketClosedAck;
+
+enum {
+ REDC_TUNNEL_SERVICE_ADD = REDC_FIRST_AVAIL_MESSAGE,
+ REDC_TUNNEL_SERVICE_REMOVE,
+ REDC_TUNNEL_SOCKET_OPEN_ACK,
+ REDC_TUNNEL_SOCKET_OPEN_NACK,
+ REDC_TUNNEL_SOCKET_FIN,
+ REDC_TUNNEL_SOCKET_CLOSED,
+ REDC_TUNNEL_SOCKET_CLOSED_ACK,
+ REDC_TUNNEL_SOCKET_DATA,
+
+ REDC_TUNNEL_SOCKET_TOKEN,
+
+ REDC_TUNNEL_MESSGES_END,
+};
+
+typedef struct ATTR_PACKED RedcTunnelAddGenericService {
+ uint32_t type;
+ uint32_t id;
+ uint32_t group;
+ uint32_t port;
+ uint32_t name;
+ uint32_t description;
+} RedcTunnelAddGenericService;
+
+typedef struct ATTR_PACKED RedcTunnelAddPrintService {
+ RedcTunnelAddGenericService base;
+ RedTunnelIpInfo ip;
+} RedcTunnelAddPrintService;
+
+typedef struct ATTR_PACKED RedcTunnelRemoveService {
+ uint32_t id;
+} RedcTunnelRemoveService;
+
+/* connection id must be the first field in msgs directed to a specific connection */
+
+typedef struct ATTR_PACKED RedcTunnelSocketOpenAck {
+ uint16_t connection_id;
+ uint32_t tokens;
+} RedcTunnelSocketOpenAck;
+
+typedef struct ATTR_PACKED RedcTunnelSocketOpenNack {
+ uint16_t connection_id;
+} RedcTunnelSocketOpenNack;
+
+typedef struct ATTR_PACKED RedcTunnelSocketData {
+ uint16_t connection_id;
+ uint8_t data[0];
+} RedcTunnelSocketData;
+
+typedef struct ATTR_PACKED RedcTunnelSocketFin {
+ uint16_t connection_id;
+} RedcTunnelSocketFin;
+
+typedef struct ATTR_PACKED RedcTunnelSocketClosed {
+ uint16_t connection_id;
+} RedcTunnelSocketClosed;
+
+typedef struct ATTR_PACKED RedcTunnelSocketClosedAck {
+ uint16_t connection_id;
+} RedcTunnelSocketClosedAck;
+
+typedef struct ATTR_PACKED RedcTunnelSocketTokens {
+ uint16_t connection_id;
+ uint32_t num_tokens;
+} RedcTunnelSocketTokens;
+
#undef ATTR_PACKED
#ifndef __GNUC__
diff --git a/configure.ac b/configure.ac
index 12799195..9810b822 100644
--- a/configure.ac
+++ b/configure.ac
@@ -105,6 +105,11 @@ AC_SUBST(LOG4CPP_CFLAGS)
AC_SUBST(LOG4CPP_LIBS)
SPICE_REQUIRES+=" log4cpp"
+PKG_CHECK_MODULES(SLIRP, slirp)
+AC_SUBST(SLIRP_CFLAGS)
+AC_SUBST(SLIRP_LIBS)
+SPICE_REQUIRES+=" slirp"
+
PKG_CHECK_MODULES(QCAIRO, qcairo >= 1.4.6)
AC_SUBST(QCAIRO_CFLAGS)
AC_SUBST(QCAIRO_LIBS)
diff --git a/server/Makefile.am b/server/Makefile.am
index e6ffab40..f9909615 100644
--- a/server/Makefile.am
+++ b/server/Makefile.am
@@ -9,6 +9,7 @@ INCLUDES = \
$(LOG4CPP_CFLAGS) \
$(SSL_CFLAGS) \
$(CELT051_CFLAGS) \
+ $(SLIRP_CFLAGS) \
-DCAIRO_CANVAS_IMAGE_CACHE \
-DRED_STATISTICS \
$(WARN_CFLAGS) \
@@ -40,6 +41,7 @@ libspice_la_LIBADD = \
$(QCAIRO_LIBS) \
$(SSL_LIBS) \
$(CELT051_LIBS) \
+ $(SLIRP_LIBS) \
$(LIBRT) \
$(NULL)
@@ -64,6 +66,10 @@ libspice_la_SOURCES = \
red_yuv.h \
snd_worker.c \
snd_worker.h \
+ red_channel.h \
+ red_channel.c \
+ red_tunnel_worker.c \
+ red_tunnel_worker.h \
spice.h \
vd_interface.h \
$(COMMON_SRCS) \
diff --git a/server/red_channel.c b/server/red_channel.c
new file mode 100644
index 00000000..48ace448
--- /dev/null
+++ b/server/red_channel.c
@@ -0,0 +1,520 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+ Author:
+ yhalperi@redhat.com
+*/
+
+#include <stdio.h>
+#include <stdint.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include "red_channel.h"
+
+static void red_channel_receive(void *data);
+static void red_channel_push(RedChannel *channel);
+static void red_channel_opaque_push(void *data);
+static PipeItem *red_channel_pipe_get(RedChannel *channel);
+static void red_channel_pipe_clear(RedChannel *channel);
+
+/* return the number of bytes read. -1 in case of error */
/* Read up to `size` bytes from the peer into `buf`.
 * Returns the number of bytes actually read (may be short of `size` when the
 * socket would block), or -1 on error, EOF, or an already-shut-down peer.
 * Note the errno discipline: EAGAIN ends the (nonblocking) read with a short
 * count, EINTR retries, everything else is fatal for the connection. */
static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size)
{
    uint8_t *pos = buf;
    while (size) {
        int now;
        if (peer->shutdown) {
            return -1;
        }
        if ((now = peer->cb_read(peer->ctx, pos, size)) <= 0) {
            if (now == 0) {
                // orderly EOF from the peer is treated as an error here
                return -1;
            }
            ASSERT(now == -1);
            if (errno == EAGAIN) {
                break;      // no more data right now; report what we have
            } else if (errno == EINTR) {
                continue;   // interrupted by a signal; retry
            } else if (errno == EPIPE) {
                return -1;
            } else {
                red_printf("%s", strerror(errno));
                return -1;
            }
        } else {
            size -= now;
            pos += now;
        }
    }
    return pos - buf;
}
+
/* Pump incoming messages from the peer through `handler`.
 * This is a resumable state machine: partial reads leave their progress in
 * handler->header_pos / handler->msg_pos, and the next invocation (from the
 * event loop) picks up where this one stopped. Each complete message is a
 * RedDataHeader followed by header.size payload bytes; the payload buffer is
 * obtained from alloc_msg_buf and handed to handle_message. On any receive or
 * handling failure, on_error is invoked and the loop exits. */
static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
{
    int bytes_read;

    for (;;) {
        int ret_handle;
        // Phase 1: accumulate the fixed-size header.
        if (handler->header_pos < sizeof(RedDataHeader)) {
            bytes_read = red_peer_receive(peer,
                                          ((uint8_t *)&handler->header) + handler->header_pos,
                                          sizeof(RedDataHeader) - handler->header_pos);
            if (bytes_read == -1) {
                handler->on_error(handler->opaque);
                return;
            }
            handler->header_pos += bytes_read;

            if (handler->header_pos != sizeof(RedDataHeader)) {
                return;  // short read; resume on next readability event
            }
        }

        // Phase 2: accumulate the variable-size message body.
        if (handler->msg_pos < handler->header.size) {
            if (!handler->msg) {
                handler->msg = handler->alloc_msg_buf(handler->opaque, &handler->header);
            }

            bytes_read = red_peer_receive(peer,
                                          handler->msg + handler->msg_pos,
                                          handler->header.size - handler->msg_pos);
            if (bytes_read == -1) {
                handler->release_msg_buf(handler->opaque, &handler->header, handler->msg);
                handler->on_error(handler->opaque);
                return;
            }
            handler->msg_pos += bytes_read;
            if (handler->msg_pos != handler->header.size) {
                return;  // short read; resume later
            }
        }

        // Phase 3: dispatch the complete message and reset for the next one.
        ret_handle = handler->handle_message(handler->opaque, &handler->header,
                                             handler->msg);
        handler->msg_pos = 0;
        handler->msg = NULL;
        handler->header_pos = 0;

        if (!ret_handle) {
            handler->on_error(handler->opaque);
            return;
        }
    }
}
+
/* Advance an iovec array past `skip` bytes that have already been written.
 * Fully-consumed elements are dropped from the front (decrementing *vec_size)
 * and the first surviving element is trimmed in place.
 * Returns a pointer to the first element still holding unsent data (or to
 * one-past-the-consumed element, untouched, when everything was sent).
 *
 * Fixes over the original: (a) guard on *vec_size so that a `skip` equal to
 * the total remaining data no longer dereferences/modifies one element past
 * the caller's array; (b) explicit cast avoids a signed/unsigned comparison
 * between the int `skip` and the size_t iov_len. */
static struct iovec *__iovec_skip(struct iovec vec[], int skip, int *vec_size)
{
    struct iovec *now = vec;

    /* Drop every element that was sent in full. */
    while (skip && *vec_size && skip >= (int)now->iov_len) {
        skip -= now->iov_len;
        --*vec_size;
        now++;
    }

    /* Trim the partially-sent element, if any element remains. */
    if (*vec_size) {
        now->iov_base = (uint8_t *)now->iov_base + skip;
        now->iov_len -= skip;
    }
    return now;
}
+
/* Flush the pending outgoing message through `handler`.
 * handler->size/pos track the whole message across calls (the function is
 * re-entered from the event loop after EAGAIN). The iovec array may be
 * smaller than the full message, in which case `prepare` is called again to
 * refill it once the current batch is written. Callbacks:
 *   on_block    - write would block; caller should wait for writability
 *   on_error    - unrecoverable write error
 *   on_msg_done - the entire message has been written */
static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
{
    int n;
    if (handler->size == 0) {
        // starting a new message: ask the owner what (if anything) to send
        handler->vec = handler->vec_buf;
        handler->size = handler->get_msg_size(handler->opaque);
        if (!handler->size) {  // nothing to be sent
            return;
        }
        handler->prepare(handler->opaque, handler->vec, &handler->vec_size);
    }
    for (;;) {
        if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) {
            switch (errno) {
            case EAGAIN:
                handler->on_block(handler->opaque);
                return;
            case EINTR:
                continue;
            case EPIPE:
                handler->on_error(handler->opaque);
                return;
            default:
                red_printf("%s", strerror(errno));
                handler->on_error(handler->opaque);
                return;
            }
        } else {
            handler->pos += n;
            handler->vec = __iovec_skip(handler->vec, n, &handler->vec_size);
            if (!handler->vec_size) {
                if (handler->pos == handler->size) {  // finished writing data
                    handler->on_msg_done(handler->opaque);
                    handler->vec = handler->vec_buf;
                    handler->pos = 0;
                    handler->size = 0;
                    return;
                } else {
                    // There wasn't enough place for all the outgoing data in one iovec array.
                    // Filling the rest of the data.
                    handler->vec = handler->vec_buf;
                    handler->prepare(handler->opaque, handler->vec, &handler->vec_size);
                }
            }
        }
    }
}
+
static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size);


/* OutgoingHandler/IncomingHandler callbacks that adapt the generic peer
   handlers above to a RedChannel instance (`opaque` is the channel). */

/* Any peer-level error tears the whole channel down. */
static void red_channel_peer_on_error(void *opaque)
{
    RedChannel *channel = (RedChannel *)opaque;
    channel->disconnect(channel);
}

static int red_channel_peer_get_out_msg_size(void *opaque)
{
    RedChannel *channel = (RedChannel *)opaque;
    return channel->send_data.size;
}

static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size)
{
    RedChannel *channel = (RedChannel *)opaque;
    red_channel_fill_iovec(channel, vec, vec_size);
}

/* Write would block: mark the channel blocked and also register a
   writability handler so the push resumes when the socket drains. */
static void red_channel_peer_on_out_block(void *opaque)
{
    RedChannel *channel = (RedChannel *)opaque;
    channel->send_data.blocked = TRUE;
    channel->core->set_file_handlers(channel->core, channel->peer->socket,
                                     red_channel_receive, red_channel_opaque_push,
                                     channel);
}

/* A full message went out: reset the send-buffer bookkeeping, release the
   pipe item it carried, and drop the writability handler if we were blocked. */
static void red_channel_peer_on_out_msg_done(void *opaque)
{
    RedChannel *channel = (RedChannel *)opaque;
    channel->send_data.size = 0;
    channel->send_data.n_bufs = 0;
    channel->send_data.not_sent_buf_head = 0;
    if (channel->send_data.item) {
        channel->release_item(channel, channel->send_data.item, TRUE);
        channel->send_data.item = NULL;
    }
    if (channel->send_data.blocked) {
        channel->send_data.blocked = FALSE;
        channel->core->set_file_handlers(channel->core, channel->peer->socket,
                                         red_channel_receive, NULL,
                                         channel);
    }
}
+
/* Allocate and initialize a RedChannel of `size` bytes (size >= sizeof
 * RedChannel; subclasses embed RedChannel as their first member and pass
 * their own size). Wires the incoming/outgoing handlers to the supplied
 * callbacks, configures the socket via `config_socket`, and registers a read
 * handler with the core event loop.
 * On failure the peer is freed via cb_free and NULL is returned — the caller
 * must not reuse `peer` afterwards.
 * NOTE(review): the ASSERT checks alloc_recv_buf and release_item but not
 * release_recv_buf or send_item, although both are dereferenced later —
 * confirm whether NULL is ever legal for them. */
RedChannel *red_channel_create(int size, RedsStreamContext *peer, CoreInterface *core,
                               int migrate, int handle_acks,
                               channel_configure_socket_proc config_socket,
                               channel_disconnect_proc disconnect,
                               channel_handle_message_proc handle_message,
                               channel_alloc_msg_recv_buf_proc alloc_recv_buf,
                               channel_release_msg_recv_buf_proc release_recv_buf,
                               channel_send_pipe_item_proc send_item,
                               channel_release_pipe_item_proc release_item)
{
    RedChannel *channel;

    ASSERT(size >= sizeof(*channel));
    ASSERT(config_socket && disconnect && handle_message && alloc_recv_buf &&
           release_item);
    if (!(channel = malloc(size))) {
        red_printf("malloc failed");
        goto error1;
    }
    memset(channel, 0, size);

    channel->handle_acks = handle_acks;
    channel->disconnect = disconnect;
    channel->send_item = send_item;
    channel->release_item = release_item;

    channel->peer = peer;
    channel->core = core;
    channel->ack_data.messages_window = ~0;  // blocks send message (maybe use send_data.blocked +
                                             // block flags)
    channel->ack_data.client_generation = ~0;

    channel->migrate = migrate;
    ring_init(&channel->pipe);

    channel->incoming.opaque = channel;
    channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
    channel->incoming.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
    channel->incoming.handle_message = (handle_message_proc)handle_message;
    channel->incoming.on_error = red_channel_peer_on_error;

    channel->outgoing.opaque = channel;
    channel->outgoing.pos = 0;
    channel->outgoing.size = 0;

    channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size;
    channel->outgoing.prepare = red_channel_peer_prepare_out_msg;
    channel->outgoing.on_block = red_channel_peer_on_out_block;
    channel->outgoing.on_error = red_channel_peer_on_error;
    channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done;

    if (!config_socket(channel)) {
        goto error2;
    }

    channel->core->set_file_handlers(channel->core, channel->peer->socket,
                                     red_channel_receive, NULL, channel);

    return channel;

error2:
    free(channel);
error1:
    peer->cb_free(peer);

    return NULL;
}
+
/* Tear down a channel: drain/release the pipe, deregister from the event
 * loop, free the peer stream and the channel itself. Safe to call with NULL.
 * Note that both this function and red_channel_shutdown() call
 * red_channel_pipe_clear(), so pipe_clear must tolerate being run twice. */
void red_channel_destroy(RedChannel *channel)
{
    if (!channel) {
        return;
    }
    red_channel_pipe_clear(channel);
    channel->core->set_file_handlers(channel->core, channel->peer->socket,
                                     NULL, NULL, NULL);
    channel->peer->cb_free(channel->peer);
    free(channel);
}

/* Initiate an orderly shutdown: stop watching for writability, drop queued
 * pipe items, and shut the socket both ways. Idempotent via peer->shutdown. */
void red_channel_shutdown(RedChannel *channel)
{
    red_printf("");
    if (!channel->peer->shutdown) {
        channel->core->set_file_handlers(channel->core, channel->peer->socket,
                                         red_channel_receive, NULL, channel);
        red_channel_pipe_clear(channel);
        shutdown(channel->peer->socket, SHUT_RDWR);
        channel->peer->shutdown = TRUE;
    }
}

/* Unblock sending: messages_window was initialized to ~0 in
 * red_channel_create to hold back all sends until the channel is ready. */
void red_channel_init_outgoing_messages_window(RedChannel *channel)
{
    channel->ack_data.messages_window = 0;
    red_channel_push(channel);
}
+
+int red_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg)
+{
+ switch (header->type) {
+ case REDC_ACK_SYNC:
+ if (header->size != sizeof(uint32_t)) {
+ red_printf("bad message size");
+ return FALSE;
+ }
+ channel->ack_data.client_generation = *(uint32_t *)(msg);
+ break;
+ case REDC_ACK:
+ if (channel->ack_data.client_generation == channel->ack_data.generation) {
+ channel->ack_data.messages_window -= CLIENT_ACK_WINDOW;
+ red_channel_push(channel);
+ }
+ break;
+ default:
+ red_printf("invalid message type %u", header->type);
+ return FALSE;
+ }
+ return TRUE;
+}
+
/* Event-loop read callback: pump incoming data into the channel's handler. */
static void red_channel_receive(void *data)
{
    RedChannel *channel = (RedChannel *)data;
    red_peer_handle_incoming(channel->peer, &channel->incoming);
}

/* Append a buffer descriptor to the scatter list without touching the
 * header size (used for the header buffer itself). */
static void inline __red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
{
    int pos = channel->send_data.n_bufs++;
    ASSERT(pos < MAX_SEND_BUFS);
    channel->send_data.bufs[pos].size = size;
    channel->send_data.bufs[pos].data = data;
}

/* Append a payload buffer to the outgoing message; the header's size field
 * accumulates the payload length. */
void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size)
{
    __red_channel_add_buf(channel, data, size);
    channel->send_data.header.size += size;
}

/* Start composing a new outgoing message: clear the scatter list, bump the
 * serial, and make the header itself the first buffer to be sent. */
void red_channel_reset_send_data(RedChannel *channel)
{
    channel->send_data.n_bufs = 0;
    channel->send_data.header.size = 0;
    channel->send_data.header.sub_list = 0;
    ++channel->send_data.header.serial;
    __red_channel_add_buf(channel, (void *)&channel->send_data.header, sizeof(RedDataHeader));
}

/* Record the message type and the pipe item this message carries; the item
 * is released in on_out_msg_done once the message is fully written. */
void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item)
{
    channel->send_data.header.type = msg_type;
    channel->send_data.item = item;
}
+
/* Fill `vec` (up to MAX_SEND_VEC entries) with the not-yet-sent buffers of
 * the current message, starting at not_sent_buf_head. Called (possibly
 * repeatedly) by the outgoing handler when the message spans more buffers
 * than one iovec array can hold. */
static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size)
{
    BufDescriptor *buf = channel->send_data.bufs + channel->send_data.not_sent_buf_head;
    ASSERT(channel->send_data.not_sent_buf_head < channel->send_data.n_bufs);
    *vec_size = 0;
    do {
        vec[*vec_size].iov_base = buf->data;
        vec[*vec_size].iov_len = buf->size;
        (*vec_size)++;
        buf++;
        channel->send_data.not_sent_buf_head++;
    } while (((*vec_size) < MAX_SEND_VEC) &&
             (channel->send_data.not_sent_buf_head != channel->send_data.n_bufs));
}

/* Drive the outgoing handler for this channel. */
static void red_channel_send(RedChannel *channel)
{
    red_peer_handle_outgoing(channel->peer, &channel->outgoing);
}

/* Commit the composed message (header + payload buffers) and start sending.
 * Each sent message consumes one slot of the client's ack window.
 * NOTE: "massage" is a historical typo for "message"; the name is kept
 * because it is part of the public header and callers outside this view. */
void red_channel_begin_send_massage(RedChannel *channel)
{
    channel->send_data.size = channel->send_data.header.size + sizeof(RedDataHeader);
    channel->ack_data.messages_window++;
    red_channel_send(channel);
}
+
/* Drain the pipe: send queued items one by one until the pipe is empty, the
 * socket blocks, or the ack window is exhausted (see red_channel_pipe_get).
 * during_send is a reentrancy guard — send_item callbacks may add new pipe
 * items, which would otherwise recurse back into this function. */
static void red_channel_push(RedChannel *channel)
{
    PipeItem *pipe_item;

    if (!channel->during_send) {
        channel->during_send = TRUE;
    } else {
        return;
    }

    if (channel->send_data.blocked) {
        // a partially-written message takes priority over new pipe items
        red_channel_send(channel);
    }

    while ((pipe_item = red_channel_pipe_get(channel))) {
        channel->send_item(channel, pipe_item);
    }
    channel->during_send = FALSE;
}

/* Event-loop writability callback (registered while blocked). */
static void red_channel_opaque_push(void *data)
{
    red_channel_push((RedChannel *)data);
}

/* Serial of the message currently being composed/sent. */
uint64_t red_channel_get_message_serial(RedChannel *channel)
{
    return channel->send_data.header.serial;
}
+
/* Initialize a pipe item before queuing it. */
void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
{
    ring_item_init(&item->link);
    item->type = type;
}

/* Queue an item at the head of the pipe (sent last — the pipe is consumed
 * from the tail, see red_channel_pipe_get) and kick the send loop. */
void red_channel_pipe_add(RedChannel *channel, PipeItem *item)
{
    ASSERT(channel);

    channel->pipe_size++;
    ring_add(&channel->pipe, &item->link);

    red_channel_push(channel);
}

int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item)
{
    return ring_item_is_linked(&item->link);
}

/* Unlink an item without sending it; caller keeps ownership.
 * Note: pipe_size is intentionally not decremented by the original code —
 * callers that use this appear to manage the count themselves (TODO confirm). */
void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item)
{
    ring_remove(&item->link);
}

/* Queue an item at the tail of the pipe (sent next) and kick the send loop. */
void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item)
{
    ASSERT(channel);
    channel->pipe_size++;
    ring_add_before(&item->link, &channel->pipe);

    red_channel_push(channel);
}
+
+void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type)
+{
+ PipeItem *item = malloc(sizeof(*item));
+ if (!item) {
+ red_error("malloc failed");
+ }
+ red_channel_pipe_item_init(channel, item, pipe_item_type);
+ red_channel_pipe_add(channel, item);
+
+ red_channel_push(channel);
+}
+
/* Dequeue the next item to send (from the ring tail). Returns NULL when the
 * channel is absent, a send is already blocked on the socket, the ack-based
 * flow-control window is exhausted, or the pipe is empty. */
static PipeItem *red_channel_pipe_get(RedChannel *channel)
{
    PipeItem *item;

    if (!channel || channel->send_data.blocked ||
        (channel->handle_acks && (channel->ack_data.messages_window > CLIENT_ACK_WINDOW * 2)) ||
        !(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
        return NULL;
    }

    --channel->pipe_size;
    ring_remove(&item->link);
    return item;
}
+
+static void red_channel_pipe_clear(RedChannel *channel)
+{
+ PipeItem *item;
+ if (channel->send_data.item) {
+ channel->release_item(channel, channel->send_data.item, TRUE);
+ }
+
+ while ((item = (PipeItem *)ring_get_head(&channel->pipe))) {
+ ring_remove(&item->link);
+ channel->release_item(channel, item, FALSE);
+ }
+}
+
diff --git a/server/red_channel.h b/server/red_channel.h
new file mode 100644
index 00000000..1096ba70
--- /dev/null
+++ b/server/red_channel.h
@@ -0,0 +1,182 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+ Author:
+ yhalperi@redhat.com
+*/
+
+#ifndef _H_RED_CHANNEL
+#define _H_RED_CHANNEL
+
+#include "red_common.h"
+#include "reds.h"
+#include "vd_interface.h"
+#include "ring.h"
+
+#define MAX_SEND_BUFS 1000
+#define MAX_SEND_VEC 50
+#define CLIENT_ACK_WINDOW 20
+
+/* Basic interface for channels, without using the RedChannel interface.
+ The intention is to move towards one channel interface gradually.
+ At the final stage, this interface shouldn't be exposed. Only RedChannel will use it. */
+
+typedef int (*handle_message_proc)(void *opaque,
+ RedDataHeader *header, uint8_t *msg);
+typedef uint8_t *(*alloc_msg_recv_buf_proc)(void *opaque, RedDataHeader *msg_header);
+typedef void (*release_msg_recv_buf_proc)(void *opaque,
+ RedDataHeader *msg_header, uint8_t *msg);
+typedef void (*on_incoming_error_proc)(void *opaque);
+
+typedef struct IncomingHandler {
+ void *opaque;
+ RedDataHeader header;
+ uint32_t header_pos;
+ uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf.
+ uint32_t msg_pos;
+ handle_message_proc handle_message;
+ alloc_msg_recv_buf_proc alloc_msg_buf;
+ on_incoming_error_proc on_error; // recv error or handle_message error
+ release_msg_recv_buf_proc release_msg_buf; // for errors
+} IncomingHandler;
+
+typedef int (*get_outgoing_msg_size_proc)(void *opaque);
+typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size);
+typedef void (*on_outgoing_error_proc)(void *opaque);
+typedef void (*on_outgoing_block_proc)(void *opaque);
+typedef void (*on_outgoing_msg_done_proc)(void *opaque);
+typedef struct OutgoingHandler {
+ void *opaque;
+ struct iovec vec_buf[MAX_SEND_VEC];
+ int vec_size;
+ struct iovec *vec;
+ int pos;
+ int size;
+ get_outgoing_msg_size_proc get_msg_size;
+ prepare_outgoing_proc prepare;
+ on_outgoing_error_proc on_error;
+ on_outgoing_block_proc on_block;
+ on_outgoing_msg_done_proc on_msg_done;
+} OutgoingHandler;
+
+/* Red Channel interface */
+
+typedef struct BufDescriptor {
+ uint32_t size;
+ uint8_t *data;
+} BufDescriptor;
+
+typedef struct PipeItem {
+ RingItem link;
+ int type;
+} PipeItem;
+
+typedef struct RedChannel RedChannel;
+
+typedef uint8_t *(*channel_alloc_msg_recv_buf_proc)(RedChannel *channel,
+ RedDataHeader *msg_header);
+typedef int (*channel_handle_message_proc)(RedChannel *channel,
+ RedDataHeader *header, uint8_t *msg);
+typedef void (*channel_release_msg_recv_buf_proc)(RedChannel *channel,
+ RedDataHeader *msg_header, uint8_t *msg);
+typedef void (*channel_disconnect_proc)(RedChannel *channel);
+typedef int (*channel_configure_socket_proc)(RedChannel *channel);
+typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item);
+typedef void (*channel_release_pipe_item_proc)(RedChannel *channel,
+ PipeItem *item, int item_pushed);
+
/* Base object for server channels; subclasses embed this as their first
   member and pass their full size to red_channel_create(). */
struct RedChannel {
    RedsStreamContext *peer;     // the client connection (owned; freed on destroy)
    CoreInterface *core;         // event-loop registration interface
    int migrate;
    int handle_acks;             // whether ack-window flow control is enforced

    /* Client ack-window bookkeeping (see red_channel_handle_message). */
    struct {
        uint32_t generation;
        uint32_t client_generation;
        uint32_t messages_window;   // ~0 until init_outgoing_messages_window()
    } ack_data;

    Ring pipe;                   // items queued for sending (consumed from tail)
    uint32_t pipe_size;

    /* State of the message currently being composed/sent. */
    struct {
        RedDataHeader header;    // sent as the first buffer of every message
        union {
            RedSetAck ack;
            RedMigrate migrate;
        } u;
        uint32_t n_bufs;         // buffers used in `bufs`
        BufDescriptor bufs[MAX_SEND_BUFS];
        uint32_t size;           // total bytes of the in-flight message
        uint32_t not_sent_buf_head;  // first buffer not yet handed to writev

        PipeItem *item;          // item carried by the in-flight message
        int blocked;             // TRUE while the socket would block
    } send_data;

    OutgoingHandler outgoing;
    IncomingHandler incoming;

    /* Subclass callbacks supplied to red_channel_create(). */
    channel_disconnect_proc disconnect;
    channel_send_pipe_item_proc send_item;
    channel_release_pipe_item_proc release_item;

    int during_send;             // reentrancy guard for red_channel_push
};
+
+/* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
+ explicitly destroy the channel */
+RedChannel *red_channel_create(int size, RedsStreamContext *peer, CoreInterface *core,
+ int migrate, int handle_acks,
+ channel_configure_socket_proc config_socket,
+ channel_disconnect_proc disconnect,
+ channel_handle_message_proc handle_message,
+ channel_alloc_msg_recv_buf_proc alloc_recv_buf,
+ channel_release_msg_recv_buf_proc release_recv_buf,
+ channel_send_pipe_item_proc send_item,
+ channel_release_pipe_item_proc release_item);
+
+void red_channel_destroy(RedChannel *channel);
+
+void red_channel_shutdown(RedChannel *channel);
+/* should be called when a new channel is ready to send messages */
+void red_channel_init_outgoing_messages_window(RedChannel *channel);
+
+/* handles general channel msgs from the client */
+int red_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg);
+
+/* when preparing send_data: should call reset, then init and then add_buf per buffer that is
+ being sent */
+void red_channel_reset_send_data(RedChannel *channel);
+void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item);
+void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size);
+
+uint64_t red_channel_get_message_serial(RedChannel *channel);
+
+/* when sending a msg. should first call red_channel_begin_send_massage */
+void red_channel_begin_send_massage(RedChannel *channel);
+
+void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type);
+void red_channel_pipe_add(RedChannel *channel, PipeItem *item);
+int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item);
+void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item);
+void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item);
+/* for types that use this routine -> the pipe item should be freed */
+void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type);
+
+#endif
diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c
new file mode 100644
index 00000000..e4cb2172
--- /dev/null
+++ b/server/red_tunnel_worker.c
@@ -0,0 +1,3510 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+ Author:
+ yhalperi@redhat.com
+*/
+
+#include <stdio.h>
+#include <stdint.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <errno.h>
+#include "red_tunnel_worker.h"
+#include "red_common.h"
+#include "red.h"
+#include "reds.h"
+#include "net_slirp.h"
+#include "red_channel.h"
+
+
+//#define DEBUG_NETWORK
+
+#ifdef DEBUG_NETWORK
+#define PRINT_SCKT(sckt) red_printf("TUNNEL_DBG SOCKET(connection_id=%d port=%d, service=%d)",\
+ sckt->connection_id, ntohs(sckt->local_port), \
+ sckt->far_service->id)
+#endif
+
+#define MAX_SOCKETS_NUM 20
+
+#define MAX_SOCKET_DATA_SIZE (1024 * 2)
+
+#define SOCKET_WINDOW_SIZE 80
+#define SOCKET_TOKENS_TO_SEND 20
+#define SOCKET_TOKENS_TO_SEND_FOR_PROCESS 5 // sent in case all the tokens were used by
+ // the client but they weren't consumed by slirp
+ // due to missing data for processing them and
+ // turning them into 'ready chunks'
+
+/* the number of buffer might exceed the window size when the analysis of the buffers in the
+ process queue need more data in order to be able to move them to the ready queue */
+#define MAX_SOCKET_IN_BUFFERS (int)(SOCKET_WINDOW_SIZE * 1.5)
+#define MAX_SOCKET_OUT_BUFFERS (int)(SOCKET_WINDOW_SIZE * 1.5)
+
+#define CONTROL_MSG_RECV_BUF_SIZE 1024
+
+typedef struct TunnelWorker TunnelWorker;
+
+enum {
+ PIPE_ITEM_TYPE_SET_ACK,
+ PIPE_ITEM_TYPE_MIGRATE,
+ PIPE_ITEM_TYPE_MIGRATE_DATA,
+ PIPE_ITEM_TYPE_TUNNEL_INIT,
+ PIPE_ITEM_TYPE_SERVICE_IP_MAP,
+ PIPE_ITEM_TYPE_SOCKET_OPEN,
+ PIPE_ITEM_TYPE_SOCKET_FIN,
+ PIPE_ITEM_TYPE_SOCKET_CLOSE,
+ PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK,
+ PIPE_ITEM_TYPE_SOCKET_DATA,
+ PIPE_ITEM_TYPE_SOCKET_TOKEN,
+};
+
typedef struct RawTunneledBuffer RawTunneledBuffer;
typedef void (*release_tunneled_buffer_proc_t)(RawTunneledBuffer *buf);

/* Reference-counted raw data buffer flowing through the tunnel. Buffers form
   a singly-linked queue (`next`); `release_proc` returns the buffer to its
   owner's pool when the last reference (see tunneled_buffer_unref) drops. */
struct RawTunneledBuffer {
    uint8_t *data;
    int size;      // bytes currently held
    int max_size;  // capacity of `data`
    int refs;
    RawTunneledBuffer *next;
    void *usr_opaque;
    release_tunneled_buffer_proc_t release_proc;
};
+
/* Take an additional reference; returns the buffer for call chaining. */
static inline RawTunneledBuffer *tunneled_buffer_ref(RawTunneledBuffer *buf)
{
    buf->refs++;
    return buf;
}

/* Drop a reference; the last unref hands the buffer back via release_proc. */
static inline void tunneled_buffer_unref(RawTunneledBuffer *buf)
{
    if (!(--buf->refs)) {
        buf->release_proc(buf);
    }
}
+
+typedef struct RedSocket RedSocket;
+
+/* data received from the guest through slirp */
+typedef struct RedSocketRawSndBuf {
+ RawTunneledBuffer base;
+ uint8_t buf[MAX_SOCKET_DATA_SIZE];
+} RedSocketRawSndBuf;
+
+/* data received from the client */
+typedef struct RedSocketRawRcvBuf {
+ RawTunneledBuffer base;
+ uint8_t buf[MAX_SOCKET_DATA_SIZE + sizeof(RedcTunnelSocketData)];
+ RedcTunnelSocketData *msg_info;
+} RedSocketRawRcvBuf;
+
+typedef struct ReadyTunneledChunk ReadyTunneledChunk;
+
+enum {
+ READY_TUNNELED_CHUNK_TYPE_ORIG,
+ READY_TUNNELED_CHUNK_TYPE_SUB, // substitution
+};
+
+
+/* A chunk of data from a RawTunneledBuffer (or a substitution for a part of it)
+ that was processed and is ready to be consumed (by slirp or by the client).
+ Each chunk has a reference to the RawTunneledBuffer it
+ was originated from. When all the reference chunks of one buffer are consumed (i.e. they are out
+ of the ready queue and they unrefed the buffer), the buffer is released */
+struct ReadyTunneledChunk {
+ uint32_t type;
+ RawTunneledBuffer *origin;
+ uint8_t *data; // if type == READY_TUNNELED_CHUNK_TYPE_ORIG, it points
+ // directly to the tunneled data. Otherwise, it is a
+ // newly allocated chunk of data
+ // that should be freed after its consumption.
+ int size;
+ ReadyTunneledChunk *next;
+};
+
+typedef struct ReadyTunneledChunkQueue {
+ ReadyTunneledChunk *head;
+ ReadyTunneledChunk *tail;
+ uint32_t offset; // first byte in the ready queue that wasn't consumed
+} ReadyTunneledChunkQueue;
+
+static void ready_queue_add_orig_chunk(ReadyTunneledChunkQueue *queue, RawTunneledBuffer *origin,
+ uint8_t *data, int size);
+static void ready_queue_pop_chunk(ReadyTunneledChunkQueue *queue);
+
+
+enum {
+ PROCESS_DIRECTION_TYPE_REQUEST, // guest request
+ PROCESS_DIRECTION_TYPE_REPLY, // reply from the service in the client LAN
+};
+
+typedef struct TunneledBufferProcessQueue TunneledBufferProcessQueue;
+
+typedef RawTunneledBuffer *(*alloc_tunneled_buffer_proc_t)(TunneledBufferProcessQueue *queue);
+/* processing the data. Notice that the buffers can be empty of
+ * data (see RedSocketRestoreTokensBuf) */
+typedef void (*analyze_new_data_proc_t)(TunneledBufferProcessQueue *queue,
+ RawTunneledBuffer *start_buf, int offset, int len);
+
+// migrating specific queue data (not the buffers themselves)
+typedef int (*get_migrate_data_proc_t)(TunneledBufferProcessQueue *queue, void **migrate_data);
+typedef void (*release_migrate_data_proc_t)(TunneledBufferProcessQueue *queue, void *migrate_data);
+typedef void (*restore_proc_t)(TunneledBufferProcessQueue *queue, uint8_t *migrate_data);
+
+struct TunneledBufferProcessQueue {
+ uint32_t service_type; // which kind of processing is performed.
+ uint32_t direction; // reply/request
+ RawTunneledBuffer *head;
+ RawTunneledBuffer *tail;
+ int head_offset;
+
+ ReadyTunneledChunkQueue *ready_chunks_queue; // the queue to push the post-process data to
+
+ void *usr_opaque;
+
+ alloc_tunneled_buffer_proc_t alloc_buf_proc; // for appending data to the queue
+ analyze_new_data_proc_t analysis_proc; // service dependent. should create the
+ // post-process chunks and remove buffers
+ // from the queue.
+ get_migrate_data_proc_t get_migrate_data_proc;
+ release_migrate_data_proc_t release_migrate_data_proc;
+ restore_proc_t restore_proc;
+};
+
+/* push and append routines are the ones that call to the analysis_proc */
+static void process_queue_push(TunneledBufferProcessQueue *queue, RawTunneledBuffer *buf);
+static void process_queue_append(TunneledBufferProcessQueue *queue, uint8_t *data, size_t size);
+static void process_queue_pop(TunneledBufferProcessQueue *queue);
+
+static void process_queue_clear(TunneledBufferProcessQueue *queue);
+
+
+typedef struct RedSocketOutData {
+ // Note that this pipe items can appear only once in the pipe
+ PipeItem status_pipe_item;
+ PipeItem data_pipe_item;
+ PipeItem token_pipe_item;
+
+ TunneledBufferProcessQueue *process_queue; // service type dependent
+ ReadyTunneledChunkQueue ready_chunks_queue;
+ ReadyTunneledChunk *push_tail; // last chunk in the ready queue that was pushed
+ uint32_t push_tail_size; // the subset of the push_tail that was sent
+
+ uint32_t num_buffers; // total count of buffers in process_queue + references from ready queue
+ uint32_t data_size; // total size of data that is waiting to be sent.
+
+ uint32_t num_tokens;
+ uint32_t window_size;
+} RedSocketOutData;
+
+typedef struct RedSocketInData {
+ TunneledBufferProcessQueue *process_queue; // service type dependent
+ ReadyTunneledChunkQueue ready_chunks_queue;
+
+ uint32_t num_buffers;
+
+    int32_t num_tokens; // No. tokens consumed by slirp since the last token msg sent to the
+ // client. can be negative if we loaned some to the client (when the
+ // ready queue is empty)
+ uint32_t client_total_num_tokens;
+} RedSocketInData;
+
+typedef enum {
+ SLIRP_SCKT_STATUS_OPEN,
+ SLIRP_SCKT_STATUS_SHUTDOWN_SEND, // FIN was issued from guest
+ SLIRP_SCKT_STATUS_SHUTDOWN_RECV, // Triggered when FIN is received from client
+ SLIRP_SCKT_STATUS_DELAY_ABORT, // when out buffers overflow, we wait for client to
+ // close before we close slirp socket. see
+ //tunnel_socket_force_close
+ SLIRP_SCKT_STATUS_WAIT_CLOSE, // when shutdown_send was called after shut_recv
+ // and vice versa
+ SLIRP_SCKT_STATUS_CLOSED,
+} SlirpSocketStatus;
+
+typedef enum {
+ CLIENT_SCKT_STATUS_WAIT_OPEN,
+ CLIENT_SCKT_STATUS_OPEN,
+ CLIENT_SCKT_STATUS_SHUTDOWN_SEND, // FIN was issued from client
+ CLIENT_SCKT_STATUS_CLOSED,
+} ClientSocketStatus;
+
+typedef struct TunnelService TunnelService;
+struct RedSocket {
+ int allocated;
+
+ TunnelWorker *worker;
+
+ uint16_t connection_id;
+
+ uint16_t local_port;
+ TunnelService *far_service;
+
+ ClientSocketStatus client_status;
+ SlirpSocketStatus slirp_status;
+
+ int pushed_close;
+ int client_waits_close_ack;
+
+ SlirpSocket *slirp_sckt;
+
+ RedSocketOutData out_data;
+ RedSocketInData in_data;
+
+ int in_slirp_send;
+
+ uint32_t mig_client_status_msg; // the last status change msg that was received from
+ //the client during migration, and thus was unhandled.
+ // It is 0 if the status didn't change during migration
+ uint32_t mig_open_ack_tokens; // if REDC_TUNNEL_SOCKET_OPEN_ACK was received during
+ // migration, we store the tokens we received in the
+ // msg.
+};
+
+/********** managing send buffers ***********/
+static RawTunneledBuffer *tunnel_socket_alloc_snd_buf(RedSocket *sckt);
+static inline RedSocketRawSndBuf *__tunnel_worker_alloc_socket_snd_buf(TunnelWorker *worker);
+static RawTunneledBuffer *process_queue_alloc_snd_tunneled_buffer(
+ TunneledBufferProcessQueue *queue);
+
+static void tunnel_socket_free_snd_buf(RedSocket *sckt, RedSocketRawSndBuf *snd_buf);
+static inline void __tunnel_worker_free_socket_snd_buf(TunnelWorker *worker,
+ RedSocketRawSndBuf *snd_buf);
+static void snd_tunnled_buffer_release(RawTunneledBuffer *buf);
+
+/********** managing recv buffers ***********/
+// receive buffers are allocated before we know to which socket they are directed.
+static inline void tunnel_socket_assign_rcv_buf(RedSocket *sckt,
+ RedSocketRawRcvBuf *recv_buf, int buf_size);
+static inline RedSocketRawRcvBuf *__tunnel_worker_alloc_socket_rcv_buf(TunnelWorker *worker);
+
+static void tunnel_socket_free_rcv_buf(RedSocket *sckt, RedSocketRawRcvBuf *rcv_buf);
+static inline void __tunnel_worker_free_socket_rcv_buf(TunnelWorker *worker,
+ RedSocketRawRcvBuf *rcv_buf);
+static void rcv_tunnled_buffer_release(RawTunneledBuffer *buf);
+
+/********* managing buffers' queues ***********/
+
+static void process_queue_simple_analysis(TunneledBufferProcessQueue *queue,
+ RawTunneledBuffer *start_last_added,
+ int offset, int len);
+static inline TunneledBufferProcessQueue *__tunnel_socket_alloc_simple_process_queue(
+ RedSocket *sckt,
+ uint32_t service_type,
+ uint32_t direction_type);
+static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_request_process_queue(
+ RedSocket *sckt);
+static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_reply_process_queue(
+ RedSocket *sckt);
+static void free_simple_process_queue(TunneledBufferProcessQueue *queue);
+
+typedef struct ServiceCallback {
+ /* allocating the the queue & setting the analysis proc by service type */
+ TunneledBufferProcessQueue *(*alloc_process_queue)(RedSocket * sckt);
+ void (*free_process_queue)(TunneledBufferProcessQueue *queue);
+} ServiceCallback;
+
+/* Callbacks for process queue manipulation according to the service type and
+ the direction of the data.
+ The access is performed by [service_type][direction] */
+static const ServiceCallback SERVICES_CALLBACKS[3][2] = {
+ {{NULL, NULL},
+ {NULL, NULL}},
+ {{tunnel_socket_alloc_simple_print_request_process_queue, free_simple_process_queue},
+ {tunnel_socket_alloc_simple_print_reply_process_queue, free_simple_process_queue}},
+ {{tunnel_socket_alloc_simple_print_request_process_queue, free_simple_process_queue},
+ {tunnel_socket_alloc_simple_print_reply_process_queue, free_simple_process_queue}}
+};
+
+/****************************************************
+* Migration data
+****************************************************/
+typedef struct TunnelChannel TunnelChannel;
+
+#define TUNNEL_MIGRATE_DATA_MAGIC (*(uint32_t *)"TMDA")
+#define TUNNEL_MIGRATE_DATA_VERSION 1
+
+/* sentinel value marking "no data at this offset" inside TunnelMigrateData.data.
+   The previous definition ("#define TUNNEL_MIGRATE_NULL_OFFSET = ~0;") expanded
+   to "= ~0;" and was a syntax error at any use site. */
+#define TUNNEL_MIGRATE_NULL_OFFSET (~0)
+
+/* On-the-wire (packed) layout of the migration payload.  The uint32_t
+   "offset" style fields below are offsets into TunnelMigrateData.data
+   (see the explicit comments on TunnelMigrateData). */
+typedef struct __attribute__ ((__packed__)) TunnelMigrateSocketOutData {
+    uint32_t num_tokens;
+    uint32_t window_size;
+
+    uint32_t process_buf_size;
+    uint32_t process_buf;
+
+    uint32_t process_queue_size;
+    uint32_t process_queue;
+
+    uint32_t ready_buf_size;
+    uint32_t ready_buf;
+} TunnelMigrateSocketOutData;
+
+typedef struct __attribute__ ((__packed__)) TunnelMigrateSocketInData {
+    int32_t num_tokens;
+    uint32_t client_total_num_tokens;
+
+    uint32_t process_buf_size;
+    uint32_t process_buf;
+
+    uint32_t process_queue_size;
+    uint32_t process_queue;
+
+    uint32_t ready_buf_size;
+    uint32_t ready_buf;
+} TunnelMigrateSocketInData;
+
+
+/* serialized state of one tunneled socket (both directions) */
+typedef struct __attribute__ ((__packed__)) TunnelMigrateSocket {
+    uint16_t connection_id;
+    uint16_t local_port;
+    uint32_t far_service_id;
+
+    uint16_t client_status;
+    uint16_t slirp_status;
+
+    uint8_t pushed_close;
+    uint8_t client_waits_close_ack;
+
+    TunnelMigrateSocketOutData out_data;
+    TunnelMigrateSocketInData in_data;
+
+    uint32_t slirp_sckt;
+
+    uint32_t mig_client_status_msg;
+    uint32_t mig_open_ack_tokens;
+} TunnelMigrateSocket;
+
+typedef struct __attribute__ ((__packed__)) TunnelMigrateSocketList {
+    uint16_t num_sockets;
+    uint32_t sockets[0]; // offsets in TunnelMigrateData.data to TunnelMigrateSocket
+} TunnelMigrateSocketList;
+
+typedef struct __attribute__ ((__packed__)) TunnelMigrateService {
+    uint32_t type;
+    uint32_t id;
+    uint32_t group;
+    uint32_t port;
+    uint32_t name;
+    uint32_t description;
+    uint8_t virt_ip[4];
+} TunnelMigrateService;
+
+typedef struct __attribute__ ((__packed__)) TunnelMigratePrintService {
+    TunnelMigrateService base;
+    uint8_t ip[4];
+} TunnelMigratePrintService;
+
+typedef struct __attribute__ ((__packed__)) TunnelMigrateServicesList {
+    uint32_t num_services;
+    uint32_t services[0];
+} TunnelMigrateServicesList;
+
+//todo: add ack_generation
+/* top-level migration blob: header followed by a variable-length data area
+   that the offset fields point into */
+typedef struct __attribute__ ((__packed__)) TunnelMigrateData {
+    uint32_t magic;
+    uint32_t version;
+    uint64_t message_serial;
+
+    uint32_t slirp_state; // offset in data to slirp state
+    uint32_t sockets_list; // offset in data to TunnelMigrateSocketList
+    uint32_t services_list;
+
+    uint8_t data[0];
+} TunnelMigrateData;
+
+/* builder-side (non-packed) companion of TunnelMigrateSocket, used while
+   collecting a socket's state before it is serialized into the blob */
+typedef struct TunnelMigrateSocketItem {
+    RedSocket *socket;
+    TunnelMigrateSocket mig_socket;
+    void *out_process_queue;
+    void *in_process_queue; // queue data specific for service
+    void *slirp_socket;
+    uint32_t slirp_socket_size;
+} TunnelMigrateSocketItem;
+
+/* one exported service, in either its generic or print-service wire form */
+typedef struct TunnelMigrateServiceItem {
+    TunnelService *service;
+    union {
+        TunnelMigrateService generic_service;
+        TunnelMigratePrintService print_service;
+    } u;
+} TunnelMigrateServiceItem;
+
+/* pipe item gathering everything that must be sent as migration data */
+typedef struct TunnelMigrateItem {
+    PipeItem base;
+
+    void *slirp_state;
+    uint64_t slirp_state_size;
+
+    TunnelMigrateServicesList *services_list;
+    uint32_t services_list_size;
+
+    TunnelMigrateServiceItem *services;
+
+    TunnelMigrateSocketList *sockets_list;
+    uint32_t sockets_list_size;
+
+    TunnelMigrateSocketItem sockets_data[MAX_SOCKETS_NUM];
+} TunnelMigrateItem;
+
+static inline void tunnel_channel_activate_migrated_sockets(TunnelChannel *channel);
+
+/*******************************************************************************************/
+
+/* use for signaling that 1) subroutines failed 2) routines in the interface for slirp
+   failed (which is triggered from a call to slirp) */
+/* NOTE: the body is a bare block, not do {...} while (0) — don't use this macro
+   as the unbraced branch of an if/else */
+#define SET_TUNNEL_ERROR(channel,format, ...) {   \
+    channel->tunnel_error = TRUE;                 \
+    red_printf(format, ## __VA_ARGS__);           \
+}
+
+/* should be checked after each subroutine that may cause an error or after calls
+   to slirp routines */
+#define CHECK_TUNNEL_ERROR(channel) (channel->tunnel_error)
+
+/* server side of the tunnel channel: the generic RedChannel plus
+   tunnel-specific migration state and a sticky error flag (set via
+   SET_TUNNEL_ERROR) */
+struct TunnelChannel {
+    RedChannel base;
+    TunnelWorker *worker;
+    int mig_inprogress;
+    int expect_migrate_mark;
+    int expect_migrate_data;
+
+    int tunnel_error;
+
+    // scratch space for building outgoing messages; one union member per
+    // message type
+    struct {
+        union {
+            RedTunnelInit init;
+            RedTunnelServiceIpMap service_ip;
+            RedTunnelSocketOpen socket_open;
+            RedTunnelSocketFin socket_fin;
+            RedTunnelSocketClose socket_close;
+            RedTunnelSocketClosedAck socket_close_ack;
+            RedTunnelSocketData socket_data;
+            RedTunnelSocketTokens socket_token;
+            TunnelMigrateData migrate_data;
+        } u;
+    } send_data;
+
+    // fixed receive buffer for control (non-data) messages
+    uint8_t control_rcv_buf[CONTROL_MSG_RECV_BUF_SIZE];
+};
+
+/* slirp network-interface vtable extended with a back-pointer to the worker */
+typedef struct RedSlirpNetworkInterface {
+    SlirpUsrNetworkInterface base;
+    TunnelWorker *worker;
+} RedSlirpNetworkInterface;
+
+/* one exported service: maps a virtual ip + port to a client-side service id */
+struct TunnelService {
+    RingItem ring_item;
+    PipeItem pipe_item;
+    uint32_t type;
+    uint32_t id;
+    uint32_t group;
+    uint32_t port;
+    char *name;
+    char *description;
+
+    struct in_addr virt_ip;
+};
+
+typedef struct TunnelPrintService {
+    TunnelService base;
+    uint8_t ip[4];
+} TunnelPrintService;
+
+struct TunnelWorker {
+    Channel channel_interface; // for reds
+    TunnelChannel *channel;
+
+    CoreInterface *core_interface;
+    NetWireInterface *vlan_interface;
+    RedSlirpNetworkInterface tunnel_interface;
+    RedSlirpNetworkInterface null_interface;
+
+    RedSocket sockets[MAX_SOCKETS_NUM]; // the sockets are in the worker and not
+                                        // in the channel since the slirp sockets
+                                        // can be still alive (but during close) after
+                                        // the channel was disconnected
+
+    int num_sockets;
+
+    // singly linked free lists of recycled buffers, chained through base.next
+    RedSocketRawSndBuf *free_snd_buf;
+    RedSocketRawRcvBuf *free_rcv_buf;
+
+    Ring services;
+    int num_services;
+};
+
+
+/*********************************************************************
+ * Tunnel interface
+ *********************************************************************/
+static void tunnel_channel_disconnect(RedChannel *channel);
+
+/* networking interface for slirp */
+// the "null_*" variants are installed while no client channel is attached
+// (see tunnel_shutdown, which switches slirp to the null interface)
+static int qemu_can_output(SlirpUsrNetworkInterface *usr_interface);
+static void qemu_output(SlirpUsrNetworkInterface *usr_interface, const uint8_t *pkt, int pkt_len);
+static int null_tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface,
+                                      struct in_addr src_addr, uint16_t src_port,
+                                      struct in_addr dst_addr, uint16_t dst_port,
+                                      SlirpSocket *slirp_s, UserSocket **o_usr_s);
+static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface,
+                                 struct in_addr src_addr, uint16_t src_port,
+                                 struct in_addr dst_addr, uint16_t dst_port,
+                                 SlirpSocket *slirp_s, UserSocket **o_usr_s);
+static void null_tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque);
+static void tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque);
+static int null_tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque,
+                                   uint8_t *buf, size_t len, uint8_t urgent);
+static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque,
+                              uint8_t *buf, size_t len, uint8_t urgent);
+static int null_tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque,
+                                   uint8_t *buf, size_t len);
+static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque,
+                              uint8_t *buf, size_t len);
+static void null_tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface,
+                                             UserSocket *opaque);
+static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface,
+                                        UserSocket *opaque);
+static void null_tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface,
+                                             UserSocket *opaque);
+static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface,
+                                        UserSocket *opaque);
+
+static UserTimer *create_timer(SlirpUsrNetworkInterface *usr_interface,
+                               timer_proc_t proc, void *opaque);
+static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, uint32_t ms);
+
+
+/* reds interface */
+static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
+                                       int num_common_caps, uint32_t *common_caps, int num_caps,
+                                       uint32_t *caps);
+static void handle_tunnel_channel_shutdown(struct Channel *channel);
+static void handle_tunnel_channel_migrate(struct Channel *channel);
+
+/* stop all tunnel activity: silence the channel, mark every live socket as
+   client-closed and detach slirp from the real interface */
+static void tunnel_shutdown(TunnelWorker *worker)
+{
+    int i;
+
+    red_printf("");
+    /* shutdown input from channel */
+    if (worker->channel) {
+        red_channel_shutdown(&worker->channel->base);
+    }
+
+    /* mark every allocated socket slot as closed on the client side */
+    for (i = 0; i < MAX_SOCKETS_NUM; i++) {
+        RedSocket *cur_sckt = &worker->sockets[i];
+        if (cur_sckt->allocated) {
+            cur_sckt->client_status = CLIENT_SCKT_STATUS_CLOSED;
+            cur_sckt->client_waits_close_ack = FALSE;
+        }
+    }
+
+    /* shutdown input from slirp */
+    net_slirp_set_net_interface(&worker->null_interface.base);
+}
+
+/*****************************************************************
+* Managing raw tunneled buffers storage
+******************************************************************/
+
+/********** send buffers ***********/
+/* allocate (or recycle) a send buffer, bind it to the socket and bump the
+   socket's buffer count */
+static RawTunneledBuffer *tunnel_socket_alloc_snd_buf(RedSocket *sckt)
+{
+    RedSocketRawSndBuf *ret = __tunnel_worker_alloc_socket_snd_buf(sckt->worker);
+    ret->base.usr_opaque = sckt;
+    ret->base.release_proc = snd_tunnled_buffer_release;
+    sckt->out_data.num_buffers++;
+    return &ret->base;
+}
+
+/* take a buffer from the worker free list, or malloc a fresh one (aborts via
+   red_error on OOM); resets the generic RawTunneledBuffer header */
+static inline RedSocketRawSndBuf *__tunnel_worker_alloc_socket_snd_buf(TunnelWorker *worker)
+{
+    RedSocketRawSndBuf *ret;
+    if (worker->free_snd_buf) {
+        ret = worker->free_snd_buf;
+        worker->free_snd_buf = (RedSocketRawSndBuf *)worker->free_snd_buf->base.next;
+    } else {
+        ret = (RedSocketRawSndBuf *)malloc(sizeof(*ret));
+        if (!ret) {
+            red_error("malloc of send buf failed");
+        }
+    }
+    ret->base.data = ret->buf;
+    ret->base.size = 0;
+    ret->base.max_size = MAX_SOCKET_DATA_SIZE;
+    ret->base.usr_opaque = NULL;
+    ret->base.refs = 1;
+    ret->base.next = NULL;
+
+    return ret;
+}
+
+/* per-socket accounting + return the buffer to the worker free list */
+static void tunnel_socket_free_snd_buf(RedSocket *sckt, RedSocketRawSndBuf *snd_buf)
+{
+    sckt->out_data.num_buffers--;
+    __tunnel_worker_free_socket_snd_buf(sckt->worker, snd_buf);
+}
+
+static inline void __tunnel_worker_free_socket_snd_buf(TunnelWorker *worker,
+                                                       RedSocketRawSndBuf *snd_buf)
+{
+    snd_buf->base.size = 0;
+    // NOTE(review): when the free list is empty this computes
+    // &((RedSocketRawSndBuf *)NULL)->base; base is the first member so the
+    // result is NULL in practice, but it is formally undefined behavior
+    snd_buf->base.next = &worker->free_snd_buf->base;
+    worker->free_snd_buf = snd_buf;
+}
+
+/* alloc_buf_proc hook of request-direction process queues */
+static RawTunneledBuffer *process_queue_alloc_snd_tunneled_buffer(TunneledBufferProcessQueue *queue)
+{
+    return tunnel_socket_alloc_snd_buf((RedSocket *)queue->usr_opaque);
+}
+
+/* release_proc hook installed on send buffers */
+static void snd_tunnled_buffer_release(RawTunneledBuffer *buf)
+{
+    tunnel_socket_free_snd_buf((RedSocket *)buf->usr_opaque, (RedSocketRawSndBuf *)buf);
+}
+
+/********** recv buffers ***********/
+
+/* bind a freshly received data buffer to its destination socket and queue it
+   on the socket's inbound process queue */
+static inline void tunnel_socket_assign_rcv_buf(RedSocket *sckt,
+                                                RedSocketRawRcvBuf *recv_buf, int buf_size)
+{
+    ASSERT(!recv_buf->base.usr_opaque);
+    // the rcv buffer was allocated by tunnel_channel_alloc_msg_rcv_buf
+    // before we could know which of the sockets it belongs to, so the
+    // assignment to the socket is performed now
+    recv_buf->base.size = buf_size;
+    recv_buf->base.usr_opaque = sckt;
+    recv_buf->base.release_proc = rcv_tunnled_buffer_release;
+    sckt->in_data.num_buffers++;
+    process_queue_push(sckt->in_data.process_queue, &recv_buf->base);
+}
+
+/* take a recv buffer from the worker free list, or malloc a fresh one
+   (aborts via red_error on OOM); resets the generic header */
+static inline RedSocketRawRcvBuf *__tunnel_worker_alloc_socket_rcv_buf(TunnelWorker *worker)
+{
+    RedSocketRawRcvBuf *ret;
+    if (worker->free_rcv_buf) {
+        ret = worker->free_rcv_buf;
+        worker->free_rcv_buf = (RedSocketRawRcvBuf *)worker->free_rcv_buf->base.next;
+    } else {
+        ret = (RedSocketRawRcvBuf *)malloc(sizeof(*ret));
+        if (!ret) {
+            // this is the recv-buf allocator; the message previously claimed
+            // "malloc of send buf failed", which misled debugging
+            red_error("malloc of recv buf failed");
+        }
+    }
+    // the message header sits at the start of the raw buffer; the tunneled
+    // payload starts right after it
+    ret->msg_info = (RedcTunnelSocketData *)ret->buf;
+    ret->base.usr_opaque = NULL;
+    ret->base.data = ret->msg_info->data;
+    ret->base.size = 0;
+    ret->base.max_size = MAX_SOCKET_DATA_SIZE;
+    ret->base.refs = 1;
+    ret->base.next = NULL;
+
+    return ret;
+}
+
+/* decide whether a SOCKET_TOKEN message should be pushed to the client.
+   Skipped while the socket isn't open, a token item is already queued, or
+   migration is in progress; otherwise triggered when enough tokens
+   accumulated, or when the client is out of tokens and we have nothing
+   ready to forward */
+static inline void __process_rcv_buf_tokens(TunnelChannel *channel, RedSocket *sckt)
+{
+    if ((sckt->client_status != CLIENT_SCKT_STATUS_OPEN) || red_channel_pipe_item_is_linked(
+            &channel->base, &sckt->out_data.token_pipe_item) || channel->mig_inprogress) {
+        return;
+    }
+
+    if ((sckt->in_data.num_tokens >= SOCKET_TOKENS_TO_SEND) ||
+        (!sckt->in_data.client_total_num_tokens && !sckt->in_data.ready_chunks_queue.head)) {
+        sckt->out_data.token_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_TOKEN;
+        red_channel_pipe_add(&channel->base, &sckt->out_data.token_pipe_item);
+    }
+}
+
+/* recycling a recv buffer releases one flow-control token; maybe tell the
+   client about the regained tokens */
+static void tunnel_socket_free_rcv_buf(RedSocket *sckt, RedSocketRawRcvBuf *rcv_buf)
+{
+    --sckt->in_data.num_buffers;
+    __tunnel_worker_free_socket_rcv_buf(sckt->worker, rcv_buf);
+    ++sckt->in_data.num_tokens;
+    __process_rcv_buf_tokens(sckt->worker->channel, sckt);
+}
+
+static inline void __tunnel_worker_free_socket_rcv_buf(TunnelWorker *worker,
+                                                       RedSocketRawRcvBuf *rcv_buf)
+{
+    // NOTE(review): as with the snd variant, &worker->free_rcv_buf->base is
+    // formally UB when the free list is empty (base is the first member, so
+    // it yields NULL in practice)
+    rcv_buf->base.next = &worker->free_rcv_buf->base;
+    worker->free_rcv_buf = rcv_buf;
+}
+
+/* release_proc hook installed on recv buffers */
+static void rcv_tunnled_buffer_release(RawTunneledBuffer *buf)
+{
+    tunnel_socket_free_rcv_buf((RedSocket *)buf->usr_opaque,
+                               (RedSocketRawRcvBuf *)buf);
+}
+
+/************************
+* Process & Ready queue
+*************************/
+
+/* append to the singly linked process queue without running analysis */
+static inline void __process_queue_push(TunneledBufferProcessQueue *queue, RawTunneledBuffer *buf)
+{
+    buf->next = NULL;
+    if (!queue->head) {
+        queue->head = buf;
+        queue->tail = buf;
+    } else {
+        queue->tail->next = buf;
+        queue->tail = buf;
+    }
+}
+
+/* push a whole buffer and immediately run the service-specific analysis
+   over its full range */
+static void process_queue_push(TunneledBufferProcessQueue *queue, RawTunneledBuffer *buf)
+{
+    __process_queue_push(queue, buf);
+    queue->analysis_proc(queue, buf, 0, buf->size);
+}
+
+/* copy `size` bytes into the queue: first fill the tail buffer's spare
+   capacity, then allocate new buffers as needed, and finally run the
+   analysis proc over the newly appended range (starting at start_buf /
+   start_offset) */
+static void process_queue_append(TunneledBufferProcessQueue *queue, uint8_t *data, size_t size)
+{
+    RawTunneledBuffer *start_buf = NULL;
+    int start_offset = 0;
+    size_t copied = 0; // was int: comparing it with the size_t `size` mixed
+                       // signed and unsigned operands
+
+    if (queue->tail) {
+        RawTunneledBuffer *buf = queue->tail;
+        size_t space = buf->max_size - buf->size;
+        if (space) {
+            size_t copy_count = MIN(size, space);
+            start_buf = buf;
+            start_offset = buf->size;
+            memcpy(buf->data + buf->size, data, copy_count);
+            copied += copy_count;
+            buf->size += copy_count;
+        }
+    }
+
+
+    while (copied < size) {
+        RawTunneledBuffer *buf = queue->alloc_buf_proc(queue);
+        size_t copy_count = MIN(size - copied, (size_t)buf->max_size);
+        memcpy(buf->data, data + copied, copy_count);
+        copied += copy_count;
+        buf->size = copy_count;
+
+        __process_queue_push(queue, buf);
+
+        if (!start_buf) {
+            start_buf = buf;
+            start_offset = 0;
+        }
+    }
+
+    queue->analysis_proc(queue, start_buf, start_offset, size);
+}
+
+/* detach the head buffer from the queue and drop our reference to it */
+static void process_queue_pop(TunneledBufferProcessQueue *queue)
+{
+    RawTunneledBuffer *old_head;
+
+    ASSERT(queue->head && queue->tail);
+    old_head = queue->head;
+    queue->head = old_head->next;
+    if (queue->head == NULL) {
+        /* queue became empty */
+        queue->tail = NULL;
+    }
+
+    tunneled_buffer_unref(old_head);
+}
+
+/* pop until nothing is left */
+static void process_queue_clear(TunneledBufferProcessQueue *queue)
+{
+    while (queue->head != NULL) {
+        process_queue_pop(queue);
+    }
+}
+
+/* append a chunk at the tail of the ready queue */
+static void __ready_queue_push(ReadyTunneledChunkQueue *queue, ReadyTunneledChunk *chunk)
+{
+    chunk->next = NULL;
+    if (!queue->tail) {
+        /* empty queue: the chunk becomes both head and tail */
+        queue->head = chunk;
+        queue->tail = chunk;
+    } else {
+        queue->tail->next = chunk;
+        queue->tail = chunk;
+    }
+}
+
+/* wrap a slice of an existing buffer in a ready chunk (takes a reference on
+   the origin buffer) and queue it */
+static void ready_queue_add_orig_chunk(ReadyTunneledChunkQueue *queue, RawTunneledBuffer *origin,
+                                       uint8_t *data, int size)
+{
+    ReadyTunneledChunk *chunk = (ReadyTunneledChunk *)malloc(sizeof(ReadyTunneledChunk));
+    if (!chunk) {
+        // every other allocation site in this file aborts via red_error on
+        // OOM; do the same instead of dereferencing a NULL chunk below
+        red_error("malloc of ReadyTunneledChunk failed");
+    }
+    chunk->type = READY_TUNNELED_CHUNK_TYPE_ORIG;
+    chunk->origin = tunneled_buffer_ref(origin);
+    chunk->data = data;
+    chunk->size = size;
+
+    __ready_queue_push(queue, chunk);
+}
+
+/* drop the head chunk: release the reference on its origin buffer and, for
+   chunks that own a private data copy (any type other than ORIG), free that
+   copy as well */
+static void ready_queue_pop_chunk(ReadyTunneledChunkQueue *queue)
+{
+    ReadyTunneledChunk *chunk = queue->head;
+    ASSERT(queue->head);
+    queue->head = queue->head->next;
+
+    if (!queue->head) {
+        queue->tail = NULL;
+    }
+
+    tunneled_buffer_unref(chunk->origin);
+    if (chunk->type != READY_TUNNELED_CHUNK_TYPE_ORIG) {
+        free(chunk->data);
+    }
+    free(chunk);
+}
+
+/* pop until the ready queue is empty */
+static void ready_queue_clear(ReadyTunneledChunkQueue *queue)
+{
+    while (queue->head) {
+        ready_queue_pop_chunk(queue);
+    }
+}
+
+/* simple (pass-through) analysis: every processed buffer is handed as-is to
+   the ready queue; nothing is inspected or transformed */
+static void process_queue_simple_analysis(TunneledBufferProcessQueue *queue,
+                                          RawTunneledBuffer *start_last_added, int offset, int len)
+{
+    ASSERT(offset == 0);
+    ASSERT(start_last_added == queue->head);
+
+    while (queue->head) {
+        ready_queue_add_orig_chunk(queue->ready_chunks_queue, queue->head, queue->head->data,
+                                   queue->head->size);
+        process_queue_pop(queue);
+    }
+}
+
+/* simple queues carry no service-specific migration state */
+static int process_queue_simple_get_migrate_data(TunneledBufferProcessQueue *queue,
+                                                 void **migrate_data)
+{
+    *migrate_data = NULL;
+    return 0;
+}
+
+static void process_queue_simple_release_migrate_data(TunneledBufferProcessQueue *queue,
+                                                      void *migrate_data)
+{
+    ASSERT(!migrate_data);
+}
+
+/* nothing to restore for a simple queue */
+static void process_queue_simple_restore(TunneledBufferProcessQueue *queue, uint8_t *migrate_data)
+{
+}
+
+/* allocate a process queue that uses the simple pass-through callbacks and
+   bind it to the socket's ready queue for the given direction */
+static inline TunneledBufferProcessQueue *__tunnel_socket_alloc_simple_process_queue(
+    RedSocket *sckt,
+    uint32_t service_type,
+    uint32_t direction_type)
+{
+    TunneledBufferProcessQueue *ret_queue = malloc(sizeof(TunneledBufferProcessQueue));
+
+    if (!ret_queue) {
+        // consistent with the other allocators in this file: abort on OOM
+        // instead of crashing on the memset below
+        red_error("malloc of TunneledBufferProcessQueue failed");
+    }
+    memset(ret_queue, 0, sizeof(TunneledBufferProcessQueue));
+    ret_queue->service_type = service_type;
+    ret_queue->direction = direction_type;
+    ret_queue->usr_opaque = sckt;
+    // NO need for allocations by the process queue when getting replies. The buffer is created
+    // when the msg is received
+    if (direction_type == PROCESS_DIRECTION_TYPE_REQUEST) {
+        ret_queue->alloc_buf_proc = process_queue_alloc_snd_tunneled_buffer;
+        ret_queue->ready_chunks_queue = &sckt->out_data.ready_chunks_queue;
+    } else {
+        ret_queue->ready_chunks_queue = &sckt->in_data.ready_chunks_queue;
+    }
+
+    ret_queue->analysis_proc = process_queue_simple_analysis;
+
+    ret_queue->get_migrate_data_proc = process_queue_simple_get_migrate_data;
+    ret_queue->release_migrate_data_proc = process_queue_simple_release_migrate_data;
+    ret_queue->restore_proc = process_queue_simple_restore;
+    return ret_queue;
+}
+
+/* drop (unref) every queued buffer, then release the queue itself */
+static void free_simple_process_queue(TunneledBufferProcessQueue *queue)
+{
+    process_queue_clear(queue);
+    free(queue);
+}
+
+/* guest-bound print traffic: a simple pass-through queue */
+static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_request_process_queue(
+    RedSocket *sckt)
+{
+    return __tunnel_socket_alloc_simple_process_queue(
+        sckt, RED_TUNNEL_SERVICE_TYPE_IPP, PROCESS_DIRECTION_TYPE_REQUEST);
+}
+
+/* client-bound print traffic: a simple pass-through queue */
+static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_reply_process_queue(
+    RedSocket *sckt)
+{
+    return __tunnel_socket_alloc_simple_process_queue(
+        sckt, RED_TUNNEL_SERVICE_TYPE_IPP, PROCESS_DIRECTION_TYPE_REPLY);
+}
+
+/* NetWireInterface route-packet callback (registered in red_tunnel_attach):
+   feeds a packet captured from the guest's vlan into the slirp stack.
+   Packets are dropped while base.migrate is set, i.e. on the migration
+   target before the tunnel state has been restored. */
+static void tunnel_send_packet(void *opaque_tunnel, const uint8_t *pkt, int pkt_len)
+{
+    TunnelWorker *worker = (TunnelWorker *)opaque_tunnel;
+    ASSERT(worker);
+
+    if (worker->channel && worker->channel->base.migrate) {
+        return; // during migration and the tunnel state hasn't been restored yet.
+    }
+
+    net_slirp_input(pkt, pkt_len);
+}
+
+/* create the tunnel worker: fill both slirp interface vtables (the active
+   tunnel_* one and the null_* one used while no channel is attached),
+   register the tunnel channel with reds, start slirp over the null
+   interface and hook packet routing from the vlan interface.
+   Returns the opaque worker pointer; aborts via red_error on failure. */
+void *red_tunnel_attach(CoreInterface *core_interface, NetWireInterface *vlan_interface)
+{
+    TunnelWorker *worker = (TunnelWorker *)malloc(sizeof(TunnelWorker));
+
+    if (!worker) {
+        red_error("malloc of tunnel worker failed");
+    }
+    memset(worker, 0, sizeof(*worker));
+
+    worker->core_interface = core_interface;
+    worker->vlan_interface = vlan_interface;
+
+    worker->tunnel_interface.base.slirp_can_output = qemu_can_output;
+    worker->tunnel_interface.base.slirp_output = qemu_output;
+    worker->tunnel_interface.base.connect = tunnel_socket_connect;
+    worker->tunnel_interface.base.send = tunnel_socket_send;
+    worker->tunnel_interface.base.recv = tunnel_socket_recv;
+    worker->tunnel_interface.base.close = tunnel_socket_close;
+    worker->tunnel_interface.base.shutdown_recv = tunnel_socket_shutdown_recv;
+    worker->tunnel_interface.base.shutdown_send = tunnel_socket_shutdown_send;
+    worker->tunnel_interface.base.create_timer = create_timer;
+    worker->tunnel_interface.base.arm_timer = arm_timer;
+
+    worker->tunnel_interface.worker = worker;
+
+    worker->null_interface.base.slirp_can_output = qemu_can_output;
+    worker->null_interface.base.slirp_output = qemu_output;
+    worker->null_interface.base.connect = null_tunnel_socket_connect;
+    worker->null_interface.base.send = null_tunnel_socket_send;
+    worker->null_interface.base.recv = null_tunnel_socket_recv;
+    worker->null_interface.base.close = null_tunnel_socket_close;
+    worker->null_interface.base.shutdown_recv = null_tunnel_socket_shutdown_recv;
+    worker->null_interface.base.shutdown_send = null_tunnel_socket_shutdown_send;
+    worker->null_interface.base.create_timer = create_timer;
+    worker->null_interface.base.arm_timer = arm_timer;
+
+    worker->null_interface.worker = worker;
+
+    worker->channel_interface.type = RED_CHANNEL_TUNNEL;
+    worker->channel_interface.id = 0;
+    worker->channel_interface.link = handle_tunnel_channel_link;
+    worker->channel_interface.shutdown = handle_tunnel_channel_shutdown;
+    worker->channel_interface.migrate = handle_tunnel_channel_migrate;
+    worker->channel_interface.data = worker;
+
+    ring_init(&worker->services);
+    reds_register_channel(&worker->channel_interface);
+
+    net_slirp_init(worker->vlan_interface->get_ip(worker->vlan_interface),
+                   TRUE,
+                   &worker->null_interface.base);
+    if (!vlan_interface->register_route_packet(vlan_interface, tunnel_send_packet, worker)) {
+        red_error("register route packet failed");
+    }
+    return worker;
+}
+
+/* returns the first service that has the same group id (NULL if not found) */
+static inline TunnelService *__tunnel_worker_find_service_of_group(TunnelWorker *worker,
+                                                                   uint32_t group)
+{
+    TunnelService *cur;
+
+    /* linear scan over the registered services ring */
+    for (cur = (TunnelService *)ring_get_head(&worker->services); cur != NULL;
+         cur = (TunnelService *)ring_next(&worker->services, &cur->ring_item)) {
+        if (cur->group == group) {
+            return cur;
+        }
+    }
+
+    return NULL;
+}
+
+/* register a service of `size` bytes (TunnelService or a derived struct).
+   When virt_ip is NULL a virtual ip is allocated -- or shared with an
+   existing service of the same group, provided the names match -- and a
+   SERVICE_IP_MAP pipe item is pushed to the client.  Returns NULL on
+   failure. */
+static inline TunnelService *__tunnel_worker_add_service(TunnelWorker *worker, uint32_t size,
+                                                         uint32_t type, uint32_t id,
+                                                         uint32_t group, uint32_t port,
+                                                         char *name, char *description,
+                                                         struct in_addr *virt_ip)
+{
+    TunnelService *new_service = malloc(size);
+
+    if (!new_service) {
+        red_error("malloc of TunnelService failed");
+    }
+    memset(new_service, 0, size);
+
+    if (!virt_ip) {
+        TunnelService *service_of_same_group;
+        if (!(service_of_same_group = __tunnel_worker_find_service_of_group(worker, group))) {
+            if (!net_slirp_allocate_virtual_ip(&new_service->virt_ip)) {
+                red_printf("failed to allocate virtual ip");
+                free(new_service);
+                return NULL;
+            }
+        } else {
+            // same group => must be the same service (same name) sharing one ip
+            if (strcmp(name, service_of_same_group->name) == 0) {
+                new_service->virt_ip.s_addr = service_of_same_group->virt_ip.s_addr;
+            } else {
+                red_printf("inconsistent name for service group %d", group);
+                free(new_service);
+                return NULL;
+            }
+        }
+    } else {
+        new_service->virt_ip.s_addr = virt_ip->s_addr;
+    }
+
+    ring_item_init(&new_service->ring_item);
+    new_service->type = type;
+    new_service->id = id;
+    new_service->group = group;
+    new_service->port = port;
+
+    // NOTE(review): strdup results are not checked for NULL here
+    new_service->name = strdup(name);
+    new_service->description = strdup(description);
+
+    ring_add(&worker->services, &new_service->ring_item);
+    worker->num_services++;
+
+#ifdef DEBUG_NETWORK
+    red_printf("TUNNEL_DBG: ==>SERVICE ADDED: id=%d virt ip=%s port=%d name=%s desc=%s",
+               new_service->id, inet_ntoa(new_service->virt_ip),
+               new_service->port, new_service->name, new_service->description);
+#endif
+    // a freshly allocated virtual ip must be announced to the client
+    if (!virt_ip) {
+        new_service->pipe_item.type = PIPE_ITEM_TYPE_SERVICE_IP_MAP;
+        red_channel_pipe_add(&worker->channel->base, &new_service->pipe_item);
+    }
+
+    return new_service;
+}
+
+/* register a service described by a client message; the name and description
+   fields of the message are byte offsets from the start of the message */
+static TunnelService *tunnel_worker_add_service(TunnelWorker *worker, uint32_t size,
+                                                RedcTunnelAddGenericService *redc_service)
+{
+    uint8_t *msg_base = (uint8_t *)redc_service;
+
+    return __tunnel_worker_add_service(worker, size, redc_service->type,
+                                       redc_service->id, redc_service->group,
+                                       redc_service->port,
+                                       (char *)(msg_base + redc_service->name),
+                                       (char *)(msg_base + redc_service->description),
+                                       NULL);
+}
+
+/* unlink the service, then release its strdup'ed strings and the struct */
+static inline void tunnel_worker_free_service(TunnelWorker *worker, TunnelService *service)
+{
+    ring_remove(&service->ring_item);
+    free(service->name);
+    free(service->description);
+    free(service);
+    worker->num_services--;
+}
+
+/* print services own no extra resources beyond the base service */
+static void tunnel_worker_free_print_service(TunnelWorker *worker, TunnelPrintService *service)
+{
+    tunnel_worker_free_service(worker, &service->base);
+}
+
+/* add an IPP print service; only IPv4 addresses are accepted.  Returns NULL
+   (after rolling back the base-service registration) on bad input */
+static TunnelPrintService *tunnel_worker_add_print_service(TunnelWorker *worker,
+                                                           RedcTunnelAddPrintService *redc_service)
+{
+    TunnelPrintService *service;
+
+    service = (TunnelPrintService *)tunnel_worker_add_service(worker, sizeof(TunnelPrintService),
+                                                              &redc_service->base);
+
+    if (!service) {
+        return NULL;
+    }
+
+    if (redc_service->ip.type == RED_TUNNEL_IP_TYPE_IPv4) {
+        memcpy(service->ip, redc_service->ip.data, sizeof(RedTunnelIPv4));
+    } else {
+        red_printf("unexpected ip type=%d", redc_service->ip.type);
+        tunnel_worker_free_print_service(worker, service);
+        return NULL;
+    }
+#ifdef DEBUG_NETWORK
+    red_printf("TUNNEL_DBG: ==>PRINT SERVICE ADDED: ip=%d.%d.%d.%d", service->ip[0],
+               service->ip[1], service->ip[2], service->ip[3]);
+#endif
+    return service;
+}
+
+/* dispatch a client "add service" message by service type; consumes (frees)
+   the message.  Returns TRUE-ish iff the service was registered */
+static int tunnel_channel_handle_service_add(TunnelChannel *channel,
+                                             RedcTunnelAddGenericService *service_msg)
+{
+    TunnelService *out_service = NULL;
+    if (service_msg->type == RED_TUNNEL_SERVICE_TYPE_IPP) {
+        // tunnel_worker_add_print_service may return NULL; the previous code
+        // computed &NULL->base, which is undefined behavior -- check first
+        TunnelPrintService *print_service =
+            tunnel_worker_add_print_service(channel->worker,
+                                            (RedcTunnelAddPrintService *)service_msg);
+        out_service = print_service ? &print_service->base : NULL;
+    } else if (service_msg->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) {
+        out_service = tunnel_worker_add_service(channel->worker, sizeof(TunnelService),
+                                                service_msg);
+    } else {
+        red_printf("invalid service type");
+    }
+
+    free(service_msg);
+    return (out_service != NULL);
+}
+
+/* linear lookup of a service by its client-assigned id (NULL if absent) */
+static inline TunnelService *tunnel_worker_find_service_by_id(TunnelWorker *worker, uint32_t id)
+{
+    TunnelService *cur;
+
+    for (cur = (TunnelService *)ring_get_head(&worker->services); cur != NULL;
+         cur = (TunnelService *)ring_next(&worker->services, &cur->ring_item)) {
+        if (cur->id == id) {
+            return cur;
+        }
+    }
+
+    return NULL;
+}
+
+/* linear lookup by virtual ip + port pair (NULL if absent) */
+static inline TunnelService *tunnel_worker_find_service_by_addr(TunnelWorker *worker,
+                                                                struct in_addr *virt_ip,
+                                                                uint32_t port)
+{
+    TunnelService *cur;
+
+    for (cur = (TunnelService *)ring_get_head(&worker->services); cur != NULL;
+         cur = (TunnelService *)ring_next(&worker->services, &cur->ring_item)) {
+        if ((cur->virt_ip.s_addr == virt_ip->s_addr) && (cur->port == port)) {
+            return cur;
+        }
+    }
+
+    return NULL;
+}
+
+/* free every registered service (dispatching on its concrete type) and
+   release all virtual ips held by slirp */
+static inline void tunnel_worker_clear_routed_network(TunnelWorker *worker)
+{
+    while (!ring_is_empty(&worker->services)) {
+        TunnelService *head_service = (TunnelService *)ring_get_head(&worker->services);
+        switch (head_service->type) {
+        case RED_TUNNEL_SERVICE_TYPE_GENERIC:
+            tunnel_worker_free_service(worker, head_service);
+            break;
+        case RED_TUNNEL_SERVICE_TYPE_IPP:
+            tunnel_worker_free_print_service(worker, (TunnelPrintService *)head_service);
+            break;
+        default:
+            red_error("unexpected service type");
+        }
+    }
+
+    net_slirp_clear_virtual_ips();
+}
+
+/* pick an unused slot from the fixed socket pool; NULL when the pool is
+   full.  The slot's connection_id is set to its pool index */
+static inline RedSocket *__tunnel_worker_find_free_socket(TunnelWorker *worker)
+{
+    int i;
+
+    if (worker->num_sockets == MAX_SOCKETS_NUM) {
+        return NULL;
+    }
+
+    /* the pool isn't full, so a vacant slot must exist */
+    for (i = 0; i < MAX_SOCKETS_NUM; i++) {
+        RedSocket *candidate = &worker->sockets[i];
+        if (!candidate->allocated) {
+            candidate->connection_id = i;
+            return candidate;
+        }
+    }
+
+    ASSERT(FALSE); /* unreachable: num_sockets disagrees with the slots */
+    return NULL;
+}
+
+/* mark the slot as taken and account for it */
+static inline void __tunnel_worker_add_socket(TunnelWorker *worker, RedSocket *sckt)
+{
+    ASSERT(!sckt->allocated);
+    sckt->allocated = TRUE;
+    worker->num_sockets++;
+}
+
+/* initialize a slot taken from the pool: record the endpoints, set the
+   initial open/wait state and attach the service-specific process queues
+   for both directions */
+static inline void tunnel_worker_alloc_socket(TunnelWorker *worker, RedSocket *sckt,
+                                              uint16_t local_port, TunnelService *far_service,
+                                              SlirpSocket *slirp_s)
+{
+    ASSERT(far_service);
+    sckt->worker = worker;
+    sckt->local_port = local_port;
+    sckt->far_service = far_service;
+    sckt->out_data.num_tokens = 0;
+
+    sckt->slirp_status = SLIRP_SCKT_STATUS_OPEN;
+    sckt->client_status = CLIENT_SCKT_STATUS_WAIT_OPEN;
+    sckt->slirp_sckt = slirp_s;
+
+    // queues are chosen by [service type][direction], see SERVICES_CALLBACKS
+    sckt->out_data.process_queue = SERVICES_CALLBACKS[far_service->type][
+            PROCESS_DIRECTION_TYPE_REQUEST].alloc_process_queue(sckt);
+    sckt->in_data.process_queue = SERVICES_CALLBACKS[far_service->type][
+            PROCESS_DIRECTION_TYPE_REPLY].alloc_process_queue(sckt);
+    __tunnel_worker_add_socket(worker, sckt);
+}
+
+/* return the slot to the pool; zeroing also clears the 'allocated' flag */
+static inline void __tunnel_worker_free_socket(TunnelWorker *worker, RedSocket *sckt)
+{
+    memset(sckt, 0, sizeof(*sckt));
+    worker->num_sockets--;
+}
+
+/* take and initialize a socket slot; aborts when the pool is exhausted */
+static RedSocket *tunnel_worker_create_socket(TunnelWorker *worker, uint16_t local_port,
+                                              TunnelService *far_service,
+                                              SlirpSocket *slirp_s)
+{
+    RedSocket *new_socket;
+    ASSERT(worker);
+    new_socket = __tunnel_worker_find_free_socket(worker);
+
+    if (!new_socket) {
+        // sockets live in a fixed-size pool inside the worker; failure here
+        // means pool exhaustion, not an allocation failure (the old message
+        // claimed "malloc of RedSocket failed", which was misleading)
+        red_error("no vacant socket slot (MAX_SOCKETS_NUM reached)");
+    }
+
+    tunnel_worker_alloc_socket(worker, new_socket, local_port, far_service, slirp_s);
+
+    return new_socket;
+}
+
+/* release a socket: first take care of any of its items still linked in the
+   channel pipe, then free both process queues, drain the ready queues and
+   return the slot to the pool.
+   NOTE(review): when a pipe item is found linked, the item is removed and
+   the function returns WITHOUT freeing the queues or the slot -- presumably
+   the release is retried/completed elsewhere; confirm no path leaks here. */
+static void tunnel_worker_free_socket(TunnelWorker *worker, RedSocket *sckt)
+{
+    if (worker->channel) {
+        if (red_channel_pipe_item_is_linked(&worker->channel->base,
+                                            &sckt->out_data.data_pipe_item)) {
+            red_channel_pipe_item_remove(&worker->channel->base,
+                                         &sckt->out_data.data_pipe_item);
+            return;
+        }
+
+        if (red_channel_pipe_item_is_linked(&worker->channel->base,
+                                            &sckt->out_data.status_pipe_item)) {
+            red_channel_pipe_item_remove(&worker->channel->base,
+                                         &sckt->out_data.status_pipe_item);
+            return;
+        }
+
+        if (red_channel_pipe_item_is_linked(&worker->channel->base,
+                                            &sckt->out_data.token_pipe_item)) {
+            red_channel_pipe_item_remove(&worker->channel->base,
+                                         &sckt->out_data.token_pipe_item);
+            return;
+        }
+    }
+
+    SERVICES_CALLBACKS[sckt->far_service->type][
+        PROCESS_DIRECTION_TYPE_REQUEST].free_process_queue(sckt->out_data.process_queue);
+    SERVICES_CALLBACKS[sckt->far_service->type][
+        PROCESS_DIRECTION_TYPE_REPLY].free_process_queue(sckt->in_data.process_queue);
+
+    ready_queue_clear(&sckt->out_data.ready_chunks_queue);
+    ready_queue_clear(&sckt->in_data.ready_chunks_queue);
+
+    __tunnel_worker_free_socket(worker, sckt);
+}
+
+/* find the socket matching a (local port, far service id) pair; the scan
+   stops early once all currently allocated sockets have been examined */
+static inline RedSocket *tunnel_worker_find_socket(TunnelWorker *worker,
+                                                   uint16_t local_port,
+                                                   uint32_t far_service_id)
+{
+    RedSocket *cur;
+    int seen = 0;
+
+    for (cur = worker->sockets; seen < worker->num_sockets; cur++) {
+        if (!cur->allocated) {
+            continue;
+        }
+        seen++;
+        if ((cur->local_port == local_port) && (cur->far_service->id == far_service_id)) {
+            return cur;
+        }
+    }
+    return NULL;
+}
+
+static inline void __tunnel_socket_add_fin_to_pipe(TunnelChannel *channel, RedSocket *sckt)
+{
+ ASSERT(!red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item));
+ sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_FIN;
+ red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item);
+}
+
+// Queue a CLOSE status message for the client, replacing a pending FIN if one
+// is still sitting in the pipe, and record that a close was pushed
+// (pushed_close) so the close-ack handshake can be matched up later.
+static inline void __tunnel_socket_add_close_to_pipe(TunnelChannel *channel, RedSocket *sckt)
+{
+    ASSERT(!channel->mig_inprogress);
+
+    if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)) {
+        ASSERT(sckt->out_data.status_pipe_item.type == PIPE_ITEM_TYPE_SOCKET_FIN);
+        // close is stronger than FIN
+        red_channel_pipe_item_remove(&channel->base, &sckt->out_data.status_pipe_item);
+    }
+    sckt->pushed_close = TRUE;
+    sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_CLOSE;
+    red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item);
+}
+
+// Queue a CLOSED_ACK status message for the client, replacing a pending FIN
+// if one is still in the pipe. Unlike __tunnel_socket_add_close_to_pipe this
+// does not set pushed_close: the close originated on the client side.
+static inline void __tunnel_socket_add_close_ack_to_pipe(TunnelChannel *channel, RedSocket *sckt)
+{
+    ASSERT(!channel->mig_inprogress);
+
+    if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)) {
+        ASSERT(sckt->out_data.status_pipe_item.type == PIPE_ITEM_TYPE_SOCKET_FIN);
+        // the closed-ack supersedes a pending FIN
+        red_channel_pipe_item_remove(&channel->base, &sckt->out_data.status_pipe_item);
+    }
+
+    sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK;
+    red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item);
+}
+
+/*
+    Send close msg to the client.
+    If possible, notify slirp to recv data (which will return 0)
+    When close ack is received from client, we notify slirp (maybe again) if needed.
+*/
+// Abort the socket from the server side: drop any queued token/data pipe
+// items, push a CLOSE to the client (unless one is already on its way), and
+// abort the slirp side — either immediately, or deferred via
+// SLIRP_SCKT_STATUS_DELAY_ABORT when we are inside a slirp send callback and
+// must not re-enter slirp.
+static void tunnel_socket_force_close(TunnelChannel *channel, RedSocket *sckt)
+{
+    if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.token_pipe_item)) {
+        red_channel_pipe_item_remove(&channel->base, &sckt->out_data.token_pipe_item);
+    }
+
+    if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.data_pipe_item)) {
+        red_channel_pipe_item_remove(&channel->base, &sckt->out_data.data_pipe_item);
+    }
+
+
+    if ((sckt->client_status != CLIENT_SCKT_STATUS_CLOSED) ||
+        !sckt->pushed_close) {
+        __tunnel_socket_add_close_to_pipe(channel, sckt);
+    }
+
+    // we can't call net_slirp_socket_can_receive_notify if the forced close was initiated by
+    // tunnel_socket_send (which was called from slirp). Instead, when
+    // we receive the close ack from the client, we call net_slirp_socket_can_receive_notify
+    if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) {
+        if (!sckt->in_slirp_send) {
+            sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
+            net_slirp_socket_abort(sckt->slirp_sckt);
+        } else {
+            sckt->slirp_status = SLIRP_SCKT_STATUS_DELAY_ABORT;
+        }
+    }
+}
+
+// Handle REDC_TUNNEL_SOCKET_OPEN_ACK from the client. During migration the
+// message is only recorded (mig_client_status_msg) and replayed later by
+// tunnel_channel_restore_socket_state. `tokens` becomes the client's receive
+// window for this socket. Returns FALSE on protocol violation or tunnel error.
+static int tunnel_channel_handle_socket_connect_ack(TunnelChannel *channel, RedSocket *sckt,
+                                                    uint32_t tokens)
+{
+#ifdef DEBUG_NETWORK
+    red_printf("TUNNEL_DBG");
+#endif
+    if (channel->mig_inprogress || channel->base.migrate) {
+        sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_OPEN_ACK;
+        sckt->mig_open_ack_tokens = tokens;
+        return TRUE;
+    }
+
+    if (sckt->client_status != CLIENT_SCKT_STATUS_WAIT_OPEN) {
+        red_printf("unexpected REDC_TUNNEL_SOCKET_OPEN_ACK status=%d", sckt->client_status);
+        return FALSE;
+    }
+    sckt->client_status = CLIENT_SCKT_STATUS_OPEN;
+
+    // SLIRP_SCKT_STATUS_CLOSED is possible after waiting for a connection has timed out
+    if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
+        ASSERT(!sckt->pushed_close);
+        __tunnel_socket_add_close_to_pipe(channel, sckt);
+    } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) {
+        sckt->out_data.window_size = tokens;
+        sckt->out_data.num_tokens = tokens;
+        net_slirp_socket_connected_notify(sckt->slirp_sckt);
+    } else {
+        red_printf("unexpected slirp status status=%d", sckt->slirp_status);
+        return FALSE;
+    }
+
+    return (!CHECK_TUNNEL_ERROR(channel));
+}
+
+// Handle REDC_TUNNEL_SOCKET_OPEN_NACK: the client refused the connection.
+// Deferred during migration. If slirp still holds the socket we notify it of
+// the failure; otherwise the RedSocket can be freed right away.
+static int tunnel_channel_handle_socket_connect_nack(TunnelChannel *channel, RedSocket *sckt)
+{
+#ifdef DEBUG_NETWORK
+    PRINT_SCKT(sckt);
+#endif
+    if (channel->mig_inprogress || channel->base.migrate) {
+        sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_OPEN_NACK;
+        return TRUE;
+    }
+
+    if (sckt->client_status != CLIENT_SCKT_STATUS_WAIT_OPEN) {
+        red_printf("unexpected REDC_TUNNEL_SOCKET_OPEN_NACK status=%d", sckt->client_status);
+        return FALSE;
+    }
+    sckt->client_status = CLIENT_SCKT_STATUS_CLOSED;
+
+    if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) {
+        net_slirp_socket_connect_failed_notify(sckt->slirp_sckt);
+    } else {
+        tunnel_worker_free_socket(channel->worker, sckt);
+    }
+
+    return (!CHECK_TUNNEL_ERROR(channel));
+}
+
+// Handle REDC_TUNNEL_SOCKET_FIN: the client finished sending. Deferred during
+// migration. Moves the client side to SHUTDOWN_SEND and, when slirp is still
+// receiving, lets it drain the remaining buffers so its next recv sees EOF.
+static int tunnel_channel_handle_socket_fin(TunnelChannel *channel, RedSocket *sckt)
+{
+#ifdef DEBUG_NETWORK
+    PRINT_SCKT(sckt);
+#endif
+    if (channel->mig_inprogress || channel->base.migrate) {
+        sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_FIN;
+        return TRUE;
+    }
+
+    if (sckt->client_status != CLIENT_SCKT_STATUS_OPEN) {
+        red_printf("unexpected REDC_TUNNEL_SOCKET_FIN status=%d", sckt->client_status);
+        return FALSE;
+    }
+    sckt->client_status = CLIENT_SCKT_STATUS_SHUTDOWN_SEND;
+
+    // nothing to forward if slirp already closed or an abort is pending
+    if ((sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) ||
+        (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT)) {
+        return TRUE;
+    }
+
+    if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) ||
+        (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) {
+        // After slirp will receive all the data buffers, the next recv
+        // will return an error and shutdown_recv should be called.
+        net_slirp_socket_can_receive_notify(sckt->slirp_sckt);
+    } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) {
+        // it already received the FIN
+        red_printf("unexpected slirp status=%d", sckt->slirp_status);
+        return FALSE;
+    }
+
+    return (!CHECK_TUNNEL_ERROR(channel));
+}
+
+// Handle REDC_TUNNEL_SOCKET_CLOSED from the client. Deferred during
+// migration. Reconciles the client's close with the slirp-side state:
+// either acks it back immediately (slirp already closed), lets slirp drain
+// and discover the FIN, or aborts the slirp socket outright.
+static int tunnel_channel_handle_socket_closed(TunnelChannel *channel, RedSocket *sckt)
+{
+    int prev_client_status = sckt->client_status;
+
+#ifdef DEBUG_NETWORK
+    PRINT_SCKT(sckt);
+#endif
+
+    if (channel->mig_inprogress || channel->base.migrate) {
+        sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_CLOSED;
+        return TRUE;
+    }
+
+    sckt->client_status = CLIENT_SCKT_STATUS_CLOSED;
+
+    if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
+        // if we already pushed close to the client, we expect it to send us ack.
+        // Otherwise, we will send it an ack.
+        if (!sckt->pushed_close) {
+            sckt->client_waits_close_ack = TRUE;
+            __tunnel_socket_add_close_ack_to_pipe(channel, sckt);
+        }
+
+        return (!CHECK_TUNNEL_ERROR(channel));
+    }
+
+    // close was initiated by client
+    sckt->client_waits_close_ack = TRUE;
+
+    if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) {
+        // guest waits for fin: after slirp will receive all the data buffers,
+        // the next recv will return an error and shutdown_recv should be called.
+        net_slirp_socket_can_receive_notify(sckt->slirp_sckt);
+    } else if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) ||
+               (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) {
+        sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
+        net_slirp_socket_abort(sckt->slirp_sckt);
+    } else if ((sckt->slirp_status != SLIRP_SCKT_STATUS_WAIT_CLOSE) ||
+               (prev_client_status != CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) {
+        // slirp can be in wait close if both slirp and client sent fin previously
+        // otherwise, the prev client status would also have been wait close, and this
+        // case was handled above
+        red_printf("unexpected slirp_status=%d", sckt->slirp_status);
+        return FALSE;
+    }
+
+    return (!CHECK_TUNNEL_ERROR(channel));
+}
+
+// Handle REDC_TUNNEL_SOCKET_CLOSED_ACK: the client acknowledged our close.
+// Deferred during migration. A pending DELAY_ABORT (set while inside a slirp
+// send — see tunnel_socket_force_close) is carried out here; otherwise slirp
+// must already be closed and the RedSocket is freed.
+static int tunnel_channel_handle_socket_closed_ack(TunnelChannel *channel, RedSocket *sckt)
+{
+#ifdef DEBUG_NETWORK
+    PRINT_SCKT(sckt);
+#endif
+    if (channel->mig_inprogress || channel->base.migrate) {
+        sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_CLOSED_ACK;
+        return TRUE;
+    }
+
+    sckt->client_status = CLIENT_SCKT_STATUS_CLOSED;
+    if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) {
+        sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
+        net_slirp_socket_abort(sckt->slirp_sckt);
+        return (!CHECK_TUNNEL_ERROR(channel));
+    }
+
+    if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) {
+        red_printf("unexcpected REDC_TUNNEL_SOCKET_CLOSED_ACK slirp_status=%d",
+                   sckt->slirp_status);
+        return FALSE;
+    }
+
+    tunnel_worker_free_socket(channel->worker, sckt);
+    return (!CHECK_TUNNEL_ERROR(channel));
+}
+
+// Handle REDC_TUNNEL_SOCKET_DATA: hand a raw receive buffer from the client
+// to the socket. Drops the buffer if slirp is no longer receiving, force-
+// closes the socket on in-buffer overflow, and enforces the token flow
+// control (each data message consumes one client token).
+static int tunnel_channel_handle_socket_receive_data(TunnelChannel *channel, RedSocket *sckt,
+                                                     RedSocketRawRcvBuf *recv_data, int buf_size)
+{
+    if ((sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND) ||
+        (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED)) {
+        red_printf("unexcpected REDC_TUNNEL_SOCKET_DATA clinet_status=%d",
+                   sckt->client_status);
+        return FALSE;
+    }
+
+    // handling a case where the client sent data before it received the close msg
+    if ((sckt->slirp_status != SLIRP_SCKT_STATUS_OPEN) &&
+        (sckt->slirp_status != SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) {
+        __tunnel_worker_free_socket_rcv_buf(sckt->worker, recv_data);
+        return (!CHECK_TUNNEL_ERROR(channel));
+    } else if ((sckt->in_data.num_buffers == MAX_SOCKET_IN_BUFFERS) &&
+               !channel->mig_inprogress && !channel->base.migrate) {
+        red_printf("socket in buffers overflow, socket will be closed"
+                   " (local_port=%d, service_id=%d)",
+                   ntohs(sckt->local_port), sckt->far_service->id);
+        __tunnel_worker_free_socket_rcv_buf(sckt->worker, recv_data);
+        tunnel_socket_force_close(channel, sckt);
+        return (!CHECK_TUNNEL_ERROR(channel));
+    }
+
+    tunnel_socket_assign_rcv_buf(sckt, recv_data, buf_size);
+    // the client must hold a token for every data message it sends
+    if (!sckt->in_data.client_total_num_tokens) {
+        red_printf("token vailoation");
+        return FALSE;
+    }
+
+    --sckt->in_data.client_total_num_tokens;
+    __process_rcv_buf_tokens(channel, sckt);
+
+    if (sckt->in_data.ready_chunks_queue.head && !channel->mig_inprogress) {
+        net_slirp_socket_can_receive_notify(sckt->slirp_sckt);
+    }
+
+    return (!CHECK_TUNNEL_ERROR(channel));
+}
+
+// TRUE when the client end can still accept data for this socket: it is open
+// (or has only shut down its send side) and no migration is in progress.
+static inline int __client_socket_can_receive(RedSocket *sckt)
+{
+    return (((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) ||
+             (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) &&
+            !sckt->worker->channel->mig_inprogress);
+}
+
+// Handle REDC_TUNNEL_SOCKET_TOKEN: the client granted more send tokens.
+// If output chunks were stalled waiting for tokens, re-queue the socket's
+// data pipe item so the pending data gets pushed.
+static int tunnel_channel_handle_socket_token(TunnelChannel *channel, RedSocket *sckt,
+                                              RedcTunnelSocketTokens *message)
+{
+    sckt->out_data.num_tokens += message->num_tokens;
+
+    if (__client_socket_can_receive(sckt) && sckt->out_data.ready_chunks_queue.head &&
+        !red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.data_pipe_item)) {
+        // data is pending to be sent
+        sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
+        red_channel_pipe_add(&channel->base, &sckt->out_data.data_pipe_item);
+    }
+
+    return TRUE;
+}
+
+// RedChannel callback: pick the receive buffer for an incoming message.
+// SOCKET_DATA goes into a pooled RedSocketRawRcvBuf, variable-size messages
+// (migrate data, service add) get a fresh malloc, and everything else fits
+// in the channel's fixed control_rcv_buf.
+static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannel *channel, RedDataHeader *msg_header)
+{
+    TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
+    if (msg_header->type == REDC_TUNNEL_SOCKET_DATA) {
+        return (__tunnel_worker_alloc_socket_rcv_buf(tunnel_channel->worker)->buf);
+    } else if ((msg_header->type == REDC_MIGRATE_DATA) ||
+               (msg_header->type == REDC_TUNNEL_SERVICE_ADD)) {
+        uint8_t *ret = malloc(msg_header->size);
+        if (!ret) {
+            red_error("failed allocating");
+        }
+        return ret;
+    } else {
+        return (tunnel_channel->control_rcv_buf);
+    }
+}
+
+// called by the receive routine of the channel, before the buffer was assigned to a socket
+// Counterpart of tunnel_channel_alloc_msg_rcv_buf: only pooled SOCKET_DATA
+// buffers need explicit release here; control buffers are static and the
+// malloc'ed message types are freed by their handlers.
+static void tunnel_channel_release_msg_rcv_buf(RedChannel *channel, RedDataHeader *msg_header,
+                                               uint8_t *msg)
+{
+    TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
+    if (msg_header->type == REDC_TUNNEL_SOCKET_DATA) {
+        // must not already be owned by a socket (usr_opaque set on assignment)
+        ASSERT(!(CONTAINEROF(msg, RedSocketRawRcvBuf, buf)->base.usr_opaque));
+        __tunnel_worker_free_socket_rcv_buf(tunnel_channel->worker,
+                                            CONTAINEROF(msg, RedSocketRawRcvBuf, buf));
+    }
+}
+
+// Serialize one TunnelService into its migration record. Print (IPP)
+// services additionally carry their 4-byte print-server IP; generic services
+// use the base record only. red_error()s on an unknown service type.
+static void __tunnel_channel_fill_service_migrate_item(TunnelChannel *channel,
+                                                       TunnelService *service,
+                                                       TunnelMigrateServiceItem *migrate_item)
+{
+    migrate_item->service = service;
+    TunnelMigrateService *general_data;
+    if (service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) {
+        general_data = &migrate_item->u.generic_service;
+    } else if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) {
+        general_data = &migrate_item->u.print_service.base;
+        memcpy(migrate_item->u.print_service.ip, ((TunnelPrintService *)service)->ip, 4);
+    } else {
+        red_error("unexpected service type");
+    }
+
+    general_data->type = service->type;
+    general_data->id = service->id;
+    general_data->group = service->group;
+    general_data->port = service->port;
+    memcpy(general_data->virt_ip, &service->virt_ip.s_addr, 4);
+}
+
+// Serialize one RedSocket into its migration record: status flags, token
+// counters, the in/out process queues (only when the corresponding side can
+// still move data), and the exported slirp TCP state. Sets the tunnel error
+// flag if the slirp export fails.
+static void __tunnel_channel_fill_socket_migrate_item(TunnelChannel *channel, RedSocket *sckt,
+                                                      TunnelMigrateSocketItem *migrate_item)
+{
+    TunnelMigrateSocket *mig_sckt = &migrate_item->mig_socket;
+    migrate_item->socket = sckt;
+    mig_sckt->connection_id = sckt->connection_id;
+    mig_sckt->local_port = sckt->local_port;
+    mig_sckt->far_service_id = sckt->far_service->id;
+    mig_sckt->client_status = sckt->client_status;
+    mig_sckt->slirp_status = sckt->slirp_status;
+
+    mig_sckt->pushed_close = sckt->pushed_close;
+    mig_sckt->client_waits_close_ack = sckt->client_waits_close_ack;
+
+    mig_sckt->mig_client_status_msg = sckt->mig_client_status_msg;
+    mig_sckt->mig_open_ack_tokens = sckt->mig_open_ack_tokens;
+
+    mig_sckt->out_data.num_tokens = sckt->out_data.num_tokens;
+    mig_sckt->out_data.window_size = sckt->out_data.window_size;
+
+    // checking if there is a need to save the queues
+    if ((sckt->client_status != CLIENT_SCKT_STATUS_CLOSED) &&
+        (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED) &&
+        (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED_ACK)) {
+        mig_sckt->out_data.process_queue_size =
+            sckt->out_data.process_queue->get_migrate_data_proc(sckt->out_data.process_queue,
+                                                                &migrate_item->out_process_queue);
+    }
+
+    mig_sckt->in_data.num_tokens = sckt->in_data.num_tokens;
+    mig_sckt->in_data.client_total_num_tokens = sckt->in_data.client_total_num_tokens;
+
+    if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) ||
+        (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) {
+        mig_sckt->in_data.process_queue_size =
+            sckt->in_data.process_queue->get_migrate_data_proc(sckt->in_data.process_queue,
+                                                               &migrate_item->in_process_queue);
+    }
+
+    if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) {
+        migrate_item->slirp_socket_size = net_slirp_tcp_socket_export(sckt->slirp_sckt,
+                                                                     &migrate_item->slirp_socket);
+        if (!migrate_item->slirp_socket) {
+            SET_TUNNEL_ERROR(channel, "failed export slirp socket");
+        }
+    } else {
+        migrate_item->slirp_socket_size = 0;
+        migrate_item->slirp_socket = NULL;
+    }
+}
+
+static void release_migrate_item(TunnelMigrateItem *item);
+// Handle REDC_MIGRATE_FLUSH_MARK: build the full migration snapshot
+// (slirp state, all services, all allocated sockets) into a TunnelMigrateItem
+// and queue it on the pipe as PIPE_ITEM_TYPE_MIGRATE_DATA. On any failure the
+// partially built item is released and FALSE is returned.
+static int tunnel_channel_handle_migrate_mark(TunnelChannel *channel)
+{
+    TunnelMigrateItem *migrate_item = NULL;
+    TunnelService *service;
+    TunnelMigrateServiceItem *mig_service;
+    int num_sockets_saved = 0;
+    RedSocket *sckt;
+
+    if (!channel->expect_migrate_mark) {
+        red_printf("unexpected");
+        return FALSE;
+    }
+    channel->expect_migrate_mark = FALSE;
+    migrate_item = (TunnelMigrateItem *)malloc(sizeof(*migrate_item));
+
+    if (!migrate_item) {
+        red_error("failed alloc TunnelMigrateItem");
+    }
+
+    memset(migrate_item, 0, sizeof(*migrate_item));
+    migrate_item->base.type = PIPE_ITEM_TYPE_MIGRATE_DATA;
+
+    migrate_item->slirp_state_size = net_slirp_state_export(&migrate_item->slirp_state);
+    if (!migrate_item->slirp_state) {
+        red_printf("failed export slirp state");
+        goto error;
+    }
+
+    // services list: header plus one offset slot per service
+    migrate_item->services_list_size = sizeof(TunnelMigrateServicesList) +
+        (sizeof(uint32_t)*channel->worker->num_services);
+    migrate_item->services_list =
+        (TunnelMigrateServicesList *)malloc(migrate_item->services_list_size);
+    if (!migrate_item->services_list) {
+        red_error("failed alloc services list");
+    }
+    migrate_item->services_list->num_services = channel->worker->num_services;
+
+    migrate_item->services = (TunnelMigrateServiceItem *)malloc(
+            channel->worker->num_services * sizeof(TunnelMigrateServiceItem));
+
+    if (!migrate_item->services) {
+        red_error("failed alloc services items");
+    }
+
+    // walk the services ring and the items array in lock step
+    for (mig_service = migrate_item->services,
+         service = (TunnelService *)ring_get_head(&channel->worker->services);
+         service;
+         mig_service++,
+         service = (TunnelService *)ring_next(&channel->worker->services, &service->ring_item)) {
+        __tunnel_channel_fill_service_migrate_item(channel, service, mig_service);
+        if (CHECK_TUNNEL_ERROR(channel)) {
+            goto error;
+        }
+    }
+
+    migrate_item->sockets_list_size = sizeof(TunnelMigrateSocketList) +
+        (sizeof(uint32_t)*channel->worker->num_sockets);
+    migrate_item->sockets_list = (TunnelMigrateSocketList *)malloc(
+            migrate_item->sockets_list_size);
+    if (!migrate_item->sockets_list) {
+        red_error("failed alloc sockets list");
+    }
+
+    migrate_item->sockets_list->num_sockets = channel->worker->num_sockets;
+
+    // the sockets array may have holes; stop once all live sockets are saved
+    for (sckt = channel->worker->sockets; num_sockets_saved < channel->worker->num_sockets;
+         sckt++) {
+        if (sckt->allocated) {
+            __tunnel_channel_fill_socket_migrate_item(channel, sckt,
+                                                      &migrate_item->sockets_data[
+                                                          num_sockets_saved++]);
+            if (CHECK_TUNNEL_ERROR(channel)) {
+                goto error;
+            }
+        }
+    }
+
+    red_channel_pipe_add((RedChannel *)channel, &migrate_item->base);
+
+    return TRUE;
+error:
+    release_migrate_item(migrate_item);
+    return FALSE;
+}
+
+// Free a TunnelMigrateItem and everything it owns. Safe to call on a
+// partially filled item (NULL item, NULL sub-allocations, sockets whose fill
+// was aborted mid-way) — each piece is checked before release.
+static void release_migrate_item(TunnelMigrateItem *item)
+{
+    if (!item) {
+        return;
+    }
+
+    int i;
+    if (item->sockets_list) {
+        int num_sockets = item->sockets_list->num_sockets;
+        for (i = 0; i < num_sockets; i++) {
+            if (item->sockets_data[i].socket) { // handling errors in the middle of
+                                                // __tunnel_channel_fill_socket_migrate_item
+                if (item->sockets_data[i].out_process_queue) {
+                    item->sockets_data[i].socket->out_data.process_queue->release_migrate_data_proc(
+                        item->sockets_data[i].socket->out_data.process_queue,
+                        item->sockets_data[i].out_process_queue);
+                }
+                if (item->sockets_data[i].in_process_queue) {
+                    item->sockets_data[i].socket->in_data.process_queue->release_migrate_data_proc(
+                        item->sockets_data[i].socket->in_data.process_queue,
+                        item->sockets_data[i].in_process_queue);
+                }
+            }
+
+            free(item->sockets_data[i].slirp_socket);
+        }
+        free(item->sockets_list);
+    }
+
+    free(item->services);
+    free(item->services_list);
+    free(item->slirp_state);
+    free(item);
+}
+
+// Allocator signature shared by the post-migration queue-restore helpers.
+typedef RawTunneledBuffer *(*socket_alloc_buffer_proc_t)(RedSocket *sckt);
+
+// Zero-data buffer whose only job is to return `num_tokens` to the client
+// once it is consumed (see restore_tokens_buf_release).
+typedef struct RedSocketRestoreTokensBuf {
+    RedSocketRawRcvBuf base;
+    int num_tokens;
+} RedSocketRestoreTokensBuf;
+
+// not updating tokens
+// Release proc for buffers recreated from migration data: frees the pooled
+// buffer without crediting a token (the token accounting was migrated as-is),
+// then re-runs the token bookkeeping for the stalled-client edge case.
+static void restored_rcv_buf_release(RawTunneledBuffer *buf)
+{
+    RedSocket *sckt = (RedSocket *)buf->usr_opaque;
+    --sckt->in_data.num_buffers;
+    __tunnel_worker_free_socket_rcv_buf(sckt->worker, (RedSocketRawRcvBuf *)buf);
+    // for case that ready queue is empty and the client has no tokens
+    __process_rcv_buf_tokens(sckt->worker->channel, sckt);
+}
+
+// Allocate a pooled rcv buffer for restoring migrated in-data, wired to the
+// token-neutral release proc above. Matches socket_alloc_buffer_proc_t.
+RawTunneledBuffer *tunnel_socket_alloc_restored_rcv_buf(RedSocket *sckt)
+{
+    RedSocketRawRcvBuf *buf = __tunnel_worker_alloc_socket_rcv_buf(sckt->worker);
+    buf->base.usr_opaque = sckt;
+    buf->base.release_proc = restored_rcv_buf_release;
+
+    sckt->in_data.num_buffers++;
+    return &buf->base;
+}
+
+// Release proc for the synthetic tokens buffer: credits the held tokens back
+// to the socket and triggers the client-token bookkeeping.
+static void restore_tokens_buf_release(RawTunneledBuffer *buf)
+{
+    RedSocketRestoreTokensBuf *tokens_buf = (RedSocketRestoreTokensBuf *)buf;
+    RedSocket *sckt = (RedSocket *)buf->usr_opaque;
+
+    sckt->in_data.num_tokens += tokens_buf->num_tokens;
+    __process_rcv_buf_tokens(sckt->worker->channel, sckt);
+
+    free(tokens_buf);
+}
+
+// Allocate the zero-length buffer that parks `num_tokens` until the restored
+// in-queues drain; its release proc returns the tokens to the socket.
+// The buffer is zeroed, so data/size stay empty by construction.
+RawTunneledBuffer *__tunnel_socket_alloc_restore_tokens_buf(RedSocket *sckt, int num_tokens)
+{
+    RedSocketRestoreTokensBuf *buf = (RedSocketRestoreTokensBuf *)malloc(sizeof(*buf));
+    if (!buf) {
+        red_error("failed alloc");
+    }
+    memset(buf, 0, sizeof(*buf));
+
+    buf->base.base.usr_opaque = sckt;
+    buf->base.base.refs = 1;
+    buf->base.base.release_proc = restore_tokens_buf_release;
+    buf->num_tokens = num_tokens;
+#ifdef DEBUG_NETWORK
+    red_printf("TUNNEL DBG: num_tokens=%d", num_tokens);
+#endif
+    return &buf->base.base;
+}
+
+// Repopulate a ready-chunks queue from a flat migrated byte blob, slicing it
+// into buffers of at most buf->max_size. The queue takes its own reference on
+// each chunk, so the local ref from alloc_buf is dropped right after adding.
+static void __restore_ready_chunks_queue(RedSocket *sckt, ReadyTunneledChunkQueue *queue,
+                                         uint8_t *data, int size,
+                                         socket_alloc_buffer_proc_t alloc_buf)
+{
+    int copied = 0;
+
+    while (copied < size) {
+        RawTunneledBuffer *buf = alloc_buf(sckt);
+        int copy_count = MIN(size - copied, buf->max_size);
+        memcpy(buf->data, data + copied, copy_count);
+        copied += copy_count;
+        buf->size = copy_count;
+        ready_queue_add_orig_chunk(queue, buf, buf->data, buf->size);
+        tunneled_buffer_unref(buf);
+    }
+}
+
+// not using the alloc_buf cb of the queue, since we may want to create the migrated buffers
+// with other properties (e.g., not releasing tokens)
+// Repopulate a process queue from a flat migrated byte blob; unlike the
+// ready-queue variant, the queue takes over the buffer reference (no unref).
+static void __restore_process_queue(RedSocket *sckt, TunneledBufferProcessQueue *queue,
+                                    uint8_t *data, int size,
+                                    socket_alloc_buffer_proc_t alloc_buf)
+{
+    int copied = 0;
+
+    while (copied < size) {
+        RawTunneledBuffer *buf = alloc_buf(sckt);
+        int copy_count = MIN(size - copied, buf->max_size);
+        memcpy(buf->data, data + copied, copy_count);
+        copied += copy_count;
+        buf->size = copy_count;
+        __process_queue_push(queue, buf);
+    }
+}
+
+// Recreate one tunnel service from its migration record. `data_buf` is the
+// base of the migration blob; name/description/slirp offsets in the record
+// are relative to it. Sets the tunnel error flag on unknown type or
+// allocation failure.
+static void tunnel_channel_restore_migrated_service(TunnelChannel *channel,
+                                                    TunnelMigrateService *mig_service,
+                                                    uint8_t *data_buf)
+{
+    int service_size;
+    TunnelService *service;
+    struct in_addr virt_ip;
+    if (mig_service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) {
+        service_size = sizeof(TunnelService);
+    } else if (mig_service->type == RED_TUNNEL_SERVICE_TYPE_IPP) {
+        service_size = sizeof(TunnelPrintService);
+    } else {
+        SET_TUNNEL_ERROR(channel, "unexpected service type");
+        return;
+    }
+
+    memcpy(&virt_ip.s_addr, mig_service->virt_ip, 4);
+    service = __tunnel_worker_add_service(channel->worker, service_size,
+                                          mig_service->type, mig_service->id,
+                                          mig_service->group, mig_service->port,
+                                          (char *)(data_buf + mig_service->name),
+                                          (char *)(data_buf + mig_service->description), &virt_ip);
+    if (!service) {
+        SET_TUNNEL_ERROR(channel, "failed creating service");
+        return;
+    }
+
+    if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) {
+        TunnelMigratePrintService *mig_print_service = (TunnelMigratePrintService *)mig_service;
+        TunnelPrintService *print_service = (TunnelPrintService *)service;
+
+        memcpy(print_service->ip, mig_print_service->ip, 4);
+    }
+}
+
+// Recreate one RedSocket from its migration record: status, slirp TCP state,
+// both data queues, and token accounting. The slot index in the worker's
+// socket array is the connection_id. Offsets in mig_socket are relative to
+// data_buf (the migration blob base). Finally a synthetic tokens buffer is
+// queued so the tokens "in flight" at migration time flow back to the client
+// once the restored queues drain.
+static void tunnel_channel_restore_migrated_socket(TunnelChannel *channel,
+                                                   TunnelMigrateSocket *mig_socket,
+                                                   uint8_t *data_buf)
+{
+    RedSocket *sckt;
+    SlirpSocket *slirp_sckt;
+    RawTunneledBuffer *tokens_buf;
+    TunnelService *service;
+    sckt = channel->worker->sockets + mig_socket->connection_id;
+    sckt->connection_id = mig_socket->connection_id;
+    ASSERT(!sckt->allocated);
+
+    /* Services must be restored before sockets */
+    service = tunnel_worker_find_service_by_id(channel->worker, mig_socket->far_service_id);
+    if (!service) {
+        SET_TUNNEL_ERROR(channel, "service not found");
+        return;
+    }
+
+    tunnel_worker_alloc_socket(channel->worker, sckt, mig_socket->local_port, service, NULL);
+
+    sckt->client_status = mig_socket->client_status;
+    sckt->slirp_status = mig_socket->slirp_status;
+
+    sckt->mig_client_status_msg = mig_socket->mig_client_status_msg;
+    sckt->mig_open_ack_tokens = mig_socket->mig_open_ack_tokens;
+
+    sckt->pushed_close = mig_socket->pushed_close;
+    sckt->client_waits_close_ack = mig_socket->client_waits_close_ack;
+
+    if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) {
+        slirp_sckt = net_slirp_tcp_socket_restore(data_buf + mig_socket->slirp_sckt, sckt);
+        if (!slirp_sckt) {
+            SET_TUNNEL_ERROR(channel, "failed restoring slirp socket");
+            return;
+        }
+        sckt->slirp_sckt = slirp_sckt;
+    }
+    // out data
+    sckt->out_data.num_tokens = mig_socket->out_data.num_tokens;
+    sckt->out_data.window_size = mig_socket->out_data.window_size;
+    sckt->out_data.data_size = mig_socket->out_data.process_buf_size +
+        mig_socket->out_data.ready_buf_size;
+
+    __restore_ready_chunks_queue(sckt, &sckt->out_data.ready_chunks_queue,
+                                 data_buf + mig_socket->out_data.ready_buf,
+                                 mig_socket->out_data.ready_buf_size,
+                                 tunnel_socket_alloc_snd_buf);
+
+    sckt->out_data.process_queue->restore_proc(sckt->out_data.process_queue,
+                                               data_buf + mig_socket->out_data.process_queue);
+
+    __restore_process_queue(sckt, sckt->out_data.process_queue,
+                            data_buf + mig_socket->out_data.process_buf,
+                            mig_socket->out_data.process_buf_size,
+                            tunnel_socket_alloc_snd_buf);
+
+    sckt->in_data.client_total_num_tokens = mig_socket->in_data.client_total_num_tokens;
+    sckt->in_data.num_tokens = mig_socket->in_data.num_tokens;
+
+    __restore_ready_chunks_queue(sckt, &sckt->in_data.ready_chunks_queue,
+                                 data_buf + mig_socket->in_data.ready_buf,
+                                 mig_socket->in_data.ready_buf_size,
+                                 tunnel_socket_alloc_restored_rcv_buf);
+
+    sckt->in_data.process_queue->restore_proc(sckt->in_data.process_queue,
+                                              data_buf + mig_socket->in_data.process_queue);
+
+    __restore_process_queue(sckt, sckt->in_data.process_queue,
+                            data_buf + mig_socket->in_data.process_buf,
+                            mig_socket->in_data.process_buf_size,
+                            tunnel_socket_alloc_restored_rcv_buf);
+
+    // the difference between the full window and the migrated token counts is
+    // held in a synthetic buffer and released when the queues drain
+    tokens_buf = __tunnel_socket_alloc_restore_tokens_buf(sckt,
+                                                          SOCKET_WINDOW_SIZE -
+                                                          (sckt->in_data.client_total_num_tokens +
+                                                           sckt->in_data.num_tokens));
+    if (sckt->in_data.process_queue->head) {
+        __process_queue_push(sckt->in_data.process_queue, tokens_buf);
+    } else {
+        ready_queue_add_orig_chunk(&sckt->in_data.ready_chunks_queue, tokens_buf,
+                                   tokens_buf->data, tokens_buf->size);
+        tunneled_buffer_unref(tokens_buf);
+    }
+}
+
+// After migration completes, bring one restored socket back to life: replay
+// the client status message that arrived mid-migration (mig_client_status_msg),
+// then restart data flow in both directions. Sets the tunnel error flag on
+// any replay failure.
+static void tunnel_channel_restore_socket_state(TunnelChannel *channel, RedSocket *sckt)
+{
+    int ret = TRUE;
+    red_printf("");
+    // handling client status msgs that were received during migration
+    switch (sckt->mig_client_status_msg) {
+    case 0:
+        break;
+    case REDC_TUNNEL_SOCKET_OPEN_ACK:
+        ret = tunnel_channel_handle_socket_connect_ack(channel, sckt,
+                                                       sckt->mig_open_ack_tokens);
+        break;
+    case REDC_TUNNEL_SOCKET_OPEN_NACK:
+        ret = tunnel_channel_handle_socket_connect_nack(channel, sckt);
+        break;
+    case REDC_TUNNEL_SOCKET_FIN:
+        // a FIN while still WAIT_OPEN implies the open-ack was swallowed by
+        // migration; replay it first
+        if (sckt->client_status == CLIENT_SCKT_STATUS_WAIT_OPEN) {
+            ret = tunnel_channel_handle_socket_connect_ack(channel, sckt,
+                                                           sckt->mig_open_ack_tokens);
+        }
+        if (ret) {
+            ret = tunnel_channel_handle_socket_fin(channel, sckt);
+        }
+        break;
+    case REDC_TUNNEL_SOCKET_CLOSED:
+        // can't just send nack since we need to send close ack to client
+        if (sckt->client_status == CLIENT_SCKT_STATUS_WAIT_OPEN) {
+            ret = tunnel_channel_handle_socket_connect_ack(channel, sckt,
+                                                           sckt->mig_open_ack_tokens);
+        }
+        ret = ret & tunnel_channel_handle_socket_closed(channel, sckt);
+
+        break;
+    case REDC_TUNNEL_SOCKET_CLOSED_ACK:
+        ret = tunnel_channel_handle_socket_closed_ack(channel, sckt);
+        break;
+    default:
+        SET_TUNNEL_ERROR(channel, "invalid message type %u", sckt->mig_client_status_msg);
+        return;
+    }
+
+    if (!ret) {
+        SET_TUNNEL_ERROR(channel, "failed restoring socket state");
+        return;
+    }
+    sckt->mig_client_status_msg = 0;
+    sckt->mig_open_ack_tokens = 0;
+
+    // handling data transfer
+    if (__client_socket_can_receive(sckt) && sckt->out_data.ready_chunks_queue.head) {
+        if (!red_channel_pipe_item_is_linked(
+                &channel->base, &sckt->out_data.data_pipe_item)) {
+            sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
+            red_channel_pipe_add(&channel->base, &sckt->out_data.data_pipe_item);
+        }
+    }
+
+    if (((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) ||
+         (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) &&
+        sckt->in_data.ready_chunks_queue.head) {
+        net_slirp_socket_can_receive_notify(sckt->slirp_sckt);
+    }
+
+    if (CHECK_TUNNEL_ERROR(channel)) {
+        return;
+    }
+
+    if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) ||
+        (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) {
+        net_slirp_socket_can_send_notify(sckt->slirp_sckt);
+    }
+
+    if (CHECK_TUNNEL_ERROR(channel)) {
+        return;
+    }
+    // for cases where the client has no tokens left, but all the data is in the process queue.
+    __process_rcv_buf_tokens(channel, sckt);
+}
+
+// Restore state for every allocated socket after migration, then unfreeze
+// slirp. Aborts early on the first tunnel error.
+static inline void tunnel_channel_activate_migrated_sockets(TunnelChannel *channel)
+{
+    // if we are undergoing migration again, no need to restore the state, we will wait
+    // for the next host.
+    if (!channel->mig_inprogress) {
+        int num_activated = 0;
+        RedSocket *sckt = channel->worker->sockets;
+
+        for (; num_activated < channel->worker->num_sockets; sckt++) {
+            if (sckt->allocated) {
+                tunnel_channel_restore_socket_state(channel, sckt);
+
+                if (CHECK_TUNNEL_ERROR(channel)) {
+                    return;
+                }
+
+                num_activated++;
+            }
+        }
+        net_slirp_unfreeze();
+    }
+}
+
+// Handle REDC_MIGRATE_DATA on the destination host: validate magic/version,
+// adopt the source's message serial, restore slirp state, then services, then
+// sockets (offsets in migrate_data are relative to migrate_data->data), and
+// finally reactivate the channel. Frees `migrate_data` on every path.
+static int tunnel_channel_handle_migrate_data(TunnelChannel *channel,
+                                              TunnelMigrateData *migrate_data)
+{
+    TunnelMigrateSocketList *sockets_list;
+    TunnelMigrateServicesList *services_list;
+    int i;
+
+    if (!channel->expect_migrate_data) {
+        red_printf("unexpected");
+        goto error;
+    }
+    channel->expect_migrate_data = FALSE;
+
+    if (migrate_data->magic != TUNNEL_MIGRATE_DATA_MAGIC ||
+        migrate_data->version != TUNNEL_MIGRATE_DATA_VERSION) {
+        red_printf("invalid content");
+        goto error;
+    }
+
+    // continue the source host's serial numbering on this channel
+    ASSERT(channel->base.send_data.header.serial == 0);
+    channel->base.send_data.header.serial = migrate_data->message_serial;
+
+    net_slirp_state_restore(migrate_data->data + migrate_data->slirp_state);
+
+    services_list = (TunnelMigrateServicesList *)(migrate_data->data +
+                                                  migrate_data->services_list);
+    for (i = 0; i < services_list->num_services; i++) {
+        tunnel_channel_restore_migrated_service(channel,
+                                                (TunnelMigrateService *)(migrate_data->data +
+                                                                         services_list->services[i]),
+                                                migrate_data->data);
+        if (CHECK_TUNNEL_ERROR(channel)) {
+            red_printf("failed restoring service");
+            goto error;
+        }
+    }
+
+    sockets_list = (TunnelMigrateSocketList *)(migrate_data->data + migrate_data->sockets_list);
+
+    for (i = 0; i < sockets_list->num_sockets; i++) {
+        tunnel_channel_restore_migrated_socket(channel,
+                                               (TunnelMigrateSocket *)(migrate_data->data +
+                                                                       sockets_list->sockets[i]),
+                                               migrate_data->data);
+        if (CHECK_TUNNEL_ERROR(channel)) {
+            red_printf("failed restoring socket");
+            goto error;
+        }
+    }
+
+    // activate channel
+    channel->base.migrate = FALSE;
+    red_channel_init_outgoing_messages_window(&channel->base);
+
+    tunnel_channel_activate_migrated_sockets(channel);
+
+    if (CHECK_TUNNEL_ERROR(channel)) {
+        goto error;
+    }
+    free(migrate_data);
+    return TRUE;
+error:
+    free(migrate_data);
+    return FALSE;
+}
+
+// msg was allocated by tunnel_channel_alloc_msg_rcv_buf
+// Main message dispatcher for the tunnel channel. First pass resolves the
+// target RedSocket for per-socket messages (connection id is the leading
+// uint16 field); second pass validates header->size and dispatches. Unknown
+// types fall through to the generic red_channel_handle_message.
+static int tunnel_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg)
+{
+    TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
+    RedSocket *sckt = NULL;
+    // retrieve the sckt
+    switch (header->type) {
+    case REDC_MIGRATE_FLUSH_MARK:
+    case REDC_MIGRATE_DATA:
+    case REDC_TUNNEL_SERVICE_ADD:
+    case REDC_TUNNEL_SERVICE_REMOVE:
+        break;
+    case REDC_TUNNEL_SOCKET_OPEN_ACK:
+    case REDC_TUNNEL_SOCKET_OPEN_NACK:
+    case REDC_TUNNEL_SOCKET_DATA:
+    case REDC_TUNNEL_SOCKET_FIN:
+    case REDC_TUNNEL_SOCKET_CLOSED:
+    case REDC_TUNNEL_SOCKET_CLOSED_ACK:
+    case REDC_TUNNEL_SOCKET_TOKEN:
+        // the first field in these messages is connection id
+        sckt = tunnel_channel->worker->sockets + (*((uint16_t *)msg));
+        if (!sckt->allocated) {
+            red_printf("red socket not found");
+            return FALSE;
+        }
+        break;
+    default:
+        return red_channel_handle_message(channel, header, msg);
+    }
+
+    switch (header->type) {
+    case REDC_TUNNEL_SERVICE_ADD:
+        if (header->size < sizeof(RedcTunnelAddGenericService)) {
+            red_printf("bad message size");
+            free(msg);
+            return FALSE;
+        }
+        return tunnel_channel_handle_service_add(tunnel_channel,
+                                                 (RedcTunnelAddGenericService *)msg);
+    case REDC_TUNNEL_SERVICE_REMOVE:
+        red_printf("REDC_TUNNEL_REMOVE_SERVICE not supported yet");
+        return FALSE;
+    case REDC_TUNNEL_SOCKET_OPEN_ACK:
+        if (header->size != sizeof(RedcTunnelSocketOpenAck)) {
+            red_printf("bad message size");
+            return FALSE;
+        }
+
+        return tunnel_channel_handle_socket_connect_ack(tunnel_channel, sckt,
+                                                        ((RedcTunnelSocketOpenAck *)msg)->tokens);
+
+    case REDC_TUNNEL_SOCKET_OPEN_NACK:
+        if (header->size != sizeof(RedcTunnelSocketOpenNack)) {
+            red_printf("bad message size");
+            return FALSE;
+        }
+
+        return tunnel_channel_handle_socket_connect_nack(tunnel_channel, sckt);
+    case REDC_TUNNEL_SOCKET_DATA:
+    {
+        if (header->size < sizeof(RedcTunnelSocketData)) {
+            red_printf("bad message size");
+            return FALSE;
+        }
+
+        return tunnel_channel_handle_socket_receive_data(tunnel_channel, sckt,
+                                                 CONTAINEROF(msg, RedSocketRawRcvBuf, buf),
+                                                 header->size - sizeof(RedcTunnelSocketData));
+    }
+    case REDC_TUNNEL_SOCKET_FIN:
+        if (header->size != sizeof(RedcTunnelSocketFin)) {
+            red_printf("bad message size");
+            return FALSE;
+        }
+        return tunnel_channel_handle_socket_fin(tunnel_channel, sckt);
+    case REDC_TUNNEL_SOCKET_CLOSED:
+        if (header->size != sizeof(RedcTunnelSocketClosed)) {
+            red_printf("bad message size");
+            return FALSE;
+        }
+        return tunnel_channel_handle_socket_closed(tunnel_channel, sckt);
+    case REDC_TUNNEL_SOCKET_CLOSED_ACK:
+        if (header->size != sizeof(RedcTunnelSocketClosedAck)) {
+            red_printf("bad message size");
+            return FALSE;
+        }
+        return tunnel_channel_handle_socket_closed_ack(tunnel_channel, sckt);
+    case REDC_TUNNEL_SOCKET_TOKEN:
+        if (header->size != sizeof(RedcTunnelSocketTokens)) {
+            red_printf("bad message size");
+            return FALSE;
+        }
+
+        return tunnel_channel_handle_socket_token(tunnel_channel, sckt,
+                                                  (RedcTunnelSocketTokens *)msg);
+    case REDC_MIGRATE_FLUSH_MARK:
+        return tunnel_channel_handle_migrate_mark(tunnel_channel);
+    case REDC_MIGRATE_DATA:
+        if (header->size < sizeof(TunnelMigrateData)) {
+            red_printf("bad message size");
+            free(msg);
+            return FALSE;
+        }
+        return tunnel_channel_handle_migrate_data(tunnel_channel, (TunnelMigrateData *)msg);
+    default:
+        return red_channel_handle_message(channel, header, msg);
+    }
+    return TRUE;
+}
+
+/********************************/
+/* outgoing msgs
+********************************/
+
+static void tunnel_channel_send_set_ack(TunnelChannel *channel, PipeItem *item)
+{
+ ASSERT(channel);
+
+ channel->base.send_data.u.ack.generation = ++channel->base.ack_data.generation;
+ channel->base.send_data.u.ack.window = CLIENT_ACK_WINDOW;
+
+ red_channel_init_send_data(&channel->base, RED_SET_ACK, item);
+ red_channel_add_buf(&channel->base, &channel->base.send_data.u.ack, sizeof(RedSetAck));
+
+ red_channel_begin_send_massage(&channel->base);
+}
+
+static void tunnel_channel_send_migrate(TunnelChannel *channel, PipeItem *item)
+{
+ ASSERT(channel);
+ channel->base.send_data.u.migrate.flags = RED_MIGRATE_NEED_FLUSH |
+ RED_MIGRATE_NEED_DATA_TRANSFER;
+ channel->expect_migrate_mark = TRUE;
+ red_channel_init_send_data(&channel->base, RED_MIGRATE, item);
+ red_channel_add_buf(&channel->base, &channel->base.send_data.u.migrate, sizeof(RedMigrate));
+ red_channel_begin_send_massage(&channel->base);
+}
+
+static int __tunnel_channel_send_process_bufs_migrate_data(TunnelChannel *channel,
+ TunneledBufferProcessQueue *queue)
+{
+ int buf_offset = queue->head_offset;
+ RawTunneledBuffer *buf = queue->head;
+ int size = 0;
+
+ while (buf) {
+ red_channel_add_buf(&channel->base, buf->data + buf_offset, buf->size - buf_offset);
+ size += buf->size - buf_offset;
+ buf_offset = 0;
+ buf = buf->next;
+ }
+
+ return size;
+}
+
+static int __tunnel_channel_send_ready_bufs_migrate_data(TunnelChannel *channel,
+ ReadyTunneledChunkQueue *queue)
+{
+ int offset = queue->offset;
+ ReadyTunneledChunk *chunk = queue->head;
+ int size = 0;
+
+ while (chunk) {
+ red_channel_add_buf(&channel->base, chunk->data + offset, chunk->size - offset);
+ size += chunk->size - offset;
+ offset = 0;
+ chunk = chunk->next;
+ }
+ return size;
+}
+
+// returns the size to send
+static int __tunnel_channel_send_service_migrate_data(TunnelChannel *channel,
+ TunnelMigrateServiceItem *item,
+ int offset)
+{
+ TunnelService *service = item->service;
+ int cur_offset = offset;
+ TunnelMigrateService *generic_data;
+
+ if (service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) {
+ generic_data = &item->u.generic_service;
+ red_channel_add_buf(&channel->base, &item->u.generic_service,
+ sizeof(item->u.generic_service));
+ cur_offset += sizeof(item->u.generic_service);
+ } else if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) {
+ generic_data = &item->u.print_service.base;
+ red_channel_add_buf(&channel->base, &item->u.print_service,
+ sizeof(item->u.print_service));
+ cur_offset += sizeof(item->u.print_service);
+ } else {
+ red_error("unexpected service type");
+ }
+
+ generic_data->name = cur_offset;
+ red_channel_add_buf(&channel->base, service->name, strlen(service->name) + 1);
+ cur_offset += strlen(service->name) + 1;
+
+ generic_data->description = cur_offset;
+ red_channel_add_buf(&channel->base, service->description, strlen(service->description) + 1);
+ cur_offset += strlen(service->description) + 1;
+
+ return (cur_offset - offset);
+}
+
+// returns the size to send
+static int __tunnel_channel_send_socket_migrate_data(TunnelChannel *channel,
+ TunnelMigrateSocketItem *item, int offset)
+{
+ RedSocket *sckt = item->socket;
+ TunnelMigrateSocket *mig_sckt = &item->mig_socket;
+ int cur_offset = offset;
+ red_channel_add_buf(&channel->base, mig_sckt, sizeof(*mig_sckt));
+ cur_offset += sizeof(*mig_sckt);
+
+ if ((sckt->client_status != CLIENT_SCKT_STATUS_CLOSED) &&
+ (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED) &&
+ (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED_ACK)) {
+ mig_sckt->out_data.process_buf = cur_offset;
+ mig_sckt->out_data.process_buf_size =
+ __tunnel_channel_send_process_bufs_migrate_data(channel,
+ sckt->out_data.process_queue);
+ cur_offset += mig_sckt->out_data.process_buf_size;
+ if (mig_sckt->out_data.process_queue_size) {
+ mig_sckt->out_data.process_queue = cur_offset;
+ red_channel_add_buf(&channel->base, item->out_process_queue,
+ mig_sckt->out_data.process_queue_size);
+ cur_offset += mig_sckt->out_data.process_queue_size;
+ }
+ mig_sckt->out_data.ready_buf = cur_offset;
+ mig_sckt->out_data.ready_buf_size =
+ __tunnel_channel_send_ready_bufs_migrate_data(channel,
+ &sckt->out_data.ready_chunks_queue);
+ cur_offset += mig_sckt->out_data.ready_buf_size;
+ } else {
+ mig_sckt->out_data.process_buf_size = 0;
+ mig_sckt->out_data.ready_buf_size = 0;
+ }
+
+ // notice that we migrate the received buffers without the msg headers.
+ if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) ||
+ (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) {
+ mig_sckt->in_data.process_buf = cur_offset;
+ mig_sckt->in_data.process_buf_size =
+ __tunnel_channel_send_process_bufs_migrate_data(channel,
+ sckt->in_data.process_queue);
+ cur_offset += mig_sckt->in_data.process_buf_size;
+ if (mig_sckt->in_data.process_queue_size) {
+ mig_sckt->in_data.process_queue = cur_offset;
+ red_channel_add_buf(&channel->base, item->in_process_queue,
+ mig_sckt->in_data.process_queue_size);
+ cur_offset += mig_sckt->in_data.process_queue_size;
+ }
+ mig_sckt->in_data.ready_buf = cur_offset;
+ mig_sckt->in_data.ready_buf_size =
+ __tunnel_channel_send_ready_bufs_migrate_data(channel,
+ &sckt->in_data.ready_chunks_queue);
+ cur_offset += mig_sckt->in_data.ready_buf_size;
+ } else {
+ mig_sckt->in_data.process_buf_size = 0;
+ mig_sckt->in_data.ready_buf_size = 0;
+ }
+
+ if (item->slirp_socket_size) { // zero if socket is closed
+ red_channel_add_buf(&channel->base, item->slirp_socket, item->slirp_socket_size);
+ mig_sckt->slirp_sckt = cur_offset;
+ cur_offset += item->slirp_socket_size;
+ }
+ return (cur_offset - offset);
+}
+
+static void tunnel_channel_send_migrate_data(TunnelChannel *channel, PipeItem *item)
+{
+ TunnelMigrateData *migrate_data = &channel->send_data.u.migrate_data;
+ TunnelMigrateItem *migrate_item = (TunnelMigrateItem *)item;
+ int i;
+
+ uint32_t data_buf_offset = 0; // current location in data[0] field
+ ASSERT(channel);
+
+ migrate_data->magic = TUNNEL_MIGRATE_DATA_MAGIC;
+ migrate_data->version = TUNNEL_MIGRATE_DATA_VERSION;
+ migrate_data->message_serial = red_channel_get_message_serial(&channel->base);
+ red_channel_init_send_data(&channel->base, RED_MIGRATE_DATA, item);
+ red_channel_add_buf(&channel->base, migrate_data, sizeof(*migrate_data));
+
+ migrate_data->slirp_state = data_buf_offset;
+ red_channel_add_buf(&channel->base, migrate_item->slirp_state, migrate_item->slirp_state_size);
+ data_buf_offset += migrate_item->slirp_state_size;
+
+ migrate_data->services_list = data_buf_offset;
+ red_channel_add_buf(&channel->base, migrate_item->services_list,
+ migrate_item->services_list_size);
+ data_buf_offset += migrate_item->services_list_size;
+
+ for (i = 0; i < migrate_item->services_list->num_services; i++) {
+ migrate_item->services_list->services[i] = data_buf_offset;
+ data_buf_offset += __tunnel_channel_send_service_migrate_data(channel,
+ migrate_item->services + i,
+ data_buf_offset);
+ }
+
+
+ migrate_data->sockets_list = data_buf_offset;
+ red_channel_add_buf(&channel->base, migrate_item->sockets_list,
+ migrate_item->sockets_list_size);
+ data_buf_offset += migrate_item->sockets_list_size;
+
+ for (i = 0; i < migrate_item->sockets_list->num_sockets; i++) {
+ migrate_item->sockets_list->sockets[i] = data_buf_offset;
+ data_buf_offset += __tunnel_channel_send_socket_migrate_data(channel,
+ migrate_item->sockets_data + i,
+ data_buf_offset);
+ }
+
+ red_channel_begin_send_massage(&channel->base);
+}
+
+static void tunnel_channel_send_init(TunnelChannel *channel, PipeItem *item)
+{
+ ASSERT(channel);
+
+ channel->send_data.u.init.max_socket_data_size = MAX_SOCKET_DATA_SIZE;
+ channel->send_data.u.init.max_num_of_sockets = MAX_SOCKETS_NUM;
+
+ red_channel_init_send_data(&channel->base, RED_TUNNEL_INIT, item);
+ red_channel_add_buf(&channel->base, &channel->send_data.u.init, sizeof(RedTunnelInit));
+
+ red_channel_begin_send_massage(&channel->base);
+}
+
+static void tunnel_channel_send_service_ip_map(TunnelChannel *channel, PipeItem *item)
+{
+ TunnelService *service = CONTAINEROF(item, TunnelService, pipe_item);
+
+ channel->send_data.u.service_ip.service_id = service->id;
+ channel->send_data.u.service_ip.virtual_ip.type = RED_TUNNEL_IP_TYPE_IPv4;
+
+ red_channel_init_send_data(&channel->base, RED_TUNNEL_SERVICE_IP_MAP, item);
+ red_channel_add_buf(&channel->base, &channel->send_data.u.service_ip,
+ sizeof(RedTunnelServiceIpMap));
+ red_channel_add_buf(&channel->base, &service->virt_ip.s_addr, sizeof(RedTunnelIPv4));
+ red_channel_begin_send_massage(&channel->base);
+}
+
+static void tunnel_channel_send_socket_open(TunnelChannel *channel, PipeItem *item)
+{
+ RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item);
+ RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data);
+
+ channel->send_data.u.socket_open.connection_id = sckt->connection_id;
+ channel->send_data.u.socket_open.service_id = sckt->far_service->id;
+ channel->send_data.u.socket_open.tokens = SOCKET_WINDOW_SIZE;
+
+ sckt->in_data.client_total_num_tokens = SOCKET_WINDOW_SIZE;
+ sckt->in_data.num_tokens = 0;
+ red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_OPEN, item);
+ red_channel_add_buf(&channel->base, &channel->send_data.u.socket_open,
+ sizeof(channel->send_data.u.socket_open));
+
+ red_channel_begin_send_massage(&channel->base);
+#ifdef DEBUG_NETWORK
+ PRINT_SCKT(sckt);
+#endif
+}
+
+static void tunnel_channel_send_socket_fin(TunnelChannel *channel, PipeItem *item)
+{
+ RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item);
+ RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data);
+
+ ASSERT(!sckt->out_data.ready_chunks_queue.head);
+
+ if (sckt->out_data.process_queue->head) {
+ red_printf("socket sent FIN but there are still buffers in outgoing process queue"
+ "(local_port=%d, service_id=%d)",
+ ntohs(sckt->local_port), sckt->far_service->id);
+ }
+
+ channel->send_data.u.socket_fin.connection_id = sckt->connection_id;
+
+ red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_FIN, item);
+ red_channel_add_buf(&channel->base, &channel->send_data.u.socket_fin,
+ sizeof(channel->send_data.u.socket_fin));
+
+ red_channel_begin_send_massage(&channel->base);
+
+#ifdef DEBUG_NETWORK
+ PRINT_SCKT(sckt);
+#endif
+}
+
+static void tunnel_channel_send_socket_close(TunnelChannel *channel, PipeItem *item)
+{
+ RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item);
+ RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data);
+
+ // can happen when it is a forced close
+ if (sckt->out_data.ready_chunks_queue.head) {
+ red_printf("socket closed but there are still buffers in outgoing ready queue"
+ "(local_port=%d, service_id=%d)",
+ ntohs(sckt->local_port),
+ sckt->far_service->id);
+ }
+
+ if (sckt->out_data.process_queue->head) {
+ red_printf("socket closed but there are still buffers in outgoing process queue"
+ "(local_port=%d, service_id=%d)",
+ ntohs(sckt->local_port), sckt->far_service->id);
+ }
+
+ channel->send_data.u.socket_close.connection_id = sckt->connection_id;
+
+ red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_CLOSE, item);
+ red_channel_add_buf(&channel->base, &channel->send_data.u.socket_close,
+ sizeof(channel->send_data.u.socket_close));
+
+ red_channel_begin_send_massage(&channel->base);
+
+#ifdef DEBUG_NETWORK
+ PRINT_SCKT(sckt);
+#endif
+}
+
+static void tunnel_channel_send_socket_closed_ack(TunnelChannel *channel, PipeItem *item)
+{
+ RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item);
+ RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data);
+
+ channel->send_data.u.socket_close_ack.connection_id = sckt->connection_id;
+
+ // pipe item is null because we free the sckt.
+ red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_CLOSED_ACK, NULL);
+ red_channel_add_buf(&channel->base, &channel->send_data.u.socket_close_ack,
+ sizeof(channel->send_data.u.socket_close_ack));
+
+ red_channel_begin_send_massage(&channel->base);
+
+#ifdef DEBUG_NETWORK
+ PRINT_SCKT(sckt);
+#endif
+
+ ASSERT(sckt->client_waits_close_ack && (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED));
+ tunnel_worker_free_socket(channel->worker, sckt);
+ if (CHECK_TUNNEL_ERROR(channel)) {
+ tunnel_shutdown(channel->worker);
+ }
+}
+
+static void tunnel_channel_send_socket_token(TunnelChannel *channel, PipeItem *item)
+{
+ RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, token_pipe_item);
+ RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data);
+
+ /* notice that the num of tokens sent can be > SOCKET_TOKENS_TO_SEND, since
+ the sending is performed after the pipe item was pushed */
+
+ channel->send_data.u.socket_token.connection_id = sckt->connection_id;
+
+ if (sckt->in_data.num_tokens > 0) {
+ channel->send_data.u.socket_token.num_tokens = sckt->in_data.num_tokens;
+ } else {
+ ASSERT(!sckt->in_data.client_total_num_tokens && !sckt->in_data.ready_chunks_queue.head);
+ channel->send_data.u.socket_token.num_tokens = SOCKET_TOKENS_TO_SEND_FOR_PROCESS;
+ }
+ sckt->in_data.num_tokens -= channel->send_data.u.socket_token.num_tokens;
+ sckt->in_data.client_total_num_tokens += channel->send_data.u.socket_token.num_tokens;
+ ASSERT(sckt->in_data.client_total_num_tokens <= SOCKET_WINDOW_SIZE);
+
+ red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_TOKEN, item);
+ red_channel_add_buf(&channel->base, &channel->send_data.u.socket_token,
+ sizeof(channel->send_data.u.socket_token));
+
+ red_channel_begin_send_massage(&channel->base);
+}
+
+static void tunnel_channel_send_socket_out_data(TunnelChannel *channel, PipeItem *item)
+{
+ RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, data_pipe_item);
+ RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data);
+ ReadyTunneledChunk *chunk;
+ uint32_t total_push_size = 0;
+ uint32_t pushed_bufs_num = 0;
+
+ ASSERT(!sckt->pushed_close);
+ if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) {
+ return;
+ }
+
+ if (!sckt->out_data.num_tokens) {
+        return; // data will be sent again only once we receive more tokens.
+ }
+
+ ASSERT(sckt->out_data.ready_chunks_queue.head);
+ ASSERT(!sckt->out_data.push_tail);
+ ASSERT(sckt->out_data.ready_chunks_queue.head->size <= MAX_SOCKET_DATA_SIZE);
+
+ channel->send_data.u.socket_data.connection_id = sckt->connection_id;
+
+ red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_DATA, item);
+ red_channel_add_buf(&channel->base, &channel->send_data.u.socket_data,
+ sizeof(channel->send_data.u.socket_data));
+ pushed_bufs_num++;
+
+ // the first chunk is in a valid size
+ chunk = sckt->out_data.ready_chunks_queue.head;
+ total_push_size = chunk->size - sckt->out_data.ready_chunks_queue.offset;
+ red_channel_add_buf(&channel->base, chunk->data + sckt->out_data.ready_chunks_queue.offset,
+ total_push_size);
+ pushed_bufs_num++;
+ sckt->out_data.push_tail = chunk;
+ sckt->out_data.push_tail_size = chunk->size; // all the chunk was sent
+
+ chunk = chunk->next;
+
+ while (chunk && (total_push_size < MAX_SOCKET_DATA_SIZE) && (pushed_bufs_num < MAX_SEND_BUFS)) {
+ uint32_t cur_push_size = MIN(chunk->size, MAX_SOCKET_DATA_SIZE - total_push_size);
+ red_channel_add_buf(&channel->base, chunk->data, cur_push_size);
+ pushed_bufs_num++;
+
+ sckt->out_data.push_tail = chunk;
+ sckt->out_data.push_tail_size = cur_push_size;
+ total_push_size += cur_push_size;
+
+ chunk = chunk->next;
+ }
+
+ sckt->out_data.num_tokens--;
+
+ red_channel_begin_send_massage(&channel->base);
+}
+
+static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem *item)
+{
+ RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, data_pipe_item);
+ RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data);
+
+ ASSERT(sckt_out_data->ready_chunks_queue.head);
+
+ while (sckt_out_data->ready_chunks_queue.head != sckt_out_data->push_tail) {
+ sckt_out_data->data_size -= sckt_out_data->ready_chunks_queue.head->size;
+ ready_queue_pop_chunk(&sckt_out_data->ready_chunks_queue);
+ }
+
+ sckt_out_data->data_size -= sckt_out_data->push_tail_size;
+
+    // compensation: was subtracted in the previous lines
+ sckt_out_data->data_size += sckt_out_data->ready_chunks_queue.offset;
+
+ if (sckt_out_data->push_tail_size == sckt_out_data->push_tail->size) {
+ ready_queue_pop_chunk(&sckt_out_data->ready_chunks_queue);
+ sckt_out_data->ready_chunks_queue.offset = 0;
+ } else {
+ sckt_out_data->ready_chunks_queue.offset = sckt_out_data->push_tail_size;
+ }
+
+ sckt_out_data->push_tail = NULL;
+ sckt_out_data->push_tail_size = 0;
+
+ if (worker->channel) {
+ // can still send data to socket
+ if (__client_socket_can_receive(sckt)) {
+ if (sckt_out_data->ready_chunks_queue.head) {
+            // the pipe item may already be linked, if for example the send was
+            // blocked and, before it finished and called release, tunnel_socket_send was called
+ if (!red_channel_pipe_item_is_linked(
+ &worker->channel->base, &sckt_out_data->data_pipe_item)) {
+ sckt_out_data->data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
+ red_channel_pipe_add(&worker->channel->base, &sckt_out_data->data_pipe_item);
+ }
+ } else if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) ||
+ (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) {
+ __tunnel_socket_add_fin_to_pipe(worker->channel, sckt);
+ } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
+ __tunnel_socket_add_close_to_pipe(worker->channel, sckt);
+ }
+ }
+ }
+
+
+ if (((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) ||
+ (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) &&
+ !sckt->in_slirp_send && !worker->channel->mig_inprogress) {
+        // for cases where slirp couldn't write all of its data to our socket buffer
+ net_slirp_socket_can_send_notify(sckt->slirp_sckt);
+ }
+}
+
+static void tunnel_channel_send_item(RedChannel *channel, PipeItem *item)
+{
+ TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
+
+ red_channel_reset_send_data(channel);
+ switch (item->type) {
+ case PIPE_ITEM_TYPE_SET_ACK:
+ tunnel_channel_send_set_ack(tunnel_channel, item);
+ break;
+ case PIPE_ITEM_TYPE_TUNNEL_INIT:
+ tunnel_channel_send_init(tunnel_channel, item);
+ break;
+ case PIPE_ITEM_TYPE_SERVICE_IP_MAP:
+ tunnel_channel_send_service_ip_map(tunnel_channel, item);
+ break;
+ case PIPE_ITEM_TYPE_SOCKET_OPEN:
+ tunnel_channel_send_socket_open(tunnel_channel, item);
+ break;
+ case PIPE_ITEM_TYPE_SOCKET_DATA:
+ tunnel_channel_send_socket_out_data(tunnel_channel, item);
+ break;
+ case PIPE_ITEM_TYPE_SOCKET_FIN:
+ tunnel_channel_send_socket_fin(tunnel_channel, item);
+ break;
+ case PIPE_ITEM_TYPE_SOCKET_CLOSE:
+ tunnel_channel_send_socket_close(tunnel_channel, item);
+ break;
+ case PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK:
+ tunnel_channel_send_socket_closed_ack(tunnel_channel, item);
+ break;
+ case PIPE_ITEM_TYPE_SOCKET_TOKEN:
+ tunnel_channel_send_socket_token(tunnel_channel, item);
+ break;
+ case PIPE_ITEM_TYPE_MIGRATE:
+ tunnel_channel_send_migrate(tunnel_channel, item);
+ break;
+ case PIPE_ITEM_TYPE_MIGRATE_DATA:
+ tunnel_channel_send_migrate_data(tunnel_channel, item);
+ break;
+ default:
+ red_error("invalid pipe item type");
+ }
+}
+
+/* param item_pushed: distinguishes between a pipe item that was pushed for sending, and
+ a pipe item that is still in the pipe and is released due to disconnection.
+ see red_pipe_item_clear */
+static void tunnel_channel_release_pipe_item(RedChannel *channel, PipeItem *item, int item_pushed)
+{
+ if (!item) { // e.g. when acking closed socket
+ return;
+ }
+ switch (item->type) {
+ case PIPE_ITEM_TYPE_SET_ACK:
+ case PIPE_ITEM_TYPE_TUNNEL_INIT:
+ free(item);
+ break;
+ case PIPE_ITEM_TYPE_SERVICE_IP_MAP:
+ case PIPE_ITEM_TYPE_SOCKET_OPEN:
+ case PIPE_ITEM_TYPE_SOCKET_CLOSE:
+ case PIPE_ITEM_TYPE_SOCKET_FIN:
+ case PIPE_ITEM_TYPE_SOCKET_TOKEN:
+ break;
+ case PIPE_ITEM_TYPE_SOCKET_DATA:
+ if (item_pushed) {
+ tunnel_worker_release_socket_out_data(((TunnelChannel *)channel)->worker, item);
+ }
+ break;
+ case PIPE_ITEM_TYPE_MIGRATE:
+ free(item);
+ break;
+ case PIPE_ITEM_TYPE_MIGRATE_DATA:
+ release_migrate_item((TunnelMigrateItem *)item);
+ break;
+ default:
+ red_error("invalid pipe item type");
+ }
+}
+
+/***********************************************************
+* interface for slirp
+************************************************************/
+
+static int qemu_can_output(SlirpUsrNetworkInterface *usr_interface)
+{
+ TunnelWorker *worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+ return worker->vlan_interface->can_send_packet(worker->vlan_interface);
+}
+
+static void qemu_output(SlirpUsrNetworkInterface *usr_interface, const uint8_t *pkt, int pkt_len)
+{
+ TunnelWorker *worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+ worker->vlan_interface->send_packet(worker->vlan_interface, pkt, pkt_len);
+}
+
+static int null_tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface,
+ struct in_addr src_addr, uint16_t src_port,
+ struct in_addr dst_addr, uint16_t dst_port,
+ SlirpSocket *slirp_s, UserSocket **o_usr_s)
+{
+ errno = ENETUNREACH;
+ return -1;
+}
+
+static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface,
+ struct in_addr src_addr, uint16_t src_port,
+ struct in_addr dst_addr, uint16_t dst_port,
+ SlirpSocket *slirp_s, UserSocket **o_usr_s)
+{
+ TunnelWorker *worker;
+ RedSocket *sckt;
+ TunnelService *far_service;
+
+ ASSERT(usr_interface);
+
+#ifdef DEBUG_NETWORK
+ red_printf("TUNNEL_DBG");
+#endif
+ worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+ ASSERT(worker->channel);
+ ASSERT(!worker->channel->mig_inprogress);
+
+ far_service = tunnel_worker_find_service_by_addr(worker, &dst_addr, (uint32_t)ntohs(dst_port));
+
+ if (!far_service) {
+ errno = EADDRNOTAVAIL;
+ return -1;
+ }
+
+ if (tunnel_worker_find_socket(worker, src_port, far_service->id)) {
+ red_printf("slirp tried to open a socket that is still opened");
+ errno = EADDRINUSE;
+ return -1;
+ }
+
+ if (worker->num_sockets == MAX_SOCKETS_NUM) {
+ red_printf("number of tunneled sockets exceeds the limit");
+ errno = ENFILE;
+ return -1;
+ }
+
+ sckt = tunnel_worker_create_socket(worker, src_port, far_service, slirp_s);
+
+#ifdef DEBUG_NETWORK
+ PRINT_SCKT(sckt);
+#endif
+ *o_usr_s = sckt;
+ sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_OPEN;
+ red_channel_pipe_add(&worker->channel->base, &sckt->out_data.status_pipe_item);
+
+ errno = EINPROGRESS;
+ return -1;
+}
+
+static int null_tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque,
+ uint8_t *buf, size_t len, uint8_t urgent)
+{
+ errno = ECONNRESET;
+ return -1;
+}
+
+static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque,
+ uint8_t *buf, size_t len, uint8_t urgent)
+{
+ TunnelWorker *worker;
+ RedSocket *sckt;
+ size_t size_to_send;
+
+ ASSERT(usr_interface);
+ ASSERT(opaque);
+
+ worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+
+ ASSERT(!worker->channel->mig_inprogress);
+
+ sckt = (RedSocket *)opaque;
+
+ if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ if ((sckt->client_status != CLIENT_SCKT_STATUS_OPEN) &&
+ (sckt->client_status != CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) {
+ red_printf("client socket is unable to receive data");
+ errno = ECONNRESET;
+ return -1;
+ }
+
+
+ if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) ||
+ (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) {
+ red_printf("send was shutdown");
+ errno = EPIPE;
+ return -1;
+ }
+
+ if (urgent) {
+ SET_TUNNEL_ERROR(worker->channel, "urgent msgs not supported");
+ tunnel_shutdown(worker);
+ errno = ECONNRESET;
+ return -1;
+ }
+
+ sckt->in_slirp_send = TRUE;
+
+ if (sckt->out_data.data_size < (sckt->out_data.window_size) * MAX_SOCKET_DATA_SIZE) {
+ // the current data in the queues doesn't fill all the tokens
+ size_to_send = len;
+ } else {
+ if (sckt->out_data.ready_chunks_queue.head) {
+ // there are no tokens for future data, but once the data will be sent
+ // and buffers will be released, we will try to send again.
+ size_to_send = 0;
+ } else {
+ ASSERT(sckt->out_data.process_queue->head);
+ if ((sckt->out_data.data_size + len) >
+ (MAX_SOCKET_OUT_BUFFERS * MAX_SOCKET_DATA_SIZE)) {
+ red_printf("socket out buffers overflow, socket will be closed"
+ " (local_port=%d, service_id=%d)",
+ ntohs(sckt->local_port), sckt->far_service->id);
+ tunnel_socket_force_close(worker->channel, sckt);
+ size_to_send = 0;
+ } else {
+ size_to_send = len;
+ }
+ }
+ }
+
+ if (size_to_send) {
+ process_queue_append(sckt->out_data.process_queue, buf, size_to_send);
+ sckt->out_data.data_size += size_to_send;
+
+ if (sckt->out_data.ready_chunks_queue.head &&
+ !red_channel_pipe_item_is_linked(&worker->channel->base,
+ &sckt->out_data.data_pipe_item)) {
+ sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA;
+ red_channel_pipe_add(&worker->channel->base, &sckt->out_data.data_pipe_item);
+ }
+ }
+
+ sckt->in_slirp_send = FALSE;
+
+ if (!size_to_send) {
+ errno = EAGAIN;
+ return -1;
+ } else {
+ return size_to_send;
+ }
+}
+
+static inline int __should_send_fin_to_guest(RedSocket *sckt)
+{
+ return (((sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND) ||
+ ((sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) &&
+ (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND))) &&
+ !sckt->in_data.ready_chunks_queue.head);
+}
+
+static int null_tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque,
+ uint8_t *buf, size_t len)
+{
+ errno = ECONNRESET;
+ return -1;
+}
+
+static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque,
+ uint8_t *buf, size_t len)
+{
+ TunnelWorker *worker;
+ RedSocket *sckt;
+ int copied = 0;
+
+ ASSERT(usr_interface);
+ ASSERT(opaque);
+ worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+
+ ASSERT(!worker->channel->mig_inprogress);
+
+ sckt = (RedSocket *)opaque;
+
+ if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) {
+ errno = EAGAIN;
+ return -1;
+ }
+
+ if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) ||
+ (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) {
+ SET_TUNNEL_ERROR(worker->channel, "recieve was shutdown");
+ tunnel_shutdown(worker);
+ errno = ECONNRESET;
+ return -1;
+ }
+
+ if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
+ SET_TUNNEL_ERROR(worker->channel, "slirp socket not connected");
+ tunnel_shutdown(worker);
+ errno = ECONNRESET;
+ return -1;
+ }
+
+ ASSERT((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) ||
+ (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND) ||
+ ((sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) &&
+ (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)));
+
+
+ // if there is data in ready queue, when it is acked, slirp will call recv and get 0
+ if (__should_send_fin_to_guest(sckt)) {
+ if (sckt->in_data.process_queue->head) {
+ red_printf("client socket sent FIN but there are still buffers in incoming process"
+ "queue (local_port=%d, service_id=%d)",
+ ntohs(sckt->local_port), sckt->far_service->id);
+ }
+ return 0; // slirp will call shutdown recv now and it will also send FIN to the guest.
+ }
+
+ while (sckt->in_data.ready_chunks_queue.head && (copied < len)) {
+ ReadyTunneledChunk *cur_chunk = sckt->in_data.ready_chunks_queue.head;
+ int copy_count = MIN(cur_chunk->size - sckt->in_data.ready_chunks_queue.offset,
+ len - copied);
+
+ memcpy(buf + copied, cur_chunk->data + sckt->in_data.ready_chunks_queue.offset, copy_count);
+ copied += copy_count;
+ if ((sckt->in_data.ready_chunks_queue.offset + copy_count) == cur_chunk->size) {
+ ready_queue_pop_chunk(&sckt->in_data.ready_chunks_queue);
+ sckt->in_data.ready_chunks_queue.offset = 0;
+ } else {
+ ASSERT(copied == len);
+ sckt->in_data.ready_chunks_queue.offset += copy_count;
+ }
+ }
+
+ if (!copied) {
+ errno = EAGAIN;
+ return -1;
+ } else {
+ return copied;
+ }
+}
+
+static void null_tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface,
+ UserSocket *opaque)
+{
+}
+
+// can be called: 1) when a FIN is requested from the guest 2) after shutdown recv that was called
+// after receive failed because the client socket was sent FIN
+static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque)
+{
+ TunnelWorker *worker;
+ RedSocket *sckt;
+
+ ASSERT(usr_interface);
+ ASSERT(opaque);
+ worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+ sckt = (RedSocket *)opaque;
+
+#ifdef DEBUG_NETWORK
+ PRINT_SCKT(sckt);
+#endif
+ ASSERT(!worker->channel->mig_inprogress);
+
+ if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) {
+ return;
+ }
+
+ if (sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) {
+ sckt->slirp_status = SLIRP_SCKT_STATUS_SHUTDOWN_SEND;
+ } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) {
+ ASSERT(sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND);
+ sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
+ } else {
+ SET_TUNNEL_ERROR(worker->channel, "unexpected tunnel_socket_shutdown_send slirp_status=%d",
+ sckt->slirp_status);
+ tunnel_shutdown(worker);
+ return;
+ }
+
+ if ((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) ||
+ (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) {
+ // check if there is still data to send. the fin will be sent after data is released
+ // channel is alive, otherwise the sockets would have been aborted
+ if (!sckt->out_data.ready_chunks_queue.head) {
+ __tunnel_socket_add_fin_to_pipe(worker->channel, sckt);
+ }
+ } else { // if client is closed, it means the connection was aborted since we didn't
+             // receive a fin from the guest
+ SET_TUNNEL_ERROR(worker->channel,
+ "unexpected tunnel_socket_shutdown_send client_status=%d",
+ sckt->client_status);
+ tunnel_shutdown(worker);
+ }
+}
+
+static void null_tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface,
+ UserSocket *opaque)
+{
+}
+
+static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque)
+{
+ TunnelWorker *worker;
+ RedSocket *sckt;
+
+ ASSERT(usr_interface);
+ ASSERT(opaque);
+ worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+ sckt = (RedSocket *)opaque;
+
+#ifdef DEBUG_NETWORK
+ PRINT_SCKT(sckt);
+#endif
+ ASSERT(!worker->channel->mig_inprogress);
+
+ /* failure in recv can happen after the client sckt was shutdown
+      (after client sent FIN, or after slirp sent FIN and client socket was closed) */
+ if (!__should_send_fin_to_guest(sckt)) {
+ SET_TUNNEL_ERROR(worker->channel,
+ "unexpected tunnel_socket_shutdown_recv client_status=%d slirp_status=%d",
+ sckt->client_status, sckt->slirp_status);
+ tunnel_shutdown(worker);
+ return;
+ }
+
+ if (sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) {
+ sckt->slirp_status = SLIRP_SCKT_STATUS_SHUTDOWN_RECV;
+ } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) {
+ sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
+ } else {
+ SET_TUNNEL_ERROR(worker->channel,
+ "unexpected tunnel_socket_shutdown_recv slirp_status=%d",
+ sckt->slirp_status);
+ tunnel_shutdown(worker);
+ }
+}
+
+/* close callback for the "null" slirp interface: the channel is already
+   disconnected (or being torn down after an error), so only record that slirp
+   closed the socket, and free it if the client side is closed too. */
+static void null_tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque)
+{
+    TunnelWorker *worker;
+    RedSocket *sckt;
+
+    ASSERT(usr_interface);
+    ASSERT(opaque);
+
+    worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+
+    sckt = (RedSocket *)opaque;
+#ifdef DEBUG_NETWORK
+    PRINT_SCKT(sckt);
+#endif
+    sckt->slirp_status = SLIRP_SCKT_STATUS_CLOSED;
+
+    if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) {
+        tunnel_worker_free_socket(worker, sckt);
+    } // else, it will be freed when disconnect is called (this callback is only
+      // set while the channel is disconnected, or while we are in the middle of
+      // a disconnection that was caused by an error)
+}
+
+// Slirp callback: slirp closed the socket. Can be called during migration due
+// to the channel disconnect, but it does not affect the migrate data.
+static void tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque)
+{
+    TunnelWorker *worker;
+    RedSocket *sckt;
+
+    ASSERT(usr_interface);
+    ASSERT(opaque);
+
+    worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+
+    sckt = (RedSocket *)opaque;
+
+#ifdef DEBUG_NETWORK
+    PRINT_SCKT(sckt);
+#endif
+
+    sckt->slirp_status = SLIRP_SCKT_STATUS_CLOSED;
+
+    // if sckt is not opened yet, close will be sent when we receive the connect ack
+    if ((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) ||
+        (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) {
+        // check if there is still data to send. the close will be sent after data is released.
+        // close may have already been pushed if it is a forced close
+        if (!sckt->out_data.ready_chunks_queue.head && !sckt->pushed_close) {
+            __tunnel_socket_add_close_to_pipe(worker->channel, sckt);
+        }
+    } else if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) {
+        if (sckt->client_waits_close_ack) {
+            __tunnel_socket_add_close_ack_to_pipe(worker->channel, sckt);
+        } else {
+            tunnel_worker_free_socket(worker, sckt);
+        }
+    }
+}
+
+/* Slirp callback: allocate a timer through the core interface. The returned
+   VDObjectRef is handed back to slirp as an opaque UserTimer pointer (the
+   matching cast back happens in arm_timer). */
+static UserTimer *create_timer(SlirpUsrNetworkInterface *usr_interface,
+                               timer_proc_t proc, void *opaque)
+{
+    TunnelWorker *worker;
+
+    ASSERT(usr_interface);
+
+    worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+
+    return (void *)worker->core_interface->create_timer(worker->core_interface,
+                                                        proc, opaque);
+}
+
+/* Slirp callback: arm `timer` to fire in `ms` milliseconds. Arming a timer
+   while migration is in progress is treated as a fatal tunnel error. */
+static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, uint32_t ms)
+{
+    TunnelWorker *worker;
+
+    ASSERT(usr_interface);
+
+    worker = ((RedSlirpNetworkInterface *)usr_interface)->worker;
+#ifdef DEBUG_NETWORK
+    if (!worker->channel) {
+        red_printf("channel not connected");
+    }
+#endif
+    if (worker->channel && worker->channel->mig_inprogress) {
+        SET_TUNNEL_ERROR(worker->channel, "during migration");
+        tunnel_shutdown(worker);
+        return;
+    }
+
+    worker->core_interface->arm_timer(worker->core_interface, (VDObjectRef)timer, ms);
+}
+
+/***********************************************
+* channel interface and other related procedures
+************************************************/
+
+/* Configure the freshly linked client socket for the tunnel channel:
+   switch it to non-blocking mode and disable Nagle (TCP_NODELAY).
+   Returns TRUE on success, FALSE if the socket could not be made
+   non-blocking. A TCP_NODELAY failure is only logged, not fatal. */
+static int tunnel_channel_config_socket(RedChannel *channel)
+{
+    int flags;
+    int delay_val;
+
+    // NOTE: messages previously said "accept failed"; these calls are fcntl,
+    // not accept, so report the failing operation accurately
+    if ((flags = fcntl(channel->peer->socket, F_GETFL)) == -1) {
+        red_printf("fcntl(F_GETFL) failed, %s", strerror(errno));
+        return FALSE;
+    }
+
+    if (fcntl(channel->peer->socket, F_SETFL, flags | O_NONBLOCK) == -1) {
+        red_printf("fcntl(F_SETFL) failed, %s", strerror(errno));
+        return FALSE;
+    }
+
+    delay_val = 1;
+
+    if (setsockopt(channel->peer->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val,
+                   sizeof(delay_val)) == -1) {
+        red_printf("setsockopt failed, %s", strerror(errno));
+    }
+
+    return TRUE;
+}
+
+/* Detach slirp from the live tunnel: switch slirp to the null interface and
+   mark every allocated socket's client side closed. Sockets slirp already
+   closed are freed immediately; the rest are aborted and will be freed from
+   the null interface's close callback. */
+static void tunnel_worker_disconnect_slirp(TunnelWorker *worker)
+{
+    int i;
+
+    net_slirp_set_net_interface(&worker->null_interface.base);
+    for (i = 0; i < MAX_SOCKETS_NUM; i++) {
+        RedSocket *sckt = worker->sockets + i;
+        if (sckt->allocated) {
+            sckt->client_status = CLIENT_SCKT_STATUS_CLOSED;
+            sckt->client_waits_close_ack = FALSE;
+            if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) {
+                tunnel_worker_free_socket(worker, sckt);
+            } else {
+                sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE;
+                net_slirp_socket_abort(sckt->slirp_sckt);
+            }
+        }
+    }
+}
+
+/* Tear down the tunnel channel: detach slirp, clear the routed network and
+   destroy the RedChannel.
+   Don't call disconnect from functions that might be called by slirp,
+   since it closes all its sockets and slirp is not aware of it. */
+static void tunnel_channel_disconnect(RedChannel *channel)
+{
+    TunnelChannel *tunnel_channel = (TunnelChannel *)channel;
+    TunnelWorker *worker;
+    if (!channel) {
+        return;
+    }
+    red_printf("");  // trace-only log entry
+    worker = tunnel_channel->worker;
+
+    tunnel_worker_disconnect_slirp(worker);
+
+    tunnel_worker_clear_routed_network(worker);
+    red_channel_destroy(channel);
+    worker->channel = NULL;
+}
+
+/* interface for reds */
+
+/* Post-link setup for a new tunnel channel: always queue the SET_ACK pipe
+   item. A migrated channel then waits for the migrate data; a fresh
+   connection instead opens the outgoing-messages window and queues the
+   TUNNEL_INIT item. */
+static void on_new_tunnel_channel(TunnelChannel *channel)
+{
+    red_channel_pipe_add_type(&channel->base, PIPE_ITEM_TYPE_SET_ACK);
+
+    if (channel->base.migrate) {
+        channel->expect_migrate_data = TRUE;
+    } else {
+        red_channel_init_outgoing_messages_window(&channel->base);
+        red_channel_pipe_add_type(&channel->base, PIPE_ITEM_TYPE_TUNNEL_INIT);
+    }
+}
+
+/* reds channel-interface "link" hook: a client attached to the tunnel
+   channel. Any previously connected channel is disconnected first, then a
+   TunnelChannel is created over `peer` and slirp is switched to the live
+   tunnel interface. The caps arguments are currently unused here. */
+static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration,
+                                       int num_common_caps, uint32_t *common_caps, int num_caps,
+                                       uint32_t *caps)
+{
+    TunnelChannel *tunnel_channel;
+    TunnelWorker *worker = (TunnelWorker *)channel->data;
+    if (worker->channel) {
+        tunnel_channel_disconnect(&worker->channel->base);
+    }
+
+    tunnel_channel =
+        (TunnelChannel *)red_channel_create(sizeof(*tunnel_channel), peer, worker->core_interface,
+                                            migration, TRUE,
+                                            tunnel_channel_config_socket,
+                                            tunnel_channel_disconnect,
+                                            tunnel_channel_handle_message,
+                                            tunnel_channel_alloc_msg_rcv_buf,
+                                            tunnel_channel_release_msg_rcv_buf,
+                                            tunnel_channel_send_item,
+                                            tunnel_channel_release_pipe_item);
+
+    if (!tunnel_channel) {
+        return;
+    }
+
+
+    tunnel_channel->worker = worker;
+    tunnel_channel->worker->channel = tunnel_channel;
+    net_slirp_set_net_interface(&worker->tunnel_interface.base);
+
+    on_new_tunnel_channel(tunnel_channel);
+}
+
+/* reds channel-interface "shutdown" hook: disconnect the active channel.
+   NOTE(review): worker->channel is dereferenced without a NULL check --
+   presumably reds only invokes this while a channel is linked; confirm. */
+static void handle_tunnel_channel_shutdown(struct Channel *channel)
+{
+    tunnel_channel_disconnect(&((TunnelWorker *)channel->data)->channel->base);
+}
+
+/* reds channel-interface "migrate" hook: mark migration in progress (which,
+   e.g., makes arm_timer treat new timers as errors), freeze slirp, and queue
+   the MIGRATE pipe item for the client. */
+static void handle_tunnel_channel_migrate(struct Channel *channel)
+{
+#ifdef DEBUG_NETWORK
+    red_printf("TUNNEL_DBG: MIGRATE STARTED");
+#endif
+    TunnelChannel *tunnel_channel = ((TunnelWorker *)channel->data)->channel;
+    tunnel_channel->mig_inprogress = TRUE;
+    net_slirp_freeze();
+    red_channel_pipe_add_type(&tunnel_channel->base, PIPE_ITEM_TYPE_MIGRATE);
+}
+
diff --git a/server/red_tunnel_worker.h b/server/red_tunnel_worker.h
new file mode 100755
index 00000000..5d00a341
--- /dev/null
+++ b/server/red_tunnel_worker.h
@@ -0,0 +1,29 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+
+ Author:
+ yhalperi@redhat.com
+*/
+
+#ifndef _H_RED_TUNNEL_WORKER
+#define _H_RED_TUNNEL_WORKER
+
+#include "vd_interface.h"
+
+void *red_tunnel_attach(CoreInterface *core_interface, NetWireInterface *vlan_interface);
+
+#endif
diff --git a/server/reds.c b/server/reds.c
index 067304d7..4630e958 100644
--- a/server/reds.c
+++ b/server/reds.c
@@ -47,6 +47,7 @@
#include "red_common.h"
#include "red_dispatcher.h"
#include "snd_worker.h"
+#include "red_tunnel_worker.h"
#include "reds_stat.h"
#include "stat.h"
#include "ring.h"
@@ -83,6 +84,7 @@ static pthread_mutex_t *lock_cs;
static long *lock_count;
uint32_t streaming_video = TRUE;
image_compression_t image_compression = IMAGE_COMPRESS_AUTO_GLZ;
+void *red_tunnel = NULL;
int agent_mouse = TRUE;
static void openssl_init();
@@ -3921,7 +3923,7 @@ int __attribute__ ((visibility ("default"))) spice_parse_args(const char *in_arg
// All SSL parameters should be either on or off.
if (ssl_port != ssl_key || ssl_key != ssl_certs || ssl_certs != ssl_cafile ||
- ssl_cafile != ssl_dhfile || ssl_dhfile != ssl_ciphersuite) {
+ ssl_cafile != ssl_dhfile || ssl_dhfile != ssl_ciphersuite) {
goto error;
}
@@ -4775,6 +4777,19 @@ static void interface_change_notifier(void *opaque, VDInterface *interface,
return;
}
attach_to_red_agent((VDIPortInterface *)interface);
+ } else if (strcmp(interface->type, VD_INTERFACE_NET_WIRE) == 0) {
+ NetWireInterface * net_wire = (NetWireInterface *)interface;
+ red_printf("VD_INTERFACE_NET_WIRE");
+ if (red_tunnel) {
+ red_printf("net wire already attached");
+ return;
+ }
+ if (interface->major_version != VD_INTERFACE_NET_WIRE_MAJOR ||
+ interface->minor_version < VD_INTERFACE_NET_WIRE_MINOR) {
+ red_printf("unsuported net wire interface");
+ return;
+ }
+ red_tunnel = red_tunnel_attach(core, net_wire);
}
break;
case VD_INTERFACE_REMOVING:
diff --git a/server/reds.h b/server/reds.h
index ce09e5b6..248edffa 100644
--- a/server/reds.h
+++ b/server/reds.h
@@ -27,6 +27,9 @@ typedef struct RedsStreamContext {
int socket;
+    /* set to TRUE when you shut down the socket. shutdown of the read side does not
+       work as expected - receive may still return data afterwards; check this flag before calling receive */
+ int shutdown;
SSL *ssl;
int (*cb_write)(void *, void *, int);
diff --git a/server/vd_interface.h b/server/vd_interface.h
index 932c0b13..12dbd5ff 100644
--- a/server/vd_interface.h
+++ b/server/vd_interface.h
@@ -330,5 +330,23 @@ struct VDIPortInterface {
int (*read)(VDIPortInterface *port, VDObjectRef plug, uint8_t *buf, int len);
};
+#define VD_INTERFACE_NET_WIRE "net_wire"
+#define VD_INTERFACE_NET_WIRE_MAJOR 1
+#define VD_INTERFACE_NET_WIRE_MINOR 1
+
+typedef struct NetWireInterface NetWireInterface;
+/* callback invoked with each raw packet routed to a registered listener */
+typedef void (*net_wire_packet_route_proc_t)(void *opaque, const uint8_t *pkt, int pkt_len);
+
+/* Interface a VLAN backend exposes so the tunnel worker can send raw packets
+   and register to intercept incoming ones (attached via reds'
+   interface_change_notifier). */
+struct NetWireInterface {
+    VDInterface base;
+
+    struct in_addr (*get_ip)(NetWireInterface *vlan);
+    /* non-zero when a packet may currently be sent -- check before send_packet */
+    int (*can_send_packet)(NetWireInterface *vlan);
+    void (*send_packet)(NetWireInterface *vlan, const uint8_t *buf, int size);
+    /* returned ref is the handle to pass to unregister_route_packet */
+    VDObjectRef (*register_route_packet)(NetWireInterface *vlan, net_wire_packet_route_proc_t proc,
+                                         void *opaque);
+    void (*unregister_route_packet)(NetWireInterface *vlan, VDObjectRef proc);
+};
+
#endif