summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2012-08-15 11:13:11 -0400
committerEric Windisch <eric@cloudscaling.com>2012-09-20 13:59:26 -0400
commit032bf345ea593e97fcc07e23c1725ba5f4b612d0 (patch)
tree7a11850a69c3327ae2b407316530b841df150d76 /openstack
parent60de2a453f77ee6fad1f7b619b31ac9caa60d7f6 (diff)
downloadoslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.tar.gz
oslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.tar.xz
oslo-032bf345ea593e97fcc07e23c1725ba5f4b612d0.zip
fanout subscriptions in impl_zmq
This necessary to support Quantum's scalable agents in the Folsom release. Supports matchmaking and subscribing to publishers, in addition to sender-only fanout broadcasting. Until now, senders of fanout messages had to know of all recipients via the matchmaker. This patch allows recipients to discover message publishers via the matchmaker. This is useful in patterns when there are few publishers, but many subscribers. The raw message passed to ZeroMQ is changed so that zmq_subscribe can filter by topic. This patch may be partially reverted in Grizzly, after the arrival of the ServiceGroupAPI which will significantly improve the matchmaking capabilities. Change-Id: Ifc132519c6e380cb0a3643680764a38b841add0e
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/impl_zmq.py67
-rw-r--r--openstack/common/rpc/matchmaker.py31
2 files changed, 78 insertions, 20 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index aab664c..5c9d190 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -58,6 +58,9 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port'),
+ cfg.IntOpt('rpc_zmq_port_pub', default=9502,
+ help='ZeroMQ fanout publisher port'),
+
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
@@ -206,7 +209,7 @@ class ZmqClient(object):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data):
- self.outq.send([str(msg_id), str(topic), str('cast'),
+ self.outq.send([str(topic), str(msg_id), str('cast'),
_serialize(data)])
def close(self):
@@ -299,6 +302,9 @@ class ConsumerBase(object):
else:
return [result]
+ def consume(self, sock):
+ raise NotImplementedError()
+
def process(self, style, target, proxy, ctx, data):
# Method starting with - are
# processed internally. (non-valid method name)
@@ -411,12 +417,17 @@ class ZmqProxy(ZmqBaseReactor):
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
+ self.topic_proxy['fanout~'] = \
+ ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
+ self.sockets.append(self.topic_proxy['fanout~'])
+
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- msg_id, topic, style, in_msg = data
+ topic, msg_id, style, in_msg = data
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@@ -424,6 +435,11 @@ class ZmqProxy(ZmqBaseReactor):
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
+
+ # This doesn't change what is in the message,
+ # it only specifies that these messages go to
+ # the generic fanout topic.
+ topic = 'fanout~'
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = _deserialize(in_msg)
@@ -434,17 +450,12 @@ class ZmqProxy(ZmqBaseReactor):
else:
sock_type = zmq.PUSH
- if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
-
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
+ if not topic in self.topic_proxy:
+ outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+ sock_type, bind=True)
+ self.topic_proxy[topic] = outq
+ self.sockets.append(outq)
+ LOG.info(_("Created topic proxy: %s"), topic)
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
@@ -471,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
- msg_id, topic, style, in_msg = data
+ topic, msg_id, style, in_msg = data
ctx, request = _deserialize(in_msg)
ctx = RpcContext.unmarshal(ctx)
@@ -488,6 +499,11 @@ class Connection(rpc_common.Connection):
def __init__(self, conf):
self.reactor = ZmqReactor(conf)
+ def _consume_fanout(self, reactor, topic, proxy, bind=False):
+ for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
+ inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
+ reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
+
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
@@ -495,22 +511,35 @@ class Connection(rpc_common.Connection):
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
- # Subscription scenarios
+ # Consume direct-push fanout messages (relay to local consumers)
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
+ # If we're not in here, we can't receive direct fanout messages
+ if CONF.rpc_zmq_host in matchmaker.queues(topic):
+ # Consume from all remote publishers.
+ self._consume_fanout(self.reactor, topic, proxy)
+ else:
+ LOG.warn("This service cannot receive direct PUSH fanout "
+ "messages without being known by the matchmaker.")
+ return
+
+ # Configure consumer for direct pushes.
+ subscribe = (topic, fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
+
+ inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
else:
sock_type = zmq.PULL
subscribe = None
- # Receive messages from (local) proxy
- inaddr = "ipc://%s/zmq_topic_%s" % \
- (CONF.rpc_zmq_ipc_dir, topic)
+ # Receive messages from (local) proxy
+ inaddr = "ipc://%s/zmq_topic_%s" % \
+ (CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB])
+ # Consume messages from local rpc-zmq-receiver daemon.
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py
index 2791ac2..37d5521 100644
--- a/openstack/common/rpc/matchmaker.py
+++ b/openstack/common/rpc/matchmaker.py
@@ -132,6 +132,14 @@ class FanoutBinding(Binding):
return False
+class PublisherBinding(Binding):
+ """Match on publishers keys, where key starts with 'publishers.' string."""
+ def test(self, key):
+ if key.startswith('publishers~'):
+ return True
+ return False
+
+
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
@@ -182,6 +190,23 @@ class RoundRobinRingExchange(RingExchange):
return [(key + '.' + host, host)]
+class PublisherRingExchange(RingExchange):
+ """Fanout Exchange based on a hashmap."""
+ def __init__(self, ring=None):
+ super(PublisherRingExchange, self).__init__(ring)
+
+ def run(self, key):
+ # Assume starts with "publishers~", strip it for lookup.
+ nkey = key.split('publishers~')[1:][0]
+ if not self._ring_has(nkey):
+ LOG.warn(
+ _("No key defining hosts for topic '%s', "
+ "see ringfile") % (nkey, )
+ )
+ return []
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
@@ -196,7 +221,8 @@ class FanoutRingExchange(RingExchange):
"see ringfile") % (nkey, )
)
return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
+ ['localhost'])
class LocalhostExchange(Exchange):
@@ -227,6 +253,7 @@ class MatchMakerRing(MatchMakerBase):
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
+ self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
@@ -239,6 +266,7 @@ class MatchMakerLocalhost(MatchMakerBase):
"""
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(PublisherBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
@@ -253,6 +281,7 @@ class MatchMakerStub(MatchMakerBase):
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
+ self.add_binding(PublisherBinding(), StubExchange())
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())