diff options
Diffstat (limited to 'src/software/openlmi/software/yumdb/__init__.py')
-rw-r--r-- | src/software/openlmi/software/yumdb/__init__.py | 311 |
1 files changed, 237 insertions, 74 deletions
diff --git a/src/software/openlmi/software/yumdb/__init__.py b/src/software/openlmi/software/yumdb/__init__.py index 512602a..c9fc0c5 100644 --- a/src/software/openlmi/software/yumdb/__init__.py +++ b/src/software/openlmi/software/yumdb/__init__.py @@ -33,10 +33,12 @@ only accessor to yum api. """ import errno +import inspect import os import re import time -from multiprocessing import Process, JoinableQueue, Queue #pylint: disable=W0404 +from multiprocessing import Process, Queue #pylint: disable=W0404 +from pywbem.cim_provider2 import CIMProvider2 import Queue as TQueue # T as threaded import threading import yum @@ -49,21 +51,29 @@ from openlmi.software.yumdb.packagecheck import PackageFile from openlmi.software.yumdb.packagecheck import PackageCheck from openlmi.software.yumdb.process import YumWorker from openlmi.software.yumdb.repository import Repository +from openlmi.software.yumdb.util import DispatchingFormatter from openlmi.software.util import get_signal_name, singletonmixin # Maximum time in seconds to wait for a job to accomplish. # If timeout expires, spawned process is checked (it might # be possibly killed) and is respawned in case it's dead. -MAX_JOB_WAIT_TIME = 120 +MAX_JOB_WAIT_TIME = 30 # this may be used as an argument to YumWorker YUM_WORKER_DEBUG_LOGGING_CONFIG = { "version" : 1, "formatters": { + # this is a message format for logging function/method calls + # it's manually set up in YumWorker's init method "default": { - "format" : "%(asctime)s %(levelname)s:%(module)s:" - "%(funcName)s:%(lineno)d - %(message)s" - } + "()": DispatchingFormatter, + "formatters" : { + "openlmi.software.yumdb.util.trace_function": + "%(asctime)s %(levelname)s:%(message)s" + }, + "default" : "%(asctime)s %(levelname)s:%(module)s:" + "%(funcName)s:%(lineno)d - %(message)s" + }, }, "handlers": { "file" : { @@ -71,30 +81,121 @@ YUM_WORKER_DEBUG_LOGGING_CONFIG = { "filename" : "/var/tmp/YumWorker.log", "level" : "DEBUG", "formatter": "default", - } + }, }, - "root": { - "level": "DEBUG", - "handlers" : ["file"] - } + "loggers" : { + "root": { + "level": "ERROR", + "handlers" : ["file"] + }, + "openlmi.software.yumdb": { + "level" : "DEBUG", + "handlers" : ["file"], + "propagate" : False, + }, } +} +# ***************************************************************************** +# Utilities +# ***************************************************************************** def log_reply_error(job, reply): """ Raises an exception in case of error occured in worker process while processing job. """ - if isinstance(reply, tuple): + if isinstance(reply, (int, long)): + # asynchronous job + return + if not isinstance(reply, jobs.YumJob): + raise TypeError('expected instance of jobs.YumJob for reply, not "%s"' % + reply.__class__.__name__) + if reply.result == jobs.YumJob.RESULT_ERROR: cmpi_logging.logger.error( - "YumDB: job %s(id=%s) failed with error %s: %s", - job.__class__.__name__, job.jobid, - reply[0].__name__, str(reply[1])) + "YumDB: %s failed with error %s: %s", + job, reply.result_data[0].__name__, str(reply.result_data[1])) cmpi_logging.logger.trace_warn( - "YumDB: job %s(id=%s) exception traceback:\n%s%s: %s", - job.__class__.__name__, job.jobid, "".join(reply[2]), - reply[0].__name__, str(reply[1])) - reply[1].tb_printed = True - raise reply[1] + "YumDB: %s exception traceback:\n%s%s: %s", + job, "".join(reply.result_data[2]), + reply.result_data[0].__name__, str(reply.result_data[1])) + reply.result_data[1].tb_printed = True + raise reply.result_data[1] + elif reply.result == jobs.YumJob.RESULT_TERMINATED: + cmpi_logging.logger.warn('YumDB: %s terminated', job) + else: + cmpi_logging.logger.debug('YumDB: %s completed with success', job) + +def _make_async_job(jobcls, *args, **kwargs): + """Creates asynchronous job, filling it wih some metadata.""" + if not issubclass(jobcls, jobs.YumAsyncJob): + raise TypeError("jobcls must be a subclass of YumAsyncJob") + job = jobcls(*args, **kwargs) + if job.metadata is None: + job.metadata = {} + job.metadata['name'] = \ + type(job).__name__[len('Yum'):] + ('-%d' % job.jobid) + frm = inspect.currentframe() + method_name = None + while ( frm is not None + and ( not 'self' in frm.f_locals + or not isinstance(frm.f_locals['self'], CIMProvider2))): + frm = frm.f_back + if frm is not None: + prov = frm.f_locals['self'] + method_name = frm.f_code.co_name.lower() + if method_name.startswith('cim_method_'): + method_name = method_name[len('cim_method_'):] + if hasattr(prov, 'values'): + lowertocorrectcase = { + k.lower(): k for k in prov.values.__dict__ } + try: + method_name = lowertocorrectcase[method_name] + except KeyError: + pass + if method_name is not None: + job.metadata['method_name'] = method_name + return job + +# ***************************************************************************** +# Decorators +# ***************************************************************************** +def job_request(async=False): + """ + Decorator factory for job entry points. They are YumDB methods. + All of them must return either job objects or jobid for asynchronous calls. + Job objects are processed by this decorator for caller to obtain only the + information he needs. + + It wrapps them with logger wrapper and in case of asynchronous jobs, + it returns just the jobid. + """ + def _decorator(method): + """ + Decorator that just logs the method's call and returns job's result. + """ + logged = cmpi_logging.trace_method(method) + def _new_func(self, *args, **kwargs): + """Wrapper for YumDB's method.""" + return logged(self, *args, **kwargs).result_data + return _new_func + + def _decorator_async(method): + """ + Decorator for methods accepting async argument. In case of async=True, + the method returns jobid. Job's result is returned otherwise. + """ + logged = cmpi_logging.trace_method(method) + def _new_func(self, *args, **kwargs): + """Wrapper for YumDB's method.""" + callargs = inspect.getcallargs(method, self, *args, **kwargs) + result = logged(self, *args, **kwargs) + if callargs.get('async', False): + return result + else: + return result.result_data + return _new_func + + return _decorator_async if async else _decorator class YumDB(singletonmixin.Singleton): """ @@ -122,12 +223,15 @@ class YumDB(singletonmixin.Singleton): # this is to inform Singleton, that __init__ should be called only once ignoreSubsequent = True - def __init__(self, *args, **kwargs): #pylint: disable=W0231 + @cmpi_logging.trace_method + def __init__(self, **kwargs): #pylint: disable=W0231 """ All arguments are passed to yum.YumBase constructor. """ self._process = None - self._yum_args = (args, kwargs) + if kwargs is None: + kwargs = {} + self._yum_kwargs = kwargs self._session_lock = threading.RLock() self._session_level = 0 @@ -194,21 +298,21 @@ class YumDB(singletonmixin.Singleton): cmpi_logging.logger.debug("[jobid=%d] blocking on downlink queue", job.jobid) try: - jobid, reply = self._worker.downlink.get( + jobout = self._worker.downlink.get( block=True, timeout=MAX_JOB_WAIT_TIME) - if jobid == job.jobid: + if jobout.jobid == job.jobid: with self._reply_lock: cmpi_logging.logger.debug( "[jobid=%d] received desired reply", job.jobid) self._expected.remove(job.jobid) if len(self._expected): self._reply_cond.notify() - return reply + return jobout else: # this should not happen cmpi_logging.logger.error("[jobid=%d] received reply" " for another thread (jobid=%d)", - job.jobid, jobid) + job.jobid, jobout.jobid) except TQueue.Empty: cmpi_logging.logger.warn("[jobid=%d] wait for job reply timeout" "(%d seconds) occured", job.jobid, MAX_JOB_WAIT_TIME) @@ -231,6 +335,8 @@ class YumDB(singletonmixin.Singleton): """ with self._reply_lock: self._worker.uplink.put(job) + if getattr(job, 'async', False) is True: + return job.jobid self._expected.append(job.jobid) while True: if job.jobid not in self._expected: @@ -259,12 +365,10 @@ class YumDB(singletonmixin.Singleton): (exception_type, exception_value, formated_traceback_as_string) @return reply """ - cmpi_logging.logger.trace_verbose("YumDB: doing %s(id=%s) job", - job.__class__.__name__, job.jobid) + cmpi_logging.logger.trace_verbose("YumDB: doing %s", job) reply = self._send_and_receive(job) log_reply_error(job, reply) - cmpi_logging.logger.trace_verbose("YumDB: job %s(id=%s) done", - job.__class__.__name__, job.jobid) + cmpi_logging.logger.trace_verbose("YumDB: job %s done", job.jobid) return reply @property @@ -274,11 +378,11 @@ class YumDB(singletonmixin.Singleton): """ if self._process is None: cmpi_logging.logger.trace_info("YumDB: starting YumWorker") - uplink = JoinableQueue() + uplink = Queue() downlink = Queue() self._process = YumWorker(uplink, downlink, - yum_args=self._yum_args[0], yum_kwargs=self._yum_args[1]) - #logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG) + yum_kwargs=self._yum_kwargs, + logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG) self._process.start() cmpi_logging.logger.trace_info( "YumDB: YumWorker started with pid=%s", self._process.pid) @@ -327,7 +431,6 @@ class YumDB(singletonmixin.Singleton): if self._process is not None: cmpi_logging.logger.info('YumDB: terminating YumWorker') self._process.uplink.put(None) # terminating command - self._process.uplink.join() self._process.join() cmpi_logging.logger.info('YumDB: YumWorker terminated') self._process = None @@ -335,7 +438,10 @@ class YumDB(singletonmixin.Singleton): cmpi_logging.logger.warn("YunDB: clean_up called, when process" " not initialized!") - @cmpi_logging.trace_method + # ************************************************************************* + # Jobs with simple results + # ************************************************************************* + @job_request() def get_package_list(self, kind, allow_duplicates=False, sort=False, @@ -351,7 +457,7 @@ class YumDB(singletonmixin.Singleton): kind, allow_duplicates=allow_duplicates, sort=sort, include_repos=include_repos, exclude_repos=exclude_repos)) - @cmpi_logging.trace_method + @job_request() def filter_packages(self, kind, allow_duplicates=False, sort=False, @@ -367,75 +473,132 @@ class YumDB(singletonmixin.Singleton): include_repos=include_repos, exclude_repos=exclude_repos, **filters)) - @cmpi_logging.trace_method - def install_package(self, pkg): + @job_request() + def get_repository_list(self, kind): + """ + @param kind is one of: jobs.YumGetRepositoryList.SUPPORTED_KINDS + @param allow_duplicates says, whether to list all found versions + of single package + @return [pkg1, pkg2, ...], pkgi is instance of yumdb.Repository + """ + return self._do_job(jobs.YumGetRepositoryList(kind)) + + @job_request() + def filter_repositories(self, kind, **filters): + """ + Similar to get_repository_list(), but applies filter on packages. + @see yumdb.jobs.YumFilterRepositories job for supported filter keys + """ + return self._do_job(jobs.YumFilterRepositories(kind, **filters)) + + @job_request() + def set_repository_enabled(self, repoid, enable): + """ + Enable or disable repository. + @param enable is a boolean + """ + return self._do_job(jobs.YumSetRepositoryEnabled(repoid, enable)) + + # ************************************************************************* + # Asynchronous jobs + # ************************************************************************* + @job_request(async=True) + def install_package(self, pkg, async=False, force=False): """ Install package. @param pkg is an instance of PackageInfo obtained with - get_package_list() or filter_packages(), which must be not installed + get_package_list() or filter_packages() or a valid nevra as string. + Package must not be installed if force is False. """ - return self._do_job(jobs.YumInstallPackage(pkg)) + return self._do_job(_make_async_job(jobs.YumInstallPackage, + pkg, force=force, async=async)) - @cmpi_logging.trace_method - def remove_package(self, pkg): + @job_request(async=True) + def remove_package(self, pkg, async=False): """ @param pkg is an instance of PackageInfo obtained with get_package_list() or filter_packages(), which must be installed """ - return self._do_job(jobs.YumRemovePackage(pkg)) + return self._do_job(_make_async_job(jobs.YumRemovePackage, + pkg, async=async)) - @cmpi_logging.trace_method - def update_to_package(self, desired_pkg): + @job_request(async=True) + def update_to_package(self, desired_pkg, async=False): """ @param desired_pkg is an instance of PackageInfo, which must be available """ - return self._do_job(jobs.YumUpdateToPackage(desired_pkg)) + return self._do_job(_make_async_job(jobs.YumUpdateToPackage, + desired_pkg, async=async)) - @cmpi_logging.trace_method + @job_request(async=True) def update_package(self, pkg, + async=False, to_epoch=None, to_version=None, - to_release=None): + to_release=None, + force=False): """ @param pkg is an instance of PackageInfo, which must be installed The other parameters filter candidate available packages for update. """ - return self._do_job(jobs.YumUpdatePackage( - pkg, to_epoch, to_version, to_release)) + return self._do_job(_make_async_job(jobs.YumUpdatePackage, + pkg, async, to_epoch, to_version, to_release, force=force)) - @cmpi_logging.trace_method - def check_package(self, pkg): + @job_request(async=True) + def check_package(self, pkg, async=False): """ @param pkg is an instance of PackageInfo representing installed package @return instance of yumdb.PackageCheck """ - return self._do_job(jobs.YumCheckPackage(pkg)) + return self._do_job(_make_async_job(jobs.YumCheckPackage, + pkg, async=async)) - @cmpi_logging.trace_method - def get_repository_list(self, kind): + @job_request(async=True) + def install_package_from_uri(self, uri, + async=False, update_only=False, force=False): """ - @param kind is one of: jobs.YumGetRepositoryList.SUPPORTED_KINDS - @param allow_duplicates says, whether to list all found versions - of single package - @return [pkg1, pkg2, ...], pkgi is instance of yumdb.Repository + Install package from uri. + @param uri is either remote url or local path. """ - return self._do_job(jobs.YumGetRepositoryList(kind)) + return self._do_job(jobs.YumInstallPackageFromURI( + uri, async, update_only, force=force)) - @cmpi_logging.trace_method - def filter_repositories(self, kind, **filters): - """ - Similar to get_repository_list(), but applies filter on packages. - @see yumdb.jobs.YumFilterRepositories job for supported filter keys - """ - return self._do_job(jobs.YumFilterRepositories(kind, **filters)) - - @cmpi_logging.trace_method - def set_repository_enabled(self, repoid, enable): - """ - Enable or disable repository. - @param enable is a boolean - """ - return self._do_job(jobs.YumSetRepositoryEnabled(repoid, enable)) + # ************************************************************************* + # Control of asynchronous jobs + # ************************************************************************* + @job_request() + def get_job(self, jobid): + return self._do_job(jobs.YumJobGet(jobid)) + + @job_request() + def get_job_list(self): + return self._do_job(jobs.YumJobGetList()) + + @job_request() + def get_job_by_name(self, name): + return self._do_job(jobs.YumJobGetByName(name)) + + @job_request() + def set_job_priority(self, jobid, priority): + return self._do_job(jobs.YumJobSetPriority(jobid, priority)) + + @job_request() + def update_job(self, jobid, **kwargs): + return self._do_job(jobs.YumJobUpdate(jobid, **kwargs)) + + @job_request() + def reschedule_job(self, jobid, + delete_on_completion, time_before_removal): + return self._do_job(jobs.YumJobReschedule(jobid, + delete_on_completion, time_before_removal)) + + @job_request() + def delete_job(self, jobid): + return self._do_job(jobs.YumJobDelete(jobid)) + + @job_request() + def terminate_job(self, jobid): + return self._do_job(jobs.YumJobTerminate(jobid)) |