summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/lockutils.py5
-rw-r--r--openstack/common/rpc/amqp.py2
-rw-r--r--openstack/common/rpc/impl_kombu.py20
-rw-r--r--openstack/common/rpc/impl_qpid.py15
-rw-r--r--openstack/common/setup.py2
-rw-r--r--tests/unit/rpc/test_kombu.py19
-rw-r--r--tests/unit/rpc/test_qpid.py109
-rw-r--r--tools/install_venv_common.py1
8 files changed, 154 insertions, 19 deletions
diff --git a/openstack/common/lockutils.py b/openstack/common/lockutils.py
index afd6d00..21115fd 100644
--- a/openstack/common/lockutils.py
+++ b/openstack/common/lockutils.py
@@ -39,9 +39,8 @@ util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
- default=os.path.abspath(os.path.join(os.path.dirname(__file__),
- '../')),
- help='Directory to use for lock files')
+ help=('Directory to use for lock files. Default to a '
+ 'temp directory'))
]
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 393e13e..d1cc9ea 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, rpc_common.serialize_msg(msg))
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 19f0348..f1d46c4 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -302,9 +302,15 @@ class Publisher(object):
channel=channel,
routing_key=self.routing_key)
- def send(self, msg):
+ def send(self, msg, timeout=None):
"""Send a message"""
- self.producer.publish(msg)
+ if timeout:
+ #
+ # AMQP TTL is in milliseconds when set in the header.
+ #
+ self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
+ else:
+ self.producer.publish(msg)
class DirectPublisher(Publisher):
@@ -653,7 +659,7 @@ class Connection(object):
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
- def publisher_send(self, cls, topic, msg, **kwargs):
+ def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
"""Send to a publisher based on the publisher class"""
def _error_callback(exc):
@@ -663,7 +669,7 @@ class Connection(object):
def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs)
- publisher.send(msg)
+ publisher.send(msg, timeout)
self.ensure(_error_callback, _publish)
@@ -691,9 +697,9 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg):
+ def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg)
+ self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
@@ -701,7 +707,7 @@ class Connection(object):
def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
- self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
+ self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None):
"""Consume from all queues/consumers"""
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index b6691b4..9d3a3a7 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -486,9 +486,20 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg):
+ def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg)
+ #
+ # We want to create a message with attributes, e.g. a TTL. We
+ # don't really need to keep 'msg' in its JSON format any longer
+ # so let's create an actual qpid message here and get some
+ # value-add on the go.
+ #
+ # WARNING: Request timeout happens to be in the same units as
+ # qpid's TTL (seconds). If this changes in the future, then this
+ # will need to be altered accordingly.
+ #
+ qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
+ self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
diff --git a/openstack/common/setup.py b/openstack/common/setup.py
index cc8b99e..fb187ff 100644
--- a/openstack/common/setup.py
+++ b/openstack/common/setup.py
@@ -282,7 +282,7 @@ def get_version_from_git(pre_version):
if os.path.isdir('.git'):
if pre_version:
try:
- return _run_shell_command(
+ return _run_shell_command(
"git describe --exact-match",
throw_on_error=True).replace('-', '.')
except Exception:
diff --git a/tests/unit/rpc/test_kombu.py b/tests/unit/rpc/test_kombu.py
index b28c568..a201940 100644
--- a/tests/unit/rpc/test_kombu.py
+++ b/tests/unit/rpc/test_kombu.py
@@ -24,6 +24,7 @@ eventlet.monkey_patch()
import contextlib
import logging
+import time
from openstack.common import cfg
from openstack.common import exception
@@ -109,6 +110,24 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
self.assertEqual(self.received_message, message)
+ def test_message_ttl_on_timeout(self):
+ """Test message ttl being set by request timeout. The message
+ should die on the vine and never arrive."""
+ conn = self.rpc.create_connection(FLAGS)
+ message = 'topic test message'
+
+ self.received_message = None
+
+ def _callback(message):
+ self.received_message = message
+ self.fail("should not have received this message")
+
+ conn.declare_topic_consumer('a_topic', _callback)
+ conn.topic_send('a_topic', rpc_common.serialize_msg(message), 0)
+ conn.iterconsume(1, 2)
+
+ conn.close()
+
def test_topic_send_receive_exchange_name(self):
"""Test sending to a topic exchange/queue with an exchange name"""
diff --git a/tests/unit/rpc/test_qpid.py b/tests/unit/rpc/test_qpid.py
index 27445ee..1df94d5 100644
--- a/tests/unit/rpc/test_qpid.py
+++ b/tests/unit/rpc/test_qpid.py
@@ -26,22 +26,24 @@ eventlet.monkey_patch()
import logging
import fixtures
+import time
+import testtools
+import mox
from openstack.common import cfg
from openstack.common import context
from openstack.common.rpc import amqp as rpc_amqp
from tests import utils
+from openstack.common.rpc import common as rpc_common
try:
- from openstack.common.rpc import impl_qpid
import qpid
+ from openstack.common.rpc import impl_qpid
except ImportError:
qpid = None
impl_qpid = None
-
FLAGS = cfg.CONF
-LOG = logging.getLogger(__name__)
class RpcQpidTestCase(utils.BaseTestCase):
@@ -71,6 +73,12 @@ class RpcQpidTestCase(utils.BaseTestCase):
self.mock_session = None
self.mock_sender = None
self.mock_receiver = None
+ self.mox = mox.Mox()
+
+ self.orig_connection = qpid.messaging.Connection
+ self.orig_session = qpid.messaging.Session
+ self.orig_sender = qpid.messaging.Sender
+ self.orig_receiver = qpid.messaging.Receiver
self.useFixture(
fixtures.MonkeyPatch('qpid.messaging.Connection',
@@ -82,9 +90,14 @@ class RpcQpidTestCase(utils.BaseTestCase):
fixtures.MonkeyPatch('qpid.messaging.Sender',
lambda *_x, **_y: self.mock_sender))
self.useFixture(
- fixtures.MonekyPatch('qpid.messaging.Receiver',
+ fixtures.MonkeyPatch('qpid.messaging.Receiver',
lambda *_x, **_y: self.mock_receiver))
- self.addCleanup(self._close_qpid)
+
+ def cleanUp(self):
+ if impl_qpid:
+ # Need to reset this in case we changed the connection_cls
+ # in self._setup_to_server_tests()
+ impl_qpid.Connection.pool.connection_cls = impl_qpid.Connection
def test_create_connection(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
@@ -368,6 +381,92 @@ class RpcQpidTestCase(utils.BaseTestCase):
def test_call(self):
self._test_call(multi=False)
+ def _test_call_with_timeout(self, timeout, expect_failure):
+ # TODO(beagles): should be possible to refactor this method and
+ # _test_call to share common code. Maybe making the messages
+ # and test checks parameters, etc.
+ self.mock_connection = self.mox.CreateMock(self.orig_connection)
+ self.mock_session = self.mox.CreateMock(self.orig_session)
+ self.mock_sender = self.mox.CreateMock(self.orig_sender)
+ self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
+
+ self.mock_connection.opened().AndReturn(False)
+ self.mock_connection.open()
+ self.mock_connection.session().AndReturn(self.mock_session)
+ rcv_addr = mox.Regex(
+ r'^.*/.* ; {"node": {"x-declare": {"auto-delete":'
+ ' true, "durable": true, "type": "direct"}, "type": '
+ '"topic"}, "create": "always", "link": {"x-declare": '
+ '{"auto-delete": true, "exclusive": true, "durable": '
+ 'false}, "durable": true, "name": ".*"}}')
+ self.mock_session.receiver(rcv_addr).AndReturn(self.mock_receiver)
+ self.mock_receiver.capacity = 1
+ send_addr = (
+ 'openstack/impl_qpid_test ; {"node": {"x-declare": '
+ '{"auto-delete": true, "durable": false}, "type": "topic"}, '
+ '"create": "always"}')
+ self.mock_session.sender(send_addr).AndReturn(self.mock_sender)
+ self.mock_sender.send(mox.IgnoreArg())
+
+ if expect_failure:
+ self.mock_session.next_receiver(timeout=mox.IsA(int)).AndRaise(
+ qpid.messaging.exceptions.Empty())
+ self.mock_receiver.fetch()
+ self.mock_session.close()
+ self.mock_connection.session().AndReturn(self.mock_session)
+ else:
+ self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_receiver)
+ self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
+ {"result": "foo", "failure": False, "ending": False}))
+ self.mock_session.acknowledge(mox.IgnoreArg())
+ self.mock_session.next_receiver(timeout=mox.IsA(int)).AndReturn(
+ self.mock_receiver)
+ self.mock_receiver.fetch().AndReturn(qpid.messaging.Message(
+ {"failure": False, "ending": True}))
+ self.mock_session.acknowledge(mox.IgnoreArg())
+ self.mock_session.close()
+ self.mock_connection.session().AndReturn(self.mock_session)
+
+ self.mox.ReplayAll()
+
+ try:
+ ctx = context.RequestContext("user", "project")
+ method = impl_qpid.call
+ if expect_failure:
+ try:
+ res = method(FLAGS, ctx, "impl_qpid_test",
+ {"method": "test_method", "args": {}},
+ timeout)
+ self.fail('Expected a timeout exception')
+ except rpc_common.Timeout:
+ # Good, this is expected!
+ pass
+ else:
+ res = method(FLAGS, ctx, "impl_qpid_test",
+ {"method": "test_method", "args": {}}, timeout)
+ self.assertEquals(res, "foo")
+ finally:
+ while impl_qpid.Connection.pool.free_items:
+ # Pull the mock connection object out of the connection pool so
+ # that it doesn't mess up other test cases.
+ impl_qpid.Connection.pool.get()
+
+ def test_call(self):
+ self._test_call(multi=False)
+
+ def test_call_with_timeout(self):
+ """A little more indepth for a timeout test. Specifically we are
+ looking to simulate the event sent to qpid dying on the vine due
+ to a TTL. A string test that actually involved qpid would be
+ excellent, but this at least verifies that the exceptions flow
+ like they should. TODO(beagles): is this really necessary or is
+ the the case for qpid at least the basic timeout test is
+ sufficient.
+ """
+ self._test_call_with_timeout(timeout=5, expect_failure=False)
+ self._test_call_with_timeout(timeout=0, expect_failure=True)
+
def test_multicall(self):
self._test_call(multi=True)
diff --git a/tools/install_venv_common.py b/tools/install_venv_common.py
index d6a78eb..155db3b 100644
--- a/tools/install_venv_common.py
+++ b/tools/install_venv_common.py
@@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack, LLC
+# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain