summary | refs | log | tree | commit | diff | stats
path: root/ctdb/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'ctdb/tcp')
-rw-r--r-- ctdb/tcp/ctdb_tcp.h 15
-rw-r--r-- ctdb/tcp/tcp_connect.c 2
-rw-r--r-- ctdb/tcp/tcp_init.c 16
-rw-r--r-- ctdb/tcp/tcp_io.c 69
4 files changed, 90 insertions, 12 deletions
diff --git a/ctdb/tcp/ctdb_tcp.h b/ctdb/tcp/ctdb_tcp.h
index 14e96ea8979..039343a9985 100644
--- a/ctdb/tcp/ctdb_tcp.h
+++ b/ctdb/tcp/ctdb_tcp.h
@@ -25,13 +25,28 @@ struct ctdb_tcp {
};
/*
+ incoming packet structure - holds the bytes of a packet that has only
+ been partially received, so that a later read can append to them
+*/
+struct ctdb_tcp_partial {
+ uint8_t *data; /* buffered bytes of the incomplete packet; NULL when nothing is pending */
+ uint32_t length; /* number of valid bytes currently held in data */
+};
+
+
+/*
state associated with one accepted incoming connection
*/
struct ctdb_incoming {
struct ctdb_context *ctdb; /* ctdb context the connection was accepted for */
int fd; /* socket descriptor returned by accept() */
+ struct ctdb_tcp_partial partial; /* bytes of a not-yet-complete packet carried over between reads */
};
+/*
+ outgoing packet structure - only allocated when we can't write immediately
+ to the socket
+*/
struct ctdb_tcp_packet {
struct ctdb_tcp_packet *next, *prev;
uint8_t *data;
diff --git a/ctdb/tcp/tcp_connect.c b/ctdb/tcp/tcp_connect.c
index 6074b646aa4..bdcab2d5cac 100644
--- a/ctdb/tcp/tcp_connect.c
+++ b/ctdb/tcp/tcp_connect.c
@@ -135,7 +135,7 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
if (fd == -1) return;
- in = talloc(ctdb, struct ctdb_incoming);
+ in = talloc_zero(ctdb, struct ctdb_incoming);
in->fd = fd;
in->ctdb = ctdb;
diff --git a/ctdb/tcp/tcp_init.c b/ctdb/tcp/tcp_init.c
index b3378677035..fca6506676f 100644
--- a/ctdb/tcp/tcp_init.c
+++ b/ctdb/tcp/tcp_init.c
@@ -64,10 +64,24 @@ int ctdb_tcp_add_node(struct ctdb_node *node)
}
+/* transport packet allocator - allows transport to control memory for packets.
+ Returns a buffer whose size is rounded up to a multiple of 8 bytes, allocated
+ with ctdb as the talloc context; the talloc_size() result is returned unchecked. */
+void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+{
+ /* tcp transport needs to round to 8 byte alignment to ensure
+ that we can use a length header and 64 bit elements in
+ structures */
+ size = (size+7) & ~7; /* round up to the next multiple of 8; NOTE(review): size+7 wraps for size > SIZE_MAX-7 */
+ return talloc_size(ctdb, size);
+}
+
+
static const struct ctdb_methods ctdb_tcp_methods = { /* method table for the tcp transport */
.start = ctdb_tcp_start,
.add_node = ctdb_tcp_add_node,
- .queue_pkt = ctdb_tcp_queue_pkt
+ .queue_pkt = ctdb_tcp_queue_pkt,
+ .allocate_pkt = ctdb_tcp_allocate_pkt /* packet buffers are sized by the tcp allocator (rounded up to 8 bytes) */
};
/*
diff --git a/ctdb/tcp/tcp_io.c b/ctdb/tcp/tcp_io.c
index d572ba533fd..167e3a2ca7d 100644
--- a/ctdb/tcp/tcp_io.c
+++ b/ctdb/tcp/tcp_io.c
@@ -25,6 +25,7 @@
#include "ctdb_private.h"
#include "ctdb_tcp.h"
+
/*
called when we fail to send a message to a node
*/
@@ -109,11 +110,8 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
{
struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
int num_ready = 0;
- uint8_t *data;
-
- /* NOTE: we don't yet handle combined packets or partial
- packets. Obviously that needed fixing, using a similar
- scheme to the Samba4 packet layer */
+ ssize_t nread;
+ uint8_t *data, *data_base;
if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
num_ready == 0) {
@@ -126,20 +124,71 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
return;
}
- data = talloc_size(in, num_ready);
- if (data == NULL) {
+ in->partial.data = talloc_realloc_size(in, in->partial.data,
+ num_ready + in->partial.length);
+ if (in->partial.data == NULL) {
/* not much we can do except drop the socket */
talloc_free(in);
return;
}
- if (read(in->fd, data, num_ready) != num_ready) {
+ nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
+ if (nread <= 0) {
+ /* the connection must be dead */
talloc_free(in);
return;
}
- /* tell the ctdb layer above that we have a packet */
- in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
+ data = in->partial.data;
+ nread += in->partial.length;
+
+ in->partial.data = NULL;
+ in->partial.length = 0;
+
+ if (nread >= 4 && *(uint32_t *)data == nread) {
+ /* most common case - we got a whole packet in one go
+ tell the ctdb layer above that we have a packet */
+ in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
+ return;
+ }
+
+ data_base = data;
+
+ while (nread >= 4 && *(uint32_t *)data <= nread) {
+ /* we have at least one packet */
+ uint8_t *d2;
+ uint32_t len;
+ len = *(uint32_t *)data;
+ d2 = talloc_memdup(in, data, len);
+ if (d2 == NULL) {
+ /* sigh */
+ talloc_free(in);
+ return;
+ }
+ in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
+ data += len;
+ nread -= len;
+ return;
+ }
+
+ if (nread < 4 || *(uint32_t *)data > nread) {
+ /* we have only part of a packet */
+ if (data_base == data) {
+ in->partial.data = data;
+ in->partial.length = nread;
+ } else {
+ in->partial.data = talloc_memdup(in, data, nread);
+ if (in->partial.data == NULL) {
+ talloc_free(in);
+ return;
+ }
+ in->partial.length = nread;
+ talloc_free(data_base);
+ }
+ return;
+ }
+
+ talloc_free(data_base);
}
/*