summaryrefslogtreecommitdiffstats
path: root/ipapython
diff options
context:
space:
mode:
authorDavid Kupka <dkupka@redhat.com>2014-09-03 09:07:16 +0200
committerMartin Kosek <mkosek@redhat.com>2014-09-05 10:51:42 +0200
commit6d94cdf250c470bf77a0e769ea30a90fa5815b81 (patch)
tree499bd2ba7ddc4cd5311b35070e8f7db9a85b8b88 /ipapython
parent62a255949377d4a6b3cc197462223b5b0495d18d (diff)
downloadfreeipa-6d94cdf250c470bf77a0e769ea30a90fa5815b81.tar.gz
freeipa-6d94cdf250c470bf77a0e769ea30a90fa5815b81.tar.xz
freeipa-6d94cdf250c470bf77a0e769ea30a90fa5815b81.zip
Use certmonger D-Bus API instead of messing with its files.
FreeIPA certmonger module changed to use D-Bus to communicate with certmonger. Using the D-Bus API should be more stable and supported way of using cermonger than tampering with its files. >=certmonger-0.75.13 is needed for this to work. https://fedorahosted.org/freeipa/ticket/4280 Reviewed-By: Jan Cholasta <jcholast@redhat.com>
Diffstat (limited to 'ipapython')
-rw-r--r--ipapython/certmonger.py533
1 files changed, 270 insertions, 263 deletions
diff --git a/ipapython/certmonger.py b/ipapython/certmonger.py
index 673707384..f58b6dafb 100644
--- a/ipapython/certmonger.py
+++ b/ipapython/certmonger.py
@@ -1,4 +1,5 @@
# Authors: Rob Crittenden <rcritten@redhat.com>
+# David Kupka <dkupka@redhat.com>
#
# Copyright (C) 2010 Red Hat
# see file 'COPYING' for use and warranty information
@@ -23,137 +24,202 @@
import os
import sys
-import re
import time
+import dbus
from ipapython import ipautil
from ipapython import dogtag
from ipapython.ipa_log_manager import *
from ipaplatform.paths import paths
+from ipaplatform import services
+
+REQUEST_DIR = paths.CERTMONGER_REQUESTS_DIR
+CA_DIR = paths.CERTMONGER_CAS_DIR
+
+DBUS_CM_PATH = '/org/fedorahosted/certmonger'
+DBUS_CM_IF = 'org.fedorahosted.certmonger'
+DBUS_CM_REQUEST_IF = 'org.fedorahosted.certmonger.request'
+DBUS_CM_CA_IF = 'org.fedorahosted.certmonger.ca'
+DBUS_PROPERTY_IF = 'org.freedesktop.DBus.Properties'
+
+
+class _cm_dbus_object(object):
+ """
+ Auxiliary class for convenient DBus object handling.
+ """
+ def __init__(self, bus, object_path, object_dbus_interface,
+ parent_dbus_interface=None, property_interface=False):
+ """
+ bus - DBus bus object, result of dbus.SystemBus() or dbus.SessionBus()
+ Object is accesible over this DBus bus instance.
+ object_path - path to requested object on DBus bus
+ object_dbus_interface
+ parent_dbus_interface
+ property_interface - create DBus property interface? True or False
+ """
+ if bus is None or object_path is None or object_dbus_interface is None:
+ raise RuntimeError(
+ "bus, object_path and dbus_interface must not be None.")
+ if parent_dbus_interface is None:
+ parent_dbus_interface = object_dbus_interface
+ self.bus = bus
+ self.path = object_path
+ self.obj_dbus_if = object_dbus_interface
+ self.parent_dbus_if = parent_dbus_interface
+ self.obj = bus.get_object(parent_dbus_interface, object_path)
+ self.obj_if = dbus.Interface(self.obj, object_dbus_interface)
+ if property_interface:
+ self.prop_if = dbus.Interface(self.obj, DBUS_PROPERTY_IF)
+
+
+def _start_certmonger():
+ """
+ Start certmonger daemon. If it's already running systemctl just ignores
+ the command.
+ """
+ if not services.knownservices.certmonger.is_running():
+ try:
+ services.knownservices.certmonger.start()
+ except Exception, e:
+ root_logger.error('Failed to start certmonger: %s' % e)
+ raise
+
+
+def _connect_to_certmonger():
+ """
+ Start certmonger daemon and connect to it via DBus.
+ """
+ try:
+ _start_certmonger()
+ except (KeyboardInterrupt, OSError), e:
+ root_logger.error('Failed to start certmonger: %s' % e)
+ raise
+
+ try:
+ bus = dbus.SystemBus()
+ cm = _cm_dbus_object(bus, DBUS_CM_PATH, DBUS_CM_IF)
+ except dbus.DBusException, e:
+ root_logger.error("Failed to access certmonger over DBus: %s", e)
+ raise
+ return cm
-REQUEST_DIR=paths.CERTMONGER_REQUESTS_DIR
-CA_DIR=paths.CERTMONGER_CAS_DIR
-
-# Normalizer types for critera in get_request_id()
-NPATH = 1
-
-def find_request_value(filename, directive):
- """
- Return a value from a certmonger request file for the requested directive
-
- It tries to do this a number of times because sometimes there is a delay
- when ipa-getcert returns and the file is fully updated, particularly
- when doing a request. Generating a CSR is fast but not instantaneous.
- """
- tries = 1
- value = None
- found = False
- while value is None and tries <= 5:
- tries=tries + 1
- time.sleep(1)
- fp = open(filename, 'r')
- lines = fp.readlines()
- fp.close()
-
- for line in lines:
- if found:
- # A value can span multiple lines. If it does then it has a
- # leading space.
- if not line.startswith(' '):
- # We hit the next directive, return now
- return value
- else:
- value = value + line[1:]
+
+def _get_requests(criteria=dict()):
+ """
+ Get all requests that matches the provided criteria.
+ """
+ if not isinstance(criteria, dict):
+ raise TypeError('"criteria" must be dict.')
+
+ cm = _connect_to_certmonger()
+ requests = []
+ requests_paths = []
+ if 'nickname' in criteria:
+ request_path = cm.obj_if.find_request_by_nickname(criteria['nickname'])
+ if request_path:
+ requests_paths = [request_path]
+ else:
+ requests_paths = cm.obj_if.get_requests()
+
+ for request_path in requests_paths:
+ request = _cm_dbus_object(cm.bus, request_path, DBUS_CM_REQUEST_IF,
+ DBUS_CM_IF, True)
+ for criterion in criteria:
+ if criterion == 'ca-name':
+ ca_path = request.obj_if.get_ca()
+ ca = _cm_dbus_object(cm.bus, ca_path, DBUS_CM_CA_IF,
+ DBUS_CM_IF)
+ value = ca.obj_if.get_nickname()
else:
- if line.startswith(directive + '='):
- found = True
- value = line[len(directive)+1:]
+ value = request.prop_if.Get(DBUS_CM_REQUEST_IF, criterion)
+ if value != criteria[criterion]:
+ break
+ else:
+ requests.append(request)
+ return requests
+
+
+def _get_request(criteria):
+ """
+ Find request that matches criteria.
+ If 'nickname' is specified other criteria are ignored because 'nickname'
+ uniquely identify single request.
+ When multiple or none request matches specified criteria RuntimeError is
+ raised.
+ """
+ requests = _get_requests(criteria)
+ if len(requests) == 0:
+ return None
+ elif len(requests) == 1:
+ return requests[0]
+ else:
+ raise RuntimeError("Criteria expected to be met by 1 request, got %s."
+ % len(requests))
- return value
def get_request_value(request_id, directive):
"""
- There is no guarantee that the request_id will match the filename
- in the certmonger requests directory, so open each one to find the
- request_id.
+ Get property of request.
"""
- fileList=os.listdir(REQUEST_DIR)
- for file in fileList:
- value = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id')
- if value is not None and value.rstrip() == request_id:
- return find_request_value('%s/%s' % (REQUEST_DIR, file), directive)
+ try:
+ request = _get_request(dict(nickname=request_id))
+ except RuntimeError, e:
+ root_logger.error('Failed to get request: %s' % e)
+ raise
+ if request:
+ return request.prop_if.Get(DBUS_CM_REQUEST_IF, directive)
+ else:
+ return None
- return None
def get_request_id(criteria):
"""
If you don't know the certmonger request_id then try to find it by looking
- through all the request files. An alternative would be to parse the
- ipa-getcert list output but this seems cleaner.
+ through all the requests.
- criteria is a tuple of key/value/type to search for. The more specific
+ criteria is a tuple of key/value to search for. The more specific
the better. An error is raised if multiple request_ids are returned for
the same criteria.
None is returned if none of the criteria match.
"""
- assert type(criteria) is tuple
-
- reqid=None
- fileList=os.listdir(REQUEST_DIR)
- for file in fileList:
- match = True
- for (key, value, valtype) in criteria:
- rv = find_request_value('%s/%s' % (REQUEST_DIR, file), key)
- if rv and valtype == NPATH:
- rv = os.path.abspath(rv)
- if rv is None or rv.rstrip() != value:
- match = False
- break
- if match and reqid is not None:
- raise RuntimeError('multiple certmonger requests match the criteria')
- if match:
- reqid = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id').rstrip()
+ try:
+ request = _get_request(criteria)
+ except RuntimeError, e:
+ root_logger.error('Failed to get request: %s' % e)
+ raise
+ if request:
+ return request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname')
+ else:
+ return None
- return reqid
def get_requests_for_dir(dir):
"""
Return a list containing the request ids for a given NSS database
directory.
"""
- reqid=[]
- fileList=os.listdir(REQUEST_DIR)
- for file in fileList:
- rv = find_request_value(os.path.join(REQUEST_DIR, file),
- 'cert_storage_location')
- if rv is None:
- continue
- rv = os.path.abspath(rv).rstrip()
- if rv != dir:
- continue
- id = find_request_value(os.path.join(REQUEST_DIR, file), 'id')
- if id is not None:
- reqid.append(id.rstrip())
+ reqid = []
+ criteria = {'cert-storage': 'NSSDB', 'key-storage': 'NSSDB',
+ 'cert-database': dir, 'key-database': dir, }
+ requests = _get_requests(criteria)
+ for request in requests:
+ reqid.append(request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname'))
return reqid
+
def add_request_value(request_id, directive, value):
"""
Add a new directive to a certmonger request file.
-
- The certmonger service MUST be stopped in order for this to work.
"""
- fileList=os.listdir(REQUEST_DIR)
- for file in fileList:
- id = find_request_value('%s/%s' % (REQUEST_DIR, file), 'id')
- if id is not None and id.rstrip() == request_id:
- current_value = find_request_value('%s/%s' % (REQUEST_DIR, file), directive)
- if not current_value:
- fp = open('%s/%s' % (REQUEST_DIR, file), 'a')
- fp.write('%s=%s\n' % (directive, value))
- fp.close()
+ try:
+ request = _get_request({'nickname': request_id})
+ except RuntimeError, e:
+ root_logger.error('Failed to get request: %s' % e)
+ raise
+ if request:
+ request.obj_if.modify({directive: value})
- return
def add_principal(request_id, principal):
"""
@@ -162,7 +228,8 @@ def add_principal(request_id, principal):
When an existing certificate is added via start-tracking it won't have
a principal.
"""
- return add_request_value(request_id, 'template_principal', principal)
+ add_request_value(request_id, 'template-principal', [principal])
+
def add_subject(request_id, subject):
"""
@@ -172,47 +239,29 @@ def add_subject(request_id, subject):
When an existing certificate is added via start-tracking it won't have
a subject_template set.
"""
- return add_request_value(request_id, 'template_subject', subject)
+ add_request_value(request_id, 'template-subject', subject)
+
def request_cert(nssdb, nickname, subject, principal, passwd_fname=None):
"""
- Execute certmonger to request a server certificate
+ Execute certmonger to request a server certificate.
"""
- args = [paths.IPA_GETCERT,
- 'request',
- '-d', nssdb,
- '-n', nickname,
- '-N', subject,
- '-K', principal,
- ]
+ cm = _connect_to_certmonger()
+ request_parameters = dict(KEY_STORAGE='NSSDB', CERT_STORAGE='NSSDB',
+ CERT_LOCATION=nssdb, CERT_NICKNAME=nickname,
+ SUBJECT=subject, PRINCIPAL=principal,)
if passwd_fname:
- args.append('-p')
- args.append(os.path.abspath(passwd_fname))
- (stdout, stderr, returncode) = ipautil.run(args)
- # FIXME: should be some error handling around this
- m = re.match('New signing request "(\d+)" added', stdout)
- request_id = m.group(1)
- return request_id
-
-def cert_exists(nickname, secdir):
- """
- See if a nickname exists in an NSS database.
-
- Returns True/False
-
- This isn't very sophisticated in that it doesn't differentiate between
- a database that doesn't exist and a nickname that doesn't exist within
- the database.
- """
- args = [paths.CERTUTIL, "-L",
- "-d", os.path.abspath(secdir),
- "-n", nickname
- ]
- (stdout, stderr, rc) = ipautil.run(args, raiseonerr=False)
- if rc == 0:
- return True
- else:
- return False
+ request_parameters['KEY_PIN_FILE'] = passwd_fname
+ result = cm.obj_if.add_request(request_parameters)
+ try:
+ if result[0]:
+ request = _cm_dbus_object(cm.bus, result[1], DBUS_CM_REQUEST_IF,
+ DBUS_CM_IF, True)
+ except TypeError:
+ root_logger.error('Failed to get create new request.')
+ raise
+ return request.obj_if.get_nickname()
+
def start_tracking(nickname, secdir, password_file=None, command=None):
"""
@@ -223,75 +272,69 @@ def start_tracking(nickname, secdir, password_file=None, command=None):
certmonger to run when it renews a certificate. This command must
reside in /usr/lib/ipa/certmonger to work with SELinux.
- Returns the stdout, stderr and returncode from running ipa-getcert
-
- This assumes that certmonger is already running.
+ Returns True or False
"""
- if not cert_exists(nickname, os.path.abspath(secdir)):
- raise RuntimeError('Nickname "%s" doesn\'t exist in NSS database "%s"' % (nickname, secdir))
- args = [paths.IPA_GETCERT, "start-tracking",
- "-d", os.path.abspath(secdir),
- "-n", nickname]
- if password_file:
- args.append("-p")
- args.append(os.path.abspath(password_file))
+ cm = _connect_to_certmonger()
+ params = {'TRACK': True}
+ params['cert-nickname'] = nickname
+ params['cert-database'] = os.path.abspath(secdir)
+ params['cert-storage'] = 'NSSDB'
+ params['key-nickname'] = nickname
+ params['key-database'] = os.path.abspath(secdir)
+ params['key-storage'] = 'NSSDB'
if command:
- args.append("-C")
- args.append(command)
-
- (stdout, stderr, returncode) = ipautil.run(args)
+ params['cert-postsave-command'] = command
+ if password_file:
+ params['KEY_PIN_FILE'] = os.path.abspath(password_file)
+ result = cm.obj_if.add_request(params)
+ try:
+ if result[0]:
+ request = _cm_dbus_object(cm.bus, result[1], DBUS_CM_REQUEST_IF,
+ DBUS_CM_IF, True)
+ except TypeError, e:
+ root_logger.error('Failed to add new request.')
+ raise
+ return request.prop_if.Get(DBUS_CM_REQUEST_IF, 'nickname')
- return (stdout, stderr, returncode)
def stop_tracking(secdir, request_id=None, nickname=None):
"""
Stop tracking the current request using either the request_id or nickname.
- This assumes that the certmonger service is running.
+ Returns True or False
"""
if request_id is None and nickname is None:
raise RuntimeError('Both request_id and nickname are missing.')
- if nickname:
- # Using the nickname find the certmonger request_id
- criteria = (('cert_storage_location', os.path.abspath(secdir), NPATH),('cert_nickname', nickname, None))
- try:
- request_id = get_request_id(criteria)
- if request_id is None:
- return ('', '', 0)
- except RuntimeError:
- # This means that multiple requests matched, skip it for now
- # Fall back to trying to stop tracking using nickname
- pass
-
- args = [paths.GETCERT,
- 'stop-tracking',
- ]
- if request_id:
- args.append('-i')
- args.append(request_id)
- else:
- args.append('-n')
- args.append(nickname)
- args.append('-d')
- args.append(os.path.abspath(secdir))
- (stdout, stderr, returncode) = ipautil.run(args)
+ criteria = {'cert-database': secdir}
+ if request_id:
+ criteria['nickname'] = request_id
+ if nickname:
+ criteria['cert-nickname'] = nickname
+ try:
+ request = _get_request(criteria)
+ except RuntimeError, e:
+ root_logger.error('Failed to get request: %s' % e)
+ raise
+ if request:
+ cm = _connect_to_certmonger()
+ cm.obj_if.remove_request(request.path)
- return (stdout, stderr, returncode)
def modify(request_id, profile=None):
- args = [paths.GETCERT, 'start-tracking',
- '-i', request_id]
if profile:
- args += ['-T', profile]
- return ipautil.run(args)
+ request = _get_request({'nickname': request_id})
+ if request:
+ request.obj_if.modify({'template-profile': profile})
+
def resubmit_request(request_id, profile=None):
- args = [paths.IPA_GETCERT, 'resubmit',
- '-i', request_id]
- if profile:
- args += ['-T', profile]
- return ipautil.run(args)
+ request = _get_request({'nickname': request_id})
+ if request:
+ if profile:
+ request.obj_if.modify({'template-profile': profile})
+ request.obj_if.resubmit()
+
def _find_IPA_ca():
"""
@@ -301,13 +344,10 @@ def _find_IPA_ca():
We can use find_request_value because the ca files have the
same file format.
"""
- fileList=os.listdir(CA_DIR)
- for file in fileList:
- value = find_request_value('%s/%s' % (CA_DIR, file), 'id')
- if value is not None and value.strip() == 'IPA':
- return '%s/%s' % (CA_DIR, file)
+ cm = _connect_to_certmonger()
+ ca_path = cm.obj_if.find_ca_by_nickname('IPA')
+ return _cm_dbus_object(cm.bus, ca_path, DBUS_CM_CA_IF, DBUS_CM_IF, True)
- return None
def add_principal_to_cas(principal):
"""
@@ -317,58 +357,27 @@ def add_principal_to_cas(principal):
/usr/libexec/certmonger/ipa-submit.
We also need to restore this on uninstall.
-
- The certmonger service MUST be stopped in order for this to work.
"""
- cafile = _find_IPA_ca()
- if cafile is None:
- return
-
- update = False
- fp = open(cafile, 'r')
- lines = fp.readlines()
- fp.close()
-
- for i in xrange(len(lines)):
- if lines[i].startswith('ca_external_helper') and \
- lines[i].find('-k') == -1:
- lines[i] = '%s -k %s\n' % (lines[i].strip(), principal)
- update = True
+ ca = _find_IPA_ca()
+ if ca:
+ ext_helper = ca.prop_if.Get(DBUS_CM_CA_IF, 'external-helper')
+ if ext_helper and ext_helper.find('-k') == -1:
+ ext_helper = '%s -k %s' % (ext_helper.strip(), principal)
+ ca.prop_if.Set(DBUS_CM_CA_IF, 'external-helper', ext_helper)
- if update:
- fp = open(cafile, 'w')
- for line in lines:
- fp.write(line)
- fp.close()
def remove_principal_from_cas():
"""
Remove any -k principal options from the ipa_submit helper.
-
- The certmonger service MUST be stopped in order for this to work.
"""
- cafile = _find_IPA_ca()
- if cafile is None:
- return
-
- update = False
- fp = open(cafile, 'r')
- lines = fp.readlines()
- fp.close()
+ ca = _find_IPA_ca()
+ if ca:
+ ext_helper = ca.prop_if.Get(DBUS_CM_CA_IF, 'external-helper')
+ if ext_helper and ext_helper.find('-k'):
+ ext_helper = ext_helper.strip()[0]
+ ca.prop_if.Set(DBUS_CM_CA_IF, 'external-helper', ext_helper)
- for i in xrange(len(lines)):
- if lines[i].startswith('ca_external_helper') and \
- lines[i].find('-k') > 0:
- lines[i] = lines[i].strip().split(' ')[0] + '\n'
- update = True
- if update:
- fp = open(cafile, 'w')
- for line in lines:
- fp.write(line)
- fp.close()
-
-# Routines specific to renewing dogtag CA certificates
def get_pin(token, dogtag_constants=None):
"""
Dogtag stores its NSS pin in a file formatted as token:PIN.
@@ -384,6 +393,7 @@ def get_pin(token, dogtag_constants=None):
return pin.strip()
return None
+
def dogtag_start_tracking(ca, nickname, pin, pinfile, secdir, pre_command,
post_command, profile=None):
"""
@@ -398,52 +408,46 @@ def dogtag_start_tracking(ca, nickname, pin, pinfile, secdir, pre_command,
post_command is the script to execute after a renewal is done.
Both commands can be None.
-
- Returns the stdout, stderr and returncode from running ipa-getcert
-
- This assumes that certmonger is already running.
"""
- if not cert_exists(nickname, os.path.abspath(secdir)):
- raise RuntimeError('Nickname "%s" doesn\'t exist in NSS database "%s"' % (nickname, secdir))
- args = [paths.GETCERT, "start-tracking",
- "-d", os.path.abspath(secdir),
- "-n", nickname,
- "-c", ca,
- ]
-
- if pre_command is not None:
+ cm = _connect_to_certmonger()
+ certmonger_cmd_template = paths.CERTMONGER_COMMAND_TEMPLATE
+
+ params = {'TRACK': True}
+ params['cert-nickname'] = nickname
+ params['cert-database'] = os.path.abspath(secdir)
+ params['cert-storage'] = 'NSSDB'
+ params['key-nickname'] = nickname
+ params['key-database'] = os.path.abspath(secdir)
+ params['key-storage'] = 'NSSDB'
+ ca_path = cm.obj_if.find_ca_by_nickname(ca)
+ if ca_path:
+ params['ca'] = ca_path
+ if pin:
+ params['KEY_PIN'] = pin
+ if pinfile:
+ params['KEY_PIN_FILE'] = os.path.abspath(pinfile)
+ if pre_command:
if not os.path.isabs(pre_command):
if sys.maxsize > 2**32L:
libpath = 'lib64'
else:
libpath = 'lib'
- pre_command = paths.CERTMONGER_COMMAND_TEMPLATE % (libpath, pre_command)
- args.append("-B")
- args.append(pre_command)
-
- if post_command is not None:
+ pre_command = certmonger_cmd_template % (libpath, pre_command)
+ params['cert-presave-command'] = pre_command
+ if post_command:
if not os.path.isabs(post_command):
if sys.maxsize > 2**32L:
libpath = 'lib64'
else:
libpath = 'lib'
- post_command = paths.CERTMONGER_COMMAND_TEMPLATE % (libpath, post_command)
- args.append("-C")
- args.append(post_command)
-
- if pinfile:
- args.append("-p")
- args.append(pinfile)
- else:
- args.append("-P")
- args.append(pin)
-
+ post_command = certmonger_cmd_template % (libpath, post_command)
+ params['cert-postsave-command'] = post_command
if profile:
- args.append("-T")
- args.append(profile)
+ params['ca-profile'] = profile
+
+ cm.obj_if.add_request(params)
- (stdout, stderr, returncode) = ipautil.run(args, nolog=[pin])
def check_state(dirs):
"""
@@ -461,6 +465,7 @@ def check_state(dirs):
return reqids
+
def wait_for_request(request_id, timeout=120):
for i in range(0, timeout, 5):
state = get_request_value(request_id, 'state').strip()
@@ -475,7 +480,9 @@ def wait_for_request(request_id, timeout=120):
return state
if __name__ == '__main__':
- request_id = request_cert(paths.HTTPD_ALIAS_DIR, "Test", "cn=tiger.example.com,O=IPA", "HTTP/tiger.example.com@EXAMPLE.COM")
+ request_id = request_cert(paths.HTTPD_ALIAS_DIR, "Test",
+ "cn=tiger.example.com,O=IPA",
+ "HTTP/tiger.example.com@EXAMPLE.COM")
csr = get_request_value(request_id, 'csr')
print csr
stop_tracking(request_id)