summaryrefslogtreecommitdiffstats
path: root/ctdb
diff options
context:
space:
mode:
authorAndrew Tridgell <tridge@samba.org>2006-12-19 12:03:10 +1100
committerAndrew Tridgell <tridge@samba.org>2006-12-19 12:03:10 +1100
commit3c097c9a5fef7a6aa231173cf970b8f6bb11cc82 (patch)
tree8ec011161373f994dc5399fc9ab38b1d9338aea7 /ctdb
parentee547a0f9aa1642110f4740eaecd552b7ea63539 (diff)
downloadsamba-3c097c9a5fef7a6aa231173cf970b8f6bb11cc82.tar.gz
samba-3c097c9a5fef7a6aa231173cf970b8f6bb11cc82.tar.xz
samba-3c097c9a5fef7a6aa231173cf970b8f6bb11cc82.zip
added handling of partial packet reads
added transport level packet allocator, allowing the transport to enforce alignment or special memory rules (This used to be ctdb commit 50304a5c4d8d640732678eeed793857334ca5ec1)
Diffstat (limited to 'ctdb')
-rw-r--r--ctdb/common/ctdb_call.c12
-rw-r--r--ctdb/include/ctdb_private.h1
-rw-r--r--ctdb/tcp/ctdb_tcp.h15
-rw-r--r--ctdb/tcp/tcp_connect.c2
-rw-r--r--ctdb/tcp/tcp_init.c16
-rw-r--r--ctdb/tcp/tcp_io.c69
6 files changed, 97 insertions, 18 deletions
diff --git a/ctdb/common/ctdb_call.c b/ctdb/common/ctdb_call.c
index ca111b7e21f..81f3cbdea10 100644
--- a/ctdb/common/ctdb_call.c
+++ b/ctdb/common/ctdb_call.c
@@ -130,7 +130,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
va_end(ap);
len = strlen(msg)+1;
- r = talloc_size(ctdb, sizeof(*r) + len);
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + len;
r->hdr.operation = CTDB_REPLY_ERROR;
@@ -158,7 +158,7 @@ static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
{
struct ctdb_reply_redirect *r;
- r = talloc_size(ctdb, sizeof(*r));
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r);
r->hdr.operation = CTDB_REPLY_REDIRECT;
@@ -188,7 +188,7 @@ static void ctdb_call_send_dmaster(struct ctdb_context *ctdb,
int len;
len = sizeof(*r) + key->dsize + data->dsize;
- r = talloc_size(ctdb, len);
+ r = ctdb->methods->allocate_pkt(ctdb, len);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = len;
r->hdr.operation = CTDB_REQ_DMASTER;
@@ -255,7 +255,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
}
/* send the CTDB_REPLY_DMASTER */
- r = talloc_size(ctdb, sizeof(*r) + data.dsize);
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + data.dsize;
r->hdr.operation = CTDB_REPLY_DMASTER;
@@ -317,7 +317,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
call_data.dsize?&call_data:NULL,
&reply_data, c->hdr.srcnode);
- r = talloc_size(ctdb, sizeof(*r) + reply_data.dsize);
+ r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->hdr.length = sizeof(*r) + reply_data.dsize;
r->hdr.operation = CTDB_REPLY_CALL;
@@ -539,7 +539,7 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
CTDB_NO_MEMORY_NULL(ctdb, state);
len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
- state->c = talloc_size(ctdb, len);
+ state->c = ctdb->methods->allocate_pkt(ctdb, len);
CTDB_NO_MEMORY_NULL(ctdb, state->c);
state->c->hdr.length = len;
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index f21139eab2c..a024147c63e 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -55,6 +55,7 @@ struct ctdb_methods {
int (*start)(struct ctdb_context *); /* start protocol processing */
int (*add_node)(struct ctdb_node *); /* setup a new node */
int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
+ void *(*allocate_pkt)(struct ctdb_context *, size_t );
};
/*
diff --git a/ctdb/tcp/ctdb_tcp.h b/ctdb/tcp/ctdb_tcp.h
index 14e96ea8979..039343a9985 100644
--- a/ctdb/tcp/ctdb_tcp.h
+++ b/ctdb/tcp/ctdb_tcp.h
@@ -25,13 +25,28 @@ struct ctdb_tcp {
};
/*
+ incoming packet structure - only used when we get a partial packet
+ on read
+*/
+struct ctdb_tcp_partial {
+ uint8_t *data;
+ uint32_t length;
+};
+
+
+/*
state associated with an incoming connection
*/
struct ctdb_incoming {
struct ctdb_context *ctdb;
int fd;
+ struct ctdb_tcp_partial partial;
};
+/*
+ outgoing packet structure - only allocated when we can't write immediately
+ to the socket
+*/
struct ctdb_tcp_packet {
struct ctdb_tcp_packet *next, *prev;
uint8_t *data;
diff --git a/ctdb/tcp/tcp_connect.c b/ctdb/tcp/tcp_connect.c
index 6074b646aa4..bdcab2d5cac 100644
--- a/ctdb/tcp/tcp_connect.c
+++ b/ctdb/tcp/tcp_connect.c
@@ -135,7 +135,7 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
if (fd == -1) return;
- in = talloc(ctdb, struct ctdb_incoming);
+ in = talloc_zero(ctdb, struct ctdb_incoming);
in->fd = fd;
in->ctdb = ctdb;
diff --git a/ctdb/tcp/tcp_init.c b/ctdb/tcp/tcp_init.c
index b3378677035..fca6506676f 100644
--- a/ctdb/tcp/tcp_init.c
+++ b/ctdb/tcp/tcp_init.c
@@ -64,10 +64,24 @@ int ctdb_tcp_add_node(struct ctdb_node *node)
}
+/*
+ transport packet allocator - allows transport to control memory for packets
+*/
+void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+{
+ /* tcp transport needs to round to 8 byte alignment to ensure
+ that we can use a length header and 64 bit elements in
+ structures */
+ size = (size+7) & ~7;
+ return talloc_size(ctdb, size);
+}
+
+
static const struct ctdb_methods ctdb_tcp_methods = {
.start = ctdb_tcp_start,
.add_node = ctdb_tcp_add_node,
- .queue_pkt = ctdb_tcp_queue_pkt
+ .queue_pkt = ctdb_tcp_queue_pkt,
+ .allocate_pkt = ctdb_tcp_allocate_pkt
};
/*
diff --git a/ctdb/tcp/tcp_io.c b/ctdb/tcp/tcp_io.c
index d572ba533fd..167e3a2ca7d 100644
--- a/ctdb/tcp/tcp_io.c
+++ b/ctdb/tcp/tcp_io.c
@@ -25,6 +25,7 @@
#include "ctdb_private.h"
#include "ctdb_tcp.h"
+
/*
called when we fail to send a message to a node
*/
@@ -109,11 +110,8 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
{
struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
int num_ready = 0;
- uint8_t *data;
-
- /* NOTE: we don't yet handle combined packets or partial
- packets. Obviously that needed fixing, using a similar
- scheme to the Samba4 packet layer */
+ ssize_t nread;
+ uint8_t *data, *data_base;
if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
num_ready == 0) {
@@ -126,20 +124,71 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
return;
}
- data = talloc_size(in, num_ready);
- if (data == NULL) {
+ in->partial.data = talloc_realloc_size(in, in->partial.data,
+ num_ready + in->partial.length);
+ if (in->partial.data == NULL) {
/* not much we can do except drop the socket */
talloc_free(in);
return;
}
- if (read(in->fd, data, num_ready) != num_ready) {
+ nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
+ if (nread <= 0) {
+ /* the connection must be dead */
talloc_free(in);
return;
}
- /* tell the ctdb layer above that we have a packet */
- in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
+ data = in->partial.data;
+ nread += in->partial.length;
+
+ in->partial.data = NULL;
+ in->partial.length = 0;
+
+ if (nread >= 4 && *(uint32_t *)data == nread) {
+ /* most common case - we got a whole packet in one go
+ tell the ctdb layer above that we have a packet */
+ in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
+ return;
+ }
+
+ data_base = data;
+
+ while (nread >= 4 && *(uint32_t *)data <= nread) {
+ /* we have at least one packet */
+ uint8_t *d2;
+ uint32_t len;
+ len = *(uint32_t *)data;
+ d2 = talloc_memdup(in, data, len);
+ if (d2 == NULL) {
+ /* sigh */
+ talloc_free(in);
+ return;
+ }
+ in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
+ data += len;
+ nread -= len;
+ return;
+ }
+
+ if (nread < 4 || *(uint32_t *)data > nread) {
+ /* we have only part of a packet */
+ if (data_base == data) {
+ in->partial.data = data;
+ in->partial.length = nread;
+ } else {
+ in->partial.data = talloc_memdup(in, data, nread);
+ if (in->partial.data == NULL) {
+ talloc_free(in);
+ return;
+ }
+ in->partial.length = nread;
+ talloc_free(data_base);
+ }
+ return;
+ }
+
+ talloc_free(data_base);
}
/*