summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/common.py
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2013-02-09 00:57:47 -0500
committerEric Windisch <eric@cloudscaling.com>2013-02-19 14:43:19 -0500
commita60367820d826c536018b036480f7061b814f1a1 (patch)
tree76c6c7280e135c8a008d1eef27843356d1acdc86 /openstack/common/rpc/common.py
parent70891c271e011f59792933eaf65c3214493ef14a (diff)
downloadoslo-a60367820d826c536018b036480f7061b814f1a1.tar.gz
oslo-a60367820d826c536018b036480f7061b814f1a1.tar.xz
oslo-a60367820d826c536018b036480f7061b814f1a1.zip
Implement replay detection.
Bumps the envelope revision to 2.1 Change-Id: Ib0260a0c62e3d312d2e3448a125bed64d861319e
Diffstat (limited to 'openstack/common/rpc/common.py')
-rw-r--r--openstack/common/rpc/common.py81
1 files changed, 62 insertions, 19 deletions
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index 6c52bd8..14489c8 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -17,9 +17,11 @@
# License for the specific language governing permissions and limitations
# under the License.
+import collections
import copy
import sys
import traceback
+import uuid
from oslo.config import cfg
@@ -46,33 +48,40 @@ This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
-The current message format (version 2.0) is very simple. It is:
+The current message format (version 2.1) is very simple. It is:
{
'oslo.version': <RPC Envelope Version as a String>,
'oslo.message': <Application Message Payload, JSON encoded>
+ 'oslo.nonce': <Unique message identifier>
}
Message format version '1.0' is just considered to be the messages we sent
without a message envelope.
-So, the current message envelope just includes the envelope version. It may
-eventually contain additional information, such as a signature for the message
-payload.
+Message format version '2.0' sent oslo.message containing a JSON encoded
+Application Message Payload without Hashed Parameters.
-We will JSON encode the application message payload. The message envelope,
+The message format is intended eventually contain additional information,
+such as a signature for the message payload.
+
+We will JSON encode the application message payload. The message,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
-_RPC_ENVELOPE_VERSION = '2.0'
+_RPC_ENVELOPE_VERSION = '2.1'
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
+_NONCE_KEY = 'oslo.nonce'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
+DUP_MSG_CHECK_SIZE = 512 # Arbitrary - make configurable.
+SEEN_MSGS = collections.deque([], maxlen=DUP_MSG_CHECK_SIZE)
+
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@@ -125,6 +134,10 @@ class Timeout(RPCException):
message = _("Timeout while waiting on RPC response.")
+class DuplicatedMessageError(RPCException):
+ message = _("Received replayed message(%(msg_id)s). Ignoring.")
+
+
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
@@ -139,6 +152,10 @@ class UnsupportedRpcEnvelopeVersion(RPCException):
"not supported by this endpoint.")
+class InvalidRpcEnvelope(RPCException):
+ message = _("RPC envelope was malformed.")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
@@ -438,17 +455,32 @@ def version_is_compatible(imp_version, version):
def serialize_msg(raw_msg, force_envelope=False):
+ msg_identifier = uuid.uuid4().hex
+
if not _SEND_RPC_ENVELOPE and not force_envelope:
+ if isinstance(raw_msg, dict):
+ raw_msg['_nonce'] = msg_identifier
return raw_msg
+ """Make an RPC message envelope"""
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
- _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+ _MESSAGE_KEY: jsonutils.dumps(raw_msg),
+ _NONCE_KEY: msg_identifier}
return msg
+def _raise_if_duplicate(duplicate_key):
+ """Check if a message is a duplicate based on key."""
+ if not duplicate_key:
+ return
+ if duplicate_key in SEEN_MSGS:
+ raise DuplicatedMessageError(duplicate_key)
+ SEEN_MSGS.append(duplicate_key)
+
+
def deserialize_msg(msg):
# NOTE(russellb): Hang on to your hats, this road is about to
# get a little bumpy.
@@ -473,21 +505,32 @@ def deserialize_msg(msg):
# This case covers return values from rpc.call() from before message
# envelopes were used. (messages to call a method were always a dict)
+ has_envelope = True
+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not isinstance(msg, dict):
# See #2 above.
return msg
-
- base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
- if not all(map(lambda key: key in msg, base_envelope_keys)):
- # See #1.b above.
- return msg
-
- # At this point we think we have the message envelope
- # format we were expecting. (#1.a above)
-
- if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+ elif not all(map(lambda key: key in msg, base_envelope_keys)):
+ # See #1.b above.
+ has_envelope = False
+ elif not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
-
- raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+ nonce = None
+ raw_msg = None
+
+ if has_envelope and '_NONCE_KEY' in msg: # envelope v2.1
+ _raise_if_duplicate(msg[_NONCE_KEY])
+
+ # Here, we can delay jsonutils.loads until
+ # after we have verified the message.
+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+ elif has_envelope: # envelope v2.0
+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+ nonce = raw_msg.get('_nonce')
+ _raise_if_duplicate(nonce)
+ else: # no envelope ("v1.0")
+ raw_msg = msg
+ nonce = raw_msg.get('_nonce')
+ _raise_if_duplicate(nonce)
return raw_msg