summaryrefslogtreecommitdiffstats
path: root/openstack/common/rpc/matchmaker.py
diff options
context:
space:
mode:
authorEric Windisch <eric@cloudscaling.com>2013-01-24 16:28:43 -0500
committerEric Windisch <eric@cloudscaling.com>2013-03-01 10:54:05 -0500
commitcb26af207dbcea5fc88ad5f66da80fba5d76cb04 (patch)
tree9da183e01a2da3f99bb904cbd2b9c15863f2c7b8 /openstack/common/rpc/matchmaker.py
parent2b418be864a5aa5ba135f7651e83051cf3bf9ce6 (diff)
downloadoslo-cb26af207dbcea5fc88ad5f66da80fba5d76cb04.tar.gz
oslo-cb26af207dbcea5fc88ad5f66da80fba5d76cb04.tar.xz
oslo-cb26af207dbcea5fc88ad5f66da80fba5d76cb04.zip
Redis-based matchmaker
Introduces a reference implementation of a matchmaker (based on redis) that supports dynamic host/topic registrations, host expiration, and hooks for consuming applications to acknowledge or neg-acknowledge topic.host service availability. Implements blueprint advanced-matchmaking Change-Id: I8608d2089fca118b0e369f2eb5c6aedacf6821fe
Diffstat (limited to 'openstack/common/rpc/matchmaker.py')
-rw-r--r--openstack/common/rpc/matchmaker.py169
1 files changed, 167 insertions, 2 deletions
diff --git a/openstack/common/rpc/matchmaker.py b/openstack/common/rpc/matchmaker.py
index b7e8f3b..2ff5ecc 100644
--- a/openstack/common/rpc/matchmaker.py
+++ b/openstack/common/rpc/matchmaker.py
@@ -22,6 +22,7 @@ import contextlib
import itertools
import json
+import eventlet
from oslo.config import cfg
from openstack.common.gettextutils import _
@@ -33,6 +34,12 @@ matchmaker_opts = [
cfg.StrOpt('matchmaker_ringfile',
default='/etc/nova/matchmaker_ring.json',
help='Matchmaker ring file (JSON)'),
+ cfg.IntOpt('matchmaker_heartbeat_freq',
+ default='300',
+ help='Heartbeat frequency'),
+ cfg.IntOpt('matchmaker_heartbeat_ttl',
+ default='600',
+ help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
@@ -70,12 +77,73 @@ class Binding(object):
class MatchMakerBase(object):
- """Match Maker Base Class."""
-
+ """
+ Match Maker Base Class.
+ Build off HeartbeatMatchMakerBase if building a
+ heartbeat-capable MatchMaker.
+ """
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
+ self.no_heartbeat_msg = _('Matchmaker does not implement '
+ 'registration or heartbeat.')
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+ Heartbeats, if applicable, may keepalive registration.
+ """
+ pass
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a key.host is alive.
+ Used internally for updating heartbeats,
+ but may also be used publically to acknowledge
+ a system is alive (i.e. rpc message successfully
+ sent to host)
+ """
+ pass
+
+ def is_alive(self, topic, host):
+ """
+ Checks if a host is alive.
+ """
+ pass
+
+ def expire(self, topic, host):
+ """
+ Explicitly expire a host's registration.
+ """
+ pass
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ pass
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ pass
+
+ def start_heartbeat(self):
+ """
+ Spawn heartbeat greenthread.
+ """
+ pass
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ pass
+
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
@@ -99,6 +167,103 @@ class MatchMakerBase(object):
return workers
+class HeartbeatMatchMakerBase(MatchMakerBase):
+ """
+ Base for a heart-beat capable MatchMaker.
+ Provides common methods for registering,
+ unregistering, and maintaining heartbeats.
+ """
+ def __init__(self):
+ self.hosts = set()
+ self._heart = None
+ self.host_topic = {}
+
+ super(HeartbeatMatchMakerBase, self).__init__()
+
+ def send_heartbeats(self):
+ """
+ Send all heartbeats.
+ Use start_heartbeat to spawn a heartbeat greenthread,
+ which loops this method.
+ """
+ for key, host in self.host_topic:
+ self.ack_alive(key, host)
+
+ def ack_alive(self, key, host):
+ """
+ Acknowledge that a host.topic is alive.
+ Used internally for updating heartbeats,
+ but may also be used publically to acknowledge
+ a system is alive (i.e. rpc message successfully
+ sent to host)
+ """
+ raise NotImplementedError("Must implement ack_alive")
+
+ def backend_register(self, key, host):
+ """
+ Implements registration logic.
+ Called by register(self,key,host)
+ """
+ raise NotImplementedError("Must implement backend_register")
+
+ def backend_unregister(self, key, key_host):
+ """
+ Implements de-registration logic.
+ Called by unregister(self,key,host)
+ """
+ raise NotImplementedError("Must implement backend_unregister")
+
+ def register(self, key, host):
+ """
+ Register a host on a backend.
+ Heartbeats, if applicable, may keepalive registration.
+ """
+ self.hosts.add(host)
+ self.host_topic[(key, host)] = host
+ key_host = '.'.join((key, host))
+
+ self.backend_register(key, key_host)
+
+ self.ack_alive(key, host)
+
+ def unregister(self, key, host):
+ """
+ Unregister a topic.
+ """
+ if (key, host) in self.host_topic:
+ del self.host_topic[(key, host)]
+
+ self.hosts.discard(host)
+ self.backend_unregister(key, '.'.join((key, host)))
+
+ LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
+
+ def start_heartbeat(self):
+ """
+ Implementation of MatchMakerBase.start_heartbeat
+ Launches greenthread looping send_heartbeats(),
+ yielding for CONF.matchmaker_heartbeat_freq seconds
+ between iterations.
+ """
+ if len(self.hosts) == 0:
+ raise MatchMakerException(
+ _("Register before starting heartbeat."))
+
+ def do_heartbeat():
+ while True:
+ self.send_heartbeats()
+ eventlet.sleep(CONF.matchmaker_heartbeat_freq)
+
+ self._heart = eventlet.spawn(do_heartbeat)
+
+ def stop_heartbeat(self):
+ """
+ Destroys the heartbeat greenthread.
+ """
+ if self._heart:
+ self._heart.kill()
+
+
class DirectBinding(Binding):
"""
Specifies a host in the key via a '.' character