diff options
Diffstat (limited to 'openstack')
-rw-r--r-- | openstack/common/importutils.py | 8 | ||||
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 42 | ||||
-rw-r--r-- | openstack/common/service.py | 4 |
3 files changed, 49 insertions, 5 deletions
diff --git a/openstack/common/importutils.py b/openstack/common/importutils.py index 2a28b45..9dec764 100644 --- a/openstack/common/importutils.py +++ b/openstack/common/importutils.py @@ -57,3 +57,11 @@ def import_module(import_str): """Import a module.""" __import__(import_str) return sys.modules[import_str] + + +def try_import(import_str, default=None): + """Try to import a module and if it fails return default.""" + try: + return import_module(import_str) + except ImportError: + return default diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index d595212..d2af079 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -15,6 +15,7 @@ # under the License. import pprint +import os import socket import string import sys @@ -29,6 +30,7 @@ from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils from openstack.common import jsonutils +from openstack.common import processutils as utils from openstack.common.rpc import common as rpc_common @@ -487,6 +489,37 @@ class ZmqProxy(ZmqBaseReactor): LOG.error(_("Local per-topic backlog buffer full for topic " "%(topic)s. Dropping message.") % {'topic': topic}) + def consume_in_thread(self): + """Runs the ZmqProxy service""" + ipc_dir = CONF.rpc_zmq_ipc_dir + consume_in = "tcp://%s:%s" % \ + (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port) + consumption_proxy = InternalContext(None) + + if not os.path.isdir(ipc_dir): + try: + utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) + utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), + ipc_dir, run_as_root=True) + utils.execute('chmod', '750', ipc_dir, run_as_root=True) + except utils.ProcessExecutionError: + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) + raise + + try: + self.register(consumption_proxy, + consume_in, + zmq.PULL, + out_bind=True) + except zmq.ZMQError: + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) + raise + + super(ZmqProxy, self).consume_in_thread() + class ZmqReactor(ZmqBaseReactor): """ @@ -579,7 +612,8 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, conn.close() -def _call(addr, context, msg_id, topic, msg, timeout=None): +def _call(addr, context, msg_id, topic, msg, timeout=None, + serialize=True, force_envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -614,7 +648,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): ) LOG.debug(_("Sending cast")) - _cast(addr, context, msg_id, topic, payload) + _cast(addr, context, msg_id, topic, payload, + serialize=serialize, force_envelope=force_envelope) LOG.debug(_("Cast sent; Waiting reply")) # Blocks until receives reply @@ -670,7 +705,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, _topic, _topic, msg, timeout, serialize, force_envelope) return - return method(_addr, context, _topic, _topic, msg, timeout) + return method(_addr, context, _topic, _topic, msg, timeout, + serialize, force_envelope) def create_connection(conf, new=True): diff --git a/openstack/common/service.py b/openstack/common/service.py index 740378f..6b180aa 100644 --- a/openstack/common/service.py +++ b/openstack/common/service.py @@ -27,17 +27,17 @@ import sys import time import eventlet -import extras import logging as std_logging from openstack.common import cfg from openstack.common import eventlet_backdoor from openstack.common.gettextutils import _ +from openstack.common import importutils from openstack.common import log as logging from openstack.common import threadgroup -rpc = extras.try_import('openstack.common.rpc') +rpc = importutils.try_import('openstack.common.rpc') CONF = cfg.CONF LOG = logging.getLogger(__name__) |