/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
Copyright (C) 2015 Red Hat, Inc.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, see .
*/
#include
#include
#include "giopipe.h"
#define TYPE_PIPE_INPUT_STREAM (pipe_input_stream_get_type ())
#define PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStream))
#define PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
#define IS_PIPE_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_INPUT_STREAM))
#define IS_PIPE_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_INPUT_STREAM))
#define PIPE_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_INPUT_STREAM, PipeInputStreamClass))
typedef struct _PipeInputStreamClass PipeInputStreamClass;
typedef struct _PipeInputStream PipeInputStream;
typedef struct _PipeOutputStream PipeOutputStream;
struct _PipeInputStream
{
GInputStream parent_instance;
PipeOutputStream *peer;
gssize read;
/* GIOstream:closed is protected against pending operations, so we
* use an additional close flag to cancel those when the peer is
* closing.
*/
gboolean peer_closed;
GList *sources;
};
struct _PipeInputStreamClass
{
GInputStreamClass parent_class;
};
#define TYPE_PIPE_OUTPUT_STREAM (pipe_output_stream_get_type ())
#define PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStream))
#define PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
#define IS_PIPE_OUTPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), TYPE_PIPE_OUTPUT_STREAM))
#define IS_PIPE_OUTPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), TYPE_PIPE_OUTPUT_STREAM))
#define PIPE_OUTPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), TYPE_PIPE_OUTPUT_STREAM, PipeOutputStreamClass))
typedef struct _PipeOutputStreamClass PipeOutputStreamClass;
struct _PipeOutputStream
{
GOutputStream parent_instance;
PipeInputStream *peer;
const gchar *buffer;
gsize count;
gboolean peer_closed;
GList *sources;
};
struct _PipeOutputStreamClass
{
GOutputStreamClass parent_class;
};
static void pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
static void pipe_input_stream_check_source (PipeInputStream *self);
static void pipe_output_stream_check_source (PipeOutputStream *self);
G_DEFINE_TYPE_WITH_CODE (PipeInputStream, pipe_input_stream, G_TYPE_INPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
pipe_input_stream_pollable_iface_init))
static gssize
pipe_input_stream_read (GInputStream *stream,
void *buffer,
gsize count,
GCancellable *cancellable,
GError **error)
{
PipeInputStream *self = PIPE_INPUT_STREAM (stream);
g_return_val_if_fail(count > 0, -1);
if (g_input_stream_is_closed (stream) || self->peer_closed) {
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
"Stream is already closed");
return -1;
}
if (!self->peer->buffer) {
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror(EAGAIN));
return -1;
}
count = MIN(self->peer->count, count);
memcpy(buffer, self->peer->buffer, count);
self->read = count;
self->peer->buffer = NULL;
//g_debug("read %p :%"G_GSIZE_FORMAT, self->peer, count);
/* schedule peer source */
pipe_output_stream_check_source(self->peer);
return count;
}
static GList *
set_all_sources_ready (GList *sources)
{
GList *it = sources;
while (it != NULL) {
GSource *s = it->data;
GList *next = it->next;
if (s == NULL || g_source_is_destroyed(s)) {
/* remove */
sources = g_list_delete_link(sources, it);
g_source_unref(s);
} else {
/* dispatch */
g_source_set_ready_time(s, 0);
}
it = next;
}
return sources;
}
static void
pipe_input_stream_check_source (PipeInputStream *self)
{
if (g_pollable_input_stream_is_readable(G_POLLABLE_INPUT_STREAM(self)))
self->sources = set_all_sources_ready(self->sources);
}
static gboolean
pipe_input_stream_close (GInputStream *stream,
GCancellable *cancellable,
GError **error)
{
PipeInputStream *self;
self = PIPE_INPUT_STREAM(stream);
if (self->peer) {
/* ignore any pending errors */
self->peer->peer_closed = TRUE;
g_output_stream_close(G_OUTPUT_STREAM(self->peer), cancellable, NULL);
pipe_output_stream_check_source(self->peer);
}
return TRUE;
}
static void
pipe_input_stream_close_async (GInputStream *stream,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer data)
{
GTask *task;
task = g_task_new (stream, cancellable, callback, data);
/* will always return TRUE */
pipe_input_stream_close (stream, cancellable, NULL);
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
static gboolean
pipe_input_stream_close_finish (GInputStream *stream,
GAsyncResult *result,
GError **error)
{
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
return g_task_propagate_boolean (G_TASK (result), error);
}
static void
pipe_input_stream_init (PipeInputStream *self)
{
self->read = -1;
}
static void
pipe_input_stream_dispose(GObject *object)
{
PipeInputStream *self;
self = PIPE_INPUT_STREAM(object);
if (self->peer) {
g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
self->peer = NULL;
}
g_list_free_full (self->sources, (GDestroyNotify) g_source_unref);
self->sources = NULL;
G_OBJECT_CLASS(pipe_input_stream_parent_class)->dispose (object);
}
static void
pipe_input_stream_class_init (PipeInputStreamClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GInputStreamClass *istream_class = G_INPUT_STREAM_CLASS (klass);
istream_class->read_fn = pipe_input_stream_read;
istream_class->close_fn = pipe_input_stream_close;
istream_class->close_async = pipe_input_stream_close_async;
istream_class->close_finish = pipe_input_stream_close_finish;
gobject_class->dispose = pipe_input_stream_dispose;
}
static gboolean
pipe_input_stream_is_readable (GPollableInputStream *stream)
{
PipeInputStream *self = PIPE_INPUT_STREAM (stream);
gboolean readable;
readable = (self->peer && self->peer->buffer && self->read == -1) || self->peer_closed;
//g_debug("readable %p %d", self->peer, readable);
return readable;
}
static GSource *
pipe_input_stream_create_source (GPollableInputStream *stream,
GCancellable *cancellable)
{
PipeInputStream *self = PIPE_INPUT_STREAM(stream);
GSource *pollable_source;
pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
self->sources = g_list_prepend (self->sources, g_source_ref (pollable_source));
return pollable_source;
}
static void
pipe_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
{
iface->is_readable = pipe_input_stream_is_readable;
iface->create_source = pipe_input_stream_create_source;
}
static void pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
G_DEFINE_TYPE_WITH_CODE (PipeOutputStream, pipe_output_stream, G_TYPE_OUTPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
pipe_output_stream_pollable_iface_init))
static gssize
pipe_output_stream_write (GOutputStream *stream,
const void *buffer,
gsize count,
GCancellable *cancellable,
GError **error)
{
PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
PipeInputStream *peer = self->peer;
//g_debug("write %p :%"G_GSIZE_FORMAT, stream, count);
if (g_output_stream_is_closed (stream) || self->peer_closed) {
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
"Stream is already closed");
return -1;
}
/* this abuses pollable stream, writing sync would likely lead to
crashes, since the buffer pointer would become invalid, a
generic solution would need a copy..
*/
g_return_val_if_fail(self->buffer == buffer || self->buffer == NULL, -1);
self->buffer = buffer;
self->count = count;
pipe_input_stream_check_source(self->peer);
if (peer->read < 0) {
g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
g_strerror (EAGAIN));
return -1;
}
g_assert(peer->read <= self->count);
count = peer->read;
self->buffer = NULL;
self->count = 0;
peer->read = -1;
return count;
}
static void
pipe_output_stream_init (PipeOutputStream *stream)
{
}
static void
pipe_output_stream_dispose(GObject *object)
{
PipeOutputStream *self;
self = PIPE_OUTPUT_STREAM(object);
if (self->peer) {
g_object_remove_weak_pointer(G_OBJECT(self->peer), (gpointer*)&self->peer);
self->peer = NULL;
}
g_list_free_full (self->sources, (GDestroyNotify) g_source_unref);
self->sources = NULL;
G_OBJECT_CLASS(pipe_output_stream_parent_class)->dispose (object);
}
static void
pipe_output_stream_check_source (PipeOutputStream *self)
{
if (g_pollable_output_stream_is_writable(G_POLLABLE_OUTPUT_STREAM(self)))
self->sources = set_all_sources_ready(self->sources);
}
static gboolean
pipe_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
GError **error)
{
PipeOutputStream *self;
self = PIPE_OUTPUT_STREAM(stream);
if (self->peer) {
/* ignore any pending errors */
self->peer->peer_closed = TRUE;
g_input_stream_close(G_INPUT_STREAM(self->peer), cancellable, NULL);
pipe_input_stream_check_source(self->peer);
}
return TRUE;
}
static void
pipe_output_stream_close_async (GOutputStream *stream,
int io_priority,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer data)
{
GTask *task;
task = g_task_new (stream, cancellable, callback, data);
/* will always return TRUE */
pipe_output_stream_close (stream, cancellable, NULL);
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
static gboolean
pipe_output_stream_close_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error)
{
g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
return g_task_propagate_boolean (G_TASK (result), error);
}
static void
pipe_output_stream_class_init (PipeOutputStreamClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
GOutputStreamClass *ostream_class = G_OUTPUT_STREAM_CLASS (klass);
ostream_class->write_fn = pipe_output_stream_write;
ostream_class->close_fn = pipe_output_stream_close;
ostream_class->close_async = pipe_output_stream_close_async;
ostream_class->close_finish = pipe_output_stream_close_finish;
gobject_class->dispose = pipe_output_stream_dispose;
}
static gboolean
pipe_output_stream_is_writable (GPollableOutputStream *stream)
{
PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
gboolean writable;
writable = self->buffer == NULL || self->peer->read >= 0;
//g_debug("writable %p %d", self, writable);
return writable;
}
static GSource *
pipe_output_stream_create_source (GPollableOutputStream *stream,
GCancellable *cancellable)
{
PipeOutputStream *self = PIPE_OUTPUT_STREAM(stream);
GSource *pollable_source;
pollable_source = g_pollable_source_new_full (self, NULL, cancellable);
self->sources = g_list_prepend (self->sources, g_source_ref (pollable_source));
return pollable_source;
}
static void
pipe_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
{
iface->is_writable = pipe_output_stream_is_writable;
iface->create_source = pipe_output_stream_create_source;
}
G_GNUC_INTERNAL void
make_gio_pipe(GInputStream **input, GOutputStream **output)
{
PipeInputStream *in;
PipeOutputStream *out;
g_return_if_fail(input != NULL && *input == NULL);
g_return_if_fail(output != NULL && *output == NULL);
in = g_object_new(TYPE_PIPE_INPUT_STREAM, NULL);
out = g_object_new(TYPE_PIPE_OUTPUT_STREAM, NULL);
out->peer = in;
g_object_add_weak_pointer(G_OBJECT(in), (gpointer*)&out->peer);
in->peer = out;
g_object_add_weak_pointer(G_OBJECT(out), (gpointer*)&in->peer);
*input = G_INPUT_STREAM(in);
*output = G_OUTPUT_STREAM(out);
}
G_GNUC_INTERNAL void
spice_make_pipe(GIOStream **p1, GIOStream **p2)
{
GInputStream *in1 = NULL, *in2 = NULL;
GOutputStream *out1 = NULL, *out2 = NULL;
g_return_if_fail(p1 != NULL);
g_return_if_fail(p2 != NULL);
g_return_if_fail(*p1 == NULL);
g_return_if_fail(*p2 == NULL);
make_gio_pipe(&in1, &out2);
make_gio_pipe(&in2, &out1);
*p1 = g_simple_io_stream_new(in1, out1);
*p2 = g_simple_io_stream_new(in2, out2);
g_object_unref(in1);
g_object_unref(in2);
g_object_unref(out1);
g_object_unref(out2);
}