summaryrefslogtreecommitdiffstats
path: root/ipatests/test_xmlrpc/tracker/server_plugin.py
blob: 0a78cc9ace4d008b2c6d3306c3c2819439c4ae5d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#
# Copyright (C) 2016  FreeIPA Contributors see COPYING for license
#
from __future__ import absolute_import

from ipalib import errors
from ipapython.dn import DN
from ipatests.util import assert_deepequal
from ipatests.test_xmlrpc.tracker.base import Tracker


class ServerTracker(Tracker):
    """Tracker for IPA Location tests"""
    retrieve_keys = {
        'cn', 'dn', 'ipamaxdomainlevel', 'ipamindomainlevel',
        'iparepltopomanagedsuffix_topologysuffix', 'ipalocation_location',
        'ipaserviceweight', 'enabled_role_servrole'
    }
    retrieve_all_keys = retrieve_keys | {'objectclass'}
    create_keys = retrieve_keys | {'objectclass'}
    find_keys = {
        'cn', 'dn', 'ipamaxdomainlevel', 'ipamindomainlevel',
        'ipaserviceweight',
    }
    find_all_keys = retrieve_all_keys
    update_keys = {
        'cn', 'ipamaxdomainlevel', 'ipamindomainlevel',
        'ipalocation_location', 'ipaserviceweight',
    }

    def __init__(self, name):
        super(ServerTracker, self).__init__(default_version=None)
        self.server_name = name
        self.dn = DN(
            ('cn', self.server_name),
            'cn=masters,cn=ipa,cn=etc',
            self.api.env.basedn
        )
        self.exists = True  # we cannot add server manually using server-add
        self.attrs = dict(
            dn=self.dn,
            cn=[self.server_name],
            iparepltopomanagedsuffix_topologysuffix=[u'domain', u'ca'],
            objectclass=[
                u"ipalocationmember",
                u"ipaReplTopoManagedServer",
                u"top",
                u"ipaConfigObject",
                u"nsContainer",
                u"ipaSupportedDomainLevelConfig"
            ],
            ipamaxdomainlevel=[u"1"],
            ipamindomainlevel=[u"0"],
        )
        self.exists = True

    def make_retrieve_command(self, all=False, raw=False):
        """Make function that retrieves this server using server-show"""
        return self.make_command(
            'server_show', self.name, all=all, raw=raw
        )

    def make_find_command(self, *args, **kwargs):
        """Make function that finds servers using server-find"""
        return self.make_command('server_find', *args, **kwargs)

    def make_update_command(self, updates):
        """Make function that modifies the server using server-mod"""
        return self.make_command('server_mod', self.name, **updates)

    def check_retrieve(self, result, all=False, raw=False):
        """Check `server-show` command result"""
        if all:
            expected = self.filter_attrs(self.retrieve_all_keys)
        else:
            expected = self.filter_attrs(self.retrieve_keys)
        assert_deepequal(dict(
            value=self.server_name,
            summary=None,
            result=expected,
        ), result)

    def check_find(self, result, all=False, raw=False):
        """Check `server-find` command result"""
        if all:
            expected = self.filter_attrs(self.find_all_keys)
        else:
            expected = self.filter_attrs(self.find_keys)
        assert_deepequal(dict(
            count=1,
            truncated=False,
            summary=u'1 IPA server matched',
            result=[expected],
        ), result)

    def check_find_nomatch(self, result):
        """ Check 'server-find' command result when no match is expected """
        assert_deepequal(dict(
            count=0,
            truncated=False,
            summary=u'0 IPA servers matched',
            result=[],
        ), result)

    def check_update(self, result, extra_keys=(), messages=None):
        """Check `server-update` command result"""
        expected = dict(
            value=self.server_name,
            summary=u'Modified IPA server "{server}"'.format(
                server=self.name),
            result=self.filter_attrs(self.update_keys | set(extra_keys))
            )
        if messages:
            expected['messages'] = messages

        assert_deepequal(expected, result)

    def update(self, updates, expected_updates=None, messages=None):
        if expected_updates is None:
            expected_updates = {}

        self.ensure_exists()
        command = self.make_update_command(updates)
        result = command()
        self.attrs.update(updates)
        self.attrs.update(expected_updates)
        for key, value in self.attrs.items():
            if value is None:
                del self.attrs[key]

        self.check_update(
            result,
            extra_keys=set(updates.keys()) | set(expected_updates.keys()),
            messages=messages)

    def make_fixture_clean_location(self, request):
        command = self.make_update_command({u'ipalocation_location': None})
        try:
            command()
        except errors.EmptyModlist:
            pass

        def cleanup():
            try:
                command()
            except errors.EmptyModlist:
                pass
        request.addfinalizer(cleanup)
        return self