summaryrefslogtreecommitdiffstats
path: root/ldap/servers/slapd/back-ldbm/import.c
diff options
context:
space:
mode:
Diffstat (limited to 'ldap/servers/slapd/back-ldbm/import.c')
-rw-r--r--  ldap/servers/slapd/back-ldbm/import.c | 331
1 files changed, 216 insertions, 115 deletions
diff --git a/ldap/servers/slapd/back-ldbm/import.c b/ldap/servers/slapd/back-ldbm/import.c
index df0fa5d5..8b66705f 100644
--- a/ldap/servers/slapd/back-ldbm/import.c
+++ b/ldap/servers/slapd/back-ldbm/import.c
@@ -51,6 +51,7 @@
#include "import.h"
#define ERR_IMPORT_ABORTED -23
+#define DRYRUN_QUIT -24
/********** routines to manipulate the entry fifo **********/
@@ -193,8 +194,16 @@ void import_log_notice(ImportJob *job, char *format, ...)
slapi_task_log_notice(job->task, "%s", buffer);
}
/* also save it in the logs for posterity */
- LDAPDebug(LDAP_DEBUG_ANY, "import %s: %s\n", job->inst->inst_name,
+ if (job->flags & FLAG_UPGRADEDNFORMAT) {
+ LDAPDebug(LDAP_DEBUG_ANY, "upgradedn %s: %s\n", job->inst->inst_name,
buffer, 0);
+ } else if (job->flags & FLAG_REINDEXING) {
+ LDAPDebug(LDAP_DEBUG_ANY, "reindex %s: %s\n", job->inst->inst_name,
+ buffer, 0);
+ } else {
+ LDAPDebug(LDAP_DEBUG_ANY, "import %s: %s\n", job->inst->inst_name,
+ buffer, 0);
+ }
}
static void import_task_destroy(Slapi_Task *task)
@@ -245,6 +254,22 @@ static int import_attr_callback(void *node, void *param)
ImportJob *job = (ImportJob *)param;
struct attrinfo *a = (struct attrinfo *)node;
+ if (job->flags & FLAG_DRYRUN) { /* dryrun; we don't need the workers */
+ return 0;
+ }
+ if (job->flags & FLAG_UPGRADEDNFORMAT) {
+ /* Bring up import workers just for indexes having DN syntax
+ * attribute type. (except entrydn -- taken care of below) */
+ int rc = 0;
+ Slapi_Attr attr = {0};
+ slapi_attr_init(&attr, a->ai_type);
+ rc = slapi_attr_is_dn_syntax_attr(&attr);
+ attr_done(&attr);
+ if (0 == rc) {
+ return 0;
+ }
+ }
+
/* OK, so we now have hold of the attribute structure and the job info,
* let's see what we have. Remember that although this function is called
* many times, all these calls are in the context of a single thread, so we
@@ -255,28 +280,28 @@ static int import_attr_callback(void *node, void *param)
* we build those in the foreman thread.
*/
if (IS_INDEXED(a->ai_indexmask) &&
- (strcasecmp(a->ai_type, "entrydn") != 0) &&
- (strcasecmp(a->ai_type, "parentid") != 0) &&
- (strcasecmp(a->ai_type, "ancestorid") != 0) &&
- (strcasecmp(a->ai_type, numsubordinates) != 0)) {
- /* Make an import_index_info structure, fill it in and insert into the
- * job's list */
- IndexInfo *info = CALLOC(IndexInfo);
+ (strcasecmp(a->ai_type, "entrydn") != 0) &&
+ (strcasecmp(a->ai_type, "parentid") != 0) &&
+ (strcasecmp(a->ai_type, "ancestorid") != 0) &&
+ (strcasecmp(a->ai_type, numsubordinates) != 0)) {
+ /* Make an import_index_info structure, fill it in and insert into the
+ * job's list */
+ IndexInfo *info = CALLOC(IndexInfo);
- if (NULL == info) {
- /* Memory allocation error */
- return -1;
- }
- info->name = slapi_ch_strdup(a->ai_type);
- info->ai = a;
- if (NULL == info->name) {
- /* Memory allocation error */
- FREE(info);
- return -1;
- }
- info->next = job->index_list;
- job->index_list = info;
- job->number_indexers++;
+ if (NULL == info) {
+ /* Memory allocation error */
+ return -1;
+ }
+ info->name = slapi_ch_strdup(a->ai_type);
+ info->ai = a;
+ if (NULL == info->name) {
+ /* Memory allocation error */
+ FREE(info);
+ return -1;
+ }
+ info->next = job->index_list;
+ job->index_list = info;
+ job->number_indexers++;
}
return 0;
}
@@ -401,12 +426,12 @@ static int import_start_threads(ImportJob *job)
import_init_worker_info(foreman, job);
foreman->work_type = FOREMAN;
if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_foreman, foreman,
- PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
- PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
+ PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
+ PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
PRErrorCode prerr = PR_GetError();
LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import foreman thread, "
- SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
- prerr, slapd_pr_strerror(prerr), 0);
+ SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
+ prerr, slapd_pr_strerror(prerr), 0);
FREE(foreman);
goto error;
}
@@ -658,7 +683,7 @@ static int import_monitor_threads(ImportJob *job, int *status)
time_t time_now = 0;
time_t last_time = 0;
time_t time_interval = 0;
-
+ int rc = 0;
for (current_worker = job->worker_list; current_worker != NULL;
current_worker = current_worker->next) {
@@ -721,7 +746,10 @@ static int import_monitor_threads(ImportJob *job, int *status)
import_calc_rate(current_worker, time_interval);
import_print_worker_status(current_worker);
}
- if (current_worker->state != FINISHED) {
+ if (current_worker->state == QUIT) {
+ rc = DRYRUN_QUIT; /* Set the RC; Don't abort now;
+ We have to stop other threads */
+ } else if (current_worker->state != FINISHED) {
finished = 0;
}
if (current_worker->state == ABORTED) {
@@ -764,8 +792,10 @@ static int import_monitor_threads(ImportJob *job, int *status)
/* if the producer is finished, and the foreman has caught up... */
if (producer) {
- producer_done = (producer->state == FINISHED);
+ producer_done = (producer->state == FINISHED) ||
+ (producer->state == QUIT);
} else {
+ /* set in ldbm_back_wire_import */
producer_done = (job->flags & FLAG_PRODUCER_DONE);
}
if (producer_done && (job->lead_ID == job->ready_ID)) {
@@ -801,6 +831,7 @@ static int import_monitor_threads(ImportJob *job, int *status)
for (current_worker = job->worker_list; current_worker != NULL; ) {
if ((current_worker->state != FINISHED) &&
(current_worker->state != ABORTED) &&
+ (current_worker->state != QUIT) &&
(current_worker->work_type != PRODUCER)) {
DS_Sleep(tenthsecond); /* Only sleep if we hit a thread that is still not done */
continue;
@@ -813,7 +844,7 @@ static int import_monitor_threads(ImportJob *job, int *status)
/* If we're here and giveup is true, and the primary hadn't finished
* processing the input files, we need to return IMPORT_INCOMPLETE_PASS */
if (giveup && (job->input_filenames || (job->flags & FLAG_ONLINE) ||
- (job->flags & FLAG_REINDEXING /* support multi-pass */))) {
+ (job->flags & FLAG_REINDEXING /* support multi-pass */))) {
if (producer_done && (job->ready_ID == job->lead_ID)) {
/* foreman caught up with the producer, and the producer is
* done.
@@ -825,7 +856,7 @@ static int import_monitor_threads(ImportJob *job, int *status)
} else {
*status = IMPORT_COMPLETE_PASS;
}
- return 0;
+ return rc;
error_abort:
return ERR_IMPORT_ABORTED;
@@ -842,16 +873,16 @@ static int import_run_pass(ImportJob *job, int *status)
ret = import_start_threads(job);
if (ret != 0) {
import_log_notice(job, "Starting threads failed: %d\n", ret);
- goto error;
+ goto error;
}
/* Monitor the threads until we're done or fail */
ret = import_monitor_threads(job, status);
- if (ret == ERR_IMPORT_ABORTED) {
+ if ((ret == ERR_IMPORT_ABORTED) || (ret == DRYRUN_QUIT)) {
goto error;
} else if (ret != 0) {
import_log_notice(job, "Thread monitoring aborted: %d\n", ret);
- goto error;
+ goto error;
}
error:
@@ -875,20 +906,18 @@ static void import_set_abort_flag_all(ImportJob *job, int wait_for_them)
/* allow all the aborts to be processed */
DS_Sleep(PR_MillisecondsToInterval(3000));
- if (wait_for_them) {
+ if (wait_for_them) {
/* Having done that, wait for them to say that they've stopped */
for (worker = job->worker_list; worker != NULL; ) {
DS_Sleep(PR_MillisecondsToInterval(100));
- if ((worker->state != FINISHED) &&
- (worker->state != ABORTED)){
+ if ((worker->state != FINISHED) && (worker->state != ABORTED) &&
+ (worker->state != QUIT)) {
continue;
- }
- else{
+ } else {
worker = worker->next;
- }
+ }
}
}
-
}
@@ -907,11 +936,12 @@ void import_abort_all(ImportJob *job, int wait_for_them)
/* Having done that, wait for them to say that they've stopped */
for (worker = job->worker_list; worker != NULL; ) {
DS_Sleep(PR_MillisecondsToInterval(100));
- if ((worker->state != FINISHED) &&
- (worker->state != ABORTED))
+ if ((worker->state != FINISHED) && (worker->state != ABORTED) &&
+ (worker->state != QUIT)) {
continue;
- else
+ } else {
worker = worker->next;
+ }
}
}
}
@@ -927,11 +957,10 @@ int import_make_merge_filenames(char *directory, char *indexname, int pass,
*oldname = slapi_ch_smprintf("%s/%s%s", directory, indexname, LDBM_FILENAME_SUFFIX);
*newname = slapi_ch_smprintf("%s/%s.%d%s", directory, indexname, pass,
LDBM_FILENAME_SUFFIX);
- if (!*oldname || !*newname) {
- slapi_ch_free_string(oldname);
- slapi_ch_free_string(newname);
- return -1;
- }
+ if (!*oldname || !*newname) { slapi_ch_free_string(oldname);
+ slapi_ch_free_string(newname);
+ return -1;
+ }
return 0;
}
@@ -1018,8 +1047,8 @@ static int import_all_done(ImportJob *job, int ret)
ldbm_instance *inst = job->inst;
/* Writing this file indicates to future server startups that
- * the db is OK */
- if (ret == 0) {
+ * the db is OK unless it's in the dry run mode. */
+ if ((ret == 0) && !(job->flags & FLAG_DRYRUN)) {
char inst_dir[MAXPATHLEN*2];
char *inst_dirp = NULL;
inst_dirp = dblayer_get_full_inst_dir(inst->inst_li, inst,
@@ -1077,10 +1106,20 @@ int import_main_offline(void *arg)
int verbose = 1;
int aborted = 0;
ImportWorkerInfo *producer = NULL;
+ char *opstr = "Import";
if (job->task)
slapi_task_inc_refcount(job->task);
+ if (job->flags & FLAG_UPGRADEDNFORMAT) {
+ if (job->flags & FLAG_DRYRUN) {
+ opstr = "Upgrade Dn Dryrun";
+ } else {
+ opstr = "Upgrade Dn";
+ }
+ } else if (job->flags & FLAG_REINDEXING) {
+ opstr = "Reindexing";
+ }
PR_ASSERT(inst != NULL);
time(&beginning);
@@ -1119,7 +1158,20 @@ int import_main_offline(void *arg)
/* start the producer */
import_init_worker_info(producer, job);
producer->work_type = PRODUCER;
- if (job->flags & FLAG_REINDEXING)
+ if (job->flags & FLAG_UPGRADEDNFORMAT)
+ {
+ if (! CREATE_THREAD(PR_USER_THREAD, (VFP)upgradedn_producer,
+ producer, PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
+ PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) {
+ PRErrorCode prerr = PR_GetError();
+ LDAPDebug(LDAP_DEBUG_ANY,
+ "unable to spawn upgrade dn producer thread, "
+ SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n",
+ prerr, slapd_pr_strerror(prerr), 0);
+ goto error;
+ }
+ }
+ else if (job->flags & FLAG_REINDEXING)
{
if (! CREATE_THREAD(PR_USER_THREAD, (VFP)index_producer, producer,
PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD,
@@ -1188,15 +1240,18 @@ int import_main_offline(void *arg)
if (ret == ERR_IMPORT_ABORTED) {
/* at least one of the threads has aborted -- shut down ALL
* of the threads */
- import_log_notice(job, "Aborting all import threads...");
+ import_log_notice(job, "Aborting all %s threads...", opstr);
/* this abort sets the abort flag on the threads and will block for
* the exit of all threads
*/
import_set_abort_flag_all(job, 1);
- import_log_notice(job, "Import threads aborted.");
+ import_log_notice(job, "%s threads aborted.", opstr);
aborted = 1;
goto error;
}
+ if (ret == DRYRUN_QUIT) {
+ goto error; /* Found the candidate; close the db files and quit */
+ }
if (0 != ret) {
/* Some horrible fate has befallen the import */
@@ -1293,7 +1348,8 @@ int import_main_offline(void *arg)
* Database. */
error:
- /* If we fail, the database is now in a mess, so we delete it */
+ /* If we fail, the database is now in a mess, so we delete it
+ except in dry run mode */
import_log_notice(job, "Closing files...");
cache_clear(&job->inst->inst_cache);
if (aborted) {
@@ -1307,7 +1363,10 @@ error:
}
}
if (0 != ret) {
- dblayer_delete_instance_dir(be);
+ if (!(job->flags & FLAG_DRYRUN)) { /* If not dryrun */
+ /* if running in the dry run mode, don't touch the db */
+ dblayer_delete_instance_dir(be);
+ }
dblayer_instance_close(job->inst->inst_be);
} else {
if (0 != (ret = dblayer_instance_close(job->inst->inst_be)) ) {
@@ -1321,52 +1380,82 @@ error:
if (verbose && (0 == ret)) {
int seconds_to_import = end - beginning;
size_t entries_processed = job->lead_ID - (job->starting_ID - 1);
- double entries_per_second = (double) entries_processed /
- (double) seconds_to_import;
-
- if (job->not_here_skipped)
- {
- if (job->skipped)
- import_log_notice(job, "Import complete. Processed %lu entries "
- "(%d bad entries were skipped, "
- "%d entries were skipped because they don't "
- "belong to this database) in %d seconds. "
- "(%.2f entries/sec)", entries_processed,
- job->skipped, job->not_here_skipped,
- seconds_to_import, entries_per_second);
- else
- import_log_notice(job, "Import complete. Processed %lu entries "
- "(%d entries were skipped because they don't "
- "belong to this database) "
- "in %d seconds. (%.2f entries/sec)",
- entries_processed, job->not_here_skipped,
- seconds_to_import, entries_per_second);
- }
- else
- {
- if (job->skipped)
- import_log_notice(job, "Import complete. Processed %lu entries "
- "(%d were skipped) in %d seconds. "
- "(%.2f entries/sec)", entries_processed,
- job->skipped, seconds_to_import,
- entries_per_second);
- else
- import_log_notice(job, "Import complete. Processed %lu entries "
- "in %d seconds. (%.2f entries/sec)",
- entries_processed, seconds_to_import,
- entries_per_second);
+ double entries_per_second =
+ seconds_to_import ?
+ (double)entries_processed / (double)seconds_to_import : 0;
+
+ if (job->not_here_skipped) {
+ if (job->skipped) {
+ import_log_notice(job,
+ "%s complete. Processed %lu entries "
+ "(%d bad entries were skipped, "
+ "%d entries were skipped because they don't "
+ "belong to this database) in %d seconds. "
+ "(%.2f entries/sec)",
+ opstr, entries_processed,
+ job->skipped, job->not_here_skipped,
+ seconds_to_import, entries_per_second);
+ } else {
+ import_log_notice(job,
+ "%s complete. Processed %lu entries "
+ "(%d entries were skipped because they don't "
+ "belong to this database) "
+ "in %d seconds. (%.2f entries/sec)",
+ opstr, entries_processed,
+ job->not_here_skipped, seconds_to_import,
+ entries_per_second);
+ }
+ } else {
+ if (job->skipped) {
+ import_log_notice(job,
+ "%s complete. Processed %lu entries "
+ "(%d were skipped) in %d seconds. "
+ "(%.2f entries/sec)",
+ opstr, entries_processed,
+ job->skipped, seconds_to_import,
+ entries_per_second);
+ } else {
+ import_log_notice(job,
+ "%s complete. Processed %lu entries "
+ "in %d seconds. (%.2f entries/sec)",
+ opstr, entries_processed,
+ seconds_to_import, entries_per_second);
+ }
}
}
- if (0 != ret) {
- import_log_notice(job, "Import failed.");
+ if (job->flags & FLAG_DRYRUN) {
+ if (0 == ret) {
+ import_log_notice(job, "%s complete. %s is up-to-date.",
+ opstr, job->inst->inst_name);
+ ret = 1;
+ if (job->task) {
+ slapi_task_dec_refcount(job->task);
+ }
+ import_all_done(job, ret);
+ } else if (DRYRUN_QUIT == ret) {
+ import_log_notice(job, "%s complete. %s needs upgradednformat.",
+ opstr, job->inst->inst_name);
+ if (job->task) {
+ slapi_task_dec_refcount(job->task);
+ }
+ import_all_done(job, ret);
+ ret = 0;
+ } else {
+ ret = -1;
+ if (job->task != NULL) {
+ slapi_task_finish(job->task, ret);
+ }
+ }
+ } else if (0 != ret) {
+ import_log_notice(job, "%s failed.", opstr);
if (job->task != NULL) {
slapi_task_finish(job->task, ret);
}
} else {
- if (job->task)
+ if (job->task) {
slapi_task_dec_refcount(job->task);
-
+ }
import_all_done(job, ret);
}
@@ -1376,7 +1465,6 @@ error:
import_free_job(job);
if (producer)
FREE(producer);
-
return(ret);
}
@@ -1399,6 +1487,7 @@ int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb)
char **name_array = NULL;
int total_files, i;
PRThread *thread = NULL;
+ int ud_flags = 0;
job = CALLOC(ImportJob);
if (job == NULL) {
@@ -1412,6 +1501,7 @@ int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb)
job->inst = (ldbm_instance *)be->be_instance_info;
slapi_pblock_get( pb, SLAPI_LDIF2DB_NOATTRINDEXES, &noattrindexes );
slapi_pblock_get( pb, SLAPI_LDIF2DB_FILE, &name_array );
+ slapi_pblock_get( pb, SLAPI_SEQ_TYPE, &ud_flags ); /* For upgrade dn */
/* the removedupvals field is blatantly overloaded here to mean
* the chunk size too. (chunk size = number of entries that should
@@ -1437,12 +1527,22 @@ int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb)
}
job->flags = FLAG_USE_FILES;
- if (NULL == name_array) /* no ldif file is given -> reindexing */
- job->flags |= FLAG_REINDEXING;
- if (!noattrindexes)
- job->flags |= FLAG_INDEX_ATTRS;
- for (i = 0; name_array && name_array[i] != NULL; i++)
+ if (NULL == name_array) { /* no ldif file is given -> reindexing */
+ if (ud_flags & SLAPI_UPGRADEDNFORMAT) {
+ job->flags |= FLAG_UPGRADEDNFORMAT;
+ if (ud_flags & SLAPI_DRYRUN) {
+ job->flags |= FLAG_DRYRUN;
+ }
+ } else {
+ job->flags |= FLAG_REINDEXING;
+ }
+ }
+ if (!noattrindexes) {
+ job->flags |= FLAG_INDEX_ATTRS;
+ }
+ for (i = 0; name_array && name_array[i] != NULL; i++) {
charray_add(&job->input_filenames, slapi_ch_strdup(name_array[i]));
+ }
job->starting_ID = 1;
job->first_ID = 1;
job->mothers = CALLOC(import_subcount_stuff);
@@ -1450,10 +1550,10 @@ int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb)
/* how much space should we allocate to index buffering? */
job->job_index_buffer_size = import_get_index_buffer_size();
if (job->job_index_buffer_size == 0) {
- /* 10% of the allocated cache size + one meg */
+ /* 10% of the allocated cache size + one meg */
PR_Lock(job->inst->inst_li->li_config_mutex);
- job->job_index_buffer_size = (job->inst->inst_li->li_import_cachesize/10) +
- (1024*1024);
+ job->job_index_buffer_size =
+ (job->inst->inst_li->li_import_cachesize/10) + (1024*1024);
PR_Unlock(job->inst->inst_li->li_config_mutex);
}
import_subcount_stuff_init(job->mothers);
@@ -1466,18 +1566,19 @@ int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb)
/* add 1 to account for post-import cleanup (which can take a
* significant amount of time)
*/
- /* NGK - This should eventually be cleaned up to use the public
- * task API. */
- if (0 == total_files) /* reindexing */
- job->task->task_work = 2;
- else
- job->task->task_work = total_files + 1;
- job->task->task_progress = 0;
- job->task->task_state = SLAPI_TASK_RUNNING;
- slapi_task_set_data(job->task, job);
- slapi_task_set_destructor_fn(job->task, import_task_destroy);
- slapi_task_set_cancel_fn(job->task, import_task_abort);
- job->flags |= FLAG_ONLINE;
+ /* NGK - This should eventually be cleaned up to use the public
+ * task API. */
+ if (0 == total_files) { /* reindexing */
+ job->task->task_work = 2;
+ } else {
+ job->task->task_work = total_files + 1;
+ }
+ job->task->task_progress = 0;
+ job->task->task_state = SLAPI_TASK_RUNNING;
+ slapi_task_set_data(job->task, job);
+ slapi_task_set_destructor_fn(job->task, import_task_destroy);
+ slapi_task_set_cancel_fn(job->task, import_task_abort);
+ job->flags |= FLAG_ONLINE;
/* create thread for import_main, so we can return */
thread = PR_CreateThread(PR_USER_THREAD, import_main, (void *)job,