summaryrefslogtreecommitdiffstats
path: root/ctdb/libctdb
diff options
context:
space:
mode:
authorRonnie Sahlberg <ronniesahlberg@gmail.com>2011-01-14 17:35:31 +1100
committerRonnie Sahlberg <ronniesahlberg@gmail.com>2011-01-14 17:38:56 +1100
commitfcd98a7e59641021ed1b81462a154e6385795643 (patch)
tree90f6229c6189b3fda83c740d8d9ea9957b5e34c8 /ctdb/libctdb
parent6494574d8fd4b9247e45a06cd33472e67b3ed99f (diff)
downloadsamba-fcd98a7e59641021ed1b81462a154e6385795643.tar.gz
samba-fcd98a7e59641021ed1b81462a154e6385795643.tar.xz
samba-fcd98a7e59641021ed1b81462a154e6385795643.zip
LIBCTDB: add support for traverse
(This used to be ctdb commit 9463e04038ba36792583f83bd95c1af322dc283a)
Diffstat (limited to 'ctdb/libctdb')
-rw-r--r--ctdb/libctdb/ctdb.c183
-rw-r--r--ctdb/libctdb/tst.c22
2 files changed, 205 insertions, 0 deletions
diff --git a/ctdb/libctdb/ctdb.c b/ctdb/libctdb/ctdb.c
index e06c66ca85..7115982d1a 100644
--- a/ctdb/libctdb/ctdb.c
+++ b/ctdb/libctdb/ctdb.c
@@ -936,3 +936,186 @@ bool ctdb_writerecord(struct ctdb_db *ctdb_db,
return false;
}
}
+
+
+struct ctdb_traverse_state {
+ struct ctdb_request *handle;
+ struct ctdb_db *ctdb_db;
+ uint64_t srvid;
+
+ ctdb_traverse_callback_t callback;
+ void *cbdata;
+};
+
+static void traverse_remhnd_cb(struct ctdb_connection *ctdb,
+ struct ctdb_request *req, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+
+ if (!ctdb_remove_message_handler_recv(ctdb, state->handle)) {
+ DEBUG(ctdb, LOG_ERR,
+ "Failed to remove message handler for"
+ " traverse.");
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ }
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+ free(state);
+}
+
+static void msg_h(struct ctdb_connection *ctdb, uint64_t srvid,
+ TDB_DATA data, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+ struct ctdb_db *ctdb_db = state->ctdb_db;
+ struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
+ TDB_DATA key;
+
+ if (data.dsize < sizeof(uint32_t) ||
+ d->length != data.dsize) {
+ DEBUG(ctdb, LOG_ERR,
+ "Bad data size %u in traverse_handler",
+ (unsigned)data.dsize);
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+
+ key.dsize = d->keylen;
+ key.dptr = &d->data[0];
+ data.dsize = d->datalen;
+ data.dptr = &d->data[d->keylen];
+
+ if (key.dsize == 0 && data.dsize == 0) {
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_FINISHED,
+ tdb_null, tdb_null,
+ state->cbdata);
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+
+ if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
+ /* empty records are deleted records in ctdb */
+ return;
+ }
+
+ data.dsize -= sizeof(struct ctdb_ltdb_header);
+ data.dptr += sizeof(struct ctdb_ltdb_header);
+
+ if (state->callback(ctdb, ctdb_db,
+ TRAVERSE_STATUS_RECORD,
+ key, data, state->cbdata) != 0) {
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+}
+
+static void traverse_start_cb(struct ctdb_connection *ctdb,
+ struct ctdb_request *req, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+}
+
+static void traverse_msghnd_cb(struct ctdb_connection *ctdb,
+ struct ctdb_request *req, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+ struct ctdb_db *ctdb_db = state->ctdb_db;
+ struct ctdb_traverse_start t;
+
+ if (!ctdb_set_message_handler_recv(ctdb, state->handle)) {
+ DEBUG(ctdb, LOG_ERR,
+ "Failed to register message handler for"
+ " traverse.");
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+ free(state);
+ return;
+ }
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+
+ t.db_id = ctdb_db->id;
+ t.srvid = state->srvid;
+ t.reqid = 0;
+
+ state->handle = new_ctdb_control_request(ctdb,
+ CTDB_CONTROL_TRAVERSE_START,
+ CTDB_CURRENT_NODE,
+ &t, sizeof(t),
+ traverse_start_cb, state);
+ if (state->handle == NULL) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_traverse_async:"
+ " failed to send traverse_start control");
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+}
+
+bool ctdb_traverse_async(struct ctdb_db *ctdb_db,
+ ctdb_traverse_callback_t callback, void *cbdata)
+{
+ struct ctdb_connection *ctdb = ctdb_db->ctdb;
+ struct ctdb_traverse_state *state;
+ static uint32_t tid = 0;
+
+ state = malloc(sizeof(struct ctdb_traverse_state));
+ if (state == NULL) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_traverse_async: no memory."
+ " allocate state failed");
+ return false;
+ }
+
+ tid++;
+ state->srvid = CTDB_SRVID_TRAVERSE_RANGE|tid;
+
+ state->callback = callback;
+ state->cbdata = cbdata;
+ state->ctdb_db = ctdb_db;
+
+ state->handle = ctdb_set_message_handler_send(ctdb_db->ctdb,
+ state->srvid,
+ msg_h, state,
+ traverse_msghnd_cb, state);
+ if (state->handle == NULL) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_traverse_async:"
+ " failed ctdb_set_message_handler_send");
+ free(state);
+ return false;
+ }
+
+ return true;
+}
diff --git a/ctdb/libctdb/tst.c b/ctdb/libctdb/tst.c
index e61561fc77..0e3531d039 100644
--- a/ctdb/libctdb/tst.c
+++ b/ctdb/libctdb/tst.c
@@ -223,6 +223,25 @@ void message_handler_cb(struct ctdb_connection *ctdb,
registered = true;
}
+static int traverse_callback(struct ctdb_connection *ctdb_connection, struct ctdb_db *ctdb_db, int status, TDB_DATA key, TDB_DATA data, void *private_data)
+{
+ if (status == TRAVERSE_STATUS_FINISHED) {
+ printf("Traverse finished\n");
+ return 0;
+ }
+ if (status == TRAVERSE_STATUS_ERROR) {
+ printf("Traverse failed\n");
+ return 1;
+ }
+
+ printf("traverse callback status:%d\n", status);
+ printf("key: %d [%s]\n", key.dsize, key.dptr);
+ printf("data:%d [%s]\n", data.dsize, data.dptr);
+
+ return 0;
+}
+
+
int main(int argc, char *argv[])
{
struct ctdb_connection *ctdb_connection;
@@ -366,6 +385,9 @@ int main(int argc, char *argv[])
print_nodemap(nodemap);
ctdb_free_nodemap(nodemap);
+ printf("Traverse the test_test.tdb database\n");
+ ctdb_traverse_async(ctdb_db_context, traverse_callback, NULL);
+
for (;;) {
pfd.events = ctdb_which_events(ctdb_connection);