source: trunk/src/allmydata/test/test_storage.py

Last change on this file was 4da491a, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-03-11T20:37:27Z

remove more usage of "future"

1"""
2Tests for allmydata.storage.
3
4Ported to Python 3.
5"""
6
7from __future__ import annotations
8from future.utils import bchr
9from six import ensure_str
10
11from io import (
12    BytesIO,
13)
14import time
15import os.path
16import platform
17import stat
18import struct
19import shutil
20from functools import partial
21from uuid import uuid4
22
23from testtools.matchers import (
24    Equals,
25    NotEquals,
26    Contains,
27    HasLength,
28    IsInstance,
29)
30
31from twisted.trial import unittest
32
33from twisted.internet import defer
34from twisted.internet.task import Clock
35
36from hypothesis import given, strategies, example
37
38import itertools
39from allmydata import interfaces
40from allmydata.util import fileutil, hashutil, base32
41from allmydata.storage.server import (
42    StorageServer, DEFAULT_RENEWAL_TIME, FoolscapStorageServer,
43)
44from allmydata.storage.shares import get_share_file
45from allmydata.storage.mutable import MutableShareFile
46from allmydata.storage.mutable_schema import (
47    ALL_SCHEMAS as ALL_MUTABLE_SCHEMAS,
48)
49from allmydata.storage.immutable import (
50    BucketWriter, BucketReader, ShareFile, FoolscapBucketWriter,
51    FoolscapBucketReader,
52)
53from allmydata.storage.immutable_schema import (
54    ALL_SCHEMAS as ALL_IMMUTABLE_SCHEMAS,
55)
56from allmydata.storage.common import storage_index_to_dir, \
57     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError, \
58     si_b2a, si_a2b
59from allmydata.storage.lease import LeaseInfo
60from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
61     ReadBucketProxy, _WriteBuffer
62from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
63                                     LayoutInvalid, MDMFSIGNABLEHEADER, \
64                                     SIGNED_PREFIX, MDMFHEADER, \
65                                     MDMFOFFSETS, SDMFSlotWriteProxy, \
66                                     PRIVATE_KEY_SIZE, \
67                                     SIGNATURE_SIZE, \
68                                     VERIFICATION_KEY_SIZE, \
69                                     SHARE_HASH_CHAIN_SIZE
70from allmydata.interfaces import (
71    BadWriteEnablerError, DataTooLargeError, ConflictingWriteError,
72)
73from allmydata.test.no_network import NoNetworkServer
74from allmydata.storage_client import (
75    _StorageServer,
76)
77from .common import (
78    LoggingServiceParent,
79    ShouldFailMixin,
80    FakeDisk,
81    SyncTestCase,
82    AsyncTestCase,
83)
84
85from .common_util import FakeCanary
86from .common_storage import (
87    upload_immutable,
88    upload_mutable,
89)
90from .strategies import (
91    offsets,
92    lengths,
93)
94
95
96class UtilTests(SyncTestCase):
97    """Tests for allmydata.storage.common and .shares."""
98
99    def test_encoding(self):
100        """b2a/a2b are the same as base32."""
101        s = b"\xFF HELLO \xF3"
102        result = si_b2a(s)
103        self.assertThat(base32.b2a(s), Equals(result))
104        self.assertThat(si_a2b(result), Equals(s))
105
106    def test_storage_index_to_dir(self):
107        """storage_index_to_dir creates a native string path."""
108        s = b"\xFF HELLO \xF3"
109        path = storage_index_to_dir(s)
110        parts = os.path.split(path)
111        self.assertThat(parts[0], Equals(parts[1][:2]))
112        self.assertThat(path, IsInstance(str))
113
114    def test_get_share_file_mutable(self):
115        """A mutable share is identified by get_share_file()."""
116        path = self.mktemp()
117        msf = MutableShareFile(path)
118        msf.create(b"12", b"abc")  # arbitrary values
119        loaded = get_share_file(path)
120        self.assertThat(loaded, IsInstance(MutableShareFile))
121        self.assertThat(loaded.home, Equals(path))
122
123    def test_get_share_file_immutable(self):
124        """An immutable share is identified by get_share_file()."""
125        path = self.mktemp()
126        _ = ShareFile(path, max_size=1000, create=True)
127        loaded = get_share_file(path)
128        self.assertThat(loaded, IsInstance(ShareFile))
129        self.assertThat(loaded.home, Equals(path))
130
131
132class FakeStatsProvider(object):
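    """A no-op stats provider, presenting just the interface StorageServer uses."""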
    def count(self, name, delta=1):
        pass
    def register_producer(self, producer):
        pass


class Bucket(SyncTestCase):
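    """
    Tests for ``BucketWriter`` and ``BucketReader``.

    The test case itself is handed to ``BucketWriter`` in place of a real
    storage server, which is why it carries the no-op ``bucket_writer_closed``,
    ``add_latency`` and ``count`` callbacks below.
    """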
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
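        # The final argument to LeaseInfo below is, presumably, the 20-byte
        # server node id; these tests just use a placeholder.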
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, b"\x00" * 20)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
        bw.write(0, b"a"*25)
        bw.write(25, b"b"*25)
        bw.write(50, b"c"*25)
        bw.write(75, b"d"*7)
        bw.close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
        bw.write(0, b"a"*25)
        bw.write(25, b"b"*25)
        bw.write(50, b"c"*7) # last block may be short
        bw.close()

        # now read from it
        br = BucketReader(self, bw.finalhome)
        self.assertThat(br.read(0, 25), Equals(b"a"*25))
        self.assertThat(br.read(25, 25), Equals(b"b"*25))
        self.assertThat(br.read(50, 7), Equals(b"c"*7))

    def test_write_past_size_errors(self):
        """Writing beyond the size of the bucket throws an exception."""
        for (i, (offset, length)) in enumerate([(0, 201), (10, 191), (202, 34)]):
            incoming, final = self.make_workdir(
                "test_write_past_size_errors-{}".format(i)
            )
            bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
            with self.assertRaises(DataTooLargeError):
                bw.write(offset, b"a" * length)

    @given(
        maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
        maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
    )
    def test_overlapping_writes_ok_if_matching(
            self, maybe_overlapping_offset, maybe_overlapping_length
    ):
        """
        Writes that overlap with previous writes are OK when the content is the
        same.
        """
        length = 100
        expected_data = b"".join(bchr(i) for i in range(100))
        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
        bw = BucketWriter(
            self, incoming, final, length, self.make_lease(), Clock()
        )
        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
        bw.write(10, expected_data[10:20])
        bw.write(30, expected_data[30:40])
        bw.write(50, expected_data[50:60])
        # Then, an overlapping write but with matching data:
        bw.write(
            maybe_overlapping_offset,
            expected_data[
                maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length
            ]
        )
        # Now fill in the holes:
        bw.write(0, expected_data[0:10])
        bw.write(20, expected_data[20:30])
        bw.write(40, expected_data[40:50])
        bw.write(60, expected_data[60:])
        bw.close()

        br = BucketReader(self, bw.finalhome)
        self.assertEqual(br.read(0, length), expected_data)

    @given(
        maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
        maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
    )
    def test_overlapping_writes_not_ok_if_different(
            self, maybe_overlapping_offset, maybe_overlapping_length
    ):
        """
        Writes that overlap with previous writes fail with an exception if the
        contents don't match.
        """
        length = 100
        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
        bw = BucketWriter(
            self, incoming, final, length, self.make_lease(), Clock()
        )
        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
        bw.write(10, b"1" * 10)
        bw.write(30, b"1" * 10)
        bw.write(50, b"1" * 10)
        # Then, write something that might overlap with some of them, but
        # conflicts. Then fill in holes left by first three writes. Conflict is
        # inevitable.
        with self.assertRaises(ConflictingWriteError):
            bw.write(
                maybe_overlapping_offset,
                b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset),
            )
            bw.write(0, b"1" * 10)
            bw.write(20, b"1" * 10)
            bw.write(40, b"1" * 10)
            bw.write(60, b"1" * 40)

    @given(
        offsets=strategies.lists(
            strategies.integers(min_value=0, max_value=99),
            min_size=20,
            max_size=20
        ),
    )
    @example(offsets=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 40, 70])
    def test_writes_return_when_finished(
            self, offsets
    ):
        """
        ``BucketWriter.write()`` returns true if and only if the maximum
        size has been reached via potentially overlapping writes.  The
        remaining ranges can be checked via ``BucketWriter.required_ranges()``.
        """
        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
        bw = BucketWriter(
            self, incoming, final, 100, self.make_lease(), Clock()
        )
        local_written = [0] * 100
        for offset in offsets:
            length = min(30, 100 - offset)
            data = b"1" * length
            for i in range(offset, offset+length):
                local_written[i] = 1
            finished = bw.write(offset, data)
            self.assertEqual(finished, sum(local_written) == 100)
            required_ranges = bw.required_ranges()
            for i in range(0, 100):
                self.assertEqual(local_written[i] == 1, required_ranges.get(i) is None)

    def test_read_past_end_of_share_data(self):
        # test vector for immutable files (hard-coded contents of an immutable share
        # file):

        # The following immutable share file content is identical to that
        # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
        # with share data == 'a'. The total size of this content is 85
        # bytes.

        containerdata = struct.pack('>LLL', 1, 1, 1)
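        # The 12-byte v1 header is three big-endian uint32s: container
        # version (1), share data size (here 1 byte), and lease count (1).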

        # A Tahoe-LAFS storage client would send as the share_data a
        # complicated string involving hash trees and a URI Extension Block
        # -- see allmydata/immutable/layout.py . This test, which is
        # simulating a client, just sends 'a'.
        share_data = b'a'

        ownernumber = struct.pack('>L', 0)
        renewsecret  = b'THIS LETS ME RENEW YOUR FILE....'
        assert len(renewsecret) == 32
        cancelsecret = b'THIS LETS ME KILL YOUR FILE HAHA'
        assert len(cancelsecret) == 32
        expirationtime = struct.pack('>L', DEFAULT_RENEWAL_TIME) # 31 days in seconds

        lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
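        # One 72-byte lease: 4 + 32 + 32 + 4.  With the 12-byte header and
        # 1 byte of share data, that accounts for the 85 bytes noted above.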

        share_file_data = containerdata + share_data + lease_data

        incoming, final = self.make_workdir("test_read_past_end_of_share_data")

        fileutil.write(final, share_file_data)

        class MockStorageServer(object):
            def add_latency(self, category, latency):
                pass
            def count(self, name, delta=1):
                pass

        mockstorageserver = MockStorageServer()

        # Now read from it.
        br = BucketReader(mockstorageserver, final)

        self.assertThat(br.read(0, len(share_data)), Equals(share_data))

        # Read past the end of share data to get the cancel secret.
        read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)

        result_of_read = br.read(0, read_length)
        self.assertThat(result_of_read, Equals(share_data))

        result_of_read = br.read(0, len(share_data)+1)
        self.assertThat(result_of_read, Equals(share_data))

    def _assert_timeout_only_after_30_minutes(self, clock, bw):
        """
        The ``BucketWriter`` times out and is closed after 30 minutes, but not
        sooner.
        """
        self.assertFalse(bw.closed)
        # 29 minutes pass. Everything is fine.
        for i in range(29):
            clock.advance(60)
            self.assertFalse(bw.closed, "Bucket closed after only %d minutes" % (i + 1,))
        # After the 30th minute, the bucket is closed due to lack of writes.
        clock.advance(60)
        self.assertTrue(bw.closed)

    def test_bucket_expires_if_no_writes_for_30_minutes(self):
        """
        If a ``BucketWriter`` receives no writes for 30 minutes, it is removed.
        """
        incoming, final = self.make_workdir("test_bucket_expires")
        clock = Clock()
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
        self._assert_timeout_only_after_30_minutes(clock, bw)

    def test_bucket_writes_delay_timeout(self):
        """
        So long as the ``BucketWriter`` receives writes, the removal
        timeout is put off.
        """
        incoming, final = self.make_workdir("test_bucket_writes_delay_timeout")
        clock = Clock()
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
        # 29 minutes pass, getting close to the timeout...
        clock.advance(29 * 60)
        # .. but we receive a write! So that should delay the timeout again to
        # another 30 minutes.
        bw.write(0, b"hello")
        self._assert_timeout_only_after_30_minutes(clock, bw)

    def test_bucket_closing_cancels_timeout(self):
        """
        Closing cancels the ``BucketWriter`` timeout.
        """
        incoming, final = self.make_workdir("test_bucket_close_timeout")
        clock = Clock()
        bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
        self.assertTrue(clock.getDelayedCalls())
        bw.close()
        self.assertFalse(clock.getDelayedCalls())

    def test_bucket_aborting_cancels_timeout(self):
        """
        Aborting cancels the ``BucketWriter`` timeout.
        """
        incoming, final = self.make_workdir("test_bucket_abort_timeout")
        clock = Clock()
        bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
        self.assertTrue(clock.getDelayedCalls())
        bw.abort()
        self.assertFalse(clock.getDelayedCalls())


class RemoteBucket(object):
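    """
    Wrap a local bucket object so it looks like a Foolscap remote reference:
    ``callRemote("name", ...)`` is forwarded to the target's ``remote_name``
    method via a Deferred, and read/write calls are counted along the way.
    """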

    def __init__(self, target):
        self.target = target
        self.read_count = 0
        self.write_count = 0

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)

        if methname == "slot_readv":
            self.read_count += 1
        if "writev" in methname:
            self.write_count += 1

        return defer.maybeDeferred(_call)


class BucketProxy(AsyncTestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(), Clock())
        rb = RemoteBucket(FoolscapBucketWriter(bw))
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return LeaseInfo(owner_num, renew_secret, cancel_secret,
                         expiration_time, b"\x00" * 20)

    def bucket_writer_closed(self, bw, consumed):
        pass
    def add_latency(self, category, latency):
        pass
    def count(self, name, delta=1):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb, None,
                              data_size=300,
                              block_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500)
        self.assertTrue(interfaces.IStorageBucketWriter.providedBy(bp), bp)

    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the two
        # per-segment merkle trees (crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
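        # (The v2 layout presumably widens the header's offset fields from 4
        # to 8 bytes to support shares over 4 GiB, hence 0x44 instead of 0x24.)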

        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500

        crypttext_hashes = [hashutil.tagged_hash(b"crypt", b"bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash(b"block", b"bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash(b"share", b"bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = b"s" + b"E"*498 + b"e"

        bw, rb, sharefname = self.make_bucket(name, sharesize)
        bp = wbp_class(rb, None,
                       data_size=95,
                       block_size=25,
                       num_segments=4,
                       num_share_hashes=3,
                       uri_extension_size=len(uri_extension))

        d = bp.put_header()
        d.addCallback(lambda res: bp.put_block(0, b"a"*25))
        d.addCallback(lambda res: bp.put_block(1, b"b"*25))
        d.addCallback(lambda res: bp.put_block(2, b"c"*25))
        d.addCallback(lambda res: bp.put_block(3, b"d"*20))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(self, sharefname)
            rb = RemoteBucket(FoolscapBucketReader(br))
            server = NoNetworkServer(b"abc", None)
            rbp = rbp_class(rb, server, storage_index=b"")
            self.assertThat(repr(rbp), Contains("to peer"))
            self.assertTrue(interfaces.IStorageBucketReader.providedBy(rbp), rbp)

            d1 = rbp.get_block_data(0, 25, 25)
            d1.addCallback(lambda res: self.failUnlessEqual(res, b"a"*25))
            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, b"b"*25))
            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
            d1.addCallback(lambda res: self.failUnlessEqual(res, b"c"*25))
            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
            d1.addCallback(lambda res: self.failUnlessEqual(res, b"d"*20))

            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

    def test_readwrite_v1(self):
        return self._do_test_readwrite("test_readwrite_v1",
                                       0x24, WriteBucketProxy, ReadBucketProxy)

    def test_readwrite_v2(self):
        return self._do_test_readwrite("test_readwrite_v2",
                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)

class Server(AsyncTestCase):

    def setUp(self):
        super(Server, self).setUp()
        self.sparent = LoggingServiceParent()
        self.sparent.startService()
        self._lease_secret = itertools.count()
        self.addCleanup(self.sparent.stopService)

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, reserved_space=0, klass=StorageServer, clock=None):
        if clock is None:
            clock = Clock()
        workdir = self.workdir(name)
        ss = klass(workdir, b"\x00" * 20, reserved_space=reserved_space,
                   stats_provider=FakeStatsProvider(),
                   clock=clock)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def test_declares_fixed_1528(self):
        ss = self.create("test_declares_fixed_1528")
        ver = ss.get_version()
        sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
        self.assertTrue(sv1.get(b'prevents-read-past-end-of-share-data'), sv1)

    def test_declares_maximum_share_sizes(self):
        ss = self.create("test_declares_maximum_share_sizes")
        ver = ss.get_version()
        sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
        self.assertThat(sv1, Contains(b'maximum-immutable-share-size'))
        self.assertThat(sv1, Contains(b'maximum-mutable-share-size'))

    def test_declares_available_space(self):
        ss = self.create("test_declares_available_space")
        ver = ss.get_version()
        sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
        self.assertThat(sv1, Contains(b'available-space'))

    def allocate(self, ss, storage_index, sharenums, size, renew_leases=True):
        """
        Call directly into the storage server's allocate_buckets implementation,
        skipping the Foolscap layer.
        """
        renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
        cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
        if isinstance(ss, FoolscapStorageServer):
            ss = ss._server
        return ss.allocate_buckets(
            storage_index,
            renew_secret, cancel_secret,
            sharenums, size,
            renew_leases=renew_leases,
        )

    def test_large_share(self):
        syslow = platform.system().lower()
        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
            raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")

        avail = fileutil.get_available_space('.', 512*2**20)
        if avail <= 4*2**30:
            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")

        ss = self.create("test_large_share")

        already,writers = self.allocate(ss, b"allocate", [0], 2**32+2)
        self.assertThat(set(), Equals(already))
        self.assertThat(set([0]), Equals(set(writers.keys())))

        shnum, bucket = list(writers.items())[0]
        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
        bucket.write(2**32, b"ab")
        bucket.close()

        readers = ss.get_buckets(b"allocate")
        reader = readers[shnum]
        self.assertThat(b"ab", Equals(reader.read(2**32, 2)))

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the exact
        same storage index), this won't add an entry to the share directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, b"storageindex", [0], 10)
        for i, wb in writers.items():
            wb.write(0, b"%10d" % i)
            wb.close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, b"storageindey", [0], 10)
        for i, wb in writers.items():
            wb.write(0, b"%10d" % i)
            wb.close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.assertThat(new_children_of_storedir, Equals(children_of_storedir))

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, b"vid", list(range(3)), 10)
        for i,wb in writers.items():
            wb.write(0, b"%10d" % i)
            wb.close()
        incoming_share_dir = wb.incominghome
        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
        incoming_dir = os.path.dirname(incoming_prefix_dir)
        self.assertFalse(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
        self.assertFalse(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
        self.assertTrue(os.path.exists(incoming_dir), incoming_dir)

    def test_abort(self):
        # remote_abort, when called on a writer, should make sure that
        # the allocated size of the bucket is not counted by the storage
        # server when accounting for space.
        ss = self.create("test_abort")
        already, writers = self.allocate(ss, b"allocate", [0, 1, 2], 150)
        self.assertThat(ss.allocated_size(), NotEquals(0))

        # Now abort the writers.
        for writer in writers.values():
            writer.abort()
        self.assertThat(ss.allocated_size(), Equals(0))

    def test_immutable_length(self):
        """
        ``get_immutable_share_length()`` returns the length of an immutable
        share, as does ``BucketWriter.get_length()``.
        """
        ss = self.create("test_immutable_length")
        _, writers = self.allocate(ss, b"allocate", [22], 75)
        bucket = writers[22]
        bucket.write(0, b"X" * 75)
        bucket.close()
        self.assertThat(ss.get_immutable_share_length(b"allocate", 22), Equals(75))
        self.assertThat(ss.get_buckets(b"allocate")[22].get_length(), Equals(75))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.assertThat(ss.get_buckets(b"allocate"), Equals({}))

        already,writers = self.allocate(ss, b"allocate", [0,1,2], 75)
        self.assertThat(already, Equals(set()))
        self.assertThat(set(writers.keys()), Equals(set([0,1,2])))

        # while the buckets are open, they should not count as readable
        self.assertThat(ss.get_buckets(b"allocate"), Equals({}))

        # close the buckets
        for i,wb in writers.items():
            wb.write(0, b"%25d" % i)
            wb.close()
            # aborting a bucket that was already closed is a no-op
            wb.abort()

        # now they should be readable
        b = ss.get_buckets(b"allocate")
        self.assertThat(set(b.keys()), Equals(set([0,1,2])))
        self.assertThat(b[0].read(0, 25), Equals(b"%25d" % 0))
        b_str = str(b[0])
        self.assertThat(b_str, Contains("BucketReader"))
        self.assertThat(b_str, Contains("mfwgy33dmf2g 0"))

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, b"allocate", [2,3,4], 75)
        self.assertThat(already, Equals(set([0,1,2])))
        self.assertThat(set(writers.keys()), Equals(set([3,4])))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders

        already2,writers2 = self.allocate(ss, b"allocate", [2,3,4,5], 75)
        self.assertThat(already2, Equals(set([0,1,2])))
        self.assertThat(set(writers2.keys()), Equals(set([5])))

        # aborting the writes should remove the tempfiles
        for i,wb in writers2.items():
            wb.abort()
        already2,writers2 = self.allocate(ss, b"allocate", [2,3,4,5], 75)
        self.assertThat(already2, Equals(set([0,1,2])))
        self.assertThat(set(writers2.keys()), Equals(set([5])))

        for i,wb in writers2.items():
            wb.abort()
        for i,wb in writers.items():
            wb.abort()

    def test_allocate_without_lease_renewal(self):
        """
        ``StorageServer._allocate_buckets`` does not renew leases on existing
        shares if ``renew_leases`` is ``False``.
        """
        first_lease = 456
        second_lease = 543
        storage_index = b"allocate"

        clock = Clock()
        clock.advance(first_lease)
        ss = self.create(
            "test_allocate_without_lease_renewal",
            clock=clock,
        )

        # Put a share on there
        already, writers = self.allocate(
            ss, storage_index, [0], 1, renew_leases=False,
        )
        (writer,) = writers.values()
        writer.write(0, b"x")
        writer.close()

        # It should have a lease granted at the current time.
        shares = dict(ss.get_shares(storage_index))
        self.assertEqual(
            [first_lease],
            list(
                lease.get_grant_renew_time_time()
                for lease
                in ShareFile(shares[0]).get_leases()
            ),
        )

        # Let some time pass so we can tell if the lease on share 0 is
        # renewed.
        clock.advance(second_lease)

        # Put another share on there.
        already, writers = self.allocate(
            ss, storage_index, [1], 1, renew_leases=False,
        )
        (writer,) = writers.values()
        writer.write(0, b"x")
        writer.close()

        # The first share's lease expiration time is unchanged.
        shares = dict(ss.get_shares(storage_index))
        self.assertThat(
            [first_lease],
            Equals(list(
                lease.get_grant_renew_time_time()
                for lease
                in ShareFile(shares[0]).get_leases()
            )),
        )

    def test_bad_container_version(self):
        ss = self.create("test_bad_container_version")
        a,w = self.allocate(ss, b"si1", [0], 10)
        w[0].write(0, b"\xff"*10)
        w[0].close()

        fn = os.path.join(ss.sharedir, storage_index_to_dir(b"si1"), "0")
        f = open(fn, "rb+")
        f.seek(0)
        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
        f.close()

        ss.get_buckets(b"allocate")

        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
                                  ss.get_buckets, b"si1")
        self.assertThat(e.filename, Equals(fn))
        self.assertThat(e.version, Equals(0))
        self.assertThat(str(e), Contains("had unexpected version 0"))

    def test_disconnect(self):
        # simulate a disconnection
        ss = FoolscapStorageServer(self.create("test_disconnect"))
        renew_secret = b"r" * 32
        cancel_secret = b"c" * 32
        canary = FakeCanary()
        already,writers = ss.remote_allocate_buckets(
            b"disconnect",
            renew_secret,
            cancel_secret,
            sharenums=[0,1,2],
            allocated_size=75,
            canary=canary,
        )
        self.assertThat(already, Equals(set()))
        self.assertThat(set(writers.keys()), Equals(set([0,1,2])))
        for (f,args,kwargs) in list(canary.disconnectors.values()):
            f(*args, **kwargs)
        del already
        del writers

        # that ought to delete the incoming shares
        already,writers = self.allocate(ss, b"disconnect", [0,1,2], 75)
        self.assertThat(already, Equals(set()))
        self.assertThat(set(writers.keys()), Equals(set([0,1,2])))

    def test_reserved_space_immutable_lease(self):
        """
        If there is not enough available space to store an additional lease on an
        immutable share then ``remote_add_lease`` fails with ``NoSpace`` when
        an attempt is made to use it to create a new lease.
        """
        disk = FakeDisk(total=1024, used=0)
        self.patch(fileutil, "get_disk_stats", disk.get_disk_stats)

        ss = self.create("test_reserved_space_immutable_lease")

        storage_index = b"x" * 16
        renew_secret = b"r" * 32
        cancel_secret = b"c" * 32
        shares = {0: b"y" * 500}
        upload_immutable(ss, storage_index, renew_secret, cancel_secret, shares)

        # use up all the available space
        disk.use(disk.available)

        # Different secrets to produce a different lease, not a renewal.
        renew_secret = b"R" * 32
        cancel_secret = b"C" * 32
        with self.assertRaises(interfaces.NoSpace):
            ss.add_lease(storage_index, renew_secret, cancel_secret)

    def test_reserved_space_mutable_lease(self):
        """
        If there is not enough available space to store an additional lease on a
        mutable share then ``remote_add_lease`` fails with ``NoSpace`` when an
        attempt is made to use it to create a new lease.
        """
        disk = FakeDisk(total=1024, used=0)
        self.patch(fileutil, "get_disk_stats", disk.get_disk_stats)

        ss = self.create("test_reserved_space_mutable_lease")

        renew_secrets = iter(
            "{}{}".format("r" * 31, i).encode("ascii")
            for i
            in range(5)
        )

        storage_index = b"x" * 16
        write_enabler = b"w" * 32
        cancel_secret = b"c" * 32
        secrets = (write_enabler, next(renew_secrets), cancel_secret)
        shares = {0: b"y" * 500}
        upload_mutable(ss, storage_index, secrets, shares)

        # use up all the available space
        disk.use(disk.available)

        # The upload created one lease.  There is room for three more leases
        # in the share header.  Even if we're out of disk space, on a boring
        # enough filesystem we can write these.
        for i in range(3):
            ss.add_lease(storage_index, next(renew_secrets), cancel_secret)

        # Having used all of the space for leases in the header, we would have
        # to allocate storage for the next lease.  Since there is no space
        # available, this must fail instead.
        with self.assertRaises(interfaces.NoSpace):
            ss.add_lease(storage_index, next(renew_secrets), cancel_secret)


    def test_reserved_space(self):
        reserved = 10000
        allocated = 0

        def call_get_disk_stats(whichdir, reserved_space=0):
            self.failUnlessEqual(reserved_space, reserved)
            return {
              'free_for_nonroot': 15000 - allocated,
              'avail': max(15000 - allocated - reserved_space, 0),
            }
        self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)

        ss = FoolscapStorageServer(self.create("test_reserved_space", reserved_space=reserved))
        # 15k available, 10k reserved, leaves 5k for shares

        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4  # the 12-byte share file header
        LEASE_SIZE = 4+32+32+4  # owner number + renew secret + cancel secret + expiration time
        renew_secret = b"r" * 32
        cancel_secret = b"c" * 32
        canary = FakeCanary()
        already, writers = ss.remote_allocate_buckets(
            b"vid1",
            renew_secret,
            cancel_secret,
            sharenums=[0,1,2],
            allocated_size=1000,
            canary=canary,
        )
        self.assertThat(writers, HasLength(3))
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.assertThat(ss._server._bucket_writers, HasLength(3))

        # allocating 1001-byte shares only leaves room for one
        canary2 = FakeCanary()
        already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2)
        self.assertThat(writers2, HasLength(1))
        self.assertThat(ss._server._bucket_writers, HasLength(4))

        # we abandon the first set, so their provisional allocation should be
        # returned
        canary.disconnected()

        self.assertThat(ss._server._bucket_writers, HasLength(1))
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grows to include the
        # overhead.
        for bw in writers2.values():
            bw.write(0, b"a"*25)
            bw.close()
        self.assertThat(ss._server._bucket_writers, HasLength(0))

        # this also changes the amount reported as available by call_get_disk_stats
        allocated = 1001 + OVERHEAD + LEASE_SIZE

        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
        canary3 = FakeCanary()
        already3, writers3 = ss.remote_allocate_buckets(
            b"vid3",
            renew_secret,
            cancel_secret,
            sharenums=list(range(100)),
            allocated_size=100,
            canary=canary3,
        )
        self.assertThat(writers3, HasLength(39))
        self.assertThat(ss._server._bucket_writers, HasLength(39))

        canary3.disconnected()

        self.assertThat(ss._server._bucket_writers, HasLength(0))
        ss._server.disownServiceParent()
        del ss

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write(b"start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write(b"100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.assertThat(filelen, Equals(100+3))
        f2 = open(filename, "rb")
        self.assertThat(f2.read(5), Equals(b"start"))

    def create_bucket_5_shares(
            self, ss, storage_index, expected_already=0, expected_writers=5
    ):
        """
        Given a StorageServer, create a bucket with 5 shares and return renewal
        and cancellation secrets.
        """
        sharenums = list(range(5))
        size = 100

        # Creating a bucket also creates a lease:
        rs, cs  = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
                   hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
        already, writers = ss.allocate_buckets(storage_index, rs, cs,
                                               sharenums, size)
        self.assertThat(already, HasLength(expected_already))
        self.assertThat(writers, HasLength(expected_writers))
        for wb in writers.values():
            wb.close()
        return rs, cs

    def test_leases(self):
        ss = self.create("test_leases")
        sharenums = list(range(5))
        size = 100

        # Create a bucket:
        rs0, cs0 = self.create_bucket_5_shares(ss, b"si0")

        # Upload of an immutable implies creation of a single lease with the
        # supplied secrets.
        (lease,) = ss.get_leases(b"si0")
        self.assertTrue(lease.is_renew_secret(rs0))

        rs1, cs1 = self.create_bucket_5_shares(ss, b"si1")

        # take out a second lease on si1
        rs2, cs2 = self.create_bucket_5_shares(ss, b"si1", 5, 0)
        (lease1, lease2) = ss.get_leases(b"si1")
        self.assertTrue(lease1.is_renew_secret(rs1))
        self.assertTrue(lease2.is_renew_secret(rs2))

        # and a third lease, using add-lease
        rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
                     hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
        ss.add_lease(b"si1", rs2a, cs2a)
        (lease1, lease2, lease3) = ss.get_leases(b"si1")
        self.assertTrue(lease1.is_renew_secret(rs1))
        self.assertTrue(lease2.is_renew_secret(rs2))
        self.assertTrue(lease3.is_renew_secret(rs2a))

        # add-lease on a missing storage index is silently ignored
        self.assertThat(ss.add_lease(b"si18", b"", b""), Equals(None))

        # check that si0 is readable
        readers = ss.get_buckets(b"si0")
        self.assertThat(readers, HasLength(5))

        # renew the first lease. Only the proper renew_secret should work
        ss.renew_lease(b"si0", rs0)
        self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", cs0)
        self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", rs1)

        # check that si0 is still readable
        readers = ss.get_buckets(b"si0")
        self.assertThat(readers, HasLength(5))

        # There is no such method as remote_cancel_lease for now -- see
        # ticket #1528.
        self.assertFalse(hasattr(FoolscapStorageServer(ss), 'remote_cancel_lease'), \
                    "ss should not have a 'remote_cancel_lease' method/attribute")

        # test overlapping uploads
        rs3,cs3 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
                   hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
        rs4,cs4 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
                   hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
        already,writers = ss.allocate_buckets(b"si3", rs3, cs3,
                                              sharenums, size)
        self.assertThat(already, HasLength(0))
        self.assertThat(writers, HasLength(5))
        already2,writers2 = ss.allocate_buckets(b"si3", rs4, cs4,
                                                sharenums, size)
        self.assertThat(already2, HasLength(0))
        self.assertThat(writers2, HasLength(0))
        for wb in writers.values():
            wb.close()

        leases = list(ss.get_leases(b"si3"))
        self.assertThat(leases, HasLength(1))

        already3,writers3 = ss.allocate_buckets(b"si3", rs4, cs4,
                                                sharenums, size)
        self.assertThat(already3, HasLength(5))
        self.assertThat(writers3, HasLength(0))

        leases = list(ss.get_leases(b"si3"))
        self.assertThat(leases, HasLength(2))

    def test_immutable_add_lease_renews(self):
        """
        Adding a lease on an already leased immutable with the same secret just
        renews it.
        """
        clock = Clock()
        clock.advance(123)
        ss = self.create("test_immutable_add_lease_renews", clock=clock)

        # Start out with single lease created with bucket:
        renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0")
        [lease] = ss.get_leases(b"si0")
        self.assertThat(lease.get_expiration_time(), Equals(123 + DEFAULT_RENEWAL_TIME))

        # Time passes:
        clock.advance(123456)

        # Adding a lease with matching renewal secret just renews it:
        ss.add_lease(b"si0", renewal_secret, cancel_secret)
        [lease] = ss.get_leases(b"si0")
        self.assertThat(lease.get_expiration_time(), Equals(123 + 123456 + DEFAULT_RENEWAL_TIME))

    def test_have_shares(self):
        """By default the StorageServer has no shares."""
        workdir = self.workdir("test_have_shares")
        ss = StorageServer(workdir, b"\x00" * 20, readonly_storage=True)
        self.assertFalse(ss.have_shares())

    def test_readonly(self):
        workdir = self.workdir("test_readonly")
        ss = StorageServer(workdir, b"\x00" * 20, readonly_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, b"vid", [0,1,2], 75)
        self.assertThat(already, Equals(set()))
        self.assertThat(writers, Equals({}))

        stats = ss.get_stats()
        self.assertThat(stats["storage_server.accepting_immutable_shares"], Equals(0))
        if "storage_server.disk_avail" in stats:
            # Some platforms may not have an API to get disk stats.
            # But if there are stats, readonly_storage means disk_avail=0
            self.assertThat(stats["storage_server.disk_avail"], Equals(0))

    def test_discard(self):
        # discard is really only used for other tests, but we test it anyways
        workdir = self.workdir("test_discard")
        ss = StorageServer(workdir, b"\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        already,writers = self.allocate(ss, b"vid", [0,1,2], 75)
        self.assertThat(already, Equals(set()))
        self.assertThat(set(writers.keys()), Equals(set([0,1,2])))
        for i,wb in writers.items():
            wb.write(0, b"%25d" % i)
            wb.close()
        # since we discard the data, the shares should be present but sparse.
        # Since we write with some seeks, the data we read back will be all
        # zeros.
        b = ss.get_buckets(b"vid")
        self.assertThat(set(b.keys()), Equals(set([0,1,2])))
        self.assertThat(b[0].read(0, 25), Equals(b"\x00" * 25))

    def test_reserved_space_advise_corruption(self):
        """
        If there is no available space then ``remote_advise_corrupt_share`` does
        not write a corruption report.
        """
        disk = FakeDisk(total=1024, used=1024)
        self.patch(fileutil, "get_disk_stats", disk.get_disk_stats)

        workdir = self.workdir("test_reserved_space_advise_corruption")
        ss = StorageServer(workdir, b"\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
        ss.advise_corrupt_share(b"immutable", b"si0", 0,
                                b"This share smells funny.\n")

        self.assertThat(
            [],
            Equals(os.listdir(ss.corruption_advisory_dir)),
        )

    def test_advise_corruption(self):
        workdir = self.workdir("test_advise_corruption")
        ss = StorageServer(workdir, b"\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        si0_s = base32.b2a(b"si0")
        upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
        ss.advise_corrupt_share(b"immutable", b"si0", 0,
                                b"This share smells funny.\n")
        reportdir = os.path.join(workdir, "corruption-advisories")
        reports = os.listdir(reportdir)
        self.assertThat(reports, HasLength(1))
        report_si0 = reports[0]
        self.assertThat(report_si0, Contains(ensure_str(si0_s)))
        f = open(os.path.join(reportdir, report_si0), "rb")
        report = f.read()
        f.close()
        self.assertThat(report, Contains(b"type: immutable"))
        self.assertThat(report, Contains(b"storage_index: %s" % si0_s))
        self.assertThat(report, Contains(b"share_number: 0"))
        self.assertThat(report, Contains(b"This share smells funny."))

        # test the RIBucketWriter version too
        si1_s = base32.b2a(b"si1")
        already,writers = self.allocate(ss, b"si1", [1], 75)
        self.assertThat(already, Equals(set()))
        self.assertThat(set(writers.keys()), Equals(set([1])))
        writers[1].write(0, b"data")
        writers[1].close()

        b = ss.get_buckets(b"si1")
        self.assertThat(set(b.keys()), Equals(set([1])))
        b[1].advise_corrupt_share(b"This share tastes like dust.\n")

        reports = os.listdir(reportdir)
        self.assertThat(reports, HasLength(2))
        report_si1 = [r for r in reports if si1_s.decode() in r][0]
        f = open(os.path.join(reportdir, report_si1), "rb")
        report = f.read()
        f.close()
        self.assertThat(report, Contains(b"type: immutable"))
        self.assertThat(report, Contains(b"storage_index: %s" % si1_s))
        self.assertThat(report, Contains(b"share_number: 1"))
        self.assertThat(report, Contains(b"This share tastes like dust."))

    def test_advise_corruption_missing(self):
        """
        If a corruption advisory is received for a share that is not present on
        this server then it is not persisted.
        """
        workdir = self.workdir("test_advise_corruption_missing")
        ss = StorageServer(workdir, b"\x00" * 20, discard_storage=True)
        ss.setServiceParent(self.sparent)

        # Upload one share for this storage index
        upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})

        # And try to submit a corruption advisory about a different share
        ss.advise_corrupt_share(b"immutable", b"si0", 1,
                                b"This share smells funny.\n")

        self.assertThat(
            [],
            Equals(os.listdir(ss.corruption_advisory_dir)),
        )


class MutableServer(SyncTestCase):

    def setUp(self):
        super(MutableServer, self).setUp()
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
        self.addCleanup(self.sparent.stopService)

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name, clock=None):
        workdir = self.workdir(name)
        if clock is None:
            clock = Clock()
        ss = StorageServer(workdir, b"\x00" * 20,
                           clock=clock)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash(b"we_blah", we_tag)

    def renew_secret(self, tag):
        if isinstance(tag, int):
            tag = b"%d" % (tag,)
        self.assertThat(tag, IsInstance(bytes))
        return hashutil.tagged_hash(b"renew_blah", tag)

    def cancel_secret(self, tag):
        if isinstance(tag, int):
            tag = b"%d" % (tag,)
        self.assertThat(tag, IsInstance(bytes))
        return hashutil.tagged_hash(b"cancel_blah", tag)

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                         for shnum in sharenums ] )
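        # Each entry maps shnum to (test vector, write vector, new length);
        # empty vectors with new_length=None just create the (empty) shares.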
1322        readv = []
1323        rc = rstaraw(storage_index,
1324                     (write_enabler, renew_secret, cancel_secret),
1325                     testandwritev,
1326                     readv)
1327        (did_write, readv_data) = rc
1328        self.assertTrue(did_write)
1329        self.assertThat(readv_data, IsInstance(dict))
1330        self.assertThat(readv_data, HasLength(0))
1331
1332    def test_enumerate_mutable_shares(self):
1333        """
1334        ``StorageServer.enumerate_mutable_shares()`` returns a set of share
1335        numbers for the given storage index, or an empty set if the storage
1336        index does not exist at all.
1337        """
1338        ss = self.create("test_enumerate_mutable_shares")
1339
1340        # Initially, nothing exists:
1341        empty = ss.enumerate_mutable_shares(b"si1")
1342
1343        self.allocate(ss, b"si1", b"we1", b"le1", [0, 1, 4, 2], 12)
1344        shares0_1_2_4 = ss.enumerate_mutable_shares(b"si1")
1345
1346        # Remove share 2, by setting size to 0:
1347        secrets = (self.write_enabler(b"we1"),
1348                   self.renew_secret(b"le1"),
1349                   self.cancel_secret(b"le1"))
1350        ss.slot_testv_and_readv_and_writev(b"si1", secrets, {2: ([], [], 0)}, [])
1351        shares0_1_4 = ss.enumerate_mutable_shares(b"si1")
1352        self.assertThat(
1353            (empty, shares0_1_2_4, shares0_1_4),
1354            Equals((set(), {0, 1, 2, 4}, {0, 1, 4}))
1355        )
1356
1357    def test_mutable_share_length(self):
1358        """``get_mutable_share_length()`` returns the length of the share."""
1359        ss = self.create("test_mutable_share_length")
1360        self.allocate(ss, b"si1", b"we1", b"le1", [16], 23)
1361        ss.slot_testv_and_readv_and_writev(
1362            b"si1", (self.write_enabler(b"we1"),
1363                     self.renew_secret(b"le1"),
1364                     self.cancel_secret(b"le1")),
1365            {16: ([], [(0, b"x" * 23)], None)},
1366            []
1367        )
1368        self.assertThat(ss.get_mutable_share_length(b"si1", 16), Equals(23))
1369
1370    def test_mutable_share_length_unknown(self):
1371        """
1372        ``get_mutable_share_length()`` raises a ``KeyError`` on unknown shares.
1373        """
1374        ss = self.create("test_mutable_share_length_unknown")
1375        self.allocate(ss, b"si1", b"we1", b"le1", [16], 23)
1376        ss.slot_testv_and_readv_and_writev(
1377            b"si1", (self.write_enabler(b"we1"),
1378                     self.renew_secret(b"le1"),
1379                     self.cancel_secret(b"le1")),
1380            {16: ([], [(0, b"x" * 23)], None)},
1381            []
1382        )
1383        with self.assertRaises(KeyError):
1384            # Wrong share number.
1385            ss.get_mutable_share_length(b"si1", 17)
1386        with self.assertRaises(KeyError):
1387            # Wrong storage index
1388            ss.get_mutable_share_length(b"unknown", 16)
1389
1390    def test_bad_magic(self):
1391        ss = self.create("test_bad_magic")
1392        self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0]), 10)
1393        fn = os.path.join(ss.sharedir, storage_index_to_dir(b"si1"), "0")
1394        f = open(fn, "rb+")
1395        f.seek(0)
1396        f.write(b"BAD MAGIC")
1397        f.close()
1398        read = ss.slot_readv
1399        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
1400                                  read, b"si1", [0], [(0,10)])
1401        self.assertThat(e.filename, Equals(fn))
1402        self.assertTrue(e.version.startswith(b"BAD MAGIC"))
1403        self.assertThat(str(e), Contains("had unexpected version"))
1404        self.assertThat(str(e), Contains("BAD MAGIC"))
1405
1406    def test_container_size(self):
1407        ss = self.create("test_container_size")
1408        self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
1409                      set([0,1,2]), 100)
1410        read = ss.slot_readv
1411        rstaraw = ss.slot_testv_and_readv_and_writev
1412        secrets = ( self.write_enabler(b"we1"),
1413                    self.renew_secret(b"we1"),
1414                    self.cancel_secret(b"we1") )
1415        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
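        # i.e. b"00000000001111111111...9999999999", 100 bytes in all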
1416        answer = rstaraw(b"si1", secrets,
1417                         {0: ([], [(0,data)], len(data)+12)},
1418                         [])
1419        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1420
1421        # Trying to make the container too large (by sending a write vector
1422        # whose offset is too high) will raise an exception.
1423        TOOBIG = MutableShareFile.MAX_SIZE + 10
1424        self.failUnlessRaises(DataTooLargeError,
1425                              rstaraw, b"si1", secrets,
1426                              {0: ([], [(TOOBIG,data)], None)},
1427                              [])
1428
1429        answer = rstaraw(b"si1", secrets,
1430                         {0: ([], [(0,data)], None)},
1431                         [])
1432        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1433
1434        read_answer = read(b"si1", [0], [(0,10)])
1435        self.assertThat(read_answer, Equals({0: [data[:10]]}))
1436
1437        # Sending a new_length shorter than the current length truncates the
1438        # data.
1439        answer = rstaraw(b"si1", secrets,
1440                         {0: ([], [], 9)},
1441                         [])
1442        read_answer = read(b"si1", [0], [(0,10)])
1443        self.assertThat(read_answer, Equals({0: [data[:9]]}))
1444
1445        # Sending a new_length longer than the current length doesn't change
1446        # the data.
1447        answer = rstaraw(b"si1", secrets,
1448                         {0: ([], [], 20)},
1449                         [])
1450        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1451        read_answer = read(b"si1", [0], [(0, 20)])
1452        self.assertThat(read_answer, Equals({0: [data[:9]]}))
1453
1454        # Sending a write vector whose start is after the end of the current
1455        # data doesn't reveal "whatever was there last time" (palimpsest),
1456        # but instead fills with zeroes.
1457
1458        # To test this, we fill the data area with a recognizable pattern.
1459        pattern = ''.join([chr(i) for i in range(100)]).encode("utf-8")
1460        answer = rstaraw(b"si1", secrets,
1461                         {0: ([], [(0, pattern)], None)},
1462                         [])
1463        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1464        # Then truncate the data...
1465        answer = rstaraw(b"si1", secrets,
1466                         {0: ([], [], 20)},
1467                         [])
1468        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1469        # Just confirm that you get an empty string if you try to read from
1470        # past the (new) endpoint now.
1471        answer = rstaraw(b"si1", secrets,
1472                         {0: ([], [], None)},
1473                         [(20, 1980)])
1474        self.assertThat(answer, Equals((True, {0:[b''],1:[b''],2:[b'']})))
1475
1476        # Then extend the file by writing a vector which starts out past
1477        # the end...
1478        answer = rstaraw(b"si1", secrets,
1479                         {0: ([], [(50, b'hellothere')], None)},
1480                         [])
1481        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1482        # Now if you read the stuff between 20 (where we earlier truncated)
1483        # and 50, it had better be all zeroes.
1484        answer = rstaraw(b"si1", secrets,
1485                         {0: ([], [], None)},
1486                         [(20, 30)])
1487        self.assertThat(answer, Equals((True, {0:[b'\x00'*30],1:[b''],2:[b'']})))
1488
1489        # Also see if the server explicitly declares that it supports this
1490        # feature.
1491        ver = ss.get_version()
1492        storage_v1_ver = ver[b"http://allmydata.org/tahoe/protocols/storage/v1"]
1493        self.assertTrue(storage_v1_ver.get(b"fills-holes-with-zero-bytes"))
1494
1495        # If the size is dropped to zero the share is deleted.
1496        answer = rstaraw(b"si1", secrets,
1497                         {0: ([], [(0,data)], 0)},
1498                         [])
1499        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1500
1501        read_answer = read(b"si1", [0], [(0,10)])
1502        self.assertThat(read_answer, Equals({}))
1503
1504    def test_allocate(self):
1505        ss = self.create("test_allocate")
1506        self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
1507                      set([0,1,2]), 100)
1508
1509        read = ss.slot_readv
1510        self.assertThat(read(b"si1", [0], [(0, 10)]),
1511                             Equals({0: [b""]}))
1512        self.assertThat(read(b"si1", [], [(0, 10)]),
1513                             Equals({0: [b""], 1: [b""], 2: [b""]}))
1514        self.assertThat(read(b"si1", [0], [(100, 10)]),
1515                             Equals({0: [b""]}))
1516
1517        # try writing to one
1518        secrets = ( self.write_enabler(b"we1"),
1519                    self.renew_secret(b"we1"),
1520                    self.cancel_secret(b"we1") )
1521        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
1522        write = ss.slot_testv_and_readv_and_writev
1523        answer = write(b"si1", secrets,
1524                       {0: ([], [(0,data)], None)},
1525                       [])
1526        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1527
1528        self.assertThat(read(b"si1", [0], [(0,20)]),
1529                             Equals({0: [b"00000000001111111111"]}))
1530        self.assertThat(read(b"si1", [0], [(95,10)]),
1531                             Equals({0: [b"99999"]}))
1532        #self.failUnlessEqual(s0.get_length(), 100)
1533
1534        bad_secrets = (b"bad write enabler", secrets[1], secrets[2])
1535        f = self.failUnlessRaises(BadWriteEnablerError,
1536                                  write, b"si1", bad_secrets,
1537                                  {}, [])
1538        self.assertThat(str(f), Contains("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'."))
1539
1540        # this testv should fail
1541        answer = write(b"si1", secrets,
1542                       {0: ([(0, 12, b"eq", b"444444444444"),
1543                             (20, 5, b"eq", b"22222"),
1544                             ],
1545                            [(0, b"x"*100)],
1546                            None),
1547                        },
1548                       [(0,12), (20,5)],
1549                       )
1550        self.assertThat(answer, Equals((False,
1551                                      {0: [b"000000000011", b"22222"],
1552                                       1: [b"", b""],
1553                                       2: [b"", b""],
1554                                       })))
1555        self.assertThat(read(b"si1", [0], [(0,100)]), Equals({0: [data]}))
1556
1557    def test_operators(self):
1558        # test operators, the data we're comparing is '11111' in all cases.
1559        # test both fail+pass, reset data after each one.
1560        ss = self.create("test_operators")
1561
1562        secrets = ( self.write_enabler(b"we1"),
1563                    self.renew_secret(b"we1"),
1564                    self.cancel_secret(b"we1") )
1565        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
1566        write = ss.slot_testv_and_readv_and_writev
1567        read = ss.slot_readv
1568
1569        def reset():
1570            write(b"si1", secrets,
1571                  {0: ([], [(0,data)], None)},
1572                  [])
1573
1574        reset()
1575
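        # Each test vector below is (offset, length, operator, specimen);
        # only the b"eq" operator is exercised here, comparing the specimen
        # against share[offset:offset+length] before allowing the write.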
1576        #  eq
1577        answer = write(b"si1", secrets, {0: ([(10, 5, b"eq", b"11112"),
1578                                             ],
1579                                            [(0, b"x"*100)],
1580                                            None,
1581                                            )}, [(10,5)])
1582        self.assertThat(answer, Equals((False, {0: [b"11111"]})))
1583        self.assertThat(read(b"si1", [0], [(0,100)]), Equals({0: [data]}))
1584        reset()
1585
1586        answer = write(b"si1", secrets, {0: ([(10, 5, b"eq", b"11111"),
1587                                             ],
1588                                            [(0, b"y"*100)],
1589                                            None,
1590                                            )}, [(10,5)])
1591        self.assertThat(answer, Equals((True, {0: [b"11111"]})))
1592        self.assertThat(read(b"si1", [0], [(0,100)]), Equals({0: [b"y"*100]}))
1593        reset()
1594
1595        # finally, test some operators against empty shares
1596        answer = write(b"si1", secrets, {1: ([(10, 5, b"eq", b"11112"),
1597                                             ],
1598                                            [(0, b"x"*100)],
1599                                            None,
1600                                            )}, [(10,5)])
1601        self.assertThat(answer, Equals((False, {0: [b"11111"]})))
1602        self.assertThat(read(b"si1", [0], [(0,100)]), Equals({0: [data]}))
1603        reset()
1604
1605    def test_readv(self):
1606        ss = self.create("test_readv")
1607        secrets = ( self.write_enabler(b"we1"),
1608                    self.renew_secret(b"we1"),
1609                    self.cancel_secret(b"we1") )
1611        write = ss.slot_testv_and_readv_and_writev
1612        read = ss.slot_readv
1613        data = [(b"%d" % i) * 100 for i in range(3)]
1614        rc = write(b"si1", secrets,
1615                   {0: ([], [(0,data[0])], None),
1616                    1: ([], [(0,data[1])], None),
1617                    2: ([], [(0,data[2])], None),
1618                    }, [])
1619        self.assertThat(rc, Equals((True, {})))
1620
1621        answer = read(b"si1", [], [(0, 10)])
1622        self.assertThat(answer, Equals({0: [b"0"*10],
1623                                      1: [b"1"*10],
1624                                      2: [b"2"*10]}))
1625
1626    def compare_leases_without_timestamps(self, leases_a, leases_b):
1627        """
1628        Assert that, except for expiration times, ``leases_a`` contains the same
1629        lease information as ``leases_b``.
1630        """
1631        for a, b in zip(leases_a, leases_b):
1632            # The leases aren't always of the same type (though of course
1633            # corresponding elements in the two lists should be of the same
1634            # type as each other) so it's inconvenient to just reach in and
1635            # normalize the expiration timestamp.  We don't want to call
1636            # `renew` on both objects to normalize the expiration timestamp in
1637            # case `renew` is broken and gives us back equal outputs from
1638            # non-equal inputs (expiration timestamp aside).  It seems
1639            # reasonably safe to use `renew` to make _one_ of the timestamps
1640            # equal to the other though.
1641            self.assertThat(
1642                a.renew(b.get_expiration_time()),
1643                Equals(b),
1644            )
1645        self.assertThat(len(leases_a), Equals(len(leases_b)))
1646
1647    def test_leases(self):
1648        ss = self.create("test_leases")
1649        def secrets(n):
1650            return ( self.write_enabler(b"we1"),
1651                     self.renew_secret(b"we1-%d" % n),
1652                     self.cancel_secret(b"we1-%d" % n) )
1653        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
1654        write = ss.slot_testv_and_readv_and_writev
1655        read = ss.slot_readv
1656        rc = write(b"si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1657        self.assertThat(rc, Equals((True, {})))
1658
1659        # create a random non-numeric file in the bucket directory, to
1660        # exercise the code that's supposed to ignore those.
1661        bucket_dir = os.path.join(self.workdir("test_leases"),
1662                                  "shares", storage_index_to_dir(b"si1"))
1663        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1664        f.write("you ought to be ignoring me\n")
1665        f.close()
1666
1667        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1668        self.assertThat(list(s0.get_leases()), HasLength(1))
1669
1670        # add-lease on a missing storage index is silently ignored
1671        self.assertThat(ss.add_lease(b"si18", b"", b""), Equals(None))
1672
1673        # re-allocate the slots and use the same secrets, that should update
1674        # the lease
1675        write(b"si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1676        self.assertThat(list(s0.get_leases()), HasLength(1))
1677
1678        # renew it directly
1679        ss.renew_lease(b"si1", secrets(0)[1])
1680        self.assertThat(list(s0.get_leases()), HasLength(1))
1681
1682        # now allocate them with a bunch of different secrets, to trigger the
1683        # extended lease code. Use add_lease for one of them.
1684        write(b"si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1685        self.assertThat(list(s0.get_leases()), HasLength(2))
1686        secrets2 = secrets(2)
1687        ss.add_lease(b"si1", secrets2[1], secrets2[2])
1688        self.assertThat(list(s0.get_leases()), HasLength(3))
1689        write(b"si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1690        write(b"si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1691        write(b"si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1692
1693        self.assertThat(list(s0.get_leases()), HasLength(6))
1694
1695        all_leases = list(s0.get_leases())
1696        # and write enough data to expand the container, forcing the server
1697        # to move the leases
1698        write(b"si1", secrets(0),
1699              {0: ([], [(0,data)], 200), },
1700              [])
1701
1702        # read back the leases, make sure they're still intact.
1703        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1704
1705        ss.renew_lease(b"si1", secrets(0)[1])
1706        ss.renew_lease(b"si1", secrets(1)[1])
1707        ss.renew_lease(b"si1", secrets(2)[1])
1708        ss.renew_lease(b"si1", secrets(3)[1])
1709        ss.renew_lease(b"si1", secrets(4)[1])
1710        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1711        # get a new copy of the leases, with the current timestamps. Reading
1712        # data and failing to renew/cancel leases should leave the timestamps
1713        # alone.
1714        all_leases = list(s0.get_leases())
1715        # renewing with a bogus token should prompt an error message
1716
1717        # examine the exception thus raised, make sure the old nodeid is
1718        # present, to provide for share migration
1719        e = self.failUnlessRaises(IndexError,
1720                                  ss.renew_lease, b"si1",
1721                                  secrets(20)[1])
1722        e_s = str(e)
1723        self.assertThat(e_s, Contains("Unable to renew non-existent lease"))
1724        self.assertThat(e_s, Contains("I have leases accepted by nodeids:"))
1725        self.assertThat(e_s, Contains("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ."))
1726
1727        self.assertThat(all_leases, Equals(list(s0.get_leases())))
1728
1729        # reading shares should not modify the timestamp
1730        read(b"si1", [], [(0,200)])
1731        self.assertThat(all_leases, Equals(list(s0.get_leases())))
1732
1733        write(b"si1", secrets(0),
1734              {0: ([], [(200, b"make me bigger")], None)}, [])
1735        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1736
1737        write(b"si1", secrets(0),
1738              {0: ([], [(500, b"make me really bigger")], None)}, [])
1739        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1740
1741    def test_mutable_add_lease_renews(self):
1742        """
1743        Adding a lease on an already leased mutable with the same secret just
1744        renews it.
1745        """
1746        clock = Clock()
1747        clock.advance(235)
1748        ss = self.create("test_mutable_add_lease_renews",
1749                         clock=clock)
1750        def secrets(n):
1751            return ( self.write_enabler(b"we1"),
1752                     self.renew_secret(b"we1-%d" % n),
1753                     self.cancel_secret(b"we1-%d" % n) )
1754        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
1755        write = ss.slot_testv_and_readv_and_writev
1756        write_enabler, renew_secret, cancel_secret = secrets(0)
1757        rc = write(b"si1", (write_enabler, renew_secret, cancel_secret),
1758                   {0: ([], [(0,data)], None)}, [])
1759        self.assertThat(rc, Equals((True, {})))
1760
1761        bucket_dir = os.path.join(self.workdir("test_mutable_add_lease_renews"),
1762                                  "shares", storage_index_to_dir(b"si1"))
1763        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1764        [lease] = s0.get_leases()
1765        self.assertThat(lease.get_expiration_time(), Equals(235 + DEFAULT_RENEWAL_TIME))
1766
1767        # Time passes...
1768        clock.advance(835)
1769
1770        # Adding a lease renews it:
1771        ss.add_lease(b"si1", renew_secret, cancel_secret)
1772        [lease] = s0.get_leases()
1773        self.assertThat(lease.get_expiration_time(),
1774                         Equals(235 + 835 + DEFAULT_RENEWAL_TIME))
1775
1776    def test_remove(self):
1777        ss = self.create("test_remove")
1778        self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
1779                      set([0,1,2]), 100)
1780        readv = ss.slot_readv
1781        writev = ss.slot_testv_and_readv_and_writev
1782        secrets = ( self.write_enabler(b"we1"),
1783                    self.renew_secret(b"we1"),
1784                    self.cancel_secret(b"we1") )
1785        # delete sh0 by setting its size to zero
1786        answer = writev(b"si1", secrets,
1787                        {0: ([], [], 0)},
1788                        [])
1789        # the answer should mention all the shares that existed before the
1790        # write
1791        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1792        # but a new read should show only sh1 and sh2
1793        self.assertThat(readv(b"si1", [], [(0,10)]),
1794                             Equals({1: [b""], 2: [b""]}))
1795
1796        # delete sh1 by setting its size to zero
1797        answer = writev(b"si1", secrets,
1798                        {1: ([], [], 0)},
1799                        [])
1800        self.assertThat(answer, Equals((True, {1:[],2:[]})))
1801        self.assertThat(readv(b"si1", [], [(0,10)]),
1802                             Equals({2: [b""]}))
1803
1804        # delete sh2 by setting its size to zero
1805        answer = writev(b"si1", secrets,
1806                        {2: ([], [], 0)},
1807                        [])
1808        self.assertThat(answer, Equals((True, {2:[]})))
1809        self.assertThat(readv(b"si1", [], [(0,10)]),
1810                             Equals({}))
1811        # and the bucket directory should now be gone
1812        si = base32.b2a(b"si1").decode()
1813        # note: this is a detail of the storage server implementation, and
1814        # may change in the future
1815        # filesystem paths are native strings
1816        prefix = si[:2]
1817        prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1818        bucketdir = os.path.join(prefixdir, si)
1819        self.assertTrue(os.path.exists(prefixdir), prefixdir)
1820        self.assertFalse(os.path.exists(bucketdir), bucketdir)
1821
1822    def test_writev_without_renew_lease(self):
1823        """
1824        The helper method ``slot_testv_and_readv_and_writev`` does not renew
1825        leases if ``False`` is passed for the ``renew_leases`` parameter.
1826        """
1827        ss = self.create("test_writev_without_renew_lease")
1828
1829        storage_index = b"si2"
1830        secrets = (
1831            self.write_enabler(storage_index),
1832            self.renew_secret(storage_index),
1833            self.cancel_secret(storage_index),
1834        )
1835
1836        sharenum = 3
1837        datav = [(0, b"Hello, world")]
1838
1839        ss.slot_testv_and_readv_and_writev(
1840            storage_index=storage_index,
1841            secrets=secrets,
1842            test_and_write_vectors={
1843                sharenum: ([], datav, None),
1844            },
1845            read_vector=[],
1846            renew_leases=False,
1847        )
1848        leases = list(ss.get_slot_leases(storage_index))
1849        self.assertThat([], Equals(leases))
1850
1851    def test_get_slot_leases_empty_slot(self):
1852        """
1853        When ``get_slot_leases`` is called for a slot for which the server has no
1854        shares, it returns an empty iterable.
1855        """
1856        ss = self.create("test_get_slot_leases_empty_slot")
1857        self.assertThat(
1858            list(ss.get_slot_leases(b"si1")),
1859            Equals([]),
1860        )
1861
1862    def test_remove_non_present(self):
1863        """
1864        A write vector which would remove a share completely is applied as a no-op
1865        by a server which does not have the share.
1866        """
1867        ss = self.create("test_remove_non_present")
1868
1869        storage_index = b"si1"
1870        secrets = (
1871            self.write_enabler(storage_index),
1872            self.renew_secret(storage_index),
1873            self.cancel_secret(storage_index),
1874        )
1875
1876        sharenum = 3
1877        testv = []
1878        datav = []
1879        new_length = 0
1880        read_vector = []
1881
1882        # We don't even need to create any shares to exercise this
1883        # functionality.  Just go straight to sending a truncate-to-zero
1884        # write.
1885        testv_is_good, read_data = ss.slot_testv_and_readv_and_writev(
1886            storage_index=storage_index,
1887            secrets=secrets,
1888            test_and_write_vectors={
1889                sharenum: (testv, datav, new_length),
1890            },
1891            read_vector=read_vector,
1892        )
1893
1894        self.assertTrue(testv_is_good)
1895        self.assertThat({}, Equals(read_data))
1896
1897
1898class MDMFProxies(AsyncTestCase, ShouldFailMixin):
1899    def setUp(self):
1900        super(MDMFProxies, self).setUp()
1901        self.sparent = LoggingServiceParent()
1902        self._lease_secret = itertools.count()
1903        self.ss = self.create("MDMFProxies storage test server")
1904        self.rref = RemoteBucket(FoolscapStorageServer(self.ss))
1905        self.storage_server = _StorageServer(lambda: self.rref)
1906        self.secrets = (self.write_enabler(b"we_secret"),
1907                        self.renew_secret(b"renew_secret"),
1908                        self.cancel_secret(b"cancel_secret"))
1909        self.segment = b"aaaaaa"
1910        self.block = b"aa"
1911        self.salt = b"a" * 16
1912        self.block_hash = b"a" * 32
1913        self.block_hash_tree = [self.block_hash for i in range(6)]
1914        self.share_hash = self.block_hash
1915        self.share_hash_chain = dict([(i, self.share_hash) for i in range(6)])
1916        self.signature = b"foobarbaz"
1917        self.verification_key = b"vvvvvv"
1918        self.encprivkey = b"private"
1919        self.root_hash = self.block_hash
1920        self.salt_hash = self.root_hash
1921        self.salt_hash_tree = [self.salt_hash for i in range(6)]
1922        self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1923        self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1924        # blockhashes and salt hashes are serialized in the same way,
1925        # only we lop off the first element and store that in the
1926        # header.
1927        self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
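        # Taken together, these fields describe a synthetic share with a
        # k=3, N=10 encoding, six 6-byte segments (36 bytes of data in all),
        # 2-byte blocks, 16-byte salts, and 32-byte hashes.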
1928
1929
1930    def tearDown(self):
1931        super(MDMFProxies, self).tearDown()
1932        self.sparent.stopService()
1933        shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1934
1935
1936    def write_enabler(self, we_tag):
1937        return hashutil.tagged_hash(b"we_blah", we_tag)
1938
1939
1940    def renew_secret(self, tag):
1941        if isinstance(tag, int):
1942            tag = b"%d" % tag
1943        return hashutil.tagged_hash(b"renew_blah", tag)
1944
1945
1946    def cancel_secret(self, tag):
1947        if isinstance(tag, int):
1948            tag = b"%d" % tag
1949        return hashutil.tagged_hash(b"cancel_blah", tag)
1950
1951
1952    def workdir(self, name):
1953        basedir = os.path.join("storage", "MutableServer", name)
1954        return basedir
1955
1956
1957    def create(self, name):
1958        workdir = self.workdir(name)
1959        ss = StorageServer(workdir, b"\x00" * 20)
1960        ss.setServiceParent(self.sparent)
1961        return ss
1962
1963
1964    def build_test_mdmf_share(self, tail_segment=False, empty=False):
1965        # Start with the checkstring
1966        data = struct.pack(">BQ32s",
1967                           1,
1968                           0,
1969                           self.root_hash)
1970        self.checkstring = data
1971        # Next, the encoding parameters
1972        if tail_segment:
1973            data += struct.pack(">BBQQ",
1974                                3,
1975                                10,
1976                                6,
1977                                33)
1978        elif empty:
1979            data += struct.pack(">BBQQ",
1980                                3,
1981                                10,
1982                                0,
1983                                0)
1984        else:
1985            data += struct.pack(">BBQQ",
1986                                3,
1987                                10,
1988                                6,
1989                                36)
1990        # Now we'll build the offsets.
1991        sharedata = b""
1992        if not tail_segment and not empty:
1993            for i in range(6):
1994                sharedata += self.salt + self.block
1995        elif tail_segment:
1996            for i in range(5):
1997                sharedata += self.salt + self.block
1998            sharedata += self.salt + b"a"
1999
2000        # The encrypted private key comes after the shares + salts
2001        offset_size = struct.calcsize(MDMFOFFSETS)
2002        encrypted_private_key_offset = len(data) + offset_size
2003        # The share hash chain comes after the private key
2004        sharehashes_offset = encrypted_private_key_offset + \
2005            len(self.encprivkey)
2006
2007        # The signature comes after the share hash chain.
2008        signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
2009
2010        verification_key_offset = signature_offset + len(self.signature)
2011        verification_key_end = verification_key_offset + \
2012            len(self.verification_key)
2013
2014        share_data_offset = offset_size
2015        share_data_offset += PRIVATE_KEY_SIZE
2016        share_data_offset += SIGNATURE_SIZE
2017        share_data_offset += VERIFICATION_KEY_SIZE
2018        share_data_offset += SHARE_HASH_CHAIN_SIZE
2019
2020        blockhashes_offset = share_data_offset + len(sharedata)
2021        eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
2022
2023        data += struct.pack(MDMFOFFSETS,
2024                            encrypted_private_key_offset,
2025                            sharehashes_offset,
2026                            signature_offset,
2027                            verification_key_offset,
2028                            verification_key_end,
2029                            share_data_offset,
2030                            blockhashes_offset,
2031                            eof_offset)
2032
2033        self.offsets = {}
2034        self.offsets['enc_privkey'] = encrypted_private_key_offset
2035        self.offsets['block_hash_tree'] = blockhashes_offset
2036        self.offsets['share_hash_chain'] = sharehashes_offset
2037        self.offsets['signature'] = signature_offset
2038        self.offsets['verification_key'] = verification_key_offset
2039        self.offsets['share_data'] = share_data_offset
2040        self.offsets['verification_key_end'] = verification_key_end
2041        self.offsets['EOF'] = eof_offset
2042
2043        # the private key,
2044        data += self.encprivkey
2045        # the sharehashes
2046        data += self.share_hash_chain_s
2047        # the signature,
2048        data += self.signature
2049        # and the verification key
2050        data += self.verification_key
2051        # Then we'll add space-filled padding until we get to the right point.
2052        padding = b" " * (share_data_offset - len(data))
2053        data += padding
2054
2055        # Then the share data
2056        data += sharedata
2057        # the blockhashes
2058        data += self.block_hash_tree_s
2059        return data
2060
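    # For orientation, the synthetic MDMF share assembled above is laid out
    # in this order (mirroring the appends in build_test_mdmf_share, not an
    # independent statement of the real format):
    #
    #   header: checkstring (version, seqnum, root hash) + encoding parameters
    #   offsets table (MDMFOFFSETS)
    #   encrypted private key
    #   share hash chain
    #   signature
    #   verification key
    #   space padding up to share_data_offset
    #   share data (salt + block for each segment)
    #   block hash tree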
2061
2062    def write_test_share_to_server(self,
2063                                   storage_index,
2064                                   tail_segment=False,
2065                                   empty=False):
2066        """
2067        I write some data to self.ss for the read tests to read.
2068
2069        If tail_segment=True, then I will write a share that has a
2070        smaller tail segment than other segments.
2071        """
2072        write = self.ss.slot_testv_and_readv_and_writev
2073        data = self.build_test_mdmf_share(tail_segment, empty)
2074        # Finally, we write the whole thing to the storage server in one
2075        # pass.
2076        testvs = [(0, 1, b"eq", b"")]
2077        tws = {}
2078        tws[0] = (testvs, [(0, data)], None)
2079        readv = [(0, 1)]
2080        results = write(storage_index, self.secrets, tws, readv)
2081        self.assertTrue(results[0])
2082
2083
2084    def build_test_sdmf_share(self, empty=False):
2085        if empty:
2086            sharedata = b""
2087        else:
2088            sharedata = self.segment * 6
2089        self.sharedata = sharedata
2090        blocksize = len(sharedata) // 3
2091        block = sharedata[:blocksize]
2092        self.blockdata = block
2093        prefix = struct.pack(">BQ32s16s BBQQ",
2094                             0, # version,
2095                             0,
2096                             self.root_hash,
2097                             self.salt,
2098                             3,
2099                             10,
2100                             len(sharedata),
2101                             len(sharedata),
2102                            )
2103        post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
2104        signature_offset = post_offset + len(self.verification_key)
2105        sharehashes_offset = signature_offset + len(self.signature)
2106        blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
2107        sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
2108        encprivkey_offset = sharedata_offset + len(block)
2109        eof_offset = encprivkey_offset + len(self.encprivkey)
2110        offsets = struct.pack(">LLLLQQ",
2111                              signature_offset,
2112                              sharehashes_offset,
2113                              blockhashes_offset,
2114                              sharedata_offset,
2115                              encprivkey_offset,
2116                              eof_offset)
2117        final_share = b"".join([prefix,
2118                                offsets,
2119                                self.verification_key,
2120                                self.signature,
2121                                self.share_hash_chain_s,
2122                                self.block_hash_tree_s,
2123                                block,
2124                                self.encprivkey])
2125        self.offsets = {}
2126        self.offsets['signature'] = signature_offset
2127        self.offsets['share_hash_chain'] = sharehashes_offset
2128        self.offsets['block_hash_tree'] = blockhashes_offset
2129        self.offsets['share_data'] = sharedata_offset
2130        self.offsets['enc_privkey'] = encprivkey_offset
2131        self.offsets['EOF'] = eof_offset
2132        return final_share
2133
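    # Likewise, the synthetic SDMF share above is, in order: the prefix
    # (version, seqnum, root hash, salt, k, N, segment size, data length),
    # a ">LLLLQQ" offsets table, verification key, signature, share hash
    # chain, block hash tree, share data, and finally the encrypted private
    # key, matching the offsets packed just before final_share is joined.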
2134
2135    def write_sdmf_share_to_server(self,
2136                                   storage_index,
2137                                   empty=False):
2138        # Some tests need SDMF shares to verify that we can still
2139        # read them. This method writes one, which resembles but is not
2140        # read them. This method writes one, which resembles but is not a real SDMF share.
2141        share = self.build_test_sdmf_share(empty)
2142        testvs = [(0, 1, b"eq", b"")]
2143        tws = {}
2144        tws[0] = (testvs, [(0, share)], None)
2145        readv = []
2146        results = write(storage_index, self.secrets, tws, readv)
2147        self.assertTrue(results[0])
2148
2149
2150    def test_read(self):
2151        self.write_test_share_to_server(b"si1")
2152        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2153        # Check that every method equals what we expect it to.
2154        d = defer.succeed(None)
2155        def _check_block_and_salt(block_and_salt):
2156            (block, salt) = block_and_salt
2157            self.assertThat(block, Equals(self.block))
2158            self.assertThat(salt, Equals(self.salt))
2159
2160        for i in range(6):
2161            d.addCallback(lambda ignored, i=i:
2162                mr.get_block_and_salt(i))
2163            d.addCallback(_check_block_and_salt)
2164
2165        d.addCallback(lambda ignored:
2166            mr.get_encprivkey())
2167        d.addCallback(lambda encprivkey:
2168            self.assertThat(self.encprivkey, Equals(encprivkey)))
2169
2170        d.addCallback(lambda ignored:
2171            mr.get_blockhashes())
2172        d.addCallback(lambda blockhashes:
2173            self.assertThat(self.block_hash_tree, Equals(blockhashes)))
2174
2175        d.addCallback(lambda ignored:
2176            mr.get_sharehashes())
2177        d.addCallback(lambda sharehashes:
2178            self.assertThat(self.share_hash_chain, Equals(sharehashes)))
2179
2180        d.addCallback(lambda ignored:
2181            mr.get_signature())
2182        d.addCallback(lambda signature:
2183            self.assertThat(signature, Equals(self.signature)))
2184
2185        d.addCallback(lambda ignored:
2186            mr.get_verification_key())
2187        d.addCallback(lambda verification_key:
2188            self.assertThat(verification_key, Equals(self.verification_key)))
2189
2190        d.addCallback(lambda ignored:
2191            mr.get_seqnum())
2192        d.addCallback(lambda seqnum:
2193            self.assertThat(seqnum, Equals(0)))
2194
2195        d.addCallback(lambda ignored:
2196            mr.get_root_hash())
2197        d.addCallback(lambda root_hash:
2198            self.assertThat(self.root_hash, Equals(root_hash)))
2199
2200        d.addCallback(lambda ignored:
2201            mr.get_seqnum())
2202        d.addCallback(lambda seqnum:
2203            self.assertThat(seqnum, Equals(0)))
2204
2205        d.addCallback(lambda ignored:
2206            mr.get_encoding_parameters())
2207        def _check_encoding_parameters(args):
2208            (k, n, segsize, datalen) = args
2209            self.assertThat(k, Equals(3))
2210            self.assertThat(n, Equals(10))
2211            self.assertThat(segsize, Equals(6))
2212            self.assertThat(datalen, Equals(36))
2213        d.addCallback(_check_encoding_parameters)
2214
2215        d.addCallback(lambda ignored:
2216            mr.get_checkstring())
2217        d.addCallback(lambda checkstring:
2218            self.assertThat(checkstring, Equals(self.checkstring)))
2219        return d
2220
2221
2222    def test_read_with_different_tail_segment_size(self):
2223        self.write_test_share_to_server(b"si1", tail_segment=True)
2224        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2225        d = mr.get_block_and_salt(5)
2226        def _check_tail_segment(results):
2227            block, salt = results
2228            self.assertThat(block, HasLength(1))
2229            self.assertThat(block, Equals(b"a"))
2230        d.addCallback(_check_tail_segment)
2231        return d
2232
2233
2234    def test_get_block_with_invalid_segnum(self):
2235        self.write_test_share_to_server(b"si1")
2236        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2237        d = defer.succeed(None)
2238        d.addCallback(lambda ignored:
2239            self.shouldFail(LayoutInvalid, "test invalid segnum",
2240                            None,
2241                            mr.get_block_and_salt, 7))
2242        return d
2243
2244
2245    def test_get_encoding_parameters_first(self):
2246        self.write_test_share_to_server(b"si1")
2247        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2248        d = mr.get_encoding_parameters()
2249        def _check_encoding_parameters(args):
2250            (k, n, segment_size, datalen) = args
2251            self.assertThat(k, Equals(3))
2252            self.assertThat(n, Equals(10))
2253            self.assertThat(segment_size, Equals(6))
2254            self.assertThat(datalen, Equals(36))
2255        d.addCallback(_check_encoding_parameters)
2256        return d
2257
2258
2259    def test_get_seqnum_first(self):
2260        self.write_test_share_to_server(b"si1")
2261        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2262        d = mr.get_seqnum()
2263        d.addCallback(lambda seqnum:
2264            self.assertThat(seqnum, Equals(0)))
2265        return d
2266
2267
2268    def test_get_root_hash_first(self):
2269        self.write_test_share_to_server(b"si1")
2270        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2271        d = mr.get_root_hash()
2272        d.addCallback(lambda root_hash:
2273            self.assertThat(root_hash, Equals(self.root_hash)))
2274        return d
2275
2276
2277    def test_get_checkstring_first(self):
2278        self.write_test_share_to_server(b"si1")
2279        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2280        d = mr.get_checkstring()
2281        d.addCallback(lambda checkstring:
2282            self.assertThat(checkstring, Equals(self.checkstring)))
2283        return d
2284
2285
2286    def test_write_read_vectors(self):
2287        # When we write, the storage server returns the result of our
2288        # read vector along with the write result. If a write fails
2289        # because the test vectors failed, this read vector can help us
2290        # diagnose the problem. This test ensures that the read vector
2291        # is working appropriately.
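        # (The checkstring being manipulated below is the share's packed
        # prefix: version byte, sequence number, and root hash.  The write
        # proxy sends it as a test vector, so a stale or bogus checkstring
        # makes the server reject the write.)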
2292        mw = self._make_new_mw(b"si1", 0)
2293
2294        for i in range(6):
2295            mw.put_block(self.block, i, self.salt)
2296        mw.put_encprivkey(self.encprivkey)
2297        mw.put_blockhashes(self.block_hash_tree)
2298        mw.put_sharehashes(self.share_hash_chain)
2299        mw.put_root_hash(self.root_hash)
2300        mw.put_signature(self.signature)
2301        mw.put_verification_key(self.verification_key)
2302        d = mw.finish_publishing()
2303        def _then(results):
2304            self.assertThat(results, HasLength(2))
2305            result, readv = results
2306            self.assertTrue(result)
2307            self.assertFalse(readv)
2308            self.old_checkstring = mw.get_checkstring()
2309            mw.set_checkstring(b"")
2310        d.addCallback(_then)
2311        d.addCallback(lambda ignored:
2312            mw.finish_publishing())
2313        def _then_again(results):
2314            self.assertThat(results, HasLength(2))
2315            result, readvs = results
2316            self.assertFalse(result)
2317            self.assertThat(readvs, Contains(0))
2318            readv = readvs[0][0]
2319            self.assertThat(readv, Equals(self.old_checkstring))
2320        d.addCallback(_then_again)
2321        # The checkstring remains the same for the rest of the process.
2322        return d
2323
2324
2325    def test_private_key_after_share_hash_chain(self):
2326        mw = self._make_new_mw(b"si1", 0)
2327        d = defer.succeed(None)
2328        for i in range(6):
2329            d.addCallback(lambda ignored, i=i:
2330                mw.put_block(self.block, i, self.salt))
2331        d.addCallback(lambda ignored:
2332            mw.put_encprivkey(self.encprivkey))
2333        d.addCallback(lambda ignored:
2334            mw.put_sharehashes(self.share_hash_chain))
2335
2336        # Now try to put the private key again.
2337        d.addCallback(lambda ignored:
2338            self.shouldFail(LayoutInvalid, "test repeat private key",
2339                            None,
2340                            mw.put_encprivkey, self.encprivkey))
2341        return d
2342
2343
2344    def test_signature_after_verification_key(self):
2345        mw = self._make_new_mw(b"si1", 0)
2346        d = defer.succeed(None)
2347        # Put everything up to and including the verification key.
2348        for i in range(6):
2349            d.addCallback(lambda ignored, i=i:
2350                mw.put_block(self.block, i, self.salt))
2351        d.addCallback(lambda ignored:
2352            mw.put_encprivkey(self.encprivkey))
2353        d.addCallback(lambda ignored:
2354            mw.put_blockhashes(self.block_hash_tree))
2355        d.addCallback(lambda ignored:
2356            mw.put_sharehashes(self.share_hash_chain))
2357        d.addCallback(lambda ignored:
2358            mw.put_root_hash(self.root_hash))
2359        d.addCallback(lambda ignored:
2360            mw.put_signature(self.signature))
2361        d.addCallback(lambda ignored:
2362            mw.put_verification_key(self.verification_key))
2363        # Now try to put the signature again. This should fail
2364        d.addCallback(lambda ignored:
2365            self.shouldFail(LayoutInvalid, "signature after verification",
2366                            None,
2367                            mw.put_signature, self.signature))
2368        return d
2369
2370
2371    def test_uncoordinated_write(self):
2372        # Make two mutable writers, both pointing to the same storage
2373        # server, both at the same storage index, and try writing to the
2374        # same share.
2375        mw1 = self._make_new_mw(b"si1", 0)
2376        mw2 = self._make_new_mw(b"si1", 0)
2377
2378        def _check_success(results):
2379            result, readvs = results
2380            self.assertTrue(result)
2381
2382        def _check_failure(results):
2383            result, readvs = results
2384            self.assertFalse(result)
2385
2386        def _write_share(mw):
2387            for i in range(6):
2388                mw.put_block(self.block, i, self.salt)
2389            mw.put_encprivkey(self.encprivkey)
2390            mw.put_blockhashes(self.block_hash_tree)
2391            mw.put_sharehashes(self.share_hash_chain)
2392            mw.put_root_hash(self.root_hash)
2393            mw.put_signature(self.signature)
2394            mw.put_verification_key(self.verification_key)
2395            return mw.finish_publishing()
2396        d = _write_share(mw1)
2397        d.addCallback(_check_success)
2398        d.addCallback(lambda ignored:
2399            _write_share(mw2))
2400        d.addCallback(_check_failure)
2401        return d
2402
2403
2404    def test_invalid_salt_size(self):
2405        # Salts need to be 16 bytes in size. Writes that attempt to
2406        # write more or less than this should be rejected.
2407        mw = self._make_new_mw(b"si1", 0)
2408        invalid_salt = b"a" * 17 # 17 bytes
2409        another_invalid_salt = b"b" * 15 # 15 bytes
2410        d = defer.succeed(None)
2411        d.addCallback(lambda ignored:
2412            self.shouldFail(LayoutInvalid, "salt too big",
2413                            None,
2414                            mw.put_block, self.block, 0, invalid_salt))
2415        d.addCallback(lambda ignored:
2416            self.shouldFail(LayoutInvalid, "salt too small",
2417                            None,
2418                            mw.put_block, self.block, 0,
2419                            another_invalid_salt))
2420        return d
2421
2422
2423    def test_write_test_vectors(self):
2424        # If we give the write proxy a bogus test vector at
2425        # any point during the process, it should fail to write when we
2426        # tell it to write.
2427        def _check_failure(results):
2428            self.assertThat(results, HasLength(2))
2429            res, readvs = results
2430            self.assertFalse(res)
2431
2432        def _check_success(results):
2433            self.assertThat(results, HasLength(2))
2434            res, readvs = results
2435            self.assertTrue(res)
2436
2437        mw = self._make_new_mw(b"si1", 0)
2438        mw.set_checkstring(b"this is a lie")
2439        for i in range(6):
2440            mw.put_block(self.block, i, self.salt)
2441        mw.put_encprivkey(self.encprivkey)
2442        mw.put_blockhashes(self.block_hash_tree)
2443        mw.put_sharehashes(self.share_hash_chain)
2444        mw.put_root_hash(self.root_hash)
2445        mw.put_signature(self.signature)
2446        mw.put_verification_key(self.verification_key)
2447        d = mw.finish_publishing()
2448        d.addCallback(_check_failure)
2449        d.addCallback(lambda ignored:
2450            mw.set_checkstring(b""))
2451        d.addCallback(lambda ignored:
2452            mw.finish_publishing())
2453        d.addCallback(_check_success)
2454        return d
2455
2456
2457    def serialize_blockhashes(self, blockhashes):
2458        return b"".join(blockhashes)
2459
2460
2461    def serialize_sharehashes(self, sharehashes):
2462        ret = b"".join([struct.pack(">H32s", i, sharehashes[i])
2463                        for i in sorted(sharehashes.keys())])
2464        return ret
2465
2466
2467    def test_write(self):
2468        # This translates to a file with 6 6-byte segments, and with 2-byte
2469        # blocks.
2470        mw = self._make_new_mw(b"si1", 0)
2471        # Test writing some blocks.
2472        read = self.ss.slot_readv
2473        expected_private_key_offset = struct.calcsize(MDMFHEADER)
2474        expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
2475                                    PRIVATE_KEY_SIZE + \
2476                                    SIGNATURE_SIZE + \
2477                                    VERIFICATION_KEY_SIZE + \
2478                                    SHARE_HASH_CHAIN_SIZE
2479        written_block_size = 2 + len(self.salt)
2480        written_block = self.block + self.salt
2481        for i in range(6):
2482            mw.put_block(self.block, i, self.salt)
2483
2484        mw.put_encprivkey(self.encprivkey)
2485        mw.put_blockhashes(self.block_hash_tree)
2486        mw.put_sharehashes(self.share_hash_chain)
2487        mw.put_root_hash(self.root_hash)
2488        mw.put_signature(self.signature)
2489        mw.put_verification_key(self.verification_key)
2490        d = mw.finish_publishing()
2491        def _check_publish(results):
2492            self.assertThat(results, HasLength(2))
2493            result, ign = results
2494            self.assertTrue(result, "publish failed")
2495            for i in range(6):
2496                self.assertThat(read(b"si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
2497                                Equals({0: [written_block]}))
2498
2499            self.assertThat(self.encprivkey, HasLength(7))
2500            self.assertThat(read(b"si1", [0], [(expected_private_key_offset, 7)]),
2501                                 Equals({0: [self.encprivkey]}))
2502
2503            expected_block_hash_offset = expected_sharedata_offset + \
2504                        (6 * written_block_size)
2505            self.assertThat(self.block_hash_tree_s, HasLength(32 * 6))
2506            self.assertThat(read(b"si1", [0], [(expected_block_hash_offset, 32 * 6)]),
2507                                 Equals({0: [self.block_hash_tree_s]}))
2508
2509            expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
2510            self.assertThat(read(b"si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
2511                                 Equals({0: [self.share_hash_chain_s]}))
2512
2513            self.assertThat(read(b"si1", [0], [(9, 32)]),
2514                                 Equals({0: [self.root_hash]}))
2515            expected_signature_offset = expected_share_hash_offset + \
2516                len(self.share_hash_chain_s)
2517            self.assertThat(self.signature, HasLength(9))
2518            self.assertThat(read(b"si1", [0], [(expected_signature_offset, 9)]),
2519                                 Equals({0: [self.signature]}))
2520
2521            expected_verification_key_offset = expected_signature_offset + len(self.signature)
2522            self.assertThat(self.verification_key, HasLength(6))
2523            self.assertThat(read(b"si1", [0], [(expected_verification_key_offset, 6)]),
2524                                 Equals({0: [self.verification_key]}))
2525
2526            signable = mw.get_signable()
2527            verno, seq, roothash, k, n, segsize, datalen = \
2528                                            struct.unpack(">BQ32sBBQQ",
2529                                                          signable)
2530            self.assertThat(verno, Equals(1))
2531            self.assertThat(seq, Equals(0))
2532            self.assertThat(roothash, Equals(self.root_hash))
2533            self.assertThat(k, Equals(3))
2534            self.assertThat(n, Equals(10))
2535            self.assertThat(segsize, Equals(6))
2536            self.assertThat(datalen, Equals(36))
2537            expected_eof_offset = expected_block_hash_offset + \
2538                len(self.block_hash_tree_s)
2539
2540            # Check the version number to make sure that it is correct.
2541            expected_version_number = struct.pack(">B", 1)
2542            self.assertThat(read(b"si1", [0], [(0, 1)]),
2543                                 Equals({0: [expected_version_number]}))
2544            # Check the sequence number to make sure that it is correct
2545            expected_sequence_number = struct.pack(">Q", 0)
2546            self.assertThat(read(b"si1", [0], [(1, 8)]),
2547                                 Equals({0: [expected_sequence_number]}))
2548            # Check that the encoding parameters (k, N, segment size, data
2549            # length) are what they should be. These are 3, 10, 6, and 36.
2550            expected_k = struct.pack(">B", 3)
2551            self.assertThat(read(b"si1", [0], [(41, 1)]),
2552                                 Equals({0: [expected_k]}))
2553            expected_n = struct.pack(">B", 10)
2554            self.assertThat(read(b"si1", [0], [(42, 1)]),
2555                                 Equals({0: [expected_n]}))
2556            expected_segment_size = struct.pack(">Q", 6)
2557            self.assertThat(read(b"si1", [0], [(43, 8)]),
2558                                 Equals({0: [expected_segment_size]}))
2559            expected_data_length = struct.pack(">Q", 36)
2560            self.assertThat(read(b"si1", [0], [(51, 8)]),
2561                                 Equals({0: [expected_data_length]}))
2562            expected_offset = struct.pack(">Q", expected_private_key_offset)
2563            self.assertThat(read(b"si1", [0], [(59, 8)]),
2564                                 Equals({0: [expected_offset]}))
2565            expected_offset = struct.pack(">Q", expected_share_hash_offset)
2566            self.assertThat(read(b"si1", [0], [(67, 8)]),
2567                                 Equals({0: [expected_offset]}))
2568            expected_offset = struct.pack(">Q", expected_signature_offset)
2569            self.assertThat(read(b"si1", [0], [(75, 8)]),
2570                                 Equals({0: [expected_offset]}))
2571            expected_offset = struct.pack(">Q", expected_verification_key_offset)
2572            self.assertThat(read(b"si1", [0], [(83, 8)]),
2573                                 Equals({0: [expected_offset]}))
2574            expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2575            self.assertThat(read(b"si1", [0], [(91, 8)]),
2576                                 Equals({0: [expected_offset]}))
2577            expected_offset = struct.pack(">Q", expected_sharedata_offset)
2578            self.assertThat(read(b"si1", [0], [(99, 8)]),
2579                                 Equals({0: [expected_offset]}))
2580            expected_offset = struct.pack(">Q", expected_block_hash_offset)
2581            self.assertThat(read(b"si1", [0], [(107, 8)]),
2582                                 Equals({0: [expected_offset]}))
2583            expected_offset = struct.pack(">Q", expected_eof_offset)
2584            self.assertThat(read(b"si1", [0], [(115, 8)]),
2585                                 Equals({0: [expected_offset]}))
2586        d.addCallback(_check_publish)
2587        return d
2588
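    # For reference, the fixed MDMF header layout decoded by the reads
    # above (byte offsets; allmydata.mutable.layout is authoritative):
    #
    #      0: version (B)       1: seqnum (Q)       9: root hash (32s)
    #     41: k (B)            42: N (B)           43: segment size (Q)
    #     51: data length (Q)
    #     59: offset table -- eight Q fields locating the private key,
    #         share hashes, signature, verification key, end of the
    #         verification key, share data, block hashes, and EOF --
    #         so the variable-length content begins at byte 123.
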
2589    def _make_new_mw(self, si, share, datalength=36):
2590        # This is a file of 36 bytes. Since it has a segment
2591        # size of 6, it has six 6-byte segments, each of which
2592        # will be split into 2-byte blocks because our FEC
2593        # parameter k is 3.
2594        mw = MDMFSlotWriteProxy(share, self.storage_server, si, self.secrets, 0, 3, 10,
2595                                6, datalength)
2596        return mw
2597
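    # A quick sketch of the arithmetic behind those parameters
    # (illustration only, not part of the test):
    #
    #     datalength, segsize, k = 36, 6, 3
    #     num_segments = datalength // segsize   # == 6 segments
    #     blocksize = segsize // k               # == 2 bytes per block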
2598
2599    def test_write_rejected_with_too_many_blocks(self):
2600        mw = self._make_new_mw(b"si0", 0)
2601
2602        # Try writing too many blocks. We should not be able to
2603        # write more than 6 blocks into each share, so any block
2604        # number past 5 should be rejected.
2605        d = defer.succeed(None)
2606        for i in range(6):
2607            d.addCallback(lambda ignored, i=i:
2608                mw.put_block(self.block, i, self.salt))
2609        d.addCallback(lambda ignored:
2610            self.shouldFail(LayoutInvalid, "too many blocks",
2611                            None,
2612                            mw.put_block, self.block, 7, self.salt))
2613        return d
2614
2615
2616    def test_write_rejected_with_invalid_salt(self):
2617        # Try writing an invalid salt. Salts are 16 bytes -- any more or
2618        # less should cause an error.
2619        mw = self._make_new_mw(b"si1", 0)
2620        bad_salt = b"a" * 17 # 17 bytes
2621        d = defer.succeed(None)
2622        d.addCallback(lambda ignored:
2623            self.shouldFail(LayoutInvalid, "test_invalid_salt",
2624                            None, mw.put_block, self.block, 7, bad_salt))
2625        return d
2626
2627
2628    def test_write_rejected_with_invalid_root_hash(self):
2629        # Try writing an invalid root hash. This should be SHA256d, and
2630        # 32 bytes long as a result.
2631        mw = self._make_new_mw(b"si2", 0)
2632        # 17 bytes != 32 bytes
2633        invalid_root_hash = b"a" * 17
2634        d = defer.succeed(None)
2635        # Before this test can work, we need to put some blocks + salts,
2636        # a block hash tree, and a share hash tree. Otherwise, we'll see
2637        # failures that match what we are looking for, but are caused by
2638        # the constraints imposed on operation ordering.
2639        for i in range(6):
2640            d.addCallback(lambda ignored, i=i:
2641                mw.put_block(self.block, i, self.salt))
2642        d.addCallback(lambda ignored:
2643            mw.put_encprivkey(self.encprivkey))
2644        d.addCallback(lambda ignored:
2645            mw.put_blockhashes(self.block_hash_tree))
2646        d.addCallback(lambda ignored:
2647            mw.put_sharehashes(self.share_hash_chain))
2648        d.addCallback(lambda ignored:
2649            self.shouldFail(LayoutInvalid, "invalid root hash",
2650                            None, mw.put_root_hash, invalid_root_hash))
2651        return d
2652
2653
2654    def test_write_rejected_with_invalid_blocksize(self):
2655        # The blocksize implied by the writer that we get from
2656        # _make_new_mw is 2bytes -- any more or any less than this
2657        # should be cause for failure, unless it is the tail segment, in
2658        # which case it may not be failure.
2659        invalid_block = b"a"
2660        mw = self._make_new_mw(b"si3", 0, 33) # implies a tail segment with
2661                                             # one byte blocks
2662        # 1 bytes != 2 bytes
2663        d = defer.succeed(None)
2664        d.addCallback(lambda ignored, invalid_block=invalid_block:
2665            self.shouldFail(LayoutInvalid, "test blocksize too small",
2666                            None, mw.put_block, invalid_block, 0,
2667                            self.salt))
2668        invalid_block = invalid_block * 3
2669        # 3 bytes != 2 bytes
2670        d.addCallback(lambda ignored:
2671            self.shouldFail(LayoutInvalid, "test blocksize too large",
2672                            None,
2673                            mw.put_block, invalid_block, 0, self.salt))
2674        for i in range(5):
2675            d.addCallback(lambda ignored, i=i:
2676                mw.put_block(self.block, i, self.salt))
2677        # Try to put an invalid tail segment
2678        d.addCallback(lambda ignored:
2679            self.shouldFail(LayoutInvalid, "test invalid tail segment",
2680                            None,
2681                            mw.put_block, self.block, 5, self.salt))
2682        valid_block = b"a"
2683        d.addCallback(lambda ignored:
2684            mw.put_block(valid_block, 5, self.salt))
2685        return d
2686
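    # The tail-segment arithmetic behind the datalength of 33 used
    # above (illustration only):
    #
    #     33 == 5 * 6 + 3   # five full 6-byte segments, 3-byte tail
    #     3 // 3 == 1       # with k == 3, tail blocks are one byte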
2687
2688    def test_write_enforces_order_constraints(self):
2689        # We require that the MDMFSlotWriteProxy be interacted with in a
2690        # specific way.
2691        # That way is:
2692        # 0: __init__
2693        # 1: write blocks and salts
2694        # 2: Write the encrypted private key
2695        # 3: Write the block hashes
2696        # 4: Write the share hashes
2697        # 5: Write the root hash and salt hash
2698        # 6: Write the signature and verification key
2699        # 7: Write the file.
2700        #
2701        # Some of these can be performed out-of-order, and some can't.
2702        # The dependencies that I want to test here are:
2703        #  - Private key before block hashes
2704        #  - share hashes and block hashes before root hash
2705        #  - root hash before signature
2706        #  - signature before verification key
2707        mw0 = self._make_new_mw(b"si0", 0)
2708        # Write some shares
2709        d = defer.succeed(None)
2710        for i in range(6):
2711            d.addCallback(lambda ignored, i=i:
2712                mw0.put_block(self.block, i, self.salt))
2713
2714        # Try to write the share hash chain without writing the
2715        # encrypted private key
2716        d.addCallback(lambda ignored:
2717            self.shouldFail(LayoutInvalid, "share hash chain before "
2718                                           "private key",
2719                            None,
2720                            mw0.put_sharehashes, self.share_hash_chain))
2721        # Write the private key.
2722        d.addCallback(lambda ignored:
2723            mw0.put_encprivkey(self.encprivkey))
2724
2725        # Now write the block hashes and try again
2726        d.addCallback(lambda ignored:
2727            mw0.put_blockhashes(self.block_hash_tree))
2728
2729        # We haven't yet put the root hash on the share, so we shouldn't
2730        # be able to sign it.
2731        d.addCallback(lambda ignored:
2732            self.shouldFail(LayoutInvalid, "signature before root hash",
2733                            None, mw0.put_signature, self.signature))
2734
2735        d.addCallback(lambda ignored:
2736            self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2737
2738        # ...and, since that fails, we also shouldn't be able to put the
2739        # verification key.
2740        d.addCallback(lambda ignored:
2741            self.shouldFail(LayoutInvalid, "key before signature",
2742                            None, mw0.put_verification_key,
2743                            self.verification_key))
2744
2745        # Now write the share hashes.
2746        d.addCallback(lambda ignored:
2747            mw0.put_sharehashes(self.share_hash_chain))
2748        # We should be able to write the root hash now too
2749        d.addCallback(lambda ignored:
2750            mw0.put_root_hash(self.root_hash))
2751
2752        # We should still be unable to put the verification key
2753        d.addCallback(lambda ignored:
2754            self.shouldFail(LayoutInvalid, "key before signature",
2755                            None, mw0.put_verification_key,
2756                            self.verification_key))
2757
2758        d.addCallback(lambda ignored:
2759            mw0.put_signature(self.signature))
2760
2761        # We shouldn't be able to write the offsets to the remote server
2762        # until the offset table is finished; IOW, until we have written
2763        # the verification key.
2764        d.addCallback(lambda ignored:
2765            self.shouldFail(LayoutInvalid, "offsets before verification key",
2766                            None,
2767                            mw0.finish_publishing))
2768
2769        d.addCallback(lambda ignored:
2770            mw0.put_verification_key(self.verification_key))
2771        return d
2772
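    # The write-order dependencies exercised above, summarized:
    #
    #     blocks/salts -> encprivkey -> blockhashes -> sharehashes
    #         -> root hash -> signature -> verification key
    #         -> finish_publishing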
2773
2774    def test_end_to_end(self):
2775        mw = self._make_new_mw(b"si1", 0)
2776        # Write a share using the mutable writer, and make sure that the
2777        # reader knows how to read everything back to us.
2778        d = defer.succeed(None)
2779        for i in range(6):
2780            d.addCallback(lambda ignored, i=i:
2781                mw.put_block(self.block, i, self.salt))
2782        d.addCallback(lambda ignored:
2783            mw.put_encprivkey(self.encprivkey))
2784        d.addCallback(lambda ignored:
2785            mw.put_blockhashes(self.block_hash_tree))
2786        d.addCallback(lambda ignored:
2787            mw.put_sharehashes(self.share_hash_chain))
2788        d.addCallback(lambda ignored:
2789            mw.put_root_hash(self.root_hash))
2790        d.addCallback(lambda ignored:
2791            mw.put_signature(self.signature))
2792        d.addCallback(lambda ignored:
2793            mw.put_verification_key(self.verification_key))
2794        d.addCallback(lambda ignored:
2795            mw.finish_publishing())
2796
2797        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2798        def _check_block_and_salt(block_and_salt):
2799            (block, salt) = block_and_salt
2800            self.assertThat(block, Equals(self.block))
2801            self.assertThat(salt, Equals(self.salt))
2802
2803        for i in range(6):
2804            d.addCallback(lambda ignored, i=i:
2805                mr.get_block_and_salt(i))
2806            d.addCallback(_check_block_and_salt)
2807
2808        d.addCallback(lambda ignored:
2809            mr.get_encprivkey())
2810        d.addCallback(lambda encprivkey:
2811            self.assertThat(self.encprivkey, Equals(encprivkey)))
2812
2813        d.addCallback(lambda ignored:
2814            mr.get_blockhashes())
2815        d.addCallback(lambda blockhashes:
2816            self.assertThat(self.block_hash_tree, Equals(blockhashes)))
2817
2818        d.addCallback(lambda ignored:
2819            mr.get_sharehashes())
2820        d.addCallback(lambda sharehashes:
2821            self.assertThat(self.share_hash_chain, Equals(sharehashes)))
2822
2823        d.addCallback(lambda ignored:
2824            mr.get_signature())
2825        d.addCallback(lambda signature:
2826            self.assertThat(signature, Equals(self.signature)))
2827
2828        d.addCallback(lambda ignored:
2829            mr.get_verification_key())
2830        d.addCallback(lambda verification_key:
2831            self.assertThat(verification_key, Equals(self.verification_key)))
2832
2833        d.addCallback(lambda ignored:
2834            mr.get_seqnum())
2835        d.addCallback(lambda seqnum:
2836            self.assertThat(seqnum, Equals(0)))
2837
2838        d.addCallback(lambda ignored:
2839            mr.get_root_hash())
2840        d.addCallback(lambda root_hash:
2841            self.assertThat(self.root_hash, Equals(root_hash)))
2842
2843        d.addCallback(lambda ignored:
2844            mr.get_encoding_parameters())
2845        def _check_encoding_parameters(args):
2846            (k, n, segsize, datalen) = args
2847            self.assertThat(k, Equals(3))
2848            self.assertThat(n, Equals(10))
2849            self.assertThat(segsize, Equals(6))
2850            self.assertThat(datalen, Equals(36))
2851        d.addCallback(_check_encoding_parameters)
2852
2853        d.addCallback(lambda ignored:
2854            mr.get_checkstring())
2855        d.addCallback(lambda checkstring:
2856            self.assertThat(checkstring, Equals(mw.get_checkstring())))
2857        return d
2858
2859
2860    def test_is_sdmf(self):
2861        # The MDMFSlotReadProxy should also know how to read SDMF files,
2862        # since it will encounter them on the grid. Callers use the
2863        # is_sdmf method to test this.
2864        self.write_sdmf_share_to_server(b"si1")
2865        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2866        d = mr.is_sdmf()
2867        d.addCallback(lambda issdmf:
2868            self.assertTrue(issdmf))
2869        return d
2870
2871
2872    def test_reads_sdmf(self):
2873        # The slot read proxy should, naturally, know how to tell us
2874        # about data in the SDMF format
2875        self.write_sdmf_share_to_server(b"si1")
2876        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2877        d = defer.succeed(None)
2878        d.addCallback(lambda ignored:
2879            mr.is_sdmf())
2880        d.addCallback(lambda issdmf:
2881            self.assertTrue(issdmf))
2882
2883        # What do we need to read?
2884        #  - The sharedata
2885        #  - The salt
2886        d.addCallback(lambda ignored:
2887            mr.get_block_and_salt(0))
2888        def _check_block_and_salt(results):
2889            block, salt = results
2890            # Our original file is 36 bytes long, so each share is 12
2891            # bytes in size. The share is composed entirely of the
2892            # letter a. self.block contains two a's, so 6 * self.block
2893            # is what we are looking for.
2894            self.assertThat(block, Equals(self.block * 6))
2895            self.assertThat(salt, Equals(self.salt))
2896        d.addCallback(_check_block_and_salt)
2897
2898        #  - The blockhashes
2899        d.addCallback(lambda ignored:
2900            mr.get_blockhashes())
2901        d.addCallback(lambda blockhashes:
2902            self.assertThat(self.block_hash_tree,
2903                                 Equals(blockhashes),
2904                                 blockhashes))
2905        #  - The sharehashes
2906        d.addCallback(lambda ignored:
2907            mr.get_sharehashes())
2908        d.addCallback(lambda sharehashes:
2909            self.assertThat(self.share_hash_chain,
2910                                 Equals(sharehashes)))
2911        #  - The keys
2912        d.addCallback(lambda ignored:
2913            mr.get_encprivkey())
2914        d.addCallback(lambda encprivkey:
2915            self.assertThat(encprivkey, Equals(self.encprivkey), encprivkey))
2916        d.addCallback(lambda ignored:
2917            mr.get_verification_key())
2918        d.addCallback(lambda verification_key:
2919            self.assertThat(verification_key,
2920                                 Equals(self.verification_key),
2921                                 verification_key))
2922        #  - The signature
2923        d.addCallback(lambda ignored:
2924            mr.get_signature())
2925        d.addCallback(lambda signature:
2926            self.assertThat(signature, Equals(self.signature), signature))
2927
2928        #  - The sequence number
2929        d.addCallback(lambda ignored:
2930            mr.get_seqnum())
2931        d.addCallback(lambda seqnum:
2932            self.assertThat(seqnum, Equals(0), seqnum))
2933
2934        #  - The root hash
2935        d.addCallback(lambda ignored:
2936            mr.get_root_hash())
2937        d.addCallback(lambda root_hash:
2938            self.assertThat(root_hash, Equals(self.root_hash), root_hash))
2939        return d
2940
2941
2942    def test_only_reads_one_segment_sdmf(self):
2943        # SDMF shares have only one segment, so it doesn't make sense to
2944        # read more segments than that. The reader should know this and
2945        # complain if we try to do that.
2946        self.write_sdmf_share_to_server(b"si1")
2947        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2948        d = defer.succeed(None)
2949        d.addCallback(lambda ignored:
2950            mr.is_sdmf())
2951        d.addCallback(lambda issdmf:
2952            self.assertTrue(issdmf))
2953        d.addCallback(lambda ignored:
2954            self.shouldFail(LayoutInvalid, "test bad segment",
2955                            None,
2956                            mr.get_block_and_salt, 1))
2957        return d
2958
2959
2960    def test_read_with_prefetched_mdmf_data(self):
2961        # The MDMFSlotReadProxy will prefill certain fields if you pass
2962        # it data that you have already fetched. This is useful for
2963        # cases like the Servermap, which prefetches ~2kb of data while
2964        # finding out which shares are on the remote peer so that it
2965        # doesn't waste round trips.
2966        mdmf_data = self.build_test_mdmf_share()
2967        self.write_test_share_to_server(b"si1")
2968        def _make_mr(ignored, length):
2969            mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0, mdmf_data[:length])
2970            return mr
2971
2972        d = defer.succeed(None)
2973        # This should be enough to fill in both the encoding parameters
2974        # and the table of offsets, which will complete the version
2975        # information tuple.
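        # (123 bytes is the 59-byte fixed header plus the eight 8-byte
        # offset-table entries.)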
2976        d.addCallback(_make_mr, 123)
2977        d.addCallback(lambda mr:
2978            mr.get_verinfo())
2979        def _check_verinfo(verinfo):
2980            self.assertTrue(verinfo)
2981            self.assertThat(verinfo, HasLength(9))
2982            (seqnum,
2983             root_hash,
2984             salt_hash,
2985             segsize,
2986             datalen,
2987             k,
2988             n,
2989             prefix,
2990             offsets) = verinfo
2991            self.assertThat(seqnum, Equals(0))
2992            self.assertThat(root_hash, Equals(self.root_hash))
2993            self.assertThat(segsize, Equals(6))
2994            self.assertThat(datalen, Equals(36))
2995            self.assertThat(k, Equals(3))
2996            self.assertThat(n, Equals(10))
2997            expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
2998                                          1,
2999                                          seqnum,
3000                                          root_hash,
3001                                          k,
3002                                          n,
3003                                          segsize,
3004                                          datalen)
3005            self.assertThat(expected_prefix, Equals(prefix))
3006            self.assertThat(self.rref.read_count, Equals(0))
3007        d.addCallback(_check_verinfo)
3008        # This is not enough data to read a block and its salt, so the
3009        # wrapper should attempt to read them from the remote server.
3010        d.addCallback(_make_mr, 123)
3011        d.addCallback(lambda mr: mr.get_block_and_salt(0))
3012        def _check_block_and_salt(block_and_salt):
3013            (block, salt) = block_and_salt
3014            self.assertThat(block, Equals(self.block))
3015            self.assertThat(salt, Equals(self.salt))
3016            self.assertThat(self.rref.read_count, Equals(1))
3017        d.addCallback(_check_block_and_salt)
3018        # This should be enough data to read one block.
3019        d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
3020        d.addCallback(lambda mr:
3021            mr.get_block_and_salt(0))
3022        d.addCallback(_check_block_and_salt)
3023        return d
3024
3025
3026    def test_read_with_prefetched_sdmf_data(self):
3027        sdmf_data = self.build_test_sdmf_share()
3028        self.write_sdmf_share_to_server(b"si1")
3029        def _make_mr(ignored, length):
3030            mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0, sdmf_data[:length])
3031            return mr
3032
3033        d = defer.succeed(None)
3034        # This should be enough to get us the encoding parameters,
3035        # offset table, and everything else we need to build a verinfo
3036        # string.
3037        d.addCallback(_make_mr, 123)
3038        d.addCallback(lambda mr:
3039            mr.get_verinfo())
3040        def _check_verinfo(verinfo):
3041            self.assertTrue(verinfo)
3042            self.assertThat(verinfo, HasLength(9))
3043            (seqnum,
3044             root_hash,
3045             salt,
3046             segsize,
3047             datalen,
3048             k,
3049             n,
3050             prefix,
3051             offsets) = verinfo
3052            self.assertThat(seqnum, Equals(0))
3053            self.assertThat(root_hash, Equals(self.root_hash))
3054            self.assertThat(salt, Equals(self.salt))
3055            self.assertThat(segsize, Equals(36))
3056            self.assertThat(datalen, Equals(36))
3057            self.assertThat(k, Equals(3))
3058            self.assertThat(n, Equals(10))
3059            expected_prefix = struct.pack(SIGNED_PREFIX,
3060                                          0,
3061                                          seqnum,
3062                                          root_hash,
3063                                          salt,
3064                                          k,
3065                                          n,
3066                                          segsize,
3067                                          datalen)
3068            self.assertThat(expected_prefix, Equals(prefix))
3069            self.assertThat(self.rref.read_count, Equals(0))
3070        d.addCallback(_check_verinfo)
3071        # This shouldn't be enough to read any share data.
3072        d.addCallback(_make_mr, 123)
3073        d.addCallback(lambda mr: mr.get_block_and_salt(0))
3074        def _check_block_and_salt(block_and_salt):
3075            (block, salt) = block_and_salt
3076            self.assertThat(block, Equals(self.block * 6))
3077            self.assertThat(salt, Equals(self.salt))
3078            # TODO: Fix the read routine so that it reads only the data
3079            #       that it has cached if it can't read all of it.
3080            self.assertThat(self.rref.read_count, Equals(2))
3081        d.addCallback(_check_block_and_salt)
3082
3083        # This should be enough to read share data.
3084        d.addCallback(_make_mr, self.offsets['share_data'])
3085        d.addCallback(lambda mr:
3086            mr.get_block_and_salt(0))
3087        d.addCallback(_check_block_and_salt)
3088        return d
3089
3090
3091    def test_read_with_empty_mdmf_file(self):
3092        # Some tests upload a file with no contents in order to test
3093        # things unrelated to the handling of the file's content.
3094        # The reader should behave intelligently in these cases.
3095        self.write_test_share_to_server(b"si1", empty=True)
3096        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
3097        # We should be able to get the encoding parameters, and they
3098        # should be correct.
3099        d = defer.succeed(None)
3100        d.addCallback(lambda ignored:
3101            mr.get_encoding_parameters())
3102        def _check_encoding_parameters(params):
3103            self.assertThat(params, HasLength(4))
3104            k, n, segsize, datalen = params
3105            self.assertThat(k, Equals(3))
3106            self.assertThat(n, Equals(10))
3107            self.assertThat(segsize, Equals(0))
3108            self.assertThat(datalen, Equals(0))
3109        d.addCallback(_check_encoding_parameters)
3110
3111        # We should not be able to fetch a block, since there are no
3112        # blocks to fetch
3113        d.addCallback(lambda ignored:
3114            self.shouldFail(LayoutInvalid, "get block on empty file",
3115                            None,
3116                            mr.get_block_and_salt, 0))
3117        return d
3118
3119
3120    def test_read_with_empty_sdmf_file(self):
3121        self.write_sdmf_share_to_server(b"si1", empty=True)
3122        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
3123        # We should be able to get the encoding parameters, and they
3124        # should be correct
3125        d = defer.succeed(None)
3126        d.addCallback(lambda ignored:
3127            mr.get_encoding_parameters())
3128        def _check_encoding_parameters(params):
3129            self.assertThat(params, HasLength(4))
3130            k, n, segsize, datalen = params
3131            self.assertThat(k, Equals(3))
3132            self.assertThat(n, Equals(10))
3133            self.assertThat(segsize, Equals(0))
3134            self.assertThat(datalen, Equals(0))
3135        d.addCallback(_check_encoding_parameters)
3136
3137        # It does not make sense to get a block from an empty file, so
3138        # we should not be able to.
3139        d.addCallback(lambda ignored:
3140            self.shouldFail(LayoutInvalid, "get block on an empty file",
3141                            None,
3142                            mr.get_block_and_salt, 0))
3143        return d
3144
3145
3146    def test_verinfo_with_sdmf_file(self):
3147        self.write_sdmf_share_to_server(b"si1")
3148        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
3149        # We should be able to get the version information.
3150        d = defer.succeed(None)
3151        d.addCallback(lambda ignored:
3152            mr.get_verinfo())
3153        def _check_verinfo(verinfo):
3154            self.assertTrue(verinfo)
3155            self.assertThat(verinfo, HasLength(9))
3156            (seqnum,
3157             root_hash,
3158             salt,
3159             segsize,
3160             datalen,
3161             k,
3162             n,
3163             prefix,
3164             offsets) = verinfo
3165            self.assertThat(seqnum, Equals(0))
3166            self.assertThat(root_hash, Equals(self.root_hash))
3167            self.assertThat(salt, Equals(self.salt))
3168            self.assertThat(segsize, Equals(36))
3169            self.assertThat(datalen, Equals(36))
3170            self.assertThat(k, Equals(3))
3171            self.assertThat(n, Equals(10))
3172            expected_prefix = struct.pack(">BQ32s16s BBQQ",
3173                                          0,
3174                                          seqnum,
3175                                          root_hash,
3176                                          salt,
3177                                          k,
3178                                          n,
3179                                          segsize,
3180                                          datalen)
3181            self.assertThat(prefix, Equals(expected_prefix))
3182            self.assertThat(offsets, Equals(self.offsets))
3183        d.addCallback(_check_verinfo)
3184        return d
3185
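    # (That SIGNED_PREFIX layout packs to 75 bytes:
    # 1 + 8 + 32 + 16 + 1 + 1 + 8 + 8.)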
3186
3187    def test_verinfo_with_mdmf_file(self):
3188        self.write_test_share_to_server(b"si1")
3189        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
3190        d = defer.succeed(None)
3191        d.addCallback(lambda ignored:
3192            mr.get_verinfo())
3193        def _check_verinfo(verinfo):
3194            self.assertTrue(verinfo)
3195            self.assertThat(verinfo, HasLength(9))
3196            (seqnum,
3197             root_hash,
3198             IV,
3199             segsize,
3200             datalen,
3201             k,
3202             n,
3203             prefix,
3204             offsets) = verinfo
3205            self.assertThat(seqnum, Equals(0))
3206            self.assertThat(root_hash, Equals(self.root_hash))
3207            self.assertFalse(IV)
3208            self.assertThat(segsize, Equals(6))
3209            self.assertThat(datalen, Equals(36))
3210            self.assertThat(k, Equals(3))
3211            self.assertThat(n, Equals(10))
3212            expected_prefix = struct.pack(">BQ32s BBQQ",
3213                                          1,
3214                                          seqnum,
3215                                          root_hash,
3216                                          k,
3217                                          n,
3218                                          segsize,
3219                                          datalen)
3220            self.assertThat(prefix, Equals(expected_prefix))
3221            self.assertThat(offsets, Equals(self.offsets))
3222        d.addCallback(_check_verinfo)
3223        return d
3224
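    # (The MDMF prefix, which lacks SDMF's 16-byte salt, packs to 59
    # bytes: 1 + 8 + 32 + 1 + 1 + 8 + 8 -- exactly the fixed header
    # size noted in test_read_with_prefetched_mdmf_data above.)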
3225
3226    def test_sdmf_writer(self):
3227        # Go through the motions of writing an SDMF share to the storage
3228        # server. Then read the storage server to see that the share got
3229        # written in the way that we think it should have.
3230
3231        # We do this first so that the necessary instance variables get
3232        # set the way we want them for the tests below.
3233        data = self.build_test_sdmf_share()
3234        sdmfr = SDMFSlotWriteProxy(0,
3235                                   self.storage_server,
3236                                   b"si1",
3237                                   self.secrets,
3238                                   0, 3, 10, 36, 36)
3239        # Put the block and salt.
3240        sdmfr.put_block(self.blockdata, 0, self.salt)
3241
3242        # Put the encprivkey
3243        sdmfr.put_encprivkey(self.encprivkey)
3244
3245        # Put the block and share hash chains
3246        sdmfr.put_blockhashes(self.block_hash_tree)
3247        sdmfr.put_sharehashes(self.share_hash_chain)
3248        sdmfr.put_root_hash(self.root_hash)
3249
3250        # Put the signature
3251        sdmfr.put_signature(self.signature)
3252
3253        # Put the verification key
3254        sdmfr.put_verification_key(self.verification_key)
3255
3256        # Now check to make sure that nothing has been written yet.
3257        self.assertThat(self.rref.write_count, Equals(0))
3258
3259        # Now finish publishing
3260        d = sdmfr.finish_publishing()
3261        def _then(ignored):
3262            self.assertThat(self.rref.write_count, Equals(1))
3263            read = self.ss.slot_readv
3264            self.assertThat(read(b"si1", [0], [(0, len(data))]),
3265                                 Equals({0: [data]}))
3266        d.addCallback(_then)
3267        return d
3268
3269
3270    def test_sdmf_writer_preexisting_share(self):
3271        data = self.build_test_sdmf_share()
3272        self.write_sdmf_share_to_server(b"si1")
3273
3274        # Now there is a share on the storage server. To successfully
3275        # write, we need to set the checkstring correctly. When we
3276        # don't, no write should occur.
3277        sdmfw = SDMFSlotWriteProxy(0,
3278                                   self.storage_server,
3279                                   b"si1",
3280                                   self.secrets,
3281                                   1, 3, 10, 36, 36)
3282        sdmfw.put_block(self.blockdata, 0, self.salt)
3283
3284        # Put the encprivkey
3285        sdmfw.put_encprivkey(self.encprivkey)
3286
3287        # Put the block and share hash chains
3288        sdmfw.put_blockhashes(self.block_hash_tree)
3289        sdmfw.put_sharehashes(self.share_hash_chain)
3290
3291        # Put the root hash
3292        sdmfw.put_root_hash(self.root_hash)
3293
3294        # Put the signature
3295        sdmfw.put_signature(self.signature)
3296
3297        # Put the verification key
3298        sdmfw.put_verification_key(self.verification_key)
3299
3300        # We shouldn't have a checkstring yet
3301        self.assertThat(sdmfw.get_checkstring(), Equals(b""))
3302
3303        d = sdmfw.finish_publishing()
3304        def _then(results):
3305            self.assertFalse(results[0])
3306            # this is the correct checkstring
3307            self._expected_checkstring = results[1][0][0]
3308            return self._expected_checkstring
3309
3310        d.addCallback(_then)
3311        d.addCallback(sdmfw.set_checkstring)
3312        d.addCallback(lambda ignored:
3313            sdmfw.get_checkstring())
3314        d.addCallback(lambda checkstring:
3315            self.assertThat(checkstring, Equals(self._expected_checkstring)))
3316        d.addCallback(lambda ignored:
3317            sdmfw.finish_publishing())
3318        def _then_again(results):
3319            self.assertTrue(results[0])
3320            read = self.ss.slot_readv
3321            self.assertThat(read(b"si1", [0], [(1, 8)]),
3322                                 Equals({0: [struct.pack(">Q", 1)]}))
3323            self.assertThat(read(b"si1", [0], [(9, len(data) - 9)]),
3324                                 Equals({0: [data[9:]]}))
3325        d.addCallback(_then_again)
3326        return d
3327
3328
3329class Stats(SyncTestCase):
3330
3331    def setUp(self):
3332        super(Stats, self).setUp()
3333        self.sparent = LoggingServiceParent()
3334        self._lease_secret = itertools.count()
3335        self.addCleanup(self.sparent.stopService)
3336
3337    def workdir(self, name):
3338        basedir = os.path.join("storage", "Server", name)
3339        return basedir
3340
3341    def create(self, name):
3342        workdir = self.workdir(name)
3343        ss = StorageServer(workdir, b"\x00" * 20)
3344        ss.setServiceParent(self.sparent)
3345        return ss
3346
3347    def test_latencies(self):
3348        ss = self.create("test_latencies")
3349        for i in range(10000):
3350            ss.add_latency("allocate", 1.0 * i)
3351        for i in range(1000):
3352            ss.add_latency("renew", 1.0 * i)
3353        for i in range(20):
3354            ss.add_latency("write", 1.0 * i)
3355        for i in range(10):
3356            ss.add_latency("cancel", 2.0 * i)
3357        ss.add_latency("get", 5.0)
3358
3359        output = ss.get_latencies()
3360
3361        self.assertThat(sorted(output.keys()),
3362                             Equals(sorted(["allocate", "renew", "cancel", "write", "get"])))
3363        self.assertThat(ss.latencies["allocate"], HasLength(1000))
3364        self.assertTrue(abs(output["allocate"]["mean"] - 9500) < 1, output)
3365        self.assertTrue(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
3366        self.assertTrue(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
3367        self.assertTrue(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
3368        self.assertTrue(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
3369        self.assertTrue(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
3370        self.assertTrue(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
3371        self.assertTrue(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
3372
3373        self.assertThat(ss.latencies["renew"], HasLength(1000))
3374        self.assertTrue(abs(output["renew"]["mean"] - 500) < 1, output)
3375        self.assertTrue(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
3376        self.assertTrue(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
3377        self.assertTrue(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
3378        self.assertTrue(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
3379        self.assertTrue(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
3380        self.assertTrue(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
3381        self.assertTrue(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
3382
3383        self.assertThat(ss.latencies["write"], HasLength(20))
3384        self.assertTrue(abs(output["write"]["mean"] - 9) < 1, output)
3385        self.assertTrue(output["write"]["01_0_percentile"] is None, output)
3386        self.assertTrue(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
3387        self.assertTrue(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
3388        self.assertTrue(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
3389        self.assertTrue(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
3390        self.assertTrue(output["write"]["99_0_percentile"] is None, output)
3391        self.assertTrue(output["write"]["99_9_percentile"] is None, output)
3392
3393        self.assertThat(ss.latencies["cancel"], HasLength(10))
3394        self.assertTrue(abs(output["cancel"]["mean"] - 9) < 1, output)
3395        self.assertTrue(output["cancel"]["01_0_percentile"] is None, output)
3396        self.assertTrue(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
3397        self.assertTrue(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
3398        self.assertTrue(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
3399        self.assertTrue(output["cancel"]["95_0_percentile"] is None, output)
3400        self.assertTrue(output["cancel"]["99_0_percentile"] is None, output)
3401        self.assertTrue(output["cancel"]["99_9_percentile"] is None, output)
3402
3403        self.assertThat(ss.latencies["get"], HasLength(1))
3404        self.assertTrue(output["get"]["mean"] is None, output)
3405        self.assertTrue(output["get"]["01_0_percentile"] is None, output)
3406        self.assertTrue(output["get"]["10_0_percentile"] is None, output)
3407        self.assertTrue(output["get"]["50_0_percentile"] is None, output)
3408        self.assertTrue(output["get"]["90_0_percentile"] is None, output)
3409        self.assertTrue(output["get"]["95_0_percentile"] is None, output)
3410        self.assertTrue(output["get"]["99_0_percentile"] is None, output)
3411        self.assertTrue(output["get"]["99_9_percentile"] is None, output)
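        # Taken together, these assertions pin down two behaviors:
        # add_latency() retains only the most recent 1000 samples per
        # category, and get_latencies() reports a statistic only when
        # enough samples exist for it to be meaningful -- hence the
        # None values for the sparsely-sampled categories.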
3412
3413immutable_schemas = strategies.sampled_from(list(ALL_IMMUTABLE_SCHEMAS))
3414
3415class ShareFileTests(SyncTestCase):
3416    """Tests for allmydata.storage.immutable.ShareFile."""
3417
3418    def get_sharefile(self, **kwargs):
3419        sf = ShareFile(self.mktemp(), max_size=1000, create=True, **kwargs)
3420        sf.write_share_data(0, b"abc")
3421        sf.write_share_data(2, b"DEF")
3422        # Should be b'abDEF' now.
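        # (The two writes overlap at offset 2, where the later write
        # wins: b"abc" then b"DEF" at offset 2 yields b"abDEF".)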
3423        return sf
3424
3425    @given(immutable_schemas)
3426    def test_read_write(self, schema):
3427        """Basic writes can be read."""
3428        sf = self.get_sharefile(schema=schema)
3429        self.assertEqual(sf.read_share_data(0, 3), b"abD")
3430        self.assertEqual(sf.read_share_data(1, 4), b"bDEF")
3431
3432    @given(immutable_schemas)
3433    def test_reads_beyond_file_end(self, schema):
3434        """Reads beyond the file size are truncated."""
3435        sf = self.get_sharefile(schema=schema)
3436        self.assertEqual(sf.read_share_data(0, 10), b"abDEF")
3437        self.assertEqual(sf.read_share_data(5, 10), b"")
3438
3439    @given(immutable_schemas)
3440    def test_too_large_write(self, schema):
3441        """Can't do write larger than file size."""
3442        sf = self.get_sharefile(schema=schema)
3443        with self.assertRaises(DataTooLargeError):
3444            sf.write_share_data(0, b"x" * 3000)
3445
3446    @given(immutable_schemas)
3447    def test_no_leases_cancelled(self, schema):
3448        """If no leases were cancelled, IndexError is raised."""
3449        sf = self.get_sharefile(schema=schema)
3450        with self.assertRaises(IndexError):
3451            sf.cancel_lease(b"garbage")
3452
3453    @given(immutable_schemas)
3454    def test_long_lease_count_format(self, schema):
3455        """
3456        ``ShareFile.__init__`` raises ``ValueError`` if the lease count format
3457        given is longer than one character.
3458        """
3459        with self.assertRaises(ValueError):
3460            self.get_sharefile(schema=schema, lease_count_format="BB")
3461
3462    @given(immutable_schemas)
3463    def test_large_lease_count_format(self, schema):
3464        """
3465        ``ShareFile.__init__`` raises ``ValueError`` if the lease count format
3466        encodes to a size larger than 8 bytes.
3467        """
3468        with self.assertRaises(ValueError):
3469            self.get_sharefile(schema=schema, lease_count_format="Q")
3470
3471    @given(immutable_schemas)
3472    def test_avoid_lease_overflow(self, schema):
3473        """
3474        If the share file already has the maximum number of leases supported then
3475        ``ShareFile.add_lease`` raises ``struct.error`` and makes no changes
3476        to the share file contents.
3477        """
3478        make_lease = partial(
3479            LeaseInfo,
3480            renew_secret=b"r" * 32,
3481            cancel_secret=b"c" * 32,
3482            expiration_time=2 ** 31,
3483        )
3484        # Make it a little easier to reach the condition by limiting the
3485        # number of leases to only 255.
3486        sf = self.get_sharefile(schema=schema, lease_count_format="B")
3487
3488        # Add the leases.
3489        for i in range(2 ** 8 - 1):
3490            lease = make_lease(owner_num=i)
3491            sf.add_lease(lease)
3492
3493        # Capture the state of the share file at this point so we can
3494        # determine whether the next operation modifies it or not.
3495        with open(sf.home, "rb") as f:
3496            before_data = f.read()
3497
3498        # It is not possible to add a 256th lease.
3499        lease = make_lease(owner_num=256)
3500        with self.assertRaises(struct.error):
3501            sf.add_lease(lease)
3502
3503        # Compare the share file state to what we captured earlier.  Any
3504        # change is a bug.
3505        with open(sf.home, "rb") as f:
3506            after_data = f.read()
3507
3508        self.assertEqual(before_data, after_data)
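        # (The failure comes from the lease-count field itself: with
        # lease_count_format="B" the count is a single byte, so it
        # cannot represent 256 and struct refuses to pack it.)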
3509
3510    @given(immutable_schemas)
3511    def test_renew_secret(self, schema):
3512        """
3513        A lease loaded from an immutable share file at any schema version can have
3514        its renew secret verified.
3515        """
3516        renew_secret = b"r" * 32
3517        cancel_secret = b"c" * 32
3518        expiration_time = 2 ** 31
3519
3520        sf = self.get_sharefile(schema=schema)
3521        lease = LeaseInfo(
3522            owner_num=0,
3523            renew_secret=renew_secret,
3524            cancel_secret=cancel_secret,
3525            expiration_time=expiration_time,
3526        )
3527        sf.add_lease(lease)
3528        (loaded_lease,) = sf.get_leases()
3529        self.assertTrue(loaded_lease.is_renew_secret(renew_secret))
3530
3531    @given(immutable_schemas)
3532    def test_cancel_secret(self, schema):
3533        """
3534        A lease loaded from an immutable share file at any schema version can have
3535        its cancel secret verified.
3536        """
3537        renew_secret = b"r" * 32
3538        cancel_secret = b"c" * 32
3539        expiration_time = 2 ** 31
3540
3541        sf = self.get_sharefile(schema=schema)
3542        lease = LeaseInfo(
3543            owner_num=0,
3544            renew_secret=renew_secret,
3545            cancel_secret=cancel_secret,
3546            expiration_time=expiration_time,
3547        )
3548        sf.add_lease(lease)
3549        (loaded_lease,) = sf.get_leases()
3550        self.assertTrue(loaded_lease.is_cancel_secret(cancel_secret))
3551
3552mutable_schemas = strategies.sampled_from(list(ALL_MUTABLE_SCHEMAS))
3553
3554class MutableShareFileTests(SyncTestCase):
3555    """
3556    Tests for allmydata.storage.mutable.MutableShareFile.
3557    """
3558    def get_sharefile(self, **kwargs):
3559        return MutableShareFile(self.mktemp(), **kwargs)
3560
3561    @given(
3562        schema=mutable_schemas,
3563        nodeid=strategies.just(b"x" * 20),
3564        write_enabler=strategies.just(b"y" * 32),
3565        datav=strategies.lists(
3566            # Limit the max size of these so we don't write *crazy* amounts of
3567            # data to disk.
3568            strategies.tuples(offsets(), strategies.binary(max_size=2 ** 8)),
3569            max_size=2 ** 8,
3570        ),
3571        new_length=offsets(),
3572    )
3573    def test_readv_reads_share_data(self, schema, nodeid, write_enabler, datav, new_length):
3574        """
3575        ``MutableShareFile.readv`` returns bytes from the share data portion
3576        of the share file.
3577        """
3578        sf = self.get_sharefile(schema=schema)
3579        sf.create(my_nodeid=nodeid, write_enabler=write_enabler)
3580        sf.writev(datav=datav, new_length=new_length)
3581
3582        # Apply all of the writes to a simple in-memory buffer so we can
3583        # resolve the final state of the share data.  In particular, this
3584        # helps deal with overlapping writes which otherwise make it tricky to
3585        # figure out what data to expect to be able to read back.
3586        buf = BytesIO()
3587        for (offset, data) in datav:
3588            buf.seek(offset)
3589            buf.write(data)
3590        buf.truncate(new_length)
3591
3592        # Using that buffer, determine the expected result of a readv for all
3593        # of the data just written.
3594        def read_from_buf(offset, length):
3595            buf.seek(offset)
3596            return buf.read(length)
3597        expected_data = list(
3598            read_from_buf(offset, len(data))
3599            for (offset, data)
3600            in datav
3601        )
3602
3603        # Perform a read that gives back all of the data written to the share
3604        # file.
3605        read_vectors = list((offset, len(data)) for (offset, data) in datav)
3606        read_data = sf.readv(read_vectors)
3607
3608        # Make sure the read reproduces the value we computed using our local
3609        # buffer.
3610        self.assertEqual(expected_data, read_data)
3611
3612    @given(
3613        schema=mutable_schemas,
3614        nodeid=strategies.just(b"x" * 20),
3615        write_enabler=strategies.just(b"y" * 32),
3616        readv=strategies.lists(strategies.tuples(offsets(), lengths()), min_size=1),
3617        random=strategies.randoms(),
3618    )
3619    def test_readv_rejects_negative_length(self, schema, nodeid, write_enabler, readv, random):
3620        """
3621        If a negative length is given to ``MutableShareFile.readv`` in a read
3622        vector then ``AssertionError`` is raised.
3623        """
3624        # Pick a read vector to break with a negative value
3625        readv_index = random.randrange(len(readv))
3626        # Decide on whether we're breaking offset or length
3627        offset_or_length = random.randrange(2)
3628
3629        # A helper function that will take a valid offset and length and break
3630        # one of them.
3631        def corrupt(break_length, offset, length):
3632            if break_length:
3633                # length must not be 0 or flipping the sign does nothing
3634                # length must not be negative or flipping the sign *fixes* it
3635                assert length > 0
3636                return (offset, -length)
3637            else:
3638                if offset > 0:
3639                    # We can break offset just by flipping the sign.
3640                    return (-offset, length)
3641                else:
3642                    # Otherwise it has to be zero.  If it was negative, what's
3643                    # going on?
3644                    assert offset == 0
3645                    # Since we can't just flip the sign on 0 to break things,
3646                    # replace a 0 offset with a simple negative value.  All
3647                    # other negative values will be tested by the `offset > 0`
3648                    # case above.
3649                    return (-1, length)
3650
3651        # Break the read vector very slightly!
3652        broken_readv = readv[:]
3653        broken_readv[readv_index] = corrupt(
3654            offset_or_length,
3655            *broken_readv[readv_index]
3656        )
3657
3658        sf = self.get_sharefile(schema=schema)
3659        sf.create(my_nodeid=nodeid, write_enabler=write_enabler)
3660
3661        # A read with a broken read vector is an error.
3662        with self.assertRaises(AssertionError):
3663            sf.readv(broken_readv)
3664
3665
3666class LeaseInfoTests(SyncTestCase):
3667    """
3668    Tests for ``allmydata.storage.lease.LeaseInfo``.
3669    """
3670    def test_is_renew_secret(self):
3671        """
3672        ``LeaseInfo.is_renew_secret`` returns ``True`` if the value given is the
3673        renew secret.
3674        """
3675        renew_secret = b"r" * 32
3676        cancel_secret = b"c" * 32
3677        lease = LeaseInfo(
3678            owner_num=1,
3679            renew_secret=renew_secret,
3680            cancel_secret=cancel_secret,
3681        )
3682        self.assertTrue(lease.is_renew_secret(renew_secret))
3683
3684    def test_is_not_renew_secret(self):
3685        """
3686        ``LeaseInfo.is_renew_secret`` returns ``False`` if the value given is not
3687        the renew secret.
3688        """
3689        renew_secret = b"r" * 32
3690        cancel_secret = b"c" * 32
3691        lease = LeaseInfo(
3692            owner_num=1,
3693            renew_secret=renew_secret,
3694            cancel_secret=cancel_secret,
3695        )
3696        self.assertFalse(lease.is_renew_secret(cancel_secret))
3697
3698    def test_is_cancel_secret(self):
3699        """
3700        ``LeaseInfo.is_cancel_secret`` returns ``True`` if the value given is the
3701        cancel secret.
3702        """
3703        renew_secret = b"r" * 32
3704        cancel_secret = b"c" * 32
3705        lease = LeaseInfo(
3706            owner_num=1,
3707            renew_secret=renew_secret,
3708            cancel_secret=cancel_secret,
3709        )
3710        self.assertTrue(lease.is_cancel_secret(cancel_secret))
3711
3712    def test_is_not_cancel_secret(self):
3713        """
3714        ``LeaseInfo.is_cancel_secret`` returns ``False`` if the value given is not
3715        the cancel secret.
3716        """
3717        renew_secret = b"r" * 32
3718        cancel_secret = b"c" * 32
3719        lease = LeaseInfo(
3720            owner_num=1,
3721            renew_secret=renew_secret,
3722            cancel_secret=cancel_secret,
3723        )
3724        self.assertFalse(lease.is_cancel_secret(renew_secret))
3725
3726    @given(
3727        strategies.tuples(
3728            strategies.integers(min_value=0, max_value=2 ** 31 - 1),
3729            strategies.binary(min_size=32, max_size=32),
3730            strategies.binary(min_size=32, max_size=32),
3731            strategies.integers(min_value=0, max_value=2 ** 31 - 1),
3732            strategies.binary(min_size=20, max_size=20),
3733        ),
3734    )
3735    def test_immutable_size(self, initializer_args):
3736        """
3737        ``LeaseInfo.immutable_size`` returns the length of the result of
3738        ``LeaseInfo.to_immutable_data``.
3739
3740        ``LeaseInfo.mutable_size`` returns the length of the result of
3741        ``LeaseInfo.to_mutable_data``.
3742        """
3743        info = LeaseInfo(*initializer_args)
3744        self.expectThat(
3745            info.to_immutable_data(),
3746            HasLength(info.immutable_size()),
3747        )
3748        self.expectThat(
3749            info.to_mutable_data(),
3750            HasLength(info.mutable_size()),
3751        )
3752
3753
3754class WriteBufferTests(SyncTestCase):
3755    """Tests for ``_WriteBuffer``."""
3756
3757    @given(
3758        small_writes=strategies.lists(
3759            strategies.binary(min_size=1, max_size=20),
3760            min_size=10, max_size=20),
3761        batch_size=strategies.integers(min_value=5, max_value=10)
3762    )
3763    def test_write_buffer(self, small_writes: list[bytes], batch_size: int):
3764        """
3765        ``_WriteBuffer`` coalesces small writes into bigger writes based on
3766        the batch size.
3767        """
3768        wb = _WriteBuffer(batch_size)
3769        result = b""
3770        for data in small_writes:
3771            should_flush = wb.queue_write(data)
3772            if should_flush:
3773                flushed_offset, flushed_data = wb.flush()
3774                self.assertEqual(flushed_offset, len(result))
3775                # The flushed data comes in batch-sized chunks, or the
3776                # closest approximation the queued inputs allow:
3777                self.assertTrue(batch_size <= len(flushed_data) < batch_size + len(data))
3778                result += flushed_data
3779
3780        # Final flush:
3781        remaining_length = wb.get_queued_bytes()
3782        flushed_offset, flushed_data = wb.flush()
3783        self.assertEqual(remaining_length, len(flushed_data))
3784        self.assertEqual(flushed_offset, len(result))
3785        result += flushed_data
3786
3787        self.assertEqual(result, b"".join(small_writes))
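
    # A minimal usage sketch of the protocol exercised above
    # (hypothetical data; the API is queue_write/flush/get_queued_bytes
    # as used in this test):
    #
    #     wb = _WriteBuffer(10)
    #     if wb.queue_write(b"hello world!"):   # True: >= 10 bytes queued
    #         offset, data = wb.flush()         # (0, b"hello world!")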