diff options
Diffstat (limited to 'ldap/servers/plugins/replication/repl5_agmt.c')
-rw-r--r-- | ldap/servers/plugins/replication/repl5_agmt.c | 1766 |
1 files changed, 1766 insertions, 0 deletions
diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c new file mode 100644 index 00000000..2992fc11 --- /dev/null +++ b/ldap/servers/plugins/replication/repl5_agmt.c @@ -0,0 +1,1766 @@ +/** BEGIN COPYRIGHT BLOCK + * Copyright 2001 Sun Microsystems, Inc. + * Portions copyright 1999, 2001-2003 Netscape Communications Corporation. + * All rights reserved. + * END COPYRIGHT BLOCK **/ +/* repl5_agmt.c */ +/* + + Support for 5.0-style replication agreements. + + Directory Server 5.0 replication agreements contain information about + replication consumers that we are supplying. + + This module encapsulates the methods available for adding, deleting, + modifying, and firing replication agreements. + + Methods: + + agmt_new - Create a new replication agreement, in response to a new + replication agreement being added over LDAP. + agmt_delete - Destroy an agreement. It is an error to destroy an + agreement that has not been stopped. + agmt_getstatus - get the status of this replication agreement. + agmt_replicate_now - initiate a replication session asap, even if the + schedule says we shouldn't. + agmt_start - start replicating, according to schedule. Starts a new + thread to handle replication. + agmt_stop - stop replicating asap and end replication thread. + agmt_notify_change - notify the replication agreement about a change that + has been logged. The replication agreement will + decide if it needs to take some action, e.g. start a + replication session. + agmt_initialize_replica - start a complete replica refresh. + agmt_set_schedule_from_entry - (re)set the schedule associated with this + replication agreement based on a RA entry's contents. + agmt_set_credentials_from_entry - (re)set the credentials used to bind + to the remote replica. + agmt_set_binddn_from_entry - (re)set the DN used to bind + to the remote replica. + agmt_set_bind_method_from_entry - (re)set the bind method used to bind + to the remote replica (SIMPLE or SSLCLIENTAUTH). + agmt_set_transportinfo_from_entry - (re)set the transport used to bind + to the remote replica (SSL or not) + +*/ + +#include "repl5.h" +#include "repl5_prot_private.h" +#include "cl5_api.h" + +#define DEFAULT_TIMEOUT 600 /* (seconds) default outbound LDAP connection */ +#define TRANSPORT_FLAG_SSL 1 +#define STATUS_LEN 1024 + +struct changecounter { + ReplicaId rid; + PRUint32 num_replayed; + PRUint32 num_skipped; +}; + +typedef struct repl5agmt { + char *hostname; /* remote hostname */ + int port; /* port of remote server */ + PRUint32 transport_flags; /* SSL, TLS, etc. */ + char *binddn; /* DN to bind as */ + struct berval *creds; /* Password, or certificate */ + int bindmethod; /* Bind method - simple, SSL */ + Slapi_DN *replarea; /* DN of replicated area */ + char **frac_attrs; /* list of fractional attributes to be replicated */ + Schedule *schedule; /* Scheduling information */ + int auto_initialize; /* 1 = automatically re-initialize replica */ + const Slapi_DN *dn; /* DN of replication agreement entry */ + const Slapi_RDN *rdn; /* RDN of replication agreement entry */ + char *long_name; /* Long name (rdn + host, port) of entry, for logging */ + Repl_Protocol *protocol; /* Protocol object - manages protocol */ + struct changecounter *changecounters[MAX_NUM_OF_MASTERS]; /* changes sent/skipped since server start up */ + int num_changecounters; + time_t last_update_start_time; /* Local start time of last update session */ + time_t last_update_end_time; /* Local end time of last update session */ + char last_update_status[STATUS_LEN]; /* Status of last update. Format = numeric code <space> textual description */ + PRBool update_in_progress; + time_t last_init_start_time; /* Local start time of last total init */ + time_t last_init_end_time; /* Local end time of last total init */ + char last_init_status[STATUS_LEN]; /* Status of last total init. Format = numeric code <space> textual description */ + PRLock *lock; + Object *consumerRUV; /* last RUV received from the consumer - used for changelog purging */ + CSN *consumerSchemaCSN; /* last schema CSN received from the consumer */ + ReplicaId consumerRID; /* indicates if the consumer is the originator of a CSN */ + long timeout; /* timeout (in seconds) for outbound LDAP connections to remote server */ + PRBool stop_in_progress; /* set by agmt_stop when shutting down */ + long busywaittime; /* time in seconds to wait after getting a REPLICA BUSY from the consumer - + to allow another supplier to finish sending its updates - + if set to 0, this means to use the default value if we get a busy + signal from the consumer */ + long pausetime; /* time in seconds to pause after sending updates - + to allow another supplier to send its updates - + should be greater than busywaittime - + if set to 0, this means do not pause */ +} repl5agmt; + +/* Forward declarations */ +void agmt_delete(void **rap); +static void update_window_state_change_callback (void *arg, PRBool opened); +static int get_agmt_status(Slapi_PBlock *pb, Slapi_Entry* e, + Slapi_Entry* entryAfter, int *returncode, char *returntext, void *arg); +static int agmt_set_bind_method_no_lock(Repl_Agmt *ra, const Slapi_Entry *e); +static int agmt_set_transportinfo_no_lock(Repl_Agmt *ra, const Slapi_Entry *e); + +/* +Schema for replication agreement: + +cn +nsds5ReplicaHost - hostname +nsds5ReplicaPort - port number +nsds5ReplicaTransportInfo - "SSL", "startTLS", or may be absent; +nsds5ReplicaBindDN +nsds5ReplicaCredentials +nsds5ReplicaBindMethod - "SIMPLE" or "SSLCLIENTAUTH". +nsds5ReplicaRoot - Replicated suffix +nsds5ReplicatedAttributeList - Unused so far (meant for fractional repl). +nsds5ReplicaUpdateSchedule +nsds5ReplicaTimeout - Outbound repl operations timeout +nsds50ruv - consumer's RUV +nsds5ReplicaBusyWaitTime - time to wait after getting a REPLICA BUSY from the consumer +nsds5ReplicaSessionPauseTime - time to pause after sending updates to allow another supplier to send +*/ + + +/* + * Validate an agreement, making sure that it's valid. + * Return 1 if the agreement is valid, 0 otherwise. + */ +static int +agmt_is_valid(Repl_Agmt *ra) +{ + int return_value = 1; /* assume valid, initially */ + PR_ASSERT(NULL != ra); + PR_ASSERT(NULL != ra->dn); + + if (NULL == ra->hostname) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Replication agreement \"%s\" " + "is malformed: missing host name.\n", slapi_sdn_get_dn(ra->dn)); + return_value = 0; + } + if (ra->port <= 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Replication agreement \"%s\" " + "is malformed: invalid port number %d.\n", slapi_sdn_get_dn(ra->dn), ra->port); + return_value = 0; + } + if (ra->timeout < 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Replication agreement \"%s\" " + "is malformed: invalid timeout %d.\n", slapi_sdn_get_dn(ra->dn), ra->timeout); + return_value = 0; + } + if (ra->busywaittime < 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Replication agreement \"%s\" " + "is malformed: invalid busy wait time %d.\n", slapi_sdn_get_dn(ra->dn), ra->busywaittime); + return_value = 0; + } + if (ra->pausetime < 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Replication agreement \"%s\" " + "is malformed: invalid pausetime %d.\n", slapi_sdn_get_dn(ra->dn), ra->pausetime); + return_value = 0; + } + return return_value; +} + + +Repl_Agmt * +agmt_new_from_entry(Slapi_Entry *e) +{ + Repl_Agmt *ra; + char *tmpstr; + Slapi_Attr *sattr; + + char *auto_initialize = NULL; + char *val_nsds5BeginReplicaRefresh = "start"; + + ra = (Repl_Agmt *)slapi_ch_calloc(1, sizeof(repl5agmt)); + if ((ra->lock = PR_NewLock()) == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Unable to create new lock " + "for replication agreement \"%s\" - agreement ignored.\n", + slapi_entry_get_dn_const(e)); + goto loser; + } + + /* Find all the stuff we need for the agreement */ + + /* To Allow Consumer Initialisation when adding an agreement: */ + + /* + Using 'auto_initialize' member of 'repl5agmt' structure to + store the effect of 'nsds5BeginReplicaRefresh' attribute's value + in it. + */ + auto_initialize = slapi_entry_attr_get_charptr(e, type_nsds5BeginReplicaRefresh); + if ((auto_initialize != NULL) && (strcasecmp(auto_initialize, val_nsds5BeginReplicaRefresh) == 0)) + { + ra->auto_initialize = STATE_PERFORMING_TOTAL_UPDATE; + } + else + { + ra->auto_initialize = STATE_PERFORMING_INCREMENTAL_UPDATE; + } + + if (auto_initialize) + { + slapi_ch_free_string (&auto_initialize); + } + + /* Host name of remote replica */ + ra->hostname = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaHost); + /* Port number for remote replica instance */ + ra->port = slapi_entry_attr_get_int(e, type_nsds5ReplicaPort); + /* SSL, TLS, or other transport stuff */ + ra->transport_flags = 0; + agmt_set_transportinfo_no_lock(ra, e); + + /* DN to use when binding. May be empty if cert-based auth is to be used. */ + ra->binddn = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaBindDN); + if (NULL == ra->binddn) + { + ra->binddn = slapi_ch_strdup(""); + } + /* Credentials to use when binding. */ + ra->creds = (struct berval *)slapi_ch_malloc(sizeof(struct berval)); + ra->creds->bv_val = NULL; + ra->creds->bv_len = 0; + if (slapi_entry_attr_find(e, type_nsds5ReplicaCredentials, &sattr) == 0) + { + Slapi_Value *sval; + if (slapi_attr_first_value(sattr, &sval) == 0) + { + const struct berval *bv = slapi_value_get_berval(sval); + if (NULL != bv) + { + ra->creds->bv_val = slapi_ch_malloc(bv->bv_len + 1); + memcpy(ra->creds->bv_val, bv->bv_val, bv->bv_len); + ra->creds->bv_len = bv->bv_len; + ra->creds->bv_val[bv->bv_len] = '\0'; /* be safe */ + } + } + } + /* How to bind */ + (void)agmt_set_bind_method_no_lock(ra, e); + + /* timeout. */ + ra->timeout = DEFAULT_TIMEOUT; + if (slapi_entry_attr_find(e, type_nsds5ReplicaTimeout, &sattr) == 0) + { + Slapi_Value *sval; + if (slapi_attr_first_value(sattr, &sval) == 0) + { + ra->timeout = slapi_value_get_long(sval); + } + } + + /* DN of entry at root of replicated area */ + tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaRoot); + if (NULL != tmpstr) + { + ra->replarea = slapi_sdn_new_dn_passin(tmpstr); + } + /* XXXggood get fractional attribute include/exclude lists here */ + /* Replication schedule */ + ra->schedule = schedule_new(update_window_state_change_callback, ra, agmt_get_long_name(ra)); + if (slapi_entry_attr_find(e, type_nsds5ReplicaUpdateSchedule, &sattr) == 0) + { + schedule_set(ra->schedule, sattr); + } + + /* busy wait time - time to wait after getting REPLICA BUSY from consumer */ + ra->busywaittime = slapi_entry_attr_get_long(e, type_nsds5ReplicaBusyWaitTime); + + /* pause time - time to pause after a session has ended */ + ra->pausetime = slapi_entry_attr_get_long(e, type_nsds5ReplicaSessionPauseTime); + + /* consumer's RUV */ + if (slapi_entry_attr_find(e, type_ruvElement, &sattr) == 0) + { + RUV *ruv; + + if (ruv_init_from_slapi_attr(sattr, &ruv) == 0) + { + ra->consumerRUV = object_new (ruv, (FNFree)ruv_destroy); + } + } + + ra->consumerRID = 0; + + /* DN and RDN of the replication agreement entry itself */ + ra->dn = slapi_sdn_dup(slapi_entry_get_sdn((Slapi_Entry *)e)); + ra->rdn = slapi_rdn_new_sdn(ra->dn); + + /* Compute long name */ + { + const char *agmtname = slapi_rdn_get_rdn(ra->rdn); + char hostname[128]; + char *dot; + + strncpy(hostname, ra->hostname ? ra->hostname : "(unknown)", sizeof(hostname)); + hostname[sizeof(hostname)-1] = '\0'; + dot = strchr(hostname, '.'); + if (dot) { + *dot = '\0'; + } + ra->long_name = slapi_ch_malloc(strlen(agmtname) + + strlen(hostname) + 25); + sprintf(ra->long_name, "agmt=\"%s\" (%s:%d)", agmtname, hostname, ra->port); + } + + /* Initialize status information */ + ra->last_update_start_time = 0UL; + ra->last_update_end_time = 0UL; + ra->num_changecounters = 0; + ra->last_update_status[0] = '\0'; + ra->update_in_progress = PR_FALSE; + ra->stop_in_progress = PR_FALSE; + ra->last_init_end_time = 0UL; + ra->last_init_start_time = 0UL; + ra->last_init_status[0] = '\0'; + + if (!agmt_is_valid(ra)) + { + goto loser; + } + + /* Now that the agreement is done, just check if changelog is configured */ + if (cl5GetState() != CL5_STATE_OPEN) { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "WARNING: " + "Replication agreement added but there is no changelog configured. " + "No change will be replicated until a changelog is configured.\n"); + } + + /* + * Establish a callback for this agreement's entry, so we can + * adorn it with status information when read. + */ + slapi_config_register_callback(SLAPI_OPERATION_SEARCH, DSE_FLAG_PREOP, slapi_sdn_get_ndn(ra->dn), + LDAP_SCOPE_BASE, "(objectclass=*)", get_agmt_status, ra); + + return ra; +loser: + agmt_delete((void **)&ra); + return NULL; +} + + + +Repl_Agmt * +agmt_new_from_pblock(Slapi_PBlock *pb) +{ + Slapi_Entry *e; + + slapi_pblock_get(pb, SLAPI_ADD_ENTRY, &e); + return agmt_new_from_entry(e); +} + + +/* + This should never be called directly - only should be called + as a destructor. XXXggood this is not finished + */ +void +agmt_delete(void **rap) +{ + Repl_Agmt *ra; + PR_ASSERT(NULL != rap); + PR_ASSERT(NULL != *rap); + + ra = (Repl_Agmt *)*rap; + + /* do prot_delete first - we may be doing some processing using this + replication agreement, and prot_delete will make sure the + processing is complete - then it should be safe to clean up the + other fields below + */ + prot_delete(&ra->protocol); + + /* + * Remove the callback for this agreement's entry + */ + slapi_config_remove_callback(SLAPI_OPERATION_SEARCH, DSE_FLAG_PREOP, + slapi_sdn_get_ndn(ra->dn), + LDAP_SCOPE_BASE, "(objectclass=*)", + get_agmt_status); + + /* slapi_ch_free accepts NULL pointer */ + slapi_ch_free((void **)&(ra->hostname)); + slapi_ch_free((void **)&(ra->binddn)); + + if (NULL != ra->creds) + { + /* XXX free berval */ + } + if (NULL != ra->replarea) + { + slapi_sdn_free(&ra->replarea); + } + + if (NULL != ra->consumerRUV) + { + object_release (ra->consumerRUV); + } + + csn_free (&ra->consumerSchemaCSN); + while ( --(ra->num_changecounters) >= 0 ) + { + slapi_ch_free((void **)&ra->changecounters[ra->num_changecounters]); + } + + schedule_destroy(ra->schedule); + slapi_ch_free((void **)&ra->long_name); + slapi_ch_free((void **)rap); +} + + +/* + * Allow replication for this replica to begin. Replication will + * occur at the next scheduled time. Returns 0 on success, -1 on + * failure. + */ +int +agmt_start(Repl_Agmt *ra) +{ + Repl_Protocol *prot = NULL; + + int protocol_state; + + /* To Allow Consumer Initialisation when adding an agreement: */ + if (ra->auto_initialize == STATE_PERFORMING_TOTAL_UPDATE) + { + protocol_state = STATE_PERFORMING_TOTAL_UPDATE; + } + else + { + protocol_state = STATE_PERFORMING_INCREMENTAL_UPDATE; + } + + /* First, create a new protocol object */ + if ((prot = prot_new(ra, protocol_state)) == NULL) { + return -1; + } + + /* Now it is safe to own the agreement lock */ + PR_Lock(ra->lock); + + /* Check that replication is not already started */ + if (ra->protocol != NULL) { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "replication already started for agreement \"%s\"\n", agmt_get_long_name(ra)); + PR_Unlock(ra->lock); + prot_free(&prot); + return 0; + } + + ra->protocol = prot; + + /* Start the protocol thread */ + prot_start(ra->protocol); + + PR_Unlock(ra->lock); + return 0; +} + +/* +Cease replicating to this replica as soon as possible. +*/ +int +agmt_stop(Repl_Agmt *ra) +{ + int return_value = 0; + Repl_Protocol *rp = NULL; + + PR_Lock(ra->lock); + if (ra->stop_in_progress) + { + PR_Unlock(ra->lock); + return return_value; + } + ra->stop_in_progress = PR_TRUE; + rp = ra->protocol; + PR_Unlock(ra->lock); + if (NULL != rp) /* we use this pointer outside the lock - dangerous? */ + { + prot_stop(rp); + } + PR_Lock(ra->lock); + ra->stop_in_progress = PR_FALSE; + /* we do not reuse the protocol object so free it */ + prot_free(&ra->protocol); + PR_Unlock(ra->lock); + return return_value; +} + +/* +Send any pending updates as soon as possible, ignoring any replication +schedules. +*/ +int +agmt_replicate_now(Repl_Agmt *ra) +{ + int return_value = 0; + + return return_value; +} + +/* + * Return a copy of the remote replica's hostname. + */ +char * +agmt_get_hostname(const Repl_Agmt *ra) +{ + char *return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = slapi_ch_strdup(ra->hostname); + PR_Unlock(ra->lock); + return return_value; +} + +/* + * Return the port number of the remote replica's instance. + */ +int +agmt_get_port(const Repl_Agmt *ra) +{ + int return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->port; + PR_Unlock(ra->lock); + return return_value; +} + +/* + * Return the transport flags for this agreement. + */ +PRUint32 +agmt_get_transport_flags(const Repl_Agmt *ra) +{ + unsigned int return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->transport_flags; + PR_Unlock(ra->lock); + return return_value; +} + +/* + * Return a copy of the bind dn to be used with this + * agreement (may return NULL if no binddn is required, + * e.g. SSL client auth. + */ +char * +agmt_get_binddn(const Repl_Agmt *ra) +{ + char *return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->binddn == NULL ? NULL : slapi_ch_strdup(ra->binddn); + PR_Unlock(ra->lock); + return return_value; +} + +/* + * Return a copy of the credentials. + */ +struct berval * +agmt_get_credentials(const Repl_Agmt *ra) +{ + struct berval *return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = (struct berval *)slapi_ch_malloc(sizeof(struct berval)); + return_value->bv_val = (char *)slapi_ch_malloc(ra->creds->bv_len + 1); + return_value->bv_len = ra->creds->bv_len; + memcpy(return_value->bv_val, ra->creds->bv_val, ra->creds->bv_len); + return_value->bv_val[return_value->bv_len] = '\0'; /* just in case */ + PR_Unlock(ra->lock); + return return_value; +} + +int +agmt_get_bindmethod(const Repl_Agmt *ra) +{ + int return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->bindmethod; + PR_Unlock(ra->lock); + return return_value; +} + +/* + * Return a copy of the dn at the top of the replicated area. + */ +Slapi_DN * +agmt_get_replarea(const Repl_Agmt *ra) +{ + Slapi_DN *return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = slapi_sdn_new(); + slapi_sdn_copy(ra->replarea, return_value); + PR_Unlock(ra->lock); + return return_value; +} + +int +agmt_is_fractional(const Repl_Agmt *ra) +{ + int return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->frac_attrs != NULL; + PR_Unlock(ra->lock); + return return_value; +} + +int +agmt_is_fractional_attr(const Repl_Agmt *ra, const char *attrname) +{ + int return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = 1; /* XXXggood finish this */ + PR_Unlock(ra->lock); + return return_value; +} + +int +agmt_get_auto_initialize(const Repl_Agmt *ra) +{ + int return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->auto_initialize; + PR_Unlock(ra->lock); + return return_value; +} + +long +agmt_get_timeout(const Repl_Agmt *ra) +{ + long return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->timeout; + PR_Unlock(ra->lock); + return return_value; +} + +long +agmt_get_busywaittime(const Repl_Agmt *ra) +{ + long return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->busywaittime; + PR_Unlock(ra->lock); + return return_value; +} +long +agmt_get_pausetime(const Repl_Agmt *ra) +{ + long return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + return_value = ra->pausetime; + PR_Unlock(ra->lock); + return return_value; +} + +/* + * Warning - reference to the long name of the agreement is returned. + * The long name of an agreement is the DN of the agreement entry, + * followed by the host/port for the replica. + */ +const char * +agmt_get_long_name(const Repl_Agmt *ra) +{ + char *return_value = NULL; + + return_value = ra ? ra->long_name : ""; + return return_value; +} + +/* + * Warning - reference to dn is returned. However, since the dn of + * the replication agreement is its name, it won't change during the + * lifetime of the replication agreement object. + */ +const Slapi_DN * +agmt_get_dn_byref(const Repl_Agmt *ra) +{ + const Slapi_DN *return_value = NULL; + + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + return_value = ra->dn; + } + return return_value; +} + +/* Return 1 if name matches the replication Dn, 0 otherwise */ +int +agmt_matches_name(const Repl_Agmt *ra, const Slapi_DN *name) +{ + int return_value = 0; + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + PR_Lock(ra->lock); + if (slapi_sdn_compare(name, ra->dn) == 0) + { + return_value = 1; + } + PR_Unlock(ra->lock); + } + return return_value; +} + +/* Return 1 if name matches the replication area, 0 otherwise */ +int +agmt_replarea_matches(const Repl_Agmt *ra, const Slapi_DN *name) +{ + int return_value = 0; + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + PR_Lock(ra->lock); + if (slapi_sdn_compare(name, ra->replarea) == 0) + { + return_value = 1; + } + PR_Unlock(ra->lock); + } + return return_value; +} + + +int +agmt_schedule_in_window_now(const Repl_Agmt *ra) +{ + int return_value; + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + if (NULL != ra->schedule && schedule_in_window_now(ra->schedule)) + { + return_value = 1; + } + else + { + return_value = 0; + } + PR_Unlock(ra->lock); + return return_value; +} + + +/* + * Set or reset the credentials used to bind to the remote replica. + * + * Returns 0 if credentials set, or -1 if an error occurred. + */ +int +agmt_set_credentials_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) +{ + Slapi_Attr *sattr = NULL; + int return_value = 0; + + PR_ASSERT(NULL != ra); + slapi_entry_attr_find(e, type_nsds5ReplicaCredentials, &sattr); + PR_Lock(ra->lock); + slapi_ch_free((void **)&ra->creds->bv_val); + ra->creds->bv_len = 0; + if (NULL != sattr) + { + Slapi_Value *sval = NULL; + slapi_attr_first_value(sattr, &sval); + if (NULL != sval) + { + const struct berval *bv = slapi_value_get_berval(sval); + ra->creds->bv_val = slapi_ch_calloc(1, bv->bv_len + 1); + memcpy(ra->creds->bv_val, bv->bv_val, bv->bv_len); + ra->creds->bv_len = bv->bv_len; + } + } + /* If no credentials set, set to zero-length string */ + ra->creds->bv_val = NULL == ra->creds->bv_val ? slapi_ch_strdup("") : ra->creds->bv_val; + PR_Unlock(ra->lock); + prot_notify_agmt_changed(ra->protocol, ra->long_name); + return return_value; +} + +/* + * Set or reset the DN used to bind to the remote replica. + * + * Returns 0 if DN set, or -1 if an error occurred. + */ +int +agmt_set_binddn_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) +{ + Slapi_Attr *sattr = NULL; + int return_value = 0; + + PR_ASSERT(NULL != ra); + slapi_entry_attr_find(e, type_nsds5ReplicaBindDN, &sattr); + PR_Lock(ra->lock); + slapi_ch_free((void **)&ra->binddn); + ra->binddn = NULL; + if (NULL != sattr) + { + Slapi_Value *sval = NULL; + slapi_attr_first_value(sattr, &sval); + if (NULL != sval) + { + const char *val = slapi_value_get_string(sval); + ra->binddn = strdup(val); + } + } + /* If no BindDN set, set to zero-length string */ + if (ra->binddn == NULL) { + ra->binddn = strdup(""); + } + PR_Unlock(ra->lock); + prot_notify_agmt_changed(ra->protocol, ra->long_name); + return return_value; +} + +/* + * Set or reset the bind method used to bind to the remote replica. + * + * Returns 0 if bind method set, or -1 if an error occurred. + */ +static int +agmt_set_bind_method_no_lock(Repl_Agmt *ra, const Slapi_Entry *e) +{ + char *tmpstr = NULL; + int return_value = 0; + + PR_ASSERT(NULL != ra); + tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaBindMethod); + + if (NULL == tmpstr || strcasecmp(tmpstr, "SIMPLE") == 0) + { + ra->bindmethod = BINDMETHOD_SIMPLE_AUTH; + } + else if (strcasecmp(tmpstr, "SSLCLIENTAUTH") == 0) + { + ra->bindmethod = BINDMETHOD_SSL_CLIENTAUTH; + } + else + { + ra->bindmethod = BINDMETHOD_SIMPLE_AUTH; + } + slapi_ch_free((void **)&tmpstr); + return return_value; +} + +int +agmt_set_bind_method_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) +{ + char *tmpstr = NULL; + int return_value = 0; + + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + if (ra->stop_in_progress) + { + PR_Unlock(ra->lock); + return return_value; + } + return_value = agmt_set_bind_method_no_lock(ra, e); + PR_Unlock(ra->lock); + prot_notify_agmt_changed(ra->protocol, ra->long_name); + return return_value; +} + +/* + * Set or reset the transport used to bind to the remote replica. + * + * Returns 0 if transport set, or -1 if an error occurred. + */ +static int +agmt_set_transportinfo_no_lock(Repl_Agmt *ra, const Slapi_Entry *e) +{ + char *tmpstr; + int rc = 0; + + tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5TransportInfo); + if (NULL != tmpstr && strcasecmp(tmpstr, "SSL") == 0) + { + ra->transport_flags |= TRANSPORT_FLAG_SSL; + } else { + ra->transport_flags &= ~TRANSPORT_FLAG_SSL; + } + + slapi_ch_free((void **)&tmpstr); + return (rc); +} + +int +agmt_set_transportinfo_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) +{ + int return_value = 0; + + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + if (ra->stop_in_progress) + { + PR_Unlock(ra->lock); + return return_value; + } + return_value = agmt_set_transportinfo_no_lock(ra, e); + PR_Unlock(ra->lock); + prot_notify_agmt_changed(ra->protocol, ra->long_name); + + return return_value; +} + + +/* + * Set or reset the replication schedule. Notify the protocol handler + * that a change has been made. + * + * Returns 0 if schedule was set or -1 if an error occurred. + */ +int +agmt_set_schedule_from_entry( Repl_Agmt *ra, const Slapi_Entry *e ) +{ + Slapi_Attr *sattr; + int return_value = 0; + + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + if (ra->stop_in_progress) + { + PR_Unlock(ra->lock); + return return_value; + } + PR_Unlock(ra->lock); + + if (slapi_entry_attr_find(e, type_nsds5ReplicaUpdateSchedule, &sattr) != 0) + { + sattr = NULL; /* no schedule ==> delete any existing one */ + } + + /* make it so */ + return_value = schedule_set(ra->schedule, sattr); + + if ( 0 == return_value ) { + /* schedule set OK -- spread the news */ + prot_notify_agmt_changed(ra->protocol, ra->long_name); + } + + return return_value; +} + +/* + * Set or reset the timeout used to bind to the remote replica. + * + * Returns 0 if timeout set, or -1 if an error occurred. + */ +int +agmt_set_timeout_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) +{ + Slapi_Attr *sattr = NULL; + int return_value = -1; + + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + if (ra->stop_in_progress) + { + PR_Unlock(ra->lock); + return return_value; + } + + slapi_entry_attr_find(e, type_nsds5ReplicaTimeout, &sattr); + if (NULL != sattr) + { + Slapi_Value *sval = NULL; + slapi_attr_first_value(sattr, &sval); + if (NULL != sval) + { + long tmpval = slapi_value_get_long(sval); + if (tmpval >= 0) { + ra->timeout = tmpval; + return_value = 0; /* success! */ + } + } + } + PR_Unlock(ra->lock); + if (return_value == 0) + { + prot_notify_agmt_changed(ra->protocol, ra->long_name); + } + return return_value; +} + +/* + * Set or reset the busywaittime + * + * Returns 0 if busywaittime set, or -1 if an error occurred. + */ +int +agmt_set_busywaittime_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) +{ + Slapi_Attr *sattr = NULL; + int return_value = -1; + + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + if (ra->stop_in_progress) + { + PR_Unlock(ra->lock); + return return_value; + } + + slapi_entry_attr_find(e, type_nsds5ReplicaBusyWaitTime, &sattr); + if (NULL != sattr) + { + Slapi_Value *sval = NULL; + slapi_attr_first_value(sattr, &sval); + if (NULL != sval) + { + long tmpval = slapi_value_get_long(sval); + if (tmpval >= 0) { + ra->busywaittime = tmpval; + return_value = 0; /* success! */ + } + } + } + PR_Unlock(ra->lock); + if (return_value == 0) + { + prot_notify_agmt_changed(ra->protocol, ra->long_name); + } + return return_value; +} + +/* + * Set or reset the pausetime + * + * Returns 0 if pausetime set, or -1 if an error occurred. + */ +int +agmt_set_pausetime_from_entry(Repl_Agmt *ra, const Slapi_Entry *e) +{ + Slapi_Attr *sattr = NULL; + int return_value = -1; + + PR_ASSERT(NULL != ra); + PR_Lock(ra->lock); + if (ra->stop_in_progress) + { + PR_Unlock(ra->lock); + return return_value; + } + + slapi_entry_attr_find(e, type_nsds5ReplicaSessionPauseTime, &sattr); + if (NULL != sattr) + { + Slapi_Value *sval = NULL; + slapi_attr_first_value(sattr, &sval); + if (NULL != sval) + { + long tmpval = slapi_value_get_long(sval); + if (tmpval >= 0) { + ra->pausetime = tmpval; + return_value = 0; /* success! */ + } + } + } + PR_Unlock(ra->lock); + if (return_value == 0) + { + prot_notify_agmt_changed(ra->protocol, ra->long_name); + } + return return_value; +} + +/* XXXggood - also make this pass an arg that tells if there was + * an update to a priority attribute */ +void +agmt_notify_change(Repl_Agmt *agmt, Slapi_PBlock *pb) +{ + if (NULL != pb) + { + /* Is the entry within our replicated area? */ + char *target_dn; + Slapi_DN *target_sdn; + int change_is_relevant = 0; + + PR_ASSERT(NULL != agmt); + PR_Lock(agmt->lock); + if (agmt->stop_in_progress) + { + PR_Unlock(agmt->lock); + return; + } + + slapi_pblock_get(pb, SLAPI_TARGET_DN, &target_dn); + target_sdn = slapi_sdn_new_dn_byref(target_dn); /* XXX see if you can avoid allocating this */ + + if (slapi_sdn_issuffix(target_sdn, agmt->replarea)) + { + /* + * Yep, it's in our replicated area. Is this a fractional + * replication agreement? + */ + if (NULL != agmt->frac_attrs) + { + /* + * Yep, it's fractional. See if the change should be + * tossed because it doesn't affect any of the replicated + * attributes. + */ + int optype; + int affects_fractional_attribute = 0; + + slapi_pblock_get(pb, SLAPI_OPERATION_TYPE, &optype); + if (SLAPI_OPERATION_MODIFY == optype) + { + LDAPMod **mods; + int i, j; + + slapi_pblock_get(pb, SLAPI_MODIFY_MODS, &mods); + for (i = 0; !affects_fractional_attribute && NULL != agmt->frac_attrs[i]; i++) + { + for (j = 0; !affects_fractional_attribute && NULL != mods[j]; j++) + { + if (slapi_attr_types_equivalent(agmt->frac_attrs[i], + mods[i]->mod_type)) + { + affects_fractional_attribute = 1; + } + } + } + } + else + { + /* + * Add, delete, and modrdn always cause some sort of + * operation replay, even if agreement is fractional. + */ + affects_fractional_attribute = 1; + } + if (affects_fractional_attribute) + { + change_is_relevant = 1; + } + } + else + { + /* Not a fractional agreement */ + change_is_relevant = 1; + } + } + PR_Unlock(agmt->lock); + slapi_sdn_free(&target_sdn); + if (change_is_relevant) + { + /* Notify the protocol that a change has occurred */ + prot_notify_update(agmt->protocol); + } + } +} + + + +int +agmt_is_50_mm_protocol(const Repl_Agmt *agmt) +{ + return 1; /* XXXggood could support > 1 protocol */ +} + + + +int +agmt_initialize_replica(const Repl_Agmt *agmt) +{ + PR_ASSERT(NULL != agmt); + PR_Lock(agmt->lock); + if (agmt->stop_in_progress) + { + PR_Unlock(agmt->lock); + return 0; + } + PR_Unlock(agmt->lock); + /* Call prot_initialize_replica only if the suffix is enabled (agmt->protocol != NULL) */ + if (NULL != agmt->protocol) { + prot_initialize_replica(agmt->protocol); + } + else { + /* agmt->protocol == NULL --> Suffix is disabled */ + return -1; + } + return 0; +} + +/* delete nsds5BeginReplicaRefresh attribute to indicate to the clients + that replica initialization have completed */ +void +agmt_replica_init_done (const Repl_Agmt *agmt) +{ + int rc; + Slapi_PBlock *pb = slapi_pblock_new (); + LDAPMod *mods [2]; + LDAPMod mod; + + mods[0] = &mod; + mods[1] = NULL; + mod.mod_op = LDAP_MOD_DELETE | LDAP_MOD_BVALUES; + mod.mod_type = (char*)type_nsds5ReplicaInitialize; + mod.mod_bvalues = NULL; + + slapi_modify_internal_set_pb(pb, slapi_sdn_get_dn (agmt->dn), mods, NULL/* controls */, + NULL/* uniqueid */, repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0/* flags */); + slapi_modify_internal_pb (pb); + + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); + if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_ATTRIBUTE) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "agmt_replica_init_done: " + "failed to remove (%s) attribute from (%s) entry; LDAP error - %d\n", + type_nsds5ReplicaInitialize, slapi_sdn_get_ndn (agmt->dn), rc); + } + + slapi_pblock_destroy (pb); +} + +/* Agreement object is acquired on behalf of the caller. + The caller is responsible for releasing the object + when it is no longer used */ + +Object* +agmt_get_consumer_ruv (Repl_Agmt *ra) +{ + Object *rt = NULL; + + PR_ASSERT(NULL != ra); + + PR_Lock(ra->lock); + if (ra->consumerRUV) + { + object_acquire (ra->consumerRUV); + rt = ra->consumerRUV; + } + + PR_Unlock(ra->lock); + + return rt; +} + +int +agmt_set_consumer_ruv (Repl_Agmt *ra, RUV *ruv) +{ + if (ra == NULL || ruv == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmt_set_consumer_ruv: invalid argument" + " agmt - %p, ruv - %p\n", ra, ruv); + return -1; + } + + PR_Lock(ra->lock); + + if (ra->consumerRUV) + { + object_release (ra->consumerRUV); + } + + ra->consumerRUV = object_new (ruv_dup (ruv), (FNFree)ruv_destroy); + + PR_Unlock(ra->lock); + + return 0; +} + +void +agmt_update_consumer_ruv (Repl_Agmt *ra) +{ + int rc; + RUV *ruv; + Slapi_Mod smod; + Slapi_Mod smod_last_modified; + Slapi_PBlock *pb; + LDAPMod *mods[3]; + + PR_ASSERT (ra); + PR_Lock(ra->lock); + + if (ra->consumerRUV) + { + ruv = (RUV*) object_get_data (ra->consumerRUV); + PR_ASSERT (ruv); + + ruv_to_smod(ruv, &smod); + ruv_last_modified_to_smod(ruv, &smod_last_modified); + + /* it is ok to release the lock here because we are done with the agreement data. + we have to do it before issuing the modify operation because it causes + agmtlist_notify_all to be called which uses the same lock - hence the deadlock */ + PR_Unlock(ra->lock); + + pb = slapi_pblock_new (); + mods[0] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod); + mods[1] = (LDAPMod *)slapi_mod_get_ldapmod_byref(&smod_last_modified); + mods[2] = NULL; + + slapi_modify_internal_set_pb (pb, (char*)slapi_sdn_get_dn(ra->dn), mods, NULL, NULL, + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); + slapi_modify_internal_pb (pb); + + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &rc); + if (rc != LDAP_SUCCESS && rc != LDAP_NO_SUCH_ATTRIBUTE) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "%s: agmt_update_consumer_ruv: " + "failed to update consumer's RUV; LDAP error - %d\n", + ra->long_name, rc); + } + + slapi_mod_done (&smod); + slapi_mod_done (&smod_last_modified); + slapi_pblock_destroy (pb); + } + else + PR_Unlock(ra->lock); +} + +CSN* +agmt_get_consumer_schema_csn (Repl_Agmt *ra) +{ + CSN *rt; + + PR_ASSERT(NULL != ra); + + PR_Lock(ra->lock); + rt = ra->consumerSchemaCSN; + PR_Unlock(ra->lock); + + return rt; +} + +void +agmt_set_consumer_schema_csn (Repl_Agmt *ra, CSN *csn) +{ + PR_ASSERT(NULL != ra); + + PR_Lock(ra->lock); + csn_free(&ra->consumerSchemaCSN); + ra->consumerSchemaCSN = csn; + PR_Unlock(ra->lock); +} + +void +agmt_set_last_update_start (Repl_Agmt *ra, time_t start_time) +{ + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + ra->last_update_start_time = start_time; + ra->last_update_end_time = 0UL; + } +} + + +void +agmt_set_last_update_end (Repl_Agmt *ra, time_t end_time) +{ + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + ra->last_update_end_time = end_time; + } +} + +void +agmt_set_last_init_start (Repl_Agmt *ra, time_t start_time) +{ + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + ra->last_init_start_time = start_time; + ra->last_init_end_time = 0UL; + } +} + + +void +agmt_set_last_init_end (Repl_Agmt *ra, time_t end_time) +{ + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + ra->last_init_end_time = end_time; + } +} + +void +agmt_set_last_update_status (Repl_Agmt *ra, int ldaprc, int replrc, const char *message) +{ + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + if (replrc == NSDS50_REPL_UPTODATE) + { + /* no session started, no status update */ + } + else if (ldaprc != LDAP_SUCCESS) + { + char *replmsg = NULL; + if ( replrc ) { + replmsg = protocol_response2string(replrc); + /* Do not mix the unknown replication error with the known ldap one */ + if ( strcasecmp(replmsg, "unknown error") == 0 ) { + replmsg = NULL; + } + } + if (ldaprc > 0) { + PR_snprintf(ra->last_update_status, STATUS_LEN, + "%d %s%sLDAP error: %s%s%s", + ldaprc, + message?message:"",message?"":" - ", + ldap_err2string(ldaprc), + replmsg ? " - " : "", replmsg ? replmsg : ""); + } else { /* ldaprc is < 0 */ + PR_snprintf(ra->last_update_status, STATUS_LEN, + "%d %s%sSystem error%s%s", + ldaprc,message?message:"",message?"":" - ", + replmsg ? " - " : "", replmsg ? replmsg : ""); + } + } + else if (replrc != 0) + { + if (replrc == NSDS50_REPL_REPLICA_READY) + { + PR_snprintf(ra->last_update_status, STATUS_LEN, "%d %s", + ldaprc, "Replica acquired successfully"); + } + else if (replrc == NSDS50_REPL_REPLICA_BUSY) + { + PR_snprintf(ra->last_update_status, STATUS_LEN, + "%d Can't acquire busy replica", replrc ); + } + else if (replrc == NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED) + { + PR_snprintf(ra->last_update_status, STATUS_LEN, "%d %s", + ldaprc, "Replication session successful"); + } + else if (replrc == NSDS50_REPL_DISABLED) + { + PR_snprintf(ra->last_update_status, STATUS_LEN, "%d Total update aborted: " + "Replication agreement for %s\n can not be updated while the replica is disabled.\n" + "(If the suffix is disabled you must enable it then restart the server for replication to take place).", + replrc, ra->long_name ? ra->long_name : "a replica"); + /* Log into the errors log, as "ra->long_name" is not accessible from the caller */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "Total update aborted: Replication agreement for \"%s\" " + "can not be updated while the replica is disabled\n", ra->long_name ? ra->long_name : "a replica"); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "(If the suffix is disabled you must enable it then restart the server for replication to take place).\n"); + } + else + { + PR_snprintf(ra->last_update_status, STATUS_LEN, + "%d Replication error acquiring replica: %s%s%s", + replrc, protocol_response2string(replrc), + message?" - ":"",message?message:""); + } + } + else if (message != NULL) + { + PR_snprintf(ra->last_update_status, STATUS_LEN, "%d %s", ldaprc, message); + } + else { /* agmt_set_last_update_status(0,0,NULL) to reset agmt */ + PR_snprintf(ra->last_update_status, STATUS_LEN, "%d", ldaprc); + } + } +} + +void +agmt_set_last_init_status (Repl_Agmt *ra, int ldaprc, int replrc, const char *message) +{ + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + if (ldaprc != LDAP_SUCCESS) + { + char *replmsg = NULL; + if ( replrc ) { + replmsg = protocol_response2string(replrc); + /* Do not mix the unknown replication error with the known ldap one */ + if ( strcasecmp(replmsg, "unknown error") == 0 ) { + replmsg = NULL; + } + } + if (ldaprc > 0) { + PR_snprintf(ra->last_init_status, STATUS_LEN, + "%d %s%sLDAP error: %s%s%s", + ldaprc, + message?message:"",message?"":" - ", + ldap_err2string(ldaprc), + replmsg ? " - " : "", replmsg ? replmsg : ""); + } else { /* ldaprc is < 0 */ + PR_snprintf(ra->last_init_status, STATUS_LEN, + "%d %s%sSystem error%s%s", + ldaprc,message?message:"",message?"":" - ", + replmsg ? " - " : "", replmsg ? replmsg : ""); + } + } + else if (replrc != 0) + { + if (replrc == NSDS50_REPL_REPLICA_READY) + { + PR_snprintf(ra->last_init_status, STATUS_LEN, "%d %s", + ldaprc, "Replica acquired successfully"); + } + else if (replrc == NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED) + { + PR_snprintf(ra->last_init_status, STATUS_LEN, "%d %s", + ldaprc, "Replication session successful"); + } + else if (replrc == NSDS50_REPL_DISABLED) + { + PR_snprintf(ra->last_init_status, STATUS_LEN, "%d Total update aborted: " + "Replication agreement for %s\n can not be updated while the replica is disabled.\n" + "(If the suffix is disabled you must enable it then restart the server for replication to take place).", + replrc, ra->long_name ? ra->long_name : "a replica"); + /* Log into the errors log, as "ra->long_name" is not accessible from the caller */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "Total update aborted: Replication agreement for \"%s\" " + "can not be updated while the replica is disabled\n", ra->long_name ? ra->long_name : "a replica"); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, + "(If the suffix is disabled you must enable it then restart the server for replication to take place).\n"); + } + else + { + PR_snprintf(ra->last_init_status, STATUS_LEN, + "%d Replication error acquiring replica: %s%s%s", + replrc, protocol_response2string(replrc), + message?" - ":"",message?message:""); + } + } + else if (message != NULL) + { + PR_snprintf(ra->last_init_status, STATUS_LEN, "%d %s", ldaprc, message); + } + else { /* agmt_set_last_init_status(0,0,NULL) to reset agmt */ + PR_snprintf(ra->last_init_status, STATUS_LEN, "%d", ldaprc); + } + } +} + + +void +agmt_set_update_in_progress (Repl_Agmt *ra, PRBool in_progress) +{ + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + ra->update_in_progress = in_progress; + } +} + +void +agmt_inc_last_update_changecount (Repl_Agmt *ra, ReplicaId rid, int skipped) +{ + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + int i; + + for ( i = 0; i < ra->num_changecounters; i++ ) + { + if ( ra->changecounters[i]->rid == rid ) + break; + } + + if ( i < ra->num_changecounters ) + { + if ( skipped ) + ra->changecounters[i]->num_skipped ++; + else + ra->changecounters[i]->num_replayed ++; + } + else + { + ra->num_changecounters ++; + ra->changecounters[i] = (struct changecounter*) slapi_ch_calloc(1, sizeof(struct changecounter)); + ra->changecounters[i]->rid = rid; + if ( skipped ) + ra->changecounters[i]->num_skipped = 1; + else + ra->changecounters[i]->num_replayed = 1; + } + } +} + +void +agmt_get_changecount_string (Repl_Agmt *ra, char *buf, int bufsize) +{ + char tmp_buf[32]; /* 5 digit RID, 10 digit each replayed and skipped */ + int i; + int buflen = 0; + + *buf = '\0'; + if (NULL != ra) + { + for ( i = 0; i < ra->num_changecounters; i++ ) + { + PR_snprintf (tmp_buf, sizeof(tmp_buf), "%u:%u/%u ", + ra->changecounters[i]->rid, + ra->changecounters[i]->num_replayed, + ra->changecounters[i]->num_skipped); + PR_snprintf (buf+buflen, bufsize-buflen, "%s", tmp_buf); + buflen += strlen (tmp_buf); + } + } +} + +static int +get_agmt_status(Slapi_PBlock *pb, Slapi_Entry* e, Slapi_Entry* entryAfter, + int *returncode, char *returntext, void *arg) +{ + char *time_tmp = NULL; + char changecount_string[BUFSIZ]; + Repl_Agmt *ra = (Repl_Agmt *)arg; + + PR_ASSERT(NULL != ra); + if (NULL != ra) + { + PRBool reapActive = PR_FALSE; + Slapi_DN *replarea_sdn = NULL; + Object *repl_obj = NULL; + + replarea_sdn = agmt_get_replarea(ra); + repl_obj = replica_get_replica_from_dn(replarea_sdn); + slapi_sdn_free(&replarea_sdn); + if (repl_obj) { + Replica *replica = (Replica*)object_get_data (repl_obj); + reapActive = replica_get_tombstone_reap_active(replica); + object_release(repl_obj); + } + slapi_entry_attr_set_int(e, "nsds5replicaReapActive", (int)reapActive); + + /* these values persist in the dse.ldif file, so we delete them + here to avoid multi valued attributes */ + slapi_entry_attr_delete(e, "nsds5replicaLastUpdateStart"); + slapi_entry_attr_delete(e, "nsds5replicaLastUpdateEnd"); + slapi_entry_attr_delete(e, "nsds5replicaChangesSentSinceStartup"); + slapi_entry_attr_delete(e, "nsds5replicaLastUpdateStatus"); + slapi_entry_attr_delete(e, "nsds5replicaUpdateInProgress"); + slapi_entry_attr_delete(e, "nsds5replicaLastInitStart"); + slapi_entry_attr_delete(e, "nsds5replicaLastInitStatus"); + slapi_entry_attr_delete(e, "nsds5replicaLastInitEnd"); + + /* now, add the real values (singly) */ + if (ra->last_update_start_time == 0) + { + slapi_entry_add_string(e, "nsds5replicaLastUpdateStart", "0"); + } + else + { + time_tmp = format_genTime(ra->last_update_start_time); + slapi_entry_add_string(e, "nsds5replicaLastUpdateStart", time_tmp); + slapi_ch_free((void **)&time_tmp); + } + if (ra->last_update_end_time == 0) + { + slapi_entry_add_string(e, "nsds5replicaLastUpdateEnd", "0"); + } + else + { + time_tmp = format_genTime(ra->last_update_end_time); + slapi_entry_add_string(e, "nsds5replicaLastUpdateEnd", time_tmp); + slapi_ch_free((void **)&time_tmp); + } + agmt_get_changecount_string (ra, changecount_string, sizeof (changecount_string) ); + slapi_entry_add_string(e, "nsds5replicaChangesSentSinceStartup", changecount_string); + if (ra->last_update_status[0] == '\0') + { + slapi_entry_add_string(e, "nsds5replicaLastUpdateStatus", "0 No replication sessions started since server startup"); + } + else + { + slapi_entry_add_string(e, "nsds5replicaLastUpdateStatus", ra->last_update_status); + } + slapi_entry_add_string(e, "nsds5replicaUpdateInProgress", ra->update_in_progress ? "TRUE" : "FALSE"); + if (ra->last_init_start_time == 0) + { + slapi_entry_add_string(e, "nsds5replicaLastInitStart", "0"); + } + else + { + time_tmp = format_genTime(ra->last_init_start_time); + slapi_entry_add_string(e, "nsds5replicaLastInitStart", time_tmp); + slapi_ch_free((void **)&time_tmp); + } + if (ra->last_init_end_time == 0) + { + slapi_entry_add_string(e, "nsds5replicaLastInitEnd", "0"); + } + else + { + time_tmp = format_genTime(ra->last_init_end_time); + slapi_entry_add_string(e, "nsds5replicaLastInitEnd", time_tmp); + slapi_ch_free((void **)&time_tmp); + } + if (ra->last_init_status[0] != '\0') + { + slapi_entry_add_string(e, "nsds5replicaLastInitStatus", ra->last_init_status); + } + } + return SLAPI_DSE_CALLBACK_OK; +} + +static void +update_window_state_change_callback (void *arg, PRBool opened) +{ + Repl_Agmt *agmt = (Repl_Agmt*)arg; + + PR_ASSERT (agmt); + + if (opened) + { + prot_notify_window_opened (agmt->protocol); + } + else + { + prot_notify_window_closed (agmt->protocol); + } +} + +ReplicaId +agmt_get_consumer_rid ( Repl_Agmt *agmt, void *conn ) +{ + if ( agmt->consumerRID <= 0 ) { + + char mapping_tree_node[512]; + struct berval **bvals = NULL; + + PR_snprintf ( mapping_tree_node, + sizeof (mapping_tree_node), + "cn=replica,cn=\"%s\",cn=mapping tree,cn=config", + slapi_sdn_get_dn (agmt->replarea) ); + conn_read_entry_attribute ( conn, mapping_tree_node, "nsDS5ReplicaID", &bvals ); + if ( NULL != bvals && NULL != bvals[0] ) { + char *ridstr = slapi_ch_malloc( bvals[0]->bv_len + 1 ); + memcpy ( ridstr, bvals[0]->bv_val, bvals[0]->bv_len ); + ridstr[bvals[0]->bv_len] = '\0'; + agmt->consumerRID = atoi (ridstr); + slapi_ch_free ( (void**) &ridstr ); + ber_bvecfree ( bvals ); + } + } + + return agmt->consumerRID; +} |