summaryrefslogtreecommitdiffstats
path: root/tests/tests.py
diff options
context:
space:
mode:
authorColin Walters <walters@verbum.org>2010-10-03 12:55:07 -0400
committerColin Walters <walters@verbum.org>2010-10-03 12:55:07 -0400
commitdb8df4a040883b631ff3c719c453246df9085776 (patch)
tree66e00f6726f6b0cdb612b00d069bba49fdd2bc0c /tests/tests.py
parent77ad7371c5906b83db49de309ba39d867b673975 (diff)
downloadrpmci-db8df4a040883b631ff3c719c453246df9085776.tar.gz
rpmci-db8df4a040883b631ff3c719c453246df9085776.tar.xz
rpmci-db8df4a040883b631ff3c719c453246df9085776.zip
rpmci-vcs-mirror: Basically appears to work
Diffstat (limited to 'tests/tests.py')
-rw-r--r--tests/tests.py58
1 files changed, 58 insertions, 0 deletions
diff --git a/tests/tests.py b/tests/tests.py
new file mode 100644
index 0000000..6ff7846
--- /dev/null
+++ b/tests/tests.py
@@ -0,0 +1,58 @@
+import os
+import sys
+import unittest
+import shutil
+import tempfile
+
+import glib
+
+if os.path.isdir('.git'):
+ sys.path.insert(0, os.getcwd())
+
+from rpmci import async_subprocess
+from rpmci import msgqueue
+
+class BaseTestCase(unittest.TestCase):
+ def setUp(self):
+ self._tmpdir = tempfile.mkdtemp()
+ self._loop = glib.MainLoop()
+
+ def tearDown(self):
+ del self._loop
+ shutil.rmtree(self._tmpdir)
+
+class TestAsyncSubprocess(BaseTestCase):
+ def testEchoAsync(self):
+ temp_filepath = os.path.join(self._tmpdir, 'echo-output')
+ def _on_exited(process, condition):
+ self._loop.quit()
+ if not os.path.isfile(temp_filepath):
+ raise AssertionError("Couldn't find output file %r" % (temp_filepath, ))
+ f = open(temp_filepath)
+ contents = f.read()
+ f.close()
+ self.assertEquals(contents, 'hello\n')
+ async_echo = async_subprocess.spawn_async_output_to_file(['echo', 'hello'], temp_filepath, _on_exited)
+ self._loop.run()
+
+class TestMsgQueue(BaseTestCase):
+ def testWriteReadMessage(self):
+ baseq_dir_path = os.path.join(self._tmpdir, 'msgq')
+ q = msgqueue.MessageQueue(baseq_dir_path)
+ def _on_message(queue, messages):
+ self._loop.quit()
+ msgs = list(messages)
+ for msg in msgs:
+ queue.consume(msg)
+ self.assertEquals(len(msgs), 1)
+ msg = msgs[0]
+ self.assertEquals(msg.headers['type'], 'update')
+ self.assertEquals(msg.payload['id'], 42)
+ self.assertEquals(msg.payload['otherdata'], 'foo bar baz')
+ q.connect(_on_message)
+ msg = msgqueue.Message(None, {'type': 'update'}, {'id': 42, 'otherdata': 'foo bar baz'})
+ q.append(msg)
+ self._loop.run()
+
+if __name__ == '__main__':
+ unittest.main()