diff options
Diffstat (limited to 'src/software/openlmi/software/yumdb/process.py')
-rw-r--r-- | src/software/openlmi/software/yumdb/process.py | 399 |
1 files changed, 229 insertions, 170 deletions
diff --git a/src/software/openlmi/software/yumdb/process.py b/src/software/openlmi/software/yumdb/process.py index 96c471e..e77b360 100644 --- a/src/software/openlmi/software/yumdb/process.py +++ b/src/software/openlmi/software/yumdb/process.py @@ -24,7 +24,6 @@ Module holding the code of separate process accessing the YUM API. """ import errno -import inspect from itertools import chain import logging from multiprocessing import Process @@ -33,6 +32,7 @@ import Queue as TQueue # T as threaded import sys import time import traceback +from urlgrabber.grabber import default_grabber import weakref import yum @@ -42,6 +42,8 @@ from openlmi.software.yumdb import jobs from openlmi.software.yumdb import packageinfo from openlmi.software.yumdb import packagecheck from openlmi.software.yumdb import repository +from openlmi.software.yumdb.jobmanager import JobManager +from openlmi.software.yumdb.util import trace_function, setup_logging # ***************************************************************************** # Constants @@ -50,19 +52,11 @@ from openlmi.software.yumdb import repository FREE_DATABASE_TIMEOUT = 60 LOCK_WAIT_INTERVAL = 0.5 RPMDB_PATH = '/var/lib/rpm/Packages' +LOG = None # ***************************************************************************** # Utilities # **************************************************************************** -def _logger(): - """ - Returns logger for this module, when first needed. - @return logger specific for this process - """ - if not hasattr(_logger, "logger"): - _logger.logger = logging.getLogger(__name__) - return _logger.logger - def _get_package_filter_function(filters): """ @param filters is a dictionary, where keys are package property @@ -130,14 +124,14 @@ class RepoFilterSetter(object): repos = set() if exclude: repos.update(self._yum_base.repos.disableRepo(exclude)) - _logger().info('disabling repositories: [%s]', ", ".join(repos)) + LOG.info('disabling repositories: [%s]', ", ".join(repos)) if isinstance(self._include, (list, tuple, set)): include = ",".join(self._include) else: include = self._include if include: affected = self._yum_base.repos.enableRepo(include) - _logger().info('enabling repositories: [%s]', ", ".join(affected)) + LOG.info('enabling repositories: [%s]', ", ".join(affected)) repos.update(affected) for repoid, prev_enabled in self._prev_states.items(): if ( repoid not in repos @@ -154,7 +148,7 @@ class RepoFilterSetter(object): def __exit__(self, exc_type, exc_value, exc_tb): # restore previous repository states if len(self._prev_states): - _logger().info('restoring repositories: [%s]', + LOG.info('restoring repositories: [%s]', ", ".join(self._prev_states.keys())) for repoid, enabled in self._prev_states.items(): repo = self._yum_base.repos.getRepo(repoid) @@ -169,51 +163,13 @@ class RepoFilterSetter(object): # ***************************************************************************** # Decorators # ***************************************************************************** -def _trace_function(func): - """ - Decorator for logging entries and exits of function or method. - """ - if not inspect.ismethod(func) and not inspect.isfunction(func): - raise TypeError("func must be a function") - - def _print_value(val): - """ - Used here for printing function arguments. Shortens the output - string, if that would be too long. - """ - if isinstance(val, list): - if len(val) < 2: - return str(val) - else: - return "[%s, ...]" % _print_value(val[0]) - return str(val) - - def _wrapper(self, *args, **kwargs): - """ - Wrapper for function or method, that does the logging. - """ - ftype = "method" if inspect.ismethod(func) else "function" - parent = ( func.im_class.__name__ + "." - if inspect.ismethod(func) else "") - - _logger().debug("entering %s %s%s with args=(%s)", - ftype, parent, func.__name__, - ", ".join(chain( - (_print_value(a) for a in args), - ( "%s=%s"%(k, _print_value(v)) - for k, v in kwargs.items())))) - result = func(self, *args, **kwargs) - _logger().debug("exiting %s %s%s", ftype, parent, func.__name__) - return result - - return _wrapper - def _needs_database(method): """ Decorator for YumWorker job handlers, that need to access the yum database. It ensures, that database is initialized and locks it in case, that no session is active. """ + logged = trace_function(method) def _wrapper(self, *args, **kwargs): """ Wrapper for the job handler method. @@ -222,18 +178,14 @@ def _needs_database(method): if self._session_level == 0: #pylint: disable=W0212 self._lock_database() #pylint: disable=W0212 try: - _logger().debug("calling job handler %s with args=(%s)", + LOG.debug("calling job handler %s with args=(%s)", method.__name__, ", ".join(chain( (str(a) for a in args), ("%s=%s"%(k, str(v)) for k, v in kwargs.items())))) - result = method(self, *args, **kwargs) - _logger().debug("job handler %s finished", method.__name__) + result = logged(self, *args, **kwargs) + LOG.debug("job handler %s finished", method.__name__) return result - except: - _logger().debug("job handler %s terminated with exception: %s", - method.__name__, traceback.format_exc()) - raise finally: if self._session_level == 0: #pylint: disable=W0212 self._unlock_database() #pylint: disable=W0212 @@ -248,26 +200,25 @@ class YumWorker(Process): for input jobs and second for results. Jobs are dispatched by their class names to particular handler method. + + It spawns a second thread for managing asynchronous jobs and queue + of incoming jobs. It's an instance of JobManager. """ def __init__(self, queue_in, queue_out, - yum_args=None, yum_kwargs=None, logging_config=None): Process.__init__(self, name="YumWorker") - self._queue_in = queue_in - self._queue_out = queue_out + self._jobmgr = JobManager(queue_in, queue_out) self._session_level = 0 self._session_ended = False - if yum_args is None: - yum_args = tuple() if yum_kwargs is None: yum_kwargs = {} - self._yum_args = (yum_args, yum_kwargs) + self._yum_kwargs = yum_kwargs self._yum_base = None self._pkg_cache = None @@ -279,31 +230,28 @@ class YumWorker(Process): # ************************************************************************* # Private methods # ************************************************************************* - @_trace_function + @trace_function def _init_database(self): """ Initializes yum base object, when it does no exists. And updates the cache (when out of date). """ if self._yum_base is None: - _logger().info("creating YumBase with args=(%s)", - ", ".join(chain( - (str(a) for a in self._yum_args[0]), - ( "%s=%s"%(k, str(v)) - for k, v in self._yum_args[1].items())))) - self._yum_base = yum.YumBase( - *self._yum_args[0], **self._yum_args[1]) - - @_trace_function + LOG.info("creating YumBase with kwargs=(%s)", + ", ".join(( "%s=%s"%(k, str(v)) + for k, v in self._yum_kwargs.items()))) + self._yum_base = yum.YumBase(**self._yum_kwargs) + + @trace_function def _free_database(self): """ Release the yum base object to safe memory. """ - _logger().info("freing database") + LOG.info("freing database") self._pkg_cache.clear() self._yum_base = None - @_trace_function + @trace_function def _lock_database(self): """ Only one process is allowed to work with package database at given time. @@ -313,31 +261,47 @@ class YumWorker(Process): """ while True: try: - _logger().info("trying to lock database - session level %d", + LOG.info("trying to lock database - session level %d", self._session_level) self._yum_base.doLock() - _logger().info("successfully locked up") + LOG.info("successfully locked up") break except yum.Errors.LockError as exc: - _logger().warn("failed to lock") + LOG.warn("failed to lock") if exc.errno in (errno.EPERM, errno.EACCES, errno.ENOSPC): - _logger().error("can't create lock file") + LOG.error("can't create lock file") raise errors.DatabaseLockError("Can't create lock file.") - _logger().info("trying to lock again after %.1f seconds", + LOG.info("trying to lock again after %.1f seconds", LOCK_WAIT_INTERVAL) time.sleep(LOCK_WAIT_INTERVAL) - @_trace_function + @trace_function def _unlock_database(self): """ The opposite to _lock_database() method. """ if self._yum_base is not None: - _logger().info("unlocking database") + LOG.info("unlocking database") self._yum_base.closeRpmDB() self._yum_base.doUnlock() - @_trace_function + @trace_function + def _get_job(self): + """ + Get job from JobManager thread. + If no job comes for long time, free database to save memory. + """ + while True: + if self._session_ended and self._session_level == 0: + try: + return self._jobmgr.get_job(timeout=FREE_DATABASE_TIMEOUT) + except TQueue.Empty: + self._free_database() + self._session_ended = False + else: + return self._jobmgr.get_job() + + @trace_function def _transform_packages(self, packages, cache_packages=True, flush_cache=True): @@ -350,7 +314,7 @@ class YumWorker(Process): packages; makes sense only with cachee_packages=True """ if cache_packages is True and flush_cache is True: - _logger().debug("flushing package cache") + LOG.debug("flushing package cache") self._pkg_cache.clear() res = [] for orig in packages: @@ -360,7 +324,7 @@ class YumWorker(Process): res.append(pkg) return res - @_trace_function + @trace_function def _cache_packages(self, packages, flush_cache=True, transform=False): """ Store packages in cache and return them. @@ -372,13 +336,13 @@ class YumWorker(Process): if transform is True: return self._transform_packages(packages, flush_cache=flush_cache) if flush_cache is True: - _logger().debug("flushing package cache") + LOG.debug("flushing package cache") self._pkg_cache.clear() for pkg in packages: self._pkg_cache[id(pkg)] = pkg return packages - @_trace_function + @trace_function def _lookup_package(self, pkg): """ Lookup the original package in cache. @@ -387,13 +351,13 @@ class YumWorker(Process): """ if not isinstance(pkg, packageinfo.PackageInfo): raise TypeError("pkg must be instance of PackageInfo") - _logger().debug("looking up yum package %s with id=%d", + LOG.debug("looking up yum package %s with id=%d", pkg, pkg.objid) try: result = self._pkg_cache[pkg.objid] - _logger().debug("lookup successful") + LOG.debug("lookup successful") except KeyError: - _logger().warn("lookup of package %s with id=%d failed, trying" + LOG.warn("lookup of package %s with id=%d failed, trying" " to query database", pkg, pkg.objid) result = self._handle_filter_packages( 'installed' if pkg.installed else 'available', @@ -402,13 +366,13 @@ class YumWorker(Process): transform=False, **pkg.key_props) if len(result) < 1: - _logger().warn("package %s not found", pkg) + LOG.warn("package %s not found", pkg) raise errors.PackageNotFound( "package %s could not be found" % pkg) result = result[0] return result - @_trace_function + @trace_function def _clear_repository_cache(self): """ Clears the repository cache and their configuration directory @@ -421,7 +385,7 @@ class YumWorker(Process): del self._yum_base.pkgSack self._repodir_mtimes.clear() - @_trace_function + @trace_function def _check_repository_configs(self): """ Checks whether repository information is up to date with configuration @@ -435,7 +399,7 @@ class YumWorker(Process): and ( not repodir in self._repodir_mtimes or ( os.stat(repodir).st_mtime > self._repodir_mtimes[repodir]))): - _logger().info("repository config dir %s changed", repodir) + LOG.info("repository config dir %s changed", repodir) dirty = True break if not dirty: @@ -444,18 +408,18 @@ class YumWorker(Process): if ( not os.path.exists(filename) or ( int(os.stat(filename).st_mtime) > repo.repo_config_age)): - _logger().info('config file of repository "%s" changed', + LOG.info('config file of repository "%s" changed', repo.id) dirty = True break if dirty is True: - _logger().info("repository cache is dirty, cleaning up ...") + LOG.info("repository cache is dirty, cleaning up ...") self._clear_repository_cache() self._yum_base.getReposFromConfig() if dirty is True or not self._repodir_mtimes: self._update_repodir_mtimes() - @_trace_function + @trace_function def _update_repodir_mtimes(self): """ Updates the last modification times of repo configuration directories. @@ -465,7 +429,7 @@ class YumWorker(Process): if os.path.exists(repodir): self._repodir_mtimes[repodir] = os.stat(repodir).st_mtime - @_trace_function + @trace_function def _do_work(self, job): """ Dispatcher of incoming jobs. Job is passed to the right handler @@ -484,55 +448,86 @@ class YumWorker(Process): jobs.YumBeginSession : self._handle_begin_session, jobs.YumEndSession : self._handle_end_session, jobs.YumCheckPackage : self._handle_check_package, + jobs.YumInstallPackageFromURI : \ + self._handle_install_package_from_uri, jobs.YumGetRepositoryList : \ self._handle_get_repository_list, jobs.YumFilterRepositories : self._handle_filter_repositories, jobs.YumSetRepositoryEnabled : \ self._handle_set_repository_enabled }[job.__class__] - _logger().info("processing job %s(id=%d)", + LOG.info("processing job %s(id=%d)", job.__class__.__name__, job.jobid) except KeyError: - _logger().info("No handler for job \"%s\"", job.__class__.__name__) + LOG.error("No handler for job \"%s\"", job.__class__.__name__) raise errors.UnknownJob("No handler for job \"%s\"." % job.__class__.__name__) return handler(**job.job_kwargs) - @_trace_function + @trace_function def _run_transaction(self, name): """ Builds and runs the yum transaction and checks for errors. @param name of transaction used only in error description on failure """ - _logger().info("building transaction %s", name) + LOG.info("building transaction %s", name) (code, msgs) = self._yum_base.buildTransaction() if code == 1: - _logger().error("building transaction %s failed: %s", + LOG.error("building transaction %s failed: %s", name, "\n".join(msgs)) raise errors.TransactionBuildFailed( "Failed to build \"%s\" transaction: %s" % ( name, "\n".join(msgs))) - _logger().info("processing transaction %s", name) + LOG.info("processing transaction %s", name) self._yum_base.processTransaction() self._yum_base.closeRpmDB() - @_trace_function + @trace_function + def _main_loop(self): + """ + This is a main loop called from run(). Jobs are handled here. + It accepts a job from input queue, handles it, + sends the result to output queue and marks the job as done. + + It is terminated, when None is received from input queue. + """ + while True: + job = self._get_job() + if job is not None: # not a terminate command + result = jobs.YumJob.RESULT_SUCCESS + try: + data = self._do_work(job) + except Exception: #pylint: disable=W0703 + result = jobs.YumJob.RESULT_ERROR + # (type, value, traceback) + data = sys.exc_info() + # traceback is not pickable - replace it with formatted + # text + data = (data[0], data[1], traceback.format_tb(data[2])) + LOG.exception("job %s failed", job) + self._jobmgr.finish_job(job, result, data) + if job is None: + LOG.info("waiting for %s to finish", self._jobmgr.name) + self._jobmgr.join() + break + + @trace_function def _handle_begin_session(self): """ Handler for session begin job. """ self._session_level += 1 - _logger().info("beginning session level %s", self._session_level) + LOG.info("beginning session level %s", self._session_level) if self._session_level == 1: self._init_database() self._lock_database() - @_trace_function + @trace_function def _handle_end_session(self): """ Handler for session end job. """ - _logger().info("ending session level %d", self._session_level) + LOG.info("ending session level %d", self._session_level) self._session_level = max(self._session_level - 1, 0) if self._session_level == 0: self._unlock_database() @@ -556,7 +551,7 @@ class YumWorker(Process): else: what = kind with RepoFilterSetter(self._yum_base, include_repos, exclude_repos): - _logger().debug("calling YumBase.doPackageLists(%s, showdups=%s)", + LOG.debug("calling YumBase.doPackageLists(%s, showdups=%s)", what, allow_duplicates) pkglist = self._yum_base.doPackageLists(what, showdups=allow_duplicates) @@ -570,7 +565,7 @@ class YumWorker(Process): result = getattr(pkglist, what) if sort is True: result.sort() - _logger().debug("returning %s packages", len(result)) + LOG.debug("returning %s packages", len(result)) return self._cache_packages(result, transform=transform) @_needs_database @@ -588,21 +583,40 @@ class YumWorker(Process): result = [p for p in pkglist if matches(p)] if sort is True: result.sort() - _logger().debug("%d packages matching", len(result)) + LOG.debug("%d packages matching", len(result)) if transform is True: # caching has been already done by _handle_get_package_list() result = self._transform_packages(result, cache_packages=False) return result @_needs_database - def _handle_install_package(self, pkg): + def _handle_install_package(self, pkg, force=False): """ Handler for package installation job. @return installed package instance """ - pkg_desired = self._lookup_package(pkg) - self._yum_base.install(pkg_desired) - self._run_transaction("install") + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages( + 'available' if force else 'avail_notinst', + allow_duplicates=False, sort=True, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + elif len(pkgs) > 1: + LOG.warn('multiple packages matches nevra "%s": [%s]', + pkg, ", ".join(p.nevra for p in pkgs)) + pkg_desired = pkgs[-1] + else: + pkg_desired = self._lookup_package(pkg) + if isinstance(pkg_desired, yum.rpmsack.RPMInstalledPackage): + if force is False: + raise errors.PackageAlreadyInstalled(pkg) + action = "reinstall" + else: + action = "install" + getattr(self._yum_base, action)(pkg_desired) + self._run_transaction(action) installed = self._handle_filter_packages("installed", False, False, nevra=util.pkg2nevra(pkg_desired, with_epoch="ALWAYS")) if len(installed) < 1: @@ -615,7 +629,16 @@ class YumWorker(Process): """ Handler for package removal job. """ - pkg = self._lookup_package(pkg) + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('installed', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + pkg = pkgs[-1] + else: + pkg = self._lookup_package(pkg) self._yum_base.remove(pkg) self._run_transaction("remove") @@ -625,7 +648,16 @@ class YumWorker(Process): Handler for specific package update job. @return package corresponding to pkg after update """ - pkg_desired = self._lookup_package(pkg) + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('available', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + pkg_desired = pkgs[-1] + else: + pkg_desired = self._lookup_package(pkg) self._yum_base.update(update_to=True, name=pkg_desired.name, epoch=pkg_desired.epoch, @@ -641,12 +673,24 @@ class YumWorker(Process): return installed[0] @_needs_database - def _handle_update_package(self, pkg, to_epoch, to_version, to_release): + def _handle_update_package(self, pkg, to_epoch, to_version, to_release, + _force=False): """ Handler for package update job. @return updated package instance """ - pkg = self._lookup_package(pkg) + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('installed', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + pkg = pkgs[-1] + else: + pkg = self._lookup_package(pkg) + if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage): + raise errors.PackageNotInstalled(pkg) kwargs = { "name" : pkg.name, "arch" : pkg.arch } if any(v is not None for v in (to_epoch, to_version, to_release)): kwargs["update_to"] = True @@ -672,12 +716,54 @@ class YumWorker(Process): """ @return PackageFile instance for requested package """ - pkg = self._lookup_package(pkg) + if isinstance(pkg, basestring): + pkgs = self._handle_filter_packages('installed', + allow_duplicates=False, sort=False, + transform=False, nevra=pkg) + if len(pkgs) < 1: + raise errors.PackageNotFound('No available package matches' + ' nevra "%s".' % pkg) + pkg = pkgs[-1] + else: + pkg = self._lookup_package(pkg) + if not isinstance(pkg, yum.rpmsack.RPMInstalledPackage): + raise errors.PackageNotInstalled(pkg) vpkg = yum.packages._RPMVerifyPackage(pkg, pkg.hdr.fiFromHeader(), packagecheck.pkg_checksum_type(pkg), [], True) return packagecheck.make_package_check_from_db(vpkg) @_needs_database + def _handle_install_package_from_uri(self, uri, + update_only=False, force=False): + """ + @return installed PackageInfo instance + """ + try: + pkg = yum.packages.YumUrlPackage(self, + ts=self._yum_base.rpmdb.readOnlyTS(), url=uri, + ua=default_grabber) + except yum.Errors.MiscError as exc: + raise errors.PackageOpenError(uri, str(exc)) + installed = self._handle_filter_packages("installed", False, False, + nevra=util.pkg2nevra(pkg, with_epoch="ALWAYS")) + if installed and force is False: + raise errors.PackageAlreadyInstalled(pkg) + kwargs = { 'po' : pkg } + if installed: + action = 'reinstallLocal' + else: + action = 'installLocal' + kwargs = { 'updateonly' : update_only } + getattr(self._yum_base, action)(uri, **kwargs) + self._run_transaction('installLocal') + installed = self._handle_filter_packages("installed", False, False, + nevra=util.pkg2nevra(pkg, with_epoch="ALWAYS")) + if len(installed) < 1: + raise errors.TransactionExecutionFailed( + "Failed to install desired package %s." % pkg) + return installed[0] + + @_needs_database def _handle_get_repository_list(self, kind, transform=True): """ @return list of yumdb.Repository instances @@ -692,7 +778,7 @@ class YumWorker(Process): repos.sort() if transform: repos = [repository.make_repository_from_db(r) for r in repos] - _logger().debug("returning %d repositories from %s", + LOG.debug("returning %d repositories from %s", len(repos), kind) return repos @@ -709,15 +795,15 @@ class YumWorker(Process): self._yum_base.repos.getRepo(filters["repoid"])) if ( (kind == "enabled" and not repo.enabled) or (kind == "disabled" and repo.enabled)): - _logger().warn( + LOG.warn( 'no such repository with id="%s"matching filters', filters['repoid']) return [] - _logger().debug( + LOG.debug( "exactly one repository matching filters found") return [repo] except (KeyError, yum.Errors.RepoError): - _logger().warn('repository with id="%s" could not be found', + LOG.warn('repository with id="%s" could not be found', filters['repoid']) raise errors.RepositoryNotFound(filters['repoid']) repos = self._handle_get_repository_list(kind, transform=False) @@ -730,7 +816,7 @@ class YumWorker(Process): break else: # all properties passed result.append(repository.make_repository_from_db(repo)) - _logger().debug("found %d repositories matching", len(result)) + LOG.debug("found %d repositories matching", len(result)) return result @_needs_database @@ -746,10 +832,10 @@ class YumWorker(Process): try: if enable ^ res: if enable is True: - _logger().info("enabling repository %s" % repo) + LOG.info("enabling repository %s" % repo) repo.enable() else: - _logger().info("disabling repository %s" % repo) + LOG.info("disabling repository %s" % repo) repo.disable() try: yum.config.writeRawRepoFile(repo, only=["enabled"]) @@ -758,7 +844,7 @@ class YumWorker(Process): 'failed to modify repository "%s": %s - %s' % ( repo, exc.__class__.__name__, str(exc))) else: - _logger().info("no change for repo %s", repo) + LOG.info("no change for repo %s", repo) except yum.Errors.RepoError as exc: raise errors.RepositoryChangeError( 'failed to modify repository "%s": %s' % (repo, str(exc))) @@ -772,59 +858,32 @@ class YumWorker(Process): """ @return input queue for jobs """ - return self._queue_in + return self._jobmgr.queue_in @property def downlink(self): """ @return output queue for job results """ - return self._queue_out + return self._jobmgr.queue_out # ************************************************************************* # Public methods # ************************************************************************* def run(self): """ - Main loop of process. It accepts a job from input queue, handles it, - sends the result to output queue and marks the job as done. - - It is terminated, when None is received from input queue. + Thread's entry point. After initial setup it calls _main_loop(). """ if self._logging_config is not None: - try: - logging.config.dictConfig(self._logging_config) - except Exception: #pylint: disable=W0703 - # logging is not set up but client expects us to work - pass - _logger().info("running as pid=%d", self.pid) - _logger().info("starting %s main loop", self.__class__.__name__) + setup_logging(self._logging_config) + global LOG + LOG = logging.getLogger(__name__) + LOG.info("running as pid=%d", self.pid) + self._jobmgr.start() + LOG.info("started %s as thread %s", + self._jobmgr.name, self._jobmgr.ident) self._pkg_cache = weakref.WeakValueDictionary() - while True: - if self._session_ended and self._session_level == 0: - try: - job = self._queue_in.get(True, FREE_DATABASE_TIMEOUT) - except TQueue.Empty: - self._free_database() - self._session_ended = False - continue - else: - job = self._queue_in.get() - if job is not None: # not a terminate command - try: - result = self._do_work(job) - except Exception: #pylint: disable=W0703 - # (type, value, traceback) - result = sys.exc_info() - # traceback is not pickable - replace it with formatted - # text - result = ( result[0], result[1] - , traceback.format_tb(result[2])) - _logger().error("job %s(id=%d) failed: %s", - job.__class__.__name__, job.jobid, - traceback.format_exc()) - self._queue_out.put((job.jobid, result)) - self._queue_in.task_done() - if job is None: - break + + self._main_loop() + LOG.info("terminating") |