From 6930432887f3551f88d08815fd04808fd15a07cc Mon Sep 17 00:00:00 2001 From: Eric Windisch Date: Thu, 31 Jan 2013 23:14:32 -0500 Subject: Fix IPC direct topic routing. Direct messages were being stripped of the host value when performing IPC forwarding. This caused direct topics to be round-robined to all services running on the system consuming from the same base topic name. i.e. if 'scheduler.host1' and 'scheduler.host2' were running on the SAME machine, messages to 'scheduler.host1' may have been routed to 'scheduler.host2'. Now, mulitple processing specifying different rpc_zmq_host parameters will consume on separate direct topics and will not round-robin to other processes. Adds a zmq-specific test to ensure that messages to directed topics are not consumed by other consumers of direct topics sharing a bare topic on the same host. Fixes bug 1123715 Change-Id: I939c24397e58492fc16561666aed3ca891325e9c --- openstack/common/rpc/impl_zmq.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) (limited to 'openstack') diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index f011955..cf3fdc8 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -439,11 +439,13 @@ class ZmqProxy(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() topic = data[1] - topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - if topic.startswith('fanout~') or topic.startswith('zmq_replies'): + if topic.startswith('fanout~'): + sock_type = zmq.PUB + topic = topic.split('.', 1)[0] + elif topic.startswith('zmq_replies'): sock_type = zmq.PUB else: sock_type = zmq.PUSH @@ -588,23 +590,23 @@ class Connection(rpc_common.Connection): """Manages connections and threads.""" def __init__(self, conf): + self.topics = [] self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): - # Only consume on the base topic name. - topic = topic.split('.', 1)[0] - - LOG.info(_("Create Consumer for topic (%(topic)s)") % - {'topic': topic}) - # Subscription scenarios if fanout: - subscribe = ('', fanout)[type(fanout) == str] sock_type = zmq.SUB - topic = 'fanout~' + topic + subscribe = ('', fanout)[type(fanout) == str] + topic = 'fanout~' + topic.split('.', 1)[0] else: sock_type = zmq.PULL subscribe = None + topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) + + if topic in self.topics: + LOG.info(_("Skipping topic registration. Already registered.")) + return # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ @@ -615,9 +617,11 @@ class Connection(rpc_common.Connection): self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) + self.topics.append(topic) def close(self): self.reactor.close() + self.topics = [] def wait(self): self.reactor.wait() @@ -675,7 +679,9 @@ def _call(addr, context, topic, msg, timeout=None, with Timeout(timeout, exception=rpc_common.Timeout): try: msg_waiter = ZmqSocket( - "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir, + "ipc://%s/zmq_topic_zmq_replies.%s" % + (CONF.rpc_zmq_ipc_dir, + CONF.rpc_zmq_host), zmq.SUB, subscribe=msg_id, bind=False ) -- cgit