diff options
author | Volker Lendecke <vl@samba.org> | 2013-12-30 11:26:52 +0100 |
---|---|---|
committer | Volker Lendecke <vl@samba.org> | 2014-01-21 08:10:41 +0100 |
commit | 5f3ccfc16b7c64d2de109ba26d92d22ae1010882 (patch) | |
tree | ff43659880bca36e4a0a33aef2a85bcc66a4abdd /source3/lib | |
parent | eee450fec2f7cb5f45c47162fd5b7c0717978598 (diff) | |
download | samba-5f3ccfc16b7c64d2de109ba26d92d22ae1010882.tar.gz samba-5f3ccfc16b7c64d2de109ba26d92d22ae1010882.tar.xz samba-5f3ccfc16b7c64d2de109ba26d92d22ae1010882.zip |
messaging3: Add messaging_read_send/recv
This is made to replace the msg_channel abstraction.
msg_channel was created to not miss any messages. For this, some
complex queueing was installed. This complexity has caused quite a
few problems in the past (see bug 10284 for example).
messaging_read_send/recv is able to achieve the same goal with a
lot less complexity. The messaging_read_send atomically installs
the reader into the messaging_context, we will not miss any messages
while this installed. messaging_send_recv will deinstall that
listener, but in the callback function you can directly call
messaging_read_send again without going through the tevent_loop_once.
As long as this is always made sure, no messages will be lost.
Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
Diffstat (limited to 'source3/lib')
-rw-r--r-- | source3/lib/dbwrap/dbwrap_watch.c | 1 | ||||
-rw-r--r-- | source3/lib/messages.c | 126 |
2 files changed, 126 insertions, 1 deletions
diff --git a/source3/lib/dbwrap/dbwrap_watch.c b/source3/lib/dbwrap/dbwrap_watch.c index 7bdcd998f89..e65dbf41bde 100644 --- a/source3/lib/dbwrap/dbwrap_watch.c +++ b/source3/lib/dbwrap/dbwrap_watch.c @@ -22,7 +22,6 @@ #include "dbwrap/dbwrap.h" #include "dbwrap_watch.h" #include "dbwrap_open.h" -#include "msg_channel.h" #include "lib/util/util_tdb.h" #include "lib/util/tevent_ntstatus.h" diff --git a/source3/lib/messages.c b/source3/lib/messages.c index ba473ae8ae2..58f45d3b1cf 100644 --- a/source3/lib/messages.c +++ b/source3/lib/messages.c @@ -49,6 +49,7 @@ #include "dbwrap/dbwrap.h" #include "serverid.h" #include "messages.h" +#include "lib/util/tevent_unix.h" struct messaging_callback { struct messaging_callback *prev, *next; @@ -425,6 +426,120 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx, return messaging_send(msg_ctx, server, msg_type, &blob); } +static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx, + struct messaging_rec *rec) +{ + struct messaging_rec *result; + + result = talloc_pooled_object(mem_ctx, struct messaging_rec, + 1, rec->buf.length); + if (result == NULL) { + return NULL; + } + *result = *rec; + + /* Doesn't fail, see talloc_pooled_object */ + + result->buf.data = talloc_memdup(result, rec->buf.data, + rec->buf.length); + return result; +} + +struct messaging_read_state { + struct tevent_context *ev; + struct messaging_context *msg_ctx; + uint32_t msg_type; + struct messaging_rec *rec; +}; + +static void messaging_read_cleanup(struct tevent_req *req, + enum tevent_req_state req_state); + +struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx, + struct tevent_context *ev, + struct messaging_context *msg_ctx, + uint32_t msg_type) +{ + struct tevent_req *req; + struct messaging_read_state *state; + size_t waiters_len; + + req = tevent_req_create(mem_ctx, &state, + struct messaging_read_state); + if (req == NULL) { + return NULL; + } + state->ev = ev; + state->msg_ctx = msg_ctx; + state->msg_type = msg_type; + + waiters_len = talloc_array_length(msg_ctx->waiters); + + if (waiters_len == msg_ctx->num_waiters) { + struct tevent_req **tmp; + + tmp = talloc_realloc(msg_ctx, msg_ctx->waiters, + struct tevent_req *, waiters_len+1); + if (tevent_req_nomem(tmp, req)) { + return tevent_req_post(req, ev); + } + msg_ctx->waiters = tmp; + } + + msg_ctx->waiters[msg_ctx->num_waiters] = req; + msg_ctx->num_waiters += 1; + tevent_req_set_cleanup_fn(req, messaging_read_cleanup); + + return req; +} + +static void messaging_read_cleanup(struct tevent_req *req, + enum tevent_req_state req_state) +{ + struct messaging_read_state *state = tevent_req_data( + req, struct messaging_read_state); + struct messaging_context *msg_ctx = state->msg_ctx; + struct tevent_req **waiters = msg_ctx->waiters; + unsigned i; + + tevent_req_set_cleanup_fn(req, NULL); + + for (i=0; i<msg_ctx->num_waiters; i++) { + if (waiters[i] == req) { + waiters[i] = waiters[msg_ctx->num_waiters-1]; + msg_ctx->num_waiters -= 1; + return; + } + } +} + +static void messaging_read_done(struct tevent_req *req, struct messaging_rec *rec) +{ + struct messaging_read_state *state = tevent_req_data( + req, struct messaging_read_state); + + state->rec = messaging_rec_dup(state, rec); + if (tevent_req_nomem(state->rec, req)) { + return; + } + tevent_req_done(req); +} + +int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx, + struct messaging_rec **presult) +{ + struct messaging_read_state *state = tevent_req_data( + req, struct messaging_read_state); + int err; + + if (tevent_req_is_unix_error(req, &err)) { + tevent_req_received(req); + return err; + } + *presult = talloc_move(mem_ctx, &state->rec); + return 0; +} + /* Dispatch one messaging_rec */ @@ -432,6 +547,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx, struct messaging_rec *rec) { struct messaging_callback *cb, *next; + unsigned i; for (cb = msg_ctx->callbacks; cb != NULL; cb = next) { next = cb->next; @@ -445,6 +561,16 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx, the same message type */ } } + + for (i=0; i<msg_ctx->num_waiters; i++) { + struct tevent_req *req = msg_ctx->waiters[i]; + struct messaging_read_state *state = tevent_req_data( + req, struct messaging_read_state); + + if (state->msg_type == rec->msg_type) { + messaging_read_done(req, rec); + } + } return; } |