diff options
Diffstat (limited to 'ldap/servers/plugins/replication/repl_extop.c')
-rw-r--r-- | ldap/servers/plugins/replication/repl_extop.c | 1134 |
1 files changed, 1134 insertions, 0 deletions
diff --git a/ldap/servers/plugins/replication/repl_extop.c b/ldap/servers/plugins/replication/repl_extop.c new file mode 100644 index 00000000..b13ad6ac --- /dev/null +++ b/ldap/servers/plugins/replication/repl_extop.c @@ -0,0 +1,1134 @@ +/** BEGIN COPYRIGHT BLOCK + * Copyright 2001 Sun Microsystems, Inc. + * Portions copyright 1999, 2001-2003 Netscape Communications Corporation. + * All rights reserved. + * END COPYRIGHT BLOCK **/ + +#include "slapi-plugin.h" +#include "repl.h" +#include "repl5.h" +#include "repl5_prot_private.h" +#include "cl5_api.h" + + +/* + * repl_extop.c - there are two types of functions in this file: + * - Code that implements an extended operation plugin. + * The replication DLL arranges for this code to + * be called when a StartNSDS50ReplicationRequest + * or an EndNSDS50ReplicationRequest extended operation + * is received. + * - Code that sends extended operations on an already- + * established client connection. + * + * The requestValue portion of the StartNSDS50ReplicationRequest + * looks like this: + * + * requestValue ::= SEQUENCE { + * replProtocolOID LDAPOID, + * replicatedTree LDAPDN, + supplierRUV OCTET STRING + * referralURLs SET of LDAPURL OPTIONAL + * csn OCTET STRING OPTIONAL + * } + * + */ +static int check_replica_id_uniqueness(Replica *replica, RUV *supplier_ruv); + +static int +encode_ruv (BerElement *ber, const RUV *ruv) +{ + int rc = LDAP_SUCCESS; + struct berval **bvals = NULL; + + PR_ASSERT (ber); + PR_ASSERT (ruv); + + if (ruv_to_bervals(ruv, &bvals) != 0) + { + rc = LDAP_OPERATIONS_ERROR; + goto done; + } + + if (ber_printf(ber, "[V]", bvals) == -1) + { + rc = LDAP_ENCODING_ERROR; + goto done; + } + + rc = LDAP_SUCCESS; + +done: + if (bvals) + ber_bvecfree (bvals); + + return rc; +} + +static struct berval * +create_NSDS50ReplicationExtopPayload(const char *protocol_oid, + const char *repl_root, char **extra_referrals, CSN *csn, + int send_end) +{ + struct berval *req_data = NULL; + BerElement *tmp_bere = NULL; + int rc = 0; + const char *csnstr = NULL; + Object *repl_obj, *ruv_obj = NULL; + Replica *repl; + RUV *ruv; + Slapi_DN *sdn; + + PR_ASSERT(protocol_oid != NULL || send_end); + PR_ASSERT(repl_root != NULL); + + /* Create the request data */ + + if ((tmp_bere = der_alloc()) == NULL) + { + rc = LDAP_ENCODING_ERROR; + goto loser; + } + if (!send_end) + { + if (ber_printf(tmp_bere, "{ss", protocol_oid, repl_root) == -1) + { + rc = LDAP_ENCODING_ERROR; + goto loser; + } + } + else + { + if (ber_printf(tmp_bere, "{s", repl_root) == -1) + { + rc = LDAP_ENCODING_ERROR; + goto loser; + } + } + + sdn = slapi_sdn_new_dn_byref(repl_root); + repl_obj = replica_get_replica_from_dn (sdn); + if (repl_obj == NULL) + { + rc = LDAP_OPERATIONS_ERROR; + goto loser; + } + + repl = (Replica*)object_get_data (repl_obj); + PR_ASSERT (repl); + ruv_obj = replica_get_ruv (repl); + if (ruv_obj == NULL) + { + rc = LDAP_OPERATIONS_ERROR; + goto loser; + } + ruv = object_get_data(ruv_obj); + PR_ASSERT(ruv); + + /* send supplier's ruv so that consumer can build its own referrals. + In case of total protocol, it is also used as consumer's ruv once + protocol successfully completes */ + /* We need to encode and send each time the local ruv in case we have changed it */ + rc = encode_ruv (tmp_bere, ruv); + if (rc != 0) + { + goto loser; + } + + if (!send_end) + { + char s[CSN_STRSIZE]; + ReplicaId rid; + char *local_replica_referral[2] = {0}; + char **referrals_to_send = NULL; + /* Add the referral URL(s), if present */ + rid = replica_get_rid(repl); + if (!ruv_contains_replica(ruv, rid)) + { + /* + * In the event that there is no RUV component for this replica (e.g. + * if the database was just loaded from LDIF and no local CSNs have been + * generated), then we need to explicitly add this server to the list + * of referrals, since it wouldn't have been sent with the RUV. + */ + local_replica_referral[0] = (char *)multimaster_get_local_purl(); /* XXXggood had to cast away const */ + } + charray_merge(&referrals_to_send, extra_referrals, 0); + charray_merge(&referrals_to_send, local_replica_referral, 0); + if (NULL != referrals_to_send) + { + if (ber_printf(tmp_bere, "[v]", referrals_to_send) == -1) + { + rc = LDAP_ENCODING_ERROR; + goto loser; + } + slapi_ch_free((void **)&referrals_to_send); + } + /* Add the CSN */ + PR_ASSERT(NULL != csn); + if (ber_printf(tmp_bere, "s", csnstr = csn_as_string(csn,PR_FALSE,s)) == -1) + { + rc = LDAP_ENCODING_ERROR; + goto loser; + } + } + + if (ber_printf(tmp_bere, "}") == -1) + { + rc = LDAP_ENCODING_ERROR; + goto loser; + } + + if (ber_flatten(tmp_bere, &req_data) == -1) + { + rc = LDAP_LOCAL_ERROR; + goto loser; + } + /* Success */ + goto done; + +loser: + /* Free stuff we allocated */ + if (NULL != req_data) + { + ber_bvfree(req_data); req_data = NULL; + } + +done: + if (NULL != tmp_bere) + { + ber_free(tmp_bere, 1); tmp_bere = NULL; + } + if (NULL != sdn) + { + slapi_sdn_free (&sdn); /* Put on stack instead of allocating? */ + } + if (NULL != repl_obj) + { + object_release (repl_obj); + } + if (NULL != ruv_obj) + { + object_release (ruv_obj); + } + return req_data; +} + + +struct berval * +NSDS50StartReplicationRequest_new(const char *protocol_oid, + const char *repl_root, char **extra_referrals, CSN *csn) +{ + return(create_NSDS50ReplicationExtopPayload(protocol_oid, + repl_root, extra_referrals, csn, 0)); +} + +struct berval * +NSDS50EndReplicationRequest_new(char *repl_root) +{ + return(create_NSDS50ReplicationExtopPayload(NULL, repl_root, NULL, NULL, 1)); +} + +static int +decode_ruv (BerElement *ber, RUV **ruv) +{ + int rc = -1; + struct berval **bvals = NULL; + + PR_ASSERT (ber && ruv); + + if (ber_scanf(ber, "[V]", &bvals) == -1) + { + goto done; + } + + if (ruv_init_from_bervals(bvals, ruv) != 0) + { + goto done; + } + + rc = 0; +done: + if (bvals) + ber_bvecfree (bvals); + + return rc; +} + +/* + * Decode an NSDS50 Start Replication Request extended + * operation. Returns 0 on success, -1 on decoding error. + * The caller is responsible for freeing protocol_oid, + * repl_root, referrals, and csn. + */ +static int +decode_startrepl_extop(Slapi_PBlock *pb, char **protocol_oid, char **repl_root, + RUV **supplier_ruv, char ***extra_referrals, char **csnstr) +{ + char *extop_oid = NULL; + struct berval *extop_value = NULL; + BerElement *tmp_bere = NULL; + unsigned long len; + int rc = 0; + + PR_ASSERT (pb && protocol_oid && repl_root && supplier_ruv && extra_referrals && csnstr); + + *protocol_oid = NULL; + *repl_root = NULL; + *supplier_ruv = NULL; + *extra_referrals = NULL; + *csnstr = NULL; + + slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid); + slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_value); + + if (NULL == extop_oid || + strcmp(extop_oid, REPL_START_NSDS50_REPLICATION_REQUEST_OID) != 0 || + NULL == extop_value) + { + /* bogus */ + rc = -1; + goto free_and_return; + } + + if ((tmp_bere = ber_init(extop_value)) == NULL) + { + rc = -1; + goto free_and_return; + } + if (ber_scanf(tmp_bere, "{") == -1) + { + rc = -1; + goto free_and_return; + } + /* Get the required protocol OID and root of replicated subtree */ + if (ber_get_stringa(tmp_bere, protocol_oid) == -1) + { + rc = -1; + goto free_and_return; + } + if (ber_get_stringa(tmp_bere, repl_root) == -1) + { + rc = -1; + goto free_and_return; + } + + /* get supplier's ruv */ + if (decode_ruv (tmp_bere, supplier_ruv) == -1) + { + rc = -1; + goto free_and_return; + } + + /* Get the optional set of referral URLs */ + if (ber_peek_tag(tmp_bere, &len) == LBER_SET) + { + if (ber_scanf(tmp_bere, "[v]", extra_referrals) == -1) + { + rc = -1; + goto free_and_return; + } + } + /* Get the optional CSN */ + if (ber_peek_tag(tmp_bere, &len) == LBER_OCTETSTRING) + { + if (ber_get_stringa(tmp_bere, csnstr) == -1) + { + rc = -1; + goto free_and_return; + } + } + if (ber_scanf(tmp_bere, "}") == -1) + { + rc = -1; + goto free_and_return; + } + +free_and_return: + if (-1 == rc) + { + /* Free everything when error encountered */ + + /* slapi_ch_free accepts NULL pointer */ + slapi_ch_free ((void**)protocol_oid); + slapi_ch_free ((void**)repl_root); + slapi_ch_free ((void **)extra_referrals); + slapi_ch_free ((void**)csnstr); + + if (*supplier_ruv) + { + ruv_destroy (supplier_ruv); + } + + } + if (NULL != tmp_bere) + { + ber_free(tmp_bere, 1); + tmp_bere = NULL; + } + + return rc; +} + + +/* + * Decode an NSDS50 End Replication Request extended + * operation. Returns 0 on success, -1 on decoding error. + * The caller is responsible for freeing repl_root. + */ +static int +decode_endrepl_extop(Slapi_PBlock *pb, char **repl_root) +{ + char *extop_oid = NULL; + struct berval *extop_value = NULL; + BerElement *tmp_bere = NULL; + int rc = 0; + + slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_OID, &extop_oid); + slapi_pblock_get(pb, SLAPI_EXT_OP_REQ_VALUE, &extop_value); + + if (NULL == extop_oid || + strcmp(extop_oid, REPL_END_NSDS50_REPLICATION_REQUEST_OID) != 0 || + NULL == extop_value) + { + /* bogus */ + rc = -1; + goto free_and_return; + } + + if ((tmp_bere = ber_init(extop_value)) == NULL) + { + rc = -1; + goto free_and_return; + } + if (ber_scanf(tmp_bere, "{") == -1) + { + rc = -1; + goto free_and_return; + } + /* Get the required root of replicated subtree */ + if (ber_get_stringa(tmp_bere, repl_root) == -1) + { + rc = -1; + goto free_and_return; + } + if (ber_scanf(tmp_bere, "}") == -1) + { + rc = -1; + goto free_and_return; + } + +free_and_return: + if (NULL != tmp_bere) + { + ber_free(tmp_bere, 1); + tmp_bere = NULL; + } + + return rc; +} + + + + +/* + * Decode an NSDS50ReplicationResponse extended response. + * The extended response just contains a sequence that contains: + * 1) An integer response code + * 2) An optional array of bervals representing the consumer + * replica's update vector + * Returns 0 on success, or -1 if the response could not be parsed. + */ +int +decode_repl_ext_response(struct berval *data, int *response_code, + struct berval ***ruv_bervals) +{ + BerElement *tmp_bere = NULL; + int return_value = 0; + + PR_ASSERT(NULL != response_code); + PR_ASSERT(NULL != ruv_bervals); + + if (NULL == data || NULL == response_code || NULL == ruv_bervals) + { + return_value = -1; + } + else + { + unsigned long len, tag = 0; + long temp_response_code = 0; + *ruv_bervals = NULL; + if ((tmp_bere = ber_init(data)) == NULL) + { + return_value = -1; + } + else if (ber_scanf(tmp_bere, "{e", &temp_response_code) == -1) + { + return_value = -1; + } + else if ((tag = ber_peek_tag(tmp_bere, &len)) == LBER_SEQUENCE) + { + if (ber_scanf(tmp_bere, "{V}}", ruv_bervals) == -1) + { + return_value = -1; + } + } else if (ber_scanf(tmp_bere, "}") == -1) + { + return_value = -1; + } + *response_code = (int)temp_response_code; + } + if (0 != return_value) + { + if (NULL != *ruv_bervals) + { + ber_bvecfree(*ruv_bervals); + } + } + if (NULL != tmp_bere) + { + ber_free(tmp_bere, 1); tmp_bere = NULL; + } + return return_value; +} + + +/* + * This plugin entry point is called whenever a + * StartNSDS50ReplicationRequest is received. + */ +int +multimaster_extop_StartNSDS50ReplicationRequest(Slapi_PBlock *pb) +{ + int return_value = SLAPI_PLUGIN_EXTENDED_NOT_HANDLED; + int response = 0; + int rc = 0; + BerElement *resp_bere = NULL; + struct berval *resp_bval = NULL; + char *protocol_oid = NULL; + char *repl_root = NULL; + Slapi_DN *repl_root_sdn = NULL; + char **referrals = NULL; + Object *replica_object = NULL; + Replica *replica = NULL; + void *conn; + consumer_connection_extension *connext = NULL; + CSN *mycsn = NULL; + char *replicacsnstr = NULL; + CSN *replicacsn = NULL; + int zero = 0; + int one = 1; + RUV *ruv = NULL; + struct berval **ruv_bervals = NULL; + CSNGen *gen = NULL; + Object *gen_obj = NULL; + Slapi_DN *bind_sdn = NULL; + char *bind_dn = NULL; + Object *ruv_object = NULL; + RUV *supplier_ruv = NULL; + int connid, opid; + PRBool isInc = PR_FALSE; /* true if incremental update */ + char *locking_purl = NULL; /* the supplier contacting us */ + char *current_purl = NULL; /* the supplier which already has exclusive access */ + char locking_session[24]; + + /* Decode the extended operation */ + if (decode_startrepl_extop(pb, &protocol_oid, &repl_root, &supplier_ruv, + &referrals, &replicacsnstr) == -1) + { + response = NSDS50_REPL_DECODING_ERROR; + goto send_response; + } + if (NULL == protocol_oid || NULL == repl_root || NULL == replicacsnstr) + { + response = NSDS50_REPL_DECODING_ERROR; + goto send_response; + } + + connid = 0; + slapi_pblock_get(pb, SLAPI_CONN_ID, &connid); + opid = 0; + slapi_pblock_get(pb, SLAPI_OPERATION_ID, &opid); + + /* + * Get a hold of the connection extension object and + * make sure it's there. + */ + slapi_pblock_get(pb, SLAPI_CONNECTION, &conn); + connext = (consumer_connection_extension *)repl_con_get_ext( + REPL_CON_EXT_CONN, conn); + if (NULL == connext) + { + /* Something bad happened. Don't go any further */ + response = NSDS50_REPL_INTERNAL_ERROR; + goto send_response; + } + + /* Verify that we know about this replication protocol OID */ + if (strcmp(protocol_oid, REPL_NSDS50_INCREMENTAL_PROTOCOL_OID) == 0) + { + /* Stash info that this is an incremental update session */ + connext->repl_protocol_version = REPL_PROTOCOL_50_INCREMENTAL; + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "conn=%d op=%d repl=\"%s\": Begin incremental protocol\n", + connid, opid, repl_root); + isInc = PR_TRUE; + } + else if (strcmp(protocol_oid, REPL_NSDS50_TOTAL_PROTOCOL_OID) == 0) + { + /* Stash info that this is a total update session */ + if (NULL != connext) + { + connext->repl_protocol_version = REPL_PROTOCOL_50_TOTALUPDATE; + } + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "conn=%d op=%d repl=\"%s\": Begin total protocol\n", + connid, opid, repl_root); + isInc = PR_FALSE; + } + else + { + /* Unknown replication protocol */ + response = NSDS50_REPL_UNKNOWN_UPDATE_PROTOCOL; + goto send_response; + } + + /* Verify that repl_root names a valid replicated area */ + if ((repl_root_sdn = slapi_sdn_new_dn_byval(repl_root)) == NULL) + { + response = NSDS50_REPL_INTERNAL_ERROR; + goto send_response; + } + + /* see if this replica is being configured and wait for it */ + if (replica_is_being_configured(repl_root)) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "conn=%d op=%d replica=\"%s\": " + "Replica is being configured: try again later\n", + connid, opid, repl_root); + response = NSDS50_REPL_REPLICA_BUSY; + goto send_response; + } + + replica_object = replica_get_replica_from_dn(repl_root_sdn); + if (NULL != replica_object) + { + replica = object_get_data(replica_object); + } + if (NULL == replica) + { + response = NSDS50_REPL_NO_SUCH_REPLICA; + goto send_response; + } + + /* check that this replica is not a 4.0 consumer */ + if (replica_is_legacy_consumer (replica)) + { + response = NSDS50_REPL_LEGACY_CONSUMER; + goto send_response; + } + + /* Check that bind dn is authorized to supply replication updates */ + slapi_pblock_get(pb, SLAPI_CONN_DN, &bind_dn); /* bind_dn is allocated */ + bind_sdn = slapi_sdn_new_dn_passin(bind_dn); + if (replica_is_updatedn(replica, bind_sdn) == PR_FALSE) + { + response = NSDS50_REPL_PERMISSION_DENIED; + goto send_response; + } + + /* Check received CSN for clock skew */ + gen_obj = replica_get_csngen(replica); + if (NULL != gen_obj) + { + gen = object_get_data(gen_obj); + if (NULL != gen) + { + if (csngen_new_csn(gen, &mycsn, PR_FALSE /* notify */) == CSN_SUCCESS) + { + replicacsn = csn_new_by_string(replicacsnstr); + if (NULL != replicacsn) + { + /* ONREPL - we used to manage clock skew here. However, csn generator + code already does it. The csngen also manages local skew caused by + system clock reset, so to keep it consistent, I removed code from here */ + time_t diff = 0L; + diff = csn_time_difference(mycsn, replicacsn); + if (diff > 0) + { + /* update the state of the csn generator */ + rc = csngen_adjust_time (gen, replicacsn); + if (rc == CSN_LIMIT_EXCEEDED) /* too much skew */ + { + response = NSDS50_REPL_EXCESSIVE_CLOCK_SKEW; + goto send_response; + } + } + else if (diff <= 0) + { + /* Supplier's clock is behind ours */ + /* XXXggood check if CSN smaller than purge point */ + /* response = NSDS50_REPL_BELOW_PURGEPOINT; */ + /* goto send_response; */ + } + } + else + { + /* Oops, csnstr couldn't be converted */ + response = NSDS50_REPL_INTERNAL_ERROR; + goto send_response; + } + } + else + { + /* Oops, csn generator failed */ + response = NSDS50_REPL_INTERNAL_ERROR; + goto send_response; + } + + /* update csn generator's state from the supplier's ruv */ + rc = replica_update_csngen_state (replica, supplier_ruv); /* too much skew */ + if (rc != 0) + { + response = NSDS50_REPL_EXCESSIVE_CLOCK_SKEW; + goto send_response; + } + } + else + { + /* Oops, no csn generator */ + response = NSDS50_REPL_INTERNAL_ERROR; + goto send_response; + } + } + else + { + /* Oops, no csn generator object */ + response = NSDS50_REPL_INTERNAL_ERROR; + goto send_response; + } + + if (check_replica_id_uniqueness(replica, supplier_ruv) != 0){ + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "conn=%d op=%d repl=\"%s\": " + "Replica has same replicaID %d as supplier\n", + connid, opid, repl_root, replica_get_rid(replica)); + response = NSDS50_REPL_REPLICAID_ERROR; + goto send_response; + } + + /* Attempt to acquire exclusive access to the replicated area */ + /* Since partial URL is always the master, this locking_purl does not + * help us to know the true locker when it is a hub. Change to use + * the session's conn id and op id to identify the the supplier. + */ + /* junkrc = ruv_get_first_id_and_purl(supplier_ruv, &junkrid, &locking_purl); */ + sprintf(locking_session, "conn=%d id=%d", connid, opid); + locking_purl = &locking_session[0]; + if (replica_get_exclusive_access(replica, &isInc, connid, opid, + locking_purl, + ¤t_purl) == PR_FALSE) + { + locking_purl = NULL; /* no dangling pointers */ + response = NSDS50_REPL_REPLICA_BUSY; + goto send_response; + } + else + { + locking_purl = NULL; /* no dangling pointers */ + /* Stick the replica object pointer in the connection extension */ + connext->replica_acquired = (void *)replica_object; + replica_object = NULL; + } + + /* If this is incremental protocol get replica's ruv to return to the supplier */ + if (connext->repl_protocol_version == REPL_PROTOCOL_50_INCREMENTAL) + { + ruv_object = replica_get_ruv(replica); + if (NULL != ruv_object) + { + ruv = object_get_data(ruv_object); + (void)ruv_to_bervals(ruv, &ruv_bervals); + object_release(ruv_object); + } + } + + /* + * Save the supplier ruv in the connection extension so it can + * either (a) be installed upon successful initialization (if this + * is a total update session) or used to update referral information + * for new replicas that show up in the supplier's RUV. + */ + /* + * the supplier_ruv may have been set before, so free it here + * (in ruv_copy_and_destroy) + */ + ruv_copy_and_destroy(&supplier_ruv, (RUV **)&connext->supplier_ruv); + + if (connext->repl_protocol_version == REPL_PROTOCOL_50_INCREMENTAL) + { + /* The supplier ruv may have changed, so let's update the referrals */ + consumer5_set_mapping_tree_state_for_replica(replica, connext->supplier_ruv); + } + else /* full protocol */ + { + char *mtnstate = slapi_mtn_get_state(repl_root_sdn); + char **mtnreferral = slapi_mtn_get_referral(repl_root_sdn); + + /* richm 20010831 - set the mapping tree to the referral state *before* + we invoke slapi_start_bulk_import - see bug 556992 - + slapi_start_bulk_import sets the database offline, if an operation comes + in while the database is offline but the mapping tree is not referring yet, + the server gets confused + */ + /* During a total update we refer *all* operations */ + repl_set_mtn_state_and_referrals(repl_root_sdn, STATE_REFERRAL, + connext->supplier_ruv, NULL, referrals); + /* LPREPL - check the return code. + * But what do we do if mapping tree could not be updated ? */ + + /* start the bulk import */ + slapi_pblock_set (pb, SLAPI_TARGET_DN, repl_root); + rc = slapi_start_bulk_import (pb); + if (rc != LDAP_SUCCESS) + { + response = NSDS50_REPL_INTERNAL_ERROR; + /* reset the mapping tree state to what it was before + we tried to do the bulk import */ + repl_set_mtn_state_and_referrals(repl_root_sdn, mtnstate, + NULL, NULL, mtnreferral); + slapi_ch_free_string(&mtnstate); + charray_free(mtnreferral); + mtnreferral = NULL; + + goto send_response; + } + slapi_ch_free_string(&mtnstate); + charray_free(mtnreferral); + mtnreferral = NULL; + } + + response = NSDS50_REPL_REPLICA_READY; + /* Set the "is replication session" flag in the connection extension */ + slapi_pblock_set( pb, SLAPI_CONN_IS_REPLICATION_SESSION, &one ); + connext->isreplicationsession = 1; + /* Save away the connection */ + slapi_pblock_get(pb, SLAPI_CONNECTION, &connext->connection); + +send_response: + if (response != NSDS50_REPL_REPLICA_READY) + { + int resp_log_level = SLAPI_LOG_FATAL; + char purlstr[1024] = {0}; + if (current_purl) + sprintf(purlstr, " locked by %s for %s update", current_purl, + isInc ? "incremental" : "total"); + + /* Don't log replica busy as errors - these are almost always not + errors - use the replication monitoring tools to determine if + a replica is not converging, then look for pathological replica + busy errors by turning on the replication log level */ + if (response == NSDS50_REPL_REPLICA_BUSY) { + resp_log_level = SLAPI_LOG_REPL; + } + + slapi_log_error (resp_log_level, repl_plugin_name, + "conn=%d op=%d replica=\"%s\": " + "Unable to acquire replica: error: %s%s\n", + connid, opid, + (replica ? slapi_sdn_get_dn(replica_get_root(replica)) : "unknown"), + protocol_response2string (response), purlstr); + } + /* Send the response */ + if ((resp_bere = der_alloc()) == NULL) + { + /* ONREPL - not sure what we suppose to do here */ + } + ber_printf(resp_bere, "{e", response); + if (NULL != ruv_bervals) + { + ber_printf(resp_bere, "{V}", ruv_bervals); + } + ber_printf(resp_bere, "}"); + ber_flatten(resp_bere, &resp_bval); + slapi_pblock_set(pb, SLAPI_EXT_OP_RET_OID, REPL_NSDS50_REPLICATION_RESPONSE_OID); + slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "conn=%d op=%d repl=\"%s\": " + "StartNSDS50ReplicationRequest: response=%d rc=%d\n", + connid, opid, repl_root, + response, rc); + slapi_send_ldap_result(pb, LDAP_SUCCESS, NULL, NULL, 0, NULL); + + return_value = SLAPI_PLUGIN_EXTENDED_SENT_RESULT; + + slapi_ch_free_string(¤t_purl); + + /* protocol_oid */ + /* slapi_ch_free accepts NULL pointer */ + slapi_ch_free((void **)&protocol_oid); + + /* repl_root */ + slapi_ch_free((void **)&repl_root); + + /* supplier's ruv */ + if (supplier_ruv) + { + ruv_destroy (&supplier_ruv); + } + /* referrals */ + slapi_ch_free((void **)&referrals); + + /* replicacsnstr */ + slapi_ch_free((void **)&replicacsnstr); + + /* repl_root_sdn */ + if (NULL != repl_root_sdn) + { + slapi_sdn_free(&repl_root_sdn); + } + if (NSDS50_REPL_REPLICA_READY != response) + { + /* + * Something went wrong, and we never told the other end that the + * replica had been acquired, so we'd better release it. + */ + if (NULL != connext && NULL != connext->replica_acquired) + { + Object *r_obj = (Object*)connext->replica_acquired; + replica_relinquish_exclusive_access((Replica*)object_get_data (r_obj), + connid, opid); + } + /* Remove any flags that would indicate repl session in progress */ + if (NULL != connext) + { + connext->repl_protocol_version = REPL_PROTOCOL_UNKNOWN; + connext->isreplicationsession = 0; + } + slapi_pblock_set( pb, SLAPI_CONN_IS_REPLICATION_SESSION, &zero ); + } + /* Release reference to replica_object */ + if (NULL != replica_object) + { + object_release(replica_object); + } + /* bind_sdn */ + if (NULL != bind_sdn) + { + slapi_sdn_free(&bind_sdn); + } + /* Release reference to gen_obj */ + if (NULL != gen_obj) + { + object_release(gen_obj); + } + /* mycsn */ + if (NULL != mycsn) + { + csn_free(&mycsn); + } + /* replicacsn */ + if (NULL != replicacsn) + { + csn_free(&replicacsn); + } + /* resp_bere */ + if (NULL != resp_bere) + { + ber_free(resp_bere, 1); + } + /* resp_bval */ + if (NULL != resp_bval) + { + ber_bvfree(resp_bval); + } + /* ruv_bervals */ + if (NULL != ruv_bervals) + { + ber_bvecfree(ruv_bervals); + } + + return return_value; +} + +/* + * This plugin entry point is called whenever an + * EndNSDS50ReplicationRequest is received. + * XXXggood this code is not finished. + */ +int +multimaster_extop_EndNSDS50ReplicationRequest(Slapi_PBlock *pb) +{ + int return_value = SLAPI_PLUGIN_EXTENDED_NOT_HANDLED; + char *repl_root = NULL; + BerElement *resp_bere = NULL; + struct berval *resp_bval = NULL; + int response; + void *conn; + consumer_connection_extension *connext = NULL; + int rc; + int connid=-1, opid=-1; + + /* Decode the extended operation */ + if (decode_endrepl_extop(pb, &repl_root) == -1) + { + response = NSDS50_REPL_DECODING_ERROR; + } + else + { + + /* First, verify that the current connection is a replication session */ + /* XXXggood - do we need to wait around for any pending updates to complete? + I suppose it's possible that the end request may arrive asynchronously, before + we're really done processing all the updates. + */ + /* Get a hold of the connection extension object */ + slapi_pblock_get(pb, SLAPI_CONNECTION, &conn); + connext = (consumer_connection_extension *)repl_con_get_ext( + REPL_CON_EXT_CONN, conn); + if (NULL != connext && NULL != connext->replica_acquired) + { + int zero= 0; + Replica *r = (Replica*)object_get_data ((Object*)connext->replica_acquired); + + /* if this is total protocol we need to install suppliers ruv for the replica */ + if (connext->repl_protocol_version == REPL_PROTOCOL_50_TOTALUPDATE) + { + /* We no longer need to refer all operations... + * and update the referrals on the mapping tree node + */ + consumer5_set_mapping_tree_state_for_replica(r, NULL); + + /* LPREPL - First we clear the total in progress flag + Like this we know it's a normal termination of import. This is required by + the replication function that responds to backend state change. + If the flag is not clear, the callback knows that replication should not be + enabled again */ + replica_set_state_flag(r, REPLICA_TOTAL_IN_PROGRESS, PR_TRUE /* clear flag */); + + slapi_pblock_set (pb, SLAPI_TARGET_DN, repl_root); + slapi_stop_bulk_import (pb); + + /* ONREPL - this is a bit of a hack. Once bulk import is finished, + the replication function that responds to backend state change + will be called. That function normally do all ruv and changelog + processing. However, in the case of replica initalization, it + will not do the right thing because supplier does not send its + ruv tombstone to the consumer. So that's why we need to do the + second processing here. + The supplier does not send its RUV entry because it could be + more up to date then the data send to the consumer. + The best solution I think, would be to "fake" on the supplier + an entry that corresponds to the ruv sent to the consumer and then + send it as part of the data */ + + if (cl5GetState () == CL5_STATE_OPEN) + { + rc = cl5DeleteDBSync (connext->replica_acquired); + } + + replica_set_ruv (r, connext->supplier_ruv); + connext->supplier_ruv = NULL; + + /* if changelog is enabled, we need to log a dummy change for the + smallest csn in the new ruv, so that this replica ca supply + other servers. + */ + if (cl5GetState () == CL5_STATE_OPEN) + { + replica_log_ruv_elements (r); + } + + /* ONREPL code that dealt with new RUV, etc was moved into the code + that enables replication when a backend comes back online. This + code is called once the bulk import is finished */ + } + else if (connext->repl_protocol_version == REPL_PROTOCOL_50_INCREMENTAL) + { + /* The ruv from the supplier may have changed. Report the change on the + consumer side */ + + replica_update_ruv_consumer(r, connext->supplier_ruv); + } + + /* Relinquish control of the replica */ + slapi_pblock_get (pb, SLAPI_OPERATION_ID, &opid); + if (opid) slapi_pblock_get (pb, SLAPI_CONN_ID, &connid); + replica_relinquish_exclusive_access(r, connid, opid); + object_release ((Object*)connext->replica_acquired); + connext->replica_acquired = NULL; + connext->isreplicationsession= 0; + slapi_pblock_set( pb, SLAPI_CONN_IS_REPLICATION_SESSION, &zero ); + response = NSDS50_REPL_REPLICA_RELEASE_SUCCEEDED; + /* Outbound replication agreements need to all be restarted now */ + /* XXXGGOOD RESTART REEPL AGREEMENTS */ + } + } + + /* Send the response code */ + if ((resp_bere = der_alloc()) == NULL) + { + rc = LDAP_ENCODING_ERROR; + goto free_and_return; + } + ber_printf(resp_bere, "{e}", response); + ber_flatten(resp_bere, &resp_bval); + slapi_pblock_set(pb, SLAPI_EXT_OP_RET_OID, REPL_NSDS50_REPLICATION_RESPONSE_OID); + slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval); + slapi_send_ldap_result(pb, LDAP_SUCCESS, NULL, NULL, 0, NULL); + + return_value = SLAPI_PLUGIN_EXTENDED_SENT_RESULT; + +free_and_return: + /* repl_root */ + slapi_ch_free((void **)&repl_root); + + /* BerElement */ + if (NULL != resp_bere) + { + ber_free(resp_bere, 1); + } + /* response */ + if (NULL != resp_bval) + { + ber_bvfree(resp_bval); + } + + return return_value; +} + +/* + * This plugin entry point is a noop entry + * point. It's used when registering extops that + * are only used as responses. We'll never receive + * one of those, unsolicited, but we still want to + * register them so they appear in the + * supportedextension attribute in the root DSE. + */ +int +extop_noop(Slapi_PBlock *pb) +{ + return SLAPI_PLUGIN_EXTENDED_NOT_HANDLED; +} + + +static int +check_replica_id_uniqueness(Replica *replica, RUV *supplier_ruv) +{ + ReplicaId local_rid = replica_get_rid(replica); + ReplicaId sup_rid = 0; + char *sup_purl = NULL; + + if (ruv_get_first_id_and_purl(supplier_ruv, &sup_rid, &sup_purl) == RUV_SUCCESS) { + /* ReplicaID Uniqueness is checked only on Masters */ + if ((replica_get_type(replica) == REPLICA_TYPE_UPDATABLE) && + (sup_rid == local_rid)) { + return 1; + } + } + return 0; +} + + + |