diff options
Diffstat (limited to 'src/tests')
| -rw-r--r-- | src/tests/Makefile.in | 1 | ||||
| -rw-r--r-- | src/tests/t_otp.py | 226 |
2 files changed, 227 insertions, 0 deletions
diff --git a/src/tests/Makefile.in b/src/tests/Makefile.in index a7f8c2d413..bf097387ea 100644 --- a/src/tests/Makefile.in +++ b/src/tests/Makefile.in @@ -87,6 +87,7 @@ check-pytests:: gcred hist kdbtest plugorder t_init_creds t_localauth $(RUNPYTEST) $(srcdir)/t_anonpkinit.py $(PYTESTFLAGS) $(RUNPYTEST) $(srcdir)/t_authpkinit.py $(PYTESTFLAGS) $(RUNPYTEST) $(srcdir)/t_policy.py $(PYTESTFLAGS) + $(RUNPYTEST) $(srcdir)/t_otp.py $(PYTESTFLAGS) $(RUNPYTEST) $(srcdir)/t_localauth.py $(PYTESTFLAGS) $(RUNPYTEST) $(srcdir)/t_kadm5_hook.py $(PYTESTFLAGS) $(RUNPYTEST) $(srcdir)/t_pwqual.py $(PYTESTFLAGS) diff --git a/src/tests/t_otp.py b/src/tests/t_otp.py new file mode 100644 index 0000000000..66a03ee573 --- /dev/null +++ b/src/tests/t_otp.py @@ -0,0 +1,226 @@ +#!/usr/bin/python +# +# Author: Nathaniel McCallum <npmccallum@redhat.com> +# +# Copyright (c) 2013 Red Hat, Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + + +# +# This script tests OTP, both UDP and Unix Sockets, with a variety of +# configuration. It requires pyrad to run, but exits gracefully if not found. +# It also deliberately shuts down the test daemons between tests in order to +# test how OTP handles the case of short daemon restarts. +# + +from Queue import Empty +import StringIO +import struct +import subprocess +import sys +import socket +import os +import atexit + +try: + from pyrad import packet, dictionary + from multiprocessing import Process, Queue +except ImportError: + success('Warning: skipping OTP tests due to missing pyrad or old Python') + exit(0) + +from k5test import * + +class RadiusDaemon(Process): + MAX_PACKET_SIZE = 4096 + + # We could use a dictionary file, but since we need + # such few attributes, we'll just include them here + DICTIONARY = dictionary.Dictionary(StringIO.StringIO(""" +ATTRIBUTE User-Name 1 string +ATTRIBUTE User-Password 2 string +ATTRIBUTE NAS-Identifier 32 string +""")) + + def listen(self, addr): + raise NotImplementedError() + + def recvRequest(self, data): + raise NotImplementedError() + + def run(self): + addr = self._args[0] + secr = self._args[1] + pswd = self._args[2] + outq = self._args[3] + + if secr: + with open(secr) as file: + secr = file.read().strip() + + data = self.listen(addr) + outq.put("started") + (buf, sock, addr) = self.recvRequest(data) + pkt = packet.AuthPacket(secret=secr, + dict=RadiusDaemon.DICTIONARY, + packet=buf) + + usernm = [] + passwd = [] + for key in pkt.keys(): + if key == 'User-Password': + passwd = map(pkt.PwDecrypt, pkt[key]) + elif key == 'User-Name': + usernm = pkt[key] + + reply = pkt.CreateReply() + replyq = {'user': usernm, 'pass': passwd} + if passwd == [pswd]: + reply.code = packet.AccessAccept + replyq['reply'] = True + else: + reply.code = packet.AccessReject + replyq['reply'] = False + + outq.put(replyq) + if addr is None: + sock.send(reply.ReplyPacket()) + else: + sock.sendto(reply.ReplyPacket(), addr) + sock.close() + +class UDPRadiusDaemon(RadiusDaemon): + def listen(self, addr): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind((addr.split(':')[0], int(addr.split(':')[1]))) + return sock + + def recvRequest(self, sock): + (buf, addr) = sock.recvfrom(RadiusDaemon.MAX_PACKET_SIZE) + return (buf, sock, addr) + +class UnixRadiusDaemon(RadiusDaemon): + def listen(self, addr): + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + if os.path.exists(addr): + os.remove(addr) + sock.bind(addr) + sock.listen(1) + return (sock, addr) + + def recvRequest(self, (sock, addr)): + conn = sock.accept()[0] + sock.close() + os.remove(addr) + + buf = "" + remain = RadiusDaemon.MAX_PACKET_SIZE + while True: + buf += conn.recv(remain) + remain = RadiusDaemon.MAX_PACKET_SIZE - len(buf) + if (len(buf) >= 4): + remain = struct.unpack("!BBH", buf[0:4])[2] - len(buf) + if (remain <= 0): + return (buf, conn, None) + +def verify(daemon, queue, reply, usernm, passwd): + try: + data = queue.get(timeout=1) + except Empty: + sys.stderr.write("ERROR: Packet not received by daemon!\n") + daemon.terminate() + sys.exit(1) + assert data['reply'] is reply + assert data['user'] == [usernm] + assert data['pass'] == [passwd] + daemon.join() + +def setstr(princ, type, username=None): + cmd = 'setstr %s otp "[{""type"": ""%s""' % (princ, type) + if username is None: + cmd += '}]"' + else: + cmd += ', ""username"": ""%s""}]"' % username + return cmd + +prefix = "/tmp/%d" % os.getpid() +secret_file = prefix + ".secret" +socket_file = prefix + ".socket" +with open(secret_file, "w") as file: + file.write("otptest") +atexit.register(lambda: os.remove(secret_file)) + +conf = {'plugins': {'kdcpreauth': {'enable_only': 'otp'}}, + 'otp': {'udp': {'server': '127.0.0.1:$port9', + 'secret': secret_file, + 'strip_realm': 'true'}, + 'unix': {'server': socket_file, + 'strip_realm': 'false'}}} + +queue = Queue() + +realm = K5Realm(kdc_conf=conf) +realm.run_kadminl('modprinc +requires_preauth %s' % realm.user_princ) +flags = ['-T', realm.ccache] +server_addr = '127.0.0.1:' + str(realm.portbase + 9) + +## Test UDP fail / custom username +daemon = UDPRadiusDaemon(args=(server_addr, secret_file, 'accept', queue)) +daemon.start() +queue.get() +realm.run_kadminl(setstr(realm.user_princ, 'udp', 'custom')) +realm.kinit(realm.user_princ, 'reject', flags=flags, expected_code=1) +verify(daemon, queue, False, 'custom', 'reject') + +## Test UDP success / standard username +daemon = UDPRadiusDaemon(args=(server_addr, secret_file, 'accept', queue)) +daemon.start() +queue.get() +realm.run_kadminl(setstr(realm.user_princ, 'udp')) +realm.kinit(realm.user_princ, 'accept', flags=flags) +verify(daemon, queue, True, realm.user_princ.split('@')[0], 'accept') + +# Detect upstream pyrad bug +# https://github.com/wichert/pyrad/pull/18 +try: + auth = packet.Packet.CreateAuthenticator() + packet.Packet(authenticator=auth, secret="").ReplyPacket() +except AssertionError: + success('Warning: skipping UNIX domain socket tests because of pyrad ' + 'assertion bug') + exit(0) + +## Test Unix fail / custom username +daemon = UnixRadiusDaemon(args=(socket_file, '', 'accept', queue)) +daemon.start() +queue.get() +realm.run_kadminl(setstr(realm.user_princ, 'unix', 'custom')) +realm.kinit(realm.user_princ, 'reject', flags=flags, expected_code=1) +verify(daemon, queue, False, 'custom', 'reject') + +## Test Unix success / standard username +daemon = UnixRadiusDaemon(args=(socket_file, '', 'accept', queue)) +daemon.start() +queue.get() +realm.run_kadminl(setstr(realm.user_princ, 'unix')) +realm.kinit(realm.user_princ, 'accept', flags=flags) +verify(daemon, queue, True, realm.user_princ, 'accept') + +success('OTP tests') |
