diff options
Diffstat (limited to 'src/account/test/common.py')
-rw-r--r-- | src/account/test/common.py | 164 |
1 files changed, 164 insertions, 0 deletions
diff --git a/src/account/test/common.py b/src/account/test/common.py index 938fb11..2b3b423 100644 --- a/src/account/test/common.py +++ b/src/account/test/common.py @@ -21,10 +21,17 @@ Base class and utilities for all OpenLMI Account tests. """ +import hashlib import os +import tempfile +import subprocess +from collections import Counter +from collections import OrderedDict +import methods from lmi.test import lmibase + class AccountBase(lmibase.LmiTestCase): """ Base class for all LMI Account tests. @@ -38,3 +45,160 @@ class AccountBase(lmibase.LmiTestCase): cls.user_name = os.environ.get("LMI_ACCOUNT_USER") cls.group_name = os.environ.get("LMI_ACCOUNT_GROUP") + +## ......................................................................... ## +## Validators +## ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' ## + +class PasswdFile(): + """ + Parse /etc/passwd and perform basic heuristics to assess validity. + + By heuristics, I mean it's OK to include here what is considered to + be "normal", or "expected" rather than strictly vaid/invalid. For + example, you can consider "not normal" to have UID!=GID, but depending + on what you did, it could be OK. OTOH, keep in mind that more specific + things should be in the test itself. + """ + + DEFAULT_OPTIONS = { + 'username_prefix': 'user', + 'unique': [ + "name", + "uid", + ] + } + + def __init__(self, options=None): + self.options = self.__class__.DEFAULT_OPTIONS + if options is not None: + self.options.update(options) + self.users = [] + + with open('/etc/passwd') as pf: + lines = pf.readlines() + self.fulltext = "".join(lines) + + for line in lines: + fields = line.split(":") + user = { + "name": fields[0], + "password": fields[1], + "uid": fields[2], + "gid": fields[3], + "gecos": fields[4], + "directory": fields[5], + "shell": fields[6], + } + if user['name'].startswith(self.options['username_prefix']): + self.users.append(user) + + def find_dups(self): + """ + Find dups in fields that should be unique + """ + dups = Counter() + for field in self.options['unique']: + if not methods.field_is_unique(field, self.users): + dups[field] += 1 + return dict(dups) + + def get_errors(self): + """ + Get hash of errors. + """ + errlist = {} + dups = self.find_dups() + if dups: + errlist['duplicates'] = dups + return errlist + + def get_names(self): + """ + Get list of user names + """ + return [u['name'] for u in self.users] + + +## ......................................................................... ## +## Other helpers +## ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' ## + +class BackupStorage(): + """ + Simple file backup storage. + + * Only supports files. + * Only supports absolute paths. + * Consecutive backups rewrite each other. + * Does not autodestroy the backup. + """ + + def __init__(self): + self.root = tempfile.mkdtemp(prefix=self.__class__.__name__ + ".") + self.backups = OrderedDict() + subprocess.check_call(["mkdir", "-p", self.root]) + + def _copy(self, src, dest): + """ + Copy src to dst --- force, keep meta, no questions asked + """ + subprocess.check_call(["cp", "-a", "-f", src, dest]) + + def _get_bpath(self, path): + """ + Take original path and return path to backup. + """ + if not path.startswith("/"): + raise ValueError("only absolute paths are supported") + digest = hashlib.sha1(path).hexdigest() + return self.root + "/" + digest + + def _update_index(self): + """ + Create/update an index file to help in case of backup investigation + + For convenience, index file is sorted by real path. + """ + paths = sorted(self.backups.keys()) + with open(self.root + "/index", "w+") as fh: + for path in paths: + fh.write("%s %s\n" % (self.backups[path], path)) + + def add_files(self, paths): + """ + Add list of tiles to backup storage + """ + for path in paths: + self.add_file(path) + + def add_file(self, path): + """ + Add a file to backup storage + """ + bpath = self._get_bpath(path) + self._copy(path, bpath) + self.backups[path] = bpath + self._update_index() + + def restore(self, path): + """ + Restore particular path + """ + try: + self._copy(self.backups[path], path) + except KeyError: + raise ValueError("path not stored: %s" % path) + + def restore_all(self): + """ + Restore all stored paths in same order as they were stored + """ + for key in self.backups.keys(): + self.restore(key) + + def destroy_backup(self): + """ + Destroy the temporary backup + """ + subprocess.call(["rm", "-rf", self.root]) |