diff options
-rw-r--r-- | nova/exception.py | 5 | ||||
-rw-r--r-- | nova/servicegroup/api.py | 3 | ||||
-rw-r--r-- | nova/servicegroup/drivers/zk.py | 157 | ||||
-rw-r--r-- | nova/tests/servicegroup/test_zk_driver.py | 65 |
4 files changed, 229 insertions, 1 deletions
diff --git a/nova/exception.py b/nova/exception.py index 3b20b7e78..ede512a97 100644 --- a/nova/exception.py +++ b/nova/exception.py @@ -1110,3 +1110,8 @@ class CryptoCRLFileNotFound(FileNotFound): class InstanceRecreateNotSupported(Invalid): message = _('Instance recreate is not implemented by this virt driver.') + + +class ServiceGroupUnavailable(NovaException): + message = _("The service from servicegroup driver %(driver) is " + "temporarily unavailable.") diff --git a/nova/servicegroup/api.py b/nova/servicegroup/api.py index 0fb30cdf5..793d4bfc9 100644 --- a/nova/servicegroup/api.py +++ b/nova/servicegroup/api.py @@ -40,7 +40,8 @@ class API(object): _driver = None _driver_name_class_mapping = { - 'db': 'nova.servicegroup.drivers.db.DbDriver' + 'db': 'nova.servicegroup.drivers.db.DbDriver', + 'zk': 'nova.servicegroup.drivers.zk.ZooKeeperDriver' } @lockutils.synchronized('nova.servicegroup.api.new', 'nova-') diff --git a/nova/servicegroup/drivers/zk.py b/nova/servicegroup/drivers/zk.py new file mode 100644 index 000000000..c4e3f7b71 --- /dev/null +++ b/nova/servicegroup/drivers/zk.py @@ -0,0 +1,157 @@ +# Copyright (c) AT&T 2012-2013 Yun Mao <yunmao@gmail.com> +# +# Copyright (c) IBM 2012 Pavel Kravchenco <kpavel at il dot ibm dot com> +# Alexey Roytman <roytman at il dot ibm dot com> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import eventlet +import evzookeeper +from evzookeeper import membership +import zookeeper + +from nova import exception +from nova.openstack.common import cfg +from nova.openstack.common import log as logging +from nova.servicegroup import api +from nova import utils + + +zk_driver_opts = [ + cfg.StrOpt('address', + default=None, + help='The ZooKeeper addresses for servicegroup service in the ' + 'format of host1:port,host2:port,host3:port'), + cfg.IntOpt('recv_timeout', + default=4000, + help='recv_timeout parameter for the zk session'), + cfg.StrOpt('sg_prefix', + default="/servicegroups", + help='The prefix used in ZooKeeper to store ephemeral nodes'), + cfg.IntOpt('sg_retry_interval', + default=5, + help='Number of seconds to wait until retrying to join the ' + 'session'), + ] + +CONF = cfg.CONF +CONF.register_opts(zk_driver_opts, group="zk") + +LOG = logging.getLogger(__name__) + + +class ZooKeeperDriver(api.ServiceGroupDriver): + """ZooKeeper driver for the service group API.""" + + def __init__(self, *args, **kwargs): + """Create the zk session object.""" + null = open(os.devnull, "w") + self._session = evzookeeper.ZKSession(CONF.zk.address, + recv_timeout= + CONF.zk.recv_timeout, + zklog_fd=null) + self._memberships = {} + self._monitors = {} + # Make sure the prefix exists + try: + self._session.create(CONF.zk.sg_prefix, "", + acl=[evzookeeper.ZOO_OPEN_ACL_UNSAFE]) + except zookeeper.NodeExistsException: + pass + + super(ZooKeeperDriver, self).__init__() + + def join(self, member_id, group, service=None): + """Join the given service with its group.""" + LOG.debug(_('ZooKeeperDriver: join new member %(id)s to the ' + '%(gr)s group, service=%(sr)s'), + {'id': member_id, 'gr': group, 'sr': service}) + member = self._memberships.get((group, member_id), None) + if member is None: + # the first time to join. Generate a new object + path = "%s/%s" % (CONF.zk.sg_prefix, group) + try: + member = membership.Membership(self._session, path, member_id) + except RuntimeError: + LOG.exception(_("Unable to join. It is possible that either " + "another node exists with the same name, or " + "this node just restarted. We will try " + "again in a short while to make sure.")) + eventlet.sleep(CONF.zk.sg_retry_interval) + member = membership.Membership(self._session, path, member_id) + self._memberships[(group, member_id)] = member + return FakeLoopingCall(self, member_id, group) + + def leave(self, member_id, group): + """Remove the given member from the service group.""" + LOG.debug(_('ZooKeeperDriver.leave: %(member)s from group %(group)s'), + {'member': member_id, 'group': group}) + try: + key = (group, member_id) + member = self._memberships[key] + member.leave() + del self._memberships[key] + except KeyError: + LOG.error(_('ZooKeeperDriver.leave: %(id)s has not joined to the ' + '%(gr)s group'), {'id': member_id, 'gr': group}) + + def is_up(self, service_ref): + group_id = service_ref['topic'] + member_id = service_ref['host'] + all_members = self.get_all(group_id) + return member_id in all_members + + def get_all(self, group_id): + """Return all members in a list, or a ServiceGroupUnavailable + exception. + """ + monitor = self._monitors.get(group_id, None) + if monitor is None: + path = "%s/%s" % (CONF.zk.sg_prefix, group_id) + monitor = membership.MembershipMonitor(self._session, path) + self._monitors[group_id] = monitor + # Note(maoy): When initialized for the first time, it takes a + # while to retrieve all members from zookeeper. To prevent + # None to be returned, we sleep 5 sec max to wait for data to + # be ready. + for _retry in range(50): + eventlet.sleep(0.1) + all_members = monitor.get_all() + if all_members is not None: + return all_members + all_members = monitor.get_all() + if all_members is None: + raise exception.ServiceGroupUnavailable(driver="ZooKeeperDriver") + return all_members + + +class FakeLoopingCall(utils.LoopingCallBase): + """The fake Looping Call implementation, created for backward + compatibility with a membership based on DB. + """ + def __init__(self, driver, host, group): + self._driver = driver + self._group = group + self._host = host + + def stop(self): + self._driver.leave(self._host, self._group) + + def start(self, interval, initial_delay=None): + pass + + def wait(self): + pass diff --git a/nova/tests/servicegroup/test_zk_driver.py b/nova/tests/servicegroup/test_zk_driver.py new file mode 100644 index 000000000..753153bb5 --- /dev/null +++ b/nova/tests/servicegroup/test_zk_driver.py @@ -0,0 +1,65 @@ +# Copyright (c) AT&T 2012-2013 Yun Mao <yunmao@gmail.com> +# Copyright (c) IBM 2012 Alexey Roytman <roytman at il dot ibm dot com>. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Test the ZooKeeper driver for servicegroup. + +You need to install ZooKeeper locally and related dependencies +to run the test. It's unclear how to install python-zookeeper lib +in venv so you might have to run the test without it. + +To set up in Ubuntu 12.04: +$ sudo apt-get install zookeeper zookeeperd python-zookeeper +$ sudo pip install evzookeeper +$ nosetests nova.tests.servicegroup.test_zk_driver +""" + +import eventlet + +from nova import servicegroup +from nova import test + + +class ZKServiceGroupTestCase(test.TestCase): + + def setUp(self): + super(ZKServiceGroupTestCase, self).setUp() + servicegroup.API._driver = None + try: + from nova.servicegroup.drivers import zk + _unused = zk + except ImportError: + self.skipTest("Unable to test due to lack of ZooKeeper") + self.flags(servicegroup_driver='zk') + self.flags(address='localhost:2181', group="zk") + + def test_join_leave(self): + self.servicegroup_api = servicegroup.API() + service_id = {'topic': 'unittest', 'host': 'serviceA'} + self.servicegroup_api.join(service_id['host'], service_id['topic']) + self.assertTrue(self.servicegroup_api.service_is_up(service_id)) + self.servicegroup_api.leave(service_id['host'], service_id['topic']) + # make sure zookeeper is updated and watcher is triggered + eventlet.sleep(1) + self.assertFalse(self.servicegroup_api.service_is_up(service_id)) + + def test_stop(self): + self.servicegroup_api = servicegroup.API() + service_id = {'topic': 'unittest', 'host': 'serviceA'} + pulse = self.servicegroup_api.join(service_id['host'], + service_id['topic'], None) + self.assertTrue(self.servicegroup_api.service_is_up(service_id)) + pulse.stop() + eventlet.sleep(1) + self.assertFalse(self.servicegroup_api.service_is_up(service_id)) |