diff options
author | andy <github@anarkystic.com> | 2010-06-11 10:04:06 +0100 |
---|---|---|
committer | andy <github@anarkystic.com> | 2010-06-11 10:04:06 +0100 |
commit | b07af87974052abcbb12c0531b22fe9be416a498 (patch) | |
tree | 1e2b4c79622ba6accd9b786298988c317fffb86b | |
parent | 3b594c7cd3c8054ca3b210198162d895aacee179 (diff) | |
download | nova-b07af87974052abcbb12c0531b22fe9be416a498.tar.gz nova-b07af87974052abcbb12c0531b22fe9be416a498.tar.xz nova-b07af87974052abcbb12c0531b22fe9be416a498.zip |
Adds a Twisted implementation of a process pool
Meant for use instead of utils.execute()
-rw-r--r-- | nova/process.py | 113 | ||||
-rw-r--r-- | nova/tests/process_unittest.py | 115 | ||||
-rw-r--r-- | run_tests.py | 1 |
3 files changed, 223 insertions, 6 deletions
diff --git a/nova/process.py b/nova/process.py index 754728fdf..b114146ce 100644 --- a/nova/process.py +++ b/nova/process.py @@ -19,15 +19,41 @@ Process pool, still buggy right now. import logging import multiprocessing +import StringIO from nova import vendor from twisted.internet import defer -from twisted.internet import reactor +from twisted.internet import error +from twisted.internet import process from twisted.internet import protocol +from twisted.internet import reactor from twisted.internet import threads +from twisted.python import failure + +from nova import flags + +FLAGS = flags.FLAGS +flags.DEFINE_integer('process_pool_size', 4, + 'Number of processes to use in the process pool') + # NOTE(termie): this is copied from twisted.internet.utils but since -# they don't export it I've copied. +# they don't export it I've copied and modified +class UnexpectedErrorOutput(IOError): + """ + Standard error data was received where it was not expected. This is a + subclass of L{IOError} to preserve backward compatibility with the previous + error behavior of L{getProcessOutput}. + + @ivar processEnded: A L{Deferred} which will fire when the process which + produced the data on stderr has ended (exited and all file descriptors + closed). + """ + def __init__(self, stdout=None, stderr=None): + IOError.__init__(self, "got stdout: %r\nstderr: %r" % (stdout, stderr)) + + +# NOTE(termie): this too class _BackRelay(protocol.ProcessProtocol): """ Trivial protocol for communicating with a process and turning its output @@ -77,28 +103,103 @@ class _BackRelay(protocol.ProcessProtocol): class BackRelayWithInput(_BackRelay): - def __init__(self, deferred, errortoo=0, input=None): - super(BackRelayWithInput, self).__init__(deferred, errortoo) + def __init__(self, deferred, startedDeferred=None, error_ok=0, + input=None): + # Twisted doesn't use new-style classes in most places :( + _BackRelay.__init__(self, deferred, errortoo=error_ok) + self.error_ok = error_ok self.input = input + self.stderr = StringIO.StringIO() + self.startedDeferred = startedDeferred + def errReceivedIsBad(self, text): + self.stderr.write(text) + self.transport.loseConnection() + + def errReceivedIsGood(self, text): + self.stderr.write(text) + def connectionMade(self): + if self.startedDeferred: + self.startedDeferred.callback(self) if self.input: self.transport.write(self.input) self.transport.closeStdin() + def processEnded(self, reason): + if self.deferred is not None: + stdout, stderr = self.s.getvalue(), self.stderr.getvalue() + try: + # NOTE(termie): current behavior means if error_ok is True + # we won't throw an error even if the process + # exited with a non-0 status, so you can't be + # okay with stderr output and not with bad exit + # codes. + if not self.error_ok: + reason.trap(error.ProcessDone) + self.deferred.callback((stdout, stderr)) + except: + self.deferred.errback(UnexpectedErrorOutput(stdout, stderr)) + def getProcessOutput(executable, args=None, env=None, path=None, reactor=None, - errortoo=0, input=None): + error_ok=0, input=None, startedDeferred=None): if reactor is None: from twisted.internet import reactor args = args and args or () env = env and env and {} d = defer.Deferred() - p = BackRelayWithInput(d, errortoo=errortoo, input=input) + p = BackRelayWithInput( + d, startedDeferred=startedDeferred, error_ok=error_ok, input=input) reactor.spawnProcess(p, executable, (executable,)+tuple(args), env, path) return d +class ProcessPool(object): + """ A simple process pool implementation using Twisted's Process bits. + + This is pretty basic right now, but hopefully the API will be the correct + one so that it can be optimized later. + """ + def __init__(self, size=None): + self.size = size and size or FLAGS.process_pool_size + self._pool = defer.DeferredSemaphore(self.size) + + def simpleExecute(self, cmd, **kw): + """ Weak emulation of the old utils.execute() function. + + This only exists as a way to quickly move old execute methods to + this new style of code. + + NOTE(termie): This will break on args with spaces in them. + """ + parsed = cmd.split(' ') + executable, args = parsed[0], parsed[1:] + return self.execute(executable, args, **kw) + + def execute(self, *args, **kw): + d = self._pool.acquire() + + def _associateProcess(proto): + d.process = proto.transport + return proto.transport + + started = defer.Deferred() + started.addCallback(_associateProcess) + kw.setdefault('startedDeferred', started) + + d.process = None + d.started = started + + d.addCallback(lambda _: getProcessOutput(*args, **kw)) + d.addBoth(self._release) + return d + + def _release(self, rv=None): + self._pool.release() + return rv + + class Pool(object): """ A simple process pool implementation around mutliprocessing. diff --git a/nova/tests/process_unittest.py b/nova/tests/process_unittest.py new file mode 100644 index 000000000..50368dd3f --- /dev/null +++ b/nova/tests/process_unittest.py @@ -0,0 +1,115 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 +# Copyright [2010] [Anso Labs, LLC] +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from xml.etree import ElementTree + +from nova import vendor +from twisted.internet import defer +from twisted.internet import reactor + +from nova import exception +from nova import flags +from nova import process +from nova import test +from nova import utils + +FLAGS = flags.FLAGS + + +class ProcessTestCase(test.TrialTestCase): + def setUp(self): + logging.getLogger().setLevel(logging.DEBUG) + super(ProcessTestCase, self).setUp() + + def test_execute_stdout(self): + pool = process.ProcessPool(2) + d = pool.simpleExecute('echo test') + def _check(rv): + self.assertEqual(rv[0], 'test\n') + self.assertEqual(rv[1], '') + + d.addCallback(_check) + d.addErrback(self.fail) + return d + + def test_execute_stderr(self): + pool = process.ProcessPool(2) + d = pool.simpleExecute('cat BAD_FILE', error_ok=1) + def _check(rv): + self.assertEqual(rv[0], '') + self.assert_('No such file' in rv[1]) + + d.addCallback(_check) + d.addErrback(self.fail) + return d + + def test_execute_unexpected_stderr(self): + pool = process.ProcessPool(2) + d = pool.simpleExecute('cat BAD_FILE') + d.addCallback(lambda x: self.fail('should have raised an error')) + d.addErrback(lambda failure: failure.trap(IOError)) + return d + + def test_max_processes(self): + pool = process.ProcessPool(2) + d1 = pool.simpleExecute('sleep 0.01') + d2 = pool.simpleExecute('sleep 0.01') + d3 = pool.simpleExecute('sleep 0.005') + d4 = pool.simpleExecute('sleep 0.005') + + called = [] + def _called(rv, name): + called.append(name) + + d1.addCallback(_called, 'd1') + d2.addCallback(_called, 'd2') + d3.addCallback(_called, 'd3') + d4.addCallback(_called, 'd4') + + # Make sure that d3 and d4 had to wait on the other two and were called + # in order + # NOTE(termie): there may be a race condition in this test if for some + # reason one of the sleeps takes longer to complete + # than it should + d4.addCallback(lambda x: self.assertEqual(called[2], 'd3')) + d4.addCallback(lambda x: self.assertEqual(called[3], 'd4')) + d4.addErrback(self.fail) + return d4 + + def test_kill_long_process(self): + pool = process.ProcessPool(2) + + d1 = pool.simpleExecute('sleep 1') + d2 = pool.simpleExecute('sleep 0.005') + + timeout = reactor.callLater(0.1, self.fail, 'should have been killed') + + # kill d1 and wait on it to end then cancel the timeout + d2.addCallback(lambda _: d1.process.signalProcess('KILL')) + d2.addCallback(lambda _: d1) + d2.addBoth(lambda _: timeout.active() and timeout.cancel()) + d2.addErrback(self.fail) + return d2 + + def test_process_exit_is_contained(self): + pool = process.ProcessPool(2) + + d1 = pool.simpleExecute('sleep 1') + d1.addCallback(lambda x: self.fail('should have errbacked')) + d1.addErrback(lambda fail: fail.trap(IOError)) + reactor.callLater(0.05, d1.process.signalProcess, 'KILL') + + return d1 diff --git a/run_tests.py b/run_tests.py index 886ab4bd0..f80f0af16 100644 --- a/run_tests.py +++ b/run_tests.py @@ -50,6 +50,7 @@ from nova.tests.keeper_unittest import * from nova.tests.network_unittest import * from nova.tests.node_unittest import * from nova.tests.objectstore_unittest import * +from nova.tests.process_unittest import * from nova.tests.storage_unittest import * from nova.tests.users_unittest import * from nova.tests.datastore_unittest import * |