summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris Behrens <cbehrens@codestud.com>2011-08-28 18:17:21 -0700
committerChris Behrens <cbehrens@codestud.com>2011-08-28 18:17:21 -0700
commit32943729861ba4ad562e899a55af77b7974af8db (patch)
treecea65cf12699d795f887088cf392d31eeef9a9e5
parent256cb956abeff85f3cddce499b488dd112c4137d (diff)
downloadnova-32943729861ba4ad562e899a55af77b7974af8db.tar.gz
nova-32943729861ba4ad562e899a55af77b7974af8db.tar.xz
nova-32943729861ba4ad562e899a55af77b7974af8db.zip
fix test_rpc and kombu stuff
-rw-r--r--nova/rpc/FIXME2
-rw-r--r--nova/rpc/__init__.py7
-rw-r--r--nova/rpc/impl_carrot.py56
-rw-r--r--nova/rpc/impl_kombu.py21
-rw-r--r--nova/tests/test_rpc.py25
5 files changed, 79 insertions, 32 deletions
diff --git a/nova/rpc/FIXME b/nova/rpc/FIXME
deleted file mode 100644
index 704081802..000000000
--- a/nova/rpc/FIXME
+++ /dev/null
@@ -1,2 +0,0 @@
-Move some code duplication between carrot/kombu into common.py
-The other FIXMEs in __init__.py and impl_kombu.py
diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py
index 9371c2ab3..10b69c8b5 100644
--- a/nova/rpc/__init__.py
+++ b/nova/rpc/__init__.py
@@ -38,18 +38,13 @@ RPCIMPL = import_object(impl_table.get(FLAGS.rpc_backend,
def create_connection(new=True):
- return RPCIMPL.Connection.instance(new=True)
+ return RPCIMPL.create_connection(new=new)
def create_consumer(conn, topic, proxy, fanout=False):
return RPCIMPL.create_consumer(conn, topic, proxy, fanout)
-def create_consumer_set(conn, consumers):
- # FIXME(comstud): replace however necessary
- return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers)
-
-
def call(context, topic, msg):
return RPCIMPL.call(context, topic, msg)
diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py
index 40097e10e..efff788a0 100644
--- a/nova/rpc/impl_carrot.py
+++ b/nova/rpc/impl_carrot.py
@@ -33,6 +33,7 @@ import uuid
from carrot import connection as carrot_connection
from carrot import messaging
+import eventlet
from eventlet import greenpool
from eventlet import pools
from eventlet import queue
@@ -42,10 +43,10 @@ from nova import context
from nova import exception
from nova import fakerabbit
from nova import flags
-from nova import log as logging
-from nova import utils
from nova.rpc.common import RemoteError, LOG
+# Needed for tests
+eventlet.monkey_patch()
FLAGS = flags.FLAGS
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
@@ -57,6 +58,11 @@ flags.DEFINE_integer('rpc_conn_pool_size', 30,
class Connection(carrot_connection.BrokerConnection):
"""Connection instance object."""
+ def __init__(self, *args, **kwargs):
+ super(Connection, self).__init__(*args, **kwargs)
+ self._rpc_consumers = []
+ self._rpc_consumer_thread = None
+
@classmethod
def instance(cls, new=True):
"""Returns the instance."""
@@ -94,6 +100,42 @@ class Connection(carrot_connection.BrokerConnection):
pass
return cls.instance()
+ def close(self):
+ self.cancel_consumer_thread()
+ for consumer in self._rpc_consumers:
+ try:
+ consumer.close()
+ except Exception:
+ # ignore all errors
+ pass
+ self._rpc_consumers = []
+ super(Connection, self).close()
+
+ def consume_in_thread(self):
+ """Consumer from all queues/consumers in a greenthread"""
+
+ consumer_set = ConsumerSet(connection=self,
+ consumer_list=self._rpc_consumers)
+
+ def _consumer_thread():
+ try:
+ consumer_set.wait()
+ except greenlet.GreenletExit:
+ return
+ if not self._rpc_consumer_thread:
+ self._rpc_consumer_thread = eventlet.spawn(_consumer_thread)
+ return self._rpc_consumer_thread
+
+ def cancel_consumer_thread(self):
+ """Cancel a consumer thread"""
+ if self._rpc_consumer_thread:
+ self._rpc_consumer_thread.kill()
+ try:
+ self._rpc_consumer_thread.wait()
+ except greenlet.GreenletExit:
+ pass
+ self._rpc_consumer_thread = None
+
class Pool(pools.Pool):
"""Class that implements a Pool of Connections."""
@@ -119,6 +161,7 @@ class Consumer(messaging.Consumer):
"""
def __init__(self, *args, **kwargs):
+ connection = kwargs.get('connection')
max_retries = FLAGS.rabbit_max_retries
sleep_time = FLAGS.rabbit_retry_interval
tries = 0
@@ -148,6 +191,7 @@ class Consumer(messaging.Consumer):
LOG.error(_('Unable to connect to AMQP server '
'after %(tries)d tries. Shutting down.') % locals())
sys.exit(1)
+ connection._rpc_consumers.append(self)
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
"""Wraps the parent fetch with some logic for failed connection."""
@@ -175,12 +219,6 @@ class Consumer(messaging.Consumer):
LOG.exception(_('Failed to fetch message from queue: %s' % e))
self.failed_connection = True
- def attach_to_eventlet(self):
- """Only needed for unit tests!"""
- timer = utils.LoopingCall(self.fetch, enable_callbacks=True)
- timer.start(0.1)
- return timer
-
class AdapterConsumer(Consumer):
"""Calls methods on a proxy object based on method and args."""
@@ -251,7 +289,7 @@ class AdapterConsumer(Consumer):
# NOTE(vish): this iterates through the generator
list(rval)
except Exception as e:
- logging.exception('Exception during message handling')
+ LOG.exception('Exception during message handling')
if msg_id:
msg_reply(msg_id, None, sys.exc_info())
return
diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py
index 01871606c..bd83bc520 100644
--- a/nova/rpc/impl_kombu.py
+++ b/nova/rpc/impl_kombu.py
@@ -14,9 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-from nova import flags
-from nova.rpc.common import RemoteError, LOG
-
import kombu
import kombu.entity
import kombu.messaging
@@ -24,8 +21,22 @@ import kombu.connection
import itertools
import sys
import time
+import traceback
+import types
import uuid
+import eventlet
+from eventlet import greenpool
+from eventlet import pools
+import greenlet
+
+from nova import context
+from nova import exception
+from nova import flags
+from nova.rpc.common import RemoteError, LOG
+
+# Needed for tests
+eventlet.monkey_patch()
FLAGS = flags.FLAGS
@@ -317,7 +328,7 @@ class Connection(object):
pass
time.sleep(1)
self.connection = kombu.connection.Connection(**self.params)
- self.queue_num = itertools.count(1)
+ self.consumer_num = itertools.count(1)
try:
self.connection.ensure_connection(errback=self.connect_error,
@@ -634,7 +645,7 @@ class RpcContext(context.RequestContext):
class MulticallWaiter(object):
def __init__(self, connection):
self._connection = connection
- self._iterator = connection.consume()
+ self._iterator = connection.iterconsume()
self._result = None
self._done = False
diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py
index ba9c0a859..2b9922491 100644
--- a/nova/tests/test_rpc.py
+++ b/nova/tests/test_rpc.py
@@ -33,13 +33,17 @@ class RpcTestCase(test.TestCase):
super(RpcTestCase, self).setUp()
self.conn = rpc.create_connection(True)
self.receiver = TestReceiver()
- self.consumer = rpc.create_consumer(self.conn,
- 'test',
- self.receiver,
- False)
- self.consumer.attach_to_eventlet()
+ rpc.create_consumer(self.conn,
+ 'test',
+ self.receiver,
+ False)
+ self.conn.consume_in_thread()
self.context = context.get_admin_context()
+ def tearDown(self):
+ self.conn.close()
+ super(RpcTestCase, self).tearDown()
+
def test_call_succeed(self):
value = 42
result = rpc.call(self.context, 'test', {"method": "echo",
@@ -139,16 +143,17 @@ class RpcTestCase(test.TestCase):
nested = Nested()
conn = rpc.create_connection(True)
- consumer = rpc.create_consumer(conn,
- 'nested',
- nested,
- False)
- consumer.attach_to_eventlet()
+ rpc.create_consumer(conn,
+ 'nested',
+ nested,
+ False)
+ conn.consume_in_thread()
value = 42
result = rpc.call(self.context,
'nested', {"method": "echo",
"args": {"queue": "test",
"value": value}})
+ conn.close()
self.assertEqual(value, result)