diff options
-rw-r--r-- | nova/openstack/common/notifier/api.py | 7 | ||||
-rw-r--r-- | nova/openstack/common/notifier/rpc_notifier.py | 2 | ||||
-rw-r--r-- | nova/openstack/common/notifier/rpc_notifier2.py | 51 | ||||
-rw-r--r-- | nova/openstack/common/rpc/__init__.py | 5 | ||||
-rw-r--r-- | nova/openstack/common/rpc/amqp.py | 23 | ||||
-rw-r--r-- | nova/openstack/common/rpc/common.py | 144 | ||||
-rw-r--r-- | nova/openstack/common/rpc/dispatcher.py | 18 | ||||
-rw-r--r-- | nova/openstack/common/rpc/impl_kombu.py | 28 | ||||
-rw-r--r-- | nova/openstack/common/rpc/impl_qpid.py | 25 | ||||
-rw-r--r-- | nova/openstack/common/rpc/impl_zmq.py | 54 |
10 files changed, 270 insertions, 87 deletions
diff --git a/nova/openstack/common/notifier/api.py b/nova/openstack/common/notifier/api.py index 76b725c0a..0ec55fbf0 100644 --- a/nova/openstack/common/notifier/api.py +++ b/nova/openstack/common/notifier/api.py @@ -137,10 +137,11 @@ def notify(context, publisher_id, event_type, priority, payload): for driver in _get_drivers(): try: driver.notify(context, msg) - except Exception, e: + except Exception as e: LOG.exception(_("Problem '%(e)s' attempting to " "send to notification system. " - "Payload=%(payload)s") % locals()) + "Payload=%(payload)s") + % dict(e=e, payload=payload)) _drivers = None @@ -166,7 +167,7 @@ def add_driver(notification_driver): try: driver = importutils.import_module(notification_driver) _drivers[notification_driver] = driver - except ImportError as e: + except ImportError: LOG.exception(_("Failed to load notifier %s. " "These notifications will not be sent.") % notification_driver) diff --git a/nova/openstack/common/notifier/rpc_notifier.py b/nova/openstack/common/notifier/rpc_notifier.py index aa9e8860e..8316f7ab7 100644 --- a/nova/openstack/common/notifier/rpc_notifier.py +++ b/nova/openstack/common/notifier/rpc_notifier.py @@ -41,6 +41,6 @@ def notify(context, message): topic = '%s.%s' % (topic, priority) try: rpc.notify(context, topic, message) - except Exception, e: + except Exception: LOG.exception(_("Could not send notification to %(topic)s. " "Payload=%(message)s"), locals()) diff --git a/nova/openstack/common/notifier/rpc_notifier2.py b/nova/openstack/common/notifier/rpc_notifier2.py new file mode 100644 index 000000000..beb2e9f71 --- /dev/null +++ b/nova/openstack/common/notifier/rpc_notifier2.py @@ -0,0 +1,51 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +'''messaging based notification driver, with message envelopes''' + +from nova.openstack.common import cfg +from nova.openstack.common import context as req_context +from nova.openstack.common.gettextutils import _ +from nova.openstack.common import log as logging +from nova.openstack.common import rpc + +LOG = logging.getLogger(__name__) + +notification_topic_opt = cfg.ListOpt( + 'topics', default=['notifications', ], + help='AMQP topic(s) used for openstack notifications') + +opt_group = cfg.OptGroup(name='rpc_notifier2', + title='Options for rpc_notifier2') + +CONF = cfg.CONF +CONF.register_group(opt_group) +CONF.register_opt(notification_topic_opt, opt_group) + + +def notify(context, message): + """Sends a notification via RPC""" + if not context: + context = req_context.get_admin_context() + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + for topic in CONF.rpc_notifier2.topics: + topic = '%s.%s' % (topic, priority) + try: + rpc.notify(context, topic, message, envelope=True) + except Exception: + LOG.exception(_("Could not send notification to %(topic)s. " + "Payload=%(message)s"), locals()) diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py index cfdac03bd..d43b48fa2 100644 --- a/nova/openstack/common/rpc/__init__.py +++ b/nova/openstack/common/rpc/__init__.py @@ -178,17 +178,18 @@ def multicall(context, topic, msg, timeout=None): return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) -def notify(context, topic, msg): +def notify(context, topic, msg, envelope=False): """Send notification event. :param context: Information that identifies the user that has made this request. :param topic: The topic to send the notification to. :param msg: This is a dict of content of event. + :param envelope: Set to True to enable message envelope for notifications. :returns: None """ - return _get_impl().notify(cfg.CONF, context, topic, msg) + return _get_impl().notify(cfg.CONF, context, topic, msg, envelope) def cleanup(): diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py index 6464914db..105e6fcbe 100644 --- a/nova/openstack/common/rpc/amqp.py +++ b/nova/openstack/common/rpc/amqp.py @@ -33,7 +33,6 @@ from eventlet import greenpool from eventlet import pools from eventlet import semaphore -from nova.openstack.common import cfg from nova.openstack.common import excutils from nova.openstack.common.gettextutils import _ from nova.openstack.common import local @@ -169,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, 'failure': failure} if ending: msg['ending'] = True - conn.direct_send(msg_id, msg) + conn.direct_send(msg_id, rpc_common.serialize_msg(msg)) class RpcContext(rpc_common.CommonRpcContext): @@ -294,6 +293,10 @@ class ProxyCallback(object): ctxt.reply(None, sys.exc_info(), connection_pool=self.connection_pool) + def wait(self): + """Wait for all callback threads to exit.""" + self.pool.waitall() + class MulticallWaiter(object): def __init__(self, conf, connection, timeout): @@ -356,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): # that will continue to use the connection. When it's done, # connection.close() will get called which will put it back into # the pool - LOG.debug(_('Making asynchronous call on %s ...'), topic) + LOG.debug(_('Making synchronous call on %s ...'), topic) msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) @@ -365,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): conn = ConnectionContext(conf, connection_pool) wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) return wait_msg @@ -384,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous cast on %s...'), topic) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast(conf, context, topic, msg, connection_pool): @@ -392,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): LOG.debug(_('Making asynchronous fanout cast...')) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) def cast_to_server(conf, context, server_params, topic, msg, connection_pool): @@ -400,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.topic_send(topic, msg) + conn.topic_send(topic, rpc_common.serialize_msg(msg)) def fanout_cast_to_server(conf, context, server_params, topic, msg, @@ -409,16 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: - conn.fanout_send(topic, msg) + conn.fanout_send(topic, rpc_common.serialize_msg(msg)) -def notify(conf, context, topic, msg, connection_pool): +def notify(conf, context, topic, msg, connection_pool, envelope): """Sends a notification event on a topic.""" LOG.debug(_('Sending %(event_type)s on %(topic)s'), dict(event_type=msg.get('event_type'), topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: + if envelope: + msg = rpc_common.serialize_msg(msg, force_envelope=True) conn.notify_send(topic, msg) diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py index a941ab1d0..bf4f5a3de 100644 --- a/nova/openstack/common/rpc/common.py +++ b/nova/openstack/common/rpc/common.py @@ -21,6 +21,7 @@ import copy import sys import traceback +from nova.openstack.common import cfg from nova.openstack.common.gettextutils import _ from nova.openstack.common import importutils from nova.openstack.common import jsonutils @@ -28,9 +29,50 @@ from nova.openstack.common import local from nova.openstack.common import log as logging +CONF = cfg.CONF LOG = logging.getLogger(__name__) +'''RPC Envelope Version. + +This version number applies to the top level structure of messages sent out. +It does *not* apply to the message payload, which must be versioned +independently. For example, when using rpc APIs, a version number is applied +for changes to the API being exposed over rpc. This version number is handled +in the rpc proxy and dispatcher modules. + +This version number applies to the message envelope that is used in the +serialization done inside the rpc layer. See serialize_msg() and +deserialize_msg(). + +The current message format (version 2.0) is very simple. It is: + + { + 'nova.version': <RPC Envelope Version as a String>, + 'nova.message': <Application Message Payload, JSON encoded> + } + +Message format version '1.0' is just considered to be the messages we sent +without a message envelope. + +So, the current message envelope just includes the envelope version. It may +eventually contain additional information, such as a signature for the message +payload. + +We will JSON encode the application message payload. The message envelope, +which includes the JSON encoded application message body, will be passed down +to the messaging libraries as a dict. +''' +_RPC_ENVELOPE_VERSION = '2.0' + +_VERSION_KEY = 'nova.version' +_MESSAGE_KEY = 'nova.message' + + +# TODO(russellb) Turn this on after Grizzly. +_SEND_RPC_ENVELOPE = False + + class RPCException(Exception): message = _("An unknown RPC related exception occurred.") @@ -91,6 +133,11 @@ class UnsupportedRpcVersion(RPCException): "this endpoint.") +class UnsupportedRpcEnvelopeVersion(RPCException): + message = _("Specified RPC envelope version, %(version)s, " + "not supported by this endpoint.") + + class Connection(object): """A connection, returned by rpc.create_connection(). @@ -165,8 +212,12 @@ class Connection(object): def _safe_log(log_func, msg, msg_data): """Sanitizes the msg_data field before logging.""" - SANITIZE = {'set_admin_password': ('new_pass',), - 'run_instance': ('admin_password',), } + SANITIZE = {'set_admin_password': [('args', 'new_pass')], + 'run_instance': [('args', 'admin_password')], + 'route_message': [('args', 'message', 'args', 'method_info', + 'method_kwargs', 'password'), + ('args', 'message', 'args', 'method_info', + 'method_kwargs', 'admin_password')]} has_method = 'method' in msg_data and msg_data['method'] in SANITIZE has_context_token = '_context_auth_token' in msg_data @@ -178,14 +229,16 @@ def _safe_log(log_func, msg, msg_data): msg_data = copy.deepcopy(msg_data) if has_method: - method = msg_data['method'] - if method in SANITIZE: - args_to_sanitize = SANITIZE[method] - for arg in args_to_sanitize: - try: - msg_data['args'][arg] = "<SANITIZED>" - except KeyError: - pass + for arg in SANITIZE.get(msg_data['method'], []): + try: + d = msg_data + for elem in arg[:-1]: + d = d[elem] + d[arg[-1]] = '<SANITIZED>' + except KeyError, e: + LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'), + {'item': arg, + 'err': e}) if has_context_token: msg_data['_context_auth_token'] = '<SANITIZED>' @@ -344,3 +397,74 @@ def client_exceptions(*exceptions): return catch_client_exception(exceptions, func, *args, **kwargs) return inner return outer + + +def version_is_compatible(imp_version, version): + """Determine whether versions are compatible. + + :param imp_version: The version implemented + :param version: The version requested by an incoming message. + """ + version_parts = version.split('.') + imp_version_parts = imp_version.split('.') + if int(version_parts[0]) != int(imp_version_parts[0]): # Major + return False + if int(version_parts[1]) > int(imp_version_parts[1]): # Minor + return False + return True + + +def serialize_msg(raw_msg, force_envelope=False): + if not _SEND_RPC_ENVELOPE and not force_envelope: + return raw_msg + + # NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more + # information about this format. + msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION, + _MESSAGE_KEY: jsonutils.dumps(raw_msg)} + + return msg + + +def deserialize_msg(msg): + # NOTE(russellb): Hang on to your hats, this road is about to + # get a little bumpy. + # + # Robustness Principle: + # "Be strict in what you send, liberal in what you accept." + # + # At this point we have to do a bit of guessing about what it + # is we just received. Here is the set of possibilities: + # + # 1) We received a dict. This could be 2 things: + # + # a) Inspect it to see if it looks like a standard message envelope. + # If so, great! + # + # b) If it doesn't look like a standard message envelope, it could either + # be a notification, or a message from before we added a message + # envelope (referred to as version 1.0). + # Just return the message as-is. + # + # 2) It's any other non-dict type. Just return it and hope for the best. + # This case covers return values from rpc.call() from before message + # envelopes were used. (messages to call a method were always a dict) + + if not isinstance(msg, dict): + # See #2 above. + return msg + + base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY) + if not all(map(lambda key: key in msg, base_envelope_keys)): + # See #1.b above. + return msg + + # At this point we think we have the message envelope + # format we were expecting. (#1.a above) + + if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]): + raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY]) + + raw_msg = jsonutils.loads(msg[_MESSAGE_KEY]) + + return raw_msg diff --git a/nova/openstack/common/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py index 4dee5d509..5f4cc3a7c 100644 --- a/nova/openstack/common/rpc/dispatcher.py +++ b/nova/openstack/common/rpc/dispatcher.py @@ -103,21 +103,6 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - @staticmethod - def _is_compatible(mversion, version): - """Determine whether versions are compatible. - - :param mversion: The API version implemented by a callback. - :param version: The API version requested by an incoming message. - """ - version_parts = version.split('.') - mversion_parts = mversion.split('.') - if int(version_parts[0]) != int(mversion_parts[0]): # Major - return False - if int(version_parts[1]) > int(mversion_parts[1]): # Minor - return False - return True - def dispatch(self, ctxt, version, method, **kwargs): """Dispatch a message based on a requested version. @@ -139,7 +124,8 @@ class RpcDispatcher(object): rpc_api_version = proxyobj.RPC_API_VERSION else: rpc_api_version = '1.0' - is_compatible = self._is_compatible(rpc_api_version, version) + is_compatible = rpc_common.version_is_compatible(rpc_api_version, + version) had_compatible = had_compatible or is_compatible if not hasattr(proxyobj, method): continue diff --git a/nova/openstack/common/rpc/impl_kombu.py b/nova/openstack/common/rpc/impl_kombu.py index bb0ade27c..bf38201f5 100644 --- a/nova/openstack/common/rpc/impl_kombu.py +++ b/nova/openstack/common/rpc/impl_kombu.py @@ -162,7 +162,8 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) try: - callback(message.payload) + msg = rpc_common.deserialize_msg(message.payload) + callback(msg) message.ack() except Exception: LOG.exception(_("Failed to process message... skipping it.")) @@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase): # Default options options = {'durable': False, 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) exchange = kombu.entity.Exchange(name=msg_id, type='direct', @@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase): options = {'durable': False, 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', durable=options['durable'], @@ -316,7 +317,7 @@ class DirectPublisher(Publisher): options = {'durable': False, 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) super(DirectPublisher, self).__init__(channel, msg_id, msg_id, type='direct', **options) @@ -350,7 +351,7 @@ class FanoutPublisher(Publisher): """ options = {'durable': False, 'auto_delete': True, - 'exclusive': True} + 'exclusive': False} options.update(kwargs) super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, None, type='fanout', **options) @@ -387,6 +388,7 @@ class Connection(object): def __init__(self, conf, server_params=None): self.consumers = [] self.consumer_thread = None + self.proxy_callbacks = [] self.conf = conf self.max_retries = self.conf.rabbit_max_retries # Try forever? @@ -469,7 +471,7 @@ class Connection(object): LOG.info(_("Reconnecting to AMQP server on " "%(hostname)s:%(port)d") % params) try: - self.connection.close() + self.connection.release() except self.connection_errors: pass # Setting this in case the next statement fails, though @@ -573,12 +575,14 @@ class Connection(object): def close(self): """Close/release this connection""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.connection.release() self.connection = None def reset(self): """Reset a connection so it can be used again""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.channel.close() self.channel = self.connection.channel() # work around 'memory' transport bug in 1.1.3 @@ -644,6 +648,11 @@ class Connection(object): pass self.consumer_thread = None + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + def publisher_send(self, cls, topic, msg, **kwargs): """Send to a publisher based on the publisher class""" @@ -719,6 +728,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) if fanout: self.declare_fanout_consumer(topic, proxy_cb) @@ -730,6 +740,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) self.declare_topic_consumer(topic, proxy_cb, pool_name) @@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify( conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index 5570ea867..2e05f02f1 100644 --- a/nova/openstack/common/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -124,7 +124,8 @@ class ConsumerBase(object): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() try: - self.callback(message.content) + msg = rpc_common.deserialize_msg(message.content) + self.callback(msg) except Exception: LOG.exception(_("Failed to process message... skipping it.")) finally: @@ -277,8 +278,16 @@ class Connection(object): self.session = None self.consumers = {} self.consumer_thread = None + self.proxy_callbacks = [] self.conf = conf + if server_params and 'hostname' in server_params: + # NOTE(russellb) This enables support for cast_to_server. + server_params['qpid_hosts'] = [ + '%s:%d' % (server_params['hostname'], + server_params.get('port', 5672)) + ] + params = { 'qpid_hosts': self.conf.qpid_hosts, 'username': self.conf.qpid_username, @@ -367,12 +376,14 @@ class Connection(object): def close(self): """Close/release this connection""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.connection.close() self.connection = None def reset(self): """Reset a connection so it can be used again""" self.cancel_consumer_thread() + self.wait_on_proxy_callbacks() self.session.close() self.session = self.connection.session() self.consumers = {} @@ -427,6 +438,11 @@ class Connection(object): pass self.consumer_thread = None + def wait_on_proxy_callbacks(self): + """Wait for all proxy callback threads to exit.""" + for proxy_cb in self.proxy_callbacks: + proxy_cb.wait() + def publisher_send(self, cls, topic, msg): """Send to a publisher based on the publisher class""" @@ -502,6 +518,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) if fanout: consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) @@ -517,6 +534,7 @@ class Connection(object): proxy_cb = rpc_amqp.ProxyCallback( self.conf, proxy, rpc_amqp.get_connection_pool(self.conf, Connection)) + self.proxy_callbacks.append(proxy_cb) consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, name=pool_name) @@ -575,10 +593,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg): rpc_amqp.get_connection_pool(conf, Connection)) -def notify(conf, context, topic, msg): +def notify(conf, context, topic, msg, envelope): """Sends a notification event on a topic.""" return rpc_amqp.notify(conf, context, topic, msg, - rpc_amqp.get_connection_pool(conf, Connection)) + rpc_amqp.get_connection_pool(conf, Connection), + envelope) def cleanup(): diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py index 4ab47b8f8..ef4aa21db 100644 --- a/nova/openstack/common/rpc/impl_zmq.py +++ b/nova/openstack/common/rpc/impl_zmq.py @@ -28,7 +28,6 @@ import greenlet from nova.openstack.common import cfg from nova.openstack.common.gettextutils import _ from nova.openstack.common import importutils -from nova.openstack.common import jsonutils from nova.openstack.common.rpc import common as rpc_common @@ -77,27 +76,6 @@ ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object -def _serialize(data): - """ - Serialization wrapper - We prefer using JSON, but it cannot encode all types. - Error if a developer passes us bad data. - """ - try: - return str(jsonutils.dumps(data, ensure_ascii=True)) - except TypeError: - LOG.error(_("JSON serialization failed.")) - raise - - -def _deserialize(data): - """ - Deserialization wrapper - """ - LOG.debug(_("Deserializing: %s"), data) - return jsonutils.loads(data) - - class ZmqSocket(object): """ A tiny wrapper around ZeroMQ to simplify the send/recv protocol @@ -205,9 +183,10 @@ class ZmqClient(object): def __init__(self, addr, socket_type=zmq.PUSH, bind=False): self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data): - self.outq.send([str(msg_id), str(topic), str('cast'), - _serialize(data)]) + def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + if serialize: + data = rpc_common.serialize_msg(data, force_envelope) + self.outq.send([str(msg_id), str(topic), str('cast'), data]) def close(self): self.outq.close() @@ -232,11 +211,11 @@ class RpcContext(rpc_common.CommonRpcContext): @classmethod def marshal(self, ctx): ctx_data = ctx.to_dict() - return _serialize(ctx_data) + return rpc_common.serialize_msg(ctx_data) @classmethod def unmarshal(self, data): - return RpcContext.from_dict(_deserialize(data)) + return RpcContext.from_dict(rpc_common.deserialize_msg(data)) class InternalContext(object): @@ -433,11 +412,11 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUB elif topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = _deserialize(in_msg) + inside = rpc_common.deserialize_msg(in_msg) msg_id = inside[-1]['args']['msg_id'] response = inside[-1]['args']['response'] LOG.debug(_("->response->%s"), response) - data = [str(msg_id), _serialize(response)] + data = [str(msg_id), rpc_common.serialize_msg(response)] else: sock_type = zmq.PUSH @@ -480,7 +459,7 @@ class ZmqReactor(ZmqBaseReactor): msg_id, topic, style, in_msg = data - ctx, request = _deserialize(in_msg) + ctx, request = rpc_common.deserialize_msg(in_msg) ctx = RpcContext.unmarshal(ctx) proxy = self.proxies[sock] @@ -531,7 +510,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, msg_id, topic, msg, timeout=None): +def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True, + force_envelope=False): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -540,7 +520,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None): conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(msg_id, topic, payload) + conn.cast(msg_id, topic, payload, serialize, force_envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -590,7 +570,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1]) + responses = rpc_common.deserialize_msg(msg[-1]) # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") @@ -609,7 +589,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None): +def _multi_send(method, context, topic, msg, timeout=None, serialize=True, + force_envelope=False): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -635,7 +616,8 @@ def _multi_send(method, context, topic, msg, timeout=None): if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, _topic, msg, timeout) + _topic, _topic, msg, timeout, serialize, + force_envelope) return return method(_addr, context, _topic, _topic, msg, timeout) @@ -676,6 +658,8 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') + kwargs['serialize'] = kwargs.pop('envelope') + kwargs['force_envelope'] = True cast(conf, context, topic, msg, **kwargs) |