/*
Copyright (C) 2009 Red Hat, Inc.
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
published by the Free Software Foundation; either version 2 of
the License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <limits.h>
#include <time.h>
#include <pthread.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <errno.h>
#include <ctype.h>
#include <openssl/bio.h>
#include <openssl/pem.h>
#include <openssl/bn.h>
#include <openssl/rsa.h>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include "spice.h"
#include "reds.h"
#include "red.h"
#include "vd_agent.h"
#include "red_common.h"
#include "red_dispatcher.h"
#include "snd_worker.h"
#include "red_tunnel_worker.h"
#include "reds_stat.h"
#include "stat.h"
#include "ring.h"
#include "config.h"
CoreInterface *core = NULL;
static MigrationInterface *mig = NULL;
static KeyboardInterface *keyboard = NULL;
static MouseInterface *mouse = NULL;
static TabletInterface *tablet = NULL;
static VDIPortInterface *vdagent = NULL;
#define MIGRATION_NOTIFY_SPICE_KEY "spice_mig_ext"
#define REDS_MIG_VERSION 1
#define REDS_MIG_CONTINUE 1
#define REDS_MIG_ABORT 2
#define REDS_AGENT_WINDOW_SIZE 10
#define REDS_TOKENS_TO_SEND 5
#define REDS_NUM_INTERNAL_AGENT_MESSAGES 1
#define REDS_VDI_PORT_NUM_RECIVE_BUFFS 5
#define REDS_MAX_SEND_IOVEC 100
#define NET_TEST_WARMUP_BYTES 0
#define NET_TEST_BYTES (1024 * 250)
static int spice_port = -1;
static int spice_secure_port = -1;
static struct in_addr spice_addr = {INADDR_ANY};
static int ticketing_enabled = 1; //Ticketing is enabled by default
static pthread_mutex_t *lock_cs;
static long *lock_count;
uint32_t streaming_video = STREAM_VIDEO_FILTER;
image_compression_t image_compression = IMAGE_COMPRESS_AUTO_GLZ;
void *red_tunnel = NULL;
int agent_mouse = TRUE;
static void openssl_init();
#define MIGRATE_TIMEOUT (1000 * 10) /* 10sec */
#define PING_INTERVAL (1000 * 10)
#define KEY_MODIFIERS_TTL (1000 * 2) /*2sec*/
#define MM_TIMER_GRANULARITY_MS (1000 / 30)
#define MM_TIME_DELTA 400 /*ms*/
// approximate max recive message size
#define RECIVE_BUF_SIZE \
(4096 + (REDS_AGENT_WINDOW_SIZE + REDS_NUM_INTERNAL_AGENT_MESSAGES) * RED_AGENT_MAX_DATA_SIZE)
#define SEND_BUF_SIZE 4096
#define SCROLL_LOCK_SCAN_CODE 0x46
#define NUM_LOCK_SCAN_CODE 0x45
#define CAPS_LOCK_SCAN_CODE 0x3a
typedef struct IncomingHandler {
void *opaque;
int shut;
uint8_t buf[RECIVE_BUF_SIZE];
uint32_t end_pos;
void (*handle_message)(void *opaque, RedDataHeader *message);
} IncomingHandler;
typedef struct OutgoingHandler {
void *opaque;
uint8_t buf[SEND_BUF_SIZE];
uint8_t *now;
uint32_t length;
void (*select)(void *opaque, int select);
void (*may_write)(void *opaque);
} OutgoingHandler;
typedef struct TicketAuthentication {
char password[RED_MAX_PASSWORD_LENGTH];
time_t expiration_time;
} TicketAuthentication;
static TicketAuthentication taTicket;
typedef struct TicketInfo {
RSA *rsa;
int rsa_size;
BIGNUM *bn;
RedLinkEncryptedTicket encrypted_ticket;
} TicketInfo;
typedef struct MonitorMode {
uint32_t x_res;
uint32_t y_res;
} MonitorMode;
typedef struct RedsOutItem RedsOutItem;
struct RedsOutItem {
RingItem link;
void (*prepare)(RedsOutItem *item, struct iovec* vec, int *len);
void (*release)(RedsOutItem *item);
};
typedef struct VDIReadBuf {
RedsOutItem out_item;
int len;
RedDataHeader header;
uint8_t data[RED_AGENT_MAX_DATA_SIZE];
} VDIReadBuf;
enum {
VDI_PORT_READ_STATE_READ_HADER,
VDI_PORT_READ_STATE_GET_BUFF,
VDI_PORT_READ_STATE_READ_DATA,
};
enum {
VDP_CLIENT_PORT = 1,
VDP_SERVER_PORT,
};
typedef struct __attribute__ ((__packed__)) VDIChunkHeader {
uint32_t port;
uint32_t size;
} VDIChunkHeader;
typedef struct VDIPortState {
VDIPortPlug plug;
VDObjectRef plug_ref;
uint32_t plug_generation;
uint32_t num_tokens;
uint32_t num_client_tokens;
Ring external_bufs;
Ring internal_bufs;
Ring write_queue;
Ring read_bufs;
uint32_t read_state;
uint32_t message_recive_len;
uint8_t *recive_pos;
uint32_t recive_len;
VDIReadBuf *current_read_buf;
VDIChunkHeader vdi_chunk_header;
int client_agent_started;
uint32_t send_tokens;
} VDIPortState;
typedef struct InputsState {
Channel *channel;
RedsStreamContext *peer;
uint8_t buf[RECIVE_BUF_SIZE];
uint32_t end_pos;
IncomingHandler in_handler;
OutgoingHandler out_handler;
VDAgentMouseState mouse_state;
int pending_mouse_event;
uint32_t motion_count;
uint64_t serial; //migrate me
} InputsState;
typedef struct RedsOutgoingData {
Ring pipe;
RedsOutItem *item;
int vec_size;
struct iovec vec_buf[REDS_MAX_SEND_IOVEC];
struct iovec *vec;
} RedsOutgoingData;
enum NetTestStage {
NET_TEST_STAGE_INVALID,
NET_TEST_STAGE_WARMUP,
NET_TEST_STAGE_LATENCY,
NET_TEST_STAGE_RATE,
};
#ifdef RED_STATISTICS
#define REDS_MAX_STAT_NODES 100
#define REDS_STAT_SHM_SIZE (sizeof(RedsStat) + REDS_MAX_STAT_NODES * sizeof(StatNode))
typedef struct RedsStatValue {
uint32_t value;
uint32_t min;
uint32_t max;
uint32_t average;
uint32_t count;
} RedsStatValue;
#endif
typedef struct RedsState {
int listen_socket;
int secure_listen_socket;
RedsStreamContext *peer;
int disconnecting;
uint32_t link_id;
uint64_t serial; //migrate me
VDIPortState agent_state;
InputsState *inputs_state;
VDObjectRef mig_notifier;
int mig_wait_connect;
int mig_wait_disconnect;
int mig_inprogress;
int mig_target;
int num_of_channels;
IncomingHandler in_handler;
RedsOutgoingData outgoing;
Channel *channels;
int mouse_mode;
int is_client_mouse_allowed;
int dispatcher_allows_client_mouse;
MonitorMode monitor_mode;
VDObjectRef mig_timer;
VDObjectRef key_modifiers_timer;
VDObjectRef mm_timer;
TicketAuthentication taTicket;
SSL_CTX *ctx;
#ifdef RED_STATISTICS
char *stat_shm_name;
RedsStat *stat;
pthread_mutex_t stat_lock;
RedsStatValue roundtrip_stat;
VDObjectRef ping_timer;
int ping_interval;
#endif
uint32_t ping_id;
uint32_t net_test_id;
int net_test_stage;
} RedsState;
uint64_t bitrate_per_sec = ~0;
static uint64_t letancy = 0;
static RedsState *reds = NULL;
typedef struct AsyncRead {
RedsStreamContext *peer;
void *opaque;
uint8_t *now;
uint8_t *end;
int active_file_handlers;
void (*done)(void *opaque);
void (*error)(void *opaque, int err);
} AsyncRead;
typedef struct RedLinkInfo {
RedsStreamContext *peer;
AsyncRead asyc_read;
RedLinkHeader link_header;
RedLinkMess *link_mess;
int mess_pos;
TicketInfo tiTicketing;
} RedLinkInfo;
typedef struct VDIPortBuf VDIPortBuf;
struct __attribute__ ((__packed__)) VDIPortBuf {
RingItem link;
uint8_t *now;
int write_len;
void (*free)(VDIPortBuf *buf);
VDIChunkHeader chunk_header; //start send from &chunk_header
};
typedef struct __attribute__ ((__packed__)) VDAgentExtBuf {
VDIPortBuf base;
uint8_t buf[RED_AGENT_MAX_DATA_SIZE];
VDIChunkHeader migrate_overflow;
} VDAgentExtBuf;
typedef struct __attribute__ ((__packed__)) VDInternalBuf {
VDIPortBuf base;
VDAgentMessage header;
union {
VDAgentMouseState mouse_state;
}
u;
VDIChunkHeader migrate_overflow;
} VDInternalBuf;
typedef struct RedSSLParameters {
char keyfile_password[256];
char certs_file[256];
char private_key_file[256];
char ca_certificate_file[256];
char dh_key_file[256];
char ciphersuite[256];
} RedSSLParameters;
#define CHANNEL_SECURITY_NON (1 << 0)
#define CHANNEL_SECURITY_SSL (1 << 1)
typedef struct ChannelSecurityOptions ChannelSecurityOptions;
struct ChannelSecurityOptions {
uint32_t channel_id;
uint32_t options;
ChannelSecurityOptions *next;
};
typedef struct PingItem {
RedsOutItem base;
RedDataHeader header;
RedPing ping;
int size;
} PingItem;
#define ZERO_BUF_SIZE 4096
static uint8_t zero_page[ZERO_BUF_SIZE] = {0};
static void reds_main_write(void *data);
static void reds_push();
static ChannelSecurityOptions *channels_security = NULL;
static int default_channel_security = CHANNEL_SECURITY_NON | CHANNEL_SECURITY_SSL;
static RedSSLParameters ssl_parameters;
void (*log_proc)(CoreInterface *core, LogLevel level, const char* component,
const char* format, ...) = NULL;
#define LOG_MESSAGE(level, format, ...) { \
if (log_proc) { \
log_proc(core, level, "spice", format, ## __VA_ARGS__ ); \
} \
}
static int args_is_empty(const VDICmdArg* args)
{
return !args || args[0].descriptor.type == ARG_TYPE_INVALID;
}
const int args_is_string(const VDICmdArg* args)
{
return !args_is_empty(args) && args->descriptor.type == ARG_TYPE_STRING;
}
const int args_is_int(const VDICmdArg* args)
{
return !args_is_empty(args) && args->descriptor.type == ARG_TYPE_INT;
}
static ChannelSecurityOptions *find_channel_security(int id)
{
ChannelSecurityOptions *now = channels_security;
while (now && now->channel_id != id) {
now = now->next;
}
return now;
}
static int reds_write(void *ctx, void *buf, size_t size)
{
int return_code;
int sock = (long)ctx;
size_t count = size;
return_code = write(sock, buf, count);
return (return_code);
}
static int reds_read(void *ctx, void *buf, size_t size)
{
int return_code;
int sock = (long)ctx;
size_t count = size;
return_code = read(sock, buf, count);
return (return_code);
}
static int reds_free(RedsStreamContext *peer)
{
close(peer->socket);
free(peer);
return 0;
}
static int reds_ssl_write(void *ctx, void *buf, size_t size)
{
int return_code;
int ssl_error;
SSL *ssl = ctx;
return_code = SSL_write(ssl, buf, size);
if (return_code < 0) {
ssl_error = SSL_get_error(ssl, return_code);
}
return (return_code);
}
static int reds_ssl_read(void *ctx, void *buf, size_t size)
{
int return_code;
int ssl_error;
SSL *ssl = ctx;
return_code = SSL_read(ssl, buf, size);
if (return_code < 0) {
ssl_error = SSL_get_error(ssl, return_code);
}
return (return_code);
}
static int reds_ssl_writev(void *ctx, const struct iovec *vector, int count)
{
int i;
int n;
int return_code = 0;
int ssl_error;
SSL *ssl = ctx;
for (i = 0; i < count; ++i) {
n = SSL_write(ssl, vector[i].iov_base, vector[i].iov_len);
if (n <= 0) {
ssl_error = SSL_get_error(ssl, n);
if (return_code <= 0) {
return n;
} else {
break;
}
} else {
return_code += n;
}
}
return return_code;
}
static int reds_ssl_free(RedsStreamContext *peer)
{
SSL_free(peer->ssl);
close(peer->socket);
free(peer);
return 0;
}
static void __reds_release_link(RedLinkInfo *link)
{
ASSERT(link->peer);
core->set_file_handlers(core, link->peer->socket, NULL, NULL, NULL);
free(link->link_mess);
BN_free(link->tiTicketing.bn);
if (link->tiTicketing.rsa) {
RSA_free(link->tiTicketing.rsa);
}
free(link);
}
static inline void reds_release_link(RedLinkInfo *link)
{
RedsStreamContext *peer = link->peer;
__reds_release_link(link);
peer->cb_free(peer);
}
static void reds_do_disable_ticketing(void)
{
ticketing_enabled = 0;
memset(taTicket.password, 0, sizeof(taTicket.password));
core->term_printf(core, "Ticketing is now disabled.\n");
}
static void reds_do_disable_ticketing_2(const VDICmdArg* args)
{
if (!args_is_empty(args)) {
red_printf("invalid args");
return;
}
reds_do_disable_ticketing();
}
static char *base64decode(const char *input, int length)
{
BIO *b64;
BIO *bmem;
int n;
char *buffer = (char *)malloc(length);
memset(buffer, 0, length);
char *inbuffer = (char *)malloc(length + 1);
memset(inbuffer, 0, length + 1);
memcpy(inbuffer, input, length);
inbuffer[length] = '\n';
b64 = BIO_new(BIO_f_base64());
bmem = BIO_new_mem_buf(inbuffer, length + 1);
if (b64 != NULL && bmem != NULL) {
bmem = BIO_push(b64, bmem);
n = BIO_read(bmem, buffer, length);
if (n != 0) {
buffer[n - 1] = '\0';
} else {
free(buffer);
buffer = NULL;
}
} else {
free(buffer);
buffer = NULL;
}
BIO_free_all(bmem);
return buffer;
}
static void reds_do_info_ticket(void)
{
core->term_printf(core, "Ticket Information:");
if (ticketing_enabled) {
if (strlen(taTicket.password) == 0) {
core->term_printf(core, " blocked\n");
} else {
if (taTicket.expiration_time == INT_MAX) {
core->term_printf(core, " expiration NEVER\n");
} else {
time_t now;
time(&now);
int expired = taTicket.expiration_time < now;
if (expired) {
core->term_printf(core, " expiration EXPIRED\n");
} else {
core->term_printf(core, " expiration %s\n",
ctime((time_t *)&(taTicket.expiration_time)));
}
}
}
} else {
core->term_printf(core, " disabled\n");
}
}
static struct iovec *reds_iovec_skip(struct iovec vec[], int skip, int *vec_size)
{
struct iovec *now = vec;
while (skip && skip >= now->iov_len) {
skip -= now->iov_len;
--*vec_size;
now++;
}
now->iov_base = (uint8_t *)now->iov_base + skip;
now->iov_len -= skip;
return now;
}
#ifdef RED_STATISTICS
#define STAT_TAB_LEN 4
#define STAT_VALUE_TABS 7
static void print_stat_tree(uint32_t node_index, int depth)
{
StatNode *node = &reds->stat->nodes[node_index];
if ((node->flags & STAT_NODE_MASK_SHOW) == STAT_NODE_MASK_SHOW) {
core->term_printf(core, "%*s%s", depth * STAT_TAB_LEN, "", node->name);
if (node->flags & STAT_NODE_FLAG_VALUE) {
core->term_printf(core, ":%*s%llu\n",
(STAT_VALUE_TABS - depth) * STAT_TAB_LEN - strlen(node->name) - 1, "",
node->value);
} else {
core->term_printf(core, "\n");
if (node->first_child_index != INVALID_STAT_REF) {
print_stat_tree(node->first_child_index, depth + 1);
}
}
}
if (node->next_sibling_index != INVALID_STAT_REF) {
print_stat_tree(node->next_sibling_index, depth);
}
}
static void do_info_statistics()
{
core->term_printf(core, "Spice Statistics:\n");
print_stat_tree(reds->stat->root_index, 0);
}
static void do_reset_statistics()
{
StatNode *node;
int i;
for (i = 0; i <= REDS_MAX_STAT_NODES; i++) {
node = &reds->stat->nodes[i];
if (node->flags & STAT_NODE_FLAG_VALUE) {
node->value = 0;
}
}
}
static void do_reset_statistics_2(const VDICmdArg* args)
{
if (!args_is_empty(args)) {
red_printf("invalid args");
return;
}
do_reset_statistics();
}
void insert_stat_node(StatNodeRef parent, StatNodeRef ref)
{
StatNode *node = &reds->stat->nodes[ref];
uint32_t pos = INVALID_STAT_REF;
uint32_t node_index;
uint32_t *head;
StatNode *n;
node->first_child_index = INVALID_STAT_REF;
head = (parent == INVALID_STAT_REF ? &reds->stat->root_index :
&reds->stat->nodes[parent].first_child_index);
node_index = *head;
while (node_index != INVALID_STAT_REF && (n = &reds->stat->nodes[node_index]) &&
strcmp(node->name, n->name) > 0) {
pos = node_index;
node_index = n->next_sibling_index;
}
if (pos == INVALID_STAT_REF) {
node->next_sibling_index = *head;
*head = ref;
} else {
n = &reds->stat->nodes[pos];
node->next_sibling_index = n->next_sibling_index;
n->next_sibling_index = ref;
}
}
StatNodeRef stat_add_node(StatNodeRef parent, const char *name, int visible)
{
StatNodeRef ref;
StatNode *node;
ASSERT(name && strlen(name) > 0);
if (strlen(name) >= sizeof(node->name)) {
return INVALID_STAT_REF;
}
pthread_mutex_lock(&reds->stat_lock);
ref = (parent == INVALID_STAT_REF ? reds->stat->root_index :
reds->stat->nodes[parent].first_child_index);
while (ref != INVALID_STAT_REF) {
node = &reds->stat->nodes[ref];
if (strcmp(name, node->name)) {
ref = node->next_sibling_index;
} else {
pthread_mutex_unlock(&reds->stat_lock);
return ref;
}
}
if (reds->stat->num_of_nodes >= REDS_MAX_STAT_NODES || reds->stat == NULL) {
pthread_mutex_unlock(&reds->stat_lock);
return INVALID_STAT_REF;
}
reds->stat->generation++;
reds->stat->num_of_nodes++;
for (ref = 0; ref <= REDS_MAX_STAT_NODES; ref++) {
node = &reds->stat->nodes[ref];
if (!(node->flags & STAT_NODE_FLAG_ENABLED)) {
break;
}
}
ASSERT(!(node->flags & STAT_NODE_FLAG_ENABLED));
node->value = 0;
node->flags = STAT_NODE_FLAG_ENABLED | (visible ? STAT_NODE_FLAG_VISIBLE : 0);
strncpy(node->name, name, sizeof(node->name));
insert_stat_node(parent, ref);
pthread_mutex_unlock(&reds->stat_lock);
return ref;
}
void stat_remove(StatNode *node)
{
pthread_mutex_lock(&reds->stat_lock);
node->flags &= ~STAT_NODE_FLAG_ENABLED;
reds->stat->generation++;
reds->stat->num_of_nodes--;
pthread_mutex_unlock(&reds->stat_lock);
}
void stat_remove_node(StatNodeRef ref)
{
stat_remove(&reds->stat->nodes[ref]);
}
uint64_t *stat_add_counter(StatNodeRef parent, const char *name, int visible)
{
StatNodeRef ref = stat_add_node(parent, name, visible);
StatNode *node;
if (ref == INVALID_STAT_REF) {
return NULL;
}
node = &reds->stat->nodes[ref];
node->flags |= STAT_NODE_FLAG_VALUE;
return &node->value;
}
void stat_remove_counter(uint64_t *counter)
{
stat_remove((StatNode *)(counter - offsetof(StatNode, value)));
}
static void reds_update_stat_value(RedsStatValue* stat_value, uint32_t value)
{
stat_value->value = value;
stat_value->min = (stat_value->count ? MIN(stat_value->min, value) : value);
stat_value->max = MAX(stat_value->max, value);
stat_value->average = (stat_value->average * stat_value->count + value) /
(stat_value->count + 1);
stat_value->count++;
}
#endif
void reds_register_channel(Channel *channel)
{
ASSERT(reds);
channel->next = reds->channels;
reds->channels = channel;
reds->num_of_channels++;
}
void reds_unregister_channel(Channel *channel)
{
Channel **now = &reds->channels;
while (*now) {
if (*now == channel) {
*now = channel->next;
reds->num_of_channels--;
return;
}
now = &(*now)->next;
}
red_printf("not found");
}
static Channel *reds_find_channel(uint32_t type, uint32_t id)
{
Channel *channel = reds->channels;
while (channel && !(channel->type == type && channel->id == id)) {
channel = channel->next;
}
return channel;
}
static void reds_shatdown_channels()
{
Channel *channel = reds->channels;
while (channel) {
channel->shutdown(channel);
channel = channel->next;
}
}
static void reds_mig_cleanup()
{
if (reds->mig_inprogress) {
reds->mig_inprogress = FALSE;
reds->mig_wait_connect = FALSE;
reds->mig_wait_disconnect = FALSE;
core->disarm_timer(core, reds->mig_timer);
mig->notifier_done(mig, reds->mig_notifier);
}
}
static void reds_reset_vdp()
{
VDIPortState *state = &reds->agent_state;
while (!ring_is_empty(&state->write_queue)) {
VDIPortBuf *buf;
RingItem *item;
item = ring_get_tail(&state->write_queue);
ring_remove(item);
buf = (VDIPortBuf *)item;
buf->free(buf);
}
state->read_state = VDI_PORT_READ_STATE_READ_HADER;
state->recive_pos = (uint8_t *)&state->vdi_chunk_header;
state->recive_len = sizeof(state->vdi_chunk_header);
state->message_recive_len = 0;
if (state->current_read_buf) {
ring_add(&state->read_bufs, &state->current_read_buf->out_item.link);
state->current_read_buf = NULL;
}
state->client_agent_started = FALSE;
state->send_tokens = 0;
}
static void reds_reset_outgoing()
{
RedsOutgoingData *outgoing = &reds->outgoing;
RingItem *ring_item;
if (outgoing->item) {
outgoing->item->release(outgoing->item);
outgoing->item = NULL;
}
while ((ring_item = ring_get_tail(&outgoing->pipe))) {
RedsOutItem *out_item = (RedsOutItem *)ring_item;
ring_remove(ring_item);
out_item->release(out_item);
}
outgoing->vec_size = 0;
outgoing->vec = outgoing->vec_buf;
}
static void reds_disconnect()
{
if (!reds->peer || reds->disconnecting) {
return;
}
red_printf("");
LOG_MESSAGE(VD_LOG_INFO, "user disconnected");
reds->disconnecting = TRUE;
reds_reset_outgoing();
if (reds->agent_state.plug_ref != INVALID_VD_OBJECT_REF) {
ASSERT(vdagent);
vdagent->unplug(vdagent, reds->agent_state.plug_ref);
reds->agent_state.plug_ref = INVALID_VD_OBJECT_REF;
reds_reset_vdp();
}
reds_shatdown_channels();
core->set_file_handlers(core, reds->peer->socket, NULL, NULL, NULL);
reds->peer->cb_free(reds->peer);
reds->peer = NULL;
reds->in_handler.shut = TRUE;
reds->link_id = 0;
reds->serial = 0;
reds->ping_id = 0;
reds->net_test_id = 0;
reds->net_test_stage = NET_TEST_STAGE_INVALID;
reds->in_handler.end_pos = 0;
bitrate_per_sec = ~0;
letancy = 0;
reds_mig_cleanup();
reds->disconnecting = FALSE;
}
static void reds_mig_disconnect()
{
if (reds->peer) {
reds_disconnect();
} else {
reds_mig_cleanup();
}
}
static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
{
for (;;) {
uint8_t *buf = handler->buf;
uint32_t pos = handler->end_pos;
uint8_t *end = buf + pos;
RedDataHeader *header;
int n;
n = peer->cb_read(peer->ctx, buf + pos, RECIVE_BUF_SIZE - pos);
if (n <= 0) {
if (n == 0) {
return -1;
}
switch (errno) {
case EAGAIN:
return 0;
case EINTR:
break;
case EPIPE:
return -1;
default:
red_printf("%s", strerror(errno));
return -1;
}
} else {
pos += n;
end = buf + pos;
while (buf + sizeof(RedDataHeader) <= end &&
buf + sizeof(RedDataHeader) + (header = (RedDataHeader *)buf)->size <= end) {
buf += sizeof(RedDataHeader) + header->size;
handler->handle_message(handler->opaque, header);
if (handler->shut) {
return -1;
}
}
memmove(handler->buf, buf, (handler->end_pos = end - buf));
}
}
}
static int handle_outgoing(RedsStreamContext *peer, OutgoingHandler *handler)
{
if (!handler->length) {
return 0;
}
while (handler->length) {
int n;
n = peer->cb_write(peer->ctx, handler->now, handler->length);
if (n <= 0) {
if (n == 0) {
return -1;
}
switch (errno) {
case EAGAIN:
return 0;
case EINTR:
break;
case EPIPE:
return -1;
default:
red_printf("%s", strerror(errno));
return -1;
}
} else {
handler->now += n;
handler->length -= n;
}
}
handler->select(handler->opaque, FALSE);
handler->may_write(handler->opaque);
return 0;
}
#define OUTGOING_OK 0
#define OUTGOING_FAILED -1
#define OUTGOING_BLOCKED 1
static int outgoing_write(RedsStreamContext *peer, OutgoingHandler *handler, void *in_data,
int length)
{
uint8_t *data = in_data;
ASSERT(length <= SEND_BUF_SIZE);
if (handler->length) {
return OUTGOING_BLOCKED;
}
while (length) {
int n = peer->cb_write(peer->ctx, data, length);
if (n < 0) {
switch (errno) {
case EAGAIN:
handler->length = length;
memcpy(handler->buf, data, length);
handler->select(handler->opaque, TRUE);
return OUTGOING_OK;
case EINTR:
break;
case EPIPE:
return OUTGOING_FAILED;
default:
red_printf("%s", strerror(errno));
return OUTGOING_FAILED;
}
} else {
data += n;
length -= n;
}
}
return OUTGOING_OK;
}
typedef struct SimpleOutItem {
RedsOutItem base;
RedDataHeader header;
uint8_t data[0];
} SimpleOutItem;
static void reds_prepare_basic_out_item(RedsOutItem *in_item, struct iovec* vec, int *len)
{
SimpleOutItem *item = (SimpleOutItem *)in_item;
vec[0].iov_base = &item->header;
vec[0].iov_len = sizeof(item->header);
if (item->header.size) {
vec[1].iov_base = item->data;
vec[1].iov_len = item->header.size;
*len = 2;
} else {
*len = 1;
}
}
static void reds_free_basic_out_item(RedsOutItem *item)
{
free(item);
}
static SimpleOutItem *new_simple_out_item(uint32_t type, int message_size)
{
SimpleOutItem *item;
if (!(item = (SimpleOutItem *)malloc(sizeof(*item) + message_size))) {
return NULL;
}
ring_item_init(&item->base.link);
item->base.prepare = reds_prepare_basic_out_item;
item->base.release = reds_free_basic_out_item;
item->header.serial = ++reds->serial;
item->header.type = type;
item->header.size = message_size;
item->header.sub_list = 0;
return item;
}
static void reds_push_pipe_item(RedsOutItem *item)
{
ring_add(&reds->outgoing.pipe, &item->link);
reds_push();
}
static void reds_send_channels()
{
RedChannels* channels_info;
SimpleOutItem *item;
int message_size;
Channel *channel;
int i;
message_size = sizeof(RedChannels) + reds->num_of_channels * sizeof(RedChannelInit);
if (!(item = new_simple_out_item(RED_CHANNELS_LIST, message_size))) {
red_printf("alloc item failed");
reds_disconnect();
return;
}
channels_info = (RedChannels *)item->data;
channels_info->num_of_channels = reds->num_of_channels;
channel = reds->channels;
for (i = 0; i < reds->num_of_channels; i++) {
ASSERT(channel);
channels_info->channels[i].type = channel->type;
channels_info->channels[i].id = channel->id;
channel = channel->next;
}
reds_push_pipe_item(&item->base);
}
static void reds_prepare_ping_item(RedsOutItem *in_item, struct iovec* vec, int *len)
{
PingItem *item = (PingItem *)in_item;
vec[0].iov_base = &item->header;
vec[0].iov_len = sizeof(item->header);
vec[1].iov_base = &item->ping;
vec[1].iov_len = sizeof(item->ping);
int size = item->size;
int pos = 2;
while (size) {
ASSERT(pos < REDS_MAX_SEND_IOVEC);
int now = MIN(ZERO_BUF_SIZE, size);
size -= now;
vec[pos].iov_base = zero_page;
vec[pos].iov_len = now;
pos++;
}
*len = pos;
}
static void reds_free_ping_item(RedsOutItem *item)
{
free(item);
}
static int send_ping(int size)
{
struct timespec time_space;
PingItem *item;
if (!reds->peer || !(item = (PingItem *)malloc(sizeof(*item)))) {
return FALSE;
}
ring_item_init(&item->base.link);
item->base.prepare = reds_prepare_ping_item;
item->base.release = reds_free_ping_item;
item->header.serial = ++reds->serial;
item->header.type = RED_PING;
item->header.size = sizeof(item->ping) + size;
item->header.sub_list = 0;
item->ping.id = ++reds->ping_id;
clock_gettime(CLOCK_MONOTONIC, &time_space);
item->ping.timestamp = time_space.tv_sec * 1000000LL + time_space.tv_nsec / 1000LL;
item->size = size;
reds_push_pipe_item(&item->base);
return TRUE;
}
#ifdef RED_STATISTICS
static void do_ping_client(const char *opt, int has_interval, int interval)
{
if (!reds->peer) {
red_printf("not connected to peer");
return;
}
if (!opt) {
send_ping(0);
} else if (!strcmp(opt, "on")) {
if (has_interval && interval > 0) {
reds->ping_interval = interval * 1000;
}
core->arm_timer(core, reds->ping_timer, reds->ping_interval);
core->term_printf(core, "ping on, interval %u s\n", reds->ping_interval / 1000);
} else if (!strcmp(opt, "off")) {
core->disarm_timer(core, reds->ping_timer);
core->term_printf(core, "ping off\n");
} else {
core->term_printf(core, "ping invalid option: %s\n", opt);
return;
}
}
static void do_ping_client_2(const VDICmdArg* args)
{
if (args_is_empty(args)) {
do_ping_client(NULL, FALSE, 0);
return;
}
if (!args_is_string(args)) {
red_printf("invalid args");
return;
}
if (args_is_empty(&args[1])) {
do_ping_client(args[0].string_val, FALSE, 0);
return;
}
if (!args_is_int(&args[1])) {
red_printf("invalid args");
return;
}
do_ping_client(args[0].string_val, TRUE, args[1].int_val);
}
static void ping_timer_cb()
{
if (!reds->peer) {
red_printf("not connected to peer, ping off");
core->disarm_timer(core, reds->ping_timer);
return;
}
do_ping_client(NULL, 0, 0);
core->arm_timer(core, reds->ping_timer, reds->ping_interval);
}
static void do_info_rtt_client()
{
core->term_printf(core, "rtt=%uus, min/max/avg=%u/%u/%uus\n", reds->roundtrip_stat.value,
reds->roundtrip_stat.min, reds->roundtrip_stat.max,
reds->roundtrip_stat.average);
}
#endif
static void reds_send_mouse_mode()
{
RedMouseMode *mouse_mode;
SimpleOutItem *item;
if (!reds->peer) {
return;
}
if (!(item = new_simple_out_item(RED_MOUSE_MODE, sizeof(RedMouseMode)))) {
red_printf("alloc item failed");
reds_disconnect();
return;
}
mouse_mode = (RedMouseMode *)item->data;
mouse_mode->supported_modes = RED_MOUSE_MODE_SERVER;
if (reds->is_client_mouse_allowed) {
mouse_mode->supported_modes |= RED_MOUSE_MODE_CLIENT;
|