summaryrefslogtreecommitdiffstats
path: root/source3/lib
diff options
context:
space:
mode:
authorVolker Lendecke <vl@samba.org>2013-12-30 11:26:52 +0100
committerVolker Lendecke <vl@samba.org>2014-01-21 08:10:41 +0100
commit5f3ccfc16b7c64d2de109ba26d92d22ae1010882 (patch)
treeff43659880bca36e4a0a33aef2a85bcc66a4abdd /source3/lib
parenteee450fec2f7cb5f45c47162fd5b7c0717978598 (diff)
downloadsamba-5f3ccfc16b7c64d2de109ba26d92d22ae1010882.tar.gz
samba-5f3ccfc16b7c64d2de109ba26d92d22ae1010882.tar.xz
samba-5f3ccfc16b7c64d2de109ba26d92d22ae1010882.zip
messaging3: Add messaging_read_send/recv
This is made to replace the msg_channel abstraction. msg_channel was created to not miss any messages. For this, some complex queueing was installed. This complexity has caused quite a few problems in the past (see bug 10284 for example). messaging_read_send/recv is able to achieve the same goal with a lot less complexity. The messaging_read_send atomically installs the reader into the messaging_context, we will not miss any messages while this installed. messaging_send_recv will deinstall that listener, but in the callback function you can directly call messaging_read_send again without going through the tevent_loop_once. As long as this is always made sure, no messages will be lost. Signed-off-by: Volker Lendecke <vl@samba.org> Reviewed-by: Jeremy Allison <jra@samba.org>
Diffstat (limited to 'source3/lib')
-rw-r--r--source3/lib/dbwrap/dbwrap_watch.c1
-rw-r--r--source3/lib/messages.c126
2 files changed, 126 insertions, 1 deletions
diff --git a/source3/lib/dbwrap/dbwrap_watch.c b/source3/lib/dbwrap/dbwrap_watch.c
index 7bdcd998f89..e65dbf41bde 100644
--- a/source3/lib/dbwrap/dbwrap_watch.c
+++ b/source3/lib/dbwrap/dbwrap_watch.c
@@ -22,7 +22,6 @@
#include "dbwrap/dbwrap.h"
#include "dbwrap_watch.h"
#include "dbwrap_open.h"
-#include "msg_channel.h"
#include "lib/util/util_tdb.h"
#include "lib/util/tevent_ntstatus.h"
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index ba473ae8ae2..58f45d3b1cf 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -49,6 +49,7 @@
#include "dbwrap/dbwrap.h"
#include "serverid.h"
#include "messages.h"
+#include "lib/util/tevent_unix.h"
struct messaging_callback {
struct messaging_callback *prev, *next;
@@ -425,6 +426,120 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
return messaging_send(msg_ctx, server, msg_type, &blob);
}
+static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
+ struct messaging_rec *rec)
+{
+ struct messaging_rec *result;
+
+ result = talloc_pooled_object(mem_ctx, struct messaging_rec,
+ 1, rec->buf.length);
+ if (result == NULL) {
+ return NULL;
+ }
+ *result = *rec;
+
+ /* Doesn't fail, see talloc_pooled_object */
+
+ result->buf.data = talloc_memdup(result, rec->buf.data,
+ rec->buf.length);
+ return result;
+}
+
+struct messaging_read_state {
+ struct tevent_context *ev;
+ struct messaging_context *msg_ctx;
+ uint32_t msg_type;
+ struct messaging_rec *rec;
+};
+
+static void messaging_read_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state);
+
+struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct messaging_context *msg_ctx,
+ uint32_t msg_type)
+{
+ struct tevent_req *req;
+ struct messaging_read_state *state;
+ size_t waiters_len;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct messaging_read_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->ev = ev;
+ state->msg_ctx = msg_ctx;
+ state->msg_type = msg_type;
+
+ waiters_len = talloc_array_length(msg_ctx->waiters);
+
+ if (waiters_len == msg_ctx->num_waiters) {
+ struct tevent_req **tmp;
+
+ tmp = talloc_realloc(msg_ctx, msg_ctx->waiters,
+ struct tevent_req *, waiters_len+1);
+ if (tevent_req_nomem(tmp, req)) {
+ return tevent_req_post(req, ev);
+ }
+ msg_ctx->waiters = tmp;
+ }
+
+ msg_ctx->waiters[msg_ctx->num_waiters] = req;
+ msg_ctx->num_waiters += 1;
+ tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
+
+ return req;
+}
+
+static void messaging_read_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct messaging_read_state *state = tevent_req_data(
+ req, struct messaging_read_state);
+ struct messaging_context *msg_ctx = state->msg_ctx;
+ struct tevent_req **waiters = msg_ctx->waiters;
+ unsigned i;
+
+ tevent_req_set_cleanup_fn(req, NULL);
+
+ for (i=0; i<msg_ctx->num_waiters; i++) {
+ if (waiters[i] == req) {
+ waiters[i] = waiters[msg_ctx->num_waiters-1];
+ msg_ctx->num_waiters -= 1;
+ return;
+ }
+ }
+}
+
+static void messaging_read_done(struct tevent_req *req, struct messaging_rec *rec)
+{
+ struct messaging_read_state *state = tevent_req_data(
+ req, struct messaging_read_state);
+
+ state->rec = messaging_rec_dup(state, rec);
+ if (tevent_req_nomem(state->rec, req)) {
+ return;
+ }
+ tevent_req_done(req);
+}
+
+int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+ struct messaging_rec **presult)
+{
+ struct messaging_read_state *state = tevent_req_data(
+ req, struct messaging_read_state);
+ int err;
+
+ if (tevent_req_is_unix_error(req, &err)) {
+ tevent_req_received(req);
+ return err;
+ }
+ *presult = talloc_move(mem_ctx, &state->rec);
+ return 0;
+}
+
/*
Dispatch one messaging_rec
*/
@@ -432,6 +547,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
struct messaging_rec *rec)
{
struct messaging_callback *cb, *next;
+ unsigned i;
for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
next = cb->next;
@@ -445,6 +561,16 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
the same message type */
}
}
+
+ for (i=0; i<msg_ctx->num_waiters; i++) {
+ struct tevent_req *req = msg_ctx->waiters[i];
+ struct messaging_read_state *state = tevent_req_data(
+ req, struct messaging_read_state);
+
+ if (state->msg_type == rec->msg_type) {
+ messaging_read_done(req, rec);
+ }
+ }
return;
}