source: trunk/src/allmydata/test/mutable/test_filehandle.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 2.6 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import os
6from io import BytesIO
7from ..common import SyncTestCase
8from allmydata.mutable.publish import MutableFileHandle
9
10
class FileHandle(SyncTestCase):
    """
    Tests for ``MutableFileHandle`` wrapping in-memory and on-disk
    file-like objects: chunked reads, size queries, and closing.
    """

    def setUp(self):
        # Every test gets a fresh 450000-byte payload wrapped in a BytesIO,
        # plus a MutableFileHandle reading from that BytesIO.
        super().setUp()
        self.test_data = b"Test Data" * 50000
        self.sio = BytesIO(self.test_data)
        self.uploadable = MutableFileHandle(self.sio)

    def test_filehandle_read(self):
        # Reading the uploadable in fixed-size chunks must yield the
        # payload in order, chunk by chunk.
        self.basedir = "mutable/FileHandle/test_filehandle_read"
        chunk_size = 10
        for start in range(0, len(self.test_data), chunk_size):
            # read() hands back an iterable of bytes pieces; join them
            # to compare against the matching slice of the payload.
            data = b"".join(self.uploadable.read(chunk_size))
            end = start + chunk_size
            self.failUnlessEqual(data, self.test_data[start:end])

    def test_filehandle_get_size(self):
        # get_size() must report the full payload length.
        self.basedir = "mutable/FileHandle/test_filehandle_get_size"
        actual_size = len(self.test_data)
        size = self.uploadable.get_size()
        self.failUnlessEqual(size, actual_size)

    def test_filehandle_get_size_out_of_order(self):
        # We should be able to call get_size whenever we want without
        # disturbing the location of the seek pointer.
        chunk_size = 100
        data = self.uploadable.read(chunk_size)
        self.failUnlessEqual(b"".join(data), self.test_data[:chunk_size])

        # Now get the size.
        size = self.uploadable.get_size()
        self.failUnlessEqual(size, len(self.test_data))

        # Now get more data. We should be right where we left off.
        more_data = self.uploadable.read(chunk_size)
        start = chunk_size
        end = chunk_size * 2
        self.failUnlessEqual(b"".join(more_data), self.test_data[start:end])

    def test_filehandle_file(self):
        # Make sure that the MutableFileHandle works on a file as well
        # as a BytesIO object, since in some cases it will be asked to
        # deal with files.
        self.basedir = self.mktemp()
        # mktemp() presumably only returns a fresh path without creating
        # it (as trial's mktemp does), so the directory has to be made
        # here -- TODO confirm against SyncTestCase.mktemp.
        os.mkdir(self.basedir)
        f_path = os.path.join(self.basedir, "test_file")
        with open(f_path, "wb") as f:
            f.write(self.test_data)

        # Keep the read handle inside a ``with`` block so it is closed
        # even when an assertion fails; previously it was never closed.
        with open(f_path, "rb") as f:
            uploadable = MutableFileHandle(f)

            data = uploadable.read(len(self.test_data))
            self.failUnlessEqual(b"".join(data), self.test_data)
            size = uploadable.get_size()
            self.failUnlessEqual(size, len(self.test_data))

    def test_close(self):
        # Make sure that the MutableFileHandle closes its handle when
        # told to do so.
        self.uploadable.close()
        self.failUnless(self.sio.closed)
Note: See TracBrowser for help on using the repository browser.