From 80b4b3d44bbbe745e644b56c5371ef5f4cda6600 Mon Sep 17 00:00:00 2001 From: Jan Cholasta Date: Fri, 27 May 2011 20:17:22 +0200 Subject: Parse netmasks in IP addresses passed to server install. ticket 1212 --- ipapython/config.py | 13 +++++++++- ipapython/ipautil.py | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 1 deletion(-) (limited to 'ipapython') diff --git a/ipapython/config.py b/ipapython/config.py index 7e5b19515..c78508541 100644 --- a/ipapython/config.py +++ b/ipapython/config.py @@ -18,7 +18,8 @@ # import ConfigParser -from optparse import Option, Values, OptionParser, IndentedHelpFormatter +from optparse import Option, Values, OptionParser, IndentedHelpFormatter, OptionValueError +from copy import copy import socket import ipapython.dnsclient @@ -46,12 +47,22 @@ class IPAFormatter(IndentedHelpFormatter): ret += "%s %s\n" % (spacing, line) return ret +def check_ip_option(option, opt, value): + from ipapython.ipautil import CheckedIPAddress + try: + return CheckedIPAddress(value, parse_netmask=(option.type == "ipnet")) + except Exception as e: + raise OptionValueError("option %s: invalid IP address %s: %s" % (opt, value, e)) + class IPAOption(Option): """ optparse.Option subclass with support of options labeled as security-sensitive such as passwords. """ ATTRS = Option.ATTRS + ["sensitive"] + TYPES = Option.TYPES + ("ipaddr", "ipnet") + TYPE_CHECKER = copy(Option.TYPE_CHECKER) + TYPE_CHECKER["ipaddr"] = TYPE_CHECKER["ipnet"] = check_ip_option class IPAOptionParser(OptionParser): """ diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py index 4280cd9f4..444487ad9 100644 --- a/ipapython/ipautil.py +++ b/ipapython/ipautil.py @@ -39,6 +39,7 @@ from types import * import re import xmlrpclib import datetime +import netaddr from ipapython import config try: from subprocess import CalledProcessError @@ -63,6 +64,72 @@ def get_domain_name(): return domain_name +class CheckedIPAddress(netaddr.IPAddress): + def __init__(self, addr, match_local=True, parse_netmask=True): + if isinstance(addr, CheckedIPAddress): + super(CheckedIPAddress, self).__init__(addr) + self.prefixlen = addr.prefixlen + self.defaultnet = addr.defaultnet + self.interface = addr.interface + return + + net = None + iface = None + defnet = False + + if isinstance(addr, netaddr.IPNetwork): + net = addr + addr = net.ip + elif isinstance(addr, netaddr.IPAddress): + pass + else: + try: + addr = netaddr.IPAddress(addr) + except ValueError: + net = netaddr.IPNetwork(addr) + if not parse_netmask: + raise ValueError("netmask and prefix length not allowed here") + addr = net.ip + + if addr.version not in (4, 6): + raise ValueError("unsupported IP version") + if addr.is_loopback(): + raise ValueError("cannot use loopback IP address") + + if match_local: + if addr.version == 4: + family = 'inet' + elif addr.version == 6: + family = 'inet6' + + ipresult = run(['/sbin/ip', '-family', family, '-oneline', 'address', 'show']) + lines = ipresult[0].split('\n') + for line in lines: + fields = line.split() + if len(fields) < 4: + continue + + ifnet = netaddr.IPNetwork(fields[3]) + if ifnet == net or ifnet.ip == addr: + net = ifnet + iface = fields[1] + break + + if net is None: + defnet = True + if addr.version == 4: + net = netaddr.IPNetwork(netaddr.cidr_abbrev_to_verbose(str(addr))) + elif addr.version == 6: + net = netaddr.IPNetwork(str(addr) + '/64') + + super(CheckedIPAddress, self).__init__(addr) + self.prefixlen = net.prefixlen + self.defaultnet = defnet + self.interface = iface + + def is_local(self): + return self.interface is not None + def realm_to_suffix(realm_name): s = realm_name.split(".") terms = ["dc=" + x.lower() for x in s] -- cgit