summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins/dns.py
diff options
context:
space:
mode:
authorMartin Basti <mbasti@redhat.com>2014-05-23 17:57:20 +0200
committerPetr Vobornik <pvoborni@redhat.com>2014-06-20 13:14:45 +0200
commit727f5f33732df252fa99d5c168d6727589ee6076 (patch)
tree6d15276c192cf2b63c2751e0ebb9a7ba85a86802 /ipalib/plugins/dns.py
parent266015c3e28c04894cd3a52ea2f99c25eef2c6a9 (diff)
downloadfreeipa-727f5f33732df252fa99d5c168d6727589ee6076.tar.gz
freeipa-727f5f33732df252fa99d5c168d6727589ee6076.tar.xz
freeipa-727f5f33732df252fa99d5c168d6727589ee6076.zip
Create BASE zone class
Zones and forward zones have a lot of common code, this patch remove duplications by creating a DNSBase class and its subclasses design: http://www.freeipa.org/page/V4/Forward_zones Ticket: https://fedorahosted.org/freeipa/ticket/3210 Reviewed-By: Petr Vobornik <pvoborni@redhat.com>
Diffstat (limited to 'ipalib/plugins/dns.py')
-rw-r--r--ipalib/plugins/dns.py861
1 files changed, 333 insertions, 528 deletions
diff --git a/ipalib/plugins/dns.py b/ipalib/plugins/dns.py
index 7de4cf249..6c19f8b8c 100644
--- a/ipalib/plugins/dns.py
+++ b/ipalib/plugins/dns.py
@@ -1694,24 +1694,16 @@ def _records_idn_postprocess(record, **options):
record[attr] = rrs
-@register()
-class dnszone(LDAPObject):
+class DNSZoneBase(LDAPObject):
"""
- DNS Zone, container for resource records.
+ Base class for DNS Zone
"""
container_dn = api.env.container_dns
- object_name = _('DNS zone')
- object_name_plural = _('DNS zones')
- object_class = ['top', 'idnsrecord', 'idnszone']
+ object_class = ['top']
possible_objectclasses = ['ipadnszone']
default_attributes = [
- 'idnsname', 'idnszoneactive', 'idnssoamname', 'idnssoarname',
- 'idnssoaserial', 'idnssoarefresh', 'idnssoaretry', 'idnssoaexpire',
- 'idnssoaminimum', 'idnsallowquery', 'idnsallowtransfer',
- 'idnsforwarders', 'idnsforwardpolicy', 'idnssecinlinesigning',
- ] + _record_attributes
- label = _('DNS Zones')
- label_singular = _('DNS Zone')
+ 'idnsname', 'idnszoneactive', 'idnsforwarders', 'idnsforwardpolicy'
+ ]
takes_params = (
DNSNameParam('idnsname',
@@ -1727,6 +1719,296 @@ class dnszone(LDAPObject):
doc=_('IP network to create reverse zone name from'),
flags=('virtual_attribute',),
),
+ Bool('idnszoneactive?',
+ cli_name='zone_active',
+ label=_('Active zone'),
+ doc=_('Is zone active?'),
+ flags=['no_create', 'no_update'],
+ attribute=True,
+ ),
+ Str('idnsforwarders*',
+ _validate_bind_forwarder,
+ cli_name='forwarder',
+ label=_('Zone forwarders'),
+ doc=_('Per-zone forwarders. A custom port can be specified '
+ 'for each forwarder using a standard format "IP_ADDRESS port PORT"'),
+ csv=True,
+ ),
+ StrEnum('idnsforwardpolicy?',
+ cli_name='forward_policy',
+ label=_('Forward policy'),
+ doc=_('Per-zone conditional forwarding policy. Set to "none" to '
+ 'disable forwarding to global forwarder for this zone. In '
+ 'that case, conditional zone forwarders are disregarded.'),
+ values=(u'only', u'first', u'none'),
+ ),
+
+ )
+
+ def get_dn(self, *keys, **options):
+ zone = keys[-1]
+ assert isinstance(zone, DNSName)
+ assert zone.is_absolute()
+ zone = zone.ToASCII()
+
+ # try first relative name, a new zone has to be added as absolute
+ # otherwise ObjectViolation is raised
+ zone = zone[:-1]
+ dn = super(DNSZoneBase, self).get_dn(zone, **options)
+ try:
+ self.backend.get_entry(dn, [''])
+ except errors.NotFound:
+ zone = u"%s." % zone
+ dn = super(DNSZoneBase, self).get_dn(zone, **options)
+
+ return dn
+
+ def permission_name(self, zone):
+ assert isinstance(zone, DNSName)
+ return u"Manage DNS zone %s" % zone.ToASCII()
+
+ def get_name_in_zone(self, zone, hostname):
+ """
+ Get name of a record that is to be added to a new zone. I.e. when
+ we want to add record "ipa.lab.example.com" in a zone "example.com",
+ this function should return "ipa.lab". Returns None when record cannot
+ be added to a zone. Returns '@' when the hostname is the zone record.
+ """
+ assert isinstance(zone, DNSName)
+ assert zone.is_absolute()
+ assert isinstance(hostname, DNSName)
+
+ if not hostname.is_absolute():
+ return hostname
+
+ if hostname.is_subdomain(zone):
+ return hostname.relativize(zone)
+
+ return None
+
+
+class DNSZoneBase_add(LDAPCreate):
+
+ has_output_params = LDAPCreate.has_output_params + dnszone_output_params
+
+ def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
+ assert isinstance(dn, DN)
+ if not dns_container_exists(self.api.Backend.ldap2):
+ raise errors.NotFound(reason=_('DNS is not configured'))
+
+ try:
+ entry = ldap.get_entry(dn)
+ except errors.NotFound:
+ pass
+ else:
+ if _check_entry_objectclass(entry, self.obj.object_class):
+ self.obj.handle_duplicate_entry(*keys)
+ else:
+ raise errors.DuplicateEntry(
+ message=_(u'Only one zone type is allowed per zone name')
+ )
+
+ entry_attrs['idnszoneactive'] = 'TRUE'
+
+ return dn
+
+
+class DNSZoneBase_del(LDAPDelete):
+
+ def pre_callback(self, ldap, dn, *nkeys, **options):
+ assert isinstance(dn, DN)
+ if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
+ self.obj.handle_not_found(*nkeys)
+ return dn
+
+ def post_callback(self, ldap, dn, *keys, **options):
+ try:
+ api.Command['permission_del'](self.obj.permission_name(keys[-1]),
+ force=True)
+ except errors.NotFound:
+ pass
+
+ return True
+
+
+class DNSZoneBase_mod(LDAPUpdate):
+ has_output_params = LDAPUpdate.has_output_params + dnszone_output_params
+
+
+class DNSZoneBase_find(LDAPSearch):
+ __doc__ = _('Search for DNS zones (SOA records).')
+
+ has_output_params = LDAPSearch.has_output_params + dnszone_output_params
+
+ def args_options_2_params(self, *args, **options):
+ # FIXME: Check that name_from_ip is valid. This is necessary because
+ # custom validation rules, including _validate_ipnet, are not
+ # used when doing a search. Once we have a parameter type for
+ # IP network objects, this will no longer be necessary, as the
+ # parameter type will handle the validation itself (see
+ # <https://fedorahosted.org/freeipa/ticket/2266>).
+ if 'name_from_ip' in options:
+ self.obj.params['name_from_ip'](unicode(options['name_from_ip']))
+ return super(DNSZoneBase_find, self).args_options_2_params(*args, **options)
+
+ def args_options_2_entry(self, *args, **options):
+ if 'name_from_ip' in options:
+ if 'idnsname' not in options:
+ options['idnsname'] = self.obj.params['idnsname'].get_default(**options)
+ del options['name_from_ip']
+ search_kw = super(DNSZoneBase_find, self).args_options_2_entry(*args,
+ **options)
+ name = search_kw.get('idnsname')
+ if name:
+ search_kw['idnsname'] = [name, name.relativize(DNSName.root)]
+ return search_kw
+
+ def pre_callback(self, ldap, filter, attrs_list, base_dn, scope, *args, **options):
+ assert isinstance(base_dn, DN)
+ filter = _create_idn_filter(self, ldap, *args, **options)
+ return (filter, base_dn, scope)
+
+
+class DNSZoneBase_show(LDAPRetrieve):
+ has_output_params = LDAPRetrieve.has_output_params + dnszone_output_params
+
+ def pre_callback(self, ldap, dn, attrs_list, *keys, **options):
+ assert isinstance(dn, DN)
+ if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
+ self.obj.handle_not_found(*keys)
+ return dn
+
+
+class DNSZoneBase_disable(LDAPQuery):
+ has_output = output.standard_value
+
+ def execute(self, *keys, **options):
+ ldap = self.obj.backend
+
+ dn = self.obj.get_dn(*keys, **options)
+ entry = ldap.get_entry(dn, ['idnszoneactive', 'objectclass'])
+ if not _check_entry_objectclass(entry, self.obj.object_class):
+ self.obj.handle_not_found(*keys)
+
+ entry['idnszoneactive'] = ['FALSE']
+
+ try:
+ ldap.update_entry(entry)
+ except errors.EmptyModlist:
+ pass
+
+ return dict(result=True, value=pkey_to_value(keys[-1], options))
+
+
+class DNSZoneBase_enable(LDAPQuery):
+ has_output = output.standard_value
+
+ def execute(self, *keys, **options):
+ ldap = self.obj.backend
+
+ dn = self.obj.get_dn(*keys, **options)
+ entry = ldap.get_entry(dn, ['idnszoneactive', 'objectclass'])
+ if not _check_entry_objectclass(entry, self.obj.object_class):
+ self.obj.handle_not_found(*keys)
+
+ entry['idnszoneactive'] = ['TRUE']
+
+ try:
+ ldap.update_entry(entry)
+ except errors.EmptyModlist:
+ pass
+
+ return dict(result=True, value=pkey_to_value(keys[-1], options))
+
+
+class DNSZoneBase_add_permission(LDAPQuery):
+ has_output = _output_permissions
+ msg_summary = _('Added system permission "%(value)s"')
+
+ def execute(self, *keys, **options):
+ ldap = self.obj.backend
+ dn = self.obj.get_dn(*keys, **options)
+
+ try:
+ entry_attrs = ldap.get_entry(dn, ['objectclass'])
+ except errors.NotFound:
+ self.obj.handle_not_found(*keys)
+ else:
+ if not _check_entry_objectclass(entry_attrs, self.obj.object_class):
+ self.obj.handle_not_found(*keys)
+
+ permission_name = self.obj.permission_name(keys[-1])
+ permission = api.Command['permission_add_noaci'](permission_name,
+ ipapermissiontype=u'SYSTEM'
+ )['result']
+
+ dnszone_ocs = entry_attrs.get('objectclass')
+ if dnszone_ocs:
+ for oc in dnszone_ocs:
+ if oc.lower() == 'ipadnszone':
+ break
+ else:
+ dnszone_ocs.append('ipadnszone')
+
+ entry_attrs['managedby'] = [permission['dn']]
+ ldap.update_entry(entry_attrs)
+
+ return dict(
+ result=True,
+ value=pkey_to_value(permission_name, options),
+ )
+
+
+class DNSZoneBase_remove_permission(LDAPQuery):
+ has_output = _output_permissions
+ msg_summary = _('Removed system permission "%(value)s"')
+
+ def execute(self, *keys, **options):
+ ldap = self.obj.backend
+ dn = self.obj.get_dn(*keys, **options)
+ try:
+ entry = ldap.get_entry(dn, ['managedby', 'objectclass'])
+ except errors.NotFound:
+ self.obj.handle_not_found(*keys)
+ else:
+ if not _check_entry_objectclass(entry, self.obj.object_class):
+ self.obj.handle_not_found(*keys)
+
+ entry['managedby'] = None
+
+ try:
+ ldap.update_entry(entry)
+ except errors.EmptyModlist:
+ # managedBy attribute is clean, lets make sure there is also no
+ # dangling DNS zone permission
+ pass
+
+ permission_name = self.obj.permission_name(keys[-1])
+ api.Command['permission_del'](permission_name, force=True)
+
+ return dict(
+ result=True,
+ value=pkey_to_value(permission_name, options),
+ )
+
+
+@register()
+class dnszone(DNSZoneBase):
+ """
+ DNS Zone, container for resource records.
+ """
+ object_name = _('DNS zone')
+ object_name_plural = _('DNS zones')
+ object_class = DNSZoneBase.object_class + ['idnsrecord', 'idnszone']
+ default_attributes = DNSZoneBase.default_attributes + [
+ 'idnssoamname', 'idnssoarname', 'idnssoaserial', 'idnssoarefresh',
+ 'idnssoaretry', 'idnssoaexpire', 'idnssoaminimum', 'idnsallowquery',
+ 'idnsallowtransfer', 'idnssecinlinesigning',
+ ] + _record_attributes
+ label = _('DNS Zones')
+ label_singular = _('DNS Zone')
+
+ takes_params = DNSZoneBase.takes_params + (
DNSNameParam('idnssoamname',
cli_name='name_server',
label=_('Authoritative nameserver'),
@@ -1807,13 +2089,6 @@ class dnszone(LDAPObject):
default_from=lambda idnsname: default_zone_update_policy(idnsname),
autofill=True
),
- Bool('idnszoneactive?',
- cli_name='zone_active',
- label=_('Active zone'),
- doc=_('Is zone active?'),
- flags=['no_create', 'no_update'],
- attribute=True,
- ),
Bool('idnsallowdynupdate?',
cli_name='dynamic_update',
label=_('Dynamic update'),
@@ -1840,22 +2115,6 @@ class dnszone(LDAPObject):
default=u'none;', # no one can issue queries by default
autofill=True,
),
- Str('idnsforwarders*',
- _validate_bind_forwarder,
- cli_name='forwarder',
- label=_('Zone forwarders'),
- doc=_('Per-zone forwarders. A custom port can be specified '
- 'for each forwarder using a standard format "IP_ADDRESS port PORT"'),
- csv=True,
- ),
- StrEnum('idnsforwardpolicy?',
- cli_name='forward_policy',
- label=_('Forward policy'),
- doc=_('Per-zone conditional forwarding policy. Set to "none" to '
- 'disable forwarding to global forwarder for this zone. In '
- 'that case, conditional zone forwarders are disregarded.'),
- values=(u'only', u'first', u'none'),
- ),
Bool('idnsallowsyncptr?',
cli_name='allow_sync_ptr',
label=_('Allow PTR sync'),
@@ -1941,47 +2200,6 @@ class dnszone(LDAPObject):
},
}
- def get_dn(self, *keys, **options):
- zone = keys[-1]
- assert isinstance(zone, DNSName)
- assert zone.is_absolute()
- zone = zone.ToASCII()
-
- #try first relative name, a new zone has to be added as absolute
- #otherwise ObjectViolation is raised
- zone = zone[:-1]
- dn = super(dnszone, self).get_dn(zone, **options)
- try:
- self.backend.get_entry(dn, [''])
- except errors.NotFound:
- zone = u"%s." % zone
- dn = super(dnszone, self).get_dn(zone, **options)
-
- return dn
-
- def permission_name(self, zone):
- assert isinstance(zone, DNSName)
- return u"Manage DNS zone %s" % zone.ToASCII()
-
- def get_name_in_zone(self, zone, hostname):
- """
- Get name of a record that is to be added to a new zone. I.e. when
- we want to add record "ipa.lab.example.com" in a zone "example.com",
- this function should return "ipa.lab". Returns None when record cannot
- be added to a zone. Returns '@' when the hostname is the zone record.
- """
- assert isinstance(zone, DNSName)
- assert zone.is_absolute()
- assert isinstance(hostname, DNSName)
-
- if not hostname.is_absolute():
- return hostname
-
- if hostname.is_subdomain(zone):
- return hostname.relativize(zone)
-
- return None
-
def _rr_zone_postprocess(self, record, **options):
#Decode IDN ACE form to Unicode, raw records are passed directly from LDAP
if options.get('raw', False):
@@ -1991,11 +2209,10 @@ class dnszone(LDAPObject):
@register()
-class dnszone_add(LDAPCreate):
+class dnszone_add(DNSZoneBase_add):
__doc__ = _('Create new DNS zone (SOA record).')
- has_output_params = LDAPCreate.has_output_params + dnszone_output_params
- takes_options = LDAPCreate.takes_options + (
+ takes_options = DNSZoneBase_add.takes_options + (
Flag('force',
label=_('Force'),
doc=_('Force DNS zone creation even if nameserver is not resolvable.'),
@@ -2036,22 +2253,9 @@ class dnszone_add(LDAPCreate):
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
assert isinstance(dn, DN)
- if not dns_container_exists(self.api.Backend.ldap2):
- raise errors.NotFound(reason=_('DNS is not configured'))
- try:
- entry = ldap.get_entry(dn)
- except errors.NotFound:
- pass
- else:
- if _check_entry_objectclass(entry, self.obj.object_class):
- self.obj.handle_duplicate_entry(*keys)
- else:
- raise errors.DuplicateEntry(
- message=_(u'Only one zone type is allowed per zone name')
- )
-
- entry_attrs['idnszoneactive'] = 'TRUE'
+ dn = super(dnszone_add, self).pre_callback(ldap, dn, entry_attrs,
+ attrs_list, *keys, **options)
# Check nameserver has a forward record
nameserver = entry_attrs['idnssoamname']
@@ -2122,30 +2326,21 @@ class dnszone_add(LDAPCreate):
@register()
-class dnszone_del(LDAPDelete):
+class dnszone_del(DNSZoneBase_del):
__doc__ = _('Delete DNS zone (SOA record).')
msg_summary = _('Deleted DNS zone "%(value)s"')
- def pre_callback(self, ldap, dn, *nkeys, **options):
- assert isinstance(dn, DN)
- if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
- self.obj.handle_not_found(*nkeys)
- return dn
-
def post_callback(self, ldap, dn, *keys, **options):
- try:
- api.Command['permission_del'](self.obj.permission_name(keys[-1]),
- force=True)
- except errors.NotFound:
- pass
+ super(dnszone_del, self).post_callback(ldap, dn, *keys, **options)
# Delete entry from realmdomains
# except for our own domain
zone = keys[0].make_absolute()
if (zone != DNSName(api.env.domain).make_absolute() and
- not zone.is_reverse()):
+ not zone.is_reverse()
+ ):
try:
api.Command['realmdomains_mod'](del_domain=unicode(zone),
force=True)
@@ -2157,18 +2352,16 @@ class dnszone_del(LDAPDelete):
@register()
-class dnszone_mod(LDAPUpdate):
+class dnszone_mod(DNSZoneBase_mod):
__doc__ = _('Modify DNS zone (SOA record).')
- takes_options = LDAPUpdate.takes_options + (
+ takes_options = DNSZoneBase_mod.takes_options + (
Flag('force',
label=_('Force'),
doc=_('Force nameserver change even if nameserver not in DNS'),
),
)
- has_output_params = LDAPUpdate.has_output_params + dnszone_output_params
-
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
self.obj.handle_not_found(*keys)
@@ -2184,37 +2377,11 @@ class dnszone_mod(LDAPUpdate):
return dn
-
@register()
-class dnszone_find(LDAPSearch):
+class dnszone_find(DNSZoneBase_find):
__doc__ = _('Search for DNS zones (SOA records).')
- has_output_params = LDAPSearch.has_output_params + dnszone_output_params
-
- def args_options_2_params(self, *args, **options):
- # FIXME: Check that name_from_ip is valid. This is necessary because
- # custom validation rules, including _validate_ipnet, are not
- # used when doing a search. Once we have a parameter type for
- # IP network objects, this will no longer be necessary, as the
- # parameter type will handle the validation itself (see
- # <https://fedorahosted.org/freeipa/ticket/2266>).
- if 'name_from_ip' in options:
- self.obj.params['name_from_ip'](unicode(options['name_from_ip']))
- return super(dnszone_find, self).args_options_2_params(*args, **options)
-
- def args_options_2_entry(self, *args, **options):
- if 'name_from_ip' in options:
- if 'idnsname' not in options:
- options['idnsname'] = self.obj.params['idnsname'].get_default(**options)
- del options['name_from_ip']
- search_kw = super(dnszone_find, self).args_options_2_entry(*args,
- **options)
- name = search_kw.get('idnsname')
- if name:
- search_kw['idnsname'] = [name, name.relativize(DNSName.root)]
- return search_kw
-
- takes_options = LDAPSearch.takes_options + (
+ takes_options = DNSZoneBase_find.takes_options + (
Flag('forward_only',
label=_('Forward zones only'),
cli_name='forward_only',
@@ -2225,7 +2392,8 @@ class dnszone_find(LDAPSearch):
def pre_callback(self, ldap, filter, attrs_list, base_dn, scope, *args, **options):
assert isinstance(base_dn, DN)
- filter = _create_idn_filter(self, ldap, *args, **options)
+ filter, base, dn = super(dnszone_find, self).pre_callback(ldap, filter,
+ attrs_list, base_dn, scope, *args, **options)
if options.get('forward_only', False):
search_kw = {}
@@ -2248,17 +2416,9 @@ class dnszone_find(LDAPSearch):
@register()
-class dnszone_show(LDAPRetrieve):
+class dnszone_show(DNSZoneBase_show):
__doc__ = _('Display information about a DNS zone (SOA record).')
- has_output_params = LDAPRetrieve.has_output_params + dnszone_output_params
-
- def pre_callback(self, ldap, dn, attrs_list, *keys, **options):
- assert isinstance(dn, DN)
- if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
- self.obj.handle_not_found(*keys)
- return dn
-
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
assert isinstance(dn, DN)
self.obj._rr_zone_postprocess(entry_attrs, **options)
@@ -2267,132 +2427,26 @@ class dnszone_show(LDAPRetrieve):
@register()
-class dnszone_disable(LDAPQuery):
+class dnszone_disable(DNSZoneBase_disable):
__doc__ = _('Disable DNS Zone.')
-
- has_output = output.standard_value
msg_summary = _('Disabled DNS zone "%(value)s"')
- def execute(self, *keys, **options):
- ldap = self.obj.backend
-
- dn = self.obj.get_dn(*keys, **options)
- entry = ldap.get_entry(dn, ['idnszoneactive', 'objectclass'])
- if not _check_entry_objectclass(entry, self.obj.object_class):
- self.obj.handle_not_found(*keys)
-
- entry['idnszoneactive'] = ['FALSE']
-
- try:
- ldap.update_entry(entry)
- except errors.EmptyModlist:
- pass
-
- return dict(result=True, value=pkey_to_value(keys[-1], options))
-
-
@register()
-class dnszone_enable(LDAPQuery):
+class dnszone_enable(DNSZoneBase_enable):
__doc__ = _('Enable DNS Zone.')
-
- has_output = output.standard_value
msg_summary = _('Enabled DNS zone "%(value)s"')
- def execute(self, *keys, **options):
- ldap = self.obj.backend
-
- dn = self.obj.get_dn(*keys, **options)
- entry = ldap.get_entry(dn, ['idnszoneactive', 'objectclass'])
- if not _check_entry_objectclass(entry, self.obj.object_class):
- self.obj.handle_not_found(*keys)
-
- entry['idnszoneactive'] = ['TRUE']
-
- try:
- ldap.update_entry(entry)
- except errors.EmptyModlist:
- pass
-
- return dict(result=True, value=pkey_to_value(keys[-1], options))
-
@register()
-class dnszone_add_permission(LDAPQuery):
+class dnszone_add_permission(DNSZoneBase_add_permission):
__doc__ = _('Add a permission for per-zone access delegation.')
- has_output = _output_permissions
- msg_summary = _('Added system permission "%(value)s"')
-
- def execute(self, *keys, **options):
- ldap = self.obj.backend
- dn = self.obj.get_dn(*keys, **options)
-
- try:
- entry_attrs = ldap.get_entry(dn, ['objectclass'])
- except errors.NotFound:
- self.obj.handle_not_found(*keys)
- else:
- if not _check_entry_objectclass(entry_attrs, self.obj.object_class):
- self.obj.handle_not_found(*keys)
-
- permission_name = self.obj.permission_name(keys[-1])
- permission = api.Command['permission_add_noaci'](permission_name,
- ipapermissiontype=u'SYSTEM'
- )['result']
-
- dnszone_ocs = entry_attrs.get('objectclass')
- if dnszone_ocs:
- for oc in dnszone_ocs:
- if oc.lower() == 'ipadnszone':
- break
- else:
- dnszone_ocs.append('ipadnszone')
-
- entry_attrs['managedby'] = [permission['dn']]
- ldap.update_entry(entry_attrs)
-
- return dict(
- result=True,
- value=pkey_to_value(permission_name, options),
- )
-
@register()
-class dnszone_remove_permission(LDAPQuery):
+class dnszone_remove_permission(DNSZoneBase_remove_permission):
__doc__ = _('Remove a permission for per-zone access delegation.')
- has_output = _output_permissions
- msg_summary = _('Removed system permission "%(value)s"')
-
- def execute(self, *keys, **options):
- ldap = self.obj.backend
- dn = self.obj.get_dn(*keys, **options)
- try:
- entry = ldap.get_entry(dn, ['managedby', 'objectclass'])
- except errors.NotFound:
- self.obj.handle_not_found(*keys)
- else:
- if not _check_entry_objectclass(entry, self.obj.object_class):
- self.obj.handle_not_found(*keys)
-
- entry['managedby'] = None
-
- try:
- ldap.update_entry(entry)
- except errors.EmptyModlist:
- # managedBy attribute is clean, lets make sure there is also no
- # dangling DNS zone permission
- pass
-
- permission_name = self.obj.permission_name(keys[-1])
- api.Command['permission_del'](permission_name, force=True)
-
- return dict(
- result=True,
- value=pkey_to_value(permission_name, options),
- )
-
@register()
class dnsrecord(LDAPObject):
@@ -2482,8 +2536,8 @@ class dnsrecord(LDAPObject):
addr_len = len(addr.labels)
- #Classless zones (0/25.0.0.10.in-addr.arpa.) -> skip check
- #zone has to be checked without reverse domain suffix (in-addr.arpa.)
+ # Classless zones (0/25.0.0.10.in-addr.arpa.) -> skip check
+ # zone has to be checked without reverse domain suffix (in-addr.arpa.)
for sign in ('/', '-'):
for name in (zone, addr):
for label in name.labels:
@@ -2540,7 +2594,7 @@ class dnsrecord(LDAPObject):
def get_dn(self, *keys, **options):
- dn = self.check_zone(keys[-2])
+ dn = self.check_zone(keys[-2], **options)
if self.is_pkey_zone_record(*keys):
return dn
@@ -3449,7 +3503,7 @@ class dnsrecord_del(LDAPUpdate):
deleted_values = []
for rec_value in dns_record[param.name]:
user_del_value = self.Backend.textui.prompt_yesno(
- _("Delete %(name)s '%(value)s'?") \
+ _("Delete %(name)s '%(value)s'?")
% dict(name=param.label, value=rec_value), default=False)
if user_del_value is True:
deleted_values.append(rec_value)
@@ -3674,110 +3728,30 @@ class dnsconfig_show(LDAPRetrieve):
@register()
-class dnsforwardzone(LDAPObject):
+class dnsforwardzone(DNSZoneBase):
"""
DNS Forward zone, container for resource records.
"""
- container_dn = api.env.container_dns
object_name = _('DNS forward zone')
object_name_plural = _('DNS forward zones')
- object_class = ['top', 'idnsforwardzone']
- possible_objectclasses = ['ipadnszone']
- default_attributes = [
- 'idnsname', 'idnszoneactive', 'idnsforwarders', 'idnsforwardpolicy'
- ]
+ object_class = DNSZoneBase.object_class + ['idnsforwardzone']
label = _('DNS Forward Zones')
label_singular = _('DNS Forward Zone')
default_forward_policy = u'first'
- takes_params = (
- DNSNameParam('idnsname',
- only_absolute=True,
- cli_name='name',
- label=_('Forward zone name'),
- doc=_('Forward zone name (FQDN)'),
- default_from=lambda name_from_ip: _reverse_zone_name(name_from_ip),
- primary_key=True,
- ),
- Str('name_from_ip?', _validate_ipnet,
- label=_('Reverse zone IP network'),
- doc=_('IP network to create reverse zone name from'),
- flags=('virtual_attribute',),
- ),
- Bool('idnszoneactive?',
- cli_name='zone_active',
- label=_('Active zone'),
- doc=_('Is zone active?'),
- flags=['no_create', 'no_update'],
- attribute=True,
- ),
- Str('idnsforwarders*',
- _validate_bind_forwarder,
- cli_name='forwarder',
- label=_('Zone forwarders'),
- doc=_('Per-zone forwarders. A custom port can be specified '
- 'for each forwarder using a standard format "IP_ADDRESS port PORT"'),
- csv=True,
- ),
- StrEnum('idnsforwardpolicy?',
- cli_name='forward_policy',
- label=_('Forward policy'),
- doc=_('Per-zone conditional forwarding policy. Set to "none" to '
- 'disable forwarding.'),
- values=(u'only', u'first', u'none'),
- ),
-
- )
# managed_permissions: permissions was apllied in dnszone class, do NOT
# add them here, they should not be applied twice.
- def get_dn(self, *keys, **options):
- zone = keys[-1]
- assert isinstance(zone, DNSName)
- assert zone.is_absolute()
- zone = zone.ToASCII()
-
- # try first relative name, a new zone has to be added as absolute
- # otherwise ObjectViolation is raised
- zone = zone[:-1]
- dn = super(dnsforwardzone, self).get_dn(zone, **options)
- try:
- self.backend.get_entry(dn, [''])
- except errors.NotFound:
- zone = u"%s." % zone
- dn = super(dnsforwardzone, self).get_dn(zone, **options)
-
- return dn
-
- def permission_name(self, zone):
- assert isinstance(zone, DNSName)
- return u"Manage DNS zone %s" % zone.ToASCII()
-
@register()
-class dnsforwardzone_add(LDAPCreate):
+class dnsforwardzone_add(DNSZoneBase_add):
__doc__ = _('Create new DNS forward zone.')
- has_output_params = LDAPCreate.has_output_params + dnszone_output_params
-
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
assert isinstance(dn, DN)
- if not dns_container_exists(self.api.Backend.ldap2):
- raise errors.NotFound(reason=_('DNS is not configured'))
-
- try:
- entry = ldap.get_entry(dn)
- except errors.NotFound:
- pass
- else:
- if _check_entry_objectclass(entry, self.obj.object_class):
- self.obj.handle_duplicate_entry(*keys)
- else:
- raise errors.DuplicateEntry(
- message=_(u'Only one zone type is allowed per zone name')
- )
- entry_attrs['idnszoneactive'] = 'TRUE'
+ dn = super(dnsforwardzone_add, self).pre_callback(ldap, dn,
+ entry_attrs, attrs_list, *keys, **options)
if 'idnsforwardpolicy' not in entry_attrs:
entry_attrs['idnsforwardpolicy'] = self.obj.default_forward_policy
@@ -3789,39 +3763,18 @@ class dnsforwardzone_add(LDAPCreate):
return dn
- def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
- assert isinstance(dn, DN)
- return dn
-
@register()
-class dnsforwardzone_del(LDAPDelete):
+class dnsforwardzone_del(DNSZoneBase_del):
__doc__ = _('Delete DNS forward zone.')
msg_summary = _('Deleted DNS forward zone "%(value)s"')
- def pre_callback(self, ldap, dn, *nkeys, **options):
- assert isinstance(dn, DN)
- if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
- self.obj.handle_not_found(*nkeys)
- return dn
-
- def post_callback(self, ldap, dn, *keys, **options):
- try:
- api.Command['permission_del'](self.obj.permission_name(keys[-1]),
- force=True)
- except errors.NotFound:
- pass
-
- return True
-
@register()
-class dnsforwardzone_mod(LDAPUpdate):
+class dnsforwardzone_mod(DNSZoneBase_mod):
__doc__ = _('Modify DNS forward zone.')
- has_output_params = LDAPUpdate.has_output_params + dnszone_output_params
-
def pre_callback(self, ldap, dn, entry_attrs, attrs_list, *keys, **options):
try:
entry = ldap.get_entry(dn)
@@ -3850,184 +3803,36 @@ class dnsforwardzone_mod(LDAPUpdate):
return dn
- def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
- return dn
-
@register()
-class dnsforwardzone_find(LDAPSearch):
+class dnsforwardzone_find(DNSZoneBase_find):
__doc__ = _('Search for DNS forward zones.')
- has_output_params = LDAPSearch.has_output_params + dnszone_output_params
-
- def args_options_2_params(self, *args, **options):
- # FIXME: Check that name_from_ip is valid. This is necessary because
- # custom validation rules, including _validate_ipnet, are not
- # used when doing a search. Once we have a parameter type for
- # IP network objects, this will no longer be necessary, as the
- # parameter type will handle the validation itself (see
- # <https://fedorahosted.org/freeipa/ticket/2266>).
- if 'name_from_ip' in options:
- self.obj.params['name_from_ip'](unicode(options['name_from_ip']))
- return super(dnsforwardzone_find, self).args_options_2_params(*args, **options)
-
- def args_options_2_entry(self, *args, **options):
- if 'name_from_ip' in options:
- if 'idnsname' not in options:
- options['idnsname'] = self.obj.params['idnsname'].get_default(**options)
- del options['name_from_ip']
- search_kw = super(dnsforwardzone_find, self).args_options_2_entry(*args,
- **options)
- name = search_kw.get('idnsname')
- if name:
- search_kw['idnsname'] = [name, name.relativize(DNSName.root)]
- return search_kw
-
- def pre_callback(self, ldap, filter, attrs_list, base_dn, scope, *args, **options):
- assert isinstance(base_dn, DN)
-
- filter = _create_idn_filter(self, ldap, *args, **options)
-
- return (filter, base_dn, scope)
-
- def post_callback(self, ldap, entries, truncated, *args, **options):
- return truncated
-
@register()
-class dnsforwardzone_show(LDAPRetrieve):
+class dnsforwardzone_show(DNSZoneBase_show):
__doc__ = _('Display information about a DNS forward zone.')
has_output_params = LDAPRetrieve.has_output_params + dnszone_output_params
- def pre_callback(self, ldap, dn, attrs_list, *keys, **options):
- assert isinstance(dn, DN)
- if not _check_DN_objectclass(ldap, dn, self.obj.object_class):
- self.obj.handle_not_found(*keys)
- return dn
-
@register()
-class dnsforwardzone_disable(LDAPQuery):
+class dnsforwardzone_disable(DNSZoneBase_disable):
__doc__ = _('Disable DNS Forward Zone.')
-
- has_output = output.standard_value
msg_summary = _('Disabled DNS forward zone "%(value)s"')
- def execute(self, *keys, **options):
- ldap = self.obj.backend
-
- dn = self.obj.get_dn(*keys, **options)
- entry = ldap.get_entry(dn, ['idnszoneactive', 'objectclass'])
- if not _check_entry_objectclass(entry, self.obj.object_class):
- self.obj.handle_not_found(*keys)
-
- entry['idnszoneactive'] = ['FALSE']
-
- try:
- ldap.update_entry(entry)
- except errors.EmptyModlist:
- pass
-
- return dict(result=True, value=pkey_to_value(keys[-1], options))
-
@register()
-class dnsforwardzone_enable(LDAPQuery):
+class dnsforwardzone_enable(DNSZoneBase_enable):
__doc__ = _('Enable DNS Forward Zone.')
-
- has_output = output.standard_value
msg_summary = _('Enabled DNS forward zone "%(value)s"')
- def execute(self, *keys, **options):
- ldap = self.obj.backend
-
- dn = self.obj.get_dn(*keys, **options)
- entry = ldap.get_entry(dn, ['idnszoneactive', 'objectclass'])
- if not _check_entry_objectclass(entry, self.obj.object_class):
- self.obj.handle_not_found(*keys)
- entry['idnszoneactive'] = ['TRUE']
-
- try:
- ldap.update_entry(entry)
- except errors.EmptyModlist:
- pass
-
- return dict(result=True, value=pkey_to_value(keys[-1], options))
-
@register()
-class dnsforwardzone_add_permission(LDAPQuery):
- __doc__ = _('Add a permission for per-zone access delegation.')
-
- has_output = _output_permissions
- msg_summary = _('Added system permission "%(value)s"')
-
- def execute(self, *keys, **options):
- ldap = self.obj.backend
- dn = self.obj.get_dn(*keys, **options)
-
- try:
- entry_attrs = ldap.get_entry(dn, ['objectclass'])
- except errors.NotFound:
- self.obj.handle_not_found(*keys)
- else:
- if not _check_entry_objectclass(entry_attrs, self.obj.object_class):
- self.obj.handle_not_found(*keys)
-
- permission_name = self.obj.permission_name(keys[-1])
- permission = api.Command['permission_add_noaci'](permission_name,
- ipapermissiontype=u'SYSTEM'
- )['result']
-
- dnszone_ocs = entry_attrs.get('objectclass')
- if dnszone_ocs:
- for oc in dnszone_ocs:
- if oc.lower() == 'ipadnszone':
- break
- else:
- dnszone_ocs.append('ipadnszone')
-
- entry_attrs['managedby'] = [permission['dn']]
- ldap.update_entry(entry_attrs)
-
- return dict(
- result=True,
- value=pkey_to_value(permission_name, options),
- )
+class dnsforwardzone_add_permission(DNSZoneBase_add_permission):
+ __doc__ = _('Add a permission for per-forward zone access delegation.')
@register()
-class dnsforwardzone_remove_permission(LDAPQuery):
- __doc__ = _('Remove a permission for per-zone access delegation.')
-
- has_output = _output_permissions
- msg_summary = _('Removed system permission "%(value)s"')
-
- def execute(self, *keys, **options):
- ldap = self.obj.backend
- dn = self.obj.get_dn(*keys, **options)
- try:
- entry = ldap.get_entry(dn, ['managedby', 'objectclass'])
- except errors.NotFound:
- self.obj.handle_not_found(*keys)
- else:
- if not _check_entry_objectclass(entry, self.obj.object_class):
- self.obj.handle_not_found(*keys)
-
- entry['managedby'] = None
-
- try:
- ldap.update_entry(entry)
- except errors.EmptyModlist:
- # managedBy attribute is clean, lets make sure there is also no
- # dangling DNS zone permission
- pass
-
- permission_name = self.obj.permission_name(keys[-1])
- api.Command['permission_del'](permission_name, force=True)
-
- return dict(
- result=True,
- value=pkey_to_value(permission_name, options),
- )
+class dnsforwardzone_remove_permission(DNSZoneBase_remove_permission):
+ __doc__ = _('Remove a permission for per-forward zone access delegation.')