diff options
author | Rob Crittenden <rcritten@redhat.com> | 2009-02-05 15:03:08 -0500 |
---|---|---|
committer | Rob Crittenden <rcritten@redhat.com> | 2009-02-09 14:35:15 -0500 |
commit | 262ff2d731b1bfc4acd91153088b8fcde7ae92b8 (patch) | |
tree | baf8894d4b357b610113b87d4bfee84de24f08bd /ipa-python | |
parent | 58ae191a5afbf29d78afd3969f8d106415897958 (diff) | |
download | freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.tar.gz freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.tar.xz freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.zip |
Rename ipa-python directory to ipapython so it is a real python library
We used to install it as ipa, now installing it as ipapython. The rpm
is still ipa-python.
Diffstat (limited to 'ipa-python')
-rw-r--r-- | ipa-python/MANIFEST.in | 3 | ||||
-rw-r--r-- | ipa-python/Makefile | 28 | ||||
-rw-r--r-- | ipa-python/README | 18 | ||||
-rw-r--r-- | ipa-python/__init__.py | 0 | ||||
-rw-r--r-- | ipa-python/config.py | 183 | ||||
-rw-r--r-- | ipa-python/dnsclient.py | 443 | ||||
-rw-r--r-- | ipa-python/entity.py | 202 | ||||
-rwxr-xr-x | ipa-python/ipa-python.spec.in | 82 | ||||
-rw-r--r-- | ipa-python/ipa.conf | 3 | ||||
-rw-r--r-- | ipa-python/ipautil.py | 969 | ||||
-rw-r--r-- | ipa-python/ipavalidate.py | 137 | ||||
-rw-r--r-- | ipa-python/radius_util.py | 366 | ||||
-rw-r--r-- | ipa-python/setup.py.in | 77 | ||||
-rw-r--r-- | ipa-python/sysrestore.py | 317 | ||||
-rw-r--r-- | ipa-python/test/test_aci.py | 127 | ||||
-rw-r--r-- | ipa-python/test/test_ipautil.py | 309 | ||||
-rw-r--r-- | ipa-python/test/test_ipavalidate.py | 97 | ||||
-rw-r--r-- | ipa-python/version.py.in | 25 |
18 files changed, 0 insertions, 3386 deletions
diff --git a/ipa-python/MANIFEST.in b/ipa-python/MANIFEST.in deleted file mode 100644 index e2cad6f22..000000000 --- a/ipa-python/MANIFEST.in +++ /dev/null @@ -1,3 +0,0 @@ -include *.conf -include ipa-python.spec* - diff --git a/ipa-python/Makefile b/ipa-python/Makefile deleted file mode 100644 index 4ac027e14..000000000 --- a/ipa-python/Makefile +++ /dev/null @@ -1,28 +0,0 @@ -PYTHONLIBDIR ?= $(shell python -c "from distutils.sysconfig import *; print get_python_lib()") -PACKAGEDIR ?= $(DESTDIR)/$(PYTHONLIBDIR)/ipa -CONFIGDIR ?= $(DESTDIR)/etc/ipa -TESTS = $(wildcard test/*.py) - -all: ; - -install: - if [ "$(DESTDIR)" = "" ]; then \ - python setup.py install; \ - else \ - python setup.py install --root $(DESTDIR); \ - fi - -clean: - rm -f *~ *.pyc - -distclean: clean - rm -f setup.py ipa-python.spec version.py - -maintainer-clean: distclean - rm -rf build - -.PHONY: test -test: $(subst .py,.tst,$(TESTS)) - -%.tst: %.py - python $< diff --git a/ipa-python/README b/ipa-python/README deleted file mode 100644 index c966e44bf..000000000 --- a/ipa-python/README +++ /dev/null @@ -1,18 +0,0 @@ -This is a set of libraries common to IPA clients and servers though mostly -geared currently towards command-line tools. - -A brief overview: - -config.py - identify the IPA server domain and realm. It uses dnsclient to - try to detect this information first and will fall back to - /etc/ipa/ipa.conf if that fails. -dnsclient.py - find IPA information via DNS - -ipautil.py - helper functions - -radius_util.py - helper functions for Radius - -entity.py - entity is the main data type. User and Group extend this class - (but don't add anything currently). - -ipavalidate.py - basic data validation routines diff --git a/ipa-python/__init__.py b/ipa-python/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/ipa-python/__init__.py +++ /dev/null diff --git a/ipa-python/config.py b/ipa-python/config.py deleted file mode 100644 index 8755f628c..000000000 --- a/ipa-python/config.py +++ /dev/null @@ -1,183 +0,0 @@ -# Authors: Karl MacMillan <kmacmill@redhat.com> -# -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -import ConfigParser -from optparse import OptionParser, IndentedHelpFormatter - -import krbV -import socket -import ipa.dnsclient -import re - -class IPAConfigError(Exception): - def __init__(self, msg=''): - self.msg = msg - Exception.__init__(self, msg) - - def __repr__(self): - return self.msg - - __str__ = __repr__ - -class IPAFormatter(IndentedHelpFormatter): - """Our own optparse formatter that indents multiple lined usage string.""" - def format_usage(self, usage): - usage_string = "Usage:" - spacing = " " * len(usage_string) - lines = usage.split("\n") - ret = "%s %s\n" % (usage_string, lines[0]) - for line in lines[1:]: - ret += "%s %s\n" % (spacing, line) - return ret - -def verify_args(parser, args, needed_args = None): - """Verify that we have all positional arguments we need, if not, exit.""" - if needed_args: - needed_list = needed_args.split(" ") - else: - needed_list = [] - len_need = len(needed_list) - len_have = len(args) - if len_have > len_need: - parser.error("too many arguments") - elif len_have < len_need: - parser.error("no %s specified" % needed_list[len_have]) - -class IPAConfig: - def __init__(self): - self.default_realm = None - self.default_server = [] - self.default_domain = None - - def get_realm(self): - if self.default_realm: - return self.default_realm - else: - raise IPAConfigError("no default realm") - - def get_server(self): - if len(self.default_server): - return self.default_server - else: - raise IPAConfigError("no default server") - - def get_domain(self): - if self.default_domain: - return self.default_domain - else: - raise IPAConfigError("no default domain") - -# Global library config -config = IPAConfig() - -def __parse_config(discover_server = True): - p = ConfigParser.SafeConfigParser() - p.read("/etc/ipa/ipa.conf") - - try: - if not config.default_realm: - config.default_realm = p.get("defaults", "realm") - except: - pass - if discover_server: - try: - s = p.get("defaults", "server") - config.default_server.extend(re.sub("\s+", "", s).split(',')) - except: - pass - try: - if not config.default_domain: - config.default_domain = p.get("defaults", "domain") - except: - pass - -def __discover_config(discover_server = True): - rl = 0 - try: - if not config.default_realm: - krbctx = krbV.default_context() - config.default_realm = krbctx.default_realm - if not config.default_realm: - return False - - if not config.default_domain: - #try once with REALM -> domain - dom_name = config.default_realm.lower() - name = "_ldap._tcp."+dom_name+"." - rs = ipa.dnsclient.query(name, ipa.dnsclient.DNS_C_IN, ipa.dnsclient.DNS_T_SRV) - rl = len(rs) - if rl == 0: - #try cycling on domain components of FQDN - dom_name = socket.getfqdn() - while rl == 0: - tok = dom_name.find(".") - if tok == -1: - return False - dom_name = dom_name[tok+1:] - name = "_ldap._tcp." + dom_name + "." - rs = ipa.dnsclient.query(name, ipa.dnsclient.DNS_C_IN, ipa.dnsclient.DNS_T_SRV) - rl = len(rs) - - config.default_domain = dom_name - - if discover_server: - if rl == 0: - name = "_ldap._tcp."+config.default_domain+"." - rs = ipa.dnsclient.query(name, ipa.dnsclient.DNS_C_IN, ipa.dnsclient.DNS_T_SRV) - - for r in rs: - if r.dns_type == ipa.dnsclient.DNS_T_SRV: - rsrv = r.rdata.server.rstrip(".") - config.default_server.append(rsrv) - - except: - pass - -def add_standard_options(parser): - parser.add_option("--realm", dest="realm", help="Override default IPA realm") - parser.add_option("--server", dest="server", help="Override default IPA server") - parser.add_option("--domain", dest="domain", help="Override default IPA DNS domain") - -def init_config(options=None): - if options: - config.default_realm = options.realm - config.default_domain = options.domain - if options.server: - config.default_server.extend(options.server.split(",")) - - if len(config.default_server): - discover_server = False - else: - discover_server = True - __parse_config(discover_server) - __discover_config(discover_server) - - # make sure the server list only contains unique items - new_server = [] - for server in config.default_server: - if server not in new_server: - new_server.append(server) - config.default_server = new_server - - if not config.default_realm: - raise IPAConfigError("IPA realm not found in DNS, in the config file (/etc/ipa/ipa.conf) or on the command line.") - if not config.default_server: - raise IPAConfigError("IPA server not found in DNS, in the config file (/etc/ipa/ipa.conf) or on the command line.") - if not config.default_domain: - raise IPAConfigError("IPA domain not found in the config file (/etc/ipa/ipa.conf) or on the command line.") diff --git a/ipa-python/dnsclient.py b/ipa-python/dnsclient.py deleted file mode 100644 index 58d93d850..000000000 --- a/ipa-python/dnsclient.py +++ /dev/null @@ -1,443 +0,0 @@ -# -# Copyright 2001, 2005 Red Hat, Inc. -# -# This is free software; you can redistribute it and/or modify it -# under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 only -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. -# - -import struct -import socket -import sys - -import acutil - -DNS_C_IN = 1 -DNS_C_CS = 2 -DNS_C_CHAOS = 3 -DNS_C_HS = 4 -DNS_C_ANY = 255 - -DNS_T_A = 1 -DNS_T_NS = 2 -DNS_T_CNAME = 5 -DNS_T_SOA = 6 -DNS_T_NULL = 10 -DNS_T_WKS = 11 -DNS_T_PTR = 12 -DNS_T_HINFO = 13 -DNS_T_MX = 15 -DNS_T_TXT = 16 -DNS_T_SRV = 33 -DNS_T_ANY = 255 - -DEBUG_DNSCLIENT = False - -class DNSQueryHeader: - FORMAT = "!HBBHHHH" - def __init__(self): - self.dns_id = 0 - self.dns_rd = 0 - self.dns_tc = 0 - self.dns_aa = 0 - self.dns_opcode = 0 - self.dns_qr = 0 - self.dns_rcode = 0 - self.dns_z = 0 - self.dns_ra = 0 - self.dns_qdcount = 0 - self.dns_ancount = 0 - self.dns_nscount = 0 - self.dns_arcount = 0 - - def pack(self): - return struct.pack(DNSQueryHeader.FORMAT, - self.dns_id, - (self.dns_rd & 1) | - (self.dns_tc & 1) << 1 | - (self.dns_aa & 1) << 2 | - (self.dns_opcode & 15) << 3 | - (self.dns_qr & 1) << 7, - (self.dns_rcode & 15) | - (self.dns_z & 7) << 4 | - (self.dns_ra & 1) << 7, - self.dns_qdcount, - self.dns_ancount, - self.dns_nscount, - self.dns_arcount) - - def unpack(self, data): - (self.dns_id, byte1, byte2, self.dns_qdcount, self.dns_ancount, - self.dns_nscount, self.dns_arcount) = struct.unpack(DNSQueryHeader.FORMAT, data[0:self.size()]) - self.dns_rd = byte1 & 1 - self.dns_tc = (byte1 >> 1) & 1 - self.dns_aa = (byte1 >> 2) & 1 - self.dns_opcode = (byte1 >> 3) & 15 - self.dns_qr = (byte1 >> 7) & 1 - self.dns_rcode = byte2 & 15 - self.dns_z = (byte2 >> 4) & 7 - self.dns_ra = (byte1 >> 7) & 1 - - def size(self): - return struct.calcsize(DNSQueryHeader.FORMAT) - -def unpackQueryHeader(data): - header = DNSQueryHeader() - header.unpack(data) - return header - -class DNSResult: - FORMAT = "!HHIH" - QFORMAT = "!HH" - def __init__(self): - self.dns_name = "" - self.dns_type = 0 - self.dns_class = 0 - self.dns_ttl = 0 - self.dns_rlength = 0 - self.rdata = None - - def unpack(self, data): - (self.dns_type, self.dns_class, self.dns_ttl, - self.dns_rlength) = struct.unpack(DNSResult.FORMAT, data[0:self.size()]) - - def qunpack(self, data): - (self.dns_type, self.dns_class) = struct.unpack(DNSResult.QFORMAT, data[0:self.qsize()]) - - def size(self): - return struct.calcsize(DNSResult.FORMAT) - - def qsize(self): - return struct.calcsize(DNSResult.QFORMAT) - -class DNSRData: - def __init__(self): - pass - -#typedef struct dns_rr_a { -# u_int32_t address; -#} dns_rr_a_t; -# -#typedef struct dns_rr_cname { -# const char *cname; -#} dns_rr_cname_t; -# -#typedef struct dns_rr_hinfo { -# const char *cpu, *os; -#} dns_rr_hinfo_t; -# -#typedef struct dns_rr_mx { -# u_int16_t preference; -# const char *exchange; -#} dns_rr_mx_t; -# -#typedef struct dns_rr_null { -# unsigned const char *data; -#} dns_rr_null_t; -# -#typedef struct dns_rr_ns { -# const char *nsdname; -#} dns_rr_ns_t; -# -#typedef struct dns_rr_ptr { -# const char *ptrdname; -#} dns_rr_ptr_t; -# -#typedef struct dns_rr_soa { -# const char *mname; -# const char *rname; -# u_int32_t serial; -# int32_t refresh; -# int32_t retry; -# int32_t expire; -# int32_t minimum; -#} dns_rr_soa_t; -# -#typedef struct dns_rr_txt { -# const char *data; -#} dns_rr_txt_t; -# -#typedef struct dns_rr_srv { -# const char *server; -# u_int16_t priority; -# u_int16_t weight; -# u_int16_t port; -#} dns_rr_srv_t; - -def dnsNameToLabel(name): - out = "" - name = name.split(".") - for part in name: - out += chr(len(part)) + part - return out - -def dnsFormatQuery(query, qclass, qtype): - header = DNSQueryHeader() - - header.dns_id = 0 # FIXME: id = 0 - header.dns_rd = 1 # don't know why the original code didn't request recursion for non SOA requests - header.dns_qr = 0 # query - header.dns_opcode = 0 # standard query - header.dns_qdcount = 1 # single query - - qlabel = dnsNameToLabel(query) - if not qlabel: - return "" - - out = header.pack() + qlabel - out += chr(qtype >> 8) - out += chr(qtype & 0xff) - out += chr(qclass >> 8) - out += chr(qclass & 0xff) - - return out - -def dnsParseLabel(label, base): - # returns (output, rest) - if not label: - return ("", None) - - update = 1 - rest = label - output = "" - skip = 0 - - try: - while ord(rest[0]): - if ord(rest[0]) & 0xc0: - rest = base[((ord(rest[0]) & 0x3f) << 8) + ord(rest[1]):] - if update: - skip += 2 - update = 0 - continue - output += rest[1:ord(rest[0]) + 1] + "." - if update: - skip += ord(rest[0]) + 1 - rest = rest[ord(rest[0]) + 1:] - except IndexError: - return ("", None) - return (label[skip+update:], output) - -def dnsParseA(data, base): - rdata = DNSRData() - if len(data) < 4: - rdata.address = 0 - return None - - rdata.address = (ord(data[0])<<24) | (ord(data[1])<<16) | (ord(data[2])<<8) | (ord(data[3])<<0) - - if DEBUG_DNSCLIENT: - print "A = %d.%d.%d.%d." % (ord(data[0]), ord(data[1]), ord(data[2]), ord(data[3])) - return rdata - -def dnsParseText(data): - if len(data) < 1: - return ("", None) - tlen = ord(data[0]) - if len(data) < tlen + 1: - return ("", None) - return (data[tlen+1:], data[1:tlen+1]) - -def dnsParseNS(data, base): - rdata = DNSRData() - (rest, rdata.nsdname) = dnsParseLabel(data, base) - if DEBUG_DNSCLIENT: - print "NS DNAME = \"%s\"." % (rdata.nsdname) - return rdata - -def dnsParseCNAME(data, base): - rdata = DNSRData() - (rest, rdata.cname) = dnsParseLabel(data, base) - if DEBUG_DNSCLIENT: - print "CNAME = \"%s\"." % (rdata.cname) - return rdata - -def dnsParseSOA(data, base): - rdata = DNSRData() - format = "!IIIII" - - (rest, rdata.mname) = dnsParseLabel(data, base) - if rdata.mname is None: - return None - (rest, rdata.rname) = dnsParseLabel(rest, base) - if rdata.rname is None: - return None - if len(rest) < struct.calcsize(format): - return None - - (rdata.serial, rdata.refresh, rdata.retry, rdata.expire, - rdata.minimum) = struct.unpack(format, rest[:struct.calcsize(format)]) - - if DEBUG_DNSCLIENT: - print "SOA(mname) = \"%s\"." % rdata.mname - print "SOA(rname) = \"%s\"." % rdata.rname - print "SOA(serial) = %d." % rdata.serial - print "SOA(refresh) = %d." % rdata.refresh - print "SOA(retry) = %d." % rdata.retry - print "SOA(expire) = %d." % rdata.expire - print "SOA(minimum) = %d." % rdata.minimum - return rdata - -def dnsParseNULL(data, base): - # um, yeah - return None - -def dnsParseWKS(data, base): - return None - -def dnsParseHINFO(data, base): - rdata = DNSRData() - (rest, rdata.cpu) = dnsParseText(data) - if rest: - (rest, rdata.os) = dnsParseText(rest) - if DEBUG_DNSCLIENT: - print "HINFO(cpu) = \"%s\"." % rdata.cpu - print "HINFO(os) = \"%s\"." % rdata.os - return rdata - -def dnsParseMX(data, base): - rdata = DNSRData() - if len(data) < 2: - return None - rdata.preference = (ord(data[0]) << 8) | ord(data[1]) - (rest, rdata.exchange) = dnsParseLabel(data[2:], base) - if DEBUG_DNSCLIENT: - print "MX(exchanger) = \"%s\"." % rdata.exchange - print "MX(preference) = %d." % rdata.preference - return rdata - -def dnsParseTXT(data, base): - rdata = DNSRData() - (rest, rdata.data) = dnsParseText(data) - if DEBUG_DNSCLIENT: - print "TXT = \"%s\"." % rdata.data - return rdata - -def dnsParsePTR(data, base): - rdata = DNSRData() - (rest, rdata.ptrdname) = dnsParseLabel(data, base) - if DEBUG_DNSCLIENT: - print "PTR = \"%s\"." % rdata.ptrdname - return rdata - -def dnsParseSRV(data, base): - rdata = DNSRData() - format = "!HHH" - flen = struct.calcsize(format) - if len(data) < flen: - return None - - (rdata.priority, rdata.weight, rdata.port) = struct.unpack(format, data[:flen]) - (rest, rdata.server) = dnsParseLabel(data[flen:], base) - if DEBUG_DNSCLIENT: - print "SRV(server) = \"%s\"." % rdata.server - print "SRV(weight) = %d." % rdata.weight - print "SRV(priority) = %d." % rdata.priority - print "SRV(port) = %d." % rdata.port - return rdata - -def dnsParseResults(results): - try: - header = unpackQueryHeader(results) - except struct.error: - return [] - - if header.dns_qr != 1: # should be a response - return [] - - if header.dns_rcode != 0: # should be no error - return [] - - rest = results[header.size():] - - rrlist = [] - - for i in xrange(header.dns_qdcount): - if not rest: - return [] - - qq = DNSResult() - - (rest, label) = dnsParseLabel(rest, results) - if label is None: - return [] - - if len(rest) < qq.qsize(): - return [] - - qq.qunpack(rest) - - rest = rest[qq.qsize():] - - if DEBUG_DNSCLIENT: - print "Queried for '%s', class = %d, type = %d." % (label, - qq.dns_class, qq.dns_type) - - for i in xrange(header.dns_ancount + header.dns_nscount + header.dns_arcount): - (rest, label) = dnsParseLabel(rest, results) - if label is None: - return [] - - rr = DNSResult() - - rr.dns_name = label - - if len(rest) < rr.size(): - return [] - - rr.unpack(rest) - - rest = rest[rr.size():] - - if DEBUG_DNSCLIENT: - print "Answer %d for '%s', class = %d, type = %d, ttl = %d." % (i, - rr.dns_name, rr.dns_class, rr.dns_type, - rr.dns_ttl) - - if len(rest) < rr.dns_rlength: - if DEBUG_DNSCLIENT: - print "Answer too short." - return [] - - fmap = { DNS_T_A: dnsParseA, DNS_T_NS: dnsParseNS, - DNS_T_CNAME: dnsParseCNAME, DNS_T_SOA: dnsParseSOA, - DNS_T_NULL: dnsParseNULL, DNS_T_WKS: dnsParseWKS, - DNS_T_PTR: dnsParsePTR, DNS_T_HINFO: dnsParseHINFO, - DNS_T_MX: dnsParseMX, DNS_T_TXT: dnsParseTXT, - DNS_T_SRV: dnsParseSRV} - - if not rr.dns_type in fmap: - if DEBUG_DNSCLIENT: - print "Don't know how to parse RR type %d!" % rr.dns_type - else: - rr.rdata = fmap[rr.dns_type](rest[:rr.dns_rlength], results) - - rest = rest[rr.dns_rlength:] - rrlist += [rr] - - return rrlist - -def query(query, qclass, qtype): - qdata = dnsFormatQuery(query, qclass, qtype) - if not qdata: - return [] - answer = acutil.res_send(qdata) - if not answer: - return [] - return dnsParseResults(answer) - -if __name__ == '__main__': - DEBUG_DNSCLIENT = True - print "Sending query." - rr = query(len(sys.argv) > 1 and sys.argv[1] or "devserv.devel.redhat.com.", - DNS_C_IN, DNS_T_ANY) - sys.exit(0) diff --git a/ipa-python/entity.py b/ipa-python/entity.py deleted file mode 100644 index 64db350ba..000000000 --- a/ipa-python/entity.py +++ /dev/null @@ -1,202 +0,0 @@ -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -import ldap -import ldif -import re -import cStringIO -import copy - -import ipa.ipautil - -def utf8_encode_value(value): - if isinstance(value,unicode): - return value.encode('utf-8') - return value - -def utf8_encode_values(values): - if isinstance(values,list) or isinstance(values,tuple): - return map(utf8_encode_value, values) - else: - return utf8_encode_value(values) - -def copy_CIDict(x): - """Do a deep copy of a CIDict""" - y = {} - for key, value in x.iteritems(): - y[copy.deepcopy(key)] = copy.deepcopy(value) - return y - -class Entity: - """This class represents an IPA user. An LDAP entry consists of a DN - and a list of attributes. Each attribute consists of a name and a list of - values. For the time being I will maintain this. - - In python-ldap, entries are returned as a list of 2-tuples. - Instance variables: - dn - string - the string DN of the entry - data - CIDict - case insensitive dict of the attributes and values - orig_data - CIDict - case insentiive dict of the original attributes and values""" - - def __init__(self,entrydata=None): - """data is the raw data returned from the python-ldap result method, - which is a search result entry or a reference or None. - If creating a new empty entry, data is the string DN.""" - if entrydata: - if isinstance(entrydata,tuple): - self.dn = entrydata[0] - self.data = ipa.ipautil.CIDict(entrydata[1]) - elif isinstance(entrydata,str) or isinstance(entrydata,unicode): - self.dn = entrydata - self.data = ipa.ipautil.CIDict() - elif isinstance(entrydata,dict): - self.dn = entrydata['dn'] - del entrydata['dn'] - self.data = ipa.ipautil.CIDict(entrydata) - else: - self.dn = '' - self.data = ipa.ipautil.CIDict() - - self.orig_data = ipa.ipautil.CIDict(copy_CIDict(self.data)) - - def __nonzero__(self): - """This allows us to do tests like if entry: returns false if there is no data, - true otherwise""" - return self.data != None and len(self.data) > 0 - - def hasAttr(self,name): - """Return True if this entry has an attribute named name, False otherwise""" - return self.data and self.data.has_key(name) - - def __setattr__(self,name,value): - """One should use setValue() or setValues() to set values except for - dn and data which are special.""" - if name != 'dn' and name != 'data' and name != 'orig_data': - raise KeyError, 'use setValue() or setValues()' - else: - self.__dict__[name] = value - - def __getattr__(self,name): - """If name is the name of an LDAP attribute, return the first value for that - attribute - equivalent to getValue - this allows the use of - entry.cn - instead of - entry.getValue('cn') - This also allows us to return None if an attribute is not found rather than - throwing an exception""" - return self.getValue(name) - - def getValues(self,name): - """Get the list (array) of values for the attribute named name""" - return self.data.get(name) - - def getValue(self,name,default=None): - """Get the first value for the attribute named name""" - value = self.data.get(name,default) - if isinstance(value,list) or isinstance(value,tuple): - return value[0] - else: - return value - - def setValue(self,name,*value): - """Value passed in may be a single value, several values, or a single sequence. - For example: - ent.setValue('name', 'value') - ent.setValue('name', 'value1', 'value2', ..., 'valueN') - ent.setValue('name', ['value1', 'value2', ..., 'valueN']) - ent.setValue('name', ('value1', 'value2', ..., 'valueN')) - Since *value is a tuple, we may have to extract a list or tuple from that - tuple as in the last two examples above""" - if (len(value) < 1): - return - if (len(value) == 1): - self.data[name] = utf8_encode_values(value[0]) - else: - self.data[name] = utf8_encode_values(value) - - setValues = setValue - - def setValueNotEmpty(self,name,*value): - """Similar to setValue() but will not set an empty field. This - is an attempt to avoid adding empty attributes.""" - if (len(value) >= 1) and value[0] and len(value[0]) > 0: - if isinstance(value[0], list): - if len(value[0][0]) > 0: - self.setValue(name, *value) - return - else: - self.setValue(name, *value) - return - - # At this point we have an empty incoming value. See if they are - # trying to erase the current value. If so we'll delete it so - # it gets marked as removed in the modlist. - v = self.getValues(name) - if v: - self.delValue(name) - - return - - def delValue(self,name): - """Remove the attribute named name.""" - if self.data.get(name,None): - del self.data[name] - - def toTupleList(self): - """Convert the attrs and values to a list of 2-tuples. The first element - of the tuple is the attribute name. The second element is either a - single value or a list of values.""" - return self.data.items() - - def toDict(self): - """Convert the attrs and values to a dict. The dict is keyed on the - attribute name. The value is either single value or a list of values.""" - result = ipa.ipautil.CIDict(self.data) - result['dn'] = self.dn - return result - - def attrList(self): - """Return a list of all attributes in the entry""" - return self.data.keys() - - def origDataDict(self): - """Returns a dict of the original values of the user. Used for updates.""" - result = ipa.ipautil.CIDict(self.orig_data) - result['dn'] = self.dn - return result - -# def __str__(self): -# """Convert the Entry to its LDIF representation""" -# return self.__repr__() -# -# # the ldif class base64 encodes some attrs which I would rather see in raw form - to -# # encode specific attrs as base64, add them to the list below -# ldif.safe_string_re = re.compile('^$') -# base64_attrs = ['nsstate', 'krbprincipalkey', 'krbExtraData'] -# -# def __repr__(self): -# """Convert the Entry to its LDIF representation""" -# sio = cStringIO.StringIO() -# # what's all this then? the unparse method will currently only accept -# # a list or a dict, not a class derived from them. self.data is a -# # cidict, so unparse barfs on it. I've filed a bug against python-ldap, -# # but in the meantime, we have to convert to a plain old dict for printing -# # I also don't want to see wrapping, so set the line width really high (1000) -# newdata = {} -# newdata.update(self.data) -# ldif.LDIFWriter(sio,User.base64_attrs,1000).unparse(self.dn,newdata) -# return sio.getvalue() diff --git a/ipa-python/ipa-python.spec.in b/ipa-python/ipa-python.spec.in deleted file mode 100755 index a41a413e6..000000000 --- a/ipa-python/ipa-python.spec.in +++ /dev/null @@ -1,82 +0,0 @@ -Name: ipa-python -Version: __VERSION__ -Release: __RELEASE__%{?dist} -Summary: IPA authentication server - -Group: System Environment/Base -License: GPLv2 -URL: http://www.freeipa.org -Source0: http://www.freeipa.org/downloads/%{name}-%{version}.tgz -BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n) -BuildArch: noarch -BuildRequires: python-devel -Requires: python-kerberos gnupg - -%{!?python_sitelib: %define python_sitelib %(%{__python} -c "from distutils.sysconfig import get_python_lib; print get_python_lib()")} - -%description -IPA is a server for identity, policy, and audit. - -%prep -%setup -q - -%build - -%install -rm -rf %{buildroot} -%{__python} setup.py install --no-compile --root=%{buildroot} - -%clean -rm -rf %{buildroot} - -%files -%defattr(-,root,root,-) -%{python_sitelib}/* -%config(noreplace) %{_sysconfdir}/ipa/ipa.conf - -%changelog -* Thu Apr 3 2008 Rob Crittenden <rcritten@redhat.com> - 1.0.0-1 -- Version bump for release - -* Thu Feb 21 2008 Rob Crittenden <rcritten@redhat.com> - 0.99.0-1 -- Version bump for release - -* Thu Jan 31 2008 Rob Crittenden <rcritten@redhat.com> - 0.6.0-4 -- Marked with wrong license. IPA is GPLv2. - -* Thu Jan 24 2008 Rob Crittenden <rcritten@redhat.com> - 0.6.0-3 -- Use new name of PyKerberos, python-kerberos, in Requires - -* Thu Jan 17 2008 Rob Crittenden <rcritten@redhat.com> - 0.6.0-2 -- Fixed License in specfile - -* Fri Dec 21 2007 Karl MacMillan <kmacmill@redhat.com> - 0.6.0-1 -- Version bump for release - -* Wed Nov 21 2007 Karl MacMillan <kmacmill@redhat.com> - 0.5.0-1 -- Version bump for release and rename of rpm - -* Thu Nov 1 2007 Karl MacMillan <kmacmill@redhat.com> - 0.4.1-1 -- Version bump for release - -* Wed Oct 17 2007 Rob Crittenden <rcritten@redhat.com> - 0.4.0-2 -- Use new python setup.py build script - -* Tue Oct 2 2007 Karl MacMillan <kmacmill@redhat.com> - 0.4.0-1 -- Milestone 4 - -* Mon Sep 10 2007 Karl MacMillan <kmacmill@redhat.com> - 0.3.0-1 -- Milestone 3 - -* Fri Aug 17 2007 Karl MacMillan <kmacmill@redhat.com> = 0.2.0-4 -- Added PyKerberos dep. - -* Mon Aug 5 2007 Rob Crittenden <rcritten@redhat.com> - 0.1.0-3 -- Abstracted client class to work directly or over RPC - -* Wed Aug 1 2007 Rob Crittenden <rcritten@redhat.com> - 0.1.0-2 -- Add User class -- Add kerberos authentication to the XML-RPC request made from tools. - -* Fri Jul 27 2007 Karl MacMillan <kmacmill@localhost.localdomain> - 0.1.0-1 -- Initial rpm version diff --git a/ipa-python/ipa.conf b/ipa-python/ipa.conf deleted file mode 100644 index 516f764d5..000000000 --- a/ipa-python/ipa.conf +++ /dev/null @@ -1,3 +0,0 @@ -[defaults] -# realm = EXAMPLE.COM -# server = ipa.example.com diff --git a/ipa-python/ipautil.py b/ipa-python/ipautil.py deleted file mode 100644 index aa49c3ba3..000000000 --- a/ipa-python/ipautil.py +++ /dev/null @@ -1,969 +0,0 @@ -# Authors: Simo Sorce <ssorce@redhat.com> -# -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -SHARE_DIR = "/usr/share/ipa/" -PLUGINS_SHARE_DIR = "/usr/share/ipa/plugins" - -import string -import tempfile -import logging -import subprocess -import random -import os, sys, traceback, readline -import stat -import shutil - -from ipa import ipavalidate -from types import * - -import re -import xmlrpclib -import datetime -from ipa import config -try: - from subprocess import CalledProcessError - class CalledProcessError(subprocess.CalledProcessError): - def __init__(self, returncode, cmd): - super(CalledProcessError, self).__init__(returncode, cmd) -except ImportError: - # Python 2.4 doesn't implement CalledProcessError - class CalledProcessError(Exception): - """This exception is raised when a process run by check_call() returns - a non-zero exit status. The exit status will be stored in the - returncode attribute.""" - def __init__(self, returncode, cmd): - self.returncode = returncode - self.cmd = cmd - def __str__(self): - return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode) - -def get_domain_name(): - try: - config.init_config() - domain_name = config.config.get_domain() - except Exception, e: - return None - - return domain_name - -def realm_to_suffix(realm_name): - s = realm_name.split(".") - terms = ["dc=" + x.lower() for x in s] - return ",".join(terms) - -def template_str(txt, vars): - return string.Template(txt).substitute(vars) - -def template_file(infilename, vars): - txt = open(infilename).read() - return template_str(txt, vars) - -def write_tmp_file(txt): - fd = tempfile.NamedTemporaryFile() - fd.write(txt) - fd.flush() - - return fd - -def run(args, stdin=None): - if stdin: - p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) - stdout,stderr = p.communicate(stdin) - else: - p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) - stdout,stderr = p.communicate() - - logging.info(stdout) - logging.info(stderr) - - if p.returncode != 0: - raise CalledProcessError(p.returncode, ' '.join(args)) - - return (stdout, stderr) - -def file_exists(filename): - try: - mode = os.stat(filename)[stat.ST_MODE] - if stat.S_ISREG(mode): - return True - else: - return False - except: - return False - -def dir_exists(filename): - try: - mode = os.stat(filename)[stat.ST_MODE] - if stat.S_ISDIR(mode): - return True - else: - return False - except: - return False - -def install_file(fname, dest): - if file_exists(dest): - os.rename(dest, dest + ".orig") - shutil.move(fname, dest) - -def backup_file(fname): - if file_exists(fname): - os.rename(fname, fname + ".orig") - -# uses gpg to compress and encrypt a file -def encrypt_file(source, dest, password, workdir = None): - if type(source) is not StringType or not len(source): - raise ValueError('Missing Source File') - #stat it so that we get back an exception if it does no t exist - os.stat(source) - - if type(dest) is not StringType or not len(dest): - raise ValueError('Missing Destination File') - - if type(password) is not StringType or not len(password): - raise ValueError('Missing Password') - - #create a tempdir so that we can clean up with easily - tempdir = tempfile.mkdtemp('', 'ipa-', workdir) - gpgdir = tempdir+"/.gnupg" - - try: - try: - #give gpg a fake dir so that we can leater remove all - #the cruft when we clean up the tempdir - os.mkdir(gpgdir) - args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-c', source] - run(args, password) - except: - raise - finally: - #job done, clean up - shutil.rmtree(tempdir, ignore_errors=True) - - -def decrypt_file(source, dest, password, workdir = None): - if type(source) is not StringType or not len(source): - raise ValueError('Missing Source File') - #stat it so that we get back an exception if it does no t exist - os.stat(source) - - if type(dest) is not StringType or not len(dest): - raise ValueError('Missing Destination File') - - if type(password) is not StringType or not len(password): - raise ValueError('Missing Password') - - #create a tempdir so that we can clean up with easily - tempdir = tempfile.mkdtemp('', 'ipa-', workdir) - gpgdir = tempdir+"/.gnupg" - - try: - try: - #give gpg a fake dir so that we can leater remove all - #the cruft when we clean up the tempdir - os.mkdir(gpgdir) - args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-d', source] - run(args, password) - except: - raise - finally: - #job done, clean up - shutil.rmtree(tempdir, ignore_errors=True) - - -class CIDict(dict): - """ - Case-insensitive but case-respecting dictionary. - - This code is derived from python-ldap's cidict.py module, - written by stroeder: http://python-ldap.sourceforge.net/ - - This version extends 'dict' so it works properly with TurboGears. - If you extend UserDict, isinstance(foo, dict) returns false. - """ - - def __init__(self,default=None): - super(CIDict, self).__init__() - self._keys = {} - self.update(default or {}) - - def __getitem__(self,key): - return super(CIDict,self).__getitem__(string.lower(key)) - - def __setitem__(self,key,value): - lower_key = string.lower(key) - self._keys[lower_key] = key - return super(CIDict,self).__setitem__(string.lower(key),value) - - def __delitem__(self,key): - lower_key = string.lower(key) - del self._keys[lower_key] - return super(CIDict,self).__delitem__(string.lower(key)) - - def update(self,dict): - for key in dict.keys(): - self[key] = dict[key] - - def has_key(self,key): - return super(CIDict, self).has_key(string.lower(key)) - - def get(self,key,failobj=None): - try: - return self[key] - except KeyError: - return failobj - - def keys(self): - return self._keys.values() - - def items(self): - result = [] - for k in self._keys.values(): - result.append((k,self[k])) - return result - - def copy(self): - copy = {} - for k in self._keys.values(): - copy[k] = self[k] - return copy - - def iteritems(self): - return self.copy().iteritems() - - def iterkeys(self): - return self.copy().iterkeys() - - def setdefault(self,key,value=None): - try: - return self[key] - except KeyError: - self[key] = value - return value - - def pop(self, key, *args): - try: - value = self[key] - del self[key] - return value - except KeyError: - if len(args) == 1: - return args[0] - raise - - def popitem(self): - (lower_key,value) = super(CIDict,self).popitem() - key = self._keys[lower_key] - del self._keys[lower_key] - - return (key,value) - - -# -# The safe_string_re regexp and needs_base64 function are extracted from the -# python-ldap ldif module, which was -# written by Michael Stroeder <michael@stroeder.com> -# http://python-ldap.sourceforge.net -# -# It was extracted because ipaldap.py is naughtily reaching into the ldif -# module and squashing this regexp. -# -SAFE_STRING_PATTERN = '(^(\000|\n|\r| |:|<)|[\000\n\r\200-\377]+|[ ]+$)' -safe_string_re = re.compile(SAFE_STRING_PATTERN) - -def needs_base64(s): - """ - returns 1 if s has to be base-64 encoded because of special chars - """ - return not safe_string_re.search(s) is None - - -def wrap_binary_data(data): - """Converts all binary data strings into Binary objects for transport - back over xmlrpc.""" - if isinstance(data, str): - if needs_base64(data): - return xmlrpclib.Binary(data) - else: - return data - elif isinstance(data, list) or isinstance(data,tuple): - retval = [] - for value in data: - retval.append(wrap_binary_data(value)) - return retval - elif isinstance(data, dict): - retval = {} - for (k,v) in data.iteritems(): - retval[k] = wrap_binary_data(v) - return retval - else: - return data - - -def unwrap_binary_data(data): - """Converts all Binary objects back into strings.""" - if isinstance(data, xmlrpclib.Binary): - # The data is decoded by the xmlproxy, but is stored - # in a binary object for us. - return str(data) - elif isinstance(data, str): - return data - elif isinstance(data, list) or isinstance(data,tuple): - retval = [] - for value in data: - retval.append(unwrap_binary_data(value)) - return retval - elif isinstance(data, dict): - retval = {} - for (k,v) in data.iteritems(): - retval[k] = unwrap_binary_data(v) - return retval - else: - return data - -class GeneralizedTimeZone(datetime.tzinfo): - """This class is a basic timezone wrapper for the offset specified - in a Generalized Time. It is dst-ignorant.""" - def __init__(self,offsetstr="Z"): - super(GeneralizedTimeZone, self).__init__() - - self.name = offsetstr - self.houroffset = 0 - self.minoffset = 0 - - if offsetstr == "Z": - self.houroffset = 0 - self.minoffset = 0 - else: - if (len(offsetstr) >= 3) and re.match(r'[-+]\d\d', offsetstr): - self.houroffset = int(offsetstr[0:3]) - offsetstr = offsetstr[3:] - if (len(offsetstr) >= 2) and re.match(r'\d\d', offsetstr): - self.minoffset = int(offsetstr[0:2]) - offsetstr = offsetstr[2:] - if len(offsetstr) > 0: - raise ValueError() - if self.houroffset < 0: - self.minoffset *= -1 - - def utcoffset(self, dt): - return datetime.timedelta(hours=self.houroffset, minutes=self.minoffset) - - def dst(self, dt): - return datetime.timedelta(0) - - def tzname(self, dt): - return self.name - - -def parse_generalized_time(timestr): - """Parses are Generalized Time string (as specified in X.680), - returning a datetime object. Generalized Times are stored inside - the krbPasswordExpiration attribute in LDAP. - - This method doesn't attempt to be perfect wrt timezones. If python - can't be bothered to implement them, how can we...""" - - if len(timestr) < 8: - return None - try: - date = timestr[:8] - time = timestr[8:] - - year = int(date[:4]) - month = int(date[4:6]) - day = int(date[6:8]) - - hour = min = sec = msec = 0 - tzone = None - - if (len(time) >= 2) and re.match(r'\d', time[0]): - hour = int(time[:2]) - time = time[2:] - if len(time) >= 2 and (time[0] == "," or time[0] == "."): - hour_fraction = "." - time = time[1:] - while (len(time) > 0) and re.match(r'\d', time[0]): - hour_fraction += time[0] - time = time[1:] - total_secs = int(float(hour_fraction) * 3600) - min, sec = divmod(total_secs, 60) - - if (len(time) >= 2) and re.match(r'\d', time[0]): - min = int(time[:2]) - time = time[2:] - if len(time) >= 2 and (time[0] == "," or time[0] == "."): - min_fraction = "." - time = time[1:] - while (len(time) > 0) and re.match(r'\d', time[0]): - min_fraction += time[0] - time = time[1:] - sec = int(float(min_fraction) * 60) - - if (len(time) >= 2) and re.match(r'\d', time[0]): - sec = int(time[:2]) - time = time[2:] - if len(time) >= 2 and (time[0] == "," or time[0] == "."): - sec_fraction = "." - time = time[1:] - while (len(time) > 0) and re.match(r'\d', time[0]): - sec_fraction += time[0] - time = time[1:] - msec = int(float(sec_fraction) * 1000000) - - if (len(time) > 0): - tzone = GeneralizedTimeZone(time) - - return datetime.datetime(year, month, day, hour, min, sec, msec, tzone) - - except ValueError: - return None - -def ipa_generate_password(): - rndpwd = '' - r = random.SystemRandom() - for x in range(12): - rndpwd += chr(r.randint(32,126)) - return rndpwd - - -def format_list(items, quote=None, page_width=80): - '''Format a list of items formatting them so they wrap to fit the - available width. The items will be sorted. - - The items may optionally be quoted. The quote parameter may either be - a string, in which case it is added before and after the item. Or the - quote parameter may be a pair (either a tuple or list). In this case - quote[0] is left hand quote and quote[1] is the right hand quote. - ''' - left_quote = right_quote = '' - num_items = len(items) - if not num_items: return "" - - if quote is not None: - if type(quote) in StringTypes: - left_quote = right_quote = quote - elif type(quote) is TupleType or type(quote) is ListType: - left_quote = quote[0] - right_quote = quote[1] - - max_len = max(map(len, items)) - max_len += len(left_quote) + len(right_quote) - num_columns = (page_width + max_len) / (max_len+1) - num_rows = (num_items + num_columns - 1) / num_columns - items.sort() - - rows = [''] * num_rows - i = row = col = 0 - - while i < num_items: - row = 0 - if col == 0: - separator = '' - else: - separator = ' ' - - while i < num_items and row < num_rows: - rows[row] += "%s%*s" % (separator, -max_len, "%s%s%s" % (left_quote, items[i], right_quote)) - i += 1 - row += 1 - col += 1 - return '\n'.join(rows) - -key_value_re = re.compile("(\w+)\s*=\s*(([^\s'\\\"]+)|(?P<quote>['\\\"])((?P=quote)|(.*?[^\\\])(?P=quote)))") -def parse_key_value_pairs(input): - ''' Given a string composed of key=value pairs parse it and return - a dict of the key/value pairs. Keys must be a word, a key must be followed - by an equal sign (=) and a value. The value may be a single word or may be - quoted. Quotes may be either single or double quotes, but must be balanced. - Inside the quoted text the same quote used to start the quoted value may be - used if it is escaped by preceding it with a backslash (\). - White space between the key, the equal sign, and the value is ignored. - Values are always strings. Empty values must be specified with an empty - quoted string, it's value after parsing will be an empty string. - - Example: The string - - arg0 = '' arg1 = 1 arg2='two' arg3 = "three's a crowd" arg4 = "this is a \" quote" - - will produce - - arg0= arg1=1 - arg2=two - arg3=three's a crowd - arg4=this is a " quote - ''' - - kv_dict = {} - for match in key_value_re.finditer(input): - key = match.group(1) - quote = match.group('quote') - if match.group(5): - value = match.group(6) - if value is None: value = '' - value = re.sub('\\\%s' % quote, quote, value) - else: - value = match.group(2) - kv_dict[key] = value - return kv_dict - -def parse_items(text): - '''Given text with items separated by whitespace or comma, return a list of those items''' - split_re = re.compile('[ ,\t\n]+') - items = split_re.split(text) - for item in items[:]: - if not item: items.remove(item) - return items - -def read_pairs_file(filename): - comment_re = re.compile('#.*$', re.MULTILINE) - if filename == '-': - fd = sys.stdin - else: - fd = open(filename) - text = fd.read() - text = comment_re.sub('', text) # kill comments - pairs = parse_key_value_pairs(text) - if fd != sys.stdin: fd.close() - return pairs - -def read_items_file(filename): - comment_re = re.compile('#.*$', re.MULTILINE) - if filename == '-': - fd = sys.stdin - else: - fd = open(filename) - text = fd.read() - text = comment_re.sub('', text) # kill comments - items = parse_items(text) - if fd != sys.stdin: fd.close() - return items - -def user_input(prompt, default = None, allow_empty = True): - if default == None: - while True: - ret = raw_input("%s: " % prompt) - if allow_empty or ret.strip(): - return ret - - if isinstance(default, basestring): - while True: - ret = raw_input("%s [%s]: " % (prompt, default)) - if not ret and (allow_empty or default): - return default - elif ret.strip(): - return ret - if isinstance(default, bool): - if default: - choice = "yes" - else: - choice = "no" - while True: - ret = raw_input("%s [%s]: " % (prompt, choice)) - if not ret: - return default - elif ret.lower()[0] == "y": - return True - elif ret.lower()[0] == "n": - return False - if isinstance(default, int): - while True: - try: - ret = raw_input("%s [%s]: " % (prompt, default)) - if not ret: - return default - ret = int(ret) - except ValueError: - pass - else: - return ret - -def user_input_plain(prompt, default = None, allow_empty = True, allow_spaces = True): - while True: - ret = user_input(prompt, default, allow_empty) - if ipavalidate.Plain(ret, not allow_empty, allow_spaces): - return ret - -def user_input_path(prompt, default = None, allow_empty = True): - if default != None and allow_empty: - prompt += " (enter \"none\" for empty)" - while True: - ret = user_input(prompt, default, allow_empty) - if allow_empty and ret.lower() == "none": - return "" - if ipavalidate.Path(ret, not allow_empty): - return ret - -class AttributeValueCompleter: - ''' - Gets input from the user in the form "lhs operator rhs" - TAB completes partial input. - lhs completes to a name in @lhs_names - The lhs is fully parsed if a lhs_delim delimiter is seen, then TAB will - complete to the operator and a default value. - Default values for a lhs value can specified as: - - a string, all lhs values will use this default - - a dict, the lhs value is looked up in the dict to return the default or None - - a function with a single arg, the lhs value, it returns the default or None - - After creating the completer you must open it to set the terminal - up, Then get a line of input from the user by calling read_input() - which returns two values, the lhs and rhs, which might be None if - lhs or rhs was not parsed. After you are done getting input you - should close the completer to restore the terminal. - - Example: (note this is essentially what the convenience function get_pairs() does) - - This will allow the user to autocomplete foo & foobar, both have - defaults defined in a dict. In addition the foobar attribute must - be specified before the prompting loop will exit. Also, this - example show how to require that each attrbute entered by the user - is valid. - - attrs = ['foo', 'foobar'] - defaults = {'foo' : 'foo_default', 'foobar' : 'foobar_default'} - mandatory_attrs = ['foobar'] - - c = AttributeValueCompleter(attrs, defaults) - c.open() - mandatory_attrs_remaining = mandatory_attrs[:] - - while True: - if mandatory_attrs_remaining: - attribute, value = c.read_input("Enter: ", mandatory_attrs_remaining[0]) - try: - mandatory_attrs_remaining.remove(attribute) - except ValueError: - pass - else: - attribute, value = c.read_input("Enter: ") - if attribute is None: - # Are we done? - if mandatory_attrs_remaining: - print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining)) - continue - else: - break - if attribute not in attrs: - print "ERROR: %s is not a valid attribute" % (attribute) - else: - print "got '%s' = '%s'" % (attribute, value) - - c.close() - print "exiting..." - ''' - - def __init__(self, lhs_names, default_value=None, lhs_regexp=r'^\s*(?P<lhs>[^ =]+)', lhs_delims=' =', - operator='=', strip_rhs=True): - self.lhs_names = lhs_names - self.default_value = default_value - # lhs_regexp must have named group 'lhs' which returns the contents of the lhs - self.lhs_regexp = lhs_regexp - self.lhs_re = re.compile(self.lhs_regexp) - self.lhs_delims = lhs_delims - self.operator = operator - self.strip_rhs = strip_rhs - self.pairs = None - self._reset() - - def _reset(self): - self.lhs = None - self.lhs_complete = False - self.operator_complete = False - self.rhs = None - - def open(self): - # Save state - self.prev_completer = readline.get_completer() - self.prev_completer_delims = readline.get_completer_delims() - - # Set up for ourself - readline.parse_and_bind("tab: complete") - readline.set_completer(self.complete) - readline.set_completer_delims(self.lhs_delims) - - def close(self): - # Restore previous state - readline.set_completer_delims(self.prev_completer_delims) - readline.set_completer(self.prev_completer) - - def parse_input(self): - '''We are looking for 3 tokens: <lhs,op,rhs> - Extract as much of each token as possible. - Set flags indicating if token is fully parsed. - ''' - try: - self._reset() - buf_len = len(self.line_buffer) - pos = 0 - lhs_match = self.lhs_re.search(self.line_buffer, pos) - if not lhs_match: return # no lhs content - self.lhs = lhs_match.group('lhs') # get lhs contents - pos = lhs_match.end('lhs') # new scanning position - if pos == buf_len: return # nothing after lhs, lhs incomplete - self.lhs_complete = True # something trails the lhs, lhs is complete - operator_beg = self.line_buffer.find(self.operator, pos) # locate operator - if operator_beg == -1: return # did not find the operator - self.operator_complete = True # operator fully parsed - operator_end = operator_beg + len(self.operator) - pos = operator_end # step over the operator - self.rhs = self.line_buffer[pos:] - except Exception, e: - traceback.print_exc() - print "Exception in %s.parse_input(): %s" % (self.__class__.__name__, e) - - def get_default_value(self): - '''default_value can be a string, a dict, or a function. - If it's a string it's a global default for all attributes. - If it's a dict the default is looked up in the dict index by attribute. - If it's a function, the function is called with 1 parameter, the attribute - and it should return the default value for the attriubte or None''' - - if not self.lhs_complete: raise ValueError("attribute not parsed") - - # If the user previously provided a value let that override the supplied default - if self.pairs is not None: - prev_value = self.pairs.get(self.lhs) - if prev_value is not None: return prev_value - - # No previous user provided value, query for a default - default_value_type = type(self.default_value) - if default_value_type is DictType: - return self.default_value.get(self.lhs, None) - elif default_value_type is FunctionType: - return self.default_value(self.lhs) - elif default_value_type is StringsType: - return self.default_value - else: - return None - - def get_lhs_completions(self, text): - if text: - self.completions = [lhs for lhs in self.lhs_names if lhs.startswith(text)] - else: - self.completions = self.lhs_names - - def complete(self, text, state): - self.line_buffer= readline.get_line_buffer() - self.parse_input() - if not self.lhs_complete: - # lhs is not complete, set up to complete the lhs - if state == 0: - beg = readline.get_begidx() - end = readline.get_endidx() - self.get_lhs_completions(self.line_buffer[beg:end]) - if state >= len(self.completions): return None - return self.completions[state] - - - elif not self.operator_complete: - # lhs is complete, but the operator is not so we complete - # by inserting the operator manually. - # Also try to complete the default value at this time. - readline.insert_text('%s ' % self.operator) - default_value = self.get_default_value() - if default_value is not None: - readline.insert_text(default_value) - readline.redisplay() - return None - else: - # lhs and operator are complete, if the the rhs is blank - # (either empty or only only whitespace) then attempt - # to complete by inserting the default value, otherwise - # there is nothing we can complete to so we're done. - if self.rhs.strip(): - return None - default_value = self.get_default_value() - if default_value is not None: - readline.insert_text(default_value) - readline.redisplay() - return None - - def pre_input_hook(self): - readline.insert_text('%s %s ' % (self.initial_lhs, self.operator)) - readline.redisplay() - - def read_input(self, prompt, initial_lhs=None): - self.initial_lhs = initial_lhs - try: - self._reset() - if initial_lhs is None: - readline.set_pre_input_hook(None) - else: - readline.set_pre_input_hook(self.pre_input_hook) - self.line_buffer = raw_input(prompt).strip() - self.parse_input() - if self.strip_rhs and self.rhs is not None: - return self.lhs, self.rhs.strip() - else: - return self.lhs, self.rhs - except EOFError: - return None, None - - def get_pairs(self, prompt, mandatory_attrs=None, validate_callback=None, must_match=True, value_required=True): - self.pairs = {} - if mandatory_attrs: - mandatory_attrs_remaining = mandatory_attrs[:] - else: - mandatory_attrs_remaining = [] - - print "Enter name = value" - print "Press <ENTER> to accept, a blank line terminates input" - print "Pressing <TAB> will auto completes name, assignment, and value" - print - while True: - if mandatory_attrs_remaining: - attribute, value = self.read_input(prompt, mandatory_attrs_remaining[0]) - else: - attribute, value = self.read_input(prompt) - if attribute is None: - # Are we done? - if mandatory_attrs_remaining: - print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining)) - continue - else: - break - if value is None: - if value_required: - print "ERROR: you must specify a value for %s" % attribute - continue - else: - if must_match and attribute not in self.lhs_names: - print "ERROR: %s is not a valid name" % (attribute) - continue - if validate_callback is not None: - if not validate_callback(attribute, value): - print "ERROR: %s is not valid for %s" % (value, attribute) - continue - try: - mandatory_attrs_remaining.remove(attribute) - except ValueError: - pass - - self.pairs[attribute] = value - return self.pairs - -class ItemCompleter: - ''' - Prompts the user for items in a list of items with auto completion. - TAB completes partial input. - More than one item can be specifed during input, whitespace and/or comma's seperate. - Example: - - possible_items = ['foo', 'bar'] - c = ItemCompleter(possible_items) - c.open() - # Use read_input() to limit input to a single carriage return (e.g. <ENTER>) - #items = c.read_input("Enter: ") - # Use get_items to iterate until a blank line is entered. - items = c.get_items("Enter: ") - c.close() - print "items=%s" % (items) - - ''' - - def __init__(self, items): - self.items = items - self.initial_input = None - self.item_delims = ' \t,' - self.split_re = re.compile('[%s]+' % self.item_delims) - - def open(self): - # Save state - self.prev_completer = readline.get_completer() - self.prev_completer_delims = readline.get_completer_delims() - - # Set up for ourself - readline.parse_and_bind("tab: complete") - readline.set_completer(self.complete) - readline.set_completer_delims(self.item_delims) - - def close(self): - # Restore previous state - readline.set_completer_delims(self.prev_completer_delims) - readline.set_completer(self.prev_completer) - - def get_item_completions(self, text): - if text: - self.completions = [lhs for lhs in self.items if lhs.startswith(text)] - else: - self.completions = self.items - - def complete(self, text, state): - self.line_buffer= readline.get_line_buffer() - if state == 0: - beg = readline.get_begidx() - end = readline.get_endidx() - self.get_item_completions(self.line_buffer[beg:end]) - if state >= len(self.completions): return None - return self.completions[state] - - def pre_input_hook(self): - readline.insert_text('%s %s ' % (self.initial_input, self.operator)) - readline.redisplay() - - def read_input(self, prompt, initial_input=None): - items = [] - - self.initial_input = initial_input - try: - if initial_input is None: - readline.set_pre_input_hook(None) - else: - readline.set_pre_input_hook(self.pre_input_hook) - self.line_buffer = raw_input(prompt).strip() - items = self.split_re.split(self.line_buffer) - for item in items[:]: - if not item: items.remove(item) - return items - except EOFError: - return items - - def get_items(self, prompt, must_match=True): - items = [] - - print "Enter name [name ...]" - print "Press <ENTER> to accept, blank line or control-D terminates input" - print "Pressing <TAB> auto completes name" - print - while True: - new_items = self.read_input(prompt) - if not new_items: break - for item in new_items: - if must_match: - if item not in self.items: - print "ERROR: %s is not valid" % (item) - continue - if item in items: continue - items.append(item) - - return items - -def get_gsserror(e): - """A GSSError exception looks differently in python 2.4 than it does - in python 2.5, deal with it.""" - - try: - primary = e[0] - secondary = e[1] - except: - primary = e[0][0] - secondary = e[0][1] - - return (primary[0], secondary[0]) diff --git a/ipa-python/ipavalidate.py b/ipa-python/ipavalidate.py deleted file mode 100644 index 63e0a7614..000000000 --- a/ipa-python/ipavalidate.py +++ /dev/null @@ -1,137 +0,0 @@ -# Authors: Rob Crittenden <rcritten@redhat.com> -# -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -import re - -def Email(mail, notEmpty=True): - """Do some basic validation of an e-mail address. - Return True if ok - Return False if not - - If notEmpty is True the this will return an error if the field - is "" or None. - """ - usernameRE = re.compile(r"^[^ \t\n\r@<>()]+$", re.I) - domainRE = re.compile(r"^[a-z0-9][a-z0-9\.\-_]*\.[a-z]+$", re.I) - - if not mail or mail is None: - if notEmpty is True: - return False - else: - return True - - mail = mail.strip() - s = mail.split('@', 1) - try: - username, domain=s - except ValueError: - return False - if not usernameRE.search(username): - return False - if not domainRE.search(domain): - return False - - return True - -def Plain(text, notEmpty=False, allowSpaces=True): - """Do some basic validation of a plain text field - Return True if ok - Return False if not - - If notEmpty is True the this will return an error if the field - is "" or None. - """ - if (text is None) or (not text.strip()): - if notEmpty is True: - return False - else: - return True - - if allowSpaces: - textRE = re.compile(r"^[a-zA-Z_\-0-9\'\ ]*$") - else: - textRE = re.compile(r"^[a-zA-Z_\-0-9\']*$") - if not textRE.search(text): - return False - - return True - -def String(text, notEmpty=False): - """A string type. This is much looser in what it allows than plain""" - - if text is None or not text.strip(): - if notEmpty is True: - return False - else: - return True - - return True - -def Path(text, notEmpty=False): - """Do some basic validation of a path - Return True if ok - Return False if not - - If notEmpty is True the this will return an error if the field - is "" or None. - """ - textRE = re.compile(r"^[a-zA-Z_\-0-9\\ \.\/\\:]*$") - - if not text and notEmpty is True: - return False - - if text is None: - if notEmpty is True: - return False - else: - return True - - if not textRE.search(text): - return False - - return True - -def GoodName(text, notEmpty=False): - """From shadow-utils: - - User/group names must match gnu e-regex: - [a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,30}[a-zA-Z0-9_.$-]? - - as a non-POSIX, extension, allow "$" as the last char for - sake of Samba 3.x "add machine script" - - Return True if ok - Return False if not - """ - textRE = re.compile(r"^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,30}[a-zA-Z0-9_.$-]?$") - - if not text and notEmpty is True: - return False - - if text is None: - if notEmpty is True: - return False - else: - return True - - m = textRE.match(text) - if not m or text != m.group(0): - return False - - return True diff --git a/ipa-python/radius_util.py b/ipa-python/radius_util.py deleted file mode 100644 index 3d2e83e18..000000000 --- a/ipa-python/radius_util.py +++ /dev/null @@ -1,366 +0,0 @@ -# Authors: John Dennis <jdennis@redhat.com> -# -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -import sys -import os -import re -import ldap -import getpass -import ldap.filter - -from ipa import ipautil -from ipa.entity import Entity -import ipa.ipavalidate as ipavalidate - - -__all__ = [ - 'RADIUS_PKG_NAME', - 'RADIUS_PKG_CONFIG_DIR', - 'RADIUS_SERVICE_NAME', - 'RADIUS_USER', - 'RADIUS_IPA_KEYTAB_FILEPATH', - 'RADIUS_LDAP_ATTR_MAP_FILEPATH', - 'RADIUSD_CONF_FILEPATH', - 'RADIUSD_CONF_TEMPLATE_FILEPATH', - 'RADIUSD', - - 'RadiusClient', - 'RadiusProfile', - - 'clients_container', - 'radius_clients_basedn', - 'radius_client_filter', - 'radius_client_dn', - - 'profiles_container', - 'radius_profiles_basedn', - 'radius_profile_filter', - 'radius_profile_dn', - - 'radius_client_ldap_attr_to_radius_attr', - 'radius_client_attr_to_ldap_attr', - - 'radius_profile_ldap_attr_to_radius_attr', - 'radius_profile_attr_to_ldap_attr', - - 'get_secret', - 'validate_ip_addr', - 'validate_secret', - 'validate_name', - 'validate_nastype', - 'validate_desc', - 'validate', - ] - -#------------------------------------------------------------------------------ - -RADIUS_PKG_NAME = 'freeradius' -RADIUS_PKG_CONFIG_DIR = '/etc/raddb' - -RADIUS_SERVICE_NAME = 'radius' -RADIUS_USER = 'radiusd' - -RADIUS_IPA_KEYTAB_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'ipa.keytab') -RADIUS_LDAP_ATTR_MAP_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'ldap.attrmap') -RADIUSD_CONF_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'radiusd.conf') -RADIUSD_CONF_TEMPLATE_FILEPATH = os.path.join(ipautil.PLUGINS_SHARE_DIR, 'radius.radiusd.conf.template') - -RADIUSD = '/usr/sbin/radiusd' - -#------------------------------------------------------------------------------ - -dotted_octet_re = re.compile(r"^(\d+)\.(\d+)\.(\d+)\.(\d+)(/(\d+))?$") -dns_re = re.compile(r"^[a-zA-Z][a-zA-Z0-9.-]+$") -# secret, name, nastype all have 31 char max in freeRADIUS, max ip address len is 255 -valid_secret_len = (1,31) -valid_name_len = (1,31) -valid_nastype_len = (1,31) -valid_ip_addr_len = (1,255) - -valid_ip_addr_msg = '''\ -IP address must be either a DNS name (letters,digits,dot,hyphen, beginning with -a letter),or a dotted octet followed by an optional mask (e.g 192.168.1.0/24)''' - -valid_desc_msg = "Description must text string" - -#------------------------------------------------------------------------------ - -class RadiusClient(Entity): - - def __init2__(self): - pass - -class RadiusProfile(Entity): - - def __init2__(self): - pass - - -#------------------------------------------------------------------------------ - -def reverse_map_dict(src_dict): - reverse_dict = {} - - for k,v in src_dict.items(): - if reverse_dict.has_key(v): - raise ValueError("reverse_map_dict: collision on (%s) with values (%s),(%s)" % \ - v, reverse_dict[v], src_dict[k]) - reverse_dict[v] = k - return reverse_dict - -#------------------------------------------------------------------------------ - -radius_client_ldap_attr_to_radius_attr = ipautil.CIDict({ - 'radiusClientIPAddress' : 'Client-IP-Address', - 'radiusClientSecret' : 'Secret', - 'radiusClientNASType' : 'NAS-Type', - 'radiusClientShortName' : 'Name', - 'description' : 'Description', - }) - -radius_client_attr_to_ldap_attr = reverse_map_dict(radius_client_ldap_attr_to_radius_attr) - -#------------------------------------------------------------------------------ - -radius_profile_ldap_attr_to_radius_attr = ipautil.CIDict({ - 'uid' : 'UID', - 'radiusArapFeatures' : 'Arap-Features', - 'radiusArapSecurity' : 'Arap-Security', - 'radiusArapZoneAccess' : 'Arap-Zone-Access', - 'radiusAuthType' : 'Auth-Type', - 'radiusCallbackId' : 'Callback-Id', - 'radiusCallbackNumber' : 'Callback-Number', - 'radiusCalledStationId' : 'Called-Station-Id', - 'radiusCallingStationId' : 'Calling-Station-Id', - 'radiusClass' : 'Class', - 'radiusClientIPAddress' : 'Client-IP-Address', - 'radiusExpiration' : 'Expiration', - 'radiusFilterId' : 'Filter-Id', - 'radiusFramedAppleTalkLink' : 'Framed-AppleTalk-Link', - 'radiusFramedAppleTalkNetwork' : 'Framed-AppleTalk-Network', - 'radiusFramedAppleTalkZone' : 'Framed-AppleTalk-Zone', - 'radiusFramedCompression' : 'Framed-Compression', - 'radiusFramedIPAddress' : 'Framed-IP-Address', - 'radiusFramedIPNetmask' : 'Framed-IP-Netmask', - 'radiusFramedIPXNetwork' : 'Framed-IPX-Network', - 'radiusFramedMTU' : 'Framed-MTU', - 'radiusFramedProtocol' : 'Framed-Protocol', - 'radiusFramedRoute' : 'Framed-Route', - 'radiusFramedRouting' : 'Framed-Routing', - 'radiusGroupName' : 'Group-Name', - 'radiusHint' : 'Hint', - 'radiusHuntgroupName' : 'Huntgroup-Name', - 'radiusIdleTimeout' : 'Idle-Timeout', - 'radiusLoginIPHost' : 'Login-IP-Host', - 'radiusLoginLATGroup' : 'Login-LAT-Group', - 'radiusLoginLATNode' : 'Login-LAT-Node', - 'radiusLoginLATPort' : 'Login-LAT-Port', - 'radiusLoginLATService' : 'Login-LAT-Service', - 'radiusLoginService' : 'Login-Service', - 'radiusLoginTCPPort' : 'Login-TCP-Port', - 'radiusLoginTime' : 'Login-Time', - 'radiusNASIpAddress' : 'NAS-IP-Address', - 'radiusPasswordRetry' : 'Password-Retry', - 'radiusPortLimit' : 'Port-Limit', - 'radiusProfileDn' : 'Profile-Dn', - 'radiusPrompt' : 'Prompt', - 'radiusProxyToRealm' : 'Proxy-To-Realm', - 'radiusRealm' : 'Realm', - 'radiusReplicateToRealm' : 'Replicate-To-Realm', - 'radiusReplyMessage' : 'Reply-Message', - 'radiusServiceType' : 'Service-Type', - 'radiusSessionTimeout' : 'Session-Timeout', - 'radiusSimultaneousUse' : 'Simultaneous-Use', - 'radiusStripUserName' : 'Strip-User-Name', - 'radiusTerminationAction' : 'Termination-Action', - 'radiusTunnelAssignmentId' : 'Tunnel-Assignment-Id', - 'radiusTunnelClientEndpoint' : 'Tunnel-Client-Endpoint', - 'radiusTunnelMediumType' : 'Tunnel-Medium-Type', - 'radiusTunnelPassword' : 'Tunnel-Password', - 'radiusTunnelPreference' : 'Tunnel-Preference', - 'radiusTunnelPrivateGroupId' : 'Tunnel-Private-Group-Id', - 'radiusTunnelServerEndpoint' : 'Tunnel-Server-Endpoint', - 'radiusTunnelType' : 'Tunnel-Type', - 'radiusUserCategory' : 'User-Category', - 'radiusVSA' : 'VSA', -}) - -radius_profile_attr_to_ldap_attr = reverse_map_dict(radius_profile_ldap_attr_to_radius_attr) - -#------------------------------------------------------------------------------ - -clients_container = 'cn=clients,cn=radius' - -def radius_clients_basedn(container, suffix): - if container is None: container = clients_container - return '%s,%s' % (container, suffix) - -def radius_client_filter(ip_addr): - return "(&(radiusClientIPAddress=%s)(objectclass=radiusClientProfile))" % \ - ldap.filter.escape_filter_chars(ip_addr) - -def radius_client_dn(client, container, suffix): - if container is None: container = clients_container - return 'radiusClientIPAddress=%s,%s,%s' % (ldap.dn.escape_dn_chars(client), container, suffix) - -# -- - -profiles_container = 'cn=profiles,cn=radius' - -def radius_profiles_basedn(container, suffix): - if container is None: container = profiles_container - return '%s,%s' % (container, suffix) - -def radius_profile_filter(uid): - return "(&(uid=%s)(objectclass=radiusprofile))" % \ - ldap.filter.escape_filter_chars(uid) - -def radius_profile_dn(uid, container, suffix): - if container is None: container = profiles_container - return 'uid=%s,%s,%s' % (ldap.dn.escape_dn_chars(uid), container, suffix) - - -#------------------------------------------------------------------------------ - -def get_ldap_attr_translations(): - comment_re = re.compile('#.*$') - radius_attr_to_ldap_attr = {} - ldap_attr_to_radius_attr = {} - try: - f = open(LDAP_ATTR_MAP_FILEPATH) - for line in f.readlines(): - line = comment_re.sub('', line).strip() - if not line: continue - attr_type, radius_attr, ldap_attr = line.split() - print 'type="%s" radius="%s" ldap="%s"' % (attr_type, radius_attr, ldap_attr) - radius_attr_to_ldap_attr[radius_attr] = {'ldap_attr':ldap_attr, 'attr_type':attr_type} - ldap_attr_to_radius_attr[ldap_attr] = {'radius_attr':radius_attr, 'attr_type':attr_type} - f.close() - except Exception, e: - logging.error('cold not read radius ldap attribute map file (%s): %s', LDAP_ATTR_MAP_FILEPATH, e) - pass # FIXME - - #for k,v in radius_attr_to_ldap_attr.items(): - # print '%s --> %s' % (k,v) - #for k,v in ldap_attr_to_radius_attr.items(): - # print '%s --> %s' % (k,v) - -def get_secret(): - valid = False - while (not valid): - secret = getpass.getpass("Enter Secret: ") - confirm = getpass.getpass("Confirm Secret: ") - if (secret != confirm): - print "Secrets do not match" - continue - valid = True - return secret - -#------------------------------------------------------------------------------ - -def valid_ip_addr(text): - - # is it a dotted octet? If so there should be 4 integers seperated - # by a dot and each integer should be between 0 and 255 - # there may be an optional mask preceded by a slash (e.g. 1.2.3.4/24) - match = dotted_octet_re.search(text) - if match: - # dotted octet notation - i = 1 - while i <= 4: - octet = int(match.group(i)) - if octet > 255: return False - i += 1 - if match.group(5): - mask = int(match.group(6)) - if mask <= 32: - return True - else: - return False - return True - else: - # DNS name, can contain letters, numbers, dot and hypen, must start with a letter - if dns_re.search(text): return True - return False - -def validate_length(value, limits): - length = len(value) - if length < limits[0] or length > limits[1]: - return False - return True - -def valid_length_msg(name, limits): - return "%s length must be at least %d and not more than %d" % (name, limits[0], limits[1]) - -def err_msg(variable, variable_name=None): - if variable_name is None: variable_name = 'value' - print "ERROR: %s = %s" % (variable_name, variable) - -#------------------------------------------------------------------------------ - -def validate_ip_addr(ip_addr, variable_name=None): - if not validate_length(ip_addr, valid_ip_addr_len): - err_msg(ip_addr, variable_name) - print valid_length_msg('ip address', valid_ip_addr_len) - return False - if not valid_ip_addr(ip_addr): - err_msg(ip_addr, variable_name) - print valid_ip_addr_msg - return False - return True - -def validate_secret(secret, variable_name=None): - if not validate_length(secret, valid_secret_len): - err_msg(secret, variable_name) - print valid_length_msg('secret', valid_secret_len) - return False - return True - -def validate_name(name, variable_name=None): - if not validate_length(name, valid_name_len): - err_msg(name, variable_name) - print valid_length_msg('name', valid_name_len) - return False - return True - -def validate_nastype(nastype, variable_name=None): - if not validate_length(nastype, valid_nastype_len): - err_msg(nastype, variable_name) - print valid_length_msg('NAS Type', valid_nastype_len) - return False - return True - -def validate_desc(desc, variable_name=None): - if not ipavalidate.Plain(desc): - print valid_desc_msg - return False - return True - -def validate(attribute, value): - if attribute == 'Client-IP-Address': - return validate_ip_addr(value, attribute) - if attribute == 'Secret': - return validate_secret(value, attribute) - if attribute == 'NAS-Type': - return validate_nastype(value, attribute) - if attribute == 'Name': - return validate_name(value, attribute) - if attribute == 'Description': - return validate_desc(value, attribute) - return True diff --git a/ipa-python/setup.py.in b/ipa-python/setup.py.in deleted file mode 100644 index 19940f38f..000000000 --- a/ipa-python/setup.py.in +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -"""FreeIPA python support library - -FreeIPA is a server for identity, policy, and audit. -""" - -DOCLINES = __doc__.split("\n") - -import os -import sys -import distutils.sysconfig - -CLASSIFIERS = """\ -Development Status :: 4 - Beta -Intended Audience :: System Environment/Base -License :: GPL -Programming Language :: Python -Operating System :: POSIX -Operating System :: Unix -""" - -# BEFORE importing distutils, remove MANIFEST. distutils doesn't properly -# update it when the contents of directories change. -if os.path.exists('MANIFEST'): os.remove('MANIFEST') - -def setup_package(): - - from distutils.core import setup - - old_path = os.getcwd() - local_path = os.path.dirname(os.path.abspath(sys.argv[0])) - os.chdir(local_path) - sys.path.insert(0,local_path) - - try: - setup( - name = "ipa", - version = "__VERSION__", - license = "GPL", - author = "Karl MacMillan, et.al.", - author_email = "kmacmill@redhat.com", - maintainer = "freeIPA Developers", - maintainer_email = "freeipa-devel@redhat.com", - url = "http://www.freeipa.org/", - description = DOCLINES[0], - long_description = "\n".join(DOCLINES[2:]), - download_url = "http://www.freeipa.org/page/Downloads", - classifiers=filter(None, CLASSIFIERS.split('\n')), - platforms = ["Linux", "Solaris", "Unix"], - package_dir = {'ipa': ''}, - packages = [ "ipa" ], - data_files = [('/etc/ipa', ['ipa.conf'])] - ) - finally: - del sys.path[0] - os.chdir(old_path) - return - -if __name__ == '__main__': - setup_package() diff --git a/ipa-python/sysrestore.py b/ipa-python/sysrestore.py deleted file mode 100644 index 5d5692be6..000000000 --- a/ipa-python/sysrestore.py +++ /dev/null @@ -1,317 +0,0 @@ -# Authors: Mark McLoughlin <markmc@redhat.com> -# -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -# -# This module provides a very simple API which allows -# ipa-xxx-install --uninstall to restore certain -# parts of the system configuration to the way it was -# before ipa-server-install was first run - -import os -import os.path -import errno -import shutil -import logging -import ConfigParser -import random -import string - -from ipa import ipautil - -SYSRESTORE_PATH = "/tmp" -SYSRESTORE_INDEXFILE = "sysrestore.index" -SYSRESTORE_STATEFILE = "sysrestore.state" - -class FileStore: - """Class for handling backup and restore of files""" - - def __init__(self, path = SYSRESTORE_PATH): - """Create a _StoreFiles object, that uses @path as the - base directory. - - The file @path/sysrestore.index is used to store information - about the original location of the saved files. - """ - self._path = path - self._index = self._path + "/" + SYSRESTORE_INDEXFILE - - self.random = random.Random() - - self.files = {} - self._load() - - def _load(self): - """Load the file list from the index file. @files will - be an empty dictionary if the file doesn't exist. - """ - - logging.debug("Loading Index file from '%s'", self._index) - - self.files = {} - - p = ConfigParser.SafeConfigParser() - p.read(self._index) - - for section in p.sections(): - if section == "files": - for (key, value) in p.items(section): - self.files[key] = value - - - def save(self): - """Save the file list to @_index. If @files is an empty - dict, then @_index should be removed. - """ - logging.debug("Saving Index File to '%s'", self._index) - - if len(self.files) == 0: - logging.debug(" -> no files, removing file") - if os.path.exists(self._index): - os.remove(self._index) - return - - p = ConfigParser.SafeConfigParser() - - p.add_section('files') - for (key, value) in self.files.items(): - p.set('files', key, str(value)) - - f = file(self._index, "w") - p.write(f) - f.close() - - def backup_file(self, path): - """Create a copy of the file at @path - so long as a copy - does not already exist - which will be restored to its - original location by restore_files(). - """ - logging.debug("Backing up system configuration file '%s'", path) - - if not os.path.isabs(path): - raise ValueError("Absolute path required") - - if not os.path.isfile(path): - logging.debug(" -> Not backing up - '%s' doesn't exist", path) - return - - (reldir, file) = os.path.split(path) - - filename = "" - for i in range(8): - h = "%02x" % self.random.randint(0,255) - filename += h - filename += "-"+file - - backup_path = os.path.join(self._path, filename) - if os.path.exists(backup_path): - logging.debug(" -> Not backing up - already have a copy of '%s'", path) - return - - shutil.copy2(path, backup_path) - - stat = os.stat(path) - - self.files[filename] = string.join([str(stat.st_mode),str(stat.st_uid),str(stat.st_gid),path], ',') - self.save() - - def restore_file(self, path): - """Restore the copy of a file at @path to its original - location and delete the copy. - - Returns #True if the file was restored, #False if there - was no backup file to restore - """ - - logging.debug("Restoring system configuration file '%s'", path) - - if not os.path.isabs(path): - raise ValueError("Absolute path required") - - mode = None - uid = None - gid = None - filename = None - - for (key, value) in self.files.items(): - (mode,uid,gid,filepath) = string.split(value, ',', 3) - if (filepath == path): - filename = key - break - - if not filename: - raise ValueError("No such file name in the index") - - backup_path = os.path.join(self._path, filename) - if not os.path.exists(backup_path): - logging.debug(" -> Not restoring - '%s' doesn't exist", backup_path) - return False - - shutil.move(backup_path, path) - os.chown(path, int(uid), int(gid)) - os.chmod(path, int(mode)) - - ipautil.run(["/sbin/restorecon", path]) - - del self.files[filename] - self.save() - - return True - - def restore_all_files(self): - """Restore the files in the inbdex to their original - location and delete the copy. - - Returns #True if the file was restored, #False if there - was no backup file to restore - """ - - if len(self.files) == 0: - return False - - for (filename, value) in self.files.items(): - - (mode,uid,gid,path) = string.split(value, ',', 3) - - backup_path = os.path.join(self._path, filename) - if not os.path.exists(backup_path): - logging.debug(" -> Not restoring - '%s' doesn't exist", backup_path) - - shutil.move(backup_path, path) - os.chown(path, int(uid), int(gid)) - os.chmod(path, int(mode)) - - ipautil.run(["/sbin/restorecon", path]) - - #force file to be deleted - self.files = {} - self.save() - - return True - -class StateFile: - """A metadata file for recording system state which can - be backed up and later restored. The format is something - like: - - [httpd] - running=True - enabled=False - """ - - def __init__(self, path = SYSRESTORE_PATH): - """Create a StateFile object, loading from @path. - - The dictionary @modules, a member of the returned object, - is where the state can be modified. @modules is indexed - using a module name to return another dictionary containing - key/value pairs with the saved state of that module. - - The keys in these latter dictionaries are arbitrary strings - and the values may either be strings or booleans. - """ - self._path = path+"/"+SYSRESTORE_STATEFILE - - self.modules = {} - - self._load() - - def _load(self): - """Load the modules from the file @_path. @modules will - be an empty dictionary if the file doesn't exist. - """ - logging.debug("Loading StateFile from '%s'", self._path) - - self.modules = {} - - p = ConfigParser.SafeConfigParser() - p.read(self._path) - - for module in p.sections(): - self.modules[module] = {} - for (key, value) in p.items(module): - if value == str(True): - value = True - elif value == str(False): - value = False - self.modules[module][key] = value - - def save(self): - """Save the modules to @_path. If @modules is an empty - dict, then @_path should be removed. - """ - logging.debug("Saving StateFile to '%s'", self._path) - - for module in self.modules.keys(): - if len(self.modules[module]) == 0: - del self.modules[module] - - if len(self.modules) == 0: - logging.debug(" -> no modules, removing file") - if os.path.exists(self._path): - os.remove(self._path) - return - - p = ConfigParser.SafeConfigParser() - - for module in self.modules.keys(): - p.add_section(module) - for (key, value) in self.modules[module].items(): - p.set(module, key, str(value)) - - f = file(self._path, "w") - p.write(f) - f.close() - - def backup_state(self, module, key, value): - """Backup an item of system state from @module, identified - by the string @key and with the value @value. @value may be - a string or boolean. - """ - if not (isinstance(value, str) or isinstance(value, bool)): - raise ValueError("Only strings or booleans supported") - - if not self.modules.has_key(module): - self.modules[module] = {} - - if not self.modules.has_key(key): - self.modules[module][key] = value - - self.save() - - def restore_state(self, module, key): - """Return the value of an item of system state from @module, - identified by the string @key, and remove it from the backed - up system state. - - If the item doesn't exist, #None will be returned, otherwise - the original string or boolean value is returned. - """ - - if not self.modules.has_key(module): - return None - - if not self.modules[module].has_key(key): - return None - - value = self.modules[module][key] - del self.modules[module][key] - - self.save() - - return value diff --git a/ipa-python/test/test_aci.py b/ipa-python/test/test_aci.py deleted file mode 100644 index fb9d84c7f..000000000 --- a/ipa-python/test/test_aci.py +++ /dev/null @@ -1,127 +0,0 @@ -#! /usr/bin/python -E -# -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -import sys -sys.path.insert(0, ".") - -import unittest -import aci -import urllib - - -class TestACI(unittest.TestCase): - acitemplate = ('(targetattr="%s")' + - '(targetfilter="(memberOf=%s)")' + - '(version 3.0;' + - 'acl "%s";' + - 'allow (write) ' + - 'groupdn="ldap:///%s";)') - - def setUp(self): - self.aci = aci.ACI() - - def tearDown(self): - pass - - def testExport(self): - self.aci.source_group = 'cn=foo, dc=freeipa, dc=org' - self.aci.dest_group = 'cn=bar, dc=freeipa, dc=org' - self.aci.name = 'this is a "name' - self.aci.attrs = ['field1', 'field2', 'field3'] - - exportaci = self.aci.export_to_string() - aci = TestACI.acitemplate % ('field1 || field2 || field3', - self.aci.dest_group, - 'this is a "name', - self.aci.source_group) - - self.assertEqual(aci, exportaci) - - def testURLEncodedExport(self): - self.aci.source_group = 'cn=foo " bar, dc=freeipa, dc=org' - self.aci.dest_group = 'cn=bar, dc=freeipa, dc=org' - self.aci.name = 'this is a "name' - self.aci.attrs = ['field1', 'field2', 'field3'] - - exportaci = self.aci.export_to_string() - aci = TestACI.acitemplate % ('field1 || field2 || field3', - self.aci.dest_group, - 'this is a "name', - urllib.quote(self.aci.source_group, "/=, ")) - - self.assertEqual(aci, exportaci) - - def testSimpleParse(self): - attr_str = 'field3 || field4 || field5' - dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org' - name = 'my name' - src_dn = 'cn=srcgroup, dc=freeipa, dc=org' - - acistr = TestACI.acitemplate % (attr_str, dest_dn, name, src_dn) - self.aci.parse_acistr(acistr) - - self.assertEqual(['field3', 'field4', 'field5'], self.aci.attrs) - self.assertEqual(dest_dn, self.aci.dest_group) - self.assertEqual(name, self.aci.name) - self.assertEqual(src_dn, self.aci.source_group) - - def testUrlEncodedParse(self): - attr_str = 'field3 || field4 || field5' - dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org' - name = 'my name' - src_dn = 'cn=src " group, dc=freeipa, dc=org' - - acistr = TestACI.acitemplate % (attr_str, dest_dn, name, - urllib.quote(src_dn, "/=, ")) - self.aci.parse_acistr(acistr) - - self.assertEqual(['field3', 'field4', 'field5'], self.aci.attrs) - self.assertEqual(dest_dn, self.aci.dest_group) - self.assertEqual(name, self.aci.name) - self.assertEqual(src_dn, self.aci.source_group) - - def testInvalidParse(self): - try: - self.aci.parse_acistr('foo bar') - self.fail('Should have failed to parse') - except SyntaxError: - pass - - try: - self.aci.parse_acistr('') - self.fail('Should have failed to parse') - except SyntaxError: - pass - - attr_str = 'field3 || field4 || field5' - dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org' - name = 'my name' - src_dn = 'cn=srcgroup, dc=freeipa, dc=org' - - acistr = TestACI.acitemplate % (attr_str, dest_dn, name, src_dn) - acistr += 'trailing garbage' - try: - self.aci.parse_acistr('') - self.fail('Should have failed to parse') - except SyntaxError: - pass - - -if __name__ == '__main__': - unittest.main() diff --git a/ipa-python/test/test_ipautil.py b/ipa-python/test/test_ipautil.py deleted file mode 100644 index 60d53a270..000000000 --- a/ipa-python/test/test_ipautil.py +++ /dev/null @@ -1,309 +0,0 @@ -#! /usr/bin/python -E -# -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -import sys -sys.path.insert(0, ".") - -import unittest -import datetime - -import ipautil - - -class TestCIDict(unittest.TestCase): - def setUp(self): - self.cidict = ipautil.CIDict() - self.cidict["Key1"] = "val1" - self.cidict["key2"] = "val2" - self.cidict["KEY3"] = "VAL3" - - def tearDown(self): - pass - - def testLen(self): - self.assertEqual(3, len(self.cidict)) - - def test__GetItem(self): - self.assertEqual("val1", self.cidict["Key1"]) - self.assertEqual("val1", self.cidict["key1"]) - self.assertEqual("val2", self.cidict["KEY2"]) - self.assertEqual("VAL3", self.cidict["key3"]) - self.assertEqual("VAL3", self.cidict["KEY3"]) - try: - self.cidict["key4"] - fail("should have raised KeyError") - except KeyError: - pass - - def testGet(self): - self.assertEqual("val1", self.cidict.get("Key1")) - self.assertEqual("val1", self.cidict.get("key1")) - self.assertEqual("val2", self.cidict.get("KEY2")) - self.assertEqual("VAL3", self.cidict.get("key3")) - self.assertEqual("VAL3", self.cidict.get("KEY3")) - self.assertEqual("default", self.cidict.get("key4", "default")) - - def test__SetItem(self): - self.cidict["key4"] = "val4" - self.assertEqual("val4", self.cidict["key4"]) - self.cidict["KEY4"] = "newval4" - self.assertEqual("newval4", self.cidict["key4"]) - - def testDel(self): - self.assert_(self.cidict.has_key("Key1")) - del(self.cidict["Key1"]) - self.failIf(self.cidict.has_key("Key1")) - - self.assert_(self.cidict.has_key("key2")) - del(self.cidict["KEY2"]) - self.failIf(self.cidict.has_key("key2")) - - def testClear(self): - self.assertEqual(3, len(self.cidict)) - self.cidict.clear() - self.assertEqual(0, len(self.cidict)) - - def testCopy(self): - """A copy is no longer a CIDict, but should preserve the case of - the keys as they were inserted.""" - copy = self.cidict.copy() - self.assertEqual(3, len(copy)) - self.assert_(copy.has_key("Key1")) - self.assertEqual("val1", copy["Key1"]) - self.failIf(copy.has_key("key1")) - - def testHasKey(self): - self.assert_(self.cidict.has_key("KEY1")) - self.assert_(self.cidict.has_key("key2")) - self.assert_(self.cidict.has_key("key3")) - - def testItems(self): - items = self.cidict.items() - self.assertEqual(3, len(items)) - items_set = set(items) - self.assert_(("Key1", "val1") in items_set) - self.assert_(("key2", "val2") in items_set) - self.assert_(("KEY3", "VAL3") in items_set) - - def testIterItems(self): - items = [] - for (k,v) in self.cidict.iteritems(): - items.append((k,v)) - self.assertEqual(3, len(items)) - items_set = set(items) - self.assert_(("Key1", "val1") in items_set) - self.assert_(("key2", "val2") in items_set) - self.assert_(("KEY3", "VAL3") in items_set) - - def testIterKeys(self): - keys = [] - for k in self.cidict.iterkeys(): - keys.append(k) - self.assertEqual(3, len(keys)) - keys_set = set(keys) - self.assert_("Key1" in keys_set) - self.assert_("key2" in keys_set) - self.assert_("KEY3" in keys_set) - - def testIterValues(self): - values = [] - for k in self.cidict.itervalues(): - values.append(k) - self.assertEqual(3, len(values)) - values_set = set(values) - self.assert_("val1" in values_set) - self.assert_("val2" in values_set) - self.assert_("VAL3" in values_set) - - def testKeys(self): - keys = self.cidict.keys() - self.assertEqual(3, len(keys)) - keys_set = set(keys) - self.assert_("Key1" in keys_set) - self.assert_("key2" in keys_set) - self.assert_("KEY3" in keys_set) - - def testValues(self): - values = self.cidict.values() - self.assertEqual(3, len(values)) - values_set = set(values) - self.assert_("val1" in values_set) - self.assert_("val2" in values_set) - self.assert_("VAL3" in values_set) - - def testUpdate(self): - newdict = { "KEY2": "newval2", - "key4": "val4" } - self.cidict.update(newdict) - self.assertEqual(4, len(self.cidict)) - - items = self.cidict.items() - self.assertEqual(4, len(items)) - items_set = set(items) - self.assert_(("Key1", "val1") in items_set) - # note the update "overwrites" the case of the key2 - self.assert_(("KEY2", "newval2") in items_set) - self.assert_(("KEY3", "VAL3") in items_set) - self.assert_(("key4", "val4") in items_set) - - def testSetDefault(self): - self.assertEqual("val1", self.cidict.setdefault("KEY1", "default")) - - self.failIf(self.cidict.has_key("KEY4")) - self.assertEqual("default", self.cidict.setdefault("KEY4", "default")) - self.assert_(self.cidict.has_key("KEY4")) - self.assertEqual("default", self.cidict["key4"]) - - self.failIf(self.cidict.has_key("KEY5")) - self.assertEqual(None, self.cidict.setdefault("KEY5")) - self.assert_(self.cidict.has_key("KEY5")) - self.assertEqual(None, self.cidict["key5"]) - - def testPop(self): - self.assertEqual("val1", self.cidict.pop("KEY1", "default")) - self.failIf(self.cidict.has_key("key1")) - - self.assertEqual("val2", self.cidict.pop("KEY2")) - self.failIf(self.cidict.has_key("key2")) - - self.assertEqual("default", self.cidict.pop("key4", "default")) - try: - self.cidict.pop("key4") - fail("should have raised KeyError") - except KeyError: - pass - - def testPopItem(self): - items = set(self.cidict.items()) - self.assertEqual(3, len(self.cidict)) - - item = self.cidict.popitem() - self.assertEqual(2, len(self.cidict)) - self.assert_(item in items) - items.discard(item) - - item = self.cidict.popitem() - self.assertEqual(1, len(self.cidict)) - self.assert_(item in items) - items.discard(item) - - item = self.cidict.popitem() - self.assertEqual(0, len(self.cidict)) - self.assert_(item in items) - items.discard(item) - -class TestTimeParser(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - - def testSimple(self): - timestr = "20070803" - - time = ipautil.parse_generalized_time(timestr) - self.assertEqual(2007, time.year) - self.assertEqual(8, time.month) - self.assertEqual(3, time.day) - self.assertEqual(0, time.hour) - self.assertEqual(0, time.minute) - self.assertEqual(0, time.second) - - def testHourMinSec(self): - timestr = "20051213141205" - - time = ipautil.parse_generalized_time(timestr) - self.assertEqual(2005, time.year) - self.assertEqual(12, time.month) - self.assertEqual(13, time.day) - self.assertEqual(14, time.hour) - self.assertEqual(12, time.minute) - self.assertEqual(5, time.second) - - def testFractions(self): - timestr = "2003092208.5" - - time = ipautil.parse_generalized_time(timestr) - self.assertEqual(2003, time.year) - self.assertEqual(9, time.month) - self.assertEqual(22, time.day) - self.assertEqual(8, time.hour) - self.assertEqual(30, time.minute) - self.assertEqual(0, time.second) - - timestr = "199203301544,25" - - time = ipautil.parse_generalized_time(timestr) - self.assertEqual(1992, time.year) - self.assertEqual(3, time.month) - self.assertEqual(30, time.day) - self.assertEqual(15, time.hour) - self.assertEqual(44, time.minute) - self.assertEqual(15, time.second) - - timestr = "20060401185912,8" - - time = ipautil.parse_generalized_time(timestr) - self.assertEqual(2006, time.year) - self.assertEqual(4, time.month) - self.assertEqual(1, time.day) - self.assertEqual(18, time.hour) - self.assertEqual(59, time.minute) - self.assertEqual(12, time.second) - self.assertEqual(800000, time.microsecond) - - def testTimeZones(self): - timestr = "20051213141205Z" - - time = ipautil.parse_generalized_time(timestr) - self.assertEqual(0, time.tzinfo.houroffset) - self.assertEqual(0, time.tzinfo.minoffset) - offset = time.tzinfo.utcoffset(None) - self.assertEqual(0, offset.seconds) - - timestr = "20051213141205+0500" - - time = ipautil.parse_generalized_time(timestr) - self.assertEqual(5, time.tzinfo.houroffset) - self.assertEqual(0, time.tzinfo.minoffset) - offset = time.tzinfo.utcoffset(None) - self.assertEqual(5 * 60 * 60, offset.seconds) - - timestr = "20051213141205-0500" - - time = ipautil.parse_generalized_time(timestr) - self.assertEqual(-5, time.tzinfo.houroffset) - self.assertEqual(0, time.tzinfo.minoffset) - # NOTE - the offset is always positive - it's minutes - # _east_ of UTC - offset = time.tzinfo.utcoffset(None) - self.assertEqual((24 - 5) * 60 * 60, offset.seconds) - - timestr = "20051213141205-0930" - - time = ipautil.parse_generalized_time(timestr) - self.assertEqual(-9, time.tzinfo.houroffset) - self.assertEqual(-30, time.tzinfo.minoffset) - offset = time.tzinfo.utcoffset(None) - self.assertEqual(((24 - 9) * 60 * 60) - (30 * 60), offset.seconds) - - -if __name__ == '__main__': - unittest.main() diff --git a/ipa-python/test/test_ipavalidate.py b/ipa-python/test/test_ipavalidate.py deleted file mode 100644 index 8b79fbf07..000000000 --- a/ipa-python/test/test_ipavalidate.py +++ /dev/null @@ -1,97 +0,0 @@ -#! /usr/bin/python -E -# -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -import sys -sys.path.insert(0, ".") - -import unittest - -import ipavalidate - -class TestValidate(unittest.TestCase): - def setUp(self): - pass - - def tearDown(self): - pass - - def test_validEmail(self): - self.assertEqual(True, ipavalidate.Email("test@freeipa.org")) - self.assertEqual(True, ipavalidate.Email("", notEmpty=False)) - - def test_invalidEmail(self): - self.assertEqual(False, ipavalidate.Email("test")) - self.assertEqual(False, ipavalidate.Email("test@freeipa")) - self.assertEqual(False, ipavalidate.Email("test@.com")) - self.assertEqual(False, ipavalidate.Email("")) - self.assertEqual(False, ipavalidate.Email(None)) - - def test_validPlain(self): - self.assertEqual(True, ipavalidate.Plain("Joe User")) - self.assertEqual(True, ipavalidate.Plain("Joe O'Malley")) - self.assertEqual(True, ipavalidate.Plain("", notEmpty=False)) - self.assertEqual(True, ipavalidate.Plain(None, notEmpty=False)) - self.assertEqual(True, ipavalidate.Plain("JoeUser", allowSpaces=False)) - self.assertEqual(True, ipavalidate.Plain("JoeUser", allowSpaces=True)) - - def test_invalidPlain(self): - self.assertEqual(False, ipavalidate.Plain("Joe (User)")) - self.assertEqual(False, ipavalidate.Plain("Joe C. User")) - self.assertEqual(False, ipavalidate.Plain("", notEmpty=True)) - self.assertEqual(False, ipavalidate.Plain(None, notEmpty=True)) - self.assertEqual(False, ipavalidate.Plain("Joe User", allowSpaces=False)) - self.assertEqual(False, ipavalidate.Plain("Joe C. User")) - - def test_validString(self): - self.assertEqual(True, ipavalidate.String("Joe User")) - self.assertEqual(True, ipavalidate.String("Joe O'Malley")) - self.assertEqual(True, ipavalidate.String("", notEmpty=False)) - self.assertEqual(True, ipavalidate.String(None, notEmpty=False)) - self.assertEqual(True, ipavalidate.String("Joe C. User")) - - def test_invalidString(self): - self.assertEqual(False, ipavalidate.String("", notEmpty=True)) - self.assertEqual(False, ipavalidate.String(None, notEmpty=True)) - - def test_validPath(self): - self.assertEqual(True, ipavalidate.Path("/")) - self.assertEqual(True, ipavalidate.Path("/home/user")) - self.assertEqual(True, ipavalidate.Path("../home/user")) - self.assertEqual(True, ipavalidate.Path("", notEmpty=False)) - self.assertEqual(True, ipavalidate.Path(None, notEmpty=False)) - - def test_invalidPath(self): - self.assertEqual(False, ipavalidate.Path("(foo)")) - self.assertEqual(False, ipavalidate.Path("", notEmpty=True)) - self.assertEqual(False, ipavalidate.Path(None, notEmpty=True)) - - def test_validName(self): - self.assertEqual(True, ipavalidate.GoodName("foo")) - self.assertEqual(True, ipavalidate.GoodName("1foo")) - self.assertEqual(True, ipavalidate.GoodName("foo.bar")) - self.assertEqual(True, ipavalidate.GoodName("foo.bar$")) - - def test_invalidName(self): - self.assertEqual(False, ipavalidate.GoodName("foo bar")) - self.assertEqual(False, ipavalidate.GoodName("foo%bar")) - self.assertEqual(False, ipavalidate.GoodName("*foo")) - self.assertEqual(False, ipavalidate.GoodName("$foo.bar$")) - -if __name__ == '__main__': - unittest.main() diff --git a/ipa-python/version.py.in b/ipa-python/version.py.in deleted file mode 100644 index fdb689f02..000000000 --- a/ipa-python/version.py.in +++ /dev/null @@ -1,25 +0,0 @@ -# Authors: Rob Crittenden <rcritten@redhat.com> -# -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License as -# published by the Free Software Foundation; version 2 only -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -# The full version including strings -VERSION="__VERSION__" - -# Just the numeric portion of the version so one can do direct numeric -# comparisons to see if the API is compatible. -NUM_VERSION=__NUM_VERSION__ |