diff options
author | Volker Lendecke <vlendec@samba.org> | 2007-05-21 22:17:13 +0000 |
---|---|---|
committer | Gerald (Jerry) Carter <jerry@samba.org> | 2007-10-10 12:22:17 -0500 |
commit | ac3f08ddbe0b484375624db0e35999a8584b57f4 (patch) | |
tree | 477347104c60dc6ae205257d654b1d89c7903f35 /source3/lib/messages.c | |
parent | f96242d9331a5fcdc65445d0d0ea7177c7ddc6e0 (diff) | |
download | samba-ac3f08ddbe0b484375624db0e35999a8584b57f4.tar.gz samba-ac3f08ddbe0b484375624db0e35999a8584b57f4.tar.xz samba-ac3f08ddbe0b484375624db0e35999a8584b57f4.zip |
r23055: Rewrite messages.c to use auto-generated marshalling in the tdb. I'm
doing this because for the clustering the marshalling is needed in more
than one place, so I wanted a decent routine to marshall a message_rec
struct which was not there before.
Tridge, this seems about the same speed as it used to be before, the
librpc/ndr overhead in my tests was under the noise.
Volker
(This used to be commit eaefd00563173dfabb7716c5695ac0a2f7139bb6)
Diffstat (limited to 'source3/lib/messages.c')
-rw-r--r-- | source3/lib/messages.c | 597 |
1 files changed, 267 insertions, 330 deletions
diff --git a/source3/lib/messages.c b/source3/lib/messages.c index 95f4aba4e78..6932369b217 100644 --- a/source3/lib/messages.c +++ b/source3/lib/messages.c @@ -47,54 +47,30 @@ */ #include "includes.h" +#include "librpc/gen_ndr/messaging.h" +#include "librpc/gen_ndr/ndr_messaging.h" /* the locking database handle */ -static TDB_CONTEXT *tdb; static int received_signal; /* change the message version with any incompatible changes in the protocol */ -#define MESSAGE_VERSION 1 +#define MESSAGE_VERSION 2 -struct message_rec { - int msg_version; - int msg_type; - struct server_id dest; - struct server_id src; - size_t len; -}; - -/* we have a linked list of dispatch handlers */ -static struct dispatch_fns { - struct dispatch_fns *next, *prev; - int msg_type; - void (*fn)(int msg_type, struct server_id pid, void *buf, size_t len, - void *private_data); +struct messaging_callback { + struct messaging_callback *prev, *next; + uint32 msg_type; + void (*fn)(struct messaging_context *msg, void *private_data, + uint32_t msg_type, + struct server_id server_id, DATA_BLOB *data); void *private_data; -} *dispatch_fns; - -static void message_register(int msg_type, - void (*fn)(int msg_type, struct server_id pid, - void *buf, size_t len, - void *private_data), - void *private_data); - -/**************************************************************************** - Free global objects. -****************************************************************************/ +}; -void gfree_messages(void) -{ - struct dispatch_fns *dfn, *next; - - /* delete the dispatch_fns list */ - dfn = dispatch_fns; - while( dfn ) { - next = dfn->next; - DLIST_REMOVE(dispatch_fns, dfn); - SAFE_FREE(dfn); - dfn = next; - } -} +struct messaging_context { + TDB_CONTEXT *tdb; + struct server_id id; + struct event_context *event_ctx; + struct messaging_callback *callbacks; +}; /**************************************************************************** Notifications come in as signals. @@ -106,21 +82,25 @@ static void sig_usr1(void) sys_select_signal(SIGUSR1); } -static NTSTATUS message_send_pid(struct server_id pid, int msg_type, - const void *buf, size_t len); +static NTSTATUS messaging_tdb_send(TDB_CONTEXT *msg_tdb, + struct server_id pid, int msg_type, + const void *buf, size_t len); /**************************************************************************** A useful function for testing the message system. ****************************************************************************/ -static void ping_message(int msg_type, struct server_id src, - void *buf, size_t len, void *private_data) +static void ping_message(struct messaging_context *msg_ctx, + void *private_data, + uint32_t msg_type, + struct server_id src, + DATA_BLOB *data) { - const char *msg = buf ? (const char *)buf : "none"; + const char *msg = data->data ? (const char *)data->data : "none"; DEBUG(1,("INFO: Received PING message from PID %s [%s]\n", procid_str_static(&src), msg)); - message_send_pid(src, MSG_PONG, buf, len); + messaging_send(msg_ctx, src, MSG_PONG, data); } /**************************************************************************** @@ -131,24 +111,21 @@ static BOOL message_init(struct messaging_context *msg_ctx) { sec_init(); - if (tdb) - return True; - - tdb = tdb_open_log(lock_path("messages.tdb"), - 0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, - O_RDWR|O_CREAT,0600); + msg_ctx->tdb = tdb_open_log(lock_path("messages.tdb"), + 0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, + O_RDWR|O_CREAT,0600); - if (!tdb) { + if (!msg_ctx->tdb) { DEBUG(0,("ERROR: Failed to initialise messages database\n")); return False; } /* Activate the per-hashchain freelist */ - tdb_set_max_dead(tdb, 5); + tdb_set_max_dead(msg_ctx->tdb, 5); CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1); - message_register(MSG_PING, ping_message, NULL); + messaging_register(msg_ctx, NULL, MSG_PING, ping_message); /* Register some debugging related messages */ @@ -175,6 +152,99 @@ static TDB_DATA message_key_pid(struct server_id pid) return kbuf; } +/* + Fetch the messaging array for a process + */ + +static NTSTATUS messaging_tdb_fetch(TDB_CONTEXT *msg_tdb, + TDB_DATA key, + TALLOC_CTX *mem_ctx, + struct messaging_array **presult) +{ + struct messaging_array *result; + TDB_DATA data; + DATA_BLOB blob; + NTSTATUS status; + + if (!(result = TALLOC_ZERO_P(mem_ctx, struct messaging_array))) { + return NT_STATUS_NO_MEMORY; + } + + data = tdb_fetch(msg_tdb, key); + + if (data.dptr == NULL) { + *presult = result; + return NT_STATUS_OK; + } + + blob = data_blob_const(data.dptr, data.dsize); + + status = ndr_pull_struct_blob( + &blob, result, result, + (ndr_pull_flags_fn_t)ndr_pull_messaging_array); + + SAFE_FREE(data.dptr); + + if (!NT_STATUS_IS_OK(status)) { + TALLOC_FREE(result); + return status; + } + + if (DEBUGLEVEL >= 10) { + DEBUG(10, ("messaging_tdb_fetch:\n")); + NDR_PRINT_DEBUG(messaging_array, result); + } + + *presult = result; + return NT_STATUS_OK; +} + +/* + Store a messaging array for a pid +*/ + +static NTSTATUS messaging_tdb_store(TDB_CONTEXT *msg_tdb, + TDB_DATA key, + struct messaging_array *array) +{ + TDB_DATA data; + DATA_BLOB blob; + NTSTATUS status; + TALLOC_CTX *mem_ctx; + int ret; + + if (array->num_messages == 0) { + tdb_delete(msg_tdb, key); + return NT_STATUS_OK; + } + + if (!(mem_ctx = talloc_new(array))) { + return NT_STATUS_NO_MEMORY; + } + + status = ndr_push_struct_blob( + &blob, mem_ctx, array, + (ndr_push_flags_fn_t)ndr_push_messaging_array); + + if (!NT_STATUS_IS_OK(status)) { + talloc_free(mem_ctx); + return status; + } + + if (DEBUGLEVEL >= 10) { + DEBUG(10, ("messaging_tdb_store:\n")); + NDR_PRINT_DEBUG(messaging_array, array); + } + + data.dptr = blob.data; + data.dsize = blob.length; + + ret = tdb_store(msg_tdb, key, data, TDB_REPLACE); + TALLOC_FREE(mem_ctx); + + return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION; +} + /**************************************************************************** Notify a process that it has a message. If the process doesn't exist then delete its record in the database. @@ -216,17 +286,6 @@ static NTSTATUS message_notify(struct server_id procid) * Something has gone wrong */ - if (errno == ESRCH) { - DEBUG(2,("pid %d doesn't exist - deleting messages record\n", - (int)pid)); - tdb_delete(tdb, message_key_pid(procid)); - - /* - * INVALID_HANDLE is the closest I can think of -- vl - */ - return NT_STATUS_INVALID_HANDLE; - } - DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno))); @@ -235,6 +294,7 @@ static NTSTATUS message_notify(struct server_id procid) * errormap.o into lots of utils. */ + if (errno == ESRCH) return NT_STATUS_INVALID_HANDLE; if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER; if (errno == EPERM) return NT_STATUS_ACCESS_DENIED; return NT_STATUS_UNSUCCESSFUL; @@ -244,12 +304,15 @@ static NTSTATUS message_notify(struct server_id procid) Send a message to a particular pid. ****************************************************************************/ -static NTSTATUS message_send_pid(struct server_id pid, int msg_type, - const void *buf, size_t len) +static NTSTATUS messaging_tdb_send(TDB_CONTEXT *msg_tdb, + struct server_id pid, int msg_type, + const void *buf, size_t len) { - TDB_DATA dbuf; - struct message_rec rec; - int ret; + struct messaging_array *msg_array; + struct messaging_rec *rec; + TALLOC_CTX *mem_ctx; + NTSTATUS status; + TDB_DATA key = message_key_pid(pid); /* NULL pointer means implicit length zero. */ if (!buf) { @@ -263,138 +326,129 @@ static NTSTATUS message_send_pid(struct server_id pid, int msg_type, SMB_ASSERT(procid_to_pid(&pid) > 0); - rec.msg_version = MESSAGE_VERSION; - rec.msg_type = msg_type; - rec.dest = pid; - rec.src = procid_self(); - rec.len = buf ? len : 0; + if (!(mem_ctx = talloc_init("message_send_pid"))) { + return NT_STATUS_NO_MEMORY; + } + + if (tdb_chainlock(msg_tdb, key) == -1) { + return NT_STATUS_LOCK_NOT_GRANTED; + } + + status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &msg_array); + + if (!NT_STATUS_IS_OK(status)) { + tdb_chainunlock(msg_tdb, key); + TALLOC_FREE(mem_ctx); + return status; + } - dbuf.dptr = (uint8 *)SMB_MALLOC(len + sizeof(rec)); - if (!dbuf.dptr) { + if (!(rec = TALLOC_REALLOC_ARRAY(mem_ctx, msg_array->messages, + struct messaging_rec, + msg_array->num_messages+1))) { + tdb_chainunlock(msg_tdb, key); + TALLOC_FREE(mem_ctx); return NT_STATUS_NO_MEMORY; } - memcpy(dbuf.dptr, &rec, sizeof(rec)); - if (len > 0 && buf) - memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len); + rec[msg_array->num_messages].msg_version = MESSAGE_VERSION; + rec[msg_array->num_messages].msg_type = msg_type; + rec[msg_array->num_messages].dest = pid; + rec[msg_array->num_messages].src = procid_self(); + rec[msg_array->num_messages].buf = data_blob_const(buf, len); - dbuf.dsize = len + sizeof(rec); + msg_array->messages = rec; + msg_array->num_messages += 1; - ret = tdb_append(tdb, message_key_pid(pid), dbuf); + status = messaging_tdb_store(msg_tdb, key, msg_array); - SAFE_FREE(dbuf.dptr); + tdb_chainunlock(msg_tdb, key); + TALLOC_FREE(mem_ctx); + + if (!NT_STATUS_IS_OK(status)) { + return status; + } + + status = message_notify(pid); - if (ret == -1) { - return NT_STATUS_INTERNAL_ERROR; + if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) { + DEBUG(2, ("pid %s doesn't exist - deleting messages record\n", + procid_str_static(&pid))); + tdb_delete(msg_tdb, message_key_pid(pid)); } - errno = 0; /* paranoia */ - return message_notify(pid); + return status; } /**************************************************************************** Count the messages pending for a particular pid. Expensive.... ****************************************************************************/ -unsigned int messages_pending_for_pid(struct server_id pid) +unsigned int messages_pending_for_pid(struct messaging_context *msg_ctx, + struct server_id pid) { - TDB_DATA dbuf; - uint8 *buf; - unsigned int message_count = 0; + struct messaging_array *msg_array; + unsigned int result; - dbuf = tdb_fetch(tdb, message_key_pid(pid)); - if (dbuf.dptr == NULL || dbuf.dsize == 0) { - SAFE_FREE(dbuf.dptr); + if (!NT_STATUS_IS_OK(messaging_tdb_fetch(msg_ctx->tdb, + message_key_pid(pid), NULL, + &msg_array))) { + DEBUG(10, ("messaging_tdb_fetch failed\n")); return 0; } - for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) { - struct message_rec rec; - memcpy(&rec, buf, sizeof(rec)); - buf += (sizeof(rec) + rec.len); - dbuf.dsize -= (sizeof(rec) + rec.len); - message_count++; - } - - SAFE_FREE(dbuf.dptr); - return message_count; -} + result = msg_array->num_messages; + TALLOC_FREE(msg_array); + return result; +} /**************************************************************************** Retrieve all messages for the current process. ****************************************************************************/ -static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len) +static NTSTATUS retrieve_all_messages(TDB_CONTEXT *msg_tdb, + TALLOC_CTX *mem_ctx, + struct messaging_array **presult) { - TDB_DATA kbuf; - TDB_DATA dbuf; - TDB_DATA null_dbuf; - - ZERO_STRUCT(null_dbuf); - - *msgs_buf = NULL; - *total_len = 0; + struct messaging_array *result; + TDB_DATA key = message_key_pid(procid_self()); + NTSTATUS status; - kbuf = message_key_pid(procid_self()); + if (tdb_chainlock(msg_tdb, key) == -1) { + return NT_STATUS_LOCK_NOT_GRANTED; + } - if (tdb_chainlock(tdb, kbuf) == -1) - return False; + status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &result); - dbuf = tdb_fetch(tdb, kbuf); /* - * Replace with an empty record to keep the allocated - * space in the tdb. + * We delete the record here, tdb_set_max_dead keeps it around */ - tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE); - tdb_chainunlock(tdb, kbuf); + tdb_delete(msg_tdb, key); + tdb_chainunlock(msg_tdb, key); - if (dbuf.dptr == NULL || dbuf.dsize == 0) { - SAFE_FREE(dbuf.dptr); - return False; + if (NT_STATUS_IS_OK(status)) { + *presult = result; } - *msgs_buf = (char *)dbuf.dptr; - *total_len = dbuf.dsize; - - return True; + return status; } -/**************************************************************************** - Parse out the next message for the current process. -****************************************************************************/ - -static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type, - struct server_id *src, char **buf, size_t *len) +/* + Dispatch one messsaging_rec +*/ +static void messaging_dispatch_rec(struct messaging_context *msg_ctx, + struct messaging_rec *rec) { - struct message_rec rec; - char *ret_buf = *buf; - - *buf = NULL; - *len = 0; - - if (total_len - (ret_buf - msgs_buf) < sizeof(rec)) - return False; - - memcpy(&rec, ret_buf, sizeof(rec)); - ret_buf += sizeof(rec); - - if (rec.msg_version != MESSAGE_VERSION) { - DEBUG(0,("message version %d received (expected %d)\n", - rec.msg_version, MESSAGE_VERSION)); - return False; - } + struct messaging_callback *cb, *next; - if (rec.len > 0) { - if (total_len - (ret_buf - msgs_buf) < rec.len) - return False; + for (cb = msg_ctx->callbacks; cb != NULL; cb = next) { + next = cb->next; + if (cb->msg_type == rec->msg_type) { + cb->fn(msg_ctx, cb->private_data, rec->msg_type, + rec->src, &rec->buf); + return; + } } - - *len = rec.len; - *msg_type = rec.msg_type; - *src = rec.src; - *buf = ret_buf; - - return True; + return; } /**************************************************************************** @@ -404,14 +458,10 @@ static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type, messages on an *odd* byte boundary. ****************************************************************************/ -void message_dispatch(void) +void message_dispatch(struct messaging_context *msg_ctx) { - int msg_type; - struct server_id src; - char *buf; - char *msgs_buf; - size_t len, total_len; - int n_handled; + struct messaging_array *msg_array = NULL; + uint32 i; if (!received_signal) return; @@ -421,37 +471,16 @@ void message_dispatch(void) received_signal = 0; - if (!retrieve_all_messages(&msgs_buf, &total_len)) + if (!NT_STATUS_IS_OK(retrieve_all_messages(msg_ctx->tdb, NULL, + &msg_array))) { return; + } - for (buf = msgs_buf; - message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); - buf += len) { - struct dispatch_fns *dfn; - - DEBUG(10,("message_dispatch: received msg_type=%d " - "src_pid=%u\n", msg_type, - (unsigned int) procid_to_pid(&src))); - - n_handled = 0; - for (dfn = dispatch_fns; dfn; dfn = dfn->next) { - if (dfn->msg_type == msg_type) { - DEBUG(10,("message_dispatch: processing " - "message of type %d.\n", msg_type)); - dfn->fn(msg_type, src, - len ? (void *)buf : NULL, len, - dfn->private_data); - n_handled++; - break; - } - } - if (!n_handled) { - DEBUG(5,("message_dispatch: warning: no handler " - "registed for msg_type %d in pid %u\n", - msg_type, (unsigned int)sys_getpid())); - } + for (i=0; i<msg_array->num_messages; i++) { + messaging_dispatch_rec(msg_ctx, &msg_array->messages[i]); } - SAFE_FREE(msgs_buf); + + TALLOC_FREE(msg_array); } /**************************************************************************** @@ -461,60 +490,12 @@ void message_dispatch(void) messages on an *odd* byte boundary. ****************************************************************************/ -static void message_register(int msg_type, - void (*fn)(int msg_type, struct server_id pid, - void *buf, size_t len, - void *private_data), - void *private_data) -{ - struct dispatch_fns *dfn; - - for (dfn = dispatch_fns; dfn; dfn = dfn->next) { - if (dfn->msg_type == msg_type) { - dfn->fn = fn; - return; - } - } - - if (!(dfn = SMB_MALLOC_P(struct dispatch_fns))) { - DEBUG(0,("message_register: Not enough memory. malloc " - "failed!\n")); - return; - } - - ZERO_STRUCTPN(dfn); - - dfn->msg_type = msg_type; - dfn->fn = fn; - dfn->private_data = private_data; - - DLIST_ADD(dispatch_fns, dfn); -} - -/**************************************************************************** - De-register the function for a particular message type. -****************************************************************************/ - -static void message_deregister(int msg_type) -{ - struct dispatch_fns *dfn, *next; - - for (dfn = dispatch_fns; dfn; dfn = next) { - next = dfn->next; - if (dfn->msg_type == msg_type) { - DLIST_REMOVE(dispatch_fns, dfn); - SAFE_FREE(dfn); - return; - } - } -} - struct msg_all { + struct messaging_context *msg_ctx; int msg_type; uint32 msg_flag; const void *buf; size_t len; - BOOL duplicates; int n_sent; }; @@ -522,41 +503,44 @@ struct msg_all { Send one of the messages for the broadcast. ****************************************************************************/ -static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, - void *state) +static int traverse_fn(TDB_CONTEXT *the_tdb, + const struct connections_key *ckey, + const struct connections_data *crec, + void *private_data) { - struct connections_data crec; - struct msg_all *msg_all = (struct msg_all *)state; + struct msg_all *msg_all = (struct msg_all *)private_data; NTSTATUS status; - if (dbuf.dsize != sizeof(crec)) - return 0; - - memcpy(&crec, dbuf.dptr, sizeof(crec)); - - if (crec.cnum != -1) + if (crec->cnum != -1) return 0; /* Don't send if the receiver hasn't registered an interest. */ - if(!(crec.bcast_msg_flags & msg_all->msg_flag)) + if(!(crec->bcast_msg_flags & msg_all->msg_flag)) return 0; /* If the msg send fails because the pid was not found (i.e. smbd died), * the msg has already been deleted from the messages.tdb.*/ - status = message_send_pid(crec.pid, msg_all->msg_type, - msg_all->buf, msg_all->len); + status = messaging_send_buf(msg_all->msg_ctx, + crec->pid, msg_all->msg_type, + (uint8 *)msg_all->buf, msg_all->len); if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) { + + TDB_DATA key; /* If the pid was not found delete the entry from * connections.tdb */ DEBUG(2,("pid %s doesn't exist - deleting connections " - "%d [%s]\n", procid_str_static(&crec.pid), crec.cnum, - crec.servicename)); - tdb_delete(the_tdb, kbuf); + "%d [%s]\n", procid_str_static(&crec->pid), + crec->cnum, crec->servicename)); + + key.dptr = (uint8 *)ckey; + key.dsize = sizeof(*ckey); + + tdb_delete(the_tdb, key); } msg_all->n_sent++; return 0; @@ -577,7 +561,6 @@ static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, BOOL message_send_all(struct messaging_context *msg_ctx, int msg_type, const void *buf, size_t len, - BOOL duplicates_allowed, int *n_sent) { struct msg_all msg_all; @@ -598,10 +581,10 @@ BOOL message_send_all(struct messaging_context *msg_ctx, msg_all.buf = buf; msg_all.len = len; - msg_all.duplicates = duplicates_allowed; msg_all.n_sent = 0; + msg_all.msg_ctx = msg_ctx; - connections_traverse(traverse_fn, &msg_all); + connections_forall(traverse_fn, &msg_all); if (n_sent) *n_sent = msg_all.n_sent; return True; @@ -622,40 +605,6 @@ void message_unblock(void) BlockSignals(False, SIGUSR1); } -/* - * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could - * import the whole Samba4 thing, but I want notify.c from Samba4 in first. - */ - -struct messaging_callback { - struct messaging_callback *prev, *next; - uint32 msg_type; - void (*fn)(struct messaging_context *msg, void *private_data, - uint32_t msg_type, - struct server_id server_id, DATA_BLOB *data); - void *private_data; -}; - -struct messaging_context { - struct server_id id; - struct event_context *event_ctx; - struct messaging_callback *callbacks; -}; - -static int messaging_context_destructor(struct messaging_context *ctx) -{ - struct messaging_callback *cb; - - for (cb = ctx->callbacks; cb; cb = cb->next) { - /* - * We unconditionally remove all instances of our callback - * from the tdb basis. - */ - message_deregister(cb->msg_type); - } - return 0; -} - struct event_context *messaging_event_context(struct messaging_context *msg_ctx) { return msg_ctx->event_ctx; @@ -673,7 +622,6 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, ctx->id = server_id; ctx->event_ctx = ev; - talloc_set_destructor(ctx, messaging_context_destructor); if (!message_init(ctx)) { DEBUG(0, ("message_init failed: %s\n", strerror(errno))); @@ -683,35 +631,12 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, return ctx; } -static void messaging_callback(int msg_type, struct server_id pid, - void *buf, size_t len, void *private_data) -{ - struct messaging_context *ctx = talloc_get_type_abort( - private_data, struct messaging_context); - struct messaging_callback *cb, *next; - - for (cb = ctx->callbacks; cb; cb = next) { - /* - * Allow a callback to remove itself - */ - next = cb->next; - - if (msg_type == cb->msg_type) { - DATA_BLOB blob; - - blob.data = (uint8 *)buf; - blob.length = len; - - cb->fn(ctx, cb->private_data, msg_type, pid, &blob); - } - } -} - /* * Register a dispatch function for a particular message type. Allow multiple * registrants */ -NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data, +NTSTATUS messaging_register(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, void (*fn)(struct messaging_context *msg, void *private_data, @@ -721,7 +646,19 @@ NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data, { struct messaging_callback *cb; - if (!(cb = talloc(ctx, struct messaging_callback))) { + /* + * Only one callback per type + */ + + for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) { + if (cb->msg_type == msg_type) { + cb->fn = fn; + cb->private_data = private_data; + return NT_STATUS_OK; + } + } + + if (!(cb = talloc(msg_ctx, struct messaging_callback))) { return NT_STATUS_NO_MEMORY; } @@ -729,8 +666,7 @@ NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data, cb->fn = fn; cb->private_data = private_data; - DLIST_ADD(ctx->callbacks, cb); - message_register(msg_type, messaging_callback, ctx); + DLIST_ADD(msg_ctx->callbacks, cb); return NT_STATUS_OK; } @@ -759,7 +695,8 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx, struct server_id server, uint32_t msg_type, const DATA_BLOB *data) { - return message_send_pid(server, msg_type, data->data, data->length); + return messaging_tdb_send(msg_ctx->tdb, server, msg_type, + data->data, data->length); } NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx, |