summaryrefslogtreecommitdiffstats
path: root/ipapython
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2009-02-05 15:03:08 -0500
committerRob Crittenden <rcritten@redhat.com>2009-02-09 14:35:15 -0500
commit262ff2d731b1bfc4acd91153088b8fcde7ae92b8 (patch)
treebaf8894d4b357b610113b87d4bfee84de24f08bd /ipapython
parent58ae191a5afbf29d78afd3969f8d106415897958 (diff)
downloadfreeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.tar.gz
freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.tar.xz
freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.zip
Rename ipa-python directory to ipapython so it is a real python library
We used to install it as ipa, now installing it as ipapython. The rpm is still ipa-python.
Diffstat (limited to 'ipapython')
-rw-r--r--ipapython/MANIFEST.in2
-rw-r--r--ipapython/Makefile28
-rw-r--r--ipapython/README18
-rw-r--r--ipapython/__init__.py0
-rw-r--r--ipapython/config.py183
-rw-r--r--ipapython/dnsclient.py443
-rw-r--r--ipapython/entity.py202
-rw-r--r--ipapython/ipa.conf3
-rw-r--r--ipapython/ipautil.py969
-rw-r--r--ipapython/ipavalidate.py137
-rw-r--r--ipapython/radius_util.py366
-rw-r--r--ipapython/setup.py.in77
-rw-r--r--ipapython/sysrestore.py317
-rw-r--r--ipapython/test/test_aci.py127
-rw-r--r--ipapython/test/test_ipautil.py309
-rw-r--r--ipapython/test/test_ipavalidate.py97
-rw-r--r--ipapython/version.py.in25
17 files changed, 3303 insertions, 0 deletions
diff --git a/ipapython/MANIFEST.in b/ipapython/MANIFEST.in
new file mode 100644
index 000000000..d178f0838
--- /dev/null
+++ b/ipapython/MANIFEST.in
@@ -0,0 +1,2 @@
+include *.conf
+
diff --git a/ipapython/Makefile b/ipapython/Makefile
new file mode 100644
index 000000000..4ac027e14
--- /dev/null
+++ b/ipapython/Makefile
@@ -0,0 +1,28 @@
+PYTHONLIBDIR ?= $(shell python -c "from distutils.sysconfig import *; print get_python_lib()")
+PACKAGEDIR ?= $(DESTDIR)/$(PYTHONLIBDIR)/ipa
+CONFIGDIR ?= $(DESTDIR)/etc/ipa
+TESTS = $(wildcard test/*.py)
+
+all: ;
+
+install:
+ if [ "$(DESTDIR)" = "" ]; then \
+ python setup.py install; \
+ else \
+ python setup.py install --root $(DESTDIR); \
+ fi
+
+clean:
+ rm -f *~ *.pyc
+
+distclean: clean
+ rm -f setup.py ipa-python.spec version.py
+
+maintainer-clean: distclean
+ rm -rf build
+
+.PHONY: test
+test: $(subst .py,.tst,$(TESTS))
+
+%.tst: %.py
+ python $<
diff --git a/ipapython/README b/ipapython/README
new file mode 100644
index 000000000..c966e44bf
--- /dev/null
+++ b/ipapython/README
@@ -0,0 +1,18 @@
+This is a set of libraries common to IPA clients and servers though mostly
+geared currently towards command-line tools.
+
+A brief overview:
+
+config.py - identify the IPA server domain and realm. It uses dnsclient to
+ try to detect this information first and will fall back to
+ /etc/ipa/ipa.conf if that fails.
+dnsclient.py - find IPA information via DNS
+
+ipautil.py - helper functions
+
+radius_util.py - helper functions for Radius
+
+entity.py - entity is the main data type. User and Group extend this class
+ (but don't add anything currently).
+
+ipavalidate.py - basic data validation routines
diff --git a/ipapython/__init__.py b/ipapython/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/ipapython/__init__.py
diff --git a/ipapython/config.py b/ipapython/config.py
new file mode 100644
index 000000000..efae910eb
--- /dev/null
+++ b/ipapython/config.py
@@ -0,0 +1,183 @@
+# Authors: Karl MacMillan <kmacmill@redhat.com>
+#
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+import ConfigParser
+from optparse import OptionParser, IndentedHelpFormatter
+
+import krbV
+import socket
+import ipapython.dnsclient
+import re
+
+class IPAConfigError(Exception):
+ def __init__(self, msg=''):
+ self.msg = msg
+ Exception.__init__(self, msg)
+
+ def __repr__(self):
+ return self.msg
+
+ __str__ = __repr__
+
+class IPAFormatter(IndentedHelpFormatter):
+ """Our own optparse formatter that indents multiple lined usage string."""
+ def format_usage(self, usage):
+ usage_string = "Usage:"
+ spacing = " " * len(usage_string)
+ lines = usage.split("\n")
+ ret = "%s %s\n" % (usage_string, lines[0])
+ for line in lines[1:]:
+ ret += "%s %s\n" % (spacing, line)
+ return ret
+
+def verify_args(parser, args, needed_args = None):
+ """Verify that we have all positional arguments we need, if not, exit."""
+ if needed_args:
+ needed_list = needed_args.split(" ")
+ else:
+ needed_list = []
+ len_need = len(needed_list)
+ len_have = len(args)
+ if len_have > len_need:
+ parser.error("too many arguments")
+ elif len_have < len_need:
+ parser.error("no %s specified" % needed_list[len_have])
+
+class IPAConfig:
+ def __init__(self):
+ self.default_realm = None
+ self.default_server = []
+ self.default_domain = None
+
+ def get_realm(self):
+ if self.default_realm:
+ return self.default_realm
+ else:
+ raise IPAConfigError("no default realm")
+
+ def get_server(self):
+ if len(self.default_server):
+ return self.default_server
+ else:
+ raise IPAConfigError("no default server")
+
+ def get_domain(self):
+ if self.default_domain:
+ return self.default_domain
+ else:
+ raise IPAConfigError("no default domain")
+
+# Global library config
+config = IPAConfig()
+
+def __parse_config(discover_server = True):
+ p = ConfigParser.SafeConfigParser()
+ p.read("/etc/ipa/ipa.conf")
+
+ try:
+ if not config.default_realm:
+ config.default_realm = p.get("defaults", "realm")
+ except:
+ pass
+ if discover_server:
+ try:
+ s = p.get("defaults", "server")
+ config.default_server.extend(re.sub("\s+", "", s).split(','))
+ except:
+ pass
+ try:
+ if not config.default_domain:
+ config.default_domain = p.get("defaults", "domain")
+ except:
+ pass
+
+def __discover_config(discover_server = True):
+ rl = 0
+ try:
+ if not config.default_realm:
+ krbctx = krbV.default_context()
+ config.default_realm = krbctx.default_realm
+ if not config.default_realm:
+ return False
+
+ if not config.default_domain:
+ #try once with REALM -> domain
+ dom_name = config.default_realm.lower()
+ name = "_ldap._tcp."+dom_name+"."
+ rs = ipapython.dnsclient.query(name, ipapython.dnsclient.DNS_C_IN, ipapython.dnsclient.DNS_T_SRV)
+ rl = len(rs)
+ if rl == 0:
+ #try cycling on domain components of FQDN
+ dom_name = socket.getfqdn()
+ while rl == 0:
+ tok = dom_name.find(".")
+ if tok == -1:
+ return False
+ dom_name = dom_name[tok+1:]
+ name = "_ldap._tcp." + dom_name + "."
+ rs = ipapython.dnsclient.query(name, ipapython.dnsclient.DNS_C_IN, ipapython.dnsclient.DNS_T_SRV)
+ rl = len(rs)
+
+ config.default_domain = dom_name
+
+ if discover_server:
+ if rl == 0:
+ name = "_ldap._tcp."+config.default_domain+"."
+ rs = ipapython.dnsclient.query(name, ipapython.dnsclient.DNS_C_IN, ipapython.dnsclient.DNS_T_SRV)
+
+ for r in rs:
+ if r.dns_type == ipapython.dnsclient.DNS_T_SRV:
+ rsrv = r.rdata.server.rstrip(".")
+ config.default_server.append(rsrv)
+
+ except:
+ pass
+
+def add_standard_options(parser):
+ parser.add_option("--realm", dest="realm", help="Override default IPA realm")
+ parser.add_option("--server", dest="server", help="Override default IPA server")
+ parser.add_option("--domain", dest="domain", help="Override default IPA DNS domain")
+
+def init_config(options=None):
+ if options:
+ config.default_realm = options.realm
+ config.default_domain = options.domain
+ if options.server:
+ config.default_server.extend(options.server.split(","))
+
+ if len(config.default_server):
+ discover_server = False
+ else:
+ discover_server = True
+ __parse_config(discover_server)
+ __discover_config(discover_server)
+
+ # make sure the server list only contains unique items
+ new_server = []
+ for server in config.default_server:
+ if server not in new_server:
+ new_server.append(server)
+ config.default_server = new_server
+
+ if not config.default_realm:
+ raise IPAConfigError("IPA realm not found in DNS, in the config file (/etc/ipa/ipa.conf) or on the command line.")
+ if not config.default_server:
+ raise IPAConfigError("IPA server not found in DNS, in the config file (/etc/ipa/ipa.conf) or on the command line.")
+ if not config.default_domain:
+ raise IPAConfigError("IPA domain not found in the config file (/etc/ipa/ipa.conf) or on the command line.")
diff --git a/ipapython/dnsclient.py b/ipapython/dnsclient.py
new file mode 100644
index 000000000..58d93d850
--- /dev/null
+++ b/ipapython/dnsclient.py
@@ -0,0 +1,443 @@
+#
+# Copyright 2001, 2005 Red Hat, Inc.
+#
+# This is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 only
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+#
+
+import struct
+import socket
+import sys
+
+import acutil
+
+DNS_C_IN = 1
+DNS_C_CS = 2
+DNS_C_CHAOS = 3
+DNS_C_HS = 4
+DNS_C_ANY = 255
+
+DNS_T_A = 1
+DNS_T_NS = 2
+DNS_T_CNAME = 5
+DNS_T_SOA = 6
+DNS_T_NULL = 10
+DNS_T_WKS = 11
+DNS_T_PTR = 12
+DNS_T_HINFO = 13
+DNS_T_MX = 15
+DNS_T_TXT = 16
+DNS_T_SRV = 33
+DNS_T_ANY = 255
+
+DEBUG_DNSCLIENT = False
+
+class DNSQueryHeader:
+ FORMAT = "!HBBHHHH"
+ def __init__(self):
+ self.dns_id = 0
+ self.dns_rd = 0
+ self.dns_tc = 0
+ self.dns_aa = 0
+ self.dns_opcode = 0
+ self.dns_qr = 0
+ self.dns_rcode = 0
+ self.dns_z = 0
+ self.dns_ra = 0
+ self.dns_qdcount = 0
+ self.dns_ancount = 0
+ self.dns_nscount = 0
+ self.dns_arcount = 0
+
+ def pack(self):
+ return struct.pack(DNSQueryHeader.FORMAT,
+ self.dns_id,
+ (self.dns_rd & 1) |
+ (self.dns_tc & 1) << 1 |
+ (self.dns_aa & 1) << 2 |
+ (self.dns_opcode & 15) << 3 |
+ (self.dns_qr & 1) << 7,
+ (self.dns_rcode & 15) |
+ (self.dns_z & 7) << 4 |
+ (self.dns_ra & 1) << 7,
+ self.dns_qdcount,
+ self.dns_ancount,
+ self.dns_nscount,
+ self.dns_arcount)
+
+ def unpack(self, data):
+ (self.dns_id, byte1, byte2, self.dns_qdcount, self.dns_ancount,
+ self.dns_nscount, self.dns_arcount) = struct.unpack(DNSQueryHeader.FORMAT, data[0:self.size()])
+ self.dns_rd = byte1 & 1
+ self.dns_tc = (byte1 >> 1) & 1
+ self.dns_aa = (byte1 >> 2) & 1
+ self.dns_opcode = (byte1 >> 3) & 15
+ self.dns_qr = (byte1 >> 7) & 1
+ self.dns_rcode = byte2 & 15
+ self.dns_z = (byte2 >> 4) & 7
+ self.dns_ra = (byte1 >> 7) & 1
+
+ def size(self):
+ return struct.calcsize(DNSQueryHeader.FORMAT)
+
+def unpackQueryHeader(data):
+ header = DNSQueryHeader()
+ header.unpack(data)
+ return header
+
+class DNSResult:
+ FORMAT = "!HHIH"
+ QFORMAT = "!HH"
+ def __init__(self):
+ self.dns_name = ""
+ self.dns_type = 0
+ self.dns_class = 0
+ self.dns_ttl = 0
+ self.dns_rlength = 0
+ self.rdata = None
+
+ def unpack(self, data):
+ (self.dns_type, self.dns_class, self.dns_ttl,
+ self.dns_rlength) = struct.unpack(DNSResult.FORMAT, data[0:self.size()])
+
+ def qunpack(self, data):
+ (self.dns_type, self.dns_class) = struct.unpack(DNSResult.QFORMAT, data[0:self.qsize()])
+
+ def size(self):
+ return struct.calcsize(DNSResult.FORMAT)
+
+ def qsize(self):
+ return struct.calcsize(DNSResult.QFORMAT)
+
+class DNSRData:
+ def __init__(self):
+ pass
+
+#typedef struct dns_rr_a {
+# u_int32_t address;
+#} dns_rr_a_t;
+#
+#typedef struct dns_rr_cname {
+# const char *cname;
+#} dns_rr_cname_t;
+#
+#typedef struct dns_rr_hinfo {
+# const char *cpu, *os;
+#} dns_rr_hinfo_t;
+#
+#typedef struct dns_rr_mx {
+# u_int16_t preference;
+# const char *exchange;
+#} dns_rr_mx_t;
+#
+#typedef struct dns_rr_null {
+# unsigned const char *data;
+#} dns_rr_null_t;
+#
+#typedef struct dns_rr_ns {
+# const char *nsdname;
+#} dns_rr_ns_t;
+#
+#typedef struct dns_rr_ptr {
+# const char *ptrdname;
+#} dns_rr_ptr_t;
+#
+#typedef struct dns_rr_soa {
+# const char *mname;
+# const char *rname;
+# u_int32_t serial;
+# int32_t refresh;
+# int32_t retry;
+# int32_t expire;
+# int32_t minimum;
+#} dns_rr_soa_t;
+#
+#typedef struct dns_rr_txt {
+# const char *data;
+#} dns_rr_txt_t;
+#
+#typedef struct dns_rr_srv {
+# const char *server;
+# u_int16_t priority;
+# u_int16_t weight;
+# u_int16_t port;
+#} dns_rr_srv_t;
+
+def dnsNameToLabel(name):
+ out = ""
+ name = name.split(".")
+ for part in name:
+ out += chr(len(part)) + part
+ return out
+
+def dnsFormatQuery(query, qclass, qtype):
+ header = DNSQueryHeader()
+
+ header.dns_id = 0 # FIXME: id = 0
+ header.dns_rd = 1 # don't know why the original code didn't request recursion for non SOA requests
+ header.dns_qr = 0 # query
+ header.dns_opcode = 0 # standard query
+ header.dns_qdcount = 1 # single query
+
+ qlabel = dnsNameToLabel(query)
+ if not qlabel:
+ return ""
+
+ out = header.pack() + qlabel
+ out += chr(qtype >> 8)
+ out += chr(qtype & 0xff)
+ out += chr(qclass >> 8)
+ out += chr(qclass & 0xff)
+
+ return out
+
+def dnsParseLabel(label, base):
+ # returns (output, rest)
+ if not label:
+ return ("", None)
+
+ update = 1
+ rest = label
+ output = ""
+ skip = 0
+
+ try:
+ while ord(rest[0]):
+ if ord(rest[0]) & 0xc0:
+ rest = base[((ord(rest[0]) & 0x3f) << 8) + ord(rest[1]):]
+ if update:
+ skip += 2
+ update = 0
+ continue
+ output += rest[1:ord(rest[0]) + 1] + "."
+ if update:
+ skip += ord(rest[0]) + 1
+ rest = rest[ord(rest[0]) + 1:]
+ except IndexError:
+ return ("", None)
+ return (label[skip+update:], output)
+
+def dnsParseA(data, base):
+ rdata = DNSRData()
+ if len(data) < 4:
+ rdata.address = 0
+ return None
+
+ rdata.address = (ord(data[0])<<24) | (ord(data[1])<<16) | (ord(data[2])<<8) | (ord(data[3])<<0)
+
+ if DEBUG_DNSCLIENT:
+ print "A = %d.%d.%d.%d." % (ord(data[0]), ord(data[1]), ord(data[2]), ord(data[3]))
+ return rdata
+
+def dnsParseText(data):
+ if len(data) < 1:
+ return ("", None)
+ tlen = ord(data[0])
+ if len(data) < tlen + 1:
+ return ("", None)
+ return (data[tlen+1:], data[1:tlen+1])
+
+def dnsParseNS(data, base):
+ rdata = DNSRData()
+ (rest, rdata.nsdname) = dnsParseLabel(data, base)
+ if DEBUG_DNSCLIENT:
+ print "NS DNAME = \"%s\"." % (rdata.nsdname)
+ return rdata
+
+def dnsParseCNAME(data, base):
+ rdata = DNSRData()
+ (rest, rdata.cname) = dnsParseLabel(data, base)
+ if DEBUG_DNSCLIENT:
+ print "CNAME = \"%s\"." % (rdata.cname)
+ return rdata
+
+def dnsParseSOA(data, base):
+ rdata = DNSRData()
+ format = "!IIIII"
+
+ (rest, rdata.mname) = dnsParseLabel(data, base)
+ if rdata.mname is None:
+ return None
+ (rest, rdata.rname) = dnsParseLabel(rest, base)
+ if rdata.rname is None:
+ return None
+ if len(rest) < struct.calcsize(format):
+ return None
+
+ (rdata.serial, rdata.refresh, rdata.retry, rdata.expire,
+ rdata.minimum) = struct.unpack(format, rest[:struct.calcsize(format)])
+
+ if DEBUG_DNSCLIENT:
+ print "SOA(mname) = \"%s\"." % rdata.mname
+ print "SOA(rname) = \"%s\"." % rdata.rname
+ print "SOA(serial) = %d." % rdata.serial
+ print "SOA(refresh) = %d." % rdata.refresh
+ print "SOA(retry) = %d." % rdata.retry
+ print "SOA(expire) = %d." % rdata.expire
+ print "SOA(minimum) = %d." % rdata.minimum
+ return rdata
+
+def dnsParseNULL(data, base):
+ # um, yeah
+ return None
+
+def dnsParseWKS(data, base):
+ return None
+
+def dnsParseHINFO(data, base):
+ rdata = DNSRData()
+ (rest, rdata.cpu) = dnsParseText(data)
+ if rest:
+ (rest, rdata.os) = dnsParseText(rest)
+ if DEBUG_DNSCLIENT:
+ print "HINFO(cpu) = \"%s\"." % rdata.cpu
+ print "HINFO(os) = \"%s\"." % rdata.os
+ return rdata
+
+def dnsParseMX(data, base):
+ rdata = DNSRData()
+ if len(data) < 2:
+ return None
+ rdata.preference = (ord(data[0]) << 8) | ord(data[1])
+ (rest, rdata.exchange) = dnsParseLabel(data[2:], base)
+ if DEBUG_DNSCLIENT:
+ print "MX(exchanger) = \"%s\"." % rdata.exchange
+ print "MX(preference) = %d." % rdata.preference
+ return rdata
+
+def dnsParseTXT(data, base):
+ rdata = DNSRData()
+ (rest, rdata.data) = dnsParseText(data)
+ if DEBUG_DNSCLIENT:
+ print "TXT = \"%s\"." % rdata.data
+ return rdata
+
+def dnsParsePTR(data, base):
+ rdata = DNSRData()
+ (rest, rdata.ptrdname) = dnsParseLabel(data, base)
+ if DEBUG_DNSCLIENT:
+ print "PTR = \"%s\"." % rdata.ptrdname
+ return rdata
+
+def dnsParseSRV(data, base):
+ rdata = DNSRData()
+ format = "!HHH"
+ flen = struct.calcsize(format)
+ if len(data) < flen:
+ return None
+
+ (rdata.priority, rdata.weight, rdata.port) = struct.unpack(format, data[:flen])
+ (rest, rdata.server) = dnsParseLabel(data[flen:], base)
+ if DEBUG_DNSCLIENT:
+ print "SRV(server) = \"%s\"." % rdata.server
+ print "SRV(weight) = %d." % rdata.weight
+ print "SRV(priority) = %d." % rdata.priority
+ print "SRV(port) = %d." % rdata.port
+ return rdata
+
+def dnsParseResults(results):
+ try:
+ header = unpackQueryHeader(results)
+ except struct.error:
+ return []
+
+ if header.dns_qr != 1: # should be a response
+ return []
+
+ if header.dns_rcode != 0: # should be no error
+ return []
+
+ rest = results[header.size():]
+
+ rrlist = []
+
+ for i in xrange(header.dns_qdcount):
+ if not rest:
+ return []
+
+ qq = DNSResult()
+
+ (rest, label) = dnsParseLabel(rest, results)
+ if label is None:
+ return []
+
+ if len(rest) < qq.qsize():
+ return []
+
+ qq.qunpack(rest)
+
+ rest = rest[qq.qsize():]
+
+ if DEBUG_DNSCLIENT:
+ print "Queried for '%s', class = %d, type = %d." % (label,
+ qq.dns_class, qq.dns_type)
+
+ for i in xrange(header.dns_ancount + header.dns_nscount + header.dns_arcount):
+ (rest, label) = dnsParseLabel(rest, results)
+ if label is None:
+ return []
+
+ rr = DNSResult()
+
+ rr.dns_name = label
+
+ if len(rest) < rr.size():
+ return []
+
+ rr.unpack(rest)
+
+ rest = rest[rr.size():]
+
+ if DEBUG_DNSCLIENT:
+ print "Answer %d for '%s', class = %d, type = %d, ttl = %d." % (i,
+ rr.dns_name, rr.dns_class, rr.dns_type,
+ rr.dns_ttl)
+
+ if len(rest) < rr.dns_rlength:
+ if DEBUG_DNSCLIENT:
+ print "Answer too short."
+ return []
+
+ fmap = { DNS_T_A: dnsParseA, DNS_T_NS: dnsParseNS,
+ DNS_T_CNAME: dnsParseCNAME, DNS_T_SOA: dnsParseSOA,
+ DNS_T_NULL: dnsParseNULL, DNS_T_WKS: dnsParseWKS,
+ DNS_T_PTR: dnsParsePTR, DNS_T_HINFO: dnsParseHINFO,
+ DNS_T_MX: dnsParseMX, DNS_T_TXT: dnsParseTXT,
+ DNS_T_SRV: dnsParseSRV}
+
+ if not rr.dns_type in fmap:
+ if DEBUG_DNSCLIENT:
+ print "Don't know how to parse RR type %d!" % rr.dns_type
+ else:
+ rr.rdata = fmap[rr.dns_type](rest[:rr.dns_rlength], results)
+
+ rest = rest[rr.dns_rlength:]
+ rrlist += [rr]
+
+ return rrlist
+
+def query(query, qclass, qtype):
+ qdata = dnsFormatQuery(query, qclass, qtype)
+ if not qdata:
+ return []
+ answer = acutil.res_send(qdata)
+ if not answer:
+ return []
+ return dnsParseResults(answer)
+
+if __name__ == '__main__':
+ DEBUG_DNSCLIENT = True
+ print "Sending query."
+ rr = query(len(sys.argv) > 1 and sys.argv[1] or "devserv.devel.redhat.com.",
+ DNS_C_IN, DNS_T_ANY)
+ sys.exit(0)
diff --git a/ipapython/entity.py b/ipapython/entity.py
new file mode 100644
index 000000000..580cbd00b
--- /dev/null
+++ b/ipapython/entity.py
@@ -0,0 +1,202 @@
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+import ldap
+import ldif
+import re
+import cStringIO
+import copy
+
+import ipapython.ipautil
+
+def utf8_encode_value(value):
+ if isinstance(value,unicode):
+ return value.encode('utf-8')
+ return value
+
+def utf8_encode_values(values):
+ if isinstance(values,list) or isinstance(values,tuple):
+ return map(utf8_encode_value, values)
+ else:
+ return utf8_encode_value(values)
+
+def copy_CIDict(x):
+ """Do a deep copy of a CIDict"""
+ y = {}
+ for key, value in x.iteritems():
+ y[copy.deepcopy(key)] = copy.deepcopy(value)
+ return y
+
+class Entity:
+ """This class represents an IPA user. An LDAP entry consists of a DN
+ and a list of attributes. Each attribute consists of a name and a list of
+ values. For the time being I will maintain this.
+
+ In python-ldap, entries are returned as a list of 2-tuples.
+ Instance variables:
+ dn - string - the string DN of the entry
+ data - CIDict - case insensitive dict of the attributes and values
+ orig_data - CIDict - case insentiive dict of the original attributes and values"""
+
+ def __init__(self,entrydata=None):
+ """data is the raw data returned from the python-ldap result method,
+ which is a search result entry or a reference or None.
+ If creating a new empty entry, data is the string DN."""
+ if entrydata:
+ if isinstance(entrydata,tuple):
+ self.dn = entrydata[0]
+ self.data = ipapython.ipautil.CIDict(entrydata[1])
+ elif isinstance(entrydata,str) or isinstance(entrydata,unicode):
+ self.dn = entrydata
+ self.data = ipapython.ipautil.CIDict()
+ elif isinstance(entrydata,dict):
+ self.dn = entrydata['dn']
+ del entrydata['dn']
+ self.data = ipapython.ipautil.CIDict(entrydata)
+ else:
+ self.dn = ''
+ self.data = ipapython.ipautil.CIDict()
+
+ self.orig_data = ipapython.ipautil.CIDict(copy_CIDict(self.data))
+
+ def __nonzero__(self):
+ """This allows us to do tests like if entry: returns false if there is no data,
+ true otherwise"""
+ return self.data != None and len(self.data) > 0
+
+ def hasAttr(self,name):
+ """Return True if this entry has an attribute named name, False otherwise"""
+ return self.data and self.data.has_key(name)
+
+ def __setattr__(self,name,value):
+ """One should use setValue() or setValues() to set values except for
+ dn and data which are special."""
+ if name != 'dn' and name != 'data' and name != 'orig_data':
+ raise KeyError, 'use setValue() or setValues()'
+ else:
+ self.__dict__[name] = value
+
+ def __getattr__(self,name):
+ """If name is the name of an LDAP attribute, return the first value for that
+ attribute - equivalent to getValue - this allows the use of
+ entry.cn
+ instead of
+ entry.getValue('cn')
+ This also allows us to return None if an attribute is not found rather than
+ throwing an exception"""
+ return self.getValue(name)
+
+ def getValues(self,name):
+ """Get the list (array) of values for the attribute named name"""
+ return self.data.get(name)
+
+ def getValue(self,name,default=None):
+ """Get the first value for the attribute named name"""
+ value = self.data.get(name,default)
+ if isinstance(value,list) or isinstance(value,tuple):
+ return value[0]
+ else:
+ return value
+
+ def setValue(self,name,*value):
+ """Value passed in may be a single value, several values, or a single sequence.
+ For example:
+ ent.setValue('name', 'value')
+ ent.setValue('name', 'value1', 'value2', ..., 'valueN')
+ ent.setValue('name', ['value1', 'value2', ..., 'valueN'])
+ ent.setValue('name', ('value1', 'value2', ..., 'valueN'))
+ Since *value is a tuple, we may have to extract a list or tuple from that
+ tuple as in the last two examples above"""
+ if (len(value) < 1):
+ return
+ if (len(value) == 1):
+ self.data[name] = utf8_encode_values(value[0])
+ else:
+ self.data[name] = utf8_encode_values(value)
+
+ setValues = setValue
+
+ def setValueNotEmpty(self,name,*value):
+ """Similar to setValue() but will not set an empty field. This
+ is an attempt to avoid adding empty attributes."""
+ if (len(value) >= 1) and value[0] and len(value[0]) > 0:
+ if isinstance(value[0], list):
+ if len(value[0][0]) > 0:
+ self.setValue(name, *value)
+ return
+ else:
+ self.setValue(name, *value)
+ return
+
+ # At this point we have an empty incoming value. See if they are
+ # trying to erase the current value. If so we'll delete it so
+ # it gets marked as removed in the modlist.
+ v = self.getValues(name)
+ if v:
+ self.delValue(name)
+
+ return
+
+ def delValue(self,name):
+ """Remove the attribute named name."""
+ if self.data.get(name,None):
+ del self.data[name]
+
+ def toTupleList(self):
+ """Convert the attrs and values to a list of 2-tuples. The first element
+ of the tuple is the attribute name. The second element is either a
+ single value or a list of values."""
+ return self.data.items()
+
+ def toDict(self):
+ """Convert the attrs and values to a dict. The dict is keyed on the
+ attribute name. The value is either single value or a list of values."""
+ result = ipapython.ipautil.CIDict(self.data)
+ result['dn'] = self.dn
+ return result
+
+ def attrList(self):
+ """Return a list of all attributes in the entry"""
+ return self.data.keys()
+
+ def origDataDict(self):
+ """Returns a dict of the original values of the user. Used for updates."""
+ result = ipapython.ipautil.CIDict(self.orig_data)
+ result['dn'] = self.dn
+ return result
+
+# def __str__(self):
+# """Convert the Entry to its LDIF representation"""
+# return self.__repr__()
+#
+# # the ldif class base64 encodes some attrs which I would rather see in raw form - to
+# # encode specific attrs as base64, add them to the list below
+# ldif.safe_string_re = re.compile('^$')
+# base64_attrs = ['nsstate', 'krbprincipalkey', 'krbExtraData']
+#
+# def __repr__(self):
+# """Convert the Entry to its LDIF representation"""
+# sio = cStringIO.StringIO()
+# # what's all this then? the unparse method will currently only accept
+# # a list or a dict, not a class derived from them. self.data is a
+# # cidict, so unparse barfs on it. I've filed a bug against python-ldap,
+# # but in the meantime, we have to convert to a plain old dict for printing
+# # I also don't want to see wrapping, so set the line width really high (1000)
+# newdata = {}
+# newdata.update(self.data)
+# ldif.LDIFWriter(sio,User.base64_attrs,1000).unparse(self.dn,newdata)
+# return sio.getvalue()
diff --git a/ipapython/ipa.conf b/ipapython/ipa.conf
new file mode 100644
index 000000000..516f764d5
--- /dev/null
+++ b/ipapython/ipa.conf
@@ -0,0 +1,3 @@
+[defaults]
+# realm = EXAMPLE.COM
+# server = ipa.example.com
diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py
new file mode 100644
index 000000000..57f5dcd9b
--- /dev/null
+++ b/ipapython/ipautil.py
@@ -0,0 +1,969 @@
+# Authors: Simo Sorce <ssorce@redhat.com>
+#
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+SHARE_DIR = "/usr/share/ipa/"
+PLUGINS_SHARE_DIR = "/usr/share/ipa/plugins"
+
+import string
+import tempfile
+import logging
+import subprocess
+import random
+import os, sys, traceback, readline
+import stat
+import shutil
+
+from ipapython import ipavalidate
+from types import *
+
+import re
+import xmlrpclib
+import datetime
+from ipapython import config
+try:
+ from subprocess import CalledProcessError
+ class CalledProcessError(subprocess.CalledProcessError):
+ def __init__(self, returncode, cmd):
+ super(CalledProcessError, self).__init__(returncode, cmd)
+except ImportError:
+ # Python 2.4 doesn't implement CalledProcessError
+ class CalledProcessError(Exception):
+ """This exception is raised when a process run by check_call() returns
+ a non-zero exit status. The exit status will be stored in the
+ returncode attribute."""
+ def __init__(self, returncode, cmd):
+ self.returncode = returncode
+ self.cmd = cmd
+ def __str__(self):
+ return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
+
+def get_domain_name():
+ try:
+ config.init_config()
+ domain_name = config.config.get_domain()
+ except Exception, e:
+ return None
+
+ return domain_name
+
+def realm_to_suffix(realm_name):
+ s = realm_name.split(".")
+ terms = ["dc=" + x.lower() for x in s]
+ return ",".join(terms)
+
+def template_str(txt, vars):
+ return string.Template(txt).substitute(vars)
+
+def template_file(infilename, vars):
+ txt = open(infilename).read()
+ return template_str(txt, vars)
+
+def write_tmp_file(txt):
+ fd = tempfile.NamedTemporaryFile()
+ fd.write(txt)
+ fd.flush()
+
+ return fd
+
+def run(args, stdin=None):
+ if stdin:
+ p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
+ stdout,stderr = p.communicate(stdin)
+ else:
+ p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
+ stdout,stderr = p.communicate()
+
+ logging.info(stdout)
+ logging.info(stderr)
+
+ if p.returncode != 0:
+ raise CalledProcessError(p.returncode, ' '.join(args))
+
+ return (stdout, stderr)
+
+def file_exists(filename):
+ try:
+ mode = os.stat(filename)[stat.ST_MODE]
+ if stat.S_ISREG(mode):
+ return True
+ else:
+ return False
+ except:
+ return False
+
+def dir_exists(filename):
+ try:
+ mode = os.stat(filename)[stat.ST_MODE]
+ if stat.S_ISDIR(mode):
+ return True
+ else:
+ return False
+ except:
+ return False
+
+def install_file(fname, dest):
+ if file_exists(dest):
+ os.rename(dest, dest + ".orig")
+ shutil.move(fname, dest)
+
+def backup_file(fname):
+ if file_exists(fname):
+ os.rename(fname, fname + ".orig")
+
+# uses gpg to compress and encrypt a file
+def encrypt_file(source, dest, password, workdir = None):
+ if type(source) is not StringType or not len(source):
+ raise ValueError('Missing Source File')
+ #stat it so that we get back an exception if it does no t exist
+ os.stat(source)
+
+ if type(dest) is not StringType or not len(dest):
+ raise ValueError('Missing Destination File')
+
+ if type(password) is not StringType or not len(password):
+ raise ValueError('Missing Password')
+
+ #create a tempdir so that we can clean up with easily
+ tempdir = tempfile.mkdtemp('', 'ipa-', workdir)
+ gpgdir = tempdir+"/.gnupg"
+
+ try:
+ try:
+ #give gpg a fake dir so that we can leater remove all
+ #the cruft when we clean up the tempdir
+ os.mkdir(gpgdir)
+ args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-c', source]
+ run(args, password)
+ except:
+ raise
+ finally:
+ #job done, clean up
+ shutil.rmtree(tempdir, ignore_errors=True)
+
+
+def decrypt_file(source, dest, password, workdir = None):
+ if type(source) is not StringType or not len(source):
+ raise ValueError('Missing Source File')
+ #stat it so that we get back an exception if it does no t exist
+ os.stat(source)
+
+ if type(dest) is not StringType or not len(dest):
+ raise ValueError('Missing Destination File')
+
+ if type(password) is not StringType or not len(password):
+ raise ValueError('Missing Password')
+
+ #create a tempdir so that we can clean up with easily
+ tempdir = tempfile.mkdtemp('', 'ipa-', workdir)
+ gpgdir = tempdir+"/.gnupg"
+
+ try:
+ try:
+ #give gpg a fake dir so that we can leater remove all
+ #the cruft when we clean up the tempdir
+ os.mkdir(gpgdir)
+ args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-d', source]
+ run(args, password)
+ except:
+ raise
+ finally:
+ #job done, clean up
+ shutil.rmtree(tempdir, ignore_errors=True)
+
+
+class CIDict(dict):
+ """
+ Case-insensitive but case-respecting dictionary.
+
+ This code is derived from python-ldap's cidict.py module,
+ written by stroeder: http://python-ldap.sourceforge.net/
+
+ This version extends 'dict' so it works properly with TurboGears.
+ If you extend UserDict, isinstance(foo, dict) returns false.
+ """
+
+ def __init__(self,default=None):
+ super(CIDict, self).__init__()
+ self._keys = {}
+ self.update(default or {})
+
+ def __getitem__(self,key):
+ return super(CIDict,self).__getitem__(string.lower(key))
+
+ def __setitem__(self,key,value):
+ lower_key = string.lower(key)
+ self._keys[lower_key] = key
+ return super(CIDict,self).__setitem__(string.lower(key),value)
+
+ def __delitem__(self,key):
+ lower_key = string.lower(key)
+ del self._keys[lower_key]
+ return super(CIDict,self).__delitem__(string.lower(key))
+
+ def update(self,dict):
+ for key in dict.keys():
+ self[key] = dict[key]
+
+ def has_key(self,key):
+ return super(CIDict, self).has_key(string.lower(key))
+
+ def get(self,key,failobj=None):
+ try:
+ return self[key]
+ except KeyError:
+ return failobj
+
+ def keys(self):
+ return self._keys.values()
+
+ def items(self):
+ result = []
+ for k in self._keys.values():
+ result.append((k,self[k]))
+ return result
+
+ def copy(self):
+ copy = {}
+ for k in self._keys.values():
+ copy[k] = self[k]
+ return copy
+
+ def iteritems(self):
+ return self.copy().iteritems()
+
+ def iterkeys(self):
+ return self.copy().iterkeys()
+
+ def setdefault(self,key,value=None):
+ try:
+ return self[key]
+ except KeyError:
+ self[key] = value
+ return value
+
+ def pop(self, key, *args):
+ try:
+ value = self[key]
+ del self[key]
+ return value
+ except KeyError:
+ if len(args) == 1:
+ return args[0]
+ raise
+
+ def popitem(self):
+ (lower_key,value) = super(CIDict,self).popitem()
+ key = self._keys[lower_key]
+ del self._keys[lower_key]
+
+ return (key,value)
+
+
+#
+# The safe_string_re regexp and needs_base64 function are extracted from the
+# python-ldap ldif module, which was
+# written by Michael Stroeder <michael@stroeder.com>
+# http://python-ldap.sourceforge.net
+#
+# It was extracted because ipaldap.py is naughtily reaching into the ldif
+# module and squashing this regexp.
+#
+SAFE_STRING_PATTERN = '(^(\000|\n|\r| |:|<)|[\000\n\r\200-\377]+|[ ]+$)'
+safe_string_re = re.compile(SAFE_STRING_PATTERN)
+
+def needs_base64(s):
+ """
+ returns 1 if s has to be base-64 encoded because of special chars
+ """
+ return not safe_string_re.search(s) is None
+
+
+def wrap_binary_data(data):
+ """Converts all binary data strings into Binary objects for transport
+ back over xmlrpc."""
+ if isinstance(data, str):
+ if needs_base64(data):
+ return xmlrpclib.Binary(data)
+ else:
+ return data
+ elif isinstance(data, list) or isinstance(data,tuple):
+ retval = []
+ for value in data:
+ retval.append(wrap_binary_data(value))
+ return retval
+ elif isinstance(data, dict):
+ retval = {}
+ for (k,v) in data.iteritems():
+ retval[k] = wrap_binary_data(v)
+ return retval
+ else:
+ return data
+
+
+def unwrap_binary_data(data):
+ """Converts all Binary objects back into strings."""
+ if isinstance(data, xmlrpclib.Binary):
+ # The data is decoded by the xmlproxy, but is stored
+ # in a binary object for us.
+ return str(data)
+ elif isinstance(data, str):
+ return data
+ elif isinstance(data, list) or isinstance(data,tuple):
+ retval = []
+ for value in data:
+ retval.append(unwrap_binary_data(value))
+ return retval
+ elif isinstance(data, dict):
+ retval = {}
+ for (k,v) in data.iteritems():
+ retval[k] = unwrap_binary_data(v)
+ return retval
+ else:
+ return data
+
+class GeneralizedTimeZone(datetime.tzinfo):
+ """This class is a basic timezone wrapper for the offset specified
+ in a Generalized Time. It is dst-ignorant."""
+ def __init__(self,offsetstr="Z"):
+ super(GeneralizedTimeZone, self).__init__()
+
+ self.name = offsetstr
+ self.houroffset = 0
+ self.minoffset = 0
+
+ if offsetstr == "Z":
+ self.houroffset = 0
+ self.minoffset = 0
+ else:
+ if (len(offsetstr) >= 3) and re.match(r'[-+]\d\d', offsetstr):
+ self.houroffset = int(offsetstr[0:3])
+ offsetstr = offsetstr[3:]
+ if (len(offsetstr) >= 2) and re.match(r'\d\d', offsetstr):
+ self.minoffset = int(offsetstr[0:2])
+ offsetstr = offsetstr[2:]
+ if len(offsetstr) > 0:
+ raise ValueError()
+ if self.houroffset < 0:
+ self.minoffset *= -1
+
+ def utcoffset(self, dt):
+ return datetime.timedelta(hours=self.houroffset, minutes=self.minoffset)
+
+ def dst(self, dt):
+ return datetime.timedelta(0)
+
+ def tzname(self, dt):
+ return self.name
+
+
+def parse_generalized_time(timestr):
+ """Parses are Generalized Time string (as specified in X.680),
+ returning a datetime object. Generalized Times are stored inside
+ the krbPasswordExpiration attribute in LDAP.
+
+ This method doesn't attempt to be perfect wrt timezones. If python
+ can't be bothered to implement them, how can we..."""
+
+ if len(timestr) < 8:
+ return None
+ try:
+ date = timestr[:8]
+ time = timestr[8:]
+
+ year = int(date[:4])
+ month = int(date[4:6])
+ day = int(date[6:8])
+
+ hour = min = sec = msec = 0
+ tzone = None
+
+ if (len(time) >= 2) and re.match(r'\d', time[0]):
+ hour = int(time[:2])
+ time = time[2:]
+ if len(time) >= 2 and (time[0] == "," or time[0] == "."):
+ hour_fraction = "."
+ time = time[1:]
+ while (len(time) > 0) and re.match(r'\d', time[0]):
+ hour_fraction += time[0]
+ time = time[1:]
+ total_secs = int(float(hour_fraction) * 3600)
+ min, sec = divmod(total_secs, 60)
+
+ if (len(time) >= 2) and re.match(r'\d', time[0]):
+ min = int(time[:2])
+ time = time[2:]
+ if len(time) >= 2 and (time[0] == "," or time[0] == "."):
+ min_fraction = "."
+ time = time[1:]
+ while (len(time) > 0) and re.match(r'\d', time[0]):
+ min_fraction += time[0]
+ time = time[1:]
+ sec = int(float(min_fraction) * 60)
+
+ if (len(time) >= 2) and re.match(r'\d', time[0]):
+ sec = int(time[:2])
+ time = time[2:]
+ if len(time) >= 2 and (time[0] == "," or time[0] == "."):
+ sec_fraction = "."
+ time = time[1:]
+ while (len(time) > 0) and re.match(r'\d', time[0]):
+ sec_fraction += time[0]
+ time = time[1:]
+ msec = int(float(sec_fraction) * 1000000)
+
+ if (len(time) > 0):
+ tzone = GeneralizedTimeZone(time)
+
+ return datetime.datetime(year, month, day, hour, min, sec, msec, tzone)
+
+ except ValueError:
+ return None
+
+def ipa_generate_password():
+ rndpwd = ''
+ r = random.SystemRandom()
+ for x in range(12):
+ rndpwd += chr(r.randint(32,126))
+ return rndpwd
+
+
+def format_list(items, quote=None, page_width=80):
+ '''Format a list of items formatting them so they wrap to fit the
+ available width. The items will be sorted.
+
+ The items may optionally be quoted. The quote parameter may either be
+ a string, in which case it is added before and after the item. Or the
+ quote parameter may be a pair (either a tuple or list). In this case
+ quote[0] is left hand quote and quote[1] is the right hand quote.
+ '''
+ left_quote = right_quote = ''
+ num_items = len(items)
+ if not num_items: return ""
+
+ if quote is not None:
+ if type(quote) in StringTypes:
+ left_quote = right_quote = quote
+ elif type(quote) is TupleType or type(quote) is ListType:
+ left_quote = quote[0]
+ right_quote = quote[1]
+
+ max_len = max(map(len, items))
+ max_len += len(left_quote) + len(right_quote)
+ num_columns = (page_width + max_len) / (max_len+1)
+ num_rows = (num_items + num_columns - 1) / num_columns
+ items.sort()
+
+ rows = [''] * num_rows
+ i = row = col = 0
+
+ while i < num_items:
+ row = 0
+ if col == 0:
+ separator = ''
+ else:
+ separator = ' '
+
+ while i < num_items and row < num_rows:
+ rows[row] += "%s%*s" % (separator, -max_len, "%s%s%s" % (left_quote, items[i], right_quote))
+ i += 1
+ row += 1
+ col += 1
+ return '\n'.join(rows)
+
+key_value_re = re.compile("(\w+)\s*=\s*(([^\s'\\\"]+)|(?P<quote>['\\\"])((?P=quote)|(.*?[^\\\])(?P=quote)))")
+def parse_key_value_pairs(input):
+ ''' Given a string composed of key=value pairs parse it and return
+ a dict of the key/value pairs. Keys must be a word, a key must be followed
+ by an equal sign (=) and a value. The value may be a single word or may be
+ quoted. Quotes may be either single or double quotes, but must be balanced.
+ Inside the quoted text the same quote used to start the quoted value may be
+ used if it is escaped by preceding it with a backslash (\).
+ White space between the key, the equal sign, and the value is ignored.
+ Values are always strings. Empty values must be specified with an empty
+ quoted string, it's value after parsing will be an empty string.
+
+ Example: The string
+
+ arg0 = '' arg1 = 1 arg2='two' arg3 = "three's a crowd" arg4 = "this is a \" quote"
+
+ will produce
+
+ arg0= arg1=1
+ arg2=two
+ arg3=three's a crowd
+ arg4=this is a " quote
+ '''
+
+ kv_dict = {}
+ for match in key_value_re.finditer(input):
+ key = match.group(1)
+ quote = match.group('quote')
+ if match.group(5):
+ value = match.group(6)
+ if value is None: value = ''
+ value = re.sub('\\\%s' % quote, quote, value)
+ else:
+ value = match.group(2)
+ kv_dict[key] = value
+ return kv_dict
+
+def parse_items(text):
+ '''Given text with items separated by whitespace or comma, return a list of those items'''
+ split_re = re.compile('[ ,\t\n]+')
+ items = split_re.split(text)
+ for item in items[:]:
+ if not item: items.remove(item)
+ return items
+
+def read_pairs_file(filename):
+ comment_re = re.compile('#.*$', re.MULTILINE)
+ if filename == '-':
+ fd = sys.stdin
+ else:
+ fd = open(filename)
+ text = fd.read()
+ text = comment_re.sub('', text) # kill comments
+ pairs = parse_key_value_pairs(text)
+ if fd != sys.stdin: fd.close()
+ return pairs
+
+def read_items_file(filename):
+ comment_re = re.compile('#.*$', re.MULTILINE)
+ if filename == '-':
+ fd = sys.stdin
+ else:
+ fd = open(filename)
+ text = fd.read()
+ text = comment_re.sub('', text) # kill comments
+ items = parse_items(text)
+ if fd != sys.stdin: fd.close()
+ return items
+
+def user_input(prompt, default = None, allow_empty = True):
+ if default == None:
+ while True:
+ ret = raw_input("%s: " % prompt)
+ if allow_empty or ret.strip():
+ return ret
+
+ if isinstance(default, basestring):
+ while True:
+ ret = raw_input("%s [%s]: " % (prompt, default))
+ if not ret and (allow_empty or default):
+ return default
+ elif ret.strip():
+ return ret
+ if isinstance(default, bool):
+ if default:
+ choice = "yes"
+ else:
+ choice = "no"
+ while True:
+ ret = raw_input("%s [%s]: " % (prompt, choice))
+ if not ret:
+ return default
+ elif ret.lower()[0] == "y":
+ return True
+ elif ret.lower()[0] == "n":
+ return False
+ if isinstance(default, int):
+ while True:
+ try:
+ ret = raw_input("%s [%s]: " % (prompt, default))
+ if not ret:
+ return default
+ ret = int(ret)
+ except ValueError:
+ pass
+ else:
+ return ret
+
+def user_input_plain(prompt, default = None, allow_empty = True, allow_spaces = True):
+ while True:
+ ret = user_input(prompt, default, allow_empty)
+ if ipavalidate.Plain(ret, not allow_empty, allow_spaces):
+ return ret
+
+def user_input_path(prompt, default = None, allow_empty = True):
+ if default != None and allow_empty:
+ prompt += " (enter \"none\" for empty)"
+ while True:
+ ret = user_input(prompt, default, allow_empty)
+ if allow_empty and ret.lower() == "none":
+ return ""
+ if ipavalidate.Path(ret, not allow_empty):
+ return ret
+
+class AttributeValueCompleter:
+ '''
+ Gets input from the user in the form "lhs operator rhs"
+ TAB completes partial input.
+ lhs completes to a name in @lhs_names
+ The lhs is fully parsed if a lhs_delim delimiter is seen, then TAB will
+ complete to the operator and a default value.
+ Default values for a lhs value can specified as:
+ - a string, all lhs values will use this default
+ - a dict, the lhs value is looked up in the dict to return the default or None
+ - a function with a single arg, the lhs value, it returns the default or None
+
+ After creating the completer you must open it to set the terminal
+ up, Then get a line of input from the user by calling read_input()
+ which returns two values, the lhs and rhs, which might be None if
+ lhs or rhs was not parsed. After you are done getting input you
+ should close the completer to restore the terminal.
+
+ Example: (note this is essentially what the convenience function get_pairs() does)
+
+ This will allow the user to autocomplete foo & foobar, both have
+ defaults defined in a dict. In addition the foobar attribute must
+ be specified before the prompting loop will exit. Also, this
+ example show how to require that each attrbute entered by the user
+ is valid.
+
+ attrs = ['foo', 'foobar']
+ defaults = {'foo' : 'foo_default', 'foobar' : 'foobar_default'}
+ mandatory_attrs = ['foobar']
+
+ c = AttributeValueCompleter(attrs, defaults)
+ c.open()
+ mandatory_attrs_remaining = mandatory_attrs[:]
+
+ while True:
+ if mandatory_attrs_remaining:
+ attribute, value = c.read_input("Enter: ", mandatory_attrs_remaining[0])
+ try:
+ mandatory_attrs_remaining.remove(attribute)
+ except ValueError:
+ pass
+ else:
+ attribute, value = c.read_input("Enter: ")
+ if attribute is None:
+ # Are we done?
+ if mandatory_attrs_remaining:
+ print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
+ continue
+ else:
+ break
+ if attribute not in attrs:
+ print "ERROR: %s is not a valid attribute" % (attribute)
+ else:
+ print "got '%s' = '%s'" % (attribute, value)
+
+ c.close()
+ print "exiting..."
+ '''
+
+ def __init__(self, lhs_names, default_value=None, lhs_regexp=r'^\s*(?P<lhs>[^ =]+)', lhs_delims=' =',
+ operator='=', strip_rhs=True):
+ self.lhs_names = lhs_names
+ self.default_value = default_value
+ # lhs_regexp must have named group 'lhs' which returns the contents of the lhs
+ self.lhs_regexp = lhs_regexp
+ self.lhs_re = re.compile(self.lhs_regexp)
+ self.lhs_delims = lhs_delims
+ self.operator = operator
+ self.strip_rhs = strip_rhs
+ self.pairs = None
+ self._reset()
+
+ def _reset(self):
+ self.lhs = None
+ self.lhs_complete = False
+ self.operator_complete = False
+ self.rhs = None
+
+ def open(self):
+ # Save state
+ self.prev_completer = readline.get_completer()
+ self.prev_completer_delims = readline.get_completer_delims()
+
+ # Set up for ourself
+ readline.parse_and_bind("tab: complete")
+ readline.set_completer(self.complete)
+ readline.set_completer_delims(self.lhs_delims)
+
+ def close(self):
+ # Restore previous state
+ readline.set_completer_delims(self.prev_completer_delims)
+ readline.set_completer(self.prev_completer)
+
+ def parse_input(self):
+ '''We are looking for 3 tokens: <lhs,op,rhs>
+ Extract as much of each token as possible.
+ Set flags indicating if token is fully parsed.
+ '''
+ try:
+ self._reset()
+ buf_len = len(self.line_buffer)
+ pos = 0
+ lhs_match = self.lhs_re.search(self.line_buffer, pos)
+ if not lhs_match: return # no lhs content
+ self.lhs = lhs_match.group('lhs') # get lhs contents
+ pos = lhs_match.end('lhs') # new scanning position
+ if pos == buf_len: return # nothing after lhs, lhs incomplete
+ self.lhs_complete = True # something trails the lhs, lhs is complete
+ operator_beg = self.line_buffer.find(self.operator, pos) # locate operator
+ if operator_beg == -1: return # did not find the operator
+ self.operator_complete = True # operator fully parsed
+ operator_end = operator_beg + len(self.operator)
+ pos = operator_end # step over the operator
+ self.rhs = self.line_buffer[pos:]
+ except Exception, e:
+ traceback.print_exc()
+ print "Exception in %s.parse_input(): %s" % (self.__class__.__name__, e)
+
+ def get_default_value(self):
+ '''default_value can be a string, a dict, or a function.
+ If it's a string it's a global default for all attributes.
+ If it's a dict the default is looked up in the dict index by attribute.
+ If it's a function, the function is called with 1 parameter, the attribute
+ and it should return the default value for the attriubte or None'''
+
+ if not self.lhs_complete: raise ValueError("attribute not parsed")
+
+ # If the user previously provided a value let that override the supplied default
+ if self.pairs is not None:
+ prev_value = self.pairs.get(self.lhs)
+ if prev_value is not None: return prev_value
+
+ # No previous user provided value, query for a default
+ default_value_type = type(self.default_value)
+ if default_value_type is DictType:
+ return self.default_value.get(self.lhs, None)
+ elif default_value_type is FunctionType:
+ return self.default_value(self.lhs)
+ elif default_value_type is StringsType:
+ return self.default_value
+ else:
+ return None
+
+ def get_lhs_completions(self, text):
+ if text:
+ self.completions = [lhs for lhs in self.lhs_names if lhs.startswith(text)]
+ else:
+ self.completions = self.lhs_names
+
+ def complete(self, text, state):
+ self.line_buffer= readline.get_line_buffer()
+ self.parse_input()
+ if not self.lhs_complete:
+ # lhs is not complete, set up to complete the lhs
+ if state == 0:
+ beg = readline.get_begidx()
+ end = readline.get_endidx()
+ self.get_lhs_completions(self.line_buffer[beg:end])
+ if state >= len(self.completions): return None
+ return self.completions[state]
+
+
+ elif not self.operator_complete:
+ # lhs is complete, but the operator is not so we complete
+ # by inserting the operator manually.
+ # Also try to complete the default value at this time.
+ readline.insert_text('%s ' % self.operator)
+ default_value = self.get_default_value()
+ if default_value is not None:
+ readline.insert_text(default_value)
+ readline.redisplay()
+ return None
+ else:
+ # lhs and operator are complete, if the the rhs is blank
+ # (either empty or only only whitespace) then attempt
+ # to complete by inserting the default value, otherwise
+ # there is nothing we can complete to so we're done.
+ if self.rhs.strip():
+ return None
+ default_value = self.get_default_value()
+ if default_value is not None:
+ readline.insert_text(default_value)
+ readline.redisplay()
+ return None
+
+ def pre_input_hook(self):
+ readline.insert_text('%s %s ' % (self.initial_lhs, self.operator))
+ readline.redisplay()
+
+ def read_input(self, prompt, initial_lhs=None):
+ self.initial_lhs = initial_lhs
+ try:
+ self._reset()
+ if initial_lhs is None:
+ readline.set_pre_input_hook(None)
+ else:
+ readline.set_pre_input_hook(self.pre_input_hook)
+ self.line_buffer = raw_input(prompt).strip()
+ self.parse_input()
+ if self.strip_rhs and self.rhs is not None:
+ return self.lhs, self.rhs.strip()
+ else:
+ return self.lhs, self.rhs
+ except EOFError:
+ return None, None
+
+ def get_pairs(self, prompt, mandatory_attrs=None, validate_callback=None, must_match=True, value_required=True):
+ self.pairs = {}
+ if mandatory_attrs:
+ mandatory_attrs_remaining = mandatory_attrs[:]
+ else:
+ mandatory_attrs_remaining = []
+
+ print "Enter name = value"
+ print "Press <ENTER> to accept, a blank line terminates input"
+ print "Pressing <TAB> will auto completes name, assignment, and value"
+ print
+ while True:
+ if mandatory_attrs_remaining:
+ attribute, value = self.read_input(prompt, mandatory_attrs_remaining[0])
+ else:
+ attribute, value = self.read_input(prompt)
+ if attribute is None:
+ # Are we done?
+ if mandatory_attrs_remaining:
+ print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
+ continue
+ else:
+ break
+ if value is None:
+ if value_required:
+ print "ERROR: you must specify a value for %s" % attribute
+ continue
+ else:
+ if must_match and attribute not in self.lhs_names:
+ print "ERROR: %s is not a valid name" % (attribute)
+ continue
+ if validate_callback is not None:
+ if not validate_callback(attribute, value):
+ print "ERROR: %s is not valid for %s" % (value, attribute)
+ continue
+ try:
+ mandatory_attrs_remaining.remove(attribute)
+ except ValueError:
+ pass
+
+ self.pairs[attribute] = value
+ return self.pairs
+
+class ItemCompleter:
+ '''
+ Prompts the user for items in a list of items with auto completion.
+ TAB completes partial input.
+ More than one item can be specifed during input, whitespace and/or comma's seperate.
+ Example:
+
+ possible_items = ['foo', 'bar']
+ c = ItemCompleter(possible_items)
+ c.open()
+ # Use read_input() to limit input to a single carriage return (e.g. <ENTER>)
+ #items = c.read_input("Enter: ")
+ # Use get_items to iterate until a blank line is entered.
+ items = c.get_items("Enter: ")
+ c.close()
+ print "items=%s" % (items)
+
+ '''
+
+ def __init__(self, items):
+ self.items = items
+ self.initial_input = None
+ self.item_delims = ' \t,'
+ self.split_re = re.compile('[%s]+' % self.item_delims)
+
+ def open(self):
+ # Save state
+ self.prev_completer = readline.get_completer()
+ self.prev_completer_delims = readline.get_completer_delims()
+
+ # Set up for ourself
+ readline.parse_and_bind("tab: complete")
+ readline.set_completer(self.complete)
+ readline.set_completer_delims(self.item_delims)
+
+ def close(self):
+ # Restore previous state
+ readline.set_completer_delims(self.prev_completer_delims)
+ readline.set_completer(self.prev_completer)
+
+ def get_item_completions(self, text):
+ if text:
+ self.completions = [lhs for lhs in self.items if lhs.startswith(text)]
+ else:
+ self.completions = self.items
+
+ def complete(self, text, state):
+ self.line_buffer= readline.get_line_buffer()
+ if state == 0:
+ beg = readline.get_begidx()
+ end = readline.get_endidx()
+ self.get_item_completions(self.line_buffer[beg:end])
+ if state >= len(self.completions): return None
+ return self.completions[state]
+
+ def pre_input_hook(self):
+ readline.insert_text('%s %s ' % (self.initial_input, self.operator))
+ readline.redisplay()
+
+ def read_input(self, prompt, initial_input=None):
+ items = []
+
+ self.initial_input = initial_input
+ try:
+ if initial_input is None:
+ readline.set_pre_input_hook(None)
+ else:
+ readline.set_pre_input_hook(self.pre_input_hook)
+ self.line_buffer = raw_input(prompt).strip()
+ items = self.split_re.split(self.line_buffer)
+ for item in items[:]:
+ if not item: items.remove(item)
+ return items
+ except EOFError:
+ return items
+
+ def get_items(self, prompt, must_match=True):
+ items = []
+
+ print "Enter name [name ...]"
+ print "Press <ENTER> to accept, blank line or control-D terminates input"
+ print "Pressing <TAB> auto completes name"
+ print
+ while True:
+ new_items = self.read_input(prompt)
+ if not new_items: break
+ for item in new_items:
+ if must_match:
+ if item not in self.items:
+ print "ERROR: %s is not valid" % (item)
+ continue
+ if item in items: continue
+ items.append(item)
+
+ return items
+
+def get_gsserror(e):
+ """A GSSError exception looks differently in python 2.4 than it does
+ in python 2.5, deal with it."""
+
+ try:
+ primary = e[0]
+ secondary = e[1]
+ except:
+ primary = e[0][0]
+ secondary = e[0][1]
+
+ return (primary[0], secondary[0])
diff --git a/ipapython/ipavalidate.py b/ipapython/ipavalidate.py
new file mode 100644
index 000000000..63e0a7614
--- /dev/null
+++ b/ipapython/ipavalidate.py
@@ -0,0 +1,137 @@
+# Authors: Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+import re
+
+def Email(mail, notEmpty=True):
+ """Do some basic validation of an e-mail address.
+ Return True if ok
+ Return False if not
+
+ If notEmpty is True the this will return an error if the field
+ is "" or None.
+ """
+ usernameRE = re.compile(r"^[^ \t\n\r@<>()]+$", re.I)
+ domainRE = re.compile(r"^[a-z0-9][a-z0-9\.\-_]*\.[a-z]+$", re.I)
+
+ if not mail or mail is None:
+ if notEmpty is True:
+ return False
+ else:
+ return True
+
+ mail = mail.strip()
+ s = mail.split('@', 1)
+ try:
+ username, domain=s
+ except ValueError:
+ return False
+ if not usernameRE.search(username):
+ return False
+ if not domainRE.search(domain):
+ return False
+
+ return True
+
+def Plain(text, notEmpty=False, allowSpaces=True):
+ """Do some basic validation of a plain text field
+ Return True if ok
+ Return False if not
+
+ If notEmpty is True the this will return an error if the field
+ is "" or None.
+ """
+ if (text is None) or (not text.strip()):
+ if notEmpty is True:
+ return False
+ else:
+ return True
+
+ if allowSpaces:
+ textRE = re.compile(r"^[a-zA-Z_\-0-9\'\ ]*$")
+ else:
+ textRE = re.compile(r"^[a-zA-Z_\-0-9\']*$")
+ if not textRE.search(text):
+ return False
+
+ return True
+
+def String(text, notEmpty=False):
+ """A string type. This is much looser in what it allows than plain"""
+
+ if text is None or not text.strip():
+ if notEmpty is True:
+ return False
+ else:
+ return True
+
+ return True
+
+def Path(text, notEmpty=False):
+ """Do some basic validation of a path
+ Return True if ok
+ Return False if not
+
+ If notEmpty is True the this will return an error if the field
+ is "" or None.
+ """
+ textRE = re.compile(r"^[a-zA-Z_\-0-9\\ \.\/\\:]*$")
+
+ if not text and notEmpty is True:
+ return False
+
+ if text is None:
+ if notEmpty is True:
+ return False
+ else:
+ return True
+
+ if not textRE.search(text):
+ return False
+
+ return True
+
+def GoodName(text, notEmpty=False):
+ """From shadow-utils:
+
+ User/group names must match gnu e-regex:
+ [a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,30}[a-zA-Z0-9_.$-]?
+
+ as a non-POSIX, extension, allow "$" as the last char for
+ sake of Samba 3.x "add machine script"
+
+ Return True if ok
+ Return False if not
+ """
+ textRE = re.compile(r"^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,30}[a-zA-Z0-9_.$-]?$")
+
+ if not text and notEmpty is True:
+ return False
+
+ if text is None:
+ if notEmpty is True:
+ return False
+ else:
+ return True
+
+ m = textRE.match(text)
+ if not m or text != m.group(0):
+ return False
+
+ return True
diff --git a/ipapython/radius_util.py b/ipapython/radius_util.py
new file mode 100644
index 000000000..8e66855eb
--- /dev/null
+++ b/ipapython/radius_util.py
@@ -0,0 +1,366 @@
+# Authors: John Dennis <jdennis@redhat.com>
+#
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+import sys
+import os
+import re
+import ldap
+import getpass
+import ldap.filter
+
+from ipapython import ipautil
+from ipapython.entity import Entity
+import ipapython.ipavalidate as ipavalidate
+
+
+__all__ = [
+ 'RADIUS_PKG_NAME',
+ 'RADIUS_PKG_CONFIG_DIR',
+ 'RADIUS_SERVICE_NAME',
+ 'RADIUS_USER',
+ 'RADIUS_IPA_KEYTAB_FILEPATH',
+ 'RADIUS_LDAP_ATTR_MAP_FILEPATH',
+ 'RADIUSD_CONF_FILEPATH',
+ 'RADIUSD_CONF_TEMPLATE_FILEPATH',
+ 'RADIUSD',
+
+ 'RadiusClient',
+ 'RadiusProfile',
+
+ 'clients_container',
+ 'radius_clients_basedn',
+ 'radius_client_filter',
+ 'radius_client_dn',
+
+ 'profiles_container',
+ 'radius_profiles_basedn',
+ 'radius_profile_filter',
+ 'radius_profile_dn',
+
+ 'radius_client_ldap_attr_to_radius_attr',
+ 'radius_client_attr_to_ldap_attr',
+
+ 'radius_profile_ldap_attr_to_radius_attr',
+ 'radius_profile_attr_to_ldap_attr',
+
+ 'get_secret',
+ 'validate_ip_addr',
+ 'validate_secret',
+ 'validate_name',
+ 'validate_nastype',
+ 'validate_desc',
+ 'validate',
+ ]
+
+#------------------------------------------------------------------------------
+
+RADIUS_PKG_NAME = 'freeradius'
+RADIUS_PKG_CONFIG_DIR = '/etc/raddb'
+
+RADIUS_SERVICE_NAME = 'radius'
+RADIUS_USER = 'radiusd'
+
+RADIUS_IPA_KEYTAB_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'ipa.keytab')
+RADIUS_LDAP_ATTR_MAP_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'ldap.attrmap')
+RADIUSD_CONF_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'radiusd.conf')
+RADIUSD_CONF_TEMPLATE_FILEPATH = os.path.join(ipautil.PLUGINS_SHARE_DIR, 'radius.radiusd.conf.template')
+
+RADIUSD = '/usr/sbin/radiusd'
+
+#------------------------------------------------------------------------------
+
+dotted_octet_re = re.compile(r"^(\d+)\.(\d+)\.(\d+)\.(\d+)(/(\d+))?$")
+dns_re = re.compile(r"^[a-zA-Z][a-zA-Z0-9.-]+$")
+# secret, name, nastype all have 31 char max in freeRADIUS, max ip address len is 255
+valid_secret_len = (1,31)
+valid_name_len = (1,31)
+valid_nastype_len = (1,31)
+valid_ip_addr_len = (1,255)
+
+valid_ip_addr_msg = '''\
+IP address must be either a DNS name (letters,digits,dot,hyphen, beginning with
+a letter),or a dotted octet followed by an optional mask (e.g 192.168.1.0/24)'''
+
+valid_desc_msg = "Description must text string"
+
+#------------------------------------------------------------------------------
+
+class RadiusClient(Entity):
+
+ def __init2__(self):
+ pass
+
+class RadiusProfile(Entity):
+
+ def __init2__(self):
+ pass
+
+
+#------------------------------------------------------------------------------
+
+def reverse_map_dict(src_dict):
+ reverse_dict = {}
+
+ for k,v in src_dict.items():
+ if reverse_dict.has_key(v):
+ raise ValueError("reverse_map_dict: collision on (%s) with values (%s),(%s)" % \
+ v, reverse_dict[v], src_dict[k])
+ reverse_dict[v] = k
+ return reverse_dict
+
+#------------------------------------------------------------------------------
+
+radius_client_ldap_attr_to_radius_attr = ipautil.CIDict({
+ 'radiusClientIPAddress' : 'Client-IP-Address',
+ 'radiusClientSecret' : 'Secret',
+ 'radiusClientNASType' : 'NAS-Type',
+ 'radiusClientShortName' : 'Name',
+ 'description' : 'Description',
+ })
+
+radius_client_attr_to_ldap_attr = reverse_map_dict(radius_client_ldap_attr_to_radius_attr)
+
+#------------------------------------------------------------------------------
+
+radius_profile_ldap_attr_to_radius_attr = ipautil.CIDict({
+ 'uid' : 'UID',
+ 'radiusArapFeatures' : 'Arap-Features',
+ 'radiusArapSecurity' : 'Arap-Security',
+ 'radiusArapZoneAccess' : 'Arap-Zone-Access',
+ 'radiusAuthType' : 'Auth-Type',
+ 'radiusCallbackId' : 'Callback-Id',
+ 'radiusCallbackNumber' : 'Callback-Number',
+ 'radiusCalledStationId' : 'Called-Station-Id',
+ 'radiusCallingStationId' : 'Calling-Station-Id',
+ 'radiusClass' : 'Class',
+ 'radiusClientIPAddress' : 'Client-IP-Address',
+ 'radiusExpiration' : 'Expiration',
+ 'radiusFilterId' : 'Filter-Id',
+ 'radiusFramedAppleTalkLink' : 'Framed-AppleTalk-Link',
+ 'radiusFramedAppleTalkNetwork' : 'Framed-AppleTalk-Network',
+ 'radiusFramedAppleTalkZone' : 'Framed-AppleTalk-Zone',
+ 'radiusFramedCompression' : 'Framed-Compression',
+ 'radiusFramedIPAddress' : 'Framed-IP-Address',
+ 'radiusFramedIPNetmask' : 'Framed-IP-Netmask',
+ 'radiusFramedIPXNetwork' : 'Framed-IPX-Network',
+ 'radiusFramedMTU' : 'Framed-MTU',
+ 'radiusFramedProtocol' : 'Framed-Protocol',
+ 'radiusFramedRoute' : 'Framed-Route',
+ 'radiusFramedRouting' : 'Framed-Routing',
+ 'radiusGroupName' : 'Group-Name',
+ 'radiusHint' : 'Hint',
+ 'radiusHuntgroupName' : 'Huntgroup-Name',
+ 'radiusIdleTimeout' : 'Idle-Timeout',
+ 'radiusLoginIPHost' : 'Login-IP-Host',
+ 'radiusLoginLATGroup' : 'Login-LAT-Group',
+ 'radiusLoginLATNode' : 'Login-LAT-Node',
+ 'radiusLoginLATPort' : 'Login-LAT-Port',
+ 'radiusLoginLATService' : 'Login-LAT-Service',
+ 'radiusLoginService' : 'Login-Service',
+ 'radiusLoginTCPPort' : 'Login-TCP-Port',
+ 'radiusLoginTime' : 'Login-Time',
+ 'radiusNASIpAddress' : 'NAS-IP-Address',
+ 'radiusPasswordRetry' : 'Password-Retry',
+ 'radiusPortLimit' : 'Port-Limit',
+ 'radiusProfileDn' : 'Profile-Dn',
+ 'radiusPrompt' : 'Prompt',
+ 'radiusProxyToRealm' : 'Proxy-To-Realm',
+ 'radiusRealm' : 'Realm',
+ 'radiusReplicateToRealm' : 'Replicate-To-Realm',
+ 'radiusReplyMessage' : 'Reply-Message',
+ 'radiusServiceType' : 'Service-Type',
+ 'radiusSessionTimeout' : 'Session-Timeout',
+ 'radiusSimultaneousUse' : 'Simultaneous-Use',
+ 'radiusStripUserName' : 'Strip-User-Name',
+ 'radiusTerminationAction' : 'Termination-Action',
+ 'radiusTunnelAssignmentId' : 'Tunnel-Assignment-Id',
+ 'radiusTunnelClientEndpoint' : 'Tunnel-Client-Endpoint',
+ 'radiusTunnelMediumType' : 'Tunnel-Medium-Type',
+ 'radiusTunnelPassword' : 'Tunnel-Password',
+ 'radiusTunnelPreference' : 'Tunnel-Preference',
+ 'radiusTunnelPrivateGroupId' : 'Tunnel-Private-Group-Id',
+ 'radiusTunnelServerEndpoint' : 'Tunnel-Server-Endpoint',
+ 'radiusTunnelType' : 'Tunnel-Type',
+ 'radiusUserCategory' : 'User-Category',
+ 'radiusVSA' : 'VSA',
+})
+
+radius_profile_attr_to_ldap_attr = reverse_map_dict(radius_profile_ldap_attr_to_radius_attr)
+
+#------------------------------------------------------------------------------
+
+clients_container = 'cn=clients,cn=radius'
+
+def radius_clients_basedn(container, suffix):
+ if container is None: container = clients_container
+ return '%s,%s' % (container, suffix)
+
+def radius_client_filter(ip_addr):
+ return "(&(radiusClientIPAddress=%s)(objectclass=radiusClientProfile))" % \
+ ldap.filter.escape_filter_chars(ip_addr)
+
+def radius_client_dn(client, container, suffix):
+ if container is None: container = clients_container
+ return 'radiusClientIPAddress=%s,%s,%s' % (ldap.dn.escape_dn_chars(client), container, suffix)
+
+# --
+
+profiles_container = 'cn=profiles,cn=radius'
+
+def radius_profiles_basedn(container, suffix):
+ if container is None: container = profiles_container
+ return '%s,%s' % (container, suffix)
+
+def radius_profile_filter(uid):
+ return "(&(uid=%s)(objectclass=radiusprofile))" % \
+ ldap.filter.escape_filter_chars(uid)
+
+def radius_profile_dn(uid, container, suffix):
+ if container is None: container = profiles_container
+ return 'uid=%s,%s,%s' % (ldap.dn.escape_dn_chars(uid), container, suffix)
+
+
+#------------------------------------------------------------------------------
+
+def get_ldap_attr_translations():
+ comment_re = re.compile('#.*$')
+ radius_attr_to_ldap_attr = {}
+ ldap_attr_to_radius_attr = {}
+ try:
+ f = open(LDAP_ATTR_MAP_FILEPATH)
+ for line in f.readlines():
+ line = comment_re.sub('', line).strip()
+ if not line: continue
+ attr_type, radius_attr, ldap_attr = line.split()
+ print 'type="%s" radius="%s" ldap="%s"' % (attr_type, radius_attr, ldap_attr)
+ radius_attr_to_ldap_attr[radius_attr] = {'ldap_attr':ldap_attr, 'attr_type':attr_type}
+ ldap_attr_to_radius_attr[ldap_attr] = {'radius_attr':radius_attr, 'attr_type':attr_type}
+ f.close()
+ except Exception, e:
+ logging.error('cold not read radius ldap attribute map file (%s): %s', LDAP_ATTR_MAP_FILEPATH, e)
+ pass # FIXME
+
+ #for k,v in radius_attr_to_ldap_attr.items():
+ # print '%s --> %s' % (k,v)
+ #for k,v in ldap_attr_to_radius_attr.items():
+ # print '%s --> %s' % (k,v)
+
+def get_secret():
+ valid = False
+ while (not valid):
+ secret = getpass.getpass("Enter Secret: ")
+ confirm = getpass.getpass("Confirm Secret: ")
+ if (secret != confirm):
+ print "Secrets do not match"
+ continue
+ valid = True
+ return secret
+
+#------------------------------------------------------------------------------
+
+def valid_ip_addr(text):
+
+ # is it a dotted octet? If so there should be 4 integers seperated
+ # by a dot and each integer should be between 0 and 255
+ # there may be an optional mask preceded by a slash (e.g. 1.2.3.4/24)
+ match = dotted_octet_re.search(text)
+ if match:
+ # dotted octet notation
+ i = 1
+ while i <= 4:
+ octet = int(match.group(i))
+ if octet > 255: return False
+ i += 1
+ if match.group(5):
+ mask = int(match.group(6))
+ if mask <= 32:
+ return True
+ else:
+ return False
+ return True
+ else:
+ # DNS name, can contain letters, numbers, dot and hypen, must start with a letter
+ if dns_re.search(text): return True
+ return False
+
+def validate_length(value, limits):
+ length = len(value)
+ if length < limits[0] or length > limits[1]:
+ return False
+ return True
+
+def valid_length_msg(name, limits):
+ return "%s length must be at least %d and not more than %d" % (name, limits[0], limits[1])
+
+def err_msg(variable, variable_name=None):
+ if variable_name is None: variable_name = 'value'
+ print "ERROR: %s = %s" % (variable_name, variable)
+
+#------------------------------------------------------------------------------
+
+def validate_ip_addr(ip_addr, variable_name=None):
+ if not validate_length(ip_addr, valid_ip_addr_len):
+ err_msg(ip_addr, variable_name)
+ print valid_length_msg('ip address', valid_ip_addr_len)
+ return False
+ if not valid_ip_addr(ip_addr):
+ err_msg(ip_addr, variable_name)
+ print valid_ip_addr_msg
+ return False
+ return True
+
+def validate_secret(secret, variable_name=None):
+ if not validate_length(secret, valid_secret_len):
+ err_msg(secret, variable_name)
+ print valid_length_msg('secret', valid_secret_len)
+ return False
+ return True
+
+def validate_name(name, variable_name=None):
+ if not validate_length(name, valid_name_len):
+ err_msg(name, variable_name)
+ print valid_length_msg('name', valid_name_len)
+ return False
+ return True
+
+def validate_nastype(nastype, variable_name=None):
+ if not validate_length(nastype, valid_nastype_len):
+ err_msg(nastype, variable_name)
+ print valid_length_msg('NAS Type', valid_nastype_len)
+ return False
+ return True
+
+def validate_desc(desc, variable_name=None):
+ if not ipavalidate.Plain(desc):
+ print valid_desc_msg
+ return False
+ return True
+
+def validate(attribute, value):
+ if attribute == 'Client-IP-Address':
+ return validate_ip_addr(value, attribute)
+ if attribute == 'Secret':
+ return validate_secret(value, attribute)
+ if attribute == 'NAS-Type':
+ return validate_nastype(value, attribute)
+ if attribute == 'Name':
+ return validate_name(value, attribute)
+ if attribute == 'Description':
+ return validate_desc(value, attribute)
+ return True
diff --git a/ipapython/setup.py.in b/ipapython/setup.py.in
new file mode 100644
index 000000000..667c5ae00
--- /dev/null
+++ b/ipapython/setup.py.in
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+"""FreeIPA python support library
+
+FreeIPA is a server for identity, policy, and audit.
+"""
+
+DOCLINES = __doc__.split("\n")
+
+import os
+import sys
+import distutils.sysconfig
+
+CLASSIFIERS = """\
+Development Status :: 4 - Beta
+Intended Audience :: System Environment/Base
+License :: GPL
+Programming Language :: Python
+Operating System :: POSIX
+Operating System :: Unix
+"""
+
+# BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
+# update it when the contents of directories change.
+if os.path.exists('MANIFEST'): os.remove('MANIFEST')
+
+def setup_package():
+
+ from distutils.core import setup
+
+ old_path = os.getcwd()
+ local_path = os.path.dirname(os.path.abspath(sys.argv[0]))
+ os.chdir(local_path)
+ sys.path.insert(0,local_path)
+
+ try:
+ setup(
+ name = "ipapython",
+ version = "__VERSION__",
+ license = "GPL",
+ author = "Karl MacMillan, et.al.",
+ author_email = "kmacmill@redhat.com",
+ maintainer = "freeIPA Developers",
+ maintainer_email = "freeipa-devel@redhat.com",
+ url = "http://www.freeipa.org/",
+ description = DOCLINES[0],
+ long_description = "\n".join(DOCLINES[2:]),
+ download_url = "http://www.freeipa.org/page/Downloads",
+ classifiers=filter(None, CLASSIFIERS.split('\n')),
+ platforms = ["Linux", "Solaris", "Unix"],
+ package_dir = {'ipapython': ''},
+ packages = [ "ipapython" ],
+ data_files = [('/etc/ipa', ['ipa.conf'])]
+ )
+ finally:
+ del sys.path[0]
+ os.chdir(old_path)
+ return
+
+if __name__ == '__main__':
+ setup_package()
diff --git a/ipapython/sysrestore.py b/ipapython/sysrestore.py
new file mode 100644
index 000000000..503f38b24
--- /dev/null
+++ b/ipapython/sysrestore.py
@@ -0,0 +1,317 @@
+# Authors: Mark McLoughlin <markmc@redhat.com>
+#
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+#
+# This module provides a very simple API which allows
+# ipa-xxx-install --uninstall to restore certain
+# parts of the system configuration to the way it was
+# before ipa-server-install was first run
+
+import os
+import os.path
+import errno
+import shutil
+import logging
+import ConfigParser
+import random
+import string
+
+from ipapython import ipautil
+
+SYSRESTORE_PATH = "/tmp"
+SYSRESTORE_INDEXFILE = "sysrestore.index"
+SYSRESTORE_STATEFILE = "sysrestore.state"
+
+class FileStore:
+ """Class for handling backup and restore of files"""
+
+ def __init__(self, path = SYSRESTORE_PATH):
+ """Create a _StoreFiles object, that uses @path as the
+ base directory.
+
+ The file @path/sysrestore.index is used to store information
+ about the original location of the saved files.
+ """
+ self._path = path
+ self._index = self._path + "/" + SYSRESTORE_INDEXFILE
+
+ self.random = random.Random()
+
+ self.files = {}
+ self._load()
+
+ def _load(self):
+ """Load the file list from the index file. @files will
+ be an empty dictionary if the file doesn't exist.
+ """
+
+ logging.debug("Loading Index file from '%s'", self._index)
+
+ self.files = {}
+
+ p = ConfigParser.SafeConfigParser()
+ p.read(self._index)
+
+ for section in p.sections():
+ if section == "files":
+ for (key, value) in p.items(section):
+ self.files[key] = value
+
+
+ def save(self):
+ """Save the file list to @_index. If @files is an empty
+ dict, then @_index should be removed.
+ """
+ logging.debug("Saving Index File to '%s'", self._index)
+
+ if len(self.files) == 0:
+ logging.debug(" -> no files, removing file")
+ if os.path.exists(self._index):
+ os.remove(self._index)
+ return
+
+ p = ConfigParser.SafeConfigParser()
+
+ p.add_section('files')
+ for (key, value) in self.files.items():
+ p.set('files', key, str(value))
+
+ f = file(self._index, "w")
+ p.write(f)
+ f.close()
+
+ def backup_file(self, path):
+ """Create a copy of the file at @path - so long as a copy
+ does not already exist - which will be restored to its
+ original location by restore_files().
+ """
+ logging.debug("Backing up system configuration file '%s'", path)
+
+ if not os.path.isabs(path):
+ raise ValueError("Absolute path required")
+
+ if not os.path.isfile(path):
+ logging.debug(" -> Not backing up - '%s' doesn't exist", path)
+ return
+
+ (reldir, file) = os.path.split(path)
+
+ filename = ""
+ for i in range(8):
+ h = "%02x" % self.random.randint(0,255)
+ filename += h
+ filename += "-"+file
+
+ backup_path = os.path.join(self._path, filename)
+ if os.path.exists(backup_path):
+ logging.debug(" -> Not backing up - already have a copy of '%s'", path)
+ return
+
+ shutil.copy2(path, backup_path)
+
+ stat = os.stat(path)
+
+ self.files[filename] = string.join([str(stat.st_mode),str(stat.st_uid),str(stat.st_gid),path], ',')
+ self.save()
+
+ def restore_file(self, path):
+ """Restore the copy of a file at @path to its original
+ location and delete the copy.
+
+ Returns #True if the file was restored, #False if there
+ was no backup file to restore
+ """
+
+ logging.debug("Restoring system configuration file '%s'", path)
+
+ if not os.path.isabs(path):
+ raise ValueError("Absolute path required")
+
+ mode = None
+ uid = None
+ gid = None
+ filename = None
+
+ for (key, value) in self.files.items():
+ (mode,uid,gid,filepath) = string.split(value, ',', 3)
+ if (filepath == path):
+ filename = key
+ break
+
+ if not filename:
+ raise ValueError("No such file name in the index")
+
+ backup_path = os.path.join(self._path, filename)
+ if not os.path.exists(backup_path):
+ logging.debug(" -> Not restoring - '%s' doesn't exist", backup_path)
+ return False
+
+ shutil.move(backup_path, path)
+ os.chown(path, int(uid), int(gid))
+ os.chmod(path, int(mode))
+
+ ipautil.run(["/sbin/restorecon", path])
+
+ del self.files[filename]
+ self.save()
+
+ return True
+
+ def restore_all_files(self):
+ """Restore the files in the inbdex to their original
+ location and delete the copy.
+
+ Returns #True if the file was restored, #False if there
+ was no backup file to restore
+ """
+
+ if len(self.files) == 0:
+ return False
+
+ for (filename, value) in self.files.items():
+
+ (mode,uid,gid,path) = string.split(value, ',', 3)
+
+ backup_path = os.path.join(self._path, filename)
+ if not os.path.exists(backup_path):
+ logging.debug(" -> Not restoring - '%s' doesn't exist", backup_path)
+
+ shutil.move(backup_path, path)
+ os.chown(path, int(uid), int(gid))
+ os.chmod(path, int(mode))
+
+ ipautil.run(["/sbin/restorecon", path])
+
+ #force file to be deleted
+ self.files = {}
+ self.save()
+
+ return True
+
+class StateFile:
+ """A metadata file for recording system state which can
+ be backed up and later restored. The format is something
+ like:
+
+ [httpd]
+ running=True
+ enabled=False
+ """
+
+ def __init__(self, path = SYSRESTORE_PATH):
+ """Create a StateFile object, loading from @path.
+
+ The dictionary @modules, a member of the returned object,
+ is where the state can be modified. @modules is indexed
+ using a module name to return another dictionary containing
+ key/value pairs with the saved state of that module.
+
+ The keys in these latter dictionaries are arbitrary strings
+ and the values may either be strings or booleans.
+ """
+ self._path = path+"/"+SYSRESTORE_STATEFILE
+
+ self.modules = {}
+
+ self._load()
+
+ def _load(self):
+ """Load the modules from the file @_path. @modules will
+ be an empty dictionary if the file doesn't exist.
+ """
+ logging.debug("Loading StateFile from '%s'", self._path)
+
+ self.modules = {}
+
+ p = ConfigParser.SafeConfigParser()
+ p.read(self._path)
+
+ for module in p.sections():
+ self.modules[module] = {}
+ for (key, value) in p.items(module):
+ if value == str(True):
+ value = True
+ elif value == str(False):
+ value = False
+ self.modules[module][key] = value
+
+ def save(self):
+ """Save the modules to @_path. If @modules is an empty
+ dict, then @_path should be removed.
+ """
+ logging.debug("Saving StateFile to '%s'", self._path)
+
+ for module in self.modules.keys():
+ if len(self.modules[module]) == 0:
+ del self.modules[module]
+
+ if len(self.modules) == 0:
+ logging.debug(" -> no modules, removing file")
+ if os.path.exists(self._path):
+ os.remove(self._path)
+ return
+
+ p = ConfigParser.SafeConfigParser()
+
+ for module in self.modules.keys():
+ p.add_section(module)
+ for (key, value) in self.modules[module].items():
+ p.set(module, key, str(value))
+
+ f = file(self._path, "w")
+ p.write(f)
+ f.close()
+
+ def backup_state(self, module, key, value):
+ """Backup an item of system state from @module, identified
+ by the string @key and with the value @value. @value may be
+ a string or boolean.
+ """
+ if not (isinstance(value, str) or isinstance(value, bool)):
+ raise ValueError("Only strings or booleans supported")
+
+ if not self.modules.has_key(module):
+ self.modules[module] = {}
+
+ if not self.modules.has_key(key):
+ self.modules[module][key] = value
+
+ self.save()
+
+ def restore_state(self, module, key):
+ """Return the value of an item of system state from @module,
+ identified by the string @key, and remove it from the backed
+ up system state.
+
+ If the item doesn't exist, #None will be returned, otherwise
+ the original string or boolean value is returned.
+ """
+
+ if not self.modules.has_key(module):
+ return None
+
+ if not self.modules[module].has_key(key):
+ return None
+
+ value = self.modules[module][key]
+ del self.modules[module][key]
+
+ self.save()
+
+ return value
diff --git a/ipapython/test/test_aci.py b/ipapython/test/test_aci.py
new file mode 100644
index 000000000..fb9d84c7f
--- /dev/null
+++ b/ipapython/test/test_aci.py
@@ -0,0 +1,127 @@
+#! /usr/bin/python -E
+#
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+import sys
+sys.path.insert(0, ".")
+
+import unittest
+import aci
+import urllib
+
+
+class TestACI(unittest.TestCase):
+ acitemplate = ('(targetattr="%s")' +
+ '(targetfilter="(memberOf=%s)")' +
+ '(version 3.0;' +
+ 'acl "%s";' +
+ 'allow (write) ' +
+ 'groupdn="ldap:///%s";)')
+
+ def setUp(self):
+ self.aci = aci.ACI()
+
+ def tearDown(self):
+ pass
+
+ def testExport(self):
+ self.aci.source_group = 'cn=foo, dc=freeipa, dc=org'
+ self.aci.dest_group = 'cn=bar, dc=freeipa, dc=org'
+ self.aci.name = 'this is a "name'
+ self.aci.attrs = ['field1', 'field2', 'field3']
+
+ exportaci = self.aci.export_to_string()
+ aci = TestACI.acitemplate % ('field1 || field2 || field3',
+ self.aci.dest_group,
+ 'this is a "name',
+ self.aci.source_group)
+
+ self.assertEqual(aci, exportaci)
+
+ def testURLEncodedExport(self):
+ self.aci.source_group = 'cn=foo " bar, dc=freeipa, dc=org'
+ self.aci.dest_group = 'cn=bar, dc=freeipa, dc=org'
+ self.aci.name = 'this is a "name'
+ self.aci.attrs = ['field1', 'field2', 'field3']
+
+ exportaci = self.aci.export_to_string()
+ aci = TestACI.acitemplate % ('field1 || field2 || field3',
+ self.aci.dest_group,
+ 'this is a "name',
+ urllib.quote(self.aci.source_group, "/=, "))
+
+ self.assertEqual(aci, exportaci)
+
+ def testSimpleParse(self):
+ attr_str = 'field3 || field4 || field5'
+ dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org'
+ name = 'my name'
+ src_dn = 'cn=srcgroup, dc=freeipa, dc=org'
+
+ acistr = TestACI.acitemplate % (attr_str, dest_dn, name, src_dn)
+ self.aci.parse_acistr(acistr)
+
+ self.assertEqual(['field3', 'field4', 'field5'], self.aci.attrs)
+ self.assertEqual(dest_dn, self.aci.dest_group)
+ self.assertEqual(name, self.aci.name)
+ self.assertEqual(src_dn, self.aci.source_group)
+
+ def testUrlEncodedParse(self):
+ attr_str = 'field3 || field4 || field5'
+ dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org'
+ name = 'my name'
+ src_dn = 'cn=src " group, dc=freeipa, dc=org'
+
+ acistr = TestACI.acitemplate % (attr_str, dest_dn, name,
+ urllib.quote(src_dn, "/=, "))
+ self.aci.parse_acistr(acistr)
+
+ self.assertEqual(['field3', 'field4', 'field5'], self.aci.attrs)
+ self.assertEqual(dest_dn, self.aci.dest_group)
+ self.assertEqual(name, self.aci.name)
+ self.assertEqual(src_dn, self.aci.source_group)
+
+ def testInvalidParse(self):
+ try:
+ self.aci.parse_acistr('foo bar')
+ self.fail('Should have failed to parse')
+ except SyntaxError:
+ pass
+
+ try:
+ self.aci.parse_acistr('')
+ self.fail('Should have failed to parse')
+ except SyntaxError:
+ pass
+
+ attr_str = 'field3 || field4 || field5'
+ dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org'
+ name = 'my name'
+ src_dn = 'cn=srcgroup, dc=freeipa, dc=org'
+
+ acistr = TestACI.acitemplate % (attr_str, dest_dn, name, src_dn)
+ acistr += 'trailing garbage'
+ try:
+ self.aci.parse_acistr('')
+ self.fail('Should have failed to parse')
+ except SyntaxError:
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/ipapython/test/test_ipautil.py b/ipapython/test/test_ipautil.py
new file mode 100644
index 000000000..60d53a270
--- /dev/null
+++ b/ipapython/test/test_ipautil.py
@@ -0,0 +1,309 @@
+#! /usr/bin/python -E
+#
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+import sys
+sys.path.insert(0, ".")
+
+import unittest
+import datetime
+
+import ipautil
+
+
+class TestCIDict(unittest.TestCase):
+ def setUp(self):
+ self.cidict = ipautil.CIDict()
+ self.cidict["Key1"] = "val1"
+ self.cidict["key2"] = "val2"
+ self.cidict["KEY3"] = "VAL3"
+
+ def tearDown(self):
+ pass
+
+ def testLen(self):
+ self.assertEqual(3, len(self.cidict))
+
+ def test__GetItem(self):
+ self.assertEqual("val1", self.cidict["Key1"])
+ self.assertEqual("val1", self.cidict["key1"])
+ self.assertEqual("val2", self.cidict["KEY2"])
+ self.assertEqual("VAL3", self.cidict["key3"])
+ self.assertEqual("VAL3", self.cidict["KEY3"])
+ try:
+ self.cidict["key4"]
+ fail("should have raised KeyError")
+ except KeyError:
+ pass
+
+ def testGet(self):
+ self.assertEqual("val1", self.cidict.get("Key1"))
+ self.assertEqual("val1", self.cidict.get("key1"))
+ self.assertEqual("val2", self.cidict.get("KEY2"))
+ self.assertEqual("VAL3", self.cidict.get("key3"))
+ self.assertEqual("VAL3", self.cidict.get("KEY3"))
+ self.assertEqual("default", self.cidict.get("key4", "default"))
+
+ def test__SetItem(self):
+ self.cidict["key4"] = "val4"
+ self.assertEqual("val4", self.cidict["key4"])
+ self.cidict["KEY4"] = "newval4"
+ self.assertEqual("newval4", self.cidict["key4"])
+
+ def testDel(self):
+ self.assert_(self.cidict.has_key("Key1"))
+ del(self.cidict["Key1"])
+ self.failIf(self.cidict.has_key("Key1"))
+
+ self.assert_(self.cidict.has_key("key2"))
+ del(self.cidict["KEY2"])
+ self.failIf(self.cidict.has_key("key2"))
+
+ def testClear(self):
+ self.assertEqual(3, len(self.cidict))
+ self.cidict.clear()
+ self.assertEqual(0, len(self.cidict))
+
+ def testCopy(self):
+ """A copy is no longer a CIDict, but should preserve the case of
+ the keys as they were inserted."""
+ copy = self.cidict.copy()
+ self.assertEqual(3, len(copy))
+ self.assert_(copy.has_key("Key1"))
+ self.assertEqual("val1", copy["Key1"])
+ self.failIf(copy.has_key("key1"))
+
+ def testHasKey(self):
+ self.assert_(self.cidict.has_key("KEY1"))
+ self.assert_(self.cidict.has_key("key2"))
+ self.assert_(self.cidict.has_key("key3"))
+
+ def testItems(self):
+ items = self.cidict.items()
+ self.assertEqual(3, len(items))
+ items_set = set(items)
+ self.assert_(("Key1", "val1") in items_set)
+ self.assert_(("key2", "val2") in items_set)
+ self.assert_(("KEY3", "VAL3") in items_set)
+
+ def testIterItems(self):
+ items = []
+ for (k,v) in self.cidict.iteritems():
+ items.append((k,v))
+ self.assertEqual(3, len(items))
+ items_set = set(items)
+ self.assert_(("Key1", "val1") in items_set)
+ self.assert_(("key2", "val2") in items_set)
+ self.assert_(("KEY3", "VAL3") in items_set)
+
+ def testIterKeys(self):
+ keys = []
+ for k in self.cidict.iterkeys():
+ keys.append(k)
+ self.assertEqual(3, len(keys))
+ keys_set = set(keys)
+ self.assert_("Key1" in keys_set)
+ self.assert_("key2" in keys_set)
+ self.assert_("KEY3" in keys_set)
+
+ def testIterValues(self):
+ values = []
+ for k in self.cidict.itervalues():
+ values.append(k)
+ self.assertEqual(3, len(values))
+ values_set = set(values)
+ self.assert_("val1" in values_set)
+ self.assert_("val2" in values_set)
+ self.assert_("VAL3" in values_set)
+
+ def testKeys(self):
+ keys = self.cidict.keys()
+ self.assertEqual(3, len(keys))
+ keys_set = set(keys)
+ self.assert_("Key1" in keys_set)
+ self.assert_("key2" in keys_set)
+ self.assert_("KEY3" in keys_set)
+
+ def testValues(self):
+ values = self.cidict.values()
+ self.assertEqual(3, len(values))
+ values_set = set(values)
+ self.assert_("val1" in values_set)
+ self.assert_("val2" in values_set)
+ self.assert_("VAL3" in values_set)
+
+ def testUpdate(self):
+ newdict = { "KEY2": "newval2",
+ "key4": "val4" }
+ self.cidict.update(newdict)
+ self.assertEqual(4, len(self.cidict))
+
+ items = self.cidict.items()
+ self.assertEqual(4, len(items))
+ items_set = set(items)
+ self.assert_(("Key1", "val1") in items_set)
+ # note the update "overwrites" the case of the key2
+ self.assert_(("KEY2", "newval2") in items_set)
+ self.assert_(("KEY3", "VAL3") in items_set)
+ self.assert_(("key4", "val4") in items_set)
+
+ def testSetDefault(self):
+ self.assertEqual("val1", self.cidict.setdefault("KEY1", "default"))
+
+ self.failIf(self.cidict.has_key("KEY4"))
+ self.assertEqual("default", self.cidict.setdefault("KEY4", "default"))
+ self.assert_(self.cidict.has_key("KEY4"))
+ self.assertEqual("default", self.cidict["key4"])
+
+ self.failIf(self.cidict.has_key("KEY5"))
+ self.assertEqual(None, self.cidict.setdefault("KEY5"))
+ self.assert_(self.cidict.has_key("KEY5"))
+ self.assertEqual(None, self.cidict["key5"])
+
+ def testPop(self):
+ self.assertEqual("val1", self.cidict.pop("KEY1", "default"))
+ self.failIf(self.cidict.has_key("key1"))
+
+ self.assertEqual("val2", self.cidict.pop("KEY2"))
+ self.failIf(self.cidict.has_key("key2"))
+
+ self.assertEqual("default", self.cidict.pop("key4", "default"))
+ try:
+ self.cidict.pop("key4")
+ fail("should have raised KeyError")
+ except KeyError:
+ pass
+
+ def testPopItem(self):
+ items = set(self.cidict.items())
+ self.assertEqual(3, len(self.cidict))
+
+ item = self.cidict.popitem()
+ self.assertEqual(2, len(self.cidict))
+ self.assert_(item in items)
+ items.discard(item)
+
+ item = self.cidict.popitem()
+ self.assertEqual(1, len(self.cidict))
+ self.assert_(item in items)
+ items.discard(item)
+
+ item = self.cidict.popitem()
+ self.assertEqual(0, len(self.cidict))
+ self.assert_(item in items)
+ items.discard(item)
+
+class TestTimeParser(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def testSimple(self):
+ timestr = "20070803"
+
+ time = ipautil.parse_generalized_time(timestr)
+ self.assertEqual(2007, time.year)
+ self.assertEqual(8, time.month)
+ self.assertEqual(3, time.day)
+ self.assertEqual(0, time.hour)
+ self.assertEqual(0, time.minute)
+ self.assertEqual(0, time.second)
+
+ def testHourMinSec(self):
+ timestr = "20051213141205"
+
+ time = ipautil.parse_generalized_time(timestr)
+ self.assertEqual(2005, time.year)
+ self.assertEqual(12, time.month)
+ self.assertEqual(13, time.day)
+ self.assertEqual(14, time.hour)
+ self.assertEqual(12, time.minute)
+ self.assertEqual(5, time.second)
+
+ def testFractions(self):
+ timestr = "2003092208.5"
+
+ time = ipautil.parse_generalized_time(timestr)
+ self.assertEqual(2003, time.year)
+ self.assertEqual(9, time.month)
+ self.assertEqual(22, time.day)
+ self.assertEqual(8, time.hour)
+ self.assertEqual(30, time.minute)
+ self.assertEqual(0, time.second)
+
+ timestr = "199203301544,25"
+
+ time = ipautil.parse_generalized_time(timestr)
+ self.assertEqual(1992, time.year)
+ self.assertEqual(3, time.month)
+ self.assertEqual(30, time.day)
+ self.assertEqual(15, time.hour)
+ self.assertEqual(44, time.minute)
+ self.assertEqual(15, time.second)
+
+ timestr = "20060401185912,8"
+
+ time = ipautil.parse_generalized_time(timestr)
+ self.assertEqual(2006, time.year)
+ self.assertEqual(4, time.month)
+ self.assertEqual(1, time.day)
+ self.assertEqual(18, time.hour)
+ self.assertEqual(59, time.minute)
+ self.assertEqual(12, time.second)
+ self.assertEqual(800000, time.microsecond)
+
+ def testTimeZones(self):
+ timestr = "20051213141205Z"
+
+ time = ipautil.parse_generalized_time(timestr)
+ self.assertEqual(0, time.tzinfo.houroffset)
+ self.assertEqual(0, time.tzinfo.minoffset)
+ offset = time.tzinfo.utcoffset(None)
+ self.assertEqual(0, offset.seconds)
+
+ timestr = "20051213141205+0500"
+
+ time = ipautil.parse_generalized_time(timestr)
+ self.assertEqual(5, time.tzinfo.houroffset)
+ self.assertEqual(0, time.tzinfo.minoffset)
+ offset = time.tzinfo.utcoffset(None)
+ self.assertEqual(5 * 60 * 60, offset.seconds)
+
+ timestr = "20051213141205-0500"
+
+ time = ipautil.parse_generalized_time(timestr)
+ self.assertEqual(-5, time.tzinfo.houroffset)
+ self.assertEqual(0, time.tzinfo.minoffset)
+ # NOTE - the offset is always positive - it's minutes
+ # _east_ of UTC
+ offset = time.tzinfo.utcoffset(None)
+ self.assertEqual((24 - 5) * 60 * 60, offset.seconds)
+
+ timestr = "20051213141205-0930"
+
+ time = ipautil.parse_generalized_time(timestr)
+ self.assertEqual(-9, time.tzinfo.houroffset)
+ self.assertEqual(-30, time.tzinfo.minoffset)
+ offset = time.tzinfo.utcoffset(None)
+ self.assertEqual(((24 - 9) * 60 * 60) - (30 * 60), offset.seconds)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/ipapython/test/test_ipavalidate.py b/ipapython/test/test_ipavalidate.py
new file mode 100644
index 000000000..8b79fbf07
--- /dev/null
+++ b/ipapython/test/test_ipavalidate.py
@@ -0,0 +1,97 @@
+#! /usr/bin/python -E
+#
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+import sys
+sys.path.insert(0, ".")
+
+import unittest
+
+import ipavalidate
+
+class TestValidate(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_validEmail(self):
+ self.assertEqual(True, ipavalidate.Email("test@freeipa.org"))
+ self.assertEqual(True, ipavalidate.Email("", notEmpty=False))
+
+ def test_invalidEmail(self):
+ self.assertEqual(False, ipavalidate.Email("test"))
+ self.assertEqual(False, ipavalidate.Email("test@freeipa"))
+ self.assertEqual(False, ipavalidate.Email("test@.com"))
+ self.assertEqual(False, ipavalidate.Email(""))
+ self.assertEqual(False, ipavalidate.Email(None))
+
+ def test_validPlain(self):
+ self.assertEqual(True, ipavalidate.Plain("Joe User"))
+ self.assertEqual(True, ipavalidate.Plain("Joe O'Malley"))
+ self.assertEqual(True, ipavalidate.Plain("", notEmpty=False))
+ self.assertEqual(True, ipavalidate.Plain(None, notEmpty=False))
+ self.assertEqual(True, ipavalidate.Plain("JoeUser", allowSpaces=False))
+ self.assertEqual(True, ipavalidate.Plain("JoeUser", allowSpaces=True))
+
+ def test_invalidPlain(self):
+ self.assertEqual(False, ipavalidate.Plain("Joe (User)"))
+ self.assertEqual(False, ipavalidate.Plain("Joe C. User"))
+ self.assertEqual(False, ipavalidate.Plain("", notEmpty=True))
+ self.assertEqual(False, ipavalidate.Plain(None, notEmpty=True))
+ self.assertEqual(False, ipavalidate.Plain("Joe User", allowSpaces=False))
+ self.assertEqual(False, ipavalidate.Plain("Joe C. User"))
+
+ def test_validString(self):
+ self.assertEqual(True, ipavalidate.String("Joe User"))
+ self.assertEqual(True, ipavalidate.String("Joe O'Malley"))
+ self.assertEqual(True, ipavalidate.String("", notEmpty=False))
+ self.assertEqual(True, ipavalidate.String(None, notEmpty=False))
+ self.assertEqual(True, ipavalidate.String("Joe C. User"))
+
+ def test_invalidString(self):
+ self.assertEqual(False, ipavalidate.String("", notEmpty=True))
+ self.assertEqual(False, ipavalidate.String(None, notEmpty=True))
+
+ def test_validPath(self):
+ self.assertEqual(True, ipavalidate.Path("/"))
+ self.assertEqual(True, ipavalidate.Path("/home/user"))
+ self.assertEqual(True, ipavalidate.Path("../home/user"))
+ self.assertEqual(True, ipavalidate.Path("", notEmpty=False))
+ self.assertEqual(True, ipavalidate.Path(None, notEmpty=False))
+
+ def test_invalidPath(self):
+ self.assertEqual(False, ipavalidate.Path("(foo)"))
+ self.assertEqual(False, ipavalidate.Path("", notEmpty=True))
+ self.assertEqual(False, ipavalidate.Path(None, notEmpty=True))
+
+ def test_validName(self):
+ self.assertEqual(True, ipavalidate.GoodName("foo"))
+ self.assertEqual(True, ipavalidate.GoodName("1foo"))
+ self.assertEqual(True, ipavalidate.GoodName("foo.bar"))
+ self.assertEqual(True, ipavalidate.GoodName("foo.bar$"))
+
+ def test_invalidName(self):
+ self.assertEqual(False, ipavalidate.GoodName("foo bar"))
+ self.assertEqual(False, ipavalidate.GoodName("foo%bar"))
+ self.assertEqual(False, ipavalidate.GoodName("*foo"))
+ self.assertEqual(False, ipavalidate.GoodName("$foo.bar$"))
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/ipapython/version.py.in b/ipapython/version.py.in
new file mode 100644
index 000000000..fdb689f02
--- /dev/null
+++ b/ipapython/version.py.in
@@ -0,0 +1,25 @@
+# Authors: Rob Crittenden <rcritten@redhat.com>
+#
+# Copyright (C) 2007 Red Hat
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; version 2 only
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+# The full version including strings
+VERSION="__VERSION__"
+
+# Just the numeric portion of the version so one can do direct numeric
+# comparisons to see if the API is compatible.
+NUM_VERSION=__NUM_VERSION__