summaryrefslogtreecommitdiffstats
path: root/rpmci/subtask.py
blob: 283cfffbb1a4d1515c23f5c2ce2c6b2ae1cfaa18 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# subprocess_msg.py:
# Wrapper for creating processes that logs their stderr to
# a given directory, rotating earlier logs.
#
# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
# Copyright (C) 2010 Red Hat, Inc.
# Written by Colin Walters <walters@verbum.org>

import os
import shutil
import logging
import time

import subprocess
import async_subprocess

_base_path = None
_failed_path = None
_old_path = None
def global_configure(config):
    global _base_path
    global _failed_path
    global _old_path
    _base_path = config.get('subtask', 'subtask_dir')
    _failed_path = os.path.join(_base_path, 'failed')
    _old_path = os.path.join(_base_path, 'old')
    if not os.path.isdir(_failed_path):
        os.makedirs(_failed_path)
    if not os.path.isdir(_old_path):
        os.makedirs(_old_path)

def prepare_task_logfile(taskid):
    log_path = os.path.join(_base_path, '%s.log' % (taskid, ))
    if os.path.isfile(log_path):
        curtime = int(time.time())
        saved_name = '%s-%d.log' % (taskid, int(time.time()),)
        os.rename(log_path, os.path.join(_old_path, saved_name))
    return log_path

def _init_task_run(taskid, argv, cwd):
    log_path = prepare_task_logfile(taskid)
    logging.info("Calling task %r synchronously, cwd=%r args=%r" % (taskid, cwd, argv))
    f = open(log_path, 'w')
    f.write("Logfile for process cwd=%r args=%r\n" % (cwd, argv))
    f.flush()
    return (log_path, f)

def _handle_task_exit(taskid, log_path, logf, proc):
    ecode = proc.wait()
    if ecode == 0:
        msg = "Subtask %s completed succesfully" % (taskid, )
        logging.info(msg)
    else:
        msg = "Subtask %s exited with code %d" % (taskid, ecode)
        logging.warn(msg)
    logf.write(msg)
    logf.close()
    if ecode != 0:
        os.rename(log_path, os.path.join(_failed_path, os.path.basename(log_path)))

def spawn_sync(taskid, argv, cwd=None):
    (log_path, f) = _init_task_run(taskid, argv, cwd)
    nullf = open(os.devnull, 'w')
    proc = subprocess.Popen(argv, cwd=cwd, stdin=nullf, stdout=f, stderr=f)
    logging.info("Started subtask %s, pid=%d" % (taskid, proc.pid))
    _handle_task_exit(taskid, log_path, f, proc)

def spawn_sync_get_output(taskid, argv, cwd=None):
    (log_path, f) = _init_task_run(taskid, argv, cwd)
    nullf = open(os.devnull, 'w')
    proc = subprocess.Popen(argv, cwd=cwd, stdin=nullf, stdout=subprocess.PIPE, stderr=f)
    logging.info("Started subtask %s, pid=%d" % (taskid, proc.pid))
    output = proc.communicate()[0]
    _handle_task_exit(taskid, log_path, f, proc)
    return output