From 0e365fd2ba59f54d46b94eb84a12d9c25b2bb560 Mon Sep 17 00:00:00 2001 From: Nathan Kinder Date: Fri, 25 Jan 2008 00:59:00 +0000 Subject: Resolves: 429793 Summary: Fixed crash in replication during bulk import. Use bulk impport code more consistently. --- ldap/servers/plugins/replication/repl5_total.c | 11 +++-- ldap/servers/slapd/add.c | 26 +++++------- ldap/servers/slapd/back-ldbm/import-threads.c | 56 ++++++++++---------------- ldap/servers/slapd/bulk_import.c | 7 ++-- 4 files changed, 41 insertions(+), 59 deletions(-) diff --git a/ldap/servers/plugins/replication/repl5_total.c b/ldap/servers/plugins/replication/repl5_total.c index 80186fd5..0a858dbf 100644 --- a/ldap/servers/plugins/replication/repl5_total.c +++ b/ldap/servers/plugins/replication/repl5_total.c @@ -872,12 +872,11 @@ multimaster_extop_NSDS50ReplicationEntry(Slapi_PBlock *pb) #endif rc = slapi_import_entry (pb, e); - /* slapi_import_entry return an LDAP error in case of problem - * LDAP_BUSY is used to indicate that the import queue is full - * and that flow control must happen to stop the supplier - * from sending entries + /* slapi_import_entry returns an LDAP error in case of a + * problem. If there's a problem, it's our responsibility + * to free the slapi_entry that we're trying to import. */ - if ((rc != LDAP_SUCCESS) && (rc != LDAP_BUSY)) + if (rc != LDAP_SUCCESS) { const char *dn = slapi_entry_get_dn_const(e); slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, @@ -896,7 +895,7 @@ multimaster_extop_NSDS50ReplicationEntry(Slapi_PBlock *pb) rc, connid, opid); } - if ((rc != 0) && (rc != LDAP_BUSY)) + if (rc != 0) { /* just disconnect from the supplier. bulk import is stopped when connection object is destroyed */ diff --git a/ldap/servers/slapd/add.c b/ldap/servers/slapd/add.c index b500e34c..0e5c87cb 100644 --- a/ldap/servers/slapd/add.c +++ b/ldap/servers/slapd/add.c @@ -772,25 +772,19 @@ static void handle_fast_add(Slapi_PBlock *pb, Slapi_Entry *entry) slapi_pblock_set(pb, SLAPI_BULK_IMPORT_STATE, &ret); ret = (*be->be_wire_import)(pb); if (ret != 0) { - if (ret != LDAP_BUSY) { - LDAPDebug(LDAP_DEBUG_ANY, - "wire import: error during import (%d)\n", - ret, 0, 0); - } else { - LDAPDebug(LDAP_DEBUG_TRACE, - "wire import: asking client to wait before resuming (returning LDAP_BUSY)\n", - 0, 0, 0); - } - send_ldap_result(pb, - LDAP_BUSY == ret ? LDAP_BUSY : LDAP_OPERATIONS_ERROR, + LDAPDebug(LDAP_DEBUG_ANY, + "wire import: error during import (%d)\n", + ret, 0, 0); + send_ldap_result(pb, LDAP_OPERATIONS_ERROR, NULL, NULL, 0, NULL); + /* It's our responsibility to free the entry if + * be_wire_import doesn't succeed. */ slapi_entry_free(entry); - if (LDAP_BUSY != ret) { - /* turn off fast replica init -- import is now aborted */ - pb->pb_conn->c_bi_backend = NULL; - pb->pb_conn->c_flags &= ~CONN_FLAG_IMPORT; - } + /* turn off fast replica init -- import is now aborted */ + pb->pb_conn->c_bi_backend = NULL; + pb->pb_conn->c_flags &= ~CONN_FLAG_IMPORT; + return; } diff --git a/ldap/servers/slapd/back-ldbm/import-threads.c b/ldap/servers/slapd/back-ldbm/import-threads.c index add5170a..afbc178d 100644 --- a/ldap/servers/slapd/back-ldbm/import-threads.c +++ b/ldap/servers/slapd/back-ldbm/import-threads.c @@ -1517,15 +1517,11 @@ fail: /* returns 0 on success, or < 0 on error * * on error, the import process is aborted -- so if this returns an error, - * don't try to queue any more entries or you'll be sorry. - * - * flag_block in used to know if this thread should block when - * the fifo is full or return an error LDAP_BUSY - * Typically, import done on from the GUI or the command line will - * block while online import as used by the replication total update - * will not block + * don't try to queue any more entries or you'll be sorry. The caller + * is also responsible for free'ing the passed in entry on error. The + * entry will be consumed on success. */ -static int bulk_import_queue(ImportJob *job, Slapi_Entry *entry, int flag_block) +static int bulk_import_queue(ImportJob *job, Slapi_Entry *entry) { struct backentry *ep = NULL, *old_ep = NULL; int idx; @@ -1569,18 +1565,7 @@ static int bulk_import_queue(ImportJob *job, Slapi_Entry *entry, int flag_block) if (old_ep) { while ((old_ep->ep_refcnt > 0) && !(job->flags & FLAG_ABORT)) { - if (flag_block) - DS_Sleep(PR_MillisecondsToInterval(import_sleep_time)); - else - { - /* DBBD: Argh -- why not just block, what's the benefit to this ?? */ - /* I think that to support pipelining in the transport, we need to block here, */ - /* Otherwise evil things could happen where we say we're busy for operation N, but */ - /* Not for operation N+1, but the sender doesn't find out about this until after sending */ - /* Operation N+2 etc. Seems possible to end up with children processed before parents which won't work. */ - PR_Unlock(job->wire_lock); - return LDAP_BUSY; - } + DS_Sleep(PR_MillisecondsToInterval(import_sleep_time)); } /* the producer could be running thru the fifo while @@ -1589,16 +1574,12 @@ static int bulk_import_queue(ImportJob *job, Slapi_Entry *entry, int flag_block) */ while ((old_ep->ep_id >= job->ready_ID) && !(job->flags & FLAG_ABORT)) { - if (flag_block) - DS_Sleep(PR_MillisecondsToInterval(import_sleep_time)); - else - { - PR_Unlock(job->wire_lock); - return LDAP_BUSY; - } + DS_Sleep(PR_MillisecondsToInterval(import_sleep_time)); } if (job->flags & FLAG_ABORT) { + backentry_clear_entry(ep); /* entry is released in the frontend on failure*/ + backentry_free( &ep ); /* release the backend wrapper, here */ PR_Unlock(job->wire_lock); return -2; } @@ -1618,8 +1599,9 @@ static int bulk_import_queue(ImportJob *job, Slapi_Entry *entry, int flag_block) import_log_notice(job, "WARNING: skipping entry \"%s\"", escape_string(slapi_entry_get_dn(ep->ep_entry), ebuf)); import_log_notice(job, "REASON: entry too large (%d bytes) for " - "the buffer size (%d bytes)", newesize, job->fifo.bsize); - backentry_free(&ep); + "the import buffer size (%d bytes). Try increasing nsslapd-cachememsize.", newesize, job->fifo.bsize); + backentry_clear_entry(ep); /* entry is released in the frontend on failure*/ + backentry_free( &ep ); /* release the backend wrapper, here */ PR_Unlock(job->wire_lock); return -1; } @@ -1677,7 +1659,13 @@ void factory_destructor(void *extension, void *object, void *parent) return; } -/* plugin entry function for replica init */ +/* plugin entry function for replica init + * + * For the SLAPI_BI_STATE_ADD state: + * On success (rc=0), the entry in pb->pb_import_entry will be + * consumed. For any other return value, the caller is + * responsible for freeing the entry in the pb. + */ int ldbm_back_wire_import(Slapi_PBlock *pb) { struct ldbminfo *li; @@ -1710,12 +1698,12 @@ int ldbm_back_wire_import(Slapi_PBlock *pb) if (! import_entry_belongs_here(pb->pb_import_entry, job->inst->inst_be)) { /* silently skip */ + /* We need to consume pb->pb_import_entry on success, so we free it here. */ + slapi_entry_free(pb->pb_import_entry); return 0; } - /* These days, we don't want to return LDAP_BUSY (it makes pipelineing impossible - and actually doesn't achieve anything anyway). So we pass '1' for the block flag. */ - return bulk_import_queue(job, pb->pb_import_entry, - 1); + + return bulk_import_queue(job, pb->pb_import_entry); } thread = job->main_thread; diff --git a/ldap/servers/slapd/bulk_import.c b/ldap/servers/slapd/bulk_import.c index 5e158d36..717ff477 100644 --- a/ldap/servers/slapd/bulk_import.c +++ b/ldap/servers/slapd/bulk_import.c @@ -176,10 +176,11 @@ process_bulk_import_op (Slapi_PBlock *pb, int state, Slapi_Entry *e) rc = be->be_wire_import (pb); if (rc != 0) { - if (rc != LDAP_BUSY) - slapi_log_error(SLAPI_LOG_FATAL, NULL, "slapi_start_bulk_import: " + /* The caller will free the entry (e), so we just + * leave it alone here. */ + slapi_log_error(SLAPI_LOG_FATAL, NULL, "slapi_start_bulk_import: " "failed; error = %d\n", rc); - return (LDAP_BUSY == rc ? LDAP_BUSY : LDAP_OPERATIONS_ERROR); + return LDAP_OPERATIONS_ERROR; } return LDAP_SUCCESS; -- cgit