From d5095d7effe79c1dcf19a67ff03e00a454662830 Mon Sep 17 00:00:00 2001 From: Eric Windisch Date: Wed, 12 Sep 2012 15:21:10 -0400 Subject: Support declare_topic_consumer in impl_zmq Required to support scalable agents in Quantum. declare_topic_consumer is implementation-specific, but it (or similar) is needed to consume notifications. Change-Id: Ifdeff8ae0ed90106a6bfaae0116db9d91d6b6faf --- openstack/common/rpc/impl_zmq.py | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) (limited to 'openstack/common') diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 5c9d190..5e373ba 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -462,6 +462,20 @@ class ZmqProxy(ZmqBaseReactor): LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) +class CallbackReactor(ZmqBaseReactor): + """ + A consumer class passing messages to a callback + """ + + def __init__(self, conf, callback): + self._cb = callback + super(CallbackReactor, self).__init__(conf) + + def consume(self, sock): + data = sock.recv() + self._cb(data[3]) + + class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -504,6 +518,21 @@ class Connection(rpc_common.Connection): inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port) reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind) + def declare_topic_consumer(self, topic, callback=None, + queue_name=None): + """declare_topic_consumer is a private method, but + it is being used by Quantum (Folsom). + This has been added compatibility. + """ + # Only consume on the base topic name. + topic = topic.split('.', 1)[0] + + if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )): + return + + reactor = CallbackReactor(CONF, callback) + self._consume_fanout(reactor, topic, None, bind=False) + def create_consumer(self, topic, proxy, fanout=False): # Only consume on the base topic name. topic = topic.split('.', 1)[0] -- cgit