source: trunk/src/allmydata/immutable/download.py @ 731d15e

Last change on this file since 731d15e was 731d15e, checked in by Brian Warner <warner@…>, at 2010-01-14T22:15:29Z

hush pyflakes-0.4.0 warnings: remove trivial unused variables. For #900.

  • Property mode set to 100644
File size: 54.1 KB
1import random, weakref, itertools, time
2from zope.interface import implements
3from twisted.internet import defer
4from twisted.internet.interfaces import IPushProducer, IConsumer
5from foolscap.api import DeadReferenceError, RemoteException, eventually
6
7from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
8from allmydata.util.assertutil import _assert, precondition
9from allmydata import codec, hashtree, uri
10from allmydata.interfaces import IDownloadTarget, IDownloader, IVerifierURI, \
11     IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
12     IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
13     UnableToFetchCriticalDownloadDataError
14from allmydata.immutable import layout
15from allmydata.monitor import Monitor
16from pycryptopp.cipher.aes import AES
17
18class IntegrityCheckReject(Exception):
19    pass
20
21class BadURIExtensionHashValue(IntegrityCheckReject):
22    pass
23class BadURIExtension(IntegrityCheckReject):
24    pass
25class UnsupportedErasureCodec(BadURIExtension):
26    pass
27class BadCrypttextHashValue(IntegrityCheckReject):
28    pass
29class BadOrMissingHash(IntegrityCheckReject):
30    pass
31
32class DownloadStopped(Exception):
33    pass
34
35class DownloadResults:
36    implements(IDownloadResults)
37
38    def __init__(self):
39        self.servers_used = set()
40        self.server_problems = {}
41        self.servermap = {}
42        self.timings = {}
43        self.file_size = None
44
45class DecryptingTarget(log.PrefixingLogMixin):
46    implements(IDownloadTarget, IConsumer)
47    def __init__(self, target, key, _log_msg_id=None):
48        precondition(IDownloadTarget.providedBy(target), target)
49        self.target = target
50        self._decryptor = AES(key)
51        prefix = str(target)
52        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
53    # methods to satisfy the IConsumer interface
54    def registerProducer(self, producer, streaming):
55        if IConsumer.providedBy(self.target):
56            self.target.registerProducer(producer, streaming)
57    def unregisterProducer(self):
58        if IConsumer.providedBy(self.target):
59            self.target.unregisterProducer()
60    def write(self, ciphertext):
61        plaintext = self._decryptor.process(ciphertext)
62        self.target.write(plaintext)
63    def open(self, size):
64        self.target.open(size)
65    def close(self):
66        self.target.close()
67    def finish(self):
68        return self.target.finish()
69    # The following methods just pass through to the next target, because
70    # that target might be a repairer.DownUpConnector, and because the
71    # current CHKUpload object expects to find the storage index
72    # in its Uploadable.
73    def set_storageindex(self, storageindex):
74        self.target.set_storageindex(storageindex)
75    def set_encodingparams(self, encodingparams):
76        self.target.set_encodingparams(encodingparams)
77
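# A minimal usage sketch for the DecryptingTarget above ('plain_target' and
# 'readkey' are hypothetical placeholder names; plain_target must provide
# IDownloadTarget). Downloader.download() at the bottom of this module does
# exactly this wrapping, using the AES read-key from the CHK read-cap:
#
#     decrypting = DecryptingTarget(plain_target, readkey)
#     decrypting.open(expected_size)       # passed through to plain_target
#     decrypting.write(ciphertext_chunk)   # plain_target.write() gets plaintext
#     decrypting.close()
#     d = decrypting.finish()              # whatever plain_target.finish() returns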
78class ValidatedThingObtainer:
79    def __init__(self, validatedthingproxies, debugname, log_id):
80        self._validatedthingproxies = validatedthingproxies
81        self._debugname = debugname
82        self._log_id = log_id
83
84    def _bad(self, f, validatedthingproxy):
85        f.trap(RemoteException, DeadReferenceError,
86               IntegrityCheckReject, layout.LayoutInvalid,
87               layout.ShareVersionIncompatible)
88        level = log.WEIRD
89        if f.check(DeadReferenceError):
90            level = log.UNUSUAL
91        elif f.check(RemoteException):
92            level = log.WEIRD
93        else:
94            level = log.SCARY
95        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
96                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
97                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
98                failure=f, level=level, umid="JGXxBA")
99        if not self._validatedthingproxies:
100            raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
101        # try again with a different one
102        d = self._try_the_next_one()
103        return d
104
105    def _try_the_next_one(self):
106        vtp = self._validatedthingproxies.pop(0)
107        # start() obtains and validates the thing, then calls back with it;
108        # otherwise it errbacks
109        d = vtp.start()
110        d.addErrback(self._bad, vtp)
111        return d
112
113    def start(self):
114        return self._try_the_next_one()
115
116class ValidatedCrypttextHashTreeProxy:
117    implements(IValidatedThingProxy)
118    """ I am a front-end for a remote crypttext hash tree using a local
119    ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
120    Validated Thing protocol (i.e., I have a start() method that fires with
121    self once I get a valid one)."""
122    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments,
123                 fetch_failures=None):
124        # fetch_failures is for debugging -- see test_encode.py
125        self._readbucketproxy = readbucketproxy
126        self._num_segments = num_segments
127        self._fetch_failures = fetch_failures
128        self._crypttext_hash_tree = crypttext_hash_tree
129
130    def _validate(self, proposal):
131        ct_hashes = dict(list(enumerate(proposal)))
132        try:
133            self._crypttext_hash_tree.set_hashes(ct_hashes)
134        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
135            if self._fetch_failures is not None:
136                self._fetch_failures["crypttext_hash_tree"] += 1
137            raise BadOrMissingHash(le)
138        # If we now have enough of the crypttext hash tree to integrity-check
139        # *any* segment of ciphertext, then we are done. TODO: alacrity would
140        # be better if we downloaded only part of the crypttext hash
141        # tree at a time.
142        for segnum in range(self._num_segments):
143            if self._crypttext_hash_tree.needed_hashes(segnum):
144                raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
145        return self
146
147    def start(self):
148        d = self._readbucketproxy.get_crypttext_hashes()
149        d.addCallback(self._validate)
150        return d
151
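# How the "validated thing" pieces fit together: each proxy's start() fetches
# one candidate copy of the data, validates it, and fires with the validated
# thing; ValidatedThingObtainer tries its proxies one at a time until one
# succeeds or the list runs out. A sketch of the pattern (mirroring
# CiphertextDownloader._get_crypttext_hash_tree below; local names are
# illustrative):
#
#     proxies = [ValidatedCrypttextHashTreeProxy(bucket, ct_hash_tree,
#                                                num_segments)
#                for (shnum, bucket) in share_buckets]
#     obtainer = ValidatedThingObtainer(proxies, debugname="vchtps",
#                                       log_id=log_parent)
#     d = obtainer.start()  # fires with the first proxy that validates, or
#                           # errbacks with UnableToFetchCriticalDownloadDataError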
152class ValidatedExtendedURIProxy:
153    implements(IValidatedThingProxy)
154    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
155    responsible for retrieving and validating the elements from the UEB."""
156
157    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
158        # fetch_failures is for debugging -- see test_encode.py
159        self._fetch_failures = fetch_failures
160        self._readbucketproxy = readbucketproxy
161        precondition(IVerifierURI.providedBy(verifycap), verifycap)
162        self._verifycap = verifycap
163
164        # required
165        self.segment_size = None
166        self.crypttext_root_hash = None
167        self.share_root_hash = None
168
169        # computed
170        self.block_size = None
171        self.share_size = None
172        self.num_segments = None
173        self.tail_data_size = None
174        self.tail_segment_size = None
175
176        # optional
177        self.crypttext_hash = None
178
179    def __str__(self):
180        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
181
182    def _check_integrity(self, data):
183        h = hashutil.uri_extension_hash(data)
184        if h != self._verifycap.uri_extension_hash:
185            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
186                   (self._readbucketproxy,
187                    base32.b2a(self._verifycap.uri_extension_hash),
188                    base32.b2a(h)))
189            if self._fetch_failures is not None:
190                self._fetch_failures["uri_extension"] += 1
191            raise BadURIExtensionHashValue(msg)
192        else:
193            return data
194
195    def _parse_and_validate(self, data):
196        self.share_size = mathutil.div_ceil(self._verifycap.size,
197                                            self._verifycap.needed_shares)
198
199        d = uri.unpack_extension(data)
200
201        # There are several kinds of things that can be found in a UEB.
202        # First, things that we really need to learn from the UEB in order to
203        # do this download. Next: things which are optional but not redundant
204        # -- if they are present in the UEB they will get used. Next, things
205        # that are optional and redundant. These things are required to be
206        # consistent: they don't have to be in the UEB, but if they are in
207        # the UEB then they will be checked for consistency with the
208        # already-known facts, and if they are inconsistent then an exception
209        # will be raised. These things aren't actually used -- they are just
210        # tested for consistency and ignored. Finally: things which are
211        # deprecated -- they ought not be in the UEB at all, and if they are
212        # present then a warning will be logged but they are otherwise
213        # ignored.
214
215        # First, things that we really need to learn from the UEB:
216        # segment_size, crypttext_root_hash, and share_root_hash.
217        self.segment_size = d['segment_size']
218
219        self.block_size = mathutil.div_ceil(self.segment_size,
220                                            self._verifycap.needed_shares)
221        self.num_segments = mathutil.div_ceil(self._verifycap.size,
222                                              self.segment_size)
223
224        self.tail_data_size = self._verifycap.size % self.segment_size
225        if not self.tail_data_size:
226            self.tail_data_size = self.segment_size
227        # padding for erasure code
228        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
229                                                        self._verifycap.needed_shares)
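        # Worked example of these derived sizes (hypothetical numbers): with
        # verifycap.size = 1000000 bytes, segment_size = 131072, and
        # needed_shares (k) = 3:
        #   share_size        = div_ceil(1000000, 3)      = 333334
        #   block_size        = div_ceil(131072, 3)       = 43691
        #   num_segments      = div_ceil(1000000, 131072) = 8
        #   tail_data_size    = 1000000 % 131072          = 82496
        #   tail_segment_size = next_multiple(82496, 3)   = 82497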
230
231        # Ciphertext hash tree root is mandatory, so that there is at most
232        # one ciphertext that matches this read-cap or verify-cap. The
233        # integrity check on the shares is not sufficient to prevent the
234        # original encoder from creating some shares of file A and other
235        # shares of file B.
236        self.crypttext_root_hash = d['crypttext_root_hash']
237
238        self.share_root_hash = d['share_root_hash']
239
240
241        # Next: things that are optional and not redundant: crypttext_hash
242        if d.has_key('crypttext_hash'):
243            self.crypttext_hash = d['crypttext_hash']
244            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
245                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
246
247
248        # Next: things that are optional, redundant, and required to be
249        # consistent: codec_name, codec_params, tail_codec_params,
250        # num_segments, size, needed_shares, total_shares
251        if d.has_key('codec_name'):
252            if d['codec_name'] != "crs":
253                raise UnsupportedErasureCodec(d['codec_name'])
254
255        if d.has_key('codec_params'):
256            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
257            if ucpss != self.segment_size:
258                raise BadURIExtension("inconsistent erasure code params: "
259                                      "ucpss: %s != self.segment_size: %s" %
260                                      (ucpss, self.segment_size))
261            if ucpns != self._verifycap.needed_shares:
262                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
263                                      "self._verifycap.needed_shares: %s" %
264                                      (ucpns, self._verifycap.needed_shares))
265            if ucpts != self._verifycap.total_shares:
266                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
267                                      "self._verifycap.total_shares: %s" %
268                                      (ucpts, self._verifycap.total_shares))
269
270        if d.has_key('tail_codec_params'):
271            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
272            if utcpss != self.tail_segment_size:
273                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
274                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
275                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
276                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
277                                         self.segment_size, self._verifycap.needed_shares))
278            if utcpns != self._verifycap.needed_shares:
279                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
280                                      "self._verifycap.needed_shares: %s" % (utcpns,
281                                                                             self._verifycap.needed_shares))
282            if utcpts != self._verifycap.total_shares:
283                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
284                                      "self._verifycap.total_shares: %s" % (utcpts,
285                                                                            self._verifycap.total_shares))
286
287        if d.has_key('num_segments'):
288            if d['num_segments'] != self.num_segments:
289                raise BadURIExtension("inconsistent num_segments: size: %s, "
290                                      "segment_size: %s, computed_num_segments: %s, "
291                                      "ueb_num_segments: %s" % (self._verifycap.size,
292                                                                self.segment_size,
293                                                                self.num_segments, d['num_segments']))
294
295        if d.has_key('size'):
296            if d['size'] != self._verifycap.size:
297                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
298                                      (self._verifycap.size, d['size']))
299
300        if d.has_key('needed_shares'):
301            if d['needed_shares'] != self._verifycap.needed_shares:
302                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
303                                      "needed shares: %s" % (self._verifycap.needed_shares,
304                                                             d['needed_shares']))
305
306        if d.has_key('total_shares'):
307            if d['total_shares'] != self._verifycap.total_shares:
308                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
309                                      "total shares: %s" % (self._verifycap.total_shares,
310                                                            d['total_shares']))
311
312        # Finally, things that are deprecated and ignored: plaintext_hash,
313        # plaintext_root_hash
314        if d.get('plaintext_hash'):
315            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
316                    "and is no longer used.  Ignoring.  %s" % (self,))
317        if d.get('plaintext_root_hash'):
318            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
319                    "reasons and is no longer used.  Ignoring.  %s" % (self,))
320
321        return self
322
323    def start(self):
324        """Fetch the UEB from bucket, compare its hash to the hash from
325        verifycap, then parse it. Returns a deferred which is called back
326        with self once the fetch is successful, or is erred back if it
327        fails."""
328        d = self._readbucketproxy.get_uri_extension()
329        d.addCallback(self._check_integrity)
330        d.addCallback(self._parse_and_validate)
331        return d
332
333class ValidatedReadBucketProxy(log.PrefixingLogMixin):
334    """I am a front-end for a remote storage bucket, responsible for
335    retrieving and validating data from that bucket.
336
337    My get_block() method is used by BlockDownloaders.
338    """
339
340    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
341                 block_size, share_size):
342        """ share_hash_tree is required to have already been initialized with
343        the root hash (the number-0 hash), using the share_root_hash from the
344        UEB"""
345        precondition(share_hash_tree[0] is not None, share_hash_tree)
346        prefix = "%d-%s-%s" % (sharenum, bucket,
347                               base32.b2a_l(share_hash_tree[0][:8], 60))
348        log.PrefixingLogMixin.__init__(self,
349                                       facility="tahoe.immutable.download",
350                                       prefix=prefix)
351        self.sharenum = sharenum
352        self.bucket = bucket
353        self.share_hash_tree = share_hash_tree
354        self.num_blocks = num_blocks
355        self.block_size = block_size
356        self.share_size = share_size
357        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
358
359    def get_all_sharehashes(self):
360        """Retrieve and validate all the share-hash-tree nodes that are
361        included in this share, regardless of whether we need them to
362        validate the share or not. Each share contains a minimal Merkle tree
363        chain, but there is lots of overlap, so usually we'll be using hashes
364        from other shares and not reading every single hash from this share.
365        The Verifier uses this function to read and validate every single
366        hash from this share.
367
368        Call this (and wait for the Deferred it returns to fire) before
369        calling get_block() for the first time: this lets us check that the
370        share contains enough hashes to validate its own data, and
371        avoids downloading any share hash twice.
372
373        I return a Deferred which errbacks upon failure, probably with
374        BadOrMissingHash."""
375
376        d = self.bucket.get_share_hashes()
377        def _got_share_hashes(sh):
378            sharehashes = dict(sh)
379            try:
380                self.share_hash_tree.set_hashes(sharehashes)
381            except IndexError, le:
382                raise BadOrMissingHash(le)
383            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
384                raise BadOrMissingHash(le)
385        d.addCallback(_got_share_hashes)
386        return d
387
388    def get_all_blockhashes(self):
389        """Retrieve and validate all the block-hash-tree nodes that are
390        included in this share. Each share contains a full Merkle tree, but
391        we usually only fetch the minimal subset necessary for any particular
392        block. This function fetches everything at once. The Verifier uses
393        this function to validate the block hash tree.
394
395        Call this (and wait for the Deferred it returns to fire) after
396        calling get_all_sharehashes() and before calling get_block() for the
397        first time: this lets us check that the share contains all block
398        hashes and avoids downloading them multiple times.
399
400        I return a Deferred which errbacks upon failure, probably with
401        BadOrMissingHash.
402        """
403
404        # get_block_hashes(anything) currently always returns everything
405        needed = list(range(len(self.block_hash_tree)))
406        d = self.bucket.get_block_hashes(needed)
407        def _got_block_hashes(blockhashes):
408            if len(blockhashes) < len(self.block_hash_tree):
409                raise BadOrMissingHash()
410            bh = dict(enumerate(blockhashes))
411
412            try:
413                self.block_hash_tree.set_hashes(bh)
414            except IndexError, le:
415                raise BadOrMissingHash(le)
416            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
417                raise BadOrMissingHash(le)
418        d.addCallback(_got_block_hashes)
419        return d
420
421    def get_all_crypttext_hashes(self, crypttext_hash_tree):
422        """Retrieve and validate all the crypttext-hash-tree nodes that are
423        in this share. Normally we don't look at these at all: the download
424        process fetches them incrementally as needed to validate each segment
425        of ciphertext. But this is a convenient place to give the Verifier a
426        function to validate all of these at once.
427
428        Call this with a new hashtree object for each share, initialized with
429        the crypttext hash tree root. I return a Deferred which errbacks upon
430        failure, probably with BadOrMissingHash.
431        """
432
433        # get_crypttext_hashes() always returns everything
434        d = self.bucket.get_crypttext_hashes()
435        def _got_crypttext_hashes(hashes):
436            if len(hashes) < len(crypttext_hash_tree):
437                raise BadOrMissingHash()
438            ct_hashes = dict(enumerate(hashes))
439            try:
440                crypttext_hash_tree.set_hashes(ct_hashes)
441            except IndexError, le:
442                raise BadOrMissingHash(le)
443            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
444                raise BadOrMissingHash(le)
445        d.addCallback(_got_crypttext_hashes)
446        return d
447
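    # The Verifier's expected call order against one of these proxies is
    # roughly the following (a sketch; 'vrbp' and 'ct_tree' are illustrative
    # names, ct_tree being a fresh IncompleteHashTree seeded with the
    # crypttext root hash):
    #
    #     d = vrbp.get_all_sharehashes()        # share-hash chain first
    #     d.addCallback(lambda ign: vrbp.get_all_blockhashes())
    #     d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(ct_tree))
    #     d.addCallback(lambda ign: vrbp.get_block(0))  # then each block
    #
    # get_block() only fetches whatever hashes are still missing, so doing the
    # bulk fetches first avoids downloading the same hash twice.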
448    def get_block(self, blocknum):
449        # the first time we use this bucket, we need to fetch enough elements
450        # of the share hash tree to validate it from our share hash up to the
451        # hashroot.
452        if self.share_hash_tree.needed_hashes(self.sharenum):
453            d1 = self.bucket.get_share_hashes()
454        else:
455            d1 = defer.succeed([])
456
457        # We might need to grab some elements of our block hash tree, to
458        # validate the requested block up to the share hash.
459        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
460        # We don't need the root of the block hash tree, as that comes in the
461        # share tree.
462        blockhashesneeded.discard(0)
463        d2 = self.bucket.get_block_hashes(blockhashesneeded)
464
465        if blocknum < self.num_blocks-1:
466            thisblocksize = self.block_size
467        else:
468            thisblocksize = self.share_size % self.block_size
469            if thisblocksize == 0:
470                thisblocksize = self.block_size
471        d3 = self.bucket.get_block_data(blocknum,
472                                        self.block_size, thisblocksize)
473
474        dl = deferredutil.gatherResults([d1, d2, d3])
475        dl.addCallback(self._got_data, blocknum)
476        return dl
477
478    def _got_data(self, results, blocknum):
479        precondition(blocknum < self.num_blocks,
480                     self, blocknum, self.num_blocks)
481        sharehashes, blockhashes, blockdata = results
482        try:
483            sharehashes = dict(sharehashes)
484        except ValueError, le:
485            le.args = tuple(le.args + (sharehashes,))
486            raise
487        blockhashes = dict(enumerate(blockhashes))
488
489        candidate_share_hash = None # in case we log it in the except block below
490        blockhash = None # in case we log it in the except block below
491
492        try:
493            if self.share_hash_tree.needed_hashes(self.sharenum):
494                # This will raise an exception if the values being passed do not
495                # match the root node of self.share_hash_tree.
496                try:
497                    self.share_hash_tree.set_hashes(sharehashes)
498                except IndexError, le:
499                    # Weird -- sharehashes contained index numbers outside of
500                    # the range that fit into this hash tree.
501                    raise BadOrMissingHash(le)
502
503            # To validate a block we need the root of the block hash tree,
504            # which is also one of the leaves of the share hash tree, and is
505            # called "the share hash".
506            if not self.block_hash_tree[0]: # empty -- no root node yet
507                # Get the share hash from the share hash tree.
508                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
509                if not share_hash:
510                    # No root node in block_hash_tree and also the share hash
511                    # wasn't sent by the server.
512                    raise hashtree.NotEnoughHashesError
513                self.block_hash_tree.set_hashes({0: share_hash})
514
515            if self.block_hash_tree.needed_hashes(blocknum):
516                self.block_hash_tree.set_hashes(blockhashes)
517
518            blockhash = hashutil.block_hash(blockdata)
519            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
520            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
521            #        "%r .. %r: %s" %
522            #        (self.sharenum, blocknum, len(blockdata),
523            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
524
525        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
526            # log.WEIRD: indicates undetected disk/network error, or more
527            # likely a programming error
528            self.log("hash failure in block=%d, shnum=%d on %s" %
529                    (blocknum, self.sharenum, self.bucket))
530            if self.block_hash_tree.needed_hashes(blocknum):
531                self.log(""" failure occurred when checking the block_hash_tree.
532                This suggests that either the block data was bad, or that the
533                block hashes we received along with it were bad.""")
534            else:
535                self.log(""" the failure probably occurred when checking the
536                share_hash_tree, which suggests that the share hashes we
537                received from the remote peer were bad.""")
538            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
539            self.log(" block length: %d" % len(blockdata))
540            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
541            if len(blockdata) < 100:
542                self.log(" block data: %r" % (blockdata,))
543            else:
544                self.log(" block data start/end: %r .. %r" %
545                        (blockdata[:50], blockdata[-50:]))
546            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
547            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
548            lines = []
549            for i,h in sorted(sharehashes.items()):
550                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
551            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
552            lines = []
553            for i,h in blockhashes.items():
554                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
555            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
556            raise BadOrMissingHash(le)
557
558        # If we made it here, the block is good. If the hash trees didn't
559        # like what they saw, they would have raised a BadHashError, causing
560        # our caller to see a Failure and thus ignore this block (as well as
561        # dropping this bucket).
562        return blockdata
563
564
565
566class BlockDownloader(log.PrefixingLogMixin):
567    """I am responsible for downloading a single block (from a single bucket)
568    for a single segment.
569
570    I am a child of the SegmentDownloader.
571    """
572
573    def __init__(self, vbucket, blocknum, parent, results):
574        precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
575        prefix = "%s-%d" % (vbucket, blocknum)
576        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
577        self.vbucket = vbucket
578        self.blocknum = blocknum
579        self.parent = parent
580        self.results = results
581
582    def start(self, segnum):
583        self.log("get_block(segnum=%d)" % segnum)
584        started = time.time()
585        d = self.vbucket.get_block(segnum)
586        d.addCallbacks(self._hold_block, self._got_block_error,
587                       callbackArgs=(started,))
588        return d
589
590    def _hold_block(self, data, started):
591        if self.results:
592            elapsed = time.time() - started
593            peerid = self.vbucket.bucket.get_peerid()
594            if peerid not in self.results.timings["fetch_per_server"]:
595                self.results.timings["fetch_per_server"][peerid] = []
596            self.results.timings["fetch_per_server"][peerid].append(elapsed)
597        self.log("got block")
598        self.parent.hold_block(self.blocknum, data)
599
600    def _got_block_error(self, f):
601        f.trap(RemoteException, DeadReferenceError,
602               IntegrityCheckReject, layout.LayoutInvalid,
603               layout.ShareVersionIncompatible)
604        if f.check(RemoteException, DeadReferenceError):
605            level = log.UNUSUAL
606        else:
607            level = log.WEIRD
608        self.log("failure to get block", level=level, umid="5Z4uHQ")
609        if self.results:
610            peerid = self.vbucket.bucket.get_peerid()
611            self.results.server_problems[peerid] = str(f)
612        self.parent.bucket_failed(self.vbucket)
613
614class SegmentDownloader:
615    """I am responsible for downloading all the blocks for a single segment
616    of data.
617
618    I am a child of the CiphertextDownloader.
619    """
620
621    def __init__(self, parent, segmentnumber, needed_shares, results):
622        self.parent = parent
623        self.segmentnumber = segmentnumber
624        self.needed_blocks = needed_shares
625        self.blocks = {} # k: blocknum, v: data
626        self.results = results
627        self._log_number = self.parent.log("starting segment %d" %
628                                           segmentnumber)
629
630    def log(self, *args, **kwargs):
631        if "parent" not in kwargs:
632            kwargs["parent"] = self._log_number
633        return self.parent.log(*args, **kwargs)
634
635    def start(self):
636        return self._download()
637
638    def _download(self):
639        d = self._try()
640        def _done(res):
641            if len(self.blocks) >= self.needed_blocks:
642                # we only need self.needed_blocks blocks
643                # we want to get the smallest blockids, because they are
644                # more likely to be fast "primary blocks"
645                blockids = sorted(self.blocks.keys())[:self.needed_blocks]
646                blocks = []
647                for blocknum in blockids:
648                    blocks.append(self.blocks[blocknum])
649                return (blocks, blockids)
650            else:
651                return self._download()
652        d.addCallback(_done)
653        return d
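    # Example of the selection made in _done() above (illustrative numbers):
    # with needed_blocks = 3 and self.blocks holding data for blockids
    # {0, 2, 5, 7}, the segment is complete and we return the blocks for
    # [0, 2, 5] -- the lowest-numbered ids, which are the cheap-to-decode
    # "primary" blocks when they are available.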
654
655    def _try(self):
656        # fill our set of active buckets, maybe raising NotEnoughSharesError
657        active_buckets = self.parent._activate_enough_buckets()
658        # Now we have enough buckets, in self.parent.active_buckets.
659
660        # in test cases, bd.start might mutate active_buckets right away, so
661        # we need to put off calling start() until we've iterated all the way
662        # through it.
663        downloaders = []
664        for blocknum, vbucket in active_buckets.iteritems():
665            assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
666            bd = BlockDownloader(vbucket, blocknum, self, self.results)
667            downloaders.append(bd)
668            if self.results:
669                self.results.servers_used.add(vbucket.bucket.get_peerid())
670        l = [bd.start(self.segmentnumber) for bd in downloaders]
671        return defer.DeferredList(l, fireOnOneErrback=True)
672
673    def hold_block(self, blocknum, data):
674        self.blocks[blocknum] = data
675
676    def bucket_failed(self, vbucket):
677        self.parent.bucket_failed(vbucket)
678
679class DownloadStatus:
680    implements(IDownloadStatus)
681    statusid_counter = itertools.count(0)
682
683    def __init__(self):
684        self.storage_index = None
685        self.size = None
686        self.helper = False
687        self.status = "Not started"
688        self.progress = 0.0
689        self.paused = False
690        self.stopped = False
691        self.active = True
692        self.results = None
693        self.counter = self.statusid_counter.next()
694        self.started = time.time()
695
696    def get_started(self):
697        return self.started
698    def get_storage_index(self):
699        return self.storage_index
700    def get_size(self):
701        return self.size
702    def using_helper(self):
703        return self.helper
704    def get_status(self):
705        status = self.status
706        if self.paused:
707            status += " (output paused)"
708        if self.stopped:
709            status += " (output stopped)"
710        return status
711    def get_progress(self):
712        return self.progress
713    def get_active(self):
714        return self.active
715    def get_results(self):
716        return self.results
717    def get_counter(self):
718        return self.counter
719
720    def set_storage_index(self, si):
721        self.storage_index = si
722    def set_size(self, size):
723        self.size = size
724    def set_helper(self, helper):
725        self.helper = helper
726    def set_status(self, status):
727        self.status = status
728    def set_paused(self, paused):
729        self.paused = paused
730    def set_stopped(self, stopped):
731        self.stopped = stopped
732    def set_progress(self, value):
733        self.progress = value
734    def set_active(self, value):
735        self.active = value
736    def set_results(self, value):
737        self.results = value
738
739class CiphertextDownloader(log.PrefixingLogMixin):
740    """ I download shares, check their integrity, then decode them, check the
741    integrity of the resulting ciphertext, and write it to my target.
742    Before I send any new request to a server, I always ask the 'monitor'
743    object that was passed into my constructor whether this task has been
744    cancelled (by invoking its raise_if_cancelled() method)."""
745    implements(IPushProducer)
746    _status = None
747
748    def __init__(self, storage_broker, v, target, monitor):
749
750        precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
751        precondition(IVerifierURI.providedBy(v), v)
752        precondition(IDownloadTarget.providedBy(target), target)
753
754        prefix=base32.b2a_l(v.storage_index[:8], 60)
755        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
756        self._storage_broker = storage_broker
757
758        self._verifycap = v
759        self._storage_index = v.storage_index
760        self._uri_extension_hash = v.uri_extension_hash
761
762        self._started = time.time()
763        self._status = s = DownloadStatus()
764        s.set_status("Starting")
765        s.set_storage_index(self._storage_index)
766        s.set_size(self._verifycap.size)
767        s.set_helper(False)
768        s.set_active(True)
769
770        self._results = DownloadResults()
771        s.set_results(self._results)
772        self._results.file_size = self._verifycap.size
773        self._results.timings["servers_peer_selection"] = {}
774        self._results.timings["fetch_per_server"] = {}
775        self._results.timings["cumulative_fetch"] = 0.0
776        self._results.timings["cumulative_decode"] = 0.0
777        self._results.timings["cumulative_decrypt"] = 0.0
778        self._results.timings["paused"] = 0.0
779
780        self._paused = False
781        self._stopped = False
782        if IConsumer.providedBy(target):
783            target.registerProducer(self, True)
784        self._target = target
785        # Repairer (uploader) needs the storageindex.
786        self._target.set_storageindex(self._storage_index)
787        self._monitor = monitor
788        self._opened = False
789
790        self.active_buckets = {} # k: shnum, v: bucket
791        self._share_buckets = [] # list of (sharenum, bucket) tuples
792        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
793
794        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
795
796        self._ciphertext_hasher = hashutil.crypttext_hasher()
797
798        self._bytes_done = 0
799        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
800
801        # _got_uri_extension() will create the following:
802        # self._crypttext_hash_tree
803        # self._share_hash_tree
804        # self._current_segnum = 0
805        # self._vup # ValidatedExtendedURIProxy
806
807    def pauseProducing(self):
808        if self._paused:
809            return
810        self._paused = defer.Deferred()
811        self._paused_at = time.time()
812        if self._status:
813            self._status.set_paused(True)
814
815    def resumeProducing(self):
816        if self._paused:
817            paused_for = time.time() - self._paused_at
818            self._results.timings['paused'] += paused_for
819            p = self._paused
820            self._paused = None
821            eventually(p.callback, None)
822            if self._status:
823                self._status.set_paused(False)
824
825    def stopProducing(self):
826        self.log("Download.stopProducing")
827        self._stopped = True
828        self.resumeProducing()
829        if self._status:
830            self._status.set_stopped(True)
831            self._status.set_active(False)
832
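    # How the IPushProducer hooks above interact with the download loop: while
    # paused, self._paused holds a Deferred; _check_for_pause() (below) chains
    # the next pipeline step onto that Deferred, so no further fetching,
    # decoding, or writing happens until resumeProducing() fires it.
    # stopProducing() sets self._stopped and then resumes, so the next
    # _check_for_pause() raises DownloadStopped and the whole Deferred chain
    # errbacks.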
833    def start(self):
834        self.log("starting download")
835
836        # first step: who should we download from?
837        d = defer.maybeDeferred(self._get_all_shareholders)
838        d.addCallback(self._got_all_shareholders)
839        # now get the uri_extension block from somebody and integrity check
840        # it and parse and validate its contents
841        d.addCallback(self._obtain_uri_extension)
842        d.addCallback(self._get_crypttext_hash_tree)
843        # once we know that, we can download blocks from everybody
844        d.addCallback(self._download_all_segments)
845        def _finished(res):
846            if self._status:
847                self._status.set_status("Finished")
848                self._status.set_active(False)
849                self._status.set_paused(False)
850            if IConsumer.providedBy(self._target):
851                self._target.unregisterProducer()
852            return res
853        d.addBoth(_finished)
854        def _failed(why):
855            if self._status:
856                self._status.set_status("Failed")
857                self._status.set_active(False)
858            if why.check(DownloadStopped):
859                # DownloadStopped just means the consumer aborted the
860                # download; not so scary.
861                self.log("download stopped", level=log.UNUSUAL)
862            else:
863                # This is really unusual, and deserves maximum forensics.
864                self.log("download failed!", failure=why, level=log.SCARY,
865                         umid="lp1vaQ")
866            return why
867        d.addErrback(_failed)
868        d.addCallback(self._done)
869        return d
870
871    def _get_all_shareholders(self):
872        dl = []
873        sb = self._storage_broker
874        servers = sb.get_servers_for_index(self._storage_index)
875        if not servers:
876            raise NoServersError("broker gave us no servers!")
877        for (peerid,ss) in servers:
878            self.log(format="sending DYHB to [%(peerid)s]",
879                     peerid=idlib.shortnodeid_b2a(peerid),
880                     level=log.NOISY, umid="rT03hg")
881            d = ss.callRemote("get_buckets", self._storage_index)
882            d.addCallbacks(self._got_response, self._got_error,
883                           callbackArgs=(peerid,))
884            dl.append(d)
885        self._responses_received = 0
886        self._queries_sent = len(dl)
887        if self._status:
888            self._status.set_status("Locating Shares (%d/%d)" %
889                                    (self._responses_received,
890                                     self._queries_sent))
891        return defer.DeferredList(dl)
892
893    def _got_response(self, buckets, peerid):
894        self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
895                 peerid=idlib.shortnodeid_b2a(peerid),
896                 shnums=sorted(buckets.keys()),
897                 level=log.NOISY, umid="o4uwFg")
898        self._responses_received += 1
899        if self._results:
900            elapsed = time.time() - self._started
901            self._results.timings["servers_peer_selection"][peerid] = elapsed
902        if self._status:
903            self._status.set_status("Locating Shares (%d/%d)" %
904                                    (self._responses_received,
905                                     self._queries_sent))
906        for sharenum, bucket in buckets.iteritems():
907            b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
908            self.add_share_bucket(sharenum, b)
909
910            if self._results:
911                if peerid not in self._results.servermap:
912                    self._results.servermap[peerid] = set()
913                self._results.servermap[peerid].add(sharenum)
914
915    def add_share_bucket(self, sharenum, bucket):
916        # this is split out for the benefit of test_encode.py
917        self._share_buckets.append( (sharenum, bucket) )
918
919    def _got_error(self, f):
920        level = log.WEIRD
921        if f.check(DeadReferenceError):
922            level = log.UNUSUAL
923        self.log("Error during get_buckets", failure=f, level=level,
924                         umid="3uuBUQ")
925
926    def bucket_failed(self, vbucket):
927        shnum = vbucket.sharenum
928        del self.active_buckets[shnum]
929        s = self._share_vbuckets[shnum]
930        # s is a set of ValidatedReadBucketProxy instances
931        s.remove(vbucket)
932        # ... which might now be empty
933        if not s:
934            # there are no more buckets which can provide this share, so
935            # remove the key. This may prompt us to use a different share.
936            del self._share_vbuckets[shnum]
937
938    def _got_all_shareholders(self, res):
939        if self._results:
940            now = time.time()
941            self._results.timings["peer_selection"] = now - self._started
942
943        if len(self._share_buckets) < self._verifycap.needed_shares:
944            msg = "Failed to get enough shareholders: have %d, need %d" \
945                  % (len(self._share_buckets), self._verifycap.needed_shares)
946            if self._share_buckets:
947                raise NotEnoughSharesError(msg)
948            else:
949                raise NoSharesError(msg)
950
951        #for s in self._share_vbuckets.values():
952        #    for vb in s:
953        #        assert isinstance(vb, ValidatedReadBucketProxy), \
954        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
955
956    def _obtain_uri_extension(self, ignored):
957        # all shareholders are supposed to have a copy of uri_extension, and
958        # all are supposed to be identical. We compute the hash of the data
959        # that comes back, and compare it against the version in our URI. If
960        # they don't match, ignore their data and try someone else.
961        if self._status:
962            self._status.set_status("Obtaining URI Extension")
963
964        uri_extension_fetch_started = time.time()
965
966        vups = []
967        for sharenum, bucket in self._share_buckets:
968            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
969        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
970        d = vto.start()
971
972        def _got_uri_extension(vup):
973            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
974            if self._results:
975                elapsed = time.time() - uri_extension_fetch_started
976                self._results.timings["uri_extension"] = elapsed
977
978            self._vup = vup
979            self._codec = codec.CRSDecoder()
980            self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
981            self._tail_codec = codec.CRSDecoder()
982            self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
983
984            self._current_segnum = 0
985
986            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
987            self._share_hash_tree.set_hashes({0: vup.share_root_hash})
988
989            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
990            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
991
992            # Repairer (uploader) needs the encodingparams.
993            self._target.set_encodingparams((
994                self._verifycap.needed_shares,
995                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
996                self._verifycap.total_shares,
997                self._vup.segment_size
998                ))
999        d.addCallback(_got_uri_extension)
1000        return d
1001
1002    def _get_crypttext_hash_tree(self, res):
1003        vchtps = []
1004        for sharenum, bucket in self._share_buckets:
1005            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
1006            vchtps.append(vchtp)
1007
1008        _get_crypttext_hash_tree_started = time.time()
1009        if self._status:
1010            self._status.set_status("Retrieving crypttext hash tree")
1011
1012        vto = ValidatedThingObtainer(vchtps, debugname="vchtps",
1013                                     log_id=self._parentmsgid)
1014        d = vto.start()
1015
1016        def _got_crypttext_hash_tree(res):
1017            # Good -- the self._crypttext_hash_tree that we passed to vchtp
1018            # is now populated with hashes.
1019            if self._results:
1020                elapsed = time.time() - _get_crypttext_hash_tree_started
1021                self._results.timings["hashtrees"] = elapsed
1022        d.addCallback(_got_crypttext_hash_tree)
1023        return d
1024
1025    def _activate_enough_buckets(self):
1026        """either return a mapping from shnum to a ValidatedReadBucketProxy
1027        that can provide data for that share, or raise NotEnoughSharesError"""
1028
1029        while len(self.active_buckets) < self._verifycap.needed_shares:
1030            # need some more
1031            handled_shnums = set(self.active_buckets.keys())
1032            available_shnums = set(self._share_vbuckets.keys())
1033            potential_shnums = list(available_shnums - handled_shnums)
1034            if len(potential_shnums) < (self._verifycap.needed_shares
1035                                        - len(self.active_buckets)):
1036                have = len(potential_shnums) + len(self.active_buckets)
1037                msg = "Unable to activate enough shares: have %d, need %d" \
1038                      % (have, self._verifycap.needed_shares)
1039                if have:
1040                    raise NotEnoughSharesError(msg)
1041                else:
1042                    raise NoSharesError(msg)
1043            # For the next share, choose a primary share if available, else a
1044            # randomly chosen secondary share.
1045            potential_shnums.sort()
1046            if potential_shnums[0] < self._verifycap.needed_shares:
1047                shnum = potential_shnums[0]
1048            else:
1049                shnum = random.choice(potential_shnums)
1050            # and a random bucket that will provide it
1051            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
1052            self.active_buckets[shnum] = validated_bucket
1053        return self.active_buckets
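    # Share-selection example for the loop above (illustrative numbers): with
    # needed_shares = 3, active_buckets already holding shnum 0, and validated
    # buckets available for shnums {1, 4, 7}: shnum 1 is taken first, because
    # 1 < needed_shares makes it a primary share (cheap to decode); the last
    # slot is then filled by a random choice from {4, 7}. For each chosen
    # shnum, a random ValidatedReadBucketProxy from _share_vbuckets[shnum]
    # serves it.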
1054
1055
1056    def _download_all_segments(self, res):
1057        for sharenum, bucket in self._share_buckets:
1058            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
1059            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
1060
1061        # after the above code, self._share_vbuckets contains enough
1062        # buckets to complete the download, and some extra ones to
1063        # tolerate some buckets dropping out or having
1064        # errors. self._share_vbuckets is a dictionary that maps from
1065        # shnum to a set of ValidatedBuckets, which themselves are
1066        # wrappers around RIBucketReader references.
1067        self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance
1068
1069        self._started_fetching = time.time()
1070
1071        d = defer.succeed(None)
1072        for segnum in range(self._vup.num_segments):
1073            d.addCallback(self._download_segment, segnum)
1074            # this pause, at the end of write, prevents pre-fetch from
1075            # happening until the consumer is ready for more data.
1076            d.addCallback(self._check_for_pause)
1077        return d
1078
1079    def _check_for_pause(self, res):
1080        if self._paused:
1081            d = defer.Deferred()
1082            self._paused.addCallback(lambda ignored: d.callback(res))
1083            return d
1084        if self._stopped:
1085            raise DownloadStopped("our Consumer called stopProducing()")
1086        self._monitor.raise_if_cancelled()
1087        return res
1088
1089    def _download_segment(self, res, segnum):
1090        if self._status:
1091            self._status.set_status("Downloading segment %d of %d" %
1092                                    (segnum+1, self._vup.num_segments))
1093        self.log("downloading seg#%d of %d (%d%%)"
1094                 % (segnum, self._vup.num_segments,
1095                    100.0 * segnum / self._vup.num_segments))
1096        # memory footprint: when the SegmentDownloader finishes pulling down
1097        # all shares, we have 1*segment_size of usage.
1098        segmentdler = SegmentDownloader(self, segnum,
1099                                        self._verifycap.needed_shares,
1100                                        self._results)
1101        started = time.time()
1102        d = segmentdler.start()
1103        def _finished_fetching(res):
1104            elapsed = time.time() - started
1105            self._results.timings["cumulative_fetch"] += elapsed
1106            return res
1107        if self._results:
1108            d.addCallback(_finished_fetching)
1109        # pause before using more memory
1110        d.addCallback(self._check_for_pause)
1111        # while the codec does its job, we hit 2*segment_size
1112        def _started_decode(res):
1113            self._started_decode = time.time()
1114            return res
1115        if self._results:
1116            d.addCallback(_started_decode)
1117        if segnum + 1 == self._vup.num_segments:
1118            codec = self._tail_codec
1119        else:
1120            codec = self._codec
1121        d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
1122        # once the codec is done, we drop back to 1*segment_size, because
1123        # 'shares' goes out of scope. The memory usage is all in the
1124        # plaintext now, spread out into a bunch of tiny buffers.
1125        def _finished_decode(res):
1126            elapsed = time.time() - self._started_decode
1127            self._results.timings["cumulative_decode"] += elapsed
1128            return res
1129        if self._results:
1130            d.addCallback(_finished_decode)
1131
1132        # pause/check-for-stop just before writing, to honor stopProducing
1133        d.addCallback(self._check_for_pause)
1134        d.addCallback(self._got_segment)
1135        return d
1136
1137    def _got_segment(self, buffers):
1138        precondition(self._crypttext_hash_tree)
1139        started_decrypt = time.time()
1140        self._status.set_progress(float(self._current_segnum)/self._verifycap.size)
1141
1142        if self._current_segnum + 1 == self._vup.num_segments:
1143            # This is the last segment.
1144            # Trim off any padding added by the upload side. We never send
1145            # empty segments. If the data was an exact multiple of the
1146            # segment size, the last segment will be full.
1147            tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
1148            num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
1149            # Remove buffers which don't contain any part of the tail.
1150            del buffers[num_buffers_used:]
1151            # Remove the past-the-tail-part of the last buffer.
1152            tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
1153            if tail_in_last_buf == 0:
1154                tail_in_last_buf = tail_buf_size
1155            buffers[-1] = buffers[-1][:tail_in_last_buf]
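            # Worked example (hypothetical numbers, consistent with the sizes
            # sketched in ValidatedExtendedURIProxy._parse_and_validate):
            # tail_segment_size = 82497, needed_shares = 3,
            # tail_data_size = 82496:
            #   tail_buf_size    = div_ceil(82497, 3)     = 27499
            #   num_buffers_used = div_ceil(82496, 27499) = 3
            #   tail_in_last_buf = 82496 % 27499          = 27498
            # so all three buffers are kept and the last one is trimmed to
            # 27498 bytes (27499 + 27499 + 27498 = 82496 = tail_data_size).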
1156
1157        # First compute the hash of this segment and check that it fits.
1158        ch = hashutil.crypttext_segment_hasher()
1159        for buffer in buffers:
1160            self._ciphertext_hasher.update(buffer)
1161            ch.update(buffer)
1162        self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})
1163
1164        # Then write this segment to the target.
1165        if not self._opened:
1166            self._opened = True
1167            self._target.open(self._verifycap.size)
1168
1169        for buffer in buffers:
1170            self._target.write(buffer)
1171            self._bytes_done += len(buffer)
1172
1173        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
1174        self._current_segnum += 1
1175
1176        if self._results:
1177            elapsed = time.time() - started_decrypt
1178            self._results.timings["cumulative_decrypt"] += elapsed
1179
1180    def _done(self, res):
1181        self.log("download done")
1182        if self._results:
1183            now = time.time()
1184            self._results.timings["total"] = now - self._started
1185            self._results.timings["segments"] = now - self._started_fetching
1186        if self._vup.crypttext_hash:
1187            _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
1188                    "bad crypttext_hash: computed=%s, expected=%s" %
1189                    (base32.b2a(self._ciphertext_hasher.digest()),
1190                     base32.b2a(self._vup.crypttext_hash)))
1191        _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
1192        self._status.set_progress(1)
1193        self._target.close()
1194        return self._target.finish()
1195    def get_download_status(self):
1196        return self._status
1197
1198
1199class ConsumerAdapter:
1200    implements(IDownloadTarget, IConsumer)
1201    def __init__(self, consumer):
1202        self._consumer = consumer
1203
1204    def registerProducer(self, producer, streaming):
1205        self._consumer.registerProducer(producer, streaming)
1206    def unregisterProducer(self):
1207        self._consumer.unregisterProducer()
1208
1209    def open(self, size):
1210        pass
1211    def write(self, data):
1212        self._consumer.write(data)
1213    def close(self):
1214        pass
1215
1216    def fail(self, why):
1217        pass
1218    def register_canceller(self, cb):
1219        pass
1220    def finish(self):
1221        return self._consumer
1222    # The following methods exist only because the target might be a
1223    # repairer.DownUpConnector, and because the current CHKUpload object
1224    # expects to find the storage index and encoding parameters in its
1225    # Uploadable.
1226    def set_storageindex(self, storageindex):
1227        pass
1228    def set_encodingparams(self, encodingparams):
1229        pass
1230
1231
1232class Downloader:
1233    """I am a service that allows file downloading.
1234    """
1235    # TODO: in fact, this service only downloads immutable files (URI:CHK:).
1236    # It is scheduled to go away, to be replaced by filenode.download()
1237    implements(IDownloader)
1238
1239    def __init__(self, storage_broker, stats_provider):
1240        self.storage_broker = storage_broker
1241        self.stats_provider = stats_provider
1242        self._all_downloads = weakref.WeakKeyDictionary() # for debugging
1243
1244    def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
1245        assert isinstance(u, uri.CHKFileURI)
1246        t = IDownloadTarget(t)
1247        assert t.write
1248        assert t.close
1249
1250        if self.stats_provider:
1251            # these counters are meant for network traffic, and don't
1252            # include LIT files
1253            self.stats_provider.count('downloader.files_downloaded', 1)
1254            self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
1255
1256        target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
1257        if not monitor:
1258            monitor=Monitor()
1259        dl = CiphertextDownloader(self.storage_broker,
1260                                  u.get_verify_cap(), target,
1261                                  monitor=monitor)
1262        self._all_downloads[dl] = None
1263        if history:
1264            history.add_download(dl.get_download_status())
1265        d = dl.start()
1266        return d
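# A minimal sketch of driving this service (hypothetical names: 'downloader'
# is a Downloader instance, 'readcap' is a uri.CHKFileURI, and 'consumer' is
# any IConsumer, e.g. a twisted.web request):
#
#     target = ConsumerAdapter(consumer)
#     d = downloader.download(readcap, target)
#     # d fires with the consumer once all the plaintext has been written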