summaryrefslogtreecommitdiffstats
path: root/openstack/common
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2013-03-30 01:05:12 -0400
committerRussell Bryant <rbryant@redhat.com>2013-04-12 10:42:50 -0400
commit6901a3ba3e09ae091480b650ec23c2f2d9543152 (patch)
tree34062944991e1611b1f66965da7f1870fc1589fd /openstack/common
parent9dcf688ea80f52cdb5413514198b2aa81d5a4e09 (diff)
downloadoslo-6901a3ba3e09ae091480b650ec23c2f2d9543152.tar.gz
oslo-6901a3ba3e09ae091480b650ec23c2f2d9543152.tar.xz
oslo-6901a3ba3e09ae091480b650ec23c2f2d9543152.zip
Add rpc method namespace support.
RPC endpoints already had the ability to expose multiple APIs ... sort of. You could pass multiple callback objects to the dispatcher and it would check all of them for a method call. This patch adds the ability to set a namespace on a callback object. This makes exposing multiple APIs a bit more like you would expect it to work. You can invoke a method on a specific callback object, as opposed to having it check all of them for the method. This will allow you to create, manage, and version APIs without any potential conflicts with other APIs being exposed by the same endpoint. An example of where I would like to use this is in Nova, where we have some methods that we would like to expose on *all* rpc endpoints. This includes no public API changes and is fully backwards compatible. Implement blueprint rpc-multi-api. Change-Id: Ief4433e2e1c32cfb05b4cd27b87fe32b40f4341d
Diffstat (limited to 'openstack/common')
-rw-r--r--openstack/common/rpc/amqp.py11
-rw-r--r--openstack/common/rpc/dispatcher.py21
-rw-r--r--openstack/common/rpc/impl_fake.py12
-rw-r--r--openstack/common/rpc/impl_zmq.py5
-rw-r--r--openstack/common/rpc/proxy.py6
5 files changed, 41 insertions, 14 deletions
diff --git a/openstack/common/rpc/amqp.py b/openstack/common/rpc/amqp.py
index 1d45db6..dc9aadb 100644
--- a/openstack/common/rpc/amqp.py
+++ b/openstack/common/rpc/amqp.py
@@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait):
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
- version = message_data.get('version', None)
+ version = message_data.get('version')
+ namespace = message_data.get('namespace')
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
- self.pool.spawn_n(self._process_data, ctxt, version, method, args)
+ self.pool.spawn_n(self._process_data, ctxt, version, method,
+ namespace, args)
- def _process_data(self, ctxt, version, method, args):
+ def _process_data(self, ctxt, version, method, namespace, args):
"""Process a message in a new thread.
If the proxy object we have has a dispatch method
@@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait):
"""
ctxt.update_store()
try:
- rval = self.proxy.dispatch(ctxt, version, method, **args)
+ rval = self.proxy.dispatch(ctxt, version, method, namespace,
+ **args)
# Check if the result was a generator
if inspect.isgenerator(rval):
for x in rval:
diff --git a/openstack/common/rpc/dispatcher.py b/openstack/common/rpc/dispatcher.py
index 0adc4ef..2efa5f2 100644
--- a/openstack/common/rpc/dispatcher.py
+++ b/openstack/common/rpc/dispatcher.py
@@ -103,13 +103,16 @@ class RpcDispatcher(object):
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
- def dispatch(self, ctxt, version, method, **kwargs):
+ def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
:param ctxt: The request context
:param version: The requested API version from the incoming message
:param method: The method requested to be called by the incoming
message.
+ :param namespace: The namespace for the requested method. If None,
+ the dispatcher will look for a method on a callback
+ object with no namespace set.
:param kwargs: A dict of keyword arguments to be passed to the method.
:returns: Whatever is returned by the underlying method that gets
@@ -120,13 +123,25 @@ class RpcDispatcher(object):
had_compatible = False
for proxyobj in self.callbacks:
- if hasattr(proxyobj, 'RPC_API_VERSION'):
+ # Check for namespace compatibility
+ try:
+ cb_namespace = proxyobj.RPC_API_NAMESPACE
+ except AttributeError:
+ cb_namespace = None
+
+ if namespace != cb_namespace:
+ continue
+
+ # Check for version compatibility
+ try:
rpc_api_version = proxyobj.RPC_API_VERSION
- else:
+ except AttributeError:
rpc_api_version = '1.0'
+
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible
+
if not hasattr(proxyobj, method):
continue
if is_compatible:
diff --git a/openstack/common/rpc/impl_fake.py b/openstack/common/rpc/impl_fake.py
index 1086147..815570d 100644
--- a/openstack/common/rpc/impl_fake.py
+++ b/openstack/common/rpc/impl_fake.py
@@ -57,13 +57,14 @@ class Consumer(object):
self.topic = topic
self.proxy = proxy
- def call(self, context, version, method, args, timeout):
+ def call(self, context, version, method, namespace, args, timeout):
done = eventlet.event.Event()
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
- rval = self.proxy.dispatch(context, version, method, **args)
+ rval = self.proxy.dispatch(context, version, method,
+ namespace, **args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
@@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None):
return
args = msg.get('args', {})
version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
- return consumer.call(context, version, method, args, timeout)
+ return consumer.call(context, version, method, namespace, args,
+ timeout)
def call(conf, context, topic, msg, timeout=None):
@@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg):
return
args = msg.get('args', {})
version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
for consumer in CONSUMERS.get(topic, []):
try:
- consumer.call(context, version, method, args, None)
+ consumer.call(context, version, method, namespace, args, None)
except Exception:
pass
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index 3fe45f5..fe880c0 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -276,7 +276,8 @@ class InternalContext(object):
try:
result = proxy.dispatch(
- ctx, data['version'], data['method'], **data['args'])
+ ctx, data['version'], data['method'],
+ data.get('namespace'), **data['args'])
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
@@ -351,7 +352,7 @@ class ConsumerBase(object):
return
proxy.dispatch(ctx, data['version'],
- data['method'], **data['args'])
+ data['method'], data.get('namespace'), **data['args'])
class ZmqBaseReactor(ConsumerBase):
diff --git a/openstack/common/rpc/proxy.py b/openstack/common/rpc/proxy.py
index 822248d..284f375 100644
--- a/openstack/common/rpc/proxy.py
+++ b/openstack/common/rpc/proxy.py
@@ -59,8 +59,12 @@ class RpcProxy(object):
return topic if topic else self.topic
@staticmethod
+ def make_namespaced_msg(method, namespace, **kwargs):
+ return {'method': method, 'namespace': namespace, 'args': kwargs}
+
+ @staticmethod
def make_msg(method, **kwargs):
- return {'method': method, 'args': kwargs}
+ return RpcProxy.make_namespaced_msg(method, None, **kwargs)
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.