| author | Russell Bryant <rbryant@redhat.com> | 2013-02-18 20:50:45 -0500 |
|---|---|---|
| committer | Russell Bryant <rbryant@redhat.com> | 2013-02-20 08:58:58 -0500 |
| commit | c2066facd0d5df60fc4129ff9e707175fda3fad1 (patch) | |
| tree | b15306d1c4c06b48671729151eb6d7db5d758150 | |
| parent | 96a4518575ea550d8e997444bfc243b7a822ec11 (diff) | |
| download | nova-c2066facd0d5df60fc4129ff9e707175fda3fad1.tar.gz nova-c2066facd0d5df60fc4129ff9e707175fda3fad1.tar.xz nova-c2066facd0d5df60fc4129ff9e707175fda3fad1.zip | |
Sync rpc from oslo-incubator.
This includes the following changes:
* use oslo.config directly
* implementation of rpc-amqp-fast-reply-queue (configuration note after the file list below)
* simplifications to bin/nova-rpc-zmq-receiver
* add new join_consumer_pool API call (usage sketch after this message)
* a bug fix to ensure kombu messages get acked even when an exception
occurs in the message callback (standalone sketch after the diff)
Change-Id: I813ce0ee7d5442d7b5b0f9d477949adabc83174e
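For orientation, here is a minimal sketch of how a service might use the new join_consumer_pool() call added by this sync. Only the method itself comes from this change; the topic, pool name, handler, and the assumption that CONF has already been parsed are illustrative.

    from nova.openstack.common import rpc

    def handle_notification(message_data):
        # Exactly one member of the 'workers' pool receives each message.
        print('received: %s' % message_data)

    # Assumes oslo.config CONF was parsed earlier during process startup.
    conn = rpc.create_connection(new=True)
    conn.join_consumer_pool(callback=handle_notification,
                            pool_name='workers',        # hypothetical pool
                            topic='notifications',      # hypothetical topic
                            exchange_name='nova')       # hypothetical exchange
    conn.consume_in_thread()

Connections that join the same pool_name share a single queue, so the broker delivers each message to exactly one member; each distinct pool gets its own copy of the message.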
| -rwxr-xr-x | bin/nova-rpc-zmq-receiver | 32 |
| -rw-r--r-- | nova/openstack/common/rpc/__init__.py | 2 |
| -rw-r--r-- | nova/openstack/common/rpc/amqp.py | 222 |
| -rw-r--r-- | nova/openstack/common/rpc/common.py | 22 |
| -rw-r--r-- | nova/openstack/common/rpc/impl_kombu.py | 27 |
| -rw-r--r-- | nova/openstack/common/rpc/impl_qpid.py | 28 |

6 files changed, 286 insertions(+), 47 deletions(-)
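Most of the 222 changed lines in amqp.py implement the fast reply queue noted above: instead of declaring a short-lived direct consumer for every rpc.call, each process declares a single 'reply_<uuid>' queue, and a ReplyProxy dispatches incoming replies to per-call waiters keyed by _msg_id. The feature is gated behind a new option that defaults to False; a hypothetical nova.conf snippet to enable it:

    [DEFAULT]
    # Grizzly-era toggle; per the TODOs in the diff it is slated for removal
    # in Havana, when the single reply queue becomes the only code path.
    amqp_rpc_single_reply_queue = True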
diff --git a/bin/nova-rpc-zmq-receiver b/bin/nova-rpc-zmq-receiver
index 13cd05df6..90b652b91 100755
--- a/bin/nova-rpc-zmq-receiver
+++ b/bin/nova-rpc-zmq-receiver
@@ -22,9 +22,6 @@ import contextlib
 import os
 import sys
 
-from oslo.config import cfg
-import zmq
-
 # If ../nova/__init__.py exists, add ../ to Python search path, so that
 # it will override what happens to be installed in /usr/(local/)lib/python...
 POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
@@ -33,45 +30,22 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
 if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'nova', '__init__.py')):
     sys.path.insert(0, POSSIBLE_TOPDIR)
 
-from nova import config
-from nova import exception
+from oslo.config import cfg
+
 from nova.openstack.common import log as logging
 from nova.openstack.common import rpc
 from nova.openstack.common.rpc import impl_zmq
-from nova import utils
 
 CONF = cfg.CONF
 CONF.register_opts(rpc.rpc_opts)
 CONF.register_opts(impl_zmq.zmq_opts)
+CONF(sys.argv[1:], project='nova')
 
 
 def main():
-    config.parse_args(sys.argv)
     logging.setup("nova")
-    utils.monkey_patch()
-
-    ipc_dir = CONF.rpc_zmq_ipc_dir
-
-    # Create the necessary directories/files for this service.
-    if not os.path.isdir(ipc_dir):
-        try:
-            utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
-            utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
-                          ipc_dir, run_as_root=True)
-            utils.execute('chmod', '750', ipc_dir, run_as_root=True)
-        except exception.ProcessExecutionError:
-            logging.error(_("Could not create IPC socket directory."))
-            return
 
     with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
-        consume_in = "tcp://%s:%s" % \
-            (CONF.rpc_zmq_bind_address,
-             CONF.rpc_zmq_port)
-        consumption_proxy = impl_zmq.InternalContext(None)
-
-        reactor.register(consumption_proxy,
-                         consume_in, zmq.PULL, out_bind=True)
-
         reactor.consume_in_thread()
         reactor.wait()
diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py
index 736b25b24..ff72c3f8e 100644
--- a/nova/openstack/common/rpc/__init__.py
+++ b/nova/openstack/common/rpc/__init__.py
@@ -299,7 +299,7 @@ def _get_impl():
         try:
             _RPCIMPL = importutils.import_module(CONF.rpc_backend)
         except ImportError:
-            # For backwards compatibility with older nova config.
+            # For backwards compatibility with older oslo.config.
             impl = CONF.rpc_backend.replace('nova.rpc',
                                             'nova.openstack.common.rpc')
             _RPCIMPL = importutils.import_module(impl)
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py
index 9aadce733..3f25eed67 100644
--- a/nova/openstack/common/rpc/amqp.py
+++ b/nova/openstack/common/rpc/amqp.py
@@ -32,13 +32,27 @@ import uuid
 
 from eventlet import greenpool
 from eventlet import pools
 from eventlet import semaphore
+from eventlet import queue
+# TODO(pekowsk): Remove import cfg and below comment in Havana.
+# This import should no longer be needed when the amqp_rpc_single_reply_queue
+# option is removed.
+from oslo.config import cfg
 
 from nova.openstack.common import excutils
 from nova.openstack.common.gettextutils import _
 from nova.openstack.common import local
 from nova.openstack.common import log as logging
 from nova.openstack.common.rpc import common as rpc_common
 
+
+# TODO(pekowski): Remove this option in Havana.
+amqp_opts = [
+    cfg.BoolOpt('amqp_rpc_single_reply_queue',
+                default=False,
+                help='Enable a fast single reply queue if using AMQP based '
+                'RPC like RabbitMQ or Qpid.'),
+]
+
+cfg.CONF.register_opts(amqp_opts)
 
 LOG = logging.getLogger(__name__)
 
@@ -51,6 +65,7 @@ class Pool(pools.Pool):
         kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size)
         kwargs.setdefault("order_as_stack", True)
         super(Pool, self).__init__(*args, **kwargs)
+        self.reply_proxy = None
 
     # TODO(comstud): Timeout connections not used in a while
     def create(self):
@@ -60,6 +75,16 @@ class Pool(pools.Pool):
     def empty(self):
         while self.free_items:
             self.get().close()
+        # Force a new connection pool to be created.
+        # Note that this was added due to failing unit test cases. The issue
+        # is the above "while loop" gets all the cached connections from the
+        # pool and closes them, but never returns them to the pool, a pool
+        # leak. The unit tests hang waiting for an item to be returned to the
+        # pool. The unit tests get here via the teatDown() method. In the run
+        # time code, it gets here via cleanup() and only appears in service.py
+        # just before doing a sys.exit(), so cleanup() only happens once and
+        # the leakage is not a problem.
+        self.connection_cls.pool = None
 
 
 _pool_create_sem = semaphore.Semaphore()
@@ -137,6 +162,12 @@ class ConnectionContext(rpc_common.Connection):
     def create_worker(self, topic, proxy, pool_name):
         self.connection.create_worker(topic, proxy, pool_name)
 
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+        self.connection.join_consumer_pool(callback,
+                                           pool_name,
+                                           topic,
+                                           exchange_name)
+
     def consume_in_thread(self):
         self.connection.consume_in_thread()
 
@@ -148,8 +179,45 @@
             raise rpc_common.InvalidRPCConnectionReuse()
 
 
-def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
-              ending=False, log_failure=True):
+class ReplyProxy(ConnectionContext):
+    """ Connection class for RPC replies / callbacks """
+    def __init__(self, conf, connection_pool):
+        self._call_waiters = {}
+        self._num_call_waiters = 0
+        self._num_call_waiters_wrn_threshhold = 10
+        self._reply_q = 'reply_' + uuid.uuid4().hex
+        super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
+        self.declare_direct_consumer(self._reply_q, self._process_data)
+        self.consume_in_thread()
+
+    def _process_data(self, message_data):
+        msg_id = message_data.pop('_msg_id', None)
+        waiter = self._call_waiters.get(msg_id)
+        if not waiter:
+            LOG.warn(_('no calling threads waiting for msg_id : %s'
+                       ', message : %s') % (msg_id, message_data))
+        else:
+            waiter.put(message_data)
+
+    def add_call_waiter(self, waiter, msg_id):
+        self._num_call_waiters += 1
+        if self._num_call_waiters > self._num_call_waiters_wrn_threshhold:
+            LOG.warn(_('Number of call waiters is greater than warning '
+                       'threshhold: %d. There could be a MulticallProxyWaiter '
+                       'leak.') % self._num_call_waiters_wrn_threshhold)
+            self._num_call_waiters_wrn_threshhold *= 2
+        self._call_waiters[msg_id] = waiter
+
+    def del_call_waiter(self, msg_id):
+        self._num_call_waiters -= 1
+        del self._call_waiters[msg_id]
+
+    def get_reply_q(self):
+        return self._reply_q
+
+
+def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
+              failure=None, ending=False, log_failure=True):
     """Sends a reply or an error on the channel signified by msg_id.
 
     Failure should be a sys.exc_info() tuple.
@@ -168,13 +236,21 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
                    'failure': failure}
         if ending:
             msg['ending'] = True
-        conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
+        # If a reply_q exists, add the msg_id to the reply and pass the
+        # reply_q to direct_send() to use it as the response queue.
+        # Otherwise use the msg_id for backward compatibilty.
+        if reply_q:
+            msg['_msg_id'] = msg_id
+            conn.direct_send(reply_q, rpc_common.serialize_msg(msg))
+        else:
+            conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
 
 
 class RpcContext(rpc_common.CommonRpcContext):
     """Context that supports replying to a rpc.call"""
     def __init__(self, **kwargs):
         self.msg_id = kwargs.pop('msg_id', None)
+        self.reply_q = kwargs.pop('reply_q', None)
         self.conf = kwargs.pop('conf')
         super(RpcContext, self).__init__(**kwargs)
 
@@ -182,13 +258,14 @@ class RpcContext(rpc_common.CommonRpcContext):
         values = self.to_dict()
         values['conf'] = self.conf
         values['msg_id'] = self.msg_id
+        values['reply_q'] = self.reply_q
         return self.__class__(**values)
 
     def reply(self, reply=None, failure=None, ending=False,
               connection_pool=None, log_failure=True):
         if self.msg_id:
-            msg_reply(self.conf, self.msg_id, connection_pool, reply, failure,
-                      ending, log_failure)
+            msg_reply(self.conf, self.msg_id, self.reply_q, connection_pool,
+                      reply, failure, ending, log_failure)
             if ending:
                 self.msg_id = None
 
@@ -204,6 +281,7 @@ def unpack_context(conf, msg):
         value = msg.pop(key)
         context_dict[key[9:]] = value
     context_dict['msg_id'] = msg.pop('_msg_id', None)
+    context_dict['reply_q'] = msg.pop('_reply_q', None)
     context_dict['conf'] = conf
     ctx = RpcContext.from_dict(context_dict)
     rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
@@ -224,15 +302,54 @@ def pack_context(msg, context):
     msg.update(context_d)
 
 
-class ProxyCallback(object):
-    """Calls methods on a proxy object based on method and args."""
+class _ThreadPoolWithWait(object):
+    """Base class for a delayed invocation manager used by
+    the Connection class to start up green threads
+    to handle incoming messages.
+    """
 
-    def __init__(self, conf, proxy, connection_pool):
-        self.proxy = proxy
+    def __init__(self, conf, connection_pool):
         self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size)
         self.connection_pool = connection_pool
         self.conf = conf
 
+    def wait(self):
+        """Wait for all callback threads to exit."""
+        self.pool.waitall()
+
+
+class CallbackWrapper(_ThreadPoolWithWait):
+    """Wraps a straight callback to allow it to be invoked in a green
+    thread.
+    """
+
+    def __init__(self, conf, callback, connection_pool):
+        """
+        :param conf: cfg.CONF instance
+        :param callback: a callable (probably a function)
+        :param connection_pool: connection pool as returned by
+                                get_connection_pool()
+        """
+        super(CallbackWrapper, self).__init__(
+            conf=conf,
+            connection_pool=connection_pool,
+        )
+        self.callback = callback
+
+    def __call__(self, message_data):
+        self.pool.spawn_n(self.callback, message_data)
+
+
+class ProxyCallback(_ThreadPoolWithWait):
+    """Calls methods on a proxy object based on method and args."""
+
+    def __init__(self, conf, proxy, connection_pool):
+        super(ProxyCallback, self).__init__(
+            conf=conf,
+            connection_pool=connection_pool,
+        )
+        self.proxy = proxy
+
     def __call__(self, message_data):
         """Consumer callback to call a method on a proxy object.
@@ -293,11 +410,66 @@ class ProxyCallback(object):
             ctxt.reply(None, sys.exc_info(),
                        connection_pool=self.connection_pool)
 
-    def wait(self):
-        """Wait for all callback threads to exit."""
-        self.pool.waitall()
+
+class MulticallProxyWaiter(object):
+    def __init__(self, conf, msg_id, timeout, connection_pool):
+        self._msg_id = msg_id
+        self._timeout = timeout or conf.rpc_response_timeout
+        self._reply_proxy = connection_pool.reply_proxy
+        self._done = False
+        self._got_ending = False
+        self._conf = conf
+        self._dataqueue = queue.LightQueue()
+        # Add this caller to the reply proxy's call_waiters
+        self._reply_proxy.add_call_waiter(self, self._msg_id)
+
+    def put(self, data):
+        self._dataqueue.put(data)
+
+    def done(self):
+        if self._done:
+            return
+        self._done = True
+        # Remove this caller from reply proxy's call_waiters
+        self._reply_proxy.del_call_waiter(self._msg_id)
+
+    def _process_data(self, data):
+        result = None
+        if data['failure']:
+            failure = data['failure']
+            result = rpc_common.deserialize_remote_exception(self._conf,
+                                                             failure)
+        elif data.get('ending', False):
+            self._got_ending = True
+        else:
+            result = data['result']
+        return result
+
+    def __iter__(self):
+        """Return a result until we get a reply with an 'ending" flag"""
+        if self._done:
+            raise StopIteration
+        while True:
+            try:
+                data = self._dataqueue.get(timeout=self._timeout)
+                result = self._process_data(data)
+            except queue.Empty:
+                LOG.exception(_('Timed out waiting for RPC response.'))
+                self.done()
+                raise rpc_common.Timeout()
+            except Exception:
+                with excutils.save_and_reraise_exception():
+                    self.done()
+            if self._got_ending:
+                self.done()
+                raise StopIteration
+            if isinstance(result, Exception):
+                self.done()
+                raise result
+            yield result
 
 
+#TODO(pekowski): Remove MulticallWaiter() in Havana.
 class MulticallWaiter(object):
     def __init__(self, conf, connection, timeout):
         self._connection = connection
@@ -353,22 +525,40 @@ def create_connection(conf, new, connection_pool):
     return ConnectionContext(conf, connection_pool, pooled=not new)
 
 
+_reply_proxy_create_sem = semaphore.Semaphore()
+
+
 def multicall(conf, context, topic, msg, timeout, connection_pool):
     """Make a call that returns multiple times."""
+    # TODO(pekowski): Remove all these comments in Havana.
+    # For amqp_rpc_single_reply_queue = False,
     # Can't use 'with' for multicall, as it returns an iterator
     # that will continue to use the connection. When it's done,
     # connection.close() will get called which will put it back into
     # the pool
+    # For amqp_rpc_single_reply_queue = True,
+    # The 'with' statement is mandatory for closing the connection
     LOG.debug(_('Making synchronous call on %s ...'), topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
    LOG.debug(_('MSG_ID is %s') % (msg_id))
     pack_context(msg, context)
 
-    conn = ConnectionContext(conf, connection_pool)
-    wait_msg = MulticallWaiter(conf, conn, timeout)
-    conn.declare_direct_consumer(msg_id, wait_msg)
-    conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+    # TODO(pekowski): Remove this flag and the code under the if clause
+    # in Havana.
+    if not conf.amqp_rpc_single_reply_queue:
+        conn = ConnectionContext(conf, connection_pool)
+        wait_msg = MulticallWaiter(conf, conn, timeout)
+        conn.declare_direct_consumer(msg_id, wait_msg)
+        conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+    else:
+        with _reply_proxy_create_sem:
+            if not connection_pool.reply_proxy:
+                connection_pool.reply_proxy = ReplyProxy(conf, connection_pool)
+        msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
+        wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
+        with ConnectionContext(conf, connection_pool) as conn:
+            conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
     return wait_msg
diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py
index 287986da8..178e3fc19 100644
--- a/nova/openstack/common/rpc/common.py
+++ b/nova/openstack/common/rpc/common.py
@@ -197,6 +197,28 @@ class Connection(object):
         """
         raise NotImplementedError()
 
+    def join_consumer_pool(self, callback, pool_name, topic, exchange_name):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+
+        :param callback: Callable to be invoked for each message.
+        :type callback: callable accepting one argument
+        :param pool_name: The name of the consumer pool.
+        :type pool_name: str
+        :param topic: The routing topic for desired messages.
+        :type topic: str
+        :param exchange_name: The name of the message exchange where
+                              the client should attach. Defaults to
+                              the configured exchange.
+        :type exchange_name: str
+        """
+        raise NotImplementedError()
+
     def consume_in_thread(self):
         """Spawn a thread to handle incoming messages.
diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py
index 298c82d52..5e1846f91 100644
--- a/nova/openstack/common/rpc/impl_kombu.py
+++ b/nova/openstack/common/rpc/impl_kombu.py
@@ -165,9 +165,10 @@ class ConsumerBase(object):
             try:
                 msg = rpc_common.deserialize_msg(message.payload)
                 callback(msg)
-                message.ack()
             except Exception:
                 LOG.exception(_("Failed to process message... skipping it."))
+            finally:
+                message.ack()
 
         self.queue.consume(*args, callback=_callback, **options)
 
@@ -750,6 +751,30 @@ class Connection(object):
         self.proxy_callbacks.append(proxy_cb)
         self.declare_topic_consumer(topic, proxy_cb, pool_name)
 
+    def join_consumer_pool(self, callback, pool_name, topic,
+                           exchange_name=None):
+        """Register as a member of a group of consumers for a given topic from
+        the specified exchange.
+
+        Exactly one member of a given pool will receive each message.
+
+        A message will be delivered to multiple pools, if more than
+        one is created.
+ """ + callback_wrapper = rpc_amqp.CallbackWrapper( + conf=self.conf, + callback=callback, + connection_pool=rpc_amqp.get_connection_pool(self.conf, + Connection), + ) + self.proxy_callbacks.append(callback_wrapper) + self.declare_topic_consumer( + queue_name=pool_name, + topic=topic, + exchange_name=exchange_name, + callback=callback_wrapper, + ) + def create_connection(conf, new=True): """Create a connection""" diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index 3daf4ba91..5601ddb33 100644 --- a/nova/openstack/common/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -560,6 +560,34 @@ class Connection(object): return consumer + def join_consumer_pool(self, callback, pool_name, topic, + exchange_name=None): + """Register as a member of a group of consumers for a given topic from + the specified exchange. + + Exactly one member of a given pool will receive each message. + + A message will be delivered to multiple pools, if more than + one is created. + """ + callback_wrapper = rpc_amqp.CallbackWrapper( + conf=self.conf, + callback=callback, + connection_pool=rpc_amqp.get_connection_pool(self.conf, + Connection), + ) + self.proxy_callbacks.append(callback_wrapper) + + consumer = TopicConsumer(conf=self.conf, + session=self.session, + topic=topic, + callback=callback_wrapper, + name=pool_name, + exchange_name=exchange_name) + + self._register_consumer(consumer) + return consumer + def create_connection(conf, new=True): """Create a connection""" |