diff options
author | Michael Adam <obnox@samba.org> | 2012-12-21 00:24:47 +0100 |
---|---|---|
committer | Amitay Isaacs <amitay@gmail.com> | 2013-04-24 18:47:32 +1000 |
commit | 527976d02aa004be96dff50cebdaf08eff82e936 (patch) | |
tree | 5bc4daa98f863668e40aeedfca5d7d78d1c74092 | |
parent | f49d57c21d8e44f81eadc824d48a67d634d56415 (diff) | |
download | samba-527976d02aa004be96dff50cebdaf08eff82e936.tar.gz samba-527976d02aa004be96dff50cebdaf08eff82e936.tar.xz samba-527976d02aa004be96dff50cebdaf08eff82e936.zip |
vacuum: introduce the RECEIVE_RECORDS control
This in preparation of turning the vacuming on the lmaster into
into a two phase process:
- First the node sends the list of records to be vacuumed
to all other nodes with this new RECEIVE_RECORDS control.
The remote nodes should store the lmaster's empty current copy.
- Only those records that could be stored on all other nodes
are processed further. They are send to all other nodes with
the TRY_DELETE_RECORDS control as before for deletion.
Signed-off-by: Michael Adam <obnox@samba.org>
Reviewed-By: Amitay Isaacs <amitay@gmail.com>
(This used to be ctdb commit e397702e271af38204fd99733bbeba7c1db3a999)
-rw-r--r-- | ctdb/include/ctdb_private.h | 1 | ||||
-rw-r--r-- | ctdb/include/ctdb_protocol.h | 1 | ||||
-rw-r--r-- | ctdb/server/ctdb_control.c | 3 | ||||
-rw-r--r-- | ctdb/server/ctdb_recover.c | 204 |
4 files changed, 209 insertions, 0 deletions
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h index 09f7dd93fd0..03e996bc7eb 100644 --- a/ctdb/include/ctdb_private.h +++ b/ctdb/include/ctdb_private.h @@ -1253,6 +1253,7 @@ int32_t ctdb_control_get_tunable(struct ctdb_context *ctdb, TDB_DATA indata, int32_t ctdb_control_set_tunable(struct ctdb_context *ctdb, TDB_DATA indata); int32_t ctdb_control_list_tunables(struct ctdb_context *ctdb, TDB_DATA *outdata); int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata); +int32_t ctdb_control_receive_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata); int32_t ctdb_control_add_public_address(struct ctdb_context *ctdb, TDB_DATA indata); int32_t ctdb_control_del_public_address(struct ctdb_context *ctdb, TDB_DATA indata); diff --git a/ctdb/include/ctdb_protocol.h b/ctdb/include/ctdb_protocol.h index 751fe32b16b..2d01b3c9777 100644 --- a/ctdb/include/ctdb_protocol.h +++ b/ctdb/include/ctdb_protocol.h @@ -403,6 +403,7 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS = 0, CTDB_CONTROL_SET_DB_STICKY = 133, CTDB_CONTROL_RELOAD_PUBLIC_IPS = 134, CTDB_CONTROL_TRAVERSE_ALL_EXT = 135, + CTDB_CONTROL_RECEIVE_RECORDS = 136, }; /* diff --git a/ctdb/server/ctdb_control.c b/ctdb/server/ctdb_control.c index affb9dd00d3..0d0f61c07e3 100644 --- a/ctdb/server/ctdb_control.c +++ b/ctdb/server/ctdb_control.c @@ -654,6 +654,9 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb, CHECK_CONTROL_DATA_SIZE(0); return ctdb_control_reload_public_ips(ctdb, c, async_reply); + case CTDB_CONTROL_RECEIVE_RECORDS: + return ctdb_control_receive_records(ctdb, indata, outdata); + default: DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode)); return -1; diff --git a/ctdb/server/ctdb_recover.c b/ctdb/server/ctdb_recover.c index 433a6656969..3250eaf35e0 100644 --- a/ctdb/server/ctdb_recover.c +++ b/ctdb/server/ctdb_recover.c @@ -1092,6 +1092,210 @@ int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA inda return 0; } +/** + * Store a record as part of the vacuum process: + * This is called from the RECEIVE_RECORD control which + * the lmaster uses to send the current empty copy + * to all nodes for storing, before it lets the other + * nodes delete the records in the second phase with + * the TRY_DELETE_RECORDS control. + * + * Only store if we are not lmaster or dmaster, and our + * rsn is <= the provided rsn. Use non-blocking locks. + * + * return 0 if the record was successfully stored. + * return !0 if the record still exists in the tdb after returning. + */ +static int store_tdb_record(struct ctdb_context *ctdb, + struct ctdb_db_context *ctdb_db, + struct ctdb_rec_data *rec) +{ + TDB_DATA key, data, data2; + struct ctdb_ltdb_header *hdr, *hdr2; + int ret; + + key.dsize = rec->keylen; + key.dptr = &rec->data[0]; + data.dsize = rec->datalen; + data.dptr = &rec->data[rec->keylen]; + + if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) { + DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record " + "where we are lmaster\n")); + return -1; + } + + if (data.dsize != sizeof(struct ctdb_ltdb_header)) { + DEBUG(DEBUG_ERR, (__location__ " Bad record size\n")); + return -1; + } + + hdr = (struct ctdb_ltdb_header *)data.dptr; + + /* use a non-blocking lock */ + if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) { + DEBUG(DEBUG_ERR, (__location__ " Failed to lock chain\n")); + return -1; + } + + data2 = tdb_fetch(ctdb_db->ltdb->tdb, key); + if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) { + tdb_store(ctdb_db->ltdb->tdb, key, data, 0); + DEBUG(DEBUG_INFO, (__location__ " Stored record\n")); + ret = 0; + goto done; + } + + hdr2 = (struct ctdb_ltdb_header *)data.dptr; + + if (hdr2->rsn > hdr->rsn) { + DEBUG(DEBUG_INFO, (__location__ " Skipping record with " + "rsn=%llu - called with rsn=%llu\n", + (unsigned long long)hdr2->rsn, + (unsigned long long)hdr->rsn)); + ret = -1; + goto done; + } + + /* do not allow vacuuming of records that have readonly flags set. */ + if (hdr->flags & (CTDB_REC_RO_HAVE_DELEGATIONS| + CTDB_REC_RO_HAVE_READONLY| + CTDB_REC_RO_REVOKING_READONLY| + CTDB_REC_RO_REVOKE_COMPLETE)) + { + DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly " + "flags set\n")); + ret = -1; + goto done; + } + if (hdr2->flags & (CTDB_REC_RO_HAVE_DELEGATIONS| + CTDB_REC_RO_HAVE_READONLY| + CTDB_REC_RO_REVOKING_READONLY| + CTDB_REC_RO_REVOKE_COMPLETE)) + { + DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly " + "flags set\n")); + ret = -1; + goto done; + } + + if (hdr2->dmaster == ctdb->pnn) { + DEBUG(DEBUG_INFO, (__location__ " Attempted to store record " + "where we are the dmaster\n")); + ret = -1; + goto done; + } + + if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) { + DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n")); + ret = -1; + goto done; + } + + ret = 0; + +done: + tdb_chainunlock(ctdb_db->ltdb->tdb, key); + free(data2.dptr); + return ret; +} + + + +/** + * Try to store all these records as part of the vacuuming process + * and return the records we failed to store. + */ +int32_t ctdb_control_receive_records(struct ctdb_context *ctdb, + TDB_DATA indata, TDB_DATA *outdata) +{ + struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr; + struct ctdb_db_context *ctdb_db; + int i; + struct ctdb_rec_data *rec; + struct ctdb_marshall_buffer *records; + + if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) { + DEBUG(DEBUG_ERR, + (__location__ " invalid data in receive_records\n")); + return -1; + } + + ctdb_db = find_ctdb_db(ctdb, reply->db_id); + if (!ctdb_db) { + DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n", + reply->db_id)); + return -1; + } + + DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for " + "dbid 0x%x\n", reply->count, reply->db_id)); + + /* create a blob to send back the records we could not store */ + records = (struct ctdb_marshall_buffer *) + talloc_zero_size(outdata, + offsetof(struct ctdb_marshall_buffer, data)); + if (records == NULL) { + DEBUG(DEBUG_ERR, (__location__ " Out of memory\n")); + return -1; + } + records->db_id = ctdb_db->db_id; + + rec = (struct ctdb_rec_data *)&reply->data[0]; + for (i=0; i<reply->count; i++) { + TDB_DATA key, data; + + key.dptr = &rec->data[0]; + key.dsize = rec->keylen; + data.dptr = &rec->data[key.dsize]; + data.dsize = rec->datalen; + + if (data.dsize < sizeof(struct ctdb_ltdb_header)) { + DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record " + "in indata\n")); + return -1; + } + + /* + * If we can not store the record we must add it to the reply + * so the lmaster knows it may not purge this record. + */ + if (store_tdb_record(ctdb, ctdb_db, rec) != 0) { + size_t old_size; + struct ctdb_ltdb_header *hdr; + + hdr = (struct ctdb_ltdb_header *)data.dptr; + data.dptr += sizeof(*hdr); + data.dsize -= sizeof(*hdr); + + DEBUG(DEBUG_INFO, (__location__ " Failed to store " + "record with hash 0x%08x in vacuum " + "via RECEIVE_RECORDS\n", + ctdb_hash(&key))); + + old_size = talloc_get_size(records); + records = talloc_realloc_size(outdata, records, + old_size + rec->length); + if (records == NULL) { + DEBUG(DEBUG_ERR, (__location__ " Failed to " + "expand\n")); + return -1; + } + records->count++; + memcpy(old_size+(uint8_t *)records, rec, rec->length); + } + + rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec); + } + + + outdata->dptr = (uint8_t *)records; + outdata->dsize = talloc_get_size(records); + + return 0; +} + + /* report capabilities */ |