#!/usr/bin/env python
#
# Copyright (C) 2012 Red Hat, Inc. All rights reserved.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors: Michal Minar
#
"""
Unit tests for LMI_SoftwareFileCheck provider.
"""

import hashlib
import os
import pywbem
import re
import subprocess
import unittest
from collections import namedtuple

import common

# Matches the leading hexadecimal digest printed by the *sum commands.
RE_CHECKSUM = re.compile(r'^([0-9a-fA-F]+)\s+.*')

# Names for the ten booleans of the PassedFlags property, in the order
# in which the tests below unpack them.
PassedFlags = namedtuple("PassedFlags", #pylint: disable=C0103
        "exists, type, size, mode, checksum, dev, ltarget, uid, gid, mtime")


class TestSoftwareFileCheck(common.SoftwareBaseTestCase): #pylint: disable=R0904
    """
    Basic cim operations test.

    The tests marked as dangerous modify and remove files of the packages
    listed in self.pkgdb, so they must not be run on a production system.
    """

    CLASS_NAME = "LMI_SoftwareFileCheck"
    KEYS = ( "CheckID", "Name", "SoftwareElementID", "SoftwareElementState"
           , "TargetOperatingSystem", "Version")

    # FileChecksumType value -> hashlib constructor
    hash_num2algo = {
            1  : hashlib.md5,     #pylint: disable=E1101
            2  : hashlib.sha1,    #pylint: disable=E1101
            8  : hashlib.sha256,  #pylint: disable=E1101
            9  : hashlib.sha384,  #pylint: disable=E1101
            10 : hashlib.sha512,  #pylint: disable=E1101
            11 : hashlib.sha224   #pylint: disable=E1101
    }

    # FileChecksumType value -> command line utility computing the digest
    hash_num2cmd = {
            1  : "md5sum",
            2  : "sha1sum",
            8  : "sha256sum",
            9  : "sha384sum",
            10 : "sha512sum",
            11 : "sha224sum"
    }

    # FileChecksumType value -> length of the hexadecimal digest string
    hash_num2length = {
            1  : 32,
            2  : 40,
            8  : 64,
            9  : 96,
            10 : 128,
            11 : 56
    }

    def make_op(self, pkg, filename, newer=True):
        """
        Build the key bindings identifying a file check of package file.

        @param pkg package object providing name, ver, up_ver and get_nevra()
        @param filename path of file belonging to pkg
        @param newer whether to use the "up_ver" version of package
            instead of "ver"
        @return object path of LMI_SoftwareFileCheck
        """
        objpath = self.objpath.copy()
        objpath["Name"] = filename
        objpath["Version"] = getattr(pkg, "up_ver" if newer else "ver")
        objpath["CheckID"] = "%s#%s" % (pkg.name, filename)
        objpath["SoftwareElementState"] = pywbem.Uint16(2)
        objpath["TargetOperatingSystem"] = pywbem.Uint16(36)
        objpath["SoftwareElementID"] = pkg.get_nevra(newer)
        return objpath

    def assertEqualSEID(self, id1, id2):
        """
        This is for comparison of SoftwareElementID property values.

        Two values parsed by common.RE_NEVRA are considered equal, when
        their name, ver, rel and arch groups match and their epochs match
        with a missing epoch treated as '0'. In any other case the values
        are handed over to the assertEqual of base class.

        @return True if the identifiers are equal in the sense above,
            otherwise the result of base class assertEqual
        """
        base_assert = common.SoftwareBaseTestCase.assertEqual
        if (  not isinstance(id1, basestring)
           or not isinstance(id2, basestring)):
            return base_assert(self, id1, id2)
        match1 = common.RE_NEVRA.match(id1)
        match2 = common.RE_NEVRA.match(id2)
        if not match1 or not match2:
            return base_assert(self, id1, id2)
        if any(  match1.group(g) != match2.group(g)
              for g in ('name', 'ver', 'rel', 'arch')):
            return base_assert(self, id1, id2)
        # missing epoch is equivalent to epoch 0
        epoch1 = match1.group('epoch') or '0'
        epoch2 = match2.group('epoch') or '0'
        if epoch1 != epoch2:
            return base_assert(self, id1, id2)
        return True

    def assertEqual(self, op1, op2, msg=None):
        """
        This is override for object paths, that allows some differences
        (like missing epoch in SoftwareElementID).

        When both arguments are pywbem.CIMInstanceName objects agreeing in
        class name, namespace, key names and all keys except for
        SoftwareElementID, only their SoftwareElementID values are compared
        with assertEqualSEID. Anything else is passed to the assertEqual
        of base class.

        @param msg optional failure message passed to the base class
            (assumes the base class keeps the unittest.TestCase signature)
        """
        if (  not isinstance(op1, pywbem.CIMInstanceName)
           or not isinstance(op2, pywbem.CIMInstanceName)
           or op1.classname != op2.classname
           or op1.namespace != op2.namespace
           or op1.keybindings.keys() != op2.keybindings.keys()
           or any(op1[a] != op2[a] for a in
               ( 'Name', 'Version', 'CheckID', 'SoftwareElementState'
               , 'TargetOperatingSystem'))):
            return common.SoftwareBaseTestCase.assertEqual(
                    self, op1, op2, msg)
        return self.assertEqualSEID(
                op1['SoftwareElementID'], op2['SoftwareElementID'])

    def make_checksum_str(self, csumnum, filename):
        """
        Compute the digest of file with external command.

        @param csumnum key of hash_num2cmd selecting the command to run
        @param filename path of file to be hashed
        @return checksum of installed file as lowercase hexadecimal string
        """
        output = subprocess.check_output(
                [self.hash_num2cmd[csumnum], filename])
        return RE_CHECKSUM.match(output).group(1).lower()

    def do_check_symlink(self, filepath, inst):
        """
        Assert some details about symlink.

        The instance is first compared to the result of lstat. Then the
        owner and the target of link are modified one after another and
        the PassedFlags of refreshed instance are checked.
        """
        target = os.readlink(filepath)
        stats = os.lstat(filepath)

        self.assertEqual(inst["FileType"], pywbem.Uint16(3))
        self.assertEqual(inst["FileUserID"], stats.st_uid)
        self.assertEqual(inst["FileGroupID"], stats.st_gid)
        self.assertEqual(inst["FileMode"], stats.st_mode)
        self.assertEqual(inst["FileSize"], stats.st_size)
        self.assertEqual(inst["LinkTarget"], target)
        # checksum of symlink is expected to be made of zeros only
        self.assertEqual(inst["FileChecksum"],
                "0"*self.hash_num2length[inst["FileChecksumType"]])
        self.assertEqual(inst["LastModificationTime"],
                int(stats.st_mtime))

        # modify owner
        prev_user = inst["FileUserID"]
        #prev_mtime = inst["LastModificationTime"]
        #prev_pflags = PassedFlags(*inst["PassedFlags"])
        os.lchown(filepath, stats.st_uid + 1, -1)
        inst = self.conn.GetInstance(InstanceName=inst.path)
        self.assertEqual(inst["FileUserID"], inst["ExpectedFileUserID"] + 1)
        self.assertEqual(inst["FileUserID"], prev_user + 1)
        self.assertEqual(inst["FileGroupID"], stats.st_gid)
        cur_pflags = PassedFlags(*inst["PassedFlags"])
        #self.assertGreater(inst["LastModificationTime"], prev_mtime)

        self.assertTrue(cur_pflags.exists)
        self.assertTrue(cur_pflags.type)
        self.assertTrue(cur_pflags.size)
        self.assertTrue(cur_pflags.mode)
        self.assertTrue(cur_pflags.checksum)
        self.assertTrue(cur_pflags.dev)
        self.assertTrue(cur_pflags.ltarget)
        self.assertFalse(cur_pflags.uid)
        self.assertTrue(cur_pflags.gid)
        self.assertTrue(cur_pflags.mtime)

        # modify link_target
        os.remove(filepath)
        # the new target is longer than the expected one
        os.symlink("wrong" + "*"*len(inst["ExpectedLinkTarget"]), filepath)
        os.lchown(filepath,
                inst["ExpectedFileUserID"], inst["ExpectedFileGroupID"])

        inst = self.conn.GetInstance(InstanceName=inst.path)
        cur_pflags = PassedFlags(*inst["PassedFlags"])
        self.assertGreater(len(inst["LinkTarget"]),
                len(inst["ExpectedLinkTarget"]))

        self.assertTrue(cur_pflags.exists)
        self.assertTrue(cur_pflags.type)
        self.assertFalse(cur_pflags.size)
        self.assertTrue(cur_pflags.mode)
        self.assertTrue(cur_pflags.checksum)
        self.assertTrue(cur_pflags.dev)
        self.assertFalse(cur_pflags.ltarget)
        self.assertTrue(cur_pflags.uid)
        self.assertTrue(cur_pflags.gid)
        self.assertTrue(cur_pflags.mtime)

    def do_check_directory(self, filepath, inst):
        """
        Assert some details about directory.

        The instance is compared to the result of lstat. The directory is
        not modified.
        """
        stats = os.lstat(filepath)

        self.assertEqual(inst["FileType"], pywbem.Uint16(2))
        self.assertEqual(inst["FileUserID"], stats.st_uid)
        self.assertEqual(inst["FileGroupID"], stats.st_gid)
        self.assertEqual(inst["FileMode"], stats.st_mode)
        self.assertEqual(inst["FileSize"], stats.st_size)
        self.assertIs(inst["LinkTarget"], None)
        # checksum of directory is expected to be made of zeros only
        self.assertEqual(inst["FileChecksum"],
                "0"*self.hash_num2length[inst["FileChecksumType"]])
        self.assertEqual(inst["LastModificationTime"],
                int(stats.st_mtime))

    def do_check_file(self, filepath, inst):
        """
        Assert some details about regular file.

        The instance is first compared to the result of lstat and to the
        checksum computed by make_checksum_str. Then the file is appended
        to, replaced with a symlink and finally removed. PassedFlags of
        refreshed instance are checked after each step.
        """
        stats = os.lstat(filepath)

        self.assertEqual(inst["FileType"], pywbem.Uint16(1))
        self.assertEqual(inst["FileUserID"], stats.st_uid)
        self.assertEqual(inst["FileGroupID"], stats.st_gid)
        self.assertEqual(inst["FileMode"], stats.st_mode)
        self.assertEqual(inst["FileSize"], stats.st_size)
        self.assertIs(inst["LinkTarget"], None)
        csum = self.make_checksum_str(inst['FileChecksumType'], filepath)
        self.assertEqual(inst["FileChecksum"].lower(), csum)
        self.assertEqual(inst["ExpectedLastModificationTime"],
                inst["LastModificationTime"])
        self.assertEqual(inst["LastModificationTime"],
                int(stats.st_mtime))

        # make it longer
        with open(filepath, "a+") as fobj:
            fobj.write("data\n")
        inst = self.conn.GetInstance(InstanceName=inst.path)
        cur_pflags = PassedFlags(*inst["PassedFlags"])
        self.assertGreater(inst["FileSize"], inst["ExpectedFileSize"])
        self.assertGreater(inst["LastModificationTime"],
                inst["ExpectedLastModificationTime"])

        self.assertTrue(cur_pflags.exists)
        self.assertTrue(cur_pflags.type)
        self.assertFalse(cur_pflags.size)
        self.assertTrue(cur_pflags.mode)
        self.assertFalse(cur_pflags.checksum)
        self.assertTrue(cur_pflags.dev)
        self.assertTrue(cur_pflags.ltarget)
        self.assertTrue(cur_pflags.uid)
        self.assertTrue(cur_pflags.gid)
        self.assertFalse(cur_pflags.mtime)

        # change file type
        os.remove(filepath)
        # the link points to itself, only its type matters here
        os.symlink(filepath, filepath)
        os.lchown(filepath,
                inst["ExpectedFileUserID"], inst["ExpectedFileGroupID"])
        inst = self.conn.GetInstance(InstanceName=inst.path)
        cur_pflags = PassedFlags(*inst["PassedFlags"])
        self.assertNotEqual(inst["LinkTarget"], inst["ExpectedLinkTarget"])
        self.assertNotEqual(inst["FileSize"], inst["ExpectedFileSize"])
        self.assertGreater(inst["LastModificationTime"],
                inst["ExpectedLastModificationTime"])
        self.assertNotEqual(inst["FileType"], inst["ExpectedFileType"])
        self.assertEqual(inst["FileType"], pywbem.Uint16(3))

        self.assertTrue(cur_pflags.exists)
        self.assertFalse(cur_pflags.type)
        self.assertFalse(cur_pflags.size)
        self.assertFalse(cur_pflags.mode)
        self.assertFalse(cur_pflags.checksum)
        self.assertTrue(cur_pflags.dev)
        self.assertFalse(cur_pflags.ltarget)
        self.assertTrue(cur_pflags.uid)
        self.assertTrue(cur_pflags.gid)
        self.assertFalse(cur_pflags.mtime)

        # remove it
        os.remove(filepath)
        inst = self.conn.GetInstance(InstanceName=inst.path)
        cur_pflags = PassedFlags(*inst["PassedFlags"])
        self.assertEqual(inst["LinkTarget"], inst["ExpectedLinkTarget"])
        self.assertNotEqual(inst["FileSize"], inst["ExpectedFileSize"])
        self.assertIsNone(inst["LastModificationTime"])
        self.assertIsNone(inst["FileType"])
        self.assertIsNone(inst["FileChecksum"])
        self.assertIsNone(inst["FileMode"])
        self.assertIsNone(inst["FileUserID"])
        self.assertIsNone(inst["FileGroupID"])

        self.assertFalse(cur_pflags.exists)
        self.assertFalse(cur_pflags.type)
        self.assertFalse(cur_pflags.size)
        self.assertFalse(cur_pflags.mode)
        self.assertFalse(cur_pflags.checksum)
        self.assertFalse(cur_pflags.dev)
        self.assertFalse(cur_pflags.ltarget)
        self.assertFalse(cur_pflags.uid)
        self.assertFalse(cur_pflags.gid)
        self.assertFalse(cur_pflags.mtime)

    @common.mark_dangerous
    def test_get_instance(self):
        """
        Tests GetInstance call.

        Every package of self.pkgdb is installed first; a package that is
        installed, but for which common.verify_pkg returns a false value,
        is removed beforehand. For each of its files the instance is
        obtained and checked, followed by the checks specific to the type
        of file, which modify the file.
        """
        for pkg in self.pkgdb:
            files = self.pkg_files[pkg.name]
            if (   common.is_installed(pkg)
               and not common.verify_pkg(pkg.name)):
                common.remove_pkg(pkg.name)
            if not common.is_installed(pkg.name):
                common.install_pkg(pkg)
            self.assertTrue(common.is_installed(pkg))

            for filepath in files:
                objpath = self.make_op(pkg, filepath)
                inst = self.conn.GetInstance(
                        InstanceName=objpath,
                        LocalOnly=False)
                self.assertIsInstance(inst, pywbem.CIMInstance)
                self.assertEqual(inst.path, objpath)
                for key in self.KEYS:
                    if key.lower() == "softwareelementid":
                        self.assertEqualSEID(inst[key], objpath[key])
                    else:
                        self.assertEqual(inst[key], objpath[key])

                self.assertTrue(inst["FileExists"])
                self.assertEqual(len(inst["PassedFlags"]), 10)
                # untouched file must pass every single check
                self.assertTrue(all(
                    flag is True for flag in inst["PassedFlags"]))
                for prop in ( "FileType", "FileUserID", "FileGroupID"
                            , "FileMode", "FileSize", "LinkTarget"
                            , "FileChecksum", "FileModeFlags"):
                    self.assertEqual(inst["Expected"+prop], inst[prop])
                # islink must be tested first, isdir and isfile follow links
                if os.path.islink(filepath):
                    self.do_check_symlink(filepath, inst)
                elif os.path.isdir(filepath):
                    self.do_check_directory(filepath, inst)
                elif os.path.isfile(filepath):
                    self.do_check_file(filepath, inst)

    @common.mark_dangerous
    def test_invoke_method(self):
        """
        Tests Invoke method invocation.

        Every package of self.pkgdb is prepared the same way as in
        test_get_instance. For each of its files the method is expected
        to return 0 at first and 2 after the file got modified.
        """
        for pkg in self.pkgdb:
            files = self.pkg_files[pkg.name]
            if common.is_installed(pkg) and not common.verify_pkg(pkg.name):
                common.remove_pkg(pkg.name)
            if not common.is_installed(pkg.name):
                common.install_pkg(pkg)
            self.assertTrue(common.is_installed(pkg))

            for filepath in files:
                objpath = self.make_op(pkg, filepath)

                (rval, _) = self.conn.InvokeMethod(
                        MethodName="Invoke",
                        ObjectName=objpath)
                self.assertEqual(rval, pywbem.Uint32(0))

                # modify file
                if os.path.isfile(filepath):
                    os.remove(filepath)
                else:
                    # lchown operates on the entry itself, so its current
                    # owner must be read with lstat; stat would follow
                    # a symlink and fail on a dangling one
                    os.lchown(filepath, os.lstat(filepath).st_uid + 1, -1)
                (rval, _) = self.conn.InvokeMethod(
                        MethodName="Invoke",
                        ObjectName=objpath)
                self.assertEqual(rval, pywbem.Uint32(2))


def suite():
    """For unittest loaders."""
    return unittest.TestLoader().loadTestsFromTestCase(
            TestSoftwareFileCheck)


if __name__ == "__main__":
    unittest.main()