diff options
Diffstat (limited to 'ctdb/common/ctdb_client.c')
-rw-r--r-- | ctdb/common/ctdb_client.c | 239 |
1 files changed, 133 insertions, 106 deletions
diff --git a/ctdb/common/ctdb_client.c b/ctdb/common/ctdb_client.c index 4f63da5e6c..94dec19d56 100644 --- a/ctdb/common/ctdb_client.c +++ b/ctdb/common/ctdb_client.c @@ -48,6 +48,20 @@ static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, ctdb->num_connected = r->num_connected; } +enum fetch_lock_state { CTDB_FETCH_LOCK_WAIT, CTDB_FETCH_LOCK_DONE, CTDB_FETCH_LOCK_ERROR }; + +/* + state of a in-progress ctdb call +*/ +struct ctdb_fetch_lock_state { + enum fetch_lock_state state; + struct ctdb_db_context *ctdb_db; + struct ctdb_reply_fetch_lock *r; + struct ctdb_ltdb_header header; +}; + + + /* called in the client when we receive a CTDB_REPLY_FETCH_LOCK from the daemon @@ -56,26 +70,19 @@ static void ctdb_reply_connect_wait(struct ctdb_context *ctdb, */ void ctdb_reply_fetch_lock(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - struct ctdb_reply_fetch_lock *c = (struct ctdb_reply_fetch_lock *)hdr; - struct ctdb_call_state *state; + struct ctdb_reply_fetch_lock *r = (struct ctdb_reply_fetch_lock *)hdr; + struct ctdb_fetch_lock_state *state; state = idr_find(ctdb->idr, hdr->reqid); if (state == NULL) return; - state->call.reply_data.dptr = c->data; - state->call.reply_data.dsize = c->datalen; - state->call.status = c->state; - - talloc_steal(state, c); + state->r = talloc_steal(state, r); /* get an extra reference here - this prevents the free in ctdb_recv_pkt() from freeing the data */ - (void)talloc_reference(state, c); + (void)talloc_reference(state, r); - state->state = CTDB_CALL_DONE; - if (state->async.fn) { - state->async.fn(state); - } + state->state = CTDB_FETCH_LOCK_DONE; } /* @@ -165,7 +172,7 @@ static int ux_socket_connect(struct ctdb_context *ctdb) This is called when the program wants to wait for a ctdb_call to complete and get the results. This call will block unless the call has already completed. */ -int ctdb_client_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) +int ctdb_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) { struct ctdb_record_handle *rec; @@ -223,8 +230,8 @@ static int ctdb_client_call_destructor(struct ctdb_call_state *state) This constructs a ctdb_call request and queues it for processing. This call never blocks. */ -struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, - struct ctdb_call *call) +struct ctdb_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, + struct ctdb_call *call) { struct ctdb_call_state *state; struct ctdb_context *ctdb = ctdb_db->ctdb; @@ -316,14 +323,25 @@ struct ctdb_call_state *ctdb_client_call_send(struct ctdb_db_context *ctdb_db, } +/* + full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv() +*/ +int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call) +{ + struct ctdb_call_state *state; + + state = ctdb_call_send(ctdb_db, call); + return ctdb_call_recv(state, call); +} + /* tell the daemon what messaging srvid we will use, and register the message handler function in the client */ -int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) +int ctdb_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, + ctdb_message_fn_t handler, + void *private_data) { struct ctdb_req_register c; @@ -352,26 +370,10 @@ int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid, } - -/* - setup handler for receipt of ctdb messages from ctdb_send_message() -*/ -int ctdb_set_message_handler(struct ctdb_context *ctdb, - uint32_t srvid, - ctdb_message_fn_t handler, - void *private_data) -{ - if (ctdb->flags & CTDB_FLAG_DAEMON_MODE) { - return ctdb_client_set_message_handler(ctdb, srvid, handler, private_data); - } - return ctdb_daemon_set_message_handler(ctdb, srvid, handler, private_data); -} - - /* send a message - from client context */ -int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, +int ctdb_send_message(struct ctdb_context *ctdb, uint32_t vnn, uint32_t srvid, TDB_DATA data) { struct ctdb_req_message *r; @@ -405,7 +407,7 @@ int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn, /* wait for all nodes to be connected - from client */ -static void ctdb_client_connect_wait(struct ctdb_context *ctdb) +void ctdb_connect_wait(struct ctdb_context *ctdb) { struct ctdb_req_connect_wait r; int res; @@ -428,25 +430,11 @@ static void ctdb_client_connect_wait(struct ctdb_context *ctdb) ctdb_daemon_connect_wait(ctdb); } -/* - wait for all nodes to be connected -*/ -void ctdb_connect_wait(struct ctdb_context *ctdb) -{ - if (!(ctdb->flags & CTDB_FLAG_DAEMON_MODE)) { - ctdb_daemon_connect_wait(ctdb); - return; - } - - ctdb_client_connect_wait(ctdb); -} - - -struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key) +static struct ctdb_fetch_lock_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx, + TDB_DATA key) { - struct ctdb_call_state *state; + struct ctdb_fetch_lock_state *state; struct ctdb_context *ctdb = ctdb_db->ctdb; struct ctdb_req_fetch_lock *req; int len, res; @@ -456,24 +444,23 @@ struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb ux_socket_connect(ctdb); } - state = talloc_zero(ctdb_db, struct ctdb_call_state); + state = talloc_zero(ctdb_db, struct ctdb_fetch_lock_state); if (state == NULL) { printf("failed to allocate state\n"); return NULL; } - state->state = CTDB_CALL_WAIT; + state->state = CTDB_FETCH_LOCK_WAIT; state->ctdb_db = ctdb_db; len = offsetof(struct ctdb_req_fetch_lock, key) + key.dsize; - state->c = ctdbd_allocate_pkt(ctdb, len); - if (state->c == NULL) { + req = ctdbd_allocate_pkt(ctdb, len); + if (req == NULL) { printf("failed to allocate packet\n"); return NULL; } - bzero(state->c, len); - talloc_set_name_const(state->c, "ctdbd req_fetch_lock packet"); - talloc_steal(state, state->c); + ZERO_STRUCT(*req); + talloc_set_name_const(req, "ctdbd req_fetch_lock packet"); + talloc_steal(state, req); - req = (struct ctdb_req_fetch_lock *)state->c; req->hdr.length = len; req->hdr.ctdb_magic = CTDB_MAGIC; req->hdr.ctdb_version = CTDB_VERSION; @@ -488,8 +475,6 @@ struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb return NULL; } - talloc_free(req); - return state; } @@ -500,77 +485,119 @@ struct ctdb_call_state *ctdb_client_fetch_lock_send(struct ctdb_db_context *ctdb This is called when the program wants to wait for a ctdb_fetch_lock to complete and get the results. This call will block unless the call has already completed. */ -int ctdb_client_fetch_lock_recv(struct ctdb_call_state *state, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data) +int ctdb_client_fetch_lock_recv(struct ctdb_fetch_lock_state *state, TALLOC_CTX *mem_ctx, + TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data) { - while (state->state < CTDB_CALL_DONE) { + while (state->state < CTDB_FETCH_LOCK_DONE) { event_loop_once(state->ctdb_db->ctdb->ev); } - if (state->state != CTDB_CALL_DONE) { - ctdb_set_error(state->node->ctdb, "%s", state->errmsg); + if (state->state != CTDB_FETCH_LOCK_DONE) { talloc_free(state); return -1; } - data->dsize = state->call.reply_data.dsize; - data->dptr = talloc_memdup(mem_ctx, state->call.reply_data.dptr, data->dsize); + *header = state->r->header; + data->dsize = state->r->datalen; + data->dptr = talloc_memdup(mem_ctx, state->r->data, data->dsize); + + talloc_free(state); return 0; } -int ctdb_client_fetch_lock(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx, - TDB_DATA key, - TDB_DATA *data) +/* + cancel a ctdb_fetch_lock operation, releasing the lock + */ +static int fetch_lock_destructor(struct ctdb_record_handle *h) +{ + ctdb_ltdb_unlock(h->ctdb_db, h->key); + return 0; +} + +/* + get a lock on a record, and return the records data. Blocks until it gets the lock + */ +struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data) { - struct ctdb_ltdb_header header; int ret; + struct ctdb_record_handle *h; + struct ctdb_fetch_lock_state *state; + + /* + procedure is as follows: + + 1) get the chain lock. + 2) check if we are dmaster + 3) if we are the dmaster then return handle + 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for + reply from ctdbd + 5) when we get the reply, we are now dmaster, update vnn in header + 6) return handle + */ + + h = talloc_zero(mem_ctx, struct ctdb_record_handle); + if (h == NULL) { + return NULL; + } + h->ctdb_db = ctdb_db; + h->key = key; + h->key.dptr = talloc_memdup(h, key.dptr, key.dsize); + if (h->key.dptr == NULL) { + talloc_free(h); + return NULL; + } + h->data = data; + + /* step 1 - get the chain lock */ ret = ctdb_ltdb_lock(ctdb_db, key); if (ret != 0) { printf("failed to lock ltdb record\n"); - return FETCH_LOCK_LOCKFAILED; + talloc_free(h); + return NULL; } - ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, data); + talloc_set_destructor(h, fetch_lock_destructor); + + ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, ctdb_db, data); if (ret != 0) { - ctdb_ltdb_unlock(ctdb_db, key); - return FETCH_LOCK_FETCHFAILED; + talloc_free(h); + return NULL; } + /* step 2 - check if we are the dmaster */ + if (h->header.dmaster == ctdb_db->ctdb->vnn) { + return h; + } - if (header.dmaster != ctdb_db->ctdb->vnn) { - struct ctdb_call_state *state; + /* we're not the dmaster - ask the ctdb daemon to make us dmaster */ + state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key); + ret = ctdb_client_fetch_lock_recv(state, mem_ctx, key, &h->header, data); + if (ret != 0) { + talloc_free(h); + return NULL; + } - state = ctdb_client_fetch_lock_send(ctdb_db, mem_ctx, key); - ret = ctdb_client_fetch_lock_recv(state, mem_ctx, key, data); - if (ret != 0) { - ctdb_ltdb_unlock(ctdb_db, key); - return FETCH_LOCK_DMASTERFAILED; - } + /* the record is now local, and locked. update the record on disk + to mark us as the dmaster*/ + h->header.dmaster = ctdb_db->ctdb->vnn; + ret = ctdb_ltdb_store(ctdb_db, key, &h->header, *data); + if (ret != 0) { + printf("bugger - we're in real trouble now! can't update record to mark us as dmasterx\n"); + talloc_free(h); + return NULL; } - return 0; + /* give the caller a handle to be used for ctdb_record_store() or a cancel via + a talloc_free() */ + return h; } /* - a helper function for the client that will store the new data for the - record and release the tdb chainlock + store some data to the record that was locked with ctdb_fetch_lock() */ -int ctdb_client_store_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key, TDB_DATA data) +int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data) { - int ret; - struct ctdb_ltdb_header header; - - /* should be avoided if possible hang header off rec ? */ - ret = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL); - if (ret) { - ctdb_set_error(ctdb_db->ctdb, "Fetch of locally held record failed"); - return ret; - } - - ret = ctdb_ltdb_store(ctdb_db, key, &header, data); - - ctdb_ltdb_unlock(ctdb_db, key); - - return ret; + return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data); } |