summaryrefslogtreecommitdiffstats
path: root/src/software/lmi/software/yumdb/jobmanager.py
blob: 6a360ef8d1e92173c2a2ef2bf5e7e42cf12feadd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
# Software Management Providers
#
# Copyright (C) 2012-2013 Red Hat, Inc.  All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
#
# Authors: Michal Minar <miminar@redhat.com>
#
"""
This is a module for ``JobManager`` which is a separate thread of
``YumWorker`` process. It keeps a cache of asynchronous jobs and handles
input and output queues.

This module uses its own logging facilities because it runs in separeted
process not having access to broker logging features.

Before using ``JobManager``, module's variable ``JOB_TO_MODEL`` should
be set to callable taking ``YumJob`` instance and returning its matching
CIM abstraction instance.
"""
import heapq
import inspect
import logging
import Queue
import sys
import threading
import time
import traceback

from lmi.providers.IndicationManager import IndicationManager
from lmi.providers.JobManager import JobManager as JM
from lmi.software.yumdb import errors
from lmi.software.yumdb import jobs
from lmi.software.util import cmpi_logging
from lmi.software.util import Configuration

# This is a callable, which must be initialized before JobManager is used.
# It should be a pointer to function, which takes a job and returns
# corresponding CIM instance. It's used for sending indications.
JOB_TO_MODEL = lambda job: None

# replacement for cmpi_logging.logger
LOG = None

# *****************************************************************************
# Decorators
# *****************************************************************************
def job_handler(job_from_target=True):
    """
    Decorator for JobManager methods serving as handlers for control jobs.

    Decorator locks the job_lock of manager's instance.
    """
    def _wrapper_jft(method):
        """
        It consumes "target" keyword argument (which is job's id) and makes
        it an instance of YumJob. The method is then called with "job" argument
        instead of "target".
        """
        logged = cmpi_logging.trace_method(method)

        def _new_func(self, *args, **kwargs):
            """Wrapper around method."""
            if 'target' in kwargs:
                kwargs['job'] = kwargs.pop('target')
            callargs = inspect.getcallargs(method, self, *args, **kwargs)
            target = callargs.pop('job')
            with self._job_lock:                #pylint: disable=W0212
                if not target in self._async_jobs:  #pylint: disable=W0212
                    raise errors.JobNotFound(target)
                job = self._async_jobs[target]  #pylint: disable=W0212
                callargs['job'] = job
                return logged(**callargs)
        return _new_func

    def _simple_wrapper(method):
        """Just locks the job lock."""
        def _new_func(self, *args, **kwargs):
            """Wrapper around method."""
            with self._job_lock:                #pylint: disable=W0212
                return method(self, *args, **kwargs)
        return _new_func

    if job_from_target:
        return _wrapper_jft
    else:
        return _simple_wrapper

class JobIndicationSender(object):
    """
    Makes creation and sending of indications easy. It keeps a reference
    to job, which can be *snapshotted* for making CIM instance out of it.
    These instances are then used to send indications via IndicationManager.

    Typical usage::

        sender = JobIndicationSender(im, job, [fltr_id1, fltr_id2])
            ... # modify job
        sender.snapshot()
        sender.send()

    **Note** that number of kept CIM instances won't exceed 2. First one
    is created upon instantiation and the second one be calling
    ``snapshot()``. Any successive call to ``snapshot()`` will overwrite
    the second instance.
    """

    def __init__(self, indication_manager, job,
            indications=JM.IND_JOB_CHANGED, new=None):
        """
        :param job (``YumJob``) Is job instance, which will be immediately
            snapshoted as old instance and later as a new one.
        :param indications (``list``) Can either be a list of indication ids
            or a single indication id.
        :param new (``YumJob``) A job instance stored as new.
        """
        if not isinstance(indication_manager, IndicationManager):
            raise TypeError("indication_manager must be a subclass of"
                    " IndicationManager")
        if not isinstance(job, jobs.YumJob):
            raise TypeError("job must be an instance of YumJob")
        if not new is None and not isinstance(new, jobs.YumJob):
            raise TypeError("new must be an instance of YumJob")
        self._indication_manager = indication_manager
        self._job = job
        self._old_instance = JOB_TO_MODEL(job)
        if new is not None:
            new = JOB_TO_MODEL(job)
        self._new_instance = new
        self._indications = set()
        self.indication_ids = indications

    @property
    def job(self):
        """
        Return instance of ``YumJob``.
        """
        return self._job

    @property
    def indication_ids(self):
        """
        Return set of indication filter IDs.
        """
        return self._indications.copy()

    @indication_ids.setter
    def indication_ids(self, indication_ids):
        """
        Set the indication filter IDs.

        :param indication_ids (``list``) Can be even single id.
        """
        if isinstance(indication_ids, basestring):
            indication_ids = set([indication_ids])
        self._indications = set(indication_ids)

    @cmpi_logging.trace_method
    def add_indication_ids(self, indication_ids):
        """
        Add filter IDs.
        """
        if isinstance(indication_ids, basestring):
            indication_ids = set([indication_ids])
        self._indications.update(indication_ids)

    @cmpi_logging.trace_method
    def snapshot(self):
        """
        Make a second CIM instance, overwriting previous one (not the first).
        """
        self._new_instance = JOB_TO_MODEL(self._job)

    @cmpi_logging.trace_method
    def send(self, make_snapshot=False):
        """
        Send all requested indications for given job.
        """
        if not self._indications:
            raise errors.IndicationError(
                    "can not send any indication without id")
        if make_snapshot:
            self.snapshot()
        if (   JM.IND_JOB_CHANGED in self._indications
           and self._new_instance is None):
            raise errors.IndicationError("no snapshot made for modified job")
        for fltr_id in self._indications:
            if fltr_id == JM.IND_JOB_CREATED:
                LOG.debug("sending instance creation indication for job %s",
                        self._job)
                self._indication_manager.send_instcreation(
                        self._new_instance if self._new_instance is not None
                                           else self._old_instance,
                        fltr_id)
            else:
                LOG.debug("sending instance modification indication for job %s"
                        " with ID: %s", self._job, fltr_id)
                self._indication_manager.send_instmodification(
                        self._old_instance, self._new_instance,
                        fltr_id)

class JobManager(threading.Thread):
    """
    Separate thread for managing queue of jobs requested by client.
    There are three kinds of jobs, that are handled differently:
      * asynchronous - kept in _async_jobs dictionary until job is
            deleted by request or it expires;
            no reply is sent to client upon job's completion
      * synchronous  - reply is sent to client after job's completion;
            no reference to the job is kept afterwards
      * job control  - they are not enqueued in _job_queue for YumWorker
            to process, but are handled directly and in the FIFO order

    Both asynchronous and synchronous jobs are enqueued in _job_queue
    for YumWorker to obtain them. It's a priority queue sorting jobs by their
    priority.
    """
    # enumeration of actions, that may be enqueued in calendar
    ACTION_REMOVE = 0

    ACTION_NAMES = ['remove']

    def __init__(self, queue_in, queue_out, indication_manager):
        threading.Thread.__init__(self, name="JobManager")
        self._queue_in = queue_in
        self._queue_out = queue_out
        self._indication_manager = indication_manager
        self._terminate = False

        # (time, jobid, action)
        self._calendar = []
        # {jobid : job}
        self._async_jobs = {}

        # lock for critical access to _calendar, _async_jobs and _job_queue
        self._job_lock = threading.RLock()
        # priority queue of jobs that are processed by YumWorker
        self._job_queue = []
        # condition for YumWorker waiting on empty _job_queue
        self._job_enqueued = threading.Condition(self._job_lock)

    # *************************************************************************
    # Private methods
    # *************************************************************************
    @cmpi_logging.trace_method
    def _control_job(self, job):
        """
        Function dispatching job to handler for particular YumJob subclass.
        """
        try:
            handler = {
                # these are from YumDB client
                jobs.YumJobGetList     : self._handle_get_list,
                jobs.YumJobGet         : self._handle_get,
                jobs.YumJobGetByName   : self._handle_get_by_name,
                jobs.YumJobSetPriority : self._handle_set_priority,
                jobs.YumJobReschedule  : self._handle_reschedule,
                jobs.YumJobUpdate      : self._handle_update,
                jobs.YumJobDelete      : self._handle_delete,
                jobs.YumJobTerminate   : self._handle_terminate,
            }[job.__class__]
            LOG.info("processing control job %s", str(job))
        except KeyError:
            raise errors.UnknownJob("No handler for job \"%s\"." %
                    job.__class__.__name__)
        return handler(**job.job_kwargs)

    @cmpi_logging.trace_method
    def _enqueue_job(self, job):
        """
        Insert incoming job into _job_queue.
        """
        if isinstance(job, jobs.YumJobControl):
            result = job.RESULT_SUCCESS
            job.start()
            try:
                data = self._control_job(job)
            except Exception:   #pylint: disable=W0703
                result = job.RESULT_ERROR
                data = sys.exc_info()
                data = (data[0], data[1], traceback.format_tb(data[2]))
                LOG.exception("control job %s failed", job)
            job.finish(result, data)
            LOG.debug("sending reply for %s: (%s, %s)", job,
                    job.ResultNames[job.result], job.result_data)
            self._queue_out.put(job)
        else:
            if job is None:
                LOG.debug('received terminating command')
                self._terminate = True
            LOG.debug('job %s enqued for YumWorker to handle', job)
            heapq.heappush(self._job_queue, job)
            if getattr(job, 'async', False) is True:
                ind = self._prepare_indication_for(job, JM.IND_JOB_CREATED)
                self._async_jobs[job.jobid] = job
                ind.send()
            self._job_enqueued.notify()

    @cmpi_logging.trace_method
    def _schedule_event(self, after, jobid, action):
        """
        Enqueue event into calendar. Event consists of time, jobid and
        action.
        """
        schedule_at = time.time() + after
        for (sched, jid, act) in self._calendar:
            if jid == jobid and act == action:
                if sched <= schedule_at:    # same event already scheduled
                    return
                # schedule it for early time
                LOG.debug('rescheduling action %s on job %d to take place'
                        ' after %d seconds (instead of %d)',
                        self.ACTION_NAMES[action], jid, after,
                        sched - schedule_at + after)
                self._calendar.remove((sched, jid, act))
                self._calendar.append((schedule_at, jid, act))
                heapq.heapify(self._calendar)
                return
        LOG.debug('scheduling action %s on job %d to take place after '
                ' %d seconds', self.ACTION_NAMES[action], jobid, after)
        heapq.heappush(self._calendar, (schedule_at, jobid, action))

    @cmpi_logging.trace_method
    def _run_event(self, jobid, action):
        """
        Process event from calendar.
        """
        if action == self.ACTION_REMOVE:
            with self._job_lock:
                del self._async_jobs[jobid]
        else:
            msg = "unsupported action: %s" % action
            raise ValueError(msg)

    @cmpi_logging.trace_method
    def _prepare_indication_for(self, job, *args, **kwargs):
        """
        Return instance of ``JobIndicationSender``.
        """
        return JobIndicationSender(self._indication_manager, job,
            *args, **kwargs)

    # *************************************************************************
    # Job handlers
    # *************************************************************************
    @job_handler()
    def _handle_get(self, job):     #pylint: disable=R0201
        """@return job object"""
        return job

    @job_handler(False)
    def _handle_get_list(self):
        """@return list of all asynchronous jobs"""
        with self._job_lock:
            return sorted(self._async_jobs.values())

    @job_handler(False)
    def _handle_get_by_name(self, target):
        """@return job object filtered by name"""
        for job in self._async_jobs.values():
            if 'name' in job.metadata and target == job.metadata['name']:
                return job
        raise errors.JobNotFound(target)

    @job_handler()
    def _handle_set_priority(self, job, new_priority):
        """
        Modify job's priority and updates its position in queue.
        @return modified job object
        """
        if not isinstance(new_priority, (int, long)):
            raise TypeError('priority must be an integer')
        if job.priority != new_priority:
            ind = self._prepare_indication_for(job)
            job.update(priority=new_priority)
            if job in self._job_queue:
                heapq.heapify(self._job_queue)
            ind.send(True)
        return job

    @job_handler()
    def _handle_reschedule(self, job,
            delete_on_completion,
            time_before_removal):
        """
        Changes job's schedule for its deletion.
        """
        if (   job.delete_on_completion == delete_on_completion
           and job.time_before_removal == time_before_removal):
            return
        if job.finished and job.delete_on_completion:
            for i, event in enumerate(self._calendar):
                if event[1] == job.jobid and event[2] == self.ACTION_REMOVE:
                    del self._calendar[i]
                    heapq.heapify(self._calendar)
                    break
        ind = self._prepare_indication_for(job)
        if delete_on_completion:
            schedule_at = time_before_removal
            if job.finished:
                schedule_at = job.finished + schedule_at - time.time()
            self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE)
        job.delete_on_completion = delete_on_completion
        job.time_before_removal = time_before_removal
        ind.send(True)
        return job

    @job_handler()
    def _handle_update(self, job, data):    #pylint: disable=R0201
        """
        Updates any job metadata.
        """
        ind = self._prepare_indication_for(job)
        job.update(**data)
        ind.send(True)
        return job

    @job_handler()
    def _handle_delete(self, job):
        """
        Deletes finished asynchronous job.
        """
        if not job.finished:
            raise errors.InvalidJobState(
                    'can not delete unfinished job "%s"' % job)
        try:
            self._job_queue.remove(job)
            heapq.heapify(self._job_queue)
            LOG.debug('job "%s" removed from queue', job)
        except ValueError:
            LOG.debug('job "%s" not started and not enqueued', job)
        del self._async_jobs[job.jobid]
        return job

    @job_handler()
    def _handle_terminate(self, job):
        """
        Terminates not started job.
        """
        if job.started and not job.finished:
            raise errors.InvalidJobState('can not kill running job "%s"' % job)
        if job.finished:
            raise errors.InvalidJobState('job "%s" already finished' % job)
        self._job_queue.remove(job)
        heapq.heapify(self._job_queue)
        ind = self._prepare_indication_for(job)
        job.finish(result=job.RESULT_TERMINATED)
        ind.send(True)
        LOG.info('terminated not started job "%s"', job)
        return job

    # *************************************************************************
    # Public properties
    # *************************************************************************
    @property
    def queue_in(self):
        """Incoming queue for YumJob instances."""
        return self._queue_in

    @property
    def queue_out(self):
        """Output queue for results."""
        return self._queue_out

    # *************************************************************************
    # Public methods
    # *************************************************************************
    @cmpi_logging.trace_method
    def finish_job(self, job, result, result_data):
        """
        This should be called for any job by YumWorker after the job is
        processed.

        If the job is synchronous, reply is send at once. Otherwise the result
        is stored for later client's query in the job itself.
        """
        with self._job_lock:
            if job.state != job.RUNNING:
                raise errors.InvalidJobState(
                        'can not finish not started job "%s"' % job)
            if getattr(job, 'async', False):
                ind = self._prepare_indication_for(job,
                        (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
            job.finish(result, result_data)
            if getattr(job, 'async', False):
                if job.delete_on_completion:
                    default = Configuration.get_instance().get_safe(
                                'Log', 'MinimumTimeBeforeRemoval', float)
                    schedule_at = max(job.time_before_removal, default)
                    self._schedule_event(schedule_at, job.jobid,
                            self.ACTION_REMOVE)
                if result == job.RESULT_SUCCESS:
                    ind.add_indication_ids(JM.IND_JOB_SUCCEEDED)
                elif result == job.RESULT_ERROR:
                    ind.add_indication_ids(JM.IND_JOB_FAILED)
                ind.send(True)
            else:
                LOG.debug("sending reply for %s: (%s, %s)", job,
                        job.ResultNames[job.result], job.result_data)
                self._queue_out.put(job)
            return job

    @cmpi_logging.trace_method
    def get_job(self, block=True, timeout=None):
        """
        Method supposed to be used only by YumWorker. It pops the first job
        from _job_queue, starts it and returns it.
        """
        start = time.time()
        with self._job_lock:
            if len(self._job_queue) == 0 and not block:
                raise Queue.Empty
            while len(self._job_queue) == 0:
                if timeout:
                    LOG.debug('waiting for job for %s seconds' % timeout)
                self._job_enqueued.wait(timeout)
                if len(self._job_queue) == 0:
                    now = time.time()
                    if timeout > now - start:
                        raise Queue.Empty
            job = heapq.heappop(self._job_queue)
            if job is not None:
                if getattr(job, "async", False):
                    ind = self._prepare_indication_for(job,
                            (JM.IND_JOB_CHANGED, JM.IND_JOB_PERCENT_UPDATED))
                    job.start()
                    ind.send(True)
                else:
                    job.start()
            return job

    def run(self):
        """The entry point of thread."""
        global LOG      #pylint: disable=W0603
        LOG = logging.getLogger(__name__)
        LOG.info("%s thread started", self.name)

        while self._terminate is False:
            try:
                timeout = None
                with self._job_lock:
                    if len(self._calendar) > 0:
                        timeout = self._calendar[0][0] - time.time()
                LOG.debug('waiting on input queue for job%s',
                    (' with timeout %s' % timeout) if timeout else '')
                job = self._queue_in.get(timeout=timeout)
                with self._job_lock:
                    self._enqueue_job(job)
                    while not self._queue_in.empty():
                        # this won't throw
                        self._enqueue_job(self._queue_in.get_nowait())

            except Queue.Empty:
                with self._job_lock:
                    while (   len(self._calendar)
                          and self._calendar[0][0] < time.time()):
                        _, jobid, action = heapq.heappop(self._calendar)
                        LOG.info('running action %s on job(id=%d)',
                                self.ACTION_NAMES[action], jobid)
                        self._run_event(jobid, action)
        LOG.info('%s thread terminating', self.name)