summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--API.txt13
-rw-r--r--VERSION4
-rw-r--r--ipaserver/plugins/cert.py272
3 files changed, 254 insertions, 35 deletions
diff --git a/API.txt b/API.txt
index 4d16c50c2..ba942799c 100644
--- a/API.txt
+++ b/API.txt
@@ -723,23 +723,31 @@ output: Entry('result')
output: Output('summary', type=[<type 'unicode'>, <type 'NoneType'>])
output: PrimaryKey('value')
command: cert_find
-args: 1,20,4
+args: 1,28,4
arg: Str('criteria?')
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Str('cacn?', cli_name='ca')
option: Flag('exactly?', autofill=True, default=False)
+option: Str('host*', cli_name='hosts')
option: DateTime('issuedon_from?', autofill=False)
option: DateTime('issuedon_to?', autofill=False)
option: DNParam('issuer?', autofill=False)
option: Int('max_serial_number?', autofill=False)
option: Int('min_serial_number?', autofill=False)
+option: Str('no_host*', cli_name='no_hosts')
+option: Flag('no_members', autofill=True, default=True)
+option: Str('no_service*', cli_name='no_services')
+option: Str('no_user*', cli_name='no_users')
option: Flag('pkey_only?', autofill=True, default=False)
option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Int('revocation_reason?', autofill=False)
option: DateTime('revokedon_from?', autofill=False)
option: DateTime('revokedon_to?', autofill=False)
+option: Str('service*', cli_name='services')
option: Int('sizelimit?')
option: Str('subject?', autofill=False)
+option: Int('timelimit?')
+option: Str('user*', cli_name='users')
option: DateTime('validnotafter_from?', autofill=False)
option: DateTime('validnotafter_to?', autofill=False)
option: DateTime('validnotbefore_from?', autofill=False)
@@ -775,10 +783,11 @@ option: Int('revocation_reason', autofill=True, default=0)
option: Str('version?')
output: Output('result')
command: cert_show
-args: 1,5,3
+args: 1,6,3
arg: Int('serial_number')
option: Flag('all', autofill=True, cli_name='all', default=False)
option: Str('cacn?', cli_name='ca')
+option: Flag('no_members', autofill=True, default=False)
option: Str('out?')
option: Flag('raw', autofill=True, cli_name='raw', default=False)
option: Str('version?')
diff --git a/VERSION b/VERSION
index 32e9b79dc..46e17b95f 100644
--- a/VERSION
+++ b/VERSION
@@ -90,5 +90,5 @@ IPA_DATA_VERSION=20100614120000
# #
########################################################
IPA_API_VERSION_MAJOR=2
-IPA_API_VERSION_MINOR=194
-# Last change: cert: add object plugin
+IPA_API_VERSION_MINOR=195
+# Last change: cert: add owner information
diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py
index 3b180ee49..39144ddf3 100644
--- a/ipaserver/plugins/cert.py
+++ b/ipaserver/plugins/cert.py
@@ -19,6 +19,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
+import base64
import binascii
import datetime
import os
@@ -110,6 +111,9 @@ EXAMPLES:
Search for certificates based on issuance date
ipa cert-find --issuedon-from=2013-02-01 --issuedon-to=2013-02-07
+ Search for certificates owned by a specific user:
+ ipa cert-find --user=user
+
IPA currently immediately issues (or declines) all certificate requests so
the status of a request is not normally useful. This is for future use
or the case where a CA does not immediately issue a certificate.
@@ -652,9 +656,48 @@ class cert(BaseCertObject):
param = param.clone(flags=param.flags - {'no_search'})
yield param
+ for owner in self._owners():
+ yield owner.primary_key.clone_rename(
+ 'owner_{0}'.format(owner.name),
+ required=False,
+ multivalue=True,
+ primary_key=False,
+ label=_("Owner %s") % owner.object_name,
+ flags={'no_create', 'no_update', 'no_search'},
+ )
+
+ def _owners(self):
+ for name in ('user', 'host', 'service'):
+ yield self.api.Object[name]
+
+ def _fill_owners(self, obj):
+ for owner in self._owners():
+ container_dn = DN(owner.container_dn, self.api.env.basedn)
+ name = 'owner_' + owner.name
+ for dn in obj['owner']:
+ if dn.endswith(container_dn, 1):
+ value = owner.get_primary_key_from_dn(dn)
+ obj.setdefault(name, []).append(value)
+
+
+class CertMethod(BaseCertMethod):
+ def get_options(self):
+ for option in super(CertMethod, self).get_options():
+ yield option
+
+ for o in self.has_output:
+ if isinstance(o, (output.Entry, output.ListOfEntries)):
+ yield Flag(
+ 'no_members',
+ doc=_("Suppress processing of membership attributes."),
+ exclude='webui',
+ flags={'no_output'},
+ )
+ break
+
@register()
-class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
+class cert_show(Retrieve, CertMethod, VirtualCommand):
__doc__ = _('Retrieve an existing certificate.')
takes_options = (
@@ -667,7 +710,8 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
operation="retrieve certificate"
- def execute(self, serial_number, all=False, raw=False, **options):
+ def execute(self, serial_number, all=False, raw=False, no_members=False,
+ **options):
ca_enabled_check()
hostname = None
try:
@@ -697,10 +741,26 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
"issued by CA '%(ca)s' not found")
% dict(serial=serial_number, ca=options['cacn']))
+ if all or not no_members:
+ ldap = self.api.Backend.ldap2
+ filter = ldap.make_filter_from_attr(
+ 'usercertificate', base64.b64decode(result['certificate']))
+ try:
+ entries = ldap.get_entries(base_dn=self.api.env.basedn,
+ filter=filter,
+ attrs_list=[''])
+ except errors.EmptyResult:
+ entries = []
+ for entry in entries:
+ result.setdefault('owner', []).append(entry.dn)
+
if not raw:
result['certificate'] = result['certificate'].replace('\r\n', '')
self.obj._parse(result)
result['revoked'] = ('revocation_reason' in result)
+ if 'owner' in result:
+ self.obj._fill_owners(result)
+ del result['owner']
if hostname:
# If we have a hostname we want to verify that the subject
@@ -712,7 +772,7 @@ class cert_show(Retrieve, BaseCertMethod, VirtualCommand):
@register()
-class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand):
+class cert_revoke(PKQuery, CertMethod, VirtualCommand):
__doc__ = _('Revoke a certificate.')
operation = "revoke certificate"
@@ -753,7 +813,7 @@ class cert_revoke(PKQuery, BaseCertMethod, VirtualCommand):
@register()
-class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand):
+class cert_remove_hold(PKQuery, CertMethod, VirtualCommand):
__doc__ = _('Take a revoked certificate off hold.')
has_output_params = (
@@ -782,7 +842,7 @@ class cert_remove_hold(PKQuery, BaseCertMethod, VirtualCommand):
@register()
-class cert_find(Search, BaseCertMethod):
+class cert_find(Search, CertMethod):
__doc__ = _('Search for existing certificates.')
takes_options = (
@@ -852,6 +912,11 @@ class cert_find(Search, BaseCertMethod):
doc=_("Results should contain primary key attribute only "
"(\"certificate\")"),
),
+ Int('timelimit?',
+ label=_('Time Limit'),
+ doc=_('Time limit of search in seconds (0 is unlimited)'),
+ minvalue=0,
+ ),
Int('sizelimit?',
label=_("Size Limit"),
doc=_("Maximum number of entries returned (0 is unlimited)"),
@@ -863,9 +928,65 @@ class cert_find(Search, BaseCertMethod):
'%(count)d certificate matched', '%(count)d certificates matched', 0
)
+ def get_options(self):
+ for option in super(cert_find, self).get_options():
+ if option.name == 'no_members':
+ option = option.clone(default=True,
+ flags=set(option.flags) | {'no_option'})
+ yield option
+
+ for owner in self.obj._owners():
+ yield owner.primary_key.clone_rename(
+ '{0}'.format(owner.name),
+ required=False,
+ multivalue=True,
+ primary_key=False,
+ query=True,
+ cli_name='{0}s'.format(owner.name),
+ doc=(_("Search for certificates with these owner %s.") %
+ owner.object_name_plural),
+ label=owner.object_name,
+ )
+ yield owner.primary_key.clone_rename(
+ 'no_{0}'.format(owner.name),
+ required=False,
+ multivalue=True,
+ primary_key=False,
+ query=True,
+ cli_name='no_{0}s'.format(owner.name),
+ doc=(_("Search for certificates without these owner %s.") %
+ owner.object_name_plural),
+ label=owner.object_name,
+ )
+
def execute(self, criteria=None, all=False, raw=False, pkey_only=False,
- sizelimit=None, **options):
- ca_enabled_check()
+ no_members=True, timelimit=None, sizelimit=None, **options):
+ ca_options = {'cacn',
+ 'revocation_reason',
+ 'issuer',
+ 'subject',
+ 'min_serial_number', 'max_serial_number',
+ 'exactly',
+ 'validnotafter_from', 'validnotafter_to',
+ 'validnotbefore_from', 'validnotbefore_to',
+ 'issuedon_from', 'issuedon_to',
+ 'revokedon_from', 'revokedon_to'}
+ ldap_options = {prefix + owner.name
+ for owner in self.obj._owners()
+ for prefix in ('', 'no_')}
+ has_ca_options = (
+ any(name in options for name in ca_options - {'exactly'}) or
+ options['exactly'])
+ has_ldap_options = any(name in options for name in ldap_options)
+
+ try:
+ ca_enabled_check()
+ except errors.NotFound:
+ if has_ca_options:
+ raise
+ ca_enabled = False
+ else:
+ ca_enabled = True
if 'cacn' in options:
ca_obj = api.Command.ca_show(options['cacn'])['result']
@@ -882,47 +1003,136 @@ class cert_find(Search, BaseCertMethod):
return dict(result=[], count=0, truncated=False)
obj_seq = []
+ obj_dict = {}
+ truncated = False
+
+ if ca_enabled:
+ ra_options = {}
+ for name, value in options.items():
+ if name not in ca_options:
+ continue
+ if isinstance(value, datetime.datetime):
+ value = value.strftime(PKIDATE_FORMAT)
+ ra_options[name] = value
+ if sizelimit is not None:
+ if sizelimit != 0:
+ ra_options['sizelimit'] = sizelimit
+ sizelimit = 0
+ has_ca_options = True
+
+ for ra_obj in self.Backend.ra.find(ra_options):
+ obj = {}
+ if ((not pkey_only and all) or
+ not no_members or
+ not has_ca_options or
+ has_ldap_options):
+ ra_obj.update(
+ self.Backend.ra.get_certificate(
+ str(ra_obj['serial_number'])))
+ cert = base64.b64decode(ra_obj['certificate'])
+ obj_dict[cert] = obj
+ obj_seq.append(obj)
+ obj.update(ra_obj)
+
+ if ((not pkey_only and all) or
+ not no_members or
+ not has_ca_options or
+ has_ldap_options):
+ ldap = self.api.Backend.ldap2
+
+ filters = []
+ cert_filter = '(usercertificate=*)'
+ filters.append(cert_filter)
+ for owner in self.obj._owners():
+ oc_filter = ldap.make_filter_from_attr(
+ 'objectclass', owner.object_class, ldap.MATCH_ALL)
+ for prefix, rule in (('', ldap.MATCH_ALL),
+ ('no_', ldap.MATCH_NONE)):
+ value = options.get(prefix + owner.name)
+ if value is None:
+ continue
+ pkey_filter = ldap.make_filter_from_attr(
+ owner.primary_key.name, value, rule)
+ filters.append(oc_filter)
+ filters.append(pkey_filter)
+ filter = ldap.combine_filters(filters, ldap.MATCH_ALL)
- ra_options = {}
- for name, value in options.items():
- if isinstance(value, datetime.datetime):
- value = value.strftime(PKIDATE_FORMAT)
- ra_options[name] = value
- if sizelimit is not None and sizelimit != 0:
- ra_options['sizelimit'] = sizelimit
-
- for ra_obj in self.Backend.ra.find(ra_options):
- obj = {}
- if all:
- ra_obj.update(
- self.Backend.ra.get_certificate(
- str(ra_obj['serial_number'])))
- obj_seq.append(obj)
- obj.update(ra_obj)
+ try:
+ entries, truncated = ldap.find_entries(
+ base_dn=self.api.env.basedn,
+ filter=filter,
+ attrs_list=['usercertificate'],
+ time_limit=timelimit,
+ size_limit=sizelimit,
+ )
+ except errors.EmptyResult:
+ entries, truncated = [], False
+ for entry in entries:
+ seen = set()
+ for attr in ('usercertificate', 'usercertificate;binary'):
+ for cert in entry.get(attr, []):
+ if cert in seen:
+ continue
+ seen.add(cert)
+ try:
+ obj = obj_dict[cert]
+ except KeyError:
+ if has_ca_options:
+ continue
+ obj = {
+ 'certificate': unicode(base64.b64encode(cert))}
+ obj_seq.append(obj)
+ obj_dict[cert] = obj
+ obj.setdefault('owner', []).append(entry.dn)
result = []
for obj in obj_seq:
+ if has_ldap_options and 'owner' not in obj:
+ continue
if not pkey_only:
if not raw:
if 'certificate' in obj:
obj['certificate'] = (
obj['certificate'].replace('\r\n', ''))
self.obj._parse(obj)
- obj['subject'] = DN(obj['subject'])
- obj['issuer'] = DN(obj['issuer'])
- obj['revoked'] = (
- obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED'))
+ if not all:
+ del obj['certificate']
+ del obj['valid_not_before']
+ del obj['valid_not_after']
+ del obj['md5_fingerprint']
+ del obj['sha1_fingerprint']
+ if 'subject' in obj:
+ obj['subject'] = DN(obj['subject'])
+ if 'issuer' in obj:
+ obj['issuer'] = DN(obj['issuer'])
+ if 'status' in obj:
+ obj['revoked'] = (
+ obj['status'] in (u'REVOKED', u'REVOKED_EXPIRED'))
+ if 'owner' in obj:
+ if all or not no_members:
+ self.obj._fill_owners(obj)
+ del obj['owner']
+ else:
+ if 'certificate' in obj:
+ if not all:
+ del obj['certificate']
+ if 'owner' in obj:
+ if not all and no_members:
+ del obj['owner']
else:
- serial_number = obj['serial_number']
- obj.clear()
- obj['serial_number'] = serial_number
+ if 'serial_number' in obj:
+ serial_number = obj['serial_number']
+ obj.clear()
+ obj['serial_number'] = serial_number
+ else:
+ obj.clear()
result.append(obj)
ret = dict(
result=result
)
ret['count'] = len(ret['result'])
- ret['truncated'] = False
+ ret['truncated'] = bool(truncated)
return ret