"""Tests for rpmci's async subprocess helper and its message queue.

Each test drives a glib main loop: it starts an asynchronous operation,
runs the loop until the operation's callback stops it, and then checks the
results.
"""

import os
import sys
import unittest
import shutil
import tempfile

import glib

# When run from a source checkout, prefer the in-tree rpmci package.
if os.path.isdir('.git'):
    sys.path.insert(0, os.getcwd())

from rpmci import async_subprocess
from rpmci import msgqueue


class BaseTestCase(unittest.TestCase):
    """Common fixture: a scratch directory and a glib main loop per test."""

    def setUp(self):
        self._tmpdir = tempfile.mkdtemp()
        self._loop = glib.MainLoop()

    def tearDown(self):
        del self._loop
        shutil.rmtree(self._tmpdir)


class TestAsyncSubprocess(BaseTestCase):
    """Exercises async_subprocess.spawn_async_output_to_file."""

    def testEchoAsync(self):
        """Run ``echo hello`` asynchronously and check the captured output."""
        temp_filepath = os.path.join(self._tmpdir, 'echo-output')

        def _on_exited(process, condition):
            # Only stop the loop here.  Exceptions raised inside a main-loop
            # callback are not propagated to the code that called run(), so
            # assertions made here could never fail the test.
            self._loop.quit()

        # Keep a reference for the duration of the loop run.
        # NOTE(review): presumably the returned object must stay alive until
        # the exit callback fires - confirm against async_subprocess.
        async_echo = async_subprocess.spawn_async_output_to_file(
            ['echo', 'hello'], temp_filepath, _on_exited)
        self._loop.run()

        # The loop only returns once _on_exited has run, so the output file
        # should be complete by now.
        self.assertTrue(os.path.isfile(temp_filepath),
                        "Couldn't find output file %r" % (temp_filepath, ))
        with open(temp_filepath) as f:
            contents = f.read()
        self.assertEqual(contents, 'hello\n')


class TestMsgQueue(BaseTestCase):
    """Exercises a write/read round trip through msgqueue.MessageQueue."""

    def testWriteReadMessage(self):
        """Append one message and check it is delivered back intact."""
        baseq_dir_path = os.path.join(self._tmpdir, 'msgq')
        q = msgqueue.MessageQueue(baseq_dir_path)
        received = []

        def _on_message(queue, messages):
            self._loop.quit()
            msgs = list(messages)
            for msg in msgs:
                queue.consume(msg)
            # Collect for verification after the loop exits; assertions
            # raised from inside a main-loop callback would not reach the
            # test runner.
            received.extend(msgs)

        q.connect(_on_message)
        msg = msgqueue.Message(None,
                               {'type': 'update'},
                               {'id': 42, 'otherdata': 'foo bar baz'})
        q.append(msg)
        self._loop.run()

        self.assertEqual(len(received), 1)
        delivered = received[0]
        self.assertEqual(delivered.headers['type'], 'update')
        self.assertEqual(delivered.payload['id'], 42)
        self.assertEqual(delivered.payload['otherdata'], 'foo bar baz')


if __name__ == '__main__':
    unittest.main()