summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xbin/oslo-zmq-receiver21
-rw-r--r--openstack/common/rpc/impl_zmq.py33
-rw-r--r--tests/unit/rpc/test_zmq.py29
3 files changed, 35 insertions, 48 deletions
diff --git a/bin/oslo-zmq-receiver b/bin/oslo-zmq-receiver
index 4ef24b6..00f4a85 100755
--- a/bin/oslo-zmq-receiver
+++ b/bin/oslo-zmq-receiver
@@ -47,28 +47,7 @@ CONF(sys.argv[1:], project='oslo')
def main():
logging.setup("oslo")
- ipc_dir = CONF.rpc_zmq_ipc_dir
-
- # Create the necessary directories/files for this service.
- if not os.path.isdir(ipc_dir):
- try:
- utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
- utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
- ipc_dir, run_as_root=True)
- utils.execute('chmod', '750', ipc_dir, run_as_root=True)
- except exception.ProcessExecutionError:
- logging.error(_("Could not create IPC socket directory."))
- return
-
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
- consume_in = "tcp://%s:%s" % \
- (CONF.rpc_zmq_bind_address,
- CONF.rpc_zmq_port)
- consumption_proxy = impl_zmq.InternalContext(None)
-
- reactor.register(consumption_proxy,
- consume_in, zmq.PULL, out_bind=True)
-
reactor.consume_in_thread()
reactor.wait()
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index d595212..4df0aba 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -15,6 +15,7 @@
# under the License.
import pprint
+import os
import socket
import string
import sys
@@ -29,6 +30,7 @@ from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
from openstack.common import jsonutils
+from openstack.common import processutils as utils
from openstack.common.rpc import common as rpc_common
@@ -487,6 +489,37 @@ class ZmqProxy(ZmqBaseReactor):
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
+ def consume_in_thread(self):
+ """Runs the ZmqProxy service"""
+ ipc_dir = CONF.rpc_zmq_ipc_dir
+ consume_in = "tcp://%s:%s" % \
+ (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port)
+ consumption_proxy = InternalContext(None)
+
+ if not os.path.isdir(ipc_dir):
+ try:
+ utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+ utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+ ipc_dir, run_as_root=True)
+ utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+ except utils.ProcessExecutionError:
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
+ raise
+
+ try:
+ self.register(consumption_proxy,
+ consume_in,
+ zmq.PULL,
+ out_bind=True)
+ except zmq.ZMQError:
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
+ raise
+
+ super(ZmqProxy, self).consume_in_thread()
+
class ZmqReactor(ZmqBaseReactor):
"""
diff --git a/tests/unit/rpc/test_zmq.py b/tests/unit/rpc/test_zmq.py
index d43c384..d120bb2 100644
--- a/tests/unit/rpc/test_zmq.py
+++ b/tests/unit/rpc/test_zmq.py
@@ -75,7 +75,8 @@ class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
self.config(rpc_zmq_ipc_dir=internal_ipc_dir)
LOG.info(_("Running internal zmq receiver."))
- self.setup_receiver(internal_ipc_dir)
+ reactor = impl_zmq.ZmqProxy(FLAGS)
+ reactor.consume_in_thread()
else:
LOG.warning(_("Detected zmq-receiver socket."))
LOG.warning(_("Assuming nova-rpc-zmq-receiver is running."))
@@ -84,32 +85,6 @@ class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
super(_RpcZmqBaseTestCase, self).setUp(
topic=topic, topic_nested=topic_nested)
- def setup_receiver(self, ipc_dir):
- # Only launch the receiver if it isn't running independently.
- # This is checked again, with the (possibly) new ipc_dir.
- if not os.path.isdir(ipc_dir):
- try:
- os.mkdir(ipc_dir)
- except OSError:
- LOG.error(_("Could not create IPC directory %s") % (ipc_dir, ))
- raise
- try:
- self.reactor = impl_zmq.ZmqProxy(FLAGS)
- consume_in = "tcp://%s:%s" % \
- (FLAGS.rpc_zmq_bind_address,
- FLAGS.rpc_zmq_port)
- consumption_proxy = impl_zmq.InternalContext(None)
-
- self.reactor.register(consumption_proxy,
- consume_in,
- zmq.PULL,
- out_bind=True)
- self.reactor.consume_in_thread()
- except zmq.ZMQError:
- LOG.error(_("Could not create ZeroMQ receiver daemon. "
- "Socket may already be in use."))
- raise
-
@testutils.skip_if(not impl_zmq, "ZeroMQ library required")
def tearDown(self):
if self.reactor: