source: trunk/src/allmydata/immutable/downloader/node.py

Last change on this file was 62f59fb6, checked in by meejah <meejah@…>, at 2024-06-03T19:03:22Z

Merge branch 'master' into 4072-no-more-blocking-part-2

File size: 23.7 KB
1"""
2Ported to Python 3.
3"""
4
5import time
6now = time.time
7from zope.interface import Interface
8from twisted.python.failure import Failure
9from twisted.internet import defer
10from foolscap.api import eventually
11from allmydata import uri
12from allmydata.codec import CRSDecoder
13from allmydata.util import base32, log, hashutil, mathutil, observer
14from allmydata.interfaces import DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
15from allmydata.hashtree import IncompleteHashTree, BadHashError, \
16     NotEnoughHashesError
17
18# local imports
19from .finder import ShareFinder
20from .fetcher import SegmentFetcher
21from .segmentation import Segmentation
22from .common import BadCiphertextHashError
23
24class IDownloadStatusHandlingConsumer(Interface):
25    def set_download_status_read_event(read_ev):
26        """Record the DownloadStatus 'read event', to be updated with the
27        time it takes to decrypt each chunk of data."""
28
29class Cancel(object):
30    def __init__(self, f):
31        self._f = f
32        self.active = True
33
34    def cancel(self):
35        if self.active:
36            self.active = False
37            self._f(self)
38
39
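# Illustrative usage sketch (not part of the original file): the callback
# handed to Cancel() receives the Cancel instance itself, so the owner can
# tell which outstanding request to retire, and the 'active' flag makes
# repeated cancel() calls harmless:
#
#     def _retire(handle):
#         print("retired:", handle)
#
#     c = Cancel(_retire)
#     c.cancel()   # calls _retire(c) exactly once
#     c.cancel()   # no-op: c.active is already False
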
class DownloadNode(object):
    """Internal class which manages downloads and holds state. External
    callers use CiphertextFileNode instead."""

    default_max_segment_size = DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE

    # Share._node points to me
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history, download_status):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._si_prefix = base32.b2a(verifycap.storage_index[:8])[:12]
        self.running = True
        if terminator:
            terminator.register(self) # calls self.stop() at stopService()
        # the rules are:
        # 1: Only send network requests if you're active (self.running is True)
        # 2: Use TimerService, not reactor.callLater
        # 3: You can do eventual-sends any time.
        # These rules should mean that once
        # stopService()+flushEventualQueue() fires, everything will be done.
        self._secret_holder = secret_holder
        self._history = history
        self._download_status = download_status

        self.share_hash_tree = IncompleteHashTree(self._verifycap.total_shares)

        # we guess the segment size, so Segmentation can pull non-initial
        # segments in a single roundtrip. This populates
        # .guessed_segment_size, .guessed_num_segments, and
        # .ciphertext_hash_tree (with a dummy, to let us guess which hashes
        # we'll need)
        self._build_guessed_tables(self.default_max_segment_size)

        # filled in when we parse a valid UEB
        self.have_UEB = False
        self.segment_size = None
        self.tail_segment_size = None
        self.tail_segment_padded = None
        self.num_segments = None
        self.block_size = None
        self.tail_block_size = None

        # things to track callers that want data

        # _segment_requests can have duplicates
        self._segment_requests = [] # (segnum, d, cancel_handle, seg_ev, lp)
        self._active_segment = None # a SegmentFetcher, with .segnum

        self._segsize_observers = observer.OneShotObserverList()

        # we create one top-level logparent for this _Node, and another one
        # for each read() call. Segmentation and get_segment() messages are
        # associated with the read() call, everything else is tied to the
        # _Node's log entry.
        lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
                     " size=%(size)d,"
                     " guessed_segsize=%(guessed_segsize)d,"
                     " guessed_numsegs=%(guessed_numsegs)d",
                     si=self._si_prefix, size=verifycap.size,
                     guessed_segsize=self.guessed_segment_size,
                     guessed_numsegs=self.guessed_num_segments,
                     level=log.OPERATIONAL, umid="uJ0zAQ")
        self._lp = lp

        self._sharefinder = ShareFinder(storage_broker, verifycap, self,
                                        self._download_status, lp)
        self._shares = set()

    def _build_guessed_tables(self, max_segment_size):
        size = min(self._verifycap.size, max_segment_size)
        s = mathutil.next_multiple(size, self._verifycap.needed_shares)
        self.guessed_segment_size = s
        r = self._calculate_sizes(self.guessed_segment_size)
        self.guessed_num_segments = r["num_segments"]
        # as with CommonShare, our ciphertext_hash_tree is a stub until we
        # get the real num_segments
        self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
        self.ciphertext_hash_tree_leaves = self.guessed_num_segments

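    # Worked example (illustrative; assumes the default 128 KiB max segment
    # size and a hypothetical 1 MiB file with needed_shares=3):
    #
    #     size = min(1048576, 131072)                       -> 131072
    #     s = next_multiple(131072, 3)                      -> 131073
    #     guessed_num_segments = div_ceil(1048576, 131073)  -> 8
    #
    # The guess is rounded up to a multiple of k so that every block is a
    # whole number of bytes; the real values arrive later via the UEB in
    # _parse_and_store_UEB.
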
    def __repr__(self):
        return "ImmutableDownloadNode(%r)" % (self._si_prefix,)

    def stop(self):
        # called by the Terminator at shutdown, mostly for tests
        if self._active_segment:
            seg, self._active_segment = self._active_segment, None
            seg.stop()
        self._sharefinder.stop()

    # things called by outside callers, via CiphertextFileNode. get_segment()
    # may also be called by Segmentation.

    def read(self, consumer, offset, size):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished.

        Note that there is no notion of a 'file pointer': each call to read()
        uses an independent offset= value.
        """
        if size is None:
            size = self._verifycap.size
        # ignore overruns: clip size so offset+size does not go past EOF, and
        # so size is not negative (which indicates that offset >= EOF)
        size = max(0, min(size, self._verifycap.size-offset))

        read_ev = self._download_status.add_read_event(offset, size, now())
        if IDownloadStatusHandlingConsumer.providedBy(consumer):
            consumer.set_download_status_read_event(read_ev)
            consumer.set_download_status(self._download_status)

        lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     offset=offset, size=size,
                     level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
        if self._history:
            sp = self._history.stats_provider
            sp.count("downloader.files_downloaded", 1) # really read() calls
            sp.count("downloader.bytes_downloaded", size)
        if size == 0:
            read_ev.finished(now())
            # no data, so no producer, so no register/unregisterProducer
            return defer.succeed(consumer)

        # for concurrent operations, each read() gets its own Segmentation
        # manager
        s = Segmentation(self, offset, size, consumer, read_ev, lp)

        # this raises an interesting question: what segments to fetch? if
        # offset=0, always fetch the first segment, and then allow
        # Segmentation to be responsible for pulling the subsequent ones if
        # the first wasn't large enough. If offset>0, we're going to need an
        # extra roundtrip to get the UEB (and therefore the segment size)
        # before we can figure out which segment to get. TODO: allow the
        # offset-table-guessing code (which starts by guessing the segsize)
        # to assist the offset>0 process.
        d = s.start()
        def _done(res):
            read_ev.finished(now())
            return res
        d.addBoth(_done)
        return d

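    # Usage sketch (illustrative, not part of the original file): read()
    # accepts any Twisted IConsumer-style object; Segmentation registers
    # itself as the producer. A minimal consumer that just accumulates the
    # delivered ciphertext might look like:
    #
    #     class AccumulatingConsumer(object):
    #         def __init__(self):
    #             self.chunks = []
    #         def registerProducer(self, producer, streaming):
    #             self.producer = producer
    #         def unregisterProducer(self):
    #             self.producer = None
    #         def write(self, data):
    #             self.chunks.append(data)
    #
    #     d = node.read(AccumulatingConsumer(), 0, 1000)
    #     d.addCallback(lambda consumer: b"".join(consumer.chunks))
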
    def get_segment(self, segnum, logparent=None):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset, data, decodetime) when the desired
        segment is available, and 'c' is an object on which c.cancel() can
        be called to disavow interest in the segment (after which 'd' will
        never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.

        The Deferred can also errback with other fatal problems, such as
        NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
        """
        lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
                     si=base32.b2a(self._verifycap.storage_index)[:8],
                     segnum=segnum,
                     level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
        seg_ev = self._download_status.add_segment_request(segnum, now())
        d = defer.Deferred()
        c = Cancel(self._cancel_request)
        self._segment_requests.append( (segnum, d, c, seg_ev, lp) )
        self._start_new_segment()
        return (d, c)

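    # Usage sketch (illustrative): the two halves of the returned tuple are
    # used independently; Segmentation keeps 'd' for the data and 'c' in
    # case the surrounding read() is interrupted:
    #
    #     d, c = node.get_segment(0)
    #     def _got(res):
    #         offset, data, decodetime = res
    #         print("segment at offset %d: %d bytes" % (offset, len(data)))
    #     d.addCallback(_got)
    #     # later, if the caller loses interest:
    #     c.cancel()   # 'd' will now never fire
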
    def get_segsize(self):
        """Return a Deferred that fires when we know the real segment size."""
        if self.segment_size:
            return defer.succeed(self.segment_size)
        # TODO: this downloads (and discards) the first segment of the file.
        # We could make this more efficient by writing
        # fetcher.SegmentSizeFetcher, with the job of finding a single valid
        # share and extracting the UEB. We'd add Share.get_UEB() to request
        # just the UEB.
        (d,c) = self.get_segment(0)
        # this ensures that an error during get_segment() will errback the
        # caller, so Repair won't wait forever on completely missing files
        d.addCallback(lambda ign: self._segsize_observers.when_fired())
        return d

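    # Usage sketch (illustrative): callers that need the authoritative
    # segment size before computing byte offsets wait on this Deferred,
    # accepting the segment-0 download described in the TODO above:
    #
    #     d = node.get_segsize()
    #     d.addCallback(lambda segsize: compute_offsets(segsize))
    #
    # where compute_offsets() is a hypothetical caller-supplied function.
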
    # things called by the Segmentation object used to transform
    # arbitrary-sized read() calls into quantized segment fetches

    def _start_new_segment(self):
        if self._active_segment is None and self._segment_requests:
            (segnum, d, c, seg_ev, lp) = self._segment_requests[0]
            k = self._verifycap.needed_shares
            log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                    node=repr(self), segnum=segnum,
                    level=log.NOISY, parent=lp, umid="wAlnHQ")
            self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
            seg_ev.activate(now())
            active_shares = [s for s in self._shares if s.is_alive()]
            fetcher.add_shares(active_shares) # this triggers the loop


    # called by our child ShareFinder
    def got_shares(self, shares):
        self._shares.update(shares)
        if self._active_segment:
            self._active_segment.add_shares(shares)

    def no_more_shares(self):
        self._no_more_shares = True
        if self._active_segment:
            self._active_segment.no_more_shares()

    # things called by our Share instances

    def validate_and_store_UEB(self, UEB_s):
        log.msg("validate_and_store_UEB",
                level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
        h = hashutil.uri_extension_hash(UEB_s)
        if h != self._verifycap.uri_extension_hash:
            raise BadHashError
        self._parse_and_store_UEB(UEB_s) # sets self._stuff
        # TODO: a malformed (but authentic) UEB could throw an assertion in
        # _parse_and_store_UEB, and we should abandon the download.
        self.have_UEB = True

        # inform the ShareFinder about our correct number of segments. This
        # will update the block-hash-trees in all existing CommonShare
        # instances, and will populate new ones with the correct value.
        self._sharefinder.update_num_segments()

    def _parse_and_store_UEB(self, UEB_s):
        # Note: the UEB contains needed_shares and total_shares. These are
        # redundant and inferior (the filecap contains the authoritative
        # values). However, because it is possible to encode the same file in
        # multiple ways, and the encoders might choose (poorly) to use the
        # same key for both (therefore getting the same SI), we might
        # encounter shares for both types. The UEB hashes will be different,
        # however, and we'll disregard the "other" encoding's shares as
        # corrupted.

        # therefore, we ignore d['total_shares'] and d['needed_shares'].

        d = uri.unpack_extension(UEB_s)

        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
                ueb=repr(uri.unpack_extension_readable(UEB_s)),
                vcap=self._verifycap.to_string(),
                level=log.NOISY, parent=self._lp, umid="cVqZnA")

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares

        self.segment_size = d['segment_size']
        self._segsize_observers.fire(self.segment_size)

        r = self._calculate_sizes(self.segment_size)
        self.tail_segment_size = r["tail_segment_size"]
        self.tail_segment_padded = r["tail_segment_padded"]
        self.num_segments = r["num_segments"]
        self.block_size = r["block_size"]
        self.tail_block_size = r["tail_block_size"]
        log.msg("actual sizes: %s" % (r,),
                level=log.NOISY, parent=self._lp, umid="PY6P5Q")
        if (self.segment_size == self.guessed_segment_size
            and self.num_segments == self.guessed_num_segments):
            log.msg("my guess was right!",
                    level=log.NOISY, parent=self._lp, umid="x340Ow")
        else:
            log.msg("my guess was wrong! Extra round trips for me.",
                    level=log.NOISY, parent=self._lp, umid="tb7RJw")

        # zfec.Decode() instantiation is fast, but still, let's use the same
        # codec instance for all but the last segment. 3-of-10 takes 15us on
        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
        # 2.5ms, worst-case 254-of-255 is 9.3ms
        self._codec = CRSDecoder()
        self._codec.set_params(self.segment_size, k, N)


        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B. self.ciphertext_hash_tree was a guess before:
        # this is where we create it for real.
        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
        self.ciphertext_hash_tree_leaves = self.num_segments
        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})

        self.share_hash_tree.set_hashes({0: d['share_root_hash']})

        # Our job is a fast download, not verification, so we ignore any
        # redundant fields. The Verifier uses a different code path which
        # does not ignore them.

    def _calculate_sizes(self, segment_size):
        # segments of ciphertext
        size = self._verifycap.size
        k = self._verifycap.needed_shares

        # this assert matches the one in encode.py:127 inside
        # Encoded._got_all_encoding_parameters, where the UEB is constructed
        assert segment_size % k == 0

        # the last segment is usually short. We don't store a whole segsize,
        # but we do pad the segment up to a multiple of k, because the
        # encoder requires that.
        tail_segment_size = size % segment_size
        if tail_segment_size == 0:
            tail_segment_size = segment_size
        padded = mathutil.next_multiple(tail_segment_size, k)
        tail_segment_padded = padded

        num_segments = mathutil.div_ceil(size, segment_size)

        # each segment is turned into N blocks. All but the last are of size
        # block_size, and the last is of size tail_block_size
        block_size = segment_size // k
        tail_block_size = tail_segment_padded // k

        return { "tail_segment_size": tail_segment_size,
                 "tail_segment_padded": tail_segment_padded,
                 "num_segments": num_segments,
                 "block_size": block_size,
                 "tail_block_size": tail_block_size
                 }

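    # Worked example (illustrative; the hypothetical 1 MiB file and k=3
    # from above, with segment_size=131073):
    #
    #     tail_segment_size   = 1048576 % 131073          -> 131065
    #     tail_segment_padded = next_multiple(131065, 3)  -> 131067
    #     num_segments        = div_ceil(1048576, 131073) -> 8
    #     block_size          = 131073 // 3               -> 43691
    #     tail_block_size     = 131067 // 3               -> 43689
    #
    # Each share thus holds seven 43691-byte blocks plus one 43689-byte
    # tail block, and the final 2 bytes of the decoded tail segment are
    # padding that _decode_blocks() trims off.
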

    def process_share_hashes(self, share_hashes):
        for hashnum in share_hashes:
            if hashnum >= len(self.share_hash_tree):
                # "BadHashError" is normally for e.g. a corrupt block. We
                # sort of abuse it here to mean a badly numbered hash (which
                # indicates corruption in the number bytes, rather than in
                # the data bytes).
                raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
                                   % (hashnum, len(self.share_hash_tree)))
        self.share_hash_tree.set_hashes(share_hashes)

    def get_desired_ciphertext_hashes(self, segnum):
        if segnum < self.ciphertext_hash_tree_leaves:
            return self.ciphertext_hash_tree.needed_hashes(segnum,
                                                           include_leaf=True)
        return []

    def get_needed_ciphertext_hashes(self, segnum):
        cht = self.ciphertext_hash_tree
        return cht.needed_hashes(segnum, include_leaf=True)

    def process_ciphertext_hashes(self, hashes):
        assert self.num_segments is not None
        # this may raise BadHashError or NotEnoughHashesError
        self.ciphertext_hash_tree.set_hashes(hashes)


    # called by our child SegmentFetcher

    def want_more_shares(self):
        self._sharefinder.hungry()

    def fetch_failed(self, sf, f):
        assert sf is self._active_segment
        self._active_segment = None
        # deliver error upwards
        for (d,c,seg_ev) in self._extract_requests(sf.segnum):
            seg_ev.error(now())
            eventually(self._deliver, d, c, f)
        self._start_new_segment()

    def process_blocks(self, segnum, blocks):
        start = now()
        d = self._decode_blocks(segnum, blocks)
        d.addCallback(self._check_ciphertext_hash, segnum)
        def _deliver(result):
            log.msg(format="delivering segment(%(segnum)d)",
                    segnum=segnum,
                    level=log.OPERATIONAL, parent=self._lp,
                    umid="j60Ojg")
            when = now()
            if isinstance(result, Failure):
                # this catches failures in decode or ciphertext hash
                for (d,c,seg_ev) in self._extract_requests(segnum):
                    seg_ev.error(when)
                    eventually(self._deliver, d, c, result)
            else:
                (offset, segment, decodetime) = result
                self._active_segment = None
                for (d,c,seg_ev) in self._extract_requests(segnum):
                    # when we have two requests for the same segment, the
                    # second one will not be "activated" before the data is
                    # delivered, so to allow the status-reporting code to see
                    # consistent behavior, we activate them all now. The
                    # SegmentEvent will ignore duplicate activate() calls.
                    # Note that this will result in an inaccurate "receive
                    # speed" for the second request.
                    seg_ev.activate(when)
                    seg_ev.deliver(when, offset, len(segment), decodetime)
                    eventually(self._deliver, d, c, result)
            self._download_status.add_misc_event("process_block", start, now())
            self._start_new_segment()
        d.addBoth(_deliver)
        d.addErrback(log.err, "unhandled error during process_blocks",
                     level=log.WEIRD, parent=self._lp, umid="MkEsCg")

    def _decode_blocks(self, segnum, blocks):
        start = now()
        tail = (segnum == self.num_segments-1)
        codec = self._codec
        block_size = self.block_size
        decoded_size = self.segment_size
        if tail:
            # account for the padding in the last segment
            codec = CRSDecoder()
            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
            codec.set_params(self.tail_segment_padded, k, N)
            block_size = self.tail_block_size
            decoded_size = self.tail_segment_padded

        shares = []
        shareids = []
        for (shareid, share) in blocks.items():
            assert len(share) == block_size
            shareids.append(shareid)
            shares.append(share)
        del blocks

        d = codec.decode(shares, shareids)   # segment
        del shares
        def _process(buffers):
            decodetime = now() - start
            segment = b"".join(buffers)
            assert len(segment) == decoded_size
            del buffers
            if tail:
                segment = segment[:self.tail_segment_size]
            self._download_status.add_misc_event("decode", start, now())
            return (segment, decodetime)
        d.addCallback(_process)
        return d

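    # Decode sketch (illustrative): CRSDecoder rebuilds a segment from any
    # k blocks, keyed by their share numbers. With hypothetical k=3, N=10
    # and a 30-byte segment:
    #
    #     codec = CRSDecoder()
    #     codec.set_params(30, 3, 10)                # segment size, k, N
    #     d = codec.decode([b0, b7, b2], [0, 7, 2])  # any k blocks + ids
    #     d.addCallback(lambda buffers: b"".join(buffers))
    #
    # The 'shares' and 'shareids' lists must correspond pairwise; which k
    # of the N share numbers show up first does not matter.
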
    def _check_ciphertext_hash(self, segment_and_decodetime, segnum):
        (segment, decodetime) = segment_and_decodetime
        start = now()
        assert self._active_segment.segnum == segnum
        assert self.segment_size is not None
        offset = segnum * self.segment_size

        h = hashutil.crypttext_segment_hash(segment)
        try:
            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
            self._download_status.add_misc_event("CThash", start, now())
            return (offset, segment, decodetime)
        except (BadHashError, NotEnoughHashesError):
            format = ("hash failure in ciphertext_hash_tree:"
                      " segnum=%(segnum)d, SI=%(si)r")
            log.msg(format=format, segnum=segnum, si=self._si_prefix,
                    failure=Failure(),
                    level=log.WEIRD, parent=self._lp, umid="MTwNnw")
            # this is especially weird, because we made it past the share
            # hash tree. It implies that we're using the wrong encoding, or
            # that the uploader deliberately constructed a bad UEB.
            msg = format % {"segnum": segnum, "si": self._si_prefix}
            raise BadCiphertextHashError(msg)

    def _deliver(self, d, c, result):
        # this method exists to handle cancel() that occurs between
        # _got_segment and _deliver
        if c.active:
            c.active = False # it is now too late to cancel
            d.callback(result) # might actually be an errback

    def _extract_requests(self, segnum):
        """Remove matching requests and return their (d, c, seg_ev) tuples
        so that the caller can retire them."""
        retire = [(d,c,seg_ev)
                  for (segnum0,d,c,seg_ev,lp) in self._segment_requests
                  if segnum0 == segnum]
        self._segment_requests = [t for t in self._segment_requests
                                  if t[0] != segnum]
        return retire

    def _cancel_request(self, cancel):
        self._segment_requests = [t for t in self._segment_requests
                                  if t[2] != cancel]
        segnums = [segnum for (segnum,d,c,seg_ev,lp) in self._segment_requests]

        # self._active_segment might be None in rare circumstances, so make
        # sure we tolerate it
        if self._active_segment and self._active_segment.segnum not in segnums:
            seg, self._active_segment = self._active_segment, None
            seg.stop()
            self._start_new_segment()

    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
    # SegmentFetcher to tell if it is still fetching a valid segnum.
    def get_num_segments(self):
        # returns (best_num_segments, authoritative)
        if self.num_segments is None:
            return (self.guessed_num_segments, False)
        return (self.num_segments, True)