From 2f93ad27a25e10df47a4181938e3a183fc899954 Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Mon, 14 Jan 2013 08:37:11 +0000 Subject: Allow rpc drivers to always be imported The qpid and zmq drivers current can't be import unless their respective libraries are installed and neither are currently listed in test-requires. This prevents us from being able to use nosetests --with-doctests because the import failure is seen as a test failure. Fix this issue by making the import always succeed and deferring failure until the module's APIs are actually used. This will also help Nova's sample config file generation since it will allow the generate-sample tool discover config options for these drivers even when the messaging libraries aren't installed. Change-Id: I30087fd33eef41d50d1a374cb5694dc15c43d1da --- openstack/common/rpc/impl_qpid.py | 21 +++++++++++++-------- openstack/common/rpc/impl_zmq.py | 9 +++++++-- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index 3bddaad..b6691b4 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -22,16 +22,18 @@ import uuid import eventlet import greenlet -import qpid.messaging -import qpid.messaging.exceptions from openstack.common import cfg from openstack.common.gettextutils import _ +from openstack.common import importutils from openstack.common import jsonutils from openstack.common import log as logging from openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import common as rpc_common +qpid_messaging = importutils.try_import("qpid.messaging") +qpid_exceptions = importutils.try_import("qpid.messaging.exceptions") + LOG = logging.getLogger(__name__) qpid_opts = [ @@ -275,6 +277,9 @@ class Connection(object): pool = None def __init__(self, conf, server_params=None): + if not qpid_messaging: + raise ImportError("Failed to import qpid.messaging") + self.session = None self.consumers = {} self.consumer_thread = None @@ -303,7 +308,7 @@ class Connection(object): def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(broker) + self.connection = qpid_messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -328,7 +333,7 @@ class Connection(object): if self.connection.opened(): try: self.connection.close() - except qpid.messaging.exceptions.ConnectionError: + except qpid_exceptions.ConnectionError: pass attempt = 0 @@ -340,7 +345,7 @@ class Connection(object): try: self.connection_create(broker) self.connection.open() - except qpid.messaging.exceptions.ConnectionError, e: + except qpid_exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) msg = _("Unable to connect to AMQP server: %(e)s. " "Sleeping %(delay)s seconds") % msg_dict @@ -367,8 +372,8 @@ class Connection(object): while True: try: return method(*args, **kwargs) - except (qpid.messaging.exceptions.Empty, - qpid.messaging.exceptions.ConnectionError), e: + except (qpid_exceptions.Empty, + qpid_exceptions.ConnectionError), e: if error_callback: error_callback(e) self.reconnect() @@ -408,7 +413,7 @@ class Connection(object): """Return an iterator that will consume from all queues/consumers""" def _error_callback(exc): - if isinstance(exc, qpid.messaging.exceptions.Empty): + if isinstance(exc, qpid_exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % str(exc)) raise rpc_common.Timeout() diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 12bfffa..b865de2 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -23,7 +23,6 @@ import types import uuid import eventlet -from eventlet.green import zmq import greenlet from openstack.common import cfg @@ -33,6 +32,7 @@ from openstack.common import jsonutils from openstack.common import processutils as utils from openstack.common.rpc import common as rpc_common +zmq = importutils.try_import('eventlet.green.zmq') # for convenience, are not modified. pformat = pprint.pformat @@ -208,7 +208,9 @@ class ZmqSocket(object): class ZmqClient(object): """Client for ZMQ sockets.""" - def __init__(self, addr, socket_type=zmq.PUSH, bind=False): + def __init__(self, addr, socket_type=None, bind=False): + if socket_type is None: + socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): @@ -762,6 +764,9 @@ def cleanup(): def _get_ctxt(): + if not zmq: + raise ImportError("Failed to import eventlet.green.zmq") + global ZMQ_CTX if not ZMQ_CTX: ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts) -- cgit