summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/notifier/rpc_notifier2.py51
-rw-r--r--openstack/common/rpc/__init__.py5
-rw-r--r--openstack/common/rpc/amqp.py8
-rw-r--r--openstack/common/rpc/common.py8
-rw-r--r--openstack/common/rpc/impl_kombu.py5
-rw-r--r--openstack/common/rpc/impl_qpid.py5
-rw-r--r--openstack/common/rpc/impl_zmq.py18
7 files changed, 78 insertions, 22 deletions
diff --git a/openstack/common/notifier/rpc_notifier2.py b/openstack/common/notifier/rpc_notifier2.py
new file mode 100644
index 0000000..b7696fd
--- /dev/null
+++ b/openstack/common/notifier/rpc_notifier2.py
@@ -0,0 +1,51 @@
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+'''messaging based notification driver, with message envelopes'''
+
+from openstack.common import cfg
+from openstack.common import context as req_context
+from openstack.common.gettextutils import _
+from openstack.common import log as logging
+from openstack.common import rpc
+
+LOG = logging.getLogger(__name__)
+
+notification_topic_opt = cfg.ListOpt(
+ 'topics', default=['notifications', ],
+ help='AMQP topic(s) used for openstack notifications')
+
+opt_group = cfg.OptGroup(name='rpc_notifier2',
+ title='Options for rpc_notifier2')
+
+CONF = cfg.CONF
+CONF.register_group(opt_group)
+CONF.register_opt(notification_topic_opt, opt_group)
+
+
+def notify(context, message):
+ """Sends a notification via RPC"""
+ if not context:
+ context = req_context.get_admin_context()
+ priority = message.get('priority',
+ CONF.default_notification_level)
+ priority = priority.lower()
+ for topic in CONF.rpc_notifier2.topics:
+ topic = '%s.%s' % (topic, priority)
+ try:
+ rpc.notify(context, topic, message, envelope=True)
+ except Exception:
+ LOG.exception(_("Could not send notification to %(topic)s. "
+ "Payload=%(message)s"), locals())
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py
index 902cea2..9303505 100644
--- a/openstack/common/rpc/__init__.py
+++ b/openstack/common/rpc/__init__.py
@@ -178,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
-def notify(context, topic, msg):
+def notify(context, topic, msg, envelope=False):
"""Send notification event.
:param context: Information that identifies the user that has made this
request.
:param topic: The topic to send the notification to.
:param msg: This is a dict of content of event.
+ :param envelope: Set to True to enable message envelope for notifications.
:returns: None
"""
- return _get_impl().notify(cfg.CONF, context, topic, msg)
+ return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup():
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index ce4c028..98baa53 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -411,17 +411,15 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
-def notify(conf, context, topic, msg, connection_pool):
+def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
- # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here.
- # The messages sent out as notifications are intended to be consumed by
- # 3rd party applications. The notification producer is entirely
- # responsible for the message content and versioning.
+ if envelope:
+ msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg)
diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py
index a3a3699..86aa986 100644
--- a/openstack/common/rpc/common.py
+++ b/openstack/common/rpc/common.py
@@ -408,8 +408,8 @@ def version_is_compatible(imp_version, version):
return True
-def serialize_msg(raw_msg):
- if not _SEND_RPC_ENVELOPE:
+def serialize_msg(raw_msg, force_envelope=False):
+ if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
@@ -436,8 +436,8 @@ def deserialize_msg(msg):
# If so, great!
#
# b) If it doesn't look like a standard message envelope, it could either
- # be a notification (which we don't wrap), or a message from before
- # we added a message envelope (referred to as version 1.0).
+ # be a notification, or a message from before we added a message
+ # envelope (referred to as version 1.0).
# Just return the message as-is.
#
# 2) It's any other non-dict type. Just return it and hope for the best.
diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py
index 96e81d7..de9200a 100644
--- a/openstack/common/rpc/impl_kombu.py
+++ b/openstack/common/rpc/impl_kombu.py
@@ -783,11 +783,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(
conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py
index a6dd983..d42fe16 100644
--- a/openstack/common/rpc/impl_qpid.py
+++ b/openstack/common/rpc/impl_qpid.py
@@ -583,10 +583,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection))
-def notify(conf, context, topic, msg):
+def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg,
- rpc_amqp.get_connection_pool(conf, Connection))
+ rpc_amqp.get_connection_pool(conf, Connection),
+ envelope)
def cleanup():
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 880896b..7196125 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -183,9 +183,9 @@ class ZmqClient(object):
def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
- def cast(self, msg_id, topic, data, serialize=True):
+ def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
if serialize:
- data = rpc_common.serialize_msg(data)
+ data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'), data])
def close(self):
@@ -510,7 +510,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True):
+def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -519,7 +520,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True):
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload, serialize)
+ conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -588,7 +589,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1]
-def _multi_send(method, context, topic, msg, timeout=None, serialize=True):
+def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -614,7 +616,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True):
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout, serialize)
+ _topic, _topic, msg, timeout, serialize,
+ force_envelope)
return
return method(_addr, context, _topic, _topic, msg, timeout)
@@ -655,7 +658,8 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
- kwargs['serialize'] = False
+ kwargs['serialize'] = kwargs.pop('envelope')
+ kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs)