summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandy <github@anarkystic.com>2010-06-11 10:04:06 +0100
committerandy <github@anarkystic.com>2010-06-11 10:04:06 +0100
commitb07af87974052abcbb12c0531b22fe9be416a498 (patch)
tree1e2b4c79622ba6accd9b786298988c317fffb86b
parent3b594c7cd3c8054ca3b210198162d895aacee179 (diff)
downloadnova-b07af87974052abcbb12c0531b22fe9be416a498.tar.gz
nova-b07af87974052abcbb12c0531b22fe9be416a498.tar.xz
nova-b07af87974052abcbb12c0531b22fe9be416a498.zip
Adds a Twisted implementation of a process pool
Meant for use instead of utils.execute()
-rw-r--r--nova/process.py113
-rw-r--r--nova/tests/process_unittest.py115
-rw-r--r--run_tests.py1
3 files changed, 223 insertions, 6 deletions
diff --git a/nova/process.py b/nova/process.py
index 754728fdf..b114146ce 100644
--- a/nova/process.py
+++ b/nova/process.py
@@ -19,15 +19,41 @@ Process pool, still buggy right now.
import logging
import multiprocessing
+import StringIO
from nova import vendor
from twisted.internet import defer
-from twisted.internet import reactor
+from twisted.internet import error
+from twisted.internet import process
from twisted.internet import protocol
+from twisted.internet import reactor
from twisted.internet import threads
+from twisted.python import failure
+
+from nova import flags
+
+FLAGS = flags.FLAGS
+flags.DEFINE_integer('process_pool_size', 4,
+ 'Number of processes to use in the process pool')
+
# NOTE(termie): this is copied from twisted.internet.utils but since
-# they don't export it I've copied.
+# they don't export it I've copied and modified
+class UnexpectedErrorOutput(IOError):
+ """
+ Standard error data was received where it was not expected. This is a
+ subclass of L{IOError} to preserve backward compatibility with the previous
+ error behavior of L{getProcessOutput}.
+
+ @ivar processEnded: A L{Deferred} which will fire when the process which
+ produced the data on stderr has ended (exited and all file descriptors
+ closed).
+ """
+ def __init__(self, stdout=None, stderr=None):
+ IOError.__init__(self, "got stdout: %r\nstderr: %r" % (stdout, stderr))
+
+
+# NOTE(termie): this too
class _BackRelay(protocol.ProcessProtocol):
"""
Trivial protocol for communicating with a process and turning its output
@@ -77,28 +103,103 @@ class _BackRelay(protocol.ProcessProtocol):
class BackRelayWithInput(_BackRelay):
- def __init__(self, deferred, errortoo=0, input=None):
- super(BackRelayWithInput, self).__init__(deferred, errortoo)
+ def __init__(self, deferred, startedDeferred=None, error_ok=0,
+ input=None):
+ # Twisted doesn't use new-style classes in most places :(
+ _BackRelay.__init__(self, deferred, errortoo=error_ok)
+ self.error_ok = error_ok
self.input = input
+ self.stderr = StringIO.StringIO()
+ self.startedDeferred = startedDeferred
+ def errReceivedIsBad(self, text):
+ self.stderr.write(text)
+ self.transport.loseConnection()
+
+ def errReceivedIsGood(self, text):
+ self.stderr.write(text)
+
def connectionMade(self):
+ if self.startedDeferred:
+ self.startedDeferred.callback(self)
if self.input:
self.transport.write(self.input)
self.transport.closeStdin()
+ def processEnded(self, reason):
+ if self.deferred is not None:
+ stdout, stderr = self.s.getvalue(), self.stderr.getvalue()
+ try:
+ # NOTE(termie): current behavior means if error_ok is True
+ # we won't throw an error even if the process
+ # exited with a non-0 status, so you can't be
+ # okay with stderr output and not with bad exit
+ # codes.
+ if not self.error_ok:
+ reason.trap(error.ProcessDone)
+ self.deferred.callback((stdout, stderr))
+ except:
+ self.deferred.errback(UnexpectedErrorOutput(stdout, stderr))
+
def getProcessOutput(executable, args=None, env=None, path=None, reactor=None,
- errortoo=0, input=None):
+ error_ok=0, input=None, startedDeferred=None):
if reactor is None:
from twisted.internet import reactor
args = args and args or ()
env = env and env and {}
d = defer.Deferred()
- p = BackRelayWithInput(d, errortoo=errortoo, input=input)
+ p = BackRelayWithInput(
+ d, startedDeferred=startedDeferred, error_ok=error_ok, input=input)
reactor.spawnProcess(p, executable, (executable,)+tuple(args), env, path)
return d
+class ProcessPool(object):
+ """ A simple process pool implementation using Twisted's Process bits.
+
+ This is pretty basic right now, but hopefully the API will be the correct
+ one so that it can be optimized later.
+ """
+ def __init__(self, size=None):
+ self.size = size and size or FLAGS.process_pool_size
+ self._pool = defer.DeferredSemaphore(self.size)
+
+ def simpleExecute(self, cmd, **kw):
+ """ Weak emulation of the old utils.execute() function.
+
+ This only exists as a way to quickly move old execute methods to
+ this new style of code.
+
+ NOTE(termie): This will break on args with spaces in them.
+ """
+ parsed = cmd.split(' ')
+ executable, args = parsed[0], parsed[1:]
+ return self.execute(executable, args, **kw)
+
+ def execute(self, *args, **kw):
+ d = self._pool.acquire()
+
+ def _associateProcess(proto):
+ d.process = proto.transport
+ return proto.transport
+
+ started = defer.Deferred()
+ started.addCallback(_associateProcess)
+ kw.setdefault('startedDeferred', started)
+
+ d.process = None
+ d.started = started
+
+ d.addCallback(lambda _: getProcessOutput(*args, **kw))
+ d.addBoth(self._release)
+ return d
+
+ def _release(self, rv=None):
+ self._pool.release()
+ return rv
+
+
class Pool(object):
""" A simple process pool implementation around mutliprocessing.
diff --git a/nova/tests/process_unittest.py b/nova/tests/process_unittest.py
new file mode 100644
index 000000000..50368dd3f
--- /dev/null
+++ b/nova/tests/process_unittest.py
@@ -0,0 +1,115 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright [2010] [Anso Labs, LLC]
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from xml.etree import ElementTree
+
+from nova import vendor
+from twisted.internet import defer
+from twisted.internet import reactor
+
+from nova import exception
+from nova import flags
+from nova import process
+from nova import test
+from nova import utils
+
+FLAGS = flags.FLAGS
+
+
+class ProcessTestCase(test.TrialTestCase):
+ def setUp(self):
+ logging.getLogger().setLevel(logging.DEBUG)
+ super(ProcessTestCase, self).setUp()
+
+ def test_execute_stdout(self):
+ pool = process.ProcessPool(2)
+ d = pool.simpleExecute('echo test')
+ def _check(rv):
+ self.assertEqual(rv[0], 'test\n')
+ self.assertEqual(rv[1], '')
+
+ d.addCallback(_check)
+ d.addErrback(self.fail)
+ return d
+
+ def test_execute_stderr(self):
+ pool = process.ProcessPool(2)
+ d = pool.simpleExecute('cat BAD_FILE', error_ok=1)
+ def _check(rv):
+ self.assertEqual(rv[0], '')
+ self.assert_('No such file' in rv[1])
+
+ d.addCallback(_check)
+ d.addErrback(self.fail)
+ return d
+
+ def test_execute_unexpected_stderr(self):
+ pool = process.ProcessPool(2)
+ d = pool.simpleExecute('cat BAD_FILE')
+ d.addCallback(lambda x: self.fail('should have raised an error'))
+ d.addErrback(lambda failure: failure.trap(IOError))
+ return d
+
+ def test_max_processes(self):
+ pool = process.ProcessPool(2)
+ d1 = pool.simpleExecute('sleep 0.01')
+ d2 = pool.simpleExecute('sleep 0.01')
+ d3 = pool.simpleExecute('sleep 0.005')
+ d4 = pool.simpleExecute('sleep 0.005')
+
+ called = []
+ def _called(rv, name):
+ called.append(name)
+
+ d1.addCallback(_called, 'd1')
+ d2.addCallback(_called, 'd2')
+ d3.addCallback(_called, 'd3')
+ d4.addCallback(_called, 'd4')
+
+ # Make sure that d3 and d4 had to wait on the other two and were called
+ # in order
+ # NOTE(termie): there may be a race condition in this test if for some
+ # reason one of the sleeps takes longer to complete
+ # than it should
+ d4.addCallback(lambda x: self.assertEqual(called[2], 'd3'))
+ d4.addCallback(lambda x: self.assertEqual(called[3], 'd4'))
+ d4.addErrback(self.fail)
+ return d4
+
+ def test_kill_long_process(self):
+ pool = process.ProcessPool(2)
+
+ d1 = pool.simpleExecute('sleep 1')
+ d2 = pool.simpleExecute('sleep 0.005')
+
+ timeout = reactor.callLater(0.1, self.fail, 'should have been killed')
+
+ # kill d1 and wait on it to end then cancel the timeout
+ d2.addCallback(lambda _: d1.process.signalProcess('KILL'))
+ d2.addCallback(lambda _: d1)
+ d2.addBoth(lambda _: timeout.active() and timeout.cancel())
+ d2.addErrback(self.fail)
+ return d2
+
+ def test_process_exit_is_contained(self):
+ pool = process.ProcessPool(2)
+
+ d1 = pool.simpleExecute('sleep 1')
+ d1.addCallback(lambda x: self.fail('should have errbacked'))
+ d1.addErrback(lambda fail: fail.trap(IOError))
+ reactor.callLater(0.05, d1.process.signalProcess, 'KILL')
+
+ return d1
diff --git a/run_tests.py b/run_tests.py
index 886ab4bd0..f80f0af16 100644
--- a/run_tests.py
+++ b/run_tests.py
@@ -50,6 +50,7 @@ from nova.tests.keeper_unittest import *
from nova.tests.network_unittest import *
from nova.tests.node_unittest import *
from nova.tests.objectstore_unittest import *
+from nova.tests.process_unittest import *
from nova.tests.storage_unittest import *
from nova.tests.users_unittest import *
from nova.tests.datastore_unittest import *