diff options
| -rw-r--r-- | openstack/common/rpc/amqp.py | 11 | ||||
| -rw-r--r-- | openstack/common/rpc/dispatcher.py | 21 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_fake.py | 12 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 5 | ||||
| -rw-r--r-- | openstack/common/rpc/proxy.py | 6 | ||||
| -rw-r--r-- | tests/unit/rpc/test_dispatcher.py | 44 | ||||
| -rw-r--r-- | tests/unit/rpc/test_proxy.py | 12 |
7 files changed, 91 insertions, 20 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 1d45db6..dc9aadb 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait): ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) - version = message_data.get('version', None) + version = message_data.get('version') + namespace = message_data.get('namespace') if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, version, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, method, + namespace, args) - def _process_data(self, ctxt, version, method, args): + def _process_data(self, ctxt, version, method, namespace, args): """Process a message in a new thread. If the proxy object we have has a dispatch method @@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait): """ ctxt.update_store() try: - rval = self.proxy.dispatch(ctxt, version, method, **args) + rval = self.proxy.dispatch(ctxt, version, method, namespace, + **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: diff --git a/openstack/common/rpc/dispatcher.py b/openstack/common/rpc/dispatcher.py index 0adc4ef..2efa5f2 100644 --- a/openstack/common/rpc/dispatcher.py +++ b/openstack/common/rpc/dispatcher.py @@ -103,13 +103,16 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - def dispatch(self, ctxt, version, method, **kwargs): + def dispatch(self, ctxt, version, method, namespace, **kwargs): """Dispatch a message based on a requested version. :param ctxt: The request context :param version: The requested API version from the incoming message :param method: The method requested to be called by the incoming message. + :param namespace: The namespace for the requested method. If None, + the dispatcher will look for a method on a callback + object with no namespace set. :param kwargs: A dict of keyword arguments to be passed to the method. :returns: Whatever is returned by the underlying method that gets @@ -120,13 +123,25 @@ class RpcDispatcher(object): had_compatible = False for proxyobj in self.callbacks: - if hasattr(proxyobj, 'RPC_API_VERSION'): + # Check for namespace compatibility + try: + cb_namespace = proxyobj.RPC_API_NAMESPACE + except AttributeError: + cb_namespace = None + + if namespace != cb_namespace: + continue + + # Check for version compatibility + try: rpc_api_version = proxyobj.RPC_API_VERSION - else: + except AttributeError: rpc_api_version = '1.0' + is_compatible = rpc_common.version_is_compatible(rpc_api_version, version) had_compatible = had_compatible or is_compatible + if not hasattr(proxyobj, method): continue if is_compatible: diff --git a/openstack/common/rpc/impl_fake.py b/openstack/common/rpc/impl_fake.py index 1086147..815570d 100644 --- a/openstack/common/rpc/impl_fake.py +++ b/openstack/common/rpc/impl_fake.py @@ -57,13 +57,14 @@ class Consumer(object): self.topic = topic self.proxy = proxy - def call(self, context, version, method, args, timeout): + def call(self, context, version, method, namespace, args, timeout): done = eventlet.event.Event() def _inner(): ctxt = RpcContext.from_dict(context.to_dict()) try: - rval = self.proxy.dispatch(context, version, method, **args) + rval = self.proxy.dispatch(context, version, method, + namespace, **args) res = [] # Caller might have called ctxt.reply() manually for (reply, failure) in ctxt._response: @@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): return iter([None]) else: - return consumer.call(context, version, method, args, timeout) + return consumer.call(context, version, method, namespace, args, + timeout) def call(conf, context, topic, msg, timeout=None): @@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) for consumer in CONSUMERS.get(topic, []): try: - consumer.call(context, version, method, args, None) + consumer.call(context, version, method, namespace, args, None) except Exception: pass diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 3fe45f5..fe880c0 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -276,7 +276,8 @@ class InternalContext(object): try: result = proxy.dispatch( - ctx, data['version'], data['method'], **data['args']) + ctx, data['version'], data['method'], + data.get('namespace'), **data['args']) return ConsumerBase.normalize_reply(result, ctx.replies) except greenlet.GreenletExit: # ignore these since they are just from shutdowns @@ -351,7 +352,7 @@ class ConsumerBase(object): return proxy.dispatch(ctx, data['version'], - data['method'], **data['args']) + data['method'], data.get('namespace'), **data['args']) class ZmqBaseReactor(ConsumerBase): diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py index 822248d..284f375 100644 --- a/openstack/common/rpc/proxy.py +++ b/openstack/common/rpc/proxy.py @@ -59,8 +59,12 @@ class RpcProxy(object): return topic if topic else self.topic @staticmethod + def make_namespaced_msg(method, namespace, **kwargs): + return {'method': method, 'namespace': namespace, 'args': kwargs} + + @staticmethod def make_msg(method, **kwargs): - return {'method': method, 'args': kwargs} + return RpcProxy.make_namespaced_msg(method, None, **kwargs) def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. diff --git a/tests/unit/rpc/test_dispatcher.py b/tests/unit/rpc/test_dispatcher.py index fe49776..8998a90 100644 --- a/tests/unit/rpc/test_dispatcher.py +++ b/tests/unit/rpc/test_dispatcher.py @@ -58,6 +58,18 @@ class RpcDispatcherTestCase(utils.BaseTestCase): self.test_method_ctxt = ctxt self.test_method_arg1 = arg1 + class API4(object): + RPC_API_VERSION = '1.0' + RPC_API_NAMESPACE = 'testapi' + + def __init__(self): + self.test_method_ctxt = None + self.test_method_arg1 = None + + def test_method(self, ctxt, arg1): + self.test_method_ctxt = ctxt + self.test_method_arg1 = arg1 + def setUp(self): super(RpcDispatcherTestCase, self).setUp() self.ctxt = context.RequestContext('fake_user', 'fake_project') @@ -67,7 +79,7 @@ class RpcDispatcherTestCase(utils.BaseTestCase): v3 = self.API3() disp = dispatcher.RpcDispatcher([v2, v3]) - disp.dispatch(self.ctxt, version, 'test_method', arg1=1) + disp.dispatch(self.ctxt, version, 'test_method', None, arg1=1) self.assertEqual(v2.test_method_ctxt, expectations[0]) self.assertEqual(v2.test_method_arg1, expectations[1]) @@ -104,7 +116,7 @@ class RpcDispatcherTestCase(utils.BaseTestCase): v1 = self.API1() disp = dispatcher.RpcDispatcher([v1]) - disp.dispatch(self.ctxt, None, 'test_method', arg1=1) + disp.dispatch(self.ctxt, None, 'test_method', None, arg1=1) self.assertEqual(v1.test_method_ctxt, self.ctxt) self.assertEqual(v1.test_method_arg1, 1) @@ -114,11 +126,35 @@ class RpcDispatcherTestCase(utils.BaseTestCase): disp = dispatcher.RpcDispatcher([v1]) self.assertRaises(AttributeError, disp.dispatch, - self.ctxt, "1.0", "does_not_exist") + self.ctxt, "1.0", "does_not_exist", None) def test_missing_method_version_no_match(self): v1 = self.API1() disp = dispatcher.RpcDispatcher([v1]) self.assertRaises(rpc_common.UnsupportedRpcVersion, disp.dispatch, - self.ctxt, "2.0", "does_not_exist") + self.ctxt, "2.0", "does_not_exist", None) + + def test_method_without_namespace(self): + v1 = self.API1() + v4 = self.API4() + disp = dispatcher.RpcDispatcher([v1, v4]) + + disp.dispatch(self.ctxt, '1.0', 'test_method', None, arg1=1) + + self.assertEqual(v1.test_method_ctxt, self.ctxt) + self.assertEqual(v1.test_method_arg1, 1) + self.assertEqual(v4.test_method_ctxt, None) + self.assertEqual(v4.test_method_arg1, None) + + def test_method_with_namespace(self): + v1 = self.API1() + v4 = self.API4() + disp = dispatcher.RpcDispatcher([v1, v4]) + + disp.dispatch(self.ctxt, '1.0', 'test_method', 'testapi', arg1=1) + + self.assertEqual(v1.test_method_ctxt, None) + self.assertEqual(v1.test_method_arg1, None) + self.assertEqual(v4.test_method_ctxt, self.ctxt) + self.assertEqual(v4.test_method_arg1, 1) diff --git a/tests/unit/rpc/test_proxy.py b/tests/unit/rpc/test_proxy.py index 97bb9d5..fe4bffb 100644 --- a/tests/unit/rpc/test_proxy.py +++ b/tests/unit/rpc/test_proxy.py @@ -144,6 +144,14 @@ class RpcProxyTestCase(utils.BaseTestCase): 'fanout_cast_to_server', server_params={'blah': 1}, supports_topic_override=False) + def test_make_namespaced_msg(self): + msg = proxy.RpcProxy.make_namespaced_msg('test_method', 'x', a=1, b=2) + expected = {'method': 'test_method', 'namespace': 'x', + 'args': {'a': 1, 'b': 2}} + self.assertEqual(msg, expected) + def test_make_msg(self): - self.assertEqual(proxy.RpcProxy.make_msg('test_method', a=1, b=2), - {'method': 'test_method', 'args': {'a': 1, 'b': 2}}) + msg = proxy.RpcProxy.make_msg('test_method', a=1, b=2) + expected = {'method': 'test_method', 'namespace': None, + 'args': {'a': 1, 'b': 2}} + self.assertEqual(msg, expected) |
