Ticket #393: 393status9.dpatch

File 393status9.dpatch, 275.0 KB (added by kevan at 2010-06-26T00:52:38Z)
1Thu Jun 24 16:42:08 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
2  * Alter the ServermapUpdater to find MDMF files
3 
4  The ServermapUpdater should find MDMF files on a grid in the same way
5  that it finds SDMF files. This patch makes it do that.
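
  A minimal sketch of how the new tests drive the updater (the constructor
  arguments are assumed to match the existing make_servermap() helper in
  test_mutable.py; see that helper for the authoritative version):

      from allmydata.monitor import Monitor
      from allmydata.mutable.common import MODE_CHECK
      from allmydata.mutable.servermap import ServerMap, ServermapUpdater

      def make_servermap(node, storage_broker, mode=MODE_CHECK):
          # Build an empty ServerMap and let the updater fill it in by
          # querying storage servers; update() returns a Deferred that
          # fires with the populated ServerMap.
          updater = ServermapUpdater(node, storage_broker, Monitor(),
                                     ServerMap(), mode)
          return updater.update()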
6
7Thu Jun 24 16:44:10 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
8  * Make a segmented mutable uploader
9 
10  The mutable file uploader should be able to publish files with one
11  segment and files with multiple segments. This patch makes it do that.
12  This is still incomplete, and rather ugly -- I need to flesh out error
13  handling, I need to write tests, and I need to remove some of the uglier
14  kludges in the process before I can call this done.
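
  For reference, a sketch of the segmenting arithmetic performed by the
  setup_encoding_parameters() changes below (the helper name is
  hypothetical; the 128 KiB default and the tail-segment rule come from
  the patch itself):

      from allmydata.util import mathutil

      DEFAULT_MAX_SEGMENT_SIZE = 128 * 1024  # bytes

      def segment_plan(datalen, required_shares):
          # the segment size must be a multiple of required_shares
          segsize = mathutil.next_multiple(DEFAULT_MAX_SEGMENT_SIZE,
                                           required_shares)
          num_segments = mathutil.div_ceil(datalen, segsize)
          # the tail segment holds whatever is left over; if the data
          # divides evenly, it is a full-sized segment
          tail_size = datalen % segsize or segsize
          return segsize, num_segments, tail_size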
15
16Thu Jun 24 16:46:37 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
17  * Misc. changes to support the work I'm doing
18 
19      - Add a notion of file version number to interfaces.py
20      - Alter mutable file node interfaces to have a notion of version,
21        though this may be changed later.
22      - Alter mutable/filenode.py to conform to these changes.
23      - Add a salt hasher to util/hashutil.py
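
  The push code below calls hashutil.mutable_salt_hash() on each
  per-segment salt. A sketch of the kind of helper meant here, following
  hashutil's existing tagged-hash convention (the tag string is an
  assumption, not necessarily the one actually added):

      from allmydata.util.hashutil import tagged_hash

      def mutable_salt_hash(salt):
          # hash a 16-byte per-segment salt into a 32-byte leaf for the
          # salt hash tree
          return tagged_hash("tahoe_mutable_segment_salt_v1", salt)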
24
25Thu Jun 24 16:48:33 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
26  * nodemaker.py: create MDMF files when asked to
27
28Thu Jun 24 16:49:05 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
29  * storage/server.py: minor code cleanup
30
31Thu Jun 24 16:49:24 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
32  * test/test_mutable.py: alter some tests that were failing due to MDMF; minor code cleanup.
33
34Fri Jun 25 17:33:24 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
35  * Add MDMF reader and writer
36 
37  The MDMF/SDMF reader and MDMF writer are similar to the object proxies
38  that exist for immutable files. They abstract away details of
39  connection, state, and caching from their callers (in this case, the
40  download, servermap updater, and uploader), and expose methods to get
41  and set information on the remote server.
42 
43  MDMFSlotReadProxy reads a mutable file from the server, doing the right
44  thing (in most cases) regardless of whether the file is MDMF or SDMF. It
45  allows callers to tell it how to batch and flush reads.
46 
47  MDMFSlotWriteProxy writes an MDMF mutable file to a server.
48 
49  This patch also includes tests for MDMFSlotReadProxy and
50  MDMFSlotWriteProxy.
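
  A condensed sketch of how the servermap updater in this bundle uses the
  read proxy; the constructor arguments and accessor names match the calls
  made in the new _got_results() code:

      from twisted.internet import defer
      from allmydata.mutable.layout import MDMFSlotReadProxy

      def inspect_share(server, storage_index, shnum, prefetched_data):
          # prefetched_data is whatever we already read from the slot, so
          # the proxy can often answer without another round trip
          reader = MDMFSlotReadProxy(server, storage_index, shnum,
                                     prefetched_data)
          d1 = reader.get_verinfo()           # seqnum, root hash, k, N, ...
          d2 = reader.get_signature()
          d3 = reader.get_verification_key()
          return defer.DeferredList([d1, d2, d3])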
51
52Fri Jun 25 17:33:55 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
53  * Write a segmented mutable downloader
54 
55  The segmented mutable downloader can deal with MDMF files (files with
56  one or more segments in MDMF format) and SDMF files (files with one
57  segment in SDMF format). It is backwards compatible with the old
58  file format.
59 
60  This patch also contains tests for the segmented mutable downloader.
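
  One detail worth illustrating: each MDMF segment is encrypted with a key
  derived from that segment's salt, so the downloader recovers plaintext
  one segment at a time. A sketch mirroring push_segment() in this bundle
  (AES here is pycryptopp's counter-mode cipher, so the same operation
  both encrypts and decrypts):

      from pycryptopp.cipher.aes import AES
      from allmydata.util import hashutil

      def decrypt_segment(crypttext, salt, readkey):
          key = hashutil.ssk_readkey_data_hash(salt, readkey)
          return AES(key).process(crypttext)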
61
62Fri Jun 25 17:35:20 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
63  * test/test_mutable.py: change the definition of corrupt() to work with MDMF as well as SDMF files, change users of corrupt to use the new definition
64
65New patches:
66
67[Alter the ServermapUpdater to find MDMF files
68Kevan Carstensen <kevan@isnotajoke.com>**20100624234208
69 Ignore-this: 8f89a4f853bc3096990cddf0e0644813
70 
71 The ServermapUpdater should find MDMF files on a grid in the same way
72 that it finds SDMF files. This patch makes it do that.
73] {
74hunk ./src/allmydata/mutable/servermap.py 7
75 from itertools import count
76 from twisted.internet import defer
77 from twisted.python import failure
78-from foolscap.api import DeadReferenceError, RemoteException, eventually
79+from foolscap.api import DeadReferenceError, RemoteException, eventually, \
80+                         fireEventually
81 from allmydata.util import base32, hashutil, idlib, log
82 from allmydata.storage.server import si_b2a
83 from allmydata.interfaces import IServermapUpdaterStatus
84hunk ./src/allmydata/mutable/servermap.py 17
85 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
86      DictOfSets, CorruptShareError, NeedMoreDataError
87 from allmydata.mutable.layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
88-     SIGNED_PREFIX_LENGTH
89+     SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
90 
91 class UpdateStatus:
92     implements(IServermapUpdaterStatus)
93hunk ./src/allmydata/mutable/servermap.py 254
94         """Return a set of versionids, one for each version that is currently
95         recoverable."""
96         versionmap = self.make_versionmap()
97-
98         recoverable_versions = set()
99         for (verinfo, shares) in versionmap.items():
100             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
101hunk ./src/allmydata/mutable/servermap.py 366
102         self._servers_responded = set()
103 
104         # how much data should we read?
105+        # SDMF:
106         #  * if we only need the checkstring, then [0:75]
107         #  * if we need to validate the checkstring sig, then [543ish:799ish]
108         #  * if we need the verification key, then [107:436ish]
109hunk ./src/allmydata/mutable/servermap.py 374
110         #  * if we need the encrypted private key, we want [-1216ish:]
111         #   * but we can't read from negative offsets
112         #   * the offset table tells us the 'ish', also the positive offset
113-        # A future version of the SMDF slot format should consider using
114-        # fixed-size slots so we can retrieve less data. For now, we'll just
115-        # read 2000 bytes, which also happens to read enough actual data to
116-        # pre-fetch a 9-entry dirnode.
117+        # MDMF:
118+        #  * Checkstring? [0:72]
119+        #  * If we want to validate the checkstring, then [0:72], [143:?] --
120+        #    the offset table will tell us for sure.
121+        #  * If we need the verification key, we have to consult the offset
122+        #    table as well.
123+        # At this point, we don't know which we are. Our filenode can
124+        # tell us, but it might be lying -- in some cases, we're
125+        # responsible for telling it which kind of file it is.
126         self._read_size = 4000
127         if mode == MODE_CHECK:
128             # we use unpack_prefix_and_signature, so we need 1k
129hunk ./src/allmydata/mutable/servermap.py 432
130         self._queries_completed = 0
131 
132         sb = self._storage_broker
133+        # All of the peers, permuted by the storage index, as usual.
134         full_peerlist = sb.get_servers_for_index(self._storage_index)
135         self.full_peerlist = full_peerlist # for use later, immutable
136         self.extra_peers = full_peerlist[:] # peers are removed as we use them
137hunk ./src/allmydata/mutable/servermap.py 439
138         self._good_peers = set() # peers who had some shares
139         self._empty_peers = set() # peers who don't have any shares
140         self._bad_peers = set() # peers to whom our queries failed
141+        self._readers = {} # peerid -> dict(sharewriters), filled in
142+                           # after responses come in.
143 
144         k = self._node.get_required_shares()
145hunk ./src/allmydata/mutable/servermap.py 443
146+        # For what cases can these conditions work?
147         if k is None:
148             # make a guess
149             k = 3
150hunk ./src/allmydata/mutable/servermap.py 456
151         self.num_peers_to_query = k + self.EPSILON
152 
153         if self.mode == MODE_CHECK:
154+            # We want to query all of the peers.
155             initial_peers_to_query = dict(full_peerlist)
156             must_query = set(initial_peers_to_query.keys())
157             self.extra_peers = []
158hunk ./src/allmydata/mutable/servermap.py 464
159             # we're planning to replace all the shares, so we want a good
160             # chance of finding them all. We will keep searching until we've
161             # seen epsilon that don't have a share.
162+            # We don't query all of the peers because that could take a while.
163             self.num_peers_to_query = N + self.EPSILON
164             initial_peers_to_query, must_query = self._build_initial_querylist()
165             self.required_num_empty_peers = self.EPSILON
166hunk ./src/allmydata/mutable/servermap.py 474
167             # might also avoid the round trip required to read the encrypted
168             # private key.
169 
170-        else:
171+        else: # MODE_READ, MODE_ANYTHING
172+            # 2k peers is good enough.
173             initial_peers_to_query, must_query = self._build_initial_querylist()
174 
175         # this is a set of peers that we are required to get responses from:
176hunk ./src/allmydata/mutable/servermap.py 490
177         # before we can consider ourselves finished, and self.extra_peers
178         # contains the overflow (peers that we should tap if we don't get
179         # enough responses)
180+        # I guess that self._must_query is a subset of
181+        # initial_peers_to_query?
182+        assert set(must_query).issubset(set(initial_peers_to_query))
183 
184         self._send_initial_requests(initial_peers_to_query)
185         self._status.timings["initial_queries"] = time.time() - self._started
186hunk ./src/allmydata/mutable/servermap.py 549
187         # errors that aren't handled by _query_failed (and errors caused by
188         # _query_failed) get logged, but we still want to check for doneness.
189         d.addErrback(log.err)
190-        d.addBoth(self._check_for_done)
191         d.addErrback(self._fatal_error)
192hunk ./src/allmydata/mutable/servermap.py 550
193+        d.addCallback(self._check_for_done)
194         return d
195 
196     def _do_read(self, ss, peerid, storage_index, shnums, readv):
197hunk ./src/allmydata/mutable/servermap.py 569
198         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
199         return d
200 
201+
202+    def _got_corrupt_share(self, e, shnum, peerid, data, lp):
203+        """
204+        I am called when a remote server returns a corrupt share in
205+        response to one of our queries. By corrupt, I mean a share
206+        without a valid signature. I then record the failure, notify the
207+        server of the corruption, and record the share as bad.
208+        """
209+        f = failure.Failure(e)
210+        self.log(format="bad share: %(f_value)s", f_value=str(f.value),
211+                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
212+        # Notify the server that its share is corrupt.
213+        self.notify_server_corruption(peerid, shnum, str(e))
214+        # By flagging this as a bad peer, we won't count any of
215+        # the other shares on that peer as valid, though if we
216+        # happen to find a valid version string amongst those
217+        # shares, we'll keep track of it so that we don't need
218+        # to validate the signature on those again.
219+        self._bad_peers.add(peerid)
220+        self._last_failure = f
221+        # XXX: Use the reader for this?
222+        checkstring = data[:SIGNED_PREFIX_LENGTH]
223+        self._servermap.mark_bad_share(peerid, shnum, checkstring)
224+        self._servermap.problems.append(f)
225+
226+
227+    def _cache_good_sharedata(self, verinfo, shnum, now, data):
228+        """
229+        If one of my queries returns successfully (which means that we
230+        were able to and successfully did validate the signature), I
231+        cache the data that we initially fetched from the storage
232+        server. This will help reduce the number of roundtrips that need
233+        to occur when the file is downloaded, or when the file is
234+        updated.
235+        """
236+        self._node._add_to_cache(verinfo, shnum, 0, data, now)
237+
238+
239     def _got_results(self, datavs, peerid, readsize, stuff, started):
240         lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
241                       peerid=idlib.shortnodeid_b2a(peerid),
242hunk ./src/allmydata/mutable/servermap.py 630
243         else:
244             self._empty_peers.add(peerid)
245 
246-        last_verinfo = None
247-        last_shnum = None
248+        ss, storage_index = stuff
249+        ds = []
250+
251         for shnum,datav in datavs.items():
252             data = datav[0]
253hunk ./src/allmydata/mutable/servermap.py 635
254-            try:
255-                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
256-                last_verinfo = verinfo
257-                last_shnum = shnum
258-                self._node._add_to_cache(verinfo, shnum, 0, data, now)
259-            except CorruptShareError, e:
260-                # log it and give the other shares a chance to be processed
261-                f = failure.Failure()
262-                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
263-                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
264-                self.notify_server_corruption(peerid, shnum, str(e))
265-                self._bad_peers.add(peerid)
266-                self._last_failure = f
267-                checkstring = data[:SIGNED_PREFIX_LENGTH]
268-                self._servermap.mark_bad_share(peerid, shnum, checkstring)
269-                self._servermap.problems.append(f)
270-                pass
271-
272-        self._status.timings["cumulative_verify"] += (time.time() - now)
273+            reader = MDMFSlotReadProxy(ss,
274+                                       storage_index,
275+                                       shnum,
276+                                       data)
277+            self._readers.setdefault(peerid, dict())[shnum] = reader
278+            # our goal, with each response, is to validate the version
279+            # information and share data as best we can at this point --
280+            # we do this by validating the signature. To do this, we
281+            # need to do the following:
282+            #   - If we don't already have the public key, fetch the
283+            #     public key. We use this to validate the signature.
284+            if not self._node.get_pubkey():
285+                # fetch and set the public key.
286+                d = reader.get_verification_key()
287+                d.addCallback(lambda results, shnum=shnum, peerid=peerid:
288+                    self._try_to_set_pubkey(results, peerid, shnum, lp))
289+                # XXX: Make self._pubkey_query_failed?
290+                d.addErrback(lambda error, shnum=shnum, peerid=peerid:
291+                    self._got_corrupt_share(error, shnum, peerid, data, lp))
292+            else:
293+                # we already have the public key.
294+                d = defer.succeed(None)
295+            # Neither of these two branches return anything of
296+            # consequence, so the first entry in our deferredlist will
297+            # be None.
298 
299hunk ./src/allmydata/mutable/servermap.py 661
300-        if self._need_privkey and last_verinfo:
301-            # send them a request for the privkey. We send one request per
302-            # server.
303-            lp2 = self.log("sending privkey request",
304-                           parent=lp, level=log.NOISY)
305-            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
306-             offsets_tuple) = last_verinfo
307-            o = dict(offsets_tuple)
308+            # - Next, we need the version information. We almost
309+            #   certainly got this by reading the first thousand or so
310+            #   bytes of the share on the storage server, so we
311+            #   shouldn't need to fetch anything at this step.
312+            d2 = reader.get_verinfo()
313+            d2.addErrback(lambda error, shnum=shnum, peerid=peerid:
314+                self._got_corrupt_share(error, shnum, peerid, data, lp))
315+            # - Next, we need the signature. For an SDMF share, it is
316+            #   likely that we fetched this when doing our initial fetch
317+            #   to get the version information. In MDMF, this lives at
318+            #   the end of the share, so unless the file is quite small,
319+            #   we'll need to do a remote fetch to get it.
320+            d3 = reader.get_signature()
321+            d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
322+                self._got_corrupt_share(error, shnum, peerid, data, lp))
323+            #  Once we have all three of these responses, we can move on
324+            #  to validating the signature
325 
326hunk ./src/allmydata/mutable/servermap.py 679
327-            self._queries_outstanding.add(peerid)
328-            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
329-            ss = self._servermap.connections[peerid]
330-            privkey_started = time.time()
331-            d = self._do_read(ss, peerid, self._storage_index,
332-                              [last_shnum], readv)
333-            d.addCallback(self._got_privkey_results, peerid, last_shnum,
334-                          privkey_started, lp2)
335-            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
336-            d.addErrback(log.err)
337-            d.addCallback(self._check_for_done)
338-            d.addErrback(self._fatal_error)
339+            # Does the node already have a privkey? If not, we'll try to
340+            # fetch it here.
341+            if self._need_privkey:
342+                d4 = reader.get_encprivkey()
343+                d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
344+                    self._try_to_validate_privkey(results, peerid, shnum, lp))
345+                d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
346+                    self._privkey_query_failed(error, shnum, data, lp))
347+            else:
348+                d4 = defer.succeed(None)
349 
350hunk ./src/allmydata/mutable/servermap.py 690
351+            dl = defer.DeferredList([d, d2, d3, d4])
352+            dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
353+                self._got_signature_one_share(results, shnum, peerid, lp))
354+            dl.addErrback(lambda error, shnum=shnum, data=data:
355+               self._got_corrupt_share(error, shnum, peerid, data, lp))
356+            dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
357+                self._cache_good_sharedata(verinfo, shnum, now, data))
358+            ds.append(dl)
359+        # dl is a deferred list that will fire when all of the shares
360+        # that we found on this peer are done processing. When dl fires,
361+        # we know that processing is done, so we can decrement the
362+        # semaphore-like thing that we incremented earlier.
363+        dl = defer.DeferredList(ds, fireOnOneErrback=True)
364+        # Are we done? Done means that there are no more queries to
365+        # send, that there are no outstanding queries, and that we
366+        # haven't received any queries that are still processing. If we
367+        # are done, self._check_for_done will cause the done deferred
368+        # that we returned to our caller to fire, which tells them that
369+        # they have a complete servermap, and that we won't be touching
370+        # the servermap anymore.
371+        dl.addCallback(self._check_for_done)
372+        dl.addErrback(self._fatal_error)
373         # all done!
374         self.log("_got_results done", parent=lp, level=log.NOISY)
375hunk ./src/allmydata/mutable/servermap.py 714
376+        return dl
377+
378+
379+    def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
380+        if self._node.get_pubkey():
381+            return # don't go through this again if we don't have to
382+        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
383+        assert len(fingerprint) == 32
384+        if fingerprint != self._node.get_fingerprint():
385+            raise CorruptShareError(peerid, shnum,
386+                                "pubkey doesn't match fingerprint")
387+        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
388+        assert self._node.get_pubkey()
389+
390 
391     def notify_server_corruption(self, peerid, shnum, reason):
392         ss = self._servermap.connections[peerid]
393hunk ./src/allmydata/mutable/servermap.py 734
394         ss.callRemoteOnly("advise_corrupt_share",
395                           "mutable", self._storage_index, shnum, reason)
396 
397-    def _got_results_one_share(self, shnum, data, peerid, lp):
398+
399+    def _got_signature_one_share(self, results, shnum, peerid, lp):
400+        # It is our job to give versioninfo to our caller. We need to
401+        # raise CorruptShareError if the share is corrupt for any
402+        # reason, something that our caller will handle.
403         self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
404                  shnum=shnum,
405                  peerid=idlib.shortnodeid_b2a(peerid),
406hunk ./src/allmydata/mutable/servermap.py 744
407                  level=log.NOISY,
408                  parent=lp)
409-
410-        # this might raise NeedMoreDataError, if the pubkey and signature
411-        # live at some weird offset. That shouldn't happen, so I'm going to
412-        # treat it as a bad share.
413-        (seqnum, root_hash, IV, k, N, segsize, datalength,
414-         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
415-
416-        if not self._node.get_pubkey():
417-            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
418-            assert len(fingerprint) == 32
419-            if fingerprint != self._node.get_fingerprint():
420-                raise CorruptShareError(peerid, shnum,
421-                                        "pubkey doesn't match fingerprint")
422-            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
423-
424-        if self._need_privkey:
425-            self._try_to_extract_privkey(data, peerid, shnum, lp)
426-
427-        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
428-         ig_segsize, ig_datalen, offsets) = unpack_header(data)
429+        _, verinfo, signature, __ = results
430+        (seqnum,
431+         root_hash,
432+         saltish,
433+         segsize,
434+         datalen,
435+         k,
436+         n,
437+         prefix,
438+         offsets) = verinfo[1]
439         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
440 
441hunk ./src/allmydata/mutable/servermap.py 756
442-        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
443+        # XXX: This should be done for us in the method, so
444+        # presumably you can go in there and fix it.
445+        verinfo = (seqnum,
446+                   root_hash,
447+                   saltish,
448+                   segsize,
449+                   datalen,
450+                   k,
451+                   n,
452+                   prefix,
453                    offsets_tuple)
454hunk ./src/allmydata/mutable/servermap.py 767
455+        # This tuple uniquely identifies a version of the file on the
456+        # grid; we use it to keep track of the ones we've already seen.
457 
458         if verinfo not in self._valid_versions:
459hunk ./src/allmydata/mutable/servermap.py 771
460-            # it's a new pair. Verify the signature.
461-            valid = self._node.get_pubkey().verify(prefix, signature)
462+            # This is a new version tuple, and we need to validate it
463+            # against the public key before keeping track of it.
464+            assert self._node.get_pubkey()
465+            valid = self._node.get_pubkey().verify(prefix, signature[1])
466             if not valid:
467hunk ./src/allmydata/mutable/servermap.py 776
468-                raise CorruptShareError(peerid, shnum, "signature is invalid")
469+                raise CorruptShareError(peerid, shnum,
470+                                        "signature is invalid")
471 
472hunk ./src/allmydata/mutable/servermap.py 779
473-            # ok, it's a valid verinfo. Add it to the list of validated
474-            # versions.
475-            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
476-                     % (seqnum, base32.b2a(root_hash)[:4],
477-                        idlib.shortnodeid_b2a(peerid), shnum,
478-                        k, N, segsize, datalength),
479-                     parent=lp)
480-            self._valid_versions.add(verinfo)
481-        # We now know that this is a valid candidate verinfo.
482+        # ok, it's a valid verinfo. Add it to the list of validated
483+        # versions.
484+        self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
485+                 % (seqnum, base32.b2a(root_hash)[:4],
486+                    idlib.shortnodeid_b2a(peerid), shnum,
487+                    k, n, segsize, datalen),
488+                    parent=lp)
489+        self._valid_versions.add(verinfo)
490+        # We now know that this is a valid candidate verinfo. Whether or
491+        # not this instance of it is valid is a matter for the next
492+        # statement; at this point, we just know that if we see this
493+        # version info again, that its signature checks out and that
494+        # we're okay to skip the signature-checking step.
495 
496hunk ./src/allmydata/mutable/servermap.py 793
497+        # (peerid, shnum) are bound in the method invocation.
498         if (peerid, shnum) in self._servermap.bad_shares:
499             # we've been told that the rest of the data in this share is
500             # unusable, so don't add it to the servermap.
501hunk ./src/allmydata/mutable/servermap.py 808
502         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
503         return verinfo
504 
505+
506     def _deserialize_pubkey(self, pubkey_s):
507         verifier = rsa.create_verifying_key_from_string(pubkey_s)
508         return verifier
509hunk ./src/allmydata/mutable/servermap.py 813
510 
511-    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
512-        try:
513-            r = unpack_share(data)
514-        except NeedMoreDataError, e:
515-            # this share won't help us. oh well.
516-            offset = e.encprivkey_offset
517-            length = e.encprivkey_length
518-            self.log("shnum %d on peerid %s: share was too short (%dB) "
519-                     "to get the encprivkey; [%d:%d] ought to hold it" %
520-                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
521-                      offset, offset+length),
522-                     parent=lp)
523-            # NOTE: if uncoordinated writes are taking place, someone might
524-            # change the share (and most probably move the encprivkey) before
525-            # we get a chance to do one of these reads and fetch it. This
526-            # will cause us to see a NotEnoughSharesError(unable to fetch
527-            # privkey) instead of an UncoordinatedWriteError . This is a
528-            # nuisance, but it will go away when we move to DSA-based mutable
529-            # files (since the privkey will be small enough to fit in the
530-            # write cap).
531-
532-            return
533-
534-        (seqnum, root_hash, IV, k, N, segsize, datalen,
535-         pubkey, signature, share_hash_chain, block_hash_tree,
536-         share_data, enc_privkey) = r
537-
538-        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
539 
540     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
541hunk ./src/allmydata/mutable/servermap.py 815
542-
543+        """
544+        Given an encrypted private key from a remote server, I check the
545+        writekey derived from it against the writekey stored in my node. If
546+        it matches, I set the privkey and encprivkey properties of the node.
547+        """
548         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
549         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
550         if alleged_writekey != self._node.get_writekey():
551hunk ./src/allmydata/mutable/servermap.py 892
552         self._queries_completed += 1
553         self._last_failure = f
554 
555-    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
556-        now = time.time()
557-        elapsed = now - started
558-        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
559-        self._queries_outstanding.discard(peerid)
560-        if not self._need_privkey:
561-            return
562-        if shnum not in datavs:
563-            self.log("privkey wasn't there when we asked it",
564-                     level=log.WEIRD, umid="VA9uDQ")
565-            return
566-        datav = datavs[shnum]
567-        enc_privkey = datav[0]
568-        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
569 
570     def _privkey_query_failed(self, f, peerid, shnum, lp):
571         self._queries_outstanding.discard(peerid)
572hunk ./src/allmydata/mutable/servermap.py 906
573         self._servermap.problems.append(f)
574         self._last_failure = f
575 
576+
577     def _check_for_done(self, res):
578         # exit paths:
579         #  return self._send_more_queries(outstanding) : send some more queries
580hunk ./src/allmydata/mutable/servermap.py 912
581         #  return self._done() : all done
582         #  return : keep waiting, no new queries
583-
584         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
585                               "%(outstanding)d queries outstanding, "
586                               "%(extra)d extra peers available, "
587hunk ./src/allmydata/mutable/servermap.py 1117
588         self._servermap.last_update_time = self._started
589         # the servermap will not be touched after this
590         self.log("servermap: %s" % self._servermap.summarize_versions())
591+
592         eventually(self._done_deferred.callback, self._servermap)
593 
594     def _fatal_error(self, f):
595hunk ./src/allmydata/test/test_mutable.py 613
596         d.addCallback(_created)
597         return d
598 
599-    def publish_multiple(self):
600+    def publish_mdmf(self):
601+        # like publish_one, except that the result is guaranteed to be
602+        # an MDMF file.
603+        # self.CONTENTS should have more than one segment.
604+        self.CONTENTS = "This is an MDMF file" * 100000
605+        self._storage = FakeStorage()
606+        self._nodemaker = make_nodemaker(self._storage)
607+        self._storage_broker = self._nodemaker.storage_broker
608+        d = self._nodemaker.create_mutable_file(self.CONTENTS, version=1)
609+        def _created(node):
610+            self._fn = node
611+            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
612+        d.addCallback(_created)
613+        return d
614+
615+
616+    def publish_sdmf(self):
617+        # like publish_one, except that the result is guaranteed to be
618+        # an SDMF file
619+        self.CONTENTS = "This is an SDMF file" * 1000
620+        self._storage = FakeStorage()
621+        self._nodemaker = make_nodemaker(self._storage)
622+        self._storage_broker = self._nodemaker.storage_broker
623+        d = self._nodemaker.create_mutable_file(self.CONTENTS, version=0)
624+        def _created(node):
625+            self._fn = node
626+            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
627+        d.addCallback(_created)
628+        return d
629+
630+
631+    def publish_multiple(self, version=0):
632         self.CONTENTS = ["Contents 0",
633                          "Contents 1",
634                          "Contents 2",
635hunk ./src/allmydata/test/test_mutable.py 653
636         self._copied_shares = {}
637         self._storage = FakeStorage()
638         self._nodemaker = make_nodemaker(self._storage)
639-        d = self._nodemaker.create_mutable_file(self.CONTENTS[0]) # seqnum=1
640+        d = self._nodemaker.create_mutable_file(self.CONTENTS[0], version=version) # seqnum=1
641         def _created(node):
642             self._fn = node
643             # now create multiple versions of the same file, and accumulate
644hunk ./src/allmydata/test/test_mutable.py 881
645         return d
646 
647 
648+    def test_servermapupdater_finds_mdmf_files(self):
649+        # setUp already published an MDMF file for us. We just need to
650+        # make sure that when we run the ServermapUpdater, the file is
651+        # reported to have one recoverable version.
652+        d = defer.succeed(None)
653+        d.addCallback(lambda ignored:
654+            self.publish_mdmf())
655+        d.addCallback(lambda ignored:
656+            self.make_servermap(mode=MODE_CHECK))
657+        # Calling make_servermap also updates the servermap in the mode
658+        # that we specify, so we just need to see what it says.
659+        def _check_servermap(sm):
660+            self.failUnlessEqual(len(sm.recoverable_versions()), 1)
661+        d.addCallback(_check_servermap)
662+        return d
663+
664+
665+    def test_servermapupdater_finds_sdmf_files(self):
666+        d = defer.succeed(None)
667+        d.addCallback(lambda ignored:
668+            self.publish_sdmf())
669+        d.addCallback(lambda ignored:
670+            self.make_servermap(mode=MODE_CHECK))
671+        d.addCallback(lambda servermap:
672+            self.failUnlessEqual(len(servermap.recoverable_versions()), 1))
673+        return d
674+
675 
676 class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
677     def setUp(self):
678hunk ./src/allmydata/test/test_mutable.py 1025
679         return d
680     test_no_servers_download.timeout = 15
681 
682+
683     def _test_corrupt_all(self, offset, substring,
684                           should_succeed=False, corrupt_early=True,
685                           failure_checker=None):
686}
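
  Condensed, the per-share control flow that _got_results now follows (the
  names are the ones used in the hunks above; this is an outline, not the
  exact code):

      from twisted.internet import defer

      def process_share(reader, have_pubkey, need_privkey):
          # fetch the public key only if the node doesn't have it yet
          d_pub = (defer.succeed(None) if have_pubkey
                   else reader.get_verification_key())
          d_ver = reader.get_verinfo()
          d_sig = reader.get_signature()
          d_key = (reader.get_encprivkey() if need_privkey
                   else defer.succeed(None))
          # the DeferredList result feeds _got_signature_one_share, which
          # verifies the signature over the signed prefix
          return defer.DeferredList([d_pub, d_ver, d_sig, d_key])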
687[Make a segmented mutable uploader
688Kevan Carstensen <kevan@isnotajoke.com>**20100624234410
689 Ignore-this: 3e5182612083ff3e11593a4edf5de307
690 
691 The mutable file uploader should be able to publish files with one
692 segment and files with multiple segments. This patch makes it do that.
693 This is still incomplete, and rather ugly -- I need to flesh out error
694 handling, I need to write tests, and I need to remove some of the uglier
695 kludges in the process before I can call this done.
696] {
697hunk ./src/allmydata/mutable/publish.py 8
698 from zope.interface import implements
699 from twisted.internet import defer
700 from twisted.python import failure
701-from allmydata.interfaces import IPublishStatus
702+from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION
703 from allmydata.util import base32, hashutil, mathutil, idlib, log
704 from allmydata import hashtree, codec
705 from allmydata.storage.server import si_b2a
706hunk ./src/allmydata/mutable/publish.py 19
707      UncoordinatedWriteError, NotEnoughServersError
708 from allmydata.mutable.servermap import ServerMap
709 from allmydata.mutable.layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
710-     unpack_checkstring, SIGNED_PREFIX
711+     unpack_checkstring, SIGNED_PREFIX, MDMFSlotWriteProxy
712+
713+KiB = 1024
714+DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
715 
716 class PublishStatus:
717     implements(IPublishStatus)
718hunk ./src/allmydata/mutable/publish.py 112
719         self._status.set_helper(False)
720         self._status.set_progress(0.0)
721         self._status.set_active(True)
722+        # We use this to control how the file is written.
723+        version = self._node.get_version()
724+        assert version in (SDMF_VERSION, MDMF_VERSION)
725+        self._version = version
726 
727     def get_status(self):
728         return self._status
729hunk ./src/allmydata/mutable/publish.py 134
730         simultaneous write.
731         """
732 
733-        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
734-        # 2: perform peer selection, get candidate servers
735-        #  2a: send queries to n+epsilon servers, to determine current shares
736-        #  2b: based upon responses, create target map
737-        # 3: send slot_testv_and_readv_and_writev messages
738-        # 4: as responses return, update share-dispatch table
739-        # 4a: may need to run recovery algorithm
740-        # 5: when enough responses are back, we're done
741+        # 0. Setup encoding parameters, encoder, and other such things.
742+        # 1. Encrypt, encode, and publish segments.
743 
744         self.log("starting publish, datalen is %s" % len(newdata))
745         self._status.set_size(len(newdata))
746hunk ./src/allmydata/mutable/publish.py 187
747         self.bad_peers = set() # peerids who have errbacked/refused requests
748 
749         self.newdata = newdata
750-        self.salt = os.urandom(16)
751 
752hunk ./src/allmydata/mutable/publish.py 188
753+        # This will set self.segment_size, self.num_segments, and
754+        # self.fec.
755         self.setup_encoding_parameters()
756 
757         # if we experience any surprises (writes which were rejected because
758hunk ./src/allmydata/mutable/publish.py 238
759             self.bad_share_checkstrings[key] = old_checkstring
760             self.connections[peerid] = self._servermap.connections[peerid]
761 
762-        # create the shares. We'll discard these as they are delivered. SDMF:
763-        # we're allowed to hold everything in memory.
764+        # Now, the process dovetails -- if this is an SDMF file, we need
765+        # to write an SDMF file. Otherwise, we need to write an MDMF
766+        # file.
767+        if self._version == MDMF_VERSION:
768+            return self._publish_mdmf()
769+        else:
770+            return self._publish_sdmf()
771+        #return self.done_deferred
772+
773+    def _publish_mdmf(self):
774+        # Next, we find homes for all of the shares that we don't have
775+        # homes for yet.
776+        # TODO: Make this part do peer selection.
777+        self.update_goal()
778+        self.writers = {}
779+        # For each (peerid, shnum) in self.goal, we make an
780+        # MDMFSlotWriteProxy for that peer. We'll use this to write
781+        # shares to the peer.
782+        for key in self.goal:
783+            peerid, shnum = key
784+            write_enabler = self._node.get_write_enabler(peerid)
785+            renew_secret = self._node.get_renewal_secret(peerid)
786+            cancel_secret = self._node.get_cancel_secret(peerid)
787+            secrets = (write_enabler, renew_secret, cancel_secret)
788+
789+            self.writers[shnum] =  MDMFSlotWriteProxy(shnum,
790+                                                      self.connections[peerid],
791+                                                      self._storage_index,
792+                                                      secrets,
793+                                                      self._new_seqnum,
794+                                                      self.required_shares,
795+                                                      self.total_shares,
796+                                                      self.segment_size,
797+                                                      len(self.newdata))
798+            if (peerid, shnum) in self._servermap.servermap:
799+                old_versionid, old_timestamp = self._servermap.servermap[key]
800+                (old_seqnum, old_root_hash, old_salt, old_segsize,
801+                 old_datalength, old_k, old_N, old_prefix,
802+                 old_offsets_tuple) = old_versionid
803+                old_checkstring = pack_checkstring(old_seqnum,
804+                                                   old_root_hash,
805+                                                   old_salt, 1)
806+                self.writers[shnum].set_checkstring(old_checkstring)
807+
808+        # Now, we start pushing shares.
809+        self._status.timings["setup"] = time.time() - self._started
810+        def _start_pushing(res):
811+            self._started_pushing = time.time()
812+            return res
813+
814+        # First, we encrypt, encode, and publish the shares that we need
815+        # to encrypt, encode, and publish.
816+
817+        # This will eventually hold the block hash chain for each share
818+        # that we publish. We define it this way so that empty publishes
819+        # will still have something to write to the remote slot.
820+        self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
821+        self.sharehash_leaves = None # eventually [sharehashes]
822+        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
823+                              # validate the share]
824 
825hunk ./src/allmydata/mutable/publish.py 299
826+        d = defer.succeed(None)
827+        self.log("Starting push")
828+        for i in xrange(self.num_segments - 1):
829+            d.addCallback(lambda ignored, i=i:
830+                self.push_segment(i))
831+            d.addCallback(self._turn_barrier)
832+        # We have at least one segment, so we will have a tail segment
833+        if self.num_segments > 0:
834+            d.addCallback(lambda ignored:
835+                self.push_tail_segment())
836+
837+        d.addCallback(lambda ignored:
838+            self.push_encprivkey())
839+        d.addCallback(lambda ignored:
840+            self.push_blockhashes())
841+        d.addCallback(lambda ignored:
842+            self.push_salthashes())
843+        d.addCallback(lambda ignored:
844+            self.push_sharehashes())
845+        d.addCallback(lambda ignored:
846+            self.push_toplevel_hashes_and_signature())
847+        d.addCallback(lambda ignored:
848+            self.finish_publishing())
849+        return d
850+
851+
852+    def _publish_sdmf(self):
853         self._status.timings["setup"] = time.time() - self._started
854hunk ./src/allmydata/mutable/publish.py 327
855+        self.salt = os.urandom(16)
856+
857         d = self._encrypt_and_encode()
858         d.addCallback(self._generate_shares)
859         def _start_pushing(res):
860hunk ./src/allmydata/mutable/publish.py 340
861 
862         return self.done_deferred
863 
864+
865     def setup_encoding_parameters(self):
866hunk ./src/allmydata/mutable/publish.py 342
867-        segment_size = len(self.newdata)
868+        if self._version == MDMF_VERSION:
869+            segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
870+        else:
871+            segment_size = len(self.newdata) # SDMF is only one segment
872         # this must be a multiple of self.required_shares
873         segment_size = mathutil.next_multiple(segment_size,
874                                               self.required_shares)
875hunk ./src/allmydata/mutable/publish.py 355
876                                                   segment_size)
877         else:
878             self.num_segments = 0
879-        assert self.num_segments in [0, 1,] # SDMF restrictions
880+        if self._version == SDMF_VERSION:
881+            assert self.num_segments in (0, 1) # SDMF
882+            return
883+        # calculate the tail segment size.
884+        self.tail_segment_size = len(self.newdata) % segment_size
885+
886+        if self.tail_segment_size == 0:
887+            # The tail segment is the same size as the other segments.
888+            self.tail_segment_size = segment_size
889+
890+        # We'll make an encoder ahead-of-time for the normal-sized
891+        # segments (defined as any segment of segment_size size.
892+        # (the part of the code that puts the tail segment will make its
893+        #  own encoder for that part)
894+        fec = codec.CRSEncoder()
895+        fec.set_params(self.segment_size,
896+                       self.required_shares, self.total_shares)
897+        self.piece_size = fec.get_block_size()
898+        self.fec = fec
899+        # This is not technically part of the encoding parameters, but
900+        # that we are setting up the encoder and encoding parameters is
901+        # a good indicator that we will soon need it.
902+        self.salt_hashes = []
903+
904+
905+    def push_segment(self, segnum):
906+        started = time.time()
907+        segsize = self.segment_size
908+        self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
909+        data = self.newdata[segsize * segnum:segsize*(segnum + 1)]
910+        assert len(data) == segsize
911+
912+        salt = os.urandom(16)
913+        self.salt_hashes.append(hashutil.mutable_salt_hash(salt))
914+
915+        key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
916+        enc = AES(key)
917+        crypttext = enc.process(data)
918+        assert len(crypttext) == len(data)
919+
920+        now = time.time()
921+        self._status.timings["encrypt"] = now - started
922+        started = now
923+
924+        # now apply FEC
925+
926+        self._status.set_status("Encoding")
927+        crypttext_pieces = [None] * self.required_shares
928+        piece_size = self.piece_size
929+        for i in range(len(crypttext_pieces)):
930+            offset = i * piece_size
931+            piece = crypttext[offset:offset+piece_size]
932+            piece = piece + "\x00"*(piece_size - len(piece)) # padding
933+            crypttext_pieces[i] = piece
934+            assert len(piece) == piece_size
935+        d = self.fec.encode(crypttext_pieces)
936+        def _done_encoding(res):
937+            elapsed = time.time() - started
938+            self._status.timings["encode"] = elapsed
939+            return res
940+        d.addCallback(_done_encoding)
941+
942+        def _push_shares_and_salt(results):
943+            shares, shareids = results
944+            dl = []
945+            for i in xrange(len(shares)):
946+                sharedata = shares[i]
947+                shareid = shareids[i]
948+                block_hash = hashutil.block_hash(sharedata)
949+                self.blockhashes[shareid].append(block_hash)
950+
951+                # find the writer for this share
952+                d = self.writers[shareid].put_block(sharedata, segnum, salt)
953+                dl.append(d)
954+            # TODO: Naturally, we need to check on the results of these.
955+            return defer.DeferredList(dl)
956+        d.addCallback(_push_shares_and_salt)
957+        return d
958+
959+
960+    def push_tail_segment(self):
961+        # This is essentially the same as push_segment, except that we
962+        # don't use the cached encoder that we use elsewhere.
963+        self.log("Pushing tail segment")
964+        started = time.time()
965+        segsize = self.segment_size
966+        data = self.newdata[segsize * (self.num_segments-1):]
967+        assert len(data) == self.tail_segment_size
968+        salt = os.urandom(16)
969+        self.salt_hashes.append(hashutil.mutable_salt_hash(salt))
970+
971+        key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
972+        enc = AES(key)
973+        crypttext = enc.process(data)
974+        assert len(crypttext) == len(data)
975+
976+        now = time.time()
977+        self._status.timings['encrypt'] = now - started
978+        started = now
979+
980+        self._status.set_status("Encoding")
981+        tail_fec = codec.CRSEncoder()
982+        tail_fec.set_params(self.tail_segment_size,
983+                            self.required_shares,
984+                            self.total_shares)
985+
986+        crypttext_pieces = [None] * self.required_shares
987+        piece_size = tail_fec.get_block_size()
988+        for i in range(len(crypttext_pieces)):
989+            offset = i * piece_size
990+            piece = crypttext[offset:offset+piece_size]
991+            piece = piece + "\x00"*(piece_size - len(piece)) # padding
992+            crypttext_pieces[i] = piece
993+            assert len(piece) == piece_size
994+        d = tail_fec.encode(crypttext_pieces)
995+        def _push_shares_and_salt(results):
996+            shares, shareids = results
997+            dl = []
998+            for i in xrange(len(shares)):
999+                sharedata = shares[i]
1000+                shareid = shareids[i]
1001+                block_hash = hashutil.block_hash(sharedata)
1002+                self.blockhashes[shareid].append(block_hash)
1003+                # find the writer for this share
1004+                d = self.writers[shareid].put_block(sharedata,
1005+                                                    self.num_segments - 1,
1006+                                                    salt)
1007+                dl.append(d)
1008+            # TODO: Naturally, we need to check on the results of these.
1009+            return defer.DeferredList(dl)
1010+        d.addCallback(_push_shares_and_salt)
1011+        return d
1012+
1013+
1014+    def push_encprivkey(self):
1015+        started = time.time()
1016+        encprivkey = self._encprivkey
1017+        dl = []
1018+        def _spy_on_writer(results):
1019+            print results
1020+            return results
1021+        for shnum, writer in self.writers.iteritems():
1022+            d = writer.put_encprivkey(encprivkey)
1023+            dl.append(d)
1024+        d = defer.DeferredList(dl)
1025+        return d
1026+
1027+
1028+    def push_blockhashes(self):
1029+        started = time.time()
1030+        dl = []
1031+        def _spy_on_results(results):
1032+            print results
1033+            return results
1034+        self.sharehash_leaves = [None] * len(self.blockhashes)
1035+        for shnum, blockhashes in self.blockhashes.iteritems():
1036+            t = hashtree.HashTree(blockhashes)
1037+            self.blockhashes[shnum] = list(t)
1038+            # set the leaf for future use.
1039+            self.sharehash_leaves[shnum] = t[0]
1040+            d = self.writers[shnum].put_blockhashes(self.blockhashes[shnum])
1041+            dl.append(d)
1042+        d = defer.DeferredList(dl)
1043+        return d
1044+
1045+
1046+    def push_salthashes(self):
1047+        started = time.time()
1048+        dl = []
1049+        t = hashtree.HashTree(self.salt_hashes)
1050+        pushing = list(t)
1051+        for shnum in self.writers.iterkeys():
1052+            d = self.writers[shnum].put_salthashes(t)
1053+            dl.append(d)
1054+        dl = defer.DeferredList(dl)
1055+        return dl
1056+
1057+
1058+    def push_sharehashes(self):
1059+        share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
1060+        share_hash_chain = {}
1061+        ds = []
1062+        def _spy_on_results(results):
1063+            print results
1064+            return results
1065+        for shnum in xrange(len(self.sharehash_leaves)):
1066+            needed_indices = share_hash_tree.needed_hashes(shnum)
1067+            self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
1068+                                             for i in needed_indices] )
1069+            d = self.writers[shnum].put_sharehashes(self.sharehashes[shnum])
1070+            ds.append(d)
1071+        self.root_hash = share_hash_tree[0]
1072+        d = defer.DeferredList(ds)
1073+        return d
1074+
1075+
1076+    def push_toplevel_hashes_and_signature(self):
1077+        # We need to do three things here:
1078+        #   - Push the root hash and salt hash
1079+        #   - Get the checkstring of the resulting layout; sign that.
1080+        #   - Push the signature
1081+        ds = []
1082+        def _spy_on_results(results):
1083+            print results
1084+            return results
1085+        for shnum in xrange(self.total_shares):
1086+            d = self.writers[shnum].put_root_hash(self.root_hash)
1087+            ds.append(d)
1088+        d = defer.DeferredList(ds)
1089+        def _make_and_place_signature(ignored):
1090+            signable = self.writers[0].get_signable()
1091+            self.signature = self._privkey.sign(signable)
1092+
1093+            ds = []
1094+            for (shnum, writer) in self.writers.iteritems():
1095+                d = writer.put_signature(self.signature)
1096+                ds.append(d)
1097+            return defer.DeferredList(ds)
1098+        d.addCallback(_make_and_place_signature)
1099+        return d
1100+
1101+
1102+    def finish_publishing(self):
1103+        # We're almost done -- we just need to put the verification key
1104+        # and the offsets
1105+        ds = []
1106+        verification_key = self._pubkey.serialize()
1107+
1108+        def _spy_on_results(results):
1109+            print results
1110+            return results
1111+        for (shnum, writer) in self.writers.iteritems():
1112+            d = writer.put_verification_key(verification_key)
1113+            d.addCallback(lambda ignored, writer=writer:
1114+                writer.finish_publishing())
1115+            ds.append(d)
1116+        return defer.DeferredList(ds)
1117+
1118+
1119+    def _turn_barrier(self, res):
1120+        # putting this method in a Deferred chain imposes a guaranteed
1121+        # reactor turn between the pre- and post- portions of that chain.
1122+        # This can be useful to limit memory consumption: since Deferreds do
1123+        # not do tail recursion, code which uses defer.succeed(result) for
1124+        # consistency will cause objects to live for longer than you might
1125+        # normally expect.
1126+        return fireEventually(res)
1127+
1128 
1129     def _fatal_error(self, f):
1130         self.log("error during loop", failure=f, level=log.UNUSUAL)
1131hunk ./src/allmydata/mutable/publish.py 739
1132             self.log_goal(self.goal, "after update: ")
1133 
1134 
1135-
1136     def _encrypt_and_encode(self):
1137         # this returns a Deferred that fires with a list of (sharedata,
1138         # sharenum) tuples. TODO: cache the ciphertext, only produce the
1139hunk ./src/allmydata/mutable/publish.py 780
1140         d.addCallback(_done_encoding)
1141         return d
1142 
1143+
1144     def _generate_shares(self, shares_and_shareids):
1145         # this sets self.shares and self.root_hash
1146         self.log("_generate_shares")
1147hunk ./src/allmydata/mutable/publish.py 1168
1148             self._status.set_progress(1.0)
1149         eventually(self.done_deferred.callback, res)
1150 
1151-
1152hunk ./src/allmydata/test/test_mutable.py 225
1153         d.addCallback(_created)
1154         return d
1155 
1156+
1157+    def test_create_mdmf(self):
1158+        d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
1159+        def _created(n):
1160+            self.failUnless(isinstance(n, MutableFileNode))
1161+            self.failUnlessEqual(n.get_storage_index(), n._storage_index)
1162+            sb = self.nodemaker.storage_broker
1163+            peer0 = sorted(sb.get_all_serverids())[0]
1164+            shnums = self._storage._peers[peer0].keys()
1165+            self.failUnlessEqual(len(shnums), 1)
1166+        d.addCallback(_created)
1167+        return d
1168+
1169+
1170     def test_serialize(self):
1171         n = MutableFileNode(None, None, {"k": 3, "n": 10}, None)
1172         calls = []
1173hunk ./src/allmydata/test/test_mutable.py 311
1174         d.addCallback(_created)
1175         return d
1176 
1177+
1178+    def test_create_mdmf_with_initial_contents(self):
1179+        initial_contents = "foobarbaz" * 131072 # 900KiB
1180+        d = self.nodemaker.create_mutable_file(initial_contents,
1181+                                               version=MDMF_VERSION)
1182+        def _created(n):
1183+            d = n.download_best_version()
1184+            d.addCallback(lambda data:
1185+                self.failUnlessEqual(data, initial_contents))
1186+            d.addCallback(lambda ignored:
1187+                n.overwrite(initial_contents + "foobarbaz"))
1188+            d.addCallback(lambda ignored:
1189+                n.download_best_version())
1190+            d.addCallback(lambda data:
1191+                self.failUnlessEqual(data, initial_contents +
1192+                                           "foobarbaz"))
1193+            return d
1194+        d.addCallback(_created)
1195+        return d
1196+
1197+
1198     def test_create_with_initial_contents_function(self):
1199         data = "initial contents"
1200         def _make_contents(n):
1201hunk ./src/allmydata/test/test_mutable.py 347
1202         d.addCallback(lambda data2: self.failUnlessEqual(data2, data))
1203         return d
1204 
1205+
1206+    def test_create_mdmf_with_initial_contents_function(self):
1207+        data = "initial contents" * 100000
1208+        def _make_contents(n):
1209+            self.failUnless(isinstance(n, MutableFileNode))
1210+            key = n.get_writekey()
1211+            self.failUnless(isinstance(key, str), key)
1212+            self.failUnlessEqual(len(key), 16)
1213+            return data
1214+        d = self.nodemaker.create_mutable_file(_make_contents,
1215+                                               version=MDMF_VERSION)
1216+        d.addCallback(lambda n:
1217+            n.download_best_version())
1218+        d.addCallback(lambda data2:
1219+            self.failUnlessEqual(data2, data))
1220+        return d
1221+
1222+
1223     def test_create_with_too_large_contents(self):
1224         BIG = "a" * (self.OLD_MAX_SEGMENT_SIZE + 1)
1225         d = self.nodemaker.create_mutable_file(BIG)
1226}
1227[Misc. changes to support the work I'm doing
1228Kevan Carstensen <kevan@isnotajoke.com>**20100624234637
1229 Ignore-this: fdd18fa8cc05f4b4b15ff53ee24a1819
1230 
1231     - Add a notion of file version number to interfaces.py
1232     - Alter mutable file node interfaces to have a notion of version,
1233       though this may be changed later.
1234     - Alter mutable/filenode.py to conform to these changes.
1235     - Add a salt hasher to util/hashutil.py
1236] {
1237hunk ./src/allmydata/interfaces.py 7
1238      ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable
1239 
1240 HASH_SIZE=32
1241+SALT_SIZE=16
1242+
1243+SDMF_VERSION=0
1244+MDMF_VERSION=1
1245 
1246 Hash = StringConstraint(maxLength=HASH_SIZE,
1247                         minLength=HASH_SIZE)# binary format 32-byte SHA256 hash
1248hunk ./src/allmydata/interfaces.py 811
1249         writer-visible data using this writekey.
1250         """
1251 
1252+    def set_version(version):
1253+        """Tahoe-LAFS supports SDMF and MDMF mutable files. By default,
1254+        we upload in SDMF for reasons of compatibility. If you want to
1255+        change this, set_version will let you do that.
1256+
1257+        To say that this file should be uploaded in SDMF, pass in a 0. To
1258+        say that the file should be uploaded as MDMF, pass in a 1.
1259+        """
1260+
1261+    def get_version():
1262+        """Returns the mutable file protocol version."""
1263+
1264 class NotEnoughSharesError(Exception):
1265     """Download was unable to get enough shares"""
1266 
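A short usage sketch of the interface methods added above, using the version
constants defined earlier in this patch (the node here is any IMutableFileNode
provider that implements the new methods):

    from allmydata.interfaces import SDMF_VERSION, MDMF_VERSION

    def switch_to_mdmf(node):
        # by default, files are published as SDMF (version 0)
        node.set_version(MDMF_VERSION)
        assert node.get_version() == MDMF_VERSION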
1267hunk ./src/allmydata/mutable/filenode.py 8
1268 from twisted.internet import defer, reactor
1269 from foolscap.api import eventually
1270 from allmydata.interfaces import IMutableFileNode, \
1271-     ICheckable, ICheckResults, NotEnoughSharesError
1272+     ICheckable, ICheckResults, NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION
1273 from allmydata.util import hashutil, log
1274 from allmydata.util.assertutil import precondition
1275 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
1276hunk ./src/allmydata/mutable/filenode.py 67
1277         self._sharemap = {} # known shares, shnum-to-[nodeids]
1278         self._cache = ResponseCache()
1279         self._most_recent_size = None
1280+        # filled in after __init__ if we're being created for the first time;
1281+        # filled in by the servermap updater before publishing, otherwise.
1282+        # set to this default value in case neither of those things happen,
1283+        # or in case the servermap can't find any shares to tell us what
1284+        # to publish as.
1285+        # TODO: Set this back to None, and find out why the tests fail
1286+        #       with it set to None.
1287+        self._protocol_version = SDMF_VERSION
1288 
1289         # all users of this MutableFileNode go through the serializer. This
1290         # takes advantage of the fact that Deferreds discard the callbacks
1291hunk ./src/allmydata/mutable/filenode.py 472
1292     def _did_upload(self, res, size):
1293         self._most_recent_size = size
1294         return res
1295+
1296+
1297+    def set_version(self, version):
1298+        # I can be set in two ways:
1299+        #  1. When the node is created.
1300+        #  2. (for an existing share) when the Servermap is updated
1301+        #     before I am read.
1302+        assert version in (MDMF_VERSION, SDMF_VERSION)
1303+        self._protocol_version = version
1304+
1305+
1306+    def get_version(self):
1307+        return self._protocol_version
1308hunk ./src/allmydata/util/hashutil.py 90
1309 MUTABLE_READKEY_TAG = "allmydata_mutable_writekey_to_readkey_v1"
1310 MUTABLE_DATAKEY_TAG = "allmydata_mutable_readkey_to_datakey_v1"
1311 MUTABLE_STORAGEINDEX_TAG = "allmydata_mutable_readkey_to_storage_index_v1"
1312+MUTABLE_SALT_TAG = "allmydata_mutable_segment_salt_v1"
1313 
1314 # dirnodes
1315 DIRNODE_CHILD_WRITECAP_TAG = "allmydata_mutable_writekey_and_salt_to_dirnode_child_capkey_v1"
1316hunk ./src/allmydata/util/hashutil.py 134
1317 def plaintext_segment_hasher():
1318     return tagged_hasher(PLAINTEXT_SEGMENT_TAG)
1319 
1320+def mutable_salt_hash(data):
1321+    return tagged_hash(MUTABLE_SALT_TAG, data)
1322+def mutable_salt_hasher():
1323+    return tagged_hasher(MUTABLE_SALT_TAG)
1324+
1325 KEYLEN = 16
1326 IVLEN = 16
1327 
1328}
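A small sketch of the new salt hasher, assuming tagged_hash and tagged_hasher
behave here as they do for the other tags in util/hashutil.py (a 32-byte
SHA-256d-based digest, with the incremental hasher matching the one-shot
form):

    from allmydata.util import hashutil

    salt = "\x00" * 16                        # a 16-byte per-segment salt
    one_shot = hashutil.mutable_salt_hash(salt)

    hasher = hashutil.mutable_salt_hasher()
    hasher.update(salt)
    incremental = hasher.digest()

    assert one_shot == incremental
    assert len(one_shot) == 32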
1329[nodemaker.py: create MDMF files when asked to
1330Kevan Carstensen <kevan@isnotajoke.com>**20100624234833
1331 Ignore-this: 26c16aaca9ddab7a7ce37a4530bc970
1332] {
1333hunk ./src/allmydata/nodemaker.py 3
1334 import weakref
1335 from zope.interface import implements
1336-from allmydata.interfaces import INodeMaker
1337+from allmydata.util.assertutil import precondition
1338+from allmydata.interfaces import INodeMaker, MustBeDeepImmutableError, \
1339+                                 SDMF_VERSION, MDMF_VERSION
1340 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
1341 from allmydata.immutable.upload import Data
1342 from allmydata.mutable.filenode import MutableFileNode
1343hunk ./src/allmydata/nodemaker.py 92
1344             return self._create_dirnode(filenode)
1345         return None
1346 
1347-    def create_mutable_file(self, contents=None, keysize=None):
1348+    def create_mutable_file(self, contents=None, keysize=None,
1349+                            version=SDMF_VERSION):
1350         n = MutableFileNode(self.storage_broker, self.secret_holder,
1351                             self.default_encoding_parameters, self.history)
1352hunk ./src/allmydata/nodemaker.py 96
1353+        n.set_version(version)
1354         d = self.key_generator.generate(keysize)
1355         d.addCallback(n.create_with_keys, contents)
1356         d.addCallback(lambda res: n)
1357hunk ./src/allmydata/nodemaker.py 102
1358         return d
1359 
1360-    def create_new_mutable_directory(self, initial_children={}):
1361+    def create_new_mutable_directory(self, initial_children={},
1362+                                     version=SDMF_VERSION):
1363+        # initial_children must have metadata (i.e. {} instead of None)
1364+        for (name, (node, metadata)) in initial_children.iteritems():
1365+            precondition(isinstance(metadata, dict),
1366+                         "create_new_mutable_directory requires metadata to be a dict, not None", metadata)
1367+            node.raise_error()
1368         d = self.create_mutable_file(lambda n:
1369hunk ./src/allmydata/nodemaker.py 110
1370-                                     pack_children(n, initial_children))
1371+                                     pack_children(n, initial_children),
1372+                                     version=version)
1373         d.addCallback(self._create_dirnode)
1374         return d
1375 
1376}
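A brief usage sketch of the new keyword argument, mirroring the tests earlier
in this bundle (nodemaker here is an already-constructed NodeMaker):

    from allmydata.interfaces import MDMF_VERSION

    d = nodemaker.create_mutable_file("some mutable contents",
                                      version=MDMF_VERSION)
    d.addCallback(lambda node: node.download_best_version())
    # the Deferred eventually fires with "some mutable contents"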
1377[storage/server.py: minor code cleanup
1378Kevan Carstensen <kevan@isnotajoke.com>**20100624234905
1379 Ignore-this: 2358c531c39e48d3c8e56b62b5768228
1380] {
1381hunk ./src/allmydata/storage/server.py 569
1382                                          self)
1383         return share
1384 
1385-    def remote_slot_readv(self, storage_index, shares, readv):
1386+    def remote_slot_readv(self, storage_index, shares, readvs):
1387         start = time.time()
1388         self.count("readv")
1389         si_s = si_b2a(storage_index)
1390hunk ./src/allmydata/storage/server.py 590
1391             if sharenum in shares or not shares:
1392                 filename = os.path.join(bucketdir, sharenum_s)
1393                 msf = MutableShareFile(filename, self)
1394-                datavs[sharenum] = msf.readv(readv)
1395+                datavs[sharenum] = msf.readv(readvs)
1396         log.msg("returning shares %s" % (datavs.keys(),),
1397                 facility="tahoe.storage", level=log.NOISY, parent=lp)
1398         self.add_latency("readv", time.time() - start)
1399}
1400[test/test_mutable.py: alter some tests that were failing due to MDMF; minor code cleanup.
1401Kevan Carstensen <kevan@isnotajoke.com>**20100624234924
1402 Ignore-this: afb86ec1fbdbfe1a5ef6f46f350273c0
1403] {
1404hunk ./src/allmydata/test/test_mutable.py 151
1405             chr(ord(original[byte_offset]) ^ 0x01) +
1406             original[byte_offset+1:])
1407 
1408+def add_two(original, byte_offset):
1409+    # It isn't enough to simply flip the bit for the version number,
1410+    # because 1 is a valid version number. So we XOR with 0x02 (add two) instead.
1411+    return (original[:byte_offset] +
1412+            chr(ord(original[byte_offset]) ^ 0x02) +
1413+            original[byte_offset+1:])
1414+
1415 def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
1416     # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
1417     # list of shnums to corrupt.
1418hunk ./src/allmydata/test/test_mutable.py 187
1419                 real_offset = offset1
1420             real_offset = int(real_offset) + offset2 + offset_offset
1421             assert isinstance(real_offset, int), offset
1422-            shares[shnum] = flip_bit(data, real_offset)
1423+            if offset1 == 0: # verbyte
1424+                f = add_two
1425+            else:
1426+                f = flip_bit
1427+            shares[shnum] = f(data, real_offset)
1428     return res
1429 
1430 def make_storagebroker(s=None, num_peers=10):
1431hunk ./src/allmydata/test/test_mutable.py 476
1432         d.addCallback(_created)
1433         return d
1434 
1435+
1436     def test_modify_backoffer(self):
1437         def _modifier(old_contents, servermap, first_time):
1438             return old_contents + "line2"
1439hunk ./src/allmydata/test/test_mutable.py 742
1440         d.addCallback(_created)
1441         return d
1442 
1443+
1444     def _copy_shares(self, ignored, index):
1445         shares = self._storage._peers
1446         # we need a deep copy
1447}
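A worked illustration of why corrupting the version byte needs add_two rather
than flip_bit: flipping the low bit just turns one valid version number into
the other (0 <-> 1), so the corruption would go undetected, while XORing with
0x02 always yields an invalid version number:

    for verbyte in (0, 1):                # SDMF_VERSION, MDMF_VERSION
        flipped = verbyte ^ 0x01          # what flip_bit would produce
        bumped = verbyte ^ 0x02           # what add_two produces
        assert flipped in (0, 1)          # still looks like a valid version
        assert bumped not in (0, 1)       # guaranteed to be rejected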
1448[Add MDMF reader and writer
1449Kevan Carstensen <kevan@isnotajoke.com>**20100626003324
1450 Ignore-this: 35612135b9f9f5da102c4ec89c059bf0
1451 
1452 The MDMF/SDMF reader and MDMF writer are similar to the object proxies
1453 that exist for immutable files. They abstract away details of
1454 connection, state, and caching from their callers (in this case, the
1455 download, servermap updater, and uploader), and expose methods to get
1456 and set information on the remote server.
1457 
1458 MDMFSlotReadProxy reads a mutable file from the server, doing the right
1459 thing (in most cases) regardless of whether the file is MDMF or SDMF. It
1460 allows callers to tell it how to batch and flush reads.
1461 
1462 MDMFSlotWriteProxy writes an MDMF mutable file to a server.
1463 
1464 This patch also includes tests for MDMFSlotReadProxy and
1465 MDMFSlotWriteProxy.
1466] {
1467hunk ./src/allmydata/mutable/layout.py 4
1468 
1469 import struct
1470 from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError
1471+from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \
1472+                                 MDMF_VERSION
1473+from allmydata.util import mathutil, observer
1474+from twisted.python import failure
1475+from twisted.internet import defer
1476+
1477+
1478+# These strings describe the format of the packed structs they help process
1479+# Here's what they mean:
1480+#
1481+#  PREFIX:
1482+#    >: Big-endian byte order; the most significant byte is first (leftmost).
1483+#    B: The version information; an 8 bit version identifier. Stored as
1484+#       an unsigned char. This is currently 00 00 00 00; our modifications
1485+#       will turn it into 00 00 00 01.
1486+#    Q: The sequence number; this is sort of like a revision history for
1487+#       mutable files; they start at 1 and increase as they are changed after
1488+#       being uploaded. Stored as an unsigned long long, which is 8 bytes in
1489+#       length.
1490+#  32s: The root hash of the share hash tree. We use sha-256d, so we use 32
1491+#       characters = 32 bytes to store the value.
1492+#  16s: The salt for the readkey. This is a 16-byte random value, stored as
1493+#       16 characters.
1494+#
1495+#  SIGNED_PREFIX additions, things that are covered by the signature:
1496+#    B: The "k" encoding parameter. We store this as an 8-bit character,
1497+#       which is convenient because our erasure coding scheme cannot
1498+#       encode if you ask for more than 255 pieces.
1499+#    B: The "N" encoding parameter. Stored as an 8-bit character for the
1500+#       same reasons as above.
1501+#    Q: The segment size of the uploaded file. This will essentially be the
1502+#       length of the file in SDMF. An unsigned long long, so we can store
1503+#       files of quite large size.
1504+#    Q: The data length of the uploaded file. Modulo padding, this will be
1505+#       the same as the segment size field. Like the segment size field, it is
1506+#       an unsigned long long and can be quite large.
1507+#
1508+#   HEADER additions:
1509+#     L: The offset of the signature of this. An unsigned long.
1510+#     L: The offset of the share hash chain. An unsigned long.
1511+#     L: The offset of the block hash tree. An unsigned long.
1512+#     L: The offset of the share data. An unsigned long.
1513+#     Q: The offset of the encrypted private key. An unsigned long long, to
1514+#        account for the possibility of a lot of share data.
1515+#     Q: The offset of the EOF. An unsigned long long, to account for the
1516+#        possibility of a lot of share data.
1517+#
1518+#  After all of these, we have the following:
1519+#    - The verification key: Occupies the space between the end of the header
1520+#      and the start of the signature (i.e. data[HEADER_LENGTH:o['signature']]).
1521+#    - The signature, which goes from the signature offset to the share hash
1522+#      chain offset.
1523+#    - The share hash chain, which goes from the share hash chain offset to
1524+#      the block hash tree offset.
1525+#    - The share data, which goes from the share data offset to the encrypted
1526+#      private key offset.
1527+#    - The encrypted private key, which goes from its offset until the end of the file.
1528+#
1529+#  The block hash tree in this encoding has only one leaf (a single segment),
1530+#  so the share data offset will be 32 bytes past the block hash tree offset.
1531+#  Given this, we may need to check to see how many bytes a reasonably sized
1532+#  block hash tree will take up.
1533 
1534 PREFIX = ">BQ32s16s" # each version has a different prefix
1535 SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature
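A quick consistency check for the sizes these format strings imply (the
HEADER string shown here is an assumption, reconstructed from the SDMF offset
table ">LLLLQQ" that the reader below parses starting at byte 75):

    import struct

    PREFIX = ">BQ32s16s"               # 1 + 8 + 32 + 16      = 57 bytes
    SIGNED_PREFIX = ">BQ32s16s BBQQ"   # 57 + 1 + 1 + 8 + 8   = 75 bytes
    HEADER = ">BQ32s16s BBQQ LLLLQQ"   # 75 + 4*4 + 2*8       = 107 bytes

    assert struct.calcsize(PREFIX) == 57
    assert struct.calcsize(SIGNED_PREFIX) == 75
    assert struct.calcsize(HEADER) == 107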
1536hunk ./src/allmydata/mutable/layout.py 191
1537     return (share_hash_chain, block_hash_tree, share_data)
1538 
1539 
1540-def pack_checkstring(seqnum, root_hash, IV):
1541+def pack_checkstring(seqnum, root_hash, IV, version=0):
1542     return struct.pack(PREFIX,
1543hunk ./src/allmydata/mutable/layout.py 193
1544-                       0, # version,
1545+                       version,
1546                        seqnum,
1547                        root_hash,
1548                        IV)
1549hunk ./src/allmydata/mutable/layout.py 266
1550                            encprivkey])
1551     return final_share
1552 
1553+def pack_prefix(seqnum, root_hash, IV,
1554+                required_shares, total_shares,
1555+                segment_size, data_length):
1556+    prefix = struct.pack(SIGNED_PREFIX,
1557+                         0, # version,
1558+                         seqnum,
1559+                         root_hash,
1560+                         IV,
1561+                         required_shares,
1562+                         total_shares,
1563+                         segment_size,
1564+                         data_length,
1565+                         )
1566+    return prefix
1567+
1568+
1569+MDMFHEADER = ">BQ32s32sBBQQ LQQQQQQQ"
1570+MDMFHEADERWITHOUTOFFSETS = ">BQ32s32sBBQQ"
1571+MDMFHEADERSIZE = struct.calcsize(MDMFHEADER)
1572+MDMFCHECKSTRING = ">BQ32s32s"
1573+MDMFSIGNABLEHEADER = ">BQ32s32sBBQQ"
1574+MDMFOFFSETS = ">LQQQQQQQ"
1575+
1576+class MDMFSlotWriteProxy:
1577+    #implements(IMutableSlotWriter) TODO
1578+
1579+    """
1580+    I represent a remote write slot for an MDMF mutable file.
1581+
1582+    I abstract away from my caller the details of block and salt
1583+    management, and the implementation of the on-disk format for MDMF
1584+    shares.
1585+    """
1586+
1587+    # Expected layout, MDMF:
1588+    # offset:     size:       name:
1589+    #-- signed part --
1590+    # 0           1           version number (01)
1591+    # 1           8           sequence number
1592+    # 9           32          share tree root hash
1593+    # 41          32          salt tree root hash
1594+    # 73          1           The "k" encoding parameter
1595+    # 74          1           The "N" encoding parameter
1596+    # 75          8           The segment size of the uploaded file
1597+    # 83          8           The data length of the uploaded file
1598+    #-- end signed part --
1599+    # 91          4           The offset of the share data
1600+    # 95          8           The offset of the encrypted private key
1601+    # 103         8           The offset of the block hash tree
1602+    # 111         8           The offset of the salt hash tree
1603+    # 119         8           The offset of the share hash chain
1604+    # 127         8           The offset of the signature
1605+    # 135         8           The offset of the verification key
1606+    # 143         8           offset of the EOF
1607+    #
1608+    # followed by salts, share data, the encrypted private key, the
1609+    # block hash tree, the salt hash tree, the share hash chain, a
1610+    # signature over the first eight fields, and a verification key.
1611+    #
1612+    # The checkstring is the first four fields -- the version number,
1613+    # sequence number, root hash and root salt hash. This is consistent
1614+    # in meaning to what we have with SDMF files, except now instead of
1615+    # using the literal salt, we use a value derived from all of the
1616+    # salts.
1617+    #
1618+    # The ordering of the offsets is different to reflect the dependencies
1619+    # that we'll run into with an MDMF file. The expected write flow is
1620+    # something like this:
1621+    #
1622+    #   0: Initialize with the sequence number, encoding
1623+    #      parameters and data length. From this, we can deduce the
1624+    #      number of segments, and from that we can deduce the size of
1625+    #      the AES salt field, telling us where to write AES salts, and
1626+    #      where to write share data. We can also figure out where the
1627+    #      encrypted private key should go, because we can figure out
1628+    #      how big the share data will be.
1629+    #
1630+    #   1: Encrypt, encode, and upload the file in chunks. Do something
1631+    #      like
1632+    #
1633+    #       put_block(data, segnum, salt)
1634+    #
1635+    #      to write a block and a salt to the disk. We can do both of
1636+    #      these operations now because we have enough of the offsets to
1637+    #      know where to put them.
1638+    #
1639+    #   2: Put the encrypted private key. Use:
1640+    #
1641+    #        put_encprivkey(encprivkey)
1642+    #
1643+    #      Now that we know the length of the private key, we can fill
1644+    #      in the offset for the block hash tree.
1645+    #
1646+    #   3: We're now in a position to upload the block hash tree for
1647+    #      a share. Put that using something like:
1648+    #       
1649+    #        put_blockhashes(block_hash_tree)
1650+    #
1651+    #      Note that block_hash_tree is a list of hashes -- we'll take
1652+    #      care of the details of serializing that appropriately. When
1653+    #      we get the block hash tree, we are also in a position to
1654+    #      calculate the offset for the share hash chain, and fill that
1655+    #      into the offsets table.
1656+    #
1657+    #   4: At the same time, we're in a position to upload the salt hash
1658+    #      tree. This is a Merkle tree over all of the salts. We use a
1659+    #      Merkle tree so that we can validate each block,salt pair as
1660+    #      we download them later. We do this using
1661+    #
1662+    #        put_salthashes(salt_hash_tree)
1663+    #
1664+    #      When you do this, I automatically put the root of the tree
1665+    #      (the hash at index 0 of the list) in its appropriate slot in
1666+    #      the signed prefix of the share.
1667+    #
1668+    #   5: We're now in a position to upload the share hash chain for
1669+    #      a share. Do that with something like:
1670+    #     
1671+    #        put_sharehashes(share_hash_chain)
1672+    #
1673+    #      share_hash_chain should be a dictionary mapping shnums to
1674+    #      32-byte hashes -- the wrapper handles serialization.
1675+    #      We'll know where to put the signature at this point, also.
1676+    #      The root of this tree will be put explicitly in the next
1677+    #      step.
1678+    #
1679+    #      TODO: Why? Why not just include it in the tree here?
1680+    #
1681+    #   6: Before putting the signature, we must first put the
1682+    #      root_hash. Do this with:
1683+    #
1684+    #        put_root_hash(root_hash).
1685+    #     
1686+    #      In terms of knowing where to put this value, it was always
1687+    #      possible to place it, but it makes sense semantically to
1688+    #      place it after the share hash tree, so that's why you do it
1689+    #      in this order.
1690+    #
1691+    #   7: With the root hash put, we can now sign the header. Use:
1692+    #
1693+    #        get_signable()
1694+    #
1695+    #      to get the part of the header that you want to sign, and use:
1696+    #       
1697+    #        put_signature(signature)
1698+    #
1699+    #      to write your signature to the remote server.
1700+    #
1701+    #   8: Add the verification key, and finish. Do:
1702+    #
1703+    #        put_verification_key(key)
1704+    #
1705+    #      and
1706+    #
1707+    #        finish_publish()
1708+    #
1709+    # Checkstring management:
1710+    #
1711+    # To write to a mutable slot, we have to provide test vectors to ensure
1712+    # that we are writing to the same data that we think we are. These
1713+    # vectors allow us to detect uncoordinated writes; that is, writes
1714+    # where both we and some other shareholder are writing to the
1715+    # mutable slot, and to report those back to the parts of the program
1716+    # doing the writing.
1717+    #
1718+    # With SDMF, this was easy -- all of the share data was written in
1719+    # one go, so it was easy to detect uncoordinated writes, and we only
1720+    # had to do it once. With MDMF, not all of the file is written at
1721+    # once.
1722+    #
1723+    # If a share is new, we write out as much of the header as we can
1724+    # before writing out anything else. This gives other writers a
1725+    # canary that they can use to detect uncoordinated writes, and, if
1726+    # they do the same thing, gives us the same canary. We then update
1727+    # the share. We won't be able to write out two fields of the header
1728+    # -- the share tree hash and the salt hash -- until we finish
1729+    # writing out the share. We only require the writer to provide the
1730+    # initial checkstring, and keep track of what it should be after
1731+    # updates ourselves.
1732+    #
1733+    # If we haven't written anything yet, then on the first write (which
1734+    # will probably be a block + salt of a share), we'll also write out
1735+    # the header. On subsequent passes, we'll expect to see the header.
1736+    # This changes in two places:
1737+    #
1738+    #   - When we write out the salt hash
1739+    #   - When we write out the root of the share hash tree
1740+    #
1741+    # since these values will change the header. It is possible that we
1742+    # can just make those be written in one operation to minimize
1743+    # disruption.
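Condensed, the flow above is just an ordered sequence of proxy calls. A
hedged sketch (the argument names are placeholders, and the sign callable
stands in for however the publisher signs the header):

    from twisted.internet import defer

    def publish_one_share(writer, blocks_and_salts, encprivkey, blockhashes,
                          salthashes, sharehashes, root_hash, sign,
                          verification_key):
        # each put_* call returns a Deferred for the underlying remote write,
        # so the steps are chained in the order the offset table requires
        d = defer.succeed(None)
        for segnum, (block, salt) in enumerate(blocks_and_salts):
            d.addCallback(lambda ign, b=block, n=segnum, s=salt:
                          writer.put_block(b, n, s))                      # 0/1
        d.addCallback(lambda ign: writer.put_encprivkey(encprivkey))      # 2
        d.addCallback(lambda ign: writer.put_blockhashes(blockhashes))    # 3
        d.addCallback(lambda ign: writer.put_salthashes(salthashes))      # 4
        d.addCallback(lambda ign: writer.put_sharehashes(sharehashes))    # 5
        d.addCallback(lambda ign: writer.put_root_hash(root_hash))        # 6
        d.addCallback(lambda ign:
                      writer.put_signature(sign(writer.get_signable())))  # 7
        d.addCallback(lambda ign:
                      writer.put_verification_key(verification_key))      # 8
        d.addCallback(lambda ign: writer.finish_publishing())
        return d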
1744+    def __init__(self,
1745+                 shnum,
1746+                 rref, # a remote reference to a storage server
1747+                 storage_index,
1748+                 secrets, # (write_enabler, renew_secret, cancel_secret)
1749+                 seqnum, # the sequence number of the mutable file
1750+                 required_shares,
1751+                 total_shares,
1752+                 segment_size,
1753+                 data_length): # the length of the original file
1754+        self._shnum = shnum
1755+        self._rref = rref
1756+        self._storage_index = storage_index
1757+        self._seqnum = seqnum
1758+        self._required_shares = required_shares
1759+        assert self._shnum >= 0 and self._shnum < total_shares
1760+        self._total_shares = total_shares
1761+        # We build up the offset table as we write things. It is the
1762+        # last thing we write to the remote server.
1763+        self._offsets = {}
1764+        self._testvs = []
1765+        self._secrets = secrets
1766+        # The segment size needs to be a multiple of the k parameter --
1767+        # any padding should have been carried out by the publisher
1768+        # already.
1769+        assert segment_size % required_shares == 0
1770+        self._segment_size = segment_size
1771+        self._data_length = data_length
1772+
1773+        # These are set later -- we define them here so that we can
1774+        # check for their existence easily
1775+
1776+        # This is the root of the share hash tree -- the Merkle tree
1777+        # over the roots of the block hash trees computed for shares in
1778+        # this upload.
1779+        self._root_hash = None
1780+        # This is the root of the salt hash tree -- the Merkle tree over
1781+        # the hashes of the salts used for each segment of the file.
1782+        self._salt_hash = None
1783+
1784+        # We haven't yet written anything to the remote bucket. By
1785+        # setting this, we tell the _write method as much. The write
1786+        # method will then know that it also needs to add a write vector
1787+        # for the checkstring (or what we have of it) to the first write
1788+        # request. We'll then record that value for future use.  If
1789+        # we're expecting something to be there already, we need to call
1790+        # set_checkstring before we write anything to tell the first
1791+        # write about that.
1792+        self._written = False
1793+
1794+        # When writing data to the storage servers, we get a read vector
1795+        # for free. We'll read the checkstring, which will help us
1796+        # figure out what's gone wrong if a write fails.
1797+        self._readv = [(0, struct.calcsize(MDMFCHECKSTRING))]
1798+
1799+        # We calculate the number of segments because it tells us
1800+        # where the salt part of the file ends/share segment begins,
1801+        # and also because it provides a useful amount of bounds checking.
1802+        self._num_segments = mathutil.div_ceil(self._data_length,
1803+                                               self._segment_size)
1804+        self._block_size = self._segment_size / self._required_shares
1805+        # We also calculate the share size, to help us with block
1806+        # constraints later.
1807+        tail_size = self._data_length % self._segment_size
1808+        if not tail_size:
1809+            self._tail_block_size = self._block_size
1810+        else:
1811+            self._tail_block_size = mathutil.next_multiple(tail_size,
1812+                                                           self._required_shares)
1813+            self._tail_block_size /= self._required_shares
1814+
1815+        # We already know where the AES salts start; right after the end
1816+        # of the header (which is defined as the signable part + the offsets)
1817+        # We need to calculate where the share data starts, since we're
1818+        # responsible (after this method) for being able to write it.
1819+        self._offsets['share-data'] = MDMFHEADERSIZE
1820+        self._offsets['share-data'] += self._num_segments * SALT_SIZE
1821+        # We can also calculate where the encrypted private key begins
1822+        # from what we now know.
1823+        self._offsets['enc_privkey'] = self._offsets['share-data']
1824+        self._offsets['enc_privkey'] += self._block_size * (self._num_segments - 1)
1825+        self._offsets['enc_privkey'] += self._tail_block_size
1826+        # We'll wait for the rest. Callers can now call my "put_block" and
1827+        # "set_checkstring" methods.
1828+
1829+
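A worked example of the offset arithmetic above, with small illustrative
parameters (k=2, segment_size=128, data_length=300; the header size of 151
bytes follows from struct.calcsize(MDMFHEADER), and SALT_SIZE is 16):

    MDMFHEADERSIZE, SALT_SIZE = 151, 16
    k, segment_size, data_length = 2, 128, 300

    num_segments = (data_length + segment_size - 1) // segment_size     # 3
    block_size = segment_size // k                                       # 64
    tail_size = data_length % segment_size                               # 44
    tail_block_size = (tail_size + k - 1) // k                           # 22

    share_data_offset = MDMFHEADERSIZE + num_segments * SALT_SIZE        # 199
    enc_privkey_offset = (share_data_offset +
                          block_size * (num_segments - 1) +
                          tail_block_size)                               # 349
    assert (share_data_offset, enc_privkey_offset) == (199, 349)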
1830+    def set_checkstring(self, checkstring):
1831+        """
1832+        Set the checkstring for the given shnum.
1833+
1834+        By default, I assume that I am writing new shares to the grid.
1835+        If you don't explicitly set your own checkstring, I will use
1836+        one that requires that the remote share not exist. You will want
1837+        to use this method if you are updating a share in-place;
1838+        otherwise, writes will fail.
1839+        """
1840+        # You're allowed to overwrite checkstrings with this method;
1841+        # I assume that users know what they are doing when they call
1842+        # it.
1843+        if checkstring == "":
1844+            # We special-case this, since len("") = 0, but we need
1845+            # length of 1 for the case of an empty share to work on the
1846+            # storage server, which is what a checkstring that is the
1847+            # empty string means.
1848+            self._testvs = []
1849+        else:
1850+            self._testvs = []
1851+            self._testvs.append((0, len(checkstring), "eq", checkstring))
1852+
1853+
1854+    def __repr__(self):
1855+        return "MDMFSlotWriteProxy for share %d" % self._shnum
1856+
1857+
1858+    def get_checkstring(self):
1859+        """
1860+        Given a share number, I return a representation of what the
1861+        checkstring for that share on the server will look like.
1862+        """
1863+        if self._root_hash:
1864+            roothash = self._root_hash
1865+        else:
1866+            roothash = "\x00" * 32
1867+        # Having both self._salt_hash and self._root_hash set means that
1868+        # we've written both of these values to the server. self._salt_hash
1869+        # gets set first, though, and if self._root_hash isn't also set then
1870+        # neither value has actually been written to the server yet, so the
1871+        # checkstring keeps reporting a zero salt hash.
1872+        if self._salt_hash and self._root_hash:
1873+            salthash = self._salt_hash
1874+        else:
1875+            salthash = "\x00" * 32
1876+        checkstring = struct.pack(MDMFCHECKSTRING,
1877+                                  1,
1878+                                  self._seqnum,
1879+                                  roothash,
1880+                                  salthash)
1881+        return checkstring
1882+
1883+
1884+    def put_block(self, data, segnum, salt):
1885+        """
1886+        Put the encrypted-and-encoded data segment in the slot, along
1887+        with the salt.
1888+        """
1889+        if segnum >= self._num_segments:
1890+            raise LayoutInvalid("I won't overwrite the private key")
1891+        if len(salt) != SALT_SIZE:
1892+            raise LayoutInvalid("I was given a salt of size %d, but "
1893+                                "I wanted a salt of size %d" % (len(salt), SALT_SIZE))
1894+        if segnum + 1 == self._num_segments:
1895+            if len(data) != self._tail_block_size:
1896+                raise LayoutInvalid("I was given the wrong size block to write")
1897+        elif len(data) != self._block_size:
1898+            raise LayoutInvalid("I was given the wrong size block to write")
1899+
1900+        # We want to write at offsets['share-data'] + segnum * block_size.
1901+        assert self._offsets
1902+        assert self._offsets['share-data']
1903+
1904+        offset = self._offsets['share-data'] + segnum * self._block_size
1905+        datavs = [tuple([offset, data])]
1906+        # We also have to write the salt. This is at:
1907+        salt_offset = MDMFHEADERSIZE + SALT_SIZE * segnum
1908+        datavs.append(tuple([salt_offset, salt]))
1909+        return self._write(datavs)
1910+
1911+
1912+    def put_encprivkey(self, encprivkey):
1913+        """
1914+        Put the encrypted private key in the remote slot.
1915+        """
1916+        assert self._offsets
1917+        assert self._offsets['enc_privkey']
1918+        # You shouldn't re-write the encprivkey after the block hash
1919+        # tree is written, since that could cause the private key to run
1920+        # into the block hash tree. Before it writes the block hash
1921+        # tree, the block hash tree writing method writes the offset of
1922+        # the salt hash tree. So that's a good indicator of whether or
1923+        # not the block hash tree has been written.
1924+        if "salt_hash_tree" in self._offsets:
1925+            raise LayoutInvalid("You must write this before the block hash tree")
1926+
1927+        self._offsets['block_hash_tree'] = self._offsets['enc_privkey'] + len(encprivkey)
1928+        datavs = [(tuple([self._offsets['enc_privkey'], encprivkey]))]
1929+        def _on_failure():
1930+            del(self._offsets['block_hash_tree'])
1931+        return self._write(datavs, on_failure=_on_failure)
1932+
1933+
1934+    def put_blockhashes(self, blockhashes):
1935+        """
1936+        Put the block hash tree in the remote slot.
1937+
1938+        The encrypted private key must be put before the block hash
1939+        tree, since we need to know how large it is to know where the
1940+        block hash tree should go. The block hash tree must be put
1941+        before the salt hash tree, since its size determines the
1942+        offset of the share hash chain.
1943+        """
1944+        assert self._offsets
1945+        assert isinstance(blockhashes, list)
1946+        if "block_hash_tree" not in self._offsets:
1947+            raise LayoutInvalid("You must put the encrypted private key "
1948+                                "before you put the block hash tree")
1949+        # If the salt hash tree has already been written, it will have
1950+        # defined the share hash chain offset.
1951+        if "share_hash_chain" in self._offsets:
1952+            raise LayoutInvalid("You must put the block hash tree before "
1953+                                "you put the salt hash tree")
1954+        blockhashes_s = "".join(blockhashes)
1955+        self._offsets['salt_hash_tree'] = self._offsets['block_hash_tree'] + len(blockhashes_s)
1956+        datavs = []
1957+        datavs.append(tuple([self._offsets['block_hash_tree'], blockhashes_s]))
1958+        def _on_failure():
1959+            del(self._offsets['salt_hash_tree'])
1960+        return self._write(datavs, on_failure=_on_failure)
1961+
1962+
1963+    def put_salthashes(self, salthashes):
1964+        """
1965+        Put the salt hash tree in the remote slot.
1966+
1967+        The block hash tree must be put before the salt hash tree, since
1968+        its size tells us where we need to put the salt hash tree. This
1969+        method must be called before the share hash chain can be
1970+        uploaded, since the size of the salt hash tree tells us where
1971+        the share hash chain can go.
1972+        """
1973+        assert self._offsets
1974+        assert isinstance(salthashes, list)
1975+        if "salt_hash_tree" not in self._offsets:
1976+            raise LayoutInvalid("You must put the block hash tree "
1977+                                "before putting the salt hash tree")
1978+        if "signature" in self._offsets:
1979+            raise LayoutInvalid("You must put the salt hash tree "
1980+                                "before you put the share hash chain")
1981+        # The root of the salt hash tree is at index 0. We'll write this when
1982+        # we put the root hash later; we just keep track of it for now.
1983+        self._salt_hash = salthashes[0]
1984+        salthashes_s = "".join(salthashes[1:])
1985+        self._offsets['share_hash_chain'] = self._offsets['salt_hash_tree'] + len(salthashes_s)
1986+        datavs = []
1987+        datavs.append(tuple([self._offsets['salt_hash_tree'], salthashes_s]))
1988+        def _on_failure():
1989+            del(self._offsets['share_hash_chain'])
1990+        return self._write(datavs, on_failure=_on_failure)
1991+
1992+
1993+    def put_sharehashes(self, sharehashes):
1994+        """
1995+        Put the share hash chain in the remote slot.
1996+
1997+        The salt hash tree must be put before the share hash chain,
1998+        since we need to know where the salt hash tree ends before we
1999+        can know where the share hash chain starts. The share hash chain
2000+        must be put before the signature, since the length of the packed
2001+        share hash chain determines the offset of the signature. Also,
2002+        semantically, you must know what the root of the salt hash tree
2003+        is before you can generate a valid signature.
2004+        """
2005+        assert isinstance(sharehashes, dict)
2006+        if "share_hash_chain" not in self._offsets:
2007+            raise LayoutInvalid("You need to put the salt hash tree before "
2008+                                "you can put the share hash chain")
2009+        # The signature comes after the share hash chain. If the
2010+        # signature has already been written, we must not write another
2011+        # share hash chain. The signature writes the verification key
2012+        # offset when it gets sent to the remote server, so we look for
2013+        # that.
2014+        if "verification_key" in self._offsets:
2015+            raise LayoutInvalid("You must write the share hash chain "
2016+                                "before you write the signature")
2017+        datavs = []
2018+        sharehashes_s = "".join([struct.pack(">H32s", i, sharehashes[i])
2019+                                  for i in sorted(sharehashes.keys())])
2020+        self._offsets['signature'] = self._offsets['share_hash_chain'] + len(sharehashes_s)
2021+        datavs.append(tuple([self._offsets['share_hash_chain'], sharehashes_s]))
2022+        def _on_failure():
2023+            del(self._offsets['signature'])
2024+        return self._write(datavs, on_failure=_on_failure)
2025+
2026+
2027+    def put_root_hash(self, roothash):
2028+        """
2029+        Put the root hash (the root of the share hash tree) in the
2030+        remote slot.
2031+        """
2032+        # It does not make sense to be able to put the root
2033+        # hash without first putting the share hashes, since you need
2034+        # the share hashes to generate the root hash.
2035+        #
2036+        # Signature is defined by the routine that places the share hash
2037+        # chain, so it's a good thing to look for in finding out whether
2038+        # or not the share hash chain exists on the remote server.
2039+        if "signature" not in self._offsets:
2040+            raise LayoutInvalid("You need to put the share hash chain "
2041+                                "before you can put the root share hash")
2042+        if len(roothash) != HASH_SIZE:
2043+            raise LayoutInvalid("hashes and salts must be exactly %d bytes"
2044+                                 % HASH_SIZE)
2045+        datavs = []
2046+        self._root_hash = roothash
2047+        # To write both of these values, we update the checkstring on
2048+        # the remote server, which includes them
2049+        checkstring = self.get_checkstring()
2050+        datavs.append(tuple([0, checkstring]))
2051+        # This write, if successful, changes the checkstring, so we need
2052+        # to update our internal checkstring to be consistent with the
2053+        # one on the server.
2054+        def _on_success():
2055+            self._testvs = [(0, len(checkstring), "eq", checkstring)]
2056+        def _on_failure():
2057+            self._root_hash = None
2058+            self._salt_hash = None
2059+        return self._write(datavs,
2060+                           on_success=_on_success,
2061+                           on_failure=_on_failure)
2062+
2063+
2064+    def get_signable(self):
2065+        """
2066+        Get the first eight fields of the mutable file; the parts that
2067+        are signed.
2068+        """
2069+        if not self._root_hash or not self._salt_hash:
2070+            raise LayoutInvalid("You need to set the root hash and the "
2071+                                "salt hash before getting something to "
2072+                                "sign")
2073+        return struct.pack(MDMFSIGNABLEHEADER,
2074+                           1,
2075+                           self._seqnum,
2076+                           self._root_hash,
2077+                           self._salt_hash,
2078+                           self._required_shares,
2079+                           self._total_shares,
2080+                           self._segment_size,
2081+                           self._data_length)
2082+
2083+
2084+    def put_signature(self, signature):
2085+        """
2086+        Put the signature field to the remote slot.
2087+
2088+        I require that the root hash and share hash chain have been put
2089+        to the grid before I will write the signature to the grid.
2090+        """
2091+        if "signature" not in self._offsets:
2092+            raise LayoutInvalid("You must put the share hash chain "
2093+        # It does not make sense to put a signature without first
2094+        # putting the root hash and the salt hash (since otherwise
2095+        # the signature would be incomplete), so we don't allow that.
2096+                       "before putting the signature")
2097+        if not self._root_hash:
2098+            raise LayoutInvalid("You must complete the signed prefix "
2099+                                "before computing a signature")
2100+        # If we put the signature after we put the verification key, we
2101+        # could end up running into the verification key, and will
2102+        # probably screw up the offsets as well. So we don't allow that.
2103+        # The method that writes the verification key defines the EOF
2104+        # offset before writing the verification key, so look for that.
2105+        if "EOF" in self._offsets:
2106+            raise LayoutInvalid("You must write the signature before the verification key")
2107+
2108+        self._offsets['verification_key'] = self._offsets['signature'] + len(signature)
2109+        datavs = []
2110+        datavs.append(tuple([self._offsets['signature'], signature]))
2111+        def _on_failure():
2112+            del(self._offsets['verification_key'])
2113+        return self._write(datavs, on_failure=_on_failure)
2114+
2115+
2116+    def put_verification_key(self, verification_key):
2117+        """
2118+        Put the verification key into the remote slot.
2119+
2120+        I require that the signature have been written to the storage
2121+        server before I allow the verification key to be written to the
2122+        remote server.
2123+        """
2124+        if "verification_key" not in self._offsets:
2125+            raise LayoutInvalid("You must put the signature before you "
2126+                                "can put the verification key")
2127+        self._offsets['EOF'] = self._offsets['verification_key'] + len(verification_key)
2128+        datavs = []
2129+        datavs.append(tuple([self._offsets['verification_key'], verification_key]))
2130+        def _on_failure():
2131+            del(self._offsets['EOF'])
2132+        return self._write(datavs, on_failure=_on_failure)
2133+
2134+
2135+    def finish_publishing(self):
2136+        """
2137+        Write the offset table and encoding parameters to the remote
2138+        slot, since that's the only thing we have yet to publish at this
2139+        point.
2140+        """
2141+        if "EOF" not in self._offsets:
2142+            raise LayoutInvalid("You must put the verification key before "
2143+                                "you can publish the offsets")
2144+        offsets_offset = struct.calcsize(MDMFHEADERWITHOUTOFFSETS)
2145+        offsets = struct.pack(MDMFOFFSETS,
2146+                              self._offsets['share-data'],
2147+                              self._offsets['enc_privkey'],
2148+                              self._offsets['block_hash_tree'],
2149+                              self._offsets['salt_hash_tree'],
2150+                              self._offsets['share_hash_chain'],
2151+                              self._offsets['signature'],
2152+                              self._offsets['verification_key'],
2153+                              self._offsets['EOF'])
2154+        datavs = []
2155+        datavs.append(tuple([offsets_offset, offsets]))
2156+        encoding_parameters_offset = struct.calcsize(MDMFCHECKSTRING)
2157+        params = struct.pack(">BBQQ",
2158+                             self._required_shares,
2159+                             self._total_shares,
2160+                             self._segment_size,
2161+                             self._data_length)
2162+        datavs.append(tuple([encoding_parameters_offset, params]))
2163+        return self._write(datavs)
2164+
2165+
2166+    def _write(self, datavs, on_failure=None, on_success=None):
2167+        """I write the data vectors in datavs to the remote slot."""
2168+        tw_vectors = {}
2169+        new_share = False
2170+        if not self._testvs:
2171+            self._testvs = []
2172+            self._testvs.append(tuple([0, 1, "eq", ""]))
2173+            new_share = True
2174+        if not self._written:
2175+            # Write a new checkstring to the share when we write it, so
2176+            # that we have something to check later.
2177+            new_checkstring = self.get_checkstring()
2178+            datavs.append((0, new_checkstring))
2179+            def _first_write():
2180+                self._written = True
2181+                self._testvs = [(0, len(new_checkstring), "eq", new_checkstring)]
2182+            on_success = _first_write
2183+        tw_vectors[self._shnum] = (self._testvs, datavs, None)
2184+        datalength = sum([len(x[1]) for x in datavs])
2185+        d = self._rref.callRemote("slot_testv_and_readv_and_writev",
2186+                                  self._storage_index,
2187+                                  self._secrets,
2188+                                  tw_vectors,
2189+                                  self._readv)
2190+        def _result(results):
2191+            if isinstance(results, failure.Failure) or not results[0]:
2192+                # Do nothing; the write was unsuccessful.
2193+                if on_failure: on_failure()
2194+            else:
2195+                if on_success: on_success()
2196+            return results
2197+        d.addCallback(_result)
2198+        return d
2199+
2200+
2201+class MDMFSlotReadProxy:
2202+    """
2203+    I read from a mutable slot filled with data written in the MDMF data
2204+    format (which is described above).
2205+
2206+    I can be initialized with some amount of data, which I will use (if
2207+    it is valid) to eliminate some of the need to fetch it from servers.
2208+    """
2209+    def __init__(self,
2210+                 rref,
2211+                 storage_index,
2212+                 shnum,
2213+                 data=""):
2214+        # Start the initialization process.
2215+        self._rref = rref
2216+        self._storage_index = storage_index
2217+        self.shnum = shnum
2218+
2219+        # Before doing anything, the reader is probably going to want to
2220+        # verify that the signature is correct. To do that, they'll need
2221+        # the verification key, and the signature. To get those, we'll
2222+        # need the offset table. So fetch the offset table on the
2223+        # assumption that that will be the first thing that a reader is
2224+        # going to do.
2225+
2226+        # The fact that these encoding parameters are None tells us
2227+        # that we haven't yet fetched them from the remote share, so we
2228+        # should. We could just not set them, but the checks will be
2229+        # easier to read if we don't have to use hasattr.
2230+        self._version_number = None
2231+        self._sequence_number = None
2232+        self._root_hash = None
2233+        self._salt_hash = None
2234+        self._salt = None
2235+        self._required_shares = None
2236+        self._total_shares = None
2237+        self._segment_size = None
2238+        self._data_length = None
2239+        self._offsets = None
2240+
2241+        # If the user has chosen to initialize us with some data, we'll
2242+        # try to satisfy subsequent data requests with that data before
2243+        # asking the storage server for it.
2244+        self._data = data
2245+        # The way callers interact with cache in the filenode returns
2246+        # None if there isn't any cached data, but the way we index the
2247+        # cached data requires a string, so convert None to "".
2248+        if self._data == None:
2249+            self._data = ""
2250+
2251+        self._queue_observers = observer.ObserverList()
2252+        self._queue_errbacks = observer.ObserverList()
2253+        self._readvs = []
2254+
2255+
2256+    def _maybe_fetch_offsets_and_header(self, force_remote=False):
2257+        """
2258+        I fetch the offset table and the header from the remote slot if
2259+        I don't already have them. If I do have them, I do nothing and
2260+        return an empty Deferred.
2261+        """
2262+        if self._offsets:
2263+            return defer.succeed(None)
2264+        # At this point, we may be either SDMF or MDMF. Fetching 91
2265+        # bytes will be enough to get information for both SDMF and
2266+        # MDMF, though we'll be left with about 20 more bytes than we
2267+        # need if this ends up being SDMF. We could just fetch the first
2268+        # byte, which would save the extra bytes at the cost of an
2269+        # additional roundtrip after we parse the result.
2270+        readvs = [(0, 91)]
2271+        d = self._read(readvs, force_remote)
2272+        d.addCallback(self._process_encoding_parameters)
2273+
2274+        # Now, we have the encoding parameters, which will tell us
2275+        # where we need to look for the offset table.
2276+        def _fetch_offsets(ignored):
2277+            if self._version_number == 0:
2278+                # In SDMF, the offset table starts at byte 75, and
2279+                # extends for 32 bytes
2280+                readv = [(75, 32)] # struct.calcsize(">LLLLQQ") == 32
2281+
2282+            elif self._version_number == 1:
2283+                # In MDMF, the offset table starts at byte 91 and
2284+                # extends for 60 bytes
2285+                readv = [(91, 60)] # struct.calcsize(">LQQQQQQQ") == 60
2286+            else:
2287+                raise LayoutInvalid("I only understand SDMF and MDMF")
2288+            return readv
2289+
2290+        d.addCallback(_fetch_offsets)
2291+        d.addCallback(lambda readv:
2292+            self._read(readv))
2293+        d.addCallback(self._process_offsets)
2294+        return d
2295+
2296+
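The byte counts used in the comments above can be checked against the format
strings defined earlier in this file: the initial 91-byte read covers the
whole MDMF header-without-offsets (and therefore also the 75-byte SDMF signed
prefix), and the two offset tables are 32 and 60 bytes long:

    import struct

    assert struct.calcsize(">BQ32s32sBBQQ") == 91   # MDMFHEADERWITHOUTOFFSETS
    assert struct.calcsize(">BQ32s16s BBQQ") == 75  # SDMF signed prefix
    assert struct.calcsize(">LLLLQQ") == 32         # SDMF offset table
    assert struct.calcsize(">LQQQQQQQ") == 60       # MDMFOFFSETS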
2297+    def _process_encoding_parameters(self, encoding_parameters):
2298+        assert self.shnum in encoding_parameters
2299+        encoding_parameters = encoding_parameters[self.shnum][0]
2300+        # The first byte is the version number. It will tell us what
2301+        # to do next.
2302+        (verno,) = struct.unpack(">B", encoding_parameters[:1])
2303+        if verno == MDMF_VERSION:
2304+            (verno,
2305+             seqnum,
2306+             root_hash,
2307+             salt_hash,
2308+             k,
2309+             n,
2310+             segsize,
2311+             datalen) = struct.unpack(MDMFHEADERWITHOUTOFFSETS,
2312+                                      encoding_parameters)
2313+            self._salt_hash = salt_hash
2314+            if segsize == 0 and datalen == 0:
2315+                # Empty file, no segments.
2316+                self._num_segments = 0
2317+            else:
2318+                self._num_segments = mathutil.div_ceil(datalen, segsize)
2319+
2320+        elif verno == SDMF_VERSION:
2321+            (verno,
2322+             seqnum,
2323+             root_hash,
2324+             salt,
2325+             k,
2326+             n,
2327+             segsize,
2328+             datalen) = struct.unpack(">BQ32s16s BBQQ",
2329+                                      encoding_parameters[:75])
2330+            self._salt = salt
2331+            if segsize == 0 and datalen == 0:
2332+                # empty file
2333+                self._num_segments = 0
2334+            else:
2335+                # non-empty SDMF files have one segment.
2336+                self._num_segments = 1
2337+        else:
2338+            raise UnknownVersionError("You asked me to read mutable file "
2339+                                      "version %d, but I only understand "
2340+                                      "%d and %d" % (verno, SDMF_VERSION,
2341+                                                     MDMF_VERSION))
2342+
2343+        self._version_number = verno
2344+        self._sequence_number = seqnum
2345+        self._root_hash = root_hash
2346+        self._required_shares = k
2347+        self._total_shares = n
2348+        self._segment_size = segsize
2349+        self._data_length = datalen
2350+
2351+        self._block_size = self._segment_size / self._required_shares
2352+        # We can upload empty files, and need to account for this fact
2353+        # so as to avoid zero-division and zero-modulo errors.
2354+        if datalen > 0:
2355+            tail_size = self._data_length % self._segment_size
2356+        else:
2357+            tail_size = 0
2358+        if not tail_size:
2359+            self._tail_block_size = self._block_size
2360+        else:
2361+            self._tail_block_size = mathutil.next_multiple(tail_size,
2362+                                                    self._required_shares)
2363+            self._tail_block_size /= self._required_shares
2364+
2365+
2366+    def _process_offsets(self, offsets):
2367+        assert self.shnum in offsets
2368+        offsets = offsets[self.shnum][0]
2369+        if self._version_number == 0:
2370+            (signature,
2371+             share_hash_chain,
2372+             block_hash_tree,
2373+             share_data,
2374+             enc_privkey,
2375+             EOF) = struct.unpack(">LLLLQQ", offsets)
2376+            self._offsets = {}
2377+            self._offsets['signature'] = signature
2378+            self._offsets['share_data'] = share_data
2379+            self._offsets['block_hash_tree'] = block_hash_tree
2380+            self._offsets['share_hash_chain'] = share_hash_chain
2381+            self._offsets['enc_privkey'] = enc_privkey
2382+            self._offsets['EOF'] = EOF
2383+        elif self._version_number == 1:
2384+            (share_data,
2385+             encprivkey,
2386+             blockhashes,
2387+             salthashes,
2388+             sharehashes,
2389+             signature,
2390+             verification_key,
2391+             eof) = struct.unpack(MDMFOFFSETS, offsets)
2392+            self._offsets = {}
2393+            self._offsets['share_data'] = share_data
2394+            self._offsets['enc_privkey'] = encprivkey
2395+            self._offsets['block_hash_tree'] = blockhashes
2396+            self._offsets['salt_hash_tree'] = salthashes
2397+            self._offsets['share_hash_chain'] = sharehashes
2398+            self._offsets['signature'] = signature
2399+            self._offsets['verification_key'] = verification_key
2400+            self._offsets['EOF'] = eof
2401+
2402+
2403+    def get_block_and_salt(self, segnum, queue=False):
2404+        """
2405+        I return (block, salt), where block is the block data and
2406+        salt is the salt used to encrypt that segment.
2407+        """
2408+        d = self._maybe_fetch_offsets_and_header()
2409+        def _then(ignored):
2410+            base_share_offset = self._offsets['share_data']
2411+            if self._version_number == 1:
2412+                base_salt_offset = struct.calcsize(MDMFHEADER)
2413+                salt_offset = base_salt_offset + SALT_SIZE * segnum
2414+            else:
2415+                salt_offset = None # no per-segment salts in SDMF
2416+            return base_share_offset, salt_offset
2417+
2418+        d.addCallback(_then)
2419+
2420+        def _calculate_share_offset(share_and_salt_offset):
2421+            base_share_offset, salt_offset = share_and_salt_offset
2422+            if segnum + 1 > self._num_segments:
2423+                raise LayoutInvalid("Not a valid segment number")
2424+
2425+            share_offset = base_share_offset + self._block_size * segnum
2426+            if segnum + 1 == self._num_segments:
2427+                data = self._tail_block_size
2428+            else:
2429+                data = self._block_size
2430+            readvs = [(share_offset, data)]
2431+            if salt_offset:
2432+                readvs.insert(0,(salt_offset, SALT_SIZE))
2433+            return readvs
2434+
2435+        d.addCallback(_calculate_share_offset)
2436+        d.addCallback(lambda readvs:
2437+            self._read(readvs, queue=queue))
2438+        def _process_results(results):
2439+            assert self.shnum in results
2440+            if self._version_number == 0:
2441+                # We only read the share data, but we know the salt from
2442+                # when we fetched the header
2443+                data = results[self.shnum]
2444+                if not data:
2445+                    data = ""
2446+                else:
2447+                    assert len(data) == 1
2448+                    data = data[0]
2449+                salt = self._salt
2450+            else:
2451+                data = results[self.shnum]
2452+                if not data:
2453+                    salt = data = ""
2454+                else:
2455+                    assert len(data) == 2
2456+                    salt, data = results[self.shnum]
2457+            return data, salt
2458+        d.addCallback(_process_results)
2459+        return d
2460+
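A minimal usage sketch (hedged): `rref` and `storage_index` are assumed to come from a servermap lookup, and share number 0 is arbitrary. The proxy works out for itself whether the remote share is MDMF or SDMF, so a caller only asks for a segment and gets back (block, salt).

from allmydata.mutable.layout import MDMFSlotReadProxy

def read_segment(rref, storage_index, segnum):
    # rref: a remote reference to the storage server holding share 0
    reader = MDMFSlotReadProxy(rref, storage_index, 0)
    d = reader.get_block_and_salt(segnum)
    def _got((block, salt)):
        # salt is the per-segment salt for MDMF, or the file-wide IV for SDMF
        return (block, salt)
    d.addCallback(_got)
    return d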
2461+
2462+    def get_blockhashes(self, needed=None, queue=False, force_remote=False):
2463+        """
2464+        I return the block hash tree
2465+
2466+        I take an optional argument, needed, which is a set of indices
2467+        correspond to hashes that I should fetch. If this argument is
2468+        missing, I will fetch the entire block hash tree; otherwise, I
2469+        may attempt to fetch fewer hashes, based on what needed says
2470+        that I should do. Note that I may fetch as many hashes as I
2471+        want, so long as the set of hashes that I do fetch is a superset
2472+        of the ones that I am asked for, so callers should be prepared
2473+        to tolerate additional hashes.
2474+        """
2475+        # TODO: Return only the parts of the block hash tree necessary
2476+        # to validate the blocknum provided?
2477+        # This is a good idea, but it is hard to implement correctly. It
2478+        # is bad to fetch any one block hash more than once, so we
2479+        # probably just want to fetch the whole thing at once and then
2480+        # serve it.
2481+        if needed == set([]):
2482+            return defer.succeed([])
2483+        d = self._maybe_fetch_offsets_and_header()
2484+        def _then(ignored):
2485+            blockhashes_offset = self._offsets['block_hash_tree']
2486+            if self._version_number == 1:
2487+                blockhashes_length = self._offsets['salt_hash_tree'] - blockhashes_offset
2488+            else:
2489+                blockhashes_length = self._offsets['share_data'] - blockhashes_offset
2490+            readvs = [(blockhashes_offset, blockhashes_length)]
2491+            return readvs
2492+        d.addCallback(_then)
2493+        d.addCallback(lambda readvs:
2494+            self._read(readvs, queue=queue, force_remote=force_remote))
2495+        def _build_block_hash_tree(results):
2496+            assert self.shnum in results
2497+
2498+            rawhashes = results[self.shnum][0]
2499+            results = [rawhashes[i:i+HASH_SIZE]
2500+                       for i in range(0, len(rawhashes), HASH_SIZE)]
2501+            return results
2502+        d.addCallback(_build_block_hash_tree)
2503+        return d
2504+
2505+
2506+    def get_salthashes(self, needed=None, queue=False):
2507+        """
2508+        I return the salt hash tree.
2509+
2510+        I accept an optional argument, needed, which is a set of indices
2511+        corresponding to hashes that I should fetch. If this argument is
2512+        missing, I will fetch and return the entire salt hash tree.
2513+        Otherwise, I may fetch any part of the salt hash tree, so long
2514+        as the part that I fetch and return is a superset of the part
2515+        that my caller has asked for. Callers should be prepared to
2516+        tolerate this behavior.
2517+
2518+        This method is only meaningful for MDMF files, as only MDMF
2519+        files have a salt hash tree. If the remote file is an SDMF file,
2520+        this method will return False.
2521+        """
2522+        # TODO: Only get the leaf nodes implied by salthashes
2523+        if needed == set([]):
2524+            return defer.succeed([])
2525+        d = self._maybe_fetch_offsets_and_header()
2526+        def _then(ignored):
2527+            if self._version_number == 0:
2528+                return []
2529+            else:
2530+                salthashes_offset = self._offsets['salt_hash_tree']
2531+                salthashes_length = self._offsets['share_hash_chain'] - salthashes_offset
2532+                return [(salthashes_offset, salthashes_length)]
2533+        d.addCallback(_then)
2534+        def _maybe_read(readvs):
2535+            if readvs:
2536+                return self._read(readvs, queue=queue)
2537+            else:
2538+                return False
2539+        d.addCallback(_maybe_read)
2540+        def _process_results(results):
2541+            if not results:
2542+                return False
2543+            assert self.shnum in results
2544+
2545+            rawhashes = results[self.shnum][0]
2546+            results = [rawhashes[i:i+HASH_SIZE]
2547+                       for i in range(0, len(rawhashes), HASH_SIZE)]
2548+            return results
2549+        d.addCallback(_process_results)
2550+        return d
2551+
2552+
2553+    def get_sharehashes(self, needed=None, queue=False, force_remote=False):
2554+        """
2555+        I return the part of the share hash chain needed to validate
2556+        this share.
2557+
2558+        I take an optional argument, needed. Needed is a set of indices
2559+        that correspond to the hashes that I should fetch. If needed is
2560+        not present, I will fetch and return the entire share hash
2561+        chain. Otherwise, I may fetch and return any part of the share
2562+        hash chain that is a superset of the part that I am asked to
2563+        fetch. Callers should be prepared to deal with more hashes than
2564+        they've asked for.
2565+        """
2566+        if needed == set([]):
2567+            return defer.succeed([])
2568+        d = self._maybe_fetch_offsets_and_header()
2569+
2570+        def _make_readvs(ignored):
2571+            sharehashes_offset = self._offsets['share_hash_chain']
2572+            if self._version_number == 0:
2573+                sharehashes_length = self._offsets['block_hash_tree'] - sharehashes_offset
2574+            else:
2575+                sharehashes_length = self._offsets['signature'] - sharehashes_offset
2576+            readvs = [(sharehashes_offset, sharehashes_length)]
2577+            return readvs
2578+        d.addCallback(_make_readvs)
2579+        d.addCallback(lambda readvs:
2580+            self._read(readvs, queue=queue, force_remote=force_remote))
2581+        def _build_share_hash_chain(results):
2582+            assert self.shnum in results
2583+
2584+            sharehashes = results[self.shnum][0]
2585+            results = [sharehashes[i:i+(HASH_SIZE + 2)]
2586+                       for i in range(0, len(sharehashes), HASH_SIZE + 2)]
2587+            results = dict([struct.unpack(">H32s", data)
2588+                            for data in results])
2589+            return results
2590+        d.addCallback(_build_share_hash_chain)
2591+        return d
2592+
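The share hash chain is stored on disk as a flat run of (uint16 node number, 32-byte hash) pairs; a small sketch of the same unpacking that _build_share_hash_chain performs above:

import struct

def unpack_share_hash_chain(raw):
    # 34 bytes per entry: a 2-byte node number followed by a 32-byte hash
    pairs = [raw[i:i+34] for i in range(0, len(raw), 34)]
    return dict(struct.unpack(">H32s", p) for p in pairs)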
2593+
2594+    def get_encprivkey(self, queue=False):
2595+        """
2596+        I return the encrypted private key.
2597+        """
2598+        d = self._maybe_fetch_offsets_and_header()
2599+
2600+        def _make_readvs(ignored):
2601+            privkey_offset = self._offsets['enc_privkey']
2602+            if self._version_number == 0:
2603+                privkey_length = self._offsets['EOF'] - privkey_offset
2604+            else:
2605+                privkey_length = self._offsets['block_hash_tree'] - privkey_offset
2606+            readvs = [(privkey_offset, privkey_length)]
2607+            return readvs
2608+        d.addCallback(_make_readvs)
2609+        d.addCallback(lambda readvs:
2610+            self._read(readvs, queue=queue))
2611+        def _process_results(results):
2612+            assert self.shnum in results
2613+            privkey = results[self.shnum][0]
2614+            return privkey
2615+        d.addCallback(_process_results)
2616+        return d
2617+
2618+
2619+    def get_signature(self, queue=False):
2620+        """
2621+        I return the signature of my share.
2622+        """
2623+        d = self._maybe_fetch_offsets_and_header()
2624+
2625+        def _make_readvs(ignored):
2626+            signature_offset = self._offsets['signature']
2627+            if self._version_number == 1:
2628+                signature_length = self._offsets['verification_key'] - signature_offset
2629+            else:
2630+                signature_length = self._offsets['share_hash_chain'] - signature_offset
2631+            readvs = [(signature_offset, signature_length)]
2632+            return readvs
2633+        d.addCallback(_make_readvs)
2634+        d.addCallback(lambda readvs:
2635+            self._read(readvs, queue=queue))
2636+        def _process_results(results):
2637+            assert self.shnum in results
2638+            signature = results[self.shnum][0]
2639+            return signature
2640+        d.addCallback(_process_results)
2641+        return d
2642+
2643+
2644+    def get_verification_key(self, queue=False):
2645+        """
2646+        I return the verification key.
2647+        """
2648+        d = self._maybe_fetch_offsets_and_header()
2649+
2650+        def _make_readvs(ignored):
2651+            if self._version_number == 1:
2652+                vk_offset = self._offsets['verification_key']
2653+                vk_length = self._offsets['EOF'] - vk_offset
2654+            else:
2655+                vk_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
2656+                vk_length = self._offsets['signature'] - vk_offset
2657+            readvs = [(vk_offset, vk_length)]
2658+            return readvs
2659+        d.addCallback(_make_readvs)
2660+        d.addCallback(lambda readvs:
2661+            self._read(readvs, queue=queue))
2662+        def _process_results(results):
2663+            assert self.shnum in results
2664+            verification_key = results[self.shnum][0]
2665+            return verification_key
2666+        d.addCallback(_process_results)
2667+        return d
2668+
2669+
2670+    def get_encoding_parameters(self):
2671+        """
2672+        I return (k, n, segsize, datalen)
2673+        """
2674+        d = self._maybe_fetch_offsets_and_header()
2675+        d.addCallback(lambda ignored:
2676+            (self._required_shares,
2677+             self._total_shares,
2678+             self._segment_size,
2679+             self._data_length))
2680+        return d
2681+
2682+
2683+    def get_seqnum(self):
2684+        """
2685+        I return the sequence number for this share.
2686+        """
2687+        d = self._maybe_fetch_offsets_and_header()
2688+        d.addCallback(lambda ignored:
2689+            self._sequence_number)
2690+        return d
2691+
2692+
2693+    def get_root_hash(self):
2694+        """
2695+        I return the root of the block hash tree
2696+        """
2697+        d = self._maybe_fetch_offsets_and_header()
2698+        d.addCallback(lambda ignored: self._root_hash)
2699+        return d
2700+
2701+
2702+    def get_salt_hash(self):
2703+        """
2704+        I return the flat salt hash
2705+        """
2706+        d = self._maybe_fetch_offsets_and_header()
2707+        d.addCallback(lambda ignored: self._salt_hash)
2708+        return d
2709+
2710+
2711+    def get_checkstring(self):
2712+        """
2713+        I return the packed representation of the following:
2714+
2715+            - version number
2716+            - sequence number
2717+            - root hash
2718+            - salt hash
2719+
2720+        which my users use as a checkstring to detect other writers.
2721+        """
2722+        d = self._maybe_fetch_offsets_and_header()
2723+        def _build_checkstring(ignored):
2724+            if self._salt_hash:
2725+                checkstring = struct.pack(MDMFCHECKSTRING,
2726+                                          self._version_number,
2727+                                          self._sequence_number,
2728+                                          self._root_hash,
2729+                                          self._salt_hash)
2730+            else:
2731+                checkstring = struct.pack(PREFIX,
2732+                                         self._version_number,
2733+                                         self._sequence_number,
2734+                                         self._root_hash,
2735+                                         self._salt)
2736+            return checkstring
2737+        d.addCallback(_build_checkstring)
2738+        return d
2739+
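For reference, a hedged sketch of unpacking an MDMF checkstring; the ">BQ32s32s" format is what the MDMF test helper later in this patch packs as its checkstring (version byte, sequence number, root hash, salt hash -- 73 bytes in all), and the constant name here is illustrative rather than the module's own:

import struct

MDMF_CHECKSTRING_FMT = ">BQ32s32s"   # version, seqnum, root hash, salt hash

def unpack_mdmf_checkstring(checkstring):
    verno, seqnum, root_hash, salt_hash = struct.unpack(MDMF_CHECKSTRING_FMT,
                                                        checkstring)
    return verno, seqnum, root_hash, salt_hash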
2740+
2741+    def get_prefix(self, force_remote):
2742+        d = self._maybe_fetch_offsets_and_header(force_remote)
2743+        d.addCallback(lambda ignored:
2744+            self._build_prefix())
2745+        return d
2746+
2747+
2748+    def _build_prefix(self):
2749+        # The prefix is another name for the part of the remote share
2750+        # that gets signed. It consists of everything up to and
2751+        # including the datalength, packed by struct.
2752+        if self._version_number == SDMF_VERSION:
2753+            format_string = SIGNED_PREFIX
2754+            salt_to_use = self._salt
2755+        else:
2756+            format_string = MDMFSIGNABLEHEADER
2757+            salt_to_use = self._salt_hash
2758+        return struct.pack(format_string,
2759+                           self._version_number,
2760+                           self._sequence_number,
2761+                           self._root_hash,
2762+                           salt_to_use,
2763+                           self._required_shares,
2764+                           self._total_shares,
2765+                           self._segment_size,
2766+                           self._data_length)
2767+
2768+
2769+    def _get_offsets_tuple(self):
2770+        # The offsets tuple is another component of the version
2771+        # information tuple. For now, it is simply a copy of our offsets
2772+        # dictionary.
2773+        return self._offsets.copy()
2774+
2775+
2776+    def get_verinfo(self):
2777+        """
2778+        I return my verinfo tuple. This is used by the ServermapUpdater
2779+        to keep track of versions of mutable files.
2780+
2781+        The verinfo tuple for MDMF files contains:
2782+            - seqnum
2783+            - root hash
2784+            - salt hash
2785+            - segsize
2786+            - datalen
2787+            - k
2788+            - n
2789+            - prefix (the thing that you sign)
2790+            - a tuple of offsets
2791+
2792+        The verinfo tuple for SDMF files is the same, but contains a
2793+        16-byte IV instead of a hash of salts.
2794+        """
2795+        d = self._maybe_fetch_offsets_and_header()
2796+        def _build_verinfo(ignored):
2797+            if self._version_number == SDMF_VERSION:
2798+                salt_to_use = self._salt
2799+            else:
2800+                salt_to_use = self._salt_hash
2801+            return (self._sequence_number,
2802+                    self._root_hash,
2803+                    salt_to_use,
2804+                    self._segment_size,
2805+                    self._data_length,
2806+                    self._required_shares,
2807+                    self._total_shares,
2808+                    self._build_prefix(),
2809+                    self._get_offsets_tuple())
2810+        d.addCallback(_build_verinfo)
2811+        return d
2812+
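A hedged sketch of consuming the verinfo tuple described above (field order copied from the docstring; the third field is a salt hash for MDMF shares and a 16-byte IV for SDMF shares):

def summarize_verinfo(verinfo):
    (seqnum, root_hash, salt_or_salthash, segsize, datalen,
     k, n, prefix, offsets) = verinfo
    return "seq%d: k=%d, n=%d, %d bytes in %d-byte segments" % \
           (seqnum, k, n, datalen, segsize)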
2813+
2814+    def flush(self):
2815+        """
2816+        I flush my queue of read vectors.
2817+        """
2818+        d = self._read(self._readvs)
2819+        def _then(results):
2820+            self._readvs = []
2821+            if isinstance(results, failure.Failure):
2822+                self._queue_errbacks.notify(results)
2823+            else:
2824+                self._queue_observers.notify(results)
2825+            self._queue_observers = observer.ObserverList()
2826+            self._queue_errbacks = observer.ObserverList()
2827+        d.addBoth(_then)
2828+
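A rough sketch of how the queue/flush machinery is meant to be used (it assumes the reader was constructed without a local copy of the share, so the queued reads are actually sent to the server, as a single slot_readv call, when flush() runs):

from twisted.internet import defer

def fetch_signature_and_key(reader):
    d1 = reader.get_signature(queue=True)
    d2 = reader.get_verification_key(queue=True)
    reader.flush()   # send both queued read vectors in one remote call
    return defer.gatherResults([d1, d2])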
2829+
2830+    def _read(self, readvs, force_remote=False, queue=False):
2831+        unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs)
2832+        # TODO: It's entirely possible to tweak this so that it just
2833+        # fulfills the requests that it can, and not demand that all
2834+        # requests are satisfiable before running it.
2835+        if not unsatisfiable and not force_remote:
2836+            results = [self._data[offset:offset+length]
2837+                       for (offset, length) in readvs]
2838+            results = {self.shnum: results}
2839+            return defer.succeed(results)
2840+        else:
2841+            if queue:
2842+                start = len(self._readvs)
2843+                self._readvs += readvs
2844+                end = len(self._readvs)
2845+                def _get_results(results, start, end):
2846+                    if self.shnum not in results:
2847+                        return {self.shnum: [""]}
2848+                    return {self.shnum: results[self.shnum][start:end]}
2849+                d = defer.Deferred()
2850+                d.addCallback(_get_results, start, end)
2851+                self._queue_observers.subscribe(d.callback)
2852+                self._queue_errbacks.subscribe(d.errback)
2853+                return d
2854+            return self._rref.callRemote("slot_readv",
2855+                                         self._storage_index,
2856+                                         [self.shnum],
2857+                                         readvs)
2858+
2859+
2860+    def is_sdmf(self):
2861+        """I tell my caller whether my remote file is SDMF (True) or MDMF (False).
2862+        """
2863+        d = self._maybe_fetch_offsets_and_header()
2864+        d.addCallback(lambda ignored:
2865+            self._version_number == 0)
2866+        return d
2867+
2868+
2869+class LayoutInvalid(Exception):
2870+    """
2871+    This isn't a valid MDMF mutable file
2872+    """
2873hunk ./src/allmydata/test/test_storage.py 2
2874 
2875-import time, os.path, stat, re, simplejson, struct
2876+import time, os.path, stat, re, simplejson, struct, shutil
2877 
2878 from twisted.trial import unittest
2879 
2880hunk ./src/allmydata/test/test_storage.py 22
2881 from allmydata.storage.expirer import LeaseCheckingCrawler
2882 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
2883      ReadBucketProxy
2884-from allmydata.interfaces import BadWriteEnablerError
2885-from allmydata.test.common import LoggingServiceParent
2886+from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
2887+                                     LayoutInvalid, MDMFSIGNABLEHEADER, \
2888+                                     SIGNED_PREFIX, MDMFHEADER
2889+from allmydata.interfaces import BadWriteEnablerError, MDMF_VERSION, \
2890+                                 SDMF_VERSION
2891+from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
2892 from allmydata.test.common_web import WebRenderingMixin
2893 from allmydata.web.storage import StorageStatus, remove_prefix
2894 
2895hunk ./src/allmydata/test/test_storage.py 105
2896 
2897 class RemoteBucket:
2898 
2899+    def __init__(self):
2900+        self.read_count = 0
2901+        self.write_count = 0
2902+
2903     def callRemote(self, methname, *args, **kwargs):
2904         def _call():
2905             meth = getattr(self.target, "remote_" + methname)
2906hunk ./src/allmydata/test/test_storage.py 113
2907             return meth(*args, **kwargs)
2908+
2909+        if methname == "slot_readv":
2910+            self.read_count += 1
2911+        if methname == "slot_writev":
2912+            self.write_count += 1
2913+
2914         return defer.maybeDeferred(_call)
2915 
2916hunk ./src/allmydata/test/test_storage.py 121
2917+
2918 class BucketProxy(unittest.TestCase):
2919     def make_bucket(self, name, size):
2920         basedir = os.path.join("storage", "BucketProxy", name)
2921hunk ./src/allmydata/test/test_storage.py 1298
2922         self.failUnless(os.path.exists(prefixdir), prefixdir)
2923         self.failIf(os.path.exists(bucketdir), bucketdir)
2924 
2925+
2926+class MDMFProxies(unittest.TestCase, ShouldFailMixin):
2927+    def setUp(self):
2928+        self.sparent = LoggingServiceParent()
2929+        self._lease_secret = itertools.count()
2930+        self.ss = self.create("MDMFProxies storage test server")
2931+        self.rref = RemoteBucket()
2932+        self.rref.target = self.ss
2933+        self.secrets = (self.write_enabler("we_secret"),
2934+                        self.renew_secret("renew_secret"),
2935+                        self.cancel_secret("cancel_secret"))
2936+        self.segment = "aaaaaa"
2937+        self.block = "aa"
2938+        self.salt = "a" * 16
2939+        self.block_hash = "a" * 32
2940+        self.block_hash_tree = [self.block_hash for i in xrange(6)]
2941+        self.share_hash = self.block_hash
2942+        self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
2943+        self.signature = "foobarbaz"
2944+        self.verification_key = "vvvvvv"
2945+        self.encprivkey = "private"
2946+        self.root_hash = self.block_hash
2947+        self.salt_hash = self.root_hash
2948+        self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
2949+        self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
2950+        self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
2951+        # blockhashes and salt hashes are serialized in the same way,
2952+        # only we lop off the first element and store that in the
2953+        # header.
2954+        self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
2955+
2956+
2957+    def tearDown(self):
2958+        self.sparent.stopService()
2959+        shutil.rmtree(self.workdir("MDMFProxies storage test server"))
2960+
2961+
2962+    def write_enabler(self, we_tag):
2963+        return hashutil.tagged_hash("we_blah", we_tag)
2964+
2965+
2966+    def renew_secret(self, tag):
2967+        return hashutil.tagged_hash("renew_blah", str(tag))
2968+
2969+
2970+    def cancel_secret(self, tag):
2971+        return hashutil.tagged_hash("cancel_blah", str(tag))
2972+
2973+
2974+    def workdir(self, name):
2975+        basedir = os.path.join("storage", "MutableServer", name)
2976+        return basedir
2977+
2978+
2979+    def create(self, name):
2980+        workdir = self.workdir(name)
2981+        ss = StorageServer(workdir, "\x00" * 20)
2982+        ss.setServiceParent(self.sparent)
2983+        return ss
2984+
2985+
2986+    def build_test_mdmf_share(self, tail_segment=False, empty=False):
2987+        # Start with the checkstring
2988+        data = struct.pack(">BQ32s32s",
2989+                           1,
2990+                           0,
2991+                           self.root_hash,
2992+                           self.salt_hash)
2993+        self.checkstring = data
2994+        # Next, the encoding parameters
2995+        if tail_segment:
2996+            data += struct.pack(">BBQQ",
2997+                                3,
2998+                                10,
2999+                                6,
3000+                                33)
3001+        elif empty:
3002+            data += struct.pack(">BBQQ",
3003+                                3,
3004+                                10,
3005+                                0,
3006+                                0)
3007+        else:
3008+            data += struct.pack(">BBQQ",
3009+                                3,
3010+                                10,
3011+                                6,
3012+                                36)
3013+        # Now we'll build the offsets.
3014+        # The header -- everything up to the salts -- is 151 bytes long.
3015+        # The shares come after the salts.
3016+        if empty:
3017+            salts = ""
3018+        else:
3019+            salts = self.salt * 6
3020+        share_offset = 151 + len(salts)
3021+        if tail_segment:
3022+            sharedata = self.block * 6
3023+        elif empty:
3024+            sharedata = ""
3025+        else:
3026+            sharedata = self.block * 6 + "a"
3027+        # The encrypted private key comes after the shares
3028+        encrypted_private_key_offset = share_offset + len(sharedata)
3029+        # The blockhashes come after the private key
3030+        blockhashes_offset = encrypted_private_key_offset + len(self.encprivkey)
3031+        # The salthashes come after the blockhashes
3032+        salthashes_offset = blockhashes_offset + len(self.block_hash_tree_s)
3033+        # The sharehashes come after the salt hashes
3034+        sharehashes_offset = salthashes_offset + len(self.salt_hash_tree_s)
3035+        # The signature comes after the share hash chain
3036+        signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
3037+        # The verification key comes after the signature
3038+        verification_offset = signature_offset + len(self.signature)
3039+        # The EOF comes after the verification key
3040+        eof_offset = verification_offset + len(self.verification_key)
3041+        data += struct.pack(">LQQQQQQQ",
3042+                            share_offset,
3043+                            encrypted_private_key_offset,
3044+                            blockhashes_offset,
3045+                            salthashes_offset,
3046+                            sharehashes_offset,
3047+                            signature_offset,
3048+                            verification_offset,
3049+                            eof_offset)
3050+        self.offsets = {}
3051+        self.offsets['share_data'] = share_offset
3052+        self.offsets['enc_privkey'] = encrypted_private_key_offset
3053+        self.offsets['block_hash_tree'] = blockhashes_offset
3054+        self.offsets['salt_hash_tree'] = salthashes_offset
3055+        self.offsets['share_hash_chain'] = sharehashes_offset
3056+        self.offsets['signature'] = signature_offset
3057+        self.offsets['verification_key'] = verification_offset
3058+        self.offsets['EOF'] = eof_offset
3059+        # Next, we'll add in the salts,
3060+        data += salts
3061+        # the share data,
3062+        data += sharedata
3063+        # the private key,
3064+        data += self.encprivkey
3065+        # the block hash tree,
3066+        data += self.block_hash_tree_s
3067+        # the salt hash tree
3068+        data += self.salt_hash_tree_s
3069+        # the share hash chain,
3070+        data += self.share_hash_chain_s
3071+        # the signature,
3072+        data += self.signature
3073+        # and the verification key
3074+        data += self.verification_key
3075+        return data
3076+
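A quick sanity check on the header arithmetic used above: the checkstring, encoding parameters, and offsets packed at the start of the test share add up to 151 bytes, which is why the first salt lands at offset 151.

import struct

header_size = (struct.calcsize(">BQ32s32s")     # checkstring: 73 bytes
               + struct.calcsize(">BBQQ")       # encoding parameters: 18 bytes
               + struct.calcsize(">LQQQQQQQ"))  # eight offsets: 60 bytes
assert header_size == 151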
3077+
3078+    def write_test_share_to_server(self,
3079+                                   storage_index,
3080+                                   tail_segment=False,
3081+                                   empty=False):
3082+        """
3083+        I write some data for the read tests to read to self.ss
3084+
3085+        If tail_segment=True, then I will write a share that has a
3086+        smaller tail segment than other segments.
3087+        """
3088+        write = self.ss.remote_slot_testv_and_readv_and_writev
3089+        data = self.build_test_mdmf_share(tail_segment, empty)
3090+        # Finally, we write the whole thing to the storage server in one
3091+        # pass.
3092+        testvs = [(0, 1, "eq", "")]
3093+        tws = {}
3094+        tws[0] = (testvs, [(0, data)], None)
3095+        readv = [(0, 1)]
3096+        results = write(storage_index, self.secrets, tws, readv)
3097+        self.failUnless(results[0])
3098+
3099+
3100+    def build_test_sdmf_share(self, empty=False):
3101+        if empty:
3102+            sharedata = ""
3103+        else:
3104+            sharedata = self.segment * 6
3105+        blocksize = len(sharedata) / 3
3106+        block = sharedata[:blocksize]
3107+        prefix = struct.pack(">BQ32s16s BBQQ",
3108+                             0, # version,
3109+                             0,
3110+                             self.root_hash,
3111+                             self.salt,
3112+                             3,
3113+                             10,
3114+                             len(sharedata),
3115+                             len(sharedata),
3116+                            )
3117+        post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
3118+        signature_offset = post_offset + len(self.verification_key)
3119+        sharehashes_offset = signature_offset + len(self.signature)
3120+        blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
3121+        sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
3122+        encprivkey_offset = sharedata_offset + len(block)
3123+        eof_offset = encprivkey_offset + len(self.encprivkey)
3124+        offsets = struct.pack(">LLLLQQ",
3125+                              signature_offset,
3126+                              sharehashes_offset,
3127+                              blockhashes_offset,
3128+                              sharedata_offset,
3129+                              encprivkey_offset,
3130+                              eof_offset)
3131+        final_share = "".join([prefix,
3132+                           offsets,
3133+                           self.verification_key,
3134+                           self.signature,
3135+                           self.share_hash_chain_s,
3136+                           self.block_hash_tree_s,
3137+                           block,
3138+                           self.encprivkey])
3139+        self.offsets = {}
3140+        self.offsets['signature'] = signature_offset
3141+        self.offsets['share_hash_chain'] = sharehashes_offset
3142+        self.offsets['block_hash_tree'] = blockhashes_offset
3143+        self.offsets['share_data'] = sharedata_offset
3144+        self.offsets['enc_privkey'] = encprivkey_offset
3145+        self.offsets['EOF'] = eof_offset
3146+        return final_share
3147+
3148+
3149+    def write_sdmf_share_to_server(self,
3150+                                   storage_index,
3151+                                   empty=False):
3152+        # Some tests need SDMF shares to verify that we can still
3153+        # read them. This method writes one which resembles, but is not, a real SDMF share.
3154+        assert self.rref
3155+        write = self.ss.remote_slot_testv_and_readv_and_writev
3156+        share = self.build_test_sdmf_share(empty)
3157+        testvs = [(0, 1, "eq", "")]
3158+        tws = {}
3159+        tws[0] = (testvs, [(0, share)], None)
3160+        readv = []
3161+        results = write(storage_index, self.secrets, tws, readv)
3162+        self.failUnless(results[0])
3163+
3164+
3165+    def test_read(self):
3166+        self.write_test_share_to_server("si1")
3167+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3168+        # Check that every method equals what we expect it to.
3169+        d = defer.succeed(None)
3170+        def _check_block_and_salt((block, salt)):
3171+            self.failUnlessEqual(block, self.block)
3172+            self.failUnlessEqual(salt, self.salt)
3173+
3174+        for i in xrange(6):
3175+            d.addCallback(lambda ignored, i=i:
3176+                mr.get_block_and_salt(i))
3177+            d.addCallback(_check_block_and_salt)
3178+
3179+        d.addCallback(lambda ignored:
3180+            mr.get_encprivkey())
3181+        d.addCallback(lambda encprivkey:
3182+            self.failUnlessEqual(self.encprivkey, encprivkey))
3183+
3184+        d.addCallback(lambda ignored:
3185+            mr.get_blockhashes())
3186+        d.addCallback(lambda blockhashes:
3187+            self.failUnlessEqual(self.block_hash_tree, blockhashes))
3188+
3189+        d.addCallback(lambda ignored:
3190+            mr.get_salthashes())
3191+        d.addCallback(lambda salthashes:
3192+            self.failUnlessEqual(self.salt_hash_tree[1:], salthashes))
3193+
3194+        d.addCallback(lambda ignored:
3195+            mr.get_sharehashes())
3196+        d.addCallback(lambda sharehashes:
3197+            self.failUnlessEqual(self.share_hash_chain, sharehashes))
3198+
3199+        d.addCallback(lambda ignored:
3200+            mr.get_signature())
3201+        d.addCallback(lambda signature:
3202+            self.failUnlessEqual(signature, self.signature))
3203+
3204+        d.addCallback(lambda ignored:
3205+            mr.get_verification_key())
3206+        d.addCallback(lambda verification_key:
3207+            self.failUnlessEqual(verification_key, self.verification_key))
3208+
3209+        d.addCallback(lambda ignored:
3210+            mr.get_seqnum())
3211+        d.addCallback(lambda seqnum:
3212+            self.failUnlessEqual(seqnum, 0))
3213+
3214+        d.addCallback(lambda ignored:
3215+            mr.get_root_hash())
3216+        d.addCallback(lambda root_hash:
3217+            self.failUnlessEqual(self.root_hash, root_hash))
3218+
3219+        d.addCallback(lambda ignored:
3220+            mr.get_salt_hash())
3221+        d.addCallback(lambda salt_hash:
3222+            self.failUnlessEqual(self.salt_hash, salt_hash))
3223+
3224+        d.addCallback(lambda ignored:
3225+            mr.get_seqnum())
3226+        d.addCallback(lambda seqnum:
3227+            self.failUnlessEqual(0, seqnum))
3228+
3229+        d.addCallback(lambda ignored:
3230+            mr.get_encoding_parameters())
3231+        def _check_encoding_parameters((k, n, segsize, datalen)):
3232+            self.failUnlessEqual(k, 3)
3233+            self.failUnlessEqual(n, 10)
3234+            self.failUnlessEqual(segsize, 6)
3235+            self.failUnlessEqual(datalen, 36)
3236+        d.addCallback(_check_encoding_parameters)
3237+
3238+        d.addCallback(lambda ignored:
3239+            mr.get_checkstring())
3240+        d.addCallback(lambda checkstring:
3241+            self.failUnlessEqual(checkstring, self.checkstring))
3242+        return d
3243+
3244+
3245+    def test_read_salthashes_on_sdmf_file(self):
3246+        self.write_sdmf_share_to_server("si1")
3247+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3248+        d = defer.succeed(None)
3249+        d.addCallback(lambda ignored:
3250+            mr.get_salthashes())
3251+        d.addCallback(lambda results:
3252+            self.failIf(results))
3253+        return d
3254+
3255+
3256+    def test_read_with_different_tail_segment_size(self):
3257+        self.write_test_share_to_server("si1", tail_segment=True)
3258+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3259+        d = mr.get_block_and_salt(5)
3260+        def _check_tail_segment(results):
3261+            block, salt = results
3262+            self.failUnlessEqual(len(block), 1)
3263+            self.failUnlessEqual(block, "a")
3264+        d.addCallback(_check_tail_segment)
3265+        return d
3266+
3267+
3268+    def test_get_block_with_invalid_segnum(self):
3269+        self.write_test_share_to_server("si1")
3270+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3271+        d = defer.succeed(None)
3272+        d.addCallback(lambda ignored:
3273+            self.shouldFail(LayoutInvalid, "test invalid segnum",
3274+                            None,
3275+                            mr.get_block_and_salt, 7))
3276+        return d
3277+
3278+
3279+    def test_get_encoding_parameters_first(self):
3280+        self.write_test_share_to_server("si1")
3281+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3282+        d = mr.get_encoding_parameters()
3283+        def _check_encoding_parameters((k, n, segment_size, datalen)):
3284+            self.failUnlessEqual(k, 3)
3285+            self.failUnlessEqual(n, 10)
3286+            self.failUnlessEqual(segment_size, 6)
3287+            self.failUnlessEqual(datalen, 36)
3288+        d.addCallback(_check_encoding_parameters)
3289+        return d
3290+
3291+
3292+    def test_get_seqnum_first(self):
3293+        self.write_test_share_to_server("si1")
3294+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3295+        d = mr.get_seqnum()
3296+        d.addCallback(lambda seqnum:
3297+            self.failUnlessEqual(seqnum, 0))
3298+        return d
3299+
3300+
3301+    def test_get_root_hash_first(self):
3302+        self.write_test_share_to_server("si1")
3303+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3304+        d = mr.get_root_hash()
3305+        d.addCallback(lambda root_hash:
3306+            self.failUnlessEqual(root_hash, self.root_hash))
3307+        return d
3308+
3309+
3310+    def test_get_salt_hash_first(self):
3311+        self.write_test_share_to_server("si1")
3312+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3313+        d = mr.get_salt_hash()
3314+        d.addCallback(lambda salt_hash:
3315+            self.failUnlessEqual(salt_hash, self.salt_hash))
3316+        return d
3317+
3318+
3319+    def test_get_checkstring_first(self):
3320+        self.write_test_share_to_server("si1")
3321+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3322+        d = mr.get_checkstring()
3323+        d.addCallback(lambda checkstring:
3324+            self.failUnlessEqual(checkstring, self.checkstring))
3325+        return d
3326+
3327+
3328+    def test_write_read_vectors(self):
3329+        # When writing for us, the storage server will return to us a
3330+        # read vector, along with its result. If a write fails because
3331+        # the test vectors failed, this read vector can help us to
3332+        # diagnose the problem. This test ensures that the read vector
3333+        # is working appropriately.
3334+        mw = self._make_new_mw("si1", 0)
3335+        d = defer.succeed(None)
3336+
3337+        # Write one block. This should return a checkstring of nothing,
3338+        # since there is no data there.
3339+        d.addCallback(lambda ignored:
3340+            mw.put_block(self.block, 0, self.salt))
3341+        def _check_first_write(results):
3342+            result, readvs = results
3343+            self.failUnless(result)
3344+            self.failIf(readvs)
3345+        d.addCallback(_check_first_write)
3346+        # Now, there should be a different checkstring returned when
3347+        # we write other blocks of this share
3348+        d.addCallback(lambda ignored:
3349+            mw.put_block(self.block, 1, self.salt))
3350+        def _check_next_write(results):
3351+            result, readvs = results
3352+            self.failUnless(result)
3353+            self.expected_checkstring = mw.get_checkstring()
3354+            self.failUnlessIn(0, readvs)
3355+            self.failUnlessEqual(readvs[0][0], self.expected_checkstring)
3356+        d.addCallback(_check_next_write)
3357+        # Add the other four blocks
3358+        for i in xrange(2, 6):
3359+            d.addCallback(lambda ignored, i=i:
3360+                mw.put_block(self.block, i, self.salt))
3361+            d.addCallback(_check_next_write)
3362+        # Add the encrypted private key
3363+        d.addCallback(lambda ignored:
3364+            mw.put_encprivkey(self.encprivkey))
3365+        d.addCallback(_check_next_write)
3366+        # Add the block hash tree and share hash tree
3367+        d.addCallback(lambda ignored:
3368+            mw.put_blockhashes(self.block_hash_tree))
3369+        d.addCallback(_check_next_write)
3370+        d.addCallback(lambda ignored:
3371+            mw.put_salthashes(self.salt_hash_tree))
3372+        d.addCallback(_check_next_write)
3373+        d.addCallback(lambda ignored:
3374+            mw.put_sharehashes(self.share_hash_chain))
3375+        d.addCallback(_check_next_write)
3376+        # Add the root hash and the salt hash. This should change the
3377+        # checkstring, but not in a way that we'll be able to see right
3378+        # now, since the read vectors are applied before the write
3379+        # vectors.
3380+        d.addCallback(lambda ignored:
3381+            mw.put_root_hash(self.root_hash))
3382+        def _check_old_testv_after_new_one_is_written(results):
3383+            result, readvs = results
3384+            self.failUnless(result)
3385+            self.failUnlessIn(0, readvs)
3386+            self.failUnlessEqual(self.expected_checkstring,
3387+                                 readvs[0][0])
3388+            new_checkstring = mw.get_checkstring()
3389+            self.failIfEqual(new_checkstring,
3390+                             readvs[0][0])
3391+        d.addCallback(_check_old_testv_after_new_one_is_written)
3392+        # Now add the signature. This should succeed, meaning that the
3393+        # data gets written and the read vector matches what the writer
3394+        # thinks should be there.
3395+        d.addCallback(lambda ignored:
3396+            mw.put_signature(self.signature))
3397+        d.addCallback(_check_next_write)
3398+        # The checkstring remains the same for the rest of the process.
3399+        return d
3400+
3401+
3402+    def test_blockhashes_after_salt_hash_tree(self):
3403+        mw = self._make_new_mw("si1", 0)
3404+        d = defer.succeed(None)
3405+        # Put everything up to and including the salt hash tree
3406+        for i in xrange(6):
3407+            d.addCallback(lambda ignored, i=i:
3408+                mw.put_block(self.block, i, self.salt))
3409+        d.addCallback(lambda ignored:
3410+            mw.put_encprivkey(self.encprivkey))
3411+        d.addCallback(lambda ignored:
3412+            mw.put_blockhashes(self.block_hash_tree))
3413+        d.addCallback(lambda ignored:
3414+            mw.put_salthashes(self.salt_hash_tree))
3415+        # Now try to put a block hash tree after the salt hash tree
3416+        # This won't necessarily overwrite the share hash chain, but it
3417+        # is a bad idea in general -- if we write one that is anything
3418+        # other than the exact size of the initial one, we will either
3419+        # overwrite the share hash chain, or give the reader (who uses
3420+        # the offset of the share hash chain as an end boundary) a
3421+        # shorter tree than they know to read, which will result in them
3422+        # reading junk. There is little reason to support it as a use
3423+        # case, so we should disallow it altogether.
3424+        d.addCallback(lambda ignored:
3425+            self.shouldFail(LayoutInvalid, "test same blockhashes",
3426+                            None,
3427+                            mw.put_blockhashes, self.block_hash_tree))
3428+        return d
3429+
3430+
3431+    def test_salt_hash_tree_after_share_hash_chain(self):
3432+        mw = self._make_new_mw("si1", 0)
3433+        d = defer.succeed(None)
3434+        # Put everything up to and including the share hash chain
3435+        for i in xrange(6):
3436+            d.addCallback(lambda ignored, i=i:
3437+                mw.put_block(self.block, i, self.salt))
3438+        d.addCallback(lambda ignored:
3439+            mw.put_encprivkey(self.encprivkey))
3440+        d.addCallback(lambda ignored:
3441+            mw.put_blockhashes(self.block_hash_tree))
3442+        d.addCallback(lambda ignored:
3443+            mw.put_salthashes(self.salt_hash_tree))
3444+        d.addCallback(lambda ignored:
3445+            mw.put_sharehashes(self.share_hash_chain))
3446+
3447+        # Now try to put the salt hash tree again. This should fail for
3448+        # the same reason that it fails in the previous test.
3449+        d.addCallback(lambda ignored:
3450+            self.shouldFail(LayoutInvalid, "test repeat salthashes",
3451+                            None,
3452+                            mw.put_salthashes, self.salt_hash_tree))
3453+        return d
3454+
3455+
3456+    def test_encprivkey_after_blockhashes(self):
3457+        mw = self._make_new_mw("si1", 0)
3458+        d = defer.succeed(None)
3459+        # Put everything up to and including the block hash tree
3460+        for i in xrange(6):
3461+            d.addCallback(lambda ignored, i=i:
3462+                mw.put_block(self.block, i, self.salt))
3463+        d.addCallback(lambda ignored:
3464+            mw.put_encprivkey(self.encprivkey))
3465+        d.addCallback(lambda ignored:
3466+            mw.put_blockhashes(self.block_hash_tree))
3467+        d.addCallback(lambda ignored:
3468+            self.shouldFail(LayoutInvalid, "out of order private key",
3469+                            None,
3470+                            mw.put_encprivkey, self.encprivkey))
3471+        return d
3472+
3473+
3474+    def test_share_hash_chain_after_signature(self):
3475+        mw = self._make_new_mw("si1", 0)
3476+        d = defer.succeed(None)
3477+        # Put everything up to and including the signature
3478+        for i in xrange(6):
3479+            d.addCallback(lambda ignored, i=i:
3480+                mw.put_block(self.block, i, self.salt))
3481+        d.addCallback(lambda ignored:
3482+            mw.put_encprivkey(self.encprivkey))
3483+        d.addCallback(lambda ignored:
3484+            mw.put_blockhashes(self.block_hash_tree))
3485+        d.addCallback(lambda ignored:
3486+            mw.put_salthashes(self.salt_hash_tree))
3487+        d.addCallback(lambda ignored:
3488+            mw.put_sharehashes(self.share_hash_chain))
3489+        d.addCallback(lambda ignored:
3490+            mw.put_root_hash(self.root_hash))
3491+        d.addCallback(lambda ignored:
3492+            mw.put_signature(self.signature))
3493+        # Now try to put the share hash chain again. This should fail
3494+        d.addCallback(lambda ignored:
3495+            self.shouldFail(LayoutInvalid, "out of order share hash chain",
3496+                            None,
3497+                            mw.put_sharehashes, self.share_hash_chain))
3498+        return d
3499+
3500+
3501+    def test_signature_after_verification_key(self):
3502+        mw = self._make_new_mw("si1", 0)
3503+        d = defer.succeed(None)
3504+        # Put everything up to and including the verification key.
3505+        for i in xrange(6):
3506+            d.addCallback(lambda ignored, i=i:
3507+                mw.put_block(self.block, i, self.salt))
3508+        d.addCallback(lambda ignored:
3509+            mw.put_encprivkey(self.encprivkey))
3510+        d.addCallback(lambda ignored:
3511+            mw.put_blockhashes(self.block_hash_tree))
3512+        d.addCallback(lambda ignored:
3513+            mw.put_salthashes(self.salt_hash_tree))
3514+        d.addCallback(lambda ignored:
3515+            mw.put_sharehashes(self.share_hash_chain))
3516+        d.addCallback(lambda ignored:
3517+            mw.put_root_hash(self.root_hash))
3518+        d.addCallback(lambda ignored:
3519+            mw.put_signature(self.signature))
3520+        d.addCallback(lambda ignored:
3521+            mw.put_verification_key(self.verification_key))
3522+        # Now try to put the signature again. This should fail
3523+        d.addCallback(lambda ignored:
3524+            self.shouldFail(LayoutInvalid, "signature after verification",
3525+                            None,
3526+                            mw.put_signature, self.signature))
3527+        return d
3528+
3529+
3530+    def test_uncoordinated_write(self):
3531+        # Make two mutable writers, both pointing to the same storage
3532+        # server, both at the same storage index, and try writing to the
3533+        # same share.
3534+        mw1 = self._make_new_mw("si1", 0)
3535+        mw2 = self._make_new_mw("si1", 0)
3536+        d = defer.succeed(None)
3537+        def _check_success(results):
3538+            result, readvs = results
3539+            self.failUnless(result)
3540+
3541+        def _check_failure(results):
3542+            result, readvs = results
3543+            self.failIf(result)
3544+
3545+        d.addCallback(lambda ignored:
3546+            mw1.put_block(self.block, 0, self.salt))
3547+        d.addCallback(_check_success)
3548+        d.addCallback(lambda ignored:
3549+            mw2.put_block(self.block, 0, self.salt))
3550+        d.addCallback(_check_failure)
3551+        return d
3552+
3553+
3554+    def test_invalid_salt_size(self):
3555+        # Salts need to be 16 bytes in size. Writes that attempt to
3556+        # write more or less than this should be rejected.
3557+        mw = self._make_new_mw("si1", 0)
3558+        invalid_salt = "a" * 17 # 17 bytes
3559+        another_invalid_salt = "b" * 15 # 15 bytes
3560+        d = defer.succeed(None)
3561+        d.addCallback(lambda ignored:
3562+            self.shouldFail(LayoutInvalid, "salt too big",
3563+                            None,
3564+                            mw.put_block, self.block, 0, invalid_salt))
3565+        d.addCallback(lambda ignored:
3566+            self.shouldFail(LayoutInvalid, "salt too small",
3567+                            None,
3568+                            mw.put_block, self.block, 0,
3569+                            another_invalid_salt))
3570+        return d
3571+
3572+
3573+    def test_write_test_vectors(self):
3574+        # If we give the write proxy a bogus test vector at
3575+        # any point during the process, it should fail to write.
3576+        mw = self._make_new_mw("si1", 0)
3577+        mw.set_checkstring("this is a lie")
3578+        # The initial write should be expecting to find the improbable
3579+        # checkstring above in place; finding nothing, it should fail.
3580+        d = defer.succeed(None)
3581+        d.addCallback(lambda ignored:
3582+            mw.put_block(self.block, 0, self.salt))
3583+        def _check_failure(results):
3584+            result, readv = results
3585+            self.failIf(result)
3586+        d.addCallback(_check_failure)
3587+        # Now set the checkstring to the empty string, which
3588+        # indicates that no share is there.
3589+        d.addCallback(lambda ignored:
3590+            mw.set_checkstring(""))
3591+        d.addCallback(lambda ignored:
3592+            mw.put_block(self.block, 0, self.salt))
3593+        def _check_success(results):
3594+            result, readv = results
3595+            self.failUnless(result)
3596+        d.addCallback(_check_success)
3597+        # Now set the checkstring to something wrong
3598+        d.addCallback(lambda ignored:
3599+            mw.set_checkstring("something wrong"))
3600+        # This should fail to do anything
3601+        d.addCallback(lambda ignored:
3602+            mw.put_block(self.block, 1, self.salt))
3603+        d.addCallback(_check_failure)
3604+        # Now set it back to what it should be.
3605+        d.addCallback(lambda ignored:
3606+            mw.set_checkstring(mw.get_checkstring()))
3607+        for i in xrange(1, 6):
3608+            d.addCallback(lambda ignored, i=i:
3609+                mw.put_block(self.block, i, self.salt))
3610+            d.addCallback(_check_success)
3611+        d.addCallback(lambda ignored:
3612+            mw.put_encprivkey(self.encprivkey))
3613+        d.addCallback(_check_success)
3614+        d.addCallback(lambda ignored:
3615+            mw.put_blockhashes(self.block_hash_tree))
3616+        d.addCallback(_check_success)
3617+        d.addCallback(lambda ignored:
3618+            mw.put_salthashes(self.salt_hash_tree))
3619+        d.addCallback(_check_success)
3620+        d.addCallback(lambda ignored:
3621+            mw.put_sharehashes(self.share_hash_chain))
3622+        d.addCallback(_check_success)
3623+        def _keep_old_checkstring(ignored):
3624+            self.old_checkstring = mw.get_checkstring()
3625+            mw.set_checkstring("foobarbaz")
3626+        d.addCallback(_keep_old_checkstring)
3627+        d.addCallback(lambda ignored:
3628+            mw.put_root_hash(self.root_hash))
3629+        d.addCallback(_check_failure)
3630+        d.addCallback(lambda ignored:
3631+            self.failUnlessEqual(self.old_checkstring, mw.get_checkstring()))
3632+        def _restore_old_checkstring(ignored):
3633+            mw.set_checkstring(self.old_checkstring)
3634+        d.addCallback(_restore_old_checkstring)
3635+        d.addCallback(lambda ignored:
3636+            mw.put_root_hash(self.root_hash))
3637+        d.addCallback(_check_success)
3638+        # The checkstring should have been set appropriately for us on
3639+        # the last write; if we try to change it to something else,
3640+        # that change should cause the verification key step to fail.
3641+        d.addCallback(lambda ignored:
3642+            mw.set_checkstring("something else"))
3643+        d.addCallback(lambda ignored:
3644+            mw.put_signature(self.signature))
3645+        d.addCallback(_check_failure)
3646+        d.addCallback(lambda ignored:
3647+            mw.set_checkstring(mw.get_checkstring()))
3648+        d.addCallback(lambda ignored:
3649+            mw.put_signature(self.signature))
3650+        d.addCallback(_check_success)
3651+        d.addCallback(lambda ignored:
3652+            mw.put_verification_key(self.verification_key))
3653+        d.addCallback(_check_success)
3654+        return d
3655+
3656+
3657+    def test_offset_only_set_on_success(self):
3658+        # The write proxy should be smart enough to detect when a write
3659+        # has failed, and to temper its definition of progress based on
3660+        # that.
3661+        mw = self._make_new_mw("si1", 0)
3662+        d = defer.succeed(None)
3663+        for i in xrange(1, 6):
3664+            d.addCallback(lambda ignored, i=i:
3665+                mw.put_block(self.block, i, self.salt))
3666+        def _break_checkstring(ignored):
3667+            self._old_checkstring = mw.get_checkstring()
3668+            mw.set_checkstring("foobarbaz")
3669+
3670+        def _fix_checkstring(ignored):
3671+            mw.set_checkstring(self._old_checkstring)
3672+
3673+        d.addCallback(_break_checkstring)
3674+
3675+        # Setting the encrypted private key shouldn't work now, which is
3676+        # to be expected and is tested elsewhere. We also want to make
3677+        # sure that we can't add the block hash tree after a failed
3678+        # write of this sort.
3679+        d.addCallback(lambda ignored:
3680+            mw.put_encprivkey(self.encprivkey))
3681+        d.addCallback(lambda ignored:
3682+            self.shouldFail(LayoutInvalid, "test out-of-order blockhashes",
3683+                            None,
3684+                            mw.put_blockhashes, self.block_hash_tree))
3685+        d.addCallback(_fix_checkstring)
3686+        d.addCallback(lambda ignored:
3687+            mw.put_encprivkey(self.encprivkey))
3688+        d.addCallback(_break_checkstring)
3689+        d.addCallback(lambda ignored:
3690+            mw.put_blockhashes(self.block_hash_tree))
3691+        d.addCallback(lambda ignored:
3692+            self.shouldFail(LayoutInvalid, "test out-of-order sharehashes",
3693+                            None,
3694+                            mw.put_sharehashes, self.share_hash_chain))
3695+        d.addCallback(_fix_checkstring)
3696+        d.addCallback(lambda ignored:
3697+            mw.put_blockhashes(self.block_hash_tree))
3698+        d.addCallback(lambda ignored:
3699+            mw.put_salthashes(self.salt_hash_tree))
3700+        d.addCallback(_break_checkstring)
3701+        d.addCallback(lambda ignored:
3702+            mw.put_sharehashes(self.share_hash_chain))
3703+        d.addCallback(lambda ignored:
3704+            self.shouldFail(LayoutInvalid, "out-of-order root hash",
3705+                            None,
3706+                            mw.put_root_hash, self.root_hash))
3707+        d.addCallback(_fix_checkstring)
3708+        d.addCallback(lambda ignored:
3709+            mw.put_sharehashes(self.share_hash_chain))
3710+        d.addCallback(_break_checkstring)
3711+        d.addCallback(lambda ignored:
3712+            mw.put_root_hash(self.root_hash))
3713+        d.addCallback(lambda ignored:
3714+            self.shouldFail(LayoutInvalid, "out-of-order signature",
3715+                            None,
3716+                            mw.put_signature, self.signature))
3717+        d.addCallback(_fix_checkstring)
3718+        d.addCallback(lambda ignored:
3719+            mw.put_root_hash(self.root_hash))
3720+        d.addCallback(_break_checkstring)
3721+        d.addCallback(lambda ignored:
3722+            mw.put_signature(self.signature))
3723+        d.addCallback(lambda ignored:
3724+            self.shouldFail(LayoutInvalid, "out-of-order verification key",
3725+                            None,
3726+                            mw.put_verification_key,
3727+                            self.verification_key))
3728+        d.addCallback(_fix_checkstring)
3729+        d.addCallback(lambda ignored:
3730+            mw.put_signature(self.signature))
3731+        d.addCallback(_break_checkstring)
3732+        d.addCallback(lambda ignored:
3733+            mw.put_verification_key(self.verification_key))
3734+        d.addCallback(lambda ignored:
3735+            self.shouldFail(LayoutInvalid, "out-of-order finish",
3736+                            None,
3737+                            mw.finish_publishing))
3738+        return d
3739+
3740+
3741+    def serialize_blockhashes(self, blockhashes):
3742+        return "".join(blockhashes)
3743+
3744+
3745+    def serialize_sharehashes(self, sharehashes):
3746+        ret = "".join([struct.pack(">H32s", i, sharehashes[i])
3747+                        for i in sorted(sharehashes.keys())])
3748+        return ret
3749+
3750+
3751+    def test_write(self):
3752+        # This translates to a file with 6 6-byte segments, and with 2-byte
3753+        # blocks.
3754+        mw = self._make_new_mw("si1", 0)
3755+        mw2 = self._make_new_mw("si1", 1)
3756+        # Test writing some blocks.
3757+        read = self.ss.remote_slot_readv
3758+        expected_salt_offset = struct.calcsize(MDMFHEADER)
3759+        expected_share_offset = expected_salt_offset + (16 * 6)
3760+        def _check_block_write(i, share):
3761+            self.failUnlessEqual(read("si1", [share], [(expected_share_offset + (i * 2), 2)]),
3762+                                {share: [self.block]})
3763+            self.failUnlessEqual(read("si1", [share], [(expected_salt_offset + (i * 16), 16)]),
3764+                                 {share: [self.salt]})
3765+        d = defer.succeed(None)
3766+        for i in xrange(6):
3767+            d.addCallback(lambda ignored, i=i:
3768+                mw.put_block(self.block, i, self.salt))
3769+            d.addCallback(lambda ignored, i=i:
3770+                _check_block_write(i, 0))
3771+        # Now try the same thing, but with share 1 instead of share 0.
3772+        for i in xrange(6):
3773+            d.addCallback(lambda ignored, i=i:
3774+                mw2.put_block(self.block, i, self.salt))
3775+            d.addCallback(lambda ignored, i=i:
3776+                _check_block_write(i, 1))
3777+
3778+        # Next, we make a fake encrypted private key, and put it onto the
3779+        # storage server.
3780+        d.addCallback(lambda ignored:
3781+            mw.put_encprivkey(self.encprivkey))
3782+        # So far, we have:
3783+        #  header:  143 bytes
3784+        #  salts:   16 * 6 = 96 bytes
3785+        #  blocks:  2 * 6 = 12 bytes
3786+        #   = 251 bytes
3787+        expected_private_key_offset = expected_share_offset + len(self.block) * 6
3788+        self.failUnlessEqual(len(self.encprivkey), 7)
3789+        d.addCallback(lambda ignored:
3790+            self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
3791+                                 {0: [self.encprivkey]}))
3792+
3793+        # Next, we put a fake block hash tree.
3794+        d.addCallback(lambda ignored:
3795+            mw.put_blockhashes(self.block_hash_tree))
3796+        # The block hash tree got inserted at:
3797+        #  header + salts + blocks: 251 bytes
3798+        #  encrypted private key:   7 bytes
3799+        #       = 258 bytes
3800+        expected_block_hash_offset = expected_private_key_offset + len(self.encprivkey)
3801+        self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
3802+        d.addCallback(lambda ignored:
3803+            self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
3804+                                 {0: [self.block_hash_tree_s]}))
3805+
3806+        # Next, we put a fake salt hash tree.
3807+        d.addCallback(lambda ignored:
3808+            mw.put_salthashes(self.salt_hash_tree))
3809+        # The salt hash tree got inserted at
3810+        # header + salts + blocks + private key = 258 bytes
3811+        # block hash tree:          32 * 6 = 192 bytes
3812+        #   = 450 bytes
3813+        expected_salt_hash_offset = expected_block_hash_offset + len(self.block_hash_tree_s)
3814+        d.addCallback(lambda ignored:
3815+            self.failUnlessEqual(read("si1", [0], [(expected_salt_hash_offset, 32 * 5)]), {0: [self.salt_hash_tree_s]}))
3816+
3817+        # Next, put a fake share hash chain
3818+        d.addCallback(lambda ignored:
3819+            mw.put_sharehashes(self.share_hash_chain))
3820+        # The share hash chain got inserted at:
3821+        # header + salts + blocks + private key = 258 bytes
3822+        # block hash tree:                        32 * 6 = 192 bytes
3823+        # salt hash tree:                         32 * 5 = 160 bytes
3824+        #   = 610
3825+        expected_share_hash_offset = expected_salt_hash_offset + len(self.salt_hash_tree_s)
3826+        d.addCallback(lambda ignored:
3827+            self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
3828+                                 {0: [self.share_hash_chain_s]}))
3829+
3830+        # Next, we put what is supposed to be the root hash of
3831+        # our share hash tree (but isn't).
3832+        d.addCallback(lambda ignored:
3833+            mw.put_root_hash(self.root_hash))
3834+        # The root hash gets inserted at byte 9 (its position is in the header,
3835+        # and is fixed). The salt hash is right after it.
3836+        def _check(ignored):
3837+            self.failUnlessEqual(read("si1", [0], [(9, 32)]),
3838+                                 {0: [self.root_hash]})
3839+            self.failUnlessEqual(read("si1", [0], [(41, 32)]),
3840+                                 {0: [self.salt_hash]})
3841+        d.addCallback(_check)
3842+
3843+        # Next, we put a signature of the header block.
3844+        d.addCallback(lambda ignored:
3845+            mw.put_signature(self.signature))
3846+        # The signature gets written to:
3847+        #   header + salts + blocks + private key + block, salt, and share hashes = 814
3848+        expected_signature_offset = expected_share_hash_offset + len(self.share_hash_chain_s)
3849+        self.failUnlessEqual(len(self.signature), 9)
3850+        d.addCallback(lambda ignored:
3851+            self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
3852+                                 {0: [self.signature]}))
3853+
3854+        # Next, we put the verification key
3855+        d.addCallback(lambda ignored:
3856+            mw.put_verification_key(self.verification_key))
3857+        # The verification key gets written to:
3858+        #   814 + 9 = 823 bytes
3859+        expected_verification_key_offset = expected_signature_offset + len(self.signature)
3860+        self.failUnlessEqual(len(self.verification_key), 6)
3861+        d.addCallback(lambda ignored:
3862+            self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
3863+                                 {0: [self.verification_key]}))
3864+
3865+        def _check_signable(ignored):
3866+            # Make sure that the signable is what we think it should be.
3867+            signable = mw.get_signable()
3868+            verno, seq, roothash, salthash, k, n, segsize, datalen = \
3869+                                            struct.unpack(">BQ32s32sBBQQ",
3870+                                                          signable)
3871+            self.failUnlessEqual(verno, 1)
3872+            self.failUnlessEqual(seq, 0)
3873+            self.failUnlessEqual(roothash, self.root_hash)
3874+            self.failUnlessEqual(salthash, self.salt_hash)
3875+            self.failUnlessEqual(k, 3)
3876+            self.failUnlessEqual(n, 10)
3877+            self.failUnlessEqual(segsize, 6)
3878+            self.failUnlessEqual(datalen, 36)
3879+        d.addCallback(_check_signable)
3880+        # Next, we cause the offset table to be published.
3881+        d.addCallback(lambda ignored:
3882+            mw.finish_publishing())
3883+        expected_eof_offset = expected_verification_key_offset + len(self.verification_key)
3884+
3885+        # The offset table starts at byte 91. Happily, we have already
3886+        # worked out most of these offsets above, but we want to make
3887+        # sure that the representation on disk agrees what what we've
3888+        # calculated.
3889+        #
3890+        # (we don't have an explicit offset for the AES salts, because
3891+        # we know that they start right after the header)
3892+        def _check_offsets(ignored):
3893+            # Check the version number to make sure that it is correct.
3894+            expected_version_number = struct.pack(">B", 1)
3895+            self.failUnlessEqual(read("si1", [0], [(0, 1)]),
3896+                                 {0: [expected_version_number]})
3897+            # Check the sequence number to make sure that it is correct
3898+            expected_sequence_number = struct.pack(">Q", 0)
3899+            self.failUnlessEqual(read("si1", [0], [(1, 8)]),
3900+                                 {0: [expected_sequence_number]})
3901+            # Check that the encoding parameters (k, N, segment size, data
3902+            # length) are what they should be. These are 3, 10, 6, and 36.
3903+            expected_k = struct.pack(">B", 3)
3904+            self.failUnlessEqual(read("si1", [0], [(73, 1)]),
3905+                                 {0: [expected_k]})
3906+            expected_n = struct.pack(">B", 10)
3907+            self.failUnlessEqual(read("si1", [0], [(74, 1)]),
3908+                                 {0: [expected_n]})
3909+            expected_segment_size = struct.pack(">Q", 6)
3910+            self.failUnlessEqual(read("si1", [0], [(75, 8)]),
3911+                                 {0: [expected_segment_size]})
3912+            expected_data_length = struct.pack(">Q", 36)
3913+            self.failUnlessEqual(read("si1", [0], [(83, 8)]),
3914+                                 {0: [expected_data_length]})
3915+            # 91          4           The offset of the share data
3916+            expected_offset = struct.pack(">L", expected_share_offset)
3917+            self.failUnlessEqual(read("si1", [0], [(91, 4)]),
3918+                                 {0: [expected_offset]})
3919+            # 95          8           The offset of the encrypted private key
3920+            expected_offset = struct.pack(">Q", expected_private_key_offset)
3921+            self.failUnlessEqual(read("si1", [0], [(95, 8)]),
3922+                                 {0: [expected_offset]})
3923+            # 103         8           The offset of the block hash tree
3924+            expected_offset = struct.pack(">Q", expected_block_hash_offset)
3925+            self.failUnlessEqual(read("si1", [0], [(103, 8)]),
3926+                                 {0: [expected_offset]})
3927+            # 111         8           The offset of the salt hash tree
3928+            expected_offset = struct.pack(">Q", expected_salt_hash_offset)
3929+            self.failUnlessEqual(read("si1", [0], [(111, 8)]),
3930+                                 {0: [expected_offset]})
3931+            # 119         8           The offset of the share hash chain
3932+            expected_offset = struct.pack(">Q", expected_share_hash_offset)
3933+            self.failUnlessEqual(read("si1", [0], [(119, 8)]),
3934+                                 {0: [expected_offset]})
3935+            # 127         8           The offset of the signature
3936+            expected_offset = struct.pack(">Q", expected_signature_offset)
3937+            self.failUnlessEqual(read("si1", [0], [(127, 8)]),
3938+                                 {0: [expected_offset]})
3939+            # 135         8           The offset of the verification key
3940+            expected_offset = struct.pack(">Q", expected_verification_key_offset)
3941+            self.failUnlessEqual(read("si1", [0], [(135, 8)]),
3942+                                 {0: [expected_offset]})
3943+            # 143         8           The offset of the EOF
3944+            expected_offset = struct.pack(">Q", expected_eof_offset)
3945+            self.failUnlessEqual(read("si1", [0], [(143, 8)]),
3946+                                 {0: [expected_offset]})
3947+        d.addCallback(_check_offsets)
3948+        return d
3949+
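# A minimal sketch recomputing the offsets asserted in test_write above,
# using the field sizes stated in its comments (the 143-byte MDMFHEADER
# size and the fake field lengths are assumptions taken from those
# comments, not measured here):
header            = 143            # struct.calcsize(MDMFHEADER), per the comments
salts             = 16 * 6         # six 16-byte salts
blocks            = 2 * 6          # six 2-byte blocks
encprivkey        = 7              # fake encrypted private key
block_hash_tree   = 32 * 6
salt_hash_tree    = 32 * 5
share_hash_chain  = (32 + 2) * 6
signature         = 9
verification_key  = 6

share_offset            = header + salts                              # 239
privkey_offset          = share_offset + blocks                       # 251
block_hash_offset       = privkey_offset + encprivkey                 # 258
salt_hash_offset        = block_hash_offset + block_hash_tree         # 450
share_hash_offset       = salt_hash_offset + salt_hash_tree           # 610
signature_offset        = share_hash_offset + share_hash_chain        # 814
verification_key_offset = signature_offset + signature                # 823
eof_offset              = verification_key_offset + verification_key  # 829
assert (signature_offset, verification_key_offset) == (814, 823)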
3950+    def _make_new_mw(self, si, share, datalength=36):
3951+        # This is a file of size 36 bytes. Since it has a segment
3952+        # size of 6, we know that it has six 6-byte segments, which
3953+        # will be split into blocks of 2 bytes because our FEC k
3954+        # parameter is 3.
3955+        mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
3956+                                6, datalength)
3957+        return mw
3958+
3959+
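# A quick sketch of the segment/block arithmetic that _make_new_mw's
# comment describes (datalength, segsize, k and n are the values used
# throughout these tests; this just restates the comment as arithmetic):
datalength, segsize, k, n = 36, 6, 3, 10
num_segments = (datalength + segsize - 1) // segsize   # ceil(36/6) = 6 segments
block_size = segsize // k                               # 6/3 = 2-byte blocks
assert (num_segments, block_size) == (6, 2)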
3960+    def test_write_rejected_with_too_many_blocks(self):
3961+        mw = self._make_new_mw("si0", 0)
3962+
3963+        # Try writing too many blocks. We should not be able to write
3964+        # more than 6 blocks into each share, so the seventh put_block
3965+        # below should fail.
3966+        d = defer.succeed(None)
3967+        for i in xrange(6):
3968+            d.addCallback(lambda ignored, i=i:
3969+                mw.put_block(self.block, i, self.salt))
3970+        d.addCallback(lambda ignored:
3971+            self.shouldFail(LayoutInvalid, "too many blocks",
3972+                            None,
3973+                            mw.put_block, self.block, 7, self.salt))
3974+        return d
3975+
3976+
3977+    def test_write_rejected_with_invalid_salt(self):
3978+        # Try writing an invalid salt. Salts are 16 bytes -- any more or
3979+        # less should cause an error.
3980+        mw = self._make_new_mw("si1", 0)
3981+        bad_salt = "a" * 17 # 17 bytes
3982+        d = defer.succeed(None)
3983+        d.addCallback(lambda ignored:
3984+            self.shouldFail(LayoutInvalid, "test_invalid_salt",
3985+                            None, mw.put_block, self.block, 7, bad_salt))
3986+        return d
3987+
3988+
3989+    def test_write_rejected_with_invalid_root_hash(self):
3990+        # Try writing an invalid root hash. This should be SHA256d, and
3991+        # 32 bytes long as a result.
3992+        mw = self._make_new_mw("si2", 0)
3993+        # 17 bytes != 32 bytes
3994+        invalid_root_hash = "a" * 17
3995+        d = defer.succeed(None)
3996+        # Before this test can work, we need to put some blocks + salts,
3997+        # the encrypted private key, the block and salt hash trees, and the
3998+        # share hash chain. Otherwise, we'll see failures that match what we
3999+        # are looking for, but are caused by the ordering constraints.
4000+        for i in xrange(6):
4001+            d.addCallback(lambda ignored, i=i:
4002+                mw.put_block(self.block, i, self.salt))
4003+        d.addCallback(lambda ignored:
4004+            mw.put_encprivkey(self.encprivkey))
4005+        d.addCallback(lambda ignored:
4006+            mw.put_blockhashes(self.block_hash_tree))
4007+        d.addCallback(lambda ignored:
4008+            mw.put_salthashes(self.salt_hash_tree))
4009+        d.addCallback(lambda ignored:
4010+            mw.put_sharehashes(self.share_hash_chain))
4011+        d.addCallback(lambda ignored:
4012+            self.shouldFail(LayoutInvalid, "invalid root hash",
4013+                            None, mw.put_root_hash, invalid_root_hash))
4014+        return d
4015+
4016+
4017+    def test_write_rejected_with_invalid_blocksize(self):
4018+        # The blocksize implied by the writer that we get from
4019+        # _make_new_mw is 2 bytes -- any more or any less than this
4020+        # should cause a failure, unless it is the tail segment, in
4021+        # which case a shorter block may be valid.
4022+        invalid_block = "a"
4023+        mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
4024+                                             # one byte blocks
4025+        # 1 byte != 2 bytes
4026+        d = defer.succeed(None)
4027+        d.addCallback(lambda ignored, invalid_block=invalid_block:
4028+            self.shouldFail(LayoutInvalid, "test blocksize too small",
4029+                            None, mw.put_block, invalid_block, 0,
4030+                            self.salt))
4031+        invalid_block = invalid_block * 3
4032+        # 3 bytes != 2 bytes
4033+        d.addCallback(lambda ignored:
4034+            self.shouldFail(LayoutInvalid, "test blocksize too large",
4035+                            None,
4036+                            mw.put_block, invalid_block, 0, self.salt))
4037+        for i in xrange(5):
4038+            d.addCallback(lambda ignored, i=i:
4039+                mw.put_block(self.block, i, self.salt))
4040+        # Try to put an invalid tail segment
4041+        d.addCallback(lambda ignored:
4042+            self.shouldFail(LayoutInvalid, "test invalid tail segment",
4043+                            None,
4044+                            mw.put_block, self.block, 5, self.salt))
4045+        valid_block = "a"
4046+        d.addCallback(lambda ignored:
4047+            mw.put_block(valid_block, 5, self.salt))
4048+        return d
4049+
4050+
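# A sketch of why the 33-byte file above gives a 1-byte tail block
# (assuming the tail segment is padded up to a multiple of k, as the
# downloader's _setup_encoding_parameters does further below):
datalength, segsize, k = 33, 6, 3
tail_data = datalength % segsize                 # 3 bytes of data in the tail
tail_segment = ((tail_data + k - 1) // k) * k    # round up to a multiple of k -> 3
tail_block = tail_segment // k                   # 1-byte blocks in the tail
assert (tail_data, tail_segment, tail_block) == (3, 3, 1)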
4051+    def test_write_enforces_order_constraints(self):
4052+        # We require that the MDMFSlotWriteProxy be interacted with in a
4053+        # specific way.
4054+        # That way is:
4055+        # 0: __init__
4056+        # 1: Write blocks and salts
4057+        # 2: Write the encrypted private key
4058+        # 3: Write the block hashes and salt hashes
4059+        # 4: Write the share hashes
4060+        # 5: Write the root hash and salt hash
4061+        # 6: Write the signature and verification key
4062+        # 7: Finish publishing the share (write the offset table).
4063+        #
4064+        # Some of these can be performed out-of-order, and some can't.
4065+        # The dependencies that I want to test here are:
4066+        #  - Private key before block hashes
4067+        #  - share hashes and block hashes before root hash
4068+        #  - root hash before signature
4069+        #  - signature before verification key
4070+        mw0 = self._make_new_mw("si0", 0)
4071+        # Write some shares
4072+        d = defer.succeed(None)
4073+        for i in xrange(6):
4074+            d.addCallback(lambda ignored, i=i:
4075+                mw0.put_block(self.block, i, self.salt))
4076+        # Try to write the block hashes before writing the encrypted
4077+        # private key
4078+        d.addCallback(lambda ignored:
4079+            self.shouldFail(LayoutInvalid, "block hashes before key",
4080+                            None, mw0.put_blockhashes,
4081+                            self.block_hash_tree))
4082+
4083+        # Write the private key.
4084+        d.addCallback(lambda ignored:
4085+            mw0.put_encprivkey(self.encprivkey))
4086+
4087+
4088+        # Try to write the salt hash tree without writing the block hash
4089+        # tree.
4090+        d.addCallback(lambda ignored:
4091+            self.shouldFail(LayoutInvalid, "salt hash tree before bht",
4092+                            None,
4093+                            mw0.put_salthashes, self.salt_hash_tree))
4094+
4095+
4096+        # Try to write the share hash chain without writing the salt
4097+        # hash tree first.
4098+        d.addCallback(lambda ignored:
4099+            self.shouldFail(LayoutInvalid, "share hash chain before "
4100+                                           "salt hash tree",
4101+                            None,
4102+                            mw0.put_sharehashes, self.share_hash_chain))
4103+
4104+        # Try to write the root hash and salt hash without writing any of
4105+        # the block hashes, the salt hashes, or the share hashes
4106+        d.addCallback(lambda ignored:
4107+            self.shouldFail(LayoutInvalid, "root hash before share hashes",
4108+                            None,
4109+                            mw0.put_root_hash, self.root_hash))
4110+
4111+        # Now write the block hashes and try again
4112+        d.addCallback(lambda ignored:
4113+            mw0.put_blockhashes(self.block_hash_tree))
4114+
4115+        d.addCallback(lambda ignored:
4116+            self.shouldFail(LayoutInvalid, "share hash before salt hashes",
4117+                            None,
4118+                            mw0.put_sharehashes, self.share_hash_chain))
4119+        d.addCallback(lambda ignored:
4120+            self.shouldFail(LayoutInvalid, "root hash before share hashes",
4121+                            None, mw0.put_root_hash, self.root_hash))
4122+
4123+        # We haven't yet put the root hash on the share, so we shouldn't
4124+        # be able to sign it.
4125+        d.addCallback(lambda ignored:
4126+            self.shouldFail(LayoutInvalid, "signature before root hash",
4127+                            None, mw0.put_signature, self.signature))
4128+
4129+        d.addCallback(lambda ignored:
4130+            self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
4131+
4132+        # ...and, since that fails, we also shouldn't be able to put the
4133+        # verification key.
4134+        d.addCallback(lambda ignored:
4135+            self.shouldFail(LayoutInvalid, "key before signature",
4136+                            None, mw0.put_verification_key,
4137+                            self.verification_key))
4138+
4139+        # Now write the salt hashes, and try again.
4140+        d.addCallback(lambda ignored:
4141+            mw0.put_salthashes(self.salt_hash_tree))
4142+
4143+        d.addCallback(lambda ignored:
4144+            self.shouldFail(LayoutInvalid, "root hash before share hashes",
4145+                            None,
4146+                            mw0.put_root_hash, self.root_hash))
4147+
4148+        # We should still be unable to sign the header
4149+        d.addCallback(lambda ignored:
4150+            self.shouldFail(LayoutInvalid, "signature before hashes",
4151+                            None,
4152+                            mw0.put_signature, self.signature))
4153+
4154+        # Now write the share hashes.
4155+        d.addCallback(lambda ignored:
4156+            mw0.put_sharehashes(self.share_hash_chain))
4157+        # We should be able to write the root hash now too
4158+        d.addCallback(lambda ignored:
4159+            mw0.put_root_hash(self.root_hash))
4160+
4161+        # We should still be unable to put the verification key
4162+        d.addCallback(lambda ignored:
4163+            self.shouldFail(LayoutInvalid, "key before signature",
4164+                            None, mw0.put_verification_key,
4165+                            self.verification_key))
4166+
4167+        d.addCallback(lambda ignored:
4168+            mw0.put_signature(self.signature))
4169+
4170+        # We shouldn't be able to write the offsets to the remote server
4171+        # until the offset table is finished; IOW, until we have written
4172+        # the verification key.
4173+        d.addCallback(lambda ignored:
4174+            self.shouldFail(LayoutInvalid, "offsets before verification key",
4175+                            None,
4176+                            mw0.finish_publishing))
4177+
4178+        d.addCallback(lambda ignored:
4179+            mw0.put_verification_key(self.verification_key))
4180+        return d
4181+
4182+
4183+    def test_end_to_end(self):
4184+        mw = self._make_new_mw("si1", 0)
4185+        # Write a share using the mutable writer, and make sure that the
4186+        # reader knows how to read everything back to us.
4187+        d = defer.succeed(None)
4188+        for i in xrange(6):
4189+            d.addCallback(lambda ignored, i=i:
4190+                mw.put_block(self.block, i, self.salt))
4191+        d.addCallback(lambda ignored:
4192+            mw.put_encprivkey(self.encprivkey))
4193+        d.addCallback(lambda ignored:
4194+            mw.put_blockhashes(self.block_hash_tree))
4195+        d.addCallback(lambda ignored:
4196+            mw.put_salthashes(self.salt_hash_tree))
4197+        d.addCallback(lambda ignored:
4198+            mw.put_sharehashes(self.share_hash_chain))
4199+        d.addCallback(lambda ignored:
4200+            mw.put_root_hash(self.root_hash))
4201+        d.addCallback(lambda ignored:
4202+            mw.put_signature(self.signature))
4203+        d.addCallback(lambda ignored:
4204+            mw.put_verification_key(self.verification_key))
4205+        d.addCallback(lambda ignored:
4206+            mw.finish_publishing())
4207+
4208+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
4209+        def _check_block_and_salt((block, salt)):
4210+            self.failUnlessEqual(block, self.block)
4211+            self.failUnlessEqual(salt, self.salt)
4212+
4213+        for i in xrange(6):
4214+            d.addCallback(lambda ignored, i=i:
4215+                mr.get_block_and_salt(i))
4216+            d.addCallback(_check_block_and_salt)
4217+
4218+        d.addCallback(lambda ignored:
4219+            mr.get_encprivkey())
4220+        d.addCallback(lambda encprivkey:
4221+            self.failUnlessEqual(self.encprivkey, encprivkey))
4222+
4223+        d.addCallback(lambda ignored:
4224+            mr.get_blockhashes())
4225+        d.addCallback(lambda blockhashes:
4226+            self.failUnlessEqual(self.block_hash_tree, blockhashes))
4227+
4228+        d.addCallback(lambda ignored:
4229+            mr.get_sharehashes())
4230+        d.addCallback(lambda sharehashes:
4231+            self.failUnlessEqual(self.share_hash_chain, sharehashes))
4232+
4233+        d.addCallback(lambda ignored:
4234+            mr.get_signature())
4235+        d.addCallback(lambda signature:
4236+            self.failUnlessEqual(signature, self.signature))
4237+
4238+        d.addCallback(lambda ignored:
4239+            mr.get_verification_key())
4240+        d.addCallback(lambda verification_key:
4241+            self.failUnlessEqual(verification_key, self.verification_key))
4242+
4243+        d.addCallback(lambda ignored:
4244+            mr.get_seqnum())
4245+        d.addCallback(lambda seqnum:
4246+            self.failUnlessEqual(seqnum, 0))
4247+
4248+        d.addCallback(lambda ignored:
4249+            mr.get_root_hash())
4250+        d.addCallback(lambda root_hash:
4251+            self.failUnlessEqual(self.root_hash, root_hash))
4252+
4253+        d.addCallback(lambda ignored:
4254+            mr.get_salt_hash())
4255+        d.addCallback(lambda salt_hash:
4256+            self.failUnlessEqual(self.salt_hash, salt_hash))
4257+
4258+        d.addCallback(lambda ignored:
4259+            mr.get_encoding_parameters())
4260+        def _check_encoding_parameters((k, n, segsize, datalen)):
4261+            self.failUnlessEqual(k, 3)
4262+            self.failUnlessEqual(n, 10)
4263+            self.failUnlessEqual(segsize, 6)
4264+            self.failUnlessEqual(datalen, 36)
4265+        d.addCallback(_check_encoding_parameters)
4266+
4267+        d.addCallback(lambda ignored:
4268+            mr.get_checkstring())
4269+        d.addCallback(lambda checkstring:
4270+            self.failUnlessEqual(checkstring, mw.get_checkstring()))
4271+        return d
4272+
4273+
4274+    def test_is_sdmf(self):
4275+        # The MDMFSlotReadProxy should also know how to read SDMF files,
4276+        # since it will encounter them on the grid. Callers use the
4277+        # is_sdmf method to test this.
4278+        self.write_sdmf_share_to_server("si1")
4279+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
4280+        d = mr.is_sdmf()
4281+        d.addCallback(lambda issdmf:
4282+            self.failUnless(issdmf))
4283+        return d
4284+
4285+
4286+    def test_reads_sdmf(self):
4287+        # The slot read proxy should, naturally, know how to tell us
4288+        # about data in the SDMF format
4289+        self.write_sdmf_share_to_server("si1")
4290+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
4291+        d = defer.succeed(None)
4292+        d.addCallback(lambda ignored:
4293+            mr.is_sdmf())
4294+        d.addCallback(lambda issdmf:
4295+            self.failUnless(issdmf))
4296+
4297+        # What do we need to read?
4298+        #  - The sharedata
4299+        #  - The salt
4300+        d.addCallback(lambda ignored:
4301+            mr.get_block_and_salt(0))
4302+        def _check_block_and_salt(results):
4303+            block, salt = results
4304+            # Our original file is 36 bytes long, so with k = 3 each
4305+            # share holds 12 bytes of data. The share data is composed
4306+            # entirely of the letter a; self.block contains two a's, so
4307+            # 6 * self.block is what we are looking for.
4308+            self.failUnlessEqual(block, self.block * 6)
4309+            self.failUnlessEqual(salt, self.salt)
4310+        d.addCallback(_check_block_and_salt)
4311+
4312+        #  - The blockhashes
4313+        d.addCallback(lambda ignored:
4314+            mr.get_blockhashes())
4315+        d.addCallback(lambda blockhashes:
4316+            self.failUnlessEqual(self.block_hash_tree,
4317+                                 blockhashes,
4318+                                 blockhashes))
4319+        #  - The sharehashes
4320+        d.addCallback(lambda ignored:
4321+            mr.get_sharehashes())
4322+        d.addCallback(lambda sharehashes:
4323+            self.failUnlessEqual(self.share_hash_chain,
4324+                                 sharehashes))
4325+        #  - The keys
4326+        d.addCallback(lambda ignored:
4327+            mr.get_encprivkey())
4328+        d.addCallback(lambda encprivkey:
4329+            self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
4330+        d.addCallback(lambda ignored:
4331+            mr.get_verification_key())
4332+        d.addCallback(lambda verification_key:
4333+            self.failUnlessEqual(verification_key,
4334+                                 self.verification_key,
4335+                                 verification_key))
4336+        #  - The signature
4337+        d.addCallback(lambda ignored:
4338+            mr.get_signature())
4339+        d.addCallback(lambda signature:
4340+            self.failUnlessEqual(signature, self.signature, signature))
4341+
4342+        #  - The sequence number
4343+        d.addCallback(lambda ignored:
4344+            mr.get_seqnum())
4345+        d.addCallback(lambda seqnum:
4346+            self.failUnlessEqual(seqnum, 0, seqnum))
4347+
4348+        #  - The root hash
4349+        #  - The salt hash (to verify that it is None)
4350+        d.addCallback(lambda ignored:
4351+            mr.get_root_hash())
4352+        d.addCallback(lambda root_hash:
4353+            self.failUnlessEqual(root_hash, self.root_hash, root_hash))
4354+        d.addCallback(lambda ignored:
4355+            mr.get_salt_hash())
4356+        d.addCallback(lambda salt_hash:
4357+            self.failIf(salt_hash))
4358+        return d
4359+
4360+
4361+    def test_only_reads_one_segment_sdmf(self):
4362+        # SDMF shares have only one segment, so it doesn't make sense to
4363+        # read more segments than that. The reader should know this and
4364+        # complain if we try to do that.
4365+        self.write_sdmf_share_to_server("si1")
4366+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
4367+        d = defer.succeed(None)
4368+        d.addCallback(lambda ignored:
4369+            mr.is_sdmf())
4370+        d.addCallback(lambda issdmf:
4371+            self.failUnless(issdmf))
4372+        d.addCallback(lambda ignored:
4373+            self.shouldFail(LayoutInvalid, "test bad segment",
4374+                            None,
4375+                            mr.get_block_and_salt, 1))
4376+        return d
4377+
4378+
4379+    def test_read_with_prefetched_mdmf_data(self):
4380+        # The MDMFSlotReadProxy will prefill certain fields if you pass
4381+        # it data that you have already fetched. This is useful for
4382+        # cases like the Servermap, which prefetches ~2kb of data while
4383+        # finding out which shares are on the remote peer so that it
4384+        # doesn't waste round trips.
4385+        mdmf_data = self.build_test_mdmf_share()
4386+        self.write_test_share_to_server("si1")
4387+        def _make_mr(ignored, length):
4388+            mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
4389+            return mr
4390+
4391+        d = defer.succeed(None)
4392+        # This should be enough to fill in both the encoding parameters
4393+        # and the table of offsets, which will complete the version
4394+        # information tuple.
4395+        d.addCallback(_make_mr, 151)
4396+        d.addCallback(lambda mr:
4397+            mr.get_verinfo())
4398+        def _check_verinfo(verinfo):
4399+            self.failUnless(verinfo)
4400+            self.failUnlessEqual(len(verinfo), 9)
4401+            (seqnum,
4402+             root_hash,
4403+             salt_hash,
4404+             segsize,
4405+             datalen,
4406+             k,
4407+             n,
4408+             prefix,
4409+             offsets) = verinfo
4410+            self.failUnlessEqual(seqnum, 0)
4411+            self.failUnlessEqual(root_hash, self.root_hash)
4412+            self.failUnlessEqual(salt_hash, self.salt_hash)
4413+            self.failUnlessEqual(segsize, 6)
4414+            self.failUnlessEqual(datalen, 36)
4415+            self.failUnlessEqual(k, 3)
4416+            self.failUnlessEqual(n, 10)
4417+            expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
4418+                                          1,
4419+                                          seqnum,
4420+                                          root_hash,
4421+                                          salt_hash,
4422+                                          k,
4423+                                          n,
4424+                                          segsize,
4425+                                          datalen)
4426+            self.failUnlessEqual(expected_prefix, prefix)
4427+            self.failUnlessEqual(self.rref.read_count, 0)
4428+        d.addCallback(_check_verinfo)
4429+        # This is not enough data to read a block and its salt, so the
4430+        # wrapper should fetch them from the remote server.
4431+        d.addCallback(_make_mr, 151)
4432+        d.addCallback(lambda mr:
4433+            mr.get_block_and_salt(0))
4434+        def _check_block_and_salt((block, salt)):
4435+            self.failUnlessEqual(block, self.block)
4436+            self.failUnlessEqual(salt, self.salt)
4437+            self.failUnlessEqual(self.rref.read_count, 1)
4438+        # The file that we're playing with has 6 segments. Then there
4439+        # are 6 * 16 = 96 bytes of salts before the share data starts.
4440+        # Each block is two bytes, so 143 + 96 + 2 = 241 bytes should
4441+        # be enough to read one block (we pass a little more below).
4442+        d.addCallback(_make_mr, 249)
4443+        d.addCallback(lambda mr:
4444+            mr.get_block_and_salt(0))
4445+        d.addCallback(_check_block_and_salt)
4446+        return d
4447+
4448+
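# A sketch of why 151 bytes of prefetched data is enough for get_verinfo()
# in the MDMF case above: per the offset checks in test_write, the offset
# table starts at byte 91 and consists of one 4-byte offset followed by
# seven 8-byte offsets (this layout is an assumption taken from those
# checks, not re-derived from the writer itself):
offset_table_start = 91
offset_table_size = 4 + 7 * 8     # share data offset, then privkey, block
                                  # hashes, salt hashes, share hashes,
                                  # signature, verification key, and EOF
assert offset_table_start + offset_table_size == 151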
4449+    def test_read_with_prefetched_sdmf_data(self):
4450+        sdmf_data = self.build_test_sdmf_share()
4451+        self.write_sdmf_share_to_server("si1")
4452+        def _make_mr(ignored, length):
4453+            mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
4454+            return mr
4455+
4456+        d = defer.succeed(None)
4457+        # This should be enough to get us the encoding parameters,
4458+        # offset table, and everything else we need to build a verinfo
4459+        # string.
4460+        d.addCallback(_make_mr, 107)
4461+        d.addCallback(lambda mr:
4462+            mr.get_verinfo())
4463+        def _check_verinfo(verinfo):
4464+            self.failUnless(verinfo)
4465+            self.failUnlessEqual(len(verinfo), 9)
4466+            (seqnum,
4467+             root_hash,
4468+             salt,
4469+             segsize,
4470+             datalen,
4471+             k,
4472+             n,
4473+             prefix,
4474+             offsets) = verinfo
4475+            self.failUnlessEqual(seqnum, 0)
4476+            self.failUnlessEqual(root_hash, self.root_hash)
4477+            self.failUnlessEqual(salt, self.salt)
4478+            self.failUnlessEqual(segsize, 36)
4479+            self.failUnlessEqual(datalen, 36)
4480+            self.failUnlessEqual(k, 3)
4481+            self.failUnlessEqual(n, 10)
4482+            expected_prefix = struct.pack(SIGNED_PREFIX,
4483+                                          0,
4484+                                          seqnum,
4485+                                          root_hash,
4486+                                          salt,
4487+                                          k,
4488+                                          n,
4489+                                          segsize,
4490+                                          datalen)
4491+            self.failUnlessEqual(expected_prefix, prefix)
4492+            self.failUnlessEqual(self.rref.read_count, 0)
4493+        d.addCallback(_check_verinfo)
4494+        # This shouldn't be enough to read any share data.
4495+        d.addCallback(_make_mr, 107)
4496+        d.addCallback(lambda mr:
4497+            mr.get_block_and_salt(0))
4498+        def _check_block_and_salt((block, salt)):
4499+            self.failUnlessEqual(block, self.block * 6)
4500+            self.failUnlessEqual(salt, self.salt)
4501+            # TODO: Fix the read routine so that it reads only the data
4502+            #       that it has cached if it can't read all of it.
4503+            self.failUnlessEqual(self.rref.read_count, 2)
4504+
4505+        # This should be enough to read share data.
4506+        d.addCallback(_make_mr, self.offsets['share_data'])
4507+        d.addCallback(lambda mr:
4508+            mr.get_block_and_salt(0))
4509+        d.addCallback(_check_block_and_salt)
4510+        return d
4511+
4512+
4513+    def test_read_with_empty_mdmf_file(self):
4514+        # Some tests upload a file with no contents to test things
4515+        # unrelated to the actual handling of the content of the file.
4516+        # The reader should behave intelligently in these cases.
4517+        self.write_test_share_to_server("si1", empty=True)
4518+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
4519+        # We should be able to get the encoding parameters, and they
4520+        # should be correct.
4521+        d = defer.succeed(None)
4522+        d.addCallback(lambda ignored:
4523+            mr.get_encoding_parameters())
4524+        def _check_encoding_parameters(params):
4525+            self.failUnlessEqual(len(params), 4)
4526+            k, n, segsize, datalen = params
4527+            self.failUnlessEqual(k, 3)
4528+            self.failUnlessEqual(n, 10)
4529+            self.failUnlessEqual(segsize, 0)
4530+            self.failUnlessEqual(datalen, 0)
4531+        d.addCallback(_check_encoding_parameters)
4532+
4533+        # We should not be able to fetch a block, since there are no
4534+        # blocks to fetch
4535+        d.addCallback(lambda ignored:
4536+            self.shouldFail(LayoutInvalid, "get block on empty file",
4537+                            None,
4538+                            mr.get_block_and_salt, 0))
4539+        return d
4540+
4541+
4542+    def test_read_with_empty_sdmf_file(self):
4543+        self.write_sdmf_share_to_server("si1", empty=True)
4544+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
4545+        # We should be able to get the encoding parameters, and they
4546+        # should be correct
4547+        d = defer.succeed(None)
4548+        d.addCallback(lambda ignored:
4549+            mr.get_encoding_parameters())
4550+        def _check_encoding_parameters(params):
4551+            self.failUnlessEqual(len(params), 4)
4552+            k, n, segsize, datalen = params
4553+            self.failUnlessEqual(k, 3)
4554+            self.failUnlessEqual(n, 10)
4555+            self.failUnlessEqual(segsize, 0)
4556+            self.failUnlessEqual(datalen, 0)
4557+        d.addCallback(_check_encoding_parameters)
4558+
4559+        # It does not make sense to get a block from an empty file, so
4560+        # we should not be able to.
4561+        d.addCallback(lambda ignored:
4562+            self.shouldFail(LayoutInvalid, "get block on an empty file",
4563+                            None,
4564+                            mr.get_block_and_salt, 0))
4565+        return d
4566+
4567+
4568+    def test_verinfo_with_sdmf_file(self):
4569+        self.write_sdmf_share_to_server("si1")
4570+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
4571+        # We should be able to get the version information.
4572+        d = defer.succeed(None)
4573+        d.addCallback(lambda ignored:
4574+            mr.get_verinfo())
4575+        def _check_verinfo(verinfo):
4576+            self.failUnless(verinfo)
4577+            self.failUnlessEqual(len(verinfo), 9)
4578+            (seqnum,
4579+             root_hash,
4580+             salt,
4581+             segsize,
4582+             datalen,
4583+             k,
4584+             n,
4585+             prefix,
4586+             offsets) = verinfo
4587+            self.failUnlessEqual(seqnum, 0)
4588+            self.failUnlessEqual(root_hash, self.root_hash)
4589+            self.failUnlessEqual(salt, self.salt)
4590+            self.failUnlessEqual(segsize, 36)
4591+            self.failUnlessEqual(datalen, 36)
4592+            self.failUnlessEqual(k, 3)
4593+            self.failUnlessEqual(n, 10)
4594+            expected_prefix = struct.pack(">BQ32s16s BBQQ",
4595+                                          0,
4596+                                          seqnum,
4597+                                          root_hash,
4598+                                          salt,
4599+                                          k,
4600+                                          n,
4601+                                          segsize,
4602+                                          datalen)
4603+            self.failUnlessEqual(prefix, expected_prefix)
4604+            self.failUnlessEqual(offsets, self.offsets)
4605+        d.addCallback(_check_verinfo)
4606+        return d
4607+
4608+
4609+    def test_verinfo_with_mdmf_file(self):
4610+        self.write_test_share_to_server("si1")
4611+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
4612+        d = defer.succeed(None)
4613+        d.addCallback(lambda ignored:
4614+            mr.get_verinfo())
4615+        def _check_verinfo(verinfo):
4616+            self.failUnless(verinfo)
4617+            self.failUnlessEqual(len(verinfo), 9)
4618+            (seqnum,
4619+             root_hash,
4620+             salt_hash,
4621+             segsize,
4622+             datalen,
4623+             k,
4624+             n,
4625+             prefix,
4626+             offsets) = verinfo
4627+            self.failUnlessEqual(seqnum, 0)
4628+            self.failUnlessEqual(root_hash, self.root_hash)
4629+            self.failUnlessEqual(salt_hash, self.salt_hash)
4630+            self.failUnlessEqual(segsize, 6)
4631+            self.failUnlessEqual(datalen, 36)
4632+            self.failUnlessEqual(k, 3)
4633+            self.failUnlessEqual(n, 10)
4634+            expected_prefix = struct.pack(">BQ32s32s BBQQ",
4635+                                          1,
4636+                                          seqnum,
4637+                                          root_hash,
4638+                                          salt_hash,
4639+                                          k,
4640+                                          n,
4641+                                          segsize,
4642+                                          datalen)
4643+            self.failUnlessEqual(prefix, expected_prefix)
4644+            self.failUnlessEqual(offsets, self.offsets)
4645+        d.addCallback(_check_verinfo)
4646+        return d
4647+
4648+
4649+    def test_reader_queue(self):
4650+        self.write_test_share_to_server('si1')
4651+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
4652+        d1 = mr.get_block_and_salt(0, queue=True)
4653+        d2 = mr.get_blockhashes(queue=True)
4654+        d3 = mr.get_salthashes(queue=True)
4655+        d4 = mr.get_sharehashes(queue=True)
4656+        d5 = mr.get_signature(queue=True)
4657+        d6 = mr.get_verification_key(queue=True)
4658+        dl = defer.DeferredList([d1, d2, d3, d4, d5, d6])
4659+        mr.flush()
4660+        def _print(results):
4661+            self.failUnlessEqual(len(results), 6)
4662+            # We have one read for version information, one for offsets, and
4663+            # one for everything else.
4664+            self.failUnlessEqual(self.rref.read_count, 3)
4665+            block, salt = results[0][1] # each entry of results is a
4666+                                           # (success, value) pair from the
4667+                                           # DeferredList; [1] is the value.
4668+            self.failUnlessEqual(self.block, block)
4669+            self.failUnlessEqual(self.salt, salt)
4670+
4671+            blockhashes = results[1][1]
4672+            self.failUnlessEqual(self.block_hash_tree, blockhashes)
4673+
4674+            salthashes = results[2][1]
4675+            self.failUnlessEqual(self.salt_hash_tree[1:], salthashes)
4676+
4677+            sharehashes = results[3][1]
4678+            self.failUnlessEqual(self.share_hash_chain, sharehashes)
4679+
4680+            signature = results[4][1]
4681+            self.failUnlessEqual(self.signature, signature)
4682+
4683+            verification_key = results[5][1]
4684+            self.failUnlessEqual(self.verification_key, verification_key)
4685+        dl.addCallback(_print)
4686+        return dl
4687+
4688+
4689 class Stats(unittest.TestCase):
4690 
4691     def setUp(self):
4692}
4693[Write a segmented mutable downloader
4694Kevan Carstensen <kevan@isnotajoke.com>**20100626003355
4695 Ignore-this: 48bbcbee2b2f6afeffca23688db010e2
4696 
4697 The segmented mutable downloader can deal with MDMF files (files with
4698 one or more segments in MDMF format) and SDMF files (files with one
4699 segment in SDMF format). It is backwards compatible with the old
4700 file format.
4701 
4702 This patch also contains tests for the segmented mutable downloader.
4703] {
4704hunk ./src/allmydata/mutable/retrieve.py 9
4705 from twisted.python import failure
4706 from foolscap.api import DeadReferenceError, eventually, fireEventually
4707 from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError
4708-from allmydata.util import hashutil, idlib, log
4709+from allmydata.util import hashutil, idlib, log, mathutil
4710 from allmydata import hashtree, codec
4711 from allmydata.storage.server import si_b2a
4712 from pycryptopp.cipher.aes import AES
4713hunk ./src/allmydata/mutable/retrieve.py 16
4714 from pycryptopp.publickey import rsa
4715 
4716 from allmydata.mutable.common import DictOfSets, CorruptShareError, UncoordinatedWriteError
4717-from allmydata.mutable.layout import SIGNED_PREFIX, unpack_share_data
4718+from allmydata.mutable.layout import SIGNED_PREFIX, unpack_share_data, \
4719+                                     MDMFSlotReadProxy
4720 
4721 class RetrieveStatus:
4722     implements(IRetrieveStatus)
4723hunk ./src/allmydata/mutable/retrieve.py 103
4724         self.verinfo = verinfo
4725         # during repair, we may be called upon to grab the private key, since
4726         # it wasn't picked up during a verify=False checker run, and we'll
4727-        # need it for repair to generate the a new version.
4728+        # need it for repair to generate a new version.
4729         self._need_privkey = fetch_privkey
4730         if self._node.get_privkey():
4731             self._need_privkey = False
4732hunk ./src/allmydata/mutable/retrieve.py 108
4733 
4734+        if self._need_privkey:
4735+            # TODO: Evaluate the need for this. We'll use it if we want
4736+            # to limit how many queries are on the wire for the privkey
4737+            # at once.
4738+            self._privkey_query_markers = [] # one Marker for each time we've
4739+                                             # tried to get the privkey.
4740+
4741         self._status = RetrieveStatus()
4742         self._status.set_storage_index(self._storage_index)
4743         self._status.set_helper(False)
4744hunk ./src/allmydata/mutable/retrieve.py 124
4745          offsets_tuple) = self.verinfo
4746         self._status.set_size(datalength)
4747         self._status.set_encoding(k, N)
4748+        self.readers = {}
4749 
4750     def get_status(self):
4751         return self._status
4752hunk ./src/allmydata/mutable/retrieve.py 148
4753         self.remaining_sharemap = DictOfSets()
4754         for (shnum, peerid, timestamp) in shares:
4755             self.remaining_sharemap.add(shnum, peerid)
4756+            # If the servermap update fetched anything, it fetched at least 1
4757+            # KiB, so we ask for that much.
4758+            # TODO: Change the cache methods to allow us to fetch all of the
4759+            # data that they have, then change this method to do that.
4760+            any_cache, timestamp = self._node._read_from_cache(self.verinfo,
4761+                                                               shnum,
4762+                                                               0,
4763+                                                               1000)
4764+            ss = self.servermap.connections[peerid]
4765+            reader = MDMFSlotReadProxy(ss,
4766+                                       self._storage_index,
4767+                                       shnum,
4768+                                       any_cache)
4769+            reader.peerid = peerid
4770+            self.readers[shnum] = reader
4771+
4772 
4773         self.shares = {} # maps shnum to validated blocks
4774hunk ./src/allmydata/mutable/retrieve.py 166
4775+        self._active_readers = [] # list of active readers for this dl.
4776+        self._validated_readers = set() # set of readers that we have
4777+                                        # validated the prefix of
4778+        self._block_hash_trees = {} # shnum => hashtree
4779+        # TODO: Make this into a file-backed consumer or something to
4780+        # conserve memory.
4781+        self._plaintext = ""
4782 
4783         # how many shares do we need?
4784hunk ./src/allmydata/mutable/retrieve.py 175
4785-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
4786+        (seqnum,
4787+         root_hash,
4788+         IV,
4789+         segsize,
4790+         datalength,
4791+         k,
4792+         N,
4793+         prefix,
4794          offsets_tuple) = self.verinfo
4795hunk ./src/allmydata/mutable/retrieve.py 184
4796-        assert len(self.remaining_sharemap) >= k
4797-        # we start with the lowest shnums we have available, since FEC is
4798-        # faster if we're using "primary shares"
4799-        self.active_shnums = set(sorted(self.remaining_sharemap.keys())[:k])
4800-        for shnum in self.active_shnums:
4801-            # we use an arbitrary peer who has the share. If shares are
4802-            # doubled up (more than one share per peer), we could make this
4803-            # run faster by spreading the load among multiple peers. But the
4804-            # algorithm to do that is more complicated than I want to write
4805-            # right now, and a well-provisioned grid shouldn't have multiple
4806-            # shares per peer.
4807-            peerid = list(self.remaining_sharemap[shnum])[0]
4808-            self.get_data(shnum, peerid)
4809 
4810hunk ./src/allmydata/mutable/retrieve.py 185
4811-        # control flow beyond this point: state machine. Receiving responses
4812-        # from queries is the input. We might send out more queries, or we
4813-        # might produce a result.
4814 
4815hunk ./src/allmydata/mutable/retrieve.py 186
4816+        # We need one share hash tree for the entire file; its leaves
4817+        # are the roots of the block hash trees for the shares that
4818+        # comprise it, and its root is in the verinfo.
4819+        self.share_hash_tree = hashtree.IncompleteHashTree(N)
4820+        self.share_hash_tree.set_hashes({0: root_hash})
4821+
4822+        # This will set up both the segment decoder and the tail segment
4823+        # decoder, as well as a variety of other instance variables that
4824+        # the download process will use.
4825+        self._setup_encoding_parameters()
4826+        assert len(self.remaining_sharemap) >= k
4827+
4828+        self.log("starting download")
4829+        self._add_active_peers()
4830+        # The download process beyond this is a state machine.
4831+        # _add_active_peers will select the peers that we want to use
4832+        # for the download, and then attempt to start downloading. After
4833+        # each segment, it will check for doneness, reacting to broken
4834+        # peers and corrupt shares as necessary. If it runs out of good
4835+        # peers before downloading all of the segments, _done_deferred
4836+        # will errback.  Otherwise, it will eventually callback with the
4837+        # contents of the mutable file.
4838         return self._done_deferred
4839 
4840hunk ./src/allmydata/mutable/retrieve.py 210
4841-    def get_data(self, shnum, peerid):
4842-        self.log(format="sending sh#%(shnum)d request to [%(peerid)s]",
4843-                 shnum=shnum,
4844-                 peerid=idlib.shortnodeid_b2a(peerid),
4845-                 level=log.NOISY)
4846-        ss = self.servermap.connections[peerid]
4847-        started = time.time()
4848-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
4849+
4850+    def _setup_encoding_parameters(self):
4851+        """
4852+        I set up the encoding parameters, including k, n, the number
4853+        of segments associated with this file, and the segment decoder.
4854+        """
4855+        (seqnum,
4856+         root_hash,
4857+         IV,
4858+         segsize,
4859+         datalength,
4860+         k,
4861+         n,
4862+         known_prefix,
4863          offsets_tuple) = self.verinfo
4864hunk ./src/allmydata/mutable/retrieve.py 225
4865-        offsets = dict(offsets_tuple)
4866+        self._required_shares = k
4867+        self._total_shares = n
4868+        self._segment_size = segsize
4869+        self._data_length = datalength
4870+        if datalength and segsize:
4871+            self._num_segments = mathutil.div_ceil(datalength, segsize)
4872+            self._tail_data_size = datalength % segsize
4873+        else:
4874+            self._num_segments = 0
4875+            self._tail_data_size = 0
4876 
4877hunk ./src/allmydata/mutable/retrieve.py 236
4878-        # we read the checkstring, to make sure that the data we grab is from
4879-        # the right version.
4880-        readv = [ (0, struct.calcsize(SIGNED_PREFIX)) ]
4881+        self._segment_decoder = codec.CRSDecoder()
4882+        self._segment_decoder.set_params(segsize, k, n)
4883+        self._current_segment = 0
4884 
4885hunk ./src/allmydata/mutable/retrieve.py 240
4886-        # We also read the data, and the hashes necessary to validate them
4887-        # (share_hash_chain, block_hash_tree, share_data). We don't read the
4888-        # signature or the pubkey, since that was handled during the
4889-        # servermap phase, and we'll be comparing the share hash chain
4890-        # against the roothash that was validated back then.
4891+        if not self._tail_data_size:
4892+            self._tail_data_size = segsize
4893 
4894hunk ./src/allmydata/mutable/retrieve.py 243
4895-        readv.append( (offsets['share_hash_chain'],
4896-                       offsets['enc_privkey'] - offsets['share_hash_chain'] ) )
4897+        self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
4898+                                                         self._required_shares)
4899+        if self._tail_segment_size == self._segment_size:
4900+            self._tail_decoder = self._segment_decoder
4901+        else:
4902+            self._tail_decoder = codec.CRSDecoder()
4903+            self._tail_decoder.set_params(self._tail_segment_size,
4904+                                          self._required_shares,
4905+                                          self._total_shares)
4906 
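The tail-segment bookkeeping in this hunk leans on allmydata.util.mathutil. Below is a minimal standalone sketch of the same arithmetic, with div_ceil and next_multiple re-implemented purely for illustration and with hypothetical sizes (none of these values come from the patch):

    def div_ceil(n, d):
        # ceiling division: how many segments are needed to hold n bytes
        return (n + d - 1) // d

    def next_multiple(n, k):
        # smallest multiple of k that is >= n; the tail segment is padded
        # so that it splits evenly into k blocks for the decoder
        return div_ceil(n, k) * k

    datalength, segsize, k = 1000000, 131072, 3        # hypothetical sizes
    num_segments = div_ceil(datalength, segsize)       # 8
    tail_data_size = (datalength % segsize) or segsize # 82496
    tail_segment_size = next_multiple(tail_data_size, k)  # 82497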
4907hunk ./src/allmydata/mutable/retrieve.py 253
4908-        # if we need the private key (for repair), we also fetch that
4909-        if self._need_privkey:
4910-            readv.append( (offsets['enc_privkey'],
4911-                           offsets['EOF'] - offsets['enc_privkey']) )
4912+        self.log("got encoding parameters: "
4913+                 "k: %d "
4914+                 "n: %d "
4915+                 "%d segments of %d bytes each (%d byte tail segment)" % \
4916+                 (k, n, self._num_segments, self._segment_size,
4917+                  self._tail_segment_size))
4918 
4919hunk ./src/allmydata/mutable/retrieve.py 260
4920-        m = Marker()
4921-        self._outstanding_queries[m] = (peerid, shnum, started)
4922+        for i in xrange(self._total_shares):
4923+            # So we don't have to do this later.
4924+            self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
4925 
4926hunk ./src/allmydata/mutable/retrieve.py 264
4927-        # ask the cache first
4928-        got_from_cache = False
4929-        datavs = []
4930-        for (offset, length) in readv:
4931-            (data, timestamp) = self._node._read_from_cache(self.verinfo, shnum,
4932-                                                            offset, length)
4933-            if data is not None:
4934-                datavs.append(data)
4935-        if len(datavs) == len(readv):
4936-            self.log("got data from cache")
4937-            got_from_cache = True
4938-            d = fireEventually({shnum: datavs})
4939-            # datavs is a dict mapping shnum to a pair of strings
4940-        else:
4941-            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
4942-        self.remaining_sharemap.discard(shnum, peerid)
4943+        # If we have more than one segment, we are an MDMF file, which
4944+        # means that we need to validate the salts as we receive them.
4945+        self._salt_hash_tree = hashtree.IncompleteHashTree(self._num_segments)
4946+        self._salt_hash_tree[0] = IV # from the prefix.
4947 
4948hunk ./src/allmydata/mutable/retrieve.py 269
4949-        d.addCallback(self._got_results, m, peerid, started, got_from_cache)
4950-        d.addErrback(self._query_failed, m, peerid)
4951-        # errors that aren't handled by _query_failed (and errors caused by
4952-        # _query_failed) get logged, but we still want to check for doneness.
4953-        def _oops(f):
4954-            self.log(format="problem in _query_failed for sh#%(shnum)d to %(peerid)s",
4955-                     shnum=shnum,
4956-                     peerid=idlib.shortnodeid_b2a(peerid),
4957-                     failure=f,
4958-                     level=log.WEIRD, umid="W0xnQA")
4959-        d.addErrback(_oops)
4960-        d.addBoth(self._check_for_done)
4961-        # any error during _check_for_done means the download fails. If the
4962-        # download is successful, _check_for_done will fire _done by itself.
4963-        d.addErrback(self._done)
4964-        d.addErrback(log.err)
4965-        return d # purely for testing convenience
4966 
4967hunk ./src/allmydata/mutable/retrieve.py 270
4968-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
4969-        # isolate the callRemote to a separate method, so tests can subclass
4970-        # Publish and override it
4971-        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
4972-        return d
4973+    def _add_active_peers(self):
4974+        """
4975+        I populate self._active_readers with enough active readers to
4976+        retrieve the contents of this mutable file. I am called before
4977+        downloading starts, and (eventually) after each validation
4978+        error, connection error, or other problem in the download.
4979+        """
4980+        # TODO: It would be cool to investigate other heuristics for
4981+        # reader selection. For instance, the cost (in time the user
4982+        # spends waiting for their file) of selecting a really slow peer
4983+        # that happens to have a primary share is probably more than
4984+        # selecting a really fast peer that doesn't have a primary
4985+        # share. Maybe the servermap could be extended to provide this
4986+        # information; it could keep track of latency information while
4987+        # it gathers more important data, and then this routine could
4988+        # use that to select active readers.
4989+        #
4990+        # (these and other questions would be easier to answer with a
4991+        #  robust, configurable tahoe-lafs simulator, which modeled node
4992+        #  failures, differences in node speed, and other characteristics
4993+        #  that we expect storage servers to have.  You could have
4994+        #  presets for really stable grids (like allmydata.com),
4995+        #  friendnets, make it easy to configure your own settings, and
4996+        #  then simulate the effect of big changes on these use cases
4997+        #  instead of just reasoning about what the effect might be. Out
4998+        #  of scope for MDMF, though.)
4999 
5000hunk ./src/allmydata/mutable/retrieve.py 297
5001-    def remove_peer(self, peerid):
5002-        for shnum in list(self.remaining_sharemap.keys()):
5003-            self.remaining_sharemap.discard(shnum, peerid)
5004+        # We need at least self._required_shares readers to download a
5005+        # segment.
5006+        needed = self._required_shares - len(self._active_readers)
5007+        # XXX: Why don't format= log messages work here?
5008+        self.log("adding %d peers to the active peers list" % needed)
5009 
5010hunk ./src/allmydata/mutable/retrieve.py 303
5011-    def _got_results(self, datavs, marker, peerid, started, got_from_cache):
5012-        now = time.time()
5013-        elapsed = now - started
5014-        if not got_from_cache:
5015-            self._status.add_fetch_timing(peerid, elapsed)
5016-        self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
5017-                 shares=len(datavs),
5018-                 peerid=idlib.shortnodeid_b2a(peerid),
5019-                 level=log.NOISY)
5020-        self._outstanding_queries.pop(marker, None)
5021-        if not self._running:
5022-            return
5023+        # We favor lower numbered shares, since FEC is faster with
5024+        # primary shares than with other shares, and lower-numbered
5025+        # shares are more likely to be primary than higher numbered
5026+        # shares.
5027+        active_shnums = set(self.remaining_sharemap.keys())
5028+        # We shouldn't consider adding shares that we already have; this
5029+        # will cause problems later.
5030+        active_shnums -= set([reader.shnum for reader in self._active_readers])
5031+        active_shnums = sorted(active_shnums)[:needed]
5032+        if len(active_shnums) < needed:
5033+            # We don't have enough readers to retrieve the file; fail.
5034+            return self._failed()
5035 
5036hunk ./src/allmydata/mutable/retrieve.py 316
5037-        # note that we only ask for a single share per query, so we only
5038-        # expect a single share back. On the other hand, we use the extra
5039-        # shares if we get them.. seems better than an assert().
5040+        for shnum in active_shnums:
5041+            self._active_readers.append(self.readers[shnum])
5042+            self.log("added reader for share %d" % shnum)
5043+        assert len(self._active_readers) == self._required_shares
5044+        # Conceptually, this is part of the _add_active_peers step. It
5045+        # validates the prefixes of newly added readers to make sure
5046+        # that they match what we are expecting for self.verinfo. If
5047+        # validation is successful, _validate_active_prefixes will call
5048+        # _download_current_segment for us. If validation is
5049+        # unsuccessful, then _validate_prefixes will remove the peer and
5050+        # unsuccessful, then _validate_active_prefixes will remove the peer and
5051+        # the problem by choosing another peer.
5052+        return self._validate_active_prefixes()
5053 
5054hunk ./src/allmydata/mutable/retrieve.py 330
5055-        for shnum,datav in datavs.items():
5056-            (prefix, hash_and_data) = datav[:2]
5057-            try:
5058-                self._got_results_one_share(shnum, peerid,
5059-                                            prefix, hash_and_data)
5060-            except CorruptShareError, e:
5061-                # log it and give the other shares a chance to be processed
5062-                f = failure.Failure()
5063-                self.log(format="bad share: %(f_value)s",
5064-                         f_value=str(f.value), failure=f,
5065-                         level=log.WEIRD, umid="7fzWZw")
5066-                self.notify_server_corruption(peerid, shnum, str(e))
5067-                self.remove_peer(peerid)
5068-                self.servermap.mark_bad_share(peerid, shnum, prefix)
5069-                self._bad_shares.add( (peerid, shnum) )
5070-                self._status.problems[peerid] = f
5071-                self._last_failure = f
5072-                pass
5073-            if self._need_privkey and len(datav) > 2:
5074-                lp = None
5075-                self._try_to_validate_privkey(datav[2], peerid, shnum, lp)
5076-        # all done!
5077 
5078hunk ./src/allmydata/mutable/retrieve.py 331
5079-    def notify_server_corruption(self, peerid, shnum, reason):
5080-        ss = self.servermap.connections[peerid]
5081-        ss.callRemoteOnly("advise_corrupt_share",
5082-                          "mutable", self._storage_index, shnum, reason)
5083+    def _validate_active_prefixes(self):
5084+        """
5085+        I check to make sure that the prefixes on the peers that I am
5086+        currently reading from match the prefix that we want to see, as
5087+        recorded in self.verinfo.
5088 
5089hunk ./src/allmydata/mutable/retrieve.py 337
5090-    def _got_results_one_share(self, shnum, peerid,
5091-                               got_prefix, got_hash_and_data):
5092-        self.log("_got_results: got shnum #%d from peerid %s"
5093-                 % (shnum, idlib.shortnodeid_b2a(peerid)))
5094-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
5095+        If I find that all of the active peers have acceptable prefixes,
5096+        I pass control to _download_current_segment, which will use
5097+        those peers to retrieve segments. If I find that some of the active
5098+        peers have unacceptable prefixes, I will remove them from active
5099+        peers (and from further consideration) and call
5100+        _add_active_peers to attempt to rectify the situation. I keep
5101+        track of which peers I have already validated so that I don't
5102+        need to do so again.
5103+        """
5104+        assert self._active_readers, "No more active readers"
5105+
5106+        ds = []
5107+        new_readers = set(self._active_readers) - self._validated_readers
5108+        self.log('validating %d newly-added active readers' % len(new_readers))
5109+
5110+        for reader in new_readers:
5111+            # We force a remote read here -- otherwise, we are relying
5112+            # on cached data that we already verified as valid, and we
5113+            # won't detect an uncoordinated write that has occurred
5114+            # since the last servermap update.
5115+            d = reader.get_prefix(force_remote=True)
5116+            d.addCallback(self._try_to_validate_prefix, reader)
5117+            ds.append(d)
5118+        dl = defer.DeferredList(ds, consumeErrors=True)
5119+        def _check_results(results):
5120+            # Each result in results will be of the form (success, msg).
5121+            # We don't care about msg, but success will tell us whether
5122+            # or not the checkstring validated. If it didn't, we need to
5123+            # remove the offending (peer,share) from our active readers,
5124+            # and ensure that active readers is again populated.
5125+            bad_readers = []
5126+            for i, result in enumerate(results):
5127+                if not result[0]:
5128+                    reader = self._active_readers[i]
5129+                    f = result[1]
5130+                    assert isinstance(f, failure.Failure)
5131+
5132+                    self.log("The reader %s failed to "
5133+                             "properly validate: %s" % \
5134+                             (reader, str(f.value)))
5135+                    bad_readers.append((reader, f))
5136+                else:
5137+                    reader = self._active_readers[i]
5138+                    self.log("the reader %s checks out, so we'll use it" % \
5139+                             reader)
5140+                    self._validated_readers.add(reader)
5141+                    # Each time we validate a reader, we check to see if
5142+                    # we need the private key. If we do, we politely ask
5143+                    # for it and then continue computing. If we find
5144+                    # that we haven't gotten it at the end of
5145+                    # segment decoding, then we'll take more drastic
5146+                    # measures.
5147+                    if self._need_privkey:
5148+                        d = reader.get_encprivkey()
5149+                        d.addCallback(self._try_to_validate_privkey, reader)
5150+            if bad_readers:
5151+                # We do them all at once, or else we screw up list indexing.
5152+                for (reader, f) in bad_readers:
5153+                    self._mark_bad_share(reader, f)
5154+                return self._add_active_peers()
5155+            else:
5156+                return self._download_current_segment()
5157+            # Either way, the next step asserts that it has enough active
5158+            # readers to fetch shares; we just remove the bad ones first.
5159+        dl.addCallback(_check_results)
5160+        return dl
5161+
5162+
5163+    def _try_to_validate_prefix(self, prefix, reader):
5164+        """
5165+        I check that the prefix returned by a candidate server for
5166+        retrieval matches the prefix that the servermap knows about
5167+        (and, hence, the prefix that was validated earlier). If it does,
5168+        I return True, which means that I approve of the use of the
5169+        candidate server for segment retrieval. If it doesn't, I return
5170+        False, which means that another server must be chosen.
5171+        """
5172+        (seqnum,
5173+         root_hash,
5174+         IV,
5175+         segsize,
5176+         datalength,
5177+         k,
5178+         N,
5179+         known_prefix,
5180          offsets_tuple) = self.verinfo
5181hunk ./src/allmydata/mutable/retrieve.py 423
5182-        assert len(got_prefix) == len(prefix), (len(got_prefix), len(prefix))
5183-        if got_prefix != prefix:
5184-            msg = "someone wrote to the data since we read the servermap: prefix changed"
5185-            raise UncoordinatedWriteError(msg)
5186-        (share_hash_chain, block_hash_tree,
5187-         share_data) = unpack_share_data(self.verinfo, got_hash_and_data)
5188+        if known_prefix != prefix:
5189+            self.log("prefix from share %d doesn't match" % reader.shnum)
5190+            raise UncoordinatedWriteError("Mismatched prefix -- this could "
5191+                                          "indicate an uncoordinated write")
5192+        # Otherwise, we're okay -- no issues.
5193 
5194hunk ./src/allmydata/mutable/retrieve.py 429
5195-        assert isinstance(share_data, str)
5196-        # build the block hash tree. SDMF has only one leaf.
5197-        leaves = [hashutil.block_hash(share_data)]
5198-        t = hashtree.HashTree(leaves)
5199-        if list(t) != block_hash_tree:
5200-            raise CorruptShareError(peerid, shnum, "block hash tree failure")
5201-        share_hash_leaf = t[0]
5202-        t2 = hashtree.IncompleteHashTree(N)
5203-        # root_hash was checked by the signature
5204-        t2.set_hashes({0: root_hash})
5205-        try:
5206-            t2.set_hashes(hashes=share_hash_chain,
5207-                          leaves={shnum: share_hash_leaf})
5208-        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
5209-                IndexError), e:
5210-            msg = "corrupt hashes: %s" % (e,)
5211-            raise CorruptShareError(peerid, shnum, msg)
5212-        self.log(" data valid! len=%d" % len(share_data))
5213-        # each query comes down to this: placing validated share data into
5214-        # self.shares
5215-        self.shares[shnum] = share_data
5216 
5217hunk ./src/allmydata/mutable/retrieve.py 430
5218-    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
5219+    def _remove_reader(self, reader):
5220+        """
5221+        At various points, we will wish to remove a peer from
5222+        consideration and/or use. These include, but are not necessarily
5223+        limited to:
5224 
5225hunk ./src/allmydata/mutable/retrieve.py 436
5226-        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
5227-        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
5228-        if alleged_writekey != self._node.get_writekey():
5229-            self.log("invalid privkey from %s shnum %d" %
5230-                     (idlib.nodeid_b2a(peerid)[:8], shnum),
5231-                     parent=lp, level=log.WEIRD, umid="YIw4tA")
5232-            return
5233+            - A connection error.
5234+            - A mismatched prefix (that is, a prefix that does not match
5235+              our conception of the version information string).
5236+            - A failing block hash, salt hash, or share hash, which can
5237+              indicate disk failure/bit flips, or network trouble.
5238 
5239hunk ./src/allmydata/mutable/retrieve.py 442
5240-        # it's good
5241-        self.log("got valid privkey from shnum %d on peerid %s" %
5242-                 (shnum, idlib.shortnodeid_b2a(peerid)),
5243-                 parent=lp)
5244-        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
5245-        self._node._populate_encprivkey(enc_privkey)
5246-        self._node._populate_privkey(privkey)
5247-        self._need_privkey = False
5248+        This method will do that. I will make sure that the
5249+        (shnum,reader) combination represented by my reader argument is
5250+        not used for anything else during this download. I will not
5251+        advise the reader of any corruption, something that my callers
5252+        may wish to do on their own.
5253+        """
5254+        # TODO: When you're done writing this, see if this is ever
5255+        # actually used for something that _mark_bad_share isn't. I have
5256+        # a feeling that they will be used for very similar things, and
5257+        # that having them both here is just going to be an epic amount
5258+        # of code duplication.
5259+        #
5260+        # (well, okay, not epic, but meaningful)
5261+        self.log("removing reader %s" % reader)
5262+        # Remove the reader from _active_readers
5263+        self._active_readers.remove(reader)
5264+        # TODO: self.readers.remove(reader)?
5265+        for shnum in list(self.remaining_sharemap.keys()):
5266+            self.remaining_sharemap.discard(shnum, reader.peerid)
5267 
5268hunk ./src/allmydata/mutable/retrieve.py 462
5269-    def _query_failed(self, f, marker, peerid):
5270-        self.log(format="query to [%(peerid)s] failed",
5271-                 peerid=idlib.shortnodeid_b2a(peerid),
5272-                 level=log.NOISY)
5273-        self._status.problems[peerid] = f
5274-        self._outstanding_queries.pop(marker, None)
5275-        if not self._running:
5276-            return
5277-        self._last_failure = f
5278-        self.remove_peer(peerid)
5279-        level = log.WEIRD
5280-        if f.check(DeadReferenceError):
5281-            level = log.UNUSUAL
5282-        self.log(format="error during query: %(f_value)s",
5283-                 f_value=str(f.value), failure=f, level=level, umid="gOJB5g")
5284 
5285hunk ./src/allmydata/mutable/retrieve.py 463
5286-    def _check_for_done(self, res):
5287-        # exit paths:
5288-        #  return : keep waiting, no new queries
5289-        #  return self._send_more_queries(outstanding) : send some more queries
5290-        #  fire self._done(plaintext) : download successful
5291-        #  raise exception : download fails
5292+    def _mark_bad_share(self, reader, f):
5293+        """
5294+        I mark the (peerid, shnum) encapsulated by my reader argument as
5295+        a bad share, which means that it will not be used anywhere else.
5296 
5297hunk ./src/allmydata/mutable/retrieve.py 468
5298-        self.log(format="_check_for_done: running=%(running)s, decoding=%(decoding)s",
5299-                 running=self._running, decoding=self._decoding,
5300-                 level=log.NOISY)
5301-        if not self._running:
5302-            return
5303-        if self._decoding:
5304-            return
5305-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
5306-         offsets_tuple) = self.verinfo
5307+        There are several reasons to want to mark something as a bad
5308+        share. These include:
5309 
5310hunk ./src/allmydata/mutable/retrieve.py 471
5311-        if len(self.shares) < k:
5312-            # we don't have enough shares yet
5313-            return self._maybe_send_more_queries(k)
5314-        if self._need_privkey:
5315-            # we got k shares, but none of them had a valid privkey. TODO:
5316-            # look further. Adding code to do this is a bit complicated, and
5317-            # I want to avoid that complication, and this should be pretty
5318-            # rare (k shares with bitflips in the enc_privkey but not in the
5319-            # data blocks). If we actually do get here, the subsequent repair
5320-            # will fail for lack of a privkey.
5321-            self.log("got k shares but still need_privkey, bummer",
5322-                     level=log.WEIRD, umid="MdRHPA")
5323+            - A connection error to the peer.
5324+            - A mismatched prefix (that is, a prefix that does not match
5325+              our local conception of the version information string).
5326+            - A failing block hash, salt hash, share hash, or other
5327+              integrity check.
5328 
5329hunk ./src/allmydata/mutable/retrieve.py 477
5330-        # we have enough to finish. All the shares have had their hashes
5331-        # checked, so if something fails at this point, we don't know how
5332-        # to fix it, so the download will fail.
5333+        This method will ensure that readers that we wish to mark bad
5334+        (for these reasons or other reasons) are not used for the rest
5335+        of the download. Additionally, it will attempt to tell the
5336+        remote peer (with no guarantee of success) that its share is
5337+        corrupt.
5338+        """
5339+        self.log("marking share %d on server %s as bad" % \
5340+                 (reader.shnum, reader))
5341+        self._remove_reader(reader)
5342+        self._bad_shares.add((reader.peerid, reader.shnum))
5343+        self._status.problems[reader.peerid] = f
5344+        self._last_failure = f
5345+        self.notify_server_corruption(reader.peerid, reader.shnum,
5346+                                      str(f.value))
5347 
5348hunk ./src/allmydata/mutable/retrieve.py 492
5349-        self._decoding = True # avoid reentrancy
5350-        self._status.set_status("decoding")
5351-        now = time.time()
5352-        elapsed = now - self._started
5353-        self._status.timings["fetch"] = elapsed
5354 
5355hunk ./src/allmydata/mutable/retrieve.py 493
5356-        d = defer.maybeDeferred(self._decode)
5357-        d.addCallback(self._decrypt, IV, self._node.get_readkey())
5358-        d.addBoth(self._done)
5359-        return d # purely for test convenience
5360+    def _download_current_segment(self):
5361+        """
5362+        I download, validate, decode, decrypt, and assemble the segment
5363+        that this Retrieve is currently responsible for downloading.
5364+        """
5365+        assert len(self._active_readers) >= self._required_shares
5366+        if self._current_segment < self._num_segments:
5367+            d = self._process_segment(self._current_segment)
5368+        else:
5369+            d = defer.succeed(None)
5370+        d.addCallback(self._check_for_done)
5371+        return d
5372 
5373hunk ./src/allmydata/mutable/retrieve.py 506
5374-    def _maybe_send_more_queries(self, k):
5375-        # we don't have enough shares yet. Should we send out more queries?
5376-        # There are some number of queries outstanding, each for a single
5377-        # share. If we can generate 'needed_shares' additional queries, we do
5378-        # so. If we can't, then we know this file is a goner, and we raise
5379-        # NotEnoughSharesError.
5380-        self.log(format=("_maybe_send_more_queries, have=%(have)d, k=%(k)d, "
5381-                         "outstanding=%(outstanding)d"),
5382-                 have=len(self.shares), k=k,
5383-                 outstanding=len(self._outstanding_queries),
5384-                 level=log.NOISY)
5385 
5386hunk ./src/allmydata/mutable/retrieve.py 507
5387-        remaining_shares = k - len(self.shares)
5388-        needed = remaining_shares - len(self._outstanding_queries)
5389-        if not needed:
5390-            # we have enough queries in flight already
5391+    def _process_segment(self, segnum):
5392+        """
5393+        I download, validate, decode, and decrypt one segment of the
5394+        file that this Retrieve is retrieving. This means coordinating
5395+        the process of getting k blocks of that file, validating them,
5396+        assembling them into one segment with the decoder, and then
5397+        decrypting them.
5398+        """
5399+        self.log("processing segment %d" % segnum)
5400 
5401hunk ./src/allmydata/mutable/retrieve.py 517
5402-            # TODO: but if they've been in flight for a long time, and we
5403-            # have reason to believe that new queries might respond faster
5404-            # (i.e. we've seen other queries come back faster, then consider
5405-            # sending out new queries. This could help with peers which have
5406-            # silently gone away since the servermap was updated, for which
5407-            # we're still waiting for the 15-minute TCP disconnect to happen.
5408-            self.log("enough queries are in flight, no more are needed",
5409-                     level=log.NOISY)
5410-            return
5411+        # TODO: The old code uses a marker. Should this code do that
5412+        # too? What did the Marker do?
5413+        assert len(self._active_readers) >= self._required_shares
5414+
5415+        # We need to ask each of our active readers for its block and
5416+        # salt. We will then validate those. If validation is
5417+        # successful, we will assemble the results into plaintext.
5418+        ds = []
5419+        for reader in self._active_readers:
5420+            d = reader.get_block_and_salt(segnum, queue=True)
5421+            d2 = self._get_needed_hashes(reader, segnum)
5422+            dl = defer.DeferredList([d, d2], consumeErrors=True)
5423+            dl.addCallback(self._validate_block, segnum, reader)
5424+            dl.addErrback(self._validation_or_decoding_failed, [reader])
5425+            ds.append(dl)
5426+            reader.flush()
5427+        dl = defer.DeferredList(ds)
5428+        dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
5429+        return dl
5430 
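_process_segment depends on the stock behaviour of twisted.internet.defer.DeferredList: with consumeErrors=True it never errbacks and always fires with a list of (success, result) pairs, which is the shape that _maybe_decode_and_decrypt_segment and _validate_block inspect. A small self-contained illustration, unrelated to the storage protocol:

    from twisted.internet import defer

    d1 = defer.succeed(("block bytes", "salt"))
    d2 = defer.fail(ValueError("simulated fetch failure"))
    dl = defer.DeferredList([d1, d2], consumeErrors=True)

    def show(results):
        # results == [(True, ('block bytes', 'salt')),
        #             (False, <Failure carrying the ValueError>)]
        for success, value in results:
            print success, value
    dl.addCallback(show)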
5431hunk ./src/allmydata/mutable/retrieve.py 537
5432-        outstanding_shnums = set([shnum
5433-                                  for (peerid, shnum, started)
5434-                                  in self._outstanding_queries.values()])
5435-        # prefer low-numbered shares, they are more likely to be primary
5436-        available_shnums = sorted(self.remaining_sharemap.keys())
5437-        for shnum in available_shnums:
5438-            if shnum in outstanding_shnums:
5439-                # skip ones that are already in transit
5440-                continue
5441-            if shnum not in self.remaining_sharemap:
5442-                # no servers for that shnum. note that DictOfSets removes
5443-                # empty sets from the dict for us.
5444-                continue
5445-            peerid = list(self.remaining_sharemap[shnum])[0]
5446-            # get_data will remove that peerid from the sharemap, and add the
5447-            # query to self._outstanding_queries
5448-            self._status.set_status("Retrieving More Shares")
5449-            self.get_data(shnum, peerid)
5450-            needed -= 1
5451-            if not needed:
5452+
5453+    def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
5454+        """
5455+        I take the results of fetching and validating the blocks from a
5456+        callback chain in another method. If the results are such that
5457+        they tell me that validation and fetching succeeded without
5458+        incident, I will proceed with decoding and decryption.
5459+        Otherwise, I will do nothing.
5460+        """
5461+        self.log("trying to decode and decrypt segment %d" % segnum)
5462+        failures = False
5463+        for block_and_salt in blocks_and_salts:
5464+            if not block_and_salt[0] or block_and_salt[1] is None:
5465+                self.log("some validation operations failed; not proceeding")
5466+                failures = True
5467                 break
5468hunk ./src/allmydata/mutable/retrieve.py 553
5469+        if not failures:
5470+            self.log("everything looks ok, building segment %d" % segnum)
5471+            d = self._decode_blocks(blocks_and_salts, segnum)
5472+            d.addCallback(self._decrypt_segment)
5473+            d.addErrback(self._validation_or_decoding_failed,
5474+                         self._active_readers)
5475+            d.addCallback(self._set_segment)
5476+            return d
5477+        else:
5478+            return defer.succeed(None)
5479+
5480+
5481+    def _set_segment(self, segment):
5482+        """
5483+        Given a plaintext segment, I register that segment with the
5484+        target that is handling the file download.
5485+        """
5486+        self.log("got plaintext for segment %d" % self._current_segment)
5487+        self._plaintext += segment
5488+        self._current_segment += 1
5489 
5490hunk ./src/allmydata/mutable/retrieve.py 574
5491-        # at this point, we have as many outstanding queries as we can. If
5492-        # needed!=0 then we might not have enough to recover the file.
5493-        if needed:
5494-            format = ("ran out of peers: "
5495-                      "have %(have)d shares (k=%(k)d), "
5496-                      "%(outstanding)d queries in flight, "
5497-                      "need %(need)d more, "
5498-                      "found %(bad)d bad shares")
5499-            args = {"have": len(self.shares),
5500-                    "k": k,
5501-                    "outstanding": len(self._outstanding_queries),
5502-                    "need": needed,
5503-                    "bad": len(self._bad_shares),
5504-                    }
5505-            self.log(format=format,
5506-                     level=log.WEIRD, umid="ezTfjw", **args)
5507-            err = NotEnoughSharesError("%s, last failure: %s" %
5508-                                      (format % args, self._last_failure))
5509-            if self._bad_shares:
5510-                self.log("We found some bad shares this pass. You should "
5511-                         "update the servermap and try again to check "
5512-                         "more peers",
5513-                         level=log.WEIRD, umid="EFkOlA")
5514-                err.servermap = self.servermap
5515-            raise err
5516 
5517hunk ./src/allmydata/mutable/retrieve.py 575
5518+    def _validation_or_decoding_failed(self, f, readers):
5519+        """
5520+        I am called when a block or a salt fails to correctly validate, or when
5521+        the decryption or decoding operation fails for some reason.  I react to
5522+        this failure by notifying the remote server of corruption, and then
5523+        removing the remote peer from further activity.
5524+        """
5525+        assert isinstance(readers, list)
5526+        bad_shnums = [reader.shnum for reader in readers]
5527+
5528+        self.log("validation or decoding failed on share(s) %s, peer(s) %s "
5529+                 ", segment %d: %s" % \
5530+                 (bad_shnums, readers, self._current_segment, str(f)))
5531+        for reader in readers:
5532+            self._mark_bad_share(reader, f)
5533         return
5534 
5535hunk ./src/allmydata/mutable/retrieve.py 592
5536-    def _decode(self):
5537-        started = time.time()
5538-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
5539-         offsets_tuple) = self.verinfo
5540 
5541hunk ./src/allmydata/mutable/retrieve.py 593
5542-        # shares_dict is a dict mapping shnum to share data, but the codec
5543-        # wants two lists.
5544-        shareids = []; shares = []
5545-        for shareid, share in self.shares.items():
5546+    def _validate_block(self, results, segnum, reader):
5547+        """
5548+        I validate a block from one share on a remote server.
5549+        """
5550+        # Grab the part of the block hash tree that is necessary to
5551+        # validate this block, then generate the block hash root.
5552+        self.log("validating share %d for segment %d" % (reader.shnum,
5553+                                                             segnum))
5554+        # Did we fail to fetch either of the things that we were
5555+        # supposed to? Fail if so.
5556+        if not results[0][0] or not results[1][0]:
5557+            # handled by the errback handler.
5558+
5559+            # These all get batched into one query, so the resulting
5560+            # failure should be the same for all of them, so we can just
5561+            # use whichever one we see first.
5562+            f = [result[1] for result in results if not result[0]][0]
5563+            assert isinstance(f, failure.Failure)
5564+
5565+            raise CorruptShareError(reader.peerid,
5566+                                    reader.shnum,
5567+                                    "Connection error: %s" % str(f))
5568+
5569+        block_and_salt, block_and_sharehashes = results
5570+        block, salt = block_and_salt[1]
5571+        blockhashes, sharehashes = block_and_sharehashes[1]
5572+
5573+        blockhashes = dict(enumerate(blockhashes[1]))
5574+        self.log("the reader gave me the following blockhashes: %s" % \
5575+                 blockhashes.keys())
5576+        self.log("the reader gave me the following sharehashes: %s" % \
5577+                 sharehashes[1].keys())
5578+        bht = self._block_hash_trees[reader.shnum]
5579+
5580+        if bht.needed_hashes(segnum, include_leaf=True):
5581+            try:
5582+                bht.set_hashes(blockhashes)
5583+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
5584+                    IndexError), e:
5585+                raise CorruptShareError(reader.peerid,
5586+                                        reader.shnum,
5587+                                        "block hash tree failure: %s" % e)
5588+
5589+        blockhash = hashutil.block_hash(block)
5590+        # If this works without an error, then validation is
5591+        # successful.
5592+        try:
5593+            bht.set_hashes(leaves={segnum: blockhash})
5594+        except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
5595+                IndexError), e:
5596+            raise CorruptShareError(reader.peerid,
5597+                                    reader.shnum,
5598+                                    "block hash tree failure: %s" % e)
5599+
5600+        # Reaching this point means that we know that this segment
5601+        # is correct. Now we need to check to see whether the share
5602+        # hash chain is also correct.
5603+        # SDMF wrote share hash chains that didn't contain the
5604+        # leaves, which would be produced from the block hash tree.
5605+        # So we need to validate the block hash tree first. If
5606+        # successful, then bht[0] will contain the root for the
5607+        # shnum, which will be a leaf in the share hash tree, which
5608+        # will allow us to validate the rest of the tree.
5609+        if self.share_hash_tree.needed_hashes(reader.shnum,
5610+                                               include_leaf=True):
5611+            try:
5612+                self.share_hash_tree.set_hashes(hashes=sharehashes[1],
5613+                                            leaves={reader.shnum: bht[0]})
5614+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
5615+                    IndexError), e:
5616+                raise CorruptShareError(reader.peerid,
5617+                                        reader.shnum,
5618+                                        "corrupt hashes: %s" % e)
5619+
5620+        # TODO: Validate the salt, too.
5621+        self.log('share %d is valid for segment %d' % (reader.shnum,
5622+                                                       segnum))
5623+        return {reader.shnum: (block, salt)}
5624+
5625+
5626+    def _get_needed_hashes(self, reader, segnum):
5627+        """
5628+        I get the hashes needed to validate segnum from the reader, then return
5629+        to my caller when this is done.
5630+        """
5631+        bht = self._block_hash_trees[reader.shnum]
5632+        needed = bht.needed_hashes(segnum, include_leaf=True)
5633+        # The root of the block hash tree is also a leaf in the share
5634+        # hash tree. So we don't need to fetch it from the remote
5635+        # server. In the case of files with one segment, this means that
5636+        # we won't fetch any block hash tree from the remote server,
5637+        # since the hash of each share of the file is the entire block
5638+        # hash tree, and is a leaf in the share hash tree. This is fine,
5639+        # since any share corruption will be detected in the share hash
5640+        # tree.
5641+        #needed.discard(0)
5642+        self.log("getting blockhashes for segment %d, share %d: %s" % \
5643+                 (segnum, reader.shnum, str(needed)))
5644+        d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
5645+        if self.share_hash_tree.needed_hashes(reader.shnum):
5646+            need = self.share_hash_tree.needed_hashes(reader.shnum)
5647+            self.log("also need sharehashes for share %d: %s" % (reader.shnum,
5648+                                                                 str(need)))
5649+            d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
5650+        else:
5651+            d2 = defer.succeed({}) # the logic in the next method
5652+                                   # expects a dict
5653+        dl = defer.DeferredList([d1, d2], consumeErrors=True)
5654+        return dl
5655+
5656+
5657+    def _decode_blocks(self, blocks_and_salts, segnum):
5658+        """
5659+        I take a list of k blocks and salts, and decode that into a
5660+        single encrypted segment.
5661+        """
5662+        d = {}
5663+        # We want to merge our dictionaries to the form
5664+        # {shnum: blocks_and_salts}
5665+        # The dictionaries come from _validate_block that way, so we just
5666+        # The dictionaries come from validate block that way, so we just
5667+        # need to merge them.
5668+        for block_and_salt in blocks_and_salts:
5669+            d.update(block_and_salt[1])
5670+
5671+        # All of these blocks should have the same salt; in SDMF, it is
5672+        # the file-wide IV, while in MDMF it is the per-segment salt. In
5673+        # either case, we just need to get one of them and use it.
5674+        #
5675+        # d.items()[0] is like (shnum, (block, salt))
5676+        # d.items()[0][1] is like (block, salt)
5677+        # d.items()[0][1][1] is the salt.
5678+        salt = d.items()[0][1][1]
5679+        # Next, extract just the blocks from the dict. We'll use the
5680+        # salt in the next step.
5681+        share_and_shareids = [(k, v[0]) for k, v in d.items()]
5682+        d2 = dict(share_and_shareids)
5683+        shareids = []
5684+        shares = []
5685+        for shareid, share in d2.items():
5686             shareids.append(shareid)
5687             shares.append(share)
5688 
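The dictionary juggling in _decode_blocks ({shnum: (block, salt)} mappings merged, a salt pulled from any entry, then parallel shareids/shares lists built for the decoder) is easier to see with hypothetical two-share data:

    blocks_and_salts = [(True, {0: ("block0", "salt")}),
                        (True, {3: ("block3", "salt")})]   # hypothetical
    d = {}
    for success, mapping in blocks_and_salts:
        d.update(mapping)

    salt = d.items()[0][1][1]        # every entry carries the same salt
    shareids = []
    shares = []
    for shnum, (block, _salt) in d.items():
        shareids.append(shnum)
        shares.append(block)
    # shareids and shares are parallel lists, ready for decoder.decode(shares, shareids)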
5689hunk ./src/allmydata/mutable/retrieve.py 736
5690-        assert len(shareids) >= k, len(shareids)
5691+        assert len(shareids) >= self._required_shares, len(shareids)
5692         # zfec really doesn't want extra shares
5693hunk ./src/allmydata/mutable/retrieve.py 738
5694-        shareids = shareids[:k]
5695-        shares = shares[:k]
5696-
5697-        fec = codec.CRSDecoder()
5698-        fec.set_params(segsize, k, N)
5699-
5700-        self.log("params %s, we have %d shares" % ((segsize, k, N), len(shares)))
5701-        self.log("about to decode, shareids=%s" % (shareids,))
5702-        d = defer.maybeDeferred(fec.decode, shares, shareids)
5703-        def _done(buffers):
5704-            self._status.timings["decode"] = time.time() - started
5705-            self.log(" decode done, %d buffers" % len(buffers))
5706+        shareids = shareids[:self._required_shares]
5707+        shares = shares[:self._required_shares]
5708+        self.log("decoding segment %d" % segnum)
5709+        if segnum == self._num_segments - 1:
5710+            d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
5711+        else:
5712+            d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
5713+        def _process(buffers):
5714             segment = "".join(buffers)
5715hunk ./src/allmydata/mutable/retrieve.py 747
5716+            self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
5717+                     segnum=segnum,
5718+                     numsegs=self._num_segments,
5719+                     level=log.NOISY)
5720             self.log(" joined length %d, datalength %d" %
5721hunk ./src/allmydata/mutable/retrieve.py 752
5722-                     (len(segment), datalength))
5723-            segment = segment[:datalength]
5724+                     (len(segment), self._data_length))
5725+            if segnum == self._num_segments - 1:
5726+                size_to_use = self._tail_data_size
5727+            else:
5728+                size_to_use = self._segment_size
5729+            segment = segment[:size_to_use]
5730             self.log(" segment len=%d" % len(segment))
5731hunk ./src/allmydata/mutable/retrieve.py 759
5732-            return segment
5733-        def _err(f):
5734-            self.log(" decode failed: %s" % f)
5735-            return f
5736-        d.addCallback(_done)
5737-        d.addErrback(_err)
5738+            return segment, salt
5739+        d.addCallback(_process)
5740         return d
5741 
5742hunk ./src/allmydata/mutable/retrieve.py 763
5743-    def _decrypt(self, crypttext, IV, readkey):
5744+
5745+    def _decrypt_segment(self, segment_and_salt):
5746+        """
5747+        I take a single segment and its salt, and decrypt it. I return
5748+        the plaintext of the segment that is in my argument.
5749+        """
5750+        segment, salt = segment_and_salt
5751         self._status.set_status("decrypting")
5752hunk ./src/allmydata/mutable/retrieve.py 771
5753+        self.log("decrypting segment %d" % self._current_segment)
5754         started = time.time()
5755hunk ./src/allmydata/mutable/retrieve.py 773
5756-        key = hashutil.ssk_readkey_data_hash(IV, readkey)
5757+        key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
5758         decryptor = AES(key)
5759hunk ./src/allmydata/mutable/retrieve.py 775
5760-        plaintext = decryptor.process(crypttext)
5761+        plaintext = decryptor.process(segment)
5762         self._status.timings["decrypt"] = time.time() - started
5763         return plaintext
5764 
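_decrypt_segment derives a fresh AES key for every segment by hashing that segment's salt together with the file's readkey. A rough standalone sketch of the derivation, using hashlib as a stand-in for allmydata.util.hashutil.ssk_readkey_data_hash (whose exact tagged-hash convention is not reproduced here) and made-up key material:

    import hashlib

    def readkey_data_hash_sketch(salt, readkey):
        # illustrative only: the real ssk_readkey_data_hash uses Tahoe's
        # tagged-hash scheme, not this ad-hoc concatenation
        return hashlib.sha256("ssk-readkey-data:" + salt + readkey).digest()[:16]

    readkey = "\x01" * 16     # hypothetical 128-bit readkey
    salt0 = "\xaa" * 16       # per-segment salt (the file-wide IV, for SDMF)
    salt1 = "\xbb" * 16

    key0 = readkey_data_hash_sketch(salt0, readkey)
    key1 = readkey_data_hash_sketch(salt1, readkey)
    assert key0 != key1       # a different salt yields a different AES key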
5765hunk ./src/allmydata/mutable/retrieve.py 779
5766-    def _done(self, res):
5767-        if not self._running:
5768+
5769+    def notify_server_corruption(self, peerid, shnum, reason):
5770+        ss = self.servermap.connections[peerid]
5771+        ss.callRemoteOnly("advise_corrupt_share",
5772+                          "mutable", self._storage_index, shnum, reason)
5773+
5774+
5775+    def _try_to_validate_privkey(self, enc_privkey, reader):
5776+
5777+        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
5778+        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
5779+        if alleged_writekey != self._node.get_writekey():
5780+            self.log("invalid privkey from %s shnum %d" %
5781+                     (reader, reader.shnum),
5782+                     level=log.WEIRD, umid="YIw4tA")
5783             return
5784hunk ./src/allmydata/mutable/retrieve.py 795
5785-        self._running = False
5786-        self._status.set_active(False)
5787-        self._status.timings["total"] = time.time() - self._started
5788-        # res is either the new contents, or a Failure
5789-        if isinstance(res, failure.Failure):
5790-            self.log("Retrieve done, with failure", failure=res,
5791-                     level=log.UNUSUAL)
5792-            self._status.set_status("Failed")
5793-        else:
5794-            self.log("Retrieve done, success!")
5795-            self._status.set_status("Finished")
5796-            self._status.set_progress(1.0)
5797-            # remember the encoding parameters, use them again next time
5798-            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
5799-             offsets_tuple) = self.verinfo
5800-            self._node._populate_required_shares(k)
5801-            self._node._populate_total_shares(N)
5802-        eventually(self._done_deferred.callback, res)
5803 
5804hunk ./src/allmydata/mutable/retrieve.py 796
5805+        # it's good
5806+        self.log("got valid privkey from shnum %d on reader %s" %
5807+                 (reader.shnum, reader))
5808+        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
5809+        self._node._populate_encprivkey(enc_privkey)
5810+        self._node._populate_privkey(privkey)
5811+        self._need_privkey = False
5812+
5813+
5814+    def _check_for_done(self, res):
5815+        """
5816+        I check to see if this Retrieve object has successfully finished
5817+        its work.
5818+
5819+        I can exit in the following ways:
5820+            - If there are no more segments to download, then I exit by
5821+              causing self._done_deferred to fire with the plaintext
5822+              content requested by the caller.
5823+            - If there are still segments to be downloaded, and there
5824+              are enough active readers (readers which have not broken
5825+              and have not given us corrupt data) to continue
5826+              downloading, I send control back to
5827+              _download_current_segment.
5828+            - If there are still segments to be downloaded but there are
5829+              not enough active peers to download them, I ask
5830+              _add_active_peers to add more peers. If it is successful,
5831+              it will call _download_current_segment. If there are not
5832+              enough peers to retrieve the file, then that will cause
5833+              _done_deferred to errback.
5834+        """
5835+        self.log("checking for doneness")
5836+        if self._current_segment == self._num_segments:
5837+            # No more segments to download, we're done.
5838+            self.log("got plaintext, done")
5839+            return self._done()
5840+
5841+        if len(self._active_readers) >= self._required_shares:
5842+            # More segments to download, but we have enough good peers
5843+            # in self._active_readers that we can do that without issue,
5844+            # so go nab the next segment.
5845+            self.log("not done yet: on segment %d of %d" % \
5846+                     (self._current_segment + 1, self._num_segments))
5847+            return self._download_current_segment()
5848+
5849+        self.log("not done yet: on segment %d of %d, need to add peers" % \
5850+                 (self._current_segment + 1, self._num_segments))
5851+        return self._add_active_peers()
5852+
5853+
5854+    def _done(self):
5855+        """
5856+        I am called by _check_for_done when the download process has
5857+        finished successfully. After making some useful logging
5858+        statements, I return the decrypted contents to the owner of this
5859+        Retrieve object through self._done_deferred.
5860+        """
5861+        eventually(self._done_deferred.callback, self._plaintext)
5862+
5863+
5864+    def _failed(self):
5865+        """
5866+        I am called by _add_active_peers when there are not enough
5867+        active peers left to complete the download. After making some
5868+        useful logging statements, I return an exception to that effect
5869+        to the caller of this Retrieve object through
5870+        self._done_deferred.
5871+        """
5872+        format = ("ran out of peers: "
5873+                  "have %(have)d of %(total)d segments "
5874+                  "found %(bad)d bad shares "
5875+                  "encoding %(k)d-of-%(n)d")
5876+        args = {"have": self._current_segment,
5877+                "total": self._num_segments,
5878+                "k": self._required_shares,
5879+                "n": self._total_shares,
5880+                "bad": len(self._bad_shares)}
5881+        e = NotEnoughSharesError("%s, last failure: %s" % (format % args,
5882+                                                        str(self._last_failure)))
5883+        f = failure.Failure(e)
5884+        eventually(self._done_deferred.callback, f)
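Taken together, _check_for_done, _download_current_segment, and _add_active_peers form a simple loop. A deliberately synchronous, simplified sketch of that control flow, ignoring Deferreds and all failure handling except running out of peers (every name below is a stand-in, not code from the patch):

    class NotEnoughSharesError(Exception):
        pass   # stand-in for allmydata.interfaces.NotEnoughSharesError

    def retrieve_sketch(num_segments, required_shares, readers, fetch_segment):
        # readers: dict mapping shnum -> reader object (hypothetical)
        # fetch_segment(active, segnum) -> plaintext of one segment
        active = {}
        plaintext = ""
        segnum = 0
        while segnum < num_segments:
            if len(active) < required_shares:
                spare = sorted(set(readers) - set(active))
                needed = required_shares - len(active)
                if len(spare) < needed:
                    raise NotEnoughSharesError("ran out of peers")
                for shnum in spare[:needed]:
                    active[shnum] = readers[shnum]
            plaintext += fetch_segment(active, segnum)
            segnum += 1
        return plaintext

    readers = dict((i, "reader-%d" % i) for i in range(10))
    print retrieve_sketch(3, 3, readers, lambda active, seg: "segment%d " % seg)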
5885hunk ./src/allmydata/test/test_mutable.py 12
5886 from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
5887      ssk_pubkey_fingerprint_hash
5888 from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \
5889-     NotEnoughSharesError
5890+     NotEnoughSharesError, SDMF_VERSION, MDMF_VERSION
5891 from allmydata.monitor import Monitor
5892 from allmydata.test.common import ShouldFailMixin
5893 from allmydata.test.no_network import GridTestMixin
5894hunk ./src/allmydata/test/test_mutable.py 28
5895 from allmydata.mutable.retrieve import Retrieve
5896 from allmydata.mutable.publish import Publish
5897 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
5898-from allmydata.mutable.layout import unpack_header, unpack_share
5899+from allmydata.mutable.layout import unpack_header, unpack_share, \
5900+                                     MDMFSlotReadProxy
5901 from allmydata.mutable.repairer import MustForceRepairError
5902 
5903 import allmydata.test.common_util as testutil
5904hunk ./src/allmydata/test/test_mutable.py 104
5905         d = fireEventually()
5906         d.addCallback(lambda res: _call())
5907         return d
5908+
5909     def callRemoteOnly(self, methname, *args, **kwargs):
5910         d = self.callRemote(methname, *args, **kwargs)
5911         d.addBoth(lambda ignore: None)
5912hunk ./src/allmydata/test/test_mutable.py 163
5913 def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
5914     # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
5915     # list of shnums to corrupt.
5916+    ds = []
5917     for peerid in s._peers:
5918         shares = s._peers[peerid]
5919         for shnum in shares:
5920hunk ./src/allmydata/test/test_mutable.py 313
5921         d.addCallback(_created)
5922         return d
5923 
5924+
5925+    def test_upload_and_download_mdmf(self):
5926+        d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
5927+        def _created(n):
5928+            d = defer.succeed(None)
5929+            d.addCallback(lambda ignored:
5930+                n.get_servermap(MODE_READ))
5931+            def _then(servermap):
5932+                dumped = servermap.dump(StringIO())
5933+                self.failUnlessIn("3-of-10", dumped.getvalue())
5934+            d.addCallback(_then)
5935+            # Now overwrite the contents with some new contents. We want
5936+            # to make them big enough to force the file to be uploaded
5937+            # in more than one segment.
5938+            big_contents = "contents1" * 100000 # about 900 KiB
5939+            d.addCallback(lambda ignored:
5940+                n.overwrite(big_contents))
5941+            d.addCallback(lambda ignored:
5942+                n.download_best_version())
5943+            d.addCallback(lambda data:
5944+                self.failUnlessEqual(data, big_contents))
5945+            # Overwrite the contents again with some new contents. As
5946+            # before, they need to be big enough to force multiple
5947+            # segments, so that we make the downloader deal with
5948+            # multiple segments.
5949+            bigger_contents = "contents2" * 1000000 # about 9MiB
5950+            d.addCallback(lambda ignored:
5951+                n.overwrite(bigger_contents))
5952+            d.addCallback(lambda ignored:
5953+                n.download_best_version())
5954+            d.addCallback(lambda data:
5955+                self.failUnlessEqual(data, bigger_contents))
5956+            return d
5957+        d.addCallback(_created)
5958+        return d
5959+
5960+
5961     def test_create_with_initial_contents(self):
5962         d = self.nodemaker.create_mutable_file("contents 1")
5963         def _created(n):
5964hunk ./src/allmydata/test/test_mutable.py 1133
5965 
5966 
5967     def _test_corrupt_all(self, offset, substring,
5968-                          should_succeed=False, corrupt_early=True,
5969-                          failure_checker=None):
5970+                          should_succeed=False,
5971+                          corrupt_early=True,
5972+                          failure_checker=None,
5973+                          fetch_privkey=False):
5974         d = defer.succeed(None)
5975         if corrupt_early:
5976             d.addCallback(corrupt, self._storage, offset)
5977hunk ./src/allmydata/test/test_mutable.py 1153
5978                     self.failUnlessIn(substring, "".join(allproblems))
5979                 return servermap
5980             if should_succeed:
5981-                d1 = self._fn.download_version(servermap, ver)
5982+                d1 = self._fn.download_version(servermap, ver,
5983+                                               fetch_privkey)
5984                 d1.addCallback(lambda new_contents:
5985                                self.failUnlessEqual(new_contents, self.CONTENTS))
5986             else:
5987hunk ./src/allmydata/test/test_mutable.py 1161
5988                 d1 = self.shouldFail(NotEnoughSharesError,
5989                                      "_corrupt_all(offset=%s)" % (offset,),
5990                                      substring,
5991-                                     self._fn.download_version, servermap, ver)
5992+                                     self._fn.download_version, servermap,
5993+                                                                ver,
5994+                                                                fetch_privkey)
5995             if failure_checker:
5996                 d1.addCallback(failure_checker)
5997             d1.addCallback(lambda res: servermap)
5998hunk ./src/allmydata/test/test_mutable.py 1172
5999         return d
6000 
6001     def test_corrupt_all_verbyte(self):
6002-        # when the version byte is not 0, we hit an UnknownVersionError error
6003-        # in unpack_share().
6004+        # when the version byte is not 0 or 1, we hit an UnknownVersionError
6005+        # error in unpack_share().
6006         d = self._test_corrupt_all(0, "UnknownVersionError")
6007         def _check_servermap(servermap):
6008             # and the dump should mention the problems
6009hunk ./src/allmydata/test/test_mutable.py 1179
6010             s = StringIO()
6011             dump = servermap.dump(s).getvalue()
6012-            self.failUnless("10 PROBLEMS" in dump, dump)
6013+            self.failUnless("30 PROBLEMS" in dump, dump)
6014         d.addCallback(_check_servermap)
6015         return d
6016 
6017hunk ./src/allmydata/test/test_mutable.py 1249
6018         return self._test_corrupt_all("enc_privkey", None, should_succeed=True)
6019 
6020 
6021+    def test_corrupt_all_encprivkey_late(self):
6022+        # this should work for the same reason as above, but we corrupt
6023+        # after the servermap update to exercise the error handling
6024+        # code.
6025+        # We need to remove the privkey from the node, or the retrieve
6026+        # process won't know to update it.
6027+        self._fn._privkey = None
6028+        return self._test_corrupt_all("enc_privkey",
6029+                                      None, # this shouldn't fail
6030+                                      should_succeed=True,
6031+                                      corrupt_early=False,
6032+                                      fetch_privkey=True)
6033+
6034+
6035     def test_corrupt_all_seqnum_late(self):
6036         # corrupting the seqnum between mapupdate and retrieve should result
6037         # in NotEnoughSharesError, since each share will look invalid
6038hunk ./src/allmydata/test/test_mutable.py 1269
6039         def _check(res):
6040             f = res[0]
6041             self.failUnless(f.check(NotEnoughSharesError))
6042-            self.failUnless("someone wrote to the data since we read the servermap" in str(f))
6043+            self.failUnless("uncoordinated write" in str(f))
6044         return self._test_corrupt_all(1, "ran out of peers",
6045                                       corrupt_early=False,
6046                                       failure_checker=_check)
6047hunk ./src/allmydata/test/test_mutable.py 1319
6048                       self.failUnlessEqual(new_contents, self.CONTENTS))
6049         return d
6050 
6051-    def test_corrupt_some(self):
6052-        # corrupt the data of first five shares (so the servermap thinks
6053-        # they're good but retrieve marks them as bad), so that the
6054-        # MODE_READ set of 6 will be insufficient, forcing node.download to
6055-        # retry with more servers.
6056-        corrupt(None, self._storage, "share_data", range(5))
6057-        d = self.make_servermap()
6058+
6059+    def _test_corrupt_some(self, offset, mdmf=False):
6060+        if mdmf:
6061+            d = self.publish_mdmf()
6062+        else:
6063+            d = defer.succeed(None)
6064+        d.addCallback(lambda ignored:
6065+            corrupt(None, self._storage, offset, range(5)))
6066+        d.addCallback(lambda ignored:
6067+            self.make_servermap())
6068         def _do_retrieve(servermap):
6069             ver = servermap.best_recoverable_version()
6070             self.failUnless(ver)
6071hunk ./src/allmydata/test/test_mutable.py 1335
6072             return self._fn.download_best_version()
6073         d.addCallback(_do_retrieve)
6074         d.addCallback(lambda new_contents:
6075-                      self.failUnlessEqual(new_contents, self.CONTENTS))
6076+            self.failUnlessEqual(new_contents, self.CONTENTS))
6077         return d
6078 
6079hunk ./src/allmydata/test/test_mutable.py 1338
6080+
6081+    def test_corrupt_some(self):
6082+        # corrupt the data of first five shares (so the servermap thinks
6083+        # they're good but retrieve marks them as bad), so that the
6084+        # MODE_READ set of 6 will be insufficient, forcing node.download to
6085+        # retry with more servers.
6086+        return self._test_corrupt_some("share_data")
6087+
6088+
6089     def test_download_fails(self):
6090         corrupt(None, self._storage, "signature")
6091         d = self.shouldFail(UnrecoverableFileError, "test_download_anyway",
6093                             "no recoverable versions",
6094                             self._fn.download_best_version)
6096         return d
6097 
6098 
6099hunk ./src/allmydata/test/test_mutable.py 1355
6100+
6101+    def test_corrupt_mdmf_block_hash_tree(self):
6102+        d = self.publish_mdmf()
6103+        d.addCallback(lambda ignored:
6104+            self._test_corrupt_all(("block_hash_tree", 12 * 32),
6105+                                   "block hash tree failure",
6106+                                   corrupt_early=True,
6107+                                   should_succeed=False))
6108+        return d
6109+
6110+
6111+    def test_corrupt_mdmf_block_hash_tree_late(self):
6112+        d = self.publish_mdmf()
6113+        d.addCallback(lambda ignored:
6114+            self._test_corrupt_all(("block_hash_tree", 12 * 32),
6115+                                   "block hash tree failure",
6116+                                   corrupt_early=False,
6117+                                   should_succeed=False))
6118+        return d
6119+
6120+
6121+    def test_corrupt_mdmf_share_data(self):
6122+        d = self.publish_mdmf()
6123+        d.addCallback(lambda ignored:
6124+            # TODO: Find out what the block size is and corrupt a
6125+            # specific block, rather than just guessing.
6126+            self._test_corrupt_all(("share_data", 12 * 40),
6127+                                    "block hash tree failure",
6128+                                    corrupt_early=True,
6129+                                    should_succeed=False))
6130+        return d
6131+
6132+
6133+    def test_corrupt_some_mdmf(self):
6134+        return self._test_corrupt_some(("share_data", 12 * 40),
6135+                                       mdmf=True)
6136+
6137+
6138 class CheckerMixin:
6139     def check_good(self, r, where):
6140         self.failUnless(r.is_healthy(), where)
6141hunk ./src/allmydata/test/test_mutable.py 2093
6142             d.addCallback(lambda res:
6143                           self.shouldFail(NotEnoughSharesError,
6144                                           "test_retrieve_surprise",
6145-                                          "ran out of peers: have 0 shares (k=3)",
6146+                                          "ran out of peers: have 0 of 1",
6147                                           n.download_version,
6148                                           self.old_map,
6149                                           self.old_map.best_recoverable_version(),
6150hunk ./src/allmydata/test/test_mutable.py 2102
6151         d.addCallback(_created)
6152         return d
6153 
6154+
6155     def test_unexpected_shares(self):
6156         # upload the file, take a servermap, shut down one of the servers,
6157         # upload it again (causing shares to appear on a new server), then
6158hunk ./src/allmydata/test/test_mutable.py 2306
6159         self.basedir = "mutable/Problems/test_privkey_query_missing"
6160         self.set_up_grid(num_servers=20)
6161         nm = self.g.clients[0].nodemaker
6162-        LARGE = "These are Larger contents" * 2000 # about 50KB
6163+        LARGE = "These are Larger contents" * 2000 # about 50KiB
6164         nm._node_cache = DevNullDictionary() # disable the nodecache
6165 
6166         d = nm.create_mutable_file(LARGE)
6167hunk ./src/allmydata/test/test_mutable.py 2319
6168         d.addCallback(_created)
6169         d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
6170         return d
6171+
6172+
6173+    def test_block_and_hash_query_error(self):
6174+        # This tests for what happens when a query to a remote server
6175+        # fails in either the hash validation step or the block getting
6176+        # step (because of batching, this is the same actual query).
6177+        # We need to have the storage server persist up until the point
6178+        # that its prefix is validated, then suddenly die. This
6179+        # exercises some exception handling code in Retrieve.
6180+        self.basedir = "mutable/Problems/test_block_and_hash_query_error"
6181+        self.set_up_grid(num_servers=20)
6182+        nm = self.g.clients[0].nodemaker
6183+        CONTENTS = "contents" * 2000
6184+        d = nm.create_mutable_file(CONTENTS)
6185+        def _created(node):
6186+            self._node = node
6187+        d.addCallback(_created)
6188+        d.addCallback(lambda ignored:
6189+            self._node.get_servermap(MODE_READ))
6190+        def _then(servermap):
6191+            # we have our servermap. Now we set up the servers like the
6192+            # tests above -- the first one that gets a read call should
6193+            # start throwing errors, but only after returning its prefix
6194+            # for validation. Since we'll download without fetching the
6195+            # private key, the next query to the remote server will be
6196+            # for either a block and salt or for hashes, either of which
6197+            # will exercise the error handling code.
6198+            killer = FirstServerGetsKilled()
6199+            for (serverid, ss) in nm.storage_broker.get_all_servers():
6200+                ss.post_call_notifier = killer.notify
6201+            ver = servermap.best_recoverable_version()
6202+            assert ver
6203+            return self._node.download_version(servermap, ver)
6204+        d.addCallback(_then)
6205+        d.addCallback(lambda data:
6206+            self.failUnlessEqual(data, CONTENTS))
6207+        return d
6208}
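
The MDMF corruption tests added above identify the byte to damage either by a field name or by a (field, delta) tuple such as ("block_hash_tree", 12 * 32), meaning "12 * 32 bytes past the start of the block hash tree". As a rough sketch (not part of either patch), the offset resolution done inside the corrupt() helper amounts to the following; resolve_offset is a hypothetical name, and offsets stands for the share's offset table (the o dict in the corrupt() code):

    def resolve_offset(offset, offsets, offset_offset=0):
        # offset may be a raw integer, a field name, or a (name, delta) tuple
        if isinstance(offset, tuple):
            name, delta = offset
        else:
            name, delta = offset, 0
        if name == "pubkey":
            base = 107             # the pubkey sits at a fixed position in the header
        elif name in offsets:
            base = offsets[name]   # e.g. "share_data", "block_hash_tree", "enc_privkey"
        else:
            base = name            # already an absolute byte offset
        return int(base) + delta + offset_offset

corrupt() then flips a bit at the resolved position, except for the version byte at offset 0, where it adds two instead.
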
6209[test/test_mutable.py: change the definition of corrupt() to work with MDMF as well as SDMF files, change users of corrupt to use the new definition
6210Kevan Carstensen <kevan@isnotajoke.com>**20100626003520
6211 Ignore-this: 836e59e2fde0535f6b4bea3468dc8244
6212] {
6213hunk ./src/allmydata/test/test_mutable.py 171
6214                 and shnum not in shnums_to_corrupt):
6215                 continue
6216             data = shares[shnum]
6217-            (version,
6218-             seqnum,
6219-             root_hash,
6220-             IV,
6221-             k, N, segsize, datalen,
6222-             o) = unpack_header(data)
6223-            if isinstance(offset, tuple):
6224-                offset1, offset2 = offset
6225-            else:
6226-                offset1 = offset
6227-                offset2 = 0
6228-            if offset1 == "pubkey":
6229-                real_offset = 107
6230-            elif offset1 in o:
6231-                real_offset = o[offset1]
6232-            else:
6233-                real_offset = offset1
6234-            real_offset = int(real_offset) + offset2 + offset_offset
6235-            assert isinstance(real_offset, int), offset
6236-            if offset1 == 0: # verbyte
6237-                f = add_two
6238-            else:
6239-                f = flip_bit
6240-            shares[shnum] = f(data, real_offset)
6241-    return res
6242+            # We use MDMFSlotReadProxy because it understands both
6243+            # MDMF and SDMF shares. Since we feed it all of the share
6244+            # data up front, it never needs the rref or the storage
6245+            # index, so we pass None for both of those arguments.
6246+            reader = MDMFSlotReadProxy(None, None, shnum, data)
6247+            # We need to get the offsets for the next part.
6248+            d = reader.get_verinfo()
6249+            def _do_corruption(verinfo, data, shnum):
6250+                (seqnum,
6251+                 root_hash,
6252+                 IV,
6253+                 segsize,
6254+                 datalen,
6255+                 k, n, prefix, o) = verinfo
6256+                if isinstance(offset, tuple):
6257+                    offset1, offset2 = offset
6258+                else:
6259+                    offset1 = offset
6260+                    offset2 = 0
6261+                if offset1 == "pubkey":
6262+                    real_offset = 107
6263+                elif offset1 in o:
6264+                    real_offset = o[offset1]
6265+                else:
6266+                    real_offset = offset1
6267+                real_offset = int(real_offset) + offset2 + offset_offset
6268+                assert isinstance(real_offset, int), offset
6269+                if offset1 == 0: # verbyte
6270+                    f = add_two
6271+                else:
6272+                    f = flip_bit
6273+                shares[shnum] = f(data, real_offset)
6274+            d.addCallback(_do_corruption, data, shnum)
6275+            ds.append(d)
6276+    dl = defer.DeferredList(ds)
6277+    dl.addCallback(lambda ignored: res)
6278+    return dl
6279 
6280 def make_storagebroker(s=None, num_peers=10):
6281     if not s:
6282hunk ./src/allmydata/test/test_mutable.py 1360
6283 
6284 
6285     def test_download_fails(self):
6286-        corrupt(None, self._storage, "signature")
6287-        d = self.shouldFail(UnrecoverableFileError, "test_download_anyway",
6288+        d = corrupt(None, self._storage, "signature")
6289+        d.addCallback(lambda ignored:
6290+            self.shouldFail(UnrecoverableFileError, "test_download_anyway",
6291                             "no recoverable versions",
6292-                            self._fn.download_best_version)
6292+                            self._fn.download_best_version))
6293         return d
6294hunk ./src/allmydata/test/test_mutable.py 1453
6295         return d
6296 
6297     def test_check_all_bad_sig(self):
6298-        corrupt(None, self._storage, 1) # bad sig
6299-        d = self._fn.check(Monitor())
6300+        d = corrupt(None, self._storage, 1) # bad sig
6301+        d.addCallback(lambda ignored:
6302+            self._fn.check(Monitor()))
6303         d.addCallback(self.check_bad, "test_check_all_bad_sig")
6304         return d
6305 
6306hunk ./src/allmydata/test/test_mutable.py 1460
6307     def test_check_all_bad_blocks(self):
6308-        corrupt(None, self._storage, "share_data", [9]) # bad blocks
6309+        d = corrupt(None, self._storage, "share_data", [9]) # bad blocks
6310         # the Checker won't notice this.. it doesn't look at actual data
6311hunk ./src/allmydata/test/test_mutable.py 1462
6312-        d = self._fn.check(Monitor())
6313+        d.addCallback(lambda ignored:
6314+            self._fn.check(Monitor()))
6315         d.addCallback(self.check_good, "test_check_all_bad_blocks")
6316         return d
6317 
6318hunk ./src/allmydata/test/test_mutable.py 1473
6319         return d
6320 
6321     def test_verify_all_bad_sig(self):
6322-        corrupt(None, self._storage, 1) # bad sig
6323-        d = self._fn.check(Monitor(), verify=True)
6324+        d = corrupt(None, self._storage, 1) # bad sig
6325+        d.addCallback(lambda ignored:
6326+            self._fn.check(Monitor(), verify=True))
6327         d.addCallback(self.check_bad, "test_verify_all_bad_sig")
6328         return d
6329 
6330hunk ./src/allmydata/test/test_mutable.py 1480
6331     def test_verify_one_bad_sig(self):
6332-        corrupt(None, self._storage, 1, [9]) # bad sig
6333-        d = self._fn.check(Monitor(), verify=True)
6334+        d = corrupt(None, self._storage, 1, [9]) # bad sig
6335+        d.addCallback(lambda ignored:
6336+            self._fn.check(Monitor(), verify=True))
6337         d.addCallback(self.check_bad, "test_verify_one_bad_sig")
6338         return d
6339 
6340hunk ./src/allmydata/test/test_mutable.py 1487
6341     def test_verify_one_bad_block(self):
6342-        corrupt(None, self._storage, "share_data", [9]) # bad blocks
6343+        d = corrupt(None, self._storage, "share_data", [9]) # bad blocks
6344         # the Verifier *will* notice this, since it examines every byte
6345hunk ./src/allmydata/test/test_mutable.py 1489
6346-        d = self._fn.check(Monitor(), verify=True)
6347+        d.addCallback(lambda ignored:
6348+            self._fn.check(Monitor(), verify=True))
6349         d.addCallback(self.check_bad, "test_verify_one_bad_block")
6350         d.addCallback(self.check_expected_failure,
6351                       CorruptShareError, "block hash tree failure",
6352hunk ./src/allmydata/test/test_mutable.py 1498
6353         return d
6354 
6355     def test_verify_one_bad_sharehash(self):
6356-        corrupt(None, self._storage, "share_hash_chain", [9], 5)
6357-        d = self._fn.check(Monitor(), verify=True)
6358+        d = corrupt(None, self._storage, "share_hash_chain", [9], 5)
6359+        d.addCallback(lambda ignored:
6360+            self._fn.check(Monitor(), verify=True))
6361         d.addCallback(self.check_bad, "test_verify_one_bad_sharehash")
6362         d.addCallback(self.check_expected_failure,
6363                       CorruptShareError, "corrupt hashes",
6364hunk ./src/allmydata/test/test_mutable.py 1508
6365         return d
6366 
6367     def test_verify_one_bad_encprivkey(self):
6368-        corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
6369-        d = self._fn.check(Monitor(), verify=True)
6370+        d = corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
6371+        d.addCallback(lambda ignored:
6372+            self._fn.check(Monitor(), verify=True))
6373         d.addCallback(self.check_bad, "test_verify_one_bad_encprivkey")
6374         d.addCallback(self.check_expected_failure,
6375                       CorruptShareError, "invalid privkey",
6376hunk ./src/allmydata/test/test_mutable.py 1518
6377         return d
6378 
6379     def test_verify_one_bad_encprivkey_uncheckable(self):
6380-        corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
6381+        d = corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
6382         readonly_fn = self._fn.get_readonly()
6383         # a read-only node has no way to validate the privkey
6384hunk ./src/allmydata/test/test_mutable.py 1521
6385-        d = readonly_fn.check(Monitor(), verify=True)
6386+        d.addCallback(lambda ignored:
6387+            readonly_fn.check(Monitor(), verify=True))
6388         d.addCallback(self.check_good,
6389                       "test_verify_one_bad_encprivkey_uncheckable")
6390         return d
6391}
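
After this patch, corrupt() reads each share through MDMFSlotReadProxy and applies the corruption in a callback, so it returns a Deferred (a DeferredList over the touched shares) instead of acting synchronously. Callers therefore have to chain the rest of the test on that Deferred, as the updated tests above do. A minimal sketch of the new calling convention (the test method name here is illustrative only):

    def test_check_one_bad_block(self):
        # corrupt() now returns a Deferred; everything that used to run
        # right after it must be chained on as a callback.
        d = corrupt(None, self._storage, "share_data", [9])
        d.addCallback(lambda ignored:
            self._fn.check(Monitor(), verify=True))
        d.addCallback(self.check_bad, "test_check_one_bad_block")
        return d
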
6392
6393Context:
6394
6395[docs: about.html link to home page early on, and be decentralized storage instead of cloud storage this time around
6396zooko@zooko.com**20100619065318
6397 Ignore-this: dc6db03f696e5b6d2848699e754d8053
6398] 
6399[docs: update about.html, especially to have a non-broken link to quickstart.html, and also to comment out the broken links to "for Paranoids" and "for Corporates"
6400zooko@zooko.com**20100619065124
6401 Ignore-this: e292c7f51c337a84ebfeb366fbd24d6c
6402] 
6403[TAG allmydata-tahoe-1.7.0
6404zooko@zooko.com**20100619052631
6405 Ignore-this: d21e27afe6d85e2e3ba6a3292ba2be1
6406] 
6407Patch bundle hash:
64087302b362be3f9a62ac23077fbfd0a8e8bec96de4