# path: root/ipatests/test_integration/test_dnssec.py
# blob: 74dc1be25476353e676f2601ace673212234df63 (plain)
#
# Copyright (C) 2015  FreeIPA Contributors see COPYING for license
#

import dns.dnssec
import dns.resolver
import dns.name
import time

from ipatests.test_integration.base import IntegrationTest
from ipatests.test_integration import tasks
from ipaplatform.paths import paths

# Zone names used by the tests below (absolute names, with trailing dot).
# Zone added on the master in TestInstallDNSSECLast.
test_zone = "dnssec.test."
# Zone added on the replica in TestInstallDNSSECLast.
test_zone_repl = "dnssec-replica.test."
# The DNS root zone; signed and used as trust anchor in TestInstallDNSSECFirst.
root_zone = "."
# Signed zone that is delegated from the root zone in test_chain_of_trust.
example_test_zone = "example.test."


def resolve_with_dnssec(nameserver, query, log, rtype="SOA"):
    """Ask a single nameserver for a record with DNSSEC data requested.

    :param nameserver: address of the only server the resolver will ask
    :param query: name to look up
    :param log: logger (part of the signature, not used by this function)
    :param rtype: record type to request, "SOA" by default
    :return: the answer object returned by the resolver's ``query()`` call
    """
    resolver = dns.resolver.Resolver()
    resolver.nameservers = [nameserver]
    # give up after at most 10 seconds
    resolver.lifetime = 10

    # header flags: Authenticated Data + Checking Disabled
    header_flags = dns.flags.AD | dns.flags.CD
    resolver.set_flags(header_flags)

    # EDNS version 0 with the DNSSEC-OK (DO) flag set
    resolver.use_edns(0, dns.flags.DO, 0)

    return resolver.query(query, rtype)


def is_record_signed(nameserver, query, log, rtype="SOA"):
    """Check whether the answer for ``query`` contains a covering RRSIG.

    The record is resolved through ``resolve_with_dnssec`` and the answer
    section is searched for an RRSIG rrset covering ``rtype``.

    :param nameserver: nameserver to query
    :param query: name to look up
    :param log: logger, handed on to ``resolve_with_dnssec``
    :param rtype: record type whose signature is looked for
    :return: True if such an RRSIG rrset is present; False if it is missing
        (KeyError) or the lookup raised a DNSException
    """
    try:
        answer = resolve_with_dnssec(nameserver, query, log, rtype=rtype)
        response = answer.response
        # find_rrset() raises KeyError when no matching rrset exists
        response.find_rrset(
            response.answer,
            dns.name.from_text(query),
            dns.rdataclass.IN,
            dns.rdatatype.RRSIG,
            dns.rdatatype.from_text(rtype),
        )
    except KeyError:
        return False
    except dns.exception.DNSException:
        return False
    else:
        return True


def wait_until_record_is_signed(nameserver, record, log, rtype="SOA",
                                timeout=100):
    """
    Poll a nameserver until the record is signed or the timeout passes.

    The check is repeated with a one second pause between attempts for as
    long as the deadline (start time + ``timeout``) has not been reached.

    :param nameserver: nameserver to query
    :param record: name to query
    :param log: logger; receives one info message before polling starts
    :param rtype: record type
    :param timeout: maximum time to wait, in seconds
    :return: True if the record is signed, False on timeout
    """
    log.info("Waiting for signed %s record of %s from server %s (timeout %s "
             "sec)", rtype, record, nameserver, timeout)
    deadline = time.time() + timeout
    while time.time() < deadline:
        signed = is_record_signed(nameserver, record, log, rtype=rtype)
        if signed:
            return True
        # not signed yet, pause before the next attempt
        time.sleep(1)
    return False


class TestInstallDNSSECLast(IntegrationTest):
    """Simple DNSSEC test

    Install a server and a replica with DNS, then reinstall server
    as DNSSEC master
    """
    num_replicas = 1
    topology = 'star'

    @classmethod
    def install(cls, mh):
        """Install the master and one replica, both with DNS set up."""
        tasks.install_master(cls.master, setup_dns=True)
        tasks.install_replica(cls.master, cls.replicas[0], setup_dns=True)

    def test_install_dnssec_master(self):
        """Run ``ipa-dns-install --dnssec-master`` on the master.

        Both master and replica have DNS installed at this point.
        """
        args = [
            "ipa-dns-install",
            "--dnssec-master",
            "--forwarder", self.master.config.dns_forwarder,
            "-p", self.master.config.dirman_password,
            "-U",
        ]
        self.master.run_command(args)

    def test_if_zone_is_signed_master(self):
        """Add a signed zone on the master; check master, then replica."""
        # add zone with enabled DNSSEC signing on master
        args = [
            "ipa",
            "dnszone-add", test_zone,
            "--dnssec", "true",
        ]
        self.master.run_command(args)

        # test master
        assert wait_until_record_is_signed(
            self.master.ip, test_zone, self.log, timeout=100
        ), "Zone %s is not signed (master)" % test_zone

        # test replica
        assert wait_until_record_is_signed(
            self.replicas[0].ip, test_zone, self.log, timeout=200
        ), "DNS zone %s is not signed (replica)" % test_zone

    def test_if_zone_is_signed_replica(self):
        """Add a signed zone on the replica; check replica, then master."""
        # add zone with enabled DNSSEC signing on replica
        args = [
            "ipa",
            "dnszone-add", test_zone_repl,
            "--dnssec", "true",
        ]
        self.replicas[0].run_command(args)

        # test replica
        assert wait_until_record_is_signed(
            self.replicas[0].ip, test_zone_repl, self.log, timeout=300
        ), "Zone %s is not signed (replica)" % test_zone_repl

        # we do not need to wait long here: on the master zones should be
        # signed faster than on replicas

        # the zone under test is test_zone_repl, so report that name
        assert wait_until_record_is_signed(
            self.master.ip, test_zone_repl, self.log, timeout=5
        ), "DNS zone %s is not signed (master)" % test_zone_repl


class TestInstallDNSSECFirst(IntegrationTest):
    """Simple DNSSEC test

    The master gets DNS with the DNSSEC master role first, the replica with
    DNS is installed afterwards.
    """
    num_replicas = 1
    topology = 'star'

    @classmethod
    def install(cls, mh):
        """Install master as DNSSEC master, add replica, back up trust keys."""
        master = cls.master
        tasks.install_master(master, setup_dns=False)
        master.run_command([
            "ipa-dns-install",
            "--dnssec-master",
            "--forwarder", master.config.dns_forwarder,
            "-p", master.config.dirman_password,
            "-U",
        ])

        replica = cls.replicas[0]
        tasks.install_replica(master, replica, setup_dns=True)

        # back up the trusted key file on both hosts; the tests overwrite it
        for host in (master, replica):
            tasks.backup_file(host, paths.DNSSEC_TRUSTED_KEY)

    @classmethod
    def uninstall(cls, mh):
        """Restore the backed-up files, then run the regular uninstall."""
        # restore trusted key
        for host in (cls.master, cls.replicas[0]):
            tasks.restore_files(host)

        super(TestInstallDNSSECFirst, cls).uninstall(mh)

    def test_sign_root_zone(self):
        """Create a signed root zone and wait for it on master and replica."""
        master = self.master
        master.run_command(
            ["ipa", "dnszone-add", root_zone, "--dnssec", "true"])

        # make BIND happy, and delegate zone which contains A record of master
        delegation_cmd = [
            "ipa", "dnsrecord-add", root_zone, master.domain.name,
            "--ns-rec=" + master.hostname
        ]
        master.run_command(delegation_cmd)

        # the master has to serve the signed zone ...
        assert wait_until_record_is_signed(
            master.ip, root_zone, self.log, timeout=100
        ), "Zone %s is not signed (master)" % root_zone

        # ... and so does the replica
        assert wait_until_record_is_signed(
            self.replicas[0].ip, root_zone, self.log, timeout=300
        ), "Zone %s is not signed (replica)" % root_zone

    def test_chain_of_trust(self):
        """
        Validate signed DNS records, using our own signed root zone

        A signed test zone is created, DS records made from its KSKs are
        added to the root zone, the root zone KSKs are written to the
        trusted key file on both hosts and ``drill`` is run on both hosts
        to check the signature chain.
        :return:
        """
        master = self.master

        # add test zone
        master.run_command(
            ["ipa", "dnszone-add", example_test_zone, "--dnssec", "true"])

        # wait until zone is signed
        assert wait_until_record_is_signed(
            master.ip, example_test_zone, self.log, timeout=100
        ), "Zone %s is not signed (master)" % example_test_zone

        # fetch the DNSKEY records of the test zone
        zone_answer = resolve_with_dnssec(master.ip, example_test_zone,
                                          self.log, rtype="DNSKEY")
        zone_response = zone_answer.response
        dnskey_rrset = zone_response.get_rrset(
            zone_response.answer,
            dns.name.from_text(example_test_zone),
            dns.rdataclass.IN,
            dns.rdatatype.DNSKEY)
        assert dnskey_rrset, "No DNSKEY records received"

        self.log.debug("DNSKEY records returned: %s", dnskey_rrset.to_text())

        # build one DS record per KSK; keys whose flags are not 257 are
        # not KSKs and are skipped
        ds_records = [
            dns.dnssec.make_ds(example_test_zone, key_rdata, 'sha256')
            for key_rdata in dnskey_rrset
            if key_rdata.flags == 257
        ]
        assert ds_records, ("No KSK returned from the %s zone" %
                            example_test_zone)

        self.log.debug("DS records for %s created: %r", example_test_zone,
                       ds_records)

        # add DS records to root zone
        add_ds_cmd = [
            "ipa", "dnsrecord-add", root_zone, example_test_zone,
            # a DS record is required to coexist with an NS record
            "--ns-rec", master.hostname,
        ]
        for ds in ds_records:
            add_ds_cmd.extend(["--ds-rec", ds.to_text()])

        master.run_command(add_ds_cmd)

        # extract DNSKEY records from root zone
        root_answer = resolve_with_dnssec(master.ip, root_zone, self.log,
                                          rtype="DNSKEY")
        root_response = root_answer.response
        root_dnskeys = root_response.get_rrset(
            root_response.answer,
            dns.name.from_text(root_zone),
            dns.rdataclass.IN,
            dns.rdatatype.DNSKEY)
        assert root_dnskeys, "No DNSKEY records received"

        self.log.debug("DNSKEY records returned: %s", root_dnskeys.to_text())

        # export trust keys for root zone: only the KSKs (flags 257)
        root_key_rdatas = [
            key_rdata for key_rdata in root_dnskeys
            if key_rdata.flags == 257
        ]
        assert root_key_rdatas, "No KSK returned from the root zone"

        root_keys_rrset = dns.rrset.from_rdata_list(root_dnskeys.name,
                                                    root_dnskeys.ttl,
                                                    root_key_rdatas)
        self.log.debug("Root zone trusted key: %s", root_keys_rrset.to_text())

        # set trusted key for our root zone on both hosts
        trusted_key_text = root_keys_rrset.to_text() + '\n'
        master.put_file_contents(paths.DNSSEC_TRUSTED_KEY, trusted_key_text)
        self.replicas[0].put_file_contents(paths.DNSSEC_TRUSTED_KEY,
                                           trusted_key_text)

        # verify signatures
        drill_cmd = [
            "drill", "@localhost", "-k",
            paths.DNSSEC_TRUSTED_KEY, "-S",
            example_test_zone, "SOA"
        ]

        # test if signature chains are valid
        master.run_command(drill_cmd)
        self.replicas[0].run_command(drill_cmd)