summaryrefslogtreecommitdiffstats
path: root/src/software/lmi/software/yumdb/process.py
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2013-12-12 13:10:36 +0100
committerMichal Minar <miminar@redhat.com>2013-12-12 15:04:10 +0100
commit2b22e2975cd12852ee9931e472cc85e7b42e65a0 (patch)
treefddd67cab2348bef14bee8d2fa693c7e89565ea1 /src/software/lmi/software/yumdb/process.py
parent0ff1628666bc2ad52da91297c55a59d10b077e4e (diff)
downloadopenlmi-providers-2b22e2975cd12852ee9931e472cc85e7b42e65a0.tar.gz
openlmi-providers-2b22e2975cd12852ee9931e472cc85e7b42e65a0.tar.xz
openlmi-providers-2b22e2975cd12852ee9931e472cc85e7b42e65a0.zip
software: redone job handling
There was a serious flaw in previous object model. JobManager was a thread spawned from inside of separated YumWorker process. Meanwhile IndicationManager was spawned in provider process which is correct otherwise it coult not send indications through broker. The problem is that JobManager needs to create indications and access IndicationManager. But they were in different processes. JobManager worked with static data duplicated from provider process when the worker process has been forked. Therefor all subscriptions and indication enablement made after the worker processed has been created did not affect jobmanager. For some unknown reasons this could also cause a segfault to worker process when creating indications that were sent to provider process. This patch shuffles classes a bit: * JobManager is spawned as a thread in provider process by YumDB. * JObManager spawns its own SessionManager that is a wrapper for YumWorker process * SessionManager is a thread running in provider's process. It manages worker process and ensures that yum database is locked when there is an active session. * YumWorker does not spawn any other process. It processes jobs one by one. Resolves: #1039018
Diffstat (limited to 'src/software/lmi/software/yumdb/process.py')
-rw-r--r--src/software/lmi/software/yumdb/process.py143
1 files changed, 72 insertions, 71 deletions
diff --git a/src/software/lmi/software/yumdb/process.py b/src/software/lmi/software/yumdb/process.py
index 9069f72..ba429ac 100644
--- a/src/software/lmi/software/yumdb/process.py
+++ b/src/software/lmi/software/yumdb/process.py
@@ -44,13 +44,16 @@ from lmi.software.yumdb import jobs
from lmi.software.yumdb import packageinfo
from lmi.software.yumdb import packagecheck
from lmi.software.yumdb import repository
-from lmi.software.yumdb.jobmanager import JobManager
from lmi.software.yumdb.util import is_pkg_installed
from lmi.software.yumdb.util import setup_logging
# Global variable which gets its value after the start of YumWorker process.
LOG = None
+class ShutDownWorker(Exception):
+ """ Raised when shut down job is received. """
+ pass
+
# *****************************************************************************
# Utilities
# ****************************************************************************
@@ -185,19 +188,17 @@ def _needs_database(method):
Wrapper for the job handler method.
"""
created_session = False
- self._init_database() #pylint: disable=W0212
- if self._session_level == 0: #pylint: disable=W0212
- self._session_level = 1 #pylint: disable=W0212
+ self._init_database()
+ if self._state != YumWorker.STATE_LOCKED:
created_session = True
- self._lock_database() #pylint: disable=W0212
+ self._lock_database()
self._check_repository_configs()
try:
result = logged(self, *args, **kwargs)
return result
finally:
- if created_session is True: #pylint: disable=W0212
- self._session_level = 0 #pylint: disable=W0212
- self._unlock_database() #pylint: disable=W0212
+ if created_session is True:
+ self._unlock_database()
return _wrapper
# *****************************************************************************
@@ -210,19 +211,29 @@ class YumWorker(Process):
Jobs are dispatched by their class names to particular handler method.
- It spawns a second thread for managing asynchronous jobs and queue
- of incoming jobs. It's an instance of JobManager.
+ It can process just one job at once. It has following states:
+
+ ``STATE_IDLE``
+ Initial state where no information is cached. Next state
+ is ``STATE_LOCKED``.
+ ``STATE_LOCKED``
+ Database is locked, and :py:class:`yum.YumBase` is initialied.
+ Next state is ``STATE_UNLOCKED``.
+ ``STATE_UNLOCKED``
+ Database is unlocked. Yum objects are cached. Next state is
+ any of above ones.
"""
+ STATE_IDLE, STATE_LOCKED, STATE_UNLOCKED = range(3)
+
def __init__(self,
queue_in,
queue_out,
- indication_manager,
yum_kwargs=None):
Process.__init__(self, name="YumWorker")
- self._jobmgr = JobManager(queue_in, queue_out, indication_manager)
- self._session_level = 0
- self._session_ended = False
+ self._queue_in = queue_in
+ self._queue_out = queue_out
+ self._state = YumWorker.STATE_IDLE
if yum_kwargs is None:
yum_kwargs = {}
@@ -258,6 +269,7 @@ class YumWorker(Process):
LOG.info("freing database")
self._pkg_cache.clear()
self._yum_base = None
+ self._state = YumWorker.STATE_IDLE
@cmpi_logging.trace_method
def _lock_database(self):
@@ -267,11 +279,12 @@ class YumWorker(Process):
Try to lock it in loop, until success.
"""
+ assert self._state != YumWorker.STATE_LOCKED
while True:
try:
- LOG.info("trying to lock database - session level %d",
- self._session_level)
+ LOG.info("trying to lock database")
self._yum_base.doLock()
+ self._state = YumWorker.STATE_LOCKED
LOG.info("successfully locked up")
break
except yum.Errors.LockError as exc:
@@ -294,24 +307,24 @@ class YumWorker(Process):
self._yum_base.closeRpmDB()
del self._yum_base.history
self._yum_base.doUnlock()
+ self._state = YumWorker.STATE_UNLOCKED
@cmpi_logging.trace_method
def _get_job(self):
"""
- Get job from JobManager thread.
- If no job comes for long time, free database to save memory.
+ Wait for new job to process. If no job comes for a long time, free
+ database to save memory.
"""
while True:
- if self._session_ended and self._session_level == 0:
+ if self._state == YumWorker.STATE_UNLOCKED:
try:
timeout = util.Configuration.get_instance().get_safe(
'Yum', 'FreeDatabaseTimeout', float)
- return self._jobmgr.get_job(timeout=timeout)
+ return self._queue_in.get(timeout=timeout)
except TQueue.Empty:
self._free_database()
- self._session_ended = False
else:
- return self._jobmgr.get_job()
+ return self._queue_in.get()
@cmpi_logging.trace_method
def _transform_packages(self, packages,
@@ -457,8 +470,8 @@ class YumWorker(Process):
jobs.YumRemovePackage : self._handle_remove_package,
jobs.YumUpdateToPackage : self._handle_update_to_package,
jobs.YumUpdatePackage : self._handle_update_package,
- jobs.YumBeginSession : self._handle_begin_session,
- jobs.YumEndSession : self._handle_end_session,
+ jobs.YumLock : self._handle_lock,
+ jobs.YumUnlock : self._handle_unlock,
jobs.YumCheckPackage : self._handle_check_package,
jobs.YumCheckPackageFile : self._handle_check_package_file,
jobs.YumInstallPackageFromURI : \
@@ -472,6 +485,8 @@ class YumWorker(Process):
LOG.info("processing job %s(id=%d)",
job.__class__.__name__, job.jobid)
except KeyError:
+ if isinstance(job, jobs.YumShutDown):
+ raise ShutDownWorker()
LOG.error("No handler for job \"%s\"", job.__class__.__name__)
raise errors.UnknownJob("No handler for job \"%s\"." %
job.__class__.__name__)
@@ -499,56 +514,50 @@ class YumWorker(Process):
def _main_loop(self):
"""
This is a main loop called from run(). Jobs are handled here.
- It accepts a job from input queue, handles it,
- sends the result to output queue and marks the job as done.
+ It accepts a job from input queue, handles it and
+ sends the result to output queue.
- It is terminated, when None is received from input queue.
+ It is terminated, when :py:class:`~lmi.software.yumdb.jobs.YumShutDown`
+ object is received.
"""
while True:
job = self._get_job()
- if job is not None: # not a terminate command
- result = jobs.YumJob.RESULT_SUCCESS
- try:
- data = self._do_work(job)
- except Exception: #pylint: disable=W0703
- result = jobs.YumJob.RESULT_ERROR
- # (type, value, traceback)
- data = sys.exc_info()
- # traceback is not pickable - replace it with formatted
- # text
- data = (data[0], data[1], traceback.format_tb(data[2]))
- LOG.exception("job %s failed", job)
- self._jobmgr.finish_job(job, result, data)
- if job is None:
- self._session_level = 0
- self._session_ended = False
- self._unlock_database()
- LOG.info("waiting for %s to finish", self._jobmgr.name)
- self._jobmgr.join()
+ result = jobs.YumJob.RESULT_SUCCESS
+ try:
+ data = self._do_work(job)
+ except ShutDownWorker:
break
+ except Exception: #pylint: disable=W0703
+ result = jobs.YumJob.RESULT_ERROR
+ # (type, value, traceback)
+ data = sys.exc_info()
+ # traceback is not pickable - replace it with formatted
+ # text
+ data = (data[0], data[1], traceback.format_tb(data[2]))
+ LOG.exception("job %s failed", job)
+ self._queue_out.put((job.jobid, result, data))
+ self._unlock_database()
+ self._free_database()
@cmpi_logging.trace_method
- def _handle_begin_session(self):
- """
- Handler for session begin job.
- """
- self._session_level += 1
- LOG.info("beginning session level %s", self._session_level)
- if self._session_level == 1:
+ def _handle_lock(self):
+ """ Handle lock request. """
+ if self._state != YumWorker.STATE_LOCKED:
self._init_database()
self._lock_database()
self._check_repository_configs()
+ else:
+ LOG.warn('requested lock while database is already locked')
@cmpi_logging.trace_method
- def _handle_end_session(self):
+ def _handle_unlock(self):
"""
- Handler for session end job.
+ Handler for unlock job.
"""
- LOG.info("ending session level %d", self._session_level)
- self._session_level = max(self._session_level - 1, 0)
- if self._session_level == 0:
+ if self._state == YumWorker.STATE_LOCKED:
self._unlock_database()
- self._session_ended = True
+ else:
+ LOG.trace_warn('requested unlock on not locked database')
@_needs_database
def _handle_get_package_list(self, kind, allow_duplicates, sort,
@@ -871,10 +880,10 @@ class YumWorker(Process):
try:
if enable ^ res:
if enable is True:
- LOG.info("enabling repository %s" % repo)
+ LOG.info("enabling repository %s", repo)
repo.enable()
else:
- LOG.info("disabling repository %s" % repo)
+ LOG.info("disabling repository %s", repo)
repo.disable()
try:
yum.config.writeRawRepoFile(repo, only=["enabled"])
@@ -897,14 +906,14 @@ class YumWorker(Process):
"""
@return input queue for jobs
"""
- return self._jobmgr.queue_in
+ return self._queue_in
@property
def downlink(self):
"""
@return output queue for job results
"""
- return self._jobmgr.queue_out
+ return self._queue_out
# *************************************************************************
# Public methods
@@ -917,16 +926,8 @@ class YumWorker(Process):
global LOG
LOG = logging.getLogger(__name__)
LOG.info("running as pid=%d", self.pid)
- self._jobmgr.start()
- LOG.info("started %s as thread %s",
- self._jobmgr.name, self._jobmgr.ident)
self._pkg_cache = weakref.WeakValueDictionary()
- # This allows the code, that can be run both from broker and
- # YumWorker, to check, whether it's called by this process.
- from lmi.software.yumdb import YumDB
- YumDB.RUNNING_UNDER_CIMOM_PROCESS = False
-
self._main_loop()
LOG.info("terminating")