summaryrefslogtreecommitdiffstats
path: root/nova/process.py
diff options
context:
space:
mode:
authorJesse Andrews <anotherjesse@gmail.com>2010-05-27 23:05:26 -0700
committerJesse Andrews <anotherjesse@gmail.com>2010-05-27 23:05:26 -0700
commitbf6e6e718cdc7488e2da87b21e258ccc065fe499 (patch)
tree51cf4f72047eb6b16079c7fe21e9822895541801 /nova/process.py
downloadnova-bf6e6e718cdc7488e2da87b21e258ccc065fe499.tar.gz
nova-bf6e6e718cdc7488e2da87b21e258ccc065fe499.tar.xz
nova-bf6e6e718cdc7488e2da87b21e258ccc065fe499.zip
initial commit
Diffstat (limited to 'nova/process.py')
-rw-r--r--nova/process.py131
1 files changed, 131 insertions, 0 deletions
diff --git a/nova/process.py b/nova/process.py
new file mode 100644
index 000000000..754728fdf
--- /dev/null
+++ b/nova/process.py
@@ -0,0 +1,131 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+# Copyright [2010] [Anso Labs, LLC]
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Process pool, still buggy right now.
+"""
+
+import logging
+import multiprocessing
+
+from nova import vendor
+from twisted.internet import defer
+from twisted.internet import reactor
+from twisted.internet import protocol
+from twisted.internet import threads
+
+# NOTE(termie): this is copied from twisted.internet.utils but since
+# they don't export it I've copied.
+class _BackRelay(protocol.ProcessProtocol):
+ """
+ Trivial protocol for communicating with a process and turning its output
+ into the result of a L{Deferred}.
+
+ @ivar deferred: A L{Deferred} which will be called back with all of stdout
+ and, if C{errortoo} is true, all of stderr as well (mixed together in
+ one string). If C{errortoo} is false and any bytes are received over
+ stderr, this will fire with an L{_UnexpectedErrorOutput} instance and
+ the attribute will be set to C{None}.
+
+ @ivar onProcessEnded: If C{errortoo} is false and bytes are received over
+ stderr, this attribute will refer to a L{Deferred} which will be called
+ back when the process ends. This C{Deferred} is also associated with
+ the L{_UnexpectedErrorOutput} which C{deferred} fires with earlier in
+ this case so that users can determine when the process has actually
+ ended, in addition to knowing when bytes have been received via stderr.
+ """
+
+ def __init__(self, deferred, errortoo=0):
+ self.deferred = deferred
+ self.s = StringIO.StringIO()
+ if errortoo:
+ self.errReceived = self.errReceivedIsGood
+ else:
+ self.errReceived = self.errReceivedIsBad
+
+ def errReceivedIsBad(self, text):
+ if self.deferred is not None:
+ self.onProcessEnded = defer.Deferred()
+ err = _UnexpectedErrorOutput(text, self.onProcessEnded)
+ self.deferred.errback(failure.Failure(err))
+ self.deferred = None
+ self.transport.loseConnection()
+
+ def errReceivedIsGood(self, text):
+ self.s.write(text)
+
+ def outReceived(self, text):
+ self.s.write(text)
+
+ def processEnded(self, reason):
+ if self.deferred is not None:
+ self.deferred.callback(self.s.getvalue())
+ elif self.onProcessEnded is not None:
+ self.onProcessEnded.errback(reason)
+
+
+class BackRelayWithInput(_BackRelay):
+ def __init__(self, deferred, errortoo=0, input=None):
+ super(BackRelayWithInput, self).__init__(deferred, errortoo)
+ self.input = input
+
+ def connectionMade(self):
+ if self.input:
+ self.transport.write(self.input)
+ self.transport.closeStdin()
+
+
+def getProcessOutput(executable, args=None, env=None, path=None, reactor=None,
+ errortoo=0, input=None):
+ if reactor is None:
+ from twisted.internet import reactor
+ args = args and args or ()
+ env = env and env and {}
+ d = defer.Deferred()
+ p = BackRelayWithInput(d, errortoo=errortoo, input=input)
+ reactor.spawnProcess(p, executable, (executable,)+tuple(args), env, path)
+ return d
+
+
+class Pool(object):
+ """ A simple process pool implementation around mutliprocessing.
+
+ Allows up to `size` processes at a time and queues the rest.
+
+ Using workarounds for multiprocessing behavior described in:
+ http://pypi.python.org/pypi/twisted.internet.processes/1.0b1
+ """
+
+ def __init__(self, size=None):
+ self._size = size
+ self._pool = multiprocessing.Pool(size)
+ self._registerShutdown()
+
+ def _registerShutdown(self):
+ reactor.addSystemEventTrigger(
+ 'during', 'shutdown', self.shutdown, reactor)
+
+ def shutdown(self, reactor=None):
+ if not self._pool:
+ return
+ self._pool.close()
+ # wait for workers to finish
+ self._pool.terminate()
+ self._pool = None
+
+ def apply(self, f, *args, **kw):
+ """ Add a task to the pool and return a deferred. """
+ result = self._pool.apply_async(f, args, kw)
+ return threads.deferToThread(result.get)