diff options
author | Rob Crittenden <rcritten@redhat.com> | 2011-06-08 10:54:41 -0400 |
---|---|---|
committer | Rob Crittenden <rcritten@redhat.com> | 2011-06-21 19:09:50 -0400 |
commit | dd69c7dbe68e8f8674994a54ea913f2dd2e52c32 (patch) | |
tree | 5fdc303354eb26a1d2cd206c81babdc73e8d51b9 /ipalib/plugins | |
parent | 3a36eced53e540fe8f2b23eadf7dffda080324de (diff) | |
download | freeipa-dd69c7dbe68e8f8674994a54ea913f2dd2e52c32.tar.gz freeipa-dd69c7dbe68e8f8674994a54ea913f2dd2e52c32.tar.xz freeipa-dd69c7dbe68e8f8674994a54ea913f2dd2e52c32.zip |
Make data type of certificates more obvious/predictable internally.
For the most part certificates will be treated as being in DER format.
When we load a certificate we will generally accept it in any format but
will convert it to DER before proceeding in normalize_certificate().
This also re-arranges a bit of code to pull some certificate-specific
functions out of ipalib/plugins/service.py into ipalib/x509.py.
This also tries to use variable names to indicate what format the certificate
is in at any given point:
dercert: DER
cert: PEM
nsscert: a python-nss Certificate object
rawcert: unknown format
ticket 32
Diffstat (limited to 'ipalib/plugins')
-rw-r--r-- | ipalib/plugins/cert.py | 7 | ||||
-rw-r--r-- | ipalib/plugins/entitle.py | 27 | ||||
-rw-r--r-- | ipalib/plugins/host.py | 22 | ||||
-rw-r--r-- | ipalib/plugins/service.py | 118 |
4 files changed, 31 insertions, 143 deletions
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py index 647a2e526..643e1cdd2 100644 --- a/ipalib/plugins/cert.py +++ b/ipalib/plugins/cert.py @@ -87,10 +87,9 @@ from ipalib import Command, Str, Int, Bytes, Flag, File from ipalib import errors from ipalib import pkcs10 from ipalib import x509 +from ipalib import util from ipalib.plugins.virtual import * from ipalib.plugins.service import split_principal -from ipalib.plugins.service import make_pem, check_writable_file -from ipalib.plugins.service import write_certificate import base64 import logging import traceback @@ -501,10 +500,10 @@ class cert_show(VirtualCommand): def forward(self, *keys, **options): if 'out' in options: - check_writable_file(options['out']) + util.check_writable_file(options['out']) result = super(cert_show, self).forward(*keys, **options) if 'certificate' in result['result']: - write_certificate(result['result']['certificate'], options['out']) + x509.write_certificate(result['result']['certificate'], options['out']) return result else: raise errors.NoCertificateError(entry=keys[-1]) diff --git a/ipalib/plugins/entitle.py b/ipalib/plugins/entitle.py index ad4c2c6df..ab7dd456f 100644 --- a/ipalib/plugins/entitle.py +++ b/ipalib/plugins/entitle.py @@ -78,7 +78,8 @@ import base64 from OpenSSL import crypto from ipapython.ipautil import run from ipalib.request import context -from ipalib.plugins.service import validate_certificate, normalize_certificate +from ipalib.plugins.service import validate_certificate +from ipalib import x509 import locale @@ -101,16 +102,6 @@ def read_pkcs12_pin(): fp.close() return pwd -def make_pem(data): - """ - The M2Crypto/openSSL modules are very picky about PEM format and - require lines split to 64 characters with proper headers. - """ - cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)]) - return '-----BEGIN CERTIFICATE-----\n' + \ - cert + \ - '\n-----END CERTIFICATE-----' - def get_pool(ldap): """ Get our entitlement pool. Assume there is only one pool. @@ -256,7 +247,7 @@ class entitle_status(VirtualCommand): if u'usercertificate' in registrations: certs = registrations['usercertificate'] for cert in certs: - cert = make_pem(base64.b64encode(cert)) + cert = x509.make_pem(base64.b64encode(cert)) try: pc = EntitlementCertificate(cert) o = pc.getOrder() @@ -358,7 +349,7 @@ class entitle_consume(LDAPUpdate): results = cp.getCertificates(uuid) usercertificate = [] for cert in results: - usercertificate.append(normalize_certificate(cert['cert'])) + usercertificate.append(x509.normalize_certificate(cert['cert'])) entry_attrs['usercertificate'] = usercertificate entry_attrs['ipaentitlementid'] = uuid finally: @@ -427,7 +418,7 @@ class entitle_get(VirtualCommand): if u'usercertificate' in registrations: # make it look like a UEP cert for cert in registrations['usercertificate']: - certs.append(dict(cert = make_pem(base64.b64encode(cert)))) + certs.append(dict(cert = x509.make_pem(base64.b64encode(cert)))) else: try: cp = UEPConnection(handler='/candlepin', cert_file=certfile, key_file=keyfile) @@ -626,8 +617,8 @@ class entitle_import(LDAPUpdate): try: entry_attrs['ipaentitlementid'] = unicode('IMPORTED') - newcert = normalize_certificate(keys[-1][0]) - cert = make_pem(base64.b64encode(newcert)) + newcert = x509.normalize_certificate(keys[-1][0]) + cert = x509.make_pem(base64.b64encode(newcert)) try: pc = EntitlementCertificate(cert) o = pc.getOrder() @@ -645,7 +636,7 @@ class entitle_import(LDAPUpdate): # First import, create the entry entry_attrs['ipaentitlementid'] = unicode('IMPORTED') entry_attrs['objectclass'] = self.obj.object_class - entry_attrs['usercertificate'] = normalize_certificate(keys[-1][0]) + entry_attrs['usercertificate'] = x509.normalize_certificate(keys[-1][0]) ldap.add_entry(dn, entry_attrs) setattr(context, 'entitle_import', True) @@ -717,7 +708,7 @@ class entitle_sync(LDAPUpdate): results = cp.getCertificates(uuid) usercertificate = [] for cert in results: - usercertificate.append(normalize_certificate(cert['cert'])) + usercertificate.append(x509.normalize_certificate(cert['cert'])) entry_attrs['usercertificate'] = usercertificate entry_attrs['ipaentitlementid'] = uuid finally: diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py index ec58e1e40..1cd3fc061 100644 --- a/ipalib/plugins/host.py +++ b/ipalib/plugins/host.py @@ -81,11 +81,7 @@ from ipalib import Str, Flag, Bytes from ipalib.plugins.baseldap import * from ipalib.plugins.service import split_principal from ipalib.plugins.service import validate_certificate -from ipalib.plugins.service import normalize_certificate from ipalib.plugins.service import set_certificate_attrs -from ipalib.plugins.service import make_pem, check_writable_file -from ipalib.plugins.service import write_certificate -from ipalib.plugins.service import verify_cert_subject from ipalib.plugins.dns import dns_container_exists, _record_types from ipalib.plugins.dns import add_forward_record from ipalib import _, ngettext @@ -423,8 +419,8 @@ class host_add(LDAPCreate): del entry_attrs['random'] cert = options.get('usercertificate') if cert: - cert = normalize_certificate(cert) - verify_cert_subject(ldap, keys[-1], cert) + cert = x509.normalize_certificate(cert) + x509.verify_cert_subject(ldap, keys[-1], cert) entry_attrs['usercertificate'] = cert entry_attrs['managedby'] = dn return dn @@ -562,7 +558,7 @@ class host_del(LDAPDelete): self.obj.handle_not_found(*keys) if 'usercertificate' in entry_attrs: - cert = normalize_certificate(entry_attrs.get('usercertificate')[0]) + cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0]) try: serial = unicode(x509.get_serial_number(cert, x509.DER)) try: @@ -626,12 +622,12 @@ class host_mod(LDAPUpdate): if 'krbprincipalaux' not in obj_classes: obj_classes.append('krbprincipalaux') entry_attrs['objectclass'] = obj_classes - cert = normalize_certificate(entry_attrs.get('usercertificate')) + cert = x509.normalize_certificate(entry_attrs.get('usercertificate')) if cert: - verify_cert_subject(ldap, keys[-1], cert) + x509.verify_cert_subject(ldap, keys[-1], cert) (dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate']) if 'usercertificate' in entry_attrs_old: - oldcert = normalize_certificate(entry_attrs_old.get('usercertificate')[0]) + oldcert = x509.normalize_certificate(entry_attrs_old.get('usercertificate')[0]) try: serial = unicode(x509.get_serial_number(oldcert, x509.DER)) try: @@ -733,10 +729,10 @@ class host_show(LDAPRetrieve): def forward(self, *keys, **options): if 'out' in options: - check_writable_file(options['out']) + util.check_writable_file(options['out']) result = super(host_show, self).forward(*keys, **options) if 'usercertificate' in result['result']: - write_certificate(result['result']['usercertificate'][0], options['out']) + x509.write_certificate(result['result']['usercertificate'][0], options['out']) result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out']) return result else: @@ -792,7 +788,7 @@ class host_disable(LDAPQuery): except errors.AlreadyInactive: pass if 'usercertificate' in entry_attrs: - cert = normalize_certificate(entry_attrs.get('usercertificate')[0]) + cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0]) try: serial = unicode(x509.get_serial_number(cert, x509.DER)) try: diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py index 85956272b..14e04d26b 100644 --- a/ipalib/plugins/service.py +++ b/ipalib/plugins/service.py @@ -171,60 +171,6 @@ def validate_certificate(ugettext, cert): # We'll assume this is DER data pass -def normalize_certificate(cert): - """ - Incoming certificates should be DER-encoded. - - Note that this can't be a normalizer on the Param because only unicode - variables are normalized. - """ - if not cert: - return cert - - s = cert.find('-----BEGIN CERTIFICATE-----') - if s > -1: - e = cert.find('-----END CERTIFICATE-----') - cert = cert[s+27:e] - - if util.isvalid_base64(cert): - try: - cert = base64.b64decode(cert) - except Exception, e: - raise errors.Base64DecodeError(reason=str(e)) - - # At this point we should have a certificate, either because the data - # was base64-encoded and now its not or it came in as DER format. - # Let's decode it and see. Fetching the serial number will pass the - # certificate through the NSS DER parser. - try: - serial = unicode(x509.get_serial_number(cert, x509.DER)) - except NSPRError, nsprerr: - if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER - raise errors.CertificateFormatError(error='improperly formatted DER-encoded certificate') - else: - raise errors.CertificateFormatError(error=str(nsprerr)) - - return cert - -def verify_cert_subject(ldap, hostname, cert): - """ - Verify that the certificate issuer we're adding matches the issuer - base of our installation. - - This assumes the certificate has already been normalized. - - This raises an exception on errors and returns nothing otherwise. - """ - cert = x509.load_certificate(cert, datatype=x509.DER) - subject = str(cert.subject) - issuer = str(cert.issuer) - - # Handle both supported forms of issuer, from selfsign and dogtag. - if ((issuer != 'CN=%s Certificate Authority' % api.env.realm) and - (issuer != 'CN=Certificate Authority,O=%s' % api.env.realm)): - raise errors.CertificateOperationError(error=_('Issuer "%(issuer)s" does not match the expected issuer') % \ - {'issuer' : issuer}) - def set_certificate_attrs(entry_attrs): """ Set individual attributes from some values from a certificate. @@ -239,7 +185,7 @@ def set_certificate_attrs(entry_attrs): cert = entry_attrs['usercertificate'][0] else: cert = entry_attrs['usercertificate'] - cert = normalize_certificate(cert) + cert = x509.normalize_certificate(cert) cert = x509.load_certificate(cert, datatype=x509.DER) entry_attrs['subject'] = unicode(cert.subject) entry_attrs['serial_number'] = unicode(cert.serial_number) @@ -249,50 +195,6 @@ def set_certificate_attrs(entry_attrs): entry_attrs['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) entry_attrs['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) -def check_writable_file(filename): - """ - Determine if the file is writable. If the file doesn't exist then - open the file to test writability. - """ - if filename is None: - raise errors.FileError(reason='Filename is empty') - try: - if file_exists(filename): - if not os.access(filename, os.W_OK): - raise errors.FileError(reason=_('Permission denied: %(file)s') % dict(file=filename)) - else: - fp = open(filename, 'w') - fp.close() - except (IOError, OSError), e: - raise errors.FileError(reason=str(e)) - -def make_pem(data): - """ - Convert a raw base64-encoded blob into something that looks like a PE - file with lines split to 64 characters and proper headers. - """ - cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)]) - return '-----BEGIN CERTIFICATE-----\n' + \ - cert + \ - '\n-----END CERTIFICATE-----' - -def write_certificate(cert, filename): - """ - Check to see if the certificate should be written to a file and do so. - """ - if cert and util.isvalid_base64(cert): - try: - cert = base64.b64decode(cert) - except Exception, e: - raise errors.Base64DecodeError(reason=str(e)) - - try: - fp = open(filename, 'w') - fp.write(make_pem(base64.b64encode(cert))) - fp.close() - except (IOError, OSError), e: - raise errors.FileError(reason=str(e)) - class service(LDAPObject): """ Service object. @@ -361,9 +263,9 @@ class service_add(LDAPCreate): cert = options.get('usercertificate') if cert: - cert = normalize_certificate(cert) - verify_cert_subject(ldap, hostname, cert) - entry_attrs['usercertificate'] = cert + dercert = x509.normalize_certificate(cert) + x509.verify_cert_subject(ldap, hostname, dercert) + entry_attrs['usercertificate'] = dercert if not options.get('force', False): # We know the host exists if we've gotten this far but we @@ -430,8 +332,8 @@ class service_mod(LDAPUpdate): (service, hostname, realm) = split_principal(keys[-1]) cert = options.get('usercertificate') if cert: - cert = normalize_certificate(cert) - verify_cert_subject(ldap, hostname, cert) + dercert = x509.normalize_certificate(cert) + x509.verify_cert_subject(ldap, hostname, dercert) (dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate']) if 'usercertificate' in entry_attrs_old: # FIXME: what to do here? do we revoke the old cert? @@ -439,7 +341,7 @@ class service_mod(LDAPUpdate): x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER) ) raise errors.GenericError(format=fmt) - entry_attrs['usercertificate'] = cert + entry_attrs['usercertificate'] = dercert else: entry_attrs['usercertificate'] = None return dn @@ -513,10 +415,10 @@ class service_show(LDAPRetrieve): def forward(self, *keys, **options): if 'out' in options: - check_writable_file(options['out']) + util.check_writable_file(options['out']) result = super(service_show, self).forward(*keys, **options) if 'usercertificate' in result['result']: - write_certificate(result['result']['usercertificate'][0], options['out']) + x509.write_certificate(result['result']['usercertificate'][0], options['out']) result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out']) return result else: @@ -564,7 +466,7 @@ class service_disable(LDAPQuery): done_work = False if 'usercertificate' in entry_attrs: - cert = normalize_certificate(entry_attrs.get('usercertificate')[0]) + cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0]) try: serial = unicode(x509.get_serial_number(cert, x509.DER)) try: |