#!/usr/bin/python
#
# Copyright (C) 2014 Simo Sorce
#
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Shared scaffolding for the functional tests.

Provides IpsilonTestBase, which lays out a scratch directory and knows how
to configure and launch the external daemons (httpd, PostgreSQL, slapd,
krb5kdc) that a test needs.
"""

try:
    import ConfigParser
except ImportError:  # the module is named 'configparser' on Python 3
    import configparser as ConfigParser
import errno
import io
import os
import pwd
import shutil
import signal
import random
from string import Template
import subprocess


WRAP_HOSTNAME = 'idp.ipsilon.dev'
TESTREALM = 'IPSILON.DEV'
TESTDOMAIN = 'ipsilon.dev'
KDC_DBNAME = 'db.file'
KDC_STASH = 'stash.file'
KDC_PASSWORD = 'ipsilon'

# string.Template source for the krb5.conf written by setup_kdc().
KRB5_CONF_TEMPLATE = '''
[libdefaults]
  default_realm = ${TESTREALM}
  dns_lookup_realm = false
  dns_lookup_kdc = false
  rdns = false
  ticket_lifetime = 24h
  forwardable = yes
  default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX
  udp_preference_limit = 0

[realms]
  ${TESTREALM} = {
    kdc =${WRAP_HOSTNAME}
  }

[domain_realm]
  .${TESTDOMAIN} = ${TESTREALM}
  ${TESTDOMAIN} = ${TESTREALM}

[dbmodules]
  ${TESTREALM} = {
    database_name = ${KDCDIR}/${KDC_DBNAME}
  }
'''

# string.Template source for the kdc.conf written by setup_kdc().
KDC_CONF_TEMPLATE = '''
[kdcdefaults]
 kdc_ports = 88
 kdc_tcp_ports = 88
 restrict_anonymous_to_tgt = true

[realms]
 ${TESTREALM} = {
  master_key_type = aes256-cts
  max_life = 7d
  max_renewable_life = 14d
  acl_file = ${KDCDIR}/kadm5.acl
  dict_file = /usr/share/dict/words
  default_principal_flags = +preauth
  admin_keytab = ${TESTREALM}/kadm5.keytab
  key_stash_file = ${KDCDIR}/${KDC_STASH}
 }

[logging]
  kdc = FILE:${KDCLOG}
'''

USER_KTNAME = "user.keytab"
HTTP_KTNAME = "http.keytab"
KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"


class IpsilonTestBase(object):
    """Base class for functional tests that need real external servers.

    Subclasses implement setup_servers(); the helpers here create the
    per-test directory tree, render configuration files and start the
    daemons.  Every long-running child is started in its own session and
    recorded in self.processes so that wait() can signal it.
    """

    def __init__(self, name, execname):
        """Record the test name and the executable that runs the checks.

        The current working directory is captured as self.rootdir and the
        login name of the current uid as self.testuser.
        """
        self.name = name
        self.execname = execname
        self.rootdir = os.getcwd()
        self.testdir = None
        self.testuser = pwd.getpwuid(os.getuid())[0]
        self.processes = []

    def force_remove(self, op, name, info):
        """shutil.rmtree() onerror hook: open up permissions, then delete.

        op and info are supplied by rmtree and are not used.
        """
        os.chmod(name, 0o700)
        os.remove(name)

    def setup_base(self, path, test):
        """Create a fresh directory tree for test under path.

        Any previous <path>/<test.name> is removed first.  The 'templates'
        directory is copied from self.rootdir and empty etc/, lib/,
        lib/<test.name>/ and log/ directories are created.
        """
        self.testdir = os.path.join(path, test.name)
        if os.path.exists(self.testdir):
            shutil.rmtree(self.testdir, onerror=self.force_remove)
        os.makedirs(self.testdir)
        shutil.copytree(os.path.join(self.rootdir, 'templates'),
                        os.path.join(self.testdir, 'templates'))
        os.mkdir(os.path.join(self.testdir, 'etc'))
        os.mkdir(os.path.join(self.testdir, 'lib'))
        os.mkdir(os.path.join(self.testdir, 'lib', test.name))
        os.mkdir(os.path.join(self.testdir, 'log'))

    def generate_profile(self, global_opts, args_opts, name, addr, port,
                         nameid='unspecified'):
        """Write <testdir>/<name>_profile.cfg and return its path.

        global_opts and args_opts become the [globals] and [arguments]
        sections.  The serialized file is then run through string.Template,
        which replaces ${NAME}, ${ADDRESS}, ${PORT}, ${TESTDIR},
        ${ROOTDIR}, ${NAMEID}, ${HTTP_KTNAME} and ${TEST_USER}.

        Raises KeyError or ValueError (from Template.substitute) when the
        options contain an unknown or malformed placeholder.
        """
        # RawConfigParser stores and writes values verbatim.  That is what
        # ConfigParser.ConfigParser does for set()/write() on Python 2,
        # while the Python 3 ConfigParser would reject non-string values.
        newconf = ConfigParser.RawConfigParser()
        newconf.add_section('globals')
        for key, value in global_opts.items():
            newconf.set('globals', key, value)
        newconf.add_section('arguments')
        for key, value in args_opts.items():
            newconf.set('arguments', key, value)

        # write() emits native strings: bytes on Python 2, text on 3.
        profile = io.BytesIO() if str is bytes else io.StringIO()
        newconf.write(profile)

        text = Template(profile.getvalue()).substitute(
            {'NAME': name,
             'ADDRESS': addr,
             'PORT': port,
             'TESTDIR': self.testdir,
             'ROOTDIR': self.rootdir,
             'NAMEID': nameid,
             'HTTP_KTNAME': HTTP_KTNAME,
             'TEST_USER': self.testuser})

        filename = os.path.join(self.testdir, '%s_profile.cfg' % name)
        with open(filename, 'w') as f:
            f.write(text)
        return filename

    def setup_http(self, name, addr, port):
        """Create an httpd server root for name; return its httpd.conf path.

        The configuration is rendered from <rootdir>/tests/httpd.conf with
        ${HTTPROOT}, ${HTTPADDR} and ${HTTPPORT} substituted.
        """
        httpdir = os.path.join(self.testdir, name)
        os.mkdir(httpdir)
        for sub in ('conf.d', 'html', 'logs'):
            os.mkdir(os.path.join(httpdir, sub))
        os.symlink('/etc/httpd/modules', os.path.join(httpdir, 'modules'))

        with open(os.path.join(self.rootdir, 'tests/httpd.conf')) as f:
            t = Template(f.read())
        text = t.substitute({'HTTPROOT': httpdir,
                             'HTTPADDR': addr,
                             'HTTPPORT': port})
        filename = os.path.join(httpdir, 'httpd.conf')
        with open(filename, 'w+') as f:
            f.write(text)
        return filename

    def setup_idp_server(self, profile, name, addr, port, env):
        """Prepare httpd for an IdP and run ipsilon-server-install.

        Also links the source 'ipsilon' package into <testdir>/lib/<name>.
        Returns the httpd.conf path.  Raises subprocess.CalledProcessError
        if the installer exits non-zero.
        """
        http_conf_file = self.setup_http(name, addr, port)
        cmd = [os.path.join(self.rootdir,
                            'ipsilon/install/ipsilon-server-install'),
               '--config-profile=%s' % profile]
        subprocess.check_call(cmd, env=env)
        os.symlink(os.path.join(self.rootdir, 'ipsilon'),
                   os.path.join(self.testdir, 'lib', name, 'ipsilon'))
        return http_conf_file

    def setup_sp_server(self, profile, name, addr, port, env):
        """Prepare httpd for an SP and run ipsilon-client-install.

        Returns the httpd.conf path.  Raises subprocess.CalledProcessError
        if the installer exits non-zero.
        """
        http_conf_file = self.setup_http(name, addr, port)
        cmd = [os.path.join(self.rootdir,
                            'ipsilon/install/ipsilon-client-install'),
               '--config-profile=%s' % profile]
        subprocess.check_call(cmd, env=env)
        return http_conf_file

    def setup_pgdb(self, datadir, env):
        """Initialise a PostgreSQL cluster in datadir.

        Appends a 'trust' rule for 127.0.0.1/24 to its pg_hba.conf.
        Raises subprocess.CalledProcessError if initdb fails.
        """
        cmd = ['/usr/bin/pg_ctl', 'initdb', '-D', datadir]
        subprocess.check_call(cmd, env=env)
        auth = 'host all all 127.0.0.1/24 trust\n'
        filename = os.path.join(datadir, 'pg_hba.conf')
        with open(filename, 'a') as f:
            f.write(auth)

    def start_http_server(self, conf, env):
        """Start httpd in the foreground with configuration file conf.

        NOTE: env is modified in place; MALLOC_CHECK_ and a random
        MALLOC_PERTURB_ value (1..255) are added before launching.
        """
        env['MALLOC_CHECK_'] = '3'
        env['MALLOC_PERTURB_'] = str(random.randint(0, 32767) % 255 + 1)
        p = subprocess.Popen(['/usr/sbin/httpd', '-DFOREGROUND', '-f', conf],
                             env=env, preexec_fn=os.setsid)
        self.processes.append(p)

    def start_pgdb_server(self, datadir, rundir, log, addr, port, env):
        """Start PostgreSQL and create the databases used by the tests.

        Waits for 'pg_ctl start -w' to return, then creates the
        adminconfig, users, transactions and sessions databases.  Raises
        subprocess.CalledProcessError if a createdb call fails.
        """
        # Adjacent literals are joined at compile time, so the option
        # string carries no stray source indentation.
        options = ('-c unix_socket_directories=%s -c port=%s '
                   '-c listen_addresses=%s' % (rundir, port, addr))
        p = subprocess.Popen(['/usr/bin/pg_ctl', 'start', '-D', datadir,
                              '-o', options, '-l', log, '-w'],
                             env=env, preexec_fn=os.setsid)
        self.processes.append(p)
        p.wait()
        for d in ['adminconfig', 'users', 'transactions', 'sessions']:
            cmd = ['/usr/bin/createdb', '-h', addr, '-p', port, d]
            subprocess.check_call(cmd, env=env)

    def setup_ldap(self, env):
        """Render slapd.conf under <testdir>/ldap and load the test data.

        Returns the path of the generated slapd.conf.  Raises
        subprocess.CalledProcessError if slapadd fails.
        """
        ldapdir = os.path.join(self.testdir, 'ldap')
        os.mkdir(ldapdir)
        with open(os.path.join(self.rootdir, 'tests/slapd.conf')) as f:
            t = Template(f.read())
        text = t.substitute({'ldapdir': ldapdir})
        filename = os.path.join(ldapdir, 'slapd.conf')
        with open(filename, 'w+') as f:
            f.write(text)
        # Resolve the LDIF against rootdir like every other test resource,
        # so this does not depend on the current working directory.
        ldif = os.path.join(self.rootdir, 'tests/ldapdata.ldif')
        subprocess.check_call(['/usr/sbin/slapadd', '-f', filename,
                               '-l', ldif], env=env)
        return filename

    def start_ldap_server(self, conf, addr, port, env):
        """Start slapd with conf, listening on ldap://addr:port."""
        p = subprocess.Popen(['/usr/sbin/slapd', '-d', '0', '-f', conf,
                              '-h', 'ldap://%s:%s' % (addr, port)],
                             env=env, preexec_fn=os.setsid)
        self.processes.append(p)

    def setup_kdc(self, env):
        """Create a KDC database for TESTREALM and start krb5kdc.

        Writes krb5.conf and kdc.conf into the test directory and recreates
        <testdir>/kdc.  Returns the environment used for the KDC: PATH,
        KRB5_CONFIG and KRB5_KDC_PROFILE defaults, overridden by env.

        Raises ValueError if 'kdb5_util create' exits non-zero.
        """
        # setup kerberos environment
        testlog = os.path.join(self.testdir, 'kerb.log')
        krb5conf = os.path.join(self.testdir, 'krb5.conf')
        kdcconf = os.path.join(self.testdir, 'kdc.conf')
        kdcdir = os.path.join(self.testdir, 'kdc')
        if os.path.exists(kdcdir):
            shutil.rmtree(kdcdir)
        os.makedirs(kdcdir)

        t = Template(KRB5_CONF_TEMPLATE)
        text = t.substitute({'TESTREALM': TESTREALM,
                             'TESTDOMAIN': TESTDOMAIN,
                             'TESTDIR': self.testdir,
                             'KDCDIR': kdcdir,
                             'KDC_DBNAME': KDC_DBNAME,
                             'WRAP_HOSTNAME': WRAP_HOSTNAME})
        with open(krb5conf, 'w+') as f:
            f.write(text)

        t = Template(KDC_CONF_TEMPLATE)
        text = t.substitute({'TESTREALM': TESTREALM,
                             'KDCDIR': kdcdir,
                             'KDCLOG': testlog,
                             'KDC_STASH': KDC_STASH})
        with open(kdcconf, 'w+') as f:
            f.write(text)

        kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
                  'KRB5_CONFIG': krb5conf,
                  'KRB5_KDC_PROFILE': kdcconf}
        kdcenv.update(env)

        with open(testlog, 'a') as logfile:
            ksetup = subprocess.Popen(["kdb5_util", "create", "-s",
                                       "-r", TESTREALM, "-P", KDC_PASSWORD],
                                      stdout=logfile, stderr=logfile,
                                      env=kdcenv, preexec_fn=os.setsid)
            ksetup.wait()
        if ksetup.returncode != 0:
            raise ValueError('KDC Setup failed')

        kdcproc = subprocess.Popen(['krb5kdc', '-n'],
                                   env=kdcenv, preexec_fn=os.setsid)
        self.processes.append(kdcproc)

        return kdcenv

    def kadmin_local(self, cmd, env, logfile):
        """Run 'kadmin.local -q cmd', sending its output to logfile.

        Raises ValueError if the command exits non-zero.
        """
        ksetup = subprocess.Popen(["kadmin.local", "-q", cmd],
                                  stdout=logfile, stderr=logfile,
                                  env=env, preexec_fn=os.setsid)
        ksetup.wait()
        if ksetup.returncode != 0:
            raise ValueError('Kadmin local [%s] failed' % cmd)

    def setup_keys(self, env):
        """Create the HTTP service and test-user principals and keytabs.

        Returns a copy of env in which KRB5_KTNAME defaults to the service
        keytab (a value already present in env wins).  Raises ValueError if
        a kadmin.local command fails.
        """
        testlog = os.path.join(self.testdir, 'kerb.log')
        svc_name = "HTTP/%s" % WRAP_HOSTNAME
        svc_keytab = os.path.join(self.testdir, HTTP_KTNAME)
        usr_keytab = os.path.join(self.testdir, USER_KTNAME)

        commands = [
            "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name),
            "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name),
            "addprinc -randkey -e %s %s" % (KEY_TYPE, self.testuser),
            "ktadd -k %s -e %s %s" % (usr_keytab, KEY_TYPE, self.testuser),
        ]
        # One append-mode handle serves all four commands.
        with open(testlog, 'a') as logfile:
            for cmd in commands:
                self.kadmin_local(cmd, env, logfile)

        keys_env = {"KRB5_KTNAME": svc_keytab}
        keys_env.update(env)
        return keys_env

    def kinit_keytab(self, kdcenv):
        """Obtain a ticket for the test user from the user keytab.

        NOTE: kdcenv is modified in place; KRB5CCNAME is set to a FILE
        cache at <testdir>/ccaches/user.

        Raises ValueError if kinit exits non-zero.
        """
        testlog = os.path.join(self.testdir, 'kinit.log')
        usr_keytab = os.path.join(self.testdir, USER_KTNAME)
        # Nothing else in this class creates the cache directory, so make
        # sure it exists before kinit tries to write into it.
        ccdir = os.path.join(self.testdir, 'ccaches')
        if not os.path.isdir(ccdir):
            os.makedirs(ccdir)
        kdcenv['KRB5CCNAME'] = 'FILE:' + os.path.join(ccdir, 'user')
        with open(testlog, 'a') as logfile:
            logfile.write("\n%s\n" % kdcenv)
            ksetup = subprocess.Popen(["kinit", "-kt", usr_keytab,
                                       self.testuser],
                                      stdout=logfile, stderr=logfile,
                                      env=kdcenv, preexec_fn=os.setsid)
            ksetup.wait()
        if ksetup.returncode != 0:
            raise ValueError('kinit %s failed' % self.testuser)

    def wait(self):
        """Send SIGTERM to the process group of every recorded child.

        Children that are already gone are skipped, so one dead entry
        (such as the pg_ctl launcher, which exits once the server is up)
        does not stop the remaining servers from being signalled.  Any
        other OSError is re-raised.
        """
        for p in self.processes:
            try:
                os.killpg(p.pid, signal.SIGTERM)
            except OSError as e:
                if e.errno != errno.ESRCH:
                    raise

    def setup_servers(self, env=None):
        """Configure the servers needed by a test; subclasses override."""
        raise NotImplementedError()

    def run(self, env):
        """Execute self.execname with env and return its exit status.

        A trailing '.pyc' is mapped back to the '.py' source file; any
        other name is used unchanged.
        """
        exe = self.execname
        if exe.endswith('.pyc'):
            exe = exe[:-1]
        return subprocess.call([exe], env=env)