summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/amqp.py
diff options
context:
space:
mode:
Diffstat (limited to 'openstack/common/rpc/amqp.py')
-rw-r--r--openstack/common/rpc/amqp.py100
1 files changed, 9 insertions, 91 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 64ef582..22e01d7 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -34,10 +34,6 @@ from eventlet import greenpool
from eventlet import pools
from eventlet import queue
from eventlet import semaphore
-# TODO(pekowsk): Remove import cfg and below comment in Havana.
-# This import should no longer be needed when the amqp_rpc_single_reply_queue
-# option is removed.
-from oslo.config import cfg
from openstack.common import excutils
from openstack.common.gettextutils import _
@@ -46,16 +42,6 @@ from openstack.common import log as logging
from openstack.common.rpc import common as rpc_common
-# TODO(pekowski): Remove this option in Havana.
-amqp_opts = [
- cfg.BoolOpt('amqp_rpc_single_reply_queue',
- default=False,
- help='Enable a fast single reply queue if using AMQP based '
- 'RPC like RabbitMQ or Qpid.'),
-]
-
-cfg.CONF.register_opts(amqp_opts)
-
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
@@ -83,7 +69,7 @@ class Pool(pools.Pool):
# is the above "while loop" gets all the cached connections from the
# pool and closes them, but never returns them to the pool, a pool
# leak. The unit tests hang waiting for an item to be returned to the
- # pool. The unit tests get here via the teatDown() method. In the run
+ # pool. The unit tests get here via the tearDown() method. In the run
# time code, it gets here via cleanup() and only appears in service.py
# just before doing a sys.exit(), so cleanup() only happens once and
# the leakage is not a problem.
@@ -197,9 +183,10 @@ class ReplyProxy(ConnectionContext):
msg_id = message_data.pop('_msg_id', None)
waiter = self._call_waiters.get(msg_id)
if not waiter:
- LOG.warn(_('no calling threads waiting for msg_id : %(msg_id)s'
+ LOG.warn(_('No calling threads waiting for msg_id : %(msg_id)s'
', message : %(data)s'), {'msg_id': msg_id,
'data': message_data})
+ LOG.warn(_('_call_waiters: %s') % str(self._call_waiters))
else:
waiter.put(message_data)
@@ -516,59 +503,6 @@ class MulticallProxyWaiter(object):
yield result
-#TODO(pekowski): Remove MulticallWaiter() in Havana.
-class MulticallWaiter(object):
- def __init__(self, conf, connection, timeout):
- self._connection = connection
- self._iterator = connection.iterconsume(timeout=timeout or
- conf.rpc_response_timeout)
- self._result = None
- self._done = False
- self._got_ending = False
- self._conf = conf
- self.msg_id_cache = _MsgIdCache()
-
- def done(self):
- if self._done:
- return
- self._done = True
- self._iterator.close()
- self._iterator = None
- self._connection.close()
-
- def __call__(self, data):
- """The consume() callback will call this. Store the result."""
- self.msg_id_cache.check_duplicate_message(data)
- if data['failure']:
- failure = data['failure']
- self._result = rpc_common.deserialize_remote_exception(self._conf,
- failure)
-
- elif data.get('ending', False):
- self._got_ending = True
- else:
- self._result = data['result']
-
- def __iter__(self):
- """Return a result until we get a 'None' response from consumer"""
- if self._done:
- raise StopIteration
- while True:
- try:
- self._iterator.next()
- except Exception:
- with excutils.save_and_reraise_exception():
- self.done()
- if self._got_ending:
- self.done()
- raise StopIteration
- result = self._result
- if isinstance(result, Exception):
- self.done()
- raise result
- yield result
-
-
def create_connection(conf, new, connection_pool):
"""Create a connection."""
return ConnectionContext(conf, connection_pool, pooled=not new)
@@ -579,14 +513,6 @@ _reply_proxy_create_sem = semaphore.Semaphore()
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
- # TODO(pekowski): Remove all these comments in Havana.
- # For amqp_rpc_single_reply_queue = False,
- # Can't use 'with' for multicall, as it returns an iterator
- # that will continue to use the connection. When it's done,
- # connection.close() will get called which will put it back into
- # the pool
- # For amqp_rpc_single_reply_queue = True,
- # The 'with' statement is mandatory for closing the connection
LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
@@ -594,21 +520,13 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
_add_unique_id(msg)
pack_context(msg, context)
- # TODO(pekowski): Remove this flag and the code under the if clause
- # in Havana.
- if not conf.amqp_rpc_single_reply_queue:
- conn = ConnectionContext(conf, connection_pool)
- wait_msg = MulticallWaiter(conf, conn, timeout)
- conn.declare_direct_consumer(msg_id, wait_msg)
+ with _reply_proxy_create_sem:
+ if not connection_pool.reply_proxy:
+ connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+ msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+ wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+ with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
- else:
- with _reply_proxy_create_sem:
- if not connection_pool.reply_proxy:
- connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
- msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
- wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
- with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
return wait_msg