summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc
diff options
context:
space:
mode:
authorDina Belova <dbelova@mirantis.com>2013-06-03 17:44:55 +0400
committerDina Belova <dbelova@mirantis.com>2013-06-11 18:28:29 +0400
commit7119e29cb535426c587eaf2cfc2cfcd11a422df0 (patch)
treeee61f4db5c8ef559a9c50c5c501fb6c421f41fa0 /openstack/common/rpc
parentebaa578351c9c6b47c2f28ef6d74451e1483036b (diff)
downloadoslo-7119e29cb535426c587eaf2cfc2cfcd11a422df0.tar.gz
oslo-7119e29cb535426c587eaf2cfc2cfcd11a422df0.tar.xz
oslo-7119e29cb535426c587eaf2cfc2cfcd11a422df0.zip
Enable hacking H404 test.
H404 - multi line docstring should start with a summary. Change-Id: I2099e1ee81ff9657f7a07401b8e8f3327d03bdbd
Diffstat (limited to 'openstack/common/rpc')
-rw-r--r--openstack/common/rpc/amqp.py31
-rw-r--r--openstack/common/rpc/common.py16
-rw-r--r--openstack/common/rpc/impl_zmq.py50
-rw-r--r--openstack/common/rpc/matchmaker.py122
-rw-r--r--openstack/common/rpc/matchmaker_redis.py12
-rw-r--r--openstack/common/rpc/matchmaker_ring.py8
6 files changed, 106 insertions, 133 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index f5b7cab..64ef582 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -102,15 +102,15 @@ def get_connection_pool(conf, connection_cls):
class ConnectionContext(rpc_common.Connection):
- """The class that is actually returned to the caller of
- create_connection(). This is essentially a wrapper around
- Connection that supports 'with'. It can also return a new
- Connection, or one from a pool. The function will also catch
- when an instance of this class is to be deleted. With that
- we can return Connections to the pool on exceptions and so
- forth without making the caller be responsible for catching
- them. If possible the function makes sure to return a
- connection to the pool.
+ """The class that is actually returned to the create_connection() caller.
+
+ This is essentially a wrapper around Connection that supports 'with'.
+ It can also return a new Connection, or one from a pool.
+
+ The function will also catch when an instance of this class is to be
+ deleted. With that we can return Connections to the pool on exceptions
+ and so forth without making the caller be responsible for catching them.
+ If possible the function makes sure to return a connection to the pool.
"""
def __init__(self, conf, connection_pool, pooled=True, server_params=None):
@@ -339,8 +339,9 @@ def _add_unique_id(msg):
class _ThreadPoolWithWait(object):
- """Base class for a delayed invocation manager used by
- the Connection class to start up green threads
+ """Base class for a delayed invocation manager.
+
+ Used by the Connection class to start up green threads
to handle incoming messages.
"""
@@ -355,12 +356,14 @@ class _ThreadPoolWithWait(object):
class CallbackWrapper(_ThreadPoolWithWait):
- """Wraps a straight callback to allow it to be invoked in a green
- thread.
+ """Wraps a straight callback.
+
+ Allows it to be invoked in a green thread.
"""
def __init__(self, conf, callback, connection_pool):
- """
+ """Initiates CallbackWrapper object.
+
:param conf: cfg.CONF instance
:param callback: a callable (probably a function)
:param connection_pool: connection pool as returned by
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index 28dcacd..f880608 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -124,7 +124,8 @@ class Timeout(RPCException):
'info: "%(info)s"')
def __init__(self, info=None, topic=None, method=None):
- """
+ """Initiates Timeout object.
+
:param info: Extra info to convey to the user
:param topic: The topic that the rpc call was sent to
:param rpc_method_name: The name of the rpc method being
@@ -221,9 +222,9 @@ class Connection(object):
raise NotImplementedError()
def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
- """Register as a member of a group of consumers for a given topic from
- the specified exchange.
+ """Register as a member of a group of consumers.
+ Uses given topic from the specified exchange.
Exactly one member of a given pool will receive each message.
A message will be delivered to multiple pools, if more than
@@ -414,10 +415,10 @@ class CommonRpcContext(object):
class ClientException(Exception):
- """This encapsulates some actual exception that is expected to be
- hit by an RPC proxy object. Merely instantiating it records the
- current exception information, which will be passed back to the
- RPC client without exceptional logging.
+ """Encapsulates actual exception expected to be hit by a RPC proxy object.
+
+ Merely instantiating it records the current exception information, which
+ will be passed back to the RPC client without exceptional logging.
"""
def __init__(self):
self._exc_info = sys.exc_info()
@@ -435,6 +436,7 @@ def catch_client_exception(exceptions, func, *args, **kwargs):
def client_exceptions(*exceptions):
"""Decorator for manager methods that raise expected exceptions.
+
Marking a Manager method with this decorator allows the declaration
of expected exceptions that the RPC layer should not consider fatal,
and not log as if they were generated in a real error scenario. Note
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 07b7b41..0bb5741 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -84,8 +84,8 @@ matchmaker = None # memoized matchmaker object
def _serialize(data):
- """
- Serialization wrapper
+ """Serialization wrapper.
+
We prefer using JSON, but it cannot encode all types.
Error if a developer passes us bad data.
"""
@@ -97,18 +97,15 @@ def _serialize(data):
def _deserialize(data):
- """
- Deserialization wrapper
- """
+ """Deserialization wrapper."""
LOG.debug(_("Deserializing: %s"), data)
return jsonutils.loads(data)
class ZmqSocket(object):
- """
- A tiny wrapper around ZeroMQ to simplify the send/recv protocol
- and connection management.
+ """A tiny wrapper around ZeroMQ.
+ Simplifies the send/recv protocol and connection management.
Can be used as a Context (supports the 'with' statement).
"""
@@ -355,10 +352,9 @@ class ConsumerBase(object):
class ZmqBaseReactor(ConsumerBase):
- """
- A consumer class implementing a
- centralized casting broker (PULL-PUSH)
- for RoundRobin requests.
+ """A consumer class implementing a centralized casting broker (PULL-PUSH).
+
+ Used for RoundRobin requests.
"""
def __init__(self, conf):
@@ -429,10 +425,9 @@ class ZmqBaseReactor(ConsumerBase):
class ZmqProxy(ZmqBaseReactor):
- """
- A consumer class implementing a
- topic-based proxy, forwarding to
- IPC sockets.
+ """A consumer class implementing a topic-based proxy.
+
+ Forwards to IPC sockets.
"""
def __init__(self, conf):
@@ -539,8 +534,9 @@ class ZmqProxy(ZmqBaseReactor):
def unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
- Takes a list and returns a dictionary.
- i.e. [1,2,3,4] => {1: 2, 3: 4}
+
+ Takes a list and returns a dictionary.
+ i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
@@ -553,10 +549,9 @@ def unflatten_envelope(packenv):
class ZmqReactor(ZmqBaseReactor):
- """
- A consumer class implementing a
- consumer for messages. Can also be
- used as a 1:1 proxy
+ """A consumer class implementing a consumer for messages.
+
+ Can also be used as a 1:1 proxy
"""
def __init__(self, conf):
@@ -743,10 +738,9 @@ def _call(addr, context, topic, msg, timeout=None,
def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None):
- """
- Wraps the sending of messages,
- dispatches to the matchmaker and sends
- message to all relevant hosts.
+ """Wraps the sending of messages.
+
+ Dispatches to the matchmaker and sends message to all relevant hosts.
"""
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
@@ -803,8 +797,8 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
def notify(conf, context, topic, msg, envelope):
- """
- Send notification event.
+ """Send notification event.
+
Notifications are sent to topic-priority.
This differs from the AMQP drivers which send to topic.priority.
"""
diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py
index 98a8f43..e51636d 100644
--- a/openstack/common/rpc/matchmaker.py
+++ b/openstack/common/rpc/matchmaker.py
@@ -48,8 +48,8 @@ class MatchMakerException(Exception):
class Exchange(object):
- """
- Implements lookups.
+ """Implements lookups.
+
Subclass this to support hashtables, dns, etc.
"""
def __init__(self):
@@ -60,9 +60,7 @@ class Exchange(object):
class Binding(object):
- """
- A binding on which to perform a lookup.
- """
+ """A binding on which to perform a lookup."""
def __init__(self):
pass
@@ -71,10 +69,10 @@ class Binding(object):
class MatchMakerBase(object):
- """
- Match Maker Base Class.
- Build off HeartbeatMatchMakerBase if building a
- heartbeat-capable MatchMaker.
+ """Match Maker Base Class.
+
+ Build off HeartbeatMatchMakerBase if building a heartbeat-capable
+ MatchMaker.
"""
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
@@ -84,58 +82,47 @@ class MatchMakerBase(object):
'registration or heartbeat.')
def register(self, key, host):
- """
- Register a host on a backend.
+ """Register a host on a backend.
+
Heartbeats, if applicable, may keepalive registration.
"""
pass
def ack_alive(self, key, host):
- """
- Acknowledge that a key.host is alive.
- Used internally for updating heartbeats,
- but may also be used publically to acknowledge
- a system is alive (i.e. rpc message successfully
- sent to host)
+ """Acknowledge that a key.host is alive.
+
+ Used internally for updating heartbeats, but may also be used
+ publically to acknowledge a system is alive (i.e. rpc message
+ successfully sent to host)
"""
pass
def is_alive(self, topic, host):
- """
- Checks if a host is alive.
- """
+ """Checks if a host is alive."""
pass
def expire(self, topic, host):
- """
- Explicitly expire a host's registration.
- """
+ """Explicitly expire a host's registration."""
pass
def send_heartbeats(self):
- """
- Send all heartbeats.
+ """Send all heartbeats.
+
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
pass
def unregister(self, key, host):
- """
- Unregister a topic.
- """
+ """Unregister a topic."""
pass
def start_heartbeat(self):
- """
- Spawn heartbeat greenthread.
- """
+ """Spawn heartbeat greenthread."""
pass
def stop_heartbeat(self):
- """
- Destroys the heartbeat greenthread.
- """
+ """Destroys the heartbeat greenthread."""
pass
def add_binding(self, binding, rule, last=True):
@@ -162,10 +149,10 @@ class MatchMakerBase(object):
class HeartbeatMatchMakerBase(MatchMakerBase):
- """
- Base for a heart-beat capable MatchMaker.
- Provides common methods for registering,
- unregistering, and maintaining heartbeats.
+ """Base for a heart-beat capable MatchMaker.
+
+ Provides common methods for registering, unregistering, and maintaining
+ heartbeats.
"""
def __init__(self):
self.hosts = set()
@@ -175,8 +162,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
super(HeartbeatMatchMakerBase, self).__init__()
def send_heartbeats(self):
- """
- Send all heartbeats.
+ """Send all heartbeats.
+
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
@@ -184,32 +171,31 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
self.ack_alive(key, host)
def ack_alive(self, key, host):
- """
- Acknowledge that a host.topic is alive.
- Used internally for updating heartbeats,
- but may also be used publically to acknowledge
- a system is alive (i.e. rpc message successfully
- sent to host)
+ """Acknowledge that a host.topic is alive.
+
+ Used internally for updating heartbeats, but may also be used
+ publically to acknowledge a system is alive (i.e. rpc message
+ successfully sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
def backend_register(self, key, host):
- """
- Implements registration logic.
+ """Implements registration logic.
+
Called by register(self,key,host)
"""
raise NotImplementedError("Must implement backend_register")
def backend_unregister(self, key, key_host):
- """
- Implements de-registration logic.
+ """Implements de-registration logic.
+
Called by unregister(self,key,host)
"""
raise NotImplementedError("Must implement backend_unregister")
def register(self, key, host):
- """
- Register a host on a backend.
+ """Register a host on a backend.
+
Heartbeats, if applicable, may keepalive registration.
"""
self.hosts.add(host)
@@ -221,9 +207,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
self.ack_alive(key, host)
def unregister(self, key, host):
- """
- Unregister a topic.
- """
+ """Unregister a topic."""
if (key, host) in self.host_topic:
del self.host_topic[(key, host)]
@@ -234,8 +218,8 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
{'key': key, 'host': host})
def start_heartbeat(self):
- """
- Implementation of MatchMakerBase.start_heartbeat
+ """Implementation of MatchMakerBase.start_heartbeat.
+
Launches greenthread looping send_heartbeats(),
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
@@ -252,16 +236,14 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
self._heart = eventlet.spawn(do_heartbeat)
def stop_heartbeat(self):
- """
- Destroys the heartbeat greenthread.
- """
+ """Destroys the heartbeat greenthread."""
if self._heart:
self._heart.kill()
class DirectBinding(Binding):
- """
- Specifies a host in the key via a '.' character
+ """Specifies a host in the key via a '.' character.
+
Although dots are used in the key, the behavior here is
that it maps directly to a host, thus direct.
"""
@@ -272,8 +254,8 @@ class DirectBinding(Binding):
class TopicBinding(Binding):
- """
- Where a 'bare' key without dots.
+ """Where a 'bare' key without dots.
+
AMQP generally considers topic exchanges to be those *with* dots,
but we deviate here in terminology as the behavior here matches
that of a topic exchange (whereas where there are dots, behavior
@@ -310,8 +292,8 @@ class LocalhostExchange(Exchange):
class DirectExchange(Exchange):
- """
- Exchange where all topic keys are split, sending to second half.
+ """Exchange where all topic keys are split, sending to second half.
+
i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
@@ -323,8 +305,8 @@ class DirectExchange(Exchange):
class MatchMakerLocalhost(MatchMakerBase):
- """
- Match Maker where all bare topics resolve to localhost.
+ """Match Maker where all bare topics resolve to localhost.
+
Useful for testing.
"""
def __init__(self, host='localhost'):
@@ -335,8 +317,8 @@ class MatchMakerLocalhost(MatchMakerBase):
class MatchMakerStub(MatchMakerBase):
- """
- Match Maker where topics are untouched.
+ """Match Maker where topics are untouched.
+
Useful for testing, or for AMQP/brokered queues.
Will not work where knowledge of hosts is known (i.e. zeromq)
"""
diff --git a/openstack/common/rpc/matchmaker_redis.py b/openstack/common/rpc/matchmaker_redis.py
index 367c2c9..8bab9f9 100644
--- a/openstack/common/rpc/matchmaker_redis.py
+++ b/openstack/common/rpc/matchmaker_redis.py
@@ -55,8 +55,8 @@ class RedisExchange(mm_common.Exchange):
class RedisTopicExchange(RedisExchange):
- """
- Exchange where all topic keys are split, sending to second half.
+ """Exchange where all topic keys are split, sending to second half.
+
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def run(self, topic):
@@ -77,9 +77,7 @@ class RedisTopicExchange(RedisExchange):
class RedisFanoutExchange(RedisExchange):
- """
- Return a list of all hosts.
- """
+ """Return a list of all hosts."""
def run(self, topic):
topic = topic.split('~', 1)[1]
hosts = self.redis.smembers(topic)
@@ -90,9 +88,7 @@ class RedisFanoutExchange(RedisExchange):
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
- """
- MatchMaker registering and looking-up hosts with a Redis server.
- """
+ """MatchMaker registering and looking-up hosts with a Redis server."""
def __init__(self):
super(MatchMakerRedis, self).__init__()
diff --git a/openstack/common/rpc/matchmaker_ring.py b/openstack/common/rpc/matchmaker_ring.py
index dbfb36f..2f1f79b 100644
--- a/openstack/common/rpc/matchmaker_ring.py
+++ b/openstack/common/rpc/matchmaker_ring.py
@@ -43,9 +43,7 @@ LOG = logging.getLogger(__name__)
class RingExchange(mm.Exchange):
- """
- Match Maker where hosts are loaded from a static file containing
- a hashmap (JSON formatted).
+ """Match Maker where hosts are loaded from a static JSON formatted file.
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
@@ -104,9 +102,7 @@ class FanoutRingExchange(RingExchange):
class MatchMakerRing(mm.MatchMakerBase):
- """
- Match Maker where hosts are loaded from a static hashmap.
- """
+ """Match Maker where hosts are loaded from a static hashmap."""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))