summaryrefslogtreecommitdiffstats
path: root/src/software/lmi/software/yumdb/jobmanager.py
blob: e171653619c8c0e48cb5e14f2125e2fc39a7ee12 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
# Software Management Providers
#
# Copyright (C) 2012-2013 Red Hat, Inc.  All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
#
# Authors: Michal Minar <miminar@redhat.com>
#
"""
This is a module for ``JobManager`` which is a standalone thread in provider
process. It keeps a cache of asynchronous jobs and handles input queue of jobs
that are about to be handled by
:py:class:`~lmi.software.yumdb.process.YumWorker` process.

Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should be set
to a callable taking :py:class:`~lmi.software.yumdb.jobs.YumJob` instance and
returning its matching CIM abstraction instance.
"""
from functools import wraps
import heapq
import inspect
import sys
import threading
import time
import traceback

from lmi.providers import cmpi_logging
from lmi.providers.IndicationManager import IndicationManager
from lmi.providers.JobManager import JobManager as JM
from lmi.software.yumdb import errors
from lmi.software.yumdb import jobs
from lmi.software.yumdb.sessionmanager import SessionManager
from lmi.software.util import Configuration

# This is a callable, which must be initialized before JobManager is used.
# It should be a pointer to function, which takes a job and returns
# corresponding CIM instance. It's used for sending indications.
# Declared as a real function (not an assigned lambda — PEP 8 E731) so it
# has a useful name and docstring; providers overwrite this module attribute
# with the real converter at startup.
def JOB_TO_MODEL(job):      #pylint: disable=W0613
    """
    Placeholder converter from job to CIM instance.

    :param job: Job object to convert
        (:py:class:`~lmi.software.yumdb.jobs.YumJob`).
    :returns: ``None`` until the provider replaces this with a real
        implementation.
    """
    return None

LOG = cmpi_logging.get_logger(__name__)

# *****************************************************************************
# Decorators
# *****************************************************************************
def job_handler(job_from_target=True):
    """
    Decorator for JobManager methods serving as handlers for control jobs.

    Decorator locks the :py:attr:`JobManager._job_lock` of manager's instance.

    :param boolean job_from_target: Whether the decorated method takes a job
        id in its "target" argument that shall be resolved to a job object
        before the call.
    """
    def _resolving_decorator(method):
        """
        It consumes "target" keyword argument (which is job's id) and makes
        it an instance of YumJob. The method is then called with "job"
        argument instead of "target".
        """
        traced = cmpi_logging.trace_method(method, frame_level=2)

        @wraps(method)
        def _resolving_wrapper(self, *args, **kwargs):
            """Wrapper around method."""
            if 'target' in kwargs:
                kwargs['job'] = kwargs.pop('target')
            bound_args = inspect.getcallargs(method, self, *args, **kwargs)
            jobid = bound_args.pop('job')
            with self._job_lock:
                if jobid not in self._async_jobs:
                    raise errors.JobNotFound(jobid)
                # replace the id with the job object itself
                bound_args['job'] = self._async_jobs[jobid]
                return traced(**bound_args)
        return _resolving_wrapper

    def _locking_decorator(method):
        """Just locks the job lock."""
        @wraps(method)
        def _locking_wrapper(self, *args, **kwargs):
            """Wrapper around method."""
            with self._job_lock:
                return method(self, *args, **kwargs)
        return _locking_wrapper

    return _resolving_decorator if job_from_target else _locking_decorator

class JobIndicationSender(object):
    """
    Makes creation and sending of indications easy. It keeps a reference
    to job, which can be *snapshotted* for making CIM instance out of it.
    These instances are then used to send indications via
    :py:class:`lmi.providers.IndicationManager.IndicationManager`.

    Typical usage::

        sender = JobIndicationSender(im, job, [fltr_id1, fltr_id2])
            ... # modify job
        sender.snapshot()
        sender.send()

    .. note::
        Number of kept CIM instances won't exceed 2. First one is created upon
        instantiation and the second one with the subsequent call to
        :py:meth:`snapshot()`. Any successive call to it will overwrite the
        second instance.

    :param indication_manager: Manager used to deliver the indications from
        :py:meth:`send`.
    :type indication_manager:
        :py:class:`lmi.providers.IndicationManager.IndicationManager`
    :param job: Is job instance, which will be immediately snapshotted as old
        instance and later as a new one.
    :type job: :py:class:`~lmi.software.yumdb.jobs.YumJob`
    :param list indications: Can either be a list of indication ids or a single
        indication id.
    :param new: A job instance stored as new.
    :type new: :py:class:`~lmi.software.yumdb.jobs.YumJob`
    """

    def __init__(self, indication_manager, job,
            indications=JM.IND_JOB_CHANGED, new=None):
        if not isinstance(indication_manager, IndicationManager):
            raise TypeError("indication_manager must be a subclass of"
                    " IndicationManager")
        if not isinstance(job, jobs.YumJob):
            raise TypeError("job must be an instance of YumJob")
        if new is not None and not isinstance(new, jobs.YumJob):
            raise TypeError("new must be an instance of YumJob")
        self._indication_manager = indication_manager
        self._job = job
        # snapshot of the job taken right now, used as the "old" instance
        self._old_instance = JOB_TO_MODEL(job)
        if new is not None:
            # BUGFIX: snapshot the *new* job object; previously this read
            # ``JOB_TO_MODEL(job)``, silently ignoring the *new* argument.
            new = JOB_TO_MODEL(new)
        self._new_instance = new
        self._indications = set()
        self.indication_ids = indications

    @property
    def job(self):
        """
        :returns: Associated job object.
        :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob`
        """
        return self._job

    @property
    def indication_ids(self):
        """
        :returns: Set of indication filter IDs.
        :rtype: set
        """
        return self._indications.copy()

    @indication_ids.setter
    def indication_ids(self, indication_ids):
        """
        Set the indication filter IDs.

        :param list indication_ids: Can be even single id.
        """
        if isinstance(indication_ids, basestring):
            indication_ids = set([indication_ids])
        self._indications = set(indication_ids)

    @cmpi_logging.trace_method
    def add_indication_ids(self, indication_ids):
        """
        Add filter IDs to the set of indications that shall be sent.

        :param list indication_ids: Can be even single id.
        """
        if isinstance(indication_ids, basestring):
            indication_ids = set([indication_ids])
        self._indications.update(indication_ids)

    @cmpi_logging.trace_method
    def snapshot(self):
        """
        Make a second CIM instance, overwriting previous one (not the first).
        """
        self._new_instance = JOB_TO_MODEL(self._job)

    @cmpi_logging.trace_method
    def send(self, make_snapshot=False):
        """
        Send all requested indications for given job.

        :param boolean make_snapshot: Whether to call :py:meth:`snapshot`
            before the indications are sent.
        :raises errors.IndicationError: If no filter id is set, or if a
            job-modification indication is requested without a new snapshot.
        """
        if not self._indications:
            raise errors.IndicationError(
                    "can not send any indication without id")
        if make_snapshot:
            self.snapshot()
        if (   JM.IND_JOB_CHANGED in self._indications
           and self._new_instance is None):
            raise errors.IndicationError("no snapshot made for modified job")
        # use the manager validated and stored at construction time instead
        # of looking up the singleton again
        indication_manager = self._indication_manager
        for fltr_id in self._indications:
            if fltr_id == JM.IND_JOB_CREATED:
                LOG().debug("sending instance creation indication for job %s",
                        self._job)
                indication_manager.send_instcreation(
                        self._new_instance if self._new_instance is not None
                                           else self._old_instance,
                        fltr_id)
            else:
                LOG().debug("sending instance modification indication for job"
                        " %s with ID: %s", self._job, fltr_id)
                indication_manager.send_instmodification(
                        self._old_instance, self._new_instance,
                        fltr_id)

class JobManager(threading.Thread):
    """
    Separate thread for managing queue of jobs requested by client.
    There are three kinds of jobs, that are handled differently:

      * asynchronous - kept in ``_async_jobs`` dictionary until job is
            deleted by request or it expires;
            no reply is sent to client upon job's completion
      * synchronous  - reply is sent to client after job's completion;
            no reference to the job is kept afterwards
      * job control  - they are not enqueued in ``_job_queue`` for
            :py:class:`YumWorker` to process, but are handled directly and in
            the *FIFO* order

    Both asynchronous and synchronous jobs are enqueued in ``_job_queue`` and
    will be sent to :py:class:`~lmi.software.yumdb.process.YumWorker` for
    processing. It's a priority queue sorting jobs by their priority.
    :py:class:`~lmi.software.yumdb.sessionmanager.SessionManager` is the actual
    object managing ``YumWorker`` process. All jobs are passed to it.
    """
    #: enumeration of actions, that may be enqueued in calendar
    ACTION_REMOVE = 0

    #: names for actions defined above, their values are indexes to this list
    ACTION_NAMES = ['remove']

    def __init__(self):
        threading.Thread.__init__(self, name="JobManager")

        #: Session manager shall be accessed via property ``session_manager``.
        #: It's instantiated when first needed.
        self._sessionmgr = None

        #: lock for critical access to _sessionmgr, _calendar,
        #: _async_jobs and _job_queue
        self._job_lock = threading.RLock()
        #: priority queue of jobs that are processed by YumWorker
        self._job_queue = []
        #: heap of scheduled events as (time, jobid, action) tuples
        self._calendar = []
        #: {jobid : job}
        self._async_jobs = {}
        #: Condition object being wait upon in the main loop of this thread.
        #: It's notified either when the new job request comes or when the
        #: YumWorker replies.
        self._job_pending = threading.Condition(self._job_lock)

        #: used to guard access to _finished set
        self._reply_lock = threading.Lock()
        #: used to wait for job to be processed and received
        self._reply_cond = threading.Condition(self._reply_lock)
        #: job ids are kept here for each finished job
        self._finished = set()

    # *************************************************************************
    # Private methods
    # *************************************************************************
    @cmpi_logging.trace_method
    def _control_job(self, job):
        """
        Function dispatching job to handler for particular
        :py:class:`~lmi.software.yumdb.jobs.YumJob` subclass.

        :param job: Control job to dispatch.
        :type job: :py:class:`~lmi.software.yumdb.jobs.YumJobControl`
        :returns: Result of the matched handler.
        :raises errors.UnknownJob: If there is no handler for *job*'s class.

        .. note::
            This can be called only from client thread.
        """
        try:
            handler = {
                # these are from YumDB client
                jobs.YumJobGetList     : self._handle_get_list,
                jobs.YumJobGet         : self._handle_get,
                jobs.YumJobGetByName   : self._handle_get_by_name,
                jobs.YumJobSetPriority : self._handle_set_priority,
                jobs.YumJobReschedule  : self._handle_reschedule,
                jobs.YumJobUpdate      : self._handle_update,
                jobs.YumJobDelete      : self._handle_delete,
                jobs.YumJobTerminate   : self._handle_terminate,
            }[job.__class__]
            LOG().info("processing control job %s", str(job))
        except KeyError:
            raise errors.UnknownJob("No handler for job \"%s\"." %
                    job.__class__.__name__)
        return handler(**job.job_kwargs)

    @cmpi_logging.trace_method
    def _enqueue_job(self, job):
        """
        Insert incoming job into :py:attr:`_job_queue` or handle it directly
        if it's a control job.

        :param job: Incoming job object.
        :type job: :py:class:`~lmi.software.yumdb.jobs.YumJob`

        .. note::
            This can be called only from client thread.
        """
        if isinstance(job, jobs.YumJobControl):
            result = job.RESULT_SUCCESS
            job.start()
            try:
                data = self._control_job(job)
            except Exception:   #pylint: disable=W0703
                result = job.RESULT_ERROR
                data = sys.exc_info()
                # traceback objects can not be passed between processes;
                # keep only its formatted form
                data = (data[0], data[1], traceback.format_tb(data[2]))
                LOG().exception("control job %s failed", job)
            self._finish_job(job, result, data)
        else:
            LOG().debug('job %s enqued for YumWorker to handle', job)
            # _job_queue is a heap ordered by job priority
            heapq.heappush(self._job_queue, job)
            if getattr(job, 'async', False) is True:
                ind = self._prepare_indication_for(job, JM.IND_JOB_CREATED)
                self._async_jobs[job.jobid] = job
                ind.send()
            self._job_pending.notify()

    @cmpi_logging.trace_method
    def _schedule_event(self, after, jobid, action):
        """
        Enqueue event into calendar. Event consists of *time*, *jobid* and
        *action*.

        :param float after: Number of seconds from now until the action
            shall be triggered.
        :param int jobid: Id of job which is a subject of action execution.
        :param int action: Value of action to do.

        .. note::
            This can be called only from client thread.
        """
        schedule_at = time.time() + after
        for (sched, jid, act) in self._calendar:
            if jid == jobid and act == action:
                if sched <= schedule_at:    # same event already scheduled
                    return
                # schedule it for early time
                LOG().debug('rescheduling action %s on job %d to take place'
                        ' after %d seconds (instead of %d)',
                        self.ACTION_NAMES[action], jid, after,
                        sched - schedule_at + after)
                self._calendar.remove((sched, jid, act))
                self._calendar.append((schedule_at, jid, act))
                # restore the heap invariant after manual modification
                heapq.heapify(self._calendar)
                return
        LOG().debug('scheduling action %s on job %d to take place after '
                ' %d seconds', self.ACTION_NAMES[action], jobid, after)
        heapq.heappush(self._calendar, (schedule_at, jobid, action))

    @cmpi_logging.trace_method
    def _run_event(self, jobid, action):
        """
        Process event from calendar.

        :param int jobid: Id of job that is a subject of action.
        :param int action: One of predefined actions to be executed on
            particular job.
        :raises ValueError: If *action* is not a supported action code.
        """
        LOG().info('running action %s on job(id=%d)',
                self.ACTION_NAMES[action], jobid)
        if action == self.ACTION_REMOVE:
            with self._job_lock:
                del self._async_jobs[jobid]
        else:
            msg = "unsupported action: %s" % action
            raise ValueError(msg)

    @cmpi_logging.trace_method
    def _prepare_indication_for(self, job, *args, **kwargs):
        """
        Convenience method making indication sender context manager.

        :param list args: Positional arguments passed after *job* to
            the constructor of :py:class:`JobIndicationSender`.
        :param dictionary kwargs: Its keyword arguments.
        :returns: Context manager for snapshotting and sending job related
            indications.
        :rtype: :py:class:`JobIndicationSender`
        """
        return JobIndicationSender(IndicationManager.get_instance(), job,
            *args, **kwargs)

    @property
    def session_manager(self):
        """
        Accessor of session manager. Object is instantiated upon first access.

        :rtype: :py:class:`lmi.software.yumdb.sessionmanager.SessionManager`
        """
        with self._job_lock:
            if self._sessionmgr is None:
                self._sessionmgr = SessionManager(self._job_pending)
                self._sessionmgr.start()
            return self._sessionmgr

    # *************************************************************************
    # Job handlers
    # *************************************************************************
    @job_handler()
    def _handle_get(self, job):
        """ Control job handler returning job object requested. """
        return job

    @job_handler(False)
    def _handle_get_list(self):
        """ Control job handler returning list of all asynchronous jobs. """
        with self._job_lock:
            return sorted(self._async_jobs.values())

    @job_handler(False)
    def _handle_get_by_name(self, target):
        """
        Control job handler returning job object requested by its name.

        :param string target: Name of job to find. ``Name`` property of
            available asynchronous jobs is queried.
        :raises errors.JobNotFound: If no job matches *target*.
        """
        for job in self._async_jobs.values():
            if 'name' in job.metadata and target == job.metadata['name']:
                return job
        raise errors.JobNotFound(target)

    @job_handler()
    def _handle_set_priority(self, job, new_priority):
        """
        Control job handler modifying job's priority and updating its position
        in queue.

        :param int new_priority: New priority value to assign.
        :returns: Modified job object.
        :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob`
        :raises TypeError: If *new_priority* is not an integer.
        """
        if not isinstance(new_priority, (int, long)):
            raise TypeError('priority must be an integer')
        if job.priority != new_priority:
            ind = self._prepare_indication_for(job)
            job.update(priority=new_priority)
            if job in self._job_queue:
                # priority affects heap ordering; restore the invariant
                heapq.heapify(self._job_queue)
            ind.send(True)
        return job

    @job_handler()
    def _handle_reschedule(self, job,
            delete_on_completion,
            time_before_removal):
        """
        Control job handler rescheduling job's deletion.

        :param job: Affected job object.
        :type job: :py:class:`~lmi.software.yumdb.jobs.YumJob`
        :param boolean delete_on_completion: Whether the job shall be
            automatically removed after its completion. The
            *time_before_removal* then comes into play.
        :param float time_before_removal: Number of seconds to wait before
            job's automatic removal.
        """
        if (   job.delete_on_completion == delete_on_completion
           and job.time_before_removal == time_before_removal):
            return
        if job.finished and job.delete_on_completion:
            # drop any already scheduled removal of this job
            for i, event in enumerate(self._calendar):
                if event[1] == job.jobid and event[2] == self.ACTION_REMOVE:
                    del self._calendar[i]
                    heapq.heapify(self._calendar)
                    break
        ind = self._prepare_indication_for(job)
        if delete_on_completion:
            schedule_at = time_before_removal
            if job.finished:
                # measure the delay from job's completion, not from now
                schedule_at = job.finished + schedule_at - time.time()
            self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE)
        job.delete_on_completion = delete_on_completion
        job.time_before_removal = time_before_removal
        ind.send(True)
        return job

    @job_handler()
    def _handle_update(self, job, data):
        """
        Control job handler updating any job metadata.

        :param dictionary data: Job attributes with associated values that
            shall be assigned to particular *job*.
        :returns: Modified job object.
        :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob`
        """
        ind = self._prepare_indication_for(job)
        job.update(**data)
        ind.send(True)
        return job

    @job_handler()
    def _handle_delete(self, job):
        """
        Control job handler deleting finished asynchronous job. If the *job* is
        not yet finished, the
        :py:exc:`lmi.software.yumdb.errors.InvalidJobState` is raised.

        :returns: Deleted job object.
        :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob`
        """
        if not job.finished:
            raise errors.InvalidJobState(
                    'can not delete unfinished job "%s"' % job)
        try:
            self._job_queue.remove(job)
            heapq.heapify(self._job_queue)
            LOG().debug('job "%s" removed from queue', job)
        except ValueError:
            # job was not in the queue (already popped by the worker loop)
            LOG().debug('job "%s" not started and not enqueued', job)
        del self._async_jobs[job.jobid]
        return job

    @job_handler()
    def _handle_terminate(self, job):
        """
        Control job handler terminating not started job.

        :returns: Terminated job object.
        :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob`
        :raises errors.InvalidJobState: If the job is running or already
            finished.
        """
        if job.started and not job.finished:
            raise errors.InvalidJobState('can not kill running job "%s"' % job)
        if job.finished:
            raise errors.InvalidJobState('job "%s" already finished' % job)
        self._job_queue.remove(job)
        heapq.heapify(self._job_queue)
        ind = self._prepare_indication_for(job)
        job.finish(result=job.RESULT_TERMINATED)
        ind.send(True)
        LOG().info('terminated not started job "%s"', job)
        return job

    # *************************************************************************
    # Public methods
    # *************************************************************************
    @cmpi_logging.trace_method
    def _finish_job(self, job, result, result_data):
        """
        This should be called on any processed job.

        If the *job* is synchronous, reply is sent at once. Otherwise the
        result is stored for later client's query in the job itself, and
        indications are sent.

        .. note::
            This shall be called only from :py:meth:`run` method.

        :param job: Processed job to finish.
        :type job: :py:class:`~lmi.software.yumdb.jobs.YumJob`
        :param int result: Result code that will be stored in a *job*.
        :param result_data: Resulting data object that is to be stored in a
            *job*.
        :returns: Job object.
        :raises errors.InvalidJobState: If the job is not in ``RUNNING``
            state.
        """
        if job.state != job.RUNNING:
            raise errors.InvalidJobState(
                    'can not finish not started job "%s"' % job)
        # only asynchronous jobs emit indications
        if getattr(job, 'async', False):
            ind = self._prepare_indication_for(job,
                    (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
        job.finish(result, result_data)
        if getattr(job, 'async', False):
            if job.delete_on_completion:
                # NOTE(review): default removal delay is read from the "Log"
                # configuration section — confirm this is the intended
                # section for MinimumTimeBeforeRemoval.
                default = Configuration.get_instance().get_safe(
                            'Log', 'MinimumTimeBeforeRemoval', float)
                schedule_at = max(job.time_before_removal, default)
                self._schedule_event(schedule_at, job.jobid,
                        self.ACTION_REMOVE)
            if result == job.RESULT_SUCCESS:
                ind.add_indication_ids(JM.IND_JOB_SUCCEEDED)
            elif result == job.RESULT_ERROR:
                ind.add_indication_ids(JM.IND_JOB_FAILED)
            ind.send(True)
        # wake up clients blocked in process() waiting for this job
        with self._reply_lock:
            self._finished.add(job.jobid)
            self._reply_cond.notifyAll()
        return job

    @cmpi_logging.trace_method
    def _get_job(self):
        """
        Pop the first job enqueued by client out of :py:attr:`_job_queue`.
        Job is started and returned.

        :returns: Job object. If the queue is empty, ``None`` is returned.
        :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob`
        """
        if self._job_queue:
            job = heapq.heappop(self._job_queue)
            if getattr(job, "async", False):
                ind = self._prepare_indication_for(job,
                        (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
                job.start()
                ind.send(True)
            else:
                job.start()
            return job

    @cmpi_logging.trace_method
    def process(self, job):
        """
        Enqueue given job and block until it's processed. For an
        asynchronous job, the job's id is returned immediately without
        waiting for its completion.

        :param job: Job to enqueue.
        :type job: :py:class:`~lmi.software.yumdb.jobs.YumJob`
        :returns: Modified job object with assigned result, or the job's id
            when the job is asynchronous.
        :rtype: :py:class:`~lmi.software.yumdb.jobs.YumJob` or int
        """
        with self._job_lock:
            self._enqueue_job(job)
        if getattr(job, 'async', False):
            return job.jobid
        with self._reply_lock:
            while job.jobid not in self._finished:
                self._reply_cond.wait()
            return job

    @cmpi_logging.trace_method
    def _clean_up(self):
        """
        Release the session manager with its separated process.
        This shall be called from the main loop of this thread.
        """
        with self._job_lock:
            if self._sessionmgr is not None:
                self._sessionmgr.join()
                # NOTE(review): set to False, not None — presumably a
                # deliberate sentinel so the session_manager property does
                # not instantiate a new SessionManager after shutdown;
                # confirm this is intended (the property only re-creates
                # the manager when the attribute is None).
                self._sessionmgr = False

    @cmpi_logging.trace_method
    def begin_session(self):
        """
        Nest into a session. This means: lock the yum database for exclusive
        use until the top-level session ends. The opposite method,
        :py:meth:`end_session` needs to be called same number of times in order
        for yum database to be released.
        """
        return self.session_manager.begin_session()

    @cmpi_logging.trace_method
    def end_session(self):
        """
        Emerge from session. When the last session ends, yum database is
        unlocked.

        .. see::
            :py:meth:`begin_session`
        """
        return self.session_manager.end_session()

    def run(self):
        """
        Main loop of *JobManager*'s thread. It works with just one job object
        at once until it's processed. It consumes enqueued jobs in
        :py:attr:`_job_queue` and passes them to session manager object. It
        also triggers actions scheduled in calendar.
        """
        LOG().info("%s thread started", self.name)
        # This points to the current job being processed by session manager.
        # If no job is being processed, it's ``None``.
        job = None
        with self._job_lock:
            while True:
                timeout = None

                # check calendar for scheduled events
                if self._calendar:
                    timeout = self._calendar[0][0] - time.time()
                # run all events whose scheduled time already passed
                while timeout is not None and timeout <= 0:
                    _, jobid, action = heapq.heappop(self._calendar)
                    self._run_event(jobid, action)
                    if self._calendar:
                        timeout = self._calendar[0][0] - time.time()
                    else:
                        timeout = None

                # handle processed job if any
                if job:
                    if self.session_manager.got_reply:
                        _, result, data = self.session_manager.pop_reply()
                        self._finish_job(job, result, data)
                        if isinstance(job, jobs.YumShutDown):
                            # shutdown request terminates the main loop
                            break
                        job = None

                # check newly enqueued jobs
                if job is None:
                    job = self._get_job()
                    if job:
                        self.session_manager.process(job, sync=False)

                # wait for any event
                LOG().debug('waiting on input queue for job%s',
                    (' with timeout %s' % timeout) if timeout else '')
                self._job_pending.wait(timeout=timeout)

        self._clean_up()
        LOG().info('%s thread terminating', self.name)