summaryrefslogtreecommitdiffstats
path: root/tests/test_gio.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_gio.py')
-rw-r--r--tests/test_gio.py57
1 files changed, 57 insertions, 0 deletions
diff --git a/tests/test_gio.py b/tests/test_gio.py
index 03d09be..c4e9125 100644
--- a/tests/test_gio.py
+++ b/tests/test_gio.py
@@ -485,6 +485,46 @@ class TestInputStream(unittest.TestCase):
def testRead(self):
self.assertEquals(self.stream.read(), "testing")
+ self.stream = gio.MemoryInputStream()
+ self.assertEquals(self.stream.read(), '')
+
+ self.stream = gio.MemoryInputStream()
+ some_data = open("test_gio.py", "rb").read()
+ self.stream.add_data(some_data)
+ self.assertEquals(self.stream.read(), some_data)
+
+ stream = gio.MemoryInputStream()
+ stream.add_data(some_data)
+ self.assertEquals(self._read_in_loop(stream,
+ lambda: stream.read(50),
+ 50),
+ some_data)
+
+ def test_read_part(self):
+ self.assertEquals(self._read_in_loop(self.stream,
+ lambda: self.stream.read_part()),
+ 'testing')
+
+ stream = gio.MemoryInputStream()
+ some_data = open('test_gio.py', 'rb').read()
+ stream.add_data(some_data)
+ self.assertEquals(self._read_in_loop(stream,
+ lambda: stream.read_part(50),
+ 50),
+ some_data)
+
+ def _read_in_loop(self, stream, reader, size_limit=0):
+ read_data = ''
+ while True:
+ read_part = reader()
+ if read_part:
+ read_data += read_part
+ if size_limit > 0:
+ self.assert_(len(read_part) <= size_limit,
+ '%d <= %d' % (len(read_part), size_limit))
+ else:
+ return read_data
+
def testReadAsync(self):
def callback(stream, result):
self.assertEquals(result.get_op_res_gssize(), 7)
@@ -595,6 +635,23 @@ class TestOutputStream(unittest.TestCase):
self.failUnless(os.path.exists("outputstream.txt"))
self.assertEquals(open("outputstream.txt").read(), "testing")
+ def test_write_part(self):
+ stream = gio.MemoryOutputStream()
+ some_data = open('test_gio.py', 'rb').read()
+ buffer = some_data
+
+ # In fact this makes only one looping (memory stream is fast,
+ # write_part behaves just like write), but let's still be
+ # complete.
+ while buffer:
+ written = stream.write_part(buffer)
+ if written == len(buffer):
+ break
+ else:
+ buffer = buffer[written:]
+
+ self.assertEquals(stream.get_contents(), some_data)
+
def testWriteAsync(self):
def callback(stream, result):
self.assertEquals(result.get_op_res_gssize(), 7)