diff options
author | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-07-16 19:12:36 +0000 |
---|---|---|
committer | Vishvananda Ishaya <vishvananda@gmail.com> | 2010-07-16 19:12:36 +0000 |
commit | ae9e4e81d992fb81c01acd2dfcb1cb3d32956041 (patch) | |
tree | ebf0f9c5ab953327bcb358a12c4e7e2acd84f74b | |
parent | 63c5ab9806aeb732dc8a8cb7b902592fb5db9363 (diff) | |
download | nova-ae9e4e81d992fb81c01acd2dfcb1cb3d32956041.tar.gz nova-ae9e4e81d992fb81c01acd2dfcb1cb3d32956041.tar.xz nova-ae9e4e81d992fb81c01acd2dfcb1cb3d32956041.zip |
Removed unused Pool from process.py, added a singleton pool called SharedPool, changed calls in node to use singleton pool
-rw-r--r-- | nova/compute/node.py | 41 | ||||
-rw-r--r-- | nova/process.py | 46 | ||||
-rw-r--r-- | nova/tests/process_unittest.py | 44 |
3 files changed, 57 insertions, 74 deletions
diff --git a/nova/compute/node.py b/nova/compute/node.py index d681ec661..3abd20120 100644 --- a/nova/compute/node.py +++ b/nova/compute/node.py @@ -87,7 +87,6 @@ class Node(object, service.Service): super(Node, self).__init__() self._instances = {} self._conn = self._get_connection() - self._pool = process.ProcessPool() self.instdir = model.InstanceDirectory() # TODO(joshua): This needs to ensure system state, specifically: modprobe aoe @@ -115,7 +114,7 @@ class Node(object, service.Service): # inst = self.instdir.get(instance_id) # return inst if self.instdir.exists(instance_id): - return Instance.fromName(self._conn, self._pool, instance_id) + return Instance.fromName(self._conn, instance_id) return None @exception.wrap_exception @@ -126,7 +125,7 @@ class Node(object, service.Service): for x in self._conn.listDomainsID()] for name in instance_names: try: - new_inst = Instance.fromName(self._conn, self._pool, name) + new_inst = Instance.fromName(self._conn, name) new_inst.update_state() except: pass @@ -136,7 +135,8 @@ class Node(object, service.Service): def describe_instances(self): retval = {} for inst in self.instdir.by_node(FLAGS.node_name): - retval[inst['instance_id']] = (Instance.fromName(self._conn, self._pool, inst['instance_id'])) + retval[inst['instance_id']] = ( + Instance.fromName(self._conn, inst['instance_id'])) return retval @defer.inlineCallbacks @@ -169,8 +169,7 @@ class Node(object, service.Service): inst['node_name'] = FLAGS.node_name inst.save() # TODO(vish) check to make sure the availability zone matches - new_inst = Instance(self._conn, name=instance_id, - pool=self._pool, data=inst) + new_inst = Instance(self._conn, name=instance_id, data=inst) logging.info("Instances current state is %s", new_inst.state) if new_inst.is_running(): raise exception.Error("Instance is already running") @@ -267,11 +266,8 @@ class Instance(object): SHUTOFF = 0x05 CRASHED = 0x06 - def __init__(self, conn, pool, name, data): + def __init__(self, conn, name, data): """ spawn an instance with a given name """ - # TODO(termie): pool should probably be a singleton instead of being passed - # here and in the classmethods - self._pool = pool self._conn = conn # TODO(vish): this can be removed after data has been updated # data doesn't seem to have a working iterator so in doesn't work @@ -324,11 +320,11 @@ class Instance(object): return libvirt_xml @classmethod - def fromName(cls, conn, pool, name): + def fromName(cls, conn, name): """ use the saved data for reloading the instance """ instdir = model.InstanceDirectory() instance = instdir.get(name) - return cls(conn=conn, pool=pool, name=name, data=instance) + return cls(conn=conn, name=name, data=instance) def set_state(self, state_code, state_description=None): self.datamodel['state'] = state_code @@ -450,12 +446,13 @@ class Instance(object): def _fetch_s3_image(self, image, path): url = _image_url('%s/image' % image) - d = self._pool.simpleExecute('curl --silent %s -o %s' % (url, path)) + d = process.SharedPool().simple_execute( + 'curl --silent %s -o %s' % (url, path)) return d def _fetch_local_image(self, image, path): source = _image_path('%s/image' % image) - d = self._pool.simpleExecute('cp %s %s' % (source, path)) + d = process.SharedPool().simple_execute('cp %s %s' % (source, path)) return d @defer.inlineCallbacks @@ -465,8 +462,10 @@ class Instance(object): basepath = self.basepath # ensure directories exist and are writable - yield self._pool.simpleExecute('mkdir -p %s' % basepath()) - yield self._pool.simpleExecute('chmod 0777 %s' % basepath()) + yield process.SharedPool().simple_execute( + 'mkdir -p %s' % basepath()) + yield process.SharedPool().simple_execute( + 'chmod 0777 %s' % basepath()) # TODO(termie): these are blocking calls, it would be great @@ -492,9 +491,10 @@ class Instance(object): if not os.path.exists(basepath('ramdisk')): yield _fetch_file(data['ramdisk_id'], basepath('ramdisk')) - execute = lambda cmd, input=None: self._pool.simpleExecute(cmd=cmd, - input=input, - error_ok=1) + execute = lambda cmd, input=None: \ + process.SharedPool().simple_execute(cmd=cmd, + input=input, + error_ok=1) key = data['key_data'] net = None @@ -511,7 +511,8 @@ class Instance(object): yield disk.inject_data(basepath('disk-raw'), key, net, execute=execute) if os.path.exists(basepath('disk')): - yield self._pool.simpleExecute('rm -f %s' % basepath('disk')) + yield process.SharedPool().simple_execute( + 'rm -f %s' % basepath('disk')) bytes = (INSTANCE_TYPES[data['instance_type']]['local_gb'] * 1024 * 1024 * 1024) diff --git a/nova/process.py b/nova/process.py index ff789a08a..ebfb2f4ba 100644 --- a/nova/process.py +++ b/nova/process.py @@ -85,7 +85,7 @@ class _BackRelay(protocol.ProcessProtocol): def errReceivedIsBad(self, text): if self.deferred is not None: self.onProcessEnded = defer.Deferred() - err = _UnexpectedErrorOutput(text, self.onProcessEnded) + err = UnexpectedErrorOutput(text, self.onProcessEnded) self.deferred.errback(failure.Failure(err)) self.deferred = None self.transport.loseConnection() @@ -152,8 +152,8 @@ def getProcessOutput(executable, args=None, env=None, path=None, reactor=None, d = defer.Deferred() p = BackRelayWithInput( d, startedDeferred=startedDeferred, error_ok=error_ok, input=input) - # VISH: commands come in as unicode, but self.executes needs - # strings or process.spawn raises a deprecation warning + # NOTE(vish): commands come in as unicode, but self.executes needs + # strings or process.spawn raises a deprecation warning executable = str(executable) if not args is None: args = [str(x) for x in args] @@ -171,7 +171,7 @@ class ProcessPool(object): self.size = size and size or FLAGS.process_pool_size self._pool = defer.DeferredSemaphore(self.size) - def simpleExecute(self, cmd, **kw): + def simple_execute(self, cmd, **kw): """ Weak emulation of the old utils.execute() function. This only exists as a way to quickly move old execute methods to @@ -205,34 +205,10 @@ class ProcessPool(object): self._pool.release() return rv - -class Pool(object): - """ A simple process pool implementation around mutliprocessing. - - Allows up to `size` processes at a time and queues the rest. - - Using workarounds for multiprocessing behavior described in: - http://pypi.python.org/pypi/twisted.internet.processes/1.0b1 - """ - - def __init__(self, size=None): - self._size = size - self._pool = multiprocessing.Pool(size) - self._registerShutdown() - - def _registerShutdown(self): - reactor.addSystemEventTrigger( - 'during', 'shutdown', self.shutdown, reactor) - - def shutdown(self, reactor=None): - if not self._pool: - return - self._pool.close() - # wait for workers to finish - self._pool.terminate() - self._pool = None - - def apply(self, f, *args, **kw): - """ Add a task to the pool and return a deferred. """ - result = self._pool.apply_async(f, args, kw) - return threads.deferToThread(result.get) +class SharedPool(ProcessPool): + _instance = None + def __new__(cls, *args, **kwargs): + if not cls._instance: + cls._instance = super(SharedPool, cls).__new__( + cls, *args, **kwargs) + return cls._instance diff --git a/nova/tests/process_unittest.py b/nova/tests/process_unittest.py index 01648961f..1c15b69a0 100644 --- a/nova/tests/process_unittest.py +++ b/nova/tests/process_unittest.py @@ -37,7 +37,7 @@ class ProcessTestCase(test.TrialTestCase): def test_execute_stdout(self): pool = process.ProcessPool(2) - d = pool.simpleExecute('echo test') + d = pool.simple_execute('echo test') def _check(rv): self.assertEqual(rv[0], 'test\n') self.assertEqual(rv[1], '') @@ -48,38 +48,38 @@ class ProcessTestCase(test.TrialTestCase): def test_execute_stderr(self): pool = process.ProcessPool(2) - d = pool.simpleExecute('cat BAD_FILE', error_ok=1) + d = pool.simple_execute('cat BAD_FILE', error_ok=1) def _check(rv): self.assertEqual(rv[0], '') self.assert_('No such file' in rv[1]) - + d.addCallback(_check) d.addErrback(self.fail) return d def test_execute_unexpected_stderr(self): pool = process.ProcessPool(2) - d = pool.simpleExecute('cat BAD_FILE') + d = pool.simple_execute('cat BAD_FILE') d.addCallback(lambda x: self.fail('should have raised an error')) d.addErrback(lambda failure: failure.trap(IOError)) return d - + def test_max_processes(self): pool = process.ProcessPool(2) - d1 = pool.simpleExecute('sleep 0.01') - d2 = pool.simpleExecute('sleep 0.01') - d3 = pool.simpleExecute('sleep 0.005') - d4 = pool.simpleExecute('sleep 0.005') + d1 = pool.simple_execute('sleep 0.01') + d2 = pool.simple_execute('sleep 0.01') + d3 = pool.simple_execute('sleep 0.005') + d4 = pool.simple_execute('sleep 0.005') called = [] def _called(rv, name): called.append(name) - + d1.addCallback(_called, 'd1') d2.addCallback(_called, 'd2') d3.addCallback(_called, 'd3') d4.addCallback(_called, 'd4') - + # Make sure that d3 and d4 had to wait on the other two and were called # in order # NOTE(termie): there may be a race condition in this test if for some @@ -92,25 +92,31 @@ class ProcessTestCase(test.TrialTestCase): def test_kill_long_process(self): pool = process.ProcessPool(2) - - d1 = pool.simpleExecute('sleep 1') - d2 = pool.simpleExecute('sleep 0.005') + + d1 = pool.simple_execute('sleep 1') + d2 = pool.simple_execute('sleep 0.005') timeout = reactor.callLater(0.1, self.fail, 'should have been killed') - + # kill d1 and wait on it to end then cancel the timeout d2.addCallback(lambda _: d1.process.signalProcess('KILL')) d2.addCallback(lambda _: d1) d2.addBoth(lambda _: timeout.active() and timeout.cancel()) d2.addErrback(self.fail) return d2 - + def test_process_exit_is_contained(self): pool = process.ProcessPool(2) - - d1 = pool.simpleExecute('sleep 1') + + d1 = pool.simple_execute('sleep 1') d1.addCallback(lambda x: self.fail('should have errbacked')) d1.addErrback(lambda fail: fail.trap(IOError)) reactor.callLater(0.05, d1.process.signalProcess, 'KILL') - + return d1 + + def test_shared_pool_is_singleton(self): + pool1 = process.SharedPool() + pool2 = process.SharedPool() + self.assert_(id(pool1) == id(pool2)) + |