diff options
author | Gary Kotton <gkotton@redhat.com> | 2012-06-17 04:05:37 -0400 |
---|---|---|
committer | Gary Kotton <gkotton@redhat.com> | 2012-06-18 01:15:09 -0400 |
commit | 9f938720f158889252fa1db44be96745fa48e1ff (patch) | |
tree | 5383ca2084fc6e188c59bef6224c78b2719a5ed9 /openstack/common/rpc | |
parent | 925edb3ee8bbd97afaa43b2888ab45d2bca50faf (diff) | |
download | oslo-9f938720f158889252fa1db44be96745fa48e1ff.tar.gz oslo-9f938720f158889252fa1db44be96745fa48e1ff.tar.xz oslo-9f938720f158889252fa1db44be96745fa48e1ff.zip |
Update common code to support pep 1.3.
bug 1014216
Change-Id: I3f8fa2e11c9d3f3d34fb20f65ce886bb9c94463d
Diffstat (limited to 'openstack/common/rpc')
-rw-r--r-- | openstack/common/rpc/__init__.py | 2 | ||||
-rw-r--r-- | openstack/common/rpc/amqp.py | 21 | ||||
-rw-r--r-- | openstack/common/rpc/common.py | 6 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 204 | ||||
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 112 | ||||
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 42 | ||||
-rw-r--r-- | openstack/common/rpc/matchmaker.py | 4 | ||||
-rw-r--r-- | openstack/common/rpc/proxy.py | 2 |
8 files changed, 199 insertions, 194 deletions
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py index 26bd048..dc4b185 100644 --- a/openstack/common/rpc/__init__.py +++ b/openstack/common/rpc/__init__.py @@ -56,7 +56,7 @@ rpc_opts = [ cfg.BoolOpt('fake_rabbit', default=False, help='If passed, use a fake RabbitMQ provider'), - ] +] cfg.CONF.register_opts(rpc_opts) diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index a79a3aa..cc40ff3 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -92,8 +92,9 @@ class ConnectionContext(rpc_common.Connection): if pooled: self.connection = connection_pool.get() else: - self.connection = connection_pool.connection_cls(conf, - server_params=server_params) + self.connection = connection_pool.connection_cls( + conf, + server_params=server_params) self.pooled = pooled def __enter__(self): @@ -161,8 +162,8 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, msg = {'result': reply, 'failure': failure} except TypeError: msg = {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure} + for k, v in reply.__dict__.iteritems()), + 'failure': failure} if ending: msg['ending'] = True conn.direct_send(msg_id, msg) @@ -288,8 +289,8 @@ class ProxyCallback(object): class MulticallWaiter(object): def __init__(self, conf, connection, timeout): self._connection = connection - self._iterator = connection.iterconsume( - timeout=timeout or conf.rpc_response_timeout) + self._iterator = connection.iterconsume(timeout=timeout or + conf.rpc_response_timeout) self._result = None self._done = False self._got_ending = False @@ -308,7 +309,7 @@ class MulticallWaiter(object): if data['failure']: failure = data['failure'] self._result = rpc_common.deserialize_remote_exception(self._conf, - failure) + failure) elif data.get('ending', False): self._got_ending = True @@ -389,16 +390,16 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a topic to a specific server.""" pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, - server_params=server_params) as conn: + server_params=server_params) as conn: conn.topic_send(topic, msg) def fanout_cast_to_server(conf, context, server_params, topic, msg, - connection_pool): + connection_pool): """Sends a message on a fanout exchange to a specific server.""" pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, - server_params=server_params) as conn: + server_params=server_params) as conn: conn.fanout_send(topic, msg) diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index 6acd72c..a724ea2 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -167,10 +167,8 @@ class Connection(object): def _safe_log(log_func, msg, msg_data): """Sanitizes the msg_data field before logging.""" - SANITIZE = { - 'set_admin_password': ('new_pass',), - 'run_instance': ('admin_password',), - } + SANITIZE = {'set_admin_password': ('new_pass',), + 'run_instance': ('admin_password',), } has_method = 'method' in msg_data and msg_data['method'] in SANITIZE has_context_token = '_context_auth_token' in msg_data diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index c32497a..b78df37 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -46,7 +46,7 @@ kombu_opts = [ cfg.StrOpt('kombu_ssl_ca_certs', default='', help=('SSL certification authority file ' - '(valid only if SSL enabled)')), + '(valid only if SSL enabled)')), cfg.StrOpt('rabbit_host', default='localhost', help='the RabbitMQ host'), @@ -80,7 +80,7 @@ kombu_opts = [ default=False, help='use durable queues in RabbitMQ'), - ] +] cfg.CONF.register_opts(kombu_opts) @@ -171,22 +171,20 @@ class DirectConsumer(ConsumerBase): """ # Default options options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=msg_id, - type='direct', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(DirectConsumer, self).__init__( - channel, - callback, - tag, - name=msg_id, - exchange=exchange, - routing_key=msg_id, - **options) + exchange = kombu.entity.Exchange(name=msg_id, + type='direct', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(DirectConsumer, self).__init__(channel, + callback, + tag, + name=msg_id, + exchange=exchange, + routing_key=msg_id, + **options) class TopicConsumer(ConsumerBase): @@ -208,22 +206,20 @@ class TopicConsumer(ConsumerBase): """ # Default options options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, - 'exclusive': False} + 'auto_delete': False, + 'exclusive': False} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=conf.control_exchange, - type='topic', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(TopicConsumer, self).__init__( - channel, - callback, - tag, - name=name or topic, - exchange=exchange, - routing_key=topic, - **options) + exchange = kombu.entity.Exchange(name=conf.control_exchange, + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(TopicConsumer, self).__init__(channel, + callback, + tag, + name=name or topic, + exchange=exchange, + routing_key=topic, + **options) class FanoutConsumer(ConsumerBase): @@ -245,22 +241,17 @@ class FanoutConsumer(ConsumerBase): # Default options options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=exchange_name, - type='fanout', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(FanoutConsumer, self).__init__( - channel, - callback, - tag, - name=queue_name, - exchange=exchange, - routing_key=topic, - **options) + exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(FanoutConsumer, self).__init__(channel, callback, tag, + name=queue_name, + exchange=exchange, + routing_key=topic, + **options) class Publisher(object): @@ -278,9 +269,10 @@ class Publisher(object): def reconnect(self, channel): """Re-establish the Producer after a rabbit reconnection""" self.exchange = kombu.entity.Exchange(name=self.exchange_name, - **self.kwargs) + **self.kwargs) self.producer = kombu.messaging.Producer(exchange=self.exchange, - channel=channel, routing_key=self.routing_key) + channel=channel, + routing_key=self.routing_key) def send(self, msg): """Send a message""" @@ -296,14 +288,11 @@ class DirectPublisher(Publisher): """ options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - super(DirectPublisher, self).__init__(channel, - msg_id, - msg_id, - type='direct', - **options) + super(DirectPublisher, self).__init__(channel, msg_id, msg_id, + type='direct', **options) class TopicPublisher(Publisher): @@ -314,14 +303,11 @@ class TopicPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ options = {'durable': conf.rabbit_durable_queues, - 'auto_delete': False, - 'exclusive': False} + 'auto_delete': False, + 'exclusive': False} options.update(kwargs) - super(TopicPublisher, self).__init__(channel, - conf.control_exchange, - topic, - type='topic', - **options) + super(TopicPublisher, self).__init__(channel, conf.control_exchange, + topic, type='topic', **options) class FanoutPublisher(Publisher): @@ -332,14 +318,11 @@ class FanoutPublisher(Publisher): Kombu options may be passed as keyword args to override defaults """ options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - super(FanoutPublisher, self).__init__(channel, - '%s_fanout' % topic, - None, - type='fanout', - **options) + super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, + None, type='fanout', **options) class NotifyPublisher(TopicPublisher): @@ -356,10 +339,10 @@ class NotifyPublisher(TopicPublisher): # we do this to ensure that messages don't get dropped if the # consumer is started after we do queue = kombu.entity.Queue(channel=channel, - exchange=self.exchange, - durable=self.durable, - name=self.routing_key, - routing_key=self.routing_key) + exchange=self.exchange, + durable=self.durable, + name=self.routing_key, + routing_key=self.routing_key) queue.declare() @@ -445,7 +428,7 @@ class Connection(object): """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % self.params) try: self.connection.close() except self.connection_errors: @@ -453,8 +436,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection( - **self.params) + self.connection = kombu.connection.BrokerConnection(**self.params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. @@ -504,8 +486,8 @@ class Connection(object): if self.max_retries and attempt == self.max_retries: LOG.exception(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) # NOTE(comstud): Copied from original code. There's # really no better recourse because if this was a queue we # need to consume on, we have no way to consume anymore. @@ -520,8 +502,8 @@ class Connection(object): log_info['sleep_time'] = sleep_time LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) + ' unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) time.sleep(sleep_time) def ensure(self, error_callback, method, *args, **kwargs): @@ -571,11 +553,11 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s") % log_info) + "%(err_str)s") % log_info) def _declare_consumer(): consumer = consumer_cls(self.conf, self.channel, topic, callback, - self.consumer_num.next()) + self.consumer_num.next()) self.consumers.append(consumer) return consumer @@ -589,11 +571,11 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, socket.timeout): LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + str(exc)) info['do_consume'] = True def _consume(): @@ -627,7 +609,7 @@ class Connection(object): def _error_callback(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s") % log_info) + "'%(topic)s': %(err_str)s") % log_info) def _publish(): publisher = cls(self.conf, self.channel, topic, **kwargs) @@ -691,8 +673,9 @@ class Connection(object): def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) if fanout: self.declare_fanout_consumer(topic, proxy_cb) @@ -701,57 +684,66 @@ class Connection(object): def create_worker(self, topic, proxy, pool_name): """Create a worker that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) self.declare_topic_consumer(topic, proxy_cb, pool_name) def create_connection(conf, new=True): """Create a connection""" - return rpc_amqp.create_connection(conf, new, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) def multicall(conf, context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def call(conf, context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def cast(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast(conf, context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a topic to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def notify(conf, context, topic, msg): """Sends a notification event on a topic.""" - return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.notify( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cleanup(): diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 78236f2..850a60c 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -77,7 +77,7 @@ qpid_opts = [ cfg.BoolOpt('qpid_tcp_nodelay', default=True, help='Disable Nagle algorithm'), - ] +] cfg.CONF.register_opts(qpid_opts) @@ -161,10 +161,10 @@ class DirectConsumer(ConsumerBase): """ super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + {"exclusive": True}) class TopicConsumer(ConsumerBase): @@ -181,8 +181,9 @@ class TopicConsumer(ConsumerBase): """ super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (conf.control_exchange, topic), {}, - name or topic, {}) + "%s/%s" % (conf.control_exchange, + topic), + {}, name or topic, {}) class FanoutConsumer(ConsumerBase): @@ -196,11 +197,12 @@ class FanoutConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(FanoutConsumer, self).__init__(session, callback, - "%s_fanout" % topic, - {"durable": False, "type": "fanout"}, - "%s_fanout_%s" % (topic, uuid.uuid4().hex), - {"exclusive": True}) + super(FanoutConsumer, self).__init__( + session, callback, + "%s_fanout" % topic, + {"durable": False, "type": "fanout"}, + "%s_fanout_%s" % (topic, uuid.uuid4().hex), + {"exclusive": True}) class Publisher(object): @@ -254,8 +256,9 @@ class TopicPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(TopicPublisher, self).__init__(session, - "%s/%s" % (conf.control_exchange, topic)) + super(TopicPublisher, self).__init__( + session, + "%s/%s" % (conf.control_exchange, topic)) class FanoutPublisher(Publisher): @@ -263,8 +266,9 @@ class FanoutPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'fanout' publisher. """ - super(FanoutPublisher, self).__init__(session, - "%s_fanout" % topic, {"type": "fanout"}) + super(FanoutPublisher, self).__init__( + session, + "%s_fanout" % topic, {"type": "fanout"}) class NotifyPublisher(Publisher): @@ -272,9 +276,10 @@ class NotifyPublisher(Publisher): def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(NotifyPublisher, self).__init__(session, - "%s/%s" % (conf.control_exchange, topic), - {"durable": True}) + super(NotifyPublisher, self).__init__( + session, + "%s/%s" % (conf.control_exchange, topic), + {"durable": True}) class Connection(object): @@ -292,9 +297,9 @@ class Connection(object): server_params = {} default_params = dict(hostname=self.conf.qpid_hostname, - port=self.conf.qpid_port, - username=self.conf.qpid_username, - password=self.conf.qpid_password) + port=self.conf.qpid_port, + username=self.conf.qpid_username, + password=self.conf.qpid_password) params = server_params for key in default_params.keys(): @@ -312,18 +317,18 @@ class Connection(object): self.connection.reconnect = self.conf.qpid_reconnect if self.conf.qpid_reconnect_timeout: self.connection.reconnect_timeout = ( - self.conf.qpid_reconnect_timeout) + self.conf.qpid_reconnect_timeout) if self.conf.qpid_reconnect_limit: self.connection.reconnect_limit = self.conf.qpid_reconnect_limit if self.conf.qpid_reconnect_interval_max: self.connection.reconnect_interval_max = ( - self.conf.qpid_reconnect_interval_max) + self.conf.qpid_reconnect_interval_max) if self.conf.qpid_reconnect_interval_min: self.connection.reconnect_interval_min = ( - self.conf.qpid_reconnect_interval_min) + self.conf.qpid_reconnect_interval_min) if self.conf.qpid_reconnect_interval: self.connection.reconnect_interval = ( - self.conf.qpid_reconnect_interval) + self.conf.qpid_reconnect_interval) self.connection.hearbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay @@ -395,7 +400,7 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s") % log_info) + "%(err_str)s") % log_info) def _declare_consumer(): consumer = consumer_cls(self.conf, self.session, topic, callback) @@ -410,11 +415,11 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid.messaging.exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + str(exc)) def _consume(): nxt_receiver = self.session.next_receiver(timeout=timeout) @@ -444,7 +449,7 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s") % log_info) + "'%(topic)s': %(err_str)s") % log_info) def _publisher_send(): publisher = cls(self.conf, self.session, topic) @@ -508,8 +513,9 @@ class Connection(object): def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) if fanout: consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) @@ -522,8 +528,9 @@ class Connection(object): def create_worker(self, topic, proxy, pool_name): """Create a worker that calls a method in a proxy object""" - proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy, - rpc_amqp.get_connection_pool(self.conf, Connection)) + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, name=pool_name) @@ -535,50 +542,57 @@ class Connection(object): def create_connection(conf, new=True): """Create a connection""" - return rpc_amqp.create_connection(conf, new, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) def multicall(conf, context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def call(conf, context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(conf, context, topic, msg, timeout, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) def cast(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast(conf, context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a topic to a specific server.""" - return rpc_amqp.cast_to_server(conf, context, server_params, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.fanout_cast_to_server(conf, context, server_params, topic, - msg, rpc_amqp.get_connection_pool(conf, Connection)) + return rpc_amqp.fanout_cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def notify(conf, context, topic, msg): """Sends a notification event on a topic.""" return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection)) def cleanup(): diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index da69dd7..560e649 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -40,25 +40,26 @@ RPCException = rpc_common.RPCException zmq_opts = [ cfg.StrOpt('rpc_zmq_bind_address', default='*', - help='ZeroMQ bind address. Should be a wildcard (*), ' - 'an ethernet interface, or IP. ' - 'The "host" option should point or resolve to this address.'), + help='ZeroMQ bind address. Should be a wildcard (*), ' + 'an ethernet interface, or IP. ' + 'The "host" option should point or resolve to this ' + 'address.'), # The module.Class to use for matchmaking. cfg.StrOpt('rpc_zmq_matchmaker', - default='openstack.common.rpc.matchmaker.MatchMakerLocalhost', - help='MatchMaker driver'), + default='openstack.common.rpc.matchmaker.MatchMakerLocalhost', + help='MatchMaker driver'), # The following port is unassigned by IANA as of 2012-05-21 cfg.IntOpt('rpc_zmq_port', default=9501, - help='ZeroMQ receiver listening port'), + help='ZeroMQ receiver listening port'), cfg.IntOpt('rpc_zmq_contexts', default=1, - help='Number of ZeroMQ contexts, defaults to 1'), + help='Number of ZeroMQ contexts, defaults to 1'), cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', - help='Directory for holding IPC sockets'), - ] + help='Directory for holding IPC sockets'), +] # These globals are defined in register_opts(conf), @@ -119,10 +120,10 @@ class ZmqSocket(object): self.subscribe(f) LOG.debug(_("Connecting to %{addr}s with %{type}s" - "\n-> Subscribed to %{subscribe}s" - "\n-> bind: %{bind}s"), - {'addr': addr, 'type': self.socket_s(), - 'subscribe': subscribe, 'bind': bind}) + "\n-> Subscribed to %{subscribe}s" + "\n-> bind: %{bind}s"), + {'addr': addr, 'type': self.socket_s(), + 'subscribe': subscribe, 'bind': bind}) try: if bind: @@ -197,7 +198,7 @@ class ZmqClient(object): def cast(self, msg_id, topic, data): self.outq.send([str(msg_id), str(topic), str('cast'), - _serialize(data)]) + _serialize(data)]) def close(self): self.outq.close() @@ -306,7 +307,7 @@ class ConsumerBase(object): data.setdefault('version', None) data.setdefault('args', []) proxy.dispatch(ctx, data['version'], - data['method'], **data['args']) + data['method'], **data['args']) class ZmqBaseReactor(ConsumerBase): @@ -339,7 +340,7 @@ class ZmqBaseReactor(ConsumerBase): # Items push in. inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind, - subscribe=subscribe) + subscribe=subscribe) self.proxies[inq] = proxy self.sockets.append(inq) @@ -353,8 +354,7 @@ class ZmqBaseReactor(ConsumerBase): raise RPCException("Bad output socktype") # Items push out. - outq = ZmqSocket(out_addr, zmq_type_out, - bind=out_bind) + outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind) self.mapping[inq] = outq self.mapping[outq] = inq @@ -428,7 +428,7 @@ class ZmqProxy(ZmqBaseReactor): if not topic in self.topic_proxy: outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) + sock_type, bind=True) self.topic_proxy[topic] = outq self.sockets.append(outq) LOG.info(_("Created topic proxy: %s"), topic) @@ -486,7 +486,7 @@ class Connection(rpc_common.Connection): topic = topic.split('.', 1)[0] LOG.info(_("Create Consumer for topic (%(topic)s)") % - {'topic': topic}) + {'topic': topic}) # Subscription scenarios if fanout: @@ -502,7 +502,7 @@ class Connection(rpc_common.Connection): (self.conf.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), - ['PULL', 'SUB'][sock_type == zmq.SUB]) + ['PULL', 'SUB'][sock_type == zmq.SUB]) self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py index 4da1dcd..34a9ae4 100644 --- a/openstack/common/rpc/matchmaker.py +++ b/openstack/common/rpc/matchmaker.py @@ -29,8 +29,8 @@ from openstack.common import cfg matchmaker_opts = [ # Matchmaker ring file cfg.StrOpt('matchmaker_ringfile', - default='/etc/nova/matchmaker_ring.json', - help='Matchmaker ring file (JSON)'), + default='/etc/nova/matchmaker_ring.json', + help='Matchmaker ring file (JSON)'), ] CONF = cfg.CONF diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py index 4f4dff5..112a66f 100644 --- a/openstack/common/rpc/proxy.py +++ b/openstack/common/rpc/proxy.py @@ -127,7 +127,7 @@ class RpcProxy(object): rpc.fanout_cast(context, self.topic, msg) def cast_to_server(self, context, server_params, msg, topic=None, - version=None): + version=None): """rpc.cast_to_server() a remote method. :param context: The request context |