summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris Behrens <cbehrens@codestud.com>2011-08-29 14:36:12 -0700
committerChris Behrens <cbehrens@codestud.com>2011-08-29 14:36:12 -0700
commitee15f2a58217d522e23d811db4958e2e9b2338d6 (patch)
tree94d3e4d645547066bf03b19faf789f5baf6b336f
parentc0775bfd7d6f2b21b95e459e4c78c5ed9b445efe (diff)
downloadnova-ee15f2a58217d522e23d811db4958e2e9b2338d6.tar.gz
nova-ee15f2a58217d522e23d811db4958e2e9b2338d6.tar.xz
nova-ee15f2a58217d522e23d811db4958e2e9b2338d6.zip
ditched rpc.create_consumer(conn) interface... instead you now do conn.create_consumer(..
-rw-r--r--nova/rpc/__init__.py6
-rw-r--r--nova/rpc/impl_carrot.py29
-rw-r--r--nova/rpc/impl_kombu.py55
-rw-r--r--nova/service.py8
-rw-r--r--nova/tests/test_rpc.py10
-rw-r--r--nova/tests/test_rpc_carrot.py10
-rw-r--r--nova/tests/test_rpc_kombu.py10
-rw-r--r--nova/tests/test_test.py3
8 files changed, 52 insertions, 79 deletions
diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py
index 2a47ba87b..fe50fb476 100644
--- a/nova/rpc/__init__.py
+++ b/nova/rpc/__init__.py
@@ -24,7 +24,7 @@ from nova import flags
FLAGS = flags.FLAGS
flags.DEFINE_string('rpc_backend',
'kombu',
- "The messaging module to use, defaults to carrot.")
+ "The messaging module to use, defaults to kombu.")
impl_table = {'kombu': 'nova.rpc.impl_kombu',
'amqp': 'nova.rpc.impl_kombu',
@@ -41,10 +41,6 @@ def create_connection(new=True):
return RPCIMPL.create_connection(new=new)
-def create_consumer(conn, topic, proxy, fanout=False):
- RPCIMPL.create_consumer(conn, topic, proxy, fanout)
-
-
def call(context, topic, msg):
return RPCIMPL.call(context, topic, msg)
diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py
index d0e6f8269..6d504aaec 100644
--- a/nova/rpc/impl_carrot.py
+++ b/nova/rpc/impl_carrot.py
@@ -132,6 +132,20 @@ class Connection(carrot_connection.BrokerConnection):
pass
self._rpc_consumer_thread = None
+ def create_consumer(self, topic, proxy, fanout=False):
+ """Create a consumer that calls methods in the proxy"""
+ if fanout:
+ consumer = FanoutAdapterConsumer(
+ connection=self,
+ topic=topic,
+ proxy=proxy)
+ else:
+ consumer = TopicAdapterConsumer(
+ connection=self,
+ topic=topic,
+ proxy=proxy)
+ self._rpc_consumers.append(consumer)
+
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
@@ -187,7 +201,6 @@ class Consumer(messaging.Consumer):
LOG.error(_('Unable to connect to AMQP server '
'after %(tries)d tries. Shutting down.') % locals())
sys.exit(1)
- connection._rpc_consumers.append(self)
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connection."""
@@ -568,20 +581,6 @@ def create_connection(new=True):
return Connection.instance(new=new)
-def create_consumer(conn, topic, proxy, fanout=False):
- """Create a consumer that calls methods in the proxy"""
- if fanout:
- return FanoutAdapterConsumer(
- connection=conn,
- topic=topic,
- proxy=proxy)
- else:
- return TopicAdapterConsumer(
- connection=conn,
- topic=topic,
- proxy=proxy)
-
-
def call(context, topic, msg):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index 49bca1d81..83ee1b122 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -404,26 +404,6 @@ class Connection(object):
'%s' % str(e)))
self.reconnect()
- def consume(self, limit=None):
- """Consume from all queues/consumers"""
- it = self.iterconsume(limit=limit)
- while True:
- try:
- it.next()
- except StopIteration:
- return
-
- def consume_in_thread(self):
- """Consumer from all queues/consumers in a greenthread"""
- def _consumer_thread():
- try:
- self.consume()
- except greenlet.GreenletExit:
- return
- if self.consumer_thread is None:
- self.consumer_thread = eventlet.spawn(_consumer_thread)
- return self.consumer_thread
-
def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
if self.consumer_thread is not None:
@@ -478,6 +458,33 @@ class Connection(object):
"""Send a 'fanout' message"""
self.publisher_send(FanoutPublisher, topic, msg)
+ def consume(self, limit=None):
+ """Consume from all queues/consumers"""
+ it = self.iterconsume(limit=limit)
+ while True:
+ try:
+ it.next()
+ except StopIteration:
+ return
+
+ def consume_in_thread(self):
+ """Consumer from all queues/consumers in a greenthread"""
+ def _consumer_thread():
+ try:
+ self.consume()
+ except greenlet.GreenletExit:
+ return
+ if self.consumer_thread is None:
+ self.consumer_thread = eventlet.spawn(_consumer_thread)
+ return self.consumer_thread
+
+ def create_consumer(self, topic, proxy, fanout=False):
+ """Create a consumer that calls a method in a proxy object"""
+ if fanout:
+ self.declare_fanout_consumer(topic, ProxyCallback(proxy))
+ else:
+ self.declare_topic_consumer(topic, ProxyCallback(proxy))
+
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
@@ -678,14 +685,6 @@ def create_connection(new=True):
return ConnectionContext(pooled=not new)
-def create_consumer(conn, topic, proxy, fanout=False):
- """Create a consumer that calls a method in a proxy object"""
- if fanout:
- conn.declare_fanout_consumer(topic, ProxyCallback(proxy))
- else:
- conn.declare_topic_consumer(topic, ProxyCallback(proxy))
-
-
def multicall(context, topic, msg):
"""Make a call that returns multiple times."""
# Can't use 'with' for multicall, as it returns an iterator
diff --git a/nova/service.py b/nova/service.py
index ab7925eb3..247eb4fb1 100644
--- a/nova/service.py
+++ b/nova/service.py
@@ -153,14 +153,12 @@ class Service(object):
self.topic)
# Share this same connection for these Consumers
- rpc.create_consumer(self.conn, self.topic, self,
- fanout=False)
+ self.conn.create_consumer(self.topic, self, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
- rpc.create_consumer(self.conn, node_topic, self,
- fanout=False)
+ self.conn.create_consumer(node_topic, self, fanout=False)
- rpc.create_consumer(self.conn, self.topic, self, fanout=True)
+ self.conn.create_consumer(self.topic, self, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index 2b9922491..ba91ea3b2 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -33,10 +33,7 @@ class RpcTestCase(test.TestCase):
super(RpcTestCase, self).setUp()
self.conn = rpc.create_connection(True)
self.receiver = TestReceiver()
- rpc.create_consumer(self.conn,
- 'test',
- self.receiver,
- False)
+ self.conn.create_consumer('test', self.receiver, False)
self.conn.consume_in_thread()
self.context = context.get_admin_context()
@@ -143,10 +140,7 @@ class RpcTestCase(test.TestCase):
nested = Nested()
conn = rpc.create_connection(True)
- rpc.create_consumer(conn,
- 'nested',
- nested,
- False)
+ conn.create_consumer('nested', nested, False)
conn.consume_in_thread()
value = 42
result = rpc.call(self.context,
diff --git a/nova/tests/test_rpc_carrot.py b/nova/tests/test_rpc_carrot.py
index cf84980ab..ff704ecf8 100644
--- a/nova/tests/test_rpc_carrot.py
+++ b/nova/tests/test_rpc_carrot.py
@@ -33,10 +33,7 @@ class RpcCarrotTestCase(test.TestCase):
super(RpcCarrotTestCase, self).setUp()
self.conn = rpc.create_connection(True)
self.receiver = TestReceiver()
- rpc.create_consumer(self.conn,
- 'test',
- self.receiver,
- False)
+ self.conn.create_consumer('test', self.receiver, False)
self.conn.consume_in_thread()
self.context = context.get_admin_context()
@@ -151,10 +148,7 @@ class RpcCarrotTestCase(test.TestCase):
nested = Nested()
conn = rpc.create_connection(True)
- rpc.create_consumer(conn,
- 'nested',
- nested,
- False)
+ conn.create_consumer('nested', nested, False)
conn.consume_in_thread()
value = 42
result = rpc.call(self.context,
diff --git a/nova/tests/test_rpc_kombu.py b/nova/tests/test_rpc_kombu.py
index 457dfdeca..7db88ecd0 100644
--- a/nova/tests/test_rpc_kombu.py
+++ b/nova/tests/test_rpc_kombu.py
@@ -33,10 +33,7 @@ class RpcKombuTestCase(test.TestCase):
super(RpcKombuTestCase, self).setUp()
self.conn = rpc.create_connection()
self.receiver = TestReceiver()
- rpc.create_consumer(self.conn,
- 'test',
- self.receiver,
- False)
+ self.conn.create_consumer('test', self.receiver, False)
self.conn.consume_in_thread()
self.context = context.get_admin_context()
@@ -215,10 +212,7 @@ class RpcKombuTestCase(test.TestCase):
nested = Nested()
conn = rpc.create_connection(True)
- rpc.create_consumer(conn,
- 'nested',
- nested,
- False)
+ conn.create_consumer('nested', nested, False)
conn.consume_in_thread()
value = 42
result = rpc.call(self.context,
diff --git a/nova/tests/test_test.py b/nova/tests/test_test.py
index 6075abbb0..3482ff6a0 100644
--- a/nova/tests/test_test.py
+++ b/nova/tests/test_test.py
@@ -40,6 +40,5 @@ class IsolationTestCase(test.TestCase):
connection = rpc.create_connection(new=True)
proxy = NeverCalled()
- rpc.create_consumer(connection, 'compute',
- proxy, fanout=False)
+ connection.create_consumer('compute', proxy, fanout=False)
connection.consume_in_thread()