summaryrefslogtreecommitdiffstats
path: root/tests/test_gio.py
diff options
context:
space:
mode:
authorPaul Pogonyshev <pogonyshev@gmx.net>2008-08-27 21:22:08 +0000
committerPaul Pogonyshev <paulp@src.gnome.org>2008-08-27 21:22:08 +0000
commit30eb71d505abcbaf42d098dac62d0c747338d910 (patch)
treed0866a230255838c94dee43f1db5b3be199e3541 /tests/test_gio.py
parent70ede7ebbceb28e420c31d5ceb68e8abdd6216a5 (diff)
downloadpygobject-30eb71d505abcbaf42d098dac62d0c747338d910.tar.gz
pygobject-30eb71d505abcbaf42d098dac62d0c747338d910.tar.xz
pygobject-30eb71d505abcbaf42d098dac62d0c747338d910.zip
Rename from read(), document. (gio.InputStream.read): Rename from
2008-08-28 Paul Pogonyshev <pogonyshev@gmx.net> * gio/gio.defs (gio.InputStream.read_part): Rename from read(), document. (gio.InputStream.read): Rename from read_all(), document. (gio.OutputStream.write_part): Rename from write(), document. (gio.OutputStream.write): Rename from write_all(), document. * gio/ginputstream.override (_wrap_g_input_stream_read): Fix several bugs. (_wrap_g_input_stream_read_all): New function. * gio/goutputstream.override (_wrap_g_output_stream_write_all): New function. * tests/test_gio.py (TestInputStream.testRead): Add more tests. (TestInputStream.test_read_part): New test. (TestInputStream._read_in_loop): New helper method. (TestOutputStream.test_write_part): New test. svn path=/trunk/; revision=950
Diffstat (limited to 'tests/test_gio.py')
-rw-r--r--tests/test_gio.py57
1 files changed, 57 insertions, 0 deletions
diff --git a/tests/test_gio.py b/tests/test_gio.py
index 03d09be..c4e9125 100644
--- a/tests/test_gio.py
+++ b/tests/test_gio.py
@@ -485,6 +485,46 @@ class TestInputStream(unittest.TestCase):
def testRead(self):
self.assertEquals(self.stream.read(), "testing")
+ self.stream = gio.MemoryInputStream()
+ self.assertEquals(self.stream.read(), '')
+
+ self.stream = gio.MemoryInputStream()
+ some_data = open("test_gio.py", "rb").read()
+ self.stream.add_data(some_data)
+ self.assertEquals(self.stream.read(), some_data)
+
+ stream = gio.MemoryInputStream()
+ stream.add_data(some_data)
+ self.assertEquals(self._read_in_loop(stream,
+ lambda: stream.read(50),
+ 50),
+ some_data)
+
+ def test_read_part(self):
+ self.assertEquals(self._read_in_loop(self.stream,
+ lambda: self.stream.read_part()),
+ 'testing')
+
+ stream = gio.MemoryInputStream()
+ some_data = open('test_gio.py', 'rb').read()
+ stream.add_data(some_data)
+ self.assertEquals(self._read_in_loop(stream,
+ lambda: stream.read_part(50),
+ 50),
+ some_data)
+
+ def _read_in_loop(self, stream, reader, size_limit=0):
+ read_data = ''
+ while True:
+ read_part = reader()
+ if read_part:
+ read_data += read_part
+ if size_limit > 0:
+ self.assert_(len(read_part) <= size_limit,
+ '%d <= %d' % (len(read_part), size_limit))
+ else:
+ return read_data
+
def testReadAsync(self):
def callback(stream, result):
self.assertEquals(result.get_op_res_gssize(), 7)
@@ -595,6 +635,23 @@ class TestOutputStream(unittest.TestCase):
self.failUnless(os.path.exists("outputstream.txt"))
self.assertEquals(open("outputstream.txt").read(), "testing")
+ def test_write_part(self):
+ stream = gio.MemoryOutputStream()
+ some_data = open('test_gio.py', 'rb').read()
+ buffer = some_data
+
+ # In fact this makes only one looping (memory stream is fast,
+ # write_part behaves just like write), but let's still be
+ # complete.
+ while buffer:
+ written = stream.write_part(buffer)
+ if written == len(buffer):
+ break
+ else:
+ buffer = buffer[written:]
+
+ self.assertEquals(stream.get_contents(), some_data)
+
def testWriteAsync(self):
def callback(stream, result):
self.assertEquals(result.get_op_res_gssize(), 7)