import random, weakref, itertools, time
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import DeadReferenceError, RemoteException, eventually

from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IVerifierURI, \
     IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
     IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
     UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES

class IntegrityCheckReject(Exception):
    pass

class BadURIExtensionHashValue(IntegrityCheckReject):
    pass
class BadURIExtension(IntegrityCheckReject):
    pass
class UnsupportedErasureCodec(BadURIExtension):
    pass
class BadCrypttextHashValue(IntegrityCheckReject):
    pass
class BadOrMissingHash(IntegrityCheckReject):
    pass

class DownloadStopped(Exception):
    pass

class DownloadResults:
    implements(IDownloadResults)

    def __init__(self):
        self.servers_used = set()
        self.server_problems = {}
        self.servermap = {}
        self.timings = {}
        self.file_size = None

class DecryptingTarget(log.PrefixingLogMixin):
    implements(IDownloadTarget, IConsumer)
    def __init__(self, target, key, _log_msg_id=None):
        precondition(IDownloadTarget.providedBy(target), target)
        self.target = target
        self._decryptor = AES(key)
        prefix = str(target)
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
    # methods to satisfy the IConsumer interface
    def registerProducer(self, producer, streaming):
        if IConsumer.providedBy(self.target):
            self.target.registerProducer(producer, streaming)
    def unregisterProducer(self):
        if IConsumer.providedBy(self.target):
            self.target.unregisterProducer()
    def write(self, ciphertext):
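        # pycryptopp's AES runs in CTR mode and keeps its counter state
        # across process() calls, so the ciphertext can be decrypted
        # incrementally, one segment at a time.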
        plaintext = self._decryptor.process(ciphertext)
        self.target.write(plaintext)
    def open(self, size):
        self.target.open(size)
    def close(self):
        self.target.close()
    def finish(self):
        return self.target.finish()
    # The following methods are just pass-throughs to the next target,
    # because that target might be a repairer.DownUpConnector, and because
    # the current CHKUpload object expects to find the storage index in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        self.target.set_storageindex(storageindex)
    def set_encodingparams(self, encodingparams):
        self.target.set_encodingparams(encodingparams)

class ValidatedThingObtainer:
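    """I try each of my validatedthingproxies in turn: start() the first
    one, and if it fails with one of the expected fetch/validation errors,
    log the failure and fall back to the next, until one validates
    successfully or the list is exhausted, at which point I raise
    UnableToFetchCriticalDownloadDataError."""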
    def __init__(self, validatedthingproxies, debugname, log_id):
        self._validatedthingproxies = validatedthingproxies
        self._debugname = debugname
        self._log_id = log_id

    def _bad(self, f, validatedthingproxy):
        f.trap(RemoteException, DeadReferenceError,
               IntegrityCheckReject, layout.LayoutInvalid,
               layout.ShareVersionIncompatible)
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        elif f.check(RemoteException):
            level = log.WEIRD
        else:
            level = log.SCARY
        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
                failure=f, level=level, umid="JGXxBA")
        if not self._validatedthingproxies:
            raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
        # try again with a different one
        d = self._try_the_next_one()
        return d

    def _try_the_next_one(self):
        vtp = self._validatedthingproxies.pop(0)
        # start() obtains, validates, and callsback-with the thing or else
        # errbacks
        d = vtp.start()
        d.addErrback(self._bad, vtp)
        return d

    def start(self):
        return self._try_the_next_one()

class ValidatedCrypttextHashTreeProxy:
    implements(IValidatedThingProxy)
    """ I am a front-end for a remote crypttext hash tree using a local
    ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
    Validated Thing protocol (i.e., I have a start() method that fires with
    self once I get a valid one)."""
    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments,
                 fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._readbucketproxy = readbucketproxy
        self._num_segments = num_segments
        self._fetch_failures = fetch_failures
        self._crypttext_hash_tree = crypttext_hash_tree

    def _validate(self, proposal):
        ct_hashes = dict(list(enumerate(proposal)))
        try:
            self._crypttext_hash_tree.set_hashes(ct_hashes)
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            if self._fetch_failures is not None:
                self._fetch_failures["crypttext_hash_tree"] += 1
            raise BadOrMissingHash(le)
        # If we now have enough of the crypttext hash tree to integrity-check
        # *any* segment of ciphertext, then we are done. TODO: alacrity would
        # be better if we downloaded only part of the crypttext hash tree at
        # a time.
        for segnum in range(self._num_segments):
            if self._crypttext_hash_tree.needed_hashes(segnum):
                raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
        return self

    def start(self):
        d = self._readbucketproxy.get_crypttext_hashes()
        d.addCallback(self._validate)
        return d

class ValidatedExtendedURIProxy:
    implements(IValidatedThingProxy)
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB."""

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        h = hashutil.uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy,
                    base32.b2a(self._verifycap.uri_extension_hash),
                    base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        else:
            return data

    def _parse_and_validate(self, data):
        self.share_size = mathutil.div_ceil(self._verifycap.size,
                                            self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.
        # First, things that we really need to learn from the UEB in order to
        # do this download. Next: things which are optional but not redundant
        # -- if they are present in the UEB they will get used. Next, things
        # that are optional and redundant. These things are required to be
        # consistent: they don't have to be in the UEB, but if they are in
        # the UEB then they will be checked for consistency with the
        # already-known facts, and if they are inconsistent then an exception
        # will be raised. These things aren't actually used -- they are just
        # tested for consistency and ignored. Finally: things which are
        # deprecated -- they ought not be in the UEB at all, and if they are
        # present then a warning will be logged but they are otherwise
        # ignored.

        # First, things that we really need to learn from the UEB:
        # segment_size, crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size,
                                            self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size,
                                              self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
                                                        self._verifycap.needed_shares)
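        # For example, a 1000000-byte file with 131072-byte segments and
        # needed_shares=3 has tail_data_size = 1000000 % 131072 = 82496
        # bytes, which is padded up to tail_segment_size = 82497 (the next
        # multiple of 3).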

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']


        # Next: things that are optional and not redundant: crypttext_hash
        if d.has_key('crypttext_hash'):
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))


        # Next: things that are optional, redundant, and required to be
        # consistent: codec_name, codec_params, tail_codec_params,
        # num_segments, size, needed_shares, total_shares
        if d.has_key('codec_name'):
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if d.has_key('codec_params'):
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: "
                                      "ucpss: %s != self.segment_size: %s" %
                                      (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" %
                                      (ucpns, self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" %
                                      (ucpts, self._verifycap.total_shares))

        if d.has_key('tail_codec_params'):
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))

        if d.has_key('num_segments'):
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments, d['num_segments']))

        if d.has_key('size'):
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if d.has_key('needed_shares'):
            if d['needed_shares'] != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if d.has_key('total_shares'):
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash,
        # plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used. Ignoring. %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used. Ignoring. %s" % (self,))

        return self

    def start(self):
        """Fetch the UEB from bucket, compare its hash to the hash from
        verifycap, then parse it. Returns a deferred which is called back
        with self once the fetch is successful, or is erred back if it
        fails."""
        d = self._readbucketproxy.get_uri_extension()
        d.addCallback(self._check_integrity)
        d.addCallback(self._parse_and_validate)
        return d

class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
                 block_size, share_size):
        """ share_hash_tree is required to have already been initialized with
        the root hash (the number-0 hash), using the share_root_hash from the
        UEB"""
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket,
                               base32.b2a_l(share_hash_tree[0][:8], 60))
        log.PrefixingLogMixin.__init__(self,
                                       facility="tahoe.immutable.download",
                                       prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)

    def get_all_sharehashes(self):
        """Retrieve and validate all the share-hash-tree nodes that are
        included in this share, regardless of whether we need them to
        validate the share or not. Each share contains a minimal Merkle tree
        chain, but there is lots of overlap, so usually we'll be using hashes
        from other shares and not reading every single hash from this share.
        The Verifier uses this function to read and validate every single
        hash from this share.
        Call this (and wait for the Deferred it returns to fire) before
        calling get_block() for the first time: this lets us check that the
        share contains enough hashes to validate its own data, and avoids
        downloading any share hash twice.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash."""

        d = self.bucket.get_share_hashes()
        def _got_share_hashes(sh):
            sharehashes = dict(sh)
            try:
                self.share_hash_tree.set_hashes(sharehashes)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_share_hashes)
        return d

    def get_all_blockhashes(self):
        """Retrieve and validate all the block-hash-tree nodes that are
        included in this share. Each share contains a full Merkle tree, but
        we usually only fetch the minimal subset necessary for any particular
        block. This function fetches everything at once. The Verifier uses
        this function to validate the block hash tree.

        Call this (and wait for the Deferred it returns to fire) after
        calling get_all_sharehashes() and before calling get_block() for the
        first time: this lets us check that the share contains all block
        hashes and avoids downloading them multiple times.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash.
        """

        # get_block_hashes(anything) currently always returns everything
        needed = list(range(len(self.block_hash_tree)))
        d = self.bucket.get_block_hashes(needed)
        def _got_block_hashes(blockhashes):
            if len(blockhashes) < len(self.block_hash_tree):
                raise BadOrMissingHash()
            bh = dict(enumerate(blockhashes))

            try:
                self.block_hash_tree.set_hashes(bh)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_block_hashes)
        return d

    def get_all_crypttext_hashes(self, crypttext_hash_tree):
        """Retrieve and validate all the crypttext-hash-tree nodes that are
        in this share. Normally we don't look at these at all: the download
        process fetches them incrementally as needed to validate each segment
        of ciphertext. But this is a convenient place to give the Verifier a
        function to validate all of these at once.

        Call this with a new hashtree object for each share, initialized with
        the crypttext hash tree root. I return a Deferred which errbacks upon
        failure, probably with BadOrMissingHash.
        """

        # get_crypttext_hashes() always returns everything
        d = self.bucket.get_crypttext_hashes()
        def _got_crypttext_hashes(hashes):
            if len(hashes) < len(crypttext_hash_tree):
                raise BadOrMissingHash()
            ct_hashes = dict(enumerate(hashes))
            try:
                crypttext_hash_tree.set_hashes(ct_hashes)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_crypttext_hashes)
        return d

    def get_block(self, blocknum):
        # the first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the
        # share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
        d3 = self.bucket.get_block_data(blocknum,
                                        self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl

    def _got_data(self, results, blocknum):
        precondition(blocknum < self.num_blocks,
                     self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError, le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below

        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise exception if the values being passed do not
                # match the root node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError, le:
                    # Weird -- sharehashes contained index numbers outside of
                    # the range that fit into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree,
            # which is also one of the leafs of the share hash tree, and is
            # called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    # No root node in block_hash_tree and also the share hash
                    # wasn't sent by the server.
                    raise hashtree.NotEnoughHashesError
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = hashutil.block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
            #         "%r .. %r: %s" %
            #         (self.sharenum, blocknum, len(blockdata),
            #          blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))

        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                     (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                         (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes.items()):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in blockhashes.items():
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata


class BlockDownloader(log.PrefixingLogMixin):
    """I am responsible for downloading a single block (from a single bucket)
    for a single segment.

    I am a child of the SegmentDownloader.
    """

    def __init__(self, vbucket, blocknum, parent, results):
        precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
        prefix = "%s-%d" % (vbucket, blocknum)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.vbucket = vbucket
        self.blocknum = blocknum
        self.parent = parent
        self.results = results

    def start(self, segnum):
        self.log("get_block(segnum=%d)" % segnum)
        started = time.time()
        d = self.vbucket.get_block(segnum)
        d.addCallbacks(self._hold_block, self._got_block_error,
                       callbackArgs=(started,))
        return d

    def _hold_block(self, data, started):
        if self.results:
            elapsed = time.time() - started
            peerid = self.vbucket.bucket.get_peerid()
            if peerid not in self.results.timings["fetch_per_server"]:
                self.results.timings["fetch_per_server"][peerid] = []
            self.results.timings["fetch_per_server"][peerid].append(elapsed)
        self.log("got block")
        self.parent.hold_block(self.blocknum, data)

    def _got_block_error(self, f):
        f.trap(RemoteException, DeadReferenceError,
               IntegrityCheckReject, layout.LayoutInvalid,
               layout.ShareVersionIncompatible)
        if f.check(RemoteException, DeadReferenceError):
            level = log.UNUSUAL
        else:
            level = log.WEIRD
        self.log("failure to get block", level=level, umid="5Z4uHQ")
        if self.results:
            peerid = self.vbucket.bucket.get_peerid()
            self.results.server_problems[peerid] = str(f)
        self.parent.bucket_failed(self.vbucket)

class SegmentDownloader:
    """I am responsible for downloading all the blocks for a single segment
    of data.

    I am a child of the CiphertextDownloader.
    """

    def __init__(self, parent, segmentnumber, needed_shares, results):
        self.parent = parent
        self.segmentnumber = segmentnumber
        self.needed_blocks = needed_shares
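        # one block from each of needed_shares distinct shares is enough to
        # decode this segment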
        self.blocks = {} # k: blocknum, v: data
        self.results = results
        self._log_number = self.parent.log("starting segment %d" %
                                           segmentnumber)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return self.parent.log(*args, **kwargs)

    def start(self):
        return self._download()

    def _download(self):
        d = self._try()
        def _done(res):
            if len(self.blocks) >= self.needed_blocks:
                # we only need self.needed_blocks blocks
                # we want to get the smallest blockids, because they are
                # more likely to be fast "primary blocks"
                blockids = sorted(self.blocks.keys())[:self.needed_blocks]
                blocks = []
                for blocknum in blockids:
                    blocks.append(self.blocks[blocknum])
                return (blocks, blockids)
            else:
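                # not enough blocks survived this pass (a failed bucket was
                # removed via bucket_failed()), so try again;
                # _activate_enough_buckets() will pick replacement shares or
                # raise NotEnoughSharesError.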
                return self._download()
        d.addCallback(_done)
        return d

    def _try(self):
        # fill our set of active buckets, maybe raising NotEnoughSharesError
        active_buckets = self.parent._activate_enough_buckets()
        # Now we have enough buckets, in self.parent.active_buckets.

        # in test cases, bd.start might mutate active_buckets right away, so
        # we need to put off calling start() until we've iterated all the way
        # through it.
        downloaders = []
        for blocknum, vbucket in active_buckets.iteritems():
            assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
            bd = BlockDownloader(vbucket, blocknum, self, self.results)
            downloaders.append(bd)
            if self.results:
                self.results.servers_used.add(vbucket.bucket.get_peerid())
        l = [bd.start(self.segmentnumber) for bd in downloaders]
        return defer.DeferredList(l, fireOnOneErrback=True)

    def hold_block(self, blocknum, data):
        self.blocks[blocknum] = data

    def bucket_failed(self, vbucket):
        self.parent.bucket_failed(vbucket)

class DownloadStatus:
    implements(IDownloadStatus)
    statusid_counter = itertools.count(0)
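    # class-level counter shared by all instances; gives each download
    # status object a unique identifier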

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = 0.0
        self.paused = False
        self.stopped = False
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        status = self.status
        if self.paused:
            status += " (output paused)"
        if self.stopped:
            status += " (output stopped)"
        return status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_paused(self, paused):
        self.paused = paused
    def set_stopped(self, stopped):
        self.stopped = stopped
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CiphertextDownloader(log.PrefixingLogMixin):
740 | """ I download shares, check their integrity, then decode them, check the |
---|
741 | integrity of the resulting ciphertext, then and write it to my target. |
---|
742 | Before I send any new request to a server, I always ask the 'monitor' |
---|
743 | object that was passed into my constructor whether this task has been |
---|
744 | cancelled (by invoking its raise_if_cancelled() method).""" |
---|
    implements(IPushProducer)
    _status = None

    def __init__(self, storage_broker, v, target, monitor):

        precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
        precondition(IVerifierURI.providedBy(v), v)
        precondition(IDownloadTarget.providedBy(target), target)

        prefix=base32.b2a_l(v.storage_index[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self._storage_broker = storage_broker

        self._verifycap = v
        self._storage_index = v.storage_index
        self._uri_extension_hash = v.uri_extension_hash

        self._started = time.time()
        self._status = s = DownloadStatus()
        s.set_status("Starting")
        s.set_storage_index(self._storage_index)
        s.set_size(self._verifycap.size)
        s.set_helper(False)
        s.set_active(True)

        self._results = DownloadResults()
        s.set_results(self._results)
        self._results.file_size = self._verifycap.size
        self._results.timings["servers_peer_selection"] = {}
        self._results.timings["fetch_per_server"] = {}
        self._results.timings["cumulative_fetch"] = 0.0
        self._results.timings["cumulative_decode"] = 0.0
        self._results.timings["cumulative_decrypt"] = 0.0
        self._results.timings["paused"] = 0.0

        self._paused = False
        self._stopped = False
        if IConsumer.providedBy(target):
            target.registerProducer(self, True)
        self._target = target
        # Repairer (uploader) needs the storageindex.
        self._target.set_storageindex(self._storage_index)
        self._monitor = monitor
        self._opened = False

        self.active_buckets = {} # k: shnum, v: bucket
        self._share_buckets = [] # list of (sharenum, bucket) tuples
        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets

        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }

        self._ciphertext_hasher = hashutil.crypttext_hasher()

        self._bytes_done = 0
        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)

        # _got_uri_extension() will create the following:
        # self._crypttext_hash_tree
        # self._share_hash_tree
        # self._current_segnum = 0
        # self._vup # ValidatedExtendedURIProxy

    def pauseProducing(self):
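        # Pausing is implemented by storing a Deferred in self._paused:
        # _check_for_pause() chains onto it, and resumeProducing() fires it
        # to let the segment pipeline continue.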
        if self._paused:
            return
        self._paused = defer.Deferred()
        self._paused_at = time.time()
        if self._status:
            self._status.set_paused(True)

    def resumeProducing(self):
        if self._paused:
            paused_for = time.time() - self._paused_at
            self._results.timings['paused'] += paused_for
            p = self._paused
            self._paused = None
            eventually(p.callback, None)
            if self._status:
                self._status.set_paused(False)

    def stopProducing(self):
        self.log("Download.stopProducing")
        self._stopped = True
        self.resumeProducing()
        if self._status:
            self._status.set_stopped(True)
            self._status.set_active(False)

    def start(self):
        self.log("starting download")

        # first step: who should we download from?
        d = defer.maybeDeferred(self._get_all_shareholders)
        d.addCallback(self._got_all_shareholders)
        # now get the uri_extension block from somebody and integrity check
        # it and parse and validate its contents
        d.addCallback(self._obtain_uri_extension)
        d.addCallback(self._get_crypttext_hash_tree)
        # once we know that, we can download blocks from everybody
        d.addCallback(self._download_all_segments)
        def _finished(res):
            if self._status:
                self._status.set_status("Finished")
                self._status.set_active(False)
                self._status.set_paused(False)
            if IConsumer.providedBy(self._target):
                self._target.unregisterProducer()
            return res
        d.addBoth(_finished)
        def _failed(why):
            if self._status:
                self._status.set_status("Failed")
                self._status.set_active(False)
            if why.check(DownloadStopped):
                # DownloadStopped just means the consumer aborted the
                # download; not so scary.
                self.log("download stopped", level=log.UNUSUAL)
            else:
                # This is really unusual, and deserves maximum forensics.
                self.log("download failed!", failure=why, level=log.SCARY,
                         umid="lp1vaQ")
            return why
        d.addErrback(_failed)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self):
        dl = []
        sb = self._storage_broker
        servers = sb.get_servers_for_index(self._storage_index)
        if not servers:
            raise NoServersError("broker gave us no servers!")
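        # send a DYHB ("do you have block?") query to each server, asking
        # which shares of this storage index it holds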
        for (peerid,ss) in servers:
            self.log(format="sending DYHB to [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, umid="rT03hg")
            d = ss.callRemote("get_buckets", self._storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        self._responses_received = 0
        self._queries_sent = len(dl)
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 shnums=sorted(buckets.keys()),
                 level=log.NOISY, umid="o4uwFg")
        self._responses_received += 1
        if self._results:
            elapsed = time.time() - self._started
            self._results.timings["servers_peer_selection"][peerid] = elapsed
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        for sharenum, bucket in buckets.iteritems():
            b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
            self.add_share_bucket(sharenum, b)

            if self._results:
                if peerid not in self._results.servermap:
                    self._results.servermap[peerid] = set()
                self._results.servermap[peerid].add(sharenum)

    def add_share_bucket(self, sharenum, bucket):
        # this is split out for the benefit of test_encode.py
        self._share_buckets.append( (sharenum, bucket) )

    def _got_error(self, f):
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log("Error during get_buckets", failure=f, level=level,
                 umid="3uuBUQ")

    def bucket_failed(self, vbucket):
        shnum = vbucket.sharenum
        del self.active_buckets[shnum]
        s = self._share_vbuckets[shnum]
        # s is a set of ValidatedReadBucketProxy instances
        s.remove(vbucket)
        # ... which might now be empty
        if not s:
            # there are no more buckets which can provide this share, so
            # remove the key. This may prompt us to use a different share.
            del self._share_vbuckets[shnum]

    def _got_all_shareholders(self, res):
        if self._results:
            now = time.time()
            self._results.timings["peer_selection"] = now - self._started

        if len(self._share_buckets) < self._verifycap.needed_shares:
            msg = "Failed to get enough shareholders: have %d, need %d" \
                  % (len(self._share_buckets), self._verifycap.needed_shares)
            if self._share_buckets:
                raise NotEnoughSharesError(msg)
            else:
                raise NoSharesError(msg)

        #for s in self._share_vbuckets.values():
        #    for vb in s:
        #        assert isinstance(vb, ValidatedReadBucketProxy), \
        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)

    def _obtain_uri_extension(self, ignored):
        # all shareholders are supposed to have a copy of uri_extension, and
        # all are supposed to be identical. We compute the hash of the data
        # that comes back, and compare it against the version in our URI. If
        # they don't match, ignore their data and try someone else.
        if self._status:
            self._status.set_status("Obtaining URI Extension")

        uri_extension_fetch_started = time.time()

        vups = []
        for sharenum, bucket in self._share_buckets:
            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
        d = vto.start()

        def _got_uri_extension(vup):
            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
            if self._results:
                elapsed = time.time() - uri_extension_fetch_started
                self._results.timings["uri_extension"] = elapsed

            self._vup = vup
            self._codec = codec.CRSDecoder()
            self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
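            # The tail segment is usually shorter than the others (and is
            # padded out to a multiple of needed_shares), so it gets its own
            # decoder, configured with tail_segment_size.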
            self._tail_codec = codec.CRSDecoder()
            self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)

            self._current_segnum = 0

            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
            self._share_hash_tree.set_hashes({0: vup.share_root_hash})

            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})

            # Repairer (uploader) needs the encodingparams.
            self._target.set_encodingparams((
                self._verifycap.needed_shares,
                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
                self._verifycap.total_shares,
                self._vup.segment_size
                ))
        d.addCallback(_got_uri_extension)
        return d

    def _get_crypttext_hash_tree(self, res):
        vchtps = []
        for sharenum, bucket in self._share_buckets:
            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
            vchtps.append(vchtp)

        _get_crypttext_hash_tree_started = time.time()
        if self._status:
            self._status.set_status("Retrieving crypttext hash tree")

        vto = ValidatedThingObtainer(vchtps, debugname="vchtps",
                                     log_id=self._parentmsgid)
        d = vto.start()

        def _got_crypttext_hash_tree(res):
            # Good -- the self._crypttext_hash_tree that we passed to vchtp
            # is now populated with hashes.
            if self._results:
                elapsed = time.time() - _get_crypttext_hash_tree_started
                self._results.timings["hashtrees"] = elapsed
        d.addCallback(_got_crypttext_hash_tree)
        return d

    def _activate_enough_buckets(self):
        """either return a mapping from shnum to a ValidatedReadBucketProxy
        that can provide data for that share, or raise NotEnoughSharesError"""

        while len(self.active_buckets) < self._verifycap.needed_shares:
            # need some more
            handled_shnums = set(self.active_buckets.keys())
            available_shnums = set(self._share_vbuckets.keys())
            potential_shnums = list(available_shnums - handled_shnums)
            if len(potential_shnums) < (self._verifycap.needed_shares
                                        - len(self.active_buckets)):
                have = len(potential_shnums) + len(self.active_buckets)
                msg = "Unable to activate enough shares: have %d, need %d" \
                      % (have, self._verifycap.needed_shares)
                if have:
                    raise NotEnoughSharesError(msg)
                else:
                    raise NoSharesError(msg)
            # For the next share, choose a primary share if available, else a
            # randomly chosen secondary share.
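            # (Primary shares -- shnum < needed_shares -- carry the
            # ciphertext blocks verbatim in the systematic erasure code, so
            # they are the cheapest to decode.)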
            potential_shnums.sort()
            if potential_shnums[0] < self._verifycap.needed_shares:
                shnum = potential_shnums[0]
            else:
                shnum = random.choice(potential_shnums)
            # and a random bucket that will provide it
            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
            self.active_buckets[shnum] = validated_bucket
        return self.active_buckets


    def _download_all_segments(self, res):
        for sharenum, bucket in self._share_buckets:
            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)

        # after the above code, self._share_vbuckets contains enough
        # buckets to complete the download, and some extra ones to
        # tolerate some buckets dropping out or having
        # errors. self._share_vbuckets is a dictionary that maps from
        # shnum to a set of ValidatedBuckets, which themselves are
        # wrappers around RIBucketReader references.
        self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance

        self._started_fetching = time.time()

        d = defer.succeed(None)
        for segnum in range(self._vup.num_segments):
            d.addCallback(self._download_segment, segnum)
            # this pause, at the end of write, prevents pre-fetch from
            # happening until the consumer is ready for more data.
            d.addCallback(self._check_for_pause)
        return d

    def _check_for_pause(self, res):
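        # If we are paused, return a fresh Deferred that fires with res once
        # resumeProducing() fires self._paused; this stalls the download
        # pipeline without discarding the intermediate result.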
        if self._paused:
            d = defer.Deferred()
            self._paused.addCallback(lambda ignored: d.callback(res))
            return d
        if self._stopped:
            raise DownloadStopped("our Consumer called stopProducing()")
        self._monitor.raise_if_cancelled()
        return res

    def _download_segment(self, res, segnum):
        if self._status:
            self._status.set_status("Downloading segment %d of %d" %
                                    (segnum+1, self._vup.num_segments))
        self.log("downloading seg#%d of %d (%d%%)"
                 % (segnum, self._vup.num_segments,
                    100.0 * segnum / self._vup.num_segments))
        # memory footprint: when the SegmentDownloader finishes pulling down
        # all shares, we have 1*segment_size of usage.
        segmentdler = SegmentDownloader(self, segnum,
                                        self._verifycap.needed_shares,
                                        self._results)
        started = time.time()
        d = segmentdler.start()
        def _finished_fetching(res):
            elapsed = time.time() - started
            self._results.timings["cumulative_fetch"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_fetching)
        # pause before using more memory
        d.addCallback(self._check_for_pause)
        # while the codec does its job, we hit 2*segment_size
        def _started_decode(res):
            self._started_decode = time.time()
            return res
        if self._results:
            d.addCallback(_started_decode)
        if segnum + 1 == self._vup.num_segments:
            codec = self._tail_codec
        else:
            codec = self._codec
        d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
        # once the codec is done, we drop back to 1*segment_size, because
        # 'shares' goes out of scope. The memory usage is all in the
        # plaintext now, spread out into a bunch of tiny buffers.
        def _finished_decode(res):
            elapsed = time.time() - self._started_decode
            self._results.timings["cumulative_decode"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_decode)

        # pause/check-for-stop just before writing, to honor stopProducing
        d.addCallback(self._check_for_pause)
        d.addCallback(self._got_segment)
        return d

    def _got_segment(self, buffers):
        precondition(self._crypttext_hash_tree)
        started_decrypt = time.time()
        self._status.set_progress(float(self._current_segnum)/self._verifycap.size)

        if self._current_segnum + 1 == self._vup.num_segments:
            # This is the last segment.
            # Trim off any padding added by the upload side. We never send
            # empty segments. If the data was an exact multiple of the
            # segment size, the last segment will be full.
            tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
            num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
            # Remove buffers which don't contain any part of the tail.
            del buffers[num_buffers_used:]
            # Remove the past-the-tail-part of the last buffer.
            tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
            if tail_in_last_buf == 0:
                tail_in_last_buf = tail_buf_size
            buffers[-1] = buffers[-1][:tail_in_last_buf]
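            # e.g. with needed_shares=3, tail_data_size=82496 and
            # tail_segment_size=82497: tail_buf_size is 27499, three buffers
            # are kept, and the last one is trimmed to 82496 % 27499 = 27498
            # bytes.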

        # First compute the hash of this segment and check that it fits.
        ch = hashutil.crypttext_segment_hasher()
        for buffer in buffers:
            self._ciphertext_hasher.update(buffer)
            ch.update(buffer)
        self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})

        # Then write this segment to the target.
        if not self._opened:
            self._opened = True
            self._target.open(self._verifycap.size)

        for buffer in buffers:
            self._target.write(buffer)
            self._bytes_done += len(buffer)

        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
        self._current_segnum += 1

        if self._results:
            elapsed = time.time() - started_decrypt
            self._results.timings["cumulative_decrypt"] += elapsed

    def _done(self, res):
        self.log("download done")
        if self._results:
            now = time.time()
            self._results.timings["total"] = now - self._started
            self._results.timings["segments"] = now - self._started_fetching
        if self._vup.crypttext_hash:
            _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
                    "bad crypttext_hash: computed=%s, expected=%s" %
                    (base32.b2a(self._ciphertext_hasher.digest()),
                     base32.b2a(self._vup.crypttext_hash)))
        _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
        self._status.set_progress(1)
        self._target.close()
        return self._target.finish()
    def get_download_status(self):
        return self._status


class ConsumerAdapter:
    implements(IDownloadTarget, IConsumer)
    def __init__(self, consumer):
        self._consumer = consumer

    def registerProducer(self, producer, streaming):
        self._consumer.registerProducer(producer, streaming)
    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def open(self, size):
        pass
    def write(self, data):
        self._consumer.write(data)
    def close(self):
        pass

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return self._consumer
    # The following methods exist only because the target might be a
    # repairer.DownUpConnector, and because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass


class Downloader:
    """I am a service that allows file downloading.
    """
    # TODO: in fact, this service only downloads immutable files (URI:CHK:).
    # It is scheduled to go away, to be replaced by filenode.download()
    implements(IDownloader)

    def __init__(self, storage_broker, stats_provider):
        self.storage_broker = storage_broker
        self.stats_provider = stats_provider
        self._all_downloads = weakref.WeakKeyDictionary() # for debugging

    def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
        assert isinstance(u, uri.CHKFileURI)
        t = IDownloadTarget(t)
        assert t.write
        assert t.close

        if self.stats_provider:
            # these counters are meant for network traffic, and don't
            # include LIT files
            self.stats_provider.count('downloader.files_downloaded', 1)
            self.stats_provider.count('downloader.bytes_downloaded', u.get_size())

        target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
        if not monitor:
            monitor=Monitor()
        dl = CiphertextDownloader(self.storage_broker,
                                  u.get_verify_cap(), target,
                                  monitor=monitor)
        self._all_downloads[dl] = None
        if history:
            history.add_download(dl.get_download_status())
        d = dl.start()
        return d