From a60367820d826c536018b036480f7061b814f1a1 Mon Sep 17 00:00:00 2001 From: Eric Windisch Date: Sat, 9 Feb 2013 00:57:47 -0500 Subject: Implement replay detection. Bumps the envelope revision to 2.1 Change-Id: Ib0260a0c62e3d312d2e3448a125bed64d861319e --- openstack/common/rpc/common.py | 81 ++++++++++++++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 19 deletions(-) (limited to 'openstack/common/rpc/common.py') diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index 6c52bd8..14489c8 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -17,9 +17,11 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import copy import sys import traceback +import uuid from oslo.config import cfg @@ -46,33 +48,40 @@ This version number applies to the message envelope that is used in the serialization done inside the rpc layer. See serialize_msg() and deserialize_msg(). -The current message format (version 2.0) is very simple. It is: +The current message format (version 2.1) is very simple. It is: { 'oslo.version': , 'oslo.message': + 'oslo.nonce': } Message format version '1.0' is just considered to be the messages we sent without a message envelope. -So, the current message envelope just includes the envelope version. It may -eventually contain additional information, such as a signature for the message -payload. +Message format version '2.0' sent oslo.message containing a JSON encoded +Application Message Payload without Hashed Parameters. -We will JSON encode the application message payload. The message envelope, +The message format is intended eventually contain additional information, +such as a signature for the message payload. + +We will JSON encode the application message payload. The message, which includes the JSON encoded application message body, will be passed down to the messaging libraries as a dict. ''' -_RPC_ENVELOPE_VERSION = '2.0' +_RPC_ENVELOPE_VERSION = '2.1' _VERSION_KEY = 'oslo.version' _MESSAGE_KEY = 'oslo.message' +_NONCE_KEY = 'oslo.nonce' # TODO(russellb) Turn this on after Grizzly. _SEND_RPC_ENVELOPE = False +DUP_MSG_CHECK_SIZE = 512 # Arbitrary - make configurable. +SEEN_MSGS = collections.deque([], maxlen=DUP_MSG_CHECK_SIZE) + class RPCException(Exception): message = _("An unknown RPC related exception occurred.") @@ -125,6 +134,10 @@ class Timeout(RPCException): message = _("Timeout while waiting on RPC response.") +class DuplicatedMessageError(RPCException): + message = _("Received replayed message(%(msg_id)s). Ignoring.") + + class InvalidRPCConnectionReuse(RPCException): message = _("Invalid reuse of an RPC connection.") @@ -139,6 +152,10 @@ class UnsupportedRpcEnvelopeVersion(RPCException): "not supported by this endpoint.") +class InvalidRpcEnvelope(RPCException): + message = _("RPC envelope was malformed.") + + class Connection(object): """A connection, returned by rpc.create_connection(). @@ -438,17 +455,32 @@ def version_is_compatible(imp_version, version): def serialize_msg(raw_msg, force_envelope=False): + msg_identifier = uuid.uuid4().hex + if not _SEND_RPC_ENVELOPE and not force_envelope: + if isinstance(raw_msg, dict): + raw_msg['_nonce'] = msg_identifier return raw_msg + """Make an RPC message envelope""" # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more # information about this format. msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, - _MESSAGE_KEY: jsonutils.dumps(raw_msg)} + _MESSAGE_KEY: jsonutils.dumps(raw_msg), + _NONCE_KEY: msg_identifier} return msg +def _raise_if_duplicate(duplicate_key): + """Check if a message is a duplicate based on key.""" + if not duplicate_key: + return + if duplicate_key in SEEN_MSGS: + raise DuplicatedMessageError(duplicate_key) + SEEN_MSGS.append(duplicate_key) + + def deserialize_msg(msg): # NOTE(russellb): Hang on to your hats, this road is about to # get a little bumpy. @@ -473,21 +505,32 @@ def deserialize_msg(msg): # This case covers return values from rpc.call() from before message # envelopes were used. (messages to call a method were always a dict) + has_envelope = True + base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY) if not isinstance(msg, dict): # See #2 above. return msg - - base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY) - if not all(map(lambda key: key in msg, base_envelope_keys)): - # See #1.b above. - return msg - - # At this point we think we have the message envelope - # format we were expecting. (#1.a above) - - if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): + elif not all(map(lambda key: key in msg, base_envelope_keys)): + # See #1.b above. + has_envelope = False + elif not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY]) - - raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + nonce = None + raw_msg = None + + if has_envelope and '_NONCE_KEY' in msg: # envelope v2.1 + _raise_if_duplicate(msg[_NONCE_KEY]) + + # Here, we can delay jsonutils.loads until + # after we have verified the message. + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + elif has_envelope: # envelope v2.0 + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + nonce = raw_msg.get('_nonce') + _raise_if_duplicate(nonce) + else: # no envelope ("v1.0") + raw_msg = msg + nonce = raw_msg.get('_nonce') + _raise_if_duplicate(nonce) return raw_msg -- cgit