"""
Ported to Python 3.
"""

---|
from io import StringIO

from testtools.matchers import Equals, HasLength, Contains
from twisted.internet import defer

from allmydata.util import base32, consumer
from allmydata.interfaces import NotEnoughSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import MODE_READ, UnrecoverableFileError
from allmydata.mutable.servermap import ServerMap, ServermapUpdater
from allmydata.mutable.retrieve import Retrieve

from ..common import AsyncTestCase
from .util import PublishMixin, make_storagebroker, corrupt
from .. import common_util as testutil

---|
class Roundtrip(AsyncTestCase, testutil.ShouldFailMixin, PublishMixin):
    """Publish one mutable file, then exercise servermap update + retrieve."""

    def setUp(self):
        # publish_one() returns a Deferred; returning it makes the test
        # framework wait until the initial version exists on the fake grid.
        super(Roundtrip, self).setUp()
        return self.publish_one()
---|
23 | |
---|
24 | def make_servermap(self, mode=MODE_READ, oldmap=None, sb=None): |
---|
25 | if oldmap is None: |
---|
26 | oldmap = ServerMap() |
---|
27 | if sb is None: |
---|
28 | sb = self._storage_broker |
---|
29 | smu = ServermapUpdater(self._fn, sb, Monitor(), oldmap, mode) |
---|
30 | d = smu.update() |
---|
31 | return d |
---|
32 | |
---|
33 | def abbrev_verinfo(self, verinfo): |
---|
34 | if verinfo is None: |
---|
35 | return None |
---|
36 | (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, |
---|
37 | offsets_tuple) = verinfo |
---|
38 | return "%d-%s" % (seqnum, base32.b2a(root_hash)[:4]) |
---|
39 | |
---|
40 | def abbrev_verinfo_dict(self, verinfo_d): |
---|
41 | output = {} |
---|
42 | for verinfo,value in list(verinfo_d.items()): |
---|
43 | (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, |
---|
44 | offsets_tuple) = verinfo |
---|
45 | output["%d-%s" % (seqnum, base32.b2a(root_hash)[:4])] = value |
---|
46 | return output |
---|
47 | |
---|
48 | def dump_servermap(self, servermap): |
---|
49 | print("SERVERMAP", servermap) |
---|
50 | print("RECOVERABLE", [self.abbrev_verinfo(v) |
---|
51 | for v in servermap.recoverable_versions()]) |
---|
52 | print("BEST", self.abbrev_verinfo(servermap.best_recoverable_version())) |
---|
53 | print("available", self.abbrev_verinfo_dict(servermap.shares_available())) |
---|
54 | |
---|
55 | def do_download(self, servermap, version=None): |
---|
56 | if version is None: |
---|
57 | version = servermap.best_recoverable_version() |
---|
58 | r = Retrieve(self._fn, self._storage_broker, servermap, version) |
---|
59 | c = consumer.MemoryConsumer() |
---|
60 | d = r.download(consumer=c) |
---|
61 | d.addCallback(lambda mc: b"".join(mc.chunks)) |
---|
62 | return d |
---|
63 | |
---|
64 | |
---|
65 | def test_basic(self): |
---|
66 | d = self.make_servermap() |
---|
67 | def _do_retrieve(servermap): |
---|
68 | self._smap = servermap |
---|
69 | #self.dump_servermap(servermap) |
---|
70 | self.assertThat(servermap.recoverable_versions(), HasLength(1)) |
---|
71 | return self.do_download(servermap) |
---|
72 | d.addCallback(_do_retrieve) |
---|
73 | def _retrieved(new_contents): |
---|
74 | self.assertThat(new_contents, Equals(self.CONTENTS)) |
---|
75 | d.addCallback(_retrieved) |
---|
76 | # we should be able to re-use the same servermap, both with and |
---|
77 | # without updating it. |
---|
78 | d.addCallback(lambda res: self.do_download(self._smap)) |
---|
79 | d.addCallback(_retrieved) |
---|
80 | d.addCallback(lambda res: self.make_servermap(oldmap=self._smap)) |
---|
81 | d.addCallback(lambda res: self.do_download(self._smap)) |
---|
82 | d.addCallback(_retrieved) |
---|
83 | # clobbering the pubkey should make the servermap updater re-fetch it |
---|
84 | def _clobber_pubkey(res): |
---|
85 | self._fn._pubkey = None |
---|
86 | d.addCallback(_clobber_pubkey) |
---|
87 | d.addCallback(lambda res: self.make_servermap(oldmap=self._smap)) |
---|
88 | d.addCallback(lambda res: self.do_download(self._smap)) |
---|
89 | d.addCallback(_retrieved) |
---|
90 | return d |
---|
91 | |
---|
92 | def test_all_shares_vanished(self): |
---|
93 | d = self.make_servermap() |
---|
94 | def _remove_shares(servermap): |
---|
95 | for shares in list(self._storage._peers.values()): |
---|
96 | shares.clear() |
---|
97 | d1 = self.shouldFail(NotEnoughSharesError, |
---|
98 | "test_all_shares_vanished", |
---|
99 | "ran out of servers", |
---|
100 | self.do_download, servermap) |
---|
101 | return d1 |
---|
102 | d.addCallback(_remove_shares) |
---|
103 | return d |
---|
104 | |
---|
105 | def test_all_but_two_shares_vanished_updated_servermap(self): |
---|
106 | # tests error reporting for ticket #1742 |
---|
107 | d = self.make_servermap() |
---|
108 | def _remove_shares(servermap): |
---|
109 | self._version = servermap.best_recoverable_version() |
---|
110 | for shares in list(self._storage._peers.values())[2:]: |
---|
111 | shares.clear() |
---|
112 | return self.make_servermap(servermap) |
---|
113 | d.addCallback(_remove_shares) |
---|
114 | def _check(updated_servermap): |
---|
115 | d1 = self.shouldFail(NotEnoughSharesError, |
---|
116 | "test_all_but_two_shares_vanished_updated_servermap", |
---|
117 | "ran out of servers", |
---|
118 | self.do_download, updated_servermap, version=self._version) |
---|
119 | return d1 |
---|
120 | d.addCallback(_check) |
---|
121 | return d |
---|
122 | |
---|
123 | def test_no_servers(self): |
---|
124 | sb2 = make_storagebroker(num_peers=0) |
---|
125 | # if there are no servers, then a MODE_READ servermap should come |
---|
126 | # back empty |
---|
127 | d = self.make_servermap(sb=sb2) |
---|
128 | def _check_servermap(servermap): |
---|
129 | self.assertThat(servermap.best_recoverable_version(), Equals(None)) |
---|
130 | self.assertFalse(servermap.recoverable_versions()) |
---|
131 | self.assertFalse(servermap.unrecoverable_versions()) |
---|
132 | self.assertFalse(servermap.all_servers()) |
---|
133 | d.addCallback(_check_servermap) |
---|
134 | return d |
---|
135 | |
---|
136 | def test_no_servers_download(self): |
---|
137 | sb2 = make_storagebroker(num_peers=0) |
---|
138 | self._fn._storage_broker = sb2 |
---|
139 | d = self.shouldFail(UnrecoverableFileError, |
---|
140 | "test_no_servers_download", |
---|
141 | "no recoverable versions", |
---|
142 | self._fn.download_best_version) |
---|
143 | def _restore(res): |
---|
144 | # a failed download that occurs while we aren't connected to |
---|
145 | # anybody should not prevent a subsequent download from working. |
---|
146 | # This isn't quite the webapi-driven test that #463 wants, but it |
---|
147 | # should be close enough. |
---|
148 | self._fn._storage_broker = self._storage_broker |
---|
149 | return self._fn.download_best_version() |
---|
150 | def _retrieved(new_contents): |
---|
151 | self.assertThat(new_contents, Equals(self.CONTENTS)) |
---|
152 | d.addCallback(_restore) |
---|
153 | d.addCallback(_retrieved) |
---|
154 | return d |
---|
155 | |
---|
156 | |
---|
157 | def _test_corrupt_all(self, offset, substring, |
---|
158 | should_succeed=False, |
---|
159 | corrupt_early=True, |
---|
160 | failure_checker=None, |
---|
161 | fetch_privkey=False): |
---|
162 | d = defer.succeed(None) |
---|
163 | if corrupt_early: |
---|
164 | d.addCallback(corrupt, self._storage, offset) |
---|
165 | d.addCallback(lambda res: self.make_servermap()) |
---|
166 | if not corrupt_early: |
---|
167 | d.addCallback(corrupt, self._storage, offset) |
---|
168 | def _do_retrieve(servermap): |
---|
169 | ver = servermap.best_recoverable_version() |
---|
170 | if ver is None and not should_succeed: |
---|
171 | # no recoverable versions == not succeeding. The problem |
---|
172 | # should be noted in the servermap's list of problems. |
---|
173 | if substring: |
---|
174 | allproblems = [str(f) for f in servermap.get_problems()] |
---|
175 | self.assertThat("".join(allproblems), Contains(substring)) |
---|
176 | return servermap |
---|
177 | if should_succeed: |
---|
178 | d1 = self._fn.download_version(servermap, ver, |
---|
179 | fetch_privkey) |
---|
180 | d1.addCallback(lambda new_contents: |
---|
181 | self.assertThat(new_contents, Equals(self.CONTENTS))) |
---|
182 | else: |
---|
183 | d1 = self.shouldFail(NotEnoughSharesError, |
---|
184 | "_corrupt_all(offset=%s)" % (offset,), |
---|
185 | substring, |
---|
186 | self._fn.download_version, servermap, |
---|
187 | ver, |
---|
188 | fetch_privkey) |
---|
189 | if failure_checker: |
---|
190 | d1.addCallback(failure_checker) |
---|
191 | d1.addCallback(lambda res: servermap) |
---|
192 | return d1 |
---|
193 | d.addCallback(_do_retrieve) |
---|
194 | return d |
---|
195 | |
---|
196 | def test_corrupt_all_verbyte(self): |
---|
197 | # when the version byte is not 0 or 1, we hit an UnknownVersionError |
---|
198 | # error in unpack_share(). |
---|
199 | d = self._test_corrupt_all(0, "UnknownVersionError") |
---|
200 | def _check_servermap(servermap): |
---|
201 | # and the dump should mention the problems |
---|
202 | s = StringIO() |
---|
203 | dump = servermap.dump(s).getvalue() |
---|
204 | self.assertTrue("30 PROBLEMS" in dump, msg=dump) |
---|
205 | d.addCallback(_check_servermap) |
---|
206 | return d |
---|
207 | |
---|
208 | def test_corrupt_all_seqnum(self): |
---|
209 | # a corrupt sequence number will trigger a bad signature |
---|
210 | return self._test_corrupt_all(1, "signature is invalid") |
---|
211 | |
---|
212 | def test_corrupt_all_R(self): |
---|
213 | # a corrupt root hash will trigger a bad signature |
---|
214 | return self._test_corrupt_all(9, "signature is invalid") |
---|
215 | |
---|
216 | def test_corrupt_all_IV(self): |
---|
217 | # a corrupt salt/IV will trigger a bad signature |
---|
218 | return self._test_corrupt_all(41, "signature is invalid") |
---|
219 | |
---|
220 | def test_corrupt_all_k(self): |
---|
221 | # a corrupt 'k' will trigger a bad signature |
---|
222 | return self._test_corrupt_all(57, "signature is invalid") |
---|
223 | |
---|
224 | def test_corrupt_all_N(self): |
---|
225 | # a corrupt 'N' will trigger a bad signature |
---|
226 | return self._test_corrupt_all(58, "signature is invalid") |
---|
227 | |
---|
228 | def test_corrupt_all_segsize(self): |
---|
229 | # a corrupt segsize will trigger a bad signature |
---|
230 | return self._test_corrupt_all(59, "signature is invalid") |
---|
231 | |
---|
232 | def test_corrupt_all_datalen(self): |
---|
233 | # a corrupt data length will trigger a bad signature |
---|
234 | return self._test_corrupt_all(67, "signature is invalid") |
---|
235 | |
---|
236 | def test_corrupt_all_pubkey(self): |
---|
237 | # a corrupt pubkey won't match the URI's fingerprint. We need to |
---|
238 | # remove the pubkey from the filenode, or else it won't bother trying |
---|
239 | # to update it. |
---|
240 | self._fn._pubkey = None |
---|
241 | return self._test_corrupt_all("pubkey", |
---|
242 | "pubkey doesn't match fingerprint") |
---|
243 | |
---|
244 | def test_corrupt_all_sig(self): |
---|
245 | # a corrupt signature is a bad one |
---|
246 | # the signature runs from about [543:799], depending upon the length |
---|
247 | # of the pubkey |
---|
248 | return self._test_corrupt_all("signature", "signature is invalid") |
---|
249 | |
---|
250 | def test_corrupt_all_share_hash_chain_number(self): |
---|
251 | # a corrupt share hash chain entry will show up as a bad hash. If we |
---|
252 | # mangle the first byte, that will look like a bad hash number, |
---|
253 | # causing an IndexError |
---|
254 | return self._test_corrupt_all("share_hash_chain", "corrupt hashes") |
---|
255 | |
---|
256 | def test_corrupt_all_share_hash_chain_hash(self): |
---|
257 | # a corrupt share hash chain entry will show up as a bad hash. If we |
---|
258 | # mangle a few bytes in, that will look like a bad hash. |
---|
259 | return self._test_corrupt_all(("share_hash_chain",4), "corrupt hashes") |
---|
260 | |
---|
261 | def test_corrupt_all_block_hash_tree(self): |
---|
262 | return self._test_corrupt_all("block_hash_tree", |
---|
263 | "block hash tree failure") |
---|
264 | |
---|
265 | def test_corrupt_all_block(self): |
---|
266 | return self._test_corrupt_all("share_data", "block hash tree failure") |
---|
267 | |
---|
268 | def test_corrupt_all_encprivkey(self): |
---|
269 | # a corrupted privkey won't even be noticed by the reader, only by a |
---|
270 | # writer. |
---|
271 | return self._test_corrupt_all("enc_privkey", None, should_succeed=True) |
---|
272 | |
---|
273 | |
---|
274 | def test_corrupt_all_encprivkey_late(self): |
---|
275 | # this should work for the same reason as above, but we corrupt |
---|
276 | # after the servermap update to exercise the error handling |
---|
277 | # code. |
---|
278 | # We need to remove the privkey from the node, or the retrieve |
---|
279 | # process won't know to update it. |
---|
280 | self._fn._privkey = None |
---|
281 | return self._test_corrupt_all("enc_privkey", |
---|
282 | None, # this shouldn't fail |
---|
283 | should_succeed=True, |
---|
284 | corrupt_early=False, |
---|
285 | fetch_privkey=True) |
---|
286 | |
---|
287 | |
---|
288 | # disabled until retrieve tests checkstring on each blockfetch. I didn't |
---|
289 | # just use a .todo because the failing-but-ignored test emits about 30kB |
---|
290 | # of noise. |
---|
291 | def OFF_test_corrupt_all_seqnum_late(self): |
---|
292 | # corrupting the seqnum between mapupdate and retrieve should result |
---|
293 | # in NotEnoughSharesError, since each share will look invalid |
---|
294 | def _check(res): |
---|
295 | f = res[0] |
---|
296 | self.assertThat(f.check(NotEnoughSharesError), HasLength(1)) |
---|
297 | self.assertThat("uncoordinated write" in str(f), Equals(True)) |
---|
298 | return self._test_corrupt_all(1, "ran out of servers", |
---|
299 | corrupt_early=False, |
---|
300 | failure_checker=_check) |
---|
301 | |
---|
302 | |
---|
303 | def test_corrupt_all_block_late(self): |
---|
304 | def _check(res): |
---|
305 | f = res[0] |
---|
306 | self.assertTrue(f.check(NotEnoughSharesError)) |
---|
307 | return self._test_corrupt_all("share_data", "block hash tree failure", |
---|
308 | corrupt_early=False, |
---|
309 | failure_checker=_check) |
---|
310 | |
---|
311 | |
---|
312 | def test_basic_pubkey_at_end(self): |
---|
313 | # we corrupt the pubkey in all but the last 'k' shares, allowing the |
---|
314 | # download to succeed but forcing a bunch of retries first. Note that |
---|
315 | # this is rather pessimistic: our Retrieve process will throw away |
---|
316 | # the whole share if the pubkey is bad, even though the rest of the |
---|
317 | # share might be good. |
---|
318 | |
---|
319 | self._fn._pubkey = None |
---|
320 | k = self._fn.get_required_shares() |
---|
321 | N = self._fn.get_total_shares() |
---|
322 | d = defer.succeed(None) |
---|
323 | d.addCallback(corrupt, self._storage, "pubkey", |
---|
324 | shnums_to_corrupt=list(range(0, N-k))) |
---|
325 | d.addCallback(lambda res: self.make_servermap()) |
---|
326 | def _do_retrieve(servermap): |
---|
327 | self.assertTrue(servermap.get_problems()) |
---|
328 | self.assertThat("pubkey doesn't match fingerprint" |
---|
329 | in str(servermap.get_problems()[0]), Equals(True)) |
---|
330 | ver = servermap.best_recoverable_version() |
---|
331 | r = Retrieve(self._fn, self._storage_broker, servermap, ver) |
---|
332 | c = consumer.MemoryConsumer() |
---|
333 | return r.download(c) |
---|
334 | d.addCallback(_do_retrieve) |
---|
335 | d.addCallback(lambda mc: b"".join(mc.chunks)) |
---|
336 | d.addCallback(lambda new_contents: |
---|
337 | self.assertThat(new_contents, Equals(self.CONTENTS))) |
---|
338 | return d |
---|
339 | |
---|
340 | |
---|
341 | def _test_corrupt_some(self, offset, mdmf=False): |
---|
342 | if mdmf: |
---|
343 | d = self.publish_mdmf() |
---|
344 | else: |
---|
345 | d = defer.succeed(None) |
---|
346 | d.addCallback(lambda ignored: |
---|
347 | corrupt(None, self._storage, offset, list(range(5)))) |
---|
348 | d.addCallback(lambda ignored: |
---|
349 | self.make_servermap()) |
---|
350 | def _do_retrieve(servermap): |
---|
351 | ver = servermap.best_recoverable_version() |
---|
352 | self.assertTrue(ver) |
---|
353 | return self._fn.download_best_version() |
---|
354 | d.addCallback(_do_retrieve) |
---|
355 | d.addCallback(lambda new_contents: |
---|
356 | self.assertThat(new_contents, Equals(self.CONTENTS))) |
---|
357 | return d |
---|
358 | |
---|
359 | |
---|
360 | def test_corrupt_some(self): |
---|
361 | # corrupt the data of first five shares (so the servermap thinks |
---|
362 | # they're good but retrieve marks them as bad), so that the |
---|
363 | # MODE_READ set of 6 will be insufficient, forcing node.download to |
---|
364 | # retry with more servers. |
---|
365 | return self._test_corrupt_some("share_data") |
---|
366 | |
---|
367 | |
---|
368 | def test_download_fails(self): |
---|
369 | d = corrupt(None, self._storage, "signature") |
---|
370 | d.addCallback(lambda ignored: |
---|
371 | self.shouldFail(UnrecoverableFileError, "test_download_anyway", |
---|
372 | "no recoverable versions", |
---|
373 | self._fn.download_best_version)) |
---|
374 | return d |
---|
375 | |
---|
376 | |
---|
377 | |
---|
378 | def test_corrupt_mdmf_block_hash_tree(self): |
---|
379 | d = self.publish_mdmf() |
---|
380 | d.addCallback(lambda ignored: |
---|
381 | self._test_corrupt_all(("block_hash_tree", 12 * 32), |
---|
382 | "block hash tree failure", |
---|
383 | corrupt_early=True, |
---|
384 | should_succeed=False)) |
---|
385 | return d |
---|
386 | |
---|
387 | |
---|
388 | def test_corrupt_mdmf_block_hash_tree_late(self): |
---|
389 | # Note - there is no SDMF counterpart to this test, as the SDMF |
---|
390 | # files are guaranteed to have exactly one block, and therefore |
---|
391 | # the block hash tree fits within the initial read (#1240). |
---|
392 | d = self.publish_mdmf() |
---|
393 | d.addCallback(lambda ignored: |
---|
394 | self._test_corrupt_all(("block_hash_tree", 12 * 32), |
---|
395 | "block hash tree failure", |
---|
396 | corrupt_early=False, |
---|
397 | should_succeed=False)) |
---|
398 | return d |
---|
399 | |
---|
400 | |
---|
401 | def test_corrupt_mdmf_share_data(self): |
---|
402 | d = self.publish_mdmf() |
---|
403 | d.addCallback(lambda ignored: |
---|
404 | # TODO: Find out what the block size is and corrupt a |
---|
405 | # specific block, rather than just guessing. |
---|
406 | self._test_corrupt_all(("share_data", 12 * 40), |
---|
407 | "block hash tree failure", |
---|
408 | corrupt_early=True, |
---|
409 | should_succeed=False)) |
---|
410 | return d |
---|
411 | |
---|
412 | |
---|
413 | def test_corrupt_some_mdmf(self): |
---|
414 | return self._test_corrupt_some(("share_data", 12 * 40), |
---|
415 | mdmf=True) |
---|