diff options
author | Ronnie Sahlberg <ronniesahlberg@gmail.com> | 2011-01-14 17:35:31 +1100 |
---|---|---|
committer | Ronnie Sahlberg <ronniesahlberg@gmail.com> | 2011-01-14 17:38:56 +1100 |
commit | fcd98a7e59641021ed1b81462a154e6385795643 (patch) | |
tree | 90f6229c6189b3fda83c740d8d9ea9957b5e34c8 /ctdb/libctdb | |
parent | 6494574d8fd4b9247e45a06cd33472e67b3ed99f (diff) | |
download | samba-fcd98a7e59641021ed1b81462a154e6385795643.tar.gz samba-fcd98a7e59641021ed1b81462a154e6385795643.tar.xz samba-fcd98a7e59641021ed1b81462a154e6385795643.zip |
LIBCTDB: add support for traverse
(This used to be ctdb commit 9463e04038ba36792583f83bd95c1af322dc283a)
Diffstat (limited to 'ctdb/libctdb')
-rw-r--r-- | ctdb/libctdb/ctdb.c | 183 | ||||
-rw-r--r-- | ctdb/libctdb/tst.c | 22 |
2 files changed, 205 insertions, 0 deletions
diff --git a/ctdb/libctdb/ctdb.c b/ctdb/libctdb/ctdb.c index e06c66ca85..7115982d1a 100644 --- a/ctdb/libctdb/ctdb.c +++ b/ctdb/libctdb/ctdb.c @@ -936,3 +936,186 @@ bool ctdb_writerecord(struct ctdb_db *ctdb_db, return false; } } + + +struct ctdb_traverse_state { + struct ctdb_request *handle; + struct ctdb_db *ctdb_db; + uint64_t srvid; + + ctdb_traverse_callback_t callback; + void *cbdata; +}; + +static void traverse_remhnd_cb(struct ctdb_connection *ctdb, + struct ctdb_request *req, void *private_data) +{ + struct ctdb_traverse_state *state = private_data; + + if (!ctdb_remove_message_handler_recv(ctdb, state->handle)) { + DEBUG(ctdb, LOG_ERR, + "Failed to remove message handler for" + " traverse."); + state->callback(state->ctdb_db->ctdb, state->ctdb_db, + TRAVERSE_STATUS_ERROR, + tdb_null, tdb_null, + state->cbdata); + } + ctdb_request_free(ctdb, state->handle); + state->handle = NULL; + free(state); +} + +static void msg_h(struct ctdb_connection *ctdb, uint64_t srvid, + TDB_DATA data, void *private_data) +{ + struct ctdb_traverse_state *state = private_data; + struct ctdb_db *ctdb_db = state->ctdb_db; + struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr; + TDB_DATA key; + + if (data.dsize < sizeof(uint32_t) || + d->length != data.dsize) { + DEBUG(ctdb, LOG_ERR, + "Bad data size %u in traverse_handler", + (unsigned)data.dsize); + state->callback(state->ctdb_db->ctdb, state->ctdb_db, + TRAVERSE_STATUS_ERROR, + tdb_null, tdb_null, + state->cbdata); + state->handle = ctdb_remove_message_handler_send( + state->ctdb_db->ctdb, state->srvid, + msg_h, state, + traverse_remhnd_cb, state); + return; + } + + key.dsize = d->keylen; + key.dptr = &d->data[0]; + data.dsize = d->datalen; + data.dptr = &d->data[d->keylen]; + + if (key.dsize == 0 && data.dsize == 0) { + state->callback(state->ctdb_db->ctdb, state->ctdb_db, + TRAVERSE_STATUS_FINISHED, + tdb_null, tdb_null, + state->cbdata); + state->handle = ctdb_remove_message_handler_send( + state->ctdb_db->ctdb, state->srvid, + msg_h, state, + traverse_remhnd_cb, state); + return; + } + + if (data.dsize <= sizeof(struct ctdb_ltdb_header)) { + /* empty records are deleted records in ctdb */ + return; + } + + data.dsize -= sizeof(struct ctdb_ltdb_header); + data.dptr += sizeof(struct ctdb_ltdb_header); + + if (state->callback(ctdb, ctdb_db, + TRAVERSE_STATUS_RECORD, + key, data, state->cbdata) != 0) { + state->handle = ctdb_remove_message_handler_send( + state->ctdb_db->ctdb, state->srvid, + msg_h, state, + traverse_remhnd_cb, state); + return; + } +} + +static void traverse_start_cb(struct ctdb_connection *ctdb, + struct ctdb_request *req, void *private_data) +{ + struct ctdb_traverse_state *state = private_data; + + ctdb_request_free(ctdb, state->handle); + state->handle = NULL; +} + +static void traverse_msghnd_cb(struct ctdb_connection *ctdb, + struct ctdb_request *req, void *private_data) +{ + struct ctdb_traverse_state *state = private_data; + struct ctdb_db *ctdb_db = state->ctdb_db; + struct ctdb_traverse_start t; + + if (!ctdb_set_message_handler_recv(ctdb, state->handle)) { + DEBUG(ctdb, LOG_ERR, + "Failed to register message handler for" + " traverse."); + state->callback(state->ctdb_db->ctdb, state->ctdb_db, + TRAVERSE_STATUS_ERROR, + tdb_null, tdb_null, + state->cbdata); + ctdb_request_free(ctdb, state->handle); + state->handle = NULL; + free(state); + return; + } + ctdb_request_free(ctdb, state->handle); + state->handle = NULL; + + t.db_id = ctdb_db->id; + t.srvid = state->srvid; + t.reqid = 0; + + state->handle = new_ctdb_control_request(ctdb, + CTDB_CONTROL_TRAVERSE_START, + CTDB_CURRENT_NODE, + &t, sizeof(t), + traverse_start_cb, state); + if (state->handle == NULL) { + DEBUG(ctdb, LOG_ERR, + "ctdb_traverse_async:" + " failed to send traverse_start control"); + state->callback(state->ctdb_db->ctdb, state->ctdb_db, + TRAVERSE_STATUS_ERROR, + tdb_null, tdb_null, + state->cbdata); + state->handle = ctdb_remove_message_handler_send( + state->ctdb_db->ctdb, state->srvid, + msg_h, state, + traverse_remhnd_cb, state); + return; + } +} + +bool ctdb_traverse_async(struct ctdb_db *ctdb_db, + ctdb_traverse_callback_t callback, void *cbdata) +{ + struct ctdb_connection *ctdb = ctdb_db->ctdb; + struct ctdb_traverse_state *state; + static uint32_t tid = 0; + + state = malloc(sizeof(struct ctdb_traverse_state)); + if (state == NULL) { + DEBUG(ctdb, LOG_ERR, + "ctdb_traverse_async: no memory." + " allocate state failed"); + return false; + } + + tid++; + state->srvid = CTDB_SRVID_TRAVERSE_RANGE|tid; + + state->callback = callback; + state->cbdata = cbdata; + state->ctdb_db = ctdb_db; + + state->handle = ctdb_set_message_handler_send(ctdb_db->ctdb, + state->srvid, + msg_h, state, + traverse_msghnd_cb, state); + if (state->handle == NULL) { + DEBUG(ctdb, LOG_ERR, + "ctdb_traverse_async:" + " failed ctdb_set_message_handler_send"); + free(state); + return false; + } + + return true; +} diff --git a/ctdb/libctdb/tst.c b/ctdb/libctdb/tst.c index e61561fc77..0e3531d039 100644 --- a/ctdb/libctdb/tst.c +++ b/ctdb/libctdb/tst.c @@ -223,6 +223,25 @@ void message_handler_cb(struct ctdb_connection *ctdb, registered = true; } +static int traverse_callback(struct ctdb_connection *ctdb_connection, struct ctdb_db *ctdb_db, int status, TDB_DATA key, TDB_DATA data, void *private_data) +{ + if (status == TRAVERSE_STATUS_FINISHED) { + printf("Traverse finished\n"); + return 0; + } + if (status == TRAVERSE_STATUS_ERROR) { + printf("Traverse failed\n"); + return 1; + } + + printf("traverse callback status:%d\n", status); + printf("key: %d [%s]\n", key.dsize, key.dptr); + printf("data:%d [%s]\n", data.dsize, data.dptr); + + return 0; +} + + int main(int argc, char *argv[]) { struct ctdb_connection *ctdb_connection; @@ -366,6 +385,9 @@ int main(int argc, char *argv[]) print_nodemap(nodemap); ctdb_free_nodemap(nodemap); + printf("Traverse the test_test.tdb database\n"); + ctdb_traverse_async(ctdb_db_context, traverse_callback, NULL); + for (;;) { pfd.events = ctdb_which_events(ctdb_connection); |