summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2011-06-08 10:54:41 -0400
committerRob Crittenden <rcritten@redhat.com>2011-06-21 19:09:50 -0400
commitdd69c7dbe68e8f8674994a54ea913f2dd2e52c32 (patch)
tree5fdc303354eb26a1d2cd206c81babdc73e8d51b9
parent3a36eced53e540fe8f2b23eadf7dffda080324de (diff)
downloadfreeipa-dd69c7dbe68e8f8674994a54ea913f2dd2e52c32.tar.gz
freeipa-dd69c7dbe68e8f8674994a54ea913f2dd2e52c32.tar.xz
freeipa-dd69c7dbe68e8f8674994a54ea913f2dd2e52c32.zip
Make data type of certificates more obvious/predictable internally.
For the most part certificates will be treated as being in DER format. When we load a certificate we will generally accept it in any format but will convert it to DER before proceeding in normalize_certificate(). This also re-arranges a bit of code to pull some certificate-specific functions out of ipalib/plugins/service.py into ipalib/x509.py. This also tries to use variable names to indicate what format the certificate is in at any given point: dercert: DER cert: PEM nsscert: a python-nss Certificate object rawcert: unknown format ticket 32
-rw-r--r--install/tools/ipa-compliance2
-rw-r--r--ipalib/plugins/cert.py7
-rw-r--r--ipalib/plugins/entitle.py27
-rw-r--r--ipalib/plugins/host.py22
-rw-r--r--ipalib/plugins/service.py118
-rw-r--r--ipalib/util.py18
-rw-r--r--ipalib/x509.py116
-rw-r--r--ipaserver/install/cainstance.py14
-rw-r--r--ipaserver/install/certs.py17
-rw-r--r--ipaserver/install/dsinstance.py4
-rw-r--r--ipaserver/install/httpinstance.py2
-rw-r--r--ipaserver/install/service.py14
-rwxr-xr-xmake-testcert5
13 files changed, 184 insertions, 182 deletions
diff --git a/install/tools/ipa-compliance b/install/tools/ipa-compliance
index e1de25283..81cd8c370 100644
--- a/install/tools/ipa-compliance
+++ b/install/tools/ipa-compliance
@@ -81,7 +81,7 @@ def check_compliance(tmpdir, debug=False):
api.bootstrap(**cfg)
api.register(client)
api.finalize()
- from ipalib.plugins.service import normalize_certificate, make_pem
+ from ipalib.x509 import normalize_certificate, make_pem
try:
# Create a new credentials cache for this tool. This executes
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 647a2e526..643e1cdd2 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -87,10 +87,9 @@ from ipalib import Command, Str, Int, Bytes, Flag, File
from ipalib import errors
from ipalib import pkcs10
from ipalib import x509
+from ipalib import util
from ipalib.plugins.virtual import *
from ipalib.plugins.service import split_principal
-from ipalib.plugins.service import make_pem, check_writable_file
-from ipalib.plugins.service import write_certificate
import base64
import logging
import traceback
@@ -501,10 +500,10 @@ class cert_show(VirtualCommand):
def forward(self, *keys, **options):
if 'out' in options:
- check_writable_file(options['out'])
+ util.check_writable_file(options['out'])
result = super(cert_show, self).forward(*keys, **options)
if 'certificate' in result['result']:
- write_certificate(result['result']['certificate'], options['out'])
+ x509.write_certificate(result['result']['certificate'], options['out'])
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
diff --git a/ipalib/plugins/entitle.py b/ipalib/plugins/entitle.py
index ad4c2c6df..ab7dd456f 100644
--- a/ipalib/plugins/entitle.py
+++ b/ipalib/plugins/entitle.py
@@ -78,7 +78,8 @@ import base64
from OpenSSL import crypto
from ipapython.ipautil import run
from ipalib.request import context
-from ipalib.plugins.service import validate_certificate, normalize_certificate
+from ipalib.plugins.service import validate_certificate
+from ipalib import x509
import locale
@@ -101,16 +102,6 @@ def read_pkcs12_pin():
fp.close()
return pwd
-def make_pem(data):
- """
- The M2Crypto/openSSL modules are very picky about PEM format and
- require lines split to 64 characters with proper headers.
- """
- cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
- return '-----BEGIN CERTIFICATE-----\n' + \
- cert + \
- '\n-----END CERTIFICATE-----'
-
def get_pool(ldap):
"""
Get our entitlement pool. Assume there is only one pool.
@@ -256,7 +247,7 @@ class entitle_status(VirtualCommand):
if u'usercertificate' in registrations:
certs = registrations['usercertificate']
for cert in certs:
- cert = make_pem(base64.b64encode(cert))
+ cert = x509.make_pem(base64.b64encode(cert))
try:
pc = EntitlementCertificate(cert)
o = pc.getOrder()
@@ -358,7 +349,7 @@ class entitle_consume(LDAPUpdate):
results = cp.getCertificates(uuid)
usercertificate = []
for cert in results:
- usercertificate.append(normalize_certificate(cert['cert']))
+ usercertificate.append(x509.normalize_certificate(cert['cert']))
entry_attrs['usercertificate'] = usercertificate
entry_attrs['ipaentitlementid'] = uuid
finally:
@@ -427,7 +418,7 @@ class entitle_get(VirtualCommand):
if u'usercertificate' in registrations:
# make it look like a UEP cert
for cert in registrations['usercertificate']:
- certs.append(dict(cert = make_pem(base64.b64encode(cert))))
+ certs.append(dict(cert = x509.make_pem(base64.b64encode(cert))))
else:
try:
cp = UEPConnection(handler='/candlepin', cert_file=certfile, key_file=keyfile)
@@ -626,8 +617,8 @@ class entitle_import(LDAPUpdate):
try:
entry_attrs['ipaentitlementid'] = unicode('IMPORTED')
- newcert = normalize_certificate(keys[-1][0])
- cert = make_pem(base64.b64encode(newcert))
+ newcert = x509.normalize_certificate(keys[-1][0])
+ cert = x509.make_pem(base64.b64encode(newcert))
try:
pc = EntitlementCertificate(cert)
o = pc.getOrder()
@@ -645,7 +636,7 @@ class entitle_import(LDAPUpdate):
# First import, create the entry
entry_attrs['ipaentitlementid'] = unicode('IMPORTED')
entry_attrs['objectclass'] = self.obj.object_class
- entry_attrs['usercertificate'] = normalize_certificate(keys[-1][0])
+ entry_attrs['usercertificate'] = x509.normalize_certificate(keys[-1][0])
ldap.add_entry(dn, entry_attrs)
setattr(context, 'entitle_import', True)
@@ -717,7 +708,7 @@ class entitle_sync(LDAPUpdate):
results = cp.getCertificates(uuid)
usercertificate = []
for cert in results:
- usercertificate.append(normalize_certificate(cert['cert']))
+ usercertificate.append(x509.normalize_certificate(cert['cert']))
entry_attrs['usercertificate'] = usercertificate
entry_attrs['ipaentitlementid'] = uuid
finally:
diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py
index ec58e1e40..1cd3fc061 100644
--- a/ipalib/plugins/host.py
+++ b/ipalib/plugins/host.py
@@ -81,11 +81,7 @@ from ipalib import Str, Flag, Bytes
from ipalib.plugins.baseldap import *
from ipalib.plugins.service import split_principal
from ipalib.plugins.service import validate_certificate
-from ipalib.plugins.service import normalize_certificate
from ipalib.plugins.service import set_certificate_attrs
-from ipalib.plugins.service import make_pem, check_writable_file
-from ipalib.plugins.service import write_certificate
-from ipalib.plugins.service import verify_cert_subject
from ipalib.plugins.dns import dns_container_exists, _record_types
from ipalib.plugins.dns import add_forward_record
from ipalib import _, ngettext
@@ -423,8 +419,8 @@ class host_add(LDAPCreate):
del entry_attrs['random']
cert = options.get('usercertificate')
if cert:
- cert = normalize_certificate(cert)
- verify_cert_subject(ldap, keys[-1], cert)
+ cert = x509.normalize_certificate(cert)
+ x509.verify_cert_subject(ldap, keys[-1], cert)
entry_attrs['usercertificate'] = cert
entry_attrs['managedby'] = dn
return dn
@@ -562,7 +558,7 @@ class host_del(LDAPDelete):
self.obj.handle_not_found(*keys)
if 'usercertificate' in entry_attrs:
- cert = normalize_certificate(entry_attrs.get('usercertificate')[0])
+ cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
try:
serial = unicode(x509.get_serial_number(cert, x509.DER))
try:
@@ -626,12 +622,12 @@ class host_mod(LDAPUpdate):
if 'krbprincipalaux' not in obj_classes:
obj_classes.append('krbprincipalaux')
entry_attrs['objectclass'] = obj_classes
- cert = normalize_certificate(entry_attrs.get('usercertificate'))
+ cert = x509.normalize_certificate(entry_attrs.get('usercertificate'))
if cert:
- verify_cert_subject(ldap, keys[-1], cert)
+ x509.verify_cert_subject(ldap, keys[-1], cert)
(dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate'])
if 'usercertificate' in entry_attrs_old:
- oldcert = normalize_certificate(entry_attrs_old.get('usercertificate')[0])
+ oldcert = x509.normalize_certificate(entry_attrs_old.get('usercertificate')[0])
try:
serial = unicode(x509.get_serial_number(oldcert, x509.DER))
try:
@@ -733,10 +729,10 @@ class host_show(LDAPRetrieve):
def forward(self, *keys, **options):
if 'out' in options:
- check_writable_file(options['out'])
+ util.check_writable_file(options['out'])
result = super(host_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
- write_certificate(result['result']['usercertificate'][0], options['out'])
+ x509.write_certificate(result['result']['usercertificate'][0], options['out'])
result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out'])
return result
else:
@@ -792,7 +788,7 @@ class host_disable(LDAPQuery):
except errors.AlreadyInactive:
pass
if 'usercertificate' in entry_attrs:
- cert = normalize_certificate(entry_attrs.get('usercertificate')[0])
+ cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
try:
serial = unicode(x509.get_serial_number(cert, x509.DER))
try:
diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py
index 85956272b..14e04d26b 100644
--- a/ipalib/plugins/service.py
+++ b/ipalib/plugins/service.py
@@ -171,60 +171,6 @@ def validate_certificate(ugettext, cert):
# We'll assume this is DER data
pass
-def normalize_certificate(cert):
- """
- Incoming certificates should be DER-encoded.
-
- Note that this can't be a normalizer on the Param because only unicode
- variables are normalized.
- """
- if not cert:
- return cert
-
- s = cert.find('-----BEGIN CERTIFICATE-----')
- if s > -1:
- e = cert.find('-----END CERTIFICATE-----')
- cert = cert[s+27:e]
-
- if util.isvalid_base64(cert):
- try:
- cert = base64.b64decode(cert)
- except Exception, e:
- raise errors.Base64DecodeError(reason=str(e))
-
- # At this point we should have a certificate, either because the data
- # was base64-encoded and now its not or it came in as DER format.
- # Let's decode it and see. Fetching the serial number will pass the
- # certificate through the NSS DER parser.
- try:
- serial = unicode(x509.get_serial_number(cert, x509.DER))
- except NSPRError, nsprerr:
- if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER
- raise errors.CertificateFormatError(error='improperly formatted DER-encoded certificate')
- else:
- raise errors.CertificateFormatError(error=str(nsprerr))
-
- return cert
-
-def verify_cert_subject(ldap, hostname, cert):
- """
- Verify that the certificate issuer we're adding matches the issuer
- base of our installation.
-
- This assumes the certificate has already been normalized.
-
- This raises an exception on errors and returns nothing otherwise.
- """
- cert = x509.load_certificate(cert, datatype=x509.DER)
- subject = str(cert.subject)
- issuer = str(cert.issuer)
-
- # Handle both supported forms of issuer, from selfsign and dogtag.
- if ((issuer != 'CN=%s Certificate Authority' % api.env.realm) and
- (issuer != 'CN=Certificate Authority,O=%s' % api.env.realm)):
- raise errors.CertificateOperationError(error=_('Issuer "%(issuer)s" does not match the expected issuer') % \
- {'issuer' : issuer})
-
def set_certificate_attrs(entry_attrs):
"""
Set individual attributes from some values from a certificate.
@@ -239,7 +185,7 @@ def set_certificate_attrs(entry_attrs):
cert = entry_attrs['usercertificate'][0]
else:
cert = entry_attrs['usercertificate']
- cert = normalize_certificate(cert)
+ cert = x509.normalize_certificate(cert)
cert = x509.load_certificate(cert, datatype=x509.DER)
entry_attrs['subject'] = unicode(cert.subject)
entry_attrs['serial_number'] = unicode(cert.serial_number)
@@ -249,50 +195,6 @@ def set_certificate_attrs(entry_attrs):
entry_attrs['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
entry_attrs['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
-def check_writable_file(filename):
- """
- Determine if the file is writable. If the file doesn't exist then
- open the file to test writability.
- """
- if filename is None:
- raise errors.FileError(reason='Filename is empty')
- try:
- if file_exists(filename):
- if not os.access(filename, os.W_OK):
- raise errors.FileError(reason=_('Permission denied: %(file)s') % dict(file=filename))
- else:
- fp = open(filename, 'w')
- fp.close()
- except (IOError, OSError), e:
- raise errors.FileError(reason=str(e))
-
-def make_pem(data):
- """
- Convert a raw base64-encoded blob into something that looks like a PE
- file with lines split to 64 characters and proper headers.
- """
- cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
- return '-----BEGIN CERTIFICATE-----\n' + \
- cert + \
- '\n-----END CERTIFICATE-----'
-
-def write_certificate(cert, filename):
- """
- Check to see if the certificate should be written to a file and do so.
- """
- if cert and util.isvalid_base64(cert):
- try:
- cert = base64.b64decode(cert)
- except Exception, e:
- raise errors.Base64DecodeError(reason=str(e))
-
- try:
- fp = open(filename, 'w')
- fp.write(make_pem(base64.b64encode(cert)))
- fp.close()
- except (IOError, OSError), e:
- raise errors.FileError(reason=str(e))
-
class service(LDAPObject):
"""
Service object.
@@ -361,9 +263,9 @@ class service_add(LDAPCreate):
cert = options.get('usercertificate')
if cert:
- cert = normalize_certificate(cert)
- verify_cert_subject(ldap, hostname, cert)
- entry_attrs['usercertificate'] = cert
+ dercert = x509.normalize_certificate(cert)
+ x509.verify_cert_subject(ldap, hostname, dercert)
+ entry_attrs['usercertificate'] = dercert
if not options.get('force', False):
# We know the host exists if we've gotten this far but we
@@ -430,8 +332,8 @@ class service_mod(LDAPUpdate):
(service, hostname, realm) = split_principal(keys[-1])
cert = options.get('usercertificate')
if cert:
- cert = normalize_certificate(cert)
- verify_cert_subject(ldap, hostname, cert)
+ dercert = x509.normalize_certificate(cert)
+ x509.verify_cert_subject(ldap, hostname, dercert)
(dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate'])
if 'usercertificate' in entry_attrs_old:
# FIXME: what to do here? do we revoke the old cert?
@@ -439,7 +341,7 @@ class service_mod(LDAPUpdate):
x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER)
)
raise errors.GenericError(format=fmt)
- entry_attrs['usercertificate'] = cert
+ entry_attrs['usercertificate'] = dercert
else:
entry_attrs['usercertificate'] = None
return dn
@@ -513,10 +415,10 @@ class service_show(LDAPRetrieve):
def forward(self, *keys, **options):
if 'out' in options:
- check_writable_file(options['out'])
+ util.check_writable_file(options['out'])
result = super(service_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
- write_certificate(result['result']['usercertificate'][0], options['out'])
+ x509.write_certificate(result['result']['usercertificate'][0], options['out'])
result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out'])
return result
else:
@@ -564,7 +466,7 @@ class service_disable(LDAPQuery):
done_work = False
if 'usercertificate' in entry_attrs:
- cert = normalize_certificate(entry_attrs.get('usercertificate')[0])
+ cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
try:
serial = unicode(x509.get_serial_number(cert, x509.DER))
try:
diff --git a/ipalib/util.py b/ipalib/util.py
index 78b954ee2..cc887c348 100644
--- a/ipalib/util.py
+++ b/ipalib/util.py
@@ -30,6 +30,7 @@ import re
from types import NoneType
from ipalib import errors
+from ipalib.text import _
from ipapython import dnsclient
@@ -185,3 +186,20 @@ def validate_ipaddr(ipaddr):
except socket.error:
return False
return True
+
+def check_writable_file(filename):
+ """
+ Determine if the file is writable. If the file doesn't exist then
+ open the file to test writability.
+ """
+ if filename is None:
+ raise errors.FileError(reason='Filename is empty')
+ try:
+ if os.path.exists(filename):
+ if not os.access(filename, os.W_OK):
+ raise errors.FileError(reason=_('Permission denied: %(file)s') % dict(file=filename))
+ else:
+ fp = open(filename, 'w')
+ fp.close()
+ except (IOError, OSError), e:
+ raise errors.FileError(reason=str(e))
diff --git a/ipalib/x509.py b/ipalib/x509.py
index fc6f9c121..77d6aabf4 100644
--- a/ipalib/x509.py
+++ b/ipalib/x509.py
@@ -17,12 +17,30 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+# Certificates should be stored internally DER-encoded. We can be passed
+# a certificate several ways: read if from LDAP, read it from a 3rd party
+# app (dogtag, candlepin, etc) or as user input. The normalize_certificate()
+# function will convert an incoming certificate to DER-encoding.
+
+# Conventions
+#
+# Where possible the following naming conventions are used:
+#
+# cert: the certificate is a PEM-encoded certificate
+# dercert: the certificate is DER-encoded
+# nsscert: the certificate is an NSS Certificate object
+# rawcert: the cert is in an unknown format
+
import os
import sys
import base64
import nss.nss as nss
+from nss.error import NSPRError
from ipapython import ipautil
from ipalib import api
+from ipalib import _
+from ipalib import util
+from ipalib import errors
PEM = 0
DER = 1
@@ -66,17 +84,103 @@ def get_subject(certificate, datatype=PEM):
Load an X509.3 certificate and get the subject.
"""
- cert = load_certificate(certificate, datatype)
- return cert.subject
+ nsscert = load_certificate(certificate, datatype)
+ return nsscert.subject
def get_serial_number(certificate, datatype=PEM):
"""
Return the decimal value of the serial number.
"""
- cert = load_certificate(certificate, datatype)
- return cert.serial_number
+ nsscert = load_certificate(certificate, datatype)
+ return nsscert.serial_number
+
+def make_pem(data):
+ """
+ Convert a raw base64-encoded blob into something that looks like a PE
+ file with lines split to 64 characters and proper headers.
+ """
+ pemcert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
+ return '-----BEGIN CERTIFICATE-----\n' + \
+ pemcert + \
+ '\n-----END CERTIFICATE-----'
+
+def normalize_certificate(rawcert):
+ """
+ Incoming certificates should be DER-encoded. If not it is converted to
+ DER-format.
+
+ Note that this can't be a normalizer on a Param because only unicode
+ variables are normalized.
+ """
+ if not rawcert:
+ return None
+
+ rawcert = strip_header(rawcert)
+
+ if util.isvalid_base64(rawcert):
+ try:
+ dercert = base64.b64decode(rawcert)
+ except Exception, e:
+ raise errors.Base64DecodeError(reason=str(e))
+ else:
+ dercert = rawcert
+
+ # At this point we should have a certificate, either because the data
+ # was base64-encoded and now its not or it came in as DER format.
+ # Let's decode it and see. Fetching the serial number will pass the
+ # certificate through the NSS DER parser.
+ try:
+ serial = unicode(get_serial_number(dercert, DER))
+ except NSPRError, nsprerr:
+ if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER
+ raise errors.CertificateFormatError(error='improperly formatted DER-encoded certificate')
+ else:
+ raise errors.CertificateFormatError(error=str(nsprerr))
+
+ return dercert
+
+def write_certificate(rawcert, filename):
+ """
+ Write the certificate to a file in PEM format.
+
+ The cert value can be either DER or PEM-encoded, it will be normalized
+ to DER regardless, then back out to PEM.
+ """
+ dercert = normalize_certificate(rawcert)
+
+ try:
+ fp = open(filename, 'w')
+ fp.write(make_pem(base64.b64encode(dercert)))
+ fp.close()
+ except (IOError, OSError), e:
+ raise errors.FileError(reason=str(e))
+
+def verify_cert_subject(ldap, hostname, dercert):
+ """
+ Verify that the certificate issuer we're adding matches the issuer
+ base of our installation.
+
+ This assumes the certificate has already been normalized.
+
+ This raises an exception on errors and returns nothing otherwise.
+ """
+ nsscert = load_certificate(dercert, datatype=DER)
+ subject = str(nsscert.subject)
+ issuer = str(nsscert.issuer)
+
+ # Handle both supported forms of issuer, from selfsign and dogtag.
+ if ((issuer != 'CN=%s Certificate Authority' % api.env.realm) and
+ (issuer != 'CN=Certificate Authority,O=%s' % api.env.realm)):
+ raise errors.CertificateOperationError(error=_('Issuer "%(issuer)s" does not match the expected issuer') % \
+ {'issuer' : issuer})
if __name__ == '__main__':
+ # this can be run with:
+ # python ipalib/x509.py < /etc/ipa/ca.crt
+
+ from ipalib import api
+ api.bootstrap()
+ api.finalize()
nss.nss_init_nodb()
@@ -85,6 +189,6 @@ if __name__ == '__main__':
certlines = sys.stdin.readlines()
cert = ''.join(certlines)
- cert = load_certificate(cert)
+ nsscert = load_certificate(cert)
- print cert
+ print nsscert
diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py
index 30aa9f525..001e6eb09 100644
--- a/ipaserver/install/cainstance.py
+++ b/ipaserver/install/cainstance.py
@@ -38,7 +38,7 @@ import stat
import socket
from ipapython import dogtag
from ipapython.certdb import get_ca_nickname
-from ipalib import pkcs10
+from ipalib import pkcs10, x509
import subprocess
from nss.error import NSPRError
@@ -322,7 +322,7 @@ class CADSInstance(service.Service):
# We only handle one server cert
self.nickname = server_certs[0][0]
- self.dercert = dsdb.get_cert_from_db(self.nickname)
+ self.dercert = dsdb.get_cert_from_db(self.nickname, pem=False)
dsdb.track_server_cert(self.nickname, self.principal, dsdb.passwd_fname)
def create_certdb(self):
@@ -721,13 +721,6 @@ class CAInstance(service.Service):
# TODO: roll back here?
logging.critical("Failed to restart the certificate server. See the installation log for details.")
- def __get_agent_cert(self, nickname):
- args = ["/usr/bin/certutil", "-L", "-d", self.ca_agent_db, "-n", nickname, "-a"]
- (out, err, returncode) = ipautil.run(args)
- out = out.replace('-----BEGIN CERTIFICATE-----', '')
- out = out.replace('-----END CERTIFICATE-----', '')
- return out
-
def __issue_ra_cert(self):
# The CA certificate is in the agent DB but isn't trusted
(admin_fd, admin_name) = tempfile.mkstemp()
@@ -801,8 +794,7 @@ class CAInstance(service.Service):
self.ra_cert = outputList['b64_cert']
self.ra_cert = self.ra_cert.replace('\\n','')
- self.ra_cert = self.ra_cert.replace('-----BEGIN CERTIFICATE-----','')
- self.ra_cert = self.ra_cert.replace('-----END CERTIFICATE-----','')
+ self.ra_cert = x509.strip_header(self.ra_cert)
# Add the new RA cert to the database in /etc/httpd/alias
(agent_fd, agent_name) = tempfile.mkstemp()
diff --git a/ipaserver/install/certs.py b/ipaserver/install/certs.py
index da89370af..07dda2cc0 100644
--- a/ipaserver/install/certs.py
+++ b/ipaserver/install/certs.py
@@ -432,11 +432,22 @@ class CertDB(object):
except RuntimeError:
break
- def get_cert_from_db(self, nickname):
+ def get_cert_from_db(self, nickname, pem=True):
+ """
+ Retrieve a certificate from the current NSS database for nickname.
+
+ pem controls whether the value returned PEM or DER-encoded. The
+ default is the data straight from certutil -a.
+ """
try:
args = ["-L", "-n", nickname, "-a"]
(cert, err, returncode) = self.run_certutil(args)
- return cert
+ if pem:
+ return cert
+ else:
+ (cert, start) = find_cert_from_txt(cert, start=0)
+ dercert = base64.b64decode(cert)
+ return dercert
except ipautil.CalledProcessError:
return ''
@@ -501,6 +512,8 @@ class CertDB(object):
that will issue our cert.
You can override the certificate Subject by specifying a subject.
+
+ Returns a certificate in DER format.
"""
cdb = other_certdb
if not cdb:
diff --git a/ipaserver/install/dsinstance.py b/ipaserver/install/dsinstance.py
index 845e1e253..574a5afd8 100644
--- a/ipaserver/install/dsinstance.py
+++ b/ipaserver/install/dsinstance.py
@@ -379,7 +379,7 @@ class DsInstance(service.Service):
logging.debug("completed creating ds instance")
except ipautil.CalledProcessError, e:
logging.critical("failed to restart ds instance %s" % e)
-
+
# check for open port 389 from now on
self.open_ports.append(389)
@@ -517,7 +517,7 @@ class DsInstance(service.Service):
# We only handle one server cert
nickname = server_certs[0][0]
- self.dercert = dsdb.get_cert_from_db(nickname)
+ self.dercert = dsdb.get_cert_from_db(nickname, pem=False)
dsdb.track_server_cert(nickname, self.principal, dsdb.passwd_fname)
else:
nickname = "Server-Cert"
diff --git a/ipaserver/install/httpinstance.py b/ipaserver/install/httpinstance.py
index e53c01e1c..26fde51f9 100644
--- a/ipaserver/install/httpinstance.py
+++ b/ipaserver/install/httpinstance.py
@@ -185,7 +185,7 @@ class HTTPInstance(service.Service):
db.create_password_conf()
# We only handle one server cert
nickname = server_certs[0][0]
- self.dercert = db.get_cert_from_db(nickname)
+ self.dercert = db.get_cert_from_db(nickname, pem=False)
db.track_server_cert(nickname, self.principal, db.passwd_fname)
self.__set_mod_nss_nickname(nickname)
diff --git a/ipaserver/install/service.py b/ipaserver/install/service.py
index d8d04e73a..efbb2c933 100644
--- a/ipaserver/install/service.py
+++ b/ipaserver/install/service.py
@@ -94,6 +94,7 @@ class Service(object):
self.realm = None
self.suffix = None
self.principal = None
+ self.dercert = None
def ldap_connect(self):
self.admin_conn = self.__get_conn(self.fqdn, self.dm_password)
@@ -192,23 +193,12 @@ class Service(object):
"""
Add a certificate to a service
- This should be passed in DER format but we'll be nice and convert
- a base64-encoded cert if needed (like when we add certs that come
- from PKCS#12 files.)
+ This server cert should be in DER format.
"""
if not self.admin_conn:
self.ldap_connect()
- try:
- s = self.dercert.find('-----BEGIN CERTIFICATE-----')
- if s > -1:
- e = self.dercert.find('-----END CERTIFICATE-----')
- s = s + 27
- self.dercert = self.dercert[s:e]
- self.dercert = base64.b64decode(self.dercert)
- except Exception:
- pass
dn = "krbprincipalname=%s,cn=services,cn=accounts,%s" % (self.principal, self.suffix)
mod = [(ldap.MOD_ADD, 'userCertificate', self.dercert)]
try:
diff --git a/make-testcert b/make-testcert
index e0c3db649..8a90de1ec 100755
--- a/make-testcert
+++ b/make-testcert
@@ -83,9 +83,6 @@ def makecert(reqdir):
api.register(client)
api.finalize()
- # This needs to be imported after the API is initialized
- from ipalib.plugins.service import make_pem
-
ra = rabase.rabase()
if not os.path.exists(ra.sec_dir) and api.env.xmlrpc_uri == 'http://localhost:8888/ipa/xml':
sys.exit('The in-tree self-signed CA is not configured, see tests/test_xmlrpc/test_cert.py')
@@ -108,7 +105,7 @@ def makecert(reqdir):
try:
res = api.Backend.client.run('cert_request', csr, principal=princ,
add=True)
- cert = make_pem(res['result']['certificate'])
+ cert = x509.make_pem(res['result']['certificate'])
fd = open(CERTPATH, 'w')
fd.write(cert)
fd.close()