summaryrefslogtreecommitdiffstats
path: root/nova/process.py
blob: 13cb90e82909e8133695d3e614b05f65e9e6ca0c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2010 FathomDB Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Process pool using twisted threading
"""

import logging
import StringIO

from twisted.internet import defer
from twisted.internet import error
from twisted.internet import protocol
from twisted.internet import reactor

from nova import flags
from nova.exception import ProcessExecutionError

FLAGS = flags.FLAGS
# Registers the 'process_pool_size' flag (default 4).  ProcessPool reads it
# as its pool size when no explicit size is passed to its constructor.
flags.DEFINE_integer('process_pool_size', 4,
                     'Number of processes to use in the process pool')

# This is based on _BackRelay from twisted.internet.utils, but modified to
#  capture both stdout and stderr, without odd stderr handling, and also to
#  handle stdin
class BackRelayWithInput(protocol.ProcessProtocol):
    """
    Trivial protocol for communicating with a process and turning its output
    into the result of a L{Deferred}.

    @ivar deferred: A L{Deferred} which will be called back with all of stdout
        and all of stderr as well (as a tuple).  If C{terminate_on_stderr} is
        true and any bytes are received over stderr, this will fire with a
        L{ProcessExecutionError} instance and the attribute will be set to
        C{None}.  If C{check_exit_code} is true and the process exits with a
        non-zero exit code, it fires with a L{ProcessExecutionError} too.

    @ivar on_process_ended: C{None} until C{terminate_on_stderr} is true and
        bytes are received over stderr; from then on it refers to a
        L{Deferred} which is errbacked with the termination reason when the
        process actually ends, so that users can determine when the process
        has ended in addition to knowing that bytes were received via stderr.

    @ivar stdout: Buffer accumulating everything the process wrote to stdout.
    @ivar stderr: Buffer accumulating everything the process wrote to stderr.
    """

    def __init__(self, deferred, cmd, started_deferred=None,
                 terminate_on_stderr=False, check_exit_code=True,
                 process_input=None):
        """
        @param deferred: L{Deferred} fired with the process result.
        @param cmd: Command line, used only when building error reports.
        @param started_deferred: Optional L{Deferred} called back with this
            protocol once the connection to the process is made.
        @param terminate_on_stderr: If true, any stderr output fails
            C{deferred} immediately and drops the connection.
        @param check_exit_code: If true, a non-zero exit code fails
            C{deferred}.
        @param process_input: Optional data written to the process' stdin.
        """
        self.deferred = deferred
        self.cmd = cmd
        self.stdout = StringIO.StringIO()
        self.stderr = StringIO.StringIO()
        self.started_deferred = started_deferred
        self.terminate_on_stderr = terminate_on_stderr
        self.check_exit_code = check_exit_code
        self.process_input = process_input
        self.on_process_ended = None

    def _build_execution_error(self, exit_code=None):
        """Build a ProcessExecutionError from the output captured so far."""
        return ProcessExecutionError(cmd=self.cmd,
                                     exit_code=exit_code,
                                     stdout=self.stdout.getvalue(),
                                     stderr=self.stderr.getvalue())

    def errReceived(self, text):
        """Buffer stderr data; optionally fail fast on the first chunk."""
        self.stderr.write(text)
        if self.terminate_on_stderr and (self.deferred is not None):
            # Create the "process really ended" deferred before failing the
            # main one so it is available to whoever handles the errback.
            self.on_process_ended = defer.Deferred()
            self.deferred.errback(self._build_execution_error())
            # Clearing the reference makes processEnded take the
            # on_process_ended branch instead of firing deferred twice.
            self.deferred = None
            self.transport.loseConnection()

    def outReceived(self, text):
        """Buffer stdout data."""
        self.stdout.write(text)

    def processEnded(self, reason):
        """Fire the result deferred once the process has terminated.

        @param reason: Failure describing how the process ended; its value
            carries the C{exitCode} attribute read here.
        """
        if self.deferred is not None:
            stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue()
            exit_code = reason.value.exitCode
            if self.check_exit_code and exit_code != 0:
                self.deferred.errback(self._build_execution_error(exit_code))
            else:
                try:
                    if self.check_exit_code:
                        reason.trap(error.ProcessDone)
                    self.deferred.callback((stdout, stderr))
                except:
                    # NOTE(justinsb): This logic is a little suspicious to me...
                    # If the callback throws an exception, then errback will be
                    # called also. However, this is what the unit tests test for...
                    # NOTE(review): the bare except is kept on purpose; some
                    # Twisted releases appear to raise the Failure object
                    # itself from trap(), which "except Exception" might not
                    # catch -- confirm before narrowing it.
                    self.deferred.errback(self._build_execution_error(exit_code))
        elif self.on_process_ended is not None:
            self.on_process_ended.errback(reason)

    def connectionMade(self):
        """Announce the start, feed stdin if requested, then close stdin."""
        if self.started_deferred:
            self.started_deferred.callback(self)
        if self.process_input:
            self.transport.write(str(self.process_input))
        # Always close stdin so the child sees EOF on its input.
        self.transport.closeStdin()

def get_process_output(executable, args=None, env=None, path=None,
                       process_reactor=None, check_exit_code=True,
                       process_input=None, started_deferred=None,
                       terminate_on_stderr=False):
    """Spawn ``executable`` and return a Deferred for its captured output.

    :param executable: Program to run; converted with ``str()`` before
        being spawned.
    :param args: Optional sequence (list or tuple) of arguments; each one is
        converted with ``str()`` before being spawned.
    :param env: Environment handed to ``spawnProcess`` exactly as given
        (see Twisted's ``spawnProcess`` documentation for how ``None`` and
        an empty mapping are interpreted).
    :param path: Working directory handed to ``spawnProcess``.
    :param process_reactor: Reactor used to spawn the process; defaults to
        the global Twisted reactor.
    :param check_exit_code: Forwarded to BackRelayWithInput.
    :param process_input: Forwarded to BackRelayWithInput (data for stdin).
    :param started_deferred: Forwarded to BackRelayWithInput.
    :param terminate_on_stderr: Forwarded to BackRelayWithInput.
    :returns: The Deferred given to BackRelayWithInput, which fires it with
        ``(stdout, stderr)`` or fails it with ProcessExecutionError.
    """
    if process_reactor is None:
        process_reactor = reactor
    # Normalise to a list so that tuples are accepted as well; the previous
    # "[cmd] + args" raised TypeError for a tuple of arguments.
    args = list(args) if args else []
    # env is deliberately passed through untouched.  The former expression
    # "env and env and {}" replaced every non-empty environment with {},
    # silently discarding whatever the caller asked for.
    deferred = defer.Deferred()
    cmd = executable
    if args:
        cmd = " ".join([cmd] + args)
    logging.debug("Running cmd: %s", cmd)
    process_handler = BackRelayWithInput(
            deferred,
            cmd,
            started_deferred=started_deferred,
            check_exit_code=check_exit_code,
            process_input=process_input,
            terminate_on_stderr=terminate_on_stderr)
    # NOTE(vish): commands come in as unicode, but self.executes needs
    #             strings or process.spawn raises a deprecation warning
    executable = str(executable)
    args = [str(x) for x in args]
    # argv[0] is conventionally the executable itself.
    process_reactor.spawnProcess(process_handler, executable,
                                 (executable,) + tuple(args), env, path)
    return deferred


class ProcessPool(object):
    """A minimal process pool built on Twisted's process support.

    At most ``size`` commands are in flight at once; further requests wait
    on a DeferredSemaphore.  The implementation is intentionally simple,
    with the hope that the API can stay stable while it is optimized later.
    """

    def __init__(self, size=None):
        """Create a pool of ``size`` slots (flag default when falsy)."""
        self.size = size or FLAGS.process_pool_size
        self._pool = defer.DeferredSemaphore(self.size)

    def simple_execute(self, cmd, **kw):
        """Weak emulation of the old utils.execute() function.

        Exists only to ease migrating old execute-style callers to this
        pool: the command string is split on single spaces, the first
        piece is the executable and the remainder its arguments.

        NOTE(termie): This will break on args with spaces in them.
        """
        pieces = cmd.split(' ')
        return self.execute(pieces[0], pieces[1:], **kw)

    def execute(self, *args, **kw):
        """Run get_process_output(*args, **kw) once a pool slot is free.

        The returned Deferred carries two extra attributes: ``started``, a
        Deferred created here, and ``process``, which is ``None`` until
        ``started`` fires and is then set to the protocol's transport.
        """
        slot = self._pool.acquire()

        def _remember_transport(proto):
            # Expose the transport on the result Deferred for callers.
            transport = proto.transport
            slot.process = transport
            return transport

        started = defer.Deferred()
        started.addCallback(_remember_transport)
        # A caller-provided started_deferred takes precedence over ours.
        kw.setdefault('started_deferred', started)

        slot.process = None
        slot.started = started

        def _spawn(_ignored):
            return get_process_output(*args, **kw)

        slot.addCallback(_spawn)
        # Give the slot back whether the command succeeded or failed.
        slot.addBoth(self._release)
        return slot

    def _release(self, retval=None):
        """Release one pool slot and pass ``retval`` through unchanged."""
        self._pool.release()
        return retval


class SharedPool(object):
    """Proxy that forwards attribute access to one shared ProcessPool.

    The pool is created on first instantiation and stored on the class;
    attributes not found on the proxy itself are looked up on that pool.
    """
    _instance = None

    def __init__(self):
        # Nothing to do when the shared pool already exists.
        if SharedPool._instance is not None:
            return
        self.__class__._instance = ProcessPool()

    def __getattr__(self, key):
        shared = self._instance
        return getattr(shared, key)


def simple_execute(cmd, **kwargs):
    """Run ``cmd`` through the shared process pool's simple_execute()."""
    shared_pool = SharedPool()
    return shared_pool.simple_execute(cmd, **kwargs)