summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins/service.py
diff options
context:
space:
mode:
Diffstat (limited to 'ipalib/plugins/service.py')
-rw-r--r--ipalib/plugins/service.py118
1 files changed, 10 insertions, 108 deletions
diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py
index 85956272b..14e04d26b 100644
--- a/ipalib/plugins/service.py
+++ b/ipalib/plugins/service.py
@@ -171,60 +171,6 @@ def validate_certificate(ugettext, cert):
# We'll assume this is DER data
pass
-def normalize_certificate(cert):
- """
- Incoming certificates should be DER-encoded.
-
- Note that this can't be a normalizer on the Param because only unicode
- variables are normalized.
- """
- if not cert:
- return cert
-
- s = cert.find('-----BEGIN CERTIFICATE-----')
- if s > -1:
- e = cert.find('-----END CERTIFICATE-----')
- cert = cert[s+27:e]
-
- if util.isvalid_base64(cert):
- try:
- cert = base64.b64decode(cert)
- except Exception, e:
- raise errors.Base64DecodeError(reason=str(e))
-
- # At this point we should have a certificate, either because the data
- # was base64-encoded and now its not or it came in as DER format.
- # Let's decode it and see. Fetching the serial number will pass the
- # certificate through the NSS DER parser.
- try:
- serial = unicode(x509.get_serial_number(cert, x509.DER))
- except NSPRError, nsprerr:
- if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER
- raise errors.CertificateFormatError(error='improperly formatted DER-encoded certificate')
- else:
- raise errors.CertificateFormatError(error=str(nsprerr))
-
- return cert
-
-def verify_cert_subject(ldap, hostname, cert):
- """
- Verify that the certificate issuer we're adding matches the issuer
- base of our installation.
-
- This assumes the certificate has already been normalized.
-
- This raises an exception on errors and returns nothing otherwise.
- """
- cert = x509.load_certificate(cert, datatype=x509.DER)
- subject = str(cert.subject)
- issuer = str(cert.issuer)
-
- # Handle both supported forms of issuer, from selfsign and dogtag.
- if ((issuer != 'CN=%s Certificate Authority' % api.env.realm) and
- (issuer != 'CN=Certificate Authority,O=%s' % api.env.realm)):
- raise errors.CertificateOperationError(error=_('Issuer "%(issuer)s" does not match the expected issuer') % \
- {'issuer' : issuer})
-
def set_certificate_attrs(entry_attrs):
"""
Set individual attributes from some values from a certificate.
@@ -239,7 +185,7 @@ def set_certificate_attrs(entry_attrs):
cert = entry_attrs['usercertificate'][0]
else:
cert = entry_attrs['usercertificate']
- cert = normalize_certificate(cert)
+ cert = x509.normalize_certificate(cert)
cert = x509.load_certificate(cert, datatype=x509.DER)
entry_attrs['subject'] = unicode(cert.subject)
entry_attrs['serial_number'] = unicode(cert.serial_number)
@@ -249,50 +195,6 @@ def set_certificate_attrs(entry_attrs):
entry_attrs['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
entry_attrs['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
-def check_writable_file(filename):
- """
- Determine if the file is writable. If the file doesn't exist then
- open the file to test writability.
- """
- if filename is None:
- raise errors.FileError(reason='Filename is empty')
- try:
- if file_exists(filename):
- if not os.access(filename, os.W_OK):
- raise errors.FileError(reason=_('Permission denied: %(file)s') % dict(file=filename))
- else:
- fp = open(filename, 'w')
- fp.close()
- except (IOError, OSError), e:
- raise errors.FileError(reason=str(e))
-
-def make_pem(data):
- """
- Convert a raw base64-encoded blob into something that looks like a PE
- file with lines split to 64 characters and proper headers.
- """
- cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
- return '-----BEGIN CERTIFICATE-----\n' + \
- cert + \
- '\n-----END CERTIFICATE-----'
-
-def write_certificate(cert, filename):
- """
- Check to see if the certificate should be written to a file and do so.
- """
- if cert and util.isvalid_base64(cert):
- try:
- cert = base64.b64decode(cert)
- except Exception, e:
- raise errors.Base64DecodeError(reason=str(e))
-
- try:
- fp = open(filename, 'w')
- fp.write(make_pem(base64.b64encode(cert)))
- fp.close()
- except (IOError, OSError), e:
- raise errors.FileError(reason=str(e))
-
class service(LDAPObject):
"""
Service object.
@@ -361,9 +263,9 @@ class service_add(LDAPCreate):
cert = options.get('usercertificate')
if cert:
- cert = normalize_certificate(cert)
- verify_cert_subject(ldap, hostname, cert)
- entry_attrs['usercertificate'] = cert
+ dercert = x509.normalize_certificate(cert)
+ x509.verify_cert_subject(ldap, hostname, dercert)
+ entry_attrs['usercertificate'] = dercert
if not options.get('force', False):
# We know the host exists if we've gotten this far but we
@@ -430,8 +332,8 @@ class service_mod(LDAPUpdate):
(service, hostname, realm) = split_principal(keys[-1])
cert = options.get('usercertificate')
if cert:
- cert = normalize_certificate(cert)
- verify_cert_subject(ldap, hostname, cert)
+ dercert = x509.normalize_certificate(cert)
+ x509.verify_cert_subject(ldap, hostname, dercert)
(dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate'])
if 'usercertificate' in entry_attrs_old:
# FIXME: what to do here? do we revoke the old cert?
@@ -439,7 +341,7 @@ class service_mod(LDAPUpdate):
x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER)
)
raise errors.GenericError(format=fmt)
- entry_attrs['usercertificate'] = cert
+ entry_attrs['usercertificate'] = dercert
else:
entry_attrs['usercertificate'] = None
return dn
@@ -513,10 +415,10 @@ class service_show(LDAPRetrieve):
def forward(self, *keys, **options):
if 'out' in options:
- check_writable_file(options['out'])
+ util.check_writable_file(options['out'])
result = super(service_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
- write_certificate(result['result']['usercertificate'][0], options['out'])
+ x509.write_certificate(result['result']['usercertificate'][0], options['out'])
result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out'])
return result
else:
@@ -564,7 +466,7 @@ class service_disable(LDAPQuery):
done_work = False
if 'usercertificate' in entry_attrs:
- cert = normalize_certificate(entry_attrs.get('usercertificate')[0])
+ cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
try:
serial = unicode(x509.get_serial_number(cert, x509.DER))
try: