source: trunk/src/allmydata/test/mutable/test_roundtrip.py

Last change on this file was 53084f7, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-27T23:49:07Z

remove more Python2 compatibility

1"""
2Ported to Python 3.
3"""
4
from io import StringIO
from ..common import AsyncTestCase
from testtools.matchers import Equals, HasLength, Contains
from twisted.internet import defer

from allmydata.util import base32, consumer
from allmydata.interfaces import NotEnoughSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import MODE_READ, UnrecoverableFileError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve
from .util import PublishMixin, make_storagebroker, corrupt
from .. import common_util as testutil

class Roundtrip(AsyncTestCase, testutil.ShouldFailMixin, PublishMixin):
    def setUp(self):
        super(Roundtrip, self).setUp()
        return self.publish_one()

    def make_servermap(self, mode=MODE_READ, oldmap=None, sb=None):
        if oldmap is None:
            oldmap = ServerMap()
        if sb is None:
            sb = self._storage_broker
        smu = ServermapUpdater(self._fn, sb, Monitor(), oldmap, mode)
        d = smu.update()
        return d

    def abbrev_verinfo(self, verinfo):
        if verinfo is None:
            return None
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "%d-%s" % (seqnum, base32.b2a(root_hash)[:4])

    def abbrev_verinfo_dict(self, verinfo_d):
        output = {}
        for verinfo, value in list(verinfo_d.items()):
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            output["%d-%s" % (seqnum, base32.b2a(root_hash)[:4])] = value
        return output

    def dump_servermap(self, servermap):
        print("SERVERMAP", servermap)
        print("RECOVERABLE", [self.abbrev_verinfo(v)
                              for v in servermap.recoverable_versions()])
        print("BEST", self.abbrev_verinfo(servermap.best_recoverable_version()))
        print("available", self.abbrev_verinfo_dict(servermap.shares_available()))

    def do_download(self, servermap, version=None):
        if version is None:
            version = servermap.best_recoverable_version()
        r = Retrieve(self._fn, self._storage_broker, servermap, version)
        c = consumer.MemoryConsumer()
        d = r.download(consumer=c)
        d.addCallback(lambda mc: b"".join(mc.chunks))
        return d


    def test_basic(self):
        d = self.make_servermap()
        def _do_retrieve(servermap):
            self._smap = servermap
            #self.dump_servermap(servermap)
            self.assertThat(servermap.recoverable_versions(), HasLength(1))
            return self.do_download(servermap)
        d.addCallback(_do_retrieve)
        def _retrieved(new_contents):
            self.assertThat(new_contents, Equals(self.CONTENTS))
        d.addCallback(_retrieved)
        # we should be able to re-use the same servermap, both with and
        # without updating it.
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        # clobbering the pubkey should make the servermap updater re-fetch it
        def _clobber_pubkey(res):
            self._fn._pubkey = None
        d.addCallback(_clobber_pubkey)
        d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
        d.addCallback(lambda res: self.do_download(self._smap))
        d.addCallback(_retrieved)
        return d

    def test_all_shares_vanished(self):
        d = self.make_servermap()
        def _remove_shares(servermap):
            for shares in list(self._storage._peers.values()):
                shares.clear()
            d1 = self.shouldFail(NotEnoughSharesError,
                                 "test_all_shares_vanished",
                                 "ran out of servers",
                                 self.do_download, servermap)
            return d1
        d.addCallback(_remove_shares)
        return d

    def test_all_but_two_shares_vanished_updated_servermap(self):
        # tests error reporting for ticket #1742
        d = self.make_servermap()
        def _remove_shares(servermap):
            self._version = servermap.best_recoverable_version()
            for shares in list(self._storage._peers.values())[2:]:
                shares.clear()
            return self.make_servermap(servermap)
        d.addCallback(_remove_shares)
        def _check(updated_servermap):
            d1 = self.shouldFail(NotEnoughSharesError,
                                 "test_all_but_two_shares_vanished_updated_servermap",
                                 "ran out of servers",
                                 self.do_download, updated_servermap, version=self._version)
            return d1
        d.addCallback(_check)
        return d

    def test_no_servers(self):
        sb2 = make_storagebroker(num_peers=0)
        # if there are no servers, then a MODE_READ servermap should come
        # back empty
        d = self.make_servermap(sb=sb2)
        def _check_servermap(servermap):
            self.assertThat(servermap.best_recoverable_version(), Equals(None))
            self.assertFalse(servermap.recoverable_versions())
            self.assertFalse(servermap.unrecoverable_versions())
            self.assertFalse(servermap.all_servers())
        d.addCallback(_check_servermap)
        return d

    def test_no_servers_download(self):
        sb2 = make_storagebroker(num_peers=0)
        self._fn._storage_broker = sb2
        d = self.shouldFail(UnrecoverableFileError,
                            "test_no_servers_download",
                            "no recoverable versions",
                            self._fn.download_best_version)
        def _restore(res):
            # a failed download that occurs while we aren't connected to
            # anybody should not prevent a subsequent download from working.
            # This isn't quite the webapi-driven test that #463 wants, but it
            # should be close enough.
            self._fn._storage_broker = self._storage_broker
            return self._fn.download_best_version()
        def _retrieved(new_contents):
            self.assertThat(new_contents, Equals(self.CONTENTS))
        d.addCallback(_restore)
        d.addCallback(_retrieved)
        return d


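    # _test_corrupt_all() is the workhorse for the corruption tests below.
    # It corrupts the data at `offset` in every share, either before or
    # after the servermap update (per corrupt_early), then downloads the
    # chosen version and checks that the download either succeeds with the
    # original contents or fails with NotEnoughSharesError mentioning
    # `substring`.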
    def _test_corrupt_all(self, offset, substring,
                          should_succeed=False,
                          corrupt_early=True,
                          failure_checker=None,
                          fetch_privkey=False):
        d = defer.succeed(None)
        if corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        d.addCallback(lambda res: self.make_servermap())
        if not corrupt_early:
            d.addCallback(corrupt, self._storage, offset)
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            if ver is None and not should_succeed:
                # no recoverable versions == not succeeding. The problem
                # should be noted in the servermap's list of problems.
                if substring:
                    allproblems = [str(f) for f in servermap.get_problems()]
                    self.assertThat("".join(allproblems), Contains(substring))
                return servermap
            if should_succeed:
                d1 = self._fn.download_version(servermap, ver,
                                               fetch_privkey)
                d1.addCallback(lambda new_contents:
                               self.assertThat(new_contents, Equals(self.CONTENTS)))
            else:
                d1 = self.shouldFail(NotEnoughSharesError,
                                     "_corrupt_all(offset=%s)" % (offset,),
                                     substring,
                                     self._fn.download_version, servermap,
                                                                ver,
                                                                fetch_privkey)
            if failure_checker:
                d1.addCallback(failure_checker)
            d1.addCallback(lambda res: servermap)
            return d1
        d.addCallback(_do_retrieve)
        return d

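    # For reference, the small integer offsets passed to _test_corrupt_all()
    # in the tests below index into the fixed SDMF share header. The field
    # widths here are inferred from the offsets these tests use and from the
    # verinfo tuple above; they are an editor's annotation, not something
    # stated in this file:
    #
    #   offset  0, 1 byte  : version byte (0 for SDMF)
    #   offset  1, 8 bytes : sequence number
    #   offset  9, 32 bytes: root hash R
    #   offset 41, 16 bytes: salt/IV
    #   offset 57, 1 byte  : k (required shares)
    #   offset 58, 1 byte  : N (total shares)
    #   offset 59, 8 bytes : segment size
    #   offset 67, 8 bytes : data length
    #
    # The fields from the sequence number through the data length are covered
    # by the share's signature, which is why corrupting any of them is
    # expected below to produce "signature is invalid".
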
    def test_corrupt_all_verbyte(self):
        # when the version byte is not 0 or 1, we hit an UnknownVersionError
        # in unpack_share().
        d = self._test_corrupt_all(0, "UnknownVersionError")
        def _check_servermap(servermap):
            # and the dump should mention the problems
            s = StringIO()
            dump = servermap.dump(s).getvalue()
            self.assertTrue("30 PROBLEMS" in dump, msg=dump)
        d.addCallback(_check_servermap)
        return d

    def test_corrupt_all_seqnum(self):
        # a corrupt sequence number will trigger a bad signature
        return self._test_corrupt_all(1, "signature is invalid")

    def test_corrupt_all_R(self):
        # a corrupt root hash will trigger a bad signature
        return self._test_corrupt_all(9, "signature is invalid")

    def test_corrupt_all_IV(self):
        # a corrupt salt/IV will trigger a bad signature
        return self._test_corrupt_all(41, "signature is invalid")

    def test_corrupt_all_k(self):
        # a corrupt 'k' will trigger a bad signature
        return self._test_corrupt_all(57, "signature is invalid")

    def test_corrupt_all_N(self):
        # a corrupt 'N' will trigger a bad signature
        return self._test_corrupt_all(58, "signature is invalid")

    def test_corrupt_all_segsize(self):
        # a corrupt segsize will trigger a bad signature
        return self._test_corrupt_all(59, "signature is invalid")

    def test_corrupt_all_datalen(self):
        # a corrupt data length will trigger a bad signature
        return self._test_corrupt_all(67, "signature is invalid")

    def test_corrupt_all_pubkey(self):
        # a corrupt pubkey won't match the URI's fingerprint. We need to
        # remove the pubkey from the filenode, or else it won't bother trying
        # to update it.
        self._fn._pubkey = None
        return self._test_corrupt_all("pubkey",
                                      "pubkey doesn't match fingerprint")

    def test_corrupt_all_sig(self):
        # a corrupt signature is a bad one
        # the signature runs from about [543:799], depending upon the length
        # of the pubkey
        return self._test_corrupt_all("signature", "signature is invalid")

    def test_corrupt_all_share_hash_chain_number(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle the first byte, that will look like a bad hash number,
        # causing an IndexError
        return self._test_corrupt_all("share_hash_chain", "corrupt hashes")

    def test_corrupt_all_share_hash_chain_hash(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle a few bytes in, that will look like a bad hash.
        return self._test_corrupt_all(("share_hash_chain", 4), "corrupt hashes")

    def test_corrupt_all_block_hash_tree(self):
        return self._test_corrupt_all("block_hash_tree",
                                      "block hash tree failure")

    def test_corrupt_all_block(self):
        return self._test_corrupt_all("share_data", "block hash tree failure")

    def test_corrupt_all_encprivkey(self):
        # a corrupted privkey won't even be noticed by the reader, only by a
        # writer.
        return self._test_corrupt_all("enc_privkey", None, should_succeed=True)


    def test_corrupt_all_encprivkey_late(self):
        # this should work for the same reason as above, but we corrupt
        # after the servermap update to exercise the error handling
        # code.
        # We need to remove the privkey from the node, or the retrieve
        # process won't know to update it.
        self._fn._privkey = None
        return self._test_corrupt_all("enc_privkey",
                                      None, # this shouldn't fail
                                      should_succeed=True,
                                      corrupt_early=False,
                                      fetch_privkey=True)


    # disabled until retrieve tests checkstring on each blockfetch. I didn't
    # just use a .todo because the failing-but-ignored test emits about 30kB
    # of noise.
    def OFF_test_corrupt_all_seqnum_late(self):
        # corrupting the seqnum between mapupdate and retrieve should result
        # in NotEnoughSharesError, since each share will look invalid
        def _check(res):
            f = res[0]
            self.assertThat(f.check(NotEnoughSharesError), HasLength(1))
            self.assertThat("uncoordinated write" in str(f), Equals(True))
        return self._test_corrupt_all(1, "ran out of servers",
                                      corrupt_early=False,
                                      failure_checker=_check)


    def test_corrupt_all_block_late(self):
        def _check(res):
            f = res[0]
            self.assertTrue(f.check(NotEnoughSharesError))
        return self._test_corrupt_all("share_data", "block hash tree failure",
                                      corrupt_early=False,
                                      failure_checker=_check)


    def test_basic_pubkey_at_end(self):
        # we corrupt the pubkey in all but the last 'k' shares, allowing the
        # download to succeed but forcing a bunch of retries first. Note that
        # this is rather pessimistic: our Retrieve process will throw away
        # the whole share if the pubkey is bad, even though the rest of the
        # share might be good.

        self._fn._pubkey = None
        k = self._fn.get_required_shares()
        N = self._fn.get_total_shares()
        d = defer.succeed(None)
        d.addCallback(corrupt, self._storage, "pubkey",
                      shnums_to_corrupt=list(range(0, N-k)))
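        # range(0, N-k) corrupts the first N-k shares, leaving the last k
        # intact. With the 3-of-10 encoding that PublishMixin is assumed to
        # use here (an assumption; the parameters are not stated in this
        # file), shares 0..6 are corrupted and shares 7-9 survive.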
        d.addCallback(lambda res: self.make_servermap())
        def _do_retrieve(servermap):
            self.assertTrue(servermap.get_problems())
            self.assertThat("pubkey doesn't match fingerprint"
                            in str(servermap.get_problems()[0]), Equals(True))
            ver = servermap.best_recoverable_version()
            r = Retrieve(self._fn, self._storage_broker, servermap, ver)
            c = consumer.MemoryConsumer()
            return r.download(c)
        d.addCallback(_do_retrieve)
        d.addCallback(lambda mc: b"".join(mc.chunks))
        d.addCallback(lambda new_contents:
                      self.assertThat(new_contents, Equals(self.CONTENTS)))
        return d


    def _test_corrupt_some(self, offset, mdmf=False):
        if mdmf:
            d = self.publish_mdmf()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ignored:
            corrupt(None, self._storage, offset, list(range(5))))
        d.addCallback(lambda ignored:
            self.make_servermap())
        def _do_retrieve(servermap):
            ver = servermap.best_recoverable_version()
            self.assertTrue(ver)
            return self._fn.download_best_version()
        d.addCallback(_do_retrieve)
        d.addCallback(lambda new_contents:
            self.assertThat(new_contents, Equals(self.CONTENTS)))
        return d


    def test_corrupt_some(self):
        # corrupt the data of the first five shares (so the servermap thinks
        # they're good but retrieve marks them as bad), so that the
        # MODE_READ set of 6 will be insufficient, forcing node.download to
        # retry with more servers.
        return self._test_corrupt_some("share_data")


    def test_download_fails(self):
        d = corrupt(None, self._storage, "signature")
        d.addCallback(lambda ignored:
            self.shouldFail(UnrecoverableFileError, "test_download_fails",
                            "no recoverable versions",
                            self._fn.download_best_version))
        return d


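    # A note on the magic offset in the MDMF tests below:
    # ("block_hash_tree", 12 * 32) lands 384 bytes into the share's block
    # hash tree, i.e. twelve hash entries in; the 32-byte hash width is an
    # editor's assumption, not something stated in this file.
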
    def test_corrupt_mdmf_block_hash_tree(self):
        d = self.publish_mdmf()
        d.addCallback(lambda ignored:
            self._test_corrupt_all(("block_hash_tree", 12 * 32),
                                   "block hash tree failure",
                                   corrupt_early=True,
                                   should_succeed=False))
        return d


    def test_corrupt_mdmf_block_hash_tree_late(self):
        # Note - there is no SDMF counterpart to this test, as the SDMF
        # files are guaranteed to have exactly one block, and therefore
        # the block hash tree fits within the initial read (#1240).
        d = self.publish_mdmf()
        d.addCallback(lambda ignored:
            self._test_corrupt_all(("block_hash_tree", 12 * 32),
                                   "block hash tree failure",
                                   corrupt_early=False,
                                   should_succeed=False))
        return d


    def test_corrupt_mdmf_share_data(self):
        d = self.publish_mdmf()
        d.addCallback(lambda ignored:
            # TODO: Find out what the block size is and corrupt a
            # specific block, rather than just guessing.
            self._test_corrupt_all(("share_data", 12 * 40),
                                   "block hash tree failure",
                                   corrupt_early=True,
                                   should_succeed=False))
        return d


    def test_corrupt_some_mdmf(self):
        return self._test_corrupt_some(("share_data", 12 * 40),
                                       mdmf=True)