summaryrefslogtreecommitdiffstats
path: root/rpmci/async_subprocess.py
blob: 798872492ddb42e8b4193cd1cd9929fd1dcda1da (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# async_subprocess.py:
# Run a subprocess asynchronously using GLib
#
# Licensed under the new-BSD license (http://www.opensource.org/licenses/bsd-license.php)
# Copyright (C) 2010 Red Hat, Inc.
# Written by Colin Walters <walters@verbum.org>

import os
import logging

import glib

# Sentinel for the `stdout` argument of AsyncProcess: when passed, the child's
# stdout and stderr are both redirected into a pipe whose read end is exposed
# as `AsyncProcess.stdout`.  Any other value sends them to /dev/null.
PIPE_STDOUT = 'pipe-stdout'

def spawn_async_log_info(argv, cwd=None, stdout=None):
    """Log the command line at INFO level, then start it asynchronously.

    argv   -- argument vector of the program to run.
    cwd    -- working directory for the child, forwarded to AsyncProcess.
    stdout -- forwarded unchanged to AsyncProcess (e.g. PIPE_STDOUT).

    Returns the newly created AsyncProcess.
    """
    message = "Starting subprocess: %r" % (argv, )
    logging.info(message)
    child = AsyncProcess(argv, cwd=cwd, stdout=stdout)
    return child

def spawn_async_output_to_file(argv, output_filepath, on_exited, cwd=None):
    """Spawn `argv` and stream its merged stdout/stderr into a file.

    argv            -- argument vector of the program to run.
    output_filepath -- path of the file to (over)write with the child's output.
    on_exited       -- callable invoked as on_exited(process, condition) when
                       the child exits.
    cwd             -- working directory for the child (defaults to the
                       current one, see AsyncProcess).

    Returns the AsyncProcess.  Output is only written while a GLib main loop
    is running, since it is delivered through AsyncProcess's I/O watch.

    Raises whatever open() or the AsyncProcess constructor raises; in the
    latter case the output file is closed again before the error propagates.
    """
    # Binary mode: the chunks handed to _on_io are the raw result of
    # os.read(), so they must be written without any text-layer translation.
    f = open(output_filepath, 'wb')
    try:
        process = AsyncProcess(argv, cwd=cwd, stdout=PIPE_STDOUT)
    except Exception:
        # Spawning failed, so no EOF notification will ever arrive to close
        # the file for us -- release it here instead of leaking the handle.
        f.close()
        raise

    def _on_io(process, buf):
        # AsyncProcess signals end-of-output by passing None.
        if buf is not None:
            f.write(buf)
        else:
            f.close()

    process.set_callbacks(_on_io, on_exited)
    return process
    
class AsyncProcess(object):
    """A child process spawned via GLib and monitored from the GLib main loop.

    Attributes:
      argv   -- the argument vector the process was started with.
      pid    -- the child's process id as returned by glib.spawn_async.
      stdout -- read end (file descriptor) of the output pipe when the process
                was created with stdout=PIPE_STDOUT, otherwise None.

    Callbacks are registered with set_callbacks() and are dispatched by the
    GLib main loop; nothing is delivered unless a main loop is running.
    """

    def __init__(self, argv, cwd=None, stdout=None):
        """Spawn `argv` immediately.

        argv   -- argument vector; the program is looked up on PATH.
        cwd    -- working directory for the child; the current working
                  directory is used when this is None or empty.
        stdout -- PIPE_STDOUT to capture merged stdout/stderr through a pipe;
                  any other value discards both streams to /dev/null.

        Raises whatever glib.spawn_async raises if the spawn fails; no pipe
        file descriptors are left open in that case.
        """
        self.argv = argv
        target_cwd = cwd or os.getcwd()

        if stdout == PIPE_STDOUT:
            (self.stdout, self._pipeout) = os.pipe()
            try:
                (pid, child_stdin, child_stdout, child_stderr) = glib.spawn_async(
                    argv,
                    working_directory=target_cwd,
                    child_setup=self._child_setup_pipes,
                    user_data=None,
                    flags=(glib.SPAWN_SEARCH_PATH
                           | glib.SPAWN_DO_NOT_REAP_CHILD))
            except Exception:
                # Nobody will ever read from the pipe if the spawn failed, so
                # release the read end as well before propagating the error.
                os.close(self.stdout)
                self.stdout = None
                raise
            finally:
                # The parent never writes to the pipe.  Dropping its copy of
                # the write end lets the read end report hang-up once the
                # child side is closed.
                os.close(self._pipeout)
        else:
            (pid, child_stdin, child_stdout, child_stderr) = glib.spawn_async(
                argv,
                working_directory=target_cwd,
                child_setup=None,
                user_data=None,
                flags=(glib.SPAWN_SEARCH_PATH
                       | glib.SPAWN_DO_NOT_REAP_CHILD
                       | glib.SPAWN_STDOUT_TO_DEV_NULL
                       | glib.SPAWN_STDERR_TO_DEV_NULL))
            self.stdout = None
            # No pipes were requested, so GLib must not hand any back.
            assert child_stdout is None
            assert child_stderr is None

        assert child_stdin is None
        self.pid = pid
        self._child_watch_id = glib.child_watch_add(pid, self._on_child_exited)
        self._output_callback = None
        self._exited_callback = None

    def _child_setup_pipes(self, *args):
        """Child-side setup hook: point stdout and stderr at the pipe."""
        # Sadly we have to do a child setup function to get merged stdout/stderr
        os.dup2(self._pipeout, 1)
        os.dup2(self._pipeout, 2)

    def set_callbacks(self, output_callback, exited_callback):
        """Register the output and exit callbacks; may only be called once.

        output_callback -- called as output_callback(process, buf) for every
                           chunk read from the pipe, and once more with
                           buf=None when the output stream has ended.
        exited_callback -- called as exited_callback(process, condition) when
                           the child watch fires.

        If the process was created with a pipe, this also starts watching the
        pipe for readable data, errors and hang-up.
        """
        assert self._output_callback is None
        assert self._exited_callback is None
        self._output_callback = output_callback
        self._exited_callback = exited_callback
        if self.stdout is not None:
            glib.io_add_watch(self.stdout, glib.IO_IN | glib.IO_ERR | glib.IO_HUP,
                              self._on_io)

    def _on_child_exited(self, pid, condition):
        """Child-watch handler: log the exit and notify the exit callback."""
        logging.info("Child pid %d exited with code %r" % (pid, condition))
        # set_callbacks() is optional, so there may be nobody to notify.
        if self._exited_callback is not None:
            self._exited_callback(self, condition)

    def _on_io(self, source, condition):
        """I/O-watch handler for the read end of the output pipe.

        Returns True to keep the watch installed, False to remove it.
        """
        if condition & glib.IO_IN:
            buf = os.read(source, 8192)
            if buf:
                self._output_callback(self, buf)
                # Keep watching even if a hang-up was reported alongside the
                # data: more than one chunk may still be buffered in the pipe,
                # and closing now would silently drop it.  The watch fires
                # again until the pipe has been drained.
                return True
            # A zero-length read means end-of-file; fall through and finish
            # instead of forwarding empty chunks forever.
        # End of stream (EOF, hang-up or error with nothing left to read):
        # signal it with None, release the descriptor and remove the watch.
        self._output_callback(self, None)
        os.close(source)
        return False