diff options
Diffstat (limited to 'ipapython')
-rw-r--r-- | ipapython/certmonger.py | 611 | ||||
-rw-r--r-- | ipapython/sysrestore.py | 441 |
2 files changed, 0 insertions, 1052 deletions
diff --git a/ipapython/certmonger.py b/ipapython/certmonger.py deleted file mode 100644 index 6f0948af6..000000000 --- a/ipapython/certmonger.py +++ /dev/null @@ -1,611 +0,0 @@ -# Authors: Rob Crittenden <rcritten@redhat.com> -# David Kupka <dkupka@redhat.com> -# -# Copyright (C) 2010 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# - -# Some certmonger functions, mostly around updating the request file. -# This is used so we can add tracking to the Apache and 389-ds -# server certificates created during the IPA server installation. - -from __future__ import print_function - -import os -import time -import dbus -import shlex -import subprocess -import tempfile -from ipapython.ipa_log_manager import root_logger -from ipaplatform.paths import paths -from ipaplatform import services - -DBUS_CM_PATH = '/org/fedorahosted/certmonger' -DBUS_CM_IF = 'org.fedorahosted.certmonger' -DBUS_CM_NAME = 'org.fedorahosted.certmonger' -DBUS_CM_REQUEST_IF = 'org.fedorahosted.certmonger.request' -DBUS_CM_CA_IF = 'org.fedorahosted.certmonger.ca' -DBUS_PROPERTY_IF = 'org.freedesktop.DBus.Properties' - - -class _cm_dbus_object(object): - """ - Auxiliary class for convenient DBus object handling. - """ - def __init__(self, bus, parent, object_path, object_dbus_interface, - parent_dbus_interface=None, property_interface=False): - """ - bus - DBus bus object, result of dbus.SystemBus() or dbus.SessionBus() - Object is accesible over this DBus bus instance. - object_path - path to requested object on DBus bus - object_dbus_interface - parent_dbus_interface - property_interface - create DBus property interface? True or False - """ - if bus is None or object_path is None or object_dbus_interface is None: - raise RuntimeError( - "bus, object_path and dbus_interface must not be None.") - if parent_dbus_interface is None: - parent_dbus_interface = object_dbus_interface - self.bus = bus - self.parent = parent - self.path = object_path - self.obj_dbus_if = object_dbus_interface - self.parent_dbus_if = parent_dbus_interface - self.obj = bus.get_object(parent_dbus_interface, object_path) - self.obj_if = dbus.Interface(self.obj, object_dbus_interface) - if property_interface: - self.prop_if = dbus.Interface(self.obj, DBUS_PROPERTY_IF) - - -class _certmonger(_cm_dbus_object): - """ - Create a connection to certmonger. - By default use SystemBus. When not available use private connection - over Unix socket. - This solution is really ugly and should be removed as soon as DBus - SystemBus is available at system install time. - """ - timeout = 300 - - def _start_private_conn(self): - sock_filename = os.path.join(tempfile.mkdtemp(), 'certmonger') - self._proc = subprocess.Popen([paths.CERTMONGER, '-n', '-L', '-P', - sock_filename]) - for _t in range(0, self.timeout, 5): - if os.path.exists(sock_filename): - return "unix:path=%s" % sock_filename - time.sleep(5) - self._stop_private_conn() - raise RuntimeError("Failed to start certmonger: Timed out") - - def _stop_private_conn(self): - if self._proc: - retcode = self._proc.poll() - if retcode is not None: - return - self._proc.terminate() - for _t in range(0, self.timeout, 5): - retcode = self._proc.poll() - if retcode is not None: - return - time.sleep(5) - root_logger.error("Failed to stop certmonger.") - - def __del__(self): - self._stop_private_conn() - - def __init__(self): - self._proc = None - self._bus = None - try: - self._bus = dbus.SystemBus() - except dbus.DBusException as e: - err_name = e.get_dbus_name() - if err_name not in ['org.freedesktop.DBus.Error.NoServer', - 'org.freedesktop.DBus.Error.FileNotFound']: - root_logger.error("Failed to connect to certmonger over " - "SystemBus: %s" % e) - raise - try: - self._private_sock = self._start_private_conn() - self._bus = dbus.connection.Connection(self._private_sock) - except dbus.DBusException as e: - root_logger.error("Failed to connect to certmonger over " - "private socket: %s" % e) - raise - else: - try: - self._bus.get_name_owner(DBUS_CM_NAME) - except dbus.DBusException: - try: - services.knownservices.certmonger.start() - except Exception as e: - root_logger.error("Failed to start certmonger: %s" % e) - raise - - for _t in range(0, self.timeout, 5): - try: - self._bus.get_name_owner(DBUS_CM_NAME) - break - except dbus.DBusException: - pass - time.sleep(5) - raise RuntimeError('Failed to start certmonger') - - super(_certmonger, self).__init__(self._bus, None, DBUS_CM_PATH, - DBUS_CM_IF) - - -def _get_requests(criteria=dict()): - """ - Get all requests that matches the provided criteria. - """ - if not isinstance(criteria, dict): - raise TypeError('"criteria" must be dict.') - - cm = _certmonger() - requests = [] - requests_paths = [] - if 'nickname' in criteria: - request_path = cm.obj_if.find_request_by_nickname(criteria['nickname']) - if request_path: - requests_paths = [request_path] - else: - requests_paths = cm.obj_if.get_requests() - - for request_path in requests_paths: - request = _cm_dbus_object(cm.bus, cm, request_path, DBUS_CM_REQUEST_IF, - DBUS_CM_IF, True) - for criterion in criteria: - if criterion == 'ca-name': - ca_path = request.obj_if.get_ca() - ca = _cm_dbus_object(cm.bus, cm, ca_path, DBUS_CM_CA_IF, - DBUS_CM_IF) - value = ca.obj_if.get_nickname() - else: - value = request.prop_if.Get(DBUS_CM_REQUEST_IF, criterion) - if value != criteria[criterion]: - break - else: - requests.append(request) - - return requests - - -def _get_request(criteria): - """ - Find request that matches criteria. - If 'nickname' is specified other criteria are ignored because 'nickname' - uniquely identify single request. - When multiple or none request matches specified criteria RuntimeError is - raised. - """ - requests = _get_requests(criteria) - if len(requests) == 0: - return None - elif len(requests) == 1: - return requests[0] - else: - raise RuntimeError("Criteria expected to be met by 1 request, got %s." - % len(requests)) - - -def get_request_value(request_id, directive): - """ - Get property of request. - """ - try: - request = _get_request(dict(nickname=request_id)) - except RuntimeError as e: - root_logger.error('Failed to get request: %s' % e) - raise - if request: - if directive == 'ca-name': - ca_path = request.obj_if.get_ca() - ca = _cm_dbus_object(request.bus, request, ca_path, DBUS_CM_CA_IF, - DBUS_CM_IF) - return ca.obj_if.get_nickname() - else: - return request.prop_if.Get(DBUS_CM_REQUEST_IF, directive) - else: - return None - - -def get_request_id(criteria): - """ - If you don't know the certmonger request_id then try to find it by looking - through all the requests. - - criteria is a tuple of key/value to search for. The more specific - the better. An error is raised if multiple request_ids are returned for - the same criteria. - - None is returned if none of the criteria match. - """ - try: - request = _get_request(criteria) - except RuntimeError as e: - root_logger.error('Failed to get request: %s' % e) - raise - if request: - return request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname') - else: - return None - - -def get_requests_for_dir(dir): - """ - Return a list containing the request ids for a given NSS database - directory. - """ - reqid = [] - criteria = {'cert-storage': 'NSSDB', 'key-storage': 'NSSDB', - 'cert-database': dir, 'key-database': dir, } - requests = _get_requests(criteria) - for request in requests: - reqid.append(request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname')) - - return reqid - - -def add_request_value(request_id, directive, value): - """ - Add a new directive to a certmonger request file. - """ - try: - request = _get_request({'nickname': request_id}) - except RuntimeError as e: - root_logger.error('Failed to get request: %s' % e) - raise - if request: - request.obj_if.modify({directive: value}) - - -def add_principal(request_id, principal): - """ - In order for a certmonger request to be renewable it needs a principal. - - When an existing certificate is added via start-tracking it won't have - a principal. - """ - add_request_value(request_id, 'template-principal', [principal]) - - -def add_subject(request_id, subject): - """ - In order for a certmonger request to be renwable it needs the subject - set in the request file. - - When an existing certificate is added via start-tracking it won't have - a subject_template set. - """ - add_request_value(request_id, 'template-subject', subject) - - -def request_and_wait_for_cert( - nssdb, nickname, subject, principal, passwd_fname=None, - dns=None, ca='IPA', profile=None, - pre_command=None, post_command=None): - """ - Execute certmonger to request a server certificate. - - The method also waits for the certificate to be available. - """ - reqId = request_cert(nssdb, nickname, subject, principal, - passwd_fname, dns, ca, profile, - pre_command, post_command) - state = wait_for_request(reqId, timeout=60) - ca_error = get_request_value(reqId, 'ca-error') - if state != 'MONITORING' or ca_error: - raise RuntimeError("Certificate issuance failed") - return reqId - -def request_cert( - nssdb, nickname, subject, principal, passwd_fname=None, - dns=None, ca='IPA', profile=None, pre_command=None, post_command=None): - """ - Execute certmonger to request a server certificate. - - ``dns`` - A sequence of DNS names to appear in SAN request extension. - """ - cm = _certmonger() - ca_path = cm.obj_if.find_ca_by_nickname(ca) - if not ca_path: - raise RuntimeError('{} CA not found'.format(ca)) - request_parameters = dict(KEY_STORAGE='NSSDB', CERT_STORAGE='NSSDB', - CERT_LOCATION=nssdb, CERT_NICKNAME=nickname, - KEY_LOCATION=nssdb, KEY_NICKNAME=nickname, - SUBJECT=subject, - CA=ca_path) - if principal: - request_parameters['PRINCIPAL'] = [principal] - if dns is not None and len(dns) > 0: - request_parameters['DNS'] = dns - if passwd_fname: - request_parameters['KEY_PIN_FILE'] = passwd_fname - if profile: - request_parameters['ca-profile'] = profile - - certmonger_cmd_template = paths.CERTMONGER_COMMAND_TEMPLATE - if pre_command: - if not os.path.isabs(pre_command): - pre_command = certmonger_cmd_template % (pre_command) - request_parameters['cert-presave-command'] = pre_command - if post_command: - if not os.path.isabs(post_command): - post_command = certmonger_cmd_template % (post_command) - request_parameters['cert-postsave-command'] = post_command - - result = cm.obj_if.add_request(request_parameters) - try: - if result[0]: - request = _cm_dbus_object(cm.bus, cm, result[1], DBUS_CM_REQUEST_IF, - DBUS_CM_IF, True) - else: - raise RuntimeError('add_request() returned False') - except Exception as e: - root_logger.error('Failed to create a new request: {error}' - .format(error=e)) - raise - return request.obj_if.get_nickname() - - -def start_tracking(nickname, secdir, password_file=None, command=None): - """ - Tell certmonger to track the given certificate nickname in NSS - database in secdir protected by optional password file password_file. - - command is an optional parameter which specifies a command for - certmonger to run when it renews a certificate. This command must - reside in /usr/lib/ipa/certmonger to work with SELinux. - - Returns certificate nickname. - """ - cm = _certmonger() - params = {'TRACK': True} - params['cert-nickname'] = nickname - params['cert-database'] = os.path.abspath(secdir) - params['cert-storage'] = 'NSSDB' - params['key-nickname'] = nickname - params['key-database'] = os.path.abspath(secdir) - params['key-storage'] = 'NSSDB' - ca_path = cm.obj_if.find_ca_by_nickname('IPA') - if not ca_path: - raise RuntimeError('IPA CA not found') - params['ca'] = ca_path - if command: - params['cert-postsave-command'] = command - if password_file: - params['KEY_PIN_FILE'] = os.path.abspath(password_file) - result = cm.obj_if.add_request(params) - try: - if result[0]: - request = _cm_dbus_object(cm.bus, cm, result[1], DBUS_CM_REQUEST_IF, - DBUS_CM_IF, True) - else: - raise RuntimeError('add_request() returned False') - except Exception as e: - root_logger.error('Failed to add new request: {error}' - .format(error=e)) - raise - return request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname') - - -def stop_tracking(secdir, request_id=None, nickname=None): - """ - Stop tracking the current request using either the request_id or nickname. - - Returns True or False - """ - if request_id is None and nickname is None: - raise RuntimeError('Both request_id and nickname are missing.') - - criteria = {'cert-database': secdir} - if request_id: - criteria['nickname'] = request_id - if nickname: - criteria['cert-nickname'] = nickname - try: - request = _get_request(criteria) - except RuntimeError as e: - root_logger.error('Failed to get request: %s' % e) - raise - if request: - request.parent.obj_if.remove_request(request.path) - - -def modify(request_id, profile=None): - if profile: - request = _get_request({'nickname': request_id}) - if request: - request.obj_if.modify({'template-profile': profile}) - - -def resubmit_request(request_id, profile=None): - request = _get_request({'nickname': request_id}) - if request: - if profile: - request.obj_if.modify({'template-profile': profile}) - request.obj_if.resubmit() - - -def _find_IPA_ca(): - """ - Look through all the certmonger CA files to find the one that - has id=IPA - - We can use find_request_value because the ca files have the - same file format. - """ - cm = _certmonger() - ca_path = cm.obj_if.find_ca_by_nickname('IPA') - return _cm_dbus_object(cm.bus, cm, ca_path, DBUS_CM_CA_IF, DBUS_CM_IF, True) - - -def add_principal_to_cas(principal): - """ - If the hostname we were passed to use in ipa-client-install doesn't - match the value of gethostname() then we need to append - -k host/HOSTNAME@REALM to the ca helper defined for - /usr/libexec/certmonger/ipa-submit. - - We also need to restore this on uninstall. - """ - ca = _find_IPA_ca() - if ca: - ext_helper = ca.prop_if.Get(DBUS_CM_CA_IF, 'external-helper') - if ext_helper and '-k' not in shlex.split(ext_helper): - ext_helper = '%s -k %s' % (ext_helper.strip(), principal) - ca.prop_if.Set(DBUS_CM_CA_IF, 'external-helper', ext_helper) - - -def remove_principal_from_cas(): - """ - Remove any -k principal options from the ipa_submit helper. - """ - ca = _find_IPA_ca() - if ca: - ext_helper = ca.prop_if.Get(DBUS_CM_CA_IF, 'external-helper') - if ext_helper and '-k' in shlex.split(ext_helper): - ext_helper = shlex.split(ext_helper)[0] - ca.prop_if.Set(DBUS_CM_CA_IF, 'external-helper', ext_helper) - - -def modify_ca_helper(ca_name, helper): - """ - Modify certmonger CA helper. - - Applies the new helper and return the previous configuration. - """ - bus = dbus.SystemBus() - obj = bus.get_object('org.fedorahosted.certmonger', - '/org/fedorahosted/certmonger') - iface = dbus.Interface(obj, 'org.fedorahosted.certmonger') - path = iface.find_ca_by_nickname(ca_name) - if not path: - raise RuntimeError("{} is not configured".format(ca_name)) - else: - ca_obj = bus.get_object('org.fedorahosted.certmonger', path) - ca_iface = dbus.Interface(ca_obj, - 'org.freedesktop.DBus.Properties') - old_helper = ca_iface.Get('org.fedorahosted.certmonger.ca', - 'external-helper') - ca_iface.Set('org.fedorahosted.certmonger.ca', - 'external-helper', helper) - return old_helper - - -def get_pin(token): - """ - Dogtag stores its NSS pin in a file formatted as token:PIN. - - The caller is expected to handle any exceptions raised. - """ - with open(paths.PKI_TOMCAT_PASSWORD_CONF, 'r') as f: - for line in f: - (tok, pin) = line.split('=', 1) - if token == tok: - return pin.strip() - return None - - -def dogtag_start_tracking(ca, nickname, pin, pinfile, secdir, pre_command, - post_command, profile=None): - """ - Tell certmonger to start tracking a dogtag CA certificate. These - are handled differently because their renewal must be done directly - and not through IPA. - - This uses the generic certmonger command getcert so we can specify - a different helper. - - pre_command is the script to execute before a renewal is done. - post_command is the script to execute after a renewal is done. - - Both commands can be None. - """ - - cm = _certmonger() - certmonger_cmd_template = paths.CERTMONGER_COMMAND_TEMPLATE - - params = {'TRACK': True} - params['cert-nickname'] = nickname - params['cert-database'] = os.path.abspath(secdir) - params['cert-storage'] = 'NSSDB' - params['key-nickname'] = nickname - params['key-database'] = os.path.abspath(secdir) - params['key-storage'] = 'NSSDB' - ca_path = cm.obj_if.find_ca_by_nickname(ca) - if ca_path: - params['ca'] = ca_path - if pin: - params['KEY_PIN'] = pin - if pinfile: - params['KEY_PIN_FILE'] = os.path.abspath(pinfile) - if pre_command: - if not os.path.isabs(pre_command): - pre_command = certmonger_cmd_template % (pre_command) - params['cert-presave-command'] = pre_command - if post_command: - if not os.path.isabs(post_command): - post_command = certmonger_cmd_template % (post_command) - params['cert-postsave-command'] = post_command - if profile: - params['ca-profile'] = profile - - cm.obj_if.add_request(params) - - -def check_state(dirs): - """ - Given a set of directories and nicknames verify that we are no longer - tracking certificates. - - dirs is a list of directories to test for. We will return a tuple - of nicknames for any tracked certificates found. - - This can only check for NSS-based certificates. - """ - reqids = [] - for dir in dirs: - reqids.extend(get_requests_for_dir(dir)) - - return reqids - - -def wait_for_request(request_id, timeout=120): - for _i in range(0, timeout, 5): - state = get_request_value(request_id, 'status') - root_logger.debug("certmonger request is in state %r", state) - if state in ('CA_REJECTED', 'CA_UNREACHABLE', 'CA_UNCONFIGURED', - 'NEED_GUIDANCE', 'NEED_CA', 'MONITORING'): - break - time.sleep(5) - else: - raise RuntimeError("request timed out") - - return state - -if __name__ == '__main__': - request_id = request_cert(paths.HTTPD_ALIAS_DIR, "Test", - "cn=tiger.example.com,O=IPA", - "HTTP/tiger.example.com@EXAMPLE.COM") - csr = get_request_value(request_id, 'csr') - print(csr) - stop_tracking(request_id) diff --git a/ipapython/sysrestore.py b/ipapython/sysrestore.py deleted file mode 100644 index b1bf4b912..000000000 --- a/ipapython/sysrestore.py +++ /dev/null @@ -1,441 +0,0 @@ -# Authors: Mark McLoughlin <markmc@redhat.com> -# -# Copyright (C) 2007 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# - -# -# This module provides a very simple API which allows -# ipa-xxx-install --uninstall to restore certain -# parts of the system configuration to the way it was -# before ipa-server-install was first run - -import os -import os.path -import shutil -from ipapython.ipa_log_manager import root_logger -import random - -import six -# pylint: disable=import-error -from six.moves.configparser import SafeConfigParser -# pylint: enable=import-error - -from ipaplatform.tasks import tasks -from ipaplatform.paths import paths - -if six.PY3: - unicode = str - -SYSRESTORE_PATH = paths.TMP -SYSRESTORE_INDEXFILE = "sysrestore.index" -SYSRESTORE_STATEFILE = "sysrestore.state" - - -class FileStore(object): - """Class for handling backup and restore of files""" - - def __init__(self, path = SYSRESTORE_PATH, index_file = SYSRESTORE_INDEXFILE): - """Create a _StoreFiles object, that uses @path as the - base directory. - - The file @path/sysrestore.index is used to store information - about the original location of the saved files. - """ - self._path = path - self._index = os.path.join(self._path, index_file) - - self.random = random.Random() - - self.files = {} - self._load() - - def _load(self): - """Load the file list from the index file. @files will - be an empty dictionary if the file doesn't exist. - """ - - root_logger.debug("Loading Index file from '%s'", self._index) - - self.files = {} - - p = SafeConfigParser() - p.optionxform = str - p.read(self._index) - - for section in p.sections(): - if section == "files": - for (key, value) in p.items(section): - self.files[key] = value - - - def save(self): - """Save the file list to @_index. If @files is an empty - dict, then @_index should be removed. - """ - root_logger.debug("Saving Index File to '%s'", self._index) - - if len(self.files) == 0: - root_logger.debug(" -> no files, removing file") - if os.path.exists(self._index): - os.remove(self._index) - return - - p = SafeConfigParser() - p.optionxform = str - - p.add_section('files') - for (key, value) in self.files.items(): - p.set('files', key, str(value)) - - with open(self._index, "w") as f: - p.write(f) - - def backup_file(self, path): - """Create a copy of the file at @path - so long as a copy - does not already exist - which will be restored to its - original location by restore_files(). - """ - root_logger.debug("Backing up system configuration file '%s'", path) - - if not os.path.isabs(path): - raise ValueError("Absolute path required") - - if not os.path.isfile(path): - root_logger.debug(" -> Not backing up - '%s' doesn't exist", path) - return - - _reldir, backupfile = os.path.split(path) - - filename = "" - for _i in range(8): - h = "%02x" % self.random.randint(0,255) - filename += h - filename += "-"+backupfile - - backup_path = os.path.join(self._path, filename) - if os.path.exists(backup_path): - root_logger.debug(" -> Not backing up - already have a copy of '%s'", path) - return - - shutil.copy2(path, backup_path) - - stat = os.stat(path) - - template = '{stat.st_mode},{stat.st_uid},{stat.st_gid},{path}' - self.files[filename] = template.format(stat=stat, path=path) - self.save() - - def has_file(self, path): - """Checks whether file at @path was added to the file store - - Returns #True if the file exists in the file store, #False otherwise - """ - result = False - for _key, value in self.files.items(): - _mode, _uid, _gid, filepath = value.split(',', 3) - if (filepath == path): - result = True - break - return result - - def restore_file(self, path, new_path = None): - """Restore the copy of a file at @path to its original - location and delete the copy. - - Takes optional parameter @new_path which specifies the - location where the file is to be restored. - - Returns #True if the file was restored, #False if there - was no backup file to restore - """ - - if new_path is None: - root_logger.debug("Restoring system configuration file '%s'", path) - else: - root_logger.debug("Restoring system configuration file '%s' to '%s'", path, new_path) - - if not os.path.isabs(path): - raise ValueError("Absolute path required") - if new_path is not None and not os.path.isabs(new_path): - raise ValueError("Absolute new path required") - - mode = None - uid = None - gid = None - filename = None - - for (key, value) in self.files.items(): - (mode,uid,gid,filepath) = value.split(',', 3) - if (filepath == path): - filename = key - break - - if not filename: - raise ValueError("No such file name in the index") - - backup_path = os.path.join(self._path, filename) - if not os.path.exists(backup_path): - root_logger.debug(" -> Not restoring - '%s' doesn't exist", backup_path) - return False - - if new_path is not None: - path = new_path - - shutil.copy(backup_path, path) # SELinux needs copy - os.remove(backup_path) - - os.chown(path, int(uid), int(gid)) - os.chmod(path, int(mode)) - - tasks.restore_context(path) - - del self.files[filename] - self.save() - - return True - - def restore_all_files(self): - """Restore the files in the inbdex to their original - location and delete the copy. - - Returns #True if the file was restored, #False if there - was no backup file to restore - """ - - if len(self.files) == 0: - return False - - for (filename, value) in self.files.items(): - - (mode,uid,gid,path) = value.split(',', 3) - - backup_path = os.path.join(self._path, filename) - if not os.path.exists(backup_path): - root_logger.debug(" -> Not restoring - '%s' doesn't exist", backup_path) - continue - - shutil.copy(backup_path, path) # SELinux needs copy - os.remove(backup_path) - - os.chown(path, int(uid), int(gid)) - os.chmod(path, int(mode)) - - tasks.restore_context(path) - - # force file to be deleted - self.files = {} - self.save() - - return True - - def has_files(self): - """Return True or False if there are any files in the index - - Can be used to determine if a program is configured. - """ - - return len(self.files) > 0 - - def untrack_file(self, path): - """Remove file at path @path from list of backed up files. - - Does not remove any files from the filesystem. - - Returns #True if the file was untracked, #False if there - was no backup file to restore - """ - - root_logger.debug("Untracking system configuration file '%s'", path) - - if not os.path.isabs(path): - raise ValueError("Absolute path required") - - filename = None - - for (key, value) in self.files.items(): - _mode, _uid, _gid, filepath = value.split(',', 3) - if (filepath == path): - filename = key - break - - if not filename: - raise ValueError("No such file name in the index") - - backup_path = os.path.join(self._path, filename) - if not os.path.exists(backup_path): - root_logger.debug(" -> Not restoring - '%s' doesn't exist", backup_path) - return False - - try: - os.unlink(backup_path) - except Exception as e: - root_logger.error('Error removing %s: %s' % (backup_path, str(e))) - - del self.files[filename] - self.save() - - return True - - -class StateFile(object): - """A metadata file for recording system state which can - be backed up and later restored. - StateFile gets reloaded every time to prevent loss of information - recorded by child processes. But we do not solve concurrency - because there is no need for it right now. - The format is something like: - - [httpd] - running=True - enabled=False - """ - - def __init__(self, path = SYSRESTORE_PATH, state_file = SYSRESTORE_STATEFILE): - """Create a StateFile object, loading from @path. - - The dictionary @modules, a member of the returned object, - is where the state can be modified. @modules is indexed - using a module name to return another dictionary containing - key/value pairs with the saved state of that module. - - The keys in these latter dictionaries are arbitrary strings - and the values may either be strings or booleans. - """ - self._path = os.path.join(path, state_file) - - self.modules = {} - - self._load() - - def _load(self): - """Load the modules from the file @_path. @modules will - be an empty dictionary if the file doesn't exist. - """ - root_logger.debug("Loading StateFile from '%s'", self._path) - - self.modules = {} - - p = SafeConfigParser() - p.optionxform = str - p.read(self._path) - - for module in p.sections(): - self.modules[module] = {} - for (key, value) in p.items(module): - if value == str(True): - value = True - elif value == str(False): - value = False - self.modules[module][key] = value - - def save(self): - """Save the modules to @_path. If @modules is an empty - dict, then @_path should be removed. - """ - root_logger.debug("Saving StateFile to '%s'", self._path) - - for module in list(self.modules): - if len(self.modules[module]) == 0: - del self.modules[module] - - if len(self.modules) == 0: - root_logger.debug(" -> no modules, removing file") - if os.path.exists(self._path): - os.remove(self._path) - return - - p = SafeConfigParser() - p.optionxform = str - - for module in self.modules: - p.add_section(module) - for (key, value) in self.modules[module].items(): - p.set(module, key, str(value)) - - with open(self._path, "w") as f: - p.write(f) - - def backup_state(self, module, key, value): - """Backup an item of system state from @module, identified - by the string @key and with the value @value. @value may be - a string or boolean. - """ - if not isinstance(value, (str, bool, unicode)): - raise ValueError("Only strings, booleans or unicode strings are supported") - - self._load() - - if module not in self.modules: - self.modules[module] = {} - - if key not in self.modules: - self.modules[module][key] = value - - self.save() - - def get_state(self, module, key): - """Return the value of an item of system state from @module, - identified by the string @key. - - If the item doesn't exist, #None will be returned, otherwise - the original string or boolean value is returned. - """ - self._load() - - if module not in self.modules: - return None - - return self.modules[module].get(key, None) - - def delete_state(self, module, key): - """Delete system state from @module, identified by the string - @key. - - If the item doesn't exist, no change is done. - """ - self._load() - - try: - del self.modules[module][key] - except KeyError: - pass - else: - self.save() - - def restore_state(self, module, key): - """Return the value of an item of system state from @module, - identified by the string @key, and remove it from the backed - up system state. - - If the item doesn't exist, #None will be returned, otherwise - the original string or boolean value is returned. - """ - - value = self.get_state(module, key) - - if value is not None: - self.delete_state(module, key) - - return value - - def has_state(self, module): - """Return True or False if there is any state stored for @module. - - Can be used to determine if a service is configured. - """ - - return module in self.modules |