summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--etc/nova/nova.conf.sample3
-rw-r--r--nova/servicegroup/api.py9
-rw-r--r--nova/servicegroup/drivers/mc.py108
-rw-r--r--nova/tests/servicegroup/test_mc_servicegroup.py220
4 files changed, 336 insertions, 4 deletions
diff --git a/etc/nova/nova.conf.sample b/etc/nova/nova.conf.sample
index 61350b183..9cbb8c1a5 100644
--- a/etc/nova/nova.conf.sample
+++ b/etc/nova/nova.conf.sample
@@ -1689,7 +1689,8 @@
# Options defined in nova.servicegroup.api
#
-# The driver for servicegroup service. (string value)
+# The driver for servicegroup service (valid options are: db,
+# zk, mc) (string value)
#servicegroup_driver=db
diff --git a/nova/servicegroup/api.py b/nova/servicegroup/api.py
index 6dc1aa6d1..057a44103 100644
--- a/nova/servicegroup/api.py
+++ b/nova/servicegroup/api.py
@@ -28,8 +28,10 @@ import random
LOG = logging.getLogger(__name__)
_default_driver = 'db'
servicegroup_driver_opt = cfg.StrOpt('servicegroup_driver',
- default=_default_driver,
- help='The driver for servicegroup service.')
+ default=_default_driver,
+ help='The driver for servicegroup '
+ 'service (valid options are: '
+ 'db, zk, mc)')
CONF = cfg.CONF
CONF.register_opt(servicegroup_driver_opt)
@@ -40,7 +42,8 @@ class API(object):
_driver = None
_driver_name_class_mapping = {
'db': 'nova.servicegroup.drivers.db.DbDriver',
- 'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver'
+ 'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver',
+ 'mc': 'nova.servicegroup.drivers.mc.MemcachedDriver'
}
def __new__(cls, *args, **kwargs):
diff --git a/nova/servicegroup/drivers/mc.py b/nova/servicegroup/drivers/mc.py
new file mode 100644
index 000000000..c5048a04c
--- /dev/null
+++ b/nova/servicegroup/drivers/mc.py
@@ -0,0 +1,108 @@
+# Service heartbeat driver using Memcached
+# Copyright (c) 2013 Akira Yoshiyama <akirayoshiyama at gmail dot com>
+#
+# This is derived from nova/servicegroup/drivers/db.py.
+# Copyright (c) IBM 2012 Pavel Kravchenco <kpavel at il dot ibm dot com>
+# Alexey Roytman <roytman at il dot ibm dot com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nova.common import memorycache
+from nova import conductor
+from nova import context
+from nova.openstack.common import cfg
+from nova.openstack.common import log as logging
+from nova.openstack.common import timeutils
+from nova.servicegroup import api
+from nova import utils
+
+
+CONF = cfg.CONF
+CONF.import_opt('service_down_time', 'nova.service')
+CONF.import_opt('memcached_servers', 'nova.common.memorycache')
+
+
+LOG = logging.getLogger(__name__)
+
+
+class MemcachedDriver(api.ServiceGroupDriver):
+
+ def __init__(self, *args, **kwargs):
+ test = kwargs.get('test')
+ if not CONF.memcached_servers and not test:
+ raise RuntimeError(_('memcached_servers not defined'))
+ self.mc = memorycache.get_client()
+ self.db_allowed = kwargs.get('db_allowed', True)
+ self.conductor_api = conductor.API(use_local=self.db_allowed)
+
+ def join(self, member_id, group_id, service=None):
+ """Join the given service with its group."""
+
+ msg = _('Memcached_Driver: join new ServiceGroup member '
+ '%(member_id)s to the %(group_id)s group, '
+ 'service = %(service)s')
+ LOG.debug(msg, locals())
+ if service is None:
+ raise RuntimeError(_('service is a mandatory argument for '
+ 'Memcached based ServiceGroup driver'))
+ report_interval = service.report_interval
+ if report_interval:
+ pulse = utils.FixedIntervalLoopingCall(self._report_state, service)
+ pulse.start(interval=report_interval,
+ initial_delay=report_interval)
+ return pulse
+
+ def is_up(self, service_ref):
+ """Moved from nova.utils
+ Check whether a service is up based on last heartbeat.
+ """
+ key = "%(topic)s:%(host)s" % service_ref
+ return self.mc.get(str(key)) is not None
+
+ def get_all(self, group_id):
+ """
+ Returns ALL members of the given group
+ """
+ LOG.debug(_('Memcached_Driver: get_all members of the %s group') %
+ group_id)
+ rs = []
+ ctxt = context.get_admin_context()
+ services = self.conductor_api.service_get_all_by_topic(ctxt, group_id)
+ for service in services:
+ if self.is_up(service):
+ rs.append(service['host'])
+ return rs
+
+ def _report_state(self, service):
+ """Update the state of this service in the datastore."""
+ ctxt = context.get_admin_context()
+ try:
+ key = "%(topic)s:%(host)s" % service.service_ref
+ # memcached has data expiration time capability.
+ # set(..., time=CONF.service_down_time) uses it and
+ # reduces key-deleting code.
+ self.mc.set(str(key),
+ timeutils.utcnow(),
+ time=CONF.service_down_time)
+
+ # TODO(termie): make this pattern be more elegant.
+ if getattr(service, 'model_disconnected', False):
+ service.model_disconnected = False
+ LOG.error(_('Recovered model server connection!'))
+
+ # TODO(vish): this should probably only catch connection errors
+ except Exception: # pylint: disable=W0702
+ if not getattr(service, 'model_disconnected', False):
+ service.model_disconnected = True
+ LOG.exception(_('model server went away'))
diff --git a/nova/tests/servicegroup/test_mc_servicegroup.py b/nova/tests/servicegroup/test_mc_servicegroup.py
new file mode 100644
index 000000000..255184219
--- /dev/null
+++ b/nova/tests/servicegroup/test_mc_servicegroup.py
@@ -0,0 +1,220 @@
+# Copyright (c) 2013 Akira Yoshiyama <akirayoshiyama at gmail dot com>
+#
+# This is derived from test_db_servicegroup.py.
+# Copyright (c) IBM 2012 Alexey Roytman <roytman at il dot ibm dot com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import eventlet
+import fixtures
+
+from nova import context
+from nova import db
+from nova.openstack.common import timeutils
+from nova import service
+from nova import servicegroup
+from nova import test
+
+
+class ServiceFixture(fixtures.Fixture):
+
+ def __init__(self, host, binary, topic):
+ super(ServiceFixture, self).__init__()
+ self.host = host
+ self.binary = binary
+ self.topic = topic
+ self.serv = None
+
+ def setUp(self):
+ super(ServiceFixture, self).setUp()
+ self.serv = service.Service(self.host,
+ self.binary,
+ self.topic,
+ 'nova.tests.test_service.FakeManager',
+ 1, 1)
+ self.addCleanup(self.serv.kill)
+
+
+class MemcachedServiceGroupTestCase(test.TestCase):
+
+ def setUp(self):
+ super(MemcachedServiceGroupTestCase, self).setUp()
+ servicegroup.API._driver = None
+ self.flags(servicegroup_driver='mc')
+ self.down_time = 3
+ self.flags(enable_new_services=True)
+ self.flags(service_down_time=self.down_time)
+ self.servicegroup_api = servicegroup.API(test=True)
+ self._host = 'foo'
+ self._binary = 'nova-fake'
+ self._topic = 'unittest'
+ self._ctx = context.get_admin_context()
+
+ def test_memcached_driver(self):
+ serv = self.useFixture(
+ ServiceFixture(self._host, self._binary, self._topic)).serv
+ serv.start()
+ service_ref = db.service_get_by_args(self._ctx,
+ self._host,
+ self._binary)
+ hostkey = str("%s:%s" % (self._topic, self._host))
+ self.servicegroup_api._driver.mc.set(hostkey,
+ timeutils.utcnow(),
+ time=self.down_time)
+
+ self.assertTrue(self.servicegroup_api.service_is_up(service_ref))
+ eventlet.sleep(self.down_time + 1)
+ service_ref = db.service_get_by_args(self._ctx,
+ self._host,
+ self._binary)
+
+ self.assertTrue(self.servicegroup_api.service_is_up(service_ref))
+ serv.stop()
+ eventlet.sleep(self.down_time + 1)
+ service_ref = db.service_get_by_args(self._ctx,
+ self._host,
+ self._binary)
+ self.assertFalse(self.servicegroup_api.service_is_up(service_ref))
+
+ def test_get_all(self):
+ host1 = self._host + '_1'
+ host2 = self._host + '_2'
+ host3 = self._host + '_3'
+
+ serv1 = self.useFixture(
+ ServiceFixture(host1, self._binary, self._topic)).serv
+ serv1.start()
+
+ serv2 = self.useFixture(
+ ServiceFixture(host2, self._binary, self._topic)).serv
+ serv2.start()
+
+ serv3 = self.useFixture(
+ ServiceFixture(host3, self._binary, self._topic)).serv
+ serv3.start()
+
+ service_ref1 = db.service_get_by_args(self._ctx,
+ host1,
+ self._binary)
+ service_ref2 = db.service_get_by_args(self._ctx,
+ host2,
+ self._binary)
+ service_ref3 = db.service_get_by_args(self._ctx,
+ host3,
+ self._binary)
+
+ host1key = str("%s:%s" % (self._topic, host1))
+ host2key = str("%s:%s" % (self._topic, host2))
+ host3key = str("%s:%s" % (self._topic, host3))
+ self.servicegroup_api._driver.mc.set(host1key,
+ timeutils.utcnow(),
+ time=self.down_time)
+ self.servicegroup_api._driver.mc.set(host2key,
+ timeutils.utcnow(),
+ time=self.down_time)
+ self.servicegroup_api._driver.mc.set(host3key,
+ timeutils.utcnow(),
+ time=-1)
+
+ services = self.servicegroup_api.get_all(self._topic)
+
+ self.assertTrue(host1 in services)
+ self.assertTrue(host2 in services)
+ self.assertFalse(host3 in services)
+
+ service_id = self.servicegroup_api.get_one(self._topic)
+ self.assertTrue(service_id in services)
+
+ def test_service_is_up(self):
+ serv = self.useFixture(
+ ServiceFixture(self._host, self._binary, self._topic)).serv
+ serv.start()
+ service_ref = db.service_get_by_args(self._ctx,
+ self._host,
+ self._binary)
+ fake_now = 1000
+ down_time = 5
+ self.flags(service_down_time=down_time)
+ self.mox.StubOutWithMock(timeutils, 'utcnow_ts')
+ self.servicegroup_api = servicegroup.API()
+ hostkey = str("%s:%s" % (self._topic, self._host))
+
+ # Up (equal)
+ timeutils.utcnow_ts().AndReturn(fake_now)
+ timeutils.utcnow_ts().AndReturn(fake_now + down_time - 1)
+ self.mox.ReplayAll()
+ self.servicegroup_api._driver.mc.set(hostkey,
+ timeutils.utcnow(),
+ time=down_time)
+ result = self.servicegroup_api.service_is_up(service_ref)
+ self.assertTrue(result)
+
+ self.mox.ResetAll()
+ # Up
+ timeutils.utcnow_ts().AndReturn(fake_now)
+ timeutils.utcnow_ts().AndReturn(fake_now + down_time - 2)
+ self.mox.ReplayAll()
+ self.servicegroup_api._driver.mc.set(hostkey,
+ timeutils.utcnow(),
+ time=down_time)
+ result = self.servicegroup_api.service_is_up(service_ref)
+ self.assertTrue(result)
+
+ self.mox.ResetAll()
+ # Down
+ timeutils.utcnow_ts().AndReturn(fake_now)
+ timeutils.utcnow_ts().AndReturn(fake_now + down_time)
+ self.mox.ReplayAll()
+ self.servicegroup_api._driver.mc.set(hostkey,
+ timeutils.utcnow(),
+ time=down_time)
+ result = self.servicegroup_api.service_is_up(service_ref)
+ self.assertFalse(result)
+
+ self.mox.ResetAll()
+ # Down
+ timeutils.utcnow_ts().AndReturn(fake_now)
+ timeutils.utcnow_ts().AndReturn(fake_now + down_time + 1)
+ self.mox.ReplayAll()
+ self.servicegroup_api._driver.mc.set(hostkey,
+ timeutils.utcnow(),
+ time=down_time)
+ result = self.servicegroup_api.service_is_up(service_ref)
+ self.assertFalse(result)
+
+ self.mox.ResetAll()
+
+ def test_report_state(self):
+ serv = self.useFixture(
+ ServiceFixture(self._host, self._binary, self._topic)).serv
+ serv.start()
+ service_ref = db.service_get_by_args(self._ctx,
+ self._host,
+ self._binary)
+ self.servicegroup_api = servicegroup.API()
+
+ # updating model_disconnected
+ serv.model_disconnected = True
+ self.servicegroup_api._driver._report_state(serv)
+ self.assertFalse(serv.model_disconnected)
+
+ # handling exception
+ serv.model_disconnected = True
+ self.servicegroup_api._driver.mc = None
+ self.servicegroup_api._driver._report_state(serv)
+ self.assertTrue(serv.model_disconnected)
+
+ delattr(serv, 'model_disconnected')
+ self.servicegroup_api._driver.mc = None
+ self.servicegroup_api._driver._report_state(serv)
+ self.assertTrue(serv.model_disconnected)