diff options
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 28 | ||||
-rw-r--r-- | tests/unit/rpc/common.py | 9 | ||||
-rw-r--r-- | tests/unit/rpc/test_zmq.py | 9 |
3 files changed, 32 insertions, 14 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index f011955..cf3fdc8 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -439,11 +439,13 @@ class ZmqProxy(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() topic = data[1] - topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - if topic.startswith('fanout~') or topic.startswith('zmq_replies'): + if topic.startswith('fanout~'): + sock_type = zmq.PUB + topic = topic.split('.', 1)[0] + elif topic.startswith('zmq_replies'): sock_type = zmq.PUB else: sock_type = zmq.PUSH @@ -588,23 +590,23 @@ class Connection(rpc_common.Connection): """Manages connections and threads.""" def __init__(self, conf): + self.topics = [] self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): - # Only consume on the base topic name. - topic = topic.split('.', 1)[0] - - LOG.info(_("Create Consumer for topic (%(topic)s)") % - {'topic': topic}) - # Subscription scenarios if fanout: - subscribe = ('', fanout)[type(fanout) == str] sock_type = zmq.SUB - topic = 'fanout~' + topic + subscribe = ('', fanout)[type(fanout) == str] + topic = 'fanout~' + topic.split('.', 1)[0] else: sock_type = zmq.PULL subscribe = None + topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) + + if topic in self.topics: + LOG.info(_("Skipping topic registration. Already registered.")) + return # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ @@ -615,9 +617,11 @@ class Connection(rpc_common.Connection): self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) + self.topics.append(topic) def close(self): self.reactor.close() + self.topics = [] def wait(self): self.reactor.wait() @@ -675,7 +679,9 @@ def _call(addr, context, topic, msg, timeout=None, with Timeout(timeout, exception=rpc_common.Timeout): try: msg_waiter = ZmqSocket( - "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir, + "ipc://%s/zmq_topic_zmq_replies.%s" % + (CONF.rpc_zmq_ipc_dir, + CONF.rpc_zmq_host), zmq.SUB, subscribe=msg_id, bind=False ) diff --git a/tests/unit/rpc/common.py b/tests/unit/rpc/common.py index 6359142..5a2af19 100644 --- a/tests/unit/rpc/common.py +++ b/tests/unit/rpc/common.py @@ -140,7 +140,8 @@ class BaseRpcTestCase(test_utils.BaseTestCase): "args": {"value": value}}) self.assertEqual(self.context.to_dict(), result) - def _test_cast(self, method, value, args=None, fanout=False): + def _test_cast(self, method, value, args=None, fanout=False, + topic_nested=None): """Test casts by pushing items through a channeled queue. @param: method a reference to a method returning a value @@ -148,6 +149,8 @@ class BaseRpcTestCase(test_utils.BaseTestCase): @param: args optional dictionary arguments to method @param: fanout boolean for use of rpc fanout method """ + topic_nested = topic_nested or self.topic_nested + # Not a true global, but capitalized so # it is clear it is leaking scope into Nested() QUEUE = eventlet.queue.Queue() @@ -163,7 +166,7 @@ class BaseRpcTestCase(test_utils.BaseTestCase): QUEUE.put(method(*args, **kwargs)) nested = Nested() - conn = self._create_consumer(nested, self.topic_nested, fanout) + conn = self._create_consumer(nested, topic_nested, fanout) rpc_method = (self.rpc.cast, self.rpc.fanout_cast)[fanout] @@ -173,7 +176,7 @@ class BaseRpcTestCase(test_utils.BaseTestCase): msg['args'].update(args) rpc_method(FLAGS, self.context, - self.topic_nested, + topic_nested, msg) try: diff --git a/tests/unit/rpc/test_zmq.py b/tests/unit/rpc/test_zmq.py index 7a1e501..c197c35 100644 --- a/tests/unit/rpc/test_zmq.py +++ b/tests/unit/rpc/test_zmq.py @@ -127,3 +127,12 @@ class RpcZmqDirectTopicTestCase(_RpcZmqBaseTestCase): super(RpcZmqDirectTopicTestCase, self).setUp( topic='test.127.0.0.1', topic_nested='nested.127.0.0.1') + + def test_cast_wrong_direct_topic_failure(self): + try: + self._test_cast(common.TestReceiver.echo, 42, {"value": 42}, + fanout=False, topic_nested='nested.localhost') + except Exception: + return + self.expectFailure("Message should not have been consumed.", + self.assertTrue, True) |