diff options
author | Michal Minar <miminar@redhat.com> | 2013-07-03 18:15:23 +0200 |
---|---|---|
committer | Michal Minar <miminar@redhat.com> | 2013-07-04 10:33:48 +0200 |
commit | a4e6cc3c9f273cbbf911340edd02fa17a8379917 (patch) | |
tree | 11e5616c41385a9a9a455f4874e12cf5bbd0c376 /src/python/openlmi/common/JobManager.py | |
parent | 4c0ec0ef72a6aef0413b9e8eca9380bd31bf4444 (diff) | |
download | openlmi-providers-a4e6cc3c9f273cbbf911340edd02fa17a8379917.tar.gz openlmi-providers-a4e6cc3c9f273cbbf911340edd02fa17a8379917.tar.xz openlmi-providers-a4e6cc3c9f273cbbf911340edd02fa17a8379917.zip |
renamed openlmi namespace to lmi
To comply with lmi shell, which is placed in *lmi* package, and to
make our imports shorter, we are renaming *openlmi* namespace to *lmi*.
Diffstat (limited to 'src/python/openlmi/common/JobManager.py')
-rw-r--r-- | src/python/openlmi/common/JobManager.py | 1654 |
1 files changed, 0 insertions, 1654 deletions
diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py deleted file mode 100644 index 9f977c9..0000000 --- a/src/python/openlmi/common/JobManager.py +++ /dev/null @@ -1,1654 +0,0 @@ -# Copyright (C) 2013 Red Hat, Inc. All rights reserved. -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License as published by the Free Software Foundation; either -# version 2.1 of the License, or (at your option) any later version. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA -# -# Authors: Jan Safranek <jsafrane@redhat.com> -# -*- coding: utf-8 -*- -""" - Basic infrastructure for asynchronous jobs. All necessary CIM classes and - indications are implemented here. - - .. autoclass:: JobManager - :members: - - .. autoclass:: Job - :members: - - .. autoclass:: LMI_ConcreteJob - :members: - - .. autoclass:: LMI_OwningJobElement - :members: - - .. autoclass:: LMI_AffectedJobElement - :members: - - .. autoclass:: LMI_MethodResult - :members: - - .. autoclass:: LMI_AssociatedJobMethodResult - :members: -""" - -from datetime import datetime, timedelta -import threading -from Queue import Queue -import pywbem -import openlmi.common.cmpi_logging as cmpi_logging -import openlmi.common -from openlmi.common.IndicationManager import IndicationManager -from pywbem.cim_provider2 import CIMProvider2 -import socket -import traceback - -@cmpi_logging.trace_function -def register_filters(job_clsname, indication_manager=None): - """ - This function registers static indication filters at IndicationManager. - It should be called upon provider's initialization. - - :param job_clsname: (``String``) CIM class name for asynchonous jobs. - Will be part of filter queries. - :param indication_manager: If not given, global instance will be obtained. - """ - if indication_manager is None: - ind_manager = IndicationManager.get_instance() - filters = {} - query_args = { - "classname" : job_clsname, - "prefix" : indication_manager.nameprefix - } - for fltr_id, fltr_props in JobManager.IND_FILTERS.items(): - filters[fltr_id] = fltr_props.copy() - filters[fltr_id]['Query'] = fltr_props['Query'] % query_args - indication_manager.add_filters(job_clsname, filters) - -# Too many instance attributes -# pylint: disable-msg=R0902 -class Job(object): - """ - Generic abstract class representing one CIM_ConcreteJob. - It remembers input and output arguments, affected ManagedElements and - owning ManagedElement (to be able to create associations to them) - and all CIM_ConcreteJob properties. - - Due to multiple threads processing the job, each job has its own - lock to guard its status changes. It is expected that number of jobs - is quite low. - """ - - DEFAULT_TIME_BEFORE_REMOVAL = 60 # in seconds - - STATE_QUEUED = 1 # Job has not started yet - STATE_RUNNING = 2 # Job is running - STATE_FINISHED_OK = 3 # Job finished OK - STATE_FAILED = 4 # Job finished with error - STATE_SUSPENDED = 5 # Job is queued and suspended - STATE_TERMINATED = 6 # Job was queued and terminated - - FINAL_STATES = [STATE_FINISHED_OK, STATE_FAILED, STATE_SUSPENDED, - STATE_TERMINATED] - - # There is no way how to suspend/terminate running job! - - @cmpi_logging.trace_method - def __init__(self, job_manager, job_name, input_arguments, - method_name, affected_elements, owning_element): - """ - Create new storage job. - - :param job_manager: (``JobManager``) Reference to ``JobManager``, which - will manage this job. - :param job_name: (``string``) User-friendly name of the job. - :param input_arguments: (``dictionary param_name -> param_value``) - Input arguments of the method, which spawned this job. - :param method_name: (``string``) Name of the CIM method, which spawned - this job. - :param affected_elements: (``array of CIMInstanceName``) List of - affected elements. ``LMI_AffectedJobElement`` association will be - created for them. - :param owning_element: (``CIMInstanceName``) Reference to service, which - spawned the job. ``LMI_OwningJobElement`` association will be - created for it. - """ - self.job_manager = job_manager - self.timer_manager = job_manager.timer_manager - - # Unique ID - self.the_id = job_manager.get_next_id() - - # User friendly name of the job - self.job_name = job_name - - # Dictionary of input arguments, 'parameter_name' -> 'parameter_value' - # The parameter value must be CIMProperty or something that can be - # assigned to it. - self.input_arguments = input_arguments - - # Dictionary of output arguments, 'parameter_name' -> 'parameter_value' - # The parameter value must be CIMProperty or something that can be - # assigned to it. - self.output_arguments = None - - # Method return value, as CIMProperty or something that can be - # assigned to it. - self.return_value = None - # Value of Job.ReturnValueType - self.return_value_type = None - - # Name of the method - self.method_name = method_name - - # Time when the job was created - self.time_submitted = datetime.utcnow() - - # Nr. of seconds before the job is removed when the job finishes - self.time_before_removal = self.DEFAULT_TIME_BEFORE_REMOVAL - - # If the job should be removed after completion - self.delete_on_completion = True - - self.percent_complete = 0 - - # State of the job - self.job_state = self.STATE_QUEUED - - # Last change of job state, wall clock time - self.clocktime_of_last_state_change = self.time_submitted - - # Duration of the job in RUNNING state (in seconds) - self.elapsed_time = None - - # When the job started (= switched to RUNNING), wall clock time - self.start_clocktime = None - # When the job started (= switched to RUNNING), monotonic clock time - self.start_monotime = None - # When the job finished (= switched from RUNNING), monotonic clock time - self.finish_monotime = None - - # Array of CIMInstanceNames of affected elements, so we can - # enumerate associations to them. - self.affected_elements = affected_elements - - # CIMInstanceName to owning element (service), so we can enumerate - # instances. - self.owning_element = owning_element - - # Timer used to delete the job after time_before_removal seconds - self.timer = None - - # CIMError with result code - self.error = None - - # internal lock to protect state changes from races - self._lock = threading.RLock() - - self._execute = None - self._execargs = None - self._execkwargs = None - self._cancel = None - self._cancelargs = None - self._cancelkwargs = None - - @cmpi_logging.trace_method - def set_execute_action(self, callback, *args, **kwargs): - """ - Set callback, which will be called when the job is to be executed. It is - expected that the callback will take some time to execute. The callback - must change state of the job and set output parameters and error in a - thread-safe way, i.e. by calling ``finish_method()``. - - :param callback: (``function``) Reference to callback to call. - :param args, kwargs: All other parameters will be passed to the - callback. It is highly recommended to add reference to the job - to the callback. - """ - self._execute = callback - self._execargs = args - self._execkwargs = kwargs - - @cmpi_logging.trace_method - def set_cancel_action(self, callback, *args, **kwargs): - """ - Set callbacks, which will be called when the job is to be - cancelled. The callback must be quick, the job is already locked! - - :param callback: (``function``) Reference to callback to call. - :param args, kwargs: All other parameters will be passed to the - callback. It is highly recommended to add reference to the job - to the callback. - """ - self._cancel = callback - self._cancelargs = args - self._cancelkwargs = kwargs - - @cmpi_logging.trace_method - def finish_method(self, new_state, return_value=None, return_type=None, - output_arguments=None, error=None, affected_elements=None): - """ - Mark the job as finished, with given return value, output parameters and - error. - This method is thread-safe. - - :param new_state: (``Job.STATE_* value``) Resulting state of the job. - :param return_value: (``string``) Return value of the job, encoded - into string. Can be None when the job does not return any value. - :param return_type: (``Job.RetunValueType.* value``) Type of the return - value. Can be None when the job does not return any value. - :param output_arguments: (``dictionary param_name -> param_value``) - Output arguments of the job. Can be None when the job does not have - any output parameters. - :param error: (``CIMError``) Error raised by the job. Can be None, - when the job finished successfully. - :param affected_elements: (``array of CIMInstanceName``) New list of - affected elements to generate LMI_<name>JobAffectedElement - association. If None, the old list, passed to constructor, remains - untouched. - """ - self.lock() - self.return_value = return_value - self.return_value_type = return_type - self.output_arguments = output_arguments - self.error = error - if affected_elements is not None: - self.affected_elements = affected_elements - self.change_state(new_state, 100) - self.unlock() - - @cmpi_logging.trace_method - def change_state(self, new_state, percent=None): - """ - Change state of a job. (Re-)calculate various times based on the state - change. Send indications as necessary. - This method is thread-safe. - - :param new_state: (``Job.STATE_* value``) New state of the job. - It can be the same as the previous state to indicate progress of - the job. - :param percent: (``int``)) Percent complete of the job. When None, - this valuu will be automatically calculated (in progress = 50%, - finished = 100%). - """ - self.lock() - - cmpi_logging.logger.debug("Job %s: %s changes state from %d to %d" - % (self.the_id, self.job_name, self.job_state, new_state)) - - # For sending indications - prev_instance = None - send_indication = False - indication_ids = [] - - if self.job_state != new_state: - # Remember to send indications - prev_instance = self.job_manager.get_job_instance(self) - send_indication = True - indication_ids.append(JobManager.IND_JOB_CHANGED) - - # Check if the job has just finished - if (self.job_state not in self.FINAL_STATES - and new_state in self.FINAL_STATES): - # Remember finish time - self.finish_clocktime = datetime.utcnow() - self.finish_monotime = self.timer_manager.now() - # Remember job execution time. - if self.start_monotime: - self.elapsed_time = self.finish_monotime \ - - self.start_monotime - # Send indication - if self.job_state == self.STATE_FAILED: - indication_ids.append(JobManager.IND_JOB_FAILED) - if self.job_state == self.STATE_SUSPENDED: - indication_ids.append(JobManager.IND_JOB_SUCCEEDED) - - # Check if the job has just started - if new_state == self.STATE_RUNNING: - self.start_clocktime = datetime.utcnow() - self.start_monotime = self.timer_manager.now() - - self.clocktime_of_last_state_change = datetime.utcnow() - self.job_state = new_state - - if percent is None: - # guess the percentage from status - if new_state == self.STATE_QUEUED: - percent = 0 - elif new_state == self.STATE_RUNNING: - percent = 50 - else: - percent = 100 - if self.percent_complete != percent: - # Remember to send indications - if not send_indication: - self.clocktime_of_last_state_change = datetime.utcnow() - prev_instance = self.job_manager.get_job_instance(self) - send_indication = True - indication_ids.append(JobManager.IND_JOB_PERCENT_UPDATED) - self.percent_complete = percent - - if send_indication: - current_instance = self.job_manager.get_job_instance(self) - self.job_manager.send_modify_indications( - prev_instance, current_instance, indication_ids) - - # start / update the timer if necesasry - self._restart_timer() - self.unlock() - - @cmpi_logging.trace_method - def _expire(self): - """ - Callback when a Job completes and time_before_removal second passed. - The job gets removed from its JobManager. - """ - cmpi_logging.logger.debug("Job %s: %s expired" - % (self.the_id, self.job_name)) - - self.job_manager.remove_job(self) - - @cmpi_logging.trace_method - def _restart_timer(self): - """ - Re-schedule timer for TimeBeforeRemoval because some property has - changed. - """ - if not self.job_state in self.FINAL_STATES: - return - - # Stop the old timer. - if self.timer: - self.timer.cancel() - self.timer = None - - # Start the new timer. - if self.delete_on_completion: - now = self.timer_manager.now() - passed = now - self.finish_monotime - timeout = self.time_before_removal - passed - if timeout <= 0: - # Just in case... - self._expire() - return - - cmpi_logging.logger.debug("Starting timer for job %s: '%s' for %f" - " seconds" % (self.the_id, self.job_name, timeout)) - self.timer = self.timer_manager.create_timer( - "Job " + self.job_name, - callback=self._expire) - self.timer.start(timeout) - - @cmpi_logging.trace_method - def lock(self): - """ - Lock internal mutex. Other threads will block on subsequent lock(). - The lock is recursive, i.e. can be called multiple times from - single thread. - """ - self._lock.acquire() - - @cmpi_logging.trace_method - def unlock(self): - """ Unlock internal mutex.""" - self._lock.release() - - @cmpi_logging.trace_method - def execute(self): - """ - Start executing the job. It calls the execute callback, set by - ``set_execute_action()``. - - job_state must be already set to STATE_RUNNING. - Any exception is translated to CIMError and appropriate state is set. - """ - try: - self._execute(*(self._execargs), **(self._execkwargs)) - except pywbem.CIMError, error: - cmpi_logging.logger.trace_warn("Job.execute caught an CIMError %s", - str(error)) - cmpi_logging.logger.trace_verbose("traceback: %s", - traceback.format_exc()) - self.finish_method(Job.STATE_FAILED, error=error) - except Exception, ex: - cmpi_logging.logger.trace_warn("Job.execute caught an Exception %s", - str(ex)) - cmpi_logging.logger.trace_verbose("traceback: %s", - traceback.format_exc()) - error = pywbem.CIMError(pywbem.CIM_ERR_FAILED, str(ex)) - self.finish_method(Job.STATE_FAILED, error=error) - - @cmpi_logging.trace_method - def cancel(self): - """ - Cancels queued action. The action must have not been started. - """ - self.change_state(self.STATE_TERMINATED) - if self._cancel: - self._cancel(*(self._cancelargs), **(self._cancelkwargs)) - - @cmpi_logging.trace_method - def get_name(self): - """ - Return CIMInstanceName of the job. - - :rtype: ``CIMInstanceName`` - """ - name = pywbem.CIMInstanceName( - classname=self.job_manager.job_classname, - namespace=self.job_manager.namespace, - keybindings={ - 'InstanceID': self.get_instance_id() - }) - return name - - @cmpi_logging.trace_method - def get_instance_id(self, classname=None): - """ - Return InstanceID. - - :param classname: (``string``) Optional classname to generate InstanceID - for different class, e.g. for LMI_<name>MethodResult. - :rtype: ``string`` - """ - if classname is None: - classname = self.job_manager.job_classname - return 'LMI:' + classname + ':' + str(self.the_id) - - @cmpi_logging.trace_method - def get_pre_call(self): - """ - Return indication that describes the pre-execution values of the - job's invocation. - - :rtype: ``CIMInstance of CIM_InstMethodCall`` - """ - path = pywbem.CIMInstanceName( - classname="CIM_InstMethodCall", - keybindings={}, - host=socket.gethostname(), - namespace=self.job_manager.namespace) - inst = pywbem.CIMInstance( - classname="CIM_InstMethodCall", - path=path) - src_instance = self._get_cim_instance() - inst['SourceInstance'] = src_instance - inst['SourceInstanceModelPath'] = str(src_instance.path) - inst['MethodName'] = self.method_name - inst['MethodParameters'] = self.get_method_params( - '__MethodParameters', True, False) - inst['PreCall'] = True - return inst - - @cmpi_logging.trace_method - def get_cim_error(self): - """ - Return job error as CIMInstance of CIM_Error. - :returns: CIMInstance of CIM_Error - """ - path = pywbem.CIMInstanceName( - classname="CIM_Error", - host=socket.gethostname(), - namespace=self.job_manager.namespace) - err = pywbem.CIMInstance( - classname="CIM_Error", - path=path) - err['CIMStatusCode'] = pywbem.Uint32(self.error[0]) - err['Message'] = self.error[1] - return err - - @cmpi_logging.trace_method - def get_post_call(self): - """ - Return indication that describes the post-execution values of the - job's invocation. - - :rtype: ``CIMInstance of CIM_InstMethodCall`` - """ - path = pywbem.CIMInstanceName( - classname="CIM_InstMethodCall", - keybindings={}, - host=socket.gethostname(), - namespace=self.job_manager.namespace) - inst = pywbem.CIMInstance( - classname="CIM_InstMethodCall", - path=path) - - src_instance = self._get_cim_instance() - inst['SourceInstance'] = src_instance - inst['SourceInstanceModelPath'] = str(src_instance.path) - inst['MethodName'] = self.method_name - inst['MethodParameters'] = self.get_method_params( - '__MethodParameters', True, True) - inst['PreCall'] = False - - if self.return_value_type is not None: - inst['ReturnValueType'] = self.return_value_type - if self.return_value is not None: - inst['ReturnValue'] = str(self.return_value) - if self.error is not None: - err = self.get_cim_error() - inst['Error'] = [err, ] - return inst - - @cmpi_logging.trace_method - def _get_cim_instance(self): - """ - Return CIMInstance of this job. - - :rtype: CIMInstance - """ - return self.job_manager.get_job_instance(self) - - @cmpi_logging.trace_method - def get_method_params(self, class_name, include_input, include_output): - """ - Create a class of given name with all input or output parameters - of the asynchronous method. Typically used to assemble - CIM_ConcreteJob.JobInParameters or CIM_InstMethodCall.MethodParameters - values. - - :param class_name: (``string``) Name of the class to create. - :param input: (``boolean``) Whether input parameters should be - included in the returned class - :param output: (``boolean``) Whether output parameters should be - included in the returned class - :rtype: CIMInstance of the created class. - """ - # TODO: this is workaround for bug #920763, use class_name - # when it's fixed - clsname = "CIM_ManagedElement" - path = pywbem.CIMInstanceName( - classname=clsname, - namespace=self.job_manager.namespace) - inst = pywbem.CIMInstance(classname=clsname, path=path) - if include_input and self.input_arguments: - for (name, value) in self.input_arguments.iteritems(): - inst[name] = value - if include_output and self.output_arguments: - # overwrite any input parameter - for (name, value) in self.output_arguments.iteritems(): - inst[name] = value - return inst - - # pylint: disable-msg=R0903 - class ReturnValueType(object): - """ CIM_InstMethodCall.ReturnValueType values.""" - Boolean = pywbem.Uint16(2) - String = pywbem.Uint16(3) - Char16 = pywbem.Uint16(4) - Uint8 = pywbem.Uint16(5) - Sint8 = pywbem.Uint16(6) - Uint16 = pywbem.Uint16(7) - Sint16 = pywbem.Uint16(8) - Uint32 = pywbem.Uint16(9) - Sint32 = pywbem.Uint16(10) - Uint64 = pywbem.Uint16(11) - Sint64 = pywbem.Uint16(12) - Datetime = pywbem.Uint16(13) - Real32 = pywbem.Uint16(14) - Real64 = pywbem.Uint16(15) - Reference = pywbem.Uint16(16) - -class JobManager(object): - """ - Container of all queued, running or finished ``LMI_ConcreteJobs``. - - Usage: - - 1. Create MOF file for these classes: - - * ``LMI_<name>Job`` - - * ``LMI_<name>MethodResult`` - - * ``LMI_Affected<name>JobElement`` - - * ``LMI_Owning<name>JobElement`` - - * ``LMI_Associated<name>JobMethodResult`` - - Where ``<name>`` is prefix of your classes, for example 'Storage' - - 2. During initialization, initialize ``TimerManager`` and create - ``JobManager``. - - 3. When needed. create new Job instance: - - 4. Set its execute callback using ``set_execute_action()``. This callback - will be called when the job is to be executed. It will be called in - context of ``JobManager`` worker thread! - - 5. Optionally, set cancel callback using ``set_execute_action()``. This - callback will be called when the job is still queued and is cancelled by - application. This callback will be called in context of CIMOM callback - and should be quick! - - 6. Enqueue the job using ``JobManager.add_job()`` method. - - 7. When your execute callback is called, you can optionally call - ``job.change_state()`` to update percentage of completion. - - 8. When your execute callback is finished, don't forget to set method - result using ``job.finish_method()``. - - * ``JobManager`` automatically sends all job-related indications. - * ``Job`` automatically tracks various timestamps. - * By default, the job automatically disappears after 60 seconds after it - finishes. Application may set ``DeleteOnCompletion`` and - ``TimeBeforeRemoval`` properties of ``LMI_<name>Job`` to override this - timeout. - """ - - COMMAND_STOP = 1 - - IND_JOB_PERCENT_UPDATED = "PercentUpdated" - IND_JOB_SUCCEEDED = "Succeeded" - IND_JOB_FAILED = "Failed" - IND_JOB_CHANGED = "Changed" - IND_JOB_CREATED = "Created" - - IND_FILTERS = { - IND_JOB_PERCENT_UPDATED: { - "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " - "SourceInstance ISA %(classname)s AND " - "SourceInstance.CIM_ConcreteJob::PercentComplete <> " - "PreviousInstance.CIM_ConcreteJob::PercentComplete", - "Description" : "Modification of Percentage Complete for a " - "Concrete Job.", - }, - IND_JOB_SUCCEEDED: { - "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " - "SourceInstance ISA %(classname)s AND " - "SourceInstance.CIM_ConcreteJob::JobState = 17", - "Description": "Modification of Job State for a " - "Concrete Job to 'Complete'.", - }, - IND_JOB_FAILED: { - "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " - "SourceInstance ISA %(classname)s AND " - "SourceInstance.CIM_ConcreteJob::JobState = 10", - "Description": "Modification of Job State for a " - "Concrete Job to 'Exception'.", - }, - IND_JOB_CHANGED: { - "Query" : "SELECT * FROM LMI_%(prefix)sInstModification WHERE " - "SourceInstance ISA %(classname)s AND " - "SourceInstance.CIM_ConcreteJob::JobState <> " - "PreviousInstance.CIM_ConcreteJob::JobState", - "Description": "Modification of Job State for a ConcreteJob.", - }, - IND_JOB_CREATED: { - "Query" : "SELECT * FROM LMI_%(prefix)sInstCreation WHERE " - "SourceInstance ISA %(classname)s", - "Description": "Creation of a ConcreteJob.", - }, - } - - @cmpi_logging.trace_method - def __init__(self, name, namespace, indication_manager, timer_manager): - """ - Initialize new Manager. It automatically registers all job-related - filters to indication_manager and starts a worker thread. - - :param name: (``string``) String with classname infix. For example - 'Storage' for ``LMI_StorageJob``, ``LMI_StorageJobMethodResult`` - etc. - :param namespace: (``string``) Namespace of all providers. - :param indication_manager: (``IndicationManager``): a manager where - indications and filters should be added. - :param timer_manager: (``TimerManager``): Timer manager instance. - """ - # List of all jobs. Dictionary job_id -> Job. - self.jobs = {} - # Queue of jobs scheduled to execute. - self.queue = Queue() - # Last created job_id. - self.last_instance_id = 0 - # Classname infix. - self.name = name - # CIMProvider2 instances for job classes. - self.providers = {} - self.namespace = namespace - self.indication_manager = indication_manager - self.timer_manager = timer_manager - - # Start the worker thread (don't forget to register it at CIMOM) - self.worker = threading.Thread(target=self._worker_main) - self.worker.daemon = False - self.worker.start() - - # Various classnames for job-related classes, with correct infixes. - self.job_classname = 'LMI_' + self.name + 'Job' - self.method_result_classname = "LMI_" + self.name + "MethodResult" - self.affected_classname = "LMI_Affected" + self.name + "JobElement" - self.owning_classname = "LMI_Owning" + self.name + "JobElement" - self.associated_result_classname = ('LMI_Associated' + self.name - + 'JobMethodResult') - self.indication_filter_classname = ('LMI_' + self.name - + 'JobIndicationFilter') - self.job_provider = None - self._add_indication_filters() - - @cmpi_logging.trace_method - def _add_indication_filters(self): - """ - Add all job-related ``IndicationFilters`` to indication manager. - """ - register_filters(self.job_classname, self.indication_manager) - - @cmpi_logging.trace_method - def get_providers(self): - """ - Get dictionary of providers for these classes: - - * ``LMI_<name>Job`` - * ``LMI_<name>MethodResult`` - * ``LMI_Affected<name>JobElement`` - * ``LMI_Owning<name>JobElement`` - * ``LMI_Associated<name>JobMethodResult`` - - :rtype: dictionary class_name -> CIMProvider2 - """ - - if not self.providers: - job_provider = LMI_ConcreteJob(self.job_classname, job_manager=self) - self.providers[self.job_classname] = job_provider - self.job_provider = job_provider - - provider = LMI_MethodResult( - self.method_result_classname, job_manager=self) - self.providers[self.method_result_classname] = provider - - provider = LMI_AffectedJobElement( - self.affected_classname, job_manager=self) - self.providers[self.affected_classname] = provider - - provider = LMI_OwningJobElement( - self.owning_classname, job_manager=self) - self.providers[self.owning_classname] = provider - - provider = LMI_AssociatedJobMethodResult( - self.owning_classname, job_manager=self) - self.providers[self.associated_result_classname] = provider - - return self.providers - - @cmpi_logging.trace_method - def add_job(self, job): - """ - Enqueue new job. Send indication when needed. - - :param job: (``Job``) A job to enqueue. - """ - cmpi_logging.logger.debug("Job %s: '%s' enqueued" - % (job.the_id, job.job_name)) - - self.jobs[job.the_id] = job - self.queue.put(job) - # send indication - if self.indication_manager.is_subscribed( - self.job_classname, self.IND_JOB_CREATED): - job_instance = self.get_job_instance(job) - self.indication_manager.send_instcreation( - job_instance, self.IND_JOB_CREATED) - - def send_modify_indications(self, prev_instance, current_instance, - indication_ids): - """ - Send InstModification. This is helper method called by ``Job`` when - needed. - - :param prev_instance: Instance of ``LMI_<name>Job`` before it was - modified. - :param current_instance: Instance of ``LMI_<name>Job`` after it was - modified. - """ - for _id in indication_ids: - self.indication_manager.send_instmodification(prev_instance, - current_instance, _id) - - @cmpi_logging.trace_method - def remove_job(self, job): - """ - Remove existing job. Note that jobs are removed automatically after a - timeout, providers should not call this method directly. - - :param job: (``Job``) Job to remove. - """ - cmpi_logging.logger.debug("Job %s: '%s' removed from queue." - % (job.the_id, job.job_name)) - del self.jobs[job.the_id] - # The job may still be in the queue! - # There is no way, how to remove it, it will be skipped by the - # worker thread. - - @cmpi_logging.trace_method - def get_job_for_instance_id(self, instance_id, classname=None): - """ - Return Job for given InstanceID or None when no such Job exist. - - :param instance_id: (``string``) InstanceID value to parse. - :param classname: (``string``) Optional classname to parse the - InstanceID (e.g. when parsing InstanceID of - ``LMI_<name>MethodResult``). - :rtype: ``Job`` - """ - if classname is None: - classname = self.job_classname - the_id = openlmi.common.parse_instance_id(instance_id, classname) - if not the_id.isdigit(): - return None - return self.jobs.get(the_id, None) - - @cmpi_logging.trace_method - def _worker_main(self): - """ - This is the main loop of the job queue. It just processes enqueued - jobs and never ends. - """ - cmpi_logging.logger.info("Started Job thread.") - while True: - command = self.queue.get() - if isinstance(command, Job): - # we need to protect from changes between checking state and - # setting new state - job = command - job.lock() - if job.job_state == Job.STATE_QUEUED: - # the job was not cancelled - job.change_state(Job.STATE_RUNNING) - job.unlock() - cmpi_logging.logger.info("Starting job %s: '%s'" % - (job.the_id, job.job_name)) - - job.execute() - if job.error: - cmpi_logging.logger.warn("Job %s: '%s' finished with error:" - " %s" % (job.the_id, job.job_name, str(job.error))) - else: - cmpi_logging.logger.info("Job %s: '%s' finished OK" % - (job.the_id, job.job_name)) - else: - # just skip suspended and terminated jobs - job.unlock() - - elif isinstance(command, int): - self.queue.task_done() - break - - self.queue.task_done() - - cmpi_logging.logger.info("Stopped Job thread.") - - @cmpi_logging.trace_method - def get_next_id(self): - """ - Return next unused job id. - - :rtype: string - """ - self.last_instance_id += 1 - return str(self.last_instance_id) - - @cmpi_logging.trace_method - def get_job_instance(self, job): - """ - Return CIMInstance for given job. - - :param job: (``Job``) - :rtype: ``CIMInstance`` - """ - path = pywbem.CIMInstanceName( - classname=self.job_classname, - keybindings={'InstanceID': job.get_instance_id()}, - host=socket.gethostname(), - namespace=self.namespace) - inst = pywbem.CIMInstance(classname=self.job_classname, path=path) - inst['InstanceID'] = job.get_instance_id() - return self.job_provider.get_instance(None, inst) - - @cmpi_logging.trace_method - def shutdown(self, timeout=1): - """ - Stop the thread. If a job is running, it may leave the job process - (mkfs, resize2fs, ...) and the worker thread (waiting for the process to - finish) still running. - - JobManager still needs Indication Manager and TimeManager working at - this point! - - :param timeout: Nr. of seconds to wait for the current job. Afterwards - the thread is abandoned, leaving the process still running. - """ - # Empty the queue, we don't want the worker to proceed with any other - # queued job. - while not self.queue.empty(): - queue.get(False) - queue.task_done() - - self.queue.put(self.COMMAND_STOP) - self.worker.join(timeout) - - # Cancel all running/suspended/queued jobs. - # This will send indications. - for job in self.jobs.itervalues(): - if job.state in (Job.STATE_QUEUED, Job.STATE_SUSPENDED, - Job.STATE_RUNNING): - job.cancel() - - if self.worker.isAlive(): - # There is no way, how to stop the thread in Python, so abandon it. - self.worker.daemon = True - self.indication_manager = None - self.timer_manager = None - - def can_shutdown(self): - """ - Return True, if there is no running Job. - """ - return self.queue.empty() - - -class LMI_ConcreteJob(CIMProvider2): - """ - Provider of LMI_ConcreteJob class or its subclass. - """ - @cmpi_logging.trace_method - def __init__(self, classname, job_manager): - self.classname = classname - self.job_manager = job_manager - - @cmpi_logging.trace_method - def enum_instances(self, env, model, keys_only): - """ - Provider implementation of EnumerateInstances intrinsic method. - """ - model.path.update({'InstanceID': None}) - for job in self.job_manager.jobs.values(): - model['InstanceID'] = job.get_instance_id() - if keys_only: - yield model - else: - yield self.get_instance(env, model, job) - - @cmpi_logging.trace_method - def get_job_states(self, job): - """ - Return JobState and OperationalStatus property values. - - :param job: (``int``) Job.STATE_* value. - :rtype: tuple ``(JobState, OperationalStatus)`` values. - """ - if job.job_state == Job.STATE_QUEUED: - jobstate = self.Values.JobState.New - opstate = [self.Values.OperationalStatus.Dormant] - elif job.job_state == Job.STATE_RUNNING: - jobstate = self.Values.JobState.Running - opstate = [self.Values.OperationalStatus.OK] - elif job.job_state == Job.STATE_FINISHED_OK: - jobstate = self.Values.JobState.Completed - opstate = [self.Values.OperationalStatus.OK, - self.Values.OperationalStatus.Completed] - elif job.job_state == Job.STATE_SUSPENDED: - jobstate = self.Values.JobState.Suspended - opstate = [self.Values.OperationalStatus.OK] - elif job.job_state == Job.STATE_FAILED: - jobstate = self.Values.JobState.Exception - opstate = [self.Values.OperationalStatus.Error, - self.Values.OperationalStatus.Completed] - elif job.job_state == Job.STATE_TERMINATED: - jobstate = self.Values.JobState.Terminated - opstate = [self.Values.OperationalStatus.Stopped] - return jobstate, opstate - - @cmpi_logging.trace_method - # pylint: disable-msg=W0221 - def get_instance(self, env, model, job=None): - """ - Provider implementation of GetInstance intrinsic method. - """ - if not job: - instance_id = model['InstanceID'] - job = self.job_manager.get_job_for_instance_id(instance_id) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - model['DeleteOnCompletion'] = job.delete_on_completion - model['Name'] = job.job_name - - # convert seconds to timedelta - seconds = job.time_before_removal - if seconds: - delta = timedelta(seconds=seconds) - model['TimeBeforeRemoval'] = pywbem.CIMDateTime(delta) - else: - model['TimeBeforeRemoval'] = pywbem.CIMProperty( - name='TimeBeforeRemoval', - value=None, - type='datetime') - - if job.clocktime_of_last_state_change: - model['TimeOfLastStateChange'] = pywbem.CIMDateTime( - job.clocktime_of_last_state_change) - else: - model['TimeOfLastStateChange'] = pywbem.CIMProperty( - name='TimeOfLastStateChange', - value=None, - type='datetime') - - if job.elapsed_time: - elapsed_time = timedelta(seconds=job.elapsed_time) - model['ElapsedTime'] = pywbem.CIMDateTime(elapsed_time) - else: - model['ElapsedTime'] = pywbem.CIMProperty( - name='ElapsedTime', - value=None, - type='datetime') - - model['Description'] = job.job_name - model['LocalOrUtcTime'] = self.Values.LocalOrUtcTime.UTC_Time - model['PercentComplete'] = pywbem.Uint16(job.percent_complete) - if job.start_clocktime: - model['StartTime'] = pywbem.CIMDateTime(job.start_clocktime) - else: - model['StartTime'] = pywbem.CIMProperty( - name='StartTime', - value=None, - type='datetime') - - if job.input_arguments: - model['JobInParameters'] = job.get_method_params( - "__JobInParameters", True, False) - - if job.job_state in Job.FINAL_STATES: - # assemble output parameters with return value - outparams = job.get_method_params("__JobOutParameters", False, True) - if job.return_value is not None: - outparams['__ReturnValue'] = job.return_value - model['JobOutParameters'] = outparams - - model['TimeSubmitted'] = pywbem.CIMDateTime(job.time_submitted) - # set correct state - jobstate, opstate = self.get_job_states(job) - model['JobState'] = jobstate - model['OperationalStatus'] = opstate - return model - - @cmpi_logging.trace_method - def set_instance(self, env, instance, modify_existing): - """Return a newly created or modified instance. - - :param env: Provider Environment (pycimmb.ProviderEnvironment) - :param instance: The new pywbem.CIMInstance. If modifying an existing - instance, the properties on this instance have been filtered by - the PropertyList from the request. - :param modify_existing: True if ModifyInstance, False if CreateInstance - - Return the new instance. The keys must be set on the new instance. - """ - if not modify_existing: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, - "Creation of Job instances is not supported.") - - job = self.job_manager.get_job_for_instance_id(instance['InstanceID']) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - try: - job.lock() - restart_timer = False - - for (key, value) in instance.iteritems(): - if value is None: - continue - if key == 'DeleteOnCompletion': - job.delete_on_completion = value - restart_timer = True - elif key == 'TimeBeforeRemoval': - job.time_before_removal = value.total_seconds() - restart_timer = True - elif key == 'JobRunTimes': - if value != 1: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, - "JobRunTimes property is not supported.") - elif key == 'LocalOrUtcTime': - if value != self.Values.LocalOrUtcTime.UTC_Time: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, - "Setting of LocalOrUtcTime property is not" - " supported.") - else: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED, - "Setting of %s property is not supported." % (key,)) - - if restart_timer: - job._restart_timer() - finally: - job.unlock() - return instance - - @cmpi_logging.trace_method - def delete_instance(self, env, instance_name): - """Delete an instance. - - :param env: Provider Environment (pycimmb.ProviderEnvironment) - :param instance_name: A pywbem.CIMInstanceName specifying the instance - to delete. - """ - job = self.job_manager.get_job_for_instance_id( - instance_name['InstanceID']) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - if not job.job_status in Job.FINAL_STATES: - raise pywbem.CIMError(pywbem.CIM_ERR_FAILED, - "Job has not finished.") - - self.job_manager.remove_job(job) - - @cmpi_logging.trace_method - def cim_method_geterrors(self, env, object_name): - """Implements LMI_StorageJob.GetErrors() - - If JobState is "Completed" and Operational Status is "Completed" - then no instance of CIM_Error is returned. - - If JobState is "Exception" then GetErrors may return intances of - CIM_Error related to the execution of the procedure or method invoked by - the job. - - If Operatational Status is not "OK" or "Completed" then - GetErrors may return CIM_Error instances related to the running of - the job. - - :param env: -- Provider Environment (pycimmb.ProviderEnvironment) - :param object_name: -- A pywbem.CIMInstanceName or pywbem.CIMCLassName - specifying the object on which the method GetErrors() - should be invoked. - - Output parameters: - - * Errors -- (type pywbem.CIMInstance(classname='CIM_Error', ...)) - If the OperationalStatus on the Job is not "OK", then this - method will return one or more CIM Error instance(s). - Otherwise, when the Job is "OK", null is returned. - """ - job = self.job_manager.get_job_for_instance_id( - object_name['InstanceID']) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - if job.error is None: - errors = [] - else: - err = job.get_cim_error() - errors = [err, ] - out_params = [ - pywbem.CIMParameter( - name='Errors', - value=errors, - type='instance', - is_array=True, - array_size=len(errors)) - ] - rval = self.Values.GetErrors.Success - - return (rval, out_params) - - @cmpi_logging.trace_method - def cim_method_requeststatechange(self, env, object_name, - param_requestedstate=None, - param_timeoutperiod=None): - """Implements LMI_StorageJob.RequestStateChange() - - Requests that the state of the job be changed to the value - specified in the RequestedState parameter. Invoking the - RequestStateChange method multiple times could result in earlier - requests being overwritten or lost. - - If 0 is returned, then the - task completed successfully. Any other return code indicates an - error condition. - - :param env: Provider Environment (pycimmb.ProviderEnvironment) - :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName - specifying the object on which the method RequestStateChange() - should be invoked. - :param param_requestedstate: The input parameter RequestedState (type pywbem.Uint16 self.Values.RequestStateChange.RequestedState) - RequestStateChange changes the state of a job. The possible - values are as follows: Start (2) changes the state to - \'Running\'. Suspend (3) stops the job temporarily. The - intention is to subsequently restart the job with \'Start\'. - It might be possible to enter the \'Service\' state while - suspended. (This is job-specific.) Terminate (4) stops the - job cleanly, saving data, preserving the state, and shutting - down all underlying processes in an orderly manner. Kill (5) - terminates the job immediately with no requirement to save - data or preserve the state. Service (6) puts the job into a - vendor-specific service state. It might be possible to restart - the job. - - :param param_timeoutperiod: -- The input parameter TimeoutPeriod (type pywbem.CIMDateTime) - A timeout period that specifies the maximum amount of time that - the client expects the transition to the new state to take. - The interval format must be used to specify the TimeoutPeriod. - A value of 0 or a null parameter indicates that the client has - no time requirements for the transition. If this property - does not contain 0 or null and the implementation does not - support this parameter, a return code of \'Use Of Timeout - Parameter Not Supported\' must be returned. - """ - job = self.job_manager.get_job_for_instance_id( - object_name['InstanceID']) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - try: - job.lock() - states = self.Values.RequestStateChange.RequestedState - retcodes = self.Values.RequestStateChange - if param_requestedstate == states.Suspend: - if job.job_state != Job.STATE_QUEUED: - # Can suspend only queued jobs - rval = retcodes.Invalid_State_Transition - else: - job.change_state(Job.STATE_SUSPENDED) - rval = retcodes.Completed_with_No_Error - - elif param_requestedstate == states.Terminate: - if job.job_state not in (Job.STATE_QUEUED, Job.STATE_SUSPENDED): - # Can terminate only queued or suspended jobs - rval = retcodes.Invalid_State_Transition - else: - job.cancel() - rval = retcodes.Completed_with_No_Error - - elif param_requestedstate == states.Start: - if job.job_state != Job.STATE_SUSPENDED: - # Can start only suspended jobs - rval = retcodes.Invalid_State_Transition - else: - job.change_state(Job.STATE_QUEUED) - # Enqueue the job again, it may be already processed - # (we might get the job in the queue twice, but - # we have only one worker thread so it won't collide). - self.job_manager.add_job(job) - rval = retcodes.Completed_with_No_Error - - else: - rval = retcodes.Invalid_State_Transition - finally: - job.unlock() - return (rval, []) - - @cmpi_logging.trace_method - def cim_method_killjob(self, env, object_name, - param_deleteonkill=None): - """Implements LMI_StorageJob.KillJob() """ - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_SUPPORTED) - - @cmpi_logging.trace_method - def cim_method_geterror(self, env, object_name): - """Implements LMI_StorageJob.GetError() - - GetError is deprecated because Error should be an array,not a - scalar. - - When the job is executing or has terminated without - error, then this method returns no CIM_Error instance. However, if - the job has failed because of some internal problem or because the - job has been terminated by a client, then a CIM_Error instance is - returned. - - :param env: Provider Environment (pycimmb.ProviderEnvironment) - :param object_name: A pywbem.CIMInstanceName or pywbem.CIMCLassName - specifying the object on which the method GetError() - should be invoked. - - Output parameters: - - * Error -- (``pywbem.CIMInstance(classname='CIM_Error', ...)``) - If the OperationalStatus on the Job is not "OK", then this - method will return a CIM Error instance. Otherwise, when the - Job is "OK", null is returned. - """ - job = self.job_manager.get_job_for_instance_id( - object_name['InstanceID']) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - if job.error is None: - error = pywbem.CIMParameter( - name='error', - value=None, - type='instance', - is_array=False) - else: - err = job.get_cim_error() - error = pywbem.CIMParameter( - name='error', - value=err, - type='instance') - rval = self.Values.GetError.Success - return (rval, [error]) - - class Values(object): - class JobState(object): - New = pywbem.Uint16(2) - Starting = pywbem.Uint16(3) - Running = pywbem.Uint16(4) - Suspended = pywbem.Uint16(5) - Shutting_Down = pywbem.Uint16(6) - Completed = pywbem.Uint16(7) - Terminated = pywbem.Uint16(8) - Killed = pywbem.Uint16(9) - Exception = pywbem.Uint16(10) - Service = pywbem.Uint16(11) - Query_Pending = pywbem.Uint16(12) - # DMTF_Reserved = 13..32767 - # Vendor_Reserved = 32768..65535 - - class LocalOrUtcTime(object): - Local_Time = pywbem.Uint16(1) - UTC_Time = pywbem.Uint16(2) - - class OperationalStatus(object): - Unknown = pywbem.Uint16(0) - Other = pywbem.Uint16(1) - OK = pywbem.Uint16(2) - Degraded = pywbem.Uint16(3) - Stressed = pywbem.Uint16(4) - Predictive_Failure = pywbem.Uint16(5) - Error = pywbem.Uint16(6) - Non_Recoverable_Error = pywbem.Uint16(7) - Starting = pywbem.Uint16(8) - Stopping = pywbem.Uint16(9) - Stopped = pywbem.Uint16(10) - In_Service = pywbem.Uint16(11) - No_Contact = pywbem.Uint16(12) - Lost_Communication = pywbem.Uint16(13) - Aborted = pywbem.Uint16(14) - Dormant = pywbem.Uint16(15) - Supporting_Entity_in_Error = pywbem.Uint16(16) - Completed = pywbem.Uint16(17) - Power_Mode = pywbem.Uint16(18) - Relocating = pywbem.Uint16(19) - # DMTF_Reserved = .. - # Vendor_Reserved = 0x8000.. - - class GetErrors(object): - Success = pywbem.Uint32(0) - Not_Supported = pywbem.Uint32(1) - Unspecified_Error = pywbem.Uint32(2) - Timeout = pywbem.Uint32(3) - Failed = pywbem.Uint32(4) - Invalid_Parameter = pywbem.Uint32(5) - Access_Denied = pywbem.Uint32(6) - # DMTF_Reserved = .. - # Vendor_Specific = 32768..65535 - - class GetError(object): - Success = pywbem.Uint32(0) - Not_Supported = pywbem.Uint32(1) - Unspecified_Error = pywbem.Uint32(2) - Timeout = pywbem.Uint32(3) - Failed = pywbem.Uint32(4) - Invalid_Parameter = pywbem.Uint32(5) - Access_Denied = pywbem.Uint32(6) - # DMTF_Reserved = .. - # Vendor_Specific = 32768..65535 - - class RequestStateChange(object): - Completed_with_No_Error = pywbem.Uint32(0) - Not_Supported = pywbem.Uint32(1) - Unknown_Unspecified_Error = pywbem.Uint32(2) - Can_NOT_complete_within_Timeout_Period = pywbem.Uint32(3) - Failed = pywbem.Uint32(4) - Invalid_Parameter = pywbem.Uint32(5) - In_Use = pywbem.Uint32(6) - # DMTF_Reserved = .. - Method_Parameters_Checked___Transition_Started = pywbem.Uint32(4096) - Invalid_State_Transition = pywbem.Uint32(4097) - Use_of_Timeout_Parameter_Not_Supported = pywbem.Uint32(4098) - Busy = pywbem.Uint32(4099) - # Method_Reserved = 4100..32767 - # Vendor_Specific = 32768..65535 - class RequestedState(object): - Start = pywbem.Uint16(2) - Suspend = pywbem.Uint16(3) - Terminate = pywbem.Uint16(4) - Kill = pywbem.Uint16(5) - Service = pywbem.Uint16(6) - # DMTF_Reserved = 7..32767 - # Vendor_Reserved = 32768..65535 - -class LMI_OwningJobElement(CIMProvider2): - """ Instrumentation of LMI_OwningJobElement class and its subclasses.""" - - @cmpi_logging.trace_method - def __init__(self, classname, job_manager): - self.classname = classname - self.job_manager = job_manager - - @cmpi_logging.trace_method - def get_instance(self, env, model): - """Return an instance.""" - instance_id = model['OwnedElement']['InstanceID'] - job = self.job_manager.get_job_for_instance_id(instance_id) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "OwnedElement not found.") - - if job.owning_element != model['OwningElement']: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "OwnedElement is not associated to OwningElement.") - return model - - @cmpi_logging.trace_method - def enum_instances(self, env, model, keys_only): - """Enumerate instances.""" - model.path.update({'OwnedElement': None, 'OwningElement': None}) - for job in self.job_manager.jobs.values(): - if job.owning_element: - model['OwnedElement'] = job.get_name() - model['OwningElement'] = job.owning_element - yield model - - @cmpi_logging.trace_method - def references(self, env, object_name, model, result_class_name, role, - result_role, keys_only): - """Instrument Associations.""" - ch = env.get_cimom_handle() - if ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super='CIM_ManagedElement') or \ - ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super=self.job_manager.job_classname): - return self.simple_refs(env, object_name, model, - result_class_name, role, result_role, keys_only) - -class LMI_AffectedJobElement(CIMProvider2): - """ Instrumentation of LMI_AffectedJobElement class and its subclasses.""" - - @cmpi_logging.trace_method - def __init__(self, classname, job_manager): - self.classname = classname - self.job_manager = job_manager - - @cmpi_logging.trace_method - def get_instance(self, env, model): - """Return an instance.""" - instance_id = model['AffectingElement']['InstanceID'] - job = self.job_manager.get_job_for_instance_id(instance_id) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "AffectingElement not found.") - - if job.affected_elements is None: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "The AffectingElement has no AffectedElement.") - if model['AffectedElement'] not in job.affected_elements: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "AffectedElement is not associated to AffectingElement.") - model['ElementEffects'] = [self.Values.ElementEffects.Unknown, ] - return model - - @cmpi_logging.trace_method - def enum_instances(self, env, model, keys_only): - """Enumerate instances.""" - model.path.update({'AffectingElement': None, 'AffectedElement': None}) - for job in self.job_manager.jobs.values(): - if job.affected_elements is None: - continue - for element in job.affected_elements: - model['AffectingElement'] = job.get_name() - model['AffectedElement'] = element - if keys_only: - yield model - else: - yield self.get_instance(env, model) - - @cmpi_logging.trace_method - def references(self, env, object_name, model, result_class_name, role, - result_role, keys_only): - """Instrument Associations.""" - ch = env.get_cimom_handle() - if ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super='CIM_ManagedElement') or \ - ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super=self.job_manager.job_classname): - return self.simple_refs(env, object_name, model, - result_class_name, role, result_role, keys_only) - - class Values(object): - class ElementEffects(object): - Unknown = pywbem.Uint16(0) - Other = pywbem.Uint16(1) - Exclusive_Use = pywbem.Uint16(2) - Performance_Impact = pywbem.Uint16(3) - Element_Integrity = pywbem.Uint16(4) - Create = pywbem.Uint16(5) - - -class LMI_MethodResult(CIMProvider2): - """Instrumentation of LMI_MethodResult class and its subclasses.""" - - @cmpi_logging.trace_method - def __init__(self, classname, job_manager): - self.classname = classname - self.job_manager = job_manager - - @cmpi_logging.trace_method - # pylint: disable-msg=W0221 - def get_instance(self, env, model, job=None): - """Return an instance.""" - if not job: - instance_id = model['InstanceID'] - job = self.job_manager.get_job_for_instance_id( - instance_id, self.classname) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - model['Description'] = job.job_name - if job.job_state in Job.FINAL_STATES: - model['PostCallIndication'] = pywbem.CIMProperty( - name='PostCallIndication', - value=job.get_post_call()) - else: - model['PostCallIndication'] = pywbem.CIMProperty( - name='PostCallIndication', - type='instance', - value=None) - model['PreCallIndication'] = pywbem.CIMProperty( - name='PreCallIndication', - value=job.get_pre_call()) - return model - - @cmpi_logging.trace_method - def enum_instances(self, env, model, keys_only): - """Enumerate instances.""" - model.path.update({'InstanceID': None}) - for job in self.job_manager.jobs.values(): - model['InstanceID'] = job.get_instance_id( - classname=self.classname) - if keys_only: - yield model - else: - yield self.get_instance(env, model, job) - -class LMI_AssociatedJobMethodResult(CIMProvider2): - """ - Instrumentation of LMI_AssociatedJobMethodResult class and its - subclasses. - """ - - @cmpi_logging.trace_method - def __init__(self, classname, job_manager): - self.classname = classname - self.job_manager = job_manager - - @cmpi_logging.trace_method - def get_instance(self, env, model): - """Return an instance.""" - instance_id = model['Job']['InstanceID'] - job = self.job_manager.get_job_for_instance_id(instance_id) - if not job: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job not found.") - - expected_result_id = job.get_instance_id( - classname=self.job_manager.method_result_classname) - if model['JobParameters']['InstanceID'] != expected_result_id: - raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND, - "Job is not associated to JobParameters.") - return model - - @cmpi_logging.trace_method - def enum_instances(self, env, model, keys_only): - """Enumerate instances.""" - model.path.update({'JobParameters': None, 'Job': None}) - for job in self.job_manager.jobs.values(): - if job.owning_element: - model['Job'] = job.get_name() - model['JobParameters'] = pywbem.CIMInstanceName( - classname=self.job_manager.method_result_classname, - namespace=self.job_manager.namespace, - keybindings={ - 'InstanceID': job.get_instance_id( - classname=self.job_manager.method_result_classname) - }) - yield model - - @cmpi_logging.trace_method - def references(self, env, object_name, model, result_class_name, role, - result_role, keys_only): - """Instrument Associations.""" - ch = env.get_cimom_handle() - if ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super=self.job_manager.method_result_classname) or \ - ch.is_subclass(object_name.namespace, - sub=object_name.classname, - super=self.job_manager.job_classname): - return self.simple_refs(env, object_name, model, - result_class_name, role, result_role, keys_only) - |