summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins/cert.py
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2010-06-24 11:40:02 -0400
committerRob Crittenden <rcritten@redhat.com>2010-07-15 10:51:49 -0400
commit8d2d7429beb6bf66cb3c4fc35a7a3dbb165a432c (patch)
treec364bfb5b5926a165f1e6bc29e355131636afe45 /ipalib/plugins/cert.py
parent1e1985b17c3988056bef045fa84a9c7aaf0c4c65 (diff)
downloadfreeipa-8d2d7429beb6bf66cb3c4fc35a7a3dbb165a432c.tar.gz
freeipa-8d2d7429beb6bf66cb3c4fc35a7a3dbb165a432c.tar.xz
freeipa-8d2d7429beb6bf66cb3c4fc35a7a3dbb165a432c.zip
Clean up crypto code, take advantage of new nss-python capabilities
This patch does the following: - drops our in-tree x509v3 parser to use the python-nss one - return more information on certificates - make an API change, renaming cert-get to cert-show - Drop a lot of duplicated code
Diffstat (limited to 'ipalib/plugins/cert.py')
-rw-r--r--ipalib/plugins/cert.py165
1 files changed, 87 insertions, 78 deletions
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 17e4c46b..1de4ac64 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -44,7 +44,7 @@ EXAMPLES:
ipa cert-request --add --principal=HTTP/lion.example.com example.csr
Retrieve an existing certificate:
- ipa cert-request 1032
+ ipa cert-show 1032
Revoke a certificate (see RFC 5280 for reason details):
ipa cert-revoke --revocation-reason=6 1032
@@ -75,53 +75,8 @@ import traceback
from ipalib.text import _
from ipalib.request import context
from ipalib.output import Output
-
-def get_serial(certificate):
- """
- Given a certificate, return the serial number in that cert
- as a Python long object.
-
- In theory there should be only one cert per object so even if we get
- passed in a list/tuple only return the first one.
- """
- if type(certificate) in (list, tuple):
- certificate = certificate[0]
- try:
- certificate = base64.b64decode(certificate)
- except Exception, e:
- pass
- try:
-
- serial = x509.get_serial_number(certificate, x509.DER)
- except PyAsn1Error:
- raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))
-
- return serial
-
-def get_subject(certificate):
- """
- Given a certificate, return the subject
-
- In theory there should be only one cert per object so even if we get
- passed in a list/tuple only return the first one.
- """
- if type(certificate) in (list, tuple):
- certificate = certificate[0]
- try:
- certificate = base64.b64decode(certificate)
- except Exception, e:
- pass
- try:
- sub = list(x509.get_subject_components(certificate, type=x509.DER))
- sub.reverse()
- except PyAsn1Error:
- raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))
-
- subject = ""
- for s in sub:
- subject = subject + "%s=%s," % (s[0], s[1])
-
- return subject[:-1]
+from ipalib.plugins.service import validate_principal
+import nss.nss as nss
def get_csr_hostname(csr):
"""
@@ -192,6 +147,20 @@ def normalize_csr(csr):
return csr
+def get_host_from_principal(principal):
+ """
+ Given a principal with or without a realm return the
+ host portion.
+ """
+ validate_principal(None, principal)
+ realm = principal.find('@')
+ slash = principal.find('/')
+ if realm == -1:
+ realm = len(principal)
+ hostname = principal[slash+1:realm]
+
+ return hostname
+
class cert_request(VirtualCommand):
"""
Submit a certificate signing request.
@@ -219,6 +188,9 @@ class cert_request(VirtualCommand):
default=False,
autofill=True
),
+ )
+
+ has_output_params = (
Str('certificate?',
label=_('Certificate'),
flags=['no_create', 'no_update', 'no_search'],
@@ -227,6 +199,26 @@ class cert_request(VirtualCommand):
label=_('Subject'),
flags=['no_create', 'no_update', 'no_search'],
),
+ Str('issuer?',
+ label=_('Issuer'),
+ flags=['no_create', 'no_update', 'no_search'],
+ ),
+ Str('valid_not_before?',
+ label=_('Not Before'),
+ flags=['no_create', 'no_update', 'no_search'],
+ ),
+ Str('valid_not_after?',
+ label=_('Not After'),
+ flags=['no_create', 'no_update', 'no_search'],
+ ),
+ Str('md5_fingerprint?',
+ label=_('Fingerprint (MD5)'),
+ flags=['no_create', 'no_update', 'no_search'],
+ ),
+ Str('sha1_fingerprint?',
+ label=_('Fingerprint (SHA1)'),
+ flags=['no_create', 'no_update', 'no_search'],
+ ),
Str('serial_number?',
label=_('Serial number'),
flags=['no_create', 'no_update', 'no_search'],
@@ -281,11 +273,7 @@ class cert_request(VirtualCommand):
service = api.Command['service_show'](principal, all=True, raw=True)['result']
dn = service['dn']
else:
- realm = principal.find('@')
- if realm == -1:
- realm = len(principal)
- hostname = principal[5:realm]
-
+ hostname = get_host_from_principal(principal)
service = api.Command['host_show'](hostname, all=True, raw=True)['result']
dn = service['dn']
except errors.NotFound, e:
@@ -319,12 +307,12 @@ class cert_request(VirtualCommand):
raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name)
if 'usercertificate' in service:
- serial = get_serial(base64.b64encode(service['usercertificate'][0]))
+ serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER)
# revoke the certificate and remove it from the service
# entry before proceeding. First we retrieve the certificate to
# see if it is already revoked, if not then we revoke it.
try:
- result = api.Command['cert_get'](unicode(serial))['result']
+ result = api.Command['cert_show'](unicode(serial))['result']
if 'revocation_reason' not in result:
try:
api.Command['cert_revoke'](unicode(serial), revocation_reason=4)
@@ -334,10 +322,20 @@ class cert_request(VirtualCommand):
except errors.NotImplementedError:
# some CA's might not implement get
pass
- api.Command['service_mod'](principal, usercertificate=None)
+ if not principal.startswith('host/'):
+ api.Command['service_mod'](principal, usercertificate=None)
+ else:
+ hostname = get_host_from_principal(principal)
+ api.Command['host_mod'](hostname, usercertificate=None)
# Request the certificate
result = self.Backend.ra.request_certificate(csr, **kw)
+ cert = x509.load_certificate(result['certificate'])
+ result['issuer'] = unicode(cert.issuer)
+ result['valid_not_before'] = unicode(cert.valid_not_before_str)
+ result['valid_not_after'] = unicode(cert.valid_not_after_str)
+ result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
+ result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
# Success? Then add it to the service entry.
if 'certificate' in result:
@@ -345,10 +343,7 @@ class cert_request(VirtualCommand):
skw = {"usercertificate": str(result.get('certificate'))}
api.Command['service_mod'](principal, **skw)
else:
- realm = principal.find('@')
- if realm == -1:
- realm = len(principal)
- hostname = principal[5:realm]
+ hostname = get_host_from_principal(principal)
skw = {"usercertificate": str(result.get('certificate'))}
api.Command['host_mod'](hostname, **skw)
@@ -370,10 +365,9 @@ class cert_status(VirtualCommand):
flags=['no_create', 'no_update', 'no_search'],
),
)
- takes_options = (
- Str('cert_request_status?',
+ has_output_params = (
+ Str('cert_request_status',
label=_('Request status'),
- flags=['no_create', 'no_update', 'no_search'],
),
)
operation = "certificate status"
@@ -393,25 +387,37 @@ _serial_number = Str('serial_number',
doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'),
)
-class cert_get(VirtualCommand):
+class cert_show(VirtualCommand):
"""
Retrieve an existing certificate.
"""
takes_args = _serial_number
- takes_options = (
- Str('certificate?',
+ has_output_params = (
+ Str('certificate',
label=_('Certificate'),
- flags=['no_create', 'no_update', 'no_search'],
),
- Str('subject?',
+ Str('subject',
label=_('Subject'),
- flags=['no_create', 'no_update', 'no_search'],
+ ),
+ Str('issuer',
+ label=_('Issuer'),
+ ),
+ Str('valid_not_before',
+ label=_('Not Before'),
+ ),
+ Str('valid_not_after',
+ label=_('Not After'),
+ ),
+ Str('md5_fingerprint',
+ label=_('Fingerprint (MD5)'),
+ ),
+ Str('sha1_fingerprint',
+ label=_('Fingerprint (SHA1)'),
),
Str('revocation_reason?',
label=_('Revocation reason'),
- flags=['no_create', 'no_update', 'no_search'],
),
)
@@ -420,10 +426,16 @@ class cert_get(VirtualCommand):
def execute(self, serial_number):
self.check_access()
result=self.Backend.ra.get_certificate(serial_number)
- result['subject'] = get_subject(result['certificate'])
+ cert = x509.load_certificate(result['certificate'])
+ result['subject'] = unicode(cert.subject)
+ result['issuer'] = unicode(cert.issuer)
+ result['valid_not_before'] = unicode(cert.valid_not_before_str)
+ result['valid_not_after'] = unicode(cert.valid_not_after_str)
+ result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0])
+ result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0])
return dict(result=result)
-api.register(cert_get)
+api.register(cert_show)
class cert_revoke(VirtualCommand):
@@ -433,10 +445,9 @@ class cert_revoke(VirtualCommand):
takes_args = _serial_number
- takes_options = (
- Flag('revoked?',
+ has_output_params = (
+ Flag('revoked',
label=_('Revoked'),
- flags=['no_create', 'no_update', 'no_search'],
),
)
operation = "revoke certificate"
@@ -468,14 +479,12 @@ class cert_remove_hold(VirtualCommand):
takes_args = _serial_number
- takes_options = (
+ has_output_params = (
Flag('unrevoked?',
label=_('Unrevoked'),
- flags=['no_create', 'no_update', 'no_search'],
),
Str('error_string?',
label=_('Error'),
- flags=['no_create', 'no_update', 'no_search'],
),
)
operation = "certificate remove hold"