1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | |
---|
6 | from io import BytesIO |
---|
7 | import attr |
---|
8 | from twisted.internet import defer, reactor |
---|
9 | from foolscap.api import eventually, fireEventually |
---|
10 | from allmydata import client |
---|
11 | from allmydata.nodemaker import NodeMaker |
---|
12 | from allmydata.interfaces import SDMF_VERSION, MDMF_VERSION |
---|
13 | from allmydata.util import base32 |
---|
14 | from allmydata.util.hashutil import tagged_hash |
---|
15 | from allmydata.storage_client import StorageFarmBroker |
---|
16 | from allmydata.mutable.layout import MDMFSlotReadProxy |
---|
17 | from allmydata.mutable.publish import MutableData |
---|
18 | from ..common import ( |
---|
19 | EMPTY_CLIENT_CONFIG, |
---|
20 | ) |
---|
21 | |
---|
22 | def bchr(s): |
---|
23 | return bytes([s]) |
---|
24 | |
---|
25 | def eventuaaaaaly(res=None): |
---|
26 | d = fireEventually(res) |
---|
27 | d.addCallback(fireEventually) |
---|
28 | d.addCallback(fireEventually) |
---|
29 | return d |
---|
30 | |
---|
31 | # this "FakeStorage" exists to put the share data in RAM and avoid using real |
---|
32 | # network connections, both to speed up the tests and to reduce the amount of |
---|
33 | # non-mutable.py code being exercised. |
---|
34 | |
---|
35 | class FakeStorage: |
---|
36 | # this class replaces the collection of storage servers, allowing the |
---|
37 | # tests to examine and manipulate the published shares. It also lets us |
---|
38 | # control the order in which read queries are answered, to exercise more |
---|
39 | # of the error-handling code in Retrieve . |
---|
40 | # |
---|
41 | # Note that we ignore the storage index: this FakeStorage instance can |
---|
42 | # only be used for a single storage index. |
---|
43 | |
---|
44 | |
---|
45 | def __init__(self): |
---|
46 | self._peers = {} |
---|
47 | # _sequence is used to cause the responses to occur in a specific |
---|
48 | # order. If it is in use, then we will defer queries instead of |
---|
49 | # answering them right away, accumulating the Deferreds in a dict. We |
---|
50 | # don't know exactly how many queries we'll get, so exactly one |
---|
51 | # second after the first query arrives, we will release them all (in |
---|
52 | # order). |
---|
53 | self._sequence = None |
---|
54 | self._pending = {} |
---|
55 | self._pending_timer = None |
---|
56 | |
---|
57 | def read(self, peerid, storage_index): |
---|
58 | shares = self._peers.get(peerid, {}) |
---|
59 | if self._sequence is None: |
---|
60 | return eventuaaaaaly(shares) |
---|
61 | d = defer.Deferred() |
---|
62 | if not self._pending: |
---|
63 | self._pending_timer = reactor.callLater(1.0, self._fire_readers) |
---|
64 | if peerid not in self._pending: |
---|
65 | self._pending[peerid] = [] |
---|
66 | self._pending[peerid].append( (d, shares) ) |
---|
67 | return d |
---|
68 | |
---|
69 | def _fire_readers(self): |
---|
70 | self._pending_timer = None |
---|
71 | pending = self._pending |
---|
72 | self._pending = {} |
---|
73 | for peerid in self._sequence: |
---|
74 | if peerid in pending: |
---|
75 | for (d, shares) in pending.pop(peerid): |
---|
76 | eventually(d.callback, shares) |
---|
77 | for peerid in pending: |
---|
78 | for (d, shares) in pending[peerid]: |
---|
79 | eventually(d.callback, shares) |
---|
80 | |
---|
81 | def write(self, peerid, storage_index, shnum, offset, data): |
---|
82 | if peerid not in self._peers: |
---|
83 | self._peers[peerid] = {} |
---|
84 | shares = self._peers[peerid] |
---|
85 | f = BytesIO() |
---|
86 | f.write(shares.get(shnum, b"")) |
---|
87 | f.seek(offset) |
---|
88 | f.write(data) |
---|
89 | shares[shnum] = f.getvalue() |
---|
90 | |
---|
91 | |
---|
92 | # This doesn't actually implement the whole interface, but adding a commented |
---|
93 | # interface implementation annotation for grepping purposes. |
---|
94 | #@implementer(RIStorageServer) |
---|
95 | class FakeStorageServer: |
---|
96 | """ |
---|
97 | A fake Foolscap remote object, implemented by overriding callRemote() to |
---|
98 | call local methods. |
---|
99 | """ |
---|
100 | def __init__(self, peerid, storage): |
---|
101 | self.peerid = peerid |
---|
102 | self.storage = storage |
---|
103 | self.queries = 0 |
---|
104 | |
---|
105 | def callRemote(self, methname, *args, **kwargs): |
---|
106 | self.queries += 1 |
---|
107 | def _call(): |
---|
108 | meth = getattr(self, methname) |
---|
109 | return meth(*args, **kwargs) |
---|
110 | d = fireEventually() |
---|
111 | d.addCallback(lambda res: _call()) |
---|
112 | return d |
---|
113 | |
---|
114 | def callRemoteOnly(self, methname, *args, **kwargs): |
---|
115 | self.queries += 1 |
---|
116 | d = self.callRemote(methname, *args, **kwargs) |
---|
117 | d.addBoth(lambda ignore: None) |
---|
118 | pass |
---|
119 | |
---|
120 | def advise_corrupt_share(self, share_type, storage_index, shnum, reason): |
---|
121 | pass |
---|
122 | |
---|
123 | def slot_readv(self, storage_index, shnums, readv): |
---|
124 | d = self.storage.read(self.peerid, storage_index) |
---|
125 | def _read(shares): |
---|
126 | response = {} |
---|
127 | for shnum in shares: |
---|
128 | if shnums and shnum not in shnums: |
---|
129 | continue |
---|
130 | vector = response[shnum] = [] |
---|
131 | for (offset, length) in readv: |
---|
132 | assert isinstance(offset, int), offset |
---|
133 | assert isinstance(length, int), length |
---|
134 | vector.append(shares[shnum][offset:offset+length]) |
---|
135 | return response |
---|
136 | d.addCallback(_read) |
---|
137 | return d |
---|
138 | |
---|
139 | def slot_testv_and_readv_and_writev(self, storage_index, secrets, |
---|
140 | tw_vectors, read_vector): |
---|
141 | # always-pass: parrot the test vectors back to them. |
---|
142 | readv = {} |
---|
143 | for shnum, (testv, writev, new_length) in list(tw_vectors.items()): |
---|
144 | for (offset, length, op, specimen) in testv: |
---|
145 | assert op == b"eq" |
---|
146 | # TODO: this isn't right, the read is controlled by read_vector, |
---|
147 | # not by testv |
---|
148 | readv[shnum] = [ specimen |
---|
149 | for (offset, length, op, specimen) |
---|
150 | in testv ] |
---|
151 | for (offset, data) in writev: |
---|
152 | self.storage.write(self.peerid, storage_index, shnum, |
---|
153 | offset, data) |
---|
154 | answer = (True, readv) |
---|
155 | return fireEventually(answer) |
---|
156 | |
---|
157 | |
---|
158 | def flip_bit(original, byte_offset): |
---|
159 | return (original[:byte_offset] + |
---|
160 | bchr(ord(original[byte_offset:byte_offset+1]) ^ 0x01) + |
---|
161 | original[byte_offset+1:]) |
---|
162 | |
---|
163 | def add_two(original, byte_offset): |
---|
164 | # It isn't enough to simply flip the bit for the version number, |
---|
165 | # because 1 is a valid version number. So we add two instead. |
---|
166 | return (original[:byte_offset] + |
---|
167 | bchr(ord(original[byte_offset:byte_offset+1]) ^ 0x02) + |
---|
168 | original[byte_offset+1:]) |
---|
169 | |
---|
170 | def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0): |
---|
171 | # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a |
---|
172 | # list of shnums to corrupt. |
---|
173 | ds = [] |
---|
174 | for peerid in s._peers: |
---|
175 | shares = s._peers[peerid] |
---|
176 | for shnum in shares: |
---|
177 | if (shnums_to_corrupt is not None |
---|
178 | and shnum not in shnums_to_corrupt): |
---|
179 | continue |
---|
180 | data = shares[shnum] |
---|
181 | # We're feeding the reader all of the share data, so it |
---|
182 | # won't need to use the rref that we didn't provide, nor the |
---|
183 | # storage index that we didn't provide. We do this because |
---|
184 | # the reader will work for both MDMF and SDMF. |
---|
185 | reader = MDMFSlotReadProxy(None, None, shnum, data) |
---|
186 | # We need to get the offsets for the next part. |
---|
187 | d = reader.get_verinfo() |
---|
188 | def _do_corruption(verinfo, data, shnum, shares): |
---|
189 | (seqnum, |
---|
190 | root_hash, |
---|
191 | IV, |
---|
192 | segsize, |
---|
193 | datalen, |
---|
194 | k, n, prefix, o) = verinfo |
---|
195 | if isinstance(offset, tuple): |
---|
196 | offset1, offset2 = offset |
---|
197 | else: |
---|
198 | offset1 = offset |
---|
199 | offset2 = 0 |
---|
200 | if offset1 == "pubkey" and IV: |
---|
201 | real_offset = 107 |
---|
202 | elif offset1 in o: |
---|
203 | real_offset = o[offset1] |
---|
204 | else: |
---|
205 | real_offset = offset1 |
---|
206 | real_offset = int(real_offset) + offset2 + offset_offset |
---|
207 | assert isinstance(real_offset, int), offset |
---|
208 | if offset1 == 0: # verbyte |
---|
209 | f = add_two |
---|
210 | else: |
---|
211 | f = flip_bit |
---|
212 | shares[shnum] = f(data, real_offset) |
---|
213 | d.addCallback(_do_corruption, data, shnum, shares) |
---|
214 | ds.append(d) |
---|
215 | dl = defer.DeferredList(ds) |
---|
216 | dl.addCallback(lambda ignored: res) |
---|
217 | return dl |
---|
218 | |
---|
219 | @attr.s |
---|
220 | class Peer: |
---|
221 | peerid = attr.ib() |
---|
222 | storage_server = attr.ib() |
---|
223 | announcement = attr.ib() |
---|
224 | |
---|
225 | def make_peer(s, i): |
---|
226 | """ |
---|
227 | Create a "peer" suitable for use with ``make_storagebroker_with_peers`` or |
---|
228 | ``make_nodemaker_with_peers``. |
---|
229 | |
---|
230 | :param IServer s: The server with which to associate the peers. |
---|
231 | |
---|
232 | :param int i: A unique identifier for this peer within the whole group of |
---|
233 | peers to be used. For example, a sequence number. This is used to |
---|
234 | generate a unique peer id. |
---|
235 | |
---|
236 | :rtype: ``Peer`` |
---|
237 | """ |
---|
238 | peerid = base32.b2a(tagged_hash(b"peerid", b"%d" % i)[:20]) |
---|
239 | fss = FakeStorageServer(peerid, s) |
---|
240 | ann = { |
---|
241 | "anonymous-storage-FURL": "pb://%s@nowhere/fake" % (str(peerid, "utf-8"),), |
---|
242 | "permutation-seed-base32": peerid, |
---|
243 | } |
---|
244 | return Peer(peerid=peerid, storage_server=fss, announcement=ann) |
---|
245 | |
---|
246 | |
---|
247 | def make_storagebroker(s=None, num_peers=10): |
---|
248 | """ |
---|
249 | Make a ``StorageFarmBroker`` connected to some number of fake storage |
---|
250 | servers. |
---|
251 | |
---|
252 | :param IServer s: The server with which to associate the fake storage |
---|
253 | servers. |
---|
254 | |
---|
255 | :param int num_peers: The number of fake storage servers to associate with |
---|
256 | the broker. |
---|
257 | """ |
---|
258 | if not s: |
---|
259 | s = FakeStorage() |
---|
260 | peers = [] |
---|
261 | for peer_num in range(num_peers): |
---|
262 | peers.append(make_peer(s, peer_num)) |
---|
263 | return make_storagebroker_with_peers(peers) |
---|
264 | |
---|
265 | |
---|
266 | def make_storagebroker_with_peers(peers): |
---|
267 | """ |
---|
268 | Make a ``StorageFarmBroker`` connected to the given storage servers. |
---|
269 | |
---|
270 | :param list peers: The storage servers to associate with the storage |
---|
271 | broker. |
---|
272 | """ |
---|
273 | storage_broker = StorageFarmBroker(True, None, EMPTY_CLIENT_CONFIG) |
---|
274 | for peer in peers: |
---|
275 | storage_broker.test_add_rref( |
---|
276 | peer.peerid, |
---|
277 | peer.storage_server, |
---|
278 | peer.announcement, |
---|
279 | ) |
---|
280 | return storage_broker |
---|
281 | |
---|
282 | |
---|
283 | def make_nodemaker(s=None, num_peers=10): |
---|
284 | """ |
---|
285 | Make a ``NodeMaker`` connected to some number of fake storage servers. |
---|
286 | |
---|
287 | :param IServer s: The server with which to associate the fake storage |
---|
288 | servers. |
---|
289 | |
---|
290 | :param int num_peers: The number of fake storage servers to associate with |
---|
291 | the node maker. |
---|
292 | """ |
---|
293 | storage_broker = make_storagebroker(s, num_peers) |
---|
294 | return make_nodemaker_with_storage_broker(storage_broker) |
---|
295 | |
---|
296 | |
---|
297 | def make_nodemaker_with_peers(peers): |
---|
298 | """ |
---|
299 | Make a ``NodeMaker`` connected to the given storage servers. |
---|
300 | |
---|
301 | :param list peers: The storage servers to associate with the node maker. |
---|
302 | """ |
---|
303 | storage_broker = make_storagebroker_with_peers(peers) |
---|
304 | return make_nodemaker_with_storage_broker(storage_broker) |
---|
305 | |
---|
306 | |
---|
307 | def make_nodemaker_with_storage_broker(storage_broker): |
---|
308 | """ |
---|
309 | Make a ``NodeMaker`` using the given storage broker. |
---|
310 | |
---|
311 | :param StorageFarmBroker peers: The storage broker to use. |
---|
312 | """ |
---|
313 | sh = client.SecretHolder(b"lease secret", b"convergence secret") |
---|
314 | keygen = client.KeyGenerator() |
---|
315 | nodemaker = NodeMaker(storage_broker, sh, None, |
---|
316 | None, None, |
---|
317 | {"k": 3, "n": 10}, SDMF_VERSION, keygen) |
---|
318 | return nodemaker |
---|
319 | |
---|
320 | |
---|
321 | class PublishMixin: |
---|
322 | def publish_one(self): |
---|
323 | # publish a file and create shares, which can then be manipulated |
---|
324 | # later. |
---|
325 | self.CONTENTS = b"New contents go here" * 1000 |
---|
326 | self.uploadable = MutableData(self.CONTENTS) |
---|
327 | self._storage = FakeStorage() |
---|
328 | self._nodemaker = make_nodemaker(self._storage) |
---|
329 | self._storage_broker = self._nodemaker.storage_broker |
---|
330 | d = self._nodemaker.create_mutable_file(self.uploadable) |
---|
331 | def _created(node): |
---|
332 | self._fn = node |
---|
333 | self._fn2 = self._nodemaker.create_from_cap(node.get_uri()) |
---|
334 | d.addCallback(_created) |
---|
335 | return d |
---|
336 | |
---|
337 | def publish_mdmf(self, data=None): |
---|
338 | # like publish_one, except that the result is guaranteed to be |
---|
339 | # an MDMF file. |
---|
340 | # self.CONTENTS should have more than one segment. |
---|
341 | if data is None: |
---|
342 | data = b"This is an MDMF file" * 100000 |
---|
343 | self.CONTENTS = data |
---|
344 | self.uploadable = MutableData(self.CONTENTS) |
---|
345 | self._storage = FakeStorage() |
---|
346 | self._nodemaker = make_nodemaker(self._storage) |
---|
347 | self._storage_broker = self._nodemaker.storage_broker |
---|
348 | d = self._nodemaker.create_mutable_file(self.uploadable, version=MDMF_VERSION) |
---|
349 | def _created(node): |
---|
350 | self._fn = node |
---|
351 | self._fn2 = self._nodemaker.create_from_cap(node.get_uri()) |
---|
352 | d.addCallback(_created) |
---|
353 | return d |
---|
354 | |
---|
355 | |
---|
356 | def publish_sdmf(self, data=None): |
---|
357 | # like publish_one, except that the result is guaranteed to be |
---|
358 | # an SDMF file |
---|
359 | if data is None: |
---|
360 | data = b"This is an SDMF file" * 1000 |
---|
361 | self.CONTENTS = data |
---|
362 | self.uploadable = MutableData(self.CONTENTS) |
---|
363 | self._storage = FakeStorage() |
---|
364 | self._nodemaker = make_nodemaker(self._storage) |
---|
365 | self._storage_broker = self._nodemaker.storage_broker |
---|
366 | d = self._nodemaker.create_mutable_file(self.uploadable, version=SDMF_VERSION) |
---|
367 | def _created(node): |
---|
368 | self._fn = node |
---|
369 | self._fn2 = self._nodemaker.create_from_cap(node.get_uri()) |
---|
370 | d.addCallback(_created) |
---|
371 | return d |
---|
372 | |
---|
373 | |
---|
374 | def publish_multiple(self, version=0): |
---|
375 | self.CONTENTS = [b"Contents 0", |
---|
376 | b"Contents 1", |
---|
377 | b"Contents 2", |
---|
378 | b"Contents 3a", |
---|
379 | b"Contents 3b"] |
---|
380 | self.uploadables = [MutableData(d) for d in self.CONTENTS] |
---|
381 | self._copied_shares = {} |
---|
382 | self._storage = FakeStorage() |
---|
383 | self._nodemaker = make_nodemaker(self._storage) |
---|
384 | d = self._nodemaker.create_mutable_file(self.uploadables[0], version=version) # seqnum=1 |
---|
385 | def _created(node): |
---|
386 | self._fn = node |
---|
387 | # now create multiple versions of the same file, and accumulate |
---|
388 | # their shares, so we can mix and match them later. |
---|
389 | d = defer.succeed(None) |
---|
390 | d.addCallback(self._copy_shares, 0) |
---|
391 | d.addCallback(lambda res: node.overwrite(self.uploadables[1])) #s2 |
---|
392 | d.addCallback(self._copy_shares, 1) |
---|
393 | d.addCallback(lambda res: node.overwrite(self.uploadables[2])) #s3 |
---|
394 | d.addCallback(self._copy_shares, 2) |
---|
395 | d.addCallback(lambda res: node.overwrite(self.uploadables[3])) #s4a |
---|
396 | d.addCallback(self._copy_shares, 3) |
---|
397 | # now we replace all the shares with version s3, and upload a new |
---|
398 | # version to get s4b. |
---|
399 | rollback = dict([(i,2) for i in range(10)]) |
---|
400 | d.addCallback(lambda res: self._set_versions(rollback)) |
---|
401 | d.addCallback(lambda res: node.overwrite(self.uploadables[4])) #s4b |
---|
402 | d.addCallback(self._copy_shares, 4) |
---|
403 | # we leave the storage in state 4 |
---|
404 | return d |
---|
405 | d.addCallback(_created) |
---|
406 | return d |
---|
407 | |
---|
408 | |
---|
409 | def _copy_shares(self, ignored, index): |
---|
410 | shares = self._storage._peers |
---|
411 | # we need a deep copy |
---|
412 | new_shares = {} |
---|
413 | for peerid in shares: |
---|
414 | new_shares[peerid] = {} |
---|
415 | for shnum in shares[peerid]: |
---|
416 | new_shares[peerid][shnum] = shares[peerid][shnum] |
---|
417 | self._copied_shares[index] = new_shares |
---|
418 | |
---|
419 | def _set_versions(self, versionmap): |
---|
420 | # versionmap maps shnums to which version (0,1,2,3,4) we want the |
---|
421 | # share to be at. Any shnum which is left out of the map will stay at |
---|
422 | # its current version. |
---|
423 | shares = self._storage._peers |
---|
424 | oldshares = self._copied_shares |
---|
425 | for peerid in shares: |
---|
426 | for shnum in shares[peerid]: |
---|
427 | if shnum in versionmap: |
---|
428 | index = versionmap[shnum] |
---|
429 | shares[peerid][shnum] = oldshares[index][peerid][shnum] |
---|
430 | |
---|
431 | class CheckerMixin: |
---|
432 | def check_good(self, r, where): |
---|
433 | self.failUnless(r.is_healthy(), where) |
---|
434 | return r |
---|
435 | |
---|
436 | def check_bad(self, r, where): |
---|
437 | self.failIf(r.is_healthy(), where) |
---|
438 | return r |
---|
439 | |
---|
440 | def check_expected_failure(self, r, expected_exception, substring, where): |
---|
441 | for (peerid, storage_index, shnum, f) in r.get_share_problems(): |
---|
442 | if f.check(expected_exception): |
---|
443 | self.failUnless(substring in str(f), |
---|
444 | "%s: substring '%s' not in '%s'" % |
---|
445 | (where, substring, str(f))) |
---|
446 | return |
---|
447 | self.fail("%s: didn't see expected exception %s in problems %s" % |
---|
448 | (where, expected_exception, r.get_share_problems())) |
---|