diff options
author | Eric Windisch <eric@cloudscaling.com> | 2012-08-15 11:13:11 -0400 |
---|---|---|
committer | Eric Windisch <eric@cloudscaling.com> | 2012-09-20 13:59:26 -0400 |
commit | 032bf345ea593e97fcc07e23c1725ba5f4b612d0 (patch) | |
tree | 7a11850a69c3327ae2b407316530b841df150d76 /openstack | |
parent | 60de2a453f77ee6fad1f7b619b31ac9caa60d7f6 (diff) | |
download | oslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.tar.gz oslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.tar.xz oslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.zip |
fanout subscriptions in impl_zmq
This necessary to support Quantum's scalable agents
in the Folsom release.
Supports matchmaking and subscribing to publishers,
in addition to sender-only fanout broadcasting.
Until now, senders of fanout messages had to know
of all recipients via the matchmaker. This patch
allows recipients to discover message publishers
via the matchmaker. This is useful in patterns
when there are few publishers, but many subscribers.
The raw message passed to ZeroMQ is changed
so that zmq_subscribe can filter by topic.
This patch may be partially reverted in Grizzly,
after the arrival of the ServiceGroupAPI which
will significantly improve the matchmaking
capabilities.
Change-Id: Ifc132519c6e380cb0a3643680764a38b841add0e
Diffstat (limited to 'openstack')
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 67 | ||||
-rw-r--r-- | openstack/common/rpc/matchmaker.py | 31 |
2 files changed, 78 insertions, 20 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index aab664c..5c9d190 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -58,6 +58,9 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_port', default=9501, help='ZeroMQ receiver listening port'), + cfg.IntOpt('rpc_zmq_port_pub', default=9502, + help='ZeroMQ fanout publisher port'), + cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), @@ -206,7 +209,7 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data): - self.outq.send([str(msg_id), str(topic), str('cast'), + self.outq.send([str(topic), str(msg_id), str('cast'), _serialize(data)]) def close(self): @@ -299,6 +302,9 @@ class ConsumerBase(object): else: return [result] + def consume(self, sock): + raise NotImplementedError() + def process(self, style, target, proxy, ctx, data): # Method starting with - are # processed internally. (non-valid method name) @@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor): zmq.PUB, bind=True) self.sockets.append(self.topic_proxy['zmq_replies']) + self.topic_proxy['fanout~'] = \ + ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port_pub), zmq.PUB, bind=True) + self.sockets.append(self.topic_proxy['fanout~']) + def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - msg_id, topic, style, in_msg = data + topic, msg_id, style, in_msg = data topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) @@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor): # Handle zmq_replies magic if topic.startswith('fanout~'): sock_type = zmq.PUB + + # This doesn't change what is in the message, + # it only specifies that these messages go to + # the generic fanout topic. + topic = 'fanout~' elif topic.startswith('zmq_replies'): sock_type = zmq.PUB inside = _deserialize(in_msg) @@ -434,17 +450,12 @@ class ZmqProxy(ZmqBaseReactor): else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) - - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) self.topic_proxy[topic].send(data) @@ -471,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - msg_id, topic, style, in_msg = data + topic, msg_id, style, in_msg = data ctx, request = _deserialize(in_msg) ctx = RpcContext.unmarshal(ctx) @@ -488,6 +499,11 @@ class Connection(rpc_common.Connection): def __init__(self, conf): self.reactor = ZmqReactor(conf) + def _consume_fanout(self, reactor, topic, proxy, bind=False): + for topic, host in matchmaker.queues("publishers~%s" % (topic, )): + inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port) + reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind) + def create_consumer(self, topic, proxy, fanout=False): # Only consume on the base topic name. topic = topic.split('.', 1)[0] @@ -495,22 +511,35 @@ class Connection(rpc_common.Connection): LOG.info(_("Create Consumer for topic (%(topic)s)") % {'topic': topic}) - # Subscription scenarios + # Consume direct-push fanout messages (relay to local consumers) if fanout: - subscribe = ('', fanout)[type(fanout) == str] + # If we're not in here, we can't receive direct fanout messages + if CONF.rpc_zmq_host in matchmaker.queues(topic): + # Consume from all remote publishers. + self._consume_fanout(self.reactor, topic, proxy) + else: + LOG.warn("This service cannot receive direct PUSH fanout " + "messages without being known by the matchmaker.") + return + + # Configure consumer for direct pushes. + subscribe = (topic, fanout)[type(fanout) == str] sock_type = zmq.SUB topic = 'fanout~' + topic + + inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, ) else: sock_type = zmq.PULL subscribe = None - # Receive messages from (local) proxy - inaddr = "ipc://%s/zmq_topic_%s" % \ - (CONF.rpc_zmq_ipc_dir, topic) + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (CONF.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), ['PULL', 'SUB'][sock_type == zmq.SUB]) + # Consume messages from local rpc-zmq-receiver daemon. self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py index 2791ac2..37d5521 100644 --- a/openstack/common/rpc/matchmaker.py +++ b/openstack/common/rpc/matchmaker.py @@ -132,6 +132,14 @@ class FanoutBinding(Binding): return False +class PublisherBinding(Binding): + """Match on publishers keys, where key starts with 'publishers.' string.""" + def test(self, key): + if key.startswith('publishers~'): + return True + return False + + class StubExchange(Exchange): """Exchange that does nothing.""" def run(self, key): @@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange): return [(key + '.' + host, host)] +class PublisherRingExchange(RingExchange): + """Fanout Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(PublisherRingExchange, self).__init__(ring) + + def run(self, key): + # Assume starts with "publishers~", strip it for lookup. + nkey = key.split('publishers~')[1:][0] + if not self._ring_has(nkey): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (nkey, ) + ) + return [] + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + + class FanoutRingExchange(RingExchange): """Fanout Exchange based on a hashmap.""" def __init__(self, ring=None): @@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange): "see ringfile") % (nkey, ) ) return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + return map(lambda x: (key + '.' + x, x), self.ring[nkey] + + ['localhost']) class LocalhostExchange(Exchange): @@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase): """ def __init__(self, ring=None): super(MatchMakerRing, self).__init__() + self.add_binding(PublisherBinding(), PublisherRingExchange(ring)) self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) @@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase): """ def __init__(self): super(MatchMakerLocalhost, self).__init__() + self.add_binding(PublisherBinding(), LocalhostExchange()) self.add_binding(FanoutBinding(), LocalhostExchange()) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), LocalhostExchange()) @@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase): def __init__(self): super(MatchMakerLocalhost, self).__init__() + self.add_binding(PublisherBinding(), StubExchange()) self.add_binding(FanoutBinding(), StubExchange()) self.add_binding(DirectBinding(), StubExchange()) self.add_binding(TopicBinding(), StubExchange()) |