summaryrefslogtreecommitdiffstats
path: root/openstack
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2013-01-24 16:28:43 -0500
committerEric Windisch <eric@cloudscaling.com>2013-03-01 10:54:05 -0500
commitcb26af207dbcea5fc88ad5f66da80fba5d76cb04 (patch)
tree9da183e01a2da3f99bb904cbd2b9c15863f2c7b8 /openstack
parent2b418be864a5aa5ba135f7651e83051cf3bf9ce6 (diff)
downloadoslo-cb26af207dbcea5fc88ad5f66da80fba5d76cb04.tar.gz
oslo-cb26af207dbcea5fc88ad5f66da80fba5d76cb04.tar.xz
oslo-cb26af207dbcea5fc88ad5f66da80fba5d76cb04.zip
Redis-based matchmaker
Introduces a reference implementation of a matchmaker (based on redis) that supports dynamic host/topic registrations, host expiration, and hooks for consuming applications to acknowledge or neg-acknowledge topic.host service availability. Implements blueprint advanced-matchmaking Change-Id: I8608d2089fca118b0e369f2eb5c6aedacf6821fe
Diffstat (limited to 'openstack')
-rw-r--r--openstack/common/rpc/impl_zmq.py8
-rw-r--r--openstack/common/rpc/matchmaker.py169
-rw-r--r--openstack/common/rpc/matchmaker_redis.py149
3 files changed, 324 insertions, 2 deletions
diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py
index cf3fdc8..6c31378 100644
--- a/openstack/common/rpc/impl_zmq.py
+++ b/openstack/common/rpc/impl_zmq.py
@@ -594,6 +594,9 @@ class Connection(rpc_common.Connection):
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
+ # Register with matchmaker.
+ _get_matchmaker().register(topic, CONF.rpc_zmq_host)
+
# Subscription scenarios
if fanout:
sock_type = zmq.SUB
@@ -620,6 +623,10 @@ class Connection(rpc_common.Connection):
self.topics.append(topic)
def close(self):
+ _get_matchmaker().stop_heartbeat()
+ for topic in self.topics:
+ _get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
+
self.reactor.close()
self.topics = []
@@ -627,6 +634,7 @@ class Connection(rpc_common.Connection):
self.reactor.wait()
def consume_in_thread(self):
+ _get_matchmaker().start_heartbeat()
self.reactor.consume_in_thread()
diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py
index b7e8f3b..2ff5ecc 100644
--- a/openstack/common/rpc/matchmaker.py
+++ b/openstack/common/rpc/matchmaker.py
@@ -22,6 +22,7 @@ import contextlib
import itertools
import json
+import eventlet
from oslo.config import cfg
from openstack.common.gettextutils import _
@@ -33,6 +34,12 @@ matchmaker_opts = [
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
+ cfg.IntOpt('matchmaker_heartbeat_freq',
+ default='300',
+ help='Heartbeat frequency'),
+ cfg.IntOpt('matchmaker_heartbeat_ttl',
+ default='600',
+ help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
@@ -70,12 +77,73 @@ class Binding(object):
class MatchMakerBase(object):
- """Match Maker Base Class."""
-
+ """
+ Match Maker Base Class.
+ Build off HeartbeatMatchMakerBase if building a
+ heartbeat-capable MatchMaker.
+ """
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
+ self.no_heartbeat_msg = _('Matchmaker does not implement '
+ 'registration or heartbeat.')
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+ Heartbeats, if applicable, may keepalive registration.
+ """
+ pass
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a key.host is alive.
+ Used internally for updating heartbeats,
+ but may also be used publically to acknowledge
+ a system is alive (i.e. rpc message successfully
+ sent to host)
+ """
+ pass
+
+ def is_alive(self, topic, host):
+ """
+ Checks if a host is alive.
+ """
+ pass
+
+ def expire(self, topic, host):
+ """
+ Explicitly expire a host's registration.
+ """
+ pass
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ pass
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ pass
+
+ def start_heartbeat(self):
+ """
+ Spawn heartbeat greenthread.
+ """
+ pass
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ pass
+
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
@@ -99,6 +167,103 @@ class MatchMakerBase(object):
return workers
+class HeartbeatMatchMakerBase(MatchMakerBase):
+ """
+ Base for a heart-beat capable MatchMaker.
+ Provides common methods for registering,
+ unregistering, and maintaining heartbeats.
+ """
+ def __init__(self):
+ self.hosts = set()
+ self._heart = None
+ self.host_topic = {}
+
+ super(HeartbeatMatchMakerBase, self).__init__()
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ for key, host in self.host_topic:
+ self.ack_alive(key, host)
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a host.topic is alive.
+ Used internally for updating heartbeats,
+ but may also be used publically to acknowledge
+ a system is alive (i.e. rpc message successfully
+ sent to host)
+ """
+ raise NotImplementedError("Must implement ack_alive")
+
+ def backend_register(self, key, host):
+ """
+ Implements registration logic.
+ Called by register(self,key,host)
+ """
+ raise NotImplementedError("Must implement backend_register")
+
+ def backend_unregister(self, key, key_host):
+ """
+ Implements de-registration logic.
+ Called by unregister(self,key,host)
+ """
+ raise NotImplementedError("Must implement backend_unregister")
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+ Heartbeats, if applicable, may keepalive registration.
+ """
+ self.hosts.add(host)
+ self.host_topic[(key, host)] = host
+ key_host = '.'.join((key, host))
+
+ self.backend_register(key, key_host)
+
+ self.ack_alive(key, host)
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ if (key, host) in self.host_topic:
+ del self.host_topic[(key, host)]
+
+ self.hosts.discard(host)
+ self.backend_unregister(key, '.'.join((key, host)))
+
+ LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
+
+ def start_heartbeat(self):
+ """
+ Implementation of MatchMakerBase.start_heartbeat
+ Launches greenthread looping send_heartbeats(),
+ yielding for CONF.matchmaker_heartbeat_freq seconds
+ between iterations.
+ """
+ if len(self.hosts) == 0:
+ raise MatchMakerException(
+ _("Register before starting heartbeat."))
+
+ def do_heartbeat():
+ while True:
+ self.send_heartbeats()
+ eventlet.sleep(CONF.matchmaker_heartbeat_freq)
+
+ self._heart = eventlet.spawn(do_heartbeat)
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ if self._heart:
+ self._heart.kill()
+
+
class DirectBinding(Binding):
"""
Specifies a host in the key via a '.' character
diff --git a/openstack/common/rpc/matchmaker_redis.py b/openstack/common/rpc/matchmaker_redis.py
new file mode 100644
index 0000000..367c2c9
--- /dev/null
+++ b/openstack/common/rpc/matchmaker_redis.py
@@ -0,0 +1,149 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 Cloudscaling Group, Inc
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+from oslo.config import cfg
+
+from openstack.common import importutils
+from openstack.common import log as logging
+from openstack.common.rpc import matchmaker as mm_common
+
+redis = importutils.try_import('redis')
+
+
+matchmaker_redis_opts = [
+ cfg.StrOpt('host',
+ default='127.0.0.1',
+ help='Host to locate redis'),
+ cfg.IntOpt('port',
+ default=6379,
+ help='Use this port to connect to redis host.'),
+ cfg.StrOpt('password',
+ default=None,
+ help='Password for Redis server. (optional)'),
+]
+
+CONF = cfg.CONF
+opt_group = cfg.OptGroup(name='matchmaker_redis',
+ title='Options for Redis-based MatchMaker')
+CONF.register_group(opt_group)
+CONF.register_opts(matchmaker_redis_opts, opt_group)
+LOG = logging.getLogger(__name__)
+
+
+class RedisExchange(mm_common.Exchange):
+ def __init__(self, matchmaker):
+ self.matchmaker = matchmaker
+ self.redis = matchmaker.redis
+ super(RedisExchange, self).__init__()
+
+
+class RedisTopicExchange(RedisExchange):
+ """
+ Exchange where all topic keys are split, sending to second half.
+ i.e. "compute.host" sends a message to "compute" running on "host"
+ """
+ def run(self, topic):
+ while True:
+ member_name = self.redis.srandmember(topic)
+
+ if not member_name:
+ # If this happens, there are no
+ # longer any members.
+ break
+
+ if not self.matchmaker.is_alive(topic, member_name):
+ continue
+
+ host = member_name.split('.', 1)[1]
+ return [(member_name, host)]
+ return []
+
+
+class RedisFanoutExchange(RedisExchange):
+ """
+ Return a list of all hosts.
+ """
+ def run(self, topic):
+ topic = topic.split('~', 1)[1]
+ hosts = self.redis.smembers(topic)
+ good_hosts = filter(
+ lambda host: self.matchmaker.is_alive(topic, host), hosts)
+
+ return [(x, x.split('.', 1)[1]) for x in good_hosts]
+
+
+class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
+ """
+ MatchMaker registering and looking-up hosts with a Redis server.
+ """
+ def __init__(self):
+ super(MatchMakerRedis, self).__init__()
+
+ if not redis:
+ raise ImportError("Failed to import module redis.")
+
+ self.redis = redis.StrictRedis(
+ host=CONF.matchmaker_redis.host,
+ port=CONF.matchmaker_redis.port,
+ password=CONF.matchmaker_redis.password)
+
+ self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
+ self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
+ self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
+
+ def ack_alive(self, key, host):
+ topic = "%s.%s" % (key, host)
+ if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
+ # If we could not update the expiration, the key
+ # might have been pruned. Re-register, creating a new
+ # key in Redis.
+ self.register(self.topic_host[host], host)
+
+ def is_alive(self, topic, host):
+ if self.redis.ttl(host) == -1:
+ self.expire(topic, host)
+ return False
+ return True
+
+ def expire(self, topic, host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.delete(host)
+ pipe.srem(topic, host)
+ pipe.execute()
+
+ def backend_register(self, key, key_host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.sadd(key, key_host)
+
+ # No value is needed, we just
+ # care if it exists. Sets aren't viable
+ # because only keys can expire.
+ pipe.set(key_host, '')
+
+ pipe.execute()
+
+ def backend_unregister(self, key, key_host):
+ with self.redis.pipeline() as pipe:
+ pipe.multi()
+ pipe.srem(key, key_host)
+ pipe.delete(key_host)
+ pipe.execute()