diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-01-15 07:53:38 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-01-15 07:53:38 +0000 |
commit | 859cc3df24751e4fb0acda8643d4feee10c3b39a (patch) | |
tree | b02b49b4871911d32162cce4e6216458a47a0cf2 /openstack/common | |
parent | 3f93dc8a7fa24b81811e34402b1d9f2c30781265 (diff) | |
parent | 8a187ecc010d90ba000632d97999624ee8dee130 (diff) | |
download | oslo-859cc3df24751e4fb0acda8643d4feee10c3b39a.tar.gz oslo-859cc3df24751e4fb0acda8643d4feee10c3b39a.tar.xz oslo-859cc3df24751e4fb0acda8643d4feee10c3b39a.zip |
Merge "reusable ZmqProxy service initialization"
Diffstat (limited to 'openstack/common')
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 33 |
1 files changed, 33 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index b6195b0..d2af079 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -15,6 +15,7 @@ # under the License. import pprint +import os import socket import string import sys @@ -29,6 +30,7 @@ from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils from openstack.common import jsonutils +from openstack.common import processutils as utils from openstack.common.rpc import common as rpc_common @@ -487,6 +489,37 @@ class ZmqProxy(ZmqBaseReactor): LOG.error(_("Local per-topic backlog buffer full for topic " "%(topic)s. Dropping message.") % {'topic': topic}) + def consume_in_thread(self): + """Runs the ZmqProxy service""" + ipc_dir = CONF.rpc_zmq_ipc_dir + consume_in = "tcp://%s:%s" % \ + (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port) + consumption_proxy = InternalContext(None) + + if not os.path.isdir(ipc_dir): + try: + utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) + utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), + ipc_dir, run_as_root=True) + utils.execute('chmod', '750', ipc_dir, run_as_root=True) + except utils.ProcessExecutionError: + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) + raise + + try: + self.register(consumption_proxy, + consume_in, + zmq.PULL, + out_bind=True) + except zmq.ZMQError: + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) + raise + + super(ZmqProxy, self).consume_in_thread() + class ZmqReactor(ZmqBaseReactor): """ |