diff options
-rw-r--r-- | openstack/common/notifier/rpc_notifier2.py | 51 | ||||
-rw-r--r-- | openstack/common/rpc/__init__.py | 5 | ||||
-rw-r--r-- | openstack/common/rpc/amqp.py | 8 | ||||
-rw-r--r-- | openstack/common/rpc/common.py | 8 | ||||
-rw-r--r-- | openstack/common/rpc/impl_kombu.py | 5 | ||||
-rw-r--r-- | openstack/common/rpc/impl_qpid.py | 5 | ||||
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 18 | ||||
-rw-r--r-- | tests/unit/rpc/common.py | 36 | ||||
-rw-r--r-- | tests/unit/test_notifier.py | 19 |
9 files changed, 129 insertions, 26 deletions
diff --git a/openstack/common/notifier/rpc_notifier2.py b/openstack/common/notifier/rpc_notifier2.py new file mode 100644 index 0000000..b7696fd --- /dev/null +++ b/openstack/common/notifier/rpc_notifier2.py @@ -0,0 +1,51 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +'''messaging based notification driver, with message envelopes''' + +from openstack.common import cfg +from openstack.common import context as req_context +from openstack.common.gettextutils import _ +from openstack.common import log as logging +from openstack.common import rpc + +LOG = logging.getLogger(__name__) + +notification_topic_opt = cfg.ListOpt( + 'topics', default=['notifications', ], + help='AMQP topic(s) used for openstack notifications') + +opt_group = cfg.OptGroup(name='rpc_notifier2', + title='Options for rpc_notifier2') + +CONF = cfg.CONF +CONF.register_group(opt_group) +CONF.register_opt(notification_topic_opt, opt_group) + + +def notify(context, message): + """Sends a notification via RPC""" + if not context: + context = req_context.get_admin_context() + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + for topic in CONF.rpc_notifier2.topics: + topic = '%s.%s' % (topic, priority) + try: + rpc.notify(context, topic, message, envelope=True) + except Exception: + LOG.exception(_("Could not send notification to %(topic)s. " + "Payload=%(message)s"), locals()) diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py index 902cea2..9303505 100644 --- a/openstack/common/rpc/__init__.py +++ b/openstack/common/rpc/__init__.py @@ -178,17 +178,18 @@ def multicall(context, topic, msg, timeout=None): return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) -def notify(context, topic, msg): +def notify(context, topic, msg, envelope=False): """Send notification event. :param context: Information that identifies the user that has made this request. :param topic: The topic to send the notification to. :param msg: This is a dict of content of event. + :param envelope: Set to True to enable message envelope for notifications. :returns: None """ - return _get_impl().notify(cfg.CONF, context, topic, msg) + return _get_impl().notify(cfg.CONF, context, topic, msg, envelope) def cleanup(): diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index c32815d..ad448f2 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -415,17 +415,15 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, conn.fanout_send(topic, rpc_common.serialize_msg(msg)) -def notify(conf, context, topic, msg, connection_pool): +def notify(conf, context, topic, msg, connection_pool, envelope): """Sends a notification event on a topic.""" LOG.debug(_('Sending %(event_type)s on %(topic)s'), dict(event_type=msg.get('event_type'), topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here. - # The messages sent out as notifications are intended to be consumed by - # 3rd party applications. The notification producer is entirely - # responsible for the message content and versioning. + if envelope: + msg = rpc_common.serialize_msg(msg, force_envelope=True) conn.notify_send(topic, msg) diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index 7434a2d..8af2f03 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -414,8 +414,8 @@ def version_is_compatible(imp_version, version): return True -def serialize_msg(raw_msg): - if not _SEND_RPC_ENVELOPE: +def serialize_msg(raw_msg, force_envelope=False): + if not _SEND_RPC_ENVELOPE and not force_envelope: return raw_msg # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more @@ -442,8 +442,8 @@ def deserialize_msg(msg): # If so, great! # # b) If it doesn't look like a standard message envelope, it could either - # be a notification (which we don't wrap), or a message from before - # we added a message envelope (referred to as version 1.0). + # be a notification, or a message from before we added a message + # envelope (referred to as version 1.0). # Just return the message as-is. # # 2) It's any other non-dict type. Just return it and hope for the best. diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index fa3e25b..8129e15 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -793,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify( conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index f3e558e..3bddaad 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -593,10 +593,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 880896b..7196125 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -183,9 +183,9 @@ class ZmqClient(object): def __init__(self, addr, socket_type=zmq.PUSH, bind=False): self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data, serialize=True): + def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): if serialize: - data = rpc_common.serialize_msg(data) + data = rpc_common.serialize_msg(data, force_envelope) self.outq.send([str(msg_id), str(topic), str('cast'), data]) def close(self): @@ -510,7 +510,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True): +def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, + force_envelope=False): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -519,7 +520,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True): conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload, serialize) + conn.cast(msg_id, topic, payload, serialize, force_envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -588,7 +589,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None, serialize=True): +def _multi_send(method, context, topic, msg, timeout=None, serialize=True, + force_envelope=False): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -614,7 +616,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True): if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout, serialize) + _topic, _topic, msg, timeout, serialize, + force_envelope) return return method(_addr, context, _topic, _topic, msg, timeout) @@ -655,7 +658,8 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') - kwargs['serialize'] = False + kwargs['serialize'] = kwargs.pop('envelope') + kwargs['force_envelope'] = True cast(conf, context, topic, msg, **kwargs) diff --git a/tests/unit/rpc/common.py b/tests/unit/rpc/common.py index 70a1ad0..9838c6f 100644 --- a/tests/unit/rpc/common.py +++ b/tests/unit/rpc/common.py @@ -29,6 +29,7 @@ import nose from openstack.common import cfg from openstack.common import exception from openstack.common.gettextutils import _ +from openstack.common import jsonutils from openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import common as rpc_common from openstack.common.rpc import dispatcher as rpc_dispatcher @@ -276,6 +277,41 @@ class BaseRpcAMQPTestCase(BaseRpcTestCase): "args": {"value": value}}) self.assertEqual(value, result) + def test_notification_envelope(self): + raw_msg = {'a': 'b'} + self.test_msg = None + + def fake_notify_send(_conn, topic, msg): + self.test_msg = msg + + self.stubs.Set(self.rpc.Connection, 'notify_send', fake_notify_send) + + self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg, + envelope=False) + self.assertEqual(self.test_msg, raw_msg) + + # Envelopes enabled, but not enabled for notifications + self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', True) + self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg, + envelope=False) + self.assertEqual(self.test_msg, raw_msg) + + # Now turn it on for notifications + msg = { + 'oslo.version': rpc_common._RPC_ENVELOPE_VERSION, + 'oslo.message': jsonutils.dumps(raw_msg), + } + self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg, + envelope=True) + self.assertEqual(self.test_msg, msg) + + # Make sure envelopes are still on notifications, even if turned off + # for general messages. + self.stubs.Set(rpc_common, '_SEND_RPC_ENVELOPE', False) + self.rpc.notify(FLAGS, self.context, 'notifications.info', raw_msg, + envelope=True) + self.assertEqual(self.test_msg, msg) + class TestReceiver(object): """Simple Proxy class so the consumer has methods to call. diff --git a/tests/unit/test_notifier.py b/tests/unit/test_notifier.py index 1cabae0..be58da2 100644 --- a/tests/unit/test_notifier.py +++ b/tests/unit/test_notifier.py @@ -75,19 +75,30 @@ class NotifierTestCase(test_utils.BaseTestCase): notifier_api.notify(ctxt, 'publisher_id', 'event_type', notifier_api.WARN, dict(a=3)) - def test_send_rabbit_notification(self): - self.stubs.Set(cfg.CONF, 'notification_driver', - ['openstack.common.notifier.rabbit_notifier']) + def _test_rpc_notify(self, driver, envelope=False): + self.stubs.Set(cfg.CONF, 'notification_driver', [driver]) self.mock_notify = False + self.envelope = False - def mock_notify(cls, *args): + def mock_notify(cls, *args, **kwargs): self.mock_notify = True + self.envelope = kwargs.get('envelope', False) self.stubs.Set(rpc, 'notify', mock_notify) notifier_api.notify(ctxt, 'publisher_id', 'event_type', notifier_api.WARN, dict(a=3)) self.assertEqual(self.mock_notify, True) + self.assertEqual(self.envelope, envelope) + + def test_rabbit_notifier(self): + self._test_rpc_notify('openstack.common.notifier.rabbit_notifier') + + def test_rpc_notifier(self): + self._test_rpc_notify('openstack.common.notifier.rpc_notifier') + + def test_rpc_notifier2(self): + self._test_rpc_notify('openstack.common.notifier.rpc_notifier2', True) def test_invalid_priority(self): self.assertRaises(notifier_api.BadPriorityException, |