Diffstat (limited to 'src/software/lmi/software/yumdb/jobmanager.py')
-rw-r--r--    src/software/lmi/software/yumdb/jobmanager.py    574
1 files changed, 574 insertions, 0 deletions
diff --git a/src/software/lmi/software/yumdb/jobmanager.py b/src/software/lmi/software/yumdb/jobmanager.py
new file mode 100644
index 0000000..a224905
--- /dev/null
+++ b/src/software/lmi/software/yumdb/jobmanager.py
@@ -0,0 +1,574 @@
+# Software Management Providers
+#
+# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors: Michal Minar <miminar@redhat.com>
+#
+"""
+This is a module for ``JobManager``, which is a separate thread of the
+``YumWorker`` process. It keeps a cache of asynchronous jobs and handles
+input and output queues.
+
+This module uses its own logging facilities because it runs in a separate
+process without access to the broker's logging features.
+
+Before using ``JobManager``, the module's variable ``JOB_TO_MODEL`` should
+be set to a callable taking a ``YumJob`` instance and returning its matching
+CIM abstraction instance.
+"""
+import heapq
+import inspect
+import logging
+import Queue
+import sys
+import threading
+import time
+import traceback
+
+from lmi.common.IndicationManager import IndicationManager
+from lmi.common.JobManager import JobManager as JM
+from lmi.software.yumdb import errors, jobs
+from lmi.software.yumdb.util import trace_function
+
+# This is a callable which must be initialized before JobManager is used.
+# It should be a function that takes a job and returns the corresponding
+# CIM instance. It is used for sending indications.
+JOB_TO_MODEL = lambda job: None
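+
+# A minimal sketch of how the provider layer might initialize this hook; the
+# ``package_job_to_model`` helper below is hypothetical and stands for
+# whatever function builds the CIM job instance:
+#
+#     from lmi.software.yumdb import jobmanager
+#     jobmanager.JOB_TO_MODEL = lambda job: package_job_to_model(job)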
+
+# Minimum time (in seconds) to keep an asynchronous job in the cache after
+# its completion.
+MINIMUM_TIME_BEFORE_REMOVAL = 10
+
+# replacement for cmpi_logging.logger
+LOG = None
+
+# *****************************************************************************
+# Decorators
+# *****************************************************************************
+def job_handler(job_from_target=True):
+ """
+ Decorator for JobManager methods serving as handlers for control jobs.
+
+    The decorator locks the manager instance's job_lock.
+ """
+ def _wrapper_jft(method):
+ """
+        It consumes the "target" keyword argument (which is the job's id) and
+        replaces it with the matching YumJob instance. The method is then
+        called with a "job" argument instead of "target".
+ """
+ logged = trace_function(method)
+
+ def _new_func(self, *args, **kwargs):
+ """Wrapper around method."""
+ if 'target' in kwargs:
+ kwargs['job'] = kwargs.pop('target')
+ callargs = inspect.getcallargs(method, self, *args, **kwargs)
+ target = callargs.pop('job')
+ with self._job_lock: #pylint: disable=W0212
+ if not target in self._async_jobs: #pylint: disable=W0212
+ raise errors.JobNotFound(target)
+ job = self._async_jobs[target] #pylint: disable=W0212
+ callargs['job'] = job
+ return logged(**callargs)
+ return _new_func
+
+ def _simple_wrapper(method):
+ """Just locks the job lock."""
+ def _new_func(self, *args, **kwargs):
+ """Wrapper around method."""
+ with self._job_lock: #pylint: disable=W0212
+ return method(self, *args, **kwargs)
+ return _new_func
+
+ if job_from_target:
+ return _wrapper_jft
+ else:
+ return _simple_wrapper
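+
+# For illustration (a sketch of the call translation performed above): a
+# control job carrying ``job_kwargs={'target': 42}`` ends up invoking a
+# handler decorated with ``@job_handler()`` roughly as
+#
+#     handler(self, job=self._async_jobs[42])   # looked up under _job_lock
+#
+# and errors.JobNotFound(42) is raised when no such asynchronous job is
+# cached. With ``@job_handler(False)`` no translation happens; the handler is
+# merely run with _job_lock held.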
+
+class JobIndicationSender(object):
+ """
+    Makes creation and sending of indications easy. It keeps a reference
+    to a job, which can be *snapshotted* to make a CIM instance out of it.
+    These instances are then used to send indications via IndicationManager.
+
+ Typical usage::
+
+ sender = JobIndicationSender(im, job, [fltr_id1, fltr_id2])
+ ... # modify job
+ sender.snapshot()
+ sender.send()
+
+    **Note** that the number of kept CIM instances won't exceed 2. The first
+    one is created upon instantiation and the second one by calling
+    ``snapshot()``. Any successive call to ``snapshot()`` overwrites the
+    second instance.
+ """
+
+ def __init__(self, indication_manager, job,
+ indications=JM.IND_JOB_CHANGED, new=None):
+ """
+        :param job (``YumJob``) The job instance, which is immediately
+            snapshotted as the old instance and later as the new one.
+ :param indications (``list``) Can either be a list of indication ids
+ or a single indication id.
+ :param new (``YumJob``) A job instance stored as new.
+ """
+ if not isinstance(indication_manager, IndicationManager):
+            raise TypeError("indication_manager must be an instance of"
+ " IndicationManager")
+ if not isinstance(job, jobs.YumJob):
+ raise TypeError("job must be an instance of YumJob")
+        if new is not None and not isinstance(new, jobs.YumJob):
+ raise TypeError("new must be an instance of YumJob")
+ self._indication_manager = indication_manager
+ self._job = job
+ self._old_instance = JOB_TO_MODEL(job)
+ if new is not None:
+            new = JOB_TO_MODEL(new)
+ self._new_instance = new
+ self._indications = set()
+ self.indication_ids = indications
+
+ @property
+ def job(self):
+ """
+ Return instance of ``YumJob``.
+ """
+ return self._job
+
+ @property
+ def indication_ids(self):
+ """
+ Return set of indication filter IDs.
+ """
+ return self._indications.copy()
+
+ @indication_ids.setter
+ def indication_ids(self, indication_ids):
+ """
+ Set the indication filter IDs.
+
+        :param indication_ids (``list``) Can also be a single id.
+ """
+ if isinstance(indication_ids, basestring):
+ indication_ids = set([indication_ids])
+ self._indications = set(indication_ids)
+
+ @trace_function
+ def add_indication_ids(self, indication_ids):
+ """
+ Add filter IDs.
+ """
+ if isinstance(indication_ids, basestring):
+ indication_ids = set([indication_ids])
+ self._indications.update(indication_ids)
+
+ @trace_function
+ def snapshot(self):
+ """
+        Make a second CIM instance, overwriting the previous one (not the first).
+ """
+ self._new_instance = JOB_TO_MODEL(self._job)
+
+ @trace_function
+ def send(self, make_snapshot=False):
+ """
+ Send all requested indications for given job.
+ """
+ if not self._indications:
+ raise errors.IndicationError(
+ "can not send any indication without id")
+ if make_snapshot:
+ self.snapshot()
+ if ( JM.IND_JOB_CHANGED in self._indications
+ and self._new_instance is None):
+ raise errors.IndicationError("no snapshot made for modified job")
+ for fltr_id in self._indications:
+ if fltr_id == JM.IND_JOB_CREATED:
+ LOG.debug("sending instance creation indication for job %s",
+ self._job)
+ self._indication_manager.send_instcreation(
+ self._new_instance if self._new_instance is not None
+ else self._old_instance,
+ fltr_id)
+ else:
+ LOG.debug("sending instance modification indication for job %s"
+ " with ID: %s", self._job, fltr_id)
+ self._indication_manager.send_instmodification(
+ self._old_instance, self._new_instance,
+ fltr_id)
+
+class JobManager(threading.Thread):
+ """
+ Separate thread for managing queue of jobs requested by client.
+    There are three kinds of jobs that are handled differently:
+      * asynchronous - kept in the _async_jobs dictionary until the job is
+              deleted by request or it expires;
+              no reply is sent to the client upon the job's completion
+      * synchronous - a reply is sent to the client after the job's
+              completion; no reference to the job is kept afterwards
+      * job control - these are not enqueued in _job_queue for YumWorker
+              to process, but are handled directly and in FIFO order
+
+    Both asynchronous and synchronous jobs are enqueued in _job_queue
+    for YumWorker to obtain them. It is a priority queue sorting jobs by
+    their priority.
+ """
+    # enumeration of actions that may be enqueued in the calendar
+ ACTION_REMOVE = 0
+
+ ACTION_NAMES = ['remove']
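+
+    # A hedged construction sketch; the queue objects and the ``im``
+    # IndicationManager instance are assumed to be created by the caller
+    # (in the real provider this wiring lives outside this module):
+    #
+    #     manager = JobManager(queue_in, queue_out, im)
+    #     manager.start()    # executes JobManager.run() in its own thread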
+
+ def __init__(self, queue_in, queue_out, indication_manager):
+ threading.Thread.__init__(self, name="JobManager")
+ self._queue_in = queue_in
+ self._queue_out = queue_out
+ self._indication_manager = indication_manager
+ self._terminate = False
+
+ # (time, jobid, action)
+ self._calendar = []
+ # {jobid : job}
+ self._async_jobs = {}
+
+ # lock for critical access to _calendar, _async_jobs and _job_queue
+ self._job_lock = threading.RLock()
+ # priority queue of jobs that are processed by YumWorker
+ self._job_queue = []
+ # condition for YumWorker waiting on empty _job_queue
+ self._job_enqueued = threading.Condition(self._job_lock)
+
+ # *************************************************************************
+ # Private methods
+ # *************************************************************************
+ @trace_function
+ def _control_job(self, job):
+ """
+        Dispatch the job to the handler for its particular YumJob subclass.
+ """
+ try:
+ handler = {
+ # these are from YumDB client
+ jobs.YumJobGetList : self._handle_get_list,
+ jobs.YumJobGet : self._handle_get,
+ jobs.YumJobGetByName : self._handle_get_by_name,
+ jobs.YumJobSetPriority : self._handle_set_priority,
+ jobs.YumJobReschedule : self._handle_reschedule,
+ jobs.YumJobUpdate : self._handle_update,
+ jobs.YumJobDelete : self._handle_delete,
+ jobs.YumJobTerminate : self._handle_terminate,
+ }[job.__class__]
+ LOG.info("processing control job %s", str(job))
+ except KeyError:
+ raise errors.UnknownJob("No handler for job \"%s\"." %
+ job.__class__.__name__)
+ return handler(**job.job_kwargs)
+
+ @trace_function
+ def _enqueue_job(self, job):
+ """
+ Insert incoming job into _job_queue.
+ """
+ if isinstance(job, jobs.YumJobControl):
+ result = job.RESULT_SUCCESS
+ job.start()
+ try:
+ data = self._control_job(job)
+ except Exception: #pylint: disable=W0703
+ result = job.RESULT_ERROR
+ data = sys.exc_info()
+ data = (data[0], data[1], traceback.format_tb(data[2]))
+ LOG.exception("control job %s failed", job)
+ job.finish(result, data)
+ LOG.debug("sending reply for %s: (%s, %s)", job,
+ job.ResultNames[job.result], job.result_data)
+ self._queue_out.put(job)
+ else:
+ if job is None:
+ LOG.debug('received terminating command')
+ self._terminate = True
+            LOG.debug('job %s enqueued for YumWorker to handle', job)
+ heapq.heappush(self._job_queue, job)
+ if getattr(job, 'async', False) is True:
+ ind = self._prepare_indication_for(job, JM.IND_JOB_CREATED)
+ self._async_jobs[job.jobid] = job
+ ind.send()
+ self._job_enqueued.notify()
+
+ @trace_function
+ def _schedule_event(self, after, jobid, action):
+ """
+        Enqueue an event into the calendar. An event consists of a time,
+        a jobid and an action.
+ """
+ schedule_at = time.time() + after
+ for (sched, jid, act) in self._calendar:
+ if jid == jobid and act == action:
+ if sched <= schedule_at: # same event already scheduled
+ return
+                # reschedule it for the earlier time
+ LOG.debug('rescheduling action %s on job %d to take place'
+ ' after %d seconds (instead of %d)',
+ self.ACTION_NAMES[action], jid, after,
+ sched - schedule_at + after)
+ self._calendar.remove((sched, jid, act))
+ self._calendar.append((schedule_at, jid, act))
+ heapq.heapify(self._calendar)
+ return
+        LOG.debug('scheduling action %s on job %d to take place after'
+ ' %d seconds', self.ACTION_NAMES[action], jobid, after)
+ heapq.heappush(self._calendar, (schedule_at, jobid, action))
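+        # Note: the calendar is a heap of (schedule_at, jobid, action)
+        # tuples, so the soonest event is always self._calendar[0]; run()
+        # derives its input-queue timeout from that entry.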
+
+ @trace_function
+ def _run_event(self, jobid, action):
+ """
+ Process event from calendar.
+ """
+ if action == self.ACTION_REMOVE:
+ with self._job_lock:
+ del self._async_jobs[jobid]
+ else:
+ msg = "unsupported action: %s" % action
+ raise ValueError(msg)
+
+ @trace_function
+ def _prepare_indication_for(self, job, *args, **kwargs):
+ """
+ Return instance of ``JobIndicationSender``.
+ """
+ return JobIndicationSender(self._indication_manager, job,
+ *args, **kwargs)
+
+ # *************************************************************************
+ # Job handlers
+ # *************************************************************************
+ @job_handler()
+ def _handle_get(self, job): #pylint: disable=R0201
+ """@return job object"""
+ return job
+
+ @job_handler(False)
+ def _handle_get_list(self):
+ """@return list of all asynchronous jobs"""
+ with self._job_lock:
+ return sorted(self._async_jobs.values())
+
+ @job_handler(False)
+ def _handle_get_by_name(self, target):
+ """@return job object filtered by name"""
+ for job in self._async_jobs.values():
+ if 'name' in job.metadata and target == job.metadata['name']:
+ return job
+ raise errors.JobNotFound(target)
+
+ @job_handler()
+ def _handle_set_priority(self, job, new_priority):
+ """
+        Modify the job's priority and update its position in the queue.
+ @return modified job object
+ """
+ if not isinstance(new_priority, (int, long)):
+ raise TypeError('priority must be an integer')
+ if job.priority != new_priority:
+ ind = self._prepare_indication_for(job)
+ job.update(priority=new_priority)
+ if job in self._job_queue:
+ heapq.heapify(self._job_queue)
+ ind.send(True)
+ return job
+
+ @job_handler()
+ def _handle_reschedule(self, job,
+ delete_on_completion,
+ time_before_removal):
+ """
+        Change the schedule of the job's deletion.
+ """
+ if ( job.delete_on_completion == delete_on_completion
+ and job.time_before_removal == time_before_removal):
+ return
+ if job.finished and job.delete_on_completion:
+ for i, event in enumerate(self._calendar):
+ if event[1] == job.jobid and event[2] == self.ACTION_REMOVE:
+ del self._calendar[i]
+ heapq.heapify(self._calendar)
+ break
+ ind = self._prepare_indication_for(job)
+ if delete_on_completion:
+ schedule_at = time_before_removal
+ if job.finished:
+ schedule_at = job.finished + schedule_at - time.time()
+ self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE)
+ job.delete_on_completion = delete_on_completion
+ job.time_before_removal = time_before_removal
+ ind.send(True)
+ return job
+
+ @job_handler()
+ def _handle_update(self, job, data): #pylint: disable=R0201
+ """
+        Update arbitrary job metadata.
+ """
+ ind = self._prepare_indication_for(job)
+ job.update(**data)
+ ind.send(True)
+ return job
+
+ @job_handler()
+ def _handle_delete(self, job):
+ """
+        Delete a finished asynchronous job.
+ """
+ if not job.finished:
+ raise errors.InvalidJobState(
+ 'can not delete unfinished job "%s"' % job)
+ try:
+ self._job_queue.remove(job)
+ heapq.heapify(self._job_queue)
+ LOG.debug('job "%s" removed from queue', job)
+ except ValueError:
+ LOG.debug('job "%s" not started and not enqueued', job)
+ del self._async_jobs[job.jobid]
+ return job
+
+ @job_handler()
+ def _handle_terminate(self, job):
+ """
+        Terminate a job that has not been started yet.
+ """
+ if job.started and not job.finished:
+ raise errors.InvalidJobState('can not kill running job "%s"' % job)
+ if job.finished:
+ raise errors.InvalidJobState('job "%s" already finished' % job)
+ self._job_queue.remove(job)
+ heapq.heapify(self._job_queue)
+ ind = self._prepare_indication_for(job)
+ job.finish(result=job.RESULT_TERMINATED)
+ ind.send(True)
+ LOG.info('terminated not started job "%s"', job)
+ return job
+
+ # *************************************************************************
+ # Public properties
+ # *************************************************************************
+ @property
+ def queue_in(self):
+ """Incoming queue for YumJob instances."""
+ return self._queue_in
+
+ @property
+ def queue_out(self):
+ """Output queue for results."""
+ return self._queue_out
+
+ # *************************************************************************
+ # Public methods
+ # *************************************************************************
+ @trace_function
+ def finish_job(self, job, result, result_data):
+ """
+        This should be called by YumWorker for any job after it has been
+        processed.
+
+        If the job is synchronous, the reply is sent at once. Otherwise the
+        result is stored in the job itself for the client to query later.
+ """
+ with self._job_lock:
+ if job.state != job.RUNNING:
+ raise errors.InvalidJobState(
+ 'can not finish not started job "%s"' % job)
+ if getattr(job, 'async', False):
+ ind = self._prepare_indication_for(job,
+ (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
+ job.finish(result, result_data)
+ if getattr(job, 'async', False):
+ if job.delete_on_completion:
+ schedule_at = max( job.time_before_removal
+ , MINIMUM_TIME_BEFORE_REMOVAL)
+ self._schedule_event(schedule_at, job.jobid,
+ self.ACTION_REMOVE)
+ if result == job.RESULT_SUCCESS:
+ ind.add_indication_ids(JM.IND_JOB_SUCCEEDED)
+ elif result == job.RESULT_ERROR:
+ ind.add_indication_ids(JM.IND_JOB_FAILED)
+ ind.send(True)
+ else:
+ LOG.debug("sending reply for %s: (%s, %s)", job,
+ job.ResultNames[job.result], job.result_data)
+ self._queue_out.put(job)
+ return job
+
+ @trace_function
+ def get_job(self, block=True, timeout=None):
+ """
+        This method is supposed to be used only by YumWorker. It pops the
+        first job from _job_queue, starts it and returns it.
+ """
+ start = time.time()
+ with self._job_lock:
+ if len(self._job_queue) == 0 and not block:
+ raise Queue.Empty
+ while len(self._job_queue) == 0:
+                if timeout:
+                    LOG.debug('waiting for job for %s seconds', timeout)
+                self._job_enqueued.wait(timeout)
+                if len(self._job_queue) == 0:
+                    now = time.time()
+                    # give up once the requested timeout has elapsed
+                    if timeout and now - start >= timeout:
+                        raise Queue.Empty
+ job = heapq.heappop(self._job_queue)
+ if job is not None:
+ if getattr(job, "async", False):
+ ind = self._prepare_indication_for(job,
+ (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
+ job.start()
+ ind.send(True)
+ else:
+ job.start()
+ return job
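+
+    # The consuming side is expected to pair get_job() with finish_job().
+    # A sketch of such a loop (``process`` below is a hypothetical worker
+    # routine, not part of this module):
+    #
+    #     job = manager.get_job(block=True)
+    #     if job is not None:             # None is the termination sentinel
+    #         result, data = process(job)
+    #         manager.finish_job(job, result, data)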
+
+ def run(self):
+ """The entry point of thread."""
+ global LOG #pylint: disable=W0603
+ LOG = logging.getLogger(__name__)
+ LOG.info("%s thread started", self.name)
+
+ while self._terminate is False:
+ try:
+ timeout = None
+ with self._job_lock:
+ if len(self._calendar) > 0:
+ timeout = self._calendar[0][0] - time.time()
+ LOG.debug('waiting on input queue for job%s',
+ (' with timeout %s' % timeout) if timeout else '')
+ job = self._queue_in.get(timeout=timeout)
+ with self._job_lock:
+ self._enqueue_job(job)
+ while not self._queue_in.empty():
+ # this won't throw
+ self._enqueue_job(self._queue_in.get_nowait())
+
+ except Queue.Empty:
+ with self._job_lock:
+ while ( len(self._calendar)
+ and self._calendar[0][0] < time.time()):
+ _, jobid, action = heapq.heappop(self._calendar)
+ LOG.info('running action %s on job(id=%d)',
+ self.ACTION_NAMES[action], jobid)
+ self._run_event(jobid, action)
+ LOG.info('%s thread terminating', self.name)
+