summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/impl_zmq.py
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2013-01-31 23:14:32 -0500
committerEric Windisch <eric@cloudscaling.com>2013-02-19 21:16:03 -0500
commit6930432887f3551f88d08815fd04808fd15a07cc (patch)
treecbefd1f2dfa04aa15cd63ceeacb4bd8d6aadc8ed /openstack/common/rpc/impl_zmq.py
parenta1cc88f63228ab42d1cf7958c0989c7d7d4b1ef2 (diff)
downloadoslo-6930432887f3551f88d08815fd04808fd15a07cc.tar.gz
oslo-6930432887f3551f88d08815fd04808fd15a07cc.tar.xz
oslo-6930432887f3551f88d08815fd04808fd15a07cc.zip
Fix IPC direct topic routing.
Direct messages were being stripped of the host value when performing IPC forwarding. This caused direct topics to be round-robined to all services running on the system consuming from the same base topic name. i.e. if 'scheduler.host1' and 'scheduler.host2' were running on the SAME machine, messages to 'scheduler.host1' may have been routed to 'scheduler.host2'. Now, mulitple processing specifying different rpc_zmq_host parameters will consume on separate direct topics and will not round-robin to other processes. Adds a zmq-specific test to ensure that messages to directed topics are not consumed by other consumers of direct topics sharing a bare topic on the same host. Fixes bug 1123715 Change-Id: I939c24397e58492fc16561666aed3ca891325e9c
Diffstat (limited to 'openstack/common/rpc/impl_zmq.py')
-rw-r--r--openstack/common/rpc/impl_zmq.py28
1 files changed, 17 insertions, 11 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index f011955..cf3fdc8 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -439,11 +439,13 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
topic = data[1]
- topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
+ topic = topic.split('.', 1)[0]
+ elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
else:
sock_type = zmq.PUSH
@@ -588,23 +590,23 @@ class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
+ self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
- # Only consume on the base topic name.
- topic = topic.split('.', 1)[0]
-
- LOG.info(_("Create Consumer for topic (%(topic)s)") %
- {'topic': topic})
-
# Subscription scenarios
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
- topic = 'fanout~' + topic
+ subscribe = ('', fanout)[type(fanout) == str]
+ topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
+ topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+
+ if topic in self.topics:
+ LOG.info(_("Skipping topic registration. Already registered."))
+ return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
@@ -615,9 +617,11 @@ class Connection(rpc_common.Connection):
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
+ self.topics.append(topic)
def close(self):
self.reactor.close()
+ self.topics = []
def wait(self):
self.reactor.wait()
@@ -675,7 +679,9 @@ def _call(addr, context, topic, msg, timeout=None,
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
+ "ipc://%s/zmq_topic_zmq_replies.%s" %
+ (CONF.rpc_zmq_ipc_dir,
+ CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)