summaryrefslogtreecommitdiffstats
path: root/src/software/openlmi/software/yumdb/process.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/software/openlmi/software/yumdb/process.py')
-rw-r--r--src/software/openlmi/software/yumdb/process.py399
1 files changed, 229 insertions, 170 deletions
diff --git a/src/software/openlmi/software/yumdb/process.py b/src/software/openlmi/software/yumdb/process.py
index 96c471e..e77b360 100644
--- a/src/software/openlmi/software/yumdb/process.py
+++ b/src/software/openlmi/software/yumdb/process.py
@@ -24,7 +24,6 @@ Module holding the code of separate process accessing the YUM API.
"""
import errno
-import inspect
from itertools import chain
import logging
from multiprocessing import Process
@@ -33,6 +32,7 @@ import Queue as TQueue # T as threaded
import sys
import time
import traceback
+from urlgrabber.grabber import default_grabber
import weakref
import yum
@@ -42,6 +42,8 @@ from openlmi.software.yumdb import jobs
from openlmi.software.yumdb import packageinfo
from openlmi.software.yumdb import packagecheck
from openlmi.software.yumdb import repository
+from openlmi.software.yumdb.jobmanager import JobManager
+from openlmi.software.yumdb.util import trace_function, setup_logging
# *****************************************************************************
# Constants
@@ -50,19 +52,11 @@ from openlmi.software.yumdb import repository
FREE_DATABASE_TIMEOUT = 60
LOCK_WAIT_INTERVAL = 0.5
RPMDB_PATH = '/var/lib/rpm/Packages'
+LOG = None
# *****************************************************************************
# Utilities
# ****************************************************************************
-def _logger():
- """
- Returns logger for this module, when first needed.
- @return logger specific for this process
- """
- if not hasattr(_logger, "logger"):
- _logger.logger = logging.getLogger(__name__)
- return _logger.logger
-
def _get_package_filter_function(filters):
"""
@param filters is a dictionary, where keys are package property
@@ -130,14 +124,14 @@ class RepoFilterSetter(object):
repos = set()
if exclude:
repos.update(self._yum_base.repos.disableRepo(exclude))
- _logger().info('disabling repositories: [%s]', ", ".join(repos))
+ LOG.info('disabling repositories: [%s]', ", ".join(repos))
if isinstance(self._include, (list, tuple, set)):
include = ",".join(self._include)
else:
include = self._include
if include:
affected = self._yum_base.repos.enableRepo(include)
- _logger().info('enabling repositories: [%s]', ", ".join(affected))
+ LOG.info('enabling repositories: [%s]', ", ".join(affected))
repos.update(affected)
for repoid, prev_enabled in self._prev_states.items():
if ( repoid not in repos
@@ -154,7 +148,7 @@ class RepoFilterSetter(object):
def __exit__(self, exc_type, exc_value, exc_tb):
# restore previous repository states
if len(self._prev_states):
- _logger().info('restoring repositories: [%s]',
+ LOG.info('restoring repositories: [%s]',
", ".join(self._prev_states.keys()))
for repoid, enabled in self._prev_states.items():
repo = self._yum_base.repos.getRepo(repoid)
@@ -169,51 +163,13 @@ class RepoFilterSetter(object):
# *****************************************************************************
# Decorators
# *****************************************************************************
-def _trace_function(func):
- """
- Decorator for logging entries and exits of function or method.
- """
- if not inspect.ismethod(func) and not inspect.isfunction(func):
- raise TypeError("func must be a function")
-
- def _print_value(val):
- """
- Used here for printing function arguments. Shortens the output
- string, if that would be too long.
- """
- if isinstance(val, list):
- if len(val) < 2:
- return str(val)
- else:
- return "[%s, ...]" % _print_value(val[0])
- return str(val)
-
- def _wrapper(self, *args, **kwargs):
- """
- Wrapper for function or method, that does the logging.
- """
- ftype = "method" if inspect.ismethod(func) else "function"
- parent = ( func.im_class.__name__ + "."
- if inspect.ismethod(func) else "")
-
- _logger().debug("entering %s %s%s with args=(%s)",
- ftype, parent, func.__name__,
- ", ".join(chain(
- (_print_value(a) for a in args),
- ( "%s=%s"%(k, _print_value(v))
- for k, v in kwargs.items()))))
- result = func(self, *args, **kwargs)
- _logger().debug("exiting %s %s%s", ftype, parent, func.__name__)
- return result
-
- return _wrapper
-
def _needs_database(method):
"""
Decorator for YumWorker job handlers, that need to access the yum database.
It ensures, that database is initialized and locks it in case, that
no session is active.
"""
+ logged = trace_function(method)
def _wrapper(self, *args, **kwargs):
"""
Wrapper for the job handler method.
@@ -222,18 +178,14 @@ def _needs_database(method):
if self._session_level == 0: #pylint: disable=W0212
self._lock_database() #pylint: disable=W0212
try:
- _logger().debug("calling job handler %s with args=(%s)",
+ LOG.debug("calling job handler %s with args=(%s)",
method.__name__,
", ".join(chain(
(str(a) for a in args),
("%s=%s"%(k, str(v)) for k, v in kwargs.items()))))
- result = method(self, *args, **kwargs)
- _logger().debug("job handler %s finished", method.__name__)
+ result = logged(self, *args, **kwargs)
+ LOG.debug("job handler %s finished", method.__name__)
return result
- except:
- _logger().debug("job handler %s terminated with exception: %s",
- method.__name__, traceback.format_exc())
- raise
finally:
if self._session_level == 0: #pylint: disable=W0212
self._unlock_database() #pylint: disable=W0212
@@ -248,26 +200,25 @@ class YumWorker(Process):
for input jobs and second for results.
Jobs are dispatched by their class names to particular handler method.
+
+ It spawns a second thread for managing asynchronous jobs and queue
+ of incoming jobs. It's an instance of JobManager.
"""
def __init__(self,
queue_in,
queue_out,
- yum_args=None,
yum_kwargs=None,
logging_config=None):
Process.__init__(self, name="YumWorker")
- self._queue_in = queue_in
- self._queue_out = queue_out
+ self._jobmgr = JobManager(queue_in, queue_out)
self._session_level = 0
self._session_ended = False
- if yum_args is None:
- yum_args = tuple()
if yum_kwargs is None:
yum_kwargs = {}
- self._yum_args = (yum_args, yum_kwargs)
+ self._yum_kwargs = yum_kwargs
self._yum_base = None
self._pkg_cache = None
@@ -279,31 +230,28 @@ class YumWorker(Process):
# *************************************************************************
# Private methods
# *************************************************************************
- @_trace_function
+ @trace_function
def _init_database(self):
"""
Initializes yum base object, when it does no exists.
And updates the cache (when out of date).
"""
if self._yum_base is None:
- _logger().info("creating YumBase with args=(%s)",
- ", ".join(chain(
- (str(a) for a in self._yum_args[0]),
- ( "%s=%s"%(k, str(v))
- for k, v in self._yum_args[1].items()))))
- self._yum_base = yum.YumBase(
- *self._yum_args[0], **self._yum_args[1])
-
- @_trace_function
+ LOG.info("creating YumBase with kwargs=(%s)",
+ ", ".join(( "%s=%s"%(k, str(v))
+ for k, v in self._yum_kwargs.items())))
+ self._yum_base = yum.YumBase(**self._yum_kwargs)
+
+ @trace_function
def _free_database(self):
"""
Release the yum base object to safe memory.
"""
- _logger().info("freing database")
+ LOG.info("freing database")
self._pkg_cache.clear()
self._yum_base = None
- @_trace_function
+ @trace_function
def _lock_database(self):
"""
Only one process is allowed to work with package database at given time.
@@ -313,31 +261,47 @@ class YumWorker(Process):
"""
while True:
try:
- _logger().info("trying to lock database - session level %d",
+ LOG.info("trying to lock database - session level %d",
self._session_level)
self._yum_base.doLock()
- _logger().info("successfully locked up")
+ LOG.info("successfully locked up")
break
except yum.Errors.LockError as exc:
- _logger().warn("failed to lock")
+ LOG.warn("failed to lock")
if exc.errno in (errno.EPERM, errno.EACCES, errno.ENOSPC):
- _logger().error("can't create lock file")
+ LOG.error("can't create lock file")
raise errors.DatabaseLockError("Can't create lock file.")
- _logger().info("trying to lock again after %.1f seconds",
+ LOG.info("trying to lock again after %.1f seconds",
LOCK_WAIT_INTERVAL)
time.sleep(LOCK_WAIT_INTERVAL)
- @_trace_function
+ @trace_function
def _unlock_database(self):
"""
The opposite to _lock_database() method.
"""
if self._yum_base is not None:
- _logger().info("unlocking database")
+ LOG.info("unlocking database")
self._yum_base.closeRpmDB()
self._yum_base.doUnlock()
- @_trace_function
+ @trace_function
+ def _get_job(self):
+ """
+ Get job from JobManager thread.
+ If no job comes for long time, free database to save memory.
+ """
+ while True:
+ if self._session_ended and self._session_level == 0:
+ try:
+ return self._jobmgr.get_job(timeout=FREE_DATABASE_TIMEOUT)
+ except TQueue.Empty:
+ self._free_database()
+ self._session_ended = False
+ else:
+ return self._jobmgr.get_job()
+
+ @trace_function
def _transform_packages(self, packages,
cache_packages=True,
flush_cache=True):
@@ -350,7 +314,7 @@ class YumWorker(Process):
packages; makes sense only with cachee_packages=True
"""
if cache_packages is True and flush_cache is True:
- _logger().debug("flushing package cache")
+ LOG.debug("flushing package cache")
self._pkg_cache.clear()
res = []
for orig in packages:
@@ -360,7 +324,7 @@ class YumWorker(Process):
res.append(pkg)
return res
- @_trace_function
+ @trace_function
def _cache_packages(self, packages, flush_cache=True, transform=False):
"""
Store packages in cache and return them.
@@ -372,13 +336,13 @@ class YumWorker(Process):
if transform is True:
return self._transform_packages(packages, flush_cache=flush_cache)
if flush_cache is True:
- _logger().debug("flushing package cache")
+ LOG.debug("flushing package cache")
self._pkg_cache.clear()
for pkg in packages:
self._pkg_cache[id(pkg)] = pkg
return packages
- @_trace_function
+ @trace_function
def _lookup_package(self, pkg):
"""
Lookup the original package in cache.
@@ -387,13 +351,13 @@ class YumWorker(Process):
"""
if not isinstance(pkg, packageinfo.PackageInfo):
raise TypeError("pkg must be instance of PackageInfo")
- _logger().debug("looking up yum package %s with id=%d",
+ LOG.debug("looking up yum package %s with id=%d",
pkg, pkg.objid)
try:
result = self._pkg_cache[pkg.objid]
- _logger().debug("lookup successful")
+ LOG.debug("lookup successful")
except KeyError:
- _logger().warn("lookup of package %s with id=%d failed, trying"
+ LOG.warn("lookup of package %s with id=%d failed, trying"
" to query database", pkg, pkg.objid)
result = self._handle_filter_packages(
'installed' if pkg.installed else 'available',
@@ -402,13 +366,13 @@ class YumWorker(Process):
transform=False,
**pkg.key_props)
if len(result) < 1:
- _logger().warn("package %s not found", pkg)
+ LOG.warn("package %s not found", pkg)
raise errors.PackageNotFound(
"package %s could not be found" % pkg)
result = result[0]
return result
- @_trace_function
+ @trace_function
def _clear_repository_cache(self):
"""
Clears the repository cache and their configuration directory
@@ -421,7 +385,7 @@ class YumWorker(Process):
del self._yum_base.pkgSack
self._repodir_mtimes.clear()
- @_trace_function
+ @trace_function
def _check_repository_configs(self):
"""
Checks whether repository information is up to date with configuration
@@ -435,7 +399,7 @@ class YumWorker(Process):
and ( not repodir in self._repodir_mtimes
or ( os.stat(repodir).st_mtime
> self._repodir_mtimes[repodir]))):
- _logger().info("repository config dir %s changed", repodir)
+ LOG.info("repository config dir %s changed", repodir)
dirty = True
break
if not dirty:
@@ -444,18 +408,18 @@ class YumWorker(Process):
if ( not os.path.exists(filename)
or ( int(os.stat(filename).st_mtime)
> repo.repo_config_age)):
- _logger().info('config file of repository "%s" changed',
+ LOG.info('config file of repository "%s" changed',
repo.id)
dirty = True
break
if dirty is True:
- _logger().info("repository cache is dirty, cleaning up ...")
+ LOG.info("repository cache is dirty, cleaning up ...")
self._clear_repository_cache()
self._yum_base.getReposFromConfig()
if dirty is True or not self._repodir_mtimes:
self._update_repodir_mtimes()
- @_trace_function
+ @trace_function
def _update_repodir_mtimes(self):
"""
Updates the last modification times of repo configuration directories.
@@ -465,7 +429,7 @@ class YumWorker(Process):
if os.path.exists(repodir):
self._repodir_mtimes[repodir] = os.stat(repodir).st_mtime
- @_trace_function
+ @trace_function
def _do_work(self, job):
"""
Dispatcher of incoming jobs. Job is passed to the right handler
@@ -484,55 +448,86 @@ class YumWorker(Process):
jobs.YumBeginSession : self._handle_begin_session,
jobs.YumEndSession : self._handle_end_session,
jobs.YumCheckPackage : self._handle_check_package,
+ jobs.YumInstallPackageFromURI : \
+ self._handle_install_package_from_uri,
jobs.YumGetRepositoryList : \
self._handle_get_repository_list,
jobs.YumFilterRepositories : self._handle_filter_repositories,
jobs.YumSetRepositoryEnabled : \
self._handle_set_repository_enabled
}[job.__class__]
- _logger().info("processing job %s(id=%d)",
+ LOG.info("processing job %s(id=%d)",
job.__class__.__name__, job.jobid)
except KeyError:
- _logger().info("No handler for job \"%s\"", job.__class__.__name__)
+ LOG.error("No handler for job \"%s\"", job.__class__.__name__)
raise errors.UnknownJob("No handler for job \"%s\"." %
job.__class__.__name__)
return handler(**job.job_kwargs)
- @_trace_function
+ @trace_function
def _run_transaction(self, name):
"""
Builds and runs the yum transaction and checks for errors.
@param name of transaction used only in error description on failure
"""
- _logger().info("building transaction %s", name)
+ LOG.info("building transaction %s", name)
(code, msgs) = self._yum_base.buildTransaction()
if code == 1:
- _logger().error("building transaction %s failed: %s",
+ LOG.error("building transaction %s failed: %s",
name, "\n".join(msgs))
raise errors.TransactionBuildFailed(
"Failed to build \"%s\" transaction: %s" % (
name, "\n".join(msgs)))
- _logger().info("processing transaction %s", name)
+ LOG.info("processing transaction %s", name)
self._yum_base.processTransaction()
self._yum_base.closeRpmDB()
- @_trace_function
+ @trace_function
+ def _main_loop(self):
+ """
+ This is a main loop called from run(). Jobs are handled here.
+ It accepts a job from input queue, handles it,
+ sends the result to output queue and marks the job as done.
+
+ It is terminated, when None is received from input queue.
+ """
+ while True:
+ job = self._get_job()
+ if job is not None: # not a terminate command
+ result = jobs.YumJob.RESULT_SUCCESS
+ try:
+ data = self._do_work(job)
+ except Exception: #pylint: disable=W0703
+ result = jobs.YumJob.RESULT_ERROR
+ # (type, value, traceback)
+ data = sys.exc_info()
+ # traceback is not pickable - replace it with formatted
+ # text
+ data = (data[0], data[1], traceback.format_tb(data[2]))
+ LOG.exception("job %s failed", job)
+ self._jobmgr.finish_job(job, result, data)
+ if job is None:
+ LOG.info("waiting for %s to finish", self._jobmgr.name)
+ self._jobmgr.join()
+ break
+
+ @trace_function
def _handle_begin_session(self):
"""
Handler for session begin job.
"""
self._session_level += 1
- _logger().info("beginning session level %s", self._session_level)
+ LOG.info("beginning session level %s", self._session_level)
if self._session_level == 1:
self._init_database()
self._lock_database()
- @_trace_function
+ @trace_function
def _handle_end_session(self):
"""
Handler for session end job.
"""
- _logger().info("ending session level %d", self._session_level)
+ LOG.info("ending session level %d", self._session_level)
self._session_level = max(self._session_level - 1, 0)
if self._session_level == 0:
self._unlock_database()
@@ -556,7 +551,7 @@ class YumWorker(Process):
else:
what = kind
with RepoFilterSetter(self._yum_base, include_repos, exclude_repos):
- _logger().debug("calling YumBase.doPackageLists(%s, showdups=%s)",
+ LOG.debug("calling YumBase.doPackageLists(%s, showdups=%s)",
what, allow_duplicates)
pkglist = self._yum_base.doPackageLists(what,
showdups=allow_duplicates)
@@ -570,7 +565,7 @@ class YumWorker(Process):
result = getattr(pkglist, what)
if sort is True:
result.sort()
- _logger().debug("returning %s packages", len(result))
+ LOG.debug("returning %s packages", len(result))
return self._cache_packages(result, transform=transform)
@_needs_database
@@ -588,21 +583,40 @@ class YumWorker(Process):
result = [p for p in pkglist if matches(p)]
if sort is True:
result.sort()
- _logger().debug("%d packages matching", len(result))
+ LOG.debug("%d packages matching", len(result))
if transform is True:
# caching has been already done by _handle_get_package_list()
result = self._transform_packages(result, cache_packages=False)
return result
@_needs_database
- def _handle_install_package(self, pkg):
+ def _handle_install_package(self, pkg, force=False):
"""
Handler for package installation job.
@return installed package instance
"""
- pkg_desired = self._lookup_package(pkg)
- self._yum_base.install(pkg_desired)
- self._run_transaction("install")
+ if isinstance(pkg, basestring):
+ pkgs = self._handle_filter_packages(
+ 'available' if force else 'avail_notinst',
+ allow_duplicates=False, sort=True,
+ transform=False, nevra=pkg)
+ if len(pkgs) < 1:
+ raise errors.PackageNotFound('No available package matches'
+ ' nevra "%s".' % pkg)
+ elif len(pkgs) > 1:
+ LOG.warn('multiple packages matches nevra "%s": [%s]',
+ pkg, ", ".join(p.nevra for p in pkgs))
+ pkg_desired = pkgs[-1]
+ else:
+ pkg_desired = self._lookup_package(pkg)
+ if isinstance(pkg_desired, yum.rpmsack.RPMInstalledPackage):
+ if force is False:
+ raise errors.PackageAlreadyInstalled(pkg)
+ action = "reinstall"
+ else:
+ action = "install"
+ getattr(self._yum_base, action)(pkg_desired)
+ self._run_transaction(action)
installed = self._handle_filter_packages("installed", False, False,
nevra=util.pkg2nevra(pkg_desired, with_epoch="ALWAYS"))
if len(installed) < 1:
@@ -615,7 +629,16 @@ class YumWorker(Process):
"""
Handler for package removal job.
"""
- pkg = self._lookup_package(pkg)
+ if isinstance(pkg, basestring):
+ pkgs = self._handle_filter_packages('installed',
+ allow_duplicates=False, sort=False,
+ transform=False, nevra=pkg)
+ if len(pkgs) < 1:
+ raise errors.PackageNotFound('No available package matches'
+ ' nevra "%s".' % pkg)
+ pkg = pkgs[-1]
+ else:
+ pkg = self._lookup_package(pkg)
self._yum_base.remove(pkg)
self._run_transaction("remove")
@@ -625,7 +648,16 @@ class YumWorker(Process):
Handler for specific package update job.
@return package corresponding to pkg after update
"""
- pkg_desired = self._lookup_package(pkg)
+ if isinstance(pkg, basestring):
+ pkgs = self._handle_filter_packages('available',
+ allow_duplicates=False, sort=False,
+ transform=False, nevra=pkg)
+ if len(pkgs) < 1:
+ raise errors.PackageNotFound('No available package matches'
+ ' nevra "%s".' % pkg)
+ pkg_desired = pkgs[-1]
+ else:
+ pkg_desired = self._lookup_package(pkg)
self._yum_base.update(update_to=True,
name=pkg_desired.name,
epoch=pkg_desired.epoch,
@@ -641,12 +673,24 @@ class YumWorker(Process):
return installed[0]
@_needs_database
- def _handle_update_package(self, pkg, to_epoch, to_version, to_release):
+ def _handle_update_package(self, pkg, to_epoch, to_version, to_release,
+ _force=False):
"""
Handler for package update job.
@return updated package instance
"""
- pkg = self._lookup_package(pkg)
+ if isinstance(pkg, basestring):
+ pkgs = self._handle_filter_packages('installed',
+ allow_duplicates=False, sort=False,
+ transform=False, nevra=pkg)
+ if len(pkgs) < 1:
+ raise errors.PackageNotFound('No available package matches'
+ ' nevra "%s".' % pkg)
+ pkg = pkgs[-1]
+ else:
+ pkg = self._lookup_package(pkg)
+ if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ raise errors.PackageNotInstalled(pkg)
kwargs = { "name" : pkg.name, "arch" : pkg.arch }
if any(v is not None for v in (to_epoch, to_version, to_release)):
kwargs["update_to"] = True
@@ -672,12 +716,54 @@ class YumWorker(Process):
"""
@return PackageFile instance for requested package
"""
- pkg = self._lookup_package(pkg)
+ if isinstance(pkg, basestring):
+ pkgs = self._handle_filter_packages('installed',
+ allow_duplicates=False, sort=False,
+ transform=False, nevra=pkg)
+ if len(pkgs) < 1:
+ raise errors.PackageNotFound('No available package matches'
+ ' nevra "%s".' % pkg)
+ pkg = pkgs[-1]
+ else:
+ pkg = self._lookup_package(pkg)
+ if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage):
+ raise errors.PackageNotInstalled(pkg)
vpkg = yum.packages._RPMVerifyPackage(pkg, pkg.hdr.fiFromHeader(),
packagecheck.pkg_checksum_type(pkg), [], True)
return packagecheck.make_package_check_from_db(vpkg)
@_needs_database
+ def _handle_install_package_from_uri(self, uri,
+ update_only=False, force=False):
+ """
+ @return installed PackageInfo instance
+ """
+ try:
+ pkg = yum.packages.YumUrlPackage(self,
+ ts=self._yum_base.rpmdb.readOnlyTS(), url=uri,
+ ua=default_grabber)
+ except yum.Errors.MiscError as exc:
+ raise errors.PackageOpenError(uri, str(exc))
+ installed = self._handle_filter_packages("installed", False, False,
+ nevra=util.pkg2nevra(pkg, with_epoch="ALWAYS"))
+ if installed and force is False:
+ raise errors.PackageAlreadyInstalled(pkg)
+ kwargs = { 'po' : pkg }
+ if installed:
+ action = 'reinstallLocal'
+ else:
+ action = 'installLocal'
+ kwargs = { 'updateonly' : update_only }
+ getattr(self._yum_base, action)(uri, **kwargs)
+ self._run_transaction('installLocal')
+ installed = self._handle_filter_packages("installed", False, False,
+ nevra=util.pkg2nevra(pkg, with_epoch="ALWAYS"))
+ if len(installed) < 1:
+ raise errors.TransactionExecutionFailed(
+ "Failed to install desired package %s." % pkg)
+ return installed[0]
+
+ @_needs_database
def _handle_get_repository_list(self, kind, transform=True):
"""
@return list of yumdb.Repository instances
@@ -692,7 +778,7 @@ class YumWorker(Process):
repos.sort()
if transform:
repos = [repository.make_repository_from_db(r) for r in repos]
- _logger().debug("returning %d repositories from %s",
+ LOG.debug("returning %d repositories from %s",
len(repos), kind)
return repos
@@ -709,15 +795,15 @@ class YumWorker(Process):
self._yum_base.repos.getRepo(filters["repoid"]))
if ( (kind == "enabled" and not repo.enabled)
or (kind == "disabled" and repo.enabled)):
- _logger().warn(
+ LOG.warn(
'no such repository with id="%s"matching filters',
filters['repoid'])
return []
- _logger().debug(
+ LOG.debug(
"exactly one repository matching filters found")
return [repo]
except (KeyError, yum.Errors.RepoError):
- _logger().warn('repository with id="%s" could not be found',
+ LOG.warn('repository with id="%s" could not be found',
filters['repoid'])
raise errors.RepositoryNotFound(filters['repoid'])
repos = self._handle_get_repository_list(kind, transform=False)
@@ -730,7 +816,7 @@ class YumWorker(Process):
break
else: # all properties passed
result.append(repository.make_repository_from_db(repo))
- _logger().debug("found %d repositories matching", len(result))
+ LOG.debug("found %d repositories matching", len(result))
return result
@_needs_database
@@ -746,10 +832,10 @@ class YumWorker(Process):
try:
if enable ^ res:
if enable is True:
- _logger().info("enabling repository %s" % repo)
+ LOG.info("enabling repository %s" % repo)
repo.enable()
else:
- _logger().info("disabling repository %s" % repo)
+ LOG.info("disabling repository %s" % repo)
repo.disable()
try:
yum.config.writeRawRepoFile(repo, only=["enabled"])
@@ -758,7 +844,7 @@ class YumWorker(Process):
'failed to modify repository "%s": %s - %s' % (
repo, exc.__class__.__name__, str(exc)))
else:
- _logger().info("no change for repo %s", repo)
+ LOG.info("no change for repo %s", repo)
except yum.Errors.RepoError as exc:
raise errors.RepositoryChangeError(
'failed to modify repository "%s": %s' % (repo, str(exc)))
@@ -772,59 +858,32 @@ class YumWorker(Process):
"""
@return input queue for jobs
"""
- return self._queue_in
+ return self._jobmgr.queue_in
@property
def downlink(self):
"""
@return output queue for job results
"""
- return self._queue_out
+ return self._jobmgr.queue_out
# *************************************************************************
# Public methods
# *************************************************************************
def run(self):
"""
- Main loop of process. It accepts a job from input queue, handles it,
- sends the result to output queue and marks the job as done.
-
- It is terminated, when None is received from input queue.
+ Thread's entry point. After initial setup it calls _main_loop().
"""
if self._logging_config is not None:
- try:
- logging.config.dictConfig(self._logging_config)
- except Exception: #pylint: disable=W0703
- # logging is not set up but client expects us to work
- pass
- _logger().info("running as pid=%d", self.pid)
- _logger().info("starting %s main loop", self.__class__.__name__)
+ setup_logging(self._logging_config)
+ global LOG
+ LOG = logging.getLogger(__name__)
+ LOG.info("running as pid=%d", self.pid)
+ self._jobmgr.start()
+ LOG.info("started %s as thread %s",
+ self._jobmgr.name, self._jobmgr.ident)
self._pkg_cache = weakref.WeakValueDictionary()
- while True:
- if self._session_ended and self._session_level == 0:
- try:
- job = self._queue_in.get(True, FREE_DATABASE_TIMEOUT)
- except TQueue.Empty:
- self._free_database()
- self._session_ended = False
- continue
- else:
- job = self._queue_in.get()
- if job is not None: # not a terminate command
- try:
- result = self._do_work(job)
- except Exception: #pylint: disable=W0703
- # (type, value, traceback)
- result = sys.exc_info()
- # traceback is not pickable - replace it with formatted
- # text
- result = ( result[0], result[1]
- , traceback.format_tb(result[2]))
- _logger().error("job %s(id=%d) failed: %s",
- job.__class__.__name__, job.jobid,
- traceback.format_exc())
- self._queue_out.put((job.jobid, result))
- self._queue_in.task_done()
- if job is None:
- break
+
+ self._main_loop()
+ LOG.info("terminating")