1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | from ..common import AsyncTestCase |
---|
6 | from testtools.matchers import Equals, NotEquals, HasLength |
---|
7 | from twisted.internet import defer |
---|
8 | from allmydata.monitor import Monitor |
---|
9 | from allmydata.mutable.common import \ |
---|
10 | MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ |
---|
11 | from allmydata.mutable.publish import MutableData |
---|
12 | from allmydata.mutable.servermap import ServerMap, ServermapUpdater |
---|
13 | from .util import PublishMixin |
---|
14 | |
---|
15 | class Servermap(AsyncTestCase, PublishMixin): |
---|
16 | def setUp(self): |
---|
17 | super(Servermap, self).setUp() |
---|
18 | return self.publish_one() |
---|
19 | |
---|
20 | def make_servermap(self, mode=MODE_CHECK, fn=None, sb=None, |
---|
21 | update_range=None): |
---|
22 | if fn is None: |
---|
23 | fn = self._fn |
---|
24 | if sb is None: |
---|
25 | sb = self._storage_broker |
---|
26 | smu = ServermapUpdater(fn, sb, Monitor(), |
---|
27 | ServerMap(), mode, update_range=update_range) |
---|
28 | d = smu.update() |
---|
29 | return d |
---|
30 | |
---|
31 | def update_servermap(self, oldmap, mode=MODE_CHECK): |
---|
32 | smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(), |
---|
33 | oldmap, mode) |
---|
34 | d = smu.update() |
---|
35 | return d |
---|
36 | |
---|
37 | def failUnlessOneRecoverable(self, sm, num_shares): |
---|
38 | self.assertThat(sm.recoverable_versions(), HasLength(1)) |
---|
39 | self.assertThat(sm.unrecoverable_versions(), HasLength(0)) |
---|
40 | best = sm.best_recoverable_version() |
---|
41 | self.assertThat(best, NotEquals(None)) |
---|
42 | self.assertThat(sm.recoverable_versions(), Equals(set([best]))) |
---|
43 | self.assertThat(sm.shares_available(), HasLength(1)) |
---|
44 | self.assertThat(sm.shares_available()[best], Equals((num_shares, 3, 10))) |
---|
45 | shnum, servers = list(sm.make_sharemap().items())[0] |
---|
46 | server = list(servers)[0] |
---|
47 | self.assertThat(sm.version_on_server(server, shnum), Equals(best)) |
---|
48 | self.assertThat(sm.version_on_server(server, 666), Equals(None)) |
---|
49 | return sm |
---|
50 | |
---|
51 | def test_basic(self): |
---|
52 | d = defer.succeed(None) |
---|
53 | ms = self.make_servermap |
---|
54 | us = self.update_servermap |
---|
55 | |
---|
56 | d.addCallback(lambda res: ms(mode=MODE_CHECK)) |
---|
57 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) |
---|
58 | d.addCallback(lambda res: ms(mode=MODE_WRITE)) |
---|
59 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) |
---|
60 | d.addCallback(lambda res: ms(mode=MODE_READ)) |
---|
61 | # this mode stops at k+epsilon, and epsilon=k, so 6 shares |
---|
62 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6)) |
---|
63 | d.addCallback(lambda res: ms(mode=MODE_ANYTHING)) |
---|
64 | # this mode stops at 'k' shares |
---|
65 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3)) |
---|
66 | |
---|
67 | # and can we re-use the same servermap? Note that these are sorted in |
---|
68 | # increasing order of number of servers queried, since once a server |
---|
69 | # gets into the servermap, we'll always ask it for an update. |
---|
70 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3)) |
---|
71 | d.addCallback(lambda sm: us(sm, mode=MODE_READ)) |
---|
72 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6)) |
---|
73 | d.addCallback(lambda sm: us(sm, mode=MODE_WRITE)) |
---|
74 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) |
---|
75 | d.addCallback(lambda sm: us(sm, mode=MODE_CHECK)) |
---|
76 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) |
---|
77 | d.addCallback(lambda sm: us(sm, mode=MODE_ANYTHING)) |
---|
78 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) |
---|
79 | |
---|
80 | return d |
---|
81 | |
---|
82 | def test_fetch_privkey(self): |
---|
83 | d = defer.succeed(None) |
---|
84 | # use the sibling filenode (which hasn't been used yet), and make |
---|
85 | # sure it can fetch the privkey. The file is small, so the privkey |
---|
86 | # will be fetched on the first (query) pass. |
---|
87 | d.addCallback(lambda res: self.make_servermap(MODE_WRITE, self._fn2)) |
---|
88 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) |
---|
89 | |
---|
90 | # create a new file, which is large enough to knock the privkey out |
---|
91 | # of the early part of the file |
---|
92 | LARGE = b"These are Larger contents" * 200 # about 5KB |
---|
93 | LARGE_uploadable = MutableData(LARGE) |
---|
94 | d.addCallback(lambda res: self._nodemaker.create_mutable_file(LARGE_uploadable)) |
---|
95 | def _created(large_fn): |
---|
96 | large_fn2 = self._nodemaker.create_from_cap(large_fn.get_uri()) |
---|
97 | return self.make_servermap(MODE_WRITE, large_fn2) |
---|
98 | d.addCallback(_created) |
---|
99 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10)) |
---|
100 | return d |
---|
101 | |
---|
102 | |
---|
103 | def test_mark_bad(self): |
---|
104 | d = defer.succeed(None) |
---|
105 | ms = self.make_servermap |
---|
106 | |
---|
107 | d.addCallback(lambda res: ms(mode=MODE_READ)) |
---|
108 | d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6)) |
---|
109 | def _made_map(sm): |
---|
110 | v = sm.best_recoverable_version() |
---|
111 | vm = sm.make_versionmap() |
---|
112 | shares = list(vm[v]) |
---|
113 | self.assertThat(shares, HasLength(6)) |
---|
114 | self._corrupted = set() |
---|
115 | # mark the first 5 shares as corrupt, then update the servermap. |
---|
116 | # The map should not have the marked shares it in any more, and |
---|
117 | # new shares should be found to replace the missing ones. |
---|
118 | for (shnum, server, timestamp) in shares: |
---|
119 | if shnum < 5: |
---|
120 | self._corrupted.add( (server, shnum) ) |
---|
121 | sm.mark_bad_share(server, shnum, b"") |
---|
122 | return self.update_servermap(sm, MODE_WRITE) |
---|
123 | d.addCallback(_made_map) |
---|
124 | def _check_map(sm): |
---|
125 | # this should find all 5 shares that weren't marked bad |
---|
126 | v = sm.best_recoverable_version() |
---|
127 | vm = sm.make_versionmap() |
---|
128 | shares = list(vm[v]) |
---|
129 | for (server, shnum) in self._corrupted: |
---|
130 | server_shares = sm.debug_shares_on_server(server) |
---|
131 | self.assertFalse(shnum in server_shares, "%d was in %s" % (shnum, server_shares)) |
---|
132 | self.assertThat(shares, HasLength(5)) |
---|
133 | d.addCallback(_check_map) |
---|
134 | return d |
---|
135 | |
---|
136 | def failUnlessNoneRecoverable(self, sm): |
---|
137 | self.assertThat(sm.recoverable_versions(), HasLength(0)) |
---|
138 | self.assertThat(sm.unrecoverable_versions(), HasLength(0)) |
---|
139 | best = sm.best_recoverable_version() |
---|
140 | self.assertThat(best, Equals(None)) |
---|
141 | self.assertThat(sm.shares_available(), HasLength(0)) |
---|
142 | |
---|
143 | def test_no_shares(self): |
---|
144 | self._storage._peers = {} # delete all shares |
---|
145 | ms = self.make_servermap |
---|
146 | d = defer.succeed(None) |
---|
147 | # |
---|
148 | d.addCallback(lambda res: ms(mode=MODE_CHECK)) |
---|
149 | d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm)) |
---|
150 | |
---|
151 | d.addCallback(lambda res: ms(mode=MODE_ANYTHING)) |
---|
152 | d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm)) |
---|
153 | |
---|
154 | d.addCallback(lambda res: ms(mode=MODE_WRITE)) |
---|
155 | d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm)) |
---|
156 | |
---|
157 | d.addCallback(lambda res: ms(mode=MODE_READ)) |
---|
158 | d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm)) |
---|
159 | |
---|
160 | return d |
---|
161 | |
---|
162 | def failUnlessNotQuiteEnough(self, sm): |
---|
163 | self.assertThat(sm.recoverable_versions(), HasLength(0)) |
---|
164 | self.assertThat(sm.unrecoverable_versions(), HasLength(1)) |
---|
165 | best = sm.best_recoverable_version() |
---|
166 | self.assertThat(best, Equals(None)) |
---|
167 | self.assertThat(sm.shares_available(), HasLength(1)) |
---|
168 | self.assertThat(list(sm.shares_available().values())[0], Equals((2,3,10))) |
---|
169 | return sm |
---|
170 | |
---|
171 | def test_not_quite_enough_shares(self): |
---|
172 | s = self._storage |
---|
173 | ms = self.make_servermap |
---|
174 | num_shares = len(s._peers) |
---|
175 | for peerid in s._peers: |
---|
176 | s._peers[peerid] = {} |
---|
177 | num_shares -= 1 |
---|
178 | if num_shares == 2: |
---|
179 | break |
---|
180 | # now there ought to be only two shares left |
---|
181 | assert len([peerid for peerid in s._peers if s._peers[peerid]]) == 2 |
---|
182 | |
---|
183 | d = defer.succeed(None) |
---|
184 | |
---|
185 | d.addCallback(lambda res: ms(mode=MODE_CHECK)) |
---|
186 | d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm)) |
---|
187 | d.addCallback(lambda sm: |
---|
188 | self.assertThat(sm.make_sharemap(), HasLength(2))) |
---|
189 | d.addCallback(lambda res: ms(mode=MODE_ANYTHING)) |
---|
190 | d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm)) |
---|
191 | d.addCallback(lambda res: ms(mode=MODE_WRITE)) |
---|
192 | d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm)) |
---|
193 | d.addCallback(lambda res: ms(mode=MODE_READ)) |
---|
194 | d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm)) |
---|
195 | |
---|
196 | return d |
---|
197 | |
---|
198 | |
---|
199 | def test_servermapupdater_finds_mdmf_files(self): |
---|
200 | # setUp already published an MDMF file for us. We just need to |
---|
201 | # make sure that when we run the ServermapUpdater, the file is |
---|
202 | # reported to have one recoverable version. |
---|
203 | d = defer.succeed(None) |
---|
204 | d.addCallback(lambda ignored: |
---|
205 | self.publish_mdmf()) |
---|
206 | d.addCallback(lambda ignored: |
---|
207 | self.make_servermap(mode=MODE_CHECK)) |
---|
208 | # Calling make_servermap also updates the servermap in the mode |
---|
209 | # that we specify, so we just need to see what it says. |
---|
210 | def _check_servermap(sm): |
---|
211 | self.assertThat(sm.recoverable_versions(), HasLength(1)) |
---|
212 | d.addCallback(_check_servermap) |
---|
213 | return d |
---|
214 | |
---|
215 | |
---|
216 | def test_fetch_update(self): |
---|
217 | d = defer.succeed(None) |
---|
218 | d.addCallback(lambda ignored: |
---|
219 | self.publish_mdmf()) |
---|
220 | d.addCallback(lambda ignored: |
---|
221 | self.make_servermap(mode=MODE_WRITE, update_range=(1, 2))) |
---|
222 | def _check_servermap(sm): |
---|
223 | # 10 shares |
---|
224 | self.assertThat(sm.update_data, HasLength(10)) |
---|
225 | # one version |
---|
226 | for data in sm.update_data.values(): |
---|
227 | self.assertThat(data, HasLength(1)) |
---|
228 | d.addCallback(_check_servermap) |
---|
229 | return d |
---|
230 | |
---|
231 | |
---|
232 | def test_servermapupdater_finds_sdmf_files(self): |
---|
233 | d = defer.succeed(None) |
---|
234 | d.addCallback(lambda ignored: |
---|
235 | self.publish_sdmf()) |
---|
236 | d.addCallback(lambda ignored: |
---|
237 | self.make_servermap(mode=MODE_CHECK)) |
---|
238 | d.addCallback(lambda servermap: |
---|
239 | self.assertThat(servermap.recoverable_versions(), HasLength(1))) |
---|
240 | return d |
---|