diff options
Diffstat (limited to 'openstack/common/rpc/impl_kombu.py')
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 204 |
1 files changed, 98 insertions, 106 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index c32497a..b78df37 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -46,7 +46,7 @@ kombu_opts = [ cfg.StrOpt('kombu_ssl_ca_certs', default='', help=('SSL certification authority file ' - '(valid only if SSL enabled)')), + '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', help='the RabbitMQ host'), @@ -80,7 +80,7 @@ kombu_opts = [ default=False, help='use durable queues in RabbitMQ'), - ] +] cfg.CONF.register_opts(kombu_opts) @@ -171,22 +171,20 @@ class DirectConsumer(ConsumerBase): """ # Default options options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=msg_id, - type='direct', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(DirectConsumer, self).__init__( - channel, - callback, - tag, - name=msg_id, - exchange=exchange, - routing_key=msg_id, - **options) + exchange = kombu.entity.Exchange(name=msg_id, + type='direct', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(DirectConsumer, self).__init__(channel, + callback, + tag, + name=msg_id, + exchange=exchange, + routing_key=msg_id, + **options) class TopicConsumer(ConsumerBase): @@ -208,22 +206,20 @@ class TopicConsumer(ConsumerBase): """ # Default options options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, - 'exclusive': False} + 'auto_delete': False, + 'exclusive': False} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=conf.control_exchange, - type='topic', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(TopicConsumer, self).__init__( - channel, - callback, - tag, - name=name or topic, - exchange=exchange, - routing_key=topic, - **options) + exchange = kombu.entity.Exchange(name=conf.control_exchange, + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(TopicConsumer, self).__init__(channel, + callback, + tag, + name=name or topic, + exchange=exchange, + routing_key=topic, + **options) class FanoutConsumer(ConsumerBase): @@ -245,22 +241,17 @@ class FanoutConsumer(ConsumerBase): # Default options options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=exchange_name, - type='fanout', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(FanoutConsumer, self).__init__( - channel, - callback, - tag, - name=queue_name, - exchange=exchange, - routing_key=topic, - **options) + exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(FanoutConsumer, self).__init__(channel, callback, tag, + name=queue_name, + exchange=exchange, + routing_key=topic, + **options) class Publisher(object): @@ -278,9 +269,10 @@ class Publisher(object): def reconnect(self, channel): """Re-establish the Producer after a rabbit reconnection""" self.exchange = kombu.entity.Exchange(name=self.exchange_name, - **self.kwargs) + **self.kwargs) self.producer = kombu.messaging.Producer(exchange=self.exchange, - channel=channel, routing_key=self.routing_key) + channel=channel, + routing_key=self.routing_key) def send(self, msg): """Send a message""" @@ -296,14 +288,11 @@ class DirectPublisher(Publisher): """ options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - super(DirectPublisher, self).__init__(channel, - msg_id, - msg_id, - type='direct', - **options) + super(DirectPublisher, self).__init__(channel, msg_id, msg_id, + type='direct', **options) class TopicPublisher(Publisher): @@ -314,14 +303,11 @@ class TopicPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, - 'exclusive': False} + 'auto_delete': False, + 'exclusive': False} options.update(kwargs) - super(TopicPublisher, self).__init__(channel, - conf.control_exchange, - topic, - type='topic', - **options) + super(TopicPublisher, self).__init__(channel, conf.control_exchange, + topic, type='topic', **options) class FanoutPublisher(Publisher): @@ -332,14 +318,11 @@ class FanoutPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - super(FanoutPublisher, self).__init__(channel, - '%s_fanout' % topic, - None, - type='fanout', - **options) + super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, + None, type='fanout', **options) class NotifyPublisher(TopicPublisher): @@ -356,10 +339,10 @@ class NotifyPublisher(TopicPublisher): # we do this to ensure that messages don't get dropped if the # consumer is started after we do queue = kombu.entity.Queue(channel=channel, - exchange=self.exchange, - durable=self.durable, - name=self.routing_key, - routing_key=self.routing_key) + exchange=self.exchange, + durable=self.durable, + name=self.routing_key, + routing_key=self.routing_key) queue.declare() @@ -445,7 +428,7 @@ class Connection(object): """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % self.params) try: self.connection.close() except self.connection_errors: @@ -453,8 +436,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection( - **self.params) + self.connection = kombu.connection.BrokerConnection(**self.params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -504,8 +486,8 @@ class Connection(object): if self.max_retries and attempt == self.max_retries: LOG.exception(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) # NOTE(comstud): Copied from original code. There's # really no better recourse because if this was a queue we # need to consume on, we have no way to consume anymore. @@ -520,8 +502,8 @@ class Connection(object): log_info['sleep_time'] = sleep_time LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) + ' unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) time.sleep(sleep_time) def ensure(self, error_callback, method, *args, **kwargs): @@ -571,11 +553,11 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s") % log_info) + "%(err_str)s") % log_info) def _declare_consumer(): consumer = consumer_cls(self.conf, self.channel, topic, callback, - self.consumer_num.next()) + self.consumer_num.next()) self.consumers.append(consumer) return consumer @@ -589,11 +571,11 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, socket.timeout): LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + str(exc)) info['do_consume'] = True def _consume(): @@ -627,7 +609,7 @@ class Connection(object): def _error_callback(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s") % log_info) + "'%(topic)s': %(err_str)s") % log_info) def _publish(): publisher = cls(self.conf, self.channel, topic, **kwargs) @@ -691,8 +673,9 @@ class Connection(object): def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) if fanout: self.declare_fanout_consumer(topic, proxy_cb) @@ -701,57 +684,66 @@ class Connection(object): def create_worker(self, topic, proxy, pool_name): """Create a worker that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) self.declare_topic_consumer(topic, proxy_cb, pool_name) def create_connection(conf, new=True): """Create a connection""" - return rpc_amqp.create_connection(conf, new, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) def multicall(conf, context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def call(conf, context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def cast(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast(conf, context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a topic to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def notify(conf, context, topic, msg): """Sends a notification event on a topic.""" - return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.notify( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cleanup(): |