summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2013-01-15 20:40:15 +0000
committerGerrit Code Review <review@openstack.org>2013-01-15 20:40:15 +0000
commitf1bf204b0bffbb407bf38c8545929b49a3055958 (patch)
treef861d5e8a2e3149e5f01fe6bfff981a8b1c6c349 /openstack
parent5ed83aa0797033847fd2f757ce78b3931ce4fcea (diff)
parent2f93ad27a25e10df47a4181938e3a183fc899954 (diff)
downloadoslo-f1bf204b0bffbb407bf38c8545929b49a3055958.tar.gz
oslo-f1bf204b0bffbb407bf38c8545929b49a3055958.tar.xz
oslo-f1bf204b0bffbb407bf38c8545929b49a3055958.zip
Merge "Allow rpc drivers to always be imported"
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/impl_qpid.py21
-rw-r--r--openstack/common/rpc/impl_zmq.py9
2 files changed, 20 insertions, 10 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index 3bddaad..b6691b4 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -22,16 +22,18 @@ import uuid
import eventlet
import greenlet
-import qpid.messaging
-import qpid.messaging.exceptions
from openstack.common import cfg
from openstack.common.gettextutils import _
+from openstack.common import importutils
from openstack.common import jsonutils
from openstack.common import log as logging
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
+qpid_messaging = importutils.try_import("qpid.messaging")
+qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")
+
LOG = logging.getLogger(__name__)
qpid_opts = [
@@ -275,6 +277,9 @@ class Connection(object):
pool = None
def __init__(self, conf, server_params=None):
+ if not qpid_messaging:
+ raise ImportError("Failed to import qpid.messaging")
+
self.session = None
self.consumers = {}
self.consumer_thread = None
@@ -303,7 +308,7 @@ class Connection(object):
def connection_create(self, broker):
# Create the connection - this does not open the connection
- self.connection = qpid.messaging.Connection(broker)
+ self.connection = qpid_messaging.Connection(broker)
# Check if flags are set and if so set them for the connection
# before we call open
@@ -328,7 +333,7 @@ class Connection(object):
if self.connection.opened():
try:
self.connection.close()
- except qpid.messaging.exceptions.ConnectionError:
+ except qpid_exceptions.ConnectionError:
pass
attempt = 0
@@ -340,7 +345,7 @@ class Connection(object):
try:
self.connection_create(broker)
self.connection.open()
- except qpid.messaging.exceptions.ConnectionError, e:
+ except qpid_exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
@@ -367,8 +372,8 @@ class Connection(object):
while True:
try:
return method(*args, **kwargs)
- except (qpid.messaging.exceptions.Empty,
- qpid.messaging.exceptions.ConnectionError), e:
+ except (qpid_exceptions.Empty,
+ qpid_exceptions.ConnectionError), e:
if error_callback:
error_callback(e)
self.reconnect()
@@ -408,7 +413,7 @@ class Connection(object):
"""Return an iterator that will consume from all queues/consumers"""
def _error_callback(exc):
- if isinstance(exc, qpid.messaging.exceptions.Empty):
+ if isinstance(exc, qpid_exceptions.Empty):
LOG.exception(_('Timed out waiting for RPC response: %s') %
str(exc))
raise rpc_common.Timeout()
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index b3c83ef..d12fd86 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -23,7 +23,6 @@ import types
import uuid
import eventlet
-from eventlet.green import zmq
import greenlet
from openstack.common import cfg
@@ -33,6 +32,7 @@ from openstack.common import jsonutils
from openstack.common import processutils as utils
from openstack.common.rpc import common as rpc_common
+zmq = importutils.try_import('eventlet.green.zmq')
# for convenience, are not modified.
pformat = pprint.pformat
@@ -212,7 +212,9 @@ class ZmqSocket(object):
class ZmqClient(object):
"""Client for ZMQ sockets."""
- def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
+ def __init__(self, addr, socket_type=None, bind=False):
+ if socket_type is None:
+ socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
@@ -766,6 +768,9 @@ def cleanup():
def _get_ctxt():
+ if not zmq:
+ raise ImportError("Failed to import eventlet.green.zmq")
+
global ZMQ_CTX
if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts)