summaryrefslogtreecommitdiffstats
path: root/src/software
diff options
context:
space:
mode:
authorMichal Minar <miminar@redhat.com>2013-12-12 13:10:36 +0100
committerMichal Minar <miminar@redhat.com>2013-12-12 15:04:10 +0100
commit2b22e2975cd12852ee9931e472cc85e7b42e65a0 (patch)
treefddd67cab2348bef14bee8d2fa693c7e89565ea1 /src/software
parent0ff1628666bc2ad52da91297c55a59d10b077e4e (diff)
downloadopenlmi-providers-2b22e2975cd12852ee9931e472cc85e7b42e65a0.tar.gz
openlmi-providers-2b22e2975cd12852ee9931e472cc85e7b42e65a0.tar.xz
openlmi-providers-2b22e2975cd12852ee9931e472cc85e7b42e65a0.zip
software: redone job handling
There was a serious flaw in previous object model. JobManager was a thread spawned from inside of separated YumWorker process. Meanwhile IndicationManager was spawned in provider process which is correct otherwise it coult not send indications through broker. The problem is that JobManager needs to create indications and access IndicationManager. But they were in different processes. JobManager worked with static data duplicated from provider process when the worker process has been forked. Therefor all subscriptions and indication enablement made after the worker processed has been created did not affect jobmanager. For some unknown reasons this could also cause a segfault to worker process when creating indications that were sent to provider process. This patch shuffles classes a bit: * JobManager is spawned as a thread in provider process by YumDB. * JObManager spawns its own SessionManager that is a wrapper for YumWorker process * SessionManager is a thread running in provider's process. It manages worker process and ensures that yum database is locked when there is an active session. * YumWorker does not spawn any other process. It processes jobs one by one. Resolves: #1039018
Diffstat (limited to 'src/software')
-rw-r--r--src/software/lmi/software/core/Job.py7
-rw-r--r--src/software/lmi/software/yumdb/__init__.py288
-rw-r--r--src/software/lmi/software/yumdb/jobmanager.py485
-rw-r--r--src/software/lmi/software/yumdb/jobs.py48
-rw-r--r--src/software/lmi/software/yumdb/process.py143
5 files changed, 445 insertions, 526 deletions
diff --git a/src/software/lmi/software/core/Job.py b/src/software/lmi/software/core/Job.py
index 7c75778..d7667e6 100644
--- a/src/software/lmi/software/core/Job.py
+++ b/src/software/lmi/software/core/Job.py
@@ -443,13 +443,6 @@ def get_verification_out_params(job):
}
# update local instance
job.update(metadata=metadata)
- if YumDB.RUNNING_UNDER_CIMOM_PROCESS:
- # update instance on server
- YumDB.get_instance().update_job(job.jobid, metadata=metadata)
- # else: we are called from YumWorker process;
- # (update on server already occured)
- # moreover YumWorker blocks on us - we can not wait for another job
- # to finish
return job.metadata.get('output_params', [])
@cmpi_logging.trace_function
diff --git a/src/software/lmi/software/yumdb/__init__.py b/src/software/lmi/software/yumdb/__init__.py
index e27278a..bb7d578 100644
--- a/src/software/lmi/software/yumdb/__init__.py
+++ b/src/software/lmi/software/yumdb/__init__.py
@@ -32,65 +32,24 @@ YumDB is a context manager supposed to be used by any provider as the
only accessor to yum api.
"""
-import errno
from functools import wraps
import inspect
-import os
-import re
-import time
-from multiprocessing import Process, Queue #pylint: disable=W0404
-from pywbem.cim_provider2 import CIMProvider2
-import Queue as TQueue # T as threaded
import threading
-import yum
from lmi.base import singletonmixin
from lmi.providers import cmpi_logging
-from lmi.providers.IndicationManager import IndicationManager
-from lmi.software.util import Configuration
-from lmi.software.util import get_signal_name
from lmi.software.yumdb import jobs
+from lmi.software.yumdb.jobmanager import JobManager
from lmi.software.yumdb.packagecheck import PackageCheck
from lmi.software.yumdb.packagecheck import PackageFile
from lmi.software.yumdb.packageinfo import PackageInfo
-from lmi.software.yumdb.process import YumWorker
from lmi.software.yumdb.repository import Repository
-from lmi.software.yumdb import errors
LOG = cmpi_logging.get_logger(__name__)
-#: Number of times the worker process will be resurrected for a completion of
-#: single job. If the process dies afterwards (while still doing the same job)
-#: an exception will be raised.
-MAX_RESURRECTIONS = 1
-
# *****************************************************************************
# Utilities
# *****************************************************************************
-def log_reply_error(job, reply):
- """
- Raises an exception in case of error occured in worker process
- while processing job.
- """
- if isinstance(reply, (int, long)):
- # asynchronous job
- return
- if not isinstance(reply, jobs.YumJob):
- raise TypeError('expected instance of jobs.YumJob for reply, not "%s"'
- % reply.__class__.__name__)
- if reply.result == jobs.YumJob.RESULT_ERROR:
- LOG().error("%s failed with error %s: %s",
- job, reply.result_data[0].__name__, str(reply.result_data[1]))
- LOG().trace_warn("%s exception traceback:\n%s%s: %s",
- job, "".join(reply.result_data[2]),
- reply.result_data[0].__name__, str(reply.result_data[1]))
- reply.result_data[1].tb_printed = True
- raise reply.result_data[1]
- elif reply.result == jobs.YumJob.RESULT_TERMINATED:
- LOG().warn('%s terminated', job)
- else:
- LOG().debug('%s completed with success', job)
-
def _make_async_job(jobcls, *args, **kwargs):
"""Creates asynchronous job, filling it wih some metadata."""
if not issubclass(jobcls, jobs.YumAsyncJob):
@@ -171,186 +130,25 @@ class YumDB(singletonmixin.Singleton):
# this is to inform Singleton, that __init__ should be called only once
ignoreSubsequent = True
- # This serves to all code base as a global variable used to check,
- # whether YumDB instance is running under cimom broker or under worker
- # process. This is important for code used in callback functions passed
- # to worker responsible for creating instances of ConcreteJob. This code
- # must avoid using calls to YumDB while running under worker. This
- #
- # Worker process must set this to False before starting its event handling
- # loop.
- RUNNING_UNDER_CIMOM_PROCESS = True
-
@cmpi_logging.trace_method
- def __init__(self, **kwargs): #pylint: disable=W0231
+ def __init__(self): #pylint: disable=W0231
"""
All arguments are passed to yum.YumBase constructor.
"""
- #: Separated process accessing yum's api and handling jobs. Initialized
- #: when first needed. If this is ``False``, clean up request has been
- #: received. If ``None``, process has not been started yet or it was
- #: killed.
- self._process = None
- if kwargs is None:
- kwargs = {}
- self._yum_kwargs = kwargs
-
- self._session_lock = threading.RLock()
- self._session_level = 0
-
- # used to guard access to _expected list and _process
- self._reply_lock = threading.Lock()
- # used to wait for job to be processed and received
- self._reply_cond = threading.Condition(self._reply_lock)
- # ids of all expected jobs -- those to be processed by YumWorker
- self._expected = []
- # {job_id : reply, ... }
- self._replies = {}
- # {job_id : number_of_resurrections}
- self._resurrections = {}
+ self._access_lock = threading.RLock()
+ self._jobmgr = None
LOG().trace_info('YumDB initialized')
# *************************************************************************
# Private methods
# *************************************************************************
- @cmpi_logging.trace_method
- def _handle_reply_timeout(self, job):
- """
- This is called when timeout occurs while waiting on downlink queue for
- reply. Delay can be caused by worker process's early termination (bug).
- This handler tries to recover from such an situation.
- """
- if not self._worker.is_alive():
- if self._worker.exitcode < 0:
- msg = "worker process(pid=%d) killed by signal %s" % (
- self._worker.pid, get_signal_name(-self._process.exitcode))
- else:
- msg = "worker process(pid=%d) is dead - exit code: %d" % (
- self._process.pid, self._worker.exitcode)
- LOG().error("[jobid=%d] %s", job.jobid, msg)
- with self._reply_lock:
- if not job.jobid in self._resurrections:
- self._resurrections[job.jobid] = 0
- self._resurrections[job.jobid] += 1
- self._process = None
- LOG().error("[jobid=%d] starting new worker process", job.jobid)
- self._expected = []
- if not isinstance(job, jobs.YumBeginSession):
- with self._session_lock:
- if self._session_level > 0:
- LOG().info('restoring session level=%d',
- self._session_level)
- new_session_job = jobs.YumBeginSession()
- self._worker.uplink.put(new_session_job)
- reply = self._worker.downlink.get()
- log_reply_error(new_session_job, reply)
- if self._resurrections[job.jobid] > MAX_RESURRECTIONS:
- LOG().warn("[jobid=%d] process has been resurrected maximum"
- " number of times (%d times), cancelling job", job.jobid,
- MAX_RESURRECTIONS)
- self._reply_cond.notifyAll()
- raise errors.TransactionError(
- "failed to complete job: %s" % (msg))
- else:
- self._worker.uplink.put(job)
- self._expected.append(job.jobid)
- # other waiting processes need to resend their requests
- self._reply_cond.notifyAll()
- else:
- LOG().info("[jobid=%d] process is running, waiting some more",
- job.jobid)
-
- @cmpi_logging.trace_method
- def _receive_reply(self, job):
- """
- Block on downlink queue to receive expected replies from worker
- process. Only one thread can be executing this code at any time.
-
- Only one thread can block on downlink channel to obtain reply. If
- it's reply for him, he takes it and leaves, otherwise he adds it to
- _replies dictionary and notifies other threads. This thread is the
- one, whose job appears as first in _expected list.
-
- In case, that worker process terminated due to some error. Restart it
- and resend all the job requests again.
- """
- timeout = Configuration.get_instance().get_safe(
- 'Jobs', 'WaitCompleteTimeout', float)
- while self._process is not False: # process terminated
- LOG().debug("[jobid=%d] blocking on downlink queue", job.jobid)
- try:
- jobout = self._worker.downlink.get(block=True, timeout=timeout)
- if jobout.jobid == job.jobid:
- LOG().debug("[jobid=%d] received desired reply", job.jobid)
- with self._reply_lock:
- self._expected.remove(job.jobid)
- self._resurrections.pop(job.jobid, None)
- self._reply_cond.notifyAll()
- return jobout
- else:
- LOG().info("[jobid=%d] received reply for another thread"
- " (jobid=%d)", job.jobid, jobout.jobid)
- with self._reply_lock:
- self._replies[jobout.jobid] = jobout
- self._reply_cond.notifyAll()
- except TQueue.Empty:
- LOG().warn("[jobid=%d] wait for job reply timeout"
- "(%d seconds) occured", job.jobid, timeout)
- self._handle_reply_timeout(job)
- raise errors.TerminatingError("can not complete job %s,"
- " terminating ..." % job)
-
- @cmpi_logging.trace_method
- def _send_and_receive(self, job):
- """
- Sends a request to server and blocks until job is processed by
- YumWorker and reply is received.
-
- Only one thread can block on downlink channel to obtain reply. This
- thread is the one, whose job appears as first in _expected list. Server
- processes input jobs sequentially. That's why it's safe to presume,
- that jobs are received in the same order as they were send. Thanks to
- that we don't have to care about receiving replies for the other
- waiting threads.
-
- @return result of job
- """
- with self._reply_lock:
- self._worker.uplink.put(job)
- if getattr(job, 'async', False) is True:
- return job.jobid
- self._expected.append(job.jobid)
- while True:
- if job.jobid in self._replies:
- LOG().debug("[jobid=%d] desired reply already received",
- job.jobid)
- try:
- self._expected.remove(job.jobid)
- self._resurrections.pop(job.jobid, False)
- except ValueError:
- LOG().warn("[jobid=%d] reply not in expected list",
- job.jobid)
- self._reply_cond.notifyAll()
- return self._replies.pop(job.jobid)
- if self._process is False:
- raise errors.TerminatingError("can not complete job %s,"
- " terminating ..." % job)
- elif job.jobid not in self._expected:
- # process terminated, resending job
- LOG().warn("[jobid=%d] job removed from expected list,"
- " sending request again", job.jobid)
- self._worker.uplink.put(job)
- self._expected.append(job.jobid)
- elif job.jobid == self._expected[0]:
- # now it's our turn to block on downlink
- break
- else: # another thread blocks on downlink -> let's sleep
- LOG().debug("[jobid=%d] another %d threads expecting reply,"
- " suspending...", job.jobid, len(self._expected) - 1)
- self._reply_cond.wait()
- LOG().debug("[jobid=%d] received reply or terminating; waking up",
- job.jobid)
- return self._receive_reply(job)
+ @property
+ def jobmgr(self):
+ with self._access_lock:
+ if self._jobmgr is None:
+ self._jobmgr = JobManager()
+ self._jobmgr.start()
+ return self._jobmgr
def _do_job(self, job):
"""
@@ -361,51 +159,26 @@ class YumDB(singletonmixin.Singleton):
@return reply
"""
LOG().trace_verbose("doing %s", job)
- reply = self._send_and_receive(job)
- log_reply_error(job, reply)
+ reply = self.jobmgr.process(job)
+ jobs.log_reply_error(job, reply)
LOG().trace_verbose("job %s done", job.jobid)
return reply
- @property
- def _worker(self):
- """
- YumWorker process accessor. It's created upon first need.
- """
- if self._process is False:
- raise errors.TerminatingError("provider is terminating")
- if self._process is None:
- LOG().trace_info("starting YumWorker")
- uplink = Queue()
- downlink = Queue()
- self._process = YumWorker(uplink, downlink,
- indication_manager=IndicationManager.get_instance(),
- yum_kwargs=self._yum_kwargs)
- self._process.start()
- LOG().trace_info("YumWorker started with pid=%s", self._process.pid)
- return self._process
-
# *************************************************************************
# Special methods
# *************************************************************************
@cmpi_logging.trace_method
def __enter__(self):
- with self._session_lock:
- if self._session_level == 0:
- self._do_job(jobs.YumBeginSession())
- LOG().trace_info('new session started')
- self._session_level += 1
- LOG().trace_info('nested to session level=%d', self._session_level)
- return self
+ """ Start a new session. """
+ with self._access_lock:
+ self.jobmgr.begin_session()
+ return self
@cmpi_logging.trace_method
def __exit__(self, exc_type, exc_value, traceback):
- with self._session_lock:
- if self._session_level == 1:
- self._do_job(jobs.YumEndSession())
- LOG().trace_info('session ended')
- LOG().trace_info('emerged from session level=%d',
- self._session_level)
- self._session_level = max(self._session_level - 1, 0)
+ """ End active session. """
+ with self._access_lock:
+ self.jobmgr.end_session()
# *************************************************************************
# Public methods
@@ -413,19 +186,16 @@ class YumDB(singletonmixin.Singleton):
@cmpi_logging.trace_method
def clean_up(self):
"""
- Shut down the YumWorker process.
- """
- with self._reply_lock:
- if self._process:
- LOG().info('terminating YumWorker')
- self._process.uplink.put(None) # terminating command
- self._process.join()
- LOG().info('YumWorker terminated')
- # prohibit next instantiation
- self._process = False
- self._reply_cond.notifyAll()
+ Shut down job manager and all the other threads and processes it has
+ created.
+ """
+ with self._access_lock:
+ if self._jobmgr is not None:
+ self._do_job(jobs.YumShutDown())
+ self._jobmgr.join()
+ self._jobmgr = None
else:
- LOG().warn("clean_up called, when process not initialized!")
+ LOG().warn("clean_up called with JobManager not initialized")
# *************************************************************************
# Jobs with simple results
diff --git a/src/software/lmi/software/yumdb/jobmanager.py b/src/software/lmi/software/yumdb/jobmanager.py
index ebe0ba2..e171653 100644
--- a/src/software/lmi/software/yumdb/jobmanager.py
+++ b/src/software/lmi/software/yumdb/jobmanager.py
@@ -19,22 +19,18 @@
# Authors: Michal Minar <miminar@redhat.com>
#
"""
-This is a module for ``JobManager`` which is a separate thread of
-``YumWorker`` process. It keeps a cache of asynchronous jobs and handles
-input and output queues.
-
-This module uses its own logging facilities because it runs in separeted
-process not having access to broker logging features.
-
-Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should
-be set to callable taking ``YumJob`` instance and returning its matching
-CIM abstraction instance.
+This is a module for ``JobManager`` which is a standalone thread in provider
+process. It keeps a cache of asynchronous jobs and handles input queue of jobs
+that are about to be handled by
+:py:class:`~lmi.software.yumdb.process.YumWorker` process.
+
+Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should be set
+to a callable taking :py:class:`~lmi.software.yumdb.jobs.YumJob` instance and
+returning its matching CIM abstraction instance.
"""
from functools import wraps
import heapq
import inspect
-import logging
-import Queue
import sys
import threading
import time
@@ -45,6 +41,7 @@ from lmi.providers.IndicationManager import IndicationManager
from lmi.providers.JobManager import JobManager as JM
from lmi.software.yumdb import errors
from lmi.software.yumdb import jobs
+from lmi.software.yumdb.sessionmanager import SessionManager
from lmi.software.util import Configuration
# This is a callable, which must be initialized before JobManager is used.
@@ -52,8 +49,7 @@ from lmi.software.util import Configuration
# corresponding CIM instance. It's used for sending indications.
JOB_TO_MODEL = lambda job: None
-# replacement for cmpi_logging.logger
-LOG = None
+LOG = cmpi_logging.get_logger(__name__)
# *****************************************************************************
# Decorators
@@ -62,7 +58,7 @@ def job_handler(job_from_target=True):
"""
Decorator for JobManager methods serving as handlers for control jobs.
- Decorator locks the job_lock of manager's instance.
+ Decorator locks the :py:attr:`JobManager._job_lock` of manager's instance.
"""
def _wrapper_jft(method):
"""
@@ -79,10 +75,10 @@ def job_handler(job_from_target=True):
kwargs['job'] = kwargs.pop('target')
callargs = inspect.getcallargs(method, self, *args, **kwargs)
target = callargs.pop('job')
- with self._job_lock: #pylint: disable=W0212
- if not target in self._async_jobs: #pylint: disable=W0212
+ with self._job_lock:
+ if not target in self._async_jobs:
raise errors.JobNotFound(target)
- job = self._async_jobs[target] #pylint: disable=W0212
+ job = self._async_jobs[target]
callargs['job'] = job
return logged(**callargs)
return _new_func
@@ -92,7 +88,7 @@ def job_handler(job_from_target=True):
@wraps(method)
def _new_func(self, *args, **kwargs):
"""Wrapper around method."""
- with self._job_lock: #pylint: disable=W0212
+ with self._job_lock:
return method(self, *args, **kwargs)
return _new_func
@@ -105,7 +101,8 @@ class JobIndicationSender(object):
"""
Makes creation and sending of indications easy. It keeps a reference
to job, which can be *snapshotted* for making CIM instance out of it.
- These instances are then used to send indications via IndicationManager.
+ These instances are then used to send indications via
+ :py:class:`lmi.providers.IndicationManager.IndicationManager`.
Typical usage::
@@ -114,21 +111,23 @@ class JobIndicationSender(object):
sender.snapshot()
sender.send()
- **Note** that number of kept CIM instances won't exceed 2. First one
- is created upon instantiation and the second one be calling
- ``snapshot()``. Any successive call to ``snapshot()`` will overwrite
- the second instance.
+ .. note::
+ Number of kept CIM instances won't exceed 2. First one is created upon
+ instantiation and the second one with the subsequent call to
+ :py:meth:`snapshot()`. Any successive call to it will overwrite the
+ second instance.
+
+ :param job: Is job instance, which will be immediately snapshoted as old
+ instance and later as a new one.
+ :type job: :py:class:`~lmi.software.yumdb.jobs.YumJob`
+ :param list indications: Can either be a list of indication ids or a single
+ indication id.
+ :param new: A job instance stored as new.
+ :type new: :py:class:`~lmi.software.yumdb.jobs.YumJob`
"""
def __init__(self, indication_manager, job,
indications=JM.IND_JOB_CHANGED, new=None):
- """
- :param job (``YumJob``) Is job instance, which will be immediately
- snapshoted as old instance and later as a new one.
- :param indications (``list``) Can either be a list of indication ids
- or a single indication id.
- :param new (``YumJob``) A job instance stored as new.
- """
if not isinstance(indication_manager, IndicationManager):
raise TypeError("indication_manager must be a subclass of"
" IndicationManager")
@@ -148,14 +147,16 @@ class JobIndicationSender(object):
@property
def job(self):
"""
- Return instance of ``YumJob``.
+ :returns: Associated job object.
+ :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob`
"""
return self._job
@property
def indication_ids(self):
"""
- Return set of indication filter IDs.
+ :returns: Set of indication filter IDs.
+ :rtype: set
"""
return self._indications.copy()
@@ -164,7 +165,7 @@ class JobIndicationSender(object):
"""
Set the indication filter IDs.
- :param indication_ids (``list``) Can be even single id.
+ :param list indication_ids: Can be even single id.
"""
if isinstance(indication_ids, basestring):
indication_ids = set([indication_ids])
@@ -199,18 +200,19 @@ class JobIndicationSender(object):
if ( JM.IND_JOB_CHANGED in self._indications
and self._new_instance is None):
raise errors.IndicationError("no snapshot made for modified job")
+ indication_manager = IndicationManager.get_instance()
for fltr_id in self._indications:
if fltr_id == JM.IND_JOB_CREATED:
- LOG.debug("sending instance creation indication for job %s",
+ LOG().debug("sending instance creation indication for job %s",
self._job)
- self._indication_manager.send_instcreation(
+ indication_manager.send_instcreation(
self._new_instance if self._new_instance is not None
else self._old_instance,
fltr_id)
else:
- LOG.debug("sending instance modification indication for job %s"
- " with ID: %s", self._job, fltr_id)
- self._indication_manager.send_instmodification(
+ LOG().debug("sending instance modification indication for job"
+ " %s with ID: %s", self._job, fltr_id)
+ indication_manager.send_instmodification(
self._old_instance, self._new_instance,
fltr_id)
@@ -218,41 +220,55 @@ class JobManager(threading.Thread):
"""
Separate thread for managing queue of jobs requested by client.
There are three kinds of jobs, that are handled differently:
- * asynchronous - kept in _async_jobs dictionary until job is
+
+ * asynchronous - kept in ``_async_jobs`` dictionary until job is
deleted by request or it expires;
no reply is sent to client upon job's completion
* synchronous - reply is sent to client after job's completion;
no reference to the job is kept afterwards
- * job control - they are not enqueued in _job_queue for YumWorker
- to process, but are handled directly and in the FIFO order
-
- Both asynchronous and synchronous jobs are enqueued in _job_queue
- for YumWorker to obtain them. It's a priority queue sorting jobs by their
- priority.
+ * job control - they are not enqueued in ``_job_queue`` for
+ :py:class:`YumWorker` to process, but are handled directly and in
+ the *FIFO* order
+
+ Both asynchronous and synchronous jobs are enqueued in ``_job_queue`` and
+ will be sent to :py:class:`~lmi.software.yumdb.process.YumWorker` for
+ processing. It's a priority queue sorting jobs by their priority.
+ :py:class:`~lmi.software.yumdb.sessionmanager.SessionManager` is the actual
+ object managing ``YumWorker`` process. All jobs are passed to it.
"""
- # enumeration of actions, that may be enqueued in calendar
+ #: enumeration of actions, that may be enqueued in calendar
ACTION_REMOVE = 0
+ #: names for actions defined above, their values are indexes to this list
ACTION_NAMES = ['remove']
- def __init__(self, queue_in, queue_out, indication_manager):
+ def __init__(self):
threading.Thread.__init__(self, name="JobManager")
- self._queue_in = queue_in
- self._queue_out = queue_out
- self._indication_manager = indication_manager
- self._terminate = False
- # (time, jobid, action)
- self._calendar = []
- # {jobid : job}
- self._async_jobs = {}
+ #: Session manager shall be accessed via property ``session_manager``.
+ #: It's instantiated when first needed.
+ self._sessionmgr = None
- # lock for critical access to _calendar, _async_jobs and _job_queue
+ #: lock for critical access to _sessionmgr, _calendar,
+ #: _async_jobs and _job_queue
self._job_lock = threading.RLock()
- # priority queue of jobs that are processed by YumWorker
+ #: priority queue of jobs that are processed by YumWorker
self._job_queue = []
- # condition for YumWorker waiting on empty _job_queue
- self._job_enqueued = threading.Condition(self._job_lock)
+ #: (time, jobid, action)
+ self._calendar = []
+ #: {jobid : job}
+ self._async_jobs = {}
+ #: Condition object being wait upon in the main loop of this thread.
+ #: It's notified either when the new job request comes or when the
+ #: YumWorker replies.
+ self._job_pending = threading.Condition(self._job_lock)
+
+ #: used to guard access to _finished set
+ self._reply_lock = threading.Lock()
+ #: used to wait for job to be processed and received
+ self._reply_cond = threading.Condition(self._reply_lock)
+ #: job ids are kept here for each finished job
+ self._finished = set()
# *************************************************************************
# Private methods
@@ -260,7 +276,11 @@ class JobManager(threading.Thread):
@cmpi_logging.trace_method
def _control_job(self, job):
"""
- Function dispatching job to handler for particular YumJob subclass.
+ Function dispatching job to handler for particular
+ :py:class:`~lmi.software.yumdb.jobs.YumJob` subclass.
+
+ .. note::
+ This can be called only from client thread.
"""
try:
handler = {
@@ -274,7 +294,7 @@ class JobManager(threading.Thread):
jobs.YumJobDelete : self._handle_delete,
jobs.YumJobTerminate : self._handle_terminate,
}[job.__class__]
- LOG.info("processing control job %s", str(job))
+ LOG().info("processing control job %s", str(job))
except KeyError:
raise errors.UnknownJob("No handler for job \"%s\"." %
job.__class__.__name__)
@@ -283,7 +303,11 @@ class JobManager(threading.Thread):
@cmpi_logging.trace_method
def _enqueue_job(self, job):
"""
- Insert incoming job into _job_queue.
+ Insert incoming job into :py:attr:`_job_queue` or handle it directly if
+ it's control job.
+
+ .. note::
+ This can be called only from client thread.
"""
if isinstance(job, jobs.YumJobControl):
result = job.RESULT_SUCCESS
@@ -294,29 +318,30 @@ class JobManager(threading.Thread):
result = job.RESULT_ERROR
data = sys.exc_info()
data = (data[0], data[1], traceback.format_tb(data[2]))
- LOG.exception("control job %s failed", job)
- job.finish(result, data)
- LOG.debug("sending reply for %s: (%s, %s)", job,
- job.ResultNames[job.result],
- cmpi_logging.render_value(job.result_data))
- self._queue_out.put(job)
+ LOG().exception("control job %s failed", job)
+ self._finish_job(job, result, data)
else:
- if job is None:
- LOG.debug('received terminating command')
- self._terminate = True
- LOG.debug('job %s enqued for YumWorker to handle', job)
+ LOG().debug('job %s enqued for YumWorker to handle', job)
heapq.heappush(self._job_queue, job)
if getattr(job, 'async', False) is True:
ind = self._prepare_indication_for(job, JM.IND_JOB_CREATED)
self._async_jobs[job.jobid] = job
ind.send()
- self._job_enqueued.notify()
+ self._job_pending.notify()
@cmpi_logging.trace_method
def _schedule_event(self, after, jobid, action):
"""
- Enqueue event into calendar. Event consists of time, jobid and
- action.
+ Enqueue event into calendar. Event consists of *time*, *jobid* and
+ *action*.
+
+ :param float after: Number of seconds from now until the action is
+ shall be triggered.
+ :param int jobid: Id of job which is a subject of action execution.
+ :param int action: Value of action to do.
+
+ .. note::
+ This can be called only from client thread.
"""
schedule_at = time.time() + after
for (sched, jid, act) in self._calendar:
@@ -324,7 +349,7 @@ class JobManager(threading.Thread):
if sched <= schedule_at: # same event already scheduled
return
# schedule it for early time
- LOG.debug('rescheduling action %s on job %d to take place'
+ LOG().debug('rescheduling action %s on job %d to take place'
' after %d seconds (instead of %d)',
self.ACTION_NAMES[action], jid, after,
sched - schedule_at + after)
@@ -332,7 +357,7 @@ class JobManager(threading.Thread):
self._calendar.append((schedule_at, jid, act))
heapq.heapify(self._calendar)
return
- LOG.debug('scheduling action %s on job %d to take place after '
+ LOG().debug('scheduling action %s on job %d to take place after '
' %d seconds', self.ACTION_NAMES[action], jobid, after)
heapq.heappush(self._calendar, (schedule_at, jobid, action))
@@ -340,7 +365,13 @@ class JobManager(threading.Thread):
def _run_event(self, jobid, action):
"""
Process event from calendar.
+
+ :param int jobid: Id of job that is a subject of action.
+ :param int action: One of predefined actions to be executed on
+ particular job.
"""
+ LOG().info('running action %s on job(id=%d)',
+ self.ACTION_NAMES[action], jobid)
if action == self.ACTION_REMOVE:
with self._job_lock:
del self._async_jobs[jobid]
@@ -351,28 +382,53 @@ class JobManager(threading.Thread):
@cmpi_logging.trace_method
def _prepare_indication_for(self, job, *args, **kwargs):
"""
- Return instance of ``JobIndicationSender``.
+ Convenience method making indication sender context manager.
+
+ :param list args: Positional arguments passed after *job* to
+ the constructor of :py:class:`JobIndicationSender`.
+ :param dictionary kwargs: Its keyword arguments.
+ :returns: Context manager for snapshoting and sending job replated
+ indications.
+ :rtype: :py:class:`JobIndicationSender`
"""
- return JobIndicationSender(self._indication_manager, job,
+ return JobIndicationSender(IndicationManager.get_instance(), job,
*args, **kwargs)
+ @property
+ def session_manager(self):
+ """
+ Accessor of session manager. Object is instantiated upon first access.
+
+ :rtype: :py:class:`lmi.software.yumdb.sessionmanager.SessionManager`
+ """
+ with self._job_lock:
+ if self._sessionmgr is None:
+ self._sessionmgr = SessionManager(self._job_pending)
+ self._sessionmgr.start()
+ return self._sessionmgr
+
# *************************************************************************
# Job handlers
# *************************************************************************
@job_handler()
- def _handle_get(self, job): #pylint: disable=R0201
- """@return job object"""
+ def _handle_get(self, job):
+ """ Control job handler returning job object requested. """
return job
@job_handler(False)
def _handle_get_list(self):
- """@return list of all asynchronous jobs"""
+ """ Control job handler returning list of all asynchronous jobs. """
with self._job_lock:
return sorted(self._async_jobs.values())
@job_handler(False)
def _handle_get_by_name(self, target):
- """@return job object filtered by name"""
+ """
+ Control job handler returning job object requested by its name.
+
+ :param string target: Name of job to find. ``Name`` property of
+ available asynchronous jobs is queried.
+ """
for job in self._async_jobs.values():
if 'name' in job.metadata and target == job.metadata['name']:
return job
@@ -381,8 +437,11 @@ class JobManager(threading.Thread):
@job_handler()
def _handle_set_priority(self, job, new_priority):
"""
- Modify job's priority and updates its position in queue.
- @return modified job object
+ Control job handler modifying job's priority and updating its position
+ in queue.
+
+ :returns: Modified job object.
+ :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob`
"""
if not isinstance(new_priority, (int, long)):
raise TypeError('priority must be an integer')
@@ -399,7 +458,15 @@ class JobManager(threading.Thread):
delete_on_completion,
time_before_removal):
"""
- Changes job's schedule for its deletion.
+ Control job handler rescheduling job's deletion.
+
+ :param job: Affected job object.
+ :type job: :py:class:`~lmi.software.yumdb.jobs.YumJob`
+ :param boolean delete_on_completion: Whether the job shall be
+ automatically removed after its completion. The
+ *time_before_removel* then comes into play.
+ :param float time_before_removal: Number of seconds to wait before
+ job's automatic removal.
"""
if ( job.delete_on_completion == delete_on_completion
and job.time_before_removal == time_before_removal):
@@ -422,9 +489,12 @@ class JobManager(threading.Thread):
return job
@job_handler()
- def _handle_update(self, job, data): #pylint: disable=R0201
+ def _handle_update(self, job, data):
"""
- Updates any job metadata.
+ Control job handler updating any job metadata.
+
+ :param dictionary data: Job attributes with associated values that
+ shall be assigned to particular *job*.
"""
ind = self._prepare_indication_for(job)
job.update(**data)
@@ -434,7 +504,9 @@ class JobManager(threading.Thread):
@job_handler()
def _handle_delete(self, job):
"""
- Deletes finished asynchronous job.
+ Control job handler deleting finished asynchronous job. If the *job* is
+ not yet finished, the
+ :py:exc:`lmi.software.yumdb.errors.InvalidJobState` is raised.
"""
if not job.finished:
raise errors.InvalidJobState(
@@ -442,16 +514,16 @@ class JobManager(threading.Thread):
try:
self._job_queue.remove(job)
heapq.heapify(self._job_queue)
- LOG.debug('job "%s" removed from queue', job)
+ LOG().debug('job "%s" removed from queue', job)
except ValueError:
- LOG.debug('job "%s" not started and not enqueued', job)
+ LOG().debug('job "%s" not started and not enqueued', job)
del self._async_jobs[job.jobid]
return job
@job_handler()
def _handle_terminate(self, job):
"""
- Terminates not started job.
+ Control job handler terminating not started job.
"""
if job.started and not job.finished:
raise errors.InvalidJobState('can not kill running job "%s"' % job)
@@ -462,119 +534,168 @@ class JobManager(threading.Thread):
ind = self._prepare_indication_for(job)
job.finish(result=job.RESULT_TERMINATED)
ind.send(True)
- LOG.info('terminated not started job "%s"', job)
+ LOG().info('terminated not started job "%s"', job)
return job
# *************************************************************************
- # Public properties
+ # Public methods
# *************************************************************************
- @property
- def queue_in(self):
- """Incoming queue for YumJob instances."""
- return self._queue_in
+ @cmpi_logging.trace_method
+ def _finish_job(self, job, result, result_data):
+ """
+ This should be called on any processed job.
- @property
- def queue_out(self):
- """Output queue for results."""
- return self._queue_out
+ If the *job* is synchronous, reply is send at once. Otherwise the
+ result is stored for later client's query in the job itself, and
+ indications are sent.
+
+ .. note::
+ This shall be called only from :py:meth:`run` method.
+
+ :param int result: Result code that will be stored in a *job*.
+ :param result_data: Resulting data object that is to be stored in a
+ *job*.
+ :returns: Job object.
+ """
+ if job.state != job.RUNNING:
+ raise errors.InvalidJobState(
+ 'can not finish not started job "%s"' % job)
+ if getattr(job, 'async', False):
+ ind = self._prepare_indication_for(job,
+ (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
+ job.finish(result, result_data)
+ if getattr(job, 'async', False):
+ if job.delete_on_completion:
+ default = Configuration.get_instance().get_safe(
+ 'Log', 'MinimumTimeBeforeRemoval', float)
+ schedule_at = max(job.time_before_removal, default)
+ self._schedule_event(schedule_at, job.jobid,
+ self.ACTION_REMOVE)
+ if result == job.RESULT_SUCCESS:
+ ind.add_indication_ids(JM.IND_JOB_SUCCEEDED)
+ elif result == job.RESULT_ERROR:
+ ind.add_indication_ids(JM.IND_JOB_FAILED)
+ ind.send(True)
+ with self._reply_lock:
+ self._finished.add(job.jobid)
+ self._reply_cond.notifyAll()
+ return job
- # *************************************************************************
- # Public methods
- # *************************************************************************
@cmpi_logging.trace_method
- def finish_job(self, job, result, result_data):
+ def _get_job(self):
"""
- This should be called for any job by YumWorker after the job is
- processed.
+ Pop the first job enqueued by client out of :py:attr:`_job_queue`.
+ Job is started and returned.
- If the job is synchronous, reply is send at once. Otherwise the result
- is stored for later client's query in the job itself.
+ :returns: Job object. If the queue is empty, ``None`` is returned.
+ :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob`
"""
- with self._job_lock:
- if job.state != job.RUNNING:
- raise errors.InvalidJobState(
- 'can not finish not started job "%s"' % job)
- if getattr(job, 'async', False):
+ if self._job_queue:
+ job = heapq.heappop(self._job_queue)
+ if getattr(job, "async", False):
ind = self._prepare_indication_for(job,
(JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
- job.finish(result, result_data)
- if getattr(job, 'async', False):
- if job.delete_on_completion:
- default = Configuration.get_instance().get_safe(
- 'Log', 'MinimumTimeBeforeRemoval', float)
- schedule_at = max(job.time_before_removal, default)
- self._schedule_event(schedule_at, job.jobid,
- self.ACTION_REMOVE)
- if result == job.RESULT_SUCCESS:
- ind.add_indication_ids(JM.IND_JOB_SUCCEEDED)
- elif result == job.RESULT_ERROR:
- ind.add_indication_ids(JM.IND_JOB_FAILED)
+ job.start()
ind.send(True)
else:
- LOG.debug("sending reply for %s: (%s, %s)", job,
- job.ResultNames[job.result],
- cmpi_logging.render_value(job.result_data))
- self._queue_out.put(job)
+ job.start()
return job
@cmpi_logging.trace_method
- def get_job(self, block=True, timeout=None):
+ def process(self, job):
"""
- Method supposed to be used only by YumWorker. It pops the first job
- from _job_queue, starts it and returns it.
+ Enqueue given job and block until it's processed.
+
+ :returns: Modified job object with assigned result.
+ :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob`
"""
- start = time.time()
with self._job_lock:
- if len(self._job_queue) == 0 and not block:
- raise Queue.Empty
- while len(self._job_queue) == 0:
- if timeout:
- LOG.debug('waiting for job for %s seconds' % timeout)
- self._job_enqueued.wait(timeout)
- if len(self._job_queue) == 0:
- now = time.time()
- if timeout > now - start:
- raise Queue.Empty
- job = heapq.heappop(self._job_queue)
- if job is not None:
- if getattr(job, "async", False):
- ind = self._prepare_indication_for(job,
- (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
- job.start()
- ind.send(True)
- else:
- job.start()
+ self._enqueue_job(job)
+ if getattr(job, 'async', False):
+ return job.jobid
+ with self._reply_lock:
+ while job.jobid not in self._finished:
+ self._reply_cond.wait()
return job
- def run(self):
- """The entry point of thread."""
- global LOG #pylint: disable=W0603
- LOG = logging.getLogger(__name__)
- LOG.info("%s thread started", self.name)
+ @cmpi_logging.trace_method
+ def _clean_up(self):
+ """
+ Release the session manager with its separated process.
+ This shall be called from the main loop of this thread.
+ """
+ with self._job_lock:
+ if self._sessionmgr is not None:
+ self._sessionmgr.join()
+ self._sessionmgr = False
- while self._terminate is False:
- try:
+ @cmpi_logging.trace_method
+ def begin_session(self):
+ """
+ Nest into a session. This means: lock the yum database for exclusive
+ use until the top-level session ends. The opposite method,
+ :py:meth:`end_session` needs to be called same number of times in order
+ for yum database to be released.
+ """
+ return self.session_manager.begin_session()
+
+ @cmpi_logging.trace_method
+ def end_session(self):
+ """
+ Emerge from session. When the last session ends, yum database is
+ unlocked.
+
+ .. see::
+ :py:meth:`begin_session`
+ """
+ return self.session_manager.end_session()
+
+ def run(self):
+ """
+ Main lool of *JobManager*'s thread. It works with just one job object
+ at once until it's processed. It consumes enqueued jobs in
+ :py:attr:`_job_queue` and passes them to session manager object. It
+ also triggers actions scheduled in calendar.
+ """
+ LOG().info("%s thread started", self.name)
+ # This points to the currect job being processed by session manager.
+ # If no job is being processed, it's ``None``.
+ job = None
+ with self._job_lock:
+ while True:
timeout = None
- with self._job_lock:
- if len(self._calendar) > 0:
+
+ # check calendar for scheduled events
+ if self._calendar:
+ timeout = self._calendar[0][0] - time.time()
+ while timeout is not None and timeout <= 0:
+ _, jobid, action = heapq.heappop(self._calendar)
+ self._run_event(jobid, action)
+ if self._calendar:
timeout = self._calendar[0][0] - time.time()
- LOG.debug('waiting on input queue for job%s',
+ else:
+ timeout = None
+
+ # handle processed job if any
+ if job:
+ if self.session_manager.got_reply:
+ _, result, data = self.session_manager.pop_reply()
+ self._finish_job(job, result, data)
+ if isinstance(job, jobs.YumShutDown):
+ break
+ job = None
+
+ # check newly enqueued jobs
+ if job is None:
+ job = self._get_job()
+ if job:
+ self.session_manager.process(job, sync=False)
+
+ # wait for any event
+ LOG().debug('waiting on input queue for job%s',
(' with timeout %s' % timeout) if timeout else '')
- job = self._queue_in.get(timeout=timeout)
- with self._job_lock:
- self._enqueue_job(job)
- while self._terminate is False and \
- not self._queue_in.empty():
- # this won't throw
- self._enqueue_job(self._queue_in.get_nowait())
-
- except Queue.Empty:
- with self._job_lock:
- while ( len(self._calendar)
- and self._calendar[0][0] < time.time()):
- _, jobid, action = heapq.heappop(self._calendar)
- LOG.info('running action %s on job(id=%d)',
- self.ACTION_NAMES[action], jobid)
- self._run_event(jobid, action)
- LOG.info('%s thread terminating', self.name)
+ self._job_pending.wait(timeout=timeout)
+
+ self._clean_up()
+ LOG().info('%s thread terminating', self.name)
diff --git a/src/software/lmi/software/yumdb/jobs.py b/src/software/lmi/software/yumdb/jobs.py
index c30a875..9a07dc7 100644
--- a/src/software/lmi/software/yumdb/jobs.py
+++ b/src/software/lmi/software/yumdb/jobs.py
@@ -23,6 +23,7 @@
Define job classes representing kinds of jobs of worker process.
"""
+import logging
import os
import threading
import time
@@ -339,17 +340,25 @@ class YumJobTerminate(YumJobOnJob): #pylint: disable=R0903
# *****************************************************************************
# Yum API functions
# *****************************************************************************
-class YumBeginSession(YumJob): #pylint: disable=R0903
+class YumLock(YumJob): #pylint: disable=R0903
"""
- Begin session on YumWorker which ensures that yum database is locked
- during its lifetime. Sessions can be nested, but the number of
- YumEndSession jobs must be processed to make the database unlocked.
+ This job shall be sent when session starts. Session consists of several
+ requests to ``YumWorker`` during which no other application shall
+ interfere. Exclusive access to yum database is enforced with this job. When
+ the session is over, YumUnlock job needs to be sent to YumWorker.
"""
pass
-class YumEndSession(YumJob): #pylint: disable=R0903
+
+class YumUnlock(YumJob): #pylint: disable=R0903
+ """
+ This job shall be sent when session ends.
+ """
+ pass
+
+class YumShutDown(YumJob):
"""
- End the session started with YumBeginSession. If the last active session
- is ended, database will be unlocked.
+ Last job executed before provider terminates. It shuts down *YumWorker*,
+ *SessionManager* and *JobManager*.
"""
pass
@@ -677,3 +686,28 @@ class YumSetRepositoryEnabled(YumSpecificRepositoryJob):#pylint: disable=R0903
YumSpecificRepositoryJob.__init__(self, repo)
self.enable = bool(enable)
+def log_reply_error(job, reply):
+ """
+ Raises an exception in case of error occured in worker process
+ while processing job.
+ """
+ if isinstance(reply, (int, long)):
+ # asynchronous job
+ return
+ logger = logging.getLogger(__name__)
+ if not isinstance(reply, YumJob):
+ raise TypeError('expected instance of YumJob for reply, not "%s"'
+ % reply.__class__.__name__)
+ if reply.result == YumJob.RESULT_ERROR:
+ logger.error("%s failed with error %s: %s",
+ job, reply.result_data[0].__name__, str(reply.result_data[1]))
+ logger.trace_warn("%s exception traceback:\n%s%s: %s",
+ job, "".join(reply.result_data[2]),
+ reply.result_data[0].__name__, str(reply.result_data[1]))
+ reply.result_data[1].tb_printed = True
+ raise reply.result_data[1]
+ elif reply.result == YumJob.RESULT_TERMINATED:
+ logger.warn('%s terminated', job)
+ else:
+ logger.debug('%s completed with success', job)
+
diff --git a/src/software/lmi/software/yumdb/process.py b/src/software/lmi/software/yumdb/process.py
index 9069f72..ba429ac 100644
--- a/src/software/lmi/software/yumdb/process.py
+++ b/src/software/lmi/software/yumdb/process.py
@@ -44,13 +44,16 @@ from lmi.software.yumdb import jobs
from lmi.software.yumdb import packageinfo
from lmi.software.yumdb import packagecheck
from lmi.software.yumdb import repository
-from lmi.software.yumdb.jobmanager import JobManager
from lmi.software.yumdb.util import is_pkg_installed
from lmi.software.yumdb.util import setup_logging
# Global variable which gets its value after the start of YumWorker process.
LOG = None
+class ShutDownWorker(Exception):
+ """ Raised when shut down job is received. """
+ pass
+
# *****************************************************************************
# Utilities
# ****************************************************************************
@@ -185,19 +188,17 @@ def _needs_database(method):
Wrapper for the job handler method.
"""
created_session = False
- self._init_database() #pylint: disable=W0212
- if self._session_level == 0: #pylint: disable=W0212
- self._session_level = 1 #pylint: disable=W0212
+ self._init_database()
+ if self._state != YumWorker.STATE_LOCKED:
created_session = True
- self._lock_database() #pylint: disable=W0212
+ self._lock_database()
self._check_repository_configs()
try:
result = logged(self, *args, **kwargs)
return result
finally:
- if created_session is True: #pylint: disable=W0212
- self._session_level = 0 #pylint: disable=W0212
- self._unlock_database() #pylint: disable=W0212
+ if created_session is True:
+ self._unlock_database()
return _wrapper
# *****************************************************************************
@@ -210,19 +211,29 @@ class YumWorker(Process):
Jobs are dispatched by their class names to particular handler method.
- It spawns a second thread for managing asynchronous jobs and queue
- of incoming jobs. It's an instance of JobManager.
+ It can process just one job at once. It has following states:
+
+ ``STATE_IDLE``
+ Initial state where no information is cached. Next state
+ is ``STATE_LOCKED``.
+ ``STATE_LOCKED``
+ Database is locked, and :py:class:`yum.YumBase` is initialied.
+ Next state is ``STATE_UNLOCKED``.
+ ``STATE_UNLOCKED``
+ Database is unlocked. Yum objects are cached. Next state is
+ any of above ones.
"""
+ STATE_IDLE, STATE_LOCKED, STATE_UNLOCKED = range(3)
+
def __init__(self,
queue_in,
queue_out,
- indication_manager,
yum_kwargs=None):
Process.__init__(self, name="YumWorker")
- self._jobmgr = JobManager(queue_in, queue_out, indication_manager)
- self._session_level = 0
- self._session_ended = False
+ self._queue_in = queue_in
+ self._queue_out = queue_out
+ self._state = YumWorker.STATE_IDLE
if yum_kwargs is None:
yum_kwargs = {}
@@ -258,6 +269,7 @@ class YumWorker(Process):
LOG.info("freing database")
self._pkg_cache.clear()
self._yum_base = None
+ self._state = YumWorker.STATE_IDLE
@cmpi_logging.trace_method
def _lock_database(self):
@@ -267,11 +279,12 @@ class YumWorker(Process):
Try to lock it in loop, until success.
"""
+ assert self._state != YumWorker.STATE_LOCKED
while True:
try:
- LOG.info("trying to lock database - session level %d",
- self._session_level)
+ LOG.info("trying to lock database")
self._yum_base.doLock()
+ self._state = YumWorker.STATE_LOCKED
LOG.info("successfully locked up")
break
except yum.Errors.LockError as exc:
@@ -294,24 +307,24 @@ class YumWorker(Process):
self._yum_base.closeRpmDB()
del self._yum_base.history
self._yum_base.doUnlock()
+ self._state = YumWorker.STATE_UNLOCKED
@cmpi_logging.trace_method
def _get_job(self):
"""
- Get job from JobManager thread.
- If no job comes for long time, free database to save memory.
+ Wait for new job to process. If no job comes for a long time, free
+ database to save memory.
"""
while True:
- if self._session_ended and self._session_level == 0:
+ if self._state == YumWorker.STATE_UNLOCKED:
try:
timeout = util.Configuration.get_instance().get_safe(
'Yum', 'FreeDatabaseTimeout', float)
- return self._jobmgr.get_job(timeout=timeout)
+ return self._queue_in.get(timeout=timeout)
except TQueue.Empty:
self._free_database()
- self._session_ended = False
else:
- return self._jobmgr.get_job()
+ return self._queue_in.get()
@cmpi_logging.trace_method
def _transform_packages(self, packages,
@@ -457,8 +470,8 @@ class YumWorker(Process):
jobs.YumRemovePackage : self._handle_remove_package,
jobs.YumUpdateToPackage : self._handle_update_to_package,
jobs.YumUpdatePackage : self._handle_update_package,
- jobs.YumBeginSession : self._handle_begin_session,
- jobs.YumEndSession : self._handle_end_session,
+ jobs.YumLock : self._handle_lock,
+ jobs.YumUnlock : self._handle_unlock,
jobs.YumCheckPackage : self._handle_check_package,
jobs.YumCheckPackageFile : self._handle_check_package_file,
jobs.YumInstallPackageFromURI : \
@@ -472,6 +485,8 @@ class YumWorker(Process):
LOG.info("processing job %s(id=%d)",
job.__class__.__name__, job.jobid)
except KeyError:
+ if isinstance(job, jobs.YumShutDown):
+ raise ShutDownWorker()
LOG.error("No handler for job \"%s\"", job.__class__.__name__)
raise errors.UnknownJob("No handler for job \"%s\"." %
job.__class__.__name__)
@@ -499,56 +514,50 @@ class YumWorker(Process):
def _main_loop(self):
"""
This is a main loop called from run(). Jobs are handled here.
- It accepts a job from input queue, handles it,
- sends the result to output queue and marks the job as done.
+ It accepts a job from input queue, handles it and
+ sends the result to output queue.
- It is terminated, when None is received from input queue.
+ It is terminated, when :py:class:`~lmi.software.yumdb.jobs.YumShutDown`
+ object is received.
"""
while True:
job = self._get_job()
- if job is not None: # not a terminate command
- result = jobs.YumJob.RESULT_SUCCESS
- try:
- data = self._do_work(job)
- except Exception: #pylint: disable=W0703
- result = jobs.YumJob.RESULT_ERROR
- # (type, value, traceback)
- data = sys.exc_info()
- # traceback is not pickable - replace it with formatted
- # text
- data = (data[0], data[1], traceback.format_tb(data[2]))
- LOG.exception("job %s failed", job)
- self._jobmgr.finish_job(job, result, data)
- if job is None:
- self._session_level = 0
- self._session_ended = False
- self._unlock_database()
- LOG.info("waiting for %s to finish", self._jobmgr.name)
- self._jobmgr.join()
+ result = jobs.YumJob.RESULT_SUCCESS
+ try:
+ data = self._do_work(job)
+ except ShutDownWorker:
break
+ except Exception: #pylint: disable=W0703
+ result = jobs.YumJob.RESULT_ERROR
+ # (type, value, traceback)
+ data = sys.exc_info()
+ # traceback is not pickable - replace it with formatted
+ # text
+ data = (data[0], data[1], traceback.format_tb(data[2]))
+ LOG.exception("job %s failed", job)
+ self._queue_out.put((job.jobid, result, data))
+ self._unlock_database()
+ self._free_database()
@cmpi_logging.trace_method
- def _handle_begin_session(self):
- """
- Handler for session begin job.
- """
- self._session_level += 1
- LOG.info("beginning session level %s", self._session_level)
- if self._session_level == 1:
+ def _handle_lock(self):
+ """ Handle lock request. """
+ if self._state != YumWorker.STATE_LOCKED:
self._init_database()
self._lock_database()
self._check_repository_configs()
+ else:
+ LOG.warn('requested lock while database is already locked')
@cmpi_logging.trace_method
- def _handle_end_session(self):
+ def _handle_unlock(self):
"""
- Handler for session end job.
+ Handler for unlock job.
"""
- LOG.info("ending session level %d", self._session_level)
- self._session_level = max(self._session_level - 1, 0)
- if self._session_level == 0:
+ if self._state == YumWorker.STATE_LOCKED:
self._unlock_database()
- self._session_ended = True
+ else:
+ LOG.trace_warn('requested unlock on not locked database')
@_needs_database
def _handle_get_package_list(self, kind, allow_duplicates, sort,
@@ -871,10 +880,10 @@ class YumWorker(Process):
try:
if enable ^ res:
if enable is True:
- LOG.info("enabling repository %s" % repo)
+ LOG.info("enabling repository %s", repo)
repo.enable()
else:
- LOG.info("disabling repository %s" % repo)
+ LOG.info("disabling repository %s", repo)
repo.disable()
try:
yum.config.writeRawRepoFile(repo, only=["enabled"])
@@ -897,14 +906,14 @@ class YumWorker(Process):
"""
@return input queue for jobs
"""
- return self._jobmgr.queue_in
+ return self._queue_in
@property
def downlink(self):
"""
@return output queue for job results
"""
- return self._jobmgr.queue_out
+ return self._queue_out
# *************************************************************************
# Public methods
@@ -917,16 +926,8 @@ class YumWorker(Process):
global LOG
LOG = logging.getLogger(__name__)
LOG.info("running as pid=%d", self.pid)
- self._jobmgr.start()
- LOG.info("started %s as thread %s",
- self._jobmgr.name, self._jobmgr.ident)
self._pkg_cache = weakref.WeakValueDictionary()
- # This allows the code, that can be run both from broker and
- # YumWorker, to check, whether it's called by this process.
- from lmi.software.yumdb import YumDB
- YumDB.RUNNING_UNDER_CIMOM_PROCESS = False
-
self._main_loop()
LOG.info("terminating")