diff options
| author | Russell Bryant <rbryant@redhat.com> | 2013-03-30 01:05:12 -0400 |
|---|---|---|
| committer | Russell Bryant <rbryant@redhat.com> | 2013-04-12 10:42:50 -0400 |
| commit | 6901a3ba3e09ae091480b650ec23c2f2d9543152 (patch) | |
| tree | 34062944991e1611b1f66965da7f1870fc1589fd /openstack/common | |
| parent | 9dcf688ea80f52cdb5413514198b2aa81d5a4e09 (diff) | |
| download | oslo-6901a3ba3e09ae091480b650ec23c2f2d9543152.tar.gz oslo-6901a3ba3e09ae091480b650ec23c2f2d9543152.tar.xz oslo-6901a3ba3e09ae091480b650ec23c2f2d9543152.zip | |
Add rpc method namespace support.
RPC endpoints already had the ability to expose multiple APIs ... sort
of. You could pass multiple callback objects to the dispatcher and it
would check all of them for a method call.
This patch adds the ability to set a namespace on a callback object.
This makes exposing multiple APIs a bit more like you would expect it to
work. You can invoke a method on a specific callback object, as opposed
to having it check all of them for the method.
This will allow you to create, manage, and version APIs without any
potential conflicts with other APIs being exposed by the same endpoint.
An example of where I would like to use this is in Nova, where we have
some methods that we would like to expose on *all* rpc endpoints.
This includes no public API changes and is fully backwards compatible.
Implement blueprint rpc-multi-api.
Change-Id: Ief4433e2e1c32cfb05b4cd27b87fe32b40f4341d
Diffstat (limited to 'openstack/common')
| -rw-r--r-- | openstack/common/rpc/amqp.py | 11 | ||||
| -rw-r--r-- | openstack/common/rpc/dispatcher.py | 21 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_fake.py | 12 | ||||
| -rw-r--r-- | openstack/common/rpc/impl_zmq.py | 5 | ||||
| -rw-r--r-- | openstack/common/rpc/proxy.py | 6 |
5 files changed, 41 insertions, 14 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py index 1d45db6..dc9aadb 100644 --- a/openstack/common/rpc/amqp.py +++ b/openstack/common/rpc/amqp.py @@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait): ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) - version = message_data.get('version', None) + version = message_data.get('version') + namespace = message_data.get('namespace') if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, version, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, method, + namespace, args) - def _process_data(self, ctxt, version, method, args): + def _process_data(self, ctxt, version, method, namespace, args): """Process a message in a new thread. If the proxy object we have has a dispatch method @@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait): """ ctxt.update_store() try: - rval = self.proxy.dispatch(ctxt, version, method, **args) + rval = self.proxy.dispatch(ctxt, version, method, namespace, + **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: diff --git a/openstack/common/rpc/dispatcher.py b/openstack/common/rpc/dispatcher.py index 0adc4ef..2efa5f2 100644 --- a/openstack/common/rpc/dispatcher.py +++ b/openstack/common/rpc/dispatcher.py @@ -103,13 +103,16 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - def dispatch(self, ctxt, version, method, **kwargs): + def dispatch(self, ctxt, version, method, namespace, **kwargs): """Dispatch a message based on a requested version. :param ctxt: The request context :param version: The requested API version from the incoming message :param method: The method requested to be called by the incoming message. + :param namespace: The namespace for the requested method. If None, + the dispatcher will look for a method on a callback + object with no namespace set. :param kwargs: A dict of keyword arguments to be passed to the method. :returns: Whatever is returned by the underlying method that gets @@ -120,13 +123,25 @@ class RpcDispatcher(object): had_compatible = False for proxyobj in self.callbacks: - if hasattr(proxyobj, 'RPC_API_VERSION'): + # Check for namespace compatibility + try: + cb_namespace = proxyobj.RPC_API_NAMESPACE + except AttributeError: + cb_namespace = None + + if namespace != cb_namespace: + continue + + # Check for version compatibility + try: rpc_api_version = proxyobj.RPC_API_VERSION - else: + except AttributeError: rpc_api_version = '1.0' + is_compatible = rpc_common.version_is_compatible(rpc_api_version, version) had_compatible = had_compatible or is_compatible + if not hasattr(proxyobj, method): continue if is_compatible: diff --git a/openstack/common/rpc/impl_fake.py b/openstack/common/rpc/impl_fake.py index 1086147..815570d 100644 --- a/openstack/common/rpc/impl_fake.py +++ b/openstack/common/rpc/impl_fake.py @@ -57,13 +57,14 @@ class Consumer(object): self.topic = topic self.proxy = proxy - def call(self, context, version, method, args, timeout): + def call(self, context, version, method, namespace, args, timeout): done = eventlet.event.Event() def _inner(): ctxt = RpcContext.from_dict(context.to_dict()) try: - rval = self.proxy.dispatch(context, version, method, **args) + rval = self.proxy.dispatch(context, version, method, + namespace, **args) res = [] # Caller might have called ctxt.reply() manually for (reply, failure) in ctxt._response: @@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): return iter([None]) else: - return consumer.call(context, version, method, args, timeout) + return consumer.call(context, version, method, namespace, args, + timeout) def call(conf, context, topic, msg, timeout=None): @@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) for consumer in CONSUMERS.get(topic, []): try: - consumer.call(context, version, method, args, None) + consumer.call(context, version, method, namespace, args, None) except Exception: pass diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index 3fe45f5..fe880c0 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -276,7 +276,8 @@ class InternalContext(object): try: result = proxy.dispatch( - ctx, data['version'], data['method'], **data['args']) + ctx, data['version'], data['method'], + data.get('namespace'), **data['args']) return ConsumerBase.normalize_reply(result, ctx.replies) except greenlet.GreenletExit: # ignore these since they are just from shutdowns @@ -351,7 +352,7 @@ class ConsumerBase(object): return proxy.dispatch(ctx, data['version'], - data['method'], **data['args']) + data['method'], data.get('namespace'), **data['args']) class ZmqBaseReactor(ConsumerBase): diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py index 822248d..284f375 100644 --- a/openstack/common/rpc/proxy.py +++ b/openstack/common/rpc/proxy.py @@ -59,8 +59,12 @@ class RpcProxy(object): return topic if topic else self.topic @staticmethod + def make_namespaced_msg(method, namespace, **kwargs): + return {'method': method, 'namespace': namespace, 'args': kwargs} + + @staticmethod def make_msg(method, **kwargs): - return {'method': method, 'args': kwargs} + return RpcProxy.make_namespaced_msg(method, None, **kwargs) def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. |
