diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-01-12 12:01:11 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-01-12 12:01:11 +0000 |
commit | e8859c0f2cd332f45c632a3980b86707a17cdd8e (patch) | |
tree | 7514f358472e946a279da00475ad6d709027cfc9 /openstack/common | |
parent | 35c7db44ee57162a4f2e695b568d3e8df6305ee5 (diff) | |
parent | ab043101589a9167357ca317ca610a3f4144747c (diff) | |
download | oslo-e8859c0f2cd332f45c632a3980b86707a17cdd8e.tar.gz oslo-e8859c0f2cd332f45c632a3980b86707a17cdd8e.tar.xz oslo-e8859c0f2cd332f45c632a3980b86707a17cdd8e.zip |
Merge "zmq-receiver msg forwarding in greenthreads"
Diffstat (limited to 'openstack/common')
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 68 |
1 files changed, 48 insertions, 20 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index c147b27..d595212 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -61,6 +61,10 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), + cfg.IntOpt('rpc_zmq_topic_backlog', default=None, + help='Maximum number of ingress messages to locally buffer ' + 'per topic. Default is unlimited.'), + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', help='Directory for holding IPC sockets'), @@ -413,12 +417,6 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).__init__(conf) self.topic_proxy = {} - ipc_dir = CONF.rpc_zmq_ipc_dir - - self.topic_proxy['zmq_replies'] = \ - ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), - zmq.PUB, bind=True) - self.sockets.append(self.topic_proxy['zmq_replies']) def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir @@ -444,20 +442,50 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUSH if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) - - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) - - LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) - self.topic_proxy[topic].send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) + def publisher(waiter): + LOG.info(_("Creating proxy for topic: %s"), topic) + + try: + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % + (ipc_dir, topic), + sock_type, bind=True) + except RPCException: + waiter.send_exception(*sys.exc_info()) + return + + self.topic_proxy[topic] = eventlet.queue.LightQueue( + CONF.rpc_zmq_topic_backlog) + self.sockets.append(out_sock) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) + + waiter.send(True) + + while(True): + data = self.topic_proxy[topic].get() + out_sock.send(data) + LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % + {'data': data}) + + wait_sock_creation = eventlet.event.Event() + eventlet.spawn(publisher, wait_sock_creation) + + try: + wait_sock_creation.wait() + except RPCException: + LOG.error(_("Topic socket file creation failed.")) + return + + try: + self.topic_proxy[topic].put_nowait(data) + LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % + {'data': data}) + except eventlet.queue.Full: + LOG.error(_("Local per-topic backlog buffer full for topic " + "%(topic)s. Dropping message.") % {'topic': topic}) class ZmqReactor(ZmqBaseReactor): |