summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorBrent Eagles <beagles@redhat.com>2013-01-28 13:34:55 -0330
committerBrent Eagles <beagles@redhat.com>2013-01-28 17:45:08 -0330
commit30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f (patch)
treeadb7220ed3ecacf1fc5cab2e94c6643f2ea3e185 /openstack
parentb5288ab022d3733abae25971ca8afb56f1a7011b (diff)
downloadoslo-30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f.tar.gz
oslo-30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f.tar.xz
oslo-30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f.zip
Add TTL to AMQP messages
This patch sets a TTL derived from the timeout parameter used when invoking the RPC. Needed for: * Bug 1053614 Change-Id: Idaf9a2a972feac3f95f170c52632033b027b4414
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/amqp.py2
-rw-r--r--openstack/common/rpc/impl_kombu.py20
-rw-r--r--openstack/common/rpc/impl_qpid.py15
3 files changed, 27 insertions, 10 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 393e13e..d1cc9ea 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, rpc_common.serialize_msg(msg))
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 19f0348..f1d46c4 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -302,9 +302,15 @@ class Publisher(object):
channel=channel,
routing_key=self.routing_key)
- def send(self, msg):
+ def send(self, msg, timeout=None):
"""Send a message"""
- self.producer.publish(msg)
+ if timeout:
+ #
+ # AMQP TTL is in milliseconds when set in the header.
+ #
+ self.producer.publish(msg, headers={'ttl': (timeout * 1000)})
+ else:
+ self.producer.publish(msg)
class DirectPublisher(Publisher):
@@ -653,7 +659,7 @@ class Connection(object):
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
- def publisher_send(self, cls, topic, msg, **kwargs):
+ def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
"""Send to a publisher based on the publisher class"""
def _error_callback(exc):
@@ -663,7 +669,7 @@ class Connection(object):
def _publish():
publisher = cls(self.conf, self.channel, topic, **kwargs)
- publisher.send(msg)
+ publisher.send(msg, timeout)
self.ensure(_error_callback, _publish)
@@ -691,9 +697,9 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg):
+ def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg)
+ self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
@@ -701,7 +707,7 @@ class Connection(object):
def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
- self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
+ self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None):
"""Consume from all queues/consumers"""
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index b6691b4..9d3a3a7 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -486,9 +486,20 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
- def topic_send(self, topic, msg):
+ def topic_send(self, topic, msg, timeout=None):
"""Send a 'topic' message"""
- self.publisher_send(TopicPublisher, topic, msg)
+ #
+ # We want to create a message with attributes, e.g. a TTL. We
+ # don't really need to keep 'msg' in its JSON format any longer
+ # so let's create an actual qpid message here and get some
+ # value-add on the go.
+ #
+ # WARNING: Request timeout happens to be in the same units as
+ # qpid's TTL (seconds). If this changes in the future, then this
+ # will need to be altered accordingly.
+ #
+ qpid_message = qpid_messaging.Message(content=msg, ttl=timeout)
+ self.publisher_send(TopicPublisher, topic, qpid_message)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""