From f5923aed88c2d163d76e16de82e9a9999609372f Mon Sep 17 00:00:00 2001 From: Jan Synacek Date: Wed, 7 May 2014 10:51:30 +0200 Subject: logicalfile: test providers' configuration --- src/logicalfile/test/test_base.py | 70 +++++++++++++++++++++ src/logicalfile/test/test_basic.py | 121 ++++++++++++++++++++++++++++++------- src/python/lmi/test/cimbase.py | 19 ++++++ 3 files changed, 188 insertions(+), 22 deletions(-) (limited to 'src') diff --git a/src/logicalfile/test/test_base.py b/src/logicalfile/test/test_base.py index 8c7f231..05e7bb2 100644 --- a/src/logicalfile/test/test_base.py +++ b/src/logicalfile/test/test_base.py @@ -17,6 +17,7 @@ # Authors: Jan Synacek import os +import shutil import subprocess import pyudev @@ -51,3 +52,72 @@ class LogicalFileTestBase(cimbase.CIMTestCase): def tearDown(self): pass + + +class Configuration(): + """ + Ini-like configuration. + + {group : [(key1, value1), (key2, value2), ...]} + + """ + def __init__(self, path=None): + self.configuration = dict() + self.path = path + if path: + self.backupfile = path + '.backup_openlmi_logicalfile' + self.backup = None + self.original_path = None + + def add(self, group, options): + if self.configuration.has_key(group): + self.configuration[group].extend(options) + else: + self.configuration[group] = options + + def save(self): + def get_original_path(path): + """ + Compute already existing path when creating a directory. Returns + path to original directory or the whole path. + + Example: + Suppose there already exists '/tmp/stuff/' directory. + get_original_path('/tmp/stuff/new/directory/file.txt') will return '/tmp/stuff/'. + get_original_path('/tmp/stuff/file.txt') will return '/tmp/stuff/file.txt'. + """ + (head, tail) = os.path.split(path) + while (head != ''): + if os.path.exists(head): + return os.path.join(head, tail) + (head, tail) = os.path.split(head) + return None + + if os.path.exists(self.path): + # backup + self.backup = self.backupfile + shutil.move(self.path, self.backupfile) + else: + # create new + self.original_path = get_original_path(self.path) + d = os.path.dirname(self.path) + if (self.path != self.original_path): + os.makedirs(os.path.dirname(self.path)) + + f = open(self.path, 'w') + for group in self.configuration.keys(): + f.write('[' + group + ']' + '\n') + for (key, val) in self.configuration[group]: + f.write(key + '=' + val + '\n') + f.close() + + def dispose(self): + if self.backup: + shutil.move(self.backupfile, self.path) + elif self.original_path: + if self.path != self.original_path: + shutil.rmtree(self.original_path) + else: + os.unlink(self.path) + self.backup = None + self.original_path = None diff --git a/src/logicalfile/test/test_basic.py b/src/logicalfile/test/test_basic.py index 187d220..3577ee2 100644 --- a/src/logicalfile/test/test_basic.py +++ b/src/logicalfile/test/test_basic.py @@ -18,7 +18,7 @@ # # Authors: Jan Synacek -from test_base import LogicalFileTestBase +from test_base import LogicalFileTestBase, Configuration from lmi.test import unittest import pywbem import os @@ -84,6 +84,21 @@ class TestLogicalFile(LogicalFileTestBase): 'CreationClassName':'LMI_UnixDirectory', 'Name':self.testdir }) + + conf_path = '/etc/openlmi/logicalfile/logicalfile.conf' + self.configurations = dict() + conf = Configuration(conf_path) + conf.add('LMI_UnixDirectory', [('AllowMkdir', 'True')]) + self.configurations['mkdir'] = conf + + conf = Configuration(conf_path) + conf.add('LMI_UnixDirectory', [('AllowRmdir', 'True')]) + self.configurations['rmdir'] = conf + + conf = Configuration(conf_path) + conf.add('LMI_SymbolicLink', [('AllowSymlink', 'True')]) + self.configurations['symlink'] = conf + self._prepare() def tearDown(self): @@ -135,6 +150,8 @@ class TestLogicalFile(LogicalFileTestBase): def _cleanup(self): subprocess.call(['umount', self.transient_file['path']]) shutil.rmtree(self.testdir) + for conf in self.configurations.values(): + conf.dispose() def test_lmi_directorycontainsfile(self): assoc_class = 'LMI_DirectoryContainsFile' @@ -428,47 +445,89 @@ class TestLogicalFile(LogicalFileTestBase): self.assertEquals(rootdir['CSName'], self.SYSTEM_NAME) self.assertEquals(rootdir['Name'], '/') - def test_mkdir(self): - classname = 'LMI_UnixDirectory' + def _mkdir(self, path): cop = self.cop.copy() - path = self.testdir + '/mkdir-test' + cop['Name'] = path + inst = pywbem.CIMInstance('LMI_UnixDirectory', cop.keybindings) + self.wbemconnection.CreateInstance(inst) - def mkdir(path): - cop['Name'] = path - inst = pywbem.CIMInstance(classname, cop.keybindings) - self.wbemconnection.CreateInstance(inst) + def _test_mkdir(self, enabled_by_configuration): + classname = 'LMI_UnixDirectory' + path = self.testdir + '/mkdir-test' + cop = self.cop.copy() + cop['Name'] = path try: - mkdir(path) + self._mkdir(path) inst = self.wbemconnection.GetInstance(cop) self.assertEquals(inst.classname, classname) self.assertEquals(inst['CreationClassName'], classname) self.assertEquals(inst['Name'], path) + self._rmdir(path) except pywbem.CIMError as pe: - self.fail(pe[1]) + if enabled_by_configuration: + self.fail(pe[1]) + else: + self.assertEquals(pe[1], + "CIM_ERR_FAILED: Can't mkdir: disabled by provider configuration") self.assertRaises(pywbem.CIMError, - mkdir, + self._mkdir, '/cant/create/me') + def test_mkdir(self): + conf = self.configurations['mkdir'] + # should succeed + conf.save() + self.restart_cim() + self._test_mkdir(True) + conf.dispose() + # should fail + conf.add('LMI_UnixDirectory', [('AllowMkdir', 'False')]) + conf.save() + self.restart_cim() + self._test_mkdir(False) + conf.dispose() + + def _rmdir(self, path): + cop = self.cop.copy() + cop['Name'] = path + inst = pywbem.CIMInstance('LMI_UnixDirectory', cop.keybindings) + self.wbemconnection.DeleteInstance(cop) - def test_rmdir(self): - def rmdir(path): - cop = self.cop.copy() - cop['Name'] = path - inst = pywbem.CIMInstance('LMI_UnixDirectory', cop.keybindings) - self.wbemconnection.DeleteInstance(cop) + def _test_rmdir(self, enabled_by_configuration): + path = self.files['dir']['path'] + if not os.path.exists(path): + self._mkdir(path) try: - rmdir(self.files['dir']['path']) + self._rmdir(path) except pywbem.CIMError as pe: - self.fail(pe[1]) + if enabled_by_configuration: + self.fail(pe[1]) + else: + self.assertEquals(pe[1], + "CIM_ERR_FAILED: Can't rmdir: disabled by provider configuration") self.assertRaises(pywbem.CIMError, - rmdir, + self._rmdir, '/cant/remove/me') - def test_create_symlink(self): + def test_rmdir(self): + conf = self.configurations['rmdir'] + # should succeed + conf.save() + self.restart_cim() + self._test_rmdir(True) + conf.dispose() + # should fail + conf.add('LMI_UnixDirectory', [('AllowRmdir', 'False')]) + conf.save() + self.restart_cim() + self._test_rmdir(False) + conf.dispose() + + def _test_create_symlink(self, enabled_by_configuration): classname = 'LMI_SymbolicLink' cop = self.cop.copy() cop.classname = classname @@ -489,13 +548,31 @@ class TestLogicalFile(LogicalFileTestBase): self.assertEquals(inst['TargetFile'], target) self.assertEquals(inst['Name'], name) except pywbem.CIMError as pe: - self.fail(pe[1]) + if enabled_by_configuration: + self.fail(pe[1]) + else: + self.assertEquals(pe[1], + "CIM_ERR_FAILED: Can't create symlink: disabled by provider configuration") self.assertRaises(pywbem.CIMError, create_symlink, target, name) + def test_create_symlink(self): + conf = self.configurations['symlink'] + # should succeed + conf.save() + self.restart_cim() + self._test_create_symlink(True) + conf.dispose() + # should fail + conf.add('LMI_SymbolicLink', [('AllowSymlink', 'False')]) + conf.save() + self.restart_cim() + self._test_create_symlink(False) + conf.dispose() + # for now, this test just checks if FSName, FSCreationClassName, # CreationClassName and LFCreationClassName are properly ignored; empty # strings are used, since they should represent "ignored" pretty well diff --git a/src/python/lmi/test/cimbase.py b/src/python/lmi/test/cimbase.py index f5155a3..4582d8e 100644 --- a/src/python/lmi/test/cimbase.py +++ b/src/python/lmi/test/cimbase.py @@ -27,6 +27,7 @@ import BaseHTTPServer import pywbem import Queue import random +import subprocess import threading import time @@ -360,3 +361,21 @@ class CIMTestCase(base.BaseLmiTestCase): jobname = outparams['Job'] (ret, outparams) = self.finish_job(jobname, return_constructor) return (ret, outparams) + + def restart_cim(self): + """ + Restart CIMOM + """ + ret = self.log_run(["systemctl", "restart", self.cimom]) + time.sleep(1) + if ret == 0: + CIMTestCase._WBEMCONNECTION = None + return ret + + def log_run(self, args): + """ + Print arguments and run them. + args must be prepared for subprocess.call() + """ + print "Running:", " ".join(args) + return subprocess.call(args) -- cgit