summaryrefslogtreecommitdiffstats
path: root/src/software/openlmi/software/yumdb/jobs.py
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2013-03-20 12:59:49 +0100
committerMichal Minar <miminar@redhat.com>2013-03-20 12:59:49 +0100
commiteed3cc9b4b2a78e238ece6dab185e7c76ab13142 (patch)
tree8df1d6b3a3a60e42a27882e3d8003de8cddfa92c /src/software/openlmi/software/yumdb/jobs.py
parent19ab3372fe708be3e937df5d1ed6945ac813b51c (diff)
downloadopenlmi-providers-eed3cc9b4b2a78e238ece6dab185e7c76ab13142.tar.gz
openlmi-providers-eed3cc9b4b2a78e238ece6dab185e7c76ab13142.tar.xz
openlmi-providers-eed3cc9b4b2a78e238ece6dab185e7c76ab13142.zip
added new providers, allowed for asynchronous jobs
new providers: LMI_SoftwareInstallationJob LMI_SoftwareInstallationService
Diffstat (limited to 'src/software/openlmi/software/yumdb/jobs.py')
-rw-r--r--src/software/openlmi/software/yumdb/jobs.py343
1 files changed, 320 insertions, 23 deletions
diff --git a/src/software/openlmi/software/yumdb/jobs.py b/src/software/openlmi/software/yumdb/jobs.py
index 437f123..cb36f43 100644
--- a/src/software/openlmi/software/yumdb/jobs.py
+++ b/src/software/openlmi/software/yumdb/jobs.py
@@ -23,25 +23,45 @@
Define job classes representing kinds of jobs of worker process.
"""
+import os
import threading
+import time
+import yum
from openlmi.software import util
+from openlmi.software.yumdb import errors
from openlmi.software.yumdb.packageinfo import PackageInfo
from openlmi.software.yumdb.repository import Repository
+DEFAULT_JOB_PRIORITY = 10
+# in seconds
+DEFAULT_TIME_BEFORE_REMOVAL = 60 * 5
+
class YumJob(object): #pylint: disable=R0903
"""
Base class for any job, that is processable by YumWorker process.
- It contains at least a jobid attribute, that must be unique for
+ It contains jobid attribute, that must be unique for
each job, it's counted from zero a incremented after each creation.
+
+ metadata attribute typically contain:
+ name - name of job, that is modifiable by user
+ method_name - name of provider's method, that lead to creation of job
"""
- __slots__ = ('jobid', )
+ __slots__ = ( 'jobid', 'created', 'started', 'finished', 'last_change'
+ , 'priority', 'result', 'result_data')
# jobs can be created concurrently from multiple threads, that's
# why we need to make its creation thread safe
_JOB_ID_LOCK = threading.Lock()
_JOB_ID = 0
+ # job state enumeration
+ NEW, RUNNING, COMPLETED, TERMINATED, EXCEPTION = range(5)
+ # job result enumeration
+ RESULT_SUCCESS, RESULT_TERMINATED, RESULT_ERROR = range(3)
+
+ ResultNames = ("success", "terminated", "error")
+
@staticmethod
def _get_job_id():
"""
@@ -54,35 +74,261 @@ class YumJob(object): #pylint: disable=R0903
YumJob._JOB_ID += 1
return val
- def __init__(self):
+ @classmethod
+ def handle_ignore_job_props(cls):
+ """
+ @return set of job properties, that does not count as job's handler
+ arguments - job handler does not care fore metadata, jobid, priority,
+ etc...
+ """
+ return set(YumJob.__slots__)
+
+ def __init__(self, priority=10):
+ if not isinstance(priority, (int, long)):
+ raise TypeError("priority must be integer")
self.jobid = self._get_job_id()
+ self.started = None
+ self.finished = None
+ self.priority = priority
+ self.created = time.time()
+ self.last_change = self.created
+ self.result = None
+ self.result_data = None
+
+ @property
+ def state(self):
+ """
+ @return integer representing job's state
+ """
+ if not self.started:
+ return self.NEW
+ if not self.finished:
+ return self.RUNNING
+ if self.result == self.RESULT_ERROR:
+ return self.EXCEPTION
+ if self.result == self.RESULT_TERMINATED:
+ return self.TERMINATED
+ return self.COMPLETED
@property
def job_kwargs(self):
"""
- Jobs are in worker represented be methods with arguments.
- Those can be obtained from job by calling this property.
+ Jobs are in worker handled in handlers specific for each subclass.
+ These handlers are methods of worker. They accepts concrete arguments
+ that can be obtained from job by invoking this property.
@return dictionary of keyword arguments of job
"""
kwargs = {}
cls = self.__class__
while not cls in (YumJob, object):
for slot in cls.__slots__:
- if not slot in kwargs:
+ if ( not slot in kwargs
+ and not slot in cls.handle_ignore_job_props()):
kwargs[slot] = getattr(self, slot)
cls = cls.__bases__[0]
- kwargs.pop('jobid', None)
+ for prop in YumJob.__slots__:
+ kwargs.pop(prop, None)
return kwargs
+ def start(self):
+ """Modify the state of job to RUNNING."""
+ if self.started:
+ raise errors.InvalidJobState("can not start already started job")
+ self.started = time.time()
+ self.last_change = self.started
+
+ def finish(self, result, data=None):
+ """
+ Modify the state of job to one of {COMPLETED, EXCEPTION, TERMINATED}.
+ Depending on result parameter.
+ """
+ if not self.started and result != self.RESULT_TERMINATED:
+ raise errors.InvalidJobState("can not finish not started job")
+ self.finished = time.time()
+ if result == self.RESULT_TERMINATED:
+ self.started = self.finished
+ self.result = result
+ self.result_data = data
+ self.last_change = self.finished
+
+ def update(self, **kwargs):
+ """Change job's properties."""
+ change = False
+ for key, value in kwargs.items():
+ if getattr(self, key) != value:
+ setattr(self, key, value)
+ change = True
+ if change is True:
+ self.last_change = time.time()
+
+ def __eq__(self, other):
+ return self.__class__ is other.__class__ and self.jobid == other.jobid
+
+ def __ne__(self, other):
+ return ( self.__class__ is not other.__class__
+ or self.jobid != other.jobid)
+
+ def __lt__(self, other):
+ """
+ JobControl jobs have the highest priority.
+ """
+ return ( ( isinstance(self, YumJobControl)
+ and not isinstance(other, YumJobControl))
+ or ( self.priority < other.priority
+ or ( self.priority == other.priority
+ and ( self.jobid < other.jobid
+ or ( self.jobid == other.jobid
+ and (self.created < other.created))))))
+
+ def __cmp__(self, other):
+ if ( isinstance(self, YumJobControl)
+ and not isinstance(other, YumJobControl)):
+ return -1
+ if ( not isinstance(self, YumJobControl)
+ and isinstance(other, YumJobControl)):
+ return 1
+ if self.priority < other.priority:
+ return -1
+ if self.priority > other.priority:
+ return 1
+ if self.jobid < other.jobid:
+ return -1
+ if self.jobid > other.jobid:
+ return 1
+ if self.created < other.created:
+ return -1
+ if self.created > other.created:
+ return 1
+ return 0
+
+ def __str__(self):
+ return "%s(id=%d,p=%d)" % (
+ self.__class__.__name__, self.jobid, self.priority)
+
def __getstate__(self):
ret = self.job_kwargs
- ret.update(jobid=self.jobid)
+ for prop in self.handle_ignore_job_props():
+ ret[prop] = getattr(self, prop)
return ret
def __setstate__(self, state):
for k, value in state.items():
setattr(self, k, value)
+class YumAsyncJob(YumJob): #pylint: disable=R0903
+ """
+ Base class for jobs, that support asynchronnous execution.
+ No reply is sent upon job completition or error. The results are
+ kept on server.
+ """
+ __slots__ = ( 'async'
+ , 'delete_on_completion'
+ , 'time_before_removal'
+ , 'metadata')
+
+ @classmethod
+ def handle_ignore_job_props(cls):
+ return YumJob.handle_ignore_job_props().union(YumAsyncJob.__slots__)
+
+ def __init__(self, priority=10, async=False, metadata=None):
+ YumJob.__init__(self, priority)
+ self.async = bool(async)
+ self.delete_on_completion = True
+ self.time_before_removal = DEFAULT_TIME_BEFORE_REMOVAL
+ if metadata is None and self.async is True:
+ metadata = {}
+ self.metadata = metadata
+
+ def __str__(self):
+ return "%s(id=%d,p=%d%s%s)" % (
+ self.__class__.__name__, self.jobid,
+ self.priority,
+ ',async' if self.async else '',
+ (',name="%s"'%self.metadata['name'])
+ if self.metadata and 'name' in self.metadata else '')
+
+ def update(self, **kwargs):
+ if 'metadata' in kwargs:
+ self.metadata.update(kwargs.pop('metadata'))
+ return YumJob.update(self, **kwargs)
+
+# *****************************************************************************
+# Job control funtions
+# *****************************************************************************
+class YumJobControl(YumJob): #pylint: disable=R0903
+ """Base class for any job used for asynchronous jobs management."""
+ pass
+
+class YumJobGetList(YumJobControl): #pylint: disable=R0903
+ """Request for obtaining list of all asynchronous jobs."""
+ pass
+
+class YumJobOnJob(YumJobControl):
+ """
+ Base class for any control job acting upon particular asynchronous job.
+ """
+ __slots__ = ('target', )
+ def __init__(self, target):
+ YumJobControl.__init__(self)
+ if not isinstance(target, (int, long)):
+ raise TypeError("target must be an integer")
+ self.target = target
+
+class YumJobGet(YumJobOnJob): #pylint: disable=R0903
+ """Get job object by its id."""
+ pass
+
+class YumJobGetByName(YumJobOnJob): #pylint: disable=R0903
+ """Get job object by its name property."""
+ def __init__(self, name):
+ YumJobOnJob.__init__(self, -1)
+ self.target = name
+
+class YumJobSetPriority(YumJobOnJob): #pylint: disable=R0903
+ """Change priority of job."""
+ __slots__ = ('new_priority', )
+
+ def __init__(self, target, priority):
+ YumJobOnJob.__init__(self, target)
+ self.new_priority = priority
+
+class YumJobUpdate(YumJobOnJob): #pylint: disable=R0903
+ """Update job's metadata."""
+ __slots__ = ('data', )
+ FORBIDDEN_PROPERTIES = (
+ 'async', 'jobid', 'created', 'started', 'priority', 'finished',
+ 'delete_on_completion', 'time_before_removal', 'last_change')
+
+ def __init__(self, target, **kwargs):
+ YumJobOnJob.__init__(self, target)
+ assert not set.intersection(
+ set(YumJobUpdate.FORBIDDEN_PROPERTIES), set(kwargs))
+ self.data = kwargs
+
+class YumJobReschedule(YumJobOnJob): #pylint: disable=R0903
+ """Change the schedule of job's deletion."""
+ __slots__ = ('delete_on_completion', 'time_before_removal')
+ def __init__(self, target, delete_on_completion, time_before_removal):
+ YumJobOnJob.__init__(self, target)
+ if not isinstance(time_before_removal, (int, long, float)):
+ raise TypeError("time_before_removal must be float")
+ self.delete_on_completion = bool(delete_on_completion)
+ self.time_before_removal = time_before_removal
+
+class YumJobDelete(YumJobOnJob): #pylint: disable=R0903
+ """Delete job - can only be called on finished job."""
+ pass
+
+class YumJobTerminate(YumJobOnJob): #pylint: disable=R0903
+ """
+ Can only be called on not yet started job.
+ Running job can not be terminated.
+ """
+ pass
+
+# *****************************************************************************
+# Yum API functions
+# *****************************************************************************
class YumBeginSession(YumJob): #pylint: disable=R0903
"""
Begin session on YumWorker which ensures that yum database is locked
@@ -195,27 +441,42 @@ class YumFilterPackages(YumGetPackageList): #pylint: disable=R0903
self.envra = envra
self.repoid = repoid
-class YumSpecificPackageJob(YumJob): #pylint: disable=R0903
+class YumSpecificPackageJob(YumAsyncJob): #pylint: disable=R0903
"""
- Abstract job taking instance of yumdb.PackageInfo as argument.
+ Abstract job taking instance of yumdb.PackageInfo as argument or
+ package's nevra.
Arguments:
- pkg - plays different role depending on job subclass
+ pkg - plays different role depending on job subclass;
+ can also be a nevra
"""
__slots__ = ('pkg', )
- def __init__(self, pkg):
- if not isinstance(pkg, PackageInfo):
- raise TypeError("pkg must be instance of PackageInfo")
- YumJob.__init__(self)
+ def __init__(self, pkg, async=False, metadata=None):
+ if isinstance(pkg, basestring):
+ if not util.RE_NEVRA_OPT_EPOCH.match(pkg):
+ raise errors.InvalidNevra('not a valid nevra "%s"' % pkg)
+ elif not isinstance(pkg, PackageInfo):
+ raise TypeError("pkg must be either string or instance"
+ " of PackageInfo")
+ YumAsyncJob.__init__(self, async=async, metadata=metadata)
self.pkg = pkg
class YumInstallPackage(YumSpecificPackageJob): #pylint: disable=R0903
"""
Job requesting installation of specific package.
pkg argument should be available.
+ Arguments:
+ pkg - same as in YumSpecificPackageJob
+ force is a boolean saying:
+ True -> reinstall the package if it's already installed
+ False -> fail if the package is already installed
Worker replies with new instance of package.
"""
- pass
+ __slots__ = ('force', )
+ def __init__(self, pkg, async=False, force=False, metadata=None):
+ YumSpecificPackageJob.__init__(
+ self, pkg, async=async, metadata=metadata)
+ self.force = bool(force)
class YumRemovePackage(YumSpecificPackageJob): #pylint: disable=R0903
"""
@@ -240,21 +501,27 @@ class YumUpdatePackage(YumSpecificPackageJob): #pylint: disable=R0903
candidate packages to ones with specific evr.
Arguments:
to_epoch, to_version, to_release
+ force is a boolean, that has meaning only when update_only is False:
+ True -> reinstall the package if it's already installed
+ False -> fail if the package is already installed
The arguments more given, the more complete filter of candidates.
Worker replies with new instance of package.
"""
- __slots__ = ('to_epoch', 'to_version', 'to_release')
+ __slots__ = ('to_epoch', 'to_version', 'to_release', 'force')
- def __init__(self, pkg,
- to_epoch=None, to_version=None, to_release=None):
+ def __init__(self, pkg, async=False,
+ to_epoch=None, to_version=None, to_release=None, force=False,
+ metadata=None):
if not isinstance(pkg, PackageInfo):
raise TypeError("pkg must be instance of yumdb.PackageInfo")
- YumSpecificPackageJob.__init__(self, pkg)
+ YumSpecificPackageJob.__init__(
+ self, pkg, async=async, metadata=metadata)
self.to_epoch = to_epoch
self.to_version = to_version
self.to_release = to_release
+ self.force = bool(force)
class YumCheckPackage(YumSpecificPackageJob): #pylint: disable=R0903
"""
@@ -262,11 +529,41 @@ class YumCheckPackage(YumSpecificPackageJob): #pylint: disable=R0903
Worker replies with new instance of yumdb.PackageCheck.
"""
- def __init__(self, pkg):
- YumSpecificPackageJob.__init__(self, pkg)
+ def __init__(self, pkg, async=False, metadata=None):
+ YumSpecificPackageJob.__init__(self, pkg, async=async,
+ metadata=metadata)
if not pkg.installed:
raise ValueError("package must be installed to check it")
+class YumInstallPackageFromURI(YumAsyncJob): #pylint: disable=R0903
+ """
+ Job requesting installation of specific package from URI.
+ Arguments:
+ uri is either a path to rpm package on local filesystem or url
+ of rpm stored on remote host
+ update_only is a boolean:
+ True -> install the package only if the older version is installed
+ False -> install the package if it's not already installed
+ force is a boolean, that has meaning only when update_only is False:
+ True -> reinstall the package if it's already installed
+ False -> fail if the package is already installed
+
+ Worker replies with new instance of package.
+ """
+ __slots__ = ('uri', 'update_only', "force")
+ def __init__(self, uri, async=False, update_only=False, force=False,
+ metadata=None):
+ if not isinstance(uri, basestring):
+ raise TypeError("uri must be a string")
+ if uri.startswith('file://'):
+ uri = uri[len('file://'):]
+ if not yum.misc.re_remote_url(uri) and not os.path.exists(uri):
+ raise errors.InvalidURI(uri)
+ YumAsyncJob.__init__(self, async=async, metadata=metadata)
+ self.uri = uri
+ self.update_only = bool(update_only)
+ self.force = bool(force)
+
class YumGetRepositoryList(YumJob): #pylint: disable=R0903
"""
Job requesing a list of repositories.
@@ -288,7 +585,7 @@ class YumGetRepositoryList(YumJob): #pylint: disable=R0903
", ".join(self.SUPPORTED_KINDS))
self.kind = kind
-class YumFilterRepositories(YumGetRepositoryList):
+class YumFilterRepositories(YumGetRepositoryList): #pylint: disable=R0903
"""
Job similar to YumGetRepositoryList, but allowing to specify
filter on packages.