diff options
author | Jenkins <jenkins@review.openstack.org> | 2013-01-15 20:40:15 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2013-01-15 20:40:15 +0000 |
commit | f1bf204b0bffbb407bf38c8545929b49a3055958 (patch) | |
tree | f861d5e8a2e3149e5f01fe6bfff981a8b1c6c349 | |
parent | 5ed83aa0797033847fd2f757ce78b3931ce4fcea (diff) | |
parent | 2f93ad27a25e10df47a4181938e3a183fc899954 (diff) | |
download | oslo-f1bf204b0bffbb407bf38c8545929b49a3055958.tar.gz oslo-f1bf204b0bffbb407bf38c8545929b49a3055958.tar.xz oslo-f1bf204b0bffbb407bf38c8545929b49a3055958.zip |
Merge "Allow rpc drivers to always be imported"
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 21 | ||||
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 9 |
2 files changed, 20 insertions, 10 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 3bddaad..b6691b4 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -22,16 +22,18 @@ import uuid import eventlet import greenlet -import qpid.messaging -import qpid.messaging.exceptions from openstack.common import cfg from openstack.common.gettextutils import _ +from openstack.common import importutils from openstack.common import jsonutils from openstack.common import log as logging from openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import common as rpc_common +qpid_messaging = importutils.try_import("qpid.messaging") +qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") + LOG = logging.getLogger(__name__) qpid_opts = [ @@ -275,6 +277,9 @@ class Connection(object): pool = None def __init__(self, conf, server_params=None): + if not qpid_messaging: + raise ImportError("Failed to import qpid.messaging") + self.session = None self.consumers = {} self.consumer_thread = None @@ -303,7 +308,7 @@ class Connection(object): def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(broker) + self.connection = qpid_messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -328,7 +333,7 @@ class Connection(object): if self.connection.opened(): try: self.connection.close() - except qpid.messaging.exceptions.ConnectionError: + except qpid_exceptions.ConnectionError: pass attempt = 0 @@ -340,7 +345,7 @@ class Connection(object): try: self.connection_create(broker) self.connection.open() - except qpid.messaging.exceptions.ConnectionError, e: + except qpid_exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) msg = _("Unable to connect to AMQP server: %(e)s. " "Sleeping %(delay)s seconds") % msg_dict @@ -367,8 +372,8 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except (qpid.messaging.exceptions.Empty, - qpid.messaging.exceptions.ConnectionError), e: + except (qpid_exceptions.Empty, + qpid_exceptions.ConnectionError), e: if error_callback: error_callback(e) self.reconnect() @@ -408,7 +413,7 @@ class Connection(object): """Return an iterator that will consume from all queues/consumers""" def _error_callback(exc): - if isinstance(exc, qpid.messaging.exceptions.Empty): + if isinstance(exc, qpid_exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % str(exc)) raise rpc_common.Timeout() diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index b3c83ef..d12fd86 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -23,7 +23,6 @@ import types import uuid import eventlet -from eventlet.green import zmq import greenlet from openstack.common import cfg @@ -33,6 +32,7 @@ from openstack.common import jsonutils from openstack.common import processutils as utils from openstack.common.rpc import common as rpc_common +zmq = importutils.try_import('eventlet.green.zmq') # for convenience, are not modified. pformat = pprint.pformat @@ -212,7 +212,9 @@ class ZmqSocket(object): class ZmqClient(object): """Client for ZMQ sockets.""" - def __init__(self, addr, socket_type=zmq.PUSH, bind=False): + def __init__(self, addr, socket_type=None, bind=False): + if socket_type is None: + socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): @@ -766,6 +768,9 @@ def cleanup(): def _get_ctxt(): + if not zmq: + raise ImportError("Failed to import eventlet.green.zmq") + global ZMQ_CTX if not ZMQ_CTX: ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts) |