diff options
| author | Simo Sorce <simo@redhat.com> | 2013-04-24 22:57:12 -0400 |
|---|---|---|
| committer | Simo Sorce <simo@redhat.com> | 2013-08-19 14:47:16 -0400 |
| commit | 85b975284947a4fc478df3c691d764d0172733e9 (patch) | |
| tree | acc5d532c60c90ded869b19ee9786d583eb7b0b5 /openstack/common | |
| parent | 71a62b2cbc489470c96ac771392270b6058ddaa1 (diff) | |
| download | oslo-85b975284947a4fc478df3c691d764d0172733e9.tar.gz oslo-85b975284947a4fc478df3c691d764d0172733e9.tar.xz oslo-85b975284947a4fc478df3c691d764d0172733e9.zip | |
RPC: Add support for optional message signing
Wires in SecureMessage so that signing can be optionally used.
Signing messages is always attempted but failing to create a signed message is
currently not fatal. In case of failure the message is sent unsigned.
Once all services has been converted to support signing, we can start switching
on mandatory signing.
Currently the service name needs to be a global due to the way services are
constructed.
Change-Id: Ie565bbaf61a69a4e526e78e723efecb5e2712877
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/__init__.py | 27 | ||||
| -rw-r--r-- | openstack/common/rpc/amqp.py | 18 | ||||
| -rw-r--r-- | openstack/common/rpc/common.py | 51 | ||||
| -rw-r--r-- | openstack/common/rpc/securemessage.py | 36 |
4 files changed, 119 insertions, 13 deletions
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py index 104b059..f8513e9 100644 --- a/openstack/common/rpc/__init__.py +++ b/openstack/common/rpc/__init__.py @@ -289,6 +289,33 @@ def queue_get_for(context, topic, host): return '%s.%s' % (topic, host) if host else topic +_SERVICENAME = None + + +def set_service_name(topic, host): + """Sets a global with the global rpc serivce name. + + This function should be normally used as follows: + + rpc.set_service_name(topic, host) + + This name may be used later on to find the identity of the service in order + to do things like source keys for signing of messages. + """ + global _SERVICENAME + _SERVICENAME = (topic, host) + + +def get_service_name(merge=False): + """Returns the global rpc service name.""" + global _SERVICENAME + if not _SERVICENAME: + return None + if merge: + return '%s.%s' % _SERVICENAME + return _SERVICENAME + + _RPCIMPL = None diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 38f2515..3159815 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -542,7 +542,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()}) wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool) with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) + conn.topic_send(topic, rpc_common.serialize_msg(msg, topic), timeout) return wait_msg @@ -562,7 +562,7 @@ def cast(conf, context, topic, msg, connection_pool): _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, rpc_common.serialize_msg(msg)) + conn.topic_send(topic, rpc_common.serialize_msg(msg, topic)) def fanout_cast(conf, context, topic, msg, connection_pool): @@ -571,7 +571,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.fanout_send(topic, rpc_common.serialize_msg(msg)) + conn.fanout_send(topic, rpc_common.serialize_msg(msg, topic)) def cast_to_server(conf, context, server_params, topic, msg, connection_pool): @@ -580,7 +580,10 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.topic_send(topic, rpc_common.serialize_msg(msg)) + fulltopic = topic + if 'hostname' in server_params: + fulltopic += ('.' + server_params['hostname']) + conn.topic_send(topic, rpc_common.serialize_msg(msg, fulltopic)) def fanout_cast_to_server(conf, context, server_params, topic, msg, @@ -590,7 +593,10 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.fanout_send(topic, rpc_common.serialize_msg(msg)) + fulltopic = topic + if 'hostname' in server_params: + fulltopic += ('.' + server_params['hostname']) + conn.fanout_send(topic, rpc_common.serialize_msg(msg, fulltopic)) def notify(conf, context, topic, msg, connection_pool, envelope): @@ -602,7 +608,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope): pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: if envelope: - msg = rpc_common.serialize_msg(msg) + msg = rpc_common.serialize_msg(msg, topic) conn.notify_send(topic, msg) diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index 45bd7a8..f1ecba7 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -29,6 +29,8 @@ from openstack.common import importutils from openstack.common import jsonutils from openstack.common import local from openstack.common import log as logging +from openstack.common import rpc +from openstack.common.rpc import securemessage CONF = cfg.CONF @@ -47,7 +49,20 @@ This version number applies to the message envelope that is used in the serialization done inside the rpc layer. See serialize_msg() and deserialize_msg(). -The current message format (version 2.0) is very simple. It is: +The current message format (version 2.1) it has 2 mandatory fields and 2 +optional fields. + { + 'oslo.version': <RPC Envelope Version as a String>, + 'oslo.message': <Application Message Payload, JSON encoded, optionally + encrypted> + 'oslo.secure.metadata': <metadata for the signature, JSON encoded> + 'oslo.secure.signature': <a HMAC signature over the other fields + HMAC(key, (version || metadata || message))> + } +The 'secure' fields are optional but should be either both present or both +absent. If they are not present a receiver may decide to refuse the message. + +The message format (version 2.0) is very simple. It is: { 'oslo.version': <RPC Envelope Version as a String>, @@ -65,10 +80,12 @@ We will JSON encode the application message payload. The message envelope, which includes the JSON encoded application message body, will be passed down to the messaging libraries as a dict. ''' -_RPC_ENVELOPE_VERSION = '2.0' +_RPC_ENVELOPE_VERSION = '2.1' _VERSION_KEY = 'oslo.version' _MESSAGE_KEY = 'oslo.message' +_METADATA_KEY = 'oslo.secure.metadata' +_SIGNATURE_KEY = 'oslo.secure.signature' _REMOTE_POSTFIX = '_Remote' @@ -165,6 +182,10 @@ class RpcVersionCapError(RPCException): msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low") +class InvalidRPCEnvelope(RPCException): + message = _("Invalid RPC Envelope: %s") + + class Connection(object): """A connection, returned by rpc.create_connection(). @@ -456,11 +477,22 @@ def version_is_compatible(imp_version, version): return True -def serialize_msg(raw_msg): +def serialize_msg(raw_msg, topic=None): # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more # information about this format. + version = _RPC_ENVELOPE_VERSION + message = jsonutils.dumps(raw_msg) + + mytopic, myname = rpc.get_service_name() + secmsg = securemessage.SecureMessage(mytopic, myname, CONF) + metadata, message, signature = secmsg.serialize(version, topic, message) + msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, - _MESSAGE_KEY: jsonutils.dumps(raw_msg)} + _MESSAGE_KEY: message} + + if metadata is not None and signature is not None: + msg[_METADATA_KEY] = metadata + msg[_SIGNATURE_KEY] = signature return msg @@ -500,10 +532,15 @@ def deserialize_msg(msg): # At this point we think we have the message envelope # format we were expecting. (#1.a above) - if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY]) - raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + mytopic, myname = rpc.get_service_name() + secmsg = securemessage.SecureMessage(mytopic, myname, CONF) + ver = msg[_VERSION_KEY] + meta = msg[_METADATA_KEY] if _METADATA_KEY in msg else None + msg = msg[_MESSAGE_KEY] + sig = msg[_SIGNATURE_KEY] if _SIGNATURE_KEY in msg else None + metadata, raw_msg = secmsg.deserialize(ver, meta, msg, sig) - return raw_msg + return jsonutils.loads(raw_msg) diff --git a/openstack/common/rpc/securemessage.py b/openstack/common/rpc/securemessage.py index a935f9b..523bb2e 100644 --- a/openstack/common/rpc/securemessage.py +++ b/openstack/common/rpc/securemessage.py @@ -93,6 +93,15 @@ class InvalidSignature(SecureMessageException): super(InvalidSignature, self).__init__(self.msg % (src, dst)) +class MissingSignature(SecureMessageException): + """Signature validation failed.""" + + msg = "Signature not found (validating message received by: %s)" + + def __init__(self, dst): + super(InvalidSignature, self).__init__(self.msg % dst) + + class UnknownDestinationName(SecureMessageException): """The Destination name is unknown to us.""" @@ -519,3 +528,30 @@ class SecureMessage(object): msg = message return (md, msg) + + def serialize(self, version, destination, message): + if not self._conf.enabled: + return None, message, None + + try: + meta, msg, sig = self.encode(version, destination, message) + except Exception: + if self._conf.enforced: + raise + else: + meta = None + msg = message + sig = None + + return meta, msg, sig + + def deserialize(self, version, metadata, message, signature): + if signature: + meta, msg = self.decode(version, metadata, message, signature) + elif self._conf.enabled and self._conf.enforced: + raise MissingSignature(self._name) + else: + meta = None + msg = message + + return meta, msg |
