diff options
Diffstat (limited to 'ctdb/tcp')
-rw-r--r-- | ctdb/tcp/ctdb_tcp.h | 15 | ||||
-rw-r--r-- | ctdb/tcp/tcp_connect.c | 2 | ||||
-rw-r--r-- | ctdb/tcp/tcp_init.c | 16 | ||||
-rw-r--r-- | ctdb/tcp/tcp_io.c | 69 |
4 files changed, 90 insertions, 12 deletions
diff --git a/ctdb/tcp/ctdb_tcp.h b/ctdb/tcp/ctdb_tcp.h index 14e96ea8979..039343a9985 100644 --- a/ctdb/tcp/ctdb_tcp.h +++ b/ctdb/tcp/ctdb_tcp.h @@ -25,13 +25,28 @@ struct ctdb_tcp { }; /* + incoming packet structure - only used when we get a partial packet + on read +*/ +struct ctdb_tcp_partial { + uint8_t *data; + uint32_t length; +}; + + +/* state associated with an incoming connection */ struct ctdb_incoming { struct ctdb_context *ctdb; int fd; + struct ctdb_tcp_partial partial; }; +/* + outgoing packet structure - only allocated when we can't write immediately + to the socket +*/ struct ctdb_tcp_packet { struct ctdb_tcp_packet *next, *prev; uint8_t *data; diff --git a/ctdb/tcp/tcp_connect.c b/ctdb/tcp/tcp_connect.c index 6074b646aa4..bdcab2d5cac 100644 --- a/ctdb/tcp/tcp_connect.c +++ b/ctdb/tcp/tcp_connect.c @@ -135,7 +135,7 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len); if (fd == -1) return; - in = talloc(ctdb, struct ctdb_incoming); + in = talloc_zero(ctdb, struct ctdb_incoming); in->fd = fd; in->ctdb = ctdb; diff --git a/ctdb/tcp/tcp_init.c b/ctdb/tcp/tcp_init.c index b3378677035..fca6506676f 100644 --- a/ctdb/tcp/tcp_init.c +++ b/ctdb/tcp/tcp_init.c @@ -64,10 +64,24 @@ int ctdb_tcp_add_node(struct ctdb_node *node) } +/* + transport packet allocator - allows transport to control memory for packets +*/ +void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size) +{ + /* tcp transport needs to round to 8 byte alignment to ensure + that we can use a length header and 64 bit elements in + structures */ + size = (size+7) & ~7; + return talloc_size(ctdb, size); +} + + static const struct ctdb_methods ctdb_tcp_methods = { .start = ctdb_tcp_start, .add_node = ctdb_tcp_add_node, - .queue_pkt = ctdb_tcp_queue_pkt + .queue_pkt = ctdb_tcp_queue_pkt, + .allocate_pkt = ctdb_tcp_allocate_pkt }; /* diff --git a/ctdb/tcp/tcp_io.c b/ctdb/tcp/tcp_io.c index d572ba533fd..167e3a2ca7d 100644 --- a/ctdb/tcp/tcp_io.c +++ b/ctdb/tcp/tcp_io.c @@ -25,6 +25,7 @@ #include "ctdb_private.h" #include "ctdb_tcp.h" + /* called when we fail to send a message to a node */ @@ -109,11 +110,8 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, { struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming); int num_ready = 0; - uint8_t *data; - - /* NOTE: we don't yet handle combined packets or partial - packets. Obviously that needed fixing, using a similar - scheme to the Samba4 packet layer */ + ssize_t nread; + uint8_t *data, *data_base; if (ioctl(in->fd, FIONREAD, &num_ready) != 0 || num_ready == 0) { @@ -126,20 +124,71 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, return; } - data = talloc_size(in, num_ready); - if (data == NULL) { + in->partial.data = talloc_realloc_size(in, in->partial.data, + num_ready + in->partial.length); + if (in->partial.data == NULL) { /* not much we can do except drop the socket */ talloc_free(in); return; } - if (read(in->fd, data, num_ready) != num_ready) { + nread = read(in->fd, in->partial.data+in->partial.length, num_ready); + if (nread <= 0) { + /* the connection must be dead */ talloc_free(in); return; } - /* tell the ctdb layer above that we have a packet */ - in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready); + data = in->partial.data; + nread += in->partial.length; + + in->partial.data = NULL; + in->partial.length = 0; + + if (nread >= 4 && *(uint32_t *)data == nread) { + /* most common case - we got a whole packet in one go + tell the ctdb layer above that we have a packet */ + in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread); + return; + } + + data_base = data; + + while (nread >= 4 && *(uint32_t *)data <= nread) { + /* we have at least one packet */ + uint8_t *d2; + uint32_t len; + len = *(uint32_t *)data; + d2 = talloc_memdup(in, data, len); + if (d2 == NULL) { + /* sigh */ + talloc_free(in); + return; + } + in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len); + data += len; + nread -= len; + return; + } + + if (nread < 4 || *(uint32_t *)data > nread) { + /* we have only part of a packet */ + if (data_base == data) { + in->partial.data = data; + in->partial.length = nread; + } else { + in->partial.data = talloc_memdup(in, data, nread); + if (in->partial.data == NULL) { + talloc_free(in); + return; + } + in->partial.length = nread; + talloc_free(data_base); + } + return; + } + + talloc_free(data_base); } /* |