diff options
| author | Jenkins <jenkins@review.openstack.org> | 2013-02-20 02:56:56 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2013-02-20 02:56:56 +0000 |
| commit | c8fc2fba5fd628bdec76850ab8b0706d458bd206 (patch) | |
| tree | 1abcce68701affff787218213290e257289f46c4 /nova/servicegroup | |
| parent | 2aa9070adb2c6c88680c32eaaa4709e335861aaa (diff) | |
| parent | 625e074033bc4d4b42f2ef641a69dd425965ac8f (diff) | |
| download | nova-c8fc2fba5fd628bdec76850ab8b0706d458bd206.tar.gz nova-c8fc2fba5fd628bdec76850ab8b0706d458bd206.tar.xz nova-c8fc2fba5fd628bdec76850ab8b0706d458bd206.zip | |
Merge "Added a service heartbeat driver using Memcached."
Diffstat (limited to 'nova/servicegroup')
| -rw-r--r-- | nova/servicegroup/api.py | 9 | ||||
| -rw-r--r-- | nova/servicegroup/drivers/mc.py | 108 |
2 files changed, 114 insertions, 3 deletions
diff --git a/nova/servicegroup/api.py b/nova/servicegroup/api.py index 6dc1aa6d1..057a44103 100644 --- a/nova/servicegroup/api.py +++ b/nova/servicegroup/api.py @@ -28,8 +28,10 @@ import random LOG = logging.getLogger(__name__) _default_driver = 'db' servicegroup_driver_opt = cfg.StrOpt('servicegroup_driver', - default=_default_driver, - help='The driver for servicegroup service.') + default=_default_driver, + help='The driver for servicegroup ' + 'service (valid options are: ' + 'db, zk, mc)') CONF = cfg.CONF CONF.register_opt(servicegroup_driver_opt) @@ -40,7 +42,8 @@ class API(object): _driver = None _driver_name_class_mapping = { 'db': 'nova.servicegroup.drivers.db.DbDriver', - 'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver' + 'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver', + 'mc': 'nova.servicegroup.drivers.mc.MemcachedDriver' } def __new__(cls, *args, **kwargs): diff --git a/nova/servicegroup/drivers/mc.py b/nova/servicegroup/drivers/mc.py new file mode 100644 index 000000000..c5048a04c --- /dev/null +++ b/nova/servicegroup/drivers/mc.py @@ -0,0 +1,108 @@ +# Service heartbeat driver using Memcached +# Copyright (c) 2013 Akira Yoshiyama <akirayoshiyama at gmail dot com> +# +# This is derived from nova/servicegroup/drivers/db.py. +# Copyright (c) IBM 2012 Pavel Kravchenco <kpavel at il dot ibm dot com> +# Alexey Roytman <roytman at il dot ibm dot com> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from nova.common import memorycache +from nova import conductor +from nova import context +from nova.openstack.common import cfg +from nova.openstack.common import log as logging +from nova.openstack.common import timeutils +from nova.servicegroup import api +from nova import utils + + +CONF = cfg.CONF +CONF.import_opt('service_down_time', 'nova.service') +CONF.import_opt('memcached_servers', 'nova.common.memorycache') + + +LOG = logging.getLogger(__name__) + + +class MemcachedDriver(api.ServiceGroupDriver): + + def __init__(self, *args, **kwargs): + test = kwargs.get('test') + if not CONF.memcached_servers and not test: + raise RuntimeError(_('memcached_servers not defined')) + self.mc = memorycache.get_client() + self.db_allowed = kwargs.get('db_allowed', True) + self.conductor_api = conductor.API(use_local=self.db_allowed) + + def join(self, member_id, group_id, service=None): + """Join the given service with its group.""" + + msg = _('Memcached_Driver: join new ServiceGroup member ' + '%(member_id)s to the %(group_id)s group, ' + 'service = %(service)s') + LOG.debug(msg, locals()) + if service is None: + raise RuntimeError(_('service is a mandatory argument for ' + 'Memcached based ServiceGroup driver')) + report_interval = service.report_interval + if report_interval: + pulse = utils.FixedIntervalLoopingCall(self._report_state, service) + pulse.start(interval=report_interval, + initial_delay=report_interval) + return pulse + + def is_up(self, service_ref): + """Moved from nova.utils + Check whether a service is up based on last heartbeat. + """ + key = "%(topic)s:%(host)s" % service_ref + return self.mc.get(str(key)) is not None + + def get_all(self, group_id): + """ + Returns ALL members of the given group + """ + LOG.debug(_('Memcached_Driver: get_all members of the %s group') % + group_id) + rs = [] + ctxt = context.get_admin_context() + services = self.conductor_api.service_get_all_by_topic(ctxt, group_id) + for service in services: + if self.is_up(service): + rs.append(service['host']) + return rs + + def _report_state(self, service): + """Update the state of this service in the datastore.""" + ctxt = context.get_admin_context() + try: + key = "%(topic)s:%(host)s" % service.service_ref + # memcached has data expiration time capability. + # set(..., time=CONF.service_down_time) uses it and + # reduces key-deleting code. + self.mc.set(str(key), + timeutils.utcnow(), + time=CONF.service_down_time) + + # TODO(termie): make this pattern be more elegant. + if getattr(service, 'model_disconnected', False): + service.model_disconnected = False + LOG.error(_('Recovered model server connection!')) + + # TODO(vish): this should probably only catch connection errors + except Exception: # pylint: disable=W0702 + if not getattr(service, 'model_disconnected', False): + service.model_disconnected = True + LOG.exception(_('model server went away')) |
