summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-02-20 04:03:02 +0000
committerGerrit Code Review <review@openstack.org>2013-02-20 04:03:02 +0000
commit15377750465b6eb261d2354988b9c90f1f3c1d29 (patch)
tree5b1f75d6e4b83e33f591ed64f50eb628bfdacbd5 /openstack/common
parentc4b6fca586f5c7426cdcefab694edc8214df20e5 (diff)
parent6930432887f3551f88d08815fd04808fd15a07cc (diff)
downloadoslo-15377750465b6eb261d2354988b9c90f1f3c1d29.tar.gz
oslo-15377750465b6eb261d2354988b9c90f1f3c1d29.tar.xz
oslo-15377750465b6eb261d2354988b9c90f1f3c1d29.zip
Merge "Fix IPC direct topic routing."
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/impl_zmq.py28
1 files changed, 17 insertions, 11 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index f011955..cf3fdc8 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -439,11 +439,13 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
topic = data[1]
- topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
+ topic = topic.split('.', 1)[0]
+ elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
else:
sock_type = zmq.PUSH
@@ -588,23 +590,23 @@ class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
+ self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
- # Only consume on the base topic name.
- topic = topic.split('.', 1)[0]
-
- LOG.info(_("Create Consumer for topic (%(topic)s)") %
- {'topic': topic})
-
# Subscription scenarios
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
- topic = 'fanout~' + topic
+ subscribe = ('', fanout)[type(fanout) == str]
+ topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
+ topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+
+ if topic in self.topics:
+ LOG.info(_("Skipping topic registration. Already registered."))
+ return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
@@ -615,9 +617,11 @@ class Connection(rpc_common.Connection):
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
+ self.topics.append(topic)
def close(self):
self.reactor.close()
+ self.topics = []
def wait(self):
self.reactor.wait()
@@ -675,7 +679,9 @@ def _call(addr, context, topic, msg, timeout=None,
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
+ "ipc://%s/zmq_topic_zmq_replies.%s" %
+ (CONF.rpc_zmq_ipc_dir,
+ CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)