summaryrefslogtreecommitdiffstats
path: root/src/account/test/common.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/account/test/common.py')
-rw-r--r--src/account/test/common.py164
1 files changed, 164 insertions, 0 deletions
diff --git a/src/account/test/common.py b/src/account/test/common.py
index 938fb11..2b3b423 100644
--- a/src/account/test/common.py
+++ b/src/account/test/common.py
@@ -21,10 +21,17 @@
Base class and utilities for all OpenLMI Account tests.
"""
+import hashlib
import os
+import tempfile
+import subprocess
+from collections import Counter
+from collections import OrderedDict
+import methods
from lmi.test import lmibase
+
class AccountBase(lmibase.LmiTestCase):
"""
Base class for all LMI Account tests.
@@ -38,3 +45,160 @@ class AccountBase(lmibase.LmiTestCase):
cls.user_name = os.environ.get("LMI_ACCOUNT_USER")
cls.group_name = os.environ.get("LMI_ACCOUNT_GROUP")
+
+## ......................................................................... ##
+## Validators
+## ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' ##
+
+class PasswdFile():
+ """
+ Parse /etc/passwd and perform basic heuristics to assess validity.
+
+ By heuristics, I mean it's OK to include here what is considered to
+ be "normal", or "expected" rather than strictly vaid/invalid. For
+ example, you can consider "not normal" to have UID!=GID, but depending
+ on what you did, it could be OK. OTOH, keep in mind that more specific
+ things should be in the test itself.
+ """
+
+ DEFAULT_OPTIONS = {
+ 'username_prefix': 'user',
+ 'unique': [
+ "name",
+ "uid",
+ ]
+ }
+
+ def __init__(self, options=None):
+ self.options = self.__class__.DEFAULT_OPTIONS
+ if options is not None:
+ self.options.update(options)
+ self.users = []
+
+ with open('/etc/passwd') as pf:
+ lines = pf.readlines()
+ self.fulltext = "".join(lines)
+
+ for line in lines:
+ fields = line.split(":")
+ user = {
+ "name": fields[0],
+ "password": fields[1],
+ "uid": fields[2],
+ "gid": fields[3],
+ "gecos": fields[4],
+ "directory": fields[5],
+ "shell": fields[6],
+ }
+ if user['name'].startswith(self.options['username_prefix']):
+ self.users.append(user)
+
+ def find_dups(self):
+ """
+ Find dups in fields that should be unique
+ """
+ dups = Counter()
+ for field in self.options['unique']:
+ if not methods.field_is_unique(field, self.users):
+ dups[field] += 1
+ return dict(dups)
+
+ def get_errors(self):
+ """
+ Get hash of errors.
+ """
+ errlist = {}
+ dups = self.find_dups()
+ if dups:
+ errlist['duplicates'] = dups
+ return errlist
+
+ def get_names(self):
+ """
+ Get list of user names
+ """
+ return [u['name'] for u in self.users]
+
+
+## ......................................................................... ##
+## Other helpers
+## ''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''' ##
+
+class BackupStorage():
+ """
+ Simple file backup storage.
+
+ * Only supports files.
+ * Only supports absolute paths.
+ * Consecutive backups rewrite each other.
+ * Does not autodestroy the backup.
+ """
+
+ def __init__(self):
+ self.root = tempfile.mkdtemp(prefix=self.__class__.__name__ + ".")
+ self.backups = OrderedDict()
+ subprocess.check_call(["mkdir", "-p", self.root])
+
+ def _copy(self, src, dest):
+ """
+ Copy src to dst --- force, keep meta, no questions asked
+ """
+ subprocess.check_call(["cp", "-a", "-f", src, dest])
+
+ def _get_bpath(self, path):
+ """
+ Take original path and return path to backup.
+ """
+ if not path.startswith("/"):
+ raise ValueError("only absolute paths are supported")
+ digest = hashlib.sha1(path).hexdigest()
+ return self.root + "/" + digest
+
+ def _update_index(self):
+ """
+ Create/update an index file to help in case of backup investigation
+
+ For convenience, index file is sorted by real path.
+ """
+ paths = sorted(self.backups.keys())
+ with open(self.root + "/index", "w+") as fh:
+ for path in paths:
+ fh.write("%s %s\n" % (self.backups[path], path))
+
+ def add_files(self, paths):
+ """
+ Add list of tiles to backup storage
+ """
+ for path in paths:
+ self.add_file(path)
+
+ def add_file(self, path):
+ """
+ Add a file to backup storage
+ """
+ bpath = self._get_bpath(path)
+ self._copy(path, bpath)
+ self.backups[path] = bpath
+ self._update_index()
+
+ def restore(self, path):
+ """
+ Restore particular path
+ """
+ try:
+ self._copy(self.backups[path], path)
+ except KeyError:
+ raise ValueError("path not stored: %s" % path)
+
+ def restore_all(self):
+ """
+ Restore all stored paths in same order as they were stored
+ """
+ for key in self.backups.keys():
+ self.restore(key)
+
+ def destroy_backup(self):
+ """
+ Destroy the temporary backup
+ """
+ subprocess.call(["rm", "-rf", self.root])