Ticket #393: 393status10.dpatch

File 393status10.dpatch, 263.3 KB (added by kevan, at 2010-06-27T00:07:27Z)
Line 
1Thu Jun 24 16:46:37 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
2  * Misc. changes to support the work I'm doing
3 
4      - Add a notion of file version number to interfaces.py
5      - Alter mutable file node interfaces to have a notion of version,
6        though this may be changed later.
7      - Alter mutable/filenode.py to conform to these changes.
8      - Add a salt hasher to util/hashutil.py
9
10Thu Jun 24 16:48:33 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
11  * nodemaker.py: create MDMF files when asked to
12
13Thu Jun 24 16:49:05 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
14  * storage/server.py: minor code cleanup
15
16Thu Jun 24 16:49:24 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
17  * test/test_mutable.py: alter some tests that were failing due to MDMF; minor code cleanup.
18
19Fri Jun 25 17:35:20 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
20  * test/test_mutable.py: change the definition of corrupt() to work with MDMF as well as SDMF files, change users of corrupt to use the new definition
21
22Sat Jun 26 16:40:33 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
23  * Add MDMF reader and writer
24 
25  The MDMF/SDMF reader and MDMF writer are similar to the object proxies
26  that exist for immutable files. They abstract away details of
27  connection, state, and caching from their callers (in this case, the
28  download, servermap updater, and uploader), and expose methods to get
29  and set information on the remote server.
30 
31  MDMFSlotReadProxy reads a mutable file from the server, doing the right
32  thing (in most cases) regardless of whether the file is MDMF or SDMF. It
33  allows callers to tell it how to batch and flush reads.
34 
35  MDMFSlotWriteProxy writes an MDMF mutable file to a server.
36 
37  This patch also includes tests for MDMFSlotReadProxy and
38  MDMFSlotWriteProxy.
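  As a rough, hypothetical sketch (not part of the patch), a publisher might drive MDMFSlotWriteProxy in the order the layout.py comments below prescribe; rref, secrets, the encoded blocks, and the signing function are assumed to come from the caller:

    from twisted.internet import defer
    from allmydata.mutable.layout import MDMFSlotWriteProxy  # added by this patch

    def publish_share(shnum, rref, storage_index, secrets, seqnum,
                      k, N, segment_size, data_length,
                      blocks_and_salts, encprivkey, blockhashes,
                      sharehashes, root_hash, sign, verification_key):
        p = MDMFSlotWriteProxy(shnum, rref, storage_index, secrets, seqnum,
                               k, N, segment_size, data_length)
        # default checkstring => brand-new share; call p.set_checkstring()
        # first when updating a share in place.
        d = defer.succeed(None)
        # 1: upload each (block, salt) pair
        for segnum, (block, salt) in enumerate(blocks_and_salts):
            d.addCallback(lambda ign, b=block, s=salt, n=segnum:
                          p.put_block(b, n, s))
        # 2-5: encrypted private key, block hash tree, share hash chain
        d.addCallback(lambda ign: p.put_encprivkey(encprivkey))
        d.addCallback(lambda ign: p.put_blockhashes(blockhashes))
        d.addCallback(lambda ign: p.put_sharehashes(sharehashes))
        # 6-8: root hash, signature over the signable prefix, verification key
        d.addCallback(lambda ign: p.put_root_hash(root_hash))
        d.addCallback(lambda ign: p.put_signature(sign(p.get_signable())))
        d.addCallback(lambda ign: p.put_verification_key(verification_key))
        d.addCallback(lambda ign: p.finish_publishing())
        return d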
39
40Sat Jun 26 16:41:18 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
41  * Alter the ServermapUpdater to find MDMF files
42 
43  The servermapupdater should find MDMF files on a grid in the same way
44  that it finds SDMF files. This patch makes it do that.
45
46Sat Jun 26 16:42:04 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
47  * Make a segmented mutable uploader
48 
49  The mutable file uploader should be able to publish files with one
50  segment and files with multiple segments. This patch makes it do that.
51  This is still incomplete, and rather ugly -- I need to flesh out error
52  handling, I need to write tests, and I need to remove some of the uglier
53  kludges in the process before I can call this done.
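  The per-share sizes fall out of the encoding parameters; a minimal, self-contained sketch of the arithmetic the new writer (in the layout patch below) performs, with made-up numbers:

    from allmydata.util import mathutil

    k = 4                      # required shares
    segment_size = 131072      # padded to a multiple of k by the publisher
    data_length = 409600       # length of the original plaintext

    num_segments = mathutil.div_ceil(data_length, segment_size)     # 4
    block_size = segment_size / k                                    # 32768 bytes per share
    tail_size = data_length % segment_size                           # 16384
    if not tail_size:
        tail_block_size = block_size
    else:
        tail_block_size = mathutil.next_multiple(tail_size, k) / k   # 4096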
54
55Sat Jun 26 16:43:14 PDT 2010  Kevan Carstensen <kevan@isnotajoke.com>
56  * Write a segmented mutable downloader
57 
58  The segmented mutable downloader can deal with MDMF files (files with
59  one or more segments in MDMF format) and SDMF files (files with one
60  segment in SDMF format). It is backwards compatible with the old
61  file format.
62 
63  This patch also contains tests for the segmented mutable downloader.
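  Both layouts begin with a one-byte protocol version (0 for SDMF, 1 for MDMF), which is what lets a single downloader dispatch on the format. A minimal sketch of that check (share_data is assumed to be the raw share bytes):

    import struct
    from allmydata.interfaces import SDMF_VERSION, MDMF_VERSION  # 0 and 1

    def share_version(share_data):
        (verbyte,) = struct.unpack(">B", share_data[:1])
        if verbyte == SDMF_VERSION:
            return "SDMF"
        elif verbyte == MDMF_VERSION:
            return "MDMF"
        raise ValueError("unknown mutable share version %d" % verbyte)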
64
65New patches:
66
67[Misc. changes to support the work I'm doing
68Kevan Carstensen <kevan@isnotajoke.com>**20100624234637
69 Ignore-this: fdd18fa8cc05f4b4b15ff53ee24a1819
70 
71     - Add a notion of file version number to interfaces.py
72     - Alter mutable file node interfaces to have a notion of version,
73       though this may be changed later.
74     - Alter mutable/filenode.py to conform to these changes.
75     - Add a salt hasher to util/hashutil.py
76] {
77hunk ./src/allmydata/interfaces.py 7
78      ChoiceOf, IntegerConstraint, Any, RemoteInterface, Referenceable
79 
80 HASH_SIZE=32
81+SALT_SIZE=16
82+
83+SDMF_VERSION=0
84+MDMF_VERSION=1
85 
86 Hash = StringConstraint(maxLength=HASH_SIZE,
87                         minLength=HASH_SIZE)# binary format 32-byte SHA256 hash
88hunk ./src/allmydata/interfaces.py 811
89         writer-visible data using this writekey.
90         """
91 
92+    def set_version(version):
93+        """Tahoe-LAFS supports SDMF and MDMF mutable files. By default,
94+        we upload in SDMF for reasons of compatibility. If you want to
95+        change this, set_version will let you do that.
96+
97+        To say that this file should be uploaded in SDMF, pass in a 0. To
98+        say that the file should be uploaded as MDMF, pass in a 1.
99+        """
100+
101+    def get_version():
102+        """Returns the mutable file protocol version."""
103+
104 class NotEnoughSharesError(Exception):
105     """Download was unable to get enough shares"""
106 
107hunk ./src/allmydata/mutable/filenode.py 8
108 from twisted.internet import defer, reactor
109 from foolscap.api import eventually
110 from allmydata.interfaces import IMutableFileNode, \
111-     ICheckable, ICheckResults, NotEnoughSharesError
112+     ICheckable, ICheckResults, NotEnoughSharesError, MDMF_VERSION, SDMF_VERSION
113 from allmydata.util import hashutil, log
114 from allmydata.util.assertutil import precondition
115 from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
116hunk ./src/allmydata/mutable/filenode.py 67
117         self._sharemap = {} # known shares, shnum-to-[nodeids]
118         self._cache = ResponseCache()
119         self._most_recent_size = None
120+        # filled in after __init__ if we're being created for the first time;
121+        # filled in by the servermap updater before publishing, otherwise.
122+        # set to this default value in case neither of those things happen,
123+        # or in case the servermap can't find any shares to tell us what
124+        # to publish as.
125+        # TODO: Set this back to None, and find out why the tests fail
126+        #       with it set to None.
127+        self._protocol_version = SDMF_VERSION
128 
129         # all users of this MutableFileNode go through the serializer. This
130         # takes advantage of the fact that Deferreds discard the callbacks
131hunk ./src/allmydata/mutable/filenode.py 472
132     def _did_upload(self, res, size):
133         self._most_recent_size = size
134         return res
135+
136+
137+    def set_version(self, version):
138+        # I can be set in two ways:
139+        #  1. When the node is created.
140+        #  2. (for an existing share) when the Servermap is updated
141+        #     before I am read.
142+        assert version in (MDMF_VERSION, SDMF_VERSION)
143+        self._protocol_version = version
144+
145+
146+    def get_version(self):
147+        return self._protocol_version
148hunk ./src/allmydata/util/hashutil.py 90
149 MUTABLE_READKEY_TAG = "allmydata_mutable_writekey_to_readkey_v1"
150 MUTABLE_DATAKEY_TAG = "allmydata_mutable_readkey_to_datakey_v1"
151 MUTABLE_STORAGEINDEX_TAG = "allmydata_mutable_readkey_to_storage_index_v1"
152+MUTABLE_SALT_TAG = "allmydata_mutable_segment_salt_v1"
153 
154 # dirnodes
155 DIRNODE_CHILD_WRITECAP_TAG = "allmydata_mutable_writekey_and_salt_to_dirnode_child_capkey_v1"
156hunk ./src/allmydata/util/hashutil.py 134
157 def plaintext_segment_hasher():
158     return tagged_hasher(PLAINTEXT_SEGMENT_TAG)
159 
160+def mutable_salt_hash(data):
161+    return tagged_hash(MUTABLE_SALT_TAG, data)
162+def mutable_salt_hasher():
163+    return tagged_hasher(MUTABLE_SALT_TAG)
164+
165 KEYLEN = 16
166 IVLEN = 16
167 
168}
169[nodemaker.py: create MDMF files when asked to
170Kevan Carstensen <kevan@isnotajoke.com>**20100624234833
171 Ignore-this: 26c16aaca9ddab7a7ce37a4530bc970
172] {
173hunk ./src/allmydata/nodemaker.py 3
174 import weakref
175 from zope.interface import implements
176-from allmydata.interfaces import INodeMaker
177+from allmydata.util.assertutil import precondition
178+from allmydata.interfaces import INodeMaker, MustBeDeepImmutableError, \
179+                                 SDMF_VERSION, MDMF_VERSION
180 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode
181 from allmydata.immutable.upload import Data
182 from allmydata.mutable.filenode import MutableFileNode
183hunk ./src/allmydata/nodemaker.py 92
184             return self._create_dirnode(filenode)
185         return None
186 
187-    def create_mutable_file(self, contents=None, keysize=None):
188+    def create_mutable_file(self, contents=None, keysize=None,
189+                            version=SDMF_VERSION):
190         n = MutableFileNode(self.storage_broker, self.secret_holder,
191                             self.default_encoding_parameters, self.history)
192hunk ./src/allmydata/nodemaker.py 96
193+        n.set_version(version)
194         d = self.key_generator.generate(keysize)
195         d.addCallback(n.create_with_keys, contents)
196         d.addCallback(lambda res: n)
197hunk ./src/allmydata/nodemaker.py 102
198         return d
199 
200-    def create_new_mutable_directory(self, initial_children={}):
201+    def create_new_mutable_directory(self, initial_children={},
202+                                     version=SDMF_VERSION):
203+        # initial_children must have metadata (i.e. {} instead of None)
204+        for (name, (node, metadata)) in initial_children.iteritems():
205+            precondition(isinstance(metadata, dict),
206+                         "create_new_mutable_directory requires metadata to be a dict, not None", metadata)
207+            node.raise_error()
208         d = self.create_mutable_file(lambda n:
209hunk ./src/allmydata/nodemaker.py 110
210-                                     pack_children(n, initial_children))
211+                                     pack_children(n, initial_children),
212+                                     version=version)
213         d.addCallback(self._create_dirnode)
214         return d
215 
216}
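With this patch applied, a caller that wants an MDMF file just passes the new version argument; a hedged sketch (nodemaker and contents are assumed to be supplied elsewhere):

    from allmydata.interfaces import MDMF_VERSION

    def create_mdmf_file(nodemaker, contents):
        # defaults to SDMF_VERSION when no version is given
        return nodemaker.create_mutable_file(contents, version=MDMF_VERSION)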
217[storage/server.py: minor code cleanup
218Kevan Carstensen <kevan@isnotajoke.com>**20100624234905
219 Ignore-this: 2358c531c39e48d3c8e56b62b5768228
220] {
221hunk ./src/allmydata/storage/server.py 569
222                                          self)
223         return share
224 
225-    def remote_slot_readv(self, storage_index, shares, readv):
226+    def remote_slot_readv(self, storage_index, shares, readvs):
227         start = time.time()
228         self.count("readv")
229         si_s = si_b2a(storage_index)
230hunk ./src/allmydata/storage/server.py 590
231             if sharenum in shares or not shares:
232                 filename = os.path.join(bucketdir, sharenum_s)
233                 msf = MutableShareFile(filename, self)
234-                datavs[sharenum] = msf.readv(readv)
235+                datavs[sharenum] = msf.readv(readvs)
236         log.msg("returning shares %s" % (datavs.keys(),),
237                 facility="tahoe.storage", level=log.NOISY, parent=lp)
238         self.add_latency("readv", time.time() - start)
239}
240[test/test_mutable.py: alter some tests that were failing due to MDMF; minor code cleanup.
241Kevan Carstensen <kevan@isnotajoke.com>**20100624234924
242 Ignore-this: afb86ec1fbdbfe1a5ef6f46f350273c0
243] {
244hunk ./src/allmydata/test/test_mutable.py 151
245             chr(ord(original[byte_offset]) ^ 0x01) +
246             original[byte_offset+1:])
247 
248+def add_two(original, byte_offset):
249+    # It isn't enough to simply flip the bit for the version number,
250+    # because 1 is a valid version number. So we add two instead.
251+    return (original[:byte_offset] +
252+            chr(ord(original[byte_offset]) ^ 0x02) +
253+            original[byte_offset+1:])
254+
255 def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
256     # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
257     # list of shnums to corrupt.
258hunk ./src/allmydata/test/test_mutable.py 187
259                 real_offset = offset1
260             real_offset = int(real_offset) + offset2 + offset_offset
261             assert isinstance(real_offset, int), offset
262-            shares[shnum] = flip_bit(data, real_offset)
263+            if offset1 == 0: # verbyte
264+                f = add_two
265+            else:
266+                f = flip_bit
267+            shares[shnum] = f(data, real_offset)
268     return res
269 
270 def make_storagebroker(s=None, num_peers=10):
271hunk ./src/allmydata/test/test_mutable.py 423
272         d.addCallback(_created)
273         return d
274 
275+
276     def test_modify_backoffer(self):
277         def _modifier(old_contents, servermap, first_time):
278             return old_contents + "line2"
279hunk ./src/allmydata/test/test_mutable.py 658
280         d.addCallback(_created)
281         return d
282 
283+
284     def _copy_shares(self, ignored, index):
285         shares = self._storage._peers
286         # we need a deep copy
287}
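Why flip_bit is not enough for the version byte is easy to check directly (pure stdlib sketch):

    # Flipping bit 0 would turn SDMF (0) into MDMF (1) and vice versa -- both
    # still valid -- whereas XOR with 0x02 always produces an invalid version.
    for verbyte in (0, 1):                       # SDMF_VERSION, MDMF_VERSION
        assert (verbyte ^ 0x01) in (0, 1)        # still looks valid
        assert (verbyte ^ 0x02) not in (0, 1)    # guaranteed to be rejected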
288[test/test_mutable.py: change the definition of corrupt() to work with MDMF as well as SDMF files, change users of corrupt to use the new definition
289Kevan Carstensen <kevan@isnotajoke.com>**20100626003520
290 Ignore-this: 836e59e2fde0535f6b4bea3468dc8244
291] {
292hunk ./src/allmydata/test/test_mutable.py 168
293                 and shnum not in shnums_to_corrupt):
294                 continue
295             data = shares[shnum]
296-            (version,
297-             seqnum,
298-             root_hash,
299-             IV,
300-             k, N, segsize, datalen,
301-             o) = unpack_header(data)
302-            if isinstance(offset, tuple):
303-                offset1, offset2 = offset
304-            else:
305-                offset1 = offset
306-                offset2 = 0
307-            if offset1 == "pubkey":
308-                real_offset = 107
309-            elif offset1 in o:
310-                real_offset = o[offset1]
311-            else:
312-                real_offset = offset1
313-            real_offset = int(real_offset) + offset2 + offset_offset
314-            assert isinstance(real_offset, int), offset
315-            if offset1 == 0: # verbyte
316-                f = add_two
317-            else:
318-                f = flip_bit
319-            shares[shnum] = f(data, real_offset)
320-    return res
321+            # We're feeding the reader all of the share data, so it
322+            # won't need to use the rref that we didn't provide, nor the
323+            # storage index that we didn't provide. We do this because
324+            # the reader will work for both MDMF and SDMF.
325+            reader = MDMFSlotReadProxy(None, None, shnum, data)
326+            # We need to get the offsets for the next part.
327+            d = reader.get_verinfo()
328+            def _do_corruption(verinfo, data, shnum):
329+                (seqnum,
330+                 root_hash,
331+                 IV,
332+                 segsize,
333+                 datalen,
334+                 k, n, prefix, o) = verinfo
335+                if isinstance(offset, tuple):
336+                    offset1, offset2 = offset
337+                else:
338+                    offset1 = offset
339+                    offset2 = 0
340+                if offset1 == "pubkey":
341+                    real_offset = 107
342+                elif offset1 in o:
343+                    real_offset = o[offset1]
344+                else:
345+                    real_offset = offset1
346+                real_offset = int(real_offset) + offset2 + offset_offset
347+                assert isinstance(real_offset, int), offset
348+                if offset1 == 0: # verbyte
349+                    f = add_two
350+                else:
351+                    f = flip_bit
352+                shares[shnum] = f(data, real_offset)
353+            d.addCallback(_do_corruption, data, shnum)
354+            ds.append(d)
355+    dl = defer.DeferredList(ds)
356+    dl.addCallback(lambda ignored: res)
357+    return dl
358 
359 def make_storagebroker(s=None, num_peers=10):
360     if not s:
361hunk ./src/allmydata/test/test_mutable.py 1177
362         return d
363 
364     def test_download_fails(self):
365-        corrupt(None, self._storage, "signature")
366-        d = self.shouldFail(UnrecoverableFileError, "test_download_anyway",
367+        d = corrupt(None, self._storage, "signature")
368+        d.addCallback(lambda ignored:
369+            self.shouldFail(UnrecoverableFileError, "test_download_anyway",
370                             "no recoverable versions",
371                             self._fn.download_best_version)
372         return d
373hunk ./src/allmydata/test/test_mutable.py 1232
374         return d
375 
376     def test_check_all_bad_sig(self):
377-        corrupt(None, self._storage, 1) # bad sig
378-        d = self._fn.check(Monitor())
379+        d = corrupt(None, self._storage, 1) # bad sig
380+        d.addCallback(lambda ignored:
381+            self._fn.check(Monitor()))
382         d.addCallback(self.check_bad, "test_check_all_bad_sig")
383         return d
384 
385hunk ./src/allmydata/test/test_mutable.py 1239
386     def test_check_all_bad_blocks(self):
387-        corrupt(None, self._storage, "share_data", [9]) # bad blocks
388+        d = corrupt(None, self._storage, "share_data", [9]) # bad blocks
389         # the Checker won't notice this.. it doesn't look at actual data
390hunk ./src/allmydata/test/test_mutable.py 1241
391-        d = self._fn.check(Monitor())
392+        d.addCallback(lambda ignored:
393+            self._fn.check(Monitor()))
394         d.addCallback(self.check_good, "test_check_all_bad_blocks")
395         return d
396 
397hunk ./src/allmydata/test/test_mutable.py 1252
398         return d
399 
400     def test_verify_all_bad_sig(self):
401-        corrupt(None, self._storage, 1) # bad sig
402-        d = self._fn.check(Monitor(), verify=True)
403+        d = corrupt(None, self._storage, 1) # bad sig
404+        d.addCallback(lambda ignored:
405+            self._fn.check(Monitor(), verify=True))
406         d.addCallback(self.check_bad, "test_verify_all_bad_sig")
407         return d
408 
409hunk ./src/allmydata/test/test_mutable.py 1259
410     def test_verify_one_bad_sig(self):
411-        corrupt(None, self._storage, 1, [9]) # bad sig
412-        d = self._fn.check(Monitor(), verify=True)
413+        d = corrupt(None, self._storage, 1, [9]) # bad sig
414+        d.addCallback(lambda ignored:
415+            self._fn.check(Monitor(), verify=True))
416         d.addCallback(self.check_bad, "test_verify_one_bad_sig")
417         return d
418 
419hunk ./src/allmydata/test/test_mutable.py 1266
420     def test_verify_one_bad_block(self):
421-        corrupt(None, self._storage, "share_data", [9]) # bad blocks
422+        d = corrupt(None, self._storage, "share_data", [9]) # bad blocks
423         # the Verifier *will* notice this, since it examines every byte
424hunk ./src/allmydata/test/test_mutable.py 1268
425-        d = self._fn.check(Monitor(), verify=True)
426+        d.addCallback(lambda ignored:
427+            self._fn.check(Monitor(), verify=True))
428         d.addCallback(self.check_bad, "test_verify_one_bad_block")
429         d.addCallback(self.check_expected_failure,
430                       CorruptShareError, "block hash tree failure",
431hunk ./src/allmydata/test/test_mutable.py 1277
432         return d
433 
434     def test_verify_one_bad_sharehash(self):
435-        corrupt(None, self._storage, "share_hash_chain", [9], 5)
436-        d = self._fn.check(Monitor(), verify=True)
437+        d = corrupt(None, self._storage, "share_hash_chain", [9], 5)
438+        d.addCallback(lambda ignored:
439+            self._fn.check(Monitor(), verify=True))
440         d.addCallback(self.check_bad, "test_verify_one_bad_sharehash")
441         d.addCallback(self.check_expected_failure,
442                       CorruptShareError, "corrupt hashes",
443hunk ./src/allmydata/test/test_mutable.py 1287
444         return d
445 
446     def test_verify_one_bad_encprivkey(self):
447-        corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
448-        d = self._fn.check(Monitor(), verify=True)
449+        d = corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
450+        d.addCallback(lambda ignored:
451+            self._fn.check(Monitor(), verify=True))
452         d.addCallback(self.check_bad, "test_verify_one_bad_encprivkey")
453         d.addCallback(self.check_expected_failure,
454                       CorruptShareError, "invalid privkey",
455hunk ./src/allmydata/test/test_mutable.py 1297
456         return d
457 
458     def test_verify_one_bad_encprivkey_uncheckable(self):
459-        corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
460+        d = corrupt(None, self._storage, "enc_privkey", [9]) # bad privkey
461         readonly_fn = self._fn.get_readonly()
462         # a read-only node has no way to validate the privkey
463hunk ./src/allmydata/test/test_mutable.py 1300
464-        d = readonly_fn.check(Monitor(), verify=True)
465+        d.addCallback(lambda ignored:
466+            readonly_fn.check(Monitor(), verify=True))
467         d.addCallback(self.check_good,
468                       "test_verify_one_bad_encprivkey_uncheckable")
469         return d
470}
471[Add MDMF reader and writer
472Kevan Carstensen <kevan@isnotajoke.com>**20100626234033
473 Ignore-this: 30bb93141aacc4201e87302a462b3075
474 
475 The MDMF/SDMF reader and MDMF writer are similar to the object proxies
476 that exist for immutable files. They abstract away details of
477 connection, state, and caching from their callers (in this case, the
478 download, servermap updater, and uploader), and expose methods to get
479 and set information on the remote server.
480 
481 MDMFSlotReadProxy reads a mutable file from the server, doing the right
482 thing (in most cases) regardless of whether the file is MDMF or SDMF. It
483 allows callers to tell it how to batch and flush reads.
484 
485 MDMFSlotWriteProxy writes an MDMF mutable file to a server.
486 
487 This patch also includes tests for MDMFSlotReadProxy and
488 MDMFSlotWriteProxy.
489] {
490hunk ./src/allmydata/mutable/layout.py 4
491 
492 import struct
493 from allmydata.mutable.common import NeedMoreDataError, UnknownVersionError
494+from allmydata.interfaces import HASH_SIZE, SALT_SIZE, SDMF_VERSION, \
495+                                 MDMF_VERSION
496+from allmydata.util import mathutil, observer
497+from twisted.python import failure
498+from twisted.internet import defer
499+
500+
501+# These strings describe the format of the packed structs they help process
502+# Here's what they mean:
503+#
504+#  PREFIX:
505+#    >: Big-endian byte order; the most significant byte is first (leftmost).
506+#    B: The version information; an 8 bit version identifier. Stored as
507+#       an unsigned char. This is currently 00 00 00 00; our modifications
508+#       will turn it into 00 00 00 01.
509+#    Q: The sequence number; this is sort of like a revision history for
510+#       mutable files; they start at 1 and increase as they are changed after
511+#       being uploaded. Stored as an unsigned long long, which is 8 bytes in
512+#       length.
513+#  32s: The root hash of the share hash tree. We use sha-256d, so we use 32
514+#       characters = 32 bytes to store the value.
515+#  16s: The salt for the readkey. This is a 16-byte random value, stored as
516+#       16 characters.
517+#
518+#  SIGNED_PREFIX additions, things that are covered by the signature:
519+#    B: The "k" encoding parameter. We store this as an 8-bit character,
520+#       which is convenient because our erasure coding scheme cannot
521+#       encode if you ask for more than 255 pieces.
522+#    B: The "N" encoding parameter. Stored as an 8-bit character for the
523+#       same reasons as above.
524+#    Q: The segment size of the uploaded file. This will essentially be the
525+#       length of the file in SDMF. An unsigned long long, so we can store
526+#       files of quite large size.
527+#    Q: The data length of the uploaded file. Modulo padding, this will be
528+#       the same as the segment size field. Like the segment size field, it is
529+#       an unsigned long long and can be quite large.
530+#
531+#   HEADER additions:
533+#     L: The offset of the signature. An unsigned long.
533+#     L: The offset of the share hash chain. An unsigned long.
534+#     L: The offset of the block hash tree. An unsigned long.
535+#     L: The offset of the share data. An unsigned long.
536+#     Q: The offset of the encrypted private key. An unsigned long long, to
537+#        account for the possibility of a lot of share data.
538+#     Q: The offset of the EOF. An unsigned long long, to account for the
539+#        possibility of a lot of share data.
540+#
541+#  After all of these, we have the following:
542+#    - The verification key: Occupies the space between the end of the header
543+#      and the start of the signature (i.e. data[HEADER_LENGTH:o['signature']]).
544+#    - The signature, which goes from the signature offset to the share hash
545+#      chain offset.
546+#    - The share hash chain, which goes from the share hash chain offset to
547+#      the block hash tree offset.
548+#    - The share data, which goes from the share data offset to the encrypted
549+#      private key offset.
550+#    - The encrypted private key, which goes from its offset until the end of the file.
551+#
552+#  The block hash tree in this encoding has only one leaf, so the offset of
553+#  the share data will be 32 bytes more than the offset of the block hash tree.
554+#  Given this, we may need to check to see how many bytes a reasonably sized
555+#  block hash tree will take up.
556 
557 PREFIX = ">BQ32s16s" # each version has a different prefix
558 SIGNED_PREFIX = ">BQ32s16s BBQQ" # this is covered by the signature
559hunk ./src/allmydata/mutable/layout.py 72
560 SIGNED_PREFIX_LENGTH = struct.calcsize(SIGNED_PREFIX)
561 HEADER = ">BQ32s16s BBQQ LLLLQQ" # includes offsets
562 HEADER_LENGTH = struct.calcsize(HEADER)
563+OFFSETS = ">LLLLQQ"
564+OFFSETS_LENGTH = struct.calcsize(OFFSETS)
565 
566 def unpack_header(data):
567     o = {}
568hunk ./src/allmydata/mutable/layout.py 193
569     return (share_hash_chain, block_hash_tree, share_data)
570 
571 
572-def pack_checkstring(seqnum, root_hash, IV):
573+def pack_checkstring(seqnum, root_hash, IV, version=0):
574     return struct.pack(PREFIX,
575hunk ./src/allmydata/mutable/layout.py 195
576-                       0, # version,
577+                       version,
578                        seqnum,
579                        root_hash,
580                        IV)
581hunk ./src/allmydata/mutable/layout.py 268
582                            encprivkey])
583     return final_share
584 
585+def pack_prefix(seqnum, root_hash, IV,
586+                required_shares, total_shares,
587+                segment_size, data_length):
588+    prefix = struct.pack(SIGNED_PREFIX,
589+                         0, # version,
590+                         seqnum,
591+                         root_hash,
592+                         IV,
593+                         required_shares,
594+                         total_shares,
595+                         segment_size,
596+                         data_length,
597+                         )
598+    return prefix
599+
600+
601+MDMFHEADER = ">BQ32sBBQQ QQQQQQ"
602+MDMFHEADERWITHOUTOFFSETS = ">BQ32sBBQQ"
603+MDMFHEADERSIZE = struct.calcsize(MDMFHEADER)
604+MDMFHEADERWITHOUTOFFSETSSIZE = struct.calcsize(MDMFHEADERWITHOUTOFFSETS)
605+MDMFCHECKSTRING = ">BQ32s"
606+MDMFSIGNABLEHEADER = ">BQ32sBBQQ"
607+MDMFOFFSETS = ">QQQQQQ"
608+MDMFOFFSETS_LENGTH = struct.calcsize(MDMFOFFSETS)
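A quick, self-contained check of the header sizes these format strings imply (stdlib only; the constants are repeated here for illustration):

    import struct

    assert struct.calcsize(">BQ32sBBQQ") == 59          # MDMFHEADERWITHOUTOFFSETS (signed part)
    assert struct.calcsize(">QQQQQQ") == 48             # MDMFOFFSETS (six 8-byte offsets)
    assert struct.calcsize(">BQ32sBBQQ QQQQQQ") == 107  # MDMFHEADER
    # so the offset fields occupy bytes 59..106 and the first salt+block
    # starts at byte 107, matching the layout comment below.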
609+
610+class MDMFSlotWriteProxy:
611+    #implements(IMutableSlotWriter) TODO
612+
613+    """
614+    I represent a remote write slot for an MDMF mutable file.
615+
616+    I abstract away from my caller the details of block and salt
617+    management, and the implementation of the on-disk format for MDMF
618+    shares.
619+    """
620+    # We want to get rid of the salt tree root hash for MDMF. It is
621+    # easier to check validity if we store the salts next to the block
622+    # data for each segment. We then make the block hash tree leaves
623+    # made up of hashes of block + salt for a particular block/segment.
624+    # In this way, we get salt validity checking for free, and make for
625+    # simpler upload and download logic.
626+    #
627+    # Problems?
628+    #
629+    # Well, the meaning of the datalength field, per
630+    # docs/specifications/mutable.txt, is to give the datalength of the
631+    # original plaintext -- not the ciphertext.
632+    #
633+    # We don't really care about this issue when we're uploading. But
634+    # the downloader needs to know how many segments there are, and
635+    # calculates this by checking the data length against the segment
636+    # size. Not a problem when the only thing in segments are the
637+    # Wait, this is stupid. Leave the data length and the segment size
638+    # alone -- this will allow the code to calculate the number of
639+    # segments appropriately. Then tweak the read code to read more from
640+    # the remote server. Done.
641+
642+    # Expected layout, MDMF:
643+    # offset:     size:       name:
644+    #-- signed part --
645+    # 0           1           version number (01)
646+    # 1           8           sequence number
647+    # 9           32          share tree root hash
648+    # 41          1           The "k" encoding parameter
649+    # 42          1           The "N" encoding parameter
650+    # 43          8           The segment size of the uploaded file
651+    # 51          8           The data length of the original plaintext
652+    #-- end signed part --
653+    # 59          8           The offset of the encrypted private key
654+    # 67          8           The offset of the block hash tree
655+    # 75          8           The offset of the share hash chain
656+    # 83          8           The offset of the signature
657+    # 91          8           The offset of the verification key
658+    # 99          8           The offset of the EOF
659+    #
660+    # followed by salts and share data, the encrypted private key, the
662+    # block hash tree, the share hash chain, a
663+    # signature over the first seven fields, and a verification key.
663+    #
664+    # The checkstring is the first three fields -- the version number,
666+    # sequence number, and root hash. This is consistent
667+    # in meaning with what we have with SDMF files, except now instead of
667+    # using the literal salt, we use a value derived from all of the
668+    # salts -- the share hash root.
669+    #
670+    # The salt is stored before the block for each segment. The block
671+    # hash tree is computed over the combination of block and salt for
672+    # each segment. In this way, we get integrity checking for both
673+    # block and salt with the current block hash tree arrangement.
674+    #
675+    # The ordering of the offsets is different to reflect the dependencies
676+    # that we'll run into with an MDMF file. The expected write flow is
677+    # something like this:
678+    #
679+    #   0: Initialize with the sequence number, encoding parameters and
680+    #      data length. From this, we can deduce the number of segments,
682+    #      and where they should go. We can also figure out where the
682+    #      encrypted private key should go, because we can figure out how
683+    #      big the share data will be.
684+    #
685+    #   1: Encrypt, encode, and upload the file in chunks. Do something
686+    #      like
687+    #
688+    #       put_block(data, segnum, salt)
689+    #
690+    #      to write a block and a salt to the disk. We can do both of
691+    #      these operations now because we have enough of the offsets to
692+    #      know where to put them.
693+    #
694+    #   2: Put the encrypted private key. Use:
695+    #
696+    #        put_encprivkey(encprivkey)
697+    #
698+    #      Now that we know the length of the private key, we can fill
699+    #      in the offset for the block hash tree.
700+    #
701+    #   3: We're now in a position to upload the block hash tree for
702+    #      a share. Put that using something like:
703+    #       
704+    #        put_blockhashes(block_hash_tree)
705+    #
706+    #      Note that block_hash_tree is a list of hashes -- we'll take
707+    #      care of the details of serializing that appropriately. When
708+    #      we get the block hash tree, we are also in a position to
709+    #      calculate the offset for the share hash chain, and fill that
710+    #      into the offsets table.
711+    #
712+    #   4: At the same time, we're in a position to upload the salt hash
713+    #      tree. This is a Merkle tree over all of the salts. We use a
714+    #      Merkle tree so that we can validate each block,salt pair as
715+    #      we download them later. We do this using
716+    #
717+    #        put_salthashes(salt_hash_tree)
718+    #
719+    #      When you do this, I automatically put the root of the tree
720+    #      (the hash at index 0 of the list) in its appropriate slot in
721+    #      the signed prefix of the share.
722+    #
723+    #   5: We're now in a position to upload the share hash chain for
724+    #      a share. Do that with something like:
725+    #     
726+    #        put_sharehashes(share_hash_chain)
727+    #
728+    #      share_hash_chain should be a dictionary mapping shnums to
729+    #      32-byte hashes -- the wrapper handles serialization.
730+    #      We'll know where to put the signature at this point, also.
731+    #      The root of this tree will be put explicitly in the next
732+    #      step.
733+    #
734+    #      TODO: Why? Why not just include it in the tree here?
735+    #
736+    #   6: Before putting the signature, we must first put the
737+    #      root_hash. Do this with:
738+    #
739+    #        put_root_hash(root_hash).
740+    #     
741+    #      In terms of knowing where to put this value, it was always
742+    #      possible to place it, but it makes sense semantically to
743+    #      place it after the share hash tree, so that's why you do it
744+    #      in this order.
745+    #
747+    #   7: With the root hash put, we can now sign the header. Use:
747+    #
748+    #        get_signable()
749+    #
750+    #      to get the part of the header that you want to sign, and use:
751+    #       
752+    #        put_signature(signature)
753+    #
754+    #      to write your signature to the remote server.
755+    #
757+    #   8: Add the verification key, and finish. Do:
757+    #
758+    #        put_verification_key(key)
759+    #
760+    #      and
761+    #
763+    #        finish_publishing()
763+    #
764+    # Checkstring management:
765+    #
766+    # To write to a mutable slot, we have to provide test vectors to ensure
767+    # that we are writing to the same data that we think we are. These
768+    # vectors allow us to detect uncoordinated writes; that is, writes
769+    # where both we and some other shareholder are writing to the
770+    # mutable slot, and to report those back to the parts of the program
771+    # doing the writing.
772+    #
773+    # With SDMF, this was easy -- all of the share data was written in
774+    # one go, so it was easy to detect uncoordinated writes, and we only
775+    # had to do it once. With MDMF, not all of the file is written at
776+    # once.
777+    #
778+    # If a share is new, we write out as much of the header as we can
779+    # before writing out anything else. This gives other writers a
780+    # canary that they can use to detect uncoordinated writes, and, if
782+    # they do the same thing, gives us the same canary. We then update
782+    # the share. We won't be able to write out two fields of the header
783+    # -- the share tree hash and the salt hash -- until we finish
784+    # writing out the share. We only require the writer to provide the
785+    # initial checkstring, and keep track of what it should be after
786+    # updates ourselves.
787+    #
788+    # If we haven't written anything yet, then on the first write (which
789+    # will probably be a block + salt of a share), we'll also write out
790+    # the header. On subsequent passes, we'll expect to see the header.
791+    # This changes in two places:
792+    #
793+    #   - When we write out the salt hash
794+    #   - When we write out the root of the share hash tree
795+    #
796+    # since these values will change the header. It is possible that we
797+    # can just make those be written in one operation to minimize
798+    # disruption.
799+    def __init__(self,
800+                 shnum,
801+                 rref, # a remote reference to a storage server
802+                 storage_index,
803+                 secrets, # (write_enabler, renew_secret, cancel_secret)
804+                 seqnum, # the sequence number of the mutable file
805+                 required_shares,
806+                 total_shares,
807+                 segment_size,
808+                 data_length): # the length of the original file
809+        self._shnum = shnum
810+        self._rref = rref
811+        self._storage_index = storage_index
812+        self._seqnum = seqnum
813+        self._required_shares = required_shares
814+        assert self._shnum >= 0 and self._shnum < total_shares
815+        self._total_shares = total_shares
816+        # We build up the offset table as we write things. It is the
817+        # last thing we write to the remote server.
818+        self._offsets = {}
819+        self._testvs = []
820+        self._secrets = secrets
821+        # The segment size needs to be a multiple of the k parameter --
822+        # any padding should have been carried out by the publisher
823+        # already.
824+        assert segment_size % required_shares == 0
825+        self._segment_size = segment_size
826+        self._data_length = data_length
827+
828+        # These are set later -- we define them here so that we can
829+        # check for their existence easily
830+
831+        # This is the root of the share hash tree -- the Merkle tree
832+        # over the roots of the block hash trees computed for shares in
833+        # this upload.
834+        self._root_hash = None
835+
836+        # We haven't yet written anything to the remote bucket. By
837+        # setting this, we tell the _write method as much. The write
838+        # method will then know that it also needs to add a write vector
839+        # for the checkstring (or what we have of it) to the first write
840+        # request. We'll then record that value for future use.  If
841+        # we're expecting something to be there already, we need to call
842+        # set_checkstring before we write anything to tell the first
843+        # write about that.
844+        self._written = False
845+
846+        # When writing data to the storage servers, we get a read vector
847+        # for free. We'll read the checkstring, which will help us
848+        # figure out what's gone wrong if a write fails.
849+        self._readv = [(0, struct.calcsize(MDMFCHECKSTRING))]
850+
851+        # We calculate the number of segments because it tells us
853+        # where the salt part of the file ends and the share segments begin,
853+        # and also because it provides a useful amount of bounds checking.
854+        self._num_segments = mathutil.div_ceil(self._data_length,
855+                                               self._segment_size)
856+        self._block_size = self._segment_size / self._required_shares
857+        # We also calculate the share size, to help us with block
858+        # constraints later.
859+        tail_size = self._data_length % self._segment_size
860+        if not tail_size:
861+            self._tail_block_size = self._block_size
862+        else:
863+            self._tail_block_size = mathutil.next_multiple(tail_size,
864+                                                           self._required_shares)
865+            self._tail_block_size /= self._required_shares
866+
867+        # We already know where the sharedata starts; right after the end
869+        # of the header (which is defined as the signable part + the offsets).
870+        # We can also calculate where the encrypted private key begins
871+        # from what we now know.
871+        self._actual_block_size = self._block_size + SALT_SIZE
872+        data_size = self._actual_block_size * (self._num_segments - 1)
873+        data_size += self._tail_block_size
874+        data_size += SALT_SIZE
875+        self._offsets['enc_privkey'] = MDMFHEADERSIZE
876+        self._offsets['enc_privkey'] += data_size
877+        # We'll wait for the rest. Callers can now call my "put_block" and
878+        # "set_checkstring" methods.
879+
880+
881+    def set_checkstring(self, seqnum_or_checkstring, root_hash=None):
882+        """
884+        Set the checkstring for the given shnum.
884+
885+        This can be invoked in one of two ways.
886+
887+        With one argument, I assume that you are giving me a literal
888+        checkstring -- e.g., the output of get_checkstring. I will then
889+        set that checkstring as it is. This form is used by unit tests.
890+
891+        With two arguments, I assume that you are giving me a sequence
892+        number and root hash to make a checkstring from. In that case, I
893+        will build a checkstring and set it for you. This form is used
894+        by the publisher.
895+
896+        By default, I assume that I am writing new shares to the grid.
898+        If you don't explicitly set your own checkstring, I will use
898+        one that requires that the remote share not exist. You will want
899+        to use this method if you are updating a share in-place;
900+        otherwise, writes will fail.
901+        """
902+        # You're allowed to overwrite checkstrings with this method;
903+        # I assume that users know what they are doing when they call
904+        # it.
905+        if root_hash:
906+            checkstring = struct.pack(MDMFCHECKSTRING,
907+                                      1,
908+                                      seqnum_or_checkstring,
909+                                      root_hash)
910+        else:
911+            checkstring = seqnum_or_checkstring
912+
913+        if checkstring == "":
914+            # We special-case this, since len("") = 0, but we need
915+            # length of 1 for the case of an empty share to work on the
916+            # storage server, which is what a checkstring that is the
917+            # empty string means.
918+            self._testvs = []
919+        else:
920+            self._testvs = []
921+            self._testvs.append((0, len(checkstring), "eq", checkstring))
922+
923+
924+    def __repr__(self):
925+        return "MDMFSlotWriteProxy for share %d" % self._shnum
926+
927+
928+    def get_checkstring(self):
929+        """
931+        I return a representation of what the checkstring for
932+        this share on the server will look like.
932+
933+        I am mostly used for tests.
934+        """
935+        if self._root_hash:
936+            roothash = self._root_hash
937+        else:
938+            roothash = "\x00" * 32
939+        return struct.pack(MDMFCHECKSTRING,
940+                           1,
941+                           self._seqnum,
942+                           roothash)
943+
944+
945+    def put_block(self, data, segnum, salt):
946+        """
947+        Put the encrypted-and-encoded data segment in the slot, along
948+        with the salt.
949+        """
950+        if segnum >= self._num_segments:
951+            raise LayoutInvalid("I won't overwrite the private key")
952+        if len(salt) != SALT_SIZE:
953+            raise LayoutInvalid("I was given a salt of size %d, but "
954+                                "I wanted a salt of size %d")
955+        if segnum + 1 == self._num_segments:
956+            if len(data) != self._tail_block_size:
957+                raise LayoutInvalid("I was given the wrong size block to write")
958+        elif len(data) != self._block_size:
959+            raise LayoutInvalid("I was given the wrong size block to write")
960+
962+        # We want to write at MDMFHEADERSIZE + segnum * actual_block_size.
962+
963+        offset = MDMFHEADERSIZE + (self._actual_block_size * segnum)
964+        data = salt + data
965+
966+        datavs = [tuple([offset, data])]
967+        return self._write(datavs)
968+
969+
970+    def put_encprivkey(self, encprivkey):
971+        """
972+        Put the encrypted private key in the remote slot.
973+        """
974+        assert self._offsets
975+        assert self._offsets['enc_privkey']
976+        # You shouldn't re-write the encprivkey after the block hash
977+        # tree is written, since that could cause the private key to run
978+        # into the block hash tree. Before it writes the block hash
979+        # tree, the block hash tree writing method writes the offset of
981+        # the share hash chain. So that's a good indicator of whether or
981+        # not the block hash tree has been written.
982+        if "share_hash_chain" in self._offsets:
983+            raise LayoutInvalid("You must write this before the block hash tree")
984+
985+        self._offsets['block_hash_tree'] = self._offsets['enc_privkey'] + len(encprivkey)
986+        datavs = [(tuple([self._offsets['enc_privkey'], encprivkey]))]
987+        def _on_failure():
988+            del(self._offsets['block_hash_tree'])
989+        return self._write(datavs, on_failure=_on_failure)
990+
991+
992+    def put_blockhashes(self, blockhashes):
993+        """
994+        Put the block hash tree in the remote slot.
995+
996+        The encrypted private key must be put before the block hash
997+        tree, since we need to know how large it is to know where the
998+        block hash tree should go. The block hash tree must be put
999+        before the salt hash tree, since its size determines the
1000+        offset of the share hash chain.
1001+        """
1002+        assert self._offsets
1003+        assert isinstance(blockhashes, list)
1004+        if "block_hash_tree" not in self._offsets:
1005+            raise LayoutInvalid("You must put the encrypted private key "
1006+                                "before you put the block hash tree")
1007+        # If written, the share hash chain causes the signature offset
1008+        # to be defined.
1009+        if "signature" in self._offsets:
1010+            raise LayoutInvalid("You must put the block hash tree before "
1011+                                "you put the share hash chain")
1012+        blockhashes_s = "".join(blockhashes)
1013+        self._offsets['share_hash_chain'] = self._offsets['block_hash_tree'] + len(blockhashes_s)
1014+        datavs = []
1015+        datavs.append(tuple([self._offsets['block_hash_tree'], blockhashes_s]))
1016+        def _on_failure():
1017+            del(self._offsets['share_hash_chain'])
1018+        return self._write(datavs, on_failure=_on_failure)
1019+
1020+
1021+    def put_sharehashes(self, sharehashes):
1022+        """
1023+        Put the share hash chain in the remote slot.
1024+
1026+        The block hash tree must be put before the share hash chain,
1027+        since we need to know where the block hash tree ends before we
1028+        can know where the share hash chain starts. The share hash chain
1029+        must be put before the signature, since the length of the packed
1030+        share hash chain determines the offset of the signature. Also,
1031+        semantically, you must know what the root hash is before you
1032+        can generate a valid signature.
1032+        """
1033+        assert isinstance(sharehashes, dict)
1034+        if "share_hash_chain" not in self._offsets:
1036+            raise LayoutInvalid("You need to put the block hash tree before "
1036+                                "you can put the share hash chain")
1037+        # The signature comes after the share hash chain. If the
1038+        # signature has already been written, we must not write another
1039+        # share hash chain. The signature writes the verification key
1040+        # offset when it gets sent to the remote server, so we look for
1041+        # that.
1042+        if "verification_key" in self._offsets:
1043+            raise LayoutInvalid("You must write the share hash chain "
1044+                                "before you write the signature")
1045+        datavs = []
1046+        sharehashes_s = "".join([struct.pack(">H32s", i, sharehashes[i])
1047+                                  for i in sorted(sharehashes.keys())])
1048+        self._offsets['signature'] = self._offsets['share_hash_chain'] + len(sharehashes_s)
1049+        datavs.append(tuple([self._offsets['share_hash_chain'], sharehashes_s]))
1050+        def _on_failure():
1051+            del(self._offsets['signature'])
1052+        return self._write(datavs, on_failure=_on_failure)
1053+
1054+
1055+    def put_root_hash(self, roothash):
1056+        """
1057+        Put the root hash (the root of the share hash tree) in the
1058+        remote slot.
1059+        """
1060+        # It does not make sense to be able to put the root
1061+        # hash without first putting the share hashes, since you need
1062+        # the share hashes to generate the root hash.
1063+        #
1064+        # Signature is defined by the routine that places the share hash
1065+        # chain, so it's a good thing to look for in finding out whether
1066+        # or not the share hash chain exists on the remote server.
1067+        if "signature" not in self._offsets:
1068+            raise LayoutInvalid("You need to put the share hash chain "
1069+                                "before you can put the root share hash")
1070+        if len(roothash) != HASH_SIZE:
1071+            raise LayoutInvalid("hashes and salts must be exactly %d bytes"
1072+                                 % HASH_SIZE)
1073+        datavs = []
1074+        self._root_hash = roothash
1075+        # To write both of these values, we update the checkstring on
1076+        # the remote server, which includes them
1077+        checkstring = self.get_checkstring()
1078+        datavs.append(tuple([0, checkstring]))
1079+        # This write, if successful, changes the checkstring, so we need
1080+        # to update our internal checkstring to be consistent with the
1081+        # one on the server.
1082+        def _on_success():
1083+            self._testvs = [(0, len(checkstring), "eq", checkstring)]
1084+        def _on_failure():
1085+            self._root_hash = None
1086+        return self._write(datavs,
1087+                           on_success=_on_success,
1088+                           on_failure=_on_failure)
1089+
1090+
1091+    def get_signable(self):
1092+        """
1093+        Get the first seven fields of the mutable file; the parts that
1094+        are signed.
1095+        """
1096+        if not self._root_hash:
1097+            raise LayoutInvalid("You need to set the root hash "
1098+                                "before getting something to "
1099+                                "sign")
1100+        return struct.pack(MDMFSIGNABLEHEADER,
1101+                           1,
1102+                           self._seqnum,
1103+                           self._root_hash,
1104+                           self._required_shares,
1105+                           self._total_shares,
1106+                           self._segment_size,
1107+                           self._data_length)
1108+
1109+
1110+    def put_signature(self, signature):
1111+        """
1112+        Put the signature field to the remote slot.
1113+
1114+        I require that the root hash and share hash chain have been put
1115+        to the grid before I will write the signature to the grid.
1116+        """
1118+        # It does not make sense to put a signature without first
1119+        # putting the root hash and the salt hash (since otherwise
1120+        # the signature would be incomplete), so we don't allow that.
1121+        if "signature" not in self._offsets:
1122+            raise LayoutInvalid("You must put the share hash chain "
1123+                                "before putting the signature")
1123+        if not self._root_hash:
1124+            raise LayoutInvalid("You must complete the signed prefix "
1125+                                "before computing a signature")
1126+        # If we put the signature after we put the verification key, we
1127+        # could end up running into the verification key, and will
1128+        # probably screw up the offsets as well. So we don't allow that.
1129+        # The method that writes the verification key defines the EOF
1130+        # offset before writing the verification key, so look for that.
1131+        if "EOF" in self._offsets:
1132+            raise LayoutInvalid("You must write the signature before the verification key")
1133+
1134+        self._offsets['verification_key'] = self._offsets['signature'] + len(signature)
1135+        datavs = []
1136+        datavs.append(tuple([self._offsets['signature'], signature]))
1137+        def _on_failure():
1138+            del(self._offsets['verification_key'])
1139+        return self._write(datavs, on_failure=_on_failure)
1140+
1141+
1142+    def put_verification_key(self, verification_key):
1143+        """
1144+        Put the verification key into the remote slot.
1145+
1146+        I require that the signature have been written to the storage
1147+        server before I allow the verification key to be written to the
1148+        remote server.
1149+        """
1150+        if "verification_key" not in self._offsets:
1151+            raise LayoutInvalid("You must put the signature before you "
1152+                                "can put the verification key")
1153+        self._offsets['EOF'] = self._offsets['verification_key'] + len(verification_key)
1154+        datavs = []
1155+        datavs.append(tuple([self._offsets['verification_key'], verification_key]))
1156+        def _on_failure():
1157+            del(self._offsets['EOF'])
1158+        return self._write(datavs, on_failure=_on_failure)
1159+
1160+
1161+    def finish_publishing(self):
1162+        """
1163+        Write the offset table and encoding parameters to the remote
1164+        slot, since that's the only thing we have yet to publish at this
1165+        point.
1166+        """
1167+        if "EOF" not in self._offsets:
1168+            raise LayoutInvalid("You must put the verification key before "
1169+                                "you can publish the offsets")
1170+        offsets_offset = struct.calcsize(MDMFHEADERWITHOUTOFFSETS)
1171+        offsets = struct.pack(MDMFOFFSETS,
1172+                              self._offsets['enc_privkey'],
1173+                              self._offsets['block_hash_tree'],
1174+                              self._offsets['share_hash_chain'],
1175+                              self._offsets['signature'],
1176+                              self._offsets['verification_key'],
1177+                              self._offsets['EOF'])
1178+        datavs = []
1179+        datavs.append(tuple([offsets_offset, offsets]))
1180+        encoding_parameters_offset = struct.calcsize(MDMFCHECKSTRING)
1181+        params = struct.pack(">BBQQ",
1182+                             self._required_shares,
1183+                             self._total_shares,
1184+                             self._segment_size,
1185+                             self._data_length)
1186+        datavs.append(tuple([encoding_parameters_offset, params]))
1187+        return self._write(datavs)
1188+
1189+
1190+    def _write(self, datavs, on_failure=None, on_success=None):
1191+        """I write the data vectors in datavs to the remote slot."""
1192+        tw_vectors = {}
1193+        new_share = False
1194+        if not self._testvs:
1195+            self._testvs = []
1196+            self._testvs.append(tuple([0, 1, "eq", ""]))
1197+            new_share = True
1198+        if not self._written:
1199+            # Write a new checkstring to the share when we write it, so
1200+            # that we have something to check later.
1201+            new_checkstring = self.get_checkstring()
1202+            datavs.append((0, new_checkstring))
1203+            def _first_write():
1204+                self._written = True
1205+                self._testvs = [(0, len(new_checkstring), "eq", new_checkstring)]
1206+            on_success = _first_write
1207+        tw_vectors[self._shnum] = (self._testvs, datavs, None)
1208+        datalength = sum([len(x[1]) for x in datavs])
1209+        d = self._rref.callRemote("slot_testv_and_readv_and_writev",
1210+                                  self._storage_index,
1211+                                  self._secrets,
1212+                                  tw_vectors,
1213+                                  self._readv)
1214+        def _result(results):
1215+            if isinstance(results, failure.Failure) or not results[0]:
1216+                # Do nothing; the write was unsuccessful.
1217+                if on_failure: on_failure()
1218+            else:
1219+                if on_success: on_success()
1220+            return results
1221+        d.addCallback(_result)
1222+        return d
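+    # To illustrate the test-and-set behaviour above (a sketch, not new
+    # functionality): once a share has been written, tw_vectors looks
+    # roughly like
+    #
+    #   {shnum: ([(0, len(checkstring), "eq", checkstring)],
+    #            [(offset, data), ...],
+    #            None)}
+    #
+    # so the storage server applies the write vectors only if the first
+    # len(checkstring) bytes of the share still equal the checkstring;
+    # otherwise the write is rejected and on_failure (if given) runs.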
1223+
1224+
1225+class MDMFSlotReadProxy:
1226+    """
1227+    I read from a mutable slot filled with data written in the MDMF data
1228+    format (which is described above).
1229+
1230+    I can be initialized with some amount of data, which I will use (if
1231+    it is valid) to eliminate some of the need to fetch it from servers.
1232+    """
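+    # A minimal usage sketch (hypothetical caller code): given a remote
+    # reference to a storage server and a storage index, a caller such
+    # as the servermap updater might do
+    #
+    #   reader = MDMFSlotReadProxy(rref, storage_index, shnum)
+    #   d = reader.get_verinfo()
+    #   d.addCallback(lambda verinfo: reader.get_block_and_salt(0))
+    #
+    # The reader fetches the header and offset table once, then uses
+    # those cached offsets to satisfy later requests.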
1233+    def __init__(self,
1234+                 rref,
1235+                 storage_index,
1236+                 shnum,
1237+                 data=""):
1238+        # Start the initialization process.
1239+        self._rref = rref
1240+        self._storage_index = storage_index
1241+        self.shnum = shnum
1242+
1243+        # Before doing anything, the reader is probably going to want to
1244+        # verify that the signature is correct. To do that, they'll need
1245+        # the verification key, and the signature. To get those, we'll
1246+        # need the offset table. So fetch the offset table on the
1247+        # assumption that that will be the first thing that a reader is
1248+        # going to do.
1249+
1250+        # The fact that these encoding parameters are None tells us
1251+        # that we haven't yet fetched them from the remote share, so we
1252+        # should. We could just not set them, but the checks will be
1253+        # easier to read if we don't have to use hasattr.
1254+        self._version_number = None
1255+        self._sequence_number = None
1256+        self._root_hash = None
1257+        # Filled in if we're dealing with an SDMF file. Unused
1258+        # otherwise.
1259+        self._salt = None
1260+        self._required_shares = None
1261+        self._total_shares = None
1262+        self._segment_size = None
1263+        self._data_length = None
1264+        self._offsets = None
1265+
1266+        # If the user has chosen to initialize us with some data, we'll
1267+        # try to satisfy subsequent data requests with that data before
1268+        # asking the storage server for it.
1269+        self._data = data
1270+        # The way callers interact with cache in the filenode returns
1271+        # None if there isn't any cached data, but the way we index the
1272+        # cached data requires a string, so convert None to "".
1273+        if self._data is None:
1274+            self._data = ""
1275+
1276+        self._queue_observers = observer.ObserverList()
1277+        self._queue_errbacks = observer.ObserverList()
1278+        self._readvs = []
1279+
1280+
1281+    def _maybe_fetch_offsets_and_header(self, force_remote=False):
1282+        """
1283+        I fetch the offset table and the header from the remote slot if
1284+        I don't already have them. If I do have them, I do nothing and
1285+        return an empty Deferred.
1286+        """
1287+        if self._offsets:
1288+            return defer.succeed(None)
1289+        # At this point, we may be either SDMF or MDMF. Fetching 107
1290+        # bytes will be enough to get header and offsets for both SDMF and
1291+        # MDMF, though we'll be left with 4 more bytes than we
1292+        # need if this ends up being MDMF. This is probably less
1293+        # expensive than the cost of a second roundtrip.
1294+        readvs = [(0, 107)]
1295+        d = self._read(readvs, force_remote)
1296+        d.addCallback(self._process_encoding_parameters)
1297+        d.addCallback(self._process_offsets)
1298+        return d
1299+
1300+
1301+    def _process_encoding_parameters(self, encoding_parameters):
1302+        assert self.shnum in encoding_parameters
1303+        encoding_parameters = encoding_parameters[self.shnum][0]
1304+        # The first byte is the version number. It will tell us what
1305+        # to do next.
1306+        (verno,) = struct.unpack(">B", encoding_parameters[:1])
1307+        if verno == MDMF_VERSION:
1308+            read_size = MDMFHEADERWITHOUTOFFSETSSIZE
1309+            (verno,
1310+             seqnum,
1311+             root_hash,
1312+             k,
1313+             n,
1314+             segsize,
1315+             datalen) = struct.unpack(MDMFHEADERWITHOUTOFFSETS,
1316+                                      encoding_parameters[:read_size])
1317+            if segsize == 0 and datalen == 0:
1318+                # Empty file, no segments.
1319+                self._num_segments = 0
1320+            else:
1321+                self._num_segments = mathutil.div_ceil(datalen, segsize)
1322+
1323+        elif verno == SDMF_VERSION:
1324+            read_size = SIGNED_PREFIX_LENGTH
1325+            (verno,
1326+             seqnum,
1327+             root_hash,
1328+             salt,
1329+             k,
1330+             n,
1331+             segsize,
1332+             datalen) = struct.unpack(">BQ32s16s BBQQ",
1333+                                encoding_parameters[:SIGNED_PREFIX_LENGTH])
1334+            self._salt = salt
1335+            if segsize == 0 and datalen == 0:
1336+                # empty file
1337+                self._num_segments = 0
1338+            else:
1339+                # non-empty SDMF files have one segment.
1340+                self._num_segments = 1
1341+        else:
1342+            raise UnknownVersionError("You asked me to read mutable file "
1343+                                      "version %d, but I only understand "
1344+                                      "%d and %d" % (verno, SDMF_VERSION,
1345+                                                     MDMF_VERSION))
1346+
1347+        self._version_number = verno
1348+        self._sequence_number = seqnum
1349+        self._root_hash = root_hash
1350+        self._required_shares = k
1351+        self._total_shares = n
1352+        self._segment_size = segsize
1353+        self._data_length = datalen
1354+
1355+        self._block_size = self._segment_size / self._required_shares
1356+        # We can upload empty files, and need to account for this fact
1357+        # so as to avoid zero-division and zero-modulo errors.
1358+        if datalen > 0:
1359+            tail_size = self._data_length % self._segment_size
1360+        else:
1361+            tail_size = 0
1362+        if not tail_size:
1363+            self._tail_block_size = self._block_size
1364+        else:
1365+            self._tail_block_size = mathutil.next_multiple(tail_size,
1366+                                                    self._required_shares)
1367+            self._tail_block_size /= self._required_shares
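+        # A worked example, using the numbers from the MDMFProxies
+        # tests below (illustrative only): k=3, segsize=6, datalen=33
+        # gives block_size=2, num_segments=6, tail_size=3, and a
+        # tail_block_size of 1.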
1368+
1369+        return encoding_parameters
1370+
1371+
1372+    def _process_offsets(self, offsets):
1373+        if self._version_number == 0:
1374+            read_size = OFFSETS_LENGTH
1375+            read_offset = SIGNED_PREFIX_LENGTH
1376+            end = read_size + read_offset
1377+            (signature,
1378+             share_hash_chain,
1379+             block_hash_tree,
1380+             share_data,
1381+             enc_privkey,
1382+             EOF) = struct.unpack(">LLLLQQ",
1383+                                  offsets[read_offset:end])
1384+            self._offsets = {}
1385+            self._offsets['signature'] = signature
1386+            self._offsets['share_data'] = share_data
1387+            self._offsets['block_hash_tree'] = block_hash_tree
1388+            self._offsets['share_hash_chain'] = share_hash_chain
1389+            self._offsets['enc_privkey'] = enc_privkey
1390+            self._offsets['EOF'] = EOF
1391+
1392+        elif self._version_number == 1:
1393+            read_offset = MDMFHEADERWITHOUTOFFSETSSIZE
1394+            read_length = MDMFOFFSETS_LENGTH
1395+            end = read_offset + read_length
1396+            (encprivkey,
1397+             blockhashes,
1398+             sharehashes,
1399+             signature,
1400+             verification_key,
1401+             eof) = struct.unpack(MDMFOFFSETS,
1402+                                  offsets[read_offset:end])
1403+            self._offsets = {}
1404+            self._offsets['enc_privkey'] = encprivkey
1405+            self._offsets['block_hash_tree'] = blockhashes
1406+            self._offsets['share_hash_chain'] = sharehashes
1407+            self._offsets['signature'] = signature
1408+            self._offsets['verification_key'] = verification_key
1409+            self._offsets['EOF'] = eof
1410+
1411+
1412+    def get_block_and_salt(self, segnum, queue=False):
1413+        """
1414+        I return (block, salt), where block is the block data and
1415+        salt is the salt used to encrypt that segment.
1416+        """
1417+        d = self._maybe_fetch_offsets_and_header()
1418+        def _then(ignored):
1419+            if self._version_number == 1:
1420+                base_share_offset = MDMFHEADERSIZE
1421+            else:
1422+                base_share_offset = self._offsets['share_data']
1423+
1424+            if segnum + 1 > self._num_segments:
1425+                raise LayoutInvalid("Not a valid segment number")
1426+
1427+            if self._version_number == 0:
1428+                share_offset = base_share_offset + self._block_size * segnum
1429+            else:
1430+                share_offset = base_share_offset + (self._block_size + \
1431+                                                    SALT_SIZE) * segnum
1432+            if segnum + 1 == self._num_segments:
1433+                data = self._tail_block_size
1434+            else:
1435+                data = self._block_size
1436+
1437+            if self._version_number == 1:
1438+                data += SALT_SIZE
1439+
1440+            readvs = [(share_offset, data)]
1441+            return readvs
1442+        d.addCallback(_then)
1443+        d.addCallback(lambda readvs:
1444+            self._read(readvs, queue=queue))
1445+        def _process_results(results):
1446+            assert self.shnum in results
1447+            if self._version_number == 0:
1448+                # We only read the share data, but we know the salt from
1449+                # when we fetched the header
1450+                data = results[self.shnum]
1451+                if not data:
1452+                    data = ""
1453+                else:
1454+                    assert len(data) == 1
1455+                    data = data[0]
1456+                salt = self._salt
1457+            else:
1458+                data = results[self.shnum]
1459+                if not data:
1460+                    salt = data = ""
1461+                else:
1462+                    salt_and_data = results[self.shnum][0]
1463+                    salt = salt_and_data[:SALT_SIZE]
1464+                    data = salt_and_data[SALT_SIZE:]
1465+            return data, salt
1466+        d.addCallback(_process_results)
1467+        return d
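+    # A small worked example of the offset arithmetic above, using the
+    # MDMF share written by the MDMFProxies tests below (k=3, segsize=6,
+    # so block_size=2, with 16-byte salts): segment i starts at
+    # MDMFHEADERSIZE + (2 + 16) * i and is 18 bytes long (the salt
+    # followed by the block), except for the tail segment, which may be
+    # shorter.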
1468+
1469+
1470+    def get_blockhashes(self, needed=None, queue=False, force_remote=False):
1471+        """
1472+        I return the block hash tree
1473+
1474+        I take an optional argument, needed, which is a set of indices
1475+        that correspond to hashes that I should fetch. If this argument is
1476+        missing, I will fetch the entire block hash tree; otherwise, I
1477+        may attempt to fetch fewer hashes, based on what needed says
1478+        that I should do. Note that I may fetch as many hashes as I
1479+        want, so long as the set of hashes that I do fetch is a superset
1480+        of the ones that I am asked for, so callers should be prepared
1481+        to tolerate additional hashes.
1482+        """
1483+        # TODO: Return only the parts of the block hash tree necessary
1484+        # to validate the blocknum provided?
1485+        # This is a good idea, but it is hard to implement correctly. It
1486+        # is bad to fetch any one block hash more than once, so we
1487+        # probably just want to fetch the whole thing at once and then
1488+        # serve it.
1489+        if needed == set([]):
1490+            return defer.succeed([])
1491+        d = self._maybe_fetch_offsets_and_header()
1492+        def _then(ignored):
1493+            blockhashes_offset = self._offsets['block_hash_tree']
1494+            if self._version_number == 1:
1495+                blockhashes_length = self._offsets['share_hash_chain'] - blockhashes_offset
1496+            else:
1497+                blockhashes_length = self._offsets['share_data'] - blockhashes_offset
1498+            readvs = [(blockhashes_offset, blockhashes_length)]
1499+            return readvs
1500+        d.addCallback(_then)
1501+        d.addCallback(lambda readvs:
1502+            self._read(readvs, queue=queue, force_remote=force_remote))
1503+        def _build_block_hash_tree(results):
1504+            assert self.shnum in results
1505+
1506+            rawhashes = results[self.shnum][0]
1507+            results = [rawhashes[i:i+HASH_SIZE]
1508+                       for i in range(0, len(rawhashes), HASH_SIZE)]
1509+            return results
1510+        d.addCallback(_build_block_hash_tree)
1511+        return d
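+    # For instance (hypothetical caller code), a downloader that only
+    # needs a couple of block hashes might call
+    #
+    #   d = reader.get_blockhashes(needed=set([0, 1]))
+    #
+    # and must be prepared for the resulting list to contain the whole
+    # block hash tree, since this implementation fetches every
+    # HASH_SIZE-byte hash between the block hash tree offset and the
+    # next field (unless needed is the empty set, in which case it
+    # returns [] without any remote I/O).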
1512+
1513+
1514+    def get_sharehashes(self, needed=None, queue=False, force_remote=False):
1515+        """
1516+        I return the part of the share hash chain needed to validate
1517+        this share.
1518+
1519+        I take an optional argument, needed. Needed is a set of indices
1520+        that correspond to the hashes that I should fetch. If needed is
1521+        not present, I will fetch and return the entire share hash
1522+        chain. Otherwise, I may fetch and return any part of the share
1523+        hash chain that is a superset of the part that I am asked to
1524+        fetch. Callers should be prepared to deal with more hashes than
1525+        they've asked for.
1526+        """
1527+        if needed == set([]):
1528+            return defer.succeed([])
1529+        d = self._maybe_fetch_offsets_and_header()
1530+
1531+        def _make_readvs(ignored):
1532+            sharehashes_offset = self._offsets['share_hash_chain']
1533+            if self._version_number == 0:
1534+                sharehashes_length = self._offsets['block_hash_tree'] - sharehashes_offset
1535+            else:
1536+                sharehashes_length = self._offsets['signature'] - sharehashes_offset
1537+            readvs = [(sharehashes_offset, sharehashes_length)]
1538+            return readvs
1539+        d.addCallback(_make_readvs)
1540+        d.addCallback(lambda readvs:
1541+            self._read(readvs, queue=queue, force_remote=force_remote))
1542+        def _build_share_hash_chain(results):
1543+            assert self.shnum in results
1544+
1545+            sharehashes = results[self.shnum][0]
1546+            results = [sharehashes[i:i+(HASH_SIZE + 2)]
1547+                       for i in range(0, len(sharehashes), HASH_SIZE + 2)]
1548+            results = dict([struct.unpack(">H32s", data)
1549+                            for data in results])
1550+            return results
1551+        d.addCallback(_build_share_hash_chain)
1552+        return d
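+    # A sketch of the result shape (based on the unpacking above): the
+    # share hash chain comes back as a dict mapping a 2-byte node index
+    # to a 32-byte hash, built from the ">H32s" pairs stored in the
+    # share. As with get_blockhashes, the dict may contain more entries
+    # than were asked for in needed.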
1553+
1554+
1555+    def get_encprivkey(self, queue=False):
1556+        """
1557+        I return the encrypted private key.
1558+        """
1559+        d = self._maybe_fetch_offsets_and_header()
1560+
1561+        def _make_readvs(ignored):
1562+            privkey_offset = self._offsets['enc_privkey']
1563+            if self._version_number == 0:
1564+                privkey_length = self._offsets['EOF'] - privkey_offset
1565+            else:
1566+                privkey_length = self._offsets['block_hash_tree'] - privkey_offset
1567+            readvs = [(privkey_offset, privkey_length)]
1568+            return readvs
1569+        d.addCallback(_make_readvs)
1570+        d.addCallback(lambda readvs:
1571+            self._read(readvs, queue=queue))
1572+        def _process_results(results):
1573+            assert self.shnum in results
1574+            privkey = results[self.shnum][0]
1575+            return privkey
1576+        d.addCallback(_process_results)
1577+        return d
1578+
1579+
1580+    def get_signature(self, queue=False):
1581+        """
1582+        I return the signature of my share.
1583+        """
1584+        d = self._maybe_fetch_offsets_and_header()
1585+
1586+        def _make_readvs(ignored):
1587+            signature_offset = self._offsets['signature']
1588+            if self._version_number == 1:
1589+                signature_length = self._offsets['verification_key'] - signature_offset
1590+            else:
1591+                signature_length = self._offsets['share_hash_chain'] - signature_offset
1592+            readvs = [(signature_offset, signature_length)]
1593+            return readvs
1594+        d.addCallback(_make_readvs)
1595+        d.addCallback(lambda readvs:
1596+            self._read(readvs, queue=queue))
1597+        def _process_results(results):
1598+            assert self.shnum in results
1599+            signature = results[self.shnum][0]
1600+            return signature
1601+        d.addCallback(_process_results)
1602+        return d
1603+
1604+
1605+    def get_verification_key(self, queue=False):
1606+        """
1607+        I return the verification key.
1608+        """
1609+        d = self._maybe_fetch_offsets_and_header()
1610+
1611+        def _make_readvs(ignored):
1612+            if self._version_number == 1:
1613+                vk_offset = self._offsets['verification_key']
1614+                vk_length = self._offsets['EOF'] - vk_offset
1615+            else:
1616+                vk_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
1617+                vk_length = self._offsets['signature'] - vk_offset
1618+            readvs = [(vk_offset, vk_length)]
1619+            return readvs
1620+        d.addCallback(_make_readvs)
1621+        d.addCallback(lambda readvs:
1622+            self._read(readvs, queue=queue))
1623+        def _process_results(results):
1624+            assert self.shnum in results
1625+            verification_key = results[self.shnum][0]
1626+            return verification_key
1627+        d.addCallback(_process_results)
1628+        return d
1629+
1630+
1631+    def get_encoding_parameters(self):
1632+        """
1633+        I return (k, n, segsize, datalen)
1634+        """
1635+        d = self._maybe_fetch_offsets_and_header()
1636+        d.addCallback(lambda ignored:
1637+            (self._required_shares,
1638+             self._total_shares,
1639+             self._segment_size,
1640+             self._data_length))
1641+        return d
1642+
1643+
1644+    def get_seqnum(self):
1645+        """
1646+        I return the sequence number for this share.
1647+        """
1648+        d = self._maybe_fetch_offsets_and_header()
1649+        d.addCallback(lambda ignored:
1650+            self._sequence_number)
1651+        return d
1652+
1653+
1654+    def get_root_hash(self):
1655+        """
1656+        I return the root of the block hash tree
1657+        """
1658+        d = self._maybe_fetch_offsets_and_header()
1659+        d.addCallback(lambda ignored: self._root_hash)
1660+        return d
1661+
1662+
1663+    def get_checkstring(self):
1664+        """
1665+        I return the packed representation of the following:
1666+
1667+            - version number
1668+            - sequence number
1669+            - root hash
1670+            - salt (SDMF only)
1671+
1672+        which my users use as a checkstring to detect other writers.
1673+        """
1674+        d = self._maybe_fetch_offsets_and_header()
1675+        def _build_checkstring(ignored):
1676+            if self._salt:
1677+                checkstring = struct.pack(PREFIX,
1678+                                         self._version_number,
1679+                                         self._sequence_number,
1680+                                         self._root_hash,
1681+                                         self._salt)
1682+            else:
1683+                checkstring = struct.pack(MDMFCHECKSTRING,
1684+                                          self._version_number,
1685+                                          self._sequence_number,
1686+                                          self._root_hash)
1687+
1688+            return checkstring
1689+        d.addCallback(_build_checkstring)
1690+        return d
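+    # A sketch of how this is used (hypothetical caller code): a writer
+    # that wants to modify an existing share can seed its test vector
+    # from the reader, e.g.
+    #
+    #   d = reader.get_checkstring()
+    #   d.addCallback(writer.set_checkstring)
+    #
+    # so that subsequent writes succeed only if no other writer has
+    # changed the share in the meantime.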
1691+
1692+
1693+    def get_prefix(self, force_remote):
1694+        d = self._maybe_fetch_offsets_and_header(force_remote)
1695+        d.addCallback(lambda ignored:
1696+            self._build_prefix())
1697+        return d
1698+
1699+
1700+    def _build_prefix(self):
1701+        # The prefix is another name for the part of the remote share
1702+        # that gets signed. It consists of everything up to and
1703+        # including the datalength, packed by struct.
1704+        if self._version_number == SDMF_VERSION:
1705+            return struct.pack(SIGNED_PREFIX,
1706+                           self._version_number,
1707+                           self._sequence_number,
1708+                           self._root_hash,
1709+                           self._salt,
1710+                           self._required_shares,
1711+                           self._total_shares,
1712+                           self._segment_size,
1713+                           self._data_length)
1714+
1715+        else:
1716+            return struct.pack(MDMFSIGNABLEHEADER,
1717+                           self._version_number,
1718+                           self._sequence_number,
1719+                           self._root_hash,
1720+                           self._required_shares,
1721+                           self._total_shares,
1722+                           self._segment_size,
1723+                           self._data_length)
1724+
1725+
1726+    def _get_offsets_tuple(self):
1727+        # The offsets dictionary is another component of the version
1728+        # information tuple. We return a copy of it here; callers that
1729+        # need a hashable verinfo can itemize it into a tuple.
1730+        return self._offsets.copy()
1731+
1732+
1733+    def get_verinfo(self):
1734+        """
1735+        I return my verinfo tuple. This is used by the ServermapUpdater
1736+        to keep track of versions of mutable files.
1737+
1738+        The verinfo tuple for MDMF files contains:
1739+            - seqnum
1740+            - root hash
1741+            - a blank entry (None)
1742+            - segsize
1743+            - datalen
1744+            - k
1745+            - n
1746+            - prefix (the thing that you sign)
1747+            - a tuple of offsets
1748+
1749+        We include the blank entry so that MDMF and SDMF verinfo
1750+        tuples have the same shape, simplifying their processing.
1751+
1752+        The verinfo tuple for SDMF files is the same, but contains the
1753+        share's 16-byte IV (the salt) in place of the blank entry.
1754+        """
1755+        d = self._maybe_fetch_offsets_and_header()
1756+        def _build_verinfo(ignored):
1757+            if self._version_number == SDMF_VERSION:
1758+                salt_to_use = self._salt
1759+            else:
1760+                salt_to_use = None
1761+            return (self._sequence_number,
1762+                    self._root_hash,
1763+                    salt_to_use,
1764+                    self._segment_size,
1765+                    self._data_length,
1766+                    self._required_shares,
1767+                    self._total_shares,
1768+                    self._build_prefix(),
1769+                    self._get_offsets_tuple())
1770+        d.addCallback(_build_verinfo)
1771+        return d
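+    # For example, for the MDMF share written by the MDMFProxies tests
+    # below (seqnum 0, k=3, n=10, segsize=6, datalen=36), the verinfo
+    # tuple would look roughly like
+    #
+    #   (0, root_hash, None, 6, 36, 3, 10, prefix, offsets)
+    #
+    # where prefix is the signed header and offsets is the offsets
+    # dictionary described above.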
1772+
1773+
1774+    def flush(self):
1775+        """
1776+        I flush my queue of read vectors.
1777+        """
1778+        d = self._read(self._readvs)
1779+        def _then(results):
1780+            self._readvs = []
1781+            if isinstance(results, failure.Failure):
1782+                self._queue_errbacks.notify(results)
1783+            else:
1784+                self._queue_observers.notify(results)
1785+            self._queue_observers = observer.ObserverList()
1786+            self._queue_errbacks = observer.ObserverList()
1787+        d.addBoth(_then)
1788+
1789+
1790+    def _read(self, readvs, force_remote=False, queue=False):
1791+        unsatisfiable = filter(lambda x: x[0] + x[1] > len(self._data), readvs)
1792+        # TODO: It's entirely possible to tweak this so that it just
1793+        # fulfills the requests that it can, and not demand that all
1794+        # requests are satisfiable before running it.
1795+        if not unsatisfiable and not force_remote:
1796+            results = [self._data[offset:offset+length]
1797+                       for (offset, length) in readvs]
1798+            results = {self.shnum: results}
1799+            return defer.succeed(results)
1800+        else:
1801+            if queue:
1802+                start = len(self._readvs)
1803+                self._readvs += readvs
1804+                end = len(self._readvs)
1805+                def _get_results(results, start, end):
1806+                    if not self.shnum in results:
1807+                        return {self.shnum: [""]}
1808+                    return {self.shnum: results[self.shnum][start:end]}
1809+                d = defer.Deferred()
1810+                d.addCallback(_get_results, start, end)
1811+                self._queue_observers.subscribe(d.callback)
1812+                self._queue_errbacks.subscribe(d.errback)
1813+                return d
1814+            return self._rref.callRemote("slot_readv",
1815+                                         self._storage_index,
1816+                                         [self.shnum],
1817+                                         readvs)
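+    # A sketch of the queueing pattern (hypothetical caller code): a
+    # downloader that wants to batch several reads into one remote call
+    # can pass queue=True and then flush, e.g.
+    #
+    #   d1 = reader.get_block_and_salt(0, queue=True)
+    #   d2 = reader.get_blockhashes(queue=True)
+    #   reader.flush()
+    #
+    # flush() issues the accumulated read vectors in a single
+    # slot_readv call and fires d1 and d2 with their slices of the
+    # result.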
1818+
1819+
1820+    def is_sdmf(self):
1821+        """I tell my caller whether or not my remote file is SDMF.
1822+        """
1823+        d = self._maybe_fetch_offsets_and_header()
1824+        d.addCallback(lambda ignored:
1825+            self._version_number == 0)
1826+        return d
1827+
1828+
1829+class LayoutInvalid(Exception):
1830+    """
1831+    This isn't a valid MDMF mutable file
1832+    """
1833hunk ./src/allmydata/test/test_storage.py 2
1834 
1835-import time, os.path, stat, re, simplejson, struct
1836+import time, os.path, stat, re, simplejson, struct, shutil
1837 
1838 from twisted.trial import unittest
1839 
1840hunk ./src/allmydata/test/test_storage.py 22
1841 from allmydata.storage.expirer import LeaseCheckingCrawler
1842 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
1843      ReadBucketProxy
1844-from allmydata.interfaces import BadWriteEnablerError
1845-from allmydata.test.common import LoggingServiceParent
1846+from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
1847+                                     LayoutInvalid, MDMFSIGNABLEHEADER, \
1848+                                     SIGNED_PREFIX, MDMFHEADER, MDMFOFFSETS
1849+from allmydata.interfaces import BadWriteEnablerError, MDMF_VERSION, \
1850+                                 SDMF_VERSION
1851+from allmydata.test.common import LoggingServiceParent, ShouldFailMixin
1852 from allmydata.test.common_web import WebRenderingMixin
1853 from allmydata.web.storage import StorageStatus, remove_prefix
1854 
1855hunk ./src/allmydata/test/test_storage.py 105
1856 
1857 class RemoteBucket:
1858 
1859+    def __init__(self):
1860+        self.read_count = 0
1861+        self.write_count = 0
1862+
1863     def callRemote(self, methname, *args, **kwargs):
1864         def _call():
1865             meth = getattr(self.target, "remote_" + methname)
1866hunk ./src/allmydata/test/test_storage.py 113
1867             return meth(*args, **kwargs)
1868+
1869+        if methname == "slot_readv":
1870+            self.read_count += 1
1871+        if "writev" in methname:
1872+            self.write_count += 1
1873+
1874         return defer.maybeDeferred(_call)
1875 
1876hunk ./src/allmydata/test/test_storage.py 121
1877+
1878 class BucketProxy(unittest.TestCase):
1879     def make_bucket(self, name, size):
1880         basedir = os.path.join("storage", "BucketProxy", name)
1881hunk ./src/allmydata/test/test_storage.py 1298
1882         self.failUnless(os.path.exists(prefixdir), prefixdir)
1883         self.failIf(os.path.exists(bucketdir), bucketdir)
1884 
1885+
1886+class MDMFProxies(unittest.TestCase, ShouldFailMixin):
1887+    def setUp(self):
1888+        self.sparent = LoggingServiceParent()
1889+        self._lease_secret = itertools.count()
1890+        self.ss = self.create("MDMFProxies storage test server")
1891+        self.rref = RemoteBucket()
1892+        self.rref.target = self.ss
1893+        self.secrets = (self.write_enabler("we_secret"),
1894+                        self.renew_secret("renew_secret"),
1895+                        self.cancel_secret("cancel_secret"))
1896+        self.segment = "aaaaaa"
1897+        self.block = "aa"
1898+        self.salt = "a" * 16
1899+        self.block_hash = "a" * 32
1900+        self.block_hash_tree = [self.block_hash for i in xrange(6)]
1901+        self.share_hash = self.block_hash
1902+        self.share_hash_chain = dict([(i, self.share_hash) for i in xrange(6)])
1903+        self.signature = "foobarbaz"
1904+        self.verification_key = "vvvvvv"
1905+        self.encprivkey = "private"
1906+        self.root_hash = self.block_hash
1907+        self.salt_hash = self.root_hash
1908+        self.salt_hash_tree = [self.salt_hash for i in xrange(6)]
1909+        self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1910+        self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1911+        # blockhashes and salt hashes are serialized in the same way,
1912+        # only we lop off the first element and store that in the
1913+        # header.
1914+        self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1915+
1916+
1917+    def tearDown(self):
1918+        self.sparent.stopService()
1919+        shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1920+
1921+
1922+    def write_enabler(self, we_tag):
1923+        return hashutil.tagged_hash("we_blah", we_tag)
1924+
1925+
1926+    def renew_secret(self, tag):
1927+        return hashutil.tagged_hash("renew_blah", str(tag))
1928+
1929+
1930+    def cancel_secret(self, tag):
1931+        return hashutil.tagged_hash("cancel_blah", str(tag))
1932+
1933+
1934+    def workdir(self, name):
1935+        basedir = os.path.join("storage", "MutableServer", name)
1936+        return basedir
1937+
1938+
1939+    def create(self, name):
1940+        workdir = self.workdir(name)
1941+        ss = StorageServer(workdir, "\x00" * 20)
1942+        ss.setServiceParent(self.sparent)
1943+        return ss
1944+
1945+
1946+    def build_test_mdmf_share(self, tail_segment=False, empty=False):
1947+        # Start with the checkstring
1948+        data = struct.pack(">BQ32s",
1949+                           1,
1950+                           0,
1951+                           self.root_hash)
1952+        self.checkstring = data
1953+        # Next, the encoding parameters
1954+        if tail_segment:
1955+            data += struct.pack(">BBQQ",
1956+                                3,
1957+                                10,
1958+                                6,
1959+                                33)
1960+        elif empty:
1961+            data += struct.pack(">BBQQ",
1962+                                3,
1963+                                10,
1964+                                0,
1965+                                0)
1966+        else:
1967+            data += struct.pack(">BBQQ",
1968+                                3,
1969+                                10,
1970+                                6,
1971+                                36)
1972+        # Now we'll build the offsets.
1973+        sharedata = ""
1974+        if not tail_segment and not empty:
1975+            for i in xrange(6):
1976+                sharedata += self.salt + self.block
1977+        elif tail_segment:
1978+            for i in xrange(5):
1979+                sharedata += self.salt + self.block
1980+            sharedata += self.salt + "a"
1981+
1982+        # The encrypted private key comes after the shares + salts
1983+        offset_size = struct.calcsize(MDMFOFFSETS)
1984+        encrypted_private_key_offset = len(data) + offset_size + len(sharedata)
1985+        # The blockhashes come after the private key
1986+        blockhashes_offset = encrypted_private_key_offset + len(self.encprivkey)
1987+        # The sharehashes come after the block hashes
1988+        sharehashes_offset = blockhashes_offset + len(self.block_hash_tree_s)
1989+        # The signature comes after the share hash chain
1990+        signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
1991+        # The verification key comes after the signature
1992+        verification_offset = signature_offset + len(self.signature)
1993+        # The EOF comes after the verification key
1994+        eof_offset = verification_offset + len(self.verification_key)
1995+        data += struct.pack(MDMFOFFSETS,
1996+                            encrypted_private_key_offset,
1997+                            blockhashes_offset,
1998+                            sharehashes_offset,
1999+                            signature_offset,
2000+                            verification_offset,
2001+                            eof_offset)
2002+        self.offsets = {}
2003+        self.offsets['enc_privkey'] = encrypted_private_key_offset
2004+        self.offsets['block_hash_tree'] = blockhashes_offset
2005+        self.offsets['share_hash_chain'] = sharehashes_offset
2006+        self.offsets['signature'] = signature_offset
2007+        self.offsets['verification_key'] = verification_offset
2008+        self.offsets['EOF'] = eof_offset
2009+        # Next, we'll add in the salts and share data,
2010+        data += sharedata
2011+        # the private key,
2012+        data += self.encprivkey
2013+        # the block hash tree,
2014+        data += self.block_hash_tree_s
2015+        # the share hash chain,
2016+        data += self.share_hash_chain_s
2017+        # the signature,
2018+        data += self.signature
2019+        # and the verification key
2020+        data += self.verification_key
2021+        return data
2022+
2023+
2024+    def write_test_share_to_server(self,
2025+                                   storage_index,
2026+                                   tail_segment=False,
2027+                                   empty=False):
2028+        """
2029+        I write some data for the read tests to read to self.ss
2030+
2031+        If tail_segment=True, then I will write a share that has a
2032+        smaller tail segment than other segments.
2033+        """
2034+        write = self.ss.remote_slot_testv_and_readv_and_writev
2035+        data = self.build_test_mdmf_share(tail_segment, empty)
2036+        # Finally, we write the whole thing to the storage server in one
2037+        # pass.
2038+        testvs = [(0, 1, "eq", "")]
2039+        tws = {}
2040+        tws[0] = (testvs, [(0, data)], None)
2041+        readv = [(0, 1)]
2042+        results = write(storage_index, self.secrets, tws, readv)
2043+        self.failUnless(results[0])
2044+
2045+
2046+    def build_test_sdmf_share(self, empty=False):
2047+        if empty:
2048+            sharedata = ""
2049+        else:
2050+            sharedata = self.segment * 6
2051+        blocksize = len(sharedata) / 3
2052+        block = sharedata[:blocksize]
2053+        prefix = struct.pack(">BQ32s16s BBQQ",
2054+                             0, # version,
2055+                             0,
2056+                             self.root_hash,
2057+                             self.salt,
2058+                             3,
2059+                             10,
2060+                             len(sharedata),
2061+                             len(sharedata),
2062+                            )
2063+        post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
2064+        signature_offset = post_offset + len(self.verification_key)
2065+        sharehashes_offset = signature_offset + len(self.signature)
2066+        blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
2067+        sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
2068+        encprivkey_offset = sharedata_offset + len(block)
2069+        eof_offset = encprivkey_offset + len(self.encprivkey)
2070+        offsets = struct.pack(">LLLLQQ",
2071+                              signature_offset,
2072+                              sharehashes_offset,
2073+                              blockhashes_offset,
2074+                              sharedata_offset,
2075+                              encprivkey_offset,
2076+                              eof_offset)
2077+        final_share = "".join([prefix,
2078+                           offsets,
2079+                           self.verification_key,
2080+                           self.signature,
2081+                           self.share_hash_chain_s,
2082+                           self.block_hash_tree_s,
2083+                           block,
2084+                           self.encprivkey])
2085+        self.offsets = {}
2086+        self.offsets['signature'] = signature_offset
2087+        self.offsets['share_hash_chain'] = sharehashes_offset
2088+        self.offsets['block_hash_tree'] = blockhashes_offset
2089+        self.offsets['share_data'] = sharedata_offset
2090+        self.offsets['enc_privkey'] = encprivkey_offset
2091+        self.offsets['EOF'] = eof_offset
2092+        return final_share
2093+
2094+
2095+    def write_sdmf_share_to_server(self,
2096+                                   storage_index,
2097+                                   empty=False):
2098+        # Some tests need SDMF shares to verify that we can still
2099+        # read them. This method writes one that resembles, but is not, a real SDMF share.
2100+        assert self.rref
2101+        write = self.ss.remote_slot_testv_and_readv_and_writev
2102+        share = self.build_test_sdmf_share(empty)
2103+        testvs = [(0, 1, "eq", "")]
2104+        tws = {}
2105+        tws[0] = (testvs, [(0, share)], None)
2106+        readv = []
2107+        results = write(storage_index, self.secrets, tws, readv)
2108+        self.failUnless(results[0])
2109+
2110+
2111+    def test_read(self):
2112+        self.write_test_share_to_server("si1")
2113+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2114+        # Check that every method equals what we expect it to.
2115+        d = defer.succeed(None)
2116+        def _check_block_and_salt((block, salt)):
2117+            self.failUnlessEqual(block, self.block)
2118+            self.failUnlessEqual(salt, self.salt)
2119+
2120+        for i in xrange(6):
2121+            d.addCallback(lambda ignored, i=i:
2122+                mr.get_block_and_salt(i))
2123+            d.addCallback(_check_block_and_salt)
2124+
2125+        d.addCallback(lambda ignored:
2126+            mr.get_encprivkey())
2127+        d.addCallback(lambda encprivkey:
2128+            self.failUnlessEqual(self.encprivkey, encprivkey))
2129+
2130+        d.addCallback(lambda ignored:
2131+            mr.get_blockhashes())
2132+        d.addCallback(lambda blockhashes:
2133+            self.failUnlessEqual(self.block_hash_tree, blockhashes))
2134+
2135+        d.addCallback(lambda ignored:
2136+            mr.get_sharehashes())
2137+        d.addCallback(lambda sharehashes:
2138+            self.failUnlessEqual(self.share_hash_chain, sharehashes))
2139+
2140+        d.addCallback(lambda ignored:
2141+            mr.get_signature())
2142+        d.addCallback(lambda signature:
2143+            self.failUnlessEqual(signature, self.signature))
2144+
2145+        d.addCallback(lambda ignored:
2146+            mr.get_verification_key())
2147+        d.addCallback(lambda verification_key:
2148+            self.failUnlessEqual(verification_key, self.verification_key))
2149+
2150+        d.addCallback(lambda ignored:
2151+            mr.get_seqnum())
2152+        d.addCallback(lambda seqnum:
2153+            self.failUnlessEqual(seqnum, 0))
2154+
2155+        d.addCallback(lambda ignored:
2156+            mr.get_root_hash())
2157+        d.addCallback(lambda root_hash:
2158+            self.failUnlessEqual(self.root_hash, root_hash))
2159+
2160+        d.addCallback(lambda ignored:
2161+            mr.get_seqnum())
2162+        d.addCallback(lambda seqnum:
2163+            self.failUnlessEqual(0, seqnum))
2164+
2165+        d.addCallback(lambda ignored:
2166+            mr.get_encoding_parameters())
2167+        def _check_encoding_parameters((k, n, segsize, datalen)):
2168+            self.failUnlessEqual(k, 3)
2169+            self.failUnlessEqual(n, 10)
2170+            self.failUnlessEqual(segsize, 6)
2171+            self.failUnlessEqual(datalen, 36)
2172+        d.addCallback(_check_encoding_parameters)
2173+
2174+        d.addCallback(lambda ignored:
2175+            mr.get_checkstring())
2176+        d.addCallback(lambda checkstring:
2177+            self.failUnlessEqual(checkstring, self.checkstring))
2178+        return d
2179+
2180+
2181+    def test_read_with_different_tail_segment_size(self):
2182+        self.write_test_share_to_server("si1", tail_segment=True)
2183+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2184+        d = mr.get_block_and_salt(5)
2185+        def _check_tail_segment(results):
2186+            block, salt = results
2187+            self.failUnlessEqual(len(block), 1)
2188+            self.failUnlessEqual(block, "a")
2189+        d.addCallback(_check_tail_segment)
2190+        return d
2191+
2192+
2193+    def test_get_block_with_invalid_segnum(self):
2194+        self.write_test_share_to_server("si1")
2195+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2196+        d = defer.succeed(None)
2197+        d.addCallback(lambda ignored:
2198+            self.shouldFail(LayoutInvalid, "test invalid segnum",
2199+                            None,
2200+                            mr.get_block_and_salt, 7))
2201+        return d
2202+
2203+
2204+    def test_get_encoding_parameters_first(self):
2205+        self.write_test_share_to_server("si1")
2206+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2207+        d = mr.get_encoding_parameters()
2208+        def _check_encoding_parameters((k, n, segment_size, datalen)):
2209+            self.failUnlessEqual(k, 3)
2210+            self.failUnlessEqual(n, 10)
2211+            self.failUnlessEqual(segment_size, 6)
2212+            self.failUnlessEqual(datalen, 36)
2213+        d.addCallback(_check_encoding_parameters)
2214+        return d
2215+
2216+
2217+    def test_get_seqnum_first(self):
2218+        self.write_test_share_to_server("si1")
2219+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2220+        d = mr.get_seqnum()
2221+        d.addCallback(lambda seqnum:
2222+            self.failUnlessEqual(seqnum, 0))
2223+        return d
2224+
2225+
2226+    def test_get_root_hash_first(self):
2227+        self.write_test_share_to_server("si1")
2228+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2229+        d = mr.get_root_hash()
2230+        d.addCallback(lambda root_hash:
2231+            self.failUnlessEqual(root_hash, self.root_hash))
2232+        return d
2233+
2234+
2235+    def test_get_checkstring_first(self):
2236+        self.write_test_share_to_server("si1")
2237+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2238+        d = mr.get_checkstring()
2239+        d.addCallback(lambda checkstring:
2240+            self.failUnlessEqual(checkstring, self.checkstring))
2241+        return d
2242+
2243+
2244+    def test_write_read_vectors(self):
2245+        # When writing for us, the storage server will return to us a
2246+        # read vector, along with its result. If a write fails because
2247+        # the test vectors failed, this read vector can help us to
2248+        # diagnose the problem. This test ensures that the read vector
2249+        # is working appropriately.
2250+        mw = self._make_new_mw("si1", 0)
2251+        d = defer.succeed(None)
2252+
2253+        # Write one share. This should return a checkstring of nothing,
2254+        # since there is no data there.
2255+        d.addCallback(lambda ignored:
2256+            mw.put_block(self.block, 0, self.salt))
2257+        def _check_first_write(results):
2258+            result, readvs = results
2259+            self.failUnless(result)
2260+            self.failIf(readvs)
2261+        d.addCallback(_check_first_write)
2262+        # Now, there should be a different checkstring returned when
2263+        # we write other shares
2264+        d.addCallback(lambda ignored:
2265+            mw.put_block(self.block, 1, self.salt))
2266+        def _check_next_write(results):
2267+            result, readvs = results
2268+            self.failUnless(result)
2269+            self.expected_checkstring = mw.get_checkstring()
2270+            self.failUnlessIn(0, readvs)
2271+            self.failUnlessEqual(readvs[0][0], self.expected_checkstring)
2272+        d.addCallback(_check_next_write)
2273+        # Add the other four shares
2274+        for i in xrange(2, 6):
2275+            d.addCallback(lambda ignored, i=i:
2276+                mw.put_block(self.block, i, self.salt))
2277+            d.addCallback(_check_next_write)
2278+        # Add the encrypted private key
2279+        d.addCallback(lambda ignored:
2280+            mw.put_encprivkey(self.encprivkey))
2281+        d.addCallback(_check_next_write)
2282+        # Add the block hash tree and share hash tree
2283+        d.addCallback(lambda ignored:
2284+            mw.put_blockhashes(self.block_hash_tree))
2285+        d.addCallback(_check_next_write)
2286+        d.addCallback(lambda ignored:
2287+            mw.put_sharehashes(self.share_hash_chain))
2288+        d.addCallback(_check_next_write)
2289+        # Add the root hash. This should change the
2290+        # checkstring, but not in a way that we'll be able to see right
2291+        # now, since the read vectors are applied before the write
2292+        # vectors.
2293+        d.addCallback(lambda ignored:
2294+            mw.put_root_hash(self.root_hash))
2295+        def _check_old_testv_after_new_one_is_written(results):
2296+            result, readvs = results
2297+            self.failUnless(result)
2298+            self.failUnlessIn(0, readvs)
2299+            self.failUnlessEqual(self.expected_checkstring,
2300+                                 readvs[0][0])
2301+            new_checkstring = mw.get_checkstring()
2302+            self.failIfEqual(new_checkstring,
2303+                             readvs[0][0])
2304+        d.addCallback(_check_old_testv_after_new_one_is_written)
2305+        # Now add the signature. This should succeed, meaning that the
2306+        # data gets written and the read vector matches what the writer
2307+        # thinks should be there.
2308+        d.addCallback(lambda ignored:
2309+            mw.put_signature(self.signature))
2310+        d.addCallback(_check_next_write)
2311+        # The checkstring remains the same for the rest of the process.
2312+        return d
2313+
2314+
2315+    def test_blockhashes_after_share_hash_chain(self):
2316+        mw = self._make_new_mw("si1", 0)
2317+        d = defer.succeed(None)
2318+        # Put everything up to and including the share hash chain
2319+        for i in xrange(6):
2320+            d.addCallback(lambda ignored, i=i:
2321+                mw.put_block(self.block, i, self.salt))
2322+        d.addCallback(lambda ignored:
2323+            mw.put_encprivkey(self.encprivkey))
2324+        d.addCallback(lambda ignored:
2325+            mw.put_blockhashes(self.block_hash_tree))
2326+        d.addCallback(lambda ignored:
2327+            mw.put_sharehashes(self.share_hash_chain))
2328+
2329+        # Now try to put the block hash tree again.
2330+        d.addCallback(lambda ignored:
2331+            self.shouldFail(LayoutInvalid, "test repeat blockhashes",
2332+                            None,
2333+                            mw.put_blockhashes, self.block_hash_tree))
2334+        return d
2335+
2336+
2337+    def test_encprivkey_after_blockhashes(self):
2338+        mw = self._make_new_mw("si1", 0)
2339+        d = defer.succeed(None)
2340+        # Put everything up to and including the block hash tree
2341+        for i in xrange(6):
2342+            d.addCallback(lambda ignored, i=i:
2343+                mw.put_block(self.block, i, self.salt))
2344+        d.addCallback(lambda ignored:
2345+            mw.put_encprivkey(self.encprivkey))
2346+        d.addCallback(lambda ignored:
2347+            mw.put_blockhashes(self.block_hash_tree))
2348+        d.addCallback(lambda ignored:
2349+            self.shouldFail(LayoutInvalid, "out of order private key",
2350+                            None,
2351+                            mw.put_encprivkey, self.encprivkey))
2352+        return d
2353+
2354+
2355+    def test_share_hash_chain_after_signature(self):
2356+        mw = self._make_new_mw("si1", 0)
2357+        d = defer.succeed(None)
2358+        # Put everything up to and including the signature
2359+        for i in xrange(6):
2360+            d.addCallback(lambda ignored, i=i:
2361+                mw.put_block(self.block, i, self.salt))
2362+        d.addCallback(lambda ignored:
2363+            mw.put_encprivkey(self.encprivkey))
2364+        d.addCallback(lambda ignored:
2365+            mw.put_blockhashes(self.block_hash_tree))
2366+        d.addCallback(lambda ignored:
2367+            mw.put_sharehashes(self.share_hash_chain))
2368+        d.addCallback(lambda ignored:
2369+            mw.put_root_hash(self.root_hash))
2370+        d.addCallback(lambda ignored:
2371+            mw.put_signature(self.signature))
2372+        # Now try to put the share hash chain again. This should fail
2373+        d.addCallback(lambda ignored:
2374+            self.shouldFail(LayoutInvalid, "out of order share hash chain",
2375+                            None,
2376+                            mw.put_sharehashes, self.share_hash_chain))
2377+        return d
2378+
2379+
2380+    def test_signature_after_verification_key(self):
2381+        mw = self._make_new_mw("si1", 0)
2382+        d = defer.succeed(None)
2383+        # Put everything up to and including the verification key.
2384+        for i in xrange(6):
2385+            d.addCallback(lambda ignored, i=i:
2386+                mw.put_block(self.block, i, self.salt))
2387+        d.addCallback(lambda ignored:
2388+            mw.put_encprivkey(self.encprivkey))
2389+        d.addCallback(lambda ignored:
2390+            mw.put_blockhashes(self.block_hash_tree))
2391+        d.addCallback(lambda ignored:
2392+            mw.put_sharehashes(self.share_hash_chain))
2393+        d.addCallback(lambda ignored:
2394+            mw.put_root_hash(self.root_hash))
2395+        d.addCallback(lambda ignored:
2396+            mw.put_signature(self.signature))
2397+        d.addCallback(lambda ignored:
2398+            mw.put_verification_key(self.verification_key))
2399+        # Now try to put the signature again. This should fail
2400+        d.addCallback(lambda ignored:
2401+            self.shouldFail(LayoutInvalid, "signature after verification",
2402+                            None,
2403+                            mw.put_signature, self.signature))
2404+        return d
2405+
2406+
2407+    def test_uncoordinated_write(self):
2408+        # Make two mutable writers, both pointing to the same storage
2409+        # server, both at the same storage index, and try writing to the
2410+        # same share.
2411+        mw1 = self._make_new_mw("si1", 0)
2412+        mw2 = self._make_new_mw("si1", 0)
2413+        d = defer.succeed(None)
2414+        def _check_success(results):
2415+            result, readvs = results
2416+            self.failUnless(result)
2417+
2418+        def _check_failure(results):
2419+            result, readvs = results
2420+            self.failIf(result)
2421+
2422+        d.addCallback(lambda ignored:
2423+            mw1.put_block(self.block, 0, self.salt))
2424+        d.addCallback(_check_success)
2425+        d.addCallback(lambda ignored:
2426+            mw2.put_block(self.block, 0, self.salt))
2427+        d.addCallback(_check_failure)
2428+        return d
2429+
2430+
2431+    def test_invalid_salt_size(self):
2432+        # Salts need to be 16 bytes in size. Writes that attempt to
2433+        # write more or less than this should be rejected.
2434+        mw = self._make_new_mw("si1", 0)
2435+        invalid_salt = "a" * 17 # 17 bytes
2436+        another_invalid_salt = "b" * 15 # 15 bytes
2437+        d = defer.succeed(None)
2438+        d.addCallback(lambda ignored:
2439+            self.shouldFail(LayoutInvalid, "salt too big",
2440+                            None,
2441+                            mw.put_block, self.block, 0, invalid_salt))
2442+        d.addCallback(lambda ignored:
2443+            self.shouldFail(LayoutInvalid, "salt too small",
2444+                            None,
2445+                            mw.put_block, self.block, 0,
2446+                            another_invalid_salt))
2447+        return d
2448+
2449+
2450+    def test_write_test_vectors(self):
2451+        # If we give the write proxy a bogus test vector at
2452+        # any point during the process, it should fail to write.
2453+        mw = self._make_new_mw("si1", 0)
2454+        mw.set_checkstring("this is a lie")
2455+        # The initial write should be expecting to find the improbable
2456+        # checkstring above in place; finding nothing, it should fail.
2457+        d = defer.succeed(None)
2458+        d.addCallback(lambda ignored:
2459+            mw.put_block(self.block, 0, self.salt))
2460+        def _check_failure(results):
2461+            result, readv = results
2462+            self.failIf(result)
2463+        d.addCallback(_check_failure)
2464+        # Now set the checkstring to the empty string, which
2465+        # indicates that no share is there.
2466+        d.addCallback(lambda ignored:
2467+            mw.set_checkstring(""))
2468+        d.addCallback(lambda ignored:
2469+            mw.put_block(self.block, 0, self.salt))
2470+        def _check_success(results):
2471+            result, readv = results
2472+            self.failUnless(result)
2473+        d.addCallback(_check_success)
2474+        # Now set the checkstring to something wrong
2475+        d.addCallback(lambda ignored:
2476+            mw.set_checkstring("something wrong"))
2477+        # This should fail to do anything
2478+        d.addCallback(lambda ignored:
2479+            mw.put_block(self.block, 1, self.salt))
2480+        d.addCallback(_check_failure)
2481+        # Now set it back to what it should be.
2482+        d.addCallback(lambda ignored:
2483+            mw.set_checkstring(mw.get_checkstring()))
2484+        for i in xrange(1, 6):
2485+            d.addCallback(lambda ignored, i=i:
2486+                mw.put_block(self.block, i, self.salt))
2487+            d.addCallback(_check_success)
2488+        d.addCallback(lambda ignored:
2489+            mw.put_encprivkey(self.encprivkey))
2490+        d.addCallback(_check_success)
2491+        d.addCallback(lambda ignored:
2492+            mw.put_blockhashes(self.block_hash_tree))
2493+        d.addCallback(_check_success)
2494+        d.addCallback(lambda ignored:
2495+            mw.put_sharehashes(self.share_hash_chain))
2496+        d.addCallback(_check_success)
2497+        def _keep_old_checkstring(ignored):
2498+            self.old_checkstring = mw.get_checkstring()
2499+            mw.set_checkstring("foobarbaz")
2500+        d.addCallback(_keep_old_checkstring)
2501+        d.addCallback(lambda ignored:
2502+            mw.put_root_hash(self.root_hash))
2503+        d.addCallback(_check_failure)
2504+        d.addCallback(lambda ignored:
2505+            self.failUnlessEqual(self.old_checkstring, mw.get_checkstring()))
2506+        def _restore_old_checkstring(ignored):
2507+            mw.set_checkstring(self.old_checkstring)
2508+        d.addCallback(_restore_old_checkstring)
2509+        d.addCallback(lambda ignored:
2510+            mw.put_root_hash(self.root_hash))
2511+        d.addCallback(_check_success)
2512+        # The checkstring should have been set appropriately for us on
2513+        # the last write; if we try to change it to something else,
2514+        # that change should cause the signature step to fail.
2515+        d.addCallback(lambda ignored:
2516+            mw.set_checkstring("something else"))
2517+        d.addCallback(lambda ignored:
2518+            mw.put_signature(self.signature))
2519+        d.addCallback(_check_failure)
2520+        d.addCallback(lambda ignored:
2521+            mw.set_checkstring(mw.get_checkstring()))
2522+        d.addCallback(lambda ignored:
2523+            mw.put_signature(self.signature))
2524+        d.addCallback(_check_success)
2525+        d.addCallback(lambda ignored:
2526+            mw.put_verification_key(self.verification_key))
2527+        d.addCallback(_check_success)
2528+        return d
2529+
2530+
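    # A quick sketch of the check-and-set convention exercised by the
    # test above (inferred from this test rather than quoted from
    # layout.py): each put_* call is sent with a test vector built from
    # the proxy's checkstring, so the storage server only applies the
    # write if the share currently begins with that checkstring.
    #
    #   mw.set_checkstring("")                  # "expect no share yet"
    #   d = mw.put_block(self.block, 0, self.salt)
    #   def _check((result, readv)):
    #       # result is True iff the test vector matched and the write
    #       # was applied; readv holds what the server read otherwise.
    #       pass
    #   d.addCallback(_check)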
2531+    def test_offset_only_set_on_success(self):
2532+        # The write proxy should be smart enough to detect when a write
2533+        # has failed, and should not advance its notion of progress
2534+        # when one does.
2535+        mw = self._make_new_mw("si1", 0)
2536+        d = defer.succeed(None)
2537+        for i in xrange(1, 6):
2538+            d.addCallback(lambda ignored, i=i:
2539+                mw.put_block(self.block, i, self.salt))
2540+        def _break_checkstring(ignored):
2541+            self._old_checkstring = mw.get_checkstring()
2542+            mw.set_checkstring("foobarbaz")
2543+
2544+        def _fix_checkstring(ignored):
2545+            mw.set_checkstring(self._old_checkstring)
2546+
2547+        d.addCallback(_break_checkstring)
2548+
2549+        # Setting the encrypted private key shouldn't work now, which is
2550+        # to be expected and is tested elsewhere. We also want to make
2551+        # sure that we can't add the block hash tree after a failed
2552+        # write of this sort.
2553+        d.addCallback(lambda ignored:
2554+            mw.put_encprivkey(self.encprivkey))
2555+        d.addCallback(lambda ignored:
2556+            self.shouldFail(LayoutInvalid, "test out-of-order blockhashes",
2557+                            None,
2558+                            mw.put_blockhashes, self.block_hash_tree))
2559+        d.addCallback(_fix_checkstring)
2560+        d.addCallback(lambda ignored:
2561+            mw.put_encprivkey(self.encprivkey))
2562+        d.addCallback(_break_checkstring)
2563+        d.addCallback(lambda ignored:
2564+            mw.put_blockhashes(self.block_hash_tree))
2565+        d.addCallback(lambda ignored:
2566+            self.shouldFail(LayoutInvalid, "test out-of-order sharehashes",
2567+                            None,
2568+                            mw.put_sharehashes, self.share_hash_chain))
2569+        d.addCallback(_fix_checkstring)
2570+        d.addCallback(lambda ignored:
2571+            mw.put_blockhashes(self.block_hash_tree))
2572+        d.addCallback(_break_checkstring)
2573+        d.addCallback(lambda ignored:
2574+            mw.put_sharehashes(self.share_hash_chain))
2575+        d.addCallback(lambda ignored:
2576+            self.shouldFail(LayoutInvalid, "out-of-order root hash",
2577+                            None,
2578+                            mw.put_root_hash, self.root_hash))
2579+        d.addCallback(_fix_checkstring)
2580+        d.addCallback(lambda ignored:
2581+            mw.put_sharehashes(self.share_hash_chain))
2582+        d.addCallback(_break_checkstring)
2583+        d.addCallback(lambda ignored:
2584+            mw.put_root_hash(self.root_hash))
2585+        d.addCallback(lambda ignored:
2586+            self.shouldFail(LayoutInvalid, "out-of-order signature",
2587+                            None,
2588+                            mw.put_signature, self.signature))
2589+        d.addCallback(_fix_checkstring)
2590+        d.addCallback(lambda ignored:
2591+            mw.put_root_hash(self.root_hash))
2592+        d.addCallback(_break_checkstring)
2593+        d.addCallback(lambda ignored:
2594+            mw.put_signature(self.signature))
2595+        d.addCallback(lambda ignored:
2596+            self.shouldFail(LayoutInvalid, "out-of-order verification key",
2597+                            None,
2598+                            mw.put_verification_key,
2599+                            self.verification_key))
2600+        d.addCallback(_fix_checkstring)
2601+        d.addCallback(lambda ignored:
2602+            mw.put_signature(self.signature))
2603+        d.addCallback(_break_checkstring)
2604+        d.addCallback(lambda ignored:
2605+            mw.put_verification_key(self.verification_key))
2606+        d.addCallback(lambda ignored:
2607+            self.shouldFail(LayoutInvalid, "out-of-order finish",
2608+                            None,
2609+                            mw.finish_publishing))
2610+        return d
2611+
2612+
2613+    def serialize_blockhashes(self, blockhashes):
2614+        return "".join(blockhashes)
2615+
2616+
2617+    def serialize_sharehashes(self, sharehashes):
2618+        ret = "".join([struct.pack(">H32s", i, sharehashes[i])
2619+                        for i in sorted(sharehashes.keys())])
2620+        return ret
2621+
2622+
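    # Note on the serialization above (a reading of the struct format,
    # not taken from layout.py): each share hash chain entry is packed
    # as a 2-byte share number plus a 32-byte hash (">H32s", 34 bytes
    # per entry), which is why test_write below reads (32 + 2) * 6
    # bytes to get the six-entry chain back off the server.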
2623+    def test_write(self):
2624+        # This translates to a file with 6 6-byte segments, and with 2-byte
2625+        # blocks.
2626+        mw = self._make_new_mw("si1", 0)
2627+        mw2 = self._make_new_mw("si1", 1)
2628+        # Test writing some blocks.
2629+        read = self.ss.remote_slot_readv
2630+        expected_sharedata_offset = struct.calcsize(MDMFHEADER)
2631+        written_block_size = 2 + len(self.salt)
2632+        written_block = self.block + self.salt
2633+        def _check_block_write(i, share):
2634+            self.failUnlessEqual(read("si1", [share], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
2635+                                {share: [written_block]})
2636+        d = defer.succeed(None)
2637+        for i in xrange(6):
2638+            d.addCallback(lambda ignored, i=i:
2639+                mw.put_block(self.block, i, self.salt))
2640+            d.addCallback(lambda ignored, i=i:
2641+                _check_block_write(i, 0))
2642+        # Now try the same thing, but with share 1 instead of share 0.
2643+        for i in xrange(6):
2644+            d.addCallback(lambda ignored, i=i:
2645+                mw2.put_block(self.block, i, self.salt))
2646+            d.addCallback(lambda ignored, i=i:
2647+                _check_block_write(i, 1))
2648+
2649+        # Next, we make a fake encrypted private key, and put it onto the
2650+        # storage server.
2651+        d.addCallback(lambda ignored:
2652+            mw.put_encprivkey(self.encprivkey))
2653+        expected_private_key_offset = expected_sharedata_offset + \
2654+                                      len(written_block) * 6
2655+        self.failUnlessEqual(len(self.encprivkey), 7)
2656+        d.addCallback(lambda ignored:
2657+            self.failUnlessEqual(read("si1", [0], [(expected_private_key_offset, 7)]),
2658+                                 {0: [self.encprivkey]}))
2659+
2660+        # Next, we put a fake block hash tree.
2661+        d.addCallback(lambda ignored:
2662+            mw.put_blockhashes(self.block_hash_tree))
2663+        expected_block_hash_offset = expected_private_key_offset + len(self.encprivkey)
2664+        self.failUnlessEqual(len(self.block_hash_tree_s), 32 * 6)
2665+        d.addCallback(lambda ignored:
2666+            self.failUnlessEqual(read("si1", [0], [(expected_block_hash_offset, 32 * 6)]),
2667+                                 {0: [self.block_hash_tree_s]}))
2668+
2669+        # Next, put a fake share hash chain
2670+        d.addCallback(lambda ignored:
2671+            mw.put_sharehashes(self.share_hash_chain))
2672+        expected_share_hash_offset = expected_block_hash_offset + len(self.block_hash_tree_s)
2673+        d.addCallback(lambda ignored:
2674+            self.failUnlessEqual(read("si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
2675+                                 {0: [self.share_hash_chain_s]}))
2676+
2677+        # Next, we put what is supposed to be the root hash of
2678+        # our share hash tree, but is really just test data.
2679+        d.addCallback(lambda ignored:
2680+            mw.put_root_hash(self.root_hash))
2681+        # The root hash gets inserted at byte 9 (its position is in the header,
2682+        # and is fixed).
2683+        def _check(ignored):
2684+            self.failUnlessEqual(read("si1", [0], [(9, 32)]),
2685+                                 {0: [self.root_hash]})
2686+        d.addCallback(_check)
2687+
2688+        # Next, we put a signature of the header block.
2689+        d.addCallback(lambda ignored:
2690+            mw.put_signature(self.signature))
2691+        expected_signature_offset = expected_share_hash_offset + len(self.share_hash_chain_s)
2692+        self.failUnlessEqual(len(self.signature), 9)
2693+        d.addCallback(lambda ignored:
2694+            self.failUnlessEqual(read("si1", [0], [(expected_signature_offset, 9)]),
2695+                                 {0: [self.signature]}))
2696+
2697+        # Next, we put the verification key
2698+        d.addCallback(lambda ignored:
2699+            mw.put_verification_key(self.verification_key))
2700+        expected_verification_key_offset = expected_signature_offset + len(self.signature)
2701+        self.failUnlessEqual(len(self.verification_key), 6)
2702+        d.addCallback(lambda ignored:
2703+            self.failUnlessEqual(read("si1", [0], [(expected_verification_key_offset, 6)]),
2704+                                 {0: [self.verification_key]}))
2705+
2706+        def _check_signable(ignored):
2707+            # Make sure that the signable is what we think it should be.
2708+            signable = mw.get_signable()
2709+            verno, seq, roothash, k, n, segsize, datalen = \
2710+                                            struct.unpack(">BQ32sBBQQ",
2711+                                                          signable)
2712+            self.failUnlessEqual(verno, 1)
2713+            self.failUnlessEqual(seq, 0)
2714+            self.failUnlessEqual(roothash, self.root_hash)
2715+            self.failUnlessEqual(k, 3)
2716+            self.failUnlessEqual(n, 10)
2717+            self.failUnlessEqual(segsize, 6)
2718+            self.failUnlessEqual(datalen, 36)
2719+        d.addCallback(_check_signable)
2720+        # Next, we cause the offset table to be published.
2721+        d.addCallback(lambda ignored:
2722+            mw.finish_publishing())
2723+        expected_eof_offset = expected_verification_key_offset + len(self.verification_key)
2724+
2725+        def _check_offsets(ignored):
2726+            # Check the version number to make sure that it is correct.
2727+            expected_version_number = struct.pack(">B", 1)
2728+            self.failUnlessEqual(read("si1", [0], [(0, 1)]),
2729+                                 {0: [expected_version_number]})
2730+            # Check the sequence number to make sure that it is correct
2731+            expected_sequence_number = struct.pack(">Q", 0)
2732+            self.failUnlessEqual(read("si1", [0], [(1, 8)]),
2733+                                 {0: [expected_sequence_number]})
2734+            # Check that the encoding parameters (k, N, segment size, data
2735+            # length) are what they should be. These are 3, 10, 6, and 36.
2736+            expected_k = struct.pack(">B", 3)
2737+            self.failUnlessEqual(read("si1", [0], [(41, 1)]),
2738+                                 {0: [expected_k]})
2739+            expected_n = struct.pack(">B", 10)
2740+            self.failUnlessEqual(read("si1", [0], [(42, 1)]),
2741+                                 {0: [expected_n]})
2742+            expected_segment_size = struct.pack(">Q", 6)
2743+            self.failUnlessEqual(read("si1", [0], [(43, 8)]),
2744+                                 {0: [expected_segment_size]})
2745+            expected_data_length = struct.pack(">Q", 36)
2746+            self.failUnlessEqual(read("si1", [0], [(51, 8)]),
2747+                                 {0: [expected_data_length]})
2748+            expected_offset = struct.pack(">Q", expected_private_key_offset)
2749+            self.failUnlessEqual(read("si1", [0], [(59, 8)]),
2750+                                 {0: [expected_offset]})
2751+            expected_offset = struct.pack(">Q", expected_block_hash_offset)
2752+            self.failUnlessEqual(read("si1", [0], [(67, 8)]),
2753+                                 {0: [expected_offset]})
2754+            expected_offset = struct.pack(">Q", expected_share_hash_offset)
2755+            self.failUnlessEqual(read("si1", [0], [(75, 8)]),
2756+                                 {0: [expected_offset]})
2757+            expected_offset = struct.pack(">Q", expected_signature_offset)
2758+            self.failUnlessEqual(read("si1", [0], [(83, 8)]),
2759+                                 {0: [expected_offset]})
2760+            expected_offset = struct.pack(">Q", expected_verification_key_offset)
2761+            self.failUnlessEqual(read("si1", [0], [(91, 8)]),
2762+                                 {0: [expected_offset]})
2763+            expected_offset = struct.pack(">Q", expected_eof_offset)
2764+            self.failUnlessEqual(read("si1", [0], [(99, 8)]),
2765+                                 {0: [expected_offset]})
2766+        d.addCallback(_check_offsets)
2767+        return d
2768+
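    # For reference, the fixed offsets read back in _check_offsets are
    # consistent with a header packed roughly as follows (field names
    # are inferred from this test rather than quoted from layout.py):
    #
    #   offset  size  field
    #        0     1  version number (1 for MDMF)
    #        1     8  sequence number
    #        9    32  root hash
    #       41     1  k (required shares)
    #       42     1  N (total shares)
    #       43     8  segment size
    #       51     8  data length
    #       59   6*8  offset table (private key, block hashes, share
    #                 hashes, signature, verification key, EOF)
    #
    #   >>> struct.calcsize(">BQ32sBBQQ" + "Q" * 6)
    #   107
    #
    # which matches the 107-byte prefetch used in
    # test_read_with_prefetched_mdmf_data below.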
2769+    def _make_new_mw(self, si, share, datalength=36):
2770+        # By default, this is a file of size 36 bytes. Since it has a
2771+        # segment size of 6, it has six 6-byte segments, which will
2772+        # be split into blocks of 2 bytes because our FEC k
2773+        # parameter is 3.
2774+        mw = MDMFSlotWriteProxy(share, self.rref, si, self.secrets, 0, 3, 10,
2775+                                6, datalength)
2776+        return mw
2777+
2778+
2779+    def test_write_rejected_with_too_many_blocks(self):
2780+        mw = self._make_new_mw("si0", 0)
2781+
2782+        # Try writing too many blocks. We should not be able to
2783+        # write more than 6 blocks into each share, so this extra
2784+        # write should be rejected.
2785+        d = defer.succeed(None)
2786+        for i in xrange(6):
2787+            d.addCallback(lambda ignored, i=i:
2788+                mw.put_block(self.block, i, self.salt))
2789+        d.addCallback(lambda ignored:
2790+            self.shouldFail(LayoutInvalid, "too many blocks",
2791+                            None,
2792+                            mw.put_block, self.block, 7, self.salt))
2793+        return d
2794+
2795+
2796+    def test_write_rejected_with_invalid_salt(self):
2797+        # Try writing an invalid salt. Salts are 16 bytes -- any more or
2798+        # less should cause an error.
2799+        mw = self._make_new_mw("si1", 0)
2800+        bad_salt = "a" * 17 # 17 bytes
2801+        d = defer.succeed(None)
2802+        d.addCallback(lambda ignored:
2803+            self.shouldFail(LayoutInvalid, "test_invalid_salt",
2804+                            None, mw.put_block, self.block, 7, bad_salt))
2805+        return d
2806+
2807+
2808+    def test_write_rejected_with_invalid_root_hash(self):
2809+        # Try writing an invalid root hash. This should be SHA256d, and
2810+        # 32 bytes long as a result.
2811+        mw = self._make_new_mw("si2", 0)
2812+        # 17 bytes != 32 bytes
2813+        invalid_root_hash = "a" * 17
2814+        d = defer.succeed(None)
2815+        # Before this test can work, we need to put some blocks + salts,
2816+        # a block hash tree, and a share hash tree. Otherwise, we'll see
2817+        # failures that match what we are looking for, but are caused by
2818+        # the constraints imposed on operation ordering.
2819+        for i in xrange(6):
2820+            d.addCallback(lambda ignored, i=i:
2821+                mw.put_block(self.block, i, self.salt))
2822+        d.addCallback(lambda ignored:
2823+            mw.put_encprivkey(self.encprivkey))
2824+        d.addCallback(lambda ignored:
2825+            mw.put_blockhashes(self.block_hash_tree))
2826+        d.addCallback(lambda ignored:
2827+            mw.put_sharehashes(self.share_hash_chain))
2828+        d.addCallback(lambda ignored:
2829+            self.shouldFail(LayoutInvalid, "invalid root hash",
2830+                            None, mw.put_root_hash, invalid_root_hash))
2831+        return d
2832+
2833+
2834+    def test_write_rejected_with_invalid_blocksize(self):
2835+        # The block size implied by the writer that we get from
2836+        # _make_new_mw is 2 bytes -- writing any more or any less than
2837+        # this should fail, unless it is the tail segment, in which
2838+        # case a shorter block may be valid.
2839+        invalid_block = "a"
2840+        mw = self._make_new_mw("si3", 0, 33) # implies a tail segment with
2841+                                             # one byte blocks
2842+        # 1 byte != 2 bytes
2843+        d = defer.succeed(None)
2844+        d.addCallback(lambda ignored, invalid_block=invalid_block:
2845+            self.shouldFail(LayoutInvalid, "test blocksize too small",
2846+                            None, mw.put_block, invalid_block, 0,
2847+                            self.salt))
2848+        invalid_block = invalid_block * 3
2849+        # 3 bytes != 2 bytes
2850+        d.addCallback(lambda ignored:
2851+            self.shouldFail(LayoutInvalid, "test blocksize too large",
2852+                            None,
2853+                            mw.put_block, invalid_block, 0, self.salt))
2854+        for i in xrange(5):
2855+            d.addCallback(lambda ignored, i=i:
2856+                mw.put_block(self.block, i, self.salt))
2857+        # Try to put an oversized block for the tail segment
2858+        d.addCallback(lambda ignored:
2859+            self.shouldFail(LayoutInvalid, "test invalid tail segment",
2860+                            None,
2861+                            mw.put_block, self.block, 5, self.salt))
2862+        valid_block = "a"
2863+        d.addCallback(lambda ignored:
2864+            mw.put_block(valid_block, 5, self.salt))
2865+        return d
2866+
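    # Worked example of the tail-segment arithmetic above (using only
    # the parameters passed to _make_new_mw): datalength=33 with a
    # 6-byte segment size gives five full segments plus a 3-byte tail
    # segment; with k=3, each tail block is 3 / 3 = 1 byte, so the
    # 2-byte self.block is rejected for segment 5 while the 1-byte
    # block "a" is accepted.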
2867+
2868+    def test_write_enforces_order_constraints(self):
2869+        # We require that the MDMFSlotWriteProxy be interacted with in a
2870+        # specific way.
2871+        # That way is:
2872+        # 0: __init__
2873+        # 1: write blocks and salts
2874+        # 2: Write the encrypted private key
2875+        # 3: Write the block hashes
2876+        # 4: Write the share hashes
2877+        # 5: Write the root hash and salt hash
2878+        # 6: Write the signature and verification key
2879+        # 7: Finish publishing (write the offset table).
2880+        #
2881+        # Some of these can be performed out-of-order, and some can't.
2882+        # The dependencies that I want to test here are:
2883+        #  - Private key before block hashes
2884+        #  - share hashes and block hashes before root hash
2885+        #  - root hash before signature
2886+        #  - signature before verification key
2887+        mw0 = self._make_new_mw("si0", 0)
2888+        # Write some shares
2889+        d = defer.succeed(None)
2890+        for i in xrange(6):
2891+            d.addCallback(lambda ignored, i=i:
2892+                mw0.put_block(self.block, i, self.salt))
2893+        # Try to write the block hashes before writing the encrypted
2894+        # private key
2895+        d.addCallback(lambda ignored:
2896+            self.shouldFail(LayoutInvalid, "block hashes before key",
2897+                            None, mw0.put_blockhashes,
2898+                            self.block_hash_tree))
2899+
2900+        # Write the private key.
2901+        d.addCallback(lambda ignored:
2902+            mw0.put_encprivkey(self.encprivkey))
2903+
2904+
2905+        # Try to write the share hash chain without writing the block
2906+        # hash tree
2907+        d.addCallback(lambda ignored:
2908+            self.shouldFail(LayoutInvalid, "share hash chain before "
2909+                                           "salt hash tree",
2910+                            None,
2911+                            mw0.put_sharehashes, self.share_hash_chain))
2912+
2913+        # Try to write the root hash without writing either the
2914+        # block hashes or the share hashes
2915+        d.addCallback(lambda ignored:
2916+            self.shouldFail(LayoutInvalid, "root hash before share hashes",
2917+                            None,
2918+                            mw0.put_root_hash, self.root_hash))
2919+
2920+        # Now write the block hashes and try again
2921+        d.addCallback(lambda ignored:
2922+            mw0.put_blockhashes(self.block_hash_tree))
2923+
2924+        d.addCallback(lambda ignored:
2925+            self.shouldFail(LayoutInvalid, "root hash before share hashes",
2926+                            None, mw0.put_root_hash, self.root_hash))
2927+
2928+        # We haven't yet put the root hash on the share, so we shouldn't
2929+        # be able to sign it.
2930+        d.addCallback(lambda ignored:
2931+            self.shouldFail(LayoutInvalid, "signature before root hash",
2932+                            None, mw0.put_signature, self.signature))
2933+
2934+        d.addCallback(lambda ignored:
2935+            self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2936+
2937+        # ...and, since that fails, we also shouldn't be able to put the
2938+        # verification key.
2939+        d.addCallback(lambda ignored:
2940+            self.shouldFail(LayoutInvalid, "key before signature",
2941+                            None, mw0.put_verification_key,
2942+                            self.verification_key))
2943+
2944+        # Now write the share hashes.
2945+        d.addCallback(lambda ignored:
2946+            mw0.put_sharehashes(self.share_hash_chain))
2947+        # We should be able to write the root hash now too
2948+        d.addCallback(lambda ignored:
2949+            mw0.put_root_hash(self.root_hash))
2950+
2951+        # We should still be unable to put the verification key
2952+        d.addCallback(lambda ignored:
2953+            self.shouldFail(LayoutInvalid, "key before signature",
2954+                            None, mw0.put_verification_key,
2955+                            self.verification_key))
2956+
2957+        d.addCallback(lambda ignored:
2958+            mw0.put_signature(self.signature))
2959+
2960+        # We shouldn't be able to write the offsets to the remote server
2961+        # until the offset table is finished; IOW, until we have written
2962+        # the verification key.
2963+        d.addCallback(lambda ignored:
2964+            self.shouldFail(LayoutInvalid, "offsets before verification key",
2965+                            None,
2966+                            mw0.finish_publishing))
2967+
2968+        d.addCallback(lambda ignored:
2969+            mw0.put_verification_key(self.verification_key))
2970+        return d
2971+
2972+
2973+    def test_end_to_end(self):
2974+        mw = self._make_new_mw("si1", 0)
2975+        # Write a share using the mutable writer, and make sure that the
2976+        # reader knows how to read everything back to us.
2977+        d = defer.succeed(None)
2978+        for i in xrange(6):
2979+            d.addCallback(lambda ignored, i=i:
2980+                mw.put_block(self.block, i, self.salt))
2981+        d.addCallback(lambda ignored:
2982+            mw.put_encprivkey(self.encprivkey))
2983+        d.addCallback(lambda ignored:
2984+            mw.put_blockhashes(self.block_hash_tree))
2985+        d.addCallback(lambda ignored:
2986+            mw.put_sharehashes(self.share_hash_chain))
2987+        d.addCallback(lambda ignored:
2988+            mw.put_root_hash(self.root_hash))
2989+        d.addCallback(lambda ignored:
2990+            mw.put_signature(self.signature))
2991+        d.addCallback(lambda ignored:
2992+            mw.put_verification_key(self.verification_key))
2993+        d.addCallback(lambda ignored:
2994+            mw.finish_publishing())
2995+
2996+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
2997+        def _check_block_and_salt((block, salt)):
2998+            self.failUnlessEqual(block, self.block)
2999+            self.failUnlessEqual(salt, self.salt)
3000+
3001+        for i in xrange(6):
3002+            d.addCallback(lambda ignored, i=i:
3003+                mr.get_block_and_salt(i))
3004+            d.addCallback(_check_block_and_salt)
3005+
3006+        d.addCallback(lambda ignored:
3007+            mr.get_encprivkey())
3008+        d.addCallback(lambda encprivkey:
3009+            self.failUnlessEqual(self.encprivkey, encprivkey))
3010+
3011+        d.addCallback(lambda ignored:
3012+            mr.get_blockhashes())
3013+        d.addCallback(lambda blockhashes:
3014+            self.failUnlessEqual(self.block_hash_tree, blockhashes))
3015+
3016+        d.addCallback(lambda ignored:
3017+            mr.get_sharehashes())
3018+        d.addCallback(lambda sharehashes:
3019+            self.failUnlessEqual(self.share_hash_chain, sharehashes))
3020+
3021+        d.addCallback(lambda ignored:
3022+            mr.get_signature())
3023+        d.addCallback(lambda signature:
3024+            self.failUnlessEqual(signature, self.signature))
3025+
3026+        d.addCallback(lambda ignored:
3027+            mr.get_verification_key())
3028+        d.addCallback(lambda verification_key:
3029+            self.failUnlessEqual(verification_key, self.verification_key))
3030+
3031+        d.addCallback(lambda ignored:
3032+            mr.get_seqnum())
3033+        d.addCallback(lambda seqnum:
3034+            self.failUnlessEqual(seqnum, 0))
3035+
3036+        d.addCallback(lambda ignored:
3037+            mr.get_root_hash())
3038+        d.addCallback(lambda root_hash:
3039+            self.failUnlessEqual(self.root_hash, root_hash))
3040+
3041+        d.addCallback(lambda ignored:
3042+            mr.get_encoding_parameters())
3043+        def _check_encoding_parameters((k, n, segsize, datalen)):
3044+            self.failUnlessEqual(k, 3)
3045+            self.failUnlessEqual(n, 10)
3046+            self.failUnlessEqual(segsize, 6)
3047+            self.failUnlessEqual(datalen, 36)
3048+        d.addCallback(_check_encoding_parameters)
3049+
3050+        d.addCallback(lambda ignored:
3051+            mr.get_checkstring())
3052+        d.addCallback(lambda checkstring:
3053+            self.failUnlessEqual(checkstring, mw.get_checkstring()))
3054+        return d
3055+
3056+
3057+    def test_is_sdmf(self):
3058+        # The MDMFSlotReadProxy should also know how to read SDMF files,
3059+        # since it will encounter them on the grid. Callers use the
3060+        # is_sdmf method to test this.
3061+        self.write_sdmf_share_to_server("si1")
3062+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3063+        d = mr.is_sdmf()
3064+        d.addCallback(lambda issdmf:
3065+            self.failUnless(issdmf))
3066+        return d
3067+
3068+
3069+    def test_reads_sdmf(self):
3070+        # The slot read proxy should, naturally, know how to tell us
3071+        # about data in the SDMF format
3072+        self.write_sdmf_share_to_server("si1")
3073+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3074+        d = defer.succeed(None)
3075+        d.addCallback(lambda ignored:
3076+            mr.is_sdmf())
3077+        d.addCallback(lambda issdmf:
3078+            self.failUnless(issdmf))
3079+
3080+        # What do we need to read?
3081+        #  - The sharedata
3082+        #  - The salt
3083+        d.addCallback(lambda ignored:
3084+            mr.get_block_and_salt(0))
3085+        def _check_block_and_salt(results):
3086+            block, salt = results
3087+            # Our original file is 36 bytes long, so each share is 12
3088+            # bytes in size. The share is composed entirely of the
3089+            # letter 'a'; self.block contains two 'a's, so 6 * self.block
3090+            # is what we are looking for.
3091+            self.failUnlessEqual(block, self.block * 6)
3092+            self.failUnlessEqual(salt, self.salt)
3093+        d.addCallback(_check_block_and_salt)
3094+
3095+        #  - The blockhashes
3096+        d.addCallback(lambda ignored:
3097+            mr.get_blockhashes())
3098+        d.addCallback(lambda blockhashes:
3099+            self.failUnlessEqual(self.block_hash_tree,
3100+                                 blockhashes,
3101+                                 blockhashes))
3102+        #  - The sharehashes
3103+        d.addCallback(lambda ignored:
3104+            mr.get_sharehashes())
3105+        d.addCallback(lambda sharehashes:
3106+            self.failUnlessEqual(self.share_hash_chain,
3107+                                 sharehashes))
3108+        #  - The keys
3109+        d.addCallback(lambda ignored:
3110+            mr.get_encprivkey())
3111+        d.addCallback(lambda encprivkey:
3112+            self.failUnlessEqual(encprivkey, self.encprivkey, encprivkey))
3113+        d.addCallback(lambda ignored:
3114+            mr.get_verification_key())
3115+        d.addCallback(lambda verification_key:
3116+            self.failUnlessEqual(verification_key,
3117+                                 self.verification_key,
3118+                                 verification_key))
3119+        #  - The signature
3120+        d.addCallback(lambda ignored:
3121+            mr.get_signature())
3122+        d.addCallback(lambda signature:
3123+            self.failUnlessEqual(signature, self.signature, signature))
3124+
3125+        #  - The sequence number
3126+        d.addCallback(lambda ignored:
3127+            mr.get_seqnum())
3128+        d.addCallback(lambda seqnum:
3129+            self.failUnlessEqual(seqnum, 0, seqnum))
3130+
3131+        #  - The root hash
3132+        d.addCallback(lambda ignored:
3133+            mr.get_root_hash())
3134+        d.addCallback(lambda root_hash:
3135+            self.failUnlessEqual(root_hash, self.root_hash, root_hash))
3136+        return d
3137+
3138+
3139+    def test_only_reads_one_segment_sdmf(self):
3140+        # SDMF shares have only one segment, so it doesn't make sense to
3141+        # read more segments than that. The reader should know this and
3142+        # complain if we try to do that.
3143+        self.write_sdmf_share_to_server("si1")
3144+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3145+        d = defer.succeed(None)
3146+        d.addCallback(lambda ignored:
3147+            mr.is_sdmf())
3148+        d.addCallback(lambda issdmf:
3149+            self.failUnless(issdmf))
3150+        d.addCallback(lambda ignored:
3151+            self.shouldFail(LayoutInvalid, "test bad segment",
3152+                            None,
3153+                            mr.get_block_and_salt, 1))
3154+        return d
3155+
3156+
3157+    def test_read_with_prefetched_mdmf_data(self):
3158+        # The MDMFSlotReadProxy will prefill certain fields if you pass
3159+        # it data that you have already fetched. This is useful for
3160+        # cases like the Servermap, which prefetches ~2kb of data while
3161+        # finding out which shares are on the remote peer so that it
3162+        # doesn't waste round trips.
3163+        mdmf_data = self.build_test_mdmf_share()
3164+        self.write_test_share_to_server("si1")
3165+        def _make_mr(ignored, length):
3166+            mr = MDMFSlotReadProxy(self.rref, "si1", 0, mdmf_data[:length])
3167+            return mr
3168+
3169+        d = defer.succeed(None)
3170+        # This should be enough to fill in both the encoding parameters
3171+        # and the table of offsets, which will complete the version
3172+        # information tuple.
3173+        d.addCallback(_make_mr, 107)
3174+        d.addCallback(lambda mr:
3175+            mr.get_verinfo())
3176+        def _check_verinfo(verinfo):
3177+            self.failUnless(verinfo)
3178+            self.failUnlessEqual(len(verinfo), 9)
3179+            (seqnum,
3180+             root_hash,
3181+             salt_hash,
3182+             segsize,
3183+             datalen,
3184+             k,
3185+             n,
3186+             prefix,
3187+             offsets) = verinfo
3188+            self.failUnlessEqual(seqnum, 0)
3189+            self.failUnlessEqual(root_hash, self.root_hash)
3190+            self.failUnlessEqual(segsize, 6)
3191+            self.failUnlessEqual(datalen, 36)
3192+            self.failUnlessEqual(k, 3)
3193+            self.failUnlessEqual(n, 10)
3194+            expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
3195+                                          1,
3196+                                          seqnum,
3197+                                          root_hash,
3198+                                          k,
3199+                                          n,
3200+                                          segsize,
3201+                                          datalen)
3202+            self.failUnlessEqual(expected_prefix, prefix)
3203+            self.failUnlessEqual(self.rref.read_count, 0)
3204+        d.addCallback(_check_verinfo)
3205+        # This is not enough data to read a block and its salt, so the
3206+        # wrapper should fetch them from the remote server.
3207+        d.addCallback(_make_mr, 107)
3208+        d.addCallback(lambda mr:
3209+            mr.get_block_and_salt(0))
3210+        def _check_block_and_salt((block, salt)):
3211+            self.failUnlessEqual(block, self.block)
3212+            self.failUnlessEqual(salt, self.salt)
3213+            self.failUnlessEqual(self.rref.read_count, 1)
3214+        # This should be enough data to read one block.
3215+        d.addCallback(_make_mr, 249)
3216+        d.addCallback(lambda mr:
3217+            mr.get_block_and_salt(0))
3218+        d.addCallback(_check_block_and_salt)
3219+        return d
3220+
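    # Note: the 107-byte prefetch used above is exactly the size of the
    # MDMF header implied by test_write -- the 59-byte signable prefix
    # plus a six-entry table of 8-byte offsets -- which is why
    # get_verinfo() can be answered without any remote reads, while
    # get_block_and_salt() still costs one.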
3221+
3222+    def test_read_with_prefetched_sdmf_data(self):
3223+        sdmf_data = self.build_test_sdmf_share()
3224+        self.write_sdmf_share_to_server("si1")
3225+        def _make_mr(ignored, length):
3226+            mr = MDMFSlotReadProxy(self.rref, "si1", 0, sdmf_data[:length])
3227+            return mr
3228+
3229+        d = defer.succeed(None)
3230+        # This should be enough to get us the encoding parameters,
3231+        # offset table, and everything else we need to build a verinfo
3232+        # string.
3233+        d.addCallback(_make_mr, 107)
3234+        d.addCallback(lambda mr:
3235+            mr.get_verinfo())
3236+        def _check_verinfo(verinfo):
3237+            self.failUnless(verinfo)
3238+            self.failUnlessEqual(len(verinfo), 9)
3239+            (seqnum,
3240+             root_hash,
3241+             salt,
3242+             segsize,
3243+             datalen,
3244+             k,
3245+             n,
3246+             prefix,
3247+             offsets) = verinfo
3248+            self.failUnlessEqual(seqnum, 0)
3249+            self.failUnlessEqual(root_hash, self.root_hash)
3250+            self.failUnlessEqual(salt, self.salt)
3251+            self.failUnlessEqual(segsize, 36)
3252+            self.failUnlessEqual(datalen, 36)
3253+            self.failUnlessEqual(k, 3)
3254+            self.failUnlessEqual(n, 10)
3255+            expected_prefix = struct.pack(SIGNED_PREFIX,
3256+                                          0,
3257+                                          seqnum,
3258+                                          root_hash,
3259+                                          salt,
3260+                                          k,
3261+                                          n,
3262+                                          segsize,
3263+                                          datalen)
3264+            self.failUnlessEqual(expected_prefix, prefix)
3265+            self.failUnlessEqual(self.rref.read_count, 0)
3266+        d.addCallback(_check_verinfo)
3267+        # This shouldn't be enough to read any share data.
3268+        d.addCallback(_make_mr, 107)
3269+        d.addCallback(lambda mr:
3270+            mr.get_block_and_salt(0))
3271+        def _check_block_and_salt((block, salt)):
3272+            self.failUnlessEqual(block, self.block * 6)
3273+            self.failUnlessEqual(salt, self.salt)
3274+            # TODO: Fix the read routine so that it reads only the data
3275+            #       that it has cached if it can't read all of it.
3276+            self.failUnlessEqual(self.rref.read_count, 2)
3277+
3278+        # This should be enough to read share data.
3279+        d.addCallback(_make_mr, self.offsets['share_data'])
3280+        d.addCallback(lambda mr:
3281+            mr.get_block_and_salt(0))
3282+        d.addCallback(_check_block_and_salt)
3283+        return d
3284+
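    # As with the MDMF case above, 107 bytes appears to cover the whole
    # SDMF header (the signed prefix plus its offset table), so
    # get_verinfo() is again answered from the cache; the share data
    # itself lives past the offset table, so get_block_and_salt() still
    # needs remote reads until we prefetch up to offsets['share_data'].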
3285+
3286+    def test_read_with_empty_mdmf_file(self):
3287+        # Some tests upload a file with no contents to test things
3288+        # unrelated to the actual handling of the content of the file.
3289+        # The reader should behave intelligently in these cases.
3290+        self.write_test_share_to_server("si1", empty=True)
3291+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3292+        # We should be able to get the encoding parameters, and they
3293+        # should be correct.
3294+        d = defer.succeed(None)
3295+        d.addCallback(lambda ignored:
3296+            mr.get_encoding_parameters())
3297+        def _check_encoding_parameters(params):
3298+            self.failUnlessEqual(len(params), 4)
3299+            k, n, segsize, datalen = params
3300+            self.failUnlessEqual(k, 3)
3301+            self.failUnlessEqual(n, 10)
3302+            self.failUnlessEqual(segsize, 0)
3303+            self.failUnlessEqual(datalen, 0)
3304+        d.addCallback(_check_encoding_parameters)
3305+
3306+        # We should not be able to fetch a block, since there are no
3307+        # blocks to fetch
3308+        d.addCallback(lambda ignored:
3309+            self.shouldFail(LayoutInvalid, "get block on empty file",
3310+                            None,
3311+                            mr.get_block_and_salt, 0))
3312+        return d
3313+
3314+
3315+    def test_read_with_empty_sdmf_file(self):
3316+        self.write_sdmf_share_to_server("si1", empty=True)
3317+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3318+        # We should be able to get the encoding parameters, and they
3319+        # should be correct
3320+        d = defer.succeed(None)
3321+        d.addCallback(lambda ignored:
3322+            mr.get_encoding_parameters())
3323+        def _check_encoding_parameters(params):
3324+            self.failUnlessEqual(len(params), 4)
3325+            k, n, segsize, datalen = params
3326+            self.failUnlessEqual(k, 3)
3327+            self.failUnlessEqual(n, 10)
3328+            self.failUnlessEqual(segsize, 0)
3329+            self.failUnlessEqual(datalen, 0)
3330+        d.addCallback(_check_encoding_parameters)
3331+
3332+        # It does not make sense to get a block in this format, so we
3333+        # should not be able to.
3334+        d.addCallback(lambda ignored:
3335+            self.shouldFail(LayoutInvalid, "get block on an empty file",
3336+                            None,
3337+                            mr.get_block_and_salt, 0))
3338+        return d
3339+
3340+
3341+    def test_verinfo_with_sdmf_file(self):
3342+        self.write_sdmf_share_to_server("si1")
3343+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3344+        # We should be able to get the version information.
3345+        d = defer.succeed(None)
3346+        d.addCallback(lambda ignored:
3347+            mr.get_verinfo())
3348+        def _check_verinfo(verinfo):
3349+            self.failUnless(verinfo)
3350+            self.failUnlessEqual(len(verinfo), 9)
3351+            (seqnum,
3352+             root_hash,
3353+             salt,
3354+             segsize,
3355+             datalen,
3356+             k,
3357+             n,
3358+             prefix,
3359+             offsets) = verinfo
3360+            self.failUnlessEqual(seqnum, 0)
3361+            self.failUnlessEqual(root_hash, self.root_hash)
3362+            self.failUnlessEqual(salt, self.salt)
3363+            self.failUnlessEqual(segsize, 36)
3364+            self.failUnlessEqual(datalen, 36)
3365+            self.failUnlessEqual(k, 3)
3366+            self.failUnlessEqual(n, 10)
3367+            expected_prefix = struct.pack(">BQ32s16s BBQQ",
3368+                                          0,
3369+                                          seqnum,
3370+                                          root_hash,
3371+                                          salt,
3372+                                          k,
3373+                                          n,
3374+                                          segsize,
3375+                                          datalen)
3376+            self.failUnlessEqual(prefix, expected_prefix)
3377+            self.failUnlessEqual(offsets, self.offsets)
3378+        d.addCallback(_check_verinfo)
3379+        return d
3380+
3381+
3382+    def test_verinfo_with_mdmf_file(self):
3383+        self.write_test_share_to_server("si1")
3384+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3385+        d = defer.succeed(None)
3386+        d.addCallback(lambda ignored:
3387+            mr.get_verinfo())
3388+        def _check_verinfo(verinfo):
3389+            self.failUnless(verinfo)
3390+            self.failUnlessEqual(len(verinfo), 9)
3391+            (seqnum,
3392+             root_hash,
3393+             IV,
3394+             segsize,
3395+             datalen,
3396+             k,
3397+             n,
3398+             prefix,
3399+             offsets) = verinfo
3400+            self.failUnlessEqual(seqnum, 0)
3401+            self.failUnlessEqual(root_hash, self.root_hash)
3402+            self.failIf(IV)
3403+            self.failUnlessEqual(segsize, 6)
3404+            self.failUnlessEqual(datalen, 36)
3405+            self.failUnlessEqual(k, 3)
3406+            self.failUnlessEqual(n, 10)
3407+            expected_prefix = struct.pack(">BQ32s BBQQ",
3408+                                          1,
3409+                                          seqnum,
3410+                                          root_hash,
3411+                                          k,
3412+                                          n,
3413+                                          segsize,
3414+                                          datalen)
3415+            self.failUnlessEqual(prefix, expected_prefix)
3416+            self.failUnlessEqual(offsets, self.offsets)
3417+        d.addCallback(_check_verinfo)
3418+        return d
3419+
3420+
3421+    def test_reader_queue(self):
3422+        self.write_test_share_to_server('si1')
3423+        mr = MDMFSlotReadProxy(self.rref, "si1", 0)
3424+        d1 = mr.get_block_and_salt(0, queue=True)
3425+        d2 = mr.get_blockhashes(queue=True)
3426+        d3 = mr.get_sharehashes(queue=True)
3427+        d4 = mr.get_signature(queue=True)
3428+        d5 = mr.get_verification_key(queue=True)
3429+        dl = defer.DeferredList([d1, d2, d3, d4, d5])
3430+        mr.flush()
3431+        def _print(results):
3432+            self.failUnlessEqual(len(results), 5)
3433+            # We have one read for version information and offsets, and
3434+            # one for everything else.
3435+            self.failUnlessEqual(self.rref.read_count, 2)
3436+            block, salt = results[0][1] # results[0] is a (success, value)
3437+                                           # pair; its first element says
3438+                                           # whether the operation worked.
3439+            self.failUnlessEqual(self.block, block)
3440+            self.failUnlessEqual(self.salt, salt)
3441+
3442+            blockhashes = results[1][1]
3443+            self.failUnlessEqual(self.block_hash_tree, blockhashes)
3444+
3445+            sharehashes = results[2][1]
3446+            self.failUnlessEqual(self.share_hash_chain, sharehashes)
3447+
3448+            signature = results[3][1]
3449+            self.failUnlessEqual(self.signature, signature)
3450+
3451+            verification_key = results[4][1]
3452+            self.failUnlessEqual(self.verification_key, verification_key)
3453+        dl.addCallback(_print)
3454+        return dl
3455+
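    # Sketch of the queueing behaviour exercised above: each get_* call
    # made with queue=True only records the readv it needs, and
    # mr.flush() sends the accumulated requests in batched remote
    # calls -- here five logical reads cost two trips, one for the
    # header/offset table and one for everything else.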
3456+
3457 class Stats(unittest.TestCase):
3458 
3459     def setUp(self):
3460}
3461[Alter the ServermapUpdater to find MDMF files
3462Kevan Carstensen <kevan@isnotajoke.com>**20100626234118
3463 Ignore-this: 25f6278209c2983ba8f307cfe0fde0
3464 
3465 The servermapupdater should find MDMF files on a grid in the same way
3466 that it finds SDMF files. This patch makes it do that.
3467] {
3468hunk ./src/allmydata/mutable/servermap.py 7
3469 from itertools import count
3470 from twisted.internet import defer
3471 from twisted.python import failure
3472-from foolscap.api import DeadReferenceError, RemoteException, eventually
3473+from foolscap.api import DeadReferenceError, RemoteException, eventually, \
3474+                         fireEventually
3475 from allmydata.util import base32, hashutil, idlib, log
3476 from allmydata.storage.server import si_b2a
3477 from allmydata.interfaces import IServermapUpdaterStatus
3478hunk ./src/allmydata/mutable/servermap.py 17
3479 from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
3480      DictOfSets, CorruptShareError, NeedMoreDataError
3481 from allmydata.mutable.layout import unpack_prefix_and_signature, unpack_header, unpack_share, \
3482-     SIGNED_PREFIX_LENGTH
3483+     SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
3484 
3485 class UpdateStatus:
3486     implements(IServermapUpdaterStatus)
3487hunk ./src/allmydata/mutable/servermap.py 254
3488         """Return a set of versionids, one for each version that is currently
3489         recoverable."""
3490         versionmap = self.make_versionmap()
3491-
3492         recoverable_versions = set()
3493         for (verinfo, shares) in versionmap.items():
3494             (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
3495hunk ./src/allmydata/mutable/servermap.py 366
3496         self._servers_responded = set()
3497 
3498         # how much data should we read?
3499+        # SDMF:
3500         #  * if we only need the checkstring, then [0:75]
3501         #  * if we need to validate the checkstring sig, then [543ish:799ish]
3502         #  * if we need the verification key, then [107:436ish]
3503hunk ./src/allmydata/mutable/servermap.py 374
3504         #  * if we need the encrypted private key, we want [-1216ish:]
3505         #   * but we can't read from negative offsets
3506         #   * the offset table tells us the 'ish', also the positive offset
3507-        # A future version of the SMDF slot format should consider using
3508-        # fixed-size slots so we can retrieve less data. For now, we'll just
3509-        # read 2000 bytes, which also happens to read enough actual data to
3510-        # pre-fetch a 9-entry dirnode.
3511+        # MDMF:
3512+        #  * Checkstring? [0:72]
3513+        #  * If we want to validate the checkstring, then [0:72], [143:?] --
3514+        #    the offset table will tell us for sure.
3515+        #  * If we need the verification key, we have to consult the offset
3516+        #    table as well.
3517+        # At this point, we don't know which we are. Our filenode can
3518+        # tell us, but it might be lying -- in some cases, we're
3519+        # responsible for telling it which kind of file it is.
3520         self._read_size = 4000
3521         if mode == MODE_CHECK:
3522             # we use unpack_prefix_and_signature, so we need 1k
3523hunk ./src/allmydata/mutable/servermap.py 432
3524         self._queries_completed = 0
3525 
3526         sb = self._storage_broker
3527+        # All of the peers, permuted by the storage index, as usual.
3528         full_peerlist = sb.get_servers_for_index(self._storage_index)
3529         self.full_peerlist = full_peerlist # for use later, immutable
3530         self.extra_peers = full_peerlist[:] # peers are removed as we use them
3531hunk ./src/allmydata/mutable/servermap.py 439
3532         self._good_peers = set() # peers who had some shares
3533         self._empty_peers = set() # peers who don't have any shares
3534         self._bad_peers = set() # peers to whom our queries failed
3535+        self._readers = {} # peerid -> dict(sharewriters), filled in
3536+                           # after responses come in.
3537 
3538         k = self._node.get_required_shares()
3539hunk ./src/allmydata/mutable/servermap.py 443
3540+        # For what cases can these conditions work?
3541         if k is None:
3542             # make a guess
3543             k = 3
3544hunk ./src/allmydata/mutable/servermap.py 456
3545         self.num_peers_to_query = k + self.EPSILON
3546 
3547         if self.mode == MODE_CHECK:
3548+            # We want to query all of the peers.
3549             initial_peers_to_query = dict(full_peerlist)
3550             must_query = set(initial_peers_to_query.keys())
3551             self.extra_peers = []
3552hunk ./src/allmydata/mutable/servermap.py 464
3553             # we're planning to replace all the shares, so we want a good
3554             # chance of finding them all. We will keep searching until we've
3555             # seen epsilon that don't have a share.
3556+            # We don't query all of the peers because that could take a while.
3557             self.num_peers_to_query = N + self.EPSILON
3558             initial_peers_to_query, must_query = self._build_initial_querylist()
3559             self.required_num_empty_peers = self.EPSILON
3560hunk ./src/allmydata/mutable/servermap.py 474
3561             # might also avoid the round trip required to read the encrypted
3562             # private key.
3563 
3564-        else:
3565+        else: # MODE_READ, MODE_ANYTHING
3566+            # 2k peers is good enough.
3567             initial_peers_to_query, must_query = self._build_initial_querylist()
3568 
3569         # this is a set of peers that we are required to get responses from:
3570hunk ./src/allmydata/mutable/servermap.py 490
3571         # before we can consider ourselves finished, and self.extra_peers
3572         # contains the overflow (peers that we should tap if we don't get
3573         # enough responses)
3574+        # self._must_query should always be a subset of
3575+        # initial_peers_to_query; assert that invariant here.
3576+        assert set(must_query).issubset(set(initial_peers_to_query))
3577 
3578         self._send_initial_requests(initial_peers_to_query)
3579         self._status.timings["initial_queries"] = time.time() - self._started
3580hunk ./src/allmydata/mutable/servermap.py 549
3581         # errors that aren't handled by _query_failed (and errors caused by
3582         # _query_failed) get logged, but we still want to check for doneness.
3583         d.addErrback(log.err)
3584-        d.addBoth(self._check_for_done)
3585         d.addErrback(self._fatal_error)
3586hunk ./src/allmydata/mutable/servermap.py 550
3587+        d.addCallback(self._check_for_done)
3588         return d
3589 
3590     def _do_read(self, ss, peerid, storage_index, shnums, readv):
3591hunk ./src/allmydata/mutable/servermap.py 569
3592         d = ss.callRemote("slot_readv", storage_index, shnums, readv)
3593         return d
3594 
3595+
3596+    def _got_corrupt_share(self, e, shnum, peerid, data, lp):
3597+        """
3598+        I am called when a remote server returns a corrupt share in
3599+        response to one of our queries. By corrupt, I mean a share
3600+        without a valid signature. I then record the failure, notify the
3601+        server of the corruption, and record the share as bad.
3602+        """
3603+        f = failure.Failure(e)
3604+        self.log(format="bad share: %(f_value)s", f_value=str(f),
3605+                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
3606+        # Notify the server that its share is corrupt.
3607+        self.notify_server_corruption(peerid, shnum, str(e))
3608+        # By flagging this as a bad peer, we won't count any of
3609+        # the other shares on that peer as valid, though if we
3610+        # happen to find a valid version string amongst those
3611+        # shares, we'll keep track of it so that we don't need
3612+        # to validate the signature on those again.
3613+        self._bad_peers.add(peerid)
3614+        self._last_failure = f
3615+        # XXX: Use the reader for this?
3616+        checkstring = data[:SIGNED_PREFIX_LENGTH]
3617+        self._servermap.mark_bad_share(peerid, shnum, checkstring)
3618+        self._servermap.problems.append(f)
3619+
3620+
3621+    def _cache_good_sharedata(self, verinfo, shnum, now, data):
3622+        """
3623+        If one of my queries returns successfully (which means that we
3624+        were able to validate the signature), I
3625+        cache the data that we initially fetched from the storage
3626+        server. This will help reduce the number of roundtrips that need
3627+        to occur when the file is downloaded, or when the file is
3628+        updated.
3629+        """
3630+        self._node._add_to_cache(verinfo, shnum, 0, data, now)
3631+
3632+
3633     def _got_results(self, datavs, peerid, readsize, stuff, started):
3634         lp = self.log(format="got result from [%(peerid)s], %(numshares)d shares",
3635                       peerid=idlib.shortnodeid_b2a(peerid),
3636hunk ./src/allmydata/mutable/servermap.py 630
3637         else:
3638             self._empty_peers.add(peerid)
3639 
3640-        last_verinfo = None
3641-        last_shnum = None
3642+        ss, storage_index = stuff
3643+        ds = []
3644+
3645         for shnum,datav in datavs.items():
3646             data = datav[0]
3647hunk ./src/allmydata/mutable/servermap.py 635
3648-            try:
3649-                verinfo = self._got_results_one_share(shnum, data, peerid, lp)
3650-                last_verinfo = verinfo
3651-                last_shnum = shnum
3652-                self._node._add_to_cache(verinfo, shnum, 0, data, now)
3653-            except CorruptShareError, e:
3654-                # log it and give the other shares a chance to be processed
3655-                f = failure.Failure()
3656-                self.log(format="bad share: %(f_value)s", f_value=str(f.value),
3657-                         failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
3658-                self.notify_server_corruption(peerid, shnum, str(e))
3659-                self._bad_peers.add(peerid)
3660-                self._last_failure = f
3661-                checkstring = data[:SIGNED_PREFIX_LENGTH]
3662-                self._servermap.mark_bad_share(peerid, shnum, checkstring)
3663-                self._servermap.problems.append(f)
3664-                pass
3665-
3666-        self._status.timings["cumulative_verify"] += (time.time() - now)
3667+            reader = MDMFSlotReadProxy(ss,
3668+                                       storage_index,
3669+                                       shnum,
3670+                                       data)
3671+            self._readers.setdefault(peerid, dict())[shnum] = reader
3672+            # our goal, with each response, is to validate the version
3673+            # information and share data as best we can at this point --
3674+            # we do this by validating the signature. To do this, we
3675+            # need to do the following:
3676+            #   - If we don't already have the public key, fetch the
3677+            #     public key. We use this to validate the signature.
3678+            if not self._node.get_pubkey():
3679+                # fetch and set the public key.
3680+                d = reader.get_verification_key()
3681+                d.addCallback(lambda results, shnum=shnum, peerid=peerid:
3682+                    self._try_to_set_pubkey(results, peerid, shnum, lp))
3683+                # XXX: Make self._pubkey_query_failed?
3684+                d.addErrback(lambda error, shnum=shnum, peerid=peerid:
3685+                    self._got_corrupt_share(error, shnum, peerid, data, lp))
3686+            else:
3687+                # we already have the public key.
3688+                d = defer.succeed(None)
3689+            # Neither of these two branches returns anything of
3690+            # consequence, so the first entry in our deferredlist will
3691+            # be None.
3692 
3693hunk ./src/allmydata/mutable/servermap.py 661
3694-        if self._need_privkey and last_verinfo:
3695-            # send them a request for the privkey. We send one request per
3696-            # server.
3697-            lp2 = self.log("sending privkey request",
3698-                           parent=lp, level=log.NOISY)
3699-            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
3700-             offsets_tuple) = last_verinfo
3701-            o = dict(offsets_tuple)
3702+            # - Next, we need the version information. We almost
3703+            #   certainly got this by reading the first thousand or so
3704+            #   bytes of the share on the storage server, so we
3705+            #   shouldn't need to fetch anything at this step.
3706+            d2 = reader.get_verinfo()
3707+            d2.addErrback(lambda error, shnum=shnum, peerid=peerid:
3708+                self._got_corrupt_share(error, shnum, peerid, data, lp))
3709+            # - Next, we need the signature. For an SDMF share, it is
3710+            #   likely that we fetched this when doing our initial fetch
3711+            #   to get the version information. In MDMF, this lives at
3712+            #   the end of the share, so unless the file is quite small,
3713+            #   we'll need to do a remote fetch to get it.
3714+            d3 = reader.get_signature()
3715+            d3.addErrback(lambda error, shnum=shnum, peerid=peerid:
3716+                self._got_corrupt_share(error, shnum, peerid, data, lp))
3717+            #  Once we have all three of these responses, we can move on
3718+            #  to validating the signature
3719 
3720hunk ./src/allmydata/mutable/servermap.py 679
3721-            self._queries_outstanding.add(peerid)
3722-            readv = [ (o['enc_privkey'], (o['EOF'] - o['enc_privkey'])) ]
3723-            ss = self._servermap.connections[peerid]
3724-            privkey_started = time.time()
3725-            d = self._do_read(ss, peerid, self._storage_index,
3726-                              [last_shnum], readv)
3727-            d.addCallback(self._got_privkey_results, peerid, last_shnum,
3728-                          privkey_started, lp2)
3729-            d.addErrback(self._privkey_query_failed, peerid, last_shnum, lp2)
3730-            d.addErrback(log.err)
3731-            d.addCallback(self._check_for_done)
3732-            d.addErrback(self._fatal_error)
3733+            # Does the node already have a privkey? If not, we'll try to
3734+            # fetch it here.
3735+            if self._need_privkey:
3736+                d4 = reader.get_encprivkey()
3737+                d4.addCallback(lambda results, shnum=shnum, peerid=peerid:
3738+                    self._try_to_validate_privkey(results, peerid, shnum, lp))
3739+                d4.addErrback(lambda error, shnum=shnum, peerid=peerid:
3740+                    self._privkey_query_failed(error, peerid, shnum, lp))
3741+            else:
3742+                d4 = defer.succeed(None)
3743 
3744hunk ./src/allmydata/mutable/servermap.py 690
3745+            dl = defer.DeferredList([d, d2, d3, d4])
3746+            dl.addCallback(lambda results, shnum=shnum, peerid=peerid:
3747+                self._got_signature_one_share(results, shnum, peerid, lp))
3748+            dl.addErrback(lambda error, shnum=shnum, data=data:
3749+               self._got_corrupt_share(error, shnum, peerid, data, lp))
3750+            dl.addCallback(lambda verinfo, shnum=shnum, peerid=peerid, data=data:
3751+                self._cache_good_sharedata(verinfo, shnum, now, data))
3752+            ds.append(dl)
3753+        # dl is a deferred list that will fire when all of the shares
3754+        # that we found on this peer are done processing. When dl fires,
3755+        # we know that processing is done, so we can decrement the
3756+        # semaphore-like thing that we incremented earlier.
3757+        dl = defer.DeferredList(ds, fireOnOneErrback=True)
3758+        # Are we done? Done means that there are no more queries to
3759+        # send, that there are no outstanding queries, and that no
3760+        # responses we have received are still being processed. If we
3761+        # are done, self._check_for_done will cause the done deferred
3762+        # that we returned to our caller to fire, which tells them that
3763+        # they have a complete servermap, and that we won't be touching
3764+        # the servermap anymore.
3765+        dl.addCallback(self._check_for_done)
3766+        dl.addErrback(self._fatal_error)
3767         # all done!
3768         self.log("_got_results done", parent=lp, level=log.NOISY)
3769hunk ./src/allmydata/mutable/servermap.py 714
3770+        return dl
3771+
3772+
3773+    def _try_to_set_pubkey(self, pubkey_s, peerid, shnum, lp):
3774+        if self._node.get_pubkey():
3775+            return # don't go through this again if we don't have to
3776+        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
3777+        assert len(fingerprint) == 32
3778+        if fingerprint != self._node.get_fingerprint():
3779+            raise CorruptShareError(peerid, shnum,
3780+                                "pubkey doesn't match fingerprint")
3781+        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
3782+        assert self._node.get_pubkey()
3783+
3784 
3785     def notify_server_corruption(self, peerid, shnum, reason):
3786         ss = self._servermap.connections[peerid]
3787hunk ./src/allmydata/mutable/servermap.py 734
3788         ss.callRemoteOnly("advise_corrupt_share",
3789                           "mutable", self._storage_index, shnum, reason)
3790 
3791-    def _got_results_one_share(self, shnum, data, peerid, lp):
3792+
3793+    def _got_signature_one_share(self, results, shnum, peerid, lp):
3794+        # It is our job to give versioninfo to our caller. We need to
3795+        # raise CorruptShareError if the share is corrupt for any
3796+        # reason, something that our caller will handle.
3797         self.log(format="_got_results: got shnum #%(shnum)d from peerid %(peerid)s",
3798                  shnum=shnum,
3799                  peerid=idlib.shortnodeid_b2a(peerid),
3800hunk ./src/allmydata/mutable/servermap.py 744
3801                  level=log.NOISY,
3802                  parent=lp)
3803-
3804-        # this might raise NeedMoreDataError, if the pubkey and signature
3805-        # live at some weird offset. That shouldn't happen, so I'm going to
3806-        # treat it as a bad share.
3807-        (seqnum, root_hash, IV, k, N, segsize, datalength,
3808-         pubkey_s, signature, prefix) = unpack_prefix_and_signature(data)
3809-
3810-        if not self._node.get_pubkey():
3811-            fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
3812-            assert len(fingerprint) == 32
3813-            if fingerprint != self._node.get_fingerprint():
3814-                raise CorruptShareError(peerid, shnum,
3815-                                        "pubkey doesn't match fingerprint")
3816-            self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
3817-
3818-        if self._need_privkey:
3819-            self._try_to_extract_privkey(data, peerid, shnum, lp)
3820-
3821-        (ig_version, ig_seqnum, ig_root_hash, ig_IV, ig_k, ig_N,
3822-         ig_segsize, ig_datalen, offsets) = unpack_header(data)
3823+        _, verinfo, signature, __ = results
3824+        (seqnum,
3825+         root_hash,
3826+         saltish,
3827+         segsize,
3828+         datalen,
3829+         k,
3830+         n,
3831+         prefix,
3832+         offsets) = verinfo[1]
3833         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
3834 
3835hunk ./src/allmydata/mutable/servermap.py 756
3836-        verinfo = (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
3837+        # XXX: This should be done for us in the method, so
3838+        # presumably you can go in there and fix it.
3839+        verinfo = (seqnum,
3840+                   root_hash,
3841+                   saltish,
3842+                   segsize,
3843+                   datalen,
3844+                   k,
3845+                   n,
3846+                   prefix,
3847                    offsets_tuple)
3848hunk ./src/allmydata/mutable/servermap.py 767
3849+        # This tuple uniquely identifies a version of the file on the
3850+        # grid; we use it to keep track of the ones we've already seen.
3851 
3852         if verinfo not in self._valid_versions:
3853hunk ./src/allmydata/mutable/servermap.py 771
3854-            # it's a new pair. Verify the signature.
3855-            valid = self._node.get_pubkey().verify(prefix, signature)
3856+            # This is a new version tuple, and we need to validate it
3857+            # against the public key before keeping track of it.
3858+            assert self._node.get_pubkey()
3859+            valid = self._node.get_pubkey().verify(prefix, signature[1])
3860             if not valid:
3861hunk ./src/allmydata/mutable/servermap.py 776
3862-                raise CorruptShareError(peerid, shnum, "signature is invalid")
3863+                raise CorruptShareError(peerid, shnum,
3864+                                        "signature is invalid")
3865 
3866hunk ./src/allmydata/mutable/servermap.py 779
3867-            # ok, it's a valid verinfo. Add it to the list of validated
3868-            # versions.
3869-            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
3870-                     % (seqnum, base32.b2a(root_hash)[:4],
3871-                        idlib.shortnodeid_b2a(peerid), shnum,
3872-                        k, N, segsize, datalength),
3873-                     parent=lp)
3874-            self._valid_versions.add(verinfo)
3875-        # We now know that this is a valid candidate verinfo.
3876+        # ok, it's a valid verinfo. Add it to the list of validated
3877+        # versions.
3878+        self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
3879+                 % (seqnum, base32.b2a(root_hash)[:4],
3880+                    idlib.shortnodeid_b2a(peerid), shnum,
3881+                    k, n, segsize, datalen),
3882+                    parent=lp)
3883+        self._valid_versions.add(verinfo)
3884+        # We now know that this is a valid candidate verinfo. Whether or
3885+        # not this instance of it is valid is a matter for the next
3886+        # statement; at this point, we just know that if we see this
3887+        # version info again, that its signature checks out and that
3888+        # we're okay to skip the signature-checking step.
3889 
3890hunk ./src/allmydata/mutable/servermap.py 793
3891+        # (peerid, shnum) are bound in the method invocation.
3892         if (peerid, shnum) in self._servermap.bad_shares:
3893             # we've been told that the rest of the data in this share is
3894             # unusable, so don't add it to the servermap.
3895hunk ./src/allmydata/mutable/servermap.py 808
3896         self.versionmap.add(verinfo, (shnum, peerid, timestamp))
3897         return verinfo
3898 
3899+
3900     def _deserialize_pubkey(self, pubkey_s):
3901         verifier = rsa.create_verifying_key_from_string(pubkey_s)
3902         return verifier
3903hunk ./src/allmydata/mutable/servermap.py 813
3904 
3905-    def _try_to_extract_privkey(self, data, peerid, shnum, lp):
3906-        try:
3907-            r = unpack_share(data)
3908-        except NeedMoreDataError, e:
3909-            # this share won't help us. oh well.
3910-            offset = e.encprivkey_offset
3911-            length = e.encprivkey_length
3912-            self.log("shnum %d on peerid %s: share was too short (%dB) "
3913-                     "to get the encprivkey; [%d:%d] ought to hold it" %
3914-                     (shnum, idlib.shortnodeid_b2a(peerid), len(data),
3915-                      offset, offset+length),
3916-                     parent=lp)
3917-            # NOTE: if uncoordinated writes are taking place, someone might
3918-            # change the share (and most probably move the encprivkey) before
3919-            # we get a chance to do one of these reads and fetch it. This
3920-            # will cause us to see a NotEnoughSharesError(unable to fetch
3921-            # privkey) instead of an UncoordinatedWriteError . This is a
3922-            # nuisance, but it will go away when we move to DSA-based mutable
3923-            # files (since the privkey will be small enough to fit in the
3924-            # write cap).
3925-
3926-            return
3927-
3928-        (seqnum, root_hash, IV, k, N, segsize, datalen,
3929-         pubkey, signature, share_hash_chain, block_hash_tree,
3930-         share_data, enc_privkey) = r
3931-
3932-        return self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
3933 
3934     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
3935hunk ./src/allmydata/mutable/servermap.py 815
3936-
3937+        """
3938+        Given an encrypted private key from a remote server, I check the
3939+        writekey derived from it against the writekey stored in my node.
3940+        If it matches, I set the privkey and encprivkey properties of the node.
3941+        """
3942         alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
3943         alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
3944         if alleged_writekey != self._node.get_writekey():
3945hunk ./src/allmydata/mutable/servermap.py 892
3946         self._queries_completed += 1
3947         self._last_failure = f
3948 
3949-    def _got_privkey_results(self, datavs, peerid, shnum, started, lp):
3950-        now = time.time()
3951-        elapsed = now - started
3952-        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
3953-        self._queries_outstanding.discard(peerid)
3954-        if not self._need_privkey:
3955-            return
3956-        if shnum not in datavs:
3957-            self.log("privkey wasn't there when we asked it",
3958-                     level=log.WEIRD, umid="VA9uDQ")
3959-            return
3960-        datav = datavs[shnum]
3961-        enc_privkey = datav[0]
3962-        self._try_to_validate_privkey(enc_privkey, peerid, shnum, lp)
3963 
3964     def _privkey_query_failed(self, f, peerid, shnum, lp):
3965         self._queries_outstanding.discard(peerid)
3966hunk ./src/allmydata/mutable/servermap.py 906
3967         self._servermap.problems.append(f)
3968         self._last_failure = f
3969 
3970+
3971     def _check_for_done(self, res):
3972         # exit paths:
3973         #  return self._send_more_queries(outstanding) : send some more queries
3974hunk ./src/allmydata/mutable/servermap.py 912
3975         #  return self._done() : all done
3976         #  return : keep waiting, no new queries
3977-
3978         lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
3979                               "%(outstanding)d queries outstanding, "
3980                               "%(extra)d extra peers available, "
3981hunk ./src/allmydata/mutable/servermap.py 1117
3982         self._servermap.last_update_time = self._started
3983         # the servermap will not be touched after this
3984         self.log("servermap: %s" % self._servermap.summarize_versions())
3985+
3986         eventually(self._done_deferred.callback, self._servermap)
3987 
3988     def _fatal_error(self, f):
3989hunk ./src/allmydata/test/test_mutable.py 637
3990         d.addCallback(_created)
3991         return d
3992 
3993-    def publish_multiple(self):
3994+    def publish_mdmf(self):
3995+        # like publish_one, except that the result is guaranteed to be
3996+        # an MDMF file.
3997+        # self.CONTENTS should have more than one segment.
3998+        self.CONTENTS = "This is an MDMF file" * 100000
3999+        self._storage = FakeStorage()
4000+        self._nodemaker = make_nodemaker(self._storage)
4001+        self._storage_broker = self._nodemaker.storage_broker
4002+        d = self._nodemaker.create_mutable_file(self.CONTENTS, version=1)
4003+        def _created(node):
4004+            self._fn = node
4005+            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
4006+        d.addCallback(_created)
4007+        return d
4008+
4009+
4010+    def publish_sdmf(self):
4011+        # like publish_one, except that the result is guaranteed to be
4012+        # an SDMF file
4013+        self.CONTENTS = "This is an SDMF file" * 1000
4014+        self._storage = FakeStorage()
4015+        self._nodemaker = make_nodemaker(self._storage)
4016+        self._storage_broker = self._nodemaker.storage_broker
4017+        d = self._nodemaker.create_mutable_file(self.CONTENTS, version=0)
4018+        def _created(node):
4019+            self._fn = node
4020+            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
4021+        d.addCallback(_created)
4022+        return d
4023+
4024+
4025+    def publish_multiple(self, version=0):
4026         self.CONTENTS = ["Contents 0",
4027                          "Contents 1",
4028                          "Contents 2",
4029hunk ./src/allmydata/test/test_mutable.py 677
4030         self._copied_shares = {}
4031         self._storage = FakeStorage()
4032         self._nodemaker = make_nodemaker(self._storage)
4033-        d = self._nodemaker.create_mutable_file(self.CONTENTS[0]) # seqnum=1
4034+        d = self._nodemaker.create_mutable_file(self.CONTENTS[0], version=version) # seqnum=1
4035         def _created(node):
4036             self._fn = node
4037             # now create multiple versions of the same file, and accumulate
4038hunk ./src/allmydata/test/test_mutable.py 906
4039         return d
4040 
4041 
4042+    def test_servermapupdater_finds_mdmf_files(self):
4043+        # publish_mdmf (below) gives us an MDMF file to work with. We
4044+        # just need to make sure that when we run the ServermapUpdater,
4045+        # the file is reported to have one recoverable version.
4046+        d = defer.succeed(None)
4047+        d.addCallback(lambda ignored:
4048+            self.publish_mdmf())
4049+        d.addCallback(lambda ignored:
4050+            self.make_servermap(mode=MODE_CHECK))
4051+        # Calling make_servermap also updates the servermap in the mode
4052+        # that we specify, so we just need to see what it says.
4053+        def _check_servermap(sm):
4054+            self.failUnlessEqual(len(sm.recoverable_versions()), 1)
4055+        d.addCallback(_check_servermap)
4056+        return d
4057+
4058+
4059+    def test_servermapupdater_finds_sdmf_files(self):
4060+        d = defer.succeed(None)
4061+        d.addCallback(lambda ignored:
4062+            self.publish_sdmf())
4063+        d.addCallback(lambda ignored:
4064+            self.make_servermap(mode=MODE_CHECK))
4065+        d.addCallback(lambda servermap:
4066+            self.failUnlessEqual(len(servermap.recoverable_versions()), 1))
4067+        return d
4068+
4069 
4070 class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin, PublishMixin):
4071     def setUp(self):
4072hunk ./src/allmydata/test/test_mutable.py 1050
4073         return d
4074     test_no_servers_download.timeout = 15
4075 
4076+
4077     def _test_corrupt_all(self, offset, substring,
4078                           should_succeed=False, corrupt_early=True,
4079                           failure_checker=None):
4080}
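The per-share flow in the new _got_results boils down to: build an MDMFSlotReadProxy for the share, gather a handful of small reads (public key if needed, version info, signature, and possibly the encrypted private key) with a DeferredList, and only then check the signature. A minimal sketch of that shape, for reference only -- check_one_share is not a function in this patch, and it assumes the reader methods above plus a verinfo tuple whose next-to-last element is the signed prefix:

    from twisted.internet import defer
    from allmydata.mutable.common import CorruptShareError

    def check_one_share(reader, pubkey, peerid, shnum):
        # Fetch the version info and the signature for this share, then
        # verify the signature over the signed prefix.
        d_verinfo = reader.get_verinfo()
        d_sig = reader.get_signature()
        dl = defer.DeferredList([d_verinfo, d_sig], fireOnOneErrback=True)
        def _verify(results):
            # DeferredList hands back (success, value) pairs.
            (ignored1, verinfo), (ignored2, signature) = results
            prefix = verinfo[-2]
            if not pubkey.verify(prefix, signature):
                raise CorruptShareError(peerid, shnum, "signature is invalid")
            return verinfo
        dl.addCallback(_verify)
        return dl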
4081[Make a segmented mutable uploader
4082Kevan Carstensen <kevan@isnotajoke.com>**20100626234204
4083 Ignore-this: d199af8ab0bc64d8ed2bc19c5437bfba
4084 
4085 The mutable file uploader should be able to publish files with one
4086 segment and files with multiple segments. This patch makes it do that.
4087 This is still incomplete, and rather ugly -- I need to flesh out error
4088 handling, I need to write tests, and I need to remove some of the uglier
4089 kludges in the process before I can call this done.
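 Roughly, every full-size segment goes through the same encrypt-and-encode
 step before its blocks are handed to the per-share writers: pick a fresh
 salt, derive a key from the salt and the readkey, AES-encrypt the segment,
 split the ciphertext into k padded pieces, and FEC-encode those into n
 blocks. A minimal sketch of that step -- encode_one_segment is illustrative
 and not part of this patch; it assumes the segment length is already a
 multiple of k, as setup_encoding_parameters arranges:
 
     import os
     from pycryptopp.cipher.aes import AES
     from allmydata import codec
     from allmydata.util import hashutil
 
     def encode_one_segment(readkey, segment, k, n):
         # One fresh salt per segment; the AES key is derived from the
         # salt and the file's readkey.
         salt = os.urandom(16)
         key = hashutil.ssk_readkey_data_hash(salt, readkey)
         crypttext = AES(key).process(segment)
 
         # FEC wants k equal-sized input pieces, NUL-padded if necessary.
         fec = codec.CRSEncoder()
         fec.set_params(len(segment), k, n)
         piece_size = fec.get_block_size()
         pieces = []
         for i in range(k):
             piece = crypttext[i * piece_size:(i + 1) * piece_size]
             pieces.append(piece + "\x00" * (piece_size - len(piece)))
         d = fec.encode(pieces)  # fires with (blocks, shareids)
         return salt, d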
4090] {
4091hunk ./src/allmydata/mutable/publish.py 8
4092 from zope.interface import implements
4093 from twisted.internet import defer
4094 from twisted.python import failure
4095-from allmydata.interfaces import IPublishStatus
4096+from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION
4097 from allmydata.util import base32, hashutil, mathutil, idlib, log
4098 from allmydata import hashtree, codec
4099 from allmydata.storage.server import si_b2a
4100hunk ./src/allmydata/mutable/publish.py 19
4101      UncoordinatedWriteError, NotEnoughServersError
4102 from allmydata.mutable.servermap import ServerMap
4103 from allmydata.mutable.layout import pack_prefix, pack_share, unpack_header, pack_checkstring, \
4104-     unpack_checkstring, SIGNED_PREFIX
4105+     unpack_checkstring, SIGNED_PREFIX, MDMFSlotWriteProxy
4106+
4107+KiB = 1024
4108+DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
4109 
4110 class PublishStatus:
4111     implements(IPublishStatus)
4112hunk ./src/allmydata/mutable/publish.py 112
4113         self._status.set_helper(False)
4114         self._status.set_progress(0.0)
4115         self._status.set_active(True)
4116+        # We use this to control how the file is written.
4117+        version = self._node.get_version()
4118+        assert version in (SDMF_VERSION, MDMF_VERSION)
4119+        self._version = version
4120 
4121     def get_status(self):
4122         return self._status
4123hunk ./src/allmydata/mutable/publish.py 134
4124         simultaneous write.
4125         """
4126 
4127-        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
4128-        # 2: perform peer selection, get candidate servers
4129-        #  2a: send queries to n+epsilon servers, to determine current shares
4130-        #  2b: based upon responses, create target map
4131-        # 3: send slot_testv_and_readv_and_writev messages
4132-        # 4: as responses return, update share-dispatch table
4133-        # 4a: may need to run recovery algorithm
4134-        # 5: when enough responses are back, we're done
4135+        # 0. Setup encoding parameters, encoder, and other such things.
4136+        # 1. Encrypt, encode, and publish segments.
4137 
4138         self.log("starting publish, datalen is %s" % len(newdata))
4139         self._status.set_size(len(newdata))
4140hunk ./src/allmydata/mutable/publish.py 187
4141         self.bad_peers = set() # peerids who have errbacked/refused requests
4142 
4143         self.newdata = newdata
4144-        self.salt = os.urandom(16)
4145 
4146hunk ./src/allmydata/mutable/publish.py 188
4147+        # This will set self.segment_size, self.num_segments, and
4148+        # self.fec.
4149         self.setup_encoding_parameters()
4150 
4151         # if we experience any surprises (writes which were rejected because
4152hunk ./src/allmydata/mutable/publish.py 238
4153             self.bad_share_checkstrings[key] = old_checkstring
4154             self.connections[peerid] = self._servermap.connections[peerid]
4155 
4156-        # create the shares. We'll discard these as they are delivered. SDMF:
4157-        # we're allowed to hold everything in memory.
4158+        # Now the process forks -- if this is an SDMF file, we need
4159+        # to write an SDMF file. Otherwise, we need to write an MDMF
4160+        # file.
4161+        if self._version == MDMF_VERSION:
4162+            return self._publish_mdmf()
4163+        else:
4164+            return self._publish_sdmf()
4165+        #return self.done_deferred
4166+
4167+    def _publish_mdmf(self):
4168+        # Next, we find homes for all of the shares that we don't have
4169+        # homes for yet.
4170+        # TODO: Make this part do peer selection.
4171+        self.update_goal()
4172+        self.writers = {}
4173+        # For each (peerid, shnum) in self.goal, we make an
4174+        # MDMFSlotWriteProxy for that peer. We'll use this to write
4175+        # shares to the peer.
4176+        for key in self.goal:
4177+            peerid, shnum = key
4178+            write_enabler = self._node.get_write_enabler(peerid)
4179+            renew_secret = self._node.get_renewal_secret(peerid)
4180+            cancel_secret = self._node.get_cancel_secret(peerid)
4181+            secrets = (write_enabler, renew_secret, cancel_secret)
4182+
4183+            self.writers[shnum] =  MDMFSlotWriteProxy(shnum,
4184+                                                      self.connections[peerid],
4185+                                                      self._storage_index,
4186+                                                      secrets,
4187+                                                      self._new_seqnum,
4188+                                                      self.required_shares,
4189+                                                      self.total_shares,
4190+                                                      self.segment_size,
4191+                                                      len(self.newdata))
4192+            if (peerid, shnum) in self._servermap.servermap:
4193+                old_versionid, old_timestamp = self._servermap.servermap[key]
4194+                (old_seqnum, old_root_hash, old_salt, old_segsize,
4195+                 old_datalength, old_k, old_N, old_prefix,
4196+                 old_offsets_tuple) = old_versionid
4197+                self.writers[shnum].set_checkstring(old_seqnum, old_root_hash)
4198+
4199+        # Now, we start pushing shares.
4200+        self._status.timings["setup"] = time.time() - self._started
4201+        def _start_pushing(res):
4202+            self._started_pushing = time.time()
4203+            return res
4204+
4205+        # First, we encrypt, encode, and publish the shares that we need
4206+        # to encrypt, encode, and publish.
4207+
4208+        # This will eventually hold the block hash tree for each share
4209+        # that we publish. We define it this way so that empty publishes
4210+        # will still have something to write to the remote slot.
4211+        self.blockhashes = dict([(i, []) for i in xrange(self.total_shares)])
4212+        self.sharehash_leaves = None # eventually [sharehashes]
4213+        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
4214+                              # validate the share]
4215 
4216hunk ./src/allmydata/mutable/publish.py 296
4217+        d = defer.succeed(None)
4218+        self.log("Starting push")
4219+        for i in xrange(self.num_segments - 1):
4220+            d.addCallback(lambda ignored, i=i:
4221+                self.push_segment(i))
4222+            d.addCallback(self._turn_barrier)
4223+        # If we have at least one segment, we will have a tail segment
4224+        if self.num_segments > 0:
4225+            d.addCallback(lambda ignored:
4226+                self.push_tail_segment())
4227+
4228+        d.addCallback(lambda ignored:
4229+            self.push_encprivkey())
4230+        d.addCallback(lambda ignored:
4231+            self.push_blockhashes())
4232+        d.addCallback(lambda ignored:
4233+            self.push_sharehashes())
4234+        d.addCallback(lambda ignored:
4235+            self.push_toplevel_hashes_and_signature())
4236+        d.addCallback(lambda ignored:
4237+            self.finish_publishing())
4238+        return d
4239+
4240+
4241+    def _publish_sdmf(self):
4242         self._status.timings["setup"] = time.time() - self._started
4243hunk ./src/allmydata/mutable/publish.py 322
4244+        self.salt = os.urandom(16)
4245+
4246         d = self._encrypt_and_encode()
4247         d.addCallback(self._generate_shares)
4248         def _start_pushing(res):
4249hunk ./src/allmydata/mutable/publish.py 335
4250 
4251         return self.done_deferred
4252 
4253+
4254     def setup_encoding_parameters(self):
4255hunk ./src/allmydata/mutable/publish.py 337
4256-        segment_size = len(self.newdata)
4257+        if self._version == MDMF_VERSION:
4258+            segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
4259+        else:
4260+            segment_size = len(self.newdata) # SDMF is only one segment
4261         # this must be a multiple of self.required_shares
4262         segment_size = mathutil.next_multiple(segment_size,
4263                                               self.required_shares)
4264hunk ./src/allmydata/mutable/publish.py 350
4265                                                   segment_size)
4266         else:
4267             self.num_segments = 0
4268-        assert self.num_segments in [0, 1,] # SDMF restrictions
4269+        if self._version == SDMF_VERSION:
4270+            assert self.num_segments in (0, 1) # SDMF
4271+            return
4272+        # calculate the tail segment size.
4273+        self.tail_segment_size = len(self.newdata) % segment_size
4274+
4275+        if self.tail_segment_size == 0:
4276+            # The tail segment is the same size as the other segments.
4277+            self.tail_segment_size = segment_size
4278+
4279+        # We'll make an encoder ahead-of-time for the normal-sized
4280+        # segments (defined as any segment of segment_size bytes).
4281+        # (The part of the code that puts the tail segment will make its
4282+        #  own encoder for that part.)
4283+        fec = codec.CRSEncoder()
4284+        fec.set_params(self.segment_size,
4285+                       self.required_shares, self.total_shares)
4286+        self.piece_size = fec.get_block_size()
4287+        self.fec = fec
4288+
4289+
4290+    def push_segment(self, segnum):
4291+        started = time.time()
4292+        segsize = self.segment_size
4293+        self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
4294+        data = self.newdata[segsize * segnum:segsize*(segnum + 1)]
4295+        assert len(data) == segsize
4296+
4297+        salt = os.urandom(16)
4298+
4299+        key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
4300+        enc = AES(key)
4301+        crypttext = enc.process(data)
4302+        assert len(crypttext) == len(data)
4303+
4304+        now = time.time()
4305+        self._status.timings["encrypt"] = now - started
4306+        started = now
4307+
4308+        # now apply FEC
4309+
4310+        self._status.set_status("Encoding")
4311+        crypttext_pieces = [None] * self.required_shares
4312+        piece_size = self.piece_size
4313+        for i in range(len(crypttext_pieces)):
4314+            offset = i * piece_size
4315+            piece = crypttext[offset:offset+piece_size]
4316+            piece = piece + "\x00"*(piece_size - len(piece)) # padding
4317+            crypttext_pieces[i] = piece
4318+            assert len(piece) == piece_size
4319+        d = self.fec.encode(crypttext_pieces)
4320+        def _done_encoding(res):
4321+            elapsed = time.time() - started
4322+            self._status.timings["encode"] = elapsed
4323+            return res
4324+        d.addCallback(_done_encoding)
4325+
4326+        def _push_shares_and_salt(results):
4327+            shares, shareids = results
4328+            dl = []
4329+            for i in xrange(len(shares)):
4330+                sharedata = shares[i]
4331+                shareid = shareids[i]
4332+                block_hash = hashutil.block_hash(salt + sharedata)
4333+                self.blockhashes[shareid].append(block_hash)
4334+
4335+                # find the writer for this share
4336+                d = self.writers[shareid].put_block(sharedata, segnum, salt)
4337+                dl.append(d)
4338+            # TODO: Naturally, we need to check on the results of these.
4339+            return defer.DeferredList(dl)
4340+        d.addCallback(_push_shares_and_salt)
4341+        return d
4342+
4343+
4344+    def push_tail_segment(self):
4345+        # This is essentially the same as push_segment, except that we
4346+        # don't use the cached encoder that we use elsewhere.
4347+        self.log("Pushing tail segment")
4348+        started = time.time()
4349+        segsize = self.segment_size
4350+        data = self.newdata[segsize * (self.num_segments-1):]
4351+        assert len(data) == self.tail_segment_size
4352+        salt = os.urandom(16)
4353+
4354+        key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
4355+        enc = AES(key)
4356+        crypttext = enc.process(data)
4357+        assert len(crypttext) == len(data)
4358+
4359+        now = time.time()
4360+        self._status.timings['encrypt'] = now - started
4361+        started = now
4362+
4363+        self._status.set_status("Encoding")
4364+        tail_fec = codec.CRSEncoder()
4365+        tail_fec.set_params(self.tail_segment_size,
4366+                            self.required_shares,
4367+                            self.total_shares)
4368+
4369+        crypttext_pieces = [None] * self.required_shares
4370+        piece_size = tail_fec.get_block_size()
4371+        for i in range(len(crypttext_pieces)):
4372+            offset = i * piece_size
4373+            piece = crypttext[offset:offset+piece_size]
4374+            piece = piece + "\x00"*(piece_size - len(piece)) # padding
4375+            crypttext_pieces[i] = piece
4376+            assert len(piece) == piece_size
4377+        d = tail_fec.encode(crypttext_pieces)
4378+        def _push_shares_and_salt(results):
4379+            shares, shareids = results
4380+            dl = []
4381+            for i in xrange(len(shares)):
4382+                sharedata = shares[i]
4383+                shareid = shareids[i]
4384+                block_hash = hashutil.block_hash(salt + sharedata)
4385+                self.blockhashes[shareid].append(block_hash)
4386+                # find the writer for this share
4387+                d = self.writers[shareid].put_block(sharedata,
4388+                                                    self.num_segments - 1,
4389+                                                    salt)
4390+                dl.append(d)
4391+            # TODO: Naturally, we need to check on the results of these.
4392+            return defer.DeferredList(dl)
4393+        d.addCallback(_push_shares_and_salt)
4394+        return d
4395+
4396+
4397+    def push_encprivkey(self):
4398+        started = time.time()
4399+        encprivkey = self._encprivkey
4400+        dl = []
4401+        def _spy_on_writer(results):
4402+            print results
4403+            return results
4404+        for shnum, writer in self.writers.iteritems():
4405+            d = writer.put_encprivkey(encprivkey)
4406+            dl.append(d)
4407+        d = defer.DeferredList(dl)
4408+        return d
4409+
4410+
4411+    def push_blockhashes(self):
4412+        started = time.time()
4413+        dl = []
4414+        def _spy_on_results(results):
4415+            print results
4416+            return results
4417+        self.sharehash_leaves = [None] * len(self.blockhashes)
4418+        for shnum, blockhashes in self.blockhashes.iteritems():
4419+            t = hashtree.HashTree(blockhashes)
4420+            self.blockhashes[shnum] = list(t)
4421+            # set the leaf for future use.
4422+            self.sharehash_leaves[shnum] = t[0]
4423+            d = self.writers[shnum].put_blockhashes(self.blockhashes[shnum])
4424+            dl.append(d)
4425+        d = defer.DeferredList(dl)
4426+        return d
4427+
4428+
4429+    def push_sharehashes(self):
4430+        share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
4431+        share_hash_chain = {}
4432+        ds = []
4433+        def _spy_on_results(results):
4434+            print results
4435+            return results
4436+        for shnum in xrange(len(self.sharehash_leaves)):
4437+            needed_indices = share_hash_tree.needed_hashes(shnum)
4438+            self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
4439+                                             for i in needed_indices] )
4440+            d = self.writers[shnum].put_sharehashes(self.sharehashes[shnum])
4441+            ds.append(d)
4442+        self.root_hash = share_hash_tree[0]
4443+        d = defer.DeferredList(ds)
4444+        return d
4445+
4446+
4447+    def push_toplevel_hashes_and_signature(self):
4448+        # We need to do three things here:
4449+        #   - Push the root hash and salt hash
4450+        #   - Get the checkstring of the resulting layout; sign that.
4451+        #   - Push the signature
4452+        ds = []
4453+        def _spy_on_results(results):
4454+            print results
4455+            return results
4456+        for shnum in xrange(self.total_shares):
4457+            d = self.writers[shnum].put_root_hash(self.root_hash)
4458+            ds.append(d)
4459+        d = defer.DeferredList(ds)
4460+        def _make_and_place_signature(ignored):
4461+            signable = self.writers[0].get_signable()
4462+            self.signature = self._privkey.sign(signable)
4463+
4464+            ds = []
4465+            for (shnum, writer) in self.writers.iteritems():
4466+                d = writer.put_signature(self.signature)
4467+                ds.append(d)
4468+            return defer.DeferredList(ds)
4469+        d.addCallback(_make_and_place_signature)
4470+        return d
4471+
4472+
4473+    def finish_publishing(self):
4474+        # We're almost done -- we just need to put the verification key
4475+        # and the offsets
4476+        ds = []
4477+        verification_key = self._pubkey.serialize()
4478+
4479+        def _spy_on_results(results):
4480+            print results
4481+            return results
4482+        for (shnum, writer) in self.writers.iteritems():
4483+            d = writer.put_verification_key(verification_key)
4484+            d.addCallback(lambda ignored, writer=writer:
4485+                writer.finish_publishing())
4486+            ds.append(d)
4487+        return defer.DeferredList(ds)
4488+
4489+
4490+    def _turn_barrier(self, res):
4491+        # putting this method in a Deferred chain imposes a guaranteed
4492+        # reactor turn between the pre- and post- portions of that chain.
4493+        # This can be useful to limit memory consumption: since Deferreds do
4494+        # not do tail recursion, code which uses defer.succeed(result) for
4495+        # consistency will cause objects to live for longer than you might
4496+        # normally expect.
4497+        return fireEventually(res)
4498+
4499 
4500     def _fatal_error(self, f):
4501         self.log("error during loop", failure=f, level=log.UNUSUAL)
4502hunk ./src/allmydata/mutable/publish.py 716
4503             self.log_goal(self.goal, "after update: ")
4504 
4505 
4506-
4507     def _encrypt_and_encode(self):
4508         # this returns a Deferred that fires with a list of (sharedata,
4509         # sharenum) tuples. TODO: cache the ciphertext, only produce the
4510hunk ./src/allmydata/mutable/publish.py 757
4511         d.addCallback(_done_encoding)
4512         return d
4513 
4514+
4515     def _generate_shares(self, shares_and_shareids):
4516         # this sets self.shares and self.root_hash
4517         self.log("_generate_shares")
4518hunk ./src/allmydata/mutable/publish.py 1145
4519             self._status.set_progress(1.0)
4520         eventually(self.done_deferred.callback, res)
4521 
4522-
4523hunk ./src/allmydata/test/test_mutable.py 248
4524         d.addCallback(_created)
4525         return d
4526 
4527+
4528+    def test_create_mdmf(self):
4529+        d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
4530+        def _created(n):
4531+            self.failUnless(isinstance(n, MutableFileNode))
4532+            self.failUnlessEqual(n.get_storage_index(), n._storage_index)
4533+            sb = self.nodemaker.storage_broker
4534+            peer0 = sorted(sb.get_all_serverids())[0]
4535+            shnums = self._storage._peers[peer0].keys()
4536+            self.failUnlessEqual(len(shnums), 1)
4537+        d.addCallback(_created)
4538+        return d
4539+
4540+
4541     def test_serialize(self):
4542         n = MutableFileNode(None, None, {"k": 3, "n": 10}, None)
4543         calls = []
4544hunk ./src/allmydata/test/test_mutable.py 334
4545         d.addCallback(_created)
4546         return d
4547 
4548+
4549+    def test_create_mdmf_with_initial_contents(self):
4550+        initial_contents = "foobarbaz" * 131072 # about 1.1MiB
4551+        d = self.nodemaker.create_mutable_file(initial_contents,
4552+                                               version=MDMF_VERSION)
4553+        def _created(n):
4554+            d = n.download_best_version()
4555+            d.addCallback(lambda data:
4556+                self.failUnlessEqual(data, initial_contents))
4557+            d.addCallback(lambda ignored:
4558+                n.overwrite(initial_contents + "foobarbaz"))
4559+            d.addCallback(lambda ignored:
4560+                n.download_best_version())
4561+            d.addCallback(lambda data:
4562+                self.failUnlessEqual(data, initial_contents +
4563+                                           "foobarbaz"))
4564+            return d
4565+        d.addCallback(_created)
4566+        return d
4567+
4568+
4569     def test_create_with_initial_contents_function(self):
4570         data = "initial contents"
4571         def _make_contents(n):
4572hunk ./src/allmydata/test/test_mutable.py 370
4573         d.addCallback(lambda data2: self.failUnlessEqual(data2, data))
4574         return d
4575 
4576+
4577+    def test_create_mdmf_with_initial_contents_function(self):
4578+        data = "initial contents" * 100000
4579+        def _make_contents(n):
4580+            self.failUnless(isinstance(n, MutableFileNode))
4581+            key = n.get_writekey()
4582+            self.failUnless(isinstance(key, str), key)
4583+            self.failUnlessEqual(len(key), 16)
4584+            return data
4585+        d = self.nodemaker.create_mutable_file(_make_contents,
4586+                                               version=MDMF_VERSION)
4587+        d.addCallback(lambda n:
4588+            n.download_best_version())
4589+        d.addCallback(lambda data2:
4590+            self.failUnlessEqual(data2, data))
4591+        return d
4592+
4593+
4594     def test_create_with_too_large_contents(self):
4595         BIG = "a" * (self.OLD_MAX_SEGMENT_SIZE + 1)
4596         d = self.nodemaker.create_mutable_file(BIG)
4597}
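The hash structure that push_blockhashes, push_sharehashes, and push_toplevel_hashes_and_signature build up works like this: each share gets a block hash tree over its (salted) block hashes, the roots of those per-share trees become the leaves of a single share hash tree, and that tree's root is what put_root_hash writes and the signature ultimately covers. A simplified sketch of the relationship -- build_hash_trees is illustrative, not part of the patch:

    from allmydata import hashtree

    def build_hash_trees(blockhashes):
        # blockhashes maps shnum -> [block hash, ...] for that share.
        sharehash_leaves = [None] * len(blockhashes)
        for shnum, leaves in blockhashes.items():
            t = hashtree.HashTree(leaves)
            sharehash_leaves[shnum] = t[0]  # this share's block hash tree root
        share_hash_tree = hashtree.HashTree(sharehash_leaves)
        # Each share only needs the hashes along its own path to the root.
        needed = {}
        for shnum in range(len(sharehash_leaves)):
            needed[shnum] = dict((i, share_hash_tree[i])
                                 for i in share_hash_tree.needed_hashes(shnum))
        root_hash = share_hash_tree[0]
        return root_hash, needed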
4598[Write a segmented mutable downloader
4599Kevan Carstensen <kevan@isnotajoke.com>**20100626234314
4600 Ignore-this: d2bef531cde1b5c38f2eb28afdd4b17c
4601 
4602 The segmented mutable downloader can deal with MDMF files (files with
4603 one or more segments in MDMF format) and SDMF files (files with one
4604 segment in SDMF format). It is backwards compatible with the old
4605 file format.
4606 
4607 This patch also contains tests for the segmented mutable downloader.
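 For reference, the segment geometry that the downloader derives from the
 verinfo in _setup_encoding_parameters amounts to the following arithmetic
 (segment_geometry is an illustrative helper, not a function in this patch):
 
     from allmydata.util import mathutil
 
     def segment_geometry(datalength, segsize, k):
         # Returns (number of segments, size of the padded tail segment).
         if not (datalength and segsize):
             return (0, 0)
         num_segments = mathutil.div_ceil(datalength, segsize)
         tail_data_size = datalength % segsize
         if tail_data_size == 0:
             # The data fills the last segment exactly.
             tail_data_size = segsize
         # The tail segment is padded up to a multiple of k so that FEC
         # can split it into k equal pieces.
         tail_segment_size = mathutil.next_multiple(tail_data_size, k)
         return (num_segments, tail_segment_size)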
4608] {
4609hunk ./src/allmydata/mutable/retrieve.py 8
4610 from twisted.internet import defer
4611 from twisted.python import failure
4612 from foolscap.api import DeadReferenceError, eventually, fireEventually
4613-from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError
4614-from allmydata.util import hashutil, idlib, log
4615+from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \
4616+                                 MDMF_VERSION, SDMF_VERSION
4617+from allmydata.util import hashutil, idlib, log, mathutil
4618 from allmydata import hashtree, codec
4619 from allmydata.storage.server import si_b2a
4620 from pycryptopp.cipher.aes import AES
4621hunk ./src/allmydata/mutable/retrieve.py 17
4622 from pycryptopp.publickey import rsa
4623 
4624 from allmydata.mutable.common import DictOfSets, CorruptShareError, UncoordinatedWriteError
4625-from allmydata.mutable.layout import SIGNED_PREFIX, unpack_share_data
4626+from allmydata.mutable.layout import SIGNED_PREFIX, unpack_share_data, \
4627+                                     MDMFSlotReadProxy
4628 
4629 class RetrieveStatus:
4630     implements(IRetrieveStatus)
4631hunk ./src/allmydata/mutable/retrieve.py 104
4632         self.verinfo = verinfo
4633         # during repair, we may be called upon to grab the private key, since
4634         # it wasn't picked up during a verify=False checker run, and we'll
4635-        # need it for repair to generate the a new version.
4636+        # need it for repair to generate a new version.
4637         self._need_privkey = fetch_privkey
4638         if self._node.get_privkey():
4639             self._need_privkey = False
4640hunk ./src/allmydata/mutable/retrieve.py 109
4641 
4642+        if self._need_privkey:
4643+            # TODO: Evaluate the need for this. We'll use it if we want
4644+            # to limit how many queries are on the wire for the privkey
4645+            # at once.
4646+            self._privkey_query_markers = [] # one Marker for each time we've
4647+                                             # tried to get the privkey.
4648+
4649         self._status = RetrieveStatus()
4650         self._status.set_storage_index(self._storage_index)
4651         self._status.set_helper(False)
4652hunk ./src/allmydata/mutable/retrieve.py 125
4653          offsets_tuple) = self.verinfo
4654         self._status.set_size(datalength)
4655         self._status.set_encoding(k, N)
4656+        self.readers = {}
4657 
4658     def get_status(self):
4659         return self._status
4660hunk ./src/allmydata/mutable/retrieve.py 149
4661         self.remaining_sharemap = DictOfSets()
4662         for (shnum, peerid, timestamp) in shares:
4663             self.remaining_sharemap.add(shnum, peerid)
4664+            # If the servermap update fetched anything, it fetched at least 1
4665+            # KiB, so we ask for that much.
4666+            # TODO: Change the cache methods to allow us to fetch all of the
4667+            # data that they have, then change this method to do that.
4668+            any_cache, timestamp = self._node._read_from_cache(self.verinfo,
4669+                                                               shnum,
4670+                                                               0,
4671+                                                               1000)
4672+            ss = self.servermap.connections[peerid]
4673+            reader = MDMFSlotReadProxy(ss,
4674+                                       self._storage_index,
4675+                                       shnum,
4676+                                       any_cache)
4677+            reader.peerid = peerid
4678+            self.readers[shnum] = reader
4679+
4680 
4681         self.shares = {} # maps shnum to validated blocks
4682hunk ./src/allmydata/mutable/retrieve.py 167
4683+        self._active_readers = [] # list of active readers for this dl.
4684+        self._validated_readers = set() # set of readers that we have
4685+                                        # validated the prefix of
4686+        self._block_hash_trees = {} # shnum => hashtree
4687+        # TODO: Make this into a file-backed consumer or something to
4688+        # conserve memory.
4689+        self._plaintext = ""
4690 
4691         # how many shares do we need?
4692hunk ./src/allmydata/mutable/retrieve.py 176
4693-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
4694+        (seqnum,
4695+         root_hash,
4696+         IV,
4697+         segsize,
4698+         datalength,
4699+         k,
4700+         N,
4701+         prefix,
4702          offsets_tuple) = self.verinfo
4703hunk ./src/allmydata/mutable/retrieve.py 185
4704-        assert len(self.remaining_sharemap) >= k
4705-        # we start with the lowest shnums we have available, since FEC is
4706-        # faster if we're using "primary shares"
4707-        self.active_shnums = set(sorted(self.remaining_sharemap.keys())[:k])
4708-        for shnum in self.active_shnums:
4709-            # we use an arbitrary peer who has the share. If shares are
4710-            # doubled up (more than one share per peer), we could make this
4711-            # run faster by spreading the load among multiple peers. But the
4712-            # algorithm to do that is more complicated than I want to write
4713-            # right now, and a well-provisioned grid shouldn't have multiple
4714-            # shares per peer.
4715-            peerid = list(self.remaining_sharemap[shnum])[0]
4716-            self.get_data(shnum, peerid)
4717 
4718hunk ./src/allmydata/mutable/retrieve.py 186
4719-        # control flow beyond this point: state machine. Receiving responses
4720-        # from queries is the input. We might send out more queries, or we
4721-        # might produce a result.
4722 
4723hunk ./src/allmydata/mutable/retrieve.py 187
4724+        # We need one share hash tree for the entire file; its leaves
4725+        # are the roots of the block hash trees for the shares that
4726+        # comprise it, and its root is in the verinfo.
4727+        self.share_hash_tree = hashtree.IncompleteHashTree(N)
4728+        self.share_hash_tree.set_hashes({0: root_hash})
4729+
4730+        # This will set up both the segment decoder and the tail segment
4731+        # decoder, as well as a variety of other instance variables that
4732+        # the download process will use.
4733+        self._setup_encoding_parameters()
4734+        assert len(self.remaining_sharemap) >= k
4735+
4736+        self.log("starting download")
4737+        self._add_active_peers()
4738+        # The download process beyond this is a state machine.
4739+        # _add_active_peers will select the peers that we want to use
4740+        # for the download, and then attempt to start downloading. After
4741+        # each segment, it will check for doneness, reacting to broken
4742+        # peers and corrupt shares as necessary. If it runs out of good
4743+        # peers before downloading all of the segments, _done_deferred
4744+        # will errback.  Otherwise, it will eventually callback with the
4745+        # contents of the mutable file.
4746         return self._done_deferred
4747 
4748hunk ./src/allmydata/mutable/retrieve.py 211
4749-    def get_data(self, shnum, peerid):
4750-        self.log(format="sending sh#%(shnum)d request to [%(peerid)s]",
4751-                 shnum=shnum,
4752-                 peerid=idlib.shortnodeid_b2a(peerid),
4753-                 level=log.NOISY)
4754-        ss = self.servermap.connections[peerid]
4755-        started = time.time()
4756-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
4757+
4758+    def _setup_encoding_parameters(self):
4759+        """
4760+        I set up the encoding parameters, including k, n, the number
4761+        of segments associated with this file, and the segment decoder.
4762+        """
4763+        (seqnum,
4764+         root_hash,
4765+         IV,
4766+         segsize,
4767+         datalength,
4768+         k,
4769+         n,
4770+         known_prefix,
4771          offsets_tuple) = self.verinfo
4772hunk ./src/allmydata/mutable/retrieve.py 226
4773-        offsets = dict(offsets_tuple)
4774+        self._required_shares = k
4775+        self._total_shares = n
4776+        self._segment_size = segsize
4777+        self._data_length = datalength
4778+
4779+        if not IV:
4780+            self._version = MDMF_VERSION
4781+        else:
4782+            self._version = SDMF_VERSION
4783+
4784+        if datalength and segsize:
4785+            self._num_segments = mathutil.div_ceil(datalength, segsize)
4786+            self._tail_data_size = datalength % segsize
4787+        else:
4788+            self._num_segments = 0
4789+            self._tail_data_size = 0
4790 
4791hunk ./src/allmydata/mutable/retrieve.py 243
4792-        # we read the checkstring, to make sure that the data we grab is from
4793-        # the right version.
4794-        readv = [ (0, struct.calcsize(SIGNED_PREFIX)) ]
4795+        self._segment_decoder = codec.CRSDecoder()
4796+        self._segment_decoder.set_params(segsize, k, n)
4797+        self._current_segment = 0
4798 
4799hunk ./src/allmydata/mutable/retrieve.py 247
4800-        # We also read the data, and the hashes necessary to validate them
4801-        # (share_hash_chain, block_hash_tree, share_data). We don't read the
4802-        # signature or the pubkey, since that was handled during the
4803-        # servermap phase, and we'll be comparing the share hash chain
4804-        # against the roothash that was validated back then.
4805+        if not self._tail_data_size:
4806+            self._tail_data_size = segsize
4807 
4808hunk ./src/allmydata/mutable/retrieve.py 250
4809-        readv.append( (offsets['share_hash_chain'],
4810-                       offsets['enc_privkey'] - offsets['share_hash_chain'] ) )
4811+        self._tail_segment_size = mathutil.next_multiple(self._tail_data_size,
4812+                                                         self._required_shares)
4813+        if self._tail_segment_size == self._segment_size:
4814+            self._tail_decoder = self._segment_decoder
4815+        else:
4816+            self._tail_decoder = codec.CRSDecoder()
4817+            self._tail_decoder.set_params(self._tail_segment_size,
4818+                                          self._required_shares,
4819+                                          self._total_shares)
4820 
4821hunk ./src/allmydata/mutable/retrieve.py 260
4822-        # if we need the private key (for repair), we also fetch that
4823-        if self._need_privkey:
4824-            readv.append( (offsets['enc_privkey'],
4825-                           offsets['EOF'] - offsets['enc_privkey']) )
4826+        self.log("got encoding parameters: "
4827+                 "k: %d "
4828+                 "n: %d "
4829+                 "%d segments of %d bytes each (%d byte tail segment)" % \
4830+                 (k, n, self._num_segments, self._segment_size,
4831+                  self._tail_segment_size))
4832 
4833hunk ./src/allmydata/mutable/retrieve.py 267
4834-        m = Marker()
4835-        self._outstanding_queries[m] = (peerid, shnum, started)
4836+        for i in xrange(self._total_shares):
4837+            # So we don't have to do this later.
4838+            self._block_hash_trees[i] = hashtree.IncompleteHashTree(self._num_segments)
4839 
4840hunk ./src/allmydata/mutable/retrieve.py 271
4841-        # ask the cache first
4842-        got_from_cache = False
4843-        datavs = []
4844-        for (offset, length) in readv:
4845-            (data, timestamp) = self._node._read_from_cache(self.verinfo, shnum,
4846-                                                            offset, length)
4847-            if data is not None:
4848-                datavs.append(data)
4849-        if len(datavs) == len(readv):
4850-            self.log("got data from cache")
4851-            got_from_cache = True
4852-            d = fireEventually({shnum: datavs})
4853-            # datavs is a dict mapping shnum to a pair of strings
4854-        else:
4855-            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
4856-        self.remaining_sharemap.discard(shnum, peerid)
4857+        # If we have more than one segment, we are an MDMF file, which
4858+        # means that we need to validate the salts as we receive them.
4859+        self._salt_hash_tree = hashtree.IncompleteHashTree(self._num_segments)
4860+        self._salt_hash_tree[0] = IV # from the prefix.
4861 
4862hunk ./src/allmydata/mutable/retrieve.py 276
4863-        d.addCallback(self._got_results, m, peerid, started, got_from_cache)
4864-        d.addErrback(self._query_failed, m, peerid)
4865-        # errors that aren't handled by _query_failed (and errors caused by
4866-        # _query_failed) get logged, but we still want to check for doneness.
4867-        def _oops(f):
4868-            self.log(format="problem in _query_failed for sh#%(shnum)d to %(peerid)s",
4869-                     shnum=shnum,
4870-                     peerid=idlib.shortnodeid_b2a(peerid),
4871-                     failure=f,
4872-                     level=log.WEIRD, umid="W0xnQA")
4873-        d.addErrback(_oops)
4874-        d.addBoth(self._check_for_done)
4875-        # any error during _check_for_done means the download fails. If the
4876-        # download is successful, _check_for_done will fire _done by itself.
4877-        d.addErrback(self._done)
4878-        d.addErrback(log.err)
4879-        return d # purely for testing convenience
4880 
4881hunk ./src/allmydata/mutable/retrieve.py 277
4882-    def _do_read(self, ss, peerid, storage_index, shnums, readv):
4883-        # isolate the callRemote to a separate method, so tests can subclass
4884-        # Publish and override it
4885-        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
4886-        return d
4887+    def _add_active_peers(self):
4888+        """
4889+        I populate self._active_readers with enough active readers to
4890+        retrieve the contents of this mutable file. I am called before
4891+        downloading starts, and (eventually) after each validation
4892+        error, connection error, or other problem in the download.
4893+        """
4894+        # TODO: It would be cool to investigate other heuristics for
4895+        # reader selection. For instance, the cost (in time the user
4896+        # spends waiting for their file) of selecting a really slow peer
4897+        # that happens to have a primary share is probably more than
4898+        # selecting a really fast peer that doesn't have a primary
4899+        # share. Maybe the servermap could be extended to provide this
4900+        # information; it could keep track of latency information while
4901+        # it gathers more important data, and then this routine could
4902+        # use that to select active readers.
4903+        #
4904+        # (these and other questions would be easier to answer with a
4905+        #  robust, configurable tahoe-lafs simulator, which modeled node
4906+        #  failures, differences in node speed, and other characteristics
4907+        #  that we expect storage servers to have.  You could have
4908+        #  presets for really stable grids (like allmydata.com),
4909+        #  friendnets, make it easy to configure your own settings, and
4910+        #  then simulate the effect of big changes on these use cases
4911+        #  instead of just reasoning about what the effect might be. Out
4912+        #  of scope for MDMF, though.)
4913 
4914hunk ./src/allmydata/mutable/retrieve.py 304
4915-    def remove_peer(self, peerid):
4916-        for shnum in list(self.remaining_sharemap.keys()):
4917-            self.remaining_sharemap.discard(shnum, peerid)
4918+        # We need at least self._required_shares readers to download a
4919+        # segment.
4920+        needed = self._required_shares - len(self._active_readers)
4921+        # XXX: Why don't format= log messages work here?
4922+        self.log("adding %d peers to the active peers list" % needed)
4923 
4924hunk ./src/allmydata/mutable/retrieve.py 310
4925-    def _got_results(self, datavs, marker, peerid, started, got_from_cache):
4926-        now = time.time()
4927-        elapsed = now - started
4928-        if not got_from_cache:
4929-            self._status.add_fetch_timing(peerid, elapsed)
4930-        self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
4931-                 shares=len(datavs),
4932-                 peerid=idlib.shortnodeid_b2a(peerid),
4933-                 level=log.NOISY)
4934-        self._outstanding_queries.pop(marker, None)
4935-        if not self._running:
4936-            return
4937+        # We favor lower numbered shares, since FEC is faster with
4938+        # primary shares than with other shares, and lower-numbered
4939+        # shares are more likely to be primary than higher numbered
4940+        # shares.
4941+        active_shnums = set(self.remaining_sharemap.keys())
4942+        # We shouldn't consider adding shares that we already have; this
4943+        # will cause problems later.
4944+        active_shnums -= set([reader.shnum for reader in self._active_readers])
4945+        active_shnums = sorted(active_shnums)[:needed]
4946+        if len(active_shnums) < needed:
4947+            # We don't have enough readers to retrieve the file; fail.
4948+            return self._failed()
4949 
4950hunk ./src/allmydata/mutable/retrieve.py 323
4951-        # note that we only ask for a single share per query, so we only
4952-        # expect a single share back. On the other hand, we use the extra
4953-        # shares if we get them.. seems better than an assert().
4954+        for shnum in active_shnums:
4955+            self._active_readers.append(self.readers[shnum])
4956+            self.log("added reader for share %d" % shnum)
4957+        assert len(self._active_readers) == self._required_shares
4958+        # Conceptually, this is part of the _add_active_peers step. It
4959+        # validates the prefixes of newly added readers to make sure
4960+        # that they match what we are expecting for self.verinfo. If
4961+        # validation is successful, _validate_active_prefixes will call
4962+        # _download_current_segment for us. If validation is
4963+        # unsuccessful, then _validate_active_prefixes will remove the
4964+        # peer and call _add_active_peers again, where we will attempt
4965+        # to rectify the problem by choosing another peer.
4966+        return self._validate_active_prefixes()
4967 
4968hunk ./src/allmydata/mutable/retrieve.py 337
4969-        for shnum,datav in datavs.items():
4970-            (prefix, hash_and_data) = datav[:2]
4971-            try:
4972-                self._got_results_one_share(shnum, peerid,
4973-                                            prefix, hash_and_data)
4974-            except CorruptShareError, e:
4975-                # log it and give the other shares a chance to be processed
4976-                f = failure.Failure()
4977-                self.log(format="bad share: %(f_value)s",
4978-                         f_value=str(f.value), failure=f,
4979-                         level=log.WEIRD, umid="7fzWZw")
4980-                self.notify_server_corruption(peerid, shnum, str(e))
4981-                self.remove_peer(peerid)
4982-                self.servermap.mark_bad_share(peerid, shnum, prefix)
4983-                self._bad_shares.add( (peerid, shnum) )
4984-                self._status.problems[peerid] = f
4985-                self._last_failure = f
4986-                pass
4987-            if self._need_privkey and len(datav) > 2:
4988-                lp = None
4989-                self._try_to_validate_privkey(datav[2], peerid, shnum, lp)
4990-        # all done!
4991 
4992hunk ./src/allmydata/mutable/retrieve.py 338
4993-    def notify_server_corruption(self, peerid, shnum, reason):
4994-        ss = self.servermap.connections[peerid]
4995-        ss.callRemoteOnly("advise_corrupt_share",
4996-                          "mutable", self._storage_index, shnum, reason)
4997+    def _validate_active_prefixes(self):
4998+        """
4999+        I check to make sure that the prefixes on the peers that I am
5000+        currently reading from match the prefix that we want to see, as
5001+        recorded in self.verinfo.
5002 
5003hunk ./src/allmydata/mutable/retrieve.py 344
5004-    def _got_results_one_share(self, shnum, peerid,
5005-                               got_prefix, got_hash_and_data):
5006-        self.log("_got_results: got shnum #%d from peerid %s"
5007-                 % (shnum, idlib.shortnodeid_b2a(peerid)))
5008-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
5009+        If I find that all of the active peers have acceptable prefixes,
5010+        I pass control to _download_current_segment, which will use
5011+        those peers to do cool things. If I find that some of the active
5012+        peers have unacceptable prefixes, I will remove them from active
5013+        peers (and from further consideration) and call
5014+        _add_active_peers to attempt to rectify the situation. I keep
5015+        track of which peers I have already validated so that I don't
5016+        need to do so again.
5017+        """
5018+        assert self._active_readers, "No more active readers"
5019+
5020+        ds = []
5021+        new_readers = list(set(self._active_readers) - self._validated_readers)
5022+        self.log('validating %d newly-added active readers' % len(new_readers))
5023+
5024+        for reader in new_readers:
5025+            # We force a remote read here -- otherwise, we are relying
5026+            # on cached data that we already verified as valid, and we
5027+            # won't detect an uncoordinated write that has occurred
5028+            # since the last servermap update.
5029+            d = reader.get_prefix(force_remote=True)
5030+            d.addCallback(self._try_to_validate_prefix, reader)
5031+            ds.append(d)
5032+        dl = defer.DeferredList(ds, consumeErrors=True)
5033+        def _check_results(results):
5034+            # Each result in results will be of the form (success, value):
5035+            # success tells us whether the checkstring validated, and if
5036+            # it didn't, value is the Failure explaining why. We then need
5037+            # to remove the offending (peer,share) from our active readers,
5038+            # and ensure that active readers is again populated.
5039+            bad_readers = []
5040+            for i, result in enumerate(results):
5041+                if not result[0]:
5042+                    reader = new_readers[i]
5043+                    f = result[1]
5044+                    assert isinstance(f, failure.Failure)
5045+
5046+                    self.log("The reader %s failed to "
5047+                             "properly validate: %s" % \
5048+                             (reader, str(f.value)))
5049+                    bad_readers.append((reader, f))
5050+                else:
5051+                    reader = new_readers[i]
5052+                    self.log("the reader %s checks out, so we'll use it" % \
5053+                             reader)
5054+                    self._validated_readers.add(reader)
5055+                    # Each time we validate a reader, we check to see if
5056+                    # we need the private key. If we do, we politely ask
5057+                    # for it and then continue computing. If we find
5058+                    # that we haven't gotten it at the end of
5059+                    # segment decoding, then we'll take more drastic
5060+                    # measures.
5061+                    if self._need_privkey:
5062+                        d = reader.get_encprivkey()
5063+                        d.addCallback(self._try_to_validate_privkey, reader)
5064+            if bad_readers:
5065+                # We do them all at once, or else we screw up list indexing.
5066+                for (reader, f) in bad_readers:
5067+                    self._mark_bad_share(reader, f)
5068+                return self._add_active_peers()
5069+            else:
5070+                return self._download_current_segment()
5071+            # (_add_active_peers will assert that it has enough active
5072+            # readers to fetch shares; our job here is only to remove
5073+            # the bad ones.)
5073+        dl.addCallback(_check_results)
5074+        return dl
5075+
5076+
5077+    def _try_to_validate_prefix(self, prefix, reader):
5078+        """
5079+        I check that the prefix returned by a candidate server for
5080+        retrieval matches the prefix that the servermap knows about
5081+        (and, hence, the prefix that was validated earlier). If it does,
5082+        I return without incident, which means that I approve of the use
5083+        of the candidate server for segment retrieval. If it doesn't, I
5084+        raise UncoordinatedWriteError, so that another server will be chosen.
5085+        """
5086+        (seqnum,
5087+         root_hash,
5088+         IV,
5089+         segsize,
5090+         datalength,
5091+         k,
5092+         N,
5093+         known_prefix,
5094          offsets_tuple) = self.verinfo
5095hunk ./src/allmydata/mutable/retrieve.py 430
5096-        assert len(got_prefix) == len(prefix), (len(got_prefix), len(prefix))
5097-        if got_prefix != prefix:
5098-            msg = "someone wrote to the data since we read the servermap: prefix changed"
5099-            raise UncoordinatedWriteError(msg)
5100-        (share_hash_chain, block_hash_tree,
5101-         share_data) = unpack_share_data(self.verinfo, got_hash_and_data)
5102+        if known_prefix != prefix:
5103+            self.log("prefix from share %d doesn't match" % reader.shnum)
5104+            raise UncoordinatedWriteError("Mismatched prefix -- this could "
5105+                                          "indicate an uncoordinated write")
5106+        # Otherwise, we're okay -- no issues.
5107 
5108hunk ./src/allmydata/mutable/retrieve.py 436
5109-        assert isinstance(share_data, str)
5110-        # build the block hash tree. SDMF has only one leaf.
5111-        leaves = [hashutil.block_hash(share_data)]
5112-        t = hashtree.HashTree(leaves)
5113-        if list(t) != block_hash_tree:
5114-            raise CorruptShareError(peerid, shnum, "block hash tree failure")
5115-        share_hash_leaf = t[0]
5116-        t2 = hashtree.IncompleteHashTree(N)
5117-        # root_hash was checked by the signature
5118-        t2.set_hashes({0: root_hash})
5119-        try:
5120-            t2.set_hashes(hashes=share_hash_chain,
5121-                          leaves={shnum: share_hash_leaf})
5122-        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
5123-                IndexError), e:
5124-            msg = "corrupt hashes: %s" % (e,)
5125-            raise CorruptShareError(peerid, shnum, msg)
5126-        self.log(" data valid! len=%d" % len(share_data))
5127-        # each query comes down to this: placing validated share data into
5128-        # self.shares
5129-        self.shares[shnum] = share_data
5130 
5131hunk ./src/allmydata/mutable/retrieve.py 437
5132-    def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
5133+    def _remove_reader(self, reader):
5134+        """
5135+        At various points, we will wish to remove a peer from
5136+        consideration and/or use. These include, but are not necessarily
5137+        limited to:
5138 
5139hunk ./src/allmydata/mutable/retrieve.py 443
5140-        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
5141-        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
5142-        if alleged_writekey != self._node.get_writekey():
5143-            self.log("invalid privkey from %s shnum %d" %
5144-                     (idlib.nodeid_b2a(peerid)[:8], shnum),
5145-                     parent=lp, level=log.WEIRD, umid="YIw4tA")
5146-            return
5147+            - A connection error.
5148+            - A mismatched prefix (that is, a prefix that does not match
5149+              our conception of the version information string).
5150+            - A failing block hash, salt hash, or share hash, which can
5151+              indicate disk failure/bit flips, or network trouble.
5152 
5153hunk ./src/allmydata/mutable/retrieve.py 449
5154-        # it's good
5155-        self.log("got valid privkey from shnum %d on peerid %s" %
5156-                 (shnum, idlib.shortnodeid_b2a(peerid)),
5157-                 parent=lp)
5158-        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
5159-        self._node._populate_encprivkey(enc_privkey)
5160-        self._node._populate_privkey(privkey)
5161-        self._need_privkey = False
5162+        This method will do that. I will make sure that the
5163+        (shnum,reader) combination represented by my reader argument is
5164+        not used for anything else during this download. I will not
5165+        advise the reader of any corruption, something that my callers
5166+        may wish to do on their own.
5167+        """
5168+        # TODO: When you're done writing this, see if this is ever
5169+        # actually used for something that _mark_bad_share isn't. I have
5170+        # a feeling that they will be used for very similar things, and
5171+        # that having them both here is just going to be an epic amount
5172+        # of code duplication.
5173+        #
5174+        # (well, okay, not epic, but meaningful)
5175+        self.log("removing reader %s" % reader)
5176+        # Remove the reader from _active_readers
5177+        self._active_readers.remove(reader)
5178+        # TODO: self.readers.remove(reader)?
5179+        for shnum in list(self.remaining_sharemap.keys()):
5180+            self.remaining_sharemap.discard(shnum, reader.peerid)
5181 
5182hunk ./src/allmydata/mutable/retrieve.py 469
5183-    def _query_failed(self, f, marker, peerid):
5184-        self.log(format="query to [%(peerid)s] failed",
5185-                 peerid=idlib.shortnodeid_b2a(peerid),
5186-                 level=log.NOISY)
5187-        self._status.problems[peerid] = f
5188-        self._outstanding_queries.pop(marker, None)
5189-        if not self._running:
5190-            return
5191-        self._last_failure = f
5192-        self.remove_peer(peerid)
5193-        level = log.WEIRD
5194-        if f.check(DeadReferenceError):
5195-            level = log.UNUSUAL
5196-        self.log(format="error during query: %(f_value)s",
5197-                 f_value=str(f.value), failure=f, level=level, umid="gOJB5g")
5198 
5199hunk ./src/allmydata/mutable/retrieve.py 470
5200-    def _check_for_done(self, res):
5201-        # exit paths:
5202-        #  return : keep waiting, no new queries
5203-        #  return self._send_more_queries(outstanding) : send some more queries
5204-        #  fire self._done(plaintext) : download successful
5205-        #  raise exception : download fails
5206+    def _mark_bad_share(self, reader, f):
5207+        """
5208+        I mark the (peerid, shnum) encapsulated by my reader argument as
5209+        a bad share, which means that it will not be used anywhere else.
5210 
5211hunk ./src/allmydata/mutable/retrieve.py 475
5212-        self.log(format="_check_for_done: running=%(running)s, decoding=%(decoding)s",
5213-                 running=self._running, decoding=self._decoding,
5214-                 level=log.NOISY)
5215-        if not self._running:
5216-            return
5217-        if self._decoding:
5218-            return
5219-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
5220-         offsets_tuple) = self.verinfo
5221+        There are several reasons to want to mark something as a bad
5222+        share. These include:
5223 
5224hunk ./src/allmydata/mutable/retrieve.py 478
5225-        if len(self.shares) < k:
5226-            # we don't have enough shares yet
5227-            return self._maybe_send_more_queries(k)
5228-        if self._need_privkey:
5229-            # we got k shares, but none of them had a valid privkey. TODO:
5230-            # look further. Adding code to do this is a bit complicated, and
5231-            # I want to avoid that complication, and this should be pretty
5232-            # rare (k shares with bitflips in the enc_privkey but not in the
5233-            # data blocks). If we actually do get here, the subsequent repair
5234-            # will fail for lack of a privkey.
5235-            self.log("got k shares but still need_privkey, bummer",
5236-                     level=log.WEIRD, umid="MdRHPA")
5237+            - A connection error to the peer.
5238+            - A mismatched prefix (that is, a prefix that does not match
5239+              our local conception of the version information string).
5240+            - A failing block hash, salt hash, share hash, or other
5241+              integrity check.
5242 
5243hunk ./src/allmydata/mutable/retrieve.py 484
5244-        # we have enough to finish. All the shares have had their hashes
5245-        # checked, so if something fails at this point, we don't know how
5246-        # to fix it, so the download will fail.
5247+        This method will ensure that readers that we wish to mark bad
5248+        (for these reasons or other reasons) are not used for the rest
5249+        of the download. Additionally, it will attempt to tell the
5250+        remote peer (with no guarantee of success) that its share is
5251+        corrupt.
5252+        """
5253+        self.log("marking share %d on server %s as bad" % \
5254+                 (reader.shnum, reader))
5255+        self._remove_reader(reader)
5256+        self._bad_shares.add((reader.peerid, reader.shnum))
5257+        self._status.problems[reader.peerid] = f
5258+        self._last_failure = f
5259+        self.notify_server_corruption(reader.peerid, reader.shnum,
5260+                                      str(f.value))
5261 
5262hunk ./src/allmydata/mutable/retrieve.py 499
5263-        self._decoding = True # avoid reentrancy
5264-        self._status.set_status("decoding")
5265-        now = time.time()
5266-        elapsed = now - self._started
5267-        self._status.timings["fetch"] = elapsed
5268 
5269hunk ./src/allmydata/mutable/retrieve.py 500
5270-        d = defer.maybeDeferred(self._decode)
5271-        d.addCallback(self._decrypt, IV, self._node.get_readkey())
5272-        d.addBoth(self._done)
5273-        return d # purely for test convenience
5274+    def _download_current_segment(self):
5275+        """
5276+        I download, validate, decode, decrypt, and assemble the segment
5277+        that this Retrieve is currently responsible for downloading.
5278+        """
5279+        assert len(self._active_readers) >= self._required_shares
5280+        if self._current_segment < self._num_segments:
5281+            d = self._process_segment(self._current_segment)
5282+        else:
5283+            d = defer.succeed(None)
5284+        d.addCallback(self._check_for_done)
5285+        return d
5286 
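For orientation, the per-segment control flow introduced by the rest of this patch looks roughly like the sketch below (method names are taken from the code that follows; error handling is omitted):

    # _download_current_segment()
    #   -> _process_segment(segnum)
    #        fetch:    reader.get_block_and_salt() and _get_needed_hashes()
    #        validate: _validate_block()        (block/share hash tree checks)
    #        -> _maybe_decode_and_decrypt_segment()
    #             -> _decode_blocks()           (FEC-decode k blocks into one segment)
    #             -> _decrypt_segment()         (AES key from readkey and salt)
    #             -> _set_segment()             (append plaintext, advance counter)
    #   -> _check_for_done()                    (next segment, add peers, or _done)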
5287hunk ./src/allmydata/mutable/retrieve.py 513
5288-    def _maybe_send_more_queries(self, k):
5289-        # we don't have enough shares yet. Should we send out more queries?
5290-        # There are some number of queries outstanding, each for a single
5291-        # share. If we can generate 'needed_shares' additional queries, we do
5292-        # so. If we can't, then we know this file is a goner, and we raise
5293-        # NotEnoughSharesError.
5294-        self.log(format=("_maybe_send_more_queries, have=%(have)d, k=%(k)d, "
5295-                         "outstanding=%(outstanding)d"),
5296-                 have=len(self.shares), k=k,
5297-                 outstanding=len(self._outstanding_queries),
5298-                 level=log.NOISY)
5299 
5300hunk ./src/allmydata/mutable/retrieve.py 514
5301-        remaining_shares = k - len(self.shares)
5302-        needed = remaining_shares - len(self._outstanding_queries)
5303-        if not needed:
5304-            # we have enough queries in flight already
5305+    def _process_segment(self, segnum):
5306+        """
5307+        I download, validate, decode, and decrypt one segment of the
5308+        file that this Retrieve is retrieving. This means coordinating
5309+        the process of getting k blocks of that file, validating them,
5310+        assembling them into one segment with the decoder, and then
5311+        decrypting them.
5312+        """
5313+        self.log("processing segment %d" % segnum)
5314 
5315hunk ./src/allmydata/mutable/retrieve.py 524
5316-            # TODO: but if they've been in flight for a long time, and we
5317-            # have reason to believe that new queries might respond faster
5318-            # (i.e. we've seen other queries come back faster, then consider
5319-            # sending out new queries. This could help with peers which have
5320-            # silently gone away since the servermap was updated, for which
5321-            # we're still waiting for the 15-minute TCP disconnect to happen.
5322-            self.log("enough queries are in flight, no more are needed",
5323-                     level=log.NOISY)
5324-            return
5325+        # TODO: The old code uses a marker. Should this code do that
5326+        # too? What did the Marker do?
5327+        assert len(self._active_readers) >= self._required_shares
5328+
5329+        # We need to ask each of our active readers for its block and
5330+        # salt. We will then validate those. If validation is
5331+        # successful, we will assemble the results into plaintext.
5332+        ds = []
5333+        for reader in self._active_readers:
5334+            d = reader.get_block_and_salt(segnum, queue=True)
5335+            d2 = self._get_needed_hashes(reader, segnum)
5336+            dl = defer.DeferredList([d, d2], consumeErrors=True)
5337+            dl.addCallback(self._validate_block, segnum, reader)
5338+            dl.addErrback(self._validation_or_decoding_failed, [reader])
5339+            ds.append(dl)
5340+            reader.flush()
5341+        dl = defer.DeferredList(ds)
5342+        dl.addCallback(self._maybe_decode_and_decrypt_segment, segnum)
5343+        return dl
5344 
5345hunk ./src/allmydata/mutable/retrieve.py 544
5346-        outstanding_shnums = set([shnum
5347-                                  for (peerid, shnum, started)
5348-                                  in self._outstanding_queries.values()])
5349-        # prefer low-numbered shares, they are more likely to be primary
5350-        available_shnums = sorted(self.remaining_sharemap.keys())
5351-        for shnum in available_shnums:
5352-            if shnum in outstanding_shnums:
5353-                # skip ones that are already in transit
5354-                continue
5355-            if shnum not in self.remaining_sharemap:
5356-                # no servers for that shnum. note that DictOfSets removes
5357-                # empty sets from the dict for us.
5358-                continue
5359-            peerid = list(self.remaining_sharemap[shnum])[0]
5360-            # get_data will remove that peerid from the sharemap, and add the
5361-            # query to self._outstanding_queries
5362-            self._status.set_status("Retrieving More Shares")
5363-            self.get_data(shnum, peerid)
5364-            needed -= 1
5365-            if not needed:
5366+
5367+    def _maybe_decode_and_decrypt_segment(self, blocks_and_salts, segnum):
5368+        """
5369+        I take the results of fetching and validating the blocks from a
5370+        callback chain in another method. If the results are such that
5371+        they tell me that validation and fetching succeeded without
5372+        incident, I will proceed with decoding and decryption.
5373+        Otherwise, I will do nothing.
5374+        """
5375+        self.log("trying to decode and decrypt segment %d" % segnum)
5376+        failures = False
5377+        for block_and_salt in blocks_and_salts:
5378+            if not block_and_salt[0] or block_and_salt[1] is None:
5379+                self.log("some validation operations failed; not proceeding")
5380+                failures = True
5381                 break
5382hunk ./src/allmydata/mutable/retrieve.py 560
5383+        if not failures:
5384+            self.log("everything looks ok, building segment %d" % segnum)
5385+            d = self._decode_blocks(blocks_and_salts, segnum)
5386+            d.addCallback(self._decrypt_segment)
5387+            d.addErrback(self._validation_or_decoding_failed,
5388+                         self._active_readers)
5389+            d.addCallback(self._set_segment)
5390+            return d
5391+        else:
5392+            return defer.succeed(None)
5393+
5394+
5395+    def _set_segment(self, segment):
5396+        """
5397+        Given a plaintext segment, I register that segment with the
5398+        target that is handling the file download.
5399+        """
5400+        self.log("got plaintext for segment %d" % self._current_segment)
5401+        self._plaintext += segment
5402+        self._current_segment += 1
5403 
5404hunk ./src/allmydata/mutable/retrieve.py 581
5405-        # at this point, we have as many outstanding queries as we can. If
5406-        # needed!=0 then we might not have enough to recover the file.
5407-        if needed:
5408-            format = ("ran out of peers: "
5409-                      "have %(have)d shares (k=%(k)d), "
5410-                      "%(outstanding)d queries in flight, "
5411-                      "need %(need)d more, "
5412-                      "found %(bad)d bad shares")
5413-            args = {"have": len(self.shares),
5414-                    "k": k,
5415-                    "outstanding": len(self._outstanding_queries),
5416-                    "need": needed,
5417-                    "bad": len(self._bad_shares),
5418-                    }
5419-            self.log(format=format,
5420-                     level=log.WEIRD, umid="ezTfjw", **args)
5421-            err = NotEnoughSharesError("%s, last failure: %s" %
5422-                                      (format % args, self._last_failure))
5423-            if self._bad_shares:
5424-                self.log("We found some bad shares this pass. You should "
5425-                         "update the servermap and try again to check "
5426-                         "more peers",
5427-                         level=log.WEIRD, umid="EFkOlA")
5428-                err.servermap = self.servermap
5429-            raise err
5430 
5431hunk ./src/allmydata/mutable/retrieve.py 582
5432+    def _validation_or_decoding_failed(self, f, readers):
5433+        """
5434+        I am called when a block or a salt fails to correctly validate, or when
5435+        the decryption or decoding operation fails for some reason.  I react to
5436+        this failure by notifying the remote server of corruption, and then
5437+        removing the remote peer from further activity.
5438+        """
5439+        assert isinstance(readers, list)
5440+        bad_shnums = [reader.shnum for reader in readers]
5441+
5442+        self.log("validation or decoding failed on share(s) %s, peer(s) %s "
5443+                 ", segment %d: %s" % \
5444+                 (bad_shnums, readers, self._current_segment, str(f)))
5445+        for reader in readers:
5446+            self._mark_bad_share(reader, f)
5447         return
5448 
5449hunk ./src/allmydata/mutable/retrieve.py 599
5450-    def _decode(self):
5451-        started = time.time()
5452-        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
5453-         offsets_tuple) = self.verinfo
5454 
5455hunk ./src/allmydata/mutable/retrieve.py 600
5456-        # shares_dict is a dict mapping shnum to share data, but the codec
5457-        # wants two lists.
5458-        shareids = []; shares = []
5459-        for shareid, share in self.shares.items():
5460+    def _validate_block(self, results, segnum, reader):
5461+        """
5462+        I validate a block from one share on a remote server.
5463+        """
5464+        # Grab the part of the block hash tree that is necessary to
5465+        # validate this block, then generate the block hash root.
5466+        self.log("validating share %d for segment %d" % (reader.shnum,
5467+                                                             segnum))
5468+        # Did we fail to fetch either of the things that we were
5469+        # supposed to? Fail if so.
5470+        if not results[0][0] or not results[1][0]:
5471+            # handled by the errback handler.
5472+            # These all get batched into one query, so the resulting
5473+            # failure should be the same for both; just use whichever
5474+            # failure we find first.
5475+            f = results[0][1]
5476+            if not isinstance(f, failure.Failure):
5477+                f = results[1][1]
5478+            assert isinstance(f, failure.Failure)
5479+            raise CorruptShareError(reader.peerid,
5480+                                    reader.shnum,
5481+                                    "Connection error: %s" % str(f))
5482+
5483+        block_and_salt, block_and_sharehashes = results
5484+        block, salt = block_and_salt[1]
5485+        blockhashes, sharehashes = block_and_sharehashes[1]
5486+
5487+        blockhashes = dict(enumerate(blockhashes[1]))
5488+        self.log("the reader gave me the following blockhashes: %s" % \
5489+                 blockhashes.keys())
5490+        self.log("the reader gave me the following sharehashes: %s" % \
5491+                 sharehashes[1].keys())
5492+        bht = self._block_hash_trees[reader.shnum]
5493+
5494+        if bht.needed_hashes(segnum, include_leaf=True):
5495+            try:
5496+                bht.set_hashes(blockhashes)
5497+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
5498+                    IndexError), e:
5499+                raise CorruptShareError(reader.peerid,
5500+                                        reader.shnum,
5501+                                        "block hash tree failure: %s" % e)
5502+
5503+        if self._version == MDMF_VERSION:
5504+            blockhash = hashutil.block_hash(salt + block)
5505+        else:
5506+            blockhash = hashutil.block_hash(block)
5507+        # If this works without an error, then validation is
5508+        # successful.
5509+        try:
5510+            bht.set_hashes(leaves={segnum: blockhash})
5511+        except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
5512+                IndexError), e:
5513+            raise CorruptShareError(reader.peerid,
5514+                                    reader.shnum,
5515+                                    "block hash tree failure: %s" % e)
5516+
5517+        # Reaching this point means that we know that this segment
5518+        # is correct. Now we need to check to see whether the share
5519+        # hash chain is also correct.
5520+        # SDMF wrote share hash chains that didn't contain the
5521+        # leaves, which would be produced from the block hash tree.
5522+        # So we need to validate the block hash tree first. If
5523+        # successful, then bht[0] will contain the root for the
5524+        # shnum, which will be a leaf in the share hash tree, which
5525+        # will allow us to validate the rest of the tree.
5526+        if self.share_hash_tree.needed_hashes(reader.shnum,
5527+                                               include_leaf=True):
5528+            try:
5529+                self.share_hash_tree.set_hashes(hashes=sharehashes[1],
5530+                                            leaves={reader.shnum: bht[0]})
5531+            except (hashtree.BadHashError, hashtree.NotEnoughHashesError, \
5532+                    IndexError), e:
5533+                raise CorruptShareError(reader.peerid,
5534+                                        reader.shnum,
5535+                                        "corrupt hashes: %s" % e)
5536+
5537+        # TODO: Validate the salt, too.
5538+        self.log('share %d is valid for segment %d' % (reader.shnum,
5539+                                                       segnum))
5540+        return {reader.shnum: (block, salt)}
5541+
5542+
5543+    def _get_needed_hashes(self, reader, segnum):
5544+        """
5545+        I get the hashes needed to validate segnum from the reader, then return
5546+        to my caller when this is done.
5547+        """
5548+        bht = self._block_hash_trees[reader.shnum]
5549+        needed = bht.needed_hashes(segnum, include_leaf=True)
5550+        # The root of the block hash tree is also a leaf in the share
5551+        # hash tree. So we don't need to fetch it from the remote
5552+        # server. In the case of files with one segment, this means that
5553+        # we won't fetch any block hash tree from the remote server,
5554+        # since the hash of each share of the file is the entire block
5555+        # hash tree, and is a leaf in the share hash tree. This is fine,
5556+        # since any share corruption will be detected in the share hash
5557+        # tree.
5558+        #needed.discard(0)
5559+        self.log("getting blockhashes for segment %d, share %d: %s" % \
5560+                 (segnum, reader.shnum, str(needed)))
5561+        d1 = reader.get_blockhashes(needed, queue=True, force_remote=True)
5562+        if self.share_hash_tree.needed_hashes(reader.shnum):
5563+            need = self.share_hash_tree.needed_hashes(reader.shnum)
5564+            self.log("also need sharehashes for share %d: %s" % (reader.shnum,
5565+                                                                 str(need)))
5566+            d2 = reader.get_sharehashes(need, queue=True, force_remote=True)
5567+        else:
5568+            d2 = defer.succeed({}) # the logic in the next method
5569+                                   # expects a dict
5570+        dl = defer.DeferredList([d1, d2], consumeErrors=True)
5571+        return dl
5572+
5573+
5574+    def _decode_blocks(self, blocks_and_salts, segnum):
5575+        """
5576+        I take a list of k blocks and salts, and decode that into a
5577+        single encrypted segment.
5578+        """
5579+        d = {}
5580+        # We want to merge our dictionaries into the form
5581+        # {shnum: (block, salt)}
5582+        #
5583+        # The dictionaries come from _validate_block in that form, so we
5584+        # just need to merge them.
5585+        for block_and_salt in blocks_and_salts:
5586+            d.update(block_and_salt[1])
5587+
5588+        # All of these blocks should have the same salt; in SDMF, it is
5589+        # the file-wide IV, while in MDMF it is the per-segment salt. In
5590+        # either case, we just need to get one of them and use it.
5591+        #
5592+        # d.items()[0] is like (shnum, (block, salt))
5593+        # d.items()[0][1] is like (block, salt)
5594+        # d.items()[0][1][1] is the salt.
5595+        salt = d.items()[0][1][1]
5596+        # Next, extract just the blocks from the dict. We'll use the
5597+        # salt in the next step.
5598+        share_and_shareids = [(k, v[0]) for k, v in d.items()]
5599+        d2 = dict(share_and_shareids)
5600+        shareids = []
5601+        shares = []
5602+        for shareid, share in d2.items():
5603             shareids.append(shareid)
5604             shares.append(share)
5605 
5606hunk ./src/allmydata/mutable/retrieve.py 746
5607-        assert len(shareids) >= k, len(shareids)
5608+        assert len(shareids) >= self._required_shares, len(shareids)
5609         # zfec really doesn't want extra shares
5610hunk ./src/allmydata/mutable/retrieve.py 748
5611-        shareids = shareids[:k]
5612-        shares = shares[:k]
5613-
5614-        fec = codec.CRSDecoder()
5615-        fec.set_params(segsize, k, N)
5616-
5617-        self.log("params %s, we have %d shares" % ((segsize, k, N), len(shares)))
5618-        self.log("about to decode, shareids=%s" % (shareids,))
5619-        d = defer.maybeDeferred(fec.decode, shares, shareids)
5620-        def _done(buffers):
5621-            self._status.timings["decode"] = time.time() - started
5622-            self.log(" decode done, %d buffers" % len(buffers))
5623+        shareids = shareids[:self._required_shares]
5624+        shares = shares[:self._required_shares]
5625+        self.log("decoding segment %d" % segnum)
5626+        if segnum == self._num_segments - 1:
5627+            d = defer.maybeDeferred(self._tail_decoder.decode, shares, shareids)
5628+        else:
5629+            d = defer.maybeDeferred(self._segment_decoder.decode, shares, shareids)
5630+        def _process(buffers):
5631             segment = "".join(buffers)
5632hunk ./src/allmydata/mutable/retrieve.py 757
5633+            self.log(format="now decoding segment %(segnum)s of %(numsegs)s",
5634+                     segnum=segnum,
5635+                     numsegs=self._num_segments,
5636+                     level=log.NOISY)
5637             self.log(" joined length %d, datalength %d" %
5638hunk ./src/allmydata/mutable/retrieve.py 762
5639-                     (len(segment), datalength))
5640-            segment = segment[:datalength]
5641+                     (len(segment), self._data_length))
5642+            if segnum == self._num_segments - 1:
5643+                size_to_use = self._tail_data_size
5644+            else:
5645+                size_to_use = self._segment_size
5646+            segment = segment[:size_to_use]
5647             self.log(" segment len=%d" % len(segment))
5648hunk ./src/allmydata/mutable/retrieve.py 769
5649-            return segment
5650-        def _err(f):
5651-            self.log(" decode failed: %s" % f)
5652-            return f
5653-        d.addCallback(_done)
5654-        d.addErrback(_err)
5655+            return segment, salt
5656+        d.addCallback(_process)
5657         return d
5658 
5659hunk ./src/allmydata/mutable/retrieve.py 773
5660-    def _decrypt(self, crypttext, IV, readkey):
5661+
5662+    def _decrypt_segment(self, segment_and_salt):
5663+        """
5664+        I take a single segment and its salt, and decrypt it. I return
5665+        the plaintext of the segment that is in my argument.
5666+        """
5667+        segment, salt = segment_and_salt
5668         self._status.set_status("decrypting")
5669hunk ./src/allmydata/mutable/retrieve.py 781
5670+        self.log("decrypting segment %d" % self._current_segment)
5671         started = time.time()
5672hunk ./src/allmydata/mutable/retrieve.py 783
5673-        key = hashutil.ssk_readkey_data_hash(IV, readkey)
5674+        key = hashutil.ssk_readkey_data_hash(salt, self._node.get_readkey())
5675         decryptor = AES(key)
5676hunk ./src/allmydata/mutable/retrieve.py 785
5677-        plaintext = decryptor.process(crypttext)
5678+        plaintext = decryptor.process(segment)
5679         self._status.timings["decrypt"] = time.time() - started
5680         return plaintext
5681 
5682hunk ./src/allmydata/mutable/retrieve.py 789
5683-    def _done(self, res):
5684-        if not self._running:
5685+
5686+    def notify_server_corruption(self, peerid, shnum, reason):
5687+        ss = self.servermap.connections[peerid]
5688+        ss.callRemoteOnly("advise_corrupt_share",
5689+                          "mutable", self._storage_index, shnum, reason)
5690+
5691+
5692+    def _try_to_validate_privkey(self, enc_privkey, reader):
5693+
5694+        alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
5695+        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
5696+        if alleged_writekey != self._node.get_writekey():
5697+            self.log("invalid privkey from %s shnum %d" %
5698+                     (reader, reader.shnum),
5699+                     level=log.WEIRD, umid="YIw4tA")
5700             return
5701hunk ./src/allmydata/mutable/retrieve.py 805
5702-        self._running = False
5703-        self._status.set_active(False)
5704-        self._status.timings["total"] = time.time() - self._started
5705-        # res is either the new contents, or a Failure
5706-        if isinstance(res, failure.Failure):
5707-            self.log("Retrieve done, with failure", failure=res,
5708-                     level=log.UNUSUAL)
5709-            self._status.set_status("Failed")
5710-        else:
5711-            self.log("Retrieve done, success!")
5712-            self._status.set_status("Finished")
5713-            self._status.set_progress(1.0)
5714-            # remember the encoding parameters, use them again next time
5715-            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
5716-             offsets_tuple) = self.verinfo
5717-            self._node._populate_required_shares(k)
5718-            self._node._populate_total_shares(N)
5719-        eventually(self._done_deferred.callback, res)
5720 
5721hunk ./src/allmydata/mutable/retrieve.py 806
5722+        # it's good
5723+        self.log("got valid privkey from shnum %d on reader %s" %
5724+                 (reader.shnum, reader))
5725+        privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
5726+        self._node._populate_encprivkey(enc_privkey)
5727+        self._node._populate_privkey(privkey)
5728+        self._need_privkey = False
5729+
5730+
5731+    def _check_for_done(self, res):
5732+        """
5733+        I check to see if this Retrieve object has successfully finished
5734+        its work.
5735+
5736+        I can exit in the following ways:
5737+            - If there are no more segments to download, then I exit by
5738+              causing self._done_deferred to fire with the plaintext
5739+              content requested by the caller.
5740+            - If there are still segments to be downloaded, and there
5741+              are enough active readers (readers which have not broken
5742+              and have not given us corrupt data) to continue
5743+              downloading, I send control back to
5744+              _download_current_segment.
5745+            - If there are still segments to be downloaded but there are
5746+              not enough active peers to download them, I ask
5747+              _add_active_peers to add more peers. If it is successful,
5748+              it will call _download_current_segment. If there are not
5749+              enough peers to retrieve the file, then that will cause
5750+              _done_deferred to errback.
5751+        """
5752+        self.log("checking for doneness")
5753+        if self._current_segment == self._num_segments:
5754+            # No more segments to download, we're done.
5755+            self.log("got plaintext, done")
5756+            return self._done()
5757+
5758+        if len(self._active_readers) >= self._required_shares:
5759+            # More segments to download, but we have enough good peers
5760+            # in self._active_readers that we can do that without issue,
5761+            # so go nab the next segment.
5762+            self.log("not done yet: on segment %d of %d" % \
5763+                     (self._current_segment + 1, self._num_segments))
5764+            return self._download_current_segment()
5765+
5766+        self.log("not done yet: on segment %d of %d, need to add peers" % \
5767+                 (self._current_segment + 1, self._num_segments))
5768+        return self._add_active_peers()
5769+
5770+
5771+    def _done(self):
5772+        """
5773+        I am called by _check_for_done when the download process has
5774+        finished successfully. After making some useful logging
5775+        statements, I return the decrypted contents to the owner of this
5776+        Retrieve object through self._done_deferred.
5777+        """
5778+        eventually(self._done_deferred.callback, self._plaintext)
5779+
5780+
5781+    def _failed(self):
5782+        """
5783+        I am called by _add_active_peers when there are not enough
5784+        active peers left to complete the download. After making some
5785+        useful logging statements, I return an exception to that effect
5786+        to the caller of this Retrieve object through
5787+        self._done_deferred.
5788+        """
5789+        format = ("ran out of peers: "
5790+                  "have %(have)d of %(total)d segments "
5791+                  "found %(bad)d bad shares "
5792+                  "encoding %(k)d-of-%(n)d")
5793+        args = {"have": self._current_segment,
5794+                "total": self._num_segments,
5795+                "k": self._required_shares,
5796+                "n": self._total_shares,
5797+                "bad": len(self._bad_shares)}
5798+        e = NotEnoughSharesError("%s, last failure: %s" % (format % args,
5799+                                                        str(self._last_failure)))
5800+        f = failure.Failure(e)
5801+        eventually(self._done_deferred.callback, f)
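With made-up numbers, the NotEnoughSharesError message constructed above reads along the lines of "ran out of peers: have 1 of 3 segments found 2 bad shares encoding 3-of-10, last failure: None".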
5802hunk ./src/allmydata/test/test_mutable.py 12
5803 from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
5804      ssk_pubkey_fingerprint_hash
5805 from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \
5806-     NotEnoughSharesError
5807+     NotEnoughSharesError, SDMF_VERSION, MDMF_VERSION
5808 from allmydata.monitor import Monitor
5809 from allmydata.test.common import ShouldFailMixin
5810 from allmydata.test.no_network import GridTestMixin
5811hunk ./src/allmydata/test/test_mutable.py 28
5812 from allmydata.mutable.retrieve import Retrieve
5813 from allmydata.mutable.publish import Publish
5814 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
5815-from allmydata.mutable.layout import unpack_header, unpack_share
5816+from allmydata.mutable.layout import unpack_header, unpack_share, \
5817+                                     MDMFSlotReadProxy
5818 from allmydata.mutable.repairer import MustForceRepairError
5819 
5820 import allmydata.test.common_util as testutil
5821hunk ./src/allmydata/test/test_mutable.py 104
5822         d = fireEventually()
5823         d.addCallback(lambda res: _call())
5824         return d
5825+
5826     def callRemoteOnly(self, methname, *args, **kwargs):
5827         d = self.callRemote(methname, *args, **kwargs)
5828         d.addBoth(lambda ignore: None)
5829hunk ./src/allmydata/test/test_mutable.py 163
5830 def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
5831     # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
5832     # list of shnums to corrupt.
5833+    ds = []
5834     for peerid in s._peers:
5835         shares = s._peers[peerid]
5836         for shnum in shares:
5837hunk ./src/allmydata/test/test_mutable.py 190
5838                 else:
5839                     offset1 = offset
5840                     offset2 = 0
5841-                if offset1 == "pubkey":
5842+                if offset1 == "pubkey" and IV:
5843                     real_offset = 107
5844hunk ./src/allmydata/test/test_mutable.py 192
5845+                elif offset1 == "share_data" and not IV:
5846+                    real_offset = 104
5847                 elif offset1 in o:
5848                     real_offset = o[offset1]
5849                 else:
5850hunk ./src/allmydata/test/test_mutable.py 327
5851         d.addCallback(_created)
5852         return d
5853 
5854+
5855+    def test_upload_and_download_mdmf(self):
5856+        d = self.nodemaker.create_mutable_file(version=MDMF_VERSION)
5857+        def _created(n):
5858+            d = defer.succeed(None)
5859+            d.addCallback(lambda ignored:
5860+                n.get_servermap(MODE_READ))
5861+            def _then(servermap):
5862+                dumped = servermap.dump(StringIO())
5863+                self.failUnlessIn("3-of-10", dumped.getvalue())
5864+            d.addCallback(_then)
5865+            # Now overwrite the contents with some new contents. We want
5866+            # to make them big enough to force the file to be uploaded
5867+            # in more than one segment.
5868+            big_contents = "contents1" * 100000 # about 900 KiB
5869+            d.addCallback(lambda ignored:
5870+                n.overwrite(big_contents))
5871+            d.addCallback(lambda ignored:
5872+                n.download_best_version())
5873+            d.addCallback(lambda data:
5874+                self.failUnlessEqual(data, big_contents))
5875+            # Overwrite the contents again with some new contents. As
5876+            # before, they need to be big enough to force multiple
5877+            # segments, so that we make the downloader deal with
5878+            # multiple segments.
5879+            bigger_contents = "contents2" * 1000000 # about 9MiB
5880+            d.addCallback(lambda ignored:
5881+                n.overwrite(bigger_contents))
5882+            d.addCallback(lambda ignored:
5883+                n.download_best_version())
5884+            d.addCallback(lambda data:
5885+                self.failUnlessEqual(data, bigger_contents))
5886+            return d
5887+        d.addCallback(_created)
5888+        return d
5889+
5890+
5891     def test_create_with_initial_contents(self):
5892         d = self.nodemaker.create_mutable_file("contents 1")
5893         def _created(n):
5894hunk ./src/allmydata/test/test_mutable.py 1147
5895 
5896 
5897     def _test_corrupt_all(self, offset, substring,
5898-                          should_succeed=False, corrupt_early=True,
5899-                          failure_checker=None):
5900+                          should_succeed=False,
5901+                          corrupt_early=True,
5902+                          failure_checker=None,
5903+                          fetch_privkey=False):
5904         d = defer.succeed(None)
5905         if corrupt_early:
5906             d.addCallback(corrupt, self._storage, offset)
5907hunk ./src/allmydata/test/test_mutable.py 1167
5908                     self.failUnlessIn(substring, "".join(allproblems))
5909                 return servermap
5910             if should_succeed:
5911-                d1 = self._fn.download_version(servermap, ver)
5912+                d1 = self._fn.download_version(servermap, ver,
5913+                                               fetch_privkey)
5914                 d1.addCallback(lambda new_contents:
5915                                self.failUnlessEqual(new_contents, self.CONTENTS))
5916             else:
5917hunk ./src/allmydata/test/test_mutable.py 1175
5918                 d1 = self.shouldFail(NotEnoughSharesError,
5919                                      "_corrupt_all(offset=%s)" % (offset,),
5920                                      substring,
5921-                                     self._fn.download_version, servermap, ver)
5922+                                     self._fn.download_version, servermap,
5923+                                                                ver,
5924+                                                                fetch_privkey)
5925             if failure_checker:
5926                 d1.addCallback(failure_checker)
5927             d1.addCallback(lambda res: servermap)
5928hunk ./src/allmydata/test/test_mutable.py 1186
5929         return d
5930 
5931     def test_corrupt_all_verbyte(self):
5932-        # when the version byte is not 0, we hit an UnknownVersionError error
5933-        # in unpack_share().
5934+        # when the version byte is not 0 or 1, we hit an
5935+        # UnknownVersionError in unpack_share().
5936         d = self._test_corrupt_all(0, "UnknownVersionError")
5937         def _check_servermap(servermap):
5938             # and the dump should mention the problems
5939hunk ./src/allmydata/test/test_mutable.py 1193
5940             s = StringIO()
5941             dump = servermap.dump(s).getvalue()
5942-            self.failUnless("10 PROBLEMS" in dump, dump)
5943+            self.failUnless("30 PROBLEMS" in dump, dump)
5944         d.addCallback(_check_servermap)
5945         return d
5946 
5947hunk ./src/allmydata/test/test_mutable.py 1263
5948         return self._test_corrupt_all("enc_privkey", None, should_succeed=True)
5949 
5950 
5951+    def test_corrupt_all_encprivkey_late(self):
5952+        # this should work for the same reason as above, but we corrupt
5953+        # after the servermap update to exercise the error handling
5954+        # code.
5955+        # We need to remove the privkey from the node, or the retrieve
5956+        # process won't know to update it.
5957+        self._fn._privkey = None
5958+        return self._test_corrupt_all("enc_privkey",
5959+                                      None, # this shouldn't fail
5960+                                      should_succeed=True,
5961+                                      corrupt_early=False,
5962+                                      fetch_privkey=True)
5963+
5964+
5965     def test_corrupt_all_seqnum_late(self):
5966         # corrupting the seqnum between mapupdate and retrieve should result
5967         # in NotEnoughSharesError, since each share will look invalid
5968hunk ./src/allmydata/test/test_mutable.py 1283
5969         def _check(res):
5970             f = res[0]
5971             self.failUnless(f.check(NotEnoughSharesError))
5972-            self.failUnless("someone wrote to the data since we read the servermap" in str(f))
5973+            self.failUnless("uncoordinated write" in str(f))
5974         return self._test_corrupt_all(1, "ran out of peers",
5975                                       corrupt_early=False,
5976                                       failure_checker=_check)
5977hunk ./src/allmydata/test/test_mutable.py 1333
5978                       self.failUnlessEqual(new_contents, self.CONTENTS))
5979         return d
5980 
5981-    def test_corrupt_some(self):
5982-        # corrupt the data of first five shares (so the servermap thinks
5983-        # they're good but retrieve marks them as bad), so that the
5984-        # MODE_READ set of 6 will be insufficient, forcing node.download to
5985-        # retry with more servers.
5986-        corrupt(None, self._storage, "share_data", range(5))
5987-        d = self.make_servermap()
5988+
5989+    def _test_corrupt_some(self, offset, mdmf=False):
5990+        if mdmf:
5991+            d = self.publish_mdmf()
5992+        else:
5993+            d = defer.succeed(None)
5994+        d.addCallback(lambda ignored:
5995+            corrupt(None, self._storage, offset, range(5)))
5996+        d.addCallback(lambda ignored:
5997+            self.make_servermap())
5998         def _do_retrieve(servermap):
5999             ver = servermap.best_recoverable_version()
6000             self.failUnless(ver)
6001hunk ./src/allmydata/test/test_mutable.py 1349
6002             return self._fn.download_best_version()
6003         d.addCallback(_do_retrieve)
6004         d.addCallback(lambda new_contents:
6005-                      self.failUnlessEqual(new_contents, self.CONTENTS))
6006+            self.failUnlessEqual(new_contents, self.CONTENTS))
6007         return d
6008 
6009hunk ./src/allmydata/test/test_mutable.py 1352
6010+
6011+    def test_corrupt_some(self):
6012+        # corrupt the data of the first five shares (so the servermap thinks
6013+        # they're good but retrieve marks them as bad), so that the
6014+        # MODE_READ set of 6 will be insufficient, forcing node.download to
6015+        # retry with more servers.
6016+        return self._test_corrupt_some("share_data")
6017+
6018+
6019     def test_download_fails(self):
6020         d = corrupt(None, self._storage, "signature")
6021         d.addCallback(lambda ignored:
6022hunk ./src/allmydata/test/test_mutable.py 1366
6023             self.shouldFail(UnrecoverableFileError, "test_download_anyway",
6024                             "no recoverable versions",
6025-                            self._fn.download_best_version)
6026+                            self._fn.download_best_version))
6027         return d
6028 
6029 
6030hunk ./src/allmydata/test/test_mutable.py 1370
6031+
6032+    def test_corrupt_mdmf_block_hash_tree(self):
6033+        d = self.publish_mdmf()
6034+        d.addCallback(lambda ignored:
6035+            self._test_corrupt_all(("block_hash_tree", 12 * 32),
6036+                                   "block hash tree failure",
6037+                                   corrupt_early=True,
6038+                                   should_succeed=False))
6039+        return d
6040+
6041+
6042+    def test_corrupt_mdmf_block_hash_tree_late(self):
6043+        d = self.publish_mdmf()
6044+        d.addCallback(lambda ignored:
6045+            self._test_corrupt_all(("block_hash_tree", 12 * 32),
6046+                                   "block hash tree failure",
6047+                                   corrupt_early=False,
6048+                                   should_succeed=False))
6049+        return d
6050+
6051+
6052+    def test_corrupt_mdmf_share_data(self):
6053+        d = self.publish_mdmf()
6054+        d.addCallback(lambda ignored:
6055+            # TODO: Find out what the block size is and corrupt a
6056+            # specific block, rather than just guessing.
6057+            self._test_corrupt_all(("share_data", 12 * 40),
6058+                                    "block hash tree failure",
6059+                                    corrupt_early=True,
6060+                                    should_succeed=False))
6061+        return d
6062+
6063+
6064+    def test_corrupt_some_mdmf(self):
6065+        return self._test_corrupt_some(("share_data", 12 * 40),
6066+                                       mdmf=True)
6067+
6068+
6069 class CheckerMixin:
6070     def check_good(self, r, where):
6071         self.failUnless(r.is_healthy(), where)
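
The MDMF corruption tests above point corrupt() at targets given as a (field, byte offset) pair, e.g. ("block_hash_tree", 12 * 32) or ("share_data", 12 * 40), relying on the corrupt() helper that an earlier patch in this bundle redefined to understand both SDMF and MDMF shares. As a rough illustration only (not the bundle's corrupt(); the names here are hypothetical), resolving such a pair to a single flipped byte could look like:

def flip_byte_in_field(share_data, field_start, sub_offset):
    # share_data:  the raw share string held by the fake storage server
    # field_start: where the named field (e.g. block_hash_tree) begins,
    #              as recorded in the share's offset table
    # sub_offset:  the second element of the ("field", offset) pair used
    #              by the tests, e.g. 12 * 32 to land in the 13th 32-byte hash
    i = field_start + sub_offset
    flipped = chr(ord(share_data[i]) ^ 0x01)   # flip the low bit of one byte
    return share_data[:i] + flipped + share_data[i+1:]
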
6072hunk ./src/allmydata/test/test_mutable.py 2116
6073             d.addCallback(lambda res:
6074                           self.shouldFail(NotEnoughSharesError,
6075                                           "test_retrieve_surprise",
6076-                                          "ran out of peers: have 0 shares (k=3)",
6077+                                          "ran out of peers: have 0 of 1",
6078                                           n.download_version,
6079                                           self.old_map,
6080                                           self.old_map.best_recoverable_version(),
6081hunk ./src/allmydata/test/test_mutable.py 2125
6082         d.addCallback(_created)
6083         return d
6084 
6085+
6086     def test_unexpected_shares(self):
6087         # upload the file, take a servermap, shut down one of the servers,
6088         # upload it again (causing shares to appear on a new server), then
6089hunk ./src/allmydata/test/test_mutable.py 2329
6090         self.basedir = "mutable/Problems/test_privkey_query_missing"
6091         self.set_up_grid(num_servers=20)
6092         nm = self.g.clients[0].nodemaker
6093-        LARGE = "These are Larger contents" * 2000 # about 50KB
6094+        LARGE = "These are Larger contents" * 2000 # about 50KiB
6095         nm._node_cache = DevNullDictionary() # disable the nodecache
6096 
6097         d = nm.create_mutable_file(LARGE)
6098hunk ./src/allmydata/test/test_mutable.py 2342
6099         d.addCallback(_created)
6100         d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
6101         return d
6102+
6103+
6104+    def test_block_and_hash_query_error(self):
6105+        # This tests what happens when a query to a remote server
6106+        # fails in either the hash validation step or the block-fetching
6107+        # step (because of batching, both happen in the same remote query).
6108+        # We need the storage server to keep responding until its
6109+        # prefix has been validated, then suddenly die. This exercises
6110+        # some of the exception handling code in Retrieve.
6111+        self.basedir = "mutable/Problems/test_block_and_hash_query_error"
6112+        self.set_up_grid(num_servers=20)
6113+        nm = self.g.clients[0].nodemaker
6114+        CONTENTS = "contents" * 2000
6115+        d = nm.create_mutable_file(CONTENTS)
6116+        def _created(node):
6117+            self._node = node
6118+        d.addCallback(_created)
6119+        d.addCallback(lambda ignored:
6120+            self._node.get_servermap(MODE_READ))
6121+        def _then(servermap):
6122+            # we have our servermap. Now we set up the servers like the
6123+            # tests above -- the first one that gets a read call should
6124+            # start throwing errors, but only after returning its prefix
6125+            # for validation. Since we'll download without fetching the
6126+            # private key, the next query to the remote server will be
6127+            # for either a block and salt or for hashes, either of which
6128+            # will exercise the error handling code.
6129+            killer = FirstServerGetsKilled()
6130+            for (serverid, ss) in nm.storage_broker.get_all_servers():
6131+                ss.post_call_notifier = killer.notify
6132+            ver = servermap.best_recoverable_version()
6133+            assert ver
6134+            return self._node.download_version(servermap, ver)
6135+        d.addCallback(_then)
6136+        d.addCallback(lambda data:
6137+            self.failUnlessEqual(data, CONTENTS))
6138+        return d
6139}
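
The test_block_and_hash_query_error case at the end of this patch injects its failure through FirstServerGetsKilled, a helper defined elsewhere in test_mutable.py: a post_call_notifier hooked onto each fake storage server so that the first server to answer a read is allowed to return its prefix (which gets validated) and is then marked broken, making its next query for blocks/salts or hashes fail inside Retrieve. A simplified sketch of that pattern, not necessarily the file's exact code:

class FirstServerGetsKilledSketch(object):
    done = False

    def notify(self, retval, wrapper, methname):
        # Called after every remote method call on the wrapped fake storage
        # servers (wired up via ss.post_call_notifier = killer.notify in the
        # test). Let the first call complete normally, then mark that server
        # broken so its next query fails and Retrieve's error handling runs.
        if not self.done:
            wrapper.broken = True
            self.done = True
        return retval
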
6140
6141Context:
6142
6143[docs: about.html link to home page early on, and be decentralized storage instead of cloud storage this time around
6144zooko@zooko.com**20100619065318
6145 Ignore-this: dc6db03f696e5b6d2848699e754d8053
6146] 
6147[docs: update about.html, especially to have a non-broken link to quickstart.html, and also to comment out the broken links to "for Paranoids" and "for Corporates"
6148zooko@zooko.com**20100619065124
6149 Ignore-this: e292c7f51c337a84ebfeb366fbd24d6c
6150] 
6151[TAG allmydata-tahoe-1.7.0
6152zooko@zooko.com**20100619052631
6153 Ignore-this: d21e27afe6d85e2e3ba6a3292ba2be1
6154] 
6155Patch bundle hash:
61567595cd3e7a002ae1c9c0f6ba7e21f39a7f4477e1