diff options
author | cvsadm <cvsadm> | 2005-01-21 00:44:34 +0000 |
---|---|---|
committer | cvsadm <cvsadm> | 2005-01-21 00:44:34 +0000 |
commit | b2093e3016027d6b5cf06b3f91f30769bfc099e2 (patch) | |
tree | cf58939393a9032182c4fbc4441164a9456e82f8 /ldap/servers/plugins/replication/cl5_api.c | |
download | ds-ldapserver7x.tar.gz ds-ldapserver7x.tar.xz ds-ldapserver7x.zip |
Moving NSCP Directory Server from DirectoryBranch to TRUNK, initial drop. (foxworth)ldapserver7x
Diffstat (limited to 'ldap/servers/plugins/replication/cl5_api.c')
-rw-r--r-- | ldap/servers/plugins/replication/cl5_api.c | 6512 |
1 files changed, 6512 insertions, 0 deletions
diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c new file mode 100644 index 00000000..792d3646 --- /dev/null +++ b/ldap/servers/plugins/replication/cl5_api.c @@ -0,0 +1,6512 @@ +/** BEGIN COPYRIGHT BLOCK + * Copyright 2001 Sun Microsystems, Inc. + * Portions copyright 1999, 2001-2003 Netscape Communications Corporation. + * All rights reserved. + * END COPYRIGHT BLOCK **/ + +/* cl5_api.c - implementation of 5.0 style changelog API */ + +#include <errno.h> +#include <sys/stat.h> +#if defined( OS_solaris ) || defined( hpux ) +#include <sys/types.h> +#include <sys/statvfs.h> +#endif +#if defined( linux ) +#include <sys/vfs.h> +#endif + + +#include "cl5_api.h" +#include "plhash.h" + +#include "db.h" +#include "cl5_clcache.h" /* To use the Changelog Cache */ +#include "repl5.h" /* for agmt_get_consumer_rid() */ + +#define CL5_TYPE "Changelog5" /* changelog type */ +#define VERSION_SIZE 127 /* size of the buffer to hold changelog version */ +#define GUARDIAN_FILE "guardian" /* name of the guardian file */ +#define VERSION_FILE "DBVERSION" /* name of the version file */ +#define MAX_TRIALS 50 /* number of retries on db operations */ +#define V_5 5 /* changelog entry version */ +#define CHUNK_SIZE 64*1024 +#define DBID_SIZE 64 +#define FILE_SEP "_" /* separates parts of the db file name */ + +#define T_CSNSTR "csn" +#define T_UNIQUEIDSTR "nsuniqueid" +#define T_PARENTIDSTR "parentuniqueid" +#define T_NEWSUPERIORDNSTR "newsuperiordn" +#define T_NEWSUPERIORIDSTR "newsuperioruniqueid" +#define T_REPLGEN "replgen" + +#define ENTRY_COUNT_TIME 111 /* this time is used to construct csn + used to store/retrieve entry count */ +#define PURGE_RUV_TIME 222 /* this time is used to construct csn + used to store purge RUV vector */ +#define MAX_RUV_TIME 333 /* this time is used to construct csn + used to store upper boundary RUV vector */ + +#define DB_EXTENSION_DB3 "db3" +#define DB_EXTENSION "db4" + +#define HASH_BACKETS_COUNT 16 /* number of buckets in a hash table */ + +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 4100 +#define DEFAULT_DB_OP_FLAGS DB_AUTO_COMMIT +#define DB_OPEN(oflags, db, txnid, file, database, type, flags, mode, rval) \ +{ \ + if (((oflags) & DB_INIT_TXN) && ((oflags) & DB_INIT_LOG)) \ + { \ + (rval) = (db)->open((db), (txnid), (file), (database), (type), (flags)|DB_AUTO_COMMIT, (mode)); \ + } \ + else \ + { \ + (rval) = (db)->open((db), (txnid), (file), (database), (type), (flags), (mode)); \ + } \ +} +#else /* older then db 41 */ +#define DEFAULT_DB_OP_FLAGS 0 +#define DB_OPEN(oflags, db, txnid, file, database, type, flags, mode, rval) \ + (rval) = (db)->open((db), (file), (database), (type), (flags), (mode)) +#endif + +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 4000 +#define DB_ENV_SET_REGION_INIT(env) (env)->set_flags((env), DB_REGION_INIT, 1) +#define DB_ENV_SET_TAS_SPINS(env, tas_spins) \ + (env)->set_tas_spins((env), (tas_spins)) +#define TXN_BEGIN(env, parent_txn, tid, flags) \ + (env)->txn_begin((env), (parent_txn), (tid), (flags)) +#define TXN_COMMIT(txn, flags) (txn)->commit((txn), (flags)) +#define TXN_ABORT(txn) (txn)->abort(txn) +#define TXN_CHECKPOINT(env, kbyte, min, flags) \ + (env)->txn_checkpoint((env), (kbyte), (min), (flags)) +#define MEMP_STAT(env, gsp, fsp, flags, malloc) \ + (env)->memp_stat((env), (gsp), (fsp), (flags)) +#define MEMP_TRICKLE(env, pct, nwrotep) \ + (env)->memp_trickle((env), (pct), (nwrotep)) +#define LOG_ARCHIVE(env, listp, flags, malloc) \ + (env)->log_archive((env), (listp), (flags)) +#define LOG_FLUSH(env, lsn) (env)->log_flush((env), (lsn)) +#define LOCK_DETECT(env, flags, atype, aborted) \ + (env)->lock_detect((env), (flags), (atype), (aborted)) + +#else /* older than db 4.0 */ +#define DB_ENV_SET_REGION_INIT(env) db_env_set_region_init(1) +#define DB_ENV_SET_TAS_SPINS(env, tas_spins) \ + db_env_set_tas_spins((tas_spins)) +#define TXN_BEGIN(env, parent_txn, tid, flags) \ + txn_begin((env), (parent_txn), (tid), (flags)) +#define TXN_COMMIT(txn, flags) txn_commit((txn), (flags)) +#define TXN_ABORT(txn) txn_abort((txn)) +#define TXN_CHECKPOINT(env, kbyte, min, flags) \ + txn_checkpoint((env), (kbyte), (min), (flags)) +#define MEMP_TRICKLE(env, pct, nwrotep) memp_trickle((env), (pct), (nwrotep)) +#define LOG_FLUSH(env, lsn) log_flush((env), (lsn)) +#define LOCK_DETECT(env, flags, atype, aborted) \ + lock_detect((env), (flags), (atype), (aborted)) + +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 3300 +#define MEMP_STAT(env, gsp, fsp, flags, malloc) memp_stat((env), (gsp), (fsp)) +#define LOG_ARCHIVE(env, listp, flags, malloc) \ + log_archive((env), (listp), (flags)) + +#else /* older than db 3.3 */ +#define MEMP_STAT(env, gsp, fsp, flags, malloc) \ + memp_stat((env), (gsp), (fsp), (malloc)) +#define LOG_ARCHIVE(env, listp, flags, malloc) \ + log_archive((env), (listp), (flags), (malloc)) +#endif +#endif +/* + * The defult thread stacksize for nspr21 is 64k. For OSF, we require + * a larger stacksize as actual storage allocation is higher i.e + * pointers are allocated 8 bytes but lower 4 bytes are used. + * The value 0 means use the default stacksize. + */ +#if defined (OSF1) || defined (__LP64__) || defined (_LP64) /* 64-bit architectures need bigger stacks */ +#define DEFAULT_THREAD_STACKSIZE 131072L +#else +#define DEFAULT_THREAD_STACKSIZE 0 +#endif + +#ifdef _WIN32 +#define FILE_CREATE_MODE S_IREAD | S_IWRITE +#define DIR_CREATE_MODE 0755 +#else /* _WIN32 */ +#define FILE_CREATE_MODE S_IRUSR | S_IWUSR +#define DIR_CREATE_MODE 0755 +#endif + +#define NO_DISK_SPACE 1024 +#define MIN_DISK_SPACE 10485760 /* 10 MB */ + +/***** Data Definitions *****/ + +/* possible changelog open modes */ +typedef enum +{ + CL5_OPEN_NONE, /* nothing specified */ + CL5_OPEN_NORMAL, /* open for normal read/write use */ + CL5_OPEN_RESTORE_RECOVER, /* restore from archive and recover */ + CL5_OPEN_RESTORE, /* restore, but no recovery */ + CL5_OPEN_LDIF2CL, /* open as part of ldif2cl: no locking, + recovery, checkpointing */ + CL5_OPEN_CLEAN_RECOVER /* remove env after recover open (upgrade) */ +} CL5OpenMode; + +#define DB_FILE_DELETED 0x1 +#define DB_FILE_INIT 0x2 +/* this structure represents one changelog file, Each changelog file contains + changes applied to a single backend. Files are named by the database id */ +typedef struct cl5dbfile +{ + char *name; /* file name (with the extension) */ + char *replGen; /* replica generation of the data */ + char *replName; /* replica name */ + DB *db; /* db handle to the changelog file*/ + int entryCount; /* number of entries in the file */ + int flags; /* currently used to mark the file as deleted + * or as initialized */ + RUV *purgeRUV; /* ruv to which the file has been purged */ + RUV *maxRUV; /* ruv that marks the upper boundary of the data */ + char *semaName; /* semaphore name */ + PRSem *sema; /* semaphore for max concurrent cl writes */ +}CL5DBFile; + +/* structure that allows to iterate through entries to be sent to a consumer + that originated on a particular supplier. */ +struct cl5replayiterator +{ + Object *fileObj; + CLC_Buffer *clcache; /* changelog cache */ + ReplicaId consumerRID; /* consumer's RID */ + const RUV *consumerRuv; /* consumer's update vector */ + Object *supplierRuvObj;/* supplier's update vector object */ +}; + +typedef struct cl5iterator +{ + DBC *cursor; /* current position in the db file */ + Object *file; /* handle to release db file object */ +}CL5Iterator; + +/* changelog trimming configuration */ +typedef struct cl5trim +{ + time_t maxAge; /* maximum entry age in seconds */ + int maxEntries; /* maximum number of entries across all changelog files */ + PRLock* lock; /* controls access to trimming configuration */ +} CL5Trim; + +/* this structure defines 5.0 changelog internals */ +typedef struct cl5desc +{ + char *dbDir; /* absolute path to changelog directory */ + DB_ENV *dbEnv; /* db environment shared by all db files */ + int dbEnvOpenFlags;/* openflag used for env->open */ + Objset *dbFiles; /* ref counted set of changelog files (CL5DBFile) */ + PRLock *fileLock; /* ensures that changelog file is not added twice */ + CL5OpenMode dbOpenMode; /* how we open db */ + CL5DBConfig dbConfig; /* database configuration params */ + CL5Trim dbTrim; /* trimming parameters */ + CL5State dbState; /* changelog current state */ + PRRWLock *stLock; /* lock that controls access to the changelog state */ + PRBool dbRmOnClose;/* indicates whether changelog should be removed when + it is closed */ + PRBool fatalError; /* bad stuff happened like out of disk space; don't + write guardian file on close - UnUsed so far */ + int threadCount;/* threads that globally access changelog like + deadlock detection, etc. */ + PRLock *clLock; /* Lock associated to clVar, used to notify threads on close */ + PRCondVar *clCvar; /* Condition Variable used to notify threads on close */ +} CL5Desc; + +typedef void (*VFP)(void *); + +int g_get_shutdown(); /* declared in proto-slap.h */ + +/***** Global Variables *****/ +static CL5Desc s_cl5Desc; + +/***** Forward Declarations *****/ + +/* changelog initialization and cleanup */ +static int _cl5Open (const char *dir, const CL5DBConfig *config, CL5OpenMode openMode); +static int _cl5AppInit (PRBool *didRecovery); +static int _cl5DBOpen (); +static void _cl5SetDefaultDBConfig (); +static void _cl5SetDBConfig (const CL5DBConfig *config); +static void _cl5InitDBEnv(DB_ENV *dbEnv); +static int _cl5CheckDBVersion (); +static int _cl5ReadDBVersion (const char *dir, char *clVersion); +static int _cl5WriteDBVersion (); +static int _cl5CheckGuardian (); +static int _cl5ReadGuardian (char *buff); +static int _cl5WriteGuardian (); +static int _cl5RemoveGuardian (); +static void _cl5Close (); +static int _cl5Delete (const char *dir, PRBool rmDir); +static void _cl5DBClose (); + +/* thread management */ +static int _cl5DispatchDBThreads (); +static int _cl5AddThread (); +static void _cl5RemoveThread (); +static int _cl5DeadlockMain (void *param); +static int _cl5CheckpointMain (void *param); +static int _cl5TrickleMain (void *param); + +/* functions that work with individual changelog files */ +static int _cl5NewDBFile (const char *replName, const char *replGen, CL5DBFile** dbFile); +static int _cl5DBOpenFile (Object *replica, Object **obj, PRBool checkDups); +static int _cl5DBOpenFileByReplicaName (const char *replName, const char *replGen, + Object **obj, PRBool checkDups); +static void _cl5DBCloseFile (void **data); +static void _cl5DBDeleteFile (Object *obj); +static void _cl5DBFileInitialized (Object *obj); +static int _cl5GetDBFile (Object *replica, Object **obj); +static int _cl5GetDBFileByReplicaName (const char *replName, const char *replGen, + Object **obj); +static int _cl5AddDBFile (CL5DBFile *file, Object **obj); +static int _cl5CompareDBFile (Object *el1, const void *el2); +static int _cl5CopyDBFiles (const char *srcDir, const char *distDir, Object **replicas); +static char* _cl5Replica2FileName (Object *replica); +static char* _cl5MakeFileName (const char *replName, const char *replGen); +static PRBool _cl5FileName2Replica (const char *fileName, Object **replica); +static int _cl5ExportFile (PRFileDesc *prFile, Object *obj); +static PRBool _cl5ReplicaInList (Object *replica, Object **replicas); + +/* data storage and retrieval */ +static int _cl5Entry2DBData (const CL5Entry *entry, char **data, PRUint32 *len); +static int _cl5WriteOperation(const char *replName, const char *replGen, + const slapi_operation_parameters *op, PRBool local); +static int _cl5GetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid); +static int _cl5GetNextEntry (CL5Entry *entry, void *iterator); +static int _cl5CurrentDeleteEntry (void *iterator); +static PRBool _cl5IsValidIterator (const CL5Iterator *iterator); +static int _cl5GetOperation (Object *replica, slapi_operation_parameters *op); +static const char* _cl5OperationType2Str (int type); +static int _cl5Str2OperationType (const char *str); +static void _cl5WriteString (const char *str, char **buff); +static void _cl5ReadString (char **str, char **buff); +static void _cl5WriteMods (LDAPMod **mods, char **buff); +static void _cl5WriteMod (LDAPMod *mod, char **buff); +static int _cl5ReadMods (LDAPMod ***mods, char **buff); +static int _cl5ReadMod (Slapi_Mod *mod, char **buff); +static int _cl5GetModsSize (LDAPMod **mods); +static int _cl5GetModSize (LDAPMod *mod); +static void _cl5ReadBerval (struct berval *bv, char** buff); +static void _cl5WriteBerval (struct berval *bv, char** buff); +static int _cl5ReadBervals (struct berval ***bv, char** buff, unsigned int size); +static int _cl5WriteBervals (struct berval **bv, char** buff, unsigned int *size); + +/* replay iteration */ +static PRBool _cl5ValidReplayIterator (const CL5ReplayIterator *iterator); +static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consumerRuv, + Object *replica, Object *fileObject, CL5ReplayIterator **iterator); +static int _cl5CheckMissingCSN (const CSN *minCsn, const RUV *supplierRUV, CL5DBFile *file); + +/* changelog trimming */ +static int _cl5TrimInit (); +static void _cl5TrimCleanup (); +static int _cl5TrimMain (void *param); +static void _cl5DoTrimming (); +static void _cl5TrimFile (Object *obj, long *numToTrim); +static PRBool _cl5CanTrim (time_t time, long *numToTrim); +static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge); +static int _cl5WriteRUV (CL5DBFile *file, PRBool purge); +static int _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge); +static int _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge); +static int _cl5GetRUV2Purge2 (Object *fileObj, RUV **ruv); + +/* db error processing */ +static void _cl5DBLogPrint(const char* prefix, char *buffer); + +/* bakup/recovery, import/export */ +static PRBool _cl5IsLogFile (const char *name); +static int _cl5Recover (int open_flags, DB_ENV *dbEnv); +static int _cl5LDIF2Operation (char *ldifEntry, slapi_operation_parameters *op, + char **replGen); +static int _cl5Operation2LDIF (const slapi_operation_parameters *op, const char *replGen, + char **ldifEntry, PRInt32 *lenLDIF); + +/* entry count */ +static int _cl5GetEntryCount (CL5DBFile *file); +static int _cl5WriteEntryCount (CL5DBFile *file); + +/* misc */ +static char* _cl5GetHelperEntryKey (int type, char *csnStr); +static Object* _cl5GetReplica (const slapi_operation_parameters *op, const char* replGen); +static int _cl5FileEndsWith(const char *filename, const char *ext); + +/* Callback function for libdb to spit error info into our log */ +static void dblayer_log_print(const char* prefix, char *buffer) +{ + /* We ignore the prefix since we know who we are anyway */ + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "libdb: %s\n", buffer); +} + +static PRLock *cl5_diskfull_lock = NULL; +static int cl5_diskfull_flag = 0; + +static void cl5_set_diskfull(); +static void cl5_set_no_diskfull(); + +/***** Module APIs *****/ + +/* Name: cl5Init + Description: initializes changelog module; must be called by a single thread + before any other changelog function. + Parameters: none + Return: CL5_SUCCESS if function is successful; + CL5_SYSTEM_ERROR error if NSPR call fails. + */ +int cl5Init () +{ + s_cl5Desc.stLock = PR_NewRWLock(PR_RWLOCK_RANK_NONE, "state_lock"); + if (s_cl5Desc.stLock == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Init: failed to create state lock; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + if ((s_cl5Desc.clLock = PR_NewLock()) == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Init: failed to create on close lock; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + + } + if ((s_cl5Desc.clCvar = PR_NewCondVar(s_cl5Desc.clLock)) == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Init: failed to create on close cvar; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + if (( clcache_init (&s_cl5Desc.dbEnv) != 0 )) { + return CL5_SYSTEM_ERROR; + } + + s_cl5Desc.dbState = CL5_STATE_CLOSED; + s_cl5Desc.fatalError = PR_FALSE; + s_cl5Desc.dbRmOnClose = PR_FALSE; + s_cl5Desc.threadCount = 0; + + if (NULL == cl5_diskfull_lock) + { + cl5_diskfull_lock = PR_NewLock (); + } + + return CL5_SUCCESS; +} + +/* Name: cl5Cleanup + Description: performs cleanup of the changelog module; must be called by a single + thread; it closes changelog if it is still open. + Parameters: none + Return: none + */ +void cl5Cleanup () +{ + /* close db if it is still open */ + if (s_cl5Desc.dbState == CL5_STATE_OPEN) + { + cl5Close (); + } + + if (s_cl5Desc.stLock) + PR_DestroyRWLock (s_cl5Desc.stLock); + s_cl5Desc.stLock = NULL; + + if (cl5_diskfull_lock) + { + PR_DestroyLock (cl5_diskfull_lock); + cl5_diskfull_lock = NULL; + } + + memset (&s_cl5Desc, 0, sizeof (s_cl5Desc)); +} + +/* Name: cl5Open + Description: opens changelog; must be called after changelog is + initialized using cl5Init. It is thread safe and the second + call is ignored. + Parameters: dir - changelog dir + config - db configuration parameters; currently not used + Return: CL5_SUCCESS if successfull; + CL5_BAD_DATA if invalid directory is passed; + CL5_BAD_STATE if changelog is not initialized; + CL5_BAD_DBVERSION if dbversion file is missing or has unexpected data + CL5_SYSTEM_ERROR if NSPR error occured (during db directory creation); + CL5_MEMORY_ERROR if memory allocation fails; + CL5_DB_ERROR if db initialization fails. + */ +int cl5Open (const char *dir, const CL5DBConfig *config) +{ + int rc; + + if (dir == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5Open: null directory\n"); + return CL5_BAD_DATA; + } + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Open: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* prevent state from changing */ + PR_RWLock_Wlock (s_cl5Desc.stLock); + + /* already open - ignore */ + if (s_cl5Desc.dbState == CL5_STATE_OPEN) + { + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "cl5Open: changelog already opened; request ignored\n"); + rc = CL5_SUCCESS; + goto done; + } + else if (s_cl5Desc.dbState != CL5_STATE_CLOSED) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Open: invalid state - %d\n", s_cl5Desc.dbState); + rc = CL5_BAD_STATE; + goto done; + } + + rc = _cl5Open (dir, config, CL5_OPEN_NORMAL); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Open: failed to open changelog\n"); + goto done; + } + + /* dispatch global threads like deadlock detection, trimming, etc */ + rc = _cl5DispatchDBThreads (); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Open: failed to start database monitoring threads\n"); + + _cl5Close (); + } + else + { + s_cl5Desc.dbState = CL5_STATE_OPEN; + } + +done:; + PR_RWLock_Unlock (s_cl5Desc.stLock); + + return rc; +} + +/* Name: cl5Close + Description: closes changelog; waits until all threads are done using changelog; + call is ignored if changelog is already closed. + Parameters: none + Return: CL5_SUCCESS if successful; + CL5_BAD_STATE if db is not in the open or closed state; + CL5_SYSTEM_ERROR if NSPR call fails; + CL5_DB_ERROR if db shutdown fails + */ +int cl5Close () +{ + int rc = CL5_SUCCESS; + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5Close: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + PR_RWLock_Wlock (s_cl5Desc.stLock); + + /* already closed - ignore */ + if (s_cl5Desc.dbState == CL5_STATE_CLOSED) + { + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "cl5Close: changelog closed; request ignored\n"); + PR_RWLock_Unlock (s_cl5Desc.stLock); + return CL5_SUCCESS; + } + else if (s_cl5Desc.dbState != CL5_STATE_OPEN) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5Close: invalid state - %d\n", s_cl5Desc.dbState); + PR_RWLock_Unlock (s_cl5Desc.stLock); + return CL5_BAD_STATE; + } + + /* signal changelog closing to all threads */ + s_cl5Desc.dbState = CL5_STATE_CLOSING; + + PR_Lock(s_cl5Desc.clLock); + PR_NotifyCondVar(s_cl5Desc.clCvar); + PR_Unlock(s_cl5Desc.clLock); + + _cl5Close (); + + s_cl5Desc.dbState = CL5_STATE_CLOSED; + + PR_RWLock_Unlock (s_cl5Desc.stLock); + + return rc; +} + +/* Name: cl5Delete + Description: removes changelog; changelog must be in the closed state. + Parameters: dir - changelog directory + Return: CL5_SUCCESS if successful; + CL5_BAD_STATE if the changelog is not in closed state; + CL5_BAD_DATA if invalid directory supplied + CL5_SYSTEM_ERROR if NSPR call fails + */ +int cl5Delete (const char *dir) +{ + int rc; + + if (dir == NULL) + { + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, "cl5Delete: null directory\n"); + return CL5_BAD_DATA; + } + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5Delete: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + PR_RWLock_Wlock (s_cl5Desc.stLock); + + if (s_cl5Desc.dbState != CL5_STATE_CLOSED) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5Delete: invalid state - %d\n", s_cl5Desc.dbState); + PR_RWLock_Unlock (s_cl5Desc.stLock); + return CL5_BAD_STATE; + } + + rc = _cl5Delete (dir, PR_TRUE /* remove changelog dir */); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5Delete: failed to remove changelog\n"); + } + + PR_RWLock_Unlock (s_cl5Desc.stLock); + return rc; +} + +/* Name: cl5OpenDB + Description: opens changelog file for specified file + Parameters: replica - replica whose file we wish to open + Return: CL5_SUCCESS if successful; + CL5_BAD_STATE if the changelog is not initialized; + CL5_BAD_DATA - if NULL id is supplied + */ +int cl5OpenDB (Object *replica) +{ + int rc; + + if (replica == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5OpenDB: null replica\n"); + return CL5_BAD_DATA; + } + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5OpenDB: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure that changelog stays open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return rc; + + rc = _cl5DBOpenFile (replica, NULL /* file object */, PR_TRUE /* check for duplicates */); + + _cl5RemoveThread (); + + return rc; +} + +/* Name: cl5CloseDB + Description: closes changelog file for the specified replica + Parameters: replica - replica whose file we wish to close + Return: CL5_SUCCESS if successful; + CL5_BAD_STATE if the changelog is not initialized; + CL5_BAD_DATA - if NULL id is supplied + CL5_NOTFOUND - nothing is known about specified database + */ +int cl5CloseDB (Object *replica) +{ + int rc; + Object *obj; + + if (replica == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5CloseDB: null replica\n"); + return CL5_BAD_DATA; + } + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5CloseDB: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure that changelog is open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return rc; + + rc = _cl5GetDBFile (replica, &obj); + if (rc == CL5_SUCCESS) + { + rc = objset_remove_obj(s_cl5Desc.dbFiles, obj); + object_release (obj); + } + else + { + Replica *r; + + r = (Replica*)object_get_data (replica); + PR_ASSERT (r); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5CloseDB: failed to close file for replica at (%s)\n", + slapi_sdn_get_dn (replica_get_root (r))); + } + + _cl5RemoveThread (); + return rc; +} + +/* Name: cl5DeleteDB + Description: asynchronously removes changelog file for the specified replica. + The file is physically removed when it is no longer in use. + This function is called when a backend is removed or reloaded. + Parameters: replica - replica whose file we wish to delete + Return: CL5_SUCCESS if successful; + CL5_BAD_STATE if the changelog is not initialized; + CL5_BAD_DATA - if NULL id is supplied + CL5_NOTFOUND - nothing is known about specified database + */ +int cl5DeleteDB (Object *replica) +{ + Object *obj; + int rc; + + if (replica == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5DeleteDB: invalid database id\n"); + return CL5_BAD_DATA; + } + + /* changelog is not initialized */ + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDB: " + "changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure that changelog stays open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return rc; + + rc = _cl5GetDBFile (replica, &obj); + if (rc == CL5_SUCCESS) + { + _cl5DBDeleteFile (obj); + } + else + { + Replica *r = (Replica*)object_get_data (replica); + PR_ASSERT (r); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDB: " + "file for replica at (%s) not found\n", + slapi_sdn_get_dn (replica_get_root (r))); + } + + _cl5RemoveThread (); + return rc; +} + +/* Name: cl5DeleteDBSync + Description: The same as cl5DeleteDB except the function does not return + until the file is removed. +*/ +int cl5DeleteDBSync (Object *replica) +{ + Object *obj; + int rc; + CL5DBFile *file; + char fName [MAXPATHLEN + 1]; + + if (replica == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5DeleteDBSync: invalid database id\n"); + return CL5_BAD_DATA; + } + + /* changelog is not initialized */ + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDBSync: " + "changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure that changelog stays open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return rc; + + rc = _cl5GetDBFile (replica, &obj); + if (rc == CL5_SUCCESS) + { + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + + PR_snprintf (fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, file->name); + + _cl5DBDeleteFile (obj); + + /* wait until the file is gone */ + while (PR_Access (fName, PR_ACCESS_EXISTS) == PR_SUCCESS) + { + DS_Sleep (PR_MillisecondsToInterval(100)); + } + + } + else + { + Replica *r = (Replica*)object_get_data (replica); + PR_ASSERT (r); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDBSync: " + "file for replica at (%s) not found\n", + slapi_sdn_get_dn (replica_get_root (r))); + } + + _cl5RemoveThread (); + return rc; +} + +/* Name: cl5GetUpperBoundRUV + Description: retrieves vector for that represnts the upper bound of the changes for a replica. + Parameters: r - replica for which the purge vector is requested + ruv - contains a copy of the purge ruv if function is successful; + unchanged otherwise. It is responsobility pf the caller to free + the ruv when it is no longer is in use + Return: CL5_SUCCESS if function is successfull + CL5_BAD_STATE if the changelog is not initialized; + CL5_BAD_DATA - if NULL id is supplied + CL5_NOTFOUND, if changelog file for replica is not found + */ +int cl5GetUpperBoundRUV (Replica *r, RUV **ruv) +{ + int rc; + Object *r_obj, *file_obj; + CL5DBFile *file; + + if (r == NULL || ruv == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5GetUpperBoundRUV: invalid parameters\n"); + return CL5_BAD_DATA; + } + + /* changelog is not initialized */ + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetUpperBoundRUV: " + "changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure that changelog stays open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return rc; + + /* create a temporary replica object because of the interface we have */ + r_obj = object_new (r, NULL); + + rc = _cl5GetDBFile (r_obj, &file_obj); + if (rc == CL5_SUCCESS) + { + file = (CL5DBFile*)object_get_data (file_obj); + PR_ASSERT (file && file->maxRUV); + + *ruv = ruv_dup (file->maxRUV); + + object_release (file_obj); + } + + object_release (r_obj); + + _cl5RemoveThread (); + return rc; +} + +/* Name: cl5Backup + Description: makes a backup of the changelog including *.db2, + log files, and dbversion. Can be called with the changelog in either open or + closed state. + Parameters: bkDir - directory to which the data is backed up; + created if it does not exist + replicas - optional list of replicas whose changes should be backed up; + if the list is NULL, entire changelog is backed up. + Return: CL5_SUCCESS if function is successful; + CL5_BAD_DATA if invalid directory is passed; + CL5_BAD_STATE if changelog has not been initialized; + CL5_DB_ERROR if db call fails; + CL5_SYSTEM_ERROR if NSPR call or file copy failes. + */ +int cl5Backup (const char *bkDir, Object **replicas) +{ + int rc; + char **list = NULL; + char **logFile; + char srcFile [MAXPATHLEN + 1]; + char destFile[MAXPATHLEN + 1]; + DB_TXN *txn = NULL; + + if (bkDir == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5Backup: null backup directory\n"); + return CL5_BAD_DATA; + } + + /* changelog must be initialized */ + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Backup: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure that changelog stays open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return rc; + + /* create backup directory if necessary */ + rc = cl5CreateDirIfNeeded (bkDir); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Backup: failed to create backup directory\n"); + goto done; + } + + /* start transaction to tempararily prevent transaction log + from being trimmed + */ + rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL /*pid*/, &txn, 0); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Backup: failed to begin transaction; db error - %d %s\n", + rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + goto done; + } + + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "cl5Backup: starting changelog backup from %s to %s ...\n", s_cl5Desc.dbDir, bkDir); + + /* The following files are backed up: *.<dbext>, log files, dbversion file */ + + /* copy db file */ + /* ONREPL currently, list of replicas is ignored because db code can't handle + discrepancy between transaction log and present files; should be fixed before 5.0 ships */ + rc = _cl5CopyDBFiles (s_cl5Desc.dbDir, bkDir, replicas); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Backup : failed to copy database files from %s to %s\n", s_cl5Desc.dbDir, bkDir); + goto done; + } + + /* copy db log files */ + rc = LOG_ARCHIVE(s_cl5Desc.dbEnv, &list, DB_ARCH_LOG, malloc); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Backup: failed to get list of log files; db error - %d %s\n", + rc, db_strerror(rc)); + rc = CL5_SYSTEM_ERROR; + goto done; + } + + if (list) + { + logFile = list; + while (*logFile) + { + PR_snprintf(srcFile, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, *logFile); + PR_snprintf(destFile, MAXPATHLEN, "%s/%s", bkDir, *logFile); + rc = copyfile(srcFile, destFile, 0, FILE_CREATE_MODE); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Backup: failed to copy %s\n", *logFile); + rc = CL5_SYSTEM_ERROR; + goto done; + } + + logFile ++; + } + + free(list); + } + + /* now, copy the version file */ + PR_snprintf(srcFile, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, VERSION_FILE); + PR_snprintf(destFile, MAXPATHLEN, "%s/%s", bkDir, VERSION_FILE); + rc = copyfile(srcFile, destFile, 0, FILE_CREATE_MODE); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Backup: failed to copy %s\n", VERSION_FILE); + rc = CL5_SYSTEM_ERROR; + goto done; + } + + rc = CL5_SUCCESS; + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "cl5Backup: changelog backup is finished \n"); +done:; + if (txn && TXN_ABORT (txn) != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Backup: failed to abort transaction; db error - %d %s\n", + rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + } + + _cl5RemoveThread (); + + return rc; +} + +/* Name: cl5Restore + Description: restores changelog from the backed up copy. Changelog must be ibnitalized and closed. + Parameters: clDir - changelog dir + bkDir - directory that contains the backup + replicas - optional list of replicas whose changes should be recovered; + if the list is NULL, entire changelog is recovered. + Return: CL5_SUCCESS if function is successfull; + CL5_BAD_DATA if invalid parameter is passed; + CL5_BAD_STATE if changelog is open or not initialized; + CL5_DB_ERROR if db call fails; + CL5_SYSTEM_ERROR if NSPR call of file copy fails + */ +int cl5Restore (const char *clDir, const char *bkDir, Object **replicas) +{ + int rc; + char srcFile[MAXPATHLEN + 1]; + char destFile[MAXPATHLEN + 1]; + PRDir *prDir; + PRDirEntry *prDirEntry; + int seenLog = 0; /* Tells us if we restored any logfiles */ + + if (clDir == NULL || bkDir == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5Restore: null parameter\n"); + return CL5_BAD_DATA; + } + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Restore: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* prevent state change while recovery is in progress */ + PR_RWLock_Wlock (s_cl5Desc.stLock); + + if (s_cl5Desc.dbState != CL5_STATE_CLOSED) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Restore: changelog must be closed\n"); + PR_RWLock_Unlock (s_cl5Desc.stLock); + return CL5_BAD_STATE; + } + + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "cl5Restore: starting changelog recovery from %s to %s ...\n", bkDir, clDir); + + /* delete current changelog content */ + rc = _cl5Delete (clDir, PR_FALSE); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Restore: failed to remove changelog\n"); + goto done; + } + + /* We copy the files over from the staging area */ + prDir = PR_OpenDir(bkDir); + if (prDir == NULL) + { + rc = CL5_SYSTEM_ERROR; + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Restore: unable to access backup directory %s; NSPR error - %d\n", + bkDir, PR_GetError ()); + goto done; + } + + while (NULL != (prDirEntry = PR_ReadDir(prDir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) + { + if (NULL == prDirEntry->name) /* NSPR doesn't behave like the docs say it should */ + { + break; + } + + /* Log files have names of the form "log.xxxxx". We detect these by looking for + the prefix "log." and the lack of the ".<dbext>" suffix */ + seenLog |= _cl5IsLogFile(prDirEntry->name); + + /* ONREPL currently, list of replicas is ignored because db code can't handle discrepancy + between transaction log and present files; this should change before 5.0 ships */ + PR_snprintf(destFile, MAXPATHLEN, "%s/%s", clDir, prDirEntry->name); + PR_snprintf(srcFile, MAXPATHLEN, "%s/%s", bkDir, prDirEntry->name); + rc = copyfile(srcFile, destFile, 0, FILE_CREATE_MODE); + if (rc != 0) + { + rc = CL5_SYSTEM_ERROR; + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Restore: failed to copy %s\n", prDirEntry->name); + PR_CloseDir(prDir); + goto done; + } + } + + PR_CloseDir(prDir); + + /* now open and close changelog to create all necessary files */ + if (seenLog) + rc = _cl5Open (clDir, NULL, CL5_OPEN_RESTORE_RECOVER); + else + rc = _cl5Open (clDir, NULL, CL5_OPEN_RESTORE); + + if (rc == CL5_SUCCESS) + { + _cl5Close (); + + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "cl5Restore: changelog recovery is finished \n"); + } + else + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5Restore: failed open changelog after recovery\n"); + } + +done:; + PR_RWLock_Unlock (s_cl5Desc.stLock); + return rc; +} + +/* Name: cl5ExportLDIF + Description: dumps changelog to an LDIF file; changelog can be open or closed. + Parameters: clDir - changelog dir + ldifFile - full path to ldif file to write + replicas - optional list of replicas whose changes should be exported; + if the list is NULL, entire changelog is exported. + Return: CL5_SUCCESS if function is successfull; + CL5_BAD_DATA if invalid parameter is passed; + CL5_BAD_STATE if changelog is not initialized; + CL5_DB_ERROR if db api fails; + CL5_SYSTEM_ERROR if NSPR call fails; + CL5_MEMORY_ERROR if memory allocation fials. + */ +int cl5ExportLDIF (const char *ldifFile, Object **replicas) +{ + int i; + int rc; + PRFileDesc *prFile = NULL; + Object *obj; + + if (ldifFile == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ExportLDIF: null ldif file name\n"); + return CL5_BAD_DATA; + } + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ExportLDIF: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure that changelog is open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return rc; + + prFile = PR_Open (ldifFile, PR_WRONLY | PR_CREATE_FILE | PR_TRUNCATE, 0600); + if (prFile == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ExportLDIF: failed to open (%s) file; NSPR error - %d\n", + ldifFile, PR_GetError ()); + rc = CL5_SYSTEM_ERROR; + goto done; + } + + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "cl5ExportLDIF: starting changelog export to (%s) ...\n", ldifFile); + + if (replicas) /* export only selected files */ + { + for (i = 0; replicas[i]; i++) + { + rc = _cl5GetDBFile (replicas[i], &obj); + if (rc == CL5_SUCCESS) + { + rc = _cl5ExportFile (prFile, obj); + object_release (obj); + } + else + { + Replica *r = (Replica*)object_get_data (replicas[i]); + + PR_ASSERT (r); + + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5ExportLDIF: " + "failed to locate changelog file for replica at (%s)\n", + slapi_sdn_get_dn (replica_get_root (r))); + } + } + } + else /* export all files */ + { + for (obj = objset_first_obj(s_cl5Desc.dbFiles); obj; + obj = objset_next_obj(s_cl5Desc.dbFiles, obj)) + { + rc = _cl5ExportFile (prFile, obj); + object_release (obj); + } + } + + rc = CL5_SUCCESS; +done:; + + _cl5RemoveThread (); + + if (rc == CL5_SUCCESS) + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "cl5ExportLDIF: changelog export is finished.\n"); + + if (prFile) + PR_Close (prFile); + + return rc; +} + +/* Name: cl5ImportLDIF + Description: imports ldif file into changelog; changelog must be in the closed state + Parameters: clDir - changelog dir + ldifFile - absolute path to the ldif file to import + replicas - optional list of replicas whose data should be imported; + if the list is NULL, all data in the file is imported. + Return: CL5_SUCCESS if function is successfull; + CL5_BAD_DATA if invalid parameter is passed; + CL5_BAD_STATE if changelog is open or not inititalized; + CL5_DB_ERROR if db api fails; + CL5_SYSTEM_ERROR if NSPR call fails; + CL5_MEMORY_ERROR if memory allocation fials. + */ +int cl5ImportLDIF (const char *clDir, const char *ldifFile, Object **replicas) +{ + FILE *file; + int rc; + char *buff; + int lineno = 0; + slapi_operation_parameters op; + Object *replica = NULL; + char *replGen = NULL; + + /* validate params */ + if (ldifFile == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ImportLDIF: null ldif file name\n"); + return CL5_BAD_DATA; + } + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ImportLDIF: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure that nobody change changelog state while import is in progress */ + PR_RWLock_Wlock (s_cl5Desc.stLock); + + /* make sure changelog is closed */ + if (s_cl5Desc.dbState != CL5_STATE_CLOSED) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ImportLDIF: invalid state - %d \n", s_cl5Desc.dbState); + + PR_RWLock_Unlock (s_cl5Desc.stLock); + return CL5_BAD_STATE; + } + + /* open LDIF file */ + file = fopen (ldifFile, "r"); /* XXXggood Does fopen reliably work if > 255 files open? */ + if (file == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ImportLDIF: failed to open (%s) ldif file; system error - %d\n", + ldifFile, errno); + rc = CL5_SYSTEM_ERROR; + goto done; + } + + /* remove changelog */ + rc = _cl5Delete (clDir, PR_FALSE); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ImportLDIF: failed to remove changelog\n"); + goto done; + } + + /* open changelog */ + rc = _cl5Open (clDir, NULL, CL5_OPEN_LDIF2CL); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ImportLDIF: failed to open changelog\n"); + goto done; + } + + /* read entries and write them to changelog */ + while ((buff = ldif_get_entry( file, &lineno )) != NULL) + { + rc = _cl5LDIF2Operation (buff, &op, &replGen); + slapi_ch_free ((void**)&buff); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ImportLDIF: failed to convert LDIF fragment to LDAP operation; " + "end of fragment line number - %d\n", lineno); + goto done; + } + + /* if we perform selective import, check if the operation should be wriiten to changelog */ + replica = _cl5GetReplica (&op, replGen); + if (replica == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ImportLDIF: failed to locate replica for target dn (%s) and " + "replica generation %s\n", op.target_address.dn, replGen); + + slapi_ch_free ((void**)&replGen); + operation_parameters_done (&op); + goto done; + } + + if (!replicas || _cl5ReplicaInList (replica, replicas)) + { + /* write operation creates the file if it does not exist */ + rc = _cl5WriteOperation (replica_get_name ((Replica*)object_get_data(replica)), + replGen, &op, 1); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ImportLDIF: failed to write operation to the changelog\n"); + object_release (replica); + slapi_ch_free ((void**)&replGen); + operation_parameters_done (&op); + goto done; + } + } + + object_release (replica); + slapi_ch_free ((void**)&replGen); + operation_parameters_done (&op); + } + +done:; + _cl5Close (); + PR_RWLock_Unlock (s_cl5Desc.stLock); + return rc; +} + +/* Name: cl5GetState + Description: returns database state + Parameters: none + Return: changelog state + */ +int cl5GetState () +{ + return s_cl5Desc.dbState; +} + +/* Name: cl5ConfigTrimming + Description: sets changelog trimming parameters; changelog must be open. + Parameters: maxEntries - maximum number of entries in the chnagelog (in all files); + maxAge - maximum entry age; + Return: CL5_SUCCESS if successful; + CL5_BAD_STATE if changelog is not open + */ +int cl5ConfigTrimming (int maxEntries, const char *maxAge) +{ + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5ConfigTrimming: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure changelog is not closed while trimming configuration + is updated.*/ + _cl5AddThread (); + + PR_Lock (s_cl5Desc.dbTrim.lock); + + if (maxAge) + { + /* don't ignore this argument */ + if (strcmp (maxAge, CL5_STR_IGNORE) != 0) + { + s_cl5Desc.dbTrim.maxAge = age_str2time (maxAge); + } + } + else + { + /* unlimited */ + s_cl5Desc.dbTrim.maxAge = 0; + } + + if (maxEntries != CL5_NUM_IGNORE) + { + s_cl5Desc.dbTrim.maxEntries = maxEntries; + } + + PR_Unlock (s_cl5Desc.dbTrim.lock); + + _cl5RemoveThread (); + + return CL5_SUCCESS; +} + +/* Name: cl5GetOperation + Description: retireves operation specified by its csn and databaseid + Parameters: op - must contain csn and databaseid; the rest of data is + filled if function is successfull + Return: CL5_SUCCESS if function is successfull; + CL5_BAD_DATA if invalid op is passed; + CL5_BAD_STATE if db has not been initialized; + CL5_NOTFOUND if entry was not found; + CL5_DB_ERROR if any other db error occured; + CL5_BADFORMAT if db data format does not match entry format. + */ +int cl5GetOperation (Object *replica, slapi_operation_parameters *op) +{ + int rc; + char *agmt_name; + + agmt_name = get_thread_private_agmtname(); + + if (replica == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetOperation: NULL replica\n"); + return CL5_BAD_DATA; + } + + if (op == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetOperation: NULL operation\n"); + return CL5_BAD_DATA; + } + + if (op->csn == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "%s: cl5GetOperation: operation contains no CSN\n", agmt_name); + return CL5_BAD_DATA; + } + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "%s: cl5GetOperation: changelog is not initialized\n", agmt_name); + return CL5_BAD_STATE; + } + + /* make sure that changelog is open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return rc; + + rc = _cl5GetOperation (replica, op); + + _cl5RemoveThread (); + + return rc; +} + +/* Name: cl5GetFirstOperation + Description: retrieves first operation for a particular database + replica - replica for which the operation should be retrieved. + Parameters: op - buffer to store the operation; + iterator - to be passed to the call to cl5GetNextOperation + Return: CL5_SUCCESS, if successful + CL5_BADDATA, if operation is NULL + CL5_BAD_STATE, if changelog is not open + CL5_DB_ERROR, if db call fails + */ +int cl5GetFirstOperation (Object *replica, slapi_operation_parameters *op, void **iterator) +{ + int rc; + CL5Entry entry; + Object *obj; + char *agmt_name; + + if (replica == NULL || op == NULL || iterator == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5GetFirstOperation: invalid argument\n"); + return CL5_BAD_DATA; + } + + *iterator = NULL; + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + agmt_name = get_thread_private_agmtname(); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "%s: cl5GetFirstOperation: changelog is not initialized\n", agmt_name); + return CL5_BAD_STATE; + } + + /* make sure that changelog stays open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return rc; + + rc = _cl5GetDBFile (replica, &obj); + if (rc != CL5_SUCCESS) + { + _cl5RemoveThread (); + return rc; + } + + entry.op = op; + /* Callers of this function should cl5_operation_parameters_done(op) */ + rc = _cl5GetFirstEntry (obj, &entry, iterator, NULL); + object_release (obj); + + _cl5RemoveThread (); + + return rc; +} + +/* Name: cl5GetNextOperation + Description: retrieves the next op from the changelog as defined by the iterator; + changelog must be open. + Parameters: op - returned operation, if function is successful + iterator - in: identifies op to retrieve; out: identifies next op + Return: CL5_SUCCESS, if successful + CL5_BADDATA, if op is NULL + CL5_BAD_STATE, if changelog is not open + CL5_NOTFOUND, empty changelog + CL5_DB_ERROR, if db call fails + */ +int cl5GetNextOperation (slapi_operation_parameters *op, void *iterator) +{ + CL5Entry entry; + + if (op == NULL || iterator == NULL || !_cl5IsValidIterator (iterator)) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5GetNextOperation: invalid argument\n"); + return CL5_BAD_DATA; + } + + if (s_cl5Desc.dbState != CL5_STATE_OPEN) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5GetNextOperation: changelog is not open\n"); + return CL5_BAD_STATE; + } + + /* we don't need to increment thread count since cl5GetFirstOperation + locked the file through which we are iterating */ + entry.op = op; + /* Callers of this function should cl5_operation_parameters_done(op) */ + return _cl5GetNextEntry (&entry, iterator); +} + +/* Name: cl5DestroyIterator + Description: destroys iterator once iteration through changelog is done + Parameters: iterator - iterator to destroy + Return: none + */ +void cl5DestroyIterator (void *iterator) +{ + CL5Iterator *it = (CL5Iterator*)iterator; + + if (it == NULL) + return; + + /* close cursor */ + if (it->cursor) + it->cursor->c_close (it->cursor); + + if (it->file) + object_release (it->file); + + slapi_ch_free ((void**)&it); +} + +/* Name: cl5WriteOperation + Description: writes operation to changelog + Parameters: replName - name of the replica to which operation applies + replGen - replica generation for the operation + !!!Note that we pass name and generation rather than + replica object since generation can change while operation + is in progress (if the data is reloaded). !!! + op - operation to write + local - this is a non-replicated operation + Return: CL5_SUCCESS if function is successfull; + CL5_BAD_DATA if invalid op is passed; + CL5_BAD_STATE if db has not been initialized; + CL5_MEMORY_ERROR if memory allocation failed; + CL5_DB_ERROR if any other db error occured; + */ +int cl5WriteOperation(const char *replName, const char *replGen, + const slapi_operation_parameters *op, PRBool local) +{ + int rc; + + if (op == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5WriteOperation: NULL operation passed\n"); + return CL5_BAD_DATA; + } + + if (!IsValidOperation (op)) + { + return CL5_BAD_DATA; + } + + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5WriteOperation: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure that changelog is open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return rc; + + rc = _cl5WriteOperation(replName, replGen, op, local); + + /* update the upper bound ruv vector */ + if (rc == CL5_SUCCESS) + { + Object *file_obj = NULL; + + if ( _cl5GetDBFileByReplicaName (replName, replGen, &file_obj) == CL5_SUCCESS) { + rc = _cl5UpdateRUV (file_obj, op->csn, PR_FALSE, PR_FALSE); + object_release (file_obj); + } + + } + + _cl5RemoveThread (); + + return rc; +} + +/* Name: cl5CreateReplayIterator + Description: creates an iterator that allows to retireve changes that should + to be sent to the consumer identified by ruv. The iteration is peformed by + repeated calls to cl5GetNextOperationToReplay. + Parameters: replica - replica whose data we wish to iterate; + ruv - consumer ruv; + iterator - iterator to be passed to cl5GetNextOperationToReplay call + Return: CL5_SUCCESS, if function is successfull; + CL5_MISSING_DATA, if data that should be in the changelog is missing + CL5_PURGED_DATA, if some data that consumer needs has been purged. + Note that the iterator can be non null if the supplier contains + some data that needs to be sent to the consumer + CL5_NOTFOUND if the consumer is up to data with respect to the supplier + CL5_BAD_DATA if invalid parameter is passed; + CL5_BAD_STATE if db has not been open; + CL5_DB_ERROR if any other db error occured; + CL5_MEMORY_ERROR if memory allocation fails. + Algorithm: Build a list of csns from consumer's and supplier's ruv. For each element + of the consumer's ruv put max csn into the csn list. For each element + of the supplier's ruv not in the consumer's ruv put min csn from the + supplier's ruv into the list. The list contains, for each known replica, + the starting point for changes to be sent to the consumer. + Sort the list in accending order. + Build a hash which contains, for each known replica, whether the + supplier can bring the consumer up to data with respect to that replica. + The hash is used to decide whether a change can be sent to the consumer + Find the replica with the smallest csn in the list for which + we can bring the consumer up to date. + Position the db cursor on the change entry that corresponds to this csn. + Hash entries are created for each replica traversed so far. sendChanges + flag is set to FALSE for all repolicas except the last traversed. + + */ +int cl5CreateReplayIterator (Private_Repl_Protocol *prp, const RUV *consumerRuv, + CL5ReplayIterator **iterator) +{ + int rc; + Object *replica; + Object *obj = NULL; + + replica = prp->replica_object; + if (replica == NULL || consumerRuv == NULL || iterator == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5CreateReplayIterator: invalid parameter\n"); + return CL5_BAD_DATA; + } + + *iterator = NULL; + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5CreateReplayIterator: changelog is not initialized\n"); + return CL5_BAD_STATE; + } + + /* make sure that changelog is open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS ) return rc; + + + rc = _cl5GetDBFile (replica, &obj); + if (rc == CL5_SUCCESS) + { + /* iterate through the ruv in csn order to find first master for which + we can replay changes */ + ReplicaId consumerRID = agmt_get_consumer_rid ( prp->agmt, prp->conn ); + rc = _cl5PositionCursorForReplay (consumerRID, consumerRuv, replica, obj, iterator); + if (rc != CL5_SUCCESS) + { + if (obj) + object_release (obj); + } + } + + _cl5RemoveThread (); + + return rc; +} + +/* Name: cl5GetNextOperationToReplay + Description: retrieves next operation to be sent to a particular consumer and + that was created on a particular master. Consumer and master info + is encoded in the iterator parameter that must be created by call + to cl5CreateReplayIterator. + Parameters: iterator - iterator that identifies next entry to retrieve; + op - operation retrieved if function is successful + Return: CL5_SUCCESS if function is successfull; + CL5_BAD_DATA if invalid parameter is passed; + CL5_NOTFOUND if end of iteration list is reached + CL5_DB_ERROR if any other db error occured; + CL5_BADFORMAT if data in db is of unrecognized format; + CL5_MEMORY_ERROR if memory allocation fails. + Algorithm: Iterate through changelog entries until a change is found that + originated at the replica for which we are sending changes + (based on the information in the iteration hash) and + whose csn is larger than the csn already seen by the consumer + If change originated at the replica not in the hash, + determine whether we should send changes originated at the replica + and add replica entry into the hash. We can send the changes for + the replica if the current csn is smaller or equal to the csn + in the consumer's ruv (if present) or if it is equal to the min + csn in the supplier's ruv. + */ +int +cl5GetNextOperationToReplay (CL5ReplayIterator *iterator, CL5Entry *entry) +{ + CSN *csn; + char *key, *data; + size_t keylen, datalen; + char *agmt_name; + int rc = 0; + + agmt_name = get_thread_private_agmtname(); + + if (entry == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "%s: cl5GetNextOperationToReplay: invalid parameter passed\n", agmt_name); + return CL5_BAD_DATA; + } + + rc = clcache_get_next_change (iterator->clcache, (void **)&key, &keylen, (void **)&data, &datalen, &csn); + + if (rc == DB_NOTFOUND) + { + /* + * Abort means we've figured out that we've passed the replica Min CSN, + * so we should stop looping through the changelog + */ + return CL5_NOTFOUND; + } + + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, NULL, "%s: cl5GetNextOperationToReplay: " + "failed to read next entry; DB error %d\n", agmt_name, rc); + return CL5_DB_ERROR; + } + + /* there is an entry we should return */ + /* Callers of this function should cl5_operation_parameters_done(op) */ + if ( 0 != cl5DBData2Entry ( data, datalen, entry ) ) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "%s: cl5GetNextOperationToReplay: failed to format entry rc=%d\n", agmt_name, rc); + return rc; + } + + return CL5_SUCCESS; +} + +/* Name: cl5DestroyReplayIterator + Description: destorys iterator + Parameters: iterator - iterator to destory + Return: none + */ +void cl5DestroyReplayIterator (CL5ReplayIterator **iterator) +{ + if (iterator == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5DestroyReplayIterator: invalid iterartor passed\n"); + return; + } + + clcache_return_buffer ( &(*iterator)->clcache ); + + if ((*iterator)->fileObj) + object_release ((*iterator)->fileObj); + + /* release supplier's ruv */ + if ((*iterator)->supplierRuvObj) + object_release ((*iterator)->supplierRuvObj); + + slapi_ch_free ((void **)iterator); +} + +/* Name: cl5DeleteOnClose + Description: marks changelog for deletion when it is closed + Parameters: flag; if flag = 1 then delete else don't + Return: none + */ +void cl5DeleteOnClose (PRBool rm) +{ + s_cl5Desc.dbRmOnClose = rm; +} + +/* Name: cl5GetDir + Description: returns changelog directory + Parameters: none + Return: copy of the directory; caller needs to free the string + */ + char *cl5GetDir () +{ + if (s_cl5Desc.dbDir == NULL) + { + return NULL; + } + else + { + return slapi_ch_strdup (s_cl5Desc.dbDir); + } +} + +/* Name: cl5Exist + Description: checks if a changelog exists in the specified directory; + We consider changelog to exist if it contains the dbversion file. + Parameters: clDir - directory to check + Return: 1 - if changelog exists; 0 - otherwise + */ +PRBool cl5Exist (const char *clDir) +{ + char fName [MAXPATHLEN + 1]; + int rc; + + PR_snprintf (fName, MAXPATHLEN, "%s/%s", clDir, VERSION_FILE); + rc = PR_Access (fName, PR_ACCESS_EXISTS); + + return (rc == PR_SUCCESS); +} + +/* Name: cl5GetOperationCount + Description: returns number of entries in the changelog. The changelog must be + open for the value to be meaningful. + Parameters: replica - optional parameter that specifies the replica whose operations + we wish to count; if NULL all changelog entries are counted + Return: number of entries in the changelog + */ + +int cl5GetOperationCount (Object *replica) +{ + Object *obj; + CL5DBFile *file; + int count = 0; + int rc; + + if (s_cl5Desc.dbState == CL5_STATE_NONE) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5GetOperationCount: changelog is not initialized\n"); + return -1; + } + + /* make sure that changelog is open while operation is in progress */ + rc = _cl5AddThread (); + if (rc != CL5_SUCCESS) + return -1; + + if (replica == NULL) /* compute total entry count */ + { + obj = objset_first_obj (s_cl5Desc.dbFiles); + while (obj) + { + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + count += file->entryCount; + obj = objset_next_obj (s_cl5Desc.dbFiles, obj); + } + } + else /* return count for particular db */ + { + /* select correct db file */ + rc = _cl5GetDBFile (replica, &obj); + if (rc == CL5_SUCCESS) + { + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + + count = file->entryCount; + object_release (obj); + } + else + { + count = 0; + } + } + + _cl5RemoveThread (); + return count; +} + +/***** Helper Functions *****/ + +/* this call happens under state lock */ +static int _cl5Open (const char *dir, const CL5DBConfig *config, CL5OpenMode openMode) +{ + int rc; + PRBool didRecovery; + + PR_ASSERT (dir); + + /* setup db configuration parameters */ + if (config) + { + _cl5SetDBConfig (config); + } + else + { + _cl5SetDefaultDBConfig (); + } + + /* initialize trimming */ + rc = _cl5TrimInit (); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5Open: failed to initialize trimming\n"); + goto done; + } + + /* create the changelog directory if it does not exist */ + rc = cl5CreateDirIfNeeded (dir); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5Open: failed to create changelog directory (%s)\n", dir); + goto done; + } + + s_cl5Desc.dbDir = slapi_ch_strdup (dir); + + /* check database version */ + rc = _cl5CheckDBVersion (); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5Open: invalid db version\n"); + goto done; + } + + s_cl5Desc.dbOpenMode = openMode; + + /* initialize db environment */ + rc = _cl5AppInit (&didRecovery); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5Open: failed to initialize db environment\n"); + goto done; + } + + /* open database files */ + rc = _cl5DBOpen (!didRecovery); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5Open: failed to open changelog database\n"); + + goto done; + } + +done:; + + if (rc != CL5_SUCCESS) + { + _cl5Close (); + } + + return rc; +} + +int cl5CreateDirIfNeeded (const char *dirName) +{ + int rc; + char buff [MAXPATHLEN + 1]; + char *t; + + PR_ASSERT (dirName); + + rc = PR_Access(dirName, PR_ACCESS_EXISTS); + if (rc == PR_SUCCESS) + { + return CL5_SUCCESS; + } + + /* directory does not exist - try to create */ + strncpy (buff, dirName, MAXPATHLEN); + t = strchr (buff, '/'); + + /* skip first slash */ + if (t) + { + t = strchr (t+1, '/'); + } + + while (t) + { + *t = '\0'; + if (PR_Access (buff, PR_ACCESS_EXISTS) != PR_SUCCESS) + { + rc = PR_MkDir (buff, DIR_CREATE_MODE); + if (rc != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5CreateDirIfNeeded: failed to create dir (%s); NSPR error - %d\n", + dirName, PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + } + + *t++ = FILE_PATHSEP; + + t = strchr (t, '/'); + } + + /* last piece */ + rc = PR_MkDir (buff, DIR_CREATE_MODE); + if (rc != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5CreateDirIfNeeded: failed to create dir; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + return CL5_SUCCESS; +} + +static int _cl5RemoveEnv () +{ + DB_ENV *dbEnv = NULL; + int rc = 0; + + if ((rc = db_env_create(&dbEnv, 0)) != 0) + dbEnv = NULL; + + if (dbEnv == NULL) + { + char *errstr = db_strerror(rc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5RemoveEnv: failed to allocate db environment; " + "db error - %d %s\n", rc, errstr ? errstr : "unknown"); + return CL5_MEMORY_ERROR; + } + rc = dbEnv->remove(dbEnv, s_cl5Desc.dbDir, DB_FORCE); + if (0 != rc) + { + char *errstr = db_strerror(rc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5AppInit: failed to remove db environment; " + "db error - %d %s\n", rc, errstr ? errstr : "unknown"); + return CL5_DB_ERROR; + } + return CL5_SUCCESS; +} + +static int _cl5AppInit (PRBool *didRecovery) +{ + int rc; + unsigned int flags = DB_CREATE | DB_INIT_MPOOL | DB_THREAD; + DB_ENV *dbEnv; + if ((rc = db_env_create(&dbEnv, 0)) != 0) + dbEnv = NULL; + + if (dbEnv == NULL) + { + char *errstr = db_strerror(rc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5AppInit: failed to allocate db environment; db error - %d (%s)\n", + rc, errstr ? errstr : "unknown"); + return CL5_MEMORY_ERROR; + } + + _cl5InitDBEnv (dbEnv); + + if (didRecovery) + *didRecovery = PR_FALSE; + + /* decide how two open based on the mode in which db is open */ + switch (s_cl5Desc.dbOpenMode) + { + case CL5_OPEN_NORMAL: + flags |= DB_INIT_LOCK | DB_INIT_TXN | DB_INIT_LOG; + /* check if need to initiate recovery */ + rc = _cl5CheckGuardian (); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5AppInit: recovering changelog after disorderly shutdown\n"); + flags |= DB_RECOVER; + } + break; + + case CL5_OPEN_RESTORE: + flags |= DB_INIT_LOCK | DB_INIT_TXN | DB_INIT_LOG; + break; + + case CL5_OPEN_CLEAN_RECOVER: + flags |= DB_INIT_LOCK | DB_INIT_TXN | DB_INIT_LOG | DB_RECOVER; + break; + + case CL5_OPEN_RESTORE_RECOVER: + flags |= DB_INIT_LOCK | DB_INIT_TXN | DB_INIT_LOG | DB_RECOVER_FATAL; + break; + + case CL5_OPEN_LDIF2CL: + /* ONREPL - don't think we need any extra flags here */ + break; + default: + /* fixme? CL5_OPEN_NONE */ + break; + } + + if (!s_cl5Desc.dbConfig.durableTrans) + { +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 3200 + dbEnv->set_flags(dbEnv, DB_TXN_NOSYNC, 1); +#else + flags |= DB_TXN_NOSYNC; +#endif + } + + dbEnv->set_errcall(dbEnv, dblayer_log_print); + + /* do recovery if necessary */ + if ((flags & DB_RECOVER) || (flags & DB_RECOVER_FATAL)) + { + if (CL5_OPEN_CLEAN_RECOVER == s_cl5Desc.dbOpenMode) + _cl5RemoveEnv(); + + rc = _cl5Recover (flags, dbEnv); + if (rc != CL5_SUCCESS) + { + char *errstr = db_strerror(rc); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5AppInit: failed to recover changelog; db error - %d %s\n", + rc, errstr ? errstr : "unknown"); + + slapi_ch_free ((void **)&dbEnv); + + return rc; + } + + if (didRecovery) + *didRecovery = PR_TRUE; + flags &= ~(DB_RECOVER | DB_RECOVER_FATAL); + /* Need to reset the env */ + /* Does this leak the dbEnv? */ + if ((rc = db_env_create(&dbEnv, 0)) != 0) + dbEnv = NULL; + + if (dbEnv == NULL) + { + char *errstr = db_strerror(rc); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5AppInit: failed to allocate db environment after recovery; " + "db error - %d %s\n", rc, errstr ? errstr : "unknown"); + return CL5_MEMORY_ERROR; + } + _cl5InitDBEnv (dbEnv); + } + + rc = dbEnv->open(dbEnv, s_cl5Desc.dbDir, flags, + s_cl5Desc.dbConfig.fileMode); + if (rc == 0) + { + s_cl5Desc.dbEnv = dbEnv; + s_cl5Desc.dbEnvOpenFlags = flags; + return CL5_SUCCESS; + } + else + { + char *errstr = db_strerror(rc); + char flagstr[20]; + + flagstr[0] = 0; + /* EINVAL return means bad flags - let's see what the flags are */ + if (rc == EINVAL) + { + sprintf(flagstr, "%u", flags); + } + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5AppInit: db environment open failed; db error - %d %s %s\n", + rc, errstr ? errstr : "unknown", flagstr); + slapi_ch_free ((void **)&dbEnv); + return CL5_DB_ERROR; + } +} + +static int _cl5DBOpen () +{ + PRBool dbFile; + PRDir *dir; + PRDirEntry *entry = NULL; + int rc; + Object *replica; + + /* create lock that guarantees that each file is only added once to the list */ + s_cl5Desc.fileLock = PR_NewLock (); + + /* loop over all db files and open them; file name format is cl5_<dbid>.<dbext> */ + dir = PR_OpenDir(s_cl5Desc.dbDir); + if (dir == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5DBOpen: failed to open changelog dir; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + + } + + /* initialize set of db file objects */ + s_cl5Desc.dbFiles = objset_new(NULL); + while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) + { + if (NULL == entry->name) + { + break; + } + + dbFile = _cl5FileName2Replica (entry->name, &replica); + if (dbFile) /* this is db file, not a log or dbversion; those are just skipped */ + { + /* we only open files for existing replicas */ + if (replica) + { + rc = _cl5DBOpenFile (replica, NULL /* file object */, + PR_FALSE /* check for duplicates */); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen: " + "Error opening file %s\n", + entry->name); + return rc; + } + + object_release (replica); + } + else /* there is no matching replica for the file - remove */ + { + char fullpathname[MAXPATHLEN]; + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen: " + "file %s has no matching replica; removing\n", entry->name); + + PR_snprintf(fullpathname, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, entry->name); + if (PR_Delete(fullpathname) != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen: " + "failed to remove (%s) file; NSPR error - %d\n", + entry->name, PR_GetError ()); + + } + } + } + } + + PR_CloseDir(dir); + + return CL5_SUCCESS; +} + +/* this function assumes that the entry was validated + using IsValidOperation + + Data in db format: + ------------------ + <1 byte version><1 byte change_type><sizeof time_t time><null terminated csn> + <null terminated uniqueid><null terminated targetdn> + [<null terminated newrdn><1 byte deleteoldrdn>][<4 byte mod count><mod1><mod2>....] + + mod format: + ----------- + <1 byte modop><null terminated attr name><4 byte value count> + <4 byte value size><value1><4 byte value size><value2> +*/ +static int _cl5Entry2DBData (const CL5Entry *entry, char **data, PRUint32 *len) +{ + int size = 1 /* version */ + 1 /* operation type */ + sizeof (time_t); + char *pos; + PRUint32 t; + slapi_operation_parameters *op; + LDAPMod **add_mods = NULL; + char *rawDN = NULL; + char s[CSN_STRSIZE]; + + PR_ASSERT (entry && entry->op && data && len); + + op = entry->op; + + /* compute size of the buffer needed to hold the data */ + size += CSN_STRSIZE; + size += strlen (op->target_address.uniqueid) + 1; + + switch (op->operation_type) + { + case SLAPI_OPERATION_ADD: if (op->p.p_add.parentuniqueid) + size += strlen (op->p.p_add.parentuniqueid) + 1; + else + size ++; /* we just store NULL char */ + slapi_entry2mods (op->p.p_add.target_entry, &rawDN/* dn */, &add_mods); + size += strlen (rawDN) + 1; + size += _cl5GetModsSize (add_mods); + break; + + case SLAPI_OPERATION_MODIFY: size += strlen (op->target_address.dn) + 1; + size += _cl5GetModsSize (op->p.p_modify.modify_mods); + break; + + case SLAPI_OPERATION_MODRDN: size += strlen (op->target_address.dn) + 1; + /* 1 for deleteoldrdn */ + size += strlen (op->p.p_modrdn.modrdn_newrdn) + 2; + if (op->p.p_modrdn.modrdn_newsuperior_address.dn) + size += strlen (op->p.p_modrdn.modrdn_newsuperior_address.dn) + 1; + else + size ++; /* for NULL char */ + if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid) + size += strlen (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid) + 1; + else + size ++; /* for NULL char */ + size += _cl5GetModsSize (op->p.p_modrdn.modrdn_mods); + break; + + case SLAPI_OPERATION_DELETE: size += strlen (op->target_address.dn) + 1; + break; + } + + /* allocate data buffer */ + (*data) = (char *) slapi_ch_malloc (size); + if ((*data) == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5Entry2DBData: failed to allocate data buffer\n"); + return CL5_MEMORY_ERROR; + } + + /* fill in the data buffer */ + pos = *data; + /* write a byte of version */ + (*pos) = V_5; + pos ++; + /* write change type */ + (*pos) = (unsigned char)op->operation_type; + pos ++; + /* write time */ + t = PR_htonl((PRUint32)entry->time); + memcpy (pos, &t, sizeof (t)); + pos += sizeof (t); + /* write csn */ + _cl5WriteString (csn_as_string(op->csn,PR_FALSE,s), &pos); + /* write UniqueID */ + _cl5WriteString (op->target_address.uniqueid, &pos); + + /* figure out what else we need to write depending on the operation type */ + switch (op->operation_type) + { + case SLAPI_OPERATION_ADD: _cl5WriteString (op->p.p_add.parentuniqueid, &pos); + _cl5WriteString (rawDN, &pos); + _cl5WriteMods (add_mods, &pos); + slapi_ch_free ((void**)&rawDN); + ldap_mods_free (add_mods, 1); + break; + + case SLAPI_OPERATION_MODIFY: _cl5WriteString (op->target_address.dn, &pos); + _cl5WriteMods (op->p.p_modify.modify_mods, &pos); + break; + + case SLAPI_OPERATION_MODRDN: _cl5WriteString (op->target_address.dn, &pos); + _cl5WriteString (op->p.p_modrdn.modrdn_newrdn, &pos); + *pos = (PRUint8)op->p.p_modrdn.modrdn_deloldrdn; + pos ++; + _cl5WriteString (op->p.p_modrdn.modrdn_newsuperior_address.dn, &pos); + _cl5WriteString (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid, &pos); + _cl5WriteMods (op->p.p_modrdn.modrdn_mods, &pos); + break; + + case SLAPI_OPERATION_DELETE: _cl5WriteString (op->target_address.dn, &pos); + break; + } + + (*len) = size; + + return CL5_SUCCESS; +} + +/* + Data in db format: + ------------------ + <1 byte version><1 byte change_type><sizeof time_t time><null terminated dbid> + <null terminated csn><null terminated uniqueid><null terminated targetdn> + [<null terminated newrdn><1 byte deleteoldrdn>][<4 byte mod count><mod1><mod2>....] + + mod format: + ----------- + <1 byte modop><null terminated attr name><4 byte value count> + <4 byte value size><value1><4 byte value size><value2> +*/ + + +int +cl5DBData2Entry (const char *data, PRUint32 len, CL5Entry *entry) +{ + int rc; + PRUint8 version; + char *pos = (char *)data; + char *strCSN; + PRUint32 thetime; + slapi_operation_parameters *op; + LDAPMod **add_mods; + char *rawDN; + char s[CSN_STRSIZE]; + + PR_ASSERT (data && entry && entry->op); + + /* ONREPL - check that we do not go beyond the end of the buffer */ + + /* read byte of version */ + version = (PRUint8)(*pos); + if (version != V_5) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5DBData2Entry: invalid data version\n"); + return CL5_BAD_FORMAT; + } + + op = entry->op; + + pos += sizeof(version); + + /* read change type */ + op->operation_type = (PRUint8)(*pos); + pos ++; + + /* need to do the copy first, to skirt around alignment problems on + certain architectures */ + memcpy((char *)&thetime,pos,sizeof(thetime)); + entry->time = (time_t)PR_ntohl(thetime); + pos += sizeof (thetime); + + /* read csn */ + _cl5ReadString (&strCSN, &pos); + if (op->csn == NULL || strcmp (strCSN, csn_as_string(op->csn,PR_FALSE,s)) != 0) + { + op->csn = csn_new_by_string (strCSN); + } + slapi_ch_free ((void**)&strCSN); + + /* read UniqueID */ + _cl5ReadString (&op->target_address.uniqueid, &pos); + + /* figure out what else we need to read depending on the operation type */ + switch (op->operation_type) + { + case SLAPI_OPERATION_ADD: _cl5ReadString (&op->p.p_add.parentuniqueid, &pos); + /* richm: need to free parentuniqueid */ + _cl5ReadString (&rawDN, &pos); + op->target_address.dn = rawDN; + /* convert mods to entry */ + rc = _cl5ReadMods (&add_mods, &pos); + slapi_mods2entry (&(op->p.p_add.target_entry), rawDN, add_mods); + ldap_mods_free (add_mods, 1); + break; + + case SLAPI_OPERATION_MODIFY: _cl5ReadString (&op->target_address.dn, &pos); + rc = _cl5ReadMods (&op->p.p_modify.modify_mods, &pos); + break; + + case SLAPI_OPERATION_MODRDN: _cl5ReadString (&op->target_address.dn, &pos); + _cl5ReadString (&op->p.p_modrdn.modrdn_newrdn, &pos); + op->p.p_modrdn.modrdn_deloldrdn = *pos; + pos ++; + _cl5ReadString (&op->p.p_modrdn.modrdn_newsuperior_address.dn, &pos); + _cl5ReadString (&op->p.p_modrdn.modrdn_newsuperior_address.uniqueid, &pos); + rc = _cl5ReadMods (&op->p.p_modrdn.modrdn_mods, &pos); + break; + + case SLAPI_OPERATION_DELETE: _cl5ReadString (&op->target_address.dn, &pos); + rc = CL5_SUCCESS; + break; + + default: rc = CL5_BAD_FORMAT; + slapi_log_error(SLAPI_LOG_FATAL, + repl_plugin_name_cl, + "cl5DBData2Entry: failed to format entry\n"); + break; + } + + return rc; +} + +/* thread management functions */ +static int _cl5DispatchDBThreads () +{ + if (NULL == PR_CreateThread (PR_USER_THREAD, (VFP)(void *)_cl5DeadlockMain, + NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, + PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE)) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5DispatchDBThreads: failed to create deadlock thread; " + "NSPR error - %d\n", PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + if (NULL == PR_CreateThread (PR_USER_THREAD, (VFP)(void *)_cl5CheckpointMain, + NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, + PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE)) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5DispatchDBThreads: failed to create checkpoint thread; " + "NSPR error - %d\n", PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + if (NULL == PR_CreateThread (PR_USER_THREAD, (VFP)(void *)_cl5TrickleMain, + NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, + PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE) ) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5DispatchDBThreads: failed to create trickle thread; " + "NSPR error - %d\n", PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + if (NULL == PR_CreateThread (PR_USER_THREAD, (VFP)(void*)_cl5TrimMain, + NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, + PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE) ) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5DispatchDBThreads: failed to create trimming thread; " + "NSPR error - %d\n", PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + return CL5_SUCCESS; +} + +static int _cl5AddThread () +{ + /* lock the state lock so that nobody can change the state + while backup is in progress + */ + PR_RWLock_Rlock (s_cl5Desc.stLock); + + /* open changelog if it is not already open */ + if (s_cl5Desc.dbState != CL5_STATE_OPEN) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5AddThread: invalid changelog state - %d\n", s_cl5Desc.dbState); + PR_RWLock_Unlock (s_cl5Desc.stLock); + return CL5_BAD_STATE; + } + + /* increment global thread count to make sure that changelog does not close while + backup is in progress */ + PR_AtomicIncrement (&s_cl5Desc.threadCount); + + PR_RWLock_Unlock (s_cl5Desc.stLock); + + return CL5_SUCCESS; +} + +static void _cl5RemoveThread () +{ + PR_ASSERT (s_cl5Desc.threadCount > 0); + PR_AtomicDecrement (&s_cl5Desc.threadCount); +} + +/* data conversion functions */ +static void _cl5WriteString (const char *str, char **buff) +{ + if (str) + { + strcpy (*buff, str); + (*buff) += strlen (str) + 1; + } + else /* just write NULL char */ + { + (**buff) = '\0'; + (*buff) ++; + } +} + +static void _cl5ReadString (char **str, char **buff) +{ + if (str) + { + int len = strlen (*buff); + + if (len) + { + *str = slapi_ch_strdup (*buff); + (*buff) += len + 1; + } + else /* just null char - skip it */ + { + *str = NULL; + (*buff) ++; + } + } + else /* just skip this string */ + { + (*buff) += strlen (*buff) + 1; + } +} + +/* mods format: + ----------- + <4 byte mods count><mod1><mod2>... + + mod format: + ----------- + <1 byte modop><null terminated attr name><4 byte count> + <4 byte size><value1><4 byte size><value2>... + */ +static void _cl5WriteMods (LDAPMod **mods, char **buff) +{ + PRInt32 i; + char *mod_start; + PRInt32 count; + + if (mods == NULL) + return; + + /* skip mods count */ + mod_start = (*buff) + sizeof (count); + + /* write mods*/ + for (i=0; mods[i]; i++) + { + _cl5WriteMod (mods[i], &mod_start); + } + + count = PR_htonl(i); + memcpy (*buff, &count, sizeof (count)); + + (*buff) = mod_start; +} + +static void _cl5WriteMod (LDAPMod *mod, char **buff) +{ + char *pos; + PRInt32 count; + struct berval *bv; + Slapi_Mod smod; + + slapi_mod_init_byref(&smod, mod); + + pos = *buff; + /* write mod op */ + *pos = (PRUint8)slapi_mod_get_operation (&smod); + pos ++; + /* write attribute name */ + _cl5WriteString (slapi_mod_get_type (&smod), &pos); + + /* write value count */ + count = PR_htonl(slapi_mod_get_num_values(&smod)); + memcpy (pos, &count, sizeof (count)); + pos += sizeof (PRInt32); + + bv = slapi_mod_get_first_value (&smod); + while (bv) + { + _cl5WriteBerval (bv, &pos); + bv = slapi_mod_get_next_value (&smod); + } + + (*buff) = pos; + + slapi_mod_done (&smod); +} + +/* mods format: + ----------- + <4 byte mods count><mod1><mod2>... + + mod format: + ----------- + <1 byte modop><null terminated attr name><4 byte count> + {<4 byte size><value1><4 byte size><value2>... || + <null terminated str1> <null terminated str2>...} + */ + +static int _cl5ReadMods (LDAPMod ***mods, char **buff) +{ + char *pos = *buff; + int i; + int rc; + PRInt32 mod_count; + Slapi_Mods smods; + Slapi_Mod smod; + + /* need to copy first, to skirt around alignment problems on certain + architectures */ + memcpy((char *)&mod_count,*buff,sizeof(mod_count)); + mod_count = PR_ntohl(mod_count); + pos += sizeof (mod_count); + + slapi_mods_init (&smods , mod_count); + + for (i = 0; i < mod_count; i++) + { + rc = _cl5ReadMod (&smod, &pos); + if (rc != CL5_SUCCESS) + { + slapi_mods_done(&smods); + return rc; + } + + slapi_mods_add_smod(&smods, &smod); + } + + *buff = pos; + + *mods = slapi_mods_get_ldapmods_passout (&smods); + slapi_mods_done(&smods); + + return CL5_SUCCESS; +} + +static int _cl5ReadMod (Slapi_Mod *smod, char **buff) +{ + char *pos = *buff; + int i; + PRInt32 val_count; + char *type; + int op; + struct berval bv; + + op = (*pos) & 0x000000FF; + pos ++; + _cl5ReadString (&type, &pos); + + /* need to do the copy first, to skirt around alignment problems on + certain architectures */ + memcpy((char *)&val_count,pos,sizeof(val_count)); + val_count = PR_ntohl(val_count); + pos += sizeof (PRInt32); + + slapi_mod_init(smod, val_count); + slapi_mod_set_operation (smod, op|LDAP_MOD_BVALUES); + slapi_mod_set_type (smod, type); + slapi_ch_free ((void**)&type); + + for (i = 0; i < val_count; i++) + { + _cl5ReadBerval (&bv, &pos); + slapi_mod_add_value (smod, &bv); + slapi_ch_free((void **) &bv.bv_val); + } + + (*buff) = pos; + + return CL5_SUCCESS; +} + +static int _cl5GetModsSize (LDAPMod **mods) +{ + int size; + int i; + + if (mods == NULL) + return 0; + + size = sizeof (PRInt32); + for (i=0; mods[i]; i++) + { + size += _cl5GetModSize (mods[i]); + } + + return size; +} + +static int _cl5GetModSize (LDAPMod *mod) +{ + int size; + int i; + + size = 1 + strlen (mod->mod_type) + 1 + sizeof (mod->mod_op); + i = 0; + if (mod->mod_op & LDAP_MOD_BVALUES) /* values are in binary form */ + { + while (mod->mod_bvalues != NULL && mod->mod_bvalues[i] != NULL) + { + size += mod->mod_bvalues[i]->bv_len + sizeof (mod->mod_bvalues[i]->bv_len); + i++; + } + } + else /* string data */ + { + PR_ASSERT(0); /* ggood string values should never be used in the server */ + } + + return size; +} + +static void _cl5ReadBerval (struct berval *bv, char** buff) +{ + PRUint32 length = 0; + PRUint32 net_length = 0; + + PR_ASSERT (bv && buff); + + /***PINAKI need to do the copy first, to skirt around alignment problems on + certain architectures */ + /* DBDB : struct berval.bv_len is defined as unsigned long + * But code here expects it to be 32-bits in size. + * On 64-bit machines, this is not the case. + * I changed the code to consistently use 32-bit (4-byte) + * values on the encoded side. This means that it's + * possible to generate a huge berval that will not + * be encoded properly. However, this seems unlikely + * to happen in reality, and I felt that retaining the + * old on-disk format for the changely in the 64-bit + * version of the server was important. + */ + + memcpy((char *)&net_length, *buff, sizeof(net_length)); + length = PR_ntohl(net_length); + *buff += sizeof(net_length); + bv->bv_len = length; + + if (bv->bv_len > 0) { + bv->bv_val = (char*)slapi_ch_malloc (bv->bv_len); + memcpy (bv->bv_val, *buff, bv->bv_len); + *buff += bv->bv_len; + } + else { + bv->bv_val = NULL; + } +} + +static void _cl5WriteBerval (struct berval *bv, char** buff) +{ + PRUint32 length = 0; + PRUint32 net_length = 0; + + length = (PRUint32) bv->bv_len; + net_length = PR_htonl(length); + + memcpy(*buff, &net_length, sizeof (net_length)); + *buff += sizeof (net_length); + memcpy (*buff, bv->bv_val, length); + *buff += length; +} + +/* data format: <value count> <value size> <value> <value size> <value> ..... */ +static int _cl5ReadBervals (struct berval ***bv, char** buff, unsigned int size) +{ + PRInt32 count; + int i; + char *pos; + + PR_ASSERT (bv && buff); + + /* ONREPL - need to check that we don't go beyond the end of the buffer */ + + pos = *buff; + memcpy((char *)&count, pos, sizeof(count)); + count = PR_htonl (count); + pos += sizeof(count); + + /* allocate bervals */ + *bv = (struct berval **)slapi_ch_malloc ((count + 1) * sizeof (struct berval*)); + if (*bv == NULL) + { + return CL5_MEMORY_ERROR; + } + + for (i = 0; i < count; i++) + { + (*bv)[i] = (struct berval *)slapi_ch_malloc (sizeof (struct berval)); + if ((*bv)[i] == NULL) + { + ber_bvecfree(*bv); + return CL5_MEMORY_ERROR; + } + + _cl5ReadBerval ((*bv)[i], &pos); + } + + (*bv)[count] = NULL; + *buff = pos; + + return CL5_SUCCESS; +} + +/* data format: <value count> <value size> <value> <value size> <value> ..... */ +static int _cl5WriteBervals (struct berval **bv, char** buff, unsigned int *size) +{ + PRInt32 count, net_count; + char *pos; + int i; + + PR_ASSERT (bv && buff && size); + + /* compute number of values and size of the buffer to hold them */ + *size = sizeof (count); + for (count = 0; bv[count]; count ++) + { + *size += sizeof (bv[count]->bv_len) + bv[count]->bv_len; + } + + /* allocate buffer */ + *buff = (char*) slapi_ch_malloc (*size); + if (*buff == NULL) + { + *size = 0; + return CL5_MEMORY_ERROR; + } + + /* fill the buffer */ + pos = *buff; + net_count = PR_htonl(count); + memcpy (pos, &net_count, sizeof (net_count)); + pos += sizeof (net_count); + for (i = 0; i < count; i ++) + { + _cl5WriteBerval (bv[i], &pos); + } + + return CL5_SUCCESS; +} + +static int _cl5DeadlockMain (void *param) +{ + PRIntervalTime interval; + int rc; + + PR_AtomicIncrement (&s_cl5Desc.threadCount); + interval = PR_MillisecondsToInterval(100); + while (s_cl5Desc.dbState != CL5_STATE_CLOSING) + { + int aborted; + if ((rc = LOCK_DETECT(s_cl5Desc.dbEnv, 0, DB_LOCK_YOUNGEST, &aborted)) != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5DeadlockMain: lock_detect failed (%d transaction%s aborted); db error - %d %s\n", + aborted, (aborted == 1)? "":"s", rc, db_strerror(rc)); + } + else if (aborted) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5DeadlockMain: lock_detect succeeded, but %d transaction%s ha%s been aborted\n", + aborted, (aborted == 1)? "":"s", (aborted == 1)? "s":"ve"); + } + + DS_Sleep(interval); + } + + PR_AtomicDecrement (&s_cl5Desc.threadCount); + return 0; +} + +static int _cl5CheckpointMain (void *param) +{ + time_t lastCheckpointCompletion = 0; + PRIntervalTime interval; + int rc = -1; + + PR_AtomicIncrement (&s_cl5Desc.threadCount); + + interval = PR_MillisecondsToInterval(1000); + lastCheckpointCompletion = current_time(); + + while (s_cl5Desc.dbState != CL5_STATE_CLOSING) + { + /* Check to see if the checkpoint interval has elapsed */ + if (current_time() - lastCheckpointCompletion > s_cl5Desc.dbConfig.checkpointInterval) + { + rc = TXN_CHECKPOINT(s_cl5Desc.dbEnv, 0, 0, 0); + if (rc == 0) + { + lastCheckpointCompletion = current_time(); + } +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + else if (rc != DB_INCOMPLETE) /* real error happened */ + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5CheckpointMain: checkpoint failed, db error - %d %s\n", + rc, db_strerror(rc)); + } +#endif + + /* According to dboreham, we are doing checkpoint twice + to reduce the number of transaction log files which need + to be retained at any time. */ + rc = TXN_CHECKPOINT(s_cl5Desc.dbEnv, 0, 0, 0); + if (rc == 0) + { + lastCheckpointCompletion = current_time(); + } +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + else if (rc != DB_INCOMPLETE) /* real error happened */ + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5CheckpointMain: checkpoint failed, db error - %d %s\n", + rc, db_strerror(rc)); + } +#endif + + /* check if we should truncate logs */ + if (s_cl5Desc.dbConfig.circularLogging) + { + char **list = NULL; + char **listp = NULL; + int rc = -1; + char filename[MAXPATHLEN + 1]; + + /* find out which log files don't contain active txns */ + rc = LOG_ARCHIVE(s_cl5Desc.dbEnv, &list, 0, malloc); + if (0 == rc && NULL != list) + { + /* zap 'em ! */ + for (listp = list; *listp != NULL; ++listp) + { + PR_snprintf(filename, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir,*listp); + PR_Delete (filename); + } + slapi_ch_free((void **)&list); + } + } + } + + /* sleep for a while */ + /* why aren't we sleeping exactly the right amount of time ? */ + /* answer---because the interval might be changed after the server starts up */ + DS_Sleep(interval); + } + + PR_AtomicDecrement (&s_cl5Desc.threadCount); + return 0; +} + +static int _cl5TrickleMain (void *param) +{ + PRIntervalTime interval; + int pages_written; + int rc; + + PR_AtomicIncrement (&s_cl5Desc.threadCount); + interval = PR_MillisecondsToInterval(1000); + while (s_cl5Desc.dbState != CL5_STATE_CLOSING) + { + if ((rc = MEMP_TRICKLE(s_cl5Desc.dbEnv, + s_cl5Desc.dbConfig.tricklePercentage, &pages_written)) != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5TrickleMain: memp_trickle failed; db error - %d %s\n", + rc, db_strerror(rc)); + } + + DS_Sleep(interval); + } + + PR_AtomicDecrement (&s_cl5Desc.threadCount); + + return 0; +} + +/* upgrade from db33 to db41 + * 1. Run recovery on the database environment using the DB_ENV->open method + * 2. Remove any Berkeley DB environment using the DB_ENV->remove method + * 3. extention .db3 -> .db4 ### koko kara !!! + */ +static int _cl5Upgrade3_4(char *fromVersion, char *toVersion) +{ + PRDir *dir = NULL; + PRDirEntry *entry = NULL; + DB *thisdb = NULL; + CL5OpenMode backup; + int rc = 0; + + backup = s_cl5Desc.dbOpenMode; + s_cl5Desc.dbOpenMode = CL5_OPEN_CLEAN_RECOVER; + /* CL5_OPEN_CLEAN_RECOVER does 1 and 2 */ + rc = _cl5AppInit (NULL); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5Upgrade3_4: failed to open the db env\n"); + return rc; + } + + dir = PR_OpenDir(s_cl5Desc.dbDir); + if (dir == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5Upgrade3_4: failed to open changelog dir %s; NSPR error - %d\n", + s_cl5Desc.dbDir, PR_GetError ()); + goto out; + } + + while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) + { + if (NULL == entry->name) + { + break; + } + if (_cl5FileEndsWith(entry->name, DB_EXTENSION_DB3)) + { + char oName [MAXPATHLEN + 1]; + char nName [MAXPATHLEN + 1]; + char *p = NULL; + char c; + int baselen = 0; + PR_snprintf(oName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, entry->name); + p = strstr(oName, DB_EXTENSION_DB3); + if (NULL == p) + { + continue; + } + /* db->rename closes DB; need to create every time */ + rc = db_create(&thisdb, s_cl5Desc.dbEnv, 0); + if (0 != rc) { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5Upgrade3_4: failed to get db handle\n"); + goto out; + } + + baselen = p - oName; + c = *p; + *p = '\0'; + PR_snprintf(nName, MAXPATHLEN+1, "%s", oName); + PR_snprintf(nName + baselen, MAXPATHLEN+1-baselen, "%s", DB_EXTENSION); + *p = c; + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5Upgrade3_4: renaming %s to %s\n", oName, nName); + rc = thisdb->rename(thisdb, (const char *)oName, NULL /* subdb */, + (const char *)nName, 0); + if (rc != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5Upgrade3_4: failed to rename file (%s -> %s); " + "db error - %d %s\n", oName, nName, rc, db_strerror(rc)); + break; + } + } + } + /* update the version file */ + _cl5WriteDBVersion (); + + /* update the guardian file */ + _cl5WriteGuardian (); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "Upgrading from %s to %s is successfully done (%s)\n", + fromVersion, toVersion, s_cl5Desc.dbDir); +out: + if (NULL != dir) + { + PR_CloseDir(dir); + } + if (s_cl5Desc.dbEnv) + { + DB_ENV *dbEnv = s_cl5Desc.dbEnv; + dbEnv->close(dbEnv, 0); + s_cl5Desc.dbEnv = NULL; + } + return rc; +} + +static int _cl5CheckDBVersion () +{ + char clVersion [VERSION_SIZE + 1]; + char dbVersion [VERSION_SIZE + 1]; + int rc; + + if (!cl5Exist (s_cl5Desc.dbDir)) + { + /* this is new changelog - write DB version and guardian file */ + rc = _cl5WriteDBVersion (); + if (rc == CL5_SUCCESS) { + rc = _cl5WriteGuardian(); + } + } + else + { + PR_snprintf (clVersion, VERSION_SIZE, "%s/%s/%s", CL5_TYPE, REPL_PLUGIN_NAME, + CHANGELOG_DB_VERSION); + rc = _cl5ReadDBVersion (s_cl5Desc.dbDir, dbVersion); + + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5CheckDBVersion: invalid dbversion\n"); + rc = CL5_BAD_DBVERSION; + } + else if (strcasecmp (clVersion, dbVersion) != 0) + { + char prevClVersion [VERSION_SIZE + 1]; + PR_snprintf (prevClVersion, VERSION_SIZE, "%s/%s/%s", + CL5_TYPE, REPL_PLUGIN_NAME, CHANGELOG_DB_VERSION_PREV); + if (strcasecmp (prevClVersion, dbVersion) == 0) + { + /* upgrade */ + rc = _cl5Upgrade3_4(prevClVersion, clVersion); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5CheckDBVersion: upgrade %s -> %s failed\n", + CHANGELOG_DB_VERSION_PREV, CHANGELOG_DB_VERSION); + rc = CL5_BAD_DBVERSION; + } + } + else + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5CheckDBVersion: invalid dbversion\n"); + rc = CL5_BAD_DBVERSION; + } + } + + } + + return rc; +} + +static int _cl5ReadDBVersion (const char *dir, char *clVersion) +{ + int rc; + PRFileDesc *file; + char fName [MAXPATHLEN + 1]; + char buff [BUFSIZ]; + PRInt32 size; + char *tok; + char * iter = NULL; + + if (clVersion) + { + clVersion [0] = '\0'; + } + + PR_snprintf (fName, MAXPATHLEN, "%s/%s", dir, VERSION_FILE); + + file = PR_Open (fName, PR_RDONLY, 777); + if (file == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5ReadDBVersion: failed to open DBVERSION; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + size = slapi_read_buffer (file, buff, BUFSIZ); + if (size < 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5ReadDBVersion: failed to read DBVERSION; NSPR error - %d\n", + PR_GetError ()); + PR_Close (file); + return CL5_SYSTEM_ERROR; + } + + /* parse the data */ + buff[size]= '\0'; + tok = ldap_utf8strtok_r (buff, "\n", &iter); + if (tok) + { + if (clVersion) + { + strcpy(clVersion, tok); + } + } + + rc = PR_Close (file); + if (rc != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5ReadDBVersion: failed to close DBVERSION; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + return CL5_SUCCESS; +} + +static int _cl5WriteDBVersion () +{ + int rc; + PRFileDesc *file; + char fName [MAXPATHLEN + 1]; + char clVersion [VERSION_SIZE + 1]; + PRInt32 len, size; + + PR_snprintf (fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, VERSION_FILE); + + file = PR_Open (fName, PR_WRONLY | PR_CREATE_FILE, s_cl5Desc.dbConfig.fileMode); + if (file == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5WriteDBVersion: failed to open DBVERSION; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + /* write changelog version */ + PR_snprintf (clVersion, VERSION_SIZE, "%s/%s/%s\n", CL5_TYPE, REPL_PLUGIN_NAME, + CHANGELOG_DB_VERSION); + + len = strlen (clVersion); + size = slapi_write_buffer (file, clVersion, len); + if (size != len) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5WriteDBVersion: failed to write DBVERSION; NSPR error - %d\n", + PR_GetError ()); + PR_Close (file); + return CL5_SYSTEM_ERROR; + } + + rc = PR_Close (file); + if (rc != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5WriteDBVersion: failed to close DBVERSION; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + return CL5_SUCCESS; +} + +/* for now guardian file is just like dbversion file */ +static int _cl5CheckGuardian () +{ + char plVersion [VERSION_SIZE + 1]; + char dbVersion [VERSION_SIZE + 1]; + int rc; + + /* new changelog - no guardian file */ + if (!cl5Exist(s_cl5Desc.dbDir)) + { + return CL5_SUCCESS; + } + else + { + PR_snprintf (plVersion, VERSION_SIZE, "%s/%s/%s", CL5_TYPE, REPL_PLUGIN_NAME, + CHANGELOG_DB_VERSION); + rc = _cl5ReadGuardian (dbVersion); + + if (rc != CL5_SUCCESS || strcasecmp (plVersion, dbVersion) != 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5CheckGuardian: missing or invalid guardian file\n"); + return (CL5_BAD_FORMAT); + } + + /* remove guardian file */ + rc = _cl5RemoveGuardian (); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5CheckGuardian: failed to remove guardian file\n"); + } + } + + return rc; +} + +static int _cl5WriteGuardian () +{ + int rc; + PRFileDesc *file; + char fName [MAXPATHLEN + 1]; + char version [VERSION_SIZE]; + PRInt32 len, size; + + PR_snprintf (fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, GUARDIAN_FILE); + + file = PR_Open (fName, PR_WRONLY | PR_CREATE_FILE, s_cl5Desc.dbConfig.fileMode); + if (file == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5WriteGuardian: failed to open guardian file; NSPR error - %d\n", + PR_GetError()); + return CL5_SYSTEM_ERROR; + } + + PR_snprintf (version, VERSION_SIZE, "%s/%s/%s\n", CL5_TYPE, REPL_PLUGIN_NAME, + CHANGELOG_DB_VERSION); + + len = strlen (version); + size = slapi_write_buffer (file, version, len); + if (size != len) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteGuardian: failed to write guardian file; NSPR error - %d\n", + PR_GetError()); + PR_Close (file); + return CL5_SYSTEM_ERROR; + } + + rc = PR_Close (file); + if (rc != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5WriteGuardian: failed to close guardian file; NSPR error - %d\n", + PR_GetError()); + return CL5_SYSTEM_ERROR; + } + + return CL5_SUCCESS; +} + +static int _cl5ReadGuardian (char *buff) +{ + int rc; + PRFileDesc *file; + char fName [MAXPATHLEN + 1]; + PRInt32 size; + + PR_snprintf (fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, GUARDIAN_FILE); + + file = PR_Open (fName, PR_RDONLY, 0); + if (file == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5ReadGuardian: failed to open guardian file; NSPR error - %d\n", + PR_GetError()); + return CL5_SYSTEM_ERROR; + } + + size = slapi_read_buffer (file, buff, VERSION_SIZE); + if (size <= 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5ReadGuardian: failed to read guardian file; NSPR error - %d\n", + PR_GetError()); + PR_Close (file); + return CL5_SYSTEM_ERROR; + } + + buff [size-1] = '\0'; + + rc = PR_Close (file); + if (rc != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5ReadGuardian: failed to close guardian file; NSPR error - %d\n", + PR_GetError()); + return CL5_SYSTEM_ERROR; + } + + return CL5_SUCCESS; +} + +static int _cl5RemoveGuardian () +{ + char fName [MAXPATHLEN + 1]; + int rc; + + PR_snprintf (fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, GUARDIAN_FILE); + + rc = PR_Delete (fName); + if (rc != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5RemoveGuardian: failed to remove guardian file; NSPR error - %d\n", + PR_GetError()); + return CL5_SYSTEM_ERROR; + } + + return CL5_SUCCESS; +} + +/* must be called under the state lock */ +static void _cl5Close () +{ + int rc2 = 0; + PRIntervalTime interval; + + if (s_cl5Desc.dbState != CL5_STATE_CLOSED) /* Don't try to close twice */ + { + + /* close db files */ + _cl5DBClose (); + + /* stop global threads */ + interval = PR_MillisecondsToInterval(100); + while (s_cl5Desc.threadCount > 0) + { + slapi_log_error( SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "_cl5Close: waiting for threads to exit: %d thread(s) still active\n", + s_cl5Desc.threadCount); + DS_Sleep(interval); + } + + /* cleanup trimming */ + _cl5TrimCleanup (); + + /* shutdown db environment */ + if (s_cl5Desc.dbEnv) + { + DB_ENV *dbEnv = s_cl5Desc.dbEnv; + rc2 = dbEnv->close(dbEnv, 0); + s_cl5Desc.dbEnv = NULL; + } + + /* record successful close by writing guardian file; + we do it in all case accept incomplete open due to an error */ + if (s_cl5Desc.dbState == CL5_STATE_CLOSING || s_cl5Desc.dbOpenMode != CL5_OPEN_NORMAL) + { + _cl5WriteGuardian (); + } + + /* remove changelog if requested */ + if (s_cl5Desc.dbRmOnClose) + { + + if (_cl5Delete (s_cl5Desc.dbDir, 1) != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5Close: failed to remove changelog\n"); + } + s_cl5Desc.dbRmOnClose = PR_FALSE; + } + + slapi_ch_free ((void **)&s_cl5Desc.dbDir); + memset (&s_cl5Desc.dbConfig, 0, sizeof (s_cl5Desc.dbConfig)); + s_cl5Desc.fatalError = PR_FALSE; + s_cl5Desc.threadCount = 0; + s_cl5Desc.dbOpenMode = CL5_OPEN_NONE; + } +} + +static void _cl5DBClose () +{ + if (NULL != s_cl5Desc.dbFiles) + { + objset_delete (&s_cl5Desc.dbFiles); + } + if (NULL != s_cl5Desc.fileLock) + { + PR_DestroyLock (s_cl5Desc.fileLock); + } +} + +/* state lock must be locked */ +static int _cl5Delete (const char *clDir, int rmDir) +{ + PRDir *dir; + char filename[MAXPATHLEN + 1]; + PRDirEntry *entry = NULL; + int rc; + + /* remove all files in the directory and the directory */ + dir = PR_OpenDir(clDir); + if (dir == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5Delete: failed to open changelog dir; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + + } + + while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT))) + { + if (NULL == entry->name) + { + break; + } + PR_snprintf(filename, MAXPATHLEN, "%s/%s", clDir, entry->name); + rc = PR_Delete(filename); + if (rc != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5Delete: failed to remove (%s) file; NSPR error - %d\n", + filename, PR_GetError ()); + } + } + + rc = PR_CloseDir(dir); + if (rc != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5Delete: failed to close changelog dir (%s); NSPR error - %d\n", + clDir, PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + + if (rmDir) + { + rc = PR_RmDir (clDir); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5Delete: failed to remove changelog dir (%s); errno = %d\n", + clDir, errno); + return CL5_SYSTEM_ERROR; + } + } + + return CL5_SUCCESS; +} + +static void _cl5SetDefaultDBConfig () +{ + s_cl5Desc.dbConfig.cacheSize = CL5_DEFAULT_CONFIG_DB_DBCACHESIZE; + s_cl5Desc.dbConfig.durableTrans = CL5_DEFAULT_CONFIG_DB_DURABLE_TRANSACTIONS; + s_cl5Desc.dbConfig.checkpointInterval = CL5_DEFAULT_CONFIG_DB_CHECKPOINT_INTERVAL; + s_cl5Desc.dbConfig.circularLogging = CL5_DEFAULT_CONFIG_DB_CIRCULAR_LOGGING; + s_cl5Desc.dbConfig.pageSize = CL5_DEFAULT_CONFIG_DB_PAGE_SIZE; + s_cl5Desc.dbConfig.logfileSize = CL5_DEFAULT_CONFIG_DB_LOGFILE_SIZE; + s_cl5Desc.dbConfig.maxTxnSize = CL5_DEFAULT_CONFIG_DB_TXN_MAX; + s_cl5Desc.dbConfig.verbose = CL5_DEFAULT_CONFIG_DB_VERBOSE; + s_cl5Desc.dbConfig.debug = CL5_DEFAULT_CONFIG_DB_DEBUG; + s_cl5Desc.dbConfig.tricklePercentage = CL5_DEFAULT_CONFIG_DB_TRICKLE_PERCENTAGE; + s_cl5Desc.dbConfig.spinCount = CL5_DEFAULT_CONFIG_DB_SPINCOUNT; + s_cl5Desc.dbConfig.nb_lock_config = CL5_DEFAULT_CONFIG_NB_LOCK; + s_cl5Desc.dbConfig.fileMode = FILE_CREATE_MODE; +} + +static void _cl5SetDBConfig (const CL5DBConfig *config) +{ + /* through CL5DBConfig, we have access to all the LDAP configurable Changelog DB parameters */ + s_cl5Desc.dbConfig.cacheSize = config->cacheSize; + s_cl5Desc.dbConfig.durableTrans = config->durableTrans; + s_cl5Desc.dbConfig.checkpointInterval = config->checkpointInterval; + s_cl5Desc.dbConfig.circularLogging = config->circularLogging; + s_cl5Desc.dbConfig.pageSize = config->pageSize; + s_cl5Desc.dbConfig.logfileSize = config->logfileSize; + s_cl5Desc.dbConfig.maxTxnSize = config->maxTxnSize; + s_cl5Desc.dbConfig.verbose = config->verbose; + s_cl5Desc.dbConfig.debug = config->debug; + s_cl5Desc.dbConfig.tricklePercentage = config->tricklePercentage; + s_cl5Desc.dbConfig.spinCount = config->spinCount; + s_cl5Desc.dbConfig.nb_lock_config = config->nb_lock_config; + s_cl5Desc.dbConfig.maxConcurrentWrites = config->maxConcurrentWrites; + + if (config->spinCount != 0) + { + DB_ENV_SET_TAS_SPINS(s_cl5Desc.dbEnv, config->spinCount); + } + + /* Some other configuration parameters are hardcoded... */ + s_cl5Desc.dbConfig.fileMode = FILE_CREATE_MODE; +} + +#define ONEG 1073741824 /* one giga bytes */ +static void _cl5InitDBEnv(DB_ENV *dbEnv) +{ + dbEnv->set_errpfx(dbEnv, "ns-slapd"); + dbEnv->set_lg_max(dbEnv, s_cl5Desc.dbConfig.logfileSize); + dbEnv->set_tx_max(dbEnv, s_cl5Desc.dbConfig.maxTxnSize); + dbEnv->set_cachesize(dbEnv, s_cl5Desc.dbConfig.cacheSize/ONEG, + s_cl5Desc.dbConfig.cacheSize%ONEG, + 0); + /* Set default number of locks */ + dbEnv->set_lk_max_locks(dbEnv, s_cl5Desc.dbConfig.nb_lock_config); + + if (s_cl5Desc.dbConfig.verbose) + { + int on = 1; + dbEnv->set_verbose(dbEnv, DB_VERB_CHKPOINT, on); + dbEnv->set_verbose(dbEnv, DB_VERB_DEADLOCK, on); + dbEnv->set_verbose(dbEnv, DB_VERB_RECOVERY, on); + dbEnv->set_verbose(dbEnv, DB_VERB_WAITSFOR, on); + } + if (s_cl5Desc.dbConfig.debug) + { + dbEnv->set_errcall(dbEnv, _cl5DBLogPrint); + } +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 3300 + dbEnv->set_alloc(dbEnv, malloc, realloc, free); +#endif +} + +static void _cl5DBLogPrint(const char* prefix, char *buffer) +{ + /* We ignore the prefix since we know who we are anyway */ + slapi_log_error (SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5: %s\n", buffer); +} + +static PRBool _cl5IsLogFile (const char *path) +{ + int rc; + + /* Is the filename at least 4 characters long ? */ + if (strlen(path) < 4) + { + return PR_FALSE; /* Not a log file then */ + } + + /* Are the first 4 characters "log." ? */ + rc = strncmp(path,"log.",4); + if (0 == rc) + { + /* Now, are the last 4 characters _not_ .db# ? */ + const char *piece = path + (strlen(path) - 4); + rc = strcmp(piece, DB_EXTENSION); + if (0 != rc) + { + /* Is */ + return PR_TRUE; + } + } + return PR_FALSE; /* Is not */ +} + +static int _cl5Recover (int open_flags, DB_ENV *dbEnv) +{ + /* If we're doing recovery, we MUST open the env single-threaded ! */ + int recover_flags = open_flags & ~DB_THREAD; + int rc; + + rc = dbEnv->open(dbEnv, s_cl5Desc.dbDir, recover_flags, s_cl5Desc.dbConfig.fileMode); + + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5Recover: appinit failed; db error - %d %s\n", + rc, db_strerror(rc)); + return CL5_DB_ERROR; + } + + /* Now close it so we can re-open it again... */ + dbEnv->close(dbEnv, 0); + + return CL5_SUCCESS; +} + +/* Trimming helper functions */ +static int _cl5TrimInit () +{ + /* just create the lock while we are singlethreaded */ + s_cl5Desc.dbTrim.lock = PR_NewLock(); + + if (s_cl5Desc.dbTrim.lock == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5InitTrimming: failed to create lock; NSPR error - %d\n", + PR_GetError ()); + return CL5_SYSTEM_ERROR; + } + else + { + return CL5_SUCCESS; + } +} + +static void _cl5TrimCleanup () +{ + if (s_cl5Desc.dbTrim.lock) + PR_DestroyLock (s_cl5Desc.dbTrim.lock); + + memset (&s_cl5Desc.dbTrim, 0, sizeof (s_cl5Desc.dbTrim)); +} + +static int _cl5TrimMain (void *param) +{ + PRIntervalTime interval; + time_t timePrev = current_time (); + time_t timeNow; + + PR_AtomicIncrement (&s_cl5Desc.threadCount); + interval = PR_SecondsToInterval(CHANGELOGDB_TRIM_INTERVAL); + + while (s_cl5Desc.dbState != CL5_STATE_CLOSING) + { + timeNow = current_time (); + if (timeNow - timePrev >= CHANGELOGDB_TRIM_INTERVAL) + { + /* time to trim */ + timePrev = timeNow; + _cl5DoTrimming (); + } + if (NULL == s_cl5Desc.clLock) + { + /* most likely, emergency */ + break; + } + + PR_Lock(s_cl5Desc.clLock); + PR_WaitCondVar(s_cl5Desc.clCvar, interval); + PR_Unlock(s_cl5Desc.clLock); + } + + PR_AtomicDecrement (&s_cl5Desc.threadCount); + + return 0; +} + +/* We remove an entry if it has been replayed to all consumers and + and the number of entries in the changelog is larger than maxEntries + or age of the entry is larger than maxAge. + Also we can't purge entries which correspond to max csns in the + supplier's ruv. Here is a example where we can get into trouble: + The server is setup with time based trimming and no consumer's + At some point all the entries are trimmed from the changelog. + At a later point a consumer is added and initialized online + Then a change is made on the supplier. + To update the consumer, the supplier would attempt to locate + the last change sent to the consumer in the changelog and will + fail because the change was removed. + + */ + +static void _cl5DoTrimming () +{ + Object *obj; + long numToTrim; + + PR_Lock (s_cl5Desc.dbTrim.lock); + + /* ONREPL We trim file by file which means that some files will be + trimmed more often than other. We might have to fix that by, for + example, randomizing starting point */ + obj = objset_first_obj (s_cl5Desc.dbFiles); + while (obj && _cl5CanTrim ((time_t)0, &numToTrim)) + { + _cl5TrimFile (obj, &numToTrim); + obj = objset_next_obj (s_cl5Desc.dbFiles, obj); + } + + if (obj) + object_release (obj); + + PR_Unlock (s_cl5Desc.dbTrim.lock); + + return; +} + +/* Note that each file contains changes for a single replicated area. + trimming algorithm: +*/ +#define CL5_TRIM_MAX_PER_TRANSACTION 10 + +static void _cl5TrimFile (Object *obj, long *numToTrim) +{ + DB_TXN *txnid; + RUV *ruv = NULL; + CL5Entry entry; + slapi_operation_parameters op = {0}; + void *it; + int finished = 0, totalTrimmed = 0, count; + PRBool abort; + char strCSN[CSN_STRSIZE]; + int rc; + + PR_ASSERT (obj); + + /* construct the ruv up to which we can purge */ + rc = _cl5GetRUV2Purge2 (obj, &ruv); + if (rc != CL5_SUCCESS || ruv == NULL) + { + return; + } + + entry.op = &op; + + while ( !finished && !g_get_shutdown() ) + { + it = NULL; + count = 0; + txnid = NULL; + abort = PR_FALSE; + + /* DB txn lock accessed pages until the end of the transaction. */ + + rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5TrimFile: failed to begin transaction; db error - %d %s\n", + rc, db_strerror(rc)); + finished = PR_TRUE; + break; + } + + finished = _cl5GetFirstEntry (obj, &entry, &it, txnid); + while ( !finished ) + { + /* + * This change can be trimmed if it exceeds purge + * parameters and has been seen by all consumers. + */ + if ( (*numToTrim > 0 || _cl5CanTrim (entry.time, numToTrim)) && + ruv_covers_csn_strict (ruv, op.csn) ) + { + rc = _cl5CurrentDeleteEntry (it); + if ( rc == CL5_SUCCESS ) + { + /* update purge vector */ + rc = _cl5UpdateRUV (obj, op.csn, PR_FALSE, PR_TRUE); + } + if ( rc == CL5_SUCCESS) + { + if (*numToTrim > 0) (*numToTrim)--; + count++; + } + else + { + /* The above two functions have logged the error */ + abort = PR_TRUE; + } + + } + else + { + /* The changelog DB is time ordered. If we can not trim + * a CSN, we will not be allowed to trim the rest of the + * CSNs generally. However, the maxcsn of each replica ID + * is always kept in the changelog as an anchor for + * replaying future changes. We have to skip those anchor + * CSNs, otherwise a non-active replica ID could block + * the trim forever. + */ + CSN *maxcsn = NULL; + ReplicaId rid; + + rid = csn_get_replicaid (op.csn); + ruv_get_largest_csn_for_replica (ruv, rid, &maxcsn); + if ( csn_compare (op.csn, maxcsn) != 0 ) + { + /* op.csn is not anchor CSN */ + finished = 1; + } + else + { + slapi_log_error (SLAPI_LOG_REPL, NULL, + "Changelog purge skipped anchor csn %s\n", + csn_as_string (maxcsn, PR_FALSE, strCSN)); + + /* extra read to skip the current record */ + cl5_operation_parameters_done (&op); + finished =_cl5GetNextEntry (&entry, it); + } + if (maxcsn) csn_free (&maxcsn); + } + cl5_operation_parameters_done (&op); + if (finished || abort || count >= CL5_TRIM_MAX_PER_TRANSACTION) + { + /* If we reach CL5_TRIM_MAX_PER_TRANSACTION, + * we close the cursor, + * commit the transaction and restart a new transaction + */ + break; + } + finished = _cl5GetNextEntry (&entry, it); + } + + /* MAB: We need to close the cursor BEFORE the txn commits/aborts. + * If we don't respect this order, we'll screw up the database, + * placing it in DB_RUNRECOVERY mode + */ + cl5DestroyIterator (it); + + if (abort) + { + finished = 1; + rc = TXN_ABORT (txnid); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5TrimFile: failed to abort transaction; db error - %d %s\n", + rc, db_strerror(rc)); + } + } + else + { + rc = TXN_COMMIT (txnid, 0); + if (rc != 0) + { + finished = 1; + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5TrimFile: failed to commit transaction; db error - %d %s\n", + rc, db_strerror(rc)); + } + else + { + totalTrimmed += count; + } + } + + } /* While (!finished) */ + + if (ruv) + ruv_destroy (&ruv); + + if (totalTrimmed) + { + slapi_log_error (SLAPI_LOG_REPL, NULL, "Trimmed %d changes from the changelog\n", totalTrimmed); + } +} + +static PRBool _cl5CanTrim (time_t time, long *numToTrim) +{ + *numToTrim = 0; + + if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0) + return PR_FALSE; + + if (s_cl5Desc.dbTrim.maxAge == 0) + { + *numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries; + return ( *numToTrim > 0 ); + } + + if (s_cl5Desc.dbTrim.maxEntries > 0 && + (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) + return PR_TRUE; + + if (time) + return (current_time () - time > s_cl5Desc.dbTrim.maxAge); + else + return PR_TRUE; +} + +static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge) +{ + int rc; + char csnStr [CSN_STRSIZE]; + DBT key={0}, data={0}; + struct berval **vals; + CL5DBFile *file; + char *pos; + char *agmt_name; + + + PR_ASSERT (replGen && obj); + + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + + agmt_name = get_thread_private_agmtname(); + + if (purge) /* read purge vector entry */ + key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr); + else /* read upper bound vector */ + key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr); + + key.size = CSN_STRSIZE; + + data.flags = DB_DBT_MALLOC; + + rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0); + switch (rc) + { + case 0: pos = data.data; + rc = _cl5ReadBervals (&vals, &pos, data.size); + free (data.data); + if (rc != CL5_SUCCESS) + return rc; + + if (purge) + rc = ruv_init_from_bervals(vals, &file->purgeRUV); + else + rc = ruv_init_from_bervals(vals, &file->maxRUV); + + if (rc != RUV_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "%s: _cl5ReadRUV: failed to initialize %s ruv; " + "RUV error %d\n", agmt_name, purge? "purge" : "upper bound", rc); + + return CL5_RUV_ERROR; + } + + ber_bvecfree(vals); + + /* delete the entry; it is re-added when file + is successfully closed */ + file->db->del (file->db, NULL, &key, DEFAULT_DB_OP_FLAGS); + + return CL5_SUCCESS; + + case DB_NOTFOUND: /* RUV is lost - need to construct */ + rc = _cl5ConstructRUV (replGen, obj, purge); + return rc; + + default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "%s: _cl5ReadRUV: failed to get purge RUV; " + "db error - %d %s\n", agmt_name, rc, db_strerror(rc)); + return CL5_DB_ERROR; + } +} + +static int _cl5WriteRUV (CL5DBFile *file, PRBool purge) +{ + int rc; + DBT key={0}, data={0}; + char csnStr [CSN_STRSIZE]; + struct berval **vals; + DB_TXN *txnid = NULL; + + if ((purge && file->purgeRUV == NULL) || (!purge && file->maxRUV == NULL)) + return CL5_SUCCESS; + + if (purge) + { + key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr); + rc = ruv_to_bervals(file->purgeRUV, &vals); + } + else + { + key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr); + rc = ruv_to_bervals(file->maxRUV, &vals); + } + + key.size = CSN_STRSIZE; + + rc = _cl5WriteBervals (vals, (char**)&data.data, &data.size); + ber_bvecfree(vals); + if (rc != CL5_SUCCESS) + { + return rc; + } + +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + rc = txn_begin(s_cl5Desc.dbEnv, NULL, &txnid, 0); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteRUV: failed to begin transaction; db error - %d %s\n", + rc, db_strerror(rc)); + return CL5_DB_ERROR; + } +#endif + rc = file->db->put(file->db, txnid, &key, &data, DEFAULT_DB_OP_FLAGS); + + slapi_ch_free ((void**)&data.data); + if ( rc == 0 ) + { +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + rc = txn_commit (txnid, 0); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteRUV: failed to commit transaction; db error - %d %s\n", + rc, db_strerror(rc)); + return CL5_DB_ERROR; + } +#endif + return CL5_SUCCESS; + } + else + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteRUV: failed to write %s RUV for file %s; db error - %d\n", + purge? "purge" : "upper bound", file->name, rc); + + if (CL5_OS_ERR_IS_DISKFULL(rc)) + { + cl5_set_diskfull(); + return CL5_DB_ERROR; + } +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + rc = txn_abort (txnid); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteRUV: failed to abort transaction; db error - %d %s\n", + rc, db_strerror(rc)); + } +#endif + return CL5_DB_ERROR; + } +} + +/* This is a very slow process since we have to read every changelog entry. + Hopefully, this function is not called too often */ +static int _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge) +{ + int rc; + CL5Entry entry; + void *iterator = NULL; + slapi_operation_parameters op = {0}; + CL5DBFile *file; + + PR_ASSERT (replGen && obj); + + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + + /* construct the RUV */ + if (purge) + rc = ruv_init_new (replGen, 0, NULL, &file->purgeRUV); + else + rc = ruv_init_new (replGen, 0, NULL, &file->maxRUV); + if (rc != RUV_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV: " + "failed to initialize %s RUV for file %s; ruv error - %d\n", + purge? "purge" : "upper bound", file->name, rc); + return CL5_RUV_ERROR; + } + + entry.op = &op; + rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL); + while (rc == CL5_SUCCESS) + { + if (purge) + rc = ruv_set_csns_keep_smallest(file->purgeRUV, op.csn); + else + rc = ruv_set_csns (file->maxRUV, op.csn, NULL); + + cl5_operation_parameters_done (&op); + if (rc != RUV_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV: " + "failed to updated %s RUV for file %s; ruv error - %d\n", + purge ? "purge" : "upper bound", file->name, rc); + rc = CL5_RUV_ERROR; + continue; + } + + rc = _cl5GetNextEntry (&entry, iterator); + } + + cl5_operation_parameters_done (&op); + + if (iterator) + cl5DestroyIterator (iterator); + + if (rc == CL5_NOTFOUND) + { + rc = CL5_SUCCESS; + } + else + { + if (purge) + ruv_destroy (&file->purgeRUV); + else + ruv_destroy (&file->maxRUV); + } + + return rc; +} + +static int _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge) +{ + ReplicaId rid; + int rc = RUV_SUCCESS; /* initialize rc to avoid erroneous logs */ + CL5DBFile *file; + + PR_ASSERT (obj && csn); + + file = (CL5DBFile*)object_get_data (obj); + + /* if purge is TRUE, file->purgeRUV must be set; + if purge is FALSE, maxRUV must be set */ + PR_ASSERT (file && ((purge && file->purgeRUV) || (!purge && file->maxRUV))); + + /* update vector only if this replica is not yet part of RUV */ + if (purge && newReplica) + { + rid = csn_get_replicaid(csn); + if (ruv_contains_replica (file->purgeRUV, rid)) + return CL5_SUCCESS; + else + { + /* if the replica is not part of the purgeRUV yet, add it */ + ruv_add_replica (file->purgeRUV, rid, multimaster_get_local_purl()); + } + } + else + { + if (purge) + rc = ruv_set_csns(file->purgeRUV, csn, NULL); + else + rc = ruv_set_csns(file->maxRUV, csn, NULL); + } + + if (rc != RUV_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5UpdatePurgeRUV: " + "failed to update %s RUV for file %s; ruv error - %d\n", + purge ? "purge" : "upper bound", file->name, rc); + return CL5_RUV_ERROR; + } + + return CL5_SUCCESS; +} + +static int _cl5EnumConsumerRUV (const ruv_enum_data *element, void *arg) +{ + int rc; + RUV *ruv; + CSN *csn = NULL; + + PR_ASSERT (element && element->csn && arg); + + ruv = (RUV*)arg; + + rc = ruv_get_largest_csn_for_replica(ruv, csn_get_replicaid (element->csn), &csn); + if (rc != RUV_SUCCESS || csn == NULL || csn_compare (element->csn, csn) < 0) + { + ruv_set_max_csn(ruv, element->csn, NULL); + } + + if (csn) + csn_free (&csn); + + return 0; +} + +static int _cl5GetRUV2Purge2 (Object *fileObj, RUV **ruv) +{ + int rc = CL5_SUCCESS; + CL5DBFile *dbFile; + Object *rObj = NULL; + Replica *r = NULL; + Object *agmtObj = NULL; + Repl_Agmt *agmt; + Object *consRUVObj, *supRUVObj; + RUV *consRUV, *supRUV; + CSN *csn; + + PR_ASSERT (fileObj && ruv); + + dbFile = (CL5DBFile*)object_get_data (fileObj); + PR_ASSERT (dbFile); + + rObj = replica_get_by_name (dbFile->replName); + PR_ASSERT (rObj); + r = (Replica*)object_get_data (rObj); + PR_ASSERT (r); + + /* We start with this replica's RUV. See note in _cl5DoTrimming */ + supRUVObj = replica_get_ruv (r); + PR_ASSERT (supRUVObj); + + supRUV = (RUV*)object_get_data (supRUVObj); + PR_ASSERT (supRUV); + + *ruv = ruv_dup (supRUV); + + object_release (supRUVObj); + + agmtObj = agmtlist_get_first_agreement_for_replica (r); + while (agmtObj) + { + agmt = (Repl_Agmt*)object_get_data (agmtObj); + PR_ASSERT (agmt); + + consRUVObj = agmt_get_consumer_ruv (agmt); + if (consRUVObj) + { + consRUV = (RUV*)object_get_data (consRUVObj); + rc = ruv_enumerate_elements (consRUV, _cl5EnumConsumerRUV, *ruv); + if (rc != RUV_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetRUV2Purge2: " + "failed to construct ruv; ruv error - %d\n", rc); + rc = CL5_RUV_ERROR; + object_release (consRUVObj); + object_release (agmtObj); + break; + } + + object_release (consRUVObj); + } + + agmtObj = agmtlist_get_next_agreement_for_replica (r, agmtObj); + } + + /* check if there is any data in the constructed ruv - otherwise get rid of it */ + if (ruv_get_max_csn(*ruv, &csn) != RUV_SUCCESS || csn == NULL) + { + ruv_destroy (ruv); + } + else + { + csn_free (&csn); + } + + if (rObj) + object_release (rObj); + + if (rc != CL5_SUCCESS && ruv) + ruv_destroy (ruv); + + return rc; +} + +static int _cl5GetEntryCount (CL5DBFile *file) +{ + int rc; + char csnStr [CSN_STRSIZE]; + DBT key={0}, data={0}; + DB_BTREE_STAT *stats = NULL; + + PR_ASSERT (file); + + /* read entry count. if the entry is there - the file was successfully closed + last time it was used */ + key.data = _cl5GetHelperEntryKey (ENTRY_COUNT_TIME, csnStr); + key.size = CSN_STRSIZE; + + data.flags = DB_DBT_MALLOC; + + rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0); + switch (rc) + { + case 0: file->entryCount = *(int*)data.data; + free (data.data); + + /* delete the entry. the entry is re-added when file + is successfully closed */ + file->db->del (file->db, NULL, &key, DEFAULT_DB_OP_FLAGS); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5GetEntryCount: %d changes for replica %s\n", + file->entryCount, file->replName); + return CL5_SUCCESS; + + case DB_NOTFOUND: file->entryCount = 0; +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 3300 + rc = file->db->stat(file->db, (void*)&stats, 0); +#else + rc = file->db->stat(file->db, (void*)&stats, malloc, 0); +#endif + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5GetEntryCount: failed to get changelog statistics; " + "db error - %d %s\n", rc, db_strerror(rc)); + return CL5_DB_ERROR; + } + +#ifdef DB30 + file->entryCount = stats->bt_nrecs; +#else /* DB31 */ + file->entryCount = stats->bt_ndata; +#endif + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5GetEntryCount: %d changes for replica %s\n", + file->entryCount, file->replName); + + free (stats); + return CL5_SUCCESS; + + default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5GetEntryCount: failed to get count entry; " + "db error - %d %s\n", rc, db_strerror(rc)); + return CL5_DB_ERROR; + } +} + +static int _cl5WriteEntryCount (CL5DBFile *file) +{ + int rc; + DBT key={0}, data={0}; + char csnStr [CSN_STRSIZE]; + DB_TXN *txnid = NULL; + + key.data = _cl5GetHelperEntryKey (ENTRY_COUNT_TIME, csnStr); + key.size = CSN_STRSIZE; + data.data = (void*)&file->entryCount; + data.size = sizeof (file->entryCount); + +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + rc = txn_begin(s_cl5Desc.dbEnv, NULL, &txnid, 0); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteEntryCount: failed to begin transaction; db error - %d %s\n", + rc, db_strerror(rc)); + return CL5_DB_ERROR; + } +#endif + rc = file->db->put(file->db, txnid, &key, &data, DEFAULT_DB_OP_FLAGS); + if (rc == 0) + { +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + rc = txn_commit (txnid, 0); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteEntryCount: failed to commit transaction; db error - %d %s\n", + rc, db_strerror(rc)); + return CL5_DB_ERROR; + } +#endif + return CL5_SUCCESS; + } + else + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteEntryCount: " + "failed to write count entry for file %s; db error - %d %s\n", + file->name, rc, db_strerror(rc)); + if (CL5_OS_ERR_IS_DISKFULL(rc)) + { + cl5_set_diskfull(); + return CL5_DB_ERROR; + } +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + rc = txn_abort (txnid); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteEntryCount: failed to abort transaction; db error - %d %s\n", + rc, db_strerror(rc)); + } +#endif + return CL5_DB_ERROR; + } +} + +static const char* _cl5OperationType2Str (int type) +{ + switch (type) + { + case SLAPI_OPERATION_ADD: return T_ADDCTSTR; + case SLAPI_OPERATION_MODIFY: return T_MODIFYCTSTR; + case SLAPI_OPERATION_MODRDN: return T_MODRDNCTSTR; + case SLAPI_OPERATION_DELETE: return T_DELETECTSTR; + default: return NULL; + } +} + +static int _cl5Str2OperationType (const char *str) +{ + if (strcasecmp (str, T_ADDCTSTR) == 0) + return SLAPI_OPERATION_ADD; + + if (strcasecmp (str, T_MODIFYCTSTR) == 0) + return SLAPI_OPERATION_MODIFY; + + if (strcasecmp (str, T_MODRDNCTSTR) == 0) + return SLAPI_OPERATION_MODRDN; + + if (strcasecmp (str, T_DELETECTSTR) == 0) + return SLAPI_OPERATION_DELETE; + + return -1; +} + +static int _cl5Operation2LDIF (const slapi_operation_parameters *op, const char *replGen, + char **ldifEntry, PRInt32 *lenLDIF) +{ + int len = 2; + lenstr *l = NULL; + const char *strType; + char *strDeleteOldRDN; + char *buff, *start; + LDAPMod **add_mods; + char *rawDN; + char strCSN[CSN_STRSIZE]; + + PR_ASSERT (op && replGen && ldifEntry && IsValidOperation (op)); + + strType = _cl5OperationType2Str (op->operation_type); + csn_as_string(op->csn,PR_FALSE,strCSN); + + /* find length of the buffer */ + len += LDIF_SIZE_NEEDED(strlen (T_CHANGETYPESTR), strlen (strType)); + len += LDIF_SIZE_NEEDED(strlen (T_REPLGEN), strlen (replGen)); + len += LDIF_SIZE_NEEDED(strlen (T_CSNSTR), strlen (strCSN)); + len += LDIF_SIZE_NEEDED(strlen (T_UNIQUEIDSTR), strlen (op->target_address.uniqueid)); + + switch (op->operation_type) + { + case SLAPI_OPERATION_ADD: if (op->p.p_add.parentuniqueid) + len += LDIF_SIZE_NEEDED(strlen (T_PARENTIDSTR), + strlen (op->p.p_add.parentuniqueid)); + slapi_entry2mods (op->p.p_add.target_entry, &rawDN, &add_mods); + len += LDIF_SIZE_NEEDED(strlen (T_DNSTR), strlen (rawDN)); + l = make_changes_string(add_mods, NULL); + len += LDIF_SIZE_NEEDED(strlen (T_CHANGESTR), l->ls_len); + ldap_mods_free (add_mods, 1); + break; + + case SLAPI_OPERATION_MODIFY: len += LDIF_SIZE_NEEDED(strlen (T_DNSTR), strlen (op->target_address.dn)); + l = make_changes_string(op->p.p_modify.modify_mods, NULL); + len += LDIF_SIZE_NEEDED(strlen (T_CHANGESTR), l->ls_len); + break; + + case SLAPI_OPERATION_MODRDN: len += LDIF_SIZE_NEEDED(strlen (T_DNSTR), strlen (op->target_address.dn)); + len += LDIF_SIZE_NEEDED(strlen (T_NEWRDNSTR), + strlen (op->p.p_modrdn.modrdn_newrdn)); + strDeleteOldRDN = (op->p.p_modrdn.modrdn_deloldrdn ? "true" : "false"); + len += LDIF_SIZE_NEEDED(strlen (T_DRDNFLAGSTR), + strlen (strDeleteOldRDN)); + if (op->p.p_modrdn.modrdn_newsuperior_address.dn) + len += LDIF_SIZE_NEEDED(strlen (T_NEWSUPERIORDNSTR), + strlen (op->p.p_modrdn.modrdn_newsuperior_address.dn)); + if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid) + len += LDIF_SIZE_NEEDED(strlen (T_NEWSUPERIORIDSTR), + strlen (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)); + l = make_changes_string(op->p.p_modrdn.modrdn_mods, NULL); + len += LDIF_SIZE_NEEDED(strlen (T_CHANGESTR), l->ls_len); + break; + + case SLAPI_OPERATION_DELETE: len += LDIF_SIZE_NEEDED(strlen (T_DNSTR), strlen (op->target_address.dn)); + break; + + default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5Operation2LDIF: invalid operation type - %d\n", op->operation_type); + + return CL5_BAD_FORMAT; + } + + /* allocate buffer */ + buff = (char*)slapi_ch_malloc (len); + start = buff; + if (buff == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5Operation2LDIF: memory allocation failed\n"); + return CL5_MEMORY_ERROR; + } + + /* fill buffer */ + ldif_put_type_and_value(&buff, T_CHANGETYPESTR, (char*)strType, strlen (strType)); + ldif_put_type_and_value(&buff, T_REPLGEN, (char*)replGen, strlen (replGen)); + ldif_put_type_and_value(&buff, T_CSNSTR, (char*)strCSN, strlen (strCSN)); + ldif_put_type_and_value(&buff, T_UNIQUEIDSTR, op->target_address.uniqueid, + strlen (op->target_address.uniqueid)); + + switch (op->operation_type) + { + case SLAPI_OPERATION_ADD: if (op->p.p_add.parentuniqueid) + ldif_put_type_and_value(&buff, T_PARENTIDSTR, + op->p.p_add.parentuniqueid, strlen (op->p.p_add.parentuniqueid)); + ldif_put_type_and_value(&buff, T_DNSTR, rawDN, strlen (rawDN)); + ldif_put_type_and_value(&buff, T_CHANGESTR, l->ls_buf, l->ls_len); + slapi_ch_free ((void**)&rawDN); + break; + + case SLAPI_OPERATION_MODIFY: ldif_put_type_and_value(&buff, T_DNSTR, op->target_address.dn, + strlen (op->target_address.dn)); + ldif_put_type_and_value(&buff, T_CHANGESTR, l->ls_buf, l->ls_len); + break; + + case SLAPI_OPERATION_MODRDN: ldif_put_type_and_value(&buff, T_DNSTR, op->target_address.dn, + strlen (op->target_address.dn)); + ldif_put_type_and_value(&buff, T_NEWRDNSTR, op->p.p_modrdn.modrdn_newrdn, + strlen (op->p.p_modrdn.modrdn_newrdn)); + ldif_put_type_and_value(&buff, T_DRDNFLAGSTR, strDeleteOldRDN, + strlen (strDeleteOldRDN)); + if (op->p.p_modrdn.modrdn_newsuperior_address.dn) + ldif_put_type_and_value(&buff, T_NEWSUPERIORDNSTR, + op->p.p_modrdn.modrdn_newsuperior_address.dn, + strlen (op->p.p_modrdn.modrdn_newsuperior_address.dn)); + if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid) + ldif_put_type_and_value(&buff, T_NEWSUPERIORIDSTR, + op->p.p_modrdn.modrdn_newsuperior_address.uniqueid, + strlen (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)); + ldif_put_type_and_value(&buff, T_CHANGESTR, l->ls_buf, l->ls_len); + break; + + case SLAPI_OPERATION_DELETE: ldif_put_type_and_value(&buff, T_DNSTR, op->target_address.dn, + strlen (op->target_address.dn)); + break; + } + + *buff = '\n'; + buff ++; + *buff = '\0'; + + *ldifEntry = start; + *lenLDIF = buff - start; + + if (l) + lenstr_free(&l); + + return CL5_SUCCESS; +} + +static int +_cl5LDIF2Operation (char *ldifEntry, slapi_operation_parameters *op, char **replGen) +{ + int rc; + int vlen; + char *next, *line; + char *type, *value; + Slapi_Mods *mods; + char *rawDN; + + PR_ASSERT (op && ldifEntry && replGen); + + memset (op, 0, sizeof (*op)); + + next = ldifEntry; + while ((line = ldif_getline(&next)) != NULL) + { + char *errmsg = NULL; + + if ( *line == '\n' || *line == '\0' ) + { + break; + } + + /* this call modifies ldifEntry */ + rc = ldif_parse_line(line, &type, &value, &vlen, &errmsg); + if (rc != 0) + { + if ( errmsg != NULL ) { + slapi_log_error(SLAPI_LOG_PARSE, repl_plugin_name_cl, "%s", errmsg); + slapi_ch_free( (void**)&errmsg ); + } + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5LDIF2Operation: warning - failed to parse ldif line\n"); + continue; + } + + if (strcasecmp (type, T_CHANGETYPESTR) == 0) + { + op->operation_type = _cl5Str2OperationType (value); + } + else if (strcasecmp (type, T_REPLGEN) == 0) + { + *replGen = slapi_ch_strdup (value); + } + else if (strcasecmp (type, T_CSNSTR) == 0) + { + op->csn = csn_new_by_string(value); + } + else if (strcasecmp (type, T_UNIQUEIDSTR) == 0) + { + op->target_address.uniqueid = slapi_ch_strdup (value); + } + else if (strcasecmp (type, T_DNSTR) == 0) + { + PR_ASSERT (op->operation_type); + + if (op->operation_type == SLAPI_OPERATION_ADD) + { + rawDN = slapi_ch_strdup (value); + op->target_address.dn = slapi_ch_strdup(rawDN); + } + else + op->target_address.dn = slapi_ch_strdup (value); + } + else if (strcasecmp (type, T_PARENTIDSTR) == 0) + { + op->p.p_add.parentuniqueid = slapi_ch_strdup (value); + } + else if (strcasecmp (type, T_NEWRDNSTR) == 0) + { + op->p.p_modrdn.modrdn_newrdn = slapi_ch_strdup (value); + } + else if (strcasecmp (type, T_DRDNFLAGSTR) == 0) + { + op->p.p_modrdn.modrdn_deloldrdn = (strcasecmp (value, "true") ? PR_FALSE : PR_TRUE); + } + else if (strcasecmp (type, T_NEWSUPERIORDNSTR) == 0) + { + op->p.p_modrdn.modrdn_newsuperior_address.dn = slapi_ch_strdup (value); + } + else if (strcasecmp (type, T_NEWSUPERIORIDSTR) == 0) + { + op->p.p_modrdn.modrdn_newsuperior_address.uniqueid = slapi_ch_strdup (value); + } + else if (strcasecmp (type, T_CHANGESTR) == 0) + { + PR_ASSERT (op->operation_type); + + switch (op->operation_type) + { + case SLAPI_OPERATION_ADD: mods = parse_changes_string(value); + slapi_mods2entry (&(op->p.p_add.target_entry), rawDN, + slapi_mods_get_ldapmods_byref(mods)); + slapi_ch_free ((void**)&rawDN); + slapi_mods_free (&mods); + break; + + case SLAPI_OPERATION_MODIFY: mods = parse_changes_string(value); + PR_ASSERT (mods); + op->p.p_modify.modify_mods = slapi_mods_get_ldapmods_passout (mods); + slapi_mods_free (&mods); + break; + + case SLAPI_OPERATION_MODRDN: mods = parse_changes_string(value); + PR_ASSERT (mods); + op->p.p_modrdn.modrdn_mods = slapi_mods_get_ldapmods_passout (mods); + slapi_mods_free (&mods); + break; + + default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5LDIF2Operation: invalid operation type - %d\n", + op->operation_type); + return CL5_BAD_FORMAT; + } + } + } + + if (IsValidOperation (op)) + return CL5_SUCCESS; + + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5LDIF2Operation: invalid data format\n"); + return CL5_BAD_FORMAT; +} + +static int _cl5WriteOperation(const char *replName, const char *replGen, + const slapi_operation_parameters *op, PRBool local) +{ + int rc; + int cnt; + DBT key={0}; + DBT * data=NULL; + char csnStr [CSN_STRSIZE]; + PRIntervalTime interval; + CL5Entry entry; + CL5DBFile *file = NULL; + Object *file_obj = NULL; + DB_TXN *txnid = NULL; + + rc = _cl5GetDBFileByReplicaName (replName, replGen, &file_obj); + if (rc == CL5_NOTFOUND) + { + rc = _cl5DBOpenFileByReplicaName (replName, replGen, &file_obj, + PR_TRUE /* check for duplicates */); + if (rc != CL5_SUCCESS) + { + return rc; + } + } + else if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "_cl5WriteOperation: failed to get db file for target dn (%s)", + op->target_address.dn); + return CL5_OBJSET_ERROR; + } + + /* assign entry time - used for trimming */ + entry.time = current_time (); + entry.op = (slapi_operation_parameters *)op; + + /* construct the key */ + key.data = csn_as_string(op->csn, PR_FALSE, csnStr); + key.size = CSN_STRSIZE; + + /* construct the data */ + data = (DBT *) slapi_ch_calloc(1, sizeof(DBT)); + rc = _cl5Entry2DBData (&entry, (char**)&data->data, &data->size); + if (rc != CL5_SUCCESS) + { + char s[CSN_STRSIZE]; + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5WriteOperation: failed to convert entry with csn (%s) " + "to db format\n", csn_as_string(op->csn,PR_FALSE,s)); + goto done; + } + + file = (CL5DBFile*)object_get_data (file_obj); + PR_ASSERT (file); + + /* if this is part of ldif2cl - just write the entry without transaction */ + if (s_cl5Desc.dbOpenMode == CL5_OPEN_LDIF2CL) + { + rc = file->db->put(file->db, NULL, &key, data, 0); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteOperation: failed to write entry; db error - %d %s\n", + rc, db_strerror(rc)); + if (CL5_OS_ERR_IS_DISKFULL(rc)) + { + cl5_set_diskfull(); + } + rc = CL5_DB_ERROR; + } + goto done; + } + + /* write the entry */ + rc = EAGAIN; + cnt = 0; + + while ((rc == EAGAIN || rc == DB_LOCK_DEADLOCK) && cnt < MAX_TRIALS) + { + if (cnt != 0) + { +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + /* abort previous transaction */ + rc = txn_abort (txnid); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteOperation: failed to abort transaction; db error - %d %s\n", + rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + goto done; + } +#endif + /* back off */ + interval = PR_MillisecondsToInterval(slapi_rand() % 100); + DS_Sleep(interval); + } +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + /* begin transaction */ + rc = txn_begin(s_cl5Desc.dbEnv, NULL /*pid*/, &txnid, 0); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteOperation: failed to start transaction; db error - %d %s\n", + rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + goto done; + } +#endif + + if ( file->sema ) + { + PR_WaitSemaphore(file->sema); + } + rc = file->db->put(file->db, txnid, &key, data, DEFAULT_DB_OP_FLAGS); + if ( file->sema ) + { + PR_PostSemaphore(file->sema); + } + if (CL5_OS_ERR_IS_DISKFULL(rc)) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteOperation: changelog (%s) DISK FULL; db error - %d %s\n", + s_cl5Desc.dbDir, rc, db_strerror(rc)); + cl5_set_diskfull(); + rc = CL5_DB_ERROR; + goto done; + } + if (cnt != 0) + { + if (rc == 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "_cl5WriteOperation: retry (%d) the transaction (csn=%s) succeeded\n", cnt, (char*)key.data); + } + else if ((cnt + 1) >= MAX_TRIALS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "_cl5WriteOperation: retry (%d) the transaction (csn=%s) failed (rc=%d)\n", cnt, (char*)key.data, rc); + } + } + cnt ++; + } + + if (rc == 0) /* we successfully added entry */ + { +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + rc = txn_commit (txnid, 0); +#endif + } + else + { + char s[CSN_STRSIZE]; + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteOperation: failed to write entry with csn (%s); " + "db error - %d %s\n", csn_as_string(op->csn,PR_FALSE,s), + rc, db_strerror(rc)); +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100 + rc = txn_abort (txnid); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5WriteOperation: failed to abort transaction; db error - %d %s\n", + rc, db_strerror(rc)); + } +#endif + rc = CL5_DB_ERROR; + goto done; + } + + /* update entry count - we assume that all entries are new */ + PR_AtomicIncrement (&file->entryCount); + + /* update purge vector if we have not seen any changes from this replica before */ + _cl5UpdateRUV (file_obj, op->csn, PR_TRUE, PR_TRUE); + + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "cl5WriteOperation: successfully written entry with csn (%s)\n", csnStr); + rc = CL5_SUCCESS; +done: + if (data->data) + slapi_ch_free ((void**)&data->data); + slapi_ch_free((void**) &data); + + if (file_obj) + object_release (file_obj); + + return rc; +} + +static int _cl5GetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid) +{ + int rc; + DBC *cursor = NULL; + DBT key={0}, data={0}; + CL5Iterator *it; + CL5DBFile *file; + + PR_ASSERT (obj && entry && iterator); + + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + /* create cursor */ + rc = file->db->cursor(file->db, txnid, &cursor, 0); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5GetFirstEntry: failed to create cursor; db error - %d %s\n", rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + goto done; + } + + key.flags = DB_DBT_MALLOC; + data.flags = DB_DBT_MALLOC; + while ((rc = cursor->c_get(cursor, &key, &data, DB_NEXT)) == 0) + { + /* skip service entries */ + if (cl5HelperEntry ((char*)key.data, NULL)) + { + free (key.data); + free (data.data); + continue; + } + + /* format entry */ + free (key.data); + rc = cl5DBData2Entry (data.data, data.size, entry); + free (data.data); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5GetFirstOperation: failed to format entry\n", rc); + goto done; + } + + it = (CL5Iterator*)slapi_ch_malloc (sizeof (CL5Iterator)); + it->cursor = cursor; + object_acquire (obj); + it->file = obj; + *(CL5Iterator**)iterator = it; + + return CL5_SUCCESS; + } + + /* walked of the end of the file */ + if (rc == DB_NOTFOUND) + { + rc = CL5_NOTFOUND; + goto done; + } + + /* db error occured while iterating */ + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5GetFirstEntry: failed to get entry; db error - %d %s\n", rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + goto done; + } + + /* successfully retrieved next entry but it was out of range */ + if (rc == CL5_SUCCESS) + { + free (key.data); + free (data.data); + rc = CL5_NOTFOUND; + goto done; + } + +done:; + /* error occured */ + /* We didn't success in assigning this cursor to the iterator, + * so we need to free the cursor here */ + if (cursor) + cursor->c_close(cursor); + + return rc; +} + +static int _cl5GetNextEntry (CL5Entry *entry, void *iterator) +{ + int rc; + CL5Iterator *it; + DBT key={0}, data={0}; + + PR_ASSERT (entry && iterator); + + it = (CL5Iterator*) iterator; + + key.flags = DB_DBT_MALLOC; + data.flags = DB_DBT_MALLOC; + while ((rc = it->cursor->c_get(it->cursor, &key, &data, DB_NEXT)) == 0) + { + if (cl5HelperEntry ((char*)key.data, NULL)) + { + free (key.data); + free (data.data); + continue; + } + + free (key.data); + /* format entry */ + rc = cl5DBData2Entry (data.data, data.size, entry); + free (data.data); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5GetNextEntry: failed to format entry\n", rc); + } + + return rc; + } + + /* walked of the end of the file or entry is out of range */ + if (rc == 0 || rc == DB_NOTFOUND) + { + return CL5_NOTFOUND; + } + + /* cursor operation failed */ + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5GetNextEntry: failed to get entry; db error - %d %s\n", rc, db_strerror(rc)); + + return CL5_DB_ERROR; + } + + return rc; +} + +static int _cl5CurrentDeleteEntry (void *iterator) +{ + int rc; + CL5Iterator *it; + CL5DBFile *file; + + PR_ASSERT (iterator); + + it = (CL5Iterator*)iterator; + + rc = it->cursor->c_del (it->cursor, 0); + + if (rc == 0) { + /* decrement entry count */ + file = (CL5DBFile*)object_get_data (it->file); + PR_AtomicDecrement (&file->entryCount); + return CL5_SUCCESS; + } else { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5CurrentDeleteEntry failed, err=%d %s\n", + rc, db_strerror(rc)); + /* We don't free(close) the cursor here, as the caller will free it by a call to cl5DestroyIterator */ + /* Freeing it here is a potential bug, as the cursor can't be referenced later once freed */ + return CL5_DB_ERROR; + } +} + +static PRBool _cl5IsValidIterator (const CL5Iterator *iterator) +{ + return (iterator && iterator->cursor && iterator->file); +} + +static int _cl5GetOperation (Object *replica, slapi_operation_parameters *op) +{ + int rc; + DBT key={0}, data={0}; + CL5DBFile *file; + CL5Entry entry; + Object *obj = NULL; + char csnStr[CSN_STRSIZE]; + + rc = _cl5GetDBFile (replica, &obj); + if (rc != CL5_SUCCESS) + { + return rc; + } + + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + + /* construct the key */ + key.data = csn_as_string(op->csn, PR_FALSE, csnStr); + key.size = CSN_STRSIZE; + + data.flags = DB_DBT_MALLOC; + + rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0); + switch (rc) + { + case 0: entry.op = op; + /* Callers of this function should cl5_operation_parameters_done(op) */ + rc = cl5DBData2Entry (data.data, data.size, &entry); + if (rc == CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + "_cl5GetOperation: successfully retrieved operation with csn (%s)\n", + csnStr); + } + else + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5GetOperation: failed to convert db data to operation;" + " csn - %s\n", csnStr); + } + goto done; + + case DB_NOTFOUND: slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5GetOperation: operation for csn (%s) is not found in db that should contain dn (%s)\n", + csnStr, op->target_address.dn); + rc = CL5_NOTFOUND; + goto done; + + default: slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5GetOperation: failed to get entry for csn (%s); " + "db error - %d %s\n", csnStr, rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + goto done; + } + +done:; + if (obj) + object_release (obj); + + if (data.data) + free (data.data); + + return rc; +} + +PRBool cl5HelperEntry (const char *csnstr, CSN *csnp) +{ + CSN *csn; + time_t csnTime; + PRBool retval = PR_FALSE; + + if (csnp) + { + csn = csnp; + } + else + { + csn= csn_new_by_string(csnstr); + } + if (csn == NULL) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "cl5HelperEntry: failed to get csn time; csn error\n"); + return PR_FALSE; + } + csnTime= csn_get_time(csn); + + if (csnTime == ENTRY_COUNT_TIME || csnTime == PURGE_RUV_TIME) + { + retval = PR_TRUE; + } + + if (NULL == csnp) + csn_free(&csn); + return retval; +} + +/* Replay iteration helper functions */ +static PRBool _cl5ValidReplayIterator (const CL5ReplayIterator *iterator) +{ + if (iterator == NULL || + iterator->consumerRuv == NULL || iterator->supplierRuvObj == NULL || + iterator->fileObj == NULL) + return PR_FALSE; + + return PR_TRUE; +} + +/* Algorithm: ONREPL!!! + */ +struct replica_hash_entry +{ + ReplicaId rid; /* replica id */ + PRBool sendChanges; /* indicates whether changes should be sent for this replica */ +}; + + +static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consumerRuv, + Object *replica, Object *fileObj, CL5ReplayIterator **iterator) +{ + CLC_Buffer *clcache = NULL; + CL5DBFile *file; + int i; + CSN **csns = NULL; + CSN *startCSN = NULL; + char csnStr [CSN_STRSIZE]; + int rc = CL5_SUCCESS; + Object *supplierRuvObj = NULL; + RUV *supplierRuv = NULL; + ReplicaId supplierRID; + PRBool newReplica; + PRBool haveChanges = PR_FALSE; + char *agmt_name; + ReplicaId rid; + + PR_ASSERT (consumerRuv && replica && fileObj && iterator); + csnStr[0] = '\0'; + + file = (CL5DBFile*)object_get_data (fileObj); + supplierRID = replica_get_rid((Replica*)object_get_data(replica)); + + /* get supplier's RUV */ + supplierRuvObj = replica_get_ruv((Replica*)object_get_data(replica)); + PR_ASSERT (supplierRuvObj); + supplierRuv = (RUV*)object_get_data (supplierRuvObj); + PR_ASSERT (supplierRuv); + + agmt_name = get_thread_private_agmtname(); + slapi_log_error(SLAPI_LOG_REPL, NULL, "_cl5PositionCursorForReplay (%s): Consumer RUV:\n", agmt_name); + ruv_dump (consumerRuv, agmt_name, NULL); + slapi_log_error(SLAPI_LOG_REPL, NULL, "_cl5PositionCursorForReplay (%s): Supplier RUV:\n", agmt_name); + ruv_dump (supplierRuv, agmt_name, NULL); + + /* + * get the sorted list of SupplierMinCSN (if no ConsumerMaxCSN) + * and ConsumerMaxCSN for those RIDs where consumer is not + * up-to-date. + */ + csns = cl5BuildCSNList (consumerRuv, supplierRuv); + if (csns == NULL) + { + rc = CL5_NOTFOUND; + goto done; + } + + /* iterate over elements of consumer's (and/or supplier's) ruv */ + for (i = 0; csns[i]; i++) + { + CSN *consumerMaxCSN = NULL; + + rid = csn_get_replicaid(csns[i]); + + /* + * Skip CSN that is originated from the consumer. + * If RID==65535, the CSN is originated from a + * legacy consumer. In this case the supplier + * and the consumer may have the same RID. + */ + if (rid == consumerRID && rid != MAX_REPLICA_ID) + continue; + + startCSN = csns[i]; + csn_as_string(startCSN, PR_FALSE, csnStr); + + rc = clcache_get_buffer ( &clcache, file->db, consumerRID, consumerRuv, supplierRuv ); + if ( rc != 0 ) goto done; + + /* This is the first loading of this iteration. For replicas + * already known to the consumer, we exclude the last entry + * sent to the consumer by using DB_NEXT. However, for + * replicas new to the consumer, we include the first change + * ever generated by that replica. + */ + newReplica = ruv_get_largest_csn_for_replica (consumerRuv, rid, &consumerMaxCSN); + csn_free(&consumerMaxCSN); + rc = clcache_load_buffer (clcache, startCSN, (newReplica ? DB_SET : DB_NEXT)); + + /* there is a special case which can occur just after migration - in this case, + the consumer RUV will contain the last state of the supplier before migration, + but the supplier will have an empty changelog, or the supplier changelog will + not contain any entries within the consumer min and max CSN - also, since + the purge RUV contains no CSNs, the changelog has never been purged + ASSUMPTIONS - it is assumed that the supplier had no pending changes to send + to any consumers; that is, we can assume that no changes were lost due to + either changelog purging or database reload - bug# 603061 - richm@netscape.com + */ + if (rc == 0 || (rc == DB_NOTFOUND && !ruv_has_csns(file->purgeRUV))) + { + haveChanges = PR_TRUE; + rc = CL5_SUCCESS; + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "%s: CSN %s found, position set for replay\n", agmt_name, csnStr); + break; + } + else if (rc == DB_NOTFOUND) /* entry not found */ + { + /* check whether this csn should be present */ + rc = _cl5CheckMissingCSN (startCSN, supplierRuv, file); + if (rc == CL5_MISSING_DATA) /* we should have had the change but we don't */ + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "%s: CSN %s not found, seems to be missing\n", agmt_name, csnStr); + break; + } + else /* we are not as up to date or we purged */ + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "%s: CSN %s not found, we aren't as up to date, or we purged\n", + agmt_name, csnStr); + continue; + } + } + else + { + + /* db error */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "%s: Failed to retrieve change with CSN %s; db error - %d %s\n", + agmt_name, csnStr, rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + break; + } + + } /* end for */ + + /* setup the iterator */ + if (haveChanges) + { + *iterator = (CL5ReplayIterator*) slapi_ch_calloc (1, sizeof (CL5ReplayIterator)); + + if (*iterator == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "%s: _cl5PositionCursorForReplay: failed to allocate iterator\n", agmt_name); + rc = CL5_MEMORY_ERROR; + goto done; + } + + /* ONREPL - should we make a copy of both RUVs here ?*/ + (*iterator)->fileObj = fileObj; + (*iterator)->clcache = clcache; clcache = NULL; + (*iterator)->consumerRID = consumerRID; + (*iterator)->consumerRuv = consumerRuv; + (*iterator)->supplierRuvObj = supplierRuvObj; + } + else if (rc == CL5_SUCCESS) + { + /* we have no changes to send */ + rc = CL5_NOTFOUND; + } + +done: + if ( clcache ) + clcache_return_buffer ( &clcache ); + + if (csns) + cl5DestroyCSNList (&csns); + + if (rc != CL5_SUCCESS) + { + if (supplierRuvObj) + object_release (supplierRuvObj); + } + + return rc; +} + +struct ruv_it +{ + CSN **csns; /* csn list */ + int alloc; /* allocated size */ + int pos; /* position in the list */ +}; + +static int ruv_consumer_iterator (const ruv_enum_data *enum_data, void *arg) +{ + struct ruv_it *data = (struct ruv_it*)arg; + + PR_ASSERT (data); + + /* check if we have space for one more element */ + if (data->pos >= data->alloc - 2) + { + data->alloc += 4; + data->csns = (CSN**) slapi_ch_realloc ((void*)data->csns, data->alloc * sizeof (CSN*)); + } + + data->csns [data->pos] = csn_dup (enum_data->csn); + data->pos ++; + + return 0; +} + + +static int ruv_supplier_iterator (const ruv_enum_data *enum_data, void *arg) +{ + int i; + PRBool found = PR_FALSE; + ReplicaId rid; + struct ruv_it *data = (struct ruv_it*)arg; + + PR_ASSERT (data); + + rid = csn_get_replicaid (enum_data->min_csn); + /* check if the replica that generated the csn is already in the list */ + for (i = 0; i < data->pos; i++) + { + if (rid == csn_get_replicaid (data->csns[i])) + { + found = PR_TRUE; + + /* remove datacsn[i] if it is greater or equal to the supplier's maxcsn */ + if ( csn_compare ( data->csns[i], enum_data->csn ) >= 0 ) + { + int j; + + csn_free ( & data->csns[i] ); + for (j = i+1; j < data->pos; j++) + { + data->csns [j-1] = data->csns [j]; + } + data->pos --; + } + break; + } + } + + if (!found) + { + /* check if we have space for one more element */ + if (data->pos >= data->alloc - 2) + { + data->alloc += 4; + data->csns = (CSN**)slapi_ch_realloc ((void*)data->csns, + data->alloc * sizeof (CSN*)); + } + + data->csns [data->pos] = csn_dup (enum_data->min_csn); + data->pos ++; + } + return 0; +} + + + +static int +my_csn_compare(const void *arg1, const void *arg2) +{ + return(csn_compare(*((CSN **)arg1), *((CSN **)arg2))); +} + + + +/* builds CSN ordered list of all csns in the RUV */ +CSN** cl5BuildCSNList (const RUV *consRuv, const RUV *supRuv) +{ + struct ruv_it data; + int count, rc; + CSN **csns; + + PR_ASSERT (consRuv); + + count = ruv_replica_count (consRuv); + csns = (CSN**)slapi_ch_calloc (count + 1, sizeof (CSN*)); + + data.csns = csns; + data.alloc = count + 1; + data.pos = 0; + + /* add consumer elements to the list */ + rc = ruv_enumerate_elements (consRuv, ruv_consumer_iterator, &data); + if (rc == 0 && supRuv) + { + /* add supplier elements to the list */ + rc = ruv_enumerate_elements (supRuv, ruv_supplier_iterator, &data); + } + + /* we have no csns */ + if (data.csns[0] == NULL) + { + /* csns might have been realloced in ruv_supplier_iterator() */ + slapi_ch_free ((void**)&data.csns); + csns = NULL; + } + else + { + csns = data.csns; + data.csns [data.pos] = NULL; + if (rc == 0) + { + qsort (csns, data.pos, sizeof (CSN*), my_csn_compare); + } + else + { + cl5DestroyCSNList (&csns); + } + } + + return csns; +} + +void cl5DestroyCSNList (CSN*** csns) +{ + if (csns && *csns) + { + int i; + + for (i = 0; (*csns)[i]; i++) + { + csn_free (&(*csns)[i]); + } + + slapi_ch_free ((void**)csns); + } +} + +/* A csn should be in the changelog if it is larger than purge vector csn for the same + replica and is smaller than the csn in supplier's ruv for the same replica. + The functions returns + CL5_PURGED if data was purged from the changelog or was never logged + because it was loaded as part of replica initialization + CL5_MISSING if the data erouneously missing + CL5_SUCCESS if that has not and should not been seen by the server + */ +static int _cl5CheckMissingCSN (const CSN *csn, const RUV *supplierRuv, CL5DBFile *file) +{ + ReplicaId rid; + CSN *supplierCsn = NULL; + CSN *purgeCsn = NULL; + int rc = CL5_SUCCESS; + char csnStr [CSN_STRSIZE]; + + PR_ASSERT (csn && supplierRuv && file); + + rid = csn_get_replicaid (csn); + ruv_get_largest_csn_for_replica (supplierRuv, rid, &supplierCsn); + if (supplierCsn == NULL) + { + /* we have not seen any changes from this replica so it is + ok not to have this csn */ + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN: " + "can't locate %s csn: we have not seen any changes for replica %d\n", + csn_as_string (csn, PR_FALSE, csnStr), rid); + return CL5_SUCCESS; + } + + ruv_get_largest_csn_for_replica (file->purgeRUV, rid, &purgeCsn); + if (purgeCsn == NULL) + { + /* changelog never contained any changes for this replica */ + if (csn_compare (csn, supplierCsn) <= 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN: " + "the change with %s csn was never logged because it was imported " + "during replica initialization\n", csn_as_string (csn, PR_FALSE, csnStr)); + rc = CL5_PURGED_DATA; /* XXXggood is that the correct return value? */ + } + else + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN: " + "change with %s csn has not yet been seen by this server; " + " last csn seen from that replica is %s\n", + csn_as_string (csn, PR_FALSE, csnStr), + csn_as_string (supplierCsn, PR_FALSE, csnStr)); + rc = CL5_SUCCESS; + } + } + else /* we have both purge and supplier csn */ + { + if (csn_compare (csn, purgeCsn) < 0) /* the csn is below the purge point */ + { + rc = CL5_PURGED_DATA; + } + else + { + if (csn_compare (csn, supplierCsn) <= 0) /* we should have the data but we don't */ + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN: " + "change with %s csn has been purged by this server; " + "the current purge point for that replica is %s\n", + csn_as_string (csn, PR_FALSE, csnStr), + csn_as_string (purgeCsn, PR_FALSE, csnStr)); + rc = CL5_MISSING_DATA; + } + else + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN: " + "change with %s csn has not yet been seen by this server; " + " last csn seen from that replica is %s\n", + csn_as_string (csn, PR_FALSE, csnStr), + csn_as_string (supplierCsn, PR_FALSE, csnStr)); + rc = CL5_SUCCESS; + } + } + } + + if (supplierCsn) + csn_free (&supplierCsn); + + if (purgeCsn) + csn_free (&purgeCsn); + + return rc; +} + +/* Helper functions that work with individual changelog files */ + +/* file name format : <replica name>_<replica generation>db{2,3} */ +static PRBool _cl5FileName2Replica (const char *file_name, Object **replica) +{ + Replica *r; + char *repl_name, *file_gen, *repl_gen; + int len; + + PR_ASSERT (file_name && replica); + + *replica = NULL; + + /* this is database file */ + if (_cl5FileEndsWith (file_name, DB_EXTENSION) || + _cl5FileEndsWith (file_name, DB_EXTENSION_DB3) ) + { + repl_name = slapi_ch_strdup (file_name); + file_gen = strstr(repl_name, FILE_SEP); + if (file_gen) + { + int extlen = strlen(DB_EXTENSION); + *file_gen = '\0'; + file_gen += strlen (FILE_SEP); + len = strlen (file_gen); + if (len <= extlen + 1) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5FileName2Replica " + "invalid file name (%s)\n", file_name); + } + else + { + /* get rid of the file extension */ + file_gen [len - extlen - 1] = '\0'; + *replica = replica_get_by_name (repl_name); + if (*replica) + { + /* check that generation matches the one in replica object */ + r = (Replica*)object_get_data (*replica); + repl_gen = replica_get_generation (r); + PR_ASSERT (repl_gen); + if (strcmp (file_gen, repl_gen) != 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5FileName2Replica " + "replica generation mismatch for replica at (%s), " + "file generation %s, new replica generation %s\n", + slapi_sdn_get_dn (replica_get_root (r)), file_gen, repl_gen); + + object_release (*replica); + *replica = NULL; + } + slapi_ch_free ((void**)&repl_gen); + } + } + slapi_ch_free ((void**)&repl_name); + } + else + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5FileName2Replica " + "malformed file name - %s\n", file_name); + } + + return PR_TRUE; + } + else + return PR_FALSE; +} + +/* file name format : <replica name>_<replica generation>db{2,3} */ +static char* _cl5Replica2FileName (Object *replica) +{ + const char *replName; + char *replGen, *fileName; + Replica *r; + + PR_ASSERT (replica); + + r = (Replica*)object_get_data (replica); + PR_ASSERT (r); + + replName = replica_get_name (r); + replGen = replica_get_generation (r); + + fileName = _cl5MakeFileName (replName, replGen) ; + + slapi_ch_free ((void**)&replGen); + + return fileName; +} + +static char* _cl5MakeFileName (const char *replName, const char *replGen) +{ + char *fileName; + fileName = slapi_ch_malloc (strlen (replName) + strlen (replGen) + + strlen (DB_EXTENSION) + 3/* '_' + '.' + '\0' */); + sprintf (fileName, "%s%s%s.%s", replName, FILE_SEP, replGen, DB_EXTENSION); + + return fileName; +} + +/* open file that corresponds to a particular database */ +static int _cl5DBOpenFile (Object *replica, Object **obj, PRBool checkDups) +{ + int rc; + const char *replName; + char *replGen; + Replica *r; + + PR_ASSERT (replica); + + r = (Replica*)object_get_data (replica); + replName = replica_get_name (r); + PR_ASSERT (replName); + replGen = replica_get_generation (r); + PR_ASSERT (replGen); + + rc = _cl5DBOpenFileByReplicaName (replName, replGen, obj, checkDups); + + slapi_ch_free ((void**)&replGen); + + return rc; +} + +static int _cl5DBOpenFileByReplicaName (const char *replName, const char *replGen, + Object **obj, PRBool checkDups) +{ + int rc = CL5_SUCCESS; + Object *tmpObj; + CL5DBFile *file; + char *file_name; + + PR_ASSERT (replName && replGen); + + if (checkDups) + { + PR_Lock (s_cl5Desc.fileLock); + file_name = _cl5MakeFileName (replName, replGen); + tmpObj = objset_find (s_cl5Desc.dbFiles, _cl5CompareDBFile, file_name); + slapi_ch_free((void **)&file_name); + file_name = NULL; + if (tmpObj) /* this file already exist */ + { + /* if we were asked for file handle - keep the handle */ + if (obj) + { + *obj = tmpObj; + } + else + { + object_release (tmpObj); + } + + rc = CL5_SUCCESS; + goto done; + } + } + + rc = _cl5NewDBFile (replName, replGen, &file); + if (rc == CL5_SUCCESS) + { + /* This creates the file but doesn't set the init flag + * The flag is set later when the purge and max ruvs are set. + * This is to prevent some thread to get file access before the + * structure is fully initialized */ + rc = _cl5AddDBFile (file, &tmpObj); + if (rc == CL5_SUCCESS) + { + /* read purge RUV - done here because it needs file object rather than file pointer */ + rc = _cl5ReadRUV (replGen, tmpObj, PR_TRUE); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5DBOpenFileByReplicaName: failed to get purge RUV\n"); + goto done; + } + + /* read ruv that represents the upper bound of the changes stored in the file */ + rc = _cl5ReadRUV (replGen, tmpObj, PR_FALSE); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5DBOpenFileByReplicaName: failed to get upper bound RUV\n"); + goto done; + } + + /* Mark the DB File initialize */ + _cl5DBFileInitialized(tmpObj); + + if (obj) + { + *obj = tmpObj; + } + else + { + object_release (tmpObj); + } + } + } + +done:; + if (rc != CL5_SUCCESS) + { + if (file) + _cl5DBCloseFile ((void**)&file); + } + + if (checkDups) + { + PR_Unlock (s_cl5Desc.fileLock); + } + + return rc; +} + +/* adds file to the db file list */ +static int _cl5AddDBFile (CL5DBFile *file, Object **obj) +{ + int rc; + Object *tmpObj; + + PR_ASSERT (file); + + tmpObj = object_new (file, _cl5DBCloseFile); + rc = objset_add_obj(s_cl5Desc.dbFiles, tmpObj); + if (rc != OBJSET_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5AddDBFile: failed to add db file to the list; " + "repl_objset error - %d\n", rc); + object_release (tmpObj); + return CL5_OBJSET_ERROR; + } + + if (obj) + { + *obj = tmpObj; + } + else + object_release (tmpObj); + + return CL5_SUCCESS; +} + +static int _cl5NewDBFile (const char *replName, const char *replGen, CL5DBFile** dbFile) +{ + int rc; + DB *db = NULL; + char *name; + char *semadir; +#ifdef HPUX + char cwd [PATH_MAX+1]; +#endif + + PR_ASSERT (replName && replGen && dbFile); + + (*dbFile) = (CL5DBFile *)slapi_ch_calloc (1, sizeof (CL5DBFile)); + if (*dbFile == NULL) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5NewDBFile: memory allocation failed\n"); + return CL5_MEMORY_ERROR; + } + + name = _cl5MakeFileName (replName, replGen); + { + /* The subname argument allows applications to have + * subdatabases, i.e., multiple databases inside of a single + * physical file. This is useful when the logical databases + * are both numerous and reasonably small, in order to + * avoid creating a large number of underlying files. + */ + char *subname = NULL; + DB_ENV *dbEnv = s_cl5Desc.dbEnv; + + rc = db_create(&db, dbEnv, 0); + if (0 != rc) { + goto out; + } + + rc = db->set_pagesize( + db, + s_cl5Desc.dbConfig.pageSize); + + if (0 != rc) { + goto out; + } + +#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 3300 + rc = db->set_malloc(db, malloc); + if (0 != rc) { + goto out; + } +#endif + + DB_OPEN(s_cl5Desc.dbEnvOpenFlags, + db, NULL /* txnid */, name, subname, DB_BTREE, + DB_CREATE | DB_THREAD, s_cl5Desc.dbConfig.fileMode, rc); + } +out: + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5NewDBFile: db_open failed; db error - %d %s\n", + rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + goto done; + } + + (*dbFile)->db = db; + (*dbFile)->name = name; + (*dbFile)->replName = slapi_ch_strdup (replName); + (*dbFile)->replGen = slapi_ch_strdup (replGen); + + /* + * Considerations for setting up cl semaphore: + * (1) The NT version of SleepyCat uses test-and-set mutexes + * at the DB page level instead of blocking mutexes. That has + * proven to be a killer for the changelog DB, as this DB is + * accessed by multiple a reader threads (the repl thread) and + * writer threads (the server ops threads) usually at the last + * pages of the DB, due to the sequential nature of the changelog + * keys. To avoid the test-and-set mutexes, we could use semaphore + * to serialize the writers and avoid the high mutex contention + * that SleepyCat is unable to avoid. + * (2) [610948] Linux master hangs for 2 hours + * [611239] _cl5DeadlockMain: lock_detect succeeded + * (3) DS 6.2 introduced the semaphore on all platforms (replaced + * the serial lock used on Windows and Linux described above). + * The number of the concurrent writes now is configurable by + * nsslapd-changelogmaxconcurrentwrites (the server needs to + * be restarted). + */ + + semadir = s_cl5Desc.dbDir; +#ifdef HPUX + /* + * HP sem_open() does not allow pathname component "./" or "../" + * in the semaphore name. For simplicity and to avoid doing + * chdir() in multi-thread environment, current working dir + * (log dir) is used to replace the original semaphore dir + * if it contains "./". + */ + if ( strstr ( semadir, "./" ) != NULL && getcwd ( cwd, PATH_MAX+1 ) != NULL ) + { + semadir = cwd; + } +#endif + + if ( semadir != NULL ) + { + (*dbFile)->semaName = slapi_ch_malloc (strlen(semadir) + strlen(replName) + strlen(".sema") + 10); + sprintf ((*dbFile)->semaName, "%s/%s.sema", semadir, replName); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5NewDBFile: semaphore %s\n", (*dbFile)->semaName); + (*dbFile)->sema = PR_OpenSemaphore((*dbFile)->semaName, PR_SEM_CREATE, 0666, s_cl5Desc.dbConfig.maxConcurrentWrites ); + slapi_log_error (SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5NewDBFile: maxConcurrentWrites=%d\n", s_cl5Desc.dbConfig.maxConcurrentWrites ); + } + + if ((*dbFile)->sema == NULL ) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5NewDBFile: failed to create semaphore %s; NSPR error - %d\n", + (*dbFile)->semaName ? (*dbFile)->semaName : "(nil)", PR_GetError ()); + rc = CL5_SYSTEM_ERROR; + goto done; + } + + /* compute number of entries in the file */ + /* ONREPL - to improve performance, we keep entry count in memory + and write it down during shutdown. Problem: this will not + work with multiple processes. Do we have to worry about that? + */ + if (s_cl5Desc.dbOpenMode == CL5_OPEN_NORMAL) + { + rc = _cl5GetEntryCount (*dbFile); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5NewDBFile: failed to get entry count\n"); + goto done; + } + } + +done: + if (rc != CL5_SUCCESS) + { + if (dbFile) + _cl5DBCloseFile ((void**)dbFile); + /* slapi_ch_free accepts NULL pointer */ + slapi_ch_free ((void**)&name); + + slapi_ch_free ((void**)dbFile); + } + + return rc; +} + +static void _cl5DBCloseFile (void **data) +{ + CL5DBFile *file; + char fullpathname[MAXPATHLEN]; + + PR_ASSERT (data); + + file = *(CL5DBFile**)data; + + /* close the file */ + /* if this is normal close or close after import, update entry count */ + if ((s_cl5Desc.dbOpenMode == CL5_OPEN_NORMAL && s_cl5Desc.dbState == CL5_STATE_CLOSING) || + s_cl5Desc.dbOpenMode == CL5_OPEN_LDIF2CL) + { + _cl5WriteEntryCount (file); + _cl5WriteRUV (file, PR_TRUE); + _cl5WriteRUV (file, PR_FALSE); + } + + /* close file */ + if (file->db) + file->db->close(file->db, 0); + + if (file->flags & DB_FILE_DELETED) + { + PR_snprintf(fullpathname, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, file->name); + if (PR_Delete(fullpathname) != PR_SUCCESS) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBCloseFile: " + "failed to remove (%s) file; NSPR error - %d\n", file->name, PR_GetError ()); + + } + } + + /* slapi_ch_free accepts NULL pointer */ + slapi_ch_free ((void**)&file->name); + slapi_ch_free ((void**)&file->replName); + slapi_ch_free ((void**)&file->replGen); + if (file->sema) { + PR_CloseSemaphore (file->sema); + PR_DeleteSemaphore (file->semaName); + file->sema = NULL; + } + slapi_ch_free ((void**)&file->semaName); + + slapi_ch_free (data); +} + +static int _cl5GetDBFile (Object *replica, Object **obj) +{ + char *fileName; + + PR_ASSERT (replica && obj); + + fileName = _cl5Replica2FileName (replica); + + *obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName); + slapi_ch_free ((void**)&fileName); + if (*obj) + { + return CL5_SUCCESS; + } + else + { + return CL5_NOTFOUND; + } +} + +static int _cl5GetDBFileByReplicaName (const char *replName, const char *replGen, + Object **obj) +{ + char *fileName; + + PR_ASSERT (replName && replGen && obj); + + fileName = _cl5MakeFileName (replName, replGen); + + *obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName); + slapi_ch_free ((void**)&fileName); + if (*obj) + { + return CL5_SUCCESS; + } + else + { + return CL5_NOTFOUND; + } +} + +static void _cl5DBDeleteFile (Object *obj) +{ + CL5DBFile *file; + + PR_ASSERT (obj); + + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + file->flags |= DB_FILE_DELETED; + objset_remove_obj(s_cl5Desc.dbFiles, obj); + object_release (obj); +} + +static void _cl5DBFileInitialized (Object *obj) +{ + CL5DBFile *file; + + PR_ASSERT (obj); + + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + file->flags |= DB_FILE_INIT; +} + +static int _cl5CompareDBFile (Object *el1, const void *el2) +{ + CL5DBFile *file; + const char *name; + + PR_ASSERT (el1 && el2); + + file = (CL5DBFile*) object_get_data (el1); + name = (const char*) el2; + return ((file->flags & DB_FILE_INIT) ? strcmp (file->name, name) : 1); +} + +static int _cl5CopyDBFiles (const char *srcDir, const char *destDir, Object **replicas) +{ + char srcFile [MAXPATHLEN + 1]; + char destFile[MAXPATHLEN + 1]; + int rc; + Object *obj; + CL5DBFile *file; + + /* ONREPL currently, dbidlist is ignored because db code can't handle discrepancy between + transaction log and present files; this should change before 5.0 ships */ + obj = objset_first_obj (s_cl5Desc.dbFiles); + while (obj) + { + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + + PR_snprintf(srcFile, MAXPATHLEN, "%s/%s", srcDir, file->name); + PR_snprintf(destFile, MAXPATHLEN, "%s/%s", destDir, file->name); + rc = copyfile(srcFile, destFile, 0, FILE_CREATE_MODE); + if (rc != 0) + { + object_release (obj); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5CopyDBFiles: failed to copy %s from %s to %s\n", + file, srcDir, destDir); + return CL5_SYSTEM_ERROR; + } + + obj = objset_next_obj (s_cl5Desc.dbFiles, obj); + } + + return CL5_SUCCESS; +} + +/* + * return 1: true (the "filename" ends with "ext") + * return 0: false + */ +static int _cl5FileEndsWith(const char *filename, const char *ext) +{ + char *p = NULL; + int flen = strlen(filename); + int elen = strlen(ext); + if (0 == flen || 0 == elen) + { + return 0; + } + p = strstr(filename, ext); + if (NULL == p) + { + return 0; + } + if (p - filename + elen == flen) + { + return 1; + } + return 0; +} + +static int _cl5ExportFile (PRFileDesc *prFile, Object *obj) +{ + int rc; + void *iterator = NULL; + slapi_operation_parameters op = {0}; + char *buff; + PRInt32 len, wlen; + CL5Entry entry; + CL5DBFile *file; + + PR_ASSERT (prFile && obj); + + file = (CL5DBFile*)object_get_data (obj); + PR_ASSERT (file); + + ruv_dump (file->purgeRUV, "clpurgeruv", prFile); + ruv_dump (file->maxRUV, "clmaxruv", prFile); + slapi_write_buffer (prFile, "\n", strlen("\n")); + + entry.op = &op; + rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL); + while (rc == CL5_SUCCESS) + { + rc = _cl5Operation2LDIF (&op, file->replGen, &buff, &len); + if (rc != CL5_SUCCESS) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5ExportLDIF: failed to convert operation to ldif\n"); + operation_parameters_done (&op); + break; + } + + wlen = slapi_write_buffer (prFile, buff, len); + slapi_ch_free((void **)&buff); + if (wlen < len) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5ExportLDIF: failed to write to ldif file\n"); + rc = CL5_SYSTEM_ERROR; + operation_parameters_done (&op); + break; + } + + cl5_operation_parameters_done (&op); + + rc = _cl5GetNextEntry (&entry, iterator); + } + + cl5_operation_parameters_done (&op); + + if (iterator) + cl5DestroyIterator (iterator); + + if (rc != CL5_NOTFOUND) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5ExportLDIF: failed to retrieve changelog entry\n"); + } + else + { + rc = CL5_SUCCESS; + } + + return rc; +} + +static PRBool _cl5ReplicaInList (Object *replica, Object **replicas) +{ + int i; + + PR_ASSERT (replica && replicas); + + /* ONREPL I think it should be sufficient to just compare replica pointers */ + for (i=0; replicas[i]; i++) + { + if (replica == replicas[i]) + return PR_TRUE; + } + + return PR_FALSE; +} + +static char* _cl5GetHelperEntryKey (int type, char *csnStr) +{ + CSN *csn= csn_new(); + char *rt; + + csn_set_time(csn, type); + csn_set_replicaid(csn, 0); + + rt = csn_as_string(csn, PR_FALSE, csnStr); + csn_free(&csn); + + return rt; +} + +static Object* _cl5GetReplica (const slapi_operation_parameters *op, const char* replGen) +{ + Slapi_DN *sdn; + Object *replObj; + Replica *replica; + char *newGen; + + PR_ASSERT (op && replGen); + + sdn = slapi_sdn_new_dn_byref(op->target_address.dn); + + replObj = replica_get_replica_from_dn (sdn); + if (replObj) + { + /* check to see if replica generation has not change */ + replica = (Replica*)object_get_data (replObj); + PR_ASSERT (replica); + newGen = replica_get_generation (replica); + PR_ASSERT (newGen); + if (strcmp (replGen, newGen) != 0) + { + object_release (replObj); + replObj = NULL; + } + + slapi_ch_free ((void**)&replGen); + } + + slapi_sdn_free (&sdn); + + return replObj; +} + +int +cl5_is_diskfull() +{ + int rc; + PR_Lock(cl5_diskfull_lock); + rc = cl5_diskfull_flag; + PR_Unlock(cl5_diskfull_lock); + return rc; +} + +static void +cl5_set_diskfull() +{ + PR_Lock(cl5_diskfull_lock); + cl5_diskfull_flag = 1; + PR_Unlock(cl5_diskfull_lock); +} + +static void +cl5_set_no_diskfull() +{ + PR_Lock(cl5_diskfull_lock); + cl5_diskfull_flag = 0; + PR_Unlock(cl5_diskfull_lock); +} + +int +cl5_diskspace_is_available() +{ + int rval = 1; + +#if defined( OS_solaris ) || defined( hpux ) + struct statvfs fsbuf; + if (statvfs(s_cl5Desc.dbDir, &fsbuf) < 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5_diskspace_is_available: Cannot get file system info\n"); + rval = 0; + } + else + { + unsigned long fsiz = fsbuf.f_bavail * fsbuf.f_frsize; + if (fsiz < NO_DISK_SPACE) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5_diskspace_is_available: No enough diskspace for changelog: (%u bytes free)\n", fsiz); + rval = 0; + } + else if (fsiz > MIN_DISK_SPACE) + { + /* assume recovered */ + cl5_set_no_diskfull(); + } + } +#endif +#if defined( linux ) + struct statfs fsbuf; + if (statfs(s_cl5Desc.dbDir, &fsbuf) < 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5_diskspace_is_available: Cannot get file system info\n"); + rval = 0; + } + else + { + unsigned long fsiz = fsbuf.f_bavail * fsbuf.f_bsize; + if (fsiz < NO_DISK_SPACE) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "cl5_diskspace_is_available: No enough diskspace for changelog: (%u bytes free)\n", fsiz); + rval = 0; + } + else if (fsiz > MIN_DISK_SPACE) + { + /* assume recovered */ + cl5_set_no_diskfull(); + } + } +#endif + return rval; +} |