summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/amqp.py167
1 files changed, 158 insertions, 9 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 3d7ad5e..98cfd9d 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -32,13 +32,27 @@ import uuid
from eventlet import greenpool
from eventlet import pools
from eventlet import semaphore
+from eventlet import queue
+# TODO(pekowsk): Remove import cfg and below comment in Havana.
+# This import should no longer be needed when the amqp_rpc_single_reply_queue
+# option is removed.
+from oslo.config import cfg
from openstack.common import excutils
from openstack.common.gettextutils import _
from openstack.common import local
from openstack.common import log as logging
from openstack.common.rpc import common as rpc_common
+# TODO(pekowski): Remove this option in Havana.
+amqp_opts = [
+ cfg.BoolOpt('amqp_rpc_single_reply_queue',
+ default=False,
+ help='Enable a fast single reply queue if using AMQP based '
+ 'RPC like RabbitMQ or Qpid.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
LOG = logging.getLogger(__name__)
@@ -51,6 +65,7 @@ class Pool(pools.Pool):
kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
kwargs.setdefault("order_as_stack", True)
super(Pool, self).__init__(*args, **kwargs)
+ self.reply_proxy = None
# TODO(comstud): Timeout connections not used in a while
def create(self):
@@ -60,6 +75,16 @@ class Pool(pools.Pool):
def empty(self):
while self.free_items:
self.get().close()
+ # Force a new connection pool to be created.
+ # Note that this was added due to failing unit test cases. The issue
+ # is the above "while loop" gets all the cached connections from the
+ # pool and closes them, but never returns them to the pool, a pool
+ # leak. The unit tests hang waiting for an item to be returned to the
+ # pool. The unit tests get here via the teatDown() method. In the run
+ # time code, it gets here via cleanup() and only appears in service.py
+ # just before doing a sys.exit(), so cleanup() only happens once and
+ # the leakage is not a problem.
+ self.connection_cls.pool = None
_pool_create_sem = semaphore.Semaphore()
@@ -154,8 +179,45 @@ class ConnectionContext(rpc_common.Connection):
raise rpc_common.InvalidRPCConnectionReuse()
-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
- ending=False, log_failure=True):
+class ReplyProxy(ConnectionContext):
+ """ Connection class for RPC replies / callbacks """
+ def __init__(self, conf, connection_pool):
+ self._call_waiters = {}
+ self._num_call_waiters = 0
+ self._num_call_waiters_wrn_threshhold = 10
+ self._reply_q = 'reply_' + uuid.uuid4().hex
+ super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+ self.declare_direct_consumer(self._reply_q, self._process_data)
+ self.consume_in_thread()
+
+ def _process_data(self, message_data):
+ msg_id = message_data.pop('_msg_id', None)
+ waiter = self._call_waiters.get(msg_id)
+ if not waiter:
+ LOG.warn(_('no calling threads waiting for msg_id : %s'
+ ', message : %s') % (msg_id, message_data))
+ else:
+ waiter.put(message_data)
+
+ def add_call_waiter(self, waiter, msg_id):
+ self._num_call_waiters += 1
+ if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+ LOG.warn(_('Number of call waiters is greater than warning '
+ 'threshhold: %d. There could be a MulticallProxyWaiter '
+ 'leak.') % self._num_call_waiters_wrn_threshhold)
+ self._num_call_waiters_wrn_threshhold *= 2
+ self._call_waiters[msg_id] = waiter
+
+ def del_call_waiter(self, msg_id):
+ self._num_call_waiters -= 1
+ del self._call_waiters[msg_id]
+
+ def get_reply_q(self):
+ return self._reply_q
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+ failure=None, ending=False, log_failure=True):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
@@ -174,13 +236,21 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure}
if ending:
msg['ending'] = True
- conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+ # If a reply_q exists, add the msg_id to the reply and pass the
+ # reply_q to direct_send() to use it as the response queue.
+ # Otherwise use the msg_id for backward compatibilty.
+ if reply_q:
+ msg['_msg_id'] = msg_id
+ conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+ else:
+ conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext):
"""Context that supports replying to a rpc.call"""
def __init__(self, **kwargs):
self.msg_id = kwargs.pop('msg_id', None)
+ self.reply_q = kwargs.pop('reply_q', None)
self.conf = kwargs.pop('conf')
super(RpcContext, self).__init__(**kwargs)
@@ -188,13 +258,14 @@ class RpcContext(rpc_common.CommonRpcContext):
values = self.to_dict()
values['conf'] = self.conf
values['msg_id'] = self.msg_id
+ values['reply_q'] = self.reply_q
return self.__class__(**values)
def reply(self, reply=None, failure=None, ending=False,
connection_pool=None, log_failure=True):
if self.msg_id:
- msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
- ending, log_failure)
+ msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+ reply, failure, ending, log_failure)
if ending:
self.msg_id = None
@@ -210,6 +281,7 @@ def unpack_context(conf, msg):
value = msg.pop(key)
context_dict[key[9:]] = value
context_dict['msg_id'] = msg.pop('_msg_id', None)
+ context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
@@ -339,6 +411,65 @@ class ProxyCallback(_ThreadPoolWithWait):
connection_pool=self.connection_pool)
+class MulticallProxyWaiter(object):
+ def __init__(self, conf, msg_id, timeout, connection_pool):
+ self._msg_id = msg_id
+ self._timeout = timeout or conf.rpc_response_timeout
+ self._reply_proxy = connection_pool.reply_proxy
+ self._done = False
+ self._got_ending = False
+ self._conf = conf
+ self._dataqueue = queue.LightQueue()
+ # Add this caller to the reply proxy's call_waiters
+ self._reply_proxy.add_call_waiter(self, self._msg_id)
+
+ def put(self, data):
+ self._dataqueue.put(data)
+
+ def done(self):
+ if self._done:
+ return
+ self._done = True
+ # Remove this caller from reply proxy's call_waiters
+ self._reply_proxy.del_call_waiter(self._msg_id)
+
+ def _process_data(self, data):
+ result = None
+ if data['failure']:
+ failure = data['failure']
+ result = rpc_common.deserialize_remote_exception(self._conf,
+ failure)
+ elif data.get('ending', False):
+ self._got_ending = True
+ else:
+ result = data['result']
+ return result
+
+ def __iter__(self):
+ """Return a result until we get a reply with an 'ending" flag"""
+ if self._done:
+ raise StopIteration
+ while True:
+ try:
+ data = self._dataqueue.get(timeout=self._timeout)
+ result = self._process_data(data)
+ except queue.Empty:
+ LOG.exception(_('Timed out waiting for RPC response.'))
+ self.done()
+ raise rpc_common.Timeout()
+ except Exception:
+ with excutils.save_and_reraise_exception():
+ self.done()
+ if self._got_ending:
+ self.done()
+ raise StopIteration
+ if isinstance(result, Exception):
+ self.done()
+ raise result
+ yield result
+
+
+#TODO(pekowski): Remove MulticallWaiter() in Havana.
class MulticallWaiter(object):
def __init__(self, conf, connection, timeout):
self._connection = connection
@@ -394,22 +525,40 @@ def create_connection(conf, new, connection_pool):
return ConnectionContext(conf, connection_pool, pooled=not new)
+_reply_proxy_create_sem = semaphore.Semaphore()
+
+
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
+ # TODO(pekowski): Remove all these comments in Havana.
+ # For amqp_rpc_single_reply_queue = False,
# Can't use 'with' for multicall, as it returns an iterator
# that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into
# the pool
+ # For amqp_rpc_single_reply_queue = True,
+ # The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
pack_context(msg, context)
- conn = ConnectionContext(conf, connection_pool)
- wait_msg = MulticallWaiter(conf, conn, timeout)
- conn.declare_direct_consumer(msg_id, wait_msg)
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ # TODO(pekowski): Remove this flag and the code under the if clause
+ # in Havana.
+ if not conf.amqp_rpc_single_reply_queue:
+ conn = ConnectionContext(conf, connection_pool)
+ wait_msg = MulticallWaiter(conf, conn, timeout)
+ conn.declare_direct_consumer(msg_id, wait_msg)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ else:
+ with _reply_proxy_create_sem:
+ if not connection_pool.reply_proxy:
+ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+ with ConnectionContext(conf, connection_pool) as conn:
+ conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg