summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2012-09-12 15:21:10 -0400
committerEric Windisch <eric@cloudscaling.com>2012-09-20 15:40:19 -0400
commitd5095d7effe79c1dcf19a67ff03e00a454662830 (patch)
tree5139d358ec47fe6050a9d01ca2e127433074a239
parent032bf345ea593e97fcc07e23c1725ba5f4b612d0 (diff)
downloadoslo-d5095d7effe79c1dcf19a67ff03e00a454662830.tar.gz
oslo-d5095d7effe79c1dcf19a67ff03e00a454662830.tar.xz
oslo-d5095d7effe79c1dcf19a67ff03e00a454662830.zip
Support declare_topic_consumer in impl_zmq
Required to support scalable agents in Quantum. declare_topic_consumer is implementation-specific, but it (or similar) is needed to consume notifications. Change-Id: Ifdeff8ae0ed90106a6bfaae0116db9d91d6b6faf
-rw-r--r--openstack/common/rpc/impl_zmq.py29
1 files changed, 29 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 5c9d190..5e373ba 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -462,6 +462,20 @@ class ZmqProxy(ZmqBaseReactor):
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+class CallbackReactor(ZmqBaseReactor):
+ """
+ A consumer class passing messages to a callback
+ """
+
+ def __init__(self, conf, callback):
+ self._cb = callback
+ super(CallbackReactor, self).__init__(conf)
+
+ def consume(self, sock):
+ data = sock.recv()
+ self._cb(data[3])
+
+
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@@ -504,6 +518,21 @@ class Connection(rpc_common.Connection):
inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
+ def declare_topic_consumer(self, topic, callback=None,
+ queue_name=None):
+ """declare_topic_consumer is a private method, but
+ it is being used by Quantum (Folsom).
+ This has been added compatibility.
+ """
+ # Only consume on the base topic name.
+ topic = topic.split('.', 1)[0]
+
+ if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )):
+ return
+
+ reactor = CallbackReactor(CONF, callback)
+ self._consume_fanout(reactor, topic, None, bind=False)
+
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]