diff options
author | Eric Windisch <eric@cloudscaling.com> | 2012-08-15 11:13:11 -0400 |
---|---|---|
committer | Eric Windisch <eric@cloudscaling.com> | 2012-09-20 13:59:26 -0400 |
commit | 032bf345ea593e97fcc07e23c1725ba5f4b612d0 (patch) | |
tree | 7a11850a69c3327ae2b407316530b841df150d76 /openstack/common/rpc/matchmaker.py | |
parent | 60de2a453f77ee6fad1f7b619b31ac9caa60d7f6 (diff) | |
download | oslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.tar.gz oslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.tar.xz oslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.zip |
fanout subscriptions in impl_zmq
This necessary to support Quantum's scalable agents
in the Folsom release.
Supports matchmaking and subscribing to publishers,
in addition to sender-only fanout broadcasting.
Until now, senders of fanout messages had to know
of all recipients via the matchmaker. This patch
allows recipients to discover message publishers
via the matchmaker. This is useful in patterns
when there are few publishers, but many subscribers.
The raw message passed to ZeroMQ is changed
so that zmq_subscribe can filter by topic.
This patch may be partially reverted in Grizzly,
after the arrival of the ServiceGroupAPI which
will significantly improve the matchmaking
capabilities.
Change-Id: Ifc132519c6e380cb0a3643680764a38b841add0e
Diffstat (limited to 'openstack/common/rpc/matchmaker.py')
-rw-r--r-- | openstack/common/rpc/matchmaker.py | 31 |
1 files changed, 30 insertions, 1 deletions
diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py index 2791ac2..37d5521 100644 --- a/openstack/common/rpc/matchmaker.py +++ b/openstack/common/rpc/matchmaker.py @@ -132,6 +132,14 @@ class FanoutBinding(Binding): return False +class PublisherBinding(Binding): + """Match on publishers keys, where key starts with 'publishers.' string.""" + def test(self, key): + if key.startswith('publishers~'): + return True + return False + + class StubExchange(Exchange): """Exchange that does nothing.""" def run(self, key): @@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange): return [(key + '.' + host, host)] +class PublisherRingExchange(RingExchange): + """Fanout Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(PublisherRingExchange, self).__init__(ring) + + def run(self, key): + # Assume starts with "publishers~", strip it for lookup. + nkey = key.split('publishers~')[1:][0] + if not self._ring_has(nkey): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (nkey, ) + ) + return [] + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + + class FanoutRingExchange(RingExchange): """Fanout Exchange based on a hashmap.""" def __init__(self, ring=None): @@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange): "see ringfile") % (nkey, ) ) return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + return map(lambda x: (key + '.' + x, x), self.ring[nkey] + + ['localhost']) class LocalhostExchange(Exchange): @@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase): """ def __init__(self, ring=None): super(MatchMakerRing, self).__init__() + self.add_binding(PublisherBinding(), PublisherRingExchange(ring)) self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) @@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase): """ def __init__(self): super(MatchMakerLocalhost, self).__init__() + self.add_binding(PublisherBinding(), LocalhostExchange()) self.add_binding(FanoutBinding(), LocalhostExchange()) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), LocalhostExchange()) @@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase): def __init__(self): super(MatchMakerLocalhost, self).__init__() + self.add_binding(PublisherBinding(), StubExchange()) self.add_binding(FanoutBinding(), StubExchange()) self.add_binding(DirectBinding(), StubExchange()) self.add_binding(TopicBinding(), StubExchange()) |