diff options
| author | jaypipes@gmail.com <> | 2010-08-30 10:36:59 -0400 |
|---|---|---|
| committer | jaypipes@gmail.com <> | 2010-08-30 10:36:59 -0400 |
| commit | a1791cdca8dbca8f9bf3555b21324503aba58fda (patch) | |
| tree | 12f297f1616172ca7e4bce76ecac1dcd737c83af /nova/process.py | |
| parent | bf2549282067a7a824ea97e66a5b2f0ca06416bd (diff) | |
| parent | 5f14a7955b9ef90afed91bda0343130d83e15a73 (diff) | |
| download | nova-a1791cdca8dbca8f9bf3555b21324503aba58fda.tar.gz nova-a1791cdca8dbca8f9bf3555b21324503aba58fda.tar.xz nova-a1791cdca8dbca8f9bf3555b21324503aba58fda.zip | |
Resolve conflicts and merge trunk
Diffstat (limited to 'nova/process.py')
| -rw-r--r-- | nova/process.py | 174 |
1 files changed, 81 insertions, 93 deletions
diff --git a/nova/process.py b/nova/process.py index 2dc56372f..425d9f162 100644 --- a/nova/process.py +++ b/nova/process.py @@ -2,6 +2,7 @@ # Copyright 2010 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. +# Copyright 2010 FathomDB Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -20,16 +21,12 @@ Process pool, still buggy right now. """ -import logging -import multiprocessing import StringIO + from twisted.internet import defer from twisted.internet import error -from twisted.internet import process from twisted.internet import protocol from twisted.internet import reactor -from twisted.internet import threads -from twisted.python import failure from nova import flags @@ -54,111 +51,100 @@ class UnexpectedErrorOutput(IOError): IOError.__init__(self, "got stdout: %r\nstderr: %r" % (stdout, stderr)) -# NOTE(termie): this too -class _BackRelay(protocol.ProcessProtocol): +# This is based on _BackRelay from twister.internal.utils, but modified to +# capture both stdout and stderr, without odd stderr handling, and also to +# handle stdin +class BackRelayWithInput(protocol.ProcessProtocol): """ Trivial protocol for communicating with a process and turning its output into the result of a L{Deferred}. @ivar deferred: A L{Deferred} which will be called back with all of stdout - and, if C{errortoo} is true, all of stderr as well (mixed together in - one string). If C{errortoo} is false and any bytes are received over - stderr, this will fire with an L{_UnexpectedErrorOutput} instance and - the attribute will be set to C{None}. - - @ivar onProcessEnded: If C{errortoo} is false and bytes are received over - stderr, this attribute will refer to a L{Deferred} which will be called - back when the process ends. This C{Deferred} is also associated with - the L{_UnexpectedErrorOutput} which C{deferred} fires with earlier in - this case so that users can determine when the process has actually - ended, in addition to knowing when bytes have been received via stderr. + and all of stderr as well (as a tuple). C{terminate_on_stderr} is true + and any bytes are received over stderr, this will fire with an + L{_UnexpectedErrorOutput} instance and the attribute will be set to + C{None}. + + @ivar onProcessEnded: If C{terminate_on_stderr} is false and bytes are + received over stderr, this attribute will refer to a L{Deferred} which + will be called back when the process ends. This C{Deferred} is also + associated with the L{_UnexpectedErrorOutput} which C{deferred} fires + with earlier in this case so that users can determine when the process + has actually ended, in addition to knowing when bytes have been received + via stderr. """ - def __init__(self, deferred, errortoo=0): + def __init__(self, deferred, started_deferred=None, + terminate_on_stderr=False, check_exit_code=True, + process_input=None): self.deferred = deferred - self.s = StringIO.StringIO() - if errortoo: - self.errReceived = self.errReceivedIsGood - else: - self.errReceived = self.errReceivedIsBad - - def errReceivedIsBad(self, text): - if self.deferred is not None: - self.onProcessEnded = defer.Deferred() - err = UnexpectedErrorOutput(text, self.onProcessEnded) - self.deferred.errback(failure.Failure(err)) + self.stdout = StringIO.StringIO() + self.stderr = StringIO.StringIO() + self.started_deferred = started_deferred + self.terminate_on_stderr = terminate_on_stderr + self.check_exit_code = check_exit_code + self.process_input = process_input + self.on_process_ended = None + + def errReceived(self, text): + self.stderr.write(text) + if self.terminate_on_stderr and (self.deferred is not None): + self.on_process_ended = defer.Deferred() + self.deferred.errback(UnexpectedErrorOutput( + stdout=self.stdout.getvalue(), + stderr=self.stderr.getvalue())) self.deferred = None self.transport.loseConnection() - def errReceivedIsGood(self, text): - self.s.write(text) - def outReceived(self, text): - self.s.write(text) - - def processEnded(self, reason): - if self.deferred is not None: - self.deferred.callback(self.s.getvalue()) - elif self.onProcessEnded is not None: - self.onProcessEnded.errback(reason) - - -class BackRelayWithInput(_BackRelay): - def __init__(self, deferred, startedDeferred=None, error_ok=0, - input=None): - # Twisted doesn't use new-style classes in most places :( - _BackRelay.__init__(self, deferred, errortoo=error_ok) - self.error_ok = error_ok - self.input = input - self.stderr = StringIO.StringIO() - self.startedDeferred = startedDeferred - - def errReceivedIsBad(self, text): - self.stderr.write(text) - self.transport.loseConnection() - - def errReceivedIsGood(self, text): - self.stderr.write(text) - - def connectionMade(self): - if self.startedDeferred: - self.startedDeferred.callback(self) - if self.input: - self.transport.write(self.input) - self.transport.closeStdin() + self.stdout.write(text) def processEnded(self, reason): if self.deferred is not None: - stdout, stderr = self.s.getvalue(), self.stderr.getvalue() + stdout, stderr = self.stdout.getvalue(), self.stderr.getvalue() try: - # NOTE(termie): current behavior means if error_ok is True - # we won't throw an error even if the process - # exited with a non-0 status, so you can't be - # okay with stderr output and not with bad exit - # codes. - if not self.error_ok: + if self.check_exit_code: reason.trap(error.ProcessDone) self.deferred.callback((stdout, stderr)) except: + # NOTE(justinsb): This logic is a little suspicious to me... + # If the callback throws an exception, then errback will be + # called also. However, this is what the unit tests test for... self.deferred.errback(UnexpectedErrorOutput(stdout, stderr)) + elif self.on_process_ended is not None: + self.on_process_ended.errback(reason) -def getProcessOutput(executable, args=None, env=None, path=None, reactor=None, - error_ok=0, input=None, startedDeferred=None): - if reactor is None: - from twisted.internet import reactor + def connectionMade(self): + if self.started_deferred: + self.started_deferred.callback(self) + if self.process_input: + self.transport.write(self.process_input) + self.transport.closeStdin() + +def get_process_output(executable, args=None, env=None, path=None, + process_reactor=None, check_exit_code=True, + process_input=None, started_deferred=None, + terminate_on_stderr=False): + if process_reactor is None: + process_reactor = reactor args = args and args or () env = env and env and {} - d = defer.Deferred() - p = BackRelayWithInput( - d, startedDeferred=startedDeferred, error_ok=error_ok, input=input) + deferred = defer.Deferred() + process_handler = BackRelayWithInput( + deferred, + started_deferred=started_deferred, + check_exit_code=check_exit_code, + process_input=process_input, + terminate_on_stderr=terminate_on_stderr) # NOTE(vish): commands come in as unicode, but self.executes needs # strings or process.spawn raises a deprecation warning executable = str(executable) if not args is None: args = [str(x) for x in args] - reactor.spawnProcess(p, executable, (executable,)+tuple(args), env, path) - return d + process_reactor.spawnProcess( process_handler, executable, + (executable,)+tuple(args), env, path) + return deferred class ProcessPool(object): @@ -184,26 +170,27 @@ class ProcessPool(object): return self.execute(executable, args, **kw) def execute(self, *args, **kw): - d = self._pool.acquire() + deferred = self._pool.acquire() - def _associateProcess(proto): - d.process = proto.transport + def _associate_process(proto): + deferred.process = proto.transport return proto.transport started = defer.Deferred() - started.addCallback(_associateProcess) - kw.setdefault('startedDeferred', started) + started.addCallback(_associate_process) + kw.setdefault('started_deferred', started) - d.process = None - d.started = started + deferred.process = None + deferred.started = started - d.addCallback(lambda _: getProcessOutput(*args, **kw)) - d.addBoth(self._release) - return d + deferred.addCallback(lambda _: get_process_output(*args, **kw)) + deferred.addBoth(self._release) + return deferred - def _release(self, rv=None): + def _release(self, retval=None): self._pool.release() - return rv + return retval + class SharedPool(object): _instance = None @@ -213,5 +200,6 @@ class SharedPool(object): def __getattr__(self, key): return getattr(self._instance, key) + def simple_execute(cmd, **kwargs): return SharedPool().simple_execute(cmd, **kwargs) |
