summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/impl_qpid.py
diff options
context:
space:
mode:
Diffstat (limited to 'openstack/common/rpc/impl_qpid.py')
-rw-r--r--openstack/common/rpc/impl_qpid.py112
1 files changed, 63 insertions, 49 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 78236f2..850a60c 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -77,7 +77,7 @@ qpid_opts = [
cfg.BoolOpt('qpid_tcp_nodelay',
default=True,
help='Disable Nagle algorithm'),
- ]
+]
cfg.CONF.register_opts(qpid_opts)
@@ -161,10 +161,10 @@ class DirectConsumer(ConsumerBase):
"""
super(DirectConsumer, self).__init__(session, callback,
- "%s/%s" % (msg_id, msg_id),
- {"type": "direct"},
- msg_id,
- {"exclusive": True})
+ "%s/%s" % (msg_id, msg_id),
+ {"type": "direct"},
+ msg_id,
+ {"exclusive": True})
class TopicConsumer(ConsumerBase):
@@ -181,8 +181,9 @@ class TopicConsumer(ConsumerBase):
"""
super(TopicConsumer, self).__init__(session, callback,
- "%s/%s" % (conf.control_exchange, topic), {},
- name or topic, {})
+ "%s/%s" % (conf.control_exchange,
+ topic),
+ {}, name or topic, {})
class FanoutConsumer(ConsumerBase):
@@ -196,11 +197,12 @@ class FanoutConsumer(ConsumerBase):
'callback' is the callback to call when messages are received
"""
- super(FanoutConsumer, self).__init__(session, callback,
- "%s_fanout" % topic,
- {"durable": False, "type": "fanout"},
- "%s_fanout_%s" % (topic, uuid.uuid4().hex),
- {"exclusive": True})
+ super(FanoutConsumer, self).__init__(
+ session, callback,
+ "%s_fanout" % topic,
+ {"durable": False, "type": "fanout"},
+ "%s_fanout_%s" % (topic, uuid.uuid4().hex),
+ {"exclusive": True})
class Publisher(object):
@@ -254,8 +256,9 @@ class TopicPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(TopicPublisher, self).__init__(session,
- "%s/%s" % (conf.control_exchange, topic))
+ super(TopicPublisher, self).__init__(
+ session,
+ "%s/%s" % (conf.control_exchange, topic))
class FanoutPublisher(Publisher):
@@ -263,8 +266,9 @@ class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'fanout' publisher.
"""
- super(FanoutPublisher, self).__init__(session,
- "%s_fanout" % topic, {"type": "fanout"})
+ super(FanoutPublisher, self).__init__(
+ session,
+ "%s_fanout" % topic, {"type": "fanout"})
class NotifyPublisher(Publisher):
@@ -272,9 +276,10 @@ class NotifyPublisher(Publisher):
def __init__(self, conf, session, topic):
"""init a 'topic' publisher.
"""
- super(NotifyPublisher, self).__init__(session,
- "%s/%s" % (conf.control_exchange, topic),
- {"durable": True})
+ super(NotifyPublisher, self).__init__(
+ session,
+ "%s/%s" % (conf.control_exchange, topic),
+ {"durable": True})
class Connection(object):
@@ -292,9 +297,9 @@ class Connection(object):
server_params = {}
default_params = dict(hostname=self.conf.qpid_hostname,
- port=self.conf.qpid_port,
- username=self.conf.qpid_username,
- password=self.conf.qpid_password)
+ port=self.conf.qpid_port,
+ username=self.conf.qpid_username,
+ password=self.conf.qpid_password)
params = server_params
for key in default_params.keys():
@@ -312,18 +317,18 @@ class Connection(object):
self.connection.reconnect = self.conf.qpid_reconnect
if self.conf.qpid_reconnect_timeout:
self.connection.reconnect_timeout = (
- self.conf.qpid_reconnect_timeout)
+ self.conf.qpid_reconnect_timeout)
if self.conf.qpid_reconnect_limit:
self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
if self.conf.qpid_reconnect_interval_max:
self.connection.reconnect_interval_max = (
- self.conf.qpid_reconnect_interval_max)
+ self.conf.qpid_reconnect_interval_max)
if self.conf.qpid_reconnect_interval_min:
self.connection.reconnect_interval_min = (
- self.conf.qpid_reconnect_interval_min)
+ self.conf.qpid_reconnect_interval_min)
if self.conf.qpid_reconnect_interval:
self.connection.reconnect_interval = (
- self.conf.qpid_reconnect_interval)
+ self.conf.qpid_reconnect_interval)
self.connection.hearbeat = self.conf.qpid_heartbeat
self.connection.protocol = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
@@ -395,7 +400,7 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
- "%(err_str)s") % log_info)
+ "%(err_str)s") % log_info)
def _declare_consumer():
consumer = consumer_cls(self.conf, self.session, topic, callback)
@@ -410,11 +415,11 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, qpid.messaging.exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
- str(exc))
+ str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
- str(exc))
+ str(exc))
def _consume():
nxt_receiver = self.session.next_receiver(timeout=timeout)
@@ -444,7 +449,7 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
- "'%(topic)s': %(err_str)s") % log_info)
+ "'%(topic)s': %(err_str)s") % log_info)
def _publisher_send():
publisher = cls(self.conf, self.session, topic)
@@ -508,8 +513,9 @@ class Connection(object):
def create_consumer(self, topic, proxy, fanout=False):
"""Create a consumer that calls a method in a proxy object"""
- proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
- rpc_amqp.get_connection_pool(self.conf, Connection))
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@@ -522,8 +528,9 @@ class Connection(object):
def create_worker(self, topic, proxy, pool_name):
"""Create a worker that calls a method in a proxy object"""
- proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
- rpc_amqp.get_connection_pool(self.conf, Connection))
+ proxy_cb = rpc_amqp.ProxyCallback(
+ self.conf, proxy,
+ rpc_amqp.get_connection_pool(self.conf, Connection))
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)
@@ -535,50 +542,57 @@ class Connection(object):
def create_connection(conf, new=True):
"""Create a connection"""
- return rpc_amqp.create_connection(conf, new,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.create_connection(
+ conf, new,
+ rpc_amqp.get_connection_pool(conf, Connection))
def multicall(conf, context, topic, msg, timeout=None):
"""Make a call that returns multiple times."""
- return rpc_amqp.multicall(conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.multicall(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
def call(conf, context, topic, msg, timeout=None):
"""Sends a message on a topic and wait for a response."""
- return rpc_amqp.call(conf, context, topic, msg, timeout,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.call(
+ conf, context, topic, msg, timeout,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cast(conf, context, topic, msg):
"""Sends a message on a topic without waiting for a response."""
- return rpc_amqp.cast(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast(conf, context, topic, msg):
"""Sends a message on a fanout exchange without waiting for a response."""
- return rpc_amqp.fanout_cast(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.fanout_cast(
+ conf, context, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a topic to a specific server."""
- return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def fanout_cast_to_server(conf, context, server_params, topic, msg):
"""Sends a message on a fanout exchange to a specific server."""
- return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic,
- msg, rpc_amqp.get_connection_pool(conf, Connection))
+ return rpc_amqp.fanout_cast_to_server(
+ conf, context, server_params, topic, msg,
+ rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection))
def cleanup():