diff options
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/__init__.py | 27 | ||||
| -rw-r--r-- | openstack/common/rpc/amqp.py | 18 | ||||
| -rw-r--r-- | openstack/common/rpc/common.py | 51 | ||||
| -rw-r--r-- | openstack/common/rpc/securemessage.py | 36 |
4 files changed, 119 insertions, 13 deletions
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py index 104b059..f8513e9 100644 --- a/openstack/common/rpc/__init__.py +++ b/openstack/common/rpc/__init__.py @@ -289,6 +289,33 @@ def queue_get_for(context, topic, host): return '%s.%s' % (topic, host) if host else topic +_SERVICENAME = None + + +def set_service_name(topic, host): + """Sets a global with the global rpc serivce name. + + This function should be normally used as follows: + + rpc.set_service_name(topic, host) + + This name may be used later on to find the identity of the service in order + to do things like source keys for signing of messages. + """ + global _SERVICENAME + _SERVICENAME = (topic, host) + + +def get_service_name(merge=False): + """Returns the global rpc service name.""" + global _SERVICENAME + if not _SERVICENAME: + return None + if merge: + return '%s.%s' % _SERVICENAME + return _SERVICENAME + + _RPCIMPL = None diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 38f2515..3159815 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -542,7 +542,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()}) wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool) with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout) + conn.topic_send(topic, rpc_common.serialize_msg(msg, topic), timeout) return wait_msg @@ -562,7 +562,7 @@ def cast(conf, context, topic, msg, connection_pool): _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, rpc_common.serialize_msg(msg)) + conn.topic_send(topic, rpc_common.serialize_msg(msg, topic)) def fanout_cast(conf, context, topic, msg, connection_pool): @@ -571,7 +571,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.fanout_send(topic, rpc_common.serialize_msg(msg)) + conn.fanout_send(topic, rpc_common.serialize_msg(msg, topic)) def cast_to_server(conf, context, server_params, topic, msg, connection_pool): @@ -580,7 +580,10 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.topic_send(topic, rpc_common.serialize_msg(msg)) + fulltopic = topic + if 'hostname' in server_params: + fulltopic += ('.' + server_params['hostname']) + conn.topic_send(topic, rpc_common.serialize_msg(msg, fulltopic)) def fanout_cast_to_server(conf, context, server_params, topic, msg, @@ -590,7 +593,10 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.fanout_send(topic, rpc_common.serialize_msg(msg)) + fulltopic = topic + if 'hostname' in server_params: + fulltopic += ('.' + server_params['hostname']) + conn.fanout_send(topic, rpc_common.serialize_msg(msg, fulltopic)) def notify(conf, context, topic, msg, connection_pool, envelope): @@ -602,7 +608,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope): pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: if envelope: - msg = rpc_common.serialize_msg(msg) + msg = rpc_common.serialize_msg(msg, topic) conn.notify_send(topic, msg) diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index 45bd7a8..f1ecba7 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -29,6 +29,8 @@ from openstack.common import importutils from openstack.common import jsonutils from openstack.common import local from openstack.common import log as logging +from openstack.common import rpc +from openstack.common.rpc import securemessage CONF = cfg.CONF @@ -47,7 +49,20 @@ This version number applies to the message envelope that is used in the serialization done inside the rpc layer. See serialize_msg() and deserialize_msg(). -The current message format (version 2.0) is very simple. It is: +The current message format (version 2.1) it has 2 mandatory fields and 2 +optional fields. + { + 'oslo.version': <RPC Envelope Version as a String>, + 'oslo.message': <Application Message Payload, JSON encoded, optionally + encrypted> + 'oslo.secure.metadata': <metadata for the signature, JSON encoded> + 'oslo.secure.signature': <a HMAC signature over the other fields + HMAC(key, (version || metadata || message))> + } +The 'secure' fields are optional but should be either both present or both +absent. If they are not present a receiver may decide to refuse the message. + +The message format (version 2.0) is very simple. It is: { 'oslo.version': <RPC Envelope Version as a String>, @@ -65,10 +80,12 @@ We will JSON encode the application message payload. The message envelope, which includes the JSON encoded application message body, will be passed down to the messaging libraries as a dict. ''' -_RPC_ENVELOPE_VERSION = '2.0' +_RPC_ENVELOPE_VERSION = '2.1' _VERSION_KEY = 'oslo.version' _MESSAGE_KEY = 'oslo.message' +_METADATA_KEY = 'oslo.secure.metadata' +_SIGNATURE_KEY = 'oslo.secure.signature' _REMOTE_POSTFIX = '_Remote' @@ -165,6 +182,10 @@ class RpcVersionCapError(RPCException): msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low") +class InvalidRPCEnvelope(RPCException): + message = _("Invalid RPC Envelope: %s") + + class Connection(object): """A connection, returned by rpc.create_connection(). @@ -456,11 +477,22 @@ def version_is_compatible(imp_version, version): return True -def serialize_msg(raw_msg): +def serialize_msg(raw_msg, topic=None): # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more # information about this format. + version = _RPC_ENVELOPE_VERSION + message = jsonutils.dumps(raw_msg) + + mytopic, myname = rpc.get_service_name() + secmsg = securemessage.SecureMessage(mytopic, myname, CONF) + metadata, message, signature = secmsg.serialize(version, topic, message) + msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, - _MESSAGE_KEY: jsonutils.dumps(raw_msg)} + _MESSAGE_KEY: message} + + if metadata is not None and signature is not None: + msg[_METADATA_KEY] = metadata + msg[_SIGNATURE_KEY] = signature return msg @@ -500,10 +532,15 @@ def deserialize_msg(msg): # At this point we think we have the message envelope # format we were expecting. (#1.a above) - if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY]) - raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + mytopic, myname = rpc.get_service_name() + secmsg = securemessage.SecureMessage(mytopic, myname, CONF) + ver = msg[_VERSION_KEY] + meta = msg[_METADATA_KEY] if _METADATA_KEY in msg else None + msg = msg[_MESSAGE_KEY] + sig = msg[_SIGNATURE_KEY] if _SIGNATURE_KEY in msg else None + metadata, raw_msg = secmsg.deserialize(ver, meta, msg, sig) - return raw_msg + return jsonutils.loads(raw_msg) diff --git a/openstack/common/rpc/securemessage.py b/openstack/common/rpc/securemessage.py index a935f9b..523bb2e 100644 --- a/openstack/common/rpc/securemessage.py +++ b/openstack/common/rpc/securemessage.py @@ -93,6 +93,15 @@ class InvalidSignature(SecureMessageException): super(InvalidSignature, self).__init__(self.msg % (src, dst)) +class MissingSignature(SecureMessageException): + """Signature validation failed.""" + + msg = "Signature not found (validating message received by: %s)" + + def __init__(self, dst): + super(InvalidSignature, self).__init__(self.msg % dst) + + class UnknownDestinationName(SecureMessageException): """The Destination name is unknown to us.""" @@ -519,3 +528,30 @@ class SecureMessage(object): msg = message return (md, msg) + + def serialize(self, version, destination, message): + if not self._conf.enabled: + return None, message, None + + try: + meta, msg, sig = self.encode(version, destination, message) + except Exception: + if self._conf.enforced: + raise + else: + meta = None + msg = message + sig = None + + return meta, msg, sig + + def deserialize(self, version, metadata, message, signature): + if signature: + meta, msg = self.decode(version, metadata, message, signature) + elif self._conf.enabled and self._conf.enforced: + raise MissingSignature(self._name) + else: + meta = None + msg = message + + return meta, msg |
