1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
|
# Copyright (C) 2013 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import re
import ssl
import time
import string
import requests
import socket
import subprocess
import test_util
import test_request
# Utility functions to assist in creating Apache configuration based
# on test suite
# Port used for the test Apache instance when a request does not name one.
DEF_PORT = 8000

# Host name of the machine running the tests, as reported by the socket
# module; used as the default server name.
FQDN = socket.gethostname()

# Baseline substitution values for the configuration templates.  Both
# directory entries are derived from the working directory at import time.
default_vars = {
    'DBPREFIX': '',
    'SERVER_PORT': DEF_PORT,
    'SERVER_NAME': FQDN,
    'TEST_ROOT': '%s/work/httpd' % os.getcwd(),
    'SERVER_ROOT': '%s/work/httpd' % os.getcwd(),
}
def template_str(txt, vars):
    """Expand ``$name`` placeholders in *txt*, then evaluate ``eval(...)`` spans.

    Args:
        txt: template text in ``string.Template`` syntax.
        vars: mapping of placeholder names to their replacement values.

    Returns:
        The substituted text, with every ``eval(<expr>)`` span replaced by
        ``str()`` of the evaluated expression.

    Raises:
        KeyError: if *txt* uses a placeholder that *vars* does not define.
        ValueError: if *txt* contains a malformed placeholder.
    """
    val = string.Template(txt).substitute(vars)

    # eval() is a special string one can insert into a template to have the
    # Python interpreter evaluate the string. This is intended to allow
    # math to be performed in templates.  The expression itself may not
    # contain parentheses, so nested calls are never matched.
    pattern = re.compile(r'eval\s*\(([^()]*)\)')

    # SECURITY: the matched expression goes straight to eval().  Only use
    # this with templates and variables that come from the test suite
    # itself, never with untrusted input.
    val = pattern.sub(lambda match: str(eval(match.group(1))), val)
    return val
def template_file(infilename, vars):
    """Return the text of *infilename* after template substitution.

    The file is read in full and the text is handed to ``template_str``
    together with *vars*; whatever that call returns is the result.
    """
    with open(infilename) as source:
        raw_text = source.read()
    return template_str(raw_text, vars)
def write_template_file(infilename, outfilename, vars):
    """Render the template *infilename* and write the result to *outfilename*.

    Args:
        infilename: path of the template file to read.
        outfilename: path of the file to create or overwrite.
        vars: mapping of substitution values; entries here take precedence
            over the module-level ``default_vars``.

    The rendered text is written followed by a single newline.
    """
    # Start from the defaults and let the caller's values override them.
    replacevars = dict(default_vars)
    replacevars.update(vars)

    # Render before opening the output so that a missing template or an
    # undefined placeholder does not leave a truncated output file behind.
    rendered = template_file(infilename, replacevars)

    with open(outfilename, 'w') as f:
        f.write('%s\n' % rendered)
def stop_apache():
    """Stop the Apache process.

    Launches the ``./stop`` script from inside ``work/httpd`` (relative to
    the current working directory) without waiting for it to finish, and
    puts the working directory back to what it was on entry.

    Raises:
        OSError: if ``work/httpd`` cannot be entered or ``./stop`` cannot be
            launched.  Nothing is caught here on purpose.
    """
    cwd = os.getcwd()
    # no try/except, just let it fail
    os.chdir('work/httpd')
    try:
        subprocess.Popen(['./stop'],
                         close_fds=True)
    finally:
        # Always return to the caller's directory, matching restart_apache,
        # so later relative paths keep resolving from the same place.
        os.chdir(cwd)
def restart_apache():
    """Restart the Apache process.

    From inside ``work/httpd`` (relative to the current working directory)
    this launches ``./stop``, sleeps five seconds, launches ``./start``,
    and then returns to the original directory.  Neither script is waited
    on; instead ``test_util.wait_for_open_ports`` is called for
    ``FQDN``/``DEF_PORT`` before returning.

    Raises:
        OSError: if ``work/httpd`` cannot be entered or a script cannot be
            launched.  Nothing is caught here on purpose.
    """
    cwd = os.getcwd()
    # no try/except, just let it fail
    os.chdir('work/httpd')
    try:
        subprocess.Popen(['./stop'],
                         close_fds=True)
        # Fixed pause between launching ./stop and launching ./start.
        time.sleep(5)
        subprocess.Popen(['./start'],
                         close_fds=True)
    finally:
        # Restore the caller's directory even when a launch fails.
        os.chdir(cwd)
    test_util.wait_for_open_ports(FQDN, DEF_PORT)
# Failure text for a request that was supposed to raise but completed.
# Filled with (uri, exception class name, options, request output).
EXPECTED = (
    "Expected %r to raise %s.\n"
    "options = %r\n"
    "output = %r"
)

# Failure text for a request that raised something other than what was
# expected.  Filled with (uri, expected class name, options, caught class
# name, caught exception).
UNEXPECTED = (
    "Expected %r to raise %s, but caught different.\n"
    "options = %r\n"
    "%s: %s"
)
class Declarative(object):
    """A declarative-style test suite

    A Declarative test suite is controlled by the ``tests``
    class variable.

    The ``tests`` is a sequence of dictionaries; each one is passed to
    ``check`` as keyword arguments and uses the following keys:

    ``desc``
        A name/description of the test
    ``request``
        A (uri, options) pair: the uri to fetch and a dictionary of
        request options.  The keys read in this class are ``host``,
        ``port``, ``verify`` and ``sni``.
    ``expected``
        Can be either an ``Exception`` instance, in which case the
        request must fail with an exception of that class (or with a
        message containing ``expected_str``); or the expected status code.
    ``expected_str``
        Optional text that must appear in the message of an exception
        whose class differs from the expected one.
    ``cipher``, ``protocol``
        Optional values compared with the first and second element of
        the connection's ``client_cipher``.
    ``content``
        Optional value that must be contained in the response content.
    """

    tests = tuple()

    def test_generator(self):
        """
        Iterate through tests.

        nose reports each one as a separate test.
        """
        # Iterate through the tests:
        name = self.__class__.__name__
        for (i, test) in enumerate(self.tests):
            nice = '%s[%d]: %s: %s' % (
                name, i, test['request'][0], test.get('desc', '')
            )
            # Bind nice/test as default arguments so every callable keeps
            # its own values even if it runs after the loop has advanced.
            func = lambda nice=nice, test=test: self.check(nice, **test)
            func.description = nice
            yield (func,)

    def make_request(self, uri, options):
        """
        Issue an HTTPS GET for ``uri`` and return the response object.

        The host and port come from ``options`` (``host``/``port``) and
        fall back to ``FQDN``/``DEF_PORT``.
        """
        session = requests.Session()
        session.mount('https://', test_request.MyAdapter())
        port = options.get('port', DEF_PORT)
        host = options.get('host', FQDN)
        # The whole options dictionary is passed as the ``verify`` argument.
        # NOTE(review): presumably test_request.MyAdapter unpacks it --
        # confirm against that module before changing this.
        return session.get('https://%s:%d%s' % (host, port, uri),
                           verify=options)

    def check(self, nice, desc, request, expected, cipher=None, protocol=None,
              expected_str=None, content=None):
        """
        Run one test: dispatch to ``check_exception`` when ``expected`` is
        an exception instance, otherwise to ``check_result``.
        """
        # TODO: need way to set auth, etc.
        (uri, options) = request
        # Work on a copy: the dictionary belongs to the class-level
        # ``tests`` and must not be altered by running a test.
        options = dict(options)
        options.setdefault('verify', 'work/httpd/alias/ca.pem')
        if isinstance(expected, Exception):
            self.check_exception(nice, uri, options, expected, expected_str)
        else:
            self.check_result(nice, uri, options, expected, cipher, protocol,
                              content)

    def check_exception(self, nice, uri, options, expected, expected_str):
        """
        With some requests we expect an exception to be raised. See if
        it is the one we expect. There is at least one case where depending
        on the distro either a CertificateError or an SSLError is
        thrown but the message of the exception is the same, so compare
        that.

        Raises AssertionError when the request succeeds, or when the
        exception is neither an instance of the expected class nor
        carries ``expected_str`` in its message.
        """
        klass = expected.__class__
        name = klass.__name__
        try:
            output = self.make_request(uri, options)
        except Exception as exc:
            # Keep a reference under another name: the ``as`` target is
            # not available after the handler on Python 3.
            caught = exc
        else:
            raise AssertionError(
                EXPECTED % (uri, name, options, output)
            )
        if isinstance(caught, klass):
            return
        # Without an expected message there is nothing left to match on;
        # report the mismatch instead of failing on ``None in str``.
        if expected_str is not None and expected_str in str(caught):
            return
        raise AssertionError(
            UNEXPECTED % (uri, name, options, caught.__class__.__name__,
                          caught)
        )

    def check_result(self, nice, uri, options, expected, cipher=None,
                     protocol=None, content=None):
        """
        Make the request and compare content, cipher, protocol and status
        code (in that order) with the expected values.

        Raises AssertionError on the first mismatch, and also when a
        cipher or protocol check is requested for an SNI request.
        """
        request = self.make_request(uri, options)
        has_sni = options.get('sni', False)
        if content and content not in request.content:
            raise AssertionError(
                'Expected %s not in %s' % (content, request.content)
            )
        # Reject unsupported combinations before touching the connection.
        if cipher and has_sni:
            raise AssertionError('Cannot do cipher tests in SNI')
        if protocol and has_sni:
            raise AssertionError('Cannot do protocol tests in SNI')
        if cipher or protocol:
            # Read the negotiated values once and share them between both
            # comparisons.
            client_cipher = request.raw._pool._get_conn().client_cipher
            if cipher and cipher != client_cipher[0]:
                raise AssertionError(
                    'Expected cipher %s, got %s' % (cipher, client_cipher[0])
                )
            if protocol and protocol != client_cipher[1]:
                raise AssertionError(
                    'Expected protocol %s, got %s' % (
                        protocol, client_cipher[1])
                )
        if expected != request.status_code:
            raise AssertionError(
                'Expected status %s, got %s' % (expected, request.status_code)
            )
|