summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2012-12-13 12:53:43 -0500
committerRussell Bryant <rbryant@redhat.com>2013-01-03 14:02:09 -0500
commitadffcd58fb09dafb0c4e128797b4a61a59087f9a (patch)
treeafcbabee9561afb0fbc56ef2dafe6288c107ca01 /openstack
parentfdc74c07d63ff897f42405b6dde0ffa46d6e26c3 (diff)
downloadoslo-adffcd58fb09dafb0c4e128797b4a61a59087f9a.tar.gz
oslo-adffcd58fb09dafb0c4e128797b4a61a59087f9a.tar.xz
oslo-adffcd58fb09dafb0c4e128797b4a61a59087f9a.zip
Optionally add message envelope to notifications.
When message envelopes were added for rpc messages, notifications were explicitly left out. This was due to the fact that notifications are consumed outside of OpenStack. However, notification consumers will likely want to eventually start taking advantage of some of the features that the envelope is a prerequisite for, such as message signing. This patch adds a new notification driver that produces message based notifications including the message envelope. Related to blueprint version-rpc-messages. Change-Id: Iaa8b437e6b4f64053f4b02bf486516f60bb020e9
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/notifier/rpc_notifier2.py51
-rw-r--r--openstack/common/rpc/__init__.py5
-rw-r--r--openstack/common/rpc/amqp.py8
-rw-r--r--openstack/common/rpc/common.py8
-rw-r--r--openstack/common/rpc/impl_kombu.py5
-rw-r--r--openstack/common/rpc/impl_qpid.py5
-rw-r--r--openstack/common/rpc/impl_zmq.py18
7 files changed, 78 insertions, 22 deletions
diff --git a/openstack/common/notifier/rpc_notifier2.py b/openstack/common/notifier/rpc_notifier2.py
new file mode 100644
index 0000000..b7696fd
--- /dev/null
+++ b/openstack/common/notifier/rpc_notifier2.py
@@ -0,0 +1,51 @@
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''messaging based notification driver, with message envelopes'''
+
+from openstack.common import cfg
+from openstack.common import context as req_context
+from openstack.common.gettextutils import _
+from openstack.common import log as logging
+from openstack.common import rpc
+
+LOG = logging.getLogger(__name__)
+
+notification_topic_opt = cfg.ListOpt(
+ 'topics', default=['notifications', ],
+ help='AMQP topic(s) used for openstack notifications')
+
+opt_group = cfg.OptGroup(name='rpc_notifier2',
+ title='Options for rpc_notifier2')
+
+CONF = cfg.CONF
+CONF.register_group(opt_group)
+CONF.register_opt(notification_topic_opt, opt_group)
+
+
+def notify(context, message):
+ """Sends a notification via RPC"""
+ if not context:
+ context = req_context.get_admin_context()
+ priority = message.get('priority',
+ CONF.default_notification_level)
+ priority = priority.lower()
+ for topic in CONF.rpc_notifier2.topics:
+ topic = '%s.%s' % (topic, priority)
+ try:
+ rpc.notify(context, topic, message, envelope=True)
+ except Exception:
+ LOG.exception(_("Could not send notification to %(topic)s. "
+ "Payload=%(message)s"), locals())
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py
index 902cea2..9303505 100644
--- a/openstack/common/rpc/__init__.py
+++ b/openstack/common/rpc/__init__.py
@@ -178,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
-def notify(context, topic, msg):
+def notify(context, topic, msg, envelope=False):
"""Send notification event.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the notification to.
:param msg: This is a dict of content of event.
+ :param envelope: Set to True to enable message envelope for notifications.
:returns: None
"""
- return _get_impl().notify(cfg.CONF, context, topic, msg)
+ return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup():
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index ce4c028..98baa53 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -411,17 +411,15 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-def notify(conf, context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here.
- # The messages sent out as notifications are intended to be consumed by
- # 3rd party applications. The notification producer is entirely
- # responsible for the message content and versioning.
+ if envelope:
+ msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg)
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index a3a3699..86aa986 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -408,8 +408,8 @@ def version_is_compatible(imp_version, version):
return True
-def serialize_msg(raw_msg):
- if not _SEND_RPC_ENVELOPE:
+def serialize_msg(raw_msg, force_envelope=False):
+ if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
@@ -436,8 +436,8 @@ def deserialize_msg(msg):
# If so, great!
#
# b) If it doesn't look like a standard message envelope, it could either
- # be a notification (which we don't wrap), or a message from before
- # we added a message envelope (referred to as version 1.0).
+ # be a notification, or a message from before we added a message
+ # envelope (referred to as version 1.0).
# Just return the message as-is.
#
# 2) It's any other non-dict type. Just return it and hope for the best.
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 96e81d7..de9200a 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -783,11 +783,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(
conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index a6dd983..d42fe16 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -583,10 +583,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 880896b..7196125 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -183,9 +183,9 @@ class ZmqClient(object):
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data, serialize=True):
+ def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
- data = rpc_common.serialize_msg(data)
+ data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'), data])
def close(self):
@@ -510,7 +510,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -519,7 +520,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True):
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload, serialize)
+ conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -588,7 +589,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None, serialize=True):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -614,7 +616,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True):
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout, serialize)
+ _topic, _topic, msg, timeout, serialize,
+ force_envelope)
return
return method(_addr, context, _topic, _topic, msg, timeout)
@@ -655,7 +658,8 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
- kwargs['serialize'] = False
+ kwargs['serialize'] = kwargs.pop('envelope')
+ kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs)