summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/impl_kombu.py
diff options
context:
space:
mode:
authorSergey Lukjanov <slukjanov@mirantis.com>2013-06-02 20:41:20 +0400
committerSergey Lukjanov <slukjanov@mirantis.com>2013-06-03 07:53:21 +0400
commite3545f828dabe165dc08b2f1670e5f1f19919d0d (patch)
treeead970c643632ed624c6bda25e902f47cf13b49e /openstack/common/rpc/impl_kombu.py
parent15d8d698b7c67c43dc7a2b0c2c6952734bd2ba66 (diff)
downloadoslo-e3545f828dabe165dc08b2f1670e5f1f19919d0d.tar.gz
oslo-e3545f828dabe165dc08b2f1670e5f1f19919d0d.tar.xz
oslo-e3545f828dabe165dc08b2f1670e5f1f19919d0d.zip
Enable hacking H402 test
H402 one line docstring needs punctuation Change-Id: Ie848453cace318d8310cdf0234c512f4c1121119
Diffstat (limited to 'openstack/common/rpc/impl_kombu.py')
-rw-r--r--openstack/common/rpc/impl_kombu.py56
1 files changed, 28 insertions, 28 deletions
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 0960b9a..c062d9a 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -132,7 +132,7 @@ class ConsumerBase(object):
self.reconnect(channel)
def reconnect(self, channel):
- """Re-declare the queue after a rabbit reconnect"""
+ """Re-declare the queue after a rabbit reconnect."""
self.channel = channel
self.kwargs['channel'] = channel
self.queue = kombu.entity.Queue(**self.kwargs)
@@ -173,7 +173,7 @@ class ConsumerBase(object):
self.queue.consume(*args, callback=_callback, **options)
def cancel(self):
- """Cancel the consuming from the queue, if it has started"""
+ """Cancel the consuming from the queue, if it has started."""
try:
self.queue.cancel(self.tag)
except KeyError as e:
@@ -184,7 +184,7 @@ class ConsumerBase(object):
class DirectConsumer(ConsumerBase):
- """Queue/consumer class for 'direct'"""
+ """Queue/consumer class for 'direct'."""
def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
"""Init a 'direct' queue.
@@ -216,7 +216,7 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase):
- """Consumer class for 'topic'"""
+ """Consumer class for 'topic'."""
def __init__(self, conf, channel, topic, callback, tag, name=None,
exchange_name=None, **kwargs):
@@ -253,7 +253,7 @@ class TopicConsumer(ConsumerBase):
class FanoutConsumer(ConsumerBase):
- """Consumer class for 'fanout'"""
+ """Consumer class for 'fanout'."""
def __init__(self, conf, channel, topic, callback, tag, **kwargs):
"""Init a 'fanout' queue.
@@ -286,7 +286,7 @@ class FanoutConsumer(ConsumerBase):
class Publisher(object):
- """Base Publisher class"""
+ """Base Publisher class."""
def __init__(self, channel, exchange_name, routing_key, **kwargs):
"""Init the Publisher class with the exchange_name, routing_key,
@@ -298,7 +298,7 @@ class Publisher(object):
self.reconnect(channel)
def reconnect(self, channel):
- """Re-establish the Producer after a rabbit reconnection"""
+ """Re-establish the Producer after a rabbit reconnection."""
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
**self.kwargs)
self.producer = kombu.messaging.Producer(exchange=self.exchange,
@@ -306,7 +306,7 @@ class Publisher(object):
routing_key=self.routing_key)
def send(self, msg, timeout=None):
- """Send a message"""
+ """Send a message."""
if timeout:
#
# AMQP TTL is in milliseconds when set in the header.
@@ -317,7 +317,7 @@ class Publisher(object):
class DirectPublisher(Publisher):
- """Publisher class for 'direct'"""
+ """Publisher class for 'direct'."""
def __init__(self, conf, channel, msg_id, **kwargs):
"""init a 'direct' publisher.
@@ -333,7 +333,7 @@ class DirectPublisher(Publisher):
class TopicPublisher(Publisher):
- """Publisher class for 'topic'"""
+ """Publisher class for 'topic'."""
def __init__(self, conf, channel, topic, **kwargs):
"""init a 'topic' publisher.
@@ -352,7 +352,7 @@ class TopicPublisher(Publisher):
class FanoutPublisher(Publisher):
- """Publisher class for 'fanout'"""
+ """Publisher class for 'fanout'."""
def __init__(self, conf, channel, topic, **kwargs):
"""init a 'fanout' publisher.
@@ -367,7 +367,7 @@ class FanoutPublisher(Publisher):
class NotifyPublisher(TopicPublisher):
- """Publisher class for 'notify'"""
+ """Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
@@ -579,18 +579,18 @@ class Connection(object):
self.reconnect()
def get_channel(self):
- """Convenience call for bin/clear_rabbit_queues"""
+ """Convenience call for bin/clear_rabbit_queues."""
return self.channel
def close(self):
- """Close/release this connection"""
+ """Close/release this connection."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.release()
self.connection = None
def reset(self):
- """Reset a connection so it can be used again"""
+ """Reset a connection so it can be used again."""
self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.channel.close()
@@ -619,7 +619,7 @@ class Connection(object):
return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None):
- """Return an iterator that will consume from all queues/consumers"""
+ """Return an iterator that will consume from all queues/consumers."""
info = {'do_consume': True}
@@ -649,7 +649,7 @@ class Connection(object):
yield self.ensure(_error_callback, _consume)
def cancel_consumer_thread(self):
- """Cancel a consumer thread"""
+ """Cancel a consumer thread."""
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
@@ -664,7 +664,7 @@ class Connection(object):
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, timeout=None, **kwargs):
- """Send to a publisher based on the publisher class"""
+ """Send to a publisher based on the publisher class."""
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
@@ -694,27 +694,27 @@ class Connection(object):
topic, callback)
def declare_fanout_consumer(self, topic, callback):
- """Create a 'fanout' consumer"""
+ """Create a 'fanout' consumer."""
self.declare_consumer(FanoutConsumer, topic, callback)
def direct_send(self, msg_id, msg):
- """Send a 'direct' message"""
+ """Send a 'direct' message."""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg, timeout=None):
- """Send a 'topic' message"""
+ """Send a 'topic' message."""
self.publisher_send(TopicPublisher, topic, msg, timeout)
def fanout_send(self, topic, msg):
- """Send a 'fanout' message"""
+ """Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg)
def notify_send(self, topic, msg, **kwargs):
- """Send a notify message on a topic"""
+ """Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg, None, **kwargs)
def consume(self, limit=None):
- """Consume from all queues/consumers"""
+ """Consume from all queues/consumers."""
it = self.iterconsume(limit=limit)
while True:
try:
@@ -723,7 +723,7 @@ class Connection(object):
return
def consume_in_thread(self):
- """Consumer from all queues/consumers in a greenthread"""
+ """Consumer from all queues/consumers in a greenthread."""
def _consumer_thread():
try:
self.consume()
@@ -734,7 +734,7 @@ class Connection(object):
return self.consumer_thread
def create_consumer(self, topic, proxy, fanout=False):
- """Create a consumer that calls a method in a proxy object"""
+ """Create a consumer that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
@@ -746,7 +746,7 @@ class Connection(object):
self.declare_topic_consumer(topic, proxy_cb)
def create_worker(self, topic, proxy, pool_name):
- """Create a worker that calls a method in a proxy object"""
+ """Create a worker that calls a method in a proxy object."""
proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection))
@@ -779,7 +779,7 @@ class Connection(object):
def create_connection(conf, new=True):
- """Create a connection"""
+ """Create a connection."""
return rpc_amqp.create_connection(
conf, new,
rpc_amqp.get_connection_pool(conf, Connection))