diff options
-rw-r--r-- | openstack/common/rpc/impl_zmq.py | 8 | ||||
-rw-r--r-- | openstack/common/rpc/matchmaker.py | 169 | ||||
-rw-r--r-- | openstack/common/rpc/matchmaker_redis.py | 149 | ||||
-rw-r--r-- | tests/unit/rpc/matchmaker_common.py | 72 | ||||
-rw-r--r-- | tests/unit/rpc/test_matchmaker.py | 36 | ||||
-rw-r--r-- | tests/unit/rpc/test_matchmaker_redis.py | 140 | ||||
-rw-r--r-- | tools/test-requires | 1 |
7 files changed, 542 insertions, 33 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index cf3fdc8..6c31378 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -594,6 +594,9 @@ class Connection(rpc_common.Connection): self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): + # Register with matchmaker. + _get_matchmaker().register(topic, CONF.rpc_zmq_host) + # Subscription scenarios if fanout: sock_type = zmq.SUB @@ -620,6 +623,10 @@ class Connection(rpc_common.Connection): self.topics.append(topic) def close(self): + _get_matchmaker().stop_heartbeat() + for topic in self.topics: + _get_matchmaker().unregister(topic, CONF.rpc_zmq_host) + self.reactor.close() self.topics = [] @@ -627,6 +634,7 @@ class Connection(rpc_common.Connection): self.reactor.wait() def consume_in_thread(self): + _get_matchmaker().start_heartbeat() self.reactor.consume_in_thread() diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py index b7e8f3b..2ff5ecc 100644 --- a/openstack/common/rpc/matchmaker.py +++ b/openstack/common/rpc/matchmaker.py @@ -22,6 +22,7 @@ import contextlib import itertools import json +import eventlet from oslo.config import cfg from openstack.common.gettextutils import _ @@ -33,6 +34,12 @@ matchmaker_opts = [ cfg.StrOpt('matchmaker_ringfile', default='/etc/nova/matchmaker_ring.json', help='Matchmaker ring file (JSON)'), + cfg.IntOpt('matchmaker_heartbeat_freq', + default='300', + help='Heartbeat frequency'), + cfg.IntOpt('matchmaker_heartbeat_ttl', + default='600', + help='Heartbeat time-to-live.'), ] CONF = cfg.CONF @@ -70,12 +77,73 @@ class Binding(object): class MatchMakerBase(object): - """Match Maker Base Class.""" - + """ + Match Maker Base Class. + Build off HeartbeatMatchMakerBase if building a + heartbeat-capable MatchMaker. + """ def __init__(self): # Array of tuples. Index [2] toggles negation, [3] is last-if-true self.bindings = [] + self.no_heartbeat_msg = _('Matchmaker does not implement ' + 'registration or heartbeat.') + + def register(self, key, host): + """ + Register a host on a backend. + Heartbeats, if applicable, may keepalive registration. + """ + pass + + def ack_alive(self, key, host): + """ + Acknowledge that a key.host is alive. + Used internally for updating heartbeats, + but may also be used publically to acknowledge + a system is alive (i.e. rpc message successfully + sent to host) + """ + pass + + def is_alive(self, topic, host): + """ + Checks if a host is alive. + """ + pass + + def expire(self, topic, host): + """ + Explicitly expire a host's registration. + """ + pass + + def send_heartbeats(self): + """ + Send all heartbeats. + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + pass + + def unregister(self, key, host): + """ + Unregister a topic. + """ + pass + + def start_heartbeat(self): + """ + Spawn heartbeat greenthread. + """ + pass + + def stop_heartbeat(self): + """ + Destroys the heartbeat greenthread. + """ + pass + def add_binding(self, binding, rule, last=True): self.bindings.append((binding, rule, False, last)) @@ -99,6 +167,103 @@ class MatchMakerBase(object): return workers +class HeartbeatMatchMakerBase(MatchMakerBase): + """ + Base for a heart-beat capable MatchMaker. + Provides common methods for registering, + unregistering, and maintaining heartbeats. + """ + def __init__(self): + self.hosts = set() + self._heart = None + self.host_topic = {} + + super(HeartbeatMatchMakerBase, self).__init__() + + def send_heartbeats(self): + """ + Send all heartbeats. + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + for key, host in self.host_topic: + self.ack_alive(key, host) + + def ack_alive(self, key, host): + """ + Acknowledge that a host.topic is alive. + Used internally for updating heartbeats, + but may also be used publically to acknowledge + a system is alive (i.e. rpc message successfully + sent to host) + """ + raise NotImplementedError("Must implement ack_alive") + + def backend_register(self, key, host): + """ + Implements registration logic. + Called by register(self,key,host) + """ + raise NotImplementedError("Must implement backend_register") + + def backend_unregister(self, key, key_host): + """ + Implements de-registration logic. + Called by unregister(self,key,host) + """ + raise NotImplementedError("Must implement backend_unregister") + + def register(self, key, host): + """ + Register a host on a backend. + Heartbeats, if applicable, may keepalive registration. + """ + self.hosts.add(host) + self.host_topic[(key, host)] = host + key_host = '.'.join((key, host)) + + self.backend_register(key, key_host) + + self.ack_alive(key, host) + + def unregister(self, key, host): + """ + Unregister a topic. + """ + if (key, host) in self.host_topic: + del self.host_topic[(key, host)] + + self.hosts.discard(host) + self.backend_unregister(key, '.'.join((key, host))) + + LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host))) + + def start_heartbeat(self): + """ + Implementation of MatchMakerBase.start_heartbeat + Launches greenthread looping send_heartbeats(), + yielding for CONF.matchmaker_heartbeat_freq seconds + between iterations. + """ + if len(self.hosts) == 0: + raise MatchMakerException( + _("Register before starting heartbeat.")) + + def do_heartbeat(): + while True: + self.send_heartbeats() + eventlet.sleep(CONF.matchmaker_heartbeat_freq) + + self._heart = eventlet.spawn(do_heartbeat) + + def stop_heartbeat(self): + """ + Destroys the heartbeat greenthread. + """ + if self._heart: + self._heart.kill() + + class DirectBinding(Binding): """ Specifies a host in the key via a '.' character diff --git a/openstack/common/rpc/matchmaker_redis.py b/openstack/common/rpc/matchmaker_redis.py new file mode 100644 index 0000000..367c2c9 --- /dev/null +++ b/openstack/common/rpc/matchmaker_redis.py @@ -0,0 +1,149 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +The MatchMaker classes should accept a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +from oslo.config import cfg + +from openstack.common import importutils +from openstack.common import log as logging +from openstack.common.rpc import matchmaker as mm_common + +redis = importutils.try_import('redis') + + +matchmaker_redis_opts = [ + cfg.StrOpt('host', + default='127.0.0.1', + help='Host to locate redis'), + cfg.IntOpt('port', + default=6379, + help='Use this port to connect to redis host.'), + cfg.StrOpt('password', + default=None, + help='Password for Redis server. (optional)'), +] + +CONF = cfg.CONF +opt_group = cfg.OptGroup(name='matchmaker_redis', + title='Options for Redis-based MatchMaker') +CONF.register_group(opt_group) +CONF.register_opts(matchmaker_redis_opts, opt_group) +LOG = logging.getLogger(__name__) + + +class RedisExchange(mm_common.Exchange): + def __init__(self, matchmaker): + self.matchmaker = matchmaker + self.redis = matchmaker.redis + super(RedisExchange, self).__init__() + + +class RedisTopicExchange(RedisExchange): + """ + Exchange where all topic keys are split, sending to second half. + i.e. "compute.host" sends a message to "compute" running on "host" + """ + def run(self, topic): + while True: + member_name = self.redis.srandmember(topic) + + if not member_name: + # If this happens, there are no + # longer any members. + break + + if not self.matchmaker.is_alive(topic, member_name): + continue + + host = member_name.split('.', 1)[1] + return [(member_name, host)] + return [] + + +class RedisFanoutExchange(RedisExchange): + """ + Return a list of all hosts. + """ + def run(self, topic): + topic = topic.split('~', 1)[1] + hosts = self.redis.smembers(topic) + good_hosts = filter( + lambda host: self.matchmaker.is_alive(topic, host), hosts) + + return [(x, x.split('.', 1)[1]) for x in good_hosts] + + +class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase): + """ + MatchMaker registering and looking-up hosts with a Redis server. + """ + def __init__(self): + super(MatchMakerRedis, self).__init__() + + if not redis: + raise ImportError("Failed to import module redis.") + + self.redis = redis.StrictRedis( + host=CONF.matchmaker_redis.host, + port=CONF.matchmaker_redis.port, + password=CONF.matchmaker_redis.password) + + self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self)) + self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange()) + self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self)) + + def ack_alive(self, key, host): + topic = "%s.%s" % (key, host) + if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl): + # If we could not update the expiration, the key + # might have been pruned. Re-register, creating a new + # key in Redis. + self.register(self.topic_host[host], host) + + def is_alive(self, topic, host): + if self.redis.ttl(host) == -1: + self.expire(topic, host) + return False + return True + + def expire(self, topic, host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.delete(host) + pipe.srem(topic, host) + pipe.execute() + + def backend_register(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.sadd(key, key_host) + + # No value is needed, we just + # care if it exists. Sets aren't viable + # because only keys can expire. + pipe.set(key_host, '') + + pipe.execute() + + def backend_unregister(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.srem(key, key_host) + pipe.delete(key_host) + pipe.execute() diff --git a/tests/unit/rpc/matchmaker_common.py b/tests/unit/rpc/matchmaker_common.py new file mode 100644 index 0000000..154162d --- /dev/null +++ b/tests/unit/rpc/matchmaker_common.py @@ -0,0 +1,72 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + + +LOG = logging.getLogger(__name__) + + +class _MatchMakerDirectedTopicTestCase(object): + """Mix-in to test directed topics.""" + def test_firstval_is_directed_topic(self): + matches = self.driver.queues(self.topic) + topics = map(lambda x: x[0], matches) + + for topic in topics: + self.assertTrue('.' in topic) + + +class _MatchMakerTestCase(_MatchMakerDirectedTopicTestCase): + def test_valid_host_matches(self): + queues = self.driver.queues(self.topic) + matched_hosts = map(lambda x: x[1], queues) + + for host in matched_hosts: + self.assertTrue(host in self.hosts) + + def test_fanout_host_matches(self): + """For known hosts, see if they're in fanout.""" + queues = self.driver.queues("fanout~" + self.topic) + matched_hosts = map(lambda x: x[1], queues) + + LOG.info("Received result from matchmaker: %s", queues) + for host in self.hosts: + self.assertTrue(host in matched_hosts) + + +class _MatchMakerDynRegTestCase(object): + def test_registers_host(self): + """ + Registers a host, ensures it is registered. + """ + self.driver.register(self.topic, self.hosts[0]) + + match = self.driver.queues(self.topic) + self.assertEqual(match[0][1], self.hosts[0]) + + def test_unregister(self): + """ + Tests that hosts unregister cleanly. + Registers a host, ensures it is registered, + then unregisters and ensures is no + longer registered. + """ + # Can only unregister if registrations work. + self.test_registers_host() + + self.driver.unregister(self.topic, self.hosts[0]) + self.assertEqual(self.driver.queues(self.topic), []) diff --git a/tests/unit/rpc/test_matchmaker.py b/tests/unit/rpc/test_matchmaker.py index 08c2387..bcc7ef8 100644 --- a/tests/unit/rpc/test_matchmaker.py +++ b/tests/unit/rpc/test_matchmaker.py @@ -17,41 +17,14 @@ import logging from openstack.common.rpc import matchmaker +from tests.unit.rpc import matchmaker_common as common from tests import utils LOG = logging.getLogger(__name__) -class _MatchMakerDirectedTopicTestCase(object): - """Mix-in to test directed topics.""" - def test_firstval_is_directed_topic(self): - matches = self.driver.queues(self.topic) - topics = map(lambda x: x[0], matches) - - for topic in topics: - self.assertTrue('.' in topic) - - -class _MatchMakerTestCase(_MatchMakerDirectedTopicTestCase): - def test_valid_host_matches(self): - queues = self.driver.queues(self.topic) - matched_hosts = map(lambda x: x[1], queues) - - for host in matched_hosts: - self.assertTrue(host in self.hosts) - - def test_fanout_host_matches(self): - """For known hosts, see if they're in fanout.""" - queues = self.driver.queues("fanout~" + self.topic) - matched_hosts = map(lambda x: x[1], queues) - - LOG.info("Received result from matchmaker: %s", queues) - for host in self.hosts: - self.assertTrue(host in matched_hosts) - - -class MatchMakerFileTestCase(utils.BaseTestCase, _MatchMakerTestCase): +class MatchMakerFileTestCase(utils.BaseTestCase, common._MatchMakerTestCase): def setUp(self): super(MatchMakerFileTestCase, self).setUp() self.topic = "test" @@ -62,7 +35,8 @@ class MatchMakerFileTestCase(utils.BaseTestCase, _MatchMakerTestCase): self.driver = matchmaker.MatchMakerRing(ring) -class MatchMakerLocalhostTestCase(utils.BaseTestCase, _MatchMakerTestCase): +class MatchMakerLocalhostTestCase(utils.BaseTestCase, + common._MatchMakerTestCase): def setUp(self): super(MatchMakerLocalhostTestCase, self).setUp() self.driver = matchmaker.MatchMakerLocalhost() @@ -70,7 +44,7 @@ class MatchMakerLocalhostTestCase(utils.BaseTestCase, _MatchMakerTestCase): self.hosts = ['localhost'] -class MatchMakerDirectExchangeTestCase(utils.BaseTestCase, +class MatchMakerDirectExchangeTestCase(utils.BaseTestCase, common. _MatchMakerDirectedTopicTestCase): """Test lookups against a directed topic.""" def setUp(self): diff --git a/tests/unit/rpc/test_matchmaker_redis.py b/tests/unit/rpc/test_matchmaker_redis.py new file mode 100644 index 0000000..a1eb0a0 --- /dev/null +++ b/tests/unit/rpc/test_matchmaker_redis.py @@ -0,0 +1,140 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +import eventlet + +from openstack.common import importutils +from openstack.common.rpc import matchmaker_redis as matchmaker +from tests.unit.rpc import matchmaker_common as common +from tests import utils + +redis = importutils.try_import('redis') + +LOG = logging.getLogger(__name__) + + +class MatchMakerRedisLookupTestCase(utils.BaseTestCase, + common._MatchMakerTestCase): + """Test lookups against the Redis matchmaker""" + def setUp(self): + super(MatchMakerRedisLookupTestCase, self).setUp() + + if not redis: + self.skipTest("Redis required for test.") + + self.config(matchmaker_heartbeat_ttl=1) + + self.topic = "test" + self.hosts = map(lambda x: 'mockhost-' + str(x), range(1, 10)) + + try: + self.driver = matchmaker.MatchMakerRedis() + self.driver.redis.connection_pool.connection_kwargs[ + 'socket_timeout'] = 1 + # Test the connection + self.driver.redis.ping() + except redis.exceptions.ConnectionError: + raise self.skipTest("Redis server not available.") + + # Wipe all entries... + for host in self.hosts: + self.driver.unregister(self.topic, host) + + for h in self.hosts: + self.driver.register(self.topic, h) + + self.driver.start_heartbeat() + + def tearDown(self): + super(MatchMakerRedisLookupTestCase, self).tearDown() + if not redis: + self.skipTest("Redis required for test.") + self.driver.stop_heartbeat() + + +class MatchMakerRedisHeartbeatTestCase(utils.BaseTestCase, + common._MatchMakerDynRegTestCase): + """Test the ability to register and perform heartbeats.""" + def setUp(self): + super(MatchMakerRedisHeartbeatTestCase, self).setUp() + + if not redis: + self.skipTest("Redis required for test.") + + self.config(matchmaker_heartbeat_ttl=1) + self.driver = matchmaker.MatchMakerRedis() + self.topic = "test" + self.hosts = map(lambda x: 'mockhost-' + str(x), range(1, 10)) + + try: + self.driver = matchmaker.MatchMakerRedis() + self.driver.redis.connection_pool.connection_kwargs[ + 'socket_timeout'] = 1 + # Test the connection + self.driver.redis.ping() + except redis.exceptions.ConnectionError: + raise self.skipTest("Redis server not available.") + + # Wipe all entries... + for host in self.hosts: + self.driver.unregister(self.topic, host) + + def test_expires_set(self): + """ + Test that expirations are set. + """ + self.driver.register(self.topic, self.hosts[0]) + + ttl = self.driver.redis.ttl('.'.join((self.topic, self.hosts[0]))) + self.assertTrue(ttl > -1) + + def test_expires_hosts(self): + """ + Tests that hosts expire. + Registers a host, ensures it is registered, + then waits for it to expire. Ensures is no + longer registered. + """ + self.driver.register(self.topic, self.hosts[0]) + + key_host = '.'.join((self.topic, self.hosts[0])) + + ttl = self.driver.redis.ttl(key_host) + eventlet.sleep(ttl + 1) + ttl2 = self.driver.redis.ttl(key_host) + + # Tests that host has actually expired. + self.assertEqual(ttl2, -1) + + def test_expired_hosts_removed(self): + """ + Test that expired hosts are removed from results. + """ + self.test_expires_hosts() + self.assertEqual(self.driver.queues(self.topic), []) + + +class MatchMakerRedisTestCase(utils.BaseTestCase): + """Generic tests that do not require a Redis server.""" + def test_redis_import_exception(self): + """ + Try initializing an object without redis. + """ + matchmaker.redis = None + self.assertRaises(ImportError, matchmaker.MatchMakerRedis) + reload(matchmaker) diff --git a/tools/test-requires b/tools/test-requires index 179071d..735744d 100644 --- a/tools/test-requires +++ b/tools/test-requires @@ -15,6 +15,7 @@ pep8==1.3.3 pyflakes pylint pyzmq==2.2.0.1 +redis setuptools-git>=0.4 sphinx testtools>=0.9.22 |