diff options
Diffstat (limited to 'openstack/common/rpc/impl_qpid.py')
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 21 |
1 files changed, 13 insertions, 8 deletions
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 3bddaad..b6691b4 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -22,16 +22,18 @@ import uuid import eventlet import greenlet -import qpid.messaging -import qpid.messaging.exceptions from openstack.common import cfg from openstack.common.gettextutils import _ +from openstack.common import importutils from openstack.common import jsonutils from openstack.common import log as logging from openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import common as rpc_common +qpid_messaging = importutils.try_import("qpid.messaging") +qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") + LOG = logging.getLogger(__name__) qpid_opts = [ @@ -275,6 +277,9 @@ class Connection(object): pool = None def __init__(self, conf, server_params=None): + if not qpid_messaging: + raise ImportError("Failed to import qpid.messaging") + self.session = None self.consumers = {} self.consumer_thread = None @@ -303,7 +308,7 @@ class Connection(object): def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(broker) + self.connection = qpid_messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -328,7 +333,7 @@ class Connection(object): if self.connection.opened(): try: self.connection.close() - except qpid.messaging.exceptions.ConnectionError: + except qpid_exceptions.ConnectionError: pass attempt = 0 @@ -340,7 +345,7 @@ class Connection(object): try: self.connection_create(broker) self.connection.open() - except qpid.messaging.exceptions.ConnectionError, e: + except qpid_exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) msg = _("Unable to connect to AMQP server: %(e)s. " "Sleeping %(delay)s seconds") % msg_dict @@ -367,8 +372,8 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except (qpid.messaging.exceptions.Empty, - qpid.messaging.exceptions.ConnectionError), e: + except (qpid_exceptions.Empty, + qpid_exceptions.ConnectionError), e: if error_callback: error_callback(e) self.reconnect() @@ -408,7 +413,7 @@ class Connection(object): """Return an iterator that will consume from all queues/consumers""" def _error_callback(exc): - if isinstance(exc, qpid.messaging.exceptions.Empty): + if isinstance(exc, qpid_exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % str(exc)) raise rpc_common.Timeout() |