diff options
author | Rob Crittenden <rcritten@redhat.com> | 2009-02-05 15:03:08 -0500 |
---|---|---|
committer | Rob Crittenden <rcritten@redhat.com> | 2009-02-09 14:35:15 -0500 |
commit | 262ff2d731b1bfc4acd91153088b8fcde7ae92b8 (patch) | |
tree | baf8894d4b357b610113b87d4bfee84de24f08bd /ipapython | |
parent | 58ae191a5afbf29d78afd3969f8d106415897958 (diff) | |
download | freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.tar.gz freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.tar.xz freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.zip |
Rename ipa-python directory to ipapython so it is a real python library
We used to install it as ipa, now installing it as ipapython. The rpm
is still ipa-python.
Diffstat (limited to 'ipapython')
-rw-r--r-- | ipapython/MANIFEST.in | 2 | ||||
-rw-r--r-- | ipapython/Makefile | 28 | ||||
-rw-r--r-- | ipapython/README | 18 | ||||
-rw-r--r-- | ipapython/__init__.py | 0 | ||||
-rw-r--r-- | ipapython/config.py | 183 | ||||
-rw-r--r-- | ipapython/dnsclient.py | 443 | ||||
-rw-r--r-- | ipapython/entity.py | 202 | ||||
-rw-r--r-- | ipapython/ipa.conf | 3 | ||||
-rw-r--r-- | ipapython/ipautil.py | 969 | ||||
-rw-r--r-- | ipapython/ipavalidate.py | 137 | ||||
-rw-r--r-- | ipapython/radius_util.py | 366 | ||||
-rw-r--r-- | ipapython/setup.py.in | 77 | ||||
-rw-r--r-- | ipapython/sysrestore.py | 317 | ||||
-rw-r--r-- | ipapython/test/test_aci.py | 127 | ||||
-rw-r--r-- | ipapython/test/test_ipautil.py | 309 | ||||
-rw-r--r-- | ipapython/test/test_ipavalidate.py | 97 | ||||
-rw-r--r-- | ipapython/version.py.in | 25 |
17 files changed, 3303 insertions, 0 deletions
diff --git a/ipapython/MANIFEST.in b/ipapython/MANIFEST.in new file mode 100644 index 000000000..d178f0838 --- /dev/null +++ b/ipapython/MANIFEST.in @@ -0,0 +1,2 @@ +include *.conf + diff --git a/ipapython/Makefile b/ipapython/Makefile new file mode 100644 index 000000000..4ac027e14 --- /dev/null +++ b/ipapython/Makefile @@ -0,0 +1,28 @@ +PYTHONLIBDIR ?= $(shell python -c "from distutils.sysconfig import *; print get_python_lib()") +PACKAGEDIR ?= $(DESTDIR)/$(PYTHONLIBDIR)/ipa +CONFIGDIR ?= $(DESTDIR)/etc/ipa +TESTS = $(wildcard test/*.py) + +all: ; + +install: + if [ "$(DESTDIR)" = "" ]; then \ + python setup.py install; \ + else \ + python setup.py install --root $(DESTDIR); \ + fi + +clean: + rm -f *~ *.pyc + +distclean: clean + rm -f setup.py ipa-python.spec version.py + +maintainer-clean: distclean + rm -rf build + +.PHONY: test +test: $(subst .py,.tst,$(TESTS)) + +%.tst: %.py + python $< diff --git a/ipapython/README b/ipapython/README new file mode 100644 index 000000000..c966e44bf --- /dev/null +++ b/ipapython/README @@ -0,0 +1,18 @@ +This is a set of libraries common to IPA clients and servers though mostly +geared currently towards command-line tools. + +A brief overview: + +config.py - identify the IPA server domain and realm. It uses dnsclient to + try to detect this information first and will fall back to + /etc/ipa/ipa.conf if that fails. +dnsclient.py - find IPA information via DNS + +ipautil.py - helper functions + +radius_util.py - helper functions for Radius + +entity.py - entity is the main data type. User and Group extend this class + (but don't add anything currently). + +ipavalidate.py - basic data validation routines diff --git a/ipapython/__init__.py b/ipapython/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/ipapython/__init__.py diff --git a/ipapython/config.py b/ipapython/config.py new file mode 100644 index 000000000..efae910eb --- /dev/null +++ b/ipapython/config.py @@ -0,0 +1,183 @@ +# Authors: Karl MacMillan <kmacmill@redhat.com> +# +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import ConfigParser +from optparse import OptionParser, IndentedHelpFormatter + +import krbV +import socket +import ipapython.dnsclient +import re + +class IPAConfigError(Exception): + def __init__(self, msg=''): + self.msg = msg + Exception.__init__(self, msg) + + def __repr__(self): + return self.msg + + __str__ = __repr__ + +class IPAFormatter(IndentedHelpFormatter): + """Our own optparse formatter that indents multiple lined usage string.""" + def format_usage(self, usage): + usage_string = "Usage:" + spacing = " " * len(usage_string) + lines = usage.split("\n") + ret = "%s %s\n" % (usage_string, lines[0]) + for line in lines[1:]: + ret += "%s %s\n" % (spacing, line) + return ret + +def verify_args(parser, args, needed_args = None): + """Verify that we have all positional arguments we need, if not, exit.""" + if needed_args: + needed_list = needed_args.split(" ") + else: + needed_list = [] + len_need = len(needed_list) + len_have = len(args) + if len_have > len_need: + parser.error("too many arguments") + elif len_have < len_need: + parser.error("no %s specified" % needed_list[len_have]) + +class IPAConfig: + def __init__(self): + self.default_realm = None + self.default_server = [] + self.default_domain = None + + def get_realm(self): + if self.default_realm: + return self.default_realm + else: + raise IPAConfigError("no default realm") + + def get_server(self): + if len(self.default_server): + return self.default_server + else: + raise IPAConfigError("no default server") + + def get_domain(self): + if self.default_domain: + return self.default_domain + else: + raise IPAConfigError("no default domain") + +# Global library config +config = IPAConfig() + +def __parse_config(discover_server = True): + p = ConfigParser.SafeConfigParser() + p.read("/etc/ipa/ipa.conf") + + try: + if not config.default_realm: + config.default_realm = p.get("defaults", "realm") + except: + pass + if discover_server: + try: + s = p.get("defaults", "server") + config.default_server.extend(re.sub("\s+", "", s).split(',')) + except: + pass + try: + if not config.default_domain: + config.default_domain = p.get("defaults", "domain") + except: + pass + +def __discover_config(discover_server = True): + rl = 0 + try: + if not config.default_realm: + krbctx = krbV.default_context() + config.default_realm = krbctx.default_realm + if not config.default_realm: + return False + + if not config.default_domain: + #try once with REALM -> domain + dom_name = config.default_realm.lower() + name = "_ldap._tcp."+dom_name+"." + rs = ipapython.dnsclient.query(name, ipapython.dnsclient.DNS_C_IN, ipapython.dnsclient.DNS_T_SRV) + rl = len(rs) + if rl == 0: + #try cycling on domain components of FQDN + dom_name = socket.getfqdn() + while rl == 0: + tok = dom_name.find(".") + if tok == -1: + return False + dom_name = dom_name[tok+1:] + name = "_ldap._tcp." + dom_name + "." + rs = ipapython.dnsclient.query(name, ipapython.dnsclient.DNS_C_IN, ipapython.dnsclient.DNS_T_SRV) + rl = len(rs) + + config.default_domain = dom_name + + if discover_server: + if rl == 0: + name = "_ldap._tcp."+config.default_domain+"." + rs = ipapython.dnsclient.query(name, ipapython.dnsclient.DNS_C_IN, ipapython.dnsclient.DNS_T_SRV) + + for r in rs: + if r.dns_type == ipapython.dnsclient.DNS_T_SRV: + rsrv = r.rdata.server.rstrip(".") + config.default_server.append(rsrv) + + except: + pass + +def add_standard_options(parser): + parser.add_option("--realm", dest="realm", help="Override default IPA realm") + parser.add_option("--server", dest="server", help="Override default IPA server") + parser.add_option("--domain", dest="domain", help="Override default IPA DNS domain") + +def init_config(options=None): + if options: + config.default_realm = options.realm + config.default_domain = options.domain + if options.server: + config.default_server.extend(options.server.split(",")) + + if len(config.default_server): + discover_server = False + else: + discover_server = True + __parse_config(discover_server) + __discover_config(discover_server) + + # make sure the server list only contains unique items + new_server = [] + for server in config.default_server: + if server not in new_server: + new_server.append(server) + config.default_server = new_server + + if not config.default_realm: + raise IPAConfigError("IPA realm not found in DNS, in the config file (/etc/ipa/ipa.conf) or on the command line.") + if not config.default_server: + raise IPAConfigError("IPA server not found in DNS, in the config file (/etc/ipa/ipa.conf) or on the command line.") + if not config.default_domain: + raise IPAConfigError("IPA domain not found in the config file (/etc/ipa/ipa.conf) or on the command line.") diff --git a/ipapython/dnsclient.py b/ipapython/dnsclient.py new file mode 100644 index 000000000..58d93d850 --- /dev/null +++ b/ipapython/dnsclient.py @@ -0,0 +1,443 @@ +# +# Copyright 2001, 2005 Red Hat, Inc. +# +# This is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 only +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. +# + +import struct +import socket +import sys + +import acutil + +DNS_C_IN = 1 +DNS_C_CS = 2 +DNS_C_CHAOS = 3 +DNS_C_HS = 4 +DNS_C_ANY = 255 + +DNS_T_A = 1 +DNS_T_NS = 2 +DNS_T_CNAME = 5 +DNS_T_SOA = 6 +DNS_T_NULL = 10 +DNS_T_WKS = 11 +DNS_T_PTR = 12 +DNS_T_HINFO = 13 +DNS_T_MX = 15 +DNS_T_TXT = 16 +DNS_T_SRV = 33 +DNS_T_ANY = 255 + +DEBUG_DNSCLIENT = False + +class DNSQueryHeader: + FORMAT = "!HBBHHHH" + def __init__(self): + self.dns_id = 0 + self.dns_rd = 0 + self.dns_tc = 0 + self.dns_aa = 0 + self.dns_opcode = 0 + self.dns_qr = 0 + self.dns_rcode = 0 + self.dns_z = 0 + self.dns_ra = 0 + self.dns_qdcount = 0 + self.dns_ancount = 0 + self.dns_nscount = 0 + self.dns_arcount = 0 + + def pack(self): + return struct.pack(DNSQueryHeader.FORMAT, + self.dns_id, + (self.dns_rd & 1) | + (self.dns_tc & 1) << 1 | + (self.dns_aa & 1) << 2 | + (self.dns_opcode & 15) << 3 | + (self.dns_qr & 1) << 7, + (self.dns_rcode & 15) | + (self.dns_z & 7) << 4 | + (self.dns_ra & 1) << 7, + self.dns_qdcount, + self.dns_ancount, + self.dns_nscount, + self.dns_arcount) + + def unpack(self, data): + (self.dns_id, byte1, byte2, self.dns_qdcount, self.dns_ancount, + self.dns_nscount, self.dns_arcount) = struct.unpack(DNSQueryHeader.FORMAT, data[0:self.size()]) + self.dns_rd = byte1 & 1 + self.dns_tc = (byte1 >> 1) & 1 + self.dns_aa = (byte1 >> 2) & 1 + self.dns_opcode = (byte1 >> 3) & 15 + self.dns_qr = (byte1 >> 7) & 1 + self.dns_rcode = byte2 & 15 + self.dns_z = (byte2 >> 4) & 7 + self.dns_ra = (byte1 >> 7) & 1 + + def size(self): + return struct.calcsize(DNSQueryHeader.FORMAT) + +def unpackQueryHeader(data): + header = DNSQueryHeader() + header.unpack(data) + return header + +class DNSResult: + FORMAT = "!HHIH" + QFORMAT = "!HH" + def __init__(self): + self.dns_name = "" + self.dns_type = 0 + self.dns_class = 0 + self.dns_ttl = 0 + self.dns_rlength = 0 + self.rdata = None + + def unpack(self, data): + (self.dns_type, self.dns_class, self.dns_ttl, + self.dns_rlength) = struct.unpack(DNSResult.FORMAT, data[0:self.size()]) + + def qunpack(self, data): + (self.dns_type, self.dns_class) = struct.unpack(DNSResult.QFORMAT, data[0:self.qsize()]) + + def size(self): + return struct.calcsize(DNSResult.FORMAT) + + def qsize(self): + return struct.calcsize(DNSResult.QFORMAT) + +class DNSRData: + def __init__(self): + pass + +#typedef struct dns_rr_a { +# u_int32_t address; +#} dns_rr_a_t; +# +#typedef struct dns_rr_cname { +# const char *cname; +#} dns_rr_cname_t; +# +#typedef struct dns_rr_hinfo { +# const char *cpu, *os; +#} dns_rr_hinfo_t; +# +#typedef struct dns_rr_mx { +# u_int16_t preference; +# const char *exchange; +#} dns_rr_mx_t; +# +#typedef struct dns_rr_null { +# unsigned const char *data; +#} dns_rr_null_t; +# +#typedef struct dns_rr_ns { +# const char *nsdname; +#} dns_rr_ns_t; +# +#typedef struct dns_rr_ptr { +# const char *ptrdname; +#} dns_rr_ptr_t; +# +#typedef struct dns_rr_soa { +# const char *mname; +# const char *rname; +# u_int32_t serial; +# int32_t refresh; +# int32_t retry; +# int32_t expire; +# int32_t minimum; +#} dns_rr_soa_t; +# +#typedef struct dns_rr_txt { +# const char *data; +#} dns_rr_txt_t; +# +#typedef struct dns_rr_srv { +# const char *server; +# u_int16_t priority; +# u_int16_t weight; +# u_int16_t port; +#} dns_rr_srv_t; + +def dnsNameToLabel(name): + out = "" + name = name.split(".") + for part in name: + out += chr(len(part)) + part + return out + +def dnsFormatQuery(query, qclass, qtype): + header = DNSQueryHeader() + + header.dns_id = 0 # FIXME: id = 0 + header.dns_rd = 1 # don't know why the original code didn't request recursion for non SOA requests + header.dns_qr = 0 # query + header.dns_opcode = 0 # standard query + header.dns_qdcount = 1 # single query + + qlabel = dnsNameToLabel(query) + if not qlabel: + return "" + + out = header.pack() + qlabel + out += chr(qtype >> 8) + out += chr(qtype & 0xff) + out += chr(qclass >> 8) + out += chr(qclass & 0xff) + + return out + +def dnsParseLabel(label, base): + # returns (output, rest) + if not label: + return ("", None) + + update = 1 + rest = label + output = "" + skip = 0 + + try: + while ord(rest[0]): + if ord(rest[0]) & 0xc0: + rest = base[((ord(rest[0]) & 0x3f) << 8) + ord(rest[1]):] + if update: + skip += 2 + update = 0 + continue + output += rest[1:ord(rest[0]) + 1] + "." + if update: + skip += ord(rest[0]) + 1 + rest = rest[ord(rest[0]) + 1:] + except IndexError: + return ("", None) + return (label[skip+update:], output) + +def dnsParseA(data, base): + rdata = DNSRData() + if len(data) < 4: + rdata.address = 0 + return None + + rdata.address = (ord(data[0])<<24) | (ord(data[1])<<16) | (ord(data[2])<<8) | (ord(data[3])<<0) + + if DEBUG_DNSCLIENT: + print "A = %d.%d.%d.%d." % (ord(data[0]), ord(data[1]), ord(data[2]), ord(data[3])) + return rdata + +def dnsParseText(data): + if len(data) < 1: + return ("", None) + tlen = ord(data[0]) + if len(data) < tlen + 1: + return ("", None) + return (data[tlen+1:], data[1:tlen+1]) + +def dnsParseNS(data, base): + rdata = DNSRData() + (rest, rdata.nsdname) = dnsParseLabel(data, base) + if DEBUG_DNSCLIENT: + print "NS DNAME = \"%s\"." % (rdata.nsdname) + return rdata + +def dnsParseCNAME(data, base): + rdata = DNSRData() + (rest, rdata.cname) = dnsParseLabel(data, base) + if DEBUG_DNSCLIENT: + print "CNAME = \"%s\"." % (rdata.cname) + return rdata + +def dnsParseSOA(data, base): + rdata = DNSRData() + format = "!IIIII" + + (rest, rdata.mname) = dnsParseLabel(data, base) + if rdata.mname is None: + return None + (rest, rdata.rname) = dnsParseLabel(rest, base) + if rdata.rname is None: + return None + if len(rest) < struct.calcsize(format): + return None + + (rdata.serial, rdata.refresh, rdata.retry, rdata.expire, + rdata.minimum) = struct.unpack(format, rest[:struct.calcsize(format)]) + + if DEBUG_DNSCLIENT: + print "SOA(mname) = \"%s\"." % rdata.mname + print "SOA(rname) = \"%s\"." % rdata.rname + print "SOA(serial) = %d." % rdata.serial + print "SOA(refresh) = %d." % rdata.refresh + print "SOA(retry) = %d." % rdata.retry + print "SOA(expire) = %d." % rdata.expire + print "SOA(minimum) = %d." % rdata.minimum + return rdata + +def dnsParseNULL(data, base): + # um, yeah + return None + +def dnsParseWKS(data, base): + return None + +def dnsParseHINFO(data, base): + rdata = DNSRData() + (rest, rdata.cpu) = dnsParseText(data) + if rest: + (rest, rdata.os) = dnsParseText(rest) + if DEBUG_DNSCLIENT: + print "HINFO(cpu) = \"%s\"." % rdata.cpu + print "HINFO(os) = \"%s\"." % rdata.os + return rdata + +def dnsParseMX(data, base): + rdata = DNSRData() + if len(data) < 2: + return None + rdata.preference = (ord(data[0]) << 8) | ord(data[1]) + (rest, rdata.exchange) = dnsParseLabel(data[2:], base) + if DEBUG_DNSCLIENT: + print "MX(exchanger) = \"%s\"." % rdata.exchange + print "MX(preference) = %d." % rdata.preference + return rdata + +def dnsParseTXT(data, base): + rdata = DNSRData() + (rest, rdata.data) = dnsParseText(data) + if DEBUG_DNSCLIENT: + print "TXT = \"%s\"." % rdata.data + return rdata + +def dnsParsePTR(data, base): + rdata = DNSRData() + (rest, rdata.ptrdname) = dnsParseLabel(data, base) + if DEBUG_DNSCLIENT: + print "PTR = \"%s\"." % rdata.ptrdname + return rdata + +def dnsParseSRV(data, base): + rdata = DNSRData() + format = "!HHH" + flen = struct.calcsize(format) + if len(data) < flen: + return None + + (rdata.priority, rdata.weight, rdata.port) = struct.unpack(format, data[:flen]) + (rest, rdata.server) = dnsParseLabel(data[flen:], base) + if DEBUG_DNSCLIENT: + print "SRV(server) = \"%s\"." % rdata.server + print "SRV(weight) = %d." % rdata.weight + print "SRV(priority) = %d." % rdata.priority + print "SRV(port) = %d." % rdata.port + return rdata + +def dnsParseResults(results): + try: + header = unpackQueryHeader(results) + except struct.error: + return [] + + if header.dns_qr != 1: # should be a response + return [] + + if header.dns_rcode != 0: # should be no error + return [] + + rest = results[header.size():] + + rrlist = [] + + for i in xrange(header.dns_qdcount): + if not rest: + return [] + + qq = DNSResult() + + (rest, label) = dnsParseLabel(rest, results) + if label is None: + return [] + + if len(rest) < qq.qsize(): + return [] + + qq.qunpack(rest) + + rest = rest[qq.qsize():] + + if DEBUG_DNSCLIENT: + print "Queried for '%s', class = %d, type = %d." % (label, + qq.dns_class, qq.dns_type) + + for i in xrange(header.dns_ancount + header.dns_nscount + header.dns_arcount): + (rest, label) = dnsParseLabel(rest, results) + if label is None: + return [] + + rr = DNSResult() + + rr.dns_name = label + + if len(rest) < rr.size(): + return [] + + rr.unpack(rest) + + rest = rest[rr.size():] + + if DEBUG_DNSCLIENT: + print "Answer %d for '%s', class = %d, type = %d, ttl = %d." % (i, + rr.dns_name, rr.dns_class, rr.dns_type, + rr.dns_ttl) + + if len(rest) < rr.dns_rlength: + if DEBUG_DNSCLIENT: + print "Answer too short." + return [] + + fmap = { DNS_T_A: dnsParseA, DNS_T_NS: dnsParseNS, + DNS_T_CNAME: dnsParseCNAME, DNS_T_SOA: dnsParseSOA, + DNS_T_NULL: dnsParseNULL, DNS_T_WKS: dnsParseWKS, + DNS_T_PTR: dnsParsePTR, DNS_T_HINFO: dnsParseHINFO, + DNS_T_MX: dnsParseMX, DNS_T_TXT: dnsParseTXT, + DNS_T_SRV: dnsParseSRV} + + if not rr.dns_type in fmap: + if DEBUG_DNSCLIENT: + print "Don't know how to parse RR type %d!" % rr.dns_type + else: + rr.rdata = fmap[rr.dns_type](rest[:rr.dns_rlength], results) + + rest = rest[rr.dns_rlength:] + rrlist += [rr] + + return rrlist + +def query(query, qclass, qtype): + qdata = dnsFormatQuery(query, qclass, qtype) + if not qdata: + return [] + answer = acutil.res_send(qdata) + if not answer: + return [] + return dnsParseResults(answer) + +if __name__ == '__main__': + DEBUG_DNSCLIENT = True + print "Sending query." + rr = query(len(sys.argv) > 1 and sys.argv[1] or "devserv.devel.redhat.com.", + DNS_C_IN, DNS_T_ANY) + sys.exit(0) diff --git a/ipapython/entity.py b/ipapython/entity.py new file mode 100644 index 000000000..580cbd00b --- /dev/null +++ b/ipapython/entity.py @@ -0,0 +1,202 @@ +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import ldap +import ldif +import re +import cStringIO +import copy + +import ipapython.ipautil + +def utf8_encode_value(value): + if isinstance(value,unicode): + return value.encode('utf-8') + return value + +def utf8_encode_values(values): + if isinstance(values,list) or isinstance(values,tuple): + return map(utf8_encode_value, values) + else: + return utf8_encode_value(values) + +def copy_CIDict(x): + """Do a deep copy of a CIDict""" + y = {} + for key, value in x.iteritems(): + y[copy.deepcopy(key)] = copy.deepcopy(value) + return y + +class Entity: + """This class represents an IPA user. An LDAP entry consists of a DN + and a list of attributes. Each attribute consists of a name and a list of + values. For the time being I will maintain this. + + In python-ldap, entries are returned as a list of 2-tuples. + Instance variables: + dn - string - the string DN of the entry + data - CIDict - case insensitive dict of the attributes and values + orig_data - CIDict - case insentiive dict of the original attributes and values""" + + def __init__(self,entrydata=None): + """data is the raw data returned from the python-ldap result method, + which is a search result entry or a reference or None. + If creating a new empty entry, data is the string DN.""" + if entrydata: + if isinstance(entrydata,tuple): + self.dn = entrydata[0] + self.data = ipapython.ipautil.CIDict(entrydata[1]) + elif isinstance(entrydata,str) or isinstance(entrydata,unicode): + self.dn = entrydata + self.data = ipapython.ipautil.CIDict() + elif isinstance(entrydata,dict): + self.dn = entrydata['dn'] + del entrydata['dn'] + self.data = ipapython.ipautil.CIDict(entrydata) + else: + self.dn = '' + self.data = ipapython.ipautil.CIDict() + + self.orig_data = ipapython.ipautil.CIDict(copy_CIDict(self.data)) + + def __nonzero__(self): + """This allows us to do tests like if entry: returns false if there is no data, + true otherwise""" + return self.data != None and len(self.data) > 0 + + def hasAttr(self,name): + """Return True if this entry has an attribute named name, False otherwise""" + return self.data and self.data.has_key(name) + + def __setattr__(self,name,value): + """One should use setValue() or setValues() to set values except for + dn and data which are special.""" + if name != 'dn' and name != 'data' and name != 'orig_data': + raise KeyError, 'use setValue() or setValues()' + else: + self.__dict__[name] = value + + def __getattr__(self,name): + """If name is the name of an LDAP attribute, return the first value for that + attribute - equivalent to getValue - this allows the use of + entry.cn + instead of + entry.getValue('cn') + This also allows us to return None if an attribute is not found rather than + throwing an exception""" + return self.getValue(name) + + def getValues(self,name): + """Get the list (array) of values for the attribute named name""" + return self.data.get(name) + + def getValue(self,name,default=None): + """Get the first value for the attribute named name""" + value = self.data.get(name,default) + if isinstance(value,list) or isinstance(value,tuple): + return value[0] + else: + return value + + def setValue(self,name,*value): + """Value passed in may be a single value, several values, or a single sequence. + For example: + ent.setValue('name', 'value') + ent.setValue('name', 'value1', 'value2', ..., 'valueN') + ent.setValue('name', ['value1', 'value2', ..., 'valueN']) + ent.setValue('name', ('value1', 'value2', ..., 'valueN')) + Since *value is a tuple, we may have to extract a list or tuple from that + tuple as in the last two examples above""" + if (len(value) < 1): + return + if (len(value) == 1): + self.data[name] = utf8_encode_values(value[0]) + else: + self.data[name] = utf8_encode_values(value) + + setValues = setValue + + def setValueNotEmpty(self,name,*value): + """Similar to setValue() but will not set an empty field. This + is an attempt to avoid adding empty attributes.""" + if (len(value) >= 1) and value[0] and len(value[0]) > 0: + if isinstance(value[0], list): + if len(value[0][0]) > 0: + self.setValue(name, *value) + return + else: + self.setValue(name, *value) + return + + # At this point we have an empty incoming value. See if they are + # trying to erase the current value. If so we'll delete it so + # it gets marked as removed in the modlist. + v = self.getValues(name) + if v: + self.delValue(name) + + return + + def delValue(self,name): + """Remove the attribute named name.""" + if self.data.get(name,None): + del self.data[name] + + def toTupleList(self): + """Convert the attrs and values to a list of 2-tuples. The first element + of the tuple is the attribute name. The second element is either a + single value or a list of values.""" + return self.data.items() + + def toDict(self): + """Convert the attrs and values to a dict. The dict is keyed on the + attribute name. The value is either single value or a list of values.""" + result = ipapython.ipautil.CIDict(self.data) + result['dn'] = self.dn + return result + + def attrList(self): + """Return a list of all attributes in the entry""" + return self.data.keys() + + def origDataDict(self): + """Returns a dict of the original values of the user. Used for updates.""" + result = ipapython.ipautil.CIDict(self.orig_data) + result['dn'] = self.dn + return result + +# def __str__(self): +# """Convert the Entry to its LDIF representation""" +# return self.__repr__() +# +# # the ldif class base64 encodes some attrs which I would rather see in raw form - to +# # encode specific attrs as base64, add them to the list below +# ldif.safe_string_re = re.compile('^$') +# base64_attrs = ['nsstate', 'krbprincipalkey', 'krbExtraData'] +# +# def __repr__(self): +# """Convert the Entry to its LDIF representation""" +# sio = cStringIO.StringIO() +# # what's all this then? the unparse method will currently only accept +# # a list or a dict, not a class derived from them. self.data is a +# # cidict, so unparse barfs on it. I've filed a bug against python-ldap, +# # but in the meantime, we have to convert to a plain old dict for printing +# # I also don't want to see wrapping, so set the line width really high (1000) +# newdata = {} +# newdata.update(self.data) +# ldif.LDIFWriter(sio,User.base64_attrs,1000).unparse(self.dn,newdata) +# return sio.getvalue() diff --git a/ipapython/ipa.conf b/ipapython/ipa.conf new file mode 100644 index 000000000..516f764d5 --- /dev/null +++ b/ipapython/ipa.conf @@ -0,0 +1,3 @@ +[defaults] +# realm = EXAMPLE.COM +# server = ipa.example.com diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py new file mode 100644 index 000000000..57f5dcd9b --- /dev/null +++ b/ipapython/ipautil.py @@ -0,0 +1,969 @@ +# Authors: Simo Sorce <ssorce@redhat.com> +# +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +SHARE_DIR = "/usr/share/ipa/" +PLUGINS_SHARE_DIR = "/usr/share/ipa/plugins" + +import string +import tempfile +import logging +import subprocess +import random +import os, sys, traceback, readline +import stat +import shutil + +from ipapython import ipavalidate +from types import * + +import re +import xmlrpclib +import datetime +from ipapython import config +try: + from subprocess import CalledProcessError + class CalledProcessError(subprocess.CalledProcessError): + def __init__(self, returncode, cmd): + super(CalledProcessError, self).__init__(returncode, cmd) +except ImportError: + # Python 2.4 doesn't implement CalledProcessError + class CalledProcessError(Exception): + """This exception is raised when a process run by check_call() returns + a non-zero exit status. The exit status will be stored in the + returncode attribute.""" + def __init__(self, returncode, cmd): + self.returncode = returncode + self.cmd = cmd + def __str__(self): + return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode) + +def get_domain_name(): + try: + config.init_config() + domain_name = config.config.get_domain() + except Exception, e: + return None + + return domain_name + +def realm_to_suffix(realm_name): + s = realm_name.split(".") + terms = ["dc=" + x.lower() for x in s] + return ",".join(terms) + +def template_str(txt, vars): + return string.Template(txt).substitute(vars) + +def template_file(infilename, vars): + txt = open(infilename).read() + return template_str(txt, vars) + +def write_tmp_file(txt): + fd = tempfile.NamedTemporaryFile() + fd.write(txt) + fd.flush() + + return fd + +def run(args, stdin=None): + if stdin: + p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) + stdout,stderr = p.communicate(stdin) + else: + p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) + stdout,stderr = p.communicate() + + logging.info(stdout) + logging.info(stderr) + + if p.returncode != 0: + raise CalledProcessError(p.returncode, ' '.join(args)) + + return (stdout, stderr) + +def file_exists(filename): + try: + mode = os.stat(filename)[stat.ST_MODE] + if stat.S_ISREG(mode): + return True + else: + return False + except: + return False + +def dir_exists(filename): + try: + mode = os.stat(filename)[stat.ST_MODE] + if stat.S_ISDIR(mode): + return True + else: + return False + except: + return False + +def install_file(fname, dest): + if file_exists(dest): + os.rename(dest, dest + ".orig") + shutil.move(fname, dest) + +def backup_file(fname): + if file_exists(fname): + os.rename(fname, fname + ".orig") + +# uses gpg to compress and encrypt a file +def encrypt_file(source, dest, password, workdir = None): + if type(source) is not StringType or not len(source): + raise ValueError('Missing Source File') + #stat it so that we get back an exception if it does no t exist + os.stat(source) + + if type(dest) is not StringType or not len(dest): + raise ValueError('Missing Destination File') + + if type(password) is not StringType or not len(password): + raise ValueError('Missing Password') + + #create a tempdir so that we can clean up with easily + tempdir = tempfile.mkdtemp('', 'ipa-', workdir) + gpgdir = tempdir+"/.gnupg" + + try: + try: + #give gpg a fake dir so that we can leater remove all + #the cruft when we clean up the tempdir + os.mkdir(gpgdir) + args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-c', source] + run(args, password) + except: + raise + finally: + #job done, clean up + shutil.rmtree(tempdir, ignore_errors=True) + + +def decrypt_file(source, dest, password, workdir = None): + if type(source) is not StringType or not len(source): + raise ValueError('Missing Source File') + #stat it so that we get back an exception if it does no t exist + os.stat(source) + + if type(dest) is not StringType or not len(dest): + raise ValueError('Missing Destination File') + + if type(password) is not StringType or not len(password): + raise ValueError('Missing Password') + + #create a tempdir so that we can clean up with easily + tempdir = tempfile.mkdtemp('', 'ipa-', workdir) + gpgdir = tempdir+"/.gnupg" + + try: + try: + #give gpg a fake dir so that we can leater remove all + #the cruft when we clean up the tempdir + os.mkdir(gpgdir) + args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-d', source] + run(args, password) + except: + raise + finally: + #job done, clean up + shutil.rmtree(tempdir, ignore_errors=True) + + +class CIDict(dict): + """ + Case-insensitive but case-respecting dictionary. + + This code is derived from python-ldap's cidict.py module, + written by stroeder: http://python-ldap.sourceforge.net/ + + This version extends 'dict' so it works properly with TurboGears. + If you extend UserDict, isinstance(foo, dict) returns false. + """ + + def __init__(self,default=None): + super(CIDict, self).__init__() + self._keys = {} + self.update(default or {}) + + def __getitem__(self,key): + return super(CIDict,self).__getitem__(string.lower(key)) + + def __setitem__(self,key,value): + lower_key = string.lower(key) + self._keys[lower_key] = key + return super(CIDict,self).__setitem__(string.lower(key),value) + + def __delitem__(self,key): + lower_key = string.lower(key) + del self._keys[lower_key] + return super(CIDict,self).__delitem__(string.lower(key)) + + def update(self,dict): + for key in dict.keys(): + self[key] = dict[key] + + def has_key(self,key): + return super(CIDict, self).has_key(string.lower(key)) + + def get(self,key,failobj=None): + try: + return self[key] + except KeyError: + return failobj + + def keys(self): + return self._keys.values() + + def items(self): + result = [] + for k in self._keys.values(): + result.append((k,self[k])) + return result + + def copy(self): + copy = {} + for k in self._keys.values(): + copy[k] = self[k] + return copy + + def iteritems(self): + return self.copy().iteritems() + + def iterkeys(self): + return self.copy().iterkeys() + + def setdefault(self,key,value=None): + try: + return self[key] + except KeyError: + self[key] = value + return value + + def pop(self, key, *args): + try: + value = self[key] + del self[key] + return value + except KeyError: + if len(args) == 1: + return args[0] + raise + + def popitem(self): + (lower_key,value) = super(CIDict,self).popitem() + key = self._keys[lower_key] + del self._keys[lower_key] + + return (key,value) + + +# +# The safe_string_re regexp and needs_base64 function are extracted from the +# python-ldap ldif module, which was +# written by Michael Stroeder <michael@stroeder.com> +# http://python-ldap.sourceforge.net +# +# It was extracted because ipaldap.py is naughtily reaching into the ldif +# module and squashing this regexp. +# +SAFE_STRING_PATTERN = '(^(\000|\n|\r| |:|<)|[\000\n\r\200-\377]+|[ ]+$)' +safe_string_re = re.compile(SAFE_STRING_PATTERN) + +def needs_base64(s): + """ + returns 1 if s has to be base-64 encoded because of special chars + """ + return not safe_string_re.search(s) is None + + +def wrap_binary_data(data): + """Converts all binary data strings into Binary objects for transport + back over xmlrpc.""" + if isinstance(data, str): + if needs_base64(data): + return xmlrpclib.Binary(data) + else: + return data + elif isinstance(data, list) or isinstance(data,tuple): + retval = [] + for value in data: + retval.append(wrap_binary_data(value)) + return retval + elif isinstance(data, dict): + retval = {} + for (k,v) in data.iteritems(): + retval[k] = wrap_binary_data(v) + return retval + else: + return data + + +def unwrap_binary_data(data): + """Converts all Binary objects back into strings.""" + if isinstance(data, xmlrpclib.Binary): + # The data is decoded by the xmlproxy, but is stored + # in a binary object for us. + return str(data) + elif isinstance(data, str): + return data + elif isinstance(data, list) or isinstance(data,tuple): + retval = [] + for value in data: + retval.append(unwrap_binary_data(value)) + return retval + elif isinstance(data, dict): + retval = {} + for (k,v) in data.iteritems(): + retval[k] = unwrap_binary_data(v) + return retval + else: + return data + +class GeneralizedTimeZone(datetime.tzinfo): + """This class is a basic timezone wrapper for the offset specified + in a Generalized Time. It is dst-ignorant.""" + def __init__(self,offsetstr="Z"): + super(GeneralizedTimeZone, self).__init__() + + self.name = offsetstr + self.houroffset = 0 + self.minoffset = 0 + + if offsetstr == "Z": + self.houroffset = 0 + self.minoffset = 0 + else: + if (len(offsetstr) >= 3) and re.match(r'[-+]\d\d', offsetstr): + self.houroffset = int(offsetstr[0:3]) + offsetstr = offsetstr[3:] + if (len(offsetstr) >= 2) and re.match(r'\d\d', offsetstr): + self.minoffset = int(offsetstr[0:2]) + offsetstr = offsetstr[2:] + if len(offsetstr) > 0: + raise ValueError() + if self.houroffset < 0: + self.minoffset *= -1 + + def utcoffset(self, dt): + return datetime.timedelta(hours=self.houroffset, minutes=self.minoffset) + + def dst(self, dt): + return datetime.timedelta(0) + + def tzname(self, dt): + return self.name + + +def parse_generalized_time(timestr): + """Parses are Generalized Time string (as specified in X.680), + returning a datetime object. Generalized Times are stored inside + the krbPasswordExpiration attribute in LDAP. + + This method doesn't attempt to be perfect wrt timezones. If python + can't be bothered to implement them, how can we...""" + + if len(timestr) < 8: + return None + try: + date = timestr[:8] + time = timestr[8:] + + year = int(date[:4]) + month = int(date[4:6]) + day = int(date[6:8]) + + hour = min = sec = msec = 0 + tzone = None + + if (len(time) >= 2) and re.match(r'\d', time[0]): + hour = int(time[:2]) + time = time[2:] + if len(time) >= 2 and (time[0] == "," or time[0] == "."): + hour_fraction = "." + time = time[1:] + while (len(time) > 0) and re.match(r'\d', time[0]): + hour_fraction += time[0] + time = time[1:] + total_secs = int(float(hour_fraction) * 3600) + min, sec = divmod(total_secs, 60) + + if (len(time) >= 2) and re.match(r'\d', time[0]): + min = int(time[:2]) + time = time[2:] + if len(time) >= 2 and (time[0] == "," or time[0] == "."): + min_fraction = "." + time = time[1:] + while (len(time) > 0) and re.match(r'\d', time[0]): + min_fraction += time[0] + time = time[1:] + sec = int(float(min_fraction) * 60) + + if (len(time) >= 2) and re.match(r'\d', time[0]): + sec = int(time[:2]) + time = time[2:] + if len(time) >= 2 and (time[0] == "," or time[0] == "."): + sec_fraction = "." + time = time[1:] + while (len(time) > 0) and re.match(r'\d', time[0]): + sec_fraction += time[0] + time = time[1:] + msec = int(float(sec_fraction) * 1000000) + + if (len(time) > 0): + tzone = GeneralizedTimeZone(time) + + return datetime.datetime(year, month, day, hour, min, sec, msec, tzone) + + except ValueError: + return None + +def ipa_generate_password(): + rndpwd = '' + r = random.SystemRandom() + for x in range(12): + rndpwd += chr(r.randint(32,126)) + return rndpwd + + +def format_list(items, quote=None, page_width=80): + '''Format a list of items formatting them so they wrap to fit the + available width. The items will be sorted. + + The items may optionally be quoted. The quote parameter may either be + a string, in which case it is added before and after the item. Or the + quote parameter may be a pair (either a tuple or list). In this case + quote[0] is left hand quote and quote[1] is the right hand quote. + ''' + left_quote = right_quote = '' + num_items = len(items) + if not num_items: return "" + + if quote is not None: + if type(quote) in StringTypes: + left_quote = right_quote = quote + elif type(quote) is TupleType or type(quote) is ListType: + left_quote = quote[0] + right_quote = quote[1] + + max_len = max(map(len, items)) + max_len += len(left_quote) + len(right_quote) + num_columns = (page_width + max_len) / (max_len+1) + num_rows = (num_items + num_columns - 1) / num_columns + items.sort() + + rows = [''] * num_rows + i = row = col = 0 + + while i < num_items: + row = 0 + if col == 0: + separator = '' + else: + separator = ' ' + + while i < num_items and row < num_rows: + rows[row] += "%s%*s" % (separator, -max_len, "%s%s%s" % (left_quote, items[i], right_quote)) + i += 1 + row += 1 + col += 1 + return '\n'.join(rows) + +key_value_re = re.compile("(\w+)\s*=\s*(([^\s'\\\"]+)|(?P<quote>['\\\"])((?P=quote)|(.*?[^\\\])(?P=quote)))") +def parse_key_value_pairs(input): + ''' Given a string composed of key=value pairs parse it and return + a dict of the key/value pairs. Keys must be a word, a key must be followed + by an equal sign (=) and a value. The value may be a single word or may be + quoted. Quotes may be either single or double quotes, but must be balanced. + Inside the quoted text the same quote used to start the quoted value may be + used if it is escaped by preceding it with a backslash (\). + White space between the key, the equal sign, and the value is ignored. + Values are always strings. Empty values must be specified with an empty + quoted string, it's value after parsing will be an empty string. + + Example: The string + + arg0 = '' arg1 = 1 arg2='two' arg3 = "three's a crowd" arg4 = "this is a \" quote" + + will produce + + arg0= arg1=1 + arg2=two + arg3=three's a crowd + arg4=this is a " quote + ''' + + kv_dict = {} + for match in key_value_re.finditer(input): + key = match.group(1) + quote = match.group('quote') + if match.group(5): + value = match.group(6) + if value is None: value = '' + value = re.sub('\\\%s' % quote, quote, value) + else: + value = match.group(2) + kv_dict[key] = value + return kv_dict + +def parse_items(text): + '''Given text with items separated by whitespace or comma, return a list of those items''' + split_re = re.compile('[ ,\t\n]+') + items = split_re.split(text) + for item in items[:]: + if not item: items.remove(item) + return items + +def read_pairs_file(filename): + comment_re = re.compile('#.*$', re.MULTILINE) + if filename == '-': + fd = sys.stdin + else: + fd = open(filename) + text = fd.read() + text = comment_re.sub('', text) # kill comments + pairs = parse_key_value_pairs(text) + if fd != sys.stdin: fd.close() + return pairs + +def read_items_file(filename): + comment_re = re.compile('#.*$', re.MULTILINE) + if filename == '-': + fd = sys.stdin + else: + fd = open(filename) + text = fd.read() + text = comment_re.sub('', text) # kill comments + items = parse_items(text) + if fd != sys.stdin: fd.close() + return items + +def user_input(prompt, default = None, allow_empty = True): + if default == None: + while True: + ret = raw_input("%s: " % prompt) + if allow_empty or ret.strip(): + return ret + + if isinstance(default, basestring): + while True: + ret = raw_input("%s [%s]: " % (prompt, default)) + if not ret and (allow_empty or default): + return default + elif ret.strip(): + return ret + if isinstance(default, bool): + if default: + choice = "yes" + else: + choice = "no" + while True: + ret = raw_input("%s [%s]: " % (prompt, choice)) + if not ret: + return default + elif ret.lower()[0] == "y": + return True + elif ret.lower()[0] == "n": + return False + if isinstance(default, int): + while True: + try: + ret = raw_input("%s [%s]: " % (prompt, default)) + if not ret: + return default + ret = int(ret) + except ValueError: + pass + else: + return ret + +def user_input_plain(prompt, default = None, allow_empty = True, allow_spaces = True): + while True: + ret = user_input(prompt, default, allow_empty) + if ipavalidate.Plain(ret, not allow_empty, allow_spaces): + return ret + +def user_input_path(prompt, default = None, allow_empty = True): + if default != None and allow_empty: + prompt += " (enter \"none\" for empty)" + while True: + ret = user_input(prompt, default, allow_empty) + if allow_empty and ret.lower() == "none": + return "" + if ipavalidate.Path(ret, not allow_empty): + return ret + +class AttributeValueCompleter: + ''' + Gets input from the user in the form "lhs operator rhs" + TAB completes partial input. + lhs completes to a name in @lhs_names + The lhs is fully parsed if a lhs_delim delimiter is seen, then TAB will + complete to the operator and a default value. + Default values for a lhs value can specified as: + - a string, all lhs values will use this default + - a dict, the lhs value is looked up in the dict to return the default or None + - a function with a single arg, the lhs value, it returns the default or None + + After creating the completer you must open it to set the terminal + up, Then get a line of input from the user by calling read_input() + which returns two values, the lhs and rhs, which might be None if + lhs or rhs was not parsed. After you are done getting input you + should close the completer to restore the terminal. + + Example: (note this is essentially what the convenience function get_pairs() does) + + This will allow the user to autocomplete foo & foobar, both have + defaults defined in a dict. In addition the foobar attribute must + be specified before the prompting loop will exit. Also, this + example show how to require that each attrbute entered by the user + is valid. + + attrs = ['foo', 'foobar'] + defaults = {'foo' : 'foo_default', 'foobar' : 'foobar_default'} + mandatory_attrs = ['foobar'] + + c = AttributeValueCompleter(attrs, defaults) + c.open() + mandatory_attrs_remaining = mandatory_attrs[:] + + while True: + if mandatory_attrs_remaining: + attribute, value = c.read_input("Enter: ", mandatory_attrs_remaining[0]) + try: + mandatory_attrs_remaining.remove(attribute) + except ValueError: + pass + else: + attribute, value = c.read_input("Enter: ") + if attribute is None: + # Are we done? + if mandatory_attrs_remaining: + print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining)) + continue + else: + break + if attribute not in attrs: + print "ERROR: %s is not a valid attribute" % (attribute) + else: + print "got '%s' = '%s'" % (attribute, value) + + c.close() + print "exiting..." + ''' + + def __init__(self, lhs_names, default_value=None, lhs_regexp=r'^\s*(?P<lhs>[^ =]+)', lhs_delims=' =', + operator='=', strip_rhs=True): + self.lhs_names = lhs_names + self.default_value = default_value + # lhs_regexp must have named group 'lhs' which returns the contents of the lhs + self.lhs_regexp = lhs_regexp + self.lhs_re = re.compile(self.lhs_regexp) + self.lhs_delims = lhs_delims + self.operator = operator + self.strip_rhs = strip_rhs + self.pairs = None + self._reset() + + def _reset(self): + self.lhs = None + self.lhs_complete = False + self.operator_complete = False + self.rhs = None + + def open(self): + # Save state + self.prev_completer = readline.get_completer() + self.prev_completer_delims = readline.get_completer_delims() + + # Set up for ourself + readline.parse_and_bind("tab: complete") + readline.set_completer(self.complete) + readline.set_completer_delims(self.lhs_delims) + + def close(self): + # Restore previous state + readline.set_completer_delims(self.prev_completer_delims) + readline.set_completer(self.prev_completer) + + def parse_input(self): + '''We are looking for 3 tokens: <lhs,op,rhs> + Extract as much of each token as possible. + Set flags indicating if token is fully parsed. + ''' + try: + self._reset() + buf_len = len(self.line_buffer) + pos = 0 + lhs_match = self.lhs_re.search(self.line_buffer, pos) + if not lhs_match: return # no lhs content + self.lhs = lhs_match.group('lhs') # get lhs contents + pos = lhs_match.end('lhs') # new scanning position + if pos == buf_len: return # nothing after lhs, lhs incomplete + self.lhs_complete = True # something trails the lhs, lhs is complete + operator_beg = self.line_buffer.find(self.operator, pos) # locate operator + if operator_beg == -1: return # did not find the operator + self.operator_complete = True # operator fully parsed + operator_end = operator_beg + len(self.operator) + pos = operator_end # step over the operator + self.rhs = self.line_buffer[pos:] + except Exception, e: + traceback.print_exc() + print "Exception in %s.parse_input(): %s" % (self.__class__.__name__, e) + + def get_default_value(self): + '''default_value can be a string, a dict, or a function. + If it's a string it's a global default for all attributes. + If it's a dict the default is looked up in the dict index by attribute. + If it's a function, the function is called with 1 parameter, the attribute + and it should return the default value for the attriubte or None''' + + if not self.lhs_complete: raise ValueError("attribute not parsed") + + # If the user previously provided a value let that override the supplied default + if self.pairs is not None: + prev_value = self.pairs.get(self.lhs) + if prev_value is not None: return prev_value + + # No previous user provided value, query for a default + default_value_type = type(self.default_value) + if default_value_type is DictType: + return self.default_value.get(self.lhs, None) + elif default_value_type is FunctionType: + return self.default_value(self.lhs) + elif default_value_type is StringsType: + return self.default_value + else: + return None + + def get_lhs_completions(self, text): + if text: + self.completions = [lhs for lhs in self.lhs_names if lhs.startswith(text)] + else: + self.completions = self.lhs_names + + def complete(self, text, state): + self.line_buffer= readline.get_line_buffer() + self.parse_input() + if not self.lhs_complete: + # lhs is not complete, set up to complete the lhs + if state == 0: + beg = readline.get_begidx() + end = readline.get_endidx() + self.get_lhs_completions(self.line_buffer[beg:end]) + if state >= len(self.completions): return None + return self.completions[state] + + + elif not self.operator_complete: + # lhs is complete, but the operator is not so we complete + # by inserting the operator manually. + # Also try to complete the default value at this time. + readline.insert_text('%s ' % self.operator) + default_value = self.get_default_value() + if default_value is not None: + readline.insert_text(default_value) + readline.redisplay() + return None + else: + # lhs and operator are complete, if the the rhs is blank + # (either empty or only only whitespace) then attempt + # to complete by inserting the default value, otherwise + # there is nothing we can complete to so we're done. + if self.rhs.strip(): + return None + default_value = self.get_default_value() + if default_value is not None: + readline.insert_text(default_value) + readline.redisplay() + return None + + def pre_input_hook(self): + readline.insert_text('%s %s ' % (self.initial_lhs, self.operator)) + readline.redisplay() + + def read_input(self, prompt, initial_lhs=None): + self.initial_lhs = initial_lhs + try: + self._reset() + if initial_lhs is None: + readline.set_pre_input_hook(None) + else: + readline.set_pre_input_hook(self.pre_input_hook) + self.line_buffer = raw_input(prompt).strip() + self.parse_input() + if self.strip_rhs and self.rhs is not None: + return self.lhs, self.rhs.strip() + else: + return self.lhs, self.rhs + except EOFError: + return None, None + + def get_pairs(self, prompt, mandatory_attrs=None, validate_callback=None, must_match=True, value_required=True): + self.pairs = {} + if mandatory_attrs: + mandatory_attrs_remaining = mandatory_attrs[:] + else: + mandatory_attrs_remaining = [] + + print "Enter name = value" + print "Press <ENTER> to accept, a blank line terminates input" + print "Pressing <TAB> will auto completes name, assignment, and value" + print + while True: + if mandatory_attrs_remaining: + attribute, value = self.read_input(prompt, mandatory_attrs_remaining[0]) + else: + attribute, value = self.read_input(prompt) + if attribute is None: + # Are we done? + if mandatory_attrs_remaining: + print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining)) + continue + else: + break + if value is None: + if value_required: + print "ERROR: you must specify a value for %s" % attribute + continue + else: + if must_match and attribute not in self.lhs_names: + print "ERROR: %s is not a valid name" % (attribute) + continue + if validate_callback is not None: + if not validate_callback(attribute, value): + print "ERROR: %s is not valid for %s" % (value, attribute) + continue + try: + mandatory_attrs_remaining.remove(attribute) + except ValueError: + pass + + self.pairs[attribute] = value + return self.pairs + +class ItemCompleter: + ''' + Prompts the user for items in a list of items with auto completion. + TAB completes partial input. + More than one item can be specifed during input, whitespace and/or comma's seperate. + Example: + + possible_items = ['foo', 'bar'] + c = ItemCompleter(possible_items) + c.open() + # Use read_input() to limit input to a single carriage return (e.g. <ENTER>) + #items = c.read_input("Enter: ") + # Use get_items to iterate until a blank line is entered. + items = c.get_items("Enter: ") + c.close() + print "items=%s" % (items) + + ''' + + def __init__(self, items): + self.items = items + self.initial_input = None + self.item_delims = ' \t,' + self.split_re = re.compile('[%s]+' % self.item_delims) + + def open(self): + # Save state + self.prev_completer = readline.get_completer() + self.prev_completer_delims = readline.get_completer_delims() + + # Set up for ourself + readline.parse_and_bind("tab: complete") + readline.set_completer(self.complete) + readline.set_completer_delims(self.item_delims) + + def close(self): + # Restore previous state + readline.set_completer_delims(self.prev_completer_delims) + readline.set_completer(self.prev_completer) + + def get_item_completions(self, text): + if text: + self.completions = [lhs for lhs in self.items if lhs.startswith(text)] + else: + self.completions = self.items + + def complete(self, text, state): + self.line_buffer= readline.get_line_buffer() + if state == 0: + beg = readline.get_begidx() + end = readline.get_endidx() + self.get_item_completions(self.line_buffer[beg:end]) + if state >= len(self.completions): return None + return self.completions[state] + + def pre_input_hook(self): + readline.insert_text('%s %s ' % (self.initial_input, self.operator)) + readline.redisplay() + + def read_input(self, prompt, initial_input=None): + items = [] + + self.initial_input = initial_input + try: + if initial_input is None: + readline.set_pre_input_hook(None) + else: + readline.set_pre_input_hook(self.pre_input_hook) + self.line_buffer = raw_input(prompt).strip() + items = self.split_re.split(self.line_buffer) + for item in items[:]: + if not item: items.remove(item) + return items + except EOFError: + return items + + def get_items(self, prompt, must_match=True): + items = [] + + print "Enter name [name ...]" + print "Press <ENTER> to accept, blank line or control-D terminates input" + print "Pressing <TAB> auto completes name" + print + while True: + new_items = self.read_input(prompt) + if not new_items: break + for item in new_items: + if must_match: + if item not in self.items: + print "ERROR: %s is not valid" % (item) + continue + if item in items: continue + items.append(item) + + return items + +def get_gsserror(e): + """A GSSError exception looks differently in python 2.4 than it does + in python 2.5, deal with it.""" + + try: + primary = e[0] + secondary = e[1] + except: + primary = e[0][0] + secondary = e[0][1] + + return (primary[0], secondary[0]) diff --git a/ipapython/ipavalidate.py b/ipapython/ipavalidate.py new file mode 100644 index 000000000..63e0a7614 --- /dev/null +++ b/ipapython/ipavalidate.py @@ -0,0 +1,137 @@ +# Authors: Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import re + +def Email(mail, notEmpty=True): + """Do some basic validation of an e-mail address. + Return True if ok + Return False if not + + If notEmpty is True the this will return an error if the field + is "" or None. + """ + usernameRE = re.compile(r"^[^ \t\n\r@<>()]+$", re.I) + domainRE = re.compile(r"^[a-z0-9][a-z0-9\.\-_]*\.[a-z]+$", re.I) + + if not mail or mail is None: + if notEmpty is True: + return False + else: + return True + + mail = mail.strip() + s = mail.split('@', 1) + try: + username, domain=s + except ValueError: + return False + if not usernameRE.search(username): + return False + if not domainRE.search(domain): + return False + + return True + +def Plain(text, notEmpty=False, allowSpaces=True): + """Do some basic validation of a plain text field + Return True if ok + Return False if not + + If notEmpty is True the this will return an error if the field + is "" or None. + """ + if (text is None) or (not text.strip()): + if notEmpty is True: + return False + else: + return True + + if allowSpaces: + textRE = re.compile(r"^[a-zA-Z_\-0-9\'\ ]*$") + else: + textRE = re.compile(r"^[a-zA-Z_\-0-9\']*$") + if not textRE.search(text): + return False + + return True + +def String(text, notEmpty=False): + """A string type. This is much looser in what it allows than plain""" + + if text is None or not text.strip(): + if notEmpty is True: + return False + else: + return True + + return True + +def Path(text, notEmpty=False): + """Do some basic validation of a path + Return True if ok + Return False if not + + If notEmpty is True the this will return an error if the field + is "" or None. + """ + textRE = re.compile(r"^[a-zA-Z_\-0-9\\ \.\/\\:]*$") + + if not text and notEmpty is True: + return False + + if text is None: + if notEmpty is True: + return False + else: + return True + + if not textRE.search(text): + return False + + return True + +def GoodName(text, notEmpty=False): + """From shadow-utils: + + User/group names must match gnu e-regex: + [a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,30}[a-zA-Z0-9_.$-]? + + as a non-POSIX, extension, allow "$" as the last char for + sake of Samba 3.x "add machine script" + + Return True if ok + Return False if not + """ + textRE = re.compile(r"^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,30}[a-zA-Z0-9_.$-]?$") + + if not text and notEmpty is True: + return False + + if text is None: + if notEmpty is True: + return False + else: + return True + + m = textRE.match(text) + if not m or text != m.group(0): + return False + + return True diff --git a/ipapython/radius_util.py b/ipapython/radius_util.py new file mode 100644 index 000000000..8e66855eb --- /dev/null +++ b/ipapython/radius_util.py @@ -0,0 +1,366 @@ +# Authors: John Dennis <jdennis@redhat.com> +# +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import sys +import os +import re +import ldap +import getpass +import ldap.filter + +from ipapython import ipautil +from ipapython.entity import Entity +import ipapython.ipavalidate as ipavalidate + + +__all__ = [ + 'RADIUS_PKG_NAME', + 'RADIUS_PKG_CONFIG_DIR', + 'RADIUS_SERVICE_NAME', + 'RADIUS_USER', + 'RADIUS_IPA_KEYTAB_FILEPATH', + 'RADIUS_LDAP_ATTR_MAP_FILEPATH', + 'RADIUSD_CONF_FILEPATH', + 'RADIUSD_CONF_TEMPLATE_FILEPATH', + 'RADIUSD', + + 'RadiusClient', + 'RadiusProfile', + + 'clients_container', + 'radius_clients_basedn', + 'radius_client_filter', + 'radius_client_dn', + + 'profiles_container', + 'radius_profiles_basedn', + 'radius_profile_filter', + 'radius_profile_dn', + + 'radius_client_ldap_attr_to_radius_attr', + 'radius_client_attr_to_ldap_attr', + + 'radius_profile_ldap_attr_to_radius_attr', + 'radius_profile_attr_to_ldap_attr', + + 'get_secret', + 'validate_ip_addr', + 'validate_secret', + 'validate_name', + 'validate_nastype', + 'validate_desc', + 'validate', + ] + +#------------------------------------------------------------------------------ + +RADIUS_PKG_NAME = 'freeradius' +RADIUS_PKG_CONFIG_DIR = '/etc/raddb' + +RADIUS_SERVICE_NAME = 'radius' +RADIUS_USER = 'radiusd' + +RADIUS_IPA_KEYTAB_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'ipa.keytab') +RADIUS_LDAP_ATTR_MAP_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'ldap.attrmap') +RADIUSD_CONF_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'radiusd.conf') +RADIUSD_CONF_TEMPLATE_FILEPATH = os.path.join(ipautil.PLUGINS_SHARE_DIR, 'radius.radiusd.conf.template') + +RADIUSD = '/usr/sbin/radiusd' + +#------------------------------------------------------------------------------ + +dotted_octet_re = re.compile(r"^(\d+)\.(\d+)\.(\d+)\.(\d+)(/(\d+))?$") +dns_re = re.compile(r"^[a-zA-Z][a-zA-Z0-9.-]+$") +# secret, name, nastype all have 31 char max in freeRADIUS, max ip address len is 255 +valid_secret_len = (1,31) +valid_name_len = (1,31) +valid_nastype_len = (1,31) +valid_ip_addr_len = (1,255) + +valid_ip_addr_msg = '''\ +IP address must be either a DNS name (letters,digits,dot,hyphen, beginning with +a letter),or a dotted octet followed by an optional mask (e.g 192.168.1.0/24)''' + +valid_desc_msg = "Description must text string" + +#------------------------------------------------------------------------------ + +class RadiusClient(Entity): + + def __init2__(self): + pass + +class RadiusProfile(Entity): + + def __init2__(self): + pass + + +#------------------------------------------------------------------------------ + +def reverse_map_dict(src_dict): + reverse_dict = {} + + for k,v in src_dict.items(): + if reverse_dict.has_key(v): + raise ValueError("reverse_map_dict: collision on (%s) with values (%s),(%s)" % \ + v, reverse_dict[v], src_dict[k]) + reverse_dict[v] = k + return reverse_dict + +#------------------------------------------------------------------------------ + +radius_client_ldap_attr_to_radius_attr = ipautil.CIDict({ + 'radiusClientIPAddress' : 'Client-IP-Address', + 'radiusClientSecret' : 'Secret', + 'radiusClientNASType' : 'NAS-Type', + 'radiusClientShortName' : 'Name', + 'description' : 'Description', + }) + +radius_client_attr_to_ldap_attr = reverse_map_dict(radius_client_ldap_attr_to_radius_attr) + +#------------------------------------------------------------------------------ + +radius_profile_ldap_attr_to_radius_attr = ipautil.CIDict({ + 'uid' : 'UID', + 'radiusArapFeatures' : 'Arap-Features', + 'radiusArapSecurity' : 'Arap-Security', + 'radiusArapZoneAccess' : 'Arap-Zone-Access', + 'radiusAuthType' : 'Auth-Type', + 'radiusCallbackId' : 'Callback-Id', + 'radiusCallbackNumber' : 'Callback-Number', + 'radiusCalledStationId' : 'Called-Station-Id', + 'radiusCallingStationId' : 'Calling-Station-Id', + 'radiusClass' : 'Class', + 'radiusClientIPAddress' : 'Client-IP-Address', + 'radiusExpiration' : 'Expiration', + 'radiusFilterId' : 'Filter-Id', + 'radiusFramedAppleTalkLink' : 'Framed-AppleTalk-Link', + 'radiusFramedAppleTalkNetwork' : 'Framed-AppleTalk-Network', + 'radiusFramedAppleTalkZone' : 'Framed-AppleTalk-Zone', + 'radiusFramedCompression' : 'Framed-Compression', + 'radiusFramedIPAddress' : 'Framed-IP-Address', + 'radiusFramedIPNetmask' : 'Framed-IP-Netmask', + 'radiusFramedIPXNetwork' : 'Framed-IPX-Network', + 'radiusFramedMTU' : 'Framed-MTU', + 'radiusFramedProtocol' : 'Framed-Protocol', + 'radiusFramedRoute' : 'Framed-Route', + 'radiusFramedRouting' : 'Framed-Routing', + 'radiusGroupName' : 'Group-Name', + 'radiusHint' : 'Hint', + 'radiusHuntgroupName' : 'Huntgroup-Name', + 'radiusIdleTimeout' : 'Idle-Timeout', + 'radiusLoginIPHost' : 'Login-IP-Host', + 'radiusLoginLATGroup' : 'Login-LAT-Group', + 'radiusLoginLATNode' : 'Login-LAT-Node', + 'radiusLoginLATPort' : 'Login-LAT-Port', + 'radiusLoginLATService' : 'Login-LAT-Service', + 'radiusLoginService' : 'Login-Service', + 'radiusLoginTCPPort' : 'Login-TCP-Port', + 'radiusLoginTime' : 'Login-Time', + 'radiusNASIpAddress' : 'NAS-IP-Address', + 'radiusPasswordRetry' : 'Password-Retry', + 'radiusPortLimit' : 'Port-Limit', + 'radiusProfileDn' : 'Profile-Dn', + 'radiusPrompt' : 'Prompt', + 'radiusProxyToRealm' : 'Proxy-To-Realm', + 'radiusRealm' : 'Realm', + 'radiusReplicateToRealm' : 'Replicate-To-Realm', + 'radiusReplyMessage' : 'Reply-Message', + 'radiusServiceType' : 'Service-Type', + 'radiusSessionTimeout' : 'Session-Timeout', + 'radiusSimultaneousUse' : 'Simultaneous-Use', + 'radiusStripUserName' : 'Strip-User-Name', + 'radiusTerminationAction' : 'Termination-Action', + 'radiusTunnelAssignmentId' : 'Tunnel-Assignment-Id', + 'radiusTunnelClientEndpoint' : 'Tunnel-Client-Endpoint', + 'radiusTunnelMediumType' : 'Tunnel-Medium-Type', + 'radiusTunnelPassword' : 'Tunnel-Password', + 'radiusTunnelPreference' : 'Tunnel-Preference', + 'radiusTunnelPrivateGroupId' : 'Tunnel-Private-Group-Id', + 'radiusTunnelServerEndpoint' : 'Tunnel-Server-Endpoint', + 'radiusTunnelType' : 'Tunnel-Type', + 'radiusUserCategory' : 'User-Category', + 'radiusVSA' : 'VSA', +}) + +radius_profile_attr_to_ldap_attr = reverse_map_dict(radius_profile_ldap_attr_to_radius_attr) + +#------------------------------------------------------------------------------ + +clients_container = 'cn=clients,cn=radius' + +def radius_clients_basedn(container, suffix): + if container is None: container = clients_container + return '%s,%s' % (container, suffix) + +def radius_client_filter(ip_addr): + return "(&(radiusClientIPAddress=%s)(objectclass=radiusClientProfile))" % \ + ldap.filter.escape_filter_chars(ip_addr) + +def radius_client_dn(client, container, suffix): + if container is None: container = clients_container + return 'radiusClientIPAddress=%s,%s,%s' % (ldap.dn.escape_dn_chars(client), container, suffix) + +# -- + +profiles_container = 'cn=profiles,cn=radius' + +def radius_profiles_basedn(container, suffix): + if container is None: container = profiles_container + return '%s,%s' % (container, suffix) + +def radius_profile_filter(uid): + return "(&(uid=%s)(objectclass=radiusprofile))" % \ + ldap.filter.escape_filter_chars(uid) + +def radius_profile_dn(uid, container, suffix): + if container is None: container = profiles_container + return 'uid=%s,%s,%s' % (ldap.dn.escape_dn_chars(uid), container, suffix) + + +#------------------------------------------------------------------------------ + +def get_ldap_attr_translations(): + comment_re = re.compile('#.*$') + radius_attr_to_ldap_attr = {} + ldap_attr_to_radius_attr = {} + try: + f = open(LDAP_ATTR_MAP_FILEPATH) + for line in f.readlines(): + line = comment_re.sub('', line).strip() + if not line: continue + attr_type, radius_attr, ldap_attr = line.split() + print 'type="%s" radius="%s" ldap="%s"' % (attr_type, radius_attr, ldap_attr) + radius_attr_to_ldap_attr[radius_attr] = {'ldap_attr':ldap_attr, 'attr_type':attr_type} + ldap_attr_to_radius_attr[ldap_attr] = {'radius_attr':radius_attr, 'attr_type':attr_type} + f.close() + except Exception, e: + logging.error('cold not read radius ldap attribute map file (%s): %s', LDAP_ATTR_MAP_FILEPATH, e) + pass # FIXME + + #for k,v in radius_attr_to_ldap_attr.items(): + # print '%s --> %s' % (k,v) + #for k,v in ldap_attr_to_radius_attr.items(): + # print '%s --> %s' % (k,v) + +def get_secret(): + valid = False + while (not valid): + secret = getpass.getpass("Enter Secret: ") + confirm = getpass.getpass("Confirm Secret: ") + if (secret != confirm): + print "Secrets do not match" + continue + valid = True + return secret + +#------------------------------------------------------------------------------ + +def valid_ip_addr(text): + + # is it a dotted octet? If so there should be 4 integers seperated + # by a dot and each integer should be between 0 and 255 + # there may be an optional mask preceded by a slash (e.g. 1.2.3.4/24) + match = dotted_octet_re.search(text) + if match: + # dotted octet notation + i = 1 + while i <= 4: + octet = int(match.group(i)) + if octet > 255: return False + i += 1 + if match.group(5): + mask = int(match.group(6)) + if mask <= 32: + return True + else: + return False + return True + else: + # DNS name, can contain letters, numbers, dot and hypen, must start with a letter + if dns_re.search(text): return True + return False + +def validate_length(value, limits): + length = len(value) + if length < limits[0] or length > limits[1]: + return False + return True + +def valid_length_msg(name, limits): + return "%s length must be at least %d and not more than %d" % (name, limits[0], limits[1]) + +def err_msg(variable, variable_name=None): + if variable_name is None: variable_name = 'value' + print "ERROR: %s = %s" % (variable_name, variable) + +#------------------------------------------------------------------------------ + +def validate_ip_addr(ip_addr, variable_name=None): + if not validate_length(ip_addr, valid_ip_addr_len): + err_msg(ip_addr, variable_name) + print valid_length_msg('ip address', valid_ip_addr_len) + return False + if not valid_ip_addr(ip_addr): + err_msg(ip_addr, variable_name) + print valid_ip_addr_msg + return False + return True + +def validate_secret(secret, variable_name=None): + if not validate_length(secret, valid_secret_len): + err_msg(secret, variable_name) + print valid_length_msg('secret', valid_secret_len) + return False + return True + +def validate_name(name, variable_name=None): + if not validate_length(name, valid_name_len): + err_msg(name, variable_name) + print valid_length_msg('name', valid_name_len) + return False + return True + +def validate_nastype(nastype, variable_name=None): + if not validate_length(nastype, valid_nastype_len): + err_msg(nastype, variable_name) + print valid_length_msg('NAS Type', valid_nastype_len) + return False + return True + +def validate_desc(desc, variable_name=None): + if not ipavalidate.Plain(desc): + print valid_desc_msg + return False + return True + +def validate(attribute, value): + if attribute == 'Client-IP-Address': + return validate_ip_addr(value, attribute) + if attribute == 'Secret': + return validate_secret(value, attribute) + if attribute == 'NAS-Type': + return validate_nastype(value, attribute) + if attribute == 'Name': + return validate_name(value, attribute) + if attribute == 'Description': + return validate_desc(value, attribute) + return True diff --git a/ipapython/setup.py.in b/ipapython/setup.py.in new file mode 100644 index 000000000..667c5ae00 --- /dev/null +++ b/ipapython/setup.py.in @@ -0,0 +1,77 @@ +#!/usr/bin/env python +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +"""FreeIPA python support library + +FreeIPA is a server for identity, policy, and audit. +""" + +DOCLINES = __doc__.split("\n") + +import os +import sys +import distutils.sysconfig + +CLASSIFIERS = """\ +Development Status :: 4 - Beta +Intended Audience :: System Environment/Base +License :: GPL +Programming Language :: Python +Operating System :: POSIX +Operating System :: Unix +""" + +# BEFORE importing distutils, remove MANIFEST. distutils doesn't properly +# update it when the contents of directories change. +if os.path.exists('MANIFEST'): os.remove('MANIFEST') + +def setup_package(): + + from distutils.core import setup + + old_path = os.getcwd() + local_path = os.path.dirname(os.path.abspath(sys.argv[0])) + os.chdir(local_path) + sys.path.insert(0,local_path) + + try: + setup( + name = "ipapython", + version = "__VERSION__", + license = "GPL", + author = "Karl MacMillan, et.al.", + author_email = "kmacmill@redhat.com", + maintainer = "freeIPA Developers", + maintainer_email = "freeipa-devel@redhat.com", + url = "http://www.freeipa.org/", + description = DOCLINES[0], + long_description = "\n".join(DOCLINES[2:]), + download_url = "http://www.freeipa.org/page/Downloads", + classifiers=filter(None, CLASSIFIERS.split('\n')), + platforms = ["Linux", "Solaris", "Unix"], + package_dir = {'ipapython': ''}, + packages = [ "ipapython" ], + data_files = [('/etc/ipa', ['ipa.conf'])] + ) + finally: + del sys.path[0] + os.chdir(old_path) + return + +if __name__ == '__main__': + setup_package() diff --git a/ipapython/sysrestore.py b/ipapython/sysrestore.py new file mode 100644 index 000000000..503f38b24 --- /dev/null +++ b/ipapython/sysrestore.py @@ -0,0 +1,317 @@ +# Authors: Mark McLoughlin <markmc@redhat.com> +# +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +# +# This module provides a very simple API which allows +# ipa-xxx-install --uninstall to restore certain +# parts of the system configuration to the way it was +# before ipa-server-install was first run + +import os +import os.path +import errno +import shutil +import logging +import ConfigParser +import random +import string + +from ipapython import ipautil + +SYSRESTORE_PATH = "/tmp" +SYSRESTORE_INDEXFILE = "sysrestore.index" +SYSRESTORE_STATEFILE = "sysrestore.state" + +class FileStore: + """Class for handling backup and restore of files""" + + def __init__(self, path = SYSRESTORE_PATH): + """Create a _StoreFiles object, that uses @path as the + base directory. + + The file @path/sysrestore.index is used to store information + about the original location of the saved files. + """ + self._path = path + self._index = self._path + "/" + SYSRESTORE_INDEXFILE + + self.random = random.Random() + + self.files = {} + self._load() + + def _load(self): + """Load the file list from the index file. @files will + be an empty dictionary if the file doesn't exist. + """ + + logging.debug("Loading Index file from '%s'", self._index) + + self.files = {} + + p = ConfigParser.SafeConfigParser() + p.read(self._index) + + for section in p.sections(): + if section == "files": + for (key, value) in p.items(section): + self.files[key] = value + + + def save(self): + """Save the file list to @_index. If @files is an empty + dict, then @_index should be removed. + """ + logging.debug("Saving Index File to '%s'", self._index) + + if len(self.files) == 0: + logging.debug(" -> no files, removing file") + if os.path.exists(self._index): + os.remove(self._index) + return + + p = ConfigParser.SafeConfigParser() + + p.add_section('files') + for (key, value) in self.files.items(): + p.set('files', key, str(value)) + + f = file(self._index, "w") + p.write(f) + f.close() + + def backup_file(self, path): + """Create a copy of the file at @path - so long as a copy + does not already exist - which will be restored to its + original location by restore_files(). + """ + logging.debug("Backing up system configuration file '%s'", path) + + if not os.path.isabs(path): + raise ValueError("Absolute path required") + + if not os.path.isfile(path): + logging.debug(" -> Not backing up - '%s' doesn't exist", path) + return + + (reldir, file) = os.path.split(path) + + filename = "" + for i in range(8): + h = "%02x" % self.random.randint(0,255) + filename += h + filename += "-"+file + + backup_path = os.path.join(self._path, filename) + if os.path.exists(backup_path): + logging.debug(" -> Not backing up - already have a copy of '%s'", path) + return + + shutil.copy2(path, backup_path) + + stat = os.stat(path) + + self.files[filename] = string.join([str(stat.st_mode),str(stat.st_uid),str(stat.st_gid),path], ',') + self.save() + + def restore_file(self, path): + """Restore the copy of a file at @path to its original + location and delete the copy. + + Returns #True if the file was restored, #False if there + was no backup file to restore + """ + + logging.debug("Restoring system configuration file '%s'", path) + + if not os.path.isabs(path): + raise ValueError("Absolute path required") + + mode = None + uid = None + gid = None + filename = None + + for (key, value) in self.files.items(): + (mode,uid,gid,filepath) = string.split(value, ',', 3) + if (filepath == path): + filename = key + break + + if not filename: + raise ValueError("No such file name in the index") + + backup_path = os.path.join(self._path, filename) + if not os.path.exists(backup_path): + logging.debug(" -> Not restoring - '%s' doesn't exist", backup_path) + return False + + shutil.move(backup_path, path) + os.chown(path, int(uid), int(gid)) + os.chmod(path, int(mode)) + + ipautil.run(["/sbin/restorecon", path]) + + del self.files[filename] + self.save() + + return True + + def restore_all_files(self): + """Restore the files in the inbdex to their original + location and delete the copy. + + Returns #True if the file was restored, #False if there + was no backup file to restore + """ + + if len(self.files) == 0: + return False + + for (filename, value) in self.files.items(): + + (mode,uid,gid,path) = string.split(value, ',', 3) + + backup_path = os.path.join(self._path, filename) + if not os.path.exists(backup_path): + logging.debug(" -> Not restoring - '%s' doesn't exist", backup_path) + + shutil.move(backup_path, path) + os.chown(path, int(uid), int(gid)) + os.chmod(path, int(mode)) + + ipautil.run(["/sbin/restorecon", path]) + + #force file to be deleted + self.files = {} + self.save() + + return True + +class StateFile: + """A metadata file for recording system state which can + be backed up and later restored. The format is something + like: + + [httpd] + running=True + enabled=False + """ + + def __init__(self, path = SYSRESTORE_PATH): + """Create a StateFile object, loading from @path. + + The dictionary @modules, a member of the returned object, + is where the state can be modified. @modules is indexed + using a module name to return another dictionary containing + key/value pairs with the saved state of that module. + + The keys in these latter dictionaries are arbitrary strings + and the values may either be strings or booleans. + """ + self._path = path+"/"+SYSRESTORE_STATEFILE + + self.modules = {} + + self._load() + + def _load(self): + """Load the modules from the file @_path. @modules will + be an empty dictionary if the file doesn't exist. + """ + logging.debug("Loading StateFile from '%s'", self._path) + + self.modules = {} + + p = ConfigParser.SafeConfigParser() + p.read(self._path) + + for module in p.sections(): + self.modules[module] = {} + for (key, value) in p.items(module): + if value == str(True): + value = True + elif value == str(False): + value = False + self.modules[module][key] = value + + def save(self): + """Save the modules to @_path. If @modules is an empty + dict, then @_path should be removed. + """ + logging.debug("Saving StateFile to '%s'", self._path) + + for module in self.modules.keys(): + if len(self.modules[module]) == 0: + del self.modules[module] + + if len(self.modules) == 0: + logging.debug(" -> no modules, removing file") + if os.path.exists(self._path): + os.remove(self._path) + return + + p = ConfigParser.SafeConfigParser() + + for module in self.modules.keys(): + p.add_section(module) + for (key, value) in self.modules[module].items(): + p.set(module, key, str(value)) + + f = file(self._path, "w") + p.write(f) + f.close() + + def backup_state(self, module, key, value): + """Backup an item of system state from @module, identified + by the string @key and with the value @value. @value may be + a string or boolean. + """ + if not (isinstance(value, str) or isinstance(value, bool)): + raise ValueError("Only strings or booleans supported") + + if not self.modules.has_key(module): + self.modules[module] = {} + + if not self.modules.has_key(key): + self.modules[module][key] = value + + self.save() + + def restore_state(self, module, key): + """Return the value of an item of system state from @module, + identified by the string @key, and remove it from the backed + up system state. + + If the item doesn't exist, #None will be returned, otherwise + the original string or boolean value is returned. + """ + + if not self.modules.has_key(module): + return None + + if not self.modules[module].has_key(key): + return None + + value = self.modules[module][key] + del self.modules[module][key] + + self.save() + + return value diff --git a/ipapython/test/test_aci.py b/ipapython/test/test_aci.py new file mode 100644 index 000000000..fb9d84c7f --- /dev/null +++ b/ipapython/test/test_aci.py @@ -0,0 +1,127 @@ +#! /usr/bin/python -E +# +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import sys +sys.path.insert(0, ".") + +import unittest +import aci +import urllib + + +class TestACI(unittest.TestCase): + acitemplate = ('(targetattr="%s")' + + '(targetfilter="(memberOf=%s)")' + + '(version 3.0;' + + 'acl "%s";' + + 'allow (write) ' + + 'groupdn="ldap:///%s";)') + + def setUp(self): + self.aci = aci.ACI() + + def tearDown(self): + pass + + def testExport(self): + self.aci.source_group = 'cn=foo, dc=freeipa, dc=org' + self.aci.dest_group = 'cn=bar, dc=freeipa, dc=org' + self.aci.name = 'this is a "name' + self.aci.attrs = ['field1', 'field2', 'field3'] + + exportaci = self.aci.export_to_string() + aci = TestACI.acitemplate % ('field1 || field2 || field3', + self.aci.dest_group, + 'this is a "name', + self.aci.source_group) + + self.assertEqual(aci, exportaci) + + def testURLEncodedExport(self): + self.aci.source_group = 'cn=foo " bar, dc=freeipa, dc=org' + self.aci.dest_group = 'cn=bar, dc=freeipa, dc=org' + self.aci.name = 'this is a "name' + self.aci.attrs = ['field1', 'field2', 'field3'] + + exportaci = self.aci.export_to_string() + aci = TestACI.acitemplate % ('field1 || field2 || field3', + self.aci.dest_group, + 'this is a "name', + urllib.quote(self.aci.source_group, "/=, ")) + + self.assertEqual(aci, exportaci) + + def testSimpleParse(self): + attr_str = 'field3 || field4 || field5' + dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org' + name = 'my name' + src_dn = 'cn=srcgroup, dc=freeipa, dc=org' + + acistr = TestACI.acitemplate % (attr_str, dest_dn, name, src_dn) + self.aci.parse_acistr(acistr) + + self.assertEqual(['field3', 'field4', 'field5'], self.aci.attrs) + self.assertEqual(dest_dn, self.aci.dest_group) + self.assertEqual(name, self.aci.name) + self.assertEqual(src_dn, self.aci.source_group) + + def testUrlEncodedParse(self): + attr_str = 'field3 || field4 || field5' + dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org' + name = 'my name' + src_dn = 'cn=src " group, dc=freeipa, dc=org' + + acistr = TestACI.acitemplate % (attr_str, dest_dn, name, + urllib.quote(src_dn, "/=, ")) + self.aci.parse_acistr(acistr) + + self.assertEqual(['field3', 'field4', 'field5'], self.aci.attrs) + self.assertEqual(dest_dn, self.aci.dest_group) + self.assertEqual(name, self.aci.name) + self.assertEqual(src_dn, self.aci.source_group) + + def testInvalidParse(self): + try: + self.aci.parse_acistr('foo bar') + self.fail('Should have failed to parse') + except SyntaxError: + pass + + try: + self.aci.parse_acistr('') + self.fail('Should have failed to parse') + except SyntaxError: + pass + + attr_str = 'field3 || field4 || field5' + dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org' + name = 'my name' + src_dn = 'cn=srcgroup, dc=freeipa, dc=org' + + acistr = TestACI.acitemplate % (attr_str, dest_dn, name, src_dn) + acistr += 'trailing garbage' + try: + self.aci.parse_acistr('') + self.fail('Should have failed to parse') + except SyntaxError: + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/ipapython/test/test_ipautil.py b/ipapython/test/test_ipautil.py new file mode 100644 index 000000000..60d53a270 --- /dev/null +++ b/ipapython/test/test_ipautil.py @@ -0,0 +1,309 @@ +#! /usr/bin/python -E +# +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import sys +sys.path.insert(0, ".") + +import unittest +import datetime + +import ipautil + + +class TestCIDict(unittest.TestCase): + def setUp(self): + self.cidict = ipautil.CIDict() + self.cidict["Key1"] = "val1" + self.cidict["key2"] = "val2" + self.cidict["KEY3"] = "VAL3" + + def tearDown(self): + pass + + def testLen(self): + self.assertEqual(3, len(self.cidict)) + + def test__GetItem(self): + self.assertEqual("val1", self.cidict["Key1"]) + self.assertEqual("val1", self.cidict["key1"]) + self.assertEqual("val2", self.cidict["KEY2"]) + self.assertEqual("VAL3", self.cidict["key3"]) + self.assertEqual("VAL3", self.cidict["KEY3"]) + try: + self.cidict["key4"] + fail("should have raised KeyError") + except KeyError: + pass + + def testGet(self): + self.assertEqual("val1", self.cidict.get("Key1")) + self.assertEqual("val1", self.cidict.get("key1")) + self.assertEqual("val2", self.cidict.get("KEY2")) + self.assertEqual("VAL3", self.cidict.get("key3")) + self.assertEqual("VAL3", self.cidict.get("KEY3")) + self.assertEqual("default", self.cidict.get("key4", "default")) + + def test__SetItem(self): + self.cidict["key4"] = "val4" + self.assertEqual("val4", self.cidict["key4"]) + self.cidict["KEY4"] = "newval4" + self.assertEqual("newval4", self.cidict["key4"]) + + def testDel(self): + self.assert_(self.cidict.has_key("Key1")) + del(self.cidict["Key1"]) + self.failIf(self.cidict.has_key("Key1")) + + self.assert_(self.cidict.has_key("key2")) + del(self.cidict["KEY2"]) + self.failIf(self.cidict.has_key("key2")) + + def testClear(self): + self.assertEqual(3, len(self.cidict)) + self.cidict.clear() + self.assertEqual(0, len(self.cidict)) + + def testCopy(self): + """A copy is no longer a CIDict, but should preserve the case of + the keys as they were inserted.""" + copy = self.cidict.copy() + self.assertEqual(3, len(copy)) + self.assert_(copy.has_key("Key1")) + self.assertEqual("val1", copy["Key1"]) + self.failIf(copy.has_key("key1")) + + def testHasKey(self): + self.assert_(self.cidict.has_key("KEY1")) + self.assert_(self.cidict.has_key("key2")) + self.assert_(self.cidict.has_key("key3")) + + def testItems(self): + items = self.cidict.items() + self.assertEqual(3, len(items)) + items_set = set(items) + self.assert_(("Key1", "val1") in items_set) + self.assert_(("key2", "val2") in items_set) + self.assert_(("KEY3", "VAL3") in items_set) + + def testIterItems(self): + items = [] + for (k,v) in self.cidict.iteritems(): + items.append((k,v)) + self.assertEqual(3, len(items)) + items_set = set(items) + self.assert_(("Key1", "val1") in items_set) + self.assert_(("key2", "val2") in items_set) + self.assert_(("KEY3", "VAL3") in items_set) + + def testIterKeys(self): + keys = [] + for k in self.cidict.iterkeys(): + keys.append(k) + self.assertEqual(3, len(keys)) + keys_set = set(keys) + self.assert_("Key1" in keys_set) + self.assert_("key2" in keys_set) + self.assert_("KEY3" in keys_set) + + def testIterValues(self): + values = [] + for k in self.cidict.itervalues(): + values.append(k) + self.assertEqual(3, len(values)) + values_set = set(values) + self.assert_("val1" in values_set) + self.assert_("val2" in values_set) + self.assert_("VAL3" in values_set) + + def testKeys(self): + keys = self.cidict.keys() + self.assertEqual(3, len(keys)) + keys_set = set(keys) + self.assert_("Key1" in keys_set) + self.assert_("key2" in keys_set) + self.assert_("KEY3" in keys_set) + + def testValues(self): + values = self.cidict.values() + self.assertEqual(3, len(values)) + values_set = set(values) + self.assert_("val1" in values_set) + self.assert_("val2" in values_set) + self.assert_("VAL3" in values_set) + + def testUpdate(self): + newdict = { "KEY2": "newval2", + "key4": "val4" } + self.cidict.update(newdict) + self.assertEqual(4, len(self.cidict)) + + items = self.cidict.items() + self.assertEqual(4, len(items)) + items_set = set(items) + self.assert_(("Key1", "val1") in items_set) + # note the update "overwrites" the case of the key2 + self.assert_(("KEY2", "newval2") in items_set) + self.assert_(("KEY3", "VAL3") in items_set) + self.assert_(("key4", "val4") in items_set) + + def testSetDefault(self): + self.assertEqual("val1", self.cidict.setdefault("KEY1", "default")) + + self.failIf(self.cidict.has_key("KEY4")) + self.assertEqual("default", self.cidict.setdefault("KEY4", "default")) + self.assert_(self.cidict.has_key("KEY4")) + self.assertEqual("default", self.cidict["key4"]) + + self.failIf(self.cidict.has_key("KEY5")) + self.assertEqual(None, self.cidict.setdefault("KEY5")) + self.assert_(self.cidict.has_key("KEY5")) + self.assertEqual(None, self.cidict["key5"]) + + def testPop(self): + self.assertEqual("val1", self.cidict.pop("KEY1", "default")) + self.failIf(self.cidict.has_key("key1")) + + self.assertEqual("val2", self.cidict.pop("KEY2")) + self.failIf(self.cidict.has_key("key2")) + + self.assertEqual("default", self.cidict.pop("key4", "default")) + try: + self.cidict.pop("key4") + fail("should have raised KeyError") + except KeyError: + pass + + def testPopItem(self): + items = set(self.cidict.items()) + self.assertEqual(3, len(self.cidict)) + + item = self.cidict.popitem() + self.assertEqual(2, len(self.cidict)) + self.assert_(item in items) + items.discard(item) + + item = self.cidict.popitem() + self.assertEqual(1, len(self.cidict)) + self.assert_(item in items) + items.discard(item) + + item = self.cidict.popitem() + self.assertEqual(0, len(self.cidict)) + self.assert_(item in items) + items.discard(item) + +class TestTimeParser(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def testSimple(self): + timestr = "20070803" + + time = ipautil.parse_generalized_time(timestr) + self.assertEqual(2007, time.year) + self.assertEqual(8, time.month) + self.assertEqual(3, time.day) + self.assertEqual(0, time.hour) + self.assertEqual(0, time.minute) + self.assertEqual(0, time.second) + + def testHourMinSec(self): + timestr = "20051213141205" + + time = ipautil.parse_generalized_time(timestr) + self.assertEqual(2005, time.year) + self.assertEqual(12, time.month) + self.assertEqual(13, time.day) + self.assertEqual(14, time.hour) + self.assertEqual(12, time.minute) + self.assertEqual(5, time.second) + + def testFractions(self): + timestr = "2003092208.5" + + time = ipautil.parse_generalized_time(timestr) + self.assertEqual(2003, time.year) + self.assertEqual(9, time.month) + self.assertEqual(22, time.day) + self.assertEqual(8, time.hour) + self.assertEqual(30, time.minute) + self.assertEqual(0, time.second) + + timestr = "199203301544,25" + + time = ipautil.parse_generalized_time(timestr) + self.assertEqual(1992, time.year) + self.assertEqual(3, time.month) + self.assertEqual(30, time.day) + self.assertEqual(15, time.hour) + self.assertEqual(44, time.minute) + self.assertEqual(15, time.second) + + timestr = "20060401185912,8" + + time = ipautil.parse_generalized_time(timestr) + self.assertEqual(2006, time.year) + self.assertEqual(4, time.month) + self.assertEqual(1, time.day) + self.assertEqual(18, time.hour) + self.assertEqual(59, time.minute) + self.assertEqual(12, time.second) + self.assertEqual(800000, time.microsecond) + + def testTimeZones(self): + timestr = "20051213141205Z" + + time = ipautil.parse_generalized_time(timestr) + self.assertEqual(0, time.tzinfo.houroffset) + self.assertEqual(0, time.tzinfo.minoffset) + offset = time.tzinfo.utcoffset(None) + self.assertEqual(0, offset.seconds) + + timestr = "20051213141205+0500" + + time = ipautil.parse_generalized_time(timestr) + self.assertEqual(5, time.tzinfo.houroffset) + self.assertEqual(0, time.tzinfo.minoffset) + offset = time.tzinfo.utcoffset(None) + self.assertEqual(5 * 60 * 60, offset.seconds) + + timestr = "20051213141205-0500" + + time = ipautil.parse_generalized_time(timestr) + self.assertEqual(-5, time.tzinfo.houroffset) + self.assertEqual(0, time.tzinfo.minoffset) + # NOTE - the offset is always positive - it's minutes + # _east_ of UTC + offset = time.tzinfo.utcoffset(None) + self.assertEqual((24 - 5) * 60 * 60, offset.seconds) + + timestr = "20051213141205-0930" + + time = ipautil.parse_generalized_time(timestr) + self.assertEqual(-9, time.tzinfo.houroffset) + self.assertEqual(-30, time.tzinfo.minoffset) + offset = time.tzinfo.utcoffset(None) + self.assertEqual(((24 - 9) * 60 * 60) - (30 * 60), offset.seconds) + + +if __name__ == '__main__': + unittest.main() diff --git a/ipapython/test/test_ipavalidate.py b/ipapython/test/test_ipavalidate.py new file mode 100644 index 000000000..8b79fbf07 --- /dev/null +++ b/ipapython/test/test_ipavalidate.py @@ -0,0 +1,97 @@ +#! /usr/bin/python -E +# +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +import sys +sys.path.insert(0, ".") + +import unittest + +import ipavalidate + +class TestValidate(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def test_validEmail(self): + self.assertEqual(True, ipavalidate.Email("test@freeipa.org")) + self.assertEqual(True, ipavalidate.Email("", notEmpty=False)) + + def test_invalidEmail(self): + self.assertEqual(False, ipavalidate.Email("test")) + self.assertEqual(False, ipavalidate.Email("test@freeipa")) + self.assertEqual(False, ipavalidate.Email("test@.com")) + self.assertEqual(False, ipavalidate.Email("")) + self.assertEqual(False, ipavalidate.Email(None)) + + def test_validPlain(self): + self.assertEqual(True, ipavalidate.Plain("Joe User")) + self.assertEqual(True, ipavalidate.Plain("Joe O'Malley")) + self.assertEqual(True, ipavalidate.Plain("", notEmpty=False)) + self.assertEqual(True, ipavalidate.Plain(None, notEmpty=False)) + self.assertEqual(True, ipavalidate.Plain("JoeUser", allowSpaces=False)) + self.assertEqual(True, ipavalidate.Plain("JoeUser", allowSpaces=True)) + + def test_invalidPlain(self): + self.assertEqual(False, ipavalidate.Plain("Joe (User)")) + self.assertEqual(False, ipavalidate.Plain("Joe C. User")) + self.assertEqual(False, ipavalidate.Plain("", notEmpty=True)) + self.assertEqual(False, ipavalidate.Plain(None, notEmpty=True)) + self.assertEqual(False, ipavalidate.Plain("Joe User", allowSpaces=False)) + self.assertEqual(False, ipavalidate.Plain("Joe C. User")) + + def test_validString(self): + self.assertEqual(True, ipavalidate.String("Joe User")) + self.assertEqual(True, ipavalidate.String("Joe O'Malley")) + self.assertEqual(True, ipavalidate.String("", notEmpty=False)) + self.assertEqual(True, ipavalidate.String(None, notEmpty=False)) + self.assertEqual(True, ipavalidate.String("Joe C. User")) + + def test_invalidString(self): + self.assertEqual(False, ipavalidate.String("", notEmpty=True)) + self.assertEqual(False, ipavalidate.String(None, notEmpty=True)) + + def test_validPath(self): + self.assertEqual(True, ipavalidate.Path("/")) + self.assertEqual(True, ipavalidate.Path("/home/user")) + self.assertEqual(True, ipavalidate.Path("../home/user")) + self.assertEqual(True, ipavalidate.Path("", notEmpty=False)) + self.assertEqual(True, ipavalidate.Path(None, notEmpty=False)) + + def test_invalidPath(self): + self.assertEqual(False, ipavalidate.Path("(foo)")) + self.assertEqual(False, ipavalidate.Path("", notEmpty=True)) + self.assertEqual(False, ipavalidate.Path(None, notEmpty=True)) + + def test_validName(self): + self.assertEqual(True, ipavalidate.GoodName("foo")) + self.assertEqual(True, ipavalidate.GoodName("1foo")) + self.assertEqual(True, ipavalidate.GoodName("foo.bar")) + self.assertEqual(True, ipavalidate.GoodName("foo.bar$")) + + def test_invalidName(self): + self.assertEqual(False, ipavalidate.GoodName("foo bar")) + self.assertEqual(False, ipavalidate.GoodName("foo%bar")) + self.assertEqual(False, ipavalidate.GoodName("*foo")) + self.assertEqual(False, ipavalidate.GoodName("$foo.bar$")) + +if __name__ == '__main__': + unittest.main() diff --git a/ipapython/version.py.in b/ipapython/version.py.in new file mode 100644 index 000000000..fdb689f02 --- /dev/null +++ b/ipapython/version.py.in @@ -0,0 +1,25 @@ +# Authors: Rob Crittenden <rcritten@redhat.com> +# +# Copyright (C) 2007 Red Hat +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation; version 2 only +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +# The full version including strings +VERSION="__VERSION__" + +# Just the numeric portion of the version so one can do direct numeric +# comparisons to see if the API is compatible. +NUM_VERSION=__NUM_VERSION__ |