summaryrefslogtreecommitdiffstats
path: root/lib/subunit/python/subunit/tests
diff options
context:
space:
mode:
Diffstat (limited to 'lib/subunit/python/subunit/tests')
-rw-r--r--lib/subunit/python/subunit/tests/TestUtil.py80
-rw-r--r--lib/subunit/python/subunit/tests/__init__.py43
-rwxr-xr-xlib/subunit/python/subunit/tests/sample-script.py21
-rwxr-xr-xlib/subunit/python/subunit/tests/sample-two-script.py7
-rw-r--r--lib/subunit/python/subunit/tests/test_chunked.py152
-rw-r--r--lib/subunit/python/subunit/tests/test_details.py112
-rw-r--r--lib/subunit/python/subunit/tests/test_progress_model.py118
-rw-r--r--lib/subunit/python/subunit/tests/test_run.py52
-rw-r--r--lib/subunit/python/subunit/tests/test_subunit_filter.py370
-rw-r--r--lib/subunit/python/subunit/tests/test_subunit_stats.py84
-rw-r--r--lib/subunit/python/subunit/tests/test_subunit_tags.py69
-rw-r--r--lib/subunit/python/subunit/tests/test_tap2subunit.py445
-rw-r--r--lib/subunit/python/subunit/tests/test_test_protocol.py1337
-rw-r--r--lib/subunit/python/subunit/tests/test_test_results.py572
14 files changed, 0 insertions, 3462 deletions
diff --git a/lib/subunit/python/subunit/tests/TestUtil.py b/lib/subunit/python/subunit/tests/TestUtil.py
deleted file mode 100644
index 39d901e0a9..0000000000
--- a/lib/subunit/python/subunit/tests/TestUtil.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright (c) 2004 Canonical Limited
-# Author: Robert Collins <robert.collins@canonical.com>
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-#
-
-import sys
-import logging
-import unittest
-
-
-class LogCollector(logging.Handler):
- def __init__(self):
- logging.Handler.__init__(self)
- self.records=[]
- def emit(self, record):
- self.records.append(record.getMessage())
-
-
-def makeCollectingLogger():
- """I make a logger instance that collects its logs for programmatic analysis
- -> (logger, collector)"""
- logger=logging.Logger("collector")
- handler=LogCollector()
- handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
- logger.addHandler(handler)
- return logger, handler
-
-
-def visitTests(suite, visitor):
- """A foreign method for visiting the tests in a test suite."""
- for test in suite._tests:
- #Abusing types to avoid monkey patching unittest.TestCase.
- # Maybe that would be better?
- try:
- test.visit(visitor)
- except AttributeError:
- if isinstance(test, unittest.TestCase):
- visitor.visitCase(test)
- elif isinstance(test, unittest.TestSuite):
- visitor.visitSuite(test)
- visitTests(test, visitor)
- else:
- print ("unvisitable non-unittest.TestCase element %r (%r)" % (test, test.__class__))
-
-
-class TestSuite(unittest.TestSuite):
- """I am an extended TestSuite with a visitor interface.
- This is primarily to allow filtering of tests - and suites or
- more in the future. An iterator of just tests wouldn't scale..."""
-
- def visit(self, visitor):
- """visit the composite. Visiting is depth-first.
- current callbacks are visitSuite and visitCase."""
- visitor.visitSuite(self)
- visitTests(self, visitor)
-
-
-class TestLoader(unittest.TestLoader):
- """Custome TestLoader to set the right TestSuite class."""
- suiteClass = TestSuite
-
-class TestVisitor(object):
- """A visitor for Tests"""
- def visitSuite(self, aTestSuite):
- pass
- def visitCase(self, aTestCase):
- pass
diff --git a/lib/subunit/python/subunit/tests/__init__.py b/lib/subunit/python/subunit/tests/__init__.py
deleted file mode 100644
index e0e1eb1b04..0000000000
--- a/lib/subunit/python/subunit/tests/__init__.py
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-from subunit.tests import (
- TestUtil,
- test_chunked,
- test_details,
- test_progress_model,
- test_run,
- test_subunit_filter,
- test_subunit_stats,
- test_subunit_tags,
- test_tap2subunit,
- test_test_protocol,
- test_test_results,
- )
-
-def test_suite():
- result = TestUtil.TestSuite()
- result.addTest(test_chunked.test_suite())
- result.addTest(test_details.test_suite())
- result.addTest(test_progress_model.test_suite())
- result.addTest(test_test_results.test_suite())
- result.addTest(test_test_protocol.test_suite())
- result.addTest(test_tap2subunit.test_suite())
- result.addTest(test_subunit_filter.test_suite())
- result.addTest(test_subunit_tags.test_suite())
- result.addTest(test_subunit_stats.test_suite())
- result.addTest(test_run.test_suite())
- return result
diff --git a/lib/subunit/python/subunit/tests/sample-script.py b/lib/subunit/python/subunit/tests/sample-script.py
deleted file mode 100755
index 91838f6d6f..0000000000
--- a/lib/subunit/python/subunit/tests/sample-script.py
+++ /dev/null
@@ -1,21 +0,0 @@
-#!/usr/bin/env python
-import sys
-if sys.platform == "win32":
- import msvcrt, os
- msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)
-if len(sys.argv) == 2:
- # subunit.tests.test_test_protocol.TestExecTestCase.test_sample_method_args
- # uses this code path to be sure that the arguments were passed to
- # sample-script.py
- print("test fail")
- print("error fail")
- sys.exit(0)
-print("test old mcdonald")
-print("success old mcdonald")
-print("test bing crosby")
-print("failure bing crosby [")
-print("foo.c:53:ERROR invalid state")
-print("]")
-print("test an error")
-print("error an error")
-sys.exit(0)
diff --git a/lib/subunit/python/subunit/tests/sample-two-script.py b/lib/subunit/python/subunit/tests/sample-two-script.py
deleted file mode 100755
index fc73dfc409..0000000000
--- a/lib/subunit/python/subunit/tests/sample-two-script.py
+++ /dev/null
@@ -1,7 +0,0 @@
-#!/usr/bin/env python
-import sys
-print("test old mcdonald")
-print("success old mcdonald")
-print("test bing crosby")
-print("success bing crosby")
-sys.exit(0)
diff --git a/lib/subunit/python/subunit/tests/test_chunked.py b/lib/subunit/python/subunit/tests/test_chunked.py
deleted file mode 100644
index e0742f1af3..0000000000
--- a/lib/subunit/python/subunit/tests/test_chunked.py
+++ /dev/null
@@ -1,152 +0,0 @@
-#
-# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
-# Copyright (C) 2011 Martin Pool <mbp@sourcefrog.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-import unittest
-
-from testtools.compat import _b, BytesIO
-
-import subunit.chunked
-
-
-def test_suite():
- loader = subunit.tests.TestUtil.TestLoader()
- result = loader.loadTestsFromName(__name__)
- return result
-
-
-class TestDecode(unittest.TestCase):
-
- def setUp(self):
- unittest.TestCase.setUp(self)
- self.output = BytesIO()
- self.decoder = subunit.chunked.Decoder(self.output)
-
- def test_close_read_length_short_errors(self):
- self.assertRaises(ValueError, self.decoder.close)
-
- def test_close_body_short_errors(self):
- self.assertEqual(None, self.decoder.write(_b('2\r\na')))
- self.assertRaises(ValueError, self.decoder.close)
-
- def test_close_body_buffered_data_errors(self):
- self.assertEqual(None, self.decoder.write(_b('2\r')))
- self.assertRaises(ValueError, self.decoder.close)
-
- def test_close_after_finished_stream_safe(self):
- self.assertEqual(None, self.decoder.write(_b('2\r\nab')))
- self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
- self.decoder.close()
-
- def test_decode_nothing(self):
- self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
- self.assertEqual(_b(''), self.output.getvalue())
-
- def test_decode_serialised_form(self):
- self.assertEqual(None, self.decoder.write(_b("F\r\n")))
- self.assertEqual(None, self.decoder.write(_b("serialised\n")))
- self.assertEqual(_b(''), self.decoder.write(_b("form0\r\n")))
-
- def test_decode_short(self):
- self.assertEqual(_b(''), self.decoder.write(_b('3\r\nabc0\r\n')))
- self.assertEqual(_b('abc'), self.output.getvalue())
-
- def test_decode_combines_short(self):
- self.assertEqual(_b(''), self.decoder.write(_b('6\r\nabcdef0\r\n')))
- self.assertEqual(_b('abcdef'), self.output.getvalue())
-
- def test_decode_excess_bytes_from_write(self):
- self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
- self.assertEqual(_b('abc'), self.output.getvalue())
-
- def test_decode_write_after_finished_errors(self):
- self.assertEqual(_b('1234'), self.decoder.write(_b('3\r\nabc0\r\n1234')))
- self.assertRaises(ValueError, self.decoder.write, _b(''))
-
- def test_decode_hex(self):
- self.assertEqual(_b(''), self.decoder.write(_b('A\r\n12345678900\r\n')))
- self.assertEqual(_b('1234567890'), self.output.getvalue())
-
- def test_decode_long_ranges(self):
- self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
- self.assertEqual(None, self.decoder.write(_b('1' * 65536)))
- self.assertEqual(None, self.decoder.write(_b('10000\r\n')))
- self.assertEqual(None, self.decoder.write(_b('2' * 65536)))
- self.assertEqual(_b(''), self.decoder.write(_b('0\r\n')))
- self.assertEqual(_b('1' * 65536 + '2' * 65536), self.output.getvalue())
-
- def test_decode_newline_nonstrict(self):
- """Tolerate chunk markers with no CR character."""
- # From <http://pad.lv/505078>
- self.decoder = subunit.chunked.Decoder(self.output, strict=False)
- self.assertEqual(None, self.decoder.write(_b('a\n')))
- self.assertEqual(None, self.decoder.write(_b('abcdeabcde')))
- self.assertEqual(_b(''), self.decoder.write(_b('0\n')))
- self.assertEqual(_b('abcdeabcde'), self.output.getvalue())
-
- def test_decode_strict_newline_only(self):
- """Reject chunk markers with no CR character in strict mode."""
- # From <http://pad.lv/505078>
- self.assertRaises(ValueError,
- self.decoder.write, _b('a\n'))
-
- def test_decode_strict_multiple_crs(self):
- self.assertRaises(ValueError,
- self.decoder.write, _b('a\r\r\n'))
-
- def test_decode_short_header(self):
- self.assertRaises(ValueError,
- self.decoder.write, _b('\n'))
-
-
-class TestEncode(unittest.TestCase):
-
- def setUp(self):
- unittest.TestCase.setUp(self)
- self.output = BytesIO()
- self.encoder = subunit.chunked.Encoder(self.output)
-
- def test_encode_nothing(self):
- self.encoder.close()
- self.assertEqual(_b('0\r\n'), self.output.getvalue())
-
- def test_encode_empty(self):
- self.encoder.write(_b(''))
- self.encoder.close()
- self.assertEqual(_b('0\r\n'), self.output.getvalue())
-
- def test_encode_short(self):
- self.encoder.write(_b('abc'))
- self.encoder.close()
- self.assertEqual(_b('3\r\nabc0\r\n'), self.output.getvalue())
-
- def test_encode_combines_short(self):
- self.encoder.write(_b('abc'))
- self.encoder.write(_b('def'))
- self.encoder.close()
- self.assertEqual(_b('6\r\nabcdef0\r\n'), self.output.getvalue())
-
- def test_encode_over_9_is_in_hex(self):
- self.encoder.write(_b('1234567890'))
- self.encoder.close()
- self.assertEqual(_b('A\r\n12345678900\r\n'), self.output.getvalue())
-
- def test_encode_long_ranges_not_combined(self):
- self.encoder.write(_b('1' * 65536))
- self.encoder.write(_b('2' * 65536))
- self.encoder.close()
- self.assertEqual(_b('10000\r\n' + '1' * 65536 + '10000\r\n' +
- '2' * 65536 + '0\r\n'), self.output.getvalue())
diff --git a/lib/subunit/python/subunit/tests/test_details.py b/lib/subunit/python/subunit/tests/test_details.py
deleted file mode 100644
index 746aa041e5..0000000000
--- a/lib/subunit/python/subunit/tests/test_details.py
+++ /dev/null
@@ -1,112 +0,0 @@
-#
-# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-import unittest
-
-from testtools.compat import _b, StringIO
-
-import subunit.tests
-from subunit import content, content_type, details
-
-
-def test_suite():
- loader = subunit.tests.TestUtil.TestLoader()
- result = loader.loadTestsFromName(__name__)
- return result
-
-
-class TestSimpleDetails(unittest.TestCase):
-
- def test_lineReceived(self):
- parser = details.SimpleDetailsParser(None)
- parser.lineReceived(_b("foo\n"))
- parser.lineReceived(_b("bar\n"))
- self.assertEqual(_b("foo\nbar\n"), parser._message)
-
- def test_lineReceived_escaped_bracket(self):
- parser = details.SimpleDetailsParser(None)
- parser.lineReceived(_b("foo\n"))
- parser.lineReceived(_b(" ]are\n"))
- parser.lineReceived(_b("bar\n"))
- self.assertEqual(_b("foo\n]are\nbar\n"), parser._message)
-
- def test_get_message(self):
- parser = details.SimpleDetailsParser(None)
- self.assertEqual(_b(""), parser.get_message())
-
- def test_get_details(self):
- parser = details.SimpleDetailsParser(None)
- traceback = ""
- expected = {}
- expected['traceback'] = content.Content(
- content_type.ContentType("text", "x-traceback",
- {'charset': 'utf8'}),
- lambda:[_b("")])
- found = parser.get_details()
- self.assertEqual(expected.keys(), found.keys())
- self.assertEqual(expected['traceback'].content_type,
- found['traceback'].content_type)
- self.assertEqual(_b('').join(expected['traceback'].iter_bytes()),
- _b('').join(found['traceback'].iter_bytes()))
-
- def test_get_details_skip(self):
- parser = details.SimpleDetailsParser(None)
- traceback = ""
- expected = {}
- expected['reason'] = content.Content(
- content_type.ContentType("text", "plain"),
- lambda:[_b("")])
- found = parser.get_details("skip")
- self.assertEqual(expected, found)
-
- def test_get_details_success(self):
- parser = details.SimpleDetailsParser(None)
- traceback = ""
- expected = {}
- expected['message'] = content.Content(
- content_type.ContentType("text", "plain"),
- lambda:[_b("")])
- found = parser.get_details("success")
- self.assertEqual(expected, found)
-
-
-class TestMultipartDetails(unittest.TestCase):
-
- def test_get_message_is_None(self):
- parser = details.MultipartDetailsParser(None)
- self.assertEqual(None, parser.get_message())
-
- def test_get_details(self):
- parser = details.MultipartDetailsParser(None)
- self.assertEqual({}, parser.get_details())
-
- def test_parts(self):
- parser = details.MultipartDetailsParser(None)
- parser.lineReceived(_b("Content-Type: text/plain\n"))
- parser.lineReceived(_b("something\n"))
- parser.lineReceived(_b("F\r\n"))
- parser.lineReceived(_b("serialised\n"))
- parser.lineReceived(_b("form0\r\n"))
- expected = {}
- expected['something'] = content.Content(
- content_type.ContentType("text", "plain"),
- lambda:[_b("serialised\nform")])
- found = parser.get_details()
- self.assertEqual(expected.keys(), found.keys())
- self.assertEqual(expected['something'].content_type,
- found['something'].content_type)
- self.assertEqual(_b('').join(expected['something'].iter_bytes()),
- _b('').join(found['something'].iter_bytes()))
diff --git a/lib/subunit/python/subunit/tests/test_progress_model.py b/lib/subunit/python/subunit/tests/test_progress_model.py
deleted file mode 100644
index 76200c6107..0000000000
--- a/lib/subunit/python/subunit/tests/test_progress_model.py
+++ /dev/null
@@ -1,118 +0,0 @@
-#
-# subunit: extensions to Python unittest to get test results from subprocesses.
-# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-import unittest
-
-import subunit
-from subunit.progress_model import ProgressModel
-
-
-class TestProgressModel(unittest.TestCase):
-
- def assertProgressSummary(self, pos, total, progress):
- """Assert that a progress model has reached a particular point."""
- self.assertEqual(pos, progress.pos())
- self.assertEqual(total, progress.width())
-
- def test_new_progress_0_0(self):
- progress = ProgressModel()
- self.assertProgressSummary(0, 0, progress)
-
- def test_advance_0_0(self):
- progress = ProgressModel()
- progress.advance()
- self.assertProgressSummary(1, 0, progress)
-
- def test_advance_1_0(self):
- progress = ProgressModel()
- progress.advance()
- self.assertProgressSummary(1, 0, progress)
-
- def test_set_width_absolute(self):
- progress = ProgressModel()
- progress.set_width(10)
- self.assertProgressSummary(0, 10, progress)
-
- def test_set_width_absolute_preserves_pos(self):
- progress = ProgressModel()
- progress.advance()
- progress.set_width(2)
- self.assertProgressSummary(1, 2, progress)
-
- def test_adjust_width(self):
- progress = ProgressModel()
- progress.adjust_width(10)
- self.assertProgressSummary(0, 10, progress)
- progress.adjust_width(-10)
- self.assertProgressSummary(0, 0, progress)
-
- def test_adjust_width_preserves_pos(self):
- progress = ProgressModel()
- progress.advance()
- progress.adjust_width(10)
- self.assertProgressSummary(1, 10, progress)
- progress.adjust_width(-10)
- self.assertProgressSummary(1, 0, progress)
-
- def test_push_preserves_progress(self):
- progress = ProgressModel()
- progress.adjust_width(3)
- progress.advance()
- progress.push()
- self.assertProgressSummary(1, 3, progress)
-
- def test_advance_advances_substack(self):
- progress = ProgressModel()
- progress.adjust_width(3)
- progress.advance()
- progress.push()
- progress.adjust_width(1)
- progress.advance()
- self.assertProgressSummary(2, 3, progress)
-
- def test_adjust_width_adjusts_substack(self):
- progress = ProgressModel()
- progress.adjust_width(3)
- progress.advance()
- progress.push()
- progress.adjust_width(2)
- progress.advance()
- self.assertProgressSummary(3, 6, progress)
-
- def test_set_width_adjusts_substack(self):
- progress = ProgressModel()
- progress.adjust_width(3)
- progress.advance()
- progress.push()
- progress.set_width(2)
- progress.advance()
- self.assertProgressSummary(3, 6, progress)
-
- def test_pop_restores_progress(self):
- progress = ProgressModel()
- progress.adjust_width(3)
- progress.advance()
- progress.push()
- progress.adjust_width(1)
- progress.advance()
- progress.pop()
- self.assertProgressSummary(1, 3, progress)
-
-
-def test_suite():
- loader = subunit.tests.TestUtil.TestLoader()
- result = loader.loadTestsFromName(__name__)
- return result
diff --git a/lib/subunit/python/subunit/tests/test_run.py b/lib/subunit/python/subunit/tests/test_run.py
deleted file mode 100644
index 10519ed086..0000000000
--- a/lib/subunit/python/subunit/tests/test_run.py
+++ /dev/null
@@ -1,52 +0,0 @@
-#
-# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2011 Robert Collins <robertc@robertcollins.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-from testtools.compat import BytesIO
-import unittest
-
-from testtools import PlaceHolder
-
-import subunit
-from subunit.run import SubunitTestRunner
-
-
-def test_suite():
- loader = subunit.tests.TestUtil.TestLoader()
- result = loader.loadTestsFromName(__name__)
- return result
-
-
-class TimeCollectingTestResult(unittest.TestResult):
-
- def __init__(self, *args, **kwargs):
- super(TimeCollectingTestResult, self).__init__(*args, **kwargs)
- self.time_called = []
-
- def time(self, a_time):
- self.time_called.append(a_time)
-
-
-class TestSubunitTestRunner(unittest.TestCase):
-
- def test_includes_timing_output(self):
- io = BytesIO()
- runner = SubunitTestRunner(stream=io)
- test = PlaceHolder('name')
- runner.run(test)
- client = TimeCollectingTestResult()
- io.seek(0)
- subunit.TestProtocolServer(client).readFrom(io)
- self.assertTrue(len(client.time_called) > 0)
diff --git a/lib/subunit/python/subunit/tests/test_subunit_filter.py b/lib/subunit/python/subunit/tests/test_subunit_filter.py
deleted file mode 100644
index 33b924824d..0000000000
--- a/lib/subunit/python/subunit/tests/test_subunit_filter.py
+++ /dev/null
@@ -1,370 +0,0 @@
-#
-# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-"""Tests for subunit.TestResultFilter."""
-
-from datetime import datetime
-import os
-import subprocess
-import sys
-from subunit import iso8601
-import unittest
-
-from testtools import TestCase
-from testtools.compat import _b, BytesIO
-from testtools.testresult.doubles import ExtendedTestResult
-
-import subunit
-from subunit.test_results import make_tag_filter, TestResultFilter
-
-
-class TestTestResultFilter(TestCase):
- """Test for TestResultFilter, a TestResult object which filters tests."""
-
- # While TestResultFilter works on python objects, using a subunit stream
- # is an easy pithy way of getting a series of test objects to call into
- # the TestResult, and as TestResultFilter is intended for use with subunit
- # also has the benefit of detecting any interface skew issues.
- example_subunit_stream = _b("""\
-tags: global
-test passed
-success passed
-test failed
-tags: local
-failure failed
-test error
-error error [
-error details
-]
-test skipped
-skip skipped
-test todo
-xfail todo
-""")
-
- def run_tests(self, result_filter, input_stream=None):
- """Run tests through the given filter.
-
- :param result_filter: A filtering TestResult object.
- :param input_stream: Bytes of subunit stream data. If not provided,
- uses TestTestResultFilter.example_subunit_stream.
- """
- if input_stream is None:
- input_stream = self.example_subunit_stream
- test = subunit.ProtocolTestCase(BytesIO(input_stream))
- test.run(result_filter)
-
- def test_default(self):
- """The default is to exclude success and include everything else."""
- filtered_result = unittest.TestResult()
- result_filter = TestResultFilter(filtered_result)
- self.run_tests(result_filter)
- # skips are seen as success by default python TestResult.
- self.assertEqual(['error'],
- [error[0].id() for error in filtered_result.errors])
- self.assertEqual(['failed'],
- [failure[0].id() for failure in
- filtered_result.failures])
- self.assertEqual(4, filtered_result.testsRun)
-
- def test_tag_filter(self):
- tag_filter = make_tag_filter(['global'], ['local'])
- result = ExtendedTestResult()
- result_filter = TestResultFilter(
- result, filter_success=False, filter_predicate=tag_filter)
- self.run_tests(result_filter)
- tests_included = [
- event[1] for event in result._events if event[0] == 'startTest']
- tests_expected = list(map(
- subunit.RemotedTestCase,
- ['passed', 'error', 'skipped', 'todo']))
- self.assertEquals(tests_expected, tests_included)
-
- def test_tags_tracked_correctly(self):
- tag_filter = make_tag_filter(['a'], [])
- result = ExtendedTestResult()
- result_filter = TestResultFilter(
- result, filter_success=False, filter_predicate=tag_filter)
- input_stream = _b(
- "test: foo\n"
- "tags: a\n"
- "successful: foo\n"
- "test: bar\n"
- "successful: bar\n")
- self.run_tests(result_filter, input_stream)
- foo = subunit.RemotedTestCase('foo')
- self.assertEquals(
- [('startTest', foo),
- ('tags', set(['a']), set()),
- ('addSuccess', foo),
- ('stopTest', foo),
- ],
- result._events)
-
- def test_exclude_errors(self):
- filtered_result = unittest.TestResult()
- result_filter = TestResultFilter(filtered_result, filter_error=True)
- self.run_tests(result_filter)
- # skips are seen as errors by default python TestResult.
- self.assertEqual([], filtered_result.errors)
- self.assertEqual(['failed'],
- [failure[0].id() for failure in
- filtered_result.failures])
- self.assertEqual(3, filtered_result.testsRun)
-
- def test_fixup_expected_failures(self):
- filtered_result = unittest.TestResult()
- result_filter = TestResultFilter(filtered_result,
- fixup_expected_failures=set(["failed"]))
- self.run_tests(result_filter)
- self.assertEqual(['failed', 'todo'],
- [failure[0].id() for failure in filtered_result.expectedFailures])
- self.assertEqual([], filtered_result.failures)
- self.assertEqual(4, filtered_result.testsRun)
-
- def test_fixup_expected_errors(self):
- filtered_result = unittest.TestResult()
- result_filter = TestResultFilter(filtered_result,
- fixup_expected_failures=set(["error"]))
- self.run_tests(result_filter)
- self.assertEqual(['error', 'todo'],
- [failure[0].id() for failure in filtered_result.expectedFailures])
- self.assertEqual([], filtered_result.errors)
- self.assertEqual(4, filtered_result.testsRun)
-
- def test_fixup_unexpected_success(self):
- filtered_result = unittest.TestResult()
- result_filter = TestResultFilter(filtered_result, filter_success=False,
- fixup_expected_failures=set(["passed"]))
- self.run_tests(result_filter)
- self.assertEqual(['passed'],
- [passed.id() for passed in filtered_result.unexpectedSuccesses])
- self.assertEqual(5, filtered_result.testsRun)
-
- def test_exclude_failure(self):
- filtered_result = unittest.TestResult()
- result_filter = TestResultFilter(filtered_result, filter_failure=True)
- self.run_tests(result_filter)
- self.assertEqual(['error'],
- [error[0].id() for error in filtered_result.errors])
- self.assertEqual([],
- [failure[0].id() for failure in
- filtered_result.failures])
- self.assertEqual(3, filtered_result.testsRun)
-
- def test_exclude_skips(self):
- filtered_result = subunit.TestResultStats(None)
- result_filter = TestResultFilter(filtered_result, filter_skip=True)
- self.run_tests(result_filter)
- self.assertEqual(0, filtered_result.skipped_tests)
- self.assertEqual(2, filtered_result.failed_tests)
- self.assertEqual(3, filtered_result.testsRun)
-
- def test_include_success(self):
- """Successes can be included if requested."""
- filtered_result = unittest.TestResult()
- result_filter = TestResultFilter(filtered_result,
- filter_success=False)
- self.run_tests(result_filter)
- self.assertEqual(['error'],
- [error[0].id() for error in filtered_result.errors])
- self.assertEqual(['failed'],
- [failure[0].id() for failure in
- filtered_result.failures])
- self.assertEqual(5, filtered_result.testsRun)
-
- def test_filter_predicate(self):
- """You can filter by predicate callbacks"""
- # 0.0.7 and earlier did not support the 'tags' parameter, so we need
- # to test that we still support behaviour without it.
- filtered_result = unittest.TestResult()
- def filter_cb(test, outcome, err, details):
- return outcome == 'success'
- result_filter = TestResultFilter(filtered_result,
- filter_predicate=filter_cb,
- filter_success=False)
- self.run_tests(result_filter)
- # Only success should pass
- self.assertEqual(1, filtered_result.testsRun)
-
- def test_filter_predicate_with_tags(self):
- """You can filter by predicate callbacks that accept tags"""
- filtered_result = unittest.TestResult()
- def filter_cb(test, outcome, err, details, tags):
- return outcome == 'success'
- result_filter = TestResultFilter(filtered_result,
- filter_predicate=filter_cb,
- filter_success=False)
- self.run_tests(result_filter)
- # Only success should pass
- self.assertEqual(1, filtered_result.testsRun)
-
- def test_time_ordering_preserved(self):
- # Passing a subunit stream through TestResultFilter preserves the
- # relative ordering of 'time' directives and any other subunit
- # directives that are still included.
- date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
- date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
- date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
- subunit_stream = _b('\n'.join([
- "time: %s",
- "test: foo",
- "time: %s",
- "error: foo",
- "time: %s",
- ""]) % (date_a, date_b, date_c))
- result = ExtendedTestResult()
- result_filter = TestResultFilter(result)
- self.run_tests(result_filter, subunit_stream)
- foo = subunit.RemotedTestCase('foo')
- self.maxDiff = None
- self.assertEqual(
- [('time', date_a),
- ('time', date_b),
- ('startTest', foo),
- ('addError', foo, {}),
- ('stopTest', foo),
- ('time', date_c)], result._events)
-
- def test_time_passes_through_filtered_tests(self):
- # Passing a subunit stream through TestResultFilter preserves 'time'
- # directives even if a specific test is filtered out.
- date_a = datetime(year=2000, month=1, day=1, tzinfo=iso8601.UTC)
- date_b = datetime(year=2000, month=1, day=2, tzinfo=iso8601.UTC)
- date_c = datetime(year=2000, month=1, day=3, tzinfo=iso8601.UTC)
- subunit_stream = _b('\n'.join([
- "time: %s",
- "test: foo",
- "time: %s",
- "success: foo",
- "time: %s",
- ""]) % (date_a, date_b, date_c))
- result = ExtendedTestResult()
- result_filter = TestResultFilter(result)
- result_filter.startTestRun()
- self.run_tests(result_filter, subunit_stream)
- result_filter.stopTestRun()
- foo = subunit.RemotedTestCase('foo')
- self.maxDiff = None
- self.assertEqual(
- [('startTestRun',),
- ('time', date_a),
- ('time', date_c),
- ('stopTestRun',),], result._events)
-
- def test_skip_preserved(self):
- subunit_stream = _b('\n'.join([
- "test: foo",
- "skip: foo",
- ""]))
- result = ExtendedTestResult()
- result_filter = TestResultFilter(result)
- self.run_tests(result_filter, subunit_stream)
- foo = subunit.RemotedTestCase('foo')
- self.assertEquals(
- [('startTest', foo),
- ('addSkip', foo, {}),
- ('stopTest', foo), ], result._events)
-
- if sys.version_info < (2, 7):
- # These tests require Python >=2.7.
- del test_fixup_expected_failures, test_fixup_expected_errors, test_fixup_unexpected_success
-
-
-class TestFilterCommand(TestCase):
-
- example_subunit_stream = _b("""\
-tags: global
-test passed
-success passed
-test failed
-tags: local
-failure failed
-test error
-error error [
-error details
-]
-test skipped
-skip skipped
-test todo
-xfail todo
-""")
-
- def run_command(self, args, stream):
- root = os.path.dirname(
- os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
- script_path = os.path.join(root, 'filters', 'subunit-filter')
- command = [sys.executable, script_path] + list(args)
- ps = subprocess.Popen(
- command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- out, err = ps.communicate(stream)
- if ps.returncode != 0:
- raise RuntimeError("%s failed: %s" % (command, err))
- return out
-
- def to_events(self, stream):
- test = subunit.ProtocolTestCase(BytesIO(stream))
- result = ExtendedTestResult()
- test.run(result)
- return result._events
-
- def test_default(self):
- output = self.run_command([], _b(
- "test: foo\n"
- "skip: foo\n"
- ))
- events = self.to_events(output)
- foo = subunit.RemotedTestCase('foo')
- self.assertEqual(
- [('startTest', foo),
- ('addSkip', foo, {}),
- ('stopTest', foo)],
- events)
-
- def test_tags(self):
- output = self.run_command(['-s', '--with-tag', 'a'], _b(
- "tags: a\n"
- "test: foo\n"
- "success: foo\n"
- "tags: -a\n"
- "test: bar\n"
- "success: bar\n"
- "test: baz\n"
- "tags: a\n"
- "success: baz\n"
- ))
- events = self.to_events(output)
- foo = subunit.RemotedTestCase('foo')
- baz = subunit.RemotedTestCase('baz')
- self.assertEqual(
- [('tags', set(['a']), set()),
- ('startTest', foo),
- ('addSuccess', foo),
- ('stopTest', foo),
- ('tags', set(), set(['a'])),
- ('startTest', baz),
- ('tags', set(['a']), set()),
- ('addSuccess', baz),
- ('stopTest', baz),
- ],
- events)
-
-
-def test_suite():
- loader = subunit.tests.TestUtil.TestLoader()
- result = loader.loadTestsFromName(__name__)
- return result
diff --git a/lib/subunit/python/subunit/tests/test_subunit_stats.py b/lib/subunit/python/subunit/tests/test_subunit_stats.py
deleted file mode 100644
index 6fd3301060..0000000000
--- a/lib/subunit/python/subunit/tests/test_subunit_stats.py
+++ /dev/null
@@ -1,84 +0,0 @@
-#
-# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-"""Tests for subunit.TestResultStats."""
-
-import unittest
-
-from testtools.compat import _b, BytesIO, StringIO
-
-import subunit
-
-
-class TestTestResultStats(unittest.TestCase):
- """Test for TestResultStats, a TestResult object that generates stats."""
-
- def setUp(self):
- self.output = StringIO()
- self.result = subunit.TestResultStats(self.output)
- self.input_stream = BytesIO()
- self.test = subunit.ProtocolTestCase(self.input_stream)
-
- def test_stats_empty(self):
- self.test.run(self.result)
- self.assertEqual(0, self.result.total_tests)
- self.assertEqual(0, self.result.passed_tests)
- self.assertEqual(0, self.result.failed_tests)
- self.assertEqual(set(), self.result.seen_tags)
-
- def setUpUsedStream(self):
- self.input_stream.write(_b("""tags: global
-test passed
-success passed
-test failed
-tags: local
-failure failed
-test error
-error error
-test skipped
-skip skipped
-test todo
-xfail todo
-"""))
- self.input_stream.seek(0)
- self.test.run(self.result)
-
- def test_stats_smoke_everything(self):
- # Statistics are calculated usefully.
- self.setUpUsedStream()
- self.assertEqual(5, self.result.total_tests)
- self.assertEqual(2, self.result.passed_tests)
- self.assertEqual(2, self.result.failed_tests)
- self.assertEqual(1, self.result.skipped_tests)
- self.assertEqual(set(["global", "local"]), self.result.seen_tags)
-
- def test_stat_formatting(self):
- expected = ("""
-Total tests: 5
-Passed tests: 2
-Failed tests: 2
-Skipped tests: 1
-Seen tags: global, local
-""")[1:]
- self.setUpUsedStream()
- self.result.formatStats()
- self.assertEqual(expected, self.output.getvalue())
-
-
-def test_suite():
- loader = subunit.tests.TestUtil.TestLoader()
- result = loader.loadTestsFromName(__name__)
- return result
diff --git a/lib/subunit/python/subunit/tests/test_subunit_tags.py b/lib/subunit/python/subunit/tests/test_subunit_tags.py
deleted file mode 100644
index c98506a737..0000000000
--- a/lib/subunit/python/subunit/tests/test_subunit_tags.py
+++ /dev/null
@@ -1,69 +0,0 @@
-#
-# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-"""Tests for subunit.tag_stream."""
-
-import unittest
-
-from testtools.compat import StringIO
-
-import subunit
-import subunit.test_results
-
-
-class TestSubUnitTags(unittest.TestCase):
-
- def setUp(self):
- self.original = StringIO()
- self.filtered = StringIO()
-
- def test_add_tag(self):
- self.original.write("tags: foo\n")
- self.original.write("test: test\n")
- self.original.write("tags: bar -quux\n")
- self.original.write("success: test\n")
- self.original.seek(0)
- result = subunit.tag_stream(self.original, self.filtered, ["quux"])
- self.assertEqual([
- "tags: quux",
- "tags: foo",
- "test: test",
- "tags: bar",
- "success: test",
- ],
- self.filtered.getvalue().splitlines())
-
- def test_remove_tag(self):
- self.original.write("tags: foo\n")
- self.original.write("test: test\n")
- self.original.write("tags: bar -quux\n")
- self.original.write("success: test\n")
- self.original.seek(0)
- result = subunit.tag_stream(self.original, self.filtered, ["-bar"])
- self.assertEqual([
- "tags: -bar",
- "tags: foo",
- "test: test",
- "tags: -quux",
- "success: test",
- ],
- self.filtered.getvalue().splitlines())
-
-
-def test_suite():
- loader = subunit.tests.TestUtil.TestLoader()
- result = loader.loadTestsFromName(__name__)
- return result
diff --git a/lib/subunit/python/subunit/tests/test_tap2subunit.py b/lib/subunit/python/subunit/tests/test_tap2subunit.py
deleted file mode 100644
index 11bc1916b3..0000000000
--- a/lib/subunit/python/subunit/tests/test_tap2subunit.py
+++ /dev/null
@@ -1,445 +0,0 @@
-#
-# subunit: extensions to python unittest to get test results from subprocesses.
-# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-"""Tests for TAP2SubUnit."""
-
-import unittest
-
-from testtools.compat import StringIO
-
-import subunit
-
-
-class TestTAP2SubUnit(unittest.TestCase):
- """Tests for TAP2SubUnit.
-
- These tests test TAP string data in, and subunit string data out.
- This is ok because the subunit protocol is intended to be stable,
- but it might be easier/pithier to write tests against TAP string in,
- parsed subunit objects out (by hooking the subunit stream to a subunit
- protocol server.
- """
-
- def setUp(self):
- self.tap = StringIO()
- self.subunit = StringIO()
-
- def test_skip_entire_file(self):
- # A file
- # 1..- # Skipped: comment
- # results in a single skipped test.
- self.tap.write("1..0 # Skipped: entire file skipped\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test file skip",
- "skip file skip [",
- "Skipped: entire file skipped",
- "]",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_ok_test_pass(self):
- # A file
- # ok
- # results in a passed test with name 'test 1' (a synthetic name as tap
- # does not require named fixtures - it is the first test in the tap
- # stream).
- self.tap.write("ok\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1",
- "success test 1",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_ok_test_number_pass(self):
- # A file
- # ok 1
- # results in a passed test with name 'test 1'
- self.tap.write("ok 1\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1",
- "success test 1",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_ok_test_number_description_pass(self):
- # A file
- # ok 1 - There is a description
- # results in a passed test with name 'test 1 - There is a description'
- self.tap.write("ok 1 - There is a description\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1 - There is a description",
- "success test 1 - There is a description",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_ok_test_description_pass(self):
- # A file
- # ok There is a description
- # results in a passed test with name 'test 1 There is a description'
- self.tap.write("ok There is a description\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1 There is a description",
- "success test 1 There is a description",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_ok_SKIP_skip(self):
- # A file
- # ok # SKIP
- # results in a skkip test with name 'test 1'
- self.tap.write("ok # SKIP\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1",
- "skip test 1",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_ok_skip_number_comment_lowercase(self):
- self.tap.write("ok 1 # skip no samba environment available, skipping compilation\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1",
- "skip test 1 [",
- "no samba environment available, skipping compilation",
- "]"
- ],
- self.subunit.getvalue().splitlines())
-
- def test_ok_number_description_SKIP_skip_comment(self):
- # A file
- # ok 1 foo # SKIP Not done yet
- # results in a skip test with name 'test 1 foo' and a log of
- # Not done yet
- self.tap.write("ok 1 foo # SKIP Not done yet\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1 foo",
- "skip test 1 foo [",
- "Not done yet",
- "]",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_ok_SKIP_skip_comment(self):
- # A file
- # ok # SKIP Not done yet
- # results in a skip test with name 'test 1' and a log of Not done yet
- self.tap.write("ok # SKIP Not done yet\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1",
- "skip test 1 [",
- "Not done yet",
- "]",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_ok_TODO_xfail(self):
- # A file
- # ok # TODO
- # results in a xfail test with name 'test 1'
- self.tap.write("ok # TODO\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1",
- "xfail test 1",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_ok_TODO_xfail_comment(self):
- # A file
- # ok # TODO Not done yet
- # results in a xfail test with name 'test 1' and a log of Not done yet
- self.tap.write("ok # TODO Not done yet\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1",
- "xfail test 1 [",
- "Not done yet",
- "]",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_bail_out_errors(self):
- # A file with line in it
- # Bail out! COMMENT
- # is treated as an error
- self.tap.write("ok 1 foo\n")
- self.tap.write("Bail out! Lifejacket engaged\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- "test test 1 foo",
- "success test 1 foo",
- "test Bail out! Lifejacket engaged",
- "error Bail out! Lifejacket engaged",
- ],
- self.subunit.getvalue().splitlines())
-
- def test_missing_test_at_end_with_plan_adds_error(self):
- # A file
- # 1..3
- # ok first test
- # not ok third test
- # results in three tests, with the third being created
- self.tap.write('1..3\n')
- self.tap.write('ok first test\n')
- self.tap.write('not ok second test\n')
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- 'test test 1 first test',
- 'success test 1 first test',
- 'test test 2 second test',
- 'failure test 2 second test',
- 'test test 3',
- 'error test 3 [',
- 'test missing from TAP output',
- ']',
- ],
- self.subunit.getvalue().splitlines())
-
- def test_missing_test_with_plan_adds_error(self):
- # A file
- # 1..3
- # ok first test
- # not ok 3 third test
- # results in three tests, with the second being created
- self.tap.write('1..3\n')
- self.tap.write('ok first test\n')
- self.tap.write('not ok 3 third test\n')
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- 'test test 1 first test',
- 'success test 1 first test',
- 'test test 2',
- 'error test 2 [',
- 'test missing from TAP output',
- ']',
- 'test test 3 third test',
- 'failure test 3 third test',
- ],
- self.subunit.getvalue().splitlines())
-
- def test_missing_test_no_plan_adds_error(self):
- # A file
- # ok first test
- # not ok 3 third test
- # results in three tests, with the second being created
- self.tap.write('ok first test\n')
- self.tap.write('not ok 3 third test\n')
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- 'test test 1 first test',
- 'success test 1 first test',
- 'test test 2',
- 'error test 2 [',
- 'test missing from TAP output',
- ']',
- 'test test 3 third test',
- 'failure test 3 third test',
- ],
- self.subunit.getvalue().splitlines())
-
- def test_four_tests_in_a_row_trailing_plan(self):
- # A file
- # ok 1 - first test in a script with no plan at all
- # not ok 2 - second
- # ok 3 - third
- # not ok 4 - fourth
- # 1..4
- # results in four tests numbered and named
- self.tap.write('ok 1 - first test in a script with trailing plan\n')
- self.tap.write('not ok 2 - second\n')
- self.tap.write('ok 3 - third\n')
- self.tap.write('not ok 4 - fourth\n')
- self.tap.write('1..4\n')
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- 'test test 1 - first test in a script with trailing plan',
- 'success test 1 - first test in a script with trailing plan',
- 'test test 2 - second',
- 'failure test 2 - second',
- 'test test 3 - third',
- 'success test 3 - third',
- 'test test 4 - fourth',
- 'failure test 4 - fourth'
- ],
- self.subunit.getvalue().splitlines())
-
- def test_four_tests_in_a_row_with_plan(self):
- # A file
- # 1..4
- # ok 1 - first test in a script with no plan at all
- # not ok 2 - second
- # ok 3 - third
- # not ok 4 - fourth
- # results in four tests numbered and named
- self.tap.write('1..4\n')
- self.tap.write('ok 1 - first test in a script with a plan\n')
- self.tap.write('not ok 2 - second\n')
- self.tap.write('ok 3 - third\n')
- self.tap.write('not ok 4 - fourth\n')
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- 'test test 1 - first test in a script with a plan',
- 'success test 1 - first test in a script with a plan',
- 'test test 2 - second',
- 'failure test 2 - second',
- 'test test 3 - third',
- 'success test 3 - third',
- 'test test 4 - fourth',
- 'failure test 4 - fourth'
- ],
- self.subunit.getvalue().splitlines())
-
- def test_four_tests_in_a_row_no_plan(self):
- # A file
- # ok 1 - first test in a script with no plan at all
- # not ok 2 - second
- # ok 3 - third
- # not ok 4 - fourth
- # results in four tests numbered and named
- self.tap.write('ok 1 - first test in a script with no plan at all\n')
- self.tap.write('not ok 2 - second\n')
- self.tap.write('ok 3 - third\n')
- self.tap.write('not ok 4 - fourth\n')
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- 'test test 1 - first test in a script with no plan at all',
- 'success test 1 - first test in a script with no plan at all',
- 'test test 2 - second',
- 'failure test 2 - second',
- 'test test 3 - third',
- 'success test 3 - third',
- 'test test 4 - fourth',
- 'failure test 4 - fourth'
- ],
- self.subunit.getvalue().splitlines())
-
- def test_todo_and_skip(self):
- # A file
- # not ok 1 - a fail but # TODO but is TODO
- # not ok 2 - another fail # SKIP instead
- # results in two tests, numbered and commented.
- self.tap.write("not ok 1 - a fail but # TODO but is TODO\n")
- self.tap.write("not ok 2 - another fail # SKIP instead\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- 'test test 1 - a fail but',
- 'xfail test 1 - a fail but [',
- 'but is TODO',
- ']',
- 'test test 2 - another fail',
- 'skip test 2 - another fail [',
- 'instead',
- ']',
- ],
- self.subunit.getvalue().splitlines())
-
- def test_leading_comments_add_to_next_test_log(self):
- # A file
- # # comment
- # ok
- # ok
- # results in a single test with the comment included
- # in the first test and not the second.
- self.tap.write("# comment\n")
- self.tap.write("ok\n")
- self.tap.write("ok\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- 'test test 1',
- 'success test 1 [',
- '# comment',
- ']',
- 'test test 2',
- 'success test 2',
- ],
- self.subunit.getvalue().splitlines())
-
- def test_trailing_comments_are_included_in_last_test_log(self):
- # A file
- # ok foo
- # ok foo
- # # comment
- # results in a two tests, with the second having the comment
- # attached to its log.
- self.tap.write("ok\n")
- self.tap.write("ok\n")
- self.tap.write("# comment\n")
- self.tap.seek(0)
- result = subunit.TAP2SubUnit(self.tap, self.subunit)
- self.assertEqual(0, result)
- self.assertEqual([
- 'test test 1',
- 'success test 1',
- 'test test 2',
- 'success test 2 [',
- '# comment',
- ']',
- ],
- self.subunit.getvalue().splitlines())
-
-
-def test_suite():
- loader = subunit.tests.TestUtil.TestLoader()
- result = loader.loadTestsFromName(__name__)
- return result
diff --git a/lib/subunit/python/subunit/tests/test_test_protocol.py b/lib/subunit/python/subunit/tests/test_test_protocol.py
deleted file mode 100644
index 7831ba16cd..0000000000
--- a/lib/subunit/python/subunit/tests/test_test_protocol.py
+++ /dev/null
@@ -1,1337 +0,0 @@
-#
-# subunit: extensions to Python unittest to get test results from subprocesses.
-# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-import datetime
-import unittest
-import os
-
-from testtools import PlaceHolder, skipIf, TestCase, TestResult
-from testtools.compat import _b, _u, BytesIO
-from testtools.content import Content, TracebackContent, text_content
-from testtools.content_type import ContentType
-try:
- from testtools.testresult.doubles import (
- Python26TestResult,
- Python27TestResult,
- ExtendedTestResult,
- )
-except ImportError:
- from testtools.tests.helpers import (
- Python26TestResult,
- Python27TestResult,
- ExtendedTestResult,
- )
-
-import subunit
-from subunit import _remote_exception_str, _remote_exception_str_chunked
-import subunit.iso8601 as iso8601
-
-
-def details_to_str(details):
- return TestResult()._err_details_to_string(None, details=details)
-
-
-class TestTestImports(unittest.TestCase):
-
- def test_imports(self):
- from subunit import DiscardStream
- from subunit import TestProtocolServer
- from subunit import RemotedTestCase
- from subunit import RemoteError
- from subunit import ExecTestCase
- from subunit import IsolatedTestCase
- from subunit import TestProtocolClient
- from subunit import ProtocolTestCase
-
-
-class TestDiscardStream(unittest.TestCase):
-
- def test_write(self):
- subunit.DiscardStream().write("content")
-
-
-class TestProtocolServerForward(unittest.TestCase):
-
- def test_story(self):
- client = unittest.TestResult()
- out = BytesIO()
- protocol = subunit.TestProtocolServer(client, forward_stream=out)
- pipe = BytesIO(_b("test old mcdonald\n"
- "success old mcdonald\n"))
- protocol.readFrom(pipe)
- self.assertEqual(client.testsRun, 1)
- self.assertEqual(pipe.getvalue(), out.getvalue())
-
- def test_not_command(self):
- client = unittest.TestResult()
- out = BytesIO()
- protocol = subunit.TestProtocolServer(client,
- stream=subunit.DiscardStream(), forward_stream=out)
- pipe = BytesIO(_b("success old mcdonald\n"))
- protocol.readFrom(pipe)
- self.assertEqual(client.testsRun, 0)
- self.assertEqual(_b(""), out.getvalue())
-
-
-class TestTestProtocolServerPipe(unittest.TestCase):
-
- def test_story(self):
- client = unittest.TestResult()
- protocol = subunit.TestProtocolServer(client)
- traceback = "foo.c:53:ERROR invalid state\n"
- pipe = BytesIO(_b("test old mcdonald\n"
- "success old mcdonald\n"
- "test bing crosby\n"
- "failure bing crosby [\n"
- + traceback +
- "]\n"
- "test an error\n"
- "error an error\n"))
- protocol.readFrom(pipe)
- bing = subunit.RemotedTestCase("bing crosby")
- an_error = subunit.RemotedTestCase("an error")
- self.assertEqual(client.errors,
- [(an_error, _remote_exception_str + '\n')])
- self.assertEqual(
- client.failures,
- [(bing, _remote_exception_str + ": "
- + details_to_str({'traceback': text_content(traceback)}) + "\n")])
- self.assertEqual(client.testsRun, 3)
-
- def test_non_test_characters_forwarded_immediately(self):
- pass
-
-
-class TestTestProtocolServerStartTest(unittest.TestCase):
-
- def setUp(self):
- self.client = Python26TestResult()
- self.stream = BytesIO()
- self.protocol = subunit.TestProtocolServer(self.client, self.stream)
-
- def test_start_test(self):
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.assertEqual(self.client._events,
- [('startTest', subunit.RemotedTestCase("old mcdonald"))])
-
- def test_start_testing(self):
- self.protocol.lineReceived(_b("testing old mcdonald\n"))
- self.assertEqual(self.client._events,
- [('startTest', subunit.RemotedTestCase("old mcdonald"))])
-
- def test_start_test_colon(self):
- self.protocol.lineReceived(_b("test: old mcdonald\n"))
- self.assertEqual(self.client._events,
- [('startTest', subunit.RemotedTestCase("old mcdonald"))])
-
- def test_indented_test_colon_ignored(self):
- ignored_line = _b(" test: old mcdonald\n")
- self.protocol.lineReceived(ignored_line)
- self.assertEqual([], self.client._events)
- self.assertEqual(self.stream.getvalue(), ignored_line)
-
- def test_start_testing_colon(self):
- self.protocol.lineReceived(_b("testing: old mcdonald\n"))
- self.assertEqual(self.client._events,
- [('startTest', subunit.RemotedTestCase("old mcdonald"))])
-
-
-class TestTestProtocolServerPassThrough(unittest.TestCase):
-
- def setUp(self):
- self.stdout = BytesIO()
- self.test = subunit.RemotedTestCase("old mcdonald")
- self.client = ExtendedTestResult()
- self.protocol = subunit.TestProtocolServer(self.client, self.stdout)
-
- def keywords_before_test(self):
- self.protocol.lineReceived(_b("failure a\n"))
- self.protocol.lineReceived(_b("failure: a\n"))
- self.protocol.lineReceived(_b("error a\n"))
- self.protocol.lineReceived(_b("error: a\n"))
- self.protocol.lineReceived(_b("success a\n"))
- self.protocol.lineReceived(_b("success: a\n"))
- self.protocol.lineReceived(_b("successful a\n"))
- self.protocol.lineReceived(_b("successful: a\n"))
- self.protocol.lineReceived(_b("]\n"))
- self.assertEqual(self.stdout.getvalue(), _b("failure a\n"
- "failure: a\n"
- "error a\n"
- "error: a\n"
- "success a\n"
- "success: a\n"
- "successful a\n"
- "successful: a\n"
- "]\n"))
-
- def test_keywords_before_test(self):
- self.keywords_before_test()
- self.assertEqual(self.client._events, [])
-
- def test_keywords_after_error(self):
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("error old mcdonald\n"))
- self.keywords_before_test()
- self.assertEqual([
- ('startTest', self.test),
- ('addError', self.test, {}),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_keywords_after_failure(self):
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("failure old mcdonald\n"))
- self.keywords_before_test()
- self.assertEqual(self.client._events, [
- ('startTest', self.test),
- ('addFailure', self.test, {}),
- ('stopTest', self.test),
- ])
-
- def test_keywords_after_success(self):
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("success old mcdonald\n"))
- self.keywords_before_test()
- self.assertEqual([
- ('startTest', self.test),
- ('addSuccess', self.test),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_keywords_after_test(self):
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("failure a\n"))
- self.protocol.lineReceived(_b("failure: a\n"))
- self.protocol.lineReceived(_b("error a\n"))
- self.protocol.lineReceived(_b("error: a\n"))
- self.protocol.lineReceived(_b("success a\n"))
- self.protocol.lineReceived(_b("success: a\n"))
- self.protocol.lineReceived(_b("successful a\n"))
- self.protocol.lineReceived(_b("successful: a\n"))
- self.protocol.lineReceived(_b("]\n"))
- self.protocol.lineReceived(_b("failure old mcdonald\n"))
- self.assertEqual(self.stdout.getvalue(), _b("test old mcdonald\n"
- "failure a\n"
- "failure: a\n"
- "error a\n"
- "error: a\n"
- "success a\n"
- "success: a\n"
- "successful a\n"
- "successful: a\n"
- "]\n"))
- self.assertEqual(self.client._events, [
- ('startTest', self.test),
- ('addFailure', self.test, {}),
- ('stopTest', self.test),
- ])
-
- def test_keywords_during_failure(self):
- # A smoke test to make sure that the details parsers have control
- # appropriately.
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("failure: old mcdonald [\n"))
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("failure a\n"))
- self.protocol.lineReceived(_b("failure: a\n"))
- self.protocol.lineReceived(_b("error a\n"))
- self.protocol.lineReceived(_b("error: a\n"))
- self.protocol.lineReceived(_b("success a\n"))
- self.protocol.lineReceived(_b("success: a\n"))
- self.protocol.lineReceived(_b("successful a\n"))
- self.protocol.lineReceived(_b("successful: a\n"))
- self.protocol.lineReceived(_b(" ]\n"))
- self.protocol.lineReceived(_b("]\n"))
- self.assertEqual(self.stdout.getvalue(), _b(""))
- details = {}
- details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}),
- lambda:[_b(
- "test old mcdonald\n"
- "failure a\n"
- "failure: a\n"
- "error a\n"
- "error: a\n"
- "success a\n"
- "success: a\n"
- "successful a\n"
- "successful: a\n"
- "]\n")])
- self.assertEqual(self.client._events, [
- ('startTest', self.test),
- ('addFailure', self.test, details),
- ('stopTest', self.test),
- ])
-
- def test_stdout_passthrough(self):
- """Lines received which cannot be interpreted as any protocol action
- should be passed through to sys.stdout.
- """
- bytes = _b("randombytes\n")
- self.protocol.lineReceived(bytes)
- self.assertEqual(self.stdout.getvalue(), bytes)
-
-
-class TestTestProtocolServerLostConnection(unittest.TestCase):
-
- def setUp(self):
- self.client = Python26TestResult()
- self.protocol = subunit.TestProtocolServer(self.client)
- self.test = subunit.RemotedTestCase("old mcdonald")
-
- def test_lost_connection_no_input(self):
- self.protocol.lostConnection()
- self.assertEqual([], self.client._events)
-
- def test_lost_connection_after_start(self):
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lostConnection()
- failure = subunit.RemoteError(
- _u("lost connection during test 'old mcdonald'"))
- self.assertEqual([
- ('startTest', self.test),
- ('addError', self.test, failure),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_lost_connected_after_error(self):
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("error old mcdonald\n"))
- self.protocol.lostConnection()
- self.assertEqual([
- ('startTest', self.test),
- ('addError', self.test, subunit.RemoteError(_u(""))),
- ('stopTest', self.test),
- ], self.client._events)
-
- def do_connection_lost(self, outcome, opening):
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("%s old mcdonald %s" % (outcome, opening)))
- self.protocol.lostConnection()
- failure = subunit.RemoteError(
- _u("lost connection during %s report of test 'old mcdonald'") %
- outcome)
- self.assertEqual([
- ('startTest', self.test),
- ('addError', self.test, failure),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_lost_connection_during_error(self):
- self.do_connection_lost("error", "[\n")
-
- def test_lost_connection_during_error_details(self):
- self.do_connection_lost("error", "[ multipart\n")
-
- def test_lost_connected_after_failure(self):
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("failure old mcdonald\n"))
- self.protocol.lostConnection()
- self.assertEqual([
- ('startTest', self.test),
- ('addFailure', self.test, subunit.RemoteError(_u(""))),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_lost_connection_during_failure(self):
- self.do_connection_lost("failure", "[\n")
-
- def test_lost_connection_during_failure_details(self):
- self.do_connection_lost("failure", "[ multipart\n")
-
- def test_lost_connection_after_success(self):
- self.protocol.lineReceived(_b("test old mcdonald\n"))
- self.protocol.lineReceived(_b("success old mcdonald\n"))
- self.protocol.lostConnection()
- self.assertEqual([
- ('startTest', self.test),
- ('addSuccess', self.test),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_lost_connection_during_success(self):
- self.do_connection_lost("success", "[\n")
-
- def test_lost_connection_during_success_details(self):
- self.do_connection_lost("success", "[ multipart\n")
-
- def test_lost_connection_during_skip(self):
- self.do_connection_lost("skip", "[\n")
-
- def test_lost_connection_during_skip_details(self):
- self.do_connection_lost("skip", "[ multipart\n")
-
- def test_lost_connection_during_xfail(self):
- self.do_connection_lost("xfail", "[\n")
-
- def test_lost_connection_during_xfail_details(self):
- self.do_connection_lost("xfail", "[ multipart\n")
-
- def test_lost_connection_during_uxsuccess(self):
- self.do_connection_lost("uxsuccess", "[\n")
-
- def test_lost_connection_during_uxsuccess_details(self):
- self.do_connection_lost("uxsuccess", "[ multipart\n")
-
-
-class TestInTestMultipart(unittest.TestCase):
-
- def setUp(self):
- self.client = ExtendedTestResult()
- self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived(_b("test mcdonalds farm\n"))
- self.test = subunit.RemotedTestCase(_u("mcdonalds farm"))
-
- def test__outcome_sets_details_parser(self):
- self.protocol._reading_success_details.details_parser = None
- self.protocol._state._outcome(0, _b("mcdonalds farm [ multipart\n"),
- None, self.protocol._reading_success_details)
- parser = self.protocol._reading_success_details.details_parser
- self.assertNotEqual(None, parser)
- self.assertTrue(isinstance(parser,
- subunit.details.MultipartDetailsParser))
-
-
-class TestTestProtocolServerAddError(unittest.TestCase):
-
- def setUp(self):
- self.client = ExtendedTestResult()
- self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived(_b("test mcdonalds farm\n"))
- self.test = subunit.RemotedTestCase("mcdonalds farm")
-
- def simple_error_keyword(self, keyword):
- self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
- details = {}
- self.assertEqual([
- ('startTest', self.test),
- ('addError', self.test, details),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_simple_error(self):
- self.simple_error_keyword("error")
-
- def test_simple_error_colon(self):
- self.simple_error_keyword("error:")
-
- def test_error_empty_message(self):
- self.protocol.lineReceived(_b("error mcdonalds farm [\n"))
- self.protocol.lineReceived(_b("]\n"))
- details = {}
- details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}), lambda:[_b("")])
- self.assertEqual([
- ('startTest', self.test),
- ('addError', self.test, details),
- ('stopTest', self.test),
- ], self.client._events)
-
- def error_quoted_bracket(self, keyword):
- self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
- self.protocol.lineReceived(_b(" ]\n"))
- self.protocol.lineReceived(_b("]\n"))
- details = {}
- details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}), lambda:[_b("]\n")])
- self.assertEqual([
- ('startTest', self.test),
- ('addError', self.test, details),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_error_quoted_bracket(self):
- self.error_quoted_bracket("error")
-
- def test_error_colon_quoted_bracket(self):
- self.error_quoted_bracket("error:")
-
-
-class TestTestProtocolServerAddFailure(unittest.TestCase):
-
- def setUp(self):
- self.client = ExtendedTestResult()
- self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived(_b("test mcdonalds farm\n"))
- self.test = subunit.RemotedTestCase("mcdonalds farm")
-
- def assertFailure(self, details):
- self.assertEqual([
- ('startTest', self.test),
- ('addFailure', self.test, details),
- ('stopTest', self.test),
- ], self.client._events)
-
- def simple_failure_keyword(self, keyword):
- self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
- details = {}
- self.assertFailure(details)
-
- def test_simple_failure(self):
- self.simple_failure_keyword("failure")
-
- def test_simple_failure_colon(self):
- self.simple_failure_keyword("failure:")
-
- def test_failure_empty_message(self):
- self.protocol.lineReceived(_b("failure mcdonalds farm [\n"))
- self.protocol.lineReceived(_b("]\n"))
- details = {}
- details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}), lambda:[_b("")])
- self.assertFailure(details)
-
- def failure_quoted_bracket(self, keyword):
- self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
- self.protocol.lineReceived(_b(" ]\n"))
- self.protocol.lineReceived(_b("]\n"))
- details = {}
- details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}), lambda:[_b("]\n")])
- self.assertFailure(details)
-
- def test_failure_quoted_bracket(self):
- self.failure_quoted_bracket("failure")
-
- def test_failure_colon_quoted_bracket(self):
- self.failure_quoted_bracket("failure:")
-
-
-class TestTestProtocolServerAddxFail(unittest.TestCase):
- """Tests for the xfail keyword.
-
- In Python this can thunk through to Success due to stdlib limitations (see
- README).
- """
-
- def capture_expected_failure(self, test, err):
- self._events.append((test, err))
-
- def setup_python26(self):
- """Setup a test object ready to be xfailed and thunk to success."""
- self.client = Python26TestResult()
- self.setup_protocol()
-
- def setup_python27(self):
- """Setup a test object ready to be xfailed."""
- self.client = Python27TestResult()
- self.setup_protocol()
-
- def setup_python_ex(self):
- """Setup a test object ready to be xfailed with details."""
- self.client = ExtendedTestResult()
- self.setup_protocol()
-
- def setup_protocol(self):
- """Setup the protocol based on self.client."""
- self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived(_b("test mcdonalds farm\n"))
- self.test = self.client._events[-1][-1]
-
- def simple_xfail_keyword(self, keyword, as_success):
- self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
- self.check_success_or_xfail(as_success)
-
- def check_success_or_xfail(self, as_success, error_message=None):
- if as_success:
- self.assertEqual([
- ('startTest', self.test),
- ('addSuccess', self.test),
- ('stopTest', self.test),
- ], self.client._events)
- else:
- details = {}
- if error_message is not None:
- details['traceback'] = Content(
- ContentType("text", "x-traceback", {'charset': 'utf8'}),
- lambda:[_b(error_message)])
- if isinstance(self.client, ExtendedTestResult):
- value = details
- else:
- if error_message is not None:
- value = subunit.RemoteError(details_to_str(details))
- else:
- value = subunit.RemoteError()
- self.assertEqual([
- ('startTest', self.test),
- ('addExpectedFailure', self.test, value),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_simple_xfail(self):
- self.setup_python26()
- self.simple_xfail_keyword("xfail", True)
- self.setup_python27()
- self.simple_xfail_keyword("xfail", False)
- self.setup_python_ex()
- self.simple_xfail_keyword("xfail", False)
-
- def test_simple_xfail_colon(self):
- self.setup_python26()
- self.simple_xfail_keyword("xfail:", True)
- self.setup_python27()
- self.simple_xfail_keyword("xfail:", False)
- self.setup_python_ex()
- self.simple_xfail_keyword("xfail:", False)
-
- def test_xfail_empty_message(self):
- self.setup_python26()
- self.empty_message(True)
- self.setup_python27()
- self.empty_message(False)
- self.setup_python_ex()
- self.empty_message(False, error_message="")
-
- def empty_message(self, as_success, error_message="\n"):
- self.protocol.lineReceived(_b("xfail mcdonalds farm [\n"))
- self.protocol.lineReceived(_b("]\n"))
- self.check_success_or_xfail(as_success, error_message)
-
- def xfail_quoted_bracket(self, keyword, as_success):
- # This tests it is accepted, but cannot test it is used today, because
- # of not having a way to expose it in Python so far.
- self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
- self.protocol.lineReceived(_b(" ]\n"))
- self.protocol.lineReceived(_b("]\n"))
- self.check_success_or_xfail(as_success, "]\n")
-
- def test_xfail_quoted_bracket(self):
- self.setup_python26()
- self.xfail_quoted_bracket("xfail", True)
- self.setup_python27()
- self.xfail_quoted_bracket("xfail", False)
- self.setup_python_ex()
- self.xfail_quoted_bracket("xfail", False)
-
- def test_xfail_colon_quoted_bracket(self):
- self.setup_python26()
- self.xfail_quoted_bracket("xfail:", True)
- self.setup_python27()
- self.xfail_quoted_bracket("xfail:", False)
- self.setup_python_ex()
- self.xfail_quoted_bracket("xfail:", False)
-
-
-class TestTestProtocolServerAddunexpectedSuccess(TestCase):
- """Tests for the uxsuccess keyword."""
-
- def capture_expected_failure(self, test, err):
- self._events.append((test, err))
-
- def setup_python26(self):
- """Setup a test object ready to be xfailed and thunk to success."""
- self.client = Python26TestResult()
- self.setup_protocol()
-
- def setup_python27(self):
- """Setup a test object ready to be xfailed."""
- self.client = Python27TestResult()
- self.setup_protocol()
-
- def setup_python_ex(self):
- """Setup a test object ready to be xfailed with details."""
- self.client = ExtendedTestResult()
- self.setup_protocol()
-
- def setup_protocol(self):
- """Setup the protocol based on self.client."""
- self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived(_b("test mcdonalds farm\n"))
- self.test = self.client._events[-1][-1]
-
- def simple_uxsuccess_keyword(self, keyword, as_fail):
- self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
- self.check_fail_or_uxsuccess(as_fail)
-
- def check_fail_or_uxsuccess(self, as_fail, error_message=None):
- details = {}
- if error_message is not None:
- details['traceback'] = Content(
- ContentType("text", "x-traceback", {'charset': 'utf8'}),
- lambda:[_b(error_message)])
- if isinstance(self.client, ExtendedTestResult):
- value = details
- else:
- value = None
- if as_fail:
- self.client._events[1] = self.client._events[1][:2]
- # The value is generated within the extended to original decorator:
- # todo use the testtools matcher to check on this.
- self.assertEqual([
- ('startTest', self.test),
- ('addFailure', self.test),
- ('stopTest', self.test),
- ], self.client._events)
- elif value:
- self.assertEqual([
- ('startTest', self.test),
- ('addUnexpectedSuccess', self.test, value),
- ('stopTest', self.test),
- ], self.client._events)
- else:
- self.assertEqual([
- ('startTest', self.test),
- ('addUnexpectedSuccess', self.test),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_simple_uxsuccess(self):
- self.setup_python26()
- self.simple_uxsuccess_keyword("uxsuccess", True)
- self.setup_python27()
- self.simple_uxsuccess_keyword("uxsuccess", False)
- self.setup_python_ex()
- self.simple_uxsuccess_keyword("uxsuccess", False)
-
- def test_simple_uxsuccess_colon(self):
- self.setup_python26()
- self.simple_uxsuccess_keyword("uxsuccess:", True)
- self.setup_python27()
- self.simple_uxsuccess_keyword("uxsuccess:", False)
- self.setup_python_ex()
- self.simple_uxsuccess_keyword("uxsuccess:", False)
-
- def test_uxsuccess_empty_message(self):
- self.setup_python26()
- self.empty_message(True)
- self.setup_python27()
- self.empty_message(False)
- self.setup_python_ex()
- self.empty_message(False, error_message="")
-
- def empty_message(self, as_fail, error_message="\n"):
- self.protocol.lineReceived(_b("uxsuccess mcdonalds farm [\n"))
- self.protocol.lineReceived(_b("]\n"))
- self.check_fail_or_uxsuccess(as_fail, error_message)
-
- def uxsuccess_quoted_bracket(self, keyword, as_fail):
- self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
- self.protocol.lineReceived(_b(" ]\n"))
- self.protocol.lineReceived(_b("]\n"))
- self.check_fail_or_uxsuccess(as_fail, "]\n")
-
- def test_uxsuccess_quoted_bracket(self):
- self.setup_python26()
- self.uxsuccess_quoted_bracket("uxsuccess", True)
- self.setup_python27()
- self.uxsuccess_quoted_bracket("uxsuccess", False)
- self.setup_python_ex()
- self.uxsuccess_quoted_bracket("uxsuccess", False)
-
- def test_uxsuccess_colon_quoted_bracket(self):
- self.setup_python26()
- self.uxsuccess_quoted_bracket("uxsuccess:", True)
- self.setup_python27()
- self.uxsuccess_quoted_bracket("uxsuccess:", False)
- self.setup_python_ex()
- self.uxsuccess_quoted_bracket("uxsuccess:", False)
-
-
-class TestTestProtocolServerAddSkip(unittest.TestCase):
- """Tests for the skip keyword.
-
- In Python this meets the testtools extended TestResult contract.
- (See https://launchpad.net/testtools).
- """
-
- def setUp(self):
- """Setup a test object ready to be skipped."""
- self.client = ExtendedTestResult()
- self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived(_b("test mcdonalds farm\n"))
- self.test = self.client._events[-1][-1]
-
- def assertSkip(self, reason):
- details = {}
- if reason is not None:
- details['reason'] = Content(
- ContentType("text", "plain"), lambda:[reason])
- self.assertEqual([
- ('startTest', self.test),
- ('addSkip', self.test, details),
- ('stopTest', self.test),
- ], self.client._events)
-
- def simple_skip_keyword(self, keyword):
- self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
- self.assertSkip(None)
-
- def test_simple_skip(self):
- self.simple_skip_keyword("skip")
-
- def test_simple_skip_colon(self):
- self.simple_skip_keyword("skip:")
-
- def test_skip_empty_message(self):
- self.protocol.lineReceived(_b("skip mcdonalds farm [\n"))
- self.protocol.lineReceived(_b("]\n"))
- self.assertSkip(_b(""))
-
- def skip_quoted_bracket(self, keyword):
- # This tests it is accepted, but cannot test it is used today, because
- # of not having a way to expose it in Python so far.
- self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
- self.protocol.lineReceived(_b(" ]\n"))
- self.protocol.lineReceived(_b("]\n"))
- self.assertSkip(_b("]\n"))
-
- def test_skip_quoted_bracket(self):
- self.skip_quoted_bracket("skip")
-
- def test_skip_colon_quoted_bracket(self):
- self.skip_quoted_bracket("skip:")
-
-
-class TestTestProtocolServerAddSuccess(unittest.TestCase):
-
- def setUp(self):
- self.client = ExtendedTestResult()
- self.protocol = subunit.TestProtocolServer(self.client)
- self.protocol.lineReceived(_b("test mcdonalds farm\n"))
- self.test = subunit.RemotedTestCase("mcdonalds farm")
-
- def simple_success_keyword(self, keyword):
- self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword))
- self.assertEqual([
- ('startTest', self.test),
- ('addSuccess', self.test),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_simple_success(self):
- self.simple_success_keyword("successful")
-
- def test_simple_success_colon(self):
- self.simple_success_keyword("successful:")
-
- def assertSuccess(self, details):
- self.assertEqual([
- ('startTest', self.test),
- ('addSuccess', self.test, details),
- ('stopTest', self.test),
- ], self.client._events)
-
- def test_success_empty_message(self):
- self.protocol.lineReceived(_b("success mcdonalds farm [\n"))
- self.protocol.lineReceived(_b("]\n"))
- details = {}
- details['message'] = Content(ContentType("text", "plain"),
- lambda:[_b("")])
- self.assertSuccess(details)
-
- def success_quoted_bracket(self, keyword):
- # This tests it is accepted, but cannot test it is used today, because
- # of not having a way to expose it in Python so far.
- self.protocol.lineReceived(_b("%s mcdonalds farm [\n" % keyword))
- self.protocol.lineReceived(_b(" ]\n"))
- self.protocol.lineReceived(_b("]\n"))
- details = {}
- details['message'] = Content(ContentType("text", "plain"),
- lambda:[_b("]\n")])
- self.assertSuccess(details)
-
- def test_success_quoted_bracket(self):
- self.success_quoted_bracket("success")
-
- def test_success_colon_quoted_bracket(self):
- self.success_quoted_bracket("success:")
-
-
-class TestTestProtocolServerProgress(unittest.TestCase):
- """Test receipt of progress: directives."""
-
- def test_progress_accepted_stdlib(self):
- self.result = Python26TestResult()
- self.stream = BytesIO()
- self.protocol = subunit.TestProtocolServer(self.result,
- stream=self.stream)
- self.protocol.lineReceived(_b("progress: 23"))
- self.protocol.lineReceived(_b("progress: -2"))
- self.protocol.lineReceived(_b("progress: +4"))
- self.assertEqual(_b(""), self.stream.getvalue())
-
- def test_progress_accepted_extended(self):
- # With a progress capable TestResult, progress events are emitted.
- self.result = ExtendedTestResult()
- self.stream = BytesIO()
- self.protocol = subunit.TestProtocolServer(self.result,
- stream=self.stream)
- self.protocol.lineReceived(_b("progress: 23"))
- self.protocol.lineReceived(_b("progress: push"))
- self.protocol.lineReceived(_b("progress: -2"))
- self.protocol.lineReceived(_b("progress: pop"))
- self.protocol.lineReceived(_b("progress: +4"))
- self.assertEqual(_b(""), self.stream.getvalue())
- self.assertEqual([
- ('progress', 23, subunit.PROGRESS_SET),
- ('progress', None, subunit.PROGRESS_PUSH),
- ('progress', -2, subunit.PROGRESS_CUR),
- ('progress', None, subunit.PROGRESS_POP),
- ('progress', 4, subunit.PROGRESS_CUR),
- ], self.result._events)
-
-
-class TestTestProtocolServerStreamTags(unittest.TestCase):
- """Test managing tags on the protocol level."""
-
- def setUp(self):
- self.client = ExtendedTestResult()
- self.protocol = subunit.TestProtocolServer(self.client)
-
- def test_initial_tags(self):
- self.protocol.lineReceived(_b("tags: foo bar:baz quux\n"))
- self.assertEqual([
- ('tags', set(["foo", "bar:baz", "quux"]), set()),
- ], self.client._events)
-
- def test_minus_removes_tags(self):
- self.protocol.lineReceived(_b("tags: -bar quux\n"))
- self.assertEqual([
- ('tags', set(["quux"]), set(["bar"])),
- ], self.client._events)
-
- def test_tags_do_not_get_set_on_test(self):
- self.protocol.lineReceived(_b("test mcdonalds farm\n"))
- test = self.client._events[0][-1]
- self.assertEqual(None, getattr(test, 'tags', None))
-
- def test_tags_do_not_get_set_on_global_tags(self):
- self.protocol.lineReceived(_b("tags: foo bar\n"))
- self.protocol.lineReceived(_b("test mcdonalds farm\n"))
- test = self.client._events[-1][-1]
- self.assertEqual(None, getattr(test, 'tags', None))
-
- def test_tags_get_set_on_test_tags(self):
- self.protocol.lineReceived(_b("test mcdonalds farm\n"))
- test = self.client._events[-1][-1]
- self.protocol.lineReceived(_b("tags: foo bar\n"))
- self.protocol.lineReceived(_b("success mcdonalds farm\n"))
- self.assertEqual(None, getattr(test, 'tags', None))
-
-
-class TestTestProtocolServerStreamTime(unittest.TestCase):
- """Test managing time information at the protocol level."""
-
- def test_time_accepted_stdlib(self):
- self.result = Python26TestResult()
- self.stream = BytesIO()
- self.protocol = subunit.TestProtocolServer(self.result,
- stream=self.stream)
- self.protocol.lineReceived(_b("time: 2001-12-12 12:59:59Z\n"))
- self.assertEqual(_b(""), self.stream.getvalue())
-
- def test_time_accepted_extended(self):
- self.result = ExtendedTestResult()
- self.stream = BytesIO()
- self.protocol = subunit.TestProtocolServer(self.result,
- stream=self.stream)
- self.protocol.lineReceived(_b("time: 2001-12-12 12:59:59Z\n"))
- self.assertEqual(_b(""), self.stream.getvalue())
- self.assertEqual([
- ('time', datetime.datetime(2001, 12, 12, 12, 59, 59, 0,
- iso8601.Utc()))
- ], self.result._events)
-
-
-class TestRemotedTestCase(unittest.TestCase):
-
- def test_simple(self):
- test = subunit.RemotedTestCase("A test description")
- self.assertRaises(NotImplementedError, test.setUp)
- self.assertRaises(NotImplementedError, test.tearDown)
- self.assertEqual("A test description",
- test.shortDescription())
- self.assertEqual("A test description",
- test.id())
- self.assertEqual("A test description (subunit.RemotedTestCase)", "%s" % test)
- self.assertEqual("<subunit.RemotedTestCase description="
- "'A test description'>", "%r" % test)
- result = unittest.TestResult()
- test.run(result)
- self.assertEqual([(test, _remote_exception_str + ": "
- "Cannot run RemotedTestCases.\n\n")],
- result.errors)
- self.assertEqual(1, result.testsRun)
- another_test = subunit.RemotedTestCase("A test description")
- self.assertEqual(test, another_test)
- different_test = subunit.RemotedTestCase("ofo")
- self.assertNotEqual(test, different_test)
- self.assertNotEqual(another_test, different_test)
-
-
-class TestRemoteError(unittest.TestCase):
-
- def test_eq(self):
- error = subunit.RemoteError(_u("Something went wrong"))
- another_error = subunit.RemoteError(_u("Something went wrong"))
- different_error = subunit.RemoteError(_u("boo!"))
- self.assertEqual(error, another_error)
- self.assertNotEqual(error, different_error)
- self.assertNotEqual(different_error, another_error)
-
- def test_empty_constructor(self):
- self.assertEqual(subunit.RemoteError(), subunit.RemoteError(_u("")))
-
-
-class TestExecTestCase(unittest.TestCase):
-
- class SampleExecTestCase(subunit.ExecTestCase):
-
- def test_sample_method(self):
- """sample-script.py"""
- # the sample script runs three tests, one each
- # that fails, errors and succeeds
-
- def test_sample_method_args(self):
- """sample-script.py foo"""
- # sample that will run just one test.
-
- def test_construct(self):
- test = self.SampleExecTestCase("test_sample_method")
- self.assertEqual(test.script,
- subunit.join_dir(__file__, 'sample-script.py'))
-
- def test_args(self):
- result = unittest.TestResult()
- test = self.SampleExecTestCase("test_sample_method_args")
- test.run(result)
- self.assertEqual(1, result.testsRun)
-
- def test_run(self):
- result = ExtendedTestResult()
- test = self.SampleExecTestCase("test_sample_method")
- test.run(result)
- mcdonald = subunit.RemotedTestCase("old mcdonald")
- bing = subunit.RemotedTestCase("bing crosby")
- bing_details = {}
- bing_details['traceback'] = Content(ContentType("text", "x-traceback",
- {'charset': 'utf8'}), lambda:[_b("foo.c:53:ERROR invalid state\n")])
- an_error = subunit.RemotedTestCase("an error")
- error_details = {}
- self.assertEqual([
- ('startTest', mcdonald),
- ('addSuccess', mcdonald),
- ('stopTest', mcdonald),
- ('startTest', bing),
- ('addFailure', bing, bing_details),
- ('stopTest', bing),
- ('startTest', an_error),
- ('addError', an_error, error_details),
- ('stopTest', an_error),
- ], result._events)
-
- def test_debug(self):
- test = self.SampleExecTestCase("test_sample_method")
- test.debug()
-
- def test_count_test_cases(self):
- """TODO run the child process and count responses to determine the count."""
-
- def test_join_dir(self):
- sibling = subunit.join_dir(__file__, 'foo')
- filedir = os.path.abspath(os.path.dirname(__file__))
- expected = os.path.join(filedir, 'foo')
- self.assertEqual(sibling, expected)
-
-
-class DoExecTestCase(subunit.ExecTestCase):
-
- def test_working_script(self):
- """sample-two-script.py"""
-
-
-class TestIsolatedTestCase(TestCase):
-
- class SampleIsolatedTestCase(subunit.IsolatedTestCase):
-
- SETUP = False
- TEARDOWN = False
- TEST = False
-
- def setUp(self):
- TestIsolatedTestCase.SampleIsolatedTestCase.SETUP = True
-
- def tearDown(self):
- TestIsolatedTestCase.SampleIsolatedTestCase.TEARDOWN = True
-
- def test_sets_global_state(self):
- TestIsolatedTestCase.SampleIsolatedTestCase.TEST = True
-
-
- def test_construct(self):
- self.SampleIsolatedTestCase("test_sets_global_state")
-
- @skipIf(os.name != "posix", "Need a posix system for forking tests")
- def test_run(self):
- result = unittest.TestResult()
- test = self.SampleIsolatedTestCase("test_sets_global_state")
- test.run(result)
- self.assertEqual(result.testsRun, 1)
- self.assertEqual(self.SampleIsolatedTestCase.SETUP, False)
- self.assertEqual(self.SampleIsolatedTestCase.TEARDOWN, False)
- self.assertEqual(self.SampleIsolatedTestCase.TEST, False)
-
- def test_debug(self):
- pass
- #test = self.SampleExecTestCase("test_sample_method")
- #test.debug()
-
-
-class TestIsolatedTestSuite(TestCase):
-
- class SampleTestToIsolate(unittest.TestCase):
-
- SETUP = False
- TEARDOWN = False
- TEST = False
-
- def setUp(self):
- TestIsolatedTestSuite.SampleTestToIsolate.SETUP = True
-
- def tearDown(self):
- TestIsolatedTestSuite.SampleTestToIsolate.TEARDOWN = True
-
- def test_sets_global_state(self):
- TestIsolatedTestSuite.SampleTestToIsolate.TEST = True
-
-
- def test_construct(self):
- subunit.IsolatedTestSuite()
-
- @skipIf(os.name != "posix", "Need a posix system for forking tests")
- def test_run(self):
- result = unittest.TestResult()
- suite = subunit.IsolatedTestSuite()
- sub_suite = unittest.TestSuite()
- sub_suite.addTest(self.SampleTestToIsolate("test_sets_global_state"))
- sub_suite.addTest(self.SampleTestToIsolate("test_sets_global_state"))
- suite.addTest(sub_suite)
- suite.addTest(self.SampleTestToIsolate("test_sets_global_state"))
- suite.run(result)
- self.assertEqual(result.testsRun, 3)
- self.assertEqual(self.SampleTestToIsolate.SETUP, False)
- self.assertEqual(self.SampleTestToIsolate.TEARDOWN, False)
- self.assertEqual(self.SampleTestToIsolate.TEST, False)
-
-
-class TestTestProtocolClient(unittest.TestCase):
-
- def setUp(self):
- self.io = BytesIO()
- self.protocol = subunit.TestProtocolClient(self.io)
- self.unicode_test = PlaceHolder(_u('\u2603'))
- self.test = TestTestProtocolClient("test_start_test")
- self.sample_details = {'something':Content(
- ContentType('text', 'plain'), lambda:[_b('serialised\nform')])}
- self.sample_tb_details = dict(self.sample_details)
- self.sample_tb_details['traceback'] = TracebackContent(
- subunit.RemoteError(_u("boo qux")), self.test)
-
- def test_start_test(self):
- """Test startTest on a TestProtocolClient."""
- self.protocol.startTest(self.test)
- self.assertEqual(self.io.getvalue(), _b("test: %s\n" % self.test.id()))
-
- def test_start_test_unicode_id(self):
- """Test startTest on a TestProtocolClient."""
- self.protocol.startTest(self.unicode_test)
- expected = _b("test: ") + _u('\u2603').encode('utf8') + _b("\n")
- self.assertEqual(expected, self.io.getvalue())
-
- def test_stop_test(self):
- # stopTest doesn't output anything.
- self.protocol.stopTest(self.test)
- self.assertEqual(self.io.getvalue(), _b(""))
-
- def test_add_success(self):
- """Test addSuccess on a TestProtocolClient."""
- self.protocol.addSuccess(self.test)
- self.assertEqual(
- self.io.getvalue(), _b("successful: %s\n" % self.test.id()))
-
- def test_add_outcome_unicode_id(self):
- """Test addSuccess on a TestProtocolClient."""
- self.protocol.addSuccess(self.unicode_test)
- expected = _b("successful: ") + _u('\u2603').encode('utf8') + _b("\n")
- self.assertEqual(expected, self.io.getvalue())
-
- def test_add_success_details(self):
- """Test addSuccess on a TestProtocolClient with details."""
- self.protocol.addSuccess(self.test, details=self.sample_details)
- self.assertEqual(
- self.io.getvalue(), _b("successful: %s [ multipart\n"
- "Content-Type: text/plain\n"
- "something\n"
- "F\r\nserialised\nform0\r\n]\n" % self.test.id()))
-
- def test_add_failure(self):
- """Test addFailure on a TestProtocolClient."""
- self.protocol.addFailure(
- self.test, subunit.RemoteError(_u("boo qux")))
- self.assertEqual(
- self.io.getvalue(),
- _b(('failure: %s [\n' + _remote_exception_str + ': boo qux\n]\n')
- % self.test.id()))
-
- def test_add_failure_details(self):
- """Test addFailure on a TestProtocolClient with details."""
- self.protocol.addFailure(
- self.test, details=self.sample_tb_details)
- self.assertEqual(
- self.io.getvalue(),
- _b(("failure: %s [ multipart\n"
- "Content-Type: text/plain\n"
- "something\n"
- "F\r\nserialised\nform0\r\n"
- "Content-Type: text/x-traceback;charset=utf8,language=python\n"
- "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n"
- "]\n") % self.test.id()))
-
- def test_add_error(self):
- """Test stopTest on a TestProtocolClient."""
- self.protocol.addError(
- self.test, subunit.RemoteError(_u("phwoar crikey")))
- self.assertEqual(
- self.io.getvalue(),
- _b(('error: %s [\n' +
- _remote_exception_str + ": phwoar crikey\n"
- "]\n") % self.test.id()))
-
- def test_add_error_details(self):
- """Test stopTest on a TestProtocolClient with details."""
- self.protocol.addError(
- self.test, details=self.sample_tb_details)
- self.assertEqual(
- self.io.getvalue(),
- _b(("error: %s [ multipart\n"
- "Content-Type: text/plain\n"
- "something\n"
- "F\r\nserialised\nform0\r\n"
- "Content-Type: text/x-traceback;charset=utf8,language=python\n"
- "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n"
- "]\n") % self.test.id()))
-
- def test_add_expected_failure(self):
- """Test addExpectedFailure on a TestProtocolClient."""
- self.protocol.addExpectedFailure(
- self.test, subunit.RemoteError(_u("phwoar crikey")))
- self.assertEqual(
- self.io.getvalue(),
- _b(('xfail: %s [\n' +
- _remote_exception_str + ": phwoar crikey\n"
- "]\n") % self.test.id()))
-
- def test_add_expected_failure_details(self):
- """Test addExpectedFailure on a TestProtocolClient with details."""
- self.protocol.addExpectedFailure(
- self.test, details=self.sample_tb_details)
- self.assertEqual(
- self.io.getvalue(),
- _b(("xfail: %s [ multipart\n"
- "Content-Type: text/plain\n"
- "something\n"
- "F\r\nserialised\nform0\r\n"
- "Content-Type: text/x-traceback;charset=utf8,language=python\n"
- "traceback\n" + _remote_exception_str_chunked + ": boo qux\n0\r\n"
- "]\n") % self.test.id()))
-
-
- def test_add_skip(self):
- """Test addSkip on a TestProtocolClient."""
- self.protocol.addSkip(
- self.test, "Has it really?")
- self.assertEqual(
- self.io.getvalue(),
- _b('skip: %s [\nHas it really?\n]\n' % self.test.id()))
-
- def test_add_skip_details(self):
- """Test addSkip on a TestProtocolClient with details."""
- details = {'reason':Content(
- ContentType('text', 'plain'), lambda:[_b('Has it really?')])}
- self.protocol.addSkip(self.test, details=details)
- self.assertEqual(
- self.io.getvalue(),
- _b("skip: %s [ multipart\n"
- "Content-Type: text/plain\n"
- "reason\n"
- "E\r\nHas it really?0\r\n"
- "]\n" % self.test.id()))
-
- def test_progress_set(self):
- self.protocol.progress(23, subunit.PROGRESS_SET)
- self.assertEqual(self.io.getvalue(), _b('progress: 23\n'))
-
- def test_progress_neg_cur(self):
- self.protocol.progress(-23, subunit.PROGRESS_CUR)
- self.assertEqual(self.io.getvalue(), _b('progress: -23\n'))
-
- def test_progress_pos_cur(self):
- self.protocol.progress(23, subunit.PROGRESS_CUR)
- self.assertEqual(self.io.getvalue(), _b('progress: +23\n'))
-
- def test_progress_pop(self):
- self.protocol.progress(1234, subunit.PROGRESS_POP)
- self.assertEqual(self.io.getvalue(), _b('progress: pop\n'))
-
- def test_progress_push(self):
- self.protocol.progress(1234, subunit.PROGRESS_PUSH)
- self.assertEqual(self.io.getvalue(), _b('progress: push\n'))
-
- def test_time(self):
- # Calling time() outputs a time signal immediately.
- self.protocol.time(
- datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc()))
- self.assertEqual(
- _b("time: 2009-10-11 12:13:14.000015Z\n"),
- self.io.getvalue())
-
- def test_add_unexpected_success(self):
- """Test addUnexpectedSuccess on a TestProtocolClient."""
- self.protocol.addUnexpectedSuccess(self.test)
- self.assertEqual(
- self.io.getvalue(), _b("uxsuccess: %s\n" % self.test.id()))
-
- def test_add_unexpected_success_details(self):
- """Test addUnexpectedSuccess on a TestProtocolClient with details."""
- self.protocol.addUnexpectedSuccess(self.test, details=self.sample_details)
- self.assertEqual(
- self.io.getvalue(), _b("uxsuccess: %s [ multipart\n"
- "Content-Type: text/plain\n"
- "something\n"
- "F\r\nserialised\nform0\r\n]\n" % self.test.id()))
-
- def test_tags_empty(self):
- self.protocol.tags(set(), set())
- self.assertEqual(_b(""), self.io.getvalue())
-
- def test_tags_add(self):
- self.protocol.tags(set(['foo']), set())
- self.assertEqual(_b("tags: foo\n"), self.io.getvalue())
-
- def test_tags_both(self):
- self.protocol.tags(set(['quux']), set(['bar']))
- self.assertEqual(_b("tags: quux -bar\n"), self.io.getvalue())
-
- def test_tags_gone(self):
- self.protocol.tags(set(), set(['bar']))
- self.assertEqual(_b("tags: -bar\n"), self.io.getvalue())
-
-
-def test_suite():
- loader = subunit.tests.TestUtil.TestLoader()
- result = loader.loadTestsFromName(__name__)
- return result
diff --git a/lib/subunit/python/subunit/tests/test_test_results.py b/lib/subunit/python/subunit/tests/test_test_results.py
deleted file mode 100644
index ff74b9a818..0000000000
--- a/lib/subunit/python/subunit/tests/test_test_results.py
+++ /dev/null
@@ -1,572 +0,0 @@
-#
-# subunit: extensions to Python unittest to get test results from subprocesses.
-# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
-#
-# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
-# license at the users choice. A copy of both licenses are available in the
-# project source as Apache-2.0 and BSD. You may not use this file except in
-# compliance with one of these two licences.
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# license you chose for the specific language governing permissions and
-# limitations under that license.
-#
-
-import csv
-import datetime
-import sys
-import unittest
-
-from testtools import TestCase
-from testtools.compat import StringIO
-from testtools.content import (
- text_content,
- TracebackContent,
- )
-from testtools.testresult.doubles import ExtendedTestResult
-
-import subunit
-import subunit.iso8601 as iso8601
-import subunit.test_results
-
-import testtools
-
-
-class LoggingDecorator(subunit.test_results.HookedTestResultDecorator):
-
- def __init__(self, decorated):
- self._calls = 0
- super(LoggingDecorator, self).__init__(decorated)
-
- def _before_event(self):
- self._calls += 1
-
-
-class AssertBeforeTestResult(LoggingDecorator):
- """A TestResult for checking preconditions."""
-
- def __init__(self, decorated, test):
- self.test = test
- super(AssertBeforeTestResult, self).__init__(decorated)
-
- def _before_event(self):
- self.test.assertEqual(1, self.earlier._calls)
- super(AssertBeforeTestResult, self)._before_event()
-
-
-class TimeCapturingResult(unittest.TestResult):
-
- def __init__(self):
- super(TimeCapturingResult, self).__init__()
- self._calls = []
- self.failfast = False
-
- def time(self, a_datetime):
- self._calls.append(a_datetime)
-
-
-class TestHookedTestResultDecorator(unittest.TestCase):
-
- def setUp(self):
- # An end to the chain
- terminal = unittest.TestResult()
- # Asserts that the call was made to self.result before asserter was
- # called.
- asserter = AssertBeforeTestResult(terminal, self)
- # The result object we call, which much increase its call count.
- self.result = LoggingDecorator(asserter)
- asserter.earlier = self.result
- self.decorated = asserter
-
- def tearDown(self):
- # The hook in self.result must have been called
- self.assertEqual(1, self.result._calls)
- # The hook in asserter must have been called too, otherwise the
- # assertion about ordering won't have completed.
- self.assertEqual(1, self.decorated._calls)
-
- def test_startTest(self):
- self.result.startTest(self)
-
- def test_startTestRun(self):
- self.result.startTestRun()
-
- def test_stopTest(self):
- self.result.stopTest(self)
-
- def test_stopTestRun(self):
- self.result.stopTestRun()
-
- def test_addError(self):
- self.result.addError(self, subunit.RemoteError())
-
- def test_addError_details(self):
- self.result.addError(self, details={})
-
- def test_addFailure(self):
- self.result.addFailure(self, subunit.RemoteError())
-
- def test_addFailure_details(self):
- self.result.addFailure(self, details={})
-
- def test_addSuccess(self):
- self.result.addSuccess(self)
-
- def test_addSuccess_details(self):
- self.result.addSuccess(self, details={})
-
- def test_addSkip(self):
- self.result.addSkip(self, "foo")
-
- def test_addSkip_details(self):
- self.result.addSkip(self, details={})
-
- def test_addExpectedFailure(self):
- self.result.addExpectedFailure(self, subunit.RemoteError())
-
- def test_addExpectedFailure_details(self):
- self.result.addExpectedFailure(self, details={})
-
- def test_addUnexpectedSuccess(self):
- self.result.addUnexpectedSuccess(self)
-
- def test_addUnexpectedSuccess_details(self):
- self.result.addUnexpectedSuccess(self, details={})
-
- def test_progress(self):
- self.result.progress(1, subunit.PROGRESS_SET)
-
- def test_wasSuccessful(self):
- self.result.wasSuccessful()
-
- def test_shouldStop(self):
- self.result.shouldStop
-
- def test_stop(self):
- self.result.stop()
-
- def test_time(self):
- self.result.time(None)
-
-
-class TestAutoTimingTestResultDecorator(unittest.TestCase):
-
- def setUp(self):
- # And end to the chain which captures time events.
- terminal = TimeCapturingResult()
- # The result object under test.
- self.result = subunit.test_results.AutoTimingTestResultDecorator(
- terminal)
- self.decorated = terminal
-
- def test_without_time_calls_time_is_called_and_not_None(self):
- self.result.startTest(self)
- self.assertEqual(1, len(self.decorated._calls))
- self.assertNotEqual(None, self.decorated._calls[0])
-
- def test_no_time_from_progress(self):
- self.result.progress(1, subunit.PROGRESS_CUR)
- self.assertEqual(0, len(self.decorated._calls))
-
- def test_no_time_from_shouldStop(self):
- self.decorated.stop()
- self.result.shouldStop
- self.assertEqual(0, len(self.decorated._calls))
-
- def test_calling_time_inhibits_automatic_time(self):
- # Calling time() outputs a time signal immediately and prevents
- # automatically adding one when other methods are called.
- time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())
- self.result.time(time)
- self.result.startTest(self)
- self.result.stopTest(self)
- self.assertEqual(1, len(self.decorated._calls))
- self.assertEqual(time, self.decorated._calls[0])
-
- def test_calling_time_None_enables_automatic_time(self):
- time = datetime.datetime(2009,10,11,12,13,14,15, iso8601.Utc())
- self.result.time(time)
- self.assertEqual(1, len(self.decorated._calls))
- self.assertEqual(time, self.decorated._calls[0])
- # Calling None passes the None through, in case other results care.
- self.result.time(None)
- self.assertEqual(2, len(self.decorated._calls))
- self.assertEqual(None, self.decorated._calls[1])
- # Calling other methods doesn't generate an automatic time event.
- self.result.startTest(self)
- self.assertEqual(3, len(self.decorated._calls))
- self.assertNotEqual(None, self.decorated._calls[2])
-
- def test_set_failfast_True(self):
- self.assertFalse(self.decorated.failfast)
- self.result.failfast = True
- self.assertTrue(self.decorated.failfast)
-
-
-class TestTagCollapsingDecorator(TestCase):
-
- def test_tags_collapsed_outside_of_tests(self):
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
- tag_collapser.tags(set(['a']), set())
- tag_collapser.tags(set(['b']), set())
- tag_collapser.startTest(self)
- self.assertEquals(
- [('tags', set(['a', 'b']), set([])),
- ('startTest', self),
- ], result._events)
-
- def test_tags_collapsed_outside_of_tests_are_flushed(self):
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
- tag_collapser.startTestRun()
- tag_collapser.tags(set(['a']), set())
- tag_collapser.tags(set(['b']), set())
- tag_collapser.startTest(self)
- tag_collapser.addSuccess(self)
- tag_collapser.stopTest(self)
- tag_collapser.stopTestRun()
- self.assertEquals(
- [('startTestRun',),
- ('tags', set(['a', 'b']), set([])),
- ('startTest', self),
- ('addSuccess', self),
- ('stopTest', self),
- ('stopTestRun',),
- ], result._events)
-
- def test_tags_forwarded_after_tests(self):
- test = subunit.RemotedTestCase('foo')
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
- tag_collapser.startTestRun()
- tag_collapser.startTest(test)
- tag_collapser.addSuccess(test)
- tag_collapser.stopTest(test)
- tag_collapser.tags(set(['a']), set(['b']))
- tag_collapser.stopTestRun()
- self.assertEqual(
- [('startTestRun',),
- ('startTest', test),
- ('addSuccess', test),
- ('stopTest', test),
- ('tags', set(['a']), set(['b'])),
- ('stopTestRun',),
- ],
- result._events)
-
- def test_tags_collapsed_inside_of_tests(self):
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
- test = subunit.RemotedTestCase('foo')
- tag_collapser.startTest(test)
- tag_collapser.tags(set(['a']), set())
- tag_collapser.tags(set(['b']), set(['a']))
- tag_collapser.tags(set(['c']), set())
- tag_collapser.stopTest(test)
- self.assertEquals(
- [('startTest', test),
- ('tags', set(['b', 'c']), set(['a'])),
- ('stopTest', test)],
- result._events)
-
- def test_tags_collapsed_inside_of_tests_different_ordering(self):
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
- test = subunit.RemotedTestCase('foo')
- tag_collapser.startTest(test)
- tag_collapser.tags(set(), set(['a']))
- tag_collapser.tags(set(['a', 'b']), set())
- tag_collapser.tags(set(['c']), set())
- tag_collapser.stopTest(test)
- self.assertEquals(
- [('startTest', test),
- ('tags', set(['a', 'b', 'c']), set()),
- ('stopTest', test)],
- result._events)
-
- def test_tags_sent_before_result(self):
- # Because addSuccess and friends tend to send subunit output
- # immediately, and because 'tags:' before a result line means
- # something different to 'tags:' after a result line, we need to be
- # sure that tags are emitted before 'addSuccess' (or whatever).
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
- test = subunit.RemotedTestCase('foo')
- tag_collapser.startTest(test)
- tag_collapser.tags(set(['a']), set())
- tag_collapser.addSuccess(test)
- tag_collapser.stopTest(test)
- self.assertEquals(
- [('startTest', test),
- ('tags', set(['a']), set()),
- ('addSuccess', test),
- ('stopTest', test)],
- result._events)
-
-
-class TestTimeCollapsingDecorator(TestCase):
-
- def make_time(self):
- # Heh heh.
- return datetime.datetime(
- 2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC)
-
- def test_initial_time_forwarded(self):
- # We always forward the first time event we see.
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
- a_time = self.make_time()
- tag_collapser.time(a_time)
- self.assertEquals([('time', a_time)], result._events)
-
- def test_time_collapsed_to_first_and_last(self):
- # If there are many consecutive time events, only the first and last
- # are sent through.
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
- times = [self.make_time() for i in range(5)]
- for a_time in times:
- tag_collapser.time(a_time)
- tag_collapser.startTest(subunit.RemotedTestCase('foo'))
- self.assertEquals(
- [('time', times[0]), ('time', times[-1])], result._events[:-1])
-
- def test_only_one_time_sent(self):
- # If we receive a single time event followed by a non-time event, we
- # send exactly one time event.
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
- a_time = self.make_time()
- tag_collapser.time(a_time)
- tag_collapser.startTest(subunit.RemotedTestCase('foo'))
- self.assertEquals([('time', a_time)], result._events[:-1])
-
- def test_duplicate_times_not_sent(self):
- # Many time events with the exact same time are collapsed into one
- # time event.
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
- a_time = self.make_time()
- for i in range(5):
- tag_collapser.time(a_time)
- tag_collapser.startTest(subunit.RemotedTestCase('foo'))
- self.assertEquals([('time', a_time)], result._events[:-1])
-
- def test_no_times_inserted(self):
- result = ExtendedTestResult()
- tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
- a_time = self.make_time()
- tag_collapser.time(a_time)
- foo = subunit.RemotedTestCase('foo')
- tag_collapser.startTest(foo)
- tag_collapser.addSuccess(foo)
- tag_collapser.stopTest(foo)
- self.assertEquals(
- [('time', a_time),
- ('startTest', foo),
- ('addSuccess', foo),
- ('stopTest', foo)], result._events)
-
-
-class TestByTestResultTests(testtools.TestCase):
-
- def setUp(self):
- super(TestByTestResultTests, self).setUp()
- self.log = []
- self.result = subunit.test_results.TestByTestResult(self.on_test)
- if sys.version_info >= (3, 0):
- self.result._now = iter(range(5)).__next__
- else:
- self.result._now = iter(range(5)).next
-
- def assertCalled(self, **kwargs):
- defaults = {
- 'test': self,
- 'tags': set(),
- 'details': None,
- 'start_time': 0,
- 'stop_time': 1,
- }
- defaults.update(kwargs)
- self.assertEqual([defaults], self.log)
-
- def on_test(self, **kwargs):
- self.log.append(kwargs)
-
- def test_no_tests_nothing_reported(self):
- self.result.startTestRun()
- self.result.stopTestRun()
- self.assertEqual([], self.log)
-
- def test_add_success(self):
- self.result.startTest(self)
- self.result.addSuccess(self)
- self.result.stopTest(self)
- self.assertCalled(status='success')
-
- def test_add_success_details(self):
- self.result.startTest(self)
- details = {'foo': 'bar'}
- self.result.addSuccess(self, details=details)
- self.result.stopTest(self)
- self.assertCalled(status='success', details=details)
-
- def test_tags(self):
- if not getattr(self.result, 'tags', None):
- self.skipTest("No tags in testtools")
- self.result.tags(['foo'], [])
- self.result.startTest(self)
- self.result.addSuccess(self)
- self.result.stopTest(self)
- self.assertCalled(status='success', tags=set(['foo']))
-
- def test_add_error(self):
- self.result.startTest(self)
- try:
- 1/0
- except ZeroDivisionError:
- error = sys.exc_info()
- self.result.addError(self, error)
- self.result.stopTest(self)
- self.assertCalled(
- status='error',
- details={'traceback': TracebackContent(error, self)})
-
- def test_add_error_details(self):
- self.result.startTest(self)
- details = {"foo": text_content("bar")}
- self.result.addError(self, details=details)
- self.result.stopTest(self)
- self.assertCalled(status='error', details=details)
-
- def test_add_failure(self):
- self.result.startTest(self)
- try:
- self.fail("intentional failure")
- except self.failureException:
- failure = sys.exc_info()
- self.result.addFailure(self, failure)
- self.result.stopTest(self)
- self.assertCalled(
- status='failure',
- details={'traceback': TracebackContent(failure, self)})
-
- def test_add_failure_details(self):
- self.result.startTest(self)
- details = {"foo": text_content("bar")}
- self.result.addFailure(self, details=details)
- self.result.stopTest(self)
- self.assertCalled(status='failure', details=details)
-
- def test_add_xfail(self):
- self.result.startTest(self)
- try:
- 1/0
- except ZeroDivisionError:
- error = sys.exc_info()
- self.result.addExpectedFailure(self, error)
- self.result.stopTest(self)
- self.assertCalled(
- status='xfail',
- details={'traceback': TracebackContent(error, self)})
-
- def test_add_xfail_details(self):
- self.result.startTest(self)
- details = {"foo": text_content("bar")}
- self.result.addExpectedFailure(self, details=details)
- self.result.stopTest(self)
- self.assertCalled(status='xfail', details=details)
-
- def test_add_unexpected_success(self):
- self.result.startTest(self)
- details = {'foo': 'bar'}
- self.result.addUnexpectedSuccess(self, details=details)
- self.result.stopTest(self)
- self.assertCalled(status='success', details=details)
-
- def test_add_skip_reason(self):
- self.result.startTest(self)
- reason = self.getUniqueString()
- self.result.addSkip(self, reason)
- self.result.stopTest(self)
- self.assertCalled(
- status='skip', details={'reason': text_content(reason)})
-
- def test_add_skip_details(self):
- self.result.startTest(self)
- details = {'foo': 'bar'}
- self.result.addSkip(self, details=details)
- self.result.stopTest(self)
- self.assertCalled(status='skip', details=details)
-
- def test_twice(self):
- self.result.startTest(self)
- self.result.addSuccess(self, details={'foo': 'bar'})
- self.result.stopTest(self)
- self.result.startTest(self)
- self.result.addSuccess(self)
- self.result.stopTest(self)
- self.assertEqual(
- [{'test': self,
- 'status': 'success',
- 'start_time': 0,
- 'stop_time': 1,
- 'tags': set(),
- 'details': {'foo': 'bar'}},
- {'test': self,
- 'status': 'success',
- 'start_time': 2,
- 'stop_time': 3,
- 'tags': set(),
- 'details': None},
- ],
- self.log)
-
-
-class TestCsvResult(testtools.TestCase):
-
- def parse_stream(self, stream):
- stream.seek(0)
- reader = csv.reader(stream)
- return list(reader)
-
- def test_csv_output(self):
- stream = StringIO()
- result = subunit.test_results.CsvResult(stream)
- if sys.version_info >= (3, 0):
- result._now = iter(range(5)).__next__
- else:
- result._now = iter(range(5)).next
- result.startTestRun()
- result.startTest(self)
- result.addSuccess(self)
- result.stopTest(self)
- result.stopTestRun()
- self.assertEqual(
- [['test', 'status', 'start_time', 'stop_time'],
- [self.id(), 'success', '0', '1'],
- ],
- self.parse_stream(stream))
-
- def test_just_header_when_no_tests(self):
- stream = StringIO()
- result = subunit.test_results.CsvResult(stream)
- result.startTestRun()
- result.stopTestRun()
- self.assertEqual(
- [['test', 'status', 'start_time', 'stop_time']],
- self.parse_stream(stream))
-
- def test_no_output_before_events(self):
- stream = StringIO()
- subunit.test_results.CsvResult(stream)
- self.assertEqual([], self.parse_stream(stream))
-
-
-def test_suite():
- loader = subunit.tests.TestUtil.TestLoader()
- result = loader.loadTestsFromName(__name__)
- return result