summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/rpc/impl_zmq.py67
-rw-r--r--openstack/common/rpc/matchmaker.py31
2 files changed, 78 insertions, 20 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index aab664c..5c9d190 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -58,6 +58,9 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port'),
+ cfg.IntOpt('rpc_zmq_port_pub', default=9502,
+ help='ZeroMQ fanout publisher port'),
+
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
@@ -206,7 +209,7 @@ class ZmqClient(object):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data):
- self.outq.send([str(msg_id), str(topic), str('cast'),
+ self.outq.send([str(topic), str(msg_id), str('cast'),
_serialize(data)])
def close(self):
@@ -299,6 +302,9 @@ class ConsumerBase(object):
else:
return [result]
+ def consume(self, sock):
+ raise NotImplementedError()
+
def process(self, style, target, proxy, ctx, data):
# Method starting with - are
# processed internally. (non-valid method name)
@@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor):
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
+ self.topic_proxy['fanout~'] = \
+ ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
+ self.sockets.append(self.topic_proxy['fanout~'])
+
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- msg_id, topic, style, in_msg = data
+ topic, msg_id, style, in_msg = data
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor):
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
+
+ # This doesn't change what is in the message,
+ # it only specifies that these messages go to
+ # the generic fanout topic.
+ topic = 'fanout~'
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = _deserialize(in_msg)
@@ -434,17 +450,12 @@ class ZmqProxy(ZmqBaseReactor):
else:
sock_type = zmq.PUSH
- if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
-
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
+ if not topic in self.topic_proxy:
+ outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+ sock_type, bind=True)
+ self.topic_proxy[topic] = outq
+ self.sockets.append(outq)
+ LOG.info(_("Created topic proxy: %s"), topic)
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
@@ -471,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
- msg_id, topic, style, in_msg = data
+ topic, msg_id, style, in_msg = data
ctx, request = _deserialize(in_msg)
ctx = RpcContext.unmarshal(ctx)
@@ -488,6 +499,11 @@ class Connection(rpc_common.Connection):
def __init__(self, conf):
self.reactor = ZmqReactor(conf)
+ def _consume_fanout(self, reactor, topic, proxy, bind=False):
+ for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
+ inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
+ reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
+
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
@@ -495,22 +511,35 @@ class Connection(rpc_common.Connection):
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
- # Subscription scenarios
+ # Consume direct-push fanout messages (relay to local consumers)
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
+ # If we're not in here, we can't receive direct fanout messages
+ if CONF.rpc_zmq_host in matchmaker.queues(topic):
+ # Consume from all remote publishers.
+ self._consume_fanout(self.reactor, topic, proxy)
+ else:
+ LOG.warn("This service cannot receive direct PUSH fanout "
+ "messages without being known by the matchmaker.")
+ return
+
+ # Configure consumer for direct pushes.
+ subscribe = (topic, fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
+
+ inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
else:
sock_type = zmq.PULL
subscribe = None
- # Receive messages from (local) proxy
- inaddr = "ipc://%s/zmq_topic_%s" % \
- (CONF.rpc_zmq_ipc_dir, topic)
+ # Receive messages from (local) proxy
+ inaddr = "ipc://%s/zmq_topic_%s" % \
+ (CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB])
+ # Consume messages from local rpc-zmq-receiver daemon.
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py
index 2791ac2..37d5521 100644
--- a/openstack/common/rpc/matchmaker.py
+++ b/openstack/common/rpc/matchmaker.py
@@ -132,6 +132,14 @@ class FanoutBinding(Binding):
return False
+class PublisherBinding(Binding):
+ """Match on publishers keys, where key starts with 'publishers.' string."""
+ def test(self, key):
+ if key.startswith('publishers~'):
+ return True
+ return False
+
+
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
@@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange):
return [(key + '.' + host, host)]
+class PublisherRingExchange(RingExchange):
+ """Fanout Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(PublisherRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ # Assume starts with "publishers~", strip it for lookup.
+ nkey = key.split('publishers~')[1:][0]
+ if not self._ring_has(nkey):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (nkey, )
+ )
+ return []
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
@@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange):
"see ringfile") % (nkey, )
)
return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
+ ['localhost'])
class LocalhostExchange(Exchange):
@@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase):
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
+ self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
@@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase):
"""
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(PublisherBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
@@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase):
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(PublisherBinding(), StubExchange())
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())