diff options
Diffstat (limited to 'source4/lib/messaging/messaging.c')
-rw-r--r-- | source4/lib/messaging/messaging.c | 442 |
1 files changed, 106 insertions, 336 deletions
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c index 0b4e109c76..cad2d64233 100644 --- a/source4/lib/messaging/messaging.c +++ b/source4/lib/messaging/messaging.c @@ -33,6 +33,9 @@ #include "../lib/util/tevent_ntstatus.h" #include "lib/param/param.h" #include "lib/util/server_id_db.h" +#include "../source3/lib/messages_dgm.h" +#include "../source3/lib/messages_dgm_ref.h" +#include "../source3/lib/messages_util.h" #include <tdb.h> /* change the message version with any incompatible changes in the protocol */ @@ -51,10 +54,10 @@ struct irpc_request { }; struct imessaging_context { + struct imessaging_context *prev, *next; struct server_id server_id; - struct socket_context *sock; - const char *base_path; - const char *path; + const char *sock_dir; + const char *lock_dir; struct dispatch_fn **dispatch; uint32_t num_types; struct idr_context *dispatch_tree; @@ -64,10 +67,7 @@ struct imessaging_context { struct idr_context *idr; struct server_id_db *names; struct timeval start_time; - struct tevent_timer *retry_te; - struct { - struct tevent_fd *fde; - } event; + void *msg_dgm_ref; }; /* we have a linked list of dispatch handlers for each msg_type that @@ -126,248 +126,20 @@ static NTSTATUS irpc_uptime(struct irpc_message *msg, return NT_STATUS_OK; } -/* - return the path to a messaging socket -*/ -static char *imessaging_path(struct imessaging_context *msg, struct server_id server_id) -{ - struct server_id_buf buf; - - return talloc_asprintf(msg, "%s/msg.%s", msg->base_path, - server_id_str_buf(server_id, &buf)); -} - -/* - dispatch a fully received message - - note that this deliberately can match more than one message handler - per message. That allows a single messasging context to register - (for example) a debug handler for more than one piece of code -*/ -static void imessaging_dispatch(struct imessaging_context *msg, struct imessaging_rec *rec) +static struct dispatch_fn *imessaging_find_dispatch( + struct imessaging_context *msg, uint32_t msg_type) { - struct dispatch_fn *d, *next; - /* temporary IDs use an idtree, the rest use a array of pointers */ - if (rec->header->msg_type >= MSG_TMP_BASE) { - d = (struct dispatch_fn *)idr_find(msg->dispatch_tree, - rec->header->msg_type); - } else if (rec->header->msg_type < msg->num_types) { - d = msg->dispatch[rec->header->msg_type]; - } else { - d = NULL; - } - - for (; d; d = next) { - DATA_BLOB data; - next = d->next; - data.data = rec->packet.data + sizeof(*rec->header); - data.length = rec->header->length; - d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data); - } - rec->header->length = 0; -} - -/* - handler for messages that arrive from other nodes in the cluster -*/ -static void cluster_message_handler(struct imessaging_context *msg, DATA_BLOB packet) -{ - struct imessaging_rec *rec; - - rec = talloc(msg, struct imessaging_rec); - if (rec == NULL) { - smb_panic("Unable to allocate imessaging_rec"); - } - - rec->msg = msg; - rec->path = msg->path; - rec->header = (struct imessaging_header *)packet.data; - rec->packet = packet; - rec->retries = 0; - - if (packet.length != sizeof(*rec->header) + rec->header->length) { - DEBUG(0,("messaging: bad message header size %d should be %d\n", - rec->header->length, (int)(packet.length - sizeof(*rec->header)))); - talloc_free(rec); - return; - } - - imessaging_dispatch(msg, rec); - talloc_free(rec); -} - - - -/* - try to send the message -*/ -static NTSTATUS try_send(struct imessaging_rec *rec) -{ - struct imessaging_context *msg = rec->msg; - size_t nsent; - void *priv; - NTSTATUS status; - struct socket_address *path; - - /* rec->path is the path of the *other* socket, where we want - * this to end up */ - path = socket_address_from_strings(msg, msg->sock->backend_name, - rec->path, 0); - if (!path) { - return NT_STATUS_NO_MEMORY; - } - - /* we send with privileges so messages work from any context */ - priv = root_privileges(); - status = socket_sendto(msg->sock, &rec->packet, &nsent, path); - talloc_free(path); - talloc_free(priv); - - return status; -} - -/* - retry backed off messages -*/ -static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te, - struct timeval t, void *private_data) -{ - struct imessaging_context *msg = talloc_get_type(private_data, - struct imessaging_context); - msg->retry_te = NULL; - - /* put the messages back on the main queue */ - while (msg->retry_queue) { - struct imessaging_rec *rec = msg->retry_queue; - DLIST_REMOVE(msg->retry_queue, rec); - DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *); - } - - TEVENT_FD_WRITEABLE(msg->event.fde); -} - -/* - handle a socket write event -*/ -static void imessaging_send_handler(struct imessaging_context *msg, struct tevent_context *ev) -{ - while (msg->pending) { - struct imessaging_rec *rec = msg->pending; - NTSTATUS status; - status = try_send(rec); - if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { - rec->retries++; - if (rec->retries > 3) { - /* we're getting continuous write errors - - backoff this record */ - DLIST_REMOVE(msg->pending, rec); - DLIST_ADD_END(msg->retry_queue, rec, - struct imessaging_rec *); - if (msg->retry_te == NULL) { - msg->retry_te = - tevent_add_timer(ev, msg, - timeval_current_ofs(1, 0), - msg_retry_timer, msg); - } - } - break; - } - rec->retries = 0; - if (!NT_STATUS_IS_OK(status)) { - TALLOC_CTX *tmp_ctx = talloc_new(msg); - DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n", - server_id_str(tmp_ctx, &rec->header->from), - server_id_str(tmp_ctx, &rec->header->to), - rec->header->msg_type, - nt_errstr(status))); - talloc_free(tmp_ctx); - } - DLIST_REMOVE(msg->pending, rec); - talloc_free(rec); - } - if (msg->pending == NULL) { - TEVENT_FD_NOT_WRITEABLE(msg->event.fde); - } -} - -/* - handle a new incoming packet -*/ -static void imessaging_recv_handler(struct imessaging_context *msg, struct tevent_context *ev) -{ - struct imessaging_rec *rec; - NTSTATUS status; - DATA_BLOB packet; - size_t msize; - - /* see how many bytes are in the next packet */ - status = socket_pending(msg->sock, &msize); - if (!NT_STATUS_IS_OK(status)) { - DEBUG(0,("socket_pending failed in messaging - %s\n", - nt_errstr(status))); - return; - } - - packet = data_blob_talloc(msg, NULL, msize); - if (packet.data == NULL) { - /* assume this is temporary and retry */ - return; - } - - status = socket_recv(msg->sock, packet.data, msize, &msize); - if (!NT_STATUS_IS_OK(status)) { - data_blob_free(&packet); - return; - } - - if (msize < sizeof(*rec->header)) { - DEBUG(0,("messaging: bad message of size %d\n", (int)msize)); - data_blob_free(&packet); - return; - } - - rec = talloc(msg, struct imessaging_rec); - if (rec == NULL) { - smb_panic("Unable to allocate imessaging_rec"); - } - - talloc_steal(rec, packet.data); - rec->msg = msg; - rec->path = msg->path; - rec->header = (struct imessaging_header *)packet.data; - rec->packet = packet; - rec->retries = 0; - - if (msize != sizeof(*rec->header) + rec->header->length) { - DEBUG(0,("messaging: bad message header size %d should be %d\n", - rec->header->length, (int)(msize - sizeof(*rec->header)))); - talloc_free(rec); - return; - } - - imessaging_dispatch(msg, rec); - talloc_free(rec); -} - - -/* - handle a socket event -*/ -static void imessaging_handler(struct tevent_context *ev, struct tevent_fd *fde, - uint16_t flags, void *private_data) -{ - struct imessaging_context *msg = talloc_get_type(private_data, - struct imessaging_context); - if (flags & TEVENT_FD_WRITE) { - imessaging_send_handler(msg, ev); + if (msg_type >= MSG_TMP_BASE) { + return (struct dispatch_fn *)idr_find(msg->dispatch_tree, + msg_type); } - if (flags & TEVENT_FD_READ) { - imessaging_recv_handler(msg, ev); + if (msg_type < msg->num_types) { + return msg->dispatch[msg_type]; } + return NULL; } - /* Register a dispatch function for a particular message type. */ @@ -458,64 +230,40 @@ void imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, vo NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server, uint32_t msg_type, const DATA_BLOB *data) { - struct imessaging_rec *rec; - NTSTATUS status; - size_t dlength = data?data->length:0; + uint8_t hdr[MESSAGE_HDR_LENGTH]; + struct iovec iov[2]; + int num_iov, ret; + pid_t pid; + void *priv; - rec = talloc(msg, struct imessaging_rec); - if (rec == NULL) { - return NT_STATUS_NO_MEMORY; + if (!cluster_node_equal(&msg->server_id, &server)) { + /* No cluster in source4... */ + return NT_STATUS_OK; } - rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength); - if (rec->packet.data == NULL) { - talloc_free(rec); - return NT_STATUS_NO_MEMORY; - } + message_hdr_put(hdr, msg_type, msg->server_id, server); - rec->retries = 0; - rec->msg = msg; - rec->header = (struct imessaging_header *)rec->packet.data; - /* zero padding */ - ZERO_STRUCTP(rec->header); - rec->header->version = IMESSAGING_VERSION; - rec->header->msg_type = msg_type; - rec->header->from = msg->server_id; - rec->header->to = server; - rec->header->length = dlength; - if (dlength != 0) { - memcpy(rec->packet.data + sizeof(*rec->header), - data->data, dlength); - } + iov[0] = (struct iovec) { .iov_base = &hdr, .iov_len = sizeof(hdr) }; + num_iov = 1; - if (!cluster_node_equal(&msg->server_id, &server)) { - /* the destination is on another node - dispatch via - the cluster layer */ - status = cluster_message_send(server, &rec->packet); - talloc_free(rec); - return status; + if (data != NULL) { + iov[1] = (struct iovec) { .iov_base = data->data, + .iov_len = data->length }; + num_iov += 1; } - rec->path = imessaging_path(msg, server); - talloc_steal(rec, rec->path); - - if (msg->pending != NULL) { - status = STATUS_MORE_ENTRIES; - } else { - status = try_send(rec); + pid = server.pid; + if (pid == 0) { + pid = getpid(); } - if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { - if (msg->pending == NULL) { - TEVENT_FD_WRITEABLE(msg->event.fde); - } - DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *); - return NT_STATUS_OK; + priv = root_privileges(); + ret = messaging_dgm_send(pid, iov, num_iov, NULL, 0); + TALLOC_FREE(priv); + if (ret != 0) { + return map_nt_error_from_unix_common(ret); } - - talloc_free(rec); - - return status; + return NT_STATUS_OK; } /* @@ -541,12 +289,13 @@ int imessaging_cleanup(struct imessaging_context *msg) if (!msg) { return 0; } - - DEBUG(5,("imessaging: cleaning up %s\n", msg->path)); - unlink(msg->path); return 0; } +static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len, + int *fds, size_t num_fds, + void *private_data); + /* create the listening socket and setup the dispatcher @@ -562,9 +311,8 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx, bool auto_remove) { struct imessaging_context *msg; - NTSTATUS status; - struct socket_address *path; bool ok; + int ret; if (ev == NULL) { return NULL; @@ -575,26 +323,31 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx, return NULL; } - /* setup a handler for messages from other cluster nodes, if appropriate */ - status = cluster_message_init(msg, server_id, cluster_message_handler); - if (!NT_STATUS_IS_OK(status)) { - goto fail; - } - /* create the messaging directory if needed */ - msg->base_path = lpcfg_imessaging_path(msg, lp_ctx); - if (msg->base_path == NULL) { + msg->sock_dir = lpcfg_private_path(msg, lp_ctx, "sock"); + if (msg->sock_dir == NULL) { + goto fail; + } + ok = directory_create_or_exist_strict(msg->sock_dir, geteuid(), 0700); + if (!ok) { goto fail; } - ok = directory_create_or_exist_strict(msg->base_path, geteuid(), 0700); + msg->lock_dir = lpcfg_lock_path(msg, lp_ctx, "msg"); + if (msg->lock_dir == NULL) { + goto fail; + } + ok = directory_create_or_exist_strict(msg->lock_dir, geteuid(), 0755); if (!ok) { goto fail; } - msg->path = imessaging_path(msg, server_id); - if (msg->path == NULL) { + msg->msg_dgm_ref = messaging_dgm_ref( + msg, ev, server_id.unique_id, msg->sock_dir, msg->lock_dir, + imessaging_dgm_recv, msg, &ret); + + if (msg->msg_dgm_ref == NULL) { goto fail; } @@ -612,41 +365,13 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx, msg->start_time = timeval_current(); msg->names = server_id_db_init( - msg, server_id, msg->base_path, 0, + msg, server_id, msg->lock_dir, 0, TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST| lpcfg_tdb_flags(lp_ctx, 0)); if (msg->names == NULL) { goto fail; } - status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0); - if (!NT_STATUS_IS_OK(status)) { - goto fail; - } - - /* by stealing here we ensure that the socket is cleaned up (and even - deleted) on exit */ - talloc_steal(msg, msg->sock); - - path = socket_address_from_strings(msg, msg->sock->backend_name, - msg->path, 0); - if (!path) { - goto fail; - } - - status = socket_listen(msg->sock, path, 50, 0); - if (!NT_STATUS_IS_OK(status)) { - DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status))); - goto fail; - } - - /* it needs to be non blocking for sends */ - set_blocking(socket_get_fd(msg->sock), false); - - msg->event.fde = tevent_add_fd(ev, msg, socket_get_fd(msg->sock), - TEVENT_FD_READ, imessaging_handler, msg); - tevent_fd_set_auto_close(msg->event.fde); - if (auto_remove) { talloc_set_destructor(msg, imessaging_cleanup); } @@ -661,6 +386,51 @@ fail: return NULL; } +static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len, + int *fds, size_t num_fds, + void *private_data) +{ + struct imessaging_context *msg = talloc_get_type_abort( + private_data, struct imessaging_context); + uint32_t msg_type; + struct server_id src, dst; + struct server_id_buf srcbuf, dstbuf; + DATA_BLOB data; + + if (buf_len < MESSAGE_HDR_LENGTH) { + /* Invalid message, ignore */ + return; + } + + message_hdr_get(&msg_type, &src, &dst, buf); + + data.data = discard_const_p(uint8_t, buf + MESSAGE_HDR_LENGTH); + data.length = buf_len - MESSAGE_HDR_LENGTH; + + if ((cluster_id_equal(&dst, &msg->server_id)) || + ((dst.task_id == 0) && (msg->server_id.pid == 0))) { + struct dispatch_fn *d, *next; + + DEBUG(10, ("%s: dst %s matches my id: %s, type=0x%x\n", + __func__, + server_id_str_buf(dst, &dstbuf), + server_id_str_buf(msg->server_id, &srcbuf), + (unsigned)msg_type)); + + d = imessaging_find_dispatch(msg, msg_type); + + for (; d; d = next) { + next = d->next; + d->fn(msg, d->private_data, d->msg_type, src, &data); + } + } else { + DEBUG(10, ("%s: Ignoring type=0x%x dst %s, I am %s, \n", + __func__, (unsigned)msg_type, + server_id_str_buf(dst, &dstbuf), + server_id_str_buf(msg->server_id, &srcbuf))); + } +} + /* A hack, for the short term until we get 'client only' messaging in place */ |