source: trunk/src/allmydata/test/mutable/test_repair.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 12.1 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from ..common import AsyncTestCase
6from testtools.matchers import Equals, HasLength
7from allmydata.interfaces import IRepairResults, ICheckAndRepairResults
8from allmydata.monitor import Monitor
9from allmydata.mutable.common import MODE_CHECK
10from allmydata.mutable.layout import unpack_header
11from allmydata.mutable.repairer import MustForceRepairError
12from ..common import ShouldFailMixin
13from .util import PublishMixin
14
class Repair(AsyncTestCase, PublishMixin, ShouldFailMixin):
    """
    Tests for checking and repairing mutable files.

    Each test publishes a file through one of the ``publish_*`` helpers,
    optionally damages the stored shares through ``self._storage._peers``,
    and then asserts on what ``check`` / ``repair`` / ``check_and_repair``
    report.
    """

    def get_shares(self, s):
        """
        Flatten the per-peer share table of ``s`` into a single dict.

        :param s: an object whose ``_peers`` attribute maps peerid to a
            mapping of share number to share data.
        :return: a new dict mapping ``(peerid, shnum)`` to share data.
        """
        return {
            (peerid, shnum): data
            for peerid, shares in s._peers.items()
            for shnum, data in shares.items()
        }

    def copy_shares(self, ignored=None):
        """
        Append a snapshot of the current shares to ``self.old_shares``.

        ``ignored`` exists so this method can be used directly as a Deferred
        callback; its value is never looked at.
        """
        self.old_shares.append(self.get_shares(self._storage))

    def _delete_shares_numbered_at_least(self, nshares):
        """
        Delete, on every peer, each share whose number is >= ``nshares``.

        Shared by the repairable / check-and-repairable helpers so both
        damage the file in exactly the same way.
        """
        for shares in self._storage._peers.values():
            # iterate over a copy of the keys: we delete while walking
            for shnum in list(shares):
                if shnum >= nshares:
                    del shares[shnum]

    def test_repair_nop(self):
        # Repairing a healthy file must succeed and leave every share in
        # the same place, at the next sequence number.
        self.old_shares = []
        d = self.publish_one()
        d.addCallback(self.copy_shares)
        d.addCallback(lambda res: self._fn.check(Monitor()))
        d.addCallback(lambda check_results: self._fn.repair(check_results))
        def _check_results(rres):
            self.assertThat(IRepairResults.providedBy(rres), Equals(True))
            self.assertThat(rres.get_successful(), Equals(True))
            # TODO: examine results

            self.copy_shares()

            initial_shares = self.old_shares[0]
            new_shares = self.old_shares[1]
            # TODO: this really shouldn't change anything. When we implement
            # a "minimal-bandwidth" repairer, change this test to assert:
            #self.assertThat(new_shares, Equals(initial_shares))

            # all shares should be in the same place as before
            self.assertThat(set(initial_shares.keys()),
                            Equals(set(new_shares.keys())))
            # but they should all be at a newer seqnum. The IV will be
            # different, so the roothash will be too.
            for key, initial_share in initial_shares.items():
                (version0, seqnum0, _root_hash0, _iv0,
                 k0, N0, segsize0, datalen0,
                 _o0) = unpack_header(initial_share)
                (version1, seqnum1, _root_hash1, _iv1,
                 k1, N1, segsize1, datalen1,
                 _o1) = unpack_header(new_shares[key])
                self.assertThat(version0, Equals(version1))
                self.assertThat(seqnum0+1, Equals(seqnum1))
                self.assertThat(k0, Equals(k1))
                self.assertThat(N0, Equals(N1))
                self.assertThat(segsize0, Equals(segsize1))
                self.assertThat(datalen0, Equals(datalen1))
        d.addCallback(_check_results)
        return d

    def failIfSharesChanged(self, ignored=None):
        """
        Assert that the two most recent snapshots in ``self.old_shares``
        are equal.

        ``ignored`` exists so this method can be used directly as a Deferred
        callback.
        """
        old_shares = self.old_shares[-2]
        current_shares = self.old_shares[-1]
        self.assertThat(old_shares, Equals(current_shares))


    def _test_whether_repairable(self, publisher, nshares, expected_result):
        """
        Publish a file, keep only the shares numbered below ``nshares``,
        then run check followed by repair.

        :param publisher: zero-argument callable that publishes the file and
            returns a Deferred (for example ``self.publish_one``).
        :param nshares: shares numbered ``nshares`` or higher are deleted.
        :param expected_result: the value both ``is_recoverable()`` on the
            check results and ``get_successful()`` on the repair results are
            expected to have.
        :return: a Deferred that fires when all assertions have run.
        """
        d = publisher()
        d.addCallback(
            lambda ign: self._delete_shares_numbered_at_least(nshares))
        d.addCallback(lambda ign: self._fn.check(Monitor()))
        def _check(cr):
            self.assertThat(cr.is_healthy(), Equals(False))
            self.assertThat(cr.is_recoverable(), Equals(expected_result))
            # hand the check results on to repair()
            return cr
        d.addCallback(_check)
        d.addCallback(lambda check_results: self._fn.repair(check_results))
        d.addCallback(lambda crr: self.assertThat(crr.get_successful(), Equals(expected_result)))
        return d

    def test_unrepairable_0shares(self):
        return self._test_whether_repairable(self.publish_one, 0, False)

    def test_mdmf_unrepairable_0shares(self):
        return self._test_whether_repairable(self.publish_mdmf, 0, False)

    def test_unrepairable_1share(self):
        return self._test_whether_repairable(self.publish_one, 1, False)

    def test_mdmf_unrepairable_1share(self):
        return self._test_whether_repairable(self.publish_mdmf, 1, False)

    def test_repairable_5shares(self):
        return self._test_whether_repairable(self.publish_one, 5, True)

    def test_mdmf_repairable_5shares(self):
        return self._test_whether_repairable(self.publish_mdmf, 5, True)

    def _test_whether_checkandrepairable(self, publisher, nshares, expected_result):
        """
        Like the _test_whether_repairable tests, but invoking check_and_repair
        instead of invoking check and then invoking repair.

        :param publisher: zero-argument callable that publishes the file and
            returns a Deferred.
        :param nshares: shares numbered ``nshares`` or higher are deleted.
        :param expected_result: the value ``get_repair_successful()`` is
            expected to have.
        :return: a Deferred that fires when the assertion has run.
        """
        d = publisher()
        d.addCallback(
            lambda ign: self._delete_shares_numbered_at_least(nshares))
        d.addCallback(lambda ign: self._fn.check_and_repair(Monitor()))
        d.addCallback(lambda crr: self.assertThat(crr.get_repair_successful(), Equals(expected_result)))
        return d

    def test_unrepairable_0shares_checkandrepair(self):
        return self._test_whether_checkandrepairable(self.publish_one, 0, False)

    def test_mdmf_unrepairable_0shares_checkandrepair(self):
        return self._test_whether_checkandrepairable(self.publish_mdmf, 0, False)

    def test_unrepairable_1share_checkandrepair(self):
        return self._test_whether_checkandrepairable(self.publish_one, 1, False)

    def test_mdmf_unrepairable_1share_checkandrepair(self):
        return self._test_whether_checkandrepairable(self.publish_mdmf, 1, False)

    def test_repairable_5shares_checkandrepair(self):
        return self._test_whether_checkandrepairable(self.publish_one, 5, True)

    def test_mdmf_repairable_5shares_checkandrepair(self):
        return self._test_whether_checkandrepairable(self.publish_mdmf, 5, True)


    def test_merge(self):
        self.old_shares = []
        d = self.publish_multiple()
        # repair will refuse to merge multiple highest seqnums unless you
        # pass force=True
        d.addCallback(lambda res:
                      self._set_versions({0:3,2:3,4:3,6:3,8:3,
                                          1:4,3:4,5:4,7:4,9:4}))
        d.addCallback(self.copy_shares)
        d.addCallback(lambda res: self._fn.check(Monitor()))
        def _try_repair(check_results):
            ex = "There were multiple recoverable versions with identical seqnums, so force=True must be passed to the repair() operation"
            d2 = self.shouldFail(MustForceRepairError, "test_merge", ex,
                                 self._fn.repair, check_results)
            # the refused repair must not have touched any share
            d2.addCallback(self.copy_shares)
            d2.addCallback(self.failIfSharesChanged)
            # pass the check results through to the forced repair below
            d2.addCallback(lambda res: check_results)
            return d2
        d.addCallback(_try_repair)
        d.addCallback(lambda check_results:
                      self._fn.repair(check_results, force=True))
        # this should give us 10 shares of the highest roothash
        def _check_repair_results(rres):
            self.assertThat(rres.get_successful(), Equals(True))
            # TODO: examine the repair results in more detail
        d.addCallback(_check_repair_results)
        d.addCallback(lambda res: self._fn.get_servermap(MODE_CHECK))
        def _check_smap(smap):
            self.assertThat(smap.recoverable_versions(), HasLength(1))
            self.assertThat(smap.unrecoverable_versions(), HasLength(0))
            # now, which should have won?
            roothash_s4a = self.get_roothash_for(3)
            roothash_s4b = self.get_roothash_for(4)
            if roothash_s4b > roothash_s4a:
                expected_contents = self.CONTENTS[4]
            else:
                expected_contents = self.CONTENTS[3]
            new_versionid = smap.best_recoverable_version()
            self.assertThat(new_versionid[0], Equals(5)) # seqnum 5
            d2 = self._fn.download_version(smap, new_versionid)
            d2.addCallback(self.assertEqual, expected_contents)
            return d2
        d.addCallback(_check_smap)
        return d

    def test_non_merge(self):
        self.old_shares = []
        d = self.publish_multiple()
        # repair should not refuse a repair that doesn't need to merge. In
        # this case, we combine v2 with v3. The repair should ignore v2 and
        # copy v3 into a new v5.
        d.addCallback(lambda res:
                      self._set_versions({0:2,2:2,4:2,6:2,8:2,
                                          1:3,3:3,5:3,7:3,9:3}))
        d.addCallback(lambda res: self._fn.check(Monitor()))
        d.addCallback(lambda check_results: self._fn.repair(check_results))
        # this should give us 10 shares of v3
        def _check_repair_results(rres):
            self.assertThat(rres.get_successful(), Equals(True))
            # TODO: examine the repair results in more detail
        d.addCallback(_check_repair_results)
        d.addCallback(lambda res: self._fn.get_servermap(MODE_CHECK))
        def _check_smap(smap):
            self.assertThat(smap.recoverable_versions(), HasLength(1))
            self.assertThat(smap.unrecoverable_versions(), HasLength(0))
            # now, which should have won?
            expected_contents = self.CONTENTS[3]
            new_versionid = smap.best_recoverable_version()
            self.assertThat(new_versionid[0], Equals(5)) # seqnum 5
            d2 = self._fn.download_version(smap, new_versionid)
            # assertEqual, not the deprecated assertEquals alias (matches
            # test_merge)
            d2.addCallback(self.assertEqual, expected_contents)
            return d2
        d.addCallback(_check_smap)
        return d

    def get_roothash_for(self, index):
        """
        Return the roothash for the first share we see in the saved set
        ``self._copied_shares[index]``.

        :param index: key into ``self._copied_shares``.
        :return: the ``root_hash`` field of that share's unpacked header, or
            ``None`` if the saved set contains no shares at all.
        """
        shares = self._copied_shares[index]
        for peer_shares in shares.values():
            for share in peer_shares.values():
                # root_hash is the third field of the unpacked header
                (_version, _seqnum, root_hash, *_rest) = unpack_header(share)
                return root_hash
        return None

    def test_check_and_repair_readcap(self):
        # we can't currently repair from a mutable readcap: #625
        self.old_shares = []
        d = self.publish_one()
        d.addCallback(self.copy_shares)
        def _get_readcap(res):
            self._fn3 = self._fn.get_readonly()
            # also delete some shares: share 0 on every peer that has it
            for shares in self._storage._peers.values():
                shares.pop(0, None)
        d.addCallback(_get_readcap)
        d.addCallback(lambda res: self._fn3.check_and_repair(Monitor()))
        def _check_results(crr):
            self.assertThat(ICheckAndRepairResults.providedBy(crr), Equals(True))
            # we should detect the unhealthy, but skip over mutable-readcap
            # repairs until #625 is fixed
            self.assertThat(crr.get_pre_repair_results().is_healthy(), Equals(False))
            self.assertThat(crr.get_repair_attempted(), Equals(False))
            self.assertThat(crr.get_post_repair_results().is_healthy(), Equals(False))
        d.addCallback(_check_results)
        return d

    def test_repair_empty(self):
        # bug 1689: delete one share of an empty mutable file, then repair.
        # In the buggy version, the check that precedes the retrieve+publish
        # cycle uses MODE_READ, instead of MODE_REPAIR, and fails to get the
        # privkey that repair needs.
        d = self.publish_sdmf(b"")
        def _delete_one_share(ign):
            # drop share 0 from whichever peer(s) hold it
            for shares in self._storage._peers.values():
                shares.pop(0, None)
        d.addCallback(_delete_one_share)
        d.addCallback(lambda ign: self._fn2.check(Monitor()))
        d.addCallback(lambda check_results: self._fn2.repair(check_results))
        def _check(crr):
            self.assertThat(crr.get_successful(), Equals(True))
        d.addCallback(_check)
        return d
Note: See TracBrowser for help on using the repository browser.