summaryrefslogtreecommitdiffstats
path: root/src/python/openlmi/common/JobManager.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/python/openlmi/common/JobManager.py')
-rw-r--r--src/python/openlmi/common/JobManager.py161
1 files changed, 103 insertions, 58 deletions
diff --git a/src/python/openlmi/common/JobManager.py b/src/python/openlmi/common/JobManager.py
index a923f29..bb6fed8 100644
--- a/src/python/openlmi/common/JobManager.py
+++ b/src/python/openlmi/common/JobManager.py
@@ -49,6 +49,7 @@ import pywbem
import openlmi.common.cmpi_logging as cmpi_logging
from pywbem.cim_provider2 import CIMProvider2
import socket
+import traceback
# Too many instance attributes
# pylint: disable-msg=R0902
@@ -208,7 +209,7 @@ class Job(object):
@cmpi_logging.trace_method
def finish_method(self, new_state, return_value=None, return_type=None,
- output_arguments=None, error=None):
+ output_arguments=None, error=None, affected_elements=None):
"""
Mark the job as finished, with given return value, output parameters and
error.
@@ -224,12 +225,18 @@ class Job(object):
any output parameters.
:param error: (``CIMError``) Error raised by the job. Can be None,
when the job finished successfully.
+ :param affected_elements: (``array of CIMInstanceName``) New list of
+ affected elements to generate LMI_<name>JobAffectedElement
+ association. If None, the old list, passed to constructor, remains
+ untouched.
"""
self.lock()
self.return_value = return_value
self.return_value_type = return_type
self.output_arguments = output_arguments
self.error = error
+ if affected_elements is not None:
+ self.affected_elements = affected_elements
self.change_state(new_state, 100)
self.unlock()
@@ -310,14 +317,13 @@ class Job(object):
self._restart_timer()
self.unlock()
- @cmpi_logging.trace_method
def _expire(self):
"""
Callback when a Job completes and time_before_removal second passed.
The job gets removed from its JobManager.
"""
- cmpi_logging.logger.debug("Got timeout for job %s: '%s', removing"
- " the job" % (self.the_id, self.job_name))
+ # We cannot log here, this method is executed in job's Timer thread,
+ # which is not registered at the cimom.
self.job_manager.remove_job(self)
@cmpi_logging.trace_method
@@ -370,8 +376,16 @@ class Job(object):
try:
self._execute(*(self._execargs), **(self._execkwargs))
except pywbem.CIMError, error:
+ cmpi_logging.logger.trace_warn("Job.execute caught an CIMError %s",
+ str(error))
+ cmpi_logging.logger.trace_verbose("traceback: %s",
+ traceback.format_exc())
self.finish_method(Job.STATE_FAILED, error=error)
except Exception, ex:
+ cmpi_logging.logger.trace_warn("Job.execute caught an Exception %s",
+ str(ex))
+ cmpi_logging.logger.trace_verbose("traceback: %s",
+ traceback.format_exc())
error = pywbem.CIMError(pywbem.CIM_ERR_FAILED, str(ex))
self.finish_method(Job.STATE_FAILED, error=error)
@@ -455,18 +469,14 @@ class Job(object):
namespace=self.job_manager.namespace)
inst = pywbem.CIMInstance(
classname="CIM_InstMethodCall",
- path=path,
- properties={
- 'MethodName' : self.method_name,
- 'MethodParameters' : pywbem.CIMProperty(
- name="MethodParameters",
- type='instance',
- value=self._get_method_params(False)),
- 'PreCall' : True,
- })
+ path=path)
src_instance = self._get_cim_instance()
inst['SourceInstance'] = src_instance
inst['SourceInstanceModelPath'] = str(src_instance.path)
+ inst['MethodName'] = self.method_name
+ inst['MethodParameters'] = self.get_method_params(
+ '__MethodParameters', True, False)
+ inst['PreCall'] = True
return inst
@cmpi_logging.trace_method
@@ -484,22 +494,31 @@ class Job(object):
namespace=self.job_manager.namespace)
inst = pywbem.CIMInstance(
classname="CIM_InstMethodCall",
- path=path,
- properties={
- 'MethodName' : self.method_name,
- 'MethodParameters' : self._get_method_params(True),
- 'PreCall' : False
- })
+ path=path)
+
src_instance = self._get_cim_instance()
inst['SourceInstance'] = src_instance
inst['SourceInstanceModelPath'] = str(src_instance.path)
+ inst['MethodName'] = self.method_name
+ inst['MethodParameters'] = self.get_method_params(
+ '__MethodParameters', True, True)
+ inst['PreCall'] = False
if self.return_value_type is not None:
inst['ReturnValueType'] = self.return_value_type
if self.return_value is not None:
- inst['ReturnValue'] = self.return_value
+ inst['ReturnValue'] = str(self.return_value)
if self.error is not None:
- inst['Error'] = self.error
+ path = pywbem.CIMInstanceName(
+ classname="CIM_Error",
+ host=socket.gethostname(),
+ namespace=self.job_manager.namespace)
+ err = pywbem.CIMInstance(
+ classname="CIM_Error",
+ path=path)
+ err['CIMStatusCode'] = pywbem.Uint32(self.error[0])
+ err['Message'] = self.error[1]
+ inst['Error'] = [err, ]
return inst
@cmpi_logging.trace_method
@@ -512,20 +531,31 @@ class Job(object):
return self.job_manager.get_job_instance(self)
@cmpi_logging.trace_method
- def _get_method_params(self, output=True):
+ def get_method_params(self, class_name, include_input, include_output):
"""
- Assemble __MethodParameters for CIM_InstMethodCall indication.
-
- :rtype: CIMInstance of __MethodParameters.
+ Create a class of given name with all input or output parameters
+ of the asynchronous method. Typically used to assemble
+ CIM_ConcreteJob.JobInParameters or CIM_InstMethodCall.MethodParameters
+ values.
+
+ :param class_name: (``string``) Name of the class to create.
+ :param input: (``boolean``) Whether input parameters should be
+ included in the returned class
+ :param output: (``boolean``) Whether output parameters should be
+ included in the returned class
+ :rtype: CIMInstance of the created class.
"""
+ # TODO: this is workaround for bug #920763, use class_name
+ # when it's fixed
+ clsname = "CIM_ManagedElement"
path = pywbem.CIMInstanceName(
- classname="__MethodParameters",
- namespace=self.job_manager.namespace,
- keybindings={})
- inst = pywbem.CIMInstance(classname="__MethodParameters", path=path)
- for (name, value) in self.input_arguments.iteritems():
- inst[name] = value
- if output:
+ classname=clsname,
+ namespace=self.job_manager.namespace)
+ inst = pywbem.CIMInstance(classname=clsname, path=path)
+ if include_input and self.input_arguments:
+ for (name, value) in self.input_arguments.iteritems():
+ inst[name] = value
+ if include_output and self.output_arguments:
# overwrite any input parameter
for (name, value) in self.output_arguments.iteritems():
inst[name] = value
@@ -599,11 +629,11 @@ class JobManager(object):
timeout.
"""
- IND_JOB_PERCENT_UPDATED = "JobPercentUpdated"
- IND_JOB_SUCCEEDED = "JobSucceeded"
- IND_JOB_FAILED = "JobFailed"
- IND_JOB_CHANGED = "JobChanged"
- IND_JOB_CREATED = "JobCreated"
+ IND_JOB_PERCENT_UPDATED = "PercentUpdated"
+ IND_JOB_SUCCEEDED = "Succeeded"
+ IND_JOB_FAILED = "Failed"
+ IND_JOB_CHANGED = "Changed"
+ IND_JOB_CREATED = "Created"
@cmpi_logging.trace_method
def __init__(self, name, namespace, indication_manager):
@@ -633,7 +663,7 @@ class JobManager(object):
# Start the worker thread (don't forget to register it at CIMOM)
self.worker = threading.Thread(target=self._worker_main)
- self.worker.daemon=True
+ self.worker.daemon = True
self.worker.start()
# Various classnames for job-related classes, with correct infixes.
@@ -656,7 +686,7 @@ class JobManager(object):
filters = {
self.IND_JOB_PERCENT_UPDATED: {
"Query" : "SELECT * FROM CIM_InstModification WHERE "
- "SourceInstance ISA CIM_ConcreteJob AND "
+ "SourceInstance ISA %(classname)s AND "
"SourceInstance.CIM_ConcreteJob::PercentComplete <> "
"PreviousInstance.CIM_ConcreteJob::PercentComplete",
"Description" : "Modification of Percentage Complete for a "
@@ -664,36 +694,35 @@ class JobManager(object):
},
self.IND_JOB_SUCCEEDED: {
"Query" : "SELECT * FROM CIM_InstModification WHERE "
- "SourceInstance ISA CIM_ConcreteJob AND ANY "
- "SourceInstance.CIM_ConcreteJob::OperationalStatus[*] = 17 "
- "AND ANY "
- "SourceInstance.CIM_ConcreteJob::OperationalStatus[*] = 2",
- "Description": "Modification of Operational Status for a "
- "Concrete Job to 'Complete' and 'OK'.",
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::JobState = 17",
+ "Description": "Modification of Job State for a "
+ "Concrete Job to 'Complete'.",
},
self.IND_JOB_FAILED: {
"Query" : "SELECT * FROM CIM_InstModification WHERE "
- "SourceInstance ISA CIM_ConcreteJob AND ANY "
- "SourceInstance.CIM_ConcreteJob::OperationalStatus[*] = 17 "
- "AND ANY "
- "SourceInstance.CIM_ConcreteJob::OperationalStatus[*] = 6",
- "Description": "Modification of Operational Status for a "
- "Concrete Job to 'Complete' and 'Error'.",
+ "SourceInstance ISA %(classname)s AND "
+ "SourceInstance.CIM_ConcreteJob::JobState = 10",
+ "Description": "Modification of Job State for a "
+ "Concrete Job to 'Exception'.",
},
self.IND_JOB_CHANGED: {
"Query" : "SELECT * FROM CIM_InstModification WHERE "
- "SourceInstance ISA CIM_ConcreteJob AND "
+ "SourceInstance ISA %(classname)s AND "
"SourceInstance.CIM_ConcreteJob::JobState <> "
"PreviousInstance.CIM_ConcreteJob::JobState",
"Description": "Modification of Job State for a ConcreteJob.",
},
self.IND_JOB_CREATED: {
"Query" : "SELECT * FROM CIM_InstCreation WHERE "
- "SourceInstance ISA CIM_ConcreteJob",
+ "SourceInstance ISA %(classname)s",
"Description": "Creation of a ConcreteJob.",
},
}
- self.indication_manager.add_filters(filters)
+ # add class name
+ for f in filters.itervalues():
+ f['Query'] = f['Query'] % {"classname" : self.job_classname }
+ self.indication_manager.add_filters(self.job_classname, filters)
@cmpi_logging.trace_method
def get_providers(self):
@@ -745,7 +774,8 @@ class JobManager(object):
self.jobs[job.the_id] = job
self.queue.put(job)
# send indication
- if self.indication_manager.is_subscribed(self.IND_JOB_CREATED):
+ if self.indication_manager.is_subscribed(
+ self.job_classname, self.IND_JOB_CREATED):
job_instance = self.get_job_instance(job)
self.indication_manager.send_instcreation(
job_instance, self.IND_JOB_CREATED)
@@ -765,7 +795,6 @@ class JobManager(object):
self.indication_manager.send_instmodification(prev_instance,
current_instance, _id)
- @cmpi_logging.trace_method
def remove_job(self, job):
"""
Remove existing job. Note that jobs are removed automatically after a
@@ -773,8 +802,8 @@ class JobManager(object):
:param job: (``Job``) Job to remove.
"""
- cmpi_logging.logger.debug("Removing job %s: '%s'"
- % (job.the_id, job.job_name))
+ # We cannot log here, this method is executed in job's Timer thread,
+ # which is not registered at the cimom.
del self.jobs[job.the_id]
# The job may still be in the queue!
# There is no way, how to remove it, it will be skipped by the
@@ -964,6 +993,17 @@ class LMI_ConcreteJob(CIMProvider2):
value=None,
type='datetime')
+ if job.input_arguments:
+ model['JobInParameters'] = job.get_method_params(
+ "__JobInParameters", True, False)
+
+ if job.job_state in Job.FINAL_STATES:
+ # assemble output parameters with return value
+ outparams = job.get_method_params("__JobOutParameters", False, True)
+ if job.return_value is not None:
+ outparams['__ReturnValue'] = job.return_value
+ model['JobOutParameters'] = outparams
+
model['TimeSubmitted'] = pywbem.CIMDateTime(job.time_submitted)
# set correct state
jobstate, opstate = self.get_job_states(job)
@@ -1383,6 +1423,9 @@ class LMI_AffectedJobElement(CIMProvider2):
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"AffectingElement not found.")
+ if job.affected_elements is None:
+ raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
+ "The AffectingElement has no AffectedElement.")
if model['AffectedElement'] not in job.affected_elements:
raise pywbem.CIMError(pywbem.CIM_ERR_NOT_FOUND,
"AffectedElement is not associated to AffectingElement.")
@@ -1394,6 +1437,8 @@ class LMI_AffectedJobElement(CIMProvider2):
"""Enumerate instances."""
model.path.update({'AffectingElement': None, 'AffectedElement': None})
for job in self.job_manager.jobs.values():
+ if job.affected_elements is None:
+ continue
for element in job.affected_elements:
model['AffectingElement'] = job.get_name()
model['AffectedElement'] = element