# Software Management Providers
#
# Copyright (C) 2012 Red Hat, Inc.  All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Michal Minar
#
"""
Since yum API functions should not be called with different thread_ids
repeatedly in the same program, it's necessary to make these calls from a
single thread. At the same time the provider needs to be able to clean up
after itself when it's no longer needed. That's why the yum API is accessed
from a separate process, which is created and terminated on demand.

This package contains all the internals of this separate process together
with its management and communication facilities.

YumDB is a context manager meant to be used by any provider as the only
accessor to the yum API.
"""

import errno
import os
import re
import time
from multiprocessing import Process, JoinableQueue, Queue
import Queue as TQueue     # T as threaded
import threading
import yum

from openlmi.common import cmpi_logging
from openlmi.software.yumdb import jobs
from openlmi.software.yumdb import errors
from openlmi.software.yumdb.packageinfo import PackageInfo
from openlmi.software.yumdb.packagecheck import PackageFile
from openlmi.software.yumdb.packagecheck import PackageCheck
from openlmi.software.yumdb.process import YumWorker
from openlmi.software.util import singletonmixin

# this may be used as an argument to YumWorker
YUM_WORKER_DEBUG_LOGGING_CONFIG = {
    "version" : 1,
    "formatters": {
        "default": {
            "format" : "%(asctime)s %(levelname)s:%(module)s:"
                       "%(funcName)s:%(lineno)d - %(message)s"
        }
    },
    "handlers": {
        "file" : {
            "class"     : "logging.handlers.RotatingFileHandler",
            "filename"  : "/var/tmp/YumWorker.log",
            "level"     : "DEBUG",
            "formatter" : "default",
        }
    },
    "root": {
        "level"    : "DEBUG",
        "handlers" : ["file"]
    }
}

class YumDB(singletonmixin.Singleton):
    """
    Context manager for accessing the yum/rpm database.
    All requests are bundled into jobs -- instances of jobs.YumJob -- and
    sent to YumWorker for processing.

    YumWorker is a separate process handling all calls to the yum API.
    Communication is done via queues (uplink and downlink). Uplink is used
    to send jobs to YumWorker, downlink to obtain results. This is
    implemented in a thread-safe manner.

    It should be used as a context manager whenever we want to process
    multiple jobs in a single transaction. Example of usage:

        with YumDB.getInstance() as ydb:
            pkgs = ydb.filter_packages(...)
            for pkg in pkgs:
                ydb.install_package(pkg)
            ...

    The yum database stays locked for the whole block of code under the
    with statement.
    """

    # this is to inform Singleton, that __init__ should be called only once
    ignoreSubsequent = True

    def __init__(self, *args, **kwargs):   #pylint: disable=W0231
        """
        All arguments are passed to the yum.YumBase constructor.
""" self._process = None self._yum_args = (args, kwargs) # used to access _replies dictionary and _expected list self._reply_lock = threading.Lock() # used to wait for job to be processed and received self._reply_cond = threading.Condition(self._reply_lock) # { job_id : reply, ... } self._replies = {} # ids of all expected jobs -- those to be processed by YumWorker self._expected = [] cmpi_logging.logger.trace_info('YumDB: initialized') # ************************************************************************* # Private methods # ************************************************************************* @cmpi_logging.trace_method def _wait_for_reply(self, job): """ Blocks until job is processed by YumWorker and received. Only one thread can block on downlink channel to obtain reply. If it's reply for him, he takes it and leaves, otherwise he adds it to _replies dictionary and notifies other threads. This thread is the one, whose job appears as first in _expected list. @return result of job """ with self._reply_lock: self._expected.append(job.jobid) while True: if job.jobid in self._replies: self._expected.remove(job.jobid) return self._replies.pop(job.jobid) elif job.jobid == self._expected[0]: break else: cmpi_logging.logger.debug( "[jobid=%d] another %s threads expecting reply," " suspending...", job.jobid, len(self._expected) - 1) self._reply_cond.wait() cmpi_logging.logger.debug( "[jobid=%d] received reply, waking up", job.jobid) while True: cmpi_logging.logger.debug("[jobid=%d] blocking on downlink queue", job.jobid) jobid, reply = self._worker.downlink.get() with self._reply_lock: if jobid != job.jobid: cmpi_logging.logger.debug("[jobid=%d] received reply" " for another thread (jobid=%d)", job.jobid, jobid) self._replies[jobid] = reply self._reply_cond.notifyAll() else: cmpi_logging.logger.debug( "[jobid=%d] received desired reply", job.jobid) self._expected.remove(job.jobid) if len(self._expected): self._reply_cond.notify() break return reply def _do_job(self, job): """ Sends the job to YumWorker process and waits for reply. If reply is a tuple, there was an error, while job processing. Incoming exception is in format: (exception_type, exception_value, formated_traceback_as_string) @return reply """ cmpi_logging.logger.trace_verbose("YumDB: doing %s(id=%s) job", job.__class__.__name__, job.jobid) self._worker.uplink.put(job) reply = self._wait_for_reply(job) if isinstance(reply, tuple): cmpi_logging.logger.error( "YumDB: job %s(id=%s) failed with error %s: %s", job.__class__.__name__, job.jobid, reply[0].__name__, str(reply[1])) cmpi_logging.logger.trace_warn( "YumDB: job %s(id=%s) exception traceback:\n%s%s: %s", job.__class__.__name__, job.jobid, "".join(reply[2]), reply[0].__name__, str(reply[1])) reply[1].tb_printed = True raise reply[1] cmpi_logging.logger.trace_verbose("YumDB: job %s(id=%s) done", job.__class__.__name__, job.jobid) return reply @property def _worker(self): """ YumWorker process accessor. It's created upon first need. 
""" if self._process is None: cmpi_logging.logger.trace_info("YumDB: starting YumWorker") uplink = JoinableQueue() downlink = Queue() self._process = YumWorker(uplink, downlink, yum_args=self._yum_args[0], yum_kwargs=self._yum_args[1]) #logging_config=YUM_WORKER_DEBUG_LOGGING_CONFIG) self._process.start() cmpi_logging.logger.trace_info( "YumDB: YumWorker started with pid=%s", self._process.pid) return self._process # ************************************************************************* # Special methods # ************************************************************************* def __del__(self): """ Ensure, that YumWorker process is correctly shutted down. """ self.clean_up() YumDB.__del__(self) @cmpi_logging.trace_method def __enter__(self): self._do_job(jobs.YumBeginSession()) cmpi_logging.logger.trace_info('YumDB: new session started') return self @cmpi_logging.trace_method def __exit__(self, exc_type, exc_value, traceback): self._do_job(jobs.YumEndSession()) cmpi_logging.logger.trace_info('YumDB: session ended') # ************************************************************************* # Public methods # ************************************************************************* @cmpi_logging.trace_method def clean_up(self): """ Shut down the YumWorker process. """ if self._process is not None: cmpi_logging.logger.info('YumDB: terminating YumWorker') self._process.uplink.put(None) # terminating command self._process.uplink.join() self._process.join() cmpi_logging.logger.info('YumDB: YumWorker terminated') self._process = None else: cmpi_logging.logger.warn("YunDB: clean_up called, when process" " not initialized!") @cmpi_logging.trace_method def get_package_list(self, kind, allow_duplicates=False, sort=False): """ @param kind is one of: jobs.YumGetPackageList.SUPPORTED_KINDS @param allow_duplicates says, whether to list all found versions of single package @return [pkg1, pkg2, ...], pkgi is instance of yumdb.PackageInfo """ return self._do_job(jobs.YumGetPackageList( kind, allow_duplicates=allow_duplicates, sort=sort)) @cmpi_logging.trace_method def filter_packages(self, kind, allow_duplicates=False, sort=False, **filters): """ Similar to get_package_list(), but applies filter on packages. @see yumdb.jobs.YumFilterPackages job for supported filter keys """ return self._do_job(jobs.YumFilterPackages( kind, allow_duplicates=allow_duplicates, sort=sort, **filters)) @cmpi_logging.trace_method def install_package(self, pkg): """ Install package. @param pkg is an instance of PackageInfo obtained with get_package_list() or filter_packages(), which must be not installed """ return self._do_job(jobs.YumInstallPackage(pkg)) @cmpi_logging.trace_method def remove_package(self, pkg): """ @param pkg is an instance of PackageInfo obtained with get_package_list() or filter_packages(), which must be installed """ return self._do_job(jobs.YumRemovePackage(pkg)) @cmpi_logging.trace_method def update_to_package(self, desired_pkg): """ @param desired_pkg is an instance of PackageInfo, which must be available """ return self._do_job(jobs.YumUpdateToPackage(desired_pkg)) @cmpi_logging.trace_method def update_package(self, pkg, to_epoch=None, to_version=None, to_release=None): """ @param pkg is an instance of PackageInfo, which must be installed The other parameters filter candidate available packages for update. 
""" return self._do_job(jobs.YumUpdatePackage( pkg, to_epoch, to_version, to_release)) @cmpi_logging.trace_method def check_package(self, pkg): """ @param pkg is an instance of PackageInfo representing installed package @return instance of yumdb.PackageCheck """ return self._do_job(jobs.YumCheckPackage(pkg))