diff options
Diffstat (limited to 'ipalib/plugins')
-rw-r--r-- | ipalib/plugins/cert.py | 165 | ||||
-rw-r--r-- | ipalib/plugins/host.py | 6 | ||||
-rw-r--r-- | ipalib/plugins/service.py | 25 |
3 files changed, 95 insertions, 101 deletions
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py index 17e4c46b0..1de4ac64e 100644 --- a/ipalib/plugins/cert.py +++ b/ipalib/plugins/cert.py @@ -44,7 +44,7 @@ EXAMPLES: ipa cert-request --add --principal=HTTP/lion.example.com example.csr Retrieve an existing certificate: - ipa cert-request 1032 + ipa cert-show 1032 Revoke a certificate (see RFC 5280 for reason details): ipa cert-revoke --revocation-reason=6 1032 @@ -75,53 +75,8 @@ import traceback from ipalib.text import _ from ipalib.request import context from ipalib.output import Output - -def get_serial(certificate): - """ - Given a certificate, return the serial number in that cert - as a Python long object. - - In theory there should be only one cert per object so even if we get - passed in a list/tuple only return the first one. - """ - if type(certificate) in (list, tuple): - certificate = certificate[0] - try: - certificate = base64.b64decode(certificate) - except Exception, e: - pass - try: - - serial = x509.get_serial_number(certificate, x509.DER) - except PyAsn1Error: - raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry')) - - return serial - -def get_subject(certificate): - """ - Given a certificate, return the subject - - In theory there should be only one cert per object so even if we get - passed in a list/tuple only return the first one. - """ - if type(certificate) in (list, tuple): - certificate = certificate[0] - try: - certificate = base64.b64decode(certificate) - except Exception, e: - pass - try: - sub = list(x509.get_subject_components(certificate, type=x509.DER)) - sub.reverse() - except PyAsn1Error: - raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry')) - - subject = "" - for s in sub: - subject = subject + "%s=%s," % (s[0], s[1]) - - return subject[:-1] +from ipalib.plugins.service import validate_principal +import nss.nss as nss def get_csr_hostname(csr): """ @@ -192,6 +147,20 @@ def normalize_csr(csr): return csr +def get_host_from_principal(principal): + """ + Given a principal with or without a realm return the + host portion. + """ + validate_principal(None, principal) + realm = principal.find('@') + slash = principal.find('/') + if realm == -1: + realm = len(principal) + hostname = principal[slash+1:realm] + + return hostname + class cert_request(VirtualCommand): """ Submit a certificate signing request. @@ -219,6 +188,9 @@ class cert_request(VirtualCommand): default=False, autofill=True ), + ) + + has_output_params = ( Str('certificate?', label=_('Certificate'), flags=['no_create', 'no_update', 'no_search'], @@ -227,6 +199,26 @@ class cert_request(VirtualCommand): label=_('Subject'), flags=['no_create', 'no_update', 'no_search'], ), + Str('issuer?', + label=_('Issuer'), + flags=['no_create', 'no_update', 'no_search'], + ), + Str('valid_not_before?', + label=_('Not Before'), + flags=['no_create', 'no_update', 'no_search'], + ), + Str('valid_not_after?', + label=_('Not After'), + flags=['no_create', 'no_update', 'no_search'], + ), + Str('md5_fingerprint?', + label=_('Fingerprint (MD5)'), + flags=['no_create', 'no_update', 'no_search'], + ), + Str('sha1_fingerprint?', + label=_('Fingerprint (SHA1)'), + flags=['no_create', 'no_update', 'no_search'], + ), Str('serial_number?', label=_('Serial number'), flags=['no_create', 'no_update', 'no_search'], @@ -281,11 +273,7 @@ class cert_request(VirtualCommand): service = api.Command['service_show'](principal, all=True, raw=True)['result'] dn = service['dn'] else: - realm = principal.find('@') - if realm == -1: - realm = len(principal) - hostname = principal[5:realm] - + hostname = get_host_from_principal(principal) service = api.Command['host_show'](hostname, all=True, raw=True)['result'] dn = service['dn'] except errors.NotFound, e: @@ -319,12 +307,12 @@ class cert_request(VirtualCommand): raise errors.ACIError(info="Insufficient privilege to create a certificate with subject alt name '%s'." % name) if 'usercertificate' in service: - serial = get_serial(base64.b64encode(service['usercertificate'][0])) + serial = x509.get_serial_number(service['usercertificate'][0], datatype=x509.DER) # revoke the certificate and remove it from the service # entry before proceeding. First we retrieve the certificate to # see if it is already revoked, if not then we revoke it. try: - result = api.Command['cert_get'](unicode(serial))['result'] + result = api.Command['cert_show'](unicode(serial))['result'] if 'revocation_reason' not in result: try: api.Command['cert_revoke'](unicode(serial), revocation_reason=4) @@ -334,10 +322,20 @@ class cert_request(VirtualCommand): except errors.NotImplementedError: # some CA's might not implement get pass - api.Command['service_mod'](principal, usercertificate=None) + if not principal.startswith('host/'): + api.Command['service_mod'](principal, usercertificate=None) + else: + hostname = get_host_from_principal(principal) + api.Command['host_mod'](hostname, usercertificate=None) # Request the certificate result = self.Backend.ra.request_certificate(csr, **kw) + cert = x509.load_certificate(result['certificate']) + result['issuer'] = unicode(cert.issuer) + result['valid_not_before'] = unicode(cert.valid_not_before_str) + result['valid_not_after'] = unicode(cert.valid_not_after_str) + result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) + result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) # Success? Then add it to the service entry. if 'certificate' in result: @@ -345,10 +343,7 @@ class cert_request(VirtualCommand): skw = {"usercertificate": str(result.get('certificate'))} api.Command['service_mod'](principal, **skw) else: - realm = principal.find('@') - if realm == -1: - realm = len(principal) - hostname = principal[5:realm] + hostname = get_host_from_principal(principal) skw = {"usercertificate": str(result.get('certificate'))} api.Command['host_mod'](hostname, **skw) @@ -370,10 +365,9 @@ class cert_status(VirtualCommand): flags=['no_create', 'no_update', 'no_search'], ), ) - takes_options = ( - Str('cert_request_status?', + has_output_params = ( + Str('cert_request_status', label=_('Request status'), - flags=['no_create', 'no_update', 'no_search'], ), ) operation = "certificate status" @@ -393,25 +387,37 @@ _serial_number = Str('serial_number', doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'), ) -class cert_get(VirtualCommand): +class cert_show(VirtualCommand): """ Retrieve an existing certificate. """ takes_args = _serial_number - takes_options = ( - Str('certificate?', + has_output_params = ( + Str('certificate', label=_('Certificate'), - flags=['no_create', 'no_update', 'no_search'], ), - Str('subject?', + Str('subject', label=_('Subject'), - flags=['no_create', 'no_update', 'no_search'], + ), + Str('issuer', + label=_('Issuer'), + ), + Str('valid_not_before', + label=_('Not Before'), + ), + Str('valid_not_after', + label=_('Not After'), + ), + Str('md5_fingerprint', + label=_('Fingerprint (MD5)'), + ), + Str('sha1_fingerprint', + label=_('Fingerprint (SHA1)'), ), Str('revocation_reason?', label=_('Revocation reason'), - flags=['no_create', 'no_update', 'no_search'], ), ) @@ -420,10 +426,16 @@ class cert_get(VirtualCommand): def execute(self, serial_number): self.check_access() result=self.Backend.ra.get_certificate(serial_number) - result['subject'] = get_subject(result['certificate']) + cert = x509.load_certificate(result['certificate']) + result['subject'] = unicode(cert.subject) + result['issuer'] = unicode(cert.issuer) + result['valid_not_before'] = unicode(cert.valid_not_before_str) + result['valid_not_after'] = unicode(cert.valid_not_after_str) + result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) + result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) return dict(result=result) -api.register(cert_get) +api.register(cert_show) class cert_revoke(VirtualCommand): @@ -433,10 +445,9 @@ class cert_revoke(VirtualCommand): takes_args = _serial_number - takes_options = ( - Flag('revoked?', + has_output_params = ( + Flag('revoked', label=_('Revoked'), - flags=['no_create', 'no_update', 'no_search'], ), ) operation = "revoke certificate" @@ -468,14 +479,12 @@ class cert_remove_hold(VirtualCommand): takes_args = _serial_number - takes_options = ( + has_output_params = ( Flag('unrevoked?', label=_('Unrevoked'), - flags=['no_create', 'no_update', 'no_search'], ), Str('error_string?', label=_('Error'), - flags=['no_create', 'no_update', 'no_search'], ), ) operation = "certificate remove hold" diff --git a/ipalib/plugins/host.py b/ipalib/plugins/host.py index b0d7289a8..b42cbbcb7 100644 --- a/ipalib/plugins/host.py +++ b/ipalib/plugins/host.py @@ -71,8 +71,8 @@ from ipalib import Str, Flag, Bytes from ipalib.plugins.baseldap import * from ipalib.plugins.service import split_principal from ipalib.plugins.service import validate_certificate -from ipalib.plugins.service import get_serial from ipalib import _, ngettext +from ipalib import x509 import base64 @@ -291,10 +291,10 @@ class host_mod(LDAPUpdate): if 'usercertificate' in entry_attrs_old: # FIXME: what to do here? do we revoke the old cert? fmt = 'entry already has a certificate, serial number: %s' % ( - get_serial(entry_attrs_old['usercertificate']) + x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER) ) raise errors.GenericError(format=fmt) - # FIXME: should be in normalizer; see service_add + # FIXME: decoding should be in normalizer; see service_add entry_attrs['usercertificate'] = base64.b64decode(cert) return dn diff --git a/ipalib/plugins/service.py b/ipalib/plugins/service.py index 623128bf1..37de3df42 100644 --- a/ipalib/plugins/service.py +++ b/ipalib/plugins/service.py @@ -64,26 +64,9 @@ from ipalib import api, errors from ipalib import Str, Flag, Bytes from ipalib.plugins.baseldap import * from ipalib import x509 -from pyasn1.error import PyAsn1Error from ipalib import _, ngettext -def get_serial(certificate): - """ - Given a certificate, return the serial number in that - cert as a Python long object. - """ - if type(certificate) in (list, tuple): - certificate = certificate[0] - - try: - serial = x509.get_serial_number(certificate, type=x509.DER) - except PyAsn1Error, e: - raise errors.GenericError( - format='Unable to decode certificate in entry: %s' % e - ) - return serial - def split_principal(principal): service = hostname = realm = None @@ -194,6 +177,7 @@ class service_add(LDAPCreate): cert = entry_attrs.get('usercertificate') if cert: + cert = cert[0] # FIXME: should be in a normalizer: need to fix normalizers # to work on non-unicode data entry_attrs['usercertificate'] = base64.b64decode(cert) @@ -229,9 +213,10 @@ class service_del(LDAPDelete): (dn, entry_attrs) = ldap.get_entry(dn, ['usercertificate']) cert = entry_attrs.get('usercertificate') if cert: - serial = unicode(get_serial(cert)) + cert = cert[0] + serial = unicode(x509.get_serial_number(cert, x509.DER)) try: - result = api.Command['cert_get'](unicode(serial))['result'] + result = api.Command['cert_show'](unicode(serial))['result'] if 'revocation_reason' not in result: try: api.Command['cert_revoke'](unicode(serial), revocation_reason=4) @@ -267,7 +252,7 @@ class service_mod(LDAPUpdate): if 'usercertificate' in entry_attrs_old: # FIXME: what to do here? do we revoke the old cert? fmt = 'entry already has a certificate, serial number: %s' % ( - get_serial(entry_attrs_old['usercertificate']) + x509.get_serial_number(entry_attrs_old['usercertificate'][0], x509.DER) ) raise errors.GenericError(format=fmt) # FIXME: should be in normalizer; see service_add |