summaryrefslogtreecommitdiffstats
path: root/ipa-python
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2009-02-05 15:03:08 -0500
committerRob Crittenden <rcritten@redhat.com>2009-02-09 14:35:15 -0500
commit262ff2d731b1bfc4acd91153088b8fcde7ae92b8 (patch)
treebaf8894d4b357b610113b87d4bfee84de24f08bd /ipa-python
parent58ae191a5afbf29d78afd3969f8d106415897958 (diff)
downloadfreeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.tar.gz
freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.tar.xz
freeipa-262ff2d731b1bfc4acd91153088b8fcde7ae92b8.zip
Rename ipa-python directory to ipapython so it is a real python library
We used to install it as ipa, now installing it as ipapython. The rpm is still ipa-python.
Diffstat (limited to 'ipa-python')
-rw-r--r--ipa-python/MANIFEST.in3
-rw-r--r--ipa-python/Makefile28
-rw-r--r--ipa-python/README18
-rw-r--r--ipa-python/__init__.py0
-rw-r--r--ipa-python/config.py183
-rw-r--r--ipa-python/dnsclient.py443
-rw-r--r--ipa-python/entity.py202
-rwxr-xr-xipa-python/ipa-python.spec.in82
-rw-r--r--ipa-python/ipa.conf3
-rw-r--r--ipa-python/ipautil.py969
-rw-r--r--ipa-python/ipavalidate.py137
-rw-r--r--ipa-python/radius_util.py366
-rw-r--r--ipa-python/setup.py.in77
-rw-r--r--ipa-python/sysrestore.py317
-rw-r--r--ipa-python/test/test_aci.py127
-rw-r--r--ipa-python/test/test_ipautil.py309
-rw-r--r--ipa-python/test/test_ipavalidate.py97
-rw-r--r--ipa-python/version.py.in25
18 files changed, 0 insertions, 3386 deletions
diff --git a/ipa-python/MANIFEST.in b/ipa-python/MANIFEST.in
deleted file mode 100644
index e2cad6f22..000000000
--- a/ipa-python/MANIFEST.in
+++ /dev/null
@@ -1,3 +0,0 @@
-include *.conf
-include ipa-python.spec*
-
diff --git a/ipa-python/Makefile b/ipa-python/Makefile
deleted file mode 100644
index 4ac027e14..000000000
--- a/ipa-python/Makefile
+++ /dev/null
@@ -1,28 +0,0 @@
-PYTHONLIBDIR ?= $(shell python -c "from distutils.sysconfig import *; print get_python_lib()")
-PACKAGEDIR ?= $(DESTDIR)/$(PYTHONLIBDIR)/ipa
-CONFIGDIR ?= $(DESTDIR)/etc/ipa
-TESTS = $(wildcard test/*.py)
-
-all: ;
-
-install:
- if [ "$(DESTDIR)" = "" ]; then \
- python setup.py install; \
- else \
- python setup.py install --root $(DESTDIR); \
- fi
-
-clean:
- rm -f *~ *.pyc
-
-distclean: clean
- rm -f setup.py ipa-python.spec version.py
-
-maintainer-clean: distclean
- rm -rf build
-
-.PHONY: test
-test: $(subst .py,.tst,$(TESTS))
-
-%.tst: %.py
- python $<
diff --git a/ipa-python/README b/ipa-python/README
deleted file mode 100644
index c966e44bf..000000000
--- a/ipa-python/README
+++ /dev/null
@@ -1,18 +0,0 @@
-This is a set of libraries common to IPA clients and servers though mostly
-geared currently towards command-line tools.
-
-A brief overview:
-
-config.py - identify the IPA server domain and realm. It uses dnsclient to
- try to detect this information first and will fall back to
- /etc/ipa/ipa.conf if that fails.
-dnsclient.py - find IPA information via DNS
-
-ipautil.py - helper functions
-
-radius_util.py - helper functions for Radius
-
-entity.py - entity is the main data type. User and Group extend this class
- (but don't add anything currently).
-
-ipavalidate.py - basic data validation routines
diff --git a/ipa-python/__init__.py b/ipa-python/__init__.py
deleted file mode 100644
index e69de29bb..000000000
--- a/ipa-python/__init__.py
+++ /dev/null
diff --git a/ipa-python/config.py b/ipa-python/config.py
deleted file mode 100644
index 8755f628c..000000000
--- a/ipa-python/config.py
+++ /dev/null
@@ -1,183 +0,0 @@
-# Authors: Karl MacMillan <kmacmill@redhat.com>
-#
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-import ConfigParser
-from optparse import OptionParser, IndentedHelpFormatter
-
-import krbV
-import socket
-import ipa.dnsclient
-import re
-
-class IPAConfigError(Exception):
- def __init__(self, msg=''):
- self.msg = msg
- Exception.__init__(self, msg)
-
- def __repr__(self):
- return self.msg
-
- __str__ = __repr__
-
-class IPAFormatter(IndentedHelpFormatter):
- """Our own optparse formatter that indents multiple lined usage string."""
- def format_usage(self, usage):
- usage_string = "Usage:"
- spacing = " " * len(usage_string)
- lines = usage.split("\n")
- ret = "%s %s\n" % (usage_string, lines[0])
- for line in lines[1:]:
- ret += "%s %s\n" % (spacing, line)
- return ret
-
-def verify_args(parser, args, needed_args = None):
- """Verify that we have all positional arguments we need, if not, exit."""
- if needed_args:
- needed_list = needed_args.split(" ")
- else:
- needed_list = []
- len_need = len(needed_list)
- len_have = len(args)
- if len_have > len_need:
- parser.error("too many arguments")
- elif len_have < len_need:
- parser.error("no %s specified" % needed_list[len_have])
-
-class IPAConfig:
- def __init__(self):
- self.default_realm = None
- self.default_server = []
- self.default_domain = None
-
- def get_realm(self):
- if self.default_realm:
- return self.default_realm
- else:
- raise IPAConfigError("no default realm")
-
- def get_server(self):
- if len(self.default_server):
- return self.default_server
- else:
- raise IPAConfigError("no default server")
-
- def get_domain(self):
- if self.default_domain:
- return self.default_domain
- else:
- raise IPAConfigError("no default domain")
-
-# Global library config
-config = IPAConfig()
-
-def __parse_config(discover_server = True):
- p = ConfigParser.SafeConfigParser()
- p.read("/etc/ipa/ipa.conf")
-
- try:
- if not config.default_realm:
- config.default_realm = p.get("defaults", "realm")
- except:
- pass
- if discover_server:
- try:
- s = p.get("defaults", "server")
- config.default_server.extend(re.sub("\s+", "", s).split(','))
- except:
- pass
- try:
- if not config.default_domain:
- config.default_domain = p.get("defaults", "domain")
- except:
- pass
-
-def __discover_config(discover_server = True):
- rl = 0
- try:
- if not config.default_realm:
- krbctx = krbV.default_context()
- config.default_realm = krbctx.default_realm
- if not config.default_realm:
- return False
-
- if not config.default_domain:
- #try once with REALM -> domain
- dom_name = config.default_realm.lower()
- name = "_ldap._tcp."+dom_name+"."
- rs = ipa.dnsclient.query(name, ipa.dnsclient.DNS_C_IN, ipa.dnsclient.DNS_T_SRV)
- rl = len(rs)
- if rl == 0:
- #try cycling on domain components of FQDN
- dom_name = socket.getfqdn()
- while rl == 0:
- tok = dom_name.find(".")
- if tok == -1:
- return False
- dom_name = dom_name[tok+1:]
- name = "_ldap._tcp." + dom_name + "."
- rs = ipa.dnsclient.query(name, ipa.dnsclient.DNS_C_IN, ipa.dnsclient.DNS_T_SRV)
- rl = len(rs)
-
- config.default_domain = dom_name
-
- if discover_server:
- if rl == 0:
- name = "_ldap._tcp."+config.default_domain+"."
- rs = ipa.dnsclient.query(name, ipa.dnsclient.DNS_C_IN, ipa.dnsclient.DNS_T_SRV)
-
- for r in rs:
- if r.dns_type == ipa.dnsclient.DNS_T_SRV:
- rsrv = r.rdata.server.rstrip(".")
- config.default_server.append(rsrv)
-
- except:
- pass
-
-def add_standard_options(parser):
- parser.add_option("--realm", dest="realm", help="Override default IPA realm")
- parser.add_option("--server", dest="server", help="Override default IPA server")
- parser.add_option("--domain", dest="domain", help="Override default IPA DNS domain")
-
-def init_config(options=None):
- if options:
- config.default_realm = options.realm
- config.default_domain = options.domain
- if options.server:
- config.default_server.extend(options.server.split(","))
-
- if len(config.default_server):
- discover_server = False
- else:
- discover_server = True
- __parse_config(discover_server)
- __discover_config(discover_server)
-
- # make sure the server list only contains unique items
- new_server = []
- for server in config.default_server:
- if server not in new_server:
- new_server.append(server)
- config.default_server = new_server
-
- if not config.default_realm:
- raise IPAConfigError("IPA realm not found in DNS, in the config file (/etc/ipa/ipa.conf) or on the command line.")
- if not config.default_server:
- raise IPAConfigError("IPA server not found in DNS, in the config file (/etc/ipa/ipa.conf) or on the command line.")
- if not config.default_domain:
- raise IPAConfigError("IPA domain not found in the config file (/etc/ipa/ipa.conf) or on the command line.")
diff --git a/ipa-python/dnsclient.py b/ipa-python/dnsclient.py
deleted file mode 100644
index 58d93d850..000000000
--- a/ipa-python/dnsclient.py
+++ /dev/null
@@ -1,443 +0,0 @@
-#
-# Copyright 2001, 2005 Red Hat, Inc.
-#
-# This is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 only
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-#
-
-import struct
-import socket
-import sys
-
-import acutil
-
-DNS_C_IN = 1
-DNS_C_CS = 2
-DNS_C_CHAOS = 3
-DNS_C_HS = 4
-DNS_C_ANY = 255
-
-DNS_T_A = 1
-DNS_T_NS = 2
-DNS_T_CNAME = 5
-DNS_T_SOA = 6
-DNS_T_NULL = 10
-DNS_T_WKS = 11
-DNS_T_PTR = 12
-DNS_T_HINFO = 13
-DNS_T_MX = 15
-DNS_T_TXT = 16
-DNS_T_SRV = 33
-DNS_T_ANY = 255
-
-DEBUG_DNSCLIENT = False
-
-class DNSQueryHeader:
- FORMAT = "!HBBHHHH"
- def __init__(self):
- self.dns_id = 0
- self.dns_rd = 0
- self.dns_tc = 0
- self.dns_aa = 0
- self.dns_opcode = 0
- self.dns_qr = 0
- self.dns_rcode = 0
- self.dns_z = 0
- self.dns_ra = 0
- self.dns_qdcount = 0
- self.dns_ancount = 0
- self.dns_nscount = 0
- self.dns_arcount = 0
-
- def pack(self):
- return struct.pack(DNSQueryHeader.FORMAT,
- self.dns_id,
- (self.dns_rd & 1) |
- (self.dns_tc & 1) << 1 |
- (self.dns_aa & 1) << 2 |
- (self.dns_opcode & 15) << 3 |
- (self.dns_qr & 1) << 7,
- (self.dns_rcode & 15) |
- (self.dns_z & 7) << 4 |
- (self.dns_ra & 1) << 7,
- self.dns_qdcount,
- self.dns_ancount,
- self.dns_nscount,
- self.dns_arcount)
-
- def unpack(self, data):
- (self.dns_id, byte1, byte2, self.dns_qdcount, self.dns_ancount,
- self.dns_nscount, self.dns_arcount) = struct.unpack(DNSQueryHeader.FORMAT, data[0:self.size()])
- self.dns_rd = byte1 & 1
- self.dns_tc = (byte1 >> 1) & 1
- self.dns_aa = (byte1 >> 2) & 1
- self.dns_opcode = (byte1 >> 3) & 15
- self.dns_qr = (byte1 >> 7) & 1
- self.dns_rcode = byte2 & 15
- self.dns_z = (byte2 >> 4) & 7
- self.dns_ra = (byte1 >> 7) & 1
-
- def size(self):
- return struct.calcsize(DNSQueryHeader.FORMAT)
-
-def unpackQueryHeader(data):
- header = DNSQueryHeader()
- header.unpack(data)
- return header
-
-class DNSResult:
- FORMAT = "!HHIH"
- QFORMAT = "!HH"
- def __init__(self):
- self.dns_name = ""
- self.dns_type = 0
- self.dns_class = 0
- self.dns_ttl = 0
- self.dns_rlength = 0
- self.rdata = None
-
- def unpack(self, data):
- (self.dns_type, self.dns_class, self.dns_ttl,
- self.dns_rlength) = struct.unpack(DNSResult.FORMAT, data[0:self.size()])
-
- def qunpack(self, data):
- (self.dns_type, self.dns_class) = struct.unpack(DNSResult.QFORMAT, data[0:self.qsize()])
-
- def size(self):
- return struct.calcsize(DNSResult.FORMAT)
-
- def qsize(self):
- return struct.calcsize(DNSResult.QFORMAT)
-
-class DNSRData:
- def __init__(self):
- pass
-
-#typedef struct dns_rr_a {
-# u_int32_t address;
-#} dns_rr_a_t;
-#
-#typedef struct dns_rr_cname {
-# const char *cname;
-#} dns_rr_cname_t;
-#
-#typedef struct dns_rr_hinfo {
-# const char *cpu, *os;
-#} dns_rr_hinfo_t;
-#
-#typedef struct dns_rr_mx {
-# u_int16_t preference;
-# const char *exchange;
-#} dns_rr_mx_t;
-#
-#typedef struct dns_rr_null {
-# unsigned const char *data;
-#} dns_rr_null_t;
-#
-#typedef struct dns_rr_ns {
-# const char *nsdname;
-#} dns_rr_ns_t;
-#
-#typedef struct dns_rr_ptr {
-# const char *ptrdname;
-#} dns_rr_ptr_t;
-#
-#typedef struct dns_rr_soa {
-# const char *mname;
-# const char *rname;
-# u_int32_t serial;
-# int32_t refresh;
-# int32_t retry;
-# int32_t expire;
-# int32_t minimum;
-#} dns_rr_soa_t;
-#
-#typedef struct dns_rr_txt {
-# const char *data;
-#} dns_rr_txt_t;
-#
-#typedef struct dns_rr_srv {
-# const char *server;
-# u_int16_t priority;
-# u_int16_t weight;
-# u_int16_t port;
-#} dns_rr_srv_t;
-
-def dnsNameToLabel(name):
- out = ""
- name = name.split(".")
- for part in name:
- out += chr(len(part)) + part
- return out
-
-def dnsFormatQuery(query, qclass, qtype):
- header = DNSQueryHeader()
-
- header.dns_id = 0 # FIXME: id = 0
- header.dns_rd = 1 # don't know why the original code didn't request recursion for non SOA requests
- header.dns_qr = 0 # query
- header.dns_opcode = 0 # standard query
- header.dns_qdcount = 1 # single query
-
- qlabel = dnsNameToLabel(query)
- if not qlabel:
- return ""
-
- out = header.pack() + qlabel
- out += chr(qtype >> 8)
- out += chr(qtype & 0xff)
- out += chr(qclass >> 8)
- out += chr(qclass & 0xff)
-
- return out
-
-def dnsParseLabel(label, base):
- # returns (output, rest)
- if not label:
- return ("", None)
-
- update = 1
- rest = label
- output = ""
- skip = 0
-
- try:
- while ord(rest[0]):
- if ord(rest[0]) & 0xc0:
- rest = base[((ord(rest[0]) & 0x3f) << 8) + ord(rest[1]):]
- if update:
- skip += 2
- update = 0
- continue
- output += rest[1:ord(rest[0]) + 1] + "."
- if update:
- skip += ord(rest[0]) + 1
- rest = rest[ord(rest[0]) + 1:]
- except IndexError:
- return ("", None)
- return (label[skip+update:], output)
-
-def dnsParseA(data, base):
- rdata = DNSRData()
- if len(data) < 4:
- rdata.address = 0
- return None
-
- rdata.address = (ord(data[0])<<24) | (ord(data[1])<<16) | (ord(data[2])<<8) | (ord(data[3])<<0)
-
- if DEBUG_DNSCLIENT:
- print "A = %d.%d.%d.%d." % (ord(data[0]), ord(data[1]), ord(data[2]), ord(data[3]))
- return rdata
-
-def dnsParseText(data):
- if len(data) < 1:
- return ("", None)
- tlen = ord(data[0])
- if len(data) < tlen + 1:
- return ("", None)
- return (data[tlen+1:], data[1:tlen+1])
-
-def dnsParseNS(data, base):
- rdata = DNSRData()
- (rest, rdata.nsdname) = dnsParseLabel(data, base)
- if DEBUG_DNSCLIENT:
- print "NS DNAME = \"%s\"." % (rdata.nsdname)
- return rdata
-
-def dnsParseCNAME(data, base):
- rdata = DNSRData()
- (rest, rdata.cname) = dnsParseLabel(data, base)
- if DEBUG_DNSCLIENT:
- print "CNAME = \"%s\"." % (rdata.cname)
- return rdata
-
-def dnsParseSOA(data, base):
- rdata = DNSRData()
- format = "!IIIII"
-
- (rest, rdata.mname) = dnsParseLabel(data, base)
- if rdata.mname is None:
- return None
- (rest, rdata.rname) = dnsParseLabel(rest, base)
- if rdata.rname is None:
- return None
- if len(rest) < struct.calcsize(format):
- return None
-
- (rdata.serial, rdata.refresh, rdata.retry, rdata.expire,
- rdata.minimum) = struct.unpack(format, rest[:struct.calcsize(format)])
-
- if DEBUG_DNSCLIENT:
- print "SOA(mname) = \"%s\"." % rdata.mname
- print "SOA(rname) = \"%s\"." % rdata.rname
- print "SOA(serial) = %d." % rdata.serial
- print "SOA(refresh) = %d." % rdata.refresh
- print "SOA(retry) = %d." % rdata.retry
- print "SOA(expire) = %d." % rdata.expire
- print "SOA(minimum) = %d." % rdata.minimum
- return rdata
-
-def dnsParseNULL(data, base):
- # um, yeah
- return None
-
-def dnsParseWKS(data, base):
- return None
-
-def dnsParseHINFO(data, base):
- rdata = DNSRData()
- (rest, rdata.cpu) = dnsParseText(data)
- if rest:
- (rest, rdata.os) = dnsParseText(rest)
- if DEBUG_DNSCLIENT:
- print "HINFO(cpu) = \"%s\"." % rdata.cpu
- print "HINFO(os) = \"%s\"." % rdata.os
- return rdata
-
-def dnsParseMX(data, base):
- rdata = DNSRData()
- if len(data) < 2:
- return None
- rdata.preference = (ord(data[0]) << 8) | ord(data[1])
- (rest, rdata.exchange) = dnsParseLabel(data[2:], base)
- if DEBUG_DNSCLIENT:
- print "MX(exchanger) = \"%s\"." % rdata.exchange
- print "MX(preference) = %d." % rdata.preference
- return rdata
-
-def dnsParseTXT(data, base):
- rdata = DNSRData()
- (rest, rdata.data) = dnsParseText(data)
- if DEBUG_DNSCLIENT:
- print "TXT = \"%s\"." % rdata.data
- return rdata
-
-def dnsParsePTR(data, base):
- rdata = DNSRData()
- (rest, rdata.ptrdname) = dnsParseLabel(data, base)
- if DEBUG_DNSCLIENT:
- print "PTR = \"%s\"." % rdata.ptrdname
- return rdata
-
-def dnsParseSRV(data, base):
- rdata = DNSRData()
- format = "!HHH"
- flen = struct.calcsize(format)
- if len(data) < flen:
- return None
-
- (rdata.priority, rdata.weight, rdata.port) = struct.unpack(format, data[:flen])
- (rest, rdata.server) = dnsParseLabel(data[flen:], base)
- if DEBUG_DNSCLIENT:
- print "SRV(server) = \"%s\"." % rdata.server
- print "SRV(weight) = %d." % rdata.weight
- print "SRV(priority) = %d." % rdata.priority
- print "SRV(port) = %d." % rdata.port
- return rdata
-
-def dnsParseResults(results):
- try:
- header = unpackQueryHeader(results)
- except struct.error:
- return []
-
- if header.dns_qr != 1: # should be a response
- return []
-
- if header.dns_rcode != 0: # should be no error
- return []
-
- rest = results[header.size():]
-
- rrlist = []
-
- for i in xrange(header.dns_qdcount):
- if not rest:
- return []
-
- qq = DNSResult()
-
- (rest, label) = dnsParseLabel(rest, results)
- if label is None:
- return []
-
- if len(rest) < qq.qsize():
- return []
-
- qq.qunpack(rest)
-
- rest = rest[qq.qsize():]
-
- if DEBUG_DNSCLIENT:
- print "Queried for '%s', class = %d, type = %d." % (label,
- qq.dns_class, qq.dns_type)
-
- for i in xrange(header.dns_ancount + header.dns_nscount + header.dns_arcount):
- (rest, label) = dnsParseLabel(rest, results)
- if label is None:
- return []
-
- rr = DNSResult()
-
- rr.dns_name = label
-
- if len(rest) < rr.size():
- return []
-
- rr.unpack(rest)
-
- rest = rest[rr.size():]
-
- if DEBUG_DNSCLIENT:
- print "Answer %d for '%s', class = %d, type = %d, ttl = %d." % (i,
- rr.dns_name, rr.dns_class, rr.dns_type,
- rr.dns_ttl)
-
- if len(rest) < rr.dns_rlength:
- if DEBUG_DNSCLIENT:
- print "Answer too short."
- return []
-
- fmap = { DNS_T_A: dnsParseA, DNS_T_NS: dnsParseNS,
- DNS_T_CNAME: dnsParseCNAME, DNS_T_SOA: dnsParseSOA,
- DNS_T_NULL: dnsParseNULL, DNS_T_WKS: dnsParseWKS,
- DNS_T_PTR: dnsParsePTR, DNS_T_HINFO: dnsParseHINFO,
- DNS_T_MX: dnsParseMX, DNS_T_TXT: dnsParseTXT,
- DNS_T_SRV: dnsParseSRV}
-
- if not rr.dns_type in fmap:
- if DEBUG_DNSCLIENT:
- print "Don't know how to parse RR type %d!" % rr.dns_type
- else:
- rr.rdata = fmap[rr.dns_type](rest[:rr.dns_rlength], results)
-
- rest = rest[rr.dns_rlength:]
- rrlist += [rr]
-
- return rrlist
-
-def query(query, qclass, qtype):
- qdata = dnsFormatQuery(query, qclass, qtype)
- if not qdata:
- return []
- answer = acutil.res_send(qdata)
- if not answer:
- return []
- return dnsParseResults(answer)
-
-if __name__ == '__main__':
- DEBUG_DNSCLIENT = True
- print "Sending query."
- rr = query(len(sys.argv) > 1 and sys.argv[1] or "devserv.devel.redhat.com.",
- DNS_C_IN, DNS_T_ANY)
- sys.exit(0)
diff --git a/ipa-python/entity.py b/ipa-python/entity.py
deleted file mode 100644
index 64db350ba..000000000
--- a/ipa-python/entity.py
+++ /dev/null
@@ -1,202 +0,0 @@
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-import ldap
-import ldif
-import re
-import cStringIO
-import copy
-
-import ipa.ipautil
-
-def utf8_encode_value(value):
- if isinstance(value,unicode):
- return value.encode('utf-8')
- return value
-
-def utf8_encode_values(values):
- if isinstance(values,list) or isinstance(values,tuple):
- return map(utf8_encode_value, values)
- else:
- return utf8_encode_value(values)
-
-def copy_CIDict(x):
- """Do a deep copy of a CIDict"""
- y = {}
- for key, value in x.iteritems():
- y[copy.deepcopy(key)] = copy.deepcopy(value)
- return y
-
-class Entity:
- """This class represents an IPA user. An LDAP entry consists of a DN
- and a list of attributes. Each attribute consists of a name and a list of
- values. For the time being I will maintain this.
-
- In python-ldap, entries are returned as a list of 2-tuples.
- Instance variables:
- dn - string - the string DN of the entry
- data - CIDict - case insensitive dict of the attributes and values
- orig_data - CIDict - case insentiive dict of the original attributes and values"""
-
- def __init__(self,entrydata=None):
- """data is the raw data returned from the python-ldap result method,
- which is a search result entry or a reference or None.
- If creating a new empty entry, data is the string DN."""
- if entrydata:
- if isinstance(entrydata,tuple):
- self.dn = entrydata[0]
- self.data = ipa.ipautil.CIDict(entrydata[1])
- elif isinstance(entrydata,str) or isinstance(entrydata,unicode):
- self.dn = entrydata
- self.data = ipa.ipautil.CIDict()
- elif isinstance(entrydata,dict):
- self.dn = entrydata['dn']
- del entrydata['dn']
- self.data = ipa.ipautil.CIDict(entrydata)
- else:
- self.dn = ''
- self.data = ipa.ipautil.CIDict()
-
- self.orig_data = ipa.ipautil.CIDict(copy_CIDict(self.data))
-
- def __nonzero__(self):
- """This allows us to do tests like if entry: returns false if there is no data,
- true otherwise"""
- return self.data != None and len(self.data) > 0
-
- def hasAttr(self,name):
- """Return True if this entry has an attribute named name, False otherwise"""
- return self.data and self.data.has_key(name)
-
- def __setattr__(self,name,value):
- """One should use setValue() or setValues() to set values except for
- dn and data which are special."""
- if name != 'dn' and name != 'data' and name != 'orig_data':
- raise KeyError, 'use setValue() or setValues()'
- else:
- self.__dict__[name] = value
-
- def __getattr__(self,name):
- """If name is the name of an LDAP attribute, return the first value for that
- attribute - equivalent to getValue - this allows the use of
- entry.cn
- instead of
- entry.getValue('cn')
- This also allows us to return None if an attribute is not found rather than
- throwing an exception"""
- return self.getValue(name)
-
- def getValues(self,name):
- """Get the list (array) of values for the attribute named name"""
- return self.data.get(name)
-
- def getValue(self,name,default=None):
- """Get the first value for the attribute named name"""
- value = self.data.get(name,default)
- if isinstance(value,list) or isinstance(value,tuple):
- return value[0]
- else:
- return value
-
- def setValue(self,name,*value):
- """Value passed in may be a single value, several values, or a single sequence.
- For example:
- ent.setValue('name', 'value')
- ent.setValue('name', 'value1', 'value2', ..., 'valueN')
- ent.setValue('name', ['value1', 'value2', ..., 'valueN'])
- ent.setValue('name', ('value1', 'value2', ..., 'valueN'))
- Since *value is a tuple, we may have to extract a list or tuple from that
- tuple as in the last two examples above"""
- if (len(value) < 1):
- return
- if (len(value) == 1):
- self.data[name] = utf8_encode_values(value[0])
- else:
- self.data[name] = utf8_encode_values(value)
-
- setValues = setValue
-
- def setValueNotEmpty(self,name,*value):
- """Similar to setValue() but will not set an empty field. This
- is an attempt to avoid adding empty attributes."""
- if (len(value) >= 1) and value[0] and len(value[0]) > 0:
- if isinstance(value[0], list):
- if len(value[0][0]) > 0:
- self.setValue(name, *value)
- return
- else:
- self.setValue(name, *value)
- return
-
- # At this point we have an empty incoming value. See if they are
- # trying to erase the current value. If so we'll delete it so
- # it gets marked as removed in the modlist.
- v = self.getValues(name)
- if v:
- self.delValue(name)
-
- return
-
- def delValue(self,name):
- """Remove the attribute named name."""
- if self.data.get(name,None):
- del self.data[name]
-
- def toTupleList(self):
- """Convert the attrs and values to a list of 2-tuples. The first element
- of the tuple is the attribute name. The second element is either a
- single value or a list of values."""
- return self.data.items()
-
- def toDict(self):
- """Convert the attrs and values to a dict. The dict is keyed on the
- attribute name. The value is either single value or a list of values."""
- result = ipa.ipautil.CIDict(self.data)
- result['dn'] = self.dn
- return result
-
- def attrList(self):
- """Return a list of all attributes in the entry"""
- return self.data.keys()
-
- def origDataDict(self):
- """Returns a dict of the original values of the user. Used for updates."""
- result = ipa.ipautil.CIDict(self.orig_data)
- result['dn'] = self.dn
- return result
-
-# def __str__(self):
-# """Convert the Entry to its LDIF representation"""
-# return self.__repr__()
-#
-# # the ldif class base64 encodes some attrs which I would rather see in raw form - to
-# # encode specific attrs as base64, add them to the list below
-# ldif.safe_string_re = re.compile('^$')
-# base64_attrs = ['nsstate', 'krbprincipalkey', 'krbExtraData']
-#
-# def __repr__(self):
-# """Convert the Entry to its LDIF representation"""
-# sio = cStringIO.StringIO()
-# # what's all this then? the unparse method will currently only accept
-# # a list or a dict, not a class derived from them. self.data is a
-# # cidict, so unparse barfs on it. I've filed a bug against python-ldap,
-# # but in the meantime, we have to convert to a plain old dict for printing
-# # I also don't want to see wrapping, so set the line width really high (1000)
-# newdata = {}
-# newdata.update(self.data)
-# ldif.LDIFWriter(sio,User.base64_attrs,1000).unparse(self.dn,newdata)
-# return sio.getvalue()
diff --git a/ipa-python/ipa-python.spec.in b/ipa-python/ipa-python.spec.in
deleted file mode 100755
index a41a413e6..000000000
--- a/ipa-python/ipa-python.spec.in
+++ /dev/null
@@ -1,82 +0,0 @@
-Name: ipa-python
-Version: __VERSION__
-Release: __RELEASE__%{?dist}
-Summary: IPA authentication server
-
-Group: System Environment/Base
-License: GPLv2
-URL: http://www.freeipa.org
-Source0: http://www.freeipa.org/downloads/%{name}-%{version}.tgz
-BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n)
-BuildArch: noarch
-BuildRequires: python-devel
-Requires: python-kerberos gnupg
-
-%{!?python_sitelib: %define python_sitelib %(%{__python} -c "from distutils.sysconfig import get_python_lib; print get_python_lib()")}
-
-%description
-IPA is a server for identity, policy, and audit.
-
-%prep
-%setup -q
-
-%build
-
-%install
-rm -rf %{buildroot}
-%{__python} setup.py install --no-compile --root=%{buildroot}
-
-%clean
-rm -rf %{buildroot}
-
-%files
-%defattr(-,root,root,-)
-%{python_sitelib}/*
-%config(noreplace) %{_sysconfdir}/ipa/ipa.conf
-
-%changelog
-* Thu Apr 3 2008 Rob Crittenden <rcritten@redhat.com> - 1.0.0-1
-- Version bump for release
-
-* Thu Feb 21 2008 Rob Crittenden <rcritten@redhat.com> - 0.99.0-1
-- Version bump for release
-
-* Thu Jan 31 2008 Rob Crittenden <rcritten@redhat.com> - 0.6.0-4
-- Marked with wrong license. IPA is GPLv2.
-
-* Thu Jan 24 2008 Rob Crittenden <rcritten@redhat.com> - 0.6.0-3
-- Use new name of PyKerberos, python-kerberos, in Requires
-
-* Thu Jan 17 2008 Rob Crittenden <rcritten@redhat.com> - 0.6.0-2
-- Fixed License in specfile
-
-* Fri Dec 21 2007 Karl MacMillan <kmacmill@redhat.com> - 0.6.0-1
-- Version bump for release
-
-* Wed Nov 21 2007 Karl MacMillan <kmacmill@redhat.com> - 0.5.0-1
-- Version bump for release and rename of rpm
-
-* Thu Nov 1 2007 Karl MacMillan <kmacmill@redhat.com> - 0.4.1-1
-- Version bump for release
-
-* Wed Oct 17 2007 Rob Crittenden <rcritten@redhat.com> - 0.4.0-2
-- Use new python setup.py build script
-
-* Tue Oct 2 2007 Karl MacMillan <kmacmill@redhat.com> - 0.4.0-1
-- Milestone 4
-
-* Mon Sep 10 2007 Karl MacMillan <kmacmill@redhat.com> - 0.3.0-1
-- Milestone 3
-
-* Fri Aug 17 2007 Karl MacMillan <kmacmill@redhat.com> = 0.2.0-4
-- Added PyKerberos dep.
-
-* Mon Aug 5 2007 Rob Crittenden <rcritten@redhat.com> - 0.1.0-3
-- Abstracted client class to work directly or over RPC
-
-* Wed Aug 1 2007 Rob Crittenden <rcritten@redhat.com> - 0.1.0-2
-- Add User class
-- Add kerberos authentication to the XML-RPC request made from tools.
-
-* Fri Jul 27 2007 Karl MacMillan <kmacmill@localhost.localdomain> - 0.1.0-1
-- Initial rpm version
diff --git a/ipa-python/ipa.conf b/ipa-python/ipa.conf
deleted file mode 100644
index 516f764d5..000000000
--- a/ipa-python/ipa.conf
+++ /dev/null
@@ -1,3 +0,0 @@
-[defaults]
-# realm = EXAMPLE.COM
-# server = ipa.example.com
diff --git a/ipa-python/ipautil.py b/ipa-python/ipautil.py
deleted file mode 100644
index aa49c3ba3..000000000
--- a/ipa-python/ipautil.py
+++ /dev/null
@@ -1,969 +0,0 @@
-# Authors: Simo Sorce <ssorce@redhat.com>
-#
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-SHARE_DIR = "/usr/share/ipa/"
-PLUGINS_SHARE_DIR = "/usr/share/ipa/plugins"
-
-import string
-import tempfile
-import logging
-import subprocess
-import random
-import os, sys, traceback, readline
-import stat
-import shutil
-
-from ipa import ipavalidate
-from types import *
-
-import re
-import xmlrpclib
-import datetime
-from ipa import config
-try:
- from subprocess import CalledProcessError
- class CalledProcessError(subprocess.CalledProcessError):
- def __init__(self, returncode, cmd):
- super(CalledProcessError, self).__init__(returncode, cmd)
-except ImportError:
- # Python 2.4 doesn't implement CalledProcessError
- class CalledProcessError(Exception):
- """This exception is raised when a process run by check_call() returns
- a non-zero exit status. The exit status will be stored in the
- returncode attribute."""
- def __init__(self, returncode, cmd):
- self.returncode = returncode
- self.cmd = cmd
- def __str__(self):
- return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
-
-def get_domain_name():
- try:
- config.init_config()
- domain_name = config.config.get_domain()
- except Exception, e:
- return None
-
- return domain_name
-
-def realm_to_suffix(realm_name):
- s = realm_name.split(".")
- terms = ["dc=" + x.lower() for x in s]
- return ",".join(terms)
-
-def template_str(txt, vars):
- return string.Template(txt).substitute(vars)
-
-def template_file(infilename, vars):
- txt = open(infilename).read()
- return template_str(txt, vars)
-
-def write_tmp_file(txt):
- fd = tempfile.NamedTemporaryFile()
- fd.write(txt)
- fd.flush()
-
- return fd
-
-def run(args, stdin=None):
- if stdin:
- p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
- stdout,stderr = p.communicate(stdin)
- else:
- p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
- stdout,stderr = p.communicate()
-
- logging.info(stdout)
- logging.info(stderr)
-
- if p.returncode != 0:
- raise CalledProcessError(p.returncode, ' '.join(args))
-
- return (stdout, stderr)
-
-def file_exists(filename):
- try:
- mode = os.stat(filename)[stat.ST_MODE]
- if stat.S_ISREG(mode):
- return True
- else:
- return False
- except:
- return False
-
-def dir_exists(filename):
- try:
- mode = os.stat(filename)[stat.ST_MODE]
- if stat.S_ISDIR(mode):
- return True
- else:
- return False
- except:
- return False
-
-def install_file(fname, dest):
- if file_exists(dest):
- os.rename(dest, dest + ".orig")
- shutil.move(fname, dest)
-
-def backup_file(fname):
- if file_exists(fname):
- os.rename(fname, fname + ".orig")
-
-# uses gpg to compress and encrypt a file
-def encrypt_file(source, dest, password, workdir = None):
- if type(source) is not StringType or not len(source):
- raise ValueError('Missing Source File')
- #stat it so that we get back an exception if it does no t exist
- os.stat(source)
-
- if type(dest) is not StringType or not len(dest):
- raise ValueError('Missing Destination File')
-
- if type(password) is not StringType or not len(password):
- raise ValueError('Missing Password')
-
- #create a tempdir so that we can clean up with easily
- tempdir = tempfile.mkdtemp('', 'ipa-', workdir)
- gpgdir = tempdir+"/.gnupg"
-
- try:
- try:
- #give gpg a fake dir so that we can leater remove all
- #the cruft when we clean up the tempdir
- os.mkdir(gpgdir)
- args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-c', source]
- run(args, password)
- except:
- raise
- finally:
- #job done, clean up
- shutil.rmtree(tempdir, ignore_errors=True)
-
-
-def decrypt_file(source, dest, password, workdir = None):
- if type(source) is not StringType or not len(source):
- raise ValueError('Missing Source File')
- #stat it so that we get back an exception if it does no t exist
- os.stat(source)
-
- if type(dest) is not StringType or not len(dest):
- raise ValueError('Missing Destination File')
-
- if type(password) is not StringType or not len(password):
- raise ValueError('Missing Password')
-
- #create a tempdir so that we can clean up with easily
- tempdir = tempfile.mkdtemp('', 'ipa-', workdir)
- gpgdir = tempdir+"/.gnupg"
-
- try:
- try:
- #give gpg a fake dir so that we can leater remove all
- #the cruft when we clean up the tempdir
- os.mkdir(gpgdir)
- args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-d', source]
- run(args, password)
- except:
- raise
- finally:
- #job done, clean up
- shutil.rmtree(tempdir, ignore_errors=True)
-
-
-class CIDict(dict):
- """
- Case-insensitive but case-respecting dictionary.
-
- This code is derived from python-ldap's cidict.py module,
- written by stroeder: http://python-ldap.sourceforge.net/
-
- This version extends 'dict' so it works properly with TurboGears.
- If you extend UserDict, isinstance(foo, dict) returns false.
- """
-
- def __init__(self,default=None):
- super(CIDict, self).__init__()
- self._keys = {}
- self.update(default or {})
-
- def __getitem__(self,key):
- return super(CIDict,self).__getitem__(string.lower(key))
-
- def __setitem__(self,key,value):
- lower_key = string.lower(key)
- self._keys[lower_key] = key
- return super(CIDict,self).__setitem__(string.lower(key),value)
-
- def __delitem__(self,key):
- lower_key = string.lower(key)
- del self._keys[lower_key]
- return super(CIDict,self).__delitem__(string.lower(key))
-
- def update(self,dict):
- for key in dict.keys():
- self[key] = dict[key]
-
- def has_key(self,key):
- return super(CIDict, self).has_key(string.lower(key))
-
- def get(self,key,failobj=None):
- try:
- return self[key]
- except KeyError:
- return failobj
-
- def keys(self):
- return self._keys.values()
-
- def items(self):
- result = []
- for k in self._keys.values():
- result.append((k,self[k]))
- return result
-
- def copy(self):
- copy = {}
- for k in self._keys.values():
- copy[k] = self[k]
- return copy
-
- def iteritems(self):
- return self.copy().iteritems()
-
- def iterkeys(self):
- return self.copy().iterkeys()
-
- def setdefault(self,key,value=None):
- try:
- return self[key]
- except KeyError:
- self[key] = value
- return value
-
- def pop(self, key, *args):
- try:
- value = self[key]
- del self[key]
- return value
- except KeyError:
- if len(args) == 1:
- return args[0]
- raise
-
- def popitem(self):
- (lower_key,value) = super(CIDict,self).popitem()
- key = self._keys[lower_key]
- del self._keys[lower_key]
-
- return (key,value)
-
-
-#
-# The safe_string_re regexp and needs_base64 function are extracted from the
-# python-ldap ldif module, which was
-# written by Michael Stroeder <michael@stroeder.com>
-# http://python-ldap.sourceforge.net
-#
-# It was extracted because ipaldap.py is naughtily reaching into the ldif
-# module and squashing this regexp.
-#
-SAFE_STRING_PATTERN = '(^(\000|\n|\r| |:|<)|[\000\n\r\200-\377]+|[ ]+$)'
-safe_string_re = re.compile(SAFE_STRING_PATTERN)
-
-def needs_base64(s):
- """
- returns 1 if s has to be base-64 encoded because of special chars
- """
- return not safe_string_re.search(s) is None
-
-
-def wrap_binary_data(data):
- """Converts all binary data strings into Binary objects for transport
- back over xmlrpc."""
- if isinstance(data, str):
- if needs_base64(data):
- return xmlrpclib.Binary(data)
- else:
- return data
- elif isinstance(data, list) or isinstance(data,tuple):
- retval = []
- for value in data:
- retval.append(wrap_binary_data(value))
- return retval
- elif isinstance(data, dict):
- retval = {}
- for (k,v) in data.iteritems():
- retval[k] = wrap_binary_data(v)
- return retval
- else:
- return data
-
-
-def unwrap_binary_data(data):
- """Converts all Binary objects back into strings."""
- if isinstance(data, xmlrpclib.Binary):
- # The data is decoded by the xmlproxy, but is stored
- # in a binary object for us.
- return str(data)
- elif isinstance(data, str):
- return data
- elif isinstance(data, list) or isinstance(data,tuple):
- retval = []
- for value in data:
- retval.append(unwrap_binary_data(value))
- return retval
- elif isinstance(data, dict):
- retval = {}
- for (k,v) in data.iteritems():
- retval[k] = unwrap_binary_data(v)
- return retval
- else:
- return data
-
-class GeneralizedTimeZone(datetime.tzinfo):
- """This class is a basic timezone wrapper for the offset specified
- in a Generalized Time. It is dst-ignorant."""
- def __init__(self,offsetstr="Z"):
- super(GeneralizedTimeZone, self).__init__()
-
- self.name = offsetstr
- self.houroffset = 0
- self.minoffset = 0
-
- if offsetstr == "Z":
- self.houroffset = 0
- self.minoffset = 0
- else:
- if (len(offsetstr) >= 3) and re.match(r'[-+]\d\d', offsetstr):
- self.houroffset = int(offsetstr[0:3])
- offsetstr = offsetstr[3:]
- if (len(offsetstr) >= 2) and re.match(r'\d\d', offsetstr):
- self.minoffset = int(offsetstr[0:2])
- offsetstr = offsetstr[2:]
- if len(offsetstr) > 0:
- raise ValueError()
- if self.houroffset < 0:
- self.minoffset *= -1
-
- def utcoffset(self, dt):
- return datetime.timedelta(hours=self.houroffset, minutes=self.minoffset)
-
- def dst(self, dt):
- return datetime.timedelta(0)
-
- def tzname(self, dt):
- return self.name
-
-
-def parse_generalized_time(timestr):
- """Parses are Generalized Time string (as specified in X.680),
- returning a datetime object. Generalized Times are stored inside
- the krbPasswordExpiration attribute in LDAP.
-
- This method doesn't attempt to be perfect wrt timezones. If python
- can't be bothered to implement them, how can we..."""
-
- if len(timestr) < 8:
- return None
- try:
- date = timestr[:8]
- time = timestr[8:]
-
- year = int(date[:4])
- month = int(date[4:6])
- day = int(date[6:8])
-
- hour = min = sec = msec = 0
- tzone = None
-
- if (len(time) >= 2) and re.match(r'\d', time[0]):
- hour = int(time[:2])
- time = time[2:]
- if len(time) >= 2 and (time[0] == "," or time[0] == "."):
- hour_fraction = "."
- time = time[1:]
- while (len(time) > 0) and re.match(r'\d', time[0]):
- hour_fraction += time[0]
- time = time[1:]
- total_secs = int(float(hour_fraction) * 3600)
- min, sec = divmod(total_secs, 60)
-
- if (len(time) >= 2) and re.match(r'\d', time[0]):
- min = int(time[:2])
- time = time[2:]
- if len(time) >= 2 and (time[0] == "," or time[0] == "."):
- min_fraction = "."
- time = time[1:]
- while (len(time) > 0) and re.match(r'\d', time[0]):
- min_fraction += time[0]
- time = time[1:]
- sec = int(float(min_fraction) * 60)
-
- if (len(time) >= 2) and re.match(r'\d', time[0]):
- sec = int(time[:2])
- time = time[2:]
- if len(time) >= 2 and (time[0] == "," or time[0] == "."):
- sec_fraction = "."
- time = time[1:]
- while (len(time) > 0) and re.match(r'\d', time[0]):
- sec_fraction += time[0]
- time = time[1:]
- msec = int(float(sec_fraction) * 1000000)
-
- if (len(time) > 0):
- tzone = GeneralizedTimeZone(time)
-
- return datetime.datetime(year, month, day, hour, min, sec, msec, tzone)
-
- except ValueError:
- return None
-
-def ipa_generate_password():
- rndpwd = ''
- r = random.SystemRandom()
- for x in range(12):
- rndpwd += chr(r.randint(32,126))
- return rndpwd
-
-
-def format_list(items, quote=None, page_width=80):
- '''Format a list of items formatting them so they wrap to fit the
- available width. The items will be sorted.
-
- The items may optionally be quoted. The quote parameter may either be
- a string, in which case it is added before and after the item. Or the
- quote parameter may be a pair (either a tuple or list). In this case
- quote[0] is left hand quote and quote[1] is the right hand quote.
- '''
- left_quote = right_quote = ''
- num_items = len(items)
- if not num_items: return ""
-
- if quote is not None:
- if type(quote) in StringTypes:
- left_quote = right_quote = quote
- elif type(quote) is TupleType or type(quote) is ListType:
- left_quote = quote[0]
- right_quote = quote[1]
-
- max_len = max(map(len, items))
- max_len += len(left_quote) + len(right_quote)
- num_columns = (page_width + max_len) / (max_len+1)
- num_rows = (num_items + num_columns - 1) / num_columns
- items.sort()
-
- rows = [''] * num_rows
- i = row = col = 0
-
- while i < num_items:
- row = 0
- if col == 0:
- separator = ''
- else:
- separator = ' '
-
- while i < num_items and row < num_rows:
- rows[row] += "%s%*s" % (separator, -max_len, "%s%s%s" % (left_quote, items[i], right_quote))
- i += 1
- row += 1
- col += 1
- return '\n'.join(rows)
-
-key_value_re = re.compile("(\w+)\s*=\s*(([^\s'\\\"]+)|(?P<quote>['\\\"])((?P=quote)|(.*?[^\\\])(?P=quote)))")
-def parse_key_value_pairs(input):
- ''' Given a string composed of key=value pairs parse it and return
- a dict of the key/value pairs. Keys must be a word, a key must be followed
- by an equal sign (=) and a value. The value may be a single word or may be
- quoted. Quotes may be either single or double quotes, but must be balanced.
- Inside the quoted text the same quote used to start the quoted value may be
- used if it is escaped by preceding it with a backslash (\).
- White space between the key, the equal sign, and the value is ignored.
- Values are always strings. Empty values must be specified with an empty
- quoted string, it's value after parsing will be an empty string.
-
- Example: The string
-
- arg0 = '' arg1 = 1 arg2='two' arg3 = "three's a crowd" arg4 = "this is a \" quote"
-
- will produce
-
- arg0= arg1=1
- arg2=two
- arg3=three's a crowd
- arg4=this is a " quote
- '''
-
- kv_dict = {}
- for match in key_value_re.finditer(input):
- key = match.group(1)
- quote = match.group('quote')
- if match.group(5):
- value = match.group(6)
- if value is None: value = ''
- value = re.sub('\\\%s' % quote, quote, value)
- else:
- value = match.group(2)
- kv_dict[key] = value
- return kv_dict
-
-def parse_items(text):
- '''Given text with items separated by whitespace or comma, return a list of those items'''
- split_re = re.compile('[ ,\t\n]+')
- items = split_re.split(text)
- for item in items[:]:
- if not item: items.remove(item)
- return items
-
-def read_pairs_file(filename):
- comment_re = re.compile('#.*$', re.MULTILINE)
- if filename == '-':
- fd = sys.stdin
- else:
- fd = open(filename)
- text = fd.read()
- text = comment_re.sub('', text) # kill comments
- pairs = parse_key_value_pairs(text)
- if fd != sys.stdin: fd.close()
- return pairs
-
-def read_items_file(filename):
- comment_re = re.compile('#.*$', re.MULTILINE)
- if filename == '-':
- fd = sys.stdin
- else:
- fd = open(filename)
- text = fd.read()
- text = comment_re.sub('', text) # kill comments
- items = parse_items(text)
- if fd != sys.stdin: fd.close()
- return items
-
-def user_input(prompt, default = None, allow_empty = True):
- if default == None:
- while True:
- ret = raw_input("%s: " % prompt)
- if allow_empty or ret.strip():
- return ret
-
- if isinstance(default, basestring):
- while True:
- ret = raw_input("%s [%s]: " % (prompt, default))
- if not ret and (allow_empty or default):
- return default
- elif ret.strip():
- return ret
- if isinstance(default, bool):
- if default:
- choice = "yes"
- else:
- choice = "no"
- while True:
- ret = raw_input("%s [%s]: " % (prompt, choice))
- if not ret:
- return default
- elif ret.lower()[0] == "y":
- return True
- elif ret.lower()[0] == "n":
- return False
- if isinstance(default, int):
- while True:
- try:
- ret = raw_input("%s [%s]: " % (prompt, default))
- if not ret:
- return default
- ret = int(ret)
- except ValueError:
- pass
- else:
- return ret
-
-def user_input_plain(prompt, default = None, allow_empty = True, allow_spaces = True):
- while True:
- ret = user_input(prompt, default, allow_empty)
- if ipavalidate.Plain(ret, not allow_empty, allow_spaces):
- return ret
-
-def user_input_path(prompt, default = None, allow_empty = True):
- if default != None and allow_empty:
- prompt += " (enter \"none\" for empty)"
- while True:
- ret = user_input(prompt, default, allow_empty)
- if allow_empty and ret.lower() == "none":
- return ""
- if ipavalidate.Path(ret, not allow_empty):
- return ret
-
-class AttributeValueCompleter:
- '''
- Gets input from the user in the form "lhs operator rhs"
- TAB completes partial input.
- lhs completes to a name in @lhs_names
- The lhs is fully parsed if a lhs_delim delimiter is seen, then TAB will
- complete to the operator and a default value.
- Default values for a lhs value can specified as:
- - a string, all lhs values will use this default
- - a dict, the lhs value is looked up in the dict to return the default or None
- - a function with a single arg, the lhs value, it returns the default or None
-
- After creating the completer you must open it to set the terminal
- up, Then get a line of input from the user by calling read_input()
- which returns two values, the lhs and rhs, which might be None if
- lhs or rhs was not parsed. After you are done getting input you
- should close the completer to restore the terminal.
-
- Example: (note this is essentially what the convenience function get_pairs() does)
-
- This will allow the user to autocomplete foo & foobar, both have
- defaults defined in a dict. In addition the foobar attribute must
- be specified before the prompting loop will exit. Also, this
- example show how to require that each attrbute entered by the user
- is valid.
-
- attrs = ['foo', 'foobar']
- defaults = {'foo' : 'foo_default', 'foobar' : 'foobar_default'}
- mandatory_attrs = ['foobar']
-
- c = AttributeValueCompleter(attrs, defaults)
- c.open()
- mandatory_attrs_remaining = mandatory_attrs[:]
-
- while True:
- if mandatory_attrs_remaining:
- attribute, value = c.read_input("Enter: ", mandatory_attrs_remaining[0])
- try:
- mandatory_attrs_remaining.remove(attribute)
- except ValueError:
- pass
- else:
- attribute, value = c.read_input("Enter: ")
- if attribute is None:
- # Are we done?
- if mandatory_attrs_remaining:
- print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
- continue
- else:
- break
- if attribute not in attrs:
- print "ERROR: %s is not a valid attribute" % (attribute)
- else:
- print "got '%s' = '%s'" % (attribute, value)
-
- c.close()
- print "exiting..."
- '''
-
- def __init__(self, lhs_names, default_value=None, lhs_regexp=r'^\s*(?P<lhs>[^ =]+)', lhs_delims=' =',
- operator='=', strip_rhs=True):
- self.lhs_names = lhs_names
- self.default_value = default_value
- # lhs_regexp must have named group 'lhs' which returns the contents of the lhs
- self.lhs_regexp = lhs_regexp
- self.lhs_re = re.compile(self.lhs_regexp)
- self.lhs_delims = lhs_delims
- self.operator = operator
- self.strip_rhs = strip_rhs
- self.pairs = None
- self._reset()
-
- def _reset(self):
- self.lhs = None
- self.lhs_complete = False
- self.operator_complete = False
- self.rhs = None
-
- def open(self):
- # Save state
- self.prev_completer = readline.get_completer()
- self.prev_completer_delims = readline.get_completer_delims()
-
- # Set up for ourself
- readline.parse_and_bind("tab: complete")
- readline.set_completer(self.complete)
- readline.set_completer_delims(self.lhs_delims)
-
- def close(self):
- # Restore previous state
- readline.set_completer_delims(self.prev_completer_delims)
- readline.set_completer(self.prev_completer)
-
- def parse_input(self):
- '''We are looking for 3 tokens: <lhs,op,rhs>
- Extract as much of each token as possible.
- Set flags indicating if token is fully parsed.
- '''
- try:
- self._reset()
- buf_len = len(self.line_buffer)
- pos = 0
- lhs_match = self.lhs_re.search(self.line_buffer, pos)
- if not lhs_match: return # no lhs content
- self.lhs = lhs_match.group('lhs') # get lhs contents
- pos = lhs_match.end('lhs') # new scanning position
- if pos == buf_len: return # nothing after lhs, lhs incomplete
- self.lhs_complete = True # something trails the lhs, lhs is complete
- operator_beg = self.line_buffer.find(self.operator, pos) # locate operator
- if operator_beg == -1: return # did not find the operator
- self.operator_complete = True # operator fully parsed
- operator_end = operator_beg + len(self.operator)
- pos = operator_end # step over the operator
- self.rhs = self.line_buffer[pos:]
- except Exception, e:
- traceback.print_exc()
- print "Exception in %s.parse_input(): %s" % (self.__class__.__name__, e)
-
- def get_default_value(self):
- '''default_value can be a string, a dict, or a function.
- If it's a string it's a global default for all attributes.
- If it's a dict the default is looked up in the dict index by attribute.
- If it's a function, the function is called with 1 parameter, the attribute
- and it should return the default value for the attriubte or None'''
-
- if not self.lhs_complete: raise ValueError("attribute not parsed")
-
- # If the user previously provided a value let that override the supplied default
- if self.pairs is not None:
- prev_value = self.pairs.get(self.lhs)
- if prev_value is not None: return prev_value
-
- # No previous user provided value, query for a default
- default_value_type = type(self.default_value)
- if default_value_type is DictType:
- return self.default_value.get(self.lhs, None)
- elif default_value_type is FunctionType:
- return self.default_value(self.lhs)
- elif default_value_type is StringsType:
- return self.default_value
- else:
- return None
-
- def get_lhs_completions(self, text):
- if text:
- self.completions = [lhs for lhs in self.lhs_names if lhs.startswith(text)]
- else:
- self.completions = self.lhs_names
-
- def complete(self, text, state):
- self.line_buffer= readline.get_line_buffer()
- self.parse_input()
- if not self.lhs_complete:
- # lhs is not complete, set up to complete the lhs
- if state == 0:
- beg = readline.get_begidx()
- end = readline.get_endidx()
- self.get_lhs_completions(self.line_buffer[beg:end])
- if state >= len(self.completions): return None
- return self.completions[state]
-
-
- elif not self.operator_complete:
- # lhs is complete, but the operator is not so we complete
- # by inserting the operator manually.
- # Also try to complete the default value at this time.
- readline.insert_text('%s ' % self.operator)
- default_value = self.get_default_value()
- if default_value is not None:
- readline.insert_text(default_value)
- readline.redisplay()
- return None
- else:
- # lhs and operator are complete, if the the rhs is blank
- # (either empty or only only whitespace) then attempt
- # to complete by inserting the default value, otherwise
- # there is nothing we can complete to so we're done.
- if self.rhs.strip():
- return None
- default_value = self.get_default_value()
- if default_value is not None:
- readline.insert_text(default_value)
- readline.redisplay()
- return None
-
- def pre_input_hook(self):
- readline.insert_text('%s %s ' % (self.initial_lhs, self.operator))
- readline.redisplay()
-
- def read_input(self, prompt, initial_lhs=None):
- self.initial_lhs = initial_lhs
- try:
- self._reset()
- if initial_lhs is None:
- readline.set_pre_input_hook(None)
- else:
- readline.set_pre_input_hook(self.pre_input_hook)
- self.line_buffer = raw_input(prompt).strip()
- self.parse_input()
- if self.strip_rhs and self.rhs is not None:
- return self.lhs, self.rhs.strip()
- else:
- return self.lhs, self.rhs
- except EOFError:
- return None, None
-
- def get_pairs(self, prompt, mandatory_attrs=None, validate_callback=None, must_match=True, value_required=True):
- self.pairs = {}
- if mandatory_attrs:
- mandatory_attrs_remaining = mandatory_attrs[:]
- else:
- mandatory_attrs_remaining = []
-
- print "Enter name = value"
- print "Press <ENTER> to accept, a blank line terminates input"
- print "Pressing <TAB> will auto completes name, assignment, and value"
- print
- while True:
- if mandatory_attrs_remaining:
- attribute, value = self.read_input(prompt, mandatory_attrs_remaining[0])
- else:
- attribute, value = self.read_input(prompt)
- if attribute is None:
- # Are we done?
- if mandatory_attrs_remaining:
- print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
- continue
- else:
- break
- if value is None:
- if value_required:
- print "ERROR: you must specify a value for %s" % attribute
- continue
- else:
- if must_match and attribute not in self.lhs_names:
- print "ERROR: %s is not a valid name" % (attribute)
- continue
- if validate_callback is not None:
- if not validate_callback(attribute, value):
- print "ERROR: %s is not valid for %s" % (value, attribute)
- continue
- try:
- mandatory_attrs_remaining.remove(attribute)
- except ValueError:
- pass
-
- self.pairs[attribute] = value
- return self.pairs
-
-class ItemCompleter:
- '''
- Prompts the user for items in a list of items with auto completion.
- TAB completes partial input.
- More than one item can be specifed during input, whitespace and/or comma's seperate.
- Example:
-
- possible_items = ['foo', 'bar']
- c = ItemCompleter(possible_items)
- c.open()
- # Use read_input() to limit input to a single carriage return (e.g. <ENTER>)
- #items = c.read_input("Enter: ")
- # Use get_items to iterate until a blank line is entered.
- items = c.get_items("Enter: ")
- c.close()
- print "items=%s" % (items)
-
- '''
-
- def __init__(self, items):
- self.items = items
- self.initial_input = None
- self.item_delims = ' \t,'
- self.split_re = re.compile('[%s]+' % self.item_delims)
-
- def open(self):
- # Save state
- self.prev_completer = readline.get_completer()
- self.prev_completer_delims = readline.get_completer_delims()
-
- # Set up for ourself
- readline.parse_and_bind("tab: complete")
- readline.set_completer(self.complete)
- readline.set_completer_delims(self.item_delims)
-
- def close(self):
- # Restore previous state
- readline.set_completer_delims(self.prev_completer_delims)
- readline.set_completer(self.prev_completer)
-
- def get_item_completions(self, text):
- if text:
- self.completions = [lhs for lhs in self.items if lhs.startswith(text)]
- else:
- self.completions = self.items
-
- def complete(self, text, state):
- self.line_buffer= readline.get_line_buffer()
- if state == 0:
- beg = readline.get_begidx()
- end = readline.get_endidx()
- self.get_item_completions(self.line_buffer[beg:end])
- if state >= len(self.completions): return None
- return self.completions[state]
-
- def pre_input_hook(self):
- readline.insert_text('%s %s ' % (self.initial_input, self.operator))
- readline.redisplay()
-
- def read_input(self, prompt, initial_input=None):
- items = []
-
- self.initial_input = initial_input
- try:
- if initial_input is None:
- readline.set_pre_input_hook(None)
- else:
- readline.set_pre_input_hook(self.pre_input_hook)
- self.line_buffer = raw_input(prompt).strip()
- items = self.split_re.split(self.line_buffer)
- for item in items[:]:
- if not item: items.remove(item)
- return items
- except EOFError:
- return items
-
- def get_items(self, prompt, must_match=True):
- items = []
-
- print "Enter name [name ...]"
- print "Press <ENTER> to accept, blank line or control-D terminates input"
- print "Pressing <TAB> auto completes name"
- print
- while True:
- new_items = self.read_input(prompt)
- if not new_items: break
- for item in new_items:
- if must_match:
- if item not in self.items:
- print "ERROR: %s is not valid" % (item)
- continue
- if item in items: continue
- items.append(item)
-
- return items
-
-def get_gsserror(e):
- """A GSSError exception looks differently in python 2.4 than it does
- in python 2.5, deal with it."""
-
- try:
- primary = e[0]
- secondary = e[1]
- except:
- primary = e[0][0]
- secondary = e[0][1]
-
- return (primary[0], secondary[0])
diff --git a/ipa-python/ipavalidate.py b/ipa-python/ipavalidate.py
deleted file mode 100644
index 63e0a7614..000000000
--- a/ipa-python/ipavalidate.py
+++ /dev/null
@@ -1,137 +0,0 @@
-# Authors: Rob Crittenden <rcritten@redhat.com>
-#
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-import re
-
-def Email(mail, notEmpty=True):
- """Do some basic validation of an e-mail address.
- Return True if ok
- Return False if not
-
- If notEmpty is True the this will return an error if the field
- is "" or None.
- """
- usernameRE = re.compile(r"^[^ \t\n\r@<>()]+$", re.I)
- domainRE = re.compile(r"^[a-z0-9][a-z0-9\.\-_]*\.[a-z]+$", re.I)
-
- if not mail or mail is None:
- if notEmpty is True:
- return False
- else:
- return True
-
- mail = mail.strip()
- s = mail.split('@', 1)
- try:
- username, domain=s
- except ValueError:
- return False
- if not usernameRE.search(username):
- return False
- if not domainRE.search(domain):
- return False
-
- return True
-
-def Plain(text, notEmpty=False, allowSpaces=True):
- """Do some basic validation of a plain text field
- Return True if ok
- Return False if not
-
- If notEmpty is True the this will return an error if the field
- is "" or None.
- """
- if (text is None) or (not text.strip()):
- if notEmpty is True:
- return False
- else:
- return True
-
- if allowSpaces:
- textRE = re.compile(r"^[a-zA-Z_\-0-9\'\ ]*$")
- else:
- textRE = re.compile(r"^[a-zA-Z_\-0-9\']*$")
- if not textRE.search(text):
- return False
-
- return True
-
-def String(text, notEmpty=False):
- """A string type. This is much looser in what it allows than plain"""
-
- if text is None or not text.strip():
- if notEmpty is True:
- return False
- else:
- return True
-
- return True
-
-def Path(text, notEmpty=False):
- """Do some basic validation of a path
- Return True if ok
- Return False if not
-
- If notEmpty is True the this will return an error if the field
- is "" or None.
- """
- textRE = re.compile(r"^[a-zA-Z_\-0-9\\ \.\/\\:]*$")
-
- if not text and notEmpty is True:
- return False
-
- if text is None:
- if notEmpty is True:
- return False
- else:
- return True
-
- if not textRE.search(text):
- return False
-
- return True
-
-def GoodName(text, notEmpty=False):
- """From shadow-utils:
-
- User/group names must match gnu e-regex:
- [a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,30}[a-zA-Z0-9_.$-]?
-
- as a non-POSIX, extension, allow "$" as the last char for
- sake of Samba 3.x "add machine script"
-
- Return True if ok
- Return False if not
- """
- textRE = re.compile(r"^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,30}[a-zA-Z0-9_.$-]?$")
-
- if not text and notEmpty is True:
- return False
-
- if text is None:
- if notEmpty is True:
- return False
- else:
- return True
-
- m = textRE.match(text)
- if not m or text != m.group(0):
- return False
-
- return True
diff --git a/ipa-python/radius_util.py b/ipa-python/radius_util.py
deleted file mode 100644
index 3d2e83e18..000000000
--- a/ipa-python/radius_util.py
+++ /dev/null
@@ -1,366 +0,0 @@
-# Authors: John Dennis <jdennis@redhat.com>
-#
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-import sys
-import os
-import re
-import ldap
-import getpass
-import ldap.filter
-
-from ipa import ipautil
-from ipa.entity import Entity
-import ipa.ipavalidate as ipavalidate
-
-
-__all__ = [
- 'RADIUS_PKG_NAME',
- 'RADIUS_PKG_CONFIG_DIR',
- 'RADIUS_SERVICE_NAME',
- 'RADIUS_USER',
- 'RADIUS_IPA_KEYTAB_FILEPATH',
- 'RADIUS_LDAP_ATTR_MAP_FILEPATH',
- 'RADIUSD_CONF_FILEPATH',
- 'RADIUSD_CONF_TEMPLATE_FILEPATH',
- 'RADIUSD',
-
- 'RadiusClient',
- 'RadiusProfile',
-
- 'clients_container',
- 'radius_clients_basedn',
- 'radius_client_filter',
- 'radius_client_dn',
-
- 'profiles_container',
- 'radius_profiles_basedn',
- 'radius_profile_filter',
- 'radius_profile_dn',
-
- 'radius_client_ldap_attr_to_radius_attr',
- 'radius_client_attr_to_ldap_attr',
-
- 'radius_profile_ldap_attr_to_radius_attr',
- 'radius_profile_attr_to_ldap_attr',
-
- 'get_secret',
- 'validate_ip_addr',
- 'validate_secret',
- 'validate_name',
- 'validate_nastype',
- 'validate_desc',
- 'validate',
- ]
-
-#------------------------------------------------------------------------------
-
-RADIUS_PKG_NAME = 'freeradius'
-RADIUS_PKG_CONFIG_DIR = '/etc/raddb'
-
-RADIUS_SERVICE_NAME = 'radius'
-RADIUS_USER = 'radiusd'
-
-RADIUS_IPA_KEYTAB_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'ipa.keytab')
-RADIUS_LDAP_ATTR_MAP_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'ldap.attrmap')
-RADIUSD_CONF_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'radiusd.conf')
-RADIUSD_CONF_TEMPLATE_FILEPATH = os.path.join(ipautil.PLUGINS_SHARE_DIR, 'radius.radiusd.conf.template')
-
-RADIUSD = '/usr/sbin/radiusd'
-
-#------------------------------------------------------------------------------
-
-dotted_octet_re = re.compile(r"^(\d+)\.(\d+)\.(\d+)\.(\d+)(/(\d+))?$")
-dns_re = re.compile(r"^[a-zA-Z][a-zA-Z0-9.-]+$")
-# secret, name, nastype all have 31 char max in freeRADIUS, max ip address len is 255
-valid_secret_len = (1,31)
-valid_name_len = (1,31)
-valid_nastype_len = (1,31)
-valid_ip_addr_len = (1,255)
-
-valid_ip_addr_msg = '''\
-IP address must be either a DNS name (letters,digits,dot,hyphen, beginning with
-a letter),or a dotted octet followed by an optional mask (e.g 192.168.1.0/24)'''
-
-valid_desc_msg = "Description must text string"
-
-#------------------------------------------------------------------------------
-
-class RadiusClient(Entity):
-
- def __init2__(self):
- pass
-
-class RadiusProfile(Entity):
-
- def __init2__(self):
- pass
-
-
-#------------------------------------------------------------------------------
-
-def reverse_map_dict(src_dict):
- reverse_dict = {}
-
- for k,v in src_dict.items():
- if reverse_dict.has_key(v):
- raise ValueError("reverse_map_dict: collision on (%s) with values (%s),(%s)" % \
- v, reverse_dict[v], src_dict[k])
- reverse_dict[v] = k
- return reverse_dict
-
-#------------------------------------------------------------------------------
-
-radius_client_ldap_attr_to_radius_attr = ipautil.CIDict({
- 'radiusClientIPAddress' : 'Client-IP-Address',
- 'radiusClientSecret' : 'Secret',
- 'radiusClientNASType' : 'NAS-Type',
- 'radiusClientShortName' : 'Name',
- 'description' : 'Description',
- })
-
-radius_client_attr_to_ldap_attr = reverse_map_dict(radius_client_ldap_attr_to_radius_attr)
-
-#------------------------------------------------------------------------------
-
-radius_profile_ldap_attr_to_radius_attr = ipautil.CIDict({
- 'uid' : 'UID',
- 'radiusArapFeatures' : 'Arap-Features',
- 'radiusArapSecurity' : 'Arap-Security',
- 'radiusArapZoneAccess' : 'Arap-Zone-Access',
- 'radiusAuthType' : 'Auth-Type',
- 'radiusCallbackId' : 'Callback-Id',
- 'radiusCallbackNumber' : 'Callback-Number',
- 'radiusCalledStationId' : 'Called-Station-Id',
- 'radiusCallingStationId' : 'Calling-Station-Id',
- 'radiusClass' : 'Class',
- 'radiusClientIPAddress' : 'Client-IP-Address',
- 'radiusExpiration' : 'Expiration',
- 'radiusFilterId' : 'Filter-Id',
- 'radiusFramedAppleTalkLink' : 'Framed-AppleTalk-Link',
- 'radiusFramedAppleTalkNetwork' : 'Framed-AppleTalk-Network',
- 'radiusFramedAppleTalkZone' : 'Framed-AppleTalk-Zone',
- 'radiusFramedCompression' : 'Framed-Compression',
- 'radiusFramedIPAddress' : 'Framed-IP-Address',
- 'radiusFramedIPNetmask' : 'Framed-IP-Netmask',
- 'radiusFramedIPXNetwork' : 'Framed-IPX-Network',
- 'radiusFramedMTU' : 'Framed-MTU',
- 'radiusFramedProtocol' : 'Framed-Protocol',
- 'radiusFramedRoute' : 'Framed-Route',
- 'radiusFramedRouting' : 'Framed-Routing',
- 'radiusGroupName' : 'Group-Name',
- 'radiusHint' : 'Hint',
- 'radiusHuntgroupName' : 'Huntgroup-Name',
- 'radiusIdleTimeout' : 'Idle-Timeout',
- 'radiusLoginIPHost' : 'Login-IP-Host',
- 'radiusLoginLATGroup' : 'Login-LAT-Group',
- 'radiusLoginLATNode' : 'Login-LAT-Node',
- 'radiusLoginLATPort' : 'Login-LAT-Port',
- 'radiusLoginLATService' : 'Login-LAT-Service',
- 'radiusLoginService' : 'Login-Service',
- 'radiusLoginTCPPort' : 'Login-TCP-Port',
- 'radiusLoginTime' : 'Login-Time',
- 'radiusNASIpAddress' : 'NAS-IP-Address',
- 'radiusPasswordRetry' : 'Password-Retry',
- 'radiusPortLimit' : 'Port-Limit',
- 'radiusProfileDn' : 'Profile-Dn',
- 'radiusPrompt' : 'Prompt',
- 'radiusProxyToRealm' : 'Proxy-To-Realm',
- 'radiusRealm' : 'Realm',
- 'radiusReplicateToRealm' : 'Replicate-To-Realm',
- 'radiusReplyMessage' : 'Reply-Message',
- 'radiusServiceType' : 'Service-Type',
- 'radiusSessionTimeout' : 'Session-Timeout',
- 'radiusSimultaneousUse' : 'Simultaneous-Use',
- 'radiusStripUserName' : 'Strip-User-Name',
- 'radiusTerminationAction' : 'Termination-Action',
- 'radiusTunnelAssignmentId' : 'Tunnel-Assignment-Id',
- 'radiusTunnelClientEndpoint' : 'Tunnel-Client-Endpoint',
- 'radiusTunnelMediumType' : 'Tunnel-Medium-Type',
- 'radiusTunnelPassword' : 'Tunnel-Password',
- 'radiusTunnelPreference' : 'Tunnel-Preference',
- 'radiusTunnelPrivateGroupId' : 'Tunnel-Private-Group-Id',
- 'radiusTunnelServerEndpoint' : 'Tunnel-Server-Endpoint',
- 'radiusTunnelType' : 'Tunnel-Type',
- 'radiusUserCategory' : 'User-Category',
- 'radiusVSA' : 'VSA',
-})
-
-radius_profile_attr_to_ldap_attr = reverse_map_dict(radius_profile_ldap_attr_to_radius_attr)
-
-#------------------------------------------------------------------------------
-
-clients_container = 'cn=clients,cn=radius'
-
-def radius_clients_basedn(container, suffix):
- if container is None: container = clients_container
- return '%s,%s' % (container, suffix)
-
-def radius_client_filter(ip_addr):
- return "(&(radiusClientIPAddress=%s)(objectclass=radiusClientProfile))" % \
- ldap.filter.escape_filter_chars(ip_addr)
-
-def radius_client_dn(client, container, suffix):
- if container is None: container = clients_container
- return 'radiusClientIPAddress=%s,%s,%s' % (ldap.dn.escape_dn_chars(client), container, suffix)
-
-# --
-
-profiles_container = 'cn=profiles,cn=radius'
-
-def radius_profiles_basedn(container, suffix):
- if container is None: container = profiles_container
- return '%s,%s' % (container, suffix)
-
-def radius_profile_filter(uid):
- return "(&(uid=%s)(objectclass=radiusprofile))" % \
- ldap.filter.escape_filter_chars(uid)
-
-def radius_profile_dn(uid, container, suffix):
- if container is None: container = profiles_container
- return 'uid=%s,%s,%s' % (ldap.dn.escape_dn_chars(uid), container, suffix)
-
-
-#------------------------------------------------------------------------------
-
-def get_ldap_attr_translations():
- comment_re = re.compile('#.*$')
- radius_attr_to_ldap_attr = {}
- ldap_attr_to_radius_attr = {}
- try:
- f = open(LDAP_ATTR_MAP_FILEPATH)
- for line in f.readlines():
- line = comment_re.sub('', line).strip()
- if not line: continue
- attr_type, radius_attr, ldap_attr = line.split()
- print 'type="%s" radius="%s" ldap="%s"' % (attr_type, radius_attr, ldap_attr)
- radius_attr_to_ldap_attr[radius_attr] = {'ldap_attr':ldap_attr, 'attr_type':attr_type}
- ldap_attr_to_radius_attr[ldap_attr] = {'radius_attr':radius_attr, 'attr_type':attr_type}
- f.close()
- except Exception, e:
- logging.error('cold not read radius ldap attribute map file (%s): %s', LDAP_ATTR_MAP_FILEPATH, e)
- pass # FIXME
-
- #for k,v in radius_attr_to_ldap_attr.items():
- # print '%s --> %s' % (k,v)
- #for k,v in ldap_attr_to_radius_attr.items():
- # print '%s --> %s' % (k,v)
-
-def get_secret():
- valid = False
- while (not valid):
- secret = getpass.getpass("Enter Secret: ")
- confirm = getpass.getpass("Confirm Secret: ")
- if (secret != confirm):
- print "Secrets do not match"
- continue
- valid = True
- return secret
-
-#------------------------------------------------------------------------------
-
-def valid_ip_addr(text):
-
- # is it a dotted octet? If so there should be 4 integers seperated
- # by a dot and each integer should be between 0 and 255
- # there may be an optional mask preceded by a slash (e.g. 1.2.3.4/24)
- match = dotted_octet_re.search(text)
- if match:
- # dotted octet notation
- i = 1
- while i <= 4:
- octet = int(match.group(i))
- if octet > 255: return False
- i += 1
- if match.group(5):
- mask = int(match.group(6))
- if mask <= 32:
- return True
- else:
- return False
- return True
- else:
- # DNS name, can contain letters, numbers, dot and hypen, must start with a letter
- if dns_re.search(text): return True
- return False
-
-def validate_length(value, limits):
- length = len(value)
- if length < limits[0] or length > limits[1]:
- return False
- return True
-
-def valid_length_msg(name, limits):
- return "%s length must be at least %d and not more than %d" % (name, limits[0], limits[1])
-
-def err_msg(variable, variable_name=None):
- if variable_name is None: variable_name = 'value'
- print "ERROR: %s = %s" % (variable_name, variable)
-
-#------------------------------------------------------------------------------
-
-def validate_ip_addr(ip_addr, variable_name=None):
- if not validate_length(ip_addr, valid_ip_addr_len):
- err_msg(ip_addr, variable_name)
- print valid_length_msg('ip address', valid_ip_addr_len)
- return False
- if not valid_ip_addr(ip_addr):
- err_msg(ip_addr, variable_name)
- print valid_ip_addr_msg
- return False
- return True
-
-def validate_secret(secret, variable_name=None):
- if not validate_length(secret, valid_secret_len):
- err_msg(secret, variable_name)
- print valid_length_msg('secret', valid_secret_len)
- return False
- return True
-
-def validate_name(name, variable_name=None):
- if not validate_length(name, valid_name_len):
- err_msg(name, variable_name)
- print valid_length_msg('name', valid_name_len)
- return False
- return True
-
-def validate_nastype(nastype, variable_name=None):
- if not validate_length(nastype, valid_nastype_len):
- err_msg(nastype, variable_name)
- print valid_length_msg('NAS Type', valid_nastype_len)
- return False
- return True
-
-def validate_desc(desc, variable_name=None):
- if not ipavalidate.Plain(desc):
- print valid_desc_msg
- return False
- return True
-
-def validate(attribute, value):
- if attribute == 'Client-IP-Address':
- return validate_ip_addr(value, attribute)
- if attribute == 'Secret':
- return validate_secret(value, attribute)
- if attribute == 'NAS-Type':
- return validate_nastype(value, attribute)
- if attribute == 'Name':
- return validate_name(value, attribute)
- if attribute == 'Description':
- return validate_desc(value, attribute)
- return True
diff --git a/ipa-python/setup.py.in b/ipa-python/setup.py.in
deleted file mode 100644
index 19940f38f..000000000
--- a/ipa-python/setup.py.in
+++ /dev/null
@@ -1,77 +0,0 @@
-#!/usr/bin/env python
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-"""FreeIPA python support library
-
-FreeIPA is a server for identity, policy, and audit.
-"""
-
-DOCLINES = __doc__.split("\n")
-
-import os
-import sys
-import distutils.sysconfig
-
-CLASSIFIERS = """\
-Development Status :: 4 - Beta
-Intended Audience :: System Environment/Base
-License :: GPL
-Programming Language :: Python
-Operating System :: POSIX
-Operating System :: Unix
-"""
-
-# BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
-# update it when the contents of directories change.
-if os.path.exists('MANIFEST'): os.remove('MANIFEST')
-
-def setup_package():
-
- from distutils.core import setup
-
- old_path = os.getcwd()
- local_path = os.path.dirname(os.path.abspath(sys.argv[0]))
- os.chdir(local_path)
- sys.path.insert(0,local_path)
-
- try:
- setup(
- name = "ipa",
- version = "__VERSION__",
- license = "GPL",
- author = "Karl MacMillan, et.al.",
- author_email = "kmacmill@redhat.com",
- maintainer = "freeIPA Developers",
- maintainer_email = "freeipa-devel@redhat.com",
- url = "http://www.freeipa.org/",
- description = DOCLINES[0],
- long_description = "\n".join(DOCLINES[2:]),
- download_url = "http://www.freeipa.org/page/Downloads",
- classifiers=filter(None, CLASSIFIERS.split('\n')),
- platforms = ["Linux", "Solaris", "Unix"],
- package_dir = {'ipa': ''},
- packages = [ "ipa" ],
- data_files = [('/etc/ipa', ['ipa.conf'])]
- )
- finally:
- del sys.path[0]
- os.chdir(old_path)
- return
-
-if __name__ == '__main__':
- setup_package()
diff --git a/ipa-python/sysrestore.py b/ipa-python/sysrestore.py
deleted file mode 100644
index 5d5692be6..000000000
--- a/ipa-python/sysrestore.py
+++ /dev/null
@@ -1,317 +0,0 @@
-# Authors: Mark McLoughlin <markmc@redhat.com>
-#
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-#
-# This module provides a very simple API which allows
-# ipa-xxx-install --uninstall to restore certain
-# parts of the system configuration to the way it was
-# before ipa-server-install was first run
-
-import os
-import os.path
-import errno
-import shutil
-import logging
-import ConfigParser
-import random
-import string
-
-from ipa import ipautil
-
-SYSRESTORE_PATH = "/tmp"
-SYSRESTORE_INDEXFILE = "sysrestore.index"
-SYSRESTORE_STATEFILE = "sysrestore.state"
-
-class FileStore:
- """Class for handling backup and restore of files"""
-
- def __init__(self, path = SYSRESTORE_PATH):
- """Create a _StoreFiles object, that uses @path as the
- base directory.
-
- The file @path/sysrestore.index is used to store information
- about the original location of the saved files.
- """
- self._path = path
- self._index = self._path + "/" + SYSRESTORE_INDEXFILE
-
- self.random = random.Random()
-
- self.files = {}
- self._load()
-
- def _load(self):
- """Load the file list from the index file. @files will
- be an empty dictionary if the file doesn't exist.
- """
-
- logging.debug("Loading Index file from '%s'", self._index)
-
- self.files = {}
-
- p = ConfigParser.SafeConfigParser()
- p.read(self._index)
-
- for section in p.sections():
- if section == "files":
- for (key, value) in p.items(section):
- self.files[key] = value
-
-
- def save(self):
- """Save the file list to @_index. If @files is an empty
- dict, then @_index should be removed.
- """
- logging.debug("Saving Index File to '%s'", self._index)
-
- if len(self.files) == 0:
- logging.debug(" -> no files, removing file")
- if os.path.exists(self._index):
- os.remove(self._index)
- return
-
- p = ConfigParser.SafeConfigParser()
-
- p.add_section('files')
- for (key, value) in self.files.items():
- p.set('files', key, str(value))
-
- f = file(self._index, "w")
- p.write(f)
- f.close()
-
- def backup_file(self, path):
- """Create a copy of the file at @path - so long as a copy
- does not already exist - which will be restored to its
- original location by restore_files().
- """
- logging.debug("Backing up system configuration file '%s'", path)
-
- if not os.path.isabs(path):
- raise ValueError("Absolute path required")
-
- if not os.path.isfile(path):
- logging.debug(" -> Not backing up - '%s' doesn't exist", path)
- return
-
- (reldir, file) = os.path.split(path)
-
- filename = ""
- for i in range(8):
- h = "%02x" % self.random.randint(0,255)
- filename += h
- filename += "-"+file
-
- backup_path = os.path.join(self._path, filename)
- if os.path.exists(backup_path):
- logging.debug(" -> Not backing up - already have a copy of '%s'", path)
- return
-
- shutil.copy2(path, backup_path)
-
- stat = os.stat(path)
-
- self.files[filename] = string.join([str(stat.st_mode),str(stat.st_uid),str(stat.st_gid),path], ',')
- self.save()
-
- def restore_file(self, path):
- """Restore the copy of a file at @path to its original
- location and delete the copy.
-
- Returns #True if the file was restored, #False if there
- was no backup file to restore
- """
-
- logging.debug("Restoring system configuration file '%s'", path)
-
- if not os.path.isabs(path):
- raise ValueError("Absolute path required")
-
- mode = None
- uid = None
- gid = None
- filename = None
-
- for (key, value) in self.files.items():
- (mode,uid,gid,filepath) = string.split(value, ',', 3)
- if (filepath == path):
- filename = key
- break
-
- if not filename:
- raise ValueError("No such file name in the index")
-
- backup_path = os.path.join(self._path, filename)
- if not os.path.exists(backup_path):
- logging.debug(" -> Not restoring - '%s' doesn't exist", backup_path)
- return False
-
- shutil.move(backup_path, path)
- os.chown(path, int(uid), int(gid))
- os.chmod(path, int(mode))
-
- ipautil.run(["/sbin/restorecon", path])
-
- del self.files[filename]
- self.save()
-
- return True
-
- def restore_all_files(self):
- """Restore the files in the inbdex to their original
- location and delete the copy.
-
- Returns #True if the file was restored, #False if there
- was no backup file to restore
- """
-
- if len(self.files) == 0:
- return False
-
- for (filename, value) in self.files.items():
-
- (mode,uid,gid,path) = string.split(value, ',', 3)
-
- backup_path = os.path.join(self._path, filename)
- if not os.path.exists(backup_path):
- logging.debug(" -> Not restoring - '%s' doesn't exist", backup_path)
-
- shutil.move(backup_path, path)
- os.chown(path, int(uid), int(gid))
- os.chmod(path, int(mode))
-
- ipautil.run(["/sbin/restorecon", path])
-
- #force file to be deleted
- self.files = {}
- self.save()
-
- return True
-
-class StateFile:
- """A metadata file for recording system state which can
- be backed up and later restored. The format is something
- like:
-
- [httpd]
- running=True
- enabled=False
- """
-
- def __init__(self, path = SYSRESTORE_PATH):
- """Create a StateFile object, loading from @path.
-
- The dictionary @modules, a member of the returned object,
- is where the state can be modified. @modules is indexed
- using a module name to return another dictionary containing
- key/value pairs with the saved state of that module.
-
- The keys in these latter dictionaries are arbitrary strings
- and the values may either be strings or booleans.
- """
- self._path = path+"/"+SYSRESTORE_STATEFILE
-
- self.modules = {}
-
- self._load()
-
- def _load(self):
- """Load the modules from the file @_path. @modules will
- be an empty dictionary if the file doesn't exist.
- """
- logging.debug("Loading StateFile from '%s'", self._path)
-
- self.modules = {}
-
- p = ConfigParser.SafeConfigParser()
- p.read(self._path)
-
- for module in p.sections():
- self.modules[module] = {}
- for (key, value) in p.items(module):
- if value == str(True):
- value = True
- elif value == str(False):
- value = False
- self.modules[module][key] = value
-
- def save(self):
- """Save the modules to @_path. If @modules is an empty
- dict, then @_path should be removed.
- """
- logging.debug("Saving StateFile to '%s'", self._path)
-
- for module in self.modules.keys():
- if len(self.modules[module]) == 0:
- del self.modules[module]
-
- if len(self.modules) == 0:
- logging.debug(" -> no modules, removing file")
- if os.path.exists(self._path):
- os.remove(self._path)
- return
-
- p = ConfigParser.SafeConfigParser()
-
- for module in self.modules.keys():
- p.add_section(module)
- for (key, value) in self.modules[module].items():
- p.set(module, key, str(value))
-
- f = file(self._path, "w")
- p.write(f)
- f.close()
-
- def backup_state(self, module, key, value):
- """Backup an item of system state from @module, identified
- by the string @key and with the value @value. @value may be
- a string or boolean.
- """
- if not (isinstance(value, str) or isinstance(value, bool)):
- raise ValueError("Only strings or booleans supported")
-
- if not self.modules.has_key(module):
- self.modules[module] = {}
-
- if not self.modules.has_key(key):
- self.modules[module][key] = value
-
- self.save()
-
- def restore_state(self, module, key):
- """Return the value of an item of system state from @module,
- identified by the string @key, and remove it from the backed
- up system state.
-
- If the item doesn't exist, #None will be returned, otherwise
- the original string or boolean value is returned.
- """
-
- if not self.modules.has_key(module):
- return None
-
- if not self.modules[module].has_key(key):
- return None
-
- value = self.modules[module][key]
- del self.modules[module][key]
-
- self.save()
-
- return value
diff --git a/ipa-python/test/test_aci.py b/ipa-python/test/test_aci.py
deleted file mode 100644
index fb9d84c7f..000000000
--- a/ipa-python/test/test_aci.py
+++ /dev/null
@@ -1,127 +0,0 @@
-#! /usr/bin/python -E
-#
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-import sys
-sys.path.insert(0, ".")
-
-import unittest
-import aci
-import urllib
-
-
-class TestACI(unittest.TestCase):
- acitemplate = ('(targetattr="%s")' +
- '(targetfilter="(memberOf=%s)")' +
- '(version 3.0;' +
- 'acl "%s";' +
- 'allow (write) ' +
- 'groupdn="ldap:///%s";)')
-
- def setUp(self):
- self.aci = aci.ACI()
-
- def tearDown(self):
- pass
-
- def testExport(self):
- self.aci.source_group = 'cn=foo, dc=freeipa, dc=org'
- self.aci.dest_group = 'cn=bar, dc=freeipa, dc=org'
- self.aci.name = 'this is a "name'
- self.aci.attrs = ['field1', 'field2', 'field3']
-
- exportaci = self.aci.export_to_string()
- aci = TestACI.acitemplate % ('field1 || field2 || field3',
- self.aci.dest_group,
- 'this is a "name',
- self.aci.source_group)
-
- self.assertEqual(aci, exportaci)
-
- def testURLEncodedExport(self):
- self.aci.source_group = 'cn=foo " bar, dc=freeipa, dc=org'
- self.aci.dest_group = 'cn=bar, dc=freeipa, dc=org'
- self.aci.name = 'this is a "name'
- self.aci.attrs = ['field1', 'field2', 'field3']
-
- exportaci = self.aci.export_to_string()
- aci = TestACI.acitemplate % ('field1 || field2 || field3',
- self.aci.dest_group,
- 'this is a "name',
- urllib.quote(self.aci.source_group, "/=, "))
-
- self.assertEqual(aci, exportaci)
-
- def testSimpleParse(self):
- attr_str = 'field3 || field4 || field5'
- dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org'
- name = 'my name'
- src_dn = 'cn=srcgroup, dc=freeipa, dc=org'
-
- acistr = TestACI.acitemplate % (attr_str, dest_dn, name, src_dn)
- self.aci.parse_acistr(acistr)
-
- self.assertEqual(['field3', 'field4', 'field5'], self.aci.attrs)
- self.assertEqual(dest_dn, self.aci.dest_group)
- self.assertEqual(name, self.aci.name)
- self.assertEqual(src_dn, self.aci.source_group)
-
- def testUrlEncodedParse(self):
- attr_str = 'field3 || field4 || field5'
- dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org'
- name = 'my name'
- src_dn = 'cn=src " group, dc=freeipa, dc=org'
-
- acistr = TestACI.acitemplate % (attr_str, dest_dn, name,
- urllib.quote(src_dn, "/=, "))
- self.aci.parse_acistr(acistr)
-
- self.assertEqual(['field3', 'field4', 'field5'], self.aci.attrs)
- self.assertEqual(dest_dn, self.aci.dest_group)
- self.assertEqual(name, self.aci.name)
- self.assertEqual(src_dn, self.aci.source_group)
-
- def testInvalidParse(self):
- try:
- self.aci.parse_acistr('foo bar')
- self.fail('Should have failed to parse')
- except SyntaxError:
- pass
-
- try:
- self.aci.parse_acistr('')
- self.fail('Should have failed to parse')
- except SyntaxError:
- pass
-
- attr_str = 'field3 || field4 || field5'
- dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org'
- name = 'my name'
- src_dn = 'cn=srcgroup, dc=freeipa, dc=org'
-
- acistr = TestACI.acitemplate % (attr_str, dest_dn, name, src_dn)
- acistr += 'trailing garbage'
- try:
- self.aci.parse_acistr('')
- self.fail('Should have failed to parse')
- except SyntaxError:
- pass
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/ipa-python/test/test_ipautil.py b/ipa-python/test/test_ipautil.py
deleted file mode 100644
index 60d53a270..000000000
--- a/ipa-python/test/test_ipautil.py
+++ /dev/null
@@ -1,309 +0,0 @@
-#! /usr/bin/python -E
-#
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-import sys
-sys.path.insert(0, ".")
-
-import unittest
-import datetime
-
-import ipautil
-
-
-class TestCIDict(unittest.TestCase):
- def setUp(self):
- self.cidict = ipautil.CIDict()
- self.cidict["Key1"] = "val1"
- self.cidict["key2"] = "val2"
- self.cidict["KEY3"] = "VAL3"
-
- def tearDown(self):
- pass
-
- def testLen(self):
- self.assertEqual(3, len(self.cidict))
-
- def test__GetItem(self):
- self.assertEqual("val1", self.cidict["Key1"])
- self.assertEqual("val1", self.cidict["key1"])
- self.assertEqual("val2", self.cidict["KEY2"])
- self.assertEqual("VAL3", self.cidict["key3"])
- self.assertEqual("VAL3", self.cidict["KEY3"])
- try:
- self.cidict["key4"]
- fail("should have raised KeyError")
- except KeyError:
- pass
-
- def testGet(self):
- self.assertEqual("val1", self.cidict.get("Key1"))
- self.assertEqual("val1", self.cidict.get("key1"))
- self.assertEqual("val2", self.cidict.get("KEY2"))
- self.assertEqual("VAL3", self.cidict.get("key3"))
- self.assertEqual("VAL3", self.cidict.get("KEY3"))
- self.assertEqual("default", self.cidict.get("key4", "default"))
-
- def test__SetItem(self):
- self.cidict["key4"] = "val4"
- self.assertEqual("val4", self.cidict["key4"])
- self.cidict["KEY4"] = "newval4"
- self.assertEqual("newval4", self.cidict["key4"])
-
- def testDel(self):
- self.assert_(self.cidict.has_key("Key1"))
- del(self.cidict["Key1"])
- self.failIf(self.cidict.has_key("Key1"))
-
- self.assert_(self.cidict.has_key("key2"))
- del(self.cidict["KEY2"])
- self.failIf(self.cidict.has_key("key2"))
-
- def testClear(self):
- self.assertEqual(3, len(self.cidict))
- self.cidict.clear()
- self.assertEqual(0, len(self.cidict))
-
- def testCopy(self):
- """A copy is no longer a CIDict, but should preserve the case of
- the keys as they were inserted."""
- copy = self.cidict.copy()
- self.assertEqual(3, len(copy))
- self.assert_(copy.has_key("Key1"))
- self.assertEqual("val1", copy["Key1"])
- self.failIf(copy.has_key("key1"))
-
- def testHasKey(self):
- self.assert_(self.cidict.has_key("KEY1"))
- self.assert_(self.cidict.has_key("key2"))
- self.assert_(self.cidict.has_key("key3"))
-
- def testItems(self):
- items = self.cidict.items()
- self.assertEqual(3, len(items))
- items_set = set(items)
- self.assert_(("Key1", "val1") in items_set)
- self.assert_(("key2", "val2") in items_set)
- self.assert_(("KEY3", "VAL3") in items_set)
-
- def testIterItems(self):
- items = []
- for (k,v) in self.cidict.iteritems():
- items.append((k,v))
- self.assertEqual(3, len(items))
- items_set = set(items)
- self.assert_(("Key1", "val1") in items_set)
- self.assert_(("key2", "val2") in items_set)
- self.assert_(("KEY3", "VAL3") in items_set)
-
- def testIterKeys(self):
- keys = []
- for k in self.cidict.iterkeys():
- keys.append(k)
- self.assertEqual(3, len(keys))
- keys_set = set(keys)
- self.assert_("Key1" in keys_set)
- self.assert_("key2" in keys_set)
- self.assert_("KEY3" in keys_set)
-
- def testIterValues(self):
- values = []
- for k in self.cidict.itervalues():
- values.append(k)
- self.assertEqual(3, len(values))
- values_set = set(values)
- self.assert_("val1" in values_set)
- self.assert_("val2" in values_set)
- self.assert_("VAL3" in values_set)
-
- def testKeys(self):
- keys = self.cidict.keys()
- self.assertEqual(3, len(keys))
- keys_set = set(keys)
- self.assert_("Key1" in keys_set)
- self.assert_("key2" in keys_set)
- self.assert_("KEY3" in keys_set)
-
- def testValues(self):
- values = self.cidict.values()
- self.assertEqual(3, len(values))
- values_set = set(values)
- self.assert_("val1" in values_set)
- self.assert_("val2" in values_set)
- self.assert_("VAL3" in values_set)
-
- def testUpdate(self):
- newdict = { "KEY2": "newval2",
- "key4": "val4" }
- self.cidict.update(newdict)
- self.assertEqual(4, len(self.cidict))
-
- items = self.cidict.items()
- self.assertEqual(4, len(items))
- items_set = set(items)
- self.assert_(("Key1", "val1") in items_set)
- # note the update "overwrites" the case of the key2
- self.assert_(("KEY2", "newval2") in items_set)
- self.assert_(("KEY3", "VAL3") in items_set)
- self.assert_(("key4", "val4") in items_set)
-
- def testSetDefault(self):
- self.assertEqual("val1", self.cidict.setdefault("KEY1", "default"))
-
- self.failIf(self.cidict.has_key("KEY4"))
- self.assertEqual("default", self.cidict.setdefault("KEY4", "default"))
- self.assert_(self.cidict.has_key("KEY4"))
- self.assertEqual("default", self.cidict["key4"])
-
- self.failIf(self.cidict.has_key("KEY5"))
- self.assertEqual(None, self.cidict.setdefault("KEY5"))
- self.assert_(self.cidict.has_key("KEY5"))
- self.assertEqual(None, self.cidict["key5"])
-
- def testPop(self):
- self.assertEqual("val1", self.cidict.pop("KEY1", "default"))
- self.failIf(self.cidict.has_key("key1"))
-
- self.assertEqual("val2", self.cidict.pop("KEY2"))
- self.failIf(self.cidict.has_key("key2"))
-
- self.assertEqual("default", self.cidict.pop("key4", "default"))
- try:
- self.cidict.pop("key4")
- fail("should have raised KeyError")
- except KeyError:
- pass
-
- def testPopItem(self):
- items = set(self.cidict.items())
- self.assertEqual(3, len(self.cidict))
-
- item = self.cidict.popitem()
- self.assertEqual(2, len(self.cidict))
- self.assert_(item in items)
- items.discard(item)
-
- item = self.cidict.popitem()
- self.assertEqual(1, len(self.cidict))
- self.assert_(item in items)
- items.discard(item)
-
- item = self.cidict.popitem()
- self.assertEqual(0, len(self.cidict))
- self.assert_(item in items)
- items.discard(item)
-
-class TestTimeParser(unittest.TestCase):
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- def testSimple(self):
- timestr = "20070803"
-
- time = ipautil.parse_generalized_time(timestr)
- self.assertEqual(2007, time.year)
- self.assertEqual(8, time.month)
- self.assertEqual(3, time.day)
- self.assertEqual(0, time.hour)
- self.assertEqual(0, time.minute)
- self.assertEqual(0, time.second)
-
- def testHourMinSec(self):
- timestr = "20051213141205"
-
- time = ipautil.parse_generalized_time(timestr)
- self.assertEqual(2005, time.year)
- self.assertEqual(12, time.month)
- self.assertEqual(13, time.day)
- self.assertEqual(14, time.hour)
- self.assertEqual(12, time.minute)
- self.assertEqual(5, time.second)
-
- def testFractions(self):
- timestr = "2003092208.5"
-
- time = ipautil.parse_generalized_time(timestr)
- self.assertEqual(2003, time.year)
- self.assertEqual(9, time.month)
- self.assertEqual(22, time.day)
- self.assertEqual(8, time.hour)
- self.assertEqual(30, time.minute)
- self.assertEqual(0, time.second)
-
- timestr = "199203301544,25"
-
- time = ipautil.parse_generalized_time(timestr)
- self.assertEqual(1992, time.year)
- self.assertEqual(3, time.month)
- self.assertEqual(30, time.day)
- self.assertEqual(15, time.hour)
- self.assertEqual(44, time.minute)
- self.assertEqual(15, time.second)
-
- timestr = "20060401185912,8"
-
- time = ipautil.parse_generalized_time(timestr)
- self.assertEqual(2006, time.year)
- self.assertEqual(4, time.month)
- self.assertEqual(1, time.day)
- self.assertEqual(18, time.hour)
- self.assertEqual(59, time.minute)
- self.assertEqual(12, time.second)
- self.assertEqual(800000, time.microsecond)
-
- def testTimeZones(self):
- timestr = "20051213141205Z"
-
- time = ipautil.parse_generalized_time(timestr)
- self.assertEqual(0, time.tzinfo.houroffset)
- self.assertEqual(0, time.tzinfo.minoffset)
- offset = time.tzinfo.utcoffset(None)
- self.assertEqual(0, offset.seconds)
-
- timestr = "20051213141205+0500"
-
- time = ipautil.parse_generalized_time(timestr)
- self.assertEqual(5, time.tzinfo.houroffset)
- self.assertEqual(0, time.tzinfo.minoffset)
- offset = time.tzinfo.utcoffset(None)
- self.assertEqual(5 * 60 * 60, offset.seconds)
-
- timestr = "20051213141205-0500"
-
- time = ipautil.parse_generalized_time(timestr)
- self.assertEqual(-5, time.tzinfo.houroffset)
- self.assertEqual(0, time.tzinfo.minoffset)
- # NOTE - the offset is always positive - it's minutes
- # _east_ of UTC
- offset = time.tzinfo.utcoffset(None)
- self.assertEqual((24 - 5) * 60 * 60, offset.seconds)
-
- timestr = "20051213141205-0930"
-
- time = ipautil.parse_generalized_time(timestr)
- self.assertEqual(-9, time.tzinfo.houroffset)
- self.assertEqual(-30, time.tzinfo.minoffset)
- offset = time.tzinfo.utcoffset(None)
- self.assertEqual(((24 - 9) * 60 * 60) - (30 * 60), offset.seconds)
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/ipa-python/test/test_ipavalidate.py b/ipa-python/test/test_ipavalidate.py
deleted file mode 100644
index 8b79fbf07..000000000
--- a/ipa-python/test/test_ipavalidate.py
+++ /dev/null
@@ -1,97 +0,0 @@
-#! /usr/bin/python -E
-#
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-import sys
-sys.path.insert(0, ".")
-
-import unittest
-
-import ipavalidate
-
-class TestValidate(unittest.TestCase):
- def setUp(self):
- pass
-
- def tearDown(self):
- pass
-
- def test_validEmail(self):
- self.assertEqual(True, ipavalidate.Email("test@freeipa.org"))
- self.assertEqual(True, ipavalidate.Email("", notEmpty=False))
-
- def test_invalidEmail(self):
- self.assertEqual(False, ipavalidate.Email("test"))
- self.assertEqual(False, ipavalidate.Email("test@freeipa"))
- self.assertEqual(False, ipavalidate.Email("test@.com"))
- self.assertEqual(False, ipavalidate.Email(""))
- self.assertEqual(False, ipavalidate.Email(None))
-
- def test_validPlain(self):
- self.assertEqual(True, ipavalidate.Plain("Joe User"))
- self.assertEqual(True, ipavalidate.Plain("Joe O'Malley"))
- self.assertEqual(True, ipavalidate.Plain("", notEmpty=False))
- self.assertEqual(True, ipavalidate.Plain(None, notEmpty=False))
- self.assertEqual(True, ipavalidate.Plain("JoeUser", allowSpaces=False))
- self.assertEqual(True, ipavalidate.Plain("JoeUser", allowSpaces=True))
-
- def test_invalidPlain(self):
- self.assertEqual(False, ipavalidate.Plain("Joe (User)"))
- self.assertEqual(False, ipavalidate.Plain("Joe C. User"))
- self.assertEqual(False, ipavalidate.Plain("", notEmpty=True))
- self.assertEqual(False, ipavalidate.Plain(None, notEmpty=True))
- self.assertEqual(False, ipavalidate.Plain("Joe User", allowSpaces=False))
- self.assertEqual(False, ipavalidate.Plain("Joe C. User"))
-
- def test_validString(self):
- self.assertEqual(True, ipavalidate.String("Joe User"))
- self.assertEqual(True, ipavalidate.String("Joe O'Malley"))
- self.assertEqual(True, ipavalidate.String("", notEmpty=False))
- self.assertEqual(True, ipavalidate.String(None, notEmpty=False))
- self.assertEqual(True, ipavalidate.String("Joe C. User"))
-
- def test_invalidString(self):
- self.assertEqual(False, ipavalidate.String("", notEmpty=True))
- self.assertEqual(False, ipavalidate.String(None, notEmpty=True))
-
- def test_validPath(self):
- self.assertEqual(True, ipavalidate.Path("/"))
- self.assertEqual(True, ipavalidate.Path("/home/user"))
- self.assertEqual(True, ipavalidate.Path("../home/user"))
- self.assertEqual(True, ipavalidate.Path("", notEmpty=False))
- self.assertEqual(True, ipavalidate.Path(None, notEmpty=False))
-
- def test_invalidPath(self):
- self.assertEqual(False, ipavalidate.Path("(foo)"))
- self.assertEqual(False, ipavalidate.Path("", notEmpty=True))
- self.assertEqual(False, ipavalidate.Path(None, notEmpty=True))
-
- def test_validName(self):
- self.assertEqual(True, ipavalidate.GoodName("foo"))
- self.assertEqual(True, ipavalidate.GoodName("1foo"))
- self.assertEqual(True, ipavalidate.GoodName("foo.bar"))
- self.assertEqual(True, ipavalidate.GoodName("foo.bar$"))
-
- def test_invalidName(self):
- self.assertEqual(False, ipavalidate.GoodName("foo bar"))
- self.assertEqual(False, ipavalidate.GoodName("foo%bar"))
- self.assertEqual(False, ipavalidate.GoodName("*foo"))
- self.assertEqual(False, ipavalidate.GoodName("$foo.bar$"))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/ipa-python/version.py.in b/ipa-python/version.py.in
deleted file mode 100644
index fdb689f02..000000000
--- a/ipa-python/version.py.in
+++ /dev/null
@@ -1,25 +0,0 @@
-# Authors: Rob Crittenden <rcritten@redhat.com>
-#
-# Copyright (C) 2007 Red Hat
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License as
-# published by the Free Software Foundation; version 2 only
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-# The full version including strings
-VERSION="__VERSION__"
-
-# Just the numeric portion of the version so one can do direct numeric
-# comparisons to see if the API is compatible.
-NUM_VERSION=__NUM_VERSION__