summaryrefslogtreecommitdiffstats
path: root/ldap/servers/plugins/replication/cl5_api.c
diff options
context:
space:
mode:
authorcvsadm <cvsadm>2005-01-21 00:44:34 +0000
committercvsadm <cvsadm>2005-01-21 00:44:34 +0000
commitb2093e3016027d6b5cf06b3f91f30769bfc099e2 (patch)
treecf58939393a9032182c4fbc4441164a9456e82f8 /ldap/servers/plugins/replication/cl5_api.c
downloadds-ldapserver7x.tar.gz
ds-ldapserver7x.tar.xz
ds-ldapserver7x.zip
Moving NSCP Directory Server from DirectoryBranch to TRUNK, initial drop. (foxworth)ldapserver7x
Diffstat (limited to 'ldap/servers/plugins/replication/cl5_api.c')
-rw-r--r--ldap/servers/plugins/replication/cl5_api.c6512
1 files changed, 6512 insertions, 0 deletions
diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
new file mode 100644
index 00000000..792d3646
--- /dev/null
+++ b/ldap/servers/plugins/replication/cl5_api.c
@@ -0,0 +1,6512 @@
+/** BEGIN COPYRIGHT BLOCK
+ * Copyright 2001 Sun Microsystems, Inc.
+ * Portions copyright 1999, 2001-2003 Netscape Communications Corporation.
+ * All rights reserved.
+ * END COPYRIGHT BLOCK **/
+
+/* cl5_api.c - implementation of 5.0 style changelog API */
+
+#include <errno.h>
+#include <sys/stat.h>
+#if defined( OS_solaris ) || defined( hpux )
+#include <sys/types.h>
+#include <sys/statvfs.h>
+#endif
+#if defined( linux )
+#include <sys/vfs.h>
+#endif
+
+
+#include "cl5_api.h"
+#include "plhash.h"
+
+#include "db.h"
+#include "cl5_clcache.h" /* To use the Changelog Cache */
+#include "repl5.h" /* for agmt_get_consumer_rid() */
+
+#define CL5_TYPE "Changelog5" /* changelog type */
+#define VERSION_SIZE 127 /* size of the buffer to hold changelog version */
+#define GUARDIAN_FILE "guardian" /* name of the guardian file */
+#define VERSION_FILE "DBVERSION" /* name of the version file */
+#define MAX_TRIALS 50 /* number of retries on db operations */
+#define V_5 5 /* changelog entry version */
+#define CHUNK_SIZE 64*1024
+#define DBID_SIZE 64
+#define FILE_SEP "_" /* separates parts of the db file name */
+
+#define T_CSNSTR "csn"
+#define T_UNIQUEIDSTR "nsuniqueid"
+#define T_PARENTIDSTR "parentuniqueid"
+#define T_NEWSUPERIORDNSTR "newsuperiordn"
+#define T_NEWSUPERIORIDSTR "newsuperioruniqueid"
+#define T_REPLGEN "replgen"
+
+#define ENTRY_COUNT_TIME 111 /* this time is used to construct csn
+ used to store/retrieve entry count */
+#define PURGE_RUV_TIME 222 /* this time is used to construct csn
+ used to store purge RUV vector */
+#define MAX_RUV_TIME 333 /* this time is used to construct csn
+ used to store upper boundary RUV vector */
+
+#define DB_EXTENSION_DB3 "db3"
+#define DB_EXTENSION "db4"
+
+#define HASH_BACKETS_COUNT 16 /* number of buckets in a hash table */
+
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 4100
+#define DEFAULT_DB_OP_FLAGS DB_AUTO_COMMIT
+#define DB_OPEN(oflags, db, txnid, file, database, type, flags, mode, rval) \
+{ \
+ if (((oflags) & DB_INIT_TXN) && ((oflags) & DB_INIT_LOG)) \
+ { \
+ (rval) = (db)->open((db), (txnid), (file), (database), (type), (flags)|DB_AUTO_COMMIT, (mode)); \
+ } \
+ else \
+ { \
+ (rval) = (db)->open((db), (txnid), (file), (database), (type), (flags), (mode)); \
+ } \
+}
+#else /* older then db 41 */
+#define DEFAULT_DB_OP_FLAGS 0
+#define DB_OPEN(oflags, db, txnid, file, database, type, flags, mode, rval) \
+ (rval) = (db)->open((db), (file), (database), (type), (flags), (mode))
+#endif
+
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 4000
+#define DB_ENV_SET_REGION_INIT(env) (env)->set_flags((env), DB_REGION_INIT, 1)
+#define DB_ENV_SET_TAS_SPINS(env, tas_spins) \
+ (env)->set_tas_spins((env), (tas_spins))
+#define TXN_BEGIN(env, parent_txn, tid, flags) \
+ (env)->txn_begin((env), (parent_txn), (tid), (flags))
+#define TXN_COMMIT(txn, flags) (txn)->commit((txn), (flags))
+#define TXN_ABORT(txn) (txn)->abort(txn)
+#define TXN_CHECKPOINT(env, kbyte, min, flags) \
+ (env)->txn_checkpoint((env), (kbyte), (min), (flags))
+#define MEMP_STAT(env, gsp, fsp, flags, malloc) \
+ (env)->memp_stat((env), (gsp), (fsp), (flags))
+#define MEMP_TRICKLE(env, pct, nwrotep) \
+ (env)->memp_trickle((env), (pct), (nwrotep))
+#define LOG_ARCHIVE(env, listp, flags, malloc) \
+ (env)->log_archive((env), (listp), (flags))
+#define LOG_FLUSH(env, lsn) (env)->log_flush((env), (lsn))
+#define LOCK_DETECT(env, flags, atype, aborted) \
+ (env)->lock_detect((env), (flags), (atype), (aborted))
+
+#else /* older than db 4.0 */
+#define DB_ENV_SET_REGION_INIT(env) db_env_set_region_init(1)
+#define DB_ENV_SET_TAS_SPINS(env, tas_spins) \
+ db_env_set_tas_spins((tas_spins))
+#define TXN_BEGIN(env, parent_txn, tid, flags) \
+ txn_begin((env), (parent_txn), (tid), (flags))
+#define TXN_COMMIT(txn, flags) txn_commit((txn), (flags))
+#define TXN_ABORT(txn) txn_abort((txn))
+#define TXN_CHECKPOINT(env, kbyte, min, flags) \
+ txn_checkpoint((env), (kbyte), (min), (flags))
+#define MEMP_TRICKLE(env, pct, nwrotep) memp_trickle((env), (pct), (nwrotep))
+#define LOG_FLUSH(env, lsn) log_flush((env), (lsn))
+#define LOCK_DETECT(env, flags, atype, aborted) \
+ lock_detect((env), (flags), (atype), (aborted))
+
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 3300
+#define MEMP_STAT(env, gsp, fsp, flags, malloc) memp_stat((env), (gsp), (fsp))
+#define LOG_ARCHIVE(env, listp, flags, malloc) \
+ log_archive((env), (listp), (flags))
+
+#else /* older than db 3.3 */
+#define MEMP_STAT(env, gsp, fsp, flags, malloc) \
+ memp_stat((env), (gsp), (fsp), (malloc))
+#define LOG_ARCHIVE(env, listp, flags, malloc) \
+ log_archive((env), (listp), (flags), (malloc))
+#endif
+#endif
+/*
+ * The defult thread stacksize for nspr21 is 64k. For OSF, we require
+ * a larger stacksize as actual storage allocation is higher i.e
+ * pointers are allocated 8 bytes but lower 4 bytes are used.
+ * The value 0 means use the default stacksize.
+ */
+#if defined (OSF1) || defined (__LP64__) || defined (_LP64) /* 64-bit architectures need bigger stacks */
+#define DEFAULT_THREAD_STACKSIZE 131072L
+#else
+#define DEFAULT_THREAD_STACKSIZE 0
+#endif
+
+#ifdef _WIN32
+#define FILE_CREATE_MODE S_IREAD | S_IWRITE
+#define DIR_CREATE_MODE 0755
+#else /* _WIN32 */
+#define FILE_CREATE_MODE S_IRUSR | S_IWUSR
+#define DIR_CREATE_MODE 0755
+#endif
+
+#define NO_DISK_SPACE 1024
+#define MIN_DISK_SPACE 10485760 /* 10 MB */
+
+/***** Data Definitions *****/
+
+/* possible changelog open modes */
+typedef enum
+{
+ CL5_OPEN_NONE, /* nothing specified */
+ CL5_OPEN_NORMAL, /* open for normal read/write use */
+ CL5_OPEN_RESTORE_RECOVER, /* restore from archive and recover */
+ CL5_OPEN_RESTORE, /* restore, but no recovery */
+ CL5_OPEN_LDIF2CL, /* open as part of ldif2cl: no locking,
+ recovery, checkpointing */
+ CL5_OPEN_CLEAN_RECOVER /* remove env after recover open (upgrade) */
+} CL5OpenMode;
+
+#define DB_FILE_DELETED 0x1
+#define DB_FILE_INIT 0x2
+/* this structure represents one changelog file, Each changelog file contains
+ changes applied to a single backend. Files are named by the database id */
+typedef struct cl5dbfile
+{
+ char *name; /* file name (with the extension) */
+ char *replGen; /* replica generation of the data */
+ char *replName; /* replica name */
+ DB *db; /* db handle to the changelog file*/
+ int entryCount; /* number of entries in the file */
+ int flags; /* currently used to mark the file as deleted
+ * or as initialized */
+ RUV *purgeRUV; /* ruv to which the file has been purged */
+ RUV *maxRUV; /* ruv that marks the upper boundary of the data */
+ char *semaName; /* semaphore name */
+ PRSem *sema; /* semaphore for max concurrent cl writes */
+}CL5DBFile;
+
+/* structure that allows to iterate through entries to be sent to a consumer
+ that originated on a particular supplier. */
+struct cl5replayiterator
+{
+ Object *fileObj;
+ CLC_Buffer *clcache; /* changelog cache */
+ ReplicaId consumerRID; /* consumer's RID */
+ const RUV *consumerRuv; /* consumer's update vector */
+ Object *supplierRuvObj;/* supplier's update vector object */
+};
+
+typedef struct cl5iterator
+{
+ DBC *cursor; /* current position in the db file */
+ Object *file; /* handle to release db file object */
+}CL5Iterator;
+
+/* changelog trimming configuration */
+typedef struct cl5trim
+{
+ time_t maxAge; /* maximum entry age in seconds */
+ int maxEntries; /* maximum number of entries across all changelog files */
+ PRLock* lock; /* controls access to trimming configuration */
+} CL5Trim;
+
+/* this structure defines 5.0 changelog internals */
+typedef struct cl5desc
+{
+ char *dbDir; /* absolute path to changelog directory */
+ DB_ENV *dbEnv; /* db environment shared by all db files */
+ int dbEnvOpenFlags;/* openflag used for env->open */
+ Objset *dbFiles; /* ref counted set of changelog files (CL5DBFile) */
+ PRLock *fileLock; /* ensures that changelog file is not added twice */
+ CL5OpenMode dbOpenMode; /* how we open db */
+ CL5DBConfig dbConfig; /* database configuration params */
+ CL5Trim dbTrim; /* trimming parameters */
+ CL5State dbState; /* changelog current state */
+ PRRWLock *stLock; /* lock that controls access to the changelog state */
+ PRBool dbRmOnClose;/* indicates whether changelog should be removed when
+ it is closed */
+ PRBool fatalError; /* bad stuff happened like out of disk space; don't
+ write guardian file on close - UnUsed so far */
+ int threadCount;/* threads that globally access changelog like
+ deadlock detection, etc. */
+ PRLock *clLock; /* Lock associated to clVar, used to notify threads on close */
+ PRCondVar *clCvar; /* Condition Variable used to notify threads on close */
+} CL5Desc;
+
+typedef void (*VFP)(void *);
+
+int g_get_shutdown(); /* declared in proto-slap.h */
+
+/***** Global Variables *****/
+static CL5Desc s_cl5Desc;
+
+/***** Forward Declarations *****/
+
+/* changelog initialization and cleanup */
+static int _cl5Open (const char *dir, const CL5DBConfig *config, CL5OpenMode openMode);
+static int _cl5AppInit (PRBool *didRecovery);
+static int _cl5DBOpen ();
+static void _cl5SetDefaultDBConfig ();
+static void _cl5SetDBConfig (const CL5DBConfig *config);
+static void _cl5InitDBEnv(DB_ENV *dbEnv);
+static int _cl5CheckDBVersion ();
+static int _cl5ReadDBVersion (const char *dir, char *clVersion);
+static int _cl5WriteDBVersion ();
+static int _cl5CheckGuardian ();
+static int _cl5ReadGuardian (char *buff);
+static int _cl5WriteGuardian ();
+static int _cl5RemoveGuardian ();
+static void _cl5Close ();
+static int _cl5Delete (const char *dir, PRBool rmDir);
+static void _cl5DBClose ();
+
+/* thread management */
+static int _cl5DispatchDBThreads ();
+static int _cl5AddThread ();
+static void _cl5RemoveThread ();
+static int _cl5DeadlockMain (void *param);
+static int _cl5CheckpointMain (void *param);
+static int _cl5TrickleMain (void *param);
+
+/* functions that work with individual changelog files */
+static int _cl5NewDBFile (const char *replName, const char *replGen, CL5DBFile** dbFile);
+static int _cl5DBOpenFile (Object *replica, Object **obj, PRBool checkDups);
+static int _cl5DBOpenFileByReplicaName (const char *replName, const char *replGen,
+ Object **obj, PRBool checkDups);
+static void _cl5DBCloseFile (void **data);
+static void _cl5DBDeleteFile (Object *obj);
+static void _cl5DBFileInitialized (Object *obj);
+static int _cl5GetDBFile (Object *replica, Object **obj);
+static int _cl5GetDBFileByReplicaName (const char *replName, const char *replGen,
+ Object **obj);
+static int _cl5AddDBFile (CL5DBFile *file, Object **obj);
+static int _cl5CompareDBFile (Object *el1, const void *el2);
+static int _cl5CopyDBFiles (const char *srcDir, const char *distDir, Object **replicas);
+static char* _cl5Replica2FileName (Object *replica);
+static char* _cl5MakeFileName (const char *replName, const char *replGen);
+static PRBool _cl5FileName2Replica (const char *fileName, Object **replica);
+static int _cl5ExportFile (PRFileDesc *prFile, Object *obj);
+static PRBool _cl5ReplicaInList (Object *replica, Object **replicas);
+
+/* data storage and retrieval */
+static int _cl5Entry2DBData (const CL5Entry *entry, char **data, PRUint32 *len);
+static int _cl5WriteOperation(const char *replName, const char *replGen,
+ const slapi_operation_parameters *op, PRBool local);
+static int _cl5GetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid);
+static int _cl5GetNextEntry (CL5Entry *entry, void *iterator);
+static int _cl5CurrentDeleteEntry (void *iterator);
+static PRBool _cl5IsValidIterator (const CL5Iterator *iterator);
+static int _cl5GetOperation (Object *replica, slapi_operation_parameters *op);
+static const char* _cl5OperationType2Str (int type);
+static int _cl5Str2OperationType (const char *str);
+static void _cl5WriteString (const char *str, char **buff);
+static void _cl5ReadString (char **str, char **buff);
+static void _cl5WriteMods (LDAPMod **mods, char **buff);
+static void _cl5WriteMod (LDAPMod *mod, char **buff);
+static int _cl5ReadMods (LDAPMod ***mods, char **buff);
+static int _cl5ReadMod (Slapi_Mod *mod, char **buff);
+static int _cl5GetModsSize (LDAPMod **mods);
+static int _cl5GetModSize (LDAPMod *mod);
+static void _cl5ReadBerval (struct berval *bv, char** buff);
+static void _cl5WriteBerval (struct berval *bv, char** buff);
+static int _cl5ReadBervals (struct berval ***bv, char** buff, unsigned int size);
+static int _cl5WriteBervals (struct berval **bv, char** buff, unsigned int *size);
+
+/* replay iteration */
+static PRBool _cl5ValidReplayIterator (const CL5ReplayIterator *iterator);
+static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consumerRuv,
+ Object *replica, Object *fileObject, CL5ReplayIterator **iterator);
+static int _cl5CheckMissingCSN (const CSN *minCsn, const RUV *supplierRUV, CL5DBFile *file);
+
+/* changelog trimming */
+static int _cl5TrimInit ();
+static void _cl5TrimCleanup ();
+static int _cl5TrimMain (void *param);
+static void _cl5DoTrimming ();
+static void _cl5TrimFile (Object *obj, long *numToTrim);
+static PRBool _cl5CanTrim (time_t time, long *numToTrim);
+static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge);
+static int _cl5WriteRUV (CL5DBFile *file, PRBool purge);
+static int _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge);
+static int _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge);
+static int _cl5GetRUV2Purge2 (Object *fileObj, RUV **ruv);
+
+/* db error processing */
+static void _cl5DBLogPrint(const char* prefix, char *buffer);
+
+/* bakup/recovery, import/export */
+static PRBool _cl5IsLogFile (const char *name);
+static int _cl5Recover (int open_flags, DB_ENV *dbEnv);
+static int _cl5LDIF2Operation (char *ldifEntry, slapi_operation_parameters *op,
+ char **replGen);
+static int _cl5Operation2LDIF (const slapi_operation_parameters *op, const char *replGen,
+ char **ldifEntry, PRInt32 *lenLDIF);
+
+/* entry count */
+static int _cl5GetEntryCount (CL5DBFile *file);
+static int _cl5WriteEntryCount (CL5DBFile *file);
+
+/* misc */
+static char* _cl5GetHelperEntryKey (int type, char *csnStr);
+static Object* _cl5GetReplica (const slapi_operation_parameters *op, const char* replGen);
+static int _cl5FileEndsWith(const char *filename, const char *ext);
+
+/* Callback function for libdb to spit error info into our log */
+static void dblayer_log_print(const char* prefix, char *buffer)
+{
+ /* We ignore the prefix since we know who we are anyway */
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "libdb: %s\n", buffer);
+}
+
+static PRLock *cl5_diskfull_lock = NULL;
+static int cl5_diskfull_flag = 0;
+
+static void cl5_set_diskfull();
+static void cl5_set_no_diskfull();
+
+/***** Module APIs *****/
+
+/* Name: cl5Init
+ Description: initializes changelog module; must be called by a single thread
+ before any other changelog function.
+ Parameters: none
+ Return: CL5_SUCCESS if function is successful;
+ CL5_SYSTEM_ERROR error if NSPR call fails.
+ */
+int cl5Init ()
+{
+ s_cl5Desc.stLock = PR_NewRWLock(PR_RWLOCK_RANK_NONE, "state_lock");
+ if (s_cl5Desc.stLock == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Init: failed to create state lock; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+ if ((s_cl5Desc.clLock = PR_NewLock()) == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Init: failed to create on close lock; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+
+ }
+ if ((s_cl5Desc.clCvar = PR_NewCondVar(s_cl5Desc.clLock)) == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Init: failed to create on close cvar; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ if (( clcache_init (&s_cl5Desc.dbEnv) != 0 )) {
+ return CL5_SYSTEM_ERROR;
+ }
+
+ s_cl5Desc.dbState = CL5_STATE_CLOSED;
+ s_cl5Desc.fatalError = PR_FALSE;
+ s_cl5Desc.dbRmOnClose = PR_FALSE;
+ s_cl5Desc.threadCount = 0;
+
+ if (NULL == cl5_diskfull_lock)
+ {
+ cl5_diskfull_lock = PR_NewLock ();
+ }
+
+ return CL5_SUCCESS;
+}
+
+/* Name: cl5Cleanup
+ Description: performs cleanup of the changelog module; must be called by a single
+ thread; it closes changelog if it is still open.
+ Parameters: none
+ Return: none
+ */
+void cl5Cleanup ()
+{
+ /* close db if it is still open */
+ if (s_cl5Desc.dbState == CL5_STATE_OPEN)
+ {
+ cl5Close ();
+ }
+
+ if (s_cl5Desc.stLock)
+ PR_DestroyRWLock (s_cl5Desc.stLock);
+ s_cl5Desc.stLock = NULL;
+
+ if (cl5_diskfull_lock)
+ {
+ PR_DestroyLock (cl5_diskfull_lock);
+ cl5_diskfull_lock = NULL;
+ }
+
+ memset (&s_cl5Desc, 0, sizeof (s_cl5Desc));
+}
+
+/* Name: cl5Open
+ Description: opens changelog; must be called after changelog is
+ initialized using cl5Init. It is thread safe and the second
+ call is ignored.
+ Parameters: dir - changelog dir
+ config - db configuration parameters; currently not used
+ Return: CL5_SUCCESS if successfull;
+ CL5_BAD_DATA if invalid directory is passed;
+ CL5_BAD_STATE if changelog is not initialized;
+ CL5_BAD_DBVERSION if dbversion file is missing or has unexpected data
+ CL5_SYSTEM_ERROR if NSPR error occured (during db directory creation);
+ CL5_MEMORY_ERROR if memory allocation fails;
+ CL5_DB_ERROR if db initialization fails.
+ */
+int cl5Open (const char *dir, const CL5DBConfig *config)
+{
+ int rc;
+
+ if (dir == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5Open: null directory\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Open: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* prevent state from changing */
+ PR_RWLock_Wlock (s_cl5Desc.stLock);
+
+ /* already open - ignore */
+ if (s_cl5Desc.dbState == CL5_STATE_OPEN)
+ {
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "cl5Open: changelog already opened; request ignored\n");
+ rc = CL5_SUCCESS;
+ goto done;
+ }
+ else if (s_cl5Desc.dbState != CL5_STATE_CLOSED)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Open: invalid state - %d\n", s_cl5Desc.dbState);
+ rc = CL5_BAD_STATE;
+ goto done;
+ }
+
+ rc = _cl5Open (dir, config, CL5_OPEN_NORMAL);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Open: failed to open changelog\n");
+ goto done;
+ }
+
+ /* dispatch global threads like deadlock detection, trimming, etc */
+ rc = _cl5DispatchDBThreads ();
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Open: failed to start database monitoring threads\n");
+
+ _cl5Close ();
+ }
+ else
+ {
+ s_cl5Desc.dbState = CL5_STATE_OPEN;
+ }
+
+done:;
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+
+ return rc;
+}
+
+/* Name: cl5Close
+ Description: closes changelog; waits until all threads are done using changelog;
+ call is ignored if changelog is already closed.
+ Parameters: none
+ Return: CL5_SUCCESS if successful;
+ CL5_BAD_STATE if db is not in the open or closed state;
+ CL5_SYSTEM_ERROR if NSPR call fails;
+ CL5_DB_ERROR if db shutdown fails
+ */
+int cl5Close ()
+{
+ int rc = CL5_SUCCESS;
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5Close: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ PR_RWLock_Wlock (s_cl5Desc.stLock);
+
+ /* already closed - ignore */
+ if (s_cl5Desc.dbState == CL5_STATE_CLOSED)
+ {
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "cl5Close: changelog closed; request ignored\n");
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+ return CL5_SUCCESS;
+ }
+ else if (s_cl5Desc.dbState != CL5_STATE_OPEN)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5Close: invalid state - %d\n", s_cl5Desc.dbState);
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+ return CL5_BAD_STATE;
+ }
+
+ /* signal changelog closing to all threads */
+ s_cl5Desc.dbState = CL5_STATE_CLOSING;
+
+ PR_Lock(s_cl5Desc.clLock);
+ PR_NotifyCondVar(s_cl5Desc.clCvar);
+ PR_Unlock(s_cl5Desc.clLock);
+
+ _cl5Close ();
+
+ s_cl5Desc.dbState = CL5_STATE_CLOSED;
+
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+
+ return rc;
+}
+
+/* Name: cl5Delete
+ Description: removes changelog; changelog must be in the closed state.
+ Parameters: dir - changelog directory
+ Return: CL5_SUCCESS if successful;
+ CL5_BAD_STATE if the changelog is not in closed state;
+ CL5_BAD_DATA if invalid directory supplied
+ CL5_SYSTEM_ERROR if NSPR call fails
+ */
+int cl5Delete (const char *dir)
+{
+ int rc;
+
+ if (dir == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, "cl5Delete: null directory\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5Delete: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ PR_RWLock_Wlock (s_cl5Desc.stLock);
+
+ if (s_cl5Desc.dbState != CL5_STATE_CLOSED)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5Delete: invalid state - %d\n", s_cl5Desc.dbState);
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+ return CL5_BAD_STATE;
+ }
+
+ rc = _cl5Delete (dir, PR_TRUE /* remove changelog dir */);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5Delete: failed to remove changelog\n");
+ }
+
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+ return rc;
+}
+
+/* Name: cl5OpenDB
+ Description: opens changelog file for specified file
+ Parameters: replica - replica whose file we wish to open
+ Return: CL5_SUCCESS if successful;
+ CL5_BAD_STATE if the changelog is not initialized;
+ CL5_BAD_DATA - if NULL id is supplied
+ */
+int cl5OpenDB (Object *replica)
+{
+ int rc;
+
+ if (replica == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5OpenDB: null replica\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5OpenDB: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog stays open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ rc = _cl5DBOpenFile (replica, NULL /* file object */, PR_TRUE /* check for duplicates */);
+
+ _cl5RemoveThread ();
+
+ return rc;
+}
+
+/* Name: cl5CloseDB
+ Description: closes changelog file for the specified replica
+ Parameters: replica - replica whose file we wish to close
+ Return: CL5_SUCCESS if successful;
+ CL5_BAD_STATE if the changelog is not initialized;
+ CL5_BAD_DATA - if NULL id is supplied
+ CL5_NOTFOUND - nothing is known about specified database
+ */
+int cl5CloseDB (Object *replica)
+{
+ int rc;
+ Object *obj;
+
+ if (replica == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5CloseDB: null replica\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5CloseDB: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog is open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ rc = _cl5GetDBFile (replica, &obj);
+ if (rc == CL5_SUCCESS)
+ {
+ rc = objset_remove_obj(s_cl5Desc.dbFiles, obj);
+ object_release (obj);
+ }
+ else
+ {
+ Replica *r;
+
+ r = (Replica*)object_get_data (replica);
+ PR_ASSERT (r);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5CloseDB: failed to close file for replica at (%s)\n",
+ slapi_sdn_get_dn (replica_get_root (r)));
+ }
+
+ _cl5RemoveThread ();
+ return rc;
+}
+
+/* Name: cl5DeleteDB
+ Description: asynchronously removes changelog file for the specified replica.
+ The file is physically removed when it is no longer in use.
+ This function is called when a backend is removed or reloaded.
+ Parameters: replica - replica whose file we wish to delete
+ Return: CL5_SUCCESS if successful;
+ CL5_BAD_STATE if the changelog is not initialized;
+ CL5_BAD_DATA - if NULL id is supplied
+ CL5_NOTFOUND - nothing is known about specified database
+ */
+int cl5DeleteDB (Object *replica)
+{
+ Object *obj;
+ int rc;
+
+ if (replica == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5DeleteDB: invalid database id\n");
+ return CL5_BAD_DATA;
+ }
+
+ /* changelog is not initialized */
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDB: "
+ "changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog stays open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ rc = _cl5GetDBFile (replica, &obj);
+ if (rc == CL5_SUCCESS)
+ {
+ _cl5DBDeleteFile (obj);
+ }
+ else
+ {
+ Replica *r = (Replica*)object_get_data (replica);
+ PR_ASSERT (r);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDB: "
+ "file for replica at (%s) not found\n",
+ slapi_sdn_get_dn (replica_get_root (r)));
+ }
+
+ _cl5RemoveThread ();
+ return rc;
+}
+
+/* Name: cl5DeleteDBSync
+ Description: The same as cl5DeleteDB except the function does not return
+ until the file is removed.
+*/
+int cl5DeleteDBSync (Object *replica)
+{
+ Object *obj;
+ int rc;
+ CL5DBFile *file;
+ char fName [MAXPATHLEN + 1];
+
+ if (replica == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5DeleteDBSync: invalid database id\n");
+ return CL5_BAD_DATA;
+ }
+
+ /* changelog is not initialized */
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDBSync: "
+ "changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog stays open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ rc = _cl5GetDBFile (replica, &obj);
+ if (rc == CL5_SUCCESS)
+ {
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+
+ PR_snprintf (fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, file->name);
+
+ _cl5DBDeleteFile (obj);
+
+ /* wait until the file is gone */
+ while (PR_Access (fName, PR_ACCESS_EXISTS) == PR_SUCCESS)
+ {
+ DS_Sleep (PR_MillisecondsToInterval(100));
+ }
+
+ }
+ else
+ {
+ Replica *r = (Replica*)object_get_data (replica);
+ PR_ASSERT (r);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5DeleteDBSync: "
+ "file for replica at (%s) not found\n",
+ slapi_sdn_get_dn (replica_get_root (r)));
+ }
+
+ _cl5RemoveThread ();
+ return rc;
+}
+
+/* Name: cl5GetUpperBoundRUV
+ Description: retrieves vector for that represnts the upper bound of the changes for a replica.
+ Parameters: r - replica for which the purge vector is requested
+ ruv - contains a copy of the purge ruv if function is successful;
+ unchanged otherwise. It is responsobility pf the caller to free
+ the ruv when it is no longer is in use
+ Return: CL5_SUCCESS if function is successfull
+ CL5_BAD_STATE if the changelog is not initialized;
+ CL5_BAD_DATA - if NULL id is supplied
+ CL5_NOTFOUND, if changelog file for replica is not found
+ */
+int cl5GetUpperBoundRUV (Replica *r, RUV **ruv)
+{
+ int rc;
+ Object *r_obj, *file_obj;
+ CL5DBFile *file;
+
+ if (r == NULL || ruv == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5GetUpperBoundRUV: invalid parameters\n");
+ return CL5_BAD_DATA;
+ }
+
+ /* changelog is not initialized */
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetUpperBoundRUV: "
+ "changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog stays open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ /* create a temporary replica object because of the interface we have */
+ r_obj = object_new (r, NULL);
+
+ rc = _cl5GetDBFile (r_obj, &file_obj);
+ if (rc == CL5_SUCCESS)
+ {
+ file = (CL5DBFile*)object_get_data (file_obj);
+ PR_ASSERT (file && file->maxRUV);
+
+ *ruv = ruv_dup (file->maxRUV);
+
+ object_release (file_obj);
+ }
+
+ object_release (r_obj);
+
+ _cl5RemoveThread ();
+ return rc;
+}
+
+/* Name: cl5Backup
+ Description: makes a backup of the changelog including *.db2,
+ log files, and dbversion. Can be called with the changelog in either open or
+ closed state.
+ Parameters: bkDir - directory to which the data is backed up;
+ created if it does not exist
+ replicas - optional list of replicas whose changes should be backed up;
+ if the list is NULL, entire changelog is backed up.
+ Return: CL5_SUCCESS if function is successful;
+ CL5_BAD_DATA if invalid directory is passed;
+ CL5_BAD_STATE if changelog has not been initialized;
+ CL5_DB_ERROR if db call fails;
+ CL5_SYSTEM_ERROR if NSPR call or file copy failes.
+ */
+int cl5Backup (const char *bkDir, Object **replicas)
+{
+ int rc;
+ char **list = NULL;
+ char **logFile;
+ char srcFile [MAXPATHLEN + 1];
+ char destFile[MAXPATHLEN + 1];
+ DB_TXN *txn = NULL;
+
+ if (bkDir == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5Backup: null backup directory\n");
+ return CL5_BAD_DATA;
+ }
+
+ /* changelog must be initialized */
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Backup: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog stays open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ /* create backup directory if necessary */
+ rc = cl5CreateDirIfNeeded (bkDir);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Backup: failed to create backup directory\n");
+ goto done;
+ }
+
+ /* start transaction to tempararily prevent transaction log
+ from being trimmed
+ */
+ rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL /*pid*/, &txn, 0);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Backup: failed to begin transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ rc = CL5_DB_ERROR;
+ goto done;
+ }
+
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "cl5Backup: starting changelog backup from %s to %s ...\n", s_cl5Desc.dbDir, bkDir);
+
+ /* The following files are backed up: *.<dbext>, log files, dbversion file */
+
+ /* copy db file */
+ /* ONREPL currently, list of replicas is ignored because db code can't handle
+ discrepancy between transaction log and present files; should be fixed before 5.0 ships */
+ rc = _cl5CopyDBFiles (s_cl5Desc.dbDir, bkDir, replicas);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Backup : failed to copy database files from %s to %s\n", s_cl5Desc.dbDir, bkDir);
+ goto done;
+ }
+
+ /* copy db log files */
+ rc = LOG_ARCHIVE(s_cl5Desc.dbEnv, &list, DB_ARCH_LOG, malloc);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Backup: failed to get list of log files; db error - %d %s\n",
+ rc, db_strerror(rc));
+ rc = CL5_SYSTEM_ERROR;
+ goto done;
+ }
+
+ if (list)
+ {
+ logFile = list;
+ while (*logFile)
+ {
+ PR_snprintf(srcFile, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, *logFile);
+ PR_snprintf(destFile, MAXPATHLEN, "%s/%s", bkDir, *logFile);
+ rc = copyfile(srcFile, destFile, 0, FILE_CREATE_MODE);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Backup: failed to copy %s\n", *logFile);
+ rc = CL5_SYSTEM_ERROR;
+ goto done;
+ }
+
+ logFile ++;
+ }
+
+ free(list);
+ }
+
+ /* now, copy the version file */
+ PR_snprintf(srcFile, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, VERSION_FILE);
+ PR_snprintf(destFile, MAXPATHLEN, "%s/%s", bkDir, VERSION_FILE);
+ rc = copyfile(srcFile, destFile, 0, FILE_CREATE_MODE);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Backup: failed to copy %s\n", VERSION_FILE);
+ rc = CL5_SYSTEM_ERROR;
+ goto done;
+ }
+
+ rc = CL5_SUCCESS;
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "cl5Backup: changelog backup is finished \n");
+done:;
+ if (txn && TXN_ABORT (txn) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Backup: failed to abort transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ rc = CL5_DB_ERROR;
+ }
+
+ _cl5RemoveThread ();
+
+ return rc;
+}
+
+/* Name: cl5Restore
+ Description: restores changelog from the backed up copy. Changelog must be ibnitalized and closed.
+ Parameters: clDir - changelog dir
+ bkDir - directory that contains the backup
+ replicas - optional list of replicas whose changes should be recovered;
+ if the list is NULL, entire changelog is recovered.
+ Return: CL5_SUCCESS if function is successfull;
+ CL5_BAD_DATA if invalid parameter is passed;
+ CL5_BAD_STATE if changelog is open or not initialized;
+ CL5_DB_ERROR if db call fails;
+ CL5_SYSTEM_ERROR if NSPR call of file copy fails
+ */
+int cl5Restore (const char *clDir, const char *bkDir, Object **replicas)
+{
+ int rc;
+ char srcFile[MAXPATHLEN + 1];
+ char destFile[MAXPATHLEN + 1];
+ PRDir *prDir;
+ PRDirEntry *prDirEntry;
+ int seenLog = 0; /* Tells us if we restored any logfiles */
+
+ if (clDir == NULL || bkDir == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5Restore: null parameter\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Restore: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* prevent state change while recovery is in progress */
+ PR_RWLock_Wlock (s_cl5Desc.stLock);
+
+ if (s_cl5Desc.dbState != CL5_STATE_CLOSED)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Restore: changelog must be closed\n");
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+ return CL5_BAD_STATE;
+ }
+
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "cl5Restore: starting changelog recovery from %s to %s ...\n", bkDir, clDir);
+
+ /* delete current changelog content */
+ rc = _cl5Delete (clDir, PR_FALSE);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Restore: failed to remove changelog\n");
+ goto done;
+ }
+
+ /* We copy the files over from the staging area */
+ prDir = PR_OpenDir(bkDir);
+ if (prDir == NULL)
+ {
+ rc = CL5_SYSTEM_ERROR;
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Restore: unable to access backup directory %s; NSPR error - %d\n",
+ bkDir, PR_GetError ());
+ goto done;
+ }
+
+ while (NULL != (prDirEntry = PR_ReadDir(prDir, PR_SKIP_DOT | PR_SKIP_DOT_DOT)))
+ {
+ if (NULL == prDirEntry->name) /* NSPR doesn't behave like the docs say it should */
+ {
+ break;
+ }
+
+ /* Log files have names of the form "log.xxxxx". We detect these by looking for
+ the prefix "log." and the lack of the ".<dbext>" suffix */
+ seenLog |= _cl5IsLogFile(prDirEntry->name);
+
+ /* ONREPL currently, list of replicas is ignored because db code can't handle discrepancy
+ between transaction log and present files; this should change before 5.0 ships */
+ PR_snprintf(destFile, MAXPATHLEN, "%s/%s", clDir, prDirEntry->name);
+ PR_snprintf(srcFile, MAXPATHLEN, "%s/%s", bkDir, prDirEntry->name);
+ rc = copyfile(srcFile, destFile, 0, FILE_CREATE_MODE);
+ if (rc != 0)
+ {
+ rc = CL5_SYSTEM_ERROR;
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Restore: failed to copy %s\n", prDirEntry->name);
+ PR_CloseDir(prDir);
+ goto done;
+ }
+ }
+
+ PR_CloseDir(prDir);
+
+ /* now open and close changelog to create all necessary files */
+ if (seenLog)
+ rc = _cl5Open (clDir, NULL, CL5_OPEN_RESTORE_RECOVER);
+ else
+ rc = _cl5Open (clDir, NULL, CL5_OPEN_RESTORE);
+
+ if (rc == CL5_SUCCESS)
+ {
+ _cl5Close ();
+
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "cl5Restore: changelog recovery is finished \n");
+ }
+ else
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5Restore: failed open changelog after recovery\n");
+ }
+
+done:;
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+ return rc;
+}
+
+/* Name: cl5ExportLDIF
+ Description: dumps changelog to an LDIF file; changelog can be open or closed.
+ Parameters: clDir - changelog dir
+ ldifFile - full path to ldif file to write
+ replicas - optional list of replicas whose changes should be exported;
+ if the list is NULL, entire changelog is exported.
+ Return: CL5_SUCCESS if function is successfull;
+ CL5_BAD_DATA if invalid parameter is passed;
+ CL5_BAD_STATE if changelog is not initialized;
+ CL5_DB_ERROR if db api fails;
+ CL5_SYSTEM_ERROR if NSPR call fails;
+ CL5_MEMORY_ERROR if memory allocation fials.
+ */
+int cl5ExportLDIF (const char *ldifFile, Object **replicas)
+{
+ int i;
+ int rc;
+ PRFileDesc *prFile = NULL;
+ Object *obj;
+
+ if (ldifFile == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ExportLDIF: null ldif file name\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ExportLDIF: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog is open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ prFile = PR_Open (ldifFile, PR_WRONLY | PR_CREATE_FILE | PR_TRUNCATE, 0600);
+ if (prFile == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ExportLDIF: failed to open (%s) file; NSPR error - %d\n",
+ ldifFile, PR_GetError ());
+ rc = CL5_SYSTEM_ERROR;
+ goto done;
+ }
+
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "cl5ExportLDIF: starting changelog export to (%s) ...\n", ldifFile);
+
+ if (replicas) /* export only selected files */
+ {
+ for (i = 0; replicas[i]; i++)
+ {
+ rc = _cl5GetDBFile (replicas[i], &obj);
+ if (rc == CL5_SUCCESS)
+ {
+ rc = _cl5ExportFile (prFile, obj);
+ object_release (obj);
+ }
+ else
+ {
+ Replica *r = (Replica*)object_get_data (replicas[i]);
+
+ PR_ASSERT (r);
+
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5ExportLDIF: "
+ "failed to locate changelog file for replica at (%s)\n",
+ slapi_sdn_get_dn (replica_get_root (r)));
+ }
+ }
+ }
+ else /* export all files */
+ {
+ for (obj = objset_first_obj(s_cl5Desc.dbFiles); obj;
+ obj = objset_next_obj(s_cl5Desc.dbFiles, obj))
+ {
+ rc = _cl5ExportFile (prFile, obj);
+ object_release (obj);
+ }
+ }
+
+ rc = CL5_SUCCESS;
+done:;
+
+ _cl5RemoveThread ();
+
+ if (rc == CL5_SUCCESS)
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "cl5ExportLDIF: changelog export is finished.\n");
+
+ if (prFile)
+ PR_Close (prFile);
+
+ return rc;
+}
+
+/* Name: cl5ImportLDIF
+ Description: imports ldif file into changelog; changelog must be in the closed state
+ Parameters: clDir - changelog dir
+ ldifFile - absolute path to the ldif file to import
+ replicas - optional list of replicas whose data should be imported;
+ if the list is NULL, all data in the file is imported.
+ Return: CL5_SUCCESS if function is successfull;
+ CL5_BAD_DATA if invalid parameter is passed;
+ CL5_BAD_STATE if changelog is open or not inititalized;
+ CL5_DB_ERROR if db api fails;
+ CL5_SYSTEM_ERROR if NSPR call fails;
+ CL5_MEMORY_ERROR if memory allocation fials.
+ */
+int cl5ImportLDIF (const char *clDir, const char *ldifFile, Object **replicas)
+{
+ FILE *file;
+ int rc;
+ char *buff;
+ int lineno = 0;
+ slapi_operation_parameters op;
+ Object *replica = NULL;
+ char *replGen = NULL;
+
+ /* validate params */
+ if (ldifFile == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ImportLDIF: null ldif file name\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ImportLDIF: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that nobody change changelog state while import is in progress */
+ PR_RWLock_Wlock (s_cl5Desc.stLock);
+
+ /* make sure changelog is closed */
+ if (s_cl5Desc.dbState != CL5_STATE_CLOSED)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ImportLDIF: invalid state - %d \n", s_cl5Desc.dbState);
+
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+ return CL5_BAD_STATE;
+ }
+
+ /* open LDIF file */
+ file = fopen (ldifFile, "r"); /* XXXggood Does fopen reliably work if > 255 files open? */
+ if (file == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ImportLDIF: failed to open (%s) ldif file; system error - %d\n",
+ ldifFile, errno);
+ rc = CL5_SYSTEM_ERROR;
+ goto done;
+ }
+
+ /* remove changelog */
+ rc = _cl5Delete (clDir, PR_FALSE);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ImportLDIF: failed to remove changelog\n");
+ goto done;
+ }
+
+ /* open changelog */
+ rc = _cl5Open (clDir, NULL, CL5_OPEN_LDIF2CL);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ImportLDIF: failed to open changelog\n");
+ goto done;
+ }
+
+ /* read entries and write them to changelog */
+ while ((buff = ldif_get_entry( file, &lineno )) != NULL)
+ {
+ rc = _cl5LDIF2Operation (buff, &op, &replGen);
+ slapi_ch_free ((void**)&buff);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ImportLDIF: failed to convert LDIF fragment to LDAP operation; "
+ "end of fragment line number - %d\n", lineno);
+ goto done;
+ }
+
+ /* if we perform selective import, check if the operation should be wriiten to changelog */
+ replica = _cl5GetReplica (&op, replGen);
+ if (replica == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ImportLDIF: failed to locate replica for target dn (%s) and "
+ "replica generation %s\n", op.target_address.dn, replGen);
+
+ slapi_ch_free ((void**)&replGen);
+ operation_parameters_done (&op);
+ goto done;
+ }
+
+ if (!replicas || _cl5ReplicaInList (replica, replicas))
+ {
+ /* write operation creates the file if it does not exist */
+ rc = _cl5WriteOperation (replica_get_name ((Replica*)object_get_data(replica)),
+ replGen, &op, 1);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ImportLDIF: failed to write operation to the changelog\n");
+ object_release (replica);
+ slapi_ch_free ((void**)&replGen);
+ operation_parameters_done (&op);
+ goto done;
+ }
+ }
+
+ object_release (replica);
+ slapi_ch_free ((void**)&replGen);
+ operation_parameters_done (&op);
+ }
+
+done:;
+ _cl5Close ();
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+ return rc;
+}
+
+/* Name: cl5GetState
+ Description: returns database state
+ Parameters: none
+ Return: changelog state
+ */
+int cl5GetState ()
+{
+ return s_cl5Desc.dbState;
+}
+
+/* Name: cl5ConfigTrimming
+ Description: sets changelog trimming parameters; changelog must be open.
+ Parameters: maxEntries - maximum number of entries in the chnagelog (in all files);
+ maxAge - maximum entry age;
+ Return: CL5_SUCCESS if successful;
+ CL5_BAD_STATE if changelog is not open
+ */
+int cl5ConfigTrimming (int maxEntries, const char *maxAge)
+{
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5ConfigTrimming: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure changelog is not closed while trimming configuration
+ is updated.*/
+ _cl5AddThread ();
+
+ PR_Lock (s_cl5Desc.dbTrim.lock);
+
+ if (maxAge)
+ {
+ /* don't ignore this argument */
+ if (strcmp (maxAge, CL5_STR_IGNORE) != 0)
+ {
+ s_cl5Desc.dbTrim.maxAge = age_str2time (maxAge);
+ }
+ }
+ else
+ {
+ /* unlimited */
+ s_cl5Desc.dbTrim.maxAge = 0;
+ }
+
+ if (maxEntries != CL5_NUM_IGNORE)
+ {
+ s_cl5Desc.dbTrim.maxEntries = maxEntries;
+ }
+
+ PR_Unlock (s_cl5Desc.dbTrim.lock);
+
+ _cl5RemoveThread ();
+
+ return CL5_SUCCESS;
+}
+
+/* Name: cl5GetOperation
+ Description: retireves operation specified by its csn and databaseid
+ Parameters: op - must contain csn and databaseid; the rest of data is
+ filled if function is successfull
+ Return: CL5_SUCCESS if function is successfull;
+ CL5_BAD_DATA if invalid op is passed;
+ CL5_BAD_STATE if db has not been initialized;
+ CL5_NOTFOUND if entry was not found;
+ CL5_DB_ERROR if any other db error occured;
+ CL5_BADFORMAT if db data format does not match entry format.
+ */
+int cl5GetOperation (Object *replica, slapi_operation_parameters *op)
+{
+ int rc;
+ char *agmt_name;
+
+ agmt_name = get_thread_private_agmtname();
+
+ if (replica == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetOperation: NULL replica\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (op == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "cl5GetOperation: NULL operation\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (op->csn == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "%s: cl5GetOperation: operation contains no CSN\n", agmt_name);
+ return CL5_BAD_DATA;
+ }
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "%s: cl5GetOperation: changelog is not initialized\n", agmt_name);
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog is open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ rc = _cl5GetOperation (replica, op);
+
+ _cl5RemoveThread ();
+
+ return rc;
+}
+
+/* Name: cl5GetFirstOperation
+ Description: retrieves first operation for a particular database
+ replica - replica for which the operation should be retrieved.
+ Parameters: op - buffer to store the operation;
+ iterator - to be passed to the call to cl5GetNextOperation
+ Return: CL5_SUCCESS, if successful
+ CL5_BADDATA, if operation is NULL
+ CL5_BAD_STATE, if changelog is not open
+ CL5_DB_ERROR, if db call fails
+ */
+int cl5GetFirstOperation (Object *replica, slapi_operation_parameters *op, void **iterator)
+{
+ int rc;
+ CL5Entry entry;
+ Object *obj;
+ char *agmt_name;
+
+ if (replica == NULL || op == NULL || iterator == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5GetFirstOperation: invalid argument\n");
+ return CL5_BAD_DATA;
+ }
+
+ *iterator = NULL;
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ agmt_name = get_thread_private_agmtname();
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "%s: cl5GetFirstOperation: changelog is not initialized\n", agmt_name);
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog stays open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ rc = _cl5GetDBFile (replica, &obj);
+ if (rc != CL5_SUCCESS)
+ {
+ _cl5RemoveThread ();
+ return rc;
+ }
+
+ entry.op = op;
+ /* Callers of this function should cl5_operation_parameters_done(op) */
+ rc = _cl5GetFirstEntry (obj, &entry, iterator, NULL);
+ object_release (obj);
+
+ _cl5RemoveThread ();
+
+ return rc;
+}
+
+/* Name: cl5GetNextOperation
+ Description: retrieves the next op from the changelog as defined by the iterator;
+ changelog must be open.
+ Parameters: op - returned operation, if function is successful
+ iterator - in: identifies op to retrieve; out: identifies next op
+ Return: CL5_SUCCESS, if successful
+ CL5_BADDATA, if op is NULL
+ CL5_BAD_STATE, if changelog is not open
+ CL5_NOTFOUND, empty changelog
+ CL5_DB_ERROR, if db call fails
+ */
+int cl5GetNextOperation (slapi_operation_parameters *op, void *iterator)
+{
+ CL5Entry entry;
+
+ if (op == NULL || iterator == NULL || !_cl5IsValidIterator (iterator))
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5GetNextOperation: invalid argument\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (s_cl5Desc.dbState != CL5_STATE_OPEN)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5GetNextOperation: changelog is not open\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* we don't need to increment thread count since cl5GetFirstOperation
+ locked the file through which we are iterating */
+ entry.op = op;
+ /* Callers of this function should cl5_operation_parameters_done(op) */
+ return _cl5GetNextEntry (&entry, iterator);
+}
+
+/* Name: cl5DestroyIterator
+ Description: destroys iterator once iteration through changelog is done
+ Parameters: iterator - iterator to destroy
+ Return: none
+ */
+void cl5DestroyIterator (void *iterator)
+{
+ CL5Iterator *it = (CL5Iterator*)iterator;
+
+ if (it == NULL)
+ return;
+
+ /* close cursor */
+ if (it->cursor)
+ it->cursor->c_close (it->cursor);
+
+ if (it->file)
+ object_release (it->file);
+
+ slapi_ch_free ((void**)&it);
+}
+
+/* Name: cl5WriteOperation
+ Description: writes operation to changelog
+ Parameters: replName - name of the replica to which operation applies
+ replGen - replica generation for the operation
+ !!!Note that we pass name and generation rather than
+ replica object since generation can change while operation
+ is in progress (if the data is reloaded). !!!
+ op - operation to write
+ local - this is a non-replicated operation
+ Return: CL5_SUCCESS if function is successfull;
+ CL5_BAD_DATA if invalid op is passed;
+ CL5_BAD_STATE if db has not been initialized;
+ CL5_MEMORY_ERROR if memory allocation failed;
+ CL5_DB_ERROR if any other db error occured;
+ */
+int cl5WriteOperation(const char *replName, const char *replGen,
+ const slapi_operation_parameters *op, PRBool local)
+{
+ int rc;
+
+ if (op == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5WriteOperation: NULL operation passed\n");
+ return CL5_BAD_DATA;
+ }
+
+ if (!IsValidOperation (op))
+ {
+ return CL5_BAD_DATA;
+ }
+
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5WriteOperation: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog is open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ rc = _cl5WriteOperation(replName, replGen, op, local);
+
+ /* update the upper bound ruv vector */
+ if (rc == CL5_SUCCESS)
+ {
+ Object *file_obj = NULL;
+
+ if ( _cl5GetDBFileByReplicaName (replName, replGen, &file_obj) == CL5_SUCCESS) {
+ rc = _cl5UpdateRUV (file_obj, op->csn, PR_FALSE, PR_FALSE);
+ object_release (file_obj);
+ }
+
+ }
+
+ _cl5RemoveThread ();
+
+ return rc;
+}
+
+/* Name: cl5CreateReplayIterator
+ Description: creates an iterator that allows to retireve changes that should
+ to be sent to the consumer identified by ruv. The iteration is peformed by
+ repeated calls to cl5GetNextOperationToReplay.
+ Parameters: replica - replica whose data we wish to iterate;
+ ruv - consumer ruv;
+ iterator - iterator to be passed to cl5GetNextOperationToReplay call
+ Return: CL5_SUCCESS, if function is successfull;
+ CL5_MISSING_DATA, if data that should be in the changelog is missing
+ CL5_PURGED_DATA, if some data that consumer needs has been purged.
+ Note that the iterator can be non null if the supplier contains
+ some data that needs to be sent to the consumer
+ CL5_NOTFOUND if the consumer is up to data with respect to the supplier
+ CL5_BAD_DATA if invalid parameter is passed;
+ CL5_BAD_STATE if db has not been open;
+ CL5_DB_ERROR if any other db error occured;
+ CL5_MEMORY_ERROR if memory allocation fails.
+ Algorithm: Build a list of csns from consumer's and supplier's ruv. For each element
+ of the consumer's ruv put max csn into the csn list. For each element
+ of the supplier's ruv not in the consumer's ruv put min csn from the
+ supplier's ruv into the list. The list contains, for each known replica,
+ the starting point for changes to be sent to the consumer.
+ Sort the list in accending order.
+ Build a hash which contains, for each known replica, whether the
+ supplier can bring the consumer up to data with respect to that replica.
+ The hash is used to decide whether a change can be sent to the consumer
+ Find the replica with the smallest csn in the list for which
+ we can bring the consumer up to date.
+ Position the db cursor on the change entry that corresponds to this csn.
+ Hash entries are created for each replica traversed so far. sendChanges
+ flag is set to FALSE for all repolicas except the last traversed.
+
+ */
+int cl5CreateReplayIterator (Private_Repl_Protocol *prp, const RUV *consumerRuv,
+ CL5ReplayIterator **iterator)
+{
+ int rc;
+ Object *replica;
+ Object *obj = NULL;
+
+ replica = prp->replica_object;
+ if (replica == NULL || consumerRuv == NULL || iterator == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5CreateReplayIterator: invalid parameter\n");
+ return CL5_BAD_DATA;
+ }
+
+ *iterator = NULL;
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5CreateReplayIterator: changelog is not initialized\n");
+ return CL5_BAD_STATE;
+ }
+
+ /* make sure that changelog is open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS ) return rc;
+
+
+ rc = _cl5GetDBFile (replica, &obj);
+ if (rc == CL5_SUCCESS)
+ {
+ /* iterate through the ruv in csn order to find first master for which
+ we can replay changes */
+ ReplicaId consumerRID = agmt_get_consumer_rid ( prp->agmt, prp->conn );
+ rc = _cl5PositionCursorForReplay (consumerRID, consumerRuv, replica, obj, iterator);
+ if (rc != CL5_SUCCESS)
+ {
+ if (obj)
+ object_release (obj);
+ }
+ }
+
+ _cl5RemoveThread ();
+
+ return rc;
+}
+
+/* Name: cl5GetNextOperationToReplay
+ Description: retrieves next operation to be sent to a particular consumer and
+ that was created on a particular master. Consumer and master info
+ is encoded in the iterator parameter that must be created by call
+ to cl5CreateReplayIterator.
+ Parameters: iterator - iterator that identifies next entry to retrieve;
+ op - operation retrieved if function is successful
+ Return: CL5_SUCCESS if function is successfull;
+ CL5_BAD_DATA if invalid parameter is passed;
+ CL5_NOTFOUND if end of iteration list is reached
+ CL5_DB_ERROR if any other db error occured;
+ CL5_BADFORMAT if data in db is of unrecognized format;
+ CL5_MEMORY_ERROR if memory allocation fails.
+ Algorithm: Iterate through changelog entries until a change is found that
+ originated at the replica for which we are sending changes
+ (based on the information in the iteration hash) and
+ whose csn is larger than the csn already seen by the consumer
+ If change originated at the replica not in the hash,
+ determine whether we should send changes originated at the replica
+ and add replica entry into the hash. We can send the changes for
+ the replica if the current csn is smaller or equal to the csn
+ in the consumer's ruv (if present) or if it is equal to the min
+ csn in the supplier's ruv.
+ */
+int
+cl5GetNextOperationToReplay (CL5ReplayIterator *iterator, CL5Entry *entry)
+{
+ CSN *csn;
+ char *key, *data;
+ size_t keylen, datalen;
+ char *agmt_name;
+ int rc = 0;
+
+ agmt_name = get_thread_private_agmtname();
+
+ if (entry == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "%s: cl5GetNextOperationToReplay: invalid parameter passed\n", agmt_name);
+ return CL5_BAD_DATA;
+ }
+
+ rc = clcache_get_next_change (iterator->clcache, (void **)&key, &keylen, (void **)&data, &datalen, &csn);
+
+ if (rc == DB_NOTFOUND)
+ {
+ /*
+ * Abort means we've figured out that we've passed the replica Min CSN,
+ * so we should stop looping through the changelog
+ */
+ return CL5_NOTFOUND;
+ }
+
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, NULL, "%s: cl5GetNextOperationToReplay: "
+ "failed to read next entry; DB error %d\n", agmt_name, rc);
+ return CL5_DB_ERROR;
+ }
+
+ /* there is an entry we should return */
+ /* Callers of this function should cl5_operation_parameters_done(op) */
+ if ( 0 != cl5DBData2Entry ( data, datalen, entry ) )
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "%s: cl5GetNextOperationToReplay: failed to format entry rc=%d\n", agmt_name, rc);
+ return rc;
+ }
+
+ return CL5_SUCCESS;
+}
+
+/* Name: cl5DestroyReplayIterator
+ Description: destorys iterator
+ Parameters: iterator - iterator to destory
+ Return: none
+ */
+void cl5DestroyReplayIterator (CL5ReplayIterator **iterator)
+{
+ if (iterator == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5DestroyReplayIterator: invalid iterartor passed\n");
+ return;
+ }
+
+ clcache_return_buffer ( &(*iterator)->clcache );
+
+ if ((*iterator)->fileObj)
+ object_release ((*iterator)->fileObj);
+
+ /* release supplier's ruv */
+ if ((*iterator)->supplierRuvObj)
+ object_release ((*iterator)->supplierRuvObj);
+
+ slapi_ch_free ((void **)iterator);
+}
+
+/* Name: cl5DeleteOnClose
+ Description: marks changelog for deletion when it is closed
+ Parameters: flag; if flag = 1 then delete else don't
+ Return: none
+ */
+void cl5DeleteOnClose (PRBool rm)
+{
+ s_cl5Desc.dbRmOnClose = rm;
+}
+
+/* Name: cl5GetDir
+ Description: returns changelog directory
+ Parameters: none
+ Return: copy of the directory; caller needs to free the string
+ */
+ char *cl5GetDir ()
+{
+ if (s_cl5Desc.dbDir == NULL)
+ {
+ return NULL;
+ }
+ else
+ {
+ return slapi_ch_strdup (s_cl5Desc.dbDir);
+ }
+}
+
+/* Name: cl5Exist
+ Description: checks if a changelog exists in the specified directory;
+ We consider changelog to exist if it contains the dbversion file.
+ Parameters: clDir - directory to check
+ Return: 1 - if changelog exists; 0 - otherwise
+ */
+PRBool cl5Exist (const char *clDir)
+{
+ char fName [MAXPATHLEN + 1];
+ int rc;
+
+ PR_snprintf (fName, MAXPATHLEN, "%s/%s", clDir, VERSION_FILE);
+ rc = PR_Access (fName, PR_ACCESS_EXISTS);
+
+ return (rc == PR_SUCCESS);
+}
+
+/* Name: cl5GetOperationCount
+ Description: returns number of entries in the changelog. The changelog must be
+ open for the value to be meaningful.
+ Parameters: replica - optional parameter that specifies the replica whose operations
+ we wish to count; if NULL all changelog entries are counted
+ Return: number of entries in the changelog
+ */
+
+int cl5GetOperationCount (Object *replica)
+{
+ Object *obj;
+ CL5DBFile *file;
+ int count = 0;
+ int rc;
+
+ if (s_cl5Desc.dbState == CL5_STATE_NONE)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5GetOperationCount: changelog is not initialized\n");
+ return -1;
+ }
+
+ /* make sure that changelog is open while operation is in progress */
+ rc = _cl5AddThread ();
+ if (rc != CL5_SUCCESS)
+ return -1;
+
+ if (replica == NULL) /* compute total entry count */
+ {
+ obj = objset_first_obj (s_cl5Desc.dbFiles);
+ while (obj)
+ {
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+ count += file->entryCount;
+ obj = objset_next_obj (s_cl5Desc.dbFiles, obj);
+ }
+ }
+ else /* return count for particular db */
+ {
+ /* select correct db file */
+ rc = _cl5GetDBFile (replica, &obj);
+ if (rc == CL5_SUCCESS)
+ {
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+
+ count = file->entryCount;
+ object_release (obj);
+ }
+ else
+ {
+ count = 0;
+ }
+ }
+
+ _cl5RemoveThread ();
+ return count;
+}
+
+/***** Helper Functions *****/
+
+/* this call happens under state lock */
+static int _cl5Open (const char *dir, const CL5DBConfig *config, CL5OpenMode openMode)
+{
+ int rc;
+ PRBool didRecovery;
+
+ PR_ASSERT (dir);
+
+ /* setup db configuration parameters */
+ if (config)
+ {
+ _cl5SetDBConfig (config);
+ }
+ else
+ {
+ _cl5SetDefaultDBConfig ();
+ }
+
+ /* initialize trimming */
+ rc = _cl5TrimInit ();
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5Open: failed to initialize trimming\n");
+ goto done;
+ }
+
+ /* create the changelog directory if it does not exist */
+ rc = cl5CreateDirIfNeeded (dir);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5Open: failed to create changelog directory (%s)\n", dir);
+ goto done;
+ }
+
+ s_cl5Desc.dbDir = slapi_ch_strdup (dir);
+
+ /* check database version */
+ rc = _cl5CheckDBVersion ();
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5Open: invalid db version\n");
+ goto done;
+ }
+
+ s_cl5Desc.dbOpenMode = openMode;
+
+ /* initialize db environment */
+ rc = _cl5AppInit (&didRecovery);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5Open: failed to initialize db environment\n");
+ goto done;
+ }
+
+ /* open database files */
+ rc = _cl5DBOpen (!didRecovery);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5Open: failed to open changelog database\n");
+
+ goto done;
+ }
+
+done:;
+
+ if (rc != CL5_SUCCESS)
+ {
+ _cl5Close ();
+ }
+
+ return rc;
+}
+
+int cl5CreateDirIfNeeded (const char *dirName)
+{
+ int rc;
+ char buff [MAXPATHLEN + 1];
+ char *t;
+
+ PR_ASSERT (dirName);
+
+ rc = PR_Access(dirName, PR_ACCESS_EXISTS);
+ if (rc == PR_SUCCESS)
+ {
+ return CL5_SUCCESS;
+ }
+
+ /* directory does not exist - try to create */
+ strncpy (buff, dirName, MAXPATHLEN);
+ t = strchr (buff, '/');
+
+ /* skip first slash */
+ if (t)
+ {
+ t = strchr (t+1, '/');
+ }
+
+ while (t)
+ {
+ *t = '\0';
+ if (PR_Access (buff, PR_ACCESS_EXISTS) != PR_SUCCESS)
+ {
+ rc = PR_MkDir (buff, DIR_CREATE_MODE);
+ if (rc != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5CreateDirIfNeeded: failed to create dir (%s); NSPR error - %d\n",
+ dirName, PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+ }
+
+ *t++ = FILE_PATHSEP;
+
+ t = strchr (t, '/');
+ }
+
+ /* last piece */
+ rc = PR_MkDir (buff, DIR_CREATE_MODE);
+ if (rc != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5CreateDirIfNeeded: failed to create dir; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ return CL5_SUCCESS;
+}
+
+static int _cl5RemoveEnv ()
+{
+ DB_ENV *dbEnv = NULL;
+ int rc = 0;
+
+ if ((rc = db_env_create(&dbEnv, 0)) != 0)
+ dbEnv = NULL;
+
+ if (dbEnv == NULL)
+ {
+ char *errstr = db_strerror(rc);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5RemoveEnv: failed to allocate db environment; "
+ "db error - %d %s\n", rc, errstr ? errstr : "unknown");
+ return CL5_MEMORY_ERROR;
+ }
+ rc = dbEnv->remove(dbEnv, s_cl5Desc.dbDir, DB_FORCE);
+ if (0 != rc)
+ {
+ char *errstr = db_strerror(rc);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5AppInit: failed to remove db environment; "
+ "db error - %d %s\n", rc, errstr ? errstr : "unknown");
+ return CL5_DB_ERROR;
+ }
+ return CL5_SUCCESS;
+}
+
+static int _cl5AppInit (PRBool *didRecovery)
+{
+ int rc;
+ unsigned int flags = DB_CREATE | DB_INIT_MPOOL | DB_THREAD;
+ DB_ENV *dbEnv;
+ if ((rc = db_env_create(&dbEnv, 0)) != 0)
+ dbEnv = NULL;
+
+ if (dbEnv == NULL)
+ {
+ char *errstr = db_strerror(rc);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5AppInit: failed to allocate db environment; db error - %d (%s)\n",
+ rc, errstr ? errstr : "unknown");
+ return CL5_MEMORY_ERROR;
+ }
+
+ _cl5InitDBEnv (dbEnv);
+
+ if (didRecovery)
+ *didRecovery = PR_FALSE;
+
+ /* decide how two open based on the mode in which db is open */
+ switch (s_cl5Desc.dbOpenMode)
+ {
+ case CL5_OPEN_NORMAL:
+ flags |= DB_INIT_LOCK | DB_INIT_TXN | DB_INIT_LOG;
+ /* check if need to initiate recovery */
+ rc = _cl5CheckGuardian ();
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5AppInit: recovering changelog after disorderly shutdown\n");
+ flags |= DB_RECOVER;
+ }
+ break;
+
+ case CL5_OPEN_RESTORE:
+ flags |= DB_INIT_LOCK | DB_INIT_TXN | DB_INIT_LOG;
+ break;
+
+ case CL5_OPEN_CLEAN_RECOVER:
+ flags |= DB_INIT_LOCK | DB_INIT_TXN | DB_INIT_LOG | DB_RECOVER;
+ break;
+
+ case CL5_OPEN_RESTORE_RECOVER:
+ flags |= DB_INIT_LOCK | DB_INIT_TXN | DB_INIT_LOG | DB_RECOVER_FATAL;
+ break;
+
+ case CL5_OPEN_LDIF2CL:
+ /* ONREPL - don't think we need any extra flags here */
+ break;
+ default:
+ /* fixme? CL5_OPEN_NONE */
+ break;
+ }
+
+ if (!s_cl5Desc.dbConfig.durableTrans)
+ {
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 3200
+ dbEnv->set_flags(dbEnv, DB_TXN_NOSYNC, 1);
+#else
+ flags |= DB_TXN_NOSYNC;
+#endif
+ }
+
+ dbEnv->set_errcall(dbEnv, dblayer_log_print);
+
+ /* do recovery if necessary */
+ if ((flags & DB_RECOVER) || (flags & DB_RECOVER_FATAL))
+ {
+ if (CL5_OPEN_CLEAN_RECOVER == s_cl5Desc.dbOpenMode)
+ _cl5RemoveEnv();
+
+ rc = _cl5Recover (flags, dbEnv);
+ if (rc != CL5_SUCCESS)
+ {
+ char *errstr = db_strerror(rc);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5AppInit: failed to recover changelog; db error - %d %s\n",
+ rc, errstr ? errstr : "unknown");
+
+ slapi_ch_free ((void **)&dbEnv);
+
+ return rc;
+ }
+
+ if (didRecovery)
+ *didRecovery = PR_TRUE;
+ flags &= ~(DB_RECOVER | DB_RECOVER_FATAL);
+ /* Need to reset the env */
+ /* Does this leak the dbEnv? */
+ if ((rc = db_env_create(&dbEnv, 0)) != 0)
+ dbEnv = NULL;
+
+ if (dbEnv == NULL)
+ {
+ char *errstr = db_strerror(rc);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5AppInit: failed to allocate db environment after recovery; "
+ "db error - %d %s\n", rc, errstr ? errstr : "unknown");
+ return CL5_MEMORY_ERROR;
+ }
+ _cl5InitDBEnv (dbEnv);
+ }
+
+ rc = dbEnv->open(dbEnv, s_cl5Desc.dbDir, flags,
+ s_cl5Desc.dbConfig.fileMode);
+ if (rc == 0)
+ {
+ s_cl5Desc.dbEnv = dbEnv;
+ s_cl5Desc.dbEnvOpenFlags = flags;
+ return CL5_SUCCESS;
+ }
+ else
+ {
+ char *errstr = db_strerror(rc);
+ char flagstr[20];
+
+ flagstr[0] = 0;
+ /* EINVAL return means bad flags - let's see what the flags are */
+ if (rc == EINVAL)
+ {
+ sprintf(flagstr, "%u", flags);
+ }
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5AppInit: db environment open failed; db error - %d %s %s\n",
+ rc, errstr ? errstr : "unknown", flagstr);
+ slapi_ch_free ((void **)&dbEnv);
+ return CL5_DB_ERROR;
+ }
+}
+
+static int _cl5DBOpen ()
+{
+ PRBool dbFile;
+ PRDir *dir;
+ PRDirEntry *entry = NULL;
+ int rc;
+ Object *replica;
+
+ /* create lock that guarantees that each file is only added once to the list */
+ s_cl5Desc.fileLock = PR_NewLock ();
+
+ /* loop over all db files and open them; file name format is cl5_<dbid>.<dbext> */
+ dir = PR_OpenDir(s_cl5Desc.dbDir);
+ if (dir == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5DBOpen: failed to open changelog dir; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+
+ }
+
+ /* initialize set of db file objects */
+ s_cl5Desc.dbFiles = objset_new(NULL);
+ while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT)))
+ {
+ if (NULL == entry->name)
+ {
+ break;
+ }
+
+ dbFile = _cl5FileName2Replica (entry->name, &replica);
+ if (dbFile) /* this is db file, not a log or dbversion; those are just skipped */
+ {
+ /* we only open files for existing replicas */
+ if (replica)
+ {
+ rc = _cl5DBOpenFile (replica, NULL /* file object */,
+ PR_FALSE /* check for duplicates */);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen: "
+ "Error opening file %s\n",
+ entry->name);
+ return rc;
+ }
+
+ object_release (replica);
+ }
+ else /* there is no matching replica for the file - remove */
+ {
+ char fullpathname[MAXPATHLEN];
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen: "
+ "file %s has no matching replica; removing\n", entry->name);
+
+ PR_snprintf(fullpathname, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, entry->name);
+ if (PR_Delete(fullpathname) != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBOpen: "
+ "failed to remove (%s) file; NSPR error - %d\n",
+ entry->name, PR_GetError ());
+
+ }
+ }
+ }
+ }
+
+ PR_CloseDir(dir);
+
+ return CL5_SUCCESS;
+}
+
+/* this function assumes that the entry was validated
+ using IsValidOperation
+
+ Data in db format:
+ ------------------
+ <1 byte version><1 byte change_type><sizeof time_t time><null terminated csn>
+ <null terminated uniqueid><null terminated targetdn>
+ [<null terminated newrdn><1 byte deleteoldrdn>][<4 byte mod count><mod1><mod2>....]
+
+ mod format:
+ -----------
+ <1 byte modop><null terminated attr name><4 byte value count>
+ <4 byte value size><value1><4 byte value size><value2>
+*/
+static int _cl5Entry2DBData (const CL5Entry *entry, char **data, PRUint32 *len)
+{
+ int size = 1 /* version */ + 1 /* operation type */ + sizeof (time_t);
+ char *pos;
+ PRUint32 t;
+ slapi_operation_parameters *op;
+ LDAPMod **add_mods = NULL;
+ char *rawDN = NULL;
+ char s[CSN_STRSIZE];
+
+ PR_ASSERT (entry && entry->op && data && len);
+
+ op = entry->op;
+
+ /* compute size of the buffer needed to hold the data */
+ size += CSN_STRSIZE;
+ size += strlen (op->target_address.uniqueid) + 1;
+
+ switch (op->operation_type)
+ {
+ case SLAPI_OPERATION_ADD: if (op->p.p_add.parentuniqueid)
+ size += strlen (op->p.p_add.parentuniqueid) + 1;
+ else
+ size ++; /* we just store NULL char */
+ slapi_entry2mods (op->p.p_add.target_entry, &rawDN/* dn */, &add_mods);
+ size += strlen (rawDN) + 1;
+ size += _cl5GetModsSize (add_mods);
+ break;
+
+ case SLAPI_OPERATION_MODIFY: size += strlen (op->target_address.dn) + 1;
+ size += _cl5GetModsSize (op->p.p_modify.modify_mods);
+ break;
+
+ case SLAPI_OPERATION_MODRDN: size += strlen (op->target_address.dn) + 1;
+ /* 1 for deleteoldrdn */
+ size += strlen (op->p.p_modrdn.modrdn_newrdn) + 2;
+ if (op->p.p_modrdn.modrdn_newsuperior_address.dn)
+ size += strlen (op->p.p_modrdn.modrdn_newsuperior_address.dn) + 1;
+ else
+ size ++; /* for NULL char */
+ if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)
+ size += strlen (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid) + 1;
+ else
+ size ++; /* for NULL char */
+ size += _cl5GetModsSize (op->p.p_modrdn.modrdn_mods);
+ break;
+
+ case SLAPI_OPERATION_DELETE: size += strlen (op->target_address.dn) + 1;
+ break;
+ }
+
+ /* allocate data buffer */
+ (*data) = (char *) slapi_ch_malloc (size);
+ if ((*data) == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5Entry2DBData: failed to allocate data buffer\n");
+ return CL5_MEMORY_ERROR;
+ }
+
+ /* fill in the data buffer */
+ pos = *data;
+ /* write a byte of version */
+ (*pos) = V_5;
+ pos ++;
+ /* write change type */
+ (*pos) = (unsigned char)op->operation_type;
+ pos ++;
+ /* write time */
+ t = PR_htonl((PRUint32)entry->time);
+ memcpy (pos, &t, sizeof (t));
+ pos += sizeof (t);
+ /* write csn */
+ _cl5WriteString (csn_as_string(op->csn,PR_FALSE,s), &pos);
+ /* write UniqueID */
+ _cl5WriteString (op->target_address.uniqueid, &pos);
+
+ /* figure out what else we need to write depending on the operation type */
+ switch (op->operation_type)
+ {
+ case SLAPI_OPERATION_ADD: _cl5WriteString (op->p.p_add.parentuniqueid, &pos);
+ _cl5WriteString (rawDN, &pos);
+ _cl5WriteMods (add_mods, &pos);
+ slapi_ch_free ((void**)&rawDN);
+ ldap_mods_free (add_mods, 1);
+ break;
+
+ case SLAPI_OPERATION_MODIFY: _cl5WriteString (op->target_address.dn, &pos);
+ _cl5WriteMods (op->p.p_modify.modify_mods, &pos);
+ break;
+
+ case SLAPI_OPERATION_MODRDN: _cl5WriteString (op->target_address.dn, &pos);
+ _cl5WriteString (op->p.p_modrdn.modrdn_newrdn, &pos);
+ *pos = (PRUint8)op->p.p_modrdn.modrdn_deloldrdn;
+ pos ++;
+ _cl5WriteString (op->p.p_modrdn.modrdn_newsuperior_address.dn, &pos);
+ _cl5WriteString (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid, &pos);
+ _cl5WriteMods (op->p.p_modrdn.modrdn_mods, &pos);
+ break;
+
+ case SLAPI_OPERATION_DELETE: _cl5WriteString (op->target_address.dn, &pos);
+ break;
+ }
+
+ (*len) = size;
+
+ return CL5_SUCCESS;
+}
+
+/*
+ Data in db format:
+ ------------------
+ <1 byte version><1 byte change_type><sizeof time_t time><null terminated dbid>
+ <null terminated csn><null terminated uniqueid><null terminated targetdn>
+ [<null terminated newrdn><1 byte deleteoldrdn>][<4 byte mod count><mod1><mod2>....]
+
+ mod format:
+ -----------
+ <1 byte modop><null terminated attr name><4 byte value count>
+ <4 byte value size><value1><4 byte value size><value2>
+*/
+
+
+int
+cl5DBData2Entry (const char *data, PRUint32 len, CL5Entry *entry)
+{
+ int rc;
+ PRUint8 version;
+ char *pos = (char *)data;
+ char *strCSN;
+ PRUint32 thetime;
+ slapi_operation_parameters *op;
+ LDAPMod **add_mods;
+ char *rawDN;
+ char s[CSN_STRSIZE];
+
+ PR_ASSERT (data && entry && entry->op);
+
+ /* ONREPL - check that we do not go beyond the end of the buffer */
+
+ /* read byte of version */
+ version = (PRUint8)(*pos);
+ if (version != V_5)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5DBData2Entry: invalid data version\n");
+ return CL5_BAD_FORMAT;
+ }
+
+ op = entry->op;
+
+ pos += sizeof(version);
+
+ /* read change type */
+ op->operation_type = (PRUint8)(*pos);
+ pos ++;
+
+ /* need to do the copy first, to skirt around alignment problems on
+ certain architectures */
+ memcpy((char *)&thetime,pos,sizeof(thetime));
+ entry->time = (time_t)PR_ntohl(thetime);
+ pos += sizeof (thetime);
+
+ /* read csn */
+ _cl5ReadString (&strCSN, &pos);
+ if (op->csn == NULL || strcmp (strCSN, csn_as_string(op->csn,PR_FALSE,s)) != 0)
+ {
+ op->csn = csn_new_by_string (strCSN);
+ }
+ slapi_ch_free ((void**)&strCSN);
+
+ /* read UniqueID */
+ _cl5ReadString (&op->target_address.uniqueid, &pos);
+
+ /* figure out what else we need to read depending on the operation type */
+ switch (op->operation_type)
+ {
+ case SLAPI_OPERATION_ADD: _cl5ReadString (&op->p.p_add.parentuniqueid, &pos);
+ /* richm: need to free parentuniqueid */
+ _cl5ReadString (&rawDN, &pos);
+ op->target_address.dn = rawDN;
+ /* convert mods to entry */
+ rc = _cl5ReadMods (&add_mods, &pos);
+ slapi_mods2entry (&(op->p.p_add.target_entry), rawDN, add_mods);
+ ldap_mods_free (add_mods, 1);
+ break;
+
+ case SLAPI_OPERATION_MODIFY: _cl5ReadString (&op->target_address.dn, &pos);
+ rc = _cl5ReadMods (&op->p.p_modify.modify_mods, &pos);
+ break;
+
+ case SLAPI_OPERATION_MODRDN: _cl5ReadString (&op->target_address.dn, &pos);
+ _cl5ReadString (&op->p.p_modrdn.modrdn_newrdn, &pos);
+ op->p.p_modrdn.modrdn_deloldrdn = *pos;
+ pos ++;
+ _cl5ReadString (&op->p.p_modrdn.modrdn_newsuperior_address.dn, &pos);
+ _cl5ReadString (&op->p.p_modrdn.modrdn_newsuperior_address.uniqueid, &pos);
+ rc = _cl5ReadMods (&op->p.p_modrdn.modrdn_mods, &pos);
+ break;
+
+ case SLAPI_OPERATION_DELETE: _cl5ReadString (&op->target_address.dn, &pos);
+ rc = CL5_SUCCESS;
+ break;
+
+ default: rc = CL5_BAD_FORMAT;
+ slapi_log_error(SLAPI_LOG_FATAL,
+ repl_plugin_name_cl,
+ "cl5DBData2Entry: failed to format entry\n");
+ break;
+ }
+
+ return rc;
+}
+
+/* thread management functions */
+static int _cl5DispatchDBThreads ()
+{
+ if (NULL == PR_CreateThread (PR_USER_THREAD, (VFP)(void *)_cl5DeadlockMain,
+ NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+ PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE))
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5DispatchDBThreads: failed to create deadlock thread; "
+ "NSPR error - %d\n", PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ if (NULL == PR_CreateThread (PR_USER_THREAD, (VFP)(void *)_cl5CheckpointMain,
+ NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+ PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE))
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5DispatchDBThreads: failed to create checkpoint thread; "
+ "NSPR error - %d\n", PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ if (NULL == PR_CreateThread (PR_USER_THREAD, (VFP)(void *)_cl5TrickleMain,
+ NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+ PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE) )
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5DispatchDBThreads: failed to create trickle thread; "
+ "NSPR error - %d\n", PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ if (NULL == PR_CreateThread (PR_USER_THREAD, (VFP)(void*)_cl5TrimMain,
+ NULL, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+ PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE) )
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5DispatchDBThreads: failed to create trimming thread; "
+ "NSPR error - %d\n", PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ return CL5_SUCCESS;
+}
+
+static int _cl5AddThread ()
+{
+ /* lock the state lock so that nobody can change the state
+ while backup is in progress
+ */
+ PR_RWLock_Rlock (s_cl5Desc.stLock);
+
+ /* open changelog if it is not already open */
+ if (s_cl5Desc.dbState != CL5_STATE_OPEN)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5AddThread: invalid changelog state - %d\n", s_cl5Desc.dbState);
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+ return CL5_BAD_STATE;
+ }
+
+ /* increment global thread count to make sure that changelog does not close while
+ backup is in progress */
+ PR_AtomicIncrement (&s_cl5Desc.threadCount);
+
+ PR_RWLock_Unlock (s_cl5Desc.stLock);
+
+ return CL5_SUCCESS;
+}
+
+static void _cl5RemoveThread ()
+{
+ PR_ASSERT (s_cl5Desc.threadCount > 0);
+ PR_AtomicDecrement (&s_cl5Desc.threadCount);
+}
+
+/* data conversion functions */
+static void _cl5WriteString (const char *str, char **buff)
+{
+ if (str)
+ {
+ strcpy (*buff, str);
+ (*buff) += strlen (str) + 1;
+ }
+ else /* just write NULL char */
+ {
+ (**buff) = '\0';
+ (*buff) ++;
+ }
+}
+
+static void _cl5ReadString (char **str, char **buff)
+{
+ if (str)
+ {
+ int len = strlen (*buff);
+
+ if (len)
+ {
+ *str = slapi_ch_strdup (*buff);
+ (*buff) += len + 1;
+ }
+ else /* just null char - skip it */
+ {
+ *str = NULL;
+ (*buff) ++;
+ }
+ }
+ else /* just skip this string */
+ {
+ (*buff) += strlen (*buff) + 1;
+ }
+}
+
+/* mods format:
+ -----------
+ <4 byte mods count><mod1><mod2>...
+
+ mod format:
+ -----------
+ <1 byte modop><null terminated attr name><4 byte count>
+ <4 byte size><value1><4 byte size><value2>...
+ */
+static void _cl5WriteMods (LDAPMod **mods, char **buff)
+{
+ PRInt32 i;
+ char *mod_start;
+ PRInt32 count;
+
+ if (mods == NULL)
+ return;
+
+ /* skip mods count */
+ mod_start = (*buff) + sizeof (count);
+
+ /* write mods*/
+ for (i=0; mods[i]; i++)
+ {
+ _cl5WriteMod (mods[i], &mod_start);
+ }
+
+ count = PR_htonl(i);
+ memcpy (*buff, &count, sizeof (count));
+
+ (*buff) = mod_start;
+}
+
+static void _cl5WriteMod (LDAPMod *mod, char **buff)
+{
+ char *pos;
+ PRInt32 count;
+ struct berval *bv;
+ Slapi_Mod smod;
+
+ slapi_mod_init_byref(&smod, mod);
+
+ pos = *buff;
+ /* write mod op */
+ *pos = (PRUint8)slapi_mod_get_operation (&smod);
+ pos ++;
+ /* write attribute name */
+ _cl5WriteString (slapi_mod_get_type (&smod), &pos);
+
+ /* write value count */
+ count = PR_htonl(slapi_mod_get_num_values(&smod));
+ memcpy (pos, &count, sizeof (count));
+ pos += sizeof (PRInt32);
+
+ bv = slapi_mod_get_first_value (&smod);
+ while (bv)
+ {
+ _cl5WriteBerval (bv, &pos);
+ bv = slapi_mod_get_next_value (&smod);
+ }
+
+ (*buff) = pos;
+
+ slapi_mod_done (&smod);
+}
+
+/* mods format:
+ -----------
+ <4 byte mods count><mod1><mod2>...
+
+ mod format:
+ -----------
+ <1 byte modop><null terminated attr name><4 byte count>
+ {<4 byte size><value1><4 byte size><value2>... ||
+ <null terminated str1> <null terminated str2>...}
+ */
+
+static int _cl5ReadMods (LDAPMod ***mods, char **buff)
+{
+ char *pos = *buff;
+ int i;
+ int rc;
+ PRInt32 mod_count;
+ Slapi_Mods smods;
+ Slapi_Mod smod;
+
+ /* need to copy first, to skirt around alignment problems on certain
+ architectures */
+ memcpy((char *)&mod_count,*buff,sizeof(mod_count));
+ mod_count = PR_ntohl(mod_count);
+ pos += sizeof (mod_count);
+
+ slapi_mods_init (&smods , mod_count);
+
+ for (i = 0; i < mod_count; i++)
+ {
+ rc = _cl5ReadMod (&smod, &pos);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_mods_done(&smods);
+ return rc;
+ }
+
+ slapi_mods_add_smod(&smods, &smod);
+ }
+
+ *buff = pos;
+
+ *mods = slapi_mods_get_ldapmods_passout (&smods);
+ slapi_mods_done(&smods);
+
+ return CL5_SUCCESS;
+}
+
+static int _cl5ReadMod (Slapi_Mod *smod, char **buff)
+{
+ char *pos = *buff;
+ int i;
+ PRInt32 val_count;
+ char *type;
+ int op;
+ struct berval bv;
+
+ op = (*pos) & 0x000000FF;
+ pos ++;
+ _cl5ReadString (&type, &pos);
+
+ /* need to do the copy first, to skirt around alignment problems on
+ certain architectures */
+ memcpy((char *)&val_count,pos,sizeof(val_count));
+ val_count = PR_ntohl(val_count);
+ pos += sizeof (PRInt32);
+
+ slapi_mod_init(smod, val_count);
+ slapi_mod_set_operation (smod, op|LDAP_MOD_BVALUES);
+ slapi_mod_set_type (smod, type);
+ slapi_ch_free ((void**)&type);
+
+ for (i = 0; i < val_count; i++)
+ {
+ _cl5ReadBerval (&bv, &pos);
+ slapi_mod_add_value (smod, &bv);
+ slapi_ch_free((void **) &bv.bv_val);
+ }
+
+ (*buff) = pos;
+
+ return CL5_SUCCESS;
+}
+
+static int _cl5GetModsSize (LDAPMod **mods)
+{
+ int size;
+ int i;
+
+ if (mods == NULL)
+ return 0;
+
+ size = sizeof (PRInt32);
+ for (i=0; mods[i]; i++)
+ {
+ size += _cl5GetModSize (mods[i]);
+ }
+
+ return size;
+}
+
+static int _cl5GetModSize (LDAPMod *mod)
+{
+ int size;
+ int i;
+
+ size = 1 + strlen (mod->mod_type) + 1 + sizeof (mod->mod_op);
+ i = 0;
+ if (mod->mod_op & LDAP_MOD_BVALUES) /* values are in binary form */
+ {
+ while (mod->mod_bvalues != NULL && mod->mod_bvalues[i] != NULL)
+ {
+ size += mod->mod_bvalues[i]->bv_len + sizeof (mod->mod_bvalues[i]->bv_len);
+ i++;
+ }
+ }
+ else /* string data */
+ {
+ PR_ASSERT(0); /* ggood string values should never be used in the server */
+ }
+
+ return size;
+}
+
+static void _cl5ReadBerval (struct berval *bv, char** buff)
+{
+ PRUint32 length = 0;
+ PRUint32 net_length = 0;
+
+ PR_ASSERT (bv && buff);
+
+ /***PINAKI need to do the copy first, to skirt around alignment problems on
+ certain architectures */
+ /* DBDB : struct berval.bv_len is defined as unsigned long
+ * But code here expects it to be 32-bits in size.
+ * On 64-bit machines, this is not the case.
+ * I changed the code to consistently use 32-bit (4-byte)
+ * values on the encoded side. This means that it's
+ * possible to generate a huge berval that will not
+ * be encoded properly. However, this seems unlikely
+ * to happen in reality, and I felt that retaining the
+ * old on-disk format for the changely in the 64-bit
+ * version of the server was important.
+ */
+
+ memcpy((char *)&net_length, *buff, sizeof(net_length));
+ length = PR_ntohl(net_length);
+ *buff += sizeof(net_length);
+ bv->bv_len = length;
+
+ if (bv->bv_len > 0) {
+ bv->bv_val = (char*)slapi_ch_malloc (bv->bv_len);
+ memcpy (bv->bv_val, *buff, bv->bv_len);
+ *buff += bv->bv_len;
+ }
+ else {
+ bv->bv_val = NULL;
+ }
+}
+
+static void _cl5WriteBerval (struct berval *bv, char** buff)
+{
+ PRUint32 length = 0;
+ PRUint32 net_length = 0;
+
+ length = (PRUint32) bv->bv_len;
+ net_length = PR_htonl(length);
+
+ memcpy(*buff, &net_length, sizeof (net_length));
+ *buff += sizeof (net_length);
+ memcpy (*buff, bv->bv_val, length);
+ *buff += length;
+}
+
+/* data format: <value count> <value size> <value> <value size> <value> ..... */
+static int _cl5ReadBervals (struct berval ***bv, char** buff, unsigned int size)
+{
+ PRInt32 count;
+ int i;
+ char *pos;
+
+ PR_ASSERT (bv && buff);
+
+ /* ONREPL - need to check that we don't go beyond the end of the buffer */
+
+ pos = *buff;
+ memcpy((char *)&count, pos, sizeof(count));
+ count = PR_htonl (count);
+ pos += sizeof(count);
+
+ /* allocate bervals */
+ *bv = (struct berval **)slapi_ch_malloc ((count + 1) * sizeof (struct berval*));
+ if (*bv == NULL)
+ {
+ return CL5_MEMORY_ERROR;
+ }
+
+ for (i = 0; i < count; i++)
+ {
+ (*bv)[i] = (struct berval *)slapi_ch_malloc (sizeof (struct berval));
+ if ((*bv)[i] == NULL)
+ {
+ ber_bvecfree(*bv);
+ return CL5_MEMORY_ERROR;
+ }
+
+ _cl5ReadBerval ((*bv)[i], &pos);
+ }
+
+ (*bv)[count] = NULL;
+ *buff = pos;
+
+ return CL5_SUCCESS;
+}
+
+/* data format: <value count> <value size> <value> <value size> <value> ..... */
+static int _cl5WriteBervals (struct berval **bv, char** buff, unsigned int *size)
+{
+ PRInt32 count, net_count;
+ char *pos;
+ int i;
+
+ PR_ASSERT (bv && buff && size);
+
+ /* compute number of values and size of the buffer to hold them */
+ *size = sizeof (count);
+ for (count = 0; bv[count]; count ++)
+ {
+ *size += sizeof (bv[count]->bv_len) + bv[count]->bv_len;
+ }
+
+ /* allocate buffer */
+ *buff = (char*) slapi_ch_malloc (*size);
+ if (*buff == NULL)
+ {
+ *size = 0;
+ return CL5_MEMORY_ERROR;
+ }
+
+ /* fill the buffer */
+ pos = *buff;
+ net_count = PR_htonl(count);
+ memcpy (pos, &net_count, sizeof (net_count));
+ pos += sizeof (net_count);
+ for (i = 0; i < count; i ++)
+ {
+ _cl5WriteBerval (bv[i], &pos);
+ }
+
+ return CL5_SUCCESS;
+}
+
+static int _cl5DeadlockMain (void *param)
+{
+ PRIntervalTime interval;
+ int rc;
+
+ PR_AtomicIncrement (&s_cl5Desc.threadCount);
+ interval = PR_MillisecondsToInterval(100);
+ while (s_cl5Desc.dbState != CL5_STATE_CLOSING)
+ {
+ int aborted;
+ if ((rc = LOCK_DETECT(s_cl5Desc.dbEnv, 0, DB_LOCK_YOUNGEST, &aborted)) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5DeadlockMain: lock_detect failed (%d transaction%s aborted); db error - %d %s\n",
+ aborted, (aborted == 1)? "":"s", rc, db_strerror(rc));
+ }
+ else if (aborted)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5DeadlockMain: lock_detect succeeded, but %d transaction%s ha%s been aborted\n",
+ aborted, (aborted == 1)? "":"s", (aborted == 1)? "s":"ve");
+ }
+
+ DS_Sleep(interval);
+ }
+
+ PR_AtomicDecrement (&s_cl5Desc.threadCount);
+ return 0;
+}
+
+static int _cl5CheckpointMain (void *param)
+{
+ time_t lastCheckpointCompletion = 0;
+ PRIntervalTime interval;
+ int rc = -1;
+
+ PR_AtomicIncrement (&s_cl5Desc.threadCount);
+
+ interval = PR_MillisecondsToInterval(1000);
+ lastCheckpointCompletion = current_time();
+
+ while (s_cl5Desc.dbState != CL5_STATE_CLOSING)
+ {
+ /* Check to see if the checkpoint interval has elapsed */
+ if (current_time() - lastCheckpointCompletion > s_cl5Desc.dbConfig.checkpointInterval)
+ {
+ rc = TXN_CHECKPOINT(s_cl5Desc.dbEnv, 0, 0, 0);
+ if (rc == 0)
+ {
+ lastCheckpointCompletion = current_time();
+ }
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ else if (rc != DB_INCOMPLETE) /* real error happened */
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5CheckpointMain: checkpoint failed, db error - %d %s\n",
+ rc, db_strerror(rc));
+ }
+#endif
+
+ /* According to dboreham, we are doing checkpoint twice
+ to reduce the number of transaction log files which need
+ to be retained at any time. */
+ rc = TXN_CHECKPOINT(s_cl5Desc.dbEnv, 0, 0, 0);
+ if (rc == 0)
+ {
+ lastCheckpointCompletion = current_time();
+ }
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ else if (rc != DB_INCOMPLETE) /* real error happened */
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5CheckpointMain: checkpoint failed, db error - %d %s\n",
+ rc, db_strerror(rc));
+ }
+#endif
+
+ /* check if we should truncate logs */
+ if (s_cl5Desc.dbConfig.circularLogging)
+ {
+ char **list = NULL;
+ char **listp = NULL;
+ int rc = -1;
+ char filename[MAXPATHLEN + 1];
+
+ /* find out which log files don't contain active txns */
+ rc = LOG_ARCHIVE(s_cl5Desc.dbEnv, &list, 0, malloc);
+ if (0 == rc && NULL != list)
+ {
+ /* zap 'em ! */
+ for (listp = list; *listp != NULL; ++listp)
+ {
+ PR_snprintf(filename, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir,*listp);
+ PR_Delete (filename);
+ }
+ slapi_ch_free((void **)&list);
+ }
+ }
+ }
+
+ /* sleep for a while */
+ /* why aren't we sleeping exactly the right amount of time ? */
+ /* answer---because the interval might be changed after the server starts up */
+ DS_Sleep(interval);
+ }
+
+ PR_AtomicDecrement (&s_cl5Desc.threadCount);
+ return 0;
+}
+
+static int _cl5TrickleMain (void *param)
+{
+ PRIntervalTime interval;
+ int pages_written;
+ int rc;
+
+ PR_AtomicIncrement (&s_cl5Desc.threadCount);
+ interval = PR_MillisecondsToInterval(1000);
+ while (s_cl5Desc.dbState != CL5_STATE_CLOSING)
+ {
+ if ((rc = MEMP_TRICKLE(s_cl5Desc.dbEnv,
+ s_cl5Desc.dbConfig.tricklePercentage, &pages_written)) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5TrickleMain: memp_trickle failed; db error - %d %s\n",
+ rc, db_strerror(rc));
+ }
+
+ DS_Sleep(interval);
+ }
+
+ PR_AtomicDecrement (&s_cl5Desc.threadCount);
+
+ return 0;
+}
+
+/* upgrade from db33 to db41
+ * 1. Run recovery on the database environment using the DB_ENV->open method
+ * 2. Remove any Berkeley DB environment using the DB_ENV->remove method
+ * 3. extention .db3 -> .db4 ### koko kara !!!
+ */
+static int _cl5Upgrade3_4(char *fromVersion, char *toVersion)
+{
+ PRDir *dir = NULL;
+ PRDirEntry *entry = NULL;
+ DB *thisdb = NULL;
+ CL5OpenMode backup;
+ int rc = 0;
+
+ backup = s_cl5Desc.dbOpenMode;
+ s_cl5Desc.dbOpenMode = CL5_OPEN_CLEAN_RECOVER;
+ /* CL5_OPEN_CLEAN_RECOVER does 1 and 2 */
+ rc = _cl5AppInit (NULL);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5Upgrade3_4: failed to open the db env\n");
+ return rc;
+ }
+
+ dir = PR_OpenDir(s_cl5Desc.dbDir);
+ if (dir == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5Upgrade3_4: failed to open changelog dir %s; NSPR error - %d\n",
+ s_cl5Desc.dbDir, PR_GetError ());
+ goto out;
+ }
+
+ while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT)))
+ {
+ if (NULL == entry->name)
+ {
+ break;
+ }
+ if (_cl5FileEndsWith(entry->name, DB_EXTENSION_DB3))
+ {
+ char oName [MAXPATHLEN + 1];
+ char nName [MAXPATHLEN + 1];
+ char *p = NULL;
+ char c;
+ int baselen = 0;
+ PR_snprintf(oName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, entry->name);
+ p = strstr(oName, DB_EXTENSION_DB3);
+ if (NULL == p)
+ {
+ continue;
+ }
+ /* db->rename closes DB; need to create every time */
+ rc = db_create(&thisdb, s_cl5Desc.dbEnv, 0);
+ if (0 != rc) {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5Upgrade3_4: failed to get db handle\n");
+ goto out;
+ }
+
+ baselen = p - oName;
+ c = *p;
+ *p = '\0';
+ PR_snprintf(nName, MAXPATHLEN+1, "%s", oName);
+ PR_snprintf(nName + baselen, MAXPATHLEN+1-baselen, "%s", DB_EXTENSION);
+ *p = c;
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5Upgrade3_4: renaming %s to %s\n", oName, nName);
+ rc = thisdb->rename(thisdb, (const char *)oName, NULL /* subdb */,
+ (const char *)nName, 0);
+ if (rc != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5Upgrade3_4: failed to rename file (%s -> %s); "
+ "db error - %d %s\n", oName, nName, rc, db_strerror(rc));
+ break;
+ }
+ }
+ }
+ /* update the version file */
+ _cl5WriteDBVersion ();
+
+ /* update the guardian file */
+ _cl5WriteGuardian ();
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "Upgrading from %s to %s is successfully done (%s)\n",
+ fromVersion, toVersion, s_cl5Desc.dbDir);
+out:
+ if (NULL != dir)
+ {
+ PR_CloseDir(dir);
+ }
+ if (s_cl5Desc.dbEnv)
+ {
+ DB_ENV *dbEnv = s_cl5Desc.dbEnv;
+ dbEnv->close(dbEnv, 0);
+ s_cl5Desc.dbEnv = NULL;
+ }
+ return rc;
+}
+
+static int _cl5CheckDBVersion ()
+{
+ char clVersion [VERSION_SIZE + 1];
+ char dbVersion [VERSION_SIZE + 1];
+ int rc;
+
+ if (!cl5Exist (s_cl5Desc.dbDir))
+ {
+ /* this is new changelog - write DB version and guardian file */
+ rc = _cl5WriteDBVersion ();
+ if (rc == CL5_SUCCESS) {
+ rc = _cl5WriteGuardian();
+ }
+ }
+ else
+ {
+ PR_snprintf (clVersion, VERSION_SIZE, "%s/%s/%s", CL5_TYPE, REPL_PLUGIN_NAME,
+ CHANGELOG_DB_VERSION);
+ rc = _cl5ReadDBVersion (s_cl5Desc.dbDir, dbVersion);
+
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5CheckDBVersion: invalid dbversion\n");
+ rc = CL5_BAD_DBVERSION;
+ }
+ else if (strcasecmp (clVersion, dbVersion) != 0)
+ {
+ char prevClVersion [VERSION_SIZE + 1];
+ PR_snprintf (prevClVersion, VERSION_SIZE, "%s/%s/%s",
+ CL5_TYPE, REPL_PLUGIN_NAME, CHANGELOG_DB_VERSION_PREV);
+ if (strcasecmp (prevClVersion, dbVersion) == 0)
+ {
+ /* upgrade */
+ rc = _cl5Upgrade3_4(prevClVersion, clVersion);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5CheckDBVersion: upgrade %s -> %s failed\n",
+ CHANGELOG_DB_VERSION_PREV, CHANGELOG_DB_VERSION);
+ rc = CL5_BAD_DBVERSION;
+ }
+ }
+ else
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5CheckDBVersion: invalid dbversion\n");
+ rc = CL5_BAD_DBVERSION;
+ }
+ }
+
+ }
+
+ return rc;
+}
+
+static int _cl5ReadDBVersion (const char *dir, char *clVersion)
+{
+ int rc;
+ PRFileDesc *file;
+ char fName [MAXPATHLEN + 1];
+ char buff [BUFSIZ];
+ PRInt32 size;
+ char *tok;
+ char * iter = NULL;
+
+ if (clVersion)
+ {
+ clVersion [0] = '\0';
+ }
+
+ PR_snprintf (fName, MAXPATHLEN, "%s/%s", dir, VERSION_FILE);
+
+ file = PR_Open (fName, PR_RDONLY, 777);
+ if (file == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5ReadDBVersion: failed to open DBVERSION; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ size = slapi_read_buffer (file, buff, BUFSIZ);
+ if (size < 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5ReadDBVersion: failed to read DBVERSION; NSPR error - %d\n",
+ PR_GetError ());
+ PR_Close (file);
+ return CL5_SYSTEM_ERROR;
+ }
+
+ /* parse the data */
+ buff[size]= '\0';
+ tok = ldap_utf8strtok_r (buff, "\n", &iter);
+ if (tok)
+ {
+ if (clVersion)
+ {
+ strcpy(clVersion, tok);
+ }
+ }
+
+ rc = PR_Close (file);
+ if (rc != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5ReadDBVersion: failed to close DBVERSION; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ return CL5_SUCCESS;
+}
+
+static int _cl5WriteDBVersion ()
+{
+ int rc;
+ PRFileDesc *file;
+ char fName [MAXPATHLEN + 1];
+ char clVersion [VERSION_SIZE + 1];
+ PRInt32 len, size;
+
+ PR_snprintf (fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, VERSION_FILE);
+
+ file = PR_Open (fName, PR_WRONLY | PR_CREATE_FILE, s_cl5Desc.dbConfig.fileMode);
+ if (file == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5WriteDBVersion: failed to open DBVERSION; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ /* write changelog version */
+ PR_snprintf (clVersion, VERSION_SIZE, "%s/%s/%s\n", CL5_TYPE, REPL_PLUGIN_NAME,
+ CHANGELOG_DB_VERSION);
+
+ len = strlen (clVersion);
+ size = slapi_write_buffer (file, clVersion, len);
+ if (size != len)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5WriteDBVersion: failed to write DBVERSION; NSPR error - %d\n",
+ PR_GetError ());
+ PR_Close (file);
+ return CL5_SYSTEM_ERROR;
+ }
+
+ rc = PR_Close (file);
+ if (rc != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5WriteDBVersion: failed to close DBVERSION; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ return CL5_SUCCESS;
+}
+
+/* for now guardian file is just like dbversion file */
+static int _cl5CheckGuardian ()
+{
+ char plVersion [VERSION_SIZE + 1];
+ char dbVersion [VERSION_SIZE + 1];
+ int rc;
+
+ /* new changelog - no guardian file */
+ if (!cl5Exist(s_cl5Desc.dbDir))
+ {
+ return CL5_SUCCESS;
+ }
+ else
+ {
+ PR_snprintf (plVersion, VERSION_SIZE, "%s/%s/%s", CL5_TYPE, REPL_PLUGIN_NAME,
+ CHANGELOG_DB_VERSION);
+ rc = _cl5ReadGuardian (dbVersion);
+
+ if (rc != CL5_SUCCESS || strcasecmp (plVersion, dbVersion) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5CheckGuardian: missing or invalid guardian file\n");
+ return (CL5_BAD_FORMAT);
+ }
+
+ /* remove guardian file */
+ rc = _cl5RemoveGuardian ();
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5CheckGuardian: failed to remove guardian file\n");
+ }
+ }
+
+ return rc;
+}
+
+static int _cl5WriteGuardian ()
+{
+ int rc;
+ PRFileDesc *file;
+ char fName [MAXPATHLEN + 1];
+ char version [VERSION_SIZE];
+ PRInt32 len, size;
+
+ PR_snprintf (fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, GUARDIAN_FILE);
+
+ file = PR_Open (fName, PR_WRONLY | PR_CREATE_FILE, s_cl5Desc.dbConfig.fileMode);
+ if (file == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5WriteGuardian: failed to open guardian file; NSPR error - %d\n",
+ PR_GetError());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ PR_snprintf (version, VERSION_SIZE, "%s/%s/%s\n", CL5_TYPE, REPL_PLUGIN_NAME,
+ CHANGELOG_DB_VERSION);
+
+ len = strlen (version);
+ size = slapi_write_buffer (file, version, len);
+ if (size != len)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteGuardian: failed to write guardian file; NSPR error - %d\n",
+ PR_GetError());
+ PR_Close (file);
+ return CL5_SYSTEM_ERROR;
+ }
+
+ rc = PR_Close (file);
+ if (rc != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5WriteGuardian: failed to close guardian file; NSPR error - %d\n",
+ PR_GetError());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ return CL5_SUCCESS;
+}
+
+static int _cl5ReadGuardian (char *buff)
+{
+ int rc;
+ PRFileDesc *file;
+ char fName [MAXPATHLEN + 1];
+ PRInt32 size;
+
+ PR_snprintf (fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, GUARDIAN_FILE);
+
+ file = PR_Open (fName, PR_RDONLY, 0);
+ if (file == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5ReadGuardian: failed to open guardian file; NSPR error - %d\n",
+ PR_GetError());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ size = slapi_read_buffer (file, buff, VERSION_SIZE);
+ if (size <= 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5ReadGuardian: failed to read guardian file; NSPR error - %d\n",
+ PR_GetError());
+ PR_Close (file);
+ return CL5_SYSTEM_ERROR;
+ }
+
+ buff [size-1] = '\0';
+
+ rc = PR_Close (file);
+ if (rc != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5ReadGuardian: failed to close guardian file; NSPR error - %d\n",
+ PR_GetError());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ return CL5_SUCCESS;
+}
+
+static int _cl5RemoveGuardian ()
+{
+ char fName [MAXPATHLEN + 1];
+ int rc;
+
+ PR_snprintf (fName, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, GUARDIAN_FILE);
+
+ rc = PR_Delete (fName);
+ if (rc != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5RemoveGuardian: failed to remove guardian file; NSPR error - %d\n",
+ PR_GetError());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ return CL5_SUCCESS;
+}
+
+/* must be called under the state lock */
+static void _cl5Close ()
+{
+ int rc2 = 0;
+ PRIntervalTime interval;
+
+ if (s_cl5Desc.dbState != CL5_STATE_CLOSED) /* Don't try to close twice */
+ {
+
+ /* close db files */
+ _cl5DBClose ();
+
+ /* stop global threads */
+ interval = PR_MillisecondsToInterval(100);
+ while (s_cl5Desc.threadCount > 0)
+ {
+ slapi_log_error( SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "_cl5Close: waiting for threads to exit: %d thread(s) still active\n",
+ s_cl5Desc.threadCount);
+ DS_Sleep(interval);
+ }
+
+ /* cleanup trimming */
+ _cl5TrimCleanup ();
+
+ /* shutdown db environment */
+ if (s_cl5Desc.dbEnv)
+ {
+ DB_ENV *dbEnv = s_cl5Desc.dbEnv;
+ rc2 = dbEnv->close(dbEnv, 0);
+ s_cl5Desc.dbEnv = NULL;
+ }
+
+ /* record successful close by writing guardian file;
+ we do it in all case accept incomplete open due to an error */
+ if (s_cl5Desc.dbState == CL5_STATE_CLOSING || s_cl5Desc.dbOpenMode != CL5_OPEN_NORMAL)
+ {
+ _cl5WriteGuardian ();
+ }
+
+ /* remove changelog if requested */
+ if (s_cl5Desc.dbRmOnClose)
+ {
+
+ if (_cl5Delete (s_cl5Desc.dbDir, 1) != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5Close: failed to remove changelog\n");
+ }
+ s_cl5Desc.dbRmOnClose = PR_FALSE;
+ }
+
+ slapi_ch_free ((void **)&s_cl5Desc.dbDir);
+ memset (&s_cl5Desc.dbConfig, 0, sizeof (s_cl5Desc.dbConfig));
+ s_cl5Desc.fatalError = PR_FALSE;
+ s_cl5Desc.threadCount = 0;
+ s_cl5Desc.dbOpenMode = CL5_OPEN_NONE;
+ }
+}
+
+static void _cl5DBClose ()
+{
+ if (NULL != s_cl5Desc.dbFiles)
+ {
+ objset_delete (&s_cl5Desc.dbFiles);
+ }
+ if (NULL != s_cl5Desc.fileLock)
+ {
+ PR_DestroyLock (s_cl5Desc.fileLock);
+ }
+}
+
+/* state lock must be locked */
+static int _cl5Delete (const char *clDir, int rmDir)
+{
+ PRDir *dir;
+ char filename[MAXPATHLEN + 1];
+ PRDirEntry *entry = NULL;
+ int rc;
+
+ /* remove all files in the directory and the directory */
+ dir = PR_OpenDir(clDir);
+ if (dir == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5Delete: failed to open changelog dir; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+
+ }
+
+ while (NULL != (entry = PR_ReadDir(dir, PR_SKIP_DOT | PR_SKIP_DOT_DOT)))
+ {
+ if (NULL == entry->name)
+ {
+ break;
+ }
+ PR_snprintf(filename, MAXPATHLEN, "%s/%s", clDir, entry->name);
+ rc = PR_Delete(filename);
+ if (rc != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5Delete: failed to remove (%s) file; NSPR error - %d\n",
+ filename, PR_GetError ());
+ }
+ }
+
+ rc = PR_CloseDir(dir);
+ if (rc != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5Delete: failed to close changelog dir (%s); NSPR error - %d\n",
+ clDir, PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+
+ if (rmDir)
+ {
+ rc = PR_RmDir (clDir);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5Delete: failed to remove changelog dir (%s); errno = %d\n",
+ clDir, errno);
+ return CL5_SYSTEM_ERROR;
+ }
+ }
+
+ return CL5_SUCCESS;
+}
+
+static void _cl5SetDefaultDBConfig ()
+{
+ s_cl5Desc.dbConfig.cacheSize = CL5_DEFAULT_CONFIG_DB_DBCACHESIZE;
+ s_cl5Desc.dbConfig.durableTrans = CL5_DEFAULT_CONFIG_DB_DURABLE_TRANSACTIONS;
+ s_cl5Desc.dbConfig.checkpointInterval = CL5_DEFAULT_CONFIG_DB_CHECKPOINT_INTERVAL;
+ s_cl5Desc.dbConfig.circularLogging = CL5_DEFAULT_CONFIG_DB_CIRCULAR_LOGGING;
+ s_cl5Desc.dbConfig.pageSize = CL5_DEFAULT_CONFIG_DB_PAGE_SIZE;
+ s_cl5Desc.dbConfig.logfileSize = CL5_DEFAULT_CONFIG_DB_LOGFILE_SIZE;
+ s_cl5Desc.dbConfig.maxTxnSize = CL5_DEFAULT_CONFIG_DB_TXN_MAX;
+ s_cl5Desc.dbConfig.verbose = CL5_DEFAULT_CONFIG_DB_VERBOSE;
+ s_cl5Desc.dbConfig.debug = CL5_DEFAULT_CONFIG_DB_DEBUG;
+ s_cl5Desc.dbConfig.tricklePercentage = CL5_DEFAULT_CONFIG_DB_TRICKLE_PERCENTAGE;
+ s_cl5Desc.dbConfig.spinCount = CL5_DEFAULT_CONFIG_DB_SPINCOUNT;
+ s_cl5Desc.dbConfig.nb_lock_config = CL5_DEFAULT_CONFIG_NB_LOCK;
+ s_cl5Desc.dbConfig.fileMode = FILE_CREATE_MODE;
+}
+
+static void _cl5SetDBConfig (const CL5DBConfig *config)
+{
+ /* through CL5DBConfig, we have access to all the LDAP configurable Changelog DB parameters */
+ s_cl5Desc.dbConfig.cacheSize = config->cacheSize;
+ s_cl5Desc.dbConfig.durableTrans = config->durableTrans;
+ s_cl5Desc.dbConfig.checkpointInterval = config->checkpointInterval;
+ s_cl5Desc.dbConfig.circularLogging = config->circularLogging;
+ s_cl5Desc.dbConfig.pageSize = config->pageSize;
+ s_cl5Desc.dbConfig.logfileSize = config->logfileSize;
+ s_cl5Desc.dbConfig.maxTxnSize = config->maxTxnSize;
+ s_cl5Desc.dbConfig.verbose = config->verbose;
+ s_cl5Desc.dbConfig.debug = config->debug;
+ s_cl5Desc.dbConfig.tricklePercentage = config->tricklePercentage;
+ s_cl5Desc.dbConfig.spinCount = config->spinCount;
+ s_cl5Desc.dbConfig.nb_lock_config = config->nb_lock_config;
+ s_cl5Desc.dbConfig.maxConcurrentWrites = config->maxConcurrentWrites;
+
+ if (config->spinCount != 0)
+ {
+ DB_ENV_SET_TAS_SPINS(s_cl5Desc.dbEnv, config->spinCount);
+ }
+
+ /* Some other configuration parameters are hardcoded... */
+ s_cl5Desc.dbConfig.fileMode = FILE_CREATE_MODE;
+}
+
+#define ONEG 1073741824 /* one giga bytes */
+static void _cl5InitDBEnv(DB_ENV *dbEnv)
+{
+ dbEnv->set_errpfx(dbEnv, "ns-slapd");
+ dbEnv->set_lg_max(dbEnv, s_cl5Desc.dbConfig.logfileSize);
+ dbEnv->set_tx_max(dbEnv, s_cl5Desc.dbConfig.maxTxnSize);
+ dbEnv->set_cachesize(dbEnv, s_cl5Desc.dbConfig.cacheSize/ONEG,
+ s_cl5Desc.dbConfig.cacheSize%ONEG,
+ 0);
+ /* Set default number of locks */
+ dbEnv->set_lk_max_locks(dbEnv, s_cl5Desc.dbConfig.nb_lock_config);
+
+ if (s_cl5Desc.dbConfig.verbose)
+ {
+ int on = 1;
+ dbEnv->set_verbose(dbEnv, DB_VERB_CHKPOINT, on);
+ dbEnv->set_verbose(dbEnv, DB_VERB_DEADLOCK, on);
+ dbEnv->set_verbose(dbEnv, DB_VERB_RECOVERY, on);
+ dbEnv->set_verbose(dbEnv, DB_VERB_WAITSFOR, on);
+ }
+ if (s_cl5Desc.dbConfig.debug)
+ {
+ dbEnv->set_errcall(dbEnv, _cl5DBLogPrint);
+ }
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 3300
+ dbEnv->set_alloc(dbEnv, malloc, realloc, free);
+#endif
+}
+
+static void _cl5DBLogPrint(const char* prefix, char *buffer)
+{
+ /* We ignore the prefix since we know who we are anyway */
+ slapi_log_error (SLAPI_LOG_FATAL, repl_plugin_name_cl, "cl5: %s\n", buffer);
+}
+
+static PRBool _cl5IsLogFile (const char *path)
+{
+ int rc;
+
+ /* Is the filename at least 4 characters long ? */
+ if (strlen(path) < 4)
+ {
+ return PR_FALSE; /* Not a log file then */
+ }
+
+ /* Are the first 4 characters "log." ? */
+ rc = strncmp(path,"log.",4);
+ if (0 == rc)
+ {
+ /* Now, are the last 4 characters _not_ .db# ? */
+ const char *piece = path + (strlen(path) - 4);
+ rc = strcmp(piece, DB_EXTENSION);
+ if (0 != rc)
+ {
+ /* Is */
+ return PR_TRUE;
+ }
+ }
+ return PR_FALSE; /* Is not */
+}
+
+static int _cl5Recover (int open_flags, DB_ENV *dbEnv)
+{
+ /* If we're doing recovery, we MUST open the env single-threaded ! */
+ int recover_flags = open_flags & ~DB_THREAD;
+ int rc;
+
+ rc = dbEnv->open(dbEnv, s_cl5Desc.dbDir, recover_flags, s_cl5Desc.dbConfig.fileMode);
+
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5Recover: appinit failed; db error - %d %s\n",
+ rc, db_strerror(rc));
+ return CL5_DB_ERROR;
+ }
+
+ /* Now close it so we can re-open it again... */
+ dbEnv->close(dbEnv, 0);
+
+ return CL5_SUCCESS;
+}
+
+/* Trimming helper functions */
+static int _cl5TrimInit ()
+{
+ /* just create the lock while we are singlethreaded */
+ s_cl5Desc.dbTrim.lock = PR_NewLock();
+
+ if (s_cl5Desc.dbTrim.lock == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5InitTrimming: failed to create lock; NSPR error - %d\n",
+ PR_GetError ());
+ return CL5_SYSTEM_ERROR;
+ }
+ else
+ {
+ return CL5_SUCCESS;
+ }
+}
+
+static void _cl5TrimCleanup ()
+{
+ if (s_cl5Desc.dbTrim.lock)
+ PR_DestroyLock (s_cl5Desc.dbTrim.lock);
+
+ memset (&s_cl5Desc.dbTrim, 0, sizeof (s_cl5Desc.dbTrim));
+}
+
+static int _cl5TrimMain (void *param)
+{
+ PRIntervalTime interval;
+ time_t timePrev = current_time ();
+ time_t timeNow;
+
+ PR_AtomicIncrement (&s_cl5Desc.threadCount);
+ interval = PR_SecondsToInterval(CHANGELOGDB_TRIM_INTERVAL);
+
+ while (s_cl5Desc.dbState != CL5_STATE_CLOSING)
+ {
+ timeNow = current_time ();
+ if (timeNow - timePrev >= CHANGELOGDB_TRIM_INTERVAL)
+ {
+ /* time to trim */
+ timePrev = timeNow;
+ _cl5DoTrimming ();
+ }
+ if (NULL == s_cl5Desc.clLock)
+ {
+ /* most likely, emergency */
+ break;
+ }
+
+ PR_Lock(s_cl5Desc.clLock);
+ PR_WaitCondVar(s_cl5Desc.clCvar, interval);
+ PR_Unlock(s_cl5Desc.clLock);
+ }
+
+ PR_AtomicDecrement (&s_cl5Desc.threadCount);
+
+ return 0;
+}
+
+/* We remove an entry if it has been replayed to all consumers and
+ and the number of entries in the changelog is larger than maxEntries
+ or age of the entry is larger than maxAge.
+ Also we can't purge entries which correspond to max csns in the
+ supplier's ruv. Here is a example where we can get into trouble:
+ The server is setup with time based trimming and no consumer's
+ At some point all the entries are trimmed from the changelog.
+ At a later point a consumer is added and initialized online
+ Then a change is made on the supplier.
+ To update the consumer, the supplier would attempt to locate
+ the last change sent to the consumer in the changelog and will
+ fail because the change was removed.
+
+ */
+
+static void _cl5DoTrimming ()
+{
+ Object *obj;
+ long numToTrim;
+
+ PR_Lock (s_cl5Desc.dbTrim.lock);
+
+ /* ONREPL We trim file by file which means that some files will be
+ trimmed more often than other. We might have to fix that by, for
+ example, randomizing starting point */
+ obj = objset_first_obj (s_cl5Desc.dbFiles);
+ while (obj && _cl5CanTrim ((time_t)0, &numToTrim))
+ {
+ _cl5TrimFile (obj, &numToTrim);
+ obj = objset_next_obj (s_cl5Desc.dbFiles, obj);
+ }
+
+ if (obj)
+ object_release (obj);
+
+ PR_Unlock (s_cl5Desc.dbTrim.lock);
+
+ return;
+}
+
+/* Note that each file contains changes for a single replicated area.
+ trimming algorithm:
+*/
+#define CL5_TRIM_MAX_PER_TRANSACTION 10
+
+static void _cl5TrimFile (Object *obj, long *numToTrim)
+{
+ DB_TXN *txnid;
+ RUV *ruv = NULL;
+ CL5Entry entry;
+ slapi_operation_parameters op = {0};
+ void *it;
+ int finished = 0, totalTrimmed = 0, count;
+ PRBool abort;
+ char strCSN[CSN_STRSIZE];
+ int rc;
+
+ PR_ASSERT (obj);
+
+ /* construct the ruv up to which we can purge */
+ rc = _cl5GetRUV2Purge2 (obj, &ruv);
+ if (rc != CL5_SUCCESS || ruv == NULL)
+ {
+ return;
+ }
+
+ entry.op = &op;
+
+ while ( !finished && !g_get_shutdown() )
+ {
+ it = NULL;
+ count = 0;
+ txnid = NULL;
+ abort = PR_FALSE;
+
+ /* DB txn lock accessed pages until the end of the transaction. */
+
+ rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5TrimFile: failed to begin transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ finished = PR_TRUE;
+ break;
+ }
+
+ finished = _cl5GetFirstEntry (obj, &entry, &it, txnid);
+ while ( !finished )
+ {
+ /*
+ * This change can be trimmed if it exceeds purge
+ * parameters and has been seen by all consumers.
+ */
+ if ( (*numToTrim > 0 || _cl5CanTrim (entry.time, numToTrim)) &&
+ ruv_covers_csn_strict (ruv, op.csn) )
+ {
+ rc = _cl5CurrentDeleteEntry (it);
+ if ( rc == CL5_SUCCESS )
+ {
+ /* update purge vector */
+ rc = _cl5UpdateRUV (obj, op.csn, PR_FALSE, PR_TRUE);
+ }
+ if ( rc == CL5_SUCCESS)
+ {
+ if (*numToTrim > 0) (*numToTrim)--;
+ count++;
+ }
+ else
+ {
+ /* The above two functions have logged the error */
+ abort = PR_TRUE;
+ }
+
+ }
+ else
+ {
+ /* The changelog DB is time ordered. If we can not trim
+ * a CSN, we will not be allowed to trim the rest of the
+ * CSNs generally. However, the maxcsn of each replica ID
+ * is always kept in the changelog as an anchor for
+ * replaying future changes. We have to skip those anchor
+ * CSNs, otherwise a non-active replica ID could block
+ * the trim forever.
+ */
+ CSN *maxcsn = NULL;
+ ReplicaId rid;
+
+ rid = csn_get_replicaid (op.csn);
+ ruv_get_largest_csn_for_replica (ruv, rid, &maxcsn);
+ if ( csn_compare (op.csn, maxcsn) != 0 )
+ {
+ /* op.csn is not anchor CSN */
+ finished = 1;
+ }
+ else
+ {
+ slapi_log_error (SLAPI_LOG_REPL, NULL,
+ "Changelog purge skipped anchor csn %s\n",
+ csn_as_string (maxcsn, PR_FALSE, strCSN));
+
+ /* extra read to skip the current record */
+ cl5_operation_parameters_done (&op);
+ finished =_cl5GetNextEntry (&entry, it);
+ }
+ if (maxcsn) csn_free (&maxcsn);
+ }
+ cl5_operation_parameters_done (&op);
+ if (finished || abort || count >= CL5_TRIM_MAX_PER_TRANSACTION)
+ {
+ /* If we reach CL5_TRIM_MAX_PER_TRANSACTION,
+ * we close the cursor,
+ * commit the transaction and restart a new transaction
+ */
+ break;
+ }
+ finished = _cl5GetNextEntry (&entry, it);
+ }
+
+ /* MAB: We need to close the cursor BEFORE the txn commits/aborts.
+ * If we don't respect this order, we'll screw up the database,
+ * placing it in DB_RUNRECOVERY mode
+ */
+ cl5DestroyIterator (it);
+
+ if (abort)
+ {
+ finished = 1;
+ rc = TXN_ABORT (txnid);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5TrimFile: failed to abort transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ }
+ }
+ else
+ {
+ rc = TXN_COMMIT (txnid, 0);
+ if (rc != 0)
+ {
+ finished = 1;
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5TrimFile: failed to commit transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ }
+ else
+ {
+ totalTrimmed += count;
+ }
+ }
+
+ } /* While (!finished) */
+
+ if (ruv)
+ ruv_destroy (&ruv);
+
+ if (totalTrimmed)
+ {
+ slapi_log_error (SLAPI_LOG_REPL, NULL, "Trimmed %d changes from the changelog\n", totalTrimmed);
+ }
+}
+
+static PRBool _cl5CanTrim (time_t time, long *numToTrim)
+{
+ *numToTrim = 0;
+
+ if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0)
+ return PR_FALSE;
+
+ if (s_cl5Desc.dbTrim.maxAge == 0)
+ {
+ *numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries;
+ return ( *numToTrim > 0 );
+ }
+
+ if (s_cl5Desc.dbTrim.maxEntries > 0 &&
+ (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0)
+ return PR_TRUE;
+
+ if (time)
+ return (current_time () - time > s_cl5Desc.dbTrim.maxAge);
+ else
+ return PR_TRUE;
+}
+
+static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
+{
+ int rc;
+ char csnStr [CSN_STRSIZE];
+ DBT key={0}, data={0};
+ struct berval **vals;
+ CL5DBFile *file;
+ char *pos;
+ char *agmt_name;
+
+
+ PR_ASSERT (replGen && obj);
+
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+
+ agmt_name = get_thread_private_agmtname();
+
+ if (purge) /* read purge vector entry */
+ key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
+ else /* read upper bound vector */
+ key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
+
+ key.size = CSN_STRSIZE;
+
+ data.flags = DB_DBT_MALLOC;
+
+ rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0);
+ switch (rc)
+ {
+ case 0: pos = data.data;
+ rc = _cl5ReadBervals (&vals, &pos, data.size);
+ free (data.data);
+ if (rc != CL5_SUCCESS)
+ return rc;
+
+ if (purge)
+ rc = ruv_init_from_bervals(vals, &file->purgeRUV);
+ else
+ rc = ruv_init_from_bervals(vals, &file->maxRUV);
+
+ if (rc != RUV_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "%s: _cl5ReadRUV: failed to initialize %s ruv; "
+ "RUV error %d\n", agmt_name, purge? "purge" : "upper bound", rc);
+
+ return CL5_RUV_ERROR;
+ }
+
+ ber_bvecfree(vals);
+
+ /* delete the entry; it is re-added when file
+ is successfully closed */
+ file->db->del (file->db, NULL, &key, DEFAULT_DB_OP_FLAGS);
+
+ return CL5_SUCCESS;
+
+ case DB_NOTFOUND: /* RUV is lost - need to construct */
+ rc = _cl5ConstructRUV (replGen, obj, purge);
+ return rc;
+
+ default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "%s: _cl5ReadRUV: failed to get purge RUV; "
+ "db error - %d %s\n", agmt_name, rc, db_strerror(rc));
+ return CL5_DB_ERROR;
+ }
+}
+
+static int _cl5WriteRUV (CL5DBFile *file, PRBool purge)
+{
+ int rc;
+ DBT key={0}, data={0};
+ char csnStr [CSN_STRSIZE];
+ struct berval **vals;
+ DB_TXN *txnid = NULL;
+
+ if ((purge && file->purgeRUV == NULL) || (!purge && file->maxRUV == NULL))
+ return CL5_SUCCESS;
+
+ if (purge)
+ {
+ key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
+ rc = ruv_to_bervals(file->purgeRUV, &vals);
+ }
+ else
+ {
+ key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
+ rc = ruv_to_bervals(file->maxRUV, &vals);
+ }
+
+ key.size = CSN_STRSIZE;
+
+ rc = _cl5WriteBervals (vals, (char**)&data.data, &data.size);
+ ber_bvecfree(vals);
+ if (rc != CL5_SUCCESS)
+ {
+ return rc;
+ }
+
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ rc = txn_begin(s_cl5Desc.dbEnv, NULL, &txnid, 0);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteRUV: failed to begin transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ return CL5_DB_ERROR;
+ }
+#endif
+ rc = file->db->put(file->db, txnid, &key, &data, DEFAULT_DB_OP_FLAGS);
+
+ slapi_ch_free ((void**)&data.data);
+ if ( rc == 0 )
+ {
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ rc = txn_commit (txnid, 0);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteRUV: failed to commit transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ return CL5_DB_ERROR;
+ }
+#endif
+ return CL5_SUCCESS;
+ }
+ else
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteRUV: failed to write %s RUV for file %s; db error - %d\n",
+ purge? "purge" : "upper bound", file->name, rc);
+
+ if (CL5_OS_ERR_IS_DISKFULL(rc))
+ {
+ cl5_set_diskfull();
+ return CL5_DB_ERROR;
+ }
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ rc = txn_abort (txnid);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteRUV: failed to abort transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ }
+#endif
+ return CL5_DB_ERROR;
+ }
+}
+
+/* This is a very slow process since we have to read every changelog entry.
+ Hopefully, this function is not called too often */
+static int _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge)
+{
+ int rc;
+ CL5Entry entry;
+ void *iterator = NULL;
+ slapi_operation_parameters op = {0};
+ CL5DBFile *file;
+
+ PR_ASSERT (replGen && obj);
+
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+
+ /* construct the RUV */
+ if (purge)
+ rc = ruv_init_new (replGen, 0, NULL, &file->purgeRUV);
+ else
+ rc = ruv_init_new (replGen, 0, NULL, &file->maxRUV);
+ if (rc != RUV_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV: "
+ "failed to initialize %s RUV for file %s; ruv error - %d\n",
+ purge? "purge" : "upper bound", file->name, rc);
+ return CL5_RUV_ERROR;
+ }
+
+ entry.op = &op;
+ rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL);
+ while (rc == CL5_SUCCESS)
+ {
+ if (purge)
+ rc = ruv_set_csns_keep_smallest(file->purgeRUV, op.csn);
+ else
+ rc = ruv_set_csns (file->maxRUV, op.csn, NULL);
+
+ cl5_operation_parameters_done (&op);
+ if (rc != RUV_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5ConstructRUV: "
+ "failed to updated %s RUV for file %s; ruv error - %d\n",
+ purge ? "purge" : "upper bound", file->name, rc);
+ rc = CL5_RUV_ERROR;
+ continue;
+ }
+
+ rc = _cl5GetNextEntry (&entry, iterator);
+ }
+
+ cl5_operation_parameters_done (&op);
+
+ if (iterator)
+ cl5DestroyIterator (iterator);
+
+ if (rc == CL5_NOTFOUND)
+ {
+ rc = CL5_SUCCESS;
+ }
+ else
+ {
+ if (purge)
+ ruv_destroy (&file->purgeRUV);
+ else
+ ruv_destroy (&file->maxRUV);
+ }
+
+ return rc;
+}
+
+static int _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge)
+{
+ ReplicaId rid;
+ int rc = RUV_SUCCESS; /* initialize rc to avoid erroneous logs */
+ CL5DBFile *file;
+
+ PR_ASSERT (obj && csn);
+
+ file = (CL5DBFile*)object_get_data (obj);
+
+ /* if purge is TRUE, file->purgeRUV must be set;
+ if purge is FALSE, maxRUV must be set */
+ PR_ASSERT (file && ((purge && file->purgeRUV) || (!purge && file->maxRUV)));
+
+ /* update vector only if this replica is not yet part of RUV */
+ if (purge && newReplica)
+ {
+ rid = csn_get_replicaid(csn);
+ if (ruv_contains_replica (file->purgeRUV, rid))
+ return CL5_SUCCESS;
+ else
+ {
+ /* if the replica is not part of the purgeRUV yet, add it */
+ ruv_add_replica (file->purgeRUV, rid, multimaster_get_local_purl());
+ }
+ }
+ else
+ {
+ if (purge)
+ rc = ruv_set_csns(file->purgeRUV, csn, NULL);
+ else
+ rc = ruv_set_csns(file->maxRUV, csn, NULL);
+ }
+
+ if (rc != RUV_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5UpdatePurgeRUV: "
+ "failed to update %s RUV for file %s; ruv error - %d\n",
+ purge ? "purge" : "upper bound", file->name, rc);
+ return CL5_RUV_ERROR;
+ }
+
+ return CL5_SUCCESS;
+}
+
+static int _cl5EnumConsumerRUV (const ruv_enum_data *element, void *arg)
+{
+ int rc;
+ RUV *ruv;
+ CSN *csn = NULL;
+
+ PR_ASSERT (element && element->csn && arg);
+
+ ruv = (RUV*)arg;
+
+ rc = ruv_get_largest_csn_for_replica(ruv, csn_get_replicaid (element->csn), &csn);
+ if (rc != RUV_SUCCESS || csn == NULL || csn_compare (element->csn, csn) < 0)
+ {
+ ruv_set_max_csn(ruv, element->csn, NULL);
+ }
+
+ if (csn)
+ csn_free (&csn);
+
+ return 0;
+}
+
+static int _cl5GetRUV2Purge2 (Object *fileObj, RUV **ruv)
+{
+ int rc = CL5_SUCCESS;
+ CL5DBFile *dbFile;
+ Object *rObj = NULL;
+ Replica *r = NULL;
+ Object *agmtObj = NULL;
+ Repl_Agmt *agmt;
+ Object *consRUVObj, *supRUVObj;
+ RUV *consRUV, *supRUV;
+ CSN *csn;
+
+ PR_ASSERT (fileObj && ruv);
+
+ dbFile = (CL5DBFile*)object_get_data (fileObj);
+ PR_ASSERT (dbFile);
+
+ rObj = replica_get_by_name (dbFile->replName);
+ PR_ASSERT (rObj);
+ r = (Replica*)object_get_data (rObj);
+ PR_ASSERT (r);
+
+ /* We start with this replica's RUV. See note in _cl5DoTrimming */
+ supRUVObj = replica_get_ruv (r);
+ PR_ASSERT (supRUVObj);
+
+ supRUV = (RUV*)object_get_data (supRUVObj);
+ PR_ASSERT (supRUV);
+
+ *ruv = ruv_dup (supRUV);
+
+ object_release (supRUVObj);
+
+ agmtObj = agmtlist_get_first_agreement_for_replica (r);
+ while (agmtObj)
+ {
+ agmt = (Repl_Agmt*)object_get_data (agmtObj);
+ PR_ASSERT (agmt);
+
+ consRUVObj = agmt_get_consumer_ruv (agmt);
+ if (consRUVObj)
+ {
+ consRUV = (RUV*)object_get_data (consRUVObj);
+ rc = ruv_enumerate_elements (consRUV, _cl5EnumConsumerRUV, *ruv);
+ if (rc != RUV_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetRUV2Purge2: "
+ "failed to construct ruv; ruv error - %d\n", rc);
+ rc = CL5_RUV_ERROR;
+ object_release (consRUVObj);
+ object_release (agmtObj);
+ break;
+ }
+
+ object_release (consRUVObj);
+ }
+
+ agmtObj = agmtlist_get_next_agreement_for_replica (r, agmtObj);
+ }
+
+ /* check if there is any data in the constructed ruv - otherwise get rid of it */
+ if (ruv_get_max_csn(*ruv, &csn) != RUV_SUCCESS || csn == NULL)
+ {
+ ruv_destroy (ruv);
+ }
+ else
+ {
+ csn_free (&csn);
+ }
+
+ if (rObj)
+ object_release (rObj);
+
+ if (rc != CL5_SUCCESS && ruv)
+ ruv_destroy (ruv);
+
+ return rc;
+}
+
+static int _cl5GetEntryCount (CL5DBFile *file)
+{
+ int rc;
+ char csnStr [CSN_STRSIZE];
+ DBT key={0}, data={0};
+ DB_BTREE_STAT *stats = NULL;
+
+ PR_ASSERT (file);
+
+ /* read entry count. if the entry is there - the file was successfully closed
+ last time it was used */
+ key.data = _cl5GetHelperEntryKey (ENTRY_COUNT_TIME, csnStr);
+ key.size = CSN_STRSIZE;
+
+ data.flags = DB_DBT_MALLOC;
+
+ rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0);
+ switch (rc)
+ {
+ case 0: file->entryCount = *(int*)data.data;
+ free (data.data);
+
+ /* delete the entry. the entry is re-added when file
+ is successfully closed */
+ file->db->del (file->db, NULL, &key, DEFAULT_DB_OP_FLAGS);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5GetEntryCount: %d changes for replica %s\n",
+ file->entryCount, file->replName);
+ return CL5_SUCCESS;
+
+ case DB_NOTFOUND: file->entryCount = 0;
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR >= 3300
+ rc = file->db->stat(file->db, (void*)&stats, 0);
+#else
+ rc = file->db->stat(file->db, (void*)&stats, malloc, 0);
+#endif
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5GetEntryCount: failed to get changelog statistics; "
+ "db error - %d %s\n", rc, db_strerror(rc));
+ return CL5_DB_ERROR;
+ }
+
+#ifdef DB30
+ file->entryCount = stats->bt_nrecs;
+#else /* DB31 */
+ file->entryCount = stats->bt_ndata;
+#endif
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5GetEntryCount: %d changes for replica %s\n",
+ file->entryCount, file->replName);
+
+ free (stats);
+ return CL5_SUCCESS;
+
+ default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5GetEntryCount: failed to get count entry; "
+ "db error - %d %s\n", rc, db_strerror(rc));
+ return CL5_DB_ERROR;
+ }
+}
+
+static int _cl5WriteEntryCount (CL5DBFile *file)
+{
+ int rc;
+ DBT key={0}, data={0};
+ char csnStr [CSN_STRSIZE];
+ DB_TXN *txnid = NULL;
+
+ key.data = _cl5GetHelperEntryKey (ENTRY_COUNT_TIME, csnStr);
+ key.size = CSN_STRSIZE;
+ data.data = (void*)&file->entryCount;
+ data.size = sizeof (file->entryCount);
+
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ rc = txn_begin(s_cl5Desc.dbEnv, NULL, &txnid, 0);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteEntryCount: failed to begin transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ return CL5_DB_ERROR;
+ }
+#endif
+ rc = file->db->put(file->db, txnid, &key, &data, DEFAULT_DB_OP_FLAGS);
+ if (rc == 0)
+ {
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ rc = txn_commit (txnid, 0);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteEntryCount: failed to commit transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ return CL5_DB_ERROR;
+ }
+#endif
+ return CL5_SUCCESS;
+ }
+ else
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteEntryCount: "
+ "failed to write count entry for file %s; db error - %d %s\n",
+ file->name, rc, db_strerror(rc));
+ if (CL5_OS_ERR_IS_DISKFULL(rc))
+ {
+ cl5_set_diskfull();
+ return CL5_DB_ERROR;
+ }
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ rc = txn_abort (txnid);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteEntryCount: failed to abort transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ }
+#endif
+ return CL5_DB_ERROR;
+ }
+}
+
+static const char* _cl5OperationType2Str (int type)
+{
+ switch (type)
+ {
+ case SLAPI_OPERATION_ADD: return T_ADDCTSTR;
+ case SLAPI_OPERATION_MODIFY: return T_MODIFYCTSTR;
+ case SLAPI_OPERATION_MODRDN: return T_MODRDNCTSTR;
+ case SLAPI_OPERATION_DELETE: return T_DELETECTSTR;
+ default: return NULL;
+ }
+}
+
+static int _cl5Str2OperationType (const char *str)
+{
+ if (strcasecmp (str, T_ADDCTSTR) == 0)
+ return SLAPI_OPERATION_ADD;
+
+ if (strcasecmp (str, T_MODIFYCTSTR) == 0)
+ return SLAPI_OPERATION_MODIFY;
+
+ if (strcasecmp (str, T_MODRDNCTSTR) == 0)
+ return SLAPI_OPERATION_MODRDN;
+
+ if (strcasecmp (str, T_DELETECTSTR) == 0)
+ return SLAPI_OPERATION_DELETE;
+
+ return -1;
+}
+
+static int _cl5Operation2LDIF (const slapi_operation_parameters *op, const char *replGen,
+ char **ldifEntry, PRInt32 *lenLDIF)
+{
+ int len = 2;
+ lenstr *l = NULL;
+ const char *strType;
+ char *strDeleteOldRDN;
+ char *buff, *start;
+ LDAPMod **add_mods;
+ char *rawDN;
+ char strCSN[CSN_STRSIZE];
+
+ PR_ASSERT (op && replGen && ldifEntry && IsValidOperation (op));
+
+ strType = _cl5OperationType2Str (op->operation_type);
+ csn_as_string(op->csn,PR_FALSE,strCSN);
+
+ /* find length of the buffer */
+ len += LDIF_SIZE_NEEDED(strlen (T_CHANGETYPESTR), strlen (strType));
+ len += LDIF_SIZE_NEEDED(strlen (T_REPLGEN), strlen (replGen));
+ len += LDIF_SIZE_NEEDED(strlen (T_CSNSTR), strlen (strCSN));
+ len += LDIF_SIZE_NEEDED(strlen (T_UNIQUEIDSTR), strlen (op->target_address.uniqueid));
+
+ switch (op->operation_type)
+ {
+ case SLAPI_OPERATION_ADD: if (op->p.p_add.parentuniqueid)
+ len += LDIF_SIZE_NEEDED(strlen (T_PARENTIDSTR),
+ strlen (op->p.p_add.parentuniqueid));
+ slapi_entry2mods (op->p.p_add.target_entry, &rawDN, &add_mods);
+ len += LDIF_SIZE_NEEDED(strlen (T_DNSTR), strlen (rawDN));
+ l = make_changes_string(add_mods, NULL);
+ len += LDIF_SIZE_NEEDED(strlen (T_CHANGESTR), l->ls_len);
+ ldap_mods_free (add_mods, 1);
+ break;
+
+ case SLAPI_OPERATION_MODIFY: len += LDIF_SIZE_NEEDED(strlen (T_DNSTR), strlen (op->target_address.dn));
+ l = make_changes_string(op->p.p_modify.modify_mods, NULL);
+ len += LDIF_SIZE_NEEDED(strlen (T_CHANGESTR), l->ls_len);
+ break;
+
+ case SLAPI_OPERATION_MODRDN: len += LDIF_SIZE_NEEDED(strlen (T_DNSTR), strlen (op->target_address.dn));
+ len += LDIF_SIZE_NEEDED(strlen (T_NEWRDNSTR),
+ strlen (op->p.p_modrdn.modrdn_newrdn));
+ strDeleteOldRDN = (op->p.p_modrdn.modrdn_deloldrdn ? "true" : "false");
+ len += LDIF_SIZE_NEEDED(strlen (T_DRDNFLAGSTR),
+ strlen (strDeleteOldRDN));
+ if (op->p.p_modrdn.modrdn_newsuperior_address.dn)
+ len += LDIF_SIZE_NEEDED(strlen (T_NEWSUPERIORDNSTR),
+ strlen (op->p.p_modrdn.modrdn_newsuperior_address.dn));
+ if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)
+ len += LDIF_SIZE_NEEDED(strlen (T_NEWSUPERIORIDSTR),
+ strlen (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid));
+ l = make_changes_string(op->p.p_modrdn.modrdn_mods, NULL);
+ len += LDIF_SIZE_NEEDED(strlen (T_CHANGESTR), l->ls_len);
+ break;
+
+ case SLAPI_OPERATION_DELETE: len += LDIF_SIZE_NEEDED(strlen (T_DNSTR), strlen (op->target_address.dn));
+ break;
+
+ default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5Operation2LDIF: invalid operation type - %d\n", op->operation_type);
+
+ return CL5_BAD_FORMAT;
+ }
+
+ /* allocate buffer */
+ buff = (char*)slapi_ch_malloc (len);
+ start = buff;
+ if (buff == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5Operation2LDIF: memory allocation failed\n");
+ return CL5_MEMORY_ERROR;
+ }
+
+ /* fill buffer */
+ ldif_put_type_and_value(&buff, T_CHANGETYPESTR, (char*)strType, strlen (strType));
+ ldif_put_type_and_value(&buff, T_REPLGEN, (char*)replGen, strlen (replGen));
+ ldif_put_type_and_value(&buff, T_CSNSTR, (char*)strCSN, strlen (strCSN));
+ ldif_put_type_and_value(&buff, T_UNIQUEIDSTR, op->target_address.uniqueid,
+ strlen (op->target_address.uniqueid));
+
+ switch (op->operation_type)
+ {
+ case SLAPI_OPERATION_ADD: if (op->p.p_add.parentuniqueid)
+ ldif_put_type_and_value(&buff, T_PARENTIDSTR,
+ op->p.p_add.parentuniqueid, strlen (op->p.p_add.parentuniqueid));
+ ldif_put_type_and_value(&buff, T_DNSTR, rawDN, strlen (rawDN));
+ ldif_put_type_and_value(&buff, T_CHANGESTR, l->ls_buf, l->ls_len);
+ slapi_ch_free ((void**)&rawDN);
+ break;
+
+ case SLAPI_OPERATION_MODIFY: ldif_put_type_and_value(&buff, T_DNSTR, op->target_address.dn,
+ strlen (op->target_address.dn));
+ ldif_put_type_and_value(&buff, T_CHANGESTR, l->ls_buf, l->ls_len);
+ break;
+
+ case SLAPI_OPERATION_MODRDN: ldif_put_type_and_value(&buff, T_DNSTR, op->target_address.dn,
+ strlen (op->target_address.dn));
+ ldif_put_type_and_value(&buff, T_NEWRDNSTR, op->p.p_modrdn.modrdn_newrdn,
+ strlen (op->p.p_modrdn.modrdn_newrdn));
+ ldif_put_type_and_value(&buff, T_DRDNFLAGSTR, strDeleteOldRDN,
+ strlen (strDeleteOldRDN));
+ if (op->p.p_modrdn.modrdn_newsuperior_address.dn)
+ ldif_put_type_and_value(&buff, T_NEWSUPERIORDNSTR,
+ op->p.p_modrdn.modrdn_newsuperior_address.dn,
+ strlen (op->p.p_modrdn.modrdn_newsuperior_address.dn));
+ if (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid)
+ ldif_put_type_and_value(&buff, T_NEWSUPERIORIDSTR,
+ op->p.p_modrdn.modrdn_newsuperior_address.uniqueid,
+ strlen (op->p.p_modrdn.modrdn_newsuperior_address.uniqueid));
+ ldif_put_type_and_value(&buff, T_CHANGESTR, l->ls_buf, l->ls_len);
+ break;
+
+ case SLAPI_OPERATION_DELETE: ldif_put_type_and_value(&buff, T_DNSTR, op->target_address.dn,
+ strlen (op->target_address.dn));
+ break;
+ }
+
+ *buff = '\n';
+ buff ++;
+ *buff = '\0';
+
+ *ldifEntry = start;
+ *lenLDIF = buff - start;
+
+ if (l)
+ lenstr_free(&l);
+
+ return CL5_SUCCESS;
+}
+
+static int
+_cl5LDIF2Operation (char *ldifEntry, slapi_operation_parameters *op, char **replGen)
+{
+ int rc;
+ int vlen;
+ char *next, *line;
+ char *type, *value;
+ Slapi_Mods *mods;
+ char *rawDN;
+
+ PR_ASSERT (op && ldifEntry && replGen);
+
+ memset (op, 0, sizeof (*op));
+
+ next = ldifEntry;
+ while ((line = ldif_getline(&next)) != NULL)
+ {
+ char *errmsg = NULL;
+
+ if ( *line == '\n' || *line == '\0' )
+ {
+ break;
+ }
+
+ /* this call modifies ldifEntry */
+ rc = ldif_parse_line(line, &type, &value, &vlen, &errmsg);
+ if (rc != 0)
+ {
+ if ( errmsg != NULL ) {
+ slapi_log_error(SLAPI_LOG_PARSE, repl_plugin_name_cl, "%s", errmsg);
+ slapi_ch_free( (void**)&errmsg );
+ }
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5LDIF2Operation: warning - failed to parse ldif line\n");
+ continue;
+ }
+
+ if (strcasecmp (type, T_CHANGETYPESTR) == 0)
+ {
+ op->operation_type = _cl5Str2OperationType (value);
+ }
+ else if (strcasecmp (type, T_REPLGEN) == 0)
+ {
+ *replGen = slapi_ch_strdup (value);
+ }
+ else if (strcasecmp (type, T_CSNSTR) == 0)
+ {
+ op->csn = csn_new_by_string(value);
+ }
+ else if (strcasecmp (type, T_UNIQUEIDSTR) == 0)
+ {
+ op->target_address.uniqueid = slapi_ch_strdup (value);
+ }
+ else if (strcasecmp (type, T_DNSTR) == 0)
+ {
+ PR_ASSERT (op->operation_type);
+
+ if (op->operation_type == SLAPI_OPERATION_ADD)
+ {
+ rawDN = slapi_ch_strdup (value);
+ op->target_address.dn = slapi_ch_strdup(rawDN);
+ }
+ else
+ op->target_address.dn = slapi_ch_strdup (value);
+ }
+ else if (strcasecmp (type, T_PARENTIDSTR) == 0)
+ {
+ op->p.p_add.parentuniqueid = slapi_ch_strdup (value);
+ }
+ else if (strcasecmp (type, T_NEWRDNSTR) == 0)
+ {
+ op->p.p_modrdn.modrdn_newrdn = slapi_ch_strdup (value);
+ }
+ else if (strcasecmp (type, T_DRDNFLAGSTR) == 0)
+ {
+ op->p.p_modrdn.modrdn_deloldrdn = (strcasecmp (value, "true") ? PR_FALSE : PR_TRUE);
+ }
+ else if (strcasecmp (type, T_NEWSUPERIORDNSTR) == 0)
+ {
+ op->p.p_modrdn.modrdn_newsuperior_address.dn = slapi_ch_strdup (value);
+ }
+ else if (strcasecmp (type, T_NEWSUPERIORIDSTR) == 0)
+ {
+ op->p.p_modrdn.modrdn_newsuperior_address.uniqueid = slapi_ch_strdup (value);
+ }
+ else if (strcasecmp (type, T_CHANGESTR) == 0)
+ {
+ PR_ASSERT (op->operation_type);
+
+ switch (op->operation_type)
+ {
+ case SLAPI_OPERATION_ADD: mods = parse_changes_string(value);
+ slapi_mods2entry (&(op->p.p_add.target_entry), rawDN,
+ slapi_mods_get_ldapmods_byref(mods));
+ slapi_ch_free ((void**)&rawDN);
+ slapi_mods_free (&mods);
+ break;
+
+ case SLAPI_OPERATION_MODIFY: mods = parse_changes_string(value);
+ PR_ASSERT (mods);
+ op->p.p_modify.modify_mods = slapi_mods_get_ldapmods_passout (mods);
+ slapi_mods_free (&mods);
+ break;
+
+ case SLAPI_OPERATION_MODRDN: mods = parse_changes_string(value);
+ PR_ASSERT (mods);
+ op->p.p_modrdn.modrdn_mods = slapi_mods_get_ldapmods_passout (mods);
+ slapi_mods_free (&mods);
+ break;
+
+ default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5LDIF2Operation: invalid operation type - %d\n",
+ op->operation_type);
+ return CL5_BAD_FORMAT;
+ }
+ }
+ }
+
+ if (IsValidOperation (op))
+ return CL5_SUCCESS;
+
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5LDIF2Operation: invalid data format\n");
+ return CL5_BAD_FORMAT;
+}
+
+static int _cl5WriteOperation(const char *replName, const char *replGen,
+ const slapi_operation_parameters *op, PRBool local)
+{
+ int rc;
+ int cnt;
+ DBT key={0};
+ DBT * data=NULL;
+ char csnStr [CSN_STRSIZE];
+ PRIntervalTime interval;
+ CL5Entry entry;
+ CL5DBFile *file = NULL;
+ Object *file_obj = NULL;
+ DB_TXN *txnid = NULL;
+
+ rc = _cl5GetDBFileByReplicaName (replName, replGen, &file_obj);
+ if (rc == CL5_NOTFOUND)
+ {
+ rc = _cl5DBOpenFileByReplicaName (replName, replGen, &file_obj,
+ PR_TRUE /* check for duplicates */);
+ if (rc != CL5_SUCCESS)
+ {
+ return rc;
+ }
+ }
+ else if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "_cl5WriteOperation: failed to get db file for target dn (%s)",
+ op->target_address.dn);
+ return CL5_OBJSET_ERROR;
+ }
+
+ /* assign entry time - used for trimming */
+ entry.time = current_time ();
+ entry.op = (slapi_operation_parameters *)op;
+
+ /* construct the key */
+ key.data = csn_as_string(op->csn, PR_FALSE, csnStr);
+ key.size = CSN_STRSIZE;
+
+ /* construct the data */
+ data = (DBT *) slapi_ch_calloc(1, sizeof(DBT));
+ rc = _cl5Entry2DBData (&entry, (char**)&data->data, &data->size);
+ if (rc != CL5_SUCCESS)
+ {
+ char s[CSN_STRSIZE];
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5WriteOperation: failed to convert entry with csn (%s) "
+ "to db format\n", csn_as_string(op->csn,PR_FALSE,s));
+ goto done;
+ }
+
+ file = (CL5DBFile*)object_get_data (file_obj);
+ PR_ASSERT (file);
+
+ /* if this is part of ldif2cl - just write the entry without transaction */
+ if (s_cl5Desc.dbOpenMode == CL5_OPEN_LDIF2CL)
+ {
+ rc = file->db->put(file->db, NULL, &key, data, 0);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteOperation: failed to write entry; db error - %d %s\n",
+ rc, db_strerror(rc));
+ if (CL5_OS_ERR_IS_DISKFULL(rc))
+ {
+ cl5_set_diskfull();
+ }
+ rc = CL5_DB_ERROR;
+ }
+ goto done;
+ }
+
+ /* write the entry */
+ rc = EAGAIN;
+ cnt = 0;
+
+ while ((rc == EAGAIN || rc == DB_LOCK_DEADLOCK) && cnt < MAX_TRIALS)
+ {
+ if (cnt != 0)
+ {
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ /* abort previous transaction */
+ rc = txn_abort (txnid);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteOperation: failed to abort transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ rc = CL5_DB_ERROR;
+ goto done;
+ }
+#endif
+ /* back off */
+ interval = PR_MillisecondsToInterval(slapi_rand() % 100);
+ DS_Sleep(interval);
+ }
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ /* begin transaction */
+ rc = txn_begin(s_cl5Desc.dbEnv, NULL /*pid*/, &txnid, 0);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteOperation: failed to start transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ rc = CL5_DB_ERROR;
+ goto done;
+ }
+#endif
+
+ if ( file->sema )
+ {
+ PR_WaitSemaphore(file->sema);
+ }
+ rc = file->db->put(file->db, txnid, &key, data, DEFAULT_DB_OP_FLAGS);
+ if ( file->sema )
+ {
+ PR_PostSemaphore(file->sema);
+ }
+ if (CL5_OS_ERR_IS_DISKFULL(rc))
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteOperation: changelog (%s) DISK FULL; db error - %d %s\n",
+ s_cl5Desc.dbDir, rc, db_strerror(rc));
+ cl5_set_diskfull();
+ rc = CL5_DB_ERROR;
+ goto done;
+ }
+ if (cnt != 0)
+ {
+ if (rc == 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "_cl5WriteOperation: retry (%d) the transaction (csn=%s) succeeded\n", cnt, (char*)key.data);
+ }
+ else if ((cnt + 1) >= MAX_TRIALS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "_cl5WriteOperation: retry (%d) the transaction (csn=%s) failed (rc=%d)\n", cnt, (char*)key.data, rc);
+ }
+ }
+ cnt ++;
+ }
+
+ if (rc == 0) /* we successfully added entry */
+ {
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ rc = txn_commit (txnid, 0);
+#endif
+ }
+ else
+ {
+ char s[CSN_STRSIZE];
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteOperation: failed to write entry with csn (%s); "
+ "db error - %d %s\n", csn_as_string(op->csn,PR_FALSE,s),
+ rc, db_strerror(rc));
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 4100
+ rc = txn_abort (txnid);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5WriteOperation: failed to abort transaction; db error - %d %s\n",
+ rc, db_strerror(rc));
+ }
+#endif
+ rc = CL5_DB_ERROR;
+ goto done;
+ }
+
+ /* update entry count - we assume that all entries are new */
+ PR_AtomicIncrement (&file->entryCount);
+
+ /* update purge vector if we have not seen any changes from this replica before */
+ _cl5UpdateRUV (file_obj, op->csn, PR_TRUE, PR_TRUE);
+
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "cl5WriteOperation: successfully written entry with csn (%s)\n", csnStr);
+ rc = CL5_SUCCESS;
+done:
+ if (data->data)
+ slapi_ch_free ((void**)&data->data);
+ slapi_ch_free((void**) &data);
+
+ if (file_obj)
+ object_release (file_obj);
+
+ return rc;
+}
+
+static int _cl5GetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid)
+{
+ int rc;
+ DBC *cursor = NULL;
+ DBT key={0}, data={0};
+ CL5Iterator *it;
+ CL5DBFile *file;
+
+ PR_ASSERT (obj && entry && iterator);
+
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+ /* create cursor */
+ rc = file->db->cursor(file->db, txnid, &cursor, 0);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5GetFirstEntry: failed to create cursor; db error - %d %s\n", rc, db_strerror(rc));
+ rc = CL5_DB_ERROR;
+ goto done;
+ }
+
+ key.flags = DB_DBT_MALLOC;
+ data.flags = DB_DBT_MALLOC;
+ while ((rc = cursor->c_get(cursor, &key, &data, DB_NEXT)) == 0)
+ {
+ /* skip service entries */
+ if (cl5HelperEntry ((char*)key.data, NULL))
+ {
+ free (key.data);
+ free (data.data);
+ continue;
+ }
+
+ /* format entry */
+ free (key.data);
+ rc = cl5DBData2Entry (data.data, data.size, entry);
+ free (data.data);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5GetFirstOperation: failed to format entry\n", rc);
+ goto done;
+ }
+
+ it = (CL5Iterator*)slapi_ch_malloc (sizeof (CL5Iterator));
+ it->cursor = cursor;
+ object_acquire (obj);
+ it->file = obj;
+ *(CL5Iterator**)iterator = it;
+
+ return CL5_SUCCESS;
+ }
+
+ /* walked of the end of the file */
+ if (rc == DB_NOTFOUND)
+ {
+ rc = CL5_NOTFOUND;
+ goto done;
+ }
+
+ /* db error occured while iterating */
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5GetFirstEntry: failed to get entry; db error - %d %s\n", rc, db_strerror(rc));
+ rc = CL5_DB_ERROR;
+ goto done;
+ }
+
+ /* successfully retrieved next entry but it was out of range */
+ if (rc == CL5_SUCCESS)
+ {
+ free (key.data);
+ free (data.data);
+ rc = CL5_NOTFOUND;
+ goto done;
+ }
+
+done:;
+ /* error occured */
+ /* We didn't success in assigning this cursor to the iterator,
+ * so we need to free the cursor here */
+ if (cursor)
+ cursor->c_close(cursor);
+
+ return rc;
+}
+
+static int _cl5GetNextEntry (CL5Entry *entry, void *iterator)
+{
+ int rc;
+ CL5Iterator *it;
+ DBT key={0}, data={0};
+
+ PR_ASSERT (entry && iterator);
+
+ it = (CL5Iterator*) iterator;
+
+ key.flags = DB_DBT_MALLOC;
+ data.flags = DB_DBT_MALLOC;
+ while ((rc = it->cursor->c_get(it->cursor, &key, &data, DB_NEXT)) == 0)
+ {
+ if (cl5HelperEntry ((char*)key.data, NULL))
+ {
+ free (key.data);
+ free (data.data);
+ continue;
+ }
+
+ free (key.data);
+ /* format entry */
+ rc = cl5DBData2Entry (data.data, data.size, entry);
+ free (data.data);
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5GetNextEntry: failed to format entry\n", rc);
+ }
+
+ return rc;
+ }
+
+ /* walked of the end of the file or entry is out of range */
+ if (rc == 0 || rc == DB_NOTFOUND)
+ {
+ return CL5_NOTFOUND;
+ }
+
+ /* cursor operation failed */
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5GetNextEntry: failed to get entry; db error - %d %s\n", rc, db_strerror(rc));
+
+ return CL5_DB_ERROR;
+ }
+
+ return rc;
+}
+
+static int _cl5CurrentDeleteEntry (void *iterator)
+{
+ int rc;
+ CL5Iterator *it;
+ CL5DBFile *file;
+
+ PR_ASSERT (iterator);
+
+ it = (CL5Iterator*)iterator;
+
+ rc = it->cursor->c_del (it->cursor, 0);
+
+ if (rc == 0) {
+ /* decrement entry count */
+ file = (CL5DBFile*)object_get_data (it->file);
+ PR_AtomicDecrement (&file->entryCount);
+ return CL5_SUCCESS;
+ } else {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5CurrentDeleteEntry failed, err=%d %s\n",
+ rc, db_strerror(rc));
+ /* We don't free(close) the cursor here, as the caller will free it by a call to cl5DestroyIterator */
+ /* Freeing it here is a potential bug, as the cursor can't be referenced later once freed */
+ return CL5_DB_ERROR;
+ }
+}
+
+static PRBool _cl5IsValidIterator (const CL5Iterator *iterator)
+{
+ return (iterator && iterator->cursor && iterator->file);
+}
+
+static int _cl5GetOperation (Object *replica, slapi_operation_parameters *op)
+{
+ int rc;
+ DBT key={0}, data={0};
+ CL5DBFile *file;
+ CL5Entry entry;
+ Object *obj = NULL;
+ char csnStr[CSN_STRSIZE];
+
+ rc = _cl5GetDBFile (replica, &obj);
+ if (rc != CL5_SUCCESS)
+ {
+ return rc;
+ }
+
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+
+ /* construct the key */
+ key.data = csn_as_string(op->csn, PR_FALSE, csnStr);
+ key.size = CSN_STRSIZE;
+
+ data.flags = DB_DBT_MALLOC;
+
+ rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0);
+ switch (rc)
+ {
+ case 0: entry.op = op;
+ /* Callers of this function should cl5_operation_parameters_done(op) */
+ rc = cl5DBData2Entry (data.data, data.size, &entry);
+ if (rc == CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
+ "_cl5GetOperation: successfully retrieved operation with csn (%s)\n",
+ csnStr);
+ }
+ else
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5GetOperation: failed to convert db data to operation;"
+ " csn - %s\n", csnStr);
+ }
+ goto done;
+
+ case DB_NOTFOUND: slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5GetOperation: operation for csn (%s) is not found in db that should contain dn (%s)\n",
+ csnStr, op->target_address.dn);
+ rc = CL5_NOTFOUND;
+ goto done;
+
+ default: slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5GetOperation: failed to get entry for csn (%s); "
+ "db error - %d %s\n", csnStr, rc, db_strerror(rc));
+ rc = CL5_DB_ERROR;
+ goto done;
+ }
+
+done:;
+ if (obj)
+ object_release (obj);
+
+ if (data.data)
+ free (data.data);
+
+ return rc;
+}
+
+PRBool cl5HelperEntry (const char *csnstr, CSN *csnp)
+{
+ CSN *csn;
+ time_t csnTime;
+ PRBool retval = PR_FALSE;
+
+ if (csnp)
+ {
+ csn = csnp;
+ }
+ else
+ {
+ csn= csn_new_by_string(csnstr);
+ }
+ if (csn == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "cl5HelperEntry: failed to get csn time; csn error\n");
+ return PR_FALSE;
+ }
+ csnTime= csn_get_time(csn);
+
+ if (csnTime == ENTRY_COUNT_TIME || csnTime == PURGE_RUV_TIME)
+ {
+ retval = PR_TRUE;
+ }
+
+ if (NULL == csnp)
+ csn_free(&csn);
+ return retval;
+}
+
+/* Replay iteration helper functions */
+static PRBool _cl5ValidReplayIterator (const CL5ReplayIterator *iterator)
+{
+ if (iterator == NULL ||
+ iterator->consumerRuv == NULL || iterator->supplierRuvObj == NULL ||
+ iterator->fileObj == NULL)
+ return PR_FALSE;
+
+ return PR_TRUE;
+}
+
+/* Algorithm: ONREPL!!!
+ */
+struct replica_hash_entry
+{
+ ReplicaId rid; /* replica id */
+ PRBool sendChanges; /* indicates whether changes should be sent for this replica */
+};
+
+
+static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consumerRuv,
+ Object *replica, Object *fileObj, CL5ReplayIterator **iterator)
+{
+ CLC_Buffer *clcache = NULL;
+ CL5DBFile *file;
+ int i;
+ CSN **csns = NULL;
+ CSN *startCSN = NULL;
+ char csnStr [CSN_STRSIZE];
+ int rc = CL5_SUCCESS;
+ Object *supplierRuvObj = NULL;
+ RUV *supplierRuv = NULL;
+ ReplicaId supplierRID;
+ PRBool newReplica;
+ PRBool haveChanges = PR_FALSE;
+ char *agmt_name;
+ ReplicaId rid;
+
+ PR_ASSERT (consumerRuv && replica && fileObj && iterator);
+ csnStr[0] = '\0';
+
+ file = (CL5DBFile*)object_get_data (fileObj);
+ supplierRID = replica_get_rid((Replica*)object_get_data(replica));
+
+ /* get supplier's RUV */
+ supplierRuvObj = replica_get_ruv((Replica*)object_get_data(replica));
+ PR_ASSERT (supplierRuvObj);
+ supplierRuv = (RUV*)object_get_data (supplierRuvObj);
+ PR_ASSERT (supplierRuv);
+
+ agmt_name = get_thread_private_agmtname();
+ slapi_log_error(SLAPI_LOG_REPL, NULL, "_cl5PositionCursorForReplay (%s): Consumer RUV:\n", agmt_name);
+ ruv_dump (consumerRuv, agmt_name, NULL);
+ slapi_log_error(SLAPI_LOG_REPL, NULL, "_cl5PositionCursorForReplay (%s): Supplier RUV:\n", agmt_name);
+ ruv_dump (supplierRuv, agmt_name, NULL);
+
+ /*
+ * get the sorted list of SupplierMinCSN (if no ConsumerMaxCSN)
+ * and ConsumerMaxCSN for those RIDs where consumer is not
+ * up-to-date.
+ */
+ csns = cl5BuildCSNList (consumerRuv, supplierRuv);
+ if (csns == NULL)
+ {
+ rc = CL5_NOTFOUND;
+ goto done;
+ }
+
+ /* iterate over elements of consumer's (and/or supplier's) ruv */
+ for (i = 0; csns[i]; i++)
+ {
+ CSN *consumerMaxCSN = NULL;
+
+ rid = csn_get_replicaid(csns[i]);
+
+ /*
+ * Skip CSN that is originated from the consumer.
+ * If RID==65535, the CSN is originated from a
+ * legacy consumer. In this case the supplier
+ * and the consumer may have the same RID.
+ */
+ if (rid == consumerRID && rid != MAX_REPLICA_ID)
+ continue;
+
+ startCSN = csns[i];
+ csn_as_string(startCSN, PR_FALSE, csnStr);
+
+ rc = clcache_get_buffer ( &clcache, file->db, consumerRID, consumerRuv, supplierRuv );
+ if ( rc != 0 ) goto done;
+
+ /* This is the first loading of this iteration. For replicas
+ * already known to the consumer, we exclude the last entry
+ * sent to the consumer by using DB_NEXT. However, for
+ * replicas new to the consumer, we include the first change
+ * ever generated by that replica.
+ */
+ newReplica = ruv_get_largest_csn_for_replica (consumerRuv, rid, &consumerMaxCSN);
+ csn_free(&consumerMaxCSN);
+ rc = clcache_load_buffer (clcache, startCSN, (newReplica ? DB_SET : DB_NEXT));
+
+ /* there is a special case which can occur just after migration - in this case,
+ the consumer RUV will contain the last state of the supplier before migration,
+ but the supplier will have an empty changelog, or the supplier changelog will
+ not contain any entries within the consumer min and max CSN - also, since
+ the purge RUV contains no CSNs, the changelog has never been purged
+ ASSUMPTIONS - it is assumed that the supplier had no pending changes to send
+ to any consumers; that is, we can assume that no changes were lost due to
+ either changelog purging or database reload - bug# 603061 - richm@netscape.com
+ */
+ if (rc == 0 || (rc == DB_NOTFOUND && !ruv_has_csns(file->purgeRUV)))
+ {
+ haveChanges = PR_TRUE;
+ rc = CL5_SUCCESS;
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
+ break;
+ }
+ else if (rc == DB_NOTFOUND) /* entry not found */
+ {
+ /* check whether this csn should be present */
+ rc = _cl5CheckMissingCSN (startCSN, supplierRuv, file);
+ if (rc == CL5_MISSING_DATA) /* we should have had the change but we don't */
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "%s: CSN %s not found, seems to be missing\n", agmt_name, csnStr);
+ break;
+ }
+ else /* we are not as up to date or we purged */
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "%s: CSN %s not found, we aren't as up to date, or we purged\n",
+ agmt_name, csnStr);
+ continue;
+ }
+ }
+ else
+ {
+
+ /* db error */
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "%s: Failed to retrieve change with CSN %s; db error - %d %s\n",
+ agmt_name, csnStr, rc, db_strerror(rc));
+ rc = CL5_DB_ERROR;
+ break;
+ }
+
+ } /* end for */
+
+ /* setup the iterator */
+ if (haveChanges)
+ {
+ *iterator = (CL5ReplayIterator*) slapi_ch_calloc (1, sizeof (CL5ReplayIterator));
+
+ if (*iterator == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "%s: _cl5PositionCursorForReplay: failed to allocate iterator\n", agmt_name);
+ rc = CL5_MEMORY_ERROR;
+ goto done;
+ }
+
+ /* ONREPL - should we make a copy of both RUVs here ?*/
+ (*iterator)->fileObj = fileObj;
+ (*iterator)->clcache = clcache; clcache = NULL;
+ (*iterator)->consumerRID = consumerRID;
+ (*iterator)->consumerRuv = consumerRuv;
+ (*iterator)->supplierRuvObj = supplierRuvObj;
+ }
+ else if (rc == CL5_SUCCESS)
+ {
+ /* we have no changes to send */
+ rc = CL5_NOTFOUND;
+ }
+
+done:
+ if ( clcache )
+ clcache_return_buffer ( &clcache );
+
+ if (csns)
+ cl5DestroyCSNList (&csns);
+
+ if (rc != CL5_SUCCESS)
+ {
+ if (supplierRuvObj)
+ object_release (supplierRuvObj);
+ }
+
+ return rc;
+}
+
+struct ruv_it
+{
+ CSN **csns; /* csn list */
+ int alloc; /* allocated size */
+ int pos; /* position in the list */
+};
+
+static int ruv_consumer_iterator (const ruv_enum_data *enum_data, void *arg)
+{
+ struct ruv_it *data = (struct ruv_it*)arg;
+
+ PR_ASSERT (data);
+
+ /* check if we have space for one more element */
+ if (data->pos >= data->alloc - 2)
+ {
+ data->alloc += 4;
+ data->csns = (CSN**) slapi_ch_realloc ((void*)data->csns, data->alloc * sizeof (CSN*));
+ }
+
+ data->csns [data->pos] = csn_dup (enum_data->csn);
+ data->pos ++;
+
+ return 0;
+}
+
+
+static int ruv_supplier_iterator (const ruv_enum_data *enum_data, void *arg)
+{
+ int i;
+ PRBool found = PR_FALSE;
+ ReplicaId rid;
+ struct ruv_it *data = (struct ruv_it*)arg;
+
+ PR_ASSERT (data);
+
+ rid = csn_get_replicaid (enum_data->min_csn);
+ /* check if the replica that generated the csn is already in the list */
+ for (i = 0; i < data->pos; i++)
+ {
+ if (rid == csn_get_replicaid (data->csns[i]))
+ {
+ found = PR_TRUE;
+
+ /* remove datacsn[i] if it is greater or equal to the supplier's maxcsn */
+ if ( csn_compare ( data->csns[i], enum_data->csn ) >= 0 )
+ {
+ int j;
+
+ csn_free ( & data->csns[i] );
+ for (j = i+1; j < data->pos; j++)
+ {
+ data->csns [j-1] = data->csns [j];
+ }
+ data->pos --;
+ }
+ break;
+ }
+ }
+
+ if (!found)
+ {
+ /* check if we have space for one more element */
+ if (data->pos >= data->alloc - 2)
+ {
+ data->alloc += 4;
+ data->csns = (CSN**)slapi_ch_realloc ((void*)data->csns,
+ data->alloc * sizeof (CSN*));
+ }
+
+ data->csns [data->pos] = csn_dup (enum_data->min_csn);
+ data->pos ++;
+ }
+ return 0;
+}
+
+
+
+static int
+my_csn_compare(const void *arg1, const void *arg2)
+{
+ return(csn_compare(*((CSN **)arg1), *((CSN **)arg2)));
+}
+
+
+
+/* builds CSN ordered list of all csns in the RUV */
+CSN** cl5BuildCSNList (const RUV *consRuv, const RUV *supRuv)
+{
+ struct ruv_it data;
+ int count, rc;
+ CSN **csns;
+
+ PR_ASSERT (consRuv);
+
+ count = ruv_replica_count (consRuv);
+ csns = (CSN**)slapi_ch_calloc (count + 1, sizeof (CSN*));
+
+ data.csns = csns;
+ data.alloc = count + 1;
+ data.pos = 0;
+
+ /* add consumer elements to the list */
+ rc = ruv_enumerate_elements (consRuv, ruv_consumer_iterator, &data);
+ if (rc == 0 && supRuv)
+ {
+ /* add supplier elements to the list */
+ rc = ruv_enumerate_elements (supRuv, ruv_supplier_iterator, &data);
+ }
+
+ /* we have no csns */
+ if (data.csns[0] == NULL)
+ {
+ /* csns might have been realloced in ruv_supplier_iterator() */
+ slapi_ch_free ((void**)&data.csns);
+ csns = NULL;
+ }
+ else
+ {
+ csns = data.csns;
+ data.csns [data.pos] = NULL;
+ if (rc == 0)
+ {
+ qsort (csns, data.pos, sizeof (CSN*), my_csn_compare);
+ }
+ else
+ {
+ cl5DestroyCSNList (&csns);
+ }
+ }
+
+ return csns;
+}
+
+void cl5DestroyCSNList (CSN*** csns)
+{
+ if (csns && *csns)
+ {
+ int i;
+
+ for (i = 0; (*csns)[i]; i++)
+ {
+ csn_free (&(*csns)[i]);
+ }
+
+ slapi_ch_free ((void**)csns);
+ }
+}
+
+/* A csn should be in the changelog if it is larger than purge vector csn for the same
+ replica and is smaller than the csn in supplier's ruv for the same replica.
+ The functions returns
+ CL5_PURGED if data was purged from the changelog or was never logged
+ because it was loaded as part of replica initialization
+ CL5_MISSING if the data erouneously missing
+ CL5_SUCCESS if that has not and should not been seen by the server
+ */
+static int _cl5CheckMissingCSN (const CSN *csn, const RUV *supplierRuv, CL5DBFile *file)
+{
+ ReplicaId rid;
+ CSN *supplierCsn = NULL;
+ CSN *purgeCsn = NULL;
+ int rc = CL5_SUCCESS;
+ char csnStr [CSN_STRSIZE];
+
+ PR_ASSERT (csn && supplierRuv && file);
+
+ rid = csn_get_replicaid (csn);
+ ruv_get_largest_csn_for_replica (supplierRuv, rid, &supplierCsn);
+ if (supplierCsn == NULL)
+ {
+ /* we have not seen any changes from this replica so it is
+ ok not to have this csn */
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN: "
+ "can't locate %s csn: we have not seen any changes for replica %d\n",
+ csn_as_string (csn, PR_FALSE, csnStr), rid);
+ return CL5_SUCCESS;
+ }
+
+ ruv_get_largest_csn_for_replica (file->purgeRUV, rid, &purgeCsn);
+ if (purgeCsn == NULL)
+ {
+ /* changelog never contained any changes for this replica */
+ if (csn_compare (csn, supplierCsn) <= 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN: "
+ "the change with %s csn was never logged because it was imported "
+ "during replica initialization\n", csn_as_string (csn, PR_FALSE, csnStr));
+ rc = CL5_PURGED_DATA; /* XXXggood is that the correct return value? */
+ }
+ else
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN: "
+ "change with %s csn has not yet been seen by this server; "
+ " last csn seen from that replica is %s\n",
+ csn_as_string (csn, PR_FALSE, csnStr),
+ csn_as_string (supplierCsn, PR_FALSE, csnStr));
+ rc = CL5_SUCCESS;
+ }
+ }
+ else /* we have both purge and supplier csn */
+ {
+ if (csn_compare (csn, purgeCsn) < 0) /* the csn is below the purge point */
+ {
+ rc = CL5_PURGED_DATA;
+ }
+ else
+ {
+ if (csn_compare (csn, supplierCsn) <= 0) /* we should have the data but we don't */
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN: "
+ "change with %s csn has been purged by this server; "
+ "the current purge point for that replica is %s\n",
+ csn_as_string (csn, PR_FALSE, csnStr),
+ csn_as_string (purgeCsn, PR_FALSE, csnStr));
+ rc = CL5_MISSING_DATA;
+ }
+ else
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5CheckMissingCSN: "
+ "change with %s csn has not yet been seen by this server; "
+ " last csn seen from that replica is %s\n",
+ csn_as_string (csn, PR_FALSE, csnStr),
+ csn_as_string (supplierCsn, PR_FALSE, csnStr));
+ rc = CL5_SUCCESS;
+ }
+ }
+ }
+
+ if (supplierCsn)
+ csn_free (&supplierCsn);
+
+ if (purgeCsn)
+ csn_free (&purgeCsn);
+
+ return rc;
+}
+
+/* Helper functions that work with individual changelog files */
+
+/* file name format : <replica name>_<replica generation>db{2,3} */
+static PRBool _cl5FileName2Replica (const char *file_name, Object **replica)
+{
+ Replica *r;
+ char *repl_name, *file_gen, *repl_gen;
+ int len;
+
+ PR_ASSERT (file_name && replica);
+
+ *replica = NULL;
+
+ /* this is database file */
+ if (_cl5FileEndsWith (file_name, DB_EXTENSION) ||
+ _cl5FileEndsWith (file_name, DB_EXTENSION_DB3) )
+ {
+ repl_name = slapi_ch_strdup (file_name);
+ file_gen = strstr(repl_name, FILE_SEP);
+ if (file_gen)
+ {
+ int extlen = strlen(DB_EXTENSION);
+ *file_gen = '\0';
+ file_gen += strlen (FILE_SEP);
+ len = strlen (file_gen);
+ if (len <= extlen + 1)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5FileName2Replica "
+ "invalid file name (%s)\n", file_name);
+ }
+ else
+ {
+ /* get rid of the file extension */
+ file_gen [len - extlen - 1] = '\0';
+ *replica = replica_get_by_name (repl_name);
+ if (*replica)
+ {
+ /* check that generation matches the one in replica object */
+ r = (Replica*)object_get_data (*replica);
+ repl_gen = replica_get_generation (r);
+ PR_ASSERT (repl_gen);
+ if (strcmp (file_gen, repl_gen) != 0)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5FileName2Replica "
+ "replica generation mismatch for replica at (%s), "
+ "file generation %s, new replica generation %s\n",
+ slapi_sdn_get_dn (replica_get_root (r)), file_gen, repl_gen);
+
+ object_release (*replica);
+ *replica = NULL;
+ }
+ slapi_ch_free ((void**)&repl_gen);
+ }
+ }
+ slapi_ch_free ((void**)&repl_name);
+ }
+ else
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5FileName2Replica "
+ "malformed file name - %s\n", file_name);
+ }
+
+ return PR_TRUE;
+ }
+ else
+ return PR_FALSE;
+}
+
+/* file name format : <replica name>_<replica generation>db{2,3} */
+static char* _cl5Replica2FileName (Object *replica)
+{
+ const char *replName;
+ char *replGen, *fileName;
+ Replica *r;
+
+ PR_ASSERT (replica);
+
+ r = (Replica*)object_get_data (replica);
+ PR_ASSERT (r);
+
+ replName = replica_get_name (r);
+ replGen = replica_get_generation (r);
+
+ fileName = _cl5MakeFileName (replName, replGen) ;
+
+ slapi_ch_free ((void**)&replGen);
+
+ return fileName;
+}
+
+static char* _cl5MakeFileName (const char *replName, const char *replGen)
+{
+ char *fileName;
+ fileName = slapi_ch_malloc (strlen (replName) + strlen (replGen) +
+ strlen (DB_EXTENSION) + 3/* '_' + '.' + '\0' */);
+ sprintf (fileName, "%s%s%s.%s", replName, FILE_SEP, replGen, DB_EXTENSION);
+
+ return fileName;
+}
+
+/* open file that corresponds to a particular database */
+static int _cl5DBOpenFile (Object *replica, Object **obj, PRBool checkDups)
+{
+ int rc;
+ const char *replName;
+ char *replGen;
+ Replica *r;
+
+ PR_ASSERT (replica);
+
+ r = (Replica*)object_get_data (replica);
+ replName = replica_get_name (r);
+ PR_ASSERT (replName);
+ replGen = replica_get_generation (r);
+ PR_ASSERT (replGen);
+
+ rc = _cl5DBOpenFileByReplicaName (replName, replGen, obj, checkDups);
+
+ slapi_ch_free ((void**)&replGen);
+
+ return rc;
+}
+
+static int _cl5DBOpenFileByReplicaName (const char *replName, const char *replGen,
+ Object **obj, PRBool checkDups)
+{
+ int rc = CL5_SUCCESS;
+ Object *tmpObj;
+ CL5DBFile *file;
+ char *file_name;
+
+ PR_ASSERT (replName && replGen);
+
+ if (checkDups)
+ {
+ PR_Lock (s_cl5Desc.fileLock);
+ file_name = _cl5MakeFileName (replName, replGen);
+ tmpObj = objset_find (s_cl5Desc.dbFiles, _cl5CompareDBFile, file_name);
+ slapi_ch_free((void **)&file_name);
+ file_name = NULL;
+ if (tmpObj) /* this file already exist */
+ {
+ /* if we were asked for file handle - keep the handle */
+ if (obj)
+ {
+ *obj = tmpObj;
+ }
+ else
+ {
+ object_release (tmpObj);
+ }
+
+ rc = CL5_SUCCESS;
+ goto done;
+ }
+ }
+
+ rc = _cl5NewDBFile (replName, replGen, &file);
+ if (rc == CL5_SUCCESS)
+ {
+ /* This creates the file but doesn't set the init flag
+ * The flag is set later when the purge and max ruvs are set.
+ * This is to prevent some thread to get file access before the
+ * structure is fully initialized */
+ rc = _cl5AddDBFile (file, &tmpObj);
+ if (rc == CL5_SUCCESS)
+ {
+ /* read purge RUV - done here because it needs file object rather than file pointer */
+ rc = _cl5ReadRUV (replGen, tmpObj, PR_TRUE);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5DBOpenFileByReplicaName: failed to get purge RUV\n");
+ goto done;
+ }
+
+ /* read ruv that represents the upper bound of the changes stored in the file */
+ rc = _cl5ReadRUV (replGen, tmpObj, PR_FALSE);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5DBOpenFileByReplicaName: failed to get upper bound RUV\n");
+ goto done;
+ }
+
+ /* Mark the DB File initialize */
+ _cl5DBFileInitialized(tmpObj);
+
+ if (obj)
+ {
+ *obj = tmpObj;
+ }
+ else
+ {
+ object_release (tmpObj);
+ }
+ }
+ }
+
+done:;
+ if (rc != CL5_SUCCESS)
+ {
+ if (file)
+ _cl5DBCloseFile ((void**)&file);
+ }
+
+ if (checkDups)
+ {
+ PR_Unlock (s_cl5Desc.fileLock);
+ }
+
+ return rc;
+}
+
+/* adds file to the db file list */
+static int _cl5AddDBFile (CL5DBFile *file, Object **obj)
+{
+ int rc;
+ Object *tmpObj;
+
+ PR_ASSERT (file);
+
+ tmpObj = object_new (file, _cl5DBCloseFile);
+ rc = objset_add_obj(s_cl5Desc.dbFiles, tmpObj);
+ if (rc != OBJSET_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5AddDBFile: failed to add db file to the list; "
+ "repl_objset error - %d\n", rc);
+ object_release (tmpObj);
+ return CL5_OBJSET_ERROR;
+ }
+
+ if (obj)
+ {
+ *obj = tmpObj;
+ }
+ else
+ object_release (tmpObj);
+
+ return CL5_SUCCESS;
+}
+
+static int _cl5NewDBFile (const char *replName, const char *replGen, CL5DBFile** dbFile)
+{
+ int rc;
+ DB *db = NULL;
+ char *name;
+ char *semadir;
+#ifdef HPUX
+ char cwd [PATH_MAX+1];
+#endif
+
+ PR_ASSERT (replName && replGen && dbFile);
+
+ (*dbFile) = (CL5DBFile *)slapi_ch_calloc (1, sizeof (CL5DBFile));
+ if (*dbFile == NULL)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5NewDBFile: memory allocation failed\n");
+ return CL5_MEMORY_ERROR;
+ }
+
+ name = _cl5MakeFileName (replName, replGen);
+ {
+ /* The subname argument allows applications to have
+ * subdatabases, i.e., multiple databases inside of a single
+ * physical file. This is useful when the logical databases
+ * are both numerous and reasonably small, in order to
+ * avoid creating a large number of underlying files.
+ */
+ char *subname = NULL;
+ DB_ENV *dbEnv = s_cl5Desc.dbEnv;
+
+ rc = db_create(&db, dbEnv, 0);
+ if (0 != rc) {
+ goto out;
+ }
+
+ rc = db->set_pagesize(
+ db,
+ s_cl5Desc.dbConfig.pageSize);
+
+ if (0 != rc) {
+ goto out;
+ }
+
+#if 1000*DB_VERSION_MAJOR + 100*DB_VERSION_MINOR < 3300
+ rc = db->set_malloc(db, malloc);
+ if (0 != rc) {
+ goto out;
+ }
+#endif
+
+ DB_OPEN(s_cl5Desc.dbEnvOpenFlags,
+ db, NULL /* txnid */, name, subname, DB_BTREE,
+ DB_CREATE | DB_THREAD, s_cl5Desc.dbConfig.fileMode, rc);
+ }
+out:
+ if (rc != 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5NewDBFile: db_open failed; db error - %d %s\n",
+ rc, db_strerror(rc));
+ rc = CL5_DB_ERROR;
+ goto done;
+ }
+
+ (*dbFile)->db = db;
+ (*dbFile)->name = name;
+ (*dbFile)->replName = slapi_ch_strdup (replName);
+ (*dbFile)->replGen = slapi_ch_strdup (replGen);
+
+ /*
+ * Considerations for setting up cl semaphore:
+ * (1) The NT version of SleepyCat uses test-and-set mutexes
+ * at the DB page level instead of blocking mutexes. That has
+ * proven to be a killer for the changelog DB, as this DB is
+ * accessed by multiple a reader threads (the repl thread) and
+ * writer threads (the server ops threads) usually at the last
+ * pages of the DB, due to the sequential nature of the changelog
+ * keys. To avoid the test-and-set mutexes, we could use semaphore
+ * to serialize the writers and avoid the high mutex contention
+ * that SleepyCat is unable to avoid.
+ * (2) [610948] Linux master hangs for 2 hours
+ * [611239] _cl5DeadlockMain: lock_detect succeeded
+ * (3) DS 6.2 introduced the semaphore on all platforms (replaced
+ * the serial lock used on Windows and Linux described above).
+ * The number of the concurrent writes now is configurable by
+ * nsslapd-changelogmaxconcurrentwrites (the server needs to
+ * be restarted).
+ */
+
+ semadir = s_cl5Desc.dbDir;
+#ifdef HPUX
+ /*
+ * HP sem_open() does not allow pathname component "./" or "../"
+ * in the semaphore name. For simplicity and to avoid doing
+ * chdir() in multi-thread environment, current working dir
+ * (log dir) is used to replace the original semaphore dir
+ * if it contains "./".
+ */
+ if ( strstr ( semadir, "./" ) != NULL && getcwd ( cwd, PATH_MAX+1 ) != NULL )
+ {
+ semadir = cwd;
+ }
+#endif
+
+ if ( semadir != NULL )
+ {
+ (*dbFile)->semaName = slapi_ch_malloc (strlen(semadir) + strlen(replName) + strlen(".sema") + 10);
+ sprintf ((*dbFile)->semaName, "%s/%s.sema", semadir, replName);
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5NewDBFile: semaphore %s\n", (*dbFile)->semaName);
+ (*dbFile)->sema = PR_OpenSemaphore((*dbFile)->semaName, PR_SEM_CREATE, 0666, s_cl5Desc.dbConfig.maxConcurrentWrites );
+ slapi_log_error (SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5NewDBFile: maxConcurrentWrites=%d\n", s_cl5Desc.dbConfig.maxConcurrentWrites );
+ }
+
+ if ((*dbFile)->sema == NULL )
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5NewDBFile: failed to create semaphore %s; NSPR error - %d\n",
+ (*dbFile)->semaName ? (*dbFile)->semaName : "(nil)", PR_GetError ());
+ rc = CL5_SYSTEM_ERROR;
+ goto done;
+ }
+
+ /* compute number of entries in the file */
+ /* ONREPL - to improve performance, we keep entry count in memory
+ and write it down during shutdown. Problem: this will not
+ work with multiple processes. Do we have to worry about that?
+ */
+ if (s_cl5Desc.dbOpenMode == CL5_OPEN_NORMAL)
+ {
+ rc = _cl5GetEntryCount (*dbFile);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+ "_cl5NewDBFile: failed to get entry count\n");
+ goto done;
+ }
+ }
+
+done:
+ if (rc != CL5_SUCCESS)
+ {
+ if (dbFile)
+ _cl5DBCloseFile ((void**)dbFile);
+ /* slapi_ch_free accepts NULL pointer */
+ slapi_ch_free ((void**)&name);
+
+ slapi_ch_free ((void**)dbFile);
+ }
+
+ return rc;
+}
+
+static void _cl5DBCloseFile (void **data)
+{
+ CL5DBFile *file;
+ char fullpathname[MAXPATHLEN];
+
+ PR_ASSERT (data);
+
+ file = *(CL5DBFile**)data;
+
+ /* close the file */
+ /* if this is normal close or close after import, update entry count */
+ if ((s_cl5Desc.dbOpenMode == CL5_OPEN_NORMAL && s_cl5Desc.dbState == CL5_STATE_CLOSING) ||
+ s_cl5Desc.dbOpenMode == CL5_OPEN_LDIF2CL)
+ {
+ _cl5WriteEntryCount (file);
+ _cl5WriteRUV (file, PR_TRUE);
+ _cl5WriteRUV (file, PR_FALSE);
+ }
+
+ /* close file */
+ if (file->db)
+ file->db->close(file->db, 0);
+
+ if (file->flags & DB_FILE_DELETED)
+ {
+ PR_snprintf(fullpathname, MAXPATHLEN, "%s/%s", s_cl5Desc.dbDir, file->name);
+ if (PR_Delete(fullpathname) != PR_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5DBCloseFile: "
+ "failed to remove (%s) file; NSPR error - %d\n", file->name, PR_GetError ());
+
+ }
+ }
+
+ /* slapi_ch_free accepts NULL pointer */
+ slapi_ch_free ((void**)&file->name);
+ slapi_ch_free ((void**)&file->replName);
+ slapi_ch_free ((void**)&file->replGen);
+ if (file->sema) {
+ PR_CloseSemaphore (file->sema);
+ PR_DeleteSemaphore (file->semaName);
+ file->sema = NULL;
+ }
+ slapi_ch_free ((void**)&file->semaName);
+
+ slapi_ch_free (data);
+}
+
+static int _cl5GetDBFile (Object *replica, Object **obj)
+{
+ char *fileName;
+
+ PR_ASSERT (replica && obj);
+
+ fileName = _cl5Replica2FileName (replica);
+
+ *obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
+ slapi_ch_free ((void**)&fileName);
+ if (*obj)
+ {
+ return CL5_SUCCESS;
+ }
+ else
+ {
+ return CL5_NOTFOUND;
+ }
+}
+
+static int _cl5GetDBFileByReplicaName (const char *replName, const char *replGen,
+ Object **obj)
+{
+ char *fileName;
+
+ PR_ASSERT (replName && replGen && obj);
+
+ fileName = _cl5MakeFileName (replName, replGen);
+
+ *obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
+ slapi_ch_free ((void**)&fileName);
+ if (*obj)
+ {
+ return CL5_SUCCESS;
+ }
+ else
+ {
+ return CL5_NOTFOUND;
+ }
+}
+
+static void _cl5DBDeleteFile (Object *obj)
+{
+ CL5DBFile *file;
+
+ PR_ASSERT (obj);
+
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+ file->flags |= DB_FILE_DELETED;
+ objset_remove_obj(s_cl5Desc.dbFiles, obj);
+ object_release (obj);
+}
+
+static void _cl5DBFileInitialized (Object *obj)
+{
+ CL5DBFile *file;
+
+ PR_ASSERT (obj);
+
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+ file->flags |= DB_FILE_INIT;
+}
+
+static int _cl5CompareDBFile (Object *el1, const void *el2)
+{
+ CL5DBFile *file;
+ const char *name;
+
+ PR_ASSERT (el1 && el2);
+
+ file = (CL5DBFile*) object_get_data (el1);
+ name = (const char*) el2;
+ return ((file->flags & DB_FILE_INIT) ? strcmp (file->name, name) : 1);
+}
+
+static int _cl5CopyDBFiles (const char *srcDir, const char *destDir, Object **replicas)
+{
+ char srcFile [MAXPATHLEN + 1];
+ char destFile[MAXPATHLEN + 1];
+ int rc;
+ Object *obj;
+ CL5DBFile *file;
+
+ /* ONREPL currently, dbidlist is ignored because db code can't handle discrepancy between
+ transaction log and present files; this should change before 5.0 ships */
+ obj = objset_first_obj (s_cl5Desc.dbFiles);
+ while (obj)
+ {
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+
+ PR_snprintf(srcFile, MAXPATHLEN, "%s/%s", srcDir, file->name);
+ PR_snprintf(destFile, MAXPATHLEN, "%s/%s", destDir, file->name);
+ rc = copyfile(srcFile, destFile, 0, FILE_CREATE_MODE);
+ if (rc != 0)
+ {
+ object_release (obj);
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5CopyDBFiles: failed to copy %s from %s to %s\n",
+ file, srcDir, destDir);
+ return CL5_SYSTEM_ERROR;
+ }
+
+ obj = objset_next_obj (s_cl5Desc.dbFiles, obj);
+ }
+
+ return CL5_SUCCESS;
+}
+
+/*
+ * return 1: true (the "filename" ends with "ext")
+ * return 0: false
+ */
+static int _cl5FileEndsWith(const char *filename, const char *ext)
+{
+ char *p = NULL;
+ int flen = strlen(filename);
+ int elen = strlen(ext);
+ if (0 == flen || 0 == elen)
+ {
+ return 0;
+ }
+ p = strstr(filename, ext);
+ if (NULL == p)
+ {
+ return 0;
+ }
+ if (p - filename + elen == flen)
+ {
+ return 1;
+ }
+ return 0;
+}
+
+static int _cl5ExportFile (PRFileDesc *prFile, Object *obj)
+{
+ int rc;
+ void *iterator = NULL;
+ slapi_operation_parameters op = {0};
+ char *buff;
+ PRInt32 len, wlen;
+ CL5Entry entry;
+ CL5DBFile *file;
+
+ PR_ASSERT (prFile && obj);
+
+ file = (CL5DBFile*)object_get_data (obj);
+ PR_ASSERT (file);
+
+ ruv_dump (file->purgeRUV, "clpurgeruv", prFile);
+ ruv_dump (file->maxRUV, "clmaxruv", prFile);
+ slapi_write_buffer (prFile, "\n", strlen("\n"));
+
+ entry.op = &op;
+ rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL);
+ while (rc == CL5_SUCCESS)
+ {
+ rc = _cl5Operation2LDIF (&op, file->replGen, &buff, &len);
+ if (rc != CL5_SUCCESS)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5ExportLDIF: failed to convert operation to ldif\n");
+ operation_parameters_done (&op);
+ break;
+ }
+
+ wlen = slapi_write_buffer (prFile, buff, len);
+ slapi_ch_free((void **)&buff);
+ if (wlen < len)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5ExportLDIF: failed to write to ldif file\n");
+ rc = CL5_SYSTEM_ERROR;
+ operation_parameters_done (&op);
+ break;
+ }
+
+ cl5_operation_parameters_done (&op);
+
+ rc = _cl5GetNextEntry (&entry, iterator);
+ }
+
+ cl5_operation_parameters_done (&op);
+
+ if (iterator)
+ cl5DestroyIterator (iterator);
+
+ if (rc != CL5_NOTFOUND)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "_cl5ExportLDIF: failed to retrieve changelog entry\n");
+ }
+ else
+ {
+ rc = CL5_SUCCESS;
+ }
+
+ return rc;
+}
+
+static PRBool _cl5ReplicaInList (Object *replica, Object **replicas)
+{
+ int i;
+
+ PR_ASSERT (replica && replicas);
+
+ /* ONREPL I think it should be sufficient to just compare replica pointers */
+ for (i=0; replicas[i]; i++)
+ {
+ if (replica == replicas[i])
+ return PR_TRUE;
+ }
+
+ return PR_FALSE;
+}
+
+static char* _cl5GetHelperEntryKey (int type, char *csnStr)
+{
+ CSN *csn= csn_new();
+ char *rt;
+
+ csn_set_time(csn, type);
+ csn_set_replicaid(csn, 0);
+
+ rt = csn_as_string(csn, PR_FALSE, csnStr);
+ csn_free(&csn);
+
+ return rt;
+}
+
+static Object* _cl5GetReplica (const slapi_operation_parameters *op, const char* replGen)
+{
+ Slapi_DN *sdn;
+ Object *replObj;
+ Replica *replica;
+ char *newGen;
+
+ PR_ASSERT (op && replGen);
+
+ sdn = slapi_sdn_new_dn_byref(op->target_address.dn);
+
+ replObj = replica_get_replica_from_dn (sdn);
+ if (replObj)
+ {
+ /* check to see if replica generation has not change */
+ replica = (Replica*)object_get_data (replObj);
+ PR_ASSERT (replica);
+ newGen = replica_get_generation (replica);
+ PR_ASSERT (newGen);
+ if (strcmp (replGen, newGen) != 0)
+ {
+ object_release (replObj);
+ replObj = NULL;
+ }
+
+ slapi_ch_free ((void**)&replGen);
+ }
+
+ slapi_sdn_free (&sdn);
+
+ return replObj;
+}
+
+int
+cl5_is_diskfull()
+{
+ int rc;
+ PR_Lock(cl5_diskfull_lock);
+ rc = cl5_diskfull_flag;
+ PR_Unlock(cl5_diskfull_lock);
+ return rc;
+}
+
+static void
+cl5_set_diskfull()
+{
+ PR_Lock(cl5_diskfull_lock);
+ cl5_diskfull_flag = 1;
+ PR_Unlock(cl5_diskfull_lock);
+}
+
+static void
+cl5_set_no_diskfull()
+{
+ PR_Lock(cl5_diskfull_lock);
+ cl5_diskfull_flag = 0;
+ PR_Unlock(cl5_diskfull_lock);
+}
+
+int
+cl5_diskspace_is_available()
+{
+ int rval = 1;
+
+#if defined( OS_solaris ) || defined( hpux )
+ struct statvfs fsbuf;
+ if (statvfs(s_cl5Desc.dbDir, &fsbuf) < 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5_diskspace_is_available: Cannot get file system info\n");
+ rval = 0;
+ }
+ else
+ {
+ unsigned long fsiz = fsbuf.f_bavail * fsbuf.f_frsize;
+ if (fsiz < NO_DISK_SPACE)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5_diskspace_is_available: No enough diskspace for changelog: (%u bytes free)\n", fsiz);
+ rval = 0;
+ }
+ else if (fsiz > MIN_DISK_SPACE)
+ {
+ /* assume recovered */
+ cl5_set_no_diskfull();
+ }
+ }
+#endif
+#if defined( linux )
+ struct statfs fsbuf;
+ if (statfs(s_cl5Desc.dbDir, &fsbuf) < 0)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5_diskspace_is_available: Cannot get file system info\n");
+ rval = 0;
+ }
+ else
+ {
+ unsigned long fsiz = fsbuf.f_bavail * fsbuf.f_bsize;
+ if (fsiz < NO_DISK_SPACE)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+ "cl5_diskspace_is_available: No enough diskspace for changelog: (%u bytes free)\n", fsiz);
+ rval = 0;
+ }
+ else if (fsiz > MIN_DISK_SPACE)
+ {
+ /* assume recovered */
+ cl5_set_no_diskfull();
+ }
+ }
+#endif
+ return rval;
+}