summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-01-12 12:01:11 +0000
committerGerrit Code Review <review@openstack.org>2013-01-12 12:01:11 +0000
commite8859c0f2cd332f45c632a3980b86707a17cdd8e (patch)
tree7514f358472e946a279da00475ad6d709027cfc9 /openstack/common
parent35c7db44ee57162a4f2e695b568d3e8df6305ee5 (diff)
parentab043101589a9167357ca317ca610a3f4144747c (diff)
downloadoslo-e8859c0f2cd332f45c632a3980b86707a17cdd8e.tar.gz
oslo-e8859c0f2cd332f45c632a3980b86707a17cdd8e.tar.xz
oslo-e8859c0f2cd332f45c632a3980b86707a17cdd8e.zip
Merge "zmq-receiver msg forwarding in greenthreads"
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/impl_zmq.py68
1 files changed, 48 insertions, 20 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index c147b27..d595212 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -61,6 +61,10 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
+ cfg.IntOpt('rpc_zmq_topic_backlog', default=None,
+ help='Maximum number of ingress messages to locally buffer '
+ 'per topic. Default is unlimited.'),
+
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'),
@@ -413,12 +417,6 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).__init__(conf)
self.topic_proxy = {}
- ipc_dir = CONF.rpc_zmq_ipc_dir
-
- self.topic_proxy['zmq_replies'] = \
- ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
- zmq.PUB, bind=True)
- self.sockets.append(self.topic_proxy['zmq_replies'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
@@ -444,20 +442,50 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
- outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
- sock_type, bind=True)
- self.topic_proxy[topic] = outq
- self.sockets.append(outq)
- LOG.info(_("Created topic proxy: %s"), topic)
-
- # It takes some time for a pub socket to open,
- # before we can have any faith in doing a send() to it.
- if sock_type == zmq.PUB:
- eventlet.sleep(.5)
-
- LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
- self.topic_proxy[topic].send(data)
- LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
+ def publisher(waiter):
+ LOG.info(_("Creating proxy for topic: %s"), topic)
+
+ try:
+ out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
+ (ipc_dir, topic),
+ sock_type, bind=True)
+ except RPCException:
+ waiter.send_exception(*sys.exc_info())
+ return
+
+ self.topic_proxy[topic] = eventlet.queue.LightQueue(
+ CONF.rpc_zmq_topic_backlog)
+ self.sockets.append(out_sock)
+
+ # It takes some time for a pub socket to open,
+ # before we can have any faith in doing a send() to it.
+ if sock_type == zmq.PUB:
+ eventlet.sleep(.5)
+
+ waiter.send(True)
+
+ while(True):
+ data = self.topic_proxy[topic].get()
+ out_sock.send(data)
+ LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") %
+ {'data': data})
+
+ wait_sock_creation = eventlet.event.Event()
+ eventlet.spawn(publisher, wait_sock_creation)
+
+ try:
+ wait_sock_creation.wait()
+ except RPCException:
+ LOG.error(_("Topic socket file creation failed."))
+ return
+
+ try:
+ self.topic_proxy[topic].put_nowait(data)
+ LOG.debug(_("ROUTER RELAY-OUT QUEUED %(data)s") %
+ {'data': data})
+ except eventlet.queue.Full:
+ LOG.error(_("Local per-topic backlog buffer full for topic "
+ "%(topic)s. Dropping message.") % {'topic': topic})
class ZmqReactor(ZmqBaseReactor):