diff options
| author | Volker Lendecke <vl@samba.org> | 2014-02-24 12:23:49 +0000 |
|---|---|---|
| committer | Jeremy Allison <jra@samba.org> | 2014-04-23 22:33:08 +0200 |
| commit | 29603d1cd9072bf32adfe13ee3d764fd13d12bd0 (patch) | |
| tree | f75503cedea76fe50fc46c70c21d3c3fdb3baaeb /source3 | |
| parent | 3e24e07467962436fa505f3b8e591f1af6cafdc0 (diff) | |
lib: Add messaging_dgm
Messaging based on unix domain datagram sockets
This makes every process participating in messaging bind on a unix domain
datagram socket, similar to the source4 based messaging. The details are a bit
different though:
Retry after EWOULDBLOCK is done with a blocking thread, not by polling. In
experiments, this was the only way I could avoid a thundering herd or high load
under Linux in extreme overload situations, such as many thousands of processes
sending to one blocked process. If there are better ideas to do this in a
simple way, I'm more than happy to remove the pthreadpool dependency again.
There is only one socket per process, not per task. I don't think that per-task
sockets are really necessary; we can do filtering in user space. The message
contains the destination server_id, which contains the destination task_id. I
think we can rebase the source4 based imessaging on top of this, allowing
multiple imessaging contexts on top of one messaging_context. I had planned to
do this conversion before this goes in, but Jeremy convinced me that this has
value in itself :-)
Per socket we also create an fcntl-based lockfile to allow race-free cleanup of
orphaned sockets. This lockfile contains the unique_id, which in the future
will make the server_id.tdb obsolete.
Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
Diffstat (limited to 'source3')
| -rw-r--r-- | source3/include/messages.h | 5 | ||||
| -rw-r--r-- | source3/lib/messages.c | 8 | ||||
| -rw-r--r-- | source3/lib/messages_dgm.c | 409 | ||||
| -rw-r--r-- | source3/smbd/server.c | 6 | ||||
| -rwxr-xr-x | source3/wscript_build | 3 |
5 files changed, 427 insertions, 4 deletions
diff --git a/source3/include/messages.h b/source3/include/messages.h index 47c5f7a2d9..9437965724 100644 --- a/source3/include/messages.h +++ b/source3/include/messages.h @@ -91,6 +91,11 @@ struct messaging_backend { void *private_data; }; +NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx, + TALLOC_CTX *mem_ctx, + struct messaging_backend **presult); +NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid); + NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx, TALLOC_CTX *mem_ctx, struct messaging_backend **presult); diff --git a/source3/lib/messages.c b/source3/lib/messages.c index 4ff933dc6e..983fe699ed 100644 --- a/source3/lib/messages.c +++ b/source3/lib/messages.c @@ -197,10 +197,10 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, ctx->id = procid_self(); ctx->event_ctx = ev; - status = messaging_tdb_init(ctx, ctx, &ctx->local); + status = messaging_dgm_init(ctx, ctx, &ctx->local); if (!NT_STATUS_IS_OK(status)) { - DEBUG(2, ("messaging_tdb_init failed: %s\n", + DEBUG(2, ("messaging_dgm_init failed: %s\n", nt_errstr(status))); TALLOC_FREE(ctx); return NULL; @@ -245,9 +245,9 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx) msg_ctx->id = procid_self(); - status = messaging_tdb_init(msg_ctx, msg_ctx, &msg_ctx->local); + status = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local); if (!NT_STATUS_IS_OK(status)) { - DEBUG(0, ("messaging_tdb_init failed: %s\n", + DEBUG(0, ("messaging_dgm_init failed: %s\n", nt_errstr(status))); return status; } diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c new file mode 100644 index 0000000000..8327f9d505 --- /dev/null +++ b/source3/lib/messages_dgm.c @@ -0,0 +1,409 @@ +/* + * Unix SMB/CIFS implementation. 
+ * Samba internal messaging functions + * Copyright (C) 2013 by Volker Lendecke + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "includes.h" +#include "lib/util/data_blob.h" +#include "lib/util/debug.h" +#include "lib/unix_msg/unix_msg.h" +#include "system/filesys.h" +#include "messages.h" +#include "lib/param/param.h" +#include "poll_funcs/poll_funcs_tevent.h" +#include "unix_msg/unix_msg.h" +#include "librpc/gen_ndr/messaging.h" + +struct messaging_dgm_context { + struct messaging_context *msg_ctx; + struct poll_funcs msg_callbacks; + struct unix_msg_ctx *dgm_ctx; + char *cache_dir; + int lockfile_fd; +}; + +struct messaging_dgm_hdr { + uint32_t msg_version; + enum messaging_type msg_type; + struct server_id dst; + struct server_id src; +}; + +static NTSTATUS messaging_dgm_send(struct messaging_context *msg_ctx, + struct server_id pid, int msg_type, + const DATA_BLOB *data, + struct messaging_backend *backend); +static void messaging_dgm_recv(struct unix_msg_ctx *ctx, + uint8_t *msg, size_t msg_len, + void *private_data); + +static int messaging_dgm_context_destructor(struct messaging_dgm_context *c); + +static int messaging_dgm_lockfile_create(const char *cache_dir, pid_t pid, + int *plockfile_fd, uint64_t unique) +{ + char buf[PATH_MAX]; + char *dir, *to_free; + ssize_t dirlen; + char *lockfile_name; + int lockfile_fd; + struct flock lck = {}; 
+ int unique_len, ret; + ssize_t written; + bool ok; + + dirlen = full_path_tos(cache_dir, "lck", buf, sizeof(buf), + &dir, &to_free); + if (dirlen == -1) { + return ENOMEM; + } + + ok = directory_create_or_exist_strict(dir, sec_initial_uid(), 0755); + if (!ok) { + ret = errno; + DEBUG(1, ("%s: Could not create lock directory: %s\n", + __func__, strerror(ret))); + TALLOC_FREE(to_free); + return ret; + } + + lockfile_name = talloc_asprintf(talloc_tos(), "%s/%u", dir, + (unsigned)pid); + TALLOC_FREE(to_free); + if (lockfile_name == NULL) { + DEBUG(1, ("%s: talloc_asprintf failed\n", __func__)); + return ENOMEM; + } + + /* no O_EXCL, existence check is via the fcntl lock */ + + lockfile_fd = open(lockfile_name, O_NONBLOCK|O_CREAT|O_WRONLY, 0644); + if (lockfile_fd == -1) { + ret = errno; + DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno))); + goto fail_free; + } + + lck.l_type = F_WRLCK; + lck.l_whence = SEEK_SET; + lck.l_start = 0; + lck.l_len = 0; + + ret = fcntl(lockfile_fd, F_SETLK, &lck); + if (ret == -1) { + ret = errno; + DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret))); + goto fail_close; + } + + unique_len = snprintf(buf, sizeof(buf), "%"PRIu64, unique); + + /* shorten a potentially preexisting file */ + + ret = ftruncate(lockfile_fd, unique_len); + if (ret == -1) { + ret = errno; + DEBUG(1, ("%s: ftruncate failed: %s\n", __func__, + strerror(ret))); + goto fail_unlink; + } + + written = write(lockfile_fd, buf, unique_len); + if (written != unique_len) { + ret = errno; + DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret))); + goto fail_unlink; + } + + *plockfile_fd = lockfile_fd; + return 0; + +fail_unlink: + unlink(lockfile_name); +fail_close: + close(lockfile_fd); +fail_free: + TALLOC_FREE(lockfile_name); + return ret; +} + +static int messaging_dgm_lockfile_remove(const char *cache_dir, pid_t pid) +{ + fstring fname; + char buf[PATH_MAX]; + char *lockfile_name, *to_free; + ssize_t len; + int ret; + + fstr_sprintf(fname, 
"lck/%u", (unsigned)pid); + + len = full_path_tos(cache_dir, fname, buf, sizeof(buf), + &lockfile_name, &to_free); + if (len == -1) { + return ENOMEM; + } + + ret = unlink(lockfile_name); + if (ret == -1) { + ret = errno; + DEBUG(10, ("%s: unlink failed: %s\n", __func__, + strerror(ret))); + } + TALLOC_FREE(to_free); + return ret; +} + +NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx, + TALLOC_CTX *mem_ctx, + struct messaging_backend **presult) +{ + struct messaging_backend *result; + struct messaging_dgm_context *ctx; + struct server_id pid = messaging_server_id(msg_ctx); + int ret; + bool ok; + const char *cache_dir; + char *socket_dir, *socket_name; + uint64_t cookie; + + cache_dir = lp_cache_directory(); + if (cache_dir == NULL) { + NTSTATUS status = map_nt_error_from_unix(errno); + return status; + } + + result = talloc(mem_ctx, struct messaging_backend); + if (result == NULL) { + goto fail_nomem; + } + ctx = talloc_zero(result, struct messaging_dgm_context); + if (ctx == NULL) { + goto fail_nomem; + } + + result->private_data = ctx; + result->send_fn = messaging_dgm_send; + ctx->msg_ctx = msg_ctx; + + ctx->cache_dir = talloc_strdup(ctx, cache_dir); + if (ctx->cache_dir == NULL) { + goto fail_nomem; + } + socket_dir = talloc_asprintf(ctx, "%s/msg", cache_dir); + if (socket_dir == NULL) { + goto fail_nomem; + } + socket_name = talloc_asprintf(ctx, "%s/%u", socket_dir, + (unsigned)pid.pid); + if (socket_name == NULL) { + goto fail_nomem; + } + + sec_init(); + + ret = messaging_dgm_lockfile_create(cache_dir, pid.pid, + &ctx->lockfile_fd, pid.unique_id); + if (ret != 0) { + DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n", + __func__, strerror(ret))); + TALLOC_FREE(result); + return map_nt_error_from_unix(ret); + } + + poll_funcs_init_tevent(&ctx->msg_callbacks, msg_ctx->event_ctx); + + ok = directory_create_or_exist_strict(socket_dir, sec_initial_uid(), + 0700); + if (!ok) { + DEBUG(1, ("Could not create socket directory\n")); + 
TALLOC_FREE(result); + return NT_STATUS_ACCESS_DENIED; + } + TALLOC_FREE(socket_dir); + + unlink(socket_name); + + generate_random_buffer((uint8_t *)&cookie, sizeof(cookie)); + + ret = unix_msg_init(socket_name, &ctx->msg_callbacks, 1024, cookie, + messaging_dgm_recv, ctx, &ctx->dgm_ctx); + TALLOC_FREE(socket_name); + if (ret != 0) { + DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret))); + TALLOC_FREE(result); + return map_nt_error_from_unix(ret); + } + talloc_set_destructor(ctx, messaging_dgm_context_destructor); + + *presult = result; + return NT_STATUS_OK; + +fail_nomem: + TALLOC_FREE(result); + return NT_STATUS_NO_MEMORY; +} + +static int messaging_dgm_context_destructor(struct messaging_dgm_context *c) +{ + struct server_id pid = messaging_server_id(c->msg_ctx); + + /* + * First delete the socket to avoid races. The lockfile is the + * indicator that we're still around. + */ + unix_msg_free(c->dgm_ctx); + + if (getpid() == pid.pid) { + (void)messaging_dgm_lockfile_remove(c->cache_dir, pid.pid); + } + close(c->lockfile_fd); + return 0; +} + +static NTSTATUS messaging_dgm_send(struct messaging_context *msg_ctx, + struct server_id pid, int msg_type, + const DATA_BLOB *data, + struct messaging_backend *backend) +{ + struct messaging_dgm_context *ctx = talloc_get_type_abort( + backend->private_data, struct messaging_dgm_context); + fstring pid_str; + char buf[PATH_MAX]; + char *dst_sock, *to_free; + struct messaging_dgm_hdr hdr; + struct iovec iov[2]; + ssize_t pathlen; + int ret; + + fstr_sprintf(pid_str, "msg/%u", (unsigned)pid.pid); + + pathlen = full_path_tos(ctx->cache_dir, pid_str, buf, sizeof(buf), + &dst_sock, &to_free); + if (pathlen == -1) { + return NT_STATUS_NO_MEMORY; + } + + hdr.msg_version = MESSAGE_VERSION; + hdr.msg_type = msg_type & MSG_TYPE_MASK; + hdr.dst = pid; + hdr.src = msg_ctx->id; + + DEBUG(10, ("%s: Sending message 0x%x len %u to %s\n", __func__, + (unsigned)hdr.msg_type, (unsigned)data->length, + server_id_str(talloc_tos(), &pid))); 
+ + iov[0].iov_base = &hdr; + iov[0].iov_len = sizeof(hdr); + iov[1].iov_base = data->data; + iov[1].iov_len = data->length; + + become_root(); + ret = unix_msg_send(ctx->dgm_ctx, dst_sock, iov, ARRAY_SIZE(iov)); + unbecome_root(); + + TALLOC_FREE(to_free); + + if (ret != 0) { + return map_nt_error_from_unix(ret); + } + return NT_STATUS_OK; +} + +static void messaging_dgm_recv(struct unix_msg_ctx *ctx, + uint8_t *msg, size_t msg_len, + void *private_data) +{ + struct messaging_dgm_context *dgm_ctx = talloc_get_type_abort( + private_data, struct messaging_dgm_context); + struct messaging_dgm_hdr *hdr; + struct messaging_rec rec; + + if (msg_len < sizeof(*hdr)) { + DEBUG(1, ("message too short: %u\n", (unsigned)msg_len)); + return; + } + + /* + * unix_msg guarantees alignment, so we can cast here + */ + hdr = (struct messaging_dgm_hdr *)msg; + + rec.msg_version = hdr->msg_version; + rec.msg_type = hdr->msg_type; + rec.dest = hdr->dst; + rec.src = hdr->src; + rec.buf.data = msg + sizeof(*hdr); + rec.buf.length = msg_len - sizeof(*hdr); + + DEBUG(10, ("%s: Received message 0x%x len %u from %s\n", __func__, + (unsigned)hdr->msg_type, (unsigned)rec.buf.length, + server_id_str(talloc_tos(), &rec.src))); + + messaging_dispatch_rec(dgm_ctx->msg_ctx, &rec); +} + +NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid) +{ + struct messaging_dgm_context *ctx = talloc_get_type_abort( + msg_ctx->local->private_data, struct messaging_dgm_context); + char *lockfile_name, *socket_name; + int fd, ret; + struct flock lck = {}; + NTSTATUS status = NT_STATUS_OK; + + lockfile_name = talloc_asprintf(talloc_tos(), "%s/lck/%u", + ctx->cache_dir, (unsigned)pid); + if (lockfile_name == NULL) { + return NT_STATUS_NO_MEMORY; + } + socket_name = talloc_asprintf(lockfile_name, "%s/msg/%u", + ctx->cache_dir, (unsigned)pid); + if (socket_name == NULL) { + TALLOC_FREE(lockfile_name); + return NT_STATUS_NO_MEMORY; + } + + fd = open(lockfile_name, O_NONBLOCK|O_WRONLY, 0); + if 
(fd == -1) { + status = map_nt_error_from_unix(errno); + DEBUG(10, ("%s: open(%s) failed: %s\n", __func__, + lockfile_name, strerror(errno))); + return status; + } + + lck.l_type = F_WRLCK; + lck.l_whence = SEEK_SET; + lck.l_start = 0; + lck.l_len = 0; + + ret = fcntl(fd, F_SETLK, &lck); + if (ret != 0) { + status = map_nt_error_from_unix(errno); + DEBUG(10, ("%s: Could not get lock: %s\n", __func__, + strerror(errno))); + TALLOC_FREE(lockfile_name); + close(fd); + return status; + } + + (void)unlink(socket_name); + (void)unlink(lockfile_name); + (void)close(fd); + + TALLOC_FREE(lockfile_name); + return NT_STATUS_OK; +} diff --git a/source3/smbd/server.c b/source3/smbd/server.c index 96580ba4e2..5ff370d1ed 100644 --- a/source3/smbd/server.c +++ b/source3/smbd/server.c @@ -465,6 +465,8 @@ static void remove_child_pid(struct smbd_parent_context *parent, } if (unclean_shutdown) { + NTSTATUS status; + /* a child terminated uncleanly so tickle all processes to see if they can grab any of the pending locks @@ -488,6 +490,10 @@ static void remove_child_pid(struct smbd_parent_context *parent, * terminated uncleanly. */ messaging_cleanup_server(parent->msg_ctx, child_id); + + status = messaging_dgm_cleanup(parent->msg_ctx, pid); + DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n", + __func__, nt_errstr(status))); } if (!serverid_deregister(child_id)) { diff --git a/source3/wscript_build b/source3/wscript_build index 4d261c645f..2ba7a96877 100755 --- a/source3/wscript_build +++ b/source3/wscript_build @@ -314,6 +314,7 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB', bld.SAMBA3_SUBSYSTEM('samba3core', source='''lib/messages.c lib/messages_local.c + lib/messages_dgm.c lib/util_cluster.c lib/id_cache.c lib/talloc_dict.c @@ -352,6 +353,8 @@ bld.SAMBA3_SUBSYSTEM('samba3core', UTIL_PW SAMBA_VERSION PTHREADPOOL + UNIX_MSG + POLL_FUNCS_TEVENT interfaces param dbwrap |
