summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ctdb/client/ctdb_client.c207
-rw-r--r--ctdb/include/ctdb_client.h2
-rw-r--r--ctdb/include/ctdb_private.h6
3 files changed, 212 insertions, 3 deletions
diff --git a/ctdb/client/ctdb_client.c b/ctdb/client/ctdb_client.c
index bcf9b2e89a..20b4a74842 100644
--- a/ctdb/client/ctdb_client.c
+++ b/ctdb/client/ctdb_client.c
@@ -585,6 +585,48 @@ static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA
}
/*
+ try to fetch a readonly copy of a record
+ */
+static int
+ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
+{
+ int ret;
+
+ struct ctdb_call call;
+ ZERO_STRUCT(call);
+
+ call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
+ call.call_data.dptr = NULL;
+ call.call_data.dsize = 0;
+ call.key = key;
+ call.flags = CTDB_WANT_READONLY;
+ ret = ctdb_call(ctdb_db, &call);
+
+ if (ret != 0) {
+ return -1;
+ }
+ if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
+ return -1;
+ }
+
+ *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
+ if (*hdr == NULL) {
+ talloc_free(call.reply_data.dptr);
+ return -1;
+ }
+
+ data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
+ data->dptr = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
+ if (data->dptr == NULL) {
+ talloc_free(call.reply_data.dptr);
+ talloc_free(hdr);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
get a lock on a record, and return the records data. Blocks until it gets the lock
*/
struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
@@ -661,6 +703,170 @@ again:
}
/*
+ get a readonly lock on a record, and return the records data. Blocks until it gets the lock
+ */
+struct ctdb_record_handle *
+ctdb_fetch_readonly_lock(
+ struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
+ TDB_DATA key, TDB_DATA *data,
+ int read_only)
+{
+ int ret;
+ struct ctdb_record_handle *h;
+ struct ctdb_ltdb_header *roheader = NULL;
+
+ h = talloc_zero(mem_ctx, struct ctdb_record_handle);
+ if (h == NULL) {
+ return NULL;
+ }
+
+ h->ctdb_db = ctdb_db;
+ h->key = key;
+ h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
+ if (h->key.dptr == NULL) {
+ talloc_free(h);
+ return NULL;
+ }
+ h->data = data;
+
+ data->dptr = NULL;
+ data->dsize = 0;
+
+
+again:
+ talloc_free(roheader);
+ roheader = NULL;
+
+ talloc_free(data->dptr);
+ data->dptr = NULL;
+ data->dsize = 0;
+
+ /* Lock the record/chain */
+ ret = ctdb_ltdb_lock(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ talloc_set_destructor(h, fetch_lock_destructor);
+
+ /* Check if record exists yet in the TDB */
+ ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
+ if (ret != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+ goto again;
+ }
+
+
+ /* if we are dmaster, just return the handle */
+ if (h->header.dmaster == ctdb_db->ctdb->pnn) {
+ return h;
+ }
+
+ if (read_only != 0) {
+ TDB_DATA rodata = {NULL, 0};
+
+ if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
+ || (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+ return h;
+ }
+
+ ctdb_ltdb_unlock(ctdb_db, key);
+ ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock: failed. force migration and try again\n"));
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ goto again;
+ }
+
+ if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ goto again;
+ }
+
+ ret = ctdb_ltdb_lock(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
+ if (ret != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ goto again;
+ }
+
+ if (h->header.rsn >= roheader->rsn) {
+ DEBUG(DEBUG_ERR,("READONLY RECORD: Too small RSN, migrate and try again\n"));
+ ctdb_ltdb_unlock(ctdb_db, key);
+
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ goto again;
+ }
+
+ if (ctdb_ltdb_store(ctdb_db, key, roheader, rodata) != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ goto again;
+ }
+ return h;
+ }
+
+ /* we are not dmaster and this was not a request for a readonly lock
+ * so unlock the record, migrate it and try again
+ */
+ ctdb_ltdb_unlock(ctdb_db, key);
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+ goto again;
+}
+
+/*
store some data to the record that was locked with ctdb_fetch_lock()
*/
int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
@@ -685,6 +891,7 @@ int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
call.call_id = CTDB_FETCH_FUNC;
call.call_data.dptr = NULL;
call.call_data.dsize = 0;
+ call.key = key;
ret = ctdb_call(ctdb_db, &call);
diff --git a/ctdb/include/ctdb_client.h b/ctdb/include/ctdb_client.h
index 0699bf1cfa..720f073361 100644
--- a/ctdb/include/ctdb_client.h
+++ b/ctdb/include/ctdb_client.h
@@ -174,6 +174,8 @@ int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
TDB_DATA key, TDB_DATA *data);
+struct ctdb_record_handle *ctdb_fetch_readonly_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, TDB_DATA key, TDB_DATA *data, int read_only);
+
int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data);
int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index 4937bc8762..4cf3709fa3 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -669,9 +669,9 @@ int ctdb_ltdb_fetch(struct ctdb_db_context *ctdb_db,
int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
struct ctdb_ltdb_header *header, TDB_DATA data);
int ctdb_ltdb_delete(struct ctdb_db_context *ctdb_db, TDB_DATA key);
-int ctdb_ltdb_fetch_readonly(struct ctdb_db_context *ctdb_db,
- TDB_DATA key, struct ctdb_ltdb_header *header,
- TALLOC_CTX *mem_ctx, TDB_DATA *data);
+int ctdb_ltdb_fetch_readonly(struct ctdb_db_context *ctdb_db,
+ TDB_DATA key, struct ctdb_ltdb_header *header,
+ TALLOC_CTX *mem_ctx, TDB_DATA *data);
int32_t ctdb_control_start_persistent_update(struct ctdb_context *ctdb,
struct ctdb_req_control *c,
TDB_DATA recdata);