summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorMark McLoughlin <markmc@redhat.com>2013-02-20 23:08:50 +0000
committerMark McLoughlin <markmc@redhat.com>2013-02-20 23:16:22 +0000
commit6f9cef85353155538fe14e5025e2d0bab5cc63e4 (patch)
tree23598db753266a1a4f167c725d267a5d8b74ae59 /openstack
parent7e6513dedc58577c187d69d576659bc7f2ed7a87 (diff)
downloadoslo-6f9cef85353155538fe14e5025e2d0bab5cc63e4.tar.gz
oslo-6f9cef85353155538fe14e5025e2d0bab5cc63e4.tar.xz
oslo-6f9cef85353155538fe14e5025e2d0bab5cc63e4.zip
Revert "Implement replay detection."
This reverts Ib0260a0c62e3d312d2e3448a125bed64d861319e (commit a603678) The issue we're trying to fix here is bug #1107064 - when using mirrored queues with AMQP, acks can be lost while a master is failing over to a slace causing the new slave to re-send messages which had previously been acked. The "replay detection" code applies to more than just amqp and also has the appearance of a security measure (e.g. the use of the term 'nonce') when clearly it serves no security purpose until we actually have message signing. Revert the "replay detection" approach in favour of the more targetted amqp bugfix. Change-Id: I8b8d15835c8b4c85cd388f5df08b60ff4c74e38d
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/common.py81
1 files changed, 19 insertions, 62 deletions
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index 14489c8..6c52bd8 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -17,11 +17,9 @@
# License for the specific language governing permissions and limitations
# under the License.
-import collections
import copy
import sys
import traceback
-import uuid
from oslo.config import cfg
@@ -48,40 +46,33 @@ This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
-The current message format (version 2.1) is very simple. It is:
+The current message format (version 2.0) is very simple. It is:
{
'oslo.version': <RPC Envelope Version as a String>,
'oslo.message': <Application Message Payload, JSON encoded>
- 'oslo.nonce': <Unique message identifier>
}
Message format version '1.0' is just considered to be the messages we sent
without a message envelope.
-Message format version '2.0' sent oslo.message containing a JSON encoded
-Application Message Payload without Hashed Parameters.
+So, the current message envelope just includes the envelope version. It may
+eventually contain additional information, such as a signature for the message
+payload.
-The message format is intended eventually contain additional information,
-such as a signature for the message payload.
-
-We will JSON encode the application message payload. The message,
+We will JSON encode the application message payload. The message envelope,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
-_RPC_ENVELOPE_VERSION = '2.1'
+_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
-_NONCE_KEY = 'oslo.nonce'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
-DUP_MSG_CHECK_SIZE = 512 # Arbitrary - make configurable.
-SEEN_MSGS = collections.deque([], maxlen=DUP_MSG_CHECK_SIZE)
-
class RPCException(Exception):
message = _("An unknown RPC related exception occurred.")
@@ -134,10 +125,6 @@ class Timeout(RPCException):
message = _("Timeout while waiting on RPC response.")
-class DuplicatedMessageError(RPCException):
- message = _("Received replayed message(%(msg_id)s). Ignoring.")
-
-
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")
@@ -152,10 +139,6 @@ class UnsupportedRpcEnvelopeVersion(RPCException):
"not supported by this endpoint.")
-class InvalidRpcEnvelope(RPCException):
- message = _("RPC envelope was malformed.")
-
-
class Connection(object):
"""A connection, returned by rpc.create_connection().
@@ -455,32 +438,17 @@ def version_is_compatible(imp_version, version):
def serialize_msg(raw_msg, force_envelope=False):
- msg_identifier = uuid.uuid4().hex
-
if not _SEND_RPC_ENVELOPE and not force_envelope:
- if isinstance(raw_msg, dict):
- raw_msg['_nonce'] = msg_identifier
return raw_msg
- """Make an RPC message envelope"""
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
- _MESSAGE_KEY: jsonutils.dumps(raw_msg),
- _NONCE_KEY: msg_identifier}
+ _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
return msg
-def _raise_if_duplicate(duplicate_key):
- """Check if a message is a duplicate based on key."""
- if not duplicate_key:
- return
- if duplicate_key in SEEN_MSGS:
- raise DuplicatedMessageError(duplicate_key)
- SEEN_MSGS.append(duplicate_key)
-
-
def deserialize_msg(msg):
# NOTE(russellb): Hang on to your hats, this road is about to
# get a little bumpy.
@@ -505,32 +473,21 @@ def deserialize_msg(msg):
# This case covers return values from rpc.call() from before message
# envelopes were used. (messages to call a method were always a dict)
- has_envelope = True
- base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not isinstance(msg, dict):
# See #2 above.
return msg
- elif not all(map(lambda key: key in msg, base_envelope_keys)):
- # See #1.b above.
- has_envelope = False
- elif not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
+
+ base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
+ if not all(map(lambda key: key in msg, base_envelope_keys)):
+ # See #1.b above.
+ return msg
+
+ # At this point we think we have the message envelope
+ # format we were expecting. (#1.a above)
+
+ if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
- nonce = None
- raw_msg = None
-
- if has_envelope and '_NONCE_KEY' in msg: # envelope v2.1
- _raise_if_duplicate(msg[_NONCE_KEY])
-
- # Here, we can delay jsonutils.loads until
- # after we have verified the message.
- raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
- elif has_envelope: # envelope v2.0
- raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
- nonce = raw_msg.get('_nonce')
- _raise_if_duplicate(nonce)
- else: # no envelope ("v1.0")
- raw_msg = msg
- nonce = raw_msg.get('_nonce')
- _raise_if_duplicate(nonce)
+
+ raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg