summaryrefslogtreecommitdiffstats
path: root/src/software/openlmi/software/yumdb/jobmanager.py
blob: 872e81fecf030743d89d41d0436a2cb5cd1fb878 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
# Software Management Providers
#
# Copyright (C) 2012-2013 Red Hat, Inc.  All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
#
# Authors: Michal Minar <miminar@redhat.com>
#
"""
This is a module for JobManager which is a separate thread of
YumWorker process. It keeps a cache of asynchronous jobs and handles
input and output queues.
"""
import functools
import heapq
import inspect
import logging
import Queue
import sys
import threading
import time
import traceback

from openlmi.software.yumdb import errors, jobs
from openlmi.software.yumdb.util import trace_function

# Minimum time to keep asynchronous job in cache after completion. In seconds.
# JobManager.finish_job() uses it as a lower bound for job's
# time_before_removal when it schedules removal of a finished job.
MINIMUM_TIME_BEFORE_REMOVAL = 10

# Module logger. It stays None until JobManager.run() assigns
# logging.getLogger(__name__) to it.
LOG = None

# *****************************************************************************
# Decorators
# *****************************************************************************
def job_handler(job_from_target=True):
    """
    Decorator factory for JobManager methods serving as handlers for
    control jobs.

    Every produced wrapper holds the job_lock of manager's instance while
    the handler runs and keeps the name and docstring of the handler.

    @param job_from_target when True, the "job" argument of handler (also
    accepted under the "target" keyword) is taken as job's id and is
    replaced with the matching object from manager's cache of asynchronous
    jobs before the handler is called; when False, the handler is just
    called with the lock held
    @return decorator to be applied on a method of JobManager
    """
    def _wrapper_jft(method):
        """
        It consumes "target" keyword argument (which is job's id) and makes
        it an instance of YumJob. The method is then called with "job" argument
        instead of "target".

        The wrapper raises errors.JobNotFound if there is no cached job
        with given id.
        """
        logged = trace_function(method)

        @functools.wraps(method)
        def _new_func(self, *args, **kwargs):
            """Wrapper around method."""
            if 'target' in kwargs:
                kwargs['job'] = kwargs.pop('target')
            # Resolve all arguments by name, so the job's id can be replaced
            # no matter whether it was passed positionally or as a keyword.
            callargs = inspect.getcallargs(method, self, *args, **kwargs)
            target = callargs.pop('job')
            with self._job_lock:                #pylint: disable=W0212
                if not target in self._async_jobs:  #pylint: disable=W0212
                    raise errors.JobNotFound(target)
                job = self._async_jobs[target]  #pylint: disable=W0212
                callargs['job'] = job
                return logged(**callargs)
        return _new_func

    def _simple_wrapper(method):
        """Just locks the job lock."""
        @functools.wraps(method)
        def _new_func(self, *args, **kwargs):
            """Wrapper around method."""
            with self._job_lock:                #pylint: disable=W0212
                return method(self, *args, **kwargs)
        return _new_func

    if job_from_target:
        return _wrapper_jft
    else:
        return _simple_wrapper

class JobManager(threading.Thread):
    """
    Separate thread for managing queue of jobs requested by client.
    There are three kinds of jobs, that are handled differently:
      * asynchronous - kept in _async_jobs dictionary until job is
            deleted by request or it expires;
            no reply is sent to client upon job's completion
      * synchronous  - reply is sent to client after job's completion;
            no reference to the job is kept afterwards
      * job control  - they are not enqueued in _job_queue for YumWorker
            to process, but are handled directly and in the FIFO order

    Both asynchronous and synchronous jobs are enqueued in _job_queue
    for YumWorker to obtain them. It's a priority queue sorting jobs by their
    priority.
    """
    # enumeration of actions that may be enqueued in calendar
    # ACTION_REMOVE: delete the job from the cache of asynchronous jobs
    ACTION_REMOVE = 0

    # names of actions indexed by action's value; used in log messages
    ACTION_NAMES = ['remove']

    def __init__(self, queue_in, queue_out):
        """
        @param queue_in queue from which run() takes incoming jobs
        @param queue_out queue where finished jobs are put as replies
        """
        threading.Thread.__init__(self, name="JobManager")
        # set when the terminating command arrives; makes run() return
        self._terminate = False
        self._queue_out = queue_out
        self._queue_in = queue_in

        # guards every access to _calendar, _async_jobs and _job_queue
        self._job_lock = threading.RLock()
        # notified each time something is pushed to _job_queue; it shares
        # _job_lock, so get_job() can sleep on it until a job arrives
        self._job_enqueued = threading.Condition(self._job_lock)

        # heap of jobs waiting to be processed by YumWorker
        self._job_queue = list()
        # cache of asynchronous jobs in the form {jobid : job}
        self._async_jobs = dict()
        # heap of planned events in the form (time, jobid, action)
        self._calendar = list()

    # *************************************************************************
    # Private methods
    # *************************************************************************
    @trace_function
    def _control_job(self, job):
        """
        Pass the control job to the handler registered for its class.

        @param job control job; its job_kwargs are given to the handler
        as keyword arguments
        @return whatever the handler returns
        @raise errors.UnknownJob if no handler exists for job's class
        """
        handlers = {
            # these are from YumDB client
            jobs.YumJobGetList     : self._handle_get_list,
            jobs.YumJobGet         : self._handle_get,
            jobs.YumJobGetByName   : self._handle_get_by_name,
            jobs.YumJobSetPriority : self._handle_set_priority,
            jobs.YumJobReschedule  : self._handle_reschedule,
            jobs.YumJobUpdate      : self._handle_update,
            jobs.YumJobDelete      : self._handle_delete,
            jobs.YumJobTerminate   : self._handle_terminate,
        }
        job_class = job.__class__
        try:
            handler = handlers[job_class]
            LOG.info("processing control job %s", str(job))
        except KeyError:
            raise errors.UnknownJob("No handler for job \"%s\"." %
                    job.__class__.__name__)
        return handler(**job.job_kwargs)

    @trace_function
    def _enqueue_job(self, job):
        """
        Take care of one incoming job.

        Control job is processed at once and sent to the output queue
        together with its result (or with details of the exception it
        raised). Anything else, including None which is the terminating
        command, is pushed to _job_queue for YumWorker.

        The condition is notified here, so the caller has to hold
        _job_lock (run() does).
        """
        if not isinstance(job, jobs.YumJobControl):
            if job is None:
                LOG.debug('received terminating command')
                self._terminate = True
            LOG.debug('job %s enqued for YumWorker to handle', job)
            heapq.heappush(self._job_queue, job)
            is_async = getattr(job, 'async', False)
            if is_async is True:
                self._async_jobs[job.jobid] = job
            self._job_enqueued.notify()
            return

        result = job.RESULT_SUCCESS
        job.start()
        try:
            data = self._control_job(job)
        except Exception:   #pylint: disable=W0703
            result = job.RESULT_ERROR
            exc_type, exc_value, exc_tb = sys.exc_info()
            # traceback object is replaced with its formatted lines
            data = (exc_type, exc_value, traceback.format_tb(exc_tb))
            LOG.exception("control job %s failed", job)
        job.finish(result, data)
        LOG.debug("sending reply for %s: (%s, %s)", job,
                job.ResultNames[job.result], job.result_data)
        self._queue_out.put(job)

    @trace_function
    def _schedule_event(self, after, jobid, action):
        """
        Put an event (time, jobid and action) into calendar.

        Nothing is done if the same action is already planned for the job
        at the same or earlier time. An event planned for later time is
        moved to the earlier one.

        @param after number of seconds from now until the event is due
        @param jobid id of the job the event applies to
        @param action one of ACTION_* values
        """
        schedule_at = time.time() + after
        planned = None
        for event in self._calendar:
            if event[1] == jobid and event[2] == action:
                planned = event
                break

        if planned is None:
            LOG.debug('scheduling action %s on job %d to take place after '
                    ' %d seconds', self.ACTION_NAMES[action], jobid, after)
            heapq.heappush(self._calendar, (schedule_at, jobid, action))
            return

        sched, jid, act = planned
        if sched <= schedule_at:    # same event already scheduled
            return
        # schedule it for early time
        LOG.debug('rescheduling action %s on job %d to take place'
                ' after %d seconds (instead of %d)',
                self.ACTION_NAMES[action], jid, after,
                sched - schedule_at + after)
        self._calendar.remove(planned)
        self._calendar.append((schedule_at, jid, act))
        heapq.heapify(self._calendar)

    @trace_function
    def _run_event(self, jobid, action):
        """
        Process event from calendar.

        @param jobid id of the job the event applies to
        @param action one of ACTION_* values
        @raise ValueError if the action is not supported
        """
        if action != self.ACTION_REMOVE:
            msg = "unsupported action: %s" % action
            raise ValueError(msg)
        with self._job_lock:
            # The job may already be gone from the cache (e.g. deleted on
            # client's request). An outdated event must not raise, because
            # run() handles only Queue.Empty and the thread would end.
            if self._async_jobs.pop(jobid, None) is None:
                LOG.debug('job(id=%s) scheduled for removal is not cached'
                        ' anymore', jobid)

    # *************************************************************************
    # Job handlers
    # *************************************************************************
    @job_handler()
    def _handle_get(self, job):     #pylint: disable=R0201
        """
        The job_handler decorator has already replaced job's id with the
        cached job object, so the object is just passed back.

        @return job object
        """
        return job

    @job_handler(False)
    def _handle_get_list(self):
        """@return sorted list of all cached asynchronous jobs"""
        with self._job_lock:
            cached_jobs = self._async_jobs.values()
            return sorted(cached_jobs)

    @job_handler(False)
    def _handle_get_by_name(self, target):
        """
        Find cached asynchronous job by the "name" item of its metadata.

        @param target name to look for
        @return the first job found with matching name
        @raise errors.JobNotFound if no cached job has such a name
        """
        for candidate in self._async_jobs.values():
            metadata = candidate.metadata
            if 'name' not in metadata:
                continue
            if target == metadata['name']:
                return candidate
        raise errors.JobNotFound(target)

    @job_handler()
    def _handle_set_priority(self, job, new_priority):
        """
        Change priority of job and restore ordering of the queue, if the
        job is waiting there.

        @param job job object (made from job's id by job_handler)
        @param new_priority integer
        @return modified job object
        @raise TypeError if new_priority is not an integer
        """
        if not isinstance(new_priority, (int, long)):
            raise TypeError('priority must be an integer')
        if job.priority == new_priority:
            # nothing to change
            return job
        job.update(priority=new_priority)
        queue = self._job_queue
        if job in queue:
            heapq.heapify(queue)
        return job

    @job_handler()
    def _handle_reschedule(self, job,
            delete_on_completion,
            time_before_removal):
        """
        Changes job's schedule for its deletion.

        Removal is put into calendar only for a job that is already
        finished. For unfinished job just the attributes are stored;
        finish_job() reads them and schedules the removal when the job
        completes. Scheduling it here would count the time from now and
        could remove a job that is still waiting or running.

        @param job job object (made from job's id by job_handler)
        @param delete_on_completion whether the job shall be removed from
        cache after its completion
        @param time_before_removal number of seconds to keep the finished
        job in cache
        @return modified job object, or None if nothing has changed
        """
        if (   job.delete_on_completion == delete_on_completion
           and job.time_before_removal == time_before_removal):
            return
        if job.finished and job.delete_on_completion:
            # forget the removal planned according to previous settings
            for i, event in enumerate(self._calendar):
                if event[1] == job.jobid and event[2] == self.ACTION_REMOVE:
                    del self._calendar[i]
                    heapq.heapify(self._calendar)
                    break
        if delete_on_completion and job.finished:
            # job.finished is used as the time of completion here; the
            # remaining time is kept non-negative, so an already expired
            # job is removed as soon as possible instead of being
            # scheduled into the past
            schedule_at = max(0,
                    job.finished + time_before_removal - time.time())
            self._schedule_event(schedule_at, job.jobid, self.ACTION_REMOVE)
        job.delete_on_completion = delete_on_completion
        job.time_before_removal = time_before_removal
        return job

    @job_handler()
    def _handle_update(self, job, data):    #pylint: disable=R0201
        """
        Updates any job metadata.

        @param job job object (made from job's id by job_handler)
        @param data dictionary passed to job.update() as keyword arguments
        @return modified job object
        """
        job.update(**data)
        return job

    @job_handler()
    def _handle_delete(self, job):
        """
        Deletes finished asynchronous job.

        The job is removed from the queue (if it is there), from calendar
        and from the cache of asynchronous jobs.

        @param job job object (made from job's id by job_handler)
        @return deleted job object
        @raise errors.InvalidJobState if the job is not finished
        """
        if not job.finished:
            raise errors.InvalidJobState(
                    'can not delete unfinished job "%s"' % job)
        try:
            self._job_queue.remove(job)
            heapq.heapify(self._job_queue)
            LOG.debug('job "%s" removed from queue', job)
        except ValueError:
            LOG.debug('job "%s" not started and not enqueued', job)
        # Forget events planned for the job, so the calendar does not refer
        # to a job that is no longer cached.
        kept = [event for event in self._calendar if event[1] != job.jobid]
        if len(kept) != len(self._calendar):
            self._calendar[:] = kept
            heapq.heapify(self._calendar)
        del self._async_jobs[job.jobid]
        return job

    @job_handler()
    def _handle_terminate(self, job):
        """
        Terminate job that has not been started yet. The job is taken out
        of the queue and finished with RESULT_TERMINATED.

        @param job job object (made from job's id by job_handler)
        @return terminated job object
        @raise errors.InvalidJobState if the job is running or finished
        """
        if not job.finished:
            if job.started:
                raise errors.InvalidJobState(
                        'can not kill running job "%s"' % job)
        else:
            raise errors.InvalidJobState('job "%s" already finished' % job)
        queue = self._job_queue
        queue.remove(job)
        heapq.heapify(queue)
        job.finish(result=job.RESULT_TERMINATED)
        LOG.info('terminated not started job "%s"', job)
        return job

    # *************************************************************************
    # Public properties
    # *************************************************************************
    @property
    def queue_in(self):
        """
        Incoming queue for YumJob instances. It is the object given to
        the constructor; run() reads jobs from it.
        """
        return self._queue_in

    @property
    def queue_out(self):
        """
        Output queue for results. It is the object given to the
        constructor; finished jobs are put there as replies.
        """
        return self._queue_out

    # *************************************************************************
    # Public methods
    # *************************************************************************
    @trace_function
    def finish_job(self, job, result, result_data):
        """
        To be called by YumWorker for each job it has processed.

        The result is stored in the job. Synchronous job is then sent to
        the output queue as a reply. Asynchronous job stays in cache for
        client's later query; if it has delete_on_completion set, its
        removal is scheduled after time_before_removal seconds, but not
        sooner than after MINIMUM_TIME_BEFORE_REMOVAL.

        @param job job in RUNNING state
        @param result result code passed to job.finish()
        @param result_data result data passed to job.finish()
        @return finished job object
        @raise errors.InvalidJobState if the job is not running
        """
        with self._job_lock:
            if job.state != job.RUNNING:
                raise errors.InvalidJobState(
                        'can not finish not started job "%s"' % job)
            job.finish(result, result_data)
            if not getattr(job, 'async', False):
                LOG.debug("sending reply for %s: (%s, %s)", job,
                        job.ResultNames[job.result], job.result_data)
                self._queue_out.put(job)
            elif job.delete_on_completion:
                keep_for = max(job.time_before_removal,
                        MINIMUM_TIME_BEFORE_REMOVAL)
                self._schedule_event(keep_for, job.jobid, self.ACTION_REMOVE)
            return job

    @trace_function
    def get_job(self, block=True, timeout=None):
        """
        Method supposed to be used only by YumWorker. It pops the first job
        from _job_queue, starts it and returns it.

        @param block whether to wait for a job when the queue is empty
        @param timeout maximum number of seconds to wait for a job in total;
        None means no limit; it is used only if block is True
        @return started job, or None if the terminating command was popped
        @raise Queue.Empty if the queue is empty and either block is False
        or no job arrived within timeout
        """
        start = time.time()
        with self._job_lock:
            if len(self._job_queue) == 0 and not block:
                raise Queue.Empty
            while len(self._job_queue) == 0:
                remaining = None
                if timeout is not None:
                    # The wait may end before a job is available, so the
                    # rest of the time is computed again on each pass;
                    # timeout thus limits the total time spent here.
                    remaining = timeout - (time.time() - start)
                    if remaining <= 0:
                        raise Queue.Empty
                    LOG.debug('waiting for job for %s seconds', remaining)
                self._job_enqueued.wait(remaining)
            job = heapq.heappop(self._job_queue)
            if job is not None:
                job.start()
            return job

    def run(self):
        """
        The entry point of thread.

        Until the terminating command is received, it waits on the input
        queue and hands every incoming job to _enqueue_job(). The wait
        lasts at most until the nearest event in calendar is due; when it
        times out, all due events are run.
        """
        global LOG      #pylint: disable=W0603
        LOG = logging.getLogger(__name__)
        LOG.info("%s thread started", self.name)

        while self._terminate is False:
            try:
                timeout = None
                with self._job_lock:
                    if len(self._calendar) > 0:
                        # An overdue event gives a negative difference and
                        # Queue.Queue.get() raises ValueError for negative
                        # timeout, which would end this thread.
                        timeout = max(0, self._calendar[0][0] - time.time())
                LOG.debug('waiting on input queue for job%s',
                    (' with timeout %s' % timeout)
                    if timeout is not None else '')
                job = self._queue_in.get(timeout=timeout)
                with self._job_lock:
                    self._enqueue_job(job)
                    while not self._queue_in.empty():
                        # this won't throw
                        self._enqueue_job(self._queue_in.get_nowait())

            except Queue.Empty:
                with self._job_lock:
                    while (   len(self._calendar)
                          and self._calendar[0][0] < time.time()):
                        _, jobid, action = heapq.heappop(self._calendar)
                        LOG.info('running action %s on job(id=%d)',
                                self.ACTION_NAMES[action], jobid)
                        self._run_event(jobid, action)
        LOG.info('%s thread terminating', self.name)