summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/importutils.py8
-rw-r--r--openstack/common/rpc/impl_zmq.py42
-rw-r--r--openstack/common/service.py4
3 files changed, 49 insertions, 5 deletions
diff --git a/openstack/common/importutils.py b/openstack/common/importutils.py
index 2a28b45..9dec764 100644
--- a/openstack/common/importutils.py
+++ b/openstack/common/importutils.py
@@ -57,3 +57,11 @@ def import_module(import_str):
"""Import a module."""
__import__(import_str)
return sys.modules[import_str]
+
+
+def try_import(import_str, default=None):
+ """Try to import a module and if it fails return default."""
+ try:
+ return import_module(import_str)
+ except ImportError:
+ return default
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index d595212..d2af079 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -15,6 +15,7 @@
# under the License.
import pprint
+import os
import socket
import string
import sys
@@ -29,6 +30,7 @@ from openstack.common import cfg
from openstack.common.gettextutils import _
from openstack.common import importutils
from openstack.common import jsonutils
+from openstack.common import processutils as utils
from openstack.common.rpc import common as rpc_common
@@ -487,6 +489,37 @@ class ZmqProxy(ZmqBaseReactor):
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
+ def consume_in_thread(self):
+ """Runs the ZmqProxy service"""
+ ipc_dir = CONF.rpc_zmq_ipc_dir
+ consume_in = "tcp://%s:%s" % \
+ (CONF.rpc_zmq_bind_address,
+ CONF.rpc_zmq_port)
+ consumption_proxy = InternalContext(None)
+
+ if not os.path.isdir(ipc_dir):
+ try:
+ utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
+ utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
+ ipc_dir, run_as_root=True)
+ utils.execute('chmod', '750', ipc_dir, run_as_root=True)
+ except utils.ProcessExecutionError:
+ LOG.error(_("Could not create IPC directory %s") %
+ (ipc_dir, ))
+ raise
+
+ try:
+ self.register(consumption_proxy,
+ consume_in,
+ zmq.PULL,
+ out_bind=True)
+ except zmq.ZMQError:
+ LOG.error(_("Could not create ZeroMQ receiver daemon. "
+ "Socket may already be in use."))
+ raise
+
+ super(ZmqProxy, self).consume_in_thread()
+
class ZmqReactor(ZmqBaseReactor):
"""
@@ -579,7 +612,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None):
+def _call(addr, context, msg_id, topic, msg, timeout=None,
+ serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -614,7 +648,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload)
+ _cast(addr, context, msg_id, topic, payload,
+ serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
@@ -670,7 +705,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
_topic, _topic, msg, timeout, serialize,
force_envelope)
return
- return method(_addr, context, _topic, _topic, msg, timeout)
+ return method(_addr, context, _topic, _topic, msg, timeout,
+ serialize, force_envelope)
def create_connection(conf, new=True):
diff --git a/openstack/common/service.py b/openstack/common/service.py
index 740378f..6b180aa 100644
--- a/openstack/common/service.py
+++ b/openstack/common/service.py
@@ -27,17 +27,17 @@ import sys
import time
import eventlet
-import extras
import logging as std_logging
from openstack.common import cfg
from openstack.common import eventlet_backdoor
from openstack.common.gettextutils import _
+from openstack.common import importutils
from openstack.common import log as logging
from openstack.common import threadgroup
-rpc = extras.try_import('openstack.common.rpc')
+rpc = importutils.try_import('openstack.common.rpc')
CONF = cfg.CONF
LOG = logging.getLogger(__name__)