diff options
Diffstat (limited to 'ipaserver/plugins/dogtag.py')
-rw-r--r-- | ipaserver/plugins/dogtag.py | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/ipaserver/plugins/dogtag.py b/ipaserver/plugins/dogtag.py index d52bb7e98..28bf754cb 100644 --- a/ipaserver/plugins/dogtag.py +++ b/ipaserver/plugins/dogtag.py @@ -237,9 +237,14 @@ digits and nothing else follows. ''' from lxml import etree +import urllib +import urllib2 import datetime +import time from ipapython.dn import DN from ldap.filter import escape_filter_chars +import ipapython.dogtag +from ipapython import ipautil # These are general status return values used when # CMSServlet.outputError() is invoked. @@ -1715,4 +1720,137 @@ class ra(rabase.rabase): return cmd_result + def find(self, options): + """ + Search for certificates + + :param options: dictionary of search options + """ + + def convert_time(value): + """ + Convert time to milliseconds to pass to dogtag + """ + ts = time.strptime(value, '%Y-%m-%d') + return int(time.mktime(ts) * 1000) + + self.debug('%s.find()', self.fullname) + + # Create the root element + page = etree.Element('CertSearchRequest') + + # Make a new document tree + doc = etree.ElementTree(page) + + # This matches the default configuration of the pki tool. + booloptions = {'serialNumberRangeInUse': True, + 'subjectInUse': False, + 'matchExactly': False, + 'revokedByInUse': False, + 'revokedOnInUse': False, + 'revocationReasonInUse': False, + 'issuedByInUse': False, + 'issuedOnInUse': False, + 'validNotBeforeInUse': False, + 'validNotAfterInUse': False, + 'validityLengthInUse': False, + 'certTypeInUse': False} + + if options.get('exactly', False): + booloptions['matchExactly'] = True + + if 'subject' in options: + node = etree.SubElement(page, 'commonName') + node.text = options['subject'] + booloptions['subjectInUse'] = True + + if 'revocation_reason' in options: + node = etree.SubElement(page, 'revocationReason') + node.text = unicode(options['revocation_reason']) + booloptions['revocationReasonInUse'] = True + + if 'min_serial_number' in options: + node = etree.SubElement(page, 'serialFrom') + node.text = unicode(options['min_serial_number']) + + if 'max_serial_number' in options: + node = etree.SubElement(page, 'serialTo') + node.text = unicode(options['max_serial_number']) + + # date_types is a tuple that consists of: + # 1. attribute name passed from IPA API + # 2. attribute name used by REST API + # 3. boolean to set in the REST API + + date_types = ( + ('validnotbefore_from', 'validNotBeforeFrom', 'validNotBeforeInUse'), + ('validnotbefore_to', 'validNotBeforeTo', 'validNotBeforeInUse'), + ('validnotafter_from', 'validNotAfterFrom', 'validNotAfterInUse'), + ('validnotafter_to', 'validNotAfterTo', 'validNotAfterInUse'), + ('issuedon_from', 'issuedOnFrom','issuedOnInUse'), + ('issuedon_to', 'issuedOnTo','issuedOnInUse'), + ('revokedon_from', 'revokedOnFrom','revokedOnInUse'), + ('revokedon_to', 'revokedOnTo','revokedOnInUse'), + ) + + for (attr, dattr, battr) in date_types: + if attr in options: + epoch = convert_time(options[attr]) + node = etree.SubElement(page, dattr) + node.text = unicode(epoch) + booloptions[battr] = True + + # Add the boolean options to our XML document + for opt in booloptions: + e = etree.SubElement(page, opt) + e.text = str(booloptions[opt]).lower() + + payload = etree.tostring(doc, pretty_print=False, xml_declaration=True, encoding='UTF-8') + self.debug('%s.find(): request: %s', self.fullname, payload) + + url = 'http://%s/ca/rest/certs/search?size=%d' % (ipautil.format_netloc(self.ca_host, ipapython.dogtag.configured_constants().UNSECURE_PORT), options.get('sizelimit', 100)) + + opener = urllib2.build_opener() + opener.addheaders = [('Accept-Encoding', 'gzip, deflate'), + ('User-Agent', 'IPA')] + + req = urllib2.Request(url=url, data=payload, headers={'Content-Type': 'application/xml'}) + try: + response = opener.open(req) + except urllib2.HTTPError, e: + self.raise_certificate_operation_error('find', + detail=e.msg) + except urllib2.URLError, e: + self.raise_certificate_operation_error('find', + detail=e.reason) + + data = response.readlines() + self.debug('%s.find(): response: %s', self.fullname, data) + parser = etree.XMLParser() + try: + doc = etree.fromstring(data[0], parser) + except etree.XMLSyntaxError, e: + self.raise_certificate_operation_error('find', + detail=e.msg) + + # Grab all the certificates + certs = doc.xpath('//CertDataInfo') + + results = [] + + for cert in certs: + response_request = {} + response_request['serial_number'] = int(cert.get('id'), 16) # parse as hex + response_request['serial_number_hex'] = u'0x%X' % response_request['serial_number'] + + dn = cert.xpath('SubjectDN') + if len(dn) == 1: + response_request['subject'] = unicode(dn[0].text) + status = cert.xpath('Status') + if len(status) == 1: + response_request['status'] = unicode(status[0].text) + results.append(response_request) + + return results + api.register(ra) |