diff options
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 67 | ||||
-rw-r--r-- | openstack/common/rpc/matchmaker.py | 31 |
2 files changed, 78 insertions, 20 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index aab664c..5c9d190 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -58,6 +58,9 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_port', default=9501, help='ZeroMQ receiver listening port'), + cfg.IntOpt('rpc_zmq_port_pub', default=9502, + help='ZeroMQ fanout publisher port'), + cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), @@ -206,7 +209,7 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data): - self.outq.send([str(msg_id), str(topic), str('cast'), + self.outq.send([str(topic), str(msg_id), str('cast'), _serialize(data)]) def close(self): @@ -299,6 +302,9 @@ class ConsumerBase(object): else: return [result] + def consume(self, sock): + raise NotImplementedError() + def process(self, style, target, proxy, ctx, data): # Method starting with - are # processed internally. (non-valid method name) @@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor): zmq.PUB, bind=True) self.sockets.append(self.topic_proxy['zmq_replies']) + self.topic_proxy['fanout~'] = \ + ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port_pub), zmq.PUB, bind=True) + self.sockets.append(self.topic_proxy['fanout~']) + def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - msg_id, topic, style, in_msg = data + topic, msg_id, style, in_msg = data topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) @@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor): # Handle zmq_replies magic if topic.startswith('fanout~'): sock_type = zmq.PUB + + # This doesn't change what is in the message, + # it only specifies that these messages go to + # the generic fanout topic. + topic = 'fanout~' elif topic.startswith('zmq_replies'): sock_type = zmq.PUB inside = _deserialize(in_msg) @@ -434,17 +450,12 @@ class ZmqProxy(ZmqBaseReactor): else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) - - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) self.topic_proxy[topic].send(data) @@ -471,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - msg_id, topic, style, in_msg = data + topic, msg_id, style, in_msg = data ctx, request = _deserialize(in_msg) ctx = RpcContext.unmarshal(ctx) @@ -488,6 +499,11 @@ class Connection(rpc_common.Connection): def __init__(self, conf): self.reactor = ZmqReactor(conf) + def _consume_fanout(self, reactor, topic, proxy, bind=False): + for topic, host in matchmaker.queues("publishers~%s" % (topic, )): + inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port) + reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind) + def create_consumer(self, topic, proxy, fanout=False): # Only consume on the base topic name. topic = topic.split('.', 1)[0] @@ -495,22 +511,35 @@ class Connection(rpc_common.Connection): LOG.info(_("Create Consumer for topic (%(topic)s)") % {'topic': topic}) - # Subscription scenarios + # Consume direct-push fanout messages (relay to local consumers) if fanout: - subscribe = ('', fanout)[type(fanout) == str] + # If we're not in here, we can't receive direct fanout messages + if CONF.rpc_zmq_host in matchmaker.queues(topic): + # Consume from all remote publishers. + self._consume_fanout(self.reactor, topic, proxy) + else: + LOG.warn("This service cannot receive direct PUSH fanout " + "messages without being known by the matchmaker.") + return + + # Configure consumer for direct pushes. + subscribe = (topic, fanout)[type(fanout) == str] sock_type = zmq.SUB topic = 'fanout~' + topic + + inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, ) else: sock_type = zmq.PULL subscribe = None - # Receive messages from (local) proxy - inaddr = "ipc://%s/zmq_topic_%s" % \ - (CONF.rpc_zmq_ipc_dir, topic) + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (CONF.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), ['PULL', 'SUB'][sock_type == zmq.SUB]) + # Consume messages from local rpc-zmq-receiver daemon. self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py index 2791ac2..37d5521 100644 --- a/openstack/common/rpc/matchmaker.py +++ b/openstack/common/rpc/matchmaker.py @@ -132,6 +132,14 @@ class FanoutBinding(Binding): return False +class PublisherBinding(Binding): + """Match on publishers keys, where key starts with 'publishers.' string.""" + def test(self, key): + if key.startswith('publishers~'): + return True + return False + + class StubExchange(Exchange): """Exchange that does nothing.""" def run(self, key): @@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange): return [(key + '.' + host, host)] +class PublisherRingExchange(RingExchange): + """Fanout Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(PublisherRingExchange, self).__init__(ring) + + def run(self, key): + # Assume starts with "publishers~", strip it for lookup. + nkey = key.split('publishers~')[1:][0] + if not self._ring_has(nkey): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (nkey, ) + ) + return [] + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + + class FanoutRingExchange(RingExchange): """Fanout Exchange based on a hashmap.""" def __init__(self, ring=None): @@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange): "see ringfile") % (nkey, ) ) return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + return map(lambda x: (key + '.' + x, x), self.ring[nkey] + + ['localhost']) class LocalhostExchange(Exchange): @@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase): """ def __init__(self, ring=None): super(MatchMakerRing, self).__init__() + self.add_binding(PublisherBinding(), PublisherRingExchange(ring)) self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) @@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase): """ def __init__(self): super(MatchMakerLocalhost, self).__init__() + self.add_binding(PublisherBinding(), LocalhostExchange()) self.add_binding(FanoutBinding(), LocalhostExchange()) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), LocalhostExchange()) @@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase): def __init__(self): super(MatchMakerLocalhost, self).__init__() + self.add_binding(PublisherBinding(), StubExchange()) self.add_binding(FanoutBinding(), StubExchange()) self.add_binding(DirectBinding(), StubExchange()) self.add_binding(TopicBinding(), StubExchange()) |