diff options
author | Chris Behrens <cbehrens@codestud.com> | 2011-08-29 14:36:12 -0700 |
---|---|---|
committer | Chris Behrens <cbehrens@codestud.com> | 2011-08-29 14:36:12 -0700 |
commit | ee15f2a58217d522e23d811db4958e2e9b2338d6 (patch) | |
tree | 94d3e4d645547066bf03b19faf789f5baf6b336f | |
parent | c0775bfd7d6f2b21b95e459e4c78c5ed9b445efe (diff) | |
download | nova-ee15f2a58217d522e23d811db4958e2e9b2338d6.tar.gz nova-ee15f2a58217d522e23d811db4958e2e9b2338d6.tar.xz nova-ee15f2a58217d522e23d811db4958e2e9b2338d6.zip |
ditched rpc.create_consumer(conn) interface... instead you now do conn.create_consumer(..
-rw-r--r-- | nova/rpc/__init__.py | 6 | ||||
-rw-r--r-- | nova/rpc/impl_carrot.py | 29 | ||||
-rw-r--r-- | nova/rpc/impl_kombu.py | 55 | ||||
-rw-r--r-- | nova/service.py | 8 | ||||
-rw-r--r-- | nova/tests/test_rpc.py | 10 | ||||
-rw-r--r-- | nova/tests/test_rpc_carrot.py | 10 | ||||
-rw-r--r-- | nova/tests/test_rpc_kombu.py | 10 | ||||
-rw-r--r-- | nova/tests/test_test.py | 3 |
8 files changed, 52 insertions, 79 deletions
diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index 2a47ba87b..fe50fb476 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -24,7 +24,7 @@ from nova import flags FLAGS = flags.FLAGS flags.DEFINE_string('rpc_backend', 'kombu', - "The messaging module to use, defaults to carrot.") + "The messaging module to use, defaults to kombu.") impl_table = {'kombu': 'nova.rpc.impl_kombu', 'amqp': 'nova.rpc.impl_kombu', @@ -41,10 +41,6 @@ def create_connection(new=True): return RPCIMPL.create_connection(new=new) -def create_consumer(conn, topic, proxy, fanout=False): - RPCIMPL.create_consumer(conn, topic, proxy, fanout) - - def call(context, topic, msg): return RPCIMPL.call(context, topic, msg) diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py index d0e6f8269..6d504aaec 100644 --- a/nova/rpc/impl_carrot.py +++ b/nova/rpc/impl_carrot.py @@ -132,6 +132,20 @@ class Connection(carrot_connection.BrokerConnection): pass self._rpc_consumer_thread = None + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer that calls methods in the proxy""" + if fanout: + consumer = FanoutAdapterConsumer( + connection=self, + topic=topic, + proxy=proxy) + else: + consumer = TopicAdapterConsumer( + connection=self, + topic=topic, + proxy=proxy) + self._rpc_consumers.append(consumer) + class Pool(pools.Pool): """Class that implements a Pool of Connections.""" @@ -187,7 +201,6 @@ class Consumer(messaging.Consumer): LOG.error(_('Unable to connect to AMQP server ' 'after %(tries)d tries. Shutting down.') % locals()) sys.exit(1) - connection._rpc_consumers.append(self) def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): """Wraps the parent fetch with some logic for failed connection.""" @@ -568,20 +581,6 @@ def create_connection(new=True): return Connection.instance(new=new) -def create_consumer(conn, topic, proxy, fanout=False): - """Create a consumer that calls methods in the proxy""" - if fanout: - return FanoutAdapterConsumer( - connection=conn, - topic=topic, - proxy=proxy) - else: - return TopicAdapterConsumer( - connection=conn, - topic=topic, - proxy=proxy) - - def call(context, topic, msg): """Sends a message on a topic and wait for a response.""" rv = multicall(context, topic, msg) diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 49bca1d81..83ee1b122 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -404,26 +404,6 @@ class Connection(object): '%s' % str(e))) self.reconnect() - def consume(self, limit=None): - """Consume from all queues/consumers""" - it = self.iterconsume(limit=limit) - while True: - try: - it.next() - except StopIteration: - return - - def consume_in_thread(self): - """Consumer from all queues/consumers in a greenthread""" - def _consumer_thread(): - try: - self.consume() - except greenlet.GreenletExit: - return - if self.consumer_thread is None: - self.consumer_thread = eventlet.spawn(_consumer_thread) - return self.consumer_thread - def cancel_consumer_thread(self): """Cancel a consumer thread""" if self.consumer_thread is not None: @@ -478,6 +458,33 @@ class Connection(object): """Send a 'fanout' message""" self.publisher_send(FanoutPublisher, topic, msg) + def consume(self, limit=None): + """Consume from all queues/consumers""" + it = self.iterconsume(limit=limit) + while True: + try: + it.next() + except StopIteration: + return + + def consume_in_thread(self): + """Consumer from all queues/consumers in a greenthread""" + def _consumer_thread(): + try: + self.consume() + except greenlet.GreenletExit: + return + if self.consumer_thread is None: + self.consumer_thread = eventlet.spawn(_consumer_thread) + return self.consumer_thread + + def create_consumer(self, topic, proxy, fanout=False): + """Create a consumer that calls a method in a proxy object""" + if fanout: + self.declare_fanout_consumer(topic, ProxyCallback(proxy)) + else: + self.declare_topic_consumer(topic, ProxyCallback(proxy)) + class Pool(pools.Pool): """Class that implements a Pool of Connections.""" @@ -678,14 +685,6 @@ def create_connection(new=True): return ConnectionContext(pooled=not new) -def create_consumer(conn, topic, proxy, fanout=False): - """Create a consumer that calls a method in a proxy object""" - if fanout: - conn.declare_fanout_consumer(topic, ProxyCallback(proxy)) - else: - conn.declare_topic_consumer(topic, ProxyCallback(proxy)) - - def multicall(context, topic, msg): """Make a call that returns multiple times.""" # Can't use 'with' for multicall, as it returns an iterator diff --git a/nova/service.py b/nova/service.py index ab7925eb3..247eb4fb1 100644 --- a/nova/service.py +++ b/nova/service.py @@ -153,14 +153,12 @@ class Service(object): self.topic) # Share this same connection for these Consumers - rpc.create_consumer(self.conn, self.topic, self, - fanout=False) + self.conn.create_consumer(self.topic, self, fanout=False) node_topic = '%s.%s' % (self.topic, self.host) - rpc.create_consumer(self.conn, node_topic, self, - fanout=False) + self.conn.create_consumer(node_topic, self, fanout=False) - rpc.create_consumer(self.conn, self.topic, self, fanout=True) + self.conn.create_consumer(self.topic, self, fanout=True) # Consume from all consumers in a thread self.conn.consume_in_thread() diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index 2b9922491..ba91ea3b2 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -33,10 +33,7 @@ class RpcTestCase(test.TestCase): super(RpcTestCase, self).setUp() self.conn = rpc.create_connection(True) self.receiver = TestReceiver() - rpc.create_consumer(self.conn, - 'test', - self.receiver, - False) + self.conn.create_consumer('test', self.receiver, False) self.conn.consume_in_thread() self.context = context.get_admin_context() @@ -143,10 +140,7 @@ class RpcTestCase(test.TestCase): nested = Nested() conn = rpc.create_connection(True) - rpc.create_consumer(conn, - 'nested', - nested, - False) + conn.create_consumer('nested', nested, False) conn.consume_in_thread() value = 42 result = rpc.call(self.context, diff --git a/nova/tests/test_rpc_carrot.py b/nova/tests/test_rpc_carrot.py index cf84980ab..ff704ecf8 100644 --- a/nova/tests/test_rpc_carrot.py +++ b/nova/tests/test_rpc_carrot.py @@ -33,10 +33,7 @@ class RpcCarrotTestCase(test.TestCase): super(RpcCarrotTestCase, self).setUp() self.conn = rpc.create_connection(True) self.receiver = TestReceiver() - rpc.create_consumer(self.conn, - 'test', - self.receiver, - False) + self.conn.create_consumer('test', self.receiver, False) self.conn.consume_in_thread() self.context = context.get_admin_context() @@ -151,10 +148,7 @@ class RpcCarrotTestCase(test.TestCase): nested = Nested() conn = rpc.create_connection(True) - rpc.create_consumer(conn, - 'nested', - nested, - False) + conn.create_consumer('nested', nested, False) conn.consume_in_thread() value = 42 result = rpc.call(self.context, diff --git a/nova/tests/test_rpc_kombu.py b/nova/tests/test_rpc_kombu.py index 457dfdeca..7db88ecd0 100644 --- a/nova/tests/test_rpc_kombu.py +++ b/nova/tests/test_rpc_kombu.py @@ -33,10 +33,7 @@ class RpcKombuTestCase(test.TestCase): super(RpcKombuTestCase, self).setUp() self.conn = rpc.create_connection() self.receiver = TestReceiver() - rpc.create_consumer(self.conn, - 'test', - self.receiver, - False) + self.conn.create_consumer('test', self.receiver, False) self.conn.consume_in_thread() self.context = context.get_admin_context() @@ -215,10 +212,7 @@ class RpcKombuTestCase(test.TestCase): nested = Nested() conn = rpc.create_connection(True) - rpc.create_consumer(conn, - 'nested', - nested, - False) + conn.create_consumer('nested', nested, False) conn.consume_in_thread() value = 42 result = rpc.call(self.context, diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py index 6075abbb0..3482ff6a0 100644 --- a/nova/tests/test_test.py +++ b/nova/tests/test_test.py @@ -40,6 +40,5 @@ class IsolationTestCase(test.TestCase): connection = rpc.create_connection(new=True) proxy = NeverCalled() - rpc.create_consumer(connection, 'compute', - proxy, fanout=False) + connection.create_consumer('compute', proxy, fanout=False) connection.consume_in_thread() |