summaryrefslogtreecommitdiffstats
path: root/ldap/servers/plugins/replication/repl5_agmt.c
diff options
context:
space:
mode:
authorcvsadm <cvsadm>2005-01-21 00:44:34 +0000
committercvsadm <cvsadm>2005-01-21 00:44:34 +0000
commitb2093e3016027d6b5cf06b3f91f30769bfc099e2 (patch)
treecf58939393a9032182c4fbc4441164a9456e82f8 /ldap/servers/plugins/replication/repl5_agmt.c
downloadds-b2093e3016027d6b5cf06b3f91f30769bfc099e2.tar.gz
ds-b2093e3016027d6b5cf06b3f91f30769bfc099e2.tar.xz
ds-b2093e3016027d6b5cf06b3f91f30769bfc099e2.zip
Moving NSCP Directory Server from DirectoryBranch to TRUNK, initial drop. (foxworth)ldapserver7x
Diffstat (limited to 'ldap/servers/plugins/replication/repl5_agmt.c')
-rw-r--r--ldap/servers/plugins/replication/repl5_agmt.c1766
1 files changed, 1766 insertions, 0 deletions
diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c
new file mode 100644
index 00000000..2992fc11
--- /dev/null
+++ b/ldap/servers/plugins/replication/repl5_agmt.c
@@ -0,0 +1,1766 @@
+/** BEGIN COPYRIGHT BLOCK
+ * Copyright 2001 Sun Microsystems, Inc.
+ * Portions copyright 1999, 2001-2003 Netscape Communications Corporation.
+ * All rights reserved.
+ * END COPYRIGHT BLOCK **/
+/* repl5_agmt.c */
+/*
+
+ Support for 5.0-style replication agreements.
+
+ Directory Server 5.0 replication agreements contain information about
+ replication consumers that we are supplying.
+
+ This module encapsulates the methods available for adding, deleting,
+ modifying, and firing replication agreements.
+
+ Methods:
+
+ agmt_new - Create a new replication agreement, in response to a new
+ replication agreement being added over LDAP.
+ agmt_delete - Destroy an agreement. It is an error to destroy an
+ agreement that has not been stopped.
+ agmt_getstatus - get the status of this replication agreement.
+ agmt_replicate_now - initiate a replication session asap, even if the
+ schedule says we shouldn't.
+ agmt_start - start replicating, according to schedule. Starts a new
+ thread to handle replication.
+ agmt_stop - stop replicating asap and end replication thread.
+ agmt_notify_change - notify the replication agreement about a change that
+ has been logged. The replication agreement will
+ decide if it needs to take some action, e.g. start a
+ replication session.
+ agmt_initialize_replica - start a complete replica refresh.
+ agmt_set_schedule_from_entry - (re)set the schedule associated with this
+ replication agreement based on a RA entry's contents.
+ agmt_set_credentials_from_entry - (re)set the credentials used to bind
+ to the remote replica.
+ agmt_set_binddn_from_entry - (re)set the DN used to bind
+ to the remote replica.
+ agmt_set_bind_method_from_entry - (re)set the bind method used to bind
+ to the remote replica (SIMPLE or SSLCLIENTAUTH).
+ agmt_set_transportinfo_from_entry - (re)set the transport used to bind
+ to the remote replica (SSL or not)
+
+*/
+
+#include "repl5.h"
+#include "repl5_prot_private.h"
+#include "cl5_api.h"
+
+#define DEFAULT_TIMEOUT 600 /* (seconds) default outbound LDAP connection */
+#define TRANSPORT_FLAG_SSL 1
+#define STATUS_LEN 1024
+
+struct changecounter {
+ ReplicaId rid;
+ PRUint32 num_replayed;
+ PRUint32 num_skipped;
+};
+
+typedef struct repl5agmt {
+ char *hostname; /* remote hostname */
+ int port; /* port of remote server */
+ PRUint32 transport_flags; /* SSL, TLS, etc. */
+ char *binddn; /* DN to bind as */
+ struct berval *creds; /* Password, or certificate */
+ int bindmethod; /* Bind method - simple, SSL */
+ Slapi_DN *replarea; /* DN of replicated area */
+ char **frac_attrs; /* list of fractional attributes to be replicated */
+ Schedule *schedule; /* Scheduling information */
+ int auto_initialize; /* 1 = automatically re-initialize replica */
+ const Slapi_DN *dn; /* DN of replication agreement entry */
+ const Slapi_RDN *rdn; /* RDN of replication agreement entry */
+ char *long_name; /* Long name (rdn + host, port) of entry, for logging */
+ Repl_Protocol *protocol; /* Protocol object - manages protocol */
+ struct changecounter *changecounters[MAX_NUM_OF_MASTERS]; /* changes sent/skipped since server start up */
+ int num_changecounters;
+ time_t last_update_start_time; /* Local start time of last update session */
+ time_t last_update_end_time; /* Local end time of last update session */
+ char last_update_status[STATUS_LEN]; /* Status of last update. Format = numeric code <space> textual description */
+ PRBool update_in_progress;
+ time_t last_init_start_time; /* Local start time of last total init */
+ time_t last_init_end_time; /* Local end time of last total init */
+ char last_init_status[STATUS_LEN]; /* Status of last total init. Format = numeric code <space> textual description */
+ PRLock *lock;
+ Object *consumerRUV; /* last RUV received from the consumer - used for changelog purging */
+ CSN *consumerSchemaCSN; /* last schema CSN received from the consumer */
+ ReplicaId consumerRID; /* indicates if the consumer is the originator of a CSN */
+ long timeout; /* timeout (in seconds) for outbound LDAP connections to remote server */
+ PRBool stop_in_progress; /* set by agmt_stop when shutting down */
+ long busywaittime; /* time in seconds to wait after getting a REPLICA BUSY from the consumer -
+ to allow another supplier to finish sending its updates -
+ if set to 0, this means to use the default value if we get a busy
+ signal from the consumer */
+ long pausetime; /* time in seconds to pause after sending updates -
+ to allow another supplier to send its updates -
+ should be greater than busywaittime -
+ if set to 0, this means do not pause */
+} repl5agmt;
+
+/* Forward declarations */
+void agmt_delete(void **rap);
+static void update_window_state_change_callback (void *arg, PRBool opened);
+static int get_agmt_status(Slapi_PBlock *pb, Slapi_Entry* e,
+ Slapi_Entry* entryAfter, int *returncode, char *returntext, void *arg);
+static int agmt_set_bind_method_no_lock(Repl_Agmt *ra, const Slapi_Entry *e);
+static int agmt_set_transportinfo_no_lock(Repl_Agmt *ra, const Slapi_Entry *e);
+
+/*
+Schema for replication agreement:
+
+cn
+nsds5ReplicaHost - hostname
+nsds5ReplicaPort - port number
+nsds5ReplicaTransportInfo - "SSL", "startTLS", or may be absent;
+nsds5ReplicaBindDN
+nsds5ReplicaCredentials
+nsds5ReplicaBindMethod - "SIMPLE" or "SSLCLIENTAUTH".
+nsds5ReplicaRoot - Replicated suffix
+nsds5ReplicatedAttributeList - Unused so far (meant for fractional repl).
+nsds5ReplicaUpdateSchedule
+nsds5ReplicaTimeout - Outbound repl operations timeout
+nsds50ruv - consumer's RUV
+nsds5ReplicaBusyWaitTime - time to wait after getting a REPLICA BUSY from the consumer
+nsds5ReplicaSessionPauseTime - time to pause after sending updates to allow another supplier to send
+*/
+
+
+/*
+ * Validate an agreement, making sure that it's valid.
+ * Return 1 if the agreement is valid, 0 otherwise.
+ */
+static int
+agmt_is_valid(Repl_Agmt *ra)
+{
+ int return_value = 1; /* assume valid, initially */
+ PR_ASSERT(NULL != ra);
+ PR_ASSERT(NULL != ra->dn);
+
+ if (NULL == ra->hostname)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Replication agreement \"%s\" "
+ "is malformed: missing host name.\n", slapi_sdn_get_dn(ra->dn));
+ return_value = 0;
+ }
+ if (ra->port <= 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Replication agreement \"%s\" "
+ "is malformed: invalid port number %d.\n", slapi_sdn_get_dn(ra->dn), ra->port);
+ return_value = 0;
+ }
+ if (ra->timeout < 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Replication agreement \"%s\" "
+ "is malformed: invalid timeout %d.\n", slapi_sdn_get_dn(ra->dn), ra->timeout);
+ return_value = 0;
+ }
+ if (ra->busywaittime < 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Replication agreement \"%s\" "
+ "is malformed: invalid busy wait time %d.\n", slapi_sdn_get_dn(ra->dn), ra->busywaittime);
+ return_value = 0;
+ }
+ if (ra->pausetime < 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Replication agreement \"%s\" "
+ "is malformed: invalid pausetime %d.\n", slapi_sdn_get_dn(ra->dn), ra->pausetime);
+ return_value = 0;
+ }
+ return return_value;
+}
+
+
+Repl_Agmt *
+agmt_new_from_entry(Slapi_Entry *e)
+{
+ Repl_Agmt *ra;
+ char *tmpstr;
+ Slapi_Attr *sattr;
+
+ char *auto_initialize = NULL;
+ char *val_nsds5BeginReplicaRefresh = "start";
+
+ ra = (Repl_Agmt *)slapi_ch_calloc(1, sizeof(repl5agmt));
+ if ((ra->lock = PR_NewLock()) == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Unable to create new lock "
+ "for replication agreement \"%s\" - agreement ignored.\n",
+ slapi_entry_get_dn_const(e));
+ goto loser;
+ }
+
+ /* Find all the stuff we need for the agreement */
+
+ /* To Allow Consumer Initialisation when adding an agreement: */
+
+ /*
+ Using 'auto_initialize' member of 'repl5agmt' structure to
+ store the effect of 'nsds5BeginReplicaRefresh' attribute's value
+ in it.
+ */
+ auto_initialize = slapi_entry_attr_get_charptr(e, type_nsds5BeginReplicaRefresh);
+ if ((auto_initialize != NULL) && (strcasecmp(auto_initialize, val_nsds5BeginReplicaRefresh) == 0))
+ {
+ ra->auto_initialize = STATE_PERFORMING_TOTAL_UPDATE;
+ }
+ else
+ {
+ ra->auto_initialize = STATE_PERFORMING_INCREMENTAL_UPDATE;
+ }
+
+ if (auto_initialize)
+ {
+ slapi_ch_free_string (&auto_initialize);
+ }
+
+ /* Host name of remote replica */
+ ra->hostname = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaHost);
+ /* Port number for remote replica instance */
+ ra->port = slapi_entry_attr_get_int(e, type_nsds5ReplicaPort);
+ /* SSL, TLS, or other transport stuff */
+ ra->transport_flags = 0;
+ agmt_set_transportinfo_no_lock(ra, e);
+
+ /* DN to use when binding. May be empty if cert-based auth is to be used. */
+ ra->binddn = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaBindDN);
+ if (NULL == ra->binddn)
+ {
+ ra->binddn = slapi_ch_strdup("");
+ }
+ /* Credentials to use when binding. */
+ ra->creds = (struct berval *)slapi_ch_malloc(sizeof(struct berval));
+ ra->creds->bv_val = NULL;
+ ra->creds->bv_len = 0;
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaCredentials, &sattr) == 0)
+ {
+ Slapi_Value *sval;
+ if (slapi_attr_first_value(sattr, &sval) == 0)
+ {
+ const struct berval *bv = slapi_value_get_berval(sval);
+ if (NULL != bv)
+ {
+ ra->creds->bv_val = slapi_ch_malloc(bv->bv_len + 1);
+ memcpy(ra->creds->bv_val, bv->bv_val, bv->bv_len);
+ ra->creds->bv_len = bv->bv_len;
+ ra->creds->bv_val[bv->bv_len] = '\0'; /* be safe */
+ }
+ }
+ }
+ /* How to bind */
+ (void)agmt_set_bind_method_no_lock(ra, e);
+
+ /* timeout. */
+ ra->timeout = DEFAULT_TIMEOUT;
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaTimeout, &sattr) == 0)
+ {
+ Slapi_Value *sval;
+ if (slapi_attr_first_value(sattr, &sval) == 0)
+ {
+ ra->timeout = slapi_value_get_long(sval);
+ }
+ }
+
+ /* DN of entry at root of replicated area */
+ tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaRoot);
+ if (NULL != tmpstr)
+ {
+ ra->replarea = slapi_sdn_new_dn_passin(tmpstr);
+ }
+ /* XXXggood get fractional attribute include/exclude lists here */
+ /* Replication schedule */
+ ra->schedule = schedule_new(update_window_state_change_callback, ra, agmt_get_long_name(ra));
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaUpdateSchedule, &sattr) == 0)
+ {
+ schedule_set(ra->schedule, sattr);
+ }
+
+ /* busy wait time - time to wait after getting REPLICA BUSY from consumer */
+ ra->busywaittime = slapi_entry_attr_get_long(e, type_nsds5ReplicaBusyWaitTime);
+
+ /* pause time - time to pause after a session has ended */
+ ra->pausetime = slapi_entry_attr_get_long(e, type_nsds5ReplicaSessionPauseTime);
+
+ /* consumer's RUV */
+ if (slapi_entry_attr_find(e, type_ruvElement, &sattr) == 0)
+ {
+ RUV *ruv;
+
+ if (ruv_init_from_slapi_attr(sattr, &ruv) == 0)
+ {
+ ra->consumerRUV = object_new (ruv, (FNFree)ruv_destroy);
+ }
+ }
+
+ ra->consumerRID = 0;
+
+ /* DN and RDN of the replication agreement entry itself */
+ ra->dn = slapi_sdn_dup(slapi_entry_get_sdn((Slapi_Entry *)e));
+ ra->rdn = slapi_rdn_new_sdn(ra->dn);
+
+ /* Compute long name */
+ {
+ const char *agmtname = slapi_rdn_get_rdn(ra->rdn);
+ char hostname[128];
+ char *dot;
+
+ strncpy(hostname, ra->hostname ? ra->hostname : "(unknown)", sizeof(hostname));
+ hostname[sizeof(hostname)-1] = '\0';
+ dot = strchr(hostname, '.');
+ if (dot) {
+ *dot = '\0';
+ }
+ ra->long_name = slapi_ch_malloc(strlen(agmtname) +
+ strlen(hostname) + 25);
+ sprintf(ra->long_name, "agmt=\"%s\" (%s:%d)", agmtname, hostname, ra->port);
+ }
+
+ /* Initialize status information */
+ ra->last_update_start_time = 0UL;
+ ra->last_update_end_time = 0UL;
+ ra->num_changecounters = 0;
+ ra->last_update_status[0] = '\0';
+ ra->update_in_progress = PR_FALSE;
+ ra->stop_in_progress = PR_FALSE;
+ ra->last_init_end_time = 0UL;
+ ra->last_init_start_time = 0UL;
+ ra->last_init_status[0] = '\0';
+
+ if (!agmt_is_valid(ra))
+ {
+ goto loser;
+ }
+
+ /* Now that the agreement is done, just check if changelog is configured */
+ if (cl5GetState() != CL5_STATE_OPEN) {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "WARNING: "
+ "Replication agreement added but there is no changelog configured. "
+ "No change will be replicated until a changelog is configured.\n");
+ }
+
+ /*
+ * Establish a callback for this agreement's entry, so we can
+ * adorn it with status information when read.
+ */
+ slapi_config_register_callback(SLAPI_OPERATION_SEARCH, DSE_FLAG_PREOP, slapi_sdn_get_ndn(ra->dn),
+ LDAP_SCOPE_BASE, "(objectclass=*)", get_agmt_status, ra);
+
+ return ra;
+loser:
+ agmt_delete((void **)&ra);
+ return NULL;
+}
+
+
+
+Repl_Agmt *
+agmt_new_from_pblock(Slapi_PBlock *pb)
+{
+ Slapi_Entry *e;
+
+ slapi_pblock_get(pb, SLAPI_ADD_ENTRY, &e);
+ return agmt_new_from_entry(e);
+}
+
+
+/*
+ This should never be called directly - only should be called
+ as a destructor. XXXggood this is not finished
+ */
+void
+agmt_delete(void **rap)
+{
+ Repl_Agmt *ra;
+ PR_ASSERT(NULL != rap);
+ PR_ASSERT(NULL != *rap);
+
+ ra = (Repl_Agmt *)*rap;
+
+ /* do prot_delete first - we may be doing some processing using this
+ replication agreement, and prot_delete will make sure the
+ processing is complete - then it should be safe to clean up the
+ other fields below
+ */
+ prot_delete(&ra->protocol);
+
+ /*
+ * Remove the callback for this agreement's entry
+ */
+ slapi_config_remove_callback(SLAPI_OPERATION_SEARCH, DSE_FLAG_PREOP,
+ slapi_sdn_get_ndn(ra->dn),
+ LDAP_SCOPE_BASE, "(objectclass=*)",
+ get_agmt_status);
+
+ /* slapi_ch_free accepts NULL pointer */
+ slapi_ch_free((void **)&(ra->hostname));
+ slapi_ch_free((void **)&(ra->binddn));
+
+ if (NULL != ra->creds)
+ {
+ /* XXX free berval */
+ }
+ if (NULL != ra->replarea)
+ {
+ slapi_sdn_free(&ra->replarea);
+ }
+
+ if (NULL != ra->consumerRUV)
+ {
+ object_release (ra->consumerRUV);
+ }
+
+ csn_free (&ra->consumerSchemaCSN);
+ while ( --(ra->num_changecounters) >= 0 )
+ {
+ slapi_ch_free((void **)&ra->changecounters[ra->num_changecounters]);
+ }
+
+ schedule_destroy(ra->schedule);
+ slapi_ch_free((void **)&ra->long_name);
+ slapi_ch_free((void **)rap);
+}
+
+
+/*
+ * Allow replication for this replica to begin. Replication will
+ * occur at the next scheduled time. Returns 0 on success, -1 on
+ * failure.
+ */
+int
+agmt_start(Repl_Agmt *ra)
+{
+ Repl_Protocol *prot = NULL;
+
+ int protocol_state;
+
+ /* To Allow Consumer Initialisation when adding an agreement: */
+ if (ra->auto_initialize == STATE_PERFORMING_TOTAL_UPDATE)
+ {
+ protocol_state = STATE_PERFORMING_TOTAL_UPDATE;
+ }
+ else
+ {
+ protocol_state = STATE_PERFORMING_INCREMENTAL_UPDATE;
+ }
+
+ /* First, create a new protocol object */
+ if ((prot = prot_new(ra, protocol_state)) == NULL) {
+ return -1;
+ }
+
+ /* Now it is safe to own the agreement lock */
+ PR_Lock(ra->lock);
+
+ /* Check that replication is not already started */
+ if (ra->protocol != NULL) {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "replication already started for agreement \"%s\"\n", agmt_get_long_name(ra));
+ PR_Unlock(ra->lock);
+ prot_free(&prot);
+ return 0;
+ }
+
+ ra->protocol = prot;
+
+ /* Start the protocol thread */
+ prot_start(ra->protocol);
+
+ PR_Unlock(ra->lock);
+ return 0;
+}
+
+/*
+Cease replicating to this replica as soon as possible.
+*/
+int
+agmt_stop(Repl_Agmt *ra)
+{
+ int return_value = 0;
+ Repl_Protocol *rp = NULL;
+
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+ ra->stop_in_progress = PR_TRUE;
+ rp = ra->protocol;
+ PR_Unlock(ra->lock);
+ if (NULL != rp) /* we use this pointer outside the lock - dangerous? */
+ {
+ prot_stop(rp);
+ }
+ PR_Lock(ra->lock);
+ ra->stop_in_progress = PR_FALSE;
+ /* we do not reuse the protocol object so free it */
+ prot_free(&ra->protocol);
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+/*
+Send any pending updates as soon as possible, ignoring any replication
+schedules.
+*/
+int
+agmt_replicate_now(Repl_Agmt *ra)
+{
+ int return_value = 0;
+
+ return return_value;
+}
+
+/*
+ * Return a copy of the remote replica's hostname.
+ */
+char *
+agmt_get_hostname(const Repl_Agmt *ra)
+{
+ char *return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = slapi_ch_strdup(ra->hostname);
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+/*
+ * Return the port number of the remote replica's instance.
+ */
+int
+agmt_get_port(const Repl_Agmt *ra)
+{
+ int return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->port;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+/*
+ * Return the transport flags for this agreement.
+ */
+PRUint32
+agmt_get_transport_flags(const Repl_Agmt *ra)
+{
+ unsigned int return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->transport_flags;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+/*
+ * Return a copy of the bind dn to be used with this
+ * agreement (may return NULL if no binddn is required,
+ * e.g. SSL client auth.
+ */
+char *
+agmt_get_binddn(const Repl_Agmt *ra)
+{
+ char *return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->binddn == NULL ? NULL : slapi_ch_strdup(ra->binddn);
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+/*
+ * Return a copy of the credentials.
+ */
+struct berval *
+agmt_get_credentials(const Repl_Agmt *ra)
+{
+ struct berval *return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = (struct berval *)slapi_ch_malloc(sizeof(struct berval));
+ return_value->bv_val = (char *)slapi_ch_malloc(ra->creds->bv_len + 1);
+ return_value->bv_len = ra->creds->bv_len;
+ memcpy(return_value->bv_val, ra->creds->bv_val, ra->creds->bv_len);
+ return_value->bv_val[return_value->bv_len] = '\0'; /* just in case */
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+int
+agmt_get_bindmethod(const Repl_Agmt *ra)
+{
+ int return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->bindmethod;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+/*
+ * Return a copy of the dn at the top of the replicated area.
+ */
+Slapi_DN *
+agmt_get_replarea(const Repl_Agmt *ra)
+{
+ Slapi_DN *return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = slapi_sdn_new();
+ slapi_sdn_copy(ra->replarea, return_value);
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+int
+agmt_is_fractional(const Repl_Agmt *ra)
+{
+ int return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->frac_attrs != NULL;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+int
+agmt_is_fractional_attr(const Repl_Agmt *ra, const char *attrname)
+{
+ int return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = 1; /* XXXggood finish this */
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+int
+agmt_get_auto_initialize(const Repl_Agmt *ra)
+{
+ int return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->auto_initialize;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+long
+agmt_get_timeout(const Repl_Agmt *ra)
+{
+ long return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->timeout;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+long
+agmt_get_busywaittime(const Repl_Agmt *ra)
+{
+ long return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->busywaittime;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+long
+agmt_get_pausetime(const Repl_Agmt *ra)
+{
+ long return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ return_value = ra->pausetime;
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+/*
+ * Warning - reference to the long name of the agreement is returned.
+ * The long name of an agreement is the DN of the agreement entry,
+ * followed by the host/port for the replica.
+ */
+const char *
+agmt_get_long_name(const Repl_Agmt *ra)
+{
+ char *return_value = NULL;
+
+ return_value = ra ? ra->long_name : "";
+ return return_value;
+}
+
+/*
+ * Warning - reference to dn is returned. However, since the dn of
+ * the replication agreement is its name, it won't change during the
+ * lifetime of the replication agreement object.
+ */
+const Slapi_DN *
+agmt_get_dn_byref(const Repl_Agmt *ra)
+{
+ const Slapi_DN *return_value = NULL;
+
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ return_value = ra->dn;
+ }
+ return return_value;
+}
+
+/* Return 1 if name matches the replication Dn, 0 otherwise */
+int
+agmt_matches_name(const Repl_Agmt *ra, const Slapi_DN *name)
+{
+ int return_value = 0;
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ PR_Lock(ra->lock);
+ if (slapi_sdn_compare(name, ra->dn) == 0)
+ {
+ return_value = 1;
+ }
+ PR_Unlock(ra->lock);
+ }
+ return return_value;
+}
+
+/* Return 1 if name matches the replication area, 0 otherwise */
+int
+agmt_replarea_matches(const Repl_Agmt *ra, const Slapi_DN *name)
+{
+ int return_value = 0;
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ PR_Lock(ra->lock);
+ if (slapi_sdn_compare(name, ra->replarea) == 0)
+ {
+ return_value = 1;
+ }
+ PR_Unlock(ra->lock);
+ }
+ return return_value;
+}
+
+
+int
+agmt_schedule_in_window_now(const Repl_Agmt *ra)
+{
+ int return_value;
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (NULL != ra->schedule && schedule_in_window_now(ra->schedule))
+ {
+ return_value = 1;
+ }
+ else
+ {
+ return_value = 0;
+ }
+ PR_Unlock(ra->lock);
+ return return_value;
+}
+
+
+/*
+ * Set or reset the credentials used to bind to the remote replica.
+ *
+ * Returns 0 if credentials set, or -1 if an error occurred.
+ */
+int
+agmt_set_credentials_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = 0;
+
+ PR_ASSERT(NULL != ra);
+ slapi_entry_attr_find(e, type_nsds5ReplicaCredentials, &sattr);
+ PR_Lock(ra->lock);
+ slapi_ch_free((void **)&ra->creds->bv_val);
+ ra->creds->bv_len = 0;
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ const struct berval *bv = slapi_value_get_berval(sval);
+ ra->creds->bv_val = slapi_ch_calloc(1, bv->bv_len + 1);
+ memcpy(ra->creds->bv_val, bv->bv_val, bv->bv_len);
+ ra->creds->bv_len = bv->bv_len;
+ }
+ }
+ /* If no credentials set, set to zero-length string */
+ ra->creds->bv_val = NULL == ra->creds->bv_val ? slapi_ch_strdup("") : ra->creds->bv_val;
+ PR_Unlock(ra->lock);
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ return return_value;
+}
+
+/*
+ * Set or reset the DN used to bind to the remote replica.
+ *
+ * Returns 0 if DN set, or -1 if an error occurred.
+ */
+int
+agmt_set_binddn_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = 0;
+
+ PR_ASSERT(NULL != ra);
+ slapi_entry_attr_find(e, type_nsds5ReplicaBindDN, &sattr);
+ PR_Lock(ra->lock);
+ slapi_ch_free((void **)&ra->binddn);
+ ra->binddn = NULL;
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ const char *val = slapi_value_get_string(sval);
+ ra->binddn = strdup(val);
+ }
+ }
+ /* If no BindDN set, set to zero-length string */
+ if (ra->binddn == NULL) {
+ ra->binddn = strdup("");
+ }
+ PR_Unlock(ra->lock);
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ return return_value;
+}
+
+/*
+ * Set or reset the bind method used to bind to the remote replica.
+ *
+ * Returns 0 if bind method set, or -1 if an error occurred.
+ */
+static int
+agmt_set_bind_method_no_lock(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ char *tmpstr = NULL;
+ int return_value = 0;
+
+ PR_ASSERT(NULL != ra);
+ tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaBindMethod);
+
+ if (NULL == tmpstr || strcasecmp(tmpstr, "SIMPLE") == 0)
+ {
+ ra->bindmethod = BINDMETHOD_SIMPLE_AUTH;
+ }
+ else if (strcasecmp(tmpstr, "SSLCLIENTAUTH") == 0)
+ {
+ ra->bindmethod = BINDMETHOD_SSL_CLIENTAUTH;
+ }
+ else
+ {
+ ra->bindmethod = BINDMETHOD_SIMPLE_AUTH;
+ }
+ slapi_ch_free((void **)&tmpstr);
+ return return_value;
+}
+
+int
+agmt_set_bind_method_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ char *tmpstr = NULL;
+ int return_value = 0;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+ return_value = agmt_set_bind_method_no_lock(ra, e);
+ PR_Unlock(ra->lock);
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ return return_value;
+}
+
+/*
+ * Set or reset the transport used to bind to the remote replica.
+ *
+ * Returns 0 if transport set, or -1 if an error occurred.
+ */
+static int
+agmt_set_transportinfo_no_lock(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ char *tmpstr;
+ int rc = 0;
+
+ tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5TransportInfo);
+ if (NULL != tmpstr && strcasecmp(tmpstr, "SSL") == 0)
+ {
+ ra->transport_flags |= TRANSPORT_FLAG_SSL;
+ } else {
+ ra->transport_flags &= ~TRANSPORT_FLAG_SSL;
+ }
+
+ slapi_ch_free((void **)&tmpstr);
+ return (rc);
+}
+
+int
+agmt_set_transportinfo_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ int return_value = 0;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+ return_value = agmt_set_transportinfo_no_lock(ra, e);
+ PR_Unlock(ra->lock);
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+
+ return return_value;
+}
+
+
+/*
+ * Set or reset the replication schedule. Notify the protocol handler
+ * that a change has been made.
+ *
+ * Returns 0 if schedule was set or -1 if an error occurred.
+ */
+int
+agmt_set_schedule_from_entry( Repl_Agmt *ra, const Slapi_Entry *e )
+{
+ Slapi_Attr *sattr;
+ int return_value = 0;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+ PR_Unlock(ra->lock);
+
+ if (slapi_entry_attr_find(e, type_nsds5ReplicaUpdateSchedule, &sattr) != 0)
+ {
+ sattr = NULL; /* no schedule ==> delete any existing one */
+ }
+
+ /* make it so */
+ return_value = schedule_set(ra->schedule, sattr);
+
+ if ( 0 == return_value ) {
+ /* schedule set OK -- spread the news */
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ }
+
+ return return_value;
+}
+
+/*
+ * Set or reset the timeout used to bind to the remote replica.
+ *
+ * Returns 0 if timeout set, or -1 if an error occurred.
+ */
+int
+agmt_set_timeout_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = -1;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+
+ slapi_entry_attr_find(e, type_nsds5ReplicaTimeout, &sattr);
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ long tmpval = slapi_value_get_long(sval);
+ if (tmpval >= 0) {
+ ra->timeout = tmpval;
+ return_value = 0; /* success! */
+ }
+ }
+ }
+ PR_Unlock(ra->lock);
+ if (return_value == 0)
+ {
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ }
+ return return_value;
+}
+
+/*
+ * Set or reset the busywaittime
+ *
+ * Returns 0 if busywaittime set, or -1 if an error occurred.
+ */
+int
+agmt_set_busywaittime_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = -1;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+
+ slapi_entry_attr_find(e, type_nsds5ReplicaBusyWaitTime, &sattr);
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ long tmpval = slapi_value_get_long(sval);
+ if (tmpval >= 0) {
+ ra->busywaittime = tmpval;
+ return_value = 0; /* success! */
+ }
+ }
+ }
+ PR_Unlock(ra->lock);
+ if (return_value == 0)
+ {
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ }
+ return return_value;
+}
+
+/*
+ * Set or reset the pausetime
+ *
+ * Returns 0 if pausetime set, or -1 if an error occurred.
+ */
+int
+agmt_set_pausetime_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+ Slapi_Attr *sattr = NULL;
+ int return_value = -1;
+
+ PR_ASSERT(NULL != ra);
+ PR_Lock(ra->lock);
+ if (ra->stop_in_progress)
+ {
+ PR_Unlock(ra->lock);
+ return return_value;
+ }
+
+ slapi_entry_attr_find(e, type_nsds5ReplicaSessionPauseTime, &sattr);
+ if (NULL != sattr)
+ {
+ Slapi_Value *sval = NULL;
+ slapi_attr_first_value(sattr, &sval);
+ if (NULL != sval)
+ {
+ long tmpval = slapi_value_get_long(sval);
+ if (tmpval >= 0) {
+ ra->pausetime = tmpval;
+ return_value = 0; /* success! */
+ }
+ }
+ }
+ PR_Unlock(ra->lock);
+ if (return_value == 0)
+ {
+ prot_notify_agmt_changed(ra->protocol, ra->long_name);
+ }
+ return return_value;
+}
+
+/* XXXggood - also make this pass an arg that tells if there was
+ * an update to a priority attribute */
+void
+agmt_notify_change(Repl_Agmt *agmt, Slapi_PBlock *pb)
+{
+ if (NULL != pb)
+ {
+ /* Is the entry within our replicated area? */
+ char *target_dn;
+ Slapi_DN *target_sdn;
+ int change_is_relevant = 0;
+
+ PR_ASSERT(NULL != agmt);
+ PR_Lock(agmt->lock);
+ if (agmt->stop_in_progress)
+ {
+ PR_Unlock(agmt->lock);
+ return;
+ }
+
+ slapi_pblock_get(pb, SLAPI_TARGET_DN, &target_dn);
+ target_sdn = slapi_sdn_new_dn_byref(target_dn); /* XXX see if you can avoid allocating this */
+
+ if (slapi_sdn_issuffix(target_sdn, agmt->replarea))
+ {
+ /*
+ * Yep, it's in our replicated area. Is this a fractional
+ * replication agreement?
+ */
+ if (NULL != agmt->frac_attrs)
+ {
+ /*
+ * Yep, it's fractional. See if the change should be
+ * tossed because it doesn't affect any of the replicated
+ * attributes.
+ */
+ int optype;
+ int affects_fractional_attribute = 0;
+
+ slapi_pblock_get(pb, SLAPI_OPERATION_TYPE, &optype);
+ if (SLAPI_OPERATION_MODIFY == optype)
+ {
+ LDAPMod **mods;
+ int i, j;
+
+ slapi_pblock_get(pb, SLAPI_MODIFY_MODS, &mods);
+ for (i = 0; !affects_fractional_attribute && NULL != agmt->frac_attrs[i]; i++)
+ {
+ for (j = 0; !affects_fractional_attribute && NULL != mods[j]; j++)
+ {
+ if (slapi_attr_types_equivalent(agmt->frac_attrs[i],
+ mods[i]->mod_type))
+ {
+ affects_fractional_attribute = 1;
+ }
+ }
+ }
+ }
+ else
+ {
+ /*
+ * Add, delete, and modrdn always cause some sort of
+ * operation replay, even if agreement is fractional.
+ */
+ affects_fractional_attribute = 1;
+ }
+ if (affects_fractional_attribute)
+ {
+ change_is_relevant = 1;
+ }
+ }
+ else
+ {
+ /* Not a fractional agreement */
+ change_is_relevant = 1;
+ }
+ }
+ PR_Unlock(agmt->lock);
+ slapi_sdn_free(&target_sdn);
+ if (change_is_relevant)
+ {
+ /* Notify the protocol that a change has occurred */
+ prot_notify_update(agmt->protocol);
+ }
+ }
+}
+
+
+
+int
+agmt_is_50_mm_protocol(const Repl_Agmt *agmt)
+{
+ return 1; /* XXXggood could support > 1 protocol */
+}
+
+
+
+int
+agmt_initialize_replica(const Repl_Agmt *agmt)
+{
+ PR_ASSERT(NULL != agmt);
+ PR_Lock(agmt->lock);
+ if (agmt->stop_in_progress)
+ {
+ PR_Unlock(agmt->lock);
+ return 0;
+ }
+ PR_Unlock(agmt->lock);
+ /* Call prot_initialize_replica only if the suffix is enabled (agmt->protocol != NULL) */
+ if (NULL != agmt->protocol) {
+ prot_initialize_replica(agmt->protocol);
+ }
+ else {
+ /* agmt->protocol == NULL --> Suffix is disabled */
+ return -1;
+ }
+ return 0;
+}
+
+/* delete nsds5BeginReplicaRefresh attribute to indicate to the clients
+ that replica initialization have completed */
+void
+agmt_replica_init_done (const Repl_Agmt *agmt)
+{
+ int rc;
+ Slapi_PBlock *pb = slapi_pblock_new ();
+ LDAPMod *mods [2];
+ LDAPMod mod;
+
+ mods[0] = &mod;
+ mods[1] = NULL;
+ mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES;
+ mod.mod_type = (char*)type_nsds5ReplicaInitialize;
+ mod.mod_bvalues = NULL;
+
+ slapi_modify_internal_set_pb(pb, slapi_sdn_get_dn (agmt->dn), mods, NULL/* controls */,
+ NULL/* uniqueid */, repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0/* flags */);
+ slapi_modify_internal_pb (pb);
+
+ slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc);
+ if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_ATTRIBUTE)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "agmt_replica_init_done: "
+ "failed to remove (%s) attribute from (%s) entry; LDAP error - %d\n",
+ type_nsds5ReplicaInitialize, slapi_sdn_get_ndn (agmt->dn), rc);
+ }
+
+ slapi_pblock_destroy (pb);
+}
+
+/* Agreement object is acquired on behalf of the caller.
+ The caller is responsible for releasing the object
+ when it is no longer used */
+
+Object*
+agmt_get_consumer_ruv (Repl_Agmt *ra)
+{
+ Object *rt = NULL;
+
+ PR_ASSERT(NULL != ra);
+
+ PR_Lock(ra->lock);
+ if (ra->consumerRUV)
+ {
+ object_acquire (ra->consumerRUV);
+ rt = ra->consumerRUV;
+ }
+
+ PR_Unlock(ra->lock);
+
+ return rt;
+}
+
+int
+agmt_set_consumer_ruv (Repl_Agmt *ra, RUV *ruv)
+{
+ if (ra == NULL || ruv == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmt_set_consumer_ruv: invalid argument"
+ " agmt - %p, ruv - %p\n", ra, ruv);
+ return -1;
+ }
+
+ PR_Lock(ra->lock);
+
+ if (ra->consumerRUV)
+ {
+ object_release (ra->consumerRUV);
+ }
+
+ ra->consumerRUV = object_new (ruv_dup (ruv), (FNFree)ruv_destroy);
+
+ PR_Unlock(ra->lock);
+
+ return 0;
+}
+
+void
+agmt_update_consumer_ruv (Repl_Agmt *ra)
+{
+ int rc;
+ RUV *ruv;
+ Slapi_Mod smod;
+ Slapi_Mod smod_last_modified;
+ Slapi_PBlock *pb;
+ LDAPMod *mods[3];
+
+ PR_ASSERT (ra);
+ PR_Lock(ra->lock);
+
+ if (ra->consumerRUV)
+ {
+ ruv = (RUV*) object_get_data (ra->consumerRUV);
+ PR_ASSERT (ruv);
+
+ ruv_to_smod(ruv, &smod);
+ ruv_last_modified_to_smod(ruv, &smod_last_modified);
+
+ /* it is ok to release the lock here because we are done with the agreement data.
+ we have to do it before issuing the modify operation because it causes
+ agmtlist_notify_all to be called which uses the same lock - hence the deadlock */
+ PR_Unlock(ra->lock);
+
+ pb = slapi_pblock_new ();
+ mods[0] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod);
+ mods[1] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod_last_modified);
+ mods[2] = NULL;
+
+ slapi_modify_internal_set_pb (pb, (char*)slapi_sdn_get_dn(ra->dn), mods, NULL, NULL,
+ repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
+ slapi_modify_internal_pb (pb);
+
+ slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc);
+ if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_ATTRIBUTE)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "%s: agmt_update_consumer_ruv: "
+ "failed to update consumer's RUV; LDAP error - %d\n",
+ ra->long_name, rc);
+ }
+
+ slapi_mod_done (&smod);
+ slapi_mod_done (&smod_last_modified);
+ slapi_pblock_destroy (pb);
+ }
+ else
+ PR_Unlock(ra->lock);
+}
+
+CSN*
+agmt_get_consumer_schema_csn (Repl_Agmt *ra)
+{
+ CSN *rt;
+
+ PR_ASSERT(NULL != ra);
+
+ PR_Lock(ra->lock);
+ rt = ra->consumerSchemaCSN;
+ PR_Unlock(ra->lock);
+
+ return rt;
+}
+
+void
+agmt_set_consumer_schema_csn (Repl_Agmt *ra, CSN *csn)
+{
+ PR_ASSERT(NULL != ra);
+
+ PR_Lock(ra->lock);
+ csn_free(&ra->consumerSchemaCSN);
+ ra->consumerSchemaCSN = csn;
+ PR_Unlock(ra->lock);
+}
+
+void
+agmt_set_last_update_start (Repl_Agmt *ra, time_t start_time)
+{
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ ra->last_update_start_time = start_time;
+ ra->last_update_end_time = 0UL;
+ }
+}
+
+
+void
+agmt_set_last_update_end (Repl_Agmt *ra, time_t end_time)
+{
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ ra->last_update_end_time = end_time;
+ }
+}
+
+void
+agmt_set_last_init_start (Repl_Agmt *ra, time_t start_time)
+{
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ ra->last_init_start_time = start_time;
+ ra->last_init_end_time = 0UL;
+ }
+}
+
+
+void
+agmt_set_last_init_end (Repl_Agmt *ra, time_t end_time)
+{
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ ra->last_init_end_time = end_time;
+ }
+}
+
+void
+agmt_set_last_update_status (Repl_Agmt *ra, int ldaprc, int replrc, const char *message)
+{
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ if (replrc == NSDS50_REPL_UPTODATE)
+ {
+ /* no session started, no status update */
+ }
+ else if (ldaprc != LDAP_SUCCESS)
+ {
+ char *replmsg = NULL;
+ if ( replrc ) {
+ replmsg = protocol_response2string(replrc);
+ /* Do not mix the unknown replication error with the known ldap one */
+ if ( strcasecmp(replmsg, "unknown error") == 0 ) {
+ replmsg = NULL;
+ }
+ }
+ if (ldaprc > 0) {
+ PR_snprintf(ra->last_update_status, STATUS_LEN,
+ "%d %s%sLDAP error: %s%s%s",
+ ldaprc,
+ message?message:"",message?"":" - ",
+ ldap_err2string(ldaprc),
+ replmsg ? " - " : "", replmsg ? replmsg : "");
+ } else { /* ldaprc is < 0 */
+ PR_snprintf(ra->last_update_status, STATUS_LEN,
+ "%d %s%sSystem error%s%s",
+ ldaprc,message?message:"",message?"":" - ",
+ replmsg ? " - " : "", replmsg ? replmsg : "");
+ }
+ }
+ else if (replrc != 0)
+ {
+ if (replrc == NSDS50_REPL_REPLICA_READY)
+ {
+ PR_snprintf(ra->last_update_status, STATUS_LEN, "%d %s",
+ ldaprc, "Replica acquired successfully");
+ }
+ else if (replrc == NSDS50_REPL_REPLICA_BUSY)
+ {
+ PR_snprintf(ra->last_update_status, STATUS_LEN,
+ "%d Can't acquire busy replica", replrc );
+ }
+ else if (replrc == NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED)
+ {
+ PR_snprintf(ra->last_update_status, STATUS_LEN, "%d %s",
+ ldaprc, "Replication session successful");
+ }
+ else if (replrc == NSDS50_REPL_DISABLED)
+ {
+ PR_snprintf(ra->last_update_status, STATUS_LEN, "%d Total update aborted: "
+ "Replication agreement for %s\n can not be updated while the replica is disabled.\n"
+ "(If the suffix is disabled you must enable it then restart the server for replication to take place).",
+ replrc, ra->long_name ? ra->long_name : "a replica");
+ /* Log into the errors log, as "ra->long_name" is not accessible from the caller */
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "Total update aborted: Replication agreement for \"%s\" "
+ "can not be updated while the replica is disabled\n", ra->long_name ? ra->long_name : "a replica");
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "(If the suffix is disabled you must enable it then restart the server for replication to take place).\n");
+ }
+ else
+ {
+ PR_snprintf(ra->last_update_status, STATUS_LEN,
+ "%d Replication error acquiring replica: %s%s%s",
+ replrc, protocol_response2string(replrc),
+ message?" - ":"",message?message:"");
+ }
+ }
+ else if (message != NULL)
+ {
+ PR_snprintf(ra->last_update_status, STATUS_LEN, "%d %s", ldaprc, message);
+ }
+ else { /* agmt_set_last_update_status(0,0,NULL) to reset agmt */
+ PR_snprintf(ra->last_update_status, STATUS_LEN, "%d", ldaprc);
+ }
+ }
+}
+
+void
+agmt_set_last_init_status (Repl_Agmt *ra, int ldaprc, int replrc, const char *message)
+{
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ if (ldaprc != LDAP_SUCCESS)
+ {
+ char *replmsg = NULL;
+ if ( replrc ) {
+ replmsg = protocol_response2string(replrc);
+ /* Do not mix the unknown replication error with the known ldap one */
+ if ( strcasecmp(replmsg, "unknown error") == 0 ) {
+ replmsg = NULL;
+ }
+ }
+ if (ldaprc > 0) {
+ PR_snprintf(ra->last_init_status, STATUS_LEN,
+ "%d %s%sLDAP error: %s%s%s",
+ ldaprc,
+ message?message:"",message?"":" - ",
+ ldap_err2string(ldaprc),
+ replmsg ? " - " : "", replmsg ? replmsg : "");
+ } else { /* ldaprc is < 0 */
+ PR_snprintf(ra->last_init_status, STATUS_LEN,
+ "%d %s%sSystem error%s%s",
+ ldaprc,message?message:"",message?"":" - ",
+ replmsg ? " - " : "", replmsg ? replmsg : "");
+ }
+ }
+ else if (replrc != 0)
+ {
+ if (replrc == NSDS50_REPL_REPLICA_READY)
+ {
+ PR_snprintf(ra->last_init_status, STATUS_LEN, "%d %s",
+ ldaprc, "Replica acquired successfully");
+ }
+ else if (replrc == NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED)
+ {
+ PR_snprintf(ra->last_init_status, STATUS_LEN, "%d %s",
+ ldaprc, "Replication session successful");
+ }
+ else if (replrc == NSDS50_REPL_DISABLED)
+ {
+ PR_snprintf(ra->last_init_status, STATUS_LEN, "%d Total update aborted: "
+ "Replication agreement for %s\n can not be updated while the replica is disabled.\n"
+ "(If the suffix is disabled you must enable it then restart the server for replication to take place).",
+ replrc, ra->long_name ? ra->long_name : "a replica");
+ /* Log into the errors log, as "ra->long_name" is not accessible from the caller */
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "Total update aborted: Replication agreement for \"%s\" "
+ "can not be updated while the replica is disabled\n", ra->long_name ? ra->long_name : "a replica");
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "(If the suffix is disabled you must enable it then restart the server for replication to take place).\n");
+ }
+ else
+ {
+ PR_snprintf(ra->last_init_status, STATUS_LEN,
+ "%d Replication error acquiring replica: %s%s%s",
+ replrc, protocol_response2string(replrc),
+ message?" - ":"",message?message:"");
+ }
+ }
+ else if (message != NULL)
+ {
+ PR_snprintf(ra->last_init_status, STATUS_LEN, "%d %s", ldaprc, message);
+ }
+ else { /* agmt_set_last_init_status(0,0,NULL) to reset agmt */
+ PR_snprintf(ra->last_init_status, STATUS_LEN, "%d", ldaprc);
+ }
+ }
+}
+
+
+void
+agmt_set_update_in_progress (Repl_Agmt *ra, PRBool in_progress)
+{
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ ra->update_in_progress = in_progress;
+ }
+}
+
+void
+agmt_inc_last_update_changecount (Repl_Agmt *ra, ReplicaId rid, int skipped)
+{
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ int i;
+
+ for ( i = 0; i < ra->num_changecounters; i++ )
+ {
+ if ( ra->changecounters[i]->rid == rid )
+ break;
+ }
+
+ if ( i < ra->num_changecounters )
+ {
+ if ( skipped )
+ ra->changecounters[i]->num_skipped ++;
+ else
+ ra->changecounters[i]->num_replayed ++;
+ }
+ else
+ {
+ ra->num_changecounters ++;
+ ra->changecounters[i] = (struct changecounter*) slapi_ch_calloc(1, sizeof(struct changecounter));
+ ra->changecounters[i]->rid = rid;
+ if ( skipped )
+ ra->changecounters[i]->num_skipped = 1;
+ else
+ ra->changecounters[i]->num_replayed = 1;
+ }
+ }
+}
+
+void
+agmt_get_changecount_string (Repl_Agmt *ra, char *buf, int bufsize)
+{
+ char tmp_buf[32]; /* 5 digit RID, 10 digit each replayed and skipped */
+ int i;
+ int buflen = 0;
+
+ *buf = '\0';
+ if (NULL != ra)
+ {
+ for ( i = 0; i < ra->num_changecounters; i++ )
+ {
+ PR_snprintf (tmp_buf, sizeof(tmp_buf), "%u:%u/%u ",
+ ra->changecounters[i]->rid,
+ ra->changecounters[i]->num_replayed,
+ ra->changecounters[i]->num_skipped);
+ PR_snprintf (buf+buflen, bufsize-buflen, "%s", tmp_buf);
+ buflen += strlen (tmp_buf);
+ }
+ }
+}
+
+static int
+get_agmt_status(Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter,
+ int *returncode, char *returntext, void *arg)
+{
+ char *time_tmp = NULL;
+ char changecount_string[BUFSIZ];
+ Repl_Agmt *ra = (Repl_Agmt *)arg;
+
+ PR_ASSERT(NULL != ra);
+ if (NULL != ra)
+ {
+ PRBool reapActive = PR_FALSE;
+ Slapi_DN *replarea_sdn = NULL;
+ Object *repl_obj = NULL;
+
+ replarea_sdn = agmt_get_replarea(ra);
+ repl_obj = replica_get_replica_from_dn(replarea_sdn);
+ slapi_sdn_free(&replarea_sdn);
+ if (repl_obj) {
+ Replica *replica = (Replica*)object_get_data (repl_obj);
+ reapActive = replica_get_tombstone_reap_active(replica);
+ object_release(repl_obj);
+ }
+ slapi_entry_attr_set_int(e, "nsds5replicaReapActive", (int)reapActive);
+
+ /* these values persist in the dse.ldif file, so we delete them
+ here to avoid multi valued attributes */
+ slapi_entry_attr_delete(e, "nsds5replicaLastUpdateStart");
+ slapi_entry_attr_delete(e, "nsds5replicaLastUpdateEnd");
+ slapi_entry_attr_delete(e, "nsds5replicaChangesSentSinceStartup");
+ slapi_entry_attr_delete(e, "nsds5replicaLastUpdateStatus");
+ slapi_entry_attr_delete(e, "nsds5replicaUpdateInProgress");
+ slapi_entry_attr_delete(e, "nsds5replicaLastInitStart");
+ slapi_entry_attr_delete(e, "nsds5replicaLastInitStatus");
+ slapi_entry_attr_delete(e, "nsds5replicaLastInitEnd");
+
+ /* now, add the real values (singly) */
+ if (ra->last_update_start_time == 0)
+ {
+ slapi_entry_add_string(e, "nsds5replicaLastUpdateStart", "0");
+ }
+ else
+ {
+ time_tmp = format_genTime(ra->last_update_start_time);
+ slapi_entry_add_string(e, "nsds5replicaLastUpdateStart", time_tmp);
+ slapi_ch_free((void **)&time_tmp);
+ }
+ if (ra->last_update_end_time == 0)
+ {
+ slapi_entry_add_string(e, "nsds5replicaLastUpdateEnd", "0");
+ }
+ else
+ {
+ time_tmp = format_genTime(ra->last_update_end_time);
+ slapi_entry_add_string(e, "nsds5replicaLastUpdateEnd", time_tmp);
+ slapi_ch_free((void **)&time_tmp);
+ }
+ agmt_get_changecount_string (ra, changecount_string, sizeof (changecount_string) );
+ slapi_entry_add_string(e, "nsds5replicaChangesSentSinceStartup", changecount_string);
+ if (ra->last_update_status[0] == '\0')
+ {
+ slapi_entry_add_string(e, "nsds5replicaLastUpdateStatus", "0 No replication sessions started since server startup");
+ }
+ else
+ {
+ slapi_entry_add_string(e, "nsds5replicaLastUpdateStatus", ra->last_update_status);
+ }
+ slapi_entry_add_string(e, "nsds5replicaUpdateInProgress", ra->update_in_progress ? "TRUE" : "FALSE");
+ if (ra->last_init_start_time == 0)
+ {
+ slapi_entry_add_string(e, "nsds5replicaLastInitStart", "0");
+ }
+ else
+ {
+ time_tmp = format_genTime(ra->last_init_start_time);
+ slapi_entry_add_string(e, "nsds5replicaLastInitStart", time_tmp);
+ slapi_ch_free((void **)&time_tmp);
+ }
+ if (ra->last_init_end_time == 0)
+ {
+ slapi_entry_add_string(e, "nsds5replicaLastInitEnd", "0");
+ }
+ else
+ {
+ time_tmp = format_genTime(ra->last_init_end_time);
+ slapi_entry_add_string(e, "nsds5replicaLastInitEnd", time_tmp);
+ slapi_ch_free((void **)&time_tmp);
+ }
+ if (ra->last_init_status[0] != '\0')
+ {
+ slapi_entry_add_string(e, "nsds5replicaLastInitStatus", ra->last_init_status);
+ }
+ }
+ return SLAPI_DSE_CALLBACK_OK;
+}
+
+static void
+update_window_state_change_callback (void *arg, PRBool opened)
+{
+ Repl_Agmt *agmt = (Repl_Agmt*)arg;
+
+ PR_ASSERT (agmt);
+
+ if (opened)
+ {
+ prot_notify_window_opened (agmt->protocol);
+ }
+ else
+ {
+ prot_notify_window_closed (agmt->protocol);
+ }
+}
+
+ReplicaId
+agmt_get_consumer_rid ( Repl_Agmt *agmt, void *conn )
+{
+ if ( agmt->consumerRID <= 0 ) {
+
+ char mapping_tree_node[512];
+ struct berval **bvals = NULL;
+
+ PR_snprintf ( mapping_tree_node,
+ sizeof (mapping_tree_node),
+ "cn=replica,cn=\"%s\",cn=mapping tree,cn=config",
+ slapi_sdn_get_dn (agmt->replarea) );
+ conn_read_entry_attribute ( conn, mapping_tree_node, "nsDS5ReplicaID", &bvals );
+ if ( NULL != bvals && NULL != bvals[0] ) {
+ char *ridstr = slapi_ch_malloc( bvals[0]->bv_len + 1 );
+ memcpy ( ridstr, bvals[0]->bv_val, bvals[0]->bv_len );
+ ridstr[bvals[0]->bv_len] = '\0';
+ agmt->consumerRID = atoi (ridstr);
+ slapi_ch_free ( (void**) &ridstr );
+ ber_bvecfree ( bvals );
+ }
+ }
+
+ return agmt->consumerRID;
+}