diff options
Diffstat (limited to 'openstack/common/rpc/impl_qpid.py')
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 112 |
1 files changed, 63 insertions, 49 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 78236f2..850a60c 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -77,7 +77,7 @@ qpid_opts = [ cfg.BoolOpt('qpid_tcp_nodelay', default=True, help='Disable Nagle algorithm'), - ] +] cfg.CONF.register_opts(qpid_opts) @@ -161,10 +161,10 @@ class DirectConsumer(ConsumerBase): """ super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + {"exclusive": True}) class TopicConsumer(ConsumerBase): @@ -181,8 +181,9 @@ class TopicConsumer(ConsumerBase): """ super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (conf.control_exchange, topic), {}, - name or topic, {}) + "%s/%s" % (conf.control_exchange, + topic), + {}, name or topic, {}) class FanoutConsumer(ConsumerBase): @@ -196,11 +197,12 @@ class FanoutConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(FanoutConsumer, self).__init__(session, callback, - "%s_fanout" % topic, - {"durable": False, "type": "fanout"}, - "%s_fanout_%s" % (topic, uuid.uuid4().hex), - {"exclusive": True}) + super(FanoutConsumer, self).__init__( + session, callback, + "%s_fanout" % topic, + {"durable": False, "type": "fanout"}, + "%s_fanout_%s" % (topic, uuid.uuid4().hex), + {"exclusive": True}) class Publisher(object): @@ -254,8 +256,9 @@ class TopicPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(TopicPublisher, self).__init__(session, - "%s/%s" % (conf.control_exchange, topic)) + super(TopicPublisher, self).__init__( + session, + "%s/%s" % (conf.control_exchange, topic)) class FanoutPublisher(Publisher): @@ -263,8 +266,9 @@ class FanoutPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'fanout' publisher. """ - super(FanoutPublisher, self).__init__(session, - "%s_fanout" % topic, {"type": "fanout"}) + super(FanoutPublisher, self).__init__( + session, + "%s_fanout" % topic, {"type": "fanout"}) class NotifyPublisher(Publisher): @@ -272,9 +276,10 @@ class NotifyPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(NotifyPublisher, self).__init__(session, - "%s/%s" % (conf.control_exchange, topic), - {"durable": True}) + super(NotifyPublisher, self).__init__( + session, + "%s/%s" % (conf.control_exchange, topic), + {"durable": True}) class Connection(object): @@ -292,9 +297,9 @@ class Connection(object): server_params = {} default_params = dict(hostname=self.conf.qpid_hostname, - port=self.conf.qpid_port, - username=self.conf.qpid_username, - password=self.conf.qpid_password) + port=self.conf.qpid_port, + username=self.conf.qpid_username, + password=self.conf.qpid_password) params = server_params for key in default_params.keys(): @@ -312,18 +317,18 @@ class Connection(object): self.connection.reconnect = self.conf.qpid_reconnect if self.conf.qpid_reconnect_timeout: self.connection.reconnect_timeout = ( - self.conf.qpid_reconnect_timeout) + self.conf.qpid_reconnect_timeout) if self.conf.qpid_reconnect_limit: self.connection.reconnect_limit = self.conf.qpid_reconnect_limit if self.conf.qpid_reconnect_interval_max: self.connection.reconnect_interval_max = ( - self.conf.qpid_reconnect_interval_max) + self.conf.qpid_reconnect_interval_max) if self.conf.qpid_reconnect_interval_min: self.connection.reconnect_interval_min = ( - self.conf.qpid_reconnect_interval_min) + self.conf.qpid_reconnect_interval_min) if self.conf.qpid_reconnect_interval: self.connection.reconnect_interval = ( - self.conf.qpid_reconnect_interval) + self.conf.qpid_reconnect_interval) self.connection.hearbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay @@ -395,7 +400,7 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s") % log_info) + "%(err_str)s") % log_info) def _declare_consumer(): consumer = consumer_cls(self.conf, self.session, topic, callback) @@ -410,11 +415,11 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid.messaging.exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + str(exc)) def _consume(): nxt_receiver = self.session.next_receiver(timeout=timeout) @@ -444,7 +449,7 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s") % log_info) + "'%(topic)s': %(err_str)s") % log_info) def _publisher_send(): publisher = cls(self.conf, self.session, topic) @@ -508,8 +513,9 @@ class Connection(object): def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) if fanout: consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) @@ -522,8 +528,9 @@ class Connection(object): def create_worker(self, topic, proxy, pool_name): """Create a worker that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, name=pool_name) @@ -535,50 +542,57 @@ class Connection(object): def create_connection(conf, new=True): """Create a connection""" - return rpc_amqp.create_connection(conf, new, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) def multicall(conf, context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def call(conf, context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def cast(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast(conf, context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a topic to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic, - msg, rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def notify(conf, context, topic, msg): """Sends a notification event on a topic.""" return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection)) def cleanup(): |