diff options
-rwxr-xr-x | integration.py | 78 | ||||
-rw-r--r-- | ipaserver/plugins/ra.py | 263 |
2 files changed, 199 insertions, 142 deletions
diff --git a/integration.py b/integration.py new file mode 100755 index 00000000..23a9498c --- /dev/null +++ b/integration.py @@ -0,0 +1,78 @@ +#!/usr/bin/python + +from base64 import b64encode, b64decode +from ipalib import api + +# certificate with serial number 17 +cert = b64decode(""" +MIIC3zCCAcegAwIBAgIBETANBgkqhkiG9w0BAQUFADA7MRkwFwYDVQQKExBTamNSZWRoYXQgRG9tYW +luMR4wHAYDVQQDExVDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMDkwMTIyMjMzODA2WhcNMDkwNzIx +MjMzODA2WjAUMRIwEAYKCZImiZPyLGQBARMCbGwwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAM +id6i9ri9ldyAXaH4MJSPdUDjdc9+E10hwxw7crFE1K0uvr8YT2e1YotNqv7Q+Bk7KVRrLH6Y5UPlWY +uSAP8G9t8yjn5Uo3iXU5AqsrRek+pxerD/WocwedF6yjJ/zlQyYyg93h0njJr1lStyVLTyp+VVqtk3 +FSDIwLCWQHOTejAgMBAAGjgZgwgZUwHwYDVR0jBBgwFoAUlz9JZxqVabh4QQOEkxyWt80pIQkwQwYI +KwYBBQUHAQEENzA1MDMGCCsGAQUFBzABhidodHRwOi8vYS1mOC5zamMucmVkaGF0LmNvbTo5MTgwL2 +NhL29jc3AwDgYDVR0PAQH/BAQDAgXgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggrBgEFBQcDBDANBgkq +hkiG9w0BAQUFAAOCAQEAhU+oqPh+rlYFPm0D8HAJ0RIWw9gkNctHUfVGi+NeYTaUAEGWUOpXjLSQgP +gq1fNBHd+IRLhycwp4uUsFCPE1n3eStmn/D6o9u1eNnTFPj74MLZVQQTXPE8+LBYeHgTUwFuKp2WyW +9J/BDZ3pDWKYWWMawhD7ext7UhZkpIJODFEaDxiXCfB8GsAEbmfoYFk21znuGQQu3Wu1s6licyunLh +/W3sxCFGIT9DHxS0GZKimm7M02IPGxK/0TZr0kVcLQx6XGKqiK1464rvl4u60mQjwJwfhawshs84YT +xFnXZKkvsT3GjfIe/k687TMG3paTFtKkis+u7z0v6355uJzLpQ== +""") + +csr = 'MIIBlDCB/gIBADBVMR0wGwYDVQQKExRVc2Vyc3lzUmVkaGF0LURvbWFpbjEQMA4GA1UECxMHcGtpLWlwYTEiMCAGA1UEAxMZSVBBLVN1YnN5c3RlbS1DZXJ0aWZpY2F0ZTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA3Qmpr81WxbnISmyyhc2ShiPzUvWIrCg5FgJ1QrBl7CRe62Wl/YYiV/DbuMoex1ec7zKfgfSFVFU9/2iwj7Du0sZdXYJNQPdj9yLdPk2tyxdgJuHLdxI0SNgaEFyvmIMP/X9vQN9H5w0/PyrJQscOxc6tbTcYL0ZSSylLQ+diaQECAwEAA' + +api.bootstrap(in_server=True, debug=True) +api.finalize() +ra = api.Backend.ra + +def assert_equal(*vals): + val0 = vals[0] + for val in vals[1:]: + assert val == val0, '%r != %r' % (val, val0) + + +api.log.info('******** Testing ra.check_request_status() ********') +assert_equal( + ra.check_request_status('35'), + dict( + status='0', + serial_number='17', + request_status='complete', + request_id='35', + ) +) + +api.log.info('******** Testing ra.get_certificate() ********') +assert_equal( + ra.get_certificate('17'), + dict( + status='0', + certificate=b64encode(cert), + ) +) + +api.log.info('******** Testing ra.request_certificate() ********') +assert_equal( + ra.request_certificate(csr), + dict( + status='1', + ) +) + +api.log.info('******** Testing ra.revoke_certificate() ********') +assert_equal( + ra.revoke_certificate('17', revocation_reason=6), # Put on hold + dict( + status='0', + revoked=True, + ) +) + +api.log.info('******** Testing ra.take_certificate_off_hold() ********') +assert_equal( + ra.take_certificate_off_hold('17'), + dict( + taken_off_hold=True, + ) +) diff --git a/ipaserver/plugins/ra.py b/ipaserver/plugins/ra.py index 107f8dfa..9932e09c 100644 --- a/ipaserver/plugins/ra.py +++ b/ipaserver/plugins/ra.py @@ -21,13 +21,14 @@ """ Backend plugin for IPA-RA. -IPA-RA provides an access to CA to issue, retrieve, and revoke certificates. -IPA-RA plugin provides CA interface via the following methods: - check_request_status to check certificate request status - get_certificate to retrieve an existing certificate - request_certificate to request certificate - revoke_certificate to revoke certificate - take_certificate_off_hold to take certificate off hold +The `ra` plugin provides access to the CA to issue, retrieve, and revoke +certificates via the following methods: + + * `ra.check_request_status()` - check certificate request status. + * `ra.get_certificate()` - retrieve an existing certificate. + * `ra.request_certificate()` - request a new certificate. + * `ra.revoke_certificate()` - revoke a certificate. + * `ra.take_certificate_off_hold()` - take a certificate off hold. """ import os, stat, subprocess @@ -43,6 +44,7 @@ from ipalib import api, Backend from ipalib.errors2 import NetworkError from ipaserver import servercore from ipaserver import ipaldap +from ipalib.constants import TYPE_ERROR class ra(Backend): @@ -66,42 +68,68 @@ class ra(Backend): self.__import_ca_chain() self.__request_ipa_certificate(self.__generate_ipa_request()) - def _request(self, method, **kw): + def _request(self, url, **kw): """ - Perform an HTTP request to CA server. + Perform an HTTP request. + + :param url: The URL to post to. + :param kw: Keyword arguments to encode into POST body. """ - # FIXME: should '/ca/ee/ca/%s' be hardcoded, or should it be in Env? - url = '/ca/ee/ca/%s' % method - self.info('CA request: %s:%s%s', - self.env.ca_host, self.env.ca_port, url) + uri = 'http://%s:%s%s' % (self.env.ca_host, self.env.ca_port, url) + post = urlencode(kw) + self.info('request %r', uri) + self.debug('request post %r', post) conn = HTTPConnection(self.env.ca_host, self.env.ca_port) try: conn.request('POST', url, - body=urlencode(kw), + body=post, headers={'Content-type': 'application/x-www-form-urlencoded'}, ) except socket.error, e: - raise NetworkError( - uri='http://%s:%d' % (self.env.ca_host, self.env.ca_port), - error=e.args[1], - ) + raise NetworkError(uri=uri, error=e.args[1]) response = conn.getresponse() (status, reason) = (response.status, response.reason) data = response.read() conn.close() - self.debug('response status: %r', status) - self.debug('response reason: %r', reason) - #self.debug('response data: %r', data) + self.debug('request status %r', status) + self.debug('request reason %s', reason) + self.debug('request data %s', data) return (status, reason, data) + def _sslget(self, url, **kw): + """ + Perform an HTTPS request using the ``sslget`` command. + + :param url: The URL to post to. + :param kw: Keyword arguments to encode into POST body. + """ + uri = 'https://%s:%d%s' % (self.env.ca_host, self.env.ca_ssl_port, url) + post = urlencode(kw) + self.info('sslget %r', uri) + self.debug('sslget post %r', post) + argv = [ + '/usr/bin/sslget', + '-n', self.ipa_certificate_nickname, # nickname + '-w', self.pwd_file, # pwfile + '-d', self.sec_dir, # dbdir + '-e', post, # post + '-r', url, # url + '%s:%d' % (self.env.ca_host, self.env.ca_ssl_port), + ] + (returncode, stdout, stderr) = self.__run(argv) + self.debug('sslget returncode %r', returncode) + self.debug('sslget stderr %s', stderr) + self.debug('sslget stdout %s', stdout) + return (returncode, stdout, stderr) + def check_request_status(self, request_id): """ Check status of a certificate signing request. :param request_id: request ID """ - self.debug('IPA-RA: check_request_status') - (s, r, data) = self._request('checkRequest', + self.debug('%s.check_request_status()', self.fullname) + (s, r, data) = self._request('/ca/ee/ca/checkRequest', requestId=request_id, xmlOutput='true', ) @@ -118,7 +146,7 @@ class ra(Backend): ) if serial_number is not None: # This was "0x"+serial_number, but we should return it in - # the same form used in get_certificate() + # the same form used as arg to get_certificate(), etc. response['serial_number'] = serial_number request_id = self.__find_substring( data, 'header.requestId = "', '"' @@ -132,46 +160,19 @@ class ra(Backend): response['error'] = error return response - def __run_sslget(self, args, stdin=None): - new_args = ["/usr/bin/sslget", "-d", self.sec_dir, "-w", self.pwd_file, "-n", self.ipa_certificate_nickname] - new_args = new_args + args - return self.__run(new_args, stdin) - - def _sslget(self, url, **kw): - """ - Perform HTTPS request using ``sslget`` command. - - This is only a stop-gap till it is replaced with python-nss. - """ - post = urlencode(kw) - self.debug('sslget %s %s', url, post) - argv = [ - '/usr/bin/sslget', - '-n', self.ipa_certificate_nickname, # nickname - '-w', self.pwd_file, # pwfile - '-d', self.sec_dir, # dbdir - '-e', post, # post - '-r', url, # url - '%s:%d' % (self.env.ca_host, self.env.ca_ssl_port), - ] - (returncode, stdout, stderr) = self.__run(argv) - self.debug('sslget response %s', stdout) - return (returncode, stdout, stderr) - def get_certificate(self, serial_number=None): """ Retrieve an existing certificate. :param serial_number: certificate serial number """ - self.debug('IPA-RA: get_certificate') + self.debug('%s.get_certificate()', self.fullname) issued_certificate = None (returncode, stdout, stderr) = self._sslget( '/ca/agent/ca/displayBySerial', serialNumber=serial_number, xmlOutput='true', ) - self.debug("IPA-RA: returncode: %d" % returncode) response = {} if (returncode == 0): issued_certificate = self.__find_substring( @@ -201,124 +202,102 @@ class ra(Backend): :param csr: The certificate signing request. :param request_type: The request type (defaults to ``'pkcs10'``). """ - self.debug("IPA-RA: request_certificate") + self.debug('%s.request_certificate()', self.fullname) certificate = None - (returncode, stdout, stderr) = self._sslget( - '/ca/ee/ca/profileSubmit', + (returncode, stdout, stderr) = self._sslget('/ca/ee/ca/profileSubmit', profileId='caRAserverCert', cert_request_type=request_type, cert_request=csr, xmlOutput='true', ) - return_values = {} - self.debug("IPA-RA: returncode: %d" % returncode) + response = {} if (returncode == 0): status = self.__find_substring(stdout, "<Status>", "</Status>") if status is not None: - self.debug ("status=%s" % status) - return_values["status"] = status + response["status"] = status request_id = self.__find_substring(stdout, "<Id>", "</Id>") if request_id is not None: - self.debug ("request_id=%s" % request_id) - return_values["request_id"] = request_id + response["request_id"] = request_id serial_number = self.__find_substring(stdout, "<serialno>", "</serialno>") if serial_number is not None: - self.debug ("serial_number=%s" % serial_number) - return_values["serial_number"] = ("0x%s" % serial_number) + response["serial_number"] = ("0x%s" % serial_number) subject = self.__find_substring(stdout, "<SubjectDN>", "</SubjectDN>") if subject is not None: - self.debug ("subject=%s" % subject) - return_values["subject"] = subject + response["subject"] = subject certificate = self.__find_substring(stdout, "<b64>", "</b64>") if certificate is not None: - self.debug ("certificate=%s" % certificate) - return_values["certificate"] = certificate - if return_values.has_key("status") is False: - return_values["status"] = "2" + response["certificate"] = certificate + if response.has_key("status") is False: + response["status"] = "2" else: - return_values["status"] = str(-returncode) - return return_values - + response["status"] = str(-returncode) + return response - def revoke_certificate(self, serial_number=None, revocation_reason=0): + def revoke_certificate(self, serial_number, revocation_reason=0): """ - Revoke a certificate - :param serial_number: certificate serial number - :param revocation_reason: revocation reason - revocationr reasons: 0 - unspecified - 1 - key compromise - 2 - ca compromise - 3 - affiliation changed - 4 - superseded - 5 - cessation of operation - 6 - certificate hold - 7 - value 7 is not used - 8 - remove from CRL - 9 - privilege withdrawn - 10 - aa compromise - see RFC 5280 for more details + Revoke a certificate. + + The integer ``revocation_reason`` code must have one of these values: + + * ``0`` - unspecified + * ``1`` - keyCompromise + * ``2`` - cACompromise + * ``3`` - affiliationChanged + * ``4`` - superseded + * ``5`` - cessationOfOperation + * ``6`` - certificateHold + * ``8`` - removeFromCRL + * ``9`` - privilegeWithdrawn + * ``10`` - aACompromise + + Note that reason code ``7`` is not used. See RFC 5280 for more details: + + http://www.ietf.org/rfc/rfc5280.txt + + :param serial_number: Certificate serial number. + :param revocation_reason: Integer code of revocation reason. """ - return_values = {} - self.debug("IPA-RA: revoke_certificate") - if revocation_reason is None: - revocation_reason = 0 - if serial_number is not None: - if isinstance(serial_number, int): - serial_number = str(serial_number) - if isinstance(revocation_reason, int): - revocation_reason = str(revocation_reason) - request_info = "op=revoke&revocationReason="+revocation_reason+"&revokeAll=(certRecordId%3D"+serial_number+")&totalRecordCount=1" - (returncode, stdout, stderr) = self.__run_sslget([ - '-e', - request_info, - '-r', - '/ca/agent/ca/doRevoke', - '%s:%d' % (self.env.ca_host, self.env.ca_ssl_port), - ]) - self.debug("IPA-RA: returncode: %d" % returncode) - if (returncode == 0): - return_values["status"] = "0" - if (stdout.find('revoked = "yes"') > -1): - return_values["revoked"] = True - else: - return_values["revoked"] = False + self.debug('%s.revoke_certificate()', self.fullname) + if type(revocation_reason) is not int: + raise TYPE_ERROR('revocation_reason', int, revocation_reason, + type(revocation_reason) + ) + response = {} + (returncode, stdout, stderr) = self._sslget('/ca/agent/ca/doRevoke', + op='revoke', + revocationReason=revocation_reason, + revokeAll='(certRecordId=%s)' % serial_number, + totalRecordCount=1, + ) + if returncode == 0: + response['status'] = '0' + if (stdout.find('revoked = "yes"') > -1): + response['revoked'] = True else: - return_values["status"] = str(-returncode) + response['revoked'] = False else: - return_values["status"] = "1" - return return_values - + response['status'] = str(-returncode) + return response - def take_certificate_off_hold(self, serial_number=None): + def take_certificate_off_hold(self, serial_number): """ - Take revoked certificate off hold - :param serial_number: certificate serial number + Take revoked certificate off hold. + + :param serial_number: Certificate serial number. """ - return_values = {} - self.debug("IPA-RA: revoke_certificate") - if serial_number is not None: - if isinstance(serial_number, int): - serial_number = str(serial_number) - request_info = "serialNumber="+serial_number - (returncode, stdout, stderr) = self.__run_sslget([ - '-e', - request_info, - '-r', - '/ca/agent/ca/doUnrevoke', - '%s:%d' % (self.env.ca_host, self.env.ca_ssl_port), - ]) - self.debug("IPA-RA: returncode: %d" % returncode) - if (returncode == 0): - if (stdout.find('unrevoked = "yes"') > -1): - return_values["taken_off_hold"] = True - else: - return_values["taken_off_hold"] = False + response = {} + self.debug('%s.take_certificate_off_hold()', self.fullname) + (returncode, stdout, stderr) = self._sslget('/ca/agent/ca/doUnrevoke', + serialNumber=serial_number, + ) + if (returncode == 0): + if (stdout.find('unrevoked = "yes"') > -1): + response['taken_off_hold'] = True else: - return_values["status"] = str(-returncode) + response['taken_off_hold'] = False else: - return_values["status"] = "1" - return return_values - + response['status'] = str(-returncode) + return response def __find_substring(self, str, str1, str2): sub_str = None |