summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael DeHaan <mdehaan@redhat.com>2008-03-27 16:50:58 -0400
committerMichael DeHaan <mdehaan@redhat.com>2008-03-27 16:50:58 -0400
commit5b2e32746600a45af8ce85f645cb3c0d8ae2d084 (patch)
tree597b9683cba53d3d3a31ec352dd9fdc9c60cc698
parent0be993e430bfe2c7c742f7f4d100fb73cb3e317c (diff)
downloadthird_party-cobbler-5b2e32746600a45af8ce85f645cb3c0d8ae2d084.tar.gz
third_party-cobbler-5b2e32746600a45af8ce85f645cb3c0d8ae2d084.tar.xz
third_party-cobbler-5b2e32746600a45af8ce85f645cb3c0d8ae2d084.zip
Adding ownership module + tests and associated changes to cobblerd to make it
work a little better. This module is not fully tested yet, so don't use it yet in production.
-rw-r--r--cobbler/modules/authz_ownership.py155
-rw-r--r--cobbler/remote.py8
-rw-r--r--tests/tests.py109
3 files changed, 238 insertions, 34 deletions
diff --git a/cobbler/modules/authz_ownership.py b/cobbler/modules/authz_ownership.py
new file mode 100644
index 0000000..9b271f4
--- /dev/null
+++ b/cobbler/modules/authz_ownership.py
@@ -0,0 +1,155 @@
+"""
+Authorization module that allow users listed in
+/etc/cobbler/users.conf to be permitted to access resources, with
+the further restriction that cobbler objects can be edited to only
+allow certain users/groups to access those specific objects.
+
+Copyright 2008, Red Hat, Inc
+Michael DeHaan <mdehaan@redhat.com>
+
+This software may be freely redistributed under the terms of the GNU
+general public license.
+
+You should have received a copy of the GNU General Public License
+along with this program; if not, write to the Free Software
+Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+"""
+
+import distutils.sysconfig
+import ConfigParser
+import sys
+import os
+from rhpl.translate import _, N_, textdomain, utf8
+
+plib = distutils.sysconfig.get_python_lib()
+mod_path="%s/cobbler" % plib
+sys.path.insert(0, mod_path)
+
+import cexceptions
+import utils
+
+
+def register():
+ """
+ The mandatory cobbler module registration hook.
+ """
+ return "authz"
+
+def __parse_config(debug=False):
+ etcfile='/etc/cobbler/users.conf'
+ if not os.path.exists(etcfile):
+ raise CX(_("/etc/cobbler/users.conf does not exist"))
+ config = ConfigParser.ConfigParser()
+ config.read(etcfile)
+ alldata = {}
+ sections = config.sections()
+ if debug:
+ print "[OWNERSHIP] sections=%s" % sections
+ for g in sections:
+ alldata[str(g)] = {}
+ opts = config.options(g)
+ if debug:
+ print "[OWNERSHIP] for group %s, users: %s" % (g,opts)
+ for o in opts:
+ alldata[g][o] = 1
+ return alldata
+
+
+def authorize(api_handle,user,resource,arg1=None,arg2=None,debug=False):
+ """
+ Validate a user against a resource.
+ All users in the file are permitted by this module.
+ """
+
+ user_groups = __parse_config(debug)
+ if debug:
+ print "[OWNERSHIP] ------------"
+ print "can user %s do %s (arg1=%s)?" % (user,resource,arg1)
+ print "consult db: %s" % user_groups
+
+ # classify the type of operation
+ save_or_remove = False
+ for criteria in ["save","remove","modify"]:
+ if resource.find(criteria) != -1:
+ save_or_remove = True
+
+ # FIXME: is everyone allowed to copy? I think so.
+ # FIXME: deal with the problem of deleted parents and promotion
+
+ found_user = False
+ for g in user_groups:
+ if user in user_groups[g]:
+ found_user = True
+ # if user is in the admin group, always authorize
+ # regardless of the ownership of the object.
+ if g == "admin":
+ if debug:
+ print "[OWNERSHIP] user % is an admin, PASS" % user
+ return 1
+ break
+
+ if not found_user:
+ # if the user isn't anywhere in the file, reject regardless
+ # they can still use read-only XMLRPC
+ if debug:
+ print "[OWNERSHIP] user %s not found in list, FAIL" % user
+ return 0
+ if not save_or_remove:
+ # sufficient to allow access for non save/remove ops to all
+ # users for now, may want to refine later.
+ if debug:
+ print "[OWNERSHIP] user %s is cleared for non-edit ops, PASS" % user
+ return 1
+
+ # now we have a save_or_remove op, so we must check ownership
+ # of the object. remove ops pass in arg1 as a string name,
+ # saves pass in actual objects, so we must treat them differently.
+
+ obj = None
+ if resource.find("remove") != -1:
+ if resource == "remove_distro":
+ obj = api_handle.find_distro(arg1)
+ elif resource == "remove_profile":
+ obj = api_handle.find_profile(arg1)
+ elif resource == "remove_system":
+ obj = api_handle.find_system(arg1)
+ elif resource == "remove_repo":
+ obj = api_handle.find_system(arg1)
+ else:
+ obj = arg1
+
+ # if the object has no ownership data, allow access regardless
+ if obj.owners is None or obj.owners == []:
+ if debug:
+ print "[OWNERSHIP] user %s is cleared, object is not owned, PASS" % user
+ return 1
+
+ # otherwise, ownership by user/group
+ for allowed in obj.owners:
+ if user == allowed:
+ # user match
+ if debug:
+ print "[OWNERSHIP] user %s in match list, PASS" % user
+ return 1
+ for group in user_groups:
+ if user in user_groups[group]:
+ if debug:
+ print "[OWNERSHIP] user %s matched by group, PASS" % user
+ return 1
+
+ # can't find user or group in ownership list and ownership is defined
+ # so reject the operation
+ if debug:
+ print "[OWNERSHIP] user %s rejected by default policy, FAIL" % user
+ return 0
+
+
+if __name__ == "__main__":
+ import api as cobbler_api
+ api = cobbler_api.BootAPI()
+ print __parse_config()
+ print authorize(api, "admin1", "sync")
+ d = api.find_distro("F9B-i386")
+ print authorize(api, "admin1", "save_distro", d, debug=True)
+
+ # real tests are contained in tests/tests.py
diff --git a/cobbler/remote.py b/cobbler/remote.py
index 4b04fcb..76ffbcf 100644
--- a/cobbler/remote.py
+++ b/cobbler/remote.py
@@ -777,8 +777,8 @@ class CobblerReadWriteXMLRPCInterface(CobblerXMLRPCInterface):
Saves a newly created or modified distro object to disk.
"""
self.log("save_distro",object_id=object_id,token=token)
- self.check_access(token,"save_distro")
obj = self.__get_object(object_id)
+ self.check_access(token,"save_distro",obj)
return self.api.distros().add(obj,save=True)
def save_profile(self,object_id,token):
@@ -786,8 +786,8 @@ class CobblerReadWriteXMLRPCInterface(CobblerXMLRPCInterface):
Saves a newly created or modified profile object to disk.
"""
self.log("save_profile",token=token,object_id=object_id)
- self.check_access(token,"save_profile")
obj = self.__get_object(object_id)
+ self.check_access(token,"save_profile",obj)
return self.api.profiles().add(obj,save=True)
def save_system(self,object_id,token):
@@ -795,8 +795,8 @@ class CobblerReadWriteXMLRPCInterface(CobblerXMLRPCInterface):
Saves a newly created or modified system object to disk.
"""
self.log("save_system",token=token,object_id=object_id)
- self.check_access(token,"save_system")
obj = self.__get_object(object_id)
+ self.check_access(token,"save_system",obj)
return self.api.systems().add(obj,save=True)
def save_repo(self,object_id,token=None):
@@ -804,8 +804,8 @@ class CobblerReadWriteXMLRPCInterface(CobblerXMLRPCInterface):
Saves a newly created or modified repo object to disk.
"""
self.log("save_repo",object_id=object_id,token=token)
- self.check_access(token,"save_repo")
obj = self.__get_object(object_id)
+ self.check_access(token,"save_repo",obj)
return self.api.repos().add(obj,save=True)
def copy_distro(self,object_id,newname,token=None):
diff --git a/tests/tests.py b/tests/tests.py
index 3fb4177..546c83f 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -2,16 +2,6 @@
#
# Michael DeHaan <mdehaan@redhat.com>
-TRY_GRAPH = False
-HAS_GRAPH = False
-
-if TRY_GRAPH:
- try:
- import pycallgraph_mod as pycallgraph
- HAS_GRAPH = True
- except:
- pass
-
import sys
import unittest
import os
@@ -25,6 +15,7 @@ from cobbler import settings
from cobbler import collection_distros
from cobbler import collection_profiles
from cobbler import collection_systems
+import cobbler.modules.authz_ownership as authz_module
from cobbler import api
@@ -54,11 +45,11 @@ class BootTest(unittest.TestCase):
except:
pass
- self.fk_initrd = os.path.join(self.topdir, FAKE_INITRD)
+ self.fk_initrd = os.path.join(self.topdir, FAKE_INITRD)
self.fk_initrd2 = os.path.join(self.topdir, FAKE_INITRD2)
self.fk_initrd3 = os.path.join(self.topdir, FAKE_INITRD3)
- self.fk_kernel = os.path.join(self.topdir, FAKE_KERNEL)
+ self.fk_kernel = os.path.join(self.topdir, FAKE_KERNEL)
self.fk_kernel2 = os.path.join(self.topdir, FAKE_KERNEL2)
self.fk_kernel3 = os.path.join(self.topdir, FAKE_KERNEL3)
@@ -75,9 +66,6 @@ class BootTest(unittest.TestCase):
shutil.rmtree(self.topdir,ignore_errors=True)
self.api = None
- if HAS_GRAPH:
- pycallgraph.save_dot("%s.dot" % self.__class__.__name__)
-
def make_basic_config(self):
distro = self.api.new_distro()
self.assertTrue(distro.set_name("testdistro0"))
@@ -116,25 +104,90 @@ class BootTest(unittest.TestCase):
class Ownership(BootTest):
def test_ownership_params(self):
- return # FIXME
- # NOTE: these tests are relaeively weak because they only test
- # that the options are usable, not that they work, since cobbler
- # as a command line tool ignores them, and cobblerd only cares
- # in certain modes.
+ # initially just test that we can set ownership on various components
+
distro = self.api.find_distro(name="testdistro0")
- profile = self.api.find_distro(name="testprofile0")
- system = self.api.find_distro(name="drwily.rdu.redhat.com")
+ profile = self.api.find_profile(name="testprofile0")
+ system = self.api.find_system(name="drwily.rdu.redhat.com")
repo = self.api.find_repo(name="test_repo")
- self.assertTrue(distro.set_owners("a,b"))
- self.assertTrue(profile.set_owners("c,d"))
- self.assertTrue(system.set_owners("e"))
- self.assertTrue(repo.set_owners("f,g"))
+ self.assertTrue(distro.set_owners("superlab,basement1"))
+ self.assertTrue(profile.set_owners("superlab,basement1"))
+ self.assertTrue(system.set_owners("superlab,basement1"))
+ self.assertTrue(repo.set_owners([]))
self.api.add_distro(distro)
self.api.add_profile(profile)
self.api.add_system(system)
self.api.add_repo(repo)
+ # now edit the groups file. We won't test the full XMLRPC
+ # auth stack here, but just the module in question
+
+ authorize = authz_module.authorize
+
+ # if the users.conf file exists, back it up for the tests
+ #if os.path.exists("/etc/cobbler/users.conf"):
+ # shutil.copyfile("/etc/cobbler/users.conf","/tmp/cobbler_ubak")
+
+ fd = open("/etc/cobbler/users.conf","w+")
+ fd.write("\n")
+ fd.write("[admins]\n")
+ fd.write("admin1 = 1\n")
+ fd.write("\n")
+ fd.write("[superlab]\n")
+ fd.write("superlab1 = 1\n")
+ fd.write("\n")
+ fd.write("[basement]\n")
+ fd.write("basement1 = 1\n")
+ fd.write("basement2 = 1\n")
+ fd.close()
+
+ xo = self.api.find_distro("testdistro0")
+ xn = "testdistro0"
+ ro = self.api.find_repo("testrepo0")
+ rn = "testrepo0"
+
+ # ensure admin1 can edit (he's an admin) and do other tasks
+ # same applies to basement1 who is explicitly added as a user
+ # and superlab1 who is in a group in the ownership list
+ for user in ["admin1","superlab1","basement1"]:
+ self.assertTrue(1==authorize(self.api, user, "save_distro", xo, debug=True),"%s can save_distro" % user)
+ self.assertTrue(1==authorize(self.api, user, "modify_distro", xo, debug=True),"%s can modify_distro" % user)
+ self.assertTrue(1==authorize(self.api, user, "copy_distro", xo, debug=True),"%s can copy_distro" % user)
+ self.assertTrue(1==authorize(self.api, user, "remove_distro", xn, debug=True),"%s can remove_distro" % user)
+
+ # ensure all users in the file can sync
+ for user in [ "admin1", "superlab1", "basement1", "basement2" ]:
+ self.assertTrue(1==authorize(self.api, user, "sync", debug=True))
+
+ # make sure basement2 can't edit (not in group)
+ # and same goes for "dne" (does not exist in users.conf)
+
+ for user in [ "basement2", "dne" ]:
+ self.assertTrue(0==authorize(self.api, user, "save_distro", xo, debug=True), "user %s cannot save_distro" % user)
+ self.assertTrue(0==authorize(self.api, user, "modify_distro", xo, debug=True), "user %s cannot modify_distro" % user)
+ self.assertTrue(0==authorize(self.api, user, "remove_distro", xn, debug=True), "user %s cannot remove_distro" % user)
+
+ # basement2 is in the file so he can still copy
+ self.assertTrue(1==authorize(self.api, "basement2", "copy_distro", xo, debug=True), "basement2 can copy_distro")
+
+ # dne can not copy or sync either (not in the users.conf)
+ self.assertTrue(0==authorize(self.api, "dne", "copy_distro", xo, debug=True), "dne cannot copy_distro")
+ self.assertTrue(0==authorize(self.api, "dne", "sync", debug=True), "dne cannot sync")
+
+ # unlike the distro testdistro0, testrepo0 is unowned
+ # so any user in the file will be able to edit it.
+ for user in [ "admin1", "superlab1", "basement1", "basement2" ]:
+ self.assertTrue(1==authorize(self.api, user, "save_repo", ro, debug=True), "user %s can save_repo" % user)
+
+ # though dne is still not listed and will be denied
+ self.assertTrue(0==authorize(self.api, "dne", "save_repo", ro, debug=True), "dne cannot save_repo")
+
+ # if we survive, restore the users file as module testing is done
+ #if os.path.exists("/tmp/cobbler_ubak"):
+ # shutil.copyfile("/etc/cobbler/users.conf","/tmp/cobbler_ubak")
+
+
class MultiNIC(BootTest):
def test_multi_nic_support(self):
@@ -612,14 +665,10 @@ if __name__ == "__main__":
if not os.path.exists("setup.py"):
print "tests: must invoke from top level directory"
sys.exit(1)
- if HAS_GRAPH:
- pycallgraph.start_trace()
loader = unittest.defaultTestLoader
test_module = __import__("tests") # self import considered harmful?
tests = loader.loadTestsFromModule(test_module)
runner = unittest.TextTestRunner()
runner.run(tests)
- if HAS_GRAPH:
- pycallgraph.make_graph('cg_dot.png', tool='dot')
sys.exit(0)