diff options
| author | Jan Cholasta <jcholast@redhat.com> | 2016-06-14 06:29:18 +0200 |
|---|---|---|
| committer | Jan Cholasta <jcholast@redhat.com> | 2016-06-21 09:45:20 +0200 |
| commit | d44ffdad4285bf2a1c0b044e07ef1b18c7d50de1 (patch) | |
| tree | eab47f31cd405f88fa8f1b753f622b0d09707c3e /ipaserver/plugins | |
| parent | b484667d15932cd4a7e03a3a2c1135e7500eee4b (diff) | |
| download | freeipa-d44ffdad4285bf2a1c0b044e07ef1b18c7d50de1.tar.gz freeipa-d44ffdad4285bf2a1c0b044e07ef1b18c7d50de1.tar.xz freeipa-d44ffdad4285bf2a1c0b044e07ef1b18c7d50de1.zip | |
cert: add object plugin
Implement cert as an object with methods rather than a bunch of loosely
related commands.
https://fedorahosted.org/freeipa/ticket/5381
Reviewed-By: David Kupka <dkupka@redhat.com>
Reviewed-By: Pavel Vomacka <pvomacka@redhat.com>
Diffstat (limited to 'ipaserver/plugins')
| -rw-r--r-- | ipaserver/plugins/cert.py | 522 |
1 files changed, 285 insertions, 237 deletions
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index 76a2fbc3e..3b180ee49 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -19,9 +19,14 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. -import os -import time import binascii +import datetime +import os + +from nss import nss +from nss.error import NSPRError +from pyasn1.error import PyAsn1Error +import six from ipalib import Command, Str, Int, Flag from ipalib import api @@ -30,6 +35,9 @@ from ipalib import pkcs10 from ipalib import x509 from ipalib import ngettext from ipalib.constants import IPA_CA_CN +from ipalib.crud import Create, PKQuery, Retrieve, Search +from ipalib.frontend import Method, Object +from ipalib.parameters import Bytes, DateTime, DNParam from ipalib.plugable import Registry from .virtual import VirtualCommand from .baseldap import pkey_to_value @@ -42,11 +50,6 @@ from ipalib import output from .service import validate_principal from ipapython.dn import DN -import six -import nss.nss as nss -from nss.error import NSPRError -from pyasn1.error import PyAsn1Error - if six.PY3: unicode = str @@ -134,16 +137,12 @@ USER, HOST, SERVICE = range(3) register = Registry() -def validate_pkidate(ugettext, value): - """ - A date in the format of %Y-%m-%d - """ - try: - ts = time.strptime(value, '%Y-%m-%d') - except ValueError as e: - return str(e) +PKIDATE_FORMAT = '%Y-%m-%d' + + +def normalize_pkidate(value): + return datetime.datetime.strptime(value, PKIDATE_FORMAT) - return None def validate_csr(ugettext, csr): """ @@ -182,7 +181,8 @@ def normalize_csr(csr): return csr -def _convert_serial_number(num): + +def normalize_serial_number(num): """ Convert a SN given in decimal or hexadecimal. Returns the number or None if conversion fails. @@ -195,18 +195,10 @@ def _convert_serial_number(num): # hexa without prefix num = int(num, 16) except ValueError: - num = None - - return num + pass -def validate_serial_number(ugettext, num): - if _convert_serial_number(num) == None: - return u"Decimal or hexadecimal number is required for serial number" - return None + return unicode(num) -def normalize_serial_number(num): - # It's been already validated - return unicode(_convert_serial_number(num)) def get_host_from_principal(principal): """ @@ -242,85 +234,154 @@ def caacl_check(principal_type, principal_string, ca, profile_id): ) ) -@register() -class cert_request(VirtualCommand): - __doc__ = _('Submit a certificate signing request.') - - takes_args = ( - Str( - 'csr', validate_csr, - label=_('CSR'), - cli_name='csr_file', - normalizer=normalize_csr, - noextrawhitespace=False, - ), - ) - operation="request certificate" - - takes_options = ( - Str('principal', - label=_('Principal'), - doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'), - ), - Str('request_type', - default=u'pkcs10', - autofill=True, - ), - Flag('add', - doc=_("automatically add the principal if it doesn't exist"), - default=False, - autofill=True - ), - Str('profile_id?', validate_profile_id, - label=_("Profile ID"), - doc=_("Certificate Profile to use"), - ), - Str('cacn?', - cli_name='ca', - query=True, - label=_("CA"), - doc=_("CA to use"), - ), - ) - has_output_params = ( - Str('certificate', - label=_('Certificate'), +class BaseCertObject(Object): + takes_params = ( + Bytes( + 'certificate', + label=_("Certificate"), + doc=_("Base-64 encoded certificate."), + flags={'no_create', 'no_update', 'no_search'}, ), - Str('subject', + DNParam( + 'subject', label=_('Subject'), + flags={'no_create', 'no_update', 'no_search'}, ), - Str('issuer', + DNParam( + 'issuer', label=_('Issuer'), + doc=_('Issuer DN'), + flags={'no_create', 'no_update', 'no_search'}, ), - Str('valid_not_before', + DateTime( + 'valid_not_before', label=_('Not Before'), + flags={'no_create', 'no_update', 'no_search'}, ), - Str('valid_not_after', + DateTime( + 'valid_not_after', label=_('Not After'), + flags={'no_create', 'no_update', 'no_search'}, ), - Str('md5_fingerprint', + Str( + 'md5_fingerprint', label=_('Fingerprint (MD5)'), + flags={'no_create', 'no_update', 'no_search'}, ), - Str('sha1_fingerprint', + Str( + 'sha1_fingerprint', label=_('Fingerprint (SHA1)'), + flags={'no_create', 'no_update', 'no_search'}, ), - Str('serial_number', + Int( + 'serial_number', label=_('Serial number'), + doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'), + normalizer=normalize_serial_number, + flags={'no_create', 'no_update', 'no_search'}, ), - Str('serial_number_hex', + Str( + 'serial_number_hex', label=_('Serial number (hex)'), + flags={'no_create', 'no_update', 'no_search'}, ), ) - has_output = ( - output.Output('result', - type=dict, - doc=_('Dictionary mapping variable name to value'), + def _parse(self, obj): + cert = x509.load_certificate(obj['certificate']) + obj['subject'] = DN(unicode(cert.subject)) + obj['issuer'] = DN(unicode(cert.issuer)) + obj['valid_not_before'] = unicode(cert.valid_not_before_str) + obj['valid_not_after'] = unicode(cert.valid_not_after_str) + obj['md5_fingerprint'] = unicode( + nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) + obj['sha1_fingerprint'] = unicode( + nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) + obj['serial_number'] = cert.serial_number + obj['serial_number_hex'] = u'0x%X' % cert.serial_number + + +class BaseCertMethod(Method): + def get_options(self): + yield Str('cacn?', + cli_name='ca', + query=True, + label=_('Issuing CA'), + doc=_('Name of issuing CA'), + ) + + for option in super(BaseCertMethod, self).get_options(): + yield option + + +@register() +class certreq(BaseCertObject): + takes_params = BaseCertObject.takes_params + ( + Str( + 'request_type', + default=u'pkcs10', + autofill=True, + flags={'no_update', 'no_update', 'no_search'}, + ), + Str( + 'profile_id?', validate_profile_id, + label=_("Profile ID"), + doc=_("Certificate Profile to use"), + flags={'no_update', 'no_update', 'no_search'}, + ), + Str( + 'cert_request_status', + label=_('Request status'), + flags={'no_create', 'no_update', 'no_search'}, + ), + Int( + 'request_id', + label=_('Request id'), + primary_key=True, + flags={'no_create', 'no_update', 'no_search', 'no_output'}, + ), + ) + + +@register() +class cert_request(Create, BaseCertMethod, VirtualCommand): + __doc__ = _('Submit a certificate signing request.') + + obj_name = 'certreq' + attr_name = 'request' + + takes_args = ( + Str( + 'csr', validate_csr, + label=_('CSR'), + cli_name='csr_file', + normalizer=normalize_csr, + noextrawhitespace=False, + ), + ) + operation="request certificate" + + takes_options = ( + Str( + 'principal', + label=_('Principal'), + doc=_('Principal for this certificate (e.g. HTTP/test.example.com)'), + ), + Flag( + 'add', + doc=_("automatically add the principal if it doesn't exist"), ), ) - def execute(self, csr, **kw): + def get_args(self): + # FIXME: the 'no_create' flag is ignored for positional arguments + for arg in super(cert_request, self).get_args(): + if arg.name == 'request_id': + continue + yield arg + + def execute(self, csr, all=False, raw=False, **kw): ca_enabled_check() ldap = self.api.Backend.ldap2 @@ -512,12 +573,9 @@ class cert_request(VirtualCommand): # Request the certificate result = self.Backend.ra.request_certificate( csr, profile_id, ca_id, request_type=request_type) - cert = x509.load_certificate(result['certificate']) - result['issuer'] = unicode(cert.issuer) - result['valid_not_before'] = unicode(cert.valid_not_before_str) - result['valid_not_after'] = unicode(cert.valid_not_after_str) - result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) - result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) + if not raw: + self.obj._parse(result) + result['request_id'] = int(result['request_id']) # Success? Then add it to the principal's entry # (unless the profile tells us not to) @@ -534,91 +592,72 @@ class cert_request(VirtualCommand): api.Command['user_mod'](principal_name, **kwargs) return dict( - result=result + result=result, + value=pkey_to_value(int(result['request_id']), kw), ) - @register() -class cert_status(VirtualCommand): +class cert_status(Retrieve, BaseCertMethod, VirtualCommand): __doc__ = _('Check the status of a certificate signing request.') - takes_args = ( - Str('request_id', - label=_('Request id'), - flags=['no_create', 'no_update', 'no_search'], - ), - ) - has_output_params = ( - Str('cert_request_status', - label=_('Request status'), - ), - takes_args[0], - ) + obj_name = 'certreq' + attr_name = 'status' + operation = "certificate status" + def get_options(self): + for option in super(cert_status, self).get_options(): + if option.name == 'cacn': + continue + yield option def execute(self, request_id, **kw): ca_enabled_check() self.check_access() return dict( - result=self.Backend.ra.check_request_status(request_id) + result=self.Backend.ra.check_request_status(str(request_id)), + value=pkey_to_value(request_id, kw), ) - -_serial_number = Str('serial_number', - validate_serial_number, - label=_('Serial number'), - doc=_('Serial number in decimal or if prefixed with 0x in hexadecimal'), - normalizer=normalize_serial_number, -) - @register() -class cert_show(VirtualCommand): - __doc__ = _('Retrieve an existing certificate.') - - takes_args = _serial_number - - has_output_params = ( - Str('certificate', - label=_('Certificate'), - ), - Str('subject', - label=_('Subject'), - ), - Str('issuer', - label=_('Issuer'), - ), - Str('valid_not_before', - label=_('Not Before'), - ), - Str('valid_not_after', - label=_('Not After'), - ), - Str('md5_fingerprint', - label=_('Fingerprint (MD5)'), +class cert(BaseCertObject): + takes_params = BaseCertObject.takes_params + ( + Str( + 'status', + label=_('Status'), + flags={'no_create', 'no_update', 'no_search'}, ), - Str('sha1_fingerprint', - label=_('Fingerprint (SHA1)'), + Flag( + 'revoked', + label=_('Revoked'), + flags={'no_create', 'no_update', 'no_search'}, ), - Str('revocation_reason', + Int( + 'revocation_reason', label=_('Revocation reason'), + doc=_('Reason for revoking the certificate (0-10)'), + minvalue=0, + maxvalue=10, + flags={'no_create', 'no_update'}, ), - Str('serial_number_hex', - label=_('Serial number (hex)'), - ), - _serial_number, ) + def get_params(self): + for param in super(cert, self).get_params(): + if param.name == 'serial_number': + param = param.clone(primary_key=True) + elif param.name == 'issuer': + param = param.clone(flags=param.flags - {'no_search'}) + yield param + + +@register() +class cert_show(Retrieve, BaseCertMethod, VirtualCommand): + __doc__ = _('Retrieve an existing certificate.') + takes_options = ( - Str('cacn?', - cli_name='ca', - query=True, - label=_('Issuing CA'), - doc=_('Name of issuing CA'), - autofill=False, - ), Str('out?', label=_('Output filename'), doc=_('File to store the certificate in.'), @@ -628,7 +667,7 @@ class cert_show(VirtualCommand): operation="retrieve certificate" - def execute(self, serial_number, **options): + def execute(self, serial_number, all=False, raw=False, **options): ca_enabled_check() hostname = None try: @@ -648,7 +687,7 @@ class cert_show(VirtualCommand): # Dogtag lightweight CAs have shared serial number domain, so # we don't tell Dogtag the issuer (but we check the cert after). # - result=self.Backend.ra.get_certificate(serial_number) + result = self.Backend.ra.get_certificate(str(serial_number)) cert = x509.load_certificate(result['certificate']) if issuer_dn is not None and DN(unicode(cert.issuer)) != DN(issuer_dn): @@ -658,48 +697,37 @@ class cert_show(VirtualCommand): "issued by CA '%(ca)s' not found") % dict(serial=serial_number, ca=options['cacn'])) - result['subject'] = unicode(cert.subject) - result['issuer'] = unicode(cert.issuer) - result['valid_not_before'] = unicode(cert.valid_not_before_str) - result['valid_not_after'] = unicode(cert.valid_not_after_str) - result['md5_fingerprint'] = unicode(nss.data_to_hex(nss.md5_digest(cert.der_data), 64)[0]) - result['sha1_fingerprint'] = unicode(nss.data_to_hex(nss.sha1_digest(cert.der_data), 64)[0]) + if not raw: + result['certificate'] = result['certificate'].replace('\r\n', '') + self.obj._parse(result) + result['revoked'] = ('revocation_reason' in result) + if hostname: # If we have a hostname we want to verify that the subject # of the certificate matches it, otherwise raise an error if hostname != cert.subject.common_name: #pylint: disable=E1101 raise acierr - return dict(result=result) - - + return dict(result=result, value=pkey_to_value(serial_number, options)) @register() -class cert_revoke(VirtualCommand): +class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand): __doc__ = _('Revoke a certificate.') - takes_args = _serial_number - - has_output_params = ( - Flag('revoked', - label=_('Revoked'), - ), - ) operation = "revoke certificate" - # FIXME: The default is 0. Is this really an Int param? - takes_options = ( - Int('revocation_reason', - label=_('Reason'), - doc=_('Reason for revoking the certificate (0-10). Type ' - '"ipa help cert" for revocation reason details. '), - minvalue=0, - maxvalue=10, + def get_options(self): + # FIXME: The default is 0. Is this really an Int param? + yield self.obj.params['revocation_reason'].clone( default=0, - autofill=True - ), - ) + autofill=True, + ) + + for option in super(cert_revoke, self).get_options(): + if option.name == 'cacn': + continue + yield option def execute(self, serial_number, **kw): ca_enabled_check() @@ -719,17 +747,15 @@ class cert_revoke(VirtualCommand): raise errors.CertificateOperationError(error=_('7 is not a valid revocation reason')) return dict( result=self.Backend.ra.revoke_certificate( - serial_number, revocation_reason=revocation_reason) + str(serial_number), revocation_reason=revocation_reason) ) @register() -class cert_remove_hold(VirtualCommand): +class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand): __doc__ = _('Take a revoked certificate off hold.') - takes_args = _serial_number - has_output_params = ( Flag('unrevoked', label=_('Unrevoked'), @@ -740,17 +766,23 @@ class cert_remove_hold(VirtualCommand): ) operation = "certificate remove hold" + def get_options(self): + for option in super(cert_remove_hold, self).get_options(): + if option.name == 'cacn': + continue + yield option + def execute(self, serial_number, **kw): ca_enabled_check() self.check_access() return dict( - result=self.Backend.ra.take_certificate_off_hold(serial_number) + result=self.Backend.ra.take_certificate_off_hold( + str(serial_number)) ) - @register() -class cert_find(Command): +class cert_find(Search, BaseCertMethod): __doc__ = _('Search for existing certificates.') takes_options = ( @@ -759,26 +791,6 @@ class cert_find(Command): doc=_('Subject'), autofill=False, ), - Str('cacn?', - cli_name='ca', - query=True, - label=_('Issuing CA'), - doc=_('Name of issuing CA'), - autofill=False, - ), - Str('issuer?', - label=_('Issuer'), - doc=_('Issuer DN'), - autofill=False, - ), - Int('revocation_reason?', - label=_('Reason'), - doc=_('Reason for revoking the certificate (0-10). Type ' - '"ipa help cert" for revocation reason details.'), - minvalue=0, - maxvalue=10, - autofill=False, - ), Int('min_serial_number?', doc=_("minimum serial number"), autofill=False, @@ -795,60 +807,55 @@ class cert_find(Command): doc=_('match the common name exactly'), autofill=False, ), - Str('validnotafter_from?', validate_pkidate, + DateTime('validnotafter_from?', doc=_('Valid not after from this date (YYYY-mm-dd)'), + normalizer=normalize_pkidate, autofill=False, ), - Str('validnotafter_to?', validate_pkidate, + DateTime('validnotafter_to?', doc=_('Valid not after to this date (YYYY-mm-dd)'), + normalizer=normalize_pkidate, autofill=False, ), - Str('validnotbefore_from?', validate_pkidate, + DateTime('validnotbefore_from?', doc=_('Valid not before from this date (YYYY-mm-dd)'), + normalizer=normalize_pkidate, autofill=False, ), - Str('validnotbefore_to?', validate_pkidate, + DateTime('validnotbefore_to?', doc=_('Valid not before to this date (YYYY-mm-dd)'), + normalizer=normalize_pkidate, autofill=False, ), - Str('issuedon_from?', validate_pkidate, + DateTime('issuedon_from?', doc=_('Issued on from this date (YYYY-mm-dd)'), + normalizer=normalize_pkidate, autofill=False, ), - Str('issuedon_to?', validate_pkidate, + DateTime('issuedon_to?', doc=_('Issued on to this date (YYYY-mm-dd)'), + normalizer=normalize_pkidate, autofill=False, ), - Str('revokedon_from?', validate_pkidate, + DateTime('revokedon_from?', doc=_('Revoked on from this date (YYYY-mm-dd)'), + normalizer=normalize_pkidate, autofill=False, ), - Str('revokedon_to?', validate_pkidate, + DateTime('revokedon_to?', doc=_('Revoked on to this date (YYYY-mm-dd)'), + normalizer=normalize_pkidate, autofill=False, ), + Flag('pkey_only?', + label=_("Primary key only"), + doc=_("Results should contain primary key attribute only " + "(\"certificate\")"), + ), Int('sizelimit?', - label=_('Size Limit'), - doc=_('Maximum number of certs returned'), - flags=['no_display'], + label=_("Size Limit"), + doc=_("Maximum number of entries returned (0 is unlimited)"), minvalue=0, - default=100, - ), - ) - - has_output = output.standard_list_of_entries - has_output_params = ( - Str('serial_number_hex', - label=_('Serial number (hex)'), - ), - Str('serial_number', - label=_('Serial number'), - ), - Str('status', - label=_('Status'), - ), - Str('subject', - label=_('Subject'), ), ) @@ -856,7 +863,8 @@ class cert_find(Command): '%(count)d certificate matched', '%(count)d certificates matched', 0 ) - def execute(self, **options): + def execute(self, criteria=None, all=False, raw=False, pkey_only=False, + sizelimit=None, **options): ca_enabled_check() if 'cacn' in options: @@ -870,8 +878,48 @@ class cert_find(Command): else: options['issuer'] = ca_sdn + if criteria is not None: + return dict(result=[], count=0, truncated=False) + + obj_seq = [] + + ra_options = {} + for name, value in options.items(): + if isinstance(value, datetime.datetime): + value = value.strftime(PKIDATE_FORMAT) + ra_options[name] = value + if sizelimit is not None and sizelimit != 0: + ra_options['sizelimit'] = sizelimit + + for ra_obj in self.Backend.ra.find(ra_options): + obj = {} + if all: + ra_obj.update( + self.Backend.ra.get_certificate( + str(ra_obj['serial_number']))) + obj_seq.append(obj) + obj.update(ra_obj) + + result = [] + for obj in obj_seq: + if not pkey_only: + if not raw: + if 'certificate' in obj: + obj['certificate'] = ( + obj['certificate'].replace('\r\n', '')) + self.obj._parse(obj) + obj['subject'] = DN(obj['subject']) + obj['issuer'] = DN(obj['issuer']) + obj['revoked'] = ( + obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED')) + else: + serial_number = obj['serial_number'] + obj.clear() + obj['serial_number'] = serial_number + result.append(obj) + ret = dict( - result=self.Backend.ra.find(options) + result=result ) ret['count'] = len(ret['result']) ret['truncated'] = False |
