From 8a187ecc010d90ba000632d97999624ee8dee130 Mon Sep 17 00:00:00 2001 From: Eric Windisch Date: Thu, 10 Jan 2013 22:56:57 -0500 Subject: reusable ZmqProxy service initialization Move near-identical proxy setup code from oslo-zmq-receiver and test_zmq into ZmqProxy's consume_in_thread method Provides code reuse and simplifies the tests and receiver binary. Change-Id: I1324eacfa2a456599d5fd462b6476ddf659e95c4 --- openstack/common/rpc/impl_zmq.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) (limited to 'openstack') diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index d595212..4df0aba 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -15,6 +15,7 @@ # under the License. import pprint +import os import socket import string import sys @@ -29,6 +30,7 @@ from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils from openstack.common import jsonutils +from openstack.common import processutils as utils from openstack.common.rpc import common as rpc_common @@ -487,6 +489,37 @@ class ZmqProxy(ZmqBaseReactor): LOG.error(_("Local per-topic backlog buffer full for topic " "%(topic)s. Dropping message.") % {'topic': topic}) + def consume_in_thread(self): + """Runs the ZmqProxy service""" + ipc_dir = CONF.rpc_zmq_ipc_dir + consume_in = "tcp://%s:%s" % \ + (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port) + consumption_proxy = InternalContext(None) + + if not os.path.isdir(ipc_dir): + try: + utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) + utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), + ipc_dir, run_as_root=True) + utils.execute('chmod', '750', ipc_dir, run_as_root=True) + except utils.ProcessExecutionError: + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) + raise + + try: + self.register(consumption_proxy, + consume_in, + zmq.PULL, + out_bind=True) + except zmq.ZMQError: + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) + raise + + super(ZmqProxy, self).consume_in_thread() + class ZmqReactor(ZmqBaseReactor): """ -- cgit