diff options
Diffstat (limited to 'src/software/openlmi/software/yumdb/__init__.py')
-rw-r--r-- | src/software/openlmi/software/yumdb/__init__.py | 488 |
1 files changed, 387 insertions, 101 deletions
diff --git a/src/software/openlmi/software/yumdb/__init__.py b/src/software/openlmi/software/yumdb/__init__.py index d52b469..b5160b4 100644 --- a/src/software/openlmi/software/yumdb/__init__.py +++ b/src/software/openlmi/software/yumdb/__init__.py @@ -33,10 +33,12 @@ only accessor to yum api. """ import errno +import inspect import os import re import time -from multiprocessing import Process, JoinableQueue, Queue +from multiprocessing import Process, Queue #pylint: disable=W0404 +from pywbem.cim_provider2 import CIMProvider2 import Queue as TQueue # T as threaded import threading import yum @@ -48,16 +50,30 @@ from openlmi.software.yumdb.packageinfo import PackageInfo from openlmi.software.yumdb.packagecheck import PackageFile from openlmi.software.yumdb.packagecheck import PackageCheck from openlmi.software.yumdb.process import YumWorker -from openlmi.software.util import singletonmixin +from openlmi.software.yumdb.repository import Repository +from openlmi.software.yumdb.util import DispatchingFormatter +from openlmi.software.util import get_signal_name, singletonmixin + +# Maximum time in seconds to wait for a job to accomplish. +# If timeout expires, spawned process is checked (it might +# be possibly killed) and is respawned in case it's dead. +MAX_JOB_WAIT_TIME = 30 # this may be used as an argument to YumWorker YUM_WORKER_DEBUG_LOGGING_CONFIG = { "version" : 1, "formatters": { + # this is a message format for logging function/method calls + # it's manually set up in YumWorker's init method "default": { - "format" : "%(asctime)s %(levelname)s:%(module)s:" - "%(funcName)s:%(lineno)d - %(message)s" - } + "()": DispatchingFormatter, + "formatters" : { + "openlmi.software.yumdb.util.trace_function": + "%(asctime)s %(levelname)s:%(message)s" + }, + "default" : "%(asctime)s %(levelname)s:%(module)s:" + "%(funcName)s:%(lineno)d - %(message)s" + }, }, "handlers": { "file" : { @@ -65,20 +81,128 @@ YUM_WORKER_DEBUG_LOGGING_CONFIG = { "filename" : "/var/tmp/YumWorker.log", "level" : "DEBUG", "formatter": "default", - } + }, }, - "root": { - "level": "DEBUG", - "handlers" : ["file"] - } + "loggers" : { + "root": { + "level": "ERROR", + "handlers" : ["file"] + }, + "openlmi.software.yumdb": { + "level" : "DEBUG", + "handlers" : ["file"], + "propagate" : False, + }, } +} + +# ***************************************************************************** +# Utilities +# ***************************************************************************** +def log_reply_error(job, reply): + """ + Raises an exception in case of error occured in worker process + while processing job. + """ + if isinstance(reply, (int, long)): + # asynchronous job + return + if not isinstance(reply, jobs.YumJob): + raise TypeError('expected instance of jobs.YumJob for reply, not "%s"' % + reply.__class__.__name__) + if reply.result == jobs.YumJob.RESULT_ERROR: + cmpi_logging.logger.error( + "YumDB: %s failed with error %s: %s", + job, reply.result_data[0].__name__, str(reply.result_data[1])) + cmpi_logging.logger.trace_warn( + "YumDB: %s exception traceback:\n%s%s: %s", + job, "".join(reply.result_data[2]), + reply.result_data[0].__name__, str(reply.result_data[1])) + reply.result_data[1].tb_printed = True + raise reply.result_data[1] + elif reply.result == jobs.YumJob.RESULT_TERMINATED: + cmpi_logging.logger.warn('YumDB: %s terminated', job) + else: + cmpi_logging.logger.debug('YumDB: %s completed with success', job) + +def _make_async_job(jobcls, *args, **kwargs): + """Creates asynchronous job, filling it wih some metadata.""" + if not issubclass(jobcls, jobs.YumAsyncJob): + raise TypeError("jobcls must be a subclass of YumAsyncJob") + job = jobcls(*args, **kwargs) + if job.metadata is None: + job.metadata = {} + job.metadata['name'] = \ + type(job).__name__[len('Yum'):] + ('-%d' % job.jobid) + frm = inspect.currentframe() + method_name = None + while ( frm is not None + and ( not 'self' in frm.f_locals + or not isinstance(frm.f_locals['self'], CIMProvider2))): + frm = frm.f_back + if frm is not None: + prov = frm.f_locals['self'] + method_name = frm.f_code.co_name.lower() + if method_name.startswith('cim_method_'): + method_name = method_name[len('cim_method_'):] + if hasattr(prov, 'values'): + lowertocorrectcase = { + k.lower(): k for k in prov.values.__dict__ } + try: + method_name = lowertocorrectcase[method_name] + except KeyError: + pass + if method_name is not None: + job.metadata['method_name'] = method_name + return job + +# ***************************************************************************** +# Decorators +# ***************************************************************************** +def job_request(async=False): + """ + Decorator factory for job entry points. They are YumDB methods. + All of them must return either job objects or jobid for asynchronous calls. + Job objects are processed by this decorator for caller to obtain only the + information he needs. + + It wrapps them with logger wrapper and in case of asynchronous jobs, + it returns just the jobid. + """ + def _decorator(method): + """ + Decorator that just logs the method's call and returns job's result. + """ + logged = cmpi_logging.trace_method(method) + def _new_func(self, *args, **kwargs): + """Wrapper for YumDB's method.""" + return logged(self, *args, **kwargs).result_data + return _new_func + + def _decorator_async(method): + """ + Decorator for methods accepting async argument. In case of async=True, + the method returns jobid. Job's result is returned otherwise. + """ + logged = cmpi_logging.trace_method(method) + def _new_func(self, *args, **kwargs): + """Wrapper for YumDB's method.""" + callargs = inspect.getcallargs(method, self, *args, **kwargs) + result = logged(self, *args, **kwargs) + if callargs.get('async', False): + return result + else: + return result.result_data + return _new_func + + return _decorator_async if async else _decorator class YumDB(singletonmixin.Singleton): """ Context manager for accessing yum/rpm database. All requests are bundled into jobs -- instances of jobs.YumJob and sent to YumWorker for processing. - + YumWorker is a separate process handling all calls to yum api. Communication is done via queues (uplink and downlink). Uplink is used to send jobs to YumWorker and downlink for obtaining @@ -99,19 +223,23 @@ class YumDB(singletonmixin.Singleton): # this is to inform Singleton, that __init__ should be called only once ignoreSubsequent = True - def __init__(self, *args, **kwargs): #pylint: disable=W0231 + @cmpi_logging.trace_method + def __init__(self, **kwargs): #pylint: disable=W0231 """ All arguments are passed to yum.YumBase constructor. """ self._process = None - self._yum_args = (args, kwargs) + if kwargs is None: + kwargs = {} + self._yum_kwargs = kwargs + + self._session_lock = threading.RLock() + self._session_level = 0 - # used to access _replies dictionary and _expected list + # used to guard access to _expected list and _process self._reply_lock = threading.Lock() # used to wait for job to be processed and received self._reply_cond = threading.Condition(self._reply_lock) - # { job_id : reply, ... } - self._replies = {} # ids of all expected jobs -- those to be processed by YumWorker self._expected = [] cmpi_logging.logger.trace_info('YumDB: initialized') @@ -120,49 +248,114 @@ class YumDB(singletonmixin.Singleton): # Private methods # ************************************************************************* @cmpi_logging.trace_method - def _wait_for_reply(self, job): + def _handle_reply_timeout(self, job): """ - Blocks until job is processed by YumWorker and received. + This is called when timeout occurs while waiting on downlink queue for + reply. Delay can be caused by worker process's early termination (bug). + This handler tries to recover from such an situation. + """ + if not self._worker.is_alive(): + if self._worker.exitcode < 0: + cmpi_logging.logger.error("[jobid=%d] worker" + " process(pid=%d) killed by signal %s", job.jobid, + self._worker.pid, get_signal_name(-self._process.exitcode)) + else: + cmpi_logging.logger.error("[jobid=%d] worker" + " process(pid=%d) is dead - exit code: %d", + job.jobid, self._process.pid, self._worker.exitcode) + with self._reply_lock: + self._process = None + cmpi_logging.logger.error( + "[jobid=%d] starting new worker process", job.jobid) + self._expected = [] + if not isinstance(job, jobs.YumBeginSession): + with self._session_lock: + if self._session_level > 0: + cmpi_logging.logger.info('restoring session ' + 'level=%d', self._session_level) + new_session_job = jobs.YumBeginSession() + self._worker.uplink.put(new_session_job) + reply = self._worker.downlink.get() + log_reply_error(new_session_job, reply) + self._worker.uplink.put(job) + self._expected.append(job.jobid) + # other waiting processes need to resend their requests + self._reply_cond.notifyAll() + else: + cmpi_logging.logger.info("[jobid=%d] process is running," + " waiting some more", job.jobid) - Only one thread can block on downlink channel to obtain reply. If - it's reply for him, he takes it and leaves, otherwise he adds it to - _replies dictionary and notifies other threads. This thread is the - one, whose job appears as first in _expected list. + @cmpi_logging.trace_method + def _receive_reply(self, job): + """ + Block on downlink queue to receive expected replies from worker + process. Only one thread can be executing this code at any time. + + In case, that worker process terminated due to some error. Restart it + and resend all the job requests again. + """ + while True: + cmpi_logging.logger.debug("[jobid=%d] blocking on downlink queue", + job.jobid) + try: + jobout = self._worker.downlink.get( + block=True, timeout=MAX_JOB_WAIT_TIME) + if jobout.jobid == job.jobid: + with self._reply_lock: + cmpi_logging.logger.debug( + "[jobid=%d] received desired reply", job.jobid) + self._expected.remove(job.jobid) + if len(self._expected): + self._reply_cond.notify() + return jobout + else: + # this should not happen + cmpi_logging.logger.error("[jobid=%d] received reply" + " for another thread (jobid=%d)", + job.jobid, jobout.jobid) + except TQueue.Empty: + cmpi_logging.logger.warn("[jobid=%d] wait for job reply timeout" + "(%d seconds) occured", job.jobid, MAX_JOB_WAIT_TIME) + self._handle_reply_timeout(job) + + @cmpi_logging.trace_method + def _send_and_receive(self, job): + """ + Sends a request to server and blocks until job is processed by + YumWorker and reply is received. + + Only one thread can block on downlink channel to obtain reply. This + thread is the one, whose job appears as first in _expected list. Server + processes input jobs sequentially. That's why it's safe to presume, + that jobs are received in the same order as they were send. Thanks to + that we don't have to care about receiving replies for the other + waiting threads. @return result of job """ with self._reply_lock: - # until our job is not at the head of + self._worker.uplink.put(job) + if getattr(job, 'async', False) is True: + return job.jobid self._expected.append(job.jobid) - while job.jobid != self._expected[0]: - if job.jobid in self._replies: - self._expected.remove(job.jobid) - return self._replies.pop(job.jobid) - else: + while True: + if job.jobid not in self._expected: + # process terminated, resending job + cmpi_logging.logger.warn("[jobid=%d] job removed" + " from expected list, sending request again", job.jobid) + self._worker.uplink.put(job) + self._expected.append(job.jobid) + elif job.jobid == self._expected[0]: + # now it's our turn to block on downlink + break + else: # another thread blocks on downlink -> let's sleep cmpi_logging.logger.debug( - "[jobid=%d] another %s threads expecting reply," + "[jobid=%d] another %d threads expecting reply," " suspending...", job.jobid, len(self._expected) - 1) self._reply_cond.wait() cmpi_logging.logger.debug( "[jobid=%d] received reply, waking up", job.jobid) - while True: - cmpi_logging.logger.debug("[jobid=%d] blocking on downlink queue", - job.jobid) - jobid, reply = self._worker.downlink.get() - with self._reply_lock: - if jobid != job.jobid: - cmpi_logging.logger.debug("[jobid=%d] received reply" - " for another thread (jobid=%d)", job.jobid, jobid) - self._replies[jobid] = reply - self._reply_cond.notifyAll() - else: - cmpi_logging.logger.debug( - "[jobid=%d] received desired reply", job.jobid) - self._expected.remove(job.jobid) - if len(self._expected): - self._reply_cond.notify() - break - return reply + return self._receive_reply(job) def _do_job(self, job): """ @@ -172,23 +365,10 @@ class YumDB(singletonmixin.Singleton): (exception_type, exception_value, formated_traceback_as_string) @return reply """ - cmpi_logging.logger.trace_verbose("YumDB: doing %s(id=%s) job", - job.__class__.__name__, job.jobid) - self._worker.uplink.put(job) - reply = self._wait_for_reply(job) - if isinstance(reply, tuple): - cmpi_logging.logger.error( - "YumDB: job %s(id=%s) failed with error %s: %s", - job.__class__.__name__, job.jobid, - reply[0].__name__, str(reply[1])) - cmpi_logging.logger.trace_warn( - "YumDB: job %s(id=%s) exception traceback:\n%s%s: %s", - job.__class__.__name__, job.jobid, "".join(reply[2]), - reply[0].__name__, str(reply[1])) - reply[1].tb_printed = True - raise reply[1] - cmpi_logging.logger.trace_verbose("YumDB: job %s(id=%s) done", - job.__class__.__name__, job.jobid) + cmpi_logging.logger.trace_verbose("YumDB: doing %s", job) + reply = self._send_and_receive(job) + log_reply_error(job, reply) + cmpi_logging.logger.trace_verbose("YumDB: job %s done", job.jobid) return reply @property @@ -198,12 +378,14 @@ class YumDB(singletonmixin.Singleton): """ if self._process is None: cmpi_logging.logger.trace_info("YumDB: starting YumWorker") - uplink = JoinableQueue() + uplink = Queue() downlink = Queue() self._process = YumWorker(uplink, downlink, - yum_args=self._yum_args[0], yum_kwargs=self._yum_args[1]) + yum_kwargs=self._yum_kwargs) #logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG) self._process.start() + cmpi_logging.logger.trace_info( + "YumDB: YumWorker started with pid=%s", self._process.pid) return self._process # ************************************************************************* @@ -218,14 +400,24 @@ class YumDB(singletonmixin.Singleton): @cmpi_logging.trace_method def __enter__(self): - self._do_job(jobs.YumBeginSession()) - cmpi_logging.logger.trace_info('YumDB: new session started') - return self + with self._session_lock: + if self._session_level == 0: + self._do_job(jobs.YumBeginSession()) + cmpi_logging.logger.trace_info('YumDB: new session started') + self._session_level += 1 + cmpi_logging.logger.trace_info('YumDB: nested to session level=%d', + self._session_level) + return self @cmpi_logging.trace_method def __exit__(self, exc_type, exc_value, traceback): - self._do_job(jobs.YumEndSession()) - cmpi_logging.logger.trace_info('YumDB: session ended') + with self._session_lock: + if self._session_level == 1: + self._do_job(jobs.YumEndSession()) + cmpi_logging.logger.trace_info('YumDB: session ended') + cmpi_logging.logger.trace_info('YumDB: emerged from session' + ' level=%d', self._session_level) + self._session_level = max(self._session_level - 1, 0) # ************************************************************************* # Public methods @@ -235,32 +427,42 @@ class YumDB(singletonmixin.Singleton): """ Shut down the YumWorker process. """ - cmpi_logging.logger.info('YumDB: cleanup called') - if self._process: - cmpi_logging.logger.info('YumDB: terminating YumWorker') - self._process.uplink.put(None) # terminating command - self._process.uplink.join() - self._process.join() - cmpi_logging.logger.info('YumDB: YumWorker terminated') - self._process = None + with self._reply_lock: + if self._process is not None: + cmpi_logging.logger.info('YumDB: terminating YumWorker') + self._process.uplink.put(None) # terminating command + self._process.join() + cmpi_logging.logger.info('YumDB: YumWorker terminated') + self._process = None + else: + cmpi_logging.logger.warn("YunDB: clean_up called, when process" + " not initialized!") - @cmpi_logging.trace_method + # ************************************************************************* + # Jobs with simple results + # ************************************************************************* + @job_request() def get_package_list(self, kind, allow_duplicates=False, - sort=False): + sort=False, + include_repos=None, + exclude_repos=None): """ - @param kind is one of: {"installed", "available", "all"} + @param kind is one of: jobs.YumGetPackageList.SUPPORTED_KINDS @param allow_duplicates says, whether to list all found versions of single package @return [pkg1, pkg2, ...], pkgi is instance of yumdb.PackageInfo """ return self._do_job(jobs.YumGetPackageList( - kind, allow_duplicates=allow_duplicates, sort=sort)) + kind, allow_duplicates=allow_duplicates, sort=sort, + include_repos=include_repos, exclude_repos=exclude_repos)) - @cmpi_logging.trace_method + @job_request() def filter_packages(self, kind, allow_duplicates=False, sort=False, + include_repos=None, + exclude_repos=None, **filters): """ Similar to get_package_list(), but applies filter on packages. @@ -268,51 +470,135 @@ class YumDB(singletonmixin.Singleton): """ return self._do_job(jobs.YumFilterPackages( kind, allow_duplicates=allow_duplicates, sort=sort, + include_repos=include_repos, exclude_repos=exclude_repos, **filters)) - @cmpi_logging.trace_method - def install_package(self, pkg): + @job_request() + def get_repository_list(self, kind): + """ + @param kind is one of: jobs.YumGetRepositoryList.SUPPORTED_KINDS + @param allow_duplicates says, whether to list all found versions + of single package + @return [pkg1, pkg2, ...], pkgi is instance of yumdb.Repository + """ + return self._do_job(jobs.YumGetRepositoryList(kind)) + + @job_request() + def filter_repositories(self, kind, **filters): + """ + Similar to get_repository_list(), but applies filter on packages. + @see yumdb.jobs.YumFilterRepositories job for supported filter keys + """ + return self._do_job(jobs.YumFilterRepositories(kind, **filters)) + + @job_request() + def set_repository_enabled(self, repoid, enable): + """ + Enable or disable repository. + @param enable is a boolean + """ + return self._do_job(jobs.YumSetRepositoryEnabled(repoid, enable)) + + # ************************************************************************* + # Asynchronous jobs + # ************************************************************************* + @job_request(async=True) + def install_package(self, pkg, async=False, force=False): """ Install package. @param pkg is an instance of PackageInfo obtained with - get_package_list() or filter_packages(), which must be not installed + get_package_list() or filter_packages() or a valid nevra as string. + Package must not be installed if force is False. """ - return self._do_job(jobs.YumInstallPackage(pkg)) + return self._do_job(_make_async_job(jobs.YumInstallPackage, + pkg, force=force, async=async)) - @cmpi_logging.trace_method - def remove_package(self, pkg): + @job_request(async=True) + def remove_package(self, pkg, async=False): """ @param pkg is an instance of PackageInfo obtained with get_package_list() or filter_packages(), which must be installed """ - return self._do_job(jobs.YumRemovePackage(pkg)) + return self._do_job(_make_async_job(jobs.YumRemovePackage, + pkg, async=async)) - @cmpi_logging.trace_method - def update_to_package(self, desired_pkg): + @job_request(async=True) + def update_to_package(self, desired_pkg, async=False): """ @param desired_pkg is an instance of PackageInfo, which must be available """ - return self._do_job(jobs.YumUpdateToPackage(desired_pkg)) + return self._do_job(_make_async_job(jobs.YumUpdateToPackage, + desired_pkg, async=async)) - @cmpi_logging.trace_method + @job_request(async=True) def update_package(self, pkg, + async=False, to_epoch=None, to_version=None, - to_release=None): + to_release=None, + force=False): """ @param pkg is an instance of PackageInfo, which must be installed The other parameters filter candidate available packages for update. """ - return self._do_job(jobs.YumUpdatePackage( - pkg, to_epoch, to_version, to_release)) + return self._do_job(_make_async_job(jobs.YumUpdatePackage, + pkg, async, to_epoch, to_version, to_release, force=force)) - @cmpi_logging.trace_method - def check_package(self, pkg): + @job_request(async=True) + def check_package(self, pkg, async=False): """ @param pkg is an instance of PackageInfo representing installed package @return instance of yumdb.PackageCheck """ - return self._do_job(jobs.YumCheckPackage(pkg)) + return self._do_job(_make_async_job(jobs.YumCheckPackage, + pkg, async=async)) + + @job_request(async=True) + def install_package_from_uri(self, uri, + async=False, update_only=False, force=False): + """ + Install package from uri. + @param uri is either remote url or local path. + """ + return self._do_job(jobs.YumInstallPackageFromURI( + uri, async, update_only, force=force)) + + # ************************************************************************* + # Control of asynchronous jobs + # ************************************************************************* + @job_request() + def get_job(self, jobid): + return self._do_job(jobs.YumJobGet(jobid)) + + @job_request() + def get_job_list(self): + return self._do_job(jobs.YumJobGetList()) + + @job_request() + def get_job_by_name(self, name): + return self._do_job(jobs.YumJobGetByName(name)) + + @job_request() + def set_job_priority(self, jobid, priority): + return self._do_job(jobs.YumJobSetPriority(jobid, priority)) + + @job_request() + def update_job(self, jobid, **kwargs): + return self._do_job(jobs.YumJobUpdate(jobid, **kwargs)) + + @job_request() + def reschedule_job(self, jobid, + delete_on_completion, time_before_removal): + return self._do_job(jobs.YumJobReschedule(jobid, + delete_on_completion, time_before_removal)) + + @job_request() + def delete_job(self, jobid): + return self._do_job(jobs.YumJobDelete(jobid)) + + @job_request() + def terminate_job(self, jobid): + return self._do_job(jobs.YumJobTerminate(jobid)) |