diff options
author | Eric Windisch <eric@cloudscaling.com> | 2013-01-10 22:56:57 -0500 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-01-13 05:39:55 +0000 |
commit | 8a187ecc010d90ba000632d97999624ee8dee130 (patch) | |
tree | 4300330226eee1eff7ef7a99c652d03499659d09 /openstack/common/rpc/impl_zmq.py | |
parent | d74668fbedc29aa4b7868fd3fb8a28d326f43d52 (diff) | |
download | oslo-8a187ecc010d90ba000632d97999624ee8dee130.tar.gz oslo-8a187ecc010d90ba000632d97999624ee8dee130.tar.xz oslo-8a187ecc010d90ba000632d97999624ee8dee130.zip |
reusable ZmqProxy service initialization
Move near-identical proxy setup code
from oslo-zmq-receiver and test_zmq
into ZmqProxy's consume_in_thread method
Provides code reuse and simplifies the
tests and receiver binary.
Change-Id: I1324eacfa2a456599d5fd462b6476ddf659e95c4
Diffstat (limited to 'openstack/common/rpc/impl_zmq.py')
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 33 |
1 files changed, 33 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index d595212..4df0aba 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -15,6 +15,7 @@ # under the License. import pprint +import os import socket import string import sys @@ -29,6 +30,7 @@ from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils from openstack.common import jsonutils +from openstack.common import processutils as utils from openstack.common.rpc import common as rpc_common @@ -487,6 +489,37 @@ class ZmqProxy(ZmqBaseReactor): LOG.error(_("Local per-topic backlog buffer full for topic " "%(topic)s. Dropping message.") % {'topic': topic}) + def consume_in_thread(self): + """Runs the ZmqProxy service""" + ipc_dir = CONF.rpc_zmq_ipc_dir + consume_in = "tcp://%s:%s" % \ + (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port) + consumption_proxy = InternalContext(None) + + if not os.path.isdir(ipc_dir): + try: + utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) + utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), + ipc_dir, run_as_root=True) + utils.execute('chmod', '750', ipc_dir, run_as_root=True) + except utils.ProcessExecutionError: + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) + raise + + try: + self.register(consumption_proxy, + consume_in, + zmq.PULL, + out_bind=True) + except zmq.ZMQError: + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) + raise + + super(ZmqProxy, self).consume_in_thread() + class ZmqReactor(ZmqBaseReactor): """ |