diff options
Diffstat (limited to 'nova/process.py')
-rw-r--r-- | nova/process.py | 95 |
1 files changed, 47 insertions, 48 deletions
diff --git a/nova/process.py b/nova/process.py index 425d9f162..74725c157 100644 --- a/nova/process.py +++ b/nova/process.py @@ -18,9 +18,10 @@ # under the License. """ -Process pool, still buggy right now. +Process pool using twisted threading """ +import logging import StringIO from twisted.internet import defer @@ -29,30 +30,14 @@ from twisted.internet import protocol from twisted.internet import reactor from nova import flags +from nova.utils import ProcessExecutionError FLAGS = flags.FLAGS flags.DEFINE_integer('process_pool_size', 4, 'Number of processes to use in the process pool') - -# NOTE(termie): this is copied from twisted.internet.utils but since -# they don't export it I've copied and modified -class UnexpectedErrorOutput(IOError): - """ - Standard error data was received where it was not expected. This is a - subclass of L{IOError} to preserve backward compatibility with the previous - error behavior of L{getProcessOutput}. - - @ivar processEnded: A L{Deferred} which will fire when the process which - produced the data on stderr has ended (exited and all file descriptors - closed). - """ - def __init__(self, stdout=None, stderr=None): - IOError.__init__(self, "got stdout: %r\nstderr: %r" % (stdout, stderr)) - - -# This is based on _BackRelay from twister.internal.utils, but modified to -# capture both stdout and stderr, without odd stderr handling, and also to +# This is based on _BackRelay from twister.internal.utils, but modified to +# capture both stdout and stderr, without odd stderr handling, and also to # handle stdin class BackRelayWithInput(protocol.ProcessProtocol): """ @@ -62,22 +47,23 @@ class BackRelayWithInput(protocol.ProcessProtocol): @ivar deferred: A L{Deferred} which will be called back with all of stdout and all of stderr as well (as a tuple). C{terminate_on_stderr} is true and any bytes are received over stderr, this will fire with an - L{_UnexpectedErrorOutput} instance and the attribute will be set to + L{_ProcessExecutionError} instance and the attribute will be set to C{None}. - @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are - received over stderr, this attribute will refer to a L{Deferred} which - will be called back when the process ends. This C{Deferred} is also - associated with the L{_UnexpectedErrorOutput} which C{deferred} fires - with earlier in this case so that users can determine when the process + @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are + received over stderr, this attribute will refer to a L{Deferred} which + will be called back when the process ends. This C{Deferred} is also + associated with the L{_ProcessExecutionError} which C{deferred} fires + with earlier in this case so that users can determine when the process has actually ended, in addition to knowing when bytes have been received via stderr. """ - def __init__(self, deferred, started_deferred=None, - terminate_on_stderr=False, check_exit_code=True, - process_input=None): + def __init__(self, deferred, cmd, started_deferred=None, + terminate_on_stderr=False, check_exit_code=True, + process_input=None): self.deferred = deferred + self.cmd = cmd self.stdout = StringIO.StringIO() self.stderr = StringIO.StringIO() self.started_deferred = started_deferred @@ -85,14 +71,18 @@ class BackRelayWithInput(protocol.ProcessProtocol): self.check_exit_code = check_exit_code self.process_input = process_input self.on_process_ended = None - + + def _build_execution_error(self, exit_code=None): + return ProcessExecutionError(cmd=self.cmd, + exit_code=exit_code, + stdout=self.stdout.getvalue(), + stderr=self.stderr.getvalue()) + def errReceived(self, text): self.stderr.write(text) if self.terminate_on_stderr and (self.deferred is not None): self.on_process_ended = defer.Deferred() - self.deferred.errback(UnexpectedErrorOutput( - stdout=self.stdout.getvalue(), - stderr=self.stderr.getvalue())) + self.deferred.errback(self._build_execution_error()) self.deferred = None self.transport.loseConnection() @@ -102,15 +92,19 @@ class BackRelayWithInput(protocol.ProcessProtocol): def processEnded(self, reason): if self.deferred is not None: stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue() - try: - if self.check_exit_code: - reason.trap(error.ProcessDone) - self.deferred.callback((stdout, stderr)) - except: - # NOTE(justinsb): This logic is a little suspicious to me... - # If the callback throws an exception, then errback will be - # called also. However, this is what the unit tests test for... - self.deferred.errback(UnexpectedErrorOutput(stdout, stderr)) + exit_code = reason.value.exitCode + if self.check_exit_code and exit_code <> 0: + self.deferred.errback(self._build_execution_error(exit_code)) + else: + try: + if self.check_exit_code: + reason.trap(error.ProcessDone) + self.deferred.callback((stdout, stderr)) + except: + # NOTE(justinsb): This logic is a little suspicious to me... + # If the callback throws an exception, then errback will be + # called also. However, this is what the unit tests test for... + self.deferred.errback(self._build_execution_error(exit_code)) elif self.on_process_ended is not None: self.on_process_ended.errback(reason) @@ -122,8 +116,8 @@ class BackRelayWithInput(protocol.ProcessProtocol): self.transport.write(self.process_input) self.transport.closeStdin() -def get_process_output(executable, args=None, env=None, path=None, - process_reactor=None, check_exit_code=True, +def get_process_output(executable, args=None, env=None, path=None, + process_reactor=None, check_exit_code=True, process_input=None, started_deferred=None, terminate_on_stderr=False): if process_reactor is None: @@ -131,10 +125,15 @@ def get_process_output(executable, args=None, env=None, path=None, args = args and args or () env = env and env and {} deferred = defer.Deferred() + cmd = executable + if args: + cmd = cmd + " " + ' '.join(args) + logging.debug("Running cmd: %s", cmd) process_handler = BackRelayWithInput( - deferred, - started_deferred=started_deferred, - check_exit_code=check_exit_code, + deferred, + cmd, + started_deferred=started_deferred, + check_exit_code=check_exit_code, process_input=process_input, terminate_on_stderr=terminate_on_stderr) # NOTE(vish): commands come in as unicode, but self.executes needs @@ -142,7 +141,7 @@ def get_process_output(executable, args=None, env=None, path=None, executable = str(executable) if not args is None: args = [str(x) for x in args] - process_reactor.spawnProcess( process_handler, executable, + process_reactor.spawnProcess( process_handler, executable, (executable,)+tuple(args), env, path) return deferred |