diff options
author | Eric Windisch <eric@cloudscaling.com> | 2013-01-31 23:14:32 -0500 |
---|---|---|
committer | Eric Windisch <eric@cloudscaling.com> | 2013-02-19 21:16:03 -0500 |
commit | 6930432887f3551f88d08815fd04808fd15a07cc (patch) | |
tree | cbefd1f2dfa04aa15cd63ceeacb4bd8d6aadc8ed /openstack/common/rpc/impl_zmq.py | |
parent | a1cc88f63228ab42d1cf7958c0989c7d7d4b1ef2 (diff) | |
download | oslo-6930432887f3551f88d08815fd04808fd15a07cc.tar.gz oslo-6930432887f3551f88d08815fd04808fd15a07cc.tar.xz oslo-6930432887f3551f88d08815fd04808fd15a07cc.zip |
Fix IPC direct topic routing.
Direct messages were being stripped of the host
value when performing IPC forwarding. This caused
direct topics to be round-robined to all
services running on the system consuming from
the same base topic name.
i.e. if 'scheduler.host1' and 'scheduler.host2'
were running on the SAME machine, messages to
'scheduler.host1' may have been routed to
'scheduler.host2'.
Now, mulitple processing specifying different
rpc_zmq_host parameters will consume on
separate direct topics and will not
round-robin to other processes.
Adds a zmq-specific test to ensure that messages to
directed topics are not consumed by other
consumers of direct topics sharing a bare
topic on the same host.
Fixes bug 1123715
Change-Id: I939c24397e58492fc16561666aed3ca891325e9c
Diffstat (limited to 'openstack/common/rpc/impl_zmq.py')
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index f011955..cf3fdc8 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -439,11 +439,13 @@ class ZmqProxy(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() topic = data[1] - topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - if topic.startswith('fanout~') or topic.startswith('zmq_replies'): + if topic.startswith('fanout~'): + sock_type = zmq.PUB + topic = topic.split('.', 1)[0] + elif topic.startswith('zmq_replies'): sock_type = zmq.PUB else: sock_type = zmq.PUSH @@ -588,23 +590,23 @@ class Connection(rpc_common.Connection): """Manages connections and threads.""" def __init__(self, conf): + self.topics = [] self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): - # Only consume on the base topic name. - topic = topic.split('.', 1)[0] - - LOG.info(_("Create Consumer for topic (%(topic)s)") % - {'topic': topic}) - # Subscription scenarios if fanout: - subscribe = ('', fanout)[type(fanout) == str] sock_type = zmq.SUB - topic = 'fanout~' + topic + subscribe = ('', fanout)[type(fanout) == str] + topic = 'fanout~' + topic.split('.', 1)[0] else: sock_type = zmq.PULL subscribe = None + topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) + + if topic in self.topics: + LOG.info(_("Skipping topic registration. Already registered.")) + return # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ @@ -615,9 +617,11 @@ class Connection(rpc_common.Connection): self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) + self.topics.append(topic) def close(self): self.reactor.close() + self.topics = [] def wait(self): self.reactor.wait() @@ -675,7 +679,9 @@ def _call(addr, context, topic, msg, timeout=None, with Timeout(timeout, exception=rpc_common.Timeout): try: msg_waiter = ZmqSocket( - "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir, + "ipc://%s/zmq_topic_zmq_replies.%s" % + (CONF.rpc_zmq_ipc_dir, + CONF.rpc_zmq_host), zmq.SUB, subscribe=msg_id, bind=False ) |