summaryrefslogtreecommitdiffstats
path: root/ctdb/client
diff options
context:
space:
mode:
authorAmitay Isaacs <amitay@gmail.com>2013-09-23 18:30:04 +1000
committerAmitay Isaacs <amitay@gmail.com>2013-10-04 15:46:15 +1000
commitc5ec04f24ead4c5c5418fedf50d1ef77ba8620cd (patch)
tree1c7864228f07d45840e1e818784ab0be5934f2c9 /ctdb/client
parent1203e82d9b81798d4ad25f19b59c8943446e0f8c (diff)
downloadsamba-c5ec04f24ead4c5c5418fedf50d1ef77ba8620cd.tar.gz
samba-c5ec04f24ead4c5c5418fedf50d1ef77ba8620cd.tar.xz
samba-c5ec04f24ead4c5c5418fedf50d1ef77ba8620cd.zip
client: Reimplement persistent transaction code using TRANS3_COMMIT
Implementing persistent trasnaction code from Samba. Persistent transaction code was reimplemented in Samba using g_lock.tdb to hold transaction locks and using TRANS3_COMMIT control. Implementation details: 1. When starting a transaction, create a record with "transaction-<dbid>" as key and store current server_id in the structure. 2. If a record already exists, some other client has already started a transaction. Verify that the process corresponding to server_id stored in the record really exists or it's a stale record and overwrite it. 3. All modifications to the actual persistent database are stored in a marshal buffer. 4. When transaction is committed, read the sequence number of the persistent database and increment it. Sequence number record is also stored in the marshal buffer. 5. Send the changed records (marshal buffer) in TRANS3_COMMIT control to all the active nodes. 6. If all controls succeed, verify that the sequence number has been incremented. Commit is successful. If any of the controls fail, abort the transaction. 7. In case sequence number has not yet been incremented, then database recovery has been triggered. So repeat from step 5. Signed-off-by: Amitay Isaacs <amitay@gmail.com> (This used to be ctdb commit 4e0f1971792c9431d8d51dc57d54ecc9e4576dd5)
Diffstat (limited to 'ctdb/client')
-rw-r--r--ctdb/client/ctdb_client.c290
1 files changed, 290 insertions, 0 deletions
diff --git a/ctdb/client/ctdb_client.c b/ctdb/client/ctdb_client.c
index 2666ca2a31..3d9c6ed5ff 100644
--- a/ctdb/client/ctdb_client.c
+++ b/ctdb/client/ctdb_client.c
@@ -3994,6 +3994,295 @@ static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
return true;
}
+
+struct ctdb_transaction_handle {
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_db_context *g_lock_db;
+ char *lock_name;
+ uint32_t reqid;
+ /*
+ * we store reads and writes done under a transaction:
+ * - one list stores both reads and writes (m_all)
+ * - the other just writes (m_write)
+ */
+ struct ctdb_marshall_buffer *m_all;
+ struct ctdb_marshall_buffer *m_write;
+};
+
+static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
+{
+ g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
+ ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
+ return 0;
+}
+
+
+/**
+ * start a transaction on a database
+ */
+struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
+ TALLOC_CTX *mem_ctx)
+{
+ struct ctdb_transaction_handle *h;
+ struct ctdb_server_id id;
+
+ h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
+ if (h == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
+ return NULL;
+ }
+
+ h->ctdb_db = ctdb_db;
+ h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
+ (unsigned int)ctdb_db->db_id);
+ if (h->lock_name == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
+ "g_lock.tdb", false, 0);
+ if (!h->g_lock_db) {
+ DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ id.type = SERVER_TYPE_SAMBA;
+ id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
+ id.server_id = getpid();
+
+ if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
+ &id) != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
+
+ if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
+ DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ talloc_set_destructor(h, ctdb_transaction_destructor);
+ return h;
+}
+
+/**
+ * fetch a record inside a transaction
+ */
+int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key, TDB_DATA *data)
+{
+ struct ctdb_ltdb_header header;
+ int ret;
+
+ ZERO_STRUCT(header);
+
+ ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
+ if (ret == -1 && header.dmaster == (uint32_t)-1) {
+ /* record doesn't exist yet */
+ *data = tdb_null;
+ ret = 0;
+ }
+
+ if (ret != 0) {
+ return ret;
+ }
+
+ h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
+ if (h->m_all == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/**
+ * stores a record inside a transaction
+ */
+int ctdb_transaction_store(struct ctdb_transaction_handle *h,
+ TDB_DATA key, TDB_DATA data)
+{
+ TALLOC_CTX *tmp_ctx = talloc_new(h);
+ struct ctdb_ltdb_header header;
+ TDB_DATA olddata;
+ int ret;
+
+ /* we need the header so we can update the RSN */
+ ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
+ if (ret == -1 && header.dmaster == (uint32_t)-1) {
+ /* the record doesn't exist - create one with us as dmaster.
+ This is only safe because we are in a transaction and this
+ is a persistent database */
+ ZERO_STRUCT(header);
+ } else if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
+ talloc_free(tmp_ctx);
+ return ret;
+ }
+
+ if (data.dsize == olddata.dsize &&
+ memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
+ header.rsn != 0) {
+ /* save writing the same data */
+ talloc_free(tmp_ctx);
+ return 0;
+ }
+
+ header.dmaster = h->ctdb_db->ctdb->pnn;
+ header.rsn++;
+
+ h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
+ if (h->m_all == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
+ if (h->m_write == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ talloc_free(tmp_ctx);
+ return 0;
+}
+
+static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
+{
+ const char *keyname = CTDB_DB_SEQNUM_KEY;
+ TDB_DATA key, data;
+ struct ctdb_ltdb_header header;
+ int ret;
+
+ key.dptr = (uint8_t *)discard_const(keyname);
+ key.dsize = strlen(keyname) + 1;
+
+ ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
+ if (ret != 0) {
+ *seqnum = 0;
+ return 0;
+ }
+
+ if (data.dsize != sizeof(*seqnum)) {
+ DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
+ data.dsize));
+ talloc_free(data.dptr);
+ return -1;
+ }
+
+ *seqnum = *(uint64_t *)data.dptr;
+ talloc_free(data.dptr);
+
+ return 0;
+}
+
+
+static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
+ uint64_t seqnum)
+{
+ const char *keyname = CTDB_DB_SEQNUM_KEY;
+ TDB_DATA key, data;
+
+ key.dptr = (uint8_t *)discard_const(keyname);
+ key.dsize = strlen(keyname) + 1;
+
+ data.dptr = (uint8_t *)&seqnum;
+ data.dsize = sizeof(seqnum);
+
+ return ctdb_transaction_store(h, key, data);
+}
+
+
+/**
+ * commit a transaction
+ */
+int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
+{
+ int ret;
+ uint64_t old_seqnum, new_seqnum;
+ int32_t status;
+ struct timeval timeout;
+
+ if (h->m_write == NULL) {
+ /* no changes were made */
+ talloc_free(h);
+ return 0;
+ }
+
+ ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
+ ret = -1;
+ goto done;
+ }
+
+ new_seqnum = old_seqnum + 1;
+ ret = ctdb_store_db_seqnum(h, new_seqnum);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
+ ret = -1;
+ goto done;
+ }
+
+again:
+ timeout = timeval_current_ofs(3,0);
+ ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
+ h->ctdb_db->db_id,
+ CTDB_CONTROL_TRANS3_COMMIT, 0,
+ ctdb_marshall_finish(h->m_write), NULL, NULL,
+ &status, &timeout, NULL);
+ if (ret != 0 || status != 0) {
+ /*
+ * TRANS3_COMMIT control will only fail if recovery has been
+ * triggered. Check if the database has been updated or not.
+ */
+ ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
+ goto done;
+ }
+
+ if (new_seqnum == old_seqnum) {
+ /* Database not yet updated, try again */
+ goto again;
+ }
+
+ if (new_seqnum != (old_seqnum + 1)) {
+ DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
+ (long long unsigned)new_seqnum,
+ (long long unsigned)old_seqnum));
+ ret = -1;
+ goto done;
+ }
+ }
+
+ ret = 0;
+
+done:
+ talloc_free(h);
+ return ret;
+}
+
+/**
+ * cancel a transaction
+ */
+int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
+{
+ talloc_free(h);
+ return 0;
+}
+
+#if 0
/**
* check whether a transaction is active on a given db on a given node
*/
@@ -4437,6 +4726,7 @@ again:
talloc_free(h);
return 0;
}
+#endif
/*
recovery daemon ping to main daemon