From ab043101589a9167357ca317ca610a3f4144747c Mon Sep 17 00:00:00 2001 From: Eric Windisch Date: Wed, 9 Jan 2013 12:30:47 -0500 Subject: zmq-receiver msg forwarding in greenthreads Because PUSH sockets can block and there may not be PULL consumers, the oslo-zmq-receiver process may stall in certain senarios. Improves error handling in ZmqProxy. This addresses bug 1097856. Additionally, this bug seems to improve (but not necessarily fix) the behavior reported in bug 1065532. Change-Id: I6df6035a6676c5bcdddaec7a332ac77e621ba9f3 --- openstack/common/rpc/impl_zmq.py | 68 ++++++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 20 deletions(-) (limited to 'openstack') diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index c147b27..d595212 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -61,6 +61,10 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), + cfg.IntOpt('rpc_zmq_topic_backlog', default=None, + help='Maximum number of ingress messages to locally buffer ' + 'per topic. Default is unlimited.'), + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', help='Directory for holding IPC sockets'), @@ -413,12 +417,6 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).__init__(conf) self.topic_proxy = {} - ipc_dir = CONF.rpc_zmq_ipc_dir - - self.topic_proxy['zmq_replies'] = \ - ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), - zmq.PUB, bind=True) - self.sockets.append(self.topic_proxy['zmq_replies']) def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir @@ -444,20 +442,50 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUSH if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) - - # It takes some time for a pub socket to open, - # before we can have any faith in doing a send() to it. - if sock_type == zmq.PUB: - eventlet.sleep(.5) - - LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) - self.topic_proxy[topic].send(data) - LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) + def publisher(waiter): + LOG.info(_("Creating proxy for topic: %s"), topic) + + try: + out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" % + (ipc_dir, topic), + sock_type, bind=True) + except RPCException: + waiter.send_exception(*sys.exc_info()) + return + + self.topic_proxy[topic] = eventlet.queue.LightQueue( + CONF.rpc_zmq_topic_backlog) + self.sockets.append(out_sock) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) + + waiter.send(True) + + while(True): + data = self.topic_proxy[topic].get() + out_sock.send(data) + LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % + {'data': data}) + + wait_sock_creation = eventlet.event.Event() + eventlet.spawn(publisher, wait_sock_creation) + + try: + wait_sock_creation.wait() + except RPCException: + LOG.error(_("Topic socket file creation failed.")) + return + + try: + self.topic_proxy[topic].put_nowait(data) + LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") % + {'data': data}) + except eventlet.queue.Full: + LOG.error(_("Local per-topic backlog buffer full for topic " + "%(topic)s. Dropping message.") % {'topic': topic}) class ZmqReactor(ZmqBaseReactor): -- cgit