diff options
Diffstat (limited to 'ldap/servers/slapd/back-ldbm/import.c')
-rw-r--r-- | ldap/servers/slapd/back-ldbm/import.c | 331 |
1 files changed, 216 insertions, 115 deletions
diff --git a/ldap/servers/slapd/back-ldbm/import.c b/ldap/servers/slapd/back-ldbm/import.c index df0fa5d5..8b66705f 100644 --- a/ldap/servers/slapd/back-ldbm/import.c +++ b/ldap/servers/slapd/back-ldbm/import.c @@ -51,6 +51,7 @@ #include "import.h" #define ERR_IMPORT_ABORTED -23 +#define DRYRUN_QUIT -24 /********** routines to manipulate the entry fifo **********/ @@ -193,8 +194,16 @@ void import_log_notice(ImportJob *job, char *format, ...) slapi_task_log_notice(job->task, "%s", buffer); } /* also save it in the logs for posterity */ - LDAPDebug(LDAP_DEBUG_ANY, "import %s: %s\n", job->inst->inst_name, + if (job->flags & FLAG_UPGRADEDNFORMAT) { + LDAPDebug(LDAP_DEBUG_ANY, "upgradedn %s: %s\n", job->inst->inst_name, buffer, 0); + } else if (job->flags & FLAG_REINDEXING) { + LDAPDebug(LDAP_DEBUG_ANY, "reindex %s: %s\n", job->inst->inst_name, + buffer, 0); + } else { + LDAPDebug(LDAP_DEBUG_ANY, "import %s: %s\n", job->inst->inst_name, + buffer, 0); + } } static void import_task_destroy(Slapi_Task *task) @@ -245,6 +254,22 @@ static int import_attr_callback(void *node, void *param) ImportJob *job = (ImportJob *)param; struct attrinfo *a = (struct attrinfo *)node; + if (job->flags & FLAG_DRYRUN) { /* dryrun; we don't need the workers */ + return 0; + } + if (job->flags & FLAG_UPGRADEDNFORMAT) { + /* Bring up import workers just for indexes having DN syntax + * attribute type. (except entrydn -- taken care below) */ + int rc = 0; + Slapi_Attr attr = {0}; + slapi_attr_init(&attr, a->ai_type); + rc = slapi_attr_is_dn_syntax_attr(&attr); + attr_done(&attr); + if (0 == rc) { + return 0; + } + } + /* OK, so we now have hold of the attribute structure and the job info, * let's see what we have. Remember that although this function is called * many times, all these calls are in the context of a single thread, so we @@ -255,28 +280,28 @@ static int import_attr_callback(void *node, void *param) * we build those in the foreman thread. */ if (IS_INDEXED(a->ai_indexmask) && - (strcasecmp(a->ai_type, "entrydn") != 0) && - (strcasecmp(a->ai_type, "parentid") != 0) && - (strcasecmp(a->ai_type, "ancestorid") != 0) && - (strcasecmp(a->ai_type, numsubordinates) != 0)) { - /* Make an import_index_info structure, fill it in and insert into the - * job's list */ - IndexInfo *info = CALLOC(IndexInfo); + (strcasecmp(a->ai_type, "entrydn") != 0) && + (strcasecmp(a->ai_type, "parentid") != 0) && + (strcasecmp(a->ai_type, "ancestorid") != 0) && + (strcasecmp(a->ai_type, numsubordinates) != 0)) { + /* Make an import_index_info structure, fill it in and insert into the + * job's list */ + IndexInfo *info = CALLOC(IndexInfo); - if (NULL == info) { - /* Memory allocation error */ - return -1; - } - info->name = slapi_ch_strdup(a->ai_type); - info->ai = a; - if (NULL == info->name) { - /* Memory allocation error */ - FREE(info); - return -1; - } - info->next = job->index_list; - job->index_list = info; - job->number_indexers++; + if (NULL == info) { + /* Memory allocation error */ + return -1; + } + info->name = slapi_ch_strdup(a->ai_type); + info->ai = a; + if (NULL == info->name) { + /* Memory allocation error */ + FREE(info); + return -1; + } + info->next = job->index_list; + job->index_list = info; + job->number_indexers++; } return 0; } @@ -401,12 +426,12 @@ static int import_start_threads(ImportJob *job) import_init_worker_info(foreman, job); foreman->work_type = FOREMAN; if (! CREATE_THREAD(PR_USER_THREAD, (VFP)import_foreman, foreman, - PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD, - PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) { + PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD, + PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) { PRErrorCode prerr = PR_GetError(); LDAPDebug(LDAP_DEBUG_ANY, "unable to spawn import foreman thread, " - SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", - prerr, slapd_pr_strerror(prerr), 0); + SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", + prerr, slapd_pr_strerror(prerr), 0); FREE(foreman); goto error; } @@ -658,7 +683,7 @@ static int import_monitor_threads(ImportJob *job, int *status) time_t time_now = 0; time_t last_time = 0; time_t time_interval = 0; - + int rc = 0; for (current_worker = job->worker_list; current_worker != NULL; current_worker = current_worker->next) { @@ -721,7 +746,10 @@ static int import_monitor_threads(ImportJob *job, int *status) import_calc_rate(current_worker, time_interval); import_print_worker_status(current_worker); } - if (current_worker->state != FINISHED) { + if (current_worker->state == QUIT) { + rc = DRYRUN_QUIT; /* Set the RC; Don't abort now; + We have to stop other threads */ + } else if (current_worker->state != FINISHED) { finished = 0; } if (current_worker->state == ABORTED) { @@ -764,8 +792,10 @@ static int import_monitor_threads(ImportJob *job, int *status) /* if the producer is finished, and the foreman has caught up... */ if (producer) { - producer_done = (producer->state == FINISHED); + producer_done = (producer->state == FINISHED) || + (producer->state == QUIT); } else { + /* set in ldbm_back_wire_import */ producer_done = (job->flags & FLAG_PRODUCER_DONE); } if (producer_done && (job->lead_ID == job->ready_ID)) { @@ -801,6 +831,7 @@ static int import_monitor_threads(ImportJob *job, int *status) for (current_worker = job->worker_list; current_worker != NULL; ) { if ((current_worker->state != FINISHED) && (current_worker->state != ABORTED) && + (current_worker->state != QUIT) && (current_worker->work_type != PRODUCER)) { DS_Sleep(tenthsecond); /* Only sleep if we hit a thread that is still not done */ continue; @@ -813,7 +844,7 @@ static int import_monitor_threads(ImportJob *job, int *status) /* If we're here and giveup is true, and the primary hadn't finished * processing the input files, we need to return IMPORT_INCOMPLETE_PASS */ if (giveup && (job->input_filenames || (job->flags & FLAG_ONLINE) || - (job->flags & FLAG_REINDEXING /* support multi-pass */))) { + (job->flags & FLAG_REINDEXING /* support multi-paths */))) { if (producer_done && (job->ready_ID == job->lead_ID)) { /* foreman caught up with the producer, and the producer is * done. @@ -825,7 +856,7 @@ static int import_monitor_threads(ImportJob *job, int *status) } else { *status = IMPORT_COMPLETE_PASS; } - return 0; + return rc; error_abort: return ERR_IMPORT_ABORTED; @@ -842,16 +873,16 @@ static int import_run_pass(ImportJob *job, int *status) ret = import_start_threads(job); if (ret != 0) { import_log_notice(job, "Starting threads failed: %d\n", ret); - goto error; + goto error; } /* Monitor the threads until we're done or fail */ ret = import_monitor_threads(job, status); - if (ret == ERR_IMPORT_ABORTED) { + if ((ret == ERR_IMPORT_ABORTED) || (ret == DRYRUN_QUIT)) { goto error; } else if (ret != 0) { import_log_notice(job, "Thread monitoring aborted: %d\n", ret); - goto error; + goto error; } error: @@ -875,20 +906,18 @@ static void import_set_abort_flag_all(ImportJob *job, int wait_for_them) /* allow all the aborts to be processed */ DS_Sleep(PR_MillisecondsToInterval(3000)); - if (wait_for_them) { + if (wait_for_them) { /* Having done that, wait for them to say that they've stopped */ for (worker = job->worker_list; worker != NULL; ) { DS_Sleep(PR_MillisecondsToInterval(100)); - if ((worker->state != FINISHED) && - (worker->state != ABORTED)){ + if ((worker->state != FINISHED) && (worker->state != ABORTED) && + (worker->state != QUIT)) { continue; - } - else{ + } else { worker = worker->next; - } + } } } - } @@ -907,11 +936,12 @@ void import_abort_all(ImportJob *job, int wait_for_them) /* Having done that, wait for them to say that they've stopped */ for (worker = job->worker_list; worker != NULL; ) { DS_Sleep(PR_MillisecondsToInterval(100)); - if ((worker->state != FINISHED) && - (worker->state != ABORTED)) + if ((worker->state != FINISHED) && (worker->state != ABORTED) && + (worker->state != QUIT)) { continue; - else + } else { worker = worker->next; + } } } } @@ -927,11 +957,10 @@ int import_make_merge_filenames(char *directory, char *indexname, int pass, *oldname = slapi_ch_smprintf("%s/%s%s", directory, indexname, LDBM_FILENAME_SUFFIX); *newname = slapi_ch_smprintf("%s/%s.%d%s", directory, indexname, pass, LDBM_FILENAME_SUFFIX); - if (!*oldname || !*newname) { - slapi_ch_free_string(oldname); - slapi_ch_free_string(newname); - return -1; - } + if (!*oldname || !*newname) { slapi_ch_free_string(oldname); + slapi_ch_free_string(newname); + return -1; + } return 0; } @@ -1018,8 +1047,8 @@ static int import_all_done(ImportJob *job, int ret) ldbm_instance *inst = job->inst; /* Writing this file indicates to future server startups that - * the db is OK */ - if (ret == 0) { + * the db is OK unless it's in the dry run mode. */ + if ((ret == 0) && !(job->flags & FLAG_DRYRUN)) { char inst_dir[MAXPATHLEN*2]; char *inst_dirp = NULL; inst_dirp = dblayer_get_full_inst_dir(inst->inst_li, inst, @@ -1077,10 +1106,20 @@ int import_main_offline(void *arg) int verbose = 1; int aborted = 0; ImportWorkerInfo *producer = NULL; + char *opstr = "Import"; if (job->task) slapi_task_inc_refcount(job->task); + if (job->flags & FLAG_UPGRADEDNFORMAT) { + if (job->flags & FLAG_DRYRUN) { + opstr = "Upgrade Dn Dryrun"; + } else { + opstr = "Upgrade Dn"; + } + } else if (job->flags & FLAG_REINDEXING) { + opstr = "Reindexing"; + } PR_ASSERT(inst != NULL); time(&beginning); @@ -1119,7 +1158,20 @@ int import_main_offline(void *arg) /* start the producer */ import_init_worker_info(producer, job); producer->work_type = PRODUCER; - if (job->flags & FLAG_REINDEXING) + if (job->flags & FLAG_UPGRADEDNFORMAT) + { + if (! CREATE_THREAD(PR_USER_THREAD, (VFP)upgradedn_producer, + producer, PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD, + PR_UNJOINABLE_THREAD, SLAPD_DEFAULT_THREAD_STACKSIZE)) { + PRErrorCode prerr = PR_GetError(); + LDAPDebug(LDAP_DEBUG_ANY, + "unable to spawn upgrade dn producer thread, " + SLAPI_COMPONENT_NAME_NSPR " error %d (%s)\n", + prerr, slapd_pr_strerror(prerr), 0); + goto error; + } + } + else if (job->flags & FLAG_REINDEXING) { if (! CREATE_THREAD(PR_USER_THREAD, (VFP)index_producer, producer, PR_PRIORITY_NORMAL, PR_GLOBAL_BOUND_THREAD, @@ -1188,15 +1240,18 @@ int import_main_offline(void *arg) if (ret == ERR_IMPORT_ABORTED) { /* at least one of the threads has aborted -- shut down ALL * of the threads */ - import_log_notice(job, "Aborting all import threads..."); + import_log_notice(job, "Aborting all %s threads...", opstr); /* this abort sets the abort flag on the threads and will block for * the exit of all threads */ import_set_abort_flag_all(job, 1); - import_log_notice(job, "Import threads aborted."); + import_log_notice(job, "%s threads aborted.", opstr); aborted = 1; goto error; } + if (ret == DRYRUN_QUIT) { + goto error; /* Found the candidate; close the db files and quit */ + } if (0 != ret) { /* Some horrible fate has befallen the import */ @@ -1293,7 +1348,8 @@ int import_main_offline(void *arg) * Database. */ error: - /* If we fail, the database is now in a mess, so we delete it */ + /* If we fail, the database is now in a mess, so we delete it + except dry run mode */ import_log_notice(job, "Closing files..."); cache_clear(&job->inst->inst_cache); if (aborted) { @@ -1307,7 +1363,10 @@ error: } } if (0 != ret) { - dblayer_delete_instance_dir(be); + if (!(job->flags & FLAG_DRYRUN)) { /* If not dryrun */ + /* if running in the dry run mode, don't touch the db */ + dblayer_delete_instance_dir(be); + } dblayer_instance_close(job->inst->inst_be); } else { if (0 != (ret = dblayer_instance_close(job->inst->inst_be)) ) { @@ -1321,52 +1380,82 @@ error: if (verbose && (0 == ret)) { int seconds_to_import = end - beginning; size_t entries_processed = job->lead_ID - (job->starting_ID - 1); - double entries_per_second = (double) entries_processed / - (double) seconds_to_import; - - if (job->not_here_skipped) - { - if (job->skipped) - import_log_notice(job, "Import complete. Processed %lu entries " - "(%d bad entries were skipped, " - "%d entries were skipped because they don't " - "belong to this database) in %d seconds. " - "(%.2f entries/sec)", entries_processed, - job->skipped, job->not_here_skipped, - seconds_to_import, entries_per_second); - else - import_log_notice(job, "Import complete. Processed %lu entries " - "(%d entries were skipped because they don't " - "belong to this database) " - "in %d seconds. (%.2f entries/sec)", - entries_processed, job->not_here_skipped, - seconds_to_import, entries_per_second); - } - else - { - if (job->skipped) - import_log_notice(job, "Import complete. Processed %lu entries " - "(%d were skipped) in %d seconds. " - "(%.2f entries/sec)", entries_processed, - job->skipped, seconds_to_import, - entries_per_second); - else - import_log_notice(job, "Import complete. Processed %lu entries " - "in %d seconds. (%.2f entries/sec)", - entries_processed, seconds_to_import, - entries_per_second); + double entries_per_second = + seconds_to_import ? + (double)entries_processed / (double)seconds_to_import : 0; + + if (job->not_here_skipped) { + if (job->skipped) { + import_log_notice(job, + "%s complete. Processed %lu entries " + "(%d bad entries were skipped, " + "%d entries were skipped because they don't " + "belong to this database) in %d seconds. " + "(%.2f entries/sec)", + opstr, entries_processed, + job->skipped, job->not_here_skipped, + seconds_to_import, entries_per_second); + } else { + import_log_notice(job, + "%s complete. Processed %lu entries " + "(%d entries were skipped because they don't " + "belong to this database) " + "in %d seconds. (%.2f entries/sec)", + opstr, entries_processed, + job->not_here_skipped, seconds_to_import, + entries_per_second); + } + } else { + if (job->skipped) { + import_log_notice(job, + "%s complete. Processed %lu entries " + "(%d were skipped) in %d seconds. " + "(%.2f entries/sec)", + opstr, entries_processed, + job->skipped, seconds_to_import, + entries_per_second); + } else { + import_log_notice(job, + "%s complete. Processed %lu entries " + "in %d seconds. (%.2f entries/sec)", + opstr, entries_processed, + seconds_to_import, entries_per_second); + } } } - if (0 != ret) { - import_log_notice(job, "Import failed."); + if (job->flags & FLAG_DRYRUN) { + if (0 == ret) { + import_log_notice(job, "%s complete. %s is up-to-date.", + opstr, job->inst->inst_name); + ret = 1; + if (job->task) { + slapi_task_dec_refcount(job->task); + } + import_all_done(job, ret); + } else if (DRYRUN_QUIT == ret) { + import_log_notice(job, "%s complete. %s needs upgradednformat.", + opstr, job->inst->inst_name); + if (job->task) { + slapi_task_dec_refcount(job->task); + } + import_all_done(job, ret); + ret = 0; + } else { + ret = -1; + if (job->task != NULL) { + slapi_task_finish(job->task, ret); + } + } + } else if (0 != ret) { + import_log_notice(job, "%s failed.", opstr); if (job->task != NULL) { slapi_task_finish(job->task, ret); } } else { - if (job->task) + if (job->task) { slapi_task_dec_refcount(job->task); - + } import_all_done(job, ret); } @@ -1376,7 +1465,6 @@ error: import_free_job(job); if (producer) FREE(producer); - return(ret); } @@ -1399,6 +1487,7 @@ int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb) char **name_array = NULL; int total_files, i; PRThread *thread = NULL; + int ud_flags = 0; job = CALLOC(ImportJob); if (job == NULL) { @@ -1412,6 +1501,7 @@ int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb) job->inst = (ldbm_instance *)be->be_instance_info; slapi_pblock_get( pb, SLAPI_LDIF2DB_NOATTRINDEXES, &noattrindexes ); slapi_pblock_get( pb, SLAPI_LDIF2DB_FILE, &name_array ); + slapi_pblock_get( pb, SLAPI_SEQ_TYPE, &ud_flags ); /* For upgrade dn */ /* the removedupvals field is blatantly overloaded here to mean * the chunk size too. (chunk size = number of entries that should @@ -1437,12 +1527,22 @@ int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb) } job->flags = FLAG_USE_FILES; - if (NULL == name_array) /* no ldif file is given -> reindexing */ - job->flags |= FLAG_REINDEXING; - if (!noattrindexes) - job->flags |= FLAG_INDEX_ATTRS; - for (i = 0; name_array && name_array[i] != NULL; i++) + if (NULL == name_array) { /* no ldif file is given -> reindexing */ + if (ud_flags & SLAPI_UPGRADEDNFORMAT) { + job->flags |= FLAG_UPGRADEDNFORMAT; + if (ud_flags & SLAPI_DRYRUN) { + job->flags |= FLAG_DRYRUN; + } + } else { + job->flags |= FLAG_REINDEXING; + } + } + if (!noattrindexes) { + job->flags |= FLAG_INDEX_ATTRS; + } + for (i = 0; name_array && name_array[i] != NULL; i++) { charray_add(&job->input_filenames, slapi_ch_strdup(name_array[i])); + } job->starting_ID = 1; job->first_ID = 1; job->mothers = CALLOC(import_subcount_stuff); @@ -1450,10 +1550,10 @@ int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb) /* how much space should we allocate to index buffering? */ job->job_index_buffer_size = import_get_index_buffer_size(); if (job->job_index_buffer_size == 0) { - /* 10% of the allocated cache size + one meg */ + /* 10% of the allocated cache size + one meg */ PR_Lock(job->inst->inst_li->li_config_mutex); - job->job_index_buffer_size = (job->inst->inst_li->li_import_cachesize/10) + - (1024*1024); + job->job_index_buffer_size = + (job->inst->inst_li->li_import_cachesize/10) + (1024*1024); PR_Unlock(job->inst->inst_li->li_config_mutex); } import_subcount_stuff_init(job->mothers); @@ -1466,18 +1566,19 @@ int ldbm_back_ldif2ldbm_deluxe(Slapi_PBlock *pb) /* add 1 to account for post-import cleanup (which can take a * significant amount of time) */ - /* NGK - This should eventually be cleaned up to use the public - * task API. */ - if (0 == total_files) /* reindexing */ - job->task->task_work = 2; - else - job->task->task_work = total_files + 1; - job->task->task_progress = 0; - job->task->task_state = SLAPI_TASK_RUNNING; - slapi_task_set_data(job->task, job); - slapi_task_set_destructor_fn(job->task, import_task_destroy); - slapi_task_set_cancel_fn(job->task, import_task_abort); - job->flags |= FLAG_ONLINE; + /* NGK - This should eventually be cleaned up to use the public + * task API. */ + if (0 == total_files) { /* reindexing */ + job->task->task_work = 2; + } else { + job->task->task_work = total_files + 1; + } + job->task->task_progress = 0; + job->task->task_state = SLAPI_TASK_RUNNING; + slapi_task_set_data(job->task, job); + slapi_task_set_destructor_fn(job->task, import_task_destroy); + slapi_task_set_cancel_fn(job->task, import_task_abort); + job->flags |= FLAG_ONLINE; /* create thread for import_main, so we can return */ thread = PR_CreateThread(PR_USER_THREAD, import_main, (void *)job, |