diff options
-rw-r--r-- | openstack/common/lockutils.py | 5 | ||||
-rw-r--r-- | openstack/common/rpc/amqp.py | 2 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 20 | ||||
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 15 | ||||
-rw-r--r-- | openstack/common/setup.py | 2 | ||||
-rw-r--r-- | tests/unit/rpc/test_kombu.py | 19 | ||||
-rw-r--r-- | tests/unit/rpc/test_qpid.py | 109 | ||||
-rw-r--r-- | tools/install_venv_common.py | 1 |
8 files changed, 154 insertions, 19 deletions
diff --git a/openstack/common/lockutils.py b/openstack/common/lockutils.py index afd6d00..21115fd 100644 --- a/openstack/common/lockutils.py +++ b/openstack/common/lockutils.py @@ -39,9 +39,8 @@ util_opts = [ cfg.BoolOpt('disable_process_locking', default=False, help='Whether to disable inter-process locks'), cfg.StrOpt('lock_path', - default=os.path.abspath(os.path.join(os.path.dirname(__file__), - '../')), - help='Directory to use for lock files') + help=('Directory to use for lock files. Default to a ' + 'temp directory')) ] diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 393e13e..d1cc9ea 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, rpc_common.serialize_msg(msg)) + conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) return wait_msg diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 19f0348..f1d46c4 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -302,9 +302,15 @@ class Publisher(object): channel=channel, routing_key=self.routing_key) - def send(self, msg): + def send(self, msg, timeout=None): """Send a message""" - self.producer.publish(msg) + if timeout: + # + # AMQP TTL is in milliseconds when set in the header. + # + self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) + else: + self.producer.publish(msg) class DirectPublisher(Publisher): @@ -653,7 +659,7 @@ class Connection(object): for proxy_cb in self.proxy_callbacks: proxy_cb.wait() - def publisher_send(self, cls, topic, msg, **kwargs): + def publisher_send(self, cls, topic, msg, timeout=None, **kwargs): """Send to a publisher based on the publisher class""" def _error_callback(exc): @@ -663,7 +669,7 @@ class Connection(object): def _publish(): publisher = cls(self.conf, self.channel, topic, **kwargs) - publisher.send(msg) + publisher.send(msg, timeout) self.ensure(_error_callback, _publish) @@ -691,9 +697,9 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg): + def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg) + self.publisher_send(TopicPublisher, topic, msg, timeout) def fanout_send(self, topic, msg): """Send a 'fanout' message""" @@ -701,7 +707,7 @@ class Connection(object): def notify_send(self, topic, msg, **kwargs): """Send a notify message on a topic""" - self.publisher_send(NotifyPublisher, topic, msg, **kwargs) + self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs) def consume(self, limit=None): """Consume from all queues/consumers""" diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index b6691b4..9d3a3a7 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -486,9 +486,20 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg): + def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg) + # + # We want to create a message with attributes, e.g. a TTL. We + # don't really need to keep 'msg' in its JSON format any longer + # so let's create an actual qpid message here and get some + # value-add on the go. + # + # WARNING: Request timeout happens to be in the same units as + # qpid's TTL (seconds). If this changes in the future, then this + # will need to be altered accordingly. + # + qpid_message = qpid_messaging.Message(content=msg, ttl=timeout) + self.publisher_send(TopicPublisher, topic, qpid_message) def fanout_send(self, topic, msg): """Send a 'fanout' message""" diff --git a/openstack/common/setup.py b/openstack/common/setup.py index cc8b99e..fb187ff 100644 --- a/openstack/common/setup.py +++ b/openstack/common/setup.py @@ -282,7 +282,7 @@ def get_version_from_git(pre_version): if os.path.isdir('.git'): if pre_version: try: - return _run_shell_command( + return _run_shell_command( "git describe --exact-match", throw_on_error=True).replace('-', '.') except Exception: diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py index b28c568..a201940 100644 --- a/tests/unit/rpc/test_kombu.py +++ b/tests/unit/rpc/test_kombu.py @@ -24,6 +24,7 @@ eventlet.monkey_patch() import contextlib import logging +import time from openstack.common import cfg from openstack.common import exception @@ -109,6 +110,24 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase): self.assertEqual(self.received_message, message) + def test_message_ttl_on_timeout(self): + """Test message ttl being set by request timeout. The message + should die on the vine and never arrive.""" + conn = self.rpc.create_connection(FLAGS) + message = 'topic test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + self.fail("should not have received this message") + + conn.declare_topic_consumer('a_topic', _callback) + conn.topic_send('a_topic', rpc_common.serialize_msg(message), 0) + conn.iterconsume(1, 2) + + conn.close() + def test_topic_send_receive_exchange_name(self): """Test sending to a topic exchange/queue with an exchange name""" diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py index 27445ee..1df94d5 100644 --- a/tests/unit/rpc/test_qpid.py +++ b/tests/unit/rpc/test_qpid.py @@ -26,22 +26,24 @@ eventlet.monkey_patch() import logging import fixtures +import time +import testtools +import mox from openstack.common import cfg from openstack.common import context from openstack.common.rpc import amqp as rpc_amqp from tests import utils +from openstack.common.rpc import common as rpc_common try: - from openstack.common.rpc import impl_qpid import qpid + from openstack.common.rpc import impl_qpid except ImportError: qpid = None impl_qpid = None - FLAGS = cfg.CONF -LOG = logging.getLogger(__name__) class RpcQpidTestCase(utils.BaseTestCase): @@ -71,6 +73,12 @@ class RpcQpidTestCase(utils.BaseTestCase): self.mock_session = None self.mock_sender = None self.mock_receiver = None + self.mox = mox.Mox() + + self.orig_connection = qpid.messaging.Connection + self.orig_session = qpid.messaging.Session + self.orig_sender = qpid.messaging.Sender + self.orig_receiver = qpid.messaging.Receiver self.useFixture( fixtures.MonkeyPatch('qpid.messaging.Connection', @@ -82,9 +90,14 @@ class RpcQpidTestCase(utils.BaseTestCase): fixtures.MonkeyPatch('qpid.messaging.Sender', lambda *_x, **_y: self.mock_sender)) self.useFixture( - fixtures.MonekyPatch('qpid.messaging.Receiver', + fixtures.MonkeyPatch('qpid.messaging.Receiver', lambda *_x, **_y: self.mock_receiver)) - self.addCleanup(self._close_qpid) + + def cleanUp(self): + if impl_qpid: + # Need to reset this in case we changed the connection_cls + # in self._setup_to_server_tests() + impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection def test_create_connection(self): self.mock_connection = self.mox.CreateMock(self.orig_connection) @@ -368,6 +381,92 @@ class RpcQpidTestCase(utils.BaseTestCase): def test_call(self): self._test_call(multi=False) + def _test_call_with_timeout(self, timeout, expect_failure): + # TODO(beagles): should be possible to refactor this method and + # _test_call to share common code. Maybe making the messages + # and test checks parameters, etc. + self.mock_connection = self.mox.CreateMock(self.orig_connection) + self.mock_session = self.mox.CreateMock(self.orig_session) + self.mock_sender = self.mox.CreateMock(self.orig_sender) + self.mock_receiver = self.mox.CreateMock(self.orig_receiver) + + self.mock_connection.opened().AndReturn(False) + self.mock_connection.open() + self.mock_connection.session().AndReturn(self.mock_session) + rcv_addr = mox.Regex( + r'^.*/.* ; {"node": {"x-declare": {"auto-delete":' + ' true, "durable": true, "type": "direct"}, "type": ' + '"topic"}, "create": "always", "link": {"x-declare": ' + '{"auto-delete": true, "exclusive": true, "durable": ' + 'false}, "durable": true, "name": ".*"}}') + self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver) + self.mock_receiver.capacity = 1 + send_addr = ( + 'openstack/impl_qpid_test ; {"node": {"x-declare": ' + '{"auto-delete": true, "durable": false}, "type": "topic"}, ' + '"create": "always"}') + self.mock_session.sender(send_addr).AndReturn(self.mock_sender) + self.mock_sender.send(mox.IgnoreArg()) + + if expect_failure: + self.mock_session.next_receiver(timeout=mox.IsA(int)).AndRaise( + qpid.messaging.exceptions.Empty()) + self.mock_receiver.fetch() + self.mock_session.close() + self.mock_connection.session().AndReturn(self.mock_session) + else: + self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_receiver) + self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( + {"result": "foo", "failure": False, "ending": False})) + self.mock_session.acknowledge(mox.IgnoreArg()) + self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn( + self.mock_receiver) + self.mock_receiver.fetch().AndReturn(qpid.messaging.Message( + {"failure": False, "ending": True})) + self.mock_session.acknowledge(mox.IgnoreArg()) + self.mock_session.close() + self.mock_connection.session().AndReturn(self.mock_session) + + self.mox.ReplayAll() + + try: + ctx = context.RequestContext("user", "project") + method = impl_qpid.call + if expect_failure: + try: + res = method(FLAGS, ctx, "impl_qpid_test", + {"method": "test_method", "args": {}}, + timeout) + self.fail('Expected a timeout exception') + except rpc_common.Timeout: + # Good, this is expected! + pass + else: + res = method(FLAGS, ctx, "impl_qpid_test", + {"method": "test_method", "args": {}}, timeout) + self.assertEquals(res, "foo") + finally: + while impl_qpid.Connection.pool.free_items: + # Pull the mock connection object out of the connection pool so + # that it doesn't mess up other test cases. + impl_qpid.Connection.pool.get() + + def test_call(self): + self._test_call(multi=False) + + def test_call_with_timeout(self): + """A little more indepth for a timeout test. Specifically we are + looking to simulate the event sent to qpid dying on the vine due + to a TTL. A string test that actually involved qpid would be + excellent, but this at least verifies that the exceptions flow + like they should. TODO(beagles): is this really necessary or is + the the case for qpid at least the basic timeout test is + sufficient. + """ + self._test_call_with_timeout(timeout=5, expect_failure=False) + self._test_call_with_timeout(timeout=0, expect_failure=True) + def test_multicall(self): self._test_call(multi=True) diff --git a/tools/install_venv_common.py b/tools/install_venv_common.py index d6a78eb..155db3b 100644 --- a/tools/install_venv_common.py +++ b/tools/install_venv_common.py @@ -1,6 +1,7 @@ # vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2013 OpenStack, LLC +# Copyright 2013 IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain |