summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Adam <obnox@samba.org>2012-12-21 00:24:47 +0100
committerAmitay Isaacs <amitay@gmail.com>2013-04-24 18:47:32 +1000
commit527976d02aa004be96dff50cebdaf08eff82e936 (patch)
tree5bc4daa98f863668e40aeedfca5d7d78d1c74092
parentf49d57c21d8e44f81eadc824d48a67d634d56415 (diff)
downloadsamba-527976d02aa004be96dff50cebdaf08eff82e936.tar.gz
samba-527976d02aa004be96dff50cebdaf08eff82e936.tar.xz
samba-527976d02aa004be96dff50cebdaf08eff82e936.zip
vacuum: introduce the RECEIVE_RECORDS control
This is in preparation for turning the vacuuming on the lmaster into a two-phase process: - First the node sends the list of records to be vacuumed to all other nodes with this new RECEIVE_RECORDS control. The remote nodes should store the lmaster's empty current copy. - Only those records that could be stored on all other nodes are processed further. They are sent to all other nodes with the TRY_DELETE_RECORDS control as before for deletion. Signed-off-by: Michael Adam <obnox@samba.org> Reviewed-By: Amitay Isaacs <amitay@gmail.com> (This used to be ctdb commit e397702e271af38204fd99733bbeba7c1db3a999)
-rw-r--r--ctdb/include/ctdb_private.h1
-rw-r--r--ctdb/include/ctdb_protocol.h1
-rw-r--r--ctdb/server/ctdb_control.c3
-rw-r--r--ctdb/server/ctdb_recover.c204
4 files changed, 209 insertions, 0 deletions
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index 09f7dd93fd0..03e996bc7eb 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -1253,6 +1253,7 @@ int32_t ctdb_control_get_tunable(struct ctdb_context *ctdb, TDB_DATA indata,
int32_t ctdb_control_set_tunable(struct ctdb_context *ctdb, TDB_DATA indata);
int32_t ctdb_control_list_tunables(struct ctdb_context *ctdb, TDB_DATA *outdata);
int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
+int32_t ctdb_control_receive_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
int32_t ctdb_control_add_public_address(struct ctdb_context *ctdb, TDB_DATA indata);
int32_t ctdb_control_del_public_address(struct ctdb_context *ctdb, TDB_DATA indata);
diff --git a/ctdb/include/ctdb_protocol.h b/ctdb/include/ctdb_protocol.h
index 751fe32b16b..2d01b3c9777 100644
--- a/ctdb/include/ctdb_protocol.h
+++ b/ctdb/include/ctdb_protocol.h
@@ -403,6 +403,7 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS = 0,
CTDB_CONTROL_SET_DB_STICKY = 133,
CTDB_CONTROL_RELOAD_PUBLIC_IPS = 134,
CTDB_CONTROL_TRAVERSE_ALL_EXT = 135,
+ CTDB_CONTROL_RECEIVE_RECORDS = 136,
};
/*
diff --git a/ctdb/server/ctdb_control.c b/ctdb/server/ctdb_control.c
index affb9dd00d3..0d0f61c07e3 100644
--- a/ctdb/server/ctdb_control.c
+++ b/ctdb/server/ctdb_control.c
@@ -654,6 +654,9 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
CHECK_CONTROL_DATA_SIZE(0);
return ctdb_control_reload_public_ips(ctdb, c, async_reply);
+ case CTDB_CONTROL_RECEIVE_RECORDS:
+ return ctdb_control_receive_records(ctdb, indata, outdata);
+
default:
DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode));
return -1;
diff --git a/ctdb/server/ctdb_recover.c b/ctdb/server/ctdb_recover.c
index 433a6656969..3250eaf35e0 100644
--- a/ctdb/server/ctdb_recover.c
+++ b/ctdb/server/ctdb_recover.c
@@ -1092,6 +1092,210 @@ int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA inda
return 0;
}
+/**
+ * Store a record as part of the vacuum process:
+ * This is called from the RECEIVE_RECORDS control which
+ * the lmaster uses to send the current empty copy
+ * to all nodes for storing, before it lets the other
+ * nodes delete the records in the second phase with
+ * the TRY_DELETE_RECORDS control.
+ *
+ * Only store if we are not lmaster or dmaster, and our
+ * rsn is <= the provided rsn. Use non-blocking locks.
+ * Records with any readonly flag set, in either the
+ * received or the locally stored copy, are refused.
+ *
+ * ctdb:    the ctdb context; ctdb->pnn is this node
+ * ctdb_db: the database whose local tdb is updated
+ * rec:     marshalled record: key followed by the data,
+ *          where the data must be exactly one ltdb header
+ *
+ * return 0 if the record was successfully stored.
+ * return -1 if the record was not stored.
+ */
+static int store_tdb_record(struct ctdb_context *ctdb,
+			    struct ctdb_db_context *ctdb_db,
+			    struct ctdb_rec_data *rec)
+{
+	TDB_DATA key, data, data2;
+	struct ctdb_ltdb_header *hdr, *hdr2;
+	int ret = -1;
+
+	/* the marshalled record carries the key followed by the data */
+	key.dsize = rec->keylen;
+	key.dptr = &rec->data[0];
+	data.dsize = rec->datalen;
+	data.dptr = &rec->data[rec->keylen];
+
+	if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
+		DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
+				   "where we are lmaster\n"));
+		return -1;
+	}
+
+	/* only empty records (bare ltdb header, no payload) are accepted */
+	if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+		DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
+		return -1;
+	}
+
+	hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+	/* use a non-blocking lock */
+	if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
+		DEBUG(DEBUG_ERR, (__location__ " Failed to lock chain\n"));
+		return -1;
+	}
+
+	data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
+	if (data2.dptr == NULL ||
+	    data2.dsize < sizeof(struct ctdb_ltdb_header)) {
+		/* no usable local copy to compare against: just store */
+		goto store;
+	}
+
+	/* header of OUR stored copy; must come from data2, not from data */
+	hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
+
+	if (hdr2->rsn > hdr->rsn) {
+		DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
+				   "rsn=%llu - called with rsn=%llu\n",
+				   (unsigned long long)hdr2->rsn,
+				   (unsigned long long)hdr->rsn));
+		goto done;
+	}
+
+	/*
+	 * do not allow vacuuming of records that have readonly flags set,
+	 * neither in the copy we were sent nor in the copy we hold.
+	 */
+	if ((hdr->flags | hdr2->flags) & (CTDB_REC_RO_HAVE_DELEGATIONS|
+					  CTDB_REC_RO_HAVE_READONLY|
+					  CTDB_REC_RO_REVOKING_READONLY|
+					  CTDB_REC_RO_REVOKE_COMPLETE))
+	{
+		DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly "
+				  "flags set\n"));
+		goto done;
+	}
+
+	if (hdr2->dmaster == ctdb->pnn) {
+		DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
+				   "where we are the dmaster\n"));
+		goto done;
+	}
+
+store:
+	if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
+		DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
+		goto done;
+	}
+
+	DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
+	ret = 0;
+
+done:
+	tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+	free(data2.dptr);
+	return ret;
+}
+
+
+
+/**
+ * Try to store all these records as part of the vacuuming process
+ * and return the records we failed to store.
+ *
+ * indata and outdata are both marshall buffers; returns 0 on success and
+ * -1 on malformed input, unknown database or allocation failure.
+ */
+int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
+				     TDB_DATA indata, TDB_DATA *outdata)
+{
+	struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
+	struct ctdb_db_context *ctdb_db;
+	uint32_t i;
+	struct ctdb_rec_data *rec;
+	struct ctdb_marshall_buffer *records;
+	const size_t rec_hdr_len = offsetof(struct ctdb_rec_data, data);
+
+	if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
+		DEBUG(DEBUG_ERR, (__location__ " invalid data in receive_records\n"));
+		return -1;
+	}
+
+	ctdb_db = find_ctdb_db(ctdb, reply->db_id);
+	if (!ctdb_db) {
+		DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n", reply->db_id));
+		return -1;
+	}
+
+	DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
+			    "dbid 0x%x\n", reply->count, reply->db_id));
+
+	/* create a blob to send back the records we could not store */
+	records = (struct ctdb_marshall_buffer *)talloc_zero_size(outdata,
+				offsetof(struct ctdb_marshall_buffer, data));
+	if (records == NULL) {
+		DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
+		return -1;
+	}
+	records->db_id = ctdb_db->db_id;
+
+	rec = (struct ctdb_rec_data *)&reply->data[0];
+	for (i = 0; i < reply->count; i++) {
+		TDB_DATA key;
+		size_t remaining = indata.dsize -
+				   (size_t)((uint8_t *)rec - indata.dptr);
+
+		/* record must fit inside indata and hold an ltdb header */
+		if (remaining < rec_hdr_len ||
+		    rec->length < rec_hdr_len ||
+		    rec->length > remaining ||
+		    rec->keylen > rec->length - rec_hdr_len ||
+		    rec->datalen > rec->length - rec_hdr_len - rec->keylen ||
+		    rec->datalen < sizeof(struct ctdb_ltdb_header)) {
+			DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record "
+					   "in indata\n"));
+			return -1;
+		}
+
+		key.dptr = &rec->data[0];
+		key.dsize = rec->keylen;
+
+		/*
+		 * If we can not store the record we must add it to the reply
+		 * so the lmaster knows it may not purge this record.
+		 */
+		if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
+			size_t old_size = talloc_get_size(records);
+
+			DEBUG(DEBUG_INFO, (__location__ " Failed to store "
+					   "record with hash 0x%08x in vacuum "
+					   "via RECEIVE_RECORDS\n",
+					   ctdb_hash(&key)));
+
+			records = talloc_realloc_size(outdata, records,
+						      old_size + rec->length);
+			if (records == NULL) {
+				DEBUG(DEBUG_ERR, (__location__ " Failed to "
+						  "expand\n"));
+				return -1;
+			}
+			records->count++;
+			memcpy(old_size+(uint8_t *)records, rec, rec->length);
+		}
+
+		rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
+	}
+
+	outdata->dptr = (uint8_t *)records;
+	outdata->dsize = talloc_get_size(records);
+
+	return 0;
+}
+
+
/*
report capabilities
*/