diff options
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/amqp.py | 167 |
1 files changed, 158 insertions, 9 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 3d7ad5e..98cfd9d 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -32,13 +32,27 @@ import uuid from eventlet import greenpool from eventlet import pools from eventlet import semaphore +from eventlet import queue +# TODO(pekowsk): Remove import cfg and below comment in Havana. +# This import should no longer be needed when the amqp_rpc_single_reply_queue +# option is removed. +from oslo.config import cfg from openstack.common import excutils from openstack.common.gettextutils import _ from openstack.common import local from openstack.common import log as logging from openstack.common.rpc import common as rpc_common +# TODO(pekowski): Remove this option in Havana. +amqp_opts = [ + cfg.BoolOpt('amqp_rpc_single_reply_queue', + default=False, + help='Enable a fast single reply queue if using AMQP based ' + 'RPC like RabbitMQ or Qpid.'), +] + +cfg.CONF.register_opts(amqp_opts) LOG = logging.getLogger(__name__) @@ -51,6 +65,7 @@ class Pool(pools.Pool): kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size) kwargs.setdefault("order_as_stack", True) super(Pool, self).__init__(*args, **kwargs) + self.reply_proxy = None # TODO(comstud): Timeout connections not used in a while def create(self): @@ -60,6 +75,16 @@ class Pool(pools.Pool): def empty(self): while self.free_items: self.get().close() + # Force a new connection pool to be created. + # Note that this was added due to failing unit test cases. The issue + # is the above "while loop" gets all the cached connections from the + # pool and closes them, but never returns them to the pool, a pool + # leak. The unit tests hang waiting for an item to be returned to the + # pool. The unit tests get here via the teatDown() method. In the run + # time code, it gets here via cleanup() and only appears in service.py + # just before doing a sys.exit(), so cleanup() only happens once and + # the leakage is not a problem. + self.connection_cls.pool = None _pool_create_sem = semaphore.Semaphore() @@ -154,8 +179,45 @@ class ConnectionContext(rpc_common.Connection): raise rpc_common.InvalidRPCConnectionReuse() -def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, - ending=False, log_failure=True): +class ReplyProxy(ConnectionContext): + """ Connection class for RPC replies / callbacks """ + def __init__(self, conf, connection_pool): + self._call_waiters = {} + self._num_call_waiters = 0 + self._num_call_waiters_wrn_threshhold = 10 + self._reply_q = 'reply_' + uuid.uuid4().hex + super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False) + self.declare_direct_consumer(self._reply_q, self._process_data) + self.consume_in_thread() + + def _process_data(self, message_data): + msg_id = message_data.pop('_msg_id', None) + waiter = self._call_waiters.get(msg_id) + if not waiter: + LOG.warn(_('no calling threads waiting for msg_id : %s' + ', message : %s') % (msg_id, message_data)) + else: + waiter.put(message_data) + + def add_call_waiter(self, waiter, msg_id): + self._num_call_waiters += 1 + if self._num_call_waiters > self._num_call_waiters_wrn_threshhold: + LOG.warn(_('Number of call waiters is greater than warning ' + 'threshhold: %d. There could be a MulticallProxyWaiter ' + 'leak.') % self._num_call_waiters_wrn_threshhold) + self._num_call_waiters_wrn_threshhold *= 2 + self._call_waiters[msg_id] = waiter + + def del_call_waiter(self, msg_id): + self._num_call_waiters -= 1 + del self._call_waiters[msg_id] + + def get_reply_q(self): + return self._reply_q + + +def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, + failure=None, ending=False, log_failure=True): """Sends a reply or an error on the channel signified by msg_id. Failure should be a sys.exc_info() tuple. @@ -174,13 +236,21 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, 'failure': failure} if ending: msg['ending'] = True - conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) + # If a reply_q exists, add the msg_id to the reply and pass the + # reply_q to direct_send() to use it as the response queue. + # Otherwise use the msg_id for backward compatibilty. + if reply_q: + msg['_msg_id'] = msg_id + conn.direct_send(reply_q, rpc_common.serialize_msg(msg)) + else: + conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) class RpcContext(rpc_common.CommonRpcContext): """Context that supports replying to a rpc.call""" def __init__(self, **kwargs): self.msg_id = kwargs.pop('msg_id', None) + self.reply_q = kwargs.pop('reply_q', None) self.conf = kwargs.pop('conf') super(RpcContext, self).__init__(**kwargs) @@ -188,13 +258,14 @@ class RpcContext(rpc_common.CommonRpcContext): values = self.to_dict() values['conf'] = self.conf values['msg_id'] = self.msg_id + values['reply_q'] = self.reply_q return self.__class__(**values) def reply(self, reply=None, failure=None, ending=False, connection_pool=None, log_failure=True): if self.msg_id: - msg_reply(self.conf, self.msg_id, connection_pool, reply, failure, - ending, log_failure) + msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool, + reply, failure, ending, log_failure) if ending: self.msg_id = None @@ -210,6 +281,7 @@ def unpack_context(conf, msg): value = msg.pop(key) context_dict[key[9:]] = value context_dict['msg_id'] = msg.pop('_msg_id', None) + context_dict['reply_q'] = msg.pop('_reply_q', None) context_dict['conf'] = conf ctx = RpcContext.from_dict(context_dict) rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict()) @@ -339,6 +411,65 @@ class ProxyCallback(_ThreadPoolWithWait): connection_pool=self.connection_pool) +class MulticallProxyWaiter(object): + def __init__(self, conf, msg_id, timeout, connection_pool): + self._msg_id = msg_id + self._timeout = timeout or conf.rpc_response_timeout + self._reply_proxy = connection_pool.reply_proxy + self._done = False + self._got_ending = False + self._conf = conf + self._dataqueue = queue.LightQueue() + # Add this caller to the reply proxy's call_waiters + self._reply_proxy.add_call_waiter(self, self._msg_id) + + def put(self, data): + self._dataqueue.put(data) + + def done(self): + if self._done: + return + self._done = True + # Remove this caller from reply proxy's call_waiters + self._reply_proxy.del_call_waiter(self._msg_id) + + def _process_data(self, data): + result = None + if data['failure']: + failure = data['failure'] + result = rpc_common.deserialize_remote_exception(self._conf, + failure) + elif data.get('ending', False): + self._got_ending = True + else: + result = data['result'] + return result + + def __iter__(self): + """Return a result until we get a reply with an 'ending" flag""" + if self._done: + raise StopIteration + while True: + try: + data = self._dataqueue.get(timeout=self._timeout) + result = self._process_data(data) + except queue.Empty: + LOG.exception(_('Timed out waiting for RPC response.')) + self.done() + raise rpc_common.Timeout() + except Exception: + with excutils.save_and_reraise_exception(): + self.done() + if self._got_ending: + self.done() + raise StopIteration + if isinstance(result, Exception): + self.done() + raise result + yield result + + +#TODO(pekowski): Remove MulticallWaiter() in Havana. class MulticallWaiter(object): def __init__(self, conf, connection, timeout): self._connection = connection @@ -394,22 +525,40 @@ def create_connection(conf, new, connection_pool): return ConnectionContext(conf, connection_pool, pooled=not new) +_reply_proxy_create_sem = semaphore.Semaphore() + + def multicall(conf, context, topic, msg, timeout, connection_pool): """Make a call that returns multiple times.""" + # TODO(pekowski): Remove all these comments in Havana. + # For amqp_rpc_single_reply_queue = False, # Can't use 'with' for multicall, as it returns an iterator # that will continue to use the connection. When it's done, # connection.close() will get called which will put it back into # the pool + # For amqp_rpc_single_reply_queue = True, + # The 'with' statement is mandatory for closing the connection LOG.debug(_('Making synchronous call on %s ...'), topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) pack_context(msg, context) - conn = ConnectionContext(conf, connection_pool) - wait_msg = MulticallWaiter(conf, conn, timeout) - conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) + # TODO(pekowski): Remove this flag and the code under the if clause + # in Havana. + if not conf.amqp_rpc_single_reply_queue: + conn = ConnectionContext(conf, connection_pool) + wait_msg = MulticallWaiter(conf, conn, timeout) + conn.declare_direct_consumer(msg_id, wait_msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) + else: + with _reply_proxy_create_sem: + if not connection_pool.reply_proxy: + connection_pool.reply_proxy = ReplyProxy(conf, connection_pool) + msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()}) + wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool) + with ConnectionContext(conf, connection_pool) as conn: + conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) return wait_msg |
