From c4826c203afcca61ff6791d3c0307e792615762e Mon Sep 17 00:00:00 2001
From: Andrew Tridgell <tridge@samba.org>
Date: Sat, 5 Jan 2008 09:31:43 +1100
Subject: added async pull, push and rsn handling functions (This used to be
 ctdb commit 05d30180f64aaff13411b92586ac554d84a35d9a)

---
 ctdb/client/ctdb_client.c | 228 +++++++++++++++++++++++++++++++---------------
 1 file changed, 157 insertions(+), 71 deletions(-)

(limited to 'ctdb/client')

diff --git a/ctdb/client/ctdb_client.c b/ctdb/client/ctdb_client.c
index 1935f5b7b15..70ccff617a1 100644
--- a/ctdb/client/ctdb_client.c
+++ b/ctdb/client/ctdb_client.c
@@ -1278,67 +1278,118 @@ int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint3
 	return 0;
 }
 
+
 /*
-  get all keys and records for a specific database
+  async send for pull database
  */
-int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid, uint32_t lmaster, 
-		     TALLOC_CTX *mem_ctx, struct ctdb_key_list *keys)
+struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
+	struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
+	uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
 {
-	int i, ret;
-	TDB_DATA indata, outdata;
-	struct ctdb_control_pulldb pull;
-	struct ctdb_control_pulldb_reply *reply;
-	struct ctdb_rec_data *rec;
-	int32_t res;
+	TDB_DATA indata;
+	struct ctdb_control_pulldb *pull;
+	struct ctdb_client_control_state *state;
+
+	pull = talloc(mem_ctx, struct ctdb_control_pulldb);
+	CTDB_NO_MEMORY_NULL(ctdb, pull);
 
-	pull.db_id   = dbid;
-	pull.lmaster = lmaster;
+	pull->db_id   = dbid;
+	pull->lmaster = lmaster;
 
 	indata.dsize = sizeof(struct ctdb_control_pulldb);
-	indata.dptr  = (unsigned char *)&pull;
+	indata.dptr  = (unsigned char *)pull;
 
-	ret = ctdb_control(ctdb, destnode, 0, 
-			   CTDB_CONTROL_PULL_DB, 0, indata, 
-			   mem_ctx, &outdata, &res, NULL, NULL);
-	if (ret != 0 || res != 0) {
-		DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
+	state = ctdb_control_send(ctdb, destnode, 0, 
+				  CTDB_CONTROL_PULL_DB, 0, indata, 
+				  mem_ctx, NULL, &timeout, NULL);
+	talloc_free(pull);
+
+	return state;
+}
+
+/*
+  async recv for pull database
+ */
+int ctdb_ctrl_pulldb_recv(
+	struct ctdb_context *ctdb, 
+	TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
+	TDB_DATA *outdata)
+{
+	int ret;
+	int32_t res;
+
+	ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
+	if ( (ret != 0) || (res != 0) ){
+		DEBUG(0,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
 		return -1;
 	}
 
+	return 0;
+}
 
-	reply = (struct ctdb_control_pulldb_reply *)outdata.dptr;
-	keys->dbid     = reply->db_id;
-	keys->num      = reply->count;
+/*
+  pull all keys and records for a specific database on a node
+ */
+int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
+		uint32_t dbid, uint32_t lmaster, 
+		TALLOC_CTX *mem_ctx, struct timeval timeout,
+		TDB_DATA *outdata)
+{
+	struct ctdb_client_control_state *state;
+
+	state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
+				      timeout);
 	
-	keys->keys     = talloc_array(mem_ctx, TDB_DATA, keys->num);
-	keys->headers  = talloc_array(mem_ctx, struct ctdb_ltdb_header, keys->num);
-	keys->data     = talloc_array(mem_ctx, TDB_DATA, keys->num);
+	return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
+}
 
-	rec = (struct ctdb_rec_data *)&reply->data[0];
 
-	for (i=0;i<reply->count;i++) {
-		keys->keys[i].dptr = talloc_memdup(mem_ctx, &rec->data[0], rec->keylen);
-		keys->keys[i].dsize = rec->keylen;
-		
-		keys->data[i].dptr = talloc_memdup(mem_ctx, &rec->data[keys->keys[i].dsize], rec->datalen);
-		keys->data[i].dsize = rec->datalen;
 
-		if (keys->data[i].dsize < sizeof(struct ctdb_ltdb_header)) {
-			DEBUG(0,(__location__ " bad ltdb record\n"));
-			return -1;
-		}
-		memcpy(&keys->headers[i], keys->data[i].dptr, sizeof(struct ctdb_ltdb_header));
-		keys->data[i].dptr += sizeof(struct ctdb_ltdb_header);
-		keys->data[i].dsize -= sizeof(struct ctdb_ltdb_header);
+/*
+  async send for pushdb
+ */
+struct ctdb_client_control_state *ctdb_ctrl_pushdb_send(
+	struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
+	TALLOC_CTX *mem_ctx, struct timeval timeout, TDB_DATA indata)
+{
+	return ctdb_control_send(ctdb, destnode, 0, 
+			   CTDB_CONTROL_PUSH_DB, 0, indata, 
+			   mem_ctx, NULL, &timeout, NULL);
+}
 
-		rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
-	}	    
+int ctdb_ctrl_pushdb_recv(
+	struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
+	struct ctdb_client_control_state *state)
+{
+	int ret;
+	int32_t res;
 
-	talloc_free(outdata.dptr);
+	ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
+	if ( (ret != 0) || (res != 0) ){
+		DEBUG(0,(__location__ " ctdb_ctrl_pushdb_recv failed\n"));
+		return -1;
+	}
 
 	return 0;
 }
 
+/*
+  push all records to a specific database on a node
+ */
+int ctdb_ctrl_pushdb(struct ctdb_context *ctdb, uint32_t destnode, 
+		uint32_t dbid,
+		TALLOC_CTX *mem_ctx, struct timeval timeout,
+		TDB_DATA indata)
+{
+	struct ctdb_client_control_state *state;
+
+	state = ctdb_ctrl_pushdb_send(ctdb, destnode, dbid, mem_ctx,
+			timeout, indata);
+	
+	return ctdb_ctrl_pushdb_recv(ctdb, mem_ctx, state);
+}
+
+
 /*
   copy a tdb from one node to another node
  */
@@ -1346,32 +1397,22 @@ int ctdb_ctrl_copydb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t
 		     uint32_t destnode, uint32_t dbid, uint32_t lmaster, TALLOC_CTX *mem_ctx)
 {
 	int ret;
-	TDB_DATA indata, outdata;
-	int32_t res;
-
-	indata.dsize = 2*sizeof(uint32_t);
-	indata.dptr  = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
-
-	((uint32_t *)(&indata.dptr[0]))[0] = dbid;
-	((uint32_t *)(&indata.dptr[0]))[1] = lmaster;
+	TDB_DATA outdata;
 
 	DEBUG(3,("pulling dbid 0x%x from %u\n", dbid, sourcenode));
 
-	ret = ctdb_control(ctdb, sourcenode, 0, 
-			   CTDB_CONTROL_PULL_DB, 0, indata, 
-			   mem_ctx, &outdata, &res, &timeout, NULL);
-	if (ret != 0 || res != 0) {
+	ret = ctdb_ctrl_pulldb(ctdb, sourcenode, dbid, lmaster, mem_ctx,
+				timeout, &outdata);
+	if (ret != 0) {
 		DEBUG(0,(__location__ " ctdb_control for pulldb failed\n"));
 		return -1;
 	}
 
 	DEBUG(3,("pushing dbid 0x%x to %u\n", dbid, destnode));
 
-	ret = ctdb_control(ctdb, destnode, 0, 
-			   CTDB_CONTROL_PUSH_DB, 0, outdata, 
-			   mem_ctx, NULL, &res, &timeout, NULL);
+	ret = ctdb_ctrl_pushdb(ctdb, destnode, dbid, mem_ctx, timeout, outdata);
 	talloc_free(outdata.dptr);
-	if (ret != 0 || res != 0) {
+	if (ret != 0) {
 		DEBUG(0,(__location__ " ctdb_control for pushdb failed\n"));
 		return -1;
 	}
@@ -1982,12 +2023,11 @@ int ctdb_ctrl_get_max_rsn(struct ctdb_context *ctdb, struct timeval timeout,
 /*
   set the rsn on non-empty records to the given rsn
  */
-int ctdb_ctrl_set_rsn_nonempty(struct ctdb_context *ctdb, struct timeval timeout, 
-			       uint32_t destnode, uint32_t db_id, uint64_t rsn)
+struct ctdb_client_control_state *ctdb_ctrl_set_rsn_nonempty_send(
+	struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, 
+	uint32_t destnode, uint32_t db_id, uint64_t rsn)
 {
 	TDB_DATA data;
-	int ret;
-	int32_t res;
 	struct ctdb_control_set_rsn_nonempty p;
 
 	memset(&p, 0, sizeof(p));
@@ -1997,43 +2037,89 @@ int ctdb_ctrl_set_rsn_nonempty(struct ctdb_context *ctdb, struct timeval timeout
 	data.dptr = (uint8_t *)&p;
 	data.dsize = sizeof(p);
 
-	ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_RSN_NONEMPTY, 0, data, NULL,
-			   NULL, &res, &timeout, NULL);
+	return ctdb_control_send(ctdb, destnode, 0, CTDB_CONTROL_SET_RSN_NONEMPTY, 0, data, mem_ctx,
+				 NULL, &timeout, NULL);
+}
+
+/*
+  set the rsn on non-empty records to the given rsn
+ */
+int ctdb_ctrl_set_rsn_nonempty_recv(struct ctdb_context *ctdb, 
+				    struct ctdb_client_control_state *state)
+{
+	int32_t res;
+	int ret;
+
+	ret = ctdb_control_recv(ctdb, state, NULL, NULL, &res, NULL);
 	if (ret != 0 || res != 0) {
 		DEBUG(0,(__location__ " ctdb_control for set_rsn_nonempty failed\n"));
 		return -1;
 	}
+	return 0;
+}
 
-	return 0;	
+/*
+  set the rsn on non-empty records to the given rsn
+ */
+int ctdb_ctrl_set_rsn_nonempty(struct ctdb_context *ctdb, struct timeval timeout, 
+			       uint32_t destnode, uint32_t db_id, uint64_t rsn)
+{
+	struct ctdb_client_control_state *state;
+	state = ctdb_ctrl_set_rsn_nonempty_send(ctdb, ctdb, timeout, destnode, db_id, rsn);
+	return ctdb_ctrl_set_rsn_nonempty_recv(ctdb, state);
 }
 
+
 /*
-  delete records which have a rsn below the given rsn
+  set the rsn on non-empty records to the given rsn
  */
-int ctdb_ctrl_delete_low_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
-			     uint32_t destnode, uint32_t db_id, uint64_t rsn)
+struct ctdb_client_control_state *ctdb_ctrl_delete_low_rsn_send(
+	struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, 
+	uint32_t destnode, uint32_t db_id, uint64_t rsn)
 {
 	TDB_DATA data;
-	int ret;
-	int32_t res;
 	struct ctdb_control_delete_low_rsn p;
 
+	memset(&p, 0, sizeof(p));
 	p.db_id = db_id;
 	p.rsn = rsn;
 
 	data.dptr = (uint8_t *)&p;
 	data.dsize = sizeof(p);
 
-	ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DELETE_LOW_RSN, 0, data, NULL,
-			   NULL, &res, &timeout, NULL);
+	return ctdb_control_send(ctdb, destnode, 0, CTDB_CONTROL_DELETE_LOW_RSN, 0, data, mem_ctx,
+				 NULL, &timeout, NULL);
+}
+
+/*
+  set the rsn on non-empty records to the given rsn
+ */
+int ctdb_ctrl_delete_low_rsn_recv(struct ctdb_context *ctdb, 
+				    struct ctdb_client_control_state *state)
+{
+	int32_t res;
+	int ret;
+
+	ret = ctdb_control_recv(ctdb, state, NULL, NULL, &res, NULL);
 	if (ret != 0 || res != 0) {
 		DEBUG(0,(__location__ " ctdb_control for delete_low_rsn failed\n"));
 		return -1;
 	}
+	return 0;
+}
 
-	return 0;	
+/*
+  set the rsn on non-empty records to the given rsn
+ */
+int ctdb_ctrl_delete_low_rsn(struct ctdb_context *ctdb, struct timeval timeout, 
+			       uint32_t destnode, uint32_t db_id, uint64_t rsn)
+{
+	struct ctdb_client_control_state *state;
+	state = ctdb_ctrl_delete_low_rsn_send(ctdb, ctdb, timeout, destnode, db_id, rsn);
+	return ctdb_ctrl_delete_low_rsn_recv(ctdb, state);
 }
 
+
 /* 
   sent to a node to make it take over an ip address
 */
-- 
cgit