source: trunk/src/allmydata/test/test_storage.py

Last change on this file was f9a65f8, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-12-21T01:07:01Z

trim usage of to-be-removed "future" library

  • Property mode set to 100644
File size: 149.7 KB
Line 
1"""
2Tests for allmydata.storage.
3
4Ported to Python 3.
5"""
6
7from __future__ import annotations
8from six import ensure_str
9
10from io import (
11    BytesIO,
12)
13import time
14import os.path
15import platform
16import stat
17import struct
18import shutil
19from functools import partial
20from uuid import uuid4
21
22def bchr(s):
23    return bytes([s])
24
25from testtools.matchers import (
26    Equals,
27    NotEquals,
28    Contains,
29    HasLength,
30    IsInstance,
31)
32
33from twisted.trial import unittest
34
35from twisted.internet import defer
36from twisted.internet.task import Clock
37
38from hypothesis import given, strategies, example
39
40import itertools
41from allmydata import interfaces
42from allmydata.util import fileutil, hashutil, base32
43from allmydata.storage.server import (
44    StorageServer, DEFAULT_RENEWAL_TIME, FoolscapStorageServer,
45)
46from allmydata.storage.shares import get_share_file
47from allmydata.storage.mutable import MutableShareFile
48from allmydata.storage.mutable_schema import (
49    ALL_SCHEMAS as ALL_MUTABLE_SCHEMAS,
50)
51from allmydata.storage.immutable import (
52    BucketWriter, BucketReader, ShareFile, FoolscapBucketWriter,
53    FoolscapBucketReader,
54)
55from allmydata.storage.immutable_schema import (
56    ALL_SCHEMAS as ALL_IMMUTABLE_SCHEMAS,
57)
58from allmydata.storage.common import storage_index_to_dir, \
59     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError, \
60     si_b2a, si_a2b
61from allmydata.storage.lease import LeaseInfo
62from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
63     ReadBucketProxy, _WriteBuffer
64from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
65                                     LayoutInvalid, MDMFSIGNABLEHEADER, \
66                                     SIGNED_PREFIX, MDMFHEADER, \
67                                     MDMFOFFSETS, SDMFSlotWriteProxy, \
68                                     PRIVATE_KEY_SIZE, \
69                                     SIGNATURE_SIZE, \
70                                     VERIFICATION_KEY_SIZE, \
71                                     SHARE_HASH_CHAIN_SIZE
72from allmydata.interfaces import (
73    BadWriteEnablerError, DataTooLargeError, ConflictingWriteError,
74)
75from allmydata.test.no_network import NoNetworkServer
76from allmydata.storage_client import (
77    _StorageServer,
78)
79from .common import (
80    LoggingServiceParent,
81    ShouldFailMixin,
82    FakeDisk,
83    SyncTestCase,
84    AsyncTestCase,
85)
86
87from .common_util import FakeCanary
88from .common_storage import (
89    upload_immutable,
90    upload_mutable,
91)
92from .strategies import (
93    offsets,
94    lengths,
95)
96
97
98class UtilTests(SyncTestCase):
99    """Tests for allmydata.storage.common and .shares."""
100
101    def test_encoding(self):
102        """b2a/a2b are the same as base32."""
103        s = b"\xFF HELLO \xF3"
104        result = si_b2a(s)
105        self.assertThat(base32.b2a(s), Equals(result))
106        self.assertThat(si_a2b(result), Equals(s))
107
108    def test_storage_index_to_dir(self):
109        """storage_index_to_dir creates a native string path."""
110        s = b"\xFF HELLO \xF3"
111        path = storage_index_to_dir(s)
112        parts = os.path.split(path)
113        self.assertThat(parts[0], Equals(parts[1][:2]))
114        self.assertThat(path, IsInstance(str))
115
116    def test_get_share_file_mutable(self):
117        """A mutable share is identified by get_share_file()."""
118        path = self.mktemp()
119        msf = MutableShareFile(path)
120        msf.create(b"12", b"abc")  # arbitrary values
121        loaded = get_share_file(path)
122        self.assertThat(loaded, IsInstance(MutableShareFile))
123        self.assertThat(loaded.home, Equals(path))
124
125    def test_get_share_file_immutable(self):
126        """An immutable share is identified by get_share_file()."""
127        path = self.mktemp()
128        _ = ShareFile(path, max_size=1000, create=True)
129        loaded = get_share_file(path)
130        self.assertThat(loaded, IsInstance(ShareFile))
131        self.assertThat(loaded.home, Equals(path))
132
133
134class FakeStatsProvider(object):
135    def count(self, name, delta=1):
136        pass
137    def register_producer(self, producer):
138        pass
139
140
141class Bucket(SyncTestCase):
142    def make_workdir(self, name):
143        basedir = os.path.join("storage", "Bucket", name)
144        incoming = os.path.join(basedir, "tmp", "bucket")
145        final = os.path.join(basedir, "bucket")
146        fileutil.make_dirs(basedir)
147        fileutil.make_dirs(os.path.join(basedir, "tmp"))
148        return incoming, final
149
150    def bucket_writer_closed(self, bw, consumed):
151        pass
152    def add_latency(self, category, latency):
153        pass
154    def count(self, name, delta=1):
155        pass
156
157    def make_lease(self):
158        owner_num = 0
159        renew_secret = os.urandom(32)
160        cancel_secret = os.urandom(32)
161        expiration_time = time.time() + 5000
162        return LeaseInfo(owner_num, renew_secret, cancel_secret,
163                         expiration_time, b"\x00" * 20)
164
165    def test_create(self):
166        incoming, final = self.make_workdir("test_create")
167        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
168        bw.write(0, b"a"*25)
169        bw.write(25, b"b"*25)
170        bw.write(50, b"c"*25)
171        bw.write(75, b"d"*7)
172        bw.close()
173
174    def test_readwrite(self):
175        incoming, final = self.make_workdir("test_readwrite")
176        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
177        bw.write(0, b"a"*25)
178        bw.write(25, b"b"*25)
179        bw.write(50, b"c"*7) # last block may be short
180        bw.close()
181
182        # now read from it
183        br = BucketReader(self, bw.finalhome)
184        self.assertThat(br.read(0, 25), Equals(b"a"*25))
185        self.assertThat(br.read(25, 25), Equals(b"b"*25))
186        self.assertThat(br.read(50, 7), Equals(b"c"*7))
187
188    def test_write_past_size_errors(self):
189        """Writing beyond the size of the bucket throws an exception."""
190        for (i, (offset, length)) in enumerate([(0, 201), (10, 191), (202, 34)]):
191            incoming, final = self.make_workdir(
192                "test_write_past_size_errors-{}".format(i)
193            )
194            bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
195            with self.assertRaises(DataTooLargeError):
196                bw.write(offset, b"a" * length)
197
198    @given(
199        maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
200        maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
201    )
202    def test_overlapping_writes_ok_if_matching(
203            self, maybe_overlapping_offset, maybe_overlapping_length
204    ):
205        """
206        Writes that overlap with previous writes are OK when the content is the
207        same.
208        """
209        length = 100
210        expected_data = b"".join(bchr(i) for i in range(100))
211        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
212        bw = BucketWriter(
213            self, incoming, final, length, self.make_lease(), Clock()
214        )
215        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
216        bw.write(10, expected_data[10:20])
217        bw.write(30, expected_data[30:40])
218        bw.write(50, expected_data[50:60])
219        # Then, an overlapping write but with matching data:
220        bw.write(
221            maybe_overlapping_offset,
222            expected_data[
223                maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length
224            ]
225        )
226        # Now fill in the holes:
227        bw.write(0, expected_data[0:10])
228        bw.write(20, expected_data[20:30])
229        bw.write(40, expected_data[40:50])
230        bw.write(60, expected_data[60:])
231        bw.close()
232
233        br = BucketReader(self, bw.finalhome)
234        self.assertEqual(br.read(0, length), expected_data)
235
236    @given(
237        maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
238        maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
239    )
240    def test_overlapping_writes_not_ok_if_different(
241            self, maybe_overlapping_offset, maybe_overlapping_length
242    ):
243        """
244        Writes that overlap with previous writes fail with an exception if the
245        contents don't match.
246        """
247        length = 100
248        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
249        bw = BucketWriter(
250            self, incoming, final, length, self.make_lease(), Clock()
251        )
252        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
253        bw.write(10, b"1" * 10)
254        bw.write(30, b"1" * 10)
255        bw.write(50, b"1" * 10)
256        # Then, write something that might overlap with some of them, but
257        # conflicts. Then fill in holes left by first three writes. Conflict is
258        # inevitable.
259        with self.assertRaises(ConflictingWriteError):
260            bw.write(
261                maybe_overlapping_offset,
262                b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset),
263            )
264            bw.write(0, b"1" * 10)
265            bw.write(20, b"1" * 10)
266            bw.write(40, b"1" * 10)
267            bw.write(60, b"1" * 40)
268
269    @given(
270        offsets=strategies.lists(
271            strategies.integers(min_value=0, max_value=99),
272            min_size=20,
273            max_size=20
274        ),
275    )
276    @example(offsets=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 40, 70])
277    def test_writes_return_when_finished(
278            self, offsets
279    ):
280        """
281        The ``BucketWriter.write()`` return true if and only if the maximum
282        size has been reached via potentially overlapping writes.  The
283        remaining ranges can be checked via ``BucketWriter.required_ranges()``.
284        """
285        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
286        bw = BucketWriter(
287            self, incoming, final, 100, self.make_lease(), Clock()
288        )
289        local_written = [0] * 100
290        for offset in offsets:
291            length = min(30, 100 - offset)
292            data = b"1" * length
293            for i in range(offset, offset+length):
294                local_written[i] = 1
295            finished = bw.write(offset, data)
296            self.assertEqual(finished, sum(local_written) == 100)
297            required_ranges = bw.required_ranges()
298            for i in range(0, 100):
299                self.assertEqual(local_written[i] == 1, required_ranges.get(i) is None)
300
301    def test_read_past_end_of_share_data(self):
302        # test vector for immutable files (hard-coded contents of an immutable share
303        # file):
304
305        # The following immutable share file content is identical to that
306        # generated with storage.immutable.ShareFile from Tahoe-LAFS v1.8.2
307        # with share data == 'a'. The total size of this content is 85
308        # bytes.
309
310        containerdata = struct.pack('>LLL', 1, 1, 1)
311
312        # A Tahoe-LAFS storage client would send as the share_data a
313        # complicated string involving hash trees and a URI Extension Block
314        # -- see allmydata/immutable/layout.py . This test, which is
315        # simulating a client, just sends 'a'.
316        share_data = b'a'
317
318        ownernumber = struct.pack('>L', 0)
319        renewsecret  = b'THIS LETS ME RENEW YOUR FILE....'
320        assert len(renewsecret) == 32
321        cancelsecret = b'THIS LETS ME KILL YOUR FILE HAHA'
322        assert len(cancelsecret) == 32
323        expirationtime = struct.pack('>L', DEFAULT_RENEWAL_TIME) # 31 days in seconds
324
325        lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
326
327        share_file_data = containerdata + share_data + lease_data
328
329        incoming, final = self.make_workdir("test_read_past_end_of_share_data")
330
331        fileutil.write(final, share_file_data)
332
333        class MockStorageServer(object):
334            def add_latency(self, category, latency):
335                pass
336            def count(self, name, delta=1):
337                pass
338
339        mockstorageserver = MockStorageServer()
340
341        # Now read from it.
342        br = BucketReader(mockstorageserver, final)
343
344        self.assertThat(br.read(0, len(share_data)), Equals(share_data))
345
346        # Read past the end of share data to get the cancel secret.
347        read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
348
349        result_of_read = br.read(0, read_length)
350        self.assertThat(result_of_read, Equals(share_data))
351
352        result_of_read = br.read(0, len(share_data)+1)
353        self.assertThat(result_of_read, Equals(share_data))
354
355    def _assert_timeout_only_after_30_minutes(self, clock, bw):
356        """
357        The ``BucketWriter`` times out and is closed after 30 minutes, but not
358        sooner.
359        """
360        self.assertFalse(bw.closed)
361        # 29 minutes pass. Everything is fine.
362        for i in range(29):
363            clock.advance(60)
364            self.assertFalse(bw.closed, "Bucket closed after only %d minutes" % (i + 1,))
365        # After the 30th minute, the bucket is closed due to lack of writes.
366        clock.advance(60)
367        self.assertTrue(bw.closed)
368
369    def test_bucket_expires_if_no_writes_for_30_minutes(self):
370        """
371        If a ``BucketWriter`` receives no writes for 30 minutes, it is removed.
372        """
373        incoming, final = self.make_workdir("test_bucket_expires")
374        clock = Clock()
375        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
376        self._assert_timeout_only_after_30_minutes(clock, bw)
377
378    def test_bucket_writes_delay_timeout(self):
379        """
380        So long as the ``BucketWriter`` receives writes, the the removal
381        timeout is put off.
382        """
383        incoming, final = self.make_workdir("test_bucket_writes_delay_timeout")
384        clock = Clock()
385        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
386        # 29 minutes pass, getting close to the timeout...
387        clock.advance(29 * 60)
388        # .. but we receive a write! So that should delay the timeout again to
389        # another 30 minutes.
390        bw.write(0, b"hello")
391        self._assert_timeout_only_after_30_minutes(clock, bw)
392
393    def test_bucket_closing_cancels_timeout(self):
394        """
395        Closing cancels the ``BucketWriter`` timeout.
396        """
397        incoming, final = self.make_workdir("test_bucket_close_timeout")
398        clock = Clock()
399        bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
400        self.assertTrue(clock.getDelayedCalls())
401        bw.close()
402        self.assertFalse(clock.getDelayedCalls())
403
404    def test_bucket_aborting_cancels_timeout(self):
405        """
406        Closing cancels the ``BucketWriter`` timeout.
407        """
408        incoming, final = self.make_workdir("test_bucket_abort_timeout")
409        clock = Clock()
410        bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
411        self.assertTrue(clock.getDelayedCalls())
412        bw.abort()
413        self.assertFalse(clock.getDelayedCalls())
414
415
416class RemoteBucket(object):
417
418    def __init__(self, target):
419        self.target = target
420        self.read_count = 0
421        self.write_count = 0
422
423    def callRemote(self, methname, *args, **kwargs):
424        def _call():
425            meth = getattr(self.target, "remote_" + methname)
426            return meth(*args, **kwargs)
427
428        if methname == "slot_readv":
429            self.read_count += 1
430        if "writev" in methname:
431            self.write_count += 1
432
433        return defer.maybeDeferred(_call)
434
435
436class BucketProxy(AsyncTestCase):
437    def make_bucket(self, name, size):
438        basedir = os.path.join("storage", "BucketProxy", name)
439        incoming = os.path.join(basedir, "tmp", "bucket")
440        final = os.path.join(basedir, "bucket")
441        fileutil.make_dirs(basedir)
442        fileutil.make_dirs(os.path.join(basedir, "tmp"))
443        bw = BucketWriter(self, incoming, final, size, self.make_lease(), Clock())
444        rb = RemoteBucket(FoolscapBucketWriter(bw))
445        return bw, rb, final
446
447    def make_lease(self):
448        owner_num = 0
449        renew_secret = os.urandom(32)
450        cancel_secret = os.urandom(32)
451        expiration_time = time.time() + 5000
452        return LeaseInfo(owner_num, renew_secret, cancel_secret,
453                         expiration_time, b"\x00" * 20)
454
455    def bucket_writer_closed(self, bw, consumed):
456        pass
457    def add_latency(self, category, latency):
458        pass
459    def count(self, name, delta=1):
460        pass
461
462    def test_create(self):
463        bw, rb, sharefname = self.make_bucket("test_create", 500)
464        bp = WriteBucketProxy(rb, None,
465                              data_size=300,
466                              block_size=10,
467                              num_segments=5,
468                              num_share_hashes=3,
469                              uri_extension_size=500)
470        self.assertTrue(interfaces.IStorageBucketWriter.providedBy(bp), bp)
471
472    def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
473        # Let's pretend each share has 100 bytes of data, and that there are
474        # 4 segments (25 bytes each), and 8 shares total. So the two
475        # per-segment merkle trees (crypttext_hash_tree,
476        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
477        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
478        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
479        # long. That should make the whole share:
480        #
481        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
482        # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
483
484        sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
485
486        crypttext_hashes = [hashutil.tagged_hash(b"crypt", b"bar%d" % i)
487                            for i in range(7)]
488        block_hashes = [hashutil.tagged_hash(b"block", b"bar%d" % i)
489                        for i in range(7)]
490        share_hashes = [(i, hashutil.tagged_hash(b"share", b"bar%d" % i))
491                        for i in (1,9,13)]
492        uri_extension = b"s" + b"E"*498 + b"e"
493
494        bw, rb, sharefname = self.make_bucket(name, sharesize)
495        bp = wbp_class(rb, None,
496                       data_size=95,
497                       block_size=25,
498                       num_segments=4,
499                       num_share_hashes=3,
500                       uri_extension_size=len(uri_extension))
501
502        d = bp.put_header()
503        d.addCallback(lambda res: bp.put_block(0, b"a"*25))
504        d.addCallback(lambda res: bp.put_block(1, b"b"*25))
505        d.addCallback(lambda res: bp.put_block(2, b"c"*25))
506        d.addCallback(lambda res: bp.put_block(3, b"d"*20))
507        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
508        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
509        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
510        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
511        d.addCallback(lambda res: bp.close())
512
513        # now read everything back
514        def _start_reading(res):
515            br = BucketReader(self, sharefname)
516            rb = RemoteBucket(FoolscapBucketReader(br))
517            server = NoNetworkServer(b"abc", None)
518            rbp = rbp_class(rb, server, storage_index=b"")
519            self.assertThat(repr(rbp), Contains("to peer"))
520            self.assertTrue(interfaces.IStorageBucketReader.providedBy(rbp), rbp)
521
522            d1 = rbp.get_block_data(0, 25, 25)
523            d1.addCallback(lambda res: self.failUnlessEqual(res, b"a"*25))
524            d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
525            d1.addCallback(lambda res: self.failUnlessEqual(res, b"b"*25))
526            d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
527            d1.addCallback(lambda res: self.failUnlessEqual(res, b"c"*25))
528            d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
529            d1.addCallback(lambda res: self.failUnlessEqual(res, b"d"*20))
530
531            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
532            d1.addCallback(lambda res:
533                           self.failUnlessEqual(res, crypttext_hashes))
534            d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
535            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
536            d1.addCallback(lambda res: rbp.get_share_hashes())
537            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
538            d1.addCallback(lambda res: rbp.get_uri_extension())
539            d1.addCallback(lambda res:
540                           self.failUnlessEqual(res, uri_extension))
541
542            return d1
543
544        d.addCallback(_start_reading)
545
546        return d
547
548    def test_readwrite_v1(self):
549        return self._do_test_readwrite("test_readwrite_v1",
550                                       0x24, WriteBucketProxy, ReadBucketProxy)
551
552    def test_readwrite_v2(self):
553        return self._do_test_readwrite("test_readwrite_v2",
554                                       0x44, WriteBucketProxy_v2, ReadBucketProxy)
555
556class Server(AsyncTestCase):
557
558    def setUp(self):
559        super(Server, self).setUp()
560        self.sparent = LoggingServiceParent()
561        self.sparent.startService()
562        self._lease_secret = itertools.count()
563        self.addCleanup(self.sparent.stopService)
564
565    def workdir(self, name):
566        basedir = os.path.join("storage", "Server", name)
567        return basedir
568
569    def create(self, name, reserved_space=0, klass=StorageServer, clock=None):
570        if clock is None:
571            clock = Clock()
572        workdir = self.workdir(name)
573        ss = klass(workdir, b"\x00" * 20, reserved_space=reserved_space,
574                   stats_provider=FakeStatsProvider(),
575                   clock=clock)
576        ss.setServiceParent(self.sparent)
577        return ss
578
579    def test_create(self):
580        self.create("test_create")
581
582    def test_declares_fixed_1528(self):
583        ss = self.create("test_declares_fixed_1528")
584        ver = ss.get_version()
585        sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
586        self.assertTrue(sv1.get(b'prevents-read-past-end-of-share-data'), sv1)
587
588    def test_declares_maximum_share_sizes(self):
589        ss = self.create("test_declares_maximum_share_sizes")
590        ver = ss.get_version()
591        sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
592        self.assertThat(sv1, Contains(b'maximum-immutable-share-size'))
593        self.assertThat(sv1, Contains(b'maximum-mutable-share-size'))
594
595    def test_declares_available_space(self):
596        ss = self.create("test_declares_available_space")
597        ver = ss.get_version()
598        sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
599        self.assertThat(sv1, Contains(b'available-space'))
600
601    def allocate(self, ss, storage_index, sharenums, size, renew_leases=True):
602        """
603        Call directly into the storage server's allocate_buckets implementation,
604        skipping the Foolscap layer.
605        """
606        renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
607        cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
608        if isinstance(ss, FoolscapStorageServer):
609            ss = ss._server
610        return ss.allocate_buckets(
611            storage_index,
612            renew_secret, cancel_secret,
613            sharenums, size,
614            renew_leases=renew_leases,
615        )
616
617    def test_large_share(self):
618        syslow = platform.system().lower()
619        if 'cygwin' in syslow or 'windows' in syslow or 'darwin' in syslow:
620            raise unittest.SkipTest("If your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X and Windows don't support efficient sparse files).")
621
622        avail = fileutil.get_available_space('.', 512*2**20)
623        if avail <= 4*2**30:
624            raise unittest.SkipTest("This test will spuriously fail if you have less than 4 GiB free on your filesystem.")
625
626        ss = self.create("test_large_share")
627
628        already,writers = self.allocate(ss, b"allocate", [0], 2**32+2)
629        self.assertThat(set(), Equals(already))
630        self.assertThat(set([0]), Equals(set(writers.keys())))
631
632        shnum, bucket = list(writers.items())[0]
633        # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
634        bucket.write(2**32, b"ab")
635        bucket.close()
636
637        readers = ss.get_buckets(b"allocate")
638        reader = readers[shnum]
639        self.assertThat(b"ab", Equals(reader.read(2**32, 2)))
640
641    def test_dont_overfill_dirs(self):
642        """
643        This test asserts that if you add a second share whose storage index
644        share lots of leading bits with an extant share (but isn't the exact
645        same storage index), this won't add an entry to the share directory.
646        """
647        ss = self.create("test_dont_overfill_dirs")
648        already, writers = self.allocate(ss, b"storageindex", [0], 10)
649        for i, wb in writers.items():
650            wb.write(0, b"%10d" % i)
651            wb.close()
652        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
653                                "shares")
654        children_of_storedir = set(os.listdir(storedir))
655
656        # Now store another one under another storageindex that has leading
657        # chars the same as the first storageindex.
658        already, writers = self.allocate(ss, b"storageindey", [0], 10)
659        for i, wb in writers.items():
660            wb.write(0, b"%10d" % i)
661            wb.close()
662        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
663                                "shares")
664        new_children_of_storedir = set(os.listdir(storedir))
665        self.assertThat(new_children_of_storedir, Equals(children_of_storedir))
666
667    def test_remove_incoming(self):
668        ss = self.create("test_remove_incoming")
669        already, writers = self.allocate(ss, b"vid", list(range(3)), 10)
670        for i,wb in writers.items():
671            wb.write(0, b"%10d" % i)
672            wb.close()
673        incoming_share_dir = wb.incominghome
674        incoming_bucket_dir = os.path.dirname(incoming_share_dir)
675        incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
676        incoming_dir = os.path.dirname(incoming_prefix_dir)
677        self.assertFalse(os.path.exists(incoming_bucket_dir), incoming_bucket_dir)
678        self.assertFalse(os.path.exists(incoming_prefix_dir), incoming_prefix_dir)
679        self.assertTrue(os.path.exists(incoming_dir), incoming_dir)
680
681    def test_abort(self):
682        # remote_abort, when called on a writer, should make sure that
683        # the allocated size of the bucket is not counted by the storage
684        # server when accounting for space.
685        ss = self.create("test_abort")
686        already, writers = self.allocate(ss, b"allocate", [0, 1, 2], 150)
687        self.assertThat(ss.allocated_size(), NotEquals(0))
688
689        # Now abort the writers.
690        for writer in writers.values():
691            writer.abort()
692        self.assertThat(ss.allocated_size(), Equals(0))
693
694    def test_immutable_length(self):
695        """
696        ``get_immutable_share_length()`` returns the length of an immutable
697        share, as does ``BucketWriter.get_length()``..
698        """
699        ss = self.create("test_immutable_length")
700        _, writers = self.allocate(ss, b"allocate", [22], 75)
701        bucket = writers[22]
702        bucket.write(0, b"X" * 75)
703        bucket.close()
704        self.assertThat(ss.get_immutable_share_length(b"allocate", 22), Equals(75))
705        self.assertThat(ss.get_buckets(b"allocate")[22].get_length(), Equals(75))
706
707    def test_allocate(self):
708        ss = self.create("test_allocate")
709
710        self.assertThat(ss.get_buckets(b"allocate"), Equals({}))
711
712        already,writers = self.allocate(ss, b"allocate", [0,1,2], 75)
713        self.assertThat(already, Equals(set()))
714        self.assertThat(set(writers.keys()), Equals(set([0,1,2])))
715
716        # while the buckets are open, they should not count as readable
717        self.assertThat(ss.get_buckets(b"allocate"), Equals({}))
718
719        # close the buckets
720        for i,wb in writers.items():
721            wb.write(0, b"%25d" % i)
722            wb.close()
723            # aborting a bucket that was already closed is a no-op
724            wb.abort()
725
726        # now they should be readable
727        b = ss.get_buckets(b"allocate")
728        self.assertThat(set(b.keys()), Equals(set([0,1,2])))
729        self.assertThat(b[0].read(0, 25), Equals(b"%25d" % 0))
730        b_str = str(b[0])
731        self.assertThat(b_str, Contains("BucketReader"))
732        self.assertThat(b_str, Contains("mfwgy33dmf2g 0"))
733
734        # now if we ask about writing again, the server should offer those
735        # three buckets as already present. It should offer them even if we
736        # don't ask about those specific ones.
737        already,writers = self.allocate(ss, b"allocate", [2,3,4], 75)
738        self.assertThat(already, Equals(set([0,1,2])))
739        self.assertThat(set(writers.keys()), Equals(set([3,4])))
740
741        # while those two buckets are open for writing, the server should
742        # refuse to offer them to uploaders
743
744        already2,writers2 = self.allocate(ss, b"allocate", [2,3,4,5], 75)
745        self.assertThat(already2, Equals(set([0,1,2])))
746        self.assertThat(set(writers2.keys()), Equals(set([5])))
747
748        # aborting the writes should remove the tempfiles
749        for i,wb in writers2.items():
750            wb.abort()
751        already2,writers2 = self.allocate(ss, b"allocate", [2,3,4,5], 75)
752        self.assertThat(already2, Equals(set([0,1,2])))
753        self.assertThat(set(writers2.keys()), Equals(set([5])))
754
755        for i,wb in writers2.items():
756            wb.abort()
757        for i,wb in writers.items():
758            wb.abort()
759
760    def test_allocate_without_lease_renewal(self):
761        """
762        ``StorageServer._allocate_buckets`` does not renew leases on existing
763        shares if ``renew_leases`` is ``False``.
764        """
765        first_lease = 456
766        second_lease = 543
767        storage_index = b"allocate"
768
769        clock = Clock()
770        clock.advance(first_lease)
771        ss = self.create(
772            "test_allocate_without_lease_renewal",
773            clock=clock,
774        )
775
776        # Put a share on there
777        already, writers = self.allocate(
778            ss, storage_index, [0], 1, renew_leases=False,
779        )
780        (writer,) = writers.values()
781        writer.write(0, b"x")
782        writer.close()
783
784        # It should have a lease granted at the current time.
785        shares = dict(ss.get_shares(storage_index))
786        self.assertEqual(
787            [first_lease],
788            list(
789                lease.get_grant_renew_time_time()
790                for lease
791                in ShareFile(shares[0]).get_leases()
792            ),
793        )
794
795        # Let some time pass so we can tell if the lease on share 0 is
796        # renewed.
797        clock.advance(second_lease)
798
799        # Put another share on there.
800        already, writers = self.allocate(
801            ss, storage_index, [1], 1, renew_leases=False,
802        )
803        (writer,) = writers.values()
804        writer.write(0, b"x")
805        writer.close()
806
807        # The first share's lease expiration time is unchanged.
808        shares = dict(ss.get_shares(storage_index))
809        self.assertThat(
810            [first_lease],
811            Equals(list(
812                lease.get_grant_renew_time_time()
813                for lease
814                in ShareFile(shares[0]).get_leases()
815            )),
816        )
817
818    def test_bad_container_version(self):
819        ss = self.create("test_bad_container_version")
820        a,w = self.allocate(ss, b"si1", [0], 10)
821        w[0].write(0, b"\xff"*10)
822        w[0].close()
823
824        fn = os.path.join(ss.sharedir, storage_index_to_dir(b"si1"), "0")
825        f = open(fn, "rb+")
826        f.seek(0)
827        f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
828        f.close()
829
830        ss.get_buckets(b"allocate")
831
832        e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
833                                  ss.get_buckets, b"si1")
834        self.assertThat(e.filename, Equals(fn))
835        self.assertThat(e.version, Equals(0))
836        self.assertThat(str(e), Contains("had unexpected version 0"))
837
838    def test_disconnect(self):
839        # simulate a disconnection
840        ss = FoolscapStorageServer(self.create("test_disconnect"))
841        renew_secret = b"r" * 32
842        cancel_secret = b"c" * 32
843        canary = FakeCanary()
844        already,writers = ss.remote_allocate_buckets(
845            b"disconnect",
846            renew_secret,
847            cancel_secret,
848            sharenums=[0,1,2],
849            allocated_size=75,
850            canary=canary,
851        )
852        self.assertThat(already, Equals(set()))
853        self.assertThat(set(writers.keys()), Equals(set([0,1,2])))
854        for (f,args,kwargs) in list(canary.disconnectors.values()):
855            f(*args, **kwargs)
856        del already
857        del writers
858
859        # that ought to delete the incoming shares
860        already,writers = self.allocate(ss, b"disconnect", [0,1,2], 75)
861        self.assertThat(already, Equals(set()))
862        self.assertThat(set(writers.keys()), Equals(set([0,1,2])))
863
864    def test_reserved_space_immutable_lease(self):
865        """
866        If there is not enough available space to store an additional lease on an
867        immutable share then ``remote_add_lease`` fails with ``NoSpace`` when
868        an attempt is made to use it to create a new lease.
869        """
870        disk = FakeDisk(total=1024, used=0)
871        self.patch(fileutil, "get_disk_stats", disk.get_disk_stats)
872
873        ss = self.create("test_reserved_space_immutable_lease")
874
875        storage_index = b"x" * 16
876        renew_secret = b"r" * 32
877        cancel_secret = b"c" * 32
878        shares = {0: b"y" * 500}
879        upload_immutable(ss, storage_index, renew_secret, cancel_secret, shares)
880
881        # use up all the available space
882        disk.use(disk.available)
883
884        # Different secrets to produce a different lease, not a renewal.
885        renew_secret = b"R" * 32
886        cancel_secret = b"C" * 32
887        with self.assertRaises(interfaces.NoSpace):
888            ss.add_lease(storage_index, renew_secret, cancel_secret)
889
890    def test_reserved_space_mutable_lease(self):
891        """
892        If there is not enough available space to store an additional lease on a
893        mutable share then ``remote_add_lease`` fails with ``NoSpace`` when an
894        attempt is made to use it to create a new lease.
895        """
896        disk = FakeDisk(total=1024, used=0)
897        self.patch(fileutil, "get_disk_stats", disk.get_disk_stats)
898
899        ss = self.create("test_reserved_space_mutable_lease")
900
901        renew_secrets = iter(
902            "{}{}".format("r" * 31, i).encode("ascii")
903            for i
904            in range(5)
905        )
906
907        storage_index = b"x" * 16
908        write_enabler = b"w" * 32
909        cancel_secret = b"c" * 32
910        secrets = (write_enabler, next(renew_secrets), cancel_secret)
911        shares = {0: b"y" * 500}
912        upload_mutable(ss, storage_index, secrets, shares)
913
914        # use up all the available space
915        disk.use(disk.available)
916
917        # The upload created one lease.  There is room for three more leases
918        # in the share header.  Even if we're out of disk space, on a boring
919        # enough filesystem we can write these.
920        for i in range(3):
921            ss.add_lease(storage_index, next(renew_secrets), cancel_secret)
922
923        # Having used all of the space for leases in the header, we would have
924        # to allocate storage for the next lease.  Since there is no space
925        # available, this must fail instead.
926        with self.assertRaises(interfaces.NoSpace):
927            ss.add_lease(storage_index, next(renew_secrets), cancel_secret)
928
929
930    def test_reserved_space(self):
931        reserved = 10000
932        allocated = 0
933
934        def call_get_disk_stats(whichdir, reserved_space=0):
935            self.failUnlessEqual(reserved_space, reserved)
936            return {
937              'free_for_nonroot': 15000 - allocated,
938              'avail': max(15000 - allocated - reserved_space, 0),
939            }
940        self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
941
942        ss = FoolscapStorageServer(self.create("test_reserved_space", reserved_space=reserved))
943        # 15k available, 10k reserved, leaves 5k for shares
944
945        # a newly created and filled share incurs this much overhead, beyond
946        # the size we request.
947        OVERHEAD = 3*4
948        LEASE_SIZE = 4+32+32+4
949        renew_secret = b"r" * 32
950        cancel_secret = b"c" * 32
951        canary = FakeCanary()
952        already, writers = ss.remote_allocate_buckets(
953            b"vid1",
954            renew_secret,
955            cancel_secret,
956            sharenums=[0,1,2],
957            allocated_size=1000,
958            canary=canary,
959        )
960        self.assertThat(writers, HasLength(3))
961        # now the StorageServer should have 3000 bytes provisionally
962        # allocated, allowing only 2000 more to be claimed
963        self.assertThat(ss._server._bucket_writers, HasLength(3))
964
965        # allocating 1001-byte shares only leaves room for one
966        canary2 = FakeCanary()
967        already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2)
968        self.assertThat(writers2, HasLength(1))
969        self.assertThat(ss._server._bucket_writers, HasLength(4))
970
971        # we abandon the first set, so their provisional allocation should be
972        # returned
973        canary.disconnected()
974
975        self.assertThat(ss._server._bucket_writers, HasLength(1))
976        # now we have a provisional allocation of 1001 bytes
977
978        # and we close the second set, so their provisional allocation should
979        # become real, long-term allocation, and grows to include the
980        # overhead.
981        for bw in writers2.values():
982            bw.write(0, b"a"*25)
983            bw.close()
984        self.assertThat(ss._server._bucket_writers, HasLength(0))
985
986        # this also changes the amount reported as available by call_get_disk_stats
987        allocated = 1001 + OVERHEAD + LEASE_SIZE
988
989        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
990        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
991        canary3 = FakeCanary()
992        already3, writers3 = ss.remote_allocate_buckets(
993            b"vid3",
994            renew_secret,
995            cancel_secret,
996            sharenums=list(range(100)),
997            allocated_size=100,
998            canary=canary3,
999        )
1000        self.assertThat(writers3, HasLength(39))
1001        self.assertThat(ss._server._bucket_writers, HasLength(39))
1002
1003        canary3.disconnected()
1004
1005        self.assertThat(ss._server._bucket_writers, HasLength(0))
1006        ss._server.disownServiceParent()
1007        del ss
1008
1009    def test_seek(self):
1010        basedir = self.workdir("test_seek_behavior")
1011        fileutil.make_dirs(basedir)
1012        filename = os.path.join(basedir, "testfile")
1013        f = open(filename, "wb")
1014        f.write(b"start")
1015        f.close()
1016        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
1017        # files. mode="a" preserves previous contents but does not allow
1018        # seeking-to-create-holes. mode="r+" allows both.
1019        f = open(filename, "rb+")
1020        f.seek(100)
1021        f.write(b"100")
1022        f.close()
1023        filelen = os.stat(filename)[stat.ST_SIZE]
1024        self.assertThat(filelen, Equals(100+3))
1025        f2 = open(filename, "rb")
1026        self.assertThat(f2.read(5), Equals(b"start"))
1027
1028    def create_bucket_5_shares(
1029            self, ss, storage_index, expected_already=0, expected_writers=5
1030    ):
1031        """
1032        Given a StorageServer, create a bucket with 5 shares and return renewal
1033        and cancellation secrets.
1034        """
1035        sharenums = list(range(5))
1036        size = 100
1037
1038        # Creating a bucket also creates a lease:
1039        rs, cs  = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
1040                   hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
1041        already, writers = ss.allocate_buckets(storage_index, rs, cs,
1042                                               sharenums, size)
1043        self.assertThat(already, HasLength(expected_already))
1044        self.assertThat(writers, HasLength(expected_writers))
1045        for wb in writers.values():
1046            wb.close()
1047        return rs, cs
1048
1049    def test_leases(self):
1050        ss = self.create("test_leases")
1051        sharenums = list(range(5))
1052        size = 100
1053
1054        # Create a bucket:
1055        rs0, cs0 = self.create_bucket_5_shares(ss, b"si0")
1056
1057        # Upload of an immutable implies creation of a single lease with the
1058        # supplied secrets.
1059        (lease,) = ss.get_leases(b"si0")
1060        self.assertTrue(lease.is_renew_secret(rs0))
1061
1062        rs1, cs1 = self.create_bucket_5_shares(ss, b"si1")
1063
1064        # take out a second lease on si1
1065        rs2, cs2 = self.create_bucket_5_shares(ss, b"si1", 5, 0)
1066        (lease1, lease2) = ss.get_leases(b"si1")
1067        self.assertTrue(lease1.is_renew_secret(rs1))
1068        self.assertTrue(lease2.is_renew_secret(rs2))
1069
1070        # and a third lease, using add-lease
1071        rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
1072                     hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
1073        ss.add_lease(b"si1", rs2a, cs2a)
1074        (lease1, lease2, lease3) = ss.get_leases(b"si1")
1075        self.assertTrue(lease1.is_renew_secret(rs1))
1076        self.assertTrue(lease2.is_renew_secret(rs2))
1077        self.assertTrue(lease3.is_renew_secret(rs2a))
1078
1079        # add-lease on a missing storage index is silently ignored
1080        self.assertThat(ss.add_lease(b"si18", b"", b""), Equals(None))
1081
1082        # check that si0 is readable
1083        readers = ss.get_buckets(b"si0")
1084        self.assertThat(readers, HasLength(5))
1085
1086        # renew the first lease. Only the proper renew_secret should work
1087        ss.renew_lease(b"si0", rs0)
1088        self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", cs0)
1089        self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", rs1)
1090
1091        # check that si0 is still readable
1092        readers = ss.get_buckets(b"si0")
1093        self.assertThat(readers, HasLength(5))
1094
1095        # There is no such method as remote_cancel_lease for now -- see
1096        # ticket #1528.
1097        self.assertFalse(hasattr(FoolscapStorageServer(ss), 'remote_cancel_lease'), \
1098                    "ss should not have a 'remote_cancel_lease' method/attribute")
1099
1100        # test overlapping uploads
1101        rs3,cs3 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
1102                   hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
1103        rs4,cs4 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
1104                   hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
1105        already,writers = ss.allocate_buckets(b"si3", rs3, cs3,
1106                                              sharenums, size)
1107        self.assertThat(already, HasLength(0))
1108        self.assertThat(writers, HasLength(5))
1109        already2,writers2 = ss.allocate_buckets(b"si3", rs4, cs4,
1110                                                sharenums, size)
1111        self.assertThat(already2, HasLength(0))
1112        self.assertThat(writers2, HasLength(0))
1113        for wb in writers.values():
1114            wb.close()
1115
1116        leases = list(ss.get_leases(b"si3"))
1117        self.assertThat(leases, HasLength(1))
1118
1119        already3,writers3 = ss.allocate_buckets(b"si3", rs4, cs4,
1120                                                sharenums, size)
1121        self.assertThat(already3, HasLength(5))
1122        self.assertThat(writers3, HasLength(0))
1123
1124        leases = list(ss.get_leases(b"si3"))
1125        self.assertThat(leases, HasLength(2))
1126
1127    def test_immutable_add_lease_renews(self):
1128        """
1129        Adding a lease on an already leased immutable with the same secret just
1130        renews it.
1131        """
1132        clock = Clock()
1133        clock.advance(123)
1134        ss = self.create("test_immutable_add_lease_renews", clock=clock)
1135
1136        # Start out with single lease created with bucket:
1137        renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0")
1138        [lease] = ss.get_leases(b"si0")
1139        self.assertThat(lease.get_expiration_time(), Equals(123 + DEFAULT_RENEWAL_TIME))
1140
1141        # Time passes:
1142        clock.advance(123456)
1143
1144        # Adding a lease with matching renewal secret just renews it:
1145        ss.add_lease(b"si0", renewal_secret, cancel_secret)
1146        [lease] = ss.get_leases(b"si0")
1147        self.assertThat(lease.get_expiration_time(), Equals(123 + 123456 + DEFAULT_RENEWAL_TIME))
1148
1149    def test_have_shares(self):
1150        """By default the StorageServer has no shares."""
1151        workdir = self.workdir("test_have_shares")
1152        ss = StorageServer(workdir, b"\x00" * 20, readonly_storage=True)
1153        self.assertFalse(ss.have_shares())
1154
1155    def test_readonly(self):
1156        workdir = self.workdir("test_readonly")
1157        ss = StorageServer(workdir, b"\x00" * 20, readonly_storage=True)
1158        ss.setServiceParent(self.sparent)
1159
1160        already,writers = self.allocate(ss, b"vid", [0,1,2], 75)
1161        self.assertThat(already, Equals(set()))
1162        self.assertThat(writers, Equals({}))
1163
1164        stats = ss.get_stats()
1165        self.assertThat(stats["storage_server.accepting_immutable_shares"], Equals(0))
1166        if "storage_server.disk_avail" in stats:
1167            # Some platforms may not have an API to get disk stats.
1168            # But if there are stats, readonly_storage means disk_avail=0
1169            self.assertThat(stats["storage_server.disk_avail"], Equals(0))
1170
1171    def test_discard(self):
1172        # discard is really only used for other tests, but we test it anyways
1173        workdir = self.workdir("test_discard")
1174        ss = StorageServer(workdir, b"\x00" * 20, discard_storage=True)
1175        ss.setServiceParent(self.sparent)
1176
1177        already,writers = self.allocate(ss, b"vid", [0,1,2], 75)
1178        self.assertThat(already, Equals(set()))
1179        self.assertThat(set(writers.keys()), Equals(set([0,1,2])))
1180        for i,wb in writers.items():
1181            wb.write(0, b"%25d" % i)
1182            wb.close()
1183        # since we discard the data, the shares should be present but sparse.
1184        # Since we write with some seeks, the data we read back will be all
1185        # zeros.
1186        b = ss.get_buckets(b"vid")
1187        self.assertThat(set(b.keys()), Equals(set([0,1,2])))
1188        self.assertThat(b[0].read(0, 25), Equals(b"\x00" * 25))
1189
1190    def test_reserved_space_advise_corruption(self):
1191        """
1192        If there is no available space then ``remote_advise_corrupt_share`` does
1193        not write a corruption report.
1194        """
1195        disk = FakeDisk(total=1024, used=1024)
1196        self.patch(fileutil, "get_disk_stats", disk.get_disk_stats)
1197
1198        workdir = self.workdir("test_reserved_space_advise_corruption")
1199        ss = StorageServer(workdir, b"\x00" * 20, discard_storage=True)
1200        ss.setServiceParent(self.sparent)
1201
1202        upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
1203        ss.advise_corrupt_share(b"immutable", b"si0", 0,
1204                                b"This share smells funny.\n")
1205
1206        self.assertThat(
1207            [],
1208            Equals(os.listdir(ss.corruption_advisory_dir)),
1209        )
1210
1211    def test_advise_corruption(self):
1212        workdir = self.workdir("test_advise_corruption")
1213        ss = StorageServer(workdir, b"\x00" * 20, discard_storage=True)
1214        ss.setServiceParent(self.sparent)
1215
1216        si0_s = base32.b2a(b"si0")
1217        upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
1218        ss.advise_corrupt_share(b"immutable", b"si0", 0,
1219                                b"This share smells funny.\n")
1220        reportdir = os.path.join(workdir, "corruption-advisories")
1221        reports = os.listdir(reportdir)
1222        self.assertThat(reports, HasLength(1))
1223        report_si0 = reports[0]
1224        self.assertThat(report_si0, Contains(ensure_str(si0_s)))
1225        f = open(os.path.join(reportdir, report_si0), "rb")
1226        report = f.read()
1227        f.close()
1228        self.assertThat(report, Contains(b"type: immutable"))
1229        self.assertThat(report, Contains(b"storage_index: %s" % si0_s))
1230        self.assertThat(report, Contains(b"share_number: 0"))
1231        self.assertThat(report, Contains(b"This share smells funny."))
1232
1233        # test the RIBucketWriter version too
1234        si1_s = base32.b2a(b"si1")
1235        already,writers = self.allocate(ss, b"si1", [1], 75)
1236        self.assertThat(already, Equals(set()))
1237        self.assertThat(set(writers.keys()), Equals(set([1])))
1238        writers[1].write(0, b"data")
1239        writers[1].close()
1240
1241        b = ss.get_buckets(b"si1")
1242        self.assertThat(set(b.keys()), Equals(set([1])))
1243        b[1].advise_corrupt_share(b"This share tastes like dust.\n")
1244
1245        reports = os.listdir(reportdir)
1246        self.assertThat(reports, HasLength(2))
1247        report_si1 = [r for r in reports if si1_s.decode() in r][0]
1248        f = open(os.path.join(reportdir, report_si1), "rb")
1249        report = f.read()
1250        f.close()
1251        self.assertThat(report, Contains(b"type: immutable"))
1252        self.assertThat(report, Contains(b"storage_index: %s" % si1_s))
1253        self.assertThat(report, Contains(b"share_number: 1"))
1254        self.assertThat(report, Contains(b"This share tastes like dust."))
1255
1256    def test_advise_corruption_missing(self):
1257        """
1258        If a corruption advisory is received for a share that is not present on
1259        this server then it is not persisted.
1260        """
1261        workdir = self.workdir("test_advise_corruption_missing")
1262        ss = StorageServer(workdir, b"\x00" * 20, discard_storage=True)
1263        ss.setServiceParent(self.sparent)
1264
1265        # Upload one share for this storage index
1266        upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
1267
1268        # And try to submit a corruption advisory about a different share
1269        ss.advise_corrupt_share(b"immutable", b"si0", 1,
1270                                b"This share smells funny.\n")
1271
1272        self.assertThat(
1273            [],
1274            Equals(os.listdir(ss.corruption_advisory_dir)),
1275        )
1276
1277
1278class MutableServer(SyncTestCase):
1279
1280    def setUp(self):
1281        super(MutableServer, self).setUp()
1282        self.sparent = LoggingServiceParent()
1283        self._lease_secret = itertools.count()
1284        self.addCleanup(self.sparent.stopService)
1285
1286    def workdir(self, name):
1287        basedir = os.path.join("storage", "MutableServer", name)
1288        return basedir
1289
1290    def create(self, name, clock=None):
1291        workdir = self.workdir(name)
1292        if clock is None:
1293            clock = Clock()
1294        ss = StorageServer(workdir, b"\x00" * 20,
1295                           clock=clock)
1296        ss.setServiceParent(self.sparent)
1297        return ss
1298
1299    def test_create(self):
1300        self.create("test_create")
1301
1302    def write_enabler(self, we_tag):
1303        return hashutil.tagged_hash(b"we_blah", we_tag)
1304
1305    def renew_secret(self, tag):
1306        if isinstance(tag, int):
1307            tag = b"%d" % (tag,)
1308        self.assertThat(tag, IsInstance(bytes))
1309        return hashutil.tagged_hash(b"renew_blah", tag)
1310
1311    def cancel_secret(self, tag):
1312        if isinstance(tag, int):
1313            tag = b"%d" % (tag,)
1314        self.assertThat(tag, IsInstance(bytes))
1315        return hashutil.tagged_hash(b"cancel_blah", tag)
1316
1317    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
1318        write_enabler = self.write_enabler(we_tag)
1319        renew_secret = self.renew_secret(lease_tag)
1320        cancel_secret = self.cancel_secret(lease_tag)
1321        rstaraw = ss.slot_testv_and_readv_and_writev
1322        testandwritev = dict( [ (shnum, ([], [], None) )
1323                         for shnum in sharenums ] )
1324        readv = []
1325        rc = rstaraw(storage_index,
1326                     (write_enabler, renew_secret, cancel_secret),
1327                     testandwritev,
1328                     readv)
1329        (did_write, readv_data) = rc
1330        self.assertTrue(did_write)
1331        self.assertThat(readv_data, IsInstance(dict))
1332        self.assertThat(readv_data, HasLength(0))
1333
1334    def test_enumerate_mutable_shares(self):
1335        """
1336        ``StorageServer.enumerate_mutable_shares()`` returns a set of share
1337        numbers for the given storage index, or an empty set if it does not
1338        exist at all.
1339        """
1340        ss = self.create("test_enumerate_mutable_shares")
1341
1342        # Initially, nothing exists:
1343        empty = ss.enumerate_mutable_shares(b"si1")
1344
1345        self.allocate(ss, b"si1", b"we1", b"le1", [0, 1, 4, 2], 12)
1346        shares0_1_2_4 = ss.enumerate_mutable_shares(b"si1")
1347
1348        # Remove share 2, by setting size to 0:
1349        secrets = (self.write_enabler(b"we1"),
1350                   self.renew_secret(b"le1"),
1351                   self.cancel_secret(b"le1"))
1352        ss.slot_testv_and_readv_and_writev(b"si1", secrets, {2: ([], [], 0)}, [])
1353        shares0_1_4 = ss.enumerate_mutable_shares(b"si1")
1354        self.assertThat(
1355            (empty, shares0_1_2_4, shares0_1_4),
1356            Equals((set(), {0, 1, 2, 4}, {0, 1, 4}))
1357        )
1358
1359    def test_mutable_share_length(self):
1360        """``get_mutable_share_length()`` returns the length of the share."""
1361        ss = self.create("test_mutable_share_length")
1362        self.allocate(ss, b"si1", b"we1", b"le1", [16], 23)
1363        ss.slot_testv_and_readv_and_writev(
1364            b"si1", (self.write_enabler(b"we1"),
1365                     self.renew_secret(b"le1"),
1366                     self.cancel_secret(b"le1")),
1367            {16: ([], [(0, b"x" * 23)], None)},
1368            []
1369        )
1370        self.assertThat(ss.get_mutable_share_length(b"si1", 16), Equals(23))
1371
1372    def test_mutable_share_length_unknown(self):
1373        """
1374        ``get_mutable_share_length()`` raises a ``KeyError`` on unknown shares.
1375        """
1376        ss = self.create("test_mutable_share_length_unknown")
1377        self.allocate(ss, b"si1", b"we1", b"le1", [16], 23)
1378        ss.slot_testv_and_readv_and_writev(
1379            b"si1", (self.write_enabler(b"we1"),
1380                     self.renew_secret(b"le1"),
1381                     self.cancel_secret(b"le1")),
1382            {16: ([], [(0, b"x" * 23)], None)},
1383            []
1384        )
1385        with self.assertRaises(KeyError):
1386            # Wrong share number.
1387            ss.get_mutable_share_length(b"si1", 17)
1388        with self.assertRaises(KeyError):
1389            # Wrong storage index
1390            ss.get_mutable_share_length(b"unknown", 16)
1391
1392    def test_bad_magic(self):
1393        ss = self.create("test_bad_magic")
1394        self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0]), 10)
1395        fn = os.path.join(ss.sharedir, storage_index_to_dir(b"si1"), "0")
1396        f = open(fn, "rb+")
1397        f.seek(0)
1398        f.write(b"BAD MAGIC")
1399        f.close()
1400        read = ss.slot_readv
1401        e = self.failUnlessRaises(UnknownMutableContainerVersionError,
1402                                  read, b"si1", [0], [(0,10)])
1403        self.assertThat(e.filename, Equals(fn))
1404        self.assertTrue(e.version.startswith(b"BAD MAGIC"))
1405        self.assertThat(str(e), Contains("had unexpected version"))
1406        self.assertThat(str(e), Contains("BAD MAGIC"))
1407
1408    def test_container_size(self):
1409        ss = self.create("test_container_size")
1410        self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
1411                      set([0,1,2]), 100)
1412        read = ss.slot_readv
1413        rstaraw = ss.slot_testv_and_readv_and_writev
1414        secrets = ( self.write_enabler(b"we1"),
1415                    self.renew_secret(b"we1"),
1416                    self.cancel_secret(b"we1") )
1417        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
1418        answer = rstaraw(b"si1", secrets,
1419                         {0: ([], [(0,data)], len(data)+12)},
1420                         [])
1421        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1422
1423        # Trying to make the container too large (by sending a write vector
1424        # whose offset is too high) will raise an exception.
1425        TOOBIG = MutableShareFile.MAX_SIZE + 10
1426        self.failUnlessRaises(DataTooLargeError,
1427                              rstaraw, b"si1", secrets,
1428                              {0: ([], [(TOOBIG,data)], None)},
1429                              [])
1430
1431        answer = rstaraw(b"si1", secrets,
1432                         {0: ([], [(0,data)], None)},
1433                         [])
1434        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1435
1436        read_answer = read(b"si1", [0], [(0,10)])
1437        self.assertThat(read_answer, Equals({0: [data[:10]]}))
1438
1439        # Sending a new_length shorter than the current length truncates the
1440        # data.
1441        answer = rstaraw(b"si1", secrets,
1442                         {0: ([], [], 9)},
1443                         [])
1444        read_answer = read(b"si1", [0], [(0,10)])
1445        self.assertThat(read_answer, Equals({0: [data[:9]]}))
1446
1447        # Sending a new_length longer than the current length doesn't change
1448        # the data.
1449        answer = rstaraw(b"si1", secrets,
1450                         {0: ([], [], 20)},
1451                         [])
1452        assert answer == (True, {0:[],1:[],2:[]})
1453        read_answer = read(b"si1", [0], [(0, 20)])
1454        self.assertThat(read_answer, Equals({0: [data[:9]]}))
1455
1456        # Sending a write vector whose start is after the end of the current
1457        # data doesn't reveal "whatever was there last time" (palimpsest),
1458        # but instead fills with zeroes.
1459
1460        # To test this, we fill the data area with a recognizable pattern.
1461        pattern = u''.join([chr(i) for i in range(100)]).encode("utf-8")
1462        answer = rstaraw(b"si1", secrets,
1463                         {0: ([], [(0, pattern)], None)},
1464                         [])
1465        assert answer == (True, {0:[],1:[],2:[]})
1466        # Then truncate the data...
1467        answer = rstaraw(b"si1", secrets,
1468                         {0: ([], [], 20)},
1469                         [])
1470        assert answer == (True, {0:[],1:[],2:[]})
1471        # Just confirm that you get an empty string if you try to read from
1472        # past the (new) endpoint now.
1473        answer = rstaraw(b"si1", secrets,
1474                         {0: ([], [], None)},
1475                         [(20, 1980)])
1476        self.assertThat(answer, Equals((True, {0:[b''],1:[b''],2:[b'']})))
1477
1478        # Then the extend the file by writing a vector which starts out past
1479        # the end...
1480        answer = rstaraw(b"si1", secrets,
1481                         {0: ([], [(50, b'hellothere')], None)},
1482                         [])
1483        assert answer == (True, {0:[],1:[],2:[]})
1484        # Now if you read the stuff between 20 (where we earlier truncated)
1485        # and 50, it had better be all zeroes.
1486        answer = rstaraw(b"si1", secrets,
1487                         {0: ([], [], None)},
1488                         [(20, 30)])
1489        self.assertThat(answer, Equals((True, {0:[b'\x00'*30],1:[b''],2:[b'']})))
1490
1491        # Also see if the server explicitly declares that it supports this
1492        # feature.
1493        ver = ss.get_version()
1494        storage_v1_ver = ver[b"http://allmydata.org/tahoe/protocols/storage/v1"]
1495        self.assertTrue(storage_v1_ver.get(b"fills-holes-with-zero-bytes"))
1496
1497        # If the size is dropped to zero the share is deleted.
1498        answer = rstaraw(b"si1", secrets,
1499                         {0: ([], [(0,data)], 0)},
1500                         [])
1501        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1502
1503        read_answer = read(b"si1", [0], [(0,10)])
1504        self.assertThat(read_answer, Equals({}))
1505
1506    def test_allocate(self):
1507        ss = self.create("test_allocate")
1508        self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
1509                      set([0,1,2]), 100)
1510
1511        read = ss.slot_readv
1512        self.assertThat(read(b"si1", [0], [(0, 10)]),
1513                             Equals({0: [b""]}))
1514        self.assertThat(read(b"si1", [], [(0, 10)]),
1515                             Equals({0: [b""], 1: [b""], 2: [b""]}))
1516        self.assertThat(read(b"si1", [0], [(100, 10)]),
1517                             Equals({0: [b""]}))
1518
1519        # try writing to one
1520        secrets = ( self.write_enabler(b"we1"),
1521                    self.renew_secret(b"we1"),
1522                    self.cancel_secret(b"we1") )
1523        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
1524        write = ss.slot_testv_and_readv_and_writev
1525        answer = write(b"si1", secrets,
1526                       {0: ([], [(0,data)], None)},
1527                       [])
1528        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1529
1530        self.assertThat(read(b"si1", [0], [(0,20)]),
1531                             Equals({0: [b"00000000001111111111"]}))
1532        self.assertThat(read(b"si1", [0], [(95,10)]),
1533                             Equals({0: [b"99999"]}))
1534        #self.failUnlessEqual(s0.get_length(), 100)
1535
1536        bad_secrets = (b"bad write enabler", secrets[1], secrets[2])
1537        f = self.failUnlessRaises(BadWriteEnablerError,
1538                                  write, b"si1", bad_secrets,
1539                                  {}, [])
1540        self.assertThat(str(f), Contains("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'."))
1541
1542        # this testv should fail
1543        answer = write(b"si1", secrets,
1544                       {0: ([(0, 12, b"eq", b"444444444444"),
1545                             (20, 5, b"eq", b"22222"),
1546                             ],
1547                            [(0, b"x"*100)],
1548                            None),
1549                        },
1550                       [(0,12), (20,5)],
1551                       )
1552        self.assertThat(answer, Equals((False,
1553                                      {0: [b"000000000011", b"22222"],
1554                                       1: [b"", b""],
1555                                       2: [b"", b""],
1556                                       })))
1557        self.assertThat(read(b"si1", [0], [(0,100)]), Equals({0: [data]}))
1558
1559    def test_operators(self):
1560        # test operators, the data we're comparing is '11111' in all cases.
1561        # test both fail+pass, reset data after each one.
1562        ss = self.create("test_operators")
1563
1564        secrets = ( self.write_enabler(b"we1"),
1565                    self.renew_secret(b"we1"),
1566                    self.cancel_secret(b"we1") )
1567        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
1568        write = ss.slot_testv_and_readv_and_writev
1569        read = ss.slot_readv
1570
1571        def reset():
1572            write(b"si1", secrets,
1573                  {0: ([], [(0,data)], None)},
1574                  [])
1575
1576        reset()
1577
1578        #  eq
1579        answer = write(b"si1", secrets, {0: ([(10, 5, b"eq", b"11112"),
1580                                             ],
1581                                            [(0, b"x"*100)],
1582                                            None,
1583                                            )}, [(10,5)])
1584        self.assertThat(answer, Equals((False, {0: [b"11111"]})))
1585        self.assertThat(read(b"si1", [0], [(0,100)]), Equals({0: [data]}))
1586        reset()
1587
1588        answer = write(b"si1", secrets, {0: ([(10, 5, b"eq", b"11111"),
1589                                             ],
1590                                            [(0, b"y"*100)],
1591                                            None,
1592                                            )}, [(10,5)])
1593        self.assertThat(answer, Equals((True, {0: [b"11111"]})))
1594        self.assertThat(read(b"si1", [0], [(0,100)]), Equals({0: [b"y"*100]}))
1595        reset()
1596
1597        # finally, test some operators against empty shares
1598        answer = write(b"si1", secrets, {1: ([(10, 5, b"eq", b"11112"),
1599                                             ],
1600                                            [(0, b"x"*100)],
1601                                            None,
1602                                            )}, [(10,5)])
1603        self.assertThat(answer, Equals((False, {0: [b"11111"]})))
1604        self.assertThat(read(b"si1", [0], [(0,100)]), Equals({0: [data]}))
1605        reset()
1606
1607    def test_readv(self):
1608        ss = self.create("test_readv")
1609        secrets = ( self.write_enabler(b"we1"),
1610                    self.renew_secret(b"we1"),
1611                    self.cancel_secret(b"we1") )
1612        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
1613        write = ss.slot_testv_and_readv_and_writev
1614        read = ss.slot_readv
1615        data = [(b"%d" % i) * 100 for i in range(3)]
1616        rc = write(b"si1", secrets,
1617                   {0: ([], [(0,data[0])], None),
1618                    1: ([], [(0,data[1])], None),
1619                    2: ([], [(0,data[2])], None),
1620                    }, [])
1621        self.assertThat(rc, Equals((True, {})))
1622
1623        answer = read(b"si1", [], [(0, 10)])
1624        self.assertThat(answer, Equals({0: [b"0"*10],
1625                                      1: [b"1"*10],
1626                                      2: [b"2"*10]}))
1627
1628    def compare_leases_without_timestamps(self, leases_a, leases_b):
1629        """
1630        Assert that, except for expiration times, ``leases_a`` contains the same
1631        lease information as ``leases_b``.
1632        """
1633        for a, b in zip(leases_a, leases_b):
1634            # The leases aren't always of the same type (though of course
1635            # corresponding elements in the two lists should be of the same
1636            # type as each other) so it's inconvenient to just reach in and
1637            # normalize the expiration timestamp.  We don't want to call
1638            # `renew` on both objects to normalize the expiration timestamp in
1639            # case `renew` is broken and gives us back equal outputs from
1640            # non-equal inputs (expiration timestamp aside).  It seems
1641            # reasonably safe to use `renew` to make _one_ of the timestamps
1642            # equal to the other though.
1643            self.assertThat(
1644                a.renew(b.get_expiration_time()),
1645                Equals(b),
1646            )
1647        self.assertThat(len(leases_a), Equals(len(leases_b)))
1648
1649    def test_leases(self):
1650        ss = self.create("test_leases")
1651        def secrets(n):
1652            return ( self.write_enabler(b"we1"),
1653                     self.renew_secret(b"we1-%d" % n),
1654                     self.cancel_secret(b"we1-%d" % n) )
1655        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
1656        write = ss.slot_testv_and_readv_and_writev
1657        read = ss.slot_readv
1658        rc = write(b"si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1659        self.assertThat(rc, Equals((True, {})))
1660
1661        # create a random non-numeric file in the bucket directory, to
1662        # exercise the code that's supposed to ignore those.
1663        bucket_dir = os.path.join(self.workdir("test_leases"),
1664                                  "shares", storage_index_to_dir(b"si1"))
1665        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1666        f.write("you ought to be ignoring me\n")
1667        f.close()
1668
1669        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1670        self.assertThat(list(s0.get_leases()), HasLength(1))
1671
1672        # add-lease on a missing storage index is silently ignored
1673        self.assertThat(ss.add_lease(b"si18", b"", b""), Equals(None))
1674
1675        # re-allocate the slots and use the same secrets, that should update
1676        # the lease
1677        write(b"si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1678        self.assertThat(list(s0.get_leases()), HasLength(1))
1679
1680        # renew it directly
1681        ss.renew_lease(b"si1", secrets(0)[1])
1682        self.assertThat(list(s0.get_leases()), HasLength(1))
1683
1684        # now allocate them with a bunch of different secrets, to trigger the
1685        # extended lease code. Use add_lease for one of them.
1686        write(b"si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1687        self.assertThat(list(s0.get_leases()), HasLength(2))
1688        secrets2 = secrets(2)
1689        ss.add_lease(b"si1", secrets2[1], secrets2[2])
1690        self.assertThat(list(s0.get_leases()), HasLength(3))
1691        write(b"si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1692        write(b"si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1693        write(b"si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1694
1695        self.assertThat(list(s0.get_leases()), HasLength(6))
1696
1697        all_leases = list(s0.get_leases())
1698        # and write enough data to expand the container, forcing the server
1699        # to move the leases
1700        write(b"si1", secrets(0),
1701              {0: ([], [(0,data)], 200), },
1702              [])
1703
1704        # read back the leases, make sure they're still intact.
1705        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1706
1707        ss.renew_lease(b"si1", secrets(0)[1])
1708        ss.renew_lease(b"si1", secrets(1)[1])
1709        ss.renew_lease(b"si1", secrets(2)[1])
1710        ss.renew_lease(b"si1", secrets(3)[1])
1711        ss.renew_lease(b"si1", secrets(4)[1])
1712        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1713        # get a new copy of the leases, with the current timestamps. Reading
1714        # data and failing to renew/cancel leases should leave the timestamps
1715        # alone.
1716        all_leases = list(s0.get_leases())
1717        # renewing with a bogus token should prompt an error message
1718
1719        # examine the exception thus raised, make sure the old nodeid is
1720        # present, to provide for share migration
1721        e = self.failUnlessRaises(IndexError,
1722                                  ss.renew_lease, b"si1",
1723                                  secrets(20)[1])
1724        e_s = str(e)
1725        self.assertThat(e_s, Contains("Unable to renew non-existent lease"))
1726        self.assertThat(e_s, Contains("I have leases accepted by nodeids:"))
1727        self.assertThat(e_s, Contains("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ."))
1728
1729        self.assertThat(all_leases, Equals(list(s0.get_leases())))
1730
1731        # reading shares should not modify the timestamp
1732        read(b"si1", [], [(0,200)])
1733        self.assertThat(all_leases, Equals(list(s0.get_leases())))
1734
1735        write(b"si1", secrets(0),
1736              {0: ([], [(200, b"make me bigger")], None)}, [])
1737        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1738
1739        write(b"si1", secrets(0),
1740              {0: ([], [(500, b"make me really bigger")], None)}, [])
1741        self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
1742
1743    def test_mutable_add_lease_renews(self):
1744        """
1745        Adding a lease on an already leased mutable with the same secret just
1746        renews it.
1747        """
1748        clock = Clock()
1749        clock.advance(235)
1750        ss = self.create("test_mutable_add_lease_renews",
1751                         clock=clock)
1752        def secrets(n):
1753            return ( self.write_enabler(b"we1"),
1754                     self.renew_secret(b"we1-%d" % n),
1755                     self.cancel_secret(b"we1-%d" % n) )
1756        data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
1757        write = ss.slot_testv_and_readv_and_writev
1758        write_enabler, renew_secret, cancel_secret = secrets(0)
1759        rc = write(b"si1", (write_enabler, renew_secret, cancel_secret),
1760                   {0: ([], [(0,data)], None)}, [])
1761        self.assertThat(rc, Equals((True, {})))
1762
1763        bucket_dir = os.path.join(self.workdir("test_mutable_add_lease_renews"),
1764                                  "shares", storage_index_to_dir(b"si1"))
1765        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1766        [lease] = s0.get_leases()
1767        self.assertThat(lease.get_expiration_time(), Equals(235 + DEFAULT_RENEWAL_TIME))
1768
1769        # Time passes...
1770        clock.advance(835)
1771
1772        # Adding a lease renews it:
1773        ss.add_lease(b"si1", renew_secret, cancel_secret)
1774        [lease] = s0.get_leases()
1775        self.assertThat(lease.get_expiration_time(),
1776                         Equals(235 + 835 + DEFAULT_RENEWAL_TIME))
1777
1778    def test_remove(self):
1779        ss = self.create("test_remove")
1780        self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
1781                      set([0,1,2]), 100)
1782        readv = ss.slot_readv
1783        writev = ss.slot_testv_and_readv_and_writev
1784        secrets = ( self.write_enabler(b"we1"),
1785                    self.renew_secret(b"we1"),
1786                    self.cancel_secret(b"we1") )
1787        # delete sh0 by setting its size to zero
1788        answer = writev(b"si1", secrets,
1789                        {0: ([], [], 0)},
1790                        [])
1791        # the answer should mention all the shares that existed before the
1792        # write
1793        self.assertThat(answer, Equals((True, {0:[],1:[],2:[]})))
1794        # but a new read should show only sh1 and sh2
1795        self.assertThat(readv(b"si1", [], [(0,10)]),
1796                             Equals({1: [b""], 2: [b""]}))
1797
1798        # delete sh1 by setting its size to zero
1799        answer = writev(b"si1", secrets,
1800                        {1: ([], [], 0)},
1801                        [])
1802        self.assertThat(answer, Equals((True, {1:[],2:[]})))
1803        self.assertThat(readv(b"si1", [], [(0,10)]),
1804                             Equals({2: [b""]}))
1805
1806        # delete sh2 by setting its size to zero
1807        answer = writev(b"si1", secrets,
1808                        {2: ([], [], 0)},
1809                        [])
1810        self.assertThat(answer, Equals((True, {2:[]})))
1811        self.assertThat(readv(b"si1", [], [(0,10)]),
1812                             Equals({}))
1813        # and the bucket directory should now be gone
1814        si = base32.b2a(b"si1").decode()
1815        # note: this is a detail of the storage server implementation, and
1816        # may change in the future
1817        # filesystem paths are native strings
1818        prefix = si[:2]
1819        prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1820        bucketdir = os.path.join(prefixdir, si)
1821        self.assertTrue(os.path.exists(prefixdir), prefixdir)
1822        self.assertFalse(os.path.exists(bucketdir), bucketdir)
1823
1824    def test_writev_without_renew_lease(self):
1825        """
1826        The helper method ``slot_testv_and_readv_and_writev`` does not renew
1827        leases if ``False`` is passed for the ``renew_leases`` parameter.
1828        """
1829        ss = self.create("test_writev_without_renew_lease")
1830
1831        storage_index = b"si2"
1832        secrets = (
1833            self.write_enabler(storage_index),
1834            self.renew_secret(storage_index),
1835            self.cancel_secret(storage_index),
1836        )
1837
1838        sharenum = 3
1839        datav = [(0, b"Hello, world")]
1840
1841        ss.slot_testv_and_readv_and_writev(
1842            storage_index=storage_index,
1843            secrets=secrets,
1844            test_and_write_vectors={
1845                sharenum: ([], datav, None),
1846            },
1847            read_vector=[],
1848            renew_leases=False,
1849        )
1850        leases = list(ss.get_slot_leases(storage_index))
1851        self.assertThat([], Equals(leases))
1852
1853    def test_get_slot_leases_empty_slot(self):
1854        """
1855        When ``get_slot_leases`` is called for a slot for which the server has no
1856        shares, it returns an empty iterable.
1857        """
1858        ss = self.create("test_get_slot_leases_empty_slot")
1859        self.assertThat(
1860            list(ss.get_slot_leases(b"si1")),
1861            Equals([]),
1862        )
1863
1864    def test_remove_non_present(self):
1865        """
1866        A write vector which would remove a share completely is applied as a no-op
1867        by a server which does not have the share.
1868        """
1869        ss = self.create("test_remove_non_present")
1870
1871        storage_index = b"si1"
1872        secrets = (
1873            self.write_enabler(storage_index),
1874            self.renew_secret(storage_index),
1875            self.cancel_secret(storage_index),
1876        )
1877
1878        sharenum = 3
1879        testv = []
1880        datav = []
1881        new_length = 0
1882        read_vector = []
1883
1884        # We don't even need to create any shares to exercise this
1885        # functionality.  Just go straight to sending a truncate-to-zero
1886        # write.
1887        testv_is_good, read_data = ss.slot_testv_and_readv_and_writev(
1888            storage_index=storage_index,
1889            secrets=secrets,
1890            test_and_write_vectors={
1891                sharenum: (testv, datav, new_length),
1892            },
1893            read_vector=read_vector,
1894        )
1895
1896        self.assertTrue(testv_is_good)
1897        self.assertThat({}, Equals(read_data))
1898
1899
1900class MDMFProxies(AsyncTestCase, ShouldFailMixin):
1901    def setUp(self):
1902        super(MDMFProxies, self).setUp()
1903        self.sparent = LoggingServiceParent()
1904        self._lease_secret = itertools.count()
1905        self.ss = self.create("MDMFProxies storage test server")
1906        self.rref = RemoteBucket(FoolscapStorageServer(self.ss))
1907        self.storage_server = _StorageServer(lambda: self.rref)
1908        self.secrets = (self.write_enabler(b"we_secret"),
1909                        self.renew_secret(b"renew_secret"),
1910                        self.cancel_secret(b"cancel_secret"))
1911        self.segment = b"aaaaaa"
1912        self.block = b"aa"
1913        self.salt = b"a" * 16
1914        self.block_hash = b"a" * 32
1915        self.block_hash_tree = [self.block_hash for i in range(6)]
1916        self.share_hash = self.block_hash
1917        self.share_hash_chain = dict([(i, self.share_hash) for i in range(6)])
1918        self.signature = b"foobarbaz"
1919        self.verification_key = b"vvvvvv"
1920        self.encprivkey = b"private"
1921        self.root_hash = self.block_hash
1922        self.salt_hash = self.root_hash
1923        self.salt_hash_tree = [self.salt_hash for i in range(6)]
1924        self.block_hash_tree_s = self.serialize_blockhashes(self.block_hash_tree)
1925        self.share_hash_chain_s = self.serialize_sharehashes(self.share_hash_chain)
1926        # blockhashes and salt hashes are serialized in the same way,
1927        # only we lop off the first element and store that in the
1928        # header.
1929        self.salt_hash_tree_s = self.serialize_blockhashes(self.salt_hash_tree[1:])
1930
1931
1932    def tearDown(self):
1933        super(MDMFProxies, self).tearDown()
1934        self.sparent.stopService()
1935        shutil.rmtree(self.workdir("MDMFProxies storage test server"))
1936
1937
1938    def write_enabler(self, we_tag):
1939        return hashutil.tagged_hash(b"we_blah", we_tag)
1940
1941
1942    def renew_secret(self, tag):
1943        if isinstance(tag, int):
1944            tag = b"%d" % tag
1945        return hashutil.tagged_hash(b"renew_blah", tag)
1946
1947
1948    def cancel_secret(self, tag):
1949        if isinstance(tag, int):
1950            tag = b"%d" % tag
1951        return hashutil.tagged_hash(b"cancel_blah", tag)
1952
1953
1954    def workdir(self, name):
1955        basedir = os.path.join("storage", "MutableServer", name)
1956        return basedir
1957
1958
1959    def create(self, name):
1960        workdir = self.workdir(name)
1961        ss = StorageServer(workdir, b"\x00" * 20)
1962        ss.setServiceParent(self.sparent)
1963        return ss
1964
1965
1966    def build_test_mdmf_share(self, tail_segment=False, empty=False):
1967        # Start with the checkstring
1968        data = struct.pack(">BQ32s",
1969                           1,
1970                           0,
1971                           self.root_hash)
1972        self.checkstring = data
1973        # Next, the encoding parameters
1974        if tail_segment:
1975            data += struct.pack(">BBQQ",
1976                                3,
1977                                10,
1978                                6,
1979                                33)
1980        elif empty:
1981            data += struct.pack(">BBQQ",
1982                                3,
1983                                10,
1984                                0,
1985                                0)
1986        else:
1987            data += struct.pack(">BBQQ",
1988                                3,
1989                                10,
1990                                6,
1991                                36)
1992        # Now we'll build the offsets.
1993        sharedata = b""
1994        if not tail_segment and not empty:
1995            for i in range(6):
1996                sharedata += self.salt + self.block
1997        elif tail_segment:
1998            for i in range(5):
1999                sharedata += self.salt + self.block
2000            sharedata += self.salt + b"a"
2001
2002        # The encrypted private key comes after the shares + salts
2003        offset_size = struct.calcsize(MDMFOFFSETS)
2004        encrypted_private_key_offset = len(data) + offset_size
2005        # The share has chain comes after the private key
2006        sharehashes_offset = encrypted_private_key_offset + \
2007            len(self.encprivkey)
2008
2009        # The signature comes after the share hash chain.
2010        signature_offset = sharehashes_offset + len(self.share_hash_chain_s)
2011
2012        verification_key_offset = signature_offset + len(self.signature)
2013        verification_key_end = verification_key_offset + \
2014            len(self.verification_key)
2015
2016        share_data_offset = offset_size
2017        share_data_offset += PRIVATE_KEY_SIZE
2018        share_data_offset += SIGNATURE_SIZE
2019        share_data_offset += VERIFICATION_KEY_SIZE
2020        share_data_offset += SHARE_HASH_CHAIN_SIZE
2021
2022        blockhashes_offset = share_data_offset + len(sharedata)
2023        eof_offset = blockhashes_offset + len(self.block_hash_tree_s)
2024
2025        data += struct.pack(MDMFOFFSETS,
2026                            encrypted_private_key_offset,
2027                            sharehashes_offset,
2028                            signature_offset,
2029                            verification_key_offset,
2030                            verification_key_end,
2031                            share_data_offset,
2032                            blockhashes_offset,
2033                            eof_offset)
2034
2035        self.offsets = {}
2036        self.offsets['enc_privkey'] = encrypted_private_key_offset
2037        self.offsets['block_hash_tree'] = blockhashes_offset
2038        self.offsets['share_hash_chain'] = sharehashes_offset
2039        self.offsets['signature'] = signature_offset
2040        self.offsets['verification_key'] = verification_key_offset
2041        self.offsets['share_data'] = share_data_offset
2042        self.offsets['verification_key_end'] = verification_key_end
2043        self.offsets['EOF'] = eof_offset
2044
2045        # the private key,
2046        data += self.encprivkey
2047        # the sharehashes
2048        data += self.share_hash_chain_s
2049        # the signature,
2050        data += self.signature
2051        # and the verification key
2052        data += self.verification_key
2053        # Then we'll add in gibberish until we get to the right point.
2054        nulls = b"".join([b" " for i in range(len(data), share_data_offset)])
2055        data += nulls
2056
2057        # Then the share data
2058        data += sharedata
2059        # the blockhashes
2060        data += self.block_hash_tree_s
2061        return data
2062
2063
2064    def write_test_share_to_server(self,
2065                                   storage_index,
2066                                   tail_segment=False,
2067                                   empty=False):
2068        """
2069        I write some data for the read tests to read to self.ss
2070
2071        If tail_segment=True, then I will write a share that has a
2072        smaller tail segment than other segments.
2073        """
2074        write = self.ss.slot_testv_and_readv_and_writev
2075        data = self.build_test_mdmf_share(tail_segment, empty)
2076        # Finally, we write the whole thing to the storage server in one
2077        # pass.
2078        testvs = [(0, 1, b"eq", b"")]
2079        tws = {}
2080        tws[0] = (testvs, [(0, data)], None)
2081        readv = [(0, 1)]
2082        results = write(storage_index, self.secrets, tws, readv)
2083        self.assertTrue(results[0])
2084
2085
2086    def build_test_sdmf_share(self, empty=False):
2087        if empty:
2088            sharedata = b""
2089        else:
2090            sharedata = self.segment * 6
2091        self.sharedata = sharedata
2092        blocksize = len(sharedata) // 3
2093        block = sharedata[:blocksize]
2094        self.blockdata = block
2095        prefix = struct.pack(">BQ32s16s BBQQ",
2096                             0, # version,
2097                             0,
2098                             self.root_hash,
2099                             self.salt,
2100                             3,
2101                             10,
2102                             len(sharedata),
2103                             len(sharedata),
2104                            )
2105        post_offset = struct.calcsize(">BQ32s16sBBQQLLLLQQ")
2106        signature_offset = post_offset + len(self.verification_key)
2107        sharehashes_offset = signature_offset + len(self.signature)
2108        blockhashes_offset = sharehashes_offset + len(self.share_hash_chain_s)
2109        sharedata_offset = blockhashes_offset + len(self.block_hash_tree_s)
2110        encprivkey_offset = sharedata_offset + len(block)
2111        eof_offset = encprivkey_offset + len(self.encprivkey)
2112        offsets = struct.pack(">LLLLQQ",
2113                              signature_offset,
2114                              sharehashes_offset,
2115                              blockhashes_offset,
2116                              sharedata_offset,
2117                              encprivkey_offset,
2118                              eof_offset)
2119        final_share = b"".join([prefix,
2120                                offsets,
2121                                self.verification_key,
2122                                self.signature,
2123                                self.share_hash_chain_s,
2124                                self.block_hash_tree_s,
2125                                block,
2126                                self.encprivkey])
2127        self.offsets = {}
2128        self.offsets['signature'] = signature_offset
2129        self.offsets['share_hash_chain'] = sharehashes_offset
2130        self.offsets['block_hash_tree'] = blockhashes_offset
2131        self.offsets['share_data'] = sharedata_offset
2132        self.offsets['enc_privkey'] = encprivkey_offset
2133        self.offsets['EOF'] = eof_offset
2134        return final_share
2135
2136
2137    def write_sdmf_share_to_server(self,
2138                                   storage_index,
2139                                   empty=False):
2140        # Some tests need SDMF shares to verify that we can still
2141        # read them. This method writes one, which resembles but is not
2142        write = self.ss.slot_testv_and_readv_and_writev
2143        share = self.build_test_sdmf_share(empty)
2144        testvs = [(0, 1, b"eq", b"")]
2145        tws = {}
2146        tws[0] = (testvs, [(0, share)], None)
2147        readv = []
2148        results = write(storage_index, self.secrets, tws, readv)
2149        self.assertTrue(results[0])
2150
2151
2152    def test_read(self):
2153        self.write_test_share_to_server(b"si1")
2154        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2155        # Check that every method equals what we expect it to.
2156        d = defer.succeed(None)
2157        def _check_block_and_salt(block_and_salt):
2158            (block, salt) = block_and_salt
2159            self.assertThat(block, Equals(self.block))
2160            self.assertThat(salt, Equals(self.salt))
2161
2162        for i in range(6):
2163            d.addCallback(lambda ignored, i=i:
2164                mr.get_block_and_salt(i))
2165            d.addCallback(_check_block_and_salt)
2166
2167        d.addCallback(lambda ignored:
2168            mr.get_encprivkey())
2169        d.addCallback(lambda encprivkey:
2170            self.assertThat(self.encprivkey, Equals(encprivkey)))
2171
2172        d.addCallback(lambda ignored:
2173            mr.get_blockhashes())
2174        d.addCallback(lambda blockhashes:
2175            self.assertThat(self.block_hash_tree, Equals(blockhashes)))
2176
2177        d.addCallback(lambda ignored:
2178            mr.get_sharehashes())
2179        d.addCallback(lambda sharehashes:
2180            self.assertThat(self.share_hash_chain, Equals(sharehashes)))
2181
2182        d.addCallback(lambda ignored:
2183            mr.get_signature())
2184        d.addCallback(lambda signature:
2185            self.assertThat(signature, Equals(self.signature)))
2186
2187        d.addCallback(lambda ignored:
2188            mr.get_verification_key())
2189        d.addCallback(lambda verification_key:
2190            self.assertThat(verification_key, Equals(self.verification_key)))
2191
2192        d.addCallback(lambda ignored:
2193            mr.get_seqnum())
2194        d.addCallback(lambda seqnum:
2195            self.assertThat(seqnum, Equals(0)))
2196
2197        d.addCallback(lambda ignored:
2198            mr.get_root_hash())
2199        d.addCallback(lambda root_hash:
2200            self.assertThat(self.root_hash, Equals(root_hash)))
2201
2202        d.addCallback(lambda ignored:
2203            mr.get_seqnum())
2204        d.addCallback(lambda seqnum:
2205            self.assertThat(seqnum, Equals(0)))
2206
2207        d.addCallback(lambda ignored:
2208            mr.get_encoding_parameters())
2209        def _check_encoding_parameters(args):
2210            (k, n, segsize, datalen) = args
2211            self.assertThat(k, Equals(3))
2212            self.assertThat(n, Equals(10))
2213            self.assertThat(segsize, Equals(6))
2214            self.assertThat(datalen, Equals(36))
2215        d.addCallback(_check_encoding_parameters)
2216
2217        d.addCallback(lambda ignored:
2218            mr.get_checkstring())
2219        d.addCallback(lambda checkstring:
2220            self.assertThat(checkstring, Equals(checkstring)))
2221        return d
2222
2223
2224    def test_read_with_different_tail_segment_size(self):
2225        self.write_test_share_to_server(b"si1", tail_segment=True)
2226        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2227        d = mr.get_block_and_salt(5)
2228        def _check_tail_segment(results):
2229            block, salt = results
2230            self.assertThat(block, HasLength(1))
2231            self.assertThat(block, Equals(b"a"))
2232        d.addCallback(_check_tail_segment)
2233        return d
2234
2235
2236    def test_get_block_with_invalid_segnum(self):
2237        self.write_test_share_to_server(b"si1")
2238        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2239        d = defer.succeed(None)
2240        d.addCallback(lambda ignored:
2241            self.shouldFail(LayoutInvalid, "test invalid segnum",
2242                            None,
2243                            mr.get_block_and_salt, 7))
2244        return d
2245
2246
2247    def test_get_encoding_parameters_first(self):
2248        self.write_test_share_to_server(b"si1")
2249        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2250        d = mr.get_encoding_parameters()
2251        def _check_encoding_parameters(args):
2252            (k, n, segment_size, datalen) = args
2253            self.assertThat(k, Equals(3))
2254            self.assertThat(n, Equals(10))
2255            self.assertThat(segment_size, Equals(6))
2256            self.assertThat(datalen, Equals(36))
2257        d.addCallback(_check_encoding_parameters)
2258        return d
2259
2260
2261    def test_get_seqnum_first(self):
2262        self.write_test_share_to_server(b"si1")
2263        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2264        d = mr.get_seqnum()
2265        d.addCallback(lambda seqnum:
2266            self.assertThat(seqnum, Equals(0)))
2267        return d
2268
2269
2270    def test_get_root_hash_first(self):
2271        self.write_test_share_to_server(b"si1")
2272        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2273        d = mr.get_root_hash()
2274        d.addCallback(lambda root_hash:
2275            self.assertThat(root_hash, Equals(self.root_hash)))
2276        return d
2277
2278
2279    def test_get_checkstring_first(self):
2280        self.write_test_share_to_server(b"si1")
2281        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2282        d = mr.get_checkstring()
2283        d.addCallback(lambda checkstring:
2284            self.assertThat(checkstring, Equals(self.checkstring)))
2285        return d
2286
2287
2288    def test_write_read_vectors(self):
2289        # When writing for us, the storage server will return to us a
2290        # read vector, along with its result. If a write fails because
2291        # the test vectors failed, this read vector can help us to
2292        # diagnose the problem. This test ensures that the read vector
2293        # is working appropriately.
2294        mw = self._make_new_mw(b"si1", 0)
2295
2296        for i in range(6):
2297            mw.put_block(self.block, i, self.salt)
2298        mw.put_encprivkey(self.encprivkey)
2299        mw.put_blockhashes(self.block_hash_tree)
2300        mw.put_sharehashes(self.share_hash_chain)
2301        mw.put_root_hash(self.root_hash)
2302        mw.put_signature(self.signature)
2303        mw.put_verification_key(self.verification_key)
2304        d = mw.finish_publishing()
2305        def _then(results):
2306            self.assertThat(results, HasLength(2))
2307            result, readv = results
2308            self.assertTrue(result)
2309            self.assertFalse(readv)
2310            self.old_checkstring = mw.get_checkstring()
2311            mw.set_checkstring(b"")
2312        d.addCallback(_then)
2313        d.addCallback(lambda ignored:
2314            mw.finish_publishing())
2315        def _then_again(results):
2316            self.assertThat(results, HasLength(2))
2317            result, readvs = results
2318            self.assertFalse(result)
2319            self.assertThat(readvs, Contains(0))
2320            readv = readvs[0][0]
2321            self.assertThat(readv, Equals(self.old_checkstring))
2322        d.addCallback(_then_again)
2323        # The checkstring remains the same for the rest of the process.
2324        return d
2325
2326
2327    def test_private_key_after_share_hash_chain(self):
2328        mw = self._make_new_mw(b"si1", 0)
2329        d = defer.succeed(None)
2330        for i in range(6):
2331            d.addCallback(lambda ignored, i=i:
2332                mw.put_block(self.block, i, self.salt))
2333        d.addCallback(lambda ignored:
2334            mw.put_encprivkey(self.encprivkey))
2335        d.addCallback(lambda ignored:
2336            mw.put_sharehashes(self.share_hash_chain))
2337
2338        # Now try to put the private key again.
2339        d.addCallback(lambda ignored:
2340            self.shouldFail(LayoutInvalid, "test repeat private key",
2341                            None,
2342                            mw.put_encprivkey, self.encprivkey))
2343        return d
2344
2345
2346    def test_signature_after_verification_key(self):
2347        mw = self._make_new_mw(b"si1", 0)
2348        d = defer.succeed(None)
2349        # Put everything up to and including the verification key.
2350        for i in range(6):
2351            d.addCallback(lambda ignored, i=i:
2352                mw.put_block(self.block, i, self.salt))
2353        d.addCallback(lambda ignored:
2354            mw.put_encprivkey(self.encprivkey))
2355        d.addCallback(lambda ignored:
2356            mw.put_blockhashes(self.block_hash_tree))
2357        d.addCallback(lambda ignored:
2358            mw.put_sharehashes(self.share_hash_chain))
2359        d.addCallback(lambda ignored:
2360            mw.put_root_hash(self.root_hash))
2361        d.addCallback(lambda ignored:
2362            mw.put_signature(self.signature))
2363        d.addCallback(lambda ignored:
2364            mw.put_verification_key(self.verification_key))
2365        # Now try to put the signature again. This should fail
2366        d.addCallback(lambda ignored:
2367            self.shouldFail(LayoutInvalid, "signature after verification",
2368                            None,
2369                            mw.put_signature, self.signature))
2370        return d
2371
2372
2373    def test_uncoordinated_write(self):
2374        # Make two mutable writers, both pointing to the same storage
2375        # server, both at the same storage index, and try writing to the
2376        # same share.
2377        mw1 = self._make_new_mw(b"si1", 0)
2378        mw2 = self._make_new_mw(b"si1", 0)
2379
2380        def _check_success(results):
2381            result, readvs = results
2382            self.assertTrue(result)
2383
2384        def _check_failure(results):
2385            result, readvs = results
2386            self.assertFalse(result)
2387
2388        def _write_share(mw):
2389            for i in range(6):
2390                mw.put_block(self.block, i, self.salt)
2391            mw.put_encprivkey(self.encprivkey)
2392            mw.put_blockhashes(self.block_hash_tree)
2393            mw.put_sharehashes(self.share_hash_chain)
2394            mw.put_root_hash(self.root_hash)
2395            mw.put_signature(self.signature)
2396            mw.put_verification_key(self.verification_key)
2397            return mw.finish_publishing()
2398        d = _write_share(mw1)
2399        d.addCallback(_check_success)
2400        d.addCallback(lambda ignored:
2401            _write_share(mw2))
2402        d.addCallback(_check_failure)
2403        return d
2404
2405
2406    def test_invalid_salt_size(self):
2407        # Salts need to be 16 bytes in size. Writes that attempt to
2408        # write more or less than this should be rejected.
2409        mw = self._make_new_mw(b"si1", 0)
2410        invalid_salt = b"a" * 17 # 17 bytes
2411        another_invalid_salt = b"b" * 15 # 15 bytes
2412        d = defer.succeed(None)
2413        d.addCallback(lambda ignored:
2414            self.shouldFail(LayoutInvalid, "salt too big",
2415                            None,
2416                            mw.put_block, self.block, 0, invalid_salt))
2417        d.addCallback(lambda ignored:
2418            self.shouldFail(LayoutInvalid, "salt too small",
2419                            None,
2420                            mw.put_block, self.block, 0,
2421                            another_invalid_salt))
2422        return d
2423
2424
2425    def test_write_test_vectors(self):
2426        # If we give the write proxy a bogus test vector at
2427        # any point during the process, it should fail to write when we
2428        # tell it to write.
2429        def _check_failure(results):
2430            self.assertThat(results, HasLength(2))
2431            res, d = results
2432            self.assertFalse(res)
2433
2434        def _check_success(results):
2435            self.assertThat(results, HasLength(2))
2436            res, d = results
2437            self.assertTrue(results)
2438
2439        mw = self._make_new_mw(b"si1", 0)
2440        mw.set_checkstring(b"this is a lie")
2441        for i in range(6):
2442            mw.put_block(self.block, i, self.salt)
2443        mw.put_encprivkey(self.encprivkey)
2444        mw.put_blockhashes(self.block_hash_tree)
2445        mw.put_sharehashes(self.share_hash_chain)
2446        mw.put_root_hash(self.root_hash)
2447        mw.put_signature(self.signature)
2448        mw.put_verification_key(self.verification_key)
2449        d = mw.finish_publishing()
2450        d.addCallback(_check_failure)
2451        d.addCallback(lambda ignored:
2452            mw.set_checkstring(b""))
2453        d.addCallback(lambda ignored:
2454            mw.finish_publishing())
2455        d.addCallback(_check_success)
2456        return d
2457
2458
2459    def serialize_blockhashes(self, blockhashes):
2460        return b"".join(blockhashes)
2461
2462
2463    def serialize_sharehashes(self, sharehashes):
2464        ret = b"".join([struct.pack(">H32s", i, sharehashes[i])
2465                        for i in sorted(sharehashes.keys())])
2466        return ret
2467
2468
2469    def test_write(self):
2470        # This translates to a file with 6 6-byte segments, and with 2-byte
2471        # blocks.
2472        mw = self._make_new_mw(b"si1", 0)
2473        # Test writing some blocks.
2474        read = self.ss.slot_readv
2475        expected_private_key_offset = struct.calcsize(MDMFHEADER)
2476        expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
2477                                    PRIVATE_KEY_SIZE + \
2478                                    SIGNATURE_SIZE + \
2479                                    VERIFICATION_KEY_SIZE + \
2480                                    SHARE_HASH_CHAIN_SIZE
2481        written_block_size = 2 + len(self.salt)
2482        written_block = self.block + self.salt
2483        for i in range(6):
2484            mw.put_block(self.block, i, self.salt)
2485
2486        mw.put_encprivkey(self.encprivkey)
2487        mw.put_blockhashes(self.block_hash_tree)
2488        mw.put_sharehashes(self.share_hash_chain)
2489        mw.put_root_hash(self.root_hash)
2490        mw.put_signature(self.signature)
2491        mw.put_verification_key(self.verification_key)
2492        d = mw.finish_publishing()
2493        def _check_publish(results):
2494            self.assertThat(results, HasLength(2))
2495            result, ign = results
2496            self.assertTrue(result, "publish failed")
2497            for i in range(6):
2498                self.assertThat(read(b"si1", [0], [(expected_sharedata_offset + (i * written_block_size), written_block_size)]),
2499                                Equals({0: [written_block]}))
2500
2501            self.assertThat(self.encprivkey, HasLength(7))
2502            self.assertThat(read(b"si1", [0], [(expected_private_key_offset, 7)]),
2503                                 Equals({0: [self.encprivkey]}))
2504
2505            expected_block_hash_offset = expected_sharedata_offset + \
2506                        (6 * written_block_size)
2507            self.assertThat(self.block_hash_tree_s, HasLength(32 * 6))
2508            self.assertThat(read(b"si1", [0], [(expected_block_hash_offset, 32 * 6)]),
2509                                 Equals({0: [self.block_hash_tree_s]}))
2510
2511            expected_share_hash_offset = expected_private_key_offset + len(self.encprivkey)
2512            self.assertThat(read(b"si1", [0],[(expected_share_hash_offset, (32 + 2) * 6)]),
2513                                 Equals({0: [self.share_hash_chain_s]}))
2514
2515            self.assertThat(read(b"si1", [0], [(9, 32)]),
2516                                 Equals({0: [self.root_hash]}))
2517            expected_signature_offset = expected_share_hash_offset + \
2518                len(self.share_hash_chain_s)
2519            self.assertThat(self.signature, HasLength(9))
2520            self.assertThat(read(b"si1", [0], [(expected_signature_offset, 9)]),
2521                                 Equals({0: [self.signature]}))
2522
2523            expected_verification_key_offset = expected_signature_offset + len(self.signature)
2524            self.assertThat(self.verification_key, HasLength(6))
2525            self.assertThat(read(b"si1", [0], [(expected_verification_key_offset, 6)]),
2526                                 Equals({0: [self.verification_key]}))
2527
2528            signable = mw.get_signable()
2529            verno, seq, roothash, k, n, segsize, datalen = \
2530                                            struct.unpack(">BQ32sBBQQ",
2531                                                          signable)
2532            self.assertThat(verno, Equals(1))
2533            self.assertThat(seq, Equals(0))
2534            self.assertThat(roothash, Equals(self.root_hash))
2535            self.assertThat(k, Equals(3))
2536            self.assertThat(n, Equals(10))
2537            self.assertThat(segsize, Equals(6))
2538            self.assertThat(datalen, Equals(36))
2539            expected_eof_offset = expected_block_hash_offset + \
2540                len(self.block_hash_tree_s)
2541
2542            # Check the version number to make sure that it is correct.
2543            expected_version_number = struct.pack(">B", 1)
2544            self.assertThat(read(b"si1", [0], [(0, 1)]),
2545                                 Equals({0: [expected_version_number]}))
2546            # Check the sequence number to make sure that it is correct
2547            expected_sequence_number = struct.pack(">Q", 0)
2548            self.assertThat(read(b"si1", [0], [(1, 8)]),
2549                                 Equals({0: [expected_sequence_number]}))
2550            # Check that the encoding parameters (k, N, segement size, data
2551            # length) are what they should be. These are  3, 10, 6, 36
2552            expected_k = struct.pack(">B", 3)
2553            self.assertThat(read(b"si1", [0], [(41, 1)]),
2554                                 Equals({0: [expected_k]}))
2555            expected_n = struct.pack(">B", 10)
2556            self.assertThat(read(b"si1", [0], [(42, 1)]),
2557                                 Equals({0: [expected_n]}))
2558            expected_segment_size = struct.pack(">Q", 6)
2559            self.assertThat(read(b"si1", [0], [(43, 8)]),
2560                                 Equals({0: [expected_segment_size]}))
2561            expected_data_length = struct.pack(">Q", 36)
2562            self.assertThat(read(b"si1", [0], [(51, 8)]),
2563                                 Equals({0: [expected_data_length]}))
2564            expected_offset = struct.pack(">Q", expected_private_key_offset)
2565            self.assertThat(read(b"si1", [0], [(59, 8)]),
2566                                 Equals({0: [expected_offset]}))
2567            expected_offset = struct.pack(">Q", expected_share_hash_offset)
2568            self.assertThat(read(b"si1", [0], [(67, 8)]),
2569                                 Equals({0: [expected_offset]}))
2570            expected_offset = struct.pack(">Q", expected_signature_offset)
2571            self.assertThat(read(b"si1", [0], [(75, 8)]),
2572                                 Equals({0: [expected_offset]}))
2573            expected_offset = struct.pack(">Q", expected_verification_key_offset)
2574            self.assertThat(read(b"si1", [0], [(83, 8)]),
2575                                 Equals({0: [expected_offset]}))
2576            expected_offset = struct.pack(">Q", expected_verification_key_offset + len(self.verification_key))
2577            self.assertThat(read(b"si1", [0], [(91, 8)]),
2578                                 Equals({0: [expected_offset]}))
2579            expected_offset = struct.pack(">Q", expected_sharedata_offset)
2580            self.assertThat(read(b"si1", [0], [(99, 8)]),
2581                                 Equals({0: [expected_offset]}))
2582            expected_offset = struct.pack(">Q", expected_block_hash_offset)
2583            self.assertThat(read(b"si1", [0], [(107, 8)]),
2584                                 Equals({0: [expected_offset]}))
2585            expected_offset = struct.pack(">Q", expected_eof_offset)
2586            self.assertThat(read(b"si1", [0], [(115, 8)]),
2587                                 Equals({0: [expected_offset]}))
2588        d.addCallback(_check_publish)
2589        return d
2590
2591    def _make_new_mw(self, si, share, datalength=36):
2592        # This is a file of size 36 bytes. Since it has a segment
2593        # size of 6, we know that it has 6 byte segments, which will
2594        # be split into blocks of 2 bytes because our FEC k
2595        # parameter is 3.
2596        mw = MDMFSlotWriteProxy(share, self.storage_server, si, self.secrets, 0, 3, 10,
2597                                6, datalength)
2598        return mw
2599
2600
2601    def test_write_rejected_with_too_many_blocks(self):
2602        mw = self._make_new_mw(b"si0", 0)
2603
2604        # Try writing too many blocks. We should not be able to write
2605        # more than 6
2606        # blocks into each share.
2607        d = defer.succeed(None)
2608        for i in range(6):
2609            d.addCallback(lambda ignored, i=i:
2610                mw.put_block(self.block, i, self.salt))
2611        d.addCallback(lambda ignored:
2612            self.shouldFail(LayoutInvalid, "too many blocks",
2613                            None,
2614                            mw.put_block, self.block, 7, self.salt))
2615        return d
2616
2617
2618    def test_write_rejected_with_invalid_salt(self):
2619        # Try writing an invalid salt. Salts are 16 bytes -- any more or
2620        # less should cause an error.
2621        mw = self._make_new_mw(b"si1", 0)
2622        bad_salt = b"a" * 17 # 17 bytes
2623        d = defer.succeed(None)
2624        d.addCallback(lambda ignored:
2625            self.shouldFail(LayoutInvalid, "test_invalid_salt",
2626                            None, mw.put_block, self.block, 7, bad_salt))
2627        return d
2628
2629
2630    def test_write_rejected_with_invalid_root_hash(self):
2631        # Try writing an invalid root hash. This should be SHA256d, and
2632        # 32 bytes long as a result.
2633        mw = self._make_new_mw(b"si2", 0)
2634        # 17 bytes != 32 bytes
2635        invalid_root_hash = b"a" * 17
2636        d = defer.succeed(None)
2637        # Before this test can work, we need to put some blocks + salts,
2638        # a block hash tree, and a share hash tree. Otherwise, we'll see
2639        # failures that match what we are looking for, but are caused by
2640        # the constraints imposed on operation ordering.
2641        for i in range(6):
2642            d.addCallback(lambda ignored, i=i:
2643                mw.put_block(self.block, i, self.salt))
2644        d.addCallback(lambda ignored:
2645            mw.put_encprivkey(self.encprivkey))
2646        d.addCallback(lambda ignored:
2647            mw.put_blockhashes(self.block_hash_tree))
2648        d.addCallback(lambda ignored:
2649            mw.put_sharehashes(self.share_hash_chain))
2650        d.addCallback(lambda ignored:
2651            self.shouldFail(LayoutInvalid, "invalid root hash",
2652                            None, mw.put_root_hash, invalid_root_hash))
2653        return d
2654
2655
2656    def test_write_rejected_with_invalid_blocksize(self):
2657        # The blocksize implied by the writer that we get from
2658        # _make_new_mw is 2bytes -- any more or any less than this
2659        # should be cause for failure, unless it is the tail segment, in
2660        # which case it may not be failure.
2661        invalid_block = b"a"
2662        mw = self._make_new_mw(b"si3", 0, 33) # implies a tail segment with
2663                                             # one byte blocks
2664        # 1 bytes != 2 bytes
2665        d = defer.succeed(None)
2666        d.addCallback(lambda ignored, invalid_block=invalid_block:
2667            self.shouldFail(LayoutInvalid, "test blocksize too small",
2668                            None, mw.put_block, invalid_block, 0,
2669                            self.salt))
2670        invalid_block = invalid_block * 3
2671        # 3 bytes != 2 bytes
2672        d.addCallback(lambda ignored:
2673            self.shouldFail(LayoutInvalid, "test blocksize too large",
2674                            None,
2675                            mw.put_block, invalid_block, 0, self.salt))
2676        for i in range(5):
2677            d.addCallback(lambda ignored, i=i:
2678                mw.put_block(self.block, i, self.salt))
2679        # Try to put an invalid tail segment
2680        d.addCallback(lambda ignored:
2681            self.shouldFail(LayoutInvalid, "test invalid tail segment",
2682                            None,
2683                            mw.put_block, self.block, 5, self.salt))
2684        valid_block = b"a"
2685        d.addCallback(lambda ignored:
2686            mw.put_block(valid_block, 5, self.salt))
2687        return d
2688
2689
2690    def test_write_enforces_order_constraints(self):
2691        # We require that the MDMFSlotWriteProxy be interacted with in a
2692        # specific way.
2693        # That way is:
2694        # 0: __init__
2695        # 1: write blocks and salts
2696        # 2: Write the encrypted private key
2697        # 3: Write the block hashes
2698        # 4: Write the share hashes
2699        # 5: Write the root hash and salt hash
2700        # 6: Write the signature and verification key
2701        # 7: Write the file.
2702        #
2703        # Some of these can be performed out-of-order, and some can't.
2704        # The dependencies that I want to test here are:
2705        #  - Private key before block hashes
2706        #  - share hashes and block hashes before root hash
2707        #  - root hash before signature
2708        #  - signature before verification key
2709        mw0 = self._make_new_mw(b"si0", 0)
2710        # Write some shares
2711        d = defer.succeed(None)
2712        for i in range(6):
2713            d.addCallback(lambda ignored, i=i:
2714                mw0.put_block(self.block, i, self.salt))
2715
2716        # Try to write the share hash chain without writing the
2717        # encrypted private key
2718        d.addCallback(lambda ignored:
2719            self.shouldFail(LayoutInvalid, "share hash chain before "
2720                                           "private key",
2721                            None,
2722                            mw0.put_sharehashes, self.share_hash_chain))
2723        # Write the private key.
2724        d.addCallback(lambda ignored:
2725            mw0.put_encprivkey(self.encprivkey))
2726
2727        # Now write the block hashes and try again
2728        d.addCallback(lambda ignored:
2729            mw0.put_blockhashes(self.block_hash_tree))
2730
2731        # We haven't yet put the root hash on the share, so we shouldn't
2732        # be able to sign it.
2733        d.addCallback(lambda ignored:
2734            self.shouldFail(LayoutInvalid, "signature before root hash",
2735                            None, mw0.put_signature, self.signature))
2736
2737        d.addCallback(lambda ignored:
2738            self.failUnlessRaises(LayoutInvalid, mw0.get_signable))
2739
2740        # ..and, since that fails, we also shouldn't be able to put the
2741        # verification key.
2742        d.addCallback(lambda ignored:
2743            self.shouldFail(LayoutInvalid, "key before signature",
2744                            None, mw0.put_verification_key,
2745                            self.verification_key))
2746
2747        # Now write the share hashes.
2748        d.addCallback(lambda ignored:
2749            mw0.put_sharehashes(self.share_hash_chain))
2750        # We should be able to write the root hash now too
2751        d.addCallback(lambda ignored:
2752            mw0.put_root_hash(self.root_hash))
2753
2754        # We should still be unable to put the verification key
2755        d.addCallback(lambda ignored:
2756            self.shouldFail(LayoutInvalid, "key before signature",
2757                            None, mw0.put_verification_key,
2758                            self.verification_key))
2759
2760        d.addCallback(lambda ignored:
2761            mw0.put_signature(self.signature))
2762
2763        # We shouldn't be able to write the offsets to the remote server
2764        # until the offset table is finished; IOW, until we have written
2765        # the verification key.
2766        d.addCallback(lambda ignored:
2767            self.shouldFail(LayoutInvalid, "offsets before verification key",
2768                            None,
2769                            mw0.finish_publishing))
2770
2771        d.addCallback(lambda ignored:
2772            mw0.put_verification_key(self.verification_key))
2773        return d
2774
2775
2776    def test_end_to_end(self):
2777        mw = self._make_new_mw(b"si1", 0)
2778        # Write a share using the mutable writer, and make sure that the
2779        # reader knows how to read everything back to us.
2780        d = defer.succeed(None)
2781        for i in range(6):
2782            d.addCallback(lambda ignored, i=i:
2783                mw.put_block(self.block, i, self.salt))
2784        d.addCallback(lambda ignored:
2785            mw.put_encprivkey(self.encprivkey))
2786        d.addCallback(lambda ignored:
2787            mw.put_blockhashes(self.block_hash_tree))
2788        d.addCallback(lambda ignored:
2789            mw.put_sharehashes(self.share_hash_chain))
2790        d.addCallback(lambda ignored:
2791            mw.put_root_hash(self.root_hash))
2792        d.addCallback(lambda ignored:
2793            mw.put_signature(self.signature))
2794        d.addCallback(lambda ignored:
2795            mw.put_verification_key(self.verification_key))
2796        d.addCallback(lambda ignored:
2797            mw.finish_publishing())
2798
2799        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2800        def _check_block_and_salt(block_and_salt):
2801            (block, salt) = block_and_salt
2802            self.assertThat(block, Equals(self.block))
2803            self.assertThat(salt, Equals(self.salt))
2804
2805        for i in range(6):
2806            d.addCallback(lambda ignored, i=i:
2807                mr.get_block_and_salt(i))
2808            d.addCallback(_check_block_and_salt)
2809
2810        d.addCallback(lambda ignored:
2811            mr.get_encprivkey())
2812        d.addCallback(lambda encprivkey:
2813            self.assertThat(self.encprivkey, Equals(encprivkey)))
2814
2815        d.addCallback(lambda ignored:
2816            mr.get_blockhashes())
2817        d.addCallback(lambda blockhashes:
2818            self.assertThat(self.block_hash_tree, Equals(blockhashes)))
2819
2820        d.addCallback(lambda ignored:
2821            mr.get_sharehashes())
2822        d.addCallback(lambda sharehashes:
2823            self.assertThat(self.share_hash_chain, Equals(sharehashes)))
2824
2825        d.addCallback(lambda ignored:
2826            mr.get_signature())
2827        d.addCallback(lambda signature:
2828            self.assertThat(signature, Equals(self.signature)))
2829
2830        d.addCallback(lambda ignored:
2831            mr.get_verification_key())
2832        d.addCallback(lambda verification_key:
2833            self.assertThat(verification_key, Equals(self.verification_key)))
2834
2835        d.addCallback(lambda ignored:
2836            mr.get_seqnum())
2837        d.addCallback(lambda seqnum:
2838            self.assertThat(seqnum, Equals(0)))
2839
2840        d.addCallback(lambda ignored:
2841            mr.get_root_hash())
2842        d.addCallback(lambda root_hash:
2843            self.assertThat(self.root_hash, Equals(root_hash)))
2844
2845        d.addCallback(lambda ignored:
2846            mr.get_encoding_parameters())
2847        def _check_encoding_parameters(args):
2848            (k, n, segsize, datalen) = args
2849            self.assertThat(k, Equals(3))
2850            self.assertThat(n, Equals(10))
2851            self.assertThat(segsize, Equals(6))
2852            self.assertThat(datalen, Equals(36))
2853        d.addCallback(_check_encoding_parameters)
2854
2855        d.addCallback(lambda ignored:
2856            mr.get_checkstring())
2857        d.addCallback(lambda checkstring:
2858            self.assertThat(checkstring, Equals(mw.get_checkstring())))
2859        return d
2860
2861
2862    def test_is_sdmf(self):
2863        # The MDMFSlotReadProxy should also know how to read SDMF files,
2864        # since it will encounter them on the grid. Callers use the
2865        # is_sdmf method to test this.
2866        self.write_sdmf_share_to_server(b"si1")
2867        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2868        d = mr.is_sdmf()
2869        d.addCallback(lambda issdmf:
2870            self.assertTrue(issdmf))
2871        return d
2872
2873
2874    def test_reads_sdmf(self):
2875        # The slot read proxy should, naturally, know how to tell us
2876        # about data in the SDMF format
2877        self.write_sdmf_share_to_server(b"si1")
2878        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2879        d = defer.succeed(None)
2880        d.addCallback(lambda ignored:
2881            mr.is_sdmf())
2882        d.addCallback(lambda issdmf:
2883            self.assertTrue(issdmf))
2884
2885        # What do we need to read?
2886        #  - The sharedata
2887        #  - The salt
2888        d.addCallback(lambda ignored:
2889            mr.get_block_and_salt(0))
2890        def _check_block_and_salt(results):
2891            block, salt = results
2892            # Our original file is 36 bytes long. Then each share is 12
2893            # bytes in size. The share is composed entirely of the
2894            # letter a. self.block contains 2 as, so 6 * self.block is
2895            # what we are looking for.
2896            self.assertThat(block, Equals(self.block * 6))
2897            self.assertThat(salt, Equals(self.salt))
2898        d.addCallback(_check_block_and_salt)
2899
2900        #  - The blockhashes
2901        d.addCallback(lambda ignored:
2902            mr.get_blockhashes())
2903        d.addCallback(lambda blockhashes:
2904            self.assertThat(self.block_hash_tree,
2905                                 Equals(blockhashes),
2906                                 blockhashes))
2907        #  - The sharehashes
2908        d.addCallback(lambda ignored:
2909            mr.get_sharehashes())
2910        d.addCallback(lambda sharehashes:
2911            self.assertThat(self.share_hash_chain,
2912                                 Equals(sharehashes)))
2913        #  - The keys
2914        d.addCallback(lambda ignored:
2915            mr.get_encprivkey())
2916        d.addCallback(lambda encprivkey:
2917            self.assertThat(encprivkey, Equals(self.encprivkey), encprivkey))
2918        d.addCallback(lambda ignored:
2919            mr.get_verification_key())
2920        d.addCallback(lambda verification_key:
2921            self.assertThat(verification_key,
2922                                 Equals(self.verification_key),
2923                                 verification_key))
2924        #  - The signature
2925        d.addCallback(lambda ignored:
2926            mr.get_signature())
2927        d.addCallback(lambda signature:
2928            self.assertThat(signature, Equals(self.signature), signature))
2929
2930        #  - The sequence number
2931        d.addCallback(lambda ignored:
2932            mr.get_seqnum())
2933        d.addCallback(lambda seqnum:
2934            self.assertThat(seqnum, Equals(0), seqnum))
2935
2936        #  - The root hash
2937        d.addCallback(lambda ignored:
2938            mr.get_root_hash())
2939        d.addCallback(lambda root_hash:
2940            self.assertThat(root_hash, Equals(self.root_hash), root_hash))
2941        return d
2942
2943
2944    def test_only_reads_one_segment_sdmf(self):
2945        # SDMF shares have only one segment, so it doesn't make sense to
2946        # read more segments than that. The reader should know this and
2947        # complain if we try to do that.
2948        self.write_sdmf_share_to_server(b"si1")
2949        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
2950        d = defer.succeed(None)
2951        d.addCallback(lambda ignored:
2952            mr.is_sdmf())
2953        d.addCallback(lambda issdmf:
2954            self.assertTrue(issdmf))
2955        d.addCallback(lambda ignored:
2956            self.shouldFail(LayoutInvalid, "test bad segment",
2957                            None,
2958                            mr.get_block_and_salt, 1))
2959        return d
2960
2961
2962    def test_read_with_prefetched_mdmf_data(self):
2963        # The MDMFSlotReadProxy will prefill certain fields if you pass
2964        # it data that you have already fetched. This is useful for
2965        # cases like the Servermap, which prefetches ~2kb of data while
2966        # finding out which shares are on the remote peer so that it
2967        # doesn't waste round trips.
2968        mdmf_data = self.build_test_mdmf_share()
2969        self.write_test_share_to_server(b"si1")
2970        def _make_mr(ignored, length):
2971            mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0, mdmf_data[:length])
2972            return mr
2973
2974        d = defer.succeed(None)
2975        # This should be enough to fill in both the encoding parameters
2976        # and the table of offsets, which will complete the version
2977        # information tuple.
2978        d.addCallback(_make_mr, 123)
2979        d.addCallback(lambda mr:
2980            mr.get_verinfo())
2981        def _check_verinfo(verinfo):
2982            self.assertTrue(verinfo)
2983            self.assertThat(verinfo, HasLength(9))
2984            (seqnum,
2985             root_hash,
2986             salt_hash,
2987             segsize,
2988             datalen,
2989             k,
2990             n,
2991             prefix,
2992             offsets) = verinfo
2993            self.assertThat(seqnum, Equals(0))
2994            self.assertThat(root_hash, Equals(self.root_hash))
2995            self.assertThat(segsize, Equals(6))
2996            self.assertThat(datalen, Equals(36))
2997            self.assertThat(k, Equals(3))
2998            self.assertThat(n, Equals(10))
2999            expected_prefix = struct.pack(MDMFSIGNABLEHEADER,
3000                                          1,
3001                                          seqnum,
3002                                          root_hash,
3003                                          k,
3004                                          n,
3005                                          segsize,
3006                                          datalen)
3007            self.assertThat(expected_prefix, Equals(prefix))
3008            self.assertThat(self.rref.read_count, Equals(0))
3009        d.addCallback(_check_verinfo)
3010        # This is not enough data to read a block and a share, so the
3011        # wrapper should attempt to read this from the remote server.
3012        d.addCallback(_make_mr, 123)
3013        d.addCallback(lambda mr:
3014            mr.get_block_and_salt(0))
3015        def _check_block_and_salt(block_and_salt):
3016            (block, salt) = block_and_salt
3017            self.assertThat(block, Equals(self.block))
3018            self.assertThat(salt, Equals(self.salt))
3019            self.assertThat(self.rref.read_count, Equals(1))
3020        # This should be enough data to read one block.
3021        d.addCallback(_make_mr, 123 + PRIVATE_KEY_SIZE + SIGNATURE_SIZE + VERIFICATION_KEY_SIZE + SHARE_HASH_CHAIN_SIZE + 140)
3022        d.addCallback(lambda mr:
3023            mr.get_block_and_salt(0))
3024        d.addCallback(_check_block_and_salt)
3025        return d
3026
3027
3028    def test_read_with_prefetched_sdmf_data(self):
3029        sdmf_data = self.build_test_sdmf_share()
3030        self.write_sdmf_share_to_server(b"si1")
3031        def _make_mr(ignored, length):
3032            mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0, sdmf_data[:length])
3033            return mr
3034
3035        d = defer.succeed(None)
3036        # This should be enough to get us the encoding parameters,
3037        # offset table, and everything else we need to build a verinfo
3038        # string.
3039        d.addCallback(_make_mr, 123)
3040        d.addCallback(lambda mr:
3041            mr.get_verinfo())
3042        def _check_verinfo(verinfo):
3043            self.assertTrue(verinfo)
3044            self.assertThat(verinfo, HasLength(9))
3045            (seqnum,
3046             root_hash,
3047             salt,
3048             segsize,
3049             datalen,
3050             k,
3051             n,
3052             prefix,
3053             offsets) = verinfo
3054            self.assertThat(seqnum, Equals(0))
3055            self.assertThat(root_hash, Equals(self.root_hash))
3056            self.assertThat(salt, Equals(self.salt))
3057            self.assertThat(segsize, Equals(36))
3058            self.assertThat(datalen, Equals(36))
3059            self.assertThat(k, Equals(3))
3060            self.assertThat(n, Equals(10))
3061            expected_prefix = struct.pack(SIGNED_PREFIX,
3062                                          0,
3063                                          seqnum,
3064                                          root_hash,
3065                                          salt,
3066                                          k,
3067                                          n,
3068                                          segsize,
3069                                          datalen)
3070            self.assertThat(expected_prefix, Equals(prefix))
3071            self.assertThat(self.rref.read_count, Equals(0))
3072        d.addCallback(_check_verinfo)
3073        # This shouldn't be enough to read any share data.
3074        d.addCallback(_make_mr, 123)
3075        d.addCallback(lambda mr:
3076            mr.get_block_and_salt(0))
3077        def _check_block_and_salt(block_and_salt):
3078            (block, salt) = block_and_salt
3079            self.assertThat(block, Equals(self.block * 6))
3080            self.assertThat(salt, Equals(self.salt))
3081            # TODO: Fix the read routine so that it reads only the data
3082            #       that it has cached if it can't read all of it.
3083            self.assertThat(self.rref.read_count, Equals(2))
3084
3085        # This should be enough to read share data.
3086        d.addCallback(_make_mr, self.offsets['share_data'])
3087        d.addCallback(lambda mr:
3088            mr.get_block_and_salt(0))
3089        d.addCallback(_check_block_and_salt)
3090        return d
3091
3092
3093    def test_read_with_empty_mdmf_file(self):
3094        # Some tests upload a file with no contents to test things
3095        # unrelated to the actual handling of the content of the file.
3096        # The reader should behave intelligently in these cases.
3097        self.write_test_share_to_server(b"si1", empty=True)
3098        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
3099        # We should be able to get the encoding parameters, and they
3100        # should be correct.
3101        d = defer.succeed(None)
3102        d.addCallback(lambda ignored:
3103            mr.get_encoding_parameters())
3104        def _check_encoding_parameters(params):
3105            self.assertThat(params, HasLength(4))
3106            k, n, segsize, datalen = params
3107            self.assertThat(k, Equals(3))
3108            self.assertThat(n, Equals(10))
3109            self.assertThat(segsize, Equals(0))
3110            self.assertThat(datalen, Equals(0))
3111        d.addCallback(_check_encoding_parameters)
3112
3113        # We should not be able to fetch a block, since there are no
3114        # blocks to fetch
3115        d.addCallback(lambda ignored:
3116            self.shouldFail(LayoutInvalid, "get block on empty file",
3117                            None,
3118                            mr.get_block_and_salt, 0))
3119        return d
3120
3121
3122    def test_read_with_empty_sdmf_file(self):
3123        self.write_sdmf_share_to_server(b"si1", empty=True)
3124        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
3125        # We should be able to get the encoding parameters, and they
3126        # should be correct
3127        d = defer.succeed(None)
3128        d.addCallback(lambda ignored:
3129            mr.get_encoding_parameters())
3130        def _check_encoding_parameters(params):
3131            self.assertThat(params, HasLength(4))
3132            k, n, segsize, datalen = params
3133            self.assertThat(k, Equals(3))
3134            self.assertThat(n, Equals(10))
3135            self.assertThat(segsize, Equals(0))
3136            self.assertThat(datalen, Equals(0))
3137        d.addCallback(_check_encoding_parameters)
3138
3139        # It does not make sense to get a block in this format, so we
3140        # should not be able to.
3141        d.addCallback(lambda ignored:
3142            self.shouldFail(LayoutInvalid, "get block on an empty file",
3143                            None,
3144                            mr.get_block_and_salt, 0))
3145        return d
3146
3147
3148    def test_verinfo_with_sdmf_file(self):
3149        self.write_sdmf_share_to_server(b"si1")
3150        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
3151        # We should be able to get the version information.
3152        d = defer.succeed(None)
3153        d.addCallback(lambda ignored:
3154            mr.get_verinfo())
3155        def _check_verinfo(verinfo):
3156            self.assertTrue(verinfo)
3157            self.assertThat(verinfo, HasLength(9))
3158            (seqnum,
3159             root_hash,
3160             salt,
3161             segsize,
3162             datalen,
3163             k,
3164             n,
3165             prefix,
3166             offsets) = verinfo
3167            self.assertThat(seqnum, Equals(0))
3168            self.assertThat(root_hash, Equals(self.root_hash))
3169            self.assertThat(salt, Equals(self.salt))
3170            self.assertThat(segsize, Equals(36))
3171            self.assertThat(datalen, Equals(36))
3172            self.assertThat(k, Equals(3))
3173            self.assertThat(n, Equals(10))
3174            expected_prefix = struct.pack(">BQ32s16s BBQQ",
3175                                          0,
3176                                          seqnum,
3177                                          root_hash,
3178                                          salt,
3179                                          k,
3180                                          n,
3181                                          segsize,
3182                                          datalen)
3183            self.assertThat(prefix, Equals(expected_prefix))
3184            self.assertThat(offsets, Equals(self.offsets))
3185        d.addCallback(_check_verinfo)
3186        return d
3187
3188
3189    def test_verinfo_with_mdmf_file(self):
3190        self.write_test_share_to_server(b"si1")
3191        mr = MDMFSlotReadProxy(self.storage_server, b"si1", 0)
3192        d = defer.succeed(None)
3193        d.addCallback(lambda ignored:
3194            mr.get_verinfo())
3195        def _check_verinfo(verinfo):
3196            self.assertTrue(verinfo)
3197            self.assertThat(verinfo, HasLength(9))
3198            (seqnum,
3199             root_hash,
3200             IV,
3201             segsize,
3202             datalen,
3203             k,
3204             n,
3205             prefix,
3206             offsets) = verinfo
3207            self.assertThat(seqnum, Equals(0))
3208            self.assertThat(root_hash, Equals(self.root_hash))
3209            self.assertFalse(IV)
3210            self.assertThat(segsize, Equals(6))
3211            self.assertThat(datalen, Equals(36))
3212            self.assertThat(k, Equals(3))
3213            self.assertThat(n, Equals(10))
3214            expected_prefix = struct.pack(">BQ32s BBQQ",
3215                                          1,
3216                                          seqnum,
3217                                          root_hash,
3218                                          k,
3219                                          n,
3220                                          segsize,
3221                                          datalen)
3222            self.assertThat(prefix, Equals(expected_prefix))
3223            self.assertThat(offsets, Equals(self.offsets))
3224        d.addCallback(_check_verinfo)
3225        return d
3226
3227
3228    def test_sdmf_writer(self):
3229        # Go through the motions of writing an SDMF share to the storage
3230        # server. Then read the storage server to see that the share got
3231        # written in the way that we think it should have.
3232
3233        # We do this first so that the necessary instance variables get
3234        # set the way we want them for the tests below.
3235        data = self.build_test_sdmf_share()
3236        sdmfr = SDMFSlotWriteProxy(0,
3237                                   self.storage_server,
3238                                   b"si1",
3239                                   self.secrets,
3240                                   0, 3, 10, 36, 36)
3241        # Put the block and salt.
3242        sdmfr.put_block(self.blockdata, 0, self.salt)
3243
3244        # Put the encprivkey
3245        sdmfr.put_encprivkey(self.encprivkey)
3246
3247        # Put the block and share hash chains
3248        sdmfr.put_blockhashes(self.block_hash_tree)
3249        sdmfr.put_sharehashes(self.share_hash_chain)
3250        sdmfr.put_root_hash(self.root_hash)
3251
3252        # Put the signature
3253        sdmfr.put_signature(self.signature)
3254
3255        # Put the verification key
3256        sdmfr.put_verification_key(self.verification_key)
3257
3258        # Now check to make sure that nothing has been written yet.
3259        self.assertThat(self.rref.write_count, Equals(0))
3260
3261        # Now finish publishing
3262        d = sdmfr.finish_publishing()
3263        def _then(ignored):
3264            self.assertThat(self.rref.write_count, Equals(1))
3265            read = self.ss.slot_readv
3266            self.assertThat(read(b"si1", [0], [(0, len(data))]),
3267                                 Equals({0: [data]}))
3268        d.addCallback(_then)
3269        return d
3270
3271
3272    def test_sdmf_writer_preexisting_share(self):
3273        data = self.build_test_sdmf_share()
3274        self.write_sdmf_share_to_server(b"si1")
3275
3276        # Now there is a share on the storage server. To successfully
3277        # write, we need to set the checkstring correctly. When we
3278        # don't, no write should occur.
3279        sdmfw = SDMFSlotWriteProxy(0,
3280                                   self.storage_server,
3281                                   b"si1",
3282                                   self.secrets,
3283                                   1, 3, 10, 36, 36)
3284        sdmfw.put_block(self.blockdata, 0, self.salt)
3285
3286        # Put the encprivkey
3287        sdmfw.put_encprivkey(self.encprivkey)
3288
3289        # Put the block and share hash chains
3290        sdmfw.put_blockhashes(self.block_hash_tree)
3291        sdmfw.put_sharehashes(self.share_hash_chain)
3292
3293        # Put the root hash
3294        sdmfw.put_root_hash(self.root_hash)
3295
3296        # Put the signature
3297        sdmfw.put_signature(self.signature)
3298
3299        # Put the verification key
3300        sdmfw.put_verification_key(self.verification_key)
3301
3302        # We shouldn't have a checkstring yet
3303        self.assertThat(sdmfw.get_checkstring(), Equals(b""))
3304
3305        d = sdmfw.finish_publishing()
3306        def _then(results):
3307            self.assertFalse(results[0])
3308            # this is the correct checkstring
3309            self._expected_checkstring = results[1][0][0]
3310            return self._expected_checkstring
3311
3312        d.addCallback(_then)
3313        d.addCallback(sdmfw.set_checkstring)
3314        d.addCallback(lambda ignored:
3315            sdmfw.get_checkstring())
3316        d.addCallback(lambda checkstring:
3317            self.assertThat(checkstring, Equals(self._expected_checkstring)))
3318        d.addCallback(lambda ignored:
3319            sdmfw.finish_publishing())
3320        def _then_again(results):
3321            self.assertTrue(results[0])
3322            read = self.ss.slot_readv
3323            self.assertThat(read(b"si1", [0], [(1, 8)]),
3324                                 Equals({0: [struct.pack(">Q", 1)]}))
3325            self.assertThat(read(b"si1", [0], [(9, len(data) - 9)]),
3326                                 Equals({0: [data[9:]]}))
3327        d.addCallback(_then_again)
3328        return d
3329
3330
3331class Stats(SyncTestCase):
3332
3333    def setUp(self):
3334        super(Stats, self).setUp()
3335        self.sparent = LoggingServiceParent()
3336        self._lease_secret = itertools.count()
3337        self.addCleanup(self.sparent.stopService)
3338
3339    def workdir(self, name):
3340        basedir = os.path.join("storage", "Server", name)
3341        return basedir
3342
3343    def create(self, name):
3344        workdir = self.workdir(name)
3345        ss = StorageServer(workdir, b"\x00" * 20)
3346        ss.setServiceParent(self.sparent)
3347        return ss
3348
3349    def test_latencies(self):
3350        ss = self.create("test_latencies")
3351        for i in range(10000):
3352            ss.add_latency("allocate", 1.0 * i)
3353        for i in range(1000):
3354            ss.add_latency("renew", 1.0 * i)
3355        for i in range(20):
3356            ss.add_latency("write", 1.0 * i)
3357        for i in range(10):
3358            ss.add_latency("cancel", 2.0 * i)
3359        ss.add_latency("get", 5.0)
3360
3361        output = ss.get_latencies()
3362
3363        self.assertThat(sorted(output.keys()),
3364                             Equals(sorted(["allocate", "renew", "cancel", "write", "get"])))
3365        self.assertThat(ss.latencies["allocate"], HasLength(1000))
3366        self.assertTrue(abs(output["allocate"]["mean"] - 9500) < 1, output)
3367        self.assertTrue(abs(output["allocate"]["01_0_percentile"] - 9010) < 1, output)
3368        self.assertTrue(abs(output["allocate"]["10_0_percentile"] - 9100) < 1, output)
3369        self.assertTrue(abs(output["allocate"]["50_0_percentile"] - 9500) < 1, output)
3370        self.assertTrue(abs(output["allocate"]["90_0_percentile"] - 9900) < 1, output)
3371        self.assertTrue(abs(output["allocate"]["95_0_percentile"] - 9950) < 1, output)
3372        self.assertTrue(abs(output["allocate"]["99_0_percentile"] - 9990) < 1, output)
3373        self.assertTrue(abs(output["allocate"]["99_9_percentile"] - 9999) < 1, output)
3374
3375        self.assertThat(ss.latencies["renew"], HasLength(1000))
3376        self.assertTrue(abs(output["renew"]["mean"] - 500) < 1, output)
3377        self.assertTrue(abs(output["renew"]["01_0_percentile"] -  10) < 1, output)
3378        self.assertTrue(abs(output["renew"]["10_0_percentile"] - 100) < 1, output)
3379        self.assertTrue(abs(output["renew"]["50_0_percentile"] - 500) < 1, output)
3380        self.assertTrue(abs(output["renew"]["90_0_percentile"] - 900) < 1, output)
3381        self.assertTrue(abs(output["renew"]["95_0_percentile"] - 950) < 1, output)
3382        self.assertTrue(abs(output["renew"]["99_0_percentile"] - 990) < 1, output)
3383        self.assertTrue(abs(output["renew"]["99_9_percentile"] - 999) < 1, output)
3384
3385        self.assertThat(ss.latencies["write"], HasLength(20))
3386        self.assertTrue(abs(output["write"]["mean"] - 9) < 1, output)
3387        self.assertTrue(output["write"]["01_0_percentile"] is None, output)
3388        self.assertTrue(abs(output["write"]["10_0_percentile"] -  2) < 1, output)
3389        self.assertTrue(abs(output["write"]["50_0_percentile"] - 10) < 1, output)
3390        self.assertTrue(abs(output["write"]["90_0_percentile"] - 18) < 1, output)
3391        self.assertTrue(abs(output["write"]["95_0_percentile"] - 19) < 1, output)
3392        self.assertTrue(output["write"]["99_0_percentile"] is None, output)
3393        self.assertTrue(output["write"]["99_9_percentile"] is None, output)
3394
3395        self.assertThat(ss.latencies["cancel"], HasLength(10))
3396        self.assertTrue(abs(output["cancel"]["mean"] - 9) < 1, output)
3397        self.assertTrue(output["cancel"]["01_0_percentile"] is None, output)
3398        self.assertTrue(abs(output["cancel"]["10_0_percentile"] -  2) < 1, output)
3399        self.assertTrue(abs(output["cancel"]["50_0_percentile"] - 10) < 1, output)
3400        self.assertTrue(abs(output["cancel"]["90_0_percentile"] - 18) < 1, output)
3401        self.assertTrue(output["cancel"]["95_0_percentile"] is None, output)
3402        self.assertTrue(output["cancel"]["99_0_percentile"] is None, output)
3403        self.assertTrue(output["cancel"]["99_9_percentile"] is None, output)
3404
3405        self.assertThat(ss.latencies["get"], HasLength(1))
3406        self.assertTrue(output["get"]["mean"] is None, output)
3407        self.assertTrue(output["get"]["01_0_percentile"] is None, output)
3408        self.assertTrue(output["get"]["10_0_percentile"] is None, output)
3409        self.assertTrue(output["get"]["50_0_percentile"] is None, output)
3410        self.assertTrue(output["get"]["90_0_percentile"] is None, output)
3411        self.assertTrue(output["get"]["95_0_percentile"] is None, output)
3412        self.assertTrue(output["get"]["99_0_percentile"] is None, output)
3413        self.assertTrue(output["get"]["99_9_percentile"] is None, output)
3414
3415immutable_schemas = strategies.sampled_from(list(ALL_IMMUTABLE_SCHEMAS))
3416
3417class ShareFileTests(SyncTestCase):
3418    """Tests for allmydata.storage.immutable.ShareFile."""
3419
3420    def get_sharefile(self, **kwargs):
3421        sf = ShareFile(self.mktemp(), max_size=1000, create=True, **kwargs)
3422        sf.write_share_data(0, b"abc")
3423        sf.write_share_data(2, b"DEF")
3424        # Should be b'abDEF' now.
3425        return sf
3426
3427    @given(immutable_schemas)
3428    def test_read_write(self, schema):
3429        """Basic writes can be read."""
3430        sf = self.get_sharefile(schema=schema)
3431        self.assertEqual(sf.read_share_data(0, 3), b"abD")
3432        self.assertEqual(sf.read_share_data(1, 4), b"bDEF")
3433
3434    @given(immutable_schemas)
3435    def test_reads_beyond_file_end(self, schema):
3436        """Reads beyond the file size are truncated."""
3437        sf = self.get_sharefile(schema=schema)
3438        self.assertEqual(sf.read_share_data(0, 10), b"abDEF")
3439        self.assertEqual(sf.read_share_data(5, 10), b"")
3440
3441    @given(immutable_schemas)
3442    def test_too_large_write(self, schema):
3443        """Can't do write larger than file size."""
3444        sf = self.get_sharefile(schema=schema)
3445        with self.assertRaises(DataTooLargeError):
3446            sf.write_share_data(0, b"x" * 3000)
3447
3448    @given(immutable_schemas)
3449    def test_no_leases_cancelled(self, schema):
3450        """If no leases were cancelled, IndexError is raised."""
3451        sf = self.get_sharefile(schema=schema)
3452        with self.assertRaises(IndexError):
3453            sf.cancel_lease(b"garbage")
3454
3455    @given(immutable_schemas)
3456    def test_long_lease_count_format(self, schema):
3457        """
3458        ``ShareFile.__init__`` raises ``ValueError`` if the lease count format
3459        given is longer than one character.
3460        """
3461        with self.assertRaises(ValueError):
3462            self.get_sharefile(schema=schema, lease_count_format="BB")
3463
3464    @given(immutable_schemas)
3465    def test_large_lease_count_format(self, schema):
3466        """
3467        ``ShareFile.__init__`` raises ``ValueError`` if the lease count format
3468        encodes to a size larger than 8 bytes.
3469        """
3470        with self.assertRaises(ValueError):
3471            self.get_sharefile(schema=schema, lease_count_format="Q")
3472
3473    @given(immutable_schemas)
3474    def test_avoid_lease_overflow(self, schema):
3475        """
3476        If the share file already has the maximum number of leases supported then
3477        ``ShareFile.add_lease`` raises ``struct.error`` and makes no changes
3478        to the share file contents.
3479        """
3480        make_lease = partial(
3481            LeaseInfo,
3482            renew_secret=b"r" * 32,
3483            cancel_secret=b"c" * 32,
3484            expiration_time=2 ** 31,
3485        )
3486        # Make it a little easier to reach the condition by limiting the
3487        # number of leases to only 255.
3488        sf = self.get_sharefile(schema=schema, lease_count_format="B")
3489
3490        # Add the leases.
3491        for i in range(2 ** 8 - 1):
3492            lease = make_lease(owner_num=i)
3493            sf.add_lease(lease)
3494
3495        # Capture the state of the share file at this point so we can
3496        # determine whether the next operation modifies it or not.
3497        with open(sf.home, "rb") as f:
3498            before_data = f.read()
3499
3500        # It is not possible to add a 256th lease.
3501        lease = make_lease(owner_num=256)
3502        with self.assertRaises(struct.error):
3503            sf.add_lease(lease)
3504
3505        # Compare the share file state to what we captured earlier.  Any
3506        # change is a bug.
3507        with open(sf.home, "rb") as f:
3508            after_data = f.read()
3509
3510        self.assertEqual(before_data, after_data)
3511
3512    @given(immutable_schemas)
3513    def test_renew_secret(self, schema):
3514        """
3515        A lease loaded from an immutable share file at any schema version can have
3516        its renew secret verified.
3517        """
3518        renew_secret = b"r" * 32
3519        cancel_secret = b"c" * 32
3520        expiration_time = 2 ** 31
3521
3522        sf = self.get_sharefile(schema=schema)
3523        lease = LeaseInfo(
3524            owner_num=0,
3525            renew_secret=renew_secret,
3526            cancel_secret=cancel_secret,
3527            expiration_time=expiration_time,
3528        )
3529        sf.add_lease(lease)
3530        (loaded_lease,) = sf.get_leases()
3531        self.assertTrue(loaded_lease.is_renew_secret(renew_secret))
3532
3533    @given(immutable_schemas)
3534    def test_cancel_secret(self, schema):
3535        """
3536        A lease loaded from an immutable share file at any schema version can have
3537        its cancel secret verified.
3538        """
3539        renew_secret = b"r" * 32
3540        cancel_secret = b"c" * 32
3541        expiration_time = 2 ** 31
3542
3543        sf = self.get_sharefile(schema=schema)
3544        lease = LeaseInfo(
3545            owner_num=0,
3546            renew_secret=renew_secret,
3547            cancel_secret=cancel_secret,
3548            expiration_time=expiration_time,
3549        )
3550        sf.add_lease(lease)
3551        (loaded_lease,) = sf.get_leases()
3552        self.assertTrue(loaded_lease.is_cancel_secret(cancel_secret))
3553
3554mutable_schemas = strategies.sampled_from(list(ALL_MUTABLE_SCHEMAS))
3555
3556class MutableShareFileTests(SyncTestCase):
3557    """
3558    Tests for allmydata.storage.mutable.MutableShareFile.
3559    """
3560    def get_sharefile(self, **kwargs):
3561        return MutableShareFile(self.mktemp(), **kwargs)
3562
3563    @given(
3564        schema=mutable_schemas,
3565        nodeid=strategies.just(b"x" * 20),
3566        write_enabler=strategies.just(b"y" * 32),
3567        datav=strategies.lists(
3568            # Limit the max size of these so we don't write *crazy* amounts of
3569            # data to disk.
3570            strategies.tuples(offsets(), strategies.binary(max_size=2 ** 8)),
3571            max_size=2 ** 8,
3572        ),
3573        new_length=offsets(),
3574    )
3575    def test_readv_reads_share_data(self, schema, nodeid, write_enabler, datav, new_length):
3576        """
3577        ``MutableShareFile.readv`` returns bytes from the share data portion
3578        of the share file.
3579        """
3580        sf = self.get_sharefile(schema=schema)
3581        sf.create(my_nodeid=nodeid, write_enabler=write_enabler)
3582        sf.writev(datav=datav, new_length=new_length)
3583
3584        # Apply all of the writes to a simple in-memory buffer so we can
3585        # resolve the final state of the share data.  In particular, this
3586        # helps deal with overlapping writes which otherwise make it tricky to
3587        # figure out what data to expect to be able to read back.
3588        buf = BytesIO()
3589        for (offset, data) in datav:
3590            buf.seek(offset)
3591            buf.write(data)
3592        buf.truncate(new_length)
3593
3594        # Using that buffer, determine the expected result of a readv for all
3595        # of the data just written.
3596        def read_from_buf(offset, length):
3597            buf.seek(offset)
3598            return buf.read(length)
3599        expected_data = list(
3600            read_from_buf(offset, len(data))
3601            for (offset, data)
3602            in datav
3603        )
3604
3605        # Perform a read that gives back all of the data written to the share
3606        # file.
3607        read_vectors = list((offset, len(data)) for (offset, data) in datav)
3608        read_data = sf.readv(read_vectors)
3609
3610        # Make sure the read reproduces the value we computed using our local
3611        # buffer.
3612        self.assertEqual(expected_data, read_data)
3613
3614    @given(
3615        schema=mutable_schemas,
3616        nodeid=strategies.just(b"x" * 20),
3617        write_enabler=strategies.just(b"y" * 32),
3618        readv=strategies.lists(strategies.tuples(offsets(), lengths()), min_size=1),
3619        random=strategies.randoms(),
3620    )
3621    def test_readv_rejects_negative_length(self, schema, nodeid, write_enabler, readv, random):
3622        """
3623        If a negative length is given to ``MutableShareFile.readv`` in a read
3624        vector then ``AssertionError`` is raised.
3625        """
3626        # Pick a read vector to break with a negative value
3627        readv_index = random.randrange(len(readv))
3628        # Decide on whether we're breaking offset or length
3629        offset_or_length = random.randrange(2)
3630
3631        # A helper function that will take a valid offset and length and break
3632        # one of them.
3633        def corrupt(break_length, offset, length):
3634            if break_length:
3635                # length must not be 0 or flipping the sign does nothing
3636                # length must not be negative or flipping the sign *fixes* it
3637                assert length > 0
3638                return (offset, -length)
3639            else:
3640                if offset > 0:
3641                    # We can break offset just by flipping the sign.
3642                    return (-offset, length)
3643                else:
3644                    # Otherwise it has to be zero.  If it was negative, what's
3645                    # going on?
3646                    assert offset == 0
3647                    # Since we can't just flip the sign on 0 to break things,
3648                    # replace a 0 offset with a simple negative value.  All
3649                    # other negative values will be tested by the `offset > 0`
3650                    # case above.
3651                    return (-1, length)
3652
3653        # Break the read vector very slightly!
3654        broken_readv = readv[:]
3655        broken_readv[readv_index] = corrupt(
3656            offset_or_length,
3657            *broken_readv[readv_index]
3658        )
3659
3660        sf = self.get_sharefile(schema=schema)
3661        sf.create(my_nodeid=nodeid, write_enabler=write_enabler)
3662
3663        # A read with a broken read vector is an error.
3664        with self.assertRaises(AssertionError):
3665            sf.readv(broken_readv)
3666
3667
3668class LeaseInfoTests(SyncTestCase):
3669    """
3670    Tests for ``allmydata.storage.lease.LeaseInfo``.
3671    """
3672    def test_is_renew_secret(self):
3673        """
3674        ``LeaseInfo.is_renew_secret`` returns ``True`` if the value given is the
3675        renew secret.
3676        """
3677        renew_secret = b"r" * 32
3678        cancel_secret = b"c" * 32
3679        lease = LeaseInfo(
3680            owner_num=1,
3681            renew_secret=renew_secret,
3682            cancel_secret=cancel_secret,
3683        )
3684        self.assertTrue(lease.is_renew_secret(renew_secret))
3685
3686    def test_is_not_renew_secret(self):
3687        """
3688        ``LeaseInfo.is_renew_secret`` returns ``False`` if the value given is not
3689        the renew secret.
3690        """
3691        renew_secret = b"r" * 32
3692        cancel_secret = b"c" * 32
3693        lease = LeaseInfo(
3694            owner_num=1,
3695            renew_secret=renew_secret,
3696            cancel_secret=cancel_secret,
3697        )
3698        self.assertFalse(lease.is_renew_secret(cancel_secret))
3699
3700    def test_is_cancel_secret(self):
3701        """
3702        ``LeaseInfo.is_cancel_secret`` returns ``True`` if the value given is the
3703        cancel secret.
3704        """
3705        renew_secret = b"r" * 32
3706        cancel_secret = b"c" * 32
3707        lease = LeaseInfo(
3708            owner_num=1,
3709            renew_secret=renew_secret,
3710            cancel_secret=cancel_secret,
3711        )
3712        self.assertTrue(lease.is_cancel_secret(cancel_secret))
3713
3714    def test_is_not_cancel_secret(self):
3715        """
3716        ``LeaseInfo.is_cancel_secret`` returns ``False`` if the value given is not
3717        the cancel secret.
3718        """
3719        renew_secret = b"r" * 32
3720        cancel_secret = b"c" * 32
3721        lease = LeaseInfo(
3722            owner_num=1,
3723            renew_secret=renew_secret,
3724            cancel_secret=cancel_secret,
3725        )
3726        self.assertFalse(lease.is_cancel_secret(renew_secret))
3727
3728    @given(
3729        strategies.tuples(
3730            strategies.integers(min_value=0, max_value=2 ** 31 - 1),
3731            strategies.binary(min_size=32, max_size=32),
3732            strategies.binary(min_size=32, max_size=32),
3733            strategies.integers(min_value=0, max_value=2 ** 31 - 1),
3734            strategies.binary(min_size=20, max_size=20),
3735        ),
3736    )
3737    def test_immutable_size(self, initializer_args):
3738        """
3739        ``LeaseInfo.immutable_size`` returns the length of the result of
3740        ``LeaseInfo.to_immutable_data``.
3741
3742        ``LeaseInfo.mutable_size`` returns the length of the result of
3743        ``LeaseInfo.to_mutable_data``.
3744        """
3745        info = LeaseInfo(*initializer_args)
3746        self.expectThat(
3747            info.to_immutable_data(),
3748            HasLength(info.immutable_size()),
3749        )
3750        self.expectThat(
3751            info.to_mutable_data(),
3752            HasLength(info.mutable_size()),
3753        )
3754
3755
3756class WriteBufferTests(SyncTestCase):
3757    """Tests for ``_WriteBuffer``."""
3758
3759    @given(
3760        small_writes=strategies.lists(
3761            strategies.binary(min_size=1, max_size=20),
3762            min_size=10, max_size=20),
3763        batch_size=strategies.integers(min_value=5, max_value=10)
3764    )
3765    def test_write_buffer(self, small_writes: list[bytes], batch_size: int):
3766        """
3767        ``_WriteBuffer`` coalesces small writes into bigger writes based on
3768        the batch size.
3769        """
3770        wb = _WriteBuffer(batch_size)
3771        result = b""
3772        for data in small_writes:
3773            should_flush = wb.queue_write(data)
3774            if should_flush:
3775                flushed_offset, flushed_data = wb.flush()
3776                self.assertEqual(flushed_offset, len(result))
3777                # The flushed data is in batch sizes, or closest approximation
3778                # given queued inputs:
3779                self.assertTrue(batch_size <= len(flushed_data) < batch_size + len(data))
3780                result += flushed_data
3781
3782        # Final flush:
3783        remaining_length = wb.get_queued_bytes()
3784        flushed_offset, flushed_data = wb.flush()
3785        self.assertEqual(remaining_length, len(flushed_data))
3786        self.assertEqual(flushed_offset, len(result))
3787        result += flushed_data
3788
3789        self.assertEqual(result, b"".join(small_writes))
Note: See TracBrowser for help on using the repository browser.