summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--openstack/common/rpc/impl_zmq.py8
-rw-r--r--openstack/common/rpc/matchmaker.py169
-rw-r--r--openstack/common/rpc/matchmaker_redis.py149
-rw-r--r--tests/unit/rpc/matchmaker_common.py72
-rw-r--r--tests/unit/rpc/test_matchmaker.py36
-rw-r--r--tests/unit/rpc/test_matchmaker_redis.py140
-rw-r--r--tools/test-requires1
7 files changed, 542 insertions, 33 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index cf3fdc8..6c31378 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -594,6 +594,9 @@ class Connection(rpc_common.Connection):
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
+ # Register with matchmaker.
+ _get_matchmaker().register(topic, CONF.rpc_zmq_host)
+
# Subscription scenarios
if fanout:
sock_type = zmq.SUB
@@ -620,6 +623,10 @@ class Connection(rpc_common.Connection):
self.topics.append(topic)
def close(self):
+ _get_matchmaker().stop_heartbeat()
+ for topic in self.topics:
+ _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
+
self.reactor.close()
self.topics = []
@@ -627,6 +634,7 @@ class Connection(rpc_common.Connection):
self.reactor.wait()
def consume_in_thread(self):
+ _get_matchmaker().start_heartbeat()
self.reactor.consume_in_thread()
diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py
index b7e8f3b..2ff5ecc 100644
--- a/openstack/common/rpc/matchmaker.py
+++ b/openstack/common/rpc/matchmaker.py
@@ -22,6 +22,7 @@ import contextlib
import itertools
import json
+import eventlet
from oslo.config import cfg
from openstack.common.gettextutils import _
@@ -33,6 +34,12 @@ matchmaker_opts = [
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
+ cfg.IntOpt('matchmaker_heartbeat_freq',
+ default='300',
+ help='Heartbeat frequency'),
+ cfg.IntOpt('matchmaker_heartbeat_ttl',
+ default='600',
+ help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
@@ -70,12 +77,73 @@ class Binding(object):
class MatchMakerBase(object):
- """Match Maker Base Class."""
-
+ """
+ Match Maker Base Class.
+ Build off HeartbeatMatchMakerBase if building a
+ heartbeat-capable MatchMaker.
+ """
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
+ self.no_heartbeat_msg = _('Matchmaker does not implement '
+ 'registration or heartbeat.')
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+ Heartbeats, if applicable, may keepalive registration.
+ """
+ pass
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a key.host is alive.
+ Used internally for updating heartbeats,
+ but may also be used publically to acknowledge
+ a system is alive (i.e. rpc message successfully
+ sent to host)
+ """
+ pass
+
+ def is_alive(self, topic, host):
+ """
+ Checks if a host is alive.
+ """
+ pass
+
+ def expire(self, topic, host):
+ """
+ Explicitly expire a host's registration.
+ """
+ pass
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ pass
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ pass
+
+ def start_heartbeat(self):
+ """
+ Spawn heartbeat greenthread.
+ """
+ pass
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ pass
+
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
@@ -99,6 +167,103 @@ class MatchMakerBase(object):
return workers
+class HeartbeatMatchMakerBase(MatchMakerBase):
+ """
+ Base for a heart-beat capable MatchMaker.
+ Provides common methods for registering,
+ unregistering, and maintaining heartbeats.
+ """
+ def __init__(self):
+ self.hosts = set()
+ self._heart = None
+ self.host_topic = {}
+
+ super(HeartbeatMatchMakerBase, self).__init__()
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ for key, host in self.host_topic:
+ self.ack_alive(key, host)
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a host.topic is alive.
+ Used internally for updating heartbeats,
+ but may also be used publically to acknowledge
+ a system is alive (i.e. rpc message successfully
+ sent to host)
+ """
+ raise NotImplementedError("Must implement ack_alive")
+
+ def backend_register(self, key, host):
+ """
+ Implements registration logic.
+ Called by register(self,key,host)
+ """
+ raise NotImplementedError("Must implement backend_register")
+
+ def backend_unregister(self, key, key_host):
+ """
+ Implements de-registration logic.
+ Called by unregister(self,key,host)
+ """
+ raise NotImplementedError("Must implement backend_unregister")
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+ Heartbeats, if applicable, may keepalive registration.
+ """
+ self.hosts.add(host)
+ self.host_topic[(key, host)] = host
+ key_host = '.'.join((key, host))
+
+ self.backend_register(key, key_host)
+
+ self.ack_alive(key, host)
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ if (key, host) in self.host_topic:
+ del self.host_topic[(key, host)]
+
+ self.hosts.discard(host)
+ self.backend_unregister(key, '.'.join((key, host)))
+
+ LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
+
+ def start_heartbeat(self):
+ """
+ Implementation of MatchMakerBase.start_heartbeat
+ Launches greenthread looping send_heartbeats(),
+ yielding for CONF.matchmaker_heartbeat_freq seconds
+ between iterations.
+ """
+ if len(self.hosts) == 0:
+ raise MatchMakerException(
+ _("Register before starting heartbeat."))
+
+ def do_heartbeat():
+ while True:
+ self.send_heartbeats()
+ eventlet.sleep(CONF.matchmaker_heartbeat_freq)
+
+ self._heart = eventlet.spawn(do_heartbeat)
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ if self._heart:
+ self._heart.kill()
+
+
class DirectBinding(Binding):
"""
Specifies a host in the key via a '.' character
diff --git a/openstack/common/rpc/matchmaker_redis.py b/openstack/common/rpc/matchmaker_redis.py
new file mode 100644
index 0000000..367c2c9
--- /dev/null
+++ b/openstack/common/rpc/matchmaker_redis.py
@@ -0,0 +1,149 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+from oslo.config import cfg
+
+from openstack.common import importutils
+from openstack.common import log as logging
+from openstack.common.rpc import matchmaker as mm_common
+
+redis = importutils.try_import('redis')
+
+
+matchmaker_redis_opts = [
+ cfg.StrOpt('host',
+ default='127.0.0.1',
+ help='Host to locate redis'),
+ cfg.IntOpt('port',
+ default=6379,
+ help='Use this port to connect to redis host.'),
+ cfg.StrOpt('password',
+ default=None,
+ help='Password for Redis server. (optional)'),
+]
+
+CONF = cfg.CONF
+opt_group = cfg.OptGroup(name='matchmaker_redis',
+ title='Options for Redis-based MatchMaker')
+CONF.register_group(opt_group)
+CONF.register_opts(matchmaker_redis_opts, opt_group)
+LOG = logging.getLogger(__name__)
+
+
+class RedisExchange(mm_common.Exchange):
+ def __init__(self, matchmaker):
+ self.matchmaker = matchmaker
+ self.redis = matchmaker.redis
+ super(RedisExchange, self).__init__()
+
+
+class RedisTopicExchange(RedisExchange):
+ """
+ Exchange where all topic keys are split, sending to second half.
+ i.e. "compute.host" sends a message to "compute" running on "host"
+ """
+ def run(self, topic):
+ while True:
+ member_name = self.redis.srandmember(topic)
+
+ if not member_name:
+ # If this happens, there are no
+ # longer any members.
+ break
+
+ if not self.matchmaker.is_alive(topic, member_name):
+ continue
+
+ host = member_name.split('.', 1)[1]
+ return [(member_name, host)]
+ return []
+
+
+class RedisFanoutExchange(RedisExchange):
+ """
+ Return a list of all hosts.
+ """
+ def run(self, topic):
+ topic = topic.split('~', 1)[1]
+ hosts = self.redis.smembers(topic)
+ good_hosts = filter(
+ lambda host: self.matchmaker.is_alive(topic, host), hosts)
+
+ return [(x, x.split('.', 1)[1]) for x in good_hosts]
+
+
+class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
+ """
+ MatchMaker registering and looking-up hosts with a Redis server.
+ """
+ def __init__(self):
+ super(MatchMakerRedis, self).__init__()
+
+ if not redis:
+ raise ImportError("Failed to import module redis.")
+
+ self.redis = redis.StrictRedis(
+ host=CONF.matchmaker_redis.host,
+ port=CONF.matchmaker_redis.port,
+ password=CONF.matchmaker_redis.password)
+
+ self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
+ self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
+ self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
+
+ def ack_alive(self, key, host):
+ topic = "%s.%s" % (key, host)
+ if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
+ # If we could not update the expiration, the key
+ # might have been pruned. Re-register, creating a new
+ # key in Redis.
+ self.register(self.topic_host[host], host)
+
+ def is_alive(self, topic, host):
+ if self.redis.ttl(host) == -1:
+ self.expire(topic, host)
+ return False
+ return True
+
+ def expire(self, topic, host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.delete(host)
+ pipe.srem(topic, host)
+ pipe.execute()
+
+ def backend_register(self, key, key_host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.sadd(key, key_host)
+
+ # No value is needed, we just
+ # care if it exists. Sets aren't viable
+ # because only keys can expire.
+ pipe.set(key_host, '')
+
+ pipe.execute()
+
+ def backend_unregister(self, key, key_host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.srem(key, key_host)
+ pipe.delete(key_host)
+ pipe.execute()
diff --git a/tests/unit/rpc/matchmaker_common.py b/tests/unit/rpc/matchmaker_common.py
new file mode 100644
index 0000000..154162d
--- /dev/null
+++ b/tests/unit/rpc/matchmaker_common.py
@@ -0,0 +1,72 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+class _MatchMakerDirectedTopicTestCase(object):
+ """Mix-in to test directed topics."""
+ def test_firstval_is_directed_topic(self):
+ matches = self.driver.queues(self.topic)
+ topics = map(lambda x: x[0], matches)
+
+ for topic in topics:
+ self.assertTrue('.' in topic)
+
+
+class _MatchMakerTestCase(_MatchMakerDirectedTopicTestCase):
+ def test_valid_host_matches(self):
+ queues = self.driver.queues(self.topic)
+ matched_hosts = map(lambda x: x[1], queues)
+
+ for host in matched_hosts:
+ self.assertTrue(host in self.hosts)
+
+ def test_fanout_host_matches(self):
+ """For known hosts, see if they're in fanout."""
+ queues = self.driver.queues("fanout~" + self.topic)
+ matched_hosts = map(lambda x: x[1], queues)
+
+ LOG.info("Received result from matchmaker: %s", queues)
+ for host in self.hosts:
+ self.assertTrue(host in matched_hosts)
+
+
+class _MatchMakerDynRegTestCase(object):
+ def test_registers_host(self):
+ """
+ Registers a host, ensures it is registered.
+ """
+ self.driver.register(self.topic, self.hosts[0])
+
+ match = self.driver.queues(self.topic)
+ self.assertEqual(match[0][1], self.hosts[0])
+
+ def test_unregister(self):
+ """
+ Tests that hosts unregister cleanly.
+ Registers a host, ensures it is registered,
+ then unregisters and ensures is no
+ longer registered.
+ """
+ # Can only unregister if registrations work.
+ self.test_registers_host()
+
+ self.driver.unregister(self.topic, self.hosts[0])
+ self.assertEqual(self.driver.queues(self.topic), [])
diff --git a/tests/unit/rpc/test_matchmaker.py b/tests/unit/rpc/test_matchmaker.py
index 08c2387..bcc7ef8 100644
--- a/tests/unit/rpc/test_matchmaker.py
+++ b/tests/unit/rpc/test_matchmaker.py
@@ -17,41 +17,14 @@
import logging
from openstack.common.rpc import matchmaker
+from tests.unit.rpc import matchmaker_common as common
from tests import utils
LOG = logging.getLogger(__name__)
-class _MatchMakerDirectedTopicTestCase(object):
- """Mix-in to test directed topics."""
- def test_firstval_is_directed_topic(self):
- matches = self.driver.queues(self.topic)
- topics = map(lambda x: x[0], matches)
-
- for topic in topics:
- self.assertTrue('.' in topic)
-
-
-class _MatchMakerTestCase(_MatchMakerDirectedTopicTestCase):
- def test_valid_host_matches(self):
- queues = self.driver.queues(self.topic)
- matched_hosts = map(lambda x: x[1], queues)
-
- for host in matched_hosts:
- self.assertTrue(host in self.hosts)
-
- def test_fanout_host_matches(self):
- """For known hosts, see if they're in fanout."""
- queues = self.driver.queues("fanout~" + self.topic)
- matched_hosts = map(lambda x: x[1], queues)
-
- LOG.info("Received result from matchmaker: %s", queues)
- for host in self.hosts:
- self.assertTrue(host in matched_hosts)
-
-
-class MatchMakerFileTestCase(utils.BaseTestCase, _MatchMakerTestCase):
+class MatchMakerFileTestCase(utils.BaseTestCase, common._MatchMakerTestCase):
def setUp(self):
super(MatchMakerFileTestCase, self).setUp()
self.topic = "test"
@@ -62,7 +35,8 @@ class MatchMakerFileTestCase(utils.BaseTestCase, _MatchMakerTestCase):
self.driver = matchmaker.MatchMakerRing(ring)
-class MatchMakerLocalhostTestCase(utils.BaseTestCase, _MatchMakerTestCase):
+class MatchMakerLocalhostTestCase(utils.BaseTestCase,
+ common._MatchMakerTestCase):
def setUp(self):
super(MatchMakerLocalhostTestCase, self).setUp()
self.driver = matchmaker.MatchMakerLocalhost()
@@ -70,7 +44,7 @@ class MatchMakerLocalhostTestCase(utils.BaseTestCase, _MatchMakerTestCase):
self.hosts = ['localhost']
-class MatchMakerDirectExchangeTestCase(utils.BaseTestCase,
+class MatchMakerDirectExchangeTestCase(utils.BaseTestCase, common.
_MatchMakerDirectedTopicTestCase):
"""Test lookups against a directed topic."""
def setUp(self):
diff --git a/tests/unit/rpc/test_matchmaker_redis.py b/tests/unit/rpc/test_matchmaker_redis.py
new file mode 100644
index 0000000..a1eb0a0
--- /dev/null
+++ b/tests/unit/rpc/test_matchmaker_redis.py
@@ -0,0 +1,140 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+import eventlet
+
+from openstack.common import importutils
+from openstack.common.rpc import matchmaker_redis as matchmaker
+from tests.unit.rpc import matchmaker_common as common
+from tests import utils
+
+redis = importutils.try_import('redis')
+
+LOG = logging.getLogger(__name__)
+
+
+class MatchMakerRedisLookupTestCase(utils.BaseTestCase,
+ common._MatchMakerTestCase):
+ """Test lookups against the Redis matchmaker"""
+ def setUp(self):
+ super(MatchMakerRedisLookupTestCase, self).setUp()
+
+ if not redis:
+ self.skipTest("Redis required for test.")
+
+ self.config(matchmaker_heartbeat_ttl=1)
+
+ self.topic = "test"
+ self.hosts = map(lambda x: 'mockhost-' + str(x), range(1, 10))
+
+ try:
+ self.driver = matchmaker.MatchMakerRedis()
+ self.driver.redis.connection_pool.connection_kwargs[
+ 'socket_timeout'] = 1
+ # Test the connection
+ self.driver.redis.ping()
+ except redis.exceptions.ConnectionError:
+ raise self.skipTest("Redis server not available.")
+
+ # Wipe all entries...
+ for host in self.hosts:
+ self.driver.unregister(self.topic, host)
+
+ for h in self.hosts:
+ self.driver.register(self.topic, h)
+
+ self.driver.start_heartbeat()
+
+ def tearDown(self):
+ super(MatchMakerRedisLookupTestCase, self).tearDown()
+ if not redis:
+ self.skipTest("Redis required for test.")
+ self.driver.stop_heartbeat()
+
+
+class MatchMakerRedisHeartbeatTestCase(utils.BaseTestCase,
+ common._MatchMakerDynRegTestCase):
+ """Test the ability to register and perform heartbeats."""
+ def setUp(self):
+ super(MatchMakerRedisHeartbeatTestCase, self).setUp()
+
+ if not redis:
+ self.skipTest("Redis required for test.")
+
+ self.config(matchmaker_heartbeat_ttl=1)
+ self.driver = matchmaker.MatchMakerRedis()
+ self.topic = "test"
+ self.hosts = map(lambda x: 'mockhost-' + str(x), range(1, 10))
+
+ try:
+ self.driver = matchmaker.MatchMakerRedis()
+ self.driver.redis.connection_pool.connection_kwargs[
+ 'socket_timeout'] = 1
+ # Test the connection
+ self.driver.redis.ping()
+ except redis.exceptions.ConnectionError:
+ raise self.skipTest("Redis server not available.")
+
+ # Wipe all entries...
+ for host in self.hosts:
+ self.driver.unregister(self.topic, host)
+
+ def test_expires_set(self):
+ """
+ Test that expirations are set.
+ """
+ self.driver.register(self.topic, self.hosts[0])
+
+ ttl = self.driver.redis.ttl('.'.join((self.topic, self.hosts[0])))
+ self.assertTrue(ttl > -1)
+
+ def test_expires_hosts(self):
+ """
+ Tests that hosts expire.
+ Registers a host, ensures it is registered,
+ then waits for it to expire. Ensures is no
+ longer registered.
+ """
+ self.driver.register(self.topic, self.hosts[0])
+
+ key_host = '.'.join((self.topic, self.hosts[0]))
+
+ ttl = self.driver.redis.ttl(key_host)
+ eventlet.sleep(ttl + 1)
+ ttl2 = self.driver.redis.ttl(key_host)
+
+ # Tests that host has actually expired.
+ self.assertEqual(ttl2, -1)
+
+ def test_expired_hosts_removed(self):
+ """
+ Test that expired hosts are removed from results.
+ """
+ self.test_expires_hosts()
+ self.assertEqual(self.driver.queues(self.topic), [])
+
+
+class MatchMakerRedisTestCase(utils.BaseTestCase):
+ """Generic tests that do not require a Redis server."""
+ def test_redis_import_exception(self):
+ """
+ Try initializing an object without redis.
+ """
+ matchmaker.redis = None
+ self.assertRaises(ImportError, matchmaker.MatchMakerRedis)
+ reload(matchmaker)
diff --git a/tools/test-requires b/tools/test-requires
index 179071d..735744d 100644
--- a/tools/test-requires
+++ b/tools/test-requires
@@ -15,6 +15,7 @@ pep8==1.3.3
pyflakes
pylint
pyzmq==2.2.0.1
+redis
setuptools-git>=0.4
sphinx
testtools>=0.9.22