summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2011-06-08 10:54:41 -0400
committerRob Crittenden <rcritten@redhat.com>2011-06-21 19:09:50 -0400
commitdd69c7dbe68e8f8674994a54ea913f2dd2e52c32 (patch)
tree5fdc303354eb26a1d2cd206c81babdc73e8d51b9 /ipalib/plugins
parent3a36eced53e540fe8f2b23eadf7dffda080324de (diff)
downloadfreeipa-dd69c7dbe68e8f8674994a54ea913f2dd2e52c32.tar.gz
freeipa-dd69c7dbe68e8f8674994a54ea913f2dd2e52c32.tar.xz
freeipa-dd69c7dbe68e8f8674994a54ea913f2dd2e52c32.zip
Make data type of certificates more obvious/predictable internally.
For the most part certificates will be treated as being in DER format. When we load a certificate we will generally accept it in any format but will convert it to DER before proceeding in normalize_certificate(). This also re-arranges a bit of code to pull some certificate-specific functions out of ipalib/plugins/service.py into ipalib/x509.py. This also tries to use variable names to indicate what format the certificate is in at any given point: dercert: DER cert: PEM nsscert: a python-nss Certificate object rawcert: unknown format ticket 32
Diffstat (limited to 'ipalib/plugins')
-rw-r--r--ipalib/plugins/cert.py7
-rw-r--r--ipalib/plugins/entitle.py27
-rw-r--r--ipalib/plugins/host.py22
-rw-r--r--ipalib/plugins/service.py118
4 files changed, 31 insertions, 143 deletions
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 647a2e526..643e1cdd2 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -87,10 +87,9 @@ from ipalib import Command, Str, Int, Bytes, Flag, File
from ipalib import errors
from ipalib import pkcs10
from ipalib import x509
+from ipalib import util
from ipalib.plugins.virtual import *
from ipalib.plugins.service import split_principal
-from ipalib.plugins.service import make_pem, check_writable_file
-from ipalib.plugins.service import write_certificate
import base64
import logging
import traceback
@@ -501,10 +500,10 @@ class cert_show(VirtualCommand):
def forward(self, *keys, **options):
if 'out' in options:
- check_writable_file(options['out'])
+ util.check_writable_file(options['out'])
result = super(cert_show, self).forward(*keys, **options)
if 'certificate' in result['result']:
- write_certificate(result['result']['certificate'], options['out'])
+ x509.write_certificate(result['result']['certificate'], options['out'])
return result
else:
raise errors.NoCertificateError(entry=keys[-1])
diff --git a/ipalib/plugins/entitle.py b/ipalib/plugins/entitle.py
index ad4c2c6df..ab7dd456f 100644
--- a/ipalib/plugins/entitle.py
+++ b/ipalib/plugins/entitle.py
@@ -78,7 +78,8 @@ import base64
from OpenSSL import crypto
from ipapython.ipautil import run
from ipalib.request import context
-from ipalib.plugins.service import validate_certificate, normalize_certificate
+from ipalib.plugins.service import validate_certificate
+from ipalib import x509
import locale
@@ -101,16 +102,6 @@ def read_pkcs12_pin():
fp.close()
return pwd
-def make_pem(data):
- """
- The M2Crypto/openSSL modules are very picky about PEM format and
- require lines split to 64 characters with proper headers.
- """
- cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
- return '-----BEGIN CERTIFICATE-----\n' + \
- cert + \
- '\n-----END CERTIFICATE-----'
-
def get_pool(ldap):
"""
Get our entitlement pool. Assume there is only one pool.
@@ -256,7 +247,7 @@ class entitle_status(VirtualCommand):
if u'usercertificate' in registrations:
certs = registrations['usercertificate']
for cert in certs:
- cert = make_pem(base64.b64encode(cert))
+ cert = x509.make_pem(base64.b64encode(cert))
try:
pc = EntitlementCertificate(cert)
o = pc.getOrder()
@@ -358,7 +349,7 @@ class entitle_consume(LDAPUpdate):
results = cp.getCertificates(uuid)
usercertificate = []
for cert in results:
- usercertificate.append(normalize_certificate(cert['cert']))
+ usercertificate.append(x509.normalize_certificate(cert['cert']))
entry_attrs['usercertificate'] = usercertificate
entry_attrs['ipaentitlementid'] = uuid
finally:
@@ -427,7 +418,7 @@ class entitle_get(VirtualCommand):
if u'usercertificate' in registrations:
# make it look like a UEP cert
for cert in registrations['usercertificate']:
- certs.append(dict(cert = make_pem(base64.b64encode(cert))))
+ certs.append(dict(cert = x509.make_pem(base64.b64encode(cert))))
else:
try:
cp = UEPConnection(handler='/candlepin', cert_file=certfile, key_file=keyfile)
@@ -626,8 +617,8 @@ class entitle_import(LDAPUpdate):
try:
entry_attrs['ipaentitlementid'] = unicode('IMPORTED')
- newcert = normalize_certificate(keys[-1][0])
- cert = make_pem(base64.b64encode(newcert))
+ newcert = x509.normalize_certificate(keys[-1][0])
+ cert = x509.make_pem(base64.b64encode(newcert))
try:
pc = EntitlementCertificate(cert)
o = pc.getOrder()
@@ -645,7 +636,7 @@ class entitle_import(LDAPUpdate):
# First import, create the entry
entry_attrs['ipaentitlementid'] = unicode('IMPORTED')
entry_attrs['objectclass'] = self.obj.object_class
- entry_attrs['usercertificate'] = normalize_certificate(keys[-1][0])
+ entry_attrs['usercertificate'] = x509.normalize_certificate(keys[-1][0])
ldap.add_entry(dn, entry_attrs)
setattr(context, 'entitle_import', True)
@@ -717,7 +708,7 @@ class entitle_sync(LDAPUpdate):
results = cp.getCertificates(uuid)
usercertificate = []
for cert in results:
- usercertificate.append(normalize_certificate(cert['cert']))
+ usercertificate.append(x509.normalize_certificate(cert['cert']))
entry_attrs['usercertificate'] = usercertificate
entry_attrs['ipaentitlementid'] = uuid
finally:
diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py
index ec58e1e40..1cd3fc061 100644
--- a/ipalib/plugins/host.py
+++ b/ipalib/plugins/host.py
@@ -81,11 +81,7 @@ from ipalib import Str, Flag, Bytes
from ipalib.plugins.baseldap import *
from ipalib.plugins.service import split_principal
from ipalib.plugins.service import validate_certificate
-from ipalib.plugins.service import normalize_certificate
from ipalib.plugins.service import set_certificate_attrs
-from ipalib.plugins.service import make_pem, check_writable_file
-from ipalib.plugins.service import write_certificate
-from ipalib.plugins.service import verify_cert_subject
from ipalib.plugins.dns import dns_container_exists, _record_types
from ipalib.plugins.dns import add_forward_record
from ipalib import _, ngettext
@@ -423,8 +419,8 @@ class host_add(LDAPCreate):
del entry_attrs['random']
cert = options.get('usercertificate')
if cert:
- cert = normalize_certificate(cert)
- verify_cert_subject(ldap, keys[-1], cert)
+ cert = x509.normalize_certificate(cert)
+ x509.verify_cert_subject(ldap, keys[-1], cert)
entry_attrs['usercertificate'] = cert
entry_attrs['managedby'] = dn
return dn
@@ -562,7 +558,7 @@ class host_del(LDAPDelete):
self.obj.handle_not_found(*keys)
if 'usercertificate' in entry_attrs:
- cert = normalize_certificate(entry_attrs.get('usercertificate')[0])
+ cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
try:
serial = unicode(x509.get_serial_number(cert, x509.DER))
try:
@@ -626,12 +622,12 @@ class host_mod(LDAPUpdate):
if 'krbprincipalaux' not in obj_classes:
obj_classes.append('krbprincipalaux')
entry_attrs['objectclass'] = obj_classes
- cert = normalize_certificate(entry_attrs.get('usercertificate'))
+ cert = x509.normalize_certificate(entry_attrs.get('usercertificate'))
if cert:
- verify_cert_subject(ldap, keys[-1], cert)
+ x509.verify_cert_subject(ldap, keys[-1], cert)
(dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate'])
if 'usercertificate' in entry_attrs_old:
- oldcert = normalize_certificate(entry_attrs_old.get('usercertificate')[0])
+ oldcert = x509.normalize_certificate(entry_attrs_old.get('usercertificate')[0])
try:
serial = unicode(x509.get_serial_number(oldcert, x509.DER))
try:
@@ -733,10 +729,10 @@ class host_show(LDAPRetrieve):
def forward(self, *keys, **options):
if 'out' in options:
- check_writable_file(options['out'])
+ util.check_writable_file(options['out'])
result = super(host_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
- write_certificate(result['result']['usercertificate'][0], options['out'])
+ x509.write_certificate(result['result']['usercertificate'][0], options['out'])
result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out'])
return result
else:
@@ -792,7 +788,7 @@ class host_disable(LDAPQuery):
except errors.AlreadyInactive:
pass
if 'usercertificate' in entry_attrs:
- cert = normalize_certificate(entry_attrs.get('usercertificate')[0])
+ cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
try:
serial = unicode(x509.get_serial_number(cert, x509.DER))
try:
diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py
index 85956272b..14e04d26b 100644
--- a/ipalib/plugins/service.py
+++ b/ipalib/plugins/service.py
@@ -171,60 +171,6 @@ def validate_certificate(ugettext, cert):
# We'll assume this is DER data
pass
-def normalize_certificate(cert):
- """
- Incoming certificates should be DER-encoded.
-
- Note that this can't be a normalizer on the Param because only unicode
- variables are normalized.
- """
- if not cert:
- return cert
-
- s = cert.find('-----BEGIN CERTIFICATE-----')
- if s > -1:
- e = cert.find('-----END CERTIFICATE-----')
- cert = cert[s+27:e]
-
- if util.isvalid_base64(cert):
- try:
- cert = base64.b64decode(cert)
- except Exception, e:
- raise errors.Base64DecodeError(reason=str(e))
-
- # At this point we should have a certificate, either because the data
- # was base64-encoded and now its not or it came in as DER format.
- # Let's decode it and see. Fetching the serial number will pass the
- # certificate through the NSS DER parser.
- try:
- serial = unicode(x509.get_serial_number(cert, x509.DER))
- except NSPRError, nsprerr:
- if nsprerr.errno == -8183: # SEC_ERROR_BAD_DER
- raise errors.CertificateFormatError(error='improperly formatted DER-encoded certificate')
- else:
- raise errors.CertificateFormatError(error=str(nsprerr))
-
- return cert
-
-def verify_cert_subject(ldap, hostname, cert):
- """
- Verify that the certificate issuer we're adding matches the issuer
- base of our installation.
-
- This assumes the certificate has already been normalized.
-
- This raises an exception on errors and returns nothing otherwise.
- """
- cert = x509.load_certificate(cert, datatype=x509.DER)
- subject = str(cert.subject)
- issuer = str(cert.issuer)
-
- # Handle both supported forms of issuer, from selfsign and dogtag.
- if ((issuer != 'CN=%s Certificate Authority' % api.env.realm) and
- (issuer != 'CN=Certificate Authority,O=%s' % api.env.realm)):
- raise errors.CertificateOperationError(error=_('Issuer "%(issuer)s" does not match the expected issuer') % \
- {'issuer' : issuer})
-
def set_certificate_attrs(entry_attrs):
"""
Set individual attributes from some values from a certificate.
@@ -239,7 +185,7 @@ def set_certificate_attrs(entry_attrs):
cert = entry_attrs['usercertificate'][0]
else:
cert = entry_attrs['usercertificate']
- cert = normalize_certificate(cert)
+ cert = x509.normalize_certificate(cert)
cert = x509.load_certificate(cert, datatype=x509.DER)
entry_attrs['subject'] = unicode(cert.subject)
entry_attrs['serial_number'] = unicode(cert.serial_number)
@@ -249,50 +195,6 @@ def set_certificate_attrs(entry_attrs):
entry_attrs['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
entry_attrs['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
-def check_writable_file(filename):
- """
- Determine if the file is writable. If the file doesn't exist then
- open the file to test writability.
- """
- if filename is None:
- raise errors.FileError(reason='Filename is empty')
- try:
- if file_exists(filename):
- if not os.access(filename, os.W_OK):
- raise errors.FileError(reason=_('Permission denied: %(file)s') % dict(file=filename))
- else:
- fp = open(filename, 'w')
- fp.close()
- except (IOError, OSError), e:
- raise errors.FileError(reason=str(e))
-
-def make_pem(data):
- """
- Convert a raw base64-encoded blob into something that looks like a PE
- file with lines split to 64 characters and proper headers.
- """
- cert = '\n'.join([data[x:x+64] for x in range(0, len(data), 64)])
- return '-----BEGIN CERTIFICATE-----\n' + \
- cert + \
- '\n-----END CERTIFICATE-----'
-
-def write_certificate(cert, filename):
- """
- Check to see if the certificate should be written to a file and do so.
- """
- if cert and util.isvalid_base64(cert):
- try:
- cert = base64.b64decode(cert)
- except Exception, e:
- raise errors.Base64DecodeError(reason=str(e))
-
- try:
- fp = open(filename, 'w')
- fp.write(make_pem(base64.b64encode(cert)))
- fp.close()
- except (IOError, OSError), e:
- raise errors.FileError(reason=str(e))
-
class service(LDAPObject):
"""
Service object.
@@ -361,9 +263,9 @@ class service_add(LDAPCreate):
cert = options.get('usercertificate')
if cert:
- cert = normalize_certificate(cert)
- verify_cert_subject(ldap, hostname, cert)
- entry_attrs['usercertificate'] = cert
+ dercert = x509.normalize_certificate(cert)
+ x509.verify_cert_subject(ldap, hostname, dercert)
+ entry_attrs['usercertificate'] = dercert
if not options.get('force', False):
# We know the host exists if we've gotten this far but we
@@ -430,8 +332,8 @@ class service_mod(LDAPUpdate):
(service, hostname, realm) = split_principal(keys[-1])
cert = options.get('usercertificate')
if cert:
- cert = normalize_certificate(cert)
- verify_cert_subject(ldap, hostname, cert)
+ dercert = x509.normalize_certificate(cert)
+ x509.verify_cert_subject(ldap, hostname, dercert)
(dn, entry_attrs_old) = ldap.get_entry(dn, ['usercertificate'])
if 'usercertificate' in entry_attrs_old:
# FIXME: what to do here? do we revoke the old cert?
@@ -439,7 +341,7 @@ class service_mod(LDAPUpdate):
x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER)
)
raise errors.GenericError(format=fmt)
- entry_attrs['usercertificate'] = cert
+ entry_attrs['usercertificate'] = dercert
else:
entry_attrs['usercertificate'] = None
return dn
@@ -513,10 +415,10 @@ class service_show(LDAPRetrieve):
def forward(self, *keys, **options):
if 'out' in options:
- check_writable_file(options['out'])
+ util.check_writable_file(options['out'])
result = super(service_show, self).forward(*keys, **options)
if 'usercertificate' in result['result']:
- write_certificate(result['result']['usercertificate'][0], options['out'])
+ x509.write_certificate(result['result']['usercertificate'][0], options['out'])
result['summary'] = _('Certificate stored in file \'%(file)s\'') % dict(file=options['out'])
return result
else:
@@ -564,7 +466,7 @@ class service_disable(LDAPQuery):
done_work = False
if 'usercertificate' in entry_attrs:
- cert = normalize_certificate(entry_attrs.get('usercertificate')[0])
+ cert = x509.normalize_certificate(entry_attrs.get('usercertificate')[0])
try:
serial = unicode(x509.get_serial_number(cert, x509.DER))
try: