diff options
| author | Michael DeHaan <mdehaan@redhat.com> | 2008-03-27 16:50:58 -0400 |
|---|---|---|
| committer | Michael DeHaan <mdehaan@redhat.com> | 2008-03-27 16:50:58 -0400 |
| commit | 5b2e32746600a45af8ce85f645cb3c0d8ae2d084 (patch) | |
| tree | 597b9683cba53d3d3a31ec352dd9fdc9c60cc698 | |
| parent | 0be993e430bfe2c7c742f7f4d100fb73cb3e317c (diff) | |
| download | third_party-cobbler-5b2e32746600a45af8ce85f645cb3c0d8ae2d084.tar.gz third_party-cobbler-5b2e32746600a45af8ce85f645cb3c0d8ae2d084.tar.xz third_party-cobbler-5b2e32746600a45af8ce85f645cb3c0d8ae2d084.zip | |
Adding ownership module + tests and associated changes to cobblerd to make it
work a little better. This module is not fully tested yet, so don't use it
yet in production.
| -rw-r--r-- | cobbler/modules/authz_ownership.py | 155 | ||||
| -rw-r--r-- | cobbler/remote.py | 8 | ||||
| -rw-r--r-- | tests/tests.py | 109 |
3 files changed, 238 insertions, 34 deletions
diff --git a/cobbler/modules/authz_ownership.py b/cobbler/modules/authz_ownership.py new file mode 100644 index 0000000..9b271f4 --- /dev/null +++ b/cobbler/modules/authz_ownership.py @@ -0,0 +1,155 @@ +""" +Authorization module that allow users listed in +/etc/cobbler/users.conf to be permitted to access resources, with +the further restriction that cobbler objects can be edited to only +allow certain users/groups to access those specific objects. + +Copyright 2008, Red Hat, Inc +Michael DeHaan <mdehaan@redhat.com> + +This software may be freely redistributed under the terms of the GNU +general public license. + +You should have received a copy of the GNU General Public License +along with this program; if not, write to the Free Software +Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +""" + +import distutils.sysconfig +import ConfigParser +import sys +import os +from rhpl.translate import _, N_, textdomain, utf8 + +plib = distutils.sysconfig.get_python_lib() +mod_path="%s/cobbler" % plib +sys.path.insert(0, mod_path) + +import cexceptions +import utils + + +def register(): + """ + The mandatory cobbler module registration hook. + """ + return "authz" + +def __parse_config(debug=False): + etcfile='/etc/cobbler/users.conf' + if not os.path.exists(etcfile): + raise CX(_("/etc/cobbler/users.conf does not exist")) + config = ConfigParser.ConfigParser() + config.read(etcfile) + alldata = {} + sections = config.sections() + if debug: + print "[OWNERSHIP] sections=%s" % sections + for g in sections: + alldata[str(g)] = {} + opts = config.options(g) + if debug: + print "[OWNERSHIP] for group %s, users: %s" % (g,opts) + for o in opts: + alldata[g][o] = 1 + return alldata + + +def authorize(api_handle,user,resource,arg1=None,arg2=None,debug=False): + """ + Validate a user against a resource. + All users in the file are permitted by this module. + """ + + user_groups = __parse_config(debug) + if debug: + print "[OWNERSHIP] ------------" + print "can user %s do %s (arg1=%s)?" % (user,resource,arg1) + print "consult db: %s" % user_groups + + # classify the type of operation + save_or_remove = False + for criteria in ["save","remove","modify"]: + if resource.find(criteria) != -1: + save_or_remove = True + + # FIXME: is everyone allowed to copy? I think so. + # FIXME: deal with the problem of deleted parents and promotion + + found_user = False + for g in user_groups: + if user in user_groups[g]: + found_user = True + # if user is in the admin group, always authorize + # regardless of the ownership of the object. + if g == "admin": + if debug: + print "[OWNERSHIP] user % is an admin, PASS" % user + return 1 + break + + if not found_user: + # if the user isn't anywhere in the file, reject regardless + # they can still use read-only XMLRPC + if debug: + print "[OWNERSHIP] user %s not found in list, FAIL" % user + return 0 + if not save_or_remove: + # sufficient to allow access for non save/remove ops to all + # users for now, may want to refine later. + if debug: + print "[OWNERSHIP] user %s is cleared for non-edit ops, PASS" % user + return 1 + + # now we have a save_or_remove op, so we must check ownership + # of the object. remove ops pass in arg1 as a string name, + # saves pass in actual objects, so we must treat them differently. + + obj = None + if resource.find("remove") != -1: + if resource == "remove_distro": + obj = api_handle.find_distro(arg1) + elif resource == "remove_profile": + obj = api_handle.find_profile(arg1) + elif resource == "remove_system": + obj = api_handle.find_system(arg1) + elif resource == "remove_repo": + obj = api_handle.find_system(arg1) + else: + obj = arg1 + + # if the object has no ownership data, allow access regardless + if obj.owners is None or obj.owners == []: + if debug: + print "[OWNERSHIP] user %s is cleared, object is not owned, PASS" % user + return 1 + + # otherwise, ownership by user/group + for allowed in obj.owners: + if user == allowed: + # user match + if debug: + print "[OWNERSHIP] user %s in match list, PASS" % user + return 1 + for group in user_groups: + if user in user_groups[group]: + if debug: + print "[OWNERSHIP] user %s matched by group, PASS" % user + return 1 + + # can't find user or group in ownership list and ownership is defined + # so reject the operation + if debug: + print "[OWNERSHIP] user %s rejected by default policy, FAIL" % user + return 0 + + +if __name__ == "__main__": + import api as cobbler_api + api = cobbler_api.BootAPI() + print __parse_config() + print authorize(api, "admin1", "sync") + d = api.find_distro("F9B-i386") + print authorize(api, "admin1", "save_distro", d, debug=True) + + # real tests are contained in tests/tests.py diff --git a/cobbler/remote.py b/cobbler/remote.py index 4b04fcb..76ffbcf 100644 --- a/cobbler/remote.py +++ b/cobbler/remote.py @@ -777,8 +777,8 @@ class CobblerReadWriteXMLRPCInterface(CobblerXMLRPCInterface): Saves a newly created or modified distro object to disk. """ self.log("save_distro",object_id=object_id,token=token) - self.check_access(token,"save_distro") obj = self.__get_object(object_id) + self.check_access(token,"save_distro",obj) return self.api.distros().add(obj,save=True) def save_profile(self,object_id,token): @@ -786,8 +786,8 @@ class CobblerReadWriteXMLRPCInterface(CobblerXMLRPCInterface): Saves a newly created or modified profile object to disk. """ self.log("save_profile",token=token,object_id=object_id) - self.check_access(token,"save_profile") obj = self.__get_object(object_id) + self.check_access(token,"save_profile",obj) return self.api.profiles().add(obj,save=True) def save_system(self,object_id,token): @@ -795,8 +795,8 @@ class CobblerReadWriteXMLRPCInterface(CobblerXMLRPCInterface): Saves a newly created or modified system object to disk. """ self.log("save_system",token=token,object_id=object_id) - self.check_access(token,"save_system") obj = self.__get_object(object_id) + self.check_access(token,"save_system",obj) return self.api.systems().add(obj,save=True) def save_repo(self,object_id,token=None): @@ -804,8 +804,8 @@ class CobblerReadWriteXMLRPCInterface(CobblerXMLRPCInterface): Saves a newly created or modified repo object to disk. """ self.log("save_repo",object_id=object_id,token=token) - self.check_access(token,"save_repo") obj = self.__get_object(object_id) + self.check_access(token,"save_repo",obj) return self.api.repos().add(obj,save=True) def copy_distro(self,object_id,newname,token=None): diff --git a/tests/tests.py b/tests/tests.py index 3fb4177..546c83f 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -2,16 +2,6 @@ # # Michael DeHaan <mdehaan@redhat.com> -TRY_GRAPH = False -HAS_GRAPH = False - -if TRY_GRAPH: - try: - import pycallgraph_mod as pycallgraph - HAS_GRAPH = True - except: - pass - import sys import unittest import os @@ -25,6 +15,7 @@ from cobbler import settings from cobbler import collection_distros from cobbler import collection_profiles from cobbler import collection_systems +import cobbler.modules.authz_ownership as authz_module from cobbler import api @@ -54,11 +45,11 @@ class BootTest(unittest.TestCase): except: pass - self.fk_initrd = os.path.join(self.topdir, FAKE_INITRD) + self.fk_initrd = os.path.join(self.topdir, FAKE_INITRD) self.fk_initrd2 = os.path.join(self.topdir, FAKE_INITRD2) self.fk_initrd3 = os.path.join(self.topdir, FAKE_INITRD3) - self.fk_kernel = os.path.join(self.topdir, FAKE_KERNEL) + self.fk_kernel = os.path.join(self.topdir, FAKE_KERNEL) self.fk_kernel2 = os.path.join(self.topdir, FAKE_KERNEL2) self.fk_kernel3 = os.path.join(self.topdir, FAKE_KERNEL3) @@ -75,9 +66,6 @@ class BootTest(unittest.TestCase): shutil.rmtree(self.topdir,ignore_errors=True) self.api = None - if HAS_GRAPH: - pycallgraph.save_dot("%s.dot" % self.__class__.__name__) - def make_basic_config(self): distro = self.api.new_distro() self.assertTrue(distro.set_name("testdistro0")) @@ -116,25 +104,90 @@ class BootTest(unittest.TestCase): class Ownership(BootTest): def test_ownership_params(self): - return # FIXME - # NOTE: these tests are relaeively weak because they only test - # that the options are usable, not that they work, since cobbler - # as a command line tool ignores them, and cobblerd only cares - # in certain modes. + # initially just test that we can set ownership on various components + distro = self.api.find_distro(name="testdistro0") - profile = self.api.find_distro(name="testprofile0") - system = self.api.find_distro(name="drwily.rdu.redhat.com") + profile = self.api.find_profile(name="testprofile0") + system = self.api.find_system(name="drwily.rdu.redhat.com") repo = self.api.find_repo(name="test_repo") - self.assertTrue(distro.set_owners("a,b")) - self.assertTrue(profile.set_owners("c,d")) - self.assertTrue(system.set_owners("e")) - self.assertTrue(repo.set_owners("f,g")) + self.assertTrue(distro.set_owners("superlab,basement1")) + self.assertTrue(profile.set_owners("superlab,basement1")) + self.assertTrue(system.set_owners("superlab,basement1")) + self.assertTrue(repo.set_owners([])) self.api.add_distro(distro) self.api.add_profile(profile) self.api.add_system(system) self.api.add_repo(repo) + # now edit the groups file. We won't test the full XMLRPC + # auth stack here, but just the module in question + + authorize = authz_module.authorize + + # if the users.conf file exists, back it up for the tests + #if os.path.exists("/etc/cobbler/users.conf"): + # shutil.copyfile("/etc/cobbler/users.conf","/tmp/cobbler_ubak") + + fd = open("/etc/cobbler/users.conf","w+") + fd.write("\n") + fd.write("[admins]\n") + fd.write("admin1 = 1\n") + fd.write("\n") + fd.write("[superlab]\n") + fd.write("superlab1 = 1\n") + fd.write("\n") + fd.write("[basement]\n") + fd.write("basement1 = 1\n") + fd.write("basement2 = 1\n") + fd.close() + + xo = self.api.find_distro("testdistro0") + xn = "testdistro0" + ro = self.api.find_repo("testrepo0") + rn = "testrepo0" + + # ensure admin1 can edit (he's an admin) and do other tasks + # same applies to basement1 who is explicitly added as a user + # and superlab1 who is in a group in the ownership list + for user in ["admin1","superlab1","basement1"]: + self.assertTrue(1==authorize(self.api, user, "save_distro", xo, debug=True),"%s can save_distro" % user) + self.assertTrue(1==authorize(self.api, user, "modify_distro", xo, debug=True),"%s can modify_distro" % user) + self.assertTrue(1==authorize(self.api, user, "copy_distro", xo, debug=True),"%s can copy_distro" % user) + self.assertTrue(1==authorize(self.api, user, "remove_distro", xn, debug=True),"%s can remove_distro" % user) + + # ensure all users in the file can sync + for user in [ "admin1", "superlab1", "basement1", "basement2" ]: + self.assertTrue(1==authorize(self.api, user, "sync", debug=True)) + + # make sure basement2 can't edit (not in group) + # and same goes for "dne" (does not exist in users.conf) + + for user in [ "basement2", "dne" ]: + self.assertTrue(0==authorize(self.api, user, "save_distro", xo, debug=True), "user %s cannot save_distro" % user) + self.assertTrue(0==authorize(self.api, user, "modify_distro", xo, debug=True), "user %s cannot modify_distro" % user) + self.assertTrue(0==authorize(self.api, user, "remove_distro", xn, debug=True), "user %s cannot remove_distro" % user) + + # basement2 is in the file so he can still copy + self.assertTrue(1==authorize(self.api, "basement2", "copy_distro", xo, debug=True), "basement2 can copy_distro") + + # dne can not copy or sync either (not in the users.conf) + self.assertTrue(0==authorize(self.api, "dne", "copy_distro", xo, debug=True), "dne cannot copy_distro") + self.assertTrue(0==authorize(self.api, "dne", "sync", debug=True), "dne cannot sync") + + # unlike the distro testdistro0, testrepo0 is unowned + # so any user in the file will be able to edit it. + for user in [ "admin1", "superlab1", "basement1", "basement2" ]: + self.assertTrue(1==authorize(self.api, user, "save_repo", ro, debug=True), "user %s can save_repo" % user) + + # though dne is still not listed and will be denied + self.assertTrue(0==authorize(self.api, "dne", "save_repo", ro, debug=True), "dne cannot save_repo") + + # if we survive, restore the users file as module testing is done + #if os.path.exists("/tmp/cobbler_ubak"): + # shutil.copyfile("/etc/cobbler/users.conf","/tmp/cobbler_ubak") + + class MultiNIC(BootTest): def test_multi_nic_support(self): @@ -612,14 +665,10 @@ if __name__ == "__main__": if not os.path.exists("setup.py"): print "tests: must invoke from top level directory" sys.exit(1) - if HAS_GRAPH: - pycallgraph.start_trace() loader = unittest.defaultTestLoader test_module = __import__("tests") # self import considered harmful? tests = loader.loadTestsFromModule(test_module) runner = unittest.TextTestRunner() runner.run(tests) - if HAS_GRAPH: - pycallgraph.make_graph('cg_dot.png', tool='dot') sys.exit(0) |
