From eed3cc9b4b2a78e238ece6dab185e7c76ab13142 Mon Sep 17 00:00:00 2001 From: Michal Minar Date: Wed, 20 Mar 2013 12:59:49 +0100 Subject: added new providers, allowed for asynchronous jobs new providers: LMI_SoftwareInstallationJob LMI_SoftwareInstallationService --- src/software/openlmi/software/yumdb/jobs.py | 343 ++++++++++++++++++++++++++-- 1 file changed, 320 insertions(+), 23 deletions(-) (limited to 'src/software/openlmi/software/yumdb/jobs.py') diff --git a/src/software/openlmi/software/yumdb/jobs.py b/src/software/openlmi/software/yumdb/jobs.py index 437f123..cb36f43 100644 --- a/src/software/openlmi/software/yumdb/jobs.py +++ b/src/software/openlmi/software/yumdb/jobs.py @@ -23,25 +23,45 @@ Define job classes representing kinds of jobs of worker process. """ +import os import threading +import time +import yum from openlmi.software import util +from openlmi.software.yumdb import errors from openlmi.software.yumdb.packageinfo import PackageInfo from openlmi.software.yumdb.repository import Repository +DEFAULT_JOB_PRIORITY = 10 +# in seconds +DEFAULT_TIME_BEFORE_REMOVAL = 60 * 5 + class YumJob(object): #pylint: disable=R0903 """ Base class for any job, that is processable by YumWorker process. - It contains at least a jobid attribute, that must be unique for + It contains jobid attribute, that must be unique for each job, it's counted from zero a incremented after each creation. + + metadata attribute typically contain: + name - name of job, that is modifiable by user + method_name - name of provider's method, that lead to creation of job """ - __slots__ = ('jobid', ) + __slots__ = ( 'jobid', 'created', 'started', 'finished', 'last_change' + , 'priority', 'result', 'result_data') # jobs can be created concurrently from multiple threads, that's # why we need to make its creation thread safe _JOB_ID_LOCK = threading.Lock() _JOB_ID = 0 + # job state enumeration + NEW, RUNNING, COMPLETED, TERMINATED, EXCEPTION = range(5) + # job result enumeration + RESULT_SUCCESS, RESULT_TERMINATED, RESULT_ERROR = range(3) + + ResultNames = ("success", "terminated", "error") + @staticmethod def _get_job_id(): """ @@ -54,35 +74,261 @@ class YumJob(object): #pylint: disable=R0903 YumJob._JOB_ID += 1 return val - def __init__(self): + @classmethod + def handle_ignore_job_props(cls): + """ + @return set of job properties, that does not count as job's handler + arguments - job handler does not care fore metadata, jobid, priority, + etc... + """ + return set(YumJob.__slots__) + + def __init__(self, priority=10): + if not isinstance(priority, (int, long)): + raise TypeError("priority must be integer") self.jobid = self._get_job_id() + self.started = None + self.finished = None + self.priority = priority + self.created = time.time() + self.last_change = self.created + self.result = None + self.result_data = None + + @property + def state(self): + """ + @return integer representing job's state + """ + if not self.started: + return self.NEW + if not self.finished: + return self.RUNNING + if self.result == self.RESULT_ERROR: + return self.EXCEPTION + if self.result == self.RESULT_TERMINATED: + return self.TERMINATED + return self.COMPLETED @property def job_kwargs(self): """ - Jobs are in worker represented be methods with arguments. - Those can be obtained from job by calling this property. + Jobs are in worker handled in handlers specific for each subclass. + These handlers are methods of worker. They accepts concrete arguments + that can be obtained from job by invoking this property. @return dictionary of keyword arguments of job """ kwargs = {} cls = self.__class__ while not cls in (YumJob, object): for slot in cls.__slots__: - if not slot in kwargs: + if ( not slot in kwargs + and not slot in cls.handle_ignore_job_props()): kwargs[slot] = getattr(self, slot) cls = cls.__bases__[0] - kwargs.pop('jobid', None) + for prop in YumJob.__slots__: + kwargs.pop(prop, None) return kwargs + def start(self): + """Modify the state of job to RUNNING.""" + if self.started: + raise errors.InvalidJobState("can not start already started job") + self.started = time.time() + self.last_change = self.started + + def finish(self, result, data=None): + """ + Modify the state of job to one of {COMPLETED, EXCEPTION, TERMINATED}. + Depending on result parameter. + """ + if not self.started and result != self.RESULT_TERMINATED: + raise errors.InvalidJobState("can not finish not started job") + self.finished = time.time() + if result == self.RESULT_TERMINATED: + self.started = self.finished + self.result = result + self.result_data = data + self.last_change = self.finished + + def update(self, **kwargs): + """Change job's properties.""" + change = False + for key, value in kwargs.items(): + if getattr(self, key) != value: + setattr(self, key, value) + change = True + if change is True: + self.last_change = time.time() + + def __eq__(self, other): + return self.__class__ is other.__class__ and self.jobid == other.jobid + + def __ne__(self, other): + return ( self.__class__ is not other.__class__ + or self.jobid != other.jobid) + + def __lt__(self, other): + """ + JobControl jobs have the highest priority. + """ + return ( ( isinstance(self, YumJobControl) + and not isinstance(other, YumJobControl)) + or ( self.priority < other.priority + or ( self.priority == other.priority + and ( self.jobid < other.jobid + or ( self.jobid == other.jobid + and (self.created < other.created)))))) + + def __cmp__(self, other): + if ( isinstance(self, YumJobControl) + and not isinstance(other, YumJobControl)): + return -1 + if ( not isinstance(self, YumJobControl) + and isinstance(other, YumJobControl)): + return 1 + if self.priority < other.priority: + return -1 + if self.priority > other.priority: + return 1 + if self.jobid < other.jobid: + return -1 + if self.jobid > other.jobid: + return 1 + if self.created < other.created: + return -1 + if self.created > other.created: + return 1 + return 0 + + def __str__(self): + return "%s(id=%d,p=%d)" % ( + self.__class__.__name__, self.jobid, self.priority) + def __getstate__(self): ret = self.job_kwargs - ret.update(jobid=self.jobid) + for prop in self.handle_ignore_job_props(): + ret[prop] = getattr(self, prop) return ret def __setstate__(self, state): for k, value in state.items(): setattr(self, k, value) +class YumAsyncJob(YumJob): #pylint: disable=R0903 + """ + Base class for jobs, that support asynchronnous execution. + No reply is sent upon job completition or error. The results are + kept on server. + """ + __slots__ = ( 'async' + , 'delete_on_completion' + , 'time_before_removal' + , 'metadata') + + @classmethod + def handle_ignore_job_props(cls): + return YumJob.handle_ignore_job_props().union(YumAsyncJob.__slots__) + + def __init__(self, priority=10, async=False, metadata=None): + YumJob.__init__(self, priority) + self.async = bool(async) + self.delete_on_completion = True + self.time_before_removal = DEFAULT_TIME_BEFORE_REMOVAL + if metadata is None and self.async is True: + metadata = {} + self.metadata = metadata + + def __str__(self): + return "%s(id=%d,p=%d%s%s)" % ( + self.__class__.__name__, self.jobid, + self.priority, + ',async' if self.async else '', + (',name="%s"'%self.metadata['name']) + if self.metadata and 'name' in self.metadata else '') + + def update(self, **kwargs): + if 'metadata' in kwargs: + self.metadata.update(kwargs.pop('metadata')) + return YumJob.update(self, **kwargs) + +# ***************************************************************************** +# Job control funtions +# ***************************************************************************** +class YumJobControl(YumJob): #pylint: disable=R0903 + """Base class for any job used for asynchronous jobs management.""" + pass + +class YumJobGetList(YumJobControl): #pylint: disable=R0903 + """Request for obtaining list of all asynchronous jobs.""" + pass + +class YumJobOnJob(YumJobControl): + """ + Base class for any control job acting upon particular asynchronous job. + """ + __slots__ = ('target', ) + def __init__(self, target): + YumJobControl.__init__(self) + if not isinstance(target, (int, long)): + raise TypeError("target must be an integer") + self.target = target + +class YumJobGet(YumJobOnJob): #pylint: disable=R0903 + """Get job object by its id.""" + pass + +class YumJobGetByName(YumJobOnJob): #pylint: disable=R0903 + """Get job object by its name property.""" + def __init__(self, name): + YumJobOnJob.__init__(self, -1) + self.target = name + +class YumJobSetPriority(YumJobOnJob): #pylint: disable=R0903 + """Change priority of job.""" + __slots__ = ('new_priority', ) + + def __init__(self, target, priority): + YumJobOnJob.__init__(self, target) + self.new_priority = priority + +class YumJobUpdate(YumJobOnJob): #pylint: disable=R0903 + """Update job's metadata.""" + __slots__ = ('data', ) + FORBIDDEN_PROPERTIES = ( + 'async', 'jobid', 'created', 'started', 'priority', 'finished', + 'delete_on_completion', 'time_before_removal', 'last_change') + + def __init__(self, target, **kwargs): + YumJobOnJob.__init__(self, target) + assert not set.intersection( + set(YumJobUpdate.FORBIDDEN_PROPERTIES), set(kwargs)) + self.data = kwargs + +class YumJobReschedule(YumJobOnJob): #pylint: disable=R0903 + """Change the schedule of job's deletion.""" + __slots__ = ('delete_on_completion', 'time_before_removal') + def __init__(self, target, delete_on_completion, time_before_removal): + YumJobOnJob.__init__(self, target) + if not isinstance(time_before_removal, (int, long, float)): + raise TypeError("time_before_removal must be float") + self.delete_on_completion = bool(delete_on_completion) + self.time_before_removal = time_before_removal + +class YumJobDelete(YumJobOnJob): #pylint: disable=R0903 + """Delete job - can only be called on finished job.""" + pass + +class YumJobTerminate(YumJobOnJob): #pylint: disable=R0903 + """ + Can only be called on not yet started job. + Running job can not be terminated. + """ + pass + +# ***************************************************************************** +# Yum API functions +# ***************************************************************************** class YumBeginSession(YumJob): #pylint: disable=R0903 """ Begin session on YumWorker which ensures that yum database is locked @@ -195,27 +441,42 @@ class YumFilterPackages(YumGetPackageList): #pylint: disable=R0903 self.envra = envra self.repoid = repoid -class YumSpecificPackageJob(YumJob): #pylint: disable=R0903 +class YumSpecificPackageJob(YumAsyncJob): #pylint: disable=R0903 """ - Abstract job taking instance of yumdb.PackageInfo as argument. + Abstract job taking instance of yumdb.PackageInfo as argument or + package's nevra. Arguments: - pkg - plays different role depending on job subclass + pkg - plays different role depending on job subclass; + can also be a nevra """ __slots__ = ('pkg', ) - def __init__(self, pkg): - if not isinstance(pkg, PackageInfo): - raise TypeError("pkg must be instance of PackageInfo") - YumJob.__init__(self) + def __init__(self, pkg, async=False, metadata=None): + if isinstance(pkg, basestring): + if not util.RE_NEVRA_OPT_EPOCH.match(pkg): + raise errors.InvalidNevra('not a valid nevra "%s"' % pkg) + elif not isinstance(pkg, PackageInfo): + raise TypeError("pkg must be either string or instance" + " of PackageInfo") + YumAsyncJob.__init__(self, async=async, metadata=metadata) self.pkg = pkg class YumInstallPackage(YumSpecificPackageJob): #pylint: disable=R0903 """ Job requesting installation of specific package. pkg argument should be available. + Arguments: + pkg - same as in YumSpecificPackageJob + force is a boolean saying: + True -> reinstall the package if it's already installed + False -> fail if the package is already installed Worker replies with new instance of package. """ - pass + __slots__ = ('force', ) + def __init__(self, pkg, async=False, force=False, metadata=None): + YumSpecificPackageJob.__init__( + self, pkg, async=async, metadata=metadata) + self.force = bool(force) class YumRemovePackage(YumSpecificPackageJob): #pylint: disable=R0903 """ @@ -240,21 +501,27 @@ class YumUpdatePackage(YumSpecificPackageJob): #pylint: disable=R0903 candidate packages to ones with specific evr. Arguments: to_epoch, to_version, to_release + force is a boolean, that has meaning only when update_only is False: + True -> reinstall the package if it's already installed + False -> fail if the package is already installed The arguments more given, the more complete filter of candidates. Worker replies with new instance of package. """ - __slots__ = ('to_epoch', 'to_version', 'to_release') + __slots__ = ('to_epoch', 'to_version', 'to_release', 'force') - def __init__(self, pkg, - to_epoch=None, to_version=None, to_release=None): + def __init__(self, pkg, async=False, + to_epoch=None, to_version=None, to_release=None, force=False, + metadata=None): if not isinstance(pkg, PackageInfo): raise TypeError("pkg must be instance of yumdb.PackageInfo") - YumSpecificPackageJob.__init__(self, pkg) + YumSpecificPackageJob.__init__( + self, pkg, async=async, metadata=metadata) self.to_epoch = to_epoch self.to_version = to_version self.to_release = to_release + self.force = bool(force) class YumCheckPackage(YumSpecificPackageJob): #pylint: disable=R0903 """ @@ -262,11 +529,41 @@ class YumCheckPackage(YumSpecificPackageJob): #pylint: disable=R0903 Worker replies with new instance of yumdb.PackageCheck. """ - def __init__(self, pkg): - YumSpecificPackageJob.__init__(self, pkg) + def __init__(self, pkg, async=False, metadata=None): + YumSpecificPackageJob.__init__(self, pkg, async=async, + metadata=metadata) if not pkg.installed: raise ValueError("package must be installed to check it") +class YumInstallPackageFromURI(YumAsyncJob): #pylint: disable=R0903 + """ + Job requesting installation of specific package from URI. + Arguments: + uri is either a path to rpm package on local filesystem or url + of rpm stored on remote host + update_only is a boolean: + True -> install the package only if the older version is installed + False -> install the package if it's not already installed + force is a boolean, that has meaning only when update_only is False: + True -> reinstall the package if it's already installed + False -> fail if the package is already installed + + Worker replies with new instance of package. + """ + __slots__ = ('uri', 'update_only', "force") + def __init__(self, uri, async=False, update_only=False, force=False, + metadata=None): + if not isinstance(uri, basestring): + raise TypeError("uri must be a string") + if uri.startswith('file://'): + uri = uri[len('file://'):] + if not yum.misc.re_remote_url(uri) and not os.path.exists(uri): + raise errors.InvalidURI(uri) + YumAsyncJob.__init__(self, async=async, metadata=metadata) + self.uri = uri + self.update_only = bool(update_only) + self.force = bool(force) + class YumGetRepositoryList(YumJob): #pylint: disable=R0903 """ Job requesing a list of repositories. @@ -288,7 +585,7 @@ class YumGetRepositoryList(YumJob): #pylint: disable=R0903 ", ".join(self.SUPPORTED_KINDS)) self.kind = kind -class YumFilterRepositories(YumGetRepositoryList): +class YumFilterRepositories(YumGetRepositoryList): #pylint: disable=R0903 """ Job similar to YumGetRepositoryList, but allowing to specify filter on packages. -- cgit