summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/__init__.py27
-rw-r--r--openstack/common/rpc/amqp.py18
-rw-r--r--openstack/common/rpc/common.py51
-rw-r--r--openstack/common/rpc/securemessage.py36
4 files changed, 119 insertions, 13 deletions
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py
index 104b059..f8513e9 100644
--- a/openstack/common/rpc/__init__.py
+++ b/openstack/common/rpc/__init__.py
@@ -289,6 +289,33 @@ def queue_get_for(context, topic, host):
return '%s.%s' % (topic, host) if host else topic
+_SERVICENAME = None
+
+
+def set_service_name(topic, host):
+ """Sets a global with the global rpc serivce name.
+
+ This function should be normally used as follows:
+
+ rpc.set_service_name(topic, host)
+
+ This name may be used later on to find the identity of the service in order
+ to do things like source keys for signing of messages.
+ """
+ global _SERVICENAME
+ _SERVICENAME = (topic, host)
+
+
+def get_service_name(merge=False):
+ """Returns the global rpc service name."""
+ global _SERVICENAME
+ if not _SERVICENAME:
+ return None
+ if merge:
+ return '%s.%s' % _SERVICENAME
+ return _SERVICENAME
+
+
_RPCIMPL = None
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 38f2515..3159815 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -542,7 +542,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
msg.update({'_reply_q': connection_pool.reply_proxy.get_reply_q()})
wait_msg = MulticallProxyWaiter(conf, msg_id, timeout, connection_pool)
with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg), timeout)
+ conn.topic_send(topic, rpc_common.serialize_msg(msg, topic), timeout)
return wait_msg
@@ -562,7 +562,7 @@ def cast(conf, context, topic, msg, connection_pool):
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg))
+ conn.topic_send(topic, rpc_common.serialize_msg(msg, topic))
def fanout_cast(conf, context, topic, msg, connection_pool):
@@ -571,7 +571,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- conn.fanout_send(topic, rpc_common.serialize_msg(msg))
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg, topic))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@@ -580,7 +580,10 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.topic_send(topic, rpc_common.serialize_msg(msg))
+ fulltopic = topic
+ if 'hostname' in server_params:
+ fulltopic += ('.' + server_params['hostname'])
+ conn.topic_send(topic, rpc_common.serialize_msg(msg, fulltopic))
def fanout_cast_to_server(conf, context, server_params, topic, msg,
@@ -590,7 +593,10 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
- conn.fanout_send(topic, rpc_common.serialize_msg(msg))
+ fulltopic = topic
+ if 'hostname' in server_params:
+ fulltopic += ('.' + server_params['hostname'])
+ conn.fanout_send(topic, rpc_common.serialize_msg(msg, fulltopic))
def notify(conf, context, topic, msg, connection_pool, envelope):
@@ -602,7 +608,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:
- msg = rpc_common.serialize_msg(msg)
+ msg = rpc_common.serialize_msg(msg, topic)
conn.notify_send(topic, msg)
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index 45bd7a8..f1ecba7 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -29,6 +29,8 @@ from openstack.common import importutils
from openstack.common import jsonutils
from openstack.common import local
from openstack.common import log as logging
+from openstack.common import rpc
+from openstack.common.rpc import securemessage
CONF = cfg.CONF
@@ -47,7 +49,20 @@ This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
-The current message format (version 2.0) is very simple. It is:
+The current message format (version 2.1) it has 2 mandatory fields and 2
+optional fields.
+ {
+ 'oslo.version': <RPC Envelope Version as a String>,
+ 'oslo.message': <Application Message Payload, JSON encoded, optionally
+ encrypted>
+ 'oslo.secure.metadata': <metadata for the signature, JSON encoded>
+ 'oslo.secure.signature': <a HMAC signature over the other fields
+ HMAC(key, (version || metadata || message))>
+ }
+The 'secure' fields are optional but should be either both present or both
+absent. If they are not present a receiver may decide to refuse the message.
+
+The message format (version 2.0) is very simple. It is:
{
'oslo.version': <RPC Envelope Version as a String>,
@@ -65,10 +80,12 @@ We will JSON encode the application message payload. The message envelope,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
-_RPC_ENVELOPE_VERSION = '2.0'
+_RPC_ENVELOPE_VERSION = '2.1'
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
+_METADATA_KEY = 'oslo.secure.metadata'
+_SIGNATURE_KEY = 'oslo.secure.signature'
_REMOTE_POSTFIX = '_Remote'
@@ -165,6 +182,10 @@ class RpcVersionCapError(RPCException):
msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
+class InvalidRPCEnvelope(RPCException):
+ message = _("Invalid RPC Envelope: %s")
+
+
class Connection(object):
"""A connection, returned by rpc.create_connection().
@@ -456,11 +477,22 @@ def version_is_compatible(imp_version, version):
return True
-def serialize_msg(raw_msg):
+def serialize_msg(raw_msg, topic=None):
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
+ version = _RPC_ENVELOPE_VERSION
+ message = jsonutils.dumps(raw_msg)
+
+ mytopic, myname = rpc.get_service_name()
+ secmsg = securemessage.SecureMessage(mytopic, myname, CONF)
+ metadata, message, signature = secmsg.serialize(version, topic, message)
+
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
- _MESSAGE_KEY: jsonutils.dumps(raw_msg)}
+ _MESSAGE_KEY: message}
+
+ if metadata is not None and signature is not None:
+ msg[_METADATA_KEY] = metadata
+ msg[_SIGNATURE_KEY] = signature
return msg
@@ -500,10 +532,15 @@ def deserialize_msg(msg):
# At this point we think we have the message envelope
# format we were expecting. (#1.a above)
-
if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
- raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
+ mytopic, myname = rpc.get_service_name()
+ secmsg = securemessage.SecureMessage(mytopic, myname, CONF)
+ ver = msg[_VERSION_KEY]
+ meta = msg[_METADATA_KEY] if _METADATA_KEY in msg else None
+ msg = msg[_MESSAGE_KEY]
+ sig = msg[_SIGNATURE_KEY] if _SIGNATURE_KEY in msg else None
+ metadata, raw_msg = secmsg.deserialize(ver, meta, msg, sig)
- return raw_msg
+ return jsonutils.loads(raw_msg)
diff --git a/openstack/common/rpc/securemessage.py b/openstack/common/rpc/securemessage.py
index a935f9b..523bb2e 100644
--- a/openstack/common/rpc/securemessage.py
+++ b/openstack/common/rpc/securemessage.py
@@ -93,6 +93,15 @@ class InvalidSignature(SecureMessageException):
super(InvalidSignature, self).__init__(self.msg % (src, dst))
+class MissingSignature(SecureMessageException):
+ """Signature validation failed."""
+
+ msg = "Signature not found (validating message received by: %s)"
+
+ def __init__(self, dst):
+ super(InvalidSignature, self).__init__(self.msg % dst)
+
+
class UnknownDestinationName(SecureMessageException):
"""The Destination name is unknown to us."""
@@ -519,3 +528,30 @@ class SecureMessage(object):
msg = message
return (md, msg)
+
+ def serialize(self, version, destination, message):
+ if not self._conf.enabled:
+ return None, message, None
+
+ try:
+ meta, msg, sig = self.encode(version, destination, message)
+ except Exception:
+ if self._conf.enforced:
+ raise
+ else:
+ meta = None
+ msg = message
+ sig = None
+
+ return meta, msg, sig
+
+ def deserialize(self, version, metadata, message, signature):
+ if signature:
+ meta, msg = self.decode(version, metadata, message, signature)
+ elif self._conf.enabled and self._conf.enforced:
+ raise MissingSignature(self._name)
+ else:
+ meta = None
+ msg = message
+
+ return meta, msg