summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2013-01-10 22:56:57 -0500
committerGerrit Code Review <review@openstack.org>2013-01-13 05:39:55 +0000
commit8a187ecc010d90ba000632d97999624ee8dee130 (patch)
tree4300330226eee1eff7ef7a99c652d03499659d09 /openstack
parentd74668fbedc29aa4b7868fd3fb8a28d326f43d52 (diff)
downloadoslo-8a187ecc010d90ba000632d97999624ee8dee130.tar.gz
oslo-8a187ecc010d90ba000632d97999624ee8dee130.tar.xz
oslo-8a187ecc010d90ba000632d97999624ee8dee130.zip
reusable ZmqProxy service initialization
Move near-identical proxy setup code from oslo-zmq-receiver and test_zmq into ZmqProxy's consume_in_thread method Provides code reuse and simplifies the tests and receiver binary. Change-Id: I1324eacfa2a456599d5fd462b6476ddf659e95c4
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/impl_zmq.py33
1 files changed, 33 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index d595212..4df0aba 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -15,6 +15,7 @@
# under the License.
import pprint
+import os
import socket
import string
import sys
@@ -29,6 +30,7 @@ from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
from openstack.common import jsonutils
+from openstack.common import processutils as utils
from openstack.common.rpc import common as rpc_common
@@ -487,6 +489,37 @@ class ZmqProxy(ZmqBaseReactor):
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
+ def consume_in_thread(self):
+ """Runs the ZmqProxy service"""
+ ipc_dir = CONF.rpc_zmq_ipc_dir
+ consume_in = "tcp://%s:%s" % \
+ (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port)
+ consumption_proxy = InternalContext(None)
+
+ if not os.path.isdir(ipc_dir):
+ try:
+ utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+ utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+ ipc_dir, run_as_root=True)
+ utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+ except utils.ProcessExecutionError:
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
+ raise
+
+ try:
+ self.register(consumption_proxy,
+ consume_in,
+ zmq.PULL,
+ out_bind=True)
+ except zmq.ZMQError:
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
+ raise
+
+ super(ZmqProxy, self).consume_in_thread()
+
class ZmqReactor(ZmqBaseReactor):
"""