diff options
author | Chris Behrens <cbehrens@codestud.com> | 2011-08-28 18:17:21 -0700 |
---|---|---|
committer | Chris Behrens <cbehrens@codestud.com> | 2011-08-28 18:17:21 -0700 |
commit | 32943729861ba4ad562e899a55af77b7974af8db (patch) | |
tree | cea65cf12699d795f887088cf392d31eeef9a9e5 | |
parent | 256cb956abeff85f3cddce499b488dd112c4137d (diff) | |
download | nova-32943729861ba4ad562e899a55af77b7974af8db.tar.gz nova-32943729861ba4ad562e899a55af77b7974af8db.tar.xz nova-32943729861ba4ad562e899a55af77b7974af8db.zip |
fix test_rpc and kombu stuff
-rw-r--r-- | nova/rpc/FIXME | 2 | ||||
-rw-r--r-- | nova/rpc/__init__.py | 7 | ||||
-rw-r--r-- | nova/rpc/impl_carrot.py | 56 | ||||
-rw-r--r-- | nova/rpc/impl_kombu.py | 21 | ||||
-rw-r--r-- | nova/tests/test_rpc.py | 25 |
5 files changed, 79 insertions, 32 deletions
diff --git a/nova/rpc/FIXME b/nova/rpc/FIXME deleted file mode 100644 index 704081802..000000000 --- a/nova/rpc/FIXME +++ /dev/null @@ -1,2 +0,0 @@ -Move some code duplication between carrot/kombu into common.py -The other FIXMEs in __init__.py and impl_kombu.py diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index 9371c2ab3..10b69c8b5 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -38,18 +38,13 @@ RPCIMPL = import_object(impl_table.get(FLAGS.rpc_backend, def create_connection(new=True): - return RPCIMPL.Connection.instance(new=True) + return RPCIMPL.create_connection(new=new) def create_consumer(conn, topic, proxy, fanout=False): return RPCIMPL.create_consumer(conn, topic, proxy, fanout) -def create_consumer_set(conn, consumers): - # FIXME(comstud): replace however necessary - return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers) - - def call(context, topic, msg): return RPCIMPL.call(context, topic, msg) diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py index 40097e10e..efff788a0 100644 --- a/nova/rpc/impl_carrot.py +++ b/nova/rpc/impl_carrot.py @@ -33,6 +33,7 @@ import uuid from carrot import connection as carrot_connection from carrot import messaging +import eventlet from eventlet import greenpool from eventlet import pools from eventlet import queue @@ -42,10 +43,10 @@ from nova import context from nova import exception from nova import fakerabbit from nova import flags -from nova import log as logging -from nova import utils from nova.rpc.common import RemoteError, LOG +# Needed for tests +eventlet.monkey_patch() FLAGS = flags.FLAGS flags.DEFINE_integer('rpc_thread_pool_size', 1024, @@ -57,6 +58,11 @@ flags.DEFINE_integer('rpc_conn_pool_size', 30, class Connection(carrot_connection.BrokerConnection): """Connection instance object.""" + def __init__(self, *args, **kwargs): + super(Connection, self).__init__(*args, **kwargs) + self._rpc_consumers = [] + self._rpc_consumer_thread = None + @classmethod def instance(cls, new=True): """Returns the instance.""" @@ -94,6 +100,42 @@ class Connection(carrot_connection.BrokerConnection): pass return cls.instance() + def close(self): + self.cancel_consumer_thread() + for consumer in self._rpc_consumers: + try: + consumer.close() + except Exception: + # ignore all errors + pass + self._rpc_consumers = [] + super(Connection, self).close() + + def consume_in_thread(self): + """Consumer from all queues/consumers in a greenthread""" + + consumer_set = ConsumerSet(connection=self, + consumer_list=self._rpc_consumers) + + def _consumer_thread(): + try: + consumer_set.wait() + except greenlet.GreenletExit: + return + if not self._rpc_consumer_thread: + self._rpc_consumer_thread = eventlet.spawn(_consumer_thread) + return self._rpc_consumer_thread + + def cancel_consumer_thread(self): + """Cancel a consumer thread""" + if self._rpc_consumer_thread: + self._rpc_consumer_thread.kill() + try: + self._rpc_consumer_thread.wait() + except greenlet.GreenletExit: + pass + self._rpc_consumer_thread = None + class Pool(pools.Pool): """Class that implements a Pool of Connections.""" @@ -119,6 +161,7 @@ class Consumer(messaging.Consumer): """ def __init__(self, *args, **kwargs): + connection = kwargs.get('connection') max_retries = FLAGS.rabbit_max_retries sleep_time = FLAGS.rabbit_retry_interval tries = 0 @@ -148,6 +191,7 @@ class Consumer(messaging.Consumer): LOG.error(_('Unable to connect to AMQP server ' 'after %(tries)d tries. Shutting down.') % locals()) sys.exit(1) + connection._rpc_consumers.append(self) def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False): """Wraps the parent fetch with some logic for failed connection.""" @@ -175,12 +219,6 @@ class Consumer(messaging.Consumer): LOG.exception(_('Failed to fetch message from queue: %s' % e)) self.failed_connection = True - def attach_to_eventlet(self): - """Only needed for unit tests!""" - timer = utils.LoopingCall(self.fetch, enable_callbacks=True) - timer.start(0.1) - return timer - class AdapterConsumer(Consumer): """Calls methods on a proxy object based on method and args.""" @@ -251,7 +289,7 @@ class AdapterConsumer(Consumer): # NOTE(vish): this iterates through the generator list(rval) except Exception as e: - logging.exception('Exception during message handling') + LOG.exception('Exception during message handling') if msg_id: msg_reply(msg_id, None, sys.exc_info()) return diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 01871606c..bd83bc520 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -14,9 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -from nova import flags -from nova.rpc.common import RemoteError, LOG - import kombu import kombu.entity import kombu.messaging @@ -24,8 +21,22 @@ import kombu.connection import itertools import sys import time +import traceback +import types import uuid +import eventlet +from eventlet import greenpool +from eventlet import pools +import greenlet + +from nova import context +from nova import exception +from nova import flags +from nova.rpc.common import RemoteError, LOG + +# Needed for tests +eventlet.monkey_patch() FLAGS = flags.FLAGS @@ -317,7 +328,7 @@ class Connection(object): pass time.sleep(1) self.connection = kombu.connection.Connection(**self.params) - self.queue_num = itertools.count(1) + self.consumer_num = itertools.count(1) try: self.connection.ensure_connection(errback=self.connect_error, @@ -634,7 +645,7 @@ class RpcContext(context.RequestContext): class MulticallWaiter(object): def __init__(self, connection): self._connection = connection - self._iterator = connection.consume() + self._iterator = connection.iterconsume() self._result = None self._done = False diff --git a/nova/tests/test_rpc.py b/nova/tests/test_rpc.py index ba9c0a859..2b9922491 100644 --- a/nova/tests/test_rpc.py +++ b/nova/tests/test_rpc.py @@ -33,13 +33,17 @@ class RpcTestCase(test.TestCase): super(RpcTestCase, self).setUp() self.conn = rpc.create_connection(True) self.receiver = TestReceiver() - self.consumer = rpc.create_consumer(self.conn, - 'test', - self.receiver, - False) - self.consumer.attach_to_eventlet() + rpc.create_consumer(self.conn, + 'test', + self.receiver, + False) + self.conn.consume_in_thread() self.context = context.get_admin_context() + def tearDown(self): + self.conn.close() + super(RpcTestCase, self).tearDown() + def test_call_succeed(self): value = 42 result = rpc.call(self.context, 'test', {"method": "echo", @@ -139,16 +143,17 @@ class RpcTestCase(test.TestCase): nested = Nested() conn = rpc.create_connection(True) - consumer = rpc.create_consumer(conn, - 'nested', - nested, - False) - consumer.attach_to_eventlet() + rpc.create_consumer(conn, + 'nested', + nested, + False) + conn.consume_in_thread() value = 42 result = rpc.call(self.context, 'nested', {"method": "echo", "args": {"queue": "test", "value": value}}) + conn.close() self.assertEqual(value, result) |