summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/notifier/rpc_notifier2.py51
-rw-r--r--openstack/common/rpc/__init__.py5
-rw-r--r--openstack/common/rpc/amqp.py8
-rw-r--r--openstack/common/rpc/common.py8
-rw-r--r--openstack/common/rpc/impl_kombu.py5
-rw-r--r--openstack/common/rpc/impl_qpid.py5
-rw-r--r--openstack/common/rpc/impl_zmq.py18
-rw-r--r--tests/unit/rpc/common.py36
-rw-r--r--tests/unit/test_notifier.py19
9 files changed, 129 insertions, 26 deletions
diff --git a/openstack/common/notifier/rpc_notifier2.py b/openstack/common/notifier/rpc_notifier2.py
new file mode 100644
index 0000000..b7696fd
--- /dev/null
+++ b/openstack/common/notifier/rpc_notifier2.py
@@ -0,0 +1,51 @@
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''messaging based notification driver, with message envelopes'''
+
+from openstack.common import cfg
+from openstack.common import context as req_context
+from openstack.common.gettextutils import _
+from openstack.common import log as logging
+from openstack.common import rpc
+
+LOG = logging.getLogger(__name__)
+
+notification_topic_opt = cfg.ListOpt(
+ 'topics', default=['notifications', ],
+ help='AMQP topic(s) used for openstack notifications')
+
+opt_group = cfg.OptGroup(name='rpc_notifier2',
+ title='Options for rpc_notifier2')
+
+CONF = cfg.CONF
+CONF.register_group(opt_group)
+CONF.register_opt(notification_topic_opt, opt_group)
+
+
+def notify(context, message):
+ """Sends a notification via RPC"""
+ if not context:
+ context = req_context.get_admin_context()
+ priority = message.get('priority',
+ CONF.default_notification_level)
+ priority = priority.lower()
+ for topic in CONF.rpc_notifier2.topics:
+ topic = '%s.%s' % (topic, priority)
+ try:
+ rpc.notify(context, topic, message, envelope=True)
+ except Exception:
+ LOG.exception(_("Could not send notification to %(topic)s. "
+ "Payload=%(message)s"), locals())
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py
index 902cea2..9303505 100644
--- a/openstack/common/rpc/__init__.py
+++ b/openstack/common/rpc/__init__.py
@@ -178,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
-def notify(context, topic, msg):
+def notify(context, topic, msg, envelope=False):
"""Send notification event.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the notification to.
:param msg: This is a dict of content of event.
+ :param envelope: Set to True to enable message envelope for notifications.
:returns: None
"""
- return _get_impl().notify(cfg.CONF, context, topic, msg)
+ return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup():
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index c32815d..ad448f2 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -415,17 +415,15 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-def notify(conf, context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here.
- # The messages sent out as notifications are intended to be consumed by
- # 3rd party applications. The notification producer is entirely
- # responsible for the message content and versioning.
+ if envelope:
+ msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg)
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index 7434a2d..8af2f03 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -414,8 +414,8 @@ def version_is_compatible(imp_version, version):
return True
-def serialize_msg(raw_msg):
- if not _SEND_RPC_ENVELOPE:
+def serialize_msg(raw_msg, force_envelope=False):
+ if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
@@ -442,8 +442,8 @@ def deserialize_msg(msg):
# If so, great!
#
# b) If it doesn't look like a standard message envelope, it could either
- # be a notification (which we don't wrap), or a message from before
- # we added a message envelope (referred to as version 1.0).
+ # be a notification, or a message from before we added a message
+ # envelope (referred to as version 1.0).
# Just return the message as-is.
#
# 2) It's any other non-dict type. Just return it and hope for the best.
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index fa3e25b..8129e15 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -793,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(
conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index f3e558e..3bddaad 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -593,10 +593,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 880896b..7196125 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -183,9 +183,9 @@ class ZmqClient(object):
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data, serialize=True):
+ def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
- data = rpc_common.serialize_msg(data)
+ data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'), data])
def close(self):
@@ -510,7 +510,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -519,7 +520,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True):
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload, serialize)
+ conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -588,7 +589,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None, serialize=True):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -614,7 +616,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True):
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout, serialize)
+ _topic, _topic, msg, timeout, serialize,
+ force_envelope)
return
return method(_addr, context, _topic, _topic, msg, timeout)
@@ -655,7 +658,8 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
- kwargs['serialize'] = False
+ kwargs['serialize'] = kwargs.pop('envelope')
+ kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs)
diff --git a/tests/unit/rpc/common.py b/tests/unit/rpc/common.py
index 70a1ad0..9838c6f 100644
--- a/tests/unit/rpc/common.py
+++ b/tests/unit/rpc/common.py
@@ -29,6 +29,7 @@ import nose
from openstack.common import cfg
from openstack.common import exception
from openstack.common.gettextutils import _
+from openstack.common import jsonutils
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
from openstack.common.rpc import dispatcher as rpc_dispatcher
@@ -276,6 +277,41 @@ class BaseRpcAMQPTestCase(BaseRpcTestCase):
"args": {"value": value}})
self.assertEqual(value, result)
+ def test_notification_envelope(self):
+ raw_msg = {'a': 'b'}
+ self.test_msg = None
+
+ def fake_notify_send(_conn, topic, msg):
+ self.test_msg = msg
+
+ self.stubs.Set(self.rpc.Connection, 'notify_send', fake_notify_send)
+
+ self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
+ envelope=False)
+ self.assertEqual(self.test_msg, raw_msg)
+
+ # Envelopes enabled, but not enabled for notifications
+ self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True)
+ self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
+ envelope=False)
+ self.assertEqual(self.test_msg, raw_msg)
+
+ # Now turn it on for notifications
+ msg = {
+ 'oslo.version': rpc_common._RPC_ENVELOPE_VERSION,
+ 'oslo.message': jsonutils.dumps(raw_msg),
+ }
+ self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
+ envelope=True)
+ self.assertEqual(self.test_msg, msg)
+
+ # Make sure envelopes are still on notifications, even if turned off
+ # for general messages.
+ self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', False)
+ self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg,
+ envelope=True)
+ self.assertEqual(self.test_msg, msg)
+
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
diff --git a/tests/unit/test_notifier.py b/tests/unit/test_notifier.py
index 1cabae0..be58da2 100644
--- a/tests/unit/test_notifier.py
+++ b/tests/unit/test_notifier.py
@@ -75,19 +75,30 @@ class NotifierTestCase(test_utils.BaseTestCase):
notifier_api.notify(ctxt, 'publisher_id', 'event_type',
notifier_api.WARN, dict(a=3))
- def test_send_rabbit_notification(self):
- self.stubs.Set(cfg.CONF, 'notification_driver',
- ['openstack.common.notifier.rabbit_notifier'])
+ def _test_rpc_notify(self, driver, envelope=False):
+ self.stubs.Set(cfg.CONF, 'notification_driver', [driver])
self.mock_notify = False
+ self.envelope = False
- def mock_notify(cls, *args):
+ def mock_notify(cls, *args, **kwargs):
self.mock_notify = True
+ self.envelope = kwargs.get('envelope', False)
self.stubs.Set(rpc, 'notify', mock_notify)
notifier_api.notify(ctxt, 'publisher_id', 'event_type',
notifier_api.WARN, dict(a=3))
self.assertEqual(self.mock_notify, True)
+ self.assertEqual(self.envelope, envelope)
+
+ def test_rabbit_notifier(self):
+ self._test_rpc_notify('openstack.common.notifier.rabbit_notifier')
+
+ def test_rpc_notifier(self):
+ self._test_rpc_notify('openstack.common.notifier.rpc_notifier')
+
+ def test_rpc_notifier2(self):
+ self._test_rpc_notify('openstack.common.notifier.rpc_notifier2', True)
def test_invalid_priority(self):
self.assertRaises(notifier_api.BadPriorityException,