summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorKei Masumoto <masumotok@nttdata.co.jp>2013-01-28 02:49:32 +0900
committerKei Masumoto <masumotok@nttdata.co.jp>2013-02-22 13:41:08 -0800
commitc312fd6742c035677347a6987fb17b67e2c72087 (patch)
treee81470a8968523d4211ac85e9338291f6dfb1027 /openstack
parent2b418be864a5aa5ba135f7651e83051cf3bf9ce6 (diff)
downloadoslo-c312fd6742c035677347a6987fb17b67e2c72087.tar.gz
oslo-c312fd6742c035677347a6987fb17b67e2c72087.tar.xz
oslo-c312fd6742c035677347a6987fb17b67e2c72087.zip
AMQP: skip duplicate messages
When using rabbit's mirrored queues or qpid's replicated queues, there are conditions under which you can receive the same message twice. One such condition is where a message has been sent to a consumer but before an ack is received by a consumer, the master fails over to a slave and the slave resends the message. Note that the consumer may have sent the ack, but it was lost as the master went down. Dispatching the same message twice is obviously something we want to avoid. In order to do so, we add a unique_id to each message sent and have consumers maintain a fixed length queue of recently seen unique message IDs. Before dispatching any received message, the queue is checked and the message is skipped if it is a duplicate. Fixes bugs 1107064. Change-Id: I5bfacadbdf7de8b34d6370b9aa869c271957692d
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/amqp.py46
-rw-r--r--openstack/common/rpc/common.py4
2 files changed, 50 insertions, 0 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 98cfd9d..2ae7af2 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -25,6 +25,7 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
AMQP, but is deprecated and predates this code.
"""
+import collections
import inspect
import sys
import uuid
@@ -54,6 +55,7 @@ amqp_opts = [
cfg.CONF.register_opts(amqp_opts)
+UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
@@ -236,6 +238,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
'failure': failure}
if ending:
msg['ending'] = True
+ _add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibilty.
@@ -302,6 +305,37 @@ def pack_context(msg, context):
msg.update(context_d)
+class _MsgIdCache(object):
+ """This class checks any duplicate messages."""
+
+ # NOTE: This value is considered can be a configuration item, but
+ # it is not necessary to change its value in most cases,
+ # so let this value as static for now.
+ DUP_MSG_CHECK_SIZE = 16
+
+ def __init__(self, **kwargs):
+ self.prev_msgids = collections.deque([],
+ maxlen=self.DUP_MSG_CHECK_SIZE)
+
+ def check_duplicate_message(self, message_data):
+ """AMQP consumers may read same message twice when exceptions occur
+ before ack is returned. This method prevents doing it.
+ """
+ if UNIQUE_ID in message_data:
+ msg_id = message_data[UNIQUE_ID]
+ if msg_id not in self.prev_msgids:
+ self.prev_msgids.append(msg_id)
+ else:
+ raise rpc_common.DuplicateMessageError(msg_id=msg_id)
+
+
+def _add_unique_id(msg):
+ """Add unique_id for checking duplicate messages."""
+ unique_id = uuid.uuid4().hex
+ msg.update({UNIQUE_ID: unique_id})
+ LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+
+
class _ThreadPoolWithWait(object):
"""Base class for a delayed invocation manager used by
the Connection class to start up green threads
@@ -349,6 +383,7 @@ class ProxyCallback(_ThreadPoolWithWait):
connection_pool=connection_pool,
)
self.proxy = proxy
+ self.msg_id_cache = _MsgIdCache()
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
@@ -368,6 +403,7 @@ class ProxyCallback(_ThreadPoolWithWait):
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+ self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
@@ -422,6 +458,7 @@ class MulticallProxyWaiter(object):
self._dataqueue = queue.LightQueue()
# Add this caller to the reply proxy's call_waiters
self._reply_proxy.add_call_waiter(self, self._msg_id)
+ self.msg_id_cache = _MsgIdCache()
def put(self, data):
self._dataqueue.put(data)
@@ -435,6 +472,7 @@ class MulticallProxyWaiter(object):
def _process_data(self, data):
result = None
+ self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
result = rpc_common.deserialize_remote_exception(self._conf,
@@ -479,6 +517,7 @@ class MulticallWaiter(object):
self._done = False
self._got_ending = False
self._conf = conf
+ self.msg_id_cache = _MsgIdCache()
def done(self):
if self._done:
@@ -490,6 +529,7 @@ class MulticallWaiter(object):
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
+ self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
@@ -542,6 +582,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
+ _add_unique_id(msg)
pack_context(msg, context)
# TODO(pekowski): Remove this flag and the code under the if clause
@@ -575,6 +616,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg))
@@ -583,6 +625,7 @@ def cast(conf, context, topic, msg, connection_pool):
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
@@ -590,6 +633,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@@ -599,6 +643,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
def fanout_cast_to_server(conf, context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@@ -610,6 +655,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
+ _add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index 6c52bd8..5661bef 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -125,6 +125,10 @@ class Timeout(RPCException):
message = _("Timeout while waiting on RPC response.")
+class DuplicateMessageError(RPCException):
+ message = _("Found duplicate message(%(msg_id)s). Skipping it.")
+
+
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")