summaryrefslogtreecommitdiffstats
path: root/nova/openstack
diff options
context:
space:
mode:
authorRussell Bryant <rbryant@redhat.com>2013-04-14 20:58:11 -0400
committerRussell Bryant <rbryant@redhat.com>2013-04-14 22:57:25 -0700
commit6e5c59bb8c5b633344a407d5238f8b36ab24a397 (patch)
tree2b8234c01360701cc508a914d1bcf2cfef85d034 /nova/openstack
parent0fc0588a41d85a61695f4fa0aab238784ffe5999 (diff)
downloadnova-6e5c59bb8c5b633344a407d5238f8b36ab24a397.tar.gz
nova-6e5c59bb8c5b633344a407d5238f8b36ab24a397.tar.xz
nova-6e5c59bb8c5b633344a407d5238f8b36ab24a397.zip
Sync rpc from oslo-incubator.
This includes rpc api namespace support, some minor configuration related fixes, and the redis based matchmaker for use with impl_zmq. Change-Id: I9fc826a498461417afe8b2a923edac8d68c2c18f
Diffstat (limited to 'nova/openstack')
-rw-r--r--nova/openstack/common/rpc/amqp.py11
-rw-r--r--nova/openstack/common/rpc/common.py2
-rw-r--r--nova/openstack/common/rpc/dispatcher.py21
-rw-r--r--nova/openstack/common/rpc/impl_fake.py12
-rw-r--r--nova/openstack/common/rpc/impl_qpid.py2
-rw-r--r--nova/openstack/common/rpc/impl_zmq.py5
-rw-r--r--nova/openstack/common/rpc/matchmaker_redis.py149
-rw-r--r--nova/openstack/common/rpc/proxy.py6
-rwxr-xr-xnova/openstack/common/rpc/zmq_receiver.py41
9 files changed, 233 insertions, 16 deletions
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py
index e6b899cd9..8c98ff367 100644
--- a/nova/openstack/common/rpc/amqp.py
+++ b/nova/openstack/common/rpc/amqp.py
@@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait):
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
- version = message_data.get('version', None)
+ version = message_data.get('version')
+ namespace = message_data.get('namespace')
if not method:
LOG.warn(_('no method for message: %s') % message_data)
ctxt.reply(_('No method for message: %s') % message_data,
connection_pool=self.connection_pool)
return
- self.pool.spawn_n(self._process_data, ctxt, version, method, args)
+ self.pool.spawn_n(self._process_data, ctxt, version, method,
+ namespace, args)
- def _process_data(self, ctxt, version, method, args):
+ def _process_data(self, ctxt, version, method, namespace, args):
"""Process a message in a new thread.
If the proxy object we have has a dispatch method
@@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait):
"""
ctxt.update_store()
try:
- rval = self.proxy.dispatch(ctxt, version, method, **args)
+ rval = self.proxy.dispatch(ctxt, version, method, namespace,
+ **args)
# Check if the result was a generator
if inspect.isgenerator(rval):
for x in rval:
diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py
index a3be2fe10..73e508e3e 100644
--- a/nova/openstack/common/rpc/common.py
+++ b/nova/openstack/common/rpc/common.py
@@ -339,7 +339,7 @@ def deserialize_remote_exception(conf, data):
if not issubclass(klass, Exception):
raise TypeError("Can only deserialize Exceptions")
- failure = klass(**failure.get('kwargs', {}))
+ failure = klass(*failure.get('args', []), **failure.get('kwargs', {}))
except (AttributeError, TypeError, ImportError):
return RemoteError(name, failure.get('message'), trace)
diff --git a/nova/openstack/common/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py
index 5f4cc3a7c..3c84671df 100644
--- a/nova/openstack/common/rpc/dispatcher.py
+++ b/nova/openstack/common/rpc/dispatcher.py
@@ -103,13 +103,16 @@ class RpcDispatcher(object):
self.callbacks = callbacks
super(RpcDispatcher, self).__init__()
- def dispatch(self, ctxt, version, method, **kwargs):
+ def dispatch(self, ctxt, version, method, namespace, **kwargs):
"""Dispatch a message based on a requested version.
:param ctxt: The request context
:param version: The requested API version from the incoming message
:param method: The method requested to be called by the incoming
message.
+ :param namespace: The namespace for the requested method. If None,
+ the dispatcher will look for a method on a callback
+ object with no namespace set.
:param kwargs: A dict of keyword arguments to be passed to the method.
:returns: Whatever is returned by the underlying method that gets
@@ -120,13 +123,25 @@ class RpcDispatcher(object):
had_compatible = False
for proxyobj in self.callbacks:
- if hasattr(proxyobj, 'RPC_API_VERSION'):
+ # Check for namespace compatibility
+ try:
+ cb_namespace = proxyobj.RPC_API_NAMESPACE
+ except AttributeError:
+ cb_namespace = None
+
+ if namespace != cb_namespace:
+ continue
+
+ # Check for version compatibility
+ try:
rpc_api_version = proxyobj.RPC_API_VERSION
- else:
+ except AttributeError:
rpc_api_version = '1.0'
+
is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible
+
if not hasattr(proxyobj, method):
continue
if is_compatible:
diff --git a/nova/openstack/common/rpc/impl_fake.py b/nova/openstack/common/rpc/impl_fake.py
index 965d2640d..367b6ed93 100644
--- a/nova/openstack/common/rpc/impl_fake.py
+++ b/nova/openstack/common/rpc/impl_fake.py
@@ -57,13 +57,14 @@ class Consumer(object):
self.topic = topic
self.proxy = proxy
- def call(self, context, version, method, args, timeout):
+ def call(self, context, version, method, namespace, args, timeout):
done = eventlet.event.Event()
def _inner():
ctxt = RpcContext.from_dict(context.to_dict())
try:
- rval = self.proxy.dispatch(context, version, method, **args)
+ rval = self.proxy.dispatch(context, version, method,
+ namespace, **args)
res = []
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
@@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None):
return
args = msg.get('args', {})
version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
- return consumer.call(context, version, method, args, timeout)
+ return consumer.call(context, version, method, namespace, args,
+ timeout)
def call(conf, context, topic, msg, timeout=None):
@@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg):
return
args = msg.get('args', {})
version = msg.get('version', None)
+ namespace = msg.get('namespace', None)
for consumer in CONSUMERS.get(topic, []):
try:
- consumer.call(context, version, method, args, None)
+ consumer.call(context, version, method, namespace, args, None)
except Exception:
pass
diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py
index d3cc12d3c..43f17cbc9 100644
--- a/nova/openstack/common/rpc/impl_qpid.py
+++ b/nova/openstack/common/rpc/impl_qpid.py
@@ -320,7 +320,7 @@ class Connection(object):
# Reconnection is done by self.reconnect()
self.connection.reconnect = False
self.connection.heartbeat = self.conf.qpid_heartbeat
- self.connection.protocol = self.conf.qpid_protocol
+ self.connection.transport = self.conf.qpid_protocol
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
def _register_consumer(self, consumer):
diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py
index c501504cb..0b69ed8e4 100644
--- a/nova/openstack/common/rpc/impl_zmq.py
+++ b/nova/openstack/common/rpc/impl_zmq.py
@@ -276,7 +276,8 @@ class InternalContext(object):
try:
result = proxy.dispatch(
- ctx, data['version'], data['method'], **data['args'])
+ ctx, data['version'], data['method'],
+ data.get('namespace'), **data['args'])
return ConsumerBase.normalize_reply(result, ctx.replies)
except greenlet.GreenletExit:
# ignore these since they are just from shutdowns
@@ -351,7 +352,7 @@ class ConsumerBase(object):
return
proxy.dispatch(ctx, data['version'],
- data['method'], **data['args'])
+ data['method'], data.get('namespace'), **data['args'])
class ZmqBaseReactor(ConsumerBase):
diff --git a/nova/openstack/common/rpc/matchmaker_redis.py b/nova/openstack/common/rpc/matchmaker_redis.py
new file mode 100644
index 000000000..03e399ce7
--- /dev/null
+++ b/nova/openstack/common/rpc/matchmaker_redis.py
@@ -0,0 +1,149 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+from oslo.config import cfg
+
+from nova.openstack.common import importutils
+from nova.openstack.common import log as logging
+from nova.openstack.common.rpc import matchmaker as mm_common
+
+redis = importutils.try_import('redis')
+
+
+matchmaker_redis_opts = [
+ cfg.StrOpt('host',
+ default='127.0.0.1',
+ help='Host to locate redis'),
+ cfg.IntOpt('port',
+ default=6379,
+ help='Use this port to connect to redis host.'),
+ cfg.StrOpt('password',
+ default=None,
+ help='Password for Redis server. (optional)'),
+]
+
+CONF = cfg.CONF
+opt_group = cfg.OptGroup(name='matchmaker_redis',
+ title='Options for Redis-based MatchMaker')
+CONF.register_group(opt_group)
+CONF.register_opts(matchmaker_redis_opts, opt_group)
+LOG = logging.getLogger(__name__)
+
+
+class RedisExchange(mm_common.Exchange):
+ def __init__(self, matchmaker):
+ self.matchmaker = matchmaker
+ self.redis = matchmaker.redis
+ super(RedisExchange, self).__init__()
+
+
+class RedisTopicExchange(RedisExchange):
+ """
+ Exchange where all topic keys are split, sending to second half.
+ i.e. "compute.host" sends a message to "compute" running on "host"
+ """
+ def run(self, topic):
+ while True:
+ member_name = self.redis.srandmember(topic)
+
+ if not member_name:
+ # If this happens, there are no
+ # longer any members.
+ break
+
+ if not self.matchmaker.is_alive(topic, member_name):
+ continue
+
+ host = member_name.split('.', 1)[1]
+ return [(member_name, host)]
+ return []
+
+
+class RedisFanoutExchange(RedisExchange):
+ """
+ Return a list of all hosts.
+ """
+ def run(self, topic):
+ topic = topic.split('~', 1)[1]
+ hosts = self.redis.smembers(topic)
+ good_hosts = filter(
+ lambda host: self.matchmaker.is_alive(topic, host), hosts)
+
+ return [(x, x.split('.', 1)[1]) for x in good_hosts]
+
+
+class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
+ """
+ MatchMaker registering and looking-up hosts with a Redis server.
+ """
+ def __init__(self):
+ super(MatchMakerRedis, self).__init__()
+
+ if not redis:
+ raise ImportError("Failed to import module redis.")
+
+ self.redis = redis.StrictRedis(
+ host=CONF.matchmaker_redis.host,
+ port=CONF.matchmaker_redis.port,
+ password=CONF.matchmaker_redis.password)
+
+ self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
+ self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
+ self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
+
+ def ack_alive(self, key, host):
+ topic = "%s.%s" % (key, host)
+ if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
+ # If we could not update the expiration, the key
+ # might have been pruned. Re-register, creating a new
+ # key in Redis.
+ self.register(self.topic_host[host], host)
+
+ def is_alive(self, topic, host):
+ if self.redis.ttl(host) == -1:
+ self.expire(topic, host)
+ return False
+ return True
+
+ def expire(self, topic, host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.delete(host)
+ pipe.srem(topic, host)
+ pipe.execute()
+
+ def backend_register(self, key, key_host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.sadd(key, key_host)
+
+ # No value is needed, we just
+ # care if it exists. Sets aren't viable
+ # because only keys can expire.
+ pipe.set(key_host, '')
+
+ pipe.execute()
+
+ def backend_unregister(self, key, key_host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.srem(key, key_host)
+ pipe.delete(key_host)
+ pipe.execute()
diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py
index f6a7dbc74..daebcbfa7 100644
--- a/nova/openstack/common/rpc/proxy.py
+++ b/nova/openstack/common/rpc/proxy.py
@@ -59,8 +59,12 @@ class RpcProxy(object):
return topic if topic else self.topic
@staticmethod
+ def make_namespaced_msg(method, namespace, **kwargs):
+ return {'method': method, 'namespace': namespace, 'args': kwargs}
+
+ @staticmethod
def make_msg(method, **kwargs):
- return {'method': method, 'args': kwargs}
+ return RpcProxy.make_namespaced_msg(method, None, **kwargs)
def call(self, context, msg, topic=None, version=None, timeout=None):
"""rpc.call() a remote method.
diff --git a/nova/openstack/common/rpc/zmq_receiver.py b/nova/openstack/common/rpc/zmq_receiver.py
new file mode 100755
index 000000000..bf3386516
--- /dev/null
+++ b/nova/openstack/common/rpc/zmq_receiver.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+eventlet.monkey_patch()
+
+import contextlib
+import sys
+
+from oslo.config import cfg
+
+from nova.openstack.common import log as logging
+from nova.openstack.common import rpc
+from nova.openstack.common.rpc import impl_zmq
+
+CONF = cfg.CONF
+CONF.register_opts(rpc.rpc_opts)
+CONF.register_opts(impl_zmq.zmq_opts)
+
+
+def main():
+ CONF(sys.argv[1:], project='oslo')
+ logging.setup("oslo")
+
+ with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
+ reactor.consume_in_thread()
+ reactor.wait()