diff options
Diffstat (limited to 'ctdb/client/ctdb_client.c')
-rw-r--r-- | ctdb/client/ctdb_client.c | 375 |
1 files changed, 371 insertions, 4 deletions
diff --git a/ctdb/client/ctdb_client.c b/ctdb/client/ctdb_client.c index 0828e989d2..89eeb4836a 100644 --- a/ctdb/client/ctdb_client.c +++ b/ctdb/client/ctdb_client.c @@ -72,7 +72,7 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb, */ int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx, - TDB_DATA *data) + TDB_DATA *data, bool updatetdb) { struct ctdb_call_info *c; struct ctdb_registered_call *fn; @@ -89,6 +89,7 @@ int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, c->new_data = NULL; c->reply_data = NULL; c->status = 0; + c->header = header; for (fn=ctdb_db->calls;fn;fn=fn->next) { if (fn->id == call->call_id) break; @@ -110,7 +111,7 @@ int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call, c->new_data = &c->record_data; } - if (c->new_data) { + if (c->new_data && updatetdb) { /* XXX check that we always have the lock here? */ if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) { ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n"); @@ -345,7 +346,7 @@ int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call) call->status = state->call->status; talloc_free(state); - return 0; + return call->status; } @@ -386,7 +387,7 @@ static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db *(state->call) = *call; state->ctdb_db = ctdb_db; - ret = ctdb_call_local(ctdb_db, state->call, header, state, data); + ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true); return state; } @@ -421,6 +422,10 @@ struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data); + if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) { + ret = -1; + } + if (ret == 0 && header.dmaster == ctdb->pnn) { state = ctdb_client_call_local_send(ctdb_db, call, &header, &data); talloc_free(data.dptr); @@ -584,6 +589,48 @@ static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA } /* + try to fetch a readonly copy of a record + */ +static int +ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data) +{ + int ret; + + struct ctdb_call call; + ZERO_STRUCT(call); + + call.call_id = CTDB_FETCH_WITH_HEADER_FUNC; + call.call_data.dptr = NULL; + call.call_data.dsize = 0; + call.key = key; + call.flags = CTDB_WANT_READONLY; + ret = ctdb_call(ctdb_db, &call); + + if (ret != 0) { + return -1; + } + if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) { + return -1; + } + + *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header)); + if (*hdr == NULL) { + talloc_free(call.reply_data.dptr); + return -1; + } + + data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header); + data->dptr = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize); + if (data->dptr == NULL) { + talloc_free(call.reply_data.dptr); + talloc_free(hdr); + return -1; + } + + return 0; +} + +/* get a lock on a record, and return the records data. Blocks until it gets the lock */ struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, @@ -660,6 +707,185 @@ again: } /* + get a readonly lock on a record, and return the records data. Blocks until it gets the lock + */ +struct ctdb_record_handle * +ctdb_fetch_readonly_lock( + struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, + TDB_DATA key, TDB_DATA *data, + int read_only) +{ + int ret; + struct ctdb_record_handle *h; + struct ctdb_ltdb_header *roheader = NULL; + + h = talloc_zero(mem_ctx, struct ctdb_record_handle); + if (h == NULL) { + return NULL; + } + + h->ctdb_db = ctdb_db; + h->key = key; + h->key.dptr = talloc_memdup(h, key.dptr, key.dsize); + if (h->key.dptr == NULL) { + talloc_free(h); + return NULL; + } + h->data = data; + + data->dptr = NULL; + data->dsize = 0; + + +again: + talloc_free(roheader); + roheader = NULL; + + talloc_free(data->dptr); + data->dptr = NULL; + data->dsize = 0; + + /* Lock the record/chain */ + ret = ctdb_ltdb_lock(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n")); + talloc_free(h); + return NULL; + } + + talloc_set_destructor(h, fetch_lock_destructor); + + /* Check if record exists yet in the TDB */ + ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + goto again; + } + + /* if this is a request for read/write and we have delegations + we have to revoke all delegations first + */ + if ((read_only == 0) + && (h->header.dmaster == ctdb_db->ctdb->pnn) + && (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) { + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + goto again; + } + + /* if we are dmaster, just return the handle */ + if (h->header.dmaster == ctdb_db->ctdb->pnn) { + return h; + } + + if (read_only != 0) { + TDB_DATA rodata = {NULL, 0}; + + if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY) + || (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) { + return h; + } + + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata); + if (ret != 0) { + DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock: failed. force migration and try again\n")); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + + goto again; + } + + if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) { + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + + goto again; + } + + ret = ctdb_ltdb_lock(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n")); + talloc_free(h); + return NULL; + } + + ret = ctdb_ltdb_fetch_readonly(ctdb_db, key, &h->header, h, data); + if (ret != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + + goto again; + } + + if (h->header.rsn >= roheader->rsn) { + DEBUG(DEBUG_ERR,("READONLY RECORD: Too small RSN, migrate and try again\n")); + ctdb_ltdb_unlock(ctdb_db, key); + + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + + goto again; + } + + if (ctdb_ltdb_store(ctdb_db, key, roheader, rodata) != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + + goto again; + } + return h; + } + + /* we are not dmaster and this was not a request for a readonly lock + * so unlock the record, migrate it and try again + */ + ctdb_ltdb_unlock(ctdb_db, key); + ret = ctdb_client_force_migration(ctdb_db, key); + if (ret != 0) { + DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n")); + talloc_free(h); + return NULL; + } + goto again; +} + +/* store some data to the record that was locked with ctdb_fetch_lock() */ int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data) @@ -684,6 +910,7 @@ int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, call.call_id = CTDB_FETCH_FUNC; call.call_data.dptr = NULL; call.call_data.dsize = 0; + call.key = key; ret = ctdb_call(ctdb_db, &call); @@ -1694,6 +1921,27 @@ static int ctdb_fetch_func(struct ctdb_call_info *call) } /* + this is a plain fetch procedure that all databases support + this returns the full record including the ltdb header +*/ +static int ctdb_fetch_with_header_func(struct ctdb_call_info *call) +{ + call->reply_data = talloc(call, TDB_DATA); + if (call->reply_data == NULL) { + return -1; + } + call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize; + call->reply_data->dptr = talloc_size(call->reply_data, call->reply_data->dsize); + if (call->reply_data->dptr == NULL) { + return -1; + } + memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header)); + memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize); + + return 0; +} + +/* attach to a specific database - client call */ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, @@ -1762,6 +2010,7 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, /* add well known functions */ ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC); ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC); + ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC); return ctdb_db; } @@ -1927,6 +2176,15 @@ int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, v fprintf(f, "dmaster: %u\n", h->dmaster); fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn); + fprintf(f, "flags: 0x%08x", h->flags); + if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA"); + if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED"); + if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC"); + if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS"); + if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY"); + if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY"); + if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE"); + fprintf(f, "\n"); fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h))); for (i=sizeof(*h);i<data.dsize;i++) { @@ -4272,3 +4530,112 @@ struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handl return &h->header; } + + +struct ctdb_client_control_state * +ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data) +{ + struct ctdb_client_control_state *handle; + struct ctdb_marshall_buffer *m; + struct ctdb_rec_data *rec; + TDB_DATA outdata; + + m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer); + if (m == NULL) { + DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n")); + return NULL; + } + + m->db_id = ctdb_db->db_id; + + rec = ctdb_marshall_record(m, 0, key, header, data); + if (rec == NULL) { + DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n")); + talloc_free(m); + return NULL; + } + m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data)); + if (m == NULL) { + DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n")); + talloc_free(m); + return NULL; + } + m->count++; + memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length); + + + outdata.dptr = (uint8_t *)m; + outdata.dsize = talloc_get_size(m); + + handle = ctdb_control_send(ctdb, destnode, 0, + CTDB_CONTROL_UPDATE_RECORD, 0, outdata, + mem_ctx, &timeout, NULL); + talloc_free(m); + return handle; +} + +int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state) +{ + int ret; + int32_t res; + + ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL); + if ( (ret != 0) || (res != 0) ){ + DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n")); + return -1; + } + + return 0; +} + +int +ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data) +{ + struct ctdb_client_control_state *state; + + state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data); + return ctdb_ctrl_updaterecord_recv(ctdb, state); +} + + + + + + +/* + set a database to be readonly + */ +struct ctdb_client_control_state * +ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid) +{ + TDB_DATA data; + + data.dptr = (uint8_t *)&dbid; + data.dsize = sizeof(dbid); + + return ctdb_control_send(ctdb, destnode, 0, + CTDB_CONTROL_SET_DB_READONLY, 0, data, + ctdb, NULL, NULL); +} + +int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state) +{ + int ret; + int32_t res; + + ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL); + if (ret != 0 || res != 0) { + DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed ret:%d res:%d\n", ret, res)); + return -1; + } + + return 0; +} + +int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid) +{ + struct ctdb_client_control_state *state; + + state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid); + return ctdb_ctrl_set_db_readonly_recv(ctdb, state); +} |