Ticket #798: new-downloader-v10a.dpatch

File new-downloader-v10a.dpatch, 267.7 KB (added by davidsarah at 2010-08-01T05:49:45Z)

Brian's New Downloader, for testing in 1.8beta (or alpha)

1Sun Aug  1 05:43:09 GMT Daylight Time 2010  david-sarah@jacaranda.org
2  * Brian's New Downloader, for testing in 1.8beta.
3
4New patches:
5
6[Brian's New Downloader, for testing in 1.8beta.
7david-sarah@jacaranda.org**20100801044309
8 Ignore-this: d9d3637d2fbdd007d857f49cffa688d0
9] {
10hunk ./Makefile 128
11 # quicktest-coverage" to do a unit test run with coverage-gathering enabled,
12 # then use "make coverate-output-text" for a brief report, or "make
13 # coverage-output" for a pretty HTML report. Also see "make .coverage.el" and
14-# misc/coding_helpers/coverage.el for emacs integration.
15+# misc/coding_tools/coverage.el for emacs integration.
16 
17 quicktest-coverage:
18        rm -f .coverage
19hunk ./Makefile 137
20 
21 coverage-output:
22        rm -rf coverage-html
23-       coverage html -d coverage-html
24+       coverage html -i -d coverage-html $(COVERAGE_OMIT)
25        cp .coverage coverage-html/coverage.data
26        @echo "now point your browser at coverage-html/index.html"
27 
28hunk ./Makefile 157
29 .PHONY: repl test-darcs-boringfile test-clean clean find-trailing-spaces
30 
31 .coverage.el: .coverage
32-       $(PYTHON) misc/coding_helpers/coverage2el.py
33+       $(PYTHON) misc/coding_tools/coverage2el.py
34 
35 # 'upload-coverage' is meant to be run with an UPLOAD_TARGET=host:/dir setting
36 ifdef UPLOAD_TARGET
37hunk ./Makefile 181
38 
39 pyflakes:
40        $(PYTHON) -OOu `which pyflakes` src/allmydata |sort |uniq
41+check-umids:
42+       $(PYTHON) misc/coding_tools/check-umids.py `find src/allmydata -name '*.py'`
43 
44 count-lines:
45        @echo -n "files: "
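
The new check-umids target above runs misc/coding_tools/check-umids.py over every .py file under src/allmydata. That script is not included in this patch; presumably it verifies that the umid= tags passed to log.msg (for example the umid="AX7wZQ" and umid="JGXxBA" values later in this patch) stay unique across the tree. A hypothetical sketch of such a checker, for illustration only:

    import re, sys

    # only handles the umid="..." form used in this tree
    UMID_RE = re.compile(r'umid="([^"]+)"')

    def find_duplicate_umids(filenames):
        seen = {}  # umid -> list of "filename:lineno" locations that use it
        for fn in filenames:
            for i, line in enumerate(open(fn)):
                for umid in UMID_RE.findall(line):
                    seen.setdefault(umid, []).append("%s:%d" % (fn, i+1))
        return dict([(u, locs) for (u, locs) in seen.items() if len(locs) > 1])

    if __name__ == "__main__":
        dups = find_duplicate_umids(sys.argv[1:])
        for umid in sorted(dups):
            print "duplicate umid %r used at %s" % (umid, ", ".join(dups[umid]))
        sys.exit(1 if dups else 0)
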
46hunk ./misc/coding_tools/coverage.el 87
47                            'face '(:box "red")
48                            )
49               )
50-            (message "Added annotations")
51+            (message (format "Added annotations: %d uncovered lines"
52+                             (safe-length uncovered-code-lines)))
53             )
54           )
55       (message "unable to find coverage for this file"))
56hunk ./misc/simulators/sizes.py 63
57             self.block_arity = 0
58             self.block_tree_depth = 0
59             self.block_overhead = 0
60-            self.bytes_until_some_data = 20 + share_size
61+            self.bytes_until_some_data = 32 + share_size
62             self.share_storage_overhead = 0
63             self.share_transmission_overhead = 0
64 
65hunk ./misc/simulators/sizes.py 69
66         elif mode == "beta":
67             # k=num_blocks, d=1
68-            # each block has a 20-byte hash
69+            # each block has a 32-byte hash
70             self.block_arity = num_blocks
71             self.block_tree_depth = 1
72hunk ./misc/simulators/sizes.py 72
73-            self.block_overhead = 20
74+            self.block_overhead = 32
75             # the share has a list of hashes, one for each block
76             self.share_storage_overhead = (self.block_overhead *
77                                            num_blocks)
78hunk ./misc/simulators/sizes.py 78
79             # we can get away with not sending the hash of the share that
80             # we're sending in full, once
81-            self.share_transmission_overhead = self.share_storage_overhead - 20
82+            self.share_transmission_overhead = self.share_storage_overhead - 32
83             # we must get the whole list (so it can be validated) before
84             # any data can be validated
85             self.bytes_until_some_data = (self.share_transmission_overhead +
86hunk ./misc/simulators/sizes.py 92
87             # to make things easier, we make the pessimistic assumption that
88             # we have to store hashes for all the empty places in the tree
89             # (when the number of shares is not an exact exponent of k)
90-            self.block_overhead = 20
91+            self.block_overhead = 32
92             # the block hashes are organized into a k-ary tree, which
93             # means storing (and eventually transmitting) more hashes. This
94             # count includes all the low-level share hashes and the root.
95hunk ./misc/simulators/sizes.py 101
96             #print "num_leaves", num_leaves
97             #print "hash_nodes", hash_nodes
98             # the storage overhead is this
99-            self.share_storage_overhead = 20 * (hash_nodes - 1)
100+            self.share_storage_overhead = 32 * (hash_nodes - 1)
101             # the transmission overhead is smaller: if we actually transmit
102             # every block, we don't have to transmit 1/k of the
103             # lowest-level block hashes, and we don't have to transmit the
104hunk ./misc/simulators/sizes.py 106
105             # root because it was already sent with the share-level hash tree
106-            self.share_transmission_overhead = 20 * (hash_nodes
107+            self.share_transmission_overhead = 32 * (hash_nodes
108                                                      - 1 # the root
109                                                      - num_leaves / k)
110             # we must get a full sibling hash chain before we can validate
111hunk ./misc/simulators/sizes.py 112
112             # any data
113             sibling_length = d * (k-1)
114-            self.bytes_until_some_data = 20 * sibling_length + block_size
115+            self.bytes_until_some_data = 32 * sibling_length + block_size
116             
117             
118 
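
The sizes.py hunks above update the overhead simulator to use Tahoe's real 32-byte (SHA-256d) hash size in place of the old 20-byte figure. As a standalone illustration of what the k-ary-tree branch computes, here is a sketch of the same arithmetic; the depth and leaf-count derivation is an assumption (a fully padded k-ary tree, per the "pessimistic assumption" comment above), and the inputs are made up:

    HASH_SIZE = 32  # SHA-256d hash, per hashutil.CRYPTO_VAL_SIZE

    def kary_tree_overhead(num_blocks, k, block_size):
        # depth of the smallest k-ary tree with at least num_blocks leaves
        d = 0
        while k ** d < num_blocks:
            d += 1
        num_leaves = k ** d                               # padded leaf count
        hash_nodes = sum(k ** i for i in range(d + 1))    # all nodes incl. root
        # matches sizes.py: share_storage_overhead = 32 * (hash_nodes - 1)
        storage = HASH_SIZE * (hash_nodes - 1)
        # transmission skips 1/k of the leaf hashes (the blocks we send in
        # full) and the root, which travels with the share-level hash tree
        transmission = HASH_SIZE * (hash_nodes - 1 - num_leaves // k)
        # a full chain of siblings must arrive before the first block validates
        bytes_until_some_data = HASH_SIZE * d * (k - 1) + block_size
        return storage, transmission, bytes_until_some_data

    print kary_tree_overhead(num_blocks=64, k=4, block_size=4096)
    # -> (2688, 2176, 4384)
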
119hunk ./misc/simulators/storage-overhead.py 4
120 #!/usr/bin/env python
121 
122 import sys, math
123-from allmydata import upload, uri, encode, storage
124+from allmydata import uri, storage
125+from allmydata.immutable import upload
126+from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
127 from allmydata.util import mathutil
128 
129 def roundup(size, blocksize=4096):
130hunk ./misc/simulators/storage-overhead.py 27
131     def tell(self):
132         return self.fp
133 
134-def calc(filesize, params=(3,7,10), segsize=encode.Encoder.MAX_SEGMENT_SIZE):
135+def calc(filesize, params=(3,7,10), segsize=DEFAULT_MAX_SEGMENT_SIZE):
136     num_shares = params[2]
137     if filesize <= upload.Uploader.URI_LIT_SIZE_THRESHOLD:
138hunk ./misc/simulators/storage-overhead.py 30
139-        urisize = len(uri.pack_lit("A"*filesize))
140+        urisize = len(uri.LiteralFileURI("A"*filesize).to_string())
141         sharesize = 0
142         sharespace = 0
143     else:
144hunk ./misc/simulators/storage-overhead.py 34
145-        u = upload.FileUploader(None)
146+        u = upload.FileUploader(None) # XXX changed
147         u.set_params(params)
148         # unfortunately, Encoder doesn't currently lend itself to answering
149         # this question without measuring a filesize, so we have to give it a
150hunk ./src/allmydata/client.py 1
151-import os, stat, time
152+import os, stat, time, weakref
153 from allmydata.interfaces import RIStorageServer
154 from allmydata import node
155 
156hunk ./src/allmydata/client.py 7
157 from zope.interface import implements
158 from twisted.internet import reactor, defer
159+from twisted.application import service
160 from twisted.application.internet import TimerService
161 from foolscap.api import Referenceable
162 from pycryptopp.publickey import rsa
163hunk ./src/allmydata/client.py 16
164 from allmydata.storage.server import StorageServer
165 from allmydata import storage_client
166 from allmydata.immutable.upload import Uploader
167-from allmydata.immutable.download import Downloader
168 from allmydata.immutable.offloaded import Helper
169 from allmydata.control import ControlServer
170 from allmydata.introducer.client import IntroducerClient
171hunk ./src/allmydata/client.py 19
172-from allmydata.util import hashutil, base32, pollmixin, cachedir, log
173+from allmydata.util import hashutil, base32, pollmixin, log
174 from allmydata.util.encodingutil import get_filesystem_encoding
175 from allmydata.util.abbreviate import parse_abbreviated_size
176 from allmydata.util.time_format import parse_duration, parse_date
177hunk ./src/allmydata/client.py 98
178             verifier = signer.get_verifying_key()
179             return defer.succeed( (verifier, signer) )
180 
181+class Terminator(service.Service):
182+    def __init__(self):
183+        self._clients = weakref.WeakKeyDictionary()
184+    def register(self, c):
185+        self._clients[c] = None
186+    def stopService(self):
187+        for c in self._clients:
188+            c.stop()
189+        return service.Service.stopService(self)
190+
191 
192 class Client(node.Node, pollmixin.PollMixin):
193     implements(IStatsProducer)
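
The new Terminator service holds its registrants in a weakref.WeakKeyDictionary, so registering a download never keeps it alive by itself; at shutdown, stopService() calls stop() on whichever registered objects still exist. A minimal usage sketch, assuming this patch is applied (DummyDownload is made up for illustration):

    from allmydata.client import Terminator  # added to client.py by this patch

    class DummyDownload(object):             # stand-in for a download object
        def stop(self):
            print "download stopped"

    t = Terminator()
    dl = DummyDownload()
    t.register(dl)      # held only via a weak reference, so registration
                        # alone never keeps a finished download in memory
    t.stopService()     # at shutdown: calls stop() on each surviving object

In the Client itself the terminator is attached with setServiceParent(), so Twisted stops it (and therefore any outstanding downloads) automatically when the node shuts down.
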
194hunk ./src/allmydata/client.py 292
195 
196         self.init_client_storage_broker()
197         self.history = History(self.stats_provider)
198+        self.terminator = Terminator()
199+        self.terminator.setServiceParent(self)
200         self.add_service(Uploader(helper_furl, self.stats_provider))
201hunk ./src/allmydata/client.py 295
202-        download_cachedir = os.path.join(self.basedir,
203-                                         "private", "cache", "download")
204-        self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
205-        self.download_cache_dirman.setServiceParent(self)
206-        self.downloader = Downloader(self.storage_broker, self.stats_provider)
207         self.init_stub_client()
208         self.init_nodemaker()
209 
210hunk ./src/allmydata/client.py 353
211                                    self._secret_holder,
212                                    self.get_history(),
213                                    self.getServiceNamed("uploader"),
214-                                   self.downloader,
215-                                   self.download_cache_dirman,
216+                                   self.terminator,
217                                    self.get_encoding_parameters(),
218                                    self._key_generator)
219 
220hunk ./src/allmydata/immutable/checker.py 1
221+from zope.interface import implements
222+from twisted.internet import defer
223 from foolscap.api import DeadReferenceError, RemoteException
224hunk ./src/allmydata/immutable/checker.py 4
225+from allmydata import hashtree, codec, uri
226+from allmydata.interfaces import IValidatedThingProxy, IVerifierURI
227 from allmydata.hashtree import IncompleteHashTree
228 from allmydata.check_results import CheckResults
229hunk ./src/allmydata/immutable/checker.py 8
230-from allmydata.immutable import download
231 from allmydata.uri import CHKFileVerifierURI
232 from allmydata.util.assertutil import precondition
233hunk ./src/allmydata/immutable/checker.py 10
234-from allmydata.util import base32, idlib, deferredutil, dictutil, log
235+from allmydata.util import base32, idlib, deferredutil, dictutil, log, mathutil
236 from allmydata.util.hashutil import file_renewal_secret_hash, \
237      file_cancel_secret_hash, bucket_renewal_secret_hash, \
238hunk ./src/allmydata/immutable/checker.py 13
239-     bucket_cancel_secret_hash
240+     bucket_cancel_secret_hash, uri_extension_hash, CRYPTO_VAL_SIZE, \
241+     block_hash
242 
243 from allmydata.immutable import layout
244 
245hunk ./src/allmydata/immutable/checker.py 18
246+class IntegrityCheckReject(Exception):
247+    pass
248+class BadURIExtension(IntegrityCheckReject):
249+    pass
250+class BadURIExtensionHashValue(IntegrityCheckReject):
251+    pass
252+class BadOrMissingHash(IntegrityCheckReject):
253+    pass
254+class UnsupportedErasureCodec(BadURIExtension):
255+    pass
256+
257+class ValidatedExtendedURIProxy:
258+    implements(IValidatedThingProxy)
259+    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
260+    responsible for retrieving and validating the elements from the UEB."""
261+
262+    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
263+        # fetch_failures is for debugging -- see test_encode.py
264+        self._fetch_failures = fetch_failures
265+        self._readbucketproxy = readbucketproxy
266+        precondition(IVerifierURI.providedBy(verifycap), verifycap)
267+        self._verifycap = verifycap
268+
269+        # required
270+        self.segment_size = None
271+        self.crypttext_root_hash = None
272+        self.share_root_hash = None
273+
274+        # computed
275+        self.block_size = None
276+        self.share_size = None
277+        self.num_segments = None
278+        self.tail_data_size = None
279+        self.tail_segment_size = None
280+
281+        # optional
282+        self.crypttext_hash = None
283+
284+    def __str__(self):
285+        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
286+
287+    def _check_integrity(self, data):
288+        h = uri_extension_hash(data)
289+        if h != self._verifycap.uri_extension_hash:
290+            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
291+                   (self._readbucketproxy,
292+                    base32.b2a(self._verifycap.uri_extension_hash),
293+                    base32.b2a(h)))
294+            if self._fetch_failures is not None:
295+                self._fetch_failures["uri_extension"] += 1
296+            raise BadURIExtensionHashValue(msg)
297+        else:
298+            return data
299+
300+    def _parse_and_validate(self, data):
301+        self.share_size = mathutil.div_ceil(self._verifycap.size,
302+                                            self._verifycap.needed_shares)
303+
304+        d = uri.unpack_extension(data)
305+
306+        # There are several kinds of things that can be found in a UEB.
307+        # First, things that we really need to learn from the UEB in order to
308+        # do this download. Next: things which are optional but not redundant
309+        # -- if they are present in the UEB they will get used. Next, things
310+        # that are optional and redundant. These things are required to be
311+        # consistent: they don't have to be in the UEB, but if they are in
312+        # the UEB then they will be checked for consistency with the
313+        # already-known facts, and if they are inconsistent then an exception
314+        # will be raised. These things aren't actually used -- they are just
315+        # tested for consistency and ignored. Finally: things which are
316+        # deprecated -- they ought not be in the UEB at all, and if they are
317+        # present then a warning will be logged but they are otherwise
318+        # ignored.
319+
320+        # First, things that we really need to learn from the UEB:
321+        # segment_size, crypttext_root_hash, and share_root_hash.
322+        self.segment_size = d['segment_size']
323+
324+        self.block_size = mathutil.div_ceil(self.segment_size,
325+                                            self._verifycap.needed_shares)
326+        self.num_segments = mathutil.div_ceil(self._verifycap.size,
327+                                              self.segment_size)
328+
329+        self.tail_data_size = self._verifycap.size % self.segment_size
330+        if not self.tail_data_size:
331+            self.tail_data_size = self.segment_size
332+        # padding for erasure code
333+        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
334+                                                        self._verifycap.needed_shares)
335+
336+        # Ciphertext hash tree root is mandatory, so that there is at most
337+        # one ciphertext that matches this read-cap or verify-cap. The
338+        # integrity check on the shares is not sufficient to prevent the
339+        # original encoder from creating some shares of file A and other
340+        # shares of file B.
341+        self.crypttext_root_hash = d['crypttext_root_hash']
342+
343+        self.share_root_hash = d['share_root_hash']
344+
345+
346+        # Next: things that are optional and not redundant: crypttext_hash
347+        if d.has_key('crypttext_hash'):
348+            self.crypttext_hash = d['crypttext_hash']
349+            if len(self.crypttext_hash) != CRYPTO_VAL_SIZE:
350+                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
351+
352+
353+        # Next: things that are optional, redundant, and required to be
354+        # consistent: codec_name, codec_params, tail_codec_params,
355+        # num_segments, size, needed_shares, total_shares
356+        if d.has_key('codec_name'):
357+            if d['codec_name'] != "crs":
358+                raise UnsupportedErasureCodec(d['codec_name'])
359+
360+        if d.has_key('codec_params'):
361+            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
362+            if ucpss != self.segment_size:
363+                raise BadURIExtension("inconsistent erasure code params: "
364+                                      "ucpss: %s != self.segment_size: %s" %
365+                                      (ucpss, self.segment_size))
366+            if ucpns != self._verifycap.needed_shares:
367+                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
368+                                      "self._verifycap.needed_shares: %s" %
369+                                      (ucpns, self._verifycap.needed_shares))
370+            if ucpts != self._verifycap.total_shares:
371+                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
372+                                      "self._verifycap.total_shares: %s" %
373+                                      (ucpts, self._verifycap.total_shares))
374+
375+        if d.has_key('tail_codec_params'):
376+            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
377+            if utcpss != self.tail_segment_size:
378+                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
379+                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
380+                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
381+                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
382+                                         self.segment_size, self._verifycap.needed_shares))
383+            if utcpns != self._verifycap.needed_shares:
384+                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
385+                                      "self._verifycap.needed_shares: %s" % (utcpns,
386+                                                                             self._verifycap.needed_shares))
387+            if utcpts != self._verifycap.total_shares:
388+                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
389+                                      "self._verifycap.total_shares: %s" % (utcpts,
390+                                                                            self._verifycap.total_shares))
391+
392+        if d.has_key('num_segments'):
393+            if d['num_segments'] != self.num_segments:
394+                raise BadURIExtension("inconsistent num_segments: size: %s, "
395+                                      "segment_size: %s, computed_num_segments: %s, "
396+                                      "ueb_num_segments: %s" % (self._verifycap.size,
397+                                                                self.segment_size,
398+                                                                self.num_segments, d['num_segments']))
399+
400+        if d.has_key('size'):
401+            if d['size'] != self._verifycap.size:
402+                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
403+                                      (self._verifycap.size, d['size']))
404+
405+        if d.has_key('needed_shares'):
406+            if d['needed_shares'] != self._verifycap.needed_shares:
407+                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
408+                                      "needed shares: %s" % (self._verifycap.total_shares,
409+                                                             d['needed_shares']))
410+
411+        if d.has_key('total_shares'):
412+            if d['total_shares'] != self._verifycap.total_shares:
413+                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
414+                                      "total shares: %s" % (self._verifycap.total_shares,
415+                                                            d['total_shares']))
416+
417+        # Finally, things that are deprecated and ignored: plaintext_hash,
418+        # plaintext_root_hash
419+        if d.get('plaintext_hash'):
420+            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
421+                    "and is no longer used.  Ignoring.  %s" % (self,))
422+        if d.get('plaintext_root_hash'):
423+            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
424+                    "reasons and is no longer used.  Ignoring.  %s" % (self,))
425+
426+        return self
427+
428+    def start(self):
429+        """Fetch the UEB from bucket, compare its hash to the hash from
430+        verifycap, then parse it. Returns a deferred which is called back
431+        with self once the fetch is successful, or is erred back if it
432+        fails."""
433+        d = self._readbucketproxy.get_uri_extension()
434+        d.addCallback(self._check_integrity)
435+        d.addCallback(self._parse_and_validate)
436+        return d
437+
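
For reference, the geometry that _parse_and_validate() derives from the verify cap and the UEB reduces to a few ceiling divisions. A worked example with made-up numbers (a 1,000,000-byte file, needed_shares k=3, 128 KiB segments), using the same mathutil helpers as the code above:

    from allmydata.util import mathutil

    size = 1000000                # hypothetical file size, from the verify cap
    k = 3                         # needed_shares
    segment_size = 131072         # from the UEB

    share_size   = mathutil.div_ceil(size, k)                # 333334
    block_size   = mathutil.div_ceil(segment_size, k)        # 43691
    num_segments = mathutil.div_ceil(size, segment_size)     # 8
    tail_data_size    = size % segment_size or segment_size  # 82496
    tail_segment_size = mathutil.next_multiple(tail_data_size, k)  # 82497
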
438+class ValidatedReadBucketProxy(log.PrefixingLogMixin):
439+    """I am a front-end for a remote storage bucket, responsible for
440+    retrieving and validating data from that bucket.
441+
442+    My get_block() method is used by BlockDownloaders.
443+    """
444+
445+    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
446+                 block_size, share_size):
447+        """ share_hash_tree is required to have already been initialized with
448+        the root hash (the number-0 hash), using the share_root_hash from the
449+        UEB"""
450+        precondition(share_hash_tree[0] is not None, share_hash_tree)
451+        prefix = "%d-%s-%s" % (sharenum, bucket,
452+                               base32.b2a_l(share_hash_tree[0][:8], 60))
453+        log.PrefixingLogMixin.__init__(self,
454+                                       facility="tahoe.immutable.download",
455+                                       prefix=prefix)
456+        self.sharenum = sharenum
457+        self.bucket = bucket
458+        self.share_hash_tree = share_hash_tree
459+        self.num_blocks = num_blocks
460+        self.block_size = block_size
461+        self.share_size = share_size
462+        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
463+
464+    def get_all_sharehashes(self):
465+        """Retrieve and validate all the share-hash-tree nodes that are
466+        included in this share, regardless of whether we need them to
467+        validate the share or not. Each share contains a minimal Merkle tree
468+        chain, but there is lots of overlap, so usually we'll be using hashes
469+        from other shares and not reading every single hash from this share.
470+        The Verifier uses this function to read and validate every single
471+        hash from this share.
472+
473+        Call this (and wait for the Deferred it returns to fire) before
474+        calling get_block() for the first time: this lets us check that the
475+        share contains enough hashes to validate its own data, and
476+        avoids downloading any share hash twice.
477+
478+        I return a Deferred which errbacks upon failure, probably with
479+        BadOrMissingHash."""
480+
481+        d = self.bucket.get_share_hashes()
482+        def _got_share_hashes(sh):
483+            sharehashes = dict(sh)
484+            try:
485+                self.share_hash_tree.set_hashes(sharehashes)
486+            except IndexError, le:
487+                raise BadOrMissingHash(le)
488+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
489+                raise BadOrMissingHash(le)
490+        d.addCallback(_got_share_hashes)
491+        return d
492+
493+    def get_all_blockhashes(self):
494+        """Retrieve and validate all the block-hash-tree nodes that are
495+        included in this share. Each share contains a full Merkle tree, but
496+        we usually only fetch the minimal subset necessary for any particular
497+        block. This function fetches everything at once. The Verifier uses
498+        this function to validate the block hash tree.
499+
500+        Call this (and wait for the Deferred it returns to fire) after
501+        calling get_all_sharehashes() and before calling get_block() for the
502+        first time: this lets us check that the share contains all block
503+        hashes and avoids downloading them multiple times.
504+
505+        I return a Deferred which errbacks upon failure, probably with
506+        BadOrMissingHash.
507+        """
508+
509+        # get_block_hashes(anything) currently always returns everything
510+        needed = list(range(len(self.block_hash_tree)))
511+        d = self.bucket.get_block_hashes(needed)
512+        def _got_block_hashes(blockhashes):
513+            if len(blockhashes) < len(self.block_hash_tree):
514+                raise BadOrMissingHash()
515+            bh = dict(enumerate(blockhashes))
516+
517+            try:
518+                self.block_hash_tree.set_hashes(bh)
519+            except IndexError, le:
520+                raise BadOrMissingHash(le)
521+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
522+                raise BadOrMissingHash(le)
523+        d.addCallback(_got_block_hashes)
524+        return d
525+
526+    def get_all_crypttext_hashes(self, crypttext_hash_tree):
527+        """Retrieve and validate all the crypttext-hash-tree nodes that are
528+        in this share. Normally we don't look at these at all: the download
529+        process fetches them incrementally as needed to validate each segment
530+        of ciphertext. But this is a convenient place to give the Verifier a
531+        function to validate all of these at once.
532+
533+        Call this with a new hashtree object for each share, initialized with
534+        the crypttext hash tree root. I return a Deferred which errbacks upon
535+        failure, probably with BadOrMissingHash.
536+        """
537+
538+        # get_crypttext_hashes() always returns everything
539+        d = self.bucket.get_crypttext_hashes()
540+        def _got_crypttext_hashes(hashes):
541+            if len(hashes) < len(crypttext_hash_tree):
542+                raise BadOrMissingHash()
543+            ct_hashes = dict(enumerate(hashes))
544+            try:
545+                crypttext_hash_tree.set_hashes(ct_hashes)
546+            except IndexError, le:
547+                raise BadOrMissingHash(le)
548+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
549+                raise BadOrMissingHash(le)
550+        d.addCallback(_got_crypttext_hashes)
551+        return d
552+
553+    def get_block(self, blocknum):
554+        # the first time we use this bucket, we need to fetch enough elements
555+        # of the share hash tree to validate it from our share hash up to the
556+        # hashroot.
557+        if self.share_hash_tree.needed_hashes(self.sharenum):
558+            d1 = self.bucket.get_share_hashes()
559+        else:
560+            d1 = defer.succeed([])
561+
562+        # We might need to grab some elements of our block hash tree, to
563+        # validate the requested block up to the share hash.
564+        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
565+        # We don't need the root of the block hash tree, as that comes in the
566+        # share tree.
567+        blockhashesneeded.discard(0)
568+        d2 = self.bucket.get_block_hashes(blockhashesneeded)
569+
570+        if blocknum < self.num_blocks-1:
571+            thisblocksize = self.block_size
572+        else:
573+            thisblocksize = self.share_size % self.block_size
574+            if thisblocksize == 0:
575+                thisblocksize = self.block_size
576+        d3 = self.bucket.get_block_data(blocknum,
577+                                        self.block_size, thisblocksize)
578+
579+        dl = deferredutil.gatherResults([d1, d2, d3])
580+        dl.addCallback(self._got_data, blocknum)
581+        return dl
582+
583+    def _got_data(self, results, blocknum):
584+        precondition(blocknum < self.num_blocks,
585+                     self, blocknum, self.num_blocks)
586+        sharehashes, blockhashes, blockdata = results
587+        try:
588+            sharehashes = dict(sharehashes)
589+        except ValueError, le:
590+            le.args = tuple(le.args + (sharehashes,))
591+            raise
592+        blockhashes = dict(enumerate(blockhashes))
593+
594+        candidate_share_hash = None # in case we log it in the except block below
595+        blockhash = None # in case we log it in the except block below
596+
597+        try:
598+            if self.share_hash_tree.needed_hashes(self.sharenum):
599+                # This will raise exception if the values being passed do not
600+                # match the root node of self.share_hash_tree.
601+                try:
602+                    self.share_hash_tree.set_hashes(sharehashes)
603+                except IndexError, le:
604+                    # Weird -- sharehashes contained index numbers outside of
605+                    # the range that fit into this hash tree.
606+                    raise BadOrMissingHash(le)
607+
608+            # To validate a block we need the root of the block hash tree,
609+            # which is also one of the leafs of the share hash tree, and is
610+            # called "the share hash".
611+            if not self.block_hash_tree[0]: # empty -- no root node yet
612+                # Get the share hash from the share hash tree.
613+                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
614+                if not share_hash:
615+                    # No root node in block_hash_tree and also the share hash
616+                    # wasn't sent by the server.
617+                    raise hashtree.NotEnoughHashesError
618+                self.block_hash_tree.set_hashes({0: share_hash})
619+
620+            if self.block_hash_tree.needed_hashes(blocknum):
621+                self.block_hash_tree.set_hashes(blockhashes)
622+
623+            blockhash = block_hash(blockdata)
624+            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
625+            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
626+            #        "%r .. %r: %s" %
627+            #        (self.sharenum, blocknum, len(blockdata),
628+            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
629+
630+        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
631+            # log.WEIRD: indicates undetected disk/network error, or more
632+            # likely a programming error
633+            self.log("hash failure in block=%d, shnum=%d on %s" %
634+                    (blocknum, self.sharenum, self.bucket))
635+            if self.block_hash_tree.needed_hashes(blocknum):
636+                self.log(""" failure occurred when checking the block_hash_tree.
637+                This suggests that either the block data was bad, or that the
638+                block hashes we received along with it were bad.""")
639+            else:
640+                self.log(""" the failure probably occurred when checking the
641+                share_hash_tree, which suggests that the share hashes we
642+                received from the remote peer were bad.""")
643+            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
644+            self.log(" block length: %d" % len(blockdata))
645+            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
646+            if len(blockdata) < 100:
647+                self.log(" block data: %r" % (blockdata,))
648+            else:
649+                self.log(" block data start/end: %r .. %r" %
650+                        (blockdata[:50], blockdata[-50:]))
651+            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
652+            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
653+            lines = []
654+            for i,h in sorted(sharehashes.items()):
655+                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
656+            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
657+            lines = []
658+            for i,h in blockhashes.items():
659+                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
660+            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
661+            raise BadOrMissingHash(le)
662+
663+        # If we made it here, the block is good. If the hash trees didn't
664+        # like what they saw, they would have raised a BadHashError, causing
665+        # our caller to see a Failure and thus ignore this block (as well as
666+        # dropping this bucket).
667+        return blockdata
668+
669+
670 class Checker(log.PrefixingLogMixin):
671     """I query all servers to see if M uniquely-numbered shares are
672     available.
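
The Verifier side of this Checker drives the ValidatedReadBucketProxy defined above in a fixed order: every share hash, then every block hash, then every crypttext hash, then each block in turn (the _got_ueb code in the next hunk shows how the proxy is constructed). The outline below is an illustrative sketch of that sequence, not the literal verification code; vrbp, vup and crypttext_hash_tree are assumed to be the objects built in _got_ueb:

    from twisted.internet import defer

    def verify_one_share(vrbp, vup, crypttext_hash_tree):
        # 1. fetch and check every share-hash-tree node this share carries
        d = vrbp.get_all_sharehashes()
        # 2. fetch and check the full block hash tree for this share
        d.addCallback(lambda ign: vrbp.get_all_blockhashes())
        # 3. fetch and check all crypttext hashes against the ciphertext root
        d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(crypttext_hash_tree))
        # 4. finally fetch and validate every block in the share
        def _get_blocks(ign):
            d2 = defer.succeed(None)
            for blocknum in range(vup.num_segments):
                d2.addCallback(lambda ign, b=blocknum: vrbp.get_block(b))
            return d2
        d.addCallback(_get_blocks)
        return d  # errbacks (e.g. with BadOrMissingHash) if anything fails
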
673hunk ./src/allmydata/immutable/checker.py 516
674             level = log.WEIRD
675             if f.check(DeadReferenceError):
676                 level = log.UNUSUAL
677-            self.log("failure from server on 'get_buckets' the REMOTE failure was:", facility="tahoe.immutable.checker", failure=f, level=level, umid="3uuBUQ")
678+            self.log("failure from server on 'get_buckets' the REMOTE failure was:",
679+                     facility="tahoe.immutable.checker",
680+                     failure=f, level=level, umid="AX7wZQ")
681             return ({}, serverid, False)
682 
683         d.addCallbacks(_wrap_results, _trap_errs)
684hunk ./src/allmydata/immutable/checker.py 579
685 
686         vcap = self._verifycap
687         b = layout.ReadBucketProxy(bucket, serverid, vcap.get_storage_index())
688-        veup = download.ValidatedExtendedURIProxy(b, vcap)
689+        veup = ValidatedExtendedURIProxy(b, vcap)
690         d = veup.start()
691 
692         def _got_ueb(vup):
693hunk ./src/allmydata/immutable/checker.py 586
694             share_hash_tree = IncompleteHashTree(vcap.total_shares)
695             share_hash_tree.set_hashes({0: vup.share_root_hash})
696 
697-            vrbp = download.ValidatedReadBucketProxy(sharenum, b,
698-                                                     share_hash_tree,
699-                                                     vup.num_segments,
700-                                                     vup.block_size,
701-                                                     vup.share_size)
702+            vrbp = ValidatedReadBucketProxy(sharenum, b,
703+                                            share_hash_tree,
704+                                            vup.num_segments,
705+                                            vup.block_size,
706+                                            vup.share_size)
707 
708             # note: normal download doesn't use get_all_sharehashes(),
709             # because it gets more data than necessary. We've discussed the
710hunk ./src/allmydata/immutable/checker.py 649
711                 return (False, sharenum, 'incompatible')
712             elif f.check(layout.LayoutInvalid,
713                          layout.RidiculouslyLargeURIExtensionBlock,
714-                         download.BadOrMissingHash,
715-                         download.BadURIExtensionHashValue):
716+                         BadOrMissingHash,
717+                         BadURIExtensionHashValue):
718                 return (False, sharenum, 'corrupt')
719 
720             # if it wasn't one of those reasons, re-raise the error
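
The hunk above sorts verification failures into verdicts: layout or version problems mark the share 'incompatible', while hash and UEB failures (raised by the classes this patch moves into checker.py) mark it 'corrupt'. A small sketch of the same Failure-classification idiom; note that the check yielding 'incompatible' is cut off by the hunk context, so ShareVersionIncompatible here is an assumption based on the exception trapped elsewhere in this patch:

    from allmydata.immutable import layout
    from allmydata.immutable.checker import BadOrMissingHash, \
         BadURIExtensionHashValue   # moved into checker.py by this patch

    def classify_failure(f, sharenum):
        # meant as an errback: d.addErrback(classify_failure, sharenum)
        if f.check(layout.ShareVersionIncompatible):        # assumed check
            return (False, sharenum, 'incompatible')
        elif f.check(layout.LayoutInvalid,
                     layout.RidiculouslyLargeURIExtensionBlock,
                     BadOrMissingHash, BadURIExtensionHashValue):
            return (False, sharenum, 'corrupt')
        return f   # anything else propagates to the next errback
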
721hunk ./src/allmydata/immutable/download.py 1
722-import random, weakref, itertools, time
723-from zope.interface import implements
724-from twisted.internet import defer, reactor
725-from twisted.internet.interfaces import IPushProducer, IConsumer
726-from foolscap.api import DeadReferenceError, RemoteException, eventually
727-
728-from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
729-from allmydata.util.assertutil import _assert, precondition
730-from allmydata import codec, hashtree, uri
731-from allmydata.interfaces import IDownloadTarget, IDownloader, IVerifierURI, \
732-     IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
733-     IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
734-     UnableToFetchCriticalDownloadDataError
735-from allmydata.immutable import layout
736-from allmydata.monitor import Monitor
737-from pycryptopp.cipher.aes import AES
738-
739-class IntegrityCheckReject(Exception):
740-    pass
741-
742-class BadURIExtensionHashValue(IntegrityCheckReject):
743-    pass
744-class BadURIExtension(IntegrityCheckReject):
745-    pass
746-class UnsupportedErasureCodec(BadURIExtension):
747-    pass
748-class BadCrypttextHashValue(IntegrityCheckReject):
749-    pass
750-class BadOrMissingHash(IntegrityCheckReject):
751-    pass
752-
753-class DownloadStopped(Exception):
754-    pass
755-
756-class DownloadResults:
757-    implements(IDownloadResults)
758-
759-    def __init__(self):
760-        self.servers_used = set()
761-        self.server_problems = {}
762-        self.servermap = {}
763-        self.timings = {}
764-        self.file_size = None
765-
766-class DecryptingTarget(log.PrefixingLogMixin):
767-    implements(IDownloadTarget, IConsumer)
768-    def __init__(self, target, key, _log_msg_id=None):
769-        precondition(IDownloadTarget.providedBy(target), target)
770-        self.target = target
771-        self._decryptor = AES(key)
772-        prefix = str(target)
773-        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
774-    # methods to satisfy the IConsumer interface
775-    def registerProducer(self, producer, streaming):
776-        if IConsumer.providedBy(self.target):
777-            self.target.registerProducer(producer, streaming)
778-    def unregisterProducer(self):
779-        if IConsumer.providedBy(self.target):
780-            self.target.unregisterProducer()
781-    def write(self, ciphertext):
782-        plaintext = self._decryptor.process(ciphertext)
783-        self.target.write(plaintext)
784-    def open(self, size):
785-        self.target.open(size)
786-    def close(self):
787-        self.target.close()
788-    def finish(self):
789-        return self.target.finish()
790-    # The following methods is just to pass through to the next target, and
791-    # just because that target might be a repairer.DownUpConnector, and just
792-    # because the current CHKUpload object expects to find the storage index
793-    # in its Uploadable.
794-    def set_storageindex(self, storageindex):
795-        self.target.set_storageindex(storageindex)
796-    def set_encodingparams(self, encodingparams):
797-        self.target.set_encodingparams(encodingparams)
798-
799-class ValidatedThingObtainer:
800-    def __init__(self, validatedthingproxies, debugname, log_id):
801-        self._validatedthingproxies = validatedthingproxies
802-        self._debugname = debugname
803-        self._log_id = log_id
804-
805-    def _bad(self, f, validatedthingproxy):
806-        f.trap(RemoteException, DeadReferenceError,
807-               IntegrityCheckReject, layout.LayoutInvalid,
808-               layout.ShareVersionIncompatible)
809-        level = log.WEIRD
810-        if f.check(DeadReferenceError):
811-            level = log.UNUSUAL
812-        elif f.check(RemoteException):
813-            level = log.WEIRD
814-        else:
815-            level = log.SCARY
816-        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
817-                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
818-                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
819-                failure=f, level=level, umid="JGXxBA")
820-        if not self._validatedthingproxies:
821-            raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
822-        # try again with a different one
823-        d = self._try_the_next_one()
824-        return d
825-
826-    def _try_the_next_one(self):
827-        vtp = self._validatedthingproxies.pop(0)
828-        # start() obtains, validates, and callsback-with the thing or else
829-        # errbacks
830-        d = vtp.start()
831-        d.addErrback(self._bad, vtp)
832-        return d
833-
834-    def start(self):
835-        return self._try_the_next_one()
836-
837-class ValidatedCrypttextHashTreeProxy:
838-    implements(IValidatedThingProxy)
839-    """ I am a front-end for a remote crypttext hash tree using a local
840-    ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
841-    Validated Thing protocol (i.e., I have a start() method that fires with
842-    self once I get a valid one)."""
843-    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments,
844-                 fetch_failures=None):
845-        # fetch_failures is for debugging -- see test_encode.py
846-        self._readbucketproxy = readbucketproxy
847-        self._num_segments = num_segments
848-        self._fetch_failures = fetch_failures
849-        self._crypttext_hash_tree = crypttext_hash_tree
850-
851-    def _validate(self, proposal):
852-        ct_hashes = dict(list(enumerate(proposal)))
853-        try:
854-            self._crypttext_hash_tree.set_hashes(ct_hashes)
855-        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
856-            if self._fetch_failures is not None:
857-                self._fetch_failures["crypttext_hash_tree"] += 1
858-            raise BadOrMissingHash(le)
859-        # If we now have enough of the crypttext hash tree to integrity-check
860-        # *any* segment of ciphertext, then we are done. TODO: It would have
861-        # better alacrity if we downloaded only part of the crypttext hash
862-        # tree at a time.
863-        for segnum in range(self._num_segments):
864-            if self._crypttext_hash_tree.needed_hashes(segnum):
865-                raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
866-        return self
867-
868-    def start(self):
869-        d = self._readbucketproxy.get_crypttext_hashes()
870-        d.addCallback(self._validate)
871-        return d
872-
873-class ValidatedExtendedURIProxy:
874-    implements(IValidatedThingProxy)
875-    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
876-    responsible for retrieving and validating the elements from the UEB."""
877-
878-    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
879-        # fetch_failures is for debugging -- see test_encode.py
880-        self._fetch_failures = fetch_failures
881-        self._readbucketproxy = readbucketproxy
882-        precondition(IVerifierURI.providedBy(verifycap), verifycap)
883-        self._verifycap = verifycap
884-
885-        # required
886-        self.segment_size = None
887-        self.crypttext_root_hash = None
888-        self.share_root_hash = None
889-
890-        # computed
891-        self.block_size = None
892-        self.share_size = None
893-        self.num_segments = None
894-        self.tail_data_size = None
895-        self.tail_segment_size = None
896-
897-        # optional
898-        self.crypttext_hash = None
899-
900-    def __str__(self):
901-        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
902-
903-    def _check_integrity(self, data):
904-        h = hashutil.uri_extension_hash(data)
905-        if h != self._verifycap.uri_extension_hash:
906-            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
907-                   (self._readbucketproxy,
908-                    base32.b2a(self._verifycap.uri_extension_hash),
909-                    base32.b2a(h)))
910-            if self._fetch_failures is not None:
911-                self._fetch_failures["uri_extension"] += 1
912-            raise BadURIExtensionHashValue(msg)
913-        else:
914-            return data
915-
916-    def _parse_and_validate(self, data):
917-        self.share_size = mathutil.div_ceil(self._verifycap.size,
918-                                            self._verifycap.needed_shares)
919-
920-        d = uri.unpack_extension(data)
921-
922-        # There are several kinds of things that can be found in a UEB.
923-        # First, things that we really need to learn from the UEB in order to
924-        # do this download. Next: things which are optional but not redundant
925-        # -- if they are present in the UEB they will get used. Next, things
926-        # that are optional and redundant. These things are required to be
927-        # consistent: they don't have to be in the UEB, but if they are in
928-        # the UEB then they will be checked for consistency with the
929-        # already-known facts, and if they are inconsistent then an exception
930-        # will be raised. These things aren't actually used -- they are just
931-        # tested for consistency and ignored. Finally: things which are
932-        # deprecated -- they ought not be in the UEB at all, and if they are
933-        # present then a warning will be logged but they are otherwise
934-        # ignored.
935-
936-        # First, things that we really need to learn from the UEB:
937-        # segment_size, crypttext_root_hash, and share_root_hash.
938-        self.segment_size = d['segment_size']
939-
940-        self.block_size = mathutil.div_ceil(self.segment_size,
941-                                            self._verifycap.needed_shares)
942-        self.num_segments = mathutil.div_ceil(self._verifycap.size,
943-                                              self.segment_size)
944-
945-        self.tail_data_size = self._verifycap.size % self.segment_size
946-        if not self.tail_data_size:
947-            self.tail_data_size = self.segment_size
948-        # padding for erasure code
949-        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
950-                                                        self._verifycap.needed_shares)
951-
952-        # Ciphertext hash tree root is mandatory, so that there is at most
953-        # one ciphertext that matches this read-cap or verify-cap. The
954-        # integrity check on the shares is not sufficient to prevent the
955-        # original encoder from creating some shares of file A and other
956-        # shares of file B.
957-        self.crypttext_root_hash = d['crypttext_root_hash']
958-
959-        self.share_root_hash = d['share_root_hash']
960-
961-
962-        # Next: things that are optional and not redundant: crypttext_hash
963-        if d.has_key('crypttext_hash'):
964-            self.crypttext_hash = d['crypttext_hash']
965-            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
966-                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
967-
968-
969-        # Next: things that are optional, redundant, and required to be
970-        # consistent: codec_name, codec_params, tail_codec_params,
971-        # num_segments, size, needed_shares, total_shares
972-        if d.has_key('codec_name'):
973-            if d['codec_name'] != "crs":
974-                raise UnsupportedErasureCodec(d['codec_name'])
975-
976-        if d.has_key('codec_params'):
977-            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
978-            if ucpss != self.segment_size:
979-                raise BadURIExtension("inconsistent erasure code params: "
980-                                      "ucpss: %s != self.segment_size: %s" %
981-                                      (ucpss, self.segment_size))
982-            if ucpns != self._verifycap.needed_shares:
983-                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
984-                                      "self._verifycap.needed_shares: %s" %
985-                                      (ucpns, self._verifycap.needed_shares))
986-            if ucpts != self._verifycap.total_shares:
987-                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
988-                                      "self._verifycap.total_shares: %s" %
989-                                      (ucpts, self._verifycap.total_shares))
990-
991-        if d.has_key('tail_codec_params'):
992-            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
993-            if utcpss != self.tail_segment_size:
994-                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
995-                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
996-                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
997-                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
998-                                         self.segment_size, self._verifycap.needed_shares))
999-            if utcpns != self._verifycap.needed_shares:
1000-                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
1001-                                      "self._verifycap.needed_shares: %s" % (utcpns,
1002-                                                                             self._verifycap.needed_shares))
1003-            if utcpts != self._verifycap.total_shares:
1004-                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
1005-                                      "self._verifycap.total_shares: %s" % (utcpts,
1006-                                                                            self._verifycap.total_shares))
1007-
1008-        if d.has_key('num_segments'):
1009-            if d['num_segments'] != self.num_segments:
1010-                raise BadURIExtension("inconsistent num_segments: size: %s, "
1011-                                      "segment_size: %s, computed_num_segments: %s, "
1012-                                      "ueb_num_segments: %s" % (self._verifycap.size,
1013-                                                                self.segment_size,
1014-                                                                self.num_segments, d['num_segments']))
1015-
1016-        if d.has_key('size'):
1017-            if d['size'] != self._verifycap.size:
1018-                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
1019-                                      (self._verifycap.size, d['size']))
1020-
1021-        if d.has_key('needed_shares'):
1022-            if d['needed_shares'] != self._verifycap.needed_shares:
1023-                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
1024-                                      "needed shares: %s" % (self._verifycap.total_shares,
1025-                                                             d['needed_shares']))
1026-
1027-        if d.has_key('total_shares'):
1028-            if d['total_shares'] != self._verifycap.total_shares:
1029-                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
1030-                                      "total shares: %s" % (self._verifycap.total_shares,
1031-                                                            d['total_shares']))
1032-
1033-        # Finally, things that are deprecated and ignored: plaintext_hash,
1034-        # plaintext_root_hash
1035-        if d.get('plaintext_hash'):
1036-            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
1037-                    "and is no longer used.  Ignoring.  %s" % (self,))
1038-        if d.get('plaintext_root_hash'):
1039-            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
1040-                    "reasons and is no longer used.  Ignoring.  %s" % (self,))
1041-
1042-        return self
1043-
1044-    def start(self):
1045-        """Fetch the UEB from bucket, compare its hash to the hash from
1046-        verifycap, then parse it. Returns a deferred which is called back
1047-        with self once the fetch is successful, or is erred back if it
1048-        fails."""
1049-        d = self._readbucketproxy.get_uri_extension()
1050-        d.addCallback(self._check_integrity)
1051-        d.addCallback(self._parse_and_validate)
1052-        return d
1053-
1054-class ValidatedReadBucketProxy(log.PrefixingLogMixin):
1055-    """I am a front-end for a remote storage bucket, responsible for
1056-    retrieving and validating data from that bucket.
1057-
1058-    My get_block() method is used by BlockDownloaders.
1059-    """
1060-
1061-    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
1062-                 block_size, share_size):
1063-        """ share_hash_tree is required to have already been initialized with
1064-        the root hash (the number-0 hash), using the share_root_hash from the
1065-        UEB"""
1066-        precondition(share_hash_tree[0] is not None, share_hash_tree)
1067-        prefix = "%d-%s-%s" % (sharenum, bucket,
1068-                               base32.b2a_l(share_hash_tree[0][:8], 60))
1069-        log.PrefixingLogMixin.__init__(self,
1070-                                       facility="tahoe.immutable.download",
1071-                                       prefix=prefix)
1072-        self.sharenum = sharenum
1073-        self.bucket = bucket
1074-        self.share_hash_tree = share_hash_tree
1075-        self.num_blocks = num_blocks
1076-        self.block_size = block_size
1077-        self.share_size = share_size
1078-        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
1079-
1080-    def get_all_sharehashes(self):
1081-        """Retrieve and validate all the share-hash-tree nodes that are
1082-        included in this share, regardless of whether we need them to
1083-        validate the share or not. Each share contains a minimal Merkle tree
1084-        chain, but there is lots of overlap, so usually we'll be using hashes
1085-        from other shares and not reading every single hash from this share.
1086-        The Verifier uses this function to read and validate every single
1087-        hash from this share.
1088-
1089-        Call this (and wait for the Deferred it returns to fire) before
1090-        calling get_block() for the first time: this lets us check that the
1091-        share share contains enough hashes to validate its own data, and
1092-        avoids downloading any share hash twice.
1093-
1094-        I return a Deferred which errbacks upon failure, probably with
1095-        BadOrMissingHash."""
1096-
1097-        d = self.bucket.get_share_hashes()
1098-        def _got_share_hashes(sh):
1099-            sharehashes = dict(sh)
1100-            try:
1101-                self.share_hash_tree.set_hashes(sharehashes)
1102-            except IndexError, le:
1103-                raise BadOrMissingHash(le)
1104-            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
1105-                raise BadOrMissingHash(le)
1106-        d.addCallback(_got_share_hashes)
1107-        return d
1108-
1109-    def get_all_blockhashes(self):
1110-        """Retrieve and validate all the block-hash-tree nodes that are
1111-        included in this share. Each share contains a full Merkle tree, but
1112-        we usually only fetch the minimal subset necessary for any particular
1113-        block. This function fetches everything at once. The Verifier uses
1114-        this function to validate the block hash tree.
1115-
1116-        Call this (and wait for the Deferred it returns to fire) after
1117-        calling get_all_sharehashes() and before calling get_block() for the
1118-        first time: this lets us check that the share contains all block
1119-        hashes and avoids downloading them multiple times.
1120-
1121-        I return a Deferred which errbacks upon failure, probably with
1122-        BadOrMissingHash.
1123-        """
1124-
1125-        # get_block_hashes(anything) currently always returns everything
1126-        needed = list(range(len(self.block_hash_tree)))
1127-        d = self.bucket.get_block_hashes(needed)
1128-        def _got_block_hashes(blockhashes):
1129-            if len(blockhashes) < len(self.block_hash_tree):
1130-                raise BadOrMissingHash()
1131-            bh = dict(enumerate(blockhashes))
1132-
1133-            try:
1134-                self.block_hash_tree.set_hashes(bh)
1135-            except IndexError, le:
1136-                raise BadOrMissingHash(le)
1137-            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
1138-                raise BadOrMissingHash(le)
1139-        d.addCallback(_got_block_hashes)
1140-        return d
1141-
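As the two docstrings above note, a Verifier must call get_all_sharehashes() first, then get_all_blockhashes(), and only then fetch blocks. The following is a minimal sketch of that call ordering, assuming 'vrbp' is a ValidatedReadBucketProxy and 'num_blocks' is its block count; it is illustrative code, not part of the patch:

    from twisted.internet import defer

    def verify_every_block(vrbp, num_blocks):
        # fetch and check all share-hash-tree nodes held by this share
        d = vrbp.get_all_sharehashes()
        # then fetch and check the full block hash tree
        d.addCallback(lambda ign: vrbp.get_all_blockhashes())
        # finally read every block; each one is validated inside get_block()
        def _get_blocks(ign):
            d2 = defer.succeed(None)
            for blocknum in range(num_blocks):
                d2.addCallback(lambda ign, b=blocknum: vrbp.get_block(b))
            return d2
        d.addCallback(_get_blocks)
        # any BadOrMissingHash propagates to the caller as an errback
        return d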
1142-    def get_all_crypttext_hashes(self, crypttext_hash_tree):
1143-        """Retrieve and validate all the crypttext-hash-tree nodes that are
1144-        in this share. Normally we don't look at these at all: the download
1145-        process fetches them incrementally as needed to validate each segment
1146-        of ciphertext. But this is a convenient place to give the Verifier a
1147-        function to validate all of these at once.
1148-
1149-        Call this with a new hashtree object for each share, initialized with
1150-        the crypttext hash tree root. I return a Deferred which errbacks upon
1151-        failure, probably with BadOrMissingHash.
1152-        """
1153-
1154-        # get_crypttext_hashes() always returns everything
1155-        d = self.bucket.get_crypttext_hashes()
1156-        def _got_crypttext_hashes(hashes):
1157-            if len(hashes) < len(crypttext_hash_tree):
1158-                raise BadOrMissingHash()
1159-            ct_hashes = dict(enumerate(hashes))
1160-            try:
1161-                crypttext_hash_tree.set_hashes(ct_hashes)
1162-            except IndexError, le:
1163-                raise BadOrMissingHash(le)
1164-            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
1165-                raise BadOrMissingHash(le)
1166-        d.addCallback(_got_crypttext_hashes)
1167-        return d
1168-
1169-    def get_block(self, blocknum):
1170-        # the first time we use this bucket, we need to fetch enough elements
1171-        # of the share hash tree to validate it from our share hash up to the
1172-        # hashroot.
1173-        if self.share_hash_tree.needed_hashes(self.sharenum):
1174-            d1 = self.bucket.get_share_hashes()
1175-        else:
1176-            d1 = defer.succeed([])
1177-
1178-        # We might need to grab some elements of our block hash tree, to
1179-        # validate the requested block up to the share hash.
1180-        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
1181-        # We don't need the root of the block hash tree, as that comes in the
1182-        # share tree.
1183-        blockhashesneeded.discard(0)
1184-        d2 = self.bucket.get_block_hashes(blockhashesneeded)
1185-
1186-        if blocknum < self.num_blocks-1:
1187-            thisblocksize = self.block_size
1188-        else:
1189-            thisblocksize = self.share_size % self.block_size
1190-            if thisblocksize == 0:
1191-                thisblocksize = self.block_size
1192-        d3 = self.bucket.get_block_data(blocknum,
1193-                                        self.block_size, thisblocksize)
1194-
1195-        dl = deferredutil.gatherResults([d1, d2, d3])
1196-        dl.addCallback(self._got_data, blocknum)
1197-        return dl
1198-
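The tail-block computation in get_block() above is worth a worked example; this sketch restates it as a standalone (hypothetical) helper:

    def block_read_size(blocknum, num_blocks, block_size, share_size):
        # every block is block_size bytes except possibly the last, which
        # holds whatever is left of the share; if the share divides evenly,
        # the last block is a full block rather than an empty one
        if blocknum < num_blocks - 1:
            return block_size
        tail = share_size % block_size
        if tail == 0:
            return block_size
        return tail

    # share_size=1000, block_size=300, num_blocks=4: blocks 0-2 are 300
    # bytes each and block 3 is 1000 % 300 = 100 bytes
    assert block_read_size(3, 4, 300, 1000) == 100
    # share_size=900 divides evenly, so the last block is a full 300 bytes
    assert block_read_size(2, 3, 300, 900) == 300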
1199-    def _got_data(self, results, blocknum):
1200-        precondition(blocknum < self.num_blocks,
1201-                     self, blocknum, self.num_blocks)
1202-        sharehashes, blockhashes, blockdata = results
1203-        try:
1204-            sharehashes = dict(sharehashes)
1205-        except ValueError, le:
1206-            le.args = tuple(le.args + (sharehashes,))
1207-            raise
1208-        blockhashes = dict(enumerate(blockhashes))
1209-
1210-        candidate_share_hash = None # in case we log it in the except block below
1211-        blockhash = None # in case we log it in the except block below
1212-
1213-        try:
1214-            if self.share_hash_tree.needed_hashes(self.sharenum):
1215-                # This will raise exception if the values being passed do not
1216-                # match the root node of self.share_hash_tree.
1217-                try:
1218-                    self.share_hash_tree.set_hashes(sharehashes)
1219-                except IndexError, le:
1220-                    # Weird -- sharehashes contained index numbers outside of
1221-                    # the range that fit into this hash tree.
1222-                    raise BadOrMissingHash(le)
1223-
1224-            # To validate a block we need the root of the block hash tree,
1225-            # which is also one of the leaves of the share hash tree, and is
1226-            # called "the share hash".
1227-            if not self.block_hash_tree[0]: # empty -- no root node yet
1228-                # Get the share hash from the share hash tree.
1229-                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
1230-                if not share_hash:
1231-                    # No root node in block_hash_tree and also the share hash
1232-                    # wasn't sent by the server.
1233-                    raise hashtree.NotEnoughHashesError
1234-                self.block_hash_tree.set_hashes({0: share_hash})
1235-
1236-            if self.block_hash_tree.needed_hashes(blocknum):
1237-                self.block_hash_tree.set_hashes(blockhashes)
1238-
1239-            blockhash = hashutil.block_hash(blockdata)
1240-            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
1241-            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
1242-            #        "%r .. %r: %s" %
1243-            #        (self.sharenum, blocknum, len(blockdata),
1244-            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
1245-
1246-        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
1247-            # log.WEIRD: indicates undetected disk/network error, or more
1248-            # likely a programming error
1249-            self.log("hash failure in block=%d, shnum=%d on %s" %
1250-                    (blocknum, self.sharenum, self.bucket))
1251-            if self.block_hash_tree.needed_hashes(blocknum):
1252-                self.log(""" failure occurred when checking the block_hash_tree.
1253-                This suggests that either the block data was bad, or that the
1254-                block hashes we received along with it were bad.""")
1255-            else:
1256-                self.log(""" the failure probably occurred when checking the
1257-                share_hash_tree, which suggests that the share hashes we
1258-                received from the remote peer were bad.""")
1259-            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
1260-            self.log(" block length: %d" % len(blockdata))
1261-            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
1262-            if len(blockdata) < 100:
1263-                self.log(" block data: %r" % (blockdata,))
1264-            else:
1265-                self.log(" block data start/end: %r .. %r" %
1266-                        (blockdata[:50], blockdata[-50:]))
1267-            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
1268-            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
1269-            lines = []
1270-            for i,h in sorted(sharehashes.items()):
1271-                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
1272-            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
1273-            lines = []
1274-            for i,h in blockhashes.items():
1275-                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
1276-            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
1277-            raise BadOrMissingHash(le)
1278-
1279-        # If we made it here, the block is good. If the hash trees didn't
1280-        # like what they saw, they would have raised a BadHashError, causing
1281-        # our caller to see a Failure and thus ignore this block (as well as
1282-        # dropping this bucket).
1283-        return blockdata
1284-
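The validation dance in _got_data() above (share hash chain up to the root, then the block hash tree whose root is the share hash, then the block itself) is easier to see stripped down. The following is a simplified, self-contained illustration using plain hashlib; it is not the allmydata.hashtree implementation, which uses tagged SHA-256d hashes and arbitrary tree sizes:

    import hashlib

    def h(*parts):
        return hashlib.sha256("".join(parts)).digest()

    # a two-leaf Merkle tree: root = h(h(leaf0) + h(leaf1))
    leaf0, leaf1 = "block zero", "block one"
    trusted_root = h(h(leaf0), h(leaf1))

    def validate_leaf0(data, sibling_hash, root):
        # to accept 'data' we only need the sibling hash and the trusted
        # root: recompute the chain and compare
        return h(h(data), sibling_hash) == root

    assert validate_leaf0("block zero", h(leaf1), trusted_root)
    assert not validate_leaf0("tampered", h(leaf1), trusted_root)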
1285-
1286-
1287-class BlockDownloader(log.PrefixingLogMixin):
1288-    """I am responsible for downloading a single block (from a single bucket)
1289-    for a single segment.
1290-
1291-    I am a child of the SegmentDownloader.
1292-    """
1293-
1294-    def __init__(self, vbucket, blocknum, parent, results):
1295-        precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
1296-        prefix = "%s-%d" % (vbucket, blocknum)
1297-        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
1298-        self.vbucket = vbucket
1299-        self.blocknum = blocknum
1300-        self.parent = parent
1301-        self.results = results
1302-
1303-    def start(self, segnum):
1304-        self.log("get_block(segnum=%d)" % segnum)
1305-        started = time.time()
1306-        d = self.vbucket.get_block(segnum)
1307-        d.addCallbacks(self._hold_block, self._got_block_error,
1308-                       callbackArgs=(started,))
1309-        return d
1310-
1311-    def _hold_block(self, data, started):
1312-        if self.results:
1313-            elapsed = time.time() - started
1314-            peerid = self.vbucket.bucket.get_peerid()
1315-            if peerid not in self.results.timings["fetch_per_server"]:
1316-                self.results.timings["fetch_per_server"][peerid] = []
1317-            self.results.timings["fetch_per_server"][peerid].append(elapsed)
1318-        self.log("got block")
1319-        self.parent.hold_block(self.blocknum, data)
1320-
1321-    def _got_block_error(self, f):
1322-        f.trap(RemoteException, DeadReferenceError,
1323-               IntegrityCheckReject, layout.LayoutInvalid,
1324-               layout.ShareVersionIncompatible)
1325-        if f.check(RemoteException, DeadReferenceError):
1326-            level = log.UNUSUAL
1327-        else:
1328-            level = log.WEIRD
1329-        self.log("failure to get block", level=level, umid="5Z4uHQ")
1330-        if self.results:
1331-            peerid = self.vbucket.bucket.get_peerid()
1332-            self.results.server_problems[peerid] = str(f)
1333-        self.parent.bucket_failed(self.vbucket)
1334-
1335-class SegmentDownloader:
1336-    """I am responsible for downloading all the blocks for a single segment
1337-    of data.
1338-
1339-    I am a child of the CiphertextDownloader.
1340-    """
1341-
1342-    def __init__(self, parent, segmentnumber, needed_shares, results):
1343-        self.parent = parent
1344-        self.segmentnumber = segmentnumber
1345-        self.needed_blocks = needed_shares
1346-        self.blocks = {} # k: blocknum, v: data
1347-        self.results = results
1348-        self._log_number = self.parent.log("starting segment %d" %
1349-                                           segmentnumber)
1350-
1351-    def log(self, *args, **kwargs):
1352-        if "parent" not in kwargs:
1353-            kwargs["parent"] = self._log_number
1354-        return self.parent.log(*args, **kwargs)
1355-
1356-    def start(self):
1357-        return self._download()
1358-
1359-    def _download(self):
1360-        d = self._try()
1361-        def _done(res):
1362-            if len(self.blocks) >= self.needed_blocks:
1363-                # we only need self.needed_blocks blocks
1364-                # we want to get the smallest blockids, because they are
1365-                # more likely to be fast "primary blocks"
1366-                blockids = sorted(self.blocks.keys())[:self.needed_blocks]
1367-                blocks = []
1368-                for blocknum in blockids:
1369-                    blocks.append(self.blocks[blocknum])
1370-                return (blocks, blockids)
1371-            else:
1372-                return self._download()
1373-        d.addCallback(_done)
1374-        return d
1375-
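A tiny worked example of the selection in _done() above, with hypothetical values: given needed_blocks=3 and blocks collected for ids {0, 2, 5, 6}, the three smallest ids are kept because they are more likely to be fast "primary blocks":

    blocks = {0: "a", 2: "b", 5: "c", 6: "d"}   # blocknum -> data
    needed_blocks = 3
    blockids = sorted(blocks.keys())[:needed_blocks]   # [0, 2, 5]
    selected = [blocks[i] for i in blockids]            # ["a", "b", "c"]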
1376-    def _try(self):
1377-        # fill our set of active buckets, maybe raising NotEnoughSharesError
1378-        active_buckets = self.parent._activate_enough_buckets()
1379-        # Now we have enough buckets, in self.parent.active_buckets.
1380-
1381-        # in test cases, bd.start might mutate active_buckets right away, so
1382-        # we need to put off calling start() until we've iterated all the way
1383-        # through it.
1384-        downloaders = []
1385-        for blocknum, vbucket in active_buckets.iteritems():
1386-            assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
1387-            bd = BlockDownloader(vbucket, blocknum, self, self.results)
1388-            downloaders.append(bd)
1389-            if self.results:
1390-                self.results.servers_used.add(vbucket.bucket.get_peerid())
1391-        l = [bd.start(self.segmentnumber) for bd in downloaders]
1392-        return defer.DeferredList(l, fireOnOneErrback=True)
1393-
1394-    def hold_block(self, blocknum, data):
1395-        self.blocks[blocknum] = data
1396-
1397-    def bucket_failed(self, vbucket):
1398-        self.parent.bucket_failed(vbucket)
1399-
1400-class DownloadStatus:
1401-    implements(IDownloadStatus)
1402-    statusid_counter = itertools.count(0)
1403-
1404-    def __init__(self):
1405-        self.storage_index = None
1406-        self.size = None
1407-        self.helper = False
1408-        self.status = "Not started"
1409-        self.progress = 0.0
1410-        self.paused = False
1411-        self.stopped = False
1412-        self.active = True
1413-        self.results = None
1414-        self.counter = self.statusid_counter.next()
1415-        self.started = time.time()
1416-
1417-    def get_started(self):
1418-        return self.started
1419-    def get_storage_index(self):
1420-        return self.storage_index
1421-    def get_size(self):
1422-        return self.size
1423-    def using_helper(self):
1424-        return self.helper
1425-    def get_status(self):
1426-        status = self.status
1427-        if self.paused:
1428-            status += " (output paused)"
1429-        if self.stopped:
1430-            status += " (output stopped)"
1431-        return status
1432-    def get_progress(self):
1433-        return self.progress
1434-    def get_active(self):
1435-        return self.active
1436-    def get_results(self):
1437-        return self.results
1438-    def get_counter(self):
1439-        return self.counter
1440-
1441-    def set_storage_index(self, si):
1442-        self.storage_index = si
1443-    def set_size(self, size):
1444-        self.size = size
1445-    def set_helper(self, helper):
1446-        self.helper = helper
1447-    def set_status(self, status):
1448-        self.status = status
1449-    def set_paused(self, paused):
1450-        self.paused = paused
1451-    def set_stopped(self, stopped):
1452-        self.stopped = stopped
1453-    def set_progress(self, value):
1454-        self.progress = value
1455-    def set_active(self, value):
1456-        self.active = value
1457-    def set_results(self, value):
1458-        self.results = value
1459-
1460-class CiphertextDownloader(log.PrefixingLogMixin):
1461-    """ I download shares, check their integrity, then decode them, check the
1462-    integrity of the resulting ciphertext, and write it to my target.
1463-    Before I send any new request to a server, I always ask the 'monitor'
1464-    object that was passed into my constructor whether this task has been
1465-    cancelled (by invoking its raise_if_cancelled() method)."""
1466-    implements(IPushProducer)
1467-    _status = None
1468-
1469-    def __init__(self, storage_broker, v, target, monitor):
1470-
1471-        precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
1472-        precondition(IVerifierURI.providedBy(v), v)
1473-        precondition(IDownloadTarget.providedBy(target), target)
1474-
1475-        self._storage_broker = storage_broker
1476-        self._verifycap = v
1477-        self._storage_index = v.get_storage_index()
1478-        self._uri_extension_hash = v.uri_extension_hash
1479-
1480-        prefix=base32.b2a_l(self._storage_index[:8], 60)
1481-        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
1482-
1483-        self._started = time.time()
1484-        self._status = s = DownloadStatus()
1485-        s.set_status("Starting")
1486-        s.set_storage_index(self._storage_index)
1487-        s.set_size(self._verifycap.size)
1488-        s.set_helper(False)
1489-        s.set_active(True)
1490-
1491-        self._results = DownloadResults()
1492-        s.set_results(self._results)
1493-        self._results.file_size = self._verifycap.size
1494-        self._results.timings["servers_peer_selection"] = {}
1495-        self._results.timings["fetch_per_server"] = {}
1496-        self._results.timings["cumulative_fetch"] = 0.0
1497-        self._results.timings["cumulative_decode"] = 0.0
1498-        self._results.timings["cumulative_decrypt"] = 0.0
1499-        self._results.timings["paused"] = 0.0
1500-
1501-        self._paused = False
1502-        self._stopped = False
1503-        if IConsumer.providedBy(target):
1504-            target.registerProducer(self, True)
1505-        self._target = target
1506-        # Repairer (uploader) needs the storageindex.
1507-        self._target.set_storageindex(self._storage_index)
1508-        self._monitor = monitor
1509-        self._opened = False
1510-
1511-        self.active_buckets = {} # k: shnum, v: bucket
1512-        self._share_buckets = {} # k: sharenum, v: list of buckets
1513-
1514-        # _download_all_segments() will set this to:
1515-        # self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
1516-        self._share_vbuckets = None
1517-
1518-        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
1519-
1520-        self._ciphertext_hasher = hashutil.crypttext_hasher()
1521-
1522-        self._bytes_done = 0
1523-        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
1524-
1525-        # _got_uri_extension() will create the following:
1526-        # self._crypttext_hash_tree
1527-        # self._share_hash_tree
1528-        # self._current_segnum = 0
1529-        # self._vup # ValidatedExtendedURIProxy
1530-
1531-        # _get_all_shareholders() will create the following:
1532-        # self._total_queries
1533-        # self._responses_received = 0
1534-        # self._queries_failed = 0
1535-
1536-        # This is solely for the use of unit tests. It will be triggered when
1537-        # we start downloading shares.
1538-        self._stage_4_d = defer.Deferred()
1539-
1540-    def pauseProducing(self):
1541-        if self._paused:
1542-            return
1543-        self._paused = defer.Deferred()
1544-        self._paused_at = time.time()
1545-        if self._status:
1546-            self._status.set_paused(True)
1547-
1548-    def resumeProducing(self):
1549-        if self._paused:
1550-            paused_for = time.time() - self._paused_at
1551-            self._results.timings['paused'] += paused_for
1552-            p = self._paused
1553-            self._paused = None
1554-            eventually(p.callback, None)
1555-            if self._status:
1556-                self._status.set_paused(False)
1557-
1558-    def stopProducing(self):
1559-        self.log("Download.stopProducing")
1560-        self._stopped = True
1561-        self.resumeProducing()
1562-        if self._status:
1563-            self._status.set_stopped(True)
1564-            self._status.set_active(False)
1565-
1566-    def start(self):
1567-        self.log("starting download")
1568-
1569-        # first step: who should we download from?
1570-        d = defer.maybeDeferred(self._get_all_shareholders)
1571-        d.addBoth(self._got_all_shareholders)
1572-        # now get the uri_extension block from somebody and integrity check
1573-        # it and parse and validate its contents
1574-        d.addCallback(self._obtain_uri_extension)
1575-        d.addCallback(self._get_crypttext_hash_tree)
1576-        # once we know that, we can download blocks from everybody
1577-        d.addCallback(self._download_all_segments)
1578-        def _finished(res):
1579-            if self._status:
1580-                self._status.set_status("Finished")
1581-                self._status.set_active(False)
1582-                self._status.set_paused(False)
1583-            if IConsumer.providedBy(self._target):
1584-                self._target.unregisterProducer()
1585-            return res
1586-        d.addBoth(_finished)
1587-        def _failed(why):
1588-            if self._status:
1589-                self._status.set_status("Failed")
1590-                self._status.set_active(False)
1591-            if why.check(DownloadStopped):
1592-                # DownloadStopped just means the consumer aborted the
1593-                # download; not so scary.
1594-                self.log("download stopped", level=log.UNUSUAL)
1595-            else:
1596-                # This is really unusual, and deserves maximum forensics.
1597-                self.log("download failed!", failure=why, level=log.SCARY,
1598-                         umid="lp1vaQ")
1599-            return why
1600-        d.addErrback(_failed)
1601-        d.addCallback(self._done)
1602-        return d
1603-
1604-    def _get_all_shareholders(self):
1605-        """ Once the number of buckets that I know about is >= K, I
1606-        callback the Deferred that I return.
1607-
1608-        If all of the get_buckets deferreds have fired (whether callback
1609-        or errback) and I still don't have enough buckets then I'll also
1610-        callback -- not errback -- the Deferred that I return.
1611-        """
1612-        wait_for_enough_buckets_d = defer.Deferred()
1613-        self._wait_for_enough_buckets_d = wait_for_enough_buckets_d
1614-
1615-        sb = self._storage_broker
1616-        servers = sb.get_servers_for_index(self._storage_index)
1617-        if not servers:
1618-            raise NoServersError("broker gave us no servers!")
1619-
1620-        self._total_queries = len(servers)
1621-        self._responses_received = 0
1622-        self._queries_failed = 0
1623-        for (peerid,ss) in servers:
1624-            self.log(format="sending DYHB to [%(peerid)s]",
1625-                     peerid=idlib.shortnodeid_b2a(peerid),
1626-                     level=log.NOISY, umid="rT03hg")
1627-            d = ss.callRemote("get_buckets", self._storage_index)
1628-            d.addCallbacks(self._got_response, self._got_error,
1629-                           callbackArgs=(peerid,))
1630-            d.addBoth(self._check_got_all_responses)
1631-
1632-        if self._status:
1633-            self._status.set_status("Locating Shares (%d/%d)" %
1634-                                    (self._responses_received,
1635-                                     self._total_queries))
1636-        return wait_for_enough_buckets_d
1637-
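The method above is a "fire on K successes, or on completion of all N queries" pattern built around a single Deferred. Here is a generic sketch of that pattern, detached from this class; it is simplified in that it counts successful responses rather than distinct share buckets, and all names are illustrative:

    from twisted.internet import defer, reactor

    def wait_for_k_of_n(query_deferreds, k):
        done = defer.Deferred()
        state = {"successes": 0, "completed": 0, "fired": False}
        def _fire(result):
            if not state["fired"]:
                state["fired"] = True
                reactor.callLater(0, done.callback, result)
        def _succeeded(res):
            state["successes"] += 1
            if state["successes"] >= k:
                _fire(True)    # enough responses: fire with True
        def _completed(ign):
            state["completed"] += 1
            if state["completed"] == len(query_deferreds):
                _fire(False)   # every query finished without reaching k
        for d in query_deferreds:
            d.addCallback(_succeeded)
            d.addErrback(lambda f: None)   # failures are counted, not fatal
            d.addBoth(_completed)
        return done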
1638-    def _check_got_all_responses(self, ignored=None):
1639-        assert (self._responses_received+self._queries_failed) <= self._total_queries
1640-        if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._total_queries:
1641-            reactor.callLater(0, self._wait_for_enough_buckets_d.callback, False)
1642-            self._wait_for_enough_buckets_d = None
1643-
1644-    def _got_response(self, buckets, peerid):
1645-        # Note that this can continue to receive responses after _wait_for_enough_buckets_d
1646-        # has fired.
1647-        self._responses_received += 1
1648-        self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
1649-                 peerid=idlib.shortnodeid_b2a(peerid),
1650-                 shnums=sorted(buckets.keys()),
1651-                 level=log.NOISY, umid="o4uwFg")
1652-        if self._results:
1653-            elapsed = time.time() - self._started
1654-            self._results.timings["servers_peer_selection"][peerid] = elapsed
1655-        if self._status:
1656-            self._status.set_status("Locating Shares (%d/%d)" %
1657-                                    (self._responses_received,
1658-                                     self._total_queries))
1659-        for sharenum, bucket in buckets.iteritems():
1660-            b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
1661-            self.add_share_bucket(sharenum, b)
1662-            # If we just got enough buckets for the first time, then fire the
1663-            # deferred. Then remove it from self so that we don't fire it
1664-            # again.
1665-            if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares:
1666-                reactor.callLater(0, self._wait_for_enough_buckets_d.callback, True)
1667-                self._wait_for_enough_buckets_d = None
1668-
1669-            if self._share_vbuckets is not None:
1670-                vbucket = ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
1671-                self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
1672-
1673-            if self._results:
1674-                if peerid not in self._results.servermap:
1675-                    self._results.servermap[peerid] = set()
1676-                self._results.servermap[peerid].add(sharenum)
1677-
1678-    def add_share_bucket(self, sharenum, bucket):
1679-        # this is split out for the benefit of test_encode.py
1680-        self._share_buckets.setdefault(sharenum, []).append(bucket)
1681-
1682-    def _got_error(self, f):
1683-        self._queries_failed += 1
1684-        level = log.WEIRD
1685-        if f.check(DeadReferenceError):
1686-            level = log.UNUSUAL
1687-        self.log("Error during get_buckets", failure=f, level=level,
1688-                         umid="3uuBUQ")
1689-
1690-    def bucket_failed(self, vbucket):
1691-        shnum = vbucket.sharenum
1692-        del self.active_buckets[shnum]
1693-        s = self._share_vbuckets[shnum]
1694-        # s is a set of ValidatedReadBucketProxy instances
1695-        s.remove(vbucket)
1696-        # ... which might now be empty
1697-        if not s:
1698-            # there are no more buckets which can provide this share, so
1699-            # remove the key. This may prompt us to use a different share.
1700-            del self._share_vbuckets[shnum]
1701-
1702-    def _got_all_shareholders(self, res):
1703-        if self._results:
1704-            now = time.time()
1705-            self._results.timings["peer_selection"] = now - self._started
1706-
1707-        if len(self._share_buckets) < self._verifycap.needed_shares:
1708-            msg = "Failed to get enough shareholders: have %d, need %d" \
1709-                  % (len(self._share_buckets), self._verifycap.needed_shares)
1710-            if self._share_buckets:
1711-                raise NotEnoughSharesError(msg)
1712-            else:
1713-                raise NoSharesError(msg)
1714-
1715-        #for s in self._share_vbuckets.values():
1716-        #    for vb in s:
1717-        #        assert isinstance(vb, ValidatedReadBucketProxy), \
1718-        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
1719-
1720-    def _obtain_uri_extension(self, ignored):
1721-        # all shareholders are supposed to have a copy of uri_extension, and
1722-        # all are supposed to be identical. We compute the hash of the data
1723-        # that comes back, and compare it against the version in our URI. If
1724-        # they don't match, ignore their data and try someone else.
1725-        if self._status:
1726-            self._status.set_status("Obtaining URI Extension")
1727-
1728-        uri_extension_fetch_started = time.time()
1729-
1730-        vups = []
1731-        for sharenum, buckets in self._share_buckets.iteritems():
1732-            for bucket in buckets:
1733-                vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
1734-        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
1735-        d = vto.start()
1736-
1737-        def _got_uri_extension(vup):
1738-            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
1739-            if self._results:
1740-                elapsed = time.time() - uri_extension_fetch_started
1741-                self._results.timings["uri_extension"] = elapsed
1742-
1743-            self._vup = vup
1744-            self._codec = codec.CRSDecoder()
1745-            self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
1746-            self._tail_codec = codec.CRSDecoder()
1747-            self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
1748-
1749-            self._current_segnum = 0
1750-
1751-            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
1752-            self._share_hash_tree.set_hashes({0: vup.share_root_hash})
1753-
1754-            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
1755-            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
1756-
1757-            # Repairer (uploader) needs the encodingparams.
1758-            self._target.set_encodingparams((
1759-                self._verifycap.needed_shares,
1760-                0, # see ticket #778 for why this is
1761-                self._verifycap.total_shares,
1762-                self._vup.segment_size
1763-                ))
1764-        d.addCallback(_got_uri_extension)
1765-        return d
1766-
1767-    def _get_crypttext_hash_tree(self, res):
1768-        vchtps = []
1769-        for sharenum, buckets in self._share_buckets.iteritems():
1770-            for bucket in buckets:
1771-                vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
1772-                vchtps.append(vchtp)
1773-
1774-        _get_crypttext_hash_tree_started = time.time()
1775-        if self._status:
1776-            self._status.set_status("Retrieving crypttext hash tree")
1777-
1778-        vto = ValidatedThingObtainer(vchtps, debugname="vchtps",
1779-                                     log_id=self._parentmsgid)
1780-        d = vto.start()
1781-
1782-        def _got_crypttext_hash_tree(res):
1783-            # Good -- the self._crypttext_hash_tree that we passed to vchtp
1784-            # is now populated with hashes.
1785-            if self._results:
1786-                elapsed = time.time() - _get_crypttext_hash_tree_started
1787-                self._results.timings["hashtrees"] = elapsed
1788-        d.addCallback(_got_crypttext_hash_tree)
1789-        return d
1790-
1791-    def _activate_enough_buckets(self):
1792-        """either return a mapping from shnum to a ValidatedReadBucketProxy
1793-        that can provide data for that share, or raise NotEnoughSharesError"""
1794-
1795-        while len(self.active_buckets) < self._verifycap.needed_shares:
1796-            # need some more
1797-            handled_shnums = set(self.active_buckets.keys())
1798-            available_shnums = set(self._share_vbuckets.keys())
1799-            potential_shnums = list(available_shnums - handled_shnums)
1800-            if len(potential_shnums) < (self._verifycap.needed_shares
1801-                                        - len(self.active_buckets)):
1802-                have = len(potential_shnums) + len(self.active_buckets)
1803-                msg = "Unable to activate enough shares: have %d, need %d" \
1804-                      % (have, self._verifycap.needed_shares)
1805-                if have:
1806-                    raise NotEnoughSharesError(msg)
1807-                else:
1808-                    raise NoSharesError(msg)
1809-            # For the next share, choose a primary share if available, else a
1810-            # randomly chosen secondary share.
1811-            potential_shnums.sort()
1812-            if potential_shnums[0] < self._verifycap.needed_shares:
1813-                shnum = potential_shnums[0]
1814-            else:
1815-                shnum = random.choice(potential_shnums)
1816-            # and a random bucket that will provide it
1817-            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
1818-            self.active_buckets[shnum] = validated_bucket
1819-        return self.active_buckets
1820-
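The share-selection policy above (prefer the lowest-numbered "primary" share, otherwise pick a random secondary share) in isolation, as a hypothetical helper:

    import random

    def choose_next_shnum(active_shnums, available_shnums, k):
        potential = sorted(set(available_shnums) - set(active_shnums))
        if not potential:
            raise ValueError("no shares left to activate")
        if potential[0] < k:
            # a primary share (shnum < k) is available: take the lowest one
            return potential[0]
        # otherwise any secondary share will do; pick one at random
        return random.choice(potential)

    # k=3, active={0,2}, available={0,1,2,5}: returns 1 (an unused primary)
    # k=3, active={0,2}, available={0,2,5,7}: returns 5 or 7 at random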
1821-
1822-    def _download_all_segments(self, res):
1823-        # From now on if new buckets are received then I will notice that
1824-        # self._share_vbuckets is not None and generate a vbucket for that new
1825-        # bucket and add it to _share_vbuckets. (We had to wait because we
1826-        # didn't have self._vup and self._share_hash_tree earlier. We didn't
1827-        # need validated buckets until now -- now that we are ready to download
1828-        # shares.)
1829-        self._share_vbuckets = {}
1830-        for sharenum, buckets in self._share_buckets.iteritems():
1831-            for bucket in buckets:
1832-                vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
1833-                self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
1834-
1835-        # after the above code, self._share_vbuckets contains enough
1836-        # buckets to complete the download, and some extra ones to
1837-        # tolerate some buckets dropping out or having
1838-        # errors. self._share_vbuckets is a dictionary that maps from
1839-        # shnum to a set of ValidatedBuckets, which themselves are
1840-        # wrappers around RIBucketReader references.
1841-        self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance
1842-
1843-        self._started_fetching = time.time()
1844-
1845-        d = defer.succeed(None)
1846-        for segnum in range(self._vup.num_segments):
1847-            d.addCallback(self._download_segment, segnum)
1848-            # this pause, at the end of write, prevents pre-fetch from
1849-            # happening until the consumer is ready for more data.
1850-            d.addCallback(self._check_for_pause)
1851-
1852-        self._stage_4_d.callback(None)
1853-        return d
1854-
1855-    def _check_for_pause(self, res):
1856-        if self._paused:
1857-            d = defer.Deferred()
1858-            self._paused.addCallback(lambda ignored: d.callback(res))
1859-            return d
1860-        if self._stopped:
1861-            raise DownloadStopped("our Consumer called stopProducing()")
1862-        self._monitor.raise_if_cancelled()
1863-        return res
1864-
1865-    def _download_segment(self, res, segnum):
1866-        if self._status:
1867-            self._status.set_status("Downloading segment %d of %d" %
1868-                                    (segnum+1, self._vup.num_segments))
1869-        self.log("downloading seg#%d of %d (%d%%)"
1870-                 % (segnum, self._vup.num_segments,
1871-                    100.0 * segnum / self._vup.num_segments))
1872-        # memory footprint: when the SegmentDownloader finishes pulling down
1873-        # all shares, we have 1*segment_size of usage.
1874-        segmentdler = SegmentDownloader(self, segnum,
1875-                                        self._verifycap.needed_shares,
1876-                                        self._results)
1877-        started = time.time()
1878-        d = segmentdler.start()
1879-        def _finished_fetching(res):
1880-            elapsed = time.time() - started
1881-            self._results.timings["cumulative_fetch"] += elapsed
1882-            return res
1883-        if self._results:
1884-            d.addCallback(_finished_fetching)
1885-        # pause before using more memory
1886-        d.addCallback(self._check_for_pause)
1887-        # while the codec does its job, we hit 2*segment_size
1888-        def _started_decode(res):
1889-            self._started_decode = time.time()
1890-            return res
1891-        if self._results:
1892-            d.addCallback(_started_decode)
1893-        if segnum + 1 == self._vup.num_segments:
1894-            codec = self._tail_codec
1895-        else:
1896-            codec = self._codec
1897-        d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
1898-        # once the codec is done, we drop back to 1*segment_size, because
1899-        # 'shares' goes out of scope. The memory usage is all in the
1900-        # plaintext now, spread out into a bunch of tiny buffers.
1901-        def _finished_decode(res):
1902-            elapsed = time.time() - self._started_decode
1903-            self._results.timings["cumulative_decode"] += elapsed
1904-            return res
1905-        if self._results:
1906-            d.addCallback(_finished_decode)
1907-
1908-        # pause/check-for-stop just before writing, to honor stopProducing
1909-        d.addCallback(self._check_for_pause)
1910-        d.addCallback(self._got_segment)
1911-        return d
1912-
1913-    def _got_segment(self, buffers):
1914-        precondition(self._crypttext_hash_tree)
1915-        started_decrypt = time.time()
1916-        self._status.set_progress(float(self._current_segnum)/self._verifycap.size)
1917-
1918-        if self._current_segnum + 1 == self._vup.num_segments:
1919-            # This is the last segment.
1920-            # Trim off any padding added by the upload side. We never send
1921-            # empty segments. If the data was an exact multiple of the
1922-            # segment size, the last segment will be full.
1923-            tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
1924-            num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
1925-            # Remove buffers which don't contain any part of the tail.
1926-            del buffers[num_buffers_used:]
1927-            # Remove the past-the-tail-part of the last buffer.
1928-            tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
1929-            if tail_in_last_buf == 0:
1930-                tail_in_last_buf = tail_buf_size
1931-            buffers[-1] = buffers[-1][:tail_in_last_buf]
1932-
1933-        # First compute the hash of this segment and check that it fits.
1934-        ch = hashutil.crypttext_segment_hasher()
1935-        for buffer in buffers:
1936-            self._ciphertext_hasher.update(buffer)
1937-            ch.update(buffer)
1938-        self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})
1939-
1940-        # Then write this segment to the target.
1941-        if not self._opened:
1942-            self._opened = True
1943-            self._target.open(self._verifycap.size)
1944-
1945-        for buffer in buffers:
1946-            self._target.write(buffer)
1947-            self._bytes_done += len(buffer)
1948-
1949-        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
1950-        self._current_segnum += 1
1951-
1952-        if self._results:
1953-            elapsed = time.time() - started_decrypt
1954-            self._results.timings["cumulative_decrypt"] += elapsed
1955-
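The tail-segment trimming in _got_segment() above deserves a worked example. Assuming, hypothetically, k=3 needed shares, a padded tail_segment_size of 10 bytes and a real tail_data_size of 7 bytes:

    def div_ceil(n, d):
        # same behaviour as allmydata.util.mathutil.div_ceil for positive ints
        return (n + d - 1) // d

    k = 3
    tail_segment_size = 10   # padded size of the erasure-coded tail segment
    tail_data_size = 7       # bytes of real ciphertext in the tail segment

    tail_buf_size = div_ceil(tail_segment_size, k)               # 4
    num_buffers_used = div_ceil(tail_data_size, tail_buf_size)   # 2
    tail_in_last_buf = tail_data_size % tail_buf_size            # 3
    if tail_in_last_buf == 0:
        tail_in_last_buf = tail_buf_size

    # buffers[2:] are dropped and buffers[1] is cut to 3 bytes, so exactly
    # 4 + 3 == tail_data_size == 7 bytes are written to the target
    assert (num_buffers_used - 1) * tail_buf_size + tail_in_last_buf == tail_data_size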
1956-    def _done(self, res):
1957-        self.log("download done")
1958-        if self._results:
1959-            now = time.time()
1960-            self._results.timings["total"] = now - self._started
1961-            self._results.timings["segments"] = now - self._started_fetching
1962-        if self._vup.crypttext_hash:
1963-            _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
1964-                    "bad crypttext_hash: computed=%s, expected=%s" %
1965-                    (base32.b2a(self._ciphertext_hasher.digest()),
1966-                     base32.b2a(self._vup.crypttext_hash)))
1967-        _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
1968-        self._status.set_progress(1)
1969-        self._target.close()
1970-        return self._target.finish()
1971-    def get_download_status(self):
1972-        return self._status
1973-
1974-
1975-class ConsumerAdapter:
1976-    implements(IDownloadTarget, IConsumer)
1977-    def __init__(self, consumer):
1978-        self._consumer = consumer
1979-
1980-    def registerProducer(self, producer, streaming):
1981-        self._consumer.registerProducer(producer, streaming)
1982-    def unregisterProducer(self):
1983-        self._consumer.unregisterProducer()
1984-
1985-    def open(self, size):
1986-        pass
1987-    def write(self, data):
1988-        self._consumer.write(data)
1989-    def close(self):
1990-        pass
1991-
1992-    def fail(self, why):
1993-        pass
1994-    def register_canceller(self, cb):
1995-        pass
1996-    def finish(self):
1997-        return self._consumer
1998-    # The following methods are just because the target might be a
1999-    # repairer.DownUpConnector, and just because the current CHKUpload object
2000-    # expects to find the storage index and encoding parameters in its
2001-    # Uploadable.
2002-    def set_storageindex(self, storageindex):
2003-        pass
2004-    def set_encodingparams(self, encodingparams):
2005-        pass
2006-
2007-
2008-class Downloader:
2009-    """I am a service that allows file downloading.
2010-    """
2011-    # TODO: in fact, this service only downloads immutable files (URI:CHK:).
2012-    # It is scheduled to go away, to be replaced by filenode.download()
2013-    implements(IDownloader)
2014-
2015-    def __init__(self, storage_broker, stats_provider):
2016-        self.storage_broker = storage_broker
2017-        self.stats_provider = stats_provider
2018-        self._all_downloads = weakref.WeakKeyDictionary() # for debugging
2019-
2020-    def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
2021-        assert isinstance(u, uri.CHKFileURI)
2022-        t = IDownloadTarget(t)
2023-        assert t.write
2024-        assert t.close
2025-
2026-        if self.stats_provider:
2027-            # these counters are meant for network traffic, and don't
2028-            # include LIT files
2029-            self.stats_provider.count('downloader.files_downloaded', 1)
2030-            self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
2031-
2032-        target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
2033-        if not monitor:
2034-            monitor=Monitor()
2035-        dl = CiphertextDownloader(self.storage_broker,
2036-                                  u.get_verify_cap(), target,
2037-                                  monitor=monitor)
2038-        self._all_downloads[dl] = None
2039-        if history:
2040-            history.add_download(dl.get_download_status())
2041-        d = dl.start()
2042-        return d
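For reference, a hypothetical caller of this (old) Downloader service; the cap string, the consumer object, and the way the downloader instance is obtained are placeholders, and uri.from_string() is assumed to yield the CHKFileURI that download() requires:

    from allmydata import uri

    def fetch_file(downloader, readcap_string, consumer):
        u = uri.from_string(readcap_string)   # expected to be a CHK read cap
        target = ConsumerAdapter(consumer)    # the adapter class defined above
        d = downloader.download(u, target)
        # d fires once the plaintext has been decrypted and written to the
        # consumer via the ConsumerAdapter
        return d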
2043rmfile ./src/allmydata/immutable/download.py
2044hunk ./src/allmydata/immutable/filenode.py 1
2045-import copy, os.path, stat
2046-from cStringIO import StringIO
2047+
2048+import binascii
2049+import copy
2050+import time
2051+now = time.time
2052 from zope.interface import implements
2053 from twisted.internet import defer
2054hunk ./src/allmydata/immutable/filenode.py 8
2055-from twisted.internet.interfaces import IPushProducer
2056-from twisted.protocols import basic
2057-from foolscap.api import eventually
2058-from allmydata.interfaces import IImmutableFileNode, ICheckable, \
2059-     IDownloadTarget, IUploadResults
2060-from allmydata.util import dictutil, log, base32
2061-from allmydata.uri import CHKFileURI, LiteralFileURI
2062-from allmydata.immutable.checker import Checker
2063+from twisted.internet.interfaces import IConsumer
2064+
2065+from allmydata.interfaces import IImmutableFileNode, IUploadResults
2066+from allmydata import uri
2067 from allmydata.check_results import CheckResults, CheckAndRepairResults
2068hunk ./src/allmydata/immutable/filenode.py 13
2069+from allmydata.util.dictutil import DictOfSets
2070+from pycryptopp.cipher.aes import AES
2071+
2072+# local imports
2073+from allmydata.immutable.checker import Checker
2074 from allmydata.immutable.repairer import Repairer
2075hunk ./src/allmydata/immutable/filenode.py 19
2076-from allmydata.immutable import download
2077+from allmydata.immutable.downloader.node import DownloadNode
2078+from allmydata.immutable.downloader.status import DownloadStatus
2079 
2080hunk ./src/allmydata/immutable/filenode.py 22
2081-class _ImmutableFileNodeBase(object):
2082-    implements(IImmutableFileNode, ICheckable)
2083+class CiphertextFileNode:
2084+    def __init__(self, verifycap, storage_broker, secret_holder,
2085+                 terminator, history, download_status=None):
2086+        assert isinstance(verifycap, uri.CHKFileVerifierURI)
2087+        self._verifycap = verifycap
2088+        self._storage_broker = storage_broker
2089+        self._secret_holder = secret_holder
2090+        if download_status is None:
2091+            ds = DownloadStatus(verifycap.storage_index, verifycap.size)
2092+            if history:
2093+                history.add_download(ds)
2094+            download_status = ds
2095+        self._node = DownloadNode(verifycap, storage_broker, secret_holder,
2096+                                  terminator, history, download_status)
2097 
2098hunk ./src/allmydata/immutable/filenode.py 37
2099-    def get_write_uri(self):
2100-        return None
2101+    def read(self, consumer, offset=0, size=None, read_ev=None):
2102+        """I am the main entry point, from which FileNode.read() can get
2103+        data. I feed the consumer with the desired range of ciphertext. I
2104+        return a Deferred that fires (with the consumer) when the read is
2105+        finished."""
2106+        return self._node.read(consumer, offset, size, read_ev)
2107 
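A minimal sketch of a consumer for the read() call above; the collector class and the usage lines are illustrative only, not part of the patch:

    from zope.interface import implements
    from twisted.internet.interfaces import IConsumer

    class CiphertextCollector:
        """Collect the ciphertext that CiphertextFileNode.read() delivers."""
        implements(IConsumer)
        def __init__(self):
            self.chunks = []
        def registerProducer(self, producer, streaming):
            self.producer = producer
        def unregisterProducer(self):
            self.producer = None
        def write(self, data):
            self.chunks.append(data)

    # hypothetical usage, where 'node' is a CiphertextFileNode:
    #   c = CiphertextCollector()
    #   d = node.read(c, offset=0, size=1024)
    #   d.addCallback(lambda consumer: "".join(consumer.chunks))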
2108hunk ./src/allmydata/immutable/filenode.py 44
2109-    def get_readonly_uri(self):
2110-        return self.get_uri()
2111+    def get_segment(self, segnum):
2112+        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
2113+        Deferred that fires with (offset,data) when the desired segment is
2114+        available, and c is an object on which c.cancel() can be called to
2115+        disavow interest in the segment (after which 'd' will never fire).
2116 
2117hunk ./src/allmydata/immutable/filenode.py 50
2118-    def is_mutable(self):
2119-        return False
2120+        You probably need to know the segment size before calling this,
2121+        unless you want the first few bytes of the file. If you ask for a
2122+        segment number which turns out to be too large, the Deferred will
2123+        errback with BadSegmentNumberError.
2124 
2125hunk ./src/allmydata/immutable/filenode.py 55
2126-    def is_readonly(self):
2127-        return True
2128+        The Deferred fires with the offset of the first byte of the data
2129+        segment, so that you can call get_segment() before knowing the
2130+        segment size, and still know which data you received.
2131+        """
2132+        return self._node.get_segment(segnum)
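A hypothetical caller of get_segment(), following the contract described in the docstring above (the segment number and callback body are illustrative):

    def fetch_first_segment(node):
        d, c = node.get_segment(0)
        def _got((offset, data)):
            # offset is the file offset of the segment's first byte
            return data
        d.addCallback(_got)
        # calling c.cancel() instead would disavow interest in the segment,
        # after which d will never fire
        return d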
2133 
2134hunk ./src/allmydata/immutable/filenode.py 61
2135-    def is_unknown(self):
2136-        return False
2137+    def get_segment_size(self):
2138+        # return a Deferred that fires with the file's real segment size
2139+        return self._node.get_segsize()
2140 
2141hunk ./src/allmydata/immutable/filenode.py 65
2142-    def is_allowed_in_immutable_directory(self):
2143-        return True
2144+    def get_storage_index(self):
2145+        return self._verifycap.storage_index
2146+    def get_verify_cap(self):
2147+        return self._verifycap
2148+    def get_size(self):
2149+        return self._verifycap.size
2150 
2151     def raise_error(self):
2152         pass
2153hunk ./src/allmydata/immutable/filenode.py 75
2154 
2155-    def __hash__(self):
2156-        return self.u.__hash__()
2157-    def __eq__(self, other):
2158-        if isinstance(other, _ImmutableFileNodeBase):
2159-            return self.u.__eq__(other.u)
2160-        else:
2161-            return False
2162-    def __ne__(self, other):
2163-        if isinstance(other, _ImmutableFileNodeBase):
2164-            return self.u.__eq__(other.u)
2165-        else:
2166-            return True
2167-
2168-class PortionOfFile:
2169-    # like a list slice (things[2:14]), but for a file on disk
2170-    def __init__(self, fn, offset=0, size=None):
2171-        self.f = open(fn, "rb")
2172-        self.f.seek(offset)
2173-        self.bytes_left = size
2174-
2175-    def read(self, size=None):
2176-        # bytes_to_read = min(size, self.bytes_left), but None>anything
2177-        if size is None:
2178-            bytes_to_read = self.bytes_left
2179-        elif self.bytes_left is None:
2180-            bytes_to_read = size
2181-        else:
2182-            bytes_to_read = min(size, self.bytes_left)
2183-        data = self.f.read(bytes_to_read)
2184-        if self.bytes_left is not None:
2185-            self.bytes_left -= len(data)
2186-        return data
2187-
2188-class DownloadCache:
2189-    implements(IDownloadTarget)
2190-
2191-    def __init__(self, filecap, storage_index, downloader,
2192-                 cachedirectorymanager):
2193-        self._downloader = downloader
2194-        self._uri = filecap
2195-        self._storage_index = storage_index
2196-        self.milestones = set() # of (offset,size,Deferred)
2197-        self.cachedirectorymanager = cachedirectorymanager
2198-        self.cachefile = None
2199-        self.download_in_progress = False
2200-        # five states:
2201-        #  new ImmutableFileNode, no downloads ever performed
2202-        #  new ImmutableFileNode, leftover file (partial)
2203-        #  new ImmutableFileNode, leftover file (whole)
2204-        #  download in progress, not yet complete
2205-        #  download complete
2206-
2207-    def when_range_available(self, offset, size):
2208-        assert isinstance(offset, (int,long))
2209-        assert isinstance(size, (int,long))
2210-
2211-        d = defer.Deferred()
2212-        self.milestones.add( (offset,size,d) )
2213-        self._check_milestones()
2214-        if self.milestones and not self.download_in_progress:
2215-            self.download_in_progress = True
2216-            log.msg(format=("immutable filenode read [%(si)s]: " +
2217-                            "starting download"),
2218-                    si=base32.b2a(self._storage_index),
2219-                    umid="h26Heg", level=log.OPERATIONAL)
2220-            d2 = self._downloader.download(self._uri, self)
2221-            d2.addBoth(self._download_done)
2222-            d2.addErrback(self._download_failed)
2223-            d2.addErrback(log.err, umid="cQaM9g")
2224-        return d
2225-
2226-    def read(self, consumer, offset, size):
2227-        assert offset+size <= self.get_filesize()
2228-        if not self.cachefile:
2229-            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
2230-        f = PortionOfFile(self.cachefile.get_filename(), offset, size)
2231-        d = basic.FileSender().beginFileTransfer(f, consumer)
2232-        d.addCallback(lambda lastSent: consumer)
2233-        return d
2234-
2235-    def _download_done(self, res):
2236-        # clear download_in_progress, so failed downloads can be re-tried
2237-        self.download_in_progress = False
2238-        return res
2239-
2240-    def _download_failed(self, f):
2241-        # tell anyone who's waiting that we failed
2242-        for m in self.milestones:
2243-            (offset,size,d) = m
2244-            eventually(d.errback, f)
2245-        self.milestones.clear()
2246-
2247-    def _check_milestones(self):
2248-        current_size = self.get_filesize()
2249-        for m in list(self.milestones):
2250-            (offset,size,d) = m
2251-            if offset+size <= current_size:
2252-                log.msg(format=("immutable filenode read [%(si)s] " +
2253-                                "%(offset)d+%(size)d vs %(filesize)d: " +
2254-                                "done"),
2255-                        si=base32.b2a(self._storage_index),
2256-                        offset=offset, size=size, filesize=current_size,
2257-                        umid="nuedUg", level=log.NOISY)
2258-                self.milestones.discard(m)
2259-                eventually(d.callback, None)
2260-            else:
2261-                log.msg(format=("immutable filenode read [%(si)s] " +
2262-                                "%(offset)d+%(size)d vs %(filesize)d: " +
2263-                                "still waiting"),
2264-                        si=base32.b2a(self._storage_index),
2265-                        offset=offset, size=size, filesize=current_size,
2266-                        umid="8PKOhg", level=log.NOISY)
2267-
2268-    def get_filesize(self):
2269-        if not self.cachefile:
2270-            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
2271-        try:
2272-            filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
2273-        except OSError:
2274-            filesize = 0
2275-        return filesize
2276-
2277-
2278-    def open(self, size):
2279-        if not self.cachefile:
2280-            self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
2281-        self.f = open(self.cachefile.get_filename(), "wb")
2282-
2283-    def write(self, data):
2284-        self.f.write(data)
2285-        self._check_milestones()
2286-
2287-    def close(self):
2288-        self.f.close()
2289-        self._check_milestones()
2290-
2291-    def fail(self, why):
2292-        pass
2293-    def register_canceller(self, cb):
2294-        pass
2295-    def finish(self):
2296-        return None
2297-    # The following methods are just because the target might be a
2298-    # repairer.DownUpConnector, and just because the current CHKUpload object
2299-    # expects to find the storage index and encoding parameters in its
2300-    # Uploadable.
2301-    def set_storageindex(self, storageindex):
2302-        pass
2303-    def set_encodingparams(self, encodingparams):
2304-        pass
2305-
2306-
2307-class ImmutableFileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
2308-    def __init__(self, filecap, storage_broker, secret_holder,
2309-                 downloader, history, cachedirectorymanager):
2310-        assert isinstance(filecap, CHKFileURI)
2311-        self.u = filecap
2312-        self._storage_broker = storage_broker
2313-        self._secret_holder = secret_holder
2314-        self._downloader = downloader
2315-        self._history = history
2316-        storage_index = self.get_storage_index()
2317-        self.download_cache = DownloadCache(filecap, storage_index, downloader,
2318-                                            cachedirectorymanager)
2319-        prefix = self.u.get_verify_cap().to_string()
2320-        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
2321-        self.log("starting", level=log.OPERATIONAL)
2322-
2323-    def get_size(self):
2324-        return self.u.get_size()
2325-    def get_current_size(self):
2326-        return defer.succeed(self.get_size())
2327-
2328-    def get_cap(self):
2329-        return self.u
2330-    def get_readcap(self):
2331-        return self.u.get_readonly()
2332-    def get_verify_cap(self):
2333-        return self.u.get_verify_cap()
2334-    def get_repair_cap(self):
2335-        # CHK files can be repaired with just the verifycap
2336-        return self.u.get_verify_cap()
2337-
2338-    def get_uri(self):
2339-        return self.u.to_string()
2340-
2341-    def get_storage_index(self):
2342-        return self.u.get_storage_index()
2343 
2344     def check_and_repair(self, monitor, verify=False, add_lease=False):
2345hunk ./src/allmydata/immutable/filenode.py 77
2346-        verifycap = self.get_verify_cap()
2347+        verifycap = self._verifycap
2348+        storage_index = verifycap.storage_index
2349         sb = self._storage_broker
2350         servers = sb.get_all_servers()
2351         sh = self._secret_holder
2352hunk ./src/allmydata/immutable/filenode.py 88
2353                     monitor=monitor)
2354         d = c.start()
2355         def _maybe_repair(cr):
2356-            crr = CheckAndRepairResults(self.u.get_storage_index())
2357+            crr = CheckAndRepairResults(storage_index)
2358             crr.pre_repair_results = cr
2359             if cr.is_healthy():
2360                 crr.post_repair_results = cr
2361hunk ./src/allmydata/immutable/filenode.py 98
2362                 crr.repair_successful = False # until proven successful
2363                 def _gather_repair_results(ur):
2364                     assert IUploadResults.providedBy(ur), ur
2365-                    # clone the cr -- check results to form the basic of the prr -- post-repair results
2366+                    # clone the cr (check results) to form the basis of the
2367+                    # prr (post-repair results)
2368                     prr = CheckResults(cr.uri, cr.storage_index)
2369                     prr.data = copy.deepcopy(cr.data)
2370 
2371hunk ./src/allmydata/immutable/filenode.py 104
2372                     sm = prr.data['sharemap']
2373-                    assert isinstance(sm, dictutil.DictOfSets), sm
2374+                    assert isinstance(sm, DictOfSets), sm
2375                     sm.update(ur.sharemap)
2376                     servers_responding = set(prr.data['servers-responding'])
2377                     servers_responding.union(ur.sharemap.iterkeys())
2378hunk ./src/allmydata/immutable/filenode.py 111
2379                     prr.data['servers-responding'] = list(servers_responding)
2380                     prr.data['count-shares-good'] = len(sm)
2381                     prr.data['count-good-share-hosts'] = len(sm)
2382-                    is_healthy = bool(len(sm) >= self.u.total_shares)
2383-                    is_recoverable = bool(len(sm) >= self.u.needed_shares)
2384+                    is_healthy = bool(len(sm) >= verifycap.total_shares)
2385+                    is_recoverable = bool(len(sm) >= verifycap.needed_shares)
2386                     prr.set_healthy(is_healthy)
2387                     prr.set_recoverable(is_recoverable)
2388                     crr.repair_successful = is_healthy
2389hunk ./src/allmydata/immutable/filenode.py 116
2390-                    prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)
2391+                    prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares)
2392 
2393                     crr.post_repair_results = prr
2394                     return crr
2395hunk ./src/allmydata/immutable/filenode.py 126
2396                     crr.repair_successful = False
2397                     crr.repair_failure = f
2398                     return f
2399-                r = Repairer(storage_broker=sb, secret_holder=sh,
2400-                             verifycap=verifycap, monitor=monitor)
2401+                r = Repairer(self, storage_broker=sb, secret_holder=sh,
2402+                             monitor=monitor)
2403                 d = r.start()
2404                 d.addCallbacks(_gather_repair_results, _repair_error)
2405                 return d
2406hunk ./src/allmydata/immutable/filenode.py 136
2407         return d
2408 
2409     def check(self, monitor, verify=False, add_lease=False):
2410-        verifycap = self.get_verify_cap()
2411+        verifycap = self._verifycap
2412         sb = self._storage_broker
2413         servers = sb.get_all_servers()
2414         sh = self._secret_holder
2415hunk ./src/allmydata/immutable/filenode.py 146
2416                     monitor=monitor)
2417         return v.start()
2418 
2419-    def read(self, consumer, offset=0, size=None):
2420-        self.log("read", offset=offset, size=size,
2421-                 umid="UPP8FA", level=log.OPERATIONAL)
2422-        if size is None:
2423-            size = self.get_size() - offset
2424-        size = min(size, self.get_size() - offset)
2425 
2426hunk ./src/allmydata/immutable/filenode.py 147
2427-        if offset == 0 and size == self.get_size():
2428-            # don't use the cache, just do a normal streaming download
2429-            self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
2430-            target = download.ConsumerAdapter(consumer)
2431-            return self._downloader.download(self.get_cap(), target,
2432-                                             self._parentmsgid,
2433-                                             history=self._history)
2434+class DecryptingConsumer:
2435+    """I sit between a CiphertextDownloader (which acts as a Producer) and
2436+    the real Consumer, decrypting everything that passes by. The real
2437+    Consumer sees the real Producer, but the Producer sees us instead of the
2438+    real consumer."""
2439+    implements(IConsumer)
2440 
2441hunk ./src/allmydata/immutable/filenode.py 154
2442-        d = self.download_cache.when_range_available(offset, size)
2443-        d.addCallback(lambda res:
2444-                      self.download_cache.read(consumer, offset, size))
2445-        return d
2446+    def __init__(self, consumer, readkey, offset, read_event):
2447+        self._consumer = consumer
2448+        self._read_event = read_event
2449+        # TODO: pycryptopp CTR-mode needs random-access operations: I want
2450+        # either a=AES(readkey, offset) or better yet both of:
2451+        #  a=AES(readkey, offset=0)
2452+        #  a.process(ciphertext, offset=xyz)
2453+        # For now, we fake it with the existing iv= argument.
2454+        offset_big = offset // 16
2455+        offset_small = offset % 16
2456+        iv = binascii.unhexlify("%032x" % offset_big)
2457+        self._decryptor = AES(readkey, iv=iv)
2458+        self._decryptor.process("\x00"*offset_small)
2459 
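
For reference, the iv= workaround above is equivalent to seeking within the
AES-CTR keystream: start the counter at the 16-byte block containing the
offset, then discard the leftover keystream bytes inside that block. The
sketch below mirrors the constructor logic; decryptor_at is a hypothetical
helper, not part of this patch, and assumes pycryptopp's AES as used here.

    import binascii
    from pycryptopp.cipher.aes import AES

    def decryptor_at(readkey, offset):
        # CTR counter = index of the AES block that contains 'offset'
        offset_big, offset_small = divmod(offset, 16)
        a = AES(readkey, iv=binascii.unhexlify("%032x" % offset_big))
        # burn the first offset_small keystream bytes within that block
        a.process("\x00" * offset_small)
        return a  # a.process(ciphertext[offset:]) now yields plaintext[offset:]
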
2460hunk ./src/allmydata/immutable/filenode.py 168
2461-class LiteralProducer:
2462-    implements(IPushProducer)
2463-    def resumeProducing(self):
2464-        pass
2465-    def stopProducing(self):
2466-        pass
2467+    def registerProducer(self, producer, streaming):
2468+        # this passes through, so the real consumer can flow-control the real
2469+        # producer. Therefore we don't need to provide any IPushProducer
2470+        # methods. We implement all the IConsumer methods as pass-throughs,
2471+        # and only intercept write() to perform decryption.
2472+        self._consumer.registerProducer(producer, streaming)
2473+    def unregisterProducer(self):
2474+        self._consumer.unregisterProducer()
2475+    def write(self, ciphertext):
2476+        started = now()
2477+        plaintext = self._decryptor.process(ciphertext)
2478+        elapsed = now() - started
2479+        self._read_event.update(0, elapsed, 0)
2480+        self._consumer.write(plaintext)
2481 
2482hunk ./src/allmydata/immutable/filenode.py 183
2483+class ImmutableFileNode:
2484+    implements(IImmutableFileNode)
2485 
2486hunk ./src/allmydata/immutable/filenode.py 186
2487-class LiteralFileNode(_ImmutableFileNodeBase):
2488-
2489-    def __init__(self, filecap):
2490-        assert isinstance(filecap, LiteralFileURI)
2491+    # I wrap a CiphertextFileNode with a decryption key
2492+    def __init__(self, filecap, storage_broker, secret_holder, terminator,
2493+                 history):
2494+        assert isinstance(filecap, uri.CHKFileURI)
2495+        verifycap = filecap.get_verify_cap()
2496+        ds = DownloadStatus(verifycap.storage_index, verifycap.size)
2497+        if history:
2498+            history.add_download(ds)
2499+        self._download_status = ds
2500+        self._cnode = CiphertextFileNode(verifycap, storage_broker,
2501+                                         secret_holder, terminator, history, ds)
2502+        assert isinstance(filecap, uri.CHKFileURI)
2503         self.u = filecap
2504hunk ./src/allmydata/immutable/filenode.py 199
2505+        self._readkey = filecap.key
2506 
2507hunk ./src/allmydata/immutable/filenode.py 201
2508-    def get_size(self):
2509-        return len(self.u.data)
2510-    def get_current_size(self):
2511-        return defer.succeed(self.get_size())
2512+    # TODO: I'm not sure about this.. what's the use case for node==node? If
2513+    # we keep it here, we should also put this on CiphertextFileNode
2514+    def __hash__(self):
2515+        return self.u.__hash__()
2516+    def __eq__(self, other):
2517+        if isinstance(other, ImmutableFileNode):
2518+            return self.u.__eq__(other.u)
2519+        else:
2520+            return False
2521+    def __ne__(self, other):
2522+        if isinstance(other, ImmutableFileNode):
2523+            return not self.u.__eq__(other.u)
2524+        else:
2525+            return True
2526+
2527+    def read(self, consumer, offset=0, size=None):
2528+        actual_size = size
2529+        if actual_size is None:
2530+            # size=None means "read to the end of the file"
2531+            actual_size = self.u.size - offset
2532+        read_ev = self._download_status.add_read_event(offset,actual_size,
2533+                                                       now())
2534+        decryptor = DecryptingConsumer(consumer, self._readkey, offset, read_ev)
2535+        d = self._cnode.read(decryptor, offset, size, read_ev)
2536+        d.addCallback(lambda dc: consumer)
2537+        return d
2538+
2539+    def raise_error(self):
2540+        pass
2541 
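
For reference, a ranged read through the new filenode can be driven as in the
sketch below; read_range is a hypothetical helper, and MemoryConsumer is the
allmydata.util.consumer helper that the tests further down use the same way.
The Deferred returned by read() fires with the consumer once the range has
been delivered.

    from allmydata.util.consumer import MemoryConsumer

    def read_range(n, offset, length):
        # n is an ImmutableFileNode; fires with 'length' plaintext bytes
        # starting at 'offset'
        mc = MemoryConsumer()
        d = n.read(mc, offset, length)
        d.addCallback(lambda ign: "".join(mc.chunks))
        return d
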
2542hunk ./src/allmydata/immutable/filenode.py 231
2543+    def get_write_uri(self):
2544+        return None
2545+
2546+    def get_readonly_uri(self):
2547+        return self.get_uri()
2548+
2549+    def get_uri(self):
2550+        return self.u.to_string()
2551     def get_cap(self):
2552         return self.u
2553     def get_readcap(self):
2554hunk ./src/allmydata/immutable/filenode.py 242
2555-        return self.u
2556+        return self.u.get_readonly()
2557     def get_verify_cap(self):
2558hunk ./src/allmydata/immutable/filenode.py 244
2559-        return None
2560+        return self.u.get_verify_cap()
2561     def get_repair_cap(self):
2562hunk ./src/allmydata/immutable/filenode.py 246
2563-        return None
2564-
2565-    def get_uri(self):
2566-        return self.u.to_string()
2567+        # CHK files can be repaired with just the verifycap
2568+        return self.u.get_verify_cap()
2569 
2570     def get_storage_index(self):
2571hunk ./src/allmydata/immutable/filenode.py 250
2572-        return None
2573+        return self.u.get_storage_index()
2574 
2575hunk ./src/allmydata/immutable/filenode.py 252
2576-    def check(self, monitor, verify=False, add_lease=False):
2577-        return defer.succeed(None)
2578+    def get_size(self):
2579+        return self.u.get_size()
2580+    def get_current_size(self):
2581+        return defer.succeed(self.get_size())
2582 
2583hunk ./src/allmydata/immutable/filenode.py 257
2584-    def check_and_repair(self, monitor, verify=False, add_lease=False):
2585-        return defer.succeed(None)
2586+    def is_mutable(self):
2587+        return False
2588 
2589hunk ./src/allmydata/immutable/filenode.py 260
2590-    def read(self, consumer, offset=0, size=None):
2591-        if size is None:
2592-            data = self.u.data[offset:]
2593-        else:
2594-            data = self.u.data[offset:offset+size]
2595+    def is_readonly(self):
2596+        return True
2597 
2598hunk ./src/allmydata/immutable/filenode.py 263
2599-        # We use twisted.protocols.basic.FileSender, which only does
2600-        # non-streaming, i.e. PullProducer, where the receiver/consumer must
2601-        # ask explicitly for each chunk of data. There are only two places in
2602-        # the Twisted codebase that can't handle streaming=False, both of
2603-        # which are in the upload path for an FTP/SFTP server
2604-        # (protocols.ftp.FileConsumer and
2605-        # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
2606-        # likely to be used as the target for a Tahoe download.
2607+    def is_unknown(self):
2608+        return False
2609 
2610hunk ./src/allmydata/immutable/filenode.py 266
2611-        d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
2612-        d.addCallback(lambda lastSent: consumer)
2613-        return d
2614+    def is_allowed_in_immutable_directory(self):
2615+        return True
2616+
2617+    def check_and_repair(self, monitor, verify=False, add_lease=False):
2618+        return self._cnode.check_and_repair(monitor, verify, add_lease)
2619+    def check(self, monitor, verify=False, add_lease=False):
2620+        return self._cnode.check(monitor, verify, add_lease)
2621hunk ./src/allmydata/immutable/layout.py 77
2622 # they are still provided when writing so that older versions of Tahoe can
2623 # read them.
2624 
2625+FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares
2626+
2627 def make_write_bucket_proxy(rref, data_size, block_size, num_segments,
2628                             num_share_hashes, uri_extension_size_max, nodeid):
2629     # Use layout v1 for small files, so they'll be readable by older versions
2630hunk ./src/allmydata/immutable/layout.py 85
2631     # (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
2632     # by tahoe-1.3.0 or later.
2633     try:
2634+        if FORCE_V2:
2635+            raise FileTooLargeError
2636         wbp = WriteBucketProxy(rref, data_size, block_size, num_segments,
2637                                num_share_hashes, uri_extension_size_max, nodeid)
2638     except FileTooLargeError:
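
The FORCE_V2 flag is only meant to be flipped briefly by unit tests that want
small files written with the v2 share layout. A minimal sketch of that
pattern (with_v2_shares is hypothetical, not a helper from this patch):

    from allmydata.immutable import layout

    def with_v2_shares(body):
        # temporarily force v2 shares, even for small files
        old = layout.FORCE_V2
        layout.FORCE_V2 = True
        try:
            return body()
        finally:
            layout.FORCE_V2 = old
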
2639hunk ./src/allmydata/immutable/repairer.py 4
2640 from zope.interface import implements
2641 from twisted.internet import defer
2642 from allmydata.storage.server import si_b2a
2643-from allmydata.util import log, observer
2644-from allmydata.util.assertutil import precondition, _assert
2645-from allmydata.uri import CHKFileVerifierURI
2646-from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget
2647-from twisted.internet.interfaces import IConsumer
2648+from allmydata.util import log, consumer
2649+from allmydata.util.assertutil import precondition
2650+from allmydata.interfaces import IEncryptedUploadable
2651 
2652hunk ./src/allmydata/immutable/repairer.py 8
2653-from allmydata.immutable import download, upload
2654-
2655-import collections
2656+from allmydata.immutable import upload
2657 
2658 class Repairer(log.PrefixingLogMixin):
2659hunk ./src/allmydata/immutable/repairer.py 11
2660+    implements(IEncryptedUploadable)
2661     """I generate any shares which were not available and upload them to
2662     servers.
2663 
2664hunk ./src/allmydata/immutable/repairer.py 43
2665     cancelled (by invoking its raise_if_cancelled() method).
2666     """
2667 
2668-    def __init__(self, storage_broker, secret_holder, verifycap, monitor):
2669-        assert precondition(isinstance(verifycap, CHKFileVerifierURI))
2670-
2671-        logprefix = si_b2a(verifycap.get_storage_index())[:5]
2672+    def __init__(self, filenode, storage_broker, secret_holder, monitor):
2673+        logprefix = si_b2a(filenode.get_storage_index())[:5]
2674         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
2675                                        prefix=logprefix)
2676hunk ./src/allmydata/immutable/repairer.py 47
2677-
2678+        self._filenode = filenode
2679         self._storage_broker = storage_broker
2680         self._secret_holder = secret_holder
2681hunk ./src/allmydata/immutable/repairer.py 50
2682-        self._verifycap = verifycap
2683         self._monitor = monitor
2684hunk ./src/allmydata/immutable/repairer.py 51
2685+        self._offset = 0
2686 
2687     def start(self):
2688         self.log("starting repair")
2689hunk ./src/allmydata/immutable/repairer.py 55
2690-        duc = DownUpConnector()
2691-        dl = download.CiphertextDownloader(self._storage_broker,
2692-                                           self._verifycap, target=duc,
2693-                                           monitor=self._monitor)
2694-        ul = upload.CHKUploader(self._storage_broker, self._secret_holder)
2695-
2696-        d = defer.Deferred()
2697-
2698-        # If the upload or the download fails or is stopped, then the repair
2699-        # failed.
2700-        def _errb(f):
2701-            d.errback(f)
2702-            return None
2703-
2704-        # If the upload succeeds, then the repair has succeeded.
2705-        def _cb(res):
2706-            d.callback(res)
2707-        ul.start(duc).addCallbacks(_cb, _errb)
2708-
2709-        # If the download fails or is stopped, then the repair failed.
2710-        d2 = dl.start()
2711-        d2.addErrback(_errb)
2712-
2713-        # We ignore the callback from d2.  Is this right?  Ugh.
2714-
2715+        d = self._filenode.get_segment_size()
2716+        def _got_segsize(segsize):
2717+            vcap = self._filenode.get_verify_cap()
2718+            k = vcap.needed_shares
2719+            N = vcap.total_shares
2720+            happy = upload.BaseUploadable.default_encoding_param_happy
2721+            self._encodingparams = (k, happy, N, segsize)
2722+            ul = upload.CHKUploader(self._storage_broker, self._secret_holder)
2723+            return ul.start(self) # I am the IEncryptedUploadable
2724+        d.addCallback(_got_segsize)
2725         return d
2726 
2727hunk ./src/allmydata/immutable/repairer.py 67
2728-class DownUpConnector(log.PrefixingLogMixin):
2729-    implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
2730-    """I act like an 'encrypted uploadable' -- something that a local
2731-    uploader can read ciphertext from in order to upload the ciphertext.
2732-    However, unbeknownst to the uploader, I actually download the ciphertext
2733-    from a CiphertextDownloader instance as it is needed.
2734-
2735-    On the other hand, I act like a 'download target' -- something that a
2736-    local downloader can write ciphertext to as it downloads the ciphertext.
2737-    That downloader doesn't realize, of course, that I'm just turning around
2738-    and giving the ciphertext to the uploader."""
2739-
2740-    # The theory behind this class is nice: just satisfy two separate
2741-    # interfaces. The implementation is slightly horrible, because of
2742-    # "impedance mismatch" -- the downloader expects to be able to
2743-    # synchronously push data in, and the uploader expects to be able to read
2744-    # data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.
2745-    # The two interfaces have different APIs for pausing/unpausing. The
2746-    # uploader requests metadata like size and encodingparams which the
2747-    # downloader provides either eventually or not at all (okay I just now
2748-    # extended the downloader to provide encodingparams). Most of this
2749-    # slightly horrible code would disappear if CiphertextDownloader just
2750-    # used this object as an IConsumer (plus maybe a couple of other methods)
2751-    # and if the Uploader simply expected to be treated as an IConsumer (plus
2752-    # maybe a couple of other things).
2753-
2754-    def __init__(self, buflim=2**19):
2755-        """If we're already holding at least buflim bytes, then tell the
2756-        downloader to pause until we have less than buflim bytes."""
2757-        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
2758-        self.buflim = buflim
2759-        self.bufs = collections.deque() # list of strings
2760-        self.bufsiz = 0 # how many bytes total in bufs
2761-
2762-        # list of deferreds which will fire with the requested ciphertext
2763-        self.next_read_ds = collections.deque()
2764-
2765-        # how many bytes of ciphertext were requested by each deferred
2766-        self.next_read_lens = collections.deque()
2767-
2768-        self._size_osol = observer.OneShotObserverList()
2769-        self._encodingparams_osol = observer.OneShotObserverList()
2770-        self._storageindex_osol = observer.OneShotObserverList()
2771-        self._closed_to_pusher = False
2772-
2773-        # once seg size is available, the following attribute will be created
2774-        # to hold it:
2775-
2776-        # self.encodingparams # (provided by the object which is pushing data
2777-        # into me, required by the object which is pulling data out of me)
2778-
2779-        # open() will create the following attribute:
2780-        # self.size # size of the whole file (provided by the object which is
2781-        # pushing data into me, required by the object which is pulling data
2782-        # out of me)
2783-
2784-        # set_upload_status() will create the following attribute:
2785-
2786-        # self.upload_status # XXX do we need to actually update this? Is
2787-        # anybody watching the results during a repair?
2788-
2789-    def _satisfy_reads_if_possible(self):
2790-        assert bool(self.next_read_ds) == bool(self.next_read_lens)
2791-        while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0])
2792-                                     or self._closed_to_pusher):
2793-            nrd = self.next_read_ds.popleft()
2794-            nrl = self.next_read_lens.popleft()
2795-
2796-            # Pick out the requested number of bytes from self.bufs, turn it
2797-            # into a string, and callback the deferred with that.
2798-            res = []
2799-            ressize = 0
2800-            while ressize < nrl and self.bufs:
2801-                nextbuf = self.bufs.popleft()
2802-                res.append(nextbuf)
2803-                ressize += len(nextbuf)
2804-                if ressize > nrl:
2805-                    extra = ressize - nrl
2806-                    self.bufs.appendleft(nextbuf[:-extra])
2807-                    res[-1] = nextbuf[:-extra]
2808-            assert _assert(sum(len(x) for x in res) <= nrl, [len(x) for x in res], nrl)
2809-            assert _assert(sum(len(x) for x in res) == nrl or self._closed_to_pusher, [len(x) for x in res], nrl)
2810-            self.bufsiz -= nrl
2811-            if self.bufsiz < self.buflim and self.producer:
2812-                self.producer.resumeProducing()
2813-            nrd.callback(res)
2814-
2815-    # methods to satisfy the IConsumer and IDownloadTarget interfaces. (From
2816-    # the perspective of a downloader I am an IDownloadTarget and an
2817-    # IConsumer.)
2818-    def registerProducer(self, producer, streaming):
2819-        assert streaming # We know how to handle only streaming producers.
2820-        self.producer = producer # the downloader
2821-    def unregisterProducer(self):
2822-        self.producer = None
2823-    def open(self, size):
2824-        self.size = size
2825-        self._size_osol.fire(self.size)
2826-    def set_encodingparams(self, encodingparams):
2827-        self.encodingparams = encodingparams
2828-        self._encodingparams_osol.fire(self.encodingparams)
2829-    def set_storageindex(self, storageindex):
2830-        self.storageindex = storageindex
2831-        self._storageindex_osol.fire(self.storageindex)
2832-    def write(self, data):
2833-        precondition(data) # please don't write empty strings
2834-        self.bufs.append(data)
2835-        self.bufsiz += len(data)
2836-        self._satisfy_reads_if_possible()
2837-        if self.bufsiz >= self.buflim and self.producer:
2838-            self.producer.pauseProducing()
2839-    def finish(self):
2840-        pass
2841-    def close(self):
2842-        self._closed_to_pusher = True
2843-        # Any reads which haven't been satisfied by now are going to
2844-        # have to be satisfied with short reads.
2845-        self._satisfy_reads_if_possible()
2846 
2847     # methods to satisfy the IEncryptedUploader interface
2848     # (From the perspective of an uploader I am an IEncryptedUploadable.)
2849hunk ./src/allmydata/immutable/repairer.py 73
2850     def set_upload_status(self, upload_status):
2851         self.upload_status = upload_status
2852     def get_size(self):
2853-        if hasattr(self, 'size'): # attribute created by self.open()
2854-            return defer.succeed(self.size)
2855-        else:
2856-            return self._size_osol.when_fired()
2857+        size = self._filenode.get_size()
2858+        assert size is not None
2859+        return defer.succeed(size)
2860     def get_all_encoding_parameters(self):
2861hunk ./src/allmydata/immutable/repairer.py 77
2862-        # We have to learn the encoding params from pusher.
2863-        if hasattr(self, 'encodingparams'):
2864-            # attribute created by self.set_encodingparams()
2865-            return defer.succeed(self.encodingparams)
2866-        else:
2867-            return self._encodingparams_osol.when_fired()
2868+        return defer.succeed(self._encodingparams)
2869     def read_encrypted(self, length, hash_only):
2870hunk ./src/allmydata/immutable/repairer.py 79
2871-        """Returns a deferred which eventually fired with the requested
2872-        ciphertext."""
2873+        """Returns a deferred which eventually fires with the requested
2874+        ciphertext, as a list of strings."""
2875         precondition(length) # please don't ask to read 0 bytes
2876hunk ./src/allmydata/immutable/repairer.py 82
2877-        d = defer.Deferred()
2878-        self.next_read_ds.append(d)
2879-        self.next_read_lens.append(length)
2880-        self._satisfy_reads_if_possible()
2881+        mc = consumer.MemoryConsumer()
2882+        d = self._filenode.read(mc, self._offset, length)
2883+        self._offset += length
2884+        d.addCallback(lambda ign: mc.chunks)
2885         return d
2886     def get_storage_index(self):
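
For reference, an uploader conceptually drains this IEncryptedUploadable by
calling read_encrypted() repeatedly; the sketch below is a hypothetical
driver (drain_uploadable is not part of this patch, and the real consumer of
the interface is CHKUploader). It relies only on get_size() and
read_encrypted() returning Deferreds, as implemented above.

    from twisted.internet import defer

    @defer.inlineCallbacks
    def drain_uploadable(uploadable, chunksize=65536):
        # pull all ciphertext out of the uploadable, chunksize bytes at a time
        size = yield uploadable.get_size()
        remaining = size
        pieces = []
        while remaining > 0:
            chunks = yield uploadable.read_encrypted(min(chunksize, remaining),
                                                     hash_only=False)
            piece = "".join(chunks)
            if not piece:
                break  # short read; stop rather than spin
            pieces.append(piece)
            remaining -= len(piece)
        defer.returnValue("".join(pieces))
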
2887hunk ./src/allmydata/immutable/repairer.py 88
2888-        # We have to learn the storage index from pusher.
2889-        if hasattr(self, 'storageindex'):
2890-            # attribute created by self.set_storageindex()
2891-            return defer.succeed(self.storageindex)
2892-        else:
2893-            return self._storageindex.when_fired()
2894+        return self._filenode.get_storage_index()
2895+    def close(self):
2896+        pass
2897hunk ./src/allmydata/immutable/upload.py 23
2898 from allmydata.util.rrefutil import add_version_to_remote_reference
2899 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
2900      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
2901-     NoServersError, InsufficientVersionError, UploadUnhappinessError
2902+     NoServersError, InsufficientVersionError, UploadUnhappinessError, \
2903+     DEFAULT_MAX_SEGMENT_SIZE
2904 from allmydata.immutable import layout
2905 from pycryptopp.cipher.aes import AES
2906 
2907hunk ./src/allmydata/immutable/upload.py 1209
2908         return self._upload_status
2909 
2910 class BaseUploadable:
2911-    default_max_segment_size = 128*KiB # overridden by max_segment_size
2912+    # this is overridden by max_segment_size
2913+    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
2914     default_encoding_param_k = 3 # overridden by encoding_parameters
2915     default_encoding_param_happy = 7
2916     default_encoding_param_n = 10
2917hunk ./src/allmydata/interfaces.py 18
2918 
2919 MAX_BUCKETS = 256  # per peer -- zfec offers at most 256 shares per file
2920 
2921+DEFAULT_MAX_SEGMENT_SIZE = 128*1024
2922+
2923 ShareData = StringConstraint(None)
2924 URIExtensionData = StringConstraint(1000)
2925 Number = IntegerConstraint(8) # 2**(8*8) == 16EiB ~= 18e18 ~= 18 exabytes
2926hunk ./src/allmydata/nodemaker.py 4
2927 import weakref
2928 from zope.interface import implements
2929 from allmydata.interfaces import INodeMaker
2930-from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
2931+from allmydata.immutable.literal import LiteralFileNode
2932+from allmydata.immutable.filenode import ImmutableFileNode, CiphertextFileNode
2933 from allmydata.immutable.upload import Data
2934 from allmydata.mutable.filenode import MutableFileNode
2935 from allmydata.dirnode import DirectoryNode, pack_children
2936hunk ./src/allmydata/nodemaker.py 16
2937     implements(INodeMaker)
2938 
2939     def __init__(self, storage_broker, secret_holder, history,
2940-                 uploader, downloader, download_cache_dirman,
2941+                 uploader, terminator,
2942                  default_encoding_parameters, key_generator):
2943         self.storage_broker = storage_broker
2944         self.secret_holder = secret_holder
2945hunk ./src/allmydata/nodemaker.py 22
2946         self.history = history
2947         self.uploader = uploader
2948-        self.downloader = downloader
2949-        self.download_cache_dirman = download_cache_dirman
2950+        self.terminator = terminator
2951         self.default_encoding_parameters = default_encoding_parameters
2952         self.key_generator = key_generator
2953 
2954hunk ./src/allmydata/nodemaker.py 32
2955         return LiteralFileNode(cap)
2956     def _create_immutable(self, cap):
2957         return ImmutableFileNode(cap, self.storage_broker, self.secret_holder,
2958-                                 self.downloader, self.history,
2959-                                 self.download_cache_dirman)
2960+                                 self.terminator, self.history)
2961+    def _create_immutable_verifier(self, cap):
2962+        return CiphertextFileNode(cap, self.storage_broker, self.secret_holder,
2963+                                  self.terminator, self.history)
2964     def _create_mutable(self, cap):
2965         n = MutableFileNode(self.storage_broker, self.secret_holder,
2966                             self.default_encoding_parameters,
2967hunk ./src/allmydata/nodemaker.py 78
2968             return self._create_lit(cap)
2969         if isinstance(cap, uri.CHKFileURI):
2970             return self._create_immutable(cap)
2971+        if isinstance(cap, uri.CHKFileVerifierURI):
2972+            return self._create_immutable_verifier(cap)
2973         if isinstance(cap, (uri.ReadonlySSKFileURI, uri.WriteableSSKFileURI)):
2974             return self._create_mutable(cap)
2975         if isinstance(cap, (uri.DirectoryURI,
2976hunk ./src/allmydata/test/no_network.py 226
2977         fileutil.make_dirs(serverdir)
2978         ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats(),
2979                            readonly_storage=readonly)
2980+        ss._no_network_server_number = i
2981         return ss
2982 
2983     def add_server(self, i, ss):
2984hunk ./src/allmydata/test/no_network.py 323
2985                     pass
2986         return sorted(shares)
2987 
2988+    def copy_shares(self, uri):
2989+        shares = {}
2990+        for (shnum, serverid, sharefile) in self.find_uri_shares(uri):
2991+            shares[sharefile] = open(sharefile, "rb").read()
2992+        return shares
2993+
2994+    def restore_all_shares(self, shares):
2995+        for sharefile, data in shares.items():
2996+            open(sharefile, "wb").write(data)
2997+
2998     def delete_share(self, (shnum, serverid, sharefile)):
2999         os.unlink(sharefile)
3000 
3001hunk ./src/allmydata/test/no_network.py 353
3002                 corruptdata = corruptor(sharedata, debug=debug)
3003                 open(i_sharefile, "wb").write(corruptdata)
3004 
3005+    def corrupt_all_shares(self, uri, corruptor, debug=False):
3006+        for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
3007+            sharedata = open(i_sharefile, "rb").read()
3008+            corruptdata = corruptor(sharedata, debug=debug)
3009+            open(i_sharefile, "wb").write(corruptdata)
3010+
3011     def GET(self, urlpath, followRedirect=False, return_response=False,
3012             method="GET", clientnum=0, **kwargs):
3013         # if return_response=True, this fires with (data, statuscode,
3014hunk ./src/allmydata/test/test_cli.py 2304
3015             self.delete_shares_numbered(ur.uri, range(1,10))
3016         d.addCallback(_stash_bad)
3017 
3018+        # the download is abandoned as soon as it's clear that we won't get
3019+        # enough shares. The one remaining share might be in either the
3020+        # COMPLETE or the PENDING state.
3021+        in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3"
3022+        in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3"
3023+
3024         d.addCallback(lambda ign: self.do_cli("get", self.uri_1share))
3025         def _check1((rc, out, err)):
3026             self.failIfEqual(rc, 0)
3027hunk ./src/allmydata/test/test_cli.py 2315
3028             self.failUnless("410 Gone" in err, err)
3029             self.failUnlessIn("NotEnoughSharesError: ", err)
3030-            self.failUnlessIn("Failed to get enough shareholders: have 1, need 3", err)
3031+            self.failUnless(in_complete_msg in err or in_pending_msg in err,
3032+                            err)
3033         d.addCallback(_check1)
3034 
3035         targetf = os.path.join(self.basedir, "output")
3036hunk ./src/allmydata/test/test_cli.py 2325
3037             self.failIfEqual(rc, 0)
3038             self.failUnless("410 Gone" in err, err)
3039             self.failUnlessIn("NotEnoughSharesError: ", err)
3040-            self.failUnlessIn("Failed to get enough shareholders: have 1, need 3", err)
3041+            self.failUnless(in_complete_msg in err or in_pending_msg in err,
3042+                            err)
3043             self.failIf(os.path.exists(targetf))
3044         d.addCallback(_check2)
3045 
3046hunk ./src/allmydata/test/test_dirnode.py 1205
3047     def test_unpack_and_pack_behavior(self):
3048         known_tree = b32decode(self.known_tree)
3049         nodemaker = NodeMaker(None, None, None,
3050-                              None, None, None,
3051+                              None, None,
3052                               {"k": 3, "n": 10}, None)
3053         write_uri = "URI:SSK-RO:e3mdrzfwhoq42hy5ubcz6rp3o4:ybyibhnp3vvwuq2vaw2ckjmesgkklfs6ghxleztqidihjyofgw7q"
3054         filenode = nodemaker.create_from_cap(write_uri)
3055hunk ./src/allmydata/test/test_dirnode.py 1267
3056         return kids
3057 
3058     def test_deep_immutable(self):
3059-        nm = NodeMaker(None, None, None, None, None, None, {"k": 3, "n": 10},
3060-                       None)
3061+        nm = NodeMaker(None, None, None, None, None, {"k": 3, "n": 10}, None)
3062         fn = MinimalFakeMutableFile()
3063 
3064         kids = self._make_kids(nm, ["imm", "lit", "write", "read",
3065hunk ./src/allmydata/test/test_dirnode.py 1361
3066 class FakeClient2(Client):
3067     def __init__(self):
3068         self.nodemaker = FakeNodeMaker(None, None, None,
3069-                                       None, None, None,
3070+                                       None, None,
3071                                        {"k":3,"n":10}, None)
3072     def create_node_from_uri(self, rwcap, rocap):
3073         return self.nodemaker.create_from_cap(rwcap, rocap)
3074hunk ./src/allmydata/test/test_dirnode.py 1645
3075         def _do_delete(ignored):
3076             nm = UCWEingNodeMaker(c0.storage_broker, c0._secret_holder,
3077                                   c0.get_history(), c0.getServiceNamed("uploader"),
3078-                                  c0.downloader,
3079-                                  c0.download_cache_dirman,
3080+                                  c0.terminator,
3081                                   c0.get_encoding_parameters(),
3082                                   c0._key_generator)
3083             n = nm.create_from_cap(self.root_uri)
3084hunk ./src/allmydata/test/test_download.py 8
3085 
3086 import os
3087 from twisted.trial import unittest
3088+from twisted.internet import defer, reactor
3089 from allmydata import uri
3090 from allmydata.storage.server import storage_index_to_dir
3091hunk ./src/allmydata/test/test_download.py 11
3092-from allmydata.util import base32, fileutil
3093-from allmydata.util.consumer import download_to_data
3094-from allmydata.immutable import upload
3095+from allmydata.util import base32, fileutil, spans, log
3096+from allmydata.util.consumer import download_to_data, MemoryConsumer
3097+from allmydata.immutable import upload, layout
3098 from allmydata.test.no_network import GridTestMixin
3099hunk ./src/allmydata/test/test_download.py 15
3100+from allmydata.test.common import ShouldFailMixin
3101+from allmydata.interfaces import NotEnoughSharesError, NoSharesError
3102+from allmydata.immutable.downloader.common import BadSegmentNumberError, \
3103+     BadCiphertextHashError, DownloadStopped
3104+from allmydata.codec import CRSDecoder
3105+from foolscap.eventual import fireEventually, flushEventualQueue
3106 
3107 plaintext = "This is a moderate-sized file.\n" * 10
3108 mutable_plaintext = "This is a moderate-sized mutable file.\n" * 10
3109hunk ./src/allmydata/test/test_download.py 78
3110 }
3111 #--------- END stored_shares.py ----------------
3112 
3113-class DownloadTest(GridTestMixin, unittest.TestCase):
3114-    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.
3115-    def test_download(self):
3116-        self.basedir = self.mktemp()
3117-        self.set_up_grid()
3118-        self.c0 = self.g.clients[0]
3119-
3120-        # do this to create the shares
3121-        #return self.create_shares()
3122-
3123-        self.load_shares()
3124-        d = self.download_immutable()
3125-        d.addCallback(self.download_mutable)
3126-        return d
3127+class _Base(GridTestMixin, ShouldFailMixin):
3128 
3129     def create_shares(self, ignored=None):
3130         u = upload.Data(plaintext, None)
3131hunk ./src/allmydata/test/test_download.py 175
3132         def _got_data(data):
3133             self.failUnlessEqual(data, plaintext)
3134         d.addCallback(_got_data)
3135+        # make sure we can use the same node twice
3136+        d.addCallback(lambda ign: download_to_data(n))
3137+        d.addCallback(_got_data)
3138         return d
3139 
3140     def download_mutable(self, ignored=None):
3141hunk ./src/allmydata/test/test_download.py 188
3142         d.addCallback(_got_data)
3143         return d
3144 
3145+class DownloadTest(_Base, unittest.TestCase):
3146+    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.
3147+    def test_download(self):
3148+        self.basedir = self.mktemp()
3149+        self.set_up_grid()
3150+        self.c0 = self.g.clients[0]
3151+
3152+        # do this to create the shares
3153+        #return self.create_shares()
3154+
3155+        self.load_shares()
3156+        d = self.download_immutable()
3157+        d.addCallback(self.download_mutable)
3158+        return d
3159+
3160+    def test_download_failover(self):
3161+        self.basedir = self.mktemp()
3162+        self.set_up_grid()
3163+        self.c0 = self.g.clients[0]
3164+
3165+        self.load_shares()
3166+        si = uri.from_string(immutable_uri).get_storage_index()
3167+        si_dir = storage_index_to_dir(si)
3168+
3169+        n = self.c0.create_node_from_uri(immutable_uri)
3170+        d = download_to_data(n)
3171+        def _got_data(data):
3172+            self.failUnlessEqual(data, plaintext)
3173+        d.addCallback(_got_data)
3174+
3175+        def _clobber_some_shares(ign):
3176+            # find the three shares that were used, and delete them. Then
3177+            # download again, forcing the downloader to fail over to other
3178+            # shares
3179+            for s in n._cnode._node._shares:
3180+                for clientnum in immutable_shares:
3181+                    for shnum in immutable_shares[clientnum]:
3182+                        if s._shnum == shnum:
3183+                            fn = os.path.join(self.get_serverdir(clientnum),
3184+                                              "shares", si_dir, str(shnum))
3185+                            os.unlink(fn)
3186+        d.addCallback(_clobber_some_shares)
3187+        d.addCallback(lambda ign: download_to_data(n))
3188+        d.addCallback(_got_data)
3189+
3190+        def _clobber_most_shares(ign):
3191+            # delete all but one of the shares that are still alive
3192+            live_shares = [s for s in n._cnode._node._shares if s.is_alive()]
3193+            save_me = live_shares[0]._shnum
3194+            for clientnum in immutable_shares:
3195+                for shnum in immutable_shares[clientnum]:
3196+                    if shnum == save_me:
3197+                        continue
3198+                    fn = os.path.join(self.get_serverdir(clientnum),
3199+                                      "shares", si_dir, str(shnum))
3200+                    if os.path.exists(fn):
3201+                        os.unlink(fn)
3202+            # now the download should fail with NotEnoughSharesError
3203+            return self.shouldFail(NotEnoughSharesError, "1shares", None,
3204+                                   download_to_data, n)
3205+        d.addCallback(_clobber_most_shares)
3206+
3207+        def _clobber_all_shares(ign):
3208+            # delete the last remaining share
3209+            for clientnum in immutable_shares:
3210+                for shnum in immutable_shares[clientnum]:
3211+                    fn = os.path.join(self.get_serverdir(clientnum),
3212+                                      "shares", si_dir, str(shnum))
3213+                    if os.path.exists(fn):
3214+                        os.unlink(fn)
3215+            # now a new download should fail with NoSharesError. We want a
3216+            # new ImmutableFileNode so it will forget about the old shares.
3217+            # If we merely called create_node_from_uri() without first
3218+            # dereferencing the original node, the NodeMaker's _node_cache
3219+            # would give us back the old one.
3220+            n = None
3221+            n = self.c0.create_node_from_uri(immutable_uri)
3222+            return self.shouldFail(NoSharesError, "0shares", None,
3223+                                   download_to_data, n)
3224+        d.addCallback(_clobber_all_shares)
3225+        return d
3226+
3227+    def test_lost_servers(self):
3228+        # while downloading a file (after seg[0], before seg[1]), lose the
3229+        # three servers that we were using. The download should switch over
3230+        # to other servers.
3231+        self.basedir = self.mktemp()
3232+        self.set_up_grid()
3233+        self.c0 = self.g.clients[0]
3234+
3235+        # upload a file with multiple segments, so we can catch the download
3236+        # in the middle.
3237+        u = upload.Data(plaintext, None)
3238+        u.max_segment_size = 70 # 5 segs
3239+        d = self.c0.upload(u)
3240+        def _uploaded(ur):
3241+            self.uri = ur.uri
3242+            self.n = self.c0.create_node_from_uri(self.uri)
3243+            return download_to_data(self.n)
3244+        d.addCallback(_uploaded)
3245+        def _got_data(data):
3246+            self.failUnlessEqual(data, plaintext)
3247+        d.addCallback(_got_data)
3248+        def _kill_some_servers():
3249+            # find the three shares that were used, and break the
3250+            # connections to the servers holding them, forcing the
3251+            # downloader to fail over to other shares
3252+            servers = []
3253+            shares = sorted([s._shnum for s in self.n._cnode._node._shares])
3254+            self.failUnlessEqual(shares, [0,1,2])
3255+            # break the RIBucketReader references
3256+            for s in self.n._cnode._node._shares:
3257+                s._rref.broken = True
3258+                for servernum in immutable_shares:
3259+                    for shnum in immutable_shares[servernum]:
3260+                        if s._shnum == shnum:
3261+                            ss = self.g.servers_by_number[servernum]
3262+                            servers.append(ss)
3263+            # and, for good measure, break the RIStorageServer references
3264+            # too, just in case the downloader gets more aggressive in the
3265+            # future and tries to re-fetch the same share.
3266+            for ss in servers:
3267+                wrapper = self.g.servers_by_id[ss.my_nodeid]
3268+                wrapper.broken = True
3269+        def _download_again(ign):
3270+            c = StallingConsumer(_kill_some_servers)
3271+            return self.n.read(c)
3272+        d.addCallback(_download_again)
3273+        def _check_failover(c):
3274+            self.failUnlessEqual("".join(c.chunks), plaintext)
3275+            shares = sorted([s._shnum for s in self.n._cnode._node._shares])
3276+            # we should now be using more shares than we were before
3277+            self.failIfEqual(shares, [0,1,2])
3278+        d.addCallback(_check_failover)
3279+        return d
3280+
3281+    def test_badguess(self):
3282+        self.basedir = self.mktemp()
3283+        self.set_up_grid()
3284+        self.c0 = self.g.clients[0]
3285+        self.load_shares()
3286+        n = self.c0.create_node_from_uri(immutable_uri)
3287+
3288+        # Cause the downloader to guess a segsize that's too low, so it will
3289+        # ask for a segment number that's too high (beyond the end of the
3290+        # real list, causing BadSegmentNumberError), to exercise
3291+        # Segmentation._retry_bad_segment
3292+
3293+        con1 = MemoryConsumer()
3294+        n._cnode._node._build_guessed_tables(90)
3295+        # plaintext size of 310 bytes, wrong-segsize of 90 bytes, will make
3296+        # us think that file[180:200] is in the third segment (segnum=2), but
3297+        # really there's only one segment
3298+        d = n.read(con1, 180, 20)
3299+        def _done(res):
3300+            self.failUnlessEqual("".join(con1.chunks), plaintext[180:200])
3301+        d.addCallback(_done)
3302+        return d
3303+
3304+    def test_simultaneous_badguess(self):
3305+        self.basedir = self.mktemp()
3306+        self.set_up_grid()
3307+        self.c0 = self.g.clients[0]
3308+
3309+        # upload a file with multiple segments, and a non-default segsize, to
3310+        # exercise the offset-guessing code. Because we don't tell the
3311+        # downloader about the unusual segsize, it will guess wrong, and have
3312+        # to do extra roundtrips to get the correct data.
3313+        u = upload.Data(plaintext, None)
3314+        u.max_segment_size = 70 # 5 segs, 8-wide hashtree
3315+        con1 = MemoryConsumer()
3316+        con2 = MemoryConsumer()
3317+        d = self.c0.upload(u)
3318+        def _uploaded(ur):
3319+            n = self.c0.create_node_from_uri(ur.uri)
3320+            d1 = n.read(con1, 70, 20)
3321+            d2 = n.read(con2, 140, 20)
3322+            return defer.gatherResults([d1,d2])
3323+        d.addCallback(_uploaded)
3324+        def _done(res):
3325+            self.failUnlessEqual("".join(con1.chunks), plaintext[70:90])
3326+            self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
3327+        d.addCallback(_done)
3328+        return d
3329+
3330+    def test_simultaneous_goodguess(self):
3331+        self.basedir = self.mktemp()
3332+        self.set_up_grid()
3333+        self.c0 = self.g.clients[0]
3334+
3335+        # upload a file with multiple segments, and a non-default segsize, to
3336+        # exercise the offset-guessing code. This time we *do* tell the
3337+        # downloader about the unusual segsize, so it can guess right.
3338+        u = upload.Data(plaintext, None)
3339+        u.max_segment_size = 70 # 5 segs, 8-wide hashtree
3340+        con1 = MemoryConsumer()
3341+        con2 = MemoryConsumer()
3342+        d = self.c0.upload(u)
3343+        def _uploaded(ur):
3344+            n = self.c0.create_node_from_uri(ur.uri)
3345+            n._cnode._node._build_guessed_tables(u.max_segment_size)
3346+            d1 = n.read(con1, 70, 20)
3347+            #d2 = n.read(con2, 140, 20) # XXX
3348+            d2 = defer.succeed(None)
3349+            return defer.gatherResults([d1,d2])
3350+        d.addCallback(_uploaded)
3351+        def _done(res):
3352+            self.failUnlessEqual("".join(con1.chunks), plaintext[70:90])
3353+            self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
3354+        #d.addCallback(_done)
3355+        return d
3356+
3357+    def test_sequential_goodguess(self):
3358+        self.basedir = self.mktemp()
3359+        self.set_up_grid()
3360+        self.c0 = self.g.clients[0]
3361+        data = (plaintext*100)[:30000] # multiple of k
3362+
3363+        # upload a file with multiple segments, and a non-default segsize, to
3364+        # exercise the offset-guessing code. This time we *do* tell the
3365+        # downloader about the unusual segsize, so it can guess right.
3366+        u = upload.Data(data, None)
3367+        u.max_segment_size = 6000 # 5 segs, 8-wide hashtree
3368+        con1 = MemoryConsumer()
3369+        con2 = MemoryConsumer()
3370+        d = self.c0.upload(u)
3371+        def _uploaded(ur):
3372+            n = self.c0.create_node_from_uri(ur.uri)
3373+            n._cnode._node._build_guessed_tables(u.max_segment_size)
3374+            d = n.read(con1, 12000, 20)
3375+            def _read1(ign):
3376+                self.failUnlessEqual("".join(con1.chunks), data[12000:12020])
3377+                return n.read(con2, 24000, 20)
3378+            d.addCallback(_read1)
3379+            def _read2(ign):
3380+                self.failUnlessEqual("".join(con2.chunks), data[24000:24020])
3381+            d.addCallback(_read2)
3382+            return d
3383+        d.addCallback(_uploaded)
3384+        return d
3385+
3386+
3387+    def test_simultaneous_get_blocks(self):
3388+        self.basedir = self.mktemp()
3389+        self.set_up_grid()
3390+        self.c0 = self.g.clients[0]
3391+
3392+        self.load_shares()
3393+        stay_empty = []
3394+
3395+        n = self.c0.create_node_from_uri(immutable_uri)
3396+        d = download_to_data(n)
3397+        def _use_shares(ign):
3398+            shares = list(n._cnode._node._shares)
3399+            s0 = shares[0]
3400+            # make sure .cancel works too
3401+            o0 = s0.get_block(0)
3402+            o0.subscribe(lambda **kwargs: stay_empty.append(kwargs))
3403+            o1 = s0.get_block(0)
3404+            o2 = s0.get_block(0)
3405+            o0.cancel()
3406+            o3 = s0.get_block(1) # state=BADSEGNUM
3407+            d1 = defer.Deferred()
3408+            d2 = defer.Deferred()
3409+            d3 = defer.Deferred()
3410+            o1.subscribe(lambda **kwargs: d1.callback(kwargs))
3411+            o2.subscribe(lambda **kwargs: d2.callback(kwargs))
3412+            o3.subscribe(lambda **kwargs: d3.callback(kwargs))
3413+            return defer.gatherResults([d1,d2,d3])
3414+        d.addCallback(_use_shares)
3415+        def _done(res):
3416+            r1,r2,r3 = res
3417+            self.failUnlessEqual(r1["state"], "COMPLETE")
3418+            self.failUnlessEqual(r2["state"], "COMPLETE")
3419+            self.failUnlessEqual(r3["state"], "BADSEGNUM")
3420+            self.failUnless("block" in r1)
3421+            self.failUnless("block" in r2)
3422+            self.failIf(stay_empty)
3423+        d.addCallback(_done)
3424+        return d
3425+
3426+    def test_download_no_overrun(self):
3427+        self.basedir = self.mktemp()
3428+        self.set_up_grid()
3429+        self.c0 = self.g.clients[0]
3430+
3431+        self.load_shares()
3432+
3433+        # tweak the client's copies of server-version data, so it believes
3434+        # that they're old and can't handle reads that overrun the length of
3435+        # the share. This exercises a different code path.
3436+        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
3437+            v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
3438+            v1["tolerates-immutable-read-overrun"] = False
3439+
3440+        n = self.c0.create_node_from_uri(immutable_uri)
3441+        d = download_to_data(n)
3442+        def _got_data(data):
3443+            self.failUnlessEqual(data, plaintext)
3444+        d.addCallback(_got_data)
3445+        return d
3446+
3447+    def test_download_segment(self):
3448+        self.basedir = self.mktemp()
3449+        self.set_up_grid()
3450+        self.c0 = self.g.clients[0]
3451+        self.load_shares()
3452+        n = self.c0.create_node_from_uri(immutable_uri)
3453+        cn = n._cnode
3454+        (d,c) = cn.get_segment(0)
3455+        def _got_segment((offset,data,decodetime)):
3456+            self.failUnlessEqual(offset, 0)
3457+            self.failUnlessEqual(len(data), len(plaintext))
3458+        d.addCallback(_got_segment)
3459+        return d
3460+
3461+    def test_download_segment_cancel(self):
3462+        self.basedir = self.mktemp()
3463+        self.set_up_grid()
3464+        self.c0 = self.g.clients[0]
3465+        self.load_shares()
3466+        n = self.c0.create_node_from_uri(immutable_uri)
3467+        cn = n._cnode
3468+        (d,c) = cn.get_segment(0)
3469+        fired = []
3470+        d.addCallback(fired.append)
3471+        c.cancel()
3472+        d = fireEventually()
3473+        d.addCallback(flushEventualQueue)
3474+        def _check(ign):
3475+            self.failUnlessEqual(fired, [])
3476+        d.addCallback(_check)
3477+        return d
3478+
3479+    def test_download_bad_segment(self):
3480+        self.basedir = self.mktemp()
3481+        self.set_up_grid()
3482+        self.c0 = self.g.clients[0]
3483+        self.load_shares()
3484+        n = self.c0.create_node_from_uri(immutable_uri)
3485+        cn = n._cnode
3486+        def _try_download():
3487+            (d,c) = cn.get_segment(1)
3488+            return d
3489+        d = self.shouldFail(BadSegmentNumberError, "badseg",
3490+                            "segnum=1, numsegs=1",
3491+                            _try_download)
3492+        return d
3493+
3494+    def test_download_segment_terminate(self):
3495+        self.basedir = self.mktemp()
3496+        self.set_up_grid()
3497+        self.c0 = self.g.clients[0]
3498+        self.load_shares()
3499+        n = self.c0.create_node_from_uri(immutable_uri)
3500+        cn = n._cnode
3501+        (d,c) = cn.get_segment(0)
3502+        fired = []
3503+        d.addCallback(fired.append)
3504+        self.c0.terminator.disownServiceParent()
3505+        d = fireEventually()
3506+        d.addCallback(flushEventualQueue)
3507+        def _check(ign):
3508+            self.failUnlessEqual(fired, [])
3509+        d.addCallback(_check)
3510+        return d
3511+
3512+    def test_pause(self):
3513+        self.basedir = self.mktemp()
3514+        self.set_up_grid()
3515+        self.c0 = self.g.clients[0]
3516+        self.load_shares()
3517+        n = self.c0.create_node_from_uri(immutable_uri)
3518+        c = PausingConsumer()
3519+        d = n.read(c)
3520+        def _downloaded(mc):
3521+            newdata = "".join(mc.chunks)
3522+            self.failUnlessEqual(newdata, plaintext)
3523+        d.addCallback(_downloaded)
3524+        return d
3525+
3526+    def test_pause_then_stop(self):
3527+        self.basedir = self.mktemp()
3528+        self.set_up_grid()
3529+        self.c0 = self.g.clients[0]
3530+        self.load_shares()
3531+        n = self.c0.create_node_from_uri(immutable_uri)
3532+        c = PausingAndStoppingConsumer()
3533+        d = self.shouldFail(DownloadStopped, "test_pause_then_stop",
3534+                            "our Consumer called stopProducing()",
3535+                            n.read, c)
3536+        return d
3537+
3538+    def test_stop(self):
3539+        # use a download target that does an immediate stop (ticket #473)
3540+        self.basedir = self.mktemp()
3541+        self.set_up_grid()
3542+        self.c0 = self.g.clients[0]
3543+        self.load_shares()
3544+        n = self.c0.create_node_from_uri(immutable_uri)
3545+        c = StoppingConsumer()
3546+        d = self.shouldFail(DownloadStopped, "test_stop",
3547+                            "our Consumer called stopProducing()",
3548+                            n.read, c)
3549+        return d
3550+
3551+    def test_download_segment_bad_ciphertext_hash(self):
3552+        # The crypttext_hash_tree asserts the integrity of the decoded
3553+        # ciphertext, and exists to detect two sorts of problems. The first
3554+        # is a bug in zfec decode. The second is the "two-sided t-shirt"
3555+        # attack (found by Christian Grothoff), in which a malicious uploader
3556+        # creates two sets of shares (one for file A, second for file B),
3557+        # uploads a combination of them (shares 0-4 of A, 5-9 of B), and then
3558+        # builds an otherwise normal UEB around those shares: their goal is
3559+        # to give their victim a filecap which sometimes downloads the good A
3560+        # contents, and sometimes the bad B contents, depending upon which
3561+        # servers/shares they can get to. Having a hash of the ciphertext
3562+        # forces them to commit to exactly one version. (Christian's prize
3563+        # for finding this problem was a t-shirt with two sides: the shares
3564+        # of file A on the front, B on the back).
3565+
3566+        # creating a set of shares with this property is too hard, although
3567+        # it'd be nice to do so and confirm our fix. (it requires a lot of
3568+        # tampering with the uploader). So instead, we just damage the
3569+        # decoder. The tail decoder is rebuilt each time, so we need to use a
3570+        # file with multiple segments.
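+        # (Concretely, the BrokenDecoder helper defined later in this file
+        #  wraps CRSDecoder and flips the lsb of the first byte it decodes,
+        #  i.e. s[:0] + chr(ord(s[0]) ^ 0x01) + s[1:], which is enough to
+        #  make the ciphertext_hash_tree check fail for segnum=0.)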
3571+        self.basedir = self.mktemp()
3572+        self.set_up_grid()
3573+        self.c0 = self.g.clients[0]
3574+
3575+        u = upload.Data(plaintext, None)
3576+        u.max_segment_size = 60 # 6 segs
3577+        d = self.c0.upload(u)
3578+        def _uploaded(ur):
3579+            n = self.c0.create_node_from_uri(ur.uri)
3580+            n._cnode._node._build_guessed_tables(u.max_segment_size)
3581+
3582+            d = download_to_data(n)
3583+            def _break_codec(data):
3584+                # the codec isn't created until the UEB is retrieved
3585+                node = n._cnode._node
3586+                vcap = node._verifycap
3587+                k, N = vcap.needed_shares, vcap.total_shares
3588+                bad_codec = BrokenDecoder()
3589+                bad_codec.set_params(node.segment_size, k, N)
3590+                node._codec = bad_codec
3591+            d.addCallback(_break_codec)
3592+            # now try to download it again. The broken codec will provide
3593+            # ciphertext that fails the hash test.
3594+            d.addCallback(lambda ign:
3595+                          self.shouldFail(BadCiphertextHashError, "badhash",
3596+                                          "hash failure in "
3597+                                          "ciphertext_hash_tree: segnum=0",
3598+                                          download_to_data, n))
3599+            return d
3600+        d.addCallback(_uploaded)
3601+        return d
3602+
3603+    def OFFtest_download_segment_XXX(self):
3604+        self.basedir = self.mktemp()
3605+        self.set_up_grid()
3606+        self.c0 = self.g.clients[0]
3607+
3608+        # upload a file with multiple segments, and a non-default segsize, to
3609+        # exercise the offset-guessing code. This time we *do* tell the
3610+        # downloader about the unusual segsize, so it can guess right.
3611+        u = upload.Data(plaintext, None)
3612+        u.max_segment_size = 70 # 5 segs, 8-wide hashtree
3613+        con1 = MemoryConsumer()
3614+        con2 = MemoryConsumer()
3615+        d = self.c0.upload(u)
3616+        def _uploaded(ur):
3617+            n = self.c0.create_node_from_uri(ur.uri)
3618+            n._cnode._node._build_guessed_tables(u.max_segment_size)
3619+            d1 = n.read(con1, 70, 20)
3620+            #d2 = n.read(con2, 140, 20)
3621+            d2 = defer.succeed(None)
3622+            return defer.gatherResults([d1,d2])
3623+        d.addCallback(_uploaded)
3624+        def _done(res):
3625+            self.failUnlessEqual("".join(con1.chunks), plaintext[70:90])
3626+            self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
3627+        #d.addCallback(_done)
3628+        return d
3629+
3630+    def test_duplicate_shares(self):
3631+        self.basedir = self.mktemp()
3632+        self.set_up_grid()
3633+        self.c0 = self.g.clients[0]
3634+
3635+        self.load_shares()
3636+        # make sure everybody has a copy of sh0. The second server contacted
3637+        # will report two shares, and the ShareFinder will handle the
3638+        # duplicate by attaching both to the same CommonShare instance.
3639+        si = uri.from_string(immutable_uri).get_storage_index()
3640+        si_dir = storage_index_to_dir(si)
3641+        sh0_file = [sharefile
3642+                    for (shnum, serverid, sharefile)
3643+                    in self.find_uri_shares(immutable_uri)
3644+                    if shnum == 0][0]
3645+        sh0_data = open(sh0_file, "rb").read()
3646+        for clientnum in immutable_shares:
3647+            if 0 in immutable_shares[clientnum]:
3648+                continue
3649+            cdir = self.get_serverdir(clientnum)
3650+            target = os.path.join(cdir, "shares", si_dir, "0")
3651+            outf = open(target, "wb")
3652+            outf.write(sh0_data)
3653+            outf.close()
3654+
3655+        d = self.download_immutable()
3656+        return d
3657+
3658+    def test_verifycap(self):
3659+        self.basedir = self.mktemp()
3660+        self.set_up_grid()
3661+        self.c0 = self.g.clients[0]
3662+        self.load_shares()
3663+
3664+        n = self.c0.create_node_from_uri(immutable_uri)
3665+        vcap = n.get_verify_cap().to_string()
3666+        vn = self.c0.create_node_from_uri(vcap)
3667+        d = download_to_data(vn)
3668+        def _got_ciphertext(ciphertext):
3669+            self.failUnlessEqual(len(ciphertext), len(plaintext))
3670+            self.failIfEqual(ciphertext, plaintext)
3671+        d.addCallback(_got_ciphertext)
3672+        return d
3673+
3674+class BrokenDecoder(CRSDecoder):
3675+    def decode(self, shares, shareids):
3676+        d = CRSDecoder.decode(self, shares, shareids)
3677+        def _decoded(buffers):
3678+            def _corruptor(s, which):
3679+                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
3680+            buffers[0] = _corruptor(buffers[0], 0) # flip lsb of first byte
3681+            return buffers
3682+        d.addCallback(_decoded)
3683+        return d
3684+
3685+
3686+class PausingConsumer(MemoryConsumer):
3687+    def __init__(self):
3688+        MemoryConsumer.__init__(self)
3689+        self.size = 0
3690+        self.writes = 0
3691+    def write(self, data):
3692+        self.size += len(data)
3693+        self.writes += 1
3694+        if self.writes <= 2:
3695+            # we happen to use 4 segments, and want to avoid pausing on the
3696+            # last one (since then the _unpause timer will still be running)
3697+            self.producer.pauseProducing()
3698+            reactor.callLater(0.1, self._unpause)
3699+        return MemoryConsumer.write(self, data)
3700+    def _unpause(self):
3701+        self.producer.resumeProducing()
3702+
3703+class PausingAndStoppingConsumer(PausingConsumer):
3704+    def write(self, data):
3705+        self.producer.pauseProducing()
3706+        reactor.callLater(0.5, self._stop)
3707+    def _stop(self):
3708+        self.producer.stopProducing()
3709+
3710+class StoppingConsumer(PausingConsumer):
3711+    def write(self, data):
3712+        self.producer.stopProducing()
3713+
3714+class StallingConsumer(MemoryConsumer):
3715+    def __init__(self, halfway_cb):
3716+        MemoryConsumer.__init__(self)
3717+        self.halfway_cb = halfway_cb
3718+        self.writes = 0
3719+    def write(self, data):
3720+        self.writes += 1
3721+        if self.writes == 1:
3722+            self.halfway_cb()
3723+        return MemoryConsumer.write(self, data)
3724+
3725+class Corruption(_Base, unittest.TestCase):
3726+
3727+    def _corrupt_flip(self, ign, imm_uri, which):
3728+        log.msg("corrupt %d" % which)
3729+        def _corruptor(s, debug=False):
3730+            return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
3731+        self.corrupt_shares_numbered(imm_uri, [0], _corruptor)
3732+
3733+    def _corrupt_set(self, ign, imm_uri, which, newvalue):
3734+        log.msg("corrupt %d" % which)
3735+        def _corruptor(s, debug=False):
3736+            return s[:which] + chr(newvalue) + s[which+1:]
3737+        self.corrupt_shares_numbered(imm_uri, [0], _corruptor)
3738+
3739+    def test_each_byte(self):
3740+        # Setting catalog_detection=True performs an exhaustive test of the
3741+        # Downloader's response to corruption in the lsb of each byte of the
3742+        # 2070-byte share, with two goals: make sure we tolerate all forms of
3743+        # corruption (i.e. don't hang or return bad data), and make a list of
3744+        # which bytes can be corrupted without influencing the download
3745+        # (since we don't need every byte of the share). That takes 50s to
3746+        # run on my laptop and doesn't have any actual asserts, so we don't
3747+        # normally do that.
3748+        self.catalog_detection = False
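+        # (Flip the line above to True and run just this test to regenerate
+        #  the "corruption ignored in ..." catalog that _show_results prints
+        #  at the end of the test.)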
3749+
3750+        self.basedir = "download/Corruption/each_byte"
3751+        self.set_up_grid()
3752+        self.c0 = self.g.clients[0]
3753+
3754+        # to exercise the block-hash-tree code properly, we need to have
3755+        # multiple segments. We don't tell the downloader about the different
3756+        # segsize, so it guesses wrong and must do extra roundtrips.
3757+        u = upload.Data(plaintext, None)
3758+        u.max_segment_size = 120 # 3 segs, 4-wide hashtree
3759+
3760+        if self.catalog_detection:
3761+            undetected = spans.Spans()
3762+
3763+        def _download(ign, imm_uri, which, expected):
3764+            n = self.c0.create_node_from_uri(imm_uri)
3765+            # for this test to work, we need to have a new Node each time.
3766+            # Make sure the NodeMaker's weakcache hasn't interfered.
3767+            assert not n._cnode._node._shares
3768+            d = download_to_data(n)
3769+            def _got_data(data):
3770+                self.failUnlessEqual(data, plaintext)
3771+                shnums = sorted([s._shnum for s in n._cnode._node._shares])
3772+                no_sh0 = bool(0 not in shnums)
3773+                sh0 = [s for s in n._cnode._node._shares if s._shnum == 0]
3774+                sh0_had_corruption = False
3775+                if sh0 and sh0[0].had_corruption:
3776+                    sh0_had_corruption = True
3777+                num_needed = len(n._cnode._node._shares)
3778+                if self.catalog_detection:
3779+                    detected = no_sh0 or sh0_had_corruption or (num_needed!=3)
3780+                    if not detected:
3781+                        undetected.add(which, 1)
3782+                if expected == "no-sh0":
3783+                    self.failIfIn(0, shnums)
3784+                elif expected == "0bad-need-3":
3785+                    self.failIf(no_sh0)
3786+                    self.failUnless(sh0[0].had_corruption)
3787+                    self.failUnlessEqual(num_needed, 3)
3788+                elif expected == "need-4th":
3789+                    self.failIf(no_sh0)
3790+                    self.failUnless(sh0[0].had_corruption)
3791+                    self.failIfEqual(num_needed, 3)
3792+            d.addCallback(_got_data)
3793+            return d
3794+
3795+
3796+        d = self.c0.upload(u)
3797+        def _uploaded(ur):
3798+            imm_uri = ur.uri
3799+            self.shares = self.copy_shares(imm_uri)
3800+            d = defer.succeed(None)
3801+            # 'victims' is a list of corruption tests to run. Each one flips
3802+            # the low-order bit of the specified offset in the share file (so
3803+            # offset=0 is the MSB of the container version, offset=15 is the
3804+            # LSB of the share version, offset=24 is the MSB of the
3805+            # data-block-offset, and offset=48 is the first byte of the first
3806+            # data-block). Each one also specifies what sort of corruption
3807+            # we're expecting to see.
3808+            no_sh0_victims = [0,1,2,3] # container version
3809+            need3_victims =  [ ] # none currently in this category
3810+            # when the offsets are corrupted, the Share will be unable to
3811+            # retrieve the data it wants (because it thinks that data lives
3812+            # off in the weeds somewhere), and Share treats DataUnavailable
3813+            # as abandon-this-share, so in general we'll be forced to look
3814+            # for a 4th share.
3815+            need_4th_victims = [12,13,14,15, # share version
3816+                                24,25,26,27, # offset[data]
3817+                                32,33,34,35, # offset[crypttext_hash_tree]
3818+                                36,37,38,39, # offset[block_hashes]
3819+                                44,45,46,47, # offset[UEB]
3820+                                ]
3821+            need_4th_victims.append(48) # block data
3822+            # when corrupting hash trees, we must corrupt a value that isn't
3823+            # directly set from somewhere else. Since we download data from
3824+            # seg0, corrupt something on its hash chain, like [2] (the
3825+            # right-hand child of the root)
3826+            need_4th_victims.append(600+2*32) # block_hashes[2]
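+            # (Worked out: block hash tree nodes are stored flat, 32 bytes
+            #  each, starting at share offset 600, so [2] lives at
+            #  600 + 2*32 = 664. With this 4-leaf tree the flat order is
+            #  [root, left child, right child, leaf0..leaf3]; node [2] is
+            #  the uncle needed to validate seg0's leaf, and nothing else
+            #  supplies its value, so corrupting it must trip a hash check.)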
3827+            # Share.loop is pretty conservative: it abandons the share at the
3828+            # first sign of corruption. It doesn't strictly need to be this
3829+            # way: if the UEB were corrupt, we could still get good block
3830+            # data from that share, as long as there was a good copy of the
3831+            # UEB elsewhere. If this behavior is relaxed, then corruption in
3832+            # the following fields (which are present in multiple shares)
3833+            # should fall into the "need3_victims" case instead of the
3834+            # "need_4th_victims" case.
3835+            need_4th_victims.append(376+2*32) # crypttext_hash_tree[2]
3836+            need_4th_victims.append(824) # share_hashes
3837+            need_4th_victims.append(994) # UEB length
3838+            need_4th_victims.append(998) # UEB
3839+            corrupt_me = ([(i,"no-sh0") for i in no_sh0_victims] +
3840+                          [(i, "0bad-need-3") for i in need3_victims] +
3841+                          [(i, "need-4th") for i in need_4th_victims])
3842+            if self.catalog_detection:
3843+                corrupt_me = [(i, "") for i in range(len(self.sh0_orig))]
3844+            for i,expected in corrupt_me:
3845+                # All these tests result in a successful download. What we're
3846+                # measuring is how many shares the downloader had to use.
3847+                d.addCallback(self._corrupt_flip, imm_uri, i)
3848+                d.addCallback(_download, imm_uri, i, expected)
3849+                d.addCallback(lambda ign: self.restore_all_shares(self.shares))
3850+                d.addCallback(fireEventually)
3851+            corrupt_values = [(3, 2, "no-sh0"),
3852+                              (15, 2, "need-4th"), # share looks v2
3853+                              ]
3854+            for i,newvalue,expected in corrupt_values:
3855+                d.addCallback(self._corrupt_set, imm_uri, i, newvalue)
3856+                d.addCallback(_download, imm_uri, i, expected)
3857+                d.addCallback(lambda ign: self.restore_all_shares(self.shares))
3858+                d.addCallback(fireEventually)
3859+            return d
3860+        d.addCallback(_uploaded)
3861+        def _show_results(ign):
3862+            print
3863+            print ("of [0:%d], corruption ignored in %s" %
3864+                   (len(self.sh0_orig), undetected.dump()))
3865+        if self.catalog_detection:
3866+            d.addCallback(_show_results)
3867+            # of [0:2070], corruption ignored in len=1133:
3868+            # [4-11],[16-23],[28-31],[152-439],[600-663],[1309-2069]
3869+            #  [4-11]: container sizes
3870+            #  [16-23]: share block/data sizes
3871+            #  [152-375]: plaintext hash tree
3872+            #  [376-408]: crypttext_hash_tree[0] (root)
3873+            #  [408-439]: crypttext_hash_tree[1] (computed)
3874+            #  [600-631]: block hash tree[0] (root)
3875+            #  [632-663]: block hash tree[1] (computed)
3876+            #  [1309-]: reserved+unused UEB space
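+            # For reference, the share layout implied by the victim offsets
+            # and the ranges above (a summary of this 2070-byte share, not a
+            # layout spec):
+            #     0-  3  container version
+            #     4- 11  container sizes
+            #    12- 15  share version
+            #    16- 23  share block/data sizes
+            #    24- 47  offset table (data at 24, crypttext_hash_tree at 32,
+            #            block_hashes at 36, UEB at 44)
+            #    48-     first data block
+            #   152-375  plaintext hash tree
+            #   376-     crypttext_hash_tree
+            #   600-     block hash tree
+            #   824-     share_hashes
+            #   994-     UEB length, then the UEB itself at 998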
3877+        return d
3878+
3879+    def test_failure(self):
3880+        # this test corrupts all shares in the same way, and asserts that the
3881+        # download fails.
3882+
3883+        self.basedir = "download/Corruption/failure"
3884+        self.set_up_grid()
3885+        self.c0 = self.g.clients[0]
3886+
3887+        # to exercise the block-hash-tree code properly, we need to have
3888+        # multiple segments. We don't tell the downloader about the different
3889+        # segsize, so it guesses wrong and must do extra roundtrips.
3890+        u = upload.Data(plaintext, None)
3891+        u.max_segment_size = 120 # 3 segs, 4-wide hashtree
3892+
3893+        d = self.c0.upload(u)
3894+        def _uploaded(ur):
3895+            imm_uri = ur.uri
3896+            self.shares = self.copy_shares(imm_uri)
3897+
3898+            corrupt_me = [(48, "block data", "Last failure: None"),
3899+                          (600+2*32, "block_hashes[2]", "BadHashError"),
3900+                          (376+2*32, "crypttext_hash_tree[2]", "BadHashError"),
3901+                          (824, "share_hashes", "BadHashError"),
3902+                          ]
3903+            def _download(imm_uri):
3904+                n = self.c0.create_node_from_uri(imm_uri)
3905+                # for this test to work, we need to have a new Node each time.
3906+                # Make sure the NodeMaker's weakcache hasn't interfered.
3907+                assert not n._cnode._node._shares
3908+                return download_to_data(n)
3909+
3910+            d = defer.succeed(None)
3911+            for i,which,substring in corrupt_me:
3912+                # All these tests result in a failed download.
3913+                d.addCallback(self._corrupt_flip_all, imm_uri, i)
3914+                d.addCallback(lambda ign:
3915+                              self.shouldFail(NotEnoughSharesError, which,
3916+                                              substring,
3917+                                              _download, imm_uri))
3918+                d.addCallback(lambda ign: self.restore_all_shares(self.shares))
3919+                d.addCallback(fireEventually)
3920+            return d
3921+        d.addCallback(_uploaded)
3922+
3923+        return d
3924+
3925+    def _corrupt_flip_all(self, ign, imm_uri, which):
3926+        def _corruptor(s, debug=False):
3927+            return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
3928+        self.corrupt_all_shares(imm_uri, _corruptor)
3929+
3930+class DownloadV2(_Base, unittest.TestCase):
3931+    # tests which exercise v2-share code. They first upload a file with
3932+    # FORCE_V2 set.
3933+
3934+    def setUp(self):
3935+        d = defer.maybeDeferred(_Base.setUp, self)
3936+        def _set_force_v2(ign):
3937+            self.old_force_v2 = layout.FORCE_V2
3938+            layout.FORCE_V2 = True
3939+        d.addCallback(_set_force_v2)
3940+        return d
3941+    def tearDown(self):
3942+        layout.FORCE_V2 = self.old_force_v2
3943+        return _Base.tearDown(self)
3944+
3945+    def test_download(self):
3946+        self.basedir = self.mktemp()
3947+        self.set_up_grid()
3948+        self.c0 = self.g.clients[0]
3949+
3950+        # upload a file
3951+        u = upload.Data(plaintext, None)
3952+        d = self.c0.upload(u)
3953+        def _uploaded(ur):
3954+            imm_uri = ur.uri
3955+            n = self.c0.create_node_from_uri(imm_uri)
3956+            return download_to_data(n)
3957+        d.addCallback(_uploaded)
3958+        return d
3959+
3960+    def test_download_no_overrun(self):
3961+        self.basedir = self.mktemp()
3962+        self.set_up_grid()
3963+        self.c0 = self.g.clients[0]
3964+
3965+        # tweak the client's copies of server-version data, so it believes
3966+        # that they're old and can't handle reads that overrun the length of
3967+        # the share. This exercises a different code path.
3968+        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
3969+            v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
3970+            v1["tolerates-immutable-read-overrun"] = False
3971+
3972+        # upload a file
3973+        u = upload.Data(plaintext, None)
3974+        d = self.c0.upload(u)
3975+        def _uploaded(ur):
3976+            imm_uri = ur.uri
3977+            n = self.c0.create_node_from_uri(imm_uri)
3978+            return download_to_data(n)
3979+        d.addCallback(_uploaded)
3980+        return d
3981+
3982+    def OFF_test_no_overrun_corrupt_shver(self): # unnecessary
3983+        self.basedir = self.mktemp()
3984+        self.set_up_grid()
3985+        self.c0 = self.g.clients[0]
3986+
3987+        for (peerid, rref) in self.c0.storage_broker.get_all_servers():
3988+            v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
3989+            v1["tolerates-immutable-read-overrun"] = False
3990+
3991+        # upload a file
3992+        u = upload.Data(plaintext, None)
3993+        d = self.c0.upload(u)
3994+        def _uploaded(ur):
3995+            imm_uri = ur.uri
3996+            def _do_corrupt(which, newvalue):
3997+                def _corruptor(s, debug=False):
3998+                    return s[:which] + chr(newvalue) + s[which+1:]
3999+                self.corrupt_shares_numbered(imm_uri, [0], _corruptor)
4000+            _do_corrupt(12+3, 0x00)
4001+            n = self.c0.create_node_from_uri(imm_uri)
4002+            d = download_to_data(n)
4003+            def _got_data(data):
4004+                self.failUnlessEqual(data, plaintext)
4005+            d.addCallback(_got_data)
4006+            return d
4007+        d.addCallback(_uploaded)
4008+        return d
4009hunk ./src/allmydata/test/test_encode.py 3
4010 from zope.interface import implements
4011 from twisted.trial import unittest
4012-from twisted.internet import defer, reactor
4013+from twisted.internet import defer
4014 from twisted.python.failure import Failure
4015 from foolscap.api import fireEventually
4016hunk ./src/allmydata/test/test_encode.py 6
4017-from allmydata import hashtree, uri
4018-from allmydata.immutable import encode, upload, download
4019+from allmydata import uri
4020+from allmydata.immutable import encode, upload, checker
4021 from allmydata.util import hashutil
4022 from allmydata.util.assertutil import _assert
4023hunk ./src/allmydata/test/test_encode.py 10
4024-from allmydata.util.consumer import MemoryConsumer
4025-from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
4026-     NotEnoughSharesError, IStorageBroker, UploadUnhappinessError
4027-from allmydata.monitor import Monitor
4028-import allmydata.test.common_util as testutil
4029+from allmydata.util.consumer import download_to_data
4030+from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader
4031+from allmydata.test.no_network import GridTestMixin
4032 
4033 class LostPeerError(Exception):
4034     pass
4035hunk ./src/allmydata/test/test_encode.py 20
4036 def flip_bit(good): # flips the last bit
4037     return good[:-1] + chr(ord(good[-1]) ^ 0x01)
4038 
4039-class FakeStorageBroker:
4040-    implements(IStorageBroker)
4041-
4042 class FakeBucketReaderWriterProxy:
4043     implements(IStorageBucketWriter, IStorageBucketReader)
4044     # these are used for both reading and writing
4045hunk ./src/allmydata/test/test_encode.py 57
4046             self.blocks[segmentnum] = data
4047         return defer.maybeDeferred(_try)
4048 
4049-    def put_plaintext_hashes(self, hashes):
4050-        def _try():
4051-            assert not self.closed
4052-            assert not self.plaintext_hashes
4053-            self.plaintext_hashes = hashes
4054-        return defer.maybeDeferred(_try)
4055-
4056     def put_crypttext_hashes(self, hashes):
4057         def _try():
4058             assert not self.closed
4059hunk ./src/allmydata/test/test_encode.py 214
4060         fb = FakeBucketReaderWriterProxy()
4061         fb.put_uri_extension(uebstring)
4062         verifycap = uri.CHKFileVerifierURI(storage_index='x'*16, uri_extension_hash=uebhash, needed_shares=self.K, total_shares=self.M, size=self.SIZE)
4063-        vup = download.ValidatedExtendedURIProxy(fb, verifycap)
4064+        vup = checker.ValidatedExtendedURIProxy(fb, verifycap)
4065         return vup.start()
4066 
4067     def _test_accept(self, uebdict):
4068hunk ./src/allmydata/test/test_encode.py 228
4069 
4070     def _test_reject(self, uebdict):
4071         d = self._test(uebdict)
4072-        d.addBoth(self._should_fail, (KeyError, download.BadURIExtension))
4073+        d.addBoth(self._should_fail, (KeyError, checker.BadURIExtension))
4074         return d
4075 
4076     def test_accept_minimal(self):
4077hunk ./src/allmydata/test/test_encode.py 324
4078 
4079         return d
4080 
4081-    # a series of 3*3 tests to check out edge conditions. One axis is how the
4082-    # plaintext is divided into segments: kn+(-1,0,1). Another way to express
4083-    # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we
4084-    # might test 74 bytes, 75 bytes, and 76 bytes.
4085-
4086-    # on the other axis is how many leaves in the block hash tree we wind up
4087-    # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns
4088-    # into a single leaf. So we'd like to check out, e.g., 3 segments, 4
4089-    # segments, and 5 segments.
4090-
4091-    # that results in the following series of data lengths:
4092-    #  3 segs: 74, 75, 51
4093-    #  4 segs: 99, 100, 76
4094-    #  5 segs: 124, 125, 101
4095-
4096-    # all tests encode to 100 shares, which means the share hash tree will
4097-    # have 128 leaves, which means that buckets will be given an 8-long share
4098-    # hash chain
4099-
4100-    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
4101-    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
4102-    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
4103-    # trees, which get 15 blockhashes.
4104-
4105     def test_send_74(self):
4106         # 3 segments (25, 25, 24)
4107         return self.do_encode(25, 74, 100, 3, 7, 8)
4108hunk ./src/allmydata/test/test_encode.py 354
4109         # 5 segments: 25, 25, 25, 25, 1
4110         return self.do_encode(25, 101, 100, 5, 15, 8)
4111 
4112-class PausingConsumer(MemoryConsumer):
4113-    def __init__(self):
4114-        MemoryConsumer.__init__(self)
4115-        self.size = 0
4116-        self.writes = 0
4117-    def write(self, data):
4118-        self.size += len(data)
4119-        self.writes += 1
4120-        if self.writes <= 2:
4121-            # we happen to use 4 segments, and want to avoid pausing on the
4122-            # last one (since then the _unpause timer will still be running)
4123-            self.producer.pauseProducing()
4124-            reactor.callLater(0.1, self._unpause)
4125-        return MemoryConsumer.write(self, data)
4126-    def _unpause(self):
4127-        self.producer.resumeProducing()
4128-
4129-class PausingAndStoppingConsumer(PausingConsumer):
4130-    def write(self, data):
4131-        self.producer.pauseProducing()
4132-        reactor.callLater(0.5, self._stop)
4133-    def _stop(self):
4134-        self.producer.stopProducing()
4135-
4136-class StoppingConsumer(PausingConsumer):
4137-    def write(self, data):
4138-        self.producer.stopProducing()
4139-
4140-class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
4141-    timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box.
4142-    def send_and_recover(self, k_and_happy_and_n=(25,75,100),
4143-                         AVAILABLE_SHARES=None,
4144-                         datalen=76,
4145-                         max_segment_size=25,
4146-                         bucket_modes={},
4147-                         recover_mode="recover",
4148-                         consumer=None,
4149-                         ):
4150-        if AVAILABLE_SHARES is None:
4151-            AVAILABLE_SHARES = k_and_happy_and_n[2]
4152-        data = make_data(datalen)
4153-        d = self.send(k_and_happy_and_n, AVAILABLE_SHARES,
4154-                      max_segment_size, bucket_modes, data)
4155-        # that fires with (uri_extension_hash, e, shareholders)
4156-        d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode,
4157-                      consumer=consumer)
4158-        # that fires with newdata
4159-        def _downloaded((newdata, fd)):
4160-            self.failUnless(newdata == data, str((len(newdata), len(data))))
4161-            return fd
4162-        d.addCallback(_downloaded)
4163-        return d
4164-
4165-    def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,
4166-             bucket_modes, data):
4167-        k, happy, n = k_and_happy_and_n
4168-        NUM_SHARES = k_and_happy_and_n[2]
4169-        if AVAILABLE_SHARES is None:
4170-            AVAILABLE_SHARES = NUM_SHARES
4171-        e = encode.Encoder()
4172-        u = upload.Data(data, convergence="some convergence string")
4173-        # force use of multiple segments by using a low max_segment_size
4174-        u.max_segment_size = max_segment_size
4175-        u.encoding_param_k = k
4176-        u.encoding_param_happy = happy
4177-        u.encoding_param_n = n
4178-        eu = upload.EncryptAnUploadable(u)
4179-        d = e.set_encrypted_uploadable(eu)
4180-
4181-        shareholders = {}
4182-        def _ready(res):
4183-            k,happy,n = e.get_param("share_counts")
4184-            assert n == NUM_SHARES # else we'll be completely confused
4185-            servermap = {}
4186-            for shnum in range(NUM_SHARES):
4187-                mode = bucket_modes.get(shnum, "good")
4188-                peer = FakeBucketReaderWriterProxy(mode, "peer%d" % shnum)
4189-                shareholders[shnum] = peer
4190-                servermap.setdefault(shnum, set()).add(peer.get_peerid())
4191-            e.set_shareholders(shareholders, servermap)
4192-            return e.start()
4193-        d.addCallback(_ready)
4194-        def _sent(res):
4195-            d1 = u.get_encryption_key()
4196-            d1.addCallback(lambda key: (res, key, shareholders))
4197-            return d1
4198-        d.addCallback(_sent)
4199-        return d
4200-
4201-    def recover(self, (res, key, shareholders), AVAILABLE_SHARES,
4202-                recover_mode, consumer=None):
4203-        verifycap = res
4204-
4205-        if "corrupt_key" in recover_mode:
4206-            # we corrupt the key, so that the decrypted data is corrupted and
4207-            # will fail the plaintext hash check. Since we're manually
4208-            # attaching shareholders, the fact that the storage index is also
4209-            # corrupted doesn't matter.
4210-            key = flip_bit(key)
4211-
4212-        u = uri.CHKFileURI(key=key,
4213-                           uri_extension_hash=verifycap.uri_extension_hash,
4214-                           needed_shares=verifycap.needed_shares,
4215-                           total_shares=verifycap.total_shares,
4216-                           size=verifycap.size)
4217-
4218-        sb = FakeStorageBroker()
4219-        if not consumer:
4220-            consumer = MemoryConsumer()
4221-        innertarget = download.ConsumerAdapter(consumer)
4222-        target = download.DecryptingTarget(innertarget, u.key)
4223-        fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target, monitor=Monitor())
4224-
4225-        # we manually cycle the CiphertextDownloader through a number of steps that
4226-        # would normally be sequenced by a Deferred chain in
4227-        # CiphertextDownloader.start(), to give us more control over the process.
4228-        # In particular, by bypassing _get_all_shareholders, we skip
4229-        # permuted-peerlist selection.
4230-        for shnum, bucket in shareholders.items():
4231-            if shnum < AVAILABLE_SHARES and bucket.closed:
4232-                fd.add_share_bucket(shnum, bucket)
4233-        fd._got_all_shareholders(None)
4234-
4235-        # Make it possible to obtain uri_extension from the shareholders.
4236-        # Arrange for shareholders[0] to be the first, so we can selectively
4237-        # corrupt the data it returns.
4238-        uri_extension_sources = shareholders.values()
4239-        uri_extension_sources.remove(shareholders[0])
4240-        uri_extension_sources.insert(0, shareholders[0])
4241-
4242-        d = defer.succeed(None)
4243-
4244-        # have the CiphertextDownloader retrieve a copy of uri_extension itself
4245-        d.addCallback(fd._obtain_uri_extension)
4246 
4247hunk ./src/allmydata/test/test_encode.py 355
4248-        if "corrupt_crypttext_hashes" in recover_mode:
4249-            # replace everybody's crypttext hash trees with a different one
4250-            # (computed over a different file), then modify our uri_extension
4251-            # to reflect the new crypttext hash tree root
4252-            def _corrupt_crypttext_hashes(unused):
4253-                assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup
4254-                assert fd._vup.crypttext_root_hash, fd._vup
4255-                badhash = hashutil.tagged_hash("bogus", "data")
4256-                bad_crypttext_hashes = [badhash] * fd._vup.num_segments
4257-                badtree = hashtree.HashTree(bad_crypttext_hashes)
4258-                for bucket in shareholders.values():
4259-                    bucket.crypttext_hashes = list(badtree)
4260-                fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments)
4261-                fd._crypttext_hash_tree.set_hashes({0: badtree[0]})
4262-                return fd._vup
4263-            d.addCallback(_corrupt_crypttext_hashes)
4264+class Roundtrip(GridTestMixin, unittest.TestCase):
4265 
4266hunk ./src/allmydata/test/test_encode.py 357
4267-        # also have the CiphertextDownloader ask for hash trees
4268-        d.addCallback(fd._get_crypttext_hash_tree)
4269-
4270-        d.addCallback(fd._download_all_segments)
4271-        d.addCallback(fd._done)
4272-        def _done(t):
4273-            newdata = "".join(consumer.chunks)
4274-            return (newdata, fd)
4275-        d.addCallback(_done)
4276-        return d
4277-
4278-    def test_not_enough_shares(self):
4279-        d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
4280-        def _done(res):
4281-            self.failUnless(isinstance(res, Failure))
4282-            self.failUnless(res.check(NotEnoughSharesError))
4283-        d.addBoth(_done)
4284-        return d
4285-
4286-    def test_one_share_per_peer(self):
4287-        return self.send_and_recover()
4288-
4289-    def test_74(self):
4290-        return self.send_and_recover(datalen=74)
4291-    def test_75(self):
4292-        return self.send_and_recover(datalen=75)
4293-    def test_51(self):
4294-        return self.send_and_recover(datalen=51)
4295-
4296-    def test_99(self):
4297-        return self.send_and_recover(datalen=99)
4298-    def test_100(self):
4299-        return self.send_and_recover(datalen=100)
4300-    def test_76(self):
4301-        return self.send_and_recover(datalen=76)
4302-
4303-    def test_124(self):
4304-        return self.send_and_recover(datalen=124)
4305-    def test_125(self):
4306-        return self.send_and_recover(datalen=125)
4307-    def test_101(self):
4308-        return self.send_and_recover(datalen=101)
4309-
4310-    def test_pause(self):
4311-        # use a download target that does pauseProducing/resumeProducing a
4312-        # few times, then finishes
4313-        c = PausingConsumer()
4314-        d = self.send_and_recover(consumer=c)
4315-        return d
4316-
4317-    def test_pause_then_stop(self):
4318-        # use a download target that pauses, then stops.
4319-        c = PausingAndStoppingConsumer()
4320-        d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop",
4321-                            "our Consumer called stopProducing()",
4322-                            self.send_and_recover, consumer=c)
4323-        return d
4324-
4325-    def test_stop(self):
4326-        # use a download targetthat does an immediate stop (ticket #473)
4327-        c = StoppingConsumer()
4328-        d = self.shouldFail(download.DownloadStopped, "test_stop",
4329-                            "our Consumer called stopProducing()",
4330-                            self.send_and_recover, consumer=c)
4331-        return d
4332-
4333-    # the following tests all use 4-out-of-10 encoding
4334-
4335-    def test_bad_blocks(self):
4336-        # the first 6 servers have bad blocks, which will be caught by the
4337-        # blockhashes
4338-        modemap = dict([(i, "bad block")
4339-                        for i in range(6)]
4340-                       + [(i, "good")
4341-                          for i in range(6, 10)])
4342-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
4343-
4344-    def test_bad_blocks_failure(self):
4345-        # the first 7 servers have bad blocks, which will be caught by the
4346-        # blockhashes, and the download will fail
4347-        modemap = dict([(i, "bad block")
4348-                        for i in range(7)]
4349-                       + [(i, "good")
4350-                          for i in range(7, 10)])
4351-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
4352-        def _done(res):
4353-            self.failUnless(isinstance(res, Failure), res)
4354-            self.failUnless(res.check(NotEnoughSharesError), res)
4355-        d.addBoth(_done)
4356-        return d
4357-
4358-    def test_bad_blockhashes(self):
4359-        # the first 6 servers have bad block hashes, so the blockhash tree
4360-        # will not validate
4361-        modemap = dict([(i, "bad blockhash")
4362-                        for i in range(6)]
4363-                       + [(i, "good")
4364-                          for i in range(6, 10)])
4365-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
4366-
4367-    def test_bad_blockhashes_failure(self):
4368-        # the first 7 servers have bad block hashes, so the blockhash tree
4369-        # will not validate, and the download will fail
4370-        modemap = dict([(i, "bad blockhash")
4371-                        for i in range(7)]
4372-                       + [(i, "good")
4373-                          for i in range(7, 10)])
4374-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
4375-        def _done(res):
4376-            self.failUnless(isinstance(res, Failure))
4377-            self.failUnless(res.check(NotEnoughSharesError), res)
4378-        d.addBoth(_done)
4379-        return d
4380-
4381-    def test_bad_sharehashes(self):
4382-        # the first 6 servers have bad block hashes, so the sharehash tree
4383-        # will not validate
4384-        modemap = dict([(i, "bad sharehash")
4385-                        for i in range(6)]
4386-                       + [(i, "good")
4387-                          for i in range(6, 10)])
4388-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
4389-
4390-    def assertFetchFailureIn(self, fd, where):
4391-        expected = {"uri_extension": 0,
4392-                    "crypttext_hash_tree": 0,
4393-                    }
4394-        if where is not None:
4395-            expected[where] += 1
4396-        self.failUnlessEqual(fd._fetch_failures, expected)
4397-
4398-    def test_good(self):
4399-        # just to make sure the test harness works when we aren't
4400-        # intentionally causing failures
4401-        modemap = dict([(i, "good") for i in range(0, 10)])
4402-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
4403-        d.addCallback(self.assertFetchFailureIn, None)
4404-        return d
4405-
4406-    def test_bad_uri_extension(self):
4407-        # the first server has a bad uri_extension block, so we will fail
4408-        # over to a different server.
4409-        modemap = dict([(i, "bad uri_extension") for i in range(1)] +
4410-                       [(i, "good") for i in range(1, 10)])
4411-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
4412-        d.addCallback(self.assertFetchFailureIn, "uri_extension")
4413-        return d
4414-
4415-    def test_bad_crypttext_hashroot(self):
4416-        # the first server has a bad crypttext hashroot, so we will fail
4417-        # over to a different server.
4418-        modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
4419-                       [(i, "good") for i in range(1, 10)])
4420-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
4421-        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
4422-        return d
4423-
4424-    def test_bad_crypttext_hashes(self):
4425-        # the first server has a bad crypttext hash block, so we will fail
4426-        # over to a different server.
4427-        modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
4428-                       [(i, "good") for i in range(1, 10)])
4429-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
4430-        d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
4431-        return d
4432-
4433-    def test_bad_crypttext_hashes_failure(self):
4434-        # to test that the crypttext merkle tree is really being applied, we
4435-        # sneak into the download process and corrupt two things: we replace
4436-        # everybody's crypttext hashtree with a bad version (computed over
4437-        # bogus data), and we modify the supposedly-validated uri_extension
4438-        # block to match the new crypttext hashtree root. The download
4439-        # process should notice that the crypttext coming out of FEC doesn't
4440-        # match the tree, and fail.
4441-
4442-        modemap = dict([(i, "good") for i in range(0, 10)])
4443-        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
4444-                                  recover_mode=("corrupt_crypttext_hashes"))
4445-        def _done(res):
4446-            self.failUnless(isinstance(res, Failure))
4447-            self.failUnless(res.check(hashtree.BadHashError), res)
4448-        d.addBoth(_done)
4449-        return d
4450-
4451-    def OFF_test_bad_plaintext(self):
4452-        # faking a decryption failure is easier: just corrupt the key
4453-        modemap = dict([(i, "good") for i in range(0, 10)])
4454-        d = self.send_and_recover((4,8,10), bucket_modes=modemap,
4455-                                  recover_mode=("corrupt_key"))
4456-        def _done(res):
4457-            self.failUnless(isinstance(res, Failure))
4458-            self.failUnless(res.check(hashtree.BadHashError), res)
4459-        d.addBoth(_done)
4460-        return d
4461+    # a series of 3*3 tests to check out edge conditions. One axis is how the
4462+    # plaintext is divided into segments: kn+(-1,0,1). Another way to express
4463+    # this is n%k == -1 or 0 or 1. For example, for 25-byte segments, we
4464+    # might test 74 bytes, 75 bytes, and 76 bytes.
4465 
4466hunk ./src/allmydata/test/test_encode.py 362
4467-    def test_bad_sharehashes_failure(self):
4468-        # all ten servers have bad share hashes, so the sharehash tree
4469-        # will not validate, and the download will fail
4470-        modemap = dict([(i, "bad sharehash")
4471-                        for i in range(10)])
4472-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
4473-        def _done(res):
4474-            self.failUnless(isinstance(res, Failure))
4475-            self.failUnless(res.check(NotEnoughSharesError))
4476-        d.addBoth(_done)
4477-        return d
4478+    # on the other axis is how many leaves in the block hash tree we wind up
4479+    # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns
4480+    # into a single leaf. So we'd like to check out, e.g., 3 segments, 4
4481+    # segments, and 5 segments.
4482 
4483hunk ./src/allmydata/test/test_encode.py 367
4484-    def test_missing_sharehashes(self):
4485-        # the first 6 servers are missing their sharehashes, so the
4486-        # sharehash tree will not validate
4487-        modemap = dict([(i, "missing sharehash")
4488-                        for i in range(6)]
4489-                       + [(i, "good")
4490-                          for i in range(6, 10)])
4491-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
4492+    # that results in the following series of data lengths:
4493+    #  3 segs: 74, 75, 51
4494+    #  4 segs: 99, 100, 76
4495+    #  5 segs: 124, 125, 101
4496 
4497hunk ./src/allmydata/test/test_encode.py 372
4498-    def test_missing_sharehashes_failure(self):
4499-        # all servers are missing their sharehashes, so the sharehash tree will not validate,
4500-        # and the download will fail
4501-        modemap = dict([(i, "missing sharehash")
4502-                        for i in range(10)])
4503-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
4504-        def _done(res):
4505-            self.failUnless(isinstance(res, Failure), res)
4506-            self.failUnless(res.check(NotEnoughSharesError), res)
4507-        d.addBoth(_done)
4508-        return d
4509+    # all tests encode to 100 shares, which means the share hash tree will
4510+    # have 128 leaves, which means that buckets will be given an 8-long share
4511+    # hash chain
4512 
4513hunk ./src/allmydata/test/test_encode.py 376
4514-    def test_lost_one_shareholder(self):
4515-        # we have enough shareholders when we start, but one segment in we
4516-        # lose one of them. The upload should still succeed, as long as we
4517-        # still have 'servers_of_happiness' peers left.
4518-        modemap = dict([(i, "good") for i in range(9)] +
4519-                       [(i, "lost") for i in range(9, 10)])
4520-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
4521+    # all 3-segment files will have a 4-leaf blockhashtree, and thus expect
4522+    # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash
4523+    # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash
4524+    # trees, which get 15 blockhashes.
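+    # worked example, using the 25-byte segments set up in upload() below:
+    # 74 = 25+25+24 and 51 = 25+25+1 are 3 segments; 76 = 25*3+1 is 4
+    # segments; 101 = 25*4+1 is 5 segments. A complete binary tree over 4
+    # leaves has 2*4-1 = 7 nodes, and over 8 leaves 2*8-1 = 15, hence the
+    # blockhash counts above.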
4525 
4526hunk ./src/allmydata/test/test_encode.py 381
4527-    def test_lost_one_shareholder_early(self):
4528-        # we have enough shareholders when we choose peers, but just before
4529-        # we send the 'start' message, we lose one of them. The upload should
4530-        # still succeed, as long as we still have 'servers_of_happiness' peers
4531-        # left.
4532-        modemap = dict([(i, "good") for i in range(9)] +
4533-                       [(i, "lost-early") for i in range(9, 10)])
4534-        return self.send_and_recover((4,8,10), bucket_modes=modemap)
4535+    def test_74(self): return self.do_test_size(74)
4536+    def test_75(self): return self.do_test_size(75)
4537+    def test_51(self): return self.do_test_size(51)
4538+    def test_99(self): return self.do_test_size(99)
4539+    def test_100(self): return self.do_test_size(100)
4540+    def test_76(self): return self.do_test_size(76)
4541+    def test_124(self): return self.do_test_size(124)
4542+    def test_125(self): return self.do_test_size(125)
4543+    def test_101(self): return self.do_test_size(101)
4544 
4545hunk ./src/allmydata/test/test_encode.py 391
4546-    def test_lost_many_shareholders(self):
4547-        # we have enough shareholders when we start, but one segment in we
4548-        # lose all but one of them. The upload should fail.
4549-        modemap = dict([(i, "good") for i in range(1)] +
4550-                       [(i, "lost") for i in range(1, 10)])
4551-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
4552-        def _done(res):
4553-            self.failUnless(isinstance(res, Failure))
4554-            self.failUnless(res.check(UploadUnhappinessError), res)
4555-        d.addBoth(_done)
4556+    def upload(self, data):
4557+        u = upload.Data(data, None)
4558+        u.max_segment_size = 25
4559+        u.encoding_param_k = 25
4560+        u.encoding_param_happy = 1
4561+        u.encoding_param_n = 100
4562+        d = self.c0.upload(u)
4563+        d.addCallback(lambda ur: self.c0.create_node_from_uri(ur.uri))
4564+        # returns a FileNode
4565         return d
4566 
4567hunk ./src/allmydata/test/test_encode.py 402
4568-    def test_lost_all_shareholders(self):
4569-        # we have enough shareholders when we start, but one segment in we
4570-        # lose all of them. The upload should fail.
4571-        modemap = dict([(i, "lost") for i in range(10)])
4572-        d = self.send_and_recover((4,8,10), bucket_modes=modemap)
4573-        def _done(res):
4574-            self.failUnless(isinstance(res, Failure))
4575-            self.failUnless(res.check(UploadUnhappinessError))
4576-        d.addBoth(_done)
4577+    def do_test_size(self, size):
4578+        self.basedir = self.mktemp()
4579+        self.set_up_grid()
4580+        self.c0 = self.g.clients[0]
4581+        DATA = "p"*size
4582+        d = self.upload(DATA)
4583+        d.addCallback(lambda n: download_to_data(n))
4584+        def _downloaded(newdata):
4585+            self.failUnlessEqual(newdata, DATA)
4586+        d.addCallback(_downloaded)
4587         return d
4588hunk ./src/allmydata/test/test_filenode.py 5
4589 from twisted.trial import unittest
4590 from allmydata import uri, client
4591 from allmydata.monitor import Monitor
4592-from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
4593+from allmydata.immutable.literal import LiteralFileNode
4594+from allmydata.immutable.filenode import ImmutableFileNode
4595 from allmydata.mutable.filenode import MutableFileNode
4596hunk ./src/allmydata/test/test_filenode.py 8
4597-from allmydata.util import hashutil, cachedir
4598+from allmydata.util import hashutil
4599 from allmydata.util.consumer import download_to_data
4600 
4601 class NotANode:
4602hunk ./src/allmydata/test/test_filenode.py 34
4603                            needed_shares=3,
4604                            total_shares=10,
4605                            size=1000)
4606-        cf = cachedir.CacheFile("none")
4607-        fn1 = ImmutableFileNode(u, None, None, None, None, cf)
4608-        fn2 = ImmutableFileNode(u, None, None, None, None, cf)
4609+        fn1 = ImmutableFileNode(u, None, None, None, None)
4610+        fn2 = ImmutableFileNode(u, None, None, None, None)
4611         self.failUnlessEqual(fn1, fn2)
4612         self.failIfEqual(fn1, "I am not a filenode")
4613         self.failIfEqual(fn1, NotANode())
4614hunk ./src/allmydata/test/test_hung_server.py 12
4615 from allmydata.mutable.common import UnrecoverableFileError
4616 from allmydata.storage.common import storage_index_to_dir
4617 from allmydata.test.no_network import GridTestMixin
4618-from allmydata.test.common import ShouldFailMixin, _corrupt_share_data
4619+from allmydata.test.common import ShouldFailMixin
4620+from allmydata.util.pollmixin import PollMixin
4621 from allmydata.interfaces import NotEnoughSharesError
4622 
4623 immutable_plaintext = "data" * 10000
4624hunk ./src/allmydata/test/test_hung_server.py 19
4625 mutable_plaintext = "muta" * 10000
4626 
4627-class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, unittest.TestCase):
4628+class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
4629+                             unittest.TestCase):
4630     # Many of these tests take around 60 seconds on François's ARM buildslave:
4631     # http://tahoe-lafs.org/buildbot/builders/FranXois%20lenny-armv5tel
4632hunk ./src/allmydata/test/test_hung_server.py 23
4633-    # allmydata.test.test_hung_server.HungServerDownloadTest.test_2_good_8_broken_duplicate_share_fail once ERRORed after 197 seconds on Midnight Magic's NetBSD buildslave:
4634+    # allmydata.test.test_hung_server.HungServerDownloadTest.test_2_good_8_broken_duplicate_share_fail
4635+    # once ERRORed after 197 seconds on Midnight Magic's NetBSD buildslave:
4636     # http://tahoe-lafs.org/buildbot/builders/MM%20netbsd4%20i386%20warp
4637     # MM's buildslave varies a lot in how long it takes to run tests.
4638 
4639hunk ./src/allmydata/test/test_hung_server.py 42
4640         for (id, ss) in servers:
4641             self.g.unhang_server(id, **kwargs)
4642 
4643+    def _hang_shares(self, shnums, **kwargs):
4644+        # hang all servers who are holding the given shares
4645+        hung_serverids = set()
4646+        for (i_shnum, i_serverid, i_sharefile) in self.shares:
4647+            if i_shnum in shnums:
4648+                if i_serverid not in hung_serverids:
4649+                    self.g.hang_server(i_serverid, **kwargs)
4650+                    hung_serverids.add(i_serverid)
4651+
4652     def _delete_all_shares_from(self, servers):
4653         serverids = [id for (id, ss) in servers]
4654         for (i_shnum, i_serverid, i_sharefile) in self.shares:
4655hunk ./src/allmydata/test/test_hung_server.py 128
4656             stage_4_d = None # currently we aren't doing any tests which require this for mutable files
4657         else:
4658             d = download_to_data(n)
4659-            stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME
4660+            #stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME
4661+            stage_4_d = None
4662         return (d, stage_4_d,)
4663 
4664     def _wait_for_data(self, n):
4665hunk ./src/allmydata/test/test_hung_server.py 157
4666                                    self._download_and_check)
4667         else:
4668             return self.shouldFail(NotEnoughSharesError, self.basedir,
4669-                                   "Failed to get enough shareholders",
4670+                                   "ran out of shares",
4671                                    self._download_and_check)
4672 
4673 
4674hunk ./src/allmydata/test/test_hung_server.py 220
4675 
4676     # The tests below do not currently pass for mutable files.
4677 
4678-    def test_3_good_7_hung(self):
4679+    def test_3_good_7_hung_immutable(self):
4680         d = defer.succeed(None)
4681hunk ./src/allmydata/test/test_hung_server.py 222
4682-        for mutable in [False]:
4683-            d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_hung"))
4684-            d.addCallback(lambda ign: self._hang(self.servers[3:]))
4685-            d.addCallback(lambda ign: self._download_and_check())
4686+        d.addCallback(lambda ign: self._set_up(False, "test_3_good_7_hung"))
4687+        d.addCallback(lambda ign: self._hang(self.servers[3:]))
4688+        d.addCallback(lambda ign: self._download_and_check())
4689         return d
4690 
4691hunk ./src/allmydata/test/test_hung_server.py 227
4692-    def test_2_good_8_hung_then_1_recovers(self):
4693+    def test_5_overdue_immutable(self):
4694+        # restrict the ShareFinder to only allow 5 outstanding requests, and
4695+        # arrange for the first 5 servers to hang. Then trigger the OVERDUE
4696+        # timers (simulating 10 seconds passed), at which point the
4697+        # ShareFinder should send additional queries and finish the download
4698+        # quickly. If we didn't have OVERDUE timers, this test would fail by
4699+        # timing out.
4700+        done = []
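+        # In outline (these mirror the calls made below, nothing new):
+        #     self._hang_shares(range(5))            # servers with sh0..sh4
+        #     self._sf.max_outstanding_requests = 5
+        #     ... start the download ...
+        #     for t in self._sf.overdue_timers.values()[:4]:
+        #         t.reset(-1.0)                      # "10 seconds later"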
4701+        d = self._set_up(False, "test_5_overdue_immutable")
4702+        def _reduce_max_outstanding_requests_and_download(ign):
4703+            self._hang_shares(range(5))
4704+            n = self.c0.create_node_from_uri(self.uri)
4705+            self._sf = n._cnode._node._sharefinder
4706+            self._sf.max_outstanding_requests = 5
4707+            self._sf.OVERDUE_TIMEOUT = 1000.0
4708+            d2 = download_to_data(n)
4709+            # start download, but don't wait for it to complete yet
4710+            def _done(res):
4711+                done.append(res) # we will poll for this later
4712+            d2.addBoth(_done)
4713+        d.addCallback(_reduce_max_outstanding_requests_and_download)
4714+        from foolscap.eventual import fireEventually, flushEventualQueue
4715+        # wait here a while
4716+        d.addCallback(lambda res: fireEventually(res))
4717+        d.addCallback(lambda res: flushEventualQueue())
4718+        d.addCallback(lambda ign: self.failIf(done))
4719+        def _check_waiting(ign):
4720+            # all the share requests should now be stuck waiting
4721+            self.failUnlessEqual(len(self._sf.pending_requests), 5)
4722+            # but none should be marked as OVERDUE until the timers expire
4723+            self.failUnlessEqual(len(self._sf.overdue_requests), 0)
4724+        d.addCallback(_check_waiting)
4725+        def _mark_overdue(ign):
4726+            # declare four requests overdue, allowing new requests to take
4727+            # their place, and leaving one stuck. The finder will keep
4728+            # sending requests until there are 5 non-overdue ones
4729+            # outstanding, at which point we'll have 4 OVERDUE, 1
4730+            # stuck-but-not-overdue, and 4 live requests. All 4 live requests
4731+            # will retire before the download is complete and the ShareFinder
4732+            # is shut off. That will leave 4 OVERDUE and 1
4733+            # stuck-but-not-overdue, for a total of 5 requests in
4734+            # _sf.pending_requests
4735+            for t in self._sf.overdue_timers.values()[:4]:
4736+                t.reset(-1.0)
4737+            # the timers ought to fire before the eventual-send does
4738+            return fireEventually()
4739+        d.addCallback(_mark_overdue)
4740+        def _we_are_done():
4741+            return bool(done)
4742+        d.addCallback(lambda ign: self.poll(_we_are_done))
4743+        def _check_done(ign):
4744+            self.failUnlessEqual(done, [immutable_plaintext])
4745+            self.failUnlessEqual(len(self._sf.pending_requests), 5)
4746+            self.failUnlessEqual(len(self._sf.overdue_requests), 4)
4747+        d.addCallback(_check_done)
4748+        return d
4749+
4750+    def test_3_good_7_hung_mutable(self):
4751+        raise unittest.SkipTest("still broken")
4752         d = defer.succeed(None)
4753hunk ./src/allmydata/test/test_hung_server.py 287
4754-        for mutable in [False]:
4755-            d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers"))
4756-            d.addCallback(lambda ign: self._hang(self.servers[2:3]))
4757-            d.addCallback(lambda ign: self._hang(self.servers[3:]))
4758-            d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
4759-            d.addCallback(lambda ign: self._download_and_check())
4760+        d.addCallback(lambda ign: self._set_up(True, "test_3_good_7_hung"))
4761+        d.addCallback(lambda ign: self._hang(self.servers[3:]))
4762+        d.addCallback(lambda ign: self._download_and_check())
4763         return d
4764 
4765hunk ./src/allmydata/test/test_hung_server.py 292
4766-    def test_2_good_8_hung_then_1_recovers_with_2_shares(self):
4767+    def test_2_good_8_hung_then_1_recovers_immutable(self):
4768         d = defer.succeed(None)
4769hunk ./src/allmydata/test/test_hung_server.py 294
4770-        for mutable in [False]:
4771-            d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers_with_2_shares"))
4772-            d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
4773-            d.addCallback(lambda ign: self._hang(self.servers[2:3]))
4774-            d.addCallback(lambda ign: self._hang(self.servers[3:]))
4775-            d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
4776-            d.addCallback(lambda ign: self._download_and_check())
4777+        d.addCallback(lambda ign: self._set_up(False, "test_2_good_8_hung_then_1_recovers"))
4778+        d.addCallback(lambda ign: self._hang(self.servers[2:3]))
4779+        d.addCallback(lambda ign: self._hang(self.servers[3:]))
4780+        d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
4781+        d.addCallback(lambda ign: self._download_and_check())
4782+        return d
4783+
4784+    def test_2_good_8_hung_then_1_recovers_mutable(self):
4785+        raise unittest.SkipTest("still broken")
4786+        d = defer.succeed(None)
4787+        d.addCallback(lambda ign: self._set_up(True, "test_2_good_8_hung_then_1_recovers"))
4788+        d.addCallback(lambda ign: self._hang(self.servers[2:3]))
4789+        d.addCallback(lambda ign: self._hang(self.servers[3:]))
4790+        d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
4791+        d.addCallback(lambda ign: self._download_and_check())
4792         return d
4793 
4794hunk ./src/allmydata/test/test_hung_server.py 311
4795-    def test_failover_during_stage_4(self):
4796-        # See #287
4797+    def test_2_good_8_hung_then_1_recovers_with_2_shares_immutable(self):
4798         d = defer.succeed(None)
4799hunk ./src/allmydata/test/test_hung_server.py 313
4800-        for mutable in [False]:
4801-            d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4"))
4802-            d.addCallback(lambda ign: self._corrupt_all_shares_in(self.servers[2:3], _corrupt_share_data))
4803-            d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4"))
4804-            d.addCallback(lambda ign: self._hang(self.servers[3:]))
4805-            d.addCallback(lambda ign: self._start_download())
4806-            def _after_starting_download((doned, started4d)):
4807-                started4d.addCallback(lambda ign: self._unhang(self.servers[3:4]))
4808-                doned.addCallback(self._check)
4809-                return doned
4810-            d.addCallback(_after_starting_download)
4811+        d.addCallback(lambda ign: self._set_up(False, "test_2_good_8_hung_then_1_recovers_with_2_shares"))
4812+        d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
4813+        d.addCallback(lambda ign: self._hang(self.servers[2:3]))
4814+        d.addCallback(lambda ign: self._hang(self.servers[3:]))
4815+        d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
4816+        d.addCallback(lambda ign: self._download_and_check())
4817+        return d
4818 
4819hunk ./src/allmydata/test/test_hung_server.py 321
4820+    def test_2_good_8_hung_then_1_recovers_with_2_shares_mutable(self):
4821+        raise unittest.SkipTest("still broken")
4822+        d = defer.succeed(None)
4823+        d.addCallback(lambda ign: self._set_up(True, "test_2_good_8_hung_then_1_recovers_with_2_shares"))
4824+        d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
4825+        d.addCallback(lambda ign: self._hang(self.servers[2:3]))
4826+        d.addCallback(lambda ign: self._hang(self.servers[3:]))
4827+        d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
4828+        d.addCallback(lambda ign: self._download_and_check())
4829         return d
4830hunk ./src/allmydata/test/test_immutable.py 8
4831 from twisted.trial import unittest
4832 import random
4833 
4834-class Test(common.ShareManglingMixin, unittest.TestCase):
4835+class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase):
4836     def test_test_code(self):
4837         # The following process of stashing the shares, running
4838         # replace_shares, and asserting that the new set of shares equals the
4839hunk ./src/allmydata/test/test_immutable.py 21
4840             return res
4841         d.addCallback(_stash_it)
4842 
4843-        # The following process of deleting 8 of the shares and asserting that you can't
4844-        # download it is more to test this test code than to test the Tahoe code...
4845+        # The following process of deleting 8 of the shares and asserting
4846+        # that you can't download it is more to test this test code than to
4847+        # test the Tahoe code...
4848         def _then_delete_8(unused=None):
4849             self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
4850             for i in range(8):
4851hunk ./src/allmydata/test/test_immutable.py 46
4852         return d
4853 
4854     def test_download(self):
4855-        """ Basic download.  (This functionality is more or less already tested by test code in
4856-        other modules, but this module is also going to test some more specific things about
4857-        immutable download.)
4858+        """ Basic download. (This functionality is more or less already
4859+        tested by test code in other modules, but this module is also going
4860+        to test some more specific things about immutable download.)
4861         """
4862         d = defer.succeed(None)
4863         before_download_reads = self._count_reads()
4864hunk ./src/allmydata/test/test_immutable.py 54
4865         def _after_download(unused=None):
4866             after_download_reads = self._count_reads()
4867-            self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads))
4868+            #print before_download_reads, after_download_reads
4869+            self.failIf(after_download_reads-before_download_reads > 27,
4870+                        (after_download_reads, before_download_reads))
4871         d.addCallback(self._download_and_check_plaintext)
4872         d.addCallback(_after_download)
4873         return d
4874hunk ./src/allmydata/test/test_immutable.py 62
4875 
4876     def test_download_from_only_3_remaining_shares(self):
4877-        """ Test download after 7 random shares (of the 10) have been removed. """
4878+        """ Test download after 7 random shares (of the 10) have been
4879+        removed."""
4880         d = defer.succeed(None)
4881         def _then_delete_7(unused=None):
4882             for i in range(7):
4883hunk ./src/allmydata/test/test_immutable.py 72
4884         d.addCallback(_then_delete_7)
4885         def _after_download(unused=None):
4886             after_download_reads = self._count_reads()
4887+            #print before_download_reads, after_download_reads
4888             self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads))
4889         d.addCallback(self._download_and_check_plaintext)
4890         d.addCallback(_after_download)
4891hunk ./src/allmydata/test/test_immutable.py 79
4892         return d
4893 
4894     def test_download_from_only_3_shares_with_good_crypttext_hash(self):
4895-        """ Test download after 7 random shares (of the 10) have had their crypttext hash tree corrupted. """
4896+        """ Test download after 7 random shares (of the 10) have had their
4897+        crypttext hash tree corrupted."""
4898         d = defer.succeed(None)
4899         def _then_corrupt_7(unused=None):
4900             shnums = range(10)
4901hunk ./src/allmydata/test/test_immutable.py 93
4902         return d
4903 
4904     def test_download_abort_if_too_many_missing_shares(self):
4905-        """ Test that download gives up quickly when it realizes there aren't enough shares out
4906-        there."""
4907-        d = defer.succeed(None)
4908-        def _then_delete_8(unused=None):
4909-            for i in range(8):
4910-                self._delete_a_share()
4911-        d.addCallback(_then_delete_8)
4912-
4913-        before_download_reads = self._count_reads()
4914-        def _attempt_to_download(unused=None):
4915-            d2 = download_to_data(self.n)
4916-
4917-            def _callb(res):
4918-                self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
4919-            def _errb(f):
4920-                self.failUnless(f.check(NotEnoughSharesError))
4921-            d2.addCallbacks(_callb, _errb)
4922-            return d2
4923-
4924-        d.addCallback(_attempt_to_download)
4925-
4926-        def _after_attempt(unused=None):
4927-            after_download_reads = self._count_reads()
4928-            # To pass this test, you are required to give up before actually trying to read any
4929-            # share data.
4930-            self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads))
4931-        d.addCallback(_after_attempt)
4932+        """ Test that download gives up quickly when it realizes there aren't
4933+        enough shares out there."""
4934+        for i in range(8):
4935+            self._delete_a_share()
4936+        d = self.shouldFail(NotEnoughSharesError, "delete 8", None,
4937+                            download_to_data, self.n)
4938+        # the new downloader pipelines a bunch of read requests in parallel,
4939+        # so don't bother asserting anything about the number of reads
4940         return d
4941 
4942     def test_download_abort_if_too_many_corrupted_shares(self):
4943hunk ./src/allmydata/test/test_immutable.py 104
4944-        """ Test that download gives up quickly when it realizes there aren't enough uncorrupted
4945-        shares out there. It should be able to tell because the corruption occurs in the
4946-        sharedata version number, which it checks first."""
4947+        """Test that download gives up quickly when it realizes there aren't
4948+        enough uncorrupted shares out there. It should be able to tell
4949+        because the corruption occurs in the sharedata version number, which
4950+        it checks first."""
4951         d = defer.succeed(None)
4952         def _then_corrupt_8(unused=None):
4953             shnums = range(10)
4954hunk ./src/allmydata/test/test_immutable.py 131
4955 
4956         def _after_attempt(unused=None):
4957             after_download_reads = self._count_reads()
4958-            # To pass this test, you are required to give up before reading all of the share
4959-            # data.  Actually, we could give up sooner than 45 reads, but currently our download
4960-            # code does 45 reads.  This test then serves as a "performance regression detector"
4961-            # -- if you change download code so that it takes *more* reads, then this test will
4962-            # fail.
4963-            self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads))
4964+            #print before_download_reads, after_download_reads
4965+            # To pass this test, you are required to give up before reading
4966+            # all of the share data. Actually, we could give up sooner than
4967+            # 45 reads, but currently our download code does 45 reads. This
4968+            # test then serves as a "performance regression detector" -- if
4969+            # you change download code so that it takes *more* reads, then
4970+            # this test will fail.
4971+            self.failIf(after_download_reads-before_download_reads > 45,
4972+                        (after_download_reads, before_download_reads))
4973         d.addCallback(_after_attempt)
4974         return d
4975 
4976hunk ./src/allmydata/test/test_immutable.py 144
4977 
4978-# XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example
4979+# XXX extend these tests to show bad behavior of various kinds from servers:
4980+# raising exception from each remove_foo() method, for example
4981 
4982 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit
4983 
4984hunk ./src/allmydata/test/test_immutable.py 149
4985+# TODO: delete this whole file
4986hunk ./src/allmydata/test/test_mutable.py 200
4987     keygen = client.KeyGenerator()
4988     keygen.set_default_keysize(522)
4989     nodemaker = NodeMaker(storage_broker, sh, None,
4990-                          None, None, None,
4991+                          None, None,
4992                           {"k": 3, "n": 10}, keygen)
4993     return nodemaker
4994 
4995hunk ./src/allmydata/test/test_repairer.py 6
4996 from allmydata.monitor import Monitor
4997 from allmydata import check_results
4998 from allmydata.interfaces import NotEnoughSharesError
4999-from allmydata.immutable import repairer, upload
5000+from allmydata.immutable import upload
5001 from allmydata.util.consumer import download_to_data
5002 from twisted.internet import defer
5003 from twisted.trial import unittest
5004hunk ./src/allmydata/test/test_repairer.py 366
5005 # Optimally, you could repair one of these (small) files in a single write.
5006 DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY
5007 
5008-class DownUpConnector(unittest.TestCase):
5009-    def test_deferred_satisfaction(self):
5010-        duc = repairer.DownUpConnector()
5011-        duc.registerProducer(None, True) # just because you have to call registerProducer first
5012-        # case 1: total data in buf is < requested data at time of request
5013-        duc.write('\x01')
5014-        d = duc.read_encrypted(2, False)
5015-        def _then(data):
5016-            self.failUnlessEqual(len(data), 2)
5017-            self.failUnlessEqual(data[0], '\x01')
5018-            self.failUnlessEqual(data[1], '\x02')
5019-        d.addCallback(_then)
5020-        duc.write('\x02')
5021-        return d
5022-
5023-    def test_extra(self):
5024-        duc = repairer.DownUpConnector()
5025-        duc.registerProducer(None, True) # just because you have to call registerProducer first
5026-        # case 1: total data in buf is < requested data at time of request
5027-        duc.write('\x01')
5028-        d = duc.read_encrypted(2, False)
5029-        def _then(data):
5030-            self.failUnlessEqual(len(data), 2)
5031-            self.failUnlessEqual(data[0], '\x01')
5032-            self.failUnlessEqual(data[1], '\x02')
5033-        d.addCallback(_then)
5034-        duc.write('\x02\0x03')
5035-        return d
5036-
5037-    def test_short_reads_1(self):
5038-        # You don't get fewer bytes than you requested -- instead you get no callback at all.
5039-        duc = repairer.DownUpConnector()
5040-        duc.registerProducer(None, True) # just because you have to call registerProducer first
5041-
5042-        d = duc.read_encrypted(2, False)
5043-        duc.write('\x04')
5044-
5045-        def _callb(res):
5046-            self.fail("Shouldn't have gotten this callback res: %s" % (res,))
5047-        d.addCallback(_callb)
5048-
5049-        # Also in the other order of read-vs-write:
5050-        duc2 = repairer.DownUpConnector()
5051-        duc2.registerProducer(None, True) # just because you have to call registerProducer first
5052-        duc2.write('\x04')
5053-        d = duc2.read_encrypted(2, False)
5054-
5055-        def _callb2(res):
5056-            self.fail("Shouldn't have gotten this callback res: %s" % (res,))
5057-        d.addCallback(_callb2)
5058-
5059-        # But once the DUC is closed then you *do* get short reads.
5060-        duc3 = repairer.DownUpConnector()
5061-        duc3.registerProducer(None, True) # just because you have to call registerProducer first
5062-
5063-        d = duc3.read_encrypted(2, False)
5064-        duc3.write('\x04')
5065-        duc3.close()
5066-        def _callb3(res):
5067-            self.failUnlessEqual(len(res), 1)
5068-            self.failUnlessEqual(res[0], '\x04')
5069-        d.addCallback(_callb3)
5070-        return d
5071-
5072-    def test_short_reads_2(self):
5073-        # Also in the other order of read-vs-write.
5074-        duc = repairer.DownUpConnector()
5075-        duc.registerProducer(None, True) # just because you have to call registerProducer first
5076-
5077-        duc.write('\x04')
5078-        d = duc.read_encrypted(2, False)
5079-        duc.close()
5080-
5081-        def _callb(res):
5082-            self.failUnlessEqual(len(res), 1)
5083-            self.failUnlessEqual(res[0], '\x04')
5084-        d.addCallback(_callb)
5085-        return d
5086-
5087-    def test_short_reads_3(self):
5088-        # Also if it is closed before the read.
5089-        duc = repairer.DownUpConnector()
5090-        duc.registerProducer(None, True) # just because you have to call registerProducer first
5091-
5092-        duc.write('\x04')
5093-        duc.close()
5094-        d = duc.read_encrypted(2, False)
5095-        def _callb(res):
5096-            self.failUnlessEqual(len(res), 1)
5097-            self.failUnlessEqual(res[0], '\x04')
5098-        d.addCallback(_callb)
5099-        return d
5100-
5101 class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
5102                common.ShouldFailMixin):
5103 
5104hunk ./src/allmydata/test/test_system.py 12
5105 from allmydata.storage.mutable import MutableShareFile
5106 from allmydata.storage.server import si_a2b
5107 from allmydata.immutable import offloaded, upload
5108-from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
5109+from allmydata.immutable.literal import LiteralFileNode
5110+from allmydata.immutable.filenode import ImmutableFileNode
5111 from allmydata.util import idlib, mathutil
5112 from allmydata.util import log, base32
5113 from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_filesystem_encoding
5114hunk ./src/allmydata/test/test_upload.py 2090
5115 #  have a download fail
5116 #  cancel a download (need to implement more cancel stuff)
5117 
5118+# from test_encode:
5119+# NoNetworkGrid, upload part of ciphertext, kill server, continue upload
5120+# check with Kevan, they want to live in test_upload, existing tests might cover
5121+#     def test_lost_one_shareholder(self): # these are upload-side tests
5122+#     def test_lost_one_shareholder_early(self):
5123+#     def test_lost_many_shareholders(self):
5124+#     def test_lost_all_shareholders(self):
5125+
5126hunk ./src/allmydata/test/test_util.py 10
5127 from twisted.internet import defer, reactor
5128 from twisted.python.failure import Failure
5129 from twisted.python import log
5130+from hashlib import md5
5131 
5132 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
5133 from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
5134hunk ./src/allmydata/test/test_util.py 17
5135 from allmydata.util import limiter, time_format, pollmixin, cachedir
5136 from allmydata.util import statistics, dictutil, pipeline
5137 from allmydata.util import log as tahoe_log
5138+from allmydata.util.spans import Spans, overlap, DataSpans
5139 
5140 class Base32(unittest.TestCase):
5141     def test_b2a_matches_Pythons(self):
5142hunk ./src/allmydata/test/test_util.py 1573
5143         tahoe_log.err(format="intentional sample error",
5144                       failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ")
5145         self.flushLoggedErrors(SampleError)
5146+
5147+
5148+class SimpleSpans:
5149+    # this is a simple+inefficient form of util.spans.Spans . We compare the
5150+    # behavior of this reference model against the real (efficient) form.
5151+
5152+    def __init__(self, _span_or_start=None, length=None):
5153+        self._have = set()
5154+        if length is not None:
5155+            for i in range(_span_or_start, _span_or_start+length):
5156+                self._have.add(i)
5157+        elif _span_or_start:
5158+            for (start,length) in _span_or_start:
5159+                self.add(start, length)
5160+
5161+    def add(self, start, length):
5162+        for i in range(start, start+length):
5163+            self._have.add(i)
5164+        return self
5165+
5166+    def remove(self, start, length):
5167+        for i in range(start, start+length):
5168+            self._have.discard(i)
5169+        return self
5170+
5171+    def each(self):
5172+        return sorted(self._have)
5173+
5174+    def __iter__(self):
5175+        items = sorted(self._have)
5176+        prevstart = None
5177+        prevend = None
5178+        for i in items:
5179+            if prevstart is None:
5180+                prevstart = prevend = i
5181+                continue
5182+            if i == prevend+1:
5183+                prevend = i
5184+                continue
5185+            yield (prevstart, prevend-prevstart+1)
5186+            prevstart = prevend = i
5187+        if prevstart is not None:
5188+            yield (prevstart, prevend-prevstart+1)
5189+
5190+    def __len__(self):
5191+        # this also gets us bool(s)
5192+        return len(self._have)
5193+
5194+    def __add__(self, other):
5195+        s = self.__class__(self)
5196+        for (start, length) in other:
5197+            s.add(start, length)
5198+        return s
5199+
5200+    def __sub__(self, other):
5201+        s = self.__class__(self)
5202+        for (start, length) in other:
5203+            s.remove(start, length)
5204+        return s
5205+
5206+    def __iadd__(self, other):
5207+        for (start, length) in other:
5208+            self.add(start, length)
5209+        return self
5210+
5211+    def __isub__(self, other):
5212+        for (start, length) in other:
5213+            self.remove(start, length)
5214+        return self
5215+
5216+    def __and__(self, other):
5217+        s = self.__class__()
5218+        for i in other.each():
5219+            if i in self._have:
5220+                s.add(i, 1)
5221+        return s
5222+
5223+    def __contains__(self, (start,length)):
5224+        for i in range(start, start+length):
5225+            if i not in self._have:
5226+                return False
5227+        return True
5228+
5229+class ByteSpans(unittest.TestCase):
5230+    def test_basic(self):
5231+        s = Spans()
5232+        self.failUnlessEqual(list(s), [])
5233+        self.failIf(s)
5234+        self.failIf((0,1) in s)
5235+        self.failUnlessEqual(len(s), 0)
5236+
5237+        s1 = Spans(3, 4) # 3,4,5,6
5238+        self._check1(s1)
5239+
5240+        s2 = Spans(s1)
5241+        self._check1(s2)
5242+
5243+        s2.add(10,2) # 10,11
5244+        self._check1(s1)
5245+        self.failUnless((10,1) in s2)
5246+        self.failIf((10,1) in s1)
5247+        self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
5248+        self.failUnlessEqual(len(s2), 6)
5249+
5250+        s2.add(15,2).add(20,2)
5251+        self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
5252+        self.failUnlessEqual(len(s2), 10)
5253+
5254+        s2.remove(4,3).remove(15,1)
5255+        self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
5256+        self.failUnlessEqual(len(s2), 6)
5257+
5258+        s1 = SimpleSpans(3, 4) # 3 4 5 6
5259+        s2 = SimpleSpans(5, 4) # 5 6 7 8
5260+        i = s1 & s2
5261+        self.failUnlessEqual(list(i.each()), [5, 6])
5262+
5263+    def _check1(self, s):
5264+        self.failUnlessEqual(list(s), [(3,4)])
5265+        self.failUnless(s)
5266+        self.failUnlessEqual(len(s), 4)
5267+        self.failIf((0,1) in s)
5268+        self.failUnless((3,4) in s)
5269+        self.failUnless((3,1) in s)
5270+        self.failUnless((5,2) in s)
5271+        self.failUnless((6,1) in s)
5272+        self.failIf((6,2) in s)
5273+        self.failIf((7,1) in s)
5274+        self.failUnlessEqual(list(s.each()), [3,4,5,6])
5275+
5276+    def test_math(self):
5277+        s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
5278+        s2 = Spans(5, 3) # 5,6,7
5279+        s3 = Spans(8, 4) # 8,9,10,11
5280+
5281+        s = s1 - s2
5282+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
5283+        s = s1 - s3
5284+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
5285+        s = s2 - s3
5286+        self.failUnlessEqual(list(s.each()), [5,6,7])
5287+        s = s1 & s2
5288+        self.failUnlessEqual(list(s.each()), [5,6,7])
5289+        s = s2 & s1
5290+        self.failUnlessEqual(list(s.each()), [5,6,7])
5291+        s = s1 & s3
5292+        self.failUnlessEqual(list(s.each()), [8,9])
5293+        s = s3 & s1
5294+        self.failUnlessEqual(list(s.each()), [8,9])
5295+        s = s2 & s3
5296+        self.failUnlessEqual(list(s.each()), [])
5297+        s = s3 & s2
5298+        self.failUnlessEqual(list(s.each()), [])
5299+        s = Spans() & s3
5300+        self.failUnlessEqual(list(s.each()), [])
5301+        s = s3 & Spans()
5302+        self.failUnlessEqual(list(s.each()), [])
5303+
5304+        s = s1 + s2
5305+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
5306+        s = s1 + s3
5307+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
5308+        s = s2 + s3
5309+        self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
5310+
5311+        s = Spans(s1)
5312+        s -= s2
5313+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
5314+        s = Spans(s1)
5315+        s -= s3
5316+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
5317+        s = Spans(s2)
5318+        s -= s3
5319+        self.failUnlessEqual(list(s.each()), [5,6,7])
5320+
5321+        s = Spans(s1)
5322+        s += s2
5323+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
5324+        s = Spans(s1)
5325+        s += s3
5326+        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
5327+        s = Spans(s2)
5328+        s += s3
5329+        self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
5330+
5331+    def test_random(self):
5332+        # attempt to increase coverage of corner cases by comparing behavior
5333+        # of a simple-but-slow model implementation against the
5334+        # complex-but-fast actual implementation, in a large number of random
5335+        # operations
5336+        S1 = SimpleSpans
5337+        S2 = Spans
5338+        s1 = S1(); s2 = S2()
5339+        seed = ""
5340+        def _create(subseed):
5341+            ns1 = S1(); ns2 = S2()
5342+            for i in range(10):
5343+                what = md5(subseed+str(i)).hexdigest()
5344+                start = int(what[2:4], 16)
5345+                length = max(1,int(what[5:6], 16))
5346+                ns1.add(start, length); ns2.add(start, length)
5347+            return ns1, ns2
5348+
5349+        #print
5350+        for i in range(1000):
5351+            what = md5(seed+str(i)).hexdigest()
5352+            op = what[0]
5353+            subop = what[1]
5354+            start = int(what[2:4], 16)
5355+            length = max(1,int(what[5:6], 16))
5356+            #print what
5357+            if op in "0":
5358+                if subop in "01234":
5359+                    s1 = S1(); s2 = S2()
5360+                elif subop in "5678":
5361+                    s1 = S1(start, length); s2 = S2(start, length)
5362+                else:
5363+                    s1 = S1(s1); s2 = S2(s2)
5364+                #print "s2 = %s" % s2.dump()
5365+            elif op in "123":
5366+                #print "s2.add(%d,%d)" % (start, length)
5367+                s1.add(start, length); s2.add(start, length)
5368+            elif op in "456":
5369+                #print "s2.remove(%d,%d)" % (start, length)
5370+                s1.remove(start, length); s2.remove(start, length)
5371+            elif op in "78":
5372+                ns1, ns2 = _create(what[7:11])
5373+                #print "s2 + %s" % ns2.dump()
5374+                s1 = s1 + ns1; s2 = s2 + ns2
5375+            elif op in "9a":
5376+                ns1, ns2 = _create(what[7:11])
5377+                #print "%s - %s" % (s2.dump(), ns2.dump())
5378+                s1 = s1 - ns1; s2 = s2 - ns2
5379+            elif op in "bc":
5380+                ns1, ns2 = _create(what[7:11])
5381+                #print "s2 += %s" % ns2.dump()
5382+                s1 += ns1; s2 += ns2
5383+            elif op in "de":
5384+                ns1, ns2 = _create(what[7:11])
5385+                #print "%s -= %s" % (s2.dump(), ns2.dump())
5386+                s1 -= ns1; s2 -= ns2
5387+            else:
5388+                ns1, ns2 = _create(what[7:11])
5389+                #print "%s &= %s" % (s2.dump(), ns2.dump())
5390+                s1 = s1 & ns1; s2 = s2 & ns2
5391+            #print "s2 now %s" % s2.dump()
5392+            self.failUnlessEqual(list(s1.each()), list(s2.each()))
5393+            self.failUnlessEqual(len(s1), len(s2))
5394+            self.failUnlessEqual(bool(s1), bool(s2))
5395+            self.failUnlessEqual(list(s1), list(s2))
5396+            for j in range(10):
5397+                what = md5(what[12:14]+str(j)).hexdigest()
5398+                start = int(what[2:4], 16)
5399+                length = max(1, int(what[5:6], 16))
5400+                span = (start, length)
5401+                self.failUnlessEqual(bool(span in s1), bool(span in s2))
5402+
5403+
5404+    # s()
5405+    # s(start,length)
5406+    # s(s0)
5407+    # s.add(start,length) : returns s
5408+    # s.remove(start,length)
5409+    # s.each() -> list of byte offsets, mostly for testing
5410+    # list(s) -> list of (start,length) tuples, one per span
5411+    # (start,length) in s -> True if (start..start+length-1) are all members
5412+    #  NOT equivalent to x in list(s)
5413+    # len(s) -> number of bytes, for testing, bool(), and accounting/limiting
5414+    # bool(s)  (__len__)
5415+    # s = s1+s2, s1-s2, +=s1, -=s1
5416+
5417+    def test_overlap(self):
5418+        for a in range(20):
5419+            for b in range(10):
5420+                for c in range(20):
5421+                    for d in range(10):
5422+                        self._test_overlap(a,b,c,d)
5423+
5424+    def _test_overlap(self, a, b, c, d):
5425+        s1 = set(range(a,a+b))
5426+        s2 = set(range(c,c+d))
5427+        #print "---"
5428+        #self._show_overlap(s1, "1")
5429+        #self._show_overlap(s2, "2")
5430+        o = overlap(a,b,c,d)
5431+        expected = s1.intersection(s2)
5432+        if not expected:
5433+            self.failUnlessEqual(o, None)
5434+        else:
5435+            start,length = o
5436+            so = set(range(start,start+length))
5437+            #self._show(so, "o")
5438+            self.failUnlessEqual(so, expected)
5439+
5440+    def _show_overlap(self, s, c):
5441+        import sys
5442+        out = sys.stdout
5443+        if s:
5444+            for i in range(max(s)):
5445+                if i in s:
5446+                    out.write(c)
5447+                else:
5448+                    out.write(" ")
5449+        out.write("\n")
5450+
5451+def extend(s, start, length, fill):
5452+    if len(s) >= start+length:
5453+        return s
5454+    assert len(fill) == 1
5455+    return s + fill*(start+length-len(s))
5456+
5457+def replace(s, start, data):
5458+    assert len(s) >= start+len(data)
5459+    return s[:start] + data + s[start+len(data):]
5460+
5461+class SimpleDataSpans:
5462+    def __init__(self, other=None):
5463+        self.missing = "" # "1" where missing, "0" where found
5464+        self.data = ""
5465+        if other:
5466+            for (start, data) in other.get_chunks():
5467+                self.add(start, data)
5468+
5469+    def __len__(self):
5470+        return len(self.missing.translate(None, "1"))
5471+    def _dump(self):
5472+        return [i for (i,c) in enumerate(self.missing) if c == "0"]
5473+    def _have(self, start, length):
5474+        m = self.missing[start:start+length]
5475+        if not m or len(m)<length or int(m):
5476+            return False
5477+        return True
5478+    def get_chunks(self):
5479+        for i in self._dump():
5480+            yield (i, self.data[i])
5481+    def get_spans(self):
5482+        return SimpleSpans([(start,len(data))
5483+                            for (start,data) in self.get_chunks()])
5484+    def get(self, start, length):
5485+        if self._have(start, length):
5486+            return self.data[start:start+length]
5487+        return None
5488+    def pop(self, start, length):
5489+        data = self.get(start, length)
5490+        if data:
5491+            self.remove(start, length)
5492+        return data
5493+    def remove(self, start, length):
5494+        self.missing = replace(extend(self.missing, start, length, "1"),
5495+                               start, "1"*length)
5496+    def add(self, start, data):
5497+        self.missing = replace(extend(self.missing, start, len(data), "1"),
5498+                               start, "0"*len(data))
5499+        self.data = replace(extend(self.data, start, len(data), " "),
5500+                            start, data)
5501+
5502+
5503+class StringSpans(unittest.TestCase):
5504+    def do_basic(self, klass):
5505+        ds = klass()
5506+        self.failUnlessEqual(len(ds), 0)
5507+        self.failUnlessEqual(list(ds._dump()), [])
5508+        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 0)
5509+        s = ds.get_spans()
5510+        self.failUnlessEqual(ds.get(0, 4), None)
5511+        self.failUnlessEqual(ds.pop(0, 4), None)
5512+        ds.remove(0, 4)
5513+
5514+        ds.add(2, "four")
5515+        self.failUnlessEqual(len(ds), 4)
5516+        self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
5517+        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
5518+        s = ds.get_spans()
5519+        self.failUnless((2,2) in s)
5520+        self.failUnlessEqual(ds.get(0, 4), None)
5521+        self.failUnlessEqual(ds.pop(0, 4), None)
5522+        self.failUnlessEqual(ds.get(4, 4), None)
5523+
5524+        ds2 = klass(ds)
5525+        self.failUnlessEqual(len(ds2), 4)
5526+        self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
5527+        self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 4)
5528+        self.failUnlessEqual(ds2.get(0, 4), None)
5529+        self.failUnlessEqual(ds2.pop(0, 4), None)
5530+        self.failUnlessEqual(ds2.pop(2, 3), "fou")
5531+        self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_chunks()]), 1)
5532+        self.failUnlessEqual(ds2.get(2, 3), None)
5533+        self.failUnlessEqual(ds2.get(5, 1), "r")
5534+        self.failUnlessEqual(ds.get(2, 3), "fou")
5535+        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 4)
5536+
5537+        ds.add(0, "23")
5538+        self.failUnlessEqual(len(ds), 6)
5539+        self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
5540+        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 6)
5541+        self.failUnlessEqual(ds.get(0, 4), "23fo")
5542+        self.failUnlessEqual(ds.pop(0, 4), "23fo")
5543+        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_chunks()]), 2)
5544+        self.failUnlessEqual(ds.get(0, 4), None)
5545+        self.failUnlessEqual(ds.pop(0, 4), None)
5546+
5547+        ds = klass()
5548+        ds.add(2, "four")
5549+        ds.add(3, "ea")
5550+        self.failUnlessEqual(ds.get(2, 4), "fear")
5551+
5552+    def do_scan(self, klass):
5553+        # do a test with gaps and spans of size 1 and 2
5554+        #  left=(1,11) * right=(1,11) * gapsize=(1,2)
5555+        # 111, 112, 121, 122, 211, 212, 221, 222
5556+        #    211
5557+        #      121
5558+        #         112
5559+        #            212
5560+        #               222
5561+        #                   221
5562+        #                      111
5563+        #                        122
5564+        #  11 1  1 11 11  11  1 1  111
5565+        # 0123456789012345678901234567
5566+        # abcdefghijklmnopqrstuvwxyz-=
5567+        pieces = [(1, "bc"),
5568+                  (4, "e"),
5569+                  (7, "h"),
5570+                  (9, "jk"),
5571+                  (12, "mn"),
5572+                  (16, "qr"),
5573+                  (20, "u"),
5574+                  (22, "w"),
5575+                  (25, "z-="),
5576+                  ]
5577+        p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27])
5578+        S = "abcdefghijklmnopqrstuvwxyz-="
5579+        # TODO: when adding data, add capital letters, to make sure we aren't
5580+        # just leaving the old data in place
5581+        l = len(S)
5582+        def base():
5583+            ds = klass()
5584+            for start, data in pieces:
5585+                ds.add(start, data)
5586+            return ds
5587+        def dump(s):
5588+            p = set(s._dump())
5589+            # wow, this is the first time I've ever wanted ?: in python
5590+            # note: this requires python2.5
5591+            d = "".join([(S[i] if i in p else " ") for i in range(l)])
5592+            assert len(d) == l
5593+            return d
5594+        DEBUG = False
5595+        for start in range(0, l):
5596+            for end in range(start+1, l):
5597+                # add [start-end) to the baseline
5598+                which = "%d-%d" % (start, end-1)
5599+                p_added = set(range(start, end))
5600+                b = base()
5601+                if DEBUG:
5602+                    print
5603+                    print dump(b), which
5604+                    add = klass(); add.add(start, S[start:end])
5605+                    print dump(add)
5606+                b.add(start, S[start:end])
5607+                if DEBUG:
5608+                    print dump(b)
5609+                # check that the new span is there
5610+                d = b.get(start, end-start)
5611+                self.failUnlessEqual(d, S[start:end], which)
5612+                # check that all the original pieces are still there
5613+                for t_start, t_data in pieces:
5614+                    t_len = len(t_data)
5615+                    self.failUnlessEqual(b.get(t_start, t_len),
5616+                                         S[t_start:t_start+t_len],
5617+                                         "%s %d+%d" % (which, t_start, t_len))
5618+                # check that a lot of subspans are mostly correct
5619+                for t_start in range(l):
5620+                    for t_len in range(1,4):
5621+                        d = b.get(t_start, t_len)
5622+                        if d is not None:
5623+                            which2 = "%s+(%d-%d)" % (which, t_start,
5624+                                                     t_start+t_len-1)
5625+                            self.failUnlessEqual(d, S[t_start:t_start+t_len],
5626+                                                 which2)
5627+                        # check that removing a subspan gives the right value
5628+                        b2 = klass(b)
5629+                        b2.remove(t_start, t_len)
5630+                        removed = set(range(t_start, t_start+t_len))
5631+                        for i in range(l):
5632+                            exp = (((i in p_elements) or (i in p_added))
5633+                                   and (i not in removed))
5634+                            which2 = "%s-(%d-%d)" % (which, t_start,
5635+                                                     t_start+t_len-1)
5636+                            self.failUnlessEqual(bool(b2.get(i, 1)), exp,
5637+                                                 which2+" %d" % i)
5638+
5639+    def test_test(self):
5640+        self.do_basic(SimpleDataSpans)
5641+        self.do_scan(SimpleDataSpans)
5642+
5643+    def test_basic(self):
5644+        self.do_basic(DataSpans)
5645+        self.do_scan(DataSpans)
5646+
5647+    def test_random(self):
5648+        # attempt to increase coverage of corner cases by comparing behavior
5649+        # of a simple-but-slow model implementation against the
5650+        # complex-but-fast actual implementation, in a large number of random
5651+        # operations
5652+        S1 = SimpleDataSpans
5653+        S2 = DataSpans
5654+        s1 = S1(); s2 = S2()
5655+        seed = ""
5656+        def _randstr(length, seed):
5657+            created = 0
5658+            pieces = []
5659+            while created < length:
5660+                piece = md5(seed + str(created)).hexdigest()
5661+                pieces.append(piece)
5662+                created += len(piece)
5663+            return "".join(pieces)[:length]
5664+        def _create(subseed):
5665+            ns1 = S1(); ns2 = S2()
5666+            for i in range(10):
5667+                what = md5(subseed+str(i)).hexdigest()
5668+                start = int(what[2:4], 16)
5669+                length = max(1,int(what[5:6], 16))
5670+                ns1.add(start, _randstr(length, what[7:9]));
5671+                ns2.add(start, _randstr(length, what[7:9]))
5672+            return ns1, ns2
5673+
5674+        #print
5675+        for i in range(1000):
5676+            what = md5(seed+str(i)).hexdigest()
5677+            op = what[0]
5678+            subop = what[1]
5679+            start = int(what[2:4], 16)
5680+            length = max(1,int(what[5:6], 16))
5681+            #print what
5682+            if op in "0":
5683+                if subop in "0123456":
5684+                    s1 = S1(); s2 = S2()
5685+                else:
5686+                    s1, s2 = _create(what[7:11])
5687+                #print "s2 = %s" % list(s2._dump())
5688+            elif op in "123456":
5689+                #print "s2.add(%d,%d)" % (start, length)
5690+                s1.add(start, _randstr(length, what[7:9]));
5691+                s2.add(start, _randstr(length, what[7:9]))
5692+            elif op in "789abc":
5693+                #print "s2.remove(%d,%d)" % (start, length)
5694+                s1.remove(start, length); s2.remove(start, length)
5695+            else:
5696+                #print "s2.pop(%d,%d)" % (start, length)
5697+                d1 = s1.pop(start, length); d2 = s2.pop(start, length)
5698+                self.failUnlessEqual(d1, d2)
5699+            #print "s1 now %s" % list(s1._dump())
5700+            #print "s2 now %s" % list(s2._dump())
5701+            self.failUnlessEqual(len(s1), len(s2))
5702+            self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
5703+            for j in range(100):
5704+                what = md5(what[12:14]+str(j)).hexdigest()
5705+                start = int(what[2:4], 16)
5706+                length = max(1, int(what[5:6], 16))
5707+                d1 = s1.get(start, length); d2 = s2.get(start, length)
5708+                self.failUnlessEqual(d1, d2, "%d+%d" % (start, length))
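The new ByteSpans/StringSpans tests above double as a reference for the allmydata.util.spans interface. A condensed sketch of the behavior those tests assert follows (nothing here is new API; the values are taken from, or implied by, the assertions in the hunk above):

    from allmydata.util.spans import Spans, DataSpans

    s = Spans(3, 4)                  # covers bytes 3,4,5,6
    s.add(10, 2)                     # add() returns s, so calls can be chained
    assert (5, 2) in s               # True only when every byte of the range is present
    assert (6, 2) not in s           # byte 7 is missing
    assert list(s) == [(3, 4), (10, 2)]            # one (start,length) tuple per span
    assert list(s.each()) == [3, 4, 5, 6, 10, 11]  # individual byte offsets
    assert len(s) == 6               # number of covered bytes
    s = s - Spans(4, 3)              # set arithmetic: +, -, &, +=, -= are supported
    assert list(s.each()) == [3, 10, 11]

    ds = DataSpans()
    ds.add(2, "four")
    ds.add(3, "ea")                  # overlapping add: newer data replaces older
    assert ds.get(2, 4) == "fear"
    assert ds.get(0, 4) is None      # get() yields data only when the whole range is present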
5709hunk ./src/allmydata/test/test_web.py 15
5710 from allmydata import interfaces, uri, webish, dirnode
5711 from allmydata.storage.shares import get_share_file
5712 from allmydata.storage_client import StorageFarmBroker
5713-from allmydata.immutable import upload, download
5714+from allmydata.immutable import upload
5715+from allmydata.immutable.downloader.status import DownloadStatus
5716 from allmydata.dirnode import DirectoryNode
5717 from allmydata.nodemaker import NodeMaker
5718 from allmydata.unknown import UnknownNode
5719hunk ./src/allmydata/test/test_web.py 79
5720 
5721 class FakeHistory:
5722     _all_upload_status = [upload.UploadStatus()]
5723-    _all_download_status = [download.DownloadStatus()]
5724+    _all_download_status = [DownloadStatus("storage_index", 1234)]
5725     _all_mapupdate_statuses = [servermap.UpdateStatus()]
5726     _all_publish_statuses = [publish.PublishStatus()]
5727     _all_retrieve_statuses = [retrieve.RetrieveStatus()]
5728hunk ./src/allmydata/test/test_web.py 115
5729         self.uploader = FakeUploader()
5730         self.uploader.setServiceParent(self)
5731         self.nodemaker = FakeNodeMaker(None, self._secret_holder, None,
5732-                                       self.uploader, None, None,
5733+                                       self.uploader, None,
5734                                        None, None)
5735 
5736     def startService(self):
5737hunk ./src/allmydata/test/test_web.py 4191
5738                    "no servers were connected, but it might also indicate "
5739                    "severe corruption. You should perform a filecheck on "
5740                    "this object to learn more. The full error message is: "
5741-                   "Failed to get enough shareholders: have 0, need 3")
5742+                   "no shares (need 3). Last failure: None")
5743             self.failUnlessReallyEqual(exp, body)
5744         d.addCallback(_check_zero_shares)
5745 
5746hunk ./src/allmydata/test/test_web.py 4203
5747         def _check_one_share(body):
5748             self.failIf("<html>" in body, body)
5749             body = " ".join(body.strip().split())
5750-            exp = ("NotEnoughSharesError: This indicates that some "
5751+            msg = ("NotEnoughSharesError: This indicates that some "
5752                    "servers were unavailable, or that shares have been "
5753                    "lost to server departure, hard drive failure, or disk "
5754                    "corruption. You should perform a filecheck on "
5755hunk ./src/allmydata/test/test_web.py 4208
5756                    "this object to learn more. The full error message is:"
5757-                   " Failed to get enough shareholders: have 1, need 3")
5758-            self.failUnlessReallyEqual(exp, body)
5759+                   " ran out of shares: %d complete, %d pending, 0 overdue,"
5760+                   " 0 unused, need 3. Last failure: None")
5761+            msg1 = msg % (1, 0)
5762+            msg2 = msg % (0, 1)
5763+            self.failUnless(body == msg1 or body == msg2, body)
5764         d.addCallback(_check_one_share)
5765 
5766         d.addCallback(lambda ignored:
5767hunk ./src/allmydata/util/observer.py 3
5768 # -*- test-case-name: allmydata.test.test_observer -*-
5769 
5770+import weakref
5771 from twisted.internet import defer
5772 from foolscap.api import eventually
5773 
5774hunk ./src/allmydata/util/observer.py 95
5775     def notify(self, *args, **kwargs):
5776         for o in self._watchers:
5777             eventually(o, *args, **kwargs)
5778+
5779+class EventStreamObserver:
5780+    """A simple class to distribute multiple events to a single subscriber.
5781+    It accepts arbitrary kwargs, but no posargs."""
5782+    def __init__(self):
5783+        self._watcher = None
5784+        self._undelivered_results = []
5785+        self._canceler = None
5786+
5787+    def set_canceler(self, c, methname):
5788+        """I will call c.METHNAME(self) when somebody cancels me."""
5789+        # we use a weakref to avoid creating a cycle between us and the thing
5790+        # we're observing: they'll be holding a reference to us to compare
5791+        # against the value we pass to their canceler function. However,
5792+        # since bound methods are first-class objects (and not kept alive by
5793+        # the object they're bound to), we can't just stash a weakref to the
5794+        # bound cancel method. Instead, we must hold a weakref to the actual
5795+        # object, and obtain its cancel method later.
5796+        # http://code.activestate.com/recipes/81253-weakmethod/ has an
5797+        # alternative.
5798+        self._canceler = (weakref.ref(c), methname)
5799+
5800+    def subscribe(self, observer, **watcher_kwargs):
5801+        self._watcher = (observer, watcher_kwargs)
5802+        while self._undelivered_results:
5803+            self._notify(self._undelivered_results.pop(0))
5804+
5805+    def notify(self, **result_kwargs):
5806+        if self._watcher:
5807+            self._notify(result_kwargs)
5808+        else:
5809+            self._undelivered_results.append(result_kwargs)
5810+
5811+    def _notify(self, result_kwargs):
5812+        o, watcher_kwargs = self._watcher
5813+        kwargs = dict(result_kwargs)
5814+        kwargs.update(watcher_kwargs)
5815+        eventually(o, **kwargs)
5816+
5817+    def cancel(self):
5818+        wr,methname = self._canceler
5819+        o = wr()
5820+        if o:
5821+            getattr(o,methname)(self)
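A minimal usage sketch of the EventStreamObserver added above; the RequestSource class, its _cancel method, and the event keywords are invented for illustration, and delivery is asynchronous because notify() goes through foolscap's eventually():

    from twisted.internet import reactor
    from allmydata.util.observer import EventStreamObserver

    class RequestSource:                      # hypothetical producer being observed
        def _cancel(self, ev):
            print "request cancelled:", ev

    def got_event(shnum, state, source):      # receives notify() kwargs plus subscribe() kwargs
        print "event from %s: shnum=%d, state=%s" % (source, shnum, state)

    src = RequestSource()
    ev = EventStreamObserver()
    ev.set_canceler(src, "_cancel")           # weakref to src plus method name, to avoid a cycle
    ev.notify(shnum=3, state="COMPLETE")      # no subscriber yet, so this event is buffered
    ev.subscribe(got_event, source="finder")  # buffered event is now queued for delivery
    ev.cancel()                               # dereferences the weakref, calls src._cancel(ev)
    reactor.callLater(0.1, reactor.stop)
    reactor.run()                             # give eventually() a reactor turn to deliver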
5822hunk ./src/allmydata/web/download-status.xhtml 21
5823   <li>Status: <span n:render="status"/></li>
5824 </ul>
5825 
5826+<div n:render="events"></div>
5827 
5828 <div n:render="results">
5829   <h2>Download Results</h2>
5830hunk ./src/allmydata/web/status.py 361
5831     def download_results(self):
5832         return defer.maybeDeferred(self.download_status.get_results)
5833 
5834+    def relative_time(self, t):
5835+        if t is None:
5836+            return t
5837+        if self.download_status.started is not None:
5838+            return t - self.download_status.started
5839+        return t
5840+    def short_relative_time(self, t):
5841+        t = self.relative_time(t)
5842+        if t is None:
5843+            return ""
5844+        return "+%.6fs" % t
5845+
5846+    def renderHTTP(self, ctx):
5847+        req = inevow.IRequest(ctx)
5848+        t = get_arg(req, "t")
5849+        if t == "json":
5850+            return self.json(req)
5851+        return rend.Page.renderHTTP(self, ctx)
5852+
5853+    def json(self, req):
5854+        req.setHeader("content-type", "text/plain")
5855+        data = {}
5856+        dyhb_events = []
5857+        for serverid,requests in self.download_status.dyhb_requests.iteritems():
5858+            for req in requests:
5859+                dyhb_events.append( (base32.b2a(serverid),) + req )
5860+        dyhb_events.sort(key=lambda req: req[1])
5861+        data["dyhb"] = dyhb_events
5862+        request_events = []
5863+        for serverid,requests in self.download_status.requests.iteritems():
5864+            for req in requests:
5865+                request_events.append( (base32.b2a(serverid),) + req )
5866+        request_events.sort(key=lambda req: (req[4],req[1]))
5867+        data["requests"] = request_events
5868+        data["segment"] = self.download_status.segment_events
5869+        data["read"] = self.download_status.read_events
5870+        return simplejson.dumps(data, indent=1) + "\n"
5871+
5872+    def render_events(self, ctx, data):
5873+        if not self.download_status.storage_index:
5874+            return
5875+        srt = self.short_relative_time
5876+        l = T.ul()
5877+
5878+        t = T.table(class_="status-download-events")
5879+        t[T.tr[T.td["serverid"], T.td["sent"], T.td["received"],
5880+               T.td["shnums"], T.td["RTT"]]]
5881+        dyhb_events = []
5882+        for serverid,requests in self.download_status.dyhb_requests.iteritems():
5883+            for req in requests:
5884+                dyhb_events.append( (serverid,) + req )
5885+        dyhb_events.sort(key=lambda req: req[1])
5886+        for d_ev in dyhb_events:
5887+            (serverid, sent, shnums, received) = d_ev
5888+            serverid_s = idlib.shortnodeid_b2a(serverid)
5889+            rtt = received - sent
5890+            t[T.tr(style="background: %s" % self.color(serverid))[
5891+                [T.td[serverid_s], T.td[srt(sent)], T.td[srt(received)],
5892+                 T.td[",".join([str(shnum) for shnum in shnums])],
5893+                 T.td[self.render_time(None, rtt)],
5894+                 ]]]
5895+        l["DYHB Requests:", t]
5896+
5897+        t = T.table(class_="status-download-events")
5898+        t[T.tr[T.td["range"], T.td["start"], T.td["finish"], T.td["got"],
5899+               T.td["time"], T.td["decrypttime"], T.td["pausedtime"],
5900+               T.td["speed"]]]
5901+        for r_ev in self.download_status.read_events:
5902+            (start, length, requesttime, finishtime, bytes, decrypt, paused) = r_ev
5903+            #print r_ev
5904+            if finishtime is not None:
5905+                rtt = finishtime - requesttime - paused
5906+                speed = self.render_rate(None, 1.0 * bytes / rtt)
5907+                rtt = self.render_time(None, rtt)
5908+                decrypt = self.render_time(None, decrypt)
5909+                paused = self.render_time(None, paused)
5910+            else:
5911+                speed, rtt, decrypt, paused = "","","",""
5912+            t[T.tr[T.td["[%d:+%d]" % (start, length)],
5913+                   T.td[srt(requesttime)], T.td[srt(finishtime)],
5914+                   T.td[bytes], T.td[rtt], T.td[decrypt], T.td[paused],
5915+                   T.td[speed],
5916+                   ]]
5917+        l["Read Events:", t]
5918+
5919+        t = T.table(class_="status-download-events")
5920+        t[T.tr[T.td["type"], T.td["segnum"], T.td["when"], T.td["range"],
5921+               T.td["decodetime"], T.td["segtime"], T.td["speed"]]]
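+        # Pair each "delivery" event with the preceding "request" for the same
+        # segment number, so that per-segment fetch time and speed can be
+        # computed.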
5922+        reqtime = (None, None)
5923+        for s_ev in self.download_status.segment_events:
5924+            (etype, segnum, when, segstart, seglen, decodetime) = s_ev
5925+            if etype == "request":
5926+                t[T.tr[T.td["request"], T.td["seg%d" % segnum],
5927+                       T.td[srt(when)]]]
5928+                reqtime = (segnum, when)
5929+            elif etype == "delivery":
5930+                if reqtime[0] == segnum:
5931+                    segtime = when - reqtime[1]
5932+                    speed = self.render_rate(None, 1.0 * seglen / segtime)
5933+                    segtime = self.render_time(None, segtime)
5934+                else:
5935+                    segtime, speed = "", ""
5936+                t[T.tr[T.td["delivery"], T.td["seg%d" % segnum],
5937+                       T.td[srt(when)],
5938+                       T.td["[%d:+%d]" % (segstart, seglen)],
5939+                       T.td[self.render_time(None,decodetime)],
5940+                       T.td[segtime], T.td[speed]]]
5941+            elif etype == "error":
5942+                t[T.tr[T.td["error"], T.td["seg%d" % segnum]]]
5943+        l["Segment Events:", t]
5944+
5945+        t = T.table(border="1")
5946+        t[T.tr[T.td["serverid"], T.td["shnum"], T.td["range"],
5947+               T.td["txtime"], T.td["rxtime"], T.td["received"], T.td["RTT"]]]
5948+        reqtime = (None, None)
5949+        request_events = []
5950+        for serverid,requests in self.download_status.requests.iteritems():
5951+            for req in requests:
5952+                request_events.append( (serverid,) + req )
5953+        request_events.sort(key=lambda req: (req[4],req[1]))
5954+        for r_ev in request_events:
5955+            (peerid, shnum, start, length, sent, receivedlen, received) = r_ev
5956+            rtt = None
5957+            if received is not None:
5958+                rtt = received - sent
5959+            peerid_s = idlib.shortnodeid_b2a(peerid)
5960+            t[T.tr(style="background: %s" % self.color(peerid))[
5961+                T.td[peerid_s], T.td[shnum],
5962+                T.td["[%d:+%d]" % (start, length)],
5963+                T.td[srt(sent)], T.td[srt(received)], T.td[receivedlen],
5964+                T.td[self.render_time(None, rtt)],
5965+                ]]
5966+        l["Requests:", t]
5967+
5968+        return l
5969+
5970+    def color(self, peerid):
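+        # Map the first three bytes of the peerid into the 0x80-0xff range to
+        # derive a light, per-server background colour for the tables above.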
5971+        def m(c):
5972+            return min(ord(c) / 2 + 0x80, 0xff)
5973+        return "#%02x%02x%02x" % (m(peerid[0]), m(peerid[1]), m(peerid[2]))
5974+
5975     def render_results(self, ctx, data):
5976         d = self.download_results()
5977         def _got_results(results):
5978hunk ./src/allmydata/web/status.py 515
5979         TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
5980         started_s = time.strftime(TIME_FORMAT,
5981                                   time.localtime(data.get_started()))
5982-        return started_s
5983+        return started_s + " (%s)" % data.get_started()
5984 
5985     def render_si(self, ctx, data):
5986         si_s = base32.b2a_or_none(data.get_storage_index())
5987hunk ./src/allmydata/web/tahoe.css 139
5988   text-align: center;
5989   padding: 0 1em;
5990 }
5991+
5992+/* recent upload/download status pages */
5993+
5994+table.status-download-events {
5995+  border: 1px solid #aaa;
5996+}
5997+table.status-download-events td {
5998+  border: 1px solid #a00;
5999+  padding: 2px;
6000+}
6001+
6002}
6003
6004Context:
6005
6006[bundled zetuptoolz' scriptsetup.py: broadcast WM_SETTINGCHANGE if environment has changed.
6007david-sarah@jacaranda.org**20100801010958
6008 Ignore-this: ac4ac78c45c538c2e50610997b56a86e
6009] 
6010[abbreviate time edge case python2.5 unit test
6011jacob.lyles@gmail.com**20100729210638
6012 Ignore-this: 80f9b1dc98ee768372a50be7d0ef66af
6013] 
6014[test_upload.py: rename test_problem_layout_ticket1124 to test_problem_layout_ticket_1124 -- fix .todo reference.
6015david-sarah@jacaranda.org**20100729152927
6016 Ignore-this: c8fe1047edcc83c87b9feb47f4aa587b
6017] 
6018[misc/build_helpers/run_trial.py: correct error in formatting wrong-code error message.
6019david-sarah@jacaranda.org**20100729151457
6020 Ignore-this: bf4014842a1ffc075e8017053356e3a0
6021] 
6022[test_upload.py: rename test_problem_layout_ticket1124 to test_problem_layout_ticket_1124 for consistency.
6023david-sarah@jacaranda.org**20100729142250
6024 Ignore-this: bc3aad5919ae9079ceb9968ad0f5ea5a
6025] 
6026[docs: fix licensing typo that was earlier fixed in [20090921164651-92b7f-7f97b58101d93dc588445c52a9aaa56a2c7ae336]
6027zooko@zooko.com**20100729052923
6028 Ignore-this: a975d79115911688e5469d4d869e1664
6029 I wish we didn't have copies of this licensing text in several different files, since changes can be accidentally omitted from some of them.
6030] 
6031[misc/build_helpers/run_trial.py: allow 'pythonx.y' between 'lib' and 'site-packages'. Also, have the wrong-code error message give the original module source filename.
6032david-sarah@jacaranda.org**20100729052813
6033 Ignore-this: fafb184f1ecc4a9047aa6ea98b51cab6
6034] 
6035[misc/build_helpers/run_trial.py: fix another off-by-two error when the module is loaded from lib/site-packages.
6036david-sarah@jacaranda.org**20100729032853
6037 Ignore-this: b54312cb736ec35e528567c989b79a5d
6038] 
6039[misc/build_helpers/run_trial.py: fix an off-by-one error when determining the root directory from which the module was loaded, and an off-by-two error when it is loaded from an .egg.
6040david-sarah@jacaranda.org**20100729031147
6041 Ignore-this: e38b5dc8dd7b7e387641a4af743c7a27
6042] 
6043[misc/build_helpers/run_trial.py and test_runner.py: refine the false-positive detection for Unicode paths.
6044david-sarah@jacaranda.org**20100729030903
6045 Ignore-this: 42bbfb52bd56ce72dae66f8971e47074
6046] 
6047[misc/build_helpers/run_trial.py: skip option arguments before module name.
6048david-sarah@jacaranda.org**20100729024602
6049 Ignore-this: 51f7f0e4e73205ef3b7c644c4be5cd27
6050] 
6051[misc/build_helpers/run_trial.py and test_runner.py: avoid spurious failures due to non-canonical paths when checking that we are testing the right code. Also simplify module loading in run_trial.py.
6052david-sarah@jacaranda.org**20100729023233
6053 Ignore-this: 5a65065299cdf52b257dbf1ea83bdbaa
6054] 
6055[.darcs-boringfile: fix errors in previous patch, and make _trial_temp a prefix rather than an exact match.
6056david-sarah@jacaranda.org**20100729010634
6057 Ignore-this: 247e24993b578682219d110c031daac3
6058] 
6059[misc/build_helpers/run_trial.py: check that the root from which the module we are testing was loaded is the current directory. addresses #1137
6060david-sarah@jacaranda.org**20100729004317
6061 Ignore-this: e285af3f5cf0e0bc9537632d8457b8a8
6062] 
6063[.darcs-boringfile: take account of generated bin/tahoe.pyscript and bundled .egg directories.
6064david-sarah@jacaranda.org**20100728224723
6065 Ignore-this: dce133644614753907d5d617dc8dd771
6066] 
6067[test_runner.py: add 'test_the_right_code', which partly addresses #1137
6068david-sarah@jacaranda.org**20100728194325
6069 Ignore-this: ed67365cc067881bcffb9ff5fcfa3ef6
6070] 
6071[test_runner.py: add test_run_with_python_options, to test that we haven't broken skipping of option arguments in argv. Also fix errors in the message arguments to failUnlessEqual.
6072david-sarah@jacaranda.org**20100728070445
6073 Ignore-this: fb4a907603dc8ffa71c121dd465b4bb8
6074] 
6075[Skip option arguments to the python interpreter when reconstructing Unicode argv on Windows.
6076david-sarah@jacaranda.org**20100728062731
6077 Ignore-this: 2b17fc43860bcc02a66bb6e5e050ea7c
6078] 
6079[windows/fixups.py: improve comments and reference some relevant Python bugs.
6080david-sarah@jacaranda.org**20100727181921
6081 Ignore-this: 32e61cf98dfc2e3dac60b750dda6429b
6082] 
6083[misc/build_helpers/run-with-pythonpath.py: fix stale comment, and remove 'trial' example that is not the right way to run trial.
6084david-sarah@jacaranda.org**20100726225729
6085 Ignore-this: a61f55557ad69a1633bfb2b8172cce97
6086] 
6087[windows/fixups.py: make errors reported to original_stderr have enough information to debug even if we can't see the traceback.
6088david-sarah@jacaranda.org**20100726221904
6089 Ignore-this: e30b4629a7aa5d71554237c7e809c080
6090] 
6091[windows/fixups.py: fix paste-o in name of Unicode stderr wrapper.
6092david-sarah@jacaranda.org**20100726214736
6093 Ignore-this: cb220931f1683eb53b0c7269e18a38be
6094] 
6095[windows/fixups.py: Don't rely on buggy MSVCRT library for Unicode output, use the Win32 API instead. This should make it work on XP. Also, change how we handle the case where sys.stdout and sys.stderr are redirected, since the .encoding attribute isn't necessarily writeable.
6096david-sarah@jacaranda.org**20100726045019
6097 Ignore-this: 69267abc5065cbd5b86ca71fe4921fb6
6098] 
6099[fileutil: change WindowsError to OSError in abspath_expanduser_unicode, because WindowsError might not exist.
6100david-sarah@jacaranda.org**20100725222603
6101 Ignore-this: e125d503670ed049a9ade0322faa0c51
6102] 
6103[test_runner.py: change to code for locating the bin/tahoe script that was missed when rebasing the patch for #1074.
6104david-sarah@jacaranda.org**20100725182008
6105 Ignore-this: d891a93989ecc3f4301a17110c3d196c
6106] 
6107[Add missing windows/fixups.py (for setting up Unicode args and output on Windows).
6108david-sarah@jacaranda.org**20100725092849
6109 Ignore-this: 35a1e8aeb4e1dea6e81433bf0825a6f6
6110] 
6111[bundled zetuptoolz: add missing scriptsetup.py, and remove cli.exe.
6112david-sarah@jacaranda.org**20100725090203
6113 Ignore-this: 64810149ed7f25babfb123690191920b
6114] 
6115[Move bundled setuptools egg directory to reflect its version (0.6c16dev).
6116david-sarah@jacaranda.org**20100725084629
6117 Ignore-this: b37969282dfd5d1f705e61780e8d62b9
6118] 
6119[Upgrade bundled zetuptoolz to 0.6c16dev.
6120david-sarah@jacaranda.org**20100725083728
6121 Ignore-this: ecca879c0c6d8ee5473db770a522c2f4
6122] 
6123[Changes to Tahoe needed to work with new zetuptoolz (that does not use .exe wrappers on Windows), and to support Unicode arguments and stdout/stderr -- v5
6124david-sarah@jacaranda.org**20100725083216
6125 Ignore-this: 5041a634b1328f041130658233f6a7ce
6126] 
6127[Fix test failures due to Unicode basedir patches.
6128david-sarah@jacaranda.org**20100725010318
6129 Ignore-this: fe92cd439eb3e60a56c007ae452784ed
6130] 
6131[test_system: correct a failure in _test_runner caused by Unicode basedir patch on non-Unicode platforms.
6132david-sarah@jacaranda.org**20100724032123
6133 Ignore-this: 399b3953104fdd1bbed3f7564d163553
6134] 
6135[util.encodingutil: change quote_output to do less unnecessary escaping, and to use double-quotes more consistently when needed. This version avoids u-escaping for characters that are representable in the output encoding, when double quotes are used, and includes tests. fixes #1135
6136david-sarah@jacaranda.org**20100723075314
6137 Ignore-this: b82205834d17db61612dd16436b7c5a2
6138] 
6139[_auto_deps.py: make it easier to build with earliest versions of all dependencies.
6140david-sarah@jacaranda.org**20100722001725
6141 Ignore-this: 90a1bf1d489aa5a98ae39bb2e16c6f66
6142] 
6143[Replace uses of os.path.abspath with abspath_expanduser_unicode where necessary. This makes basedir paths consistently represented as Unicode.
6144david-sarah@jacaranda.org**20100722001418
6145 Ignore-this: 9f8cb706540e695550e0dbe303c01f52
6146] 
6147[Basedir/node directory option improvements. addresses #188, #706, #715, #772, #890
6148david-sarah@jacaranda.org**20100721234834
6149 Ignore-this: 92d52f3af4acb0d659cb49e3306fef6c
6150] 
6151[util.fileutil, test.test_util: add abspath_expanduser_unicode function, to work around <http://bugs.python.org/issue3426>. util.encodingutil: add a convenience function argv_to_abspath.
6152david-sarah@jacaranda.org**20100721231507
6153 Ignore-this: eee6904d1f65a733ff35190879844d08
6154] 
6155[docs/specifications/dirnodes.txt: 'mesh'->'grid'.
6156david-sarah@jacaranda.org**20100723061616
6157 Ignore-this: 887bcf921ef00afba8e05e9239035bca
6158] 
6159[docs/specifications/dirnodes.txt: bring layer terminology up-to-date with architecture.txt, and a few other updates (e.g. note that the MAC is no longer verified, and that URIs can be unknown). Also 'Tahoe'->'Tahoe-LAFS'.
6160david-sarah@jacaranda.org**20100723054703
6161 Ignore-this: f3b98183e7d0a0f391225b8b93ac6c37
6162] 
6163[docs: use current cap to Zooko's wiki page in example text
6164zooko@zooko.com**20100721010543
6165 Ignore-this: 4f36f36758f9fdbaf9eb73eac23b6652
6166 fixes #1134
6167] 
6168[__init__.py: silence DeprecationWarning about BaseException.message globally. fixes #1129
6169david-sarah@jacaranda.org**20100720011939
6170 Ignore-this: 38808986ba79cb2786b010504a22f89
6171] 
6172[test_runner: test that 'tahoe --version' outputs no noise (e.g. DeprecationWarnings).
6173david-sarah@jacaranda.org**20100720011345
6174 Ignore-this: dd358b7b2e5d57282cbe133e8069702e
6175] 
6176[TAG allmydata-tahoe-1.7.1
6177zooko@zooko.com**20100719131352
6178 Ignore-this: 6942056548433dc653a746703819ad8c
6179] 
6180Patch bundle hash:
61812980d8a66f7d6778e0e8076504d00dd5caf1b3b5