diff options
Diffstat (limited to 'src/software/lmi/software/yumdb/__init__.py')
-rw-r--r-- | src/software/lmi/software/yumdb/__init__.py | 677 |
1 files changed, 677 insertions, 0 deletions
diff --git a/src/software/lmi/software/yumdb/__init__.py b/src/software/lmi/software/yumdb/__init__.py new file mode 100644 index 0000000..656b090 --- /dev/null +++ b/src/software/lmi/software/yumdb/__init__.py @@ -0,0 +1,677 @@ +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Since yum API functions should not be called with different thread_ids +repeatedly in the same program. It's neccessary, to make these calls +in single thread. But the provider needs to be able to clean up itself, +when its not needed. That's why the yum API needs to be accessed from +separated process, that is created and terminated when needed. + +This package contains all the bowels of this separate process together +with its management and communication facilities. + +YumDB is a context manager supposed to be used by any provider as the +only accessor to yum api. +""" + +import errno +import inspect +import os +import re +import time +from multiprocessing import Process, Queue #pylint: disable=W0404 +from pywbem.cim_provider2 import CIMProvider2 +import Queue as TQueue # T as threaded +import threading +import yum + +from lmi.common import cmpi_logging, singletonmixin +from lmi.common.IndicationManager import IndicationManager +from lmi.software.yumdb import jobs +from lmi.software.yumdb import errors +from lmi.software.yumdb.packageinfo import PackageInfo +from lmi.software.yumdb.packagecheck import PackageFile +from lmi.software.yumdb.packagecheck import PackageCheck +from lmi.software.yumdb.process import YumWorker +from lmi.software.yumdb.repository import Repository +from lmi.software.yumdb.util import DispatchingFormatter +from lmi.software.util import get_signal_name + +# Maximum time in seconds to wait for a job to accomplish. +# If timeout expires, spawned process is checked (it might +# be possibly killed) and is respawned in case it's dead. +MAX_JOB_WAIT_TIME = 30 + +# this may be used as an argument to YumWorker to setup logging +YUM_WORKER_DEBUG_LOGGING_CONFIG = { + "version" : 1, + 'disable_existing_loggers' : True, + "formatters": { + # this is a message format for logging function/method calls + # it's manually set up in YumWorker's init method + "default": { + "()": DispatchingFormatter, + "formatters" : { + "lmi.software.yumdb.util.trace_function": + "%(asctime)s %(levelname)s:%(message)s" + }, + "default" : "%(asctime)s %(levelname)s:%(module)s:" + "%(funcName)s:%(lineno)d - %(message)s" + }, + }, + "handlers": { + "file" : { + "class" : "logging.handlers.RotatingFileHandler", + "filename" : "/var/tmp/YumWorker.log", + "level" : "DEBUG", + "formatter": "default", + }, + }, + "loggers" : { + "root": { + "level": "DEBUG", + "handlers" : ["file"] + }, + "lmi.software.yumdb": { + "level" : "DEBUG", + "handlers" : ["file"], + "propagate" : False, + }, + } +} + +# ***************************************************************************** +# Utilities +# ***************************************************************************** +def log_reply_error(job, reply): + """ + Raises an exception in case of error occured in worker process + while processing job. + """ + if isinstance(reply, (int, long)): + # asynchronous job + return + if not isinstance(reply, jobs.YumJob): + raise TypeError('expected instance of jobs.YumJob for reply, not "%s"' % + reply.__class__.__name__) + if reply.result == jobs.YumJob.RESULT_ERROR: + cmpi_logging.logger.error( + "YumDB: %s failed with error %s: %s", + job, reply.result_data[0].__name__, str(reply.result_data[1])) + cmpi_logging.logger.trace_warn( + "YumDB: %s exception traceback:\n%s%s: %s", + job, "".join(reply.result_data[2]), + reply.result_data[0].__name__, str(reply.result_data[1])) + reply.result_data[1].tb_printed = True + raise reply.result_data[1] + elif reply.result == jobs.YumJob.RESULT_TERMINATED: + cmpi_logging.logger.warn('YumDB: %s terminated', job) + else: + cmpi_logging.logger.debug('YumDB: %s completed with success', job) + +def _make_async_job(jobcls, *args, **kwargs): + """Creates asynchronous job, filling it wih some metadata.""" + if not issubclass(jobcls, jobs.YumAsyncJob): + raise TypeError("jobcls must be a subclass of YumAsyncJob") + job = jobcls(*args, **kwargs) + if job.metadata is None: + job.metadata = {} + job.metadata['name'] = \ + type(job).__name__[len('Yum'):] + ('-%d' % job.jobid) + return job + +# ***************************************************************************** +# Decorators +# ***************************************************************************** +def job_request(async=False): + """ + Decorator factory for job entry points. They are YumDB methods. + All of them must return either job objects or jobid for asynchronous calls. + Job objects are processed by this decorator for caller to obtain only the + information he needs. + + It wrapps them with logger wrapper and in case of asynchronous jobs, + it returns just the jobid. + """ + def _decorator(method): + """ + Decorator that just logs the method's call and returns job's result. + """ + logged = cmpi_logging.trace_method(method) + def _new_func(self, *args, **kwargs): + """Wrapper for YumDB's method.""" + return logged(self, *args, **kwargs).result_data + return _new_func + + def _decorator_async(method): + """ + Decorator for methods accepting async argument. In case of async=True, + the method returns jobid. Job's result is returned otherwise. + """ + logged = cmpi_logging.trace_method(method) + def _new_func(self, *args, **kwargs): + """Wrapper for YumDB's method.""" + callargs = inspect.getcallargs(method, self, *args, **kwargs) + result = logged(self, *args, **kwargs) + if callargs.get('async', False): + return result + else: + return result.result_data + return _new_func + + return _decorator_async if async else _decorator + +class YumDB(singletonmixin.Singleton): + """ + Context manager for accessing yum/rpm database. + All requests are bundled into jobs -- instances of jobs.YumJob and + sent to YumWorker for processing. + + YumWorker is a separate process handling all calls to yum api. + Communication is done via queues (uplink and downlink). + Uplink is used to send jobs to YumWorker and downlink for obtaining + results. + + This is implemented in thread safe manner. + + It should be used as a context manager in case, we want to process + multiple jobs in single transaction. The example of usage: + with YumDB.getInstance() as ydb: + pkgs = ydb.filter_packages(...) + for pkg in pkgs: + ydb.install_package(pkg) + ... + Yum database stays locked in whole block of code under with statement. + """ + + # this is to inform Singleton, that __init__ should be called only once + ignoreSubsequent = True + + # This serves to all code base as a global variable used to check, + # whether YumDB instance is running under cimom broker or under worker + # process. This is important for code used in callback functions passed + # to worker responsible for creating instances of ConcreteJob. This code + # must avoid using calls to YumDB while running under worker. This + # + # Worker process must set this to False before starting its event handling + # loop. + RUNNING_UNDER_CIMOM_PROCESS = True + + @cmpi_logging.trace_method + def __init__(self, **kwargs): #pylint: disable=W0231 + """ + All arguments are passed to yum.YumBase constructor. + """ + self._process = None + if kwargs is None: + kwargs = {} + self._yum_kwargs = kwargs + + self._session_lock = threading.RLock() + self._session_level = 0 + + # used to guard access to _expected list and _process + self._reply_lock = threading.Lock() + # used to wait for job to be processed and received + self._reply_cond = threading.Condition(self._reply_lock) + # ids of all expected jobs -- those to be processed by YumWorker + self._expected = [] + # {job_id : reply, ... } + self._replies = {} + cmpi_logging.logger.trace_info('YumDB: initialized') + + # ************************************************************************* + # Private methods + # ************************************************************************* + @cmpi_logging.trace_method + def _handle_reply_timeout(self, job): + """ + This is called when timeout occurs while waiting on downlink queue for + reply. Delay can be caused by worker process's early termination (bug). + This handler tries to recover from such an situation. + """ + if not self._worker.is_alive(): + if self._worker.exitcode < 0: + cmpi_logging.logger.error("[jobid=%d] worker" + " process(pid=%d) killed by signal %s", job.jobid, + self._worker.pid, get_signal_name(-self._process.exitcode)) + else: + cmpi_logging.logger.error("[jobid=%d] worker" + " process(pid=%d) is dead - exit code: %d", + job.jobid, self._process.pid, self._worker.exitcode) + with self._reply_lock: + self._process = None + cmpi_logging.logger.error( + "[jobid=%d] starting new worker process", job.jobid) + self._expected = [] + if not isinstance(job, jobs.YumBeginSession): + with self._session_lock: + if self._session_level > 0: + cmpi_logging.logger.info('restoring session ' + 'level=%d', self._session_level) + new_session_job = jobs.YumBeginSession() + self._worker.uplink.put(new_session_job) + reply = self._worker.downlink.get() + log_reply_error(new_session_job, reply) + self._worker.uplink.put(job) + self._expected.append(job.jobid) + # other waiting processes need to resend their requests + self._reply_cond.notifyAll() + else: + cmpi_logging.logger.info("[jobid=%d] process is running," + " waiting some more", job.jobid) + + @cmpi_logging.trace_method + def _receive_reply(self, job): + """ + Block on downlink queue to receive expected replies from worker + process. Only one thread can be executing this code at any time. + + Only one thread can block on downlink channel to obtain reply. If + it's reply for him, he takes it and leaves, otherwise he adds it to + _replies dictionary and notifies other threads. This thread is the + one, whose job appears as first in _expected list. + + In case, that worker process terminated due to some error. Restart it + and resend all the job requests again. + """ + while True: + cmpi_logging.logger.debug("[jobid=%d] blocking on downlink queue", + job.jobid) + try: + jobout = self._worker.downlink.get( + block=True, timeout=MAX_JOB_WAIT_TIME) + if jobout.jobid == job.jobid: + cmpi_logging.logger.debug( + "[jobid=%d] received desired reply", job.jobid) + with self._reply_lock: + self._expected.remove(job.jobid) + if len(self._expected): + self._reply_cond.notify() + return jobout + else: + cmpi_logging.logger.info("[jobid=%d] received reply" + " for another thread (jobid=%d)", + job.jobid, jobout.jobid) + with self._reply_lock: + self._replies[jobout.jobid] = jobout + self._reply_cond.notifyAll() + except TQueue.Empty: + cmpi_logging.logger.warn("[jobid=%d] wait for job reply timeout" + "(%d seconds) occured", job.jobid, MAX_JOB_WAIT_TIME) + self._handle_reply_timeout(job) + + @cmpi_logging.trace_method + def _send_and_receive(self, job): + """ + Sends a request to server and blocks until job is processed by + YumWorker and reply is received. + + Only one thread can block on downlink channel to obtain reply. This + thread is the one, whose job appears as first in _expected list. Server + processes input jobs sequentially. That's why it's safe to presume, + that jobs are received in the same order as they were send. Thanks to + that we don't have to care about receiving replies for the other + waiting threads. + + @return result of job + """ + with self._reply_lock: + self._worker.uplink.put(job) + if getattr(job, 'async', False) is True: + return job.jobid + self._expected.append(job.jobid) + while True: + if job.jobid in self._replies: + cmpi_logging.logger.debug( + "[jobid=%d] desired reply already received", + job.jobid) + try: + self._expected.remove(job.jobid) + except ValueError: + cmpi_logging.logger.warn( + "[jobid=%d] reply not in expected list", + job.jobid) + return self._replies.pop(job.jobid) + elif job.jobid not in self._expected: + # process terminated, resending job + cmpi_logging.logger.warn("[jobid=%d] job removed" + " from expected list, sending request again", job.jobid) + self._worker.uplink.put(job) + self._expected.append(job.jobid) + elif job.jobid == self._expected[0]: + # now it's our turn to block on downlink + break + else: # another thread blocks on downlink -> let's sleep + cmpi_logging.logger.debug( + "[jobid=%d] another %d threads expecting reply," + " suspending...", job.jobid, len(self._expected) - 1) + self._reply_cond.wait() + cmpi_logging.logger.debug( + "[jobid=%d] received reply, waking up", job.jobid) + return self._receive_reply(job) + + def _do_job(self, job): + """ + Sends the job to YumWorker process and waits for reply. + If reply is a tuple, there was an error, while job processing. + Incoming exception is in format: + (exception_type, exception_value, formated_traceback_as_string) + @return reply + """ + cmpi_logging.logger.trace_verbose("YumDB: doing %s", job) + reply = self._send_and_receive(job) + log_reply_error(job, reply) + cmpi_logging.logger.trace_verbose("YumDB: job %s done", job.jobid) + return reply + + @property + def _worker(self): + """ + YumWorker process accessor. It's created upon first need. + """ + if self._process is None: + cmpi_logging.logger.trace_info("YumDB: starting YumWorker") + uplink = Queue() + downlink = Queue() + self._process = YumWorker(uplink, downlink, + indication_manager=IndicationManager.get_instance(), + yum_kwargs=self._yum_kwargs) + #logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG) + self._process.start() + cmpi_logging.logger.trace_info( + "YumDB: YumWorker started with pid=%s", self._process.pid) + return self._process + + # ************************************************************************* + # Special methods + # ************************************************************************* + def __del__(self): + """ + Ensure, that YumWorker process is correctly shutted down. + """ + self.clean_up() + singletonmixing.Singleton.__del__(self) + + @cmpi_logging.trace_method + def __enter__(self): + with self._session_lock: + if self._session_level == 0: + self._do_job(jobs.YumBeginSession()) + cmpi_logging.logger.trace_info('YumDB: new session started') + self._session_level += 1 + cmpi_logging.logger.trace_info('YumDB: nested to session level=%d', + self._session_level) + return self + + @cmpi_logging.trace_method + def __exit__(self, exc_type, exc_value, traceback): + with self._session_lock: + if self._session_level == 1: + self._do_job(jobs.YumEndSession()) + cmpi_logging.logger.trace_info('YumDB: session ended') + cmpi_logging.logger.trace_info('YumDB: emerged from session' + ' level=%d', self._session_level) + self._session_level = max(self._session_level - 1, 0) + + # ************************************************************************* + # Public methods + # ************************************************************************* + @cmpi_logging.trace_method + def clean_up(self): + """ + Shut down the YumWorker process. + """ + with self._reply_lock: + if self._process is not None: + cmpi_logging.logger.info('YumDB: terminating YumWorker') + self._process.uplink.put(None) # terminating command + self._process.join() + cmpi_logging.logger.info('YumDB: YumWorker terminated') + self._process = None + else: + cmpi_logging.logger.warn("YumDB: clean_up called, when process" + " not initialized!") + + # ************************************************************************* + # Jobs with simple results + # ************************************************************************* + @job_request() + def get_package_list(self, kind, + allow_duplicates=False, + sort=False, + include_repos=None, + exclude_repos=None): + """ + @param kind is one of: jobs.YumGetPackageList.SUPPORTED_KINDS + @param allow_duplicates says, whether to list all found versions + of single package + @return [pkg1, pkg2, ...], pkgi is instance of yumdb.PackageInfo + """ + return self._do_job(jobs.YumGetPackageList( + kind, allow_duplicates=allow_duplicates, sort=sort, + include_repos=include_repos, exclude_repos=exclude_repos)) + + @job_request() + def filter_packages(self, kind, + allow_duplicates=False, + sort=False, + include_repos=None, + exclude_repos=None, + **filters): + """ + Similar to get_package_list(), but applies filter on packages. + @see yumdb.jobs.YumFilterPackages job for supported filter keys + """ + return self._do_job(jobs.YumFilterPackages( + kind, allow_duplicates=allow_duplicates, sort=sort, + include_repos=include_repos, exclude_repos=exclude_repos, + **filters)) + + @job_request() + def get_repository_list(self, kind): + """ + @param kind is one of: jobs.YumGetRepositoryList.SUPPORTED_KINDS + @param allow_duplicates says, whether to list all found versions + of single package + @return [pkg1, pkg2, ...], pkgi is instance of yumdb.Repository + """ + return self._do_job(jobs.YumGetRepositoryList(kind)) + + @job_request() + def filter_repositories(self, kind, **filters): + """ + Similar to get_repository_list(), but applies filter on packages. + @see yumdb.jobs.YumFilterRepositories job for supported filter keys + """ + return self._do_job(jobs.YumFilterRepositories(kind, **filters)) + + @job_request() + def set_repository_enabled(self, repoid, enable): + """ + Enable or disable repository. + @param enable is a boolean + """ + return self._do_job(jobs.YumSetRepositoryEnabled(repoid, enable)) + + # ************************************************************************* + # Asynchronous jobs + # ************************************************************************* + @job_request(async=True) + def install_package(self, pkg, async=False, force=False, **metadata): + """ + Install package. + @param pkg is an instance of PackageInfo obtained with + get_package_list() or filter_packages() or a valid nevra as string. + Package must not be installed if force is False. + """ + return self._do_job(_make_async_job(jobs.YumInstallPackage, + pkg, force=force, async=async, metadata=metadata)) + + @job_request(async=True) + def remove_package(self, pkg, async=False, **metadata): + """ + @param pkg is an instance of PackageInfo obtained with + get_package_list() or filter_packages(), which must be installed + """ + return self._do_job(_make_async_job(jobs.YumRemovePackage, + pkg, async=async, metadata=metadata)) + + @job_request(async=True) + def update_to_package(self, desired_pkg, async=False, **metadata): + """ + @param desired_pkg is an instance of PackageInfo, + which must be available + """ + return self._do_job(_make_async_job(jobs.YumUpdateToPackage, + desired_pkg, async=async, metadata=metadata)) + + @job_request(async=True) + def update_package(self, pkg, + async=False, + to_epoch=None, + to_version=None, + to_release=None, + force=False, + **metadata): + """ + @param pkg is an instance of PackageInfo, which must be installed + + The other parameters filter candidate available packages for update. + """ + return self._do_job(_make_async_job(jobs.YumUpdatePackage, + pkg, async, to_epoch, to_version, to_release, force=force, + metadata=metadata)) + + @job_request(async=True) + def check_package(self, pkg, async=False, **metadata): + """ + Return all necessary information from package database for package + verification. + + :param pkg: (``PackageInfo``) An instance of PackageInfo + representing installed package or its nevra string. + :rtype: (``PackageCheck``) + """ + return self._do_job(_make_async_job(jobs.YumCheckPackage, + pkg, async=async, metadata=metadata)) + + @job_request(async=True) + def check_package_file(self, pkg, file_name, async=False): + """ + Return all necessary information from package database concerning + on particular file of package. If ``pkg`` does not contain + ``file_name``, ``FileNotFound`` error is raised. + + :param pkg: (``PackageInfo``) An instance of PackageInfo + representing installed package or its nevra string. + :rtype: (``PackageFile``) + """ + return self._do_job(_make_async_job(jobs.YumCheckPackageFile, + pkg, file_name, async=async)) + + @job_request(async=True) + def install_package_from_uri(self, uri, + async=False, update_only=False, force=False, **metadata): + """ + Install package from uri. + @param uri is either remote url or local path. + """ + return self._do_job(_make_async_job(jobs.YumInstallPackageFromURI, + uri, async, update_only, force=force, metadata=metadata)) + + # ************************************************************************* + # Control of asynchronous jobs + # ************************************************************************* + @job_request() + def get_job(self, jobid): + """ + Return instance of ``YumJob`` with given ``jobid``. + """ + return self._do_job(jobs.YumJobGet(jobid)) + + @job_request() + def get_job_list(self): + """ + Return list of all asynchronous jobs. + """ + return self._do_job(jobs.YumJobGetList()) + + @job_request() + def get_job_by_name(self, name): + """ + Return asynchronous job filtered by its name. + """ + return self._do_job(jobs.YumJobGetByName(name)) + + @job_request() + def set_job_priority(self, jobid, priority): + """ + Change priority of asynchronous job. This will change + its order in queue, if it is still enqeueued. + + Return object of job. + """ + return self._do_job(jobs.YumJobSetPriority(jobid, priority)) + + @job_request() + def update_job(self, jobid, **kwargs): + """ + Update metadata of job. + + :param kwargs: (``dict``) Is a dictionary of job's property names + with mapped new values. Only keys given will be changed in + desired job. + + **Note** that only keys, that do not affect job's priority or its + scheduling for deletion can be changed. See :ref:`YumJobUpdate`. + """ + return self._do_job(jobs.YumJobUpdate(jobid, **kwargs)) + + @job_request() + def reschedule_job(self, jobid, + delete_on_completion, time_before_removal): + """ + Change the scheduling of job for deletion. + + :param delete_on_completion: (``bool``) Says, whether the job will + be scheduled for deletion at ``finished + time_before_removal`` + time. + :param time_before_removal: (``int``) Number of seconds, after the job + is finished, it will be kept alive. + """ + return self._do_job(jobs.YumJobReschedule(jobid, + delete_on_completion, time_before_removal)) + + @job_request() + def delete_job(self, jobid): + """ + Delete job object. This can be called only on finished job. + """ + return self._do_job(jobs.YumJobDelete(jobid)) + + @job_request() + def terminate_job(self, jobid): + """ + Terminate job. This can be called only on *NEW* job. + """ + return self._do_job(jobs.YumJobTerminate(jobid)) + |