diff options
-rw-r--r-- | ctdb/client/ctdb_client.c | 207 | ||||
-rw-r--r-- | ctdb/include/ctdb_client.h | 2 | ||||
-rw-r--r-- | ctdb/include/ctdb_private.h | 6 |
3 files changed, 212 insertions, 3 deletions
diff --git a/ctdb/client/ctdb_client.c b/ctdb/client/ctdb_client.c index bcf9b2e89a..20b4a74842 100644 --- a/ctdb/client/ctdb_client.c +++ b/ctdb/client/ctdb_client.c @@ -585,6 +585,48 @@ static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA } /* + try to fetch a readonly copy of a record + */ +static int +ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data) +{ + int ret; + + struct ctdb_call call; + ZERO_STRUCT(call); + + call.call_id = CTDB_FETCH_WITH_HEADER_FUNC; + call.call_data.dptr = NULL; + call.call_data.dsize = 0; + call.key = key; + call.flags = CTDB_WANT_READONLY; + ret = ctdb_call(ctdb_db, &call); + + if (ret != 0) { + return -1; + } + if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) { + return -1; + } + + *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header)); + if (*hdr == NULL) { + talloc_free(call.reply_data.dptr); + return -1; + } + + data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header); + data->dptr = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize); + if (data->dptr == NULL) { + talloc_free(call.reply_data.dptr); + talloc_free(hdr); + return -1; + } + + return 0; +} + +/* get a lock on a record, and return the records data. Blocks until it gets the lock */ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, @@ -661,6 +703,170 @@ again: } /* + get a readonly lock on a record, and return the records data. Blocks until it gets the lock + */ +struct ctdb_record_handle * +ctdb_fetch_readonly_lock( + struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data, + int read_only) +{ + int ret; + struct ctdb_record_handle *h; + struct ctdb_ltdb_header *roheader = NULL; + + h = talloc_zero(mem_ctx, struct ctdb_record_handle); + if (h == NULL) { + return NULL; + } + + h->ctdb_db = ctdb_db; + h->key = key; + h->key.dptr = talloc_memdup(h, key.dptr, key.dsize); + if (h->key.dptr == NULL) { + talloc_free(h); + return NULL; + } + h->data = data; + + data->dptr = NULL; + data->dsize = 0; + + +again: + talloc_free(roheader); + roheader = NULL; + + talloc_free(data->dptr); + data->dptr = NULL; + data->dsize = 0; + + /* Lock the record/chain */ + ret = ctdb_ltdb_lock(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n")); + talloc_free(h); + return NULL; + } + + talloc_set_destructor(h, fetch_lock_destructor); + + /* Check if record exists yet in the TDB */ + ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + goto again; + } + + + /* if we are dmaster, just return the handle */ + if (h->header.dmaster == ctdb_db->ctdb->pnn) { + return h; + } + + if (read_only != 0) { + TDB_DATA rodata = {NULL, 0}; + + if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY) + || (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) { + return h; + } + + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata); + if (ret != 0) { + DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock: failed. force migration and try again\n")); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + + goto again; + } + + if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) { + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + + goto again; + } + + ret = ctdb_ltdb_lock(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n")); + talloc_free(h); + return NULL; + } + + ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + + goto again; + } + + if (h->header.rsn >= roheader->rsn) { + DEBUG(DEBUG_ERR,("READONLY RECORD: Too small RSN, migrate and try again\n")); + ctdb_ltdb_unlock(ctdb_db, key); + + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + + goto again; + } + + if (ctdb_ltdb_store(ctdb_db, key, roheader, rodata) != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + + goto again; + } + return h; + } + + /* we are not dmaster and this was not a request for a readonly lock + * so unlock the record, migrate it and try again + */ + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + goto again; +} + +/* store some data to the record that was locked with ctdb_fetch_lock() */ int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data) @@ -685,6 +891,7 @@ int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, call.call_id = CTDB_FETCH_FUNC; call.call_data.dptr = NULL; call.call_data.dsize = 0; + call.key = key; ret = ctdb_call(ctdb_db, &call); diff --git a/ctdb/include/ctdb_client.h b/ctdb/include/ctdb_client.h index 0699bf1cfa..720f073361 100644 --- a/ctdb/include/ctdb_client.h +++ b/ctdb/include/ctdb_client.h @@ -174,6 +174,8 @@ int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn, struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data); +struct ctdb_record_handle *ctdb_fetch_readonly_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data, int read_only); + int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data); int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h index 4937bc8762..4cf3709fa3 100644 --- a/ctdb/include/ctdb_private.h +++ b/ctdb/include/ctdb_private.h @@ -669,9 +669,9 @@ int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db, int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data); int ctdb_ltdb_delete(struct ctdb_db_context *ctdb_db, TDB_DATA key); -int ctdb_ltdb_fetch_readonly(struct ctdb_db_context *ctdb_db, - TDB_DATA key, struct ctdb_ltdb_header *header, - TALLOC_CTX *mem_ctx, TDB_DATA *data); +int ctdb_ltdb_fetch_readonly(struct ctdb_db_context *ctdb_db, + TDB_DATA key, struct ctdb_ltdb_header *header, + TALLOC_CTX *mem_ctx, TDB_DATA *data); int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb, struct ctdb_req_control *c, TDB_DATA recdata); |