source: trunk/src/allmydata/test/test_repairer.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

# -*- coding: utf-8 -*-
"""
Ported to Python 3.
"""

from allmydata.test import common
from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable import upload
from allmydata.util.consumer import download_to_data
from twisted.internet import defer
from twisted.trial import unittest
import random
from allmydata.test.no_network import GridTestMixin

# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10
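# For reference: with N = 10 total shares and READ_LEEWAY = 18, this works out
# to MAX_DELTA_READS = 10 * 18 = 180 extra reads allowed per check/verify pass.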

timeout=240 # François's ARM box timed out after 120 seconds of Verifier.test_corrupt_crypttext_hashtree

class RepairTestMixin(object):
    def _count_reads(self):
        sum_of_read_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_read_counts += counters.get('storage_server.read', 0)
        return sum_of_read_counts

    def _count_allocates(self):
        sum_of_allocate_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
        return sum_of_allocate_counts

    def _count_writes(self):
        sum_of_write_counts = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            sum_of_write_counts += counters.get('storage_server.write', 0)
        return sum_of_write_counts

    def _stash_counts(self):
        self.before_repair_reads = self._count_reads()
        self.before_repair_allocates = self._count_allocates()
        self.before_repair_writes = self._count_writes()

    def _get_delta_counts(self):
        delta_reads = self._count_reads() - self.before_repair_reads
        delta_allocates = self._count_allocates() - self.before_repair_allocates
        delta_writes = self._count_writes() - self.before_repair_writes
        return (delta_reads, delta_allocates, delta_writes)

    def failIfBigger(self, x, y):
        self.failIf(x > y, "%s > %s" % (x, y))

    def upload_and_stash(self):
        c0 = self.g.clients[0]
        c1 = self.g.clients[1]
        c0.encoding_params['max_segment_size'] = 12
        d = c0.upload(upload.Data(common.TEST_DATA, convergence=b""))
        def _stash_uri(ur):
            self.uri = ur.get_uri()
            self.c0_filenode = c0.create_node_from_uri(ur.get_uri())
            self.c1_filenode = c1.create_node_from_uri(ur.get_uri())
        d.addCallback(_stash_uri)
        return d

class Verifier(GridTestMixin, unittest.TestCase, RepairTestMixin):
    def test_check_without_verify(self):
        """Check says the file is healthy when none of the shares have been
        touched. It says that the file is unhealthy when all of them have
        been removed. It doesn't use any reads.
        """
        self.basedir = "repairer/Verifier/check_without_verify"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check(cr):
            self.failUnless(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)

        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=False))
        def _check2(cr):
            self.failIf(cr.is_healthy())
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, 0)
        d.addCallback(_check2)
        return d

    def _help_test_verify(self, corruptor, judgement, shnum=0, debug=False):
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored: self._stash_counts())

        d.addCallback(lambda ignored:
                      self.corrupt_shares_numbered(self.uri, [shnum], corruptor, debug=debug))
        d.addCallback(lambda ignored:
                      self.c1_filenode.check(Monitor(), verify=True))
        def _check(vr):
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            try:
                judgement(vr)
            except unittest.FailTest as e:
                # FailTest stores its message as a plain string in e.args[0],
                # so append the verify results to it before re-raising.
                new_arg = str(e.args[0]) + "\nvr.data is: " + str(vr.as_dict())
                e.args = (new_arg,)
                raise
        d.addCallback(_check)
        return d

    def judge_no_problem(self, vr):
        """ Verify says the file is healthy when none of the shares have been
        touched in a way that matters. It doesn't use more than eighteen
        times as many reads as it needs."""
        self.failUnless(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 10)
        self.failUnlessEqual(len(vr.get_sharemap()), 10)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 10)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)

    def test_ok_no_corruption(self):
        self.basedir = "repairer/Verifier/ok_no_corruption"
        return self._help_test_verify(common._corrupt_nothing,
                                      self.judge_no_problem)

    def test_ok_filedata_size(self):
        self.basedir = "repairer/Verifier/ok_filedatasize"
        return self._help_test_verify(common._corrupt_size_of_file_data,
                                      self.judge_no_problem)

    def test_ok_sharedata_size(self):
        self.basedir = "repairer/Verifier/ok_sharedata_size"
        return self._help_test_verify(common._corrupt_size_of_sharedata,
                                      self.judge_no_problem)

    def test_ok_segment_size(self):
        self.basedir = "repairer/Verifier/test_ok_segment_size"
        return self._help_test_verify(common._corrupt_segment_size,
                                      self.judge_no_problem)

    def judge_visible_corruption(self, vr):
        """Corruption which is detected by the server means that the server
        will send you back a Failure in response to get_bucket instead of
        giving you the share data. Test that the verifier handles these
        answers correctly. It doesn't use more than eighteen times as many
        reads as it needs."""
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_servers_responding()), 9)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)

    def test_corrupt_file_verno(self):
        self.basedir = "repairer/Verifier/corrupt_file_verno"
        return self._help_test_verify(common._corrupt_file_version_number,
                                      self.judge_visible_corruption)

    def judge_share_version_incompatibility(self, vr):
        # corruption of the share version (inside the container, the 1/2
        # value that determines whether we've got 4-byte offsets or 8-byte
        # offsets) to something larger than 2 will trigger a
        # ShareVersionIncompatible exception, which should be counted in
        # list-incompatible-shares, rather than list-corrupt-shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 0)
        self.failUnlessEqual(len(vr.get_incompatible_shares()), 1)
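    # A rough sketch of the distinction exercised above (illustrative only;
    # the real parsing lives in allmydata.immutable.layout): the first field
    # of the share container is its version number, where 1 means 4-byte
    # offsets and 2 means 8-byte offsets, so a reader does roughly:
    #
    #     (container_version,) = struct.unpack(">L", share_bytes[:4])
    #     if container_version not in (1, 2):
    #         raise ShareVersionIncompatible(container_version)
    #
    # and the verifier counts such a share as incompatible rather than corrupt.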

    def test_corrupt_share_verno(self):
        self.basedir = "repairer/Verifier/corrupt_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number,
                                      self.judge_share_version_incompatibility)

    def judge_invisible_corruption(self, vr):
        # corruption of fields that the server does not check (which is most
        # of them), which will be detected by the client as it downloads
        # those shares.
        self.failIf(vr.is_healthy(), (vr, vr.is_healthy(), vr.as_dict()))
        self.failUnlessEqual(vr.get_share_counter_good(), 9)
        self.failUnlessEqual(vr.get_encoding_needed(), 3)
        self.failUnlessEqual(vr.get_encoding_expected(), 10)
        self.failUnlessEqual(vr.get_host_counter_good_shares(), 9)
        self.failUnlessEqual(len(vr.get_corrupt_shares()), 1)
        self.failUnlessEqual(len(vr.get_incompatible_shares()), 0)
        self.failUnlessEqual(len(vr.get_servers_responding()), 10)
        self.failUnlessEqual(len(vr.get_sharemap()), 9)

    def test_corrupt_sharedata_offset(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_offset"
        return self._help_test_verify(common._corrupt_offset_of_sharedata,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_offset_shortread(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_offset_shortread"
        return self._help_test_verify(common._corrupt_offset_of_uri_extension_to_force_short_read,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata"
        return self._help_test_verify(common._corrupt_share_data,
                                      self.judge_invisible_corruption)

    def test_corrupt_sharedata_last_byte(self):
        self.basedir = "repairer/Verifier/corrupt_sharedata_last_byte"
        return self._help_test_verify(common._corrupt_share_data_last_byte,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb_length(self):
        self.basedir = "repairer/Verifier/corrupt_ueb_length"
        return self._help_test_verify(common._corrupt_length_of_uri_extension,
                                      self.judge_invisible_corruption)

    def test_corrupt_ueb(self):
        # Note that in some rare situations this might fail, specifically if
        # the length of the UEB is corrupted to be a value that is bigger than
        # the size but less than 2000, it might not get caught... But that's
        # mostly because in that case it doesn't meaningfully corrupt it. See
        # _get_uri_extension_the_old_way() in layout.py for where the 2000
        # number comes from.
        self.basedir = "repairer/Verifier/corrupt_ueb"
        return self._help_test_verify(common._corrupt_uri_extension,
                                      self.judge_invisible_corruption)

    def test_truncate_crypttext_hashtree(self):
        # change the start of the block hashtree, to truncate the preceding
        # crypttext hashtree
        self.basedir = "repairer/Verifier/truncate_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_block_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_block_hashes,
                                      self.judge_invisible_corruption)

    def test_wrong_share_verno(self):
        self.basedir = "repairer/Verifier/wrong_share_verno"
        return self._help_test_verify(common._corrupt_sharedata_version_number_to_plausible_version,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_share_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_offset(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_offset"
        return self._help_test_verify(common._corrupt_offset_of_ciphertext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree,
                                      self.judge_invisible_corruption)

    def test_corrupt_crypttext_hashtree_byte_x221(self):
        self.basedir = "repairer/Verifier/corrupt_crypttext_hashtree_byte_9_bit_7"
        return self._help_test_verify(common._corrupt_crypttext_hash_tree_byte_x221,
                                      self.judge_invisible_corruption, debug=True)

    def test_corrupt_block_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_block_hashtree"
        return self._help_test_verify(common._corrupt_block_hashes,
                                      self.judge_invisible_corruption)

    def test_corrupt_share_hashtree(self):
        self.basedir = "repairer/Verifier/corrupt_share_hashtree"
        return self._help_test_verify(common._corrupt_share_hashes,
                                      self.judge_invisible_corruption)

    # TODO: the Verifier should decode to ciphertext and check it against the
    # crypttext-hash-tree. Check this by constructing a bogus file, in which
    # the crypttext-hash-tree is modified after encoding is done, but before
    # the UEB is finalized. The Verifier should see a valid
    # crypttext-hash-tree but then the ciphertext should show up as invalid.
    # Normally this could only be triggered by a bug in FEC decode.

    def OFF_test_each_byte(self):
        # this test takes 140s to run on my laptop, and doesn't have any
        # actual asserts, so it's commented out. It corrupts each byte of the
        # share in sequence, and checks to see which ones the Verifier
        # catches and which it misses. Ticket #819 contains details: there
        # are several portions of the share that are unused, for which
        # corruption is not supposed to be caught.
        #
        # If the test ran quickly, we could use the share size to compute the
        # offsets of these unused portions and assert that everything outside
        # of them was detected. We could then replace the rest of
        # Verifier.test_* (which takes 16s to run on my laptop) with this
        # one.
        self.basedir = "repairer/Verifier/each_byte"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        def _grab_sh0(res):
            self.sh0_file = [sharefile
                             for (shnum, serverid, sharefile)
                             in self.find_uri_shares(self.uri)
                             if shnum == 0][0]
            self.sh0_orig = open(self.sh0_file, "rb").read()
        d.addCallback(_grab_sh0)
        def _fix_sh0(res):
            f = open(self.sh0_file, "wb")
            f.write(self.sh0_orig)
            f.close()
        def _corrupt(ign, which):
            def _corruptor(s, debug=False):
                # flip the low bit of the byte at offset 'which' (s is bytes)
                return s[:which] + bytes([s[which] ^ 0x01]) + s[which+1:]
            self.corrupt_shares_numbered(self.uri, [0], _corruptor)
        results = {}
        def _did_check(vr, i):
            #print("corrupt %d: healthy=%s" % (i, vr.is_healthy()))
            results[i] = vr.is_healthy()
        def _start(ign):
            d = defer.succeed(None)
            for i in range(len(self.sh0_orig)):
                d.addCallback(_corrupt, i)
                d.addCallback(lambda ign:
                              self.c1_filenode.check(Monitor(), verify=True))
                d.addCallback(_did_check, i)
                d.addCallback(_fix_sh0)
            return d
        d.addCallback(_start)
        def _show_results(ign):
            f = open("test_each_byte_output", "w")
            for i in sorted(results.keys()):
                print("%d: %s" % (i, results[i]), file=f)
            f.close()
            print("Please look in _trial_temp/test_each_byte_output for results")
        d.addCallback(_show_results)
        return d

# We'll allow you to pass this test even if you trigger thirty-five times as
# many block sends and disk writes as would be optimal.
WRITE_LEEWAY = 35
# Optimally, you could repair one of these (small) files in a single write.
DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
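# For reference: with an optimum of one write per share and WRITE_LEEWAY = 35,
# this allows DELTA_WRITES_PER_SHARE = 1 * 35 = 35 allocates per repaired share.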

class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
               common.ShouldFailMixin):

    def test_harness(self):
        # This test is actually to make sure our test harness works, rather
        # than testing anything about Tahoe code itself.

        self.basedir = "repairer/Repairer/test_code"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _stash_shares(oldshares):
            self.oldshares = oldshares
        d.addCallback(_stash_shares)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        def _compare(newshares):
            self.failUnlessEqual(newshares, self.oldshares)
        d.addCallback(_compare)

        def _delete_8(ignored):
            shnum = self.oldshares[0][0]
            self.delete_shares_numbered(self.uri, [shnum])
            for sh in self.oldshares[1:8]:
                self.delete_share(sh)
        d.addCallback(_delete_8)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_download",
                                      None,
                                      download_to_data, self.c1_filenode))

        d.addCallback(lambda ignored:
                      self.shouldFail(NotEnoughSharesError, "then_repair",
                                      None,
                                      self.c1_filenode.check_and_repair,
                                      Monitor(), verify=False))

        # test share corruption
        def _test_corrupt(ignored):
            olddata = {}
            shares = self.find_uri_shares(self.uri)
            for (shnum, serverid, sharefile) in shares:
                olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
            for sh in shares:
                self.corrupt_share(sh, common._corrupt_uri_extension)
            for (shnum, serverid, sharefile) in shares:
                newdata = open(sharefile, "rb").read()
                self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
        d.addCallback(_test_corrupt)

        def _remove_all(ignored):
            for sh in self.find_uri_shares(self.uri):
                self.delete_share(sh)
        d.addCallback(_remove_all)
        d.addCallback(lambda ignored: self.find_uri_shares(self.uri))
        d.addCallback(lambda shares: self.failUnlessEqual(shares, []))

        return d

    def test_repair_from_deletion_of_1(self):
        """ Repair replaces a share that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, [2]))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

            # Now we inspect the filesystem to make sure that it has 10
            # shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless it has already repaired the
        # previously-deleted share #2.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, list(range(3, 10+1))))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repair_from_deletion_of_7(self):
        """ Repair replaces seven shares that got deleted. """
        self.basedir = "repairer/Repairer/repair_from_deletion_of_7"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, list(range(7))))
        d.addCallback(lambda ignored: self._stash_counts())
        d.addCallback(lambda ignored:
                      self.c0_filenode.check_and_repair(Monitor(),
                                                        verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            self.failUnlessIsInstance(pre, check_results.CheckResults)
            post = crr.get_post_repair_results()
            self.failUnlessIsInstance(post, check_results.CheckResults)
            delta_reads, delta_allocates, delta_writes = self._get_delta_counts()

            self.failIfBigger(delta_reads, MAX_DELTA_READS)
            self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy(), post.as_dict())

            # Make sure we really have 10 shares.
            shares = self.find_uri_shares(self.uri)
            self.failIf(len(shares) < 10)
        d.addCallback(_check_results)

        d.addCallback(lambda ignored:
                      self.c0_filenode.check(Monitor(), verify=True))
        d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))

        # Now we delete seven of the other shares, then try to download the
        # file and assert that it succeeds at downloading and has the right
        # contents. This can't work unless the repair has already replaced
        # the previously-deleted shares.

        d.addCallback(lambda ignored:
                      self.delete_shares_numbered(self.uri, list(range(3, 10+1))))
        d.addCallback(lambda ignored: download_to_data(self.c1_filenode))
        d.addCallback(lambda newdata:
                      self.failUnlessEqual(newdata, common.TEST_DATA))
        return d

    def test_repairer_servers_of_happiness(self):
        # The repairer is supposed to generate and place as many of the
        # missing shares as possible without caring about how they are
        # distributed.
        self.basedir = "repairer/Repairer/repairer_servers_of_happiness"
        self.set_up_grid(num_clients=2, num_servers=10)
        d = self.upload_and_stash()
        # Now delete some servers. We want to leave 3 servers, which
        # will allow us to restore the file to a healthy state without
        # distributing the shares widely enough to satisfy the default
        # happiness setting.
        def _delete_some_servers(ignored):
            for i in range(7):
                self.g.remove_server(self.g.servers_by_number[i].my_nodeid)

            assert len(self.g.servers_by_number) == 3

        d.addCallback(_delete_some_servers)
        # Now try to repair the file.
        d.addCallback(lambda ignored:
            self.c0_filenode.check_and_repair(Monitor(), verify=False))
        def _check_results(crr):
            self.failUnlessIsInstance(crr,
                                      check_results.CheckAndRepairResults)
            pre = crr.get_pre_repair_results()
            post = crr.get_post_repair_results()
            for p in (pre, post):
                self.failUnlessIsInstance(p, check_results.CheckResults)

            self.failIf(pre.is_healthy())
            self.failUnless(post.is_healthy())

        d.addCallback(_check_results)
        return d

    # why is test_repair_from_corruption_of_1 disabled? Read on:
    #
    # As recently documented in NEWS.rst for the 1.3.0 release, the current
    # immutable repairer suffers from several limitations:
    #
    #  * minimalistic verifier: it's just download without decryption, so we
    #    don't look for corruption in N-k shares, and for many fields (those
    #    which are the same in all shares) we only look for corruption in a
    #    single share
    #
    #  * some kinds of corruption cause download to fail (when it ought to
    #    just switch to a different share), so repair will fail on these too
    #
    #  * RIStorageServer doesn't offer a way to delete old corrupt immutable
    #    shares (the authority model is not at all clear), so the best the
    #    repairer can do is to put replacement shares on new servers,
    #    unfortunately leaving the corrupt shares in place
    #
    # This test is pretty strenuous: it asserts that the repairer does the
    # ideal thing in 8 distinct situations, with randomized corruption in
    # each. Because of the aforementioned limitations, it is highly unlikely
    # to pass any of these. We're also concerned that the download-fails case
    # can provoke a lost-progress bug (one was fixed, but there might be more
    # lurking), which will cause the test to fail despite a ".todo" marker,
    # and will probably cause subsequent unrelated tests to fail too (due to
    # "unclean reactor" problems).
    #
    # In addition, I (warner) have recently refactored the rest of this class
    # to use the much-faster no_network.GridTestMixin, so this test needs to
    # be updated before it will be able to run again.
    #
    # So we're turning this test off until we've done one or more of the
    # following:
    #  * remove some of these limitations
    #  * break the test up into smaller, more functionally-oriented pieces
    #  * simplify the repairer enough to let us be confident that it is free
    #    of lost-progress bugs

    def OFF_test_repair_from_corruption_of_1(self):
        d = defer.succeed(None)

        d.addCallback(self.find_all_shares)
        stash = [None]
        def _stash_it(res):
            stash[0] = res
            return res
        d.addCallback(_stash_it)
        def _put_it_all_back(ignored):
            self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
            return ignored

        def _repair_from_corruption(shnum, corruptor_func):
            before_repair_reads = self._count_reads()
            before_repair_allocates = self._count_writes()

            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
            def _after_repair(checkandrepairresults):
                prerepairres = checkandrepairresults.get_pre_repair_results()
                postrepairres = checkandrepairresults.get_post_repair_results()
                after_repair_reads = self._count_reads()
                after_repair_allocates = self._count_writes()

                # The "* 2" in reads is because you might read a whole share
                # before figuring out that it is corrupted. It might be
                # possible to make this delta reads number a little tighter.
                self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
                # The "* 2" in writes is because each server has two shares,
                # and it is reasonable for repairer to conclude that there
                # are two shares that it should upload, if the server fails
                # to serve the first share.
                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))

                # Now we inspect the filesystem to make sure that it has 10
                # shares.
                shares = self.find_all_shares()
                self.failIf(len(shares) < 10)

                # Now we assert that the verifier reports the file as healthy.
                d3 = self.filenode.check(Monitor(), verify=True)
                def _after_verify(verifyresults):
                    self.failUnless(verifyresults.is_healthy())
                d3.addCallback(_after_verify)

                # Now we delete seven of the other shares, then try to
                # download the file and assert that it succeeds at
                # downloading and has the right contents. This can't work
                # unless it has already repaired the previously-corrupted share.
                def _then_delete_7_and_try_a_download(unused=None):
                    shnums = list(range(10))
                    shnums.remove(shnum)
                    random.shuffle(shnums)
                    for sharenum in shnums[:7]:
                        self._delete_a_share(sharenum=sharenum)

                    return self._download_and_check_plaintext()
                d3.addCallback(_then_delete_7_and_try_a_download)
                return d3

            d2.addCallback(_after_repair)
            return d2

        for corruptor_func in (
            common._corrupt_file_version_number,
            common._corrupt_sharedata_version_number,
            common._corrupt_offset_of_sharedata,
            common._corrupt_offset_of_uri_extension,
            common._corrupt_offset_of_uri_extension_to_force_short_read,
            common._corrupt_share_data,
            common._corrupt_length_of_uri_extension,
            common._corrupt_uri_extension,
            ):
            # Now we corrupt a share...
            d.addCallback(self._corrupt_a_random_share, corruptor_func)
            # And repair...
            d.addCallback(_repair_from_corruption, corruptor_func)

        return d
    #test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."

    def test_tiny_reads(self):
        # ticket #1223 points out three problems:
        #   repairer reads beyond end of input file
        #   new-downloader does not tolerate overreads
        #   uploader does lots of tiny reads, inefficient
        self.basedir = "repairer/Repairer/test_tiny_reads"
        self.set_up_grid()
        c0 = self.g.clients[0]
        DATA = b"a"*135
        c0.encoding_params['k'] = 22
        c0.encoding_params['n'] = 66
        d = c0.upload(upload.Data(DATA, convergence=b""))
        def _then(ur):
            self.uri = ur.get_uri()
            self.delete_shares_numbered(self.uri, [0])
            self.c0_filenode = c0.create_node_from_uri(ur.get_uri())
            self._stash_counts()
            return self.c0_filenode.check_and_repair(Monitor())
        d.addCallback(_then)
        def _check(ign):
            (r,a,w) = self._get_delta_counts()
            # when the uploader (driven by the repairer) does full-segment
            # reads, this makes 44 server read calls (2*k). Before, when it
            # was doing input_chunk_size reads (7 bytes), it was doing over
            # 400.
            self.failIf(r > 100, "too many reads: %d>100" % r)
        d.addCallback(_check)
        return d

    def test_servers_responding(self):
        self.basedir = "repairer/Repairer/servers_responding"
        self.set_up_grid(num_clients=2)
        d = self.upload_and_stash()
        # now cause one of the servers to not respond during the pre-repair
        # filecheck, but then *do* respond to the post-repair filecheck
        def _then(ign):
            ss = self.g.servers_by_number[0]
            # we want to delete the share corresponding to the server
            # we're making not-respond
            share = next(ss.get_shares(self.c0_filenode.get_storage_index()))[0]
            self.delete_shares_numbered(self.uri, [share])
            return self.c0_filenode.check_and_repair(Monitor())
        d.addCallback(_then)
        def _check(rr):
            # this exercises a bug in which the servers-responding list did
            # not include servers that responded to the Repair, but which did
            # not respond to the pre-repair filecheck
            prr = rr.get_post_repair_results()
            expected = set(self.g.get_all_serverids())
            responding_set = frozenset([s.get_serverid() for s in prr.get_servers_responding()])
            self.failIf(expected - responding_set, expected - responding_set)
            self.failIf(responding_set - expected, responding_set - expected)
            self.failUnlessEqual(expected,
                                 set([s.get_serverid()
                                      for s in prr.get_servers_responding()]))
        d.addCallback(_check)
        return d

# XXX extend these tests to show that the checker detects which specific
# share on which specific server is broken -- this is necessary so that the
# checker results can be passed to the repairer and the repairer can go ahead
# and upload fixes without first doing what is effectively a check (/verify)
# run

# XXX extend these tests to show bad behavior of various kinds from servers:
# raising exception from each remove_foo() method, for example

# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit

# XXX test corruption that truncates other hash trees than just the crypttext
# hash tree

# XXX test the notify-someone-about-corruption feature (also implement that
# feature)

# XXX test whether repairer (downloader) correctly downloads a file even if
# to do so it has to acquire shares from a server that has already tried to
# serve it a corrupted share. (I don't think the current downloader would
# pass this test, depending on the kind of corruption.)