diff options
Diffstat (limited to 'ldap/servers/plugins/replication/repl5_replica.c')
-rw-r--r-- | ldap/servers/plugins/replication/repl5_replica.c | 3387 |
1 files changed, 3387 insertions, 0 deletions
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c new file mode 100644 index 00000000..5bc3e8ee --- /dev/null +++ b/ldap/servers/plugins/replication/repl5_replica.c @@ -0,0 +1,3387 @@ +/** BEGIN COPYRIGHT BLOCK + * Copyright 2001 Sun Microsystems, Inc. + * Portions copyright 1999, 2001-2003 Netscape Communications Corporation. + * All rights reserved. + * END COPYRIGHT BLOCK **/ +/* repl5_replica.c */ + +#include "slapi-plugin.h" +#include "repl.h" /* ONREPL - this is bad */ +#include "repl5.h" +#include "repl_shared.h" +#include "csnpl.h" +#include "cl5_api.h" + +/* from proto-slap.h */ +int g_get_shutdown(); + +#define RUV_SAVE_INTERVAL (30 * 1000) /* 30 seconds */ +#define START_UPDATE_DELAY 2 /* 2 second */ +#define START_REAP_DELAY 3600 /* 1 hour */ + +#define REPLICA_RDN "cn=replica" +#define CHANGELOG_RDN "cn=legacy changelog" + +/* + * A replica is a locally-held copy of a portion of the DIT. + */ +struct replica { + Slapi_DN *repl_root; /* top of the replicated area */ + char *repl_name; /* unique replica name */ + PRBool new_name; /* new name was generated - need to be saved */ + ReplicaUpdateDNList updatedn_list; /* list of dns with which a supplier should bind + to update this replica */ + ReplicaType repl_type; /* is this replica read-only ? */ + PRBool legacy_consumer; /* if true, this replica is supplied by 4.0 consumer */ + char* legacy_purl; /* partial url of the legacy supplier */ + ReplicaId repl_rid; /* replicaID */ + Object *repl_ruv; /* replica update vector */ + PRBool repl_ruv_dirty; /* Dirty flag for ruv */ + CSNPL *min_csn_pl; /* Pending list for minimal CSN */ + void *csn_pl_reg_id; /* registration assignment for csn callbacks */ + unsigned long repl_state_flags; /* state flags */ + PRUint32 repl_flags; /* persistent, externally visible flags */ + PRLock *repl_lock; /* protects entire structure */ + Slapi_Eq_Context repl_eqcxt_rs; /* context to cancel event that saves ruv */ + Slapi_Eq_Context repl_eqcxt_tr; /* context to cancel event that reaps tombstones */ + Object *repl_csngen; /* CSN generator for this replica */ + PRBool repl_csn_assigned; /* Flag set when new csn is assigned. */ + PRUint32 repl_purge_delay; /* When purgeable, CSNs are held on to for this many extra seconds */ + PRBool tombstone_reap_stop; /* TRUE when the tombstone reaper should stop */ + PRBool tombstone_reap_active; /* TRUE when the tombstone reaper is running */ + long tombstone_reap_interval; /* Time in seconds between tombstone reaping */ + Slapi_ValueSet *repl_referral; /* A list of administrator provided referral URLs */ + PRBool state_update_inprogress; /* replica state is being updated */ + PRLock *agmt_lock; /* protects agreement creation, start and stop */ + char *locking_purl; /* supplier who has exclusive access */ +}; + + +typedef struct reap_callback_data +{ + int rc; + unsigned long num_entries; + unsigned long num_purged_entries; + CSN *purge_csn; + PRBool *tombstone_reap_stop; +} reap_callback_data; + + +/* Forward declarations of helper functions*/ +static Slapi_Entry* _replica_get_config_entry (const Slapi_DN *root); +static int _replica_check_validity (const Replica *r); +static int _replica_init_from_config (Replica *r, Slapi_Entry *e, char *errortext); +static int _replica_update_entry (Replica *r, Slapi_Entry *e, char *errortext); +static int _replica_configure_ruv (Replica *r, PRBool isLocked); +static void _replica_update_state (time_t when, void *arg); +static char * _replica_get_config_dn (const Slapi_DN *root); +static char * _replica_type_as_string (const Replica *r); +static int replica_create_ruv_tombstone(Replica *r); +static void assign_csn_callback(const CSN *csn, void *data); +static void abort_csn_callback(const CSN *csn, void *data); +static void eq_cb_reap_tombstones(time_t when, void *arg); +static CSN *_replica_get_purge_csn_nolock (const Replica *r); +static void replica_get_referrals_nolock (const Replica *r, char ***referrals); +static void replica_clear_legacy_referrals (const Slapi_DN *repl_root_sdn, char **referrals, const char *state); +static void replica_remove_legacy_attr (const Slapi_DN *repl_root_sdn, const char *attr); +static int replica_log_ruv_elements_nolock (const Replica *r); +static void replica_replace_ruv_tombstone(Replica *r); +static void start_agreements_for_replica (Replica *r, PRBool start); + +/* Allocates new replica and reads its state and state of its component from + * various parts of the DIT. + */ +Replica * +replica_new(const Slapi_DN *root) +{ + Replica *r = NULL; + Slapi_Entry *e = NULL; + char errorbuf[BUFSIZ]; + char ebuf[BUFSIZ]; + + PR_ASSERT (root); + + /* check if there is a replica associated with the tree */ + e = _replica_get_config_entry (root); + if (e) + { + errorbuf[0] = '\0'; + r = replica_new_from_entry(e, errorbuf, + PR_FALSE /* not a newly added entry */); + + if (NULL == r) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Unable to " + "configure replica %s: %s\n", + escape_string(slapi_sdn_get_dn(root), ebuf), + errorbuf); + } + + slapi_entry_free (e); + } + + return r; +} + +/* constructs the replica object from the newly added entry */ +Replica * +replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation) +{ + int rc = 0; + Replica *r; + RUV *ruv; + char *repl_name = NULL; + + if (e == NULL) + { + if (NULL != errortext) + { + sprintf (errortext, "NULL entry"); + } + return NULL; + } + + r = (Replica *)slapi_ch_calloc(1, sizeof(Replica)); + + if ((r->repl_lock = PR_NewLock()) == NULL) + { + if (NULL != errortext) + { + sprintf (errortext, "failed to create replica lock"); + } + rc = -1; + goto done; + } + + if ((r->agmt_lock = PR_NewLock()) == NULL) + { + if (NULL != errortext) + { + sprintf (errortext, "failed to create replica lock"); + } + rc = -1; + goto done; + } + + /* read parameters from the replica config entry */ + rc = _replica_init_from_config (r, e, errortext); + if (rc != 0) + { + goto done; + } + + /* configure ruv */ + rc = _replica_configure_ruv (r, PR_FALSE); + if (rc != 0) + { + goto done; + } + + /* If smallest csn exists in RUV for our local replica, it's ok to begin iteration */ + ruv = (RUV*) object_get_data (r->repl_ruv); + PR_ASSERT (ruv); + + if (is_add_operation) + { + /* + * This is called by an ldap add operation. + * Update the entry to contain information generated + * during replica initialization + */ + rc = _replica_update_entry (r, e, errortext); + } + else + { + /* + * Entry is already in dse.ldif - update it on the disk + * (done by the update state event scheduled below) + */ + } + if (rc != 0) + goto done; + + /* ONREPL - the state update can occur before the entry is added to the DIT. + In that case the updated would fail but nothing bad would happen. The next + scheduled update would save the state */ + repl_name = slapi_ch_strdup (r->repl_name); + r->repl_eqcxt_rs = slapi_eq_repeat(_replica_update_state, repl_name, + current_time () + START_UPDATE_DELAY, RUV_SAVE_INTERVAL); + + if (r->tombstone_reap_interval > 0) + { + /* + * Reap Tombstone should be started some time after the plugin started. + * This will allow the server to fully start before consuming resources. + */ + repl_name = slapi_ch_strdup (r->repl_name); + r->repl_eqcxt_tr = slapi_eq_repeat(eq_cb_reap_tombstones, repl_name, current_time() + START_REAP_DELAY, 1000 * r->tombstone_reap_interval); + } + + if (r->legacy_consumer) + { + char ebuf[BUFSIZ]; + + legacy_consumer_init_referrals (r); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "replica_new_from_entry: " + "replica for %s was configured as legacy consumer\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + } + +done: + if (rc != 0 && r) + { + replica_destroy ((void**)&r); + } + + return r; +} + + +void +replica_flush(Replica *r) +{ + PR_ASSERT(NULL != r); + if (NULL != r) + { + PR_Lock(r->repl_lock); + /* Make sure we dump the CSNGen state */ + r->repl_csn_assigned = PR_TRUE; + PR_Unlock(r->repl_lock); + /* This function take the Lock Inside */ + /* And also write the RUV */ + _replica_update_state((time_t)0, r->repl_name); + } +} + + +/* + * Deallocate a replica. arg should point to the address of a + * pointer that points to a replica structure. + */ +void +replica_destroy(void **arg) +{ + Replica *r; + void *repl_name; + + if (arg == NULL) + return; + + r = *((Replica **)arg); + + PR_ASSERT(r); + + slapi_log_error (SLAPI_LOG_REPL, NULL, "replica_destroy\n"); + + /* + * The function will not be called unless the refcnt of its + * wrapper object is 0. Hopefully this refcnt could sync up + * this destruction and the events such as tombstone reap + * and ruv updates. + */ + + if (r->repl_eqcxt_rs) + { + repl_name = slapi_eq_get_arg (r->repl_eqcxt_rs); + slapi_ch_free (&repl_name); + slapi_eq_cancel(r->repl_eqcxt_rs); + r->repl_eqcxt_rs = NULL; + } + + if (r->repl_eqcxt_tr) + { + repl_name = slapi_eq_get_arg (r->repl_eqcxt_tr); + slapi_ch_free (&repl_name); + slapi_eq_cancel(r->repl_eqcxt_tr); + r->repl_eqcxt_tr = NULL; + } + + if (r->repl_root) + { + slapi_sdn_free(&r->repl_root); + } + + slapi_ch_free_string(&r->locking_purl); + + if (r->updatedn_list) + { + replica_updatedn_list_free(r->updatedn_list); + r->updatedn_list = NULL; + } + + /* slapi_ch_free accepts NULL pointer */ + slapi_ch_free ((void**)&r->repl_name); + slapi_ch_free ((void**)&r->legacy_purl); + + if (r->repl_lock) + { + PR_DestroyLock(r->repl_lock); + r->repl_lock = NULL; + } + + if (r->agmt_lock) + { + PR_DestroyLock(r->agmt_lock); + r->agmt_lock = NULL; + } + + if(NULL != r->repl_ruv) + { + object_release(r->repl_ruv); + } + + if(NULL != r->repl_csngen) + { + if (r->csn_pl_reg_id) + { + csngen_unregister_callbacks((CSNGen *)object_get_data (r->repl_csngen), r->csn_pl_reg_id); + } + object_release(r->repl_csngen); + } + + if (NULL != r->repl_referral) + { + slapi_valueset_free(r->repl_referral); + } + + if (NULL != r->min_csn_pl) + { + csnplFree(&r->min_csn_pl);; + } + + slapi_ch_free((void **)arg); +} + +/* + * Attempt to obtain exclusive access to replica (advisory only) + * + * Returns PR_TRUE if exclusive access was granted, + * PR_FALSE otherwise + * The parameter isInc tells whether or not the replica is being + * locked for an incremental update session - if the replica is + * successfully locked, this value is used - if the replica is already + * in use, this value will be set to TRUE or FALSE, depending on what + * type of update session has the replica in use currently + * locking_purl is the supplier who is attempting to acquire access + * current_purl is the supplier who already has access, if any + */ +PRBool +replica_get_exclusive_access(Replica *r, PRBool *isInc, int connid, int opid, + const char *locking_purl, + char **current_purl) +{ + char ebuf[BUFSIZ]; + PRBool rval = PR_TRUE; + + PR_ASSERT(r); + + PR_Lock(r->repl_lock); + if (r->repl_state_flags & REPLICA_IN_USE) + { + if (isInc) + *isInc = (r->repl_state_flags & REPLICA_INCREMENTAL_IN_PROGRESS); + + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "conn=%d op=%d repl=\"%s\": " + "Replica in use locking_purl=%s\n", + connid, opid, + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf), + r->locking_purl ? r->locking_purl : "unknown"); + rval = PR_FALSE; + if (current_purl) + { + *current_purl = slapi_ch_strdup(r->locking_purl); + } + } + else + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "conn=%d op=%d repl=\"%s\": Acquired replica\n", + connid, opid, + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + r->repl_state_flags |= REPLICA_IN_USE; + if (isInc && *isInc) + { + r->repl_state_flags |= REPLICA_INCREMENTAL_IN_PROGRESS; + } + else + { + /* if connid or opid != 0, it's a total update */ + /* Both set to 0 means we're disabling replication */ + if (connid || opid) + { + r->repl_state_flags |= REPLICA_TOTAL_IN_PROGRESS; + } + } + slapi_ch_free_string(&r->locking_purl); + r->locking_purl = slapi_ch_strdup(locking_purl); + } + PR_Unlock(r->repl_lock); + return rval; +} + +/* + * Relinquish exclusive access to the replica + */ +void +replica_relinquish_exclusive_access(Replica *r, int connid, int opid) +{ + char ebuf[BUFSIZ]; + PRBool isInc; + + PR_ASSERT(r); + + PR_Lock(r->repl_lock); + isInc = (r->repl_state_flags & REPLICA_INCREMENTAL_IN_PROGRESS); + /* check to see if the replica is in use and log a warning if not */ + if (!(r->repl_state_flags & REPLICA_IN_USE)) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "conn=%d op=%d repl=\"%s\": " + "Replica not in use\n", + connid, opid, + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + } else { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "conn=%d op=%d repl=\"%s\": " + "Released replica\n", + connid, opid, + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + slapi_ch_free_string(&r->locking_purl); + r->repl_state_flags &= ~(REPLICA_IN_USE); + if (isInc) + r->repl_state_flags &= ~(REPLICA_INCREMENTAL_IN_PROGRESS); + else + r->repl_state_flags &= ~(REPLICA_TOTAL_IN_PROGRESS); + } + PR_Unlock(r->repl_lock); +} + +/* + * Returns root of the replicated area + */ +PRBool +replica_get_tombstone_reap_active(const Replica *r) +{ + PR_ASSERT(r); + + return(r->tombstone_reap_active); +} + +/* + * Returns root of the replicated area + */ +const Slapi_DN * +replica_get_root(const Replica *r) /* ONREPL - should we return copy instead? */ +{ + PR_ASSERT(r); + + /* replica root never changes so we don't have to lock */ + return(r->repl_root); +} + +/* + * Returns normalized dn of the root of the replicated area + */ +const char * +replica_get_name(const Replica *r) /* ONREPL - should we return copy instead? */ +{ + PR_ASSERT(r); + + /* replica name never changes so we don't have to lock */ + return(r->repl_name); +} + +/* + * Returns replicaid of this replica + */ +ReplicaId +replica_get_rid (const Replica *r) +{ + ReplicaId rid; + PR_ASSERT(r); + + PR_Lock(r->repl_lock); + rid = r->repl_rid; + PR_Unlock(r->repl_lock); + return rid; +} + +/* + * Sets replicaid of this replica - should only be used when also changing the type + */ +void +replica_set_rid (Replica *r, ReplicaId rid) +{ + PR_ASSERT(r); + + PR_Lock(r->repl_lock); + r->repl_rid = rid; + PR_Unlock(r->repl_lock); +} + +/* Returns true if replica was initialized through ORC or import; + * otherwise, false. An uninitialized replica should return + * LDAP_UNWILLING_TO_PERFORM to all client requests + */ +PRBool +replica_is_initialized (const Replica *r) +{ + PR_ASSERT(r); + return (r->repl_ruv != NULL); +} + +/* + * Returns refcounted object that contains RUV. The caller should release the + * object once it is no longer used. To release, call object_release + */ +Object * +replica_get_ruv (const Replica *r) +{ + Object *ruv = NULL; + + PR_ASSERT(r); + + PR_Lock(r->repl_lock); + + PR_ASSERT (r->repl_ruv); + + object_acquire (r->repl_ruv); + + ruv = r->repl_ruv; + + PR_Unlock(r->repl_lock); + + return ruv; +} + +/* + * Sets RUV vector. This function should be called during replica + * (re)initialization. During normal operation, the RUV is read from + * the root of the replicated in the replica_new call + */ +void +replica_set_ruv (Replica *r, RUV *ruv) +{ + PR_ASSERT(r && ruv); + + PR_Lock(r->repl_lock); + + if(NULL != r->repl_ruv) + { + object_release(r->repl_ruv); + } + + /* if the local replica is not in the RUV and it is writable - add it + and reinitialize min_csn pending list */ + if (r->repl_type == REPLICA_TYPE_UPDATABLE) + { + CSN *csn = NULL; + if (r->min_csn_pl) + csnplFree (&r->min_csn_pl); + + if (ruv_contains_replica (ruv, r->repl_rid)) + { + ruv_get_smallest_csn_for_replica(ruv, r->repl_rid, &csn); + if (csn) + csn_free (&csn); + else + r->min_csn_pl = csnplNew (); + /* We need to make sure the local ruv element is the 1st. */ + ruv_move_local_supplier_to_first(ruv, r->repl_rid); + } + else + { + r->min_csn_pl = csnplNew (); + /* To be sure that the local is in first */ + ruv_add_index_replica(ruv, r->repl_rid, multimaster_get_local_purl(), 1); + } + } + + r->repl_ruv = object_new((void*)ruv, (FNFree)ruv_destroy); + r->repl_ruv_dirty = PR_TRUE; + + PR_Unlock(r->repl_lock); +} + +/* + * Update one particular CSN in an RUV. This is meant to be called + * whenever (a) the server has processed a client operation and + * needs to update its CSN, or (b) the server is completing an + * inbound replication session operation, and needs to update its + * local RUV. + */ +void +replica_update_ruv(Replica *r, const CSN *updated_csn, const char *replica_purl) +{ + char csn_str[CSN_STRSIZE]; + char ebuf[BUFSIZ]; + + PR_ASSERT(NULL != r); + PR_ASSERT(NULL != updated_csn); +#ifdef DEBUG + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "replica_update_ruv: csn %s\n", + csn_as_string(updated_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */ +#endif + if (NULL == r) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_update_ruv: replica " + "is NULL\n"); + } + else if (NULL == updated_csn) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_update_ruv: csn " + "is NULL when updating replica %s\n", escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + } + else + { + RUV *ruv; + PR_Lock(r->repl_lock); + + if (r->repl_ruv != NULL) + { + ruv = object_get_data(r->repl_ruv); + if (NULL != ruv) + { + ReplicaId rid = csn_get_replicaid(updated_csn); + if (rid == r->repl_rid) + { + if (NULL != r->min_csn_pl) + { + CSN *min_csn; + PRBool committed; + (void)csnplCommit(r->min_csn_pl, updated_csn); + min_csn = csnplGetMinCSN(r->min_csn_pl, &committed); + if (NULL != min_csn) + { + if (committed) + { + ruv_set_min_csn(ruv, min_csn, replica_purl); + csnplFree(&r->min_csn_pl); + } + csn_free(&min_csn); + } + } + } + /* Update max csn for local and remote replicas */ + if (ruv_update_ruv (ruv, updated_csn, replica_purl, rid == r->repl_rid) + != RUV_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, + repl_plugin_name, "replica_update_ruv: unable " + "to update RUV for replica %s, csn = %s\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf), + csn_as_string(updated_csn, PR_FALSE, csn_str)); + } + + r->repl_ruv_dirty = PR_TRUE; + } + else + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "replica_update_ruv: unable to get RUV object for replica " + "%s\n", escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + } + } + else + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_update_ruv: " + "unable to initialize RUV for replica %s\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + } + PR_Unlock(r->repl_lock); + } +} + +/* + * Returns refcounted object that contains csn generator. The caller should release the + * object once it is no longer used. To release, call object_release + */ +Object * +replica_get_csngen (const Replica *r) +{ + Object *csngen; + + PR_ASSERT(r); + + PR_Lock(r->repl_lock); + + object_acquire (r->repl_csngen); + csngen = r->repl_csngen; + + PR_Unlock(r->repl_lock); + + return csngen; +} + +/* + * Returns the replica type. + */ +ReplicaType +replica_get_type (const Replica *r) +{ + PR_ASSERT(r); + return r->repl_type; +} + +/* + * Sets the replica type. + */ +void +replica_set_type (Replica *r, ReplicaType type) +{ + PR_ASSERT(r); + + PR_Lock(r->repl_lock); + r->repl_type = type; + PR_Unlock(r->repl_lock); +} + +/* + * Returns PR_TRUE if this replica is a consumer of 4.0 server + * and PR_FALSE otherwise + */ +PRBool +replica_is_legacy_consumer (const Replica *r) +{ + PR_ASSERT(r); + return r->legacy_consumer; +} + +/* + * Sets the replica type. + */ +void +replica_set_legacy_consumer (Replica *r, PRBool legacy_consumer) +{ + int legacy2mmr; + Slapi_DN *repl_root_sdn = NULL; + char **referrals = NULL; + char *replstate = NULL; + PR_ASSERT(r); + + PR_Lock(r->repl_lock); + + legacy2mmr = r->legacy_consumer && !legacy_consumer; + + /* making the server a regular 5.0 replica */ + if (legacy2mmr) + { + slapi_ch_free ((void**)&r->legacy_purl); + /* Remove copiedFrom/copyingFrom attributes from the root entry */ + /* set the right state in the mapping tree */ + if (r->repl_type == REPLICA_TYPE_READONLY) + { + replica_get_referrals_nolock (r, &referrals); + replstate = STATE_UPDATE_REFERRAL; + } + else /* updateable */ + { + replstate = STATE_BACKEND; + } + } + + r->legacy_consumer = legacy_consumer; + repl_root_sdn = slapi_sdn_dup(r->repl_root); + PR_Unlock(r->repl_lock); + + if (legacy2mmr) + { + replica_clear_legacy_referrals(repl_root_sdn, referrals, replstate); + /* Also change state of the mapping tree node and/or referrals */ + replica_remove_legacy_attr (repl_root_sdn, type_copiedFrom); + replica_remove_legacy_attr (repl_root_sdn, type_copyingFrom); + } + charray_free(referrals); + slapi_sdn_free(&repl_root_sdn); +} + +/* Gets partial url of the legacy supplier - applicable for legacy consumer only */ +char * +replica_get_legacy_purl (const Replica *r) +{ + char *purl; + + PR_Lock (r->repl_lock); + + PR_ASSERT (r->legacy_consumer); + + purl = slapi_ch_strdup (r->legacy_purl); + + PR_Unlock (r->repl_lock); + + return purl; +} + +void +replica_set_legacy_purl (Replica *r, const char *purl) +{ + PR_Lock (r->repl_lock); + + PR_ASSERT (r->legacy_consumer); + + /* slapi_ch_free accepts NULL pointer */ + slapi_ch_free ((void**)&r->legacy_purl); + + r->legacy_purl = slapi_ch_strdup (purl); + + PR_Unlock (r->repl_lock); +} + +/* + * Returns true if sdn is the same as updatedn and false otherwise + */ +PRBool +replica_is_updatedn (const Replica *r, const Slapi_DN *sdn) +{ + PRBool result; + + PR_ASSERT (r); + + PR_Lock(r->repl_lock); + + if (sdn == NULL) + { + result = (r->updatedn_list == NULL); + } + else if (r->updatedn_list == NULL) + { + result = PR_FALSE; + } + else + { + result = replica_updatedn_list_ismember(r->updatedn_list, sdn); + } + + PR_Unlock(r->repl_lock); + + return result; +} + +/* + * Sets updatedn list for this replica + */ +void +replica_set_updatedn (Replica *r, const Slapi_ValueSet *vs, int mod_op) +{ + PR_ASSERT (r); + + PR_Lock(r->repl_lock); + + if (!r->updatedn_list) + r->updatedn_list = replica_updatedn_list_new(NULL); + + if (mod_op & LDAP_MOD_DELETE || vs == NULL || + (0 == slapi_valueset_count(vs))) /* null value also causes list deletion */ + replica_updatedn_list_delete(r->updatedn_list, vs); + else if (mod_op & LDAP_MOD_REPLACE) + replica_updatedn_list_replace(r->updatedn_list, vs); + else if (mod_op & LDAP_MOD_ADD) + replica_updatedn_list_add(r->updatedn_list, vs); + + PR_Unlock(r->repl_lock); +} + +/* gets current replica generation for this replica */ +char *replica_get_generation (const Replica *r) +{ + int rc = 0; + char *gen = NULL; + + if (r) + { + PR_Lock(r->repl_lock); + + PR_ASSERT (r->repl_ruv); + + if (rc == 0) + gen = ruv_get_replica_generation ((RUV*)object_get_data (r->repl_ruv)); + + PR_Unlock(r->repl_lock); + } + + return gen; +} + +PRBool replica_is_flag_set (const Replica *r, PRUint32 flag) +{ + if (r) + return (r->repl_flags & flag); + else + return PR_FALSE; +} + +void replica_set_flag (Replica *r, PRUint32 flag, PRBool clear) +{ + if (r == NULL) + return; + + PR_Lock(r->repl_lock); + + if (clear) + { + r->repl_flags &= ~flag; + } + else + { + r->repl_flags |= flag; + } + + PR_Unlock(r->repl_lock); +} + +void replica_replace_flags (Replica *r, PRUint32 flags) +{ + if (r) + { + PR_Lock(r->repl_lock); + r->repl_flags = flags; + PR_Unlock(r->repl_lock); + } +} + +void +replica_get_referrals(const Replica *r, char ***referrals) +{ + PR_Lock(r->repl_lock); + replica_get_referrals_nolock (r, referrals); + PR_Unlock(r->repl_lock); +} + +void +replica_set_referrals(Replica *r,const Slapi_ValueSet *vs) +{ + int ii = 0; + Slapi_Value *vv = NULL; + if (r->repl_referral == NULL) + { + r->repl_referral = slapi_valueset_new(); + } + else + { + slapi_valueset_done(r->repl_referral); + } + slapi_valueset_set_valueset(r->repl_referral, vs); + /* make sure the DN is included in the referral LDAP URL */ + if (r->repl_referral) + { + Slapi_ValueSet *newvs = slapi_valueset_new(); + const char *repl_root = slapi_sdn_get_dn(r->repl_root); + int rootlen = strlen(repl_root); + ii = slapi_valueset_first_value(r->repl_referral, &vv); + while (vv) + { + const char *ref = slapi_value_get_string(vv); + struct ldap_url_desc *lud = NULL; + int myrc = ldap_url_parse(ref, &lud); + /* see if the dn is already in the referral URL */ + if (myrc == LDAP_URL_ERR_NODN || !lud || !lud->lud_dn) { + /* add the dn */ + Slapi_Value *newval = NULL; + int len = strlen(ref); + char *tmpref = NULL; + int need_slash = 0; + if (ref[len-1] != '/') { + len++; /* add another one for the slash */ + need_slash = 1; + } + len += rootlen + 2; + tmpref = slapi_ch_malloc(len); + sprintf(tmpref, "%s%s%s", ref, (need_slash ? "/" : ""), + repl_root); + newval = slapi_value_new_string(tmpref); + slapi_ch_free_string(&tmpref); /* sv_new_string makes a copy */ + slapi_valueset_add_value(newvs, newval); + slapi_value_free(&newval); /* s_vs_add_value makes a copy */ + } + if (lud) + ldap_free_urldesc(lud); + ii = slapi_valueset_next_value(r->repl_referral, ii, &vv); + } + if (slapi_valueset_count(newvs) > 0) { + slapi_valueset_done(r->repl_referral); + slapi_valueset_set_valueset(r->repl_referral, newvs); + } + slapi_valueset_free(newvs); /* s_vs_set_vs makes a copy */ + } +} + +int +replica_update_csngen_state (Replica *r, const RUV *ruv) +{ + int rc = 0; + CSNGen *gen; + CSN *csn = NULL; + + PR_ASSERT (r && ruv); + + rc = ruv_get_max_csn(ruv, &csn); + if (rc != RUV_SUCCESS) + { + return -1; + } + + if (csn == NULL) /* ruv contains no csn - we are done */ + { + return 0; + } + + PR_Lock(r->repl_lock); + + gen = (CSNGen *)object_get_data (r->repl_csngen); + PR_ASSERT (gen); + + rc = csngen_adjust_time (gen, csn); + if (rc != CSN_SUCCESS) + { + rc = -1; + goto done; + } + + rc = 0; + +done: + + PR_Unlock(r->repl_lock); + if (csn) + csn_free (&csn); + + return rc; +} + +/* + * dumps replica state for debugging purpose + */ +void +replica_dump(Replica *r) +{ + char *updatedn_list = NULL; + PR_ASSERT (r); + + PR_Lock(r->repl_lock); + + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "Replica state:\n"); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "\treplica root: %s\n", + slapi_sdn_get_ndn (r->repl_root)); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "\treplica type: %s\n", + _replica_type_as_string (r)); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "\treplica id: %d\n", r->repl_rid); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "\tflags: %d\n", r->repl_flags); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "\tstate flags: %d\n", r->repl_state_flags); + if (r->updatedn_list) + updatedn_list = replica_updatedn_list_to_string(r->updatedn_list, "\n\t\t"); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "\tupdate dn: %s\n", + updatedn_list? updatedn_list : "not configured"); + slapi_ch_free_string(&updatedn_list); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "\truv: %s configured and is %sdirty\n", + r->repl_ruv ? "" : "not", r->repl_ruv_dirty ? "" : "not "); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "\tCSN generator: %s configured\n", + r->repl_csngen ? "" : "not"); + /* JCMREPL - Dump Referrals */ + + PR_Unlock(r->repl_lock); +} + + +/* + * Return the CSN of the purge point. Any CSNs smaller than the + * purge point can be safely removed from entries within this + * this replica. Returns an allocated CSN that must be freed by + * the caller, or NULL if purging is disabled. + */ + +CSN * +replica_get_purge_csn(const Replica *r) +{ + CSN *csn; + + PR_Lock(r->repl_lock); + + csn= _replica_get_purge_csn_nolock(r); + + PR_Unlock(r->repl_lock); + + return csn; +} + + +/* + * This function logs a dummy entry for the smallest csn in the RUV. + * This is necessary because, to get the next change, we need to position + * changelog on the previous change. So this function insures that we always have one. + */ + +/* ONREPL we will need to change this function to log all the + * ruv elements not just the smallest when changelog iteration + * algoritm changes to iterate replica by replica +*/ +int +replica_log_ruv_elements (const Replica *r) +{ + int rc = 0; + + PR_ASSERT (r); + + PR_Lock(r->repl_lock); + + rc = replica_log_ruv_elements_nolock (r); + + PR_Unlock(r->repl_lock); + + return rc; +} + +void +consumer5_set_mapping_tree_state_for_replica(const Replica *r, RUV *supplierRuv) +{ + const Slapi_DN *repl_root_sdn= replica_get_root(r); + char **ruv_referrals= NULL; + char **replica_referrals= NULL; + RUV *ruv; + int state_backend = -1; + const char *mtn_state = NULL; + + PR_Lock (r->repl_lock); + + if ( supplierRuv == NULL ) + { + ruv = (RUV*)object_get_data (r->repl_ruv); + PR_ASSERT (ruv); + + ruv_referrals= ruv_get_referrals(ruv); /* ruv_referrals has to be free'd */ + } + else + { + ruv_referrals = ruv_get_referrals(supplierRuv); + } + + replica_get_referrals_nolock (r, &replica_referrals); /* replica_referrals has to be free'd */ + + /* JCMREPL - What if there's a Total update in progress? */ + if( (r->repl_type==REPLICA_TYPE_READONLY) || (r->legacy_consumer) ) + { + state_backend = 0; + } + else if (r->repl_type==REPLICA_TYPE_UPDATABLE) + { + state_backend = 1; + } + /* Unlock to avoid changing MTN state under repl lock */ + PR_Unlock (r->repl_lock); + + if(state_backend == 0 ) + { + /* Read-Only - The mapping tree should be refering all update operations. */ + mtn_state = STATE_UPDATE_REFERRAL; + } + else if (state_backend == 1) + { + /* Updatable - The mapping tree should be accepting all update operations. */ + mtn_state = STATE_BACKEND; + } + + /* JCMREPL - Check the return code. */ + repl_set_mtn_state_and_referrals(repl_root_sdn, mtn_state, NULL, + ruv_referrals, replica_referrals); + charray_free(ruv_referrals); + charray_free(replica_referrals); +} + +void +replica_set_enabled (Replica *r, PRBool enable) +{ + char *repl_name = NULL; + + PR_ASSERT (r); + + PR_Lock (r->repl_lock); + + if (enable) + { + if (r->repl_eqcxt_rs == NULL) /* event is not already registered */ + { + repl_name = slapi_ch_strdup (r->repl_name); + r->repl_eqcxt_rs = slapi_eq_repeat(_replica_update_state, repl_name, + current_time() + START_UPDATE_DELAY, RUV_SAVE_INTERVAL); + } + } + else /* disable */ + { + if (r->repl_eqcxt_rs) /* event is still registerd */ + { + repl_name = slapi_eq_get_arg (r->repl_eqcxt_rs); + slapi_ch_free ((void**)&repl_name); + slapi_eq_cancel(r->repl_eqcxt_rs); + r->repl_eqcxt_rs = NULL; + } + } + + PR_Unlock (r->repl_lock); +} + +/* This function is generally called when replica's data store + is reloaded. It retrieves new RUV from the datastore. If new + RUV does not exist or if it is not as up to date as the purge RUV + of the corresponding changelog file, we need to remove */ + +/* the function minimizes the use of replica lock where ever possible. + Locking replica lock while calling changelog functions + causes a deadlock because changelog calls replica functions that + that lock the same lock */ + +int +replica_reload_ruv (Replica *r) +{ + int rc = 0; + Object *old_ruv_obj = NULL, *new_ruv_obj = NULL; + RUV *upper_bound_ruv = NULL; + RUV *new_ruv = NULL; + Object *r_obj; + + PR_ASSERT (r); + + PR_Lock (r->repl_lock); + + old_ruv_obj = r->repl_ruv; + + r->repl_ruv = NULL; + + rc = _replica_configure_ruv (r, PR_TRUE); + + PR_Unlock (r->repl_lock); + + if (rc != 0) + { + return rc; + } + + /* check if there is a changelog and whether this replica logs changes */ + if (cl5GetState () == CL5_STATE_OPEN && r->repl_flags & REPLICA_LOG_CHANGES) + { + + /* Compare new ruv to the changelog's upper bound ruv. We could only keep + the existing changelog if its upper bound is the same as replica's RUV. + This is because if changelog has changes not in RUV, they will be + eventually sent to the consumer's which will cause a state mismatch + (because the supplier does not actually contain the changes in its data store. + If, on the other hand, the changelog is not as up to date as the supplier, + it is not really useful since out of sync consumer's can't be brought + up to date using this changelog and hence will need to be reinitialized */ + + /* replace ruv to make sure we work with the correct changelog file */ + PR_Lock (r->repl_lock); + + new_ruv_obj = r->repl_ruv; + r->repl_ruv = old_ruv_obj; + + PR_Unlock (r->repl_lock); + + rc = cl5GetUpperBoundRUV (r, &upper_bound_ruv); + if (rc != CL5_SUCCESS && rc != CL5_NOTFOUND) + { + return -1; + } + + if (upper_bound_ruv) + { + new_ruv = object_get_data (new_ruv_obj); + PR_ASSERT (new_ruv); + + /* ONREPL - there are more efficient ways to establish RUV equality. + However, because this is not in the critical path and we at most + have 2 elements in the RUV, this will not effect performance */ + + if (!ruv_covers_ruv (new_ruv, upper_bound_ruv) || + !ruv_covers_ruv (upper_bound_ruv, new_ruv)) + { + char ebuf[BUFSIZ]; + + /* create a temporary replica object to conform to the interface */ + r_obj = object_new (r, NULL); + + /* We can't use existing changelog - remove existing file */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_reload_ruv: " + "Warning: new data for replica %s does not match the data in the changelog.\n" + " Recreating the changelog file. This could affect replication with replica's " + " consumers in which case the consumers should be reinitialized.\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + rc = cl5DeleteDBSync (r_obj); + + /* reinstate new ruv */ + PR_Lock (r->repl_lock); + + r->repl_ruv = new_ruv_obj; + + object_release (r_obj); + + if (rc == CL5_SUCCESS) + { + /* log changes to mark starting point for replication */ + rc = replica_log_ruv_elements_nolock (r); + } + + PR_Unlock (r->repl_lock); + } + else + { + /* we just need to reinstate new ruv */ + PR_Lock (r->repl_lock); + + r->repl_ruv = new_ruv_obj; + + PR_Unlock (r->repl_lock); + } + } + else /* upper bound vector is not there - we have no changes logged */ + { + /* reinstate new ruv */ + PR_Lock (r->repl_lock); + + r->repl_ruv = new_ruv_obj; + + /* just log elements of the current RUV. This is to have + a starting point for iteration through the changes */ + rc = replica_log_ruv_elements_nolock (r); + + PR_Unlock (r->repl_lock); + } + } + + if (rc == 0) + { + consumer5_set_mapping_tree_state_for_replica(r, NULL); + /* reset mapping tree referrals based on new local RUV */ + } + + if (old_ruv_obj) + object_release (old_ruv_obj); + + if (upper_bound_ruv) + ruv_destroy (&upper_bound_ruv); + + return rc; +} + +/* this function is called during server startup for each replica + to check whether the replica's data was reloaded offline and + whether replica's changelog needs to be reinitialized */ + +/* the function does not use replica lock but all functions it calls are + thread safe. Locking replica lock while calling changelog functions + causes a deadlock because changelog calls replica functions that + that lock the same lock */ +int replica_check_for_data_reload (Replica *r, void *arg) +{ + int rc = 0; + RUV *upper_bound_ruv = NULL; + RUV *r_ruv = NULL; + Object *r_obj, *ruv_obj; + int cl_cover_be, be_cover_cl; + + PR_ASSERT (r); + + /* check that we have a changelog and if this replica logs changes */ + if (cl5GetState () == CL5_STATE_OPEN && r->repl_flags & REPLICA_LOG_CHANGES) + { + /* Compare new ruv to the purge ruv. If the new contains csns which + are smaller than those in purge ruv, we need to remove old and + create new changelog file for this replica. This is because we + will not have sufficient changes to incrementally update a consumer + to the current state of the supplier. */ + + rc = cl5GetUpperBoundRUV (r, &upper_bound_ruv); + if (rc != CL5_SUCCESS && rc != CL5_NOTFOUND) + { + return -1; + } + + if (upper_bound_ruv) + { + ruv_obj = replica_get_ruv (r); + r_ruv = object_get_data (ruv_obj); + PR_ASSERT (r_ruv); + + /* Compare new ruv to the changelog's upper bound ruv. We could only keep + the existing changelog if its upper bound is the same as replica's RUV. + This is because if changelog has changes not in RUV, they will be + eventually sent to the consumer's which will cause a state mismatch + (because the supplier does not actually contain the changes in its data store. + If, on the other hand, the changelog is not as up to date as the supplier, + it is not really useful since out of sync consumer's can't be brought + up to date using this changelog and hence will need to be reinitialized */ + + /* + * Actually we can ignore the scenario that the changelog's upper + * bound ruv covers data store's ruv for two reasons: (1) a change + * is always written to the changelog after it is committed to the + * data store; (2) a change will be ignored if the server has seen + * it before - this happens frequently at the beginning of replication + * sessions. + */ + + be_cover_cl = ruv_covers_ruv (r_ruv, upper_bound_ruv); + cl_cover_be = ruv_covers_ruv (upper_bound_ruv, r_ruv); + if (!cl_cover_be) + { + /* the data was reloaded and we can no longer use existing changelog */ + char ebuf[BUFSIZ]; + + /* create a temporary replica object to conform to the interface */ + r_obj = object_new (r, NULL); + + /* We can't use existing changelog - remove existing file */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_check_for_data_reload: " + "Warning: data for replica %s was reloaded and it no longer matches the data " + "in the changelog (replica data %s changelog). Recreating the changelog file. This could affect replication " + "with replica's consumers in which case the consumers should be reinitialized.\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf), + ((!be_cover_cl && !cl_cover_be) ? "<>" : (!be_cover_cl ? "<" : ">")) ); + + rc = cl5DeleteDBSync (r_obj); + + object_release (r_obj); + + if (rc == CL5_SUCCESS) + { + /* log changes to mark starting point for replication */ + rc = replica_log_ruv_elements (r); + } + } + + object_release (ruv_obj); + } + else /* we have no changes currently logged for this replica */ + { + /* log changes to mark starting point for replication */ + rc = replica_log_ruv_elements (r); + } + } + + if (rc == 0) + { + /* reset mapping tree referrals based on new local RUV */ + consumer5_set_mapping_tree_state_for_replica(r, NULL); + } + + if (upper_bound_ruv) + ruv_destroy (&upper_bound_ruv); + + return rc; +} + +/* Helper functions */ +/* reads replica configuration entry. The entry is the child of the + mapping tree node for the replica's backend */ + +static Slapi_Entry* +_replica_get_config_entry (const Slapi_DN *root) +{ + int rc = 0; + char *dn = NULL; + Slapi_Entry **entries; + Slapi_Entry *e = NULL; + Slapi_PBlock *pb = NULL; + + dn = _replica_get_config_dn (root); + pb = slapi_pblock_new (); + + slapi_search_internal_set_pb (pb, dn, LDAP_SCOPE_BASE, "objectclass=*", NULL, 0, NULL, + NULL, repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0); + slapi_search_internal_pb (pb); + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); + if (rc == 0) + { + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries); + e = slapi_entry_dup (entries [0]); + } + + slapi_free_search_results_internal(pb); + slapi_pblock_destroy (pb); + slapi_ch_free_string(&dn); + + return e; +} + +static int +_replica_check_validity (const Replica *r) +{ + PR_ASSERT (r); + + if (r->repl_root == NULL || r->repl_type == 0 || r->repl_rid == 0 || + r->repl_rid > MAX_REPLICA_ID || r->repl_csngen == NULL || r->repl_name == NULL) + { + return -1; + } + else + { + return 0; + } +} + +/* replica configuration entry has the following format: + dn: cn=replica,<mapping tree node dn> + objectclass: top + objectclass: nsds5Replica + objectclass: extensibleObject + nsds5ReplicaRoot: <root of the replica> + nsds5ReplicaId: <replica id> + nsds5ReplicaType: <type of the replica: primary, read-write or read-only> + nsState: <state of the csn generator> missing the first time replica is started + nsds5ReplicaBindDN: <supplier update dn> consumers only + nsds5ReplicaReferral: <referral URL to updatable replica> consumers only + nsds5ReplicaPurgeDelay: <time, in seconds, to keep purgeable CSNs, 0 == keep forever> + nsds5ReplicaTombstonePurgeInterval: <time, in seconds, between tombstone purge runs, 0 == don't reap> + nsds5ReplicaLegacyConsumer: <TRUE | FALSE> + + richm: changed slapi entry from const to editable - if the replica id is supplied for a read + only replica, we ignore it and replace the value with the READ_ONLY_REPLICA_ID + */ +static int +_replica_init_from_config (Replica *r, Slapi_Entry *e, char *errortext) +{ + int rc; + Slapi_Attr *attr; + char *val; + CSNGen *gen; + char buf [BUFSIZ]; + char *errormsg = errortext? errortext : buf; + Slapi_Attr *a = NULL; + char dnescape[BUFSIZ]; /* for escape_string */ + + PR_ASSERT (r && e); + + /* get replica root */ + val = slapi_entry_attr_get_charptr (e, attr_replicaRoot); + if (val == NULL) + { + sprintf (errormsg, "failed to retrieve %s attribute from (%s)\n", + attr_replicaRoot, + escape_string((char*)slapi_entry_get_dn ((Slapi_Entry*)e), dnescape)); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "_replica_init_from_config: %s\n", + errormsg); + + return -1; + } + + r->repl_root = slapi_sdn_new_dn_passin (val); + + /* get replica type */ + val = slapi_entry_attr_get_charptr (e, attr_replicaType); + if (val) + { + r->repl_type = atoi(val); + slapi_ch_free ((void**)&val); + } + else + { + r->repl_type = REPLICA_TYPE_READONLY; + } + + /* get legacy consumer flag */ + val = slapi_entry_attr_get_charptr (e, type_replicaLegacyConsumer); + if (val) + { + if (strcasecmp (val, "on") == 0 || strcasecmp (val, "yes") == 0 || + strcasecmp (val, "true") == 0 || strcasecmp (val, "1") == 0) + { + r->legacy_consumer = PR_TRUE; + } + else + { + r->legacy_consumer = PR_FALSE; + } + + slapi_ch_free ((void**)&val); + } + else + { + r->legacy_consumer = PR_FALSE; + } + + /* get replica flags */ + r->repl_flags = slapi_entry_attr_get_ulong(e, attr_flags); + + /* get replicaid */ + /* the replica id is ignored for read only replicas and is set to the + special value READ_ONLY_REPLICA_ID */ + if (r->repl_type == REPLICA_TYPE_READONLY) + { + r->repl_rid = READ_ONLY_REPLICA_ID; + slapi_entry_attr_set_uint(e, attr_replicaId, (unsigned int)READ_ONLY_REPLICA_ID); + } + /* a replica id is required for updatable and primary replicas */ + else if (r->repl_type == REPLICA_TYPE_UPDATABLE || + r->repl_type == REPLICA_TYPE_PRIMARY) + { + if ((val = slapi_entry_attr_get_charptr (e, attr_replicaId))) + { + int temprid = atoi (val); + slapi_ch_free ((void**)&val); + if (temprid <= 0 || temprid >= READ_ONLY_REPLICA_ID) + { + sprintf (errormsg, + "attribute %s must have a value greater than 0 " + "and less than %d: entry %s", + attr_replicaId, READ_ONLY_REPLICA_ID, + escape_string((char*)slapi_entry_get_dn ((Slapi_Entry*)e), + dnescape)); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_init_from_config: %s\n", + errormsg); + return -1; + } + else + { + r->repl_rid = (ReplicaId)temprid; + } + } + else + { + sprintf (errormsg, "failed to retrieve required %s attribute from %s", + attr_replicaId, + escape_string((char*)slapi_entry_get_dn ((Slapi_Entry*)e), + dnescape)); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_init_from_config: %s\n", + errormsg); + return -1; + } + } + + attr = NULL; + rc = slapi_entry_attr_find(e, attr_state, &attr); + gen = csngen_new (r->repl_rid, attr); + if (gen == NULL) + { + sprintf (errormsg, "failed to create csn generator for replica (%s)", + escape_string((char*)slapi_entry_get_dn ((Slapi_Entry*)e), + dnescape)); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_init_from_config: %s\n", + errormsg); + return -1; + } + r->repl_csngen = object_new((void*)gen, (FNFree)csngen_free); + + /* Hook generator so we can maintain min/max CSN info */ + r->csn_pl_reg_id = csngen_register_callbacks(gen, assign_csn_callback, r, abort_csn_callback, r); + + /* get replication bind dn */ + r->updatedn_list = replica_updatedn_list_new(e); + + /* get replica name */ + val = slapi_entry_attr_get_charptr (e, attr_replicaName); + if (val) { + r->repl_name = val; + } + else + { + rc = slapi_uniqueIDGenerateString (&r->repl_name); + if (rc != UID_SUCCESS) + { + sprintf (errormsg, "failed to assign replica name for replica (%s); " + "uuid generator error - %d ", + escape_string((char*)slapi_entry_get_dn ((Slapi_Entry*)e), dnescape), + rc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "_replica_init_from_config: %s\n", + errormsg); + return -1; + } + else + r->new_name = PR_TRUE; + } + + /* get the list of referrals */ + slapi_entry_attr_find( e, attr_replicaReferral, &attr ); + if(attr!=NULL) + { + slapi_attr_get_valueset(attr, &r->repl_referral); + } + + /* + * Set the purge offset (default 7 days). This is the extra + * time we allow purgeable CSNs to stick around, in case a + * replica regresses. Could also be useful when LCUP happens, + * since we don't know about LCUP replicas, and they can just + * turn up whenever they want to. + */ + if (slapi_entry_attr_find(e, type_replicaPurgeDelay, &a) == -1) + { + /* No purge delay provided, so use default */ + r->repl_purge_delay = 60 * 60 * 24 * 7; /* One week, in seconds */ + } + else + { + r->repl_purge_delay = slapi_entry_attr_get_uint(e, type_replicaPurgeDelay); + } + + if (slapi_entry_attr_find(e, type_replicaTombstonePurgeInterval, &a) == -1) + { + /* No reap interval provided, so use default */ + r->tombstone_reap_interval = 3600 * 24; /* One day */ + } + else + { + r->tombstone_reap_interval = slapi_entry_attr_get_int(e, type_replicaTombstonePurgeInterval); + } + + r->tombstone_reap_stop = r->tombstone_reap_active = PR_FALSE; + + return (_replica_check_validity (r)); +} + +/* This function updates the entry to contain information generated + during replica initialization. + Returns 0 if successful and -1 otherwise */ +static int +_replica_update_entry (Replica *r, Slapi_Entry *e, char *errortext) +{ + int rc; + Slapi_Mod smod; + Slapi_Value *val; + + PR_ASSERT (r); + + /* add attribute that stores state of csn generator */ + rc = csngen_get_state ((CSNGen*)object_get_data (r->repl_csngen), &smod); + if (rc != CSN_SUCCESS) + { + sprintf (errortext, "failed to get csn generator's state; csn error - %d", rc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_update_entry: %s\n", errortext); + return -1; + } + + val = slapi_value_new_berval(slapi_mod_get_first_value(&smod)); + + rc = slapi_entry_add_value (e, slapi_mod_get_type (&smod), val); + + slapi_value_free(&val); + slapi_mod_done (&smod); + + if (rc != 0) + { + sprintf (errortext, "failed to update replica entry"); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_update_entry: %s\n", errortext); + return -1; + } + + /* add attribute that stores replica name */ + rc = slapi_entry_add_string (e, attr_replicaName, r->repl_name); + if (rc != 0) + { + sprintf (errortext, "failed to update replica entry"); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_update_entry: %s\n", errortext); + return -1; + } + else + r->new_name = PR_FALSE; + + return 0; +} + +/* DN format: cn=replica,cn=\"<root>\",cn=mapping tree,cn=config */ +static char* +_replica_get_config_dn (const Slapi_DN *root) +{ + char *dn; + const char *mp_base = slapi_get_mapping_tree_config_root (); + int len; + + PR_ASSERT (root); + + len = strlen (REPLICA_RDN) + strlen (slapi_sdn_get_dn (root)) + + strlen (mp_base) + 8; /* 8 = , + cn= + \" + \" + , + \0 */ + + dn = (char*)slapi_ch_malloc (len); + sprintf (dn, "%s,cn=\"%s\",%s", REPLICA_RDN, slapi_sdn_get_dn (root), mp_base); + + return dn; +} + +/* This function retrieves RUV from the root of the replicated tree. + * The attribute can be missing if + * (1) this replica is the first supplier and replica generation has not been assigned + * or + * (2) this is a consumer that has not been yet initialized + * In either case, replica_set_ruv should be used to further initialize the replica. + * Returns 0 on success, -1 on failure. If 0 is returned, the RUV is present in the replica. + */ +static int +_replica_configure_ruv (Replica *r, PRBool isLocked) +{ + Slapi_PBlock *pb = NULL; + char *attrs[2]; + int rc; + int return_value = -1; + Slapi_Entry **entries = NULL; + Slapi_Attr *attr; + RUV *ruv = NULL; + CSN *csn = NULL; + ReplicaId rid = 0; + char ebuf[BUFSIZ]; + + /* read ruv state from the ruv tombstone entry */ + pb = slapi_pblock_new(); + attrs[0] = (char*)type_ruvElement; + attrs[1] = NULL; + slapi_search_internal_set_pb( + pb, + slapi_sdn_get_dn(r->repl_root), + LDAP_SCOPE_BASE, + "objectclass=*", + attrs, + 0, /* attrsonly */ + NULL, /* controls */ + RUV_STORAGE_ENTRY_UNIQUEID, + repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), + OP_FLAG_REPLICATED); /* flags */ + slapi_search_internal_pb (pb); + + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); + if (rc == LDAP_SUCCESS) + { + /* get RUV attributes and construct the RUV */ + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries); + if (NULL == entries || NULL == entries[0]) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_configure_ruv: replica ruv tombstone entry for " + "replica %s not found\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + goto done; + } + + rc = slapi_entry_attr_find(entries[0], type_ruvElement, &attr); + if (rc != 0) /* ruv attribute is missing - this not allowed */ + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_configure_ruv: replica ruv tombstone entry for " + "replica %s does not contain %s\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf), type_ruvElement); + goto done; + } + + /* Check in the tombstone we have retrieved if the local purl is + already present: + rid == 0: the local purl is not present + rid != 0: the local purl is present ==> nothing to do + */ + ruv_init_from_slapi_attr_and_check_purl (attr, &ruv, &rid); + if (ruv) + { + char *generation = NULL; + generation = ruv_get_replica_generation(ruv); + if (NULL != generation) + { + r->repl_ruv = object_new((void*)ruv, (FNFree)ruv_destroy); + + /* Is the local purl in the ruv? (the port or the host could have + changed) + */ + /* A consumer only doesn't have its purl in its ruv */ + if (r->repl_type == REPLICA_TYPE_UPDATABLE) + { + int need_update = 0; + if (rid == 0) + { + /* We can not have more than 1 ruv with the same rid + so we replace it */ + const char *purl = NULL; + + purl = multimaster_get_local_purl(); + ruv_delete_replica(ruv, r->repl_rid); + ruv_add_index_replica(ruv, r->repl_rid, purl, 1); + need_update = 1; /* ruv changed, so write tombstone */ + } + else /* bug 540844: make sure the local supplier rid is first in the ruv */ + { + /* make sure local supplier is first in list */ + ReplicaId first_rid = 0; + char *first_purl = NULL; + ruv_get_first_id_and_purl(ruv, &first_rid, &first_purl); + /* if the local supplier is not first in the list . . . */ + if (rid != first_rid) + { + /* . . . move the local supplier to the beginning of the list */ + ruv_move_local_supplier_to_first(ruv, rid); + need_update = 1; /* must update tombstone also */ + } + } + + /* Update also the directory entry */ + if (need_update) { + /* richm 20010821 bug 556498 + replica_replace_ruv_tombstone acquires the repl_lock, so release + the lock then reacquire it if locked */ + if (isLocked) PR_Unlock(r->repl_lock); + replica_replace_ruv_tombstone(r); + if (isLocked) PR_Lock(r->repl_lock); + } + } + + slapi_ch_free((void **)&generation); + return_value = 0; + } + else + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "RUV for replica %s is missing replica generation\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + goto done; + } + } + else + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "Unable to convert %s attribute in entry %s to a replica update vector.\n", + type_ruvElement, escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + goto done; + } + + } + else /* search failed */ + { + if (LDAP_NO_SUCH_OBJECT == rc) + { + /* The entry doesn't exist: create it */ + rc = replica_create_ruv_tombstone(r); + if (LDAP_SUCCESS != rc) + { + /* + * XXXggood - the following error appears on startup if we try + * to initialize replica RUVs before the backend instance is up. + * It's alarming to see this error, and we should suppress it + * (or avoid trying to configure it) if the backend instance is + * not yet online. + */ + /* + * XXXrichm - you can also get this error when the backend is in + * read only mode c.f. bug 539782 + */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_configure_ruv: failed to create replica ruv tombstone " + "entry (%s); LDAP error - %d\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf), rc); + goto done; + } + else + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "_replica_configure_ruv: No ruv tombstone found for replica %s. " + "Created a new one\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + return_value = 0; + } + } + else + { + /* see if the suffix is disabled */ + char *state = slapi_mtn_get_state(r->repl_root); + if (state && !strcasecmp(state, "disabled")) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_configure_ruv: replication disabled for " + "entry (%s); LDAP error - %d\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf), rc); + slapi_ch_free_string(&state); + goto done; + } + else if (!r->repl_ruv) /* other error */ + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_configure_ruv: replication broken for " + "entry (%s); LDAP error - %d\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf), rc); + slapi_ch_free_string(&state); + goto done; + } + else /* some error but continue anyway? */ + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "_replica_configure_ruv: Error %d reading tombstone for replica %s.\n", + rc, escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + return_value = 0; + } + slapi_ch_free_string(&state); + } + } + + if (NULL != r->min_csn_pl) + { + csnplFree (&r->min_csn_pl); + } + + /* create pending list for min csn if necessary */ + if (ruv_get_smallest_csn_for_replica ((RUV*)object_get_data (r->repl_ruv), + r->repl_rid, &csn) == RUV_SUCCESS) + { + csn_free (&csn); + r->min_csn_pl = NULL; + } + else + { + /* + * The local replica has not generated any of its own CSNs yet. + * We need to watch CSNs being generated and note the first + * locally-generated CSN that's committed. Once that event occurs, + * the RUV is suitable for iteration over locally generated + * changes. + */ + r->min_csn_pl = csnplNew(); + } + +done: + if (NULL != pb) + { + slapi_free_search_results_internal(pb); + slapi_pblock_destroy (pb); + } + if (return_value != 0) + { + if (ruv) + ruv_destroy (&ruv); + } + + return return_value; +} + +/* NOTE - this is the only non-api function that performs locking because + it is called by the event queue */ +static void +_replica_update_state (time_t when, void *arg) +{ + int rc; + const char *replica_name = (const char *)arg; + Object *replica_object = NULL; + Replica *r; + Slapi_Mod smod; + LDAPMod *mods[3]; + Slapi_PBlock *pb = NULL; + char *dn = NULL; + + if (NULL == replica_name) + return; + + /* + * replica_get_by_name() will acquire the replica object + * and that could prevent the replica from being destroyed + * until the object_release is called. + */ + replica_object = replica_get_by_name(replica_name); + if (NULL == replica_object) + { + return; + } + + /* We have a reference, so replica won't vanish on us. */ + r = (Replica *)object_get_data(replica_object); + if (NULL == r) + { + goto done; + } + + PR_Lock(r->repl_lock); + + /* replica state is currently being updated + or no CSN was assigned - bail out */ + if (r->state_update_inprogress) + { + PR_Unlock(r->repl_lock); + goto done; + } + + /* This might be a consumer */ + if (!r->repl_csn_assigned) + { + /* EY: the consumer needs to flush ruv to disk. */ + PR_Unlock(r->repl_lock); + replica_write_ruv(r); + goto done; + } + + /* ONREPL update csn generator state of an updatable replica only */ + /* ONREPL state always changes because we update time every second and + we write state to the disk less frequently */ + rc = csngen_get_state ((CSNGen*)object_get_data (r->repl_csngen), &smod); + if (rc != 0) + { + PR_Unlock(r->repl_lock); + goto done; + } + + r->state_update_inprogress = PR_TRUE; + r->repl_csn_assigned = PR_FALSE; + + dn = _replica_get_config_dn (r->repl_root); + pb = slapi_pblock_new(); + mods[0] = (LDAPMod*)slapi_mod_get_ldapmod_byref(&smod); + + /* we don't want to held lock during operations since it causes lock contention + and sometimes deadlock. So releasing lock here */ + + PR_Unlock(r->repl_lock); + + /* replica repl_name and new_name attributes do not get changed once + the replica is configured - so it is ok that they are outside replica lock */ + + /* write replica name if it has not been written before */ + if (r->new_name) + { + struct berval *vals[2]; + struct berval val; + LDAPMod mod; + + mods[1] = &mod; + + mod.mod_op = LDAP_MOD_REPLACE; + mod.mod_type = (char*)attr_replicaName; + mod.mod_bvalues = vals; + vals [0] = &val; + vals [1] = NULL; + val.bv_val = r->repl_name; + val.bv_len = strlen (val.bv_val); + mods[2] = NULL; + } + else + { + mods[1] = NULL; + } + + slapi_modify_internal_set_pb (pb, dn, mods, NULL, NULL, + repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0); + slapi_modify_internal_pb (pb); + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); + if (rc != LDAP_SUCCESS) + { + char ebuf[BUFSIZ]; + + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "_replica_update_state: " + "failed to update state of csn generator for replica %s: LDAP " + "error - %d\n", escape_string(slapi_sdn_get_dn(r->repl_root),ebuf), rc); + } + else + { + r->new_name = PR_FALSE; + } + + /* update RUV - performs its own locking */ + replica_write_ruv (r); + + /* since this is the only place this value is changed and we are + guaranteed that only one thread enters the function, its ok + to change it outside replica lock */ + r->state_update_inprogress = PR_FALSE; + + slapi_ch_free ((void**)&dn); + slapi_pblock_destroy (pb); + slapi_mod_done (&smod); + +done: + if (replica_object) + object_release (replica_object); +} + +void +replica_write_ruv (Replica *r) +{ + int rc; + Slapi_Mod smod; + Slapi_Mod smod_last_modified; + LDAPMod *mods [3]; + Slapi_PBlock *pb; + + PR_ASSERT(r); + + PR_Lock(r->repl_lock); + + if (!r->repl_ruv_dirty) + { + PR_Unlock(r->repl_lock); + return; + } + + PR_ASSERT (r->repl_ruv); + + ruv_to_smod ((RUV*)object_get_data(r->repl_ruv), &smod); + ruv_last_modified_to_smod ((RUV*)object_get_data(r->repl_ruv), &smod_last_modified); + + PR_Unlock (r->repl_lock); + + mods [0] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod); + mods [1] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod_last_modified); + mods [2] = NULL; + pb = slapi_pblock_new(); + + /* replica name never changes so it is ok to reference it outside the lock */ + slapi_modify_internal_set_pb( + pb, + slapi_sdn_get_dn(r->repl_root), /* only used to select be */ + mods, + NULL, /* controls */ + RUV_STORAGE_ENTRY_UNIQUEID, + repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), + /* Add OP_FLAG_TOMBSTONE_ENTRY so that this doesn't get logged in the Retro ChangeLog */ + OP_FLAG_REPLICATED | OP_FLAG_REPL_FIXUP | OP_FLAG_TOMBSTONE_ENTRY); + slapi_modify_internal_pb (pb); + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); + + /* ruv does not exist - create one */ + PR_Lock(r->repl_lock); + + if (rc == LDAP_SUCCESS) + { + r->repl_ruv_dirty = PR_FALSE; + } + else if (rc == LDAP_NO_SUCH_OBJECT) + { + /* this includes an internal operation - but since this only happens + during server startup - its ok that we have lock around it */ + rc = _replica_configure_ruv (r, PR_TRUE); + if (rc == 0) + r->repl_ruv_dirty = PR_FALSE; + } + else /* error */ + { + char ebuf[BUFSIZ]; + + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "replica_write_ruv: failed to update RUV tombstone for %s; " + "LDAP error - %d\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf), rc); + PR_ASSERT (0); + } + + PR_Unlock(r->repl_lock); + + slapi_mod_done (&smod); + slapi_mod_done (&smod_last_modified); + slapi_pblock_destroy (pb); +} + + +const CSN * +_get_deletion_csn(Slapi_Entry *e) +{ + const CSN *deletion_csn = NULL; + + PR_ASSERT(NULL != e); + if (NULL != e) + { + Slapi_Attr *oc_attr = NULL; + if (entry_attr_find_wsi(e, SLAPI_ATTR_OBJECTCLASS, &oc_attr) == ATTRIBUTE_PRESENT) + { + Slapi_Value *tombstone_value = NULL; + struct berval v; + v.bv_val = SLAPI_ATTR_VALUE_TOMBSTONE; + v.bv_len = strlen(SLAPI_ATTR_VALUE_TOMBSTONE); + if (attr_value_find_wsi(oc_attr, &v, &tombstone_value) == VALUE_PRESENT) + { + deletion_csn = value_get_csn(tombstone_value, CSN_TYPE_VALUE_UPDATED); + } + } + } + return deletion_csn; +} + + +static void +_delete_tombstone(const char *tombstone_dn, const char *uniqueid) +{ + + PR_ASSERT(NULL != tombstone_dn && NULL != uniqueid); + if (NULL == tombstone_dn || NULL == uniqueid) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "_delete_tombstone: " + "NULL tombstone_dn or uniqueid provided.\n"); + } + else + { + int ldaprc; + Slapi_PBlock *pb = slapi_pblock_new(); + slapi_delete_internal_set_pb(pb, tombstone_dn, NULL, /* controls */ + uniqueid, repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), + OP_FLAG_TOMBSTONE_ENTRY); + slapi_delete_internal_pb(pb); + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &ldaprc); + if (LDAP_SUCCESS != ldaprc) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_delete_tombstone: unable to delete tombstone %s, " + "uniqueid %s: %s.\n", tombstone_dn, uniqueid, + ldap_err2string(ldaprc)); + } + slapi_pblock_destroy(pb); + } +} + +static +void get_reap_result (int rc, void *cb_data) +{ + PR_ASSERT (cb_data); + + ((reap_callback_data*)cb_data)->rc = rc; +} + +static +int process_reap_entry (Slapi_Entry *entry, void *cb_data) +{ + char ebuf[BUFSIZ]; + char deletion_csn_str[CSN_STRSIZE]; + char purge_csn_str[CSN_STRSIZE]; + unsigned long *num_entriesp = &((reap_callback_data *)cb_data)->num_entries; + unsigned long *num_purged_entriesp = &((reap_callback_data *)cb_data)->num_purged_entries; + CSN *purge_csn = ((reap_callback_data *)cb_data)->purge_csn; + PRBool *tombstone_reap_stop = ((reap_callback_data *)cb_data)->tombstone_reap_stop; + /* we only ask for the objectclass in the search - the deletion csn is in the + objectclass attribute values - if we need more attributes returned by the + search in the future, see _replica_reap_tombstones below and add more to the + attrs array */ + const CSN *deletion_csn = _get_deletion_csn(entry); + + if ((NULL == deletion_csn || csn_compare(deletion_csn, purge_csn) < 0) && + (!is_ruv_tombstone_entry(entry))) { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "_replica_reap_tombstones: removing tombstone %s " + "because its deletion csn (%s) is less than the " + "purge csn (%s).\n", + escape_string(slapi_entry_get_dn(entry), ebuf), + csn_as_string(deletion_csn, PR_FALSE, deletion_csn_str), + csn_as_string(purge_csn, PR_FALSE, purge_csn_str)); + _delete_tombstone(slapi_entry_get_dn(entry), + slapi_entry_get_uniqueid(entry)); + (*num_purged_entriesp)++; + } + else { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "_replica_reap_tombstones: NOT removing tombstone " + "%s\n", escape_string(slapi_entry_get_dn(entry),ebuf)); + } + (*num_entriesp)++; + if (*tombstone_reap_stop || g_get_shutdown()) { + return -1; + } + + return 0; +} + + + + +/* This does the actual work of searching for tombstones and deleting them. + This must be called in a separate thread because it may take a long time. +*/ +static void +_replica_reap_tombstones(void *arg) +{ + const char *replica_name = (const char *)arg; + Slapi_PBlock *pb = NULL; + Object *replica_object = NULL; + Replica *replica = NULL; + CSN *purge_csn = NULL; + char ebuf[BUFSIZ]; + + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "Info: Beginning tombstone reap for replica %s.\n", + replica_name ? replica_name : "(null)"); + + if (NULL == replica_name) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "Warning: Replica name is null in tombstone reap\n"); + goto done; + } + + /* + * replica_get_by_name() will acquire the replica object + * and that could prevent the replica from being destroyed + * until the object_release is called. + */ + replica_object = replica_get_by_name(replica_name); + if (NULL == replica_object) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "Warning: Replica object %s is null in tombstone reap\n", replica_name); + goto done; + } + + /* We have a reference, so replica won't vanish on us. */ + replica = (Replica *)object_get_data(replica_object); + if (NULL == replica) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "Warning: Replica %s is null in tombstone reap\n", replica_name); + goto done; + } + + if (replica->tombstone_reap_stop) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "Info: Replica %s reap stop flag is set for tombstone reap\n", replica_name); + goto done; + } + + purge_csn = replica_get_purge_csn(replica); + if (NULL != purge_csn) + { + LDAPControl **ctrls; + int oprc; + reap_callback_data cb_data; + char **attrs = NULL; + + /* we just need the objectclass - for the deletion csn + and the dn and nsuniqueid - for possible deletion + saves time to return only 2 attrs + */ + charray_add(&attrs, slapi_ch_strdup("objectclass")); + charray_add(&attrs, slapi_ch_strdup("nsuniqueid")); + + ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *)); + ctrls[0] = create_managedsait_control(); + ctrls[1] = create_backend_control(replica->repl_root); + ctrls[2] = NULL; + pb = slapi_pblock_new(); + slapi_search_internal_set_pb(pb, slapi_sdn_get_dn(replica->repl_root), + LDAP_SCOPE_SUBTREE, "(&(objectclass=nstombstone)(nscpentrydn=*))", + attrs, 0, ctrls, NULL, + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); + + cb_data.rc = 0; + cb_data.num_entries = 0UL; + cb_data.num_purged_entries = 0UL; + cb_data.purge_csn = purge_csn; + cb_data.tombstone_reap_stop = &(replica->tombstone_reap_stop); + + slapi_search_internal_callback_pb (pb, &cb_data /* callback data */, + get_reap_result /* result callback */, + process_reap_entry /* entry callback */, + NULL /* referral callback*/); + + charray_free(attrs); + + oprc = cb_data.rc; + + if (LDAP_SUCCESS != oprc) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "_replica_reap_tombstones: failed when searching for " + "tombstones in replica %s: %s. Will try again in %d " + "seconds.\n", escape_string(slapi_sdn_get_dn(replica->repl_root),ebuf), + ldap_err2string(oprc), replica->tombstone_reap_interval); + } + else + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "_replica_reap_tombstones: purged %d of %d tombstones " + "in replica %s. Will try again in %d " + "seconds.\n", cb_data.num_purged_entries, cb_data.num_entries, + escape_string(slapi_sdn_get_dn(replica->repl_root),ebuf), + replica->tombstone_reap_interval); + } + } + else + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "Info: No purge CSN for tombstone reap for replica %s.\n", + replica_name ? replica_name : "(null)"); + } + + PR_Lock(replica->repl_lock); + replica->tombstone_reap_active = PR_FALSE; + PR_Unlock(replica->repl_lock); + +done: + if (NULL != purge_csn) + { + csn_free(&purge_csn); + } + if (NULL != pb) + { + slapi_free_search_results_internal(pb); + slapi_pblock_destroy(pb); + } + if (NULL != replica_object) + { + object_release(replica_object); + replica_object = NULL; + replica = NULL; + } + + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "Info: Finished tombstone reap for replica %s.\n", + replica_name ? replica_name : "(null)"); + +} + +/* + We don't want to run the reaper function directly from the event + queue since it may hog the event queue, starving other events. + See bug 604441 + The function eq_cb_reap_tombstones will fire off the actual thread + that does the real work. +*/ +static void +eq_cb_reap_tombstones(time_t when, void *arg) +{ + const char *replica_name = (const char *)arg; + Object *replica_object = NULL; + Replica *replica = NULL; + + if (NULL != replica_name) + { + /* + * replica_get_by_name() will acquire the replica object + * and that could prevent the replica from being destroyed + * until the object_release is called. + */ + replica_object = replica_get_by_name(replica_name); + if (NULL != replica_object) + { + /* We have a reference, so replica won't vanish on us. */ + replica = (Replica *)object_get_data(replica_object); + if (replica) + { + + PR_Lock(replica->repl_lock); + + /* No action if purge is disabled or the previous purge is not done yet */ + if (replica->tombstone_reap_interval != 0 && + replica->tombstone_reap_active == PR_FALSE) + { + /* set the flag here to minimize race conditions */ + replica->tombstone_reap_active = PR_TRUE; + if (PR_CreateThread(PR_USER_THREAD, + _replica_reap_tombstones, (void *)replica_name, + PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD, + SLAPD_DEFAULT_THREAD_STACKSIZE) == NULL) + { + replica->tombstone_reap_active = PR_FALSE; + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "Error: unable to create the tombstone reap thread for replica %s. " + "Possible system resources problem\n", + replica_name); + } + } + /* reap thread will wait until this lock is released */ + PR_Unlock(replica->repl_lock); + } + object_release(replica_object); + replica_object = NULL; + replica = NULL; + } + } +} + +static char * +_replica_type_as_string (const Replica *r) +{ + switch (r->repl_type) + { + case REPLICA_TYPE_PRIMARY: return "primary"; + case REPLICA_TYPE_READONLY: return "read-only"; + case REPLICA_TYPE_UPDATABLE: return "updatable"; + default: return "unknown"; + } +} + + +static const char *root_glue = + "dn: %s\n" + "objectclass: top\n" + "objectclass: nsTombstone\n" + "objectclass: extensibleobject\n" + "nsuniqueid: %s\n"; + +static int +replica_create_ruv_tombstone(Replica *r) +{ + int return_value = LDAP_LOCAL_ERROR; + char *root_entry_str; + Slapi_Entry *e; + const char *purl = NULL; + RUV *ruv; + struct berval **bvals = NULL; + Slapi_PBlock *pb = NULL; + int rc; + char ebuf[BUFSIZ]; + + PR_ASSERT(NULL != r && NULL != r->repl_root); + root_entry_str = slapi_ch_malloc(strlen(root_glue) + + slapi_sdn_get_ndn_len(r->repl_root) + + strlen(RUV_STORAGE_ENTRY_UNIQUEID) + 1); + sprintf(root_entry_str, root_glue, slapi_sdn_get_ndn(r->repl_root), + RUV_STORAGE_ENTRY_UNIQUEID); + + e = slapi_str2entry(root_entry_str, SLAPI_STR2ENTRY_TOMBSTONE_CHECK); + if (e == NULL) + goto done; + + /* Add ruv */ + if (r->repl_ruv == NULL) + { + CSNGen *gen; + CSN *csn; + char csnstr [CSN_STRSIZE]; + + /* first attempt to write RUV tombstone - need to create RUV */ + gen = (CSNGen *)object_get_data(r->repl_csngen); + PR_ASSERT (gen); + + if (csngen_new_csn(gen, &csn, PR_FALSE /* notify */) == CSN_SUCCESS) + { + (void)csn_as_string(csn, PR_FALSE, csnstr); + csn_free(&csn); + + /* if this is an updateable replica - add its own + element to the RUV so that referrals work correctly */ + if (r->repl_type == REPLICA_TYPE_UPDATABLE) + purl = multimaster_get_local_purl(); + + if (ruv_init_new(csnstr, r->repl_rid, purl, &ruv) == RUV_SUCCESS) + { + r->repl_ruv = object_new((void*)ruv, (FNFree)ruv_destroy); + r->repl_ruv_dirty = PR_TRUE; + return_value = LDAP_SUCCESS; + } + else + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "Cannot create new replica update vector for %s\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + goto done; + } + } + else + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "Cannot obtain CSN for new replica update vector for %s\n", + escape_string(slapi_sdn_get_dn(r->repl_root),ebuf)); + goto done; + } + } + else /* failed to write the entry because DB was not initialized - retry */ + { + ruv = (RUV*) object_get_data (r->repl_ruv); + PR_ASSERT (ruv); + } + + PR_ASSERT (r->repl_ruv); + + rc = ruv_to_bervals(ruv, &bvals); + if (rc != RUV_SUCCESS) + { + goto done; + } + + /* ONREPL this is depricated function but there is currently no better API to use */ + rc = slapi_entry_add_values(e, type_ruvElement, bvals); + if (rc != 0) + { + goto done; + } + + + pb = slapi_pblock_new(); + slapi_add_entry_internal_set_pb( + pb, + e, + NULL /* controls */, + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), + OP_FLAG_TOMBSTONE_ENTRY | OP_FLAG_REPLICATED | OP_FLAG_REPL_FIXUP); + slapi_add_internal_pb(pb); + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value); + if (return_value == LDAP_SUCCESS) + r->repl_ruv_dirty = PR_FALSE; + +done: + if (return_value != LDAP_SUCCESS) + { + slapi_entry_free (e); + } + + if (bvals) + ber_bvecfree(bvals); + + if (pb) + slapi_pblock_destroy(pb); + + slapi_ch_free((void **) &root_entry_str); + + return return_value; +} + + +static void +assign_csn_callback(const CSN *csn, void *data) +{ + Replica *r = (Replica *)data; + Object *ruv_obj; + RUV *ruv; + + PR_ASSERT(NULL != csn); + PR_ASSERT(NULL != r); + + ruv_obj = replica_get_ruv (r); + PR_ASSERT (ruv_obj); + ruv = (RUV*)object_get_data (ruv_obj); + PR_ASSERT (ruv); + + PR_Lock(r->repl_lock); + + r->repl_csn_assigned = PR_TRUE; + + if (NULL != r->min_csn_pl) + { + if (csnplInsert(r->min_csn_pl, csn) != 0) + { + char ebuf[BUFSIZ]; + char csn_str[CSN_STRSIZE]; /* For logging only */ + /* Ack, we can't keep track of min csn. Punt. */ + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "assign_csn_callback: " + "failed to insert csn %s for replica %s\n", + csn_as_string(csn, PR_FALSE, csn_str), + escape_string(slapi_sdn_get_dn(r->repl_root), ebuf)); + csnplFree(&r->min_csn_pl); + } + } + + ruv_add_csn_inprogress (ruv, csn); + + PR_Unlock(r->repl_lock); + + object_release (ruv_obj); +} + + +static void +abort_csn_callback(const CSN *csn, void *data) +{ + Replica *r = (Replica *)data; + Object *ruv_obj; + RUV *ruv; + int rc; + + PR_ASSERT(NULL != csn); + PR_ASSERT(NULL != data); + + ruv_obj = replica_get_ruv (r); + PR_ASSERT (ruv_obj); + ruv = (RUV*)object_get_data (ruv_obj); + PR_ASSERT (ruv); + + PR_Lock(r->repl_lock); + + if (NULL != r->min_csn_pl) + { + rc = csnplRemove(r->min_csn_pl, csn); + PR_ASSERT(rc == 0); + } + + ruv_cancel_csn_inprogress (ruv, csn); + PR_Unlock(r->repl_lock); + + object_release (ruv_obj); +} + +static CSN * +_replica_get_purge_csn_nolock(const Replica *r) +{ + static unsigned long a_week = 3600*24*7; + CSN *purge_csn = NULL; + CSN **csns = NULL; + RUV *ruv; + time_t cutoff_time; + time_t max_time_in_csn_list; + int i; + + if (r->repl_purge_delay > 0) + { + /* + * Don't let inactive or obsolete masters in the ruv hold back + * the purge forever: + * - set a graceful period of at least 7 days; + * - set cutoff_time = max(maxcsns) - gracefule_period; + * - the first maxcsn that was generated at or after the cutoff + * time would be the purge csn. + */ + + /* get a sorted list of all maxcsns in ruv in ascend order */ + object_acquire(r->repl_ruv); + ruv = object_get_data(r->repl_ruv); + csns = cl5BuildCSNList (ruv, NULL); + object_release(r->repl_ruv); + + if (csns == NULL) + return NULL; + + /* locate the max csn in the csn list */ + for (i = 0; csns[i]; i++); + max_time_in_csn_list = csn_get_time (csns[i-1]); + + if ( r->repl_purge_delay > a_week ) + { + cutoff_time = max_time_in_csn_list - r->repl_purge_delay; + } + else + { + cutoff_time = max_time_in_csn_list - a_week; + } + for (i = 0; csns[i]; i++) + { + if ( csn_get_time (csns[i]) >= cutoff_time ) + { + purge_csn = csn_dup (csns[i]); + break; + } + } + + /* Subtract purge delay */ + if (purge_csn) + { + csn_set_time(purge_csn, csn_get_time(purge_csn) - r->repl_purge_delay); + } + } + + if (csns) + cl5DestroyCSNList (&csns); + + return purge_csn; +} + +static void +replica_get_referrals_nolock (const Replica *r, char ***referrals) +{ + if(referrals!=NULL) + { + + int hint; + int i= 0; + Slapi_Value *v= NULL; + + if (NULL == r->repl_referral) + { + *referrals = NULL; + } + else + { + /* richm: +1 for trailing NULL */ + *referrals= (char**)slapi_ch_calloc(sizeof(char*),1+slapi_valueset_count(r->repl_referral)); + hint= slapi_valueset_first_value( r->repl_referral, &v ); + while(v!=NULL) + { + const char *s= slapi_value_get_string(v); + if(s!=NULL && s[0]!='\0') + { + (*referrals)[i]= slapi_ch_strdup(s); + i++; + } + hint= slapi_valueset_next_value( r->repl_referral, hint, &v); + } + (*referrals)[i] = NULL; + } + + } +} + +static void +replica_clear_legacy_referrals(const Slapi_DN *repl_root_sdn, + char **referrals, const char *state) +{ + repl_set_mtn_state_and_referrals(repl_root_sdn, state, NULL, NULL, referrals); +} + +static void +replica_remove_legacy_attr (const Slapi_DN *repl_root_sdn, const char *attr) +{ + Slapi_PBlock *pb; + Slapi_Mods smods; + LDAPControl **ctrls; + int rc; + + pb = slapi_pblock_new (); + + slapi_mods_init(&smods, 1); + slapi_mods_add(&smods, LDAP_MOD_DELETE, attr, 0, NULL); + + + ctrls = (LDAPControl**)slapi_ch_malloc (2 * sizeof (LDAPControl*)); + ctrls[0] = create_managedsait_control (); + ctrls[1] = NULL; + + /* remove copiedFrom/copyingFrom first */ + slapi_modify_internal_set_pb (pb, slapi_sdn_get_dn (repl_root_sdn), + slapi_mods_get_ldapmods_passout (&smods), ctrls, + NULL /*uniqueid */, + repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION) , + 0 /* operation_flags */); + + slapi_modify_internal_pb (pb); + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); + if (rc != LDAP_SUCCESS) + { + char ebuf[BUFSIZ]; + + /* this is not a fatal error because the attribute may not be there */ + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "replica_remove_legacy_attr: " + "failed to remove legacy attribute %s for replica %s; LDAP error - %d\n", + attr, escape_string(slapi_sdn_get_dn(repl_root_sdn),ebuf), rc); + } + + slapi_mods_done (&smods); + slapi_pblock_destroy (pb); +} + +static int +replica_log_ruv_elements_nolock (const Replica *r) +{ + int rc = 0; + slapi_operation_parameters op_params; + RUV *ruv; + char *repl_gen; + CSN *csn = NULL; + + ruv = (RUV*) object_get_data (r->repl_ruv); + PR_ASSERT (ruv); + + if ((ruv_get_min_csn(ruv, &csn) == RUV_SUCCESS) && csn) + { + /* we log it as a delete operation to have the least number of fields + to set. the entry can be identified by a special target uniqueid and + special target dn */ + memset (&op_params, 0, sizeof (op_params)); + op_params.operation_type = SLAPI_OPERATION_DELETE; + op_params.target_address.dn = START_ITERATION_ENTRY_DN; + op_params.target_address.uniqueid = START_ITERATION_ENTRY_UNIQUEID; + op_params.csn = csn; + repl_gen = ruv_get_replica_generation (ruv); + + rc = cl5WriteOperation(r->repl_name, repl_gen, &op_params, PR_FALSE); + if (rc == CL5_SUCCESS) + rc = 0; + else + rc = -1; + + slapi_ch_free ((void**)&repl_gen); + csn_free (&csn); + } + + return rc; +} + +void +replica_set_purge_delay(Replica *r, PRUint32 purge_delay) +{ + PR_ASSERT(r); + PR_Lock(r->repl_lock); + r->repl_purge_delay = purge_delay; + PR_Unlock(r->repl_lock); +} + +void +replica_set_tombstone_reap_interval (Replica *r, long interval) +{ + char *repl_name; + + PR_Lock(r->repl_lock); + + /* + * Leave the event there to purge the existing tombstones + * if we are about to turn off tombstone creation + */ + if (interval > 0 && r->repl_eqcxt_tr && r->tombstone_reap_interval != interval) + { + int found; + + repl_name = slapi_eq_get_arg (r->repl_eqcxt_tr); + slapi_ch_free ((void**)&repl_name); + found = slapi_eq_cancel (r->repl_eqcxt_tr); + slapi_log_error (SLAPI_LOG_REPL, NULL, + "tombstone_reap event (interval=%d) was %s\n", + r->tombstone_reap_interval, (found ? "cancelled" : "not found")); + r->repl_eqcxt_tr = NULL; + } + r->tombstone_reap_interval = interval; + if ( interval > 0 && r->repl_eqcxt_tr == NULL ) + { + repl_name = slapi_ch_strdup (r->repl_name); + r->repl_eqcxt_tr = slapi_eq_repeat (eq_cb_reap_tombstones, repl_name, current_time() + START_REAP_DELAY, 1000 * r->tombstone_reap_interval); + slapi_log_error (SLAPI_LOG_REPL, NULL, + "tombstone_reap event (interval=%d) was %s\n", + r->tombstone_reap_interval, (r->repl_eqcxt_tr ? "scheduled" : "not scheduled successfully")); + } + PR_Unlock(r->repl_lock); +} + +/* Update the tombstone entry to reflect the content of the ruv */ +static void +replica_replace_ruv_tombstone(Replica *r) +{ + Slapi_PBlock *pb = NULL; + char *dn; + int rc; + + Slapi_Mod smod; + Slapi_Mod smod_last_modified; + LDAPMod *mods [3]; + + PR_ASSERT(NULL != r && NULL != r->repl_root); + + PR_Lock(r->repl_lock); + + PR_ASSERT (r->repl_ruv); + ruv_to_smod ((RUV*)object_get_data(r->repl_ruv), &smod); + ruv_last_modified_to_smod ((RUV*)object_get_data(r->repl_ruv), &smod_last_modified); + + dn = _replica_get_config_dn (r->repl_root); + mods[0] = (LDAPMod*)slapi_mod_get_ldapmod_byref(&smod); + mods[1] = (LDAPMod*)slapi_mod_get_ldapmod_byref(&smod_last_modified); + + PR_Unlock (r->repl_lock); + + mods [2] = NULL; + pb = slapi_pblock_new(); + + slapi_modify_internal_set_pb( + pb, + (char*)slapi_sdn_get_dn (r->repl_root), /* only used to select be */ + mods, + NULL, /* controls */ + RUV_STORAGE_ENTRY_UNIQUEID, + repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), + OP_FLAG_REPLICATED | OP_FLAG_REPL_FIXUP); + + slapi_modify_internal_pb (pb); + + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); + + if (rc != LDAP_SUCCESS) + { + if ((rc != LDAP_NO_SUCH_OBJECT) || !replica_is_state_flag_set(r, REPLICA_IN_USE)) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_replace_ruv_tombstone: " + "failed to update replication update vector for replica %s: LDAP " + "error - %d\n", (char*)slapi_sdn_get_dn (r->repl_root), rc); + } + } + + slapi_ch_free ((void**)&dn); + slapi_pblock_destroy (pb); + slapi_mod_done (&smod); + slapi_mod_done (&smod_last_modified); +} + +void +replica_update_ruv_consumer(Replica *r, RUV *supplier_ruv) +{ + ReplicaId supplier_id = 0; + char *supplier_purl = NULL; + + if ( ruv_get_first_id_and_purl(supplier_ruv, &supplier_id, &supplier_purl) == RUV_SUCCESS ) + { + RUV *local_ruv = NULL; + + PR_Lock(r->repl_lock); + + local_ruv = (RUV*)object_get_data (r->repl_ruv); + PR_ASSERT (local_ruv); + + if ( ruv_local_contains_supplier(local_ruv, supplier_id) == 0 ) + { + if ( r->repl_type == REPLICA_TYPE_UPDATABLE ) + { + /* Add the new ruv right after the consumer own purl */ + ruv_add_index_replica(local_ruv, supplier_id, supplier_purl, 2); + } + else + { + /* This is a consumer only, add it first */ + ruv_add_index_replica(local_ruv, supplier_id, supplier_purl, 1); + } + } + else + { + /* Replace it */ + ruv_replace_replica_purl(local_ruv, supplier_id, supplier_purl); + } + PR_Unlock(r->repl_lock); + + /* Update also the directory entry */ + replica_replace_ruv_tombstone(r); + } +} + +void +replica_set_ruv_dirty(Replica *r) +{ + PR_ASSERT(r); + PR_Lock(r->repl_lock); + r->repl_ruv_dirty = PR_TRUE; + PR_Unlock(r->repl_lock); +} + +PRBool +replica_is_state_flag_set(Replica *r, PRInt32 flag) +{ + PR_ASSERT(r); + if (r) + return (r->repl_state_flags & flag); + else + return PR_FALSE; +} + +void +replica_set_state_flag (Replica *r, PRUint32 flag, PRBool clear) +{ + if (r == NULL) + return; + + PR_Lock(r->repl_lock); + + if (clear) + { + r->repl_state_flags &= ~flag; + } + else + { + r->repl_state_flags |= flag; + } + + PR_Unlock(r->repl_lock); +} + +/* replica just came back online, probably after data was reloaded */ +void +replica_enable_replication (Replica *r) +{ + int rc; + + PR_ASSERT(r); + + /* prevent creation of new agreements until the replica is enabled */ + PR_Lock(r->agmt_lock); + + /* retrieve new ruv */ + rc = replica_reload_ruv (r); + if (rc) { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_enable_replication: " + "reloading ruv failed\n"); + /* What to do ? */ + } + + /* Replica came back online, Check if the total update was terminated. + If flag is still set, it was not terminated, therefore the data is + very likely to be incorrect, and we should not restart Replication threads... + */ + if (!replica_is_state_flag_set(r, REPLICA_TOTAL_IN_PROGRESS)){ + /* restart outbound replication */ + start_agreements_for_replica (r, PR_TRUE); + + /* enable ruv state update */ + replica_set_enabled (r, PR_TRUE); + } + + /* mark the replica as being available for updates */ + replica_relinquish_exclusive_access(r, 0, 0); + + replica_set_state_flag(r, REPLICA_AGREEMENTS_DISABLED, PR_TRUE /* clear */); + PR_Unlock(r->agmt_lock); + + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "replica_enable_replication: " + "replica %s is relinquished\n", + slapi_sdn_get_ndn (replica_get_root (r))); +} + +/* replica is about to be taken offline */ +void +replica_disable_replication (Replica *r, Object *r_obj) +{ + char *current_purl = NULL; + char *p_locking_purl = NULL; + char *locking_purl = NULL; + int junkrc; + ReplicaId junkrid; + PRBool isInc = PR_FALSE; /* get exclusive access, but not for inc update */ + RUV *repl_ruv = NULL; + + /* prevent creation of new agreements until the replica is disabled */ + PR_Lock(r->agmt_lock); + + /* stop ruv update */ + replica_set_enabled (r, PR_FALSE); + + /* disable outbound replication */ + start_agreements_for_replica (r, PR_FALSE); + + /* close the corresponding changelog file */ + /* close_changelog_for_replica (r_obj); */ + + /* mark the replica as being unavailable for updates */ + /* If an incremental update is in progress, we want to wait until it is + finished until we get exclusive access to the replica, because we have + to make sure no operations are in progress - it messes up replication + when a restore is in progress but we are still adding replicated entries + from a supplier + */ + repl_ruv = (RUV*) object_get_data (r->repl_ruv); + junkrc = ruv_get_first_id_and_purl(repl_ruv, &junkrid, &p_locking_purl); + locking_purl = slapi_ch_strdup(p_locking_purl); + p_locking_purl = NULL; + repl_ruv = NULL; + while (!replica_get_exclusive_access(r, &isInc, 0, 0, "replica_disable_replication", + ¤t_purl)) { + if (!isInc) /* already locked, but not by inc update - break */ + break; + isInc = PR_FALSE; + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "replica_disable_replication: " + "replica %s is already locked by (%s) for incoming " + "incremental update; sleeping 100ms\n", + slapi_sdn_get_ndn (replica_get_root (r)), + current_purl ? current_purl : "unknown"); + slapi_ch_free_string(¤t_purl); + DS_Sleep(PR_MillisecondsToInterval(100)); + } + + slapi_ch_free_string(¤t_purl); + slapi_ch_free_string(&locking_purl); + replica_set_state_flag(r, REPLICA_AGREEMENTS_DISABLED, PR_FALSE); + PR_Unlock(r->agmt_lock); + + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "replica_disable_replication: " + "replica %s is acquired\n", + slapi_sdn_get_ndn (replica_get_root (r))); +} + +static void +start_agreements_for_replica (Replica *r, PRBool start) +{ + Object *agmt_obj; + Repl_Agmt *agmt; + + agmt_obj = agmtlist_get_first_agreement_for_replica (r); + while (agmt_obj) + { + agmt = (Repl_Agmt*)object_get_data (agmt_obj); + PR_ASSERT (agmt); + + if (start) + agmt_start (agmt); + else /* stop */ + agmt_stop (agmt); + + agmt_obj = agmtlist_get_next_agreement_for_replica (r, agmt_obj); + } +} + +int replica_start_agreement(Replica *r, Repl_Agmt *ra) +{ + int ret = 0; + + if (r == NULL) return -1; + + PR_Lock(r->agmt_lock); + + if (!replica_is_state_flag_set(r, REPLICA_AGREEMENTS_DISABLED)) { + ret = agmt_start(ra); /* Start the replication agreement */ + } + + PR_Unlock(r->agmt_lock); + return ret; +} + +/* + * A callback function registed as op->o_csngen_handler and + * called by backend ops to generate opcsn. + */ +CSN * +replica_generate_next_csn ( Slapi_PBlock *pb, const CSN *basecsn ) +{ + CSN *opcsn = NULL; + Object *replica_obj; + + replica_obj = replica_get_replica_for_op (pb); + if (NULL != replica_obj) + { + Replica *replica = (Replica*) object_get_data (replica_obj); + if ( NULL != replica ) + { + Slapi_Operation *op; + slapi_pblock_get (pb, SLAPI_OPERATION, &op); + if ( replica->repl_type != REPLICA_TYPE_READONLY || + operation_is_flag_set (op, OP_FLAG_LEGACY_REPLICATION_DN )) + { + Object *gen_obj = replica_get_csngen (replica); + if (NULL != gen_obj) + { + CSNGen *gen = (CSNGen*) object_get_data (gen_obj); + if (NULL != gen) + { + /* The new CSN should be greater than the base CSN */ + csngen_new_csn (gen, &opcsn, PR_FALSE /* don't notify */); + if (csn_compare (opcsn, basecsn) <= 0) + { + char opcsnstr[CSN_STRSIZE], basecsnstr[CSN_STRSIZE]; + char opcsn2str[CSN_STRSIZE]; + + csn_as_string (opcsn, PR_FALSE, opcsnstr); + csn_as_string (basecsn, PR_FALSE, basecsnstr); + csn_free ( &opcsn ); + csngen_adjust_time (gen, basecsn); + csngen_new_csn (gen, &opcsn, PR_FALSE /* don't notify */); + csn_as_string (opcsn, PR_FALSE, opcsn2str); + slapi_log_error (SLAPI_LOG_FATAL, NULL, + "replica_generate_next_csn: " + "opcsn=%s <= basecsn=%s, adjusted opcsn=%s\n", + opcsnstr, basecsnstr, opcsn2str); + } + /* + * Insert opcsn into the csn pending list. + * This is the notify effect in csngen_new_csn(). + */ + assign_csn_callback (opcsn, (void *)replica); + } + object_release (gen_obj); + } + } + } + object_release (replica_obj); + } + + return opcsn; +} + +/* + * A callback function registed as op->o_replica_attr_handler and + * called by backend ops to get replica attributes. + */ +int +replica_get_attr ( Slapi_PBlock *pb, const char* type, void *value ) +{ + int rc = -1; + + Object *replica_obj; + replica_obj = replica_get_replica_for_op (pb); + if (NULL != replica_obj) + { + Replica *replica = (Replica*) object_get_data (replica_obj); + if ( NULL != replica ) + { + if (strcasecmp (type, type_replicaTombstonePurgeInterval) == 0) + { + *((int*)value) = replica->tombstone_reap_interval; + rc = 0; + } + else if (strcasecmp (type, type_replicaPurgeDelay) == 0) + { + *((int*)value) = replica->repl_purge_delay; + rc = 0; + } + } + object_release (replica_obj); + } + + return rc; +} |