summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/amqp.py
diff options
context:
space:
mode:
Diffstat (limited to 'openstack/common/rpc/amqp.py')
-rw-r--r--openstack/common/rpc/amqp.py21
1 files changed, 11 insertions, 10 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index a79a3aa..cc40ff3 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -92,8 +92,9 @@ class ConnectionContext(rpc_common.Connection):
if pooled:
self.connection = connection_pool.get()
else:
- self.connection = connection_pool.connection_cls(conf,
- server_params=server_params)
+ self.connection = connection_pool.connection_cls(
+ conf,
+ server_params=server_params)
self.pooled = pooled
def __enter__(self):
@@ -161,8 +162,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
msg = {'result': reply, 'failure': failure}
except TypeError:
msg = {'result': dict((k, repr(v))
- for k, v in reply.__dict__.iteritems()),
- 'failure': failure}
+ for k, v in reply.__dict__.iteritems()),
+ 'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
@@ -288,8 +289,8 @@ class ProxyCallback(object):
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
- self._iterator = connection.iterconsume(
- timeout=timeout or conf.rpc_response_timeout)
+ self._iterator = connection.iterconsume(timeout=timeout or
+ conf.rpc_response_timeout)
self._result = None
self._done = False
self._got_ending = False
@@ -308,7 +309,7 @@ class MulticallWaiter(object):
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
- failure)
+ failure)
elif data.get('ending', False):
self._got_ending = True
@@ -389,16 +390,16 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
- server_params=server_params) as conn:
+ server_params=server_params) as conn:
conn.topic_send(topic, msg)
def fanout_cast_to_server(conf, context, server_params, topic, msg,
- connection_pool):
+ connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
- server_params=server_params) as conn:
+ server_params=server_params) as conn:
conn.fanout_send(topic, msg)