path: root/ipalib
diff options
authorMartin Kosek <>2011-11-09 17:35:52 +0100
committerRob Crittenden <>2011-11-10 18:48:41 -0500
commitefc3e2c1f7a3dcf5e94736395d39e1fa2800a490 (patch)
tree159e01182b453d41c8b3baa39e9f5bb36079c814 /ipalib
parent9405e1a9db11294a11efa24a7a3c36ea76a42f31 (diff)
Improve DNS record data validation
Implement missing validators for DNS RR types so that we can capture at least basic user errors. Additionally, a normalizer creating a fully-qualified domain name has been implemented for several RRs where name server may mis-interpret the domain name otherwise. Unit tests exercising these new validators for the most common RR types have been added. This patch also consolidates hard-coded values in DNS test to one place.
Diffstat (limited to 'ipalib')
2 files changed, 374 insertions, 23 deletions
diff --git a/ipalib/plugins/ b/ipalib/plugins/
index 912fd36f0..0a0bcb79c 100644
--- a/ipalib/plugins/
+++ b/ipalib/plugins/
@@ -20,13 +20,14 @@
import netaddr
import time
+import re
from ipalib import api, errors, output
from ipalib import Command
from ipalib import Flag, Bool, Int, List, Str, StrEnum
from ipalib.plugins.baseldap import *
from ipalib import _, ngettext
-from ipalib.util import validate_zonemgr
+from ipalib.util import validate_zonemgr, validate_hostname
from ipapython import dnsclient
from ipapython.ipautil import valid_ip
from ldap import explode_dn
@@ -59,6 +60,9 @@ EXAMPLES:
Delete previously added nameserver from
ipa dnsrecord-del @ --ns-rec
+ Add LOC record for
+ ipa dnsrecord-add @ --loc-rec "49 11 42.4 N 16 36 29.6 E 227.64m"
Add new A record for (random IP)
ipa dnsrecord-add www --a-rec
@@ -173,79 +177,412 @@ def _validate_ipaddr(ugettext, ipaddr):
ip = netaddr.IPAddress(ipaddr)
except (netaddr.AddrFormatError, ValueError):
- return u'invalid address format'
+ return _('invalid IP address format')
return None
def _validate_ipnet(ugettext, ipnet):
net = netaddr.IPNetwork(ipnet)
except (netaddr.AddrFormatError, ValueError, UnboundLocalError):
- return u'invalid format'
+ return _('invalid IP network format')
return None
def _validate_srv(ugettext, srv):
+ """see RFC 2782"""
prio, weight, port, host = srv.split()
except ValueError:
- return u'format must be specified as "priority weight port target"'
+ return _('format must be specified as "priority weight port target"')
prio = int(prio)
weight = int(weight)
port = int(port)
except ValueError:
- return u'the values of priority, weight and port must be integers'
+ return _('format must be specified as "priority weight port target" '\
+ '(see RFC 2782 for details)')
return None
def _validate_mx(ugettext, mx):
+ """see RFC 1035"""
prio, host = mx.split()
except ValueError:
- return u'format must be specified as "priority mailserver"'
+ return _('format must be specified as "priority mailserver" '\
+ '(see RFC 1035 for details)')
prio = int(prio)
except ValueError:
- return u'the value of priority must be integer'
+ return _('the value of priority must be integer')
if prio < 0 or prio > 65535:
- return u'the value of priority must be between 0 and 65535'
+ return _('the value of priority must be between 0 and 65535')
return None
def _validate_naptr(ugettext, naptr):
- "see RFC 2915 "
+ """see RFC 2915"""
order, pref, flags, svc, regexp, replacement = naptr.split()
except ValueError:
- return u'format must be specified as "order preference flags service regexp replacement"'
+ return _('format must be specified as "order preference flags service '\
+ 'regexp replacement" (see RFC 2915 for details)')
order = int(order)
pref = int(pref)
except ValueError:
- return u'order and preference must be integers'
+ return _('order and preference must be integers')
if order < 0 or order > 65535 or pref < 0 or pref > 65535:
- return u'the value of order and preference must be between 0 and 65535'
+ return _('the value of order and preference must be between 0 and 65535')
flags = flags.replace('"','')
flags = flags.replace('\'','')
if len(flags) != 1:
- return u'flag must be a single character (quotation is allowed)'
+ return _('flag must be a single character (quotation is allowed)')
if flags.upper() not in "SAUP":
- return u'flag must be one of "S", "A", "U", or "P"'
+ return _('flag must be one of "S", "A", "U", or "P"')
+ return None
+def _validate_afsdb(ugettext, afsdb):
+ """see RFC 1183"""
+ try:
+ sub, host = afsdb.split()
+ except ValueError:
+ return _('format must be specified as "subtype hostname" (see RFC 1183 for details)')
+ try:
+ sub = int(sub)
+ except ValueError:
+ return _('the value of subtype must be integer')
+ if sub < 0 or sub > 65535:
+ return _('the value of subtype must be between 0 and 65535')
+ return None
+def _validate_cert(ugettext, cert):
+ """see RFC 4398"""
+ try:
+ cert_type, key_tag, algorithm, certificate = cert.split()
+ except ValueError:
+ return _('format must be specified as "type key_tag algorithm certificate_or_crl" '\
+ '(see RFC 4398 for details)')
+ try:
+ cert_type = int(cert_type)
+ key_tag = int(key_tag)
+ algorithm = int(algorithm)
+ except ValueError:
+ return _('key_tag, algorithm and digest_type must be integers')
+ if cert_type < 0 or cert_type > 65535 or key_tag < 0 or key_tag > 65535:
+ return _('the value of type and key_tag must be between 0 and 65535')
+ if algorithm < 0 or algorithm > 255:
+ return _('the value of algorithm must be between 0 and 255')
+ return None
+def _validate_cname(ugettext, cname):
+ """see RFC 1035"""
+ try:
+ validate_hostname(cname)
+ except ValueError, e:
+ return _('format must be specified as "domain_name" (see RFC 1035 for details): %s') \
+ % unicode(e)
+ return None
+def _validate_dname(ugettext, dname):
+ """see RFC 2672"""
+ try:
+ validate_hostname(dname)
+ except ValueError, e:
+ return _('format must be specified as "target" (see RFC 2672 for details): %s') \
+ % unicode(e)
+ return None
+def _validate_ds(ugettext, ds):
+ """see RFC 4034"""
+ try:
+ key_tag, algorithm, digest_type, digest = ds.split()
+ except ValueError:
+ return _('format must be specified as "key_tag algorithm digest_type digest" '\
+ '(see RFC 4034 for details)')
+ try:
+ key_tag = int(key_tag)
+ algorithm = int(algorithm)
+ digest_type = int(digest_type)
+ except ValueError:
+ return _('key_tag, algorithm and digest_type must be integers')
+ if key_tag < 0 or key_tag > 65535:
+ return _('the value of flags must be between 0 and 65535')
+ if algorithm < 0 or algorithm > 255 or digest_type < 0 or digest_type > 255:
+ return _('the value of algorithm and digest_type must be between 0 and 255')
+ return None
+def _validate_key(ugettext, key):
+ """see RFC 2535"""
+ try:
+ flags, protocol, algorithm, digest = key.split()
+ except ValueError:
+ return _('format must be specified as "flags protocol algorithm public_key" '\
+ '(see RFC 2535 for details)')
+ try:
+ flags = int(flags)
+ protocol = int(protocol)
+ algorithm = int(algorithm)
+ except ValueError:
+ return _('flags, protocol and algorithm must be integers')
+ if flags < 0 or flags > 65535:
+ return _('the value of flags must be between 0 and 65535')
+ if protocol < 0 or protocol > 255:
+ return _('the value of protocol must be between 0 and 255')
+ if algorithm < 0 or algorithm > 255:
+ return _('the value of algorithm must be between 0 and 255')
+ return None
+def _validate_loc(ugettext, loc):
+ """see RFC 1876"""
+ regex = re.compile(\
+ r'(?P<d1>\d{1,2}\s+)(?P<m1>\d{1,2}\s+)?(?P<s1>\d{1,2}\.?\d{1,3}?\s+)'\
+ r'?[N|S]\s+'\
+ r'(?P<d2>\d{1,2}\s+)(?P<m2>\d{1,2}\s+)?(?P<s2>\d{1,2}\.?\d{1,3}?\s+)'\
+ r'?[W|E]\s+'\
+ r'(?P<alt>-?\d{1,8}\.?\d{1,2}?)m?\s*'\
+ r'(?P<siz>\d{1,8}\.?\d{1,2}?)?m?\s*'\
+ r'(?P<hp>\d{1,8}\.?\d{1,2}?)?m?\s*(?P<vp>\d{1,8}\.?\d{1,2}?)?m?\s*$')
+ m = regex.match(loc)
+ if m is None:
+ return _("""format must be specified as
+ "d1 [m1 [s1]] {"N"|"S"} d2 [m2 [s2]] {"E"|"W"} alt["m"] [siz["m"] [hp["m"] [vp["m"]]]]"
+ where:
+ d1: [0 .. 90] (degrees latitude)
+ d2: [0 .. 180] (degrees longitude)
+ m1, m2: [0 .. 59] (minutes latitude/longitude)
+ s1, s2: [0 .. 59.999] (seconds latitude/longitude)
+ alt: [-100000.00 .. 42849672.95] BY .01 (altitude in meters)
+ siz, hp, vp: [0 .. 90000000.00] (size/precision in meters)
+ See RFC 1876 for details""")
+ attrs = {}
+ for attr in ('d1', 'd2', 'm1', 'm2'):
+ if is not None:
+ try:
+ attrs[attr] = int(
+ except ValueError:
+ return _('%s must be integer') % attr
+ for attr in ('s1', 's2', 'alt', 'siz', 'hp', 'vp'):
+ if is not None:
+ try:
+ attrs[attr] = float(
+ except ValueError:
+ return _('%s must be float') % attr
+ if attrs.get('d1', 0) > 90 or attrs.get('d2', 0) > 90:
+ return _(u'd1 and d2 must be between 0 and 90')
+ if attrs.get('m1', 0) >= 60 or attrs.get('m2', 0) >= 60 or \
+ attrs.get('s1', 0) >= 60 or attrs.get('s2', 0) >= 60:
+ return _('m1, m2, s1 and s2 must be between 0 and 59.999')
+ if attrs.get('alt', 0) < -100000.00 or attrs.get('alt', 0) > 42849672.95:
+ return _('alt must be between -100000.00 and 42849672.95')
+ if attrs.get('siz', 0) > 90000000.00 or attrs.get('hp', 0) > 90000000.00 or \
+ attrs.get('vp', 0) > 90000000.00:
+ return _('siz, hp and vp must be between 0 and 90000000.00')
+ return None
+def _validate_ns(ugettext, ns):
+ """see RFC 1035"""
+ try:
+ ns, = ns.split()
+ except ValueError:
+ return _('format must be specified as "domain_name" (see RFC 1035 for details)')
+ return None
+def _validate_nsec(ugettext, nsec):
+ """see RFC 4034"""
+ fields = nsec.split()
+ if len(fields) < 2:
+ return _('format must be specified as "next_domain_name type1 '\
+ '[type2 [type3 [...]]]" (see RFC 4034 for details)')
+ allowed_types = (u'SOA',) + _record_types
+ for i in range(1, len(fields)):
+ sig_type = fields[i]
+ if sig_type not in allowed_types:
+ return _('type must be one of ' + u', '.join(allowed_types))
return None
+def _validate_kx(ugettext, kx):
+ """see RFC 2230"""
+ try:
+ preference, exchanger = kx.split()
+ except ValueError:
+ return _('format must be specified as "preference exchanger" '\
+ '(see RFC 2230 for details)')
+ try:
+ preference = int(preference)
+ except ValueError:
+ return _(u'the value of preference must be integer')
+ if preference < 0 or preference > 65535:
+ return _('the value of preference must be between 0 and 65535')
+ return None
+def _validate_ptr(ugettext, ptr):
+ """see RFC 1035"""
+ try:
+ validate_hostname(ptr)
+ except ValueError, e:
+ return _('format must be specified as "domain_name" (see RFC 1035 for details): %s') \
+ % unicode(e)
+ return None
+def _validate_sig(ugettext, sig):
+ """see RFCs 2535, 4034"""
+ try:
+ sig_type, algorithm, labels, ttl, sig_expiration, \
+ sig_inception, tag, signer, signature = sig.split()
+ except ValueError:
+ return _('format must be specified as "type_covered algorithm labels original_ttl ' \
+ 'signature_expiration signature_inception key_tag signers_name signature" '\
+ '(see RFC 2535, 4034 for details)')
+ allowed_types = [x for x in _record_types if x != u'SIG']
+ if sig_type not in allowed_types:
+ return _('type_covered must be one of ' + u', '.join(allowed_types))
+ try:
+ algorithm = int(algorithm)
+ labels = int(labels)
+ ttl = int(ttl)
+ tag = int(tag)
+ except ValueError:
+ return _('algorithm, labels, original_ttl and key_tag must be integers')
+ try:
+ time_format = "%Y%m%d%H%M%S"
+ sig_inception = time.strptime(sig_inception, time_format)
+ sig_expiration = time.strptime(sig_expiration, time_format)
+ except ValueError, e:
+ return _('signature_expiration and signature_inception must follow time ' \
+ 'format "YYYYMMDDHHMMSS"')
+ if algorithm < 0 or algorithm > 255 or labels < 0 or labels > 255:
+ return _('the value of algorithm and labels must be between 0 and 255')
+ if ttl < 0 or ttl > 4294967295:
+ return _('the value of original_ttl must be between 0 and 4294967295')
+ if tag < 0 or tag > 65535:
+ return _('the value of tag must be between 0 and 65535')
+ return None
+def _validate_sshfp(ugettext, sshfp):
+ """see RFCs 4255"""
+ try:
+ algorithm, fp_type, fingerprint = sshfp.split()
+ except ValueError:
+ return _('format must be specified as "algorithm fp_type fingerprint" '\
+ '(see RFC 4255 for details)')
+ try:
+ algorithm = int(algorithm)
+ fp_type = int(fp_type)
+ except ValueError:
+ return _('algorithm and fp_type must be integers')
+ if algorithm < 0 or algorithm > 255 or fp_type < 0 or fp_type > 255:
+ return _('the value of algorithm and fp_type must be between 0 and 255')
+ return None
+def _validate_unsupported(ugettext, val):
+ """
+ See for a
+ list of supported records in bind-dyndb-ldap plugin
+ """
+ return _('This DNS RR type is not supported by bind-dyndb-ldap plugin')
+def _normalize_domain_name(domain_name):
+ """Make it fully-qualified"""
+ if domain_name[-1] != '.':
+ return domain_name + '.'
+ else:
+ return domain_name
+# Not validated RR types:
+# - A6: downgraded to experimental state by RFC 3363, AAAA is preferred
_record_validators = {
u'A': _validate_ipaddr,
u'AAAA': _validate_ipaddr,
- u'APL': _validate_ipnet,
- u'SRV': _validate_srv,
+ u'AFSDB': _validate_afsdb,
+ u'APL': _validate_unsupported,
+ u'CERT': _validate_cert,
+ u'CNAME': _validate_cname,
+ u'DHCID': _validate_unsupported,
+ u'DLV': _validate_unsupported,
+ u'DNAME': _validate_dname,
+ u'DNSKEY': _validate_unsupported,
+ u'DS': _validate_ds,
+ u'HIP': _validate_unsupported,
+ u'KEY': _validate_key,
+ u'IPSECKEY': _validate_unsupported,
+ u'KX': _validate_kx,
+ u'LOC': _validate_loc,
u'MX': _validate_mx,
+ u'NS': _validate_ns,
+ u'NSEC': _validate_nsec,
+ u'NSEC3': _validate_unsupported,
+ u'NSEC3PARAM': _validate_unsupported,
u'NAPTR': _validate_naptr,
+ u'PTR': _validate_ptr,
+ u'RP': _validate_unsupported,
+ u'SRV': _validate_srv,
+ u'SIG': _validate_sig,
+ u'RRSIG': _validate_sig,
+ u'SSHFP': _validate_sshfp,
+ u'TA': _validate_unsupported,
+ u'TKEY': _validate_unsupported,
+ u'TSIG': _validate_unsupported,
+_record_normalizers = {
+ u'CNAME': _normalize_domain_name,
+ u'DNAME': _normalize_domain_name,
+ u'NS': _normalize_domain_name,
+ u'PTR': _normalize_domain_name,
# dictionary of valid reverse zone -> number of address components
@@ -654,11 +991,6 @@ class dnsrecord(LDAPObject):
error=unicode(_('Reverse zone %s requires exactly %d IP address components, %d given')
% (zone_name, zone_len, ip_addr_comp_count)))
- for ptr in options['ptrrecord']:
- if not ptr.endswith('.'):
- raise errors.ValidationError(name='ptr-rec',
- error=unicode(_('PTR record \'%s\' is not fully qualified (check trailing \'.\')') % ptr))
return dn
def is_pkey_zone_record(self, *keys):
@@ -720,16 +1052,18 @@ class dnsrecord_cmd_w_record_options(Command):
def get_record_option(self, rec_type):
doc = self.record_param_doc % rec_type
validator = _record_validators.get(rec_type)
+ normalizer = _record_normalizers.get(rec_type)
if validator:
return List(
- '%srecord?' % rec_type.lower(), validator,
+ '%srecord?' % rec_type.lower(), validator, normalizer=normalizer,
cli_name='%s_rec' % rec_type.lower(), doc=doc,
label='%s record' % rec_type, attribute=True
return List(
'%srecord?' % rec_type.lower(), cli_name='%s_rec' % rec_type.lower(),
- doc=doc, label='%s record' % rec_type, attribute=True
+ normalizer=normalizer, doc=doc, label='%s record' % rec_type,
+ attribute=True
def prompt_record_options(self, rec_type_list):
diff --git a/ipalib/ b/ipalib/
index fa93cc750..7a4d256d7 100644
--- a/ipalib/
+++ b/ipalib/
@@ -233,3 +233,20 @@ def validate_zonemgr(zonemgr):
if not all(regex_domain.match(part) for part in domain.split(".")):
raise ValueError(_('domain name may only include letters, numbers, and -'))
+def validate_hostname(hostname):
+ """ See RFC 952, 1123"""
+ regex_name = re.compile(r'^[a-z0-9]([a-z0-9-]?[a-z0-9])*$', re.IGNORECASE)
+ if len(hostname) > 255:
+ raise ValueError(_('cannot be longer that 255 characters'))
+ if hostname.endswith('.'):
+ hostname = hostname[:-1]
+ if '.' not in hostname:
+ raise ValueError(_('hostname is not fully qualified'))
+ if not all(regex_name.match(part) for part in hostname.split(".")):
+ raise ValueError(_('hostname parts may only include letters, numbers, and - ' \
+ '(which is not allowed as the last character)'))