summaryrefslogtreecommitdiffstats
path: root/ipalib/plugins/cert.py
diff options
context:
space:
mode:
authorRob Crittenden <rcritten@redhat.com>2010-01-20 10:03:42 -0500
committerRob Crittenden <rcritten@redhat.com>2010-01-20 17:01:24 -0500
commit3a536353fb9c92d076063fa44b95e5230bd58fa6 (patch)
treec765be598b63e8d17670b5b3431b97189261b802 /ipalib/plugins/cert.py
parentc15c1eee729e912f4f55c90861d4dd0be0bdd601 (diff)
downloadfreeipa-3a536353fb9c92d076063fa44b95e5230bd58fa6.tar.gz
freeipa-3a536353fb9c92d076063fa44b95e5230bd58fa6.tar.xz
freeipa-3a536353fb9c92d076063fa44b95e5230bd58fa6.zip
Fix plugin to work with new output validation, add new helpers
Add a new get_subject() helper and return the subject when retrieving certificates. Add a normalizer so that everything before and after the BEGIN/END block is removed.
Diffstat (limited to 'ipalib/plugins/cert.py')
-rw-r--r--ipalib/plugins/cert.py91
1 files changed, 57 insertions, 34 deletions
diff --git a/ipalib/plugins/cert.py b/ipalib/plugins/cert.py
index 312ad39b8..2b1d3ffcc 100644
--- a/ipalib/plugins/cert.py
+++ b/ipalib/plugins/cert.py
@@ -64,6 +64,31 @@ def get_serial(certificate):
return serial
+def get_subject(certificate):
+ """
+ Given a certificate, return the subject
+
+ In theory there should be only one cert per object so even if we get
+ passed in a list/tuple only return the first one.
+ """
+ if type(certificate) in (list, tuple):
+ certificate = certificate[0]
+ try:
+ certificate = base64.b64decode(certificate)
+ except Exception, e:
+ pass
+ try:
+ sub = list(x509.get_subject_components(certificate, type=x509.DER))
+ sub.reverse()
+ except PyAsn1Error:
+ raise errors.CertificateOperationError(error=_('Unable to decode certificate in entry'))
+
+ subject = ""
+ for s in sub:
+ subject = subject + "%s=%s," % (s[0], s[1])
+
+ return subject[:-1]
+
def get_csr_hostname(csr):
"""
Return the value of CN in the subject of the request
@@ -113,6 +138,25 @@ def validate_csr(ugettext, csr):
except Exception, e:
raise errors.CertificateOperationError(error=_('Failure decoding Certificate Signing Request: %s') % str(e))
+def normalize_csr(csr):
+ """
+ Strip any leading and trailing cruft around the BEGIN/END block
+ """
+ end_len = 37
+ s = csr.find('-----BEGIN NEW CERTIFICATE REQUEST-----')
+ if s == -1:
+ s = csr.find('-----BEGIN CERTIFICATE REQUEST-----')
+ e = csr.find('-----END NEW CERTIFICATE REQUEST-----')
+ if e == -1:
+ e = csr.find('-----END CERTIFICATE REQUEST-----')
+ if e != -1:
+ end_len = 33
+
+ if s > -1 and e > -1:
+ # We're normalizing here, not validating
+ csr = csr[s:e+end_len]
+
+ return csr
class cert_request(VirtualCommand):
"""
@@ -122,6 +166,7 @@ class cert_request(VirtualCommand):
takes_args = (
File('csr', validate_csr,
cli_name='csr_file',
+ normalizer=normalize_csr,
),
)
operation="request certificate"
@@ -242,12 +287,6 @@ class cert_request(VirtualCommand):
result=result
)
- def output_for_cli(self, textui, result, *args, **kw):
- if isinstance(result, dict) and len(result) > 0:
- textui.print_entry(result, 0)
- else:
- textui.print_plain(_('Failed to submit a certificate request.'))
-
api.register(cert_request)
@@ -262,13 +301,9 @@ class cert_status(VirtualCommand):
def execute(self, request_id, **kw):
self.check_access()
- return self.Backend.ra.check_request_status(request_id)
-
- def output_for_cli(self, textui, result, *args, **kw):
- if isinstance(result, dict) and len(result) > 0:
- textui.print_entry(result, 0)
- else:
- textui.print_plain(_('Failed to retrieve a request status.'))
+ return dict(
+ result=self.Backend.ra.check_request_status(request_id)
+ )
api.register(cert_status)
@@ -284,13 +319,9 @@ class cert_get(VirtualCommand):
def execute(self, serial_number):
self.check_access()
- return self.Backend.ra.get_certificate(serial_number)
-
- def output_for_cli(self, textui, result, *args, **kw):
- if isinstance(result, dict) and len(result) > 0:
- textui.print_entry(result, 0)
- else:
- textui.print_plain(_('Failed to obtain a certificate.'))
+ result=self.Backend.ra.get_certificate(serial_number)
+ result['subject'] = get_subject(result['certificate'])
+ return dict(result=result)
api.register(cert_get)
@@ -317,13 +348,9 @@ class cert_revoke(VirtualCommand):
def execute(self, serial_number, **kw):
self.check_access()
- return self.Backend.ra.revoke_certificate(serial_number, **kw)
-
- def output_for_cli(self, textui, result, *args, **kw):
- if isinstance(result, dict) and len(result) > 0:
- textui.print_entry(result, 0)
- else:
- textui.print_plain(_('Failed to revoke a certificate.'))
+ return dict(
+ result=self.Backend.ra.revoke_certificate(serial_number, **kw)
+ )
api.register(cert_revoke)
@@ -339,12 +366,8 @@ class cert_remove_hold(VirtualCommand):
def execute(self, serial_number, **kw):
self.check_access()
- return self.Backend.ra.take_certificate_off_hold(serial_number)
-
- def output_for_cli(self, textui, result, *args, **kw):
- if isinstance(result, dict) and len(result) > 0:
- textui.print_entry(result, 0)
- else:
- textui.print_plain(_('Failed to take a revoked certificate off hold.'))
+ return dict(
+ result=self.Backend.ra.take_certificate_off_hold(serial_number)
+ )
api.register(cert_remove_hold)