diff options
Diffstat (limited to 'lib/subunit/python/subunit/tests')
-rw-r--r-- | lib/subunit/python/subunit/tests/TestUtil.py | 80 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/__init__.py | 43 | ||||
-rwxr-xr-x | lib/subunit/python/subunit/tests/sample-script.py | 21 | ||||
-rwxr-xr-x | lib/subunit/python/subunit/tests/sample-two-script.py | 7 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_chunked.py | 152 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_details.py | 112 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_progress_model.py | 118 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_run.py | 52 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_subunit_filter.py | 370 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_subunit_stats.py | 84 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_subunit_tags.py | 69 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_tap2subunit.py | 445 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_test_protocol.py | 1337 | ||||
-rw-r--r-- | lib/subunit/python/subunit/tests/test_test_results.py | 572 |
14 files changed, 0 insertions, 3462 deletions
diff --git a/lib/subunit/python/subunit/tests/TestUtil.py b/lib/subunit/python/subunit/tests/TestUtil.py deleted file mode 100644 index 39d901e0a9..0000000000 --- a/lib/subunit/python/subunit/tests/TestUtil.py +++ /dev/null @@ -1,80 +0,0 @@ -# Copyright (c) 2004 Canonical Limited -# Author: Robert Collins <robert.collins@canonical.com> -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -# - -import sys -import logging -import unittest - - -class LogCollector(logging.Handler): - def __init__(self): - logging.Handler.__init__(self) - self.records=[] - def emit(self, record): - self.records.append(record.getMessage()) - - -def makeCollectingLogger(): - """I make a logger instance that collects its logs for programmatic analysis - -> (logger, collector)""" - logger=logging.Logger("collector") - handler=LogCollector() - handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s")) - logger.addHandler(handler) - return logger, handler - - -def visitTests(suite, visitor): - """A foreign method for visiting the tests in a test suite.""" - for test in suite._tests: - #Abusing types to avoid monkey patching unittest.TestCase. - # Maybe that would be better? - try: - test.visit(visitor) - except AttributeError: - if isinstance(test, unittest.TestCase): - visitor.visitCase(test) - elif isinstance(test, unittest.TestSuite): - visitor.visitSuite(test) - visitTests(test, visitor) - else: - print ("unvisitable non-unittest.TestCase element %r (%r)" % (test, test.__class__)) - - -class TestSuite(unittest.TestSuite): - """I am an extended TestSuite with a visitor interface. - This is primarily to allow filtering of tests - and suites or - more in the future. An iterator of just tests wouldn't scale...""" - - def visit(self, visitor): - """visit the composite. Visiting is depth-first. - current callbacks are visitSuite and visitCase.""" - visitor.visitSuite(self) - visitTests(self, visitor) - - -class TestLoader(unittest.TestLoader): - """Custome TestLoader to set the right TestSuite class.""" - suiteClass = TestSuite - -class TestVisitor(object): - """A visitor for Tests""" - def visitSuite(self, aTestSuite): - pass - def visitCase(self, aTestCase): - pass diff --git a/lib/subunit/python/subunit/tests/__init__.py b/lib/subunit/python/subunit/tests/__init__.py deleted file mode 100644 index e0e1eb1b04..0000000000 --- a/lib/subunit/python/subunit/tests/__init__.py +++ /dev/null @@ -1,43 +0,0 @@ -# -# subunit: extensions to python unittest to get test results from subprocesses. -# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -from subunit.tests import ( - TestUtil, - test_chunked, - test_details, - test_progress_model, - test_run, - test_subunit_filter, - test_subunit_stats, - test_subunit_tags, - test_tap2subunit, - test_test_protocol, - test_test_results, - ) - -def test_suite(): - result = TestUtil.TestSuite() - result.addTest(test_chunked.test_suite()) - result.addTest(test_details.test_suite()) - result.addTest(test_progress_model.test_suite()) - result.addTest(test_test_results.test_suite()) - result.addTest(test_test_protocol.test_suite()) - result.addTest(test_tap2subunit.test_suite()) - result.addTest(test_subunit_filter.test_suite()) - result.addTest(test_subunit_tags.test_suite()) - result.addTest(test_subunit_stats.test_suite()) - result.addTest(test_run.test_suite()) - return result diff --git a/lib/subunit/python/subunit/tests/sample-script.py b/lib/subunit/python/subunit/tests/sample-script.py deleted file mode 100755 index 91838f6d6f..0000000000 --- a/lib/subunit/python/subunit/tests/sample-script.py +++ /dev/null @@ -1,21 +0,0 @@ -#!/usr/bin/env python -import sys -if sys.platform == "win32": - import msvcrt, os - msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY) -if len(sys.argv) == 2: - # subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args - # uses this code path to be sure that the arguments were passed to - # sample-script.py - print("test fail") - print("error fail") - sys.exit(0) -print("test old mcdonald") -print("success old mcdonald") -print("test bing crosby") -print("failure bing crosby [") -print("foo.c:53:ERROR invalid state") -print("]") -print("test an error") -print("error an error") -sys.exit(0) diff --git a/lib/subunit/python/subunit/tests/sample-two-script.py b/lib/subunit/python/subunit/tests/sample-two-script.py deleted file mode 100755 index fc73dfc409..0000000000 --- a/lib/subunit/python/subunit/tests/sample-two-script.py +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python -import sys -print("test old mcdonald") -print("success old mcdonald") -print("test bing crosby") -print("success bing crosby") -sys.exit(0) diff --git a/lib/subunit/python/subunit/tests/test_chunked.py b/lib/subunit/python/subunit/tests/test_chunked.py deleted file mode 100644 index e0742f1af3..0000000000 --- a/lib/subunit/python/subunit/tests/test_chunked.py +++ /dev/null @@ -1,152 +0,0 @@ -# -# subunit: extensions to python unittest to get test results from subprocesses. -# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> -# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -import unittest - -from testtools.compat import _b, BytesIO - -import subunit.chunked - - -def test_suite(): - loader = subunit.tests.TestUtil.TestLoader() - result = loader.loadTestsFromName(__name__) - return result - - -class TestDecode(unittest.TestCase): - - def setUp(self): - unittest.TestCase.setUp(self) - self.output = BytesIO() - self.decoder = subunit.chunked.Decoder(self.output) - - def test_close_read_length_short_errors(self): - self.assertRaises(ValueError, self.decoder.close) - - def test_close_body_short_errors(self): - self.assertEqual(None, self.decoder.write(_b('2\r\na'))) - self.assertRaises(ValueError, self.decoder.close) - - def test_close_body_buffered_data_errors(self): - self.assertEqual(None, self.decoder.write(_b('2\r'))) - self.assertRaises(ValueError, self.decoder.close) - - def test_close_after_finished_stream_safe(self): - self.assertEqual(None, self.decoder.write(_b('2\r\nab'))) - self.assertEqual(_b(''), self.decoder.write(_b('0\r\n'))) - self.decoder.close() - - def test_decode_nothing(self): - self.assertEqual(_b(''), self.decoder.write(_b('0\r\n'))) - self.assertEqual(_b(''), self.output.getvalue()) - - def test_decode_serialised_form(self): - self.assertEqual(None, self.decoder.write(_b("F\r\n"))) - self.assertEqual(None, self.decoder.write(_b("serialised\n"))) - self.assertEqual(_b(''), self.decoder.write(_b("form0\r\n"))) - - def test_decode_short(self): - self.assertEqual(_b(''), self.decoder.write(_b('3\r\nabc0\r\n'))) - self.assertEqual(_b('abc'), self.output.getvalue()) - - def test_decode_combines_short(self): - self.assertEqual(_b(''), self.decoder.write(_b('6\r\nabcdef0\r\n'))) - self.assertEqual(_b('abcdef'), self.output.getvalue()) - - def test_decode_excess_bytes_from_write(self): - self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234'))) - self.assertEqual(_b('abc'), self.output.getvalue()) - - def test_decode_write_after_finished_errors(self): - self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234'))) - self.assertRaises(ValueError, self.decoder.write, _b('')) - - def test_decode_hex(self): - self.assertEqual(_b(''), self.decoder.write(_b('A\r\n12345678900\r\n'))) - self.assertEqual(_b('1234567890'), self.output.getvalue()) - - def test_decode_long_ranges(self): - self.assertEqual(None, self.decoder.write(_b('10000\r\n'))) - self.assertEqual(None, self.decoder.write(_b('1' * 65536))) - self.assertEqual(None, self.decoder.write(_b('10000\r\n'))) - self.assertEqual(None, self.decoder.write(_b('2' * 65536))) - self.assertEqual(_b(''), self.decoder.write(_b('0\r\n'))) - self.assertEqual(_b('1' * 65536 + '2' * 65536), self.output.getvalue()) - - def test_decode_newline_nonstrict(self): - """Tolerate chunk markers with no CR character.""" - # From <http://pad.lv/505078> - self.decoder = subunit.chunked.Decoder(self.output, strict=False) - self.assertEqual(None, self.decoder.write(_b('a\n'))) - self.assertEqual(None, self.decoder.write(_b('abcdeabcde'))) - self.assertEqual(_b(''), self.decoder.write(_b('0\n'))) - self.assertEqual(_b('abcdeabcde'), self.output.getvalue()) - - def test_decode_strict_newline_only(self): - """Reject chunk markers with no CR character in strict mode.""" - # From <http://pad.lv/505078> - self.assertRaises(ValueError, - self.decoder.write, _b('a\n')) - - def test_decode_strict_multiple_crs(self): - self.assertRaises(ValueError, - self.decoder.write, _b('a\r\r\n')) - - def test_decode_short_header(self): - self.assertRaises(ValueError, - self.decoder.write, _b('\n')) - - -class TestEncode(unittest.TestCase): - - def setUp(self): - unittest.TestCase.setUp(self) - self.output = BytesIO() - self.encoder = subunit.chunked.Encoder(self.output) - - def test_encode_nothing(self): - self.encoder.close() - self.assertEqual(_b('0\r\n'), self.output.getvalue()) - - def test_encode_empty(self): - self.encoder.write(_b('')) - self.encoder.close() - self.assertEqual(_b('0\r\n'), self.output.getvalue()) - - def test_encode_short(self): - self.encoder.write(_b('abc')) - self.encoder.close() - self.assertEqual(_b('3\r\nabc0\r\n'), self.output.getvalue()) - - def test_encode_combines_short(self): - self.encoder.write(_b('abc')) - self.encoder.write(_b('def')) - self.encoder.close() - self.assertEqual(_b('6\r\nabcdef0\r\n'), self.output.getvalue()) - - def test_encode_over_9_is_in_hex(self): - self.encoder.write(_b('1234567890')) - self.encoder.close() - self.assertEqual(_b('A\r\n12345678900\r\n'), self.output.getvalue()) - - def test_encode_long_ranges_not_combined(self): - self.encoder.write(_b('1' * 65536)) - self.encoder.write(_b('2' * 65536)) - self.encoder.close() - self.assertEqual(_b('10000\r\n' + '1' * 65536 + '10000\r\n' + - '2' * 65536 + '0\r\n'), self.output.getvalue()) diff --git a/lib/subunit/python/subunit/tests/test_details.py b/lib/subunit/python/subunit/tests/test_details.py deleted file mode 100644 index 746aa041e5..0000000000 --- a/lib/subunit/python/subunit/tests/test_details.py +++ /dev/null @@ -1,112 +0,0 @@ -# -# subunit: extensions to python unittest to get test results from subprocesses. -# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -import unittest - -from testtools.compat import _b, StringIO - -import subunit.tests -from subunit import content, content_type, details - - -def test_suite(): - loader = subunit.tests.TestUtil.TestLoader() - result = loader.loadTestsFromName(__name__) - return result - - -class TestSimpleDetails(unittest.TestCase): - - def test_lineReceived(self): - parser = details.SimpleDetailsParser(None) - parser.lineReceived(_b("foo\n")) - parser.lineReceived(_b("bar\n")) - self.assertEqual(_b("foo\nbar\n"), parser._message) - - def test_lineReceived_escaped_bracket(self): - parser = details.SimpleDetailsParser(None) - parser.lineReceived(_b("foo\n")) - parser.lineReceived(_b(" ]are\n")) - parser.lineReceived(_b("bar\n")) - self.assertEqual(_b("foo\n]are\nbar\n"), parser._message) - - def test_get_message(self): - parser = details.SimpleDetailsParser(None) - self.assertEqual(_b(""), parser.get_message()) - - def test_get_details(self): - parser = details.SimpleDetailsParser(None) - traceback = "" - expected = {} - expected['traceback'] = content.Content( - content_type.ContentType("text", "x-traceback", - {'charset': 'utf8'}), - lambda:[_b("")]) - found = parser.get_details() - self.assertEqual(expected.keys(), found.keys()) - self.assertEqual(expected['traceback'].content_type, - found['traceback'].content_type) - self.assertEqual(_b('').join(expected['traceback'].iter_bytes()), - _b('').join(found['traceback'].iter_bytes())) - - def test_get_details_skip(self): - parser = details.SimpleDetailsParser(None) - traceback = "" - expected = {} - expected['reason'] = content.Content( - content_type.ContentType("text", "plain"), - lambda:[_b("")]) - found = parser.get_details("skip") - self.assertEqual(expected, found) - - def test_get_details_success(self): - parser = details.SimpleDetailsParser(None) - traceback = "" - expected = {} - expected['message'] = content.Content( - content_type.ContentType("text", "plain"), - lambda:[_b("")]) - found = parser.get_details("success") - self.assertEqual(expected, found) - - -class TestMultipartDetails(unittest.TestCase): - - def test_get_message_is_None(self): - parser = details.MultipartDetailsParser(None) - self.assertEqual(None, parser.get_message()) - - def test_get_details(self): - parser = details.MultipartDetailsParser(None) - self.assertEqual({}, parser.get_details()) - - def test_parts(self): - parser = details.MultipartDetailsParser(None) - parser.lineReceived(_b("Content-Type: text/plain\n")) - parser.lineReceived(_b("something\n")) - parser.lineReceived(_b("F\r\n")) - parser.lineReceived(_b("serialised\n")) - parser.lineReceived(_b("form0\r\n")) - expected = {} - expected['something'] = content.Content( - content_type.ContentType("text", "plain"), - lambda:[_b("serialised\nform")]) - found = parser.get_details() - self.assertEqual(expected.keys(), found.keys()) - self.assertEqual(expected['something'].content_type, - found['something'].content_type) - self.assertEqual(_b('').join(expected['something'].iter_bytes()), - _b('').join(found['something'].iter_bytes())) diff --git a/lib/subunit/python/subunit/tests/test_progress_model.py b/lib/subunit/python/subunit/tests/test_progress_model.py deleted file mode 100644 index 76200c6107..0000000000 --- a/lib/subunit/python/subunit/tests/test_progress_model.py +++ /dev/null @@ -1,118 +0,0 @@ -# -# subunit: extensions to Python unittest to get test results from subprocesses. -# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -import unittest - -import subunit -from subunit.progress_model import ProgressModel - - -class TestProgressModel(unittest.TestCase): - - def assertProgressSummary(self, pos, total, progress): - """Assert that a progress model has reached a particular point.""" - self.assertEqual(pos, progress.pos()) - self.assertEqual(total, progress.width()) - - def test_new_progress_0_0(self): - progress = ProgressModel() - self.assertProgressSummary(0, 0, progress) - - def test_advance_0_0(self): - progress = ProgressModel() - progress.advance() - self.assertProgressSummary(1, 0, progress) - - def test_advance_1_0(self): - progress = ProgressModel() - progress.advance() - self.assertProgressSummary(1, 0, progress) - - def test_set_width_absolute(self): - progress = ProgressModel() - progress.set_width(10) - self.assertProgressSummary(0, 10, progress) - - def test_set_width_absolute_preserves_pos(self): - progress = ProgressModel() - progress.advance() - progress.set_width(2) - self.assertProgressSummary(1, 2, progress) - - def test_adjust_width(self): - progress = ProgressModel() - progress.adjust_width(10) - self.assertProgressSummary(0, 10, progress) - progress.adjust_width(-10) - self.assertProgressSummary(0, 0, progress) - - def test_adjust_width_preserves_pos(self): - progress = ProgressModel() - progress.advance() - progress.adjust_width(10) - self.assertProgressSummary(1, 10, progress) - progress.adjust_width(-10) - self.assertProgressSummary(1, 0, progress) - - def test_push_preserves_progress(self): - progress = ProgressModel() - progress.adjust_width(3) - progress.advance() - progress.push() - self.assertProgressSummary(1, 3, progress) - - def test_advance_advances_substack(self): - progress = ProgressModel() - progress.adjust_width(3) - progress.advance() - progress.push() - progress.adjust_width(1) - progress.advance() - self.assertProgressSummary(2, 3, progress) - - def test_adjust_width_adjusts_substack(self): - progress = ProgressModel() - progress.adjust_width(3) - progress.advance() - progress.push() - progress.adjust_width(2) - progress.advance() - self.assertProgressSummary(3, 6, progress) - - def test_set_width_adjusts_substack(self): - progress = ProgressModel() - progress.adjust_width(3) - progress.advance() - progress.push() - progress.set_width(2) - progress.advance() - self.assertProgressSummary(3, 6, progress) - - def test_pop_restores_progress(self): - progress = ProgressModel() - progress.adjust_width(3) - progress.advance() - progress.push() - progress.adjust_width(1) - progress.advance() - progress.pop() - self.assertProgressSummary(1, 3, progress) - - -def test_suite(): - loader = subunit.tests.TestUtil.TestLoader() - result = loader.loadTestsFromName(__name__) - return result diff --git a/lib/subunit/python/subunit/tests/test_run.py b/lib/subunit/python/subunit/tests/test_run.py deleted file mode 100644 index 10519ed086..0000000000 --- a/lib/subunit/python/subunit/tests/test_run.py +++ /dev/null @@ -1,52 +0,0 @@ -# -# subunit: extensions to python unittest to get test results from subprocesses. -# Copyright (C) 2011 Robert Collins <robertc@robertcollins.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -from testtools.compat import BytesIO -import unittest - -from testtools import PlaceHolder - -import subunit -from subunit.run import SubunitTestRunner - - -def test_suite(): - loader = subunit.tests.TestUtil.TestLoader() - result = loader.loadTestsFromName(__name__) - return result - - -class TimeCollectingTestResult(unittest.TestResult): - - def __init__(self, *args, **kwargs): - super(TimeCollectingTestResult, self).__init__(*args, **kwargs) - self.time_called = [] - - def time(self, a_time): - self.time_called.append(a_time) - - -class TestSubunitTestRunner(unittest.TestCase): - - def test_includes_timing_output(self): - io = BytesIO() - runner = SubunitTestRunner(stream=io) - test = PlaceHolder('name') - runner.run(test) - client = TimeCollectingTestResult() - io.seek(0) - subunit.TestProtocolServer(client).readFrom(io) - self.assertTrue(len(client.time_called) > 0) diff --git a/lib/subunit/python/subunit/tests/test_subunit_filter.py b/lib/subunit/python/subunit/tests/test_subunit_filter.py deleted file mode 100644 index 33b924824d..0000000000 --- a/lib/subunit/python/subunit/tests/test_subunit_filter.py +++ /dev/null @@ -1,370 +0,0 @@ -# -# subunit: extensions to python unittest to get test results from subprocesses. -# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -"""Tests for subunit.TestResultFilter.""" - -from datetime import datetime -import os -import subprocess -import sys -from subunit import iso8601 -import unittest - -from testtools import TestCase -from testtools.compat import _b, BytesIO -from testtools.testresult.doubles import ExtendedTestResult - -import subunit -from subunit.test_results import make_tag_filter, TestResultFilter - - -class TestTestResultFilter(TestCase): - """Test for TestResultFilter, a TestResult object which filters tests.""" - - # While TestResultFilter works on python objects, using a subunit stream - # is an easy pithy way of getting a series of test objects to call into - # the TestResult, and as TestResultFilter is intended for use with subunit - # also has the benefit of detecting any interface skew issues. - example_subunit_stream = _b("""\ -tags: global -test passed -success passed -test failed -tags: local -failure failed -test error -error error [ -error details -] -test skipped -skip skipped -test todo -xfail todo -""") - - def run_tests(self, result_filter, input_stream=None): - """Run tests through the given filter. - - :param result_filter: A filtering TestResult object. - :param input_stream: Bytes of subunit stream data. If not provided, - uses TestTestResultFilter.example_subunit_stream. - """ - if input_stream is None: - input_stream = self.example_subunit_stream - test = subunit.ProtocolTestCase(BytesIO(input_stream)) - test.run(result_filter) - - def test_default(self): - """The default is to exclude success and include everything else.""" - filtered_result = unittest.TestResult() - result_filter = TestResultFilter(filtered_result) - self.run_tests(result_filter) - # skips are seen as success by default python TestResult. - self.assertEqual(['error'], - [error[0].id() for error in filtered_result.errors]) - self.assertEqual(['failed'], - [failure[0].id() for failure in - filtered_result.failures]) - self.assertEqual(4, filtered_result.testsRun) - - def test_tag_filter(self): - tag_filter = make_tag_filter(['global'], ['local']) - result = ExtendedTestResult() - result_filter = TestResultFilter( - result, filter_success=False, filter_predicate=tag_filter) - self.run_tests(result_filter) - tests_included = [ - event[1] for event in result._events if event[0] == 'startTest'] - tests_expected = list(map( - subunit.RemotedTestCase, - ['passed', 'error', 'skipped', 'todo'])) - self.assertEquals(tests_expected, tests_included) - - def test_tags_tracked_correctly(self): - tag_filter = make_tag_filter(['a'], []) - result = ExtendedTestResult() - result_filter = TestResultFilter( - result, filter_success=False, filter_predicate=tag_filter) - input_stream = _b( - "test: foo\n" - "tags: a\n" - "successful: foo\n" - "test: bar\n" - "successful: bar\n") - self.run_tests(result_filter, input_stream) - foo = subunit.RemotedTestCase('foo') - self.assertEquals( - [('startTest', foo), - ('tags', set(['a']), set()), - ('addSuccess', foo), - ('stopTest', foo), - ], - result._events) - - def test_exclude_errors(self): - filtered_result = unittest.TestResult() - result_filter = TestResultFilter(filtered_result, filter_error=True) - self.run_tests(result_filter) - # skips are seen as errors by default python TestResult. - self.assertEqual([], filtered_result.errors) - self.assertEqual(['failed'], - [failure[0].id() for failure in - filtered_result.failures]) - self.assertEqual(3, filtered_result.testsRun) - - def test_fixup_expected_failures(self): - filtered_result = unittest.TestResult() - result_filter = TestResultFilter(filtered_result, - fixup_expected_failures=set(["failed"])) - self.run_tests(result_filter) - self.assertEqual(['failed', 'todo'], - [failure[0].id() for failure in filtered_result.expectedFailures]) - self.assertEqual([], filtered_result.failures) - self.assertEqual(4, filtered_result.testsRun) - - def test_fixup_expected_errors(self): - filtered_result = unittest.TestResult() - result_filter = TestResultFilter(filtered_result, - fixup_expected_failures=set(["error"])) - self.run_tests(result_filter) - self.assertEqual(['error', 'todo'], - [failure[0].id() for failure in filtered_result.expectedFailures]) - self.assertEqual([], filtered_result.errors) - self.assertEqual(4, filtered_result.testsRun) - - def test_fixup_unexpected_success(self): - filtered_result = unittest.TestResult() - result_filter = TestResultFilter(filtered_result, filter_success=False, - fixup_expected_failures=set(["passed"])) - self.run_tests(result_filter) - self.assertEqual(['passed'], - [passed.id() for passed in filtered_result.unexpectedSuccesses]) - self.assertEqual(5, filtered_result.testsRun) - - def test_exclude_failure(self): - filtered_result = unittest.TestResult() - result_filter = TestResultFilter(filtered_result, filter_failure=True) - self.run_tests(result_filter) - self.assertEqual(['error'], - [error[0].id() for error in filtered_result.errors]) - self.assertEqual([], - [failure[0].id() for failure in - filtered_result.failures]) - self.assertEqual(3, filtered_result.testsRun) - - def test_exclude_skips(self): - filtered_result = subunit.TestResultStats(None) - result_filter = TestResultFilter(filtered_result, filter_skip=True) - self.run_tests(result_filter) - self.assertEqual(0, filtered_result.skipped_tests) - self.assertEqual(2, filtered_result.failed_tests) - self.assertEqual(3, filtered_result.testsRun) - - def test_include_success(self): - """Successes can be included if requested.""" - filtered_result = unittest.TestResult() - result_filter = TestResultFilter(filtered_result, - filter_success=False) - self.run_tests(result_filter) - self.assertEqual(['error'], - [error[0].id() for error in filtered_result.errors]) - self.assertEqual(['failed'], - [failure[0].id() for failure in - filtered_result.failures]) - self.assertEqual(5, filtered_result.testsRun) - - def test_filter_predicate(self): - """You can filter by predicate callbacks""" - # 0.0.7 and earlier did not support the 'tags' parameter, so we need - # to test that we still support behaviour without it. - filtered_result = unittest.TestResult() - def filter_cb(test, outcome, err, details): - return outcome == 'success' - result_filter = TestResultFilter(filtered_result, - filter_predicate=filter_cb, - filter_success=False) - self.run_tests(result_filter) - # Only success should pass - self.assertEqual(1, filtered_result.testsRun) - - def test_filter_predicate_with_tags(self): - """You can filter by predicate callbacks that accept tags""" - filtered_result = unittest.TestResult() - def filter_cb(test, outcome, err, details, tags): - return outcome == 'success' - result_filter = TestResultFilter(filtered_result, - filter_predicate=filter_cb, - filter_success=False) - self.run_tests(result_filter) - # Only success should pass - self.assertEqual(1, filtered_result.testsRun) - - def test_time_ordering_preserved(self): - # Passing a subunit stream through TestResultFilter preserves the - # relative ordering of 'time' directives and any other subunit - # directives that are still included. - date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC) - date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC) - date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC) - subunit_stream = _b('\n'.join([ - "time: %s", - "test: foo", - "time: %s", - "error: foo", - "time: %s", - ""]) % (date_a, date_b, date_c)) - result = ExtendedTestResult() - result_filter = TestResultFilter(result) - self.run_tests(result_filter, subunit_stream) - foo = subunit.RemotedTestCase('foo') - self.maxDiff = None - self.assertEqual( - [('time', date_a), - ('time', date_b), - ('startTest', foo), - ('addError', foo, {}), - ('stopTest', foo), - ('time', date_c)], result._events) - - def test_time_passes_through_filtered_tests(self): - # Passing a subunit stream through TestResultFilter preserves 'time' - # directives even if a specific test is filtered out. - date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC) - date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC) - date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC) - subunit_stream = _b('\n'.join([ - "time: %s", - "test: foo", - "time: %s", - "success: foo", - "time: %s", - ""]) % (date_a, date_b, date_c)) - result = ExtendedTestResult() - result_filter = TestResultFilter(result) - result_filter.startTestRun() - self.run_tests(result_filter, subunit_stream) - result_filter.stopTestRun() - foo = subunit.RemotedTestCase('foo') - self.maxDiff = None - self.assertEqual( - [('startTestRun',), - ('time', date_a), - ('time', date_c), - ('stopTestRun',),], result._events) - - def test_skip_preserved(self): - subunit_stream = _b('\n'.join([ - "test: foo", - "skip: foo", - ""])) - result = ExtendedTestResult() - result_filter = TestResultFilter(result) - self.run_tests(result_filter, subunit_stream) - foo = subunit.RemotedTestCase('foo') - self.assertEquals( - [('startTest', foo), - ('addSkip', foo, {}), - ('stopTest', foo), ], result._events) - - if sys.version_info < (2, 7): - # These tests require Python >=2.7. - del test_fixup_expected_failures, test_fixup_expected_errors, test_fixup_unexpected_success - - -class TestFilterCommand(TestCase): - - example_subunit_stream = _b("""\ -tags: global -test passed -success passed -test failed -tags: local -failure failed -test error -error error [ -error details -] -test skipped -skip skipped -test todo -xfail todo -""") - - def run_command(self, args, stream): - root = os.path.dirname( - os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) - script_path = os.path.join(root, 'filters', 'subunit-filter') - command = [sys.executable, script_path] + list(args) - ps = subprocess.Popen( - command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - out, err = ps.communicate(stream) - if ps.returncode != 0: - raise RuntimeError("%s failed: %s" % (command, err)) - return out - - def to_events(self, stream): - test = subunit.ProtocolTestCase(BytesIO(stream)) - result = ExtendedTestResult() - test.run(result) - return result._events - - def test_default(self): - output = self.run_command([], _b( - "test: foo\n" - "skip: foo\n" - )) - events = self.to_events(output) - foo = subunit.RemotedTestCase('foo') - self.assertEqual( - [('startTest', foo), - ('addSkip', foo, {}), - ('stopTest', foo)], - events) - - def test_tags(self): - output = self.run_command(['-s', '--with-tag', 'a'], _b( - "tags: a\n" - "test: foo\n" - "success: foo\n" - "tags: -a\n" - "test: bar\n" - "success: bar\n" - "test: baz\n" - "tags: a\n" - "success: baz\n" - )) - events = self.to_events(output) - foo = subunit.RemotedTestCase('foo') - baz = subunit.RemotedTestCase('baz') - self.assertEqual( - [('tags', set(['a']), set()), - ('startTest', foo), - ('addSuccess', foo), - ('stopTest', foo), - ('tags', set(), set(['a'])), - ('startTest', baz), - ('tags', set(['a']), set()), - ('addSuccess', baz), - ('stopTest', baz), - ], - events) - - -def test_suite(): - loader = subunit.tests.TestUtil.TestLoader() - result = loader.loadTestsFromName(__name__) - return result diff --git a/lib/subunit/python/subunit/tests/test_subunit_stats.py b/lib/subunit/python/subunit/tests/test_subunit_stats.py deleted file mode 100644 index 6fd3301060..0000000000 --- a/lib/subunit/python/subunit/tests/test_subunit_stats.py +++ /dev/null @@ -1,84 +0,0 @@ -# -# subunit: extensions to python unittest to get test results from subprocesses. -# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -"""Tests for subunit.TestResultStats.""" - -import unittest - -from testtools.compat import _b, BytesIO, StringIO - -import subunit - - -class TestTestResultStats(unittest.TestCase): - """Test for TestResultStats, a TestResult object that generates stats.""" - - def setUp(self): - self.output = StringIO() - self.result = subunit.TestResultStats(self.output) - self.input_stream = BytesIO() - self.test = subunit.ProtocolTestCase(self.input_stream) - - def test_stats_empty(self): - self.test.run(self.result) - self.assertEqual(0, self.result.total_tests) - self.assertEqual(0, self.result.passed_tests) - self.assertEqual(0, self.result.failed_tests) - self.assertEqual(set(), self.result.seen_tags) - - def setUpUsedStream(self): - self.input_stream.write(_b("""tags: global -test passed -success passed -test failed -tags: local -failure failed -test error -error error -test skipped -skip skipped -test todo -xfail todo -""")) - self.input_stream.seek(0) - self.test.run(self.result) - - def test_stats_smoke_everything(self): - # Statistics are calculated usefully. - self.setUpUsedStream() - self.assertEqual(5, self.result.total_tests) - self.assertEqual(2, self.result.passed_tests) - self.assertEqual(2, self.result.failed_tests) - self.assertEqual(1, self.result.skipped_tests) - self.assertEqual(set(["global", "local"]), self.result.seen_tags) - - def test_stat_formatting(self): - expected = (""" -Total tests: 5 -Passed tests: 2 -Failed tests: 2 -Skipped tests: 1 -Seen tags: global, local -""")[1:] - self.setUpUsedStream() - self.result.formatStats() - self.assertEqual(expected, self.output.getvalue()) - - -def test_suite(): - loader = subunit.tests.TestUtil.TestLoader() - result = loader.loadTestsFromName(__name__) - return result diff --git a/lib/subunit/python/subunit/tests/test_subunit_tags.py b/lib/subunit/python/subunit/tests/test_subunit_tags.py deleted file mode 100644 index c98506a737..0000000000 --- a/lib/subunit/python/subunit/tests/test_subunit_tags.py +++ /dev/null @@ -1,69 +0,0 @@ -# -# subunit: extensions to python unittest to get test results from subprocesses. -# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -"""Tests for subunit.tag_stream.""" - -import unittest - -from testtools.compat import StringIO - -import subunit -import subunit.test_results - - -class TestSubUnitTags(unittest.TestCase): - - def setUp(self): - self.original = StringIO() - self.filtered = StringIO() - - def test_add_tag(self): - self.original.write("tags: foo\n") - self.original.write("test: test\n") - self.original.write("tags: bar -quux\n") - self.original.write("success: test\n") - self.original.seek(0) - result = subunit.tag_stream(self.original, self.filtered, ["quux"]) - self.assertEqual([ - "tags: quux", - "tags: foo", - "test: test", - "tags: bar", - "success: test", - ], - self.filtered.getvalue().splitlines()) - - def test_remove_tag(self): - self.original.write("tags: foo\n") - self.original.write("test: test\n") - self.original.write("tags: bar -quux\n") - self.original.write("success: test\n") - self.original.seek(0) - result = subunit.tag_stream(self.original, self.filtered, ["-bar"]) - self.assertEqual([ - "tags: -bar", - "tags: foo", - "test: test", - "tags: -quux", - "success: test", - ], - self.filtered.getvalue().splitlines()) - - -def test_suite(): - loader = subunit.tests.TestUtil.TestLoader() - result = loader.loadTestsFromName(__name__) - return result diff --git a/lib/subunit/python/subunit/tests/test_tap2subunit.py b/lib/subunit/python/subunit/tests/test_tap2subunit.py deleted file mode 100644 index 11bc1916b3..0000000000 --- a/lib/subunit/python/subunit/tests/test_tap2subunit.py +++ /dev/null @@ -1,445 +0,0 @@ -# -# subunit: extensions to python unittest to get test results from subprocesses. -# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -"""Tests for TAP2SubUnit.""" - -import unittest - -from testtools.compat import StringIO - -import subunit - - -class TestTAP2SubUnit(unittest.TestCase): - """Tests for TAP2SubUnit. - - These tests test TAP string data in, and subunit string data out. - This is ok because the subunit protocol is intended to be stable, - but it might be easier/pithier to write tests against TAP string in, - parsed subunit objects out (by hooking the subunit stream to a subunit - protocol server. - """ - - def setUp(self): - self.tap = StringIO() - self.subunit = StringIO() - - def test_skip_entire_file(self): - # A file - # 1..- # Skipped: comment - # results in a single skipped test. - self.tap.write("1..0 # Skipped: entire file skipped\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test file skip", - "skip file skip [", - "Skipped: entire file skipped", - "]", - ], - self.subunit.getvalue().splitlines()) - - def test_ok_test_pass(self): - # A file - # ok - # results in a passed test with name 'test 1' (a synthetic name as tap - # does not require named fixtures - it is the first test in the tap - # stream). - self.tap.write("ok\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1", - "success test 1", - ], - self.subunit.getvalue().splitlines()) - - def test_ok_test_number_pass(self): - # A file - # ok 1 - # results in a passed test with name 'test 1' - self.tap.write("ok 1\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1", - "success test 1", - ], - self.subunit.getvalue().splitlines()) - - def test_ok_test_number_description_pass(self): - # A file - # ok 1 - There is a description - # results in a passed test with name 'test 1 - There is a description' - self.tap.write("ok 1 - There is a description\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1 - There is a description", - "success test 1 - There is a description", - ], - self.subunit.getvalue().splitlines()) - - def test_ok_test_description_pass(self): - # A file - # ok There is a description - # results in a passed test with name 'test 1 There is a description' - self.tap.write("ok There is a description\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1 There is a description", - "success test 1 There is a description", - ], - self.subunit.getvalue().splitlines()) - - def test_ok_SKIP_skip(self): - # A file - # ok # SKIP - # results in a skkip test with name 'test 1' - self.tap.write("ok # SKIP\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1", - "skip test 1", - ], - self.subunit.getvalue().splitlines()) - - def test_ok_skip_number_comment_lowercase(self): - self.tap.write("ok 1 # skip no samba environment available, skipping compilation\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1", - "skip test 1 [", - "no samba environment available, skipping compilation", - "]" - ], - self.subunit.getvalue().splitlines()) - - def test_ok_number_description_SKIP_skip_comment(self): - # A file - # ok 1 foo # SKIP Not done yet - # results in a skip test with name 'test 1 foo' and a log of - # Not done yet - self.tap.write("ok 1 foo # SKIP Not done yet\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1 foo", - "skip test 1 foo [", - "Not done yet", - "]", - ], - self.subunit.getvalue().splitlines()) - - def test_ok_SKIP_skip_comment(self): - # A file - # ok # SKIP Not done yet - # results in a skip test with name 'test 1' and a log of Not done yet - self.tap.write("ok # SKIP Not done yet\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1", - "skip test 1 [", - "Not done yet", - "]", - ], - self.subunit.getvalue().splitlines()) - - def test_ok_TODO_xfail(self): - # A file - # ok # TODO - # results in a xfail test with name 'test 1' - self.tap.write("ok # TODO\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1", - "xfail test 1", - ], - self.subunit.getvalue().splitlines()) - - def test_ok_TODO_xfail_comment(self): - # A file - # ok # TODO Not done yet - # results in a xfail test with name 'test 1' and a log of Not done yet - self.tap.write("ok # TODO Not done yet\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1", - "xfail test 1 [", - "Not done yet", - "]", - ], - self.subunit.getvalue().splitlines()) - - def test_bail_out_errors(self): - # A file with line in it - # Bail out! COMMENT - # is treated as an error - self.tap.write("ok 1 foo\n") - self.tap.write("Bail out! Lifejacket engaged\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - "test test 1 foo", - "success test 1 foo", - "test Bail out! Lifejacket engaged", - "error Bail out! Lifejacket engaged", - ], - self.subunit.getvalue().splitlines()) - - def test_missing_test_at_end_with_plan_adds_error(self): - # A file - # 1..3 - # ok first test - # not ok third test - # results in three tests, with the third being created - self.tap.write('1..3\n') - self.tap.write('ok first test\n') - self.tap.write('not ok second test\n') - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - 'test test 1 first test', - 'success test 1 first test', - 'test test 2 second test', - 'failure test 2 second test', - 'test test 3', - 'error test 3 [', - 'test missing from TAP output', - ']', - ], - self.subunit.getvalue().splitlines()) - - def test_missing_test_with_plan_adds_error(self): - # A file - # 1..3 - # ok first test - # not ok 3 third test - # results in three tests, with the second being created - self.tap.write('1..3\n') - self.tap.write('ok first test\n') - self.tap.write('not ok 3 third test\n') - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - 'test test 1 first test', - 'success test 1 first test', - 'test test 2', - 'error test 2 [', - 'test missing from TAP output', - ']', - 'test test 3 third test', - 'failure test 3 third test', - ], - self.subunit.getvalue().splitlines()) - - def test_missing_test_no_plan_adds_error(self): - # A file - # ok first test - # not ok 3 third test - # results in three tests, with the second being created - self.tap.write('ok first test\n') - self.tap.write('not ok 3 third test\n') - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - 'test test 1 first test', - 'success test 1 first test', - 'test test 2', - 'error test 2 [', - 'test missing from TAP output', - ']', - 'test test 3 third test', - 'failure test 3 third test', - ], - self.subunit.getvalue().splitlines()) - - def test_four_tests_in_a_row_trailing_plan(self): - # A file - # ok 1 - first test in a script with no plan at all - # not ok 2 - second - # ok 3 - third - # not ok 4 - fourth - # 1..4 - # results in four tests numbered and named - self.tap.write('ok 1 - first test in a script with trailing plan\n') - self.tap.write('not ok 2 - second\n') - self.tap.write('ok 3 - third\n') - self.tap.write('not ok 4 - fourth\n') - self.tap.write('1..4\n') - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - 'test test 1 - first test in a script with trailing plan', - 'success test 1 - first test in a script with trailing plan', - 'test test 2 - second', - 'failure test 2 - second', - 'test test 3 - third', - 'success test 3 - third', - 'test test 4 - fourth', - 'failure test 4 - fourth' - ], - self.subunit.getvalue().splitlines()) - - def test_four_tests_in_a_row_with_plan(self): - # A file - # 1..4 - # ok 1 - first test in a script with no plan at all - # not ok 2 - second - # ok 3 - third - # not ok 4 - fourth - # results in four tests numbered and named - self.tap.write('1..4\n') - self.tap.write('ok 1 - first test in a script with a plan\n') - self.tap.write('not ok 2 - second\n') - self.tap.write('ok 3 - third\n') - self.tap.write('not ok 4 - fourth\n') - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - 'test test 1 - first test in a script with a plan', - 'success test 1 - first test in a script with a plan', - 'test test 2 - second', - 'failure test 2 - second', - 'test test 3 - third', - 'success test 3 - third', - 'test test 4 - fourth', - 'failure test 4 - fourth' - ], - self.subunit.getvalue().splitlines()) - - def test_four_tests_in_a_row_no_plan(self): - # A file - # ok 1 - first test in a script with no plan at all - # not ok 2 - second - # ok 3 - third - # not ok 4 - fourth - # results in four tests numbered and named - self.tap.write('ok 1 - first test in a script with no plan at all\n') - self.tap.write('not ok 2 - second\n') - self.tap.write('ok 3 - third\n') - self.tap.write('not ok 4 - fourth\n') - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - 'test test 1 - first test in a script with no plan at all', - 'success test 1 - first test in a script with no plan at all', - 'test test 2 - second', - 'failure test 2 - second', - 'test test 3 - third', - 'success test 3 - third', - 'test test 4 - fourth', - 'failure test 4 - fourth' - ], - self.subunit.getvalue().splitlines()) - - def test_todo_and_skip(self): - # A file - # not ok 1 - a fail but # TODO but is TODO - # not ok 2 - another fail # SKIP instead - # results in two tests, numbered and commented. - self.tap.write("not ok 1 - a fail but # TODO but is TODO\n") - self.tap.write("not ok 2 - another fail # SKIP instead\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - 'test test 1 - a fail but', - 'xfail test 1 - a fail but [', - 'but is TODO', - ']', - 'test test 2 - another fail', - 'skip test 2 - another fail [', - 'instead', - ']', - ], - self.subunit.getvalue().splitlines()) - - def test_leading_comments_add_to_next_test_log(self): - # A file - # # comment - # ok - # ok - # results in a single test with the comment included - # in the first test and not the second. - self.tap.write("# comment\n") - self.tap.write("ok\n") - self.tap.write("ok\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - 'test test 1', - 'success test 1 [', - '# comment', - ']', - 'test test 2', - 'success test 2', - ], - self.subunit.getvalue().splitlines()) - - def test_trailing_comments_are_included_in_last_test_log(self): - # A file - # ok foo - # ok foo - # # comment - # results in a two tests, with the second having the comment - # attached to its log. - self.tap.write("ok\n") - self.tap.write("ok\n") - self.tap.write("# comment\n") - self.tap.seek(0) - result = subunit.TAP2SubUnit(self.tap, self.subunit) - self.assertEqual(0, result) - self.assertEqual([ - 'test test 1', - 'success test 1', - 'test test 2', - 'success test 2 [', - '# comment', - ']', - ], - self.subunit.getvalue().splitlines()) - - -def test_suite(): - loader = subunit.tests.TestUtil.TestLoader() - result = loader.loadTestsFromName(__name__) - return result diff --git a/lib/subunit/python/subunit/tests/test_test_protocol.py b/lib/subunit/python/subunit/tests/test_test_protocol.py deleted file mode 100644 index 7831ba16cd..0000000000 --- a/lib/subunit/python/subunit/tests/test_test_protocol.py +++ /dev/null @@ -1,1337 +0,0 @@ -# -# subunit: extensions to Python unittest to get test results from subprocesses. -# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -import datetime -import unittest -import os - -from testtools import PlaceHolder, skipIf, TestCase, TestResult -from testtools.compat import _b, _u, BytesIO -from testtools.content import Content, TracebackContent, text_content -from testtools.content_type import ContentType -try: - from testtools.testresult.doubles import ( - Python26TestResult, - Python27TestResult, - ExtendedTestResult, - ) -except ImportError: - from testtools.tests.helpers import ( - Python26TestResult, - Python27TestResult, - ExtendedTestResult, - ) - -import subunit -from subunit import _remote_exception_str, _remote_exception_str_chunked -import subunit.iso8601 as iso8601 - - -def details_to_str(details): - return TestResult()._err_details_to_string(None, details=details) - - -class TestTestImports(unittest.TestCase): - - def test_imports(self): - from subunit import DiscardStream - from subunit import TestProtocolServer - from subunit import RemotedTestCase - from subunit import RemoteError - from subunit import ExecTestCase - from subunit import IsolatedTestCase - from subunit import TestProtocolClient - from subunit import ProtocolTestCase - - -class TestDiscardStream(unittest.TestCase): - - def test_write(self): - subunit.DiscardStream().write("content") - - -class TestProtocolServerForward(unittest.TestCase): - - def test_story(self): - client = unittest.TestResult() - out = BytesIO() - protocol = subunit.TestProtocolServer(client, forward_stream=out) - pipe = BytesIO(_b("test old mcdonald\n" - "success old mcdonald\n")) - protocol.readFrom(pipe) - self.assertEqual(client.testsRun, 1) - self.assertEqual(pipe.getvalue(), out.getvalue()) - - def test_not_command(self): - client = unittest.TestResult() - out = BytesIO() - protocol = subunit.TestProtocolServer(client, - stream=subunit.DiscardStream(), forward_stream=out) - pipe = BytesIO(_b("success old mcdonald\n")) - protocol.readFrom(pipe) - self.assertEqual(client.testsRun, 0) - self.assertEqual(_b(""), out.getvalue()) - - -class TestTestProtocolServerPipe(unittest.TestCase): - - def test_story(self): - client = unittest.TestResult() - protocol = subunit.TestProtocolServer(client) - traceback = "foo.c:53:ERROR invalid state\n" - pipe = BytesIO(_b("test old mcdonald\n" - "success old mcdonald\n" - "test bing crosby\n" - "failure bing crosby [\n" - + traceback + - "]\n" - "test an error\n" - "error an error\n")) - protocol.readFrom(pipe) - bing = subunit.RemotedTestCase("bing crosby") - an_error = subunit.RemotedTestCase("an error") - self.assertEqual(client.errors, - [(an_error, _remote_exception_str + '\n')]) - self.assertEqual( - client.failures, - [(bing, _remote_exception_str + ": " - + details_to_str({'traceback': text_content(traceback)}) + "\n")]) - self.assertEqual(client.testsRun, 3) - - def test_non_test_characters_forwarded_immediately(self): - pass - - -class TestTestProtocolServerStartTest(unittest.TestCase): - - def setUp(self): - self.client = Python26TestResult() - self.stream = BytesIO() - self.protocol = subunit.TestProtocolServer(self.client, self.stream) - - def test_start_test(self): - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.assertEqual(self.client._events, - [('startTest', subunit.RemotedTestCase("old mcdonald"))]) - - def test_start_testing(self): - self.protocol.lineReceived(_b("testing old mcdonald\n")) - self.assertEqual(self.client._events, - [('startTest', subunit.RemotedTestCase("old mcdonald"))]) - - def test_start_test_colon(self): - self.protocol.lineReceived(_b("test: old mcdonald\n")) - self.assertEqual(self.client._events, - [('startTest', subunit.RemotedTestCase("old mcdonald"))]) - - def test_indented_test_colon_ignored(self): - ignored_line = _b(" test: old mcdonald\n") - self.protocol.lineReceived(ignored_line) - self.assertEqual([], self.client._events) - self.assertEqual(self.stream.getvalue(), ignored_line) - - def test_start_testing_colon(self): - self.protocol.lineReceived(_b("testing: old mcdonald\n")) - self.assertEqual(self.client._events, - [('startTest', subunit.RemotedTestCase("old mcdonald"))]) - - -class TestTestProtocolServerPassThrough(unittest.TestCase): - - def setUp(self): - self.stdout = BytesIO() - self.test = subunit.RemotedTestCase("old mcdonald") - self.client = ExtendedTestResult() - self.protocol = subunit.TestProtocolServer(self.client, self.stdout) - - def keywords_before_test(self): - self.protocol.lineReceived(_b("failure a\n")) - self.protocol.lineReceived(_b("failure: a\n")) - self.protocol.lineReceived(_b("error a\n")) - self.protocol.lineReceived(_b("error: a\n")) - self.protocol.lineReceived(_b("success a\n")) - self.protocol.lineReceived(_b("success: a\n")) - self.protocol.lineReceived(_b("successful a\n")) - self.protocol.lineReceived(_b("successful: a\n")) - self.protocol.lineReceived(_b("]\n")) - self.assertEqual(self.stdout.getvalue(), _b("failure a\n" - "failure: a\n" - "error a\n" - "error: a\n" - "success a\n" - "success: a\n" - "successful a\n" - "successful: a\n" - "]\n")) - - def test_keywords_before_test(self): - self.keywords_before_test() - self.assertEqual(self.client._events, []) - - def test_keywords_after_error(self): - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("error old mcdonald\n")) - self.keywords_before_test() - self.assertEqual([ - ('startTest', self.test), - ('addError', self.test, {}), - ('stopTest', self.test), - ], self.client._events) - - def test_keywords_after_failure(self): - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("failure old mcdonald\n")) - self.keywords_before_test() - self.assertEqual(self.client._events, [ - ('startTest', self.test), - ('addFailure', self.test, {}), - ('stopTest', self.test), - ]) - - def test_keywords_after_success(self): - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("success old mcdonald\n")) - self.keywords_before_test() - self.assertEqual([ - ('startTest', self.test), - ('addSuccess', self.test), - ('stopTest', self.test), - ], self.client._events) - - def test_keywords_after_test(self): - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("failure a\n")) - self.protocol.lineReceived(_b("failure: a\n")) - self.protocol.lineReceived(_b("error a\n")) - self.protocol.lineReceived(_b("error: a\n")) - self.protocol.lineReceived(_b("success a\n")) - self.protocol.lineReceived(_b("success: a\n")) - self.protocol.lineReceived(_b("successful a\n")) - self.protocol.lineReceived(_b("successful: a\n")) - self.protocol.lineReceived(_b("]\n")) - self.protocol.lineReceived(_b("failure old mcdonald\n")) - self.assertEqual(self.stdout.getvalue(), _b("test old mcdonald\n" - "failure a\n" - "failure: a\n" - "error a\n" - "error: a\n" - "success a\n" - "success: a\n" - "successful a\n" - "successful: a\n" - "]\n")) - self.assertEqual(self.client._events, [ - ('startTest', self.test), - ('addFailure', self.test, {}), - ('stopTest', self.test), - ]) - - def test_keywords_during_failure(self): - # A smoke test to make sure that the details parsers have control - # appropriately. - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("failure: old mcdonald [\n")) - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("failure a\n")) - self.protocol.lineReceived(_b("failure: a\n")) - self.protocol.lineReceived(_b("error a\n")) - self.protocol.lineReceived(_b("error: a\n")) - self.protocol.lineReceived(_b("success a\n")) - self.protocol.lineReceived(_b("success: a\n")) - self.protocol.lineReceived(_b("successful a\n")) - self.protocol.lineReceived(_b("successful: a\n")) - self.protocol.lineReceived(_b(" ]\n")) - self.protocol.lineReceived(_b("]\n")) - self.assertEqual(self.stdout.getvalue(), _b("")) - details = {} - details['traceback'] = Content(ContentType("text", "x-traceback", - {'charset': 'utf8'}), - lambda:[_b( - "test old mcdonald\n" - "failure a\n" - "failure: a\n" - "error a\n" - "error: a\n" - "success a\n" - "success: a\n" - "successful a\n" - "successful: a\n" - "]\n")]) - self.assertEqual(self.client._events, [ - ('startTest', self.test), - ('addFailure', self.test, details), - ('stopTest', self.test), - ]) - - def test_stdout_passthrough(self): - """Lines received which cannot be interpreted as any protocol action - should be passed through to sys.stdout. - """ - bytes = _b("randombytes\n") - self.protocol.lineReceived(bytes) - self.assertEqual(self.stdout.getvalue(), bytes) - - -class TestTestProtocolServerLostConnection(unittest.TestCase): - - def setUp(self): - self.client = Python26TestResult() - self.protocol = subunit.TestProtocolServer(self.client) - self.test = subunit.RemotedTestCase("old mcdonald") - - def test_lost_connection_no_input(self): - self.protocol.lostConnection() - self.assertEqual([], self.client._events) - - def test_lost_connection_after_start(self): - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lostConnection() - failure = subunit.RemoteError( - _u("lost connection during test 'old mcdonald'")) - self.assertEqual([ - ('startTest', self.test), - ('addError', self.test, failure), - ('stopTest', self.test), - ], self.client._events) - - def test_lost_connected_after_error(self): - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("error old mcdonald\n")) - self.protocol.lostConnection() - self.assertEqual([ - ('startTest', self.test), - ('addError', self.test, subunit.RemoteError(_u(""))), - ('stopTest', self.test), - ], self.client._events) - - def do_connection_lost(self, outcome, opening): - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("%s old mcdonald %s" % (outcome, opening))) - self.protocol.lostConnection() - failure = subunit.RemoteError( - _u("lost connection during %s report of test 'old mcdonald'") % - outcome) - self.assertEqual([ - ('startTest', self.test), - ('addError', self.test, failure), - ('stopTest', self.test), - ], self.client._events) - - def test_lost_connection_during_error(self): - self.do_connection_lost("error", "[\n") - - def test_lost_connection_during_error_details(self): - self.do_connection_lost("error", "[ multipart\n") - - def test_lost_connected_after_failure(self): - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("failure old mcdonald\n")) - self.protocol.lostConnection() - self.assertEqual([ - ('startTest', self.test), - ('addFailure', self.test, subunit.RemoteError(_u(""))), - ('stopTest', self.test), - ], self.client._events) - - def test_lost_connection_during_failure(self): - self.do_connection_lost("failure", "[\n") - - def test_lost_connection_during_failure_details(self): - self.do_connection_lost("failure", "[ multipart\n") - - def test_lost_connection_after_success(self): - self.protocol.lineReceived(_b("test old mcdonald\n")) - self.protocol.lineReceived(_b("success old mcdonald\n")) - self.protocol.lostConnection() - self.assertEqual([ - ('startTest', self.test), - ('addSuccess', self.test), - ('stopTest', self.test), - ], self.client._events) - - def test_lost_connection_during_success(self): - self.do_connection_lost("success", "[\n") - - def test_lost_connection_during_success_details(self): - self.do_connection_lost("success", "[ multipart\n") - - def test_lost_connection_during_skip(self): - self.do_connection_lost("skip", "[\n") - - def test_lost_connection_during_skip_details(self): - self.do_connection_lost("skip", "[ multipart\n") - - def test_lost_connection_during_xfail(self): - self.do_connection_lost("xfail", "[\n") - - def test_lost_connection_during_xfail_details(self): - self.do_connection_lost("xfail", "[ multipart\n") - - def test_lost_connection_during_uxsuccess(self): - self.do_connection_lost("uxsuccess", "[\n") - - def test_lost_connection_during_uxsuccess_details(self): - self.do_connection_lost("uxsuccess", "[ multipart\n") - - -class TestInTestMultipart(unittest.TestCase): - - def setUp(self): - self.client = ExtendedTestResult() - self.protocol = subunit.TestProtocolServer(self.client) - self.protocol.lineReceived(_b("test mcdonalds farm\n")) - self.test = subunit.RemotedTestCase(_u("mcdonalds farm")) - - def test__outcome_sets_details_parser(self): - self.protocol._reading_success_details.details_parser = None - self.protocol._state._outcome(0, _b("mcdonalds farm [ multipart\n"), - None, self.protocol._reading_success_details) - parser = self.protocol._reading_success_details.details_parser - self.assertNotEqual(None, parser) - self.assertTrue(isinstance(parser, - subunit.details.MultipartDetailsParser)) - - -class TestTestProtocolServerAddError(unittest.TestCase): - - def setUp(self): - self.client = ExtendedTestResult() - self.protocol = subunit.TestProtocolServer(self.client) - self.protocol.lineReceived(_b("test mcdonalds farm\n")) - self.test = subunit.RemotedTestCase("mcdonalds farm") - - def simple_error_keyword(self, keyword): - self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword)) - details = {} - self.assertEqual([ - ('startTest', self.test), - ('addError', self.test, details), - ('stopTest', self.test), - ], self.client._events) - - def test_simple_error(self): - self.simple_error_keyword("error") - - def test_simple_error_colon(self): - self.simple_error_keyword("error:") - - def test_error_empty_message(self): - self.protocol.lineReceived(_b("error mcdonalds farm [\n")) - self.protocol.lineReceived(_b("]\n")) - details = {} - details['traceback'] = Content(ContentType("text", "x-traceback", - {'charset': 'utf8'}), lambda:[_b("")]) - self.assertEqual([ - ('startTest', self.test), - ('addError', self.test, details), - ('stopTest', self.test), - ], self.client._events) - - def error_quoted_bracket(self, keyword): - self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword)) - self.protocol.lineReceived(_b(" ]\n")) - self.protocol.lineReceived(_b("]\n")) - details = {} - details['traceback'] = Content(ContentType("text", "x-traceback", - {'charset': 'utf8'}), lambda:[_b("]\n")]) - self.assertEqual([ - ('startTest', self.test), - ('addError', self.test, details), - ('stopTest', self.test), - ], self.client._events) - - def test_error_quoted_bracket(self): - self.error_quoted_bracket("error") - - def test_error_colon_quoted_bracket(self): - self.error_quoted_bracket("error:") - - -class TestTestProtocolServerAddFailure(unittest.TestCase): - - def setUp(self): - self.client = ExtendedTestResult() - self.protocol = subunit.TestProtocolServer(self.client) - self.protocol.lineReceived(_b("test mcdonalds farm\n")) - self.test = subunit.RemotedTestCase("mcdonalds farm") - - def assertFailure(self, details): - self.assertEqual([ - ('startTest', self.test), - ('addFailure', self.test, details), - ('stopTest', self.test), - ], self.client._events) - - def simple_failure_keyword(self, keyword): - self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword)) - details = {} - self.assertFailure(details) - - def test_simple_failure(self): - self.simple_failure_keyword("failure") - - def test_simple_failure_colon(self): - self.simple_failure_keyword("failure:") - - def test_failure_empty_message(self): - self.protocol.lineReceived(_b("failure mcdonalds farm [\n")) - self.protocol.lineReceived(_b("]\n")) - details = {} - details['traceback'] = Content(ContentType("text", "x-traceback", - {'charset': 'utf8'}), lambda:[_b("")]) - self.assertFailure(details) - - def failure_quoted_bracket(self, keyword): - self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword)) - self.protocol.lineReceived(_b(" ]\n")) - self.protocol.lineReceived(_b("]\n")) - details = {} - details['traceback'] = Content(ContentType("text", "x-traceback", - {'charset': 'utf8'}), lambda:[_b("]\n")]) - self.assertFailure(details) - - def test_failure_quoted_bracket(self): - self.failure_quoted_bracket("failure") - - def test_failure_colon_quoted_bracket(self): - self.failure_quoted_bracket("failure:") - - -class TestTestProtocolServerAddxFail(unittest.TestCase): - """Tests for the xfail keyword. - - In Python this can thunk through to Success due to stdlib limitations (see - README). - """ - - def capture_expected_failure(self, test, err): - self._events.append((test, err)) - - def setup_python26(self): - """Setup a test object ready to be xfailed and thunk to success.""" - self.client = Python26TestResult() - self.setup_protocol() - - def setup_python27(self): - """Setup a test object ready to be xfailed.""" - self.client = Python27TestResult() - self.setup_protocol() - - def setup_python_ex(self): - """Setup a test object ready to be xfailed with details.""" - self.client = ExtendedTestResult() - self.setup_protocol() - - def setup_protocol(self): - """Setup the protocol based on self.client.""" - self.protocol = subunit.TestProtocolServer(self.client) - self.protocol.lineReceived(_b("test mcdonalds farm\n")) - self.test = self.client._events[-1][-1] - - def simple_xfail_keyword(self, keyword, as_success): - self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword)) - self.check_success_or_xfail(as_success) - - def check_success_or_xfail(self, as_success, error_message=None): - if as_success: - self.assertEqual([ - ('startTest', self.test), - ('addSuccess', self.test), - ('stopTest', self.test), - ], self.client._events) - else: - details = {} - if error_message is not None: - details['traceback'] = Content( - ContentType("text", "x-traceback", {'charset': 'utf8'}), - lambda:[_b(error_message)]) - if isinstance(self.client, ExtendedTestResult): - value = details - else: - if error_message is not None: - value = subunit.RemoteError(details_to_str(details)) - else: - value = subunit.RemoteError() - self.assertEqual([ - ('startTest', self.test), - ('addExpectedFailure', self.test, value), - ('stopTest', self.test), - ], self.client._events) - - def test_simple_xfail(self): - self.setup_python26() - self.simple_xfail_keyword("xfail", True) - self.setup_python27() - self.simple_xfail_keyword("xfail", False) - self.setup_python_ex() - self.simple_xfail_keyword("xfail", False) - - def test_simple_xfail_colon(self): - self.setup_python26() - self.simple_xfail_keyword("xfail:", True) - self.setup_python27() - self.simple_xfail_keyword("xfail:", False) - self.setup_python_ex() - self.simple_xfail_keyword("xfail:", False) - - def test_xfail_empty_message(self): - self.setup_python26() - self.empty_message(True) - self.setup_python27() - self.empty_message(False) - self.setup_python_ex() - self.empty_message(False, error_message="") - - def empty_message(self, as_success, error_message="\n"): - self.protocol.lineReceived(_b("xfail mcdonalds farm [\n")) - self.protocol.lineReceived(_b("]\n")) - self.check_success_or_xfail(as_success, error_message) - - def xfail_quoted_bracket(self, keyword, as_success): - # This tests it is accepted, but cannot test it is used today, because - # of not having a way to expose it in Python so far. - self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword)) - self.protocol.lineReceived(_b(" ]\n")) - self.protocol.lineReceived(_b("]\n")) - self.check_success_or_xfail(as_success, "]\n") - - def test_xfail_quoted_bracket(self): - self.setup_python26() - self.xfail_quoted_bracket("xfail", True) - self.setup_python27() - self.xfail_quoted_bracket("xfail", False) - self.setup_python_ex() - self.xfail_quoted_bracket("xfail", False) - - def test_xfail_colon_quoted_bracket(self): - self.setup_python26() - self.xfail_quoted_bracket("xfail:", True) - self.setup_python27() - self.xfail_quoted_bracket("xfail:", False) - self.setup_python_ex() - self.xfail_quoted_bracket("xfail:", False) - - -class TestTestProtocolServerAddunexpectedSuccess(TestCase): - """Tests for the uxsuccess keyword.""" - - def capture_expected_failure(self, test, err): - self._events.append((test, err)) - - def setup_python26(self): - """Setup a test object ready to be xfailed and thunk to success.""" - self.client = Python26TestResult() - self.setup_protocol() - - def setup_python27(self): - """Setup a test object ready to be xfailed.""" - self.client = Python27TestResult() - self.setup_protocol() - - def setup_python_ex(self): - """Setup a test object ready to be xfailed with details.""" - self.client = ExtendedTestResult() - self.setup_protocol() - - def setup_protocol(self): - """Setup the protocol based on self.client.""" - self.protocol = subunit.TestProtocolServer(self.client) - self.protocol.lineReceived(_b("test mcdonalds farm\n")) - self.test = self.client._events[-1][-1] - - def simple_uxsuccess_keyword(self, keyword, as_fail): - self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword)) - self.check_fail_or_uxsuccess(as_fail) - - def check_fail_or_uxsuccess(self, as_fail, error_message=None): - details = {} - if error_message is not None: - details['traceback'] = Content( - ContentType("text", "x-traceback", {'charset': 'utf8'}), - lambda:[_b(error_message)]) - if isinstance(self.client, ExtendedTestResult): - value = details - else: - value = None - if as_fail: - self.client._events[1] = self.client._events[1][:2] - # The value is generated within the extended to original decorator: - # todo use the testtools matcher to check on this. - self.assertEqual([ - ('startTest', self.test), - ('addFailure', self.test), - ('stopTest', self.test), - ], self.client._events) - elif value: - self.assertEqual([ - ('startTest', self.test), - ('addUnexpectedSuccess', self.test, value), - ('stopTest', self.test), - ], self.client._events) - else: - self.assertEqual([ - ('startTest', self.test), - ('addUnexpectedSuccess', self.test), - ('stopTest', self.test), - ], self.client._events) - - def test_simple_uxsuccess(self): - self.setup_python26() - self.simple_uxsuccess_keyword("uxsuccess", True) - self.setup_python27() - self.simple_uxsuccess_keyword("uxsuccess", False) - self.setup_python_ex() - self.simple_uxsuccess_keyword("uxsuccess", False) - - def test_simple_uxsuccess_colon(self): - self.setup_python26() - self.simple_uxsuccess_keyword("uxsuccess:", True) - self.setup_python27() - self.simple_uxsuccess_keyword("uxsuccess:", False) - self.setup_python_ex() - self.simple_uxsuccess_keyword("uxsuccess:", False) - - def test_uxsuccess_empty_message(self): - self.setup_python26() - self.empty_message(True) - self.setup_python27() - self.empty_message(False) - self.setup_python_ex() - self.empty_message(False, error_message="") - - def empty_message(self, as_fail, error_message="\n"): - self.protocol.lineReceived(_b("uxsuccess mcdonalds farm [\n")) - self.protocol.lineReceived(_b("]\n")) - self.check_fail_or_uxsuccess(as_fail, error_message) - - def uxsuccess_quoted_bracket(self, keyword, as_fail): - self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword)) - self.protocol.lineReceived(_b(" ]\n")) - self.protocol.lineReceived(_b("]\n")) - self.check_fail_or_uxsuccess(as_fail, "]\n") - - def test_uxsuccess_quoted_bracket(self): - self.setup_python26() - self.uxsuccess_quoted_bracket("uxsuccess", True) - self.setup_python27() - self.uxsuccess_quoted_bracket("uxsuccess", False) - self.setup_python_ex() - self.uxsuccess_quoted_bracket("uxsuccess", False) - - def test_uxsuccess_colon_quoted_bracket(self): - self.setup_python26() - self.uxsuccess_quoted_bracket("uxsuccess:", True) - self.setup_python27() - self.uxsuccess_quoted_bracket("uxsuccess:", False) - self.setup_python_ex() - self.uxsuccess_quoted_bracket("uxsuccess:", False) - - -class TestTestProtocolServerAddSkip(unittest.TestCase): - """Tests for the skip keyword. - - In Python this meets the testtools extended TestResult contract. - (See https://launchpad.net/testtools). - """ - - def setUp(self): - """Setup a test object ready to be skipped.""" - self.client = ExtendedTestResult() - self.protocol = subunit.TestProtocolServer(self.client) - self.protocol.lineReceived(_b("test mcdonalds farm\n")) - self.test = self.client._events[-1][-1] - - def assertSkip(self, reason): - details = {} - if reason is not None: - details['reason'] = Content( - ContentType("text", "plain"), lambda:[reason]) - self.assertEqual([ - ('startTest', self.test), - ('addSkip', self.test, details), - ('stopTest', self.test), - ], self.client._events) - - def simple_skip_keyword(self, keyword): - self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword)) - self.assertSkip(None) - - def test_simple_skip(self): - self.simple_skip_keyword("skip") - - def test_simple_skip_colon(self): - self.simple_skip_keyword("skip:") - - def test_skip_empty_message(self): - self.protocol.lineReceived(_b("skip mcdonalds farm [\n")) - self.protocol.lineReceived(_b("]\n")) - self.assertSkip(_b("")) - - def skip_quoted_bracket(self, keyword): - # This tests it is accepted, but cannot test it is used today, because - # of not having a way to expose it in Python so far. - self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword)) - self.protocol.lineReceived(_b(" ]\n")) - self.protocol.lineReceived(_b("]\n")) - self.assertSkip(_b("]\n")) - - def test_skip_quoted_bracket(self): - self.skip_quoted_bracket("skip") - - def test_skip_colon_quoted_bracket(self): - self.skip_quoted_bracket("skip:") - - -class TestTestProtocolServerAddSuccess(unittest.TestCase): - - def setUp(self): - self.client = ExtendedTestResult() - self.protocol = subunit.TestProtocolServer(self.client) - self.protocol.lineReceived(_b("test mcdonalds farm\n")) - self.test = subunit.RemotedTestCase("mcdonalds farm") - - def simple_success_keyword(self, keyword): - self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword)) - self.assertEqual([ - ('startTest', self.test), - ('addSuccess', self.test), - ('stopTest', self.test), - ], self.client._events) - - def test_simple_success(self): - self.simple_success_keyword("successful") - - def test_simple_success_colon(self): - self.simple_success_keyword("successful:") - - def assertSuccess(self, details): - self.assertEqual([ - ('startTest', self.test), - ('addSuccess', self.test, details), - ('stopTest', self.test), - ], self.client._events) - - def test_success_empty_message(self): - self.protocol.lineReceived(_b("success mcdonalds farm [\n")) - self.protocol.lineReceived(_b("]\n")) - details = {} - details['message'] = Content(ContentType("text", "plain"), - lambda:[_b("")]) - self.assertSuccess(details) - - def success_quoted_bracket(self, keyword): - # This tests it is accepted, but cannot test it is used today, because - # of not having a way to expose it in Python so far. - self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword)) - self.protocol.lineReceived(_b(" ]\n")) - self.protocol.lineReceived(_b("]\n")) - details = {} - details['message'] = Content(ContentType("text", "plain"), - lambda:[_b("]\n")]) - self.assertSuccess(details) - - def test_success_quoted_bracket(self): - self.success_quoted_bracket("success") - - def test_success_colon_quoted_bracket(self): - self.success_quoted_bracket("success:") - - -class TestTestProtocolServerProgress(unittest.TestCase): - """Test receipt of progress: directives.""" - - def test_progress_accepted_stdlib(self): - self.result = Python26TestResult() - self.stream = BytesIO() - self.protocol = subunit.TestProtocolServer(self.result, - stream=self.stream) - self.protocol.lineReceived(_b("progress: 23")) - self.protocol.lineReceived(_b("progress: -2")) - self.protocol.lineReceived(_b("progress: +4")) - self.assertEqual(_b(""), self.stream.getvalue()) - - def test_progress_accepted_extended(self): - # With a progress capable TestResult, progress events are emitted. - self.result = ExtendedTestResult() - self.stream = BytesIO() - self.protocol = subunit.TestProtocolServer(self.result, - stream=self.stream) - self.protocol.lineReceived(_b("progress: 23")) - self.protocol.lineReceived(_b("progress: push")) - self.protocol.lineReceived(_b("progress: -2")) - self.protocol.lineReceived(_b("progress: pop")) - self.protocol.lineReceived(_b("progress: +4")) - self.assertEqual(_b(""), self.stream.getvalue()) - self.assertEqual([ - ('progress', 23, subunit.PROGRESS_SET), - ('progress', None, subunit.PROGRESS_PUSH), - ('progress', -2, subunit.PROGRESS_CUR), - ('progress', None, subunit.PROGRESS_POP), - ('progress', 4, subunit.PROGRESS_CUR), - ], self.result._events) - - -class TestTestProtocolServerStreamTags(unittest.TestCase): - """Test managing tags on the protocol level.""" - - def setUp(self): - self.client = ExtendedTestResult() - self.protocol = subunit.TestProtocolServer(self.client) - - def test_initial_tags(self): - self.protocol.lineReceived(_b("tags: foo bar:baz quux\n")) - self.assertEqual([ - ('tags', set(["foo", "bar:baz", "quux"]), set()), - ], self.client._events) - - def test_minus_removes_tags(self): - self.protocol.lineReceived(_b("tags: -bar quux\n")) - self.assertEqual([ - ('tags', set(["quux"]), set(["bar"])), - ], self.client._events) - - def test_tags_do_not_get_set_on_test(self): - self.protocol.lineReceived(_b("test mcdonalds farm\n")) - test = self.client._events[0][-1] - self.assertEqual(None, getattr(test, 'tags', None)) - - def test_tags_do_not_get_set_on_global_tags(self): - self.protocol.lineReceived(_b("tags: foo bar\n")) - self.protocol.lineReceived(_b("test mcdonalds farm\n")) - test = self.client._events[-1][-1] - self.assertEqual(None, getattr(test, 'tags', None)) - - def test_tags_get_set_on_test_tags(self): - self.protocol.lineReceived(_b("test mcdonalds farm\n")) - test = self.client._events[-1][-1] - self.protocol.lineReceived(_b("tags: foo bar\n")) - self.protocol.lineReceived(_b("success mcdonalds farm\n")) - self.assertEqual(None, getattr(test, 'tags', None)) - - -class TestTestProtocolServerStreamTime(unittest.TestCase): - """Test managing time information at the protocol level.""" - - def test_time_accepted_stdlib(self): - self.result = Python26TestResult() - self.stream = BytesIO() - self.protocol = subunit.TestProtocolServer(self.result, - stream=self.stream) - self.protocol.lineReceived(_b("time: 2001-12-12 12:59:59Z\n")) - self.assertEqual(_b(""), self.stream.getvalue()) - - def test_time_accepted_extended(self): - self.result = ExtendedTestResult() - self.stream = BytesIO() - self.protocol = subunit.TestProtocolServer(self.result, - stream=self.stream) - self.protocol.lineReceived(_b("time: 2001-12-12 12:59:59Z\n")) - self.assertEqual(_b(""), self.stream.getvalue()) - self.assertEqual([ - ('time', datetime.datetime(2001, 12, 12, 12, 59, 59, 0, - iso8601.Utc())) - ], self.result._events) - - -class TestRemotedTestCase(unittest.TestCase): - - def test_simple(self): - test = subunit.RemotedTestCase("A test description") - self.assertRaises(NotImplementedError, test.setUp) - self.assertRaises(NotImplementedError, test.tearDown) - self.assertEqual("A test description", - test.shortDescription()) - self.assertEqual("A test description", - test.id()) - self.assertEqual("A test description (subunit.RemotedTestCase)", "%s" % test) - self.assertEqual("<subunit.RemotedTestCase description=" - "'A test description'>", "%r" % test) - result = unittest.TestResult() - test.run(result) - self.assertEqual([(test, _remote_exception_str + ": " - "Cannot run RemotedTestCases.\n\n")], - result.errors) - self.assertEqual(1, result.testsRun) - another_test = subunit.RemotedTestCase("A test description") - self.assertEqual(test, another_test) - different_test = subunit.RemotedTestCase("ofo") - self.assertNotEqual(test, different_test) - self.assertNotEqual(another_test, different_test) - - -class TestRemoteError(unittest.TestCase): - - def test_eq(self): - error = subunit.RemoteError(_u("Something went wrong")) - another_error = subunit.RemoteError(_u("Something went wrong")) - different_error = subunit.RemoteError(_u("boo!")) - self.assertEqual(error, another_error) - self.assertNotEqual(error, different_error) - self.assertNotEqual(different_error, another_error) - - def test_empty_constructor(self): - self.assertEqual(subunit.RemoteError(), subunit.RemoteError(_u(""))) - - -class TestExecTestCase(unittest.TestCase): - - class SampleExecTestCase(subunit.ExecTestCase): - - def test_sample_method(self): - """sample-script.py""" - # the sample script runs three tests, one each - # that fails, errors and succeeds - - def test_sample_method_args(self): - """sample-script.py foo""" - # sample that will run just one test. - - def test_construct(self): - test = self.SampleExecTestCase("test_sample_method") - self.assertEqual(test.script, - subunit.join_dir(__file__, 'sample-script.py')) - - def test_args(self): - result = unittest.TestResult() - test = self.SampleExecTestCase("test_sample_method_args") - test.run(result) - self.assertEqual(1, result.testsRun) - - def test_run(self): - result = ExtendedTestResult() - test = self.SampleExecTestCase("test_sample_method") - test.run(result) - mcdonald = subunit.RemotedTestCase("old mcdonald") - bing = subunit.RemotedTestCase("bing crosby") - bing_details = {} - bing_details['traceback'] = Content(ContentType("text", "x-traceback", - {'charset': 'utf8'}), lambda:[_b("foo.c:53:ERROR invalid state\n")]) - an_error = subunit.RemotedTestCase("an error") - error_details = {} - self.assertEqual([ - ('startTest', mcdonald), - ('addSuccess', mcdonald), - ('stopTest', mcdonald), - ('startTest', bing), - ('addFailure', bing, bing_details), - ('stopTest', bing), - ('startTest', an_error), - ('addError', an_error, error_details), - ('stopTest', an_error), - ], result._events) - - def test_debug(self): - test = self.SampleExecTestCase("test_sample_method") - test.debug() - - def test_count_test_cases(self): - """TODO run the child process and count responses to determine the count.""" - - def test_join_dir(self): - sibling = subunit.join_dir(__file__, 'foo') - filedir = os.path.abspath(os.path.dirname(__file__)) - expected = os.path.join(filedir, 'foo') - self.assertEqual(sibling, expected) - - -class DoExecTestCase(subunit.ExecTestCase): - - def test_working_script(self): - """sample-two-script.py""" - - -class TestIsolatedTestCase(TestCase): - - class SampleIsolatedTestCase(subunit.IsolatedTestCase): - - SETUP = False - TEARDOWN = False - TEST = False - - def setUp(self): - TestIsolatedTestCase.SampleIsolatedTestCase.SETUP = True - - def tearDown(self): - TestIsolatedTestCase.SampleIsolatedTestCase.TEARDOWN = True - - def test_sets_global_state(self): - TestIsolatedTestCase.SampleIsolatedTestCase.TEST = True - - - def test_construct(self): - self.SampleIsolatedTestCase("test_sets_global_state") - - @skipIf(os.name != "posix", "Need a posix system for forking tests") - def test_run(self): - result = unittest.TestResult() - test = self.SampleIsolatedTestCase("test_sets_global_state") - test.run(result) - self.assertEqual(result.testsRun, 1) - self.assertEqual(self.SampleIsolatedTestCase.SETUP, False) - self.assertEqual(self.SampleIsolatedTestCase.TEARDOWN, False) - self.assertEqual(self.SampleIsolatedTestCase.TEST, False) - - def test_debug(self): - pass - #test = self.SampleExecTestCase("test_sample_method") - #test.debug() - - -class TestIsolatedTestSuite(TestCase): - - class SampleTestToIsolate(unittest.TestCase): - - SETUP = False - TEARDOWN = False - TEST = False - - def setUp(self): - TestIsolatedTestSuite.SampleTestToIsolate.SETUP = True - - def tearDown(self): - TestIsolatedTestSuite.SampleTestToIsolate.TEARDOWN = True - - def test_sets_global_state(self): - TestIsolatedTestSuite.SampleTestToIsolate.TEST = True - - - def test_construct(self): - subunit.IsolatedTestSuite() - - @skipIf(os.name != "posix", "Need a posix system for forking tests") - def test_run(self): - result = unittest.TestResult() - suite = subunit.IsolatedTestSuite() - sub_suite = unittest.TestSuite() - sub_suite.addTest(self.SampleTestToIsolate("test_sets_global_state")) - sub_suite.addTest(self.SampleTestToIsolate("test_sets_global_state")) - suite.addTest(sub_suite) - suite.addTest(self.SampleTestToIsolate("test_sets_global_state")) - suite.run(result) - self.assertEqual(result.testsRun, 3) - self.assertEqual(self.SampleTestToIsolate.SETUP, False) - self.assertEqual(self.SampleTestToIsolate.TEARDOWN, False) - self.assertEqual(self.SampleTestToIsolate.TEST, False) - - -class TestTestProtocolClient(unittest.TestCase): - - def setUp(self): - self.io = BytesIO() - self.protocol = subunit.TestProtocolClient(self.io) - self.unicode_test = PlaceHolder(_u('\u2603')) - self.test = TestTestProtocolClient("test_start_test") - self.sample_details = {'something':Content( - ContentType('text', 'plain'), lambda:[_b('serialised\nform')])} - self.sample_tb_details = dict(self.sample_details) - self.sample_tb_details['traceback'] = TracebackContent( - subunit.RemoteError(_u("boo qux")), self.test) - - def test_start_test(self): - """Test startTest on a TestProtocolClient.""" - self.protocol.startTest(self.test) - self.assertEqual(self.io.getvalue(), _b("test: %s\n" % self.test.id())) - - def test_start_test_unicode_id(self): - """Test startTest on a TestProtocolClient.""" - self.protocol.startTest(self.unicode_test) - expected = _b("test: ") + _u('\u2603').encode('utf8') + _b("\n") - self.assertEqual(expected, self.io.getvalue()) - - def test_stop_test(self): - # stopTest doesn't output anything. - self.protocol.stopTest(self.test) - self.assertEqual(self.io.getvalue(), _b("")) - - def test_add_success(self): - """Test addSuccess on a TestProtocolClient.""" - self.protocol.addSuccess(self.test) - self.assertEqual( - self.io.getvalue(), _b("successful: %s\n" % self.test.id())) - - def test_add_outcome_unicode_id(self): - """Test addSuccess on a TestProtocolClient.""" - self.protocol.addSuccess(self.unicode_test) - expected = _b("successful: ") + _u('\u2603').encode('utf8') + _b("\n") - self.assertEqual(expected, self.io.getvalue()) - - def test_add_success_details(self): - """Test addSuccess on a TestProtocolClient with details.""" - self.protocol.addSuccess(self.test, details=self.sample_details) - self.assertEqual( - self.io.getvalue(), _b("successful: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n]\n" % self.test.id())) - - def test_add_failure(self): - """Test addFailure on a TestProtocolClient.""" - self.protocol.addFailure( - self.test, subunit.RemoteError(_u("boo qux"))) - self.assertEqual( - self.io.getvalue(), - _b(('failure: %s [\n' + _remote_exception_str + ': boo qux\n]\n') - % self.test.id())) - - def test_add_failure_details(self): - """Test addFailure on a TestProtocolClient with details.""" - self.protocol.addFailure( - self.test, details=self.sample_tb_details) - self.assertEqual( - self.io.getvalue(), - _b(("failure: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n" - "Content-Type: text/x-traceback;charset=utf8,language=python\n" - "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n" - "]\n") % self.test.id())) - - def test_add_error(self): - """Test stopTest on a TestProtocolClient.""" - self.protocol.addError( - self.test, subunit.RemoteError(_u("phwoar crikey"))) - self.assertEqual( - self.io.getvalue(), - _b(('error: %s [\n' + - _remote_exception_str + ": phwoar crikey\n" - "]\n") % self.test.id())) - - def test_add_error_details(self): - """Test stopTest on a TestProtocolClient with details.""" - self.protocol.addError( - self.test, details=self.sample_tb_details) - self.assertEqual( - self.io.getvalue(), - _b(("error: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n" - "Content-Type: text/x-traceback;charset=utf8,language=python\n" - "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n" - "]\n") % self.test.id())) - - def test_add_expected_failure(self): - """Test addExpectedFailure on a TestProtocolClient.""" - self.protocol.addExpectedFailure( - self.test, subunit.RemoteError(_u("phwoar crikey"))) - self.assertEqual( - self.io.getvalue(), - _b(('xfail: %s [\n' + - _remote_exception_str + ": phwoar crikey\n" - "]\n") % self.test.id())) - - def test_add_expected_failure_details(self): - """Test addExpectedFailure on a TestProtocolClient with details.""" - self.protocol.addExpectedFailure( - self.test, details=self.sample_tb_details) - self.assertEqual( - self.io.getvalue(), - _b(("xfail: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n" - "Content-Type: text/x-traceback;charset=utf8,language=python\n" - "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n" - "]\n") % self.test.id())) - - - def test_add_skip(self): - """Test addSkip on a TestProtocolClient.""" - self.protocol.addSkip( - self.test, "Has it really?") - self.assertEqual( - self.io.getvalue(), - _b('skip: %s [\nHas it really?\n]\n' % self.test.id())) - - def test_add_skip_details(self): - """Test addSkip on a TestProtocolClient with details.""" - details = {'reason':Content( - ContentType('text', 'plain'), lambda:[_b('Has it really?')])} - self.protocol.addSkip(self.test, details=details) - self.assertEqual( - self.io.getvalue(), - _b("skip: %s [ multipart\n" - "Content-Type: text/plain\n" - "reason\n" - "E\r\nHas it really?0\r\n" - "]\n" % self.test.id())) - - def test_progress_set(self): - self.protocol.progress(23, subunit.PROGRESS_SET) - self.assertEqual(self.io.getvalue(), _b('progress: 23\n')) - - def test_progress_neg_cur(self): - self.protocol.progress(-23, subunit.PROGRESS_CUR) - self.assertEqual(self.io.getvalue(), _b('progress: -23\n')) - - def test_progress_pos_cur(self): - self.protocol.progress(23, subunit.PROGRESS_CUR) - self.assertEqual(self.io.getvalue(), _b('progress: +23\n')) - - def test_progress_pop(self): - self.protocol.progress(1234, subunit.PROGRESS_POP) - self.assertEqual(self.io.getvalue(), _b('progress: pop\n')) - - def test_progress_push(self): - self.protocol.progress(1234, subunit.PROGRESS_PUSH) - self.assertEqual(self.io.getvalue(), _b('progress: push\n')) - - def test_time(self): - # Calling time() outputs a time signal immediately. - self.protocol.time( - datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())) - self.assertEqual( - _b("time: 2009-10-11 12:13:14.000015Z\n"), - self.io.getvalue()) - - def test_add_unexpected_success(self): - """Test addUnexpectedSuccess on a TestProtocolClient.""" - self.protocol.addUnexpectedSuccess(self.test) - self.assertEqual( - self.io.getvalue(), _b("uxsuccess: %s\n" % self.test.id())) - - def test_add_unexpected_success_details(self): - """Test addUnexpectedSuccess on a TestProtocolClient with details.""" - self.protocol.addUnexpectedSuccess(self.test, details=self.sample_details) - self.assertEqual( - self.io.getvalue(), _b("uxsuccess: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n]\n" % self.test.id())) - - def test_tags_empty(self): - self.protocol.tags(set(), set()) - self.assertEqual(_b(""), self.io.getvalue()) - - def test_tags_add(self): - self.protocol.tags(set(['foo']), set()) - self.assertEqual(_b("tags: foo\n"), self.io.getvalue()) - - def test_tags_both(self): - self.protocol.tags(set(['quux']), set(['bar'])) - self.assertEqual(_b("tags: quux -bar\n"), self.io.getvalue()) - - def test_tags_gone(self): - self.protocol.tags(set(), set(['bar'])) - self.assertEqual(_b("tags: -bar\n"), self.io.getvalue()) - - -def test_suite(): - loader = subunit.tests.TestUtil.TestLoader() - result = loader.loadTestsFromName(__name__) - return result diff --git a/lib/subunit/python/subunit/tests/test_test_results.py b/lib/subunit/python/subunit/tests/test_test_results.py deleted file mode 100644 index ff74b9a818..0000000000 --- a/lib/subunit/python/subunit/tests/test_test_results.py +++ /dev/null @@ -1,572 +0,0 @@ -# -# subunit: extensions to Python unittest to get test results from subprocesses. -# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net> -# -# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause -# license at the users choice. A copy of both licenses are available in the -# project source as Apache-2.0 and BSD. You may not use this file except in -# compliance with one of these two licences. -# -# Unless required by applicable law or agreed to in writing, software -# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# license you chose for the specific language governing permissions and -# limitations under that license. -# - -import csv -import datetime -import sys -import unittest - -from testtools import TestCase -from testtools.compat import StringIO -from testtools.content import ( - text_content, - TracebackContent, - ) -from testtools.testresult.doubles import ExtendedTestResult - -import subunit -import subunit.iso8601 as iso8601 -import subunit.test_results - -import testtools - - -class LoggingDecorator(subunit.test_results.HookedTestResultDecorator): - - def __init__(self, decorated): - self._calls = 0 - super(LoggingDecorator, self).__init__(decorated) - - def _before_event(self): - self._calls += 1 - - -class AssertBeforeTestResult(LoggingDecorator): - """A TestResult for checking preconditions.""" - - def __init__(self, decorated, test): - self.test = test - super(AssertBeforeTestResult, self).__init__(decorated) - - def _before_event(self): - self.test.assertEqual(1, self.earlier._calls) - super(AssertBeforeTestResult, self)._before_event() - - -class TimeCapturingResult(unittest.TestResult): - - def __init__(self): - super(TimeCapturingResult, self).__init__() - self._calls = [] - self.failfast = False - - def time(self, a_datetime): - self._calls.append(a_datetime) - - -class TestHookedTestResultDecorator(unittest.TestCase): - - def setUp(self): - # An end to the chain - terminal = unittest.TestResult() - # Asserts that the call was made to self.result before asserter was - # called. - asserter = AssertBeforeTestResult(terminal, self) - # The result object we call, which much increase its call count. - self.result = LoggingDecorator(asserter) - asserter.earlier = self.result - self.decorated = asserter - - def tearDown(self): - # The hook in self.result must have been called - self.assertEqual(1, self.result._calls) - # The hook in asserter must have been called too, otherwise the - # assertion about ordering won't have completed. - self.assertEqual(1, self.decorated._calls) - - def test_startTest(self): - self.result.startTest(self) - - def test_startTestRun(self): - self.result.startTestRun() - - def test_stopTest(self): - self.result.stopTest(self) - - def test_stopTestRun(self): - self.result.stopTestRun() - - def test_addError(self): - self.result.addError(self, subunit.RemoteError()) - - def test_addError_details(self): - self.result.addError(self, details={}) - - def test_addFailure(self): - self.result.addFailure(self, subunit.RemoteError()) - - def test_addFailure_details(self): - self.result.addFailure(self, details={}) - - def test_addSuccess(self): - self.result.addSuccess(self) - - def test_addSuccess_details(self): - self.result.addSuccess(self, details={}) - - def test_addSkip(self): - self.result.addSkip(self, "foo") - - def test_addSkip_details(self): - self.result.addSkip(self, details={}) - - def test_addExpectedFailure(self): - self.result.addExpectedFailure(self, subunit.RemoteError()) - - def test_addExpectedFailure_details(self): - self.result.addExpectedFailure(self, details={}) - - def test_addUnexpectedSuccess(self): - self.result.addUnexpectedSuccess(self) - - def test_addUnexpectedSuccess_details(self): - self.result.addUnexpectedSuccess(self, details={}) - - def test_progress(self): - self.result.progress(1, subunit.PROGRESS_SET) - - def test_wasSuccessful(self): - self.result.wasSuccessful() - - def test_shouldStop(self): - self.result.shouldStop - - def test_stop(self): - self.result.stop() - - def test_time(self): - self.result.time(None) - - -class TestAutoTimingTestResultDecorator(unittest.TestCase): - - def setUp(self): - # And end to the chain which captures time events. - terminal = TimeCapturingResult() - # The result object under test. - self.result = subunit.test_results.AutoTimingTestResultDecorator( - terminal) - self.decorated = terminal - - def test_without_time_calls_time_is_called_and_not_None(self): - self.result.startTest(self) - self.assertEqual(1, len(self.decorated._calls)) - self.assertNotEqual(None, self.decorated._calls[0]) - - def test_no_time_from_progress(self): - self.result.progress(1, subunit.PROGRESS_CUR) - self.assertEqual(0, len(self.decorated._calls)) - - def test_no_time_from_shouldStop(self): - self.decorated.stop() - self.result.shouldStop - self.assertEqual(0, len(self.decorated._calls)) - - def test_calling_time_inhibits_automatic_time(self): - # Calling time() outputs a time signal immediately and prevents - # automatically adding one when other methods are called. - time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc()) - self.result.time(time) - self.result.startTest(self) - self.result.stopTest(self) - self.assertEqual(1, len(self.decorated._calls)) - self.assertEqual(time, self.decorated._calls[0]) - - def test_calling_time_None_enables_automatic_time(self): - time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc()) - self.result.time(time) - self.assertEqual(1, len(self.decorated._calls)) - self.assertEqual(time, self.decorated._calls[0]) - # Calling None passes the None through, in case other results care. - self.result.time(None) - self.assertEqual(2, len(self.decorated._calls)) - self.assertEqual(None, self.decorated._calls[1]) - # Calling other methods doesn't generate an automatic time event. - self.result.startTest(self) - self.assertEqual(3, len(self.decorated._calls)) - self.assertNotEqual(None, self.decorated._calls[2]) - - def test_set_failfast_True(self): - self.assertFalse(self.decorated.failfast) - self.result.failfast = True - self.assertTrue(self.decorated.failfast) - - -class TestTagCollapsingDecorator(TestCase): - - def test_tags_collapsed_outside_of_tests(self): - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TagCollapsingDecorator(result) - tag_collapser.tags(set(['a']), set()) - tag_collapser.tags(set(['b']), set()) - tag_collapser.startTest(self) - self.assertEquals( - [('tags', set(['a', 'b']), set([])), - ('startTest', self), - ], result._events) - - def test_tags_collapsed_outside_of_tests_are_flushed(self): - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TagCollapsingDecorator(result) - tag_collapser.startTestRun() - tag_collapser.tags(set(['a']), set()) - tag_collapser.tags(set(['b']), set()) - tag_collapser.startTest(self) - tag_collapser.addSuccess(self) - tag_collapser.stopTest(self) - tag_collapser.stopTestRun() - self.assertEquals( - [('startTestRun',), - ('tags', set(['a', 'b']), set([])), - ('startTest', self), - ('addSuccess', self), - ('stopTest', self), - ('stopTestRun',), - ], result._events) - - def test_tags_forwarded_after_tests(self): - test = subunit.RemotedTestCase('foo') - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TagCollapsingDecorator(result) - tag_collapser.startTestRun() - tag_collapser.startTest(test) - tag_collapser.addSuccess(test) - tag_collapser.stopTest(test) - tag_collapser.tags(set(['a']), set(['b'])) - tag_collapser.stopTestRun() - self.assertEqual( - [('startTestRun',), - ('startTest', test), - ('addSuccess', test), - ('stopTest', test), - ('tags', set(['a']), set(['b'])), - ('stopTestRun',), - ], - result._events) - - def test_tags_collapsed_inside_of_tests(self): - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TagCollapsingDecorator(result) - test = subunit.RemotedTestCase('foo') - tag_collapser.startTest(test) - tag_collapser.tags(set(['a']), set()) - tag_collapser.tags(set(['b']), set(['a'])) - tag_collapser.tags(set(['c']), set()) - tag_collapser.stopTest(test) - self.assertEquals( - [('startTest', test), - ('tags', set(['b', 'c']), set(['a'])), - ('stopTest', test)], - result._events) - - def test_tags_collapsed_inside_of_tests_different_ordering(self): - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TagCollapsingDecorator(result) - test = subunit.RemotedTestCase('foo') - tag_collapser.startTest(test) - tag_collapser.tags(set(), set(['a'])) - tag_collapser.tags(set(['a', 'b']), set()) - tag_collapser.tags(set(['c']), set()) - tag_collapser.stopTest(test) - self.assertEquals( - [('startTest', test), - ('tags', set(['a', 'b', 'c']), set()), - ('stopTest', test)], - result._events) - - def test_tags_sent_before_result(self): - # Because addSuccess and friends tend to send subunit output - # immediately, and because 'tags:' before a result line means - # something different to 'tags:' after a result line, we need to be - # sure that tags are emitted before 'addSuccess' (or whatever). - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TagCollapsingDecorator(result) - test = subunit.RemotedTestCase('foo') - tag_collapser.startTest(test) - tag_collapser.tags(set(['a']), set()) - tag_collapser.addSuccess(test) - tag_collapser.stopTest(test) - self.assertEquals( - [('startTest', test), - ('tags', set(['a']), set()), - ('addSuccess', test), - ('stopTest', test)], - result._events) - - -class TestTimeCollapsingDecorator(TestCase): - - def make_time(self): - # Heh heh. - return datetime.datetime( - 2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC) - - def test_initial_time_forwarded(self): - # We always forward the first time event we see. - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) - a_time = self.make_time() - tag_collapser.time(a_time) - self.assertEquals([('time', a_time)], result._events) - - def test_time_collapsed_to_first_and_last(self): - # If there are many consecutive time events, only the first and last - # are sent through. - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) - times = [self.make_time() for i in range(5)] - for a_time in times: - tag_collapser.time(a_time) - tag_collapser.startTest(subunit.RemotedTestCase('foo')) - self.assertEquals( - [('time', times[0]), ('time', times[-1])], result._events[:-1]) - - def test_only_one_time_sent(self): - # If we receive a single time event followed by a non-time event, we - # send exactly one time event. - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) - a_time = self.make_time() - tag_collapser.time(a_time) - tag_collapser.startTest(subunit.RemotedTestCase('foo')) - self.assertEquals([('time', a_time)], result._events[:-1]) - - def test_duplicate_times_not_sent(self): - # Many time events with the exact same time are collapsed into one - # time event. - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) - a_time = self.make_time() - for i in range(5): - tag_collapser.time(a_time) - tag_collapser.startTest(subunit.RemotedTestCase('foo')) - self.assertEquals([('time', a_time)], result._events[:-1]) - - def test_no_times_inserted(self): - result = ExtendedTestResult() - tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) - a_time = self.make_time() - tag_collapser.time(a_time) - foo = subunit.RemotedTestCase('foo') - tag_collapser.startTest(foo) - tag_collapser.addSuccess(foo) - tag_collapser.stopTest(foo) - self.assertEquals( - [('time', a_time), - ('startTest', foo), - ('addSuccess', foo), - ('stopTest', foo)], result._events) - - -class TestByTestResultTests(testtools.TestCase): - - def setUp(self): - super(TestByTestResultTests, self).setUp() - self.log = [] - self.result = subunit.test_results.TestByTestResult(self.on_test) - if sys.version_info >= (3, 0): - self.result._now = iter(range(5)).__next__ - else: - self.result._now = iter(range(5)).next - - def assertCalled(self, **kwargs): - defaults = { - 'test': self, - 'tags': set(), - 'details': None, - 'start_time': 0, - 'stop_time': 1, - } - defaults.update(kwargs) - self.assertEqual([defaults], self.log) - - def on_test(self, **kwargs): - self.log.append(kwargs) - - def test_no_tests_nothing_reported(self): - self.result.startTestRun() - self.result.stopTestRun() - self.assertEqual([], self.log) - - def test_add_success(self): - self.result.startTest(self) - self.result.addSuccess(self) - self.result.stopTest(self) - self.assertCalled(status='success') - - def test_add_success_details(self): - self.result.startTest(self) - details = {'foo': 'bar'} - self.result.addSuccess(self, details=details) - self.result.stopTest(self) - self.assertCalled(status='success', details=details) - - def test_tags(self): - if not getattr(self.result, 'tags', None): - self.skipTest("No tags in testtools") - self.result.tags(['foo'], []) - self.result.startTest(self) - self.result.addSuccess(self) - self.result.stopTest(self) - self.assertCalled(status='success', tags=set(['foo'])) - - def test_add_error(self): - self.result.startTest(self) - try: - 1/0 - except ZeroDivisionError: - error = sys.exc_info() - self.result.addError(self, error) - self.result.stopTest(self) - self.assertCalled( - status='error', - details={'traceback': TracebackContent(error, self)}) - - def test_add_error_details(self): - self.result.startTest(self) - details = {"foo": text_content("bar")} - self.result.addError(self, details=details) - self.result.stopTest(self) - self.assertCalled(status='error', details=details) - - def test_add_failure(self): - self.result.startTest(self) - try: - self.fail("intentional failure") - except self.failureException: - failure = sys.exc_info() - self.result.addFailure(self, failure) - self.result.stopTest(self) - self.assertCalled( - status='failure', - details={'traceback': TracebackContent(failure, self)}) - - def test_add_failure_details(self): - self.result.startTest(self) - details = {"foo": text_content("bar")} - self.result.addFailure(self, details=details) - self.result.stopTest(self) - self.assertCalled(status='failure', details=details) - - def test_add_xfail(self): - self.result.startTest(self) - try: - 1/0 - except ZeroDivisionError: - error = sys.exc_info() - self.result.addExpectedFailure(self, error) - self.result.stopTest(self) - self.assertCalled( - status='xfail', - details={'traceback': TracebackContent(error, self)}) - - def test_add_xfail_details(self): - self.result.startTest(self) - details = {"foo": text_content("bar")} - self.result.addExpectedFailure(self, details=details) - self.result.stopTest(self) - self.assertCalled(status='xfail', details=details) - - def test_add_unexpected_success(self): - self.result.startTest(self) - details = {'foo': 'bar'} - self.result.addUnexpectedSuccess(self, details=details) - self.result.stopTest(self) - self.assertCalled(status='success', details=details) - - def test_add_skip_reason(self): - self.result.startTest(self) - reason = self.getUniqueString() - self.result.addSkip(self, reason) - self.result.stopTest(self) - self.assertCalled( - status='skip', details={'reason': text_content(reason)}) - - def test_add_skip_details(self): - self.result.startTest(self) - details = {'foo': 'bar'} - self.result.addSkip(self, details=details) - self.result.stopTest(self) - self.assertCalled(status='skip', details=details) - - def test_twice(self): - self.result.startTest(self) - self.result.addSuccess(self, details={'foo': 'bar'}) - self.result.stopTest(self) - self.result.startTest(self) - self.result.addSuccess(self) - self.result.stopTest(self) - self.assertEqual( - [{'test': self, - 'status': 'success', - 'start_time': 0, - 'stop_time': 1, - 'tags': set(), - 'details': {'foo': 'bar'}}, - {'test': self, - 'status': 'success', - 'start_time': 2, - 'stop_time': 3, - 'tags': set(), - 'details': None}, - ], - self.log) - - -class TestCsvResult(testtools.TestCase): - - def parse_stream(self, stream): - stream.seek(0) - reader = csv.reader(stream) - return list(reader) - - def test_csv_output(self): - stream = StringIO() - result = subunit.test_results.CsvResult(stream) - if sys.version_info >= (3, 0): - result._now = iter(range(5)).__next__ - else: - result._now = iter(range(5)).next - result.startTestRun() - result.startTest(self) - result.addSuccess(self) - result.stopTest(self) - result.stopTestRun() - self.assertEqual( - [['test', 'status', 'start_time', 'stop_time'], - [self.id(), 'success', '0', '1'], - ], - self.parse_stream(stream)) - - def test_just_header_when_no_tests(self): - stream = StringIO() - result = subunit.test_results.CsvResult(stream) - result.startTestRun() - result.stopTestRun() - self.assertEqual( - [['test', 'status', 'start_time', 'stop_time']], - self.parse_stream(stream)) - - def test_no_output_before_events(self): - stream = StringIO() - subunit.test_results.CsvResult(stream) - self.assertEqual([], self.parse_stream(stream)) - - -def test_suite(): - loader = subunit.tests.TestUtil.TestLoader() - result = loader.loadTestsFromName(__name__) - return result |