diff options
| author | Eric Windisch <eric@cloudscaling.com> | 2012-09-12 15:21:10 -0400 |
|---|---|---|
| committer | Eric Windisch <eric@cloudscaling.com> | 2012-09-20 15:40:19 -0400 |
| commit | d5095d7effe79c1dcf19a67ff03e00a454662830 (patch) | |
| tree | 5139d358ec47fe6050a9d01ca2e127433074a239 /openstack | |
| parent | 032bf345ea593e97fcc07e23c1725ba5f4b612d0 (diff) | |
| download | oslo-d5095d7effe79c1dcf19a67ff03e00a454662830.tar.gz oslo-d5095d7effe79c1dcf19a67ff03e00a454662830.tar.xz oslo-d5095d7effe79c1dcf19a67ff03e00a454662830.zip | |
Support declare_topic_consumer in impl_zmq
Required to support scalable agents in Quantum.
declare_topic_consumer is implementation-specific, but it (or similar) is
needed to consume notifications.
Change-Id: Ifdeff8ae0ed90106a6bfaae0116db9d91d6b6faf
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 29 |
1 files changed, 29 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 5c9d190..5e373ba 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -462,6 +462,20 @@ class ZmqProxy(ZmqBaseReactor): LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) +class CallbackReactor(ZmqBaseReactor): + """ + A consumer class passing messages to a callback + """ + + def __init__(self, conf, callback): + self._cb = callback + super(CallbackReactor, self).__init__(conf) + + def consume(self, sock): + data = sock.recv() + self._cb(data[3]) + + class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -504,6 +518,21 @@ class Connection(rpc_common.Connection): inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port) reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind) + def declare_topic_consumer(self, topic, callback=None, + queue_name=None): + """declare_topic_consumer is a private method, but + it is being used by Quantum (Folsom). + This has been added compatibility. + """ + # Only consume on the base topic name. + topic = topic.split('.', 1)[0] + + if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )): + return + + reactor = CallbackReactor(CONF, callback) + self._consume_fanout(reactor, topic, None, bind=False) + def create_consumer(self, topic, proxy, fanout=False): # Only consume on the base topic name. topic = topic.split('.', 1)[0] |
