diff options
| author | Kei Masumoto <masumotok@nttdata.co.jp> | 2013-01-28 02:49:32 +0900 |
|---|---|---|
| committer | Kei Masumoto <masumotok@nttdata.co.jp> | 2013-02-22 13:41:08 -0800 |
| commit | c312fd6742c035677347a6987fb17b67e2c72087 (patch) | |
| tree | e81470a8968523d4211ac85e9338291f6dfb1027 /tests | |
| parent | 2b418be864a5aa5ba135f7651e83051cf3bf9ce6 (diff) | |
| download | oslo-c312fd6742c035677347a6987fb17b67e2c72087.tar.gz oslo-c312fd6742c035677347a6987fb17b67e2c72087.tar.xz oslo-c312fd6742c035677347a6987fb17b67e2c72087.zip | |
AMQP: skip duplicate messages
When using rabbit's mirrored queues or qpid's replicated queues, there
are conditions under which you can receive the same message twice.
One such condition is where a message has been sent to a consumer but
before an ack is received by a consumer, the master fails over to a
slave and the slave resends the message. Note that the consumer may have
sent the ack, but it was lost as the master went down.
Dispatching the same message twice is obviously something we want to
avoid. In order to do so, we add a unique_id to each message sent and
have consumers maintain a fixed length queue of recently seen unique
message IDs. Before dispatching any received message, the queue is
checked and the message is skipped if it is a duplicate.
Fixes bugs 1107064.
Change-Id: I5bfacadbdf7de8b34d6370b9aa869c271957692d
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/unit/rpc/amqp.py | 32 |
1 files changed, 32 insertions, 0 deletions
diff --git a/tests/unit/rpc/amqp.py b/tests/unit/rpc/amqp.py index 1e4733c..4a96ce0 100644 --- a/tests/unit/rpc/amqp.py +++ b/tests/unit/rpc/amqp.py @@ -81,6 +81,11 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): def fake_notify_send(_conn, topic, msg): self.test_msg = msg + def remove_unique_id(msg): + oslo_msg = jsonutils.loads(msg['oslo.message']) + oslo_msg.pop('_unique_id') + msg['oslo.message'] = jsonutils.dumps(oslo_msg) + self.stubs.Set(self.rpc.Connection, 'notify_send', fake_notify_send) self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg, @@ -100,6 +105,8 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): } self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg, envelope=True) + remove_unique_id(self.test_msg) + remove_unique_id(msg) self.assertEqual(self.test_msg, msg) # Make sure envelopes are still on notifications, even if turned off @@ -107,6 +114,7 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', False) self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg, envelope=True) + remove_unique_id(self.test_msg) self.assertEqual(self.test_msg, msg) def test_single_reply_queue_on_has_ids( @@ -233,3 +241,27 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): self.config(amqp_rpc_single_reply_queue=True) self.test_multithreaded_resp_routing() self.config(amqp_rpc_single_reply_queue=False) + + def test_duplicate_message_check(self): + """Test sending *not-dict* to a topic exchange/queue""" + + conn = self.rpc.create_connection(FLAGS) + message = {'args': 'topic test message', '_unique_id': 'aaaabbbbcccc'} + + self.received_message = None + cache = rpc_amqp._MsgIdCache() + self.exc_raised = False + + def _callback(message): + try: + cache.check_duplicate_message(message) + except rpc_common.DuplicateMessageError: + self.exc_raised = True + + conn.declare_topic_consumer('a_topic', _callback) + conn.topic_send('a_topic', rpc_common.serialize_msg(message)) + conn.topic_send('a_topic', rpc_common.serialize_msg(message)) + conn.consume(limit=2) + conn.close() + + self.assertTrue(self.exc_raised) |
