summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xintegration.py78
-rw-r--r--ipaserver/plugins/ra.py263
2 files changed, 199 insertions, 142 deletions
diff --git a/integration.py b/integration.py
new file mode 100755
index 00000000..23a9498c
--- /dev/null
+++ b/integration.py
@@ -0,0 +1,78 @@
+#!/usr/bin/python
+
+from base64 import b64encode, b64decode
+from ipalib import api
+
+# certificate with serial number 17
+cert = b64decode("""
+MIIC3zCCAcegAwIBAgIBETANBgkqhkiG9w0BAQUFADA7MRkwFwYDVQQKExBTamNSZWRoYXQgRG9tYW
+luMR4wHAYDVQQDExVDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMDkwMTIyMjMzODA2WhcNMDkwNzIx
+MjMzODA2WjAUMRIwEAYKCZImiZPyLGQBARMCbGwwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAM
+id6i9ri9ldyAXaH4MJSPdUDjdc9+E10hwxw7crFE1K0uvr8YT2e1YotNqv7Q+Bk7KVRrLH6Y5UPlWY
+uSAP8G9t8yjn5Uo3iXU5AqsrRek+pxerD/WocwedF6yjJ/zlQyYyg93h0njJr1lStyVLTyp+VVqtk3
+FSDIwLCWQHOTejAgMBAAGjgZgwgZUwHwYDVR0jBBgwFoAUlz9JZxqVabh4QQOEkxyWt80pIQkwQwYI
+KwYBBQUHAQEENzA1MDMGCCsGAQUFBzABhidodHRwOi8vYS1mOC5zamMucmVkaGF0LmNvbTo5MTgwL2
+NhL29jc3AwDgYDVR0PAQH/BAQDAgXgMB0GA1UdJQQWMBQGCCsGAQUFBwMCBggrBgEFBQcDBDANBgkq
+hkiG9w0BAQUFAAOCAQEAhU+oqPh+rlYFPm0D8HAJ0RIWw9gkNctHUfVGi+NeYTaUAEGWUOpXjLSQgP
+gq1fNBHd+IRLhycwp4uUsFCPE1n3eStmn/D6o9u1eNnTFPj74MLZVQQTXPE8+LBYeHgTUwFuKp2WyW
+9J/BDZ3pDWKYWWMawhD7ext7UhZkpIJODFEaDxiXCfB8GsAEbmfoYFk21znuGQQu3Wu1s6licyunLh
+/W3sxCFGIT9DHxS0GZKimm7M02IPGxK/0TZr0kVcLQx6XGKqiK1464rvl4u60mQjwJwfhawshs84YT
+xFnXZKkvsT3GjfIe/k687TMG3paTFtKkis+u7z0v6355uJzLpQ==
+""")
+
+csr = 'MIIBlDCB/gIBADBVMR0wGwYDVQQKExRVc2Vyc3lzUmVkaGF0LURvbWFpbjEQMA4GA1UECxMHcGtpLWlwYTEiMCAGA1UEAxMZSVBBLVN1YnN5c3RlbS1DZXJ0aWZpY2F0ZTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA3Qmpr81WxbnISmyyhc2ShiPzUvWIrCg5FgJ1QrBl7CRe62Wl/YYiV/DbuMoex1ec7zKfgfSFVFU9/2iwj7Du0sZdXYJNQPdj9yLdPk2tyxdgJuHLdxI0SNgaEFyvmIMP/X9vQN9H5w0/PyrJQscOxc6tbTcYL0ZSSylLQ+diaQECAwEAA'
+
+api.bootstrap(in_server=True, debug=True)
+api.finalize()
+ra = api.Backend.ra
+
+def assert_equal(*vals):
+ val0 = vals[0]
+ for val in vals[1:]:
+ assert val == val0, '%r != %r' % (val, val0)
+
+
+api.log.info('******** Testing ra.check_request_status() ********')
+assert_equal(
+ ra.check_request_status('35'),
+ dict(
+ status='0',
+ serial_number='17',
+ request_status='complete',
+ request_id='35',
+ )
+)
+
+api.log.info('******** Testing ra.get_certificate() ********')
+assert_equal(
+ ra.get_certificate('17'),
+ dict(
+ status='0',
+ certificate=b64encode(cert),
+ )
+)
+
+api.log.info('******** Testing ra.request_certificate() ********')
+assert_equal(
+ ra.request_certificate(csr),
+ dict(
+ status='1',
+ )
+)
+
+api.log.info('******** Testing ra.revoke_certificate() ********')
+assert_equal(
+ ra.revoke_certificate('17', revocation_reason=6), # Put on hold
+ dict(
+ status='0',
+ revoked=True,
+ )
+)
+
+api.log.info('******** Testing ra.take_certificate_off_hold() ********')
+assert_equal(
+ ra.take_certificate_off_hold('17'),
+ dict(
+ taken_off_hold=True,
+ )
+)
diff --git a/ipaserver/plugins/ra.py b/ipaserver/plugins/ra.py
index 107f8dfa..9932e09c 100644
--- a/ipaserver/plugins/ra.py
+++ b/ipaserver/plugins/ra.py
@@ -21,13 +21,14 @@
"""
Backend plugin for IPA-RA.
-IPA-RA provides an access to CA to issue, retrieve, and revoke certificates.
-IPA-RA plugin provides CA interface via the following methods:
- check_request_status to check certificate request status
- get_certificate to retrieve an existing certificate
- request_certificate to request certificate
- revoke_certificate to revoke certificate
- take_certificate_off_hold to take certificate off hold
+The `ra` plugin provides access to the CA to issue, retrieve, and revoke
+certificates via the following methods:
+
+ * `ra.check_request_status()` - check certificate request status.
+ * `ra.get_certificate()` - retrieve an existing certificate.
+ * `ra.request_certificate()` - request a new certificate.
+ * `ra.revoke_certificate()` - revoke a certificate.
+ * `ra.take_certificate_off_hold()` - take a certificate off hold.
"""
import os, stat, subprocess
@@ -43,6 +44,7 @@ from ipalib import api, Backend
from ipalib.errors2 import NetworkError
from ipaserver import servercore
from ipaserver import ipaldap
+from ipalib.constants import TYPE_ERROR
class ra(Backend):
@@ -66,42 +68,68 @@ class ra(Backend):
self.__import_ca_chain()
self.__request_ipa_certificate(self.__generate_ipa_request())
- def _request(self, method, **kw):
+ def _request(self, url, **kw):
"""
- Perform an HTTP request to CA server.
+ Perform an HTTP request.
+
+ :param url: The URL to post to.
+ :param kw: Keyword arguments to encode into POST body.
"""
- # FIXME: should '/ca/ee/ca/%s' be hardcoded, or should it be in Env?
- url = '/ca/ee/ca/%s' % method
- self.info('CA request: %s:%s%s',
- self.env.ca_host, self.env.ca_port, url)
+ uri = 'http://%s:%s%s' % (self.env.ca_host, self.env.ca_port, url)
+ post = urlencode(kw)
+ self.info('request %r', uri)
+ self.debug('request post %r', post)
conn = HTTPConnection(self.env.ca_host, self.env.ca_port)
try:
conn.request('POST', url,
- body=urlencode(kw),
+ body=post,
headers={'Content-type': 'application/x-www-form-urlencoded'},
)
except socket.error, e:
- raise NetworkError(
- uri='http://%s:%d' % (self.env.ca_host, self.env.ca_port),
- error=e.args[1],
- )
+ raise NetworkError(uri=uri, error=e.args[1])
response = conn.getresponse()
(status, reason) = (response.status, response.reason)
data = response.read()
conn.close()
- self.debug('response status: %r', status)
- self.debug('response reason: %r', reason)
- #self.debug('response data: %r', data)
+ self.debug('request status %r', status)
+ self.debug('request reason %s', reason)
+ self.debug('request data %s', data)
return (status, reason, data)
+ def _sslget(self, url, **kw):
+ """
+ Perform an HTTPS request using the ``sslget`` command.
+
+ :param url: The URL to post to.
+ :param kw: Keyword arguments to encode into POST body.
+ """
+ uri = 'https://%s:%d%s' % (self.env.ca_host, self.env.ca_ssl_port, url)
+ post = urlencode(kw)
+ self.info('sslget %r', uri)
+ self.debug('sslget post %r', post)
+ argv = [
+ '/usr/bin/sslget',
+ '-n', self.ipa_certificate_nickname, # nickname
+ '-w', self.pwd_file, # pwfile
+ '-d', self.sec_dir, # dbdir
+ '-e', post, # post
+ '-r', url, # url
+ '%s:%d' % (self.env.ca_host, self.env.ca_ssl_port),
+ ]
+ (returncode, stdout, stderr) = self.__run(argv)
+ self.debug('sslget returncode %r', returncode)
+ self.debug('sslget stderr %s', stderr)
+ self.debug('sslget stdout %s', stdout)
+ return (returncode, stdout, stderr)
+
def check_request_status(self, request_id):
"""
Check status of a certificate signing request.
:param request_id: request ID
"""
- self.debug('IPA-RA: check_request_status')
- (s, r, data) = self._request('checkRequest',
+ self.debug('%s.check_request_status()', self.fullname)
+ (s, r, data) = self._request('/ca/ee/ca/checkRequest',
requestId=request_id,
xmlOutput='true',
)
@@ -118,7 +146,7 @@ class ra(Backend):
)
if serial_number is not None:
# This was "0x"+serial_number, but we should return it in
- # the same form used in get_certificate()
+ # the same form used as arg to get_certificate(), etc.
response['serial_number'] = serial_number
request_id = self.__find_substring(
data, 'header.requestId = "', '"'
@@ -132,46 +160,19 @@ class ra(Backend):
response['error'] = error
return response
- def __run_sslget(self, args, stdin=None):
- new_args = ["/usr/bin/sslget", "-d", self.sec_dir, "-w", self.pwd_file, "-n", self.ipa_certificate_nickname]
- new_args = new_args + args
- return self.__run(new_args, stdin)
-
- def _sslget(self, url, **kw):
- """
- Perform HTTPS request using ``sslget`` command.
-
- This is only a stop-gap till it is replaced with python-nss.
- """
- post = urlencode(kw)
- self.debug('sslget %s %s', url, post)
- argv = [
- '/usr/bin/sslget',
- '-n', self.ipa_certificate_nickname, # nickname
- '-w', self.pwd_file, # pwfile
- '-d', self.sec_dir, # dbdir
- '-e', post, # post
- '-r', url, # url
- '%s:%d' % (self.env.ca_host, self.env.ca_ssl_port),
- ]
- (returncode, stdout, stderr) = self.__run(argv)
- self.debug('sslget response %s', stdout)
- return (returncode, stdout, stderr)
-
def get_certificate(self, serial_number=None):
"""
Retrieve an existing certificate.
:param serial_number: certificate serial number
"""
- self.debug('IPA-RA: get_certificate')
+ self.debug('%s.get_certificate()', self.fullname)
issued_certificate = None
(returncode, stdout, stderr) = self._sslget(
'/ca/agent/ca/displayBySerial',
serialNumber=serial_number,
xmlOutput='true',
)
- self.debug("IPA-RA: returncode: %d" % returncode)
response = {}
if (returncode == 0):
issued_certificate = self.__find_substring(
@@ -201,124 +202,102 @@ class ra(Backend):
:param csr: The certificate signing request.
:param request_type: The request type (defaults to ``'pkcs10'``).
"""
- self.debug("IPA-RA: request_certificate")
+ self.debug('%s.request_certificate()', self.fullname)
certificate = None
- (returncode, stdout, stderr) = self._sslget(
- '/ca/ee/ca/profileSubmit',
+ (returncode, stdout, stderr) = self._sslget('/ca/ee/ca/profileSubmit',
profileId='caRAserverCert',
cert_request_type=request_type,
cert_request=csr,
xmlOutput='true',
)
- return_values = {}
- self.debug("IPA-RA: returncode: %d" % returncode)
+ response = {}
if (returncode == 0):
status = self.__find_substring(stdout, "<Status>", "</Status>")
if status is not None:
- self.debug ("status=%s" % status)
- return_values["status"] = status
+ response["status"] = status
request_id = self.__find_substring(stdout, "<Id>", "</Id>")
if request_id is not None:
- self.debug ("request_id=%s" % request_id)
- return_values["request_id"] = request_id
+ response["request_id"] = request_id
serial_number = self.__find_substring(stdout, "<serialno>", "</serialno>")
if serial_number is not None:
- self.debug ("serial_number=%s" % serial_number)
- return_values["serial_number"] = ("0x%s" % serial_number)
+ response["serial_number"] = ("0x%s" % serial_number)
subject = self.__find_substring(stdout, "<SubjectDN>", "</SubjectDN>")
if subject is not None:
- self.debug ("subject=%s" % subject)
- return_values["subject"] = subject
+ response["subject"] = subject
certificate = self.__find_substring(stdout, "<b64>", "</b64>")
if certificate is not None:
- self.debug ("certificate=%s" % certificate)
- return_values["certificate"] = certificate
- if return_values.has_key("status") is False:
- return_values["status"] = "2"
+ response["certificate"] = certificate
+ if response.has_key("status") is False:
+ response["status"] = "2"
else:
- return_values["status"] = str(-returncode)
- return return_values
-
+ response["status"] = str(-returncode)
+ return response
- def revoke_certificate(self, serial_number=None, revocation_reason=0):
+ def revoke_certificate(self, serial_number, revocation_reason=0):
"""
- Revoke a certificate
- :param serial_number: certificate serial number
- :param revocation_reason: revocation reason
- revocationr reasons: 0 - unspecified
- 1 - key compromise
- 2 - ca compromise
- 3 - affiliation changed
- 4 - superseded
- 5 - cessation of operation
- 6 - certificate hold
- 7 - value 7 is not used
- 8 - remove from CRL
- 9 - privilege withdrawn
- 10 - aa compromise
- see RFC 5280 for more details
+ Revoke a certificate.
+
+ The integer ``revocation_reason`` code must have one of these values:
+
+ * ``0`` - unspecified
+ * ``1`` - keyCompromise
+ * ``2`` - cACompromise
+ * ``3`` - affiliationChanged
+ * ``4`` - superseded
+ * ``5`` - cessationOfOperation
+ * ``6`` - certificateHold
+ * ``8`` - removeFromCRL
+ * ``9`` - privilegeWithdrawn
+ * ``10`` - aACompromise
+
+ Note that reason code ``7`` is not used. See RFC 5280 for more details:
+
+ http://www.ietf.org/rfc/rfc5280.txt
+
+ :param serial_number: Certificate serial number.
+ :param revocation_reason: Integer code of revocation reason.
"""
- return_values = {}
- self.debug("IPA-RA: revoke_certificate")
- if revocation_reason is None:
- revocation_reason = 0
- if serial_number is not None:
- if isinstance(serial_number, int):
- serial_number = str(serial_number)
- if isinstance(revocation_reason, int):
- revocation_reason = str(revocation_reason)
- request_info = "op=revoke&revocationReason="+revocation_reason+"&revokeAll=(certRecordId%3D"+serial_number+")&totalRecordCount=1"
- (returncode, stdout, stderr) = self.__run_sslget([
- '-e',
- request_info,
- '-r',
- '/ca/agent/ca/doRevoke',
- '%s:%d' % (self.env.ca_host, self.env.ca_ssl_port),
- ])
- self.debug("IPA-RA: returncode: %d" % returncode)
- if (returncode == 0):
- return_values["status"] = "0"
- if (stdout.find('revoked = "yes"') > -1):
- return_values["revoked"] = True
- else:
- return_values["revoked"] = False
+ self.debug('%s.revoke_certificate()', self.fullname)
+ if type(revocation_reason) is not int:
+ raise TYPE_ERROR('revocation_reason', int, revocation_reason,
+ type(revocation_reason)
+ )
+ response = {}
+ (returncode, stdout, stderr) = self._sslget('/ca/agent/ca/doRevoke',
+ op='revoke',
+ revocationReason=revocation_reason,
+ revokeAll='(certRecordId=%s)' % serial_number,
+ totalRecordCount=1,
+ )
+ if returncode == 0:
+ response['status'] = '0'
+ if (stdout.find('revoked = "yes"') > -1):
+ response['revoked'] = True
else:
- return_values["status"] = str(-returncode)
+ response['revoked'] = False
else:
- return_values["status"] = "1"
- return return_values
-
+ response['status'] = str(-returncode)
+ return response
- def take_certificate_off_hold(self, serial_number=None):
+ def take_certificate_off_hold(self, serial_number):
"""
- Take revoked certificate off hold
- :param serial_number: certificate serial number
+ Take revoked certificate off hold.
+
+ :param serial_number: Certificate serial number.
"""
- return_values = {}
- self.debug("IPA-RA: revoke_certificate")
- if serial_number is not None:
- if isinstance(serial_number, int):
- serial_number = str(serial_number)
- request_info = "serialNumber="+serial_number
- (returncode, stdout, stderr) = self.__run_sslget([
- '-e',
- request_info,
- '-r',
- '/ca/agent/ca/doUnrevoke',
- '%s:%d' % (self.env.ca_host, self.env.ca_ssl_port),
- ])
- self.debug("IPA-RA: returncode: %d" % returncode)
- if (returncode == 0):
- if (stdout.find('unrevoked = "yes"') > -1):
- return_values["taken_off_hold"] = True
- else:
- return_values["taken_off_hold"] = False
+ response = {}
+ self.debug('%s.take_certificate_off_hold()', self.fullname)
+ (returncode, stdout, stderr) = self._sslget('/ca/agent/ca/doUnrevoke',
+ serialNumber=serial_number,
+ )
+ if (returncode == 0):
+ if (stdout.find('unrevoked = "yes"') > -1):
+ response['taken_off_hold'] = True
else:
- return_values["status"] = str(-returncode)
+ response['taken_off_hold'] = False
else:
- return_values["status"] = "1"
- return return_values
-
+ response['status'] = str(-returncode)
+ return response
def __find_substring(self, str, str1, str2):
sub_str = None