diff options
Diffstat (limited to 'openstack/common/rpc/amqp.py')
-rw-r--r-- | openstack/common/rpc/amqp.py | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 2f90a9b..c32815d 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, 'failure': failure} if ending: msg['ending'] = True - conn.direct_send(msg_id, msg) + conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) class RpcContext(rpc_common.CommonRpcContext): @@ -368,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) return wait_msg @@ -387,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous cast on %s...'), topic) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast(conf, context, topic, msg, connection_pool): @@ -395,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous fanout cast...')) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) def cast_to_server(conf, context, server_params, topic, msg, connection_pool): @@ -403,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast_to_server(conf, context, server_params, topic, msg, @@ -412,7 +412,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) def notify(conf, context, topic, msg, connection_pool): @@ -422,6 +422,10 @@ def notify(conf, context, topic, msg, connection_pool): topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: + # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here. + # The messages sent out as notifications are intended to be consumed by + # 3rd party applications. The notification producer is entirely + # responsible for the message content and versioning. conn.notify_send(topic, msg) |