#!/usr/bin/python
#
# Author: Nathaniel McCallum
#
# Copyright (c) 2013 Red Hat, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# This script tests OTP, both UDP and Unix Sockets, with a variety of
# configurations. It requires pyrad to run, but exits gracefully if not found.
# It also deliberately shuts down the test daemons between tests in order to
# test how OTP handles the case of short daemon restarts.
#

from Queue import Empty
import StringIO
import struct
import subprocess
import sys
import socket
import os
import atexit

# k5test must be imported before the optional-dependency check below,
# because the skip path reports through k5test's success().
from k5test import *

try:
    from pyrad import packet, dictionary
    from multiprocessing import Process, Queue
except ImportError:
    success('Warning: skipping OTP tests due to missing pyrad or old Python')
    sys.exit(0)

# Host part of the UDP RADIUS server address.  UDPRadiusDaemon.listen()
# requires a "host:port" string, so a bare port is not sufficient.
LOOPBACK = '127.0.0.1'


class RadiusDaemon(Process):
    """One-shot fake RADIUS server, run in a child process.

    The process is constructed with args=(addr, secret_file, password,
    queue).  It puts "started" on the queue once it is listening, answers
    exactly one Access-Request, puts a dict describing that request on the
    queue ('user', 'pass' and 'reply' keys), sends the reply and exits.

    Subclasses supply the transport through listen() and recvRequest().
    """

    MAX_PACKET_SIZE = 4096

    # We could use a dictionary file, but since we need
    # so few attributes, we'll just include them here
    DICTIONARY = dictionary.Dictionary(StringIO.StringIO("""
ATTRIBUTE User-Name 1 string
ATTRIBUTE User-Password 2 string
ATTRIBUTE NAS-Identifier 32 string
"""))

    def listen(self, addr):
        """Start listening on addr; the result is passed to recvRequest()."""
        raise NotImplementedError()

    def recvRequest(self, data):
        """Receive one request.

        data is whatever listen() returned.  Returns a tuple
        (buf, sock, peer): the raw packet, the socket to reply on, and the
        peer address for sendto(), or None if sock is already connected.
        """
        raise NotImplementedError()

    def run(self):
        """Serve a single request and report it on the output queue."""
        addr, secr, pswd, outq = self._args[:4]

        # A non-empty secret argument names a file holding the shared secret.
        if secr:
            with open(secr) as secret_fh:
                secr = secret_fh.read().strip()

        data = self.listen(addr)
        outq.put("started")
        (buf, sock, peer) = self.recvRequest(data)
        pkt = packet.AuthPacket(secret=secr,
                                dict=RadiusDaemon.DICTIONARY,
                                packet=buf)

        usernm = []
        passwd = []
        # NOTE(review): keep the explicit pkt.keys() call; the packet class
        # may not iterate the same way as a plain dict -- verify against
        # pyrad before simplifying.
        for key in pkt.keys():
            if key == 'User-Password':
                passwd = [pkt.PwDecrypt(value) for value in pkt[key]]
            elif key == 'User-Name':
                usernm = pkt[key]

        reply = pkt.CreateReply()
        replyq = {'user': usernm, 'pass': passwd}
        # Accept only if exactly one password was sent and it matches.
        if passwd == [pswd]:
            reply.code = packet.AccessAccept
            replyq['reply'] = True
        else:
            reply.code = packet.AccessReject
            replyq['reply'] = False

        # Report to the parent before replying, so the result is queued
        # no later than the client sees the answer.
        outq.put(replyq)
        if peer is None:
            sock.send(reply.ReplyPacket())
        else:
            sock.sendto(reply.ReplyPacket(), peer)
        sock.close()


class UDPRadiusDaemon(RadiusDaemon):
    """RADIUS daemon listening on a UDP "host:port" address."""

    def listen(self, addr):
        """Bind a datagram socket to the "host:port" string addr."""
        host, _, port = addr.rpartition(':')
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind((host, int(port)))
        return sock

    def recvRequest(self, sock):
        """Receive one datagram; the reply goes back to its sender."""
        (buf, addr) = sock.recvfrom(RadiusDaemon.MAX_PACKET_SIZE)
        return (buf, sock, addr)


class UnixRadiusDaemon(RadiusDaemon):
    """RADIUS daemon listening on a Unix domain stream socket."""

    def listen(self, addr):
        """Bind and listen on the socket path addr, replacing a stale file."""
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        if os.path.exists(addr):
            os.remove(addr)
        sock.bind(addr)
        sock.listen(1)
        return (sock, addr)

    def recvRequest(self, data):
        """Accept one connection and read one length-prefixed packet.

        data is the (sock, addr) tuple returned by listen().  The listening
        socket is closed and its path removed as soon as the connection is
        accepted.  Raises EOFError if the peer closes the connection before
        a complete packet has arrived.
        """
        (sock, addr) = data
        conn = sock.accept()[0]
        sock.close()
        os.remove(addr)

        buf = b""
        remain = RadiusDaemon.MAX_PACKET_SIZE
        while True:
            chunk = conn.recv(remain)
            if not chunk:
                # Without this check an early close would spin forever,
                # since recv() keeps returning an empty string.
                conn.close()
                raise EOFError("connection closed after %d bytes of an "
                               "incomplete RADIUS packet" % len(buf))
            buf += chunk
            remain = RadiusDaemon.MAX_PACKET_SIZE - len(buf)
            if len(buf) >= 4:
                # The third header field (16-bit, network order) is the
                # total packet length.
                remain = struct.unpack("!BBH", buf[0:4])[2] - len(buf)
            if remain <= 0:
                return (buf, conn, None)


def verify(daemon, queue, reply, usernm, passwd):
    """Check what the daemon received and wait for it to exit.

    Exits the script with status 1 if the daemon reports nothing within
    one second; otherwise asserts that the reported reply flag, user name
    and password match the expected values.
    """
    try:
        data = queue.get(timeout=1)
    except Empty:
        sys.stderr.write("ERROR: Packet not received by daemon!\n")
        daemon.terminate()
        sys.exit(1)
    assert data['reply'] is reply
    assert data['user'] == [usernm]
    assert data['pass'] == [passwd]
    daemon.join()


def setstr(princ, type, username=None):
    """Build the kadmin command setting princ's "otp" string attribute.

    The value is a one-element JSON array naming the token type and,
    if given, the user name; inner quotes are doubled inside the
    double-quoted argument.
    """
    cmd = 'setstr %s otp "[{""type"": ""%s""' % (princ, type)
    if username is None:
        cmd += '}]"'
    else:
        cmd += ', ""username"": ""%s""}]"' % username
    return cmd


def start_daemon(cls, addr, secret, queue):
    """Start a daemon of class cls and wait until it is listening.

    The daemon accepts the password 'accept'.  Returns the started process.
    """
    daemon = cls(args=(addr, secret, 'accept', queue))
    daemon.start()
    queue.get()  # Blocks until the child reports "started".
    return daemon


prefix = "/tmp/%d" % os.getpid()
secret_file = prefix + ".secret"
socket_file = prefix + ".socket"
with open(secret_file, "w") as secret_fh:
    secret_fh.write("otptest")
atexit.register(lambda: os.remove(secret_file))

conf = {'plugins': {'kdcpreauth': {'enable_only': 'otp'}},
        'otp': {'udp': {'server': LOOPBACK + ':$port9',
                        'secret': secret_file,
                        'strip_realm': 'true'},
                'unix': {'server': socket_file,
                         'strip_realm': 'false'}}}

queue = Queue()

realm = K5Realm(kdc_conf=conf)
realm.run_kadminl('modprinc +requires_preauth %s' % realm.user_princ)
flags = ['-T', realm.ccache]
server_addr = LOOPBACK + ':' + str(realm.portbase + 9)

## Test UDP fail / custom username
daemon = start_daemon(UDPRadiusDaemon, server_addr, secret_file, queue)
realm.run_kadminl(setstr(realm.user_princ, 'udp', 'custom'))
realm.kinit(realm.user_princ, 'reject', flags=flags, expected_code=1)
verify(daemon, queue, False, 'custom', 'reject')

## Test UDP success / standard username
daemon = start_daemon(UDPRadiusDaemon, server_addr, secret_file, queue)
realm.run_kadminl(setstr(realm.user_princ, 'udp'))
realm.kinit(realm.user_princ, 'accept', flags=flags)
verify(daemon, queue, True, realm.user_princ.split('@')[0], 'accept')

# Detect upstream pyrad bug
#   https://github.com/wichert/pyrad/pull/18
try:
    auth = packet.Packet.CreateAuthenticator()
    packet.Packet(authenticator=auth, secret="").ReplyPacket()
except AssertionError:
    success('Warning: skipping UNIX domain socket tests because of pyrad '
            'assertion bug')
    sys.exit(0)

## Test Unix fail / custom username
daemon = start_daemon(UnixRadiusDaemon, socket_file, '', queue)
realm.run_kadminl(setstr(realm.user_princ, 'unix', 'custom'))
realm.kinit(realm.user_princ, 'reject', flags=flags, expected_code=1)
verify(daemon, queue, False, 'custom', 'reject')

## Test Unix success / standard username
daemon = start_daemon(UnixRadiusDaemon, socket_file, '', queue)
realm.run_kadminl(setstr(realm.user_princ, 'unix'))
realm.kinit(realm.user_princ, 'accept', flags=flags)
verify(daemon, queue, True, realm.user_princ, 'accept')

success('OTP tests')