author | Russell Bryant <rbryant@redhat.com> | 2013-03-12 14:24:49 -0400
---|---|---
committer | Russell Bryant <rbryant@redhat.com> | 2013-03-12 14:24:49 -0400
commit | d984b0b0bcbd17567f127395c8b890cddd9edd30 (patch) |
tree | 91ff2359350497c8619d5cecd63737e3f04ef579 /nova/openstack |
parent | 562b0787421b145c4a91ccbbcacfc74c340cead8 (diff) |
download | nova-d984b0b0bcbd17567f127395c8b890cddd9edd30.tar.gz, nova-d984b0b0bcbd17567f127395c8b890cddd9edd30.tar.xz, nova-d984b0b0bcbd17567f127395c8b890cddd9edd30.zip |
Sync rpc from oslo-incubator.
Changes include:
* Fix Copyright Headers - Rename LLC to Foundation
* Preserve exceptions in impl_zmq.
* Don't log a traceback on rpc call timeout.
* Redis-based matchmaker
* AMQP: skip duplicate messages
* some other minor fixes / cleanups
Fix bug 1154232.
Change-Id: I8afb6b9a919fca34b4a93f7fad1551a023eba6b2
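
The "AMQP: skip duplicate messages" change above works by stamping every outgoing message with a `_unique_id` and having each consumer remember the last few ids it has seen; the amqp.py hunks below add a `_MsgIdCache` for this. The following standalone sketch illustrates the same idea outside of oslo-incubator: the cache size of 16 and the duplicate-message error mirror the synced code, but the snippet (class and function names included) is an illustration, not the oslo module itself.

```python
import collections
import uuid

UNIQUE_ID = '_unique_id'


class DuplicateMessageError(Exception):
    """Raised when a consumer sees a message id it has already processed."""


class MsgIdCache(object):
    """Remember the last few unique ids so redelivered messages are dropped."""

    DUP_MSG_CHECK_SIZE = 16  # same bound as the synced _MsgIdCache

    def __init__(self):
        # Oldest ids fall off automatically once the deque is full.
        self.prev_msgids = collections.deque([],
                                             maxlen=self.DUP_MSG_CHECK_SIZE)

    def check_duplicate_message(self, message_data):
        msg_id = message_data.get(UNIQUE_ID)
        if msg_id is None:
            return  # senders that predate the change do not stamp messages
        if msg_id in self.prev_msgids:
            raise DuplicateMessageError(msg_id)
        self.prev_msgids.append(msg_id)


def add_unique_id(msg):
    """Stamp an outgoing message so consumers can detect redelivery."""
    msg[UNIQUE_ID] = uuid.uuid4().hex


# The second delivery of the same payload is rejected.
cache = MsgIdCache()
message = {'method': 'ping', 'args': {}}
add_unique_id(message)
cache.check_duplicate_message(message)      # first delivery passes
try:
    cache.check_duplicate_message(message)  # simulated broker redelivery
except DuplicateMessageError as exc:
    print('duplicate dropped: %s' % exc)
```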
Diffstat (limited to 'nova/openstack')
-rw-r--r-- | nova/openstack/common/rpc/amqp.py | 59
-rw-r--r-- | nova/openstack/common/rpc/common.py | 4
-rw-r--r-- | nova/openstack/common/rpc/impl_kombu.py | 4
-rw-r--r-- | nova/openstack/common/rpc/impl_qpid.py | 4
-rw-r--r-- | nova/openstack/common/rpc/impl_zmq.py | 34
-rw-r--r-- | nova/openstack/common/rpc/matchmaker.py | 169
6 files changed, 250 insertions, 24 deletions
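
Several of the impl_zmq.py hunks below replace a bare `raise` after `LOG.error` with `excutils.save_and_reraise_exception()`, which is what the "Preserve exceptions in impl_zmq" item refers to: the in-flight exception is captured before any logging or cleanup runs and is re-raised afterwards. The sketch below is a hypothetical, Python 3-flavoured stand-in for that helper (the real one lives in nova/openstack/common/excutils.py and is synced separately); it only shows the shape of the pattern.

```python
import contextlib
import json
import logging
import sys

LOG = logging.getLogger(__name__)


@contextlib.contextmanager
def save_and_reraise_exception():
    # Capture the in-flight exception before the caller does any logging or
    # cleanup that might otherwise replace or swallow it.
    exc_type, exc_value, exc_tb = sys.exc_info()
    try:
        yield
    except Exception:
        # The cleanup block itself raised: report the original exception and
        # let the new one propagate.
        LOG.error('Original exception being dropped',
                  exc_info=(exc_type, exc_value, exc_tb))
        raise
    if exc_value is not None:
        # Cleanup ran cleanly: re-raise the exception we entered with.
        raise exc_value.with_traceback(exc_tb)


def _serialize(data):
    # Usage in the same shape as the impl_zmq hunk: log, then re-raise the
    # original TypeError instead of a bare `raise`.
    try:
        return json.dumps(data, ensure_ascii=True)
    except TypeError:
        with save_and_reraise_exception():
            LOG.error('JSON serialization failed.')


# Example: non-serializable input triggers the logged re-raise.
try:
    _serialize({'bad': object()})
except TypeError as exc:
    print('re-raised original: %s' % exc)
```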
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py
index 3f25eed67..c23e36ec8 100644
--- a/nova/openstack/common/rpc/amqp.py
+++ b/nova/openstack/common/rpc/amqp.py
@@ -25,25 +25,27 @@
 Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
 uses AMQP, but is deprecated and predates this code.
 """
 
+import collections
 import inspect
 import sys
 import uuid
 
 from eventlet import greenpool
 from eventlet import pools
-from eventlet import semaphore
 from eventlet import queue
-
+from eventlet import semaphore
 # TODO(pekowsk): Remove import cfg and below comment in Havana.
 # This import should no longer be needed when the amqp_rpc_single_reply_queue
 # option is removed.
 from oslo.config import cfg
+
 from nova.openstack.common import excutils
 from nova.openstack.common.gettextutils import _
 from nova.openstack.common import local
 from nova.openstack.common import log as logging
 from nova.openstack.common.rpc import common as rpc_common
+
 # TODO(pekowski): Remove this option in Havana.
 amqp_opts = [
     cfg.BoolOpt('amqp_rpc_single_reply_queue',
@@ -54,6 +56,7 @@ amqp_opts = [
 
 cfg.CONF.register_opts(amqp_opts)
 
+UNIQUE_ID = '_unique_id'
 LOG = logging.getLogger(__name__)
 
 
@@ -236,6 +239,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
                    'failure': failure}
         if ending:
             msg['ending'] = True
+        _add_unique_id(msg)
         # If a reply_q exists, add the msg_id to the reply and pass the
         # reply_q to direct_send() to use it as the response queue.
         # Otherwise use the msg_id for backward compatibilty.
@@ -302,6 +306,37 @@ def pack_context(msg, context):
     msg.update(context_d)
 
 
+class _MsgIdCache(object):
+    """This class checks any duplicate messages."""
+
+    # NOTE: This value is considered can be a configuration item, but
+    #       it is not necessary to change its value in most cases,
+    #       so let this value as static for now.
+    DUP_MSG_CHECK_SIZE = 16
+
+    def __init__(self, **kwargs):
+        self.prev_msgids = collections.deque([],
+                                             maxlen=self.DUP_MSG_CHECK_SIZE)
+
+    def check_duplicate_message(self, message_data):
+        """AMQP consumers may read same message twice when exceptions occur
+           before ack is returned. This method prevents doing it.
+        """
+        if UNIQUE_ID in message_data:
+            msg_id = message_data[UNIQUE_ID]
+            if msg_id not in self.prev_msgids:
+                self.prev_msgids.append(msg_id)
+            else:
+                raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+
+
+def _add_unique_id(msg):
+    """Add unique_id for checking duplicate messages."""
+    unique_id = uuid.uuid4().hex
+    msg.update({UNIQUE_ID: unique_id})
+    LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+
+
 class _ThreadPoolWithWait(object):
     """Base class for a delayed invocation manager used by
     the Connection class to start up green threads
@@ -349,6 +384,7 @@ class ProxyCallback(_ThreadPoolWithWait):
             connection_pool=connection_pool,
         )
         self.proxy = proxy
+        self.msg_id_cache = _MsgIdCache()
 
     def __call__(self, message_data):
         """Consumer callback to call a method on a proxy object.
@@ -368,6 +404,7 @@ class ProxyCallback(_ThreadPoolWithWait):
         if hasattr(local.store, 'context'):
             del local.store.context
         rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+        self.msg_id_cache.check_duplicate_message(message_data)
         ctxt = unpack_context(self.conf, message_data)
         method = message_data.get('method')
         args = message_data.get('args', {})
@@ -406,9 +443,11 @@ class ProxyCallback(_ThreadPoolWithWait):
                        connection_pool=self.connection_pool,
                        log_failure=False)
         except Exception:
-            LOG.exception(_('Exception during message handling'))
-            ctxt.reply(None, sys.exc_info(),
-                       connection_pool=self.connection_pool)
+            # sys.exc_info() is deleted by LOG.exception().
+            exc_info = sys.exc_info()
+            LOG.error(_('Exception during message handling'),
+                      exc_info=exc_info)
+            ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
 
 
 class MulticallProxyWaiter(object):
@@ -422,6 +461,7 @@ class MulticallProxyWaiter(object):
         self._dataqueue = queue.LightQueue()
         # Add this caller to the reply proxy's call_waiters
         self._reply_proxy.add_call_waiter(self, self._msg_id)
+        self.msg_id_cache = _MsgIdCache()
 
     def put(self, data):
         self._dataqueue.put(data)
@@ -435,6 +475,7 @@ class MulticallProxyWaiter(object):
 
     def _process_data(self, data):
         result = None
+        self.msg_id_cache.check_duplicate_message(data)
         if data['failure']:
             failure = data['failure']
             result = rpc_common.deserialize_remote_exception(self._conf,
@@ -479,6 +520,7 @@ class MulticallWaiter(object):
         self._done = False
         self._got_ending = False
         self._conf = conf
+        self.msg_id_cache = _MsgIdCache()
 
     def done(self):
         if self._done:
@@ -490,6 +532,7 @@ class MulticallWaiter(object):
 
     def __call__(self, data):
         """The consume() callback will call this. Store the result."""
+        self.msg_id_cache.check_duplicate_message(data)
         if data['failure']:
             failure = data['failure']
             self._result = rpc_common.deserialize_remote_exception(self._conf,
@@ -542,6 +585,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
     LOG.debug(_('MSG_ID is %s') % (msg_id))
+    _add_unique_id(msg)
     pack_context(msg, context)
 
     # TODO(pekowski): Remove this flag and the code under the if clause
@@ -575,6 +619,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
 def cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a topic without waiting for a response."""
     LOG.debug(_('Making asynchronous cast on %s...'), topic)
+    _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
         conn.topic_send(topic, rpc_common.serialize_msg(msg))
@@ -583,6 +628,7 @@ def cast(conf, context, topic, msg, connection_pool):
 def fanout_cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a fanout exchange without waiting for a response."""
     LOG.debug(_('Making asynchronous fanout cast...'))
+    _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
         conn.fanout_send(topic, rpc_common.serialize_msg(msg))
@@ -590,6 +636,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
 
 def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
     """Sends a message on a topic to a specific server."""
+    _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool, pooled=False,
                            server_params=server_params) as conn:
@@ -599,6 +646,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
 def fanout_cast_to_server(conf, context,
                           server_params, topic, msg, connection_pool):
     """Sends a message on a fanout exchange to a specific server."""
+    _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool, pooled=False,
                            server_params=server_params) as conn:
@@ -610,6 +658,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
     LOG.debug(_('Sending %(event_type)s on %(topic)s'),
               dict(event_type=msg.get('event_type'),
                    topic=topic))
+    _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
         if envelope:
diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py
index 55fc5b044..5fca30717 100644
--- a/nova/openstack/common/rpc/common.py
+++ b/nova/openstack/common/rpc/common.py
@@ -125,6 +125,10 @@ class Timeout(RPCException):
     message = _("Timeout while waiting on RPC response.")
 
 
+class DuplicateMessageError(RPCException):
+    message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+
+
 class InvalidRPCConnectionReuse(RPCException):
     message = _("Invalid reuse of an RPC connection.")
 
diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py
index 0d83253f1..81afc2a8b 100644
--- a/nova/openstack/common/rpc/impl_kombu.py
+++ b/nova/openstack/common/rpc/impl_kombu.py
@@ -624,8 +624,8 @@ class Connection(object):
 
         def _error_callback(exc):
             if isinstance(exc, socket.timeout):
-                LOG.exception(_('Timed out waiting for RPC response: %s') %
-                              str(exc))
+                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                          str(exc))
                 raise rpc_common.Timeout()
             else:
                 LOG.exception(_('Failed to consume message from queue: %s') %
diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py
index 542256d0c..fd4b25e7f 100644
--- a/nova/openstack/common/rpc/impl_qpid.py
+++ b/nova/openstack/common/rpc/impl_qpid.py
@@ -415,8 +415,8 @@ class Connection(object):
 
         def _error_callback(exc):
             if isinstance(exc, qpid_exceptions.Empty):
-                LOG.exception(_('Timed out waiting for RPC response: %s') %
-                              str(exc))
+                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                          str(exc))
                 raise rpc_common.Timeout()
             else:
                 LOG.exception(_('Failed to consume message from queue: %s') %
diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py
index c1cca34e8..4102146fb 100644
--- a/nova/openstack/common/rpc/impl_zmq.py
+++ b/nova/openstack/common/rpc/impl_zmq.py
@@ -25,6 +25,7 @@ import eventlet
 import greenlet
 from oslo.config import cfg
 
+from nova.openstack.common import excutils
 from nova.openstack.common.gettextutils import _
 from nova.openstack.common import importutils
 from nova.openstack.common import jsonutils
@@ -91,8 +92,8 @@ def _serialize(data):
     try:
         return jsonutils.dumps(data, ensure_ascii=True)
     except TypeError:
-        LOG.error(_("JSON serialization failed."))
-        raise
+        with excutils.save_and_reraise_exception():
+            LOG.error(_("JSON serialization failed."))
 
 
 def _deserialize(data):
@@ -511,9 +512,9 @@ class ZmqProxy(ZmqBaseReactor):
                               ipc_dir, run_as_root=True)
                 utils.execute('chmod', '750', ipc_dir, run_as_root=True)
             except utils.ProcessExecutionError:
-                LOG.error(_("Could not create IPC directory %s") %
-                          (ipc_dir, ))
-                raise
+                with excutils.save_and_reraise_exception():
+                    LOG.error(_("Could not create IPC directory %s") %
+                              (ipc_dir, ))
 
         try:
             self.register(consumption_proxy,
@@ -521,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor):
                           zmq.PULL,
                           out_bind=True)
         except zmq.ZMQError:
-            LOG.error(_("Could not create ZeroMQ receiver daemon. "
-                        "Socket may already be in use."))
-            raise
+            with excutils.save_and_reraise_exception():
+                LOG.error(_("Could not create ZeroMQ receiver daemon. "
+                            "Socket may already be in use."))
 
         super(ZmqProxy, self).consume_in_thread()
 
@@ -594,6 +595,9 @@ class Connection(rpc_common.Connection):
         self.reactor = ZmqReactor(conf)
 
     def create_consumer(self, topic, proxy, fanout=False):
+        # Register with matchmaker.
+        _get_matchmaker().register(topic, CONF.rpc_zmq_host)
+
         # Subscription scenarios
         if fanout:
             sock_type = zmq.SUB
@@ -620,6 +624,10 @@ class Connection(rpc_common.Connection):
         self.topics.append(topic)
 
     def close(self):
+        _get_matchmaker().stop_heartbeat()
+        for topic in self.topics:
+            _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
+
         self.reactor.close()
         self.topics = []
 
@@ -627,6 +635,7 @@ class Connection(rpc_common.Connection):
         self.reactor.wait()
 
     def consume_in_thread(self):
+        _get_matchmaker().start_heartbeat()
         self.reactor.consume_in_thread()
 
 
@@ -742,7 +751,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
         LOG.warn(_("No matchmaker results. Not casting."))
         # While not strictly a timeout, callers know how to handle
         # this exception and a timeout isn't too big a lie.
-        raise rpc_common.Timeout, "No match from matchmaker."
+        raise rpc_common.Timeout(_("No match from matchmaker."))
 
     # This supports brokerless fanout (addresses > 1)
     for queue in queues:
@@ -785,7 +794,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
     _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
 
 
-def notify(conf, context, topic, msg, **kwargs):
+def notify(conf, context, topic, msg, envelope):
     """
     Send notification event.
     Notifications are sent to topic-priority.
@@ -793,9 +802,8 @@ def notify(conf, context, topic, msg, **kwargs):
     """
     # NOTE(ewindisch): dot-priority in rpc notifier does not
    # work with our assumptions.
-    topic.replace('.', '-')
-    kwargs['envelope'] = kwargs.get('envelope', True)
-    cast(conf, context, topic, msg, **kwargs)
+    topic = topic.replace('.', '-')
+    cast(conf, context, topic, msg, envelope=envelope)
 
 
 def cleanup():
diff --git a/nova/openstack/common/rpc/matchmaker.py b/nova/openstack/common/rpc/matchmaker.py
index 57cc0b34c..e4862396a 100644
--- a/nova/openstack/common/rpc/matchmaker.py
+++ b/nova/openstack/common/rpc/matchmaker.py
@@ -22,6 +22,7 @@ import contextlib
 import itertools
 import json
 
+import eventlet
 from oslo.config import cfg
 
 from nova.openstack.common.gettextutils import _
@@ -33,6 +34,12 @@ matchmaker_opts = [
     cfg.StrOpt('matchmaker_ringfile',
                default='/etc/nova/matchmaker_ring.json',
                help='Matchmaker ring file (JSON)'),
+    cfg.IntOpt('matchmaker_heartbeat_freq',
+               default='300',
+               help='Heartbeat frequency'),
+    cfg.IntOpt('matchmaker_heartbeat_ttl',
+               default='600',
+               help='Heartbeat time-to-live.'),
 ]
 
 CONF = cfg.CONF
@@ -70,12 +77,73 @@ class Binding(object):
 
 
 class MatchMakerBase(object):
-    """Match Maker Base Class."""
-
+    """
+    Match Maker Base Class.
+    Build off HeartbeatMatchMakerBase if building a
+    heartbeat-capable MatchMaker.
+    """
     def __init__(self):
         # Array of tuples. Index [2] toggles negation, [3] is last-if-true
         self.bindings = []
+        self.no_heartbeat_msg = _('Matchmaker does not implement '
+                                  'registration or heartbeat.')
+
+    def register(self, key, host):
+        """
+        Register a host on a backend.
+        Heartbeats, if applicable, may keepalive registration.
+        """
+        pass
+
+    def ack_alive(self, key, host):
+        """
+        Acknowledge that a key.host is alive.
+        Used internally for updating heartbeats,
+        but may also be used publically to acknowledge
+        a system is alive (i.e. rpc message successfully
+        sent to host)
+        """
+        pass
+
+    def is_alive(self, topic, host):
+        """
+        Checks if a host is alive.
+        """
+        pass
+
+    def expire(self, topic, host):
+        """
+        Explicitly expire a host's registration.
+        """
+        pass
+
+    def send_heartbeats(self):
+        """
+        Send all heartbeats.
+        Use start_heartbeat to spawn a heartbeat greenthread,
+        which loops this method.
+        """
+        pass
+
+    def unregister(self, key, host):
+        """
+        Unregister a topic.
+        """
+        pass
+
+    def start_heartbeat(self):
+        """
+        Spawn heartbeat greenthread.
+        """
+        pass
+
+    def stop_heartbeat(self):
+        """
+        Destroys the heartbeat greenthread.
+        """
+        pass
+
     def add_binding(self, binding, rule, last=True):
         self.bindings.append((binding, rule, False, last))
 
@@ -99,6 +167,103 @@ class MatchMakerBase(object):
         return workers
 
 
+class HeartbeatMatchMakerBase(MatchMakerBase):
+    """
+    Base for a heart-beat capable MatchMaker.
+    Provides common methods for registering,
+    unregistering, and maintaining heartbeats.
+    """
+    def __init__(self):
+        self.hosts = set()
+        self._heart = None
+        self.host_topic = {}
+
+        super(HeartbeatMatchMakerBase, self).__init__()
+
+    def send_heartbeats(self):
+        """
+        Send all heartbeats.
+        Use start_heartbeat to spawn a heartbeat greenthread,
+        which loops this method.
+        """
+        for key, host in self.host_topic:
+            self.ack_alive(key, host)
+
+    def ack_alive(self, key, host):
+        """
+        Acknowledge that a host.topic is alive.
+        Used internally for updating heartbeats,
+        but may also be used publically to acknowledge
+        a system is alive (i.e. rpc message successfully
+        sent to host)
+        """
+        raise NotImplementedError("Must implement ack_alive")
+
+    def backend_register(self, key, host):
+        """
+        Implements registration logic.
+        Called by register(self,key,host)
+        """
+        raise NotImplementedError("Must implement backend_register")
+
+    def backend_unregister(self, key, key_host):
+        """
+        Implements de-registration logic.
+        Called by unregister(self,key,host)
+        """
+        raise NotImplementedError("Must implement backend_unregister")
+
+    def register(self, key, host):
+        """
+        Register a host on a backend.
+        Heartbeats, if applicable, may keepalive registration.
+        """
+        self.hosts.add(host)
+        self.host_topic[(key, host)] = host
+        key_host = '.'.join((key, host))
+
+        self.backend_register(key, key_host)
+
+        self.ack_alive(key, host)
+
+    def unregister(self, key, host):
+        """
+        Unregister a topic.
+        """
+        if (key, host) in self.host_topic:
+            del self.host_topic[(key, host)]
+
+        self.hosts.discard(host)
+        self.backend_unregister(key, '.'.join((key, host)))
+
+        LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
+
+    def start_heartbeat(self):
+        """
+        Implementation of MatchMakerBase.start_heartbeat
+        Launches greenthread looping send_heartbeats(),
+        yielding for CONF.matchmaker_heartbeat_freq seconds
+        between iterations.
+        """
+        if len(self.hosts) == 0:
+            raise MatchMakerException(
+                _("Register before starting heartbeat."))
+
+        def do_heartbeat():
+            while True:
+                self.send_heartbeats()
+                eventlet.sleep(CONF.matchmaker_heartbeat_freq)
+
+        self._heart = eventlet.spawn(do_heartbeat)
+
+    def stop_heartbeat(self):
+        """
+        Destroys the heartbeat greenthread.
+        """
+        if self._heart:
+            self._heart.kill()
+
+
 class DirectBinding(Binding):
     """
     Specifies a host in the key via a '.' character
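
The matchmaker.py hunk above only adds the abstract HeartbeatMatchMakerBase; a concrete backend still has to supply `ack_alive`, `backend_register` and `backend_unregister` (the Redis-based matchmaker named in the commit message is the intended implementation, added in a separate module). The sketch below is a hypothetical in-memory backend, written standalone so it runs without oslo or eventlet, with a simple TTL standing in for what a Redis backend would do with key expiry; the class name and its attributes are invented for illustration.

```python
import time


class InMemoryHeartbeatMatchMaker(object):
    """Hypothetical in-memory backend showing the HeartbeatMatchMakerBase
    contract: register() stores a key.host binding, ack_alive() refreshes its
    expiry, and is_alive() consults the stored deadline.
    """

    HEARTBEAT_TTL = 600  # seconds, mirroring matchmaker_heartbeat_ttl

    def __init__(self):
        self.host_topic = {}   # (key, host) -> host, as in the base class
        self.expiry = {}       # 'key.host' -> absolute expiry time

    def backend_register(self, key, key_host):
        # A Redis backend would SADD key_host into the set named by key here.
        self.expiry[key_host] = time.time() + self.HEARTBEAT_TTL

    def backend_unregister(self, key, key_host):
        self.expiry.pop(key_host, None)

    def ack_alive(self, key, host):
        # Refresh the deadline; a Redis backend would EXPIRE the key instead.
        self.expiry['.'.join((key, host))] = time.time() + self.HEARTBEAT_TTL

    def register(self, key, host):
        self.host_topic[(key, host)] = host
        self.backend_register(key, '.'.join((key, host)))
        self.ack_alive(key, host)

    def unregister(self, key, host):
        self.host_topic.pop((key, host), None)
        self.backend_unregister(key, '.'.join((key, host)))

    def send_heartbeats(self):
        # The real base class loops this from a greenthread spawned by
        # start_heartbeat(); here it is simply called directly.
        for key, host in self.host_topic:
            self.ack_alive(key, host)

    def is_alive(self, key, host):
        deadline = self.expiry.get('.'.join((key, host)))
        return deadline is not None and deadline > time.time()


# Register a topic consumer, heartbeat it, then drop it.
mm = InMemoryHeartbeatMatchMaker()
mm.register('scheduler', 'host-1')
mm.send_heartbeats()
print(mm.is_alive('scheduler', 'host-1'))   # True
mm.unregister('scheduler', 'host-1')
print(mm.is_alive('scheduler', 'host-1'))   # False
```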