diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-02-20 04:03:02 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-02-20 04:03:02 +0000 |
| commit | 15377750465b6eb261d2354988b9c90f1f3c1d29 (patch) | |
| tree | 5b1f75d6e4b83e33f591ed64f50eb628bfdacbd5 /openstack | |
| parent | c4b6fca586f5c7426cdcefab694edc8214df20e5 (diff) | |
| parent | 6930432887f3551f88d08815fd04808fd15a07cc (diff) | |
| download | oslo-15377750465b6eb261d2354988b9c90f1f3c1d29.tar.gz oslo-15377750465b6eb261d2354988b9c90f1f3c1d29.tar.xz oslo-15377750465b6eb261d2354988b9c90f1f3c1d29.zip | |
Merge "Fix IPC direct topic routing."
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index f011955..cf3fdc8 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -439,11 +439,13 @@ class ZmqProxy(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() topic = data[1] - topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - if topic.startswith('fanout~') or topic.startswith('zmq_replies'): + if topic.startswith('fanout~'): + sock_type = zmq.PUB + topic = topic.split('.', 1)[0] + elif topic.startswith('zmq_replies'): sock_type = zmq.PUB else: sock_type = zmq.PUSH @@ -588,23 +590,23 @@ class Connection(rpc_common.Connection): """Manages connections and threads.""" def __init__(self, conf): + self.topics = [] self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): - # Only consume on the base topic name. - topic = topic.split('.', 1)[0] - - LOG.info(_("Create Consumer for topic (%(topic)s)") % - {'topic': topic}) - # Subscription scenarios if fanout: - subscribe = ('', fanout)[type(fanout) == str] sock_type = zmq.SUB - topic = 'fanout~' + topic + subscribe = ('', fanout)[type(fanout) == str] + topic = 'fanout~' + topic.split('.', 1)[0] else: sock_type = zmq.PULL subscribe = None + topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) + + if topic in self.topics: + LOG.info(_("Skipping topic registration. Already registered.")) + return # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ @@ -615,9 +617,11 @@ class Connection(rpc_common.Connection): self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) + self.topics.append(topic) def close(self): self.reactor.close() + self.topics = [] def wait(self): self.reactor.wait() @@ -675,7 +679,9 @@ def _call(addr, context, topic, msg, timeout=None, with Timeout(timeout, exception=rpc_common.Timeout): try: msg_waiter = ZmqSocket( - "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir, + "ipc://%s/zmq_topic_zmq_replies.%s" % + (CONF.rpc_zmq_ipc_dir, + CONF.rpc_zmq_host), zmq.SUB, subscribe=msg_id, bind=False ) |
