summaryrefslogtreecommitdiffstats
path: root/source3/lib/messages.c
diff options
context:
space:
mode:
Diffstat (limited to 'source3/lib/messages.c')
-rw-r--r--source3/lib/messages.c126
1 files changed, 126 insertions, 0 deletions
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index ba473ae8ae2..58f45d3b1cf 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -49,6 +49,7 @@
#include "dbwrap/dbwrap.h"
#include "serverid.h"
#include "messages.h"
+#include "lib/util/tevent_unix.h"
struct messaging_callback {
struct messaging_callback *prev, *next;
@@ -425,6 +426,120 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
return messaging_send(msg_ctx, server, msg_type, &blob);
}
+static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
+ struct messaging_rec *rec)
+{
+ struct messaging_rec *result;
+
+ result = talloc_pooled_object(mem_ctx, struct messaging_rec,
+ 1, rec->buf.length);
+ if (result == NULL) {
+ return NULL;
+ }
+ *result = *rec;
+
+ /* Doesn't fail, see talloc_pooled_object */
+
+ result->buf.data = talloc_memdup(result, rec->buf.data,
+ rec->buf.length);
+ return result;
+}
+
+struct messaging_read_state {
+ struct tevent_context *ev;
+ struct messaging_context *msg_ctx;
+ uint32_t msg_type;
+ struct messaging_rec *rec;
+};
+
+static void messaging_read_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state);
+
+struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct messaging_context *msg_ctx,
+ uint32_t msg_type)
+{
+ struct tevent_req *req;
+ struct messaging_read_state *state;
+ size_t waiters_len;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct messaging_read_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->ev = ev;
+ state->msg_ctx = msg_ctx;
+ state->msg_type = msg_type;
+
+ waiters_len = talloc_array_length(msg_ctx->waiters);
+
+ if (waiters_len == msg_ctx->num_waiters) {
+ struct tevent_req **tmp;
+
+ tmp = talloc_realloc(msg_ctx, msg_ctx->waiters,
+ struct tevent_req *, waiters_len+1);
+ if (tevent_req_nomem(tmp, req)) {
+ return tevent_req_post(req, ev);
+ }
+ msg_ctx->waiters = tmp;
+ }
+
+ msg_ctx->waiters[msg_ctx->num_waiters] = req;
+ msg_ctx->num_waiters += 1;
+ tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
+
+ return req;
+}
+
+static void messaging_read_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct messaging_read_state *state = tevent_req_data(
+ req, struct messaging_read_state);
+ struct messaging_context *msg_ctx = state->msg_ctx;
+ struct tevent_req **waiters = msg_ctx->waiters;
+ unsigned i;
+
+ tevent_req_set_cleanup_fn(req, NULL);
+
+ for (i=0; i<msg_ctx->num_waiters; i++) {
+ if (waiters[i] == req) {
+ waiters[i] = waiters[msg_ctx->num_waiters-1];
+ msg_ctx->num_waiters -= 1;
+ return;
+ }
+ }
+}
+
+static void messaging_read_done(struct tevent_req *req, struct messaging_rec *rec)
+{
+ struct messaging_read_state *state = tevent_req_data(
+ req, struct messaging_read_state);
+
+ state->rec = messaging_rec_dup(state, rec);
+ if (tevent_req_nomem(state->rec, req)) {
+ return;
+ }
+ tevent_req_done(req);
+}
+
+int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+ struct messaging_rec **presult)
+{
+ struct messaging_read_state *state = tevent_req_data(
+ req, struct messaging_read_state);
+ int err;
+
+ if (tevent_req_is_unix_error(req, &err)) {
+ tevent_req_received(req);
+ return err;
+ }
+ *presult = talloc_move(mem_ctx, &state->rec);
+ return 0;
+}
+
/*
Dispatch one messaging_rec
*/
@@ -432,6 +547,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
struct messaging_rec *rec)
{
struct messaging_callback *cb, *next;
+ unsigned i;
for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
next = cb->next;
@@ -445,6 +561,16 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
the same message type */
}
}
+
+ for (i=0; i<msg_ctx->num_waiters; i++) {
+ struct tevent_req *req = msg_ctx->waiters[i];
+ struct messaging_read_state *state = tevent_req_data(
+ req, struct messaging_read_state);
+
+ if (state->msg_type == rec->msg_type) {
+ messaging_read_done(req, rec);
+ }
+ }
return;
}