summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-01-15 07:53:38 +0000
committerGerrit Code Review <review@openstack.org>2013-01-15 07:53:38 +0000
commit859cc3df24751e4fb0acda8643d4feee10c3b39a (patch)
treeb02b49b4871911d32162cce4e6216458a47a0cf2 /openstack
parent3f93dc8a7fa24b81811e34402b1d9f2c30781265 (diff)
parent8a187ecc010d90ba000632d97999624ee8dee130 (diff)
downloadoslo-859cc3df24751e4fb0acda8643d4feee10c3b39a.tar.gz
oslo-859cc3df24751e4fb0acda8643d4feee10c3b39a.tar.xz
oslo-859cc3df24751e4fb0acda8643d4feee10c3b39a.zip
Merge "reusable ZmqProxy service initialization"
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/impl_zmq.py33
1 files changed, 33 insertions, 0 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index b6195b0..d2af079 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -15,6 +15,7 @@
# under the License.
import pprint
+import os
import socket
import string
import sys
@@ -29,6 +30,7 @@ from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
from openstack.common import jsonutils
+from openstack.common import processutils as utils
from openstack.common.rpc import common as rpc_common
@@ -487,6 +489,37 @@ class ZmqProxy(ZmqBaseReactor):
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
+ def consume_in_thread(self):
+ """Runs the ZmqProxy service"""
+ ipc_dir = CONF.rpc_zmq_ipc_dir
+ consume_in = "tcp://%s:%s" % \
+ (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port)
+ consumption_proxy = InternalContext(None)
+
+ if not os.path.isdir(ipc_dir):
+ try:
+ utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+ utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+ ipc_dir, run_as_root=True)
+ utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+ except utils.ProcessExecutionError:
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
+ raise
+
+ try:
+ self.register(consumption_proxy,
+ consume_in,
+ zmq.PULL,
+ out_bind=True)
+ except zmq.ZMQError:
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
+ raise
+
+ super(ZmqProxy, self).consume_in_thread()
+
class ZmqReactor(ZmqBaseReactor):
"""