# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2011 OpenStack Foundation. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """ Time related utilities and helper functions. """ import calendar import datetime import iso8601 import six # ISO 8601 extended time format with microseconds _ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f' _ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S' PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND def isotime(at=None, subsecond=False): """Stringify time in ISO 8601 format.""" if not at: at = utcnow() st = at.strftime(_ISO8601_TIME_FORMAT if not subsecond else _ISO8601_TIME_FORMAT_SUBSECOND) tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC' st += ('Z' if tz == 'UTC' else tz) return st def parse_isotime(timestr): """Parse time from ISO 8601 format.""" try: return iso8601.parse_date(timestr) except iso8601.ParseError as e: raise ValueError(e.message) except TypeError as e: raise ValueError(e.message) def strtime(at=None, fmt=PERFECT_TIME_FORMAT): """Returns formatted utcnow.""" if not at: at = utcnow() return at.strftime(fmt) def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT): """Turn a formatted time back into a datetime.""" return datetime.datetime.strptime(timestr, fmt) def normalize_time(timestamp): """Normalize time in arbitrary timezone to UTC naive object.""" offset = timestamp.utcoffset() if offset is None: return timestamp return timestamp.replace(tzinfo=None) - offset def is_older_than(before, seconds): """Return True if before is older than seconds.""" if isinstance(before, six.string_types): before = parse_strtime(before).replace(tzinfo=None) return utcnow() - before > datetime.timedelta(seconds=seconds) def is_newer_than(after, seconds): """Return True if after is newer than seconds.""" if isinstance(after, six.string_types): after = parse_strtime(after).replace(tzinfo=None) return after - utcnow() > datetime.timedelta(seconds=seconds) def utcnow_ts(): """Timestamp version of our utcnow function.""" return calendar.timegm(utcnow().timetuple()) def utcnow(): """Overridable version of utils.utcnow.""" if utcnow.override_time: try: return utcnow.override_time.pop(0) except AttributeError: return utcnow.override_time return datetime.datetime.utcnow() def iso8601_from_timestamp(timestamp): """Returns a iso8601 formated date from timestamp.""" return isotime(datetime.datetime.utcfromtimestamp(timestamp)) utcnow.override_time = None def set_time_override(override_time=datetime.datetime.utcnow()): """Overrides utils.utcnow. Make it return a constant time or a list thereof, one at a time. """ utcnow.override_time = override_time def advance_time_delta(timedelta): """Advance overridden time using a datetime.timedelta.""" assert(not utcnow.override_time is None) try: for dt in utcnow.override_time: dt += timedelta except TypeError: utcnow.override_time += timedelta def advance_time_seconds(seconds): """Advance overridden time by seconds.""" advance_time_delta(datetime.timedelta(0, seconds)) def clear_time_override(): """Remove the overridden time.""" utcnow.override_time = None def marshall_now(now=None): """Make an rpc-safe datetime with microseconds. Note: tzinfo is stripped, but not required for relative times. """ if not now: now = utcnow() return dict(day=now.day, month=now.month, year=now.year, hour=now.hour, minute=now.minute, second=now.second, microsecond=now.microsecond) def unmarshall_time(tyme): """Unmarshall a datetime dict.""" return datetime.datetime(day=tyme['day'], month=tyme['month'], year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'], second=tyme['second'], microsecond=tyme['microsecond']) def delta_seconds(before, after): """Return the difference between two timing objects. Compute the difference in seconds between two date, time, or datetime objects (as a float, to microsecond resolution). """ delta = after - before try: return delta.total_seconds() except AttributeError: return ((delta.days * 24 * 3600) + delta.seconds + float(delta.microseconds) / (10 ** 6)) def is_soon(dt, window): """Determines if time is going to happen in the next window seconds. :params dt: the time :params window: minimum seconds to remain to consider the time not soon :return: True if expiration is within the given duration """ soon = (utcnow() + datetime.timedelta(seconds=window)) return normalize_time(dt) <= soon