diff options
| author | Akira Yoshiyama <akirayoshiyama@gmail.com> | 2013-02-09 14:26:04 +0000 |
|---|---|---|
| committer | Akira Yoshiyama <akirayoshiyama@gmail.com> | 2013-02-13 14:31:12 +0000 |
| commit | 625e074033bc4d4b42f2ef641a69dd425965ac8f (patch) | |
| tree | 88d8bb7f53499752cd75ae4e6de9aa3c7e7bb602 /nova/servicegroup | |
| parent | d980805880c681881504e269e03130e4452630ab (diff) | |
| download | nova-625e074033bc4d4b42f2ef641a69dd425965ac8f.tar.gz nova-625e074033bc4d4b42f2ef641a69dd425965ac8f.tar.xz nova-625e074033bc4d4b42f2ef641a69dd425965ac8f.zip | |
Added a service heartbeat driver using Memcached.
Today the heartbeat information of Nova services/nodes
is maintained in the DB: each service periodically updates
its record in the Service table (every 10 seconds by
default) with the timestamp of the last update. This
mechanism is highly inefficient and does not scale. For
example, at the default 10-second interval, maintaining the
heartbeat information for 1,000 nodes/services requires
1,000 / 10 = 100 DB updates per second (just for the
heartbeat).
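This patch adds nova.servicegroup.drivers.mc, a service
heartbeat driver using Memcached (MemcachedDriver). Select
it with servicegroup_driver=mc; memcached_servers must be
configured. Heartbeats are then written to Memcached
instead of the Service table, which reduces DB updates.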
blueprint memcached-service-heartbeat
Change-Id: I60bdb1cfbce1fea051f276ebfd6ccc4ad8fe6d2b
Diffstat (limited to 'nova/servicegroup')
| -rw-r--r-- | nova/servicegroup/api.py | 9 | ||||
| -rw-r--r-- | nova/servicegroup/drivers/mc.py | 108 |
2 files changed, 114 insertions, 3 deletions
diff --git a/nova/servicegroup/api.py b/nova/servicegroup/api.py index 6dc1aa6d1..057a44103 100644 --- a/nova/servicegroup/api.py +++ b/nova/servicegroup/api.py @@ -28,8 +28,10 @@ import random LOG = logging.getLogger(__name__) _default_driver = 'db' servicegroup_driver_opt = cfg.StrOpt('servicegroup_driver', - default=_default_driver, - help='The driver for servicegroup service.') + default=_default_driver, + help='The driver for servicegroup ' + 'service (valid options are: ' + 'db, zk, mc)') CONF = cfg.CONF CONF.register_opt(servicegroup_driver_opt) @@ -40,7 +42,8 @@ class API(object): _driver = None _driver_name_class_mapping = { 'db': 'nova.servicegroup.drivers.db.DbDriver', - 'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver' + 'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver', + 'mc': 'nova.servicegroup.drivers.mc.MemcachedDriver' } def __new__(cls, *args, **kwargs): diff --git a/nova/servicegroup/drivers/mc.py b/nova/servicegroup/drivers/mc.py new file mode 100644 index 000000000..c5048a04c --- /dev/null +++ b/nova/servicegroup/drivers/mc.py @@ -0,0 +1,108 @@ +# Service heartbeat driver using Memcached +# Copyright (c) 2013 Akira Yoshiyama <akirayoshiyama at gmail dot com> +# +# This is derived from nova/servicegroup/drivers/db.py. +# Copyright (c) IBM 2012 Pavel Kravchenco <kpavel at il dot ibm dot com> +# Alexey Roytman <roytman at il dot ibm dot com> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from nova.common import memorycache +from nova import conductor +from nova import context +from nova.openstack.common import cfg +from nova.openstack.common import log as logging +from nova.openstack.common import timeutils +from nova.servicegroup import api +from nova import utils + + +CONF = cfg.CONF +CONF.import_opt('service_down_time', 'nova.service') +CONF.import_opt('memcached_servers', 'nova.common.memorycache') + + +LOG = logging.getLogger(__name__) + + +class MemcachedDriver(api.ServiceGroupDriver): + + def __init__(self, *args, **kwargs): + test = kwargs.get('test') + if not CONF.memcached_servers and not test: + raise RuntimeError(_('memcached_servers not defined')) + self.mc = memorycache.get_client() + self.db_allowed = kwargs.get('db_allowed', True) + self.conductor_api = conductor.API(use_local=self.db_allowed) + + def join(self, member_id, group_id, service=None): + """Join the given service with its group.""" + + msg = _('Memcached_Driver: join new ServiceGroup member ' + '%(member_id)s to the %(group_id)s group, ' + 'service = %(service)s') + LOG.debug(msg, locals()) + if service is None: + raise RuntimeError(_('service is a mandatory argument for ' + 'Memcached based ServiceGroup driver')) + report_interval = service.report_interval + if report_interval: + pulse = utils.FixedIntervalLoopingCall(self._report_state, service) + pulse.start(interval=report_interval, + initial_delay=report_interval) + return pulse + + def is_up(self, service_ref): + """Moved from nova.utils + Check whether a service is up based on last heartbeat. 
+ """ + key = "%(topic)s:%(host)s" % service_ref + return self.mc.get(str(key)) is not None + + def get_all(self, group_id): + """ + Returns ALL members of the given group + """ + LOG.debug(_('Memcached_Driver: get_all members of the %s group') % + group_id) + rs = [] + ctxt = context.get_admin_context() + services = self.conductor_api.service_get_all_by_topic(ctxt, group_id) + for service in services: + if self.is_up(service): + rs.append(service['host']) + return rs + + def _report_state(self, service): + """Update the state of this service in the datastore.""" + ctxt = context.get_admin_context() + try: + key = "%(topic)s:%(host)s" % service.service_ref + # memcached has data expiration time capability. + # set(..., time=CONF.service_down_time) uses it and + # reduces key-deleting code. + self.mc.set(str(key), + timeutils.utcnow(), + time=CONF.service_down_time) + + # TODO(termie): make this pattern be more elegant. + if getattr(service, 'model_disconnected', False): + service.model_disconnected = False + LOG.error(_('Recovered model server connection!')) + + # TODO(vish): this should probably only catch connection errors + except Exception: # pylint: disable=W0702 + if not getattr(service, 'model_disconnected', False): + service.model_disconnected = True + LOG.exception(_('model server went away')) |
