diff options
Diffstat (limited to 'openstack/common/rpc/amqp.py')
| -rw-r--r-- | openstack/common/rpc/amqp.py | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 00b2f24..ce4c028 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -168,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, 'failure': failure} if ending: msg['ending'] = True - conn.direct_send(msg_id, msg) + conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) class RpcContext(rpc_common.CommonRpcContext): @@ -364,7 +364,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) return wait_msg @@ -383,7 +383,7 @@ def cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous cast on %s...'), topic) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast(conf, context, topic, msg, connection_pool): @@ -391,7 +391,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous fanout cast...')) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) def cast_to_server(conf, context, server_params, topic, msg, connection_pool): @@ -399,7 +399,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast_to_server(conf, context, server_params, topic, msg, @@ -408,7 +408,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) def notify(conf, context, topic, msg, connection_pool): @@ -418,6 +418,10 @@ def notify(conf, context, topic, msg, connection_pool): topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: + # NOTE(russellb): We are explicitly *NOT* using serialize_msg() here. + # The messages sent out as notifications are intended to be consumed by + # 3rd party applications. The notification producer is entirely + # responsible for the message content and versioning. conn.notify_send(topic, msg) |
