diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-01-06 05:55:28 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-01-06 05:55:28 +0000 |
| commit | 5d4a9c99fe20085aa1c9afc130776ebe21574a7f (patch) | |
| tree | ec0e3061c3b61bab03125483bfda599da924a579 /openstack | |
| parent | 193803bd4a812b75d92d7365e75a5f71606de549 (diff) | |
| parent | adffcd58fb09dafb0c4e128797b4a61a59087f9a (diff) | |
| download | oslo-5d4a9c99fe20085aa1c9afc130776ebe21574a7f.tar.gz oslo-5d4a9c99fe20085aa1c9afc130776ebe21574a7f.tar.xz oslo-5d4a9c99fe20085aa1c9afc130776ebe21574a7f.zip | |
Merge "Optionally add message envelope to notifications."
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/notifier/rpc_notifier2.py | 51 | ||||
| -rw-r--r-- | openstack/common/rpc/__init__.py | 5 | ||||
| -rw-r--r-- | openstack/common/rpc/amqp.py | 8 | ||||
| -rw-r--r-- | openstack/common/rpc/common.py | 8 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 5 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 5 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 18 |
7 files changed, 78 insertions, 22 deletions
diff --git a/openstack/common/notifier/rpc_notifier2.py b/openstack/common/notifier/rpc_notifier2.py new file mode 100644 index 0000000..b7696fd --- /dev/null +++ b/openstack/common/notifier/rpc_notifier2.py @@ -0,0 +1,51 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +'''messaging based notification driver, with message envelopes''' + +from openstack.common import cfg +from openstack.common import context as req_context +from openstack.common.gettextutils import _ +from openstack.common import log as logging +from openstack.common import rpc + +LOG = logging.getLogger(__name__) + +notification_topic_opt = cfg.ListOpt( + 'topics', default=['notifications', ], + help='AMQP topic(s) used for openstack notifications') + +opt_group = cfg.OptGroup(name='rpc_notifier2', + title='Options for rpc_notifier2') + +CONF = cfg.CONF +CONF.register_group(opt_group) +CONF.register_opt(notification_topic_opt, opt_group) + + +def notify(context, message): + """Sends a notification via RPC""" + if not context: + context = req_context.get_admin_context() + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + for topic in CONF.rpc_notifier2.topics: + topic = '%s.%s' % (topic, priority) + try: + rpc.notify(context, topic, message, envelope=True) + except Exception: + LOG.exception(_("Could not send notification to %(topic)s. " + "Payload=%(message)s"), locals()) diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py index 902cea2..9303505 100644 --- a/openstack/common/rpc/__init__.py +++ b/openstack/common/rpc/__init__.py @@ -178,17 +178,18 @@ def multicall(context, topic, msg, timeout=None): return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) -def notify(context, topic, msg): +def notify(context, topic, msg, envelope=False): """Send notification event. :param context: Information that identifies the user that has made this request. :param topic: The topic to send the notification to. :param msg: This is a dict of content of event. + :param envelope: Set to True to enable message envelope for notifications. :returns: None """ - return _get_impl().notify(cfg.CONF, context, topic, msg) + return _get_impl().notify(cfg.CONF, context, topic, msg, envelope) def cleanup(): diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index c32815d..ad448f2 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -415,17 +415,15 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, conn.fanout_send(topic, rpc_common.serialize_msg(msg)) -def notify(conf, context, topic, msg, connection_pool): +def notify(conf, context, topic, msg, connection_pool, envelope): """Sends a notification event on a topic.""" LOG.debug(_('Sending %(event_type)s on %(topic)s'), dict(event_type=msg.get('event_type'), topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here. - # The messages sent out as notifications are intended to be consumed by - # 3rd party applications. The notification producer is entirely - # responsible for the message content and versioning. + if envelope: + msg = rpc_common.serialize_msg(msg, force_envelope=True) conn.notify_send(topic, msg) diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index 7434a2d..8af2f03 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -414,8 +414,8 @@ def version_is_compatible(imp_version, version): return True -def serialize_msg(raw_msg): - if not _SEND_RPC_ENVELOPE: +def serialize_msg(raw_msg, force_envelope=False): + if not _SEND_RPC_ENVELOPE and not force_envelope: return raw_msg # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more @@ -442,8 +442,8 @@ def deserialize_msg(msg): # If so, great! # # b) If it doesn't look like a standard message envelope, it could either - # be a notification (which we don't wrap), or a message from before - # we added a message envelope (referred to as version 1.0). + # be a notification, or a message from before we added a message + # envelope (referred to as version 1.0). # Just return the message as-is. # # 2) It's any other non-dict type. Just return it and hope for the best. diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index fa3e25b..8129e15 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -793,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify( conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index f3e558e..3bddaad 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -593,10 +593,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 880896b..7196125 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -183,9 +183,9 @@ class ZmqClient(object): def __init__(self, addr, socket_type=zmq.PUSH, bind=False): self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data, serialize=True): + def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): if serialize: - data = rpc_common.serialize_msg(data) + data = rpc_common.serialize_msg(data, force_envelope) self.outq.send([str(msg_id), str(topic), str('cast'), data]) def close(self): @@ -510,7 +510,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True): +def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, + force_envelope=False): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -519,7 +520,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True): conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload, serialize) + conn.cast(msg_id, topic, payload, serialize, force_envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -588,7 +589,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None, serialize=True): +def _multi_send(method, context, topic, msg, timeout=None, serialize=True, + force_envelope=False): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -614,7 +616,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True): if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout, serialize) + _topic, _topic, msg, timeout, serialize, + force_envelope) return return method(_addr, context, _topic, _topic, msg, timeout) @@ -655,7 +658,8 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') - kwargs['serialize'] = False + kwargs['serialize'] = kwargs.pop('envelope') + kwargs['force_envelope'] = True cast(conf, context, topic, msg, **kwargs) |
