diff options
Diffstat (limited to 'src/software/lmi/software/yumdb/process.py')
-rw-r--r-- | src/software/lmi/software/yumdb/process.py | 914 |
1 files changed, 914 insertions, 0 deletions
diff --git a/src/software/lmi/software/yumdb/process.py b/src/software/lmi/software/yumdb/process.py new file mode 100644 index 0000000..85abe61 --- /dev/null +++ b/src/software/lmi/software/yumdb/process.py @@ -0,0 +1,914 @@ +# -*- encoding: utf-8 -*- +# Software Management Providers +# +# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved. +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: Michal Minar <miminar@redhat.com> +# +""" +Module holding the code of separate process accessing the YUM API. +""" + +import errno +from itertools import chain +import logging +from multiprocessing import Process +import os +import Queue as TQueue # T as threaded +import sys +import time +import traceback +from urlgrabber.grabber import default_grabber +import weakref +import yum + +from lmi.software import util +from lmi.software.yumdb import errors +from lmi.software.yumdb import jobs +from lmi.software.yumdb import packageinfo +from lmi.software.yumdb import packagecheck +from lmi.software.yumdb import repository +from lmi.software.yumdb.jobmanager import JobManager +from lmi.software.yumdb.util import trace_function, setup_logging + +# ***************************************************************************** +# Constants +# ***************************************************************************** +# interval in seconds +FREE_DATABASE_TIMEOUT = 60 +LOCK_WAIT_INTERVAL = 0.5 +RPMDB_PATH = '/var/lib/rpm/Packages' +LOG = None + +# ***************************************************************************** +# Utilities +# **************************************************************************** +def _get_package_filter_function(filters): + """ + @param filters is a dictionary, where keys are package property + names and values are their desired values. + @return a function used to filter list of packages + """ + if not isinstance(filters, dict): + raise TypeError("filters must be a dictionary") + + filters = dict((k, value) for k, value in filters.items() + if value is not None) + + match = None + if "nevra" in filters: + match = util.RE_NEVRA.match(filters["nevra"]) + elif "envra" in filters: + match = util.RE_ENVRA.match(filters["envra"]) + if match is not None: + for attr in ("name", "epoch", "version", "release", "arch"): + match_attr = attr + filters[attr] = match.group(match_attr) + filters.pop('nevra', None) + filters.pop('envra', None) + elif "evra" in filters: + for prop_name in ("epoch", "version", "release", "epoch"): + filters.pop(prop_name, None) + filter_list = [] + # properties are sorted by their filtering ability + # (the most unprobable property, that can match, comes first) + for prop_name in ("evra", "name", "version", "epoch", + "release", "repoid", "arch"): + if not prop_name in filters: + continue + filter_list.append((prop_name, filters.pop(prop_name))) + def _cmp_props(pkg): + """@return True if pkg matches properies filter""" + return all(getattr(pkg, p) == v for p, v in filter_list) + return _cmp_props + +class RepoFilterSetter(object): + """ + A context manager, that will set a repository filter lasting + as long as the object itself. + """ + def __init__(self, yum_base, include_repos=None, exclude_repos=None): + if not isinstance(yum_base, yum.YumBase): + raise TypeError("yum_base must be a YumBase instance") + self._yum_base = yum_base + self._include = include_repos + self._exclude = exclude_repos + # after __enter__ this will be dictionary containing ( + # repoid, enabled) pairs + self._prev_states = None + + def __enter__(self): + self._prev_states = { r.id: r.enabled + for r in self._yum_base.repos.repos.values()} + if isinstance(self._exclude, (list, tuple, set)): + exclude = ",".join(self._exclude) + else: + exclude = self._exclude + # set of repositories, that were affected + repos = set() + if exclude: + repos.update(self._yum_base.repos.disableRepo(exclude)) + LOG.info('disabling repositories: [%s]', ", ".join(repos)) + if isinstance(self._include, (list, tuple, set)): + include = ",".join(self._include) + else: + include = self._include + if include: + affected = self._yum_base.repos.enableRepo(include) + LOG.info('enabling repositories: [%s]', ", ".join(affected)) + repos.update(affected) + for repoid, prev_enabled in self._prev_states.items(): + if ( repoid not in repos + or ( bool(prev_enabled) + is bool(self._yum_base.repos.getRepo(repoid).enabled))): + # keep only manipulated repositories + del self._prev_states[repoid] + if len(self._prev_states): + for repoid in (r for r, v in self._prev_states.items() if v): + self._yum_base.pkgSack.sacks.pop(repoid, None) + self._yum_base.repos.populateSack() + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + # restore previous repository states + if len(self._prev_states): + LOG.info('restoring repositories: [%s]', + ", ".join(self._prev_states.keys())) + for repoid, enabled in self._prev_states.items(): + repo = self._yum_base.repos.getRepo(repoid) + if enabled: + repo.enable() + else: + repo.disable() + for repoid in (r for r, v in self._prev_states.items() if not v): + self._yum_base.pkgSack.sacks.pop(repoid, None) + self._yum_base.repos.populateSack() + +# ***************************************************************************** +# Decorators +# ***************************************************************************** +def _needs_database(method): + """ + Decorator for YumWorker job handlers, that need to access the yum database. + It ensures, that database is initialized and locks it in case, that + no session is active. + """ + logged = trace_function(method) + def _wrapper(self, *args, **kwargs): + """ + Wrapper for the job handler method. + """ + created_session = False + self._init_database() #pylint: disable=W0212 + if self._session_level == 0: #pylint: disable=W0212 + self._session_level = 1 #pylint: disable=W0212 + created_session = True + self._lock_database() #pylint: disable=W0212 + try: + LOG.debug("calling job handler %s with args=(%s)", + method.__name__, + ", ".join(chain( + (str(a) for a in args), + ("%s=%s"%(k, str(v)) for k, v in kwargs.items())))) + result = logged(self, *args, **kwargs) + LOG.debug("job handler %s finished", method.__name__) + return result + finally: + if created_session is True: #pylint: disable=W0212 + self._session_level = 0 #pylint: disable=W0212 + self._unlock_database() #pylint: disable=W0212 + return _wrapper + +# ***************************************************************************** +# Classes +# ***************************************************************************** +class YumWorker(Process): + """ + The main process, that works with YUM API. It has two queues, one + for input jobs and second for results. + + Jobs are dispatched by their class names to particular handler method. + + It spawns a second thread for managing asynchronous jobs and queue + of incoming jobs. It's an instance of JobManager. + """ + + def __init__(self, + queue_in, + queue_out, + indication_manager, + yum_kwargs=None, + logging_config=None): + Process.__init__(self, name="YumWorker") + self._jobmgr = JobManager(queue_in, queue_out, indication_manager) + self._session_level = 0 + self._session_ended = False + + if yum_kwargs is None: + yum_kwargs = {} + + self._yum_kwargs = yum_kwargs + self._yum_base = None + + self._pkg_cache = None + # contains (repoid, time_stamp_of_config_file) + # plus (/repos/dir, ...) for each repo config directory + self._repodir_mtimes = {} + self._logging_config = logging_config + + # ************************************************************************* + # Private methods + # ************************************************************************* + @trace_function + def _init_database(self): + """ + Initializes yum base object, when it does no exists. + And updates the cache (when out of date). + """ + if self._yum_base is None: + LOG.info("creating YumBase with kwargs=(%s)", + ", ".join(( "%s=%s"%(k, str(v)) + for k, v in self._yum_kwargs.items()))) + self._yum_base = yum.YumBase(**self._yum_kwargs) + + @trace_function + def _free_database(self): + """ + Release the yum base object to safe memory. + """ + LOG.info("freing database") + self._pkg_cache.clear() + self._yum_base = None + + @trace_function + def _lock_database(self): + """ + Only one process is allowed to work with package database at given time. + That's why we lock it. + + Try to lock it in loop, until success. + """ + while True: + try: + LOG.info("trying to lock database - session level %d", + self._session_level) + self._yum_base.doLock() + LOG.info("successfully locked up") + break + except yum.Errors.LockError as exc: + LOG.warn("failed to lock") + if exc.errno in (errno.EPERM, errno.EACCES, errno.ENOSPC): + LOG.error("can't create lock file") + raise errors.DatabaseLockError("Can't create lock file.") + LOG.info("trying to lock again after %.1f seconds", + LOCK_WAIT_INTERVAL) + time.sleep(LOCK_WAIT_INTERVAL) + + @trace_function + def _unlock_database(self): + """ + The opposite to _lock_database() method. + """ + if self._yum_base is not None: + LOG.info("unlocking database") + self._yum_base.closeRpmDB() + self._yum_base.doUnlock() + + @trace_function + def _get_job(self): + """ + Get job from JobManager thread. + If no job comes for long time, free database to save memory. + """ + while True: + if self._session_ended and self._session_level == 0: + try: + return self._jobmgr.get_job(timeout=FREE_DATABASE_TIMEOUT) + except TQueue.Empty: + self._free_database() + self._session_ended = False + else: + return self._jobmgr.get_job() + + @trace_function + def _transform_packages(self, packages, + cache_packages=True, + flush_cache=True): + """ + Return instances of PackageInfo for each package in packages. + Cache all the packages. + @param packages list of YumAvailablePackage instances + @param cache_packages whether to update cache with packages + @param flush_cache whether to clear the cache before adding input + packages; makes sense only with cachee_packages=True + """ + if cache_packages is True and flush_cache is True: + LOG.debug("flushing package cache") + self._pkg_cache.clear() + res = [] + for orig in packages: + pkg = packageinfo.make_package_from_db(orig) + if cache_packages is True: + self._pkg_cache[pkg.objid] = orig + res.append(pkg) + return res + + @trace_function + def _cache_packages(self, packages, flush_cache=True, transform=False): + """ + Store packages in cache and return them. + @param flush_cache whether to clear the cache before adding new + packages + @param transform whether to return packages as PackageInfos + @return either list of original packages or PackageInfo instances + """ + if transform is True: + return self._transform_packages(packages, flush_cache=flush_cache) + if flush_cache is True: + LOG.debug("flushing package cache") + self._pkg_cache.clear() + for pkg in packages: + self._pkg_cache[id(pkg)] = pkg + return packages + + @trace_function + def _lookup_package(self, pkg): + """ + Lookup the original package in cache. + If it was garbage collected already, make new query to find it. + @return instance of YumAvailablePackage + """ + if not isinstance(pkg, packageinfo.PackageInfo): + raise TypeError("pkg must be instance of PackageInfo") + LOG.debug("looking up yum package %s with id=%d", + pkg, pkg.objid) + try: + result = self._pkg_cache[pkg.objid] + LOG.debug("lookup successful") + except KeyError: + LOG.warn("lookup of package %s with id=%d failed, trying" + " to query database", pkg, pkg.objid) + result = self._handle_filter_packages( + 'installed' if pkg.installed else 'available', + allow_duplicates=False, + sort=False, + transform=False, + **pkg.key_props) + if len(result) < 1: + LOG.warn("package %s not found", pkg) + raise errors.PackageNotFound( + "package %s could not be found" % pkg) + result = result[0] + return result + + @trace_function + def _clear_repository_cache(self): + """ + Clears the repository cache and their configuration directory + last modification times. + """ + if self._yum_base is not None: + for repoid in self._yum_base.repos.repos.keys(): + self._yum_base.repos.delete(repoid) + del self._yum_base.repos + del self._yum_base.pkgSack + self._repodir_mtimes.clear() + + @trace_function + def _check_repository_configs(self): + """ + Checks whether repository information is up to date with configuration + files by comparing timestamps. If not, repository cache will be + released. + """ + dirty = False + if self._repodir_mtimes: + for repodir in self._yum_base.conf.reposdir: + if ( os.path.exists(repodir) + and ( not repodir in self._repodir_mtimes + or ( os.stat(repodir).st_mtime + > self._repodir_mtimes[repodir]))): + LOG.info("repository config dir %s changed", repodir) + dirty = True + break + if not dirty: + for repo in self._yum_base.repos.repos.values(): + filename = repo.repofile + if ( not os.path.exists(filename) + or ( int(os.stat(filename).st_mtime) + > repo.repo_config_age)): + LOG.info('config file of repository "%s" changed', + repo.id) + dirty = True + break + if dirty is True: + LOG.info("repository cache is dirty, cleaning up ...") + self._clear_repository_cache() + self._yum_base.getReposFromConfig() + if dirty is True or not self._repodir_mtimes: + self._update_repodir_mtimes() + + @trace_function + def _update_repodir_mtimes(self): + """ + Updates the last modification times of repo configuration directories. + """ + assert self._yum_base is not None + for repodir in self._yum_base.conf.reposdir: + if os.path.exists(repodir): + self._repodir_mtimes[repodir] = os.stat(repodir).st_mtime + + @trace_function + def _do_work(self, job): + """ + Dispatcher of incoming jobs. Job is passed to the right handler + depending on its class. + """ + if not isinstance(job, jobs.YumJob): + raise TypeError("job must be instance of YumJob") + try: + handler = { + jobs.YumGetPackageList : self._handle_get_package_list, + jobs.YumFilterPackages : self._handle_filter_packages, + jobs.YumInstallPackage : self._handle_install_package, + jobs.YumRemovePackage : self._handle_remove_package, + jobs.YumUpdateToPackage : self._handle_update_to_package, + jobs.YumUpdatePackage : self._handle_update_package, + jobs.YumBeginSession : self._handle_begin_session, + jobs.YumEndSession : self._handle_end_session, + jobs.YumCheckPackage : self._handle_check_package, + jobs.YumCheckPackageFile : self._handle_check_package_file, + jobs.YumInstallPackageFromURI : \ + self._handle_install_package_from_uri, + jobs.YumGetRepositoryList : \ + self._handle_get_repository_list, + jobs.YumFilterRepositories : self._handle_filter_repositories, + jobs.YumSetRepositoryEnabled : \ + self._handle_set_repository_enabled + }[job.__class__] + LOG.info("processing job %s(id=%d)", + job.__class__.__name__, job.jobid) + except KeyError: + LOG.error("No handler for job \"%s\"", job.__class__.__name__) + raise errors.UnknownJob("No handler for job \"%s\"." % + job.__class__.__name__) + return handler(**job.job_kwargs) + + @trace_function + def _run_transaction(self, name): + """ + Builds and runs the yum transaction and checks for errors. + @param name of transaction used only in error description on failure + """ + LOG.info("building transaction %s", name) + (code, msgs) = self._yum_base.buildTransaction() + if code == 1: + LOG.error("building transaction %s failed: %s", + name, "\n".join(msgs)) + raise errors.TransactionBuildFailed( + "Failed to build \"%s\" transaction: %s" % ( + name, "\n".join(msgs))) + LOG.info("processing transaction %s", name) + self._yum_base.processTransaction() + self._yum_base.closeRpmDB() + + @trace_function + def _main_loop(self): + """ + This is a main loop called from run(). Jobs are handled here. + It accepts a job from input queue, handles it, + sends the result to output queue and marks the job as done. + + It is terminated, when None is received from input queue. + """ + while True: + job = self._get_job() + if job is not None: # not a terminate command + result = jobs.YumJob.RESULT_SUCCESS + try: + data = self._do_work(job) + except Exception: #pylint: disable=W0703 + result = jobs.YumJob.RESULT_ERROR + # (type, value, traceback) + data = sys.exc_info() + # traceback is not pickable - replace it with formatted + # text + data = (data[0], data[1], traceback.format_tb(data[2])) + LOG.exception("job %s failed", job) + self._jobmgr.finish_job(job, result, data) + if job is None: + LOG.info("waiting for %s to finish", self._jobmgr.name) + self._jobmgr.join() + break + + @trace_function + def _handle_begin_session(self): + """ + Handler for session begin job. + """ + self._session_level += 1 + LOG.info("beginning session level %s", self._session_level) + if self._session_level == 1: + self._init_database() + self._lock_database() + + @trace_function + def _handle_end_session(self): + """ + Handler for session end job. + """ + LOG.info("ending session level %d", self._session_level) + self._session_level = max(self._session_level - 1, 0) + if self._session_level == 0: + self._unlock_database() + self._session_ended = True + + @_needs_database + def _handle_get_package_list(self, kind, allow_duplicates, sort, + include_repos=None, exclude_repos=None, transform=True): + """ + Handler for listing packages job. + @param transform says, whether to return just a package abstractions + or original ones + @return [pkg1, pkg2, ...] + """ + if kind == 'avail_notinst': + what = 'available' + elif kind == 'available': + what = 'all' + elif kind == 'avail_reinst': + what = 'all' + else: + what = kind + with RepoFilterSetter(self._yum_base, include_repos, exclude_repos): + LOG.debug("calling YumBase.doPackageLists(%s, showdups=%s)", + what, allow_duplicates) + pkglist = self._yum_base.doPackageLists(what, + showdups=allow_duplicates) + if kind == 'all': + result = pkglist.available + pkglist.installed + elif kind == 'available': + result = pkglist.available + pkglist.reinstall_available + elif kind == 'avail_reinst': + result = pkglist.reinstall_available + else: # get installed or available + result = getattr(pkglist, what) + if sort is True: + result.sort() + LOG.debug("returning %s packages", len(result)) + return self._cache_packages(result, transform=transform) + + @_needs_database + def _handle_filter_packages(self, kind, allow_duplicates, sort, + include_repos=None, exclude_repos=None, + transform=True, **filters): + """ + Handler for filtering packages job. + @return [pkg1, pkg2, ...] + """ + pkglist = self._handle_get_package_list(kind, allow_duplicates, False, + include_repos=include_repos, exclude_repos=exclude_repos, + transform=False) + matches = _get_package_filter_function(filters) + result = [p for p in pkglist if matches(p)] + if sort is True: + result.sort() + LOG.debug("%d packages matching", len(result)) + if transform is True: + # caching has been already done by _handle_get_package_list() + result = self._transform_packages(result, cache_packages=False) + return result + + @_needs_database + def _handle_install_package(self, pkg, force=False): + """ + Handler for package installation job. + @return installed package instance + """ + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages( + 'available' if force else 'avail_notinst', + allow_duplicates=False, sort=True, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + elif len(pkgs) > 1: + LOG.warn('multiple packages matches nevra "%s": [%s]', + pkg, ", ".join(p.nevra for p in pkgs)) + pkg_desired = pkgs[-1] + else: + pkg_desired = self._lookup_package(pkg) + if isinstance(pkg_desired, yum.rpmsack.RPMInstalledPackage): + if force is False: + raise errors.PackageAlreadyInstalled(pkg) + action = "reinstall" + else: + action = "install" + getattr(self._yum_base, action)(pkg_desired) + self._run_transaction(action) + installed = self._handle_filter_packages("installed", False, False, + nevra=util.pkg2nevra(pkg_desired, with_epoch="ALWAYS")) + if len(installed) < 1: + raise errors.TransactionExecutionFailed( + "Failed to install desired package %s." % pkg) + return installed[0] + + @_needs_database + def _handle_remove_package(self, pkg): + """ + Handler for package removal job. + """ + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('installed', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + pkg = pkgs[-1] + else: + pkg = self._lookup_package(pkg) + self._yum_base.remove(pkg) + self._run_transaction("remove") + + @_needs_database + def _handle_update_to_package(self, pkg): + """ + Handler for specific package update job. + @return package corresponding to pkg after update + """ + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('available', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + pkg_desired = pkgs[-1] + else: + pkg_desired = self._lookup_package(pkg) + self._yum_base.update(update_to=True, + name=pkg_desired.name, + epoch=pkg_desired.epoch, + version=pkg_desired.version, + release=pkg_desired.release, + arch=pkg_desired.arch) + self._run_transaction("update") + installed = self._handle_filter_packages("installed", False, False, + **pkg.key_props) + if len(installed) < 1: + raise errors.TransactionExecutionFailed( + "Failed to update to desired package %s." % pkg) + return installed[0] + + @_needs_database + def _handle_update_package(self, pkg, to_epoch, to_version, to_release, + _force=False): + """ + Handler for package update job. + @return updated package instance + """ + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('installed', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + pkg = pkgs[-1] + else: + pkg = self._lookup_package(pkg) + if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage): + raise errors.PackageNotInstalled(pkg) + kwargs = { "name" : pkg.name, "arch" : pkg.arch } + if any(v is not None for v in (to_epoch, to_version, to_release)): + kwargs["update_to"] = True + if to_epoch: + kwargs["to_epoch"] = to_epoch + if to_version: + kwargs["to_version"] = to_version + if to_release: + kwargs["to_release"] = to_release + self._yum_base.update(**kwargs) + self._run_transaction("update") + kwargs = dict( (k[3:] if k.startswith("to_") else k, v) + for k, v in kwargs.items()) + installed = self._handle_filter_packages( + "installed", False, False, **kwargs) + if len(installed) < 1: + raise errors.TransactionExecutionFailed( + "Failed to update package %s." % pkg) + return installed[0] + + @_needs_database + def _handle_check_package(self, pkg, file_name=None): + """ + @return PackageCheck instance for requested package + """ + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('installed', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + rpm = pkgs[-1] + pkg = self._transform_packages((rpm, ), cache_packages=False)[0] + else: + rpm = self._lookup_package(pkg) + if not isinstance(rpm, yum.rpmsack.RPMInstalledPackage): + raise errors.PackageNotInstalled(rpm) + vpkg = yum.packages._RPMVerifyPackage(rpm, rpm.hdr.fiFromHeader(), + packagecheck.pkg_checksum_type(rpm), [], True) + return (pkg, packagecheck.make_package_check_from_db(vpkg, + file_name=file_name)) + + @_needs_database + def _handle_check_package_file(self, pkg, file_name): + """ + @return PackageCheck instance for requested package containing + just one PackageFile instance for given ``file_name``. + """ + return self._handle_check_package(pkg, file_name) + + @_needs_database + def _handle_install_package_from_uri(self, uri, + update_only=False, force=False): + """ + @return installed PackageInfo instance + """ + try: + pkg = yum.packages.YumUrlPackage(self, + ts=self._yum_base.rpmdb.readOnlyTS(), url=uri, + ua=default_grabber) + except yum.Errors.MiscError as exc: + raise errors.PackageOpenError(uri, str(exc)) + installed = self._handle_filter_packages("installed", False, False, + nevra=util.pkg2nevra(pkg, with_epoch="ALWAYS")) + if installed and force is False: + raise errors.PackageAlreadyInstalled(pkg) + kwargs = { 'po' : pkg } + if installed: + action = 'reinstallLocal' + else: + action = 'installLocal' + kwargs = { 'updateonly' : update_only } + getattr(self._yum_base, action)(uri, **kwargs) + self._run_transaction('installLocal') + installed = self._handle_filter_packages("installed", False, False, + nevra=util.pkg2nevra(pkg, with_epoch="ALWAYS")) + if len(installed) < 1: + raise errors.TransactionExecutionFailed( + "Failed to install desired package %s." % pkg) + return installed[0] + + @_needs_database + def _handle_get_repository_list(self, kind, transform=True): + """ + @return list of yumdb.Repository instances + """ + self._check_repository_configs() + if kind == 'enabled': + repos = sorted(self._yum_base.repos.listEnabled()) + else: + repos = self._yum_base.repos.repos.values() + if kind == 'disabled': + repos = [repo for repo in repos if not repo.enabled] + repos.sort() + if transform: + repos = [repository.make_repository_from_db(r) for r in repos] + LOG.debug("returning %d repositories from %s", + len(repos), kind) + return repos + + @_needs_database + def _handle_filter_repositories(self, kind, **filters): + """ + @return list of yumdb.Repository instances -- filtered + """ + filters = dict((k, v) for k, v in filters.items() if v is not None) + if 'repoid' in filters: + self._check_repository_configs() + try: + repo = repository.make_repository_from_db( + self._yum_base.repos.getRepo(filters["repoid"])) + if ( (kind == "enabled" and not repo.enabled) + or (kind == "disabled" and repo.enabled)): + LOG.warn( + 'no such repository with id="%s"matching filters', + filters['repoid']) + return [] + LOG.debug( + "exactly one repository matching filters found") + return [repo] + except (KeyError, yum.Errors.RepoError): + LOG.warn('repository with id="%s" could not be found', + filters['repoid']) + raise errors.RepositoryNotFound(filters['repoid']) + + repos = self._handle_get_repository_list(kind, transform=False) + result = [] + for repo in repos: + # do the filtering and safe transformed repo into result + for prop, value in filters.items(): + if repository.get_prop_from_yum_repo(repo, prop) != value: + # did not pass the filter + break + else: # all properties passed + result.append(repository.make_repository_from_db(repo)) + LOG.debug("found %d repositories matching", len(result)) + return result + + @_needs_database + def _handle_set_repository_enabled(self, repo, enable): + """ + @return previous enabled state + """ + self._check_repository_configs() + if isinstance(repo, repository.Repository): + repoid = repo.repoid + else: + repoid = repo + try: + repo = self._yum_base.repos.getRepo(repoid) + except (KeyError, yum.Errors.RepoError): + raise errors.RepositoryNotFound(repoid) + res = repo.enabled + try: + if enable ^ res: + if enable is True: + LOG.info("enabling repository %s" % repo) + repo.enable() + else: + LOG.info("disabling repository %s" % repo) + repo.disable() + try: + yum.config.writeRawRepoFile(repo, only=["enabled"]) + except Exception as exc: + raise errors.RepositoryChangeError( + 'failed to modify repository "%s": %s - %s' % ( + repo, exc.__class__.__name__, str(exc))) + else: + LOG.info("no change for repo %s", repo) + except yum.Errors.RepoError as exc: + raise errors.RepositoryChangeError( + 'failed to modify repository "%s": %s' % (repo, str(exc))) + return res + + # ************************************************************************* + # Public properties + # ************************************************************************* + @property + def uplink(self): + """ + @return input queue for jobs + """ + return self._jobmgr.queue_in + + @property + def downlink(self): + """ + @return output queue for job results + """ + return self._jobmgr.queue_out + + # ************************************************************************* + # Public methods + # ************************************************************************* + def run(self): + """ + Thread's entry point. After initial setup it calls _main_loop(). + """ + if self._logging_config is not None: + setup_logging(self._logging_config) + global LOG + LOG = logging.getLogger(__name__) + LOG.info("running as pid=%d", self.pid) + self._jobmgr.start() + LOG.info("started %s as thread %s", + self._jobmgr.name, self._jobmgr.ident) + self._pkg_cache = weakref.WeakValueDictionary() + + # This allows the code, that can be run both from broker and + # YumWorker, to check, whether it's called by this process. + from lmi.software.yumdb import YumDB + YumDB.RUNNING_UNDER_CIMOM_PROCESS = False + + self._main_loop() + LOG.info("terminating") + |