diff options
Diffstat (limited to 'ipalib/plugins/service.py')
-rw-r--r-- | ipalib/plugins/service.py | 118 |
1 files changed, 10 insertions, 108 deletions
diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py index 85956272b..14e04d26b 100644 --- a/ipalib/plugins/service.py +++ b/ipalib/plugins/service.py @@ -171,60 +171,6 @@ def validate_certificate(ugettext, cert): # We'll assume this is DER data pass -def normalize_certificate(cert): - """ - Incoming certificates should be DER-encoded. - - Note that this can't be a normalizer on the Param because only unicode - variables are normalized. - """ - if not cert: - return cert - - s = cert.find('-----BEGIN CERTIFICATE-----') - if s > -1: - e = cert.find('-----END CERTIFICATE-----') - cert = cert[s+27:e] - - if util.isvalid_base64(cert): - try: - cert = base64.b64decode(cert) - except Exception, e: - raise errors.Base64DecodeError(reason=str(e)) - - # At this point we should have a certificate, either because the data - # was base64-encoded and now its not or it came in as DER format. - # Let's decode it and see. Fetching the serial number will pass the - # certificate through the NSS DER parser. - try: - serial = unicode(x509.get_serial_number(cert, x509.DER)) - except NSPRError, nsprerr: - if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER - raise errors.CertificateFormatError(error='improperly formatted DER-encoded certificate') - else: - raise errors.CertificateFormatError(error=str(nsprerr)) - - return cert - -def verify_cert_subject(ldap, hostname, cert): - """ - Verify that the certificate issuer we're adding matches the issuer - base of our installation. - - This assumes the certificate has already been normalized. - - This raises an exception on errors and returns nothing otherwise. - """ - cert = x509.load_certificate(cert, datatype=x509.DER) - subject = str(cert.subject) - issuer = str(cert.issuer) - - # Handle both supported forms of issuer, from selfsign and dogtag. - if ((issuer != 'CN=%s Certificate Authority' % api.env.realm) and - (issuer != 'CN=Certificate Authority,O=%s' % api.env.realm)): - raise errors.CertificateOperationError(error=_('Issuer "%(issuer)s" does not match the expected issuer') % \ - {'issuer' : issuer}) - def set_certificate_attrs(entry_attrs): """ Set individual attributes from some values from a certificate. @@ -239,7 +185,7 @@ def set_certificate_attrs(entry_attrs): cert = entry_attrs['usercertificate'][0] else: cert = entry_attrs['usercertificate'] - cert = normalize_certificate(cert) + cert = x509.normalize_certificate(cert) cert = x509.load_certificate(cert, datatype=x509.DER) entry_attrs['subject'] = unicode(cert.subject) entry_attrs['serial_number'] = unicode(cert.serial_number) @@ -249,50 +195,6 @@ def set_certificate_attrs(entry_attrs): entry_attrs['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) entry_attrs['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) -def check_writable_file(filename): - """ - Determine if the file is writable. If the file doesn't exist then - open the file to test writability. - """ - if filename is None: - raise errors.FileError(reason='Filename is empty') - try: - if file_exists(filename): - if not os.access(filename, os.W_OK): - raise errors.FileError(reason=_('Permission denied: %(file)s') % dict(file=filename)) - else: - fp = open(filename, 'w') - fp.close() - except (IOError, OSError), e: - raise errors.FileError(reason=str(e)) - -def make_pem(data): - """ - Convert a raw base64-encoded blob into something that looks like a PE - file with lines split to 64 characters and proper headers. - """ - cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)]) - return '-----BEGIN CERTIFICATE-----\n' + \ - cert + \ - '\n-----END CERTIFICATE-----' - -def write_certificate(cert, filename): - """ - Check to see if the certificate should be written to a file and do so. - """ - if cert and util.isvalid_base64(cert): - try: - cert = base64.b64decode(cert) - except Exception, e: - raise errors.Base64DecodeError(reason=str(e)) - - try: - fp = open(filename, 'w') - fp.write(make_pem(base64.b64encode(cert))) - fp.close() - except (IOError, OSError), e: - raise errors.FileError(reason=str(e)) - class service(LDAPObject): """ Service object. @@ -361,9 +263,9 @@ class service_add(LDAPCreate): cert = options.get('usercertificate') if cert: - cert = normalize_certificate(cert) - verify_cert_subject(ldap, hostname, cert) - entry_attrs['usercertificate'] = cert + dercert = x509.normalize_certificate(cert) + x509.verify_cert_subject(ldap, hostname, dercert) + entry_attrs['usercertificate'] = dercert if not options.get('force', False): # We know the host exists if we've gotten this far but we @@ -430,8 +332,8 @@ class service_mod(LDAPUpdate): (service, hostname, realm) = split_principal(keys[-1]) cert = options.get('usercertificate') if cert: - cert = normalize_certificate(cert) - verify_cert_subject(ldap, hostname, cert) + dercert = x509.normalize_certificate(cert) + x509.verify_cert_subject(ldap, hostname, dercert) (dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate']) if 'usercertificate' in entry_attrs_old: # FIXME: what to do here? do we revoke the old cert? @@ -439,7 +341,7 @@ class service_mod(LDAPUpdate): x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER) ) raise errors.GenericError(format=fmt) - entry_attrs['usercertificate'] = cert + entry_attrs['usercertificate'] = dercert else: entry_attrs['usercertificate'] = None return dn @@ -513,10 +415,10 @@ class service_show(LDAPRetrieve): def forward(self, *keys, **options): if 'out' in options: - check_writable_file(options['out']) + util.check_writable_file(options['out']) result = super(service_show, self).forward(*keys, **options) if 'usercertificate' in result['result']: - write_certificate(result['result']['usercertificate'][0], options['out']) + x509.write_certificate(result['result']['usercertificate'][0], options['out']) result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out']) return result else: @@ -564,7 +466,7 @@ class service_disable(LDAPQuery): done_work = False if 'usercertificate' in entry_attrs: - cert = normalize_certificate(entry_attrs.get('usercertificate')[0]) + cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0]) try: serial = unicode(x509.get_serial_number(cert, x509.DER)) try: |