summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/rpc/impl_zmq.py67
-rw-r--r--openstack/common/rpc/matchmaker.py31
2 files changed, 20 insertions, 78 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 5c9d190..aab664c 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -58,9 +58,6 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port'),
- cfg.IntOpt('rpc_zmq_port_pub', default=9502,
- help='ZeroMQ fanout publisher port'),
-
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
@@ -209,7 +206,7 @@ class ZmqClient(object):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data):
- self.outq.send([str(topic), str(msg_id), str('cast'),
+ self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
def close(self):
@@ -302,9 +299,6 @@ class ConsumerBase(object):
else:
return [result]
- def consume(self, sock):
- raise NotImplementedError()
-
def process(self, style, target, proxy, ctx, data):
# Method starting with - are
# processed internally. (non-valid method name)
@@ -417,17 +411,12 @@ class ZmqProxy(ZmqBaseReactor):
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
- self.topic_proxy['fanout~'] = \
- ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
- CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
- self.sockets.append(self.topic_proxy['fanout~'])
-
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
- topic, msg_id, style, in_msg = data
+ msg_id, topic, style, in_msg = data
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@@ -435,11 +424,6 @@ class ZmqProxy(ZmqBaseReactor):
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
-
- # This doesn't change what is in the message,
- # it only specifies that these messages go to
- # the generic fanout topic.
- topic = 'fanout~'
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = _deserialize(in_msg)
@@ -450,12 +434,17 @@ class ZmqProxy(ZmqBaseReactor):
else:
sock_type = zmq.PUSH
- if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
+ if not topic in self.topic_proxy:
+ outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
+ sock_type, bind=True)
+ self.topic_proxy[topic] = outq
+ self.sockets.append(outq)
+ LOG.info(_("Created topic proxy: %s"), topic)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
@@ -482,7 +471,7 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
- topic, msg_id, style, in_msg = data
+ msg_id, topic, style, in_msg = data
ctx, request = _deserialize(in_msg)
ctx = RpcContext.unmarshal(ctx)
@@ -499,11 +488,6 @@ class Connection(rpc_common.Connection):
def __init__(self, conf):
self.reactor = ZmqReactor(conf)
- def _consume_fanout(self, reactor, topic, proxy, bind=False):
- for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
- inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
- reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
-
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
@@ -511,35 +495,22 @@ class Connection(rpc_common.Connection):
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
- # Consume direct-push fanout messages (relay to local consumers)
+ # Subscription scenarios
if fanout:
- # If we're not in here, we can't receive direct fanout messages
- if CONF.rpc_zmq_host in matchmaker.queues(topic):
- # Consume from all remote publishers.
- self._consume_fanout(self.reactor, topic, proxy)
- else:
- LOG.warn("This service cannot receive direct PUSH fanout "
- "messages without being known by the matchmaker.")
- return
-
- # Configure consumer for direct pushes.
- subscribe = (topic, fanout)[type(fanout) == str]
+ subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
-
- inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
else:
sock_type = zmq.PULL
subscribe = None
- # Receive messages from (local) proxy
- inaddr = "ipc://%s/zmq_topic_%s" % \
- (CONF.rpc_zmq_ipc_dir, topic)
+ # Receive messages from (local) proxy
+ inaddr = "ipc://%s/zmq_topic_%s" % \
+ (CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB])
- # Consume messages from local rpc-zmq-receiver daemon.
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py
index 37d5521..2791ac2 100644
--- a/openstack/common/rpc/matchmaker.py
+++ b/openstack/common/rpc/matchmaker.py
@@ -132,14 +132,6 @@ class FanoutBinding(Binding):
return False
-class PublisherBinding(Binding):
- """Match on publishers keys, where key starts with 'publishers.' string."""
- def test(self, key):
- if key.startswith('publishers~'):
- return True
- return False
-
-
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
@@ -190,23 +182,6 @@ class RoundRobinRingExchange(RingExchange):
return [(key + '.' + host, host)]
-class PublisherRingExchange(RingExchange):
- """Fanout Exchange based on a hashmap."""
- def __init__(self, ring=None):
- super(PublisherRingExchange, self).__init__(ring)
-
- def run(self, key):
- # Assume starts with "publishers~", strip it for lookup.
- nkey = key.split('publishers~')[1:][0]
- if not self._ring_has(nkey):
- LOG.warn(
- _("No key defining hosts for topic '%s', "
- "see ringfile") % (nkey, )
- )
- return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey])
-
-
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
@@ -221,8 +196,7 @@ class FanoutRingExchange(RingExchange):
"see ringfile") % (nkey, )
)
return []
- return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
- ['localhost'])
+ return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class LocalhostExchange(Exchange):
@@ -253,7 +227,6 @@ class MatchMakerRing(MatchMakerBase):
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
- self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
@@ -266,7 +239,6 @@ class MatchMakerLocalhost(MatchMakerBase):
"""
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
- self.add_binding(PublisherBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
@@ -281,7 +253,6 @@ class MatchMakerStub(MatchMakerBase):
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
- self.add_binding(PublisherBinding(), StubExchange())
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())