summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/matchmaker.py
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2012-08-15 11:13:11 -0400
committerEric Windisch <eric@cloudscaling.com>2012-09-20 13:59:26 -0400
commit032bf345ea593e97fcc07e23c1725ba5f4b612d0 (patch)
tree7a11850a69c3327ae2b407316530b841df150d76 /openstack/common/rpc/matchmaker.py
parent60de2a453f77ee6fad1f7b619b31ac9caa60d7f6 (diff)
downloadoslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.tar.gz
oslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.tar.xz
oslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.zip
fanout subscriptions in impl_zmq
This necessary to support Quantum's scalable agents in the Folsom release. Supports matchmaking and subscribing to publishers, in addition to sender-only fanout broadcasting. Until now, senders of fanout messages had to know of all recipients via the matchmaker. This patch allows recipients to discover message publishers via the matchmaker. This is useful in patterns when there are few publishers, but many subscribers. The raw message passed to ZeroMQ is changed so that zmq_subscribe can filter by topic. This patch may be partially reverted in Grizzly, after the arrival of the ServiceGroupAPI which will significantly improve the matchmaking capabilities. Change-Id: Ifc132519c6e380cb0a3643680764a38b841add0e
Diffstat (limited to 'openstack/common/rpc/matchmaker.py')
-rw-r--r--openstack/common/rpc/matchmaker.py31
1 files changed, 30 insertions, 1 deletions
diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py
index 2791ac2..37d5521 100644
--- a/openstack/common/rpc/matchmaker.py
+++ b/openstack/common/rpc/matchmaker.py
@@ -132,6 +132,14 @@ class FanoutBinding(Binding):
return False
+class PublisherBinding(Binding):
+ """Match on publishers keys, where key starts with 'publishers.' string."""
+ def test(self, key):
+ if key.startswith('publishers~'):
+ return True
+ return False
+
+
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
@@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange):
return [(key + '.' + host, host)]
+class PublisherRingExchange(RingExchange):
+ """Fanout Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(PublisherRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ # Assume starts with "publishers~", strip it for lookup.
+ nkey = key.split('publishers~')[1:][0]
+ if not self._ring_has(nkey):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (nkey, )
+ )
+ return []
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
@@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange):
"see ringfile") % (nkey, )
)
return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
+ ['localhost'])
class LocalhostExchange(Exchange):
@@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase):
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
+ self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
@@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase):
"""
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(PublisherBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
@@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase):
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(PublisherBinding(), StubExchange())
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())