1 | # -*- coding: utf-8 -*- |
---|
2 | """ |
---|
3 | Ported to Python 3. |
---|
4 | """ |
---|
5 | |
---|
6 | from allmydata.test import common |
---|
7 | from allmydata.monitor import Monitor |
---|
8 | from allmydata import check_results |
---|
9 | from allmydata.interfaces import NotEnoughSharesError |
---|
10 | from allmydata.immutable import upload |
---|
11 | from allmydata.util.consumer import download_to_data |
---|
12 | from twisted.internet import defer |
---|
13 | from twisted.trial import unittest |
---|
14 | import random |
---|
15 | from allmydata.test.no_network import GridTestMixin |
---|
16 | |
---|
# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10

# Trial per-test timeout. François's ARM box timed out after 120 seconds of
# Verifier.test_corrupt_crypttext_hashtree, so allow a longer limit.
timeout = 240
---|
23 | |
---|
class RepairTestMixin(object):
    """Shared helpers for repairer tests: grid-wide storage-server counter
    accounting, and a standard upload fixture that stashes filenodes for
    two clients."""

    def _sum_counter(self, name):
        # Sum a single stats counter across every storage server in the
        # grid. Servers that have not yet recorded the counter contribute 0.
        total = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            total += counters.get(name, 0)
        return total

    def _count_reads(self):
        """Total 'storage_server.read' count across the grid."""
        return self._sum_counter('storage_server.read')

    def _count_allocates(self):
        """Total 'storage_server.allocate' count across the grid."""
        return self._sum_counter('storage_server.allocate')

    def _count_writes(self):
        """Total 'storage_server.write' count across the grid."""
        return self._sum_counter('storage_server.write')

    def _stash_counts(self):
        # Snapshot the current counters so that _get_delta_counts() can
        # later report only the activity caused by the code under test.
        self.before_repair_reads = self._count_reads()
        self.before_repair_allocates = self._count_allocates()
        self.before_repair_writes = self._count_writes()

    def _get_delta_counts(self):
        """Return (delta_reads, delta_allocates, delta_writes) since the
        last _stash_counts() call."""
        delta_reads = self._count_reads() - self.before_repair_reads
        delta_allocates = self._count_allocates() - self.before_repair_allocates
        delta_writes = self._count_writes() - self.before_repair_writes
        return (delta_reads, delta_allocates, delta_writes)

    def failIfBigger(self, x, y):
        """Assert x <= y, with a message showing both values."""
        self.failIf(x > y, "%s > %s" % (x, y))

    def upload_and_stash(self):
        """Upload common.TEST_DATA from client 0 (with a tiny segment size
        so the file has many segments) and stash self.uri plus filenodes
        for both clients. Returns a Deferred."""
        c0 = self.g.clients[0]
        c1 = self.g.clients[1]
        c0.encoding_params['max_segment_size'] = 12
        d = c0.upload(upload.Data(common.TEST_DATA, convergence=b""))
        def _stash_uri(ur):
            self.uri = ur.get_uri()
            self.c0_filenode = c0.create_node_from_uri(ur.get_uri())
            self.c1_filenode = c1.create_node_from_uri(ur.get_uri())
        d.addCallback(_stash_uri)
        return d
---|
71 | |
---|
class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
    """Exercise the immutable-file verifier against many kinds of share
    corruption. Each test corrupts one share with a common._corrupt_*
    function, runs a verifying check, and delegates judgement of the
    CheckResults to one of the judge_* helpers."""

    def test_check_without_verify(self):
        """Check says the file is healthy when none of the shares have been
        touched. It says that the file is unhealthy when all of them have
        been removed. It doesn't use any reads.
        """
        self.basedir = "repairer/Verifier/check_without_verify"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check(cr):
            self.failUnless(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            # a non-verifying check must not read any share data
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)

        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check2(cr):
            self.failIf(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check2)
        return d

    def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
        """Corrupt share `shnum` with `corruptor`, run a verifying check
        from the second client, and apply `judgement` to the results. If
        the judgement fails, its FailTest is re-raised with vr.as_dict()
        appended for easier debugging. Callers set self.basedir first."""
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())

        d.addCallback(lambda ignored:
                      self.corrupt_shares_numbered(self.uri, [shnum],
                                                   corruptor, debug=debug))
        d.addCallback(lambda ignored:
                      self.c1_filenode.check(Monitor(), verify=True))
        def _check(vr):
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            try:
                judgement(vr)
            except unittest.FailTest as e:
                # FailTest just uses e.args[0] == str
                new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.as_dict())
                e.args = (new_arg,)
                raise
        d.addCallback(_check)
        return d

    def judge_no_problem(self, vr):
        """Verify says the file is healthy when none of the shares have been
        touched in a way that matters. It doesn't use more than READ_LEEWAY
        times as many reads as it needs."""
        self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 10)
        self.failUnlessEqual(len(vr.get_sharemap()), 10)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 10)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)

    def test_ok_no_corruption(self):
        self.basedir = "repairer/Verifier/ok_no_corruption"
        return self._help_test_verify(common._corrupt_nothing,
                                      self.judge_no_problem)

    def test_ok_filedata_size(self):
        self.basedir = "repairer/Verifier/ok_filedatasize"
        return self._help_test_verify(common._corrupt_size_of_file_data,
                                      self.judge_no_problem)

    def test_ok_sharedata_size(self):
        self.basedir = "repairer/Verifier/ok_sharedata_size"
        return self._help_test_verify(common._corrupt_size_of_sharedata,
                                      self.judge_no_problem)

    def test_ok_segment_size(self):
        self.basedir = "repairer/Verifier/test_ok_segment_size"
        return self._help_test_verify(common._corrupt_segment_size,
                                      self.judge_no_problem)

    def judge_visible_corruption(self, vr):
        """Corruption which is detected by the server means that the server
        will send you back a Failure in response to get_bucket instead of
        giving you the share data. Test that verifier handles these answers
        correctly. It doesn't use more than READ_LEEWAY times as many reads
        as it needs."""
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_servers_responding()), 9)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)

    def test_corrupt_file_verno(self):
        self.basedir = "repairer/Verifier/corrupt_file_verno"
        return self._help_test_verify(common._corrupt_file_version_number,
                                      self.judge_visible_corruption)

    def judge_share_version_incompatibility(self, vr):
        # corruption of the share version (inside the container, the 1/2
        # value that determines whether we've got 4-byte offsets or 8-byte
        # offsets) to something larger than 2 will trigger a
        # ShareVersionIncompatible exception, which should be counted in
        # list-incompatible-shares, rather than list-corrupt-shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)
        self.failUnlessEqual(len(vr.get_incompatible_shares()), 1)

    def test_corrupt_share_verno(self):
        self.basedir = "repairer/Verifier/corrupt_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number,
                                      self.judge_share_version_incompatibility)

    def judge_invisible_corruption(self, vr):
        # corruption of fields that the server does not check (which is most
        # of them), which will be detected by the client as it downloads
        # those shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 1)
        self.failUnlessEqual(len(vr.get_incompatible_shares()), 0)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)

    def test_corrupt_sharedata_offset(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
        return self._help_test_verify(common._corrupt_offset_of_sharedata,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset_shortread(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata"
        return self._help_test_verify(common._corrupt_share_data,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata_last_byte(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_last_byte"
        return self._help_test_verify(common._corrupt_share_data_last_byte,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_length(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_length"
        return self._help_test_verify(common._corrupt_length_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb(self):
        # Note that in some rare situations this might fail, specifically if
        # the length of the UEB is corrupted to be a value that is bigger than
        # the size but less than 2000, it might not get caught... But that's
        # mostly because in that case it doesn't meaningfully corrupt it. See
        # _get_uri_extension_the_old_way() in layout.py for where the 2000
        # number comes from.
        self.basedir = "repairer/Verifier/corrupt_ueb"
        return self._help_test_verify(common._corrupt_uri_extension,
                                      self.judge_invisible_corruption)

    def test_truncate_crypttext_hashtree(self):
        # change the start of the block hashtree, to truncate the preceding
        # crypttext hashtree
        self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_block_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes,
                                      self.judge_invisible_corruption)

    def test_wrong_share_verno(self):
        self.basedir = "repairer/Verifier/wrong_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_share_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_byte_x221(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
                                      self.judge_invisible_corruption, debug=True)

    def test_corrupt_block_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree"
        return self._help_test_verify(common._corrupt_block_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree"
        return self._help_test_verify(common._corrupt_share_hashes,
                                      self.judge_invisible_corruption)

    # TODO: the Verifier should decode to ciphertext and check it against the
    # crypttext-hash-tree. Check this by constructing a bogus file, in which
    # the crypttext-hash-tree is modified after encoding is done, but before
    # the UEB is finalized. The Verifier should see a valid
    # crypttext-hash-tree but then the ciphertext should show up as invalid.
    # Normally this could only be triggered by a bug in FEC decode.

    def OFF_test_each_byte(self):
        # this test takes 140s to run on my laptop, and doesn't have any
        # actual asserts, so it's commented out. It corrupts each byte of the
        # share in sequence, and checks to see which ones the Verifier
        # catches and which it misses. Ticket #819 contains details: there
        # are several portions of the share that are unused, for which
        # corruption is not supposed to be caught.
        #
        # If the test ran quickly, we could use the share size to compute the
        # offsets of these unused portions and assert that everything outside
        # of them was detected. We could then replace the rest of
        # Verifier.test_* (which takes 16s to run on my laptop) with this
        # one.
        self.basedir = "repairer/Verifier/each_byte"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        def _grab_sh0(res):
            self.sh0_file = [sharefile
                             for (shnum, serverid, sharefile)
                             in self.find_uri_shares(self.uri)
                             if shnum == 0][0]
            with open(self.sh0_file, "rb") as f:
                self.sh0_orig = f.read()
        d.addCallback(_grab_sh0)
        def _fix_sh0(res):
            # restore the pristine share before the next corruption pass
            with open(self.sh0_file, "wb") as f:
                f.write(self.sh0_orig)
        def _corrupt(ign, which):
            def _corruptor(s, debug=False):
                # 's' is bytes on Python 3, so indexing yields an int;
                # rebuild the flipped byte with bytes([...]) instead of the
                # Python-2-only chr(ord(...)) idiom.
                return s[:which] + bytes([s[which] ^ 0x01]) + s[which+1:]
            self.corrupt_shares_numbered(self.uri, [0], _corruptor)
        results = {}
        def _did_check(vr, i):
            #print("corrupt %d: healthy=%s" % (i, vr.is_healthy()))
            results[i] = vr.is_healthy()
        def _start(ign):
            d = defer.succeed(None)
            for i in range(len(self.sh0_orig)):
                d.addCallback(_corrupt, i)
                d.addCallback(lambda ign:
                              self.c1_filenode.check(Monitor(), verify=True))
                d.addCallback(_did_check, i)
                d.addCallback(_fix_sh0)
            return d
        d.addCallback(_start)
        def _show_results(ign):
            with open("test_each_byte_output", "w") as f:
                for i in sorted(results.keys()):
                    print("%d: %s" % (i, results[i]), file=f)
            print("Please look in _trial_temp/test_each_byte_output for results")
        d.addCallback(_show_results)
        return d
---|
363 | |
---|
# We'll allow you to pass this test even if you trigger thirty-five times as
# many block sends and disk writes as would be optimal.
WRITE_LEEWAY = 35
# Optimally, you could repair one of these (small) files in a single write.
# Tests below multiply this per-share allowance by the number of shares
# being repaired when asserting on delta_allocates.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
---|
369 | |
---|
370 | class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin, |
---|
371 | common.ShouldFailMixin): |
---|
372 | |
---|
373 | def test_harness(self): |
---|
374 | # This test is actually to make sure our test harness works, rather |
---|
375 | # than testing anything about Tahoe code itself. |
---|
376 | |
---|
377 | self.basedir = "repairer/Repairer/test_code" |
---|
378 | self.set_up_grid(num_clients=2) |
---|
379 | d = self.upload_and_stash() |
---|
380 | |
---|
381 | d.addCallback(lambda ignored: self.find_uri_shares(self.uri)) |
---|
382 | def _stash_shares(oldshares): |
---|
383 | self.oldshares = oldshares |
---|
384 | d.addCallback(_stash_shares) |
---|
385 | d.addCallback(lambda ignored: self.find_uri_shares(self.uri)) |
---|
386 | def _compare(newshares): |
---|
387 | self.failUnlessEqual(newshares, self.oldshares) |
---|
388 | d.addCallback(_compare) |
---|
389 | |
---|
390 | def _delete_8(ignored): |
---|
391 | shnum = self.oldshares[0][0] |
---|
392 | self.delete_shares_numbered(self.uri, [shnum]) |
---|
393 | for sh in self.oldshares[1:8]: |
---|
394 | self.delete_share(sh) |
---|
395 | d.addCallback(_delete_8) |
---|
396 | d.addCallback(lambda ignored: self.find_uri_shares(self.uri)) |
---|
397 | d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2)) |
---|
398 | |
---|
399 | d.addCallback(lambda ignored: |
---|
400 | self.shouldFail(NotEnoughSharesError, "then_download", |
---|
401 | None, |
---|
402 | download_to_data, self.c1_filenode)) |
---|
403 | |
---|
404 | d.addCallback(lambda ignored: |
---|
405 | self.shouldFail(NotEnoughSharesError, "then_repair", |
---|
406 | None, |
---|
407 | self.c1_filenode.check_and_repair, |
---|
408 | Monitor(), verify=False)) |
---|
409 | |
---|
410 | # test share corruption |
---|
411 | def _test_corrupt(ignored): |
---|
412 | olddata = {} |
---|
413 | shares = self.find_uri_shares(self.uri) |
---|
414 | for (shnum, serverid, sharefile) in shares: |
---|
415 | olddata[ (shnum, serverid) ] = open(sharefile, "rb").read() |
---|
416 | for sh in shares: |
---|
417 | self.corrupt_share(sh, common._corrupt_uri_extension) |
---|
418 | for (shnum, serverid, sharefile) in shares: |
---|
419 | newdata = open(sharefile, "rb").read() |
---|
420 | self.failIfEqual(olddata[ (shnum, serverid) ], newdata) |
---|
421 | d.addCallback(_test_corrupt) |
---|
422 | |
---|
423 | def _remove_all(ignored): |
---|
424 | for sh in self.find_uri_shares(self.uri): |
---|
425 | self.delete_share(sh) |
---|
426 | d.addCallback(_remove_all) |
---|
427 | d.addCallback(lambda ignored: self.find_uri_shares(self.uri)) |
---|
428 | d.addCallback(lambda shares: self.failUnlessEqual(shares, [])) |
---|
429 | |
---|
430 | return d |
---|
431 | |
---|
    def test_repair_from_deletion_of_1(self):
        """ Repair replaces a share that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        # Delete a single share (#2), then check-and-repair from client 0.
        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, [2]))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            # Repair must report unhealthy-before / healthy-after, and stay
            # within the module-level read/write budgets.
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

            # Now we inspect the filesystem to make sure that it has 10
            # shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted share #2.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, list(range(3, 10+1))))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d
---|
477 | |
---|
    def test_repair_from_deletion_of_7(self):
        """ Repair replaces seven shares that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        # Delete shares #0 through #6, then check-and-repair from client 0.
        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, list(range(7))))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            # Repair must report unhealthy-before / healthy-after, within
            # the read budget and a write budget scaled by the 7 shares.
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()

            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy(), post.as_dict())

            # Make sure we really have 10 shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete all shares numbered 3 and above (leaving only #0,
        # #1, #2), then try to download the file and assert that it succeeds
        # and has the right contents. This can't work unless repair already
        # replaced the previously-deleted shares #0-#6.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, list(range(3, 10+1))))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d
---|
522 | |
---|
    def test_repairer_servers_of_happiness(self):
        # The repairer is supposed to generate and place as many of the
        # missing shares as possible without caring about how they are
        # distributed.
        self.basedir = "repairer/Repairer/repairer_servers_of_happiness"
        self.set_up_grid(num_clients=2, num_servers=10)
        d = self.upload_and_stash()
        # Now delete some servers. We want to leave 3 servers, which
        # will allow us to restore the file to a healthy state without
        # distributing the shares widely enough to satisfy the default
        # happiness setting.
        def _delete_some_servers(ignored):
            for i in range(7):
                self.g.remove_server(self.g.servers_by_number[i].my_nodeid)

            assert len(self.g.servers_by_number) == 3

        d.addCallback(_delete_some_servers)
        # Now try to repair the file.
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(), verify=False))
        def _check_results(crr):
            # Repair should still succeed (post-repair healthy) even though
            # only 3 servers remain to hold the regenerated shares.
            self.failUnlessIsInstance(crr,
                                      check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            post = crr.get_post_repair_results()
            for p in (pre, post):
                self.failUnlessIsInstance(p, check_results.CheckResults)

            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

        d.addCallback(_check_results)
        return d
---|
557 | |
---|
558 | # why is test_repair_from_corruption_of_1 disabled? Read on: |
---|
559 | # |
---|
560 | # As recently documented in NEWS.rst for the 1.3.0 release, the current |
---|
561 | # immutable repairer suffers from several limitations: |
---|
562 | # |
---|
563 | # * minimalistic verifier: it's just download without decryption, so we |
---|
564 | # don't look for corruption in N-k shares, and for many fields (those |
---|
565 | # which are the same in all shares) we only look for corruption in a |
---|
566 | # single share |
---|
567 | # |
---|
568 | # * some kinds of corruption cause download to fail (when it ought to |
---|
569 | # just switch to a different share), so repair will fail on these too |
---|
570 | # |
---|
571 | # * RIStorageServer doesn't offer a way to delete old corrupt immutable |
---|
572 | # shares (the authority model is not at all clear), so the best the |
---|
573 | # repairer can do is to put replacement shares on new servers, |
---|
574 | # unfortunately leaving the corrupt shares in place |
---|
575 | # |
---|
576 | # This test is pretty strenuous: it asserts that the repairer does the |
---|
577 | # ideal thing in 8 distinct situations, with randomized corruption in |
---|
578 | # each. Because of the aforementioned limitations, it is highly unlikely |
---|
579 | # to pass any of these. We're also concerned that the download-fails case |
---|
580 | # can provoke a lost-progress bug (one was fixed, but there might be more |
---|
581 | # lurking), which will cause the test to fail despite a ".todo" marker, |
---|
582 | # and will probably cause subsequent unrelated tests to fail too (due to |
---|
583 | # "unclean reactor" problems). |
---|
584 | # |
---|
585 | # In addition, I (warner) have recently refactored the rest of this class |
---|
# to use the much-faster no_network.GridTestMixin, so this test needs to
---|
587 | # be updated before it will be able to run again. |
---|
588 | # |
---|
589 | # So we're turning this test off until we've done one or more of the |
---|
590 | # following: |
---|
591 | # * remove some of these limitations |
---|
592 | # * break the test up into smaller, more functionally-oriented pieces |
---|
593 | # * simplify the repairer enough to let us be confident that it is free |
---|
594 | # of lost-progress bugs |
---|
595 | |
---|
    def OFF_test_repair_from_corruption_of_1(self):
        """Disabled end-to-end corruption/repair test.

        For each known share-corruption function, this corrupts one random
        share, runs check_and_repair(verify=True), and asserts that:
        the pre-repair check is unhealthy, the post-repair check is healthy,
        the read/write counters stay within budget, all 10 shares exist on
        disk afterwards, and the file still downloads correctly after 7 of
        the 9 uncorrupted shares are deleted.

        Prefixed with OFF_ so trial does not collect it; see the comment
        block above this method for why it is disabled.
        """
        d = defer.succeed(None)

        # Record the pristine set of shares so they could be restored
        # between corruption rounds.
        d.addCallback(self.find_all_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        # NOTE(review): _put_it_all_back is defined but never added to the
        # callback chain below, so the shares are never actually restored
        # between corruptor rounds -- one of the reasons this test is off.
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
            return ignored

        # shnum is the result of the preceding _corrupt_a_random_share
        # callback (the share number that was corrupted); corruptor_func is
        # passed as an extra addCallback argument for error reporting.
        def _repair_from_corruption(shnum, corruptor_func):
            # Snapshot counters so we can bound the repair's I/O cost.
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_writes()

            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_writes()

                # The "* 2" in reads is because you might read a whole share
                # before figuring out that it is corrupted. It might be
                # possible to make this delta reads number a little tighter.
                self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
                # The "* 2" in writes is because each server has two shares,
                # and it is reasonable for repairer to conclude that there
                # are two shares that it should upload, if the server fails
                # to serve the first share.
                # (DELTA_WRITES_PER_SHARE is defined elsewhere in this file,
                # outside this excerpt.)
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))

                # Now we inspect the filesystem to make sure that it has 10
                # shares.
                shares = self.find_all_shares()
                self.failIf(len(shares) < 10)

                # Now we assert that the verifier reports the file as healthy.
                d3 = self.filenode.check(Monitor(), verify=True)
                def _after_verify(verifyresults):
                    self.failUnless(verifyresults.is_healthy())
                d3.addCallback(_after_verify)

                # Now we delete seven of the other shares, then try to
                # download the file and assert that it succeeds at
                # downloading and has the right contents. This can't work
                # unless it has already repaired the previously-corrupted share.
                def _then_delete_7_and_try_a_download(unused=None):
                    # Keep the formerly-corrupted share (shnum) plus two
                    # random others: exactly k=3 survivors.
                    shnums = list(range(10))
                    shnums.remove(shnum)
                    random.shuffle(shnums)
                    for sharenum in shnums[:7]:
                        self._delete_a_share(sharenum=sharenum)

                    return self._download_and_check_plaintext()
                d3.addCallback(_then_delete_7_and_try_a_download)
                return d3

            d2.addCallback(_after_repair)
            return d2

        # Chain one corrupt-then-repair round per corruptor; rounds run
        # sequentially on the single Deferred d.
        for corruptor_func in (
            common._corrupt_file_version_number,
            common._corrupt_sharedata_version_number,
            common._corrupt_offset_of_sharedata,
            common._corrupt_offset_of_uri_extension,
            common._corrupt_offset_of_uri_extension_to_force_short_read,
            common._corrupt_share_data,
            common._corrupt_length_of_uri_extension,
            common._corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption, corruptor_func)

        return d
---|
677 | #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet." |
---|
678 | |
---|
679 | def test_tiny_reads(self): |
---|
680 | # ticket #1223 points out three problems: |
---|
681 | # repairer reads beyond end of input file |
---|
682 | # new-downloader does not tolerate overreads |
---|
683 | # uploader does lots of tiny reads, inefficient |
---|
684 | self.basedir = "repairer/Repairer/test_tiny_reads" |
---|
685 | self.set_up_grid() |
---|
686 | c0 = self.g.clients[0] |
---|
687 | DATA = b"a"*135 |
---|
688 | c0.encoding_params['k'] = 22 |
---|
689 | c0.encoding_params['n'] = 66 |
---|
690 | d = c0.upload(upload.Data(DATA, convergence=b"")) |
---|
691 | def _then(ur): |
---|
692 | self.uri = ur.get_uri() |
---|
693 | self.delete_shares_numbered(self.uri, [0]) |
---|
694 | self.c0_filenode = c0.create_node_from_uri(ur.get_uri()) |
---|
695 | self._stash_counts() |
---|
696 | return self.c0_filenode.check_and_repair(Monitor()) |
---|
697 | d.addCallback(_then) |
---|
698 | def _check(ign): |
---|
699 | (r,a,w) = self._get_delta_counts() |
---|
700 | # when the uploader (driven by the repairer) does full-segment |
---|
701 | # reads, this makes 44 server read calls (2*k). Before, when it |
---|
702 | # was doing input_chunk_size reads (7 bytes), it was doing over |
---|
703 | # 400. |
---|
704 | self.failIf(r > 100, "too many reads: %d>100" % r) |
---|
705 | d.addCallback(_check) |
---|
706 | return d |
---|
707 | |
---|
708 | def test_servers_responding(self): |
---|
709 | self.basedir = "repairer/Repairer/servers_responding" |
---|
710 | self.set_up_grid(num_clients=2) |
---|
711 | d = self.upload_and_stash() |
---|
712 | # now cause one of the servers to not respond during the pre-repair |
---|
713 | # filecheck, but then *do* respond to the post-repair filecheck |
---|
714 | def _then(ign): |
---|
715 | ss = self.g.servers_by_number[0] |
---|
716 | # we want to delete the share corresponding to the server |
---|
717 | # we're making not-respond |
---|
718 | share = next(ss.get_shares(self.c0_filenode.get_storage_index()))[0] |
---|
719 | self.delete_shares_numbered(self.uri, [share]) |
---|
720 | return self.c0_filenode.check_and_repair(Monitor()) |
---|
721 | d.addCallback(_then) |
---|
722 | def _check(rr): |
---|
723 | # this exercises a bug in which the servers-responding list did |
---|
724 | # not include servers that responded to the Repair, but which did |
---|
725 | # not respond to the pre-repair filecheck |
---|
726 | prr = rr.get_post_repair_results() |
---|
727 | expected = set(self.g.get_all_serverids()) |
---|
728 | responding_set = frozenset([s.get_serverid() for s in prr.get_servers_responding()]) |
---|
729 | self.failIf(expected - responding_set, expected - responding_set) |
---|
730 | self.failIf(responding_set - expected, responding_set - expected) |
---|
731 | self.failUnlessEqual(expected, |
---|
732 | set([s.get_serverid() |
---|
733 | for s in prr.get_servers_responding()])) |
---|
734 | d.addCallback(_check) |
---|
735 | return d |
---|
736 | |
---|
737 | # XXX extend these tests to show that the checker detects which specific |
---|
738 | # share on which specific server is broken -- this is necessary so that the |
---|
739 | # checker results can be passed to the repairer and the repairer can go ahead |
---|
740 | # and upload fixes without first doing what is effectively a check (/verify) |
---|
741 | # run |
---|
742 | |
---|
743 | # XXX extend these tests to show bad behavior of various kinds from servers: |
---|
744 | # raising exception from each remove_foo() method, for example |
---|
745 | |
---|
746 | # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit |
---|
747 | |
---|
748 | # XXX test corruption that truncates other hash trees than just the crypttext |
---|
749 | # hash tree |
---|
750 | |
---|
751 | # XXX test the notify-someone-about-corruption feature (also implement that |
---|
752 | # feature) |
---|
753 | |
---|
754 | # XXX test whether repairer (downloader) correctly downloads a file even if |
---|
755 | # to do so it has to acquire shares from a server that has already tried to |
---|
756 | # serve it a corrupted share. (I don't think the current downloader would |
---|
757 | # pass this test, depending on the kind of corruption.) |
---|