From 12f673d54bd7a1a5ecdc2f519ac85876bb22ecae Mon Sep 17 00:00:00 2001 From: Stephen Gallagher Date: Mon, 21 Sep 2009 06:46:29 -0400 Subject: Add new SSSDConfig python API Also adds unit tests for the SSSDConfig API --- server/config/SSSDConfig.py | 645 +++++++++++++ server/config/SSSDConfigTest.py | 1310 ++++++++++++++++++++++++++ server/config/etc/sssd.api.conf | 48 + server/config/etc/sssd.api.d/sssd-krb5.conf | 13 + server/config/etc/sssd.api.d/sssd-ldap.conf | 32 + server/config/etc/sssd.api.d/sssd-local.conf | 11 + server/config/testconfigs/noparse.api.conf | 7 + server/config/testconfigs/sssd-invalid.conf | 3 + server/config/testconfigs/sssd-valid.conf | 42 + 9 files changed, 2111 insertions(+) create mode 100644 server/config/SSSDConfig.py create mode 100644 server/config/SSSDConfigTest.py create mode 100644 server/config/etc/sssd.api.conf create mode 100644 server/config/etc/sssd.api.d/sssd-krb5.conf create mode 100644 server/config/etc/sssd.api.d/sssd-ldap.conf create mode 100644 server/config/etc/sssd.api.d/sssd-local.conf create mode 100644 server/config/testconfigs/noparse.api.conf create mode 100644 server/config/testconfigs/sssd-invalid.conf create mode 100644 server/config/testconfigs/sssd-valid.conf (limited to 'server') diff --git a/server/config/SSSDConfig.py b/server/config/SSSDConfig.py new file mode 100644 index 000000000..07e967bac --- /dev/null +++ b/server/config/SSSDConfig.py @@ -0,0 +1,645 @@ +''' +Created on Sep 18, 2009 + +@author: sgallagh +''' + +import os +import exceptions +from ConfigParser import RawConfigParser, NoSectionError + +# Exceptions +class SSSDConfigException(Exception): pass +class ParsingError(Exception): pass +class AlreadyInitializedError(SSSDConfigException): pass +class NotInitializedError(SSSDConfigException): pass +class NoOutputFileError(SSSDConfigException): pass +class NoServiceError(SSSDConfigException): pass +class NoSectionError(SSSDConfigException): pass +class NoOptionError(SSSDConfigException): pass +class ServiceNotRecognizedError(SSSDConfigException): pass +class ServiceAlreadyExists(SSSDConfigException): pass +class NoDomainError(SSSDConfigException): pass +class DomainNotRecognized(SSSDConfigException): pass +class NoSuchProviderError(SSSDConfigException): pass +class NoSuchProviderSubtypeError(SSSDConfigException): pass +class ProviderSubtypeInUse(SSSDConfigException): pass + +class SSSDConfigSchema(RawConfigParser): + def __init__(self, schemafile, schemaplugindir): + #TODO: get these from a global setting + if not schemafile: + schemafile = '/etc/sssd/sssd.api.conf' + if not schemaplugindir: + schemaplugindir = '/etc/sssd/sssd.api.d' + + RawConfigParser.__init__(self, None, dict) + try: + #Read the primary config file + fd = open(schemafile, 'r') + self.readfp(fd) + fd.close() + # Read in the provider files + for file in os.listdir(schemaplugindir): + fd = open(schemaplugindir+ "/" + file) + self.readfp(fd) + fd.close() + except IOError: + raise + except: + raise ParsingError + + # Set up lookup table for types + self.type_lookup = { + 'bool' : bool, + 'int' : int, + 'long' : long, + 'float': float, + 'str' : str, + 'list' : list, + 'None' : None + } + + # Lookup table for acceptable boolean values + self.bool_lookup = { + 'false' : False, + 'true' : True, + } + + def _striplist(self, l): + return([x.strip() for x in l]) + + def get_options(self, section): + if not self.has_section(section): + raise NoSectionError + options = self.options(section) + + # Parse values + parsed_options = {} + for option in options: + unparsed_option = self.get(section, option) + split_option = self._striplist(unparsed_option.split(',')) + optionlen = len(split_option) + + primarytype = self.type_lookup[split_option[0]] + subtype = self.type_lookup[split_option[1]] + + if optionlen == 2: + # This option has no defaults + parsed_options[option] = \ + (primarytype, + subtype, + None) + elif optionlen == 3: + if type(split_option[2]) == primarytype: + parsed_options[option] = \ + (primarytype, + subtype, + split_option[2]) + elif primarytype == list: + if (type(split_option[2]) == subtype): + parsed_options[option] = \ + (primarytype, + subtype, + [split_option[2]]) + else: + try: + parsed_options[option] = \ + (primarytype, + subtype, + [subtype(split_option[2])]) + except ValueError: + raise ParsingError + else: + try: + parsed_options[option] = \ + (primarytype, + subtype, + primarytype(split_option[2])) + except ValueError: + raise ParsingError + + elif optionlen > 3: + if (primarytype != list): + raise ParsingError + fixed_options = [] + for x in split_option[2:]: + if type(x) != subtype: + try: + fixed_options.extend([subtype(x)]) + except ValueError: + raise ParsingError + else: + fixed_options.extend([x]) + parsed_options[option] = \ + (primarytype, + subtype, + fixed_options) + else: + # Bad config file + raise ParsingError + + return parsed_options + + def get_option(self, section, option): + if not self.has_section(section): + raise NoSectionError(section) + if not self.has_option(section, option): + raise NoOptionError("Section [%s] has no option [%s]" % + (section, option)) + + return self.get_options(section)[option] + + def get_defaults(self, section): + if not self.has_section(section): + raise NoSectionError(section) + + schema_options = self.get_options(section) + defaults = dict([(x,schema_options[x][2]) + for x in schema_options.keys() + if schema_options[x][2] != None]) + + return defaults + + def get_services(self): + service_list = [x for x in self.sections() + if x != 'service' and + not x.startswith('domain') and + not x.startswith('provider')] + return service_list + + def get_providers(self): + providers = {} + for section in self._sections: + splitsection = section.split('/') + if (splitsection[0] == 'provider'): + if(len(splitsection) == 3): + if not providers.has_key(splitsection[1]): + providers[splitsection[1]] = [] + providers[splitsection[1]].extend([splitsection[2]]) + for key in providers.keys(): + providers[key] = tuple(providers[key]) + return providers + +class SSSDService: + ''' + classdocs + ''' + + def __init__(self, servicename, apischema): + if not isinstance(apischema, SSSDConfigSchema) or type(servicename) != str: + raise TypeError + + if not apischema.has_section(servicename): + raise ServiceNotRecognizedError(servicename) + + self.name = servicename + self.schema = apischema + + # Set up the service object with any known defaults + self.options = {} + + # Set up default options for all services + self.options.update(self.schema.get_defaults('service')) + + # Set up default options for this service + self.options.update(self.schema.get_defaults(self.name)) + + def get_name(self): + return self.name + + def list_options(self): + options = {} + + # Get the list of available options for all services + schema_options = self.schema.get_options('service') + options.update(schema_options) + + schema_options = self.schema.get_options(self.name) + options.update(schema_options) + + return options + + def _striplist(self, l): + return([x.strip() for x in l]) + + def set_option(self, optionname, value): + if self.schema.has_option(self.name, optionname): + option_schema = self.schema.get_option(self.name, optionname) + elif self.schema.has_option('service', optionname): + option_schema = self.schema.get_option('service', optionname) + else: + raise NoOptionError('Section [%s] has no option [%s]' % (self.name, optionname)) + + if value == None: + self.remove_option(optionname) + return + + # If we were expecting a list and didn't get one, + # Create a list with a single entry. If it's the + # wrong subtype, it will fail below + if option_schema[0] == list and type(value) != list: + if type(value) == str: + value = self._striplist(value.split(',')) + else: + value = [value] + + if type(value) != option_schema[0]: + # If it's possible to convert it, do so + try: + value = option_schema[0](value) + except ValueError: + raise TypeError('Expected %s for %s, received %s' % + (option_schema[0], optionname, type(value))) + + if type(value) == list: + # Iterate through the list an ensure that all members + # are of the appropriate subtype + try: + value = [option_schema[1](x) + for x in value] + except ValueError: + raise TypeError('Expected %s' % option_schema[1]) + + self.options[optionname] = value + + def get_option(self, optionname): + if optionname in self.options.keys(): + return self.options[optionname] + raise NoOptionError(optionname) + + def get_all_options(self): + return self.options + + def remove_option(self, optionname): + if self.options.has_key(optionname): + del self.options[optionname] + +class SSSDDomain: + def __init__(self, domainname, apischema): + if not isinstance(apischema, SSSDConfigSchema) or type(domainname) != str: + raise TypeError + + self.name = domainname + self.schema = apischema + self.active = False + self.oldname = None + self.providers = [] + + # Set up the domain object with any known defaults + self.options = {} + + # Set up default options for all domains + self.options.update(self.schema.get_defaults('provider')) + self.options.update(self.schema.get_defaults('domain')) + + def get_name(self): + return self.name + + def set_active(self, active): + self.active = bool(active) + + def list_options(self): + options = {} + # Get the list of available options for all domains + options.update(self.schema.get_options('provider')) + + options.update(self.schema.get_options('domain')) + + # Candidate for future optimization: will update primary type + # for each subtype + for (provider, providertype) in self.providers: + schema_options = self.schema.get_options('provider/%s' + % provider) + options.update(schema_options) + schema_options = self.schema.get_options('provider/%s/%s' + % (provider, providertype)) + options.update(schema_options) + return options + + def list_provider_options(self, provider, provider_type=None): + #TODO section checking + + options = self.schema.get_options('provider/%s' % provider) + if(provider_type): + options.update(self.schema.get_options('provider/%s/%s' % + (provider, provider_type))) + else: + # Add options from all provider subtypes + known_providers = self.list_providers() + for provider_type in known_providers[provider]: + options.update(self.list_provider_options(provider, + provider_type)) + return options + + def list_providers(self): + return self.schema.get_providers() + + def set_option(self, option, value): + options = self.list_options() + if (option not in options.keys()): + raise NoOptionError('Section [%s] has no option [%s]' % + (self.name, option)) + + if value == None: + self.remove_option(option) + return + + option_schema = options[option] + + # If we were expecting a list and didn't get one, + # Create a list with a single entry. If it's the + # wrong subtype, it will fail below + if option_schema[0] == list and type(value) != list: + if type(value) == str: + value = self._striplist(value.split(',')) + else: + value = [value] + + if type(value) != option_schema[0]: + # If it's possible to convert it, do so + try: + value = option_schema[0](value) + except ValueError: + raise TypeError('Expected %s for %s, received %s' % + (option_schema[0], option, type(value))) + + if type(value) == list: + # Iterate through the list an ensure that all members + # are of the appropriate subtype + try: + value = [option_schema[1](x) + for x in value] + except ValueError: + raise TypeError('Expected %s' % option_schema[1]) + + # Check whether we're adding a provider entry. + # This requires special handling + is_provider = option.rfind('_provider') + if (is_provider > 0): + provider = option[:is_provider] + self.add_provider(value, provider) + else: + self.options[option] = value + + def get_option(self, optionname): + if optionname in self.options.keys(): + return self.options[optionname] + raise NoOptionError(optionname) + + def get_all_options(self): + return self.options + + def remove_option(self, optionname): + if optionname in self.options.keys(): + del self.options[optionname] + + def add_provider(self, provider, provider_type): + # Check that provider and provider_type are valid + configured_providers = self.list_providers() + if provider in configured_providers.keys(): + if provider_type not in configured_providers[provider]: + raise NoSuchProviderSubtypeError(provider_type) + else: + raise NoSuchProviderError + + # Don't add a provider twice + with_this_type = [x for x in self.providers if x[1] == provider_type] + if len(with_this_type) > 1: + # This should never happen! + raise ProviderSubtypeInUser + if len(with_this_type) == 1: + if with_this_type[0][0] != provider: + raise ProviderSubtypeInUse(with_this_type[0][0]) + else: + self.providers.extend([(provider, provider_type)]) + + option_name = '%s_provider' % provider_type + self.options[option_name] = provider + + # Add defaults for this provider + self.options.update(self.schema.get_defaults('provider/%s' % + provider)) + self.options.update(self.schema.get_defaults('provider/%s/%s' % + (provider, + provider_type))) + + + def remove_provider(self, provider, provider_type): + if (provider,provider_type) not in self.providers: + return + + # TODO: safely remove any unused options when removing + # the provider. This will require modifying the schema + # to account for multiple providers making use of the + # same options (such ask krb5_realm) + + self.providers.remove((provider,provider_type)) + +class SSSDConfig(RawConfigParser): + def __init__(self, schemafile=None, schemaplugindir=None): + RawConfigParser.__init__(self, None, dict) + self.schema = SSSDConfigSchema(schemafile, schemaplugindir) + self.configfile = None + self.initialized = False + + def import_config(self,configfile=None): + if self.initialized: + raise AlreadyInitializedError + + if not configfile: + #TODO: get this from a global setting + configfile = '/etc/sssd/sssd.conf' + # open will raise an IOError if it fails + fd = open(configfile, 'r') + + try: + self.readfp(fd) + except: + raise ParsingError + + fd.close() + self.configfile = configfile + self.initialized = True + + def new_config(self): + if self.initialized: + raise AlreadyInitializedError + + self.initialized = True + + #Initialize all services + for servicename in self.schema.get_services(): + service = self.new_service(servicename) + + def write(self, outputfile=None): + if not self.initialized: + raise NotInitializedError + + if outputfile == None: + if(self.configfile == None): + raise NoOutputFileError + + outputfile = self.configfile + + # open() will raise IOError if it fails + of = open(outputfile, 'w') + RawConfigParser.write(self, of) + of.close() + + def list_services(self): + if not self.initialized: + raise NotInitializedError + + service_list = [x for x in self.sections() + if not x.startswith('domain')] + return service_list + + def get_service(self, name): + if not self.initialized: + raise NotInitializedError + if not self.has_section(name): + raise NoServiceError + + service = SSSDService(name, self.schema) + [service.set_option(option, value) + for (option,value) in self.items(name)] + + return service + + def new_service(self, name): + if not self.initialized: + raise NotInitializedError + if (self.has_section(name)): + raise ServiceAlreadyExists(name) + + service = SSSDService(name, self.schema) + self.save_service(service) + return service + + def delete_service(self, name): + if not self.initialized: + raise NotInitializedError + self.remove_section(name) + + def save_service(self, service): + if not self.initialized: + raise NotInitializedError + if not isinstance(service, SSSDService): + raise TypeError + + name = service.get_name() + # Ensure that the existing section is removed + # This way we ensure that we are getting a + # complete copy of the service. + # remove_section() is a noop if the section + # does not exist. + self.remove_section(name) + self.add_section(name) + option_dict = service.get_all_options() + for option in option_dict.keys(): + value = option_dict[option] + if (type(value) == list): + value = ', '.join(value) + + self.set(name, option, value) + + def _striplist(self, l): + return([x.strip() for x in l]) + + def list_active_domains(self): + if not self.initialized: + raise NotInitializedError + + if (self.has_option('sssd', 'domains')): + active_domains = self._striplist(self.get('sssd', 'domains').split(',')) + else: + active_domains = [] + + domains = [x for x in self.list_domains() + if x in active_domains] + return domains + + def list_inactive_domains(self): + if not self.initialized: + raise NotInitializedError + + if (self.has_option('sssd', 'domains')): + active_domains = self._striplist(self.get('sssd', 'domains').split(',')) + else: + active_domains = [] + + domains = [x for x in self.list_domains() + if x not in active_domains] + return domains + + def list_domains(self): + if not self.initialized: + raise NotInitializedError + domains = [x[7:] for x in self.sections() if x.startswith('domain/')] + return domains + + def get_domain(self, name): + if not self.initialized: + raise NotInitializedError + if not self.has_section('domain/%s' % name): + raise NoDomainError(name) + + domain = SSSDDomain(name, self.schema) + + # Read in the providers first or we may have type + # errors trying to read in their options + providers = [x for x in self.items('domain/%s' % name) + if x[0].rfind('_provider') > 0] + [domain.set_option(option, value) + for (option, value) in providers] + + [domain.set_option(option, value) + for (option,value) in self.items('domain/%s' % name) + if (option,value) not in providers] + + return domain + + def new_domain(self, name): + if not self.initialized: + raise NotInitializedError + if self.has_section('domain/%s' % name): + raise DomainAlreadyExistsError + + domain = SSSDDomain(name, self.schema) + self.save_domain(domain); + return domain + + def delete_domain(self, name): + if not self.initialized: + raise NotInitializedError + self.remove_section('domain/%s' % name) + + def save_domain(self, domain): + if not self.initialized: + raise NotInitializedError + if not isinstance(domain, SSSDDomain): + raise TypeError + + name = domain.get_name() + sectionname = 'domain/%s' % name + # Ensure that the existing section is removed + # This way we ensure that we are getting a + # complete copy of the service. + # remove_section() is a noop if the section + # does not exist. + self.remove_section(sectionname) + self.add_section(sectionname) + option_dict = domain.get_all_options() + [self.set(sectionname, option, option_dict[option]) + for option in option_dict.keys()] + + if domain.active: + if domain.get_name not in self.list_active_domains(): + # Add it to the list of active domains + if (self.has_option('sssd','domains')): + active_domains = self.get('sssd', 'domains') + active_domains += ", %s" % domain.get_name() + else: + active_domains = domain.get_name() + self.set('sssd', 'domains', active_domains) diff --git a/server/config/SSSDConfigTest.py b/server/config/SSSDConfigTest.py new file mode 100644 index 000000000..b597f7601 --- /dev/null +++ b/server/config/SSSDConfigTest.py @@ -0,0 +1,1310 @@ +''' +Created on Sep 18, 2009 + +@author: sgallagh +''' +import unittest + +import SSSDConfig + +class SSSDConfigTestValid(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def testServices(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + sssdconfig.import_config("testconfigs/sssd-valid.conf") + + # Validate services + services = sssdconfig.list_services() + self.assertTrue('sssd' in services) + self.assertTrue('nss' in services) + self.assertTrue('pam' in services) + self.assertTrue('dp' in services) + + #Verify service attributes + sssd_service = sssdconfig.get_service('sssd') + service_opts = sssd_service.list_options() + + self.assertTrue('config_file_version' in service_opts.keys()) + self.assertEquals(sssd_service.get_option('config_file_version'), 2) + + self.assertTrue('services' in service_opts.keys()) + service_list = sssd_service.get_option('services') + self.assertTrue('nss' in service_list) + self.assertTrue('pam' in service_list) + + self.assertTrue('domains' in service_opts) + + self.assertTrue('reconnection_retries' in service_opts) + + del sssdconfig + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + sssdconfig.new_config() + sssdconfig.delete_service('sssd') + new_sssd_service = sssdconfig.new_service('sssd'); + new_options = new_sssd_service.list_options(); + + self.assertTrue('debug_level' in new_options) + self.assertEquals(new_options['debug_level'][0], int) + + self.assertTrue('command' in new_options) + self.assertEquals(new_options['command'][0], str) + + self.assertTrue('reconnection_retries' in new_options) + self.assertEquals(new_options['reconnection_retries'][0], int) + + self.assertTrue('config_file_version' in new_options) + self.assertEquals(new_options['config_file_version'][0], int) + + self.assertTrue('services' in new_options) + self.assertEquals(new_options['debug_level'][0], int) + + self.assertTrue('domains' in new_options) + self.assertEquals(new_options['domains'][0], list) + self.assertEquals(new_options['domains'][1], str) + + self.assertTrue('sbus_timeout' in new_options) + self.assertEquals(new_options['sbus_timeout'][0], int) + + self.assertTrue('re_expression' in new_options) + self.assertEquals(new_options['re_expression'][0], str) + + self.assertTrue('full_name_format' in new_options) + self.assertEquals(new_options['full_name_format'][0], str) + + del sssdconfig + pass + + def testDomains(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + sssdconfig.import_config("testconfigs/sssd-valid.conf") + + #Validate domain list + domains = sssdconfig.list_domains() + self.assertTrue('LOCAL' in domains) + self.assertTrue('LDAP' in domains) + self.assertTrue('PROXY' in domains) + self.assertTrue('IPA' in domains) + + #Verify domain attributes + ipa_domain = sssdconfig.get_domain('IPA') + domain_opts = ipa_domain.list_options() + self.assertTrue('debug_level' in domain_opts.keys()) + self.assertTrue('id_provider' in domain_opts.keys()) + self.assertTrue('auth_provider' in domain_opts.keys()) + + del sssdconfig + pass + + def testListProviders(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + sssdconfig.new_config() + junk_domain = sssdconfig.new_domain('junk') + providers = junk_domain.list_providers() + self.assertTrue('ldap' in providers.keys()) + + def testCreateNewLocalConfig(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + sssdconfig.new_config() + + local_domain = sssdconfig.new_domain('LOCAL') + local_domain.add_provider('local', 'id') + local_domain.set_option('debug_level', 1) + local_domain.set_option('default_shell', '/bin/tcsh') + local_domain.set_active(True) + sssdconfig.save_domain(local_domain) + + sssdconfig.write('/tmp/testCreateNewLocalConfig.conf') + + def testCreateNewLDAPConfig(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + sssdconfig.new_config() + + ldap_domain = sssdconfig.new_domain('LDAP') + ldap_domain.add_provider('ldap', 'id') + ldap_domain.set_option('debug_level', 1) + ldap_domain.set_active(True) + sssdconfig.save_domain(ldap_domain) + + sssdconfig.write('/tmp/testCreateNewLDAPConfig.conf') + + def testModifyExistingConfig(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + sssdconfig.import_config("testconfigs/sssd-valid.conf") + + ldap_domain = sssdconfig.get_domain('LDAP') + ldap_domain.set_option('debug_level', 3) + + ldap_domain.remove_provider('ldap', 'auth') + ldap_domain.add_provider('krb5', 'auth') + ldap_domain.set_active(True) + sssdconfig.save_domain(ldap_domain) + + sssdconfig.write('/tmp/testModifyExistingConfig.conf') + +class SSSDConfigTestSSSDService(unittest.TestCase): + def setUp(self): + self.schema = SSSDConfig.SSSDConfigSchema("etc/sssd.api.conf", + "etc/sssd.api.d") + pass + + def tearDown(self): + pass + + def testInit(self): + # Positive test + service = SSSDConfig.SSSDService('sssd', self.schema) + + # Type Error test + # Name is not a string + try: + service = SSSDConfig.SSSDService(3, self.schema) + except TypeError: + pass + else: + self.fail("Expected TypeError exception") + + # TypeError test + # schema is not an SSSDSchema + try: + service = SSSDConfig.SSSDService('3', self) + except TypeError: + pass + else: + self.fail("Expected TypeError exception") + + # ServiceNotRecognizedError test + try: + service = SSSDConfig.SSSDService('ssd', self.schema) + except SSSDConfig.ServiceNotRecognizedError: + pass + else: + self.fail("Expected ServiceNotRecognizedError") + + + def testListOptions(self): + service = SSSDConfig.SSSDService('sssd', self.schema) + + options = service.list_options() + control_list = [ + 'config_file_version', + 'services', + 'domains', + 'sbus_timeout', + 're_expression', + 'full_name_format', + 'debug_level', + 'command', + 'reconnection_retries'] + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in control_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in control_list, + 'Option [%s] unexpectedly found' % + option) + + self.assertTrue(type(options['config_file_version']) == tuple, + "Option values should be a tuple") + + self.assertTrue(options['config_file_version'][0] == int, + "config_file_version should require an int. " + + "list_options is requiring a %s" % + options['config_file_version'][0]) + + self.assertTrue(options['config_file_version'][1] == None, + "config_file_version should not require a subtype. " + + "list_options is requiring a %s" % + options['config_file_version'][1]) + + self.assertTrue(options['config_file_version'][0] == int, + "config_file_version should default to 2. " + + "list_options specifies %d" % + options['config_file_version'][2]) + + self.assertTrue(type(options['services']) == tuple, + "Option values should be a tuple") + + self.assertTrue(options['services'][0] == list, + "services should require an list. " + + "list_options is requiring a %s" % + options['services'][0]) + + self.assertTrue(options['services'][1] == str, + "services should require a subtype of str. " + + "list_options is requiring a %s" % + options['services'][1]) + + def testSetOption(self): + service = SSSDConfig.SSSDService('sssd', self.schema) + + # Positive test - Exactly right + service.set_option('debug_level', 2) + self.assertEqual(service.get_option('debug_level'), 2) + + # Positive test - Allow converting "safe" values + service.set_option('debug_level', '2') + self.assertEqual(service.get_option('debug_level'), 2) + + # Positive test - Remove option if value is None + service.set_option('debug_level', None) + self.assertTrue('debug_level' not in service.options.keys()) + + # Negative test - Nonexistent Option + try: + service.set_option('nosuchoption', 1) + except SSSDConfig.NoOptionError: + pass + else: + self.fail("Expected NoOptionError") + + # Negative test - Incorrect type + try: + service.set_option('debug_level', 'two') + except TypeError: + pass + else: + self.fail("Expected TypeError") + + def testGetOption(self): + service = SSSDConfig.SSSDService('sssd', self.schema) + + # Positive test - Single-valued + self.assertEqual(service.get_option('config_file_version'), 2) + + # Positive test - List of values + self.assertEqual(service.get_option('services'), ['nss', 'pam']) + + # Negative Test - Bad Option + try: + service.get_option('nosuchoption') + except SSSDConfig.NoOptionError: + pass + else: + self.fail("Expected NoOptionError") + + def testGetAllOptions(self): + service = SSSDConfig.SSSDService('sssd', self.schema) + + #Positive test + options = service.get_all_options() + control_list = [ + 'config_file_version', + 'services', + 'sbus_timeout', + 're_expression', + 'full_name_format', + 'debug_level', + 'reconnection_retries'] + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in control_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in control_list, + 'Option [%s] unexpectedly found' % + option) + + def testRemoveOption(self): + service = SSSDConfig.SSSDService('sssd', self.schema) + + # Positive test - Remove an option that exists + self.assertEqual(service.get_option('debug_level'), 0) + service.remove_option('debug_level') + try: + service.get_option('debug_level') + except SSSDConfig.NoOptionError: + pass + else: + self.fail("debug_level should have been removed") + + # Positive test - Remove an option that doesn't exist + try: + service.get_option('nosuchentry') + except SSSDConfig.NoOptionError: + pass + else: + self.fail("nosuchentry should not exist") + + service.remove_option('nosuchentry') + +class SSSDConfigTestSSSDDomain(unittest.TestCase): + def setUp(self): + self.schema = SSSDConfig.SSSDConfigSchema("etc/sssd.api.conf", + "etc/sssd.api.d") + pass + + def tearDown(self): + pass + + def testInit(self): + # Positive Test + domain = SSSDConfig.SSSDDomain('mydomain', self.schema) + + # Negative Test - Name not a string + try: + domain = SSSDConfig.SSSDDomain(2, self.schema) + except TypeError: + pass + else: + self.fail("Expected TypeError") + + # Negative Test - Schema is not an SSSDSchema + try: + domain = SSSDConfig.SSSDDomain('mydomain', self) + except TypeError: + pass + else: + self.fail("Expected TypeError") + + def testGetName(self): + # Positive Test + domain = SSSDConfig.SSSDDomain('mydomain', self.schema) + + self.assertEqual(domain.get_name(), 'mydomain') + + def testSetActive(self): + #Positive Test + domain = SSSDConfig.SSSDDomain('mydomain', self.schema) + + # Should default to inactive + self.assertFalse(domain.active) + domain.set_active(True) + self.assertTrue(domain.active) + domain.set_active(False) + self.assertFalse(domain.active) + + def testListOptions(self): + domain = SSSDConfig.SSSDDomain('sssd', self.schema) + + # First test default options + options = domain.list_options() + control_list = [ + 'debug_level', + 'min_id', + 'max_id', + 'timeout', + 'magic_private_groups', + 'enumerate', + 'cache_credentials', + 'use_fully_qualified_names', + 'id_provider', + 'auth_provider', + 'access_provider', + 'chpass_provider'] + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in control_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in control_list, + 'Option [%s] unexpectedly found' % + option) + + self.assertTrue(type(options['max_id']) == tuple, + "Option values should be a tuple") + + self.assertTrue(options['max_id'][0] == int, + "config_file_version should require an int. " + + "list_options is requiring a %s" % + options['max_id'][0]) + + self.assertTrue(options['max_id'][1] == None, + "config_file_version should not require a subtype. " + + "list_options is requiring a %s" % + options['max_id'][1]) + + # Add a provider and verify that the new options appear + domain.add_provider('local', 'id') + control_list.extend( + ['default_shell', + 'base_directory']) + + options = domain.list_options() + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in control_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in control_list, + 'Option [%s] unexpectedly found' % + option) + + # Add a provider that has global options and verify that + # The new options appear. + domain.add_provider('krb5', 'auth') + + backup_list = control_list[:] + control_list.extend( + ['krb5_kdcip', + 'krb5_realm', + 'krb5_ccachedir', + 'krb5_ccname_template', + 'krb5_auth_timeout']) + + options = domain.list_options() + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in control_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in control_list, + 'Option [%s] unexpectedly found' % + option) + + # Remove the auth domain and verify that the options + # revert to the backup_list + domain.remove_provider('krb5', 'auth') + options = domain.list_options() + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in backup_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in backup_list, + 'Option [%s] unexpectedly found' % + option) + + def testListProviders(self): + domain = SSSDConfig.SSSDDomain('sssd', self.schema) + + control_provider_dict = { + 'krb5': ('auth', 'access', 'chpass'), + 'local': ('auth', 'chpass', 'access', 'id'), + 'ldap': ('id', 'auth')} + + providers = domain.list_providers() + + self.assertEqual(providers, control_provider_dict) + + def testListProviderOptions(self): + domain = SSSDConfig.SSSDDomain('sssd', self.schema) + + # Test looking up a specific provider type + options = domain.list_provider_options('krb5', 'auth') + control_list = [ + 'krb5_kdcip', + 'krb5_realm', + 'krb5_ccachedir', + 'krb5_ccname_template', + 'krb5_auth_timeout'] + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in control_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in control_list, + 'Option [%s] unexpectedly found' % + option) + + #Test looking up all provider values + options = domain.list_provider_options('krb5') + control_list.extend(['krb5_changepw_principal']) + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in control_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in control_list, + 'Option [%s] unexpectedly found' % + option) + + def testAddProvider(self): + domain = SSSDConfig.SSSDDomain('sssd', self.schema) + + # Positive Test + domain.add_provider('local', 'id') + + # Negative Test - No such backend type + try: + domain.add_provider('nosuchbackend', 'auth') + except SSSDConfig.NoSuchProviderError: + pass + else: + self.fail("Expected NoSuchProviderError") + + # Negative Test - No such backend subtype + try: + domain.add_provider('ldap', 'nosuchsubtype') + except SSSDConfig.NoSuchProviderSubtypeError: + pass + else: + self.fail("Expected NoSuchProviderSubtypeError") + + # Negative Test - Try to add a second provider of the same type + try: + domain.add_provider('ldap', 'id') + except SSSDConfig.ProviderSubtypeInUse: + pass + else: + self.fail("Expected ProviderSubtypeInUse") + + def testRemoveProvider(self): + domain = SSSDConfig.SSSDDomain('sssd', self.schema) + + # First test default options + options = domain.list_options() + control_list = [ + 'debug_level', + 'min_id', + 'max_id', + 'timeout', + 'magic_private_groups', + 'enumerate', + 'cache_credentials', + 'use_fully_qualified_names', + 'id_provider', + 'auth_provider', + 'access_provider', + 'chpass_provider'] + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in control_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in control_list, + 'Option [%s] unexpectedly found' % + option) + + self.assertTrue(type(options['max_id']) == tuple, + "Option values should be a tuple") + + self.assertTrue(options['max_id'][0] == int, + "config_file_version should require an int. " + + "list_options is requiring a %s" % + options['max_id'][0]) + + self.assertTrue(options['max_id'][1] == None, + "config_file_version should not require a subtype. " + + "list_options is requiring a %s" % + options['max_id'][1]) + + # Add a provider and verify that the new options appear + domain.add_provider('local', 'id') + control_list.extend( + ['default_shell', + 'base_directory']) + + options = domain.list_options() + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in control_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in control_list, + 'Option [%s] unexpectedly found' % + option) + + # Add a provider that has global options and verify that + # The new options appear. + domain.add_provider('krb5', 'auth') + + backup_list = control_list[:] + control_list.extend( + ['krb5_kdcip', + 'krb5_realm', + 'krb5_ccachedir', + 'krb5_ccname_template', + 'krb5_auth_timeout']) + + options = domain.list_options() + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in control_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in control_list, + 'Option [%s] unexpectedly found' % + option) + + # Remove the auth domain and verify that the options + # revert to the backup_list + domain.remove_provider('krb5', 'auth') + options = domain.list_options() + + self.assertTrue(type(options) == dict, + "Options should be a dictionary") + + # Ensure that all of the expected defaults are there + for option in backup_list: + self.assertTrue(option in options.keys(), + "Option [%s] missing" % + option) + + # Ensure that there aren't any unexpected options listed + for option in options.keys(): + self.assertTrue(option in backup_list, + 'Option [%s] unexpectedly found' % + option) + + # Test removing nonexistent provider - Real + domain.remove_provider('ldap', 'id') + + # Test removing nonexistent provider - Bad backend type + # Should pass without complaint + domain.remove_provider('nosuchbackend', 'id') + + # Test removing nonexistent provider - Bad provider type + # Should pass without complaint + domain.remove_provider('ldap', 'nosuchprovider') + + def testGetOption(self): + domain = SSSDConfig.SSSDDomain('sssd', self.schema) + + # Positive Test - Ensure that we can get a valid option + self.assertEqual(domain.get_option('debug_level'), 0) + + # Negative Test - Try to get valid option that is not set + try: + domain.get_option('max_id') + except SSSDConfig.NoOptionError: + pass + else: + self.fail("Expected NoOptionError") + + # Positive Test - Set the above option and get it + domain.set_option('max_id', 10000) + self.assertEqual(domain.get_option('max_id'), 10000) + + # Negative Test - Try yo get invalid option + try: + domain.get_option('nosuchoption') + except SSSDConfig.NoOptionError: + pass + else: + self.fail("Expected NoOptionError") + + def testSetOption(self): + domain = SSSDConfig.SSSDDomain('sssd', self.schema) + + # Positive Test + domain.set_option('max_id', 10000) + self.assertEqual(domain.get_option('max_id'), 10000) + + # Positive Test - Remove option if value is None + domain.set_option('max_id', None) + self.assertTrue('max_id' not in domain.get_all_options().keys()) + + # Negative Test - invalid option + try: + domain.set_option('nosuchoption', 1) + except SSSDConfig.NoOptionError: + pass + else: + self.fail("Expected NoOptionError") + + # Negative Test - incorrect type + try: + domain.set_option('max_id', 'a string') + except TypeError: + pass + else: + self.fail("Expected TypeError") + + # Positive Test - Coax options to appropriate type + domain.set_option('max_id', '10000') + self.assertEqual(domain.get_option('max_id'), 10000) + + domain.set_option('max_id', 30.2) + self.assertEqual(domain.get_option('max_id'), 30) + + def testRemoveOption(self): + domain = SSSDConfig.SSSDDomain('sssd', self.schema) + + # Positive test - Remove existing option + self.assertTrue('min_id' in domain.get_all_options().keys()) + domain.remove_option('min_id') + self.assertFalse('min_id' in domain.get_all_options().keys()) + + # Positive test - Remove unset but valid option + self.assertFalse('max_id' in domain.get_all_options().keys()) + domain.remove_option('max_id') + self.assertFalse('max_id' in domain.get_all_options().keys()) + + # Positive test - Remove unset and unknown option + self.assertFalse('nosuchoption' in domain.get_all_options().keys()) + domain.remove_option('nosuchoption') + self.assertFalse('nosuchoption' in domain.get_all_options().keys()) + +class SSSDConfigTestSSSDConfig(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def testInit(self): + # Positive test + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - No Such File + try: + sssdconfig = SSSDConfig.SSSDConfig("nosuchfile.api.conf", + "etc/sssd.api.d") + except IOError: + pass + else: + self.fail("Expected IOError") + + # Negative Test - Schema is not parsable + try: + sssdconfig = SSSDConfig.SSSDConfig("testconfigs/noparse.api.conf", + "etc/sssd.api.d") + except SSSDConfig.ParsingError: + pass + else: + self.fail("Expected ParsingError") + + def testImportConfig(self): + # Positive Test + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + sssdconfig.import_config("testconfigs/sssd-valid.conf") + + # Verify that all sections were imported + control_list = [ + 'sssd', + 'nss', + 'pam', + 'dp', + 'domain/PROXY', + 'domain/IPA', + 'domain/LOCAL', + 'domain/LDAP', + ] + + for section in control_list: + self.assertTrue(sssdconfig.has_section(section), + "Section [%s] missing" % + section) + for section in sssdconfig.sections(): + self.assertTrue(section in control_list) + + # Verify that all options were imported for a section + control_list = [ + 'services', + 'reconnection_retries', + 'domains', + 'config_file_version'] + + for option in control_list: + self.assertTrue(sssdconfig.has_option('sssd', option), + "Option [%s] missing from [sssd]" % + option) + for option in sssdconfig.options('sssd'): + self.assertTrue(option in control_list, + "Option [%s] unexpectedly found" % + option) + + #TODO: Check the types and values of the settings + + # Negative Test - Missing config file + try: + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + sssdconfig.import_config("nosuchfile.conf") + except IOError: + pass + else: + self.fail("Expected IOError") + + # Negative Test - Invalid config file + try: + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + sssdconfig.import_config("testconfigs/sssd-invalid.conf") + except SSSDConfig.ParsingError: + pass + else: + self.fail("Expected ParsingError") + + # Negative Test - Already initialized + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + sssdconfig.import_config("testconfigs/sssd-valid.conf") + try: + sssdconfig.import_config("testconfigs/sssd-valid.conf") + except SSSDConfig.AlreadyInitializedError: + pass + else: + self.fail("Expected AlreadyInitializedError") + + def testNewConfig(self): + # Positive Test + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + sssdconfig.new_config() + + # Check that the defaults were set + control_list = [ + 'sssd', + 'nss', + 'pam'] + for section in control_list: + self.assertTrue(sssdconfig.has_section(section), + "Section [%s] missing" % + section) + for section in sssdconfig.sections(): + self.assertTrue(section in control_list) + + control_list = [ + 'config_file_version', + 'services', + 'sbus_timeout', + 're_expression', + 'full_name_format', + 'debug_level', + 'reconnection_retries'] + for option in control_list: + self.assertTrue(sssdconfig.has_option('sssd', option), + "Option [%s] missing from [sssd]" % + option) + for option in sssdconfig.options('sssd'): + self.assertTrue(option in control_list, + "Option [%s] unexpectedly found" % + option) + + # Negative Test - Already Initialized + try: + sssdconfig.new_config() + except SSSDConfig.AlreadyInitializedError: + pass + else: + self.fail("Expected AlreadyInitializedError") + + def testWrite(self): + #TODO Write tests to compare output files + pass + + def testListServices(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - sssdconfig not initialized + try: + sssdconfig.list_services() + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + sssdconfig.new_config() + + control_list = [ + 'sssd', + 'pam', + 'nss'] + service_list = sssdconfig.list_services() + for service in control_list: + self.assertTrue(service in service_list, + "Service [%s] missing" % + service) + for service in service_list: + self.assertTrue(service in control_list, + "Service [%s] unexpectedly found" % + service) + + def testGetService(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - Not initialized + try: + service = sssdconfig.get_service('sssd') + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + sssdconfig.new_config() + + service = sssdconfig.get_service('sssd') + self.assertTrue(isinstance(service, SSSDConfig.SSSDService)) + + # TODO verify the contents of this service + + # Negative Test - No such service + try: + service = sssdconfig.get_service('nosuchservice') + except SSSDConfig.NoServiceError: + pass + else: + self.fail("Expected NoServiceError") + + def testNewService(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - Not initialized + try: + service = sssdconfig.new_service('sssd') + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + sssdconfig.new_config() + + # Positive Test + # First need to remove the existing service + sssdconfig.delete_service('sssd') + service = sssdconfig.new_service('sssd') + self.failUnless(service.get_name() in sssdconfig.list_services()) + + # TODO: check that the values of this new service + # are set to the defaults from the schema + + def testDeleteService(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - Not initialized + try: + service = sssdconfig.delete_service('sssd') + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + sssdconfig.new_config() + + # Positive Test + service = sssdconfig.delete_service('sssd') + + def testSaveService(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + new_service = SSSDConfig.SSSDService('sssd', sssdconfig.schema) + + # Negative Test - Not initialized + try: + service = sssdconfig.save_service(new_service) + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + # Positive Test + sssdconfig.new_config() + sssdconfig.save_service(new_service) + + # TODO: check that all entries were saved correctly (change a few) + + # Negative Test - Type Error + try: + sssdconfig.save_service(self) + except TypeError: + pass + else: + self.fail("Expected TypeError") + + def testListActiveDomains(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - Not Initialized + try: + sssdconfig.list_active_domains() + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + # Positive Test + sssdconfig.import_config('testconfigs/sssd-valid.conf') + + control_list = [ + 'IPA', + 'LOCAL'] + active_domains = sssdconfig.list_active_domains() + + for domain in control_list: + self.assertTrue(domain in active_domains, + "Domain [%s] missing" % + domain) + for domain in active_domains: + self.assertTrue(domain in control_list, + "Domain [%s] unexpectedly found" % + domain) + + def testListInactiveDomains(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - Not Initialized + try: + sssdconfig.list_inactive_domains() + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + # Positive Test + sssdconfig.import_config('testconfigs/sssd-valid.conf') + + control_list = [ + 'PROXY', + 'LDAP'] + inactive_domains = sssdconfig.list_inactive_domains() + + for domain in control_list: + self.assertTrue(domain in inactive_domains, + "Domain [%s] missing" % + domain) + for domain in inactive_domains: + self.assertTrue(domain in control_list, + "Domain [%s] unexpectedly found" % + domain) + + def testListDomains(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - Not Initialized + try: + sssdconfig.list_domains() + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + # Positive Test + sssdconfig.import_config('testconfigs/sssd-valid.conf') + + control_list = [ + 'IPA', + 'LOCAL', + 'PROXY', + 'LDAP'] + domains = sssdconfig.list_domains() + + for domain in control_list: + self.assertTrue(domain in domains, + "Domain [%s] missing" % + domain) + for domain in domains: + self.assertTrue(domain in control_list, + "Domain [%s] unexpectedly found" % + domain) + + def testGetDomain(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - Not initialized + try: + domain = sssdconfig.get_domain('sssd') + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + sssdconfig.import_config('testconfigs/sssd-valid.conf') + + domain = sssdconfig.get_domain('IPA') + self.assertTrue(isinstance(domain, SSSDConfig.SSSDDomain)) + + # TODO verify the contents of this domain + + # Negative Test - No such domain + try: + domain = sssdconfig.get_domain('nosuchdomain') + except SSSDConfig.NoDomainError: + pass + else: + self.fail("Expected NoDomainError") + + def testNewDomain(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - Not initialized + try: + domain = sssdconfig.new_domain('example.com') + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + sssdconfig.new_config() + + # Positive Test + domain = sssdconfig.new_domain('example.com') + self.assertTrue(isinstance(domain, SSSDConfig.SSSDDomain)) + self.failUnless(domain.get_name() in sssdconfig.list_domains()) + self.failUnless(domain.get_name() in sssdconfig.list_inactive_domains()) + + # TODO: check that the values of this new domain + # are set to the defaults from the schema + + def testDeleteDomain(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + + # Negative Test - Not initialized + try: + sssdconfig.delete_domain('IPA') + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + # Positive Test + sssdconfig.import_config('testconfigs/sssd-valid.conf') + + self.assertTrue('IPA' in sssdconfig.list_domains()) + self.assertTrue('IPA' in sssdconfig.list_active_domains()) + sssdconfig.delete_domain('IPA') + self.assertFalse('IPA' in sssdconfig.list_domains()) + self.assertFalse('IPA' in sssdconfig.list_active_domains()) + + def testSaveDomain(self): + sssdconfig = SSSDConfig.SSSDConfig("etc/sssd.api.conf", + "etc/sssd.api.d") + # Negative Test - Not initialized + try: + sssdconfig.delete_domain('IPA') + except SSSDConfig.NotInitializedError: + pass + else: + self.fail("Expected NotInitializedError") + + # Positive Test + sssdconfig.new_config() + domain = sssdconfig.new_domain('example.com') + domain.add_provider('ldap', 'id') + domain.set_option('ldap_uri', 'ldap://ldap.example.com') + domain.set_active(True) + sssdconfig.save_domain(domain) + + self.assertTrue('example.com' in sssdconfig.list_domains()) + self.assertTrue('example.com' in sssdconfig.list_active_domains()) + self.assertEqual(sssdconfig.get('domain/example.com', 'ldap_uri'), + 'ldap://ldap.example.com') + + # Negative Test - Type Error + try: + sssdconfig.save_service(self) + except TypeError: + pass + else: + self.fail("Expected TypeError") + +if __name__ == "__main__": + error = 0 + + suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestSSSDService) + res = unittest.TextTestRunner(verbosity=99).run(suite) + if not res.wasSuccessful(): + error |= 0x1 + + suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestSSSDDomain) + res = unittest.TextTestRunner(verbosity=99).run(suite) + if not res.wasSuccessful(): + error |= 0x2 + + suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestSSSDConfig) + res = unittest.TextTestRunner(verbosity=99).run(suite) + if not res.wasSuccessful(): + error |= 0x4 + + suite = unittest.TestLoader().loadTestsFromTestCase(SSSDConfigTestValid) + res = unittest.TextTestRunner(verbosity=99).run(suite) + if not res.wasSuccessful(): + error |= 0x8 + + exit(error) diff --git a/server/config/etc/sssd.api.conf b/server/config/etc/sssd.api.conf new file mode 100644 index 000000000..04634ca52 --- /dev/null +++ b/server/config/etc/sssd.api.conf @@ -0,0 +1,48 @@ +# Format: +# option = type, subtype[, default] + +[service] +# Options available to all services +debug_level = int, None, 0 +command = str, None +reconnection_retries = int, None, 3 + +[sssd] +# Monitor service +config_file_version = int, None, 2 +services = list, str, nss, pam +domains = list, str +sbus_timeout = int, None, -1 +re_expression = str, None, (?P[^@]+)@?(?P[^@]*$) +full_name_format = str, None, %1$s@%2$s + +[nss] +# Name service +nss_enum_cache_timeout = int, None +nss_entry_cache_timeout = int, None +nss_entry_cache_no_wait_timeout = int, None +nss_entry_negative_timeout = int, None +nss_filter_users = list, str, root +nss_filter_groups = list, str, root +nss_filter_users_in_groups = bool, None, true + +[pam] +# Authentication service + +[provider] +#Available provider types +id_provider = str, None +auth_provider = str, None +access_provider = str, None +chpass_provider = str, None + +[domain] +# Options available to all domains +debug_level = int, None, 0 +min_id = int, None, 1000 +max_id = int, None +timeout = int, None, 0 +magic_private_groups = bool, None, false +enumerate = bool, None, true +cache_credentials = bool, None, false +use_fully_qualified_names = bool, None, false diff --git a/server/config/etc/sssd.api.d/sssd-krb5.conf b/server/config/etc/sssd.api.d/sssd-krb5.conf new file mode 100644 index 000000000..85067e93a --- /dev/null +++ b/server/config/etc/sssd.api.d/sssd-krb5.conf @@ -0,0 +1,13 @@ +[provider/krb5] +krb5_kdcip = str, None +krb5_realm = str, None +krb5_auth_timeout = int, None + +[provider/krb5/auth] +krb5_ccachedir = str, None +krb5_ccname_template = str, None + +[provider/krb5/access] + +[provider/krb5/chpass] +krb5_changepw_principal = str, None \ No newline at end of file diff --git a/server/config/etc/sssd.api.d/sssd-ldap.conf b/server/config/etc/sssd.api.d/sssd-ldap.conf new file mode 100644 index 000000000..700de0215 --- /dev/null +++ b/server/config/etc/sssd.api.d/sssd-ldap.conf @@ -0,0 +1,32 @@ +[provider/ldap] +ldap_uri = str, None, ldap://localhost +ldap_schema = str, None, rfc2307 +ldap_default_bind_dn = str, None +ldap_default_authtok_type = str, None +ldap_default_authtok = str, None +ldap_network_timeout = int, None +ldap_opt_timeout = int, None +ldap_tls_reqcert = str, None + +[provider/ldap/id] +ldap_user_search_base = str, None +ldap_user_object_class = str, None +ldap_user_name = str, None +ldap_user_uid_number = str, None +ldap_user_gid_number = str, None +ldap_user_gecos = str, None +ldap_user_homedir = str, None +ldap_user_shell = str, None +ldap_user_uuid = str, None +ldap_user_principal = str, None +ldap_user_fullname = str, None +ldap_user_memberof = str, None +ldap_group_search_base = str, None +ldap_group_object_class = str, None +ldap_group_name = str, None +ldap_group_gid_number = str, None +ldap_group_member = str, None +ldap_group_UUID = str, None +ldap_force_upper_case_realm = bool, None + +[provider/ldap/auth] diff --git a/server/config/etc/sssd.api.d/sssd-local.conf b/server/config/etc/sssd.api.d/sssd-local.conf new file mode 100644 index 000000000..48ffae286 --- /dev/null +++ b/server/config/etc/sssd.api.d/sssd-local.conf @@ -0,0 +1,11 @@ +[provider/local] + +[provider/local/id] +default_shell = str, None, /bin/bash +base_directory = str, None, /home + +[provider/local/auth] + +[provider/local/access] + +[provider/local/chpass] \ No newline at end of file diff --git a/server/config/testconfigs/noparse.api.conf b/server/config/testconfigs/noparse.api.conf new file mode 100644 index 000000000..50651001a --- /dev/null +++ b/server/config/testconfigs/noparse.api.conf @@ -0,0 +1,7 @@ +# Format: +# option = type, subtype[, default] + +[service] +# Options available to all services +debug_level = int, None, 0 +command \ No newline at end of file diff --git a/server/config/testconfigs/sssd-invalid.conf b/server/config/testconfigs/sssd-invalid.conf new file mode 100644 index 000000000..3a84ae117 --- /dev/null +++ b/server/config/testconfigs/sssd-invalid.conf @@ -0,0 +1,3 @@ +[sssd] +services +config_file_version = 2 diff --git a/server/config/testconfigs/sssd-valid.conf b/server/config/testconfigs/sssd-valid.conf new file mode 100644 index 000000000..6725e0a72 --- /dev/null +++ b/server/config/testconfigs/sssd-valid.conf @@ -0,0 +1,42 @@ +[nss] +nss_filter_groups = root +nss_entry_negative_timeout = 15 +debug_level = 0 +nss_filter_users_in_groups = true +nss_filter_users = root +nss_entry_cache_no_wait_timeout = 60 +nss_entry_cache_timeout = 600 +nss_enum_cache_timeout = 120 + +[sssd] +services = nss, pam +reconnection_retries = 3 +domains = LOCAL, IPA +config_file_version = 2 + +[domain/PROXY] +id_provider = proxy +auth_provider = proxy +debug_level = 0 + +[domain/IPA] +id_provider = ldap +auth_provider = krb5 +debug_level = 0 + +[domain/LOCAL] +id_provider = local +auth_provider = local +debug_level = 0 + +[domain/LDAP] +id_provider = ldap +auth_provider = ldap +debug_level = 0 + +[pam] +debug_level = 0 + +[dp] +debug_level = 0 + -- cgit