diff options
-rw-r--r-- | client/Makefile.am | 4 | ||||
-rw-r--r-- | client/application.cpp | 10 | ||||
-rw-r--r-- | client/client_net_socket.cpp | 386 | ||||
-rw-r--r-- | client/client_net_socket.h | 154 | ||||
-rw-r--r-- | client/red_channel.cpp | 38 | ||||
-rw-r--r-- | client/red_peer.cpp | 46 | ||||
-rw-r--r-- | client/red_peer.h | 17 | ||||
-rw-r--r-- | client/tunnel_channel.cpp | 792 | ||||
-rw-r--r-- | client/tunnel_channel.h | 138 | ||||
-rw-r--r-- | client/windows/platform_utils.cpp | 17 | ||||
-rw-r--r-- | client/windows/platform_utils.h | 22 | ||||
-rw-r--r-- | client/windows/redc.vcproj | 16 | ||||
-rw-r--r-- | client/x11/Makefile.am | 6 | ||||
-rw-r--r-- | client/x11/platform_utils.h | 21 | ||||
-rw-r--r-- | common/red.h | 142 | ||||
-rw-r--r-- | configure.ac | 5 | ||||
-rw-r--r-- | server/Makefile.am | 6 | ||||
-rw-r--r-- | server/red_channel.c | 520 | ||||
-rw-r--r-- | server/red_channel.h | 182 | ||||
-rw-r--r-- | server/red_tunnel_worker.c | 3510 | ||||
-rwxr-xr-x | server/red_tunnel_worker.h | 29 | ||||
-rw-r--r-- | server/reds.c | 17 | ||||
-rw-r--r-- | server/reds.h | 3 | ||||
-rw-r--r-- | server/vd_interface.h | 18 |
24 files changed, 6016 insertions, 83 deletions
diff --git a/client/Makefile.am b/client/Makefile.am index bb78ba49..51d4857e 100644 --- a/client/Makefile.am +++ b/client/Makefile.am @@ -15,6 +15,8 @@ RED_COMMON_SRCS = \ canvas_utils.cpp \ red_cairo_canvas.cpp \ red_cairo_canvas.h \ + client_net_socket.cpp \ + client_net_socket.h \ cmd_line_parser.cpp \ cmd_line_parser.h \ common.h \ @@ -71,6 +73,8 @@ RED_COMMON_SRCS = \ screen_layer.cpp \ screen_layer.h \ shared_cache.hpp \ + tunnel_channel.cpp \ + tunnel_channel.h \ hot_keys.cpp \ hot_keys.h \ threads.cpp \ diff --git a/client/application.cpp b/client/application.cpp index b30baa84..65937156 100644 --- a/client/application.cpp +++ b/client/application.cpp @@ -40,6 +40,7 @@ #include "quic.h" #include "mutex.h" #include "cmd_line_parser.h" +#include "tunnel_channel.h" #include <log4cpp/BasicConfigurator.hh> #include <log4cpp/FileAppender.hh> @@ -236,7 +237,7 @@ enum AppCommands { Application::Application() : _client (*this) - , _enabled_channels(RED_CHANNEL_END, true) + , _enabled_channels (RED_CHANNEL_END, true) , _main_screen (NULL) , _quitting (false) , _active (false) @@ -1323,6 +1324,7 @@ bool Application::set_channels_security(CmdLineParser& parser, bool on, char *va channels_names["cursor"] = RED_CHANNEL_CURSOR; channels_names["playback"] = RED_CHANNEL_PLAYBACK; channels_names["record"] = RED_CHANNEL_RECORD; + channels_names["tunnel"] = RED_CHANNEL_TUNNEL; if (!strcmp(val, "all")) { if ((val = parser.next_argument())) { @@ -1382,6 +1384,7 @@ bool Application::set_enable_channels(CmdLineParser& parser, bool enable, char * channels_names["cursor"] = RED_CHANNEL_CURSOR; channels_names["playback"] = RED_CHANNEL_PLAYBACK; channels_names["record"] = RED_CHANNEL_RECORD; + channels_names["tunnel"] = RED_CHANNEL_TUNNEL; if (!strcmp(val, "all")) { if ((val = parser.next_argument())) { @@ -1460,6 +1463,7 @@ bool Application::process_cmd_line(int argc, char** argv) _peer_con_opt[RED_CHANNEL_CURSOR] = RedPeer::ConnectionOptions::CON_OP_INVALID; _peer_con_opt[RED_CHANNEL_PLAYBACK] = RedPeer::ConnectionOptions::CON_OP_INVALID; _peer_con_opt[RED_CHANNEL_RECORD] = RedPeer::ConnectionOptions::CON_OP_INVALID; + _peer_con_opt[RED_CHANNEL_TUNNEL] = RedPeer::ConnectionOptions::CON_OP_INVALID; parser.begin(argc, argv); @@ -1595,6 +1599,10 @@ bool Application::process_cmd_line(int argc, char** argv) _client.register_channel_factory(RecordChannel::Factory()); } + if (_enabled_channels[RED_CHANNEL_TUNNEL]) { + _client.register_channel_factory(TunnelChannel::Factory()); + } + _client.init(host.c_str(), port, sport, password.c_str(), auto_display_res); if (auto_display_res) { Monitor* mon = find_monitor(0); diff --git a/client/client_net_socket.cpp b/client/client_net_socket.cpp new file mode 100644 index 00000000..50174795 --- /dev/null +++ b/client/client_net_socket.cpp @@ -0,0 +1,386 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#include "common.h" +#include "client_net_socket.h" +#include "debug.h" +#include "red_error_codes.h" +#include "utils.h" + +ClientNetSocket::ClientNetSocket(uint16_t id, const struct in_addr& dst_addr, uint16_t dst_port, + EventsLoop& events_loop, EventHandler& event_handler) + : _id (id) + , _local_addr (dst_addr) + , _local_port (dst_port) + , _peer (INVALID_SOCKET) + , _events_loop (events_loop) + , _event_handler (event_handler) + , _num_recv_tokens (0) + , _send_message (NULL) + , _send_pos (0) + , _status (SOCKET_STATUS_CLOSED) + , _fin_pending (false) + , _close_pending (false) +{ +} + +ClientNetSocket::~ClientNetSocket() +{ + close(); +} + +bool ClientNetSocket::connect(uint32_t recv_tokens) +{ + struct sockaddr_in addr; + int no_delay; + + + ASSERT(_peer == INVALID_SOCKET && _status == SOCKET_STATUS_CLOSED); + + addr.sin_port = _local_port; + addr.sin_addr.s_addr = _local_addr.s_addr; + addr.sin_family = AF_INET; + + if ((_peer = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) == INVALID_SOCKET) { + int err = sock_error(); + THROW("%s: failed to create socket: %s", __FUNCTION__, sock_err_message(err)); + } + + no_delay = 1; + if (setsockopt(_peer, IPPROTO_TCP, TCP_NODELAY, + (const char*)&no_delay, sizeof(no_delay)) == SOCKET_ERROR) { + LOG_WARN("set TCP_NODELAY failed"); + } + + LOG_INFO("connect to ip=%s port=%d (connection_id=%d)", + inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), _id); + + if (::connect(_peer, (struct sockaddr*)&addr, sizeof(sockaddr_in)) == SOCKET_ERROR) { + int err = sock_error(); + closesocket(_peer); + _peer = INVALID_SOCKET; + LOG_INFO("connect to ip=%s port=%d failed %s (connection_id=%d)", + inet_ntoa(addr.sin_addr), ntohs(addr.sin_port), sock_err_message(err), _id); + return false; + } + + _events_loop.add_socket(*this); + _status = SOCKET_STATUS_OPEN; + _num_recv_tokens = recv_tokens; + return true; +} + +void ClientNetSocket::push_disconnect() +{ + if ((_status == SOCKET_STATUS_CLOSED) || _close_pending) { + THROW("%s: disconnect attempt for disconnected socket %s %d", __FUNCTION__, + inet_ntoa(_local_addr), ntohs(_local_port)); + } + + _close_pending = true; + + if (!during_send()) { + close_and_tell(); + } +} + +void ClientNetSocket::push_fin() +{ + if ((_status == SOCKET_STATUS_OPEN) || (_status == SOCKET_STATUS_RECEIVED_FIN)) { + _fin_pending = true; + if (!during_send()) { + try { + apply_guest_fin(); + } catch (ClientNetSocket::ShutdownExcpetion&) { + close_and_tell(); + } + } + } else { + THROW("%s: unexpected fin connection_id=%d (status=%d)", __FUNCTION__, + _id, _status); + } +} + +void ClientNetSocket::add_recv_tokens(uint32_t num_tokens) +{ + if ((_status == SOCKET_STATUS_CLOSED) || _close_pending) { + THROW("%s: ack attempt for disconnected socket connection_id=%d", __FUNCTION__, + _id); + } + + _num_recv_tokens += num_tokens; + + // recv might have not been called because tokens weren't available + if (_num_recv_tokens && (_num_recv_tokens == num_tokens)) { + if (can_receive()) { + receive(); + } + } +} + +void ClientNetSocket::push_send(SendBuffer& buf) +{ + if (!can_send()) { + THROW("%s: unexpected send attempt for connection_id=%d (status = %d)", + __FUNCTION__, _id, _status); + } + + if (_fin_pending || _close_pending) { + THROW("%s: unexpected send attempt for connection_id=% - shutdown send pending", + __FUNCTION__, _id); + } + + _send_messages.push_back(buf.ref()); + send(); +} + +void ClientNetSocket::on_event() +{ + if (can_send()) { + send(); + } + + if (!during_send()) { + if (_close_pending) { + close_and_tell(); + } else if (_fin_pending) { + apply_guest_fin(); + } + } + + if (can_receive()) { + receive(); + } +} + +void ClientNetSocket::apply_guest_fin() +{ + if (_status == SOCKET_STATUS_OPEN) { + if (shutdown(_peer, SHUT_WR) == SOCKET_ERROR) { + int err = sock_error(); + LOG_INFO("shutdown in connection_id=%d failed %s", _id, sock_err_message(err)); + throw ClientNetSocket::ShutdownExcpetion(); + } + + _fin_pending = false; + _status = SOCKET_STATUS_SENT_FIN; + } else if (_status == SOCKET_STATUS_RECEIVED_FIN) { + close_and_tell(); + } +} + +void ClientNetSocket::handle_client_fin() +{ + if (_status == SOCKET_STATUS_OPEN) { + _status = SOCKET_STATUS_RECEIVED_FIN; + _event_handler.on_socket_fin_recv(*this); + } else if (_status == SOCKET_STATUS_SENT_FIN) { + close_and_tell(); + } +} + +inline bool ClientNetSocket::during_send() +{ + return ((!_send_messages.empty()) || _send_message); +} + +inline bool ClientNetSocket::can_send() +{ + return ((_status == SOCKET_STATUS_OPEN) || (_status == SOCKET_STATUS_RECEIVED_FIN)); +} + +inline bool ClientNetSocket::can_receive() +{ + return ((_status == SOCKET_STATUS_OPEN) || (_status == SOCKET_STATUS_SENT_FIN)); +} + +void ClientNetSocket::send_message_done() +{ + _send_message->unref(); + _send_message = NULL; + _send_pos = 0; + _event_handler.on_socket_message_send_done(*this); +} + +void ClientNetSocket::send() +{ + ASSERT(_peer != INVALID_SOCKET); + try { + if (_send_message) { + _send_pos += send_buf(_send_message->data() + _send_pos, + _send_message->size() - _send_pos); + + if (_send_pos != _send_message->size()) { + return; + } else { + send_message_done(); + } + } + + while (!_send_messages.empty()) { + _send_message = _send_messages.front(); + _send_messages.pop_front(); + _send_pos = send_buf(_send_message->data(), _send_message->size()); + if (_send_pos != _send_message->size()) { + return; + } else { + send_message_done(); + } + } + } catch (ClientNetSocket::SendException&) { + close_and_tell(); + } +} + +uint32_t ClientNetSocket::send_buf(const uint8_t* buf, uint32_t size) +{ + const uint8_t* pos = buf; + ASSERT(_peer != INVALID_SOCKET); + while (size) { + int now; + if ((now = ::send(_peer, (char*)pos, size, MSG_NOSIGNAL)) == SOCKET_ERROR) { + int err = sock_error(); + if (err == WOULDBLOCK_ERR) { + break; + } + + if (err == INTERRUPTED_ERR) { + continue; + } + + LOG_INFO("send in connection_id=%d failed %s", _id, sock_err_message(err)); + throw ClientNetSocket::SendException(); + } + size -= now; + pos += now; + } + return pos - buf; +} + +void ClientNetSocket::receive() +{ + ASSERT(_peer != INVALID_SOCKET); + bool shutdown; + while (_num_recv_tokens) { + ReceiveBuffer& rcv_buf = alloc_receive_buffer(); + uint32_t size; + + try { + size = receive_buf(rcv_buf.buf(), rcv_buf.buf_max_size(), shutdown); + } catch (ClientNetSocket::ReceiveException&) { + rcv_buf.release_buf(); + close_and_tell(); + return; + } + + if (size) { + rcv_buf.set_buf_size(size); + _num_recv_tokens--; + _event_handler.on_socket_message_recv_done(*this, rcv_buf); + } else { + rcv_buf.release_buf(); + } + + if (shutdown) { + handle_client_fin(); + return; + } + + if (size < rcv_buf.buf_max_size()) { + return; + } + } +} + +uint32_t ClientNetSocket::receive_buf(uint8_t* buf, uint32_t max_size, bool& shutdown) +{ + uint8_t* pos = buf; + ASSERT(_peer != INVALID_SOCKET); + + shutdown = false; + + while (max_size) { + int now; + if ((now = ::recv(_peer, (char*)pos, max_size, 0)) <= 0) { + if (now == 0) { + shutdown = true; + break; // a case where fin is received, but before that, there is a msg + } + + int err = sock_error(); + + if (err == WOULDBLOCK_ERR) { + break; + } + + if (err == INTERRUPTED_ERR) { + continue; + } + + LOG_INFO("receive in connection_id=%d failed errno=%s", _id, sock_err_message(err)); + throw ClientNetSocket::ReceiveException(); + } + max_size -= now; + pos += now; + } + + return (pos - buf); +} + +void ClientNetSocket::release_wait_send_messages() +{ + if (_send_message) { + _send_message->unref(); + _send_message = NULL; + _send_pos = 0; + } + + while (!_send_messages.empty()) { + _send_messages.front()->unref(); + _send_messages.pop_front(); + } +} + +void ClientNetSocket::close() +{ + release_wait_send_messages(); + apply_disconnect(); +} + +void ClientNetSocket::close_and_tell() +{ + close(); + _event_handler.on_socket_disconnect(*this); +} + +void ClientNetSocket::apply_disconnect() +{ + if (_peer != INVALID_SOCKET) { + _events_loop.remove_socket(*this); + closesocket(_peer); + _peer = INVALID_SOCKET; + LOG_INFO("closing connection_id=%d", _id); + } + _status = SOCKET_STATUS_CLOSED; + _close_pending = false; + _fin_pending = false; +} + diff --git a/client/client_net_socket.h b/client/client_net_socket.h new file mode 100644 index 00000000..8ca1cfcf --- /dev/null +++ b/client/client_net_socket.h @@ -0,0 +1,154 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + Author: + yhalperi@redhat.com +*/ + +#ifndef _H_CLIENT_NET_SOCKET +#define _H_CLIENT_NET_SOCKET + +#include "platform_utils.h" +#include "common.h" +#include "events_loop.h" + +/* intterface for connenctions inside client LAN */ + +typedef enum { + SOCKET_STATUS_OPEN, + SOCKET_STATUS_SENT_FIN, + SOCKET_STATUS_RECEIVED_FIN, + SOCKET_STATUS_CLOSED, +} SocketStatus; + +class ClientNetSocket: public EventsLoop::Socket { +public: + class ReceiveBuffer; + class SendBuffer; + + class EventHandler; + + class SendException {}; + class ReceiveException {}; + class ShutdownExcpetion {}; + + ClientNetSocket(uint16_t id, const struct in_addr& dst_addr, uint16_t dst_port, + EventsLoop& events_loop, ClientNetSocket::EventHandler& event_handler); + virtual ~ClientNetSocket(); + + bool connect(uint32_t recv_tokens); + void push_disconnect(); + void push_fin(); + void push_send(SendBuffer& buf); + void add_recv_tokens(uint32_t num_tokens); + + bool is_connected() {return _status != SOCKET_STATUS_CLOSED;} + + inline uint16_t id() {return _id;} + inline const struct in_addr& local_addr() {return _local_addr;} + inline uint16_t local_port() {return _local_port;} + + /* EventsLoop::Socket interface */ + void on_event(); + int get_socket() {return _peer;} + +protected: + virtual ReceiveBuffer& alloc_receive_buffer() = 0; + +private: + void send(); + void receive(); + + uint32_t send_buf(const uint8_t* buf, uint32_t size); + uint32_t receive_buf(uint8_t* buf, uint32_t max_size, bool& shutdown); + + bool can_receive(); + bool can_send(); + + void apply_disconnect(); + void apply_guest_fin(); + + void close(); + void close_and_tell(); + + void handle_client_fin(); + + bool during_send(); + void release_wait_send_messages(); + void clear(); + void send_message_done(); + +private: + uint16_t _id; + struct in_addr _local_addr; + uint16_t _local_port; + + SOCKET _peer; + + EventsLoop& _events_loop; + + EventHandler& _event_handler; + + uint32_t _num_recv_tokens; + + std::list<SendBuffer*> _send_messages; + SendBuffer* _send_message; + uint32_t _send_pos; + + SocketStatus _status; + bool _fin_pending; + bool _close_pending; +}; + +class ClientNetSocket::ReceiveBuffer { +public: + ReceiveBuffer() {} + + virtual uint8_t* buf() = 0; + virtual uint32_t buf_max_size() = 0; + virtual void set_buf_size(uint32_t size) = 0; + virtual void release_buf() = 0; + +protected: + virtual ~ReceiveBuffer() {} +}; + +class ClientNetSocket::SendBuffer { +public: + SendBuffer() {}; + + virtual const uint8_t* data() = 0; + virtual uint32_t size() = 0; + virtual ClientNetSocket::SendBuffer* ref() = 0; + virtual void unref() = 0; + +protected: + virtual ~SendBuffer() {} +}; + +class ClientNetSocket::EventHandler { +public: + EventHandler() {} + virtual ~EventHandler() {} + virtual void on_socket_message_recv_done(ClientNetSocket& sckt, ReceiveBuffer& buf) = 0; + virtual void on_socket_message_send_done(ClientNetSocket& sckt) = 0; + virtual void on_socket_disconnect(ClientNetSocket& sckt) = 0; + virtual void on_socket_fin_recv(ClientNetSocket& sckt) = 0; +}; + + + +#endif diff --git a/client/red_channel.cpp b/client/red_channel.cpp index 4c6f1f8f..a82d9f77 100644 --- a/client/red_channel.cpp +++ b/client/red_channel.cpp @@ -133,7 +133,7 @@ void RedChannelBase::link(uint32_t connection_id, const std::string& password) */ if (RSA_public_encrypt(password.length() + 1, (unsigned char *)password.c_str(), (uint8_t *)bufEncrypted.get(), - rsa, RSA_PKCS1_OAEP_PADDING) > 0 ) { + rsa, RSA_PKCS1_OAEP_PADDING) > 0) { send((uint8_t*)bufEncrypted.get(), nRSASize); } else { THROW("could not encrypt password"); @@ -425,8 +425,10 @@ void RedChannel::run() _outgoing_message = NULL; } _incomming_header_pos = 0; - delete _incomming_message; - _incomming_message = NULL; + if (_incomming_message) { + _incomming_message->unref(); + _incomming_message = NULL; + } case DISCONNECT_ACTION: close(); on_disconnect(); @@ -525,19 +527,19 @@ void RedChannel::recive_messages() _incomming_header_pos = n; return; } - std::auto_ptr<CompundInMessage> message(new CompundInMessage(_incomming_header.serial, - _incomming_header.type, - _incomming_header.size, - _incomming_header.sub_list)); - n = RedPeer::recive(message->data(), message->compund_size()); - if (n != message->compund_size()) { + AutoRef<CompundInMessage> message(new CompundInMessage(_incomming_header.serial, + _incomming_header.type, + _incomming_header.size, + _incomming_header.sub_list)); + n = RedPeer::recive((*message)->data(), (*message)->compund_size()); + if (n != (*message)->compund_size()) { _incomming_message = message.release(); _incomming_message_pos = n; return; } on_message_recived(); - _message_handler->handle_message(*message.get()); - on_message_complition(message->serial()); + _message_handler->handle_message(*(*message)); + on_message_complition((*message)->serial()); } } @@ -577,11 +579,11 @@ void RedChannel::on_event() if (_incomming_message_pos != _incomming_message->compund_size()) { return; } - std::auto_ptr<CompundInMessage> message(_incomming_message); + AutoRef<CompundInMessage> message(_incomming_message); _incomming_message = NULL; on_message_recived(); - _message_handler->handle_message(*message.get()); - on_message_complition(message->serial()); + _message_handler->handle_message(*(*message)); + on_message_complition((*message)->serial()); } recive_messages(); } @@ -616,18 +618,18 @@ void RedChannel::handle_migrate(RedPeer::InMessage* message) if (migrate->flags & RED_MIGRATE_NEED_FLUSH) { send_migrate_flush_mark(); } - std::auto_ptr<RedPeer::CompundInMessage> data_message; + AutoRef<CompundInMessage> data_message; if (migrate->flags & RED_MIGRATE_NEED_DATA_TRANSFER) { data_message.reset(recive()); } _client.migrate_channel(*this); if (migrate->flags & RED_MIGRATE_NEED_DATA_TRANSFER) { - if (data_message->type() != RED_MIGRATE_DATA) { + if ((*data_message)->type() != RED_MIGRATE_DATA) { THROW("expect RED_MIGRATE_DATA"); } std::auto_ptr<RedPeer::OutMessage> message(new RedPeer::OutMessage(REDC_MIGRATE_DATA, - data_message->size())); - memcpy(message->data(), data_message->data(), data_message->size()); + (*data_message)->size())); + memcpy(message->data(), (*data_message)->data(), (*data_message)->size()); send(*message); } _loop.add_socket(*this); diff --git a/client/red_peer.cpp b/client/red_peer.cpp index e20d5ca6..dad035d4 100644 --- a/client/red_peer.cpp +++ b/client/red_peer.cpp @@ -16,54 +16,12 @@ */ #include "common.h" -#ifdef _WIN32 -#include <winsock2.h> -#include <ws2tcpip.h> - -#define SHUT_RDWR SD_BOTH -#else -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <netinet/tcp.h> -#include <netdb.h> - -#define INVALID_SOCKET -1 -#define SOCKET_ERROR -1 -#define closesocket(sock) ::close(sock) -#endif #include "red.h" #include "red_peer.h" #include "utils.h" #include "debug.h" #include "platform_utils.h" -#ifdef _WIN32 - -int inet_aton(const char *ip, struct in_addr *in_addr) -{ - unsigned long addr = inet_addr(ip); - - if (addr == INADDR_NONE) { - return 0; - } - in_addr->S_un.S_addr = addr; - return 1; -} - -#define SHUTDOWN_ERR WSAESHUTDOWN -#define INTERRUPTED_ERR WSAEINTR -#define WOULDBLOCK_ERR WSAEWOULDBLOCK -#define sock_error() WSAGetLastError() -#define sock_err_message(err) sys_err_to_str(err) -#else -#define SHUTDOWN_ERR EPIPE -#define INTERRUPTED_ERR EINTR -#define WOULDBLOCK_ERR EAGAIN -#define sock_error() errno -#define sock_err_message(err) strerror(err) -#endif - static void ssl_error() { ERR_print_errors_fp(stderr); @@ -326,11 +284,11 @@ uint32_t RedPeer::recive(uint8_t *buf, uint32_t size) RedPeer::CompundInMessage* RedPeer::recive() { RedDataHeader header; - std::auto_ptr<CompundInMessage> message; + AutoRef<CompundInMessage> message; recive((uint8_t*)&header, sizeof(RedDataHeader)); message.reset(new CompundInMessage(header.serial, header.type, header.size, header.sub_list)); - recive(message->data(), message->compund_size()); + recive((*message)->data(), (*message)->compund_size()); return message.release(); } diff --git a/client/red_peer.h b/client/red_peer.h index f1db1814..eb0597d8 100644 --- a/client/red_peer.h +++ b/client/red_peer.h @@ -18,12 +18,6 @@ #ifndef _H_REDPEER #define _H_REDPEER -#ifdef _WIN32 -#include <winsock.h> -#else -typedef int SOCKET; -#endif - #include <openssl/ssl.h> #include <openssl/err.h> @@ -31,6 +25,7 @@ typedef int SOCKET; #include "red.h" #include "events_loop.h" #include "threads.h" +#include "platform_utils.h" class RedPeer: protected EventsLoop::Socket { public: @@ -116,7 +111,7 @@ private: class RedPeer::InMessage { public: - InMessage(uint16_t type, uint32_t size, uint8_t* data) + InMessage(uint16_t type, uint32_t size, uint8_t * data) : _type (type) , _size (size) , _data (data) @@ -139,12 +134,14 @@ class RedPeer::CompundInMessage: public RedPeer::InMessage { public: CompundInMessage(uint64_t _serial, uint16_t type, uint32_t size, uint32_t sub_list) : InMessage(type, size, new uint8_t[size]) + , _refs (1) , _serial (_serial) , _sub_list (sub_list) { } - virtual ~CompundInMessage() { delete[] _data;} + RedPeer::InMessage* ref() { _refs++; return this;} + void unref() {if (!--_refs) delete this;} uint64_t serial() { return _serial;} uint32_t sub_list() { return _sub_list;} @@ -152,7 +149,11 @@ public: virtual uint32_t size() { return _sub_list ? _sub_list : _size;} uint32_t compund_size() {return _size;} +protected: + virtual ~CompundInMessage() { delete[] _data;} + private: + int _refs; uint64_t _serial; uint32_t _sub_list; }; diff --git a/client/tunnel_channel.cpp b/client/tunnel_channel.cpp new file mode 100644 index 00000000..1d28ee48 --- /dev/null +++ b/client/tunnel_channel.cpp @@ -0,0 +1,792 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#include "common.h" +#include "tunnel_channel.h" +#include "red.h" + +#define SOCKET_WINDOW_SIZE 60 +#define SOCKET_TOKENS_TO_SEND 20 + +/* classes for tunneling msgs without reallocations and memcpy */ + +class InSocketMessage; +class OutSocketMessage; + +class InSocketMessage: public ClientNetSocket::SendBuffer { +public: + InSocketMessage(RedChannel::CompundInMessage& full_msg); + + const uint8_t* data(); + uint32_t size(); + ClientNetSocket::SendBuffer* ref(); + void unref(); + +protected: + virtual ~InSocketMessage() {} + +private: + int _refs; + RedChannel::CompundInMessage& _full_msg; + RedTunnelSocketData* _sckt_msg; + uint32_t _buf_size; +}; + +InSocketMessage::InSocketMessage(RedChannel::CompundInMessage& full_msg) + : _refs (1) + , _full_msg (full_msg) +{ + ASSERT(full_msg.type() == RED_TUNNEL_SOCKET_DATA); + _full_msg.ref(); + _sckt_msg = (RedTunnelSocketData*)(_full_msg.data()); + _buf_size = _full_msg.size() - sizeof(RedTunnelSocketData); +} + +const uint8_t* InSocketMessage::data() +{ + return _sckt_msg->data; +} + +uint32_t InSocketMessage::size() +{ + return _buf_size; +} + +ClientNetSocket::SendBuffer* InSocketMessage::ref() +{ + _full_msg.ref(); + _refs++; + return this; +} + +void InSocketMessage::unref() +{ + _full_msg.unref(); + if (!--_refs) { + delete this; + } +} + +class OutSocketMessage: public RedPeer::OutMessage, + public RedChannel::OutMessage, + public ClientNetSocket::ReceiveBuffer { +public: + + virtual RedPeer::OutMessage& peer_message() { return *this;} + virtual void release(); + + virtual uint8_t* buf(); + virtual uint32_t buf_max_size() {return _max_data_size;} + virtual void set_buf_size(uint32_t size); + virtual void release_buf(); + + static void init(uint32_t max_data_size); + static OutSocketMessage& alloc_message(); + static void clear_free_messages(); + +protected: + OutSocketMessage(); + virtual ~OutSocketMessage() {} + +private: + static std::list<OutSocketMessage*> _free_messages; + static uint32_t _max_data_size; +}; + +std::list<OutSocketMessage*> OutSocketMessage::_free_messages; +uint32_t OutSocketMessage::_max_data_size; + +OutSocketMessage::OutSocketMessage() + : RedPeer::OutMessage(REDC_TUNNEL_SOCKET_DATA, sizeof(RedcTunnelSocketData) + _max_data_size) + , RedChannel::OutMessage() + , ClientNetSocket::ReceiveBuffer() +{ +} + +uint8_t* OutSocketMessage::buf() +{ + return ((RedcTunnelSocketData*)RedPeer::OutMessage::data())->data; +} + +void OutSocketMessage::set_buf_size(uint32_t size) +{ + RedPeer::OutMessage::header().size = size + sizeof(RedcTunnelSocketData); +} + +void OutSocketMessage::release() +{ + OutSocketMessage::_free_messages.push_front(this); +} + +void OutSocketMessage::release_buf() +{ + release(); +} + +void OutSocketMessage::init(uint32_t max_data_size) +{ + _max_data_size = max_data_size; +} + +OutSocketMessage& OutSocketMessage::alloc_message() +{ + OutSocketMessage* ret; + if (!_free_messages.empty()) { + ret = _free_messages.front(); + _free_messages.pop_front(); + } else { + ret = new OutSocketMessage(); + } + + return *ret; +} + +void OutSocketMessage::clear_free_messages() +{ + while (!_free_messages.empty()) { + OutSocketMessage* message = _free_messages.front(); + _free_messages.pop_front(); + delete message; + } +} + +struct TunnelService { + uint32_t type; + uint32_t id; + uint32_t group; + struct in_addr ip; + uint32_t port; + std::string name; + std::string description; + + struct in_addr virtual_ip; +#ifdef TUNNEL_CONFIG + TunnelConfigConnectionIfc* service_src; +#endif +}; + +class TunnelChannel::TunnelSocket: public ClientNetSocket { +public: + TunnelSocket(uint16_t id, TunnelService & dst_service, EventsLoop & events_loop, + EventHandler & event_handler); + virtual ~TunnelSocket() {} + + void set_num_tokens(uint32_t tokens) {_num_tokens = tokens;} + void set_server_num_tokens(uint32_t tokens) {_server_num_tokens = tokens;} + void set_guest_closed() {_guest_closed = true;} + + uint32_t get_num_tokens() {return _num_tokens;} + uint32_t get_server_num_tokens() {return _server_num_tokens;} + bool get_guest_closed() {return _guest_closed;} + +protected: + virtual ReceiveBuffer& alloc_receive_buffer() {return OutSocketMessage::alloc_message();} + +private: + uint32_t _num_tokens; + uint32_t _server_num_tokens; + uint32_t _service_id; + bool _guest_closed; +}; + +TunnelChannel::TunnelSocket::TunnelSocket(uint16_t id, TunnelService& dst_service, + EventsLoop& events_loop, + ClientNetSocket::EventHandler& event_handler) + : ClientNetSocket(id, dst_service.ip, htons((uint16_t)dst_service.port), + events_loop, event_handler) + , _num_tokens (0) + , _server_num_tokens (0) + , _service_id (dst_service.id) + , _guest_closed (false) +{ +} + +class TunnelHandler: public MessageHandlerImp<TunnelChannel, RED_TUNNEL_MESSAGES_END> { +public: + TunnelHandler(TunnelChannel& channel) + : MessageHandlerImp<TunnelChannel, RED_TUNNEL_MESSAGES_END>(channel) {} +}; + +TunnelChannel::TunnelChannel(RedClient& client, uint32_t id) + : RedChannel(client, RED_CHANNEL_TUNNEL, id, new TunnelHandler(*this)) + , _max_socket_data_size(0) + , _service_id(0) + , _service_group(0) +{ + TunnelHandler* handler = static_cast<TunnelHandler*>(get_message_handler()); + + handler->set_handler(RED_MIGRATE, &TunnelChannel::handle_migrate, 0); + handler->set_handler(RED_SET_ACK, &TunnelChannel::handle_set_ack, sizeof(RedSetAck)); + handler->set_handler(RED_PING, &TunnelChannel::handle_ping, sizeof(RedPing)); + handler->set_handler(RED_WAIT_FOR_CHANNELS, &TunnelChannel::handle_wait_for_channels, + sizeof(RedWaitForChannels)); + + handler->set_handler(RED_TUNNEL_INIT, + &TunnelChannel::handle_init, sizeof(RedTunnelInit)); + handler->set_handler(RED_TUNNEL_SERVICE_IP_MAP, + &TunnelChannel::handle_service_ip_map, sizeof(RedTunnelServiceIpMap)); + handler->set_handler(RED_TUNNEL_SOCKET_OPEN, + &TunnelChannel::handle_socket_open, sizeof(RedTunnelSocketOpen)); + handler->set_handler(RED_TUNNEL_SOCKET_CLOSE, + &TunnelChannel::handle_socket_close, sizeof(RedTunnelSocketClose)); + handler->set_handler(RED_TUNNEL_SOCKET_FIN, + &TunnelChannel::handle_socket_fin, sizeof(RedTunnelSocketFin)); + handler->set_handler(RED_TUNNEL_SOCKET_TOKEN, + &TunnelChannel::handle_socket_token, sizeof(RedTunnelSocketTokens)); + handler->set_handler(RED_TUNNEL_SOCKET_CLOSED_ACK, + &TunnelChannel::handle_socket_closed_ack, + sizeof(RedTunnelSocketClosedAck)); + handler->set_handler(RED_TUNNEL_SOCKET_DATA, + &TunnelChannel::handle_socket_data, sizeof(RedTunnelSocketData)); +#ifdef TUNNEL_CONFIG + _config_listener = new TunnelConfigListenerIfc(*this); +#endif +} + +TunnelChannel::~TunnelChannel() +{ + destroy_sockets(); + OutSocketMessage::clear_free_messages(); +#ifdef TUNNEL_CONFIG + delete _config_listener; +#endif +} + +void TunnelChannel::handle_init(RedPeer::InMessage* message) +{ + RedTunnelInit* init_msg = (RedTunnelInit*)message->data(); + _max_socket_data_size = init_msg->max_socket_data_size; + OutSocketMessage::init(_max_socket_data_size); + _sockets.resize(init_msg->max_num_of_sockets); +} + +void TunnelChannel::send_service(TunnelService& service) +{ + int msg_size = 0; + msg_size += service.name.length() + 1; + msg_size += service.description.length() + 1; + + if (service.type == RED_TUNNEL_SERVICE_TYPE_IPP) { + msg_size += sizeof(RedcTunnelAddPrintService) + sizeof(RedTunnelIPv4); + } else if (service.type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + msg_size += sizeof(RedcTunnelAddGenericService); + } else { + THROW("%s: invalid service type", __FUNCTION__); + } + Message* service_msg = new Message(REDC_TUNNEL_SERVICE_ADD, msg_size); + RedcTunnelAddGenericService* out_service = (RedcTunnelAddGenericService*)service_msg->data(); + out_service->id = service.id; + out_service->group = service.group; + out_service->type = service.type; + out_service->port = service.port; + + int cur_offset; + if (service.type == RED_TUNNEL_SERVICE_TYPE_IPP) { + cur_offset = sizeof(RedcTunnelAddPrintService); + ((RedcTunnelAddPrintService*)out_service)->ip.type = RED_TUNNEL_IP_TYPE_IPv4; + memcpy(((RedcTunnelAddPrintService*)out_service)->ip.data, &(service.ip.s_addr), + sizeof(RedTunnelIPv4)); + cur_offset += sizeof(RedTunnelIPv4); + } else { + cur_offset = sizeof(RedcTunnelAddGenericService); + } + + out_service->name = cur_offset; + service.name.copy((char*)(service_msg->data() + cur_offset), service.name.length()); + (service_msg->data() + cur_offset)[service.name.length()] = '\0'; + cur_offset += service.name.length() + 1; + + out_service->description = cur_offset; + service.description.copy((char*)(service_msg->data() + cur_offset), + service.description.length()); + (service_msg->data() + cur_offset)[service.description.length()] = '\0'; + cur_offset += service.description.length() + 1; + + post_message(service_msg); +} + +void TunnelChannel::handle_service_ip_map(RedPeer::InMessage* message) +{ + RedTunnelServiceIpMap* service_ip_msg = (RedTunnelServiceIpMap*)message->data(); + TunnelService* service = find_service(service_ip_msg->service_id); + if (!service) { + THROW("%s: attempt to map non-existing service id=%d", __FUNCTION__, + service_ip_msg->service_id); + } + + if (service_ip_msg->virtual_ip.type == RED_TUNNEL_IP_TYPE_IPv4) { + memcpy(&service->virtual_ip.s_addr, service_ip_msg->virtual_ip.data, + sizeof(RedTunnelIPv4)); + } else { + THROW("unexpected ip type %d", service_ip_msg->virtual_ip.type); + } + DBG(0, "service_id=%d (%s), virtual_ip=%s", service->id, service->name.c_str(), + inet_ntoa(service->virtual_ip)); +#ifdef TUNNEL_CONFIG + service->service_src->send_virtual_ip(service->virtual_ip); +#endif +} + +void TunnelChannel::handle_socket_open(RedPeer::InMessage* message) +{ + RedTunnelSocketOpen* open_msg = (RedTunnelSocketOpen*)message->data(); + TunnelSocket* sckt; + Message* out_msg; + + if (_sockets[open_msg->connection_id]) { + THROW("%s: attempt to open an already opened connection id=%d", __FUNCTION__, + open_msg->connection_id); + } + + TunnelService* service = find_service(open_msg->service_id); + if (!service) { + THROW("%s: attempt to access non-existing service id=%d", __FUNCTION__, + open_msg->service_id); + } + + sckt = new TunnelSocket(open_msg->connection_id, *service, get_events_loop(), *this); + + if (sckt->connect(open_msg->tokens)) { + _sockets[open_msg->connection_id] = sckt; + out_msg = new Message(REDC_TUNNEL_SOCKET_OPEN_ACK, sizeof(RedcTunnelSocketOpenAck)); + sckt->set_num_tokens(0); + sckt->set_server_num_tokens(SOCKET_WINDOW_SIZE); + + ((RedcTunnelSocketOpenAck*)out_msg->data())->connection_id = open_msg->connection_id; + ((RedcTunnelSocketOpenAck*)out_msg->data())->tokens = SOCKET_WINDOW_SIZE; + } else { + out_msg = new Message(REDC_TUNNEL_SOCKET_OPEN_NACK, sizeof(RedcTunnelSocketOpenNack)); + ((RedcTunnelSocketOpenNack*)out_msg->data())->connection_id = open_msg->connection_id; + delete sckt; + } + + post_message(out_msg); +} + +void TunnelChannel::handle_socket_fin(RedPeer::InMessage* message) +{ + RedTunnelSocketFin* fin_msg = (RedTunnelSocketFin*)message->data(); + TunnelSocket* sckt = _sockets[fin_msg->connection_id]; + + if (!sckt) { + THROW("%s: fin connection that doesn't exist id=%d", __FUNCTION__, fin_msg->connection_id); + } + + DBG(0, "guest fin connection_id=%d", fin_msg->connection_id); + if (sckt->is_connected()) { + sckt->push_fin(); + } +} + +void TunnelChannel::handle_socket_close(RedPeer::InMessage* message) +{ + RedTunnelSocketClose* close_msg = (RedTunnelSocketClose*)message->data(); + TunnelSocket* sckt = _sockets[close_msg->connection_id]; + + if (!sckt) { + THROW("%s: closing connection that doesn't exist id=%d", __FUNCTION__, + close_msg->connection_id); + } + DBG(0, "guest closed connection_id=%d", close_msg->connection_id); + + sckt->set_guest_closed(); + + if (sckt->is_connected()) { + sckt->push_disconnect(); + } else { + // close happend in the server side before it received the client + // close msg. we should ack the server and free the socket + on_socket_disconnect(*sckt); + } +} + +void TunnelChannel::handle_socket_closed_ack(RedPeer::InMessage* message) +{ + RedTunnelSocketClosedAck* close_ack_msg = (RedTunnelSocketClosedAck*)message->data(); + TunnelSocket* sckt = _sockets[close_ack_msg->connection_id]; + if (!sckt) { + THROW("%s: close ack to connection that doesn't exist id=%d", __FUNCTION__, + close_ack_msg->connection_id); + } + + if (sckt->is_connected()) { + THROW("%s: close ack to connection that is not closed id=%d", + __FUNCTION__, close_ack_msg->connection_id); + } + _sockets[sckt->id()] = NULL; + DBG(0, "guest Acked closed connection_id=%d", close_ack_msg->connection_id); + delete sckt; +} + +void TunnelChannel::handle_socket_data(RedPeer::InMessage* message) +{ + RedTunnelSocketData* send_msg = (RedTunnelSocketData*)message->data(); + TunnelSocket* sckt = _sockets[send_msg->connection_id]; + + if (!sckt) { + THROW("%s: sending data to connection that doesn't exist id=%d", __FUNCTION__, + send_msg->connection_id); + } + + if (!sckt->get_server_num_tokens()) { + THROW("%s: token violation connectio_id=%d", __FUNCTION__, sckt->id()); + } + + sckt->set_server_num_tokens(sckt->get_server_num_tokens() - 1); + + if (!sckt->is_connected()) { + // server hasn't handled the close msg yet + return; + } + + InSocketMessage* sckt_msg = new InSocketMessage(*( + static_cast<RedChannel::CompundInMessage*>(message))); + if (sckt_msg->size() > _max_socket_data_size) { + THROW("%s: socket data exceeds size limit %d > %d connection_id=%d", __FUNCTION__, + sckt_msg->size(), _max_socket_data_size, sckt->id()); + } + sckt->push_send(*sckt_msg); + sckt_msg->unref(); +} + +void TunnelChannel::handle_socket_token(RedPeer::InMessage* message) +{ + RedTunnelSocketTokens* token_msg = (RedTunnelSocketTokens*)message->data(); + TunnelSocket* sckt = _sockets[token_msg->connection_id]; + + if (!sckt) { + THROW("%s: ack connection that doesn't exist id=%d", __FUNCTION__, + token_msg->connection_id); + } + if (!sckt->is_connected()) { + return; + } + sckt->add_recv_tokens(token_msg->num_tokens); +} + +void TunnelChannel::on_socket_message_recv_done(ClientNetSocket& sckt, + ClientNetSocket::ReceiveBuffer& buf) +{ + TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt); + OutSocketMessage* out_msg = static_cast<OutSocketMessage*>(&buf); + + ((RedcTunnelSocketData*)(out_msg->data()))->connection_id = tunnel_sckt->id(); + post_message(out_msg); +} + +void TunnelChannel::on_socket_fin_recv(ClientNetSocket& sckt) +{ + TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt); + Message* out_msg = new Message(REDC_TUNNEL_SOCKET_FIN, sizeof(RedcTunnelSocketFin)); + DBG(0, "FIN from client coonection id=%d", tunnel_sckt->id()); + ((RedcTunnelSocketFin*)out_msg->data())->connection_id = tunnel_sckt->id(); + post_message(out_msg); +} + +void TunnelChannel::on_socket_disconnect(ClientNetSocket& sckt) +{ + TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt); + Message* out_msg; + // close intiated by server -> needs ack + if (tunnel_sckt->get_guest_closed()) { + DBG(0, "send close ack connection_id=%d", tunnel_sckt->id()); + out_msg = new Message(REDC_TUNNEL_SOCKET_CLOSED_ACK, sizeof(RedcTunnelSocketClosedAck)); + ((RedcTunnelSocketClosedAck*)out_msg->data())->connection_id = tunnel_sckt->id(); + _sockets[tunnel_sckt->id()] = NULL; + delete &sckt; + } else { // close initiated by client + DBG(0, "send close coonection_id=%d", tunnel_sckt->id()); + out_msg = new Message(REDC_TUNNEL_SOCKET_CLOSED, sizeof(RedcTunnelSocketClosed)); + ((RedcTunnelSocketClosed*)out_msg->data())->connection_id = tunnel_sckt->id(); + } + + post_message(out_msg); +} + +void TunnelChannel::on_socket_message_send_done(ClientNetSocket& sckt) +{ + TunnelChannel::TunnelSocket* tunnel_sckt = static_cast<TunnelChannel::TunnelSocket*>(&sckt); + uint32_t num_tokens = tunnel_sckt->get_num_tokens(); + num_tokens++; + + if (num_tokens == SOCKET_TOKENS_TO_SEND) { + Message* out_msg = new Message(REDC_TUNNEL_SOCKET_TOKEN, sizeof(RedcTunnelSocketTokens)); + RedcTunnelSocketTokens* tokens_msg = (RedcTunnelSocketTokens*)out_msg->data(); + tokens_msg->connection_id = tunnel_sckt->id(); + tokens_msg->num_tokens = num_tokens; + post_message(out_msg); + + tunnel_sckt->set_num_tokens(0); + tunnel_sckt->set_server_num_tokens(tunnel_sckt->get_server_num_tokens() + num_tokens); + + ASSERT(tunnel_sckt->get_server_num_tokens() <= SOCKET_WINDOW_SIZE); + } else { + tunnel_sckt->set_num_tokens(num_tokens); + } +} + +TunnelService* TunnelChannel::find_service(uint32_t id) +{ + for (std::list<TunnelService*>::iterator iter = _services.begin(); + iter != _services.end(); iter++) { + if ((*iter)->id == id) { + return *iter; + } + } + return NULL; +} + +/* returns the first service with the same ip */ +TunnelService* TunnelChannel::find_service(struct in_addr& ip) +{ + for (std::list<TunnelService*>::iterator iter = _services.begin(); + iter != _services.end(); iter++) { + if ((*iter)->ip.s_addr == ip.s_addr) { + return *iter; + } + } + return NULL; +} + +TunnelService* TunnelChannel::find_service(struct in_addr& ip, uint32_t port) +{ + for (std::list<TunnelService*>::iterator iter = _services.begin(); + iter != _services.end(); iter++) { + if (((*iter)->ip.s_addr == ip.s_addr) && ((*iter)->port == port)) { + return *iter; + } + } + return NULL; +} + +void TunnelChannel::destroy_sockets() +{ + for (unsigned int i = 0; i < _sockets.size(); i++) { + if (_sockets[i]) { + delete _sockets[i]; + _sockets[i] = NULL; + } + } +} + +void TunnelChannel::on_disconnect() +{ + destroy_sockets(); + OutSocketMessage::clear_free_messages(); +} + +#ifdef TUNNEL_CONFIG +void TunnelChannel::add_service(TunnelConfigConnectionIfc& source, + uint32_t type, struct in_addr& ip, uint32_t port, + std::string& name, std::string& description) +{ + if (find_service(ip, port)) { + LOG_WARN("service ip=%s port=%d was already added", + inet_ntoa(ip), port); + return; + } + TunnelService* new_service = new TunnelService; + TunnelService* service_group = find_service(ip); + new_service->type = type; + new_service->id = _service_id++; + if (service_group) { + if (name != service_group->name) { + LOG_WARN("service ip=%s port=%d was not added because of inconsistent name for ip", + inet_ntoa(ip), port); + delete new_service; + return; + } + new_service->group = service_group->group; + } else { + new_service->group = _service_group++; + } + new_service->ip.s_addr = ip.s_addr; + new_service->port = port; + new_service->name = name; + new_service->description = description; + new_service->service_src = &source; + _services.push_back(new_service); + send_service(*new_service); +} + +#endif + +class TunnelFactory: public ChannelFactory { +public: + TunnelFactory() : ChannelFactory(RED_CHANNEL_TUNNEL) {} + virtual RedChannel* construct(RedClient& client, uint32_t id) + { + return new TunnelChannel(client, id); + } +}; + +static TunnelFactory factory; + +ChannelFactory& TunnelChannel::Factory() +{ + return factory; +} + +#ifdef TUNNEL_CONFIG +TunnelConfigListenerIfc::TunnelConfigListenerIfc(TunnelChannel& tunnel) + : _tunnel(tunnel) +{ + _listener_ref = NamedPipe::create(TUNNEL_CONFIG_PIPE_NAME, *this); +} + +TunnelConfigListenerIfc::~TunnelConfigListenerIfc() +{ + for (std::list<TunnelConfigConnectionIfc*>::iterator it = _connections.begin(); + it != _connections.end(); ++it) { + if ((*it)->get_ref() != NamedPipe::INVALID_CONNECTION) { + NamedPipe::destroy_connection((*it)->get_ref()); + } + delete (*it); + } + + NamedPipe::destroy(_listener_ref); +} + +NamedPipe::ConnectionInterface& TunnelConfigListenerIfc::create() +{ + DBG(0, "new_connection"); + TunnelConfigConnectionIfc* new_conn = new TunnelConfigConnectionIfc(_tunnel, *this); + _connections.push_back(new_conn); + return *new_conn; +} + +void TunnelConfigListenerIfc::destroy_connection(TunnelConfigConnectionIfc* conn) +{ + if (conn->get_ref() != NamedPipe::INVALID_CONNECTION) { + NamedPipe::destroy_connection(conn->get_ref()); + } + _connections.remove(conn); + delete conn; +} + +TunnelConfigConnectionIfc::TunnelConfigConnectionIfc(TunnelChannel& tunnel, + TunnelConfigListenerIfc& listener) + : _tunnel(tunnel) + , _listener(listener) + , _in_msg_len(0) + , _out_msg("") + , _out_msg_pos(0) +{ +} + +void TunnelConfigConnectionIfc::bind(NamedPipe::ConnectionRef conn_ref) +{ + _opaque = conn_ref; + on_data(); +} + +void TunnelConfigConnectionIfc::on_data() +{ + if (!_out_msg.empty()) { + int ret = NamedPipe::write(_opaque, (uint8_t*)_out_msg.c_str() + _out_msg_pos, + _out_msg.length() - _out_msg_pos); + if (ret == -1) { + _listener.destroy_connection(this); + return; + } + _out_msg_pos += ret; + if (_out_msg_pos == _out_msg.length()) { + _out_msg = ""; + _out_msg_pos = 0; + } + } else { + int ret = NamedPipe::read(_opaque, (uint8_t*)_in_msg + _in_msg_len, + TUNNEL_CONFIG_MAX_MSG_LEN - _in_msg_len); + + if (ret == -1) { + _listener.destroy_connection(this); + return; + } + _in_msg_len += ret; + + if (_in_msg[_in_msg_len - 1] != '\n') { + return; + } + handle_msg(); + _in_msg_len = 0; + } +} + +void TunnelConfigConnectionIfc::send_virtual_ip(struct in_addr& ip) +{ + _out_msg = inet_ntoa(ip); + _out_msg += "\n"; + _out_msg_pos = 0; + on_data(); +} + +void TunnelConfigConnectionIfc::handle_msg() +{ + std::string space = " \t"; + _in_msg[_in_msg_len - 1] = '\0'; + std::string msg(_in_msg); + + uint32_t service_type; + struct in_addr ip; + uint32_t port; + std::string name; + std::string desc; + + DBG(0, "msg=%s", _in_msg); + size_t start_token = 0; + size_t end_token; + + start_token = msg.find_first_not_of(space); + end_token = msg.find_first_of(space, start_token); + + if ((end_token - start_token) != 1) { + THROW("unexpected service type length"); + } + if (msg[start_token] == '0') { + service_type = RED_TUNNEL_SERVICE_TYPE_GENERIC; + } else if (msg[start_token] == '1') { + service_type = RED_TUNNEL_SERVICE_TYPE_IPP; + } else { + THROW("unexpected service type"); + } + + start_token = msg.find_first_not_of(space, end_token); + end_token = msg.find_first_of(space, start_token); + + inet_aton(msg.substr(start_token, end_token - start_token).c_str(), &ip); + + start_token = msg.find_first_not_of(space, end_token); + end_token = msg.find_first_of(space, start_token); + + port = atoi(msg.substr(start_token, end_token - start_token).c_str()); + + start_token = msg.find_first_not_of(space, end_token); + end_token = msg.find_first_of(space, start_token); + + name = msg.substr(start_token, end_token - start_token); + + start_token = msg.find_first_not_of(space, end_token); + desc = msg.substr(start_token); + + _tunnel.add_service(*this, service_type, ip, port, name, desc); +} + +#endif diff --git a/client/tunnel_channel.h b/client/tunnel_channel.h new file mode 100644 index 00000000..4fd3465c --- /dev/null +++ b/client/tunnel_channel.h @@ -0,0 +1,138 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#ifndef _H_TUNNEL_CHANNEL +#define _H_TUNNEL_CHANNEL + +#include "common.h" +#include "red_channel.h" +#include "red_client.h" +#include "client_net_socket.h" +#include "platform.h" + +#define TUNNEL_CONFIG + +#ifdef TUNNEL_CONFIG +class TunnelConfigConnectionIfc; +class TunnelConfigListenerIfc; +#endif + +/* channel for tunneling tcp from guest to client network */ +typedef struct TunnelService TunnelService; +class TunnelChannel: public RedChannel, + public ClientNetSocket::EventHandler { +public: + + TunnelChannel(RedClient& client, uint32_t id); + virtual ~TunnelChannel(); + + virtual void on_socket_message_recv_done(ClientNetSocket& sckt, + ClientNetSocket::ReceiveBuffer& buf); + virtual void on_socket_message_send_done(ClientNetSocket& sckt); + virtual void on_socket_fin_recv(ClientNetSocket& sckt); + virtual void on_socket_disconnect(ClientNetSocket& sckt); + +#ifdef TUNNEL_CONFIG + void add_service(TunnelConfigConnectionIfc& source, + uint32_t type, struct in_addr& ip, uint32_t port, + std::string& name, std::string& description); +#endif + static ChannelFactory& Factory(); + +protected: + class TunnelSocket; + + virtual void on_disconnect(); + +private: + void handle_init(RedPeer::InMessage* message); + void handle_service_ip_map(RedPeer::InMessage* message); + + void handle_socket_open(RedPeer::InMessage* message); + void handle_socket_fin(RedPeer::InMessage* message); + void handle_socket_close(RedPeer::InMessage* message); + void handle_socket_closed_ack(RedPeer::InMessage* message); + void handle_socket_data(RedPeer::InMessage* message); + void handle_socket_token(RedPeer::InMessage* message); + + TunnelService* find_service(uint32_t id); + TunnelService* find_service(struct in_addr& ip); + TunnelService* find_service(struct in_addr& ip, uint32_t port); + + void send_service(TunnelService& service); + void destroy_sockets(); + +private: + std::vector<TunnelSocket*> _sockets; + std::list<TunnelService*> _services; + uint32_t _max_socket_data_size; + uint32_t _service_id; + uint32_t _service_group; +#ifdef TUNNEL_CONFIG + TunnelConfigListenerIfc* _config_listener; +#endif +}; + +#ifdef TUNNEL_CONFIG +#ifdef _WIN32 +#define TUNNEL_CONFIG_PIPE_NAME "tunnel-config.pipe" +#else +#define TUNNEL_CONFIG_PIPE_NAME "/tmp/tunnel-config.pipe" +#endif + +class TunnelConfigConnectionIfc; + +class TunnelConfigListenerIfc: public NamedPipe::ListenerInterface { +public: + TunnelConfigListenerIfc(TunnelChannel& tunnel); + virtual ~TunnelConfigListenerIfc(); + virtual NamedPipe::ConnectionInterface& create(); + virtual void destroy_connection(TunnelConfigConnectionIfc* conn); + +private: + TunnelChannel& _tunnel; + NamedPipe::ListenerRef _listener_ref; + std::list<TunnelConfigConnectionIfc*> _connections; +}; + +#define TUNNEL_CONFIG_MAX_MSG_LEN 2048 +class TunnelConfigConnectionIfc: public NamedPipe::ConnectionInterface { +public: + TunnelConfigConnectionIfc(TunnelChannel& tunnel, + TunnelConfigListenerIfc& listener); + virtual void bind(NamedPipe::ConnectionRef conn_ref); + virtual void on_data(); + void send_virtual_ip(struct in_addr& ip); + NamedPipe::ConnectionRef get_ref() {return _opaque;} + void handle_msg(); + +private: + TunnelChannel& _tunnel; + TunnelConfigListenerIfc& _listener; + char _in_msg[TUNNEL_CONFIG_MAX_MSG_LEN]; // <service_type> <ip> <port> <name> <desc>\n + int _in_msg_len; + + std::string _out_msg; // <virtual ip>\n + int _out_msg_pos; +}; +#endif + +#endif diff --git a/client/windows/platform_utils.cpp b/client/windows/platform_utils.cpp index 6b9049a0..9b809c87 100644 --- a/client/windows/platform_utils.cpp +++ b/client/windows/platform_utils.cpp @@ -90,7 +90,7 @@ HBITMAP get_alpha_bitmap_res(int id) AutoDC auto_dc(create_compatible_dc()); BITMAPINFO dest_info; - uint8_t *dest; + uint8_t* dest; dest_info.bmiHeader.biSize = sizeof(dest_info.bmiHeader); dest_info.bmiHeader.biWidth = src_info.bmWidth; dest_info.bmiHeader.biHeight = -src_info.bmHeight; @@ -102,7 +102,7 @@ HBITMAP get_alpha_bitmap_res(int id) dest_info.bmiHeader.biClrUsed = 0; dest_info.bmiHeader.biClrImportant = 0; - HBITMAP ret = CreateDIBSection(auto_dc.get(), &dest_info, 0, (VOID **)&dest, NULL, 0); + HBITMAP ret = CreateDIBSection(auto_dc.get(), &dest_info, 0, (VOID**)&dest, NULL, 0); if (!ret) { THROW("create bitmap failed, %u", GetLastError()); } @@ -139,7 +139,7 @@ const char* sys_err_to_str(int error) msg = new char[BUF_SIZE]; _snprintf(msg, BUF_SIZE, "errno %d", error); } else { - char *new_line; + char* new_line; if ((new_line = strrchr(msg, '\r'))) { *new_line = 0; } @@ -149,3 +149,14 @@ const char* sys_err_to_str(int error) return errors_map[error]; } +int inet_aton(const char* ip, struct in_addr* in_addr) +{ + unsigned long addr = inet_addr(ip); + + if (addr == INADDR_NONE) { + return 0; + } + in_addr->S_un.S_addr = addr; + return 1; +} + diff --git a/client/windows/platform_utils.h b/client/windows/platform_utils.h index 95b04fcb..299f70b1 100644 --- a/client/windows/platform_utils.h +++ b/client/windows/platform_utils.h @@ -18,13 +18,17 @@ #ifndef _H_PLATFORM_UTILS #define _H_PLATFORM_UTILS +#include <winsock.h> +#include <winsock2.h> +#include <ws2tcpip.h> + #define mb() __asm {lock add [esp], 0} template<class T, class FreeRes = FreeObject<T>, T invalid = NULL > class AutoRes { public: - AutoRes() : res (invalid) {} - AutoRes(T inRes) : res (inRes) {} + AutoRes() : res(invalid) {} + AutoRes(T inRes) : res(inRes) {} ~AutoRes() { set(invalid); } void set(T inRes) {if (res != invalid) free_res(res); res = inRes; } @@ -67,7 +71,7 @@ HBITMAP get_alpha_bitmap_res(int id); class WindowDC { public: - WindowDC(HWND window) : _window (window), _dc (GetDC(window)) {} + WindowDC(HWND window): _window (window), _dc (GetDC(window)) {} ~WindowDC() { ReleaseDC(_window, _dc);} HDC operator * () { return _dc;} @@ -80,5 +84,17 @@ typedef AutoRes<HDC, Delete_DC> AutoReleaseDC; const char* sys_err_to_str(int error); +#define SHUT_WR SD_SEND +#define SHUT_RD SD_RECEIVE +#define SHUT_RDWR SD_BOTH +#define MSG_NOSIGNAL 0 + +#define SHUTDOWN_ERR WSAESHUTDOWN +#define INTERRUPTED_ERR WSAEINTR +#define WOULDBLOCK_ERR WSAEWOULDBLOCK +#define sock_error() WSAGetLastError() +#define sock_err_message(err) sys_err_to_str(err) +int inet_aton(const char* ip, struct in_addr* in_addr); + #endif diff --git a/client/windows/redc.vcproj b/client/windows/redc.vcproj index 25ff3259..90d235ca 100644 --- a/client/windows/redc.vcproj +++ b/client/windows/redc.vcproj @@ -205,6 +205,10 @@ >
</File>
<File
+ RelativePath="..\client_net_socket.cpp"
+ >
+ </File>
+ <File
RelativePath="..\cmd_line_parser.cpp"
>
</File>
@@ -379,6 +383,10 @@ >
</File>
<File
+ RelativePath="..\tunnel_channel.cpp"
+ >
+ </File>
+ <File
RelativePath="..\utils.cpp"
>
</File>
@@ -405,6 +413,10 @@ >
</File>
<File
+ RelativePath="..\client_net_socket.h"
+ >
+ </File>
+ <File
RelativePath="..\common.h"
>
</File>
@@ -557,6 +569,10 @@ >
</File>
<File
+ RelativePath="..\tunnel_channel.h"
+ >
+ </File>
+ <File
RelativePath="..\utils.h"
>
</File>
diff --git a/client/x11/Makefile.am b/client/x11/Makefile.am index 2bb1c853..02ee8717 100644 --- a/client/x11/Makefile.am +++ b/client/x11/Makefile.am @@ -39,6 +39,8 @@ RED_COMMON_SRCS = \ $(top_srcdir)/client/red_cairo_canvas.h \ $(top_srcdir)/client/cmd_line_parser.cpp \ $(top_srcdir)/client/cmd_line_parser.h \ + $(top_srcdir)/client/client_net_socket.cpp \ + $(top_srcdir)/client/client_net_socket.h \ $(top_srcdir)/client/common.h \ $(top_srcdir)/client/cursor_channel.cpp \ $(top_srcdir)/client/cursor_channel.h \ @@ -94,9 +96,11 @@ RED_COMMON_SRCS = \ $(top_srcdir)/client/hot_keys.cpp \ $(top_srcdir)/client/hot_keys.h \ $(top_srcdir)/client/threads.cpp \ + $(top_srcdir)/client/tunnel_channel.cpp \ + $(top_srcdir)/client/tunnel_channel.h \ $(top_srcdir)/client/utils.cpp \ $(top_srcdir)/client/utils.h \ - $(top_srcdir)/client/icon.h \ + $(top_srcdir)/client/icon.h \ $(NULL) bin_PROGRAMS = spicec diff --git a/client/x11/platform_utils.h b/client/x11/platform_utils.h index 763a70e5..1e86f72e 100644 --- a/client/x11/platform_utils.h +++ b/client/x11/platform_utils.h @@ -18,11 +18,28 @@ #ifndef _H_PLATFORM_UTILS #define _H_PLATFORM_UTILS +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <netdb.h> + #ifdef __i386__ -#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%esp)": : : "memory") +#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%esp)" : : : "memory") #else -#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%rsp)": : : "memory") +#define mb() __asm__ __volatile__ ("lock; addl $0,0(%%rsp)" : : : "memory") #endif +typedef int SOCKET; + +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 +#define closesocket(sock) ::close(sock) +#define SHUTDOWN_ERR EPIPE +#define INTERRUPTED_ERR EINTR +#define WOULDBLOCK_ERR EAGAIN +#define sock_error() errno +#define sock_err_message(err) strerror(err) + #endif diff --git a/common/red.h b/common/red.h index 4b0a3642..dfd85126 100644 --- a/common/red.h +++ b/common/red.h @@ -45,8 +45,8 @@ #endif #define RED_MAGIC (*(uint32_t*)"REDQ") -#define RED_VERSION_MAJOR 1 -#define RED_VERSION_MINOR 0 +#define RED_VERSION_MAJOR (~(uint32_t)0 - 1) +#define RED_VERSION_MINOR 1 // Encryption & Ticketing Parameters #define RED_MAX_PASSWORD_LENGTH 60 @@ -60,6 +60,7 @@ enum { RED_CHANNEL_CURSOR, RED_CHANNEL_PLAYBACK, RED_CHANNEL_RECORD, + RED_CHANNEL_TUNNEL, RED_CHANNEL_END }; @@ -675,6 +676,143 @@ typedef struct ATTR_PACKED RedcRecordStartMark { uint32_t time; } RedcRecordStartMark; +enum { + RED_TUNNEL_SERVICE_TYPE_INVALID, + RED_TUNNEL_SERVICE_TYPE_GENERIC, + RED_TUNNEL_SERVICE_TYPE_IPP, +}; + +enum { + RED_TUNNEL_INIT = RED_FIRST_AVAIL_MESSAGE, + RED_TUNNEL_SERVICE_IP_MAP, + RED_TUNNEL_SOCKET_OPEN, + RED_TUNNEL_SOCKET_FIN, + RED_TUNNEL_SOCKET_CLOSE, + RED_TUNNEL_SOCKET_DATA, + RED_TUNNEL_SOCKET_CLOSED_ACK, + RED_TUNNEL_SOCKET_TOKEN, + + RED_TUNNEL_MESSAGES_END, +}; + +typedef struct ATTR_PACKED RedTunnelInit { + uint16_t max_num_of_sockets; + uint32_t max_socket_data_size; +} RedTunnelInit; + +enum { + RED_TUNNEL_IP_TYPE_INVALID, + RED_TUNNEL_IP_TYPE_IPv4, +}; + +typedef struct ATTR_PACKED RedTunnelIpInfo { + uint16_t type; + uint8_t data[0]; +} RedTunnelIpInfo; + +typedef uint8_t RedTunnelIPv4[4]; + +typedef struct ATTR_PACKED RedTunnelServiceIpMap { + uint32_t service_id; + RedTunnelIpInfo virtual_ip; +} RedTunnelServiceIpMap; + +typedef struct ATTR_PACKED RedTunnelSocketOpen { + uint16_t connection_id; + uint32_t service_id; + uint32_t tokens; +} RedTunnelSocketOpen; + +/* connection id must be the first field in msgs directed to a specific connection */ + +typedef struct ATTR_PACKED RedTunnelSocketFin { + uint16_t connection_id; +} RedTunnelSocketFin; + +typedef struct ATTR_PACKED RedTunnelSocketClose { + uint16_t connection_id; +} RedTunnelSocketClose; + +typedef struct ATTR_PACKED RedTunnelSocketData { + uint16_t connection_id; + uint8_t data[0]; +} RedTunnelSocketData; + +typedef struct ATTR_PACKED RedTunnelSocketTokens { + uint16_t connection_id; + uint32_t num_tokens; +} RedTunnelSocketTokens; + +typedef struct ATTR_PACKED RedTunnelSocketClosedAck { + uint16_t connection_id; +} RedTunnelSocketClosedAck; + +enum { + REDC_TUNNEL_SERVICE_ADD = REDC_FIRST_AVAIL_MESSAGE, + REDC_TUNNEL_SERVICE_REMOVE, + REDC_TUNNEL_SOCKET_OPEN_ACK, + REDC_TUNNEL_SOCKET_OPEN_NACK, + REDC_TUNNEL_SOCKET_FIN, + REDC_TUNNEL_SOCKET_CLOSED, + REDC_TUNNEL_SOCKET_CLOSED_ACK, + REDC_TUNNEL_SOCKET_DATA, + + REDC_TUNNEL_SOCKET_TOKEN, + + REDC_TUNNEL_MESSGES_END, +}; + +typedef struct ATTR_PACKED RedcTunnelAddGenericService { + uint32_t type; + uint32_t id; + uint32_t group; + uint32_t port; + uint32_t name; + uint32_t description; +} RedcTunnelAddGenericService; + +typedef struct ATTR_PACKED RedcTunnelAddPrintService { + RedcTunnelAddGenericService base; + RedTunnelIpInfo ip; +} RedcTunnelAddPrintService; + +typedef struct ATTR_PACKED RedcTunnelRemoveService { + uint32_t id; +} RedcTunnelRemoveService; + +/* connection id must be the first field in msgs directed to a specific connection */ + +typedef struct ATTR_PACKED RedcTunnelSocketOpenAck { + uint16_t connection_id; + uint32_t tokens; +} RedcTunnelSocketOpenAck; + +typedef struct ATTR_PACKED RedcTunnelSocketOpenNack { + uint16_t connection_id; +} RedcTunnelSocketOpenNack; + +typedef struct ATTR_PACKED RedcTunnelSocketData { + uint16_t connection_id; + uint8_t data[0]; +} RedcTunnelSocketData; + +typedef struct ATTR_PACKED RedcTunnelSocketFin { + uint16_t connection_id; +} RedcTunnelSocketFin; + +typedef struct ATTR_PACKED RedcTunnelSocketClosed { + uint16_t connection_id; +} RedcTunnelSocketClosed; + +typedef struct ATTR_PACKED RedcTunnelSocketClosedAck { + uint16_t connection_id; +} RedcTunnelSocketClosedAck; + +typedef struct ATTR_PACKED RedcTunnelSocketTokens { + uint16_t connection_id; + uint32_t num_tokens; +} RedcTunnelSocketTokens; + #undef ATTR_PACKED #ifndef __GNUC__ diff --git a/configure.ac b/configure.ac index 12799195..9810b822 100644 --- a/configure.ac +++ b/configure.ac @@ -105,6 +105,11 @@ AC_SUBST(LOG4CPP_CFLAGS) AC_SUBST(LOG4CPP_LIBS) SPICE_REQUIRES+=" log4cpp" +PKG_CHECK_MODULES(SLIRP, slirp) +AC_SUBST(SLIRP_CFLAGS) +AC_SUBST(SLIRP_LIBS) +SPICE_REQUIRES+=" slirp" + PKG_CHECK_MODULES(QCAIRO, qcairo >= 1.4.6) AC_SUBST(QCAIRO_CFLAGS) AC_SUBST(QCAIRO_LIBS) diff --git a/server/Makefile.am b/server/Makefile.am index e6ffab40..f9909615 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -9,6 +9,7 @@ INCLUDES = \ $(LOG4CPP_CFLAGS) \ $(SSL_CFLAGS) \ $(CELT051_CFLAGS) \ + $(SLIRP_CFLAGS) \ -DCAIRO_CANVAS_IMAGE_CACHE \ -DRED_STATISTICS \ $(WARN_CFLAGS) \ @@ -40,6 +41,7 @@ libspice_la_LIBADD = \ $(QCAIRO_LIBS) \ $(SSL_LIBS) \ $(CELT051_LIBS) \ + $(SLIRP_LIBS) \ $(LIBRT) \ $(NULL) @@ -64,6 +66,10 @@ libspice_la_SOURCES = \ red_yuv.h \ snd_worker.c \ snd_worker.h \ + red_channel.h \ + red_channel.c \ + red_tunnel_worker.c \ + red_tunnel_worker.h \ spice.h \ vd_interface.h \ $(COMMON_SRCS) \ diff --git a/server/red_channel.c b/server/red_channel.c new file mode 100644 index 00000000..48ace448 --- /dev/null +++ b/server/red_channel.c @@ -0,0 +1,520 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#include <stdio.h> +#include <stdint.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> +#include "red_channel.h" + +static void red_channel_receive(void *data); +static void red_channel_push(RedChannel *channel); +static void red_channel_opaque_push(void *data); +static PipeItem *red_channel_pipe_get(RedChannel *channel); +static void red_channel_pipe_clear(RedChannel *channel); + +/* return the number of bytes read. -1 in case of error */ +static int red_peer_receive(RedsStreamContext *peer, uint8_t *buf, uint32_t size) +{ + uint8_t *pos = buf; + while (size) { + int now; + if (peer->shutdown) { + return -1; + } + if ((now = peer->cb_read(peer->ctx, pos, size)) <= 0) { + if (now == 0) { + return -1; + } + ASSERT(now == -1); + if (errno == EAGAIN) { + break; + } else if (errno == EINTR) { + continue; + } else if (errno == EPIPE) { + return -1; + } else { + red_printf("%s", strerror(errno)); + return -1; + } + } else { + size -= now; + pos += now; + } + } + return pos - buf; +} + +static void red_peer_handle_incoming(RedsStreamContext *peer, IncomingHandler *handler) +{ + int bytes_read; + + for (;;) { + int ret_handle; + if (handler->header_pos < sizeof(RedDataHeader)) { + bytes_read = red_peer_receive(peer, + ((uint8_t *)&handler->header) + handler->header_pos, + sizeof(RedDataHeader) - handler->header_pos); + if (bytes_read == -1) { + handler->on_error(handler->opaque); + return; + } + handler->header_pos += bytes_read; + + if (handler->header_pos != sizeof(RedDataHeader)) { + return; + } + } + + if (handler->msg_pos < handler->header.size) { + if (!handler->msg) { + handler->msg = handler->alloc_msg_buf(handler->opaque, &handler->header); + } + + bytes_read = red_peer_receive(peer, + handler->msg + handler->msg_pos, + handler->header.size - handler->msg_pos); + if (bytes_read == -1) { + handler->release_msg_buf(handler->opaque, &handler->header, handler->msg); + handler->on_error(handler->opaque); + return; + } + handler->msg_pos += bytes_read; + if (handler->msg_pos != handler->header.size) { + return; + } + } + + ret_handle = handler->handle_message(handler->opaque, &handler->header, + handler->msg); + handler->msg_pos = 0; + handler->msg = NULL; + handler->header_pos = 0; + + if (!ret_handle) { + handler->on_error(handler->opaque); + return; + } + } +} + +static struct iovec *__iovec_skip(struct iovec vec[], int skip, int *vec_size) +{ + struct iovec *now = vec; + + while ((skip) && (skip >= now->iov_len)) { + skip -= now->iov_len; + --*vec_size; + now++; + } + + now->iov_base = (uint8_t *)now->iov_base + skip; + now->iov_len -= skip; + return now; +} + +static void red_peer_handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler) +{ + int n; + if (handler->size == 0) { + handler->vec = handler->vec_buf; + handler->size = handler->get_msg_size(handler->opaque); + if (!handler->size) { // nothing to be sent + return; + } + handler->prepare(handler->opaque, handler->vec, &handler->vec_size); + } + for (;;) { + if ((n = peer->cb_writev(peer->ctx, handler->vec, handler->vec_size)) == -1) { + switch (errno) { + case EAGAIN: + handler->on_block(handler->opaque); + return; + case EINTR: + continue; + case EPIPE: + handler->on_error(handler->opaque); + return; + default: + red_printf("%s", strerror(errno)); + handler->on_error(handler->opaque); + return; + } + } else { + handler->pos += n; + handler->vec = __iovec_skip(handler->vec, n, &handler->vec_size); + if (!handler->vec_size) { + if (handler->pos == handler->size) { // finished writing data + handler->on_msg_done(handler->opaque); + handler->vec = handler->vec_buf; + handler->pos = 0; + handler->size = 0; + return; + } else { + // There wasn't enough place for all the outgoing data in one iovec array. + // Filling the rest of the data. + handler->vec = handler->vec_buf; + handler->prepare(handler->opaque, handler->vec, &handler->vec_size); + } + } + } + } +} + +static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size); + + +static void red_channel_peer_on_error(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + channel->disconnect(channel); +} + +static int red_channel_peer_get_out_msg_size(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + return channel->send_data.size; +} + +static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec, int *vec_size) +{ + RedChannel *channel = (RedChannel *)opaque; + red_channel_fill_iovec(channel, vec, vec_size); +} + +static void red_channel_peer_on_out_block(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + channel->send_data.blocked = TRUE; + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, red_channel_opaque_push, + channel); +} + +static void red_channel_peer_on_out_msg_done(void *opaque) +{ + RedChannel *channel = (RedChannel *)opaque; + channel->send_data.size = 0; + channel->send_data.n_bufs = 0; + channel->send_data.not_sent_buf_head = 0; + if (channel->send_data.item) { + channel->release_item(channel, channel->send_data.item, TRUE); + channel->send_data.item = NULL; + } + if (channel->send_data.blocked) { + channel->send_data.blocked = FALSE; + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, NULL, + channel); + } +} + +RedChannel *red_channel_create(int size, RedsStreamContext *peer, CoreInterface *core, + int migrate, int handle_acks, + channel_configure_socket_proc config_socket, + channel_disconnect_proc disconnect, + channel_handle_message_proc handle_message, + channel_alloc_msg_recv_buf_proc alloc_recv_buf, + channel_release_msg_recv_buf_proc release_recv_buf, + channel_send_pipe_item_proc send_item, + channel_release_pipe_item_proc release_item) +{ + RedChannel *channel; + + ASSERT(size >= sizeof(*channel)); + ASSERT(config_socket && disconnect && handle_message && alloc_recv_buf && + release_item); + if (!(channel = malloc(size))) { + red_printf("malloc failed"); + goto error1; + } + memset(channel, 0, size); + + channel->handle_acks = handle_acks; + channel->disconnect = disconnect; + channel->send_item = send_item; + channel->release_item = release_item; + + channel->peer = peer; + channel->core = core; + channel->ack_data.messages_window = ~0; // blocks send message (maybe use send_data.blocked + + // block flags) + channel->ack_data.client_generation = ~0; + + channel->migrate = migrate; + ring_init(&channel->pipe); + + channel->incoming.opaque = channel; + channel->incoming.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf; + channel->incoming.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf; + channel->incoming.handle_message = (handle_message_proc)handle_message; + channel->incoming.on_error = red_channel_peer_on_error; + + channel->outgoing.opaque = channel; + channel->outgoing.pos = 0; + channel->outgoing.size = 0; + + channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size; + channel->outgoing.prepare = red_channel_peer_prepare_out_msg; + channel->outgoing.on_block = red_channel_peer_on_out_block; + channel->outgoing.on_error = red_channel_peer_on_error; + channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done; + + if (!config_socket(channel)) { + goto error2; + } + + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, NULL, channel); + + return channel; + +error2: + free(channel); +error1: + peer->cb_free(peer); + + return NULL; +} + +void red_channel_destroy(RedChannel *channel) +{ + if (!channel) { + return; + } + red_channel_pipe_clear(channel); + channel->core->set_file_handlers(channel->core, channel->peer->socket, + NULL, NULL, NULL); + channel->peer->cb_free(channel->peer); + free(channel); +} + +void red_channel_shutdown(RedChannel *channel) +{ + red_printf(""); + if (!channel->peer->shutdown) { + channel->core->set_file_handlers(channel->core, channel->peer->socket, + red_channel_receive, NULL, channel); + red_channel_pipe_clear(channel); + shutdown(channel->peer->socket, SHUT_RDWR); + channel->peer->shutdown = TRUE; + } +} + +void red_channel_init_outgoing_messages_window(RedChannel *channel) +{ + channel->ack_data.messages_window = 0; + red_channel_push(channel); +} + +int red_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg) +{ + switch (header->type) { + case REDC_ACK_SYNC: + if (header->size != sizeof(uint32_t)) { + red_printf("bad message size"); + return FALSE; + } + channel->ack_data.client_generation = *(uint32_t *)(msg); + break; + case REDC_ACK: + if (channel->ack_data.client_generation == channel->ack_data.generation) { + channel->ack_data.messages_window -= CLIENT_ACK_WINDOW; + red_channel_push(channel); + } + break; + default: + red_printf("invalid message type %u", header->type); + return FALSE; + } + return TRUE; +} + +static void red_channel_receive(void *data) +{ + RedChannel *channel = (RedChannel *)data; + red_peer_handle_incoming(channel->peer, &channel->incoming); +} + +static void inline __red_channel_add_buf(RedChannel *channel, void *data, uint32_t size) +{ + int pos = channel->send_data.n_bufs++; + ASSERT(pos < MAX_SEND_BUFS); + channel->send_data.bufs[pos].size = size; + channel->send_data.bufs[pos].data = data; +} + +void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size) +{ + __red_channel_add_buf(channel, data, size); + channel->send_data.header.size += size; +} + +void red_channel_reset_send_data(RedChannel *channel) +{ + channel->send_data.n_bufs = 0; + channel->send_data.header.size = 0; + channel->send_data.header.sub_list = 0; + ++channel->send_data.header.serial; + __red_channel_add_buf(channel, (void *)&channel->send_data.header, sizeof(RedDataHeader)); +} + +void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item) +{ + channel->send_data.header.type = msg_type; + channel->send_data.item = item; +} + +static inline void red_channel_fill_iovec(RedChannel *channel, struct iovec *vec, int *vec_size) +{ + BufDescriptor *buf = channel->send_data.bufs + channel->send_data.not_sent_buf_head; + ASSERT(channel->send_data.not_sent_buf_head < channel->send_data.n_bufs); + *vec_size = 0; + do { + vec[*vec_size].iov_base = buf->data; + vec[*vec_size].iov_len = buf->size; + (*vec_size)++; + buf++; + channel->send_data.not_sent_buf_head++; + } while (((*vec_size) < MAX_SEND_VEC) && + (channel->send_data.not_sent_buf_head != channel->send_data.n_bufs)); +} + +static void red_channel_send(RedChannel *channel) +{ + red_peer_handle_outgoing(channel->peer, &channel->outgoing); +} + +void red_channel_begin_send_massage(RedChannel *channel) +{ + channel->send_data.size = channel->send_data.header.size + sizeof(RedDataHeader); + channel->ack_data.messages_window++; + red_channel_send(channel); +} + +static void red_channel_push(RedChannel *channel) +{ + PipeItem *pipe_item; + + if (!channel->during_send) { + channel->during_send = TRUE; + } else { + return; + } + + if (channel->send_data.blocked) { + red_channel_send(channel); + } + + while ((pipe_item = red_channel_pipe_get(channel))) { + channel->send_item(channel, pipe_item); + } + channel->during_send = FALSE; +} + +static void red_channel_opaque_push(void *data) +{ + red_channel_push((RedChannel *)data); +} + +uint64_t red_channel_get_message_serial(RedChannel *channel) +{ + return channel->send_data.header.serial; +} + +void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type) +{ + ring_item_init(&item->link); + item->type = type; +} + +void red_channel_pipe_add(RedChannel *channel, PipeItem *item) +{ + ASSERT(channel); + + channel->pipe_size++; + ring_add(&channel->pipe, &item->link); + + red_channel_push(channel); +} + +int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item) +{ + return ring_item_is_linked(&item->link); +} + +void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item) +{ + ring_remove(&item->link); +} + +void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item) +{ + ASSERT(channel); + channel->pipe_size++; + ring_add_before(&item->link, &channel->pipe); + + red_channel_push(channel); +} + +void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type) +{ + PipeItem *item = malloc(sizeof(*item)); + if (!item) { + red_error("malloc failed"); + } + red_channel_pipe_item_init(channel, item, pipe_item_type); + red_channel_pipe_add(channel, item); + + red_channel_push(channel); +} + +static PipeItem *red_channel_pipe_get(RedChannel *channel) +{ + PipeItem *item; + + if (!channel || channel->send_data.blocked || + (channel->handle_acks && (channel->ack_data.messages_window > CLIENT_ACK_WINDOW * 2)) || + !(item = (PipeItem *)ring_get_tail(&channel->pipe))) { + return NULL; + } + + --channel->pipe_size; + ring_remove(&item->link); + return item; +} + +static void red_channel_pipe_clear(RedChannel *channel) +{ + PipeItem *item; + if (channel->send_data.item) { + channel->release_item(channel, channel->send_data.item, TRUE); + } + + while ((item = (PipeItem *)ring_get_head(&channel->pipe))) { + ring_remove(&item->link); + channel->release_item(channel, item, FALSE); + } +} + diff --git a/server/red_channel.h b/server/red_channel.h new file mode 100644 index 00000000..1096ba70 --- /dev/null +++ b/server/red_channel.h @@ -0,0 +1,182 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#ifndef _H_RED_CHANNEL +#define _H_RED_CHANNEL + +#include "red_common.h" +#include "reds.h" +#include "vd_interface.h" +#include "ring.h" + +#define MAX_SEND_BUFS 1000 +#define MAX_SEND_VEC 50 +#define CLIENT_ACK_WINDOW 20 + +/* Basic interface for channels, without using the RedChannel interface. + The intention is to move towards one channel interface gradually. + At the final stage, this interface shouldn't be exposed. Only RedChannel will use it. */ + +typedef int (*handle_message_proc)(void *opaque, + RedDataHeader *header, uint8_t *msg); +typedef uint8_t *(*alloc_msg_recv_buf_proc)(void *opaque, RedDataHeader *msg_header); +typedef void (*release_msg_recv_buf_proc)(void *opaque, + RedDataHeader *msg_header, uint8_t *msg); +typedef void (*on_incoming_error_proc)(void *opaque); + +typedef struct IncomingHandler { + void *opaque; + RedDataHeader header; + uint32_t header_pos; + uint8_t *msg; // data of the msg following the header. allocated by alloc_msg_buf. + uint32_t msg_pos; + handle_message_proc handle_message; + alloc_msg_recv_buf_proc alloc_msg_buf; + on_incoming_error_proc on_error; // recv error or handle_message error + release_msg_recv_buf_proc release_msg_buf; // for errors +} IncomingHandler; + +typedef int (*get_outgoing_msg_size_proc)(void *opaque); +typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size); +typedef void (*on_outgoing_error_proc)(void *opaque); +typedef void (*on_outgoing_block_proc)(void *opaque); +typedef void (*on_outgoing_msg_done_proc)(void *opaque); +typedef struct OutgoingHandler { + void *opaque; + struct iovec vec_buf[MAX_SEND_VEC]; + int vec_size; + struct iovec *vec; + int pos; + int size; + get_outgoing_msg_size_proc get_msg_size; + prepare_outgoing_proc prepare; + on_outgoing_error_proc on_error; + on_outgoing_block_proc on_block; + on_outgoing_msg_done_proc on_msg_done; +} OutgoingHandler; + +/* Red Channel interface */ + +typedef struct BufDescriptor { + uint32_t size; + uint8_t *data; +} BufDescriptor; + +typedef struct PipeItem { + RingItem link; + int type; +} PipeItem; + +typedef struct RedChannel RedChannel; + +typedef uint8_t *(*channel_alloc_msg_recv_buf_proc)(RedChannel *channel, + RedDataHeader *msg_header); +typedef int (*channel_handle_message_proc)(RedChannel *channel, + RedDataHeader *header, uint8_t *msg); +typedef void (*channel_release_msg_recv_buf_proc)(RedChannel *channel, + RedDataHeader *msg_header, uint8_t *msg); +typedef void (*channel_disconnect_proc)(RedChannel *channel); +typedef int (*channel_configure_socket_proc)(RedChannel *channel); +typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item); +typedef void (*channel_release_pipe_item_proc)(RedChannel *channel, + PipeItem *item, int item_pushed); + +struct RedChannel { + RedsStreamContext *peer; + CoreInterface *core; + int migrate; + int handle_acks; + + struct { + uint32_t generation; + uint32_t client_generation; + uint32_t messages_window; + } ack_data; + + Ring pipe; + uint32_t pipe_size; + + struct { + RedDataHeader header; + union { + RedSetAck ack; + RedMigrate migrate; + } u; + uint32_t n_bufs; + BufDescriptor bufs[MAX_SEND_BUFS]; + uint32_t size; + uint32_t not_sent_buf_head; + + PipeItem *item; + int blocked; + } send_data; + + OutgoingHandler outgoing; + IncomingHandler incoming; + + channel_disconnect_proc disconnect; + channel_send_pipe_item_proc send_item; + channel_release_pipe_item_proc release_item; + + int during_send; +}; + +/* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't + explicitly destroy the channel */ +RedChannel *red_channel_create(int size, RedsStreamContext *peer, CoreInterface *core, + int migrate, int handle_acks, + channel_configure_socket_proc config_socket, + channel_disconnect_proc disconnect, + channel_handle_message_proc handle_message, + channel_alloc_msg_recv_buf_proc alloc_recv_buf, + channel_release_msg_recv_buf_proc release_recv_buf, + channel_send_pipe_item_proc send_item, + channel_release_pipe_item_proc release_item); + +void red_channel_destroy(RedChannel *channel); + +void red_channel_shutdown(RedChannel *channel); +/* should be called when a new channel is ready to send messages */ +void red_channel_init_outgoing_messages_window(RedChannel *channel); + +/* handles general channel msgs from the client */ +int red_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg); + +/* when preparing send_data: should call reset, then init and then add_buf per buffer that is + being sent */ +void red_channel_reset_send_data(RedChannel *channel); +void red_channel_init_send_data(RedChannel *channel, uint16_t msg_type, PipeItem *item); +void red_channel_add_buf(RedChannel *channel, void *data, uint32_t size); + +uint64_t red_channel_get_message_serial(RedChannel *channel); + +/* when sending a msg. should first call red_channel_begin_send_massage */ +void red_channel_begin_send_massage(RedChannel *channel); + +void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type); +void red_channel_pipe_add(RedChannel *channel, PipeItem *item); +int red_channel_pipe_item_is_linked(RedChannel *channel, PipeItem *item); +void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item); +void red_channel_pipe_add_tail(RedChannel *channel, PipeItem *item); +/* for types that use this routine -> the pipe item should be freed */ +void red_channel_pipe_add_type(RedChannel *channel, int pipe_item_type); + +#endif diff --git a/server/red_tunnel_worker.c b/server/red_tunnel_worker.c new file mode 100644 index 00000000..e4cb2172 --- /dev/null +++ b/server/red_tunnel_worker.c @@ -0,0 +1,3510 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#include <stdio.h> +#include <stdint.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <fcntl.h> +#include <unistd.h> +#include <errno.h> +#include "red_tunnel_worker.h" +#include "red_common.h" +#include "red.h" +#include "reds.h" +#include "net_slirp.h" +#include "red_channel.h" + + +//#define DEBUG_NETWORK + +#ifdef DEBUG_NETWORK +#define PRINT_SCKT(sckt) red_printf("TUNNEL_DBG SOCKET(connection_id=%d port=%d, service=%d)",\ + sckt->connection_id, ntohs(sckt->local_port), \ + sckt->far_service->id) +#endif + +#define MAX_SOCKETS_NUM 20 + +#define MAX_SOCKET_DATA_SIZE (1024 * 2) + +#define SOCKET_WINDOW_SIZE 80 +#define SOCKET_TOKENS_TO_SEND 20 +#define SOCKET_TOKENS_TO_SEND_FOR_PROCESS 5 // sent in case the all the tokens were used by + // the client but they weren't consumed by slirp + // due to missing data for processing them and + // turning them into 'ready chunks' + +/* the number of buffer might exceed the window size when the analysis of the buffers in the + process queue need more data in order to be able to move them to the ready queue */ +#define MAX_SOCKET_IN_BUFFERS (int)(SOCKET_WINDOW_SIZE * 1.5) +#define MAX_SOCKET_OUT_BUFFERS (int)(SOCKET_WINDOW_SIZE * 1.5) + +#define CONTROL_MSG_RECV_BUF_SIZE 1024 + +typedef struct TunnelWorker TunnelWorker; + +enum { + PIPE_ITEM_TYPE_SET_ACK, + PIPE_ITEM_TYPE_MIGRATE, + PIPE_ITEM_TYPE_MIGRATE_DATA, + PIPE_ITEM_TYPE_TUNNEL_INIT, + PIPE_ITEM_TYPE_SERVICE_IP_MAP, + PIPE_ITEM_TYPE_SOCKET_OPEN, + PIPE_ITEM_TYPE_SOCKET_FIN, + PIPE_ITEM_TYPE_SOCKET_CLOSE, + PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK, + PIPE_ITEM_TYPE_SOCKET_DATA, + PIPE_ITEM_TYPE_SOCKET_TOKEN, +}; + +typedef struct RawTunneledBuffer RawTunneledBuffer; +typedef void (*release_tunneled_buffer_proc_t)(RawTunneledBuffer *buf); + +struct RawTunneledBuffer { + uint8_t *data; + int size; + int max_size; + int refs; + RawTunneledBuffer *next; + void *usr_opaque; + release_tunneled_buffer_proc_t release_proc; +}; + +static inline RawTunneledBuffer *tunneled_buffer_ref(RawTunneledBuffer *buf) +{ + buf->refs++; + return buf; +} + +static inline void tunneled_buffer_unref(RawTunneledBuffer *buf) +{ + if (!(--buf->refs)) { + buf->release_proc(buf); + } +} + +typedef struct RedSocket RedSocket; + +/* data received from the quest through slirp */ +typedef struct RedSocketRawSndBuf { + RawTunneledBuffer base; + uint8_t buf[MAX_SOCKET_DATA_SIZE]; +} RedSocketRawSndBuf; + +/* data received from the client */ +typedef struct RedSocketRawRcvBuf { + RawTunneledBuffer base; + uint8_t buf[MAX_SOCKET_DATA_SIZE + sizeof(RedcTunnelSocketData)]; + RedcTunnelSocketData *msg_info; +} RedSocketRawRcvBuf; + +typedef struct ReadyTunneledChunk ReadyTunneledChunk; + +enum { + READY_TUNNELED_CHUNK_TYPE_ORIG, + READY_TUNNELED_CHUNK_TYPE_SUB, // substitution +}; + + +/* A chunk of data from a RawTunneledBuffer (or a substitution for a part of it) + that was processed and is ready to be consumed (by slirp or by the client). + Each chunk has a reference to the RawTunneledBuffer it + was originated from. When all the reference chunks of one buffer are consumed (i.e. they are out + of the ready queue and they unrefed the buffer), the buffer is released */ +struct ReadyTunneledChunk { + uint32_t type; + RawTunneledBuffer *origin; + uint8_t *data; // if type == READY_TUNNELED_CHUNK_TYPE_ORIG, it points + // directly to the tunneled data. Otherwise, it is a + // newly allocated chunk of data + // that should be freed after its consumption. + int size; + ReadyTunneledChunk *next; +}; + +typedef struct ReadyTunneledChunkQueue { + ReadyTunneledChunk *head; + ReadyTunneledChunk *tail; + uint32_t offset; // first byte in the ready queue that wasn't consumed +} ReadyTunneledChunkQueue; + +static void ready_queue_add_orig_chunk(ReadyTunneledChunkQueue *queue, RawTunneledBuffer *origin, + uint8_t *data, int size); +static void ready_queue_pop_chunk(ReadyTunneledChunkQueue *queue); + + +enum { + PROCESS_DIRECTION_TYPE_REQUEST, // guest request + PROCESS_DIRECTION_TYPE_REPLY, // reply from the service in the client LAN +}; + +typedef struct TunneledBufferProcessQueue TunneledBufferProcessQueue; + +typedef RawTunneledBuffer *(*alloc_tunneled_buffer_proc_t)(TunneledBufferProcessQueue *queue); +/* processing the data. Notice that the buffers can be empty of + * data (see RedSocketRestoreTokensBuf) */ +typedef void (*analyze_new_data_proc_t)(TunneledBufferProcessQueue *queue, + RawTunneledBuffer *start_buf, int offset, int len); + +// migrating specific queue data (not the buffers themselves) +typedef int (*get_migrate_data_proc_t)(TunneledBufferProcessQueue *queue, void **migrate_data); +typedef void (*release_migrate_data_proc_t)(TunneledBufferProcessQueue *queue, void *migrate_data); +typedef void (*restore_proc_t)(TunneledBufferProcessQueue *queue, uint8_t *migrate_data); + +struct TunneledBufferProcessQueue { + uint32_t service_type; // which kind of processing is performed. + uint32_t direction; // reply/request + RawTunneledBuffer *head; + RawTunneledBuffer *tail; + int head_offset; + + ReadyTunneledChunkQueue *ready_chunks_queue; // the queue to push the post-process data to + + void *usr_opaque; + + alloc_tunneled_buffer_proc_t alloc_buf_proc; // for appending data to the queue + analyze_new_data_proc_t analysis_proc; // service dependent. should create the + // post-process chunks and remove buffers + // from the queue. + get_migrate_data_proc_t get_migrate_data_proc; + release_migrate_data_proc_t release_migrate_data_proc; + restore_proc_t restore_proc; +}; + +/* push and append routines are the ones that call to the analysis_proc */ +static void process_queue_push(TunneledBufferProcessQueue *queue, RawTunneledBuffer *buf); +static void process_queue_append(TunneledBufferProcessQueue *queue, uint8_t *data, size_t size); +static void process_queue_pop(TunneledBufferProcessQueue *queue); + +static void process_queue_clear(TunneledBufferProcessQueue *queue); + + +typedef struct RedSocketOutData { + // Note that this pipe items can appear only once in the pipe + PipeItem status_pipe_item; + PipeItem data_pipe_item; + PipeItem token_pipe_item; + + TunneledBufferProcessQueue *process_queue; // service type dependent + ReadyTunneledChunkQueue ready_chunks_queue; + ReadyTunneledChunk *push_tail; // last chunk in the ready queue that was pushed + uint32_t push_tail_size; // the subset of the push_tail that was sent + + uint32_t num_buffers; // total count of buffers in process_queue + references from ready queue + uint32_t data_size; // total size of data that is waiting to be sent. + + uint32_t num_tokens; + uint32_t window_size; +} RedSocketOutData; + +typedef struct RedSocketInData { + TunneledBufferProcessQueue *process_queue; // service type dependent + ReadyTunneledChunkQueue ready_chunks_queue; + + uint32_t num_buffers; + + int32_t num_tokens; // No. tokens conusmed by slirp since the last token msg sent to the + // client. can be negative if we loaned some to the client (when the + // ready queue is empty) + uint32_t client_total_num_tokens; +} RedSocketInData; + +typedef enum { + SLIRP_SCKT_STATUS_OPEN, + SLIRP_SCKT_STATUS_SHUTDOWN_SEND, // FIN was issued from guest + SLIRP_SCKT_STATUS_SHUTDOWN_RECV, // Triggered when FIN is received from client + SLIRP_SCKT_STATUS_DELAY_ABORT, // when out buffers overflow, we wait for client to + // close before we close slirp socket. see + //tunnel_socket_force_close + SLIRP_SCKT_STATUS_WAIT_CLOSE, // when shutdown_send was called after shut_recv + // and vice versa + SLIRP_SCKT_STATUS_CLOSED, +} SlirpSocketStatus; + +typedef enum { + CLIENT_SCKT_STATUS_WAIT_OPEN, + CLIENT_SCKT_STATUS_OPEN, + CLIENT_SCKT_STATUS_SHUTDOWN_SEND, // FIN was issued from client + CLIENT_SCKT_STATUS_CLOSED, +} ClientSocketStatus; + +typedef struct TunnelService TunnelService; +struct RedSocket { + int allocated; + + TunnelWorker *worker; + + uint16_t connection_id; + + uint16_t local_port; + TunnelService *far_service; + + ClientSocketStatus client_status; + SlirpSocketStatus slirp_status; + + int pushed_close; + int client_waits_close_ack; + + SlirpSocket *slirp_sckt; + + RedSocketOutData out_data; + RedSocketInData in_data; + + int in_slirp_send; + + uint32_t mig_client_status_msg; // the last status change msg that was received from + //the client during migration, and thus was unhandled. + // It is 0 if the status didn't change during migration + uint32_t mig_open_ack_tokens; // if REDC_TUNNEL_SOCKET_OPEN_ACK was received during + // migration, we store the tokens we received in the + // msg. +}; + +/********** managing send buffers ***********/ +static RawTunneledBuffer *tunnel_socket_alloc_snd_buf(RedSocket *sckt); +static inline RedSocketRawSndBuf *__tunnel_worker_alloc_socket_snd_buf(TunnelWorker *worker); +static RawTunneledBuffer *process_queue_alloc_snd_tunneled_buffer( + TunneledBufferProcessQueue *queue); + +static void tunnel_socket_free_snd_buf(RedSocket *sckt, RedSocketRawSndBuf *snd_buf); +static inline void __tunnel_worker_free_socket_snd_buf(TunnelWorker *worker, + RedSocketRawSndBuf *snd_buf); +static void snd_tunnled_buffer_release(RawTunneledBuffer *buf); + +/********** managing recv buffers ***********/ +// receive buffers are allocated before we know to which socket they are directed. +static inline void tunnel_socket_assign_rcv_buf(RedSocket *sckt, + RedSocketRawRcvBuf *recv_buf, int buf_size); +static inline RedSocketRawRcvBuf *__tunnel_worker_alloc_socket_rcv_buf(TunnelWorker *worker); + +static void tunnel_socket_free_rcv_buf(RedSocket *sckt, RedSocketRawRcvBuf *rcv_buf); +static inline void __tunnel_worker_free_socket_rcv_buf(TunnelWorker *worker, + RedSocketRawRcvBuf *rcv_buf); +static void rcv_tunnled_buffer_release(RawTunneledBuffer *buf); + +/********* managing buffers' queues ***********/ + +static void process_queue_simple_analysis(TunneledBufferProcessQueue *queue, + RawTunneledBuffer *start_last_added, + int offset, int len); +static inline TunneledBufferProcessQueue *__tunnel_socket_alloc_simple_process_queue( + RedSocket *sckt, + uint32_t service_type, + uint32_t direction_type); +static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_request_process_queue( + RedSocket *sckt); +static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_reply_process_queue( + RedSocket *sckt); +static void free_simple_process_queue(TunneledBufferProcessQueue *queue); + +typedef struct ServiceCallback { + /* allocating the the queue & setting the analysis proc by service type */ + TunneledBufferProcessQueue *(*alloc_process_queue)(RedSocket * sckt); + void (*free_process_queue)(TunneledBufferProcessQueue *queue); +} ServiceCallback; + +/* Callbacks for process queue manipulation according to the service type and + the direction of the data. + The access is performed by [service_type][direction] */ +static const ServiceCallback SERVICES_CALLBACKS[3][2] = { + {{NULL, NULL}, + {NULL, NULL}}, + {{tunnel_socket_alloc_simple_print_request_process_queue, free_simple_process_queue}, + {tunnel_socket_alloc_simple_print_reply_process_queue, free_simple_process_queue}}, + {{tunnel_socket_alloc_simple_print_request_process_queue, free_simple_process_queue}, + {tunnel_socket_alloc_simple_print_reply_process_queue, free_simple_process_queue}} +}; + +/**************************************************** +* Migration data +****************************************************/ +typedef struct TunnelChannel TunnelChannel; + +#define TUNNEL_MIGRATE_DATA_MAGIC (*(uint32_t *)"TMDA") +#define TUNNEL_MIGRATE_DATA_VERSION 1 + +#define TUNNEL_MIGRATE_NULL_OFFSET = ~0; + +typedef struct __attribute__ ((__packed__)) TunnelMigrateSocketOutData { + uint32_t num_tokens; + uint32_t window_size; + + uint32_t process_buf_size; + uint32_t process_buf; + + uint32_t process_queue_size; + uint32_t process_queue; + + uint32_t ready_buf_size; + uint32_t ready_buf; +} TunnelMigrateSocketOutData; + +typedef struct __attribute__ ((__packed__)) TunnelMigrateSocketInData { + int32_t num_tokens; + uint32_t client_total_num_tokens; + + uint32_t process_buf_size; + uint32_t process_buf; + + uint32_t process_queue_size; + uint32_t process_queue; + + uint32_t ready_buf_size; + uint32_t ready_buf; +} TunnelMigrateSocketInData; + + +typedef struct __attribute__ ((__packed__)) TunnelMigrateSocket { + uint16_t connection_id; + uint16_t local_port; + uint32_t far_service_id; + + uint16_t client_status; + uint16_t slirp_status; + + uint8_t pushed_close; + uint8_t client_waits_close_ack; + + TunnelMigrateSocketOutData out_data; + TunnelMigrateSocketInData in_data; + + uint32_t slirp_sckt; + + uint32_t mig_client_status_msg; + uint32_t mig_open_ack_tokens; +} TunnelMigrateSocket; + +typedef struct __attribute__ ((__packed__)) TunnelMigrateSocketList { + uint16_t num_sockets; + uint32_t sockets[0]; // offsets in TunnelMigrateData.data to TunnelMigrateSocket +} TunnelMigrateSocketList; + +typedef struct __attribute__ ((__packed__)) TunnelMigrateService { + uint32_t type; + uint32_t id; + uint32_t group; + uint32_t port; + uint32_t name; + uint32_t description; + uint8_t virt_ip[4]; +} TunnelMigrateService; + +typedef struct __attribute__ ((__packed__)) TunnelMigratePrintService { + TunnelMigrateService base; + uint8_t ip[4]; +} TunnelMigratePrintService; + +typedef struct __attribute__ ((__packed__)) TunnelMigrateServicesList { + uint32_t num_services; + uint32_t services[0]; +} TunnelMigrateServicesList; + +//todo: add ack_generation +typedef struct __attribute__ ((__packed__)) TunnelMigrateData { + uint32_t magic; + uint32_t version; + uint64_t message_serial; + + uint32_t slirp_state; // offset in data to slirp state + uint32_t sockets_list; // offset in data to TunnelMigrateSocketList + uint32_t services_list; + + uint8_t data[0]; +} TunnelMigrateData; + +typedef struct TunnelMigrateSocketItem { + RedSocket *socket; + TunnelMigrateSocket mig_socket; + void *out_process_queue; + void *in_process_queue; // queue data specific for service + void *slirp_socket; + uint32_t slirp_socket_size; +} TunnelMigrateSocketItem; + +typedef struct TunnelMigrateServiceItem { + TunnelService *service; + union { + TunnelMigrateService generic_service; + TunnelMigratePrintService print_service; + } u; +} TunnelMigrateServiceItem; + +typedef struct TunnelMigrateItem { + PipeItem base; + + void *slirp_state; + uint64_t slirp_state_size; + + TunnelMigrateServicesList *services_list; + uint32_t services_list_size; + + TunnelMigrateServiceItem *services; + + TunnelMigrateSocketList *sockets_list; + uint32_t sockets_list_size; + + TunnelMigrateSocketItem sockets_data[MAX_SOCKETS_NUM]; +} TunnelMigrateItem; + +static inline void tunnel_channel_activate_migrated_sockets(TunnelChannel *channel); + +/*******************************************************************************************/ + +/* use for signaling that 1) subroutines failed 2)routines in the interface for slirp + failed (which triggred from a call to slirp) */ +#define SET_TUNNEL_ERROR(channel,format, ...) { \ + channel->tunnel_error = TRUE; \ + red_printf(format, ## __VA_ARGS__); \ +} + +/* should be checked after each subroutine that may cause error or fter calls to slirp routines */ +#define CHECK_TUNNEL_ERROR(channel) (channel->tunnel_error) + +struct TunnelChannel { + RedChannel base; + TunnelWorker *worker; + int mig_inprogress; + int expect_migrate_mark; + int expect_migrate_data; + + int tunnel_error; + + struct { + union { + RedTunnelInit init; + RedTunnelServiceIpMap service_ip; + RedTunnelSocketOpen socket_open; + RedTunnelSocketFin socket_fin; + RedTunnelSocketClose socket_close; + RedTunnelSocketClosedAck socket_close_ack; + RedTunnelSocketData socket_data; + RedTunnelSocketTokens socket_token; + TunnelMigrateData migrate_data; + } u; + } send_data; + + uint8_t control_rcv_buf[CONTROL_MSG_RECV_BUF_SIZE]; +}; + +typedef struct RedSlirpNetworkInterface { + SlirpUsrNetworkInterface base; + TunnelWorker *worker; +} RedSlirpNetworkInterface; + +struct TunnelService { + RingItem ring_item; + PipeItem pipe_item; + uint32_t type; + uint32_t id; + uint32_t group; + uint32_t port; + char *name; + char *description; + + struct in_addr virt_ip; +}; + +typedef struct TunnelPrintService { + TunnelService base; + uint8_t ip[4]; +} TunnelPrintService; + +struct TunnelWorker { + Channel channel_interface; // for reds + TunnelChannel *channel; + + CoreInterface *core_interface; + NetWireInterface *vlan_interface; + RedSlirpNetworkInterface tunnel_interface; + RedSlirpNetworkInterface null_interface; + + RedSocket sockets[MAX_SOCKETS_NUM]; // the sockets are in the worker and not + // in the channel since the slirp sockets + // can be still alive (but during close) after + // the channel was disconnected + + int num_sockets; + + RedSocketRawSndBuf *free_snd_buf; + RedSocketRawRcvBuf *free_rcv_buf; + + Ring services; + int num_services; +}; + + +/********************************************************************* + * Tunnel interface + *********************************************************************/ +static void tunnel_channel_disconnect(RedChannel *channel); + +/* networking interface for slirp */ +static int qemu_can_output(SlirpUsrNetworkInterface *usr_interface); +static void qemu_output(SlirpUsrNetworkInterface *usr_interface, const uint8_t *pkt, int pkt_len); +static int null_tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface, + struct in_addr src_addr, uint16_t src_port, + struct in_addr dst_addr, uint16_t dst_port, + SlirpSocket *slirp_s, UserSocket **o_usr_s); +static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface, + struct in_addr src_addr, uint16_t src_port, + struct in_addr dst_addr, uint16_t dst_port, + SlirpSocket *slirp_s, UserSocket **o_usr_s); +static void null_tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque); +static void tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque); +static int null_tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len, uint8_t urgent); +static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len, uint8_t urgent); +static int null_tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len); +static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len); +static void null_tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque); +static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque); +static void null_tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque); +static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque); + +static UserTimer *create_timer(SlirpUsrNetworkInterface *usr_interface, + timer_proc_t proc, void *opaque); +static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, uint32_t ms); + + +/* reds interface */ +static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration, + int num_common_caps, uint32_t *common_caps, int num_caps, + uint32_t *caps); +static void handle_tunnel_channel_shutdown(struct Channel *channel); +static void handle_tunnel_channel_migrate(struct Channel *channel); + + +static void tunnel_shutdown(TunnelWorker *worker) +{ + int i; + red_printf(""); + /* shutdown input from channel */ + if (worker->channel) { + red_channel_shutdown(&worker->channel->base); + } + + /* shutdown socket pipe items */ + for (i = 0; i < MAX_SOCKETS_NUM; i++) { + RedSocket *sckt = worker->sockets + i; + if (sckt->allocated) { + sckt->client_status = CLIENT_SCKT_STATUS_CLOSED; + sckt->client_waits_close_ack = FALSE; + } + } + + /* shutdown input from slirp */ + net_slirp_set_net_interface(&worker->null_interface.base); +} + +/***************************************************************** +* Managing raw tunneled buffers storage +******************************************************************/ + +/********** send buffers ***********/ +static RawTunneledBuffer *tunnel_socket_alloc_snd_buf(RedSocket *sckt) +{ + RedSocketRawSndBuf *ret = __tunnel_worker_alloc_socket_snd_buf(sckt->worker); + ret->base.usr_opaque = sckt; + ret->base.release_proc = snd_tunnled_buffer_release; + sckt->out_data.num_buffers++; + return &ret->base; +} + +static inline RedSocketRawSndBuf *__tunnel_worker_alloc_socket_snd_buf(TunnelWorker *worker) +{ + RedSocketRawSndBuf *ret; + if (worker->free_snd_buf) { + ret = worker->free_snd_buf; + worker->free_snd_buf = (RedSocketRawSndBuf *)worker->free_snd_buf->base.next; + } else { + ret = (RedSocketRawSndBuf *)malloc(sizeof(*ret)); + if (!ret) { + red_error("malloc of send buf failed"); + } + } + ret->base.data = ret->buf; + ret->base.size = 0; + ret->base.max_size = MAX_SOCKET_DATA_SIZE; + ret->base.usr_opaque = NULL; + ret->base.refs = 1; + ret->base.next = NULL; + + return ret; +} + +static void tunnel_socket_free_snd_buf(RedSocket *sckt, RedSocketRawSndBuf *snd_buf) +{ + sckt->out_data.num_buffers--; + __tunnel_worker_free_socket_snd_buf(sckt->worker, snd_buf); +} + +static inline void __tunnel_worker_free_socket_snd_buf(TunnelWorker *worker, + RedSocketRawSndBuf *snd_buf) +{ + snd_buf->base.size = 0; + snd_buf->base.next = &worker->free_snd_buf->base; + worker->free_snd_buf = snd_buf; +} + +static RawTunneledBuffer *process_queue_alloc_snd_tunneled_buffer(TunneledBufferProcessQueue *queue) +{ + return tunnel_socket_alloc_snd_buf((RedSocket *)queue->usr_opaque); +} + +static void snd_tunnled_buffer_release(RawTunneledBuffer *buf) +{ + tunnel_socket_free_snd_buf((RedSocket *)buf->usr_opaque, (RedSocketRawSndBuf *)buf); +} + +/********** recv buffers ***********/ + +static inline void tunnel_socket_assign_rcv_buf(RedSocket *sckt, + RedSocketRawRcvBuf *recv_buf, int buf_size) +{ + ASSERT(!recv_buf->base.usr_opaque); + // the rcv buffer was allocated by tunnel_channel_alloc_msg_rcv_buf + // before we could know which of the sockets it belongs to, so the + // assignment to the socket is performed now + recv_buf->base.size = buf_size; + recv_buf->base.usr_opaque = sckt; + recv_buf->base.release_proc = rcv_tunnled_buffer_release; + sckt->in_data.num_buffers++; + process_queue_push(sckt->in_data.process_queue, &recv_buf->base); +} + +static inline RedSocketRawRcvBuf *__tunnel_worker_alloc_socket_rcv_buf(TunnelWorker *worker) +{ + RedSocketRawRcvBuf *ret; + if (worker->free_rcv_buf) { + ret = worker->free_rcv_buf; + worker->free_rcv_buf = (RedSocketRawRcvBuf *)worker->free_rcv_buf->base.next; + } else { + ret = (RedSocketRawRcvBuf *)malloc(sizeof(*ret)); + if (!ret) { + red_error("malloc of send buf failed"); + } + } + ret->msg_info = (RedcTunnelSocketData *)ret->buf; + ret->base.usr_opaque = NULL; + ret->base.data = ret->msg_info->data; + ret->base.size = 0; + ret->base.max_size = MAX_SOCKET_DATA_SIZE; + ret->base.refs = 1; + ret->base.next = NULL; + + return ret; +} + +static inline void __process_rcv_buf_tokens(TunnelChannel *channel, RedSocket *sckt) +{ + if ((sckt->client_status != CLIENT_SCKT_STATUS_OPEN) || red_channel_pipe_item_is_linked( + &channel->base, &sckt->out_data.token_pipe_item) || channel->mig_inprogress) { + return; + } + + if ((sckt->in_data.num_tokens >= SOCKET_TOKENS_TO_SEND) || + (!sckt->in_data.client_total_num_tokens && !sckt->in_data.ready_chunks_queue.head)) { + sckt->out_data.token_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_TOKEN; + red_channel_pipe_add(&channel->base, &sckt->out_data.token_pipe_item); + } +} + +static void tunnel_socket_free_rcv_buf(RedSocket *sckt, RedSocketRawRcvBuf *rcv_buf) +{ + --sckt->in_data.num_buffers; + __tunnel_worker_free_socket_rcv_buf(sckt->worker, rcv_buf); + ++sckt->in_data.num_tokens; + __process_rcv_buf_tokens(sckt->worker->channel, sckt); +} + +static inline void __tunnel_worker_free_socket_rcv_buf(TunnelWorker *worker, + RedSocketRawRcvBuf *rcv_buf) +{ + rcv_buf->base.next = &worker->free_rcv_buf->base; + worker->free_rcv_buf = rcv_buf; +} + +static void rcv_tunnled_buffer_release(RawTunneledBuffer *buf) +{ + tunnel_socket_free_rcv_buf((RedSocket *)buf->usr_opaque, + (RedSocketRawRcvBuf *)buf); +} + +/************************ +* Process & Ready queue +*************************/ + +static inline void __process_queue_push(TunneledBufferProcessQueue *queue, RawTunneledBuffer *buf) +{ + buf->next = NULL; + if (!queue->head) { + queue->head = buf; + queue->tail = buf; + } else { + queue->tail->next = buf; + queue->tail = buf; + } +} + +static void process_queue_push(TunneledBufferProcessQueue *queue, RawTunneledBuffer *buf) +{ + __process_queue_push(queue, buf); + queue->analysis_proc(queue, buf, 0, buf->size); +} + +static void process_queue_append(TunneledBufferProcessQueue *queue, uint8_t *data, size_t size) +{ + RawTunneledBuffer *start_buf = NULL; + int start_offset = 0; + int copied = 0; + + if (queue->tail) { + RawTunneledBuffer *buf = queue->tail; + int space = buf->max_size - buf->size; + if (space) { + int copy_count = MIN(size, space); + start_buf = buf; + start_offset = buf->size; + memcpy(buf->data + buf->size, data, copy_count); + copied += copy_count; + buf->size += copy_count; + } + } + + + while (copied < size) { + RawTunneledBuffer *buf = queue->alloc_buf_proc(queue); + int copy_count = MIN(size - copied, buf->max_size); + memcpy(buf->data, data + copied, copy_count); + copied += copy_count; + buf->size = copy_count; + + __process_queue_push(queue, buf); + + if (!start_buf) { + start_buf = buf; + start_offset = 0; + } + } + + queue->analysis_proc(queue, start_buf, start_offset, size); +} + +static void process_queue_pop(TunneledBufferProcessQueue *queue) +{ + RawTunneledBuffer *prev_head; + ASSERT(queue->head && queue->tail); + prev_head = queue->head; + queue->head = queue->head->next; + if (!queue->head) { + queue->tail = NULL; + } + + tunneled_buffer_unref(prev_head); +} + +static void process_queue_clear(TunneledBufferProcessQueue *queue) +{ + while (queue->head) { + process_queue_pop(queue); + } +} + +static void __ready_queue_push(ReadyTunneledChunkQueue *queue, ReadyTunneledChunk *chunk) +{ + chunk->next = NULL; + if (queue->tail) { + queue->tail->next = chunk; + queue->tail = chunk; + } else { + queue->head = chunk; + queue->tail = chunk; + } +} + +static void ready_queue_add_orig_chunk(ReadyTunneledChunkQueue *queue, RawTunneledBuffer *origin, + uint8_t *data, int size) +{ + ReadyTunneledChunk *chunk = malloc(sizeof(ReadyTunneledChunk)); + chunk->type = READY_TUNNELED_CHUNK_TYPE_ORIG; + chunk->origin = tunneled_buffer_ref(origin); + chunk->data = data; + chunk->size = size; + + __ready_queue_push(queue, chunk); +} + +static void ready_queue_pop_chunk(ReadyTunneledChunkQueue *queue) +{ + ReadyTunneledChunk *chunk = queue->head; + ASSERT(queue->head); + queue->head = queue->head->next; + + if (!queue->head) { + queue->tail = NULL; + } + + tunneled_buffer_unref(chunk->origin); + if (chunk->type != READY_TUNNELED_CHUNK_TYPE_ORIG) { + free(chunk->data); + } + free(chunk); +} + +static void ready_queue_clear(ReadyTunneledChunkQueue *queue) +{ + while (queue->head) { + ready_queue_pop_chunk(queue); + } +} + +static void process_queue_simple_analysis(TunneledBufferProcessQueue *queue, + RawTunneledBuffer *start_last_added, int offset, int len) +{ + ASSERT(offset == 0); + ASSERT(start_last_added == queue->head); + + while (queue->head) { + ready_queue_add_orig_chunk(queue->ready_chunks_queue, queue->head, queue->head->data, + queue->head->size); + process_queue_pop(queue); + } +} + +static int process_queue_simple_get_migrate_data(TunneledBufferProcessQueue *queue, + void **migrate_data) +{ + *migrate_data = NULL; + return 0; +} + +static void process_queue_simple_release_migrate_data(TunneledBufferProcessQueue *queue, + void *migrate_data) +{ + ASSERT(!migrate_data); +} + +static void process_queue_simple_restore(TunneledBufferProcessQueue *queue, uint8_t *migrate_data) +{ +} + +static inline TunneledBufferProcessQueue *__tunnel_socket_alloc_simple_process_queue( + RedSocket *sckt, + uint32_t service_type, + uint32_t direction_type) +{ + TunneledBufferProcessQueue *ret_queue = malloc(sizeof(TunneledBufferProcessQueue)); + memset(ret_queue, 0, sizeof(TunneledBufferProcessQueue)); + ret_queue->service_type = service_type; + ret_queue->direction = direction_type; + ret_queue->usr_opaque = sckt; + // NO need for allocations by the process queue when getting replies. The buffer is created + // when the msg is received + if (direction_type == PROCESS_DIRECTION_TYPE_REQUEST) { + ret_queue->alloc_buf_proc = process_queue_alloc_snd_tunneled_buffer; + ret_queue->ready_chunks_queue = &sckt->out_data.ready_chunks_queue; + } else { + ret_queue->ready_chunks_queue = &sckt->in_data.ready_chunks_queue; + } + + ret_queue->analysis_proc = process_queue_simple_analysis; + + ret_queue->get_migrate_data_proc = process_queue_simple_get_migrate_data; + ret_queue->release_migrate_data_proc = process_queue_simple_release_migrate_data; + ret_queue->restore_proc = process_queue_simple_restore; + return ret_queue; +} + +static void free_simple_process_queue(TunneledBufferProcessQueue *queue) +{ + process_queue_clear(queue); + free(queue); +} + +static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_request_process_queue( + RedSocket *sckt) +{ + return __tunnel_socket_alloc_simple_process_queue(sckt, + RED_TUNNEL_SERVICE_TYPE_IPP, + PROCESS_DIRECTION_TYPE_REQUEST); +} + +static TunneledBufferProcessQueue *tunnel_socket_alloc_simple_print_reply_process_queue( + RedSocket *sckt) +{ + return __tunnel_socket_alloc_simple_process_queue(sckt, + RED_TUNNEL_SERVICE_TYPE_IPP, + PROCESS_DIRECTION_TYPE_REPLY); +} + +static void tunnel_send_packet(void *opaque_tunnel, const uint8_t *pkt, int pkt_len) +{ + TunnelWorker *worker = (TunnelWorker *)opaque_tunnel; + ASSERT(worker); + + if (worker->channel && worker->channel->base.migrate) { + return; // during migration and the tunnel state hasn't been restored yet. + } + + net_slirp_input(pkt, pkt_len); +} + +void *red_tunnel_attach(CoreInterface *core_interface, NetWireInterface *vlan_interface) +{ + TunnelWorker *worker = (TunnelWorker *)malloc(sizeof(TunnelWorker)); + + if (!worker) { + red_error("malloc of tunnel worker failed"); + } + memset(worker, 0, sizeof(*worker)); + + worker->core_interface = core_interface; + worker->vlan_interface = vlan_interface; + + worker->tunnel_interface.base.slirp_can_output = qemu_can_output; + worker->tunnel_interface.base.slirp_output = qemu_output; + worker->tunnel_interface.base.connect = tunnel_socket_connect; + worker->tunnel_interface.base.send = tunnel_socket_send; + worker->tunnel_interface.base.recv = tunnel_socket_recv; + worker->tunnel_interface.base.close = tunnel_socket_close; + worker->tunnel_interface.base.shutdown_recv = tunnel_socket_shutdown_recv; + worker->tunnel_interface.base.shutdown_send = tunnel_socket_shutdown_send; + worker->tunnel_interface.base.create_timer = create_timer; + worker->tunnel_interface.base.arm_timer = arm_timer; + + worker->tunnel_interface.worker = worker; + + worker->null_interface.base.slirp_can_output = qemu_can_output; + worker->null_interface.base.slirp_output = qemu_output; + worker->null_interface.base.connect = null_tunnel_socket_connect; + worker->null_interface.base.send = null_tunnel_socket_send; + worker->null_interface.base.recv = null_tunnel_socket_recv; + worker->null_interface.base.close = null_tunnel_socket_close; + worker->null_interface.base.shutdown_recv = null_tunnel_socket_shutdown_recv; + worker->null_interface.base.shutdown_send = null_tunnel_socket_shutdown_send; + worker->null_interface.base.create_timer = create_timer; + worker->null_interface.base.arm_timer = arm_timer; + + worker->null_interface.worker = worker; + + worker->channel_interface.type = RED_CHANNEL_TUNNEL; + worker->channel_interface.id = 0; + worker->channel_interface.link = handle_tunnel_channel_link; + worker->channel_interface.shutdown = handle_tunnel_channel_shutdown; + worker->channel_interface.migrate = handle_tunnel_channel_migrate; + worker->channel_interface.data = worker; + + ring_init(&worker->services); + reds_register_channel(&worker->channel_interface); + + net_slirp_init(worker->vlan_interface->get_ip(worker->vlan_interface), + TRUE, + &worker->null_interface.base); + if (!vlan_interface->register_route_packet(vlan_interface, tunnel_send_packet, worker)) { + red_error("register route packet failed"); + } + return worker; +} + +/* returns the first service that has the same group id (NULL if not found) */ +static inline TunnelService *__tunnel_worker_find_service_of_group(TunnelWorker *worker, + uint32_t group) +{ + TunnelService *service; + for (service = (TunnelService *)ring_get_head(&worker->services); + service; + service = (TunnelService *)ring_next(&worker->services, &service->ring_item)) { + if (service->group == group) { + return service; + } + } + + return NULL; +} + +static inline TunnelService *__tunnel_worker_add_service(TunnelWorker *worker, uint32_t size, + uint32_t type, uint32_t id, + uint32_t group, uint32_t port, + char *name, char *description, + struct in_addr *virt_ip) +{ + TunnelService *new_service = malloc(size); + + if (!new_service) { + red_error("malloc of TunnelService failed"); + } + memset(new_service, 0, size); + + if (!virt_ip) { + TunnelService *service_of_same_group; + if (!(service_of_same_group = __tunnel_worker_find_service_of_group(worker, group))) { + if (!net_slirp_allocate_virtual_ip(&new_service->virt_ip)) { + red_printf("failed to allocate virtual ip"); + free(new_service); + return NULL; + } + } else { + if (strcmp(name, service_of_same_group->name) == 0) { + new_service->virt_ip.s_addr = service_of_same_group->virt_ip.s_addr; + } else { + red_printf("inconsistent name for service group %d", group); + free(new_service); + return NULL; + } + } + } else { + new_service->virt_ip.s_addr = virt_ip->s_addr; + } + + ring_item_init(&new_service->ring_item); + new_service->type = type; + new_service->id = id; + new_service->group = group; + new_service->port = port; + + new_service->name = strdup(name); + new_service->description = strdup(description); + + ring_add(&worker->services, &new_service->ring_item); + worker->num_services++; + +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG: ==>SERVICE ADDED: id=%d virt ip=%s port=%d name=%s desc=%s", + new_service->id, inet_ntoa(new_service->virt_ip), + new_service->port, new_service->name, new_service->description); +#endif + if (!virt_ip) { + new_service->pipe_item.type = PIPE_ITEM_TYPE_SERVICE_IP_MAP; + red_channel_pipe_add(&worker->channel->base, &new_service->pipe_item); + } + + return new_service; +} + +static TunnelService *tunnel_worker_add_service(TunnelWorker *worker, uint32_t size, + RedcTunnelAddGenericService *redc_service) +{ + return __tunnel_worker_add_service(worker, size, redc_service->type, + redc_service->id, redc_service->group, + redc_service->port, + (char *)(((uint8_t *)redc_service) + + redc_service->name), + (char *)(((uint8_t *)redc_service) + + redc_service->description), NULL); +} + +static inline void tunnel_worker_free_service(TunnelWorker *worker, TunnelService *service) +{ + ring_remove(&service->ring_item); + free(service->name); + free(service->description); + free(service); + worker->num_services--; +} + +static void tunnel_worker_free_print_service(TunnelWorker *worker, TunnelPrintService *service) +{ + tunnel_worker_free_service(worker, &service->base); +} + +static TunnelPrintService *tunnel_worker_add_print_service(TunnelWorker *worker, + RedcTunnelAddPrintService *redc_service) +{ + TunnelPrintService *service; + + service = (TunnelPrintService *)tunnel_worker_add_service(worker, sizeof(TunnelPrintService), + &redc_service->base); + + if (!service) { + return NULL; + } + + if (redc_service->ip.type == RED_TUNNEL_IP_TYPE_IPv4) { + memcpy(service->ip, redc_service->ip.data, sizeof(RedTunnelIPv4)); + } else { + red_printf("unexpected ip type=%d", redc_service->ip.type); + tunnel_worker_free_print_service(worker, service); + return NULL; + } +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG: ==>PRINT SERVICE ADDED: ip=%d.%d.%d.%d", service->ip[0], + service->ip[1], service->ip[2], service->ip[3]); +#endif + return service; +} + +static int tunnel_channel_handle_service_add(TunnelChannel *channel, + RedcTunnelAddGenericService *service_msg) +{ + TunnelService *out_service = NULL; + if (service_msg->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + out_service = &tunnel_worker_add_print_service(channel->worker, + (RedcTunnelAddPrintService *) + service_msg)->base; + } else if (service_msg->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + out_service = tunnel_worker_add_service(channel->worker, sizeof(TunnelService), + service_msg); + } else { + red_printf("invalid service type"); + } + + free(service_msg); + return (out_service != NULL); +} + +static inline TunnelService *tunnel_worker_find_service_by_id(TunnelWorker *worker, uint32_t id) +{ + TunnelService *service; + for (service = (TunnelService *)ring_get_head(&worker->services); + service; + service = (TunnelService *)ring_next(&worker->services, &service->ring_item)) { + if (service->id == id) { + return service; + } + } + + return NULL; +} + +static inline TunnelService *tunnel_worker_find_service_by_addr(TunnelWorker *worker, + struct in_addr *virt_ip, + uint32_t port) +{ + TunnelService *service; + for (service = (TunnelService *)ring_get_head(&worker->services); + service; + service = (TunnelService *)ring_next(&worker->services, &service->ring_item)) { + if ((virt_ip->s_addr == service->virt_ip.s_addr) && (port == service->port)) { + return service; + } + } + + return NULL; +} + +static inline void tunnel_worker_clear_routed_network(TunnelWorker *worker) +{ + while (!ring_is_empty(&worker->services)) { + TunnelService *service = (TunnelService *)ring_get_head(&worker->services); + if (service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + tunnel_worker_free_service(worker, service); + } else if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + tunnel_worker_free_print_service(worker, (TunnelPrintService *)service); + } else { + red_error("unexpected service type"); + } + } + + net_slirp_clear_virtual_ips(); +} + +static inline RedSocket *__tunnel_worker_find_free_socket(TunnelWorker *worker) +{ + int i; + RedSocket *ret = NULL; + + if (worker->num_sockets == MAX_SOCKETS_NUM) { + return NULL; + } + + for (i = 0; i < MAX_SOCKETS_NUM; i++) { + if (!worker->sockets[i].allocated) { + ret = worker->sockets + i; + ret->connection_id = i; + break; + } + } + + ASSERT(ret); + return ret; +} + +static inline void __tunnel_worker_add_socket(TunnelWorker *worker, RedSocket *sckt) +{ + ASSERT(!sckt->allocated); + sckt->allocated = TRUE; + worker->num_sockets++; +} + +static inline void tunnel_worker_alloc_socket(TunnelWorker *worker, RedSocket *sckt, + uint16_t local_port, TunnelService *far_service, + SlirpSocket *slirp_s) +{ + ASSERT(far_service); + sckt->worker = worker; + sckt->local_port = local_port; + sckt->far_service = far_service; + sckt->out_data.num_tokens = 0; + + sckt->slirp_status = SLIRP_SCKT_STATUS_OPEN; + sckt->client_status = CLIENT_SCKT_STATUS_WAIT_OPEN; + sckt->slirp_sckt = slirp_s; + + sckt->out_data.process_queue = SERVICES_CALLBACKS[far_service->type][ + PROCESS_DIRECTION_TYPE_REQUEST].alloc_process_queue(sckt); + sckt->in_data.process_queue = SERVICES_CALLBACKS[far_service->type][ + PROCESS_DIRECTION_TYPE_REPLY].alloc_process_queue(sckt); + __tunnel_worker_add_socket(worker, sckt); +} + +static inline void __tunnel_worker_free_socket(TunnelWorker *worker, RedSocket *sckt) +{ + memset(sckt, 0, sizeof(*sckt)); + worker->num_sockets--; +} + +static RedSocket *tunnel_worker_create_socket(TunnelWorker *worker, uint16_t local_port, + TunnelService *far_service, + SlirpSocket *slirp_s) +{ + RedSocket *new_socket; + ASSERT(worker); + new_socket = __tunnel_worker_find_free_socket(worker); + + if (!new_socket) { + red_error("malloc of RedSocket failed"); + } + + tunnel_worker_alloc_socket(worker, new_socket, local_port, far_service, slirp_s); + + return new_socket; +} + +static void tunnel_worker_free_socket(TunnelWorker *worker, RedSocket *sckt) +{ + if (worker->channel) { + if (red_channel_pipe_item_is_linked(&worker->channel->base, + &sckt->out_data.data_pipe_item)) { + red_channel_pipe_item_remove(&worker->channel->base, + &sckt->out_data.data_pipe_item); + return; + } + + if (red_channel_pipe_item_is_linked(&worker->channel->base, + &sckt->out_data.status_pipe_item)) { + red_channel_pipe_item_remove(&worker->channel->base, + &sckt->out_data.status_pipe_item); + return; + } + + if (red_channel_pipe_item_is_linked(&worker->channel->base, + &sckt->out_data.token_pipe_item)) { + red_channel_pipe_item_remove(&worker->channel->base, + &sckt->out_data.token_pipe_item); + return; + } + } + + SERVICES_CALLBACKS[sckt->far_service->type][ + PROCESS_DIRECTION_TYPE_REQUEST].free_process_queue(sckt->out_data.process_queue); + SERVICES_CALLBACKS[sckt->far_service->type][ + PROCESS_DIRECTION_TYPE_REPLY].free_process_queue(sckt->in_data.process_queue); + + ready_queue_clear(&sckt->out_data.ready_chunks_queue); + ready_queue_clear(&sckt->in_data.ready_chunks_queue); + + __tunnel_worker_free_socket(worker, sckt); +} + +static inline RedSocket *tunnel_worker_find_socket(TunnelWorker *worker, + uint16_t local_port, + uint32_t far_service_id) +{ + RedSocket *sckt; + int allocated = 0; + + for (sckt = worker->sockets; allocated < worker->num_sockets; sckt++) { + if (sckt->allocated) { + allocated++; + if ((sckt->local_port == local_port) && + (sckt->far_service->id == far_service_id)) { + return sckt; + } + } + } + return NULL; +} + +static inline void __tunnel_socket_add_fin_to_pipe(TunnelChannel *channel, RedSocket *sckt) +{ + ASSERT(!red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)); + sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_FIN; + red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item); +} + +static inline void __tunnel_socket_add_close_to_pipe(TunnelChannel *channel, RedSocket *sckt) +{ + ASSERT(!channel->mig_inprogress); + + if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)) { + ASSERT(sckt->out_data.status_pipe_item.type == PIPE_ITEM_TYPE_SOCKET_FIN); + // close is stronger than FIN + red_channel_pipe_item_remove(&channel->base, &sckt->out_data.status_pipe_item); + } + sckt->pushed_close = TRUE; + sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_CLOSE; + red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item); +} + +static inline void __tunnel_socket_add_close_ack_to_pipe(TunnelChannel *channel, RedSocket *sckt) +{ + ASSERT(!channel->mig_inprogress); + + if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.status_pipe_item)) { + ASSERT(sckt->out_data.status_pipe_item.type == PIPE_ITEM_TYPE_SOCKET_FIN); + // close is stronger than FIN + red_channel_pipe_item_remove(&channel->base, &sckt->out_data.status_pipe_item); + } + + sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK; + red_channel_pipe_add(&channel->base, &sckt->out_data.status_pipe_item); +} + +/* + Send close msg to the client. + If possible, notify slirp to recv data (which will return 0) + When close ack is received from client, we notify slirp (maybe again) if needed. +*/ +static void tunnel_socket_force_close(TunnelChannel *channel, RedSocket *sckt) +{ + if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.token_pipe_item)) { + red_channel_pipe_item_remove(&channel->base, &sckt->out_data.token_pipe_item); + } + + if (red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.data_pipe_item)) { + red_channel_pipe_item_remove(&channel->base, &sckt->out_data.data_pipe_item); + } + + + if ((sckt->client_status != CLIENT_SCKT_STATUS_CLOSED) || + !sckt->pushed_close) { + __tunnel_socket_add_close_to_pipe(channel, sckt); + } + + // we can't call net_slirp_socket_can_receive_notify if the forced close was initiated by + // tunnel_socket_send (which was called from slirp). Instead, when + // we receive the close ack from the client, we call net_slirp_socket_can_receive_notify + if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) { + if (!sckt->in_slirp_send) { + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + net_slirp_socket_abort(sckt->slirp_sckt); + } else { + sckt->slirp_status = SLIRP_SCKT_STATUS_DELAY_ABORT; + } + } +} + +static int tunnel_channel_handle_socket_connect_ack(TunnelChannel *channel, RedSocket *sckt, + uint32_t tokens) +{ +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG"); +#endif + if (channel->mig_inprogress || channel->base.migrate) { + sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_OPEN_ACK; + sckt->mig_open_ack_tokens = tokens; + return TRUE; + } + + if (sckt->client_status != CLIENT_SCKT_STATUS_WAIT_OPEN) { + red_printf("unexpected REDC_TUNNEL_SOCKET_OPEN_ACK status=%d", sckt->client_status); + return FALSE; + } + sckt->client_status = CLIENT_SCKT_STATUS_OPEN; + + // SLIRP_SCKT_STATUS_CLOSED is possible after waiting for a connection has timed out + if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { + ASSERT(!sckt->pushed_close); + __tunnel_socket_add_close_to_pipe(channel, sckt); + } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) { + sckt->out_data.window_size = tokens; + sckt->out_data.num_tokens = tokens; + net_slirp_socket_connected_notify(sckt->slirp_sckt); + } else { + red_printf("unexpected slirp status status=%d", sckt->slirp_status); + return FALSE; + } + + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static int tunnel_channel_handle_socket_connect_nack(TunnelChannel *channel, RedSocket *sckt) +{ +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + if (channel->mig_inprogress || channel->base.migrate) { + sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_OPEN_NACK; + return TRUE; + } + + if (sckt->client_status != CLIENT_SCKT_STATUS_WAIT_OPEN) { + red_printf("unexpected REDC_TUNNEL_SOCKET_OPEN_NACK status=%d", sckt->client_status); + return FALSE; + } + sckt->client_status = CLIENT_SCKT_STATUS_CLOSED; + + if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) { + net_slirp_socket_connect_failed_notify(sckt->slirp_sckt); + } else { + tunnel_worker_free_socket(channel->worker, sckt); + } + + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static int tunnel_channel_handle_socket_fin(TunnelChannel *channel, RedSocket *sckt) +{ +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + if (channel->mig_inprogress || channel->base.migrate) { + sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_FIN; + return TRUE; + } + + if (sckt->client_status != CLIENT_SCKT_STATUS_OPEN) { + red_printf("unexpected REDC_TUNNEL_SOCKET_FIN status=%d", sckt->client_status); + return FALSE; + } + sckt->client_status = CLIENT_SCKT_STATUS_SHUTDOWN_SEND; + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT)) { + return TRUE; + } + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) { + // After slirp will receive all the data buffers, the next recv + // will return an error and shutdown_recv should be called. + net_slirp_socket_can_receive_notify(sckt->slirp_sckt); + } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) { + // it already received the FIN + red_printf("unexpected slirp status=%d", sckt->slirp_status); + return FALSE; + } + + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static int tunnel_channel_handle_socket_closed(TunnelChannel *channel, RedSocket *sckt) +{ + int prev_client_status = sckt->client_status; + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + + if (channel->mig_inprogress || channel->base.migrate) { + sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_CLOSED; + return TRUE; + } + + sckt->client_status = CLIENT_SCKT_STATUS_CLOSED; + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { + // if we already pushed close to the client, we expect it to send us ack. + // Otherwise, we will send it an ack. + if (!sckt->pushed_close) { + sckt->client_waits_close_ack = TRUE; + __tunnel_socket_add_close_ack_to_pipe(channel, sckt); + } + + return (!CHECK_TUNNEL_ERROR(channel)); + } + + // close was initiated by client + sckt->client_waits_close_ack = TRUE; + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) { + // guest waits for fin: after slirp will receive all the data buffers, + // the next recv will return an error and shutdown_recv should be called. + net_slirp_socket_can_receive_notify(sckt->slirp_sckt); + } else if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) { + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + net_slirp_socket_abort(sckt->slirp_sckt); + } else if ((sckt->slirp_status != SLIRP_SCKT_STATUS_WAIT_CLOSE) || + (prev_client_status != CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) { + // slirp can be in wait close if both slirp and client sent fin perviously + // otherwise, the prev client status would also have been wait close, and this + // case was handled above + red_printf("unexpected slirp_status=%d", sckt->slirp_status); + return FALSE; + } + + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static int tunnel_channel_handle_socket_closed_ack(TunnelChannel *channel, RedSocket *sckt) +{ +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + if (channel->mig_inprogress || channel->base.migrate) { + sckt->mig_client_status_msg = REDC_TUNNEL_SOCKET_CLOSED_ACK; + return TRUE; + } + + sckt->client_status = CLIENT_SCKT_STATUS_CLOSED; + if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) { + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + net_slirp_socket_abort(sckt->slirp_sckt); + return (!CHECK_TUNNEL_ERROR(channel)); + } + + if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) { + red_printf("unexcpected REDC_TUNNEL_SOCKET_CLOSED_ACK slirp_status=%d", + sckt->slirp_status); + return FALSE; + } + + tunnel_worker_free_socket(channel->worker, sckt); + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static int tunnel_channel_handle_socket_receive_data(TunnelChannel *channel, RedSocket *sckt, + RedSocketRawRcvBuf *recv_data, int buf_size) +{ + if ((sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND) || + (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED)) { + red_printf("unexcpected REDC_TUNNEL_SOCKET_DATA clinet_status=%d", + sckt->client_status); + return FALSE; + } + + // handling a case where the client sent data before it recieved the close msg + if ((sckt->slirp_status != SLIRP_SCKT_STATUS_OPEN) && + (sckt->slirp_status != SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) { + __tunnel_worker_free_socket_rcv_buf(sckt->worker, recv_data); + return (!CHECK_TUNNEL_ERROR(channel)); + } else if ((sckt->in_data.num_buffers == MAX_SOCKET_IN_BUFFERS) && + !channel->mig_inprogress && !channel->base.migrate) { + red_printf("socket in buffers overflow, socket will be closed" + " (local_port=%d, service_id=%d)", + ntohs(sckt->local_port), sckt->far_service->id); + __tunnel_worker_free_socket_rcv_buf(sckt->worker, recv_data); + tunnel_socket_force_close(channel, sckt); + return (!CHECK_TUNNEL_ERROR(channel)); + } + + tunnel_socket_assign_rcv_buf(sckt, recv_data, buf_size); + if (!sckt->in_data.client_total_num_tokens) { + red_printf("token vailoation"); + return FALSE; + } + + --sckt->in_data.client_total_num_tokens; + __process_rcv_buf_tokens(channel, sckt); + + if (sckt->in_data.ready_chunks_queue.head && !channel->mig_inprogress) { + net_slirp_socket_can_receive_notify(sckt->slirp_sckt); + } + + return (!CHECK_TUNNEL_ERROR(channel)); +} + +static inline int __client_socket_can_receive(RedSocket *sckt) +{ + return (((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) || + (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) && + !sckt->worker->channel->mig_inprogress); +} + +static int tunnel_channel_handle_socket_token(TunnelChannel *channel, RedSocket *sckt, + RedcTunnelSocketTokens *message) +{ + sckt->out_data.num_tokens += message->num_tokens; + + if (__client_socket_can_receive(sckt) && sckt->out_data.ready_chunks_queue.head && + !red_channel_pipe_item_is_linked(&channel->base, &sckt->out_data.data_pipe_item)) { + // data is pending to be sent + sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA; + red_channel_pipe_add(&channel->base, &sckt->out_data.data_pipe_item); + } + + return TRUE; +} + +static uint8_t *tunnel_channel_alloc_msg_rcv_buf(RedChannel *channel, RedDataHeader *msg_header) +{ + TunnelChannel *tunnel_channel = (TunnelChannel *)channel; + if (msg_header->type == REDC_TUNNEL_SOCKET_DATA) { + return (__tunnel_worker_alloc_socket_rcv_buf(tunnel_channel->worker)->buf); + } else if ((msg_header->type == REDC_MIGRATE_DATA) || + (msg_header->type == REDC_TUNNEL_SERVICE_ADD)) { + uint8_t *ret = malloc(msg_header->size); + if (!ret) { + red_error("failed allocating"); + } + return ret; + } else { + return (tunnel_channel->control_rcv_buf); + } +} + +// called by the receive routine of the channel, before the buffer was assigned to a socket +static void tunnel_channel_release_msg_rcv_buf(RedChannel *channel, RedDataHeader *msg_header, + uint8_t *msg) +{ + TunnelChannel *tunnel_channel = (TunnelChannel *)channel; + if (msg_header->type == REDC_TUNNEL_SOCKET_DATA) { + ASSERT(!(CONTAINEROF(msg, RedSocketRawRcvBuf, buf)->base.usr_opaque)); + __tunnel_worker_free_socket_rcv_buf(tunnel_channel->worker, + CONTAINEROF(msg, RedSocketRawRcvBuf, buf)); + } +} + +static void __tunnel_channel_fill_service_migrate_item(TunnelChannel *channel, + TunnelService *service, + TunnelMigrateServiceItem *migrate_item) +{ + migrate_item->service = service; + TunnelMigrateService *general_data; + if (service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + general_data = &migrate_item->u.generic_service; + } else if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + general_data = &migrate_item->u.print_service.base; + memcpy(migrate_item->u.print_service.ip, ((TunnelPrintService *)service)->ip, 4); + } else { + red_error("unexpected service type"); + } + + general_data->type = service->type; + general_data->id = service->id; + general_data->group = service->group; + general_data->port = service->port; + memcpy(general_data->virt_ip, &service->virt_ip.s_addr, 4); +} + +static void __tunnel_channel_fill_socket_migrate_item(TunnelChannel *channel, RedSocket *sckt, + TunnelMigrateSocketItem *migrate_item) +{ + TunnelMigrateSocket *mig_sckt = &migrate_item->mig_socket; + migrate_item->socket = sckt; + mig_sckt->connection_id = sckt->connection_id; + mig_sckt->local_port = sckt->local_port; + mig_sckt->far_service_id = sckt->far_service->id; + mig_sckt->client_status = sckt->client_status; + mig_sckt->slirp_status = sckt->slirp_status; + + mig_sckt->pushed_close = sckt->pushed_close; + mig_sckt->client_waits_close_ack = sckt->client_waits_close_ack; + + mig_sckt->mig_client_status_msg = sckt->mig_client_status_msg; + mig_sckt->mig_open_ack_tokens = sckt->mig_open_ack_tokens; + + mig_sckt->out_data.num_tokens = sckt->out_data.num_tokens; + mig_sckt->out_data.window_size = sckt->out_data.window_size; + + // checking if there is a need to save the queues + if ((sckt->client_status != CLIENT_SCKT_STATUS_CLOSED) && + (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED) && + (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED_ACK)) { + mig_sckt->out_data.process_queue_size = + sckt->out_data.process_queue->get_migrate_data_proc(sckt->out_data.process_queue, + &migrate_item->out_process_queue); + } + + mig_sckt->in_data.num_tokens = sckt->in_data.num_tokens; + mig_sckt->in_data.client_total_num_tokens = sckt->in_data.client_total_num_tokens; + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) { + mig_sckt->in_data.process_queue_size = + sckt->in_data.process_queue->get_migrate_data_proc(sckt->in_data.process_queue, + &migrate_item->in_process_queue); + } + + if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) { + migrate_item->slirp_socket_size = net_slirp_tcp_socket_export(sckt->slirp_sckt, + &migrate_item->slirp_socket); + if (!migrate_item->slirp_socket) { + SET_TUNNEL_ERROR(channel, "failed export slirp socket"); + } + } else { + migrate_item->slirp_socket_size = 0; + migrate_item->slirp_socket = NULL; + } +} + +static void release_migrate_item(TunnelMigrateItem *item); +static int tunnel_channel_handle_migrate_mark(TunnelChannel *channel) +{ + TunnelMigrateItem *migrate_item = NULL; + TunnelService *service; + TunnelMigrateServiceItem *mig_service; + int num_sockets_saved = 0; + RedSocket *sckt; + + if (!channel->expect_migrate_mark) { + red_printf("unexpected"); + return FALSE; + } + channel->expect_migrate_mark = FALSE; + migrate_item = (TunnelMigrateItem *)malloc(sizeof(*migrate_item)); + + if (!migrate_item) { + red_error("failed alloc TunnelMigrateItem"); + } + + memset(migrate_item, 0, sizeof(*migrate_item)); + migrate_item->base.type = PIPE_ITEM_TYPE_MIGRATE_DATA; + + migrate_item->slirp_state_size = net_slirp_state_export(&migrate_item->slirp_state); + if (!migrate_item->slirp_state) { + red_printf("failed export slirp state"); + goto error; + } + + migrate_item->services_list_size = sizeof(TunnelMigrateServicesList) + + (sizeof(uint32_t)*channel->worker->num_services); + migrate_item->services_list = + (TunnelMigrateServicesList *)malloc(migrate_item->services_list_size); + if (!migrate_item->services_list) { + red_error("failed alloc services list"); + } + migrate_item->services_list->num_services = channel->worker->num_services; + + migrate_item->services = (TunnelMigrateServiceItem *)malloc( + channel->worker->num_services * sizeof(TunnelMigrateServiceItem)); + + if (!migrate_item->services) { + red_error("failed alloc services items"); + } + + for (mig_service = migrate_item->services, + service = (TunnelService *)ring_get_head(&channel->worker->services); + service; + mig_service++, + service = (TunnelService *)ring_next(&channel->worker->services, &service->ring_item)) { + __tunnel_channel_fill_service_migrate_item(channel, service, mig_service); + if (CHECK_TUNNEL_ERROR(channel)) { + goto error; + } + } + + migrate_item->sockets_list_size = sizeof(TunnelMigrateSocketList) + + (sizeof(uint32_t)*channel->worker->num_sockets); + migrate_item->sockets_list = (TunnelMigrateSocketList *)malloc( + migrate_item->sockets_list_size); + if (!migrate_item->sockets_list) { + red_error("failed alloc sockets list"); + } + + migrate_item->sockets_list->num_sockets = channel->worker->num_sockets; + + for (sckt = channel->worker->sockets; num_sockets_saved < channel->worker->num_sockets; + sckt++) { + if (sckt->allocated) { + __tunnel_channel_fill_socket_migrate_item(channel, sckt, + &migrate_item->sockets_data[ + num_sockets_saved++]); + if (CHECK_TUNNEL_ERROR(channel)) { + goto error; + } + } + } + + red_channel_pipe_add((RedChannel *)channel, &migrate_item->base); + + return TRUE; +error: + release_migrate_item(migrate_item); + return FALSE; +} + +static void release_migrate_item(TunnelMigrateItem *item) +{ + if (!item) { + return; + } + + int i; + if (item->sockets_list) { + int num_sockets = item->sockets_list->num_sockets; + for (i = 0; i < num_sockets; i++) { + if (item->sockets_data[i].socket) { // handling errors in the middle of + // __tunnel_channel_fill_socket_migrate_item + if (item->sockets_data[i].out_process_queue) { + item->sockets_data[i].socket->out_data.process_queue->release_migrate_data_proc( + item->sockets_data[i].socket->out_data.process_queue, + item->sockets_data[i].out_process_queue); + } + if (item->sockets_data[i].in_process_queue) { + item->sockets_data[i].socket->in_data.process_queue->release_migrate_data_proc( + item->sockets_data[i].socket->in_data.process_queue, + item->sockets_data[i].in_process_queue); + } + } + + free(item->sockets_data[i].slirp_socket); + } + free(item->sockets_list); + } + + free(item->services); + free(item->services_list); + free(item->slirp_state); + free(item); +} + +typedef RawTunneledBuffer *(*socket_alloc_buffer_proc_t)(RedSocket *sckt); + +typedef struct RedSocketRestoreTokensBuf { + RedSocketRawRcvBuf base; + int num_tokens; +} RedSocketRestoreTokensBuf; + +// not updating tokens +static void restored_rcv_buf_release(RawTunneledBuffer *buf) +{ + RedSocket *sckt = (RedSocket *)buf->usr_opaque; + --sckt->in_data.num_buffers; + __tunnel_worker_free_socket_rcv_buf(sckt->worker, (RedSocketRawRcvBuf *)buf); + // for case that ready queue is empty and the client has no tokens + __process_rcv_buf_tokens(sckt->worker->channel, sckt); +} + +RawTunneledBuffer *tunnel_socket_alloc_restored_rcv_buf(RedSocket *sckt) +{ + RedSocketRawRcvBuf *buf = __tunnel_worker_alloc_socket_rcv_buf(sckt->worker); + buf->base.usr_opaque = sckt; + buf->base.release_proc = restored_rcv_buf_release; + + sckt->in_data.num_buffers++; + return &buf->base; +} + +static void restore_tokens_buf_release(RawTunneledBuffer *buf) +{ + RedSocketRestoreTokensBuf *tokens_buf = (RedSocketRestoreTokensBuf *)buf; + RedSocket *sckt = (RedSocket *)buf->usr_opaque; + + sckt->in_data.num_tokens += tokens_buf->num_tokens; + __process_rcv_buf_tokens(sckt->worker->channel, sckt); + + free(tokens_buf); +} + +RawTunneledBuffer *__tunnel_socket_alloc_restore_tokens_buf(RedSocket *sckt, int num_tokens) +{ + RedSocketRestoreTokensBuf *buf = (RedSocketRestoreTokensBuf *)malloc(sizeof(*buf)); + if (!buf) { + red_error("failed alloc"); + } + memset(buf, 0, sizeof(*buf)); + + buf->base.base.usr_opaque = sckt; + buf->base.base.refs = 1; + buf->base.base.release_proc = restore_tokens_buf_release; + buf->num_tokens = num_tokens; +#ifdef DEBUG_NETWORK + red_printf("TUNNEL DBG: num_tokens=%d", num_tokens); +#endif + return &buf->base.base; +} + +static void __restore_ready_chunks_queue(RedSocket *sckt, ReadyTunneledChunkQueue *queue, + uint8_t *data, int size, + socket_alloc_buffer_proc_t alloc_buf) +{ + int copied = 0; + + while (copied < size) { + RawTunneledBuffer *buf = alloc_buf(sckt); + int copy_count = MIN(size - copied, buf->max_size); + memcpy(buf->data, data + copied, copy_count); + copied += copy_count; + buf->size = copy_count; + ready_queue_add_orig_chunk(queue, buf, buf->data, buf->size); + tunneled_buffer_unref(buf); + } +} + +// not using the alloc_buf cb of the queue, since we may want to create the migrated buffers +// with other properties (e.g., not releasing tokent) +static void __restore_process_queue(RedSocket *sckt, TunneledBufferProcessQueue *queue, + uint8_t *data, int size, + socket_alloc_buffer_proc_t alloc_buf) +{ + int copied = 0; + + while (copied < size) { + RawTunneledBuffer *buf = alloc_buf(sckt); + int copy_count = MIN(size - copied, buf->max_size); + memcpy(buf->data, data + copied, copy_count); + copied += copy_count; + buf->size = copy_count; + __process_queue_push(queue, buf); + } +} + +static void tunnel_channel_restore_migrated_service(TunnelChannel *channel, + TunnelMigrateService *mig_service, + uint8_t *data_buf) +{ + int service_size; + TunnelService *service; + struct in_addr virt_ip; + if (mig_service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + service_size = sizeof(TunnelService); + } else if (mig_service->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + service_size = sizeof(TunnelPrintService); + } else { + SET_TUNNEL_ERROR(channel, "unexpected service type"); + return; + } + + memcpy(&virt_ip.s_addr, mig_service->virt_ip, 4); + service = __tunnel_worker_add_service(channel->worker, service_size, + mig_service->type, mig_service->id, + mig_service->group, mig_service->port, + (char *)(data_buf + mig_service->name), + (char *)(data_buf + mig_service->description), &virt_ip); + if (!service) { + SET_TUNNEL_ERROR(channel, "failed creating service"); + return; + } + + if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + TunnelMigratePrintService *mig_print_service = (TunnelMigratePrintService *)mig_service; + TunnelPrintService *print_service = (TunnelPrintService *)service; + + memcpy(print_service->ip, mig_print_service->ip, 4); + } +} + +static void tunnel_channel_restore_migrated_socket(TunnelChannel *channel, + TunnelMigrateSocket *mig_socket, + uint8_t *data_buf) +{ + RedSocket *sckt; + SlirpSocket *slirp_sckt; + RawTunneledBuffer *tokens_buf; + TunnelService *service; + sckt = channel->worker->sockets + mig_socket->connection_id; + sckt->connection_id = mig_socket->connection_id; + ASSERT(!sckt->allocated); + + /* Services must be restored before sockets */ + service = tunnel_worker_find_service_by_id(channel->worker, mig_socket->far_service_id); + if (!service) { + SET_TUNNEL_ERROR(channel, "service not found"); + return; + } + + tunnel_worker_alloc_socket(channel->worker, sckt, mig_socket->local_port, service, NULL); + + sckt->client_status = mig_socket->client_status; + sckt->slirp_status = mig_socket->slirp_status; + + sckt->mig_client_status_msg = mig_socket->mig_client_status_msg; + sckt->mig_open_ack_tokens = mig_socket->mig_open_ack_tokens; + + sckt->pushed_close = mig_socket->pushed_close; + sckt->client_waits_close_ack = mig_socket->client_waits_close_ack; + + if (sckt->slirp_status != SLIRP_SCKT_STATUS_CLOSED) { + slirp_sckt = net_slirp_tcp_socket_restore(data_buf + mig_socket->slirp_sckt, sckt); + if (!slirp_sckt) { + SET_TUNNEL_ERROR(channel, "failed restoring slirp socket"); + return; + } + sckt->slirp_sckt = slirp_sckt; + } + // out data + sckt->out_data.num_tokens = mig_socket->out_data.num_tokens; + sckt->out_data.window_size = mig_socket->out_data.window_size; + sckt->out_data.data_size = mig_socket->out_data.process_buf_size + + mig_socket->out_data.ready_buf_size; + + __restore_ready_chunks_queue(sckt, &sckt->out_data.ready_chunks_queue, + data_buf + mig_socket->out_data.ready_buf, + mig_socket->out_data.ready_buf_size, + tunnel_socket_alloc_snd_buf); + + sckt->out_data.process_queue->restore_proc(sckt->out_data.process_queue, + data_buf + mig_socket->out_data.process_queue); + + __restore_process_queue(sckt, sckt->out_data.process_queue, + data_buf + mig_socket->out_data.process_buf, + mig_socket->out_data.process_buf_size, + tunnel_socket_alloc_snd_buf); + + sckt->in_data.client_total_num_tokens = mig_socket->in_data.client_total_num_tokens; + sckt->in_data.num_tokens = mig_socket->in_data.num_tokens; + + __restore_ready_chunks_queue(sckt, &sckt->in_data.ready_chunks_queue, + data_buf + mig_socket->in_data.ready_buf, + mig_socket->in_data.ready_buf_size, + tunnel_socket_alloc_restored_rcv_buf); + + sckt->in_data.process_queue->restore_proc(sckt->in_data.process_queue, + data_buf + mig_socket->in_data.process_queue); + + __restore_process_queue(sckt, sckt->in_data.process_queue, + data_buf + mig_socket->in_data.process_buf, + mig_socket->in_data.process_buf_size, + tunnel_socket_alloc_restored_rcv_buf); + + tokens_buf = __tunnel_socket_alloc_restore_tokens_buf(sckt, + SOCKET_WINDOW_SIZE - + (sckt->in_data.client_total_num_tokens + + sckt->in_data.num_tokens)); + if (sckt->in_data.process_queue->head) { + __process_queue_push(sckt->in_data.process_queue, tokens_buf); + } else { + ready_queue_add_orig_chunk(&sckt->in_data.ready_chunks_queue, tokens_buf, + tokens_buf->data, tokens_buf->size); + tunneled_buffer_unref(tokens_buf); + } +} + +static void tunnel_channel_restore_socket_state(TunnelChannel *channel, RedSocket *sckt) +{ + int ret = TRUE; + red_printf(""); + // handling client status msgs that were received during migration + switch (sckt->mig_client_status_msg) { + case 0: + break; + case REDC_TUNNEL_SOCKET_OPEN_ACK: + ret = tunnel_channel_handle_socket_connect_ack(channel, sckt, + sckt->mig_open_ack_tokens); + break; + case REDC_TUNNEL_SOCKET_OPEN_NACK: + ret = tunnel_channel_handle_socket_connect_nack(channel, sckt); + break; + case REDC_TUNNEL_SOCKET_FIN: + if (sckt->client_status == CLIENT_SCKT_STATUS_WAIT_OPEN) { + ret = tunnel_channel_handle_socket_connect_ack(channel, sckt, + sckt->mig_open_ack_tokens); + } + if (ret) { + ret = tunnel_channel_handle_socket_fin(channel, sckt); + } + break; + case REDC_TUNNEL_SOCKET_CLOSED: + // can't just send nack since we need to send close ack to client + if (sckt->client_status == CLIENT_SCKT_STATUS_WAIT_OPEN) { + ret = tunnel_channel_handle_socket_connect_ack(channel, sckt, + sckt->mig_open_ack_tokens); + } + ret = ret & tunnel_channel_handle_socket_closed(channel, sckt); + + break; + case REDC_TUNNEL_SOCKET_CLOSED_ACK: + ret = tunnel_channel_handle_socket_closed_ack(channel, sckt); + break; + default: + SET_TUNNEL_ERROR(channel, "invalid message type %u", sckt->mig_client_status_msg); + return; + } + + if (!ret) { + SET_TUNNEL_ERROR(channel, "failed restoring socket state"); + return; + } + sckt->mig_client_status_msg = 0; + sckt->mig_open_ack_tokens = 0; + + // handling data transfer + if (__client_socket_can_receive(sckt) && sckt->out_data.ready_chunks_queue.head) { + if (!red_channel_pipe_item_is_linked( + &channel->base, &sckt->out_data.data_pipe_item)) { + sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA; + red_channel_pipe_add(&channel->base, &sckt->out_data.data_pipe_item); + } + } + + if (((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) && + sckt->in_data.ready_chunks_queue.head) { + net_slirp_socket_can_receive_notify(sckt->slirp_sckt); + } + + if (CHECK_TUNNEL_ERROR(channel)) { + return; + } + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) { + net_slirp_socket_can_send_notify(sckt->slirp_sckt); + } + + if (CHECK_TUNNEL_ERROR(channel)) { + return; + } + // for cases where the client has no tokens left, but all the data is in the process queue. + __process_rcv_buf_tokens(channel, sckt); +} + +static inline void tunnel_channel_activate_migrated_sockets(TunnelChannel *channel) +{ + // if we are overgoing migration again, no need to restore the state, we will wait + // for the next host. + if (!channel->mig_inprogress) { + int num_activated = 0; + RedSocket *sckt = channel->worker->sockets; + + for (; num_activated < channel->worker->num_sockets; sckt++) { + if (sckt->allocated) { + tunnel_channel_restore_socket_state(channel, sckt); + + if (CHECK_TUNNEL_ERROR(channel)) { + return; + } + + num_activated++; + } + } + net_slirp_unfreeze(); + } +} + +static int tunnel_channel_handle_migrate_data(TunnelChannel *channel, + TunnelMigrateData *migrate_data) +{ + TunnelMigrateSocketList *sockets_list; + TunnelMigrateServicesList *services_list; + int i; + + if (!channel->expect_migrate_data) { + red_printf("unexpected"); + goto error; + } + channel->expect_migrate_data = FALSE; + + if (migrate_data->magic != TUNNEL_MIGRATE_DATA_MAGIC || + migrate_data->version != TUNNEL_MIGRATE_DATA_VERSION) { + red_printf("invalid content"); + goto error; + } + + ASSERT(channel->base.send_data.header.serial == 0); + channel->base.send_data.header.serial = migrate_data->message_serial; + + net_slirp_state_restore(migrate_data->data + migrate_data->slirp_state); + + services_list = (TunnelMigrateServicesList *)(migrate_data->data + + migrate_data->services_list); + for (i = 0; i < services_list->num_services; i++) { + tunnel_channel_restore_migrated_service(channel, + (TunnelMigrateService *)(migrate_data->data + + services_list->services[i]), + migrate_data->data); + if (CHECK_TUNNEL_ERROR(channel)) { + red_printf("failed restoring service"); + goto error; + } + } + + sockets_list = (TunnelMigrateSocketList *)(migrate_data->data + migrate_data->sockets_list); + + for (i = 0; i < sockets_list->num_sockets; i++) { + tunnel_channel_restore_migrated_socket(channel, + (TunnelMigrateSocket *)(migrate_data->data + + sockets_list->sockets[i]), + migrate_data->data); + if (CHECK_TUNNEL_ERROR(channel)) { + red_printf("failed restoring socket"); + goto error; + } + } + + // activate channel + channel->base.migrate = FALSE; + red_channel_init_outgoing_messages_window(&channel->base); + + tunnel_channel_activate_migrated_sockets(channel); + + if (CHECK_TUNNEL_ERROR(channel)) { + goto error; + } + free(migrate_data); + return TRUE; +error: + free(migrate_data); + return FALSE; +} + +// msg was allocated by tunnel_channel_alloc_msg_rcv_buf +static int tunnel_channel_handle_message(RedChannel *channel, RedDataHeader *header, uint8_t *msg) +{ + TunnelChannel *tunnel_channel = (TunnelChannel *)channel; + RedSocket *sckt = NULL; + // retrieve the sckt + switch (header->type) { + case REDC_MIGRATE_FLUSH_MARK: + case REDC_MIGRATE_DATA: + case REDC_TUNNEL_SERVICE_ADD: + case REDC_TUNNEL_SERVICE_REMOVE: + break; + case REDC_TUNNEL_SOCKET_OPEN_ACK: + case REDC_TUNNEL_SOCKET_OPEN_NACK: + case REDC_TUNNEL_SOCKET_DATA: + case REDC_TUNNEL_SOCKET_FIN: + case REDC_TUNNEL_SOCKET_CLOSED: + case REDC_TUNNEL_SOCKET_CLOSED_ACK: + case REDC_TUNNEL_SOCKET_TOKEN: + // the first field in these messages is connection id + sckt = tunnel_channel->worker->sockets + (*((uint16_t *)msg)); + if (!sckt->allocated) { + red_printf("red socket not found"); + return FALSE; + } + break; + default: + return red_channel_handle_message(channel, header, msg); + } + + switch (header->type) { + case REDC_TUNNEL_SERVICE_ADD: + if (header->size < sizeof(RedcTunnelAddGenericService)) { + red_printf("bad message size"); + free(msg); + return FALSE; + } + return tunnel_channel_handle_service_add(tunnel_channel, + (RedcTunnelAddGenericService *)msg); + case REDC_TUNNEL_SERVICE_REMOVE: + red_printf("REDC_TUNNEL_REMOVE_SERVICE not supported yet"); + return FALSE; + case REDC_TUNNEL_SOCKET_OPEN_ACK: + if (header->size != sizeof(RedcTunnelSocketOpenAck)) { + red_printf("bad message size"); + return FALSE; + } + + return tunnel_channel_handle_socket_connect_ack(tunnel_channel, sckt, + ((RedcTunnelSocketOpenAck *)msg)->tokens); + + case REDC_TUNNEL_SOCKET_OPEN_NACK: + if (header->size != sizeof(RedcTunnelSocketOpenNack)) { + red_printf("bad message size"); + return FALSE; + } + + return tunnel_channel_handle_socket_connect_nack(tunnel_channel, sckt); + case REDC_TUNNEL_SOCKET_DATA: + { + if (header->size < sizeof(RedcTunnelSocketData)) { + red_printf("bad message size"); + return FALSE; + } + + return tunnel_channel_handle_socket_receive_data(tunnel_channel, sckt, + CONTAINEROF(msg, RedSocketRawRcvBuf, buf), + header->size - sizeof(RedcTunnelSocketData)); + } + case REDC_TUNNEL_SOCKET_FIN: + if (header->size != sizeof(RedcTunnelSocketFin)) { + red_printf("bad message size"); + return FALSE; + } + return tunnel_channel_handle_socket_fin(tunnel_channel, sckt); + case REDC_TUNNEL_SOCKET_CLOSED: + if (header->size != sizeof(RedcTunnelSocketClosed)) { + red_printf("bad message size"); + return FALSE; + } + return tunnel_channel_handle_socket_closed(tunnel_channel, sckt); + case REDC_TUNNEL_SOCKET_CLOSED_ACK: + if (header->size != sizeof(RedcTunnelSocketClosedAck)) { + red_printf("bad message size"); + return FALSE; + } + return tunnel_channel_handle_socket_closed_ack(tunnel_channel, sckt); + case REDC_TUNNEL_SOCKET_TOKEN: + if (header->size != sizeof(RedcTunnelSocketTokens)) { + red_printf("bad message size"); + return FALSE; + } + + return tunnel_channel_handle_socket_token(tunnel_channel, sckt, + (RedcTunnelSocketTokens *)msg); + case REDC_MIGRATE_FLUSH_MARK: + return tunnel_channel_handle_migrate_mark(tunnel_channel); + case REDC_MIGRATE_DATA: + if (header->size < sizeof(TunnelMigrateData)) { + red_printf("bad message size"); + free(msg); + return FALSE; + } + return tunnel_channel_handle_migrate_data(tunnel_channel, (TunnelMigrateData *)msg); + default: + return red_channel_handle_message(channel, header, msg); + } + return TRUE; +} + +/********************************/ +/* outgoing msgs +********************************/ + +static void tunnel_channel_send_set_ack(TunnelChannel *channel, PipeItem *item) +{ + ASSERT(channel); + + channel->base.send_data.u.ack.generation = ++channel->base.ack_data.generation; + channel->base.send_data.u.ack.window = CLIENT_ACK_WINDOW; + + red_channel_init_send_data(&channel->base, RED_SET_ACK, item); + red_channel_add_buf(&channel->base, &channel->base.send_data.u.ack, sizeof(RedSetAck)); + + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_channel_send_migrate(TunnelChannel *channel, PipeItem *item) +{ + ASSERT(channel); + channel->base.send_data.u.migrate.flags = RED_MIGRATE_NEED_FLUSH | + RED_MIGRATE_NEED_DATA_TRANSFER; + channel->expect_migrate_mark = TRUE; + red_channel_init_send_data(&channel->base, RED_MIGRATE, item); + red_channel_add_buf(&channel->base, &channel->base.send_data.u.migrate, sizeof(RedMigrate)); + red_channel_begin_send_massage(&channel->base); +} + +static int __tunnel_channel_send_process_bufs_migrate_data(TunnelChannel *channel, + TunneledBufferProcessQueue *queue) +{ + int buf_offset = queue->head_offset; + RawTunneledBuffer *buf = queue->head; + int size = 0; + + while (buf) { + red_channel_add_buf(&channel->base, buf->data + buf_offset, buf->size - buf_offset); + size += buf->size - buf_offset; + buf_offset = 0; + buf = buf->next; + } + + return size; +} + +static int __tunnel_channel_send_ready_bufs_migrate_data(TunnelChannel *channel, + ReadyTunneledChunkQueue *queue) +{ + int offset = queue->offset; + ReadyTunneledChunk *chunk = queue->head; + int size = 0; + + while (chunk) { + red_channel_add_buf(&channel->base, chunk->data + offset, chunk->size - offset); + size += chunk->size - offset; + offset = 0; + chunk = chunk->next; + } + return size; +} + +// returns the size to send +static int __tunnel_channel_send_service_migrate_data(TunnelChannel *channel, + TunnelMigrateServiceItem *item, + int offset) +{ + TunnelService *service = item->service; + int cur_offset = offset; + TunnelMigrateService *generic_data; + + if (service->type == RED_TUNNEL_SERVICE_TYPE_GENERIC) { + generic_data = &item->u.generic_service; + red_channel_add_buf(&channel->base, &item->u.generic_service, + sizeof(item->u.generic_service)); + cur_offset += sizeof(item->u.generic_service); + } else if (service->type == RED_TUNNEL_SERVICE_TYPE_IPP) { + generic_data = &item->u.print_service.base; + red_channel_add_buf(&channel->base, &item->u.print_service, + sizeof(item->u.print_service)); + cur_offset += sizeof(item->u.print_service); + } else { + red_error("unexpected service type"); + } + + generic_data->name = cur_offset; + red_channel_add_buf(&channel->base, service->name, strlen(service->name) + 1); + cur_offset += strlen(service->name) + 1; + + generic_data->description = cur_offset; + red_channel_add_buf(&channel->base, service->description, strlen(service->description) + 1); + cur_offset += strlen(service->description) + 1; + + return (cur_offset - offset); +} + +// returns the size to send +static int __tunnel_channel_send_socket_migrate_data(TunnelChannel *channel, + TunnelMigrateSocketItem *item, int offset) +{ + RedSocket *sckt = item->socket; + TunnelMigrateSocket *mig_sckt = &item->mig_socket; + int cur_offset = offset; + red_channel_add_buf(&channel->base, mig_sckt, sizeof(*mig_sckt)); + cur_offset += sizeof(*mig_sckt); + + if ((sckt->client_status != CLIENT_SCKT_STATUS_CLOSED) && + (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED) && + (sckt->mig_client_status_msg != REDC_TUNNEL_SOCKET_CLOSED_ACK)) { + mig_sckt->out_data.process_buf = cur_offset; + mig_sckt->out_data.process_buf_size = + __tunnel_channel_send_process_bufs_migrate_data(channel, + sckt->out_data.process_queue); + cur_offset += mig_sckt->out_data.process_buf_size; + if (mig_sckt->out_data.process_queue_size) { + mig_sckt->out_data.process_queue = cur_offset; + red_channel_add_buf(&channel->base, item->out_process_queue, + mig_sckt->out_data.process_queue_size); + cur_offset += mig_sckt->out_data.process_queue_size; + } + mig_sckt->out_data.ready_buf = cur_offset; + mig_sckt->out_data.ready_buf_size = + __tunnel_channel_send_ready_bufs_migrate_data(channel, + &sckt->out_data.ready_chunks_queue); + cur_offset += mig_sckt->out_data.ready_buf_size; + } else { + mig_sckt->out_data.process_buf_size = 0; + mig_sckt->out_data.ready_buf_size = 0; + } + + // notice that we migrate the received buffers without the msg headers. + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND)) { + mig_sckt->in_data.process_buf = cur_offset; + mig_sckt->in_data.process_buf_size = + __tunnel_channel_send_process_bufs_migrate_data(channel, + sckt->in_data.process_queue); + cur_offset += mig_sckt->in_data.process_buf_size; + if (mig_sckt->in_data.process_queue_size) { + mig_sckt->in_data.process_queue = cur_offset; + red_channel_add_buf(&channel->base, item->in_process_queue, + mig_sckt->in_data.process_queue_size); + cur_offset += mig_sckt->in_data.process_queue_size; + } + mig_sckt->in_data.ready_buf = cur_offset; + mig_sckt->in_data.ready_buf_size = + __tunnel_channel_send_ready_bufs_migrate_data(channel, + &sckt->in_data.ready_chunks_queue); + cur_offset += mig_sckt->in_data.ready_buf_size; + } else { + mig_sckt->in_data.process_buf_size = 0; + mig_sckt->in_data.ready_buf_size = 0; + } + + if (item->slirp_socket_size) { // zero if socket is closed + red_channel_add_buf(&channel->base, item->slirp_socket, item->slirp_socket_size); + mig_sckt->slirp_sckt = cur_offset; + cur_offset += item->slirp_socket_size; + } + return (cur_offset - offset); +} + +static void tunnel_channel_send_migrate_data(TunnelChannel *channel, PipeItem *item) +{ + TunnelMigrateData *migrate_data = &channel->send_data.u.migrate_data; + TunnelMigrateItem *migrate_item = (TunnelMigrateItem *)item; + int i; + + uint32_t data_buf_offset = 0; // current location in data[0] field + ASSERT(channel); + + migrate_data->magic = TUNNEL_MIGRATE_DATA_MAGIC; + migrate_data->version = TUNNEL_MIGRATE_DATA_VERSION; + migrate_data->message_serial = red_channel_get_message_serial(&channel->base); + red_channel_init_send_data(&channel->base, RED_MIGRATE_DATA, item); + red_channel_add_buf(&channel->base, migrate_data, sizeof(*migrate_data)); + + migrate_data->slirp_state = data_buf_offset; + red_channel_add_buf(&channel->base, migrate_item->slirp_state, migrate_item->slirp_state_size); + data_buf_offset += migrate_item->slirp_state_size; + + migrate_data->services_list = data_buf_offset; + red_channel_add_buf(&channel->base, migrate_item->services_list, + migrate_item->services_list_size); + data_buf_offset += migrate_item->services_list_size; + + for (i = 0; i < migrate_item->services_list->num_services; i++) { + migrate_item->services_list->services[i] = data_buf_offset; + data_buf_offset += __tunnel_channel_send_service_migrate_data(channel, + migrate_item->services + i, + data_buf_offset); + } + + + migrate_data->sockets_list = data_buf_offset; + red_channel_add_buf(&channel->base, migrate_item->sockets_list, + migrate_item->sockets_list_size); + data_buf_offset += migrate_item->sockets_list_size; + + for (i = 0; i < migrate_item->sockets_list->num_sockets; i++) { + migrate_item->sockets_list->sockets[i] = data_buf_offset; + data_buf_offset += __tunnel_channel_send_socket_migrate_data(channel, + migrate_item->sockets_data + i, + data_buf_offset); + } + + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_channel_send_init(TunnelChannel *channel, PipeItem *item) +{ + ASSERT(channel); + + channel->send_data.u.init.max_socket_data_size = MAX_SOCKET_DATA_SIZE; + channel->send_data.u.init.max_num_of_sockets = MAX_SOCKETS_NUM; + + red_channel_init_send_data(&channel->base, RED_TUNNEL_INIT, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.init, sizeof(RedTunnelInit)); + + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_channel_send_service_ip_map(TunnelChannel *channel, PipeItem *item) +{ + TunnelService *service = CONTAINEROF(item, TunnelService, pipe_item); + + channel->send_data.u.service_ip.service_id = service->id; + channel->send_data.u.service_ip.virtual_ip.type = RED_TUNNEL_IP_TYPE_IPv4; + + red_channel_init_send_data(&channel->base, RED_TUNNEL_SERVICE_IP_MAP, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.service_ip, + sizeof(RedTunnelServiceIpMap)); + red_channel_add_buf(&channel->base, &service->virt_ip.s_addr, sizeof(RedTunnelIPv4)); + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_channel_send_socket_open(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + channel->send_data.u.socket_open.connection_id = sckt->connection_id; + channel->send_data.u.socket_open.service_id = sckt->far_service->id; + channel->send_data.u.socket_open.tokens = SOCKET_WINDOW_SIZE; + + sckt->in_data.client_total_num_tokens = SOCKET_WINDOW_SIZE; + sckt->in_data.num_tokens = 0; + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_OPEN, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_open, + sizeof(channel->send_data.u.socket_open)); + + red_channel_begin_send_massage(&channel->base); +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif +} + +static void tunnel_channel_send_socket_fin(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + ASSERT(!sckt->out_data.ready_chunks_queue.head); + + if (sckt->out_data.process_queue->head) { + red_printf("socket sent FIN but there are still buffers in outgoing process queue" + "(local_port=%d, service_id=%d)", + ntohs(sckt->local_port), sckt->far_service->id); + } + + channel->send_data.u.socket_fin.connection_id = sckt->connection_id; + + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_FIN, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_fin, + sizeof(channel->send_data.u.socket_fin)); + + red_channel_begin_send_massage(&channel->base); + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif +} + +static void tunnel_channel_send_socket_close(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + // can happen when it is a forced close + if (sckt->out_data.ready_chunks_queue.head) { + red_printf("socket closed but there are still buffers in outgoing ready queue" + "(local_port=%d, service_id=%d)", + ntohs(sckt->local_port), + sckt->far_service->id); + } + + if (sckt->out_data.process_queue->head) { + red_printf("socket closed but there are still buffers in outgoing process queue" + "(local_port=%d, service_id=%d)", + ntohs(sckt->local_port), sckt->far_service->id); + } + + channel->send_data.u.socket_close.connection_id = sckt->connection_id; + + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_CLOSE, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_close, + sizeof(channel->send_data.u.socket_close)); + + red_channel_begin_send_massage(&channel->base); + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif +} + +static void tunnel_channel_send_socket_closed_ack(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, status_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + channel->send_data.u.socket_close_ack.connection_id = sckt->connection_id; + + // pipe item is null because we free the sckt. + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_CLOSED_ACK, NULL); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_close_ack, + sizeof(channel->send_data.u.socket_close_ack)); + + red_channel_begin_send_massage(&channel->base); + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + + ASSERT(sckt->client_waits_close_ack && (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED)); + tunnel_worker_free_socket(channel->worker, sckt); + if (CHECK_TUNNEL_ERROR(channel)) { + tunnel_shutdown(channel->worker); + } +} + +static void tunnel_channel_send_socket_token(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, token_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + /* notice that the num of tokens sent can be > SOCKET_TOKENS_TO_SEND, since + the sending is performed after the pipe item was pushed */ + + channel->send_data.u.socket_token.connection_id = sckt->connection_id; + + if (sckt->in_data.num_tokens > 0) { + channel->send_data.u.socket_token.num_tokens = sckt->in_data.num_tokens; + } else { + ASSERT(!sckt->in_data.client_total_num_tokens && !sckt->in_data.ready_chunks_queue.head); + channel->send_data.u.socket_token.num_tokens = SOCKET_TOKENS_TO_SEND_FOR_PROCESS; + } + sckt->in_data.num_tokens -= channel->send_data.u.socket_token.num_tokens; + sckt->in_data.client_total_num_tokens += channel->send_data.u.socket_token.num_tokens; + ASSERT(sckt->in_data.client_total_num_tokens <= SOCKET_WINDOW_SIZE); + + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_TOKEN, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_token, + sizeof(channel->send_data.u.socket_token)); + + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_channel_send_socket_out_data(TunnelChannel *channel, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, data_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + ReadyTunneledChunk *chunk; + uint32_t total_push_size = 0; + uint32_t pushed_bufs_num = 0; + + ASSERT(!sckt->pushed_close); + if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) { + return; + } + + if (!sckt->out_data.num_tokens) { + return; // only when an we will recieve tokens, data will be sent again. + } + + ASSERT(sckt->out_data.ready_chunks_queue.head); + ASSERT(!sckt->out_data.push_tail); + ASSERT(sckt->out_data.ready_chunks_queue.head->size <= MAX_SOCKET_DATA_SIZE); + + channel->send_data.u.socket_data.connection_id = sckt->connection_id; + + red_channel_init_send_data(&channel->base, RED_TUNNEL_SOCKET_DATA, item); + red_channel_add_buf(&channel->base, &channel->send_data.u.socket_data, + sizeof(channel->send_data.u.socket_data)); + pushed_bufs_num++; + + // the first chunk is in a valid size + chunk = sckt->out_data.ready_chunks_queue.head; + total_push_size = chunk->size - sckt->out_data.ready_chunks_queue.offset; + red_channel_add_buf(&channel->base, chunk->data + sckt->out_data.ready_chunks_queue.offset, + total_push_size); + pushed_bufs_num++; + sckt->out_data.push_tail = chunk; + sckt->out_data.push_tail_size = chunk->size; // all the chunk was sent + + chunk = chunk->next; + + while (chunk && (total_push_size < MAX_SOCKET_DATA_SIZE) && (pushed_bufs_num < MAX_SEND_BUFS)) { + uint32_t cur_push_size = MIN(chunk->size, MAX_SOCKET_DATA_SIZE - total_push_size); + red_channel_add_buf(&channel->base, chunk->data, cur_push_size); + pushed_bufs_num++; + + sckt->out_data.push_tail = chunk; + sckt->out_data.push_tail_size = cur_push_size; + total_push_size += cur_push_size; + + chunk = chunk->next; + } + + sckt->out_data.num_tokens--; + + red_channel_begin_send_massage(&channel->base); +} + +static void tunnel_worker_release_socket_out_data(TunnelWorker *worker, PipeItem *item) +{ + RedSocketOutData *sckt_out_data = CONTAINEROF(item, RedSocketOutData, data_pipe_item); + RedSocket *sckt = CONTAINEROF(sckt_out_data, RedSocket, out_data); + + ASSERT(sckt_out_data->ready_chunks_queue.head); + + while (sckt_out_data->ready_chunks_queue.head != sckt_out_data->push_tail) { + sckt_out_data->data_size -= sckt_out_data->ready_chunks_queue.head->size; + ready_queue_pop_chunk(&sckt_out_data->ready_chunks_queue); + } + + sckt_out_data->data_size -= sckt_out_data->push_tail_size; + + // compansation. was substructed in the previous lines + sckt_out_data->data_size += sckt_out_data->ready_chunks_queue.offset; + + if (sckt_out_data->push_tail_size == sckt_out_data->push_tail->size) { + ready_queue_pop_chunk(&sckt_out_data->ready_chunks_queue); + sckt_out_data->ready_chunks_queue.offset = 0; + } else { + sckt_out_data->ready_chunks_queue.offset = sckt_out_data->push_tail_size; + } + + sckt_out_data->push_tail = NULL; + sckt_out_data->push_tail_size = 0; + + if (worker->channel) { + // can still send data to socket + if (__client_socket_can_receive(sckt)) { + if (sckt_out_data->ready_chunks_queue.head) { + // the pipe item may alreay be linked, if for example the send was + // blocked and before it finshed and called release, tunnel_socket_send was called + if (!red_channel_pipe_item_is_linked( + &worker->channel->base, &sckt_out_data->data_pipe_item)) { + sckt_out_data->data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA; + red_channel_pipe_add(&worker->channel->base, &sckt_out_data->data_pipe_item); + } + } else if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) { + __tunnel_socket_add_fin_to_pipe(worker->channel, sckt); + } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { + __tunnel_socket_add_close_to_pipe(worker->channel, sckt); + } + } + } + + + if (((sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV)) && + !sckt->in_slirp_send && !worker->channel->mig_inprogress) { + // for cases that slirp couldn't write whole it data to our socket buffer + net_slirp_socket_can_send_notify(sckt->slirp_sckt); + } +} + +static void tunnel_channel_send_item(RedChannel *channel, PipeItem *item) +{ + TunnelChannel *tunnel_channel = (TunnelChannel *)channel; + + red_channel_reset_send_data(channel); + switch (item->type) { + case PIPE_ITEM_TYPE_SET_ACK: + tunnel_channel_send_set_ack(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_TUNNEL_INIT: + tunnel_channel_send_init(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SERVICE_IP_MAP: + tunnel_channel_send_service_ip_map(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_OPEN: + tunnel_channel_send_socket_open(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_DATA: + tunnel_channel_send_socket_out_data(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_FIN: + tunnel_channel_send_socket_fin(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_CLOSE: + tunnel_channel_send_socket_close(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_CLOSED_ACK: + tunnel_channel_send_socket_closed_ack(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_SOCKET_TOKEN: + tunnel_channel_send_socket_token(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_MIGRATE: + tunnel_channel_send_migrate(tunnel_channel, item); + break; + case PIPE_ITEM_TYPE_MIGRATE_DATA: + tunnel_channel_send_migrate_data(tunnel_channel, item); + break; + default: + red_error("invalid pipe item type"); + } +} + +/* param item_pushed: distinguishes between a pipe item that was pushed for sending, and + a pipe item that is still in the pipe and is released due to disconnection. + see red_pipe_item_clear */ +static void tunnel_channel_release_pipe_item(RedChannel *channel, PipeItem *item, int item_pushed) +{ + if (!item) { // e.g. when acking closed socket + return; + } + switch (item->type) { + case PIPE_ITEM_TYPE_SET_ACK: + case PIPE_ITEM_TYPE_TUNNEL_INIT: + free(item); + break; + case PIPE_ITEM_TYPE_SERVICE_IP_MAP: + case PIPE_ITEM_TYPE_SOCKET_OPEN: + case PIPE_ITEM_TYPE_SOCKET_CLOSE: + case PIPE_ITEM_TYPE_SOCKET_FIN: + case PIPE_ITEM_TYPE_SOCKET_TOKEN: + break; + case PIPE_ITEM_TYPE_SOCKET_DATA: + if (item_pushed) { + tunnel_worker_release_socket_out_data(((TunnelChannel *)channel)->worker, item); + } + break; + case PIPE_ITEM_TYPE_MIGRATE: + free(item); + break; + case PIPE_ITEM_TYPE_MIGRATE_DATA: + release_migrate_item((TunnelMigrateItem *)item); + break; + default: + red_error("invalid pipe item type"); + } +} + +/*********************************************************** +* interface for slirp +************************************************************/ + +static int qemu_can_output(SlirpUsrNetworkInterface *usr_interface) +{ + TunnelWorker *worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + return worker->vlan_interface->can_send_packet(worker->vlan_interface); +} + +static void qemu_output(SlirpUsrNetworkInterface *usr_interface, const uint8_t *pkt, int pkt_len) +{ + TunnelWorker *worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + worker->vlan_interface->send_packet(worker->vlan_interface, pkt, pkt_len); +} + +static int null_tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface, + struct in_addr src_addr, uint16_t src_port, + struct in_addr dst_addr, uint16_t dst_port, + SlirpSocket *slirp_s, UserSocket **o_usr_s) +{ + errno = ENETUNREACH; + return -1; +} + +static int tunnel_socket_connect(SlirpUsrNetworkInterface *usr_interface, + struct in_addr src_addr, uint16_t src_port, + struct in_addr dst_addr, uint16_t dst_port, + SlirpSocket *slirp_s, UserSocket **o_usr_s) +{ + TunnelWorker *worker; + RedSocket *sckt; + TunnelService *far_service; + + ASSERT(usr_interface); + +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG"); +#endif + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + ASSERT(worker->channel); + ASSERT(!worker->channel->mig_inprogress); + + far_service = tunnel_worker_find_service_by_addr(worker, &dst_addr, (uint32_t)ntohs(dst_port)); + + if (!far_service) { + errno = EADDRNOTAVAIL; + return -1; + } + + if (tunnel_worker_find_socket(worker, src_port, far_service->id)) { + red_printf("slirp tried to open a socket that is still opened"); + errno = EADDRINUSE; + return -1; + } + + if (worker->num_sockets == MAX_SOCKETS_NUM) { + red_printf("number of tunneled sockets exceeds the limit"); + errno = ENFILE; + return -1; + } + + sckt = tunnel_worker_create_socket(worker, src_port, far_service, slirp_s); + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + *o_usr_s = sckt; + sckt->out_data.status_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_OPEN; + red_channel_pipe_add(&worker->channel->base, &sckt->out_data.status_pipe_item); + + errno = EINPROGRESS; + return -1; +} + +static int null_tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len, uint8_t urgent) +{ + errno = ECONNRESET; + return -1; +} + +static int tunnel_socket_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len, uint8_t urgent) +{ + TunnelWorker *worker; + RedSocket *sckt; + size_t size_to_send; + + ASSERT(usr_interface); + ASSERT(opaque); + + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + + ASSERT(!worker->channel->mig_inprogress); + + sckt = (RedSocket *)opaque; + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) { + errno = EAGAIN; + return -1; + } + + if ((sckt->client_status != CLIENT_SCKT_STATUS_OPEN) && + (sckt->client_status != CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) { + red_printf("client socket is unable to receive data"); + errno = ECONNRESET; + return -1; + } + + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) { + red_printf("send was shutdown"); + errno = EPIPE; + return -1; + } + + if (urgent) { + SET_TUNNEL_ERROR(worker->channel, "urgent msgs not supported"); + tunnel_shutdown(worker); + errno = ECONNRESET; + return -1; + } + + sckt->in_slirp_send = TRUE; + + if (sckt->out_data.data_size < (sckt->out_data.window_size) * MAX_SOCKET_DATA_SIZE) { + // the current data in the queues doesn't fill all the tokens + size_to_send = len; + } else { + if (sckt->out_data.ready_chunks_queue.head) { + // there are no tokens for future data, but once the data will be sent + // and buffers will be released, we will try to send again. + size_to_send = 0; + } else { + ASSERT(sckt->out_data.process_queue->head); + if ((sckt->out_data.data_size + len) > + (MAX_SOCKET_OUT_BUFFERS * MAX_SOCKET_DATA_SIZE)) { + red_printf("socket out buffers overflow, socket will be closed" + " (local_port=%d, service_id=%d)", + ntohs(sckt->local_port), sckt->far_service->id); + tunnel_socket_force_close(worker->channel, sckt); + size_to_send = 0; + } else { + size_to_send = len; + } + } + } + + if (size_to_send) { + process_queue_append(sckt->out_data.process_queue, buf, size_to_send); + sckt->out_data.data_size += size_to_send; + + if (sckt->out_data.ready_chunks_queue.head && + !red_channel_pipe_item_is_linked(&worker->channel->base, + &sckt->out_data.data_pipe_item)) { + sckt->out_data.data_pipe_item.type = PIPE_ITEM_TYPE_SOCKET_DATA; + red_channel_pipe_add(&worker->channel->base, &sckt->out_data.data_pipe_item); + } + } + + sckt->in_slirp_send = FALSE; + + if (!size_to_send) { + errno = EAGAIN; + return -1; + } else { + return size_to_send; + } +} + +static inline int __should_send_fin_to_guest(RedSocket *sckt) +{ + return (((sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND) || + ((sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) && + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND))) && + !sckt->in_data.ready_chunks_queue.head); +} + +static int null_tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len) +{ + errno = ECONNRESET; + return -1; +} + +static int tunnel_socket_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque, + uint8_t *buf, size_t len) +{ + TunnelWorker *worker; + RedSocket *sckt; + int copied = 0; + + ASSERT(usr_interface); + ASSERT(opaque); + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + + ASSERT(!worker->channel->mig_inprogress); + + sckt = (RedSocket *)opaque; + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) { + errno = EAGAIN; + return -1; + } + + if ((sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) || + (sckt->slirp_status == SLIRP_SCKT_STATUS_WAIT_CLOSE)) { + SET_TUNNEL_ERROR(worker->channel, "recieve was shutdown"); + tunnel_shutdown(worker); + errno = ECONNRESET; + return -1; + } + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { + SET_TUNNEL_ERROR(worker->channel, "slirp socket not connected"); + tunnel_shutdown(worker); + errno = ECONNRESET; + return -1; + } + + ASSERT((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) || + (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND) || + ((sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) && + (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND))); + + + // if there is data in ready queue, when it is acked, slirp will call recv and get 0 + if (__should_send_fin_to_guest(sckt)) { + if (sckt->in_data.process_queue->head) { + red_printf("client socket sent FIN but there are still buffers in incoming process" + "queue (local_port=%d, service_id=%d)", + ntohs(sckt->local_port), sckt->far_service->id); + } + return 0; // slirp will call shutdown recv now and it will also send FIN to the guest. + } + + while (sckt->in_data.ready_chunks_queue.head && (copied < len)) { + ReadyTunneledChunk *cur_chunk = sckt->in_data.ready_chunks_queue.head; + int copy_count = MIN(cur_chunk->size - sckt->in_data.ready_chunks_queue.offset, + len - copied); + + memcpy(buf + copied, cur_chunk->data + sckt->in_data.ready_chunks_queue.offset, copy_count); + copied += copy_count; + if ((sckt->in_data.ready_chunks_queue.offset + copy_count) == cur_chunk->size) { + ready_queue_pop_chunk(&sckt->in_data.ready_chunks_queue); + sckt->in_data.ready_chunks_queue.offset = 0; + } else { + ASSERT(copied == len); + sckt->in_data.ready_chunks_queue.offset += copy_count; + } + } + + if (!copied) { + errno = EAGAIN; + return -1; + } else { + return copied; + } +} + +static void null_tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque) +{ +} + +// can be called : 1) when a FIN is reqested from the guset 2) after shutdown rcv that was called +// after recieved failed because the client socket was sent FIN +static void tunnel_socket_shutdown_send(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque) +{ + TunnelWorker *worker; + RedSocket *sckt; + + ASSERT(usr_interface); + ASSERT(opaque); + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + sckt = (RedSocket *)opaque; + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + ASSERT(!worker->channel->mig_inprogress); + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_DELAY_ABORT) { + return; + } + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) { + sckt->slirp_status = SLIRP_SCKT_STATUS_SHUTDOWN_SEND; + } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_RECV) { + ASSERT(sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND); + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + } else { + SET_TUNNEL_ERROR(worker->channel, "unexpected tunnel_socket_shutdown_send slirp_status=%d", + sckt->slirp_status); + tunnel_shutdown(worker); + return; + } + + if ((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) || + (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) { + // check if there is still data to send. the fin will be sent after data is released + // channel is alive, otherwise the sockets would have been aborted + if (!sckt->out_data.ready_chunks_queue.head) { + __tunnel_socket_add_fin_to_pipe(worker->channel, sckt); + } + } else { // if client is closed, it means the connection was aborted since we didn't + // received fin from guest + SET_TUNNEL_ERROR(worker->channel, + "unexpected tunnel_socket_shutdown_send client_status=%d", + sckt->client_status); + tunnel_shutdown(worker); + } +} + +static void null_tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, + UserSocket *opaque) +{ +} + +static void tunnel_socket_shutdown_recv(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque) +{ + TunnelWorker *worker; + RedSocket *sckt; + + ASSERT(usr_interface); + ASSERT(opaque); + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + sckt = (RedSocket *)opaque; + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + ASSERT(!worker->channel->mig_inprogress); + + /* failure in recv can happen after the client sckt was shutdown + (after client sent FIN, or after slirp sent FIN and client socket was closed */ + if (!__should_send_fin_to_guest(sckt)) { + SET_TUNNEL_ERROR(worker->channel, + "unexpected tunnel_socket_shutdown_recv client_status=%d slirp_status=%d", + sckt->client_status, sckt->slirp_status); + tunnel_shutdown(worker); + return; + } + + if (sckt->slirp_status == SLIRP_SCKT_STATUS_OPEN) { + sckt->slirp_status = SLIRP_SCKT_STATUS_SHUTDOWN_RECV; + } else if (sckt->slirp_status == SLIRP_SCKT_STATUS_SHUTDOWN_SEND) { + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + } else { + SET_TUNNEL_ERROR(worker->channel, + "unexpected tunnel_socket_shutdown_recv slirp_status=%d", + sckt->slirp_status); + tunnel_shutdown(worker); + } +} + +static void null_tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque) +{ + TunnelWorker *worker; + RedSocket *sckt; + + ASSERT(usr_interface); + ASSERT(opaque); + + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + + sckt = (RedSocket *)opaque; +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + sckt->slirp_status = SLIRP_SCKT_STATUS_CLOSED; + + if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) { + tunnel_worker_free_socket(worker, sckt); + } // else, it will be closed when disconnect will be called (because this callback is + // set if the channel is disconnect or when we are in the middle of disconnection that + // was caused by an error +} + +// can be called during migration due to the channel diconnect. But it does not affect the +// migrate data +static void tunnel_socket_close(SlirpUsrNetworkInterface *usr_interface, UserSocket *opaque) +{ + TunnelWorker *worker; + RedSocket *sckt; + + ASSERT(usr_interface); + ASSERT(opaque); + + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + + sckt = (RedSocket *)opaque; + +#ifdef DEBUG_NETWORK + PRINT_SCKT(sckt); +#endif + + sckt->slirp_status = SLIRP_SCKT_STATUS_CLOSED; + + // if sckt is not opened yet, close will be sent when we recieve connect ack + if ((sckt->client_status == CLIENT_SCKT_STATUS_OPEN) || + (sckt->client_status == CLIENT_SCKT_STATUS_SHUTDOWN_SEND)) { + // check if there is still data to send. the close will be sent after data is released. + // close may already been pushed if it is a forced close + if (!sckt->out_data.ready_chunks_queue.head && !sckt->pushed_close) { + __tunnel_socket_add_close_to_pipe(worker->channel, sckt); + } + } else if (sckt->client_status == CLIENT_SCKT_STATUS_CLOSED) { + if (sckt->client_waits_close_ack) { + __tunnel_socket_add_close_ack_to_pipe(worker->channel, sckt); + } else { + tunnel_worker_free_socket(worker, sckt); + } + } +} + +static UserTimer *create_timer(SlirpUsrNetworkInterface *usr_interface, + timer_proc_t proc, void *opaque) +{ + TunnelWorker *worker; + + ASSERT(usr_interface); + + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; + + return (void *)worker->core_interface->create_timer(worker->core_interface, + proc, opaque); +} + +static void arm_timer(SlirpUsrNetworkInterface *usr_interface, UserTimer *timer, uint32_t ms) +{ + TunnelWorker *worker; + + ASSERT(usr_interface); + + worker = ((RedSlirpNetworkInterface *)usr_interface)->worker; +#ifdef DEBUG_NETWORK + if (!worker->channel) { + red_printf("channel not connected"); + } +#endif + if (worker->channel && worker->channel->mig_inprogress) { + SET_TUNNEL_ERROR(worker->channel, "during migration"); + tunnel_shutdown(worker); + return; + } + + worker->core_interface->arm_timer(worker->core_interface, (VDObjectRef)timer, ms); +} + +/*********************************************** +* channel interface and other related procedures +************************************************/ + +static int tunnel_channel_config_socket(RedChannel *channel) +{ + int flags; + int delay_val; + + if ((flags = fcntl(channel->peer->socket, F_GETFL)) == -1) { + red_printf("accept failed, %s", strerror(errno)); // can't we just use red_error? + return FALSE; + } + + if (fcntl(channel->peer->socket, F_SETFL, flags | O_NONBLOCK) == -1) { + red_printf("accept failed, %s", strerror(errno)); + return FALSE; + } + + delay_val = 1; + + if (setsockopt(channel->peer->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, + sizeof(delay_val)) == -1) { + red_printf("setsockopt failed, %s", strerror(errno)); + } + + return TRUE; +} + +static void tunnel_worker_disconnect_slirp(TunnelWorker *worker) +{ + int i; + + net_slirp_set_net_interface(&worker->null_interface.base); + for (i = 0; i < MAX_SOCKETS_NUM; i++) { + RedSocket *sckt = worker->sockets + i; + if (sckt->allocated) { + sckt->client_status = CLIENT_SCKT_STATUS_CLOSED; + sckt->client_waits_close_ack = FALSE; + if (sckt->slirp_status == SLIRP_SCKT_STATUS_CLOSED) { + tunnel_worker_free_socket(worker, sckt); + } else { + sckt->slirp_status = SLIRP_SCKT_STATUS_WAIT_CLOSE; + net_slirp_socket_abort(sckt->slirp_sckt); + } + } + } +} + +/* don't call disconnect from functions that might be called by slirp + since it closes all its sockets and slirp is not aware of it */ +static void tunnel_channel_disconnect(RedChannel *channel) +{ + TunnelChannel *tunnel_channel = (TunnelChannel *)channel; + TunnelWorker *worker; + if (!channel) { + return; + } + red_printf(""); + worker = tunnel_channel->worker; + + tunnel_worker_disconnect_slirp(worker); + + tunnel_worker_clear_routed_network(worker); + red_channel_destroy(channel); + worker->channel = NULL; +} + +/* interface for reds */ + +static void on_new_tunnel_channel(TunnelChannel *channel) +{ + red_channel_pipe_add_type(&channel->base, PIPE_ITEM_TYPE_SET_ACK); + + if (channel->base.migrate) { + channel->expect_migrate_data = TRUE; + } else { + red_channel_init_outgoing_messages_window(&channel->base); + red_channel_pipe_add_type(&channel->base, PIPE_ITEM_TYPE_TUNNEL_INIT); + } +} + +static void handle_tunnel_channel_link(Channel *channel, RedsStreamContext *peer, int migration, + int num_common_caps, uint32_t *common_caps, int num_caps, + uint32_t *caps) +{ + TunnelChannel *tunnel_channel; + TunnelWorker *worker = (TunnelWorker *)channel->data; + if (worker->channel) { + tunnel_channel_disconnect(&worker->channel->base); + } + + tunnel_channel = + (TunnelChannel *)red_channel_create(sizeof(*tunnel_channel), peer, worker->core_interface, + migration, TRUE, + tunnel_channel_config_socket, + tunnel_channel_disconnect, + tunnel_channel_handle_message, + tunnel_channel_alloc_msg_rcv_buf, + tunnel_channel_release_msg_rcv_buf, + tunnel_channel_send_item, + tunnel_channel_release_pipe_item); + + if (!tunnel_channel) { + return; + } + + + tunnel_channel->worker = worker; + tunnel_channel->worker->channel = tunnel_channel; + net_slirp_set_net_interface(&worker->tunnel_interface.base); + + on_new_tunnel_channel(tunnel_channel); +} + +static void handle_tunnel_channel_shutdown(struct Channel *channel) +{ + tunnel_channel_disconnect(&((TunnelWorker *)channel->data)->channel->base); +} + +static void handle_tunnel_channel_migrate(struct Channel *channel) +{ +#ifdef DEBUG_NETWORK + red_printf("TUNNEL_DBG: MIGRATE STARTED"); +#endif + TunnelChannel *tunnel_channel = ((TunnelWorker *)channel->data)->channel; + tunnel_channel->mig_inprogress = TRUE; + net_slirp_freeze(); + red_channel_pipe_add_type(&tunnel_channel->base, PIPE_ITEM_TYPE_MIGRATE); +} + diff --git a/server/red_tunnel_worker.h b/server/red_tunnel_worker.h new file mode 100755 index 00000000..5d00a341 --- /dev/null +++ b/server/red_tunnel_worker.h @@ -0,0 +1,29 @@ +/* + Copyright (C) 2009 Red Hat, Inc. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License as + published by the Free Software Foundation; either version 2 of + the License, or (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. + + + Author: + yhalperi@redhat.com +*/ + +#ifndef _H_RED_TUNNEL_WORKER +#define _H_RED_TUNNEL_WORKER + +#include "vd_interface.h" + +void *red_tunnel_attach(CoreInterface *core_interface, NetWireInterface *vlan_interface); + +#endif diff --git a/server/reds.c b/server/reds.c index 067304d7..4630e958 100644 --- a/server/reds.c +++ b/server/reds.c @@ -47,6 +47,7 @@ #include "red_common.h" #include "red_dispatcher.h" #include "snd_worker.h" +#include "red_tunnel_worker.h" #include "reds_stat.h" #include "stat.h" #include "ring.h" @@ -83,6 +84,7 @@ static pthread_mutex_t *lock_cs; static long *lock_count; uint32_t streaming_video = TRUE; image_compression_t image_compression = IMAGE_COMPRESS_AUTO_GLZ; +void *red_tunnel = NULL; int agent_mouse = TRUE; static void openssl_init(); @@ -3921,7 +3923,7 @@ int __attribute__ ((visibility ("default"))) spice_parse_args(const char *in_arg // All SSL parameters should be either on or off. if (ssl_port != ssl_key || ssl_key != ssl_certs || ssl_certs != ssl_cafile || - ssl_cafile != ssl_dhfile || ssl_dhfile != ssl_ciphersuite) { + ssl_cafile != ssl_dhfile || ssl_dhfile != ssl_ciphersuite) { goto error; } @@ -4775,6 +4777,19 @@ static void interface_change_notifier(void *opaque, VDInterface *interface, return; } attach_to_red_agent((VDIPortInterface *)interface); + } else if (strcmp(interface->type, VD_INTERFACE_NET_WIRE) == 0) { + NetWireInterface * net_wire = (NetWireInterface *)interface; + red_printf("VD_INTERFACE_NET_WIRE"); + if (red_tunnel) { + red_printf("net wire already attached"); + return; + } + if (interface->major_version != VD_INTERFACE_NET_WIRE_MAJOR || + interface->minor_version < VD_INTERFACE_NET_WIRE_MINOR) { + red_printf("unsuported net wire interface"); + return; + } + red_tunnel = red_tunnel_attach(core, net_wire); } break; case VD_INTERFACE_REMOVING: diff --git a/server/reds.h b/server/reds.h index ce09e5b6..248edffa 100644 --- a/server/reds.h +++ b/server/reds.h @@ -27,6 +27,9 @@ typedef struct RedsStreamContext { int socket; + /* set it to TRUE if you shutdown the socket. shutdown read doesn't work as accepted - + receive may return data afterwards. check the flag before calling receive*/ + int shutdown; SSL *ssl; int (*cb_write)(void *, void *, int); diff --git a/server/vd_interface.h b/server/vd_interface.h index 932c0b13..12dbd5ff 100644 --- a/server/vd_interface.h +++ b/server/vd_interface.h @@ -330,5 +330,23 @@ struct VDIPortInterface { int (*read)(VDIPortInterface *port, VDObjectRef plug, uint8_t *buf, int len); }; +#define VD_INTERFACE_NET_WIRE "net_wire" +#define VD_INTERFACE_NET_WIRE_MAJOR 1 +#define VD_INTERFACE_NET_WIRE_MINOR 1 + +typedef struct NetWireInterface NetWireInterface; +typedef void (*net_wire_packet_route_proc_t)(void *opaque, const uint8_t *pkt, int pkt_len); + +struct NetWireInterface { + VDInterface base; + + struct in_addr (*get_ip)(NetWireInterface *vlan); + int (*can_send_packet)(NetWireInterface *vlan); + void (*send_packet)(NetWireInterface *vlan, const uint8_t *buf, int size); + VDObjectRef (*register_route_packet)(NetWireInterface *vlan, net_wire_packet_route_proc_t proc, + void *opaque); + void (*unregister_route_packet)(NetWireInterface *vlan, VDObjectRef proc); +}; + #endif |