summaryrefslogtreecommitdiffstats
path: root/ctdb/common/ctdb_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'ctdb/common/ctdb_client.c')
-rw-r--r--ctdb/common/ctdb_client.c239
1 files changed, 133 insertions, 106 deletions
diff --git a/ctdb/common/ctdb_client.c b/ctdb/common/ctdb_client.c
index 4f63da5e6c..94dec19d56 100644
--- a/ctdb/common/ctdb_client.c
+++ b/ctdb/common/ctdb_client.c
@@ -48,6 +48,20 @@ static void ctdb_reply_connect_wait(struct ctdb_context *ctdb,
ctdb->num_connected = r->num_connected;
}
+enum fetch_lock_state { CTDB_FETCH_LOCK_WAIT, CTDB_FETCH_LOCK_DONE, CTDB_FETCH_LOCK_ERROR };
+
+/*
+ state of a in-progress ctdb call
+*/
+struct ctdb_fetch_lock_state {
+ enum fetch_lock_state state;
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_reply_fetch_lock *r;
+ struct ctdb_ltdb_header header;
+};
+
+
+
/*
called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon
@@ -56,26 +70,19 @@ static void ctdb_reply_connect_wait(struct ctdb_context *ctdb,
*/
void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
- struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr;
- struct ctdb_call_state *state;
+ struct ctdb_reply_fetch_lock *r = (struct ctdb_reply_fetch_lock *)hdr;
+ struct ctdb_fetch_lock_state *state;
state = idr_find(ctdb->idr, hdr->reqid);
if (state == NULL) return;
- state->call.reply_data.dptr = c->data;
- state->call.reply_data.dsize = c->datalen;
- state->call.status = c->state;
-
- talloc_steal(state, c);
+ state->r = talloc_steal(state, r);
/* get an extra reference here - this prevents the free in ctdb_recv_pkt()
from freeing the data */
- (void)talloc_reference(state, c);
+ (void)talloc_reference(state, r);
- state->state = CTDB_CALL_DONE;
- if (state->async.fn) {
- state->async.fn(state);
- }
+ state->state = CTDB_FETCH_LOCK_DONE;
}
/*
@@ -165,7 +172,7 @@ static int ux_socket_connect(struct ctdb_context *ctdb)
This is called when the program wants to wait for a ctdb_call to complete and get the
results. This call will block unless the call has already completed.
*/
-int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
+int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
{
struct ctdb_record_handle *rec;
@@ -223,8 +230,8 @@ static int ctdb_client_call_destructor(struct ctdb_call_state *state)
This constructs a ctdb_call request and queues it for processing.
This call never blocks.
*/
-struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db,
- struct ctdb_call *call)
+struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
+ struct ctdb_call *call)
{
struct ctdb_call_state *state;
struct ctdb_context *ctdb = ctdb_db->ctdb;
@@ -316,14 +323,25 @@ struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db,
}
+/*
+ full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
+*/
+int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+ struct ctdb_call_state *state;
+
+ state = ctdb_call_send(ctdb_db, call);
+ return ctdb_call_recv(state, call);
+}
+
/*
tell the daemon what messaging srvid we will use, and register the message
handler function in the client
*/
-int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
- ctdb_message_fn_t handler,
- void *private_data)
+int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
+ ctdb_message_fn_t handler,
+ void *private_data)
{
struct ctdb_req_register c;
@@ -352,26 +370,10 @@ int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
}
-
-/*
- setup handler for receipt of ctdb messages from ctdb_send_message()
-*/
-int ctdb_set_message_handler(struct ctdb_context *ctdb,
- uint32_t srvid,
- ctdb_message_fn_t handler,
- void *private_data)
-{
- if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) {
- return ctdb_client_set_message_handler(ctdb, srvid, handler, private_data);
- }
- return ctdb_daemon_set_message_handler(ctdb, srvid, handler, private_data);
-}
-
-
/*
send a message - from client context
*/
-int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn,
+int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn,
uint32_t srvid, TDB_DATA data)
{
struct ctdb_req_message *r;
@@ -405,7 +407,7 @@ int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn,
/*
wait for all nodes to be connected - from client
*/
-static void ctdb_client_connect_wait(struct ctdb_context *ctdb)
+void ctdb_connect_wait(struct ctdb_context *ctdb)
{
struct ctdb_req_connect_wait r;
int res;
@@ -428,25 +430,11 @@ static void ctdb_client_connect_wait(struct ctdb_context *ctdb)
ctdb_daemon_connect_wait(ctdb);
}
-/*
- wait for all nodes to be connected
-*/
-void ctdb_connect_wait(struct ctdb_context *ctdb)
-{
- if (!(ctdb->flags & CTDB_FLAG_DAEMON_MODE)) {
- ctdb_daemon_connect_wait(ctdb);
- return;
- }
-
- ctdb_client_connect_wait(ctdb);
-}
-
-
-struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db,
- TALLOC_CTX *mem_ctx,
- TDB_DATA key)
+static struct ctdb_fetch_lock_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key)
{
- struct ctdb_call_state *state;
+ struct ctdb_fetch_lock_state *state;
struct ctdb_context *ctdb = ctdb_db->ctdb;
struct ctdb_req_fetch_lock *req;
int len, res;
@@ -456,24 +444,23 @@ struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb
ux_socket_connect(ctdb);
}
- state = talloc_zero(ctdb_db, struct ctdb_call_state);
+ state = talloc_zero(ctdb_db, struct ctdb_fetch_lock_state);
if (state == NULL) {
printf("failed to allocate state\n");
return NULL;
}
- state->state = CTDB_CALL_WAIT;
+ state->state = CTDB_FETCH_LOCK_WAIT;
state->ctdb_db = ctdb_db;
len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize;
- state->c = ctdbd_allocate_pkt(ctdb, len);
- if (state->c == NULL) {
+ req = ctdbd_allocate_pkt(ctdb, len);
+ if (req == NULL) {
printf("failed to allocate packet\n");
return NULL;
}
- bzero(state->c, len);
- talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet");
- talloc_steal(state, state->c);
+ ZERO_STRUCT(*req);
+ talloc_set_name_const(req, "ctdbd req_fetch_lock packet");
+ talloc_steal(state, req);
- req = (struct ctdb_req_fetch_lock *)state->c;
req->hdr.length = len;
req->hdr.ctdb_magic = CTDB_MAGIC;
req->hdr.ctdb_version = CTDB_VERSION;
@@ -488,8 +475,6 @@ struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb
return NULL;
}
- talloc_free(req);
-
return state;
}
@@ -500,77 +485,119 @@ struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb
This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the
results. This call will block unless the call has already completed.
*/
-int ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data)
+int ctdb_client_fetch_lock_recv(struct ctdb_fetch_lock_state *state, TALLOC_CTX *mem_ctx,
+ TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data)
{
- while (state->state < CTDB_CALL_DONE) {
+ while (state->state < CTDB_FETCH_LOCK_DONE) {
event_loop_once(state->ctdb_db->ctdb->ev);
}
- if (state->state != CTDB_CALL_DONE) {
- ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
+ if (state->state != CTDB_FETCH_LOCK_DONE) {
talloc_free(state);
return -1;
}
- data->dsize = state->call.reply_data.dsize;
- data->dptr = talloc_memdup(mem_ctx, state->call.reply_data.dptr, data->dsize);
+ *header = state->r->header;
+ data->dsize = state->r->datalen;
+ data->dptr = talloc_memdup(mem_ctx, state->r->data, data->dsize);
+
+ talloc_free(state);
return 0;
}
-int ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db,
- TALLOC_CTX *mem_ctx,
- TDB_DATA key,
- TDB_DATA *data)
+/*
+ cancel a ctdb_fetch_lock operation, releasing the lock
+ */
+static int fetch_lock_destructor(struct ctdb_record_handle *h)
+{
+ ctdb_ltdb_unlock(h->ctdb_db, h->key);
+ return 0;
+}
+
+/*
+ get a lock on a record, and return the records data. Blocks until it gets the lock
+ */
+struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
+ TDB_DATA key, TDB_DATA *data)
{
- struct ctdb_ltdb_header header;
int ret;
+ struct ctdb_record_handle *h;
+ struct ctdb_fetch_lock_state *state;
+
+ /*
+ procedure is as follows:
+
+ 1) get the chain lock.
+ 2) check if we are dmaster
+ 3) if we are the dmaster then return handle
+ 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
+ reply from ctdbd
+ 5) when we get the reply, we are now dmaster, update vnn in header
+ 6) return handle
+ */
+
+ h = talloc_zero(mem_ctx, struct ctdb_record_handle);
+ if (h == NULL) {
+ return NULL;
+ }
+ h->ctdb_db = ctdb_db;
+ h->key = key;
+ h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
+ if (h->key.dptr == NULL) {
+ talloc_free(h);
+ return NULL;
+ }
+ h->data = data;
+
+ /* step 1 - get the chain lock */
ret = ctdb_ltdb_lock(ctdb_db, key);
if (ret != 0) {
printf("failed to lock ltdb record\n");
- return FETCH_LOCK_LOCKFAILED;
+ talloc_free(h);
+ return NULL;
}
- ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, data);
+ talloc_set_destructor(h, fetch_lock_destructor);
+
+ ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, ctdb_db, data);
if (ret != 0) {
- ctdb_ltdb_unlock(ctdb_db, key);
- return FETCH_LOCK_FETCHFAILED;
+ talloc_free(h);
+ return NULL;
}
+ /* step 2 - check if we are the dmaster */
+ if (h->header.dmaster == ctdb_db->ctdb->vnn) {
+ return h;
+ }
- if (header.dmaster != ctdb_db->ctdb->vnn) {
- struct ctdb_call_state *state;
+ /* we're not the dmaster - ask the ctdb daemon to make us dmaster */
+ state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key);
+ ret = ctdb_client_fetch_lock_recv(state, mem_ctx, key, &h->header, data);
+ if (ret != 0) {
+ talloc_free(h);
+ return NULL;
+ }
- state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key);
- ret = ctdb_client_fetch_lock_recv(state, mem_ctx, key, data);
- if (ret != 0) {
- ctdb_ltdb_unlock(ctdb_db, key);
- return FETCH_LOCK_DMASTERFAILED;
- }
+ /* the record is now local, and locked. update the record on disk
+ to mark us as the dmaster*/
+ h->header.dmaster = ctdb_db->ctdb->vnn;
+ ret = ctdb_ltdb_store(ctdb_db, key, &h->header, *data);
+ if (ret != 0) {
+ printf("bugger - we're in real trouble now! can't update record to mark us as dmasterx\n");
+ talloc_free(h);
+ return NULL;
}
- return 0;
+ /* give the caller a handle to be used for ctdb_record_store() or a cancel via
+ a talloc_free() */
+ return h;
}
/*
- a helper function for the client that will store the new data for the
- record and release the tdb chainlock
+ store some data to the record that was locked with ctdb_fetch_lock()
*/
-int ctdb_client_store_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key, TDB_DATA data)
+int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
{
- int ret;
- struct ctdb_ltdb_header header;
-
- /* should be avoided if possible hang header off rec ? */
- ret = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL);
- if (ret) {
- ctdb_set_error(ctdb_db->ctdb, "Fetch of locally held record failed");
- return ret;
- }
-
- ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
-
- ctdb_ltdb_unlock(ctdb_db, key);
-
- return ret;
+ return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
}