From 30a50c8a6c534f01d518eb3ce4cf0d35877d9a7f Mon Sep 17 00:00:00 2001 From: Brent Eagles Date: Mon, 28 Jan 2013 13:34:55 -0330 Subject: Add TTL to AMQP messages This patch sets a TTL derived from the timeout parameter used when invoking the RPC. Needed for: * Bug 1053614 Change-Id: Idaf9a2a972feac3f95f170c52632033b027b4414 --- openstack/common/rpc/amqp.py | 2 +- openstack/common/rpc/impl_kombu.py | 20 +++++++++++++------- openstack/common/rpc/impl_qpid.py | 15 +++++++++++++-- 3 files changed, 27 insertions(+), 10 deletions(-) (limited to 'openstack') diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 393e13e..d1cc9ea 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, rpc_common.serialize_msg(msg)) + conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) return wait_msg diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 19f0348..f1d46c4 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -302,9 +302,15 @@ class Publisher(object): channel=channel, routing_key=self.routing_key) - def send(self, msg): + def send(self, msg, timeout=None): """Send a message""" - self.producer.publish(msg) + if timeout: + # + # AMQP TTL is in milliseconds when set in the header. + # + self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) + else: + self.producer.publish(msg) class DirectPublisher(Publisher): @@ -653,7 +659,7 @@ class Connection(object): for proxy_cb in self.proxy_callbacks: proxy_cb.wait() - def publisher_send(self, cls, topic, msg, **kwargs): + def publisher_send(self, cls, topic, msg, timeout=None, **kwargs): """Send to a publisher based on the publisher class""" def _error_callback(exc): @@ -663,7 +669,7 @@ class Connection(object): def _publish(): publisher = cls(self.conf, self.channel, topic, **kwargs) - publisher.send(msg) + publisher.send(msg, timeout) self.ensure(_error_callback, _publish) @@ -691,9 +697,9 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg): + def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg) + self.publisher_send(TopicPublisher, topic, msg, timeout) def fanout_send(self, topic, msg): """Send a 'fanout' message""" @@ -701,7 +707,7 @@ class Connection(object): def notify_send(self, topic, msg, **kwargs): """Send a notify message on a topic""" - self.publisher_send(NotifyPublisher, topic, msg, **kwargs) + self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs) def consume(self, limit=None): """Consume from all queues/consumers""" diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index b6691b4..9d3a3a7 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -486,9 +486,20 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg): + def topic_send(self, topic, msg, timeout=None): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg) + # + # We want to create a message with attributes, e.g. a TTL. We + # don't really need to keep 'msg' in its JSON format any longer + # so let's create an actual qpid message here and get some + # value-add on the go. + # + # WARNING: Request timeout happens to be in the same units as + # qpid's TTL (seconds). If this changes in the future, then this + # will need to be altered accordingly. + # + qpid_message = qpid_messaging.Message(content=msg, ttl=timeout) + self.publisher_send(TopicPublisher, topic, qpid_message) def fanout_send(self, topic, msg): """Send a 'fanout' message""" -- cgit