source: trunk/src/allmydata/test/cli/test_backupdb.py

Last change on this file was 53084f7, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-27T23:49:07Z

remove more Python2 compatibility

  • Property mode set to 100644
File size: 9.3 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import sys
6import os.path, time
7from io import StringIO
8from twisted.trial import unittest
9
10from allmydata.util import fileutil
11from allmydata.util.encodingutil import listdir_unicode
12from allmydata.scripts import backupdb
13from ..common_util import skip_if_cannot_represent_filename
14
15class BackupDB(unittest.TestCase):
16    def create(self, dbfile):
17        stderr = StringIO()
18        bdb = backupdb.get_backupdb(dbfile, stderr=stderr)
19        self.failUnless(bdb, "unable to create backupdb from %r" % (dbfile,))
20        return bdb
21
22    def test_basic(self):
23        self.basedir = basedir = os.path.join("backupdb", "create")
24        fileutil.make_dirs(basedir)
25        dbfile = os.path.join(basedir, "dbfile")
26        bdb = self.create(dbfile)
27        self.failUnlessEqual(bdb.VERSION, 2)
28
29    def test_upgrade_v1_v2(self):
30        self.basedir = basedir = os.path.join("backupdb", "upgrade_v1_v2")
31        fileutil.make_dirs(basedir)
32        dbfile = os.path.join(basedir, "dbfile")
33        stderr = StringIO()
34        created = backupdb.get_backupdb(dbfile, stderr=stderr,
35                                        create_version=(backupdb.SCHEMA_v1, 1),
36                                        just_create=True)
37        self.failUnless(created, "unable to create v1 backupdb")
38        # now we should have a v1 database on disk
39        bdb = self.create(dbfile)
40        self.failUnlessEqual(bdb.VERSION, 2)
41
42    def test_fail(self):
43        self.basedir = basedir = os.path.join("backupdb", "fail")
44        fileutil.make_dirs(basedir)
45
46        # put a non-DB file in the way
47        not_a_db = ("I do not look like a sqlite database\n" +
48                    "I'M NOT" * 1000) # OS-X sqlite-2.3.2 takes some convincing
49        self.writeto("not-a-database", not_a_db)
50        stderr_f = StringIO()
51        bdb = backupdb.get_backupdb(os.path.join(basedir, "not-a-database"),
52                                    stderr_f)
53        self.failUnlessEqual(bdb, None)
54        stderr = stderr_f.getvalue()
55        self.failUnlessIn("backupdb file is unusable", stderr)
56        # sqlite-3.19.3 says "file is encrypted or is not a database"
57        # sqlite-3.20.0 says "file is not a database"
58        self.failUnlessIn("is not a database", stderr)
59
60        # put a directory in the way, to exercise a different error path
61        where = os.path.join(basedir, "roadblock-dir")
62        fileutil.make_dirs(where)
63        stderr_f = StringIO()
64        bdb = backupdb.get_backupdb(where, stderr_f)
65        self.failUnlessEqual(bdb, None)
66        stderr = stderr_f.getvalue()
67        # the error-message is different under PyPy ... not sure why?
68        if 'pypy' in sys.version.lower():
69            self.failUnlessIn("Could not open database", stderr)
70        else:
71            self.failUnlessIn("unable to open database file", stderr)
72
73
74    def writeto(self, filename, data):
75        fn = os.path.join(self.basedir, filename)
76        parentdir = os.path.dirname(fn)
77        fileutil.make_dirs(parentdir)
78        fileutil.write(fn, data)
79        return fn
80
81    def test_check(self):
82        self.basedir = basedir = os.path.join("backupdb", "check")
83        fileutil.make_dirs(basedir)
84        dbfile = os.path.join(basedir, "dbfile")
85        bdb = self.create(dbfile)
86
87        foo_fn = self.writeto("foo.txt", "foo.txt")
88        blah_fn = self.writeto("bar/blah.txt", "blah.txt")
89
90        r = bdb.check_file(foo_fn)
91        self.failUnlessEqual(r.was_uploaded(), False)
92        r.did_upload(b"foo-cap")
93
94        r = bdb.check_file(blah_fn)
95        self.failUnlessEqual(r.was_uploaded(), False)
96        r.did_upload("blah-cap")
97
98        r = bdb.check_file(foo_fn)
99        self.failUnlessEqual(r.was_uploaded(), b"foo-cap")
100        self.failUnlessEqual(type(r.was_uploaded()), bytes)
101        self.failUnlessEqual(r.should_check(), False)
102
103        time.sleep(1.0) # make sure the timestamp changes
104        self.writeto("foo.txt", "NEW")
105
106        r = bdb.check_file(foo_fn)
107        self.failUnlessEqual(r.was_uploaded(), False)
108        r.did_upload(b"new-cap")
109
110        r = bdb.check_file(foo_fn)
111        self.failUnlessEqual(r.was_uploaded(), b"new-cap")
112        self.failUnlessEqual(r.should_check(), False)
113        # if we spontaneously decide to upload it anyways, nothing should
114        # break
115        r.did_upload(b"new-cap")
116
117        r = bdb.check_file(foo_fn, use_timestamps=False)
118        self.failUnlessEqual(r.was_uploaded(), False)
119        r.did_upload(b"new-cap")
120
121        r = bdb.check_file(foo_fn)
122        self.failUnlessEqual(r.was_uploaded(), b"new-cap")
123        self.failUnlessEqual(r.should_check(), False)
124
125        bdb.NO_CHECK_BEFORE = 0
126        bdb.ALWAYS_CHECK_AFTER = 0.1
127
128        r = bdb.check_file(blah_fn)
129        self.failUnlessEqual(r.was_uploaded(), b"blah-cap")
130        self.failUnlessEqual(r.should_check(), True)
131        r.did_check_healthy("results") # we know they're ignored for now
132
133        bdb.NO_CHECK_BEFORE = 200
134        bdb.ALWAYS_CHECK_AFTER = 400
135
136        r = bdb.check_file(blah_fn)
137        self.failUnlessEqual(r.was_uploaded(), b"blah-cap")
138        self.failUnlessEqual(r.should_check(), False)
139
140        os.unlink(os.path.join(basedir, "foo.txt"))
141        fileutil.make_dirs(os.path.join(basedir, "foo.txt")) # file becomes dir
142        r = bdb.check_file(foo_fn)
143        self.failUnlessEqual(r.was_uploaded(), False)
144
145    def test_wrong_version(self):
146        self.basedir = basedir = os.path.join("backupdb", "wrong_version")
147        fileutil.make_dirs(basedir)
148
149        where = os.path.join(basedir, "tooold.db")
150        bdb = self.create(where)
151        # reach into the DB and make it old
152        bdb.cursor.execute("UPDATE version SET version=0")
153        bdb.connection.commit()
154
155        # now the next time we open the database, it should be an unusable
156        # version
157        stderr_f = StringIO()
158        bdb = backupdb.get_backupdb(where, stderr_f)
159        self.failUnlessEqual(bdb, None)
160        stderr = stderr_f.getvalue()
161        self.failUnlessEqual(stderr.strip(),
162                             "Unable to handle backupdb version 0")
163
164    def test_directory(self):
165        self.basedir = basedir = os.path.join("backupdb", "directory")
166        fileutil.make_dirs(basedir)
167        dbfile = os.path.join(basedir, "dbfile")
168        bdb = self.create(dbfile)
169
170        contents = {u"file1": b"URI:CHK:blah1",
171                    u"file2": b"URI:CHK:blah2",
172                    u"dir1": b"URI:DIR2-CHK:baz2"}
173        r = bdb.check_directory(contents)
174        self.failUnless(isinstance(r, backupdb.DirectoryResult))
175        self.failIf(r.was_created())
176        dircap = b"URI:DIR2-CHK:foo1"
177        r.did_create(dircap)
178
179        r = bdb.check_directory(contents)
180        self.failUnless(r.was_created())
181        self.failUnlessEqual(r.was_created(), dircap)
182        self.failUnlessEqual(r.should_check(), False)
183
184        # if we spontaneously decide to upload it anyways, nothing should
185        # break
186        r.did_create(dircap)
187        r = bdb.check_directory(contents)
188        self.failUnless(r.was_created())
189        self.failUnlessEqual(r.was_created(), dircap)
190        self.failUnlessEqual(type(r.was_created()), bytes)
191        self.failUnlessEqual(r.should_check(), False)
192
193        bdb.NO_CHECK_BEFORE = 0
194        bdb.ALWAYS_CHECK_AFTER = 0.1
195        time.sleep(1.0)
196
197        r = bdb.check_directory(contents)
198        self.failUnless(r.was_created())
199        self.failUnlessEqual(r.was_created(), dircap)
200        self.failUnlessEqual(r.should_check(), True)
201        r.did_check_healthy("results")
202
203        bdb.NO_CHECK_BEFORE = 200
204        bdb.ALWAYS_CHECK_AFTER = 400
205
206        r = bdb.check_directory(contents)
207        self.failUnless(r.was_created())
208        self.failUnlessEqual(r.was_created(), dircap)
209        self.failUnlessEqual(r.should_check(), False)
210
211
212        contents2 = {u"file1": b"URI:CHK:blah1",
213                     u"dir1": b"URI:DIR2-CHK:baz2"}
214        r = bdb.check_directory(contents2)
215        self.failIf(r.was_created())
216
217        contents3 = {u"file1": b"URI:CHK:blah1",
218                     u"file2": b"URI:CHK:blah3",
219                     u"dir1": b"URI:DIR2-CHK:baz2"}
220        r = bdb.check_directory(contents3)
221        self.failIf(r.was_created())
222
223    def test_unicode(self):
224        skip_if_cannot_represent_filename(u"f\u00f6\u00f6.txt")
225        skip_if_cannot_represent_filename(u"b\u00e5r.txt")
226
227        self.basedir = basedir = os.path.join("backupdb", "unicode")
228        fileutil.make_dirs(basedir)
229        dbfile = os.path.join(basedir, "dbfile")
230        bdb = self.create(dbfile)
231
232        self.writeto(u"f\u00f6\u00f6.txt", "foo.txt")
233        files = [fn for fn in listdir_unicode(str(basedir)) if fn.endswith(".txt")]
234        self.failUnlessEqual(len(files), 1)
235        foo_fn = os.path.join(basedir, files[0])
236        #print(foo_fn, type(foo_fn))
237
238        r = bdb.check_file(foo_fn)
239        self.failUnlessEqual(r.was_uploaded(), False)
240        r.did_upload(b"foo-cap")
241
242        r = bdb.check_file(foo_fn)
243        self.failUnlessEqual(r.was_uploaded(), b"foo-cap")
244        self.failUnlessEqual(r.should_check(), False)
245
246        bar_fn = self.writeto(u"b\u00e5r.txt", "bar.txt")
247        #print(bar_fn, type(bar_fn))
248
249        r = bdb.check_file(bar_fn)
250        self.failUnlessEqual(r.was_uploaded(), False)
251        r.did_upload(b"bar-cap")
252
253        r = bdb.check_file(bar_fn)
254        self.failUnlessEqual(r.was_uploaded(), b"bar-cap")
255        self.failUnlessEqual(r.should_check(), False)
256
Note: See TracBrowser for help on using the repository browser.