diff options
-rw-r--r-- | API.txt | 13 | ||||
-rw-r--r-- | VERSION | 4 | ||||
-rw-r--r-- | ipaserver/plugins/cert.py | 272 |
3 files changed, 254 insertions, 35 deletions
@@ -723,23 +723,31 @@ output: Entry('result') output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>]) output: PrimaryKey('value') command: cert_find -args: 1,20,4 +args: 1,28,4 arg: Str('criteria?') option: Flag('all', autofill=True, cli_name='all', default=False) option: Str('cacn?', cli_name='ca') option: Flag('exactly?', autofill=True, default=False) +option: Str('host*', cli_name='hosts') option: DateTime('issuedon_from?', autofill=False) option: DateTime('issuedon_to?', autofill=False) option: DNParam('issuer?', autofill=False) option: Int('max_serial_number?', autofill=False) option: Int('min_serial_number?', autofill=False) +option: Str('no_host*', cli_name='no_hosts') +option: Flag('no_members', autofill=True, default=True) +option: Str('no_service*', cli_name='no_services') +option: Str('no_user*', cli_name='no_users') option: Flag('pkey_only?', autofill=True, default=False) option: Flag('raw', autofill=True, cli_name='raw', default=False) option: Int('revocation_reason?', autofill=False) option: DateTime('revokedon_from?', autofill=False) option: DateTime('revokedon_to?', autofill=False) +option: Str('service*', cli_name='services') option: Int('sizelimit?') option: Str('subject?', autofill=False) +option: Int('timelimit?') +option: Str('user*', cli_name='users') option: DateTime('validnotafter_from?', autofill=False) option: DateTime('validnotafter_to?', autofill=False) option: DateTime('validnotbefore_from?', autofill=False) @@ -775,10 +783,11 @@ option: Int('revocation_reason', autofill=True, default=0) option: Str('version?') output: Output('result') command: cert_show -args: 1,5,3 +args: 1,6,3 arg: Int('serial_number') option: Flag('all', autofill=True, cli_name='all', default=False) option: Str('cacn?', cli_name='ca') +option: Flag('no_members', autofill=True, default=False) option: Str('out?') option: Flag('raw', autofill=True, cli_name='raw', default=False) option: Str('version?') @@ -90,5 +90,5 @@ IPA_DATA_VERSION=20100614120000 # # ######################################################## IPA_API_VERSION_MAJOR=2 -IPA_API_VERSION_MINOR=194 -# Last change: cert: add object plugin +IPA_API_VERSION_MINOR=195 +# Last change: cert: add owner information diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index 3b180ee49..39144ddf3 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -19,6 +19,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +import base64 import binascii import datetime import os @@ -110,6 +111,9 @@ EXAMPLES: Search for certificates based on issuance date ipa cert-find --issuedon-from=2013-02-01 --issuedon-to=2013-02-07 + Search for certificates owned by a specific user: + ipa cert-find --user=user + IPA currently immediately issues (or declines) all certificate requests so the status of a request is not normally useful. This is for future use or the case where a CA does not immediately issue a certificate. @@ -652,9 +656,48 @@ class cert(BaseCertObject): param = param.clone(flags=param.flags - {'no_search'}) yield param + for owner in self._owners(): + yield owner.primary_key.clone_rename( + 'owner_{0}'.format(owner.name), + required=False, + multivalue=True, + primary_key=False, + label=_("Owner %s") % owner.object_name, + flags={'no_create', 'no_update', 'no_search'}, + ) + + def _owners(self): + for name in ('user', 'host', 'service'): + yield self.api.Object[name] + + def _fill_owners(self, obj): + for owner in self._owners(): + container_dn = DN(owner.container_dn, self.api.env.basedn) + name = 'owner_' + owner.name + for dn in obj['owner']: + if dn.endswith(container_dn, 1): + value = owner.get_primary_key_from_dn(dn) + obj.setdefault(name, []).append(value) + + +class CertMethod(BaseCertMethod): + def get_options(self): + for option in super(CertMethod, self).get_options(): + yield option + + for o in self.has_output: + if isinstance(o, (output.Entry, output.ListOfEntries)): + yield Flag( + 'no_members', + doc=_("Suppress processing of membership attributes."), + exclude='webui', + flags={'no_output'}, + ) + break + @register() -class cert_show(Retrieve, BaseCertMethod, VirtualCommand): +class cert_show(Retrieve, CertMethod, VirtualCommand): __doc__ = _('Retrieve an existing certificate.') takes_options = ( @@ -667,7 +710,8 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand): operation="retrieve certificate" - def execute(self, serial_number, all=False, raw=False, **options): + def execute(self, serial_number, all=False, raw=False, no_members=False, + **options): ca_enabled_check() hostname = None try: @@ -697,10 +741,26 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand): "issued by CA '%(ca)s' not found") % dict(serial=serial_number, ca=options['cacn'])) + if all or not no_members: + ldap = self.api.Backend.ldap2 + filter = ldap.make_filter_from_attr( + 'usercertificate', base64.b64decode(result['certificate'])) + try: + entries = ldap.get_entries(base_dn=self.api.env.basedn, + filter=filter, + attrs_list=['']) + except errors.EmptyResult: + entries = [] + for entry in entries: + result.setdefault('owner', []).append(entry.dn) + if not raw: result['certificate'] = result['certificate'].replace('\r\n', '') self.obj._parse(result) result['revoked'] = ('revocation_reason' in result) + if 'owner' in result: + self.obj._fill_owners(result) + del result['owner'] if hostname: # If we have a hostname we want to verify that the subject @@ -712,7 +772,7 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand): @register() -class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand): +class cert_revoke(PKQuery, CertMethod, VirtualCommand): __doc__ = _('Revoke a certificate.') operation = "revoke certificate" @@ -753,7 +813,7 @@ class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand): @register() -class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand): +class cert_remove_hold(PKQuery, CertMethod, VirtualCommand): __doc__ = _('Take a revoked certificate off hold.') has_output_params = ( @@ -782,7 +842,7 @@ class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand): @register() -class cert_find(Search, BaseCertMethod): +class cert_find(Search, CertMethod): __doc__ = _('Search for existing certificates.') takes_options = ( @@ -852,6 +912,11 @@ class cert_find(Search, BaseCertMethod): doc=_("Results should contain primary key attribute only " "(\"certificate\")"), ), + Int('timelimit?', + label=_('Time Limit'), + doc=_('Time limit of search in seconds (0 is unlimited)'), + minvalue=0, + ), Int('sizelimit?', label=_("Size Limit"), doc=_("Maximum number of entries returned (0 is unlimited)"), @@ -863,9 +928,65 @@ class cert_find(Search, BaseCertMethod): '%(count)d certificate matched', '%(count)d certificates matched', 0 ) + def get_options(self): + for option in super(cert_find, self).get_options(): + if option.name == 'no_members': + option = option.clone(default=True, + flags=set(option.flags) | {'no_option'}) + yield option + + for owner in self.obj._owners(): + yield owner.primary_key.clone_rename( + '{0}'.format(owner.name), + required=False, + multivalue=True, + primary_key=False, + query=True, + cli_name='{0}s'.format(owner.name), + doc=(_("Search for certificates with these owner %s.") % + owner.object_name_plural), + label=owner.object_name, + ) + yield owner.primary_key.clone_rename( + 'no_{0}'.format(owner.name), + required=False, + multivalue=True, + primary_key=False, + query=True, + cli_name='no_{0}s'.format(owner.name), + doc=(_("Search for certificates without these owner %s.") % + owner.object_name_plural), + label=owner.object_name, + ) + def execute(self, criteria=None, all=False, raw=False, pkey_only=False, - sizelimit=None, **options): - ca_enabled_check() + no_members=True, timelimit=None, sizelimit=None, **options): + ca_options = {'cacn', + 'revocation_reason', + 'issuer', + 'subject', + 'min_serial_number', 'max_serial_number', + 'exactly', + 'validnotafter_from', 'validnotafter_to', + 'validnotbefore_from', 'validnotbefore_to', + 'issuedon_from', 'issuedon_to', + 'revokedon_from', 'revokedon_to'} + ldap_options = {prefix + owner.name + for owner in self.obj._owners() + for prefix in ('', 'no_')} + has_ca_options = ( + any(name in options for name in ca_options - {'exactly'}) or + options['exactly']) + has_ldap_options = any(name in options for name in ldap_options) + + try: + ca_enabled_check() + except errors.NotFound: + if has_ca_options: + raise + ca_enabled = False + else: + ca_enabled = True if 'cacn' in options: ca_obj = api.Command.ca_show(options['cacn'])['result'] @@ -882,47 +1003,136 @@ class cert_find(Search, BaseCertMethod): return dict(result=[], count=0, truncated=False) obj_seq = [] + obj_dict = {} + truncated = False + + if ca_enabled: + ra_options = {} + for name, value in options.items(): + if name not in ca_options: + continue + if isinstance(value, datetime.datetime): + value = value.strftime(PKIDATE_FORMAT) + ra_options[name] = value + if sizelimit is not None: + if sizelimit != 0: + ra_options['sizelimit'] = sizelimit + sizelimit = 0 + has_ca_options = True + + for ra_obj in self.Backend.ra.find(ra_options): + obj = {} + if ((not pkey_only and all) or + not no_members or + not has_ca_options or + has_ldap_options): + ra_obj.update( + self.Backend.ra.get_certificate( + str(ra_obj['serial_number']))) + cert = base64.b64decode(ra_obj['certificate']) + obj_dict[cert] = obj + obj_seq.append(obj) + obj.update(ra_obj) + + if ((not pkey_only and all) or + not no_members or + not has_ca_options or + has_ldap_options): + ldap = self.api.Backend.ldap2 + + filters = [] + cert_filter = '(usercertificate=*)' + filters.append(cert_filter) + for owner in self.obj._owners(): + oc_filter = ldap.make_filter_from_attr( + 'objectclass', owner.object_class, ldap.MATCH_ALL) + for prefix, rule in (('', ldap.MATCH_ALL), + ('no_', ldap.MATCH_NONE)): + value = options.get(prefix + owner.name) + if value is None: + continue + pkey_filter = ldap.make_filter_from_attr( + owner.primary_key.name, value, rule) + filters.append(oc_filter) + filters.append(pkey_filter) + filter = ldap.combine_filters(filters, ldap.MATCH_ALL) - ra_options = {} - for name, value in options.items(): - if isinstance(value, datetime.datetime): - value = value.strftime(PKIDATE_FORMAT) - ra_options[name] = value - if sizelimit is not None and sizelimit != 0: - ra_options['sizelimit'] = sizelimit - - for ra_obj in self.Backend.ra.find(ra_options): - obj = {} - if all: - ra_obj.update( - self.Backend.ra.get_certificate( - str(ra_obj['serial_number']))) - obj_seq.append(obj) - obj.update(ra_obj) + try: + entries, truncated = ldap.find_entries( + base_dn=self.api.env.basedn, + filter=filter, + attrs_list=['usercertificate'], + time_limit=timelimit, + size_limit=sizelimit, + ) + except errors.EmptyResult: + entries, truncated = [], False + for entry in entries: + seen = set() + for attr in ('usercertificate', 'usercertificate;binary'): + for cert in entry.get(attr, []): + if cert in seen: + continue + seen.add(cert) + try: + obj = obj_dict[cert] + except KeyError: + if has_ca_options: + continue + obj = { + 'certificate': unicode(base64.b64encode(cert))} + obj_seq.append(obj) + obj_dict[cert] = obj + obj.setdefault('owner', []).append(entry.dn) result = [] for obj in obj_seq: + if has_ldap_options and 'owner' not in obj: + continue if not pkey_only: if not raw: if 'certificate' in obj: obj['certificate'] = ( obj['certificate'].replace('\r\n', '')) self.obj._parse(obj) - obj['subject'] = DN(obj['subject']) - obj['issuer'] = DN(obj['issuer']) - obj['revoked'] = ( - obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED')) + if not all: + del obj['certificate'] + del obj['valid_not_before'] + del obj['valid_not_after'] + del obj['md5_fingerprint'] + del obj['sha1_fingerprint'] + if 'subject' in obj: + obj['subject'] = DN(obj['subject']) + if 'issuer' in obj: + obj['issuer'] = DN(obj['issuer']) + if 'status' in obj: + obj['revoked'] = ( + obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED')) + if 'owner' in obj: + if all or not no_members: + self.obj._fill_owners(obj) + del obj['owner'] + else: + if 'certificate' in obj: + if not all: + del obj['certificate'] + if 'owner' in obj: + if not all and no_members: + del obj['owner'] else: - serial_number = obj['serial_number'] - obj.clear() - obj['serial_number'] = serial_number + if 'serial_number' in obj: + serial_number = obj['serial_number'] + obj.clear() + obj['serial_number'] = serial_number + else: + obj.clear() result.append(obj) ret = dict( result=result ) ret['count'] = len(ret['result']) - ret['truncated'] = False + ret['truncated'] = bool(truncated) return ret |