summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/rpc/impl_zmq.py28
-rw-r--r--tests/unit/rpc/common.py9
-rw-r--r--tests/unit/rpc/test_zmq.py9
3 files changed, 32 insertions, 14 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index f011955..cf3fdc8 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -439,11 +439,13 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
topic = data[1]
- topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
+ topic = topic.split('.', 1)[0]
+ elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
else:
sock_type = zmq.PUSH
@@ -588,23 +590,23 @@ class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
+ self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
- # Only consume on the base topic name.
- topic = topic.split('.', 1)[0]
-
- LOG.info(_("Create Consumer for topic (%(topic)s)") %
- {'topic': topic})
-
# Subscription scenarios
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
- topic = 'fanout~' + topic
+ subscribe = ('', fanout)[type(fanout) == str]
+ topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
+ topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+
+ if topic in self.topics:
+ LOG.info(_("Skipping topic registration. Already registered."))
+ return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
@@ -615,9 +617,11 @@ class Connection(rpc_common.Connection):
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
+ self.topics.append(topic)
def close(self):
self.reactor.close()
+ self.topics = []
def wait(self):
self.reactor.wait()
@@ -675,7 +679,9 @@ def _call(addr, context, topic, msg, timeout=None,
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
+ "ipc://%s/zmq_topic_zmq_replies.%s" %
+ (CONF.rpc_zmq_ipc_dir,
+ CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
diff --git a/tests/unit/rpc/common.py b/tests/unit/rpc/common.py
index 6359142..5a2af19 100644
--- a/tests/unit/rpc/common.py
+++ b/tests/unit/rpc/common.py
@@ -140,7 +140,8 @@ class BaseRpcTestCase(test_utils.BaseTestCase):
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
- def _test_cast(self, method, value, args=None, fanout=False):
+ def _test_cast(self, method, value, args=None, fanout=False,
+ topic_nested=None):
"""Test casts by pushing items through a channeled queue.
@param: method a reference to a method returning a value
@@ -148,6 +149,8 @@ class BaseRpcTestCase(test_utils.BaseTestCase):
@param: args optional dictionary arguments to method
@param: fanout boolean for use of rpc fanout method
"""
+ topic_nested = topic_nested or self.topic_nested
+
# Not a true global, but capitalized so
# it is clear it is leaking scope into Nested()
QUEUE = eventlet.queue.Queue()
@@ -163,7 +166,7 @@ class BaseRpcTestCase(test_utils.BaseTestCase):
QUEUE.put(method(*args, **kwargs))
nested = Nested()
- conn = self._create_consumer(nested, self.topic_nested, fanout)
+ conn = self._create_consumer(nested, topic_nested, fanout)
rpc_method = (self.rpc.cast, self.rpc.fanout_cast)[fanout]
@@ -173,7 +176,7 @@ class BaseRpcTestCase(test_utils.BaseTestCase):
msg['args'].update(args)
rpc_method(FLAGS, self.context,
- self.topic_nested,
+ topic_nested,
msg)
try:
diff --git a/tests/unit/rpc/test_zmq.py b/tests/unit/rpc/test_zmq.py
index 7a1e501..c197c35 100644
--- a/tests/unit/rpc/test_zmq.py
+++ b/tests/unit/rpc/test_zmq.py
@@ -127,3 +127,12 @@ class RpcZmqDirectTopicTestCase(_RpcZmqBaseTestCase):
super(RpcZmqDirectTopicTestCase, self).setUp(
topic='test.127.0.0.1',
topic_nested='nested.127.0.0.1')
+
+ def test_cast_wrong_direct_topic_failure(self):
+ try:
+ self._test_cast(common.TestReceiver.echo, 42, {"value": 42},
+ fanout=False, topic_nested='nested.localhost')
+ except Exception:
+ return
+ self.expectFailure("Message should not have been consumed.",
+ self.assertTrue, True)