diff options
author | Colin Walters <walters@verbum.org> | 2010-10-03 12:55:07 -0400 |
---|---|---|
committer | Colin Walters <walters@verbum.org> | 2010-10-03 12:55:07 -0400 |
commit | db8df4a040883b631ff3c719c453246df9085776 (patch) | |
tree | 66e00f6726f6b0cdb612b00d069bba49fdd2bc0c /tests/tests.py | |
parent | 77ad7371c5906b83db49de309ba39d867b673975 (diff) | |
download | rpmci-db8df4a040883b631ff3c719c453246df9085776.tar.gz rpmci-db8df4a040883b631ff3c719c453246df9085776.tar.xz rpmci-db8df4a040883b631ff3c719c453246df9085776.zip |
rpmci-vcs-mirror: Basically appears to work
Diffstat (limited to 'tests/tests.py')
-rw-r--r-- | tests/tests.py | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/tests/tests.py b/tests/tests.py new file mode 100644 index 0000000..6ff7846 --- /dev/null +++ b/tests/tests.py @@ -0,0 +1,58 @@ +import os +import sys +import unittest +import shutil +import tempfile + +import glib + +if os.path.isdir('.git'): + sys.path.insert(0, os.getcwd()) + +from rpmci import async_subprocess +from rpmci import msgqueue + +class BaseTestCase(unittest.TestCase): + def setUp(self): + self._tmpdir = tempfile.mkdtemp() + self._loop = glib.MainLoop() + + def tearDown(self): + del self._loop + shutil.rmtree(self._tmpdir) + +class TestAsyncSubprocess(BaseTestCase): + def testEchoAsync(self): + temp_filepath = os.path.join(self._tmpdir, 'echo-output') + def _on_exited(process, condition): + self._loop.quit() + if not os.path.isfile(temp_filepath): + raise AssertionError("Couldn't find output file %r" % (temp_filepath, )) + f = open(temp_filepath) + contents = f.read() + f.close() + self.assertEquals(contents, 'hello\n') + async_echo = async_subprocess.spawn_async_output_to_file(['echo', 'hello'], temp_filepath, _on_exited) + self._loop.run() + +class TestMsgQueue(BaseTestCase): + def testWriteReadMessage(self): + baseq_dir_path = os.path.join(self._tmpdir, 'msgq') + q = msgqueue.MessageQueue(baseq_dir_path) + def _on_message(queue, messages): + self._loop.quit() + msgs = list(messages) + for msg in msgs: + queue.consume(msg) + self.assertEquals(len(msgs), 1) + msg = msgs[0] + self.assertEquals(msg.headers['type'], 'update') + self.assertEquals(msg.payload['id'], 42) + self.assertEquals(msg.payload['otherdata'], 'foo bar baz') + q.connect(_on_message) + msg = msgqueue.Message(None, {'type': 'update'}, {'id': 42, 'otherdata': 'foo bar baz'}) + q.append(msg) + self._loop.run() + +if __name__ == '__main__': + unittest.main() |