diff options
Diffstat (limited to 'openstack/common/rpc')
| -rw-r--r-- | openstack/common/rpc/__init__.py | 5 | ||||
| -rw-r--r-- | openstack/common/rpc/amqp.py | 8 | ||||
| -rw-r--r-- | openstack/common/rpc/common.py | 8 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_kombu.py | 5 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_qpid.py | 5 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 18 |
6 files changed, 27 insertions, 22 deletions
diff --git a/openstack/common/rpc/__init__.py b/openstack/common/rpc/__init__.py index 902cea2..9303505 100644 --- a/openstack/common/rpc/__init__.py +++ b/openstack/common/rpc/__init__.py @@ -178,17 +178,18 @@ def multicall(context, topic, msg, timeout=None): return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) -def notify(context, topic, msg): +def notify(context, topic, msg, envelope=False): """Send notification event. :param context: Information that identifies the user that has made this request. :param topic: The topic to send the notification to. :param msg: This is a dict of content of event. + :param envelope: Set to True to enable message envelope for notifications. :returns: None """ - return _get_impl().notify(cfg.CONF, context, topic, msg) + return _get_impl().notify(cfg.CONF, context, topic, msg, envelope) def cleanup(): diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index ce4c028..98baa53 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -411,17 +411,15 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, conn.fanout_send(topic, rpc_common.serialize_msg(msg)) -def notify(conf, context, topic, msg, connection_pool): +def notify(conf, context, topic, msg, connection_pool, envelope): """Sends a notification event on a topic.""" LOG.debug(_('Sending %(event_type)s on %(topic)s'), dict(event_type=msg.get('event_type'), topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here. - # The messages sent out as notifications are intended to be consumed by - # 3rd party applications. The notification producer is entirely - # responsible for the message content and versioning. + if envelope: + msg = rpc_common.serialize_msg(msg, force_envelope=True) conn.notify_send(topic, msg) diff --git a/openstack/common/rpc/common.py b/openstack/common/rpc/common.py index a3a3699..86aa986 100644 --- a/openstack/common/rpc/common.py +++ b/openstack/common/rpc/common.py @@ -408,8 +408,8 @@ def version_is_compatible(imp_version, version): return True -def serialize_msg(raw_msg): - if not _SEND_RPC_ENVELOPE: +def serialize_msg(raw_msg, force_envelope=False): + if not _SEND_RPC_ENVELOPE and not force_envelope: return raw_msg # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more @@ -436,8 +436,8 @@ def deserialize_msg(msg): # If so, great! # # b) If it doesn't look like a standard message envelope, it could either - # be a notification (which we don't wrap), or a message from before - # we added a message envelope (referred to as version 1.0). + # be a notification, or a message from before we added a message + # envelope (referred to as version 1.0). # Just return the message as-is. # # 2) It's any other non-dict type. Just return it and hope for the best. diff --git a/openstack/common/rpc/impl_kombu.py b/openstack/common/rpc/impl_kombu.py index 96e81d7..de9200a 100644 --- a/openstack/common/rpc/impl_kombu.py +++ b/openstack/common/rpc/impl_kombu.py @@ -783,11 +783,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify( conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/openstack/common/rpc/impl_qpid.py b/openstack/common/rpc/impl_qpid.py index a6dd983..d42fe16 100644 --- a/openstack/common/rpc/impl_qpid.py +++ b/openstack/common/rpc/impl_qpid.py @@ -583,10 +583,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 880896b..7196125 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -183,9 +183,9 @@ class ZmqClient(object): def __init__(self, addr, socket_type=zmq.PUSH, bind=False): self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data, serialize=True): + def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): if serialize: - data = rpc_common.serialize_msg(data) + data = rpc_common.serialize_msg(data, force_envelope) self.outq.send([str(msg_id), str(topic), str('cast'), data]) def close(self): @@ -510,7 +510,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True): +def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, + force_envelope=False): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -519,7 +520,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True): conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload, serialize) + conn.cast(msg_id, topic, payload, serialize, force_envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -588,7 +589,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None, serialize=True): +def _multi_send(method, context, topic, msg, timeout=None, serialize=True, + force_envelope=False): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -614,7 +616,8 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True): if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout, serialize) + _topic, _topic, msg, timeout, serialize, + force_envelope) return return method(_addr, context, _topic, _topic, msg, timeout) @@ -655,7 +658,8 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') - kwargs['serialize'] = False + kwargs['serialize'] = kwargs.pop('envelope') + kwargs['force_envelope'] = True cast(conf, context, topic, msg, **kwargs) |
