diff options
author | Rob Crittenden <rcritten@redhat.com> | 2014-02-26 16:37:51 -0500 |
---|---|---|
committer | Rob Crittenden <rcritten@redhat.com> | 2014-02-26 16:50:55 -0500 |
commit | 07c27296c2c940cb119386304ebffb4ab41f0fb9 (patch) | |
tree | 201d87fb7f87d734bcec06aef66d8f20d8fb4706 /test/test_request.py | |
parent | c2ac0d128e776f3edb8aeb8920bf41b99742e74c (diff) | |
download | mod_nss-07c27296c2c940cb119386304ebffb4ab41f0fb9.tar.gz mod_nss-07c27296c2c940cb119386304ebffb4ab41f0fb9.tar.xz mod_nss-07c27296c2c940cb119386304ebffb4ab41f0fb9.zip |
Add some basic functional tests.
This tests in an in-tree Apache instance using the local libmodnss.so
shared library, so no pre-installation is necessary.
The tests use python-nose and a hacked python-requests library. It is
hacked so I can obtain the negotiated cipher and protocol as well as
pass a few other things into it.
Tests right now are limited to GET requests.
A new user certificate for 'beta' was added to gencert to do pass/fail
access control testing.
The basic process of the tests are:
- run setup.sh which sets up a new instance with createinstance.sh
and does some variable substitution.
- nosetests -v
I picture multiple test "suites" of different configurations. Right now
there is only one. A template file is provided for each suite.
Tested only on Fedora 20 right now.
Diffstat (limited to 'test/test_request.py')
-rw-r--r-- | test/test_request.py | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/test/test_request.py b/test/test_request.py new file mode 100644 index 0000000..40d8024 --- /dev/null +++ b/test/test_request.py @@ -0,0 +1,190 @@ +# +# Override a slew of methods to have more control over SSL + +import socket +import requests +import urlparse +from urllib3.util import get_host +from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool +import logging + +# Don't bend over backwards for ssl support, assume it is there. +import ssl +try: # Python 3 + from http.client import HTTPConnection, HTTPException + from http.client import HTTP_PORT, HTTPS_PORT + from http.client import HTTPSConnection +except ImportError: + from httplib import HTTPConnection, HTTPException + from httplib import HTTP_PORT, HTTPS_PORT + from httplib import HTTPSConnection + +try: + # python3.2+ + from ssl import match_hostname, CertificateError +except ImportError: + try: + # Older python where the backport from pypi is installed + from backports.ssl_match_hostname import match_hostname, CertificateError + except ImportError: + # Other older python we use the urllib3 bundled copy + from urllib3.packages.ssl_match_hostname import match_hostname, CertificateError + +log = logging.getLogger(__name__) + + +def connection_from_url(url, **kw): + """ + Given a url, return an :class:`.ConnectionPool` instance of its host. + + This is a shortcut for not having to parse out the scheme, host, and port + of the url before creating an :class:`.ConnectionPool` instance. + + :param url: + Absolute URL string that must include the scheme. Port is optional. + + :param \**kw: + Passes additional parameters to the constructor of the appropriate + :class:`.ConnectionPool`. Useful for specifying things like + timeout, maxsize, headers, etc. + + Example: :: + + >>> conn = connection_from_url('http://google.com/') + >>> r = conn.request('GET', '/') + """ + scheme, host, port = get_host(url) + if scheme == 'https': + return MyHTTPSConnectionPool(host, port=port, **kw) + else: + return HTTPConnectionPool(host, port=port, **kw) + +class MyHTTPSConnectionPool(HTTPSConnectionPool): + def __init__(self, host, port=None, + strict=False, timeout=None, maxsize=1, + block=False, headers=None, + key_file=None, cert_file=None, + cert_reqs='CERT_REQUIRED', ca_certs='/etc/ssl/certs/ca-certificates.crt', ssl_version=ssl.PROTOCOL_SSLv23, ciphers=None): + + super(HTTPSConnectionPool, self).__init__(host, port, + strict, timeout, maxsize, + block, headers) + self.key_file = key_file + self.cert_file = cert_file + self.cert_reqs = cert_reqs + self.ca_certs = ca_certs + self.ssl_version = ssl_version + self.ciphers = ciphers + + def _new_conn(self): + """ + Return a fresh :class:`httplib.HTTPSConnection`. + """ + self.num_connections += 1 + log.info("Starting new HTTPS connection (%d): %s" + % (self.num_connections, self.host)) + + #if not ssl: # Platform-specific: Python compiled without +ssl + # if not HTTPSConnection or HTTPSConnection is object: + # raise SSLError("Can't connect to HTTPS URL because the SSL " + # "module is not available.") + + # return HTTPSConnection(host=self.host, port=self.port) + + connection = MyVerifiedHTTPSConnection(host=self.host, port=self.port) + connection.set_cert(key_file=self.key_file, cert_file=self.cert_file, + cert_reqs=self.cert_reqs, ca_certs=self.ca_certs) + connection.set_ssl_version(self.ssl_version) + connection.set_ciphers(self.ciphers) + return connection + +class MyVerifiedHTTPSConnection(HTTPSConnection): + """ + Based on httplib.HTTPSConnection but wraps the socket with + SSL certification. + """ + cert_reqs = None + ca_certs = None + client_cipher = None + + def set_cert(self, key_file=None, cert_file=None, + cert_reqs='CERT_NONE', ca_certs=None): + ssl_req_scheme = { + 'CERT_NONE': ssl.CERT_NONE, + 'CERT_OPTIONAL': ssl.CERT_OPTIONAL, + 'CERT_REQUIRED': ssl.CERT_REQUIRED + } + + self.key_file = key_file + self.cert_file = cert_file + self.cert_reqs = ssl_req_scheme.get(cert_reqs) or ssl.CERT_NONE + self.ca_certs = ca_certs + + def set_ssl_version(self, ssl_version=ssl.PROTOCOL_SSLv23): + self.ssl_version = ssl_version + + def set_ciphers(self, ciphers=None): + self.ciphers = ciphers + + def connect(self): + # Add certificate verification + sock = socket.create_connection((self.host, self.port), self.timeout) + + # Wrap socket using verification with the root certs in + # trusted_root_certs + self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file, + cert_reqs=self.cert_reqs, + ca_certs=self.ca_certs, + ssl_version=self.ssl_version, + ciphers=self.ciphers) + if self.ca_certs: + match_hostname(self.sock.getpeercert(), self.host) + + def close(self): + self.client_cipher = self.sock.cipher() + HTTPSConnection.close(self) + +class MyAdapter(requests.adapters.HTTPAdapter): + + def get_connection(self, url, proxies=None): + """Returns a connection for the given URL.""" + + # proxies are not supported + return connection_from_url(url) + + def cert_verify(self, conn, url, verify, cert): + # I'm overloading the content of verify since this API is so + # braindead. If verify is a dict then key 'verify' represents the + # original meaning, the other keys are my own. + if isinstance(verify, bool): + super(MyAdapter, self).cert_verify(conn, url, verify, cert) + elif isinstance(verify, dict): + if 'verify' in verify: + super(MyAdapter, self).cert_verify(conn, url, + verify['verify'], cert) + if 'ssl_version' in verify: + conn.ssl_version = verify['ssl_version'] + if 'ciphers' in verify: + conn.ciphers = verify['ciphers'] + if 'cert_file' in verify: + conn.cert_file = verify['cert_file'] + if 'key_file' in verify: + conn.key_file = verify['key_file'] + else: # huh? Do nothing + pass + +""" +s = requests.Session() +s.mount('https://', MyAdapter()) +try: + r = s.get('https://darlene.greyoak.com:8000/', verify={'verify': False, 'ssl_version': ssl.PROTOCOL_SSLv23, 'ciphers': 'HIGH'}) + cipher = r.raw._pool._get_conn().client_cipher +except requests.exceptions.SSLError, e: + print e.message +else: + print r.status_code + print cipher + +#request = requests.get('https://darlene.greyoak.com:8000/', verify=False) +#print request.status_code +""" |