diff options
author | Michal Minar <miminar@redhat.com> | 2013-12-12 13:10:36 +0100 |
---|---|---|
committer | Michal Minar <miminar@redhat.com> | 2013-12-12 15:04:10 +0100 |
commit | 2b22e2975cd12852ee9931e472cc85e7b42e65a0 (patch) | |
tree | fddd67cab2348bef14bee8d2fa693c7e89565ea1 /src/software/lmi/software/yumdb/process.py | |
parent | 0ff1628666bc2ad52da91297c55a59d10b077e4e (diff) | |
download | openlmi-providers-2b22e2975cd12852ee9931e472cc85e7b42e65a0.tar.gz openlmi-providers-2b22e2975cd12852ee9931e472cc85e7b42e65a0.tar.xz openlmi-providers-2b22e2975cd12852ee9931e472cc85e7b42e65a0.zip |
software: redone job handling
There was a serious flaw in previous object model. JobManager was a
thread spawned from inside of separated YumWorker process. Meanwhile
IndicationManager was spawned in provider process which is correct
otherwise it could not send indications through broker.
The problem is that JobManager needs to create indications and access
IndicationManager. But they were in different processes.
JobManager worked with static data duplicated from provider process when
the worker process has been forked. Therefore all subscriptions and
indication enablements made after the worker process had been
created did not affect the JobManager.
For some unknown reasons this could also cause a segfault to worker
process when creating indications that were sent to provider process.
This patch shuffles classes a bit:
* JobManager is spawned as a thread in provider process by YumDB.
* JobManager spawns its own SessionManager that is a wrapper for
YumWorker process
* SessionManager is a thread running in provider's process. It
manages worker process and ensures that yum database is locked when
there is an active session.
* YumWorker does not spawn any other process. It processes jobs one by
one.
Resolves: #1039018
Diffstat (limited to 'src/software/lmi/software/yumdb/process.py')
-rw-r--r-- | src/software/lmi/software/yumdb/process.py | 143 |
1 files changed, 72 insertions, 71 deletions
diff --git a/src/software/lmi/software/yumdb/process.py b/src/software/lmi/software/yumdb/process.py index 9069f72..ba429ac 100644 --- a/src/software/lmi/software/yumdb/process.py +++ b/src/software/lmi/software/yumdb/process.py @@ -44,13 +44,16 @@ from lmi.software.yumdb import jobs from lmi.software.yumdb import packageinfo from lmi.software.yumdb import packagecheck from lmi.software.yumdb import repository -from lmi.software.yumdb.jobmanager import JobManager from lmi.software.yumdb.util import is_pkg_installed from lmi.software.yumdb.util import setup_logging # Global variable which gets its value after the start of YumWorker process. LOG = None +class ShutDownWorker(Exception): + """ Raised when shut down job is received. """ + pass + # ***************************************************************************** # Utilities # **************************************************************************** @@ -185,19 +188,17 @@ def _needs_database(method): Wrapper for the job handler method. """ created_session = False - self._init_database() #pylint: disable=W0212 - if self._session_level == 0: #pylint: disable=W0212 - self._session_level = 1 #pylint: disable=W0212 + self._init_database() + if self._state != YumWorker.STATE_LOCKED: created_session = True - self._lock_database() #pylint: disable=W0212 + self._lock_database() self._check_repository_configs() try: result = logged(self, *args, **kwargs) return result finally: - if created_session is True: #pylint: disable=W0212 - self._session_level = 0 #pylint: disable=W0212 - self._unlock_database() #pylint: disable=W0212 + if created_session is True: + self._unlock_database() return _wrapper # ***************************************************************************** @@ -210,19 +211,29 @@ class YumWorker(Process): Jobs are dispatched by their class names to particular handler method. - It spawns a second thread for managing asynchronous jobs and queue - of incoming jobs. It's an instance of JobManager. 
+ It can process just one job at once. It has following states: + + ``STATE_IDLE`` + Initial state where no information is cached. Next state + is ``STATE_LOCKED``. + ``STATE_LOCKED`` + Database is locked, and :py:class:`yum.YumBase` is initialied. + Next state is ``STATE_UNLOCKED``. + ``STATE_UNLOCKED`` + Database is unlocked. Yum objects are cached. Next state is + any of above ones. """ + STATE_IDLE, STATE_LOCKED, STATE_UNLOCKED = range(3) + def __init__(self, queue_in, queue_out, - indication_manager, yum_kwargs=None): Process.__init__(self, name="YumWorker") - self._jobmgr = JobManager(queue_in, queue_out, indication_manager) - self._session_level = 0 - self._session_ended = False + self._queue_in = queue_in + self._queue_out = queue_out + self._state = YumWorker.STATE_IDLE if yum_kwargs is None: yum_kwargs = {} @@ -258,6 +269,7 @@ class YumWorker(Process): LOG.info("freing database") self._pkg_cache.clear() self._yum_base = None + self._state = YumWorker.STATE_IDLE @cmpi_logging.trace_method def _lock_database(self): @@ -267,11 +279,12 @@ class YumWorker(Process): Try to lock it in loop, until success. """ + assert self._state != YumWorker.STATE_LOCKED while True: try: - LOG.info("trying to lock database - session level %d", - self._session_level) + LOG.info("trying to lock database") self._yum_base.doLock() + self._state = YumWorker.STATE_LOCKED LOG.info("successfully locked up") break except yum.Errors.LockError as exc: @@ -294,24 +307,24 @@ class YumWorker(Process): self._yum_base.closeRpmDB() del self._yum_base.history self._yum_base.doUnlock() + self._state = YumWorker.STATE_UNLOCKED @cmpi_logging.trace_method def _get_job(self): """ - Get job from JobManager thread. - If no job comes for long time, free database to save memory. + Wait for new job to process. If no job comes for a long time, free + database to save memory. 
""" while True: - if self._session_ended and self._session_level == 0: + if self._state == YumWorker.STATE_UNLOCKED: try: timeout = util.Configuration.get_instance().get_safe( 'Yum', 'FreeDatabaseTimeout', float) - return self._jobmgr.get_job(timeout=timeout) + return self._queue_in.get(timeout=timeout) except TQueue.Empty: self._free_database() - self._session_ended = False else: - return self._jobmgr.get_job() + return self._queue_in.get() @cmpi_logging.trace_method def _transform_packages(self, packages, @@ -457,8 +470,8 @@ class YumWorker(Process): jobs.YumRemovePackage : self._handle_remove_package, jobs.YumUpdateToPackage : self._handle_update_to_package, jobs.YumUpdatePackage : self._handle_update_package, - jobs.YumBeginSession : self._handle_begin_session, - jobs.YumEndSession : self._handle_end_session, + jobs.YumLock : self._handle_lock, + jobs.YumUnlock : self._handle_unlock, jobs.YumCheckPackage : self._handle_check_package, jobs.YumCheckPackageFile : self._handle_check_package_file, jobs.YumInstallPackageFromURI : \ @@ -472,6 +485,8 @@ class YumWorker(Process): LOG.info("processing job %s(id=%d)", job.__class__.__name__, job.jobid) except KeyError: + if isinstance(job, jobs.YumShutDown): + raise ShutDownWorker() LOG.error("No handler for job \"%s\"", job.__class__.__name__) raise errors.UnknownJob("No handler for job \"%s\"." % job.__class__.__name__) @@ -499,56 +514,50 @@ class YumWorker(Process): def _main_loop(self): """ This is a main loop called from run(). Jobs are handled here. - It accepts a job from input queue, handles it, - sends the result to output queue and marks the job as done. + It accepts a job from input queue, handles it and + sends the result to output queue. - It is terminated, when None is received from input queue. + It is terminated, when :py:class:`~lmi.software.yumdb.jobs.YumShutDown` + object is received. 
""" while True: job = self._get_job() - if job is not None: # not a terminate command - result = jobs.YumJob.RESULT_SUCCESS - try: - data = self._do_work(job) - except Exception: #pylint: disable=W0703 - result = jobs.YumJob.RESULT_ERROR - # (type, value, traceback) - data = sys.exc_info() - # traceback is not pickable - replace it with formatted - # text - data = (data[0], data[1], traceback.format_tb(data[2])) - LOG.exception("job %s failed", job) - self._jobmgr.finish_job(job, result, data) - if job is None: - self._session_level = 0 - self._session_ended = False - self._unlock_database() - LOG.info("waiting for %s to finish", self._jobmgr.name) - self._jobmgr.join() + result = jobs.YumJob.RESULT_SUCCESS + try: + data = self._do_work(job) + except ShutDownWorker: break + except Exception: #pylint: disable=W0703 + result = jobs.YumJob.RESULT_ERROR + # (type, value, traceback) + data = sys.exc_info() + # traceback is not pickable - replace it with formatted + # text + data = (data[0], data[1], traceback.format_tb(data[2])) + LOG.exception("job %s failed", job) + self._queue_out.put((job.jobid, result, data)) + self._unlock_database() + self._free_database() @cmpi_logging.trace_method - def _handle_begin_session(self): - """ - Handler for session begin job. - """ - self._session_level += 1 - LOG.info("beginning session level %s", self._session_level) - if self._session_level == 1: + def _handle_lock(self): + """ Handle lock request. """ + if self._state != YumWorker.STATE_LOCKED: self._init_database() self._lock_database() self._check_repository_configs() + else: + LOG.warn('requested lock while database is already locked') @cmpi_logging.trace_method - def _handle_end_session(self): + def _handle_unlock(self): """ - Handler for session end job. + Handler for unlock job. 
""" - LOG.info("ending session level %d", self._session_level) - self._session_level = max(self._session_level - 1, 0) - if self._session_level == 0: + if self._state == YumWorker.STATE_LOCKED: self._unlock_database() - self._session_ended = True + else: + LOG.trace_warn('requested unlock on not locked database') @_needs_database def _handle_get_package_list(self, kind, allow_duplicates, sort, @@ -871,10 +880,10 @@ class YumWorker(Process): try: if enable ^ res: if enable is True: - LOG.info("enabling repository %s" % repo) + LOG.info("enabling repository %s", repo) repo.enable() else: - LOG.info("disabling repository %s" % repo) + LOG.info("disabling repository %s", repo) repo.disable() try: yum.config.writeRawRepoFile(repo, only=["enabled"]) @@ -897,14 +906,14 @@ class YumWorker(Process): """ @return input queue for jobs """ - return self._jobmgr.queue_in + return self._queue_in @property def downlink(self): """ @return output queue for job results """ - return self._jobmgr.queue_out + return self._queue_out # ************************************************************************* # Public methods @@ -917,16 +926,8 @@ class YumWorker(Process): global LOG LOG = logging.getLogger(__name__) LOG.info("running as pid=%d", self.pid) - self._jobmgr.start() - LOG.info("started %s as thread %s", - self._jobmgr.name, self._jobmgr.ident) self._pkg_cache = weakref.WeakValueDictionary() - # This allows the code, that can be run both from broker and - # YumWorker, to check, whether it's called by this process. - from lmi.software.yumdb import YumDB - YumDB.RUNNING_UNDER_CIMOM_PROCESS = False - self._main_loop() LOG.info("terminating") |