diff options
Diffstat (limited to 'src/software/openlmi/software/yumdb/__init__.py')
-rw-r--r-- | src/software/openlmi/software/yumdb/__init__.py | 307 |
1 files changed, 307 insertions, 0 deletions
diff --git a/src/software/openlmi/software/yumdb/__init__.py b/src/software/openlmi/software/yumdb/__init__.py new file mode 100644 index 0000000..54eeae0 --- /dev/null +++ b/src/software/openlmi/software/yumdb/__init__.py @@ -0,0 +1,307 @@ +# Software Management Providers +# +# Copyright (C) 2012 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Since yum API functions should not be called with different thread_ids +repeatedly in the same program. It's neccessary, to make these calls +in single thread. But the provider needs to be able to clean up itself, +when its not needed. That's why the yum API needs to be accessed from +separated process, that is created and terminated when needed. + +This package contains all the bowels of this separate process together +with its management and communication facilities. + +YumDB is a context manager supposed to be used by any provider as the +only accessor to yum api. +""" + +import errno +import os +import re +import time +from multiprocessing import Process, JoinableQueue, Queue +import Queue as TQueue # T as threaded +import threading +import yum + +from openlmi.software.yumdb import jobs +from openlmi.software.yumdb import errors +from openlmi.software.yumdb.packageinfo import PackageInfo +from openlmi.software.yumdb.packagecheck import PackageFile +from openlmi.software.yumdb.packagecheck import PackageCheck +from openlmi.software.yumdb.process import YumWorker +from openlmi.software.util import cmpi_logging +from openlmi.software.util import singletonmixin + +# this may be used as an argument to YumWorker +YUM_WORKER_DEBUG_LOGGING_CONFIG = { + "version" : 1, + "formatters": { + "default": { + "format" : "%(asctime)s %(levelname)s:%(module)s:" + "%(funcName)s:%(lineno)d - %(message)s" + } + }, + "handlers": { + "file" : { + "class" : "logging.handlers.RotatingFileHandler", + "filename" : "/var/tmp/YumWorker.log", + "level" : "DEBUG", + "formatter": "default", + } + }, + "root": { + "level": "DEBUG", + "handlers" : ["file"] + } + } + +class YumDB(singletonmixin.Singleton): + """ + Context manager for accessing yum/rpm database. + All requests are bundled into jobs -- instances of jobs.YumJob and + sent to YumWorker for processing. + + YumWorker is a separate process handling all calls to yum api. + Communication is done via queues (uplink and downlink). + Uplink is used to send jobs to YumWorker and downlink for obtaining + results. + + This is implemented in thread safe manner. + + It should be used as a context manager in case, we want to process + multiple jobs in single transaction. The example of usage: + with YumDB.getInstance() as ydb: + pkgs = ydb.filter_packages(...) + for pkg in pkgs: + ydb.install_package(pkg) + ... + Yum database stays locked in whole block of code under with statement. + """ + + # this is to inform Singleton, that __init__ should be called only once + ignoreSubsequent = True + + def __init__(self, *args, **kwargs): #pylint: disable=W0231 + """ + All arguments are passed to yum.YumBase constructor. + """ + self._process = None + self._yum_args = (args, kwargs) + + # used to access _replies dictionary and _expected list + self._reply_lock = threading.Lock() + # used to wait for job to be processed and received + self._reply_cond = threading.Condition(self._reply_lock) + # { job_id : reply, ... } + self._replies = {} + # ids of all expected jobs -- those to be processed by YumWorker + self._expected = [] + cmpi_logging.logger.trace_info('YumDB: initialized') + + # ************************************************************************* + # Private methods + # ************************************************************************* + @cmpi_logging.trace_method + def _wait_for_reply(self, job): + """ + Blocks until job is processed by YumWorker and received. + + Only one thread can block on downlink channel to obtain reply. If + it's reply for him, he takes it and leaves, otherwise he adds it to + _replies dictionary and notifies other threads. This thread is the + one, whose job appears as first in _expected list. + + @return result of job + """ + with self._reply_lock: + # until our job is not at the head of + self._expected.append(job.jobid) + while job.jobid != self._expected[0]: + if job.jobid in self._replies: + self._expected.remove(job.jobid) + return self._replies.pop(job.jobid) + else: + self._reply_cond.wait() + while True: + jobid, reply = self._worker.downlink.get() + with self._reply_lock: + if jobid != job.jobid: + self._replies[jobid] = reply + self._reply_cond.notifyAll() + else: + self._expected.remove(job.jobid) + if len(self._expected): + self._reply_cond.notify() + break + return reply + + def _do_job(self, job): + """ + Sends the job to YumWorker process and waits for reply. + If reply is a tuple, there was an error, while job processing. + Incoming exception is in format: + (exception_type, exception_value, formated_traceback_as_string) + @return reply + """ + cmpi_logging.logger.trace_verbose("YumDB: doing %s(id=%s) job", + job.__class__.__name__, job.jobid) + self._worker.uplink.put(job) + reply = self._wait_for_reply(job) + if isinstance(reply, tuple): + cmpi_logging.logger.error( + "YumDB: job %s(id=%s) failed with error %s: %s", + job.__class__.__name__, job.jobid, + reply[0].__name__, str(reply[1])) + cmpi_logging.logger.trace_warn( + "YumDB: job %s(id=%s) exception traceback:\n%s%s: %s", + job.__class__.__name__, job.jobid, "".join(reply[2]), + reply[0].__name__, str(reply[1])) + reply[1].tb_printed = True + raise reply[1] + cmpi_logging.logger.trace_verbose("YumDB: job %s(id=%s) done", + job.__class__.__name__, job.jobid) + return reply + + @property + def _worker(self): + """ + YumWorker process accessor. It's created upon first need. + """ + if self._process is None: + cmpi_logging.logger.trace_info("YumDB: starting YumWorker") + uplink = JoinableQueue() + downlink = Queue() + self._process = YumWorker(uplink, downlink, + yum_args=self._yum_args[0], yum_kwargs=self._yum_args[1]) + #logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG) + self._process.start() + return self._process + + # ************************************************************************* + # Special methods + # ************************************************************************* + def __del__(self): + """ + Ensure, that YumWorker process is correctly shutted down. + """ + self.clean_up() + YumDB.__del__(self) + + @cmpi_logging.trace_method + def __enter__(self): + self._do_job(jobs.YumBeginSession()) + cmpi_logging.logger.trace_info('YumDB: new session started') + return self + + @cmpi_logging.trace_method + def __exit__(self, exc_type, exc_value, traceback): + self._do_job(jobs.YumEndSession()) + cmpi_logging.logger.trace_info('YumDB: session ended') + + # ************************************************************************* + # Public methods + # ************************************************************************* + @cmpi_logging.trace_method + def clean_up(self): + """ + Shut down the YumWorker process. + """ + cmpi_logging.logger.info('YumDB: cleanup called') + if self._process: + cmpi_logging.logger.info('YumDB: terminating YumWorker') + self._process.uplink.put(None) # terminating command + self._process.uplink.join() + self._process.join() + cmpi_logging.logger.info('YumDB: YumWorker terminated') + self._process = None + + @cmpi_logging.trace_method + def get_package_list(self, kind, + allow_duplicates=False, + sort=False): + """ + @param kind is one of: {"installed", "available", "all"} + @param allow_duplicates says, whether to list all found versions + of single package + @return [pkg1, pkg2, ...], pkgi is instance of yumdb.PackageInfo + """ + return self._do_job(jobs.YumGetPackageList( + kind, allow_duplicates=allow_duplicates, sort=sort)) + + @cmpi_logging.trace_method + def filter_packages(self, kind, + allow_duplicates=False, + sort=False, + **filters): + """ + Similar to get_package_list(), but applies filter on packages. + @see yumdb.jobs.YumFilterPackages job for supported filter keys + """ + return self._do_job(jobs.YumFilterPackages( + kind, allow_duplicates=allow_duplicates, sort=sort, + **filters)) + + @cmpi_logging.trace_method + def install_package(self, pkg): + """ + Install package. + @param pkg is an instance of PackageInfo obtained with + get_package_list() or filter_packages(), which must be not installed + """ + return self._do_job(jobs.YumInstallPackage(pkg)) + + @cmpi_logging.trace_method + def remove_package(self, pkg): + """ + @param pkg is an instance of PackageInfo obtained with + get_package_list() or filter_packages(), which must be installed + """ + return self._do_job(jobs.YumRemovePackage(pkg)) + + @cmpi_logging.trace_method + def update_to_package(self, desired_pkg): + """ + @param desired_pkg is an instance of PackageInfo, + which must be available + """ + return self._do_job(jobs.YumUpdateToPackage(desired_pkg)) + + @cmpi_logging.trace_method + def update_package(self, pkg, + to_epoch=None, + to_version=None, + to_release=None): + """ + @param pkg is an instance of PackageInfo, which must be installed + + The other parameters filter candidate available packages for update. + """ + return self._do_job(jobs.YumUpdatePackage( + pkg, to_epoch, to_version, to_release)) + + @cmpi_logging.trace_method + def check_package(self, pkg): + """ + @param pkg is an instance of PackageInfo representing installed package + @return instance of yumdb.PackageCheck + """ + return self._do_job(jobs.YumCheckPackage(pkg)) + |