summaryrefslogtreecommitdiffstats
path: root/ldap/servers/plugins/replication/repl5_schedule.c
diff options
context:
space:
mode:
Diffstat (limited to 'ldap/servers/plugins/replication/repl5_schedule.c')
-rw-r--r--ldap/servers/plugins/replication/repl5_schedule.c742
1 files changed, 742 insertions, 0 deletions
diff --git a/ldap/servers/plugins/replication/repl5_schedule.c b/ldap/servers/plugins/replication/repl5_schedule.c
new file mode 100644
index 00000000..427e79a9
--- /dev/null
+++ b/ldap/servers/plugins/replication/repl5_schedule.c
@@ -0,0 +1,742 @@
+/** BEGIN COPYRIGHT BLOCK
+ * Copyright 2001 Sun Microsystems, Inc.
+ * Portions copyright 1999, 2001-2003 Netscape Communications Corporation.
+ * All rights reserved.
+ * END COPYRIGHT BLOCK **/
+/* repl5_schedule.c */
+/*
+
+The schedule object implements the scheduling policy for a DS 5.0 replication
+supplier.
+
+Methods:
+schedule_set() - sets the schedule
+schedule_get() - gets the schedule
+schedule_in_window_now() - returns TRUE if a replication session
+ should commence.
+schedule_next() - returns the next time that replication is
+ scheduled to occur.
+schedule_notify() - called to inform the scheduler when entries
+ have been updated.
+schedule_set_priority_attributes() - sets the attributes that are
+ considered "high priority". A modification to one of these attributes
+ will cause replication to commence asap, overriding the startup
+ delay and maximum backlog. Also includes an additional parameter
+ that controls whether priority attributes are propagated regardless
+ of the scheduling window, e.g. it's possible to configure things
+ so that password changes get propagated even if we're not in a
+ replication window.
+schedule_set_startup_delay() - sets the time that replication should
+ wait before commencing replication sessions.
+schedule_set_maximum_backlog() - sets the maximum number of updates
+ which can occur before replication will commence. If the backlog
+ threshhold is exceeded, then replication will commence ASAP,
+ overriding the startup delay.
+
+*/
+
+/* ONREPL - I made a simplifying assumption that a schedule item does not
+ cross day boundaries. Implementing this is hard because we search
+ for the items for a particular day only based on the item's staring time.
+ For instance if the current time is tuesday morning, we would not consider
+ the item that started on monday and continued through tuesday.
+ To simulate an item that crosses day boundaries, you can create 2 items -
+ one for the time in the first day and one for the time in the second.
+ We could do this internally by allowing items do span 2 days and
+ splitting them ourselves. This, however, is not currently implemented */
+
+#include "slapi-plugin.h"
+#include "repl5.h"
+
+#include <ctype.h> /* For isdigit() */
+
+/* from proto-slap.h */
+char *get_timestring(time_t *t);
+void free_timestring(char *timestr);
+
+typedef struct schedule_item {
+ struct schedule_item *next;
+ PRUint32 start; /* Start time, given as seconds after midnight */
+ PRUint32 end; /* End time */
+ unsigned char dow; /* Days of week, LSB = Sunday */
+} schedule_item;
+
+typedef struct schedule {
+ const char *session_id;
+ size_t max_backlog;
+ size_t startup_delay;
+ schedule_item *schedule_list; /* Linked list of schedule windows */
+ char **prio_attrs; /* Priority attributes - start replication now */
+ int prio_attrs_override_schedule;
+ PRTime last_session_end;
+ int last_session_status;
+ PRTime last_successful_session_end;
+ window_state_change_callback callback_fn; /* function to call when window opens/closes */
+ void *callback_arg; /* argument to pass to the window state change callback */
+ Slapi_Eq_Context pending_event; /* event scheduled with the event queue */
+ PRLock *lock;
+} schedule;
+
+/* Forward declarations */
+static schedule_item *parse_schedule_value(const Slapi_Value *v);
+static void schedule_window_state_change_event (Schedule *sch);
+static void unschedule_window_state_change_event (Schedule *sch);
+static void window_state_changed (time_t when, void *arg);
+static int schedule_in_window_now_nolock(Schedule *sch);
+static schedule_item* get_current_schedule_item (Schedule *sch);
+static time_t PRTime2time_t (PRTime tm);
+static PRTime schedule_next_nolock (Schedule *sch, PRBool start);
+static void free_schedule_list(schedule_item **schedule_list);
+
+#define SECONDS_PER_MINUTE 60
+#define SECONDS_PER_HOUR (60 * SECONDS_PER_MINUTE)
+#define SECONDS_PER_DAY (24 * SECONDS_PER_HOUR)
+#define DAYS_PER_WEEK 7
+#define ALL_DAYS 0x7F /* Bit mask */
+
+
+
+/*
+ * Create a new schedule object and return a pointer to it.
+ */
+Schedule*
+schedule_new(window_state_change_callback callback_fn, void *callback_arg, const char *session_id)
+{
+ Schedule *sch = NULL;
+ sch = (Schedule *)slapi_ch_calloc(1, sizeof(struct schedule));
+
+ sch->session_id = session_id ? session_id : "";
+ sch->callback_fn = callback_fn;
+ sch->callback_arg = callback_arg;
+
+ if ((sch->lock = PR_NewLock()) == NULL)
+ {
+ slapi_ch_free((void **)&sch);
+ }
+
+ return sch;
+}
+
+
+void
+schedule_destroy(Schedule *s)
+{
+ int i;
+
+ /* unschedule update window event if exists */
+ unschedule_window_state_change_event (s);
+
+ if (s->schedule_list)
+ {
+ free_schedule_list (&s->schedule_list);
+ }
+
+ if (NULL != s->prio_attrs)
+ {
+ for (i = 0; NULL != s->prio_attrs[i]; i++)
+ {
+ slapi_ch_free((void **)&(s->prio_attrs[i]));
+ }
+ slapi_ch_free((void **)&(s->prio_attrs));
+ }
+ PR_DestroyLock(s->lock);
+ s->lock = NULL;
+ slapi_ch_free((void **)&s);
+}
+
+static void
+free_schedule_list(schedule_item **schedule_list)
+{
+ schedule_item *si = *schedule_list;
+ schedule_item *tmp_si;
+ while (NULL != si)
+ {
+ tmp_si = si->next;
+ slapi_ch_free((void **)&si);
+ si = tmp_si;
+ }
+ *schedule_list = NULL;
+}
+
+
+
+/*
+ * Sets the schedule. Returns 0 if all of the schedule lines were
+ * correctly parsed and the new schedule has been put into effect.
+ * Returns -1 if one or more of the schedule items could not be
+ * parsed. If -1 is returned, then no changes have been made to the
+ * current schedule.
+ */
+int
+schedule_set(Schedule *sch, Slapi_Attr *attr)
+{
+ int return_value;
+ schedule_item *si = NULL;
+ schedule_item *new_schedule_list = NULL;
+ int valid = 1;
+
+ if (NULL != attr)
+ {
+ int ind;
+ Slapi_Value *sval;
+ ind = slapi_attr_first_value(attr, &sval);
+ while (ind >= 0)
+ {
+ si = parse_schedule_value(sval);
+ if (NULL == si)
+ {
+ valid = 0;
+ break;
+ }
+ /* Put at head of linked list */
+ si->next = new_schedule_list;
+ new_schedule_list = si;
+ ind = slapi_attr_next_value(attr, ind, &sval);
+ }
+ }
+
+ if (!valid)
+ {
+ /* deallocate any new schedule items */
+ free_schedule_list(&new_schedule_list);
+ return_value = -1;
+ }
+ else
+ {
+ PR_Lock(sch->lock);
+
+ /* if there is an update window event scheduled - unschedule it */
+ unschedule_window_state_change_event (sch);
+
+ free_schedule_list(&sch->schedule_list);
+ sch->schedule_list = new_schedule_list;
+
+ /* schedule an event to notify the caller about openning/closing of the update window */
+ schedule_window_state_change_event (sch);
+
+ PR_Unlock(sch->lock);
+ return_value = 0;
+ }
+ return return_value;
+}
+
+
+
+/*
+ * Returns the schedule.
+ */
+char **
+schedule_get(Schedule *sch)
+{
+ char **return_value = NULL;
+
+ return return_value;
+}
+
+
+
+/*
+ * Return an integer corresponding to the day of the week for
+ * "when".
+ */
+static PRInt32
+day_of_week(PRTime when)
+{
+
+ PRExplodedTime exp;
+
+ PR_ExplodeTime(when, PR_LocalTimeParameters, &exp);
+ return(exp.tm_wday);
+}
+
+
+/*
+ * Return the number of seconds between "when" and the
+ * most recent midnight.
+ */
+static PRUint32
+seconds_since_midnight(PRTime when)
+{
+ PRExplodedTime exp;
+
+ PR_ExplodeTime(when, PR_LocalTimeParameters, &exp);
+ return(exp.tm_hour * 3600 + exp.tm_min * 60 + exp.tm_sec);
+}
+
+
+/*
+ * Return 1 if "now" is within the schedule window
+ * specified by "si", 0 otherwise.
+ */
+static int
+time_in_window(PRTime now, schedule_item *si)
+{
+ unsigned char dow = 1 << day_of_week(now);
+ int return_value = 0;
+
+ if (dow & si->dow)
+ {
+ PRUint32 nowsec = seconds_since_midnight(now);
+
+ return_value = (nowsec >= si->start) && (nowsec <= si->end);
+ }
+
+ return return_value;
+}
+
+
+
+/*
+ * Returns a non-zero value if the current time is within a
+ * replication window and if scheduling constraints are all met.
+ * Otherwise, returns zero.
+ */
+
+int
+schedule_in_window_now (Schedule *sch)
+{
+ int rc;
+
+ PR_ASSERT(NULL != sch);
+ PR_Lock(sch->lock);
+
+ rc = schedule_in_window_now_nolock(sch);
+
+ PR_Unlock(sch->lock);
+
+ return rc;
+}
+
+/* Must be called under sch->lock */
+static int
+schedule_in_window_now_nolock(Schedule *sch)
+{
+ int return_value = 0;
+
+ if (NULL == sch->schedule_list)
+ {
+ /* Absence of a schedule is the same as 0000-2359 0123456 */
+ return_value = 1;
+ }
+ else
+ {
+ schedule_item *si = sch->schedule_list;
+ PRTime now;
+ now = PR_Now();
+ while (NULL != si)
+ {
+ if (time_in_window(now, si))
+ {
+ /* XXX check backoff timers??? */
+ return_value = 1;
+ break;
+ }
+ si = si->next;
+ }
+ }
+
+ return return_value;
+}
+
+
+
+/*
+ * Calculate the next time (expressed as a PRTime) when this
+ * schedule item will change state (from open to close or vice versa).
+ */
+static PRTime
+next_change_time(schedule_item *si, PRTime now, PRBool start)
+{
+ PRUint32 nowsec = seconds_since_midnight(now);
+ PRUint32 sec_til_change;
+ PRUint32 change_time;
+ PRExplodedTime exp;
+ PRInt32 dow = day_of_week(now);
+ unsigned char dow_bit = 1 << dow;
+ unsigned char next_dow;
+
+ if (start) /* we are looking for the next window opening */
+ {
+ change_time = si->start;
+ }
+ else /* we are looking for the next window closing */
+ {
+ /* open range is inclusive - so we need to add a minute if we are looking for close time */
+ change_time = si->end + SECONDS_PER_MINUTE;
+ }
+
+ /* we are replicating today and next change is also today */
+ if ((dow_bit & si->dow) && (nowsec < change_time))
+ {
+ sec_til_change = change_time - nowsec;
+ }
+ else /* not replicating today or the change already occured today */
+ {
+ int i;
+
+ /* find next day when we replicate */
+ for (i = 1; i <= DAYS_PER_WEEK; i++)
+ {
+ next_dow = 1 << ((dow + i) % DAYS_PER_WEEK);
+ if (next_dow & si->dow)
+ break;
+ }
+
+ sec_til_change = change_time + i * SECONDS_PER_DAY - nowsec;
+ }
+
+ PR_ExplodeTime(now, PR_LocalTimeParameters, &exp);
+ exp.tm_sec += sec_til_change;
+
+
+ PR_NormalizeTime(&exp, PR_LocalTimeParameters);
+ return PR_ImplodeTime(&exp);
+}
+
+
+
+/*
+ * Returns the next time that replication is scheduled to occur.
+ * Returns 0 if there is no time in the future that replication
+ * will begin (e.g. there's no schedule at all).
+ */
+PRTime
+schedule_next(Schedule *sch)
+{
+ PRTime tm;
+
+ PR_ASSERT(NULL != sch);
+ PR_Lock(sch->lock);
+
+ tm = schedule_next_nolock (sch, PR_TRUE);
+
+ PR_Unlock(sch->lock);
+
+ return tm;
+}
+
+/* Must be called under sch->lock */
+static PRTime
+schedule_next_nolock (Schedule *sch, PRBool start)
+{
+
+ PRTime closest_time = LL_Zero();
+
+ if (NULL != sch->schedule_list)
+ {
+ schedule_item *si = sch->schedule_list;
+ PRTime now = PR_Now();
+ unsigned char dow = 1 << day_of_week(now);
+
+ while (NULL != si)
+ {
+ PRTime tmp_time;
+
+ /* Check if this item's change time is sooner than the others */
+ tmp_time = next_change_time(si, now, start);
+ if (LL_IS_ZERO(closest_time))
+ {
+ LL_ADD(closest_time, tmp_time, LL_Zero()); /* Really just an asignment */
+ }
+ else if (LL_CMP(tmp_time, <, closest_time))
+ {
+ LL_ADD(closest_time, tmp_time, LL_Zero()); /* Really just an asignment */
+ }
+
+ si = si->next;
+ }
+ }
+
+ return closest_time;
+}
+
+
+
+
+/*
+ * Called by the enclosing object (replsupplier) when a change within the
+ * replicated area has occurred. This allows the scheduler to update its
+ * internal counters, timers, etc. Returns a non-zero value if replication
+ * should commence, zero if it should not.
+ */
+int
+schedule_notify(Schedule *sch, Slapi_PBlock *pb)
+{
+ int return_value = 0;
+
+ return return_value;
+}
+
+
+
+
+/*
+ * Provide a list of attributes which, if changed,
+ * will cause replication to commence as soon as possible. There
+ * is also a flag that tells the scheduler if the update of a
+ * priority attribute should cause the schedule to be overridden,
+ * e.g. if the administrator wants password changes to propagate
+ * even if not in a replication window.
+ *
+ * This function consumes "prio_attrs" and assumes management
+ * of the memory.
+ */
+void
+schedule_set_priority_attributes(Schedule *sch, char **prio_attrs, int override_schedule)
+{
+ PR_ASSERT(NULL != sch);
+ PR_Lock(sch->lock);
+ if (NULL != sch->prio_attrs)
+ {
+ int i;
+ for (i = 0; NULL != prio_attrs[i]; i++) {
+ slapi_ch_free((void **)&sch->prio_attrs[i]);
+ }
+ slapi_ch_free((void **)&sch->prio_attrs);
+ }
+ sch->prio_attrs = prio_attrs;
+ sch->prio_attrs_override_schedule = override_schedule;
+
+ PR_Unlock(sch->lock);
+}
+
+
+
+
+
+/*
+ * Set the time, in seconds, that replication will wait after a change is
+ * available before propagating it. This capability will allow multiple
+ * updates to be coalesced into a single replication session.
+ */
+void
+schedule_set_startup_delay(Schedule *sch, size_t startup_delay)
+{
+ PR_ASSERT(NULL != sch);
+ PR_Lock(sch->lock);
+ sch->startup_delay = startup_delay;
+ PR_Unlock(sch->lock);
+}
+
+
+
+
+
+/*
+ * Set the maximum number of pending changes allowed to accumulate
+ * before a replication session is begun.
+ */
+void
+schedule_set_maximum_backlog(Schedule *sch, size_t max_backlog)
+{
+ PR_ASSERT(NULL != sch);
+ PR_Lock(sch->lock);
+ sch->max_backlog = max_backlog;
+ PR_Unlock(sch->lock);
+}
+
+
+
+
+
+/*
+ * Notify the scheduler that a replication session completed at a certain
+ * time. There is also a status argument that says more about the session's
+ * termination (normal, abnormal), which the scheduler uses in determining
+ * the backoff strategy.
+ */
+void
+schedule_notify_session(Schedule *sch, PRTime session_end_time, unsigned int status)
+{
+ PR_ASSERT(NULL != sch);
+ PR_Lock(sch->lock);
+ sch->last_session_end = session_end_time;
+ sch->last_session_status = status;
+ if (REPLICATION_SESSION_SUCCESS == status)
+ {
+ sch->last_successful_session_end = session_end_time;
+ }
+ PR_Unlock(sch->lock);
+}
+
+/* schedule an event that will fire the next time the update window state
+ changes from open to closed or vice versa */
+static void
+schedule_window_state_change_event (Schedule *sch)
+{
+ time_t wakeup_time;
+ PRTime tm;
+ int window_opened;
+ char *timestr = NULL;
+
+ /* if we have a schedule and a callback function is registerd -
+ register an event with the event queue */
+ if (sch->schedule_list && sch->callback_fn)
+ {
+ /* ONREPL what if the window is really small and by the time we are done
+ with the computation - we cross window boundary.
+ I think we should put some constrains on schedule to avoid that */
+
+ window_opened = schedule_in_window_now_nolock(sch);
+
+ tm = schedule_next_nolock(sch, !window_opened);
+
+ wakeup_time = PRTime2time_t (tm);
+
+ /* schedule the event */
+ sch->pending_event = slapi_eq_once(window_state_changed, sch, wakeup_time);
+
+ timestr = get_timestring(&wakeup_time);
+ slapi_log_error (SLAPI_LOG_REPL, repl_plugin_name, "%s: Update window will %s at %s\n",
+ sch->session_id,
+ window_opened ? "close" : "open", timestr);
+ free_timestring(timestr);
+ timestr = NULL;
+ }
+}
+
+/* this function is called by the even queue the next time
+ the window is opened or closed */
+static void
+window_state_changed (time_t when, void *arg)
+{
+ Schedule *sch = (Schedule*)arg;
+ int open;
+
+ PR_ASSERT (sch);
+
+ PR_Lock(sch->lock);
+
+ open = schedule_in_window_now_nolock(sch);
+
+ slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "%s: Update window is now %s\n",
+ sch->session_id,
+ open ? "open" : "closed");
+
+ /* schedule next event */
+ schedule_window_state_change_event (sch);
+
+ /* notify the agreement */
+ sch->callback_fn (sch->callback_arg, open);
+
+ PR_Unlock(sch->lock);
+}
+
+/* cancel the event registered with the event queue */
+static void
+unschedule_window_state_change_event (Schedule *sch)
+{
+ if (sch->pending_event)
+ {
+ slapi_eq_cancel(sch->pending_event);
+ sch->pending_event = NULL;
+ }
+}
+
+static time_t
+PRTime2time_t (PRTime tm)
+{
+ PRInt64 rt;
+
+ PR_ASSERT (tm);
+
+ LL_DIV(rt, tm, PR_USEC_PER_SEC);
+
+ return (time_t)rt;
+}
+
+/*
+ * Parse a schedule line.
+ * The format is:
+ * <start>-<end> <day_of_week>
+ * <start> and <end> are in 24-hour time
+ * <day_of_week> is like cron(5): 0 = Sunday, 1 = Monday, etc.
+ *
+ * The schedule item "*" is equivalen to 0000-2359 0123456
+ *
+ * Returns a pointer to a schedule item on success, NULL if the
+ * schedule item cannot be parsed.
+ */
+static schedule_item *
+parse_schedule_value(const Slapi_Value *v)
+{
+#define RANGE_VALID(p, limit) \
+ ((p + 9) < limit && \
+ isdigit(p[0]) && \
+ isdigit(p[1]) && \
+ isdigit(p[2]) && \
+ isdigit(p[3]) && \
+ ('-' == p[4]) && \
+ isdigit(p[5]) && \
+ isdigit(p[6]) && \
+ isdigit(p[7]) && \
+ isdigit(p[8]))
+
+ schedule_item *si = NULL;
+ int valid = 0;
+ const struct berval *sch_bval;
+
+ if (NULL != v && (sch_bval = slapi_value_get_berval(v)) != NULL &&
+ NULL != sch_bval && sch_bval->bv_len > 0 && NULL != sch_bval->bv_val )
+ {
+ char *p = sch_bval->bv_val;
+ char *limit = p + sch_bval->bv_len;
+
+ si = (schedule_item *)slapi_ch_malloc(sizeof(schedule_item));
+ si->next = NULL;
+ si->start = 0UL;
+ si->end = SECONDS_PER_DAY;
+ si->dow = ALL_DAYS;
+
+ if (*p == '*')
+ {
+ valid = 1;
+ goto done;
+ }
+ else
+ {
+ if (RANGE_VALID(p, limit))
+ {
+ si->start = ((strntoul(p, 2, 10) * 60) +
+ strntoul(p + 2, 2, 10)) * 60;
+ p += 5;
+ si->end = ((strntoul(p, 2, 10) * 60) +
+ strntoul(p + 2, 2, 10)) * 60;
+ p += 4;
+
+ /* ONREPL - for now wi don't allow items that span multiple days.
+ See note in the beginning of the file for more details. */
+ /* ONREPL - we should also decide on the minimum of the item size */
+ if (si->start > si->end)
+ {
+ valid = 0;
+ goto done;
+ }
+
+ if (p < limit && ' ' == *p)
+ {
+ /* Specific days of week */
+ si->dow = 0;
+ while (++p < limit)
+ {
+ if (!isdigit(*p))
+ {
+ valid = 0;
+ goto done;
+ }
+ si->dow |= (1 << strntoul(p, 1, 10));
+
+ }
+ }
+ valid = 1;
+ }
+ }
+ }
+
+done:
+ if (!valid)
+ {
+ slapi_ch_free((void **)&si);
+ }
+ return si;
+}