diff options
Diffstat (limited to 'openstack')
| -rw-r--r-- | openstack/common/rpc/amqp.py | 11 | ||||
| -rw-r--r-- | openstack/common/rpc/dispatcher.py | 21 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_fake.py | 12 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 5 | ||||
| -rw-r--r-- | openstack/common/rpc/proxy.py | 6 |
5 files changed, 41 insertions, 14 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 1d45db6..dc9aadb 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait): ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) - version = message_data.get('version', None) + version = message_data.get('version') + namespace = message_data.get('namespace') if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, version, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, method, + namespace, args) - def _process_data(self, ctxt, version, method, args): + def _process_data(self, ctxt, version, method, namespace, args): """Process a message in a new thread. If the proxy object we have has a dispatch method @@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait): """ ctxt.update_store() try: - rval = self.proxy.dispatch(ctxt, version, method, **args) + rval = self.proxy.dispatch(ctxt, version, method, namespace, + **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: diff --git a/openstack/common/rpc/dispatcher.py b/openstack/common/rpc/dispatcher.py index 0adc4ef..2efa5f2 100644 --- a/openstack/common/rpc/dispatcher.py +++ b/openstack/common/rpc/dispatcher.py @@ -103,13 +103,16 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - def dispatch(self, ctxt, version, method, **kwargs): + def dispatch(self, ctxt, version, method, namespace, **kwargs): """Dispatch a message based on a requested version. :param ctxt: The request context :param version: The requested API version from the incoming message :param method: The method requested to be called by the incoming message. + :param namespace: The namespace for the requested method. If None, + the dispatcher will look for a method on a callback + object with no namespace set. :param kwargs: A dict of keyword arguments to be passed to the method. :returns: Whatever is returned by the underlying method that gets @@ -120,13 +123,25 @@ class RpcDispatcher(object): had_compatible = False for proxyobj in self.callbacks: - if hasattr(proxyobj, 'RPC_API_VERSION'): + # Check for namespace compatibility + try: + cb_namespace = proxyobj.RPC_API_NAMESPACE + except AttributeError: + cb_namespace = None + + if namespace != cb_namespace: + continue + + # Check for version compatibility + try: rpc_api_version = proxyobj.RPC_API_VERSION - else: + except AttributeError: rpc_api_version = '1.0' + is_compatible = rpc_common.version_is_compatible(rpc_api_version, version) had_compatible = had_compatible or is_compatible + if not hasattr(proxyobj, method): continue if is_compatible: diff --git a/openstack/common/rpc/impl_fake.py b/openstack/common/rpc/impl_fake.py index 1086147..815570d 100644 --- a/openstack/common/rpc/impl_fake.py +++ b/openstack/common/rpc/impl_fake.py @@ -57,13 +57,14 @@ class Consumer(object): self.topic = topic self.proxy = proxy - def call(self, context, version, method, args, timeout): + def call(self, context, version, method, namespace, args, timeout): done = eventlet.event.Event() def _inner(): ctxt = RpcContext.from_dict(context.to_dict()) try: - rval = self.proxy.dispatch(context, version, method, **args) + rval = self.proxy.dispatch(context, version, method, + namespace, **args) res = [] # Caller might have called ctxt.reply() manually for (reply, failure) in ctxt._response: @@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): return iter([None]) else: - return consumer.call(context, version, method, args, timeout) + return consumer.call(context, version, method, namespace, args, + timeout) def call(conf, context, topic, msg, timeout=None): @@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) for consumer in CONSUMERS.get(topic, []): try: - consumer.call(context, version, method, args, None) + consumer.call(context, version, method, namespace, args, None) except Exception: pass diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 3fe45f5..fe880c0 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -276,7 +276,8 @@ class InternalContext(object): try: result = proxy.dispatch( - ctx, data['version'], data['method'], **data['args']) + ctx, data['version'], data['method'], + data.get('namespace'), **data['args']) return ConsumerBase.normalize_reply(result, ctx.replies) except greenlet.GreenletExit: # ignore these since they are just from shutdowns @@ -351,7 +352,7 @@ class ConsumerBase(object): return proxy.dispatch(ctx, data['version'], - data['method'], **data['args']) + data['method'], data.get('namespace'), **data['args']) class ZmqBaseReactor(ConsumerBase): diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py index 822248d..284f375 100644 --- a/openstack/common/rpc/proxy.py +++ b/openstack/common/rpc/proxy.py @@ -59,8 +59,12 @@ class RpcProxy(object): return topic if topic else self.topic @staticmethod + def make_namespaced_msg(method, namespace, **kwargs): + return {'method': method, 'namespace': namespace, 'args': kwargs} + + @staticmethod def make_msg(method, **kwargs): - return {'method': method, 'args': kwargs} + return RpcProxy.make_namespaced_msg(method, None, **kwargs) def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. |
