diff options
| author | Jan Cholasta <jcholast@redhat.com> | 2016-11-23 15:04:40 +0100 |
|---|---|---|
| committer | Martin Basti <mbasti@redhat.com> | 2016-11-29 14:50:51 +0100 |
| commit | 26c46a447f82b4cf37a5076b72cf6328857d5f35 (patch) | |
| tree | a0ee44b13530ed34f190e65f7e0726d88a71f304 /ipapython/certmonger.py | |
| parent | a1f260d021bf5d018e634438fde6b7c81ebbbcef (diff) | |
| download | freeipa-26c46a447f82b4cf37a5076b72cf6328857d5f35.tar.gz freeipa-26c46a447f82b4cf37a5076b72cf6328857d5f35.tar.xz freeipa-26c46a447f82b4cf37a5076b72cf6328857d5f35.zip | |
ipapython: move certmonger and sysrestore to ipalib.install
The certmonger and sysrestore modules depend on ipaplatform.
Move them to ipalib.install as they are used only from installers.
https://fedorahosted.org/freeipa/ticket/6474
Reviewed-By: Stanislav Laznicka <slaznick@redhat.com>
Diffstat (limited to 'ipapython/certmonger.py')
| -rw-r--r-- | ipapython/certmonger.py | 611 |
1 files changed, 0 insertions, 611 deletions
diff --git a/ipapython/certmonger.py b/ipapython/certmonger.py deleted file mode 100644 index 6f0948af6..000000000 --- a/ipapython/certmonger.py +++ /dev/null @@ -1,611 +0,0 @@ -# Authors: Rob Crittenden <rcritten@redhat.com> -# David Kupka <dkupka@redhat.com> -# -# Copyright (C) 2010 Red Hat -# see file 'COPYING' for use and warranty information -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. -# - -# Some certmonger functions, mostly around updating the request file. -# This is used so we can add tracking to the Apache and 389-ds -# server certificates created during the IPA server installation. - -from __future__ import print_function - -import os -import time -import dbus -import shlex -import subprocess -import tempfile -from ipapython.ipa_log_manager import root_logger -from ipaplatform.paths import paths -from ipaplatform import services - -DBUS_CM_PATH = '/org/fedorahosted/certmonger' -DBUS_CM_IF = 'org.fedorahosted.certmonger' -DBUS_CM_NAME = 'org.fedorahosted.certmonger' -DBUS_CM_REQUEST_IF = 'org.fedorahosted.certmonger.request' -DBUS_CM_CA_IF = 'org.fedorahosted.certmonger.ca' -DBUS_PROPERTY_IF = 'org.freedesktop.DBus.Properties' - - -class _cm_dbus_object(object): - """ - Auxiliary class for convenient DBus object handling. - """ - def __init__(self, bus, parent, object_path, object_dbus_interface, - parent_dbus_interface=None, property_interface=False): - """ - bus - DBus bus object, result of dbus.SystemBus() or dbus.SessionBus() - Object is accesible over this DBus bus instance. - object_path - path to requested object on DBus bus - object_dbus_interface - parent_dbus_interface - property_interface - create DBus property interface? True or False - """ - if bus is None or object_path is None or object_dbus_interface is None: - raise RuntimeError( - "bus, object_path and dbus_interface must not be None.") - if parent_dbus_interface is None: - parent_dbus_interface = object_dbus_interface - self.bus = bus - self.parent = parent - self.path = object_path - self.obj_dbus_if = object_dbus_interface - self.parent_dbus_if = parent_dbus_interface - self.obj = bus.get_object(parent_dbus_interface, object_path) - self.obj_if = dbus.Interface(self.obj, object_dbus_interface) - if property_interface: - self.prop_if = dbus.Interface(self.obj, DBUS_PROPERTY_IF) - - -class _certmonger(_cm_dbus_object): - """ - Create a connection to certmonger. - By default use SystemBus. When not available use private connection - over Unix socket. - This solution is really ugly and should be removed as soon as DBus - SystemBus is available at system install time. - """ - timeout = 300 - - def _start_private_conn(self): - sock_filename = os.path.join(tempfile.mkdtemp(), 'certmonger') - self._proc = subprocess.Popen([paths.CERTMONGER, '-n', '-L', '-P', - sock_filename]) - for _t in range(0, self.timeout, 5): - if os.path.exists(sock_filename): - return "unix:path=%s" % sock_filename - time.sleep(5) - self._stop_private_conn() - raise RuntimeError("Failed to start certmonger: Timed out") - - def _stop_private_conn(self): - if self._proc: - retcode = self._proc.poll() - if retcode is not None: - return - self._proc.terminate() - for _t in range(0, self.timeout, 5): - retcode = self._proc.poll() - if retcode is not None: - return - time.sleep(5) - root_logger.error("Failed to stop certmonger.") - - def __del__(self): - self._stop_private_conn() - - def __init__(self): - self._proc = None - self._bus = None - try: - self._bus = dbus.SystemBus() - except dbus.DBusException as e: - err_name = e.get_dbus_name() - if err_name not in ['org.freedesktop.DBus.Error.NoServer', - 'org.freedesktop.DBus.Error.FileNotFound']: - root_logger.error("Failed to connect to certmonger over " - "SystemBus: %s" % e) - raise - try: - self._private_sock = self._start_private_conn() - self._bus = dbus.connection.Connection(self._private_sock) - except dbus.DBusException as e: - root_logger.error("Failed to connect to certmonger over " - "private socket: %s" % e) - raise - else: - try: - self._bus.get_name_owner(DBUS_CM_NAME) - except dbus.DBusException: - try: - services.knownservices.certmonger.start() - except Exception as e: - root_logger.error("Failed to start certmonger: %s" % e) - raise - - for _t in range(0, self.timeout, 5): - try: - self._bus.get_name_owner(DBUS_CM_NAME) - break - except dbus.DBusException: - pass - time.sleep(5) - raise RuntimeError('Failed to start certmonger') - - super(_certmonger, self).__init__(self._bus, None, DBUS_CM_PATH, - DBUS_CM_IF) - - -def _get_requests(criteria=dict()): - """ - Get all requests that matches the provided criteria. - """ - if not isinstance(criteria, dict): - raise TypeError('"criteria" must be dict.') - - cm = _certmonger() - requests = [] - requests_paths = [] - if 'nickname' in criteria: - request_path = cm.obj_if.find_request_by_nickname(criteria['nickname']) - if request_path: - requests_paths = [request_path] - else: - requests_paths = cm.obj_if.get_requests() - - for request_path in requests_paths: - request = _cm_dbus_object(cm.bus, cm, request_path, DBUS_CM_REQUEST_IF, - DBUS_CM_IF, True) - for criterion in criteria: - if criterion == 'ca-name': - ca_path = request.obj_if.get_ca() - ca = _cm_dbus_object(cm.bus, cm, ca_path, DBUS_CM_CA_IF, - DBUS_CM_IF) - value = ca.obj_if.get_nickname() - else: - value = request.prop_if.Get(DBUS_CM_REQUEST_IF, criterion) - if value != criteria[criterion]: - break - else: - requests.append(request) - - return requests - - -def _get_request(criteria): - """ - Find request that matches criteria. - If 'nickname' is specified other criteria are ignored because 'nickname' - uniquely identify single request. - When multiple or none request matches specified criteria RuntimeError is - raised. - """ - requests = _get_requests(criteria) - if len(requests) == 0: - return None - elif len(requests) == 1: - return requests[0] - else: - raise RuntimeError("Criteria expected to be met by 1 request, got %s." - % len(requests)) - - -def get_request_value(request_id, directive): - """ - Get property of request. - """ - try: - request = _get_request(dict(nickname=request_id)) - except RuntimeError as e: - root_logger.error('Failed to get request: %s' % e) - raise - if request: - if directive == 'ca-name': - ca_path = request.obj_if.get_ca() - ca = _cm_dbus_object(request.bus, request, ca_path, DBUS_CM_CA_IF, - DBUS_CM_IF) - return ca.obj_if.get_nickname() - else: - return request.prop_if.Get(DBUS_CM_REQUEST_IF, directive) - else: - return None - - -def get_request_id(criteria): - """ - If you don't know the certmonger request_id then try to find it by looking - through all the requests. - - criteria is a tuple of key/value to search for. The more specific - the better. An error is raised if multiple request_ids are returned for - the same criteria. - - None is returned if none of the criteria match. - """ - try: - request = _get_request(criteria) - except RuntimeError as e: - root_logger.error('Failed to get request: %s' % e) - raise - if request: - return request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname') - else: - return None - - -def get_requests_for_dir(dir): - """ - Return a list containing the request ids for a given NSS database - directory. - """ - reqid = [] - criteria = {'cert-storage': 'NSSDB', 'key-storage': 'NSSDB', - 'cert-database': dir, 'key-database': dir, } - requests = _get_requests(criteria) - for request in requests: - reqid.append(request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname')) - - return reqid - - -def add_request_value(request_id, directive, value): - """ - Add a new directive to a certmonger request file. - """ - try: - request = _get_request({'nickname': request_id}) - except RuntimeError as e: - root_logger.error('Failed to get request: %s' % e) - raise - if request: - request.obj_if.modify({directive: value}) - - -def add_principal(request_id, principal): - """ - In order for a certmonger request to be renewable it needs a principal. - - When an existing certificate is added via start-tracking it won't have - a principal. - """ - add_request_value(request_id, 'template-principal', [principal]) - - -def add_subject(request_id, subject): - """ - In order for a certmonger request to be renwable it needs the subject - set in the request file. - - When an existing certificate is added via start-tracking it won't have - a subject_template set. - """ - add_request_value(request_id, 'template-subject', subject) - - -def request_and_wait_for_cert( - nssdb, nickname, subject, principal, passwd_fname=None, - dns=None, ca='IPA', profile=None, - pre_command=None, post_command=None): - """ - Execute certmonger to request a server certificate. - - The method also waits for the certificate to be available. - """ - reqId = request_cert(nssdb, nickname, subject, principal, - passwd_fname, dns, ca, profile, - pre_command, post_command) - state = wait_for_request(reqId, timeout=60) - ca_error = get_request_value(reqId, 'ca-error') - if state != 'MONITORING' or ca_error: - raise RuntimeError("Certificate issuance failed") - return reqId - -def request_cert( - nssdb, nickname, subject, principal, passwd_fname=None, - dns=None, ca='IPA', profile=None, pre_command=None, post_command=None): - """ - Execute certmonger to request a server certificate. - - ``dns`` - A sequence of DNS names to appear in SAN request extension. - """ - cm = _certmonger() - ca_path = cm.obj_if.find_ca_by_nickname(ca) - if not ca_path: - raise RuntimeError('{} CA not found'.format(ca)) - request_parameters = dict(KEY_STORAGE='NSSDB', CERT_STORAGE='NSSDB', - CERT_LOCATION=nssdb, CERT_NICKNAME=nickname, - KEY_LOCATION=nssdb, KEY_NICKNAME=nickname, - SUBJECT=subject, - CA=ca_path) - if principal: - request_parameters['PRINCIPAL'] = [principal] - if dns is not None and len(dns) > 0: - request_parameters['DNS'] = dns - if passwd_fname: - request_parameters['KEY_PIN_FILE'] = passwd_fname - if profile: - request_parameters['ca-profile'] = profile - - certmonger_cmd_template = paths.CERTMONGER_COMMAND_TEMPLATE - if pre_command: - if not os.path.isabs(pre_command): - pre_command = certmonger_cmd_template % (pre_command) - request_parameters['cert-presave-command'] = pre_command - if post_command: - if not os.path.isabs(post_command): - post_command = certmonger_cmd_template % (post_command) - request_parameters['cert-postsave-command'] = post_command - - result = cm.obj_if.add_request(request_parameters) - try: - if result[0]: - request = _cm_dbus_object(cm.bus, cm, result[1], DBUS_CM_REQUEST_IF, - DBUS_CM_IF, True) - else: - raise RuntimeError('add_request() returned False') - except Exception as e: - root_logger.error('Failed to create a new request: {error}' - .format(error=e)) - raise - return request.obj_if.get_nickname() - - -def start_tracking(nickname, secdir, password_file=None, command=None): - """ - Tell certmonger to track the given certificate nickname in NSS - database in secdir protected by optional password file password_file. - - command is an optional parameter which specifies a command for - certmonger to run when it renews a certificate. This command must - reside in /usr/lib/ipa/certmonger to work with SELinux. - - Returns certificate nickname. - """ - cm = _certmonger() - params = {'TRACK': True} - params['cert-nickname'] = nickname - params['cert-database'] = os.path.abspath(secdir) - params['cert-storage'] = 'NSSDB' - params['key-nickname'] = nickname - params['key-database'] = os.path.abspath(secdir) - params['key-storage'] = 'NSSDB' - ca_path = cm.obj_if.find_ca_by_nickname('IPA') - if not ca_path: - raise RuntimeError('IPA CA not found') - params['ca'] = ca_path - if command: - params['cert-postsave-command'] = command - if password_file: - params['KEY_PIN_FILE'] = os.path.abspath(password_file) - result = cm.obj_if.add_request(params) - try: - if result[0]: - request = _cm_dbus_object(cm.bus, cm, result[1], DBUS_CM_REQUEST_IF, - DBUS_CM_IF, True) - else: - raise RuntimeError('add_request() returned False') - except Exception as e: - root_logger.error('Failed to add new request: {error}' - .format(error=e)) - raise - return request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname') - - -def stop_tracking(secdir, request_id=None, nickname=None): - """ - Stop tracking the current request using either the request_id or nickname. - - Returns True or False - """ - if request_id is None and nickname is None: - raise RuntimeError('Both request_id and nickname are missing.') - - criteria = {'cert-database': secdir} - if request_id: - criteria['nickname'] = request_id - if nickname: - criteria['cert-nickname'] = nickname - try: - request = _get_request(criteria) - except RuntimeError as e: - root_logger.error('Failed to get request: %s' % e) - raise - if request: - request.parent.obj_if.remove_request(request.path) - - -def modify(request_id, profile=None): - if profile: - request = _get_request({'nickname': request_id}) - if request: - request.obj_if.modify({'template-profile': profile}) - - -def resubmit_request(request_id, profile=None): - request = _get_request({'nickname': request_id}) - if request: - if profile: - request.obj_if.modify({'template-profile': profile}) - request.obj_if.resubmit() - - -def _find_IPA_ca(): - """ - Look through all the certmonger CA files to find the one that - has id=IPA - - We can use find_request_value because the ca files have the - same file format. - """ - cm = _certmonger() - ca_path = cm.obj_if.find_ca_by_nickname('IPA') - return _cm_dbus_object(cm.bus, cm, ca_path, DBUS_CM_CA_IF, DBUS_CM_IF, True) - - -def add_principal_to_cas(principal): - """ - If the hostname we were passed to use in ipa-client-install doesn't - match the value of gethostname() then we need to append - -k host/HOSTNAME@REALM to the ca helper defined for - /usr/libexec/certmonger/ipa-submit. - - We also need to restore this on uninstall. - """ - ca = _find_IPA_ca() - if ca: - ext_helper = ca.prop_if.Get(DBUS_CM_CA_IF, 'external-helper') - if ext_helper and '-k' not in shlex.split(ext_helper): - ext_helper = '%s -k %s' % (ext_helper.strip(), principal) - ca.prop_if.Set(DBUS_CM_CA_IF, 'external-helper', ext_helper) - - -def remove_principal_from_cas(): - """ - Remove any -k principal options from the ipa_submit helper. - """ - ca = _find_IPA_ca() - if ca: - ext_helper = ca.prop_if.Get(DBUS_CM_CA_IF, 'external-helper') - if ext_helper and '-k' in shlex.split(ext_helper): - ext_helper = shlex.split(ext_helper)[0] - ca.prop_if.Set(DBUS_CM_CA_IF, 'external-helper', ext_helper) - - -def modify_ca_helper(ca_name, helper): - """ - Modify certmonger CA helper. - - Applies the new helper and return the previous configuration. - """ - bus = dbus.SystemBus() - obj = bus.get_object('org.fedorahosted.certmonger', - '/org/fedorahosted/certmonger') - iface = dbus.Interface(obj, 'org.fedorahosted.certmonger') - path = iface.find_ca_by_nickname(ca_name) - if not path: - raise RuntimeError("{} is not configured".format(ca_name)) - else: - ca_obj = bus.get_object('org.fedorahosted.certmonger', path) - ca_iface = dbus.Interface(ca_obj, - 'org.freedesktop.DBus.Properties') - old_helper = ca_iface.Get('org.fedorahosted.certmonger.ca', - 'external-helper') - ca_iface.Set('org.fedorahosted.certmonger.ca', - 'external-helper', helper) - return old_helper - - -def get_pin(token): - """ - Dogtag stores its NSS pin in a file formatted as token:PIN. - - The caller is expected to handle any exceptions raised. - """ - with open(paths.PKI_TOMCAT_PASSWORD_CONF, 'r') as f: - for line in f: - (tok, pin) = line.split('=', 1) - if token == tok: - return pin.strip() - return None - - -def dogtag_start_tracking(ca, nickname, pin, pinfile, secdir, pre_command, - post_command, profile=None): - """ - Tell certmonger to start tracking a dogtag CA certificate. These - are handled differently because their renewal must be done directly - and not through IPA. - - This uses the generic certmonger command getcert so we can specify - a different helper. - - pre_command is the script to execute before a renewal is done. - post_command is the script to execute after a renewal is done. - - Both commands can be None. - """ - - cm = _certmonger() - certmonger_cmd_template = paths.CERTMONGER_COMMAND_TEMPLATE - - params = {'TRACK': True} - params['cert-nickname'] = nickname - params['cert-database'] = os.path.abspath(secdir) - params['cert-storage'] = 'NSSDB' - params['key-nickname'] = nickname - params['key-database'] = os.path.abspath(secdir) - params['key-storage'] = 'NSSDB' - ca_path = cm.obj_if.find_ca_by_nickname(ca) - if ca_path: - params['ca'] = ca_path - if pin: - params['KEY_PIN'] = pin - if pinfile: - params['KEY_PIN_FILE'] = os.path.abspath(pinfile) - if pre_command: - if not os.path.isabs(pre_command): - pre_command = certmonger_cmd_template % (pre_command) - params['cert-presave-command'] = pre_command - if post_command: - if not os.path.isabs(post_command): - post_command = certmonger_cmd_template % (post_command) - params['cert-postsave-command'] = post_command - if profile: - params['ca-profile'] = profile - - cm.obj_if.add_request(params) - - -def check_state(dirs): - """ - Given a set of directories and nicknames verify that we are no longer - tracking certificates. - - dirs is a list of directories to test for. We will return a tuple - of nicknames for any tracked certificates found. - - This can only check for NSS-based certificates. - """ - reqids = [] - for dir in dirs: - reqids.extend(get_requests_for_dir(dir)) - - return reqids - - -def wait_for_request(request_id, timeout=120): - for _i in range(0, timeout, 5): - state = get_request_value(request_id, 'status') - root_logger.debug("certmonger request is in state %r", state) - if state in ('CA_REJECTED', 'CA_UNREACHABLE', 'CA_UNCONFIGURED', - 'NEED_GUIDANCE', 'NEED_CA', 'MONITORING'): - break - time.sleep(5) - else: - raise RuntimeError("request timed out") - - return state - -if __name__ == '__main__': - request_id = request_cert(paths.HTTPD_ALIAS_DIR, "Test", - "cn=tiger.example.com,O=IPA", - "HTTP/tiger.example.com@EXAMPLE.COM") - csr = get_request_value(request_id, 'csr') - print(csr) - stop_tracking(request_id) |
