summaryrefslogtreecommitdiffstats
path: root/ctdb/client/ctdb_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'ctdb/client/ctdb_client.c')
-rw-r--r--ctdb/client/ctdb_client.c375
1 files changed, 371 insertions, 4 deletions
diff --git a/ctdb/client/ctdb_client.c b/ctdb/client/ctdb_client.c
index 0828e989d2..89eeb4836a 100644
--- a/ctdb/client/ctdb_client.c
+++ b/ctdb/client/ctdb_client.c
@@ -72,7 +72,7 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
*/
int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
- TDB_DATA *data)
+ TDB_DATA *data, bool updatetdb)
{
struct ctdb_call_info *c;
struct ctdb_registered_call *fn;
@@ -89,6 +89,7 @@ int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
c->new_data = NULL;
c->reply_data = NULL;
c->status = 0;
+ c->header = header;
for (fn=ctdb_db->calls;fn;fn=fn->next) {
if (fn->id == call->call_id) break;
@@ -110,7 +111,7 @@ int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
c->new_data = &c->record_data;
}
- if (c->new_data) {
+ if (c->new_data && updatetdb) {
/* XXX check that we always have the lock here? */
if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
@@ -345,7 +346,7 @@ int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
call->status = state->call->status;
talloc_free(state);
- return 0;
+ return call->status;
}
@@ -386,7 +387,7 @@ static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db
*(state->call) = *call;
state->ctdb_db = ctdb_db;
- ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
+ ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
return state;
}
@@ -421,6 +422,10 @@ struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
+ if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+ ret = -1;
+ }
+
if (ret == 0 && header.dmaster == ctdb->pnn) {
state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
talloc_free(data.dptr);
@@ -584,6 +589,48 @@ static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA
}
/*
+ try to fetch a readonly copy of a record
+ */
+static int
+ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
+{
+ int ret;
+
+ struct ctdb_call call;
+ ZERO_STRUCT(call);
+
+ call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
+ call.call_data.dptr = NULL;
+ call.call_data.dsize = 0;
+ call.key = key;
+ call.flags = CTDB_WANT_READONLY;
+ ret = ctdb_call(ctdb_db, &call);
+
+ if (ret != 0) {
+ return -1;
+ }
+ if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
+ return -1;
+ }
+
+ *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
+ if (*hdr == NULL) {
+ talloc_free(call.reply_data.dptr);
+ return -1;
+ }
+
+ data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
+ data->dptr = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
+ if (data->dptr == NULL) {
+ talloc_free(call.reply_data.dptr);
+ talloc_free(hdr);
+ return -1;
+ }
+
+ return 0;
+}
+
+/*
get a lock on a record, and return the records data. Blocks until it gets the lock
*/
struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
@@ -660,6 +707,185 @@ again:
}
/*
+ get a readonly lock on a record, and return the records data. Blocks until it gets the lock
+ */
+struct ctdb_record_handle *
+ctdb_fetch_readonly_lock(
+ struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
+ TDB_DATA key, TDB_DATA *data,
+ int read_only)
+{
+ int ret;
+ struct ctdb_record_handle *h;
+ struct ctdb_ltdb_header *roheader = NULL;
+
+ h = talloc_zero(mem_ctx, struct ctdb_record_handle);
+ if (h == NULL) {
+ return NULL;
+ }
+
+ h->ctdb_db = ctdb_db;
+ h->key = key;
+ h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
+ if (h->key.dptr == NULL) {
+ talloc_free(h);
+ return NULL;
+ }
+ h->data = data;
+
+ data->dptr = NULL;
+ data->dsize = 0;
+
+
+again:
+ talloc_free(roheader);
+ roheader = NULL;
+
+ talloc_free(data->dptr);
+ data->dptr = NULL;
+ data->dsize = 0;
+
+ /* Lock the record/chain */
+ ret = ctdb_ltdb_lock(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ talloc_set_destructor(h, fetch_lock_destructor);
+
+ /* Check if record exists yet in the TDB */
+ ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
+ if (ret != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+ goto again;
+ }
+
+ /* if this is a request for read/write and we have delegations
+ we have to revoke all delegations first
+ */
+ if ((read_only == 0)
+ && (h->header.dmaster == ctdb_db->ctdb->pnn)
+ && (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+ goto again;
+ }
+
+ /* if we are dmaster, just return the handle */
+ if (h->header.dmaster == ctdb_db->ctdb->pnn) {
+ return h;
+ }
+
+ if (read_only != 0) {
+ TDB_DATA rodata = {NULL, 0};
+
+ if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
+ || (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+ return h;
+ }
+
+ ctdb_ltdb_unlock(ctdb_db, key);
+ ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock: failed. force migration and try again\n"));
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ goto again;
+ }
+
+ if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ goto again;
+ }
+
+ ret = ctdb_ltdb_lock(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data);
+ if (ret != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ goto again;
+ }
+
+ if (h->header.rsn >= roheader->rsn) {
+ DEBUG(DEBUG_ERR,("READONLY RECORD: Too small RSN, migrate and try again\n"));
+ ctdb_ltdb_unlock(ctdb_db, key);
+
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ goto again;
+ }
+
+ if (ctdb_ltdb_store(ctdb_db, key, roheader, rodata) != 0) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ goto again;
+ }
+ return h;
+ }
+
+ /* we are not dmaster and this was not a request for a readonly lock
+ * so unlock the record, migrate it and try again
+ */
+ ctdb_ltdb_unlock(ctdb_db, key);
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+ goto again;
+}
+
+/*
store some data to the record that was locked with ctdb_fetch_lock()
*/
int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
@@ -684,6 +910,7 @@ int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
call.call_id = CTDB_FETCH_FUNC;
call.call_data.dptr = NULL;
call.call_data.dsize = 0;
+ call.key = key;
ret = ctdb_call(ctdb_db, &call);
@@ -1694,6 +1921,27 @@ static int ctdb_fetch_func(struct ctdb_call_info *call)
}
/*
+ this is a plain fetch procedure that all databases support
+ this returns the full record including the ltdb header
+*/
+static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
+{
+ call->reply_data = talloc(call, TDB_DATA);
+ if (call->reply_data == NULL) {
+ return -1;
+ }
+ call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
+ call->reply_data->dptr = talloc_size(call->reply_data, call->reply_data->dsize);
+ if (call->reply_data->dptr == NULL) {
+ return -1;
+ }
+ memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
+ memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
+
+ return 0;
+}
+
+/*
attach to a specific database - client call
*/
struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
@@ -1762,6 +2010,7 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
/* add well known functions */
ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
+ ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
return ctdb_db;
}
@@ -1927,6 +2176,15 @@ int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, v
fprintf(f, "dmaster: %u\n", h->dmaster);
fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
+ fprintf(f, "flags: 0x%08x", h->flags);
+ if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
+ if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
+ if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
+ if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
+ if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
+ if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
+ if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
+ fprintf(f, "\n");
fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
for (i=sizeof(*h);i<data.dsize;i++) {
@@ -4272,3 +4530,112 @@ struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handl
return &h->header;
}
+
+
+struct ctdb_client_control_state *
+ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+ struct ctdb_client_control_state *handle;
+ struct ctdb_marshall_buffer *m;
+ struct ctdb_rec_data *rec;
+ TDB_DATA outdata;
+
+ m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
+ if (m == NULL) {
+ DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
+ return NULL;
+ }
+
+ m->db_id = ctdb_db->db_id;
+
+ rec = ctdb_marshall_record(m, 0, key, header, data);
+ if (rec == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
+ talloc_free(m);
+ return NULL;
+ }
+ m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
+ if (m == NULL) {
+ DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
+ talloc_free(m);
+ return NULL;
+ }
+ m->count++;
+ memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
+
+
+ outdata.dptr = (uint8_t *)m;
+ outdata.dsize = talloc_get_size(m);
+
+ handle = ctdb_control_send(ctdb, destnode, 0,
+ CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
+ mem_ctx, &timeout, NULL);
+ talloc_free(m);
+ return handle;
+}
+
+int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
+{
+ int ret;
+ int32_t res;
+
+ ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
+ if ( (ret != 0) || (res != 0) ){
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+ struct ctdb_client_control_state *state;
+
+ state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
+ return ctdb_ctrl_updaterecord_recv(ctdb, state);
+}
+
+
+
+
+
+
+/*
+ set a database to be readonly
+ */
+struct ctdb_client_control_state *
+ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
+{
+ TDB_DATA data;
+
+ data.dptr = (uint8_t *)&dbid;
+ data.dsize = sizeof(dbid);
+
+ return ctdb_control_send(ctdb, destnode, 0,
+ CTDB_CONTROL_SET_DB_READONLY, 0, data,
+ ctdb, NULL, NULL);
+}
+
+int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
+{
+ int ret;
+ int32_t res;
+
+ ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
+ if (ret != 0 || res != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed ret:%d res:%d\n", ret, res));
+ return -1;
+ }
+
+ return 0;
+}
+
+int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
+{
+ struct ctdb_client_control_state *state;
+
+ state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
+ return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
+}