summaryrefslogtreecommitdiffstats
path: root/src/tests/intg/sssd_ldb.py
blob: bf5264b77880bbf4ae085f12ebf6b2e58c3e28df (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#
# SSSD integration test - access the ldb cache
#
# Copyright (c) 2016 Red Hat, Inc.
#
# This is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import ldb
import config


class CacheType(object):
    """Selectors for the ldb cache files a connection can be opened on.

    ``sysdb`` picks the ``cache_<domain>.ldb`` file and ``timestamps``
    picks the ``timestamps_<domain>.ldb`` file.
    """

    sysdb, timestamps = 1, 2


class TsCacheEntry(object):
    """Selectors for the kind of cache entry to look up.

    ``user`` maps to the ``cn=users`` container and ``group`` to the
    ``cn=groups`` container of a domain.
    """

    user, group = 1, 2


class SssdLdb(object):
    """Helper for reading entry attributes from the SSSD ldb caches.

    On construction, one connection is opened to the sysdb cache file and
    one to the timestamps cache file of a single domain. Both files are
    looked up under ``config.DB_PATH``.
    """

    def __init__(self, domain_name):
        """Open the sysdb and timestamps caches of ``domain_name``."""
        self._domain_name = domain_name
        self._sysdb = self._create_dbconn(CacheType.sysdb,
                                          domain_name)
        self._timestamps = self._create_dbconn(CacheType.timestamps,
                                               domain_name)

    def _create_dbconn(self, cache_type, domain_name):
        """Return a new ldb connection to the requested cache file.

        ``cache_type`` is one of the ``CacheType`` values; it selects
        between ``cache_<domain>.ldb`` and ``timestamps_<domain>.ldb``.

        Raises ValueError if ``cache_type`` is not a known cache type.
        """
        if cache_type == CacheType.sysdb:
            db_path = os.path.join(config.DB_PATH,
                                   "cache_%s.ldb" % domain_name)
        elif cache_type == CacheType.timestamps:
            db_path = os.path.join(config.DB_PATH,
                                   "timestamps_%s.ldb" % domain_name)
        else:
            raise ValueError("Unknown cache type\n")

        pyldb = ldb.Ldb()
        pyldb.connect(db_path)
        return pyldb

    def _get_dbconn(self, cache_type):
        """Return the connection opened in __init__ for ``cache_type``.

        Raises ValueError if ``cache_type`` is not a known cache type, in
        line with _create_dbconn(), instead of handing back None and
        letting the caller fail later with an unrelated AttributeError.
        """
        if cache_type == CacheType.sysdb:
            return self._sysdb
        if cache_type == CacheType.timestamps:
            return self._timestamps
        raise ValueError("Unknown cache type\n")

    def _entry_basedn(self, entry_type):
        """Return the DN of the container holding ``entry_type`` entries.

        The result has the form ``cn=users,cn=<domain>,cn=sysdb`` or
        ``cn=groups,cn=<domain>,cn=sysdb``.

        Raises ValueError if ``entry_type`` is not a ``TsCacheEntry`` value.
        """
        if entry_type == TsCacheEntry.user:
            rdn = "users"
        elif entry_type == TsCacheEntry.group:
            rdn = "groups"
        else:
            raise ValueError("Unknown entry type\n")
        return "cn=%s,cn=%s,cn=sysdb" % (rdn, self._domain_name)

    def _basedn(self, name, entry_type):
        """Return the DN of the entry called ``name`` of ``entry_type``.

        NOTE(review): ``name`` is inserted into the DN verbatim; names
        containing DN special characters are presumably not expected here.
        """
        return "name=%s,%s" % (name, self._entry_basedn(entry_type))

    def get_entry_attr(self, cache_type, entry_type, name, attr):
        """Return the first value of ``attr`` for one cached entry.

        ``cache_type`` (a ``CacheType`` value) selects the cache to query,
        ``entry_type`` (a ``TsCacheEntry`` value) and ``name`` identify the
        entry.

        Returns None when the search does not yield exactly one entry or
        when that entry does not carry ``attr``.

        Raises ValueError for an unknown ``cache_type`` or ``entry_type``.
        """
        dbconn = self._get_dbconn(cache_type)
        basedn = self._basedn(name, entry_type)

        res = dbconn.search(base=basedn, scope=ldb.SCOPE_BASE, attrs=[attr])
        if res.count != 1:
            return None

        # The entry may exist without the requested attribute, in which
        # case the message lookup yields None rather than an element.
        element = res.msgs[0].get(attr)
        if element is None:
            return None
        return element.get(0)