diff options
author | Russell Bryant <rbryant@redhat.com> | 2013-04-14 20:58:11 -0400 |
---|---|---|
committer | Russell Bryant <rbryant@redhat.com> | 2013-04-14 22:57:25 -0700 |
commit | 6e5c59bb8c5b633344a407d5238f8b36ab24a397 (patch) | |
tree | 2b8234c01360701cc508a914d1bcf2cfef85d034 /nova/openstack | |
parent | 0fc0588a41d85a61695f4fa0aab238784ffe5999 (diff) | |
download | nova-6e5c59bb8c5b633344a407d5238f8b36ab24a397.tar.gz nova-6e5c59bb8c5b633344a407d5238f8b36ab24a397.tar.xz nova-6e5c59bb8c5b633344a407d5238f8b36ab24a397.zip |
Sync rpc from oslo-incubator.
This includes rpc api namespace support, some minor configuration
related fixes, and the redis based matchmaker for use with impl_zmq.
Change-Id: I9fc826a498461417afe8b2a923edac8d68c2c18f
Diffstat (limited to 'nova/openstack')
-rw-r--r-- | nova/openstack/common/rpc/amqp.py | 11 | ||||
-rw-r--r-- | nova/openstack/common/rpc/common.py | 2 | ||||
-rw-r--r-- | nova/openstack/common/rpc/dispatcher.py | 21 | ||||
-rw-r--r-- | nova/openstack/common/rpc/impl_fake.py | 12 | ||||
-rw-r--r-- | nova/openstack/common/rpc/impl_qpid.py | 2 | ||||
-rw-r--r-- | nova/openstack/common/rpc/impl_zmq.py | 5 | ||||
-rw-r--r-- | nova/openstack/common/rpc/matchmaker_redis.py | 149 | ||||
-rw-r--r-- | nova/openstack/common/rpc/proxy.py | 6 | ||||
-rwxr-xr-x | nova/openstack/common/rpc/zmq_receiver.py | 41 |
9 files changed, 233 insertions, 16 deletions
diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py index e6b899cd9..8c98ff367 100644 --- a/nova/openstack/common/rpc/amqp.py +++ b/nova/openstack/common/rpc/amqp.py @@ -408,15 +408,17 @@ class ProxyCallback(_ThreadPoolWithWait): ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) - version = message_data.get('version', None) + version = message_data.get('version') + namespace = message_data.get('namespace') if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, version, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, method, + namespace, args) - def _process_data(self, ctxt, version, method, args): + def _process_data(self, ctxt, version, method, namespace, args): """Process a message in a new thread. If the proxy object we have has a dispatch method @@ -427,7 +429,8 @@ class ProxyCallback(_ThreadPoolWithWait): """ ctxt.update_store() try: - rval = self.proxy.dispatch(ctxt, version, method, **args) + rval = self.proxy.dispatch(ctxt, version, method, namespace, + **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py index a3be2fe10..73e508e3e 100644 --- a/nova/openstack/common/rpc/common.py +++ b/nova/openstack/common/rpc/common.py @@ -339,7 +339,7 @@ def deserialize_remote_exception(conf, data): if not issubclass(klass, Exception): raise TypeError("Can only deserialize Exceptions") - failure = klass(**failure.get('kwargs', {})) + failure = klass(*failure.get('args', []), **failure.get('kwargs', {})) except (AttributeError, TypeError, ImportError): return RemoteError(name, failure.get('message'), trace) diff --git a/nova/openstack/common/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py index 5f4cc3a7c..3c84671df 100644 --- a/nova/openstack/common/rpc/dispatcher.py +++ b/nova/openstack/common/rpc/dispatcher.py @@ -103,13 +103,16 @@ class RpcDispatcher(object): self.callbacks = callbacks super(RpcDispatcher, self).__init__() - def dispatch(self, ctxt, version, method, **kwargs): + def dispatch(self, ctxt, version, method, namespace, **kwargs): """Dispatch a message based on a requested version. :param ctxt: The request context :param version: The requested API version from the incoming message :param method: The method requested to be called by the incoming message. + :param namespace: The namespace for the requested method. If None, + the dispatcher will look for a method on a callback + object with no namespace set. :param kwargs: A dict of keyword arguments to be passed to the method. :returns: Whatever is returned by the underlying method that gets @@ -120,13 +123,25 @@ class RpcDispatcher(object): had_compatible = False for proxyobj in self.callbacks: - if hasattr(proxyobj, 'RPC_API_VERSION'): + # Check for namespace compatibility + try: + cb_namespace = proxyobj.RPC_API_NAMESPACE + except AttributeError: + cb_namespace = None + + if namespace != cb_namespace: + continue + + # Check for version compatibility + try: rpc_api_version = proxyobj.RPC_API_VERSION - else: + except AttributeError: rpc_api_version = '1.0' + is_compatible = rpc_common.version_is_compatible(rpc_api_version, version) had_compatible = had_compatible or is_compatible + if not hasattr(proxyobj, method): continue if is_compatible: diff --git a/nova/openstack/common/rpc/impl_fake.py b/nova/openstack/common/rpc/impl_fake.py index 965d2640d..367b6ed93 100644 --- a/nova/openstack/common/rpc/impl_fake.py +++ b/nova/openstack/common/rpc/impl_fake.py @@ -57,13 +57,14 @@ class Consumer(object): self.topic = topic self.proxy = proxy - def call(self, context, version, method, args, timeout): + def call(self, context, version, method, namespace, args, timeout): done = eventlet.event.Event() def _inner(): ctxt = RpcContext.from_dict(context.to_dict()) try: - rval = self.proxy.dispatch(context, version, method, **args) + rval = self.proxy.dispatch(context, version, method, + namespace, **args) res = [] # Caller might have called ctxt.reply() manually for (reply, failure) in ctxt._response: @@ -140,13 +141,15 @@ def multicall(conf, context, topic, msg, timeout=None): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) try: consumer = CONSUMERS[topic][0] except (KeyError, IndexError): return iter([None]) else: - return consumer.call(context, version, method, args, timeout) + return consumer.call(context, version, method, namespace, args, + timeout) def call(conf, context, topic, msg, timeout=None): @@ -183,9 +186,10 @@ def fanout_cast(conf, context, topic, msg): return args = msg.get('args', {}) version = msg.get('version', None) + namespace = msg.get('namespace', None) for consumer in CONSUMERS.get(topic, []): try: - consumer.call(context, version, method, args, None) + consumer.call(context, version, method, namespace, args, None) except Exception: pass diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index d3cc12d3c..43f17cbc9 100644 --- a/nova/openstack/common/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -320,7 +320,7 @@ class Connection(object): # Reconnection is done by self.reconnect() self.connection.reconnect = False self.connection.heartbeat = self.conf.qpid_heartbeat - self.connection.protocol = self.conf.qpid_protocol + self.connection.transport = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay def _register_consumer(self, consumer): diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py index c501504cb..0b69ed8e4 100644 --- a/nova/openstack/common/rpc/impl_zmq.py +++ b/nova/openstack/common/rpc/impl_zmq.py @@ -276,7 +276,8 @@ class InternalContext(object): try: result = proxy.dispatch( - ctx, data['version'], data['method'], **data['args']) + ctx, data['version'], data['method'], + data.get('namespace'), **data['args']) return ConsumerBase.normalize_reply(result, ctx.replies) except greenlet.GreenletExit: # ignore these since they are just from shutdowns @@ -351,7 +352,7 @@ class ConsumerBase(object): return proxy.dispatch(ctx, data['version'], - data['method'], **data['args']) + data['method'], data.get('namespace'), **data['args']) class ZmqBaseReactor(ConsumerBase): diff --git a/nova/openstack/common/rpc/matchmaker_redis.py b/nova/openstack/common/rpc/matchmaker_redis.py new file mode 100644 index 000000000..03e399ce7 --- /dev/null +++ b/nova/openstack/common/rpc/matchmaker_redis.py @@ -0,0 +1,149 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +The MatchMaker classes should accept a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +from oslo.config import cfg + +from nova.openstack.common import importutils +from nova.openstack.common import log as logging +from nova.openstack.common.rpc import matchmaker as mm_common + +redis = importutils.try_import('redis') + + +matchmaker_redis_opts = [ + cfg.StrOpt('host', + default='127.0.0.1', + help='Host to locate redis'), + cfg.IntOpt('port', + default=6379, + help='Use this port to connect to redis host.'), + cfg.StrOpt('password', + default=None, + help='Password for Redis server. (optional)'), +] + +CONF = cfg.CONF +opt_group = cfg.OptGroup(name='matchmaker_redis', + title='Options for Redis-based MatchMaker') +CONF.register_group(opt_group) +CONF.register_opts(matchmaker_redis_opts, opt_group) +LOG = logging.getLogger(__name__) + + +class RedisExchange(mm_common.Exchange): + def __init__(self, matchmaker): + self.matchmaker = matchmaker + self.redis = matchmaker.redis + super(RedisExchange, self).__init__() + + +class RedisTopicExchange(RedisExchange): + """ + Exchange where all topic keys are split, sending to second half. + i.e. "compute.host" sends a message to "compute" running on "host" + """ + def run(self, topic): + while True: + member_name = self.redis.srandmember(topic) + + if not member_name: + # If this happens, there are no + # longer any members. + break + + if not self.matchmaker.is_alive(topic, member_name): + continue + + host = member_name.split('.', 1)[1] + return [(member_name, host)] + return [] + + +class RedisFanoutExchange(RedisExchange): + """ + Return a list of all hosts. + """ + def run(self, topic): + topic = topic.split('~', 1)[1] + hosts = self.redis.smembers(topic) + good_hosts = filter( + lambda host: self.matchmaker.is_alive(topic, host), hosts) + + return [(x, x.split('.', 1)[1]) for x in good_hosts] + + +class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase): + """ + MatchMaker registering and looking-up hosts with a Redis server. + """ + def __init__(self): + super(MatchMakerRedis, self).__init__() + + if not redis: + raise ImportError("Failed to import module redis.") + + self.redis = redis.StrictRedis( + host=CONF.matchmaker_redis.host, + port=CONF.matchmaker_redis.port, + password=CONF.matchmaker_redis.password) + + self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self)) + self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange()) + self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self)) + + def ack_alive(self, key, host): + topic = "%s.%s" % (key, host) + if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl): + # If we could not update the expiration, the key + # might have been pruned. Re-register, creating a new + # key in Redis. + self.register(self.topic_host[host], host) + + def is_alive(self, topic, host): + if self.redis.ttl(host) == -1: + self.expire(topic, host) + return False + return True + + def expire(self, topic, host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.delete(host) + pipe.srem(topic, host) + pipe.execute() + + def backend_register(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.sadd(key, key_host) + + # No value is needed, we just + # care if it exists. Sets aren't viable + # because only keys can expire. + pipe.set(key_host, '') + + pipe.execute() + + def backend_unregister(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.srem(key, key_host) + pipe.delete(key_host) + pipe.execute() diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py index f6a7dbc74..daebcbfa7 100644 --- a/nova/openstack/common/rpc/proxy.py +++ b/nova/openstack/common/rpc/proxy.py @@ -59,8 +59,12 @@ class RpcProxy(object): return topic if topic else self.topic @staticmethod + def make_namespaced_msg(method, namespace, **kwargs): + return {'method': method, 'namespace': namespace, 'args': kwargs} + + @staticmethod def make_msg(method, **kwargs): - return {'method': method, 'args': kwargs} + return RpcProxy.make_namespaced_msg(method, None, **kwargs) def call(self, context, msg, topic=None, version=None, timeout=None): """rpc.call() a remote method. diff --git a/nova/openstack/common/rpc/zmq_receiver.py b/nova/openstack/common/rpc/zmq_receiver.py new file mode 100755 index 000000000..bf3386516 --- /dev/null +++ b/nova/openstack/common/rpc/zmq_receiver.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +eventlet.monkey_patch() + +import contextlib +import sys + +from oslo.config import cfg + +from nova.openstack.common import log as logging +from nova.openstack.common import rpc +from nova.openstack.common.rpc import impl_zmq + +CONF = cfg.CONF +CONF.register_opts(rpc.rpc_opts) +CONF.register_opts(impl_zmq.zmq_opts) + + +def main(): + CONF(sys.argv[1:], project='oslo') + logging.setup("oslo") + + with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: + reactor.consume_in_thread() + reactor.wait() |