From 3a536353fb9c92d076063fa44b95e5230bd58fa6 Mon Sep 17 00:00:00 2001 From: Rob Crittenden Date: Wed, 20 Jan 2010 10:03:42 -0500 Subject: Fix plugin to work with new output validation, add new helpers Add a new get_subject() helper and return the subject when retrieving certificates. Add a normalizer so that everything before and after the BEGIN/END block is removed. --- ipalib/plugins/cert.py | 91 +++++++++++++++++++++++++++++++------------------- 1 file changed, 57 insertions(+), 34 deletions(-) (limited to 'ipalib') diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py index 312ad39b..2b1d3ffc 100644 --- a/ipalib/plugins/cert.py +++ b/ipalib/plugins/cert.py @@ -64,6 +64,31 @@ def get_serial(certificate): return serial +def get_subject(certificate): + """ + Given a certificate, return the subject + + In theory there should be only one cert per object so even if we get + passed in a list/tuple only return the first one. + """ + if type(certificate) in (list, tuple): + certificate = certificate[0] + try: + certificate = base64.b64decode(certificate) + except Exception, e: + pass + try: + sub = list(x509.get_subject_components(certificate, type=x509.DER)) + sub.reverse() + except PyAsn1Error: + raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry')) + + subject = "" + for s in sub: + subject = subject + "%s=%s," % (s[0], s[1]) + + return subject[:-1] + def get_csr_hostname(csr): """ Return the value of CN in the subject of the request @@ -113,6 +138,25 @@ def validate_csr(ugettext, csr): except Exception, e: raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e)) +def normalize_csr(csr): + """ + Strip any leading and trailing cruft around the BEGIN/END block + """ + end_len = 37 + s = csr.find('-----BEGIN NEW CERTIFICATE REQUEST-----') + if s == -1: + s = csr.find('-----BEGIN CERTIFICATE REQUEST-----') + e = csr.find('-----END NEW CERTIFICATE REQUEST-----') + if e == -1: + e = csr.find('-----END CERTIFICATE REQUEST-----') + if e != -1: + end_len = 33 + + if s > -1 and e > -1: + # We're normalizing here, not validating + csr = csr[s:e+end_len] + + return csr class cert_request(VirtualCommand): """ @@ -122,6 +166,7 @@ class cert_request(VirtualCommand): takes_args = ( File('csr', validate_csr, cli_name='csr_file', + normalizer=normalize_csr, ), ) operation="request certificate" @@ -242,12 +287,6 @@ class cert_request(VirtualCommand): result=result ) - def output_for_cli(self, textui, result, *args, **kw): - if isinstance(result, dict) and len(result) > 0: - textui.print_entry(result, 0) - else: - textui.print_plain(_('Failed to submit a certificate request.')) - api.register(cert_request) @@ -262,13 +301,9 @@ class cert_status(VirtualCommand): def execute(self, request_id, **kw): self.check_access() - return self.Backend.ra.check_request_status(request_id) - - def output_for_cli(self, textui, result, *args, **kw): - if isinstance(result, dict) and len(result) > 0: - textui.print_entry(result, 0) - else: - textui.print_plain(_('Failed to retrieve a request status.')) + return dict( + result=self.Backend.ra.check_request_status(request_id) + ) api.register(cert_status) @@ -284,13 +319,9 @@ class cert_get(VirtualCommand): def execute(self, serial_number): self.check_access() - return self.Backend.ra.get_certificate(serial_number) - - def output_for_cli(self, textui, result, *args, **kw): - if isinstance(result, dict) and len(result) > 0: - textui.print_entry(result, 0) - else: - textui.print_plain(_('Failed to obtain a certificate.')) + result=self.Backend.ra.get_certificate(serial_number) + result['subject'] = get_subject(result['certificate']) + return dict(result=result) api.register(cert_get) @@ -317,13 +348,9 @@ class cert_revoke(VirtualCommand): def execute(self, serial_number, **kw): self.check_access() - return self.Backend.ra.revoke_certificate(serial_number, **kw) - - def output_for_cli(self, textui, result, *args, **kw): - if isinstance(result, dict) and len(result) > 0: - textui.print_entry(result, 0) - else: - textui.print_plain(_('Failed to revoke a certificate.')) + return dict( + result=self.Backend.ra.revoke_certificate(serial_number, **kw) + ) api.register(cert_revoke) @@ -339,12 +366,8 @@ class cert_remove_hold(VirtualCommand): def execute(self, serial_number, **kw): self.check_access() - return self.Backend.ra.take_certificate_off_hold(serial_number) - - def output_for_cli(self, textui, result, *args, **kw): - if isinstance(result, dict) and len(result) > 0: - textui.print_entry(result, 0) - else: - textui.print_plain(_('Failed to take a revoked certificate off hold.')) + return dict( + result=self.Backend.ra.take_certificate_off_hold(serial_number) + ) api.register(cert_remove_hold) -- cgit