diff options
author | Andrew Tridgell <tridge@samba.org> | 2006-11-28 14:15:46 +1100 |
---|---|---|
committer | Andrew Tridgell <tridge@samba.org> | 2006-11-28 14:15:46 +1100 |
commit | 5d0ba69e06d2f37788697c3b7c39287334d2fa22 (patch) | |
tree | 52f2309a327511535dfde9b7e130f731d51d09d9 | |
parent | 5b06e73fb1051e9fe370a76ef4fc87e514a1f9e5 (diff) | |
download | samba-5d0ba69e06d2f37788697c3b7c39287334d2fa22.tar.gz samba-5d0ba69e06d2f37788697c3b7c39287334d2fa22.tar.xz samba-5d0ba69e06d2f37788697c3b7c39287334d2fa22.zip |
- setup a convenience name field for nodes
- added basic IO handling for the tcp backend
- added a ctdb_node_dead upcall
- added packet queueing
- adding incoming packet handling
(This used to be ctdb commit 415497c952630e746e8cdcf8e1e2a7b2ac3e51fb)
-rw-r--r-- | ctdb/common/ctdb.c | 16 | ||||
-rw-r--r-- | ctdb/include/ctdb_private.h | 4 | ||||
-rw-r--r-- | ctdb/tcp/ctdb_tcp.h | 15 | ||||
-rw-r--r-- | ctdb/tcp/tcp_connect.c | 33 | ||||
-rw-r--r-- | ctdb/tcp/tcp_init.c | 5 | ||||
-rw-r--r-- | ctdb/tcp/tcp_io.c | 158 |
6 files changed, 205 insertions, 26 deletions
diff --git a/ctdb/common/ctdb.c b/ctdb/common/ctdb.c index fc82e8cf17..588bff2f7f 100644 --- a/ctdb/common/ctdb.c +++ b/ctdb/common/ctdb.c @@ -89,6 +89,9 @@ static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr) return -1; } node->ctdb = ctdb; + node->name = talloc_asprintf(node, "%s:%u", + node->address.address, + node->address.port); if (ctdb->methods->add_node(node) != 0) { talloc_free(node); @@ -194,13 +197,22 @@ bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2) /* called by the transport layer when a packet comes in */ -static void ctdb_recv_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) +static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length) { printf("received pkt of length %d\n", length); } +/* + called by the transport layer when a node is dead +*/ +static void ctdb_node_dead(struct ctdb_node *node) +{ + printf("node %s is dead\n", node->name); +} + static const struct ctdb_upcalls ctdb_upcalls = { - .recv_pkt = ctdb_recv_pkt + .recv_pkt = ctdb_recv_pkt, + .node_dead = ctdb_node_dead }; /* diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h index bc13c935c5..41d51a1773 100644 --- a/ctdb/include/ctdb_private.h +++ b/ctdb/include/ctdb_private.h @@ -44,6 +44,7 @@ struct ctdb_node { struct ctdb_context *ctdb; struct ctdb_node *next, *prev; struct ctdb_address address; + const char *name; /* for debug messages */ void *private; /* private to transport */ }; @@ -60,7 +61,8 @@ struct ctdb_methods { transport calls up to the ctdb layer */ struct ctdb_upcalls { - void (*recv_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length); + void (*recv_pkt)(struct ctdb_context *, uint8_t *data, uint32_t length); + void (*node_dead)(struct ctdb_node *); }; /* main state of the ctdb daemon */ diff --git a/ctdb/tcp/ctdb_tcp.h b/ctdb/tcp/ctdb_tcp.h index 571c20508b..14e96ea897 100644 --- a/ctdb/tcp/ctdb_tcp.h +++ b/ctdb/tcp/ctdb_tcp.h @@ -32,21 +32,28 @@ struct ctdb_incoming { int fd; }; +struct ctdb_tcp_packet { + struct ctdb_tcp_packet *next, *prev; + uint8_t *data; + uint32_t length; +}; + /* state associated with one tcp node */ struct ctdb_tcp_node { int fd; + struct fd_event *fde; + struct ctdb_tcp_packet *queue; }; /* prototypes internal to tcp transport */ -void ctdb_tcp_node_read(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private); +void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private); void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private); +int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length); int ctdb_tcp_listen(struct ctdb_context *ctdb); void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, struct timeval t, void *private); - - diff --git a/ctdb/tcp/tcp_connect.c b/ctdb/tcp/tcp_connect.c index 8287146583..fff6938e56 100644 --- a/ctdb/tcp/tcp_connect.c +++ b/ctdb/tcp/tcp_connect.c @@ -25,6 +25,14 @@ #include "ctdb_private.h" #include "ctdb_tcp.h" +static void set_nonblocking(int fd) +{ + unsigned v; + v = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, v | O_NONBLOCK); +} + + /* called when socket becomes writeable on connect */ @@ -53,11 +61,10 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f return; } - printf("Established connection to %s:%u\n", - node->address.address, node->address.port); + printf("Established connection to %s\n", node->name); talloc_free(fde); - event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ, - ctdb_tcp_node_read, node); + tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ, + ctdb_tcp_node_write, node); } /* @@ -70,13 +77,11 @@ void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, struct ctdb_tcp_node *tnode = talloc_get_type(node->private, struct ctdb_tcp_node); struct ctdb_context *ctdb = node->ctdb; - unsigned v; struct sockaddr_in sock_out; tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - v = fcntl(tnode->fd, F_GETFL, 0); - fcntl(tnode->fd, F_SETFL, v | O_NONBLOCK); + set_nonblocking(tnode->fd); inet_pton(AF_INET, node->address.address, &sock_out.sin_addr); sock_out.sin_port = htons(node->address.port); @@ -97,6 +102,16 @@ void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, } /* + destroy a ctdb_incoming structure +*/ +static int ctdb_incoming_destructor(struct ctdb_incoming *in) +{ + close(in->fd); + in->fd = -1; + return 0; +} + +/* called when we get contacted by another node currently makes no attempt to check if the connection is really from a ctdb node in our cluster @@ -122,9 +137,13 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, in->fd = fd; in->ctdb = ctdb; + set_nonblocking(in->fd); + event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ, ctdb_tcp_incoming_read, in); + talloc_set_destructor(in, ctdb_incoming_destructor); + printf("New incoming socket %d\n", in->fd); } diff --git a/ctdb/tcp/tcp_init.c b/ctdb/tcp/tcp_init.c index d3ca1e581d..b98a92818d 100644 --- a/ctdb/tcp/tcp_init.c +++ b/ctdb/tcp/tcp_init.c @@ -63,8 +63,9 @@ int ctdb_tcp_add_node(struct ctdb_node *node) static const struct ctdb_methods ctdb_tcp_methods = { - .start = ctdb_tcp_start, - .add_node = ctdb_tcp_add_node + .start = ctdb_tcp_start, + .add_node = ctdb_tcp_add_node, + .queue_pkt = ctdb_tcp_queue_pkt }; /* diff --git a/ctdb/tcp/tcp_io.c b/ctdb/tcp/tcp_io.c index d522472cb1..d6bc2db83e 100644 --- a/ctdb/tcp/tcp_io.c +++ b/ctdb/tcp/tcp_io.c @@ -26,15 +26,78 @@ #include "ctdb_tcp.h" /* + called when we fail to send a message to a node +*/ +static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private) +{ + struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + + /* flush the queue */ + while (tnode->queue) { + struct ctdb_tcp_packet *pkt = tnode->queue; + DLIST_REMOVE(tnode->queue, pkt); + talloc_free(pkt); + } + + /* start a new connect cycle to try to re-establish the + link */ + talloc_free(tnode->fde); + close(tnode->fd); + tnode->fd = -1; + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); +} + +/* called when socket becomes readable */ -void ctdb_tcp_node_read(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) +void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) { struct ctdb_node *node = talloc_get_type(private, struct ctdb_node); - printf("connection to node %s:%u is readable\n", - node->address.address, node->address.port); - event_set_fd_flags(fde, 0); + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + if (flags & EVENT_FD_READ) { + /* getting a read event on this fd in the current tcp model is + always an error, as we have separate read and write + sockets. In future we may combine them, but for now it must + mean that the socket is dead, so we try to reconnect */ + talloc_free(tnode->fde); + close(tnode->fd); + tnode->fd = -1; + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_connect, node); + return; + } + + while (tnode->queue) { + struct ctdb_tcp_packet *pkt = tnode->queue; + ssize_t n; + + n = write(tnode->fd, pkt->data, pkt->length); + + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_dead, node); + EVENT_FD_NOT_WRITEABLE(tnode->fde); + return; + } + if (n <= 0) return; + + if (n != pkt->length) { + pkt->length -= n; + pkt->data += n; + return; + } + + DLIST_REMOVE(tnode->queue, pkt); + talloc_free(pkt); + } + + EVENT_FD_NOT_WRITEABLE(tnode->fde); } @@ -45,12 +108,87 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private) { struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming); - char c; - printf("Incoming data\n"); - if (read(in->fd, &c, 1) <= 0) { - /* socket is dead */ - close(in->fd); + int num_ready = 0; + uint8_t *data; + + /* NOTE: we don't yet handle combined packets or partial + packets. Obviously that needed fixing, using a similar + scheme to the Samba4 packet layer */ + + if (ioctl(in->fd, FIONREAD, &num_ready) != 0 || + num_ready == 0) { + /* we've lost the link from another node. We don't + notify the upper layers, as we only want to trigger + a full node reorganisation when a send fails - that + allows nodes to restart without penalty as long as + the network is idle */ + talloc_free(in); + return; + } + + data = talloc_size(in, num_ready); + if (data == NULL) { + /* not much we can do except drop the socket */ talloc_free(in); + return; } + + if (read(in->fd, data, num_ready) != num_ready) { + talloc_free(in); + return; + } + + /* tell the ctdb layer above that we have a packet */ + in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready); + + talloc_free(data); } +/* + queue a packet for sending +*/ +int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length) +{ + struct ctdb_tcp_node *tnode = talloc_get_type(node->private, + struct ctdb_tcp_node); + struct ctdb_tcp_packet *pkt; + + if (tnode->fd == -1) { + ctdb_set_error(node->ctdb, "Sending to dead node %s\n", node->name); + return -1; + } + + /* if the queue is empty then try an immediate write, avoiding + queue overhead. This relies on non-blocking sockets */ + if (tnode->queue == NULL) { + ssize_t n = write(tnode->fd, data, length); + if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) { + event_add_timed(node->ctdb->ev, node, timeval_zero(), + ctdb_tcp_node_dead, node); + /* yes, we report success, as the dead node is + handled via a separate event */ + return 0; + } + if (n > 0) { + data += n; + length -= n; + } + if (length == 0) return 0; + } + + pkt = talloc(tnode, struct ctdb_tcp_packet); + CTDB_NO_MEMORY(node->ctdb, pkt); + + pkt->data = talloc_memdup(pkt, data, length); + CTDB_NO_MEMORY(node->ctdb, pkt->data); + + pkt->length = length; + + if (tnode->queue == NULL) { + EVENT_FD_WRITEABLE(tnode->fde); + } + + DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *); + + return 0; +} |