summaryrefslogtreecommitdiffstats
path: root/ipapython/ipautil.py
diff options
context:
space:
mode:
authorPetr Spacek <pspacek@redhat.com>2016-06-30 20:41:48 +0200
committerMartin Basti <mbasti@redhat.com>2016-07-01 10:35:39 +0200
commit5e78b54d7c532bec0ee5a4ce3f1b6d6c94d17c51 (patch)
tree13fd82fb19f99817eb20495e605c1dbfaa0c8196 /ipapython/ipautil.py
parentce1f9ca51bd91ed66233c1bac7eb05fac9c855c7 (diff)
downloadfreeipa-5e78b54d7c532bec0ee5a4ce3f1b6d6c94d17c51.tar.gz
freeipa-5e78b54d7c532bec0ee5a4ce3f1b6d6c94d17c51.tar.xz
freeipa-5e78b54d7c532bec0ee5a4ce3f1b6d6c94d17c51.zip
Fix internal errors in host-add and other commands caused by DNS resolution
Previously resolver was returning CheckedIPAddress objects. This internal server error in cases where DNS actually returned reserved IP addresses. Now the resolver is returning UnsafeIPAddress objects which do syntactic checks but do not filter IP addresses. From now on we can decide if some IP address should be accepted as-is or if it needs to be contrained to some subset of IP addresses using CheckedIPAddress class. This regression was caused by changes for https://fedorahosted.org/freeipa/ticket/5710 Reviewed-By: Martin Basti <mbasti@redhat.com>
Diffstat (limited to 'ipapython/ipautil.py')
-rw-r--r--ipapython/ipautil.py149
1 files changed, 90 insertions, 59 deletions
diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py
index 8506bf2d5..763a99c11 100644
--- a/ipapython/ipautil.py
+++ b/ipapython/ipautil.py
@@ -74,102 +74,133 @@ def get_domain_name():
return domain_name
-class CheckedIPAddress(netaddr.IPAddress):
+
+class UnsafeIPAddress(netaddr.IPAddress):
+ """Any valid IP address with or without netmask."""
# Use inet_pton() rather than inet_aton() for IP address parsing. We
# will use the same function in IPv4/IPv6 conversions + be stricter
# and don't allow IP addresses such as '1.1.1' in the same time
netaddr_ip_flags = netaddr.INET_PTON
+ def __init__(self, addr):
+ if isinstance(addr, UnsafeIPAddress):
+ self._net = addr._net
+ super(UnsafeIPAddress, self).__init__(addr,
+ flags=self.netaddr_ip_flags)
+ return
+
+ elif isinstance(addr, netaddr.IPAddress):
+ self._net = None # no information about netmask
+ super(UnsafeIPAddress, self).__init__(addr,
+ flags=self.netaddr_ip_flags)
+ return
+
+ elif isinstance(addr, netaddr.IPNetwork):
+ self._net = addr
+ super(UnsafeIPAddress, self).__init__(self._net.ip,
+ flags=self.netaddr_ip_flags)
+ return
+
+ # option of last resort: parse it as string
+ self._net = None
+ addr = str(addr)
+ try:
+ try:
+ addr = netaddr.IPAddress(addr, flags=self.netaddr_ip_flags)
+ except netaddr.AddrFormatError:
+ # netaddr.IPAddress doesn't handle zone indices in textual
+ # IPv6 addresses. Try removing zone index and parse the
+ # address again.
+ addr, sep, foo = addr.partition('%')
+ if sep != '%':
+ raise
+ addr = netaddr.IPAddress(addr, flags=self.netaddr_ip_flags)
+ if addr.version != 6:
+ raise
+ except ValueError:
+ self._net = netaddr.IPNetwork(addr, flags=self.netaddr_ip_flags)
+ addr = self._net.ip
+ super(UnsafeIPAddress, self).__init__(addr,
+ flags=self.netaddr_ip_flags)
+
+
+class CheckedIPAddress(UnsafeIPAddress):
+ """IPv4 or IPv6 address with additional constraints.
+
+ Reserved or link-local addresses are never accepted.
+ """
def __init__(self, addr, match_local=False, parse_netmask=True,
allow_network=False, allow_loopback=False,
allow_broadcast=False, allow_multicast=False):
+
+ super(CheckedIPAddress, self).__init__(addr)
if isinstance(addr, CheckedIPAddress):
- super(CheckedIPAddress, self).__init__(addr, flags=self.netaddr_ip_flags)
self.prefixlen = addr.prefixlen
return
- net = None
- iface = None
-
- if isinstance(addr, netaddr.IPNetwork):
- net = addr
- addr = net.ip
- elif isinstance(addr, netaddr.IPAddress):
- pass
- else:
- try:
- try:
- addr = netaddr.IPAddress(str(addr), flags=self.netaddr_ip_flags)
- except netaddr.AddrFormatError:
- # netaddr.IPAddress doesn't handle zone indices in textual
- # IPv6 addresses. Try removing zone index and parse the
- # address again.
- if not isinstance(addr, six.string_types):
- raise
- addr, sep, foo = addr.partition('%')
- if sep != '%':
- raise
- addr = netaddr.IPAddress(str(addr), flags=self.netaddr_ip_flags)
- if addr.version != 6:
- raise
- except ValueError:
- net = netaddr.IPNetwork(str(addr), flags=self.netaddr_ip_flags)
- if not parse_netmask:
- raise ValueError("netmask and prefix length not allowed here")
- addr = net.ip
+ if not parse_netmask and self._net:
+ raise ValueError(
+ "netmask and prefix length not allowed here: {}".format(addr))
- if addr.version not in (4, 6):
- raise ValueError("unsupported IP version")
+ if self.version not in (4, 6):
+ raise ValueError("unsupported IP version {}".format(self.version))
- if not allow_loopback and addr.is_loopback():
- raise ValueError("cannot use loopback IP address")
- if (not addr.is_loopback() and addr.is_reserved()) \
- or addr in netaddr.ip.IPV4_6TO4:
- raise ValueError("cannot use IANA reserved IP address")
+ if not allow_loopback and self.is_loopback():
+ raise ValueError("cannot use loopback IP address {}".format(addr))
+ if (not self.is_loopback() and self.is_reserved()) \
+ or self in netaddr.ip.IPV4_6TO4:
+ raise ValueError(
+ "cannot use IANA reserved IP address {}".format(addr))
- if addr.is_link_local():
- raise ValueError("cannot use link-local IP address")
- if not allow_multicast and addr.is_multicast():
- raise ValueError("cannot use multicast IP address")
+ if self.is_link_local():
+ raise ValueError(
+ "cannot use link-local IP address {}".format(addr))
+ if not allow_multicast and self.is_multicast():
+ raise ValueError("cannot use multicast IP address {}".format(addr))
if match_local:
- if addr.version == 4:
+ if self.version == 4:
family = netifaces.AF_INET
- elif addr.version == 6:
+ elif self.version == 6:
family = netifaces.AF_INET6
else:
raise ValueError(
- "Unsupported address family ({})".format(addr.version)
+ "Unsupported address family ({})".format(self.version)
)
+ iface = None
for interface in netifaces.interfaces():
for ifdata in netifaces.ifaddresses(interface).get(family, []):
ifnet = netaddr.IPNetwork('{addr}/{netmask}'.format(
addr=ifdata['addr'],
netmask=ifdata['netmask']
))
- if ifnet == net or (net is None and ifnet.ip == addr):
- net = ifnet
+ if ifnet == self._net or (
+ self._net is None and ifnet.ip == self):
+ self._net = ifnet
iface = interface
break
if iface is None:
- raise ValueError('No network interface matches the provided IP address and netmask')
+ raise ValueError('no network interface matches the IP address '
+ 'and netmask {}'.format(addr))
+
+ if self._net is None:
+ if self.version == 4:
+ self._net = netaddr.IPNetwork(
+ netaddr.cidr_abbrev_to_verbose(str(self)))
+ elif self.version == 6:
+ self._net = netaddr.IPNetwork(str(self) + '/64')
- if net is None:
- if addr.version == 4:
- net = netaddr.IPNetwork(netaddr.cidr_abbrev_to_verbose(str(addr)))
- elif addr.version == 6:
- net = netaddr.IPNetwork(str(addr) + '/64')
+ if not allow_network and self == self._net.network:
+ raise ValueError("cannot use IP network address {}".format(addr))
+ if not allow_broadcast and (self.version == 4 and
+ self == self._net.broadcast):
+ raise ValueError("cannot use broadcast IP address {}".format(addr))
- if not allow_network and addr == net.network:
- raise ValueError("cannot use IP network address")
- if not allow_broadcast and addr.version == 4 and addr == net.broadcast:
- raise ValueError("cannot use broadcast IP address")
+ self.prefixlen = self._net.prefixlen
- super(CheckedIPAddress, self).__init__(addr, flags=self.netaddr_ip_flags)
- self.prefixlen = net.prefixlen
def valid_ip(addr):
return netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr)