summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJan Safranek <jsafrane@redhat.com>2013-04-19 16:59:00 +0200
committerJan Safranek <jsafrane@redhat.com>2013-04-19 16:59:00 +0200
commite229f6f4752c30c4fb48c429174f81bbda02242d (patch)
tree06dc391a15b15b48f02f910f71c37a609062035d
parentb2e4ff58fca7073394f633da353c7332beee4f88 (diff)
parent105b143df8e5d84e8ccd76eacb5896a24d318b11 (diff)
downloadopenlmi-providers-e229f6f4752c30c4fb48c429174f81bbda02242d.tar.gz
openlmi-providers-e229f6f4752c30c4fb48c429174f81bbda02242d.tar.xz
openlmi-providers-e229f6f4752c30c4fb48c429174f81bbda02242d.zip
Merge branch 'devel/sync-job'
-rw-r--r--src/python/openlmi/common/JobManager.py228
-rw-r--r--src/python/openlmi/common/TimerManager.py400
-rw-r--r--src/python/openlmi/common/cmpi_logging.py2
3 files changed, 525 insertions, 105 deletions
diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py
index bb6fed8..5ed7830 100644
--- a/src/python/openlmi/common/JobManager.py
+++ b/src/python/openlmi/common/JobManager.py
@@ -19,10 +19,10 @@
"""
Basic infrastructure for asynchronous jobs. All necessary CIM classes and
indications are implemented here.
-
+
.. autoclass:: JobManager
:members:
-
+
.. autoclass:: Job
:members:
@@ -31,7 +31,7 @@
.. autoclass:: LMI_OwningJobElement
:members:
-
+
.. autoclass:: LMI_AffectedJobElement
:members:
@@ -59,10 +59,10 @@ class Job(object):
It remembers input and output arguments, affected ManagedElements and
owning ManagedElement (to be able to create associations to them)
and all CIM_ConcreteJob properties.
-
- Due to multiple threads processing the job, each job has its own
+
+ Due to multiple threads processing the job, each job has its own
lock to guard its status changes. It is expected that number of jobs
- is quite low.
+ is quite low.
"""
DEFAULT_TIME_BEFORE_REMOVAL = 60 # in seconds
@@ -84,7 +84,7 @@ class Job(object):
method_name, affected_elements, owning_element):
"""
Create new storage job.
-
+
:param job_manager: (``JobManager``) Reference to ``JobManager``, which
will manage this job.
:param job_name: (``string``) User-friendly name of the job.
@@ -100,6 +100,7 @@ class Job(object):
created for it.
"""
self.job_manager = job_manager
+ self.timer_manager = job_manager.timer_manager
# Unique ID
self.the_id = job_manager.get_next_id()
@@ -140,16 +141,18 @@ class Job(object):
# State of the job
self.job_state = self.STATE_QUEUED
- # Last change of job state
- self.time_of_last_state_change = self.time_submitted
+ # Last change of job state, wall clock time
+ self.clocktime_of_last_state_change = self.time_submitted
- # Duration of the job in RUNNING state
+ # Duration of the job in RUNNING state (in seconds)
self.elapsed_time = None
- # When the job started (= switched to RUNNING)
- self.start_time = None
- # When the job finished (= switched from RUNNING)
- self.finish_time = None
+ # When the job started (= switched to RUNNING), wall clock time
+ self.start_clocktime = None
+ # When the job started (= switched to RUNNING), monotonic clock time
+ self.start_monotime = None
+ # When the job finished (= switched from RUNNING), monotonic clock time
+ self.finish_monotime = None
# Array of CIMInstanceNames of affected elements, so we can
# enumerate associations to them.
@@ -182,11 +185,11 @@ class Job(object):
expected that the callback will take some time to execute. The callback
must change state of the job and set output parameters and error in a
thread-safe way, i.e. by calling ``finish_method()``.
-
+
:param callback: (``function``) Reference to callback to call.
:param args, kwargs: All other parameters will be passed to the
callback. It is highly recommended to add reference to the job
- to the callback.
+ to the callback.
"""
self._execute = callback
self._execargs = args
@@ -197,11 +200,11 @@ class Job(object):
"""
Set callbacks, which will be called when the job is to be
cancelled. The callback must be quick, the job is already locked!
-
+
:param callback: (``function``) Reference to callback to call.
:param args, kwargs: All other parameters will be passed to the
callback. It is highly recommended to add reference to the job
- to the callback.
+ to the callback.
"""
self._cancel = callback
self._cancelargs = args
@@ -214,7 +217,7 @@ class Job(object):
Mark the job as finished, with given return value, output parameters and
error.
This method is thread-safe.
-
+
:param new_state: (``Job.STATE_* value``) Resulting state of the job.
:param return_value: (``string``) Return value of the job, encoded
into string. Can be None when the job does not return any value.
@@ -246,7 +249,7 @@ class Job(object):
Change state of a job. (Re-)calculate various times based on the state
change. Send indications as necessary.
This method is thread-safe.
-
+
:param new_state: (``Job.STATE_* value``) New state of the job.
It can be the same as the previous state to indicate progress of
the job.
@@ -274,10 +277,12 @@ class Job(object):
if (self.job_state not in self.FINAL_STATES
and new_state in self.FINAL_STATES):
# Remember finish time
- self.finish_time = datetime.utcnow()
+ self.finish_clocktime = datetime.utcnow()
+ self.finish_monotime = self.timer_manager.now()
# Remember job execution time.
- if self.start_time:
- self.elapsed_time = self.finish_time - self.start_time
+ if self.start_monotime:
+ self.elapsed_time = self.finish_monotime \
+ - self.start_monotime
# Send indication
if self.job_state == self.STATE_FAILED:
indication_ids.append(JobManager.IND_JOB_FAILED)
@@ -286,9 +291,10 @@ class Job(object):
# Check if the job has just started
if new_state == self.STATE_RUNNING:
- self.start_time = datetime.utcnow()
+ self.start_clocktime = datetime.utcnow()
+ self.start_monotime = self.timer_manager.now()
- self.time_of_last_state_change = datetime.now()
+ self.clocktime_of_last_state_change = datetime.utcnow()
self.job_state = new_state
if percent is None:
@@ -302,7 +308,7 @@ class Job(object):
if self.percent_complete != percent:
# Remember to send indications
if not send_indication:
- self.time_of_last_state_change = datetime.now()
+ self.clocktime_of_last_state_change = datetime.utcnow()
prev_instance = self.job_manager.get_job_instance(self)
send_indication = True
indication_ids.append(JobManager.IND_JOB_PERCENT_UPDATED)
@@ -317,13 +323,15 @@ class Job(object):
self._restart_timer()
self.unlock()
+ @cmpi_logging.trace_method
def _expire(self):
"""
Callback when a Job completes and time_before_removal second passed.
The job gets removed from its JobManager.
"""
- # We cannot log here, this method is executed in job's Timer thread,
- # which is not registered at the cimom.
+ cmpi_logging.logger.debug("Job %s: %s expired"
+ % (self.the_id, self.job_name))
+
self.job_manager.remove_job(self)
@cmpi_logging.trace_method
@@ -342,17 +350,24 @@ class Job(object):
# Start the new timer.
if self.delete_on_completion:
- now = datetime.utcnow()
- passed = now - self.finish_time
- timeout = self.time_before_removal - passed.total_seconds()
+ now = self.timer_manager.now()
+ passed = now - self.finish_monotime
+ timeout = self.time_before_removal - passed
+ if timeout <= 0:
+ # Just in case...
+ self._expire()
+ return
+
cmpi_logging.logger.debug("Starting timer for job %s: '%s' for %f"
" seconds" % (self.the_id, self.job_name, timeout))
- self.timer = threading.Timer(timeout, self._expire)
- self.timer.start()
+ self.timer = self.timer_manager.create_timer(
+ "Job " + self.job_name,
+ callback=self._expire)
+ self.timer.start(timeout)
@cmpi_logging.trace_method
def lock(self):
- """
+ """
Lock internal mutex. Other threads will block on subsequent lock().
The lock is recursive, i.e. can be called multiple times from
single thread.
@@ -369,7 +384,7 @@ class Job(object):
"""
Start executing the job. It calls the execute callback, set by
``set_execute_action()``.
-
+
job_state must be already set to STATE_RUNNING.
Any exception is translated to CIMError and appropriate state is set.
"""
@@ -402,7 +417,7 @@ class Job(object):
def get_name(self):
"""
Return CIMInstanceName of the job.
-
+
:rtype: ``CIMInstanceName``
"""
name = pywbem.CIMInstanceName(
@@ -417,7 +432,7 @@ class Job(object):
def get_instance_id(self, classname=None):
"""
Return InstanceID.
-
+
:param classname: (``string``) Optional classname to generate InstanceID
for different class, e.g. for LMI_<name>MethodResult.
:rtype: ``string``
@@ -430,7 +445,7 @@ class Job(object):
def parse_instance_id(instance_id, job_manager, classname=None):
"""
Return the last part of instance_id.
-
+
:param instance_id: (``string``) InstanceID to parse.
:param job_manager: (``JobManager``) JobManager to query for Job's
classname.
@@ -438,7 +453,7 @@ class Job(object):
JobManager's job_classname will be used for parsing. Other
classnames may be used to parse e.g. LMI_<name>MethodResult
InstanceIDs.
-
+
:rtype: ``string`` or None if the ``instance_id`` has wrong format.
"""
if classname is None:
@@ -456,10 +471,10 @@ class Job(object):
@cmpi_logging.trace_method
def get_pre_call(self):
- """
+ """
Return indication that describes the pre-execution values of the
job's invocation.
-
+
:rtype: ``CIMInstance of CIM_InstMethodCall``
"""
path = pywbem.CIMInstanceName(
@@ -481,10 +496,10 @@ class Job(object):
@cmpi_logging.trace_method
def get_post_call(self):
- """
+ """
Return indication that describes the post-execution values of the
job's invocation.
-
+
:rtype: ``CIMInstance of CIM_InstMethodCall``
"""
path = pywbem.CIMInstanceName(
@@ -525,7 +540,7 @@ class Job(object):
def _get_cim_instance(self):
"""
Return CIMInstance of this job.
-
+
:rtype: CIMInstance
"""
return self.job_manager.get_job_instance(self)
@@ -583,44 +598,45 @@ class Job(object):
class JobManager(object):
"""
Container of all queued, running or finished ``LMI_ConcreteJobs``.
-
+
Usage:
1. Create MOF file for these classes:
-
+
* ``LMI_<name>Job``
-
+
* ``LMI_<name>MethodResult``
-
+
* ``LMI_Affected<name>JobElement``
-
+
* ``LMI_Owning<name>JobElement``
-
+
* ``LMI_Associated<name>JobMethodResult``
-
+
Where ``<name>`` is prefix of your classes, for example 'Storage'
- 2. During initialization, create ``JobManager``.
-
+ 2. During initialization, initialize ``TimerManager`` and create
+ ``JobManager``.
+
3. When needed. create new Job instance:
4. Set its execute callback using ``set_execute_action()``. This callback
will be called when the job is to be executed. It will be called in
context of ``JobManager`` worker thread!
-
+
5. Optionally, set cancel callback using ``set_execute_action()``. This
callback will be called when the job is still queued and is cancelled by
application. This callback will be called in context of CIMOM callback
and should be quick!
-
+
6. Enqueue the job using ``JobManager.add_job()`` method.
-
+
7. When your execute callback is called, you can optionally call
``job.change_state()`` to update percentage of completion.
-
+
8. When your execute callback is finished, don't forget to set method
result using ``job.finish_method()``.
-
+
* ``JobManager`` automatically sends all job-related indications.
* ``Job`` automatically tracks various timestamps.
* By default, the job automatically disappears after 60 seconds after it
@@ -636,17 +652,18 @@ class JobManager(object):
IND_JOB_CREATED = "Created"
@cmpi_logging.trace_method
- def __init__(self, name, namespace, indication_manager):
- """
+ def __init__(self, name, namespace, indication_manager, timer_manager):
+ """
Initialize new Manager. It automatically registers all job-related
filters to indication_manager and starts a worker thread.
-
+
:param name: (``string``) String with classname infix. For example
'Storage' for ``LMI_StorageJob``, ``LMI_StorageJobMethodResult``
etc.
- :param namespace: (``string``) Namespace of all providers.
+ :param namespace: (``string``) Namespace of all providers.
:param indication_manager: (``IndicationManager``): a manager where
- indications and filters should be added.
+ indications and filters should be added.
+ :param timer_manager: (``TimerManager``): Timer manager instance.
"""
# List of all jobs. Dictionary job_id -> Job.
self.jobs = {}
@@ -660,6 +677,7 @@ class JobManager(object):
self.providers = {}
self.namespace = namespace
self.indication_manager = indication_manager
+ self.timer_manager = timer_manager
# Start the worker thread (don't forget to register it at CIMOM)
self.worker = threading.Thread(target=self._worker_main)
@@ -734,7 +752,7 @@ class JobManager(object):
* ``LMI_Affected<name>JobElement``
* ``LMI_Owning<name>JobElement``
* ``LMI_Associated<name>JobMethodResult``
-
+
:rtype: dictionary class_name -> CIMProvider2
"""
@@ -765,7 +783,7 @@ class JobManager(object):
def add_job(self, job):
"""
Enqueue new job. Send indication when needed.
-
+
:param job: (``Job``) A job to enqueue.
"""
cmpi_logging.logger.debug("Job %s: '%s' enqueued"
@@ -785,7 +803,7 @@ class JobManager(object):
"""
Send InstModification. This is helper method called by ``Job`` when
needed.
-
+
:param prev_instance: Instance of ``LMI_<name>Job`` before it was
modified.
:param current_instance: Instance of ``LMI_<name>Job`` after it was
@@ -795,15 +813,16 @@ class JobManager(object):
self.indication_manager.send_instmodification(prev_instance,
current_instance, _id)
+ @cmpi_logging.trace_method
def remove_job(self, job):
"""
Remove existing job. Note that jobs are removed automatically after a
timeout, providers should not call this method directly.
-
+
:param job: (``Job``) Job to remove.
"""
- # We cannot log here, this method is executed in job's Timer thread,
- # which is not registered at the cimom.
+ cmpi_logging.logger.debug("Job %s: '%s' removed from queue."
+ % (job.the_id, job.job_name))
del self.jobs[job.the_id]
# The job may still be in the queue!
# There is no way, how to remove it, it will be skipped by the
@@ -813,7 +832,7 @@ class JobManager(object):
def get_job_for_instance_id(self, instance_id, classname=None):
"""
Return Job for given InstanceID or None when no such Job exist.
-
+
:param instance_id: (``string``) InstanceID value to parse.
:param classname: (``string``) Optional classname to parse the
InstanceID (e.g. when parsing InstanceID of
@@ -862,7 +881,7 @@ class JobManager(object):
def get_next_id(self):
"""
Return next unused job id.
-
+
:rtype: string
"""
self.last_instance_id += 1
@@ -872,7 +891,7 @@ class JobManager(object):
def get_job_instance(self, job):
"""
Return CIMInstance for given job.
-
+
:param job: (``Job``)
:rtype: ``CIMInstance``
"""
@@ -912,7 +931,7 @@ class LMI_ConcreteJob(CIMProvider2):
def get_job_states(self, job):
"""
Return JobState and OperationalStatus property values.
-
+
:param job: (``int``) Job.STATE_* value.
:rtype: tuple ``(JobState, OperationalStatus)`` values.
"""
@@ -965,9 +984,9 @@ class LMI_ConcreteJob(CIMProvider2):
value=None,
type='datetime')
- if job.time_of_last_state_change:
+ if job.clocktime_of_last_state_change:
model['TimeOfLastStateChange'] = pywbem.CIMDateTime(
- job.time_of_last_state_change)
+ job.clocktime_of_last_state_change)
else:
model['TimeOfLastStateChange'] = pywbem.CIMProperty(
name='TimeOfLastStateChange',
@@ -975,7 +994,8 @@ class LMI_ConcreteJob(CIMProvider2):
type='datetime')
if job.elapsed_time:
- model['ElapsedTime'] = pywbem.CIMDateTime(job.elapsed_time)
+ elapsed_time = timedelta(seconds=job.elapsed_time)
+ model['ElapsedTime'] = pywbem.CIMDateTime(elapsed_time)
else:
model['ElapsedTime'] = pywbem.CIMProperty(
name='ElapsedTime',
@@ -985,8 +1005,8 @@ class LMI_ConcreteJob(CIMProvider2):
model['Description'] = job.job_name
model['LocalOrUtcTime'] = self.Values.LocalOrUtcTime.UTC_Time
model['PercentComplete'] = pywbem.Uint16(job.percent_complete)
- if job.start_time:
- model['StartTime'] = pywbem.CIMDateTime(job.start_time)
+ if job.start_clocktime:
+ model['StartTime'] = pywbem.CIMDateTime(job.start_clocktime)
else:
model['StartTime'] = pywbem.CIMProperty(
name='StartTime',
@@ -1016,12 +1036,12 @@ class LMI_ConcreteJob(CIMProvider2):
"""Return a newly created or modified instance.
:param env: Provider Environment (pycimmb.ProviderEnvironment)
- :param instance: The new pywbem.CIMInstance. If modifying an existing
- instance, the properties on this instance have been filtered by
+ :param instance: The new pywbem.CIMInstance. If modifying an existing
+ instance, the properties on this instance have been filtered by
the PropertyList from the request.
:param modify_existing: True if ModifyInstance, False if CreateInstance
- Return the new instance. The keys must be set on the new instance.
+ Return the new instance. The keys must be set on the new instance.
"""
if not modify_existing:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED,
@@ -1069,7 +1089,7 @@ class LMI_ConcreteJob(CIMProvider2):
"""Delete an instance.
:param env: Provider Environment (pycimmb.ProviderEnvironment)
- :param instance_name: A pywbem.CIMInstanceName specifying the instance
+ :param instance_name: A pywbem.CIMInstanceName specifying the instance
to delete.
"""
job = self.job_manager.get_job_for_instance_id(
@@ -1089,23 +1109,23 @@ class LMI_ConcreteJob(CIMProvider2):
If JobState is "Completed" and Operational Status is "Completed"
then no instance of CIM_Error is returned.
-
+
If JobState is "Exception" then GetErrors may return intances of
CIM_Error related to the execution of the procedure or method invoked by
the job.
-
+
If Operatational Status is not "OK" or "Completed" then
GetErrors may return CIM_Error instances related to the running of
the job.
-
+
:param env: -- Provider Environment (pycimmb.ProviderEnvironment)
- :param object_name: -- A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method GetErrors()
+ :param object_name: -- A pywbem.CIMInstanceName or pywbem.CIMCLassName
+ specifying the object on which the method GetErrors()
should be invoked.
Output parameters:
-
- * Errors -- (type pywbem.CIMInstance(classname='CIM_Error', ...))
+
+ * Errors -- (type pywbem.CIMInstance(classname='CIM_Error', ...))
If the OperationalStatus on the Job is not "OK", then this
method will return one or more CIM Error instance(s).
Otherwise, when the Job is "OK", null is returned.
@@ -1141,17 +1161,17 @@ class LMI_ConcreteJob(CIMProvider2):
Requests that the state of the job be changed to the value
specified in the RequestedState parameter. Invoking the
RequestStateChange method multiple times could result in earlier
- requests being overwritten or lost.
-
+ requests being overwritten or lost.
+
If 0 is returned, then the
task completed successfully. Any other return code indicates an
error condition.
-
+
:param env: Provider Environment (pycimmb.ProviderEnvironment)
- :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method RequestStateChange()
+ :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName
+ specifying the object on which the method RequestStateChange()
should be invoked.
- :param param_requestedstate: The input parameter RequestedState (type pywbem.Uint16 self.Values.RequestStateChange.RequestedState)
+ :param param_requestedstate: The input parameter RequestedState (type pywbem.Uint16 self.Values.RequestStateChange.RequestedState)
RequestStateChange changes the state of a job. The possible
values are as follows: Start (2) changes the state to
\'Running\'. Suspend (3) stops the job temporarily. The
@@ -1164,8 +1184,8 @@ class LMI_ConcreteJob(CIMProvider2):
data or preserve the state. Service (6) puts the job into a
vendor-specific service state. It might be possible to restart
the job.
-
- :param param_timeoutperiod: -- The input parameter TimeoutPeriod (type pywbem.CIMDateTime)
+
+ :param param_timeoutperiod: -- The input parameter TimeoutPeriod (type pywbem.CIMDateTime)
A timeout period that specifies the maximum amount of time that
the client expects the transition to the new state to take.
The interval format must be used to specify the TimeoutPeriod.
@@ -1231,21 +1251,21 @@ class LMI_ConcreteJob(CIMProvider2):
GetError is deprecated because Error should be an array,not a
scalar.
-
+
When the job is executing or has terminated without
error, then this method returns no CIM_Error instance. However, if
the job has failed because of some internal problem or because the
job has been terminated by a client, then a CIM_Error instance is
returned.
-
+
:param env: Provider Environment (pycimmb.ProviderEnvironment)
- :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName
- specifying the object on which the method GetError()
+ :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName
+ specifying the object on which the method GetError()
should be invoked.
Output parameters:
-
- * Error -- (``pywbem.CIMInstance(classname='CIM_Error', ...)``)
+
+ * Error -- (``pywbem.CIMInstance(classname='CIM_Error', ...)``)
If the OperationalStatus on the Job is not "OK", then this
method will return a CIM Error instance. Otherwise, when the
Job is "OK", null is returned.
@@ -1519,7 +1539,7 @@ class LMI_MethodResult(CIMProvider2):
yield self.get_instance(env, model, job)
class LMI_AssociatedJobMethodResult(CIMProvider2):
- """
+ """
Instrumentation of LMI_AssociatedJobMethodResult class and its
subclasses.
"""
diff --git a/src/python/openlmi/common/TimerManager.py b/src/python/openlmi/common/TimerManager.py
new file mode 100644
index 0000000..5eb39ae
--- /dev/null
+++ b/src/python/openlmi/common/TimerManager.py
@@ -0,0 +1,400 @@
+# Copyright (C) 2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Jan Safranek <jsafrane@redhat.com>
+# -*- coding: utf-8 -*-
+"""
+Module with functionality to create timers, which can be used in CMPI providers.
+
+Default python threading.Timer is not suitable, because it creates thread
+for each timer, which is inefficient. In addition, each such thread would need
+to be registered at CIMOM to enable logging in timer callbacks.
+
+Usage:
+
+1. Initialize the TimerManager when your provider initializes!
+Otherwise you may encounter weird exceptions.
+
+2. When any provider needs timer, create it using Time.create_timer() to create
+Timer instance.
+
+3. Call Timer.start() to start the timer. It will call registered callback
+when the timer expires. The callback is called in context of TimerManager
+thread, which has enabled logging to CIMOM, i.e. the callback can log as usual.
+
+4. (optionally) cancel the timer before expiration using Timer.cancel().
+However, this does not guarantee that the timer callback won't be called -
+it may be already being scheduled / called.
+
+.. autoclass:: TimerManager
+ :members:
+
+.. autoclass:: Timer
+ :members:
+
+.. autoclass:: MonotonicClock
+ :members:
+"""
+
+import ctypes
+import openlmi.common.singletonmixin as singletonmixin
+import threading
+import Queue
+import openlmi.common.cmpi_logging as cmpi_logging
+
+class TimerException(Exception):
+ pass
+
+class MonotonicClock(object):
+ """
+ Monotonic clock, represented by clock_gettime() and CLOCK_MONOTONIC.
+ This clock is not influenced by NTP or administrator setting time or date.
+ """
+ CLOCK_MONOTONIC = ctypes.c_int(1)
+
+ class timespec(ctypes.Structure):
+ _fields_ = [
+ ("tv_sec", ctypes.c_long),
+ ("tv_nsec", ctypes.c_long)]
+
+ def __init__(self):
+ libc = ctypes.CDLL("librt.so.1")
+ self._clock_gettime = libc.clock_gettime
+
+ def now(self):
+ """
+ Return current time, i.e. float representing seconds with precision up
+ to nanoseconds (depends on glibc). The actual value of current time is
+ meaningless, it can be used only to measure time differences.
+
+ :returns: ``float`` with current time in seconds.
+ """
+ t = MonotonicClock.timespec(0, 0)
+ ret = self._clock_gettime(self.CLOCK_MONOTONIC, ctypes.pointer(t))
+
+ if ret < 0:
+ raise TimerException("Cannot get clock time, clock_gettime() failed.")
+ return t.tv_sec + t.tv_nsec * 10 ** (-9)
+
+class Timer(object):
+ """
+ A class representing a timer. A timer has a timeout and after the timeout,
+ given callback is called and the timer is deleted.
+ """
+ @cmpi_logging.trace_method
+ def __init__(self, timer_manager, name, callback=None, *args, **kwargs):
+ """
+ Create a timer. If specified, given callback is registered.
+ The callback is called with *args and **kwargs.
+
+ :param timer_manager: (``TimerManager)`` Instance of the timer manager
+ which will manage the timer.
+ :param name: (``string``) Name of the timer, used for logging.
+ :param callback: (``function``) Callback to call when the timer expires.
+ :param *args, **kwargs: Parameters of the callback.
+ """
+ self._mgr = timer_manager
+ self._name = name
+ self._callback = callback
+ self._args = args
+ self._kwargs = kwargs
+
+ cmpi_logging.logger.trace_info("Timer: Timer %s created" % name)
+
+ @cmpi_logging.trace_method
+ def set_callback(self, callback, *args, **kwargs):
+ """
+ Set callback to call when the timer expires.
+
+ :param callback: (``function``) Callback to call when the timer expires.
+ :param *args, **kwargs: Parameters of the callback.
+ """
+ self._callback = callback
+ self._args = args
+ self._kwargs = kwargs
+
+ @cmpi_logging.trace_method
+ def start(self, timeout):
+ """
+ Start the timer with given timeout. After the timeout, the registered
+ callback will be called.
+
+ :param timeout: (``float``) Timeout in seconds.
+ """
+
+ self._timeout = timeout
+ now = self._mgr.now()
+ self._end_time = now + timeout
+ cmpi_logging.logger.trace_info(
+ "Timer: Timer %s started at %f for %f seconds"
+ % (self._name, now, self._timeout))
+ self._mgr._add_timer(self)
+
+ @cmpi_logging.trace_method
+ def cancel(self):
+ """
+ Cancel the timer. This method does not guarantee that the callback won't
+ be called, the timer might be calling the callback right now,
+ """
+ cmpi_logging.logger.trace_info("Timer: Timer %s cancelled"
+ % (self._name))
+ self._mgr._remove_timer(self)
+
+ @cmpi_logging.trace_method
+ def _expired(self, now):
+ """
+ Returns True, if the timer is expired.
+
+ :param now: (``float``) Current time, as returned by MonotonicClock.now().
+ :returns: (``boolean``) ``True``, if the timer is expired.
+ """
+ if self._end_time <= now:
+ cmpi_logging.logger.trace_info("Timer: Timer %s has expired"
+ % (self._name))
+ return True
+ return False
+
+ @cmpi_logging.trace_method
+ def _expire(self):
+ """
+ Called when the timer expired. It calls the callback.
+ """
+ cmpi_logging.logger.trace_info("Timer: Calling callback for timer %s"
+ % (self._name))
+ self._callback(*self._args, **self._kwargs)
+
+class TimerManager(singletonmixin.Singleton):
+ """
+ Manages set of timers.
+
+ Python standard Timer class creates a thread for
+
+ each timer, which is inefficient. This class uses only one thread, which
+ is registered at CIMOM, i.e. it can log as usual.
+
+ This class is singleton, use TimerManager.get_instance() to get the
+ instance.
+
+ Still, the singleton needs to be initialized with ProviderEnvironment to
+ enable logging in the timer thread. Use TimerManager.get_instance(env) in
+ you provider initialization.
+ """
+
+ @cmpi_logging.trace_method
+ def __init__(self, env=None):
+ """
+ Initialize new thread manager.
+
+ :param env: (``ProviderEnvironment``) Environment to use for logging.
+ """
+ self._clock = MonotonicClock()
+ self._lock = threading.RLock()
+ self._queue = Queue.Queue()
+
+ # Array of timers. Assumption: nr. of timers is relatively small,
+ # i.e. hundreds at the worst.
+ self._timers = []
+
+ new_broker = None
+ if env:
+ broker = env.get_cimom_handle()
+ new_broker = broker.PrepareAttachThread()
+
+ self._timer_thread = threading.Thread(
+ target=self._timer_loop, args=(new_broker,))
+ self._timer_thread.daemon = True
+ self._timer_thread.start()
+
+ def create_timer(self, name, callback=None, *args, **kwargs):
+ """
+ Create new timer. If specified, given callback is registered.
+ The callback is called with *args and **kwargs.
+
+ :param name: (``string``) Name of the timer, used for logging.
+ :param callback: (``function``) Callback to call when the timer expires.
+ :param *args, **kwargs: Parameters of the callback.
+ """
+ return Timer(self, name, callback, *args, **kwargs)
+
+ def _timer_loop(self, broker):
+ """
+ TimerManager thread main loop. It waits for timeout of all timers
+ and calls their callbacks.
+
+ :param broker: (``BrokerCIMOMHandle``) CIM broker handle, used for
+ logging.
+ """
+ if broker:
+ broker.AttachThread()
+ while True:
+ self._handle_expired()
+ timeout = self._find_timeout()
+ if timeout != 0:
+ # Wait for the timeout or any change in timers.
+ try:
+ self._queue.get(timeout=timeout)
+ self._queue.task_done()
+ except Queue.Empty:
+ # Timeout has happened, ignore the exception.
+ pass
+
+ @cmpi_logging.trace_method
+ def _handle_expired(self):
+ """
+ Finds all expired timers, calls their callback and removes them from
+ list of timers.
+ """
+
+ # Get list of expired timers.
+ with self._lock:
+ now = self.now()
+ cmpi_logging.logger.trace_info(
+ "Timer: Checking for expired, now=%f." % (now))
+ expired = [t for t in self._timers if t._expired(now)]
+
+ # Call the callbacks (unlocked!).
+ for t in expired:
+ t._expire()
+
+ # Remove the timers (locked).
+ with self._lock:
+ for t in expired:
+ try:
+ cmpi_logging.logger.trace_info(
+ "Timer: Removing %s" % (t._name))
+ self._timers.remove(t)
+ except ValueError:
+ # The timer has already been removed.
+ pass
+
+ @cmpi_logging.trace_method
+ def _find_timeout(self):
+ """
+ Return nearest timeout, in seconds (as float, i.e. subsecond timeout
+ is possible). If no timer is scheduled, None is returned.
+ If there are expired timers, 0 is returned.
+
+ :returns: Positive ``float``: Nearest timeout.
+ :returns: ``0``: Some timer has expired.
+ :returns: ``None``: No timer is scheduled.
+ """
+ with self._lock:
+ if not self._timers:
+ cmpi_logging.logger.trace_info(
+ "Timer: No timers scheduled, waiting forever.")
+ return None
+ closest = min(self._timers, key=lambda timer: timer._end_time)
+ now = self.now()
+ timeout = closest._end_time - now
+ if timeout > 0:
+ cmpi_logging.logger.trace_info(
+ "Timer: Waiting for %f seconds, now=%f."
+ % (timeout, now))
+ return timeout
+ cmpi_logging.logger.trace_info(
+ "Timer: Some timer has already expired, no waiting.")
+ return 0
+
+ @cmpi_logging.trace_method
+ def _add_timer(self, timer):
+ """
+ Adds timer to list of timers. The timer must be started, i.e. its
+ timeout must be nozero!
+ This is internal method called by Timer.start().
+
+ :param timer: (``Timer``) Timer to add.
+ """
+ with self._lock:
+ self._timers.append(timer)
+ # Wake up the timer manager thread.
+ self._queue.put("")
+ cmpi_logging.logger.trace_info("Timer: Timer %s added" % (timer._name))
+
+ @cmpi_logging.trace_method
+ def _remove_timer(self, timer):
+ """
+ Remove timer from list of timers.
+ This is internal method called by Timer.cancel().
+ :param timer: (``Timer``) Timer to remove.
+ """
+ with self._lock:
+ try:
+ self._timers.remove(timer)
+ except ValueError:
+ pass
+ # Wake up the timer manager thread.
+ self._queue.put("")
+ cmpi_logging.logger.trace_info("Timer: Timer %s removed"
+ % (timer._name))
+
+ def now(self):
+ """
+ Return current time, not influenced by NTP or admin setting date or
+ time. The actual value of current time is meaningless, it can be used
+ only to measure time differences.
+
+ :returns: ``float`` Current time, in seconds.
+ """
+ return self._clock.now()
+
+if __name__ == "__main__":
+ cmpi_logging.logger = cmpi_logging.CMPILogger("")
+ import time
+
+ class Env(object):
+ def AttachThread(self):
+ pass
+ def PrepareAttachThread(self):
+ return self
+ def get_cimom_handle(self):
+ return self
+
+ clock = MonotonicClock()
+
+ start = clock.now()
+ time.sleep(0.5)
+ print "Clock 0.5:", clock.now() - start
+
+ time.sleep(0.5)
+ print "Clock 1:", clock.now() - start
+
+ mgr = TimerManager.get_instance(Env())
+
+ def callback(msg):
+ if callback.first:
+ t = mgr.create_timer("internal 0.5")
+ t.set_callback(callback, "internal 0.5")
+ t.start(0.5)
+ callback.first = False
+
+ print clock.now(), msg
+
+ callback.first = True
+
+ t1 = mgr.create_timer("one second")
+ t1.set_callback(callback, "1")
+ t1.start(1)
+ t2 = mgr.create_timer("two seconds")
+ t2.set_callback(callback, "2")
+ t2.start(2)
+ t22 = mgr.create_timer("two seconds 2")
+ t22.set_callback(callback, "2 again")
+ t22.start(2)
+ t15 = mgr.create_timer("one+half seconds")
+ t15.set_callback(callback, "1.5")
+ t15.start(1.5)
+
+ time.sleep(4)
diff --git a/src/python/openlmi/common/cmpi_logging.py b/src/python/openlmi/common/cmpi_logging.py
index c8755f5..d9fb5e8 100644
--- a/src/python/openlmi/common/cmpi_logging.py
+++ b/src/python/openlmi/common/cmpi_logging.py
@@ -146,7 +146,7 @@ class LogManager(object):
self.logger = logging.getLogger(self.LOGGER_NAME)
self.logger.addHandler(self.cmpi_handler)
- self.logger.setLevel(logging.DEBUG)
+ self.logger.setLevel(logging.INFO)
self.stderr_handler = None
self.config = None