summaryrefslogtreecommitdiffstats
path: root/server/snd_worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'server/snd_worker.c')
-rw-r--r--server/snd_worker.c1300
1 files changed, 1300 insertions, 0 deletions
diff --git a/server/snd_worker.c b/server/snd_worker.c
new file mode 100644
index 00000000..8cfabaef
--- /dev/null
+++ b/server/snd_worker.c
@@ -0,0 +1,1300 @@
+/*
+ Copyright (C) 2009 Red Hat, Inc.
+
+ This program is free software; you can redistribute it and/or
+ modify it under the terms of the GNU General Public License as
+ published by the Free Software Foundation; either version 2 of
+ the License, or (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <fcntl.h>
+#include <errno.h>
+#include <sys/socket.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <celt051/celt.h>
+
+#include "vd_interface.h"
+#include "red_common.h"
+#include "reds.h"
+#include "red_dispatcher.h"
+#include "snd_worker.h"
+
+#define MAX_SEND_VEC 100
+#define MAX_SEND_BUFS 200
+
+#define RECIVE_BUF_SIZE (16 * 1024 * 2)
+
+#define FRAME_SIZE 256
+#define PLAYBACK_BUF_SIZE (FRAME_SIZE * 4)
+
+#define CELT_BIT_RATE (64 * 1024)
+#define CELT_COMPRESSED_FRAME_BYTES (FRAME_SIZE * CELT_BIT_RATE / VD_INTERFACE_PLAYBACK_FREQ / 8)
+
+#define RECORD_SAMPLES_SIZE (RECIVE_BUF_SIZE >> 2)
+
+enum PlaybackeCommand {
+ SND_PLAYBACK_MIGRATE,
+ SND_PLAYBACK_MODE,
+ SND_PLAYBACK_CTRL,
+ SND_PLAYBACK_PCM,
+};
+
+enum RecordCommand {
+ SND_RECORD_MIGRATE,
+ SND_RECORD_CTRL,
+};
+
+#define SND_PLAYBACK_MIGRATE_MASK (1 << SND_PLAYBACK_MIGRATE)
+#define SND_PLAYBACK_MODE_MASK (1 << SND_PLAYBACK_MODE)
+#define SND_PLAYBACK_CTRL_MASK (1 << SND_PLAYBACK_CTRL)
+#define SND_PLAYBACK_PCM_MASK (1 << SND_PLAYBACK_PCM)
+
+#define SND_RECORD_MIGRATE_MASK (1 << SND_RECORD_MIGRATE)
+#define SND_RECORD_CTRL_MASK (1 << SND_RECORD_CTRL)
+
+typedef struct BufDescriptor {
+ uint32_t size;
+ uint8_t *data;
+} BufDescriptor;
+
+typedef struct SndChannel SndChannel;
+typedef void (*send_messages_proc)(void *in_channel);
+typedef int (*handle_message_proc)(SndChannel *channel, RedDataHeader *message);
+typedef void (*on_message_done_proc)(SndChannel *channel);
+typedef void (*cleanup_channel_proc)(SndChannel *channel);
+
+typedef struct SndWorker SndWorker;
+
+struct SndChannel {
+ RedsStreamContext *peer;
+ SndWorker *worker;
+
+ int active;
+ int client_active;
+ int blocked;
+
+ uint32_t command;
+ int migrate;
+ uint32_t ack_generation;
+ uint32_t client_ack_generation;
+ uint32_t out_messages;
+ uint32_t ack_messages;
+
+ struct {
+ RedDataHeader header;
+ uint32_t n_bufs;
+ BufDescriptor bufs[MAX_SEND_BUFS];
+
+ uint32_t size;
+ uint32_t pos;
+ } send_data;
+
+ struct {
+ uint8_t buf[RECIVE_BUF_SIZE];
+ RedDataHeader *message;
+ uint8_t *now;
+ uint8_t *end;
+ } recive_data;
+
+ send_messages_proc send_messages;
+ handle_message_proc handle_message;
+ on_message_done_proc on_message_done;
+ cleanup_channel_proc cleanup;
+};
+
+typedef struct AudioFrame AudioFrame;
+struct AudioFrame {
+ uint32_t time;
+ uint32_t samples[FRAME_SIZE];
+ AudioFrame *next;
+};
+
+typedef struct PlaybackChannel {
+ SndChannel base;
+ AudioFrame frames[3];
+ PlaybackPlug plug;
+ VDObjectRef plug_ref;
+ AudioFrame *free_frames;
+ AudioFrame *in_progress;
+ AudioFrame *pending_frame;
+ CELTMode *celt_mode;
+ CELTEncoder *celt_encoder;
+ int celt_allowed;
+ uint32_t mode;
+ struct {
+ union {
+ RedPlaybackMode mode;
+ RedPlaybackStart start;
+ RedMigrate migrate;
+ uint8_t celt_buf[CELT_COMPRESSED_FRAME_BYTES];
+ } u;
+ } send_data;
+} PlaybackChannel;
+
+struct SndWorker {
+ Channel base;
+ VDInterface *interface;
+ SndChannel *connection;
+ SndWorker *next;
+};
+
+#define RECORD_MIG_VERSION 1
+
+typedef struct __attribute__ ((__packed__)) RecordMigrateData {
+ uint32_t version;
+ uint64_t serial;
+ uint32_t start_time;
+ uint32_t mode;
+ uint32_t mode_time;
+} RecordMigrateData;
+
+typedef struct __attribute__ ((__packed__)) RecordMigrateMessage {
+ RedMigrate migrate;
+ RedDataHeader header;
+ RecordMigrateData data;
+} RecordMigrateMessage;
+
+typedef struct RecordChannel {
+ SndChannel base;
+ RecordPlug plug;
+ VDObjectRef plug_ref;
+ uint32_t samples[RECORD_SAMPLES_SIZE];
+ uint32_t write_pos;
+ uint32_t read_pos;
+ uint32_t mode;
+ uint32_t mode_time;
+ uint32_t start_time;
+ CELTDecoder *celt_decoder;
+ CELTMode *celt_mode;
+ uint32_t celt_buf[FRAME_SIZE];
+ struct {
+ union {
+ RedRecordStart start;
+ RecordMigrateMessage migrate;
+ } u;
+ } send_data;
+} RecordChannel;
+
+static SndWorker *workers = NULL;
+static uint32_t playback_compression = RED_AUDIO_DATA_MODE_CELT_0_5_1;
+
+static void snd_receive(void* data);
+
+static inline BufDescriptor *snd_find_buf(SndChannel *channel, int buf_pos, int *buf_offset)
+{
+ BufDescriptor *buf;
+ int pos = 0;
+
+ for (buf = channel->send_data.bufs; buf_pos >= pos + buf->size; buf++) {
+ pos += buf->size;
+ ASSERT(buf != &channel->send_data.bufs[channel->send_data.n_bufs - 1]);
+ }
+ *buf_offset = buf_pos - pos;
+ return buf;
+}
+
+static inline uint32_t __snd_fill_iovec(BufDescriptor *buf, int skip, struct iovec *vec,
+ int *vec_index, long phys_delta)
+{
+ uint32_t size = 0;
+ vec[*vec_index].iov_base = buf->data + skip;
+ vec[*vec_index].iov_len = size = buf->size - skip;
+ (*vec_index)++;
+ return size;
+}
+
+static inline void snd_fill_iovec(SndChannel *channel, struct iovec *vec, int *vec_size)
+{
+ int vec_index = 0;
+ uint32_t pos = channel->send_data.pos;
+ ASSERT(channel->send_data.size != pos && channel->send_data.size > pos);
+
+ do {
+ BufDescriptor *buf;
+ int buf_offset;
+
+ buf = snd_find_buf(channel, pos, &buf_offset);
+ ASSERT(buf);
+ pos += __snd_fill_iovec(buf, buf_offset, vec, &vec_index, 0);
+ } while (vec_index < MAX_SEND_VEC && pos != channel->send_data.size);
+ *vec_size = vec_index;
+}
+
+static void snd_disconnect_channel(SndChannel *channel)
+{
+ SndWorker *worker;
+
+ if (!channel) {
+ return;
+ }
+ channel->cleanup(channel);
+ worker = channel->worker;
+ worker->connection = NULL;
+ core->set_file_handlers(core, channel->peer->socket, NULL, NULL, NULL);
+ channel->peer->cb_free(channel->peer);
+ free(channel);
+}
+
+static void snd_playback_free_frame(PlaybackChannel *playback_channel, AudioFrame *frame)
+{
+ frame->next = playback_channel->free_frames;
+ playback_channel->free_frames = frame;
+}
+
+static void snd_playback_on_message_done(SndChannel *channel)
+{
+ PlaybackChannel *playback_channel = (PlaybackChannel *)channel;
+ if (playback_channel->in_progress) {
+ snd_playback_free_frame(playback_channel, playback_channel->in_progress);
+ playback_channel->in_progress = NULL;
+ if (playback_channel->pending_frame) {
+ channel->command |= SND_PLAYBACK_PCM_MASK;
+ }
+ }
+}
+
+static void snd_record_on_message_done(SndChannel *channel)
+{
+}
+
+static int snd_send_data(SndChannel *channel)
+{
+ uint32_t n;
+
+ if (!channel) {
+ return FALSE;
+ }
+
+ if (!(n = channel->send_data.size - channel->send_data.pos)) {
+ return TRUE;
+ }
+
+ for (;;) {
+ struct iovec vec[MAX_SEND_VEC];
+ int vec_size;
+
+ if (!n) {
+ channel->on_message_done(channel);
+
+ if (channel->blocked) {
+ channel->blocked = FALSE;
+ if (core->set_file_handlers(core, channel->peer->socket, snd_receive,
+ NULL, channel) == -1) {
+ red_printf("qemu_set_fd_handler failed");
+ }
+ }
+ break;
+ }
+
+ snd_fill_iovec(channel, vec, &vec_size);
+ if ((n = channel->peer->cb_writev(channel->peer->ctx, vec, vec_size)) == -1) {
+ switch (errno) {
+ case EAGAIN:
+ channel->blocked = TRUE;
+ if (core->set_file_handlers(core, channel->peer->socket, snd_receive,
+ channel->send_messages, channel) == -1) {
+ red_printf("qemu_set_fd_handler failed");
+ }
+ return FALSE;
+ case EINTR:
+ break;
+ case EPIPE:
+ snd_disconnect_channel(channel);
+ return FALSE;
+ default:
+ red_printf("%s", strerror(errno));
+ snd_disconnect_channel(channel);
+ return FALSE;
+ }
+ } else {
+ channel->send_data.pos += n;
+ }
+ n = channel->send_data.size - channel->send_data.pos;
+ }
+ return TRUE;
+}
+
+static int snd_record_handle_write(RecordChannel *record_channel, RedDataHeader *message)
+{
+ RedcRecordPacket *packet;
+ uint32_t write_pos;
+ uint32_t* data;
+ uint32_t size;
+ uint32_t len;
+ uint32_t now;
+
+ if (!record_channel) {
+ return FALSE;
+ }
+
+ packet = (RedcRecordPacket *)(message + 1);
+ size = message->size - sizeof(*packet);
+
+ if (record_channel->mode == RED_AUDIO_DATA_MODE_CELT_0_5_1) {
+ int celt_err = celt051_decode(record_channel->celt_decoder, packet->data, size,
+ (celt_int16_t *)record_channel->celt_buf);
+ if (celt_err != CELT_OK) {
+ red_printf("celt decode failed (%d)", celt_err);
+ return FALSE;
+ }
+ data = record_channel->celt_buf;
+ size = FRAME_SIZE;
+ } else if (record_channel->mode == RED_AUDIO_DATA_MODE_RAW) {
+ data = (uint32_t *)packet->data;
+ size = size >> 2;
+ size = MIN(size, RECORD_SAMPLES_SIZE);
+ } else {
+ return FALSE;
+ }
+
+ write_pos = record_channel->write_pos % RECORD_SAMPLES_SIZE;
+ record_channel->write_pos += size;
+ len = RECORD_SAMPLES_SIZE - write_pos;
+ now = MIN(len, size);
+ size -= now;
+ memcpy(record_channel->samples + write_pos, data, now << 2);
+
+ if (size) {
+ memcpy(record_channel->samples, data + now, size << 2);
+ }
+
+ if (record_channel->write_pos - record_channel->read_pos > RECORD_SAMPLES_SIZE) {
+ record_channel->read_pos = record_channel->write_pos - RECORD_SAMPLES_SIZE;
+ }
+ return TRUE;
+}
+
+static int snd_playback_handle_message(SndChannel *channel, RedDataHeader *message)
+{
+ if (!channel) {
+ return FALSE;
+ }
+
+ switch (message->type) {
+ case REDC_DISCONNECTING:
+ break;
+ default:
+ red_printf("invalid message type %u", message->type);
+ return FALSE;
+ }
+ return TRUE;
+}
+
+static int snd_record_handle_message(SndChannel *channel, RedDataHeader *message)
+{
+ RecordChannel *record_channel = (RecordChannel *)channel;
+
+ if (!channel) {
+ return FALSE;
+ }
+ switch (message->type) {
+ case REDC_RECORD_DATA:
+ return snd_record_handle_write((RecordChannel *)channel, message);
+ case REDC_RECORD_MODE: {
+ RedcRecordMode *mode = (RedcRecordMode *)(message + 1);
+ record_channel->mode = mode->mode;
+ record_channel->mode_time = mode->time;
+ if (record_channel->mode != RED_AUDIO_DATA_MODE_CELT_0_5_1 &&
+ record_channel->mode != RED_AUDIO_DATA_MODE_RAW) {
+ red_printf("unsupported mode");
+ }
+ break;
+ }
+ case REDC_RECORD_START_MARK: {
+ RedcRecordStartMark *mark = (RedcRecordStartMark *)(message + 1);
+ record_channel->start_time = mark->time;
+ break;
+ }
+ case REDC_DISCONNECTING:
+ break;
+ case REDC_MIGRATE_DATA: {
+ RecordMigrateData* mig_data = (RecordMigrateData *)(message + 1);
+ if (mig_data->version != RECORD_MIG_VERSION) {
+ red_printf("invalid mig version");
+ break;
+ }
+ record_channel->mode = mig_data->mode;
+ record_channel->mode_time = mig_data->mode_time;
+ record_channel->start_time = mig_data->start_time;
+ break;
+ }
+ default:
+ red_printf("invalid message type %u", message->type);
+ return FALSE;
+ }
+ return TRUE;
+}
+
+static void snd_receive(void* data)
+{
+ SndChannel *channel = (SndChannel*)data;
+ if (!channel) {
+ return;
+ }
+
+ for (;;) {
+ ssize_t n;
+ n = channel->recive_data.end - channel->recive_data.now;
+ ASSERT(n);
+ if ((n = channel->peer->cb_read(channel->peer->ctx, channel->recive_data.now, n)) <= 0) {
+ if (n == 0) {
+ snd_disconnect_channel(channel);
+ return;
+ }
+ ASSERT(n == -1);
+ switch (errno) {
+ case EAGAIN:
+ return;
+ case EINTR:
+ break;
+ case EPIPE:
+ snd_disconnect_channel(channel);
+ return;
+ default:
+ red_printf("%s", strerror(errno));
+ snd_disconnect_channel(channel);
+ return;
+ }
+ } else {
+ channel->recive_data.now += n;
+ for (;;) {
+ RedDataHeader *message = channel->recive_data.message;
+ n = channel->recive_data.now - (uint8_t *)message;
+ if (n < sizeof(RedDataHeader) || n < sizeof(RedDataHeader) + message->size) {
+ break;
+ }
+ if (!channel->handle_message(channel, message)) {
+ snd_disconnect_channel(channel);
+ return;
+ }
+ channel->recive_data.message = (RedDataHeader *)((uint8_t *)message +
+ sizeof(RedDataHeader) +
+ message->size);
+ }
+ if (channel->recive_data.now == (uint8_t *)channel->recive_data.message) {
+ channel->recive_data.now = channel->recive_data.buf;
+ channel->recive_data.message = (RedDataHeader *)channel->recive_data.buf;
+ } else if (channel->recive_data.now == channel->recive_data.end) {
+ memcpy(channel->recive_data.buf, channel->recive_data.message, n);
+ channel->recive_data.now = channel->recive_data.buf + n;
+ channel->recive_data.message = (RedDataHeader *)channel->recive_data.buf;
+ }
+ }
+ }
+}
+
+static inline void __snd_add_buf(SndChannel *channel, void *data, uint32_t size)
+{
+ int pos = channel->send_data.n_bufs++;
+ ASSERT(pos < MAX_SEND_BUFS);
+ channel->send_data.bufs[pos].size = size;
+ channel->send_data.bufs[pos].data = data;
+}
+
+static void snd_add_buf(SndChannel *channel, void *data, uint32_t size)
+{
+ __snd_add_buf(channel, data, size);
+ channel->send_data.header.size += size;
+}
+
+static inline int snd_reset_send_data(SndChannel *channel, uint16_t verb)
+{
+ if (!channel) {
+ return FALSE;
+ }
+
+ channel->send_data.pos = 0;
+ channel->send_data.n_bufs = 0;
+ channel->send_data.header.sub_list = 0;
+ channel->send_data.header.size = 0;
+ channel->send_data.header.type = verb;
+ ++channel->send_data.header.serial;
+ __snd_add_buf(channel, &channel->send_data.header, sizeof(RedDataHeader));
+ return TRUE;
+}
+
+static int snd_playback_send_migrate(PlaybackChannel *channel)
+{
+ if (!snd_reset_send_data((SndChannel *)channel, RED_MIGRATE)) {
+ return FALSE;
+ }
+ channel->send_data.u.migrate.flags = 0;
+ snd_add_buf((SndChannel *)channel, &channel->send_data.u.migrate,
+ sizeof(channel->send_data.u.migrate));
+ channel->base.send_data.size = channel->base.send_data.header.size + sizeof(RedDataHeader);
+ return snd_send_data((SndChannel *)channel);
+}
+
+static int snd_playback_send_start(PlaybackChannel *playback_channel)
+{
+ SndChannel *channel = (SndChannel *)playback_channel;
+ RedPlaybackStart *start;
+ if (!snd_reset_send_data(channel, RED_PLAYBACK_START)) {
+ return FALSE;
+ }
+
+ start = &playback_channel->send_data.u.start;
+ start->channels = VD_INTERFACE_PLAYBACK_CHAN;
+ start->frequency = VD_INTERFACE_PLAYBACK_FREQ;
+ ASSERT(VD_INTERFACE_PLAYBACK_FMT == VD_INTERFACE_AUDIO_FMT_S16);
+ start->format = RED_AUDIO_FMT_S16;
+ start->time = reds_get_mm_time();
+ snd_add_buf(channel, start, sizeof(*start));
+
+ channel->send_data.size = sizeof(RedDataHeader) + sizeof(*start);
+ return snd_send_data(channel);
+}
+
+static int snd_playback_send_stop(PlaybackChannel *playback_channel)
+{
+ SndChannel *channel = (SndChannel *)playback_channel;
+ if (!snd_reset_send_data(channel, RED_PLAYBACK_STOP)) {
+ return FALSE;
+ }
+ channel->send_data.size = sizeof(RedDataHeader);
+ return snd_send_data(channel);
+}
+
+static int snd_playback_send_ctl(PlaybackChannel *playback_channel)
+{
+ SndChannel *channel = (SndChannel *)playback_channel;
+
+ if ((channel->client_active = channel->active)) {
+ return snd_playback_send_start(playback_channel);
+ } else {
+ return snd_playback_send_stop(playback_channel);
+ }
+}
+
+static int snd_record_send_start(RecordChannel *record_channel)
+{
+ SndChannel *channel = (SndChannel *)record_channel;
+ RedRecordStart *start;
+ if (!snd_reset_send_data(channel, RED_RECORD_START)) {
+ return FALSE;
+ }
+
+ start = &record_channel->send_data.u.start;
+ start->channels = VD_INTERFACE_RECORD_CHAN;
+ start->frequency = VD_INTERFACE_RECORD_FREQ;
+ ASSERT(VD_INTERFACE_RECORD_FMT == VD_INTERFACE_AUDIO_FMT_S16);
+ start->format = RED_AUDIO_FMT_S16;
+ snd_add_buf(channel, start, sizeof(*start));
+
+ channel->send_data.size = sizeof(RedDataHeader) + sizeof(*start);
+ return snd_send_data(channel);
+}
+
+static int snd_record_send_stop(RecordChannel *record_channel)
+{
+ SndChannel *channel = (SndChannel *)record_channel;
+ if (!snd_reset_send_data(channel, RED_RECORD_STOP)) {
+ return FALSE;
+ }
+ channel->send_data.size = sizeof(RedDataHeader);
+ return snd_send_data(channel);
+}
+
+static int snd_record_send_ctl(RecordChannel *record_channel)
+{
+ SndChannel *channel = (SndChannel *)record_channel;
+
+ if ((channel->client_active = channel->active)) {
+ return snd_record_send_start(record_channel);
+ } else {
+ return snd_record_send_stop(record_channel);
+ }
+}
+
+static int snd_record_send_migrate(RecordChannel *record_channel)
+{
+ SndChannel *channel = (SndChannel *)record_channel;
+ RecordMigrateMessage* migrate;
+
+ if (!snd_reset_send_data(channel, RED_MIGRATE)) {
+ return FALSE;
+ }
+
+ migrate = &record_channel->send_data.u.migrate;
+ migrate->migrate.flags = RED_MIGRATE_NEED_DATA_TRANSFER;
+ migrate->header.type = RED_MIGRATE_DATA;
+ migrate->header.size = sizeof(RecordMigrateData);
+ migrate->header.serial = ++channel->send_data.header.serial;
+ migrate->header.sub_list = 0;
+
+ migrate->data.version = RECORD_MIG_VERSION;
+ migrate->data.serial = channel->send_data.header.serial;
+ migrate->data.start_time = record_channel->start_time;
+ migrate->data.mode = record_channel->mode;
+ migrate->data.mode_time = record_channel->mode_time;
+
+ snd_add_buf(channel, migrate, sizeof(*migrate));
+ channel->send_data.size = channel->send_data.header.size + sizeof(RedDataHeader);
+ channel->send_data.header.size -= sizeof(migrate->header);
+ channel->send_data.header.size -= sizeof(migrate->data);
+ return snd_send_data(channel);
+}
+
+static int snd_playback_send_write(PlaybackChannel *playback_channel)
+{
+ SndChannel *channel = (SndChannel *)playback_channel;
+ AudioFrame *frame;
+
+ if (!snd_reset_send_data(channel, RED_PLAYBACK_DATA)) {
+ return FALSE;
+ }
+
+ frame = playback_channel->in_progress;
+ snd_add_buf(channel, &frame->time, sizeof(frame->time));
+ if (playback_channel->mode == RED_AUDIO_DATA_MODE_CELT_0_5_1) {
+ int n = celt051_encode(playback_channel->celt_encoder, (celt_int16_t *)frame->samples, NULL,
+ playback_channel->send_data.u.celt_buf, CELT_COMPRESSED_FRAME_BYTES);
+ if (n < 0) {
+ red_printf("celt encode failed");
+ snd_disconnect_channel(channel);
+ return FALSE;
+ }
+ snd_add_buf(channel, playback_channel->send_data.u.celt_buf, n);
+ } else {
+ snd_add_buf(channel, frame->samples, sizeof(frame->samples));
+ }
+
+ channel->send_data.size = channel->send_data.header.size + sizeof(RedDataHeader);
+
+ return snd_send_data(channel);
+}
+
+static int playback_send_mode(PlaybackChannel *playback_channel)
+{
+ SndChannel *channel = (SndChannel *)playback_channel;
+ RedPlaybackMode *mode;
+
+ if (!snd_reset_send_data(channel, RED_PLAYBACK_MODE)) {
+ return FALSE;
+ }
+ mode = &playback_channel->send_data.u.mode;
+ mode->time = reds_get_mm_time();
+ mode->mode = playback_channel->mode;
+ snd_add_buf(channel, mode, sizeof(*mode));
+
+ channel->send_data.size = channel->send_data.header.size + sizeof(RedDataHeader);
+ return snd_send_data(channel);
+}
+
+static void snd_playback_send(void* data)
+{
+ PlaybackChannel *playback_channel = (PlaybackChannel*)data;
+ SndChannel *channel = (SndChannel*)playback_channel;
+
+ if (!playback_channel || !snd_send_data(data)) {
+ return;
+ }
+
+ while (channel->command) {
+ if (channel->command & SND_PLAYBACK_MODE_MASK) {
+ if (!playback_send_mode(playback_channel)) {
+ return;
+ }
+ channel->command &= ~SND_PLAYBACK_MODE_MASK;
+ }
+ if (channel->command & SND_PLAYBACK_PCM_MASK) {
+ ASSERT(!playback_channel->in_progress && playback_channel->pending_frame);
+ playback_channel->in_progress = playback_channel->pending_frame;
+ playback_channel->pending_frame = NULL;
+ channel->command &= ~SND_PLAYBACK_PCM_MASK;
+ if (!snd_playback_send_write(playback_channel)) {
+ red_printf("snd_send_playback_write failed");
+ return;
+ }
+ }
+ if (channel->command & SND_PLAYBACK_CTRL_MASK) {
+ if (!snd_playback_send_ctl(playback_channel)) {
+ return;
+ }
+ channel->command &= ~SND_PLAYBACK_CTRL_MASK;
+ }
+ if (channel->command & SND_PLAYBACK_MIGRATE_MASK) {
+ if (!snd_playback_send_migrate(playback_channel)) {
+ return;
+ }
+ channel->command &= ~SND_PLAYBACK_MIGRATE_MASK;
+ }
+ }
+}
+
+static void snd_record_send(void* data)
+{
+ RecordChannel *record_channel = (RecordChannel*)data;
+ SndChannel *channel = (SndChannel*)record_channel;
+
+ if (!record_channel || !snd_send_data(data)) {
+ return;
+ }
+
+ while (channel->command) {
+ if (channel->command & SND_RECORD_CTRL_MASK) {
+ if (!snd_record_send_ctl(record_channel)) {
+ return;
+ }
+ channel->command &= ~SND_RECORD_CTRL_MASK;
+ }
+ if (channel->command & SND_RECORD_MIGRATE_MASK) {
+ if (!snd_record_send_migrate(record_channel)) {
+ return;
+ }
+ channel->command &= ~SND_RECORD_MIGRATE_MASK;
+ }
+ }
+}
+
+static SndChannel *__new_channel(SndWorker *worker, int size, RedsStreamContext *peer,
+ int migrate, send_messages_proc send_messages,
+ handle_message_proc handle_message,
+ on_message_done_proc on_message_done,
+ cleanup_channel_proc cleanup)
+{
+ SndChannel *channel;
+ int delay_val;
+ int flags;
+ int priority;
+ int tos;
+
+ if ((flags = fcntl(peer->socket, F_GETFL)) == -1) {
+ red_printf("accept failed, %s", strerror(errno));
+ goto error1;
+ }
+
+ priority = 6;
+ if (setsockopt(peer->socket, SOL_SOCKET, SO_PRIORITY, (void*)&priority,
+ sizeof(priority)) == -1) {
+ red_printf("setsockopt failed, %s", strerror(errno));
+ }
+
+ tos = IPTOS_LOWDELAY;
+ if (setsockopt(peer->socket, IPPROTO_IP, IP_TOS, (void*)&tos, sizeof(tos)) == -1) {
+ red_printf("setsockopt failed, %s", strerror(errno));
+ }
+
+ delay_val = IS_LOW_BANDWIDTH() ? 0 : 1;
+ if (setsockopt(peer->socket, IPPROTO_TCP, TCP_NODELAY, &delay_val, sizeof(delay_val)) == -1) {
+ red_printf("setsockopt failed, %s", strerror(errno));
+ }
+
+ if (fcntl(peer->socket, F_SETFL, flags | O_NONBLOCK) == -1) {
+ red_printf("accept failed, %s", strerror(errno));
+ goto error1;
+ }
+
+ ASSERT(size >= sizeof(*channel));
+ if (!(channel = malloc(size))) {
+ red_printf("malloc failed");
+ goto error1;
+ }
+ memset(channel, 0, size);
+ channel->peer = peer;
+ channel->worker = worker;
+ channel->recive_data.message = (RedDataHeader *)channel->recive_data.buf;
+ channel->recive_data.now = channel->recive_data.buf;
+ channel->recive_data.end = channel->recive_data.buf + sizeof(channel->recive_data.buf);
+
+ if (core->set_file_handlers(core, peer->socket, snd_receive, NULL, channel) == -1) {
+ red_printf("qemu_set_fd_handler failed, %s", strerror(errno));
+ goto error2;
+ }
+
+ channel->migrate = migrate;
+ channel->send_messages = send_messages;
+ channel->handle_message = handle_message;
+ channel->on_message_done = on_message_done;
+ channel->cleanup = cleanup;
+ return channel;
+
+error2:
+ free(channel);
+
+error1:
+ peer->cb_free(peer);
+ return NULL;
+}
+
+static void snd_shutdown(Channel *channel)
+{
+ SndWorker *worker = (SndWorker *)channel;
+ snd_disconnect_channel(worker->connection);
+}
+
+static void snd_set_command(SndChannel *channel, uint32_t command)
+{
+ if (!channel) {
+ return;
+ }
+ channel->command |= command;
+}
+
+static void snd_playback_start(PlaybackPlug *plug)
+{
+ PlaybackChannel *playback_channel = CONTAINEROF(plug, PlaybackChannel, plug);
+
+ ASSERT(!playback_channel->base.active);
+ reds_desable_mm_timer();
+ playback_channel->base.active = TRUE;
+ if (!playback_channel->base.client_active) {
+ snd_set_command(&playback_channel->base, SND_PLAYBACK_CTRL_MASK);
+ snd_playback_send(&playback_channel->base);
+ } else {
+ playback_channel->base.command &= ~SND_PLAYBACK_CTRL_MASK;
+ }
+}
+
+static void snd_playback_stop(PlaybackPlug *plug)
+{
+ PlaybackChannel *playback_channel = CONTAINEROF(plug, PlaybackChannel, plug);
+
+ ASSERT(playback_channel->base.active);
+ reds_enable_mm_timer();
+ playback_channel->base.active = FALSE;
+ if (playback_channel->base.client_active) {
+ snd_set_command(&playback_channel->base, SND_PLAYBACK_CTRL_MASK);
+ snd_playback_send(&playback_channel->base);
+ } else {
+ playback_channel->base.command &= ~SND_PLAYBACK_CTRL_MASK;
+ playback_channel->base.command &= ~SND_PLAYBACK_PCM_MASK;
+
+ if (playback_channel->pending_frame) {
+ ASSERT(!playback_channel->in_progress);
+ snd_playback_free_frame(playback_channel,
+ playback_channel->pending_frame);
+ playback_channel->pending_frame = NULL;
+ }
+ }
+}
+
+static void snd_playback_get_frame(PlaybackPlug *plug, uint32_t **frame, uint32_t *num_samples)
+{
+ PlaybackChannel *playback_channel = CONTAINEROF(plug, PlaybackChannel, plug);
+
+ ASSERT(playback_channel->base.active);
+ if (!playback_channel->free_frames) {
+ *frame = NULL;
+ *num_samples = 0;
+ return;
+ }
+
+ *frame = playback_channel->free_frames->samples;
+ playback_channel->free_frames = playback_channel->free_frames->next;
+ *num_samples = FRAME_SIZE;
+}
+
+static void snd_playback_put_frame(PlaybackPlug *plug, uint32_t *samples)
+{
+ PlaybackChannel *playback_channel = CONTAINEROF(plug, PlaybackChannel, plug);
+ AudioFrame *frame;
+
+ ASSERT(playback_channel->base.active);
+
+ if (playback_channel->pending_frame) {
+ snd_playback_free_frame(playback_channel, playback_channel->pending_frame);
+ }
+ frame = CONTAINEROF(samples, AudioFrame, samples);
+ frame->time = reds_get_mm_time();
+ red_dispatcher_set_mm_time(frame->time);
+ playback_channel->pending_frame = frame;
+ snd_set_command(&playback_channel->base, SND_PLAYBACK_PCM_MASK);
+ snd_playback_send(&playback_channel->base);
+}
+
+static void on_new_playback_channel(SndWorker *worker)
+{
+ PlaybackChannel *playback_channel = (PlaybackChannel *)worker->connection;
+ PlaybackInterface *interface = (PlaybackInterface *)worker->interface;
+ ASSERT(playback_channel);
+
+ playback_channel->plug_ref = interface->plug(interface, &playback_channel->plug,
+ &playback_channel->base.active);
+ snd_set_command((SndChannel *)playback_channel, SND_PLAYBACK_MODE_MASK);
+ if (!playback_channel->base.migrate && playback_channel->base.active) {
+ snd_set_command((SndChannel *)playback_channel, SND_PLAYBACK_CTRL_MASK);
+ }
+ if (playback_channel->base.active) {
+ reds_desable_mm_timer();
+ }
+}
+
+static void snd_playback_cleanup(SndChannel *channel)
+{
+ PlaybackChannel *playback_channel = (PlaybackChannel *)channel;
+ PlaybackInterface *interface = (PlaybackInterface *)channel->worker->interface;
+
+ if (playback_channel->base.active) {
+ reds_enable_mm_timer();
+ }
+ interface->unplug(interface, playback_channel->plug_ref);
+
+ celt051_encoder_destroy(playback_channel->celt_encoder);
+ celt051_mode_destroy(playback_channel->celt_mode);
+}
+
+static void snd_set_playback_peer(Channel *channel, RedsStreamContext *peer, int migration,
+ int num_common_caps, uint32_t *common_caps, int num_caps,
+ uint32_t *caps)
+{
+ SndWorker *worker = (SndWorker *)channel;
+ PlaybackChannel *playback_channel;
+ CELTEncoder *celt_encoder;
+ CELTMode *celt_mode;
+ int celt_error;
+
+ snd_disconnect_channel(worker->connection);
+
+ if (!(celt_mode = celt051_mode_create(VD_INTERFACE_PLAYBACK_FREQ, VD_INTERFACE_PLAYBACK_CHAN,
+ FRAME_SIZE, &celt_error))) {
+ red_printf("create celt mode failed %d", celt_error);
+ return;
+ }
+
+ if (!(celt_encoder = celt051_encoder_create(celt_mode))) {
+ red_printf("create celt encoder failed");
+ goto error_1;
+ }
+
+ if (!(playback_channel = (PlaybackChannel *)__new_channel(worker,
+ sizeof(*playback_channel),
+ peer,
+ migration,
+ snd_playback_send,
+ snd_playback_handle_message,
+ snd_playback_on_message_done,
+ snd_playback_cleanup))) {
+ goto error_2;
+ }
+ worker->connection = &playback_channel->base;
+ snd_playback_free_frame(playback_channel, &playback_channel->frames[0]);
+ snd_playback_free_frame(playback_channel, &playback_channel->frames[1]);
+ snd_playback_free_frame(playback_channel, &playback_channel->frames[2]);
+
+ playback_channel->plug.major_version = VD_INTERFACE_PLAYBACK_MAJOR;
+ playback_channel->plug.minor_version = VD_INTERFACE_PLAYBACK_MINOR;
+ playback_channel->plug.start = snd_playback_start;
+ playback_channel->plug.stop = snd_playback_stop;
+ playback_channel->plug.get_frame = snd_playback_get_frame;
+ playback_channel->plug.put_frame = snd_playback_put_frame;
+ playback_channel->celt_mode = celt_mode;
+ playback_channel->celt_encoder = celt_encoder;
+ playback_channel->celt_allowed = num_caps > 0 && (caps[0] & (1 << RED_PLAYBACK_CAP_CELT_0_5_1));
+ playback_channel->mode = playback_channel->celt_allowed ? playback_compression :
+ RED_AUDIO_DATA_MODE_RAW;
+
+ on_new_playback_channel(worker);
+ snd_playback_send(worker->connection);
+ return;
+
+error_2:
+ celt051_encoder_destroy(celt_encoder);
+
+error_1:
+ celt051_mode_destroy(celt_mode);
+}
+
+static void snd_record_migrate(Channel *channel)
+{
+ SndWorker *worker = (SndWorker *)channel;
+ if (worker->connection) {
+ snd_set_command(worker->connection, SND_RECORD_MIGRATE_MASK);
+ snd_record_send(worker->connection);
+ }
+}
+
+static void snd_record_start(RecordPlug *plug)
+{
+ RecordChannel *record_channel = CONTAINEROF(plug, RecordChannel, plug);
+
+ ASSERT(!record_channel->base.active);
+ record_channel->base.active = TRUE;
+ record_channel->read_pos = record_channel->write_pos = 0; //todo: improve by
+ //stream generation
+ if (!record_channel->base.client_active) {
+ snd_set_command(&record_channel->base, SND_RECORD_CTRL_MASK);
+ snd_record_send(&record_channel->base);
+ } else {
+ record_channel->base.command &= ~SND_RECORD_CTRL_MASK;
+ }
+}
+
+static void snd_record_stop(RecordPlug *plug)
+{
+ RecordChannel *record_channel = CONTAINEROF(plug, RecordChannel, plug);
+
+ ASSERT(record_channel->base.active);
+ record_channel->base.active = FALSE;
+ if (record_channel->base.client_active) {
+ snd_set_command(&record_channel->base, SND_RECORD_CTRL_MASK);
+ snd_record_send(&record_channel->base);
+ } else {
+ record_channel->base.command &= ~SND_RECORD_CTRL_MASK;
+ }
+}
+
+static uint32_t snd_record_read(RecordPlug *plug, uint32_t num_samples, uint32_t *samples)
+{
+ RecordChannel *record_channel = CONTAINEROF(plug, RecordChannel, plug);
+ uint32_t read_pos;
+ uint32_t now;
+ uint32_t len;
+
+ ASSERT(record_channel->base.active);
+
+ if (record_channel->write_pos < RECORD_SAMPLES_SIZE / 2) {
+ return 0;
+ }
+
+ len = MIN(record_channel->write_pos - record_channel->read_pos, num_samples);
+
+ if (len < num_samples) {
+ SndWorker *worker = record_channel->base.worker;
+ snd_receive(record_channel);
+ if (!worker->connection) {
+ return 0;
+ }
+ len = MIN(record_channel->write_pos - record_channel->read_pos, num_samples);
+ }
+
+ read_pos = record_channel->read_pos % RECORD_SAMPLES_SIZE;
+ record_channel->read_pos += len;
+ now = MIN(len, RECORD_SAMPLES_SIZE - read_pos);
+ memcpy(samples, &record_channel->samples[read_pos], now * 4);
+ if (now < len) {
+ memcpy(samples + now, record_channel->samples, (len - now) * 4);
+ }
+ return len;
+}
+
+static void on_new_record_channel(SndWorker *worker)
+{
+ RecordChannel *record_channel = (RecordChannel *)worker->connection;
+ RecordInterface *interface = (RecordInterface *)worker->interface;
+ ASSERT(record_channel);
+
+ record_channel->plug_ref = interface->plug(interface, &record_channel->plug,
+ &record_channel->base.active);
+ if (!record_channel->base.migrate) {
+ if (record_channel->base.active) {
+ snd_set_command((SndChannel *)record_channel, SND_RECORD_CTRL_MASK);
+ }
+ }
+}
+
+static void snd_record_cleanup(SndChannel *channel)
+{
+ RecordChannel *record_channel = (RecordChannel *)channel;
+ RecordInterface *interface = (RecordInterface *)channel->worker->interface;
+ interface->unplug(interface, record_channel->plug_ref);
+
+ celt051_decoder_destroy(record_channel->celt_decoder);
+ celt051_mode_destroy(record_channel->celt_mode);
+}
+
+static void snd_set_record_peer(Channel *channel, RedsStreamContext *peer, int migration,
+ int num_common_caps, uint32_t *common_caps, int num_caps,
+ uint32_t *caps)
+{
+ SndWorker *worker = (SndWorker *)channel;
+ RecordChannel *record_channel;
+ CELTDecoder *celt_decoder;
+ CELTMode *celt_mode;
+ int celt_error;
+
+ snd_disconnect_channel(worker->connection);
+
+ if (!(celt_mode = celt051_mode_create(VD_INTERFACE_RECORD_FREQ, VD_INTERFACE_RECORD_CHAN,
+ FRAME_SIZE, &celt_error))) {
+ red_printf("create celt mode failed %d", celt_error);
+ return;
+ }
+
+ if (!(celt_decoder = celt051_decoder_create(celt_mode))) {
+ red_printf("create celt decoder failed");
+ goto error_1;
+ }
+
+ if (!(record_channel = (RecordChannel *)__new_channel(worker,
+ sizeof(*record_channel),
+ peer,
+ migration,
+ snd_record_send,
+ snd_record_handle_message,
+ snd_record_on_message_done,
+ snd_record_cleanup))) {
+ goto error_2;
+ }
+
+ worker->connection = &record_channel->base;
+
+ record_channel->plug.major_version = VD_INTERFACE_RECORD_MAJOR;
+ record_channel->plug.minor_version = VD_INTERFACE_RECORD_MINOR;
+ record_channel->plug.start = snd_record_start;
+ record_channel->plug.stop = snd_record_stop;
+ record_channel->plug.read = snd_record_read;
+ record_channel->celt_mode = celt_mode;
+ record_channel->celt_decoder = celt_decoder;
+
+ on_new_record_channel(worker);
+ snd_record_send(worker->connection);
+ return;
+
+error_1:
+ celt051_decoder_destroy(celt_decoder);
+
+error_2:
+ celt051_mode_destroy(celt_mode);
+}
+
+static void snd_playback_migrate(Channel *channel)
+{
+ SndWorker *worker = (SndWorker *)channel;
+
+ if (worker->connection) {
+ snd_set_command(worker->connection, SND_PLAYBACK_MIGRATE_MASK);
+ snd_playback_send(worker->connection);
+ }
+}
+
+static void add_worker(SndWorker *worker)
+{
+ worker->next = workers;
+ workers = worker;
+}
+
+static void remove_worker(SndWorker *worker)
+{
+ SndWorker **now = &workers;
+ while (*now) {
+ if (*now == worker) {
+ *now = worker->next;
+ return;
+ }
+ now = &(*now)->next;
+ }
+ red_printf("not found");
+}
+
+static SndWorker *find_worker(VDInterface *interface)
+{
+ SndWorker *worker = workers;
+ while (worker) {
+ if (worker->interface == interface) {
+ break;
+ }
+ worker = worker->next;
+ }
+ return worker;
+}
+
+void snd_attach_playback(PlaybackInterface *interface)
+{
+ SndWorker *playback_worker;
+ if (!(playback_worker = (SndWorker *)malloc(sizeof(*playback_worker)))) {
+ red_error("playback channel malloc failed");
+ }
+ memset(playback_worker, 0, sizeof(*playback_worker));
+ playback_worker->base.type = RED_CHANNEL_PLAYBACK;
+ playback_worker->base.link = snd_set_playback_peer;
+ playback_worker->base.shutdown = snd_shutdown;
+ playback_worker->base.migrate = snd_playback_migrate;
+ playback_worker->base.data = NULL;
+
+ playback_worker->interface = &interface->base;
+ playback_worker->base.num_caps = 1;
+ if (!(playback_worker->base.caps = malloc(sizeof(uint32_t)))) {
+ PANIC("malloc failed");
+ }
+ playback_worker->base.caps[0] = (1 << RED_PLAYBACK_CAP_CELT_0_5_1);
+
+ add_worker(playback_worker);
+ reds_register_channel(&playback_worker->base);
+}
+
+void snd_attach_record(RecordInterface *interface)
+{
+ SndWorker *record_worker;
+ if (!(record_worker = (SndWorker *)malloc(sizeof(*record_worker)))) {
+ PANIC("malloc failed");
+ }
+
+ memset(record_worker, 0, sizeof(*record_worker));
+ record_worker->base.type = RED_CHANNEL_RECORD;
+ record_worker->base.link = snd_set_record_peer;
+ record_worker->base.shutdown = snd_shutdown;
+ record_worker->base.migrate = snd_record_migrate;
+ record_worker->base.data = NULL;
+
+ record_worker->interface = &interface->base;
+
+ record_worker->base.num_caps = 1;
+ if (!(record_worker->base.caps = malloc(sizeof(uint32_t)))) {
+ PANIC("malloc failed");
+ }
+ record_worker->base.caps[0] = (1 << RED_RECORD_CAP_CELT_0_5_1);
+ add_worker(record_worker);
+ reds_register_channel(&record_worker->base);
+}
+
+static void snd_detach_common(VDInterface *interface)
+{
+ SndWorker *worker = find_worker(interface);
+
+ if (!worker) {
+ return;
+ }
+ remove_worker(worker);
+ snd_disconnect_channel(worker->connection);
+ reds_unregister_channel(&worker->base);
+
+ free(worker->base.common_caps);
+ free(worker->base.caps);
+ free(worker);
+}
+
+void snd_detach_playback(PlaybackInterface *interface)
+{
+ snd_detach_common(&interface->base);
+}
+
+void snd_detach_record(RecordInterface *interface)
+{
+ snd_detach_common(&interface->base);
+}
+
+void snd_set_playback_compression(int on)
+{
+ SndWorker *now = workers;
+
+ playback_compression = on ? RED_AUDIO_DATA_MODE_CELT_0_5_1 : RED_AUDIO_DATA_MODE_RAW;
+ for (; now; now = now->next) {
+ if (now->base.type == RED_CHANNEL_PLAYBACK && now->connection) {
+ PlaybackChannel* playback = (PlaybackChannel*)now->connection;
+ if (!playback->celt_allowed) {
+ ASSERT(playback->mode == RED_AUDIO_DATA_MODE_RAW);
+ continue;
+ }
+ if (playback->mode != playback_compression) {
+ playback->mode = playback_compression;
+ snd_set_command(now->connection, SND_PLAYBACK_MODE_MASK);
+ }
+ }
+ }
+}
+
+int snd_get_playback_compression()
+{
+ return (playback_compression == RED_AUDIO_DATA_MODE_RAW) ? FALSE : TRUE;
+}
+