-rw-r--r--   nova/compute/node.py             41
-rw-r--r--   nova/process.py                  46
-rw-r--r--   nova/tests/process_unittest.py   44
3 files changed, 57 insertions, 74 deletions
diff --git a/nova/compute/node.py b/nova/compute/node.py
index d681ec661..3abd20120 100644
--- a/nova/compute/node.py
+++ b/nova/compute/node.py
@@ -87,7 +87,6 @@ class Node(object, service.Service):
super(Node, self).__init__()
self._instances = {}
self._conn = self._get_connection()
- self._pool = process.ProcessPool()
self.instdir = model.InstanceDirectory()
# TODO(joshua): This needs to ensure system state, specifically: modprobe aoe
@@ -115,7 +114,7 @@ class Node(object, service.Service):
# inst = self.instdir.get(instance_id)
# return inst
if self.instdir.exists(instance_id):
- return Instance.fromName(self._conn, self._pool, instance_id)
+ return Instance.fromName(self._conn, instance_id)
return None
@exception.wrap_exception
@@ -126,7 +125,7 @@ class Node(object, service.Service):
for x in self._conn.listDomainsID()]
for name in instance_names:
try:
- new_inst = Instance.fromName(self._conn, self._pool, name)
+ new_inst = Instance.fromName(self._conn, name)
new_inst.update_state()
except:
pass
@@ -136,7 +135,8 @@ class Node(object, service.Service):
def describe_instances(self):
retval = {}
for inst in self.instdir.by_node(FLAGS.node_name):
- retval[inst['instance_id']] = (Instance.fromName(self._conn, self._pool, inst['instance_id']))
+ retval[inst['instance_id']] = (
+ Instance.fromName(self._conn, inst['instance_id']))
return retval
@defer.inlineCallbacks
@@ -169,8 +169,7 @@ class Node(object, service.Service):
inst['node_name'] = FLAGS.node_name
inst.save()
# TODO(vish) check to make sure the availability zone matches
- new_inst = Instance(self._conn, name=instance_id,
- pool=self._pool, data=inst)
+ new_inst = Instance(self._conn, name=instance_id, data=inst)
logging.info("Instances current state is %s", new_inst.state)
if new_inst.is_running():
raise exception.Error("Instance is already running")
@@ -267,11 +266,8 @@ class Instance(object):
SHUTOFF = 0x05
CRASHED = 0x06
- def __init__(self, conn, pool, name, data):
+ def __init__(self, conn, name, data):
""" spawn an instance with a given name """
- # TODO(termie): pool should probably be a singleton instead of being passed
- # here and in the classmethods
- self._pool = pool
self._conn = conn
# TODO(vish): this can be removed after data has been updated
# data doesn't seem to have a working iterator so in doesn't work
@@ -324,11 +320,11 @@ class Instance(object):
return libvirt_xml
@classmethod
- def fromName(cls, conn, pool, name):
+ def fromName(cls, conn, name):
""" use the saved data for reloading the instance """
instdir = model.InstanceDirectory()
instance = instdir.get(name)
- return cls(conn=conn, pool=pool, name=name, data=instance)
+ return cls(conn=conn, name=name, data=instance)
def set_state(self, state_code, state_description=None):
self.datamodel['state'] = state_code
@@ -450,12 +446,13 @@ class Instance(object):
def _fetch_s3_image(self, image, path):
url = _image_url('%s/image' % image)
- d = self._pool.simpleExecute('curl --silent %s -o %s' % (url, path))
+ d = process.SharedPool().simple_execute(
+ 'curl --silent %s -o %s' % (url, path))
return d
def _fetch_local_image(self, image, path):
source = _image_path('%s/image' % image)
- d = self._pool.simpleExecute('cp %s %s' % (source, path))
+ d = process.SharedPool().simple_execute('cp %s %s' % (source, path))
return d
@defer.inlineCallbacks
@@ -465,8 +462,10 @@ class Instance(object):
basepath = self.basepath
# ensure directories exist and are writable
- yield self._pool.simpleExecute('mkdir -p %s' % basepath())
- yield self._pool.simpleExecute('chmod 0777 %s' % basepath())
+ yield process.SharedPool().simple_execute(
+ 'mkdir -p %s' % basepath())
+ yield process.SharedPool().simple_execute(
+ 'chmod 0777 %s' % basepath())
# TODO(termie): these are blocking calls, it would be great
@@ -492,9 +491,10 @@ class Instance(object):
if not os.path.exists(basepath('ramdisk')):
yield _fetch_file(data['ramdisk_id'], basepath('ramdisk'))
- execute = lambda cmd, input=None: self._pool.simpleExecute(cmd=cmd,
- input=input,
- error_ok=1)
+ execute = lambda cmd, input=None: \
+ process.SharedPool().simple_execute(cmd=cmd,
+ input=input,
+ error_ok=1)
key = data['key_data']
net = None
@@ -511,7 +511,8 @@ class Instance(object):
yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute)
if os.path.exists(basepath('disk')):
- yield self._pool.simpleExecute('rm -f %s' % basepath('disk'))
+ yield process.SharedPool().simple_execute(
+ 'rm -f %s' % basepath('disk'))
bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb']
* 1024 * 1024 * 1024)
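
The node.py hunks above all make the same substitution: the pool is no longer stored on Node or threaded through Instance.__init__ and Instance.fromName; callers ask process.SharedPool() for it at the call site. A minimal sketch of the new call shape, assuming the deferred fires with stdout at index 0 and stderr at index 1 as the tests further down exercise (fetch_image and its arguments are illustrative, not part of the commit):

    from twisted.internet import defer
    from nova import process

    @defer.inlineCallbacks
    def fetch_image(url, path):
        # Nothing is stored on the instance any more; the shared pool is
        # looked up right where the command is issued.
        rv = yield process.SharedPool().simple_execute(
            'curl --silent %s -o %s' % (url, path))
        defer.returnValue((rv[0], rv[1]))  # (stdout, stderr)
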
diff --git a/nova/process.py b/nova/process.py
index ff789a08a..ebfb2f4ba 100644
--- a/nova/process.py
+++ b/nova/process.py
@@ -85,7 +85,7 @@ class _BackRelay(protocol.ProcessProtocol):
def errReceivedIsBad(self, text):
if self.deferred is not None:
self.onProcessEnded = defer.Deferred()
- err = _UnexpectedErrorOutput(text, self.onProcessEnded)
+ err = UnexpectedErrorOutput(text, self.onProcessEnded)
self.deferred.errback(failure.Failure(err))
self.deferred = None
self.transport.loseConnection()
@@ -152,8 +152,8 @@ def getProcessOutput(executable, args=None, env=None, path=None, reactor=None,
d = defer.Deferred()
p = BackRelayWithInput(
d, startedDeferred=startedDeferred, error_ok=error_ok, input=input)
- # VISH: commands come in as unicode, but self.executes needs
- # strings or process.spawn raises a deprecation warning
+ # NOTE(vish): commands come in as unicode, but self.executes needs
+ # strings or process.spawn raises a deprecation warning
executable = str(executable)
if not args is None:
args = [str(x) for x in args]
@@ -171,7 +171,7 @@ class ProcessPool(object):
self.size = size and size or FLAGS.process_pool_size
self._pool = defer.DeferredSemaphore(self.size)
- def simpleExecute(self, cmd, **kw):
+ def simple_execute(self, cmd, **kw):
""" Weak emulation of the old utils.execute() function.
This only exists as a way to quickly move old execute methods to
@@ -205,34 +205,10 @@ class ProcessPool(object):
self._pool.release()
return rv
-
-class Pool(object):
- """ A simple process pool implementation around mutliprocessing.
-
- Allows up to `size` processes at a time and queues the rest.
-
- Using workarounds for multiprocessing behavior described in:
- http://pypi.python.org/pypi/twisted.internet.processes/1.0b1
- """
-
- def __init__(self, size=None):
- self._size = size
- self._pool = multiprocessing.Pool(size)
- self._registerShutdown()
-
- def _registerShutdown(self):
- reactor.addSystemEventTrigger(
- 'during', 'shutdown', self.shutdown, reactor)
-
- def shutdown(self, reactor=None):
- if not self._pool:
- return
- self._pool.close()
- # wait for workers to finish
- self._pool.terminate()
- self._pool = None
-
- def apply(self, f, *args, **kw):
- """ Add a task to the pool and return a deferred. """
- result = self._pool.apply_async(f, args, kw)
- return threads.deferToThread(result.get)
+class SharedPool(ProcessPool):
+ _instance = None
+ def __new__(cls, *args, **kwargs):
+ if not cls._instance:
+ cls._instance = super(SharedPool, cls).__new__(
+ cls, *args, **kwargs)
+ return cls._instance
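
SharedPool subclasses ProcessPool and makes construction return one process-wide instance by overriding __new__. A standalone sketch of that pattern, outside Nova, so the behaviour is easy to see (SharedThing is a hypothetical stand-in for SharedPool):

    class SharedThing(object):
        _instance = None

        def __new__(cls, *args, **kwargs):
            if cls._instance is None:
                # Only the first call allocates; later calls reuse the object.
                cls._instance = super(SharedThing, cls).__new__(cls)
            return cls._instance

    a = SharedThing()
    b = SharedThing()
    assert a is b  # every construction returns the same object

One property of this pattern worth keeping in mind: __init__ still runs on every construction, so each SharedPool() call appears to re-run ProcessPool.__init__ on the shared instance (rebuilding its DeferredSemaphore) rather than reusing the first configuration.
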
diff --git a/nova/tests/process_unittest.py b/nova/tests/process_unittest.py
index 01648961f..1c15b69a0 100644
--- a/nova/tests/process_unittest.py
+++ b/nova/tests/process_unittest.py
@@ -37,7 +37,7 @@ class ProcessTestCase(test.TrialTestCase):
def test_execute_stdout(self):
pool = process.ProcessPool(2)
- d = pool.simpleExecute('echo test')
+ d = pool.simple_execute('echo test')
def _check(rv):
self.assertEqual(rv[0], 'test\n')
self.assertEqual(rv[1], '')
@@ -48,38 +48,38 @@ class ProcessTestCase(test.TrialTestCase):
def test_execute_stderr(self):
pool = process.ProcessPool(2)
- d = pool.simpleExecute('cat BAD_FILE', error_ok=1)
+ d = pool.simple_execute('cat BAD_FILE', error_ok=1)
def _check(rv):
self.assertEqual(rv[0], '')
self.assert_('No such file' in rv[1])
-
+
d.addCallback(_check)
d.addErrback(self.fail)
return d
def test_execute_unexpected_stderr(self):
pool = process.ProcessPool(2)
- d = pool.simpleExecute('cat BAD_FILE')
+ d = pool.simple_execute('cat BAD_FILE')
d.addCallback(lambda x: self.fail('should have raised an error'))
d.addErrback(lambda failure: failure.trap(IOError))
return d
-
+
def test_max_processes(self):
pool = process.ProcessPool(2)
- d1 = pool.simpleExecute('sleep 0.01')
- d2 = pool.simpleExecute('sleep 0.01')
- d3 = pool.simpleExecute('sleep 0.005')
- d4 = pool.simpleExecute('sleep 0.005')
+ d1 = pool.simple_execute('sleep 0.01')
+ d2 = pool.simple_execute('sleep 0.01')
+ d3 = pool.simple_execute('sleep 0.005')
+ d4 = pool.simple_execute('sleep 0.005')
called = []
def _called(rv, name):
called.append(name)
-
+
d1.addCallback(_called, 'd1')
d2.addCallback(_called, 'd2')
d3.addCallback(_called, 'd3')
d4.addCallback(_called, 'd4')
-
+
# Make sure that d3 and d4 had to wait on the other two and were called
# in order
# NOTE(termie): there may be a race condition in this test if for some
@@ -92,25 +92,31 @@ class ProcessTestCase(test.TrialTestCase):
def test_kill_long_process(self):
pool = process.ProcessPool(2)
-
- d1 = pool.simpleExecute('sleep 1')
- d2 = pool.simpleExecute('sleep 0.005')
+
+ d1 = pool.simple_execute('sleep 1')
+ d2 = pool.simple_execute('sleep 0.005')
timeout = reactor.callLater(0.1, self.fail, 'should have been killed')
-
+
# kill d1 and wait on it to end then cancel the timeout
d2.addCallback(lambda _: d1.process.signalProcess('KILL'))
d2.addCallback(lambda _: d1)
d2.addBoth(lambda _: timeout.active() and timeout.cancel())
d2.addErrback(self.fail)
return d2
-
+
def test_process_exit_is_contained(self):
pool = process.ProcessPool(2)
-
- d1 = pool.simpleExecute('sleep 1')
+
+ d1 = pool.simple_execute('sleep 1')
d1.addCallback(lambda x: self.fail('should have errbacked'))
d1.addErrback(lambda fail: fail.trap(IOError))
reactor.callLater(0.05, d1.process.signalProcess, 'KILL')
-
+
return d1
+
+ def test_shared_pool_is_singleton(self):
+ pool1 = process.SharedPool()
+ pool2 = process.SharedPool()
+ self.assert_(id(pool1) == id(pool2))
+
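
The updated tests also document the error_ok contract around simple_execute: with error_ok=1, stderr output is captured and returned; without it, unexpected stderr errbacks with IOError. A usage sketch built on that observed behaviour (probe_file is illustrative, not part of the commit):

    from twisted.internet import defer
    from nova import process

    @defer.inlineCallbacks
    def probe_file(path):
        # error_ok=1 keeps the deferred on the callback path even when the
        # command writes to stderr; rv[1] then carries the stderr text.
        rv = yield process.SharedPool().simple_execute(
            'cat %s' % path, error_ok=1)
        defer.returnValue(rv[1] == '')  # True if cat produced no errors
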