diff options
author | Eric Windisch <eric@cloudscaling.com> | 2013-02-09 00:57:47 -0500 |
---|---|---|
committer | Eric Windisch <eric@cloudscaling.com> | 2013-02-19 14:43:19 -0500 |
commit | a60367820d826c536018b036480f7061b814f1a1 (patch) | |
tree | 76c6c7280e135c8a008d1eef27843356d1acdc86 | |
parent | 70891c271e011f59792933eaf65c3214493ef14a (diff) | |
download | oslo-a60367820d826c536018b036480f7061b814f1a1.tar.gz oslo-a60367820d826c536018b036480f7061b814f1a1.tar.xz oslo-a60367820d826c536018b036480f7061b814f1a1.zip |
Implement replay detection.
Bumps the envelope revision to 2.1
Change-Id: Ib0260a0c62e3d312d2e3448a125bed64d861319e
-rw-r--r-- | openstack/common/rpc/common.py | 81 | ||||
-rw-r--r-- | tests/unit/rpc/amqp.py | 8 | ||||
-rw-r--r-- | tests/unit/rpc/test_common.py | 32 |
3 files changed, 99 insertions, 22 deletions
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index 6c52bd8..14489c8 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -17,9 +17,11 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import copy import sys import traceback +import uuid from oslo.config import cfg @@ -46,33 +48,40 @@ This version number applies to the message envelope that is used in the serialization done inside the rpc layer. See serialize_msg() and deserialize_msg(). -The current message format (version 2.0) is very simple. It is: +The current message format (version 2.1) is very simple. It is: { 'oslo.version': <RPC Envelope Version as a String>, 'oslo.message': <Application Message Payload, JSON encoded> + 'oslo.nonce': <Unique message identifier> } Message format version '1.0' is just considered to be the messages we sent without a message envelope. -So, the current message envelope just includes the envelope version. It may -eventually contain additional information, such as a signature for the message -payload. +Message format version '2.0' sent oslo.message containing a JSON encoded +Application Message Payload without Hashed Parameters. -We will JSON encode the application message payload. The message envelope, +The message format is intended eventually contain additional information, +such as a signature for the message payload. + +We will JSON encode the application message payload. The message, which includes the JSON encoded application message body, will be passed down to the messaging libraries as a dict. ''' -_RPC_ENVELOPE_VERSION = '2.0' +_RPC_ENVELOPE_VERSION = '2.1' _VERSION_KEY = 'oslo.version' _MESSAGE_KEY = 'oslo.message' +_NONCE_KEY = 'oslo.nonce' # TODO(russellb) Turn this on after Grizzly. _SEND_RPC_ENVELOPE = False +DUP_MSG_CHECK_SIZE = 512 # Arbitrary - make configurable. +SEEN_MSGS = collections.deque([], maxlen=DUP_MSG_CHECK_SIZE) + class RPCException(Exception): message = _("An unknown RPC related exception occurred.") @@ -125,6 +134,10 @@ class Timeout(RPCException): message = _("Timeout while waiting on RPC response.") +class DuplicatedMessageError(RPCException): + message = _("Received replayed message(%(msg_id)s). Ignoring.") + + class InvalidRPCConnectionReuse(RPCException): message = _("Invalid reuse of an RPC connection.") @@ -139,6 +152,10 @@ class UnsupportedRpcEnvelopeVersion(RPCException): "not supported by this endpoint.") +class InvalidRpcEnvelope(RPCException): + message = _("RPC envelope was malformed.") + + class Connection(object): """A connection, returned by rpc.create_connection(). @@ -438,17 +455,32 @@ def version_is_compatible(imp_version, version): def serialize_msg(raw_msg, force_envelope=False): + msg_identifier = uuid.uuid4().hex + if not _SEND_RPC_ENVELOPE and not force_envelope: + if isinstance(raw_msg, dict): + raw_msg['_nonce'] = msg_identifier return raw_msg + """Make an RPC message envelope""" # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more # information about this format. msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, - _MESSAGE_KEY: jsonutils.dumps(raw_msg)} + _MESSAGE_KEY: jsonutils.dumps(raw_msg), + _NONCE_KEY: msg_identifier} return msg +def _raise_if_duplicate(duplicate_key): + """Check if a message is a duplicate based on key.""" + if not duplicate_key: + return + if duplicate_key in SEEN_MSGS: + raise DuplicatedMessageError(duplicate_key) + SEEN_MSGS.append(duplicate_key) + + def deserialize_msg(msg): # NOTE(russellb): Hang on to your hats, this road is about to # get a little bumpy. @@ -473,21 +505,32 @@ def deserialize_msg(msg): # This case covers return values from rpc.call() from before message # envelopes were used. (messages to call a method were always a dict) + has_envelope = True + base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY) if not isinstance(msg, dict): # See #2 above. return msg - - base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY) - if not all(map(lambda key: key in msg, base_envelope_keys)): - # See #1.b above. - return msg - - # At this point we think we have the message envelope - # format we were expecting. (#1.a above) - - if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): + elif not all(map(lambda key: key in msg, base_envelope_keys)): + # See #1.b above. + has_envelope = False + elif not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY]) - - raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + nonce = None + raw_msg = None + + if has_envelope and '_NONCE_KEY' in msg: # envelope v2.1 + _raise_if_duplicate(msg[_NONCE_KEY]) + + # Here, we can delay jsonutils.loads until + # after we have verified the message. + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + elif has_envelope: # envelope v2.0 + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + nonce = raw_msg.get('_nonce') + _raise_if_duplicate(nonce) + else: # no envelope ("v1.0") + raw_msg = msg + nonce = raw_msg.get('_nonce') + _raise_if_duplicate(nonce) return raw_msg diff --git a/tests/unit/rpc/amqp.py b/tests/unit/rpc/amqp.py index 1e4733c..c7215ea 100644 --- a/tests/unit/rpc/amqp.py +++ b/tests/unit/rpc/amqp.py @@ -100,14 +100,18 @@ class BaseRpcAMQPTestCase(common.BaseRpcTestCase): } self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg, envelope=True) - self.assertEqual(self.test_msg, msg) + for k, v in msg.items(): + self.assertIn(k, self.test_msg) + self.assertEqual(self.test_msg[k], v) # Make sure envelopes are still on notifications, even if turned off # for general messages. self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', False) self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg, envelope=True) - self.assertEqual(self.test_msg, msg) + for k, v in msg.items(): + self.assertIn(k, self.test_msg) + self.assertEqual(self.test_msg[k], v) def test_single_reply_queue_on_has_ids( self, single_reply_queue_for_callee_off=False): diff --git a/tests/unit/rpc/test_common.py b/tests/unit/rpc/test_common.py index 976da84..b4d4cf4 100644 --- a/tests/unit/rpc/test_common.py +++ b/tests/unit/rpc/test_common.py @@ -250,10 +250,40 @@ class RpcCommonTestCase(test_utils.BaseTestCase): 'oslo.message': jsonutils.dumps(msg)} serialized = rpc_common.serialize_msg(msg) - self.assertEqual(s_msg, rpc_common.serialize_msg(msg)) + for k, v in s_msg.items(): + self.assertIn(k, serialized) + self.assertEqual(serialized[k], v) self.assertEqual(msg, rpc_common.deserialize_msg(serialized)) + def test_serialize_msg_v2_1(self): + self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True) + msg = {'foo': 'bar'} + s_msg = {'oslo.version': rpc_common._RPC_ENVELOPE_VERSION, + 'oslo.message': jsonutils.dumps(msg), + 'oslo.nonce': ''} + serialized = rpc_common.serialize_msg(msg) + + for k, v in s_msg.items(): + self.assertIn(k, serialized) + + if k == 'oslo.nonce': + # This key's value is set by serialize_msg + re_uuid = re.compile(r'[0-9a-f]{32}$', re.I) + self.assertTrue(re_uuid.match(serialized[k])) + continue + + self.assertEqual(serialized[k], v) + + self.assertEqual(msg, rpc_common.deserialize_msg(serialized)) + + def test_serialize_msg_v2_1(self): + self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True) + msg = {'foo': 'bar'} + serialized = rpc_common.serialize_msg(msg) + self.assertIn('oslo.nonce', serialized) + self.assertEqual(msg, rpc_common.deserialize_msg(serialized)) + def test_deserialize_msg_no_envelope(self): self.assertEqual(1, rpc_common.deserialize_msg(1)) self.assertEqual([], rpc_common.deserialize_msg([])) |