Changeset bf670c0 in trunk


Timestamp:
2021-10-12T21:42:59Z (4 years ago)
Author:
fenn-cs <fenn25.fn@…>
Branches:
master
Children:
26419c4
Parents:
49b6080 (diff), be5f583 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
Message:

Merge branch 'master' of github.com:tahoe-lafs/tahoe-lafs into 3788.refactor-src.test.mutable

Files:
14 added
18 edited

  • .circleci/config.yml

    r49b6080 rbf670c0  
    4141
    4242      - "nixos-19-09":
     43          {}
     44
     45      - "nixos-21-05":
    4346          {}
    4447
     
    439442        user: "nobody"
    440443
    441 
    442   nixos-19-09:
     444  nixos-19-09: &NIXOS
    443445    docker:
    444446      # Run in a highly Nix-capable environment.
     
    448450    environment:
    449451      NIX_PATH: "nixpkgs=https://github.com/NixOS/nixpkgs-channels/archive/nixos-19.09-small.tar.gz"
     452      SOURCE: "nix/"
    450453
    451454    steps:
     
    464467            # advantage of multiple cores and we get a little speedup by doing
    465468            # them in parallel.
    466             nix-build --cores 3 --max-jobs 2 nix/
     469            nix-build --cores 3 --max-jobs 2 "$SOURCE"
     470
     471  nixos-21-05:
     472    <<: *NIXOS
     473
     474    environment:
     475      # Note this doesn't look like the 19.09 NIX_PATH URL because there was
     476      # some internal shuffling by the NixOS project in how they publish
     477      # stable revisions.
     478      NIX_PATH: "nixpkgs=https://github.com/NixOS/nixpkgs/archive/d32b07e6df276d78e3640eb43882b80c9b2b3459.tar.gz"
     479      SOURCE: "nix/py3.nix"
    467480
    468481  typechecks:
  • .github/workflows/ci.yml

    r49b6080 rbf670c0  
    2929        include:
    3030          # On macOS don't bother with 3.6-3.8, just to get faster builds.
    31           - os: macos-latest
     31          - os: macos-10.15
    3232            python-version: 2.7
    3333          - os: macos-latest
     
    169169        include:
    170170          # On macOS don't bother with 3.6, just to get faster builds.
    171           - os: macos-latest
     171          - os: macos-10.15
    172172            python-version: 2.7
    173173          - os: macos-latest
     
    184184      # tests on macOS.
    185185      - name: Install Tor [macOS, ${{ matrix.python-version }} ]
    186         if: ${{ matrix.os == 'macos-latest' }}
     186        if: ${{ contains(matrix.os, 'macos') }}
    187187        run: |
    188188          brew extract --version 0.4.5.8 tor homebrew/cask
     
    248248      matrix:
    249249        os:
    250           - macos-latest
     250          - macos-10.15
    251251          - windows-latest
    252252          - ubuntu-latest
  • docs/proposed/http-storage-node-protocol.rst

    r49b6080 rbf670c0  
    483483  It could also just be that the client's preferred servers have changed.
    484484
    485 ``PUT /v1/immutable/:storage_index/:share_number``
    486 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     485``PATCH /v1/immutable/:storage_index/:share_number``
     486!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    487487
    488488Write data for the indicated share.
     
    498498The server must recognize when all of the data has been received and mark the share as complete
    499499(which it can do because it was informed of the size when the storage index was initialized).
    500 Clients should upload chunks in re-assembly order.
    501500
    502501* When a chunk that does not complete the share is successfully uploaded the response is ``OK``.
    503 * When the chunk that completes the share is successfully uploaded the response is ``CREATED``.
    504 * If the *Content-Range* for a request covers part of the share that has already been uploaded the response is ``CONFLICT``.
    505502  The response body indicates the range of share data that has yet to be uploaded.
    506503  That is::
     
    514511      ]
    515512    }
     513
     514* When the chunk that completes the share is successfully uploaded the response is ``CREATED``.
     515* If the *Content-Range* for a request covers part of the share that has already been uploaded,
     516  and the data does not match already written data,
     517  the response is ``CONFLICT``.
     518  At this point the only thing to do is abort the upload and start from scratch (see below).
     519
     520``PUT /v1/immutable/:storage_index/:share_number/abort``
     521!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
     522
     523This cancels an *in-progress* upload.
     524
     525The response code:
     526
     527* When the upload is still in progress and therefore the abort has succeeded,
     528  the response is ``OK``.
     529  Future uploads can start from scratch with no pre-existing upload state stored on the server.
     530* If the upload has already finished, the response is 405 (Method Not Allowed)
     531  and no change is made.
     532
     533
     534Discussion
     535``````````
     536
     537The ``PUT`` verb is only supposed to be used to replace the whole resource,
     538thus the use of ``PATCH``.
     539From RFC 7231::
     540
     541   An origin server that allows PUT on a given target resource MUST send
     542   a 400 (Bad Request) response to a PUT request that contains a
     543   Content-Range header field (Section 4.2 of [RFC7233]), since the
     544   payload is likely to be partial content that has been mistakenly PUT
     545   as a full representation.  Partial content updates are possible by
     546   targeting a separately identified resource with state that overlaps a
     547   portion of the larger resource, or by using a different method that
     548   has been specifically defined for partial updates (for example, the
     549   PATCH method defined in [RFC5789]).
    516550
    517551
     
    601635                   "offset": 3,
    602636                   "size": 5,
    603                    "operator": "eq",
    604637                   "specimen": "hello"
    605638               }, ...],
     
    627660  }
    628661
     662A test vector or read vector that reads beyond the boundaries of existing data will return nothing for any bytes past the end.
     663As a result, if there is no data at all, an empty bytestring is returned no matter what the offset or length.
     664
    629665Reading
    630666~~~~~~~
     
    667703#. Upload the content for immutable share ``7``::
    668704
    669      PUT /v1/immutable/AAAAAAAAAAAAAAAA/7
     705     PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
    670706     Content-Range: bytes 0-15/48
    671707     <first 16 bytes of share data>
     
    673709     200 OK
    674710
    675      PUT /v1/immutable/AAAAAAAAAAAAAAAA/7
     711     PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
    676712     Content-Range: bytes 16-31/48
    677713     <second 16 bytes of share data>
     
    679715     200 OK
    680716
    681      PUT /v1/immutable/AAAAAAAAAAAAAAAA/7
     717     PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
    682718     Content-Range: bytes 32-47/48
    683719     <final 16 bytes of share data>
     
    702738~~~~~~~~~~~~
    703739
    704 1. Create mutable share number ``3`` with ``10`` bytes of data in slot ``BBBBBBBBBBBBBBBB``::
     7401. Create mutable share number ``3`` with ``10`` bytes of data in slot ``BBBBBBBBBBBBBBBB``.
     741The special test vector of size 1 with an empty specimen will only pass
     742if there is no existing share;
     743otherwise it will read a byte, which won't match ``b""``::
    705744
    706745     POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write
     
    716755                     "offset": 0,
    717756                     "size": 1,
    718                      "operator": "eq",
    719757                     "specimen": ""
    720758                 }],
     
    748786                 "test": [{
    749787                     "offset": 0,
    750                      "size": <checkstring size>,
    751                      "operator": "eq",
     788                     "size": <length of checkstring>,
    752789                     "specimen": "<checkstring>"
    753790                 }],
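
For orientation, here is a minimal sketch of the chunked PATCH upload and abort flow described in the document above, written against a hypothetical server address using the ``requests`` library; the paths and status codes follow the spec text, but the client itself is illustrative only:

    # Illustrative only: a hypothetical client for the chunked PATCH upload and
    # the abort endpoint described above. The base URL and values are made up.
    import requests

    BASE = "http://storage.example:8080"   # hypothetical storage server
    si, shnum = "AAAAAAAAAAAAAAAA", 7
    share = b"x" * 48

    def upload_chunk(offset, chunk, total):
        # Each chunk is sent with PATCH and an explicit Content-Range.
        return requests.patch(
            "{}/v1/immutable/{}/{}".format(BASE, si, shnum),
            headers={"Content-Range": "bytes {}-{}/{}".format(
                offset, offset + len(chunk) - 1, total)},
            data=chunk,
        )

    for offset in range(0, len(share), 16):
        response = upload_chunk(offset, share[offset:offset + 16], len(share))
        if response.status_code == 201:    # CREATED: this chunk completed the share
            break
        if response.status_code == 409:    # CONFLICT: overlapping data didn't match
            # The only recovery is to abort the in-progress upload and start over.
            requests.put("{}/v1/immutable/{}/{}/abort".format(BASE, si, shnum))
            break
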
  • nix/overlays.nix

    r49b6080 rbf670c0  
    33    packageOverrides = python-self: python-super: {
    44      # eliot is not part of nixpkgs at all at this time.
    5       eliot = python-self.callPackage ./eliot.nix { };
     5      eliot = python-self.pythonPackages.callPackage ./eliot.nix { };
    66
    77      # NixOS autobahn package has trollius as a dependency, although
    88      # it is optional. Trollius is unmaintained and fails on CI.
    9       autobahn = python-super.callPackage ./autobahn.nix { };
     9      autobahn = python-super.pythonPackages.callPackage ./autobahn.nix { };
    1010
    1111      # Porting to Python 3 is greatly aided by the future package.  A
    1212      # slightly newer version than appears in nixos 19.09 is helpful.
    13       future = python-super.callPackage ./future.nix { };
     13      future = python-super.pythonPackages.callPackage ./future.nix { };
    1414
    1515      # Need version of pyutil that supports Python 3. The version in 19.09
    1616      # is too old.
    17       pyutil = python-super.callPackage ./pyutil.nix { };
     17      pyutil = python-super.pythonPackages.callPackage ./pyutil.nix { };
    1818
    1919      # Need a newer version of Twisted, too.
    20       twisted = python-super.callPackage ./twisted.nix { };
     20      twisted = python-super.pythonPackages.callPackage ./twisted.nix { };
     21
     22      # collections-extended is not part of nixpkgs at this time.
     23      collections-extended = python-super.pythonPackages.callPackage ./collections-extended.nix { };
     24    };
     25  };
     26
     27  python39 = super.python39.override {
     28    packageOverrides = python-self: python-super: {
     29      # collections-extended is not part of nixpkgs at this time.
     30      collections-extended = python-super.pythonPackages.callPackage ./collections-extended.nix { };
    2131    };
    2232  };
  • nix/tahoe-lafs.nix

    r49b6080 rbf670c0  
    9898    service-identity pyyaml magic-wormhole treq
    9999    eliot autobahn cryptography netifaces setuptools
    100     future pyutil distro configparser
     100    future pyutil distro configparser collections-extended
    101101  ];
    102102
     
    108108    html5lib
    109109    tenacity
     110    prometheus_client
    110111  ];
    111112
  • setup.py

    r49b6080 rbf670c0  
    138138    # Backported configparser for Python 2:
    139139    "configparser ; python_version < '3.0'",
     140
     141    # For the RangeMap datastructure.
     142    "collections-extended",
    140143]
    141144
     
    405408              "paramiko",
    406409              "pytest-timeout",
     410              # Does our OpenMetrics endpoint adhere to the spec:
     411              "prometheus-client == 0.11.0",
    407412          ] + tor_requires + i2p_requires,
    408413          "tor": tor_requires,
  • src/allmydata/interfaces.py

    r49b6080 rbf670c0  
    5454
    5555
     56class DataTooLargeError(Exception):
     57    """The write went past the expected size of the bucket."""
     58
     59
     60class ConflictingWriteError(Exception):
     61    """Two writes happened to same immutable with different data."""
     62
     63
    5664class RIBucketWriter(RemoteInterface):
    5765    """ Objects of this kind live on the server side. """
     
    92100TestVector = ListOf(TupleOf(Offset, ReadSize, bytes, bytes))
    93101# elements are (offset, length, operator, specimen)
    94 # operator is one of "lt, le, eq, ne, ge, gt"
    95 # nop always passes and is used to fetch data while writing.
    96 # you should use length==len(specimen) for everything except nop
     102# operator must be b"eq", typically length==len(specimen), but one can ensure
     103# a write only happens when the share has no existing data by setting length
     104# to 1 and specimen to b"". The operator is still sent for wire compatibility with old versions.
    97105DataVector = ListOf(TupleOf(Offset, ShareData))
    98106# (offset, data). This limits us to 30 writes of 1MiB each per call
     
    352360        """
    353361        :see: ``RIStorageServer.slot_testv_readv_and_writev``
     362
     363        While the interface mostly matches, test vectors are simplified.
     364        Instead of a tuple ``(offset, read_size, operator, expected_data)`` in
     365        the original, for this method you need only pass in
     366        ``(offset, read_size, expected_data)``, with the operator implicitly
     367        being ``b"eq"``.
    354368        """
    355369
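
A small sketch of the simplified test-vector convention documented above (the helper name is illustrative, not from this changeset): callers of the refactored interface pass 3-tuples, and the operator ``b"eq"`` is re-inserted only when talking to the wire protocol:

    # Sketch only: how the simplified 3-tuple test vectors relate to the
    # 4-tuple wire format.
    def to_wire(testvs):
        """Expand (offset, size, specimen) into (offset, size, b"eq", specimen)."""
        return [(offset, size, b"eq", specimen)
                for (offset, size, specimen) in testvs]

    # A test vector that only passes when the share has no data yet: reading one
    # byte of a nonexistent share yields b"", which equals the empty specimen.
    new_share_guard = [(0, 1, b"")]

    assert to_wire(new_share_guard) == [(0, 1, b"eq", b"")]
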
  • src/allmydata/mutable/layout.py

    r49b6080 rbf670c0  
    310310        else:
    311311            checkstring = checkstring_or_seqnum
    312         self._testvs = [(0, len(checkstring), b"eq", checkstring)]
     312        self._testvs = [(0, len(checkstring), checkstring)]
    313313
    314314
     
    319319        """
    320320        if self._testvs:
    321             return self._testvs[0][3]
     321            return self._testvs[0][2]
    322322        return b""
    323323
     
    549549            # Our caller has not provided us with another checkstring
    550550            # yet, so we assume that we are writing a new share, and set
    551             # a test vector that will allow a new share to be written.
     551            # a test vector that will only allow a new share to be written.
    552552            self._testvs = []
    553             self._testvs.append(tuple([0, 1, b"eq", b""]))
     553            self._testvs.append(tuple([0, 1, b""]))
    554554
    555555        tw_vectors = {}
     
    890890        else:
    891891            self._testvs = []
    892             self._testvs.append((0, len(checkstring), b"eq", checkstring))
     892            self._testvs.append((0, len(checkstring), checkstring))
    893893
    894894
     
    11621162        tw_vectors = {}
    11631163        if not self._testvs:
     1164            # Make sure we will only successfully write if the share didn't
     1165            # previously exist.
    11641166            self._testvs = []
    1165             self._testvs.append(tuple([0, 1, b"eq", b""]))
     1167            self._testvs.append(tuple([0, 1, b""]))
    11661168        if not self._written:
    11671169            # Write a new checkstring to the share when we write it, so
     
    11711173            def _first_write():
    11721174                self._written = True
    1173                 self._testvs = [(0, len(new_checkstring), b"eq", new_checkstring)]
     1175                self._testvs = [(0, len(new_checkstring), new_checkstring)]
    11741176            on_success = _first_write
    11751177        tw_vectors[self.shnum] = (self._testvs, datavs, None)
  • src/allmydata/storage/common.py

    r49b6080 rbf670c0  
    1414from allmydata.util import base32
    1515
    16 class DataTooLargeError(Exception):
    17     pass
     16# Backwards compatibility.
     17from allmydata.interfaces import DataTooLargeError  # noqa: F401
     18
    1819class UnknownMutableContainerVersionError(Exception):
    1920    pass
  • src/allmydata/storage/immutable.py

    r49b6080 rbf670c0  
    1414import os, stat, struct, time
    1515
     16from collections_extended import RangeMap
     17
    1618from foolscap.api import Referenceable
    1719
    1820from zope.interface import implementer
    19 from allmydata.interfaces import RIBucketWriter, RIBucketReader
     21from allmydata.interfaces import (
     22    RIBucketWriter, RIBucketReader, ConflictingWriteError,
     23    DataTooLargeError,
     24)
    2025from allmydata.util import base32, fileutil, log
    2126from allmydata.util.assertutil import precondition
    2227from allmydata.util.hashutil import timing_safe_compare
    2328from allmydata.storage.lease import LeaseInfo
    24 from allmydata.storage.common import UnknownImmutableContainerVersionError, \
    25      DataTooLargeError
     29from allmydata.storage.common import UnknownImmutableContainerVersionError
    2630
    2731# each share file (in storage/shares/$SI/$SHNUM) contains lease information
     
    205209class BucketWriter(Referenceable):  # type: ignore # warner/foolscap#78
    206210
    207     def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
     211    def __init__(self, ss, incominghome, finalhome, max_size, lease_info):
    208212        self.ss = ss
    209213        self.incominghome = incominghome
    210214        self.finalhome = finalhome
    211215        self._max_size = max_size # don't allow the client to write more than this
    212         self._canary = canary
    213         self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
    214216        self.closed = False
    215217        self.throw_out_all_data = False
     
    218220        # added by simultaneous uploaders
    219221        self._sharefile.add_lease(lease_info)
     222        self._already_written = RangeMap()
    220223
    221224    def allocated_size(self):
     
    227230        if self.throw_out_all_data:
    228231            return
     232
     233        # Make sure we're not conflicting with existing data:
     234        end = offset + len(data)
     235        for (chunk_start, chunk_stop, _) in self._already_written.ranges(offset, end):
     236            chunk_len = chunk_stop - chunk_start
     237            actual_chunk = self._sharefile.read_share_data(chunk_start, chunk_len)
     238            writing_chunk = data[chunk_start - offset:chunk_stop - offset]
     239            if actual_chunk != writing_chunk:
     240                raise ConflictingWriteError(
     241                    "Chunk {}-{} doesn't match already written data.".format(chunk_start, chunk_stop)
     242                )
    229243        self._sharefile.write_share_data(offset, data)
     244
     245        self._already_written.set(True, offset, end)
    230246        self.ss.add_latency("write", time.time() - start)
    231247        self.ss.count("write")
     
    263279        self._sharefile = None
    264280        self.closed = True
    265         self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
    266281
    267282        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
     
    270285        self.ss.count("close")
    271286
    272     def _disconnected(self):
     287    def disconnected(self):
    273288        if not self.closed:
    274289            self._abort()
     
    277292        log.msg("storage: aborting sharefile %s" % self.incominghome,
    278293                facility="tahoe.storage", level=log.UNUSUAL)
    279         if not self.closed:
    280             self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
    281294        self._abort()
    282295        self.ss.count("abort")
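
A self-contained sketch of the ``RangeMap``-based overlap check added to ``BucketWriter.write_share_data`` above, assuming only ``collections_extended`` and an in-memory buffer standing in for the real share file:

    # Self-contained sketch of the RangeMap-based conflict detection; the
    # in-memory bytearray stands in for the real share file.
    from collections_extended import RangeMap

    class ConflictingWriteError(Exception):
        """Two writes happened to the same immutable share with different data."""

    storage = bytearray(1024)
    already_written = RangeMap()

    def write(offset, data):
        end = offset + len(data)
        # Any previously written range overlapping this write must match byte-for-byte.
        for (start, stop, _) in already_written.ranges(offset, end):
            if bytes(storage[start:stop]) != data[start - offset:stop - offset]:
                raise ConflictingWriteError(
                    "Chunk {}-{} doesn't match already written data.".format(start, stop))
        storage[offset:end] = data
        already_written.set(True, offset, end)

    write(0, b"1" * 25)
    write(20, b"1" * 10)       # overlapping but matching: accepted
    try:
        write(20, b"2" * 10)   # overlapping and different: rejected
    except ConflictingWriteError:
        pass
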
  • src/allmydata/storage/mutable.py

    r49b6080 rbf670c0  
    435435
    436436def testv_compare(a, op, b):
    437     assert op in (b"lt", b"le", b"eq", b"ne", b"ge", b"gt")
    438     if op == b"lt":
    439         return a < b
    440     if op == b"le":
    441         return a <= b
    442     if op == b"eq":
    443         return a == b
    444     if op == b"ne":
    445         return a != b
    446     if op == b"ge":
    447         return a >= b
    448     if op == b"gt":
    449         return a > b
    450     # never reached
     437    assert op == b"eq"
     438    return a == b
     439
    451440
    452441class EmptyShare(object):
  • src/allmydata/storage/server.py

    r49b6080 rbf670c0  
    1212    # strings. Omit bytes so we don't leak future's custom bytes.
    1313    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min  # noqa: F401
    14 
     14else:
     15    from typing import Dict
    1516
    1617import os, re, struct, time
    17 import weakref
    1818import six
    1919
    2020from foolscap.api import Referenceable
     21from foolscap.ipb import IRemoteReference
    2122from twisted.application import service
    2223
     
    9091        self._clean_incomplete()
    9192        fileutil.make_dirs(self.incomingdir)
    92         self._active_writers = weakref.WeakKeyDictionary()
    9393        log.msg("StorageServer created", facility="tahoe.storage")
    9494
     
    121121        self.lease_checker.setServiceParent(self)
    122122        self._get_current_time = get_current_time
     123
     124        # Currently being-written Bucketwriters. For Foolscap, lifetime is tied
     125        # to connection: when disconnection happens, the BucketWriters are
     126        # removed. For HTTP, this makes no sense, so there will be
     127        # timeout-based cleanup; see
     128        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807.
     129
     130        # Map in-progress filesystem path -> BucketWriter:
     131        self._bucket_writers = {}  # type: Dict[str,BucketWriter]
     132        # Canaries and disconnect markers for BucketWriters created via Foolscap:
     133        self._bucket_writer_disconnect_markers = {}  # type: Dict[BucketWriter,(IRemoteReference, object)]
    123134
    124135    def __repr__(self):
     
    239250    def allocated_size(self):
    240251        space = 0
    241         for bw in self._active_writers:
     252        for bw in self._bucket_writers.values():
    242253            space += bw.allocated_size()
    243254        return space
     
    264275        return version
    265276
    266     def remote_allocate_buckets(self, storage_index,
    267                                 renew_secret, cancel_secret,
    268                                 sharenums, allocated_size,
    269                                 canary, owner_num=0):
     277    def _allocate_buckets(self, storage_index,
     278                          renew_secret, cancel_secret,
     279                          sharenums, allocated_size,
     280                          owner_num=0):
     281        """
     282        Generic bucket allocation API.
     283        """
    270284        # owner_num is not for clients to set, but rather it should be
    271285        # curried into the PersonalStorageServer instance that is dedicated
     
    316330                pass
    317331            elif os.path.exists(incominghome):
    318                 # Note that we don't create BucketWriters for shnums that
     332                # For Foolscap we don't create BucketWriters for shnums that
    319333                # have a partial share (in incoming/), so if a second upload
    320334                # occurs while the first is still in progress, the second
     
    324338                # ok! we need to create the new share file.
    325339                bw = BucketWriter(self, incominghome, finalhome,
    326                                   max_space_per_bucket, lease_info, canary)
     340                                  max_space_per_bucket, lease_info)
    327341                if self.no_storage:
    328342                    bw.throw_out_all_data = True
    329343                bucketwriters[shnum] = bw
    330                 self._active_writers[bw] = 1
     344                self._bucket_writers[incominghome] = bw
    331345                if limited:
    332346                    remaining_space -= max_space_per_bucket
     
    339353
    340354        self.add_latency("allocate", self._get_current_time() - start)
     355        return alreadygot, bucketwriters
     356
     357    def remote_allocate_buckets(self, storage_index,
     358                                renew_secret, cancel_secret,
     359                                sharenums, allocated_size,
     360                                canary, owner_num=0):
     361        """Foolscap-specific ``allocate_buckets()`` API."""
     362        alreadygot, bucketwriters = self._allocate_buckets(
     363            storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
     364            owner_num=owner_num,
     365        )
     366        # Abort BucketWriters if disconnection happens.
     367        for bw in bucketwriters.values():
     368            disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
     369            self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
    341370        return alreadygot, bucketwriters
    342371
     
    384413        if self.stats_provider:
    385414            self.stats_provider.count('storage_server.bytes_added', consumed_size)
    386         del self._active_writers[bw]
     415        del self._bucket_writers[bw.incominghome]
     416        if bw in self._bucket_writer_disconnect_markers:
     417            canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
     418            canary.dontNotifyOnDisconnect(disconnect_marker)
    387419
    388420    def _get_bucket_shares(self, storage_index):
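
A rough, stand-alone sketch of the canary wiring this refactoring enables (the classes below are illustrative stand-ins, not the repository's): the Foolscap entry point ties each writer's lifetime to the connection, while the generic allocation path no longer needs a canary at all:

    # Illustrative stand-ins only: a canary that collects disconnect callbacks
    # and a bare-bones writer that aborts itself if still open on disconnect.
    class FakeCanary(object):
        def __init__(self):
            self._callbacks = {}
            self._next_marker = 0
        def notifyOnDisconnect(self, f, *args, **kwargs):
            marker = self._next_marker
            self._next_marker += 1
            self._callbacks[marker] = (f, args, kwargs)
            return marker
        def dontNotifyOnDisconnect(self, marker):
            del self._callbacks[marker]
        def disconnected(self):
            for f, args, kwargs in list(self._callbacks.values()):
                f(*args, **kwargs)
            self._callbacks = {}

    class MiniWriter(object):
        def __init__(self):
            self.closed = False
            self.aborted = False
        def disconnected(self):
            if not self.closed:
                self.aborted = True

    # Foolscap-specific layer: tie the writer's lifetime to the connection.
    canary = FakeCanary()
    writer = MiniWriter()
    marker = canary.notifyOnDisconnect(writer.disconnected)

    canary.disconnected()    # the connection drops before close() ...
    assert writer.aborted    # ... so the in-progress upload is aborted
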
  • src/allmydata/storage_client.py

    r49b6080 rbf670c0  
    995995            r_vector,
    996996    ):
     997        # Match the wire protocol, which requires 4-tuples for test vectors.
     998        wire_format_tw_vectors = {
     999            key: (
     1000                [(start, length, b"eq", data) for (start, length, data) in value[0]],
     1001                value[1],
     1002                value[2],
     1003            ) for (key, value) in tw_vectors.items()
     1004        }
    9971005        return self._rref.callRemote(
    9981006            "slot_testv_and_readv_and_writev",
    9991007            storage_index,
    10001008            secrets,
    1001             tw_vectors,
     1009            wire_format_tw_vectors,
    10021010            r_vector,
    10031011        )
  • src/allmydata/test/common_util.py

    r49b6080 rbf670c0  
    315315        return "<fake>"
    316316
     317    def disconnected(self):
     318        """Disconnect the canary, to be called by test code.
     319
     320        Can only happen once.
     321        """
     322        if self.disconnectors is not None:
     323            for (f, args, kwargs) in list(self.disconnectors.values()):
     324                f(*args, **kwargs)
     325            self.disconnectors = None
     326
    317327
    318328class ShouldFailMixin(object):
  • src/allmydata/test/mutable/util.py

    r49b6080 rbf670c0  
    150150        for shnum, (testv, writev, new_length) in list(tw_vectors.items()):
    151151            for (offset, length, op, specimen) in testv:
    152                 assert op in (b"le", b"eq", b"ge")
     152                assert op == b"eq"
    153153            # TODO: this isn't right, the read is controlled by read_vector,
    154154            # not by testv
  • src/allmydata/test/test_istorageserver.py

    r49b6080 rbf670c0  
    2121from random import Random
    2222
    23 from testtools import skipIf
    24 
    2523from twisted.internet.defer import inlineCallbacks
    2624
    27 from foolscap.api import Referenceable
     25from foolscap.api import Referenceable, RemoteException
    2826
    2927from allmydata.interfaces import IStorageServer
     
    7876
    7977    ``self.storage_server`` is expected to provide ``IStorageServer``.
     78
     79    ``self.disconnect()`` should disconnect and then reconnect, creating a new
     80    ``self.storage_server``.  Some implementations may wish to skip tests using
     81    this; HTTP has no notion of disconnection.
    8082    """
    8183
     
    99101
    100102    @inlineCallbacks
    101     @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793")
    102103    def test_allocate_buckets_repeat(self):
    103104        """
    104         allocate_buckets() with the same storage index returns the same result,
    105         because the shares have not been written to.
    106 
    107         This fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793
     105        ``IStorageServer.allocate_buckets()`` with the same storage index does not return
     106        work-in-progress buckets, but will add any newly added buckets.
    108107        """
    109108        storage_index, renew_secret, cancel_secret = (
     
    116115            renew_secret,
    117116            cancel_secret,
    118             sharenums=set(range(5)),
     117            sharenums=set(range(4)),
    119118            allocated_size=1024,
    120119            canary=Referenceable(),
     
    129128        )
    130129        self.assertEqual(already_got, already_got2)
    131         self.assertEqual(set(allocated.keys()), set(allocated2.keys()))
    132 
    133     @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793")
    134     @inlineCallbacks
    135     def test_allocate_buckets_more_sharenums(self):
    136         """
    137         allocate_buckets() with the same storage index but more sharenums
    138         acknowledges the extra shares don't exist.
    139 
    140         Fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793
     130        self.assertEqual(set(allocated2.keys()), {4})
     131
     132    @inlineCallbacks
     133    def abort_or_disconnect_half_way(self, abort_or_disconnect):
     134        """
     135        If we disconnect/abort in the middle of writing to a bucket, all data
     136        is wiped, and it's even possible to write different data to the bucket.
     137
     138        (In the real world one shouldn't do that, but writing different data is
     139        a good way to test that the original data really was wiped.)
     140
     141        ``abort_or_disconnect`` is a callback that takes a bucket and aborts the
     142        upload, or perhaps disconnects the whole connection.
    141143        """
    142144        storage_index, renew_secret, cancel_secret = (
     
    145147            new_secret(),
    146148        )
    147         yield self.storage_server.allocate_buckets(
     149        (_, allocated) = yield self.storage_server.allocate_buckets(
     150            storage_index,
     151            renew_secret,
     152            cancel_secret,
     153            sharenums={0},
     154            allocated_size=1024,
     155            canary=Referenceable(),
     156        )
     157
     158        # Bucket 1 is fully written in one go.
     159        yield allocated[0].callRemote("write", 0, b"1" * 1024)
     160
     161        # Disconnect or abort, depending on the test:
     162        yield abort_or_disconnect(allocated[0])
     163
     164        # Write different data with no complaint:
     165        (_, allocated) = yield self.storage_server.allocate_buckets(
     166            storage_index,
     167            renew_secret,
     168            cancel_secret,
     169            sharenums={0},
     170            allocated_size=1024,
     171            canary=Referenceable(),
     172        )
     173        yield allocated[0].callRemote("write", 0, b"2" * 1024)
     174
     175    def test_disconnection(self):
     176        """
     177        If we disconnect in the middle of writing to a bucket, all data is
     178        wiped, and it's even possible to write different data to the bucket.
     179
     180        (In the real world one shouldn't do that, but writing different data is
     181        a good way to test that the original data really was wiped.)
     182
     183        The HTTP protocol should skip this test, since disconnection is a
     184        meaningless concept there; this is more about testing that the implicit
     185        contract the Foolscap implementation depends on doesn't change as we refactor things.
     186        """
     187        return self.abort_or_disconnect_half_way(lambda _: self.disconnect())
     188
     189    @inlineCallbacks
     190    def test_written_shares_are_allocated(self):
     191        """
     192        Shares that are fully written to show up as allocated in result from
     193        ``IStorageServer.allocate_buckets()``.  Partially-written or empty
     194        shares don't.
     195        """
     196        storage_index, renew_secret, cancel_secret = (
     197            new_storage_index(),
     198            new_secret(),
     199            new_secret(),
     200        )
     201        (_, allocated) = yield self.storage_server.allocate_buckets(
    148202            storage_index,
    149203            renew_secret,
     
    153207            canary=Referenceable(),
    154208        )
    155         (already_got2, allocated2) = yield self.storage_server.allocate_buckets(
    156             storage_index,
    157             renew_secret,
    158             cancel_secret,
    159             sharenums=set(range(7)),
    160             allocated_size=1024,
    161             canary=Referenceable(),
    162         )
    163         self.assertEqual(already_got2, set())  # none were fully written
    164         self.assertEqual(set(allocated2.keys()), set(range(7)))
    165 
    166     @inlineCallbacks
    167     def test_written_shares_are_allocated(self):
    168         """
    169         Shares that are fully written to show up as allocated in result from
    170         ``IStorageServer.allocate_buckets()``.  Partially-written or empty
    171         shares don't.
    172         """
    173         storage_index, renew_secret, cancel_secret = (
    174             new_storage_index(),
    175             new_secret(),
    176             new_secret(),
    177         )
    178         (_, allocated) = yield self.storage_server.allocate_buckets(
     209
     210        # Bucket 1 is fully written in one go.
     211        yield allocated[1].callRemote("write", 0, b"1" * 1024)
     212        yield allocated[1].callRemote("close")
     213
     214        # Bucket 2 is fully written in two steps.
     215        yield allocated[2].callRemote("write", 0, b"1" * 512)
     216        yield allocated[2].callRemote("write", 512, b"2" * 512)
     217        yield allocated[2].callRemote("close")
     218
     219        # Bucket 0 has partial write.
     220        yield allocated[0].callRemote("write", 0, b"1" * 512)
     221
     222        (already_got, _) = yield self.storage_server.allocate_buckets(
    179223            storage_index,
    180224            renew_secret,
     
    184228            canary=Referenceable(),
    185229        )
    186 
    187         # Bucket 1 is fully written in one go.
    188         yield allocated[1].callRemote("write", 0, b"1" * 1024)
    189         yield allocated[1].callRemote("close")
    190 
    191         # Bucket 2 is fully written in two steps.
    192         yield allocated[2].callRemote("write", 0, b"1" * 512)
    193         yield allocated[2].callRemote("write", 512, b"2" * 512)
    194         yield allocated[2].callRemote("close")
    195 
    196         # Bucket 0 has partial write.
    197         yield allocated[0].callRemote("write", 0, b"1" * 512)
    198 
    199         (already_got, _) = yield self.storage_server.allocate_buckets(
     230        self.assertEqual(already_got, {1, 2})
     231
     232    @inlineCallbacks
     233    def test_written_shares_are_readable(self):
     234        """
     235        Shares that are fully written to can be read.
     236
     237        The result is not affected by the order in which writes
     238        happened, only by their offsets.
     239        """
     240        storage_index, renew_secret, cancel_secret = (
     241            new_storage_index(),
     242            new_secret(),
     243            new_secret(),
     244        )
     245        (_, allocated) = yield self.storage_server.allocate_buckets(
    200246            storage_index,
    201247            renew_secret,
     
    205251            canary=Referenceable(),
    206252        )
    207         self.assertEqual(already_got, {1, 2})
    208 
    209     @inlineCallbacks
    210     def test_written_shares_are_readable(self):
    211         """
    212         Shares that are fully written to can be read.
    213 
    214         The result is not affected by the order in which writes
    215         happened, only by their offsets.
    216         """
    217         storage_index, renew_secret, cancel_secret = (
    218             new_storage_index(),
    219             new_secret(),
    220             new_secret(),
    221         )
    222         (_, allocated) = yield self.storage_server.allocate_buckets(
    223             storage_index,
    224             renew_secret,
    225             cancel_secret,
    226             sharenums=set(range(5)),
    227             allocated_size=1024,
    228             canary=Referenceable(),
    229         )
    230253
    231254        # Bucket 1 is fully written in order
     
    249272        )
    250273
    251     @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3801")
    252     def test_overlapping_writes(self):
    253         """
    254         The policy for overlapping writes is TBD:
    255         https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3801
    256         """
     274    @inlineCallbacks
     275    def test_non_matching_overlapping_writes(self):
     276        """
     277        When doing overlapping writes in immutable uploads, non-matching writes
     278        fail.
     279        """
     280        storage_index, renew_secret, cancel_secret = (
     281            new_storage_index(),
     282            new_secret(),
     283            new_secret(),
     284        )
     285        (_, allocated) = yield self.storage_server.allocate_buckets(
     286            storage_index,
     287            renew_secret,
     288            cancel_secret,
     289            sharenums={0},
     290            allocated_size=30,
     291            canary=Referenceable(),
     292        )
     293
     294        yield allocated[0].callRemote("write", 0, b"1" * 25)
     295        # Overlapping write that doesn't match:
     296        with self.assertRaises(RemoteException):
     297            yield allocated[0].callRemote("write", 20, b"2" * 10)
     298
     299    @inlineCallbacks
     300    def test_matching_overlapping_writes(self):
     301        """
     302        When doing overlapping writes in immutable uploads, matching writes
     303        succeed.
     304        """
     305        storage_index, renew_secret, cancel_secret = (
     306            new_storage_index(),
     307            new_secret(),
     308            new_secret(),
     309        )
     310        (_, allocated) = yield self.storage_server.allocate_buckets(
     311            storage_index,
     312            renew_secret,
     313            cancel_secret,
     314            sharenums={0},
     315            allocated_size=25,
     316            canary=Referenceable(),
     317        )
     318
     319        yield allocated[0].callRemote("write", 0, b"1" * 10)
     320        # Overlapping write that matches:
     321        yield allocated[0].callRemote("write", 5, b"1" * 20)
     322        yield allocated[0].callRemote("close")
     323
     324        buckets = yield self.storage_server.get_buckets(storage_index)
     325        self.assertEqual(set(buckets.keys()), {0})
     326
     327        self.assertEqual((yield buckets[0].callRemote("read", 0, 25)), b"1" * 25)
     328
     329    def test_abort(self):
     330        """
     331        If we call ``abort`` on the ``RIBucketWriter`` to disconnect in the
     332        middle of writing to a bucket, all data is wiped, and it's even
     333        possible to write different data to the bucket.
     334
     335        (In the real world one probably wouldn't do that, but writing different
     336        data is a good way to test that the original data really was wiped.)
     337        """
     338        return self.abort_or_disconnect_half_way(
     339            lambda bucket: bucket.callRemote("abort")
     340        )
     341
     342    @inlineCallbacks
     343    def test_get_buckets_skips_unfinished_buckets(self):
     344        """
     345        Buckets that are not fully written are not returned by
     346        ``IStorageServer.get_buckets()`` implementations.
     347        """
     348        storage_index = new_storage_index()
     349        (_, allocated) = yield self.storage_server.allocate_buckets(
     350            storage_index,
     351            renew_secret=new_secret(),
     352            cancel_secret=new_secret(),
     353            sharenums=set(range(5)),
     354            allocated_size=10,
     355            canary=Referenceable(),
     356        )
     357
     358        # Bucket 1 is fully written
     359        yield allocated[1].callRemote("write", 0, b"1" * 10)
     360        yield allocated[1].callRemote("close")
     361
     362        # Bucket 2 is partially written
     363        yield allocated[2].callRemote("write", 0, b"1" * 5)
     364
     365        buckets = yield self.storage_server.get_buckets(storage_index)
     366        self.assertEqual(set(buckets.keys()), {1})
     367
     368    @inlineCallbacks
     369    def test_read_bucket_at_offset(self):
     370        """
     371        Given a read bucket returned from ``IStorageServer.get_buckets()``, it
     372        is possible to read at different offsets and lengths, with reads past
     373        the end resulting in empty bytes.
     374        """
     375        length = 256 * 17
     376
     377        storage_index = new_storage_index()
     378        (_, allocated) = yield self.storage_server.allocate_buckets(
     379            storage_index,
     380            renew_secret=new_secret(),
     381            cancel_secret=new_secret(),
     382            sharenums=set(range(1)),
     383            allocated_size=length,
     384            canary=Referenceable(),
     385        )
     386
     387        total_data = _randbytes(256 * 17)
     388        yield allocated[0].callRemote("write", 0, total_data)
     389        yield allocated[0].callRemote("close")
     390
     391        buckets = yield self.storage_server.get_buckets(storage_index)
     392        bucket = buckets[0]
     393        for start, to_read in [
     394            (0, 250),  # fraction
     395            (0, length),  # whole thing
     396            (100, 1024),  # offset fraction
     397            (length + 1, 100),  # completely out of bounds
     398            (length - 100, 200),  # partially out of bounds
     399        ]:
     400            data = yield bucket.callRemote("read", start, to_read)
     401            self.assertEqual(
     402                data,
     403                total_data[start : start + to_read],
     404                "Didn't match for start {}, length {}".format(start, to_read),
     405            )
     406
     407    @inlineCallbacks
     408    def test_bucket_advise_corrupt_share(self):
     409        """
     410        Calling ``advise_corrupt_share()`` on a bucket returned by
     411        ``IStorageServer.get_buckets()`` does not result in an error (other
     412        behavior is opaque at this level of abstraction).
     413        """
     414        storage_index = new_storage_index()
     415        (_, allocated) = yield self.storage_server.allocate_buckets(
     416            storage_index,
     417            renew_secret=new_secret(),
     418            cancel_secret=new_secret(),
     419            sharenums=set(range(1)),
     420            allocated_size=10,
     421            canary=Referenceable(),
     422        )
     423
     424        yield allocated[0].callRemote("write", 0, b"0123456789")
     425        yield allocated[0].callRemote("close")
     426
     427        buckets = yield self.storage_server.get_buckets(storage_index)
     428        yield buckets[0].callRemote("advise_corrupt_share", b"OH NO")
     429
     430
     431class IStorageServerMutableAPIsTestsMixin(object):
     432    """
     433    Tests for ``IStorageServer``'s mutable APIs.
     434
     435    ``self.storage_server`` is expected to provide ``IStorageServer``.
     436
     437    ``STARAW`` is short for ``slot_testv_and_readv_and_writev``.
     438    """
     439
     440    def new_secrets(self):
     441        """Return a 3-tuple of secrets for STARAW calls."""
     442        return (new_secret(), new_secret(), new_secret())
     443
     444    def staraw(self, *args, **kwargs):
     445        """Like ``slot_testv_and_readv_and_writev``, but less typing."""
     446        return self.storage_server.slot_testv_and_readv_and_writev(*args, **kwargs)
     447
     448    @inlineCallbacks
     449    def test_STARAW_reads_after_write(self):
     450        """
     451        When data is written with
     452        ``IStorageServer.slot_testv_and_readv_and_writev``, it can then be read
     453        by a separate call using that API.
     454        """
     455        secrets = self.new_secrets()
     456        storage_index = new_storage_index()
     457        (written, _) = yield self.staraw(
     458            storage_index,
     459            secrets,
     460            tw_vectors={
     461                0: ([], [(0, b"abcdefg")], 7),
     462                1: ([], [(0, b"0123"), (4, b"456")], 7),
     463            },
     464            r_vector=[],
     465        )
     466        self.assertEqual(written, True)
     467
     468        (_, reads) = yield self.staraw(
     469            storage_index,
     470            secrets,
     471            tw_vectors={},
     472            # Whole thing, partial, going beyond the edge, completely outside
     473            # range:
     474            r_vector=[(0, 7), (2, 3), (6, 8), (100, 10)],
     475        )
     476        self.assertEqual(
     477            reads,
     478            {0: [b"abcdefg", b"cde", b"g", b""], 1: [b"0123456", b"234", b"6", b""]},
     479        )
     480
     481    @inlineCallbacks
     482    def test_SATRAW_reads_happen_before_writes_in_single_query(self):
     483        """
     484        If a ``IStorageServer.slot_testv_and_readv_and_writev`` command
     485        contains both reads and writes, the read returns results that precede
     486        the write.
     487        """
     488        secrets = self.new_secrets()
     489        storage_index = new_storage_index()
     490        (written, _) = yield self.staraw(
     491            storage_index,
     492            secrets,
     493            tw_vectors={
     494                0: ([], [(0, b"abcdefg")], 7),
     495            },
     496            r_vector=[],
     497        )
     498        self.assertEqual(written, True)
     499
     500        # Read and write in same command; read happens before write:
     501        (written, reads) = yield self.staraw(
     502            storage_index,
     503            secrets,
     504            tw_vectors={
     505                0: ([], [(0, b"X" * 7)], 7),
     506            },
     507            r_vector=[(0, 7)],
     508        )
     509        self.assertEqual(written, True)
     510        self.assertEqual(reads, {0: [b"abcdefg"]})
     511
     512        # The write is available in next read:
     513        (_, reads) = yield self.staraw(
     514            storage_index,
     515            secrets,
     516            tw_vectors={},
     517            r_vector=[(0, 7)],
     518        )
     519        self.assertEqual(reads, {0: [b"X" * 7]})
     520
     521    @inlineCallbacks
     522    def test_SATRAW_writes_happens_only_if_test_matches(self):
     523        """
     524        If a ``IStorageServer.slot_testv_and_readv_and_writev`` includes both a
     525        test and a write, the write succeeds if the test matches, and fails if
     526        the test does not match.
     527        """
     528        secrets = self.new_secrets()
     529        storage_index = new_storage_index()
     530        (written, _) = yield self.staraw(
     531            storage_index,
     532            secrets,
     533            tw_vectors={
     534                0: ([], [(0, b"1" * 7)], 7),
     535            },
     536            r_vector=[],
     537        )
     538        self.assertEqual(written, True)
     539
     540        # Test matches, so write happens:
     541        (written, _) = yield self.staraw(
     542            storage_index,
     543            secrets,
     544            tw_vectors={
     545                0: (
     546                    [(0, 3, b"1" * 3), (3, 4, b"1" * 4)],
     547                    [(0, b"2" * 7)],
     548                    7,
     549                ),
     550            },
     551            r_vector=[],
     552        )
     553        self.assertEqual(written, True)
     554        (_, reads) = yield self.staraw(
     555            storage_index,
     556            secrets,
     557            tw_vectors={},
     558            r_vector=[(0, 7)],
     559        )
     560        self.assertEqual(reads, {0: [b"2" * 7]})
     561
     562        # Test does not match, so write does not happen:
     563        (written, _) = yield self.staraw(
     564            storage_index,
     565            secrets,
     566            tw_vectors={
     567                0: ([(0, 7, b"1" * 7)], [(0, b"3" * 7)], 7),
     568            },
     569            r_vector=[],
     570        )
     571        self.assertEqual(written, False)
     572        (_, reads) = yield self.staraw(
     573            storage_index,
     574            secrets,
     575            tw_vectors={},
     576            r_vector=[(0, 7)],
     577        )
     578        self.assertEqual(reads, {0: [b"2" * 7]})
     579
     580    @inlineCallbacks
     581    def test_SATRAW_tests_past_end_of_data(self):
     582        """
     583        If a ``IStorageServer.slot_testv_and_readv_and_writev`` includes a test
     584        vector that reads past the end of the data, the result is limited to
     585        actual available data.
     586        """
     587        secrets = self.new_secrets()
     588        storage_index = new_storage_index()
     589
     590        # Since there is no data on server, the test vector will return empty
     591        # string, which matches expected result, so write will succeed.
     592        (written, _) = yield self.staraw(
     593            storage_index,
     594            secrets,
     595            tw_vectors={
     596                0: ([(0, 10, b"")], [(0, b"1" * 7)], 7),
     597            },
     598            r_vector=[],
     599        )
     600        self.assertEqual(written, True)
     601
     602        # Now the test vector is a 10-read off of a 7-byte value, but expected
     603        # value is still 7 bytes, so the write will again succeed.
     604        (written, _) = yield self.staraw(
     605            storage_index,
     606            secrets,
     607            tw_vectors={
     608                0: ([(0, 10, b"1" * 7)], [(0, b"2" * 7)], 7),
     609            },
     610            r_vector=[],
     611        )
     612        self.assertEqual(written, True)
     613
     614    @inlineCallbacks
     615    def test_SATRAW_reads_past_end_of_data(self):
     616        """
     617        If a ``IStorageServer.slot_testv_and_readv_and_writev`` reads past the
     618        end of the data, the result is limited to actual available data.
     619        """
     620        secrets = self.new_secrets()
     621        storage_index = new_storage_index()
     622
     623        # Write some data
     624        (written, _) = yield self.staraw(
     625            storage_index,
     626            secrets,
     627            tw_vectors={
     628                0: ([], [(0, b"12345")], 5),
     629            },
     630            r_vector=[],
     631        )
     632        self.assertEqual(written, True)
     633
     634        # Reads past end.
     635        (_, reads) = yield self.staraw(
     636            storage_index,
     637            secrets,
     638            tw_vectors={},
     639            r_vector=[(0, 100), (2, 50)],
     640        )
     641        self.assertEqual(reads, {0: [b"12345", b"345"]})
     642
     643    @inlineCallbacks
     644    def test_STARAW_write_enabler_must_match(self):
     645        """
     646        If the write enabler secret passed to
     647        ``IStorageServer.slot_testv_and_readv_and_writev`` doesn't match
     648        previous writes, the write fails.
     649        """
     650        secrets = self.new_secrets()
     651        storage_index = new_storage_index()
     652        (written, _) = yield self.staraw(
     653            storage_index,
     654            secrets,
     655            tw_vectors={
     656                0: ([], [(0, b"1" * 7)], 7),
     657            },
     658            r_vector=[],
     659        )
     660        self.assertEqual(written, True)
     661
     662        # Write enabler secret does not match, so write does not happen:
     663        bad_secrets = (new_secret(),) + secrets[1:]
     664        with self.assertRaises(RemoteException):
     665            yield self.staraw(
     666                storage_index,
     667                bad_secrets,
     668                tw_vectors={
     669                    0: ([], [(0, b"2" * 7)], 7),
     670                },
     671                r_vector=[],
     672            )
     673        (_, reads) = yield self.staraw(
     674            storage_index,
     675            secrets,
     676            tw_vectors={},
     677            r_vector=[(0, 7)],
     678        )
     679        self.assertEqual(reads, {0: [b"1" * 7]})
     680
     681    @inlineCallbacks
     682    def test_STARAW_zero_new_length_deletes(self):
     683        """
     684        A zero new length passed to
     685        ``IStorageServer.slot_testv_and_readv_and_writev`` deletes the share.
     686        """
     687        secrets = self.new_secrets()
     688        storage_index = new_storage_index()
     689        (written, _) = yield self.staraw(
     690            storage_index,
     691            secrets,
     692            tw_vectors={
     693                0: ([], [(0, b"1" * 7)], 7),
     694            },
     695            r_vector=[],
     696        )
     697        self.assertEqual(written, True)
     698
     699        # Write with new length of 0:
     700        (written, _) = yield self.staraw(
     701            storage_index,
     702            secrets,
     703            tw_vectors={
     704                0: ([], [(0, b"1" * 7)], 0),
     705            },
     706            r_vector=[],
     707        )
     708        self.assertEqual(written, True)
     709
     710        # It's gone!
     711        (_, reads) = yield self.staraw(
     712            storage_index,
     713            secrets,
     714            tw_vectors={},
     715            r_vector=[(0, 7)],
     716        )
     717        self.assertEqual(reads, {})
     718
     719    @inlineCallbacks
     720    def test_slot_readv(self):
     721        """
     722        Data written with ``IStorageServer.slot_testv_and_readv_and_writev()``
     723        can be read using ``IStorageServer.slot_readv()``.  Reads can't go past
     724        the end of the data.
     725        """
     726        secrets = self.new_secrets()
     727        storage_index = new_storage_index()
     728        (written, _) = yield self.staraw(
     729            storage_index,
     730            secrets,
     731            tw_vectors={
     732                0: ([], [(0, b"abcdefg")], 7),
     733                1: ([], [(0, b"0123"), (4, b"456")], 7),
     734                # This will never get read from, just here to show we only read
     735                # from shares explicitly requested by slot_readv:
     736                2: ([], [(0, b"XYZW")], 4),
     737            },
     738            r_vector=[],
     739        )
     740        self.assertEqual(written, True)
     741
     742        reads = yield self.storage_server.slot_readv(
     743            storage_index,
     744            shares=[0, 1],
     745            # Whole thing, partial, going beyond the edge, completely outside
     746            # range:
     747            readv=[(0, 7), (2, 3), (6, 8), (100, 10)],
     748        )
     749        self.assertEqual(
     750            reads,
     751            {0: [b"abcdefg", b"cde", b"g", b""], 1: [b"0123456", b"234", b"6", b""]},
     752        )
     753
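Put differently, each element of the result list is just the stored share data sliced by the corresponding read-vector entry, so reads past the end of a 7-byte share come back short or empty. A quick illustration (not part of the changeset):

    share = b"abcdefg"  # the 7 bytes written to share 0 above
    [share[offset:offset + length]
     for (offset, length) in [(0, 7), (2, 3), (6, 8), (100, 10)]]
    # == [b"abcdefg", b"cde", b"g", b""], matching the assertion above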
     754    @inlineCallbacks
     755    def test_slot_readv_no_shares(self):
     756        """
     757        With no shares given, ``IStorageServer.slot_readv()`` reads from all shares.
     758        """
     759        secrets = self.new_secrets()
     760        storage_index = new_storage_index()
     761        (written, _) = yield self.staraw(
     762            storage_index,
     763            secrets,
     764            tw_vectors={
     765                0: ([], [(0, b"abcdefg")], 7),
     766                1: ([], [(0, b"0123456")], 7),
     767                2: ([], [(0, b"9876543")], 7),
     768            },
     769            r_vector=[],
     770        )
     771        self.assertEqual(written, True)
     772
     773        reads = yield self.storage_server.slot_readv(
     774            storage_index,
     775            shares=[],
     776            readv=[(0, 7)],
     777        )
     778        self.assertEqual(
     779            reads,
     780            {0: [b"abcdefg"], 1: [b"0123456"], 2: [b"9876543"]},
     781        )
    257782
    258783
    259784class _FoolscapMixin(SystemTestMixin):
    260785    """Run tests on Foolscap version of ``IStorageServer``."""
     786
     787    def _get_native_server(self):
     788        return next(iter(self.clients[0].storage_broker.get_known_servers()))
    261789
    262790    @inlineCallbacks
     
    266794        yield SystemTestMixin.setUp(self)
    267795        yield self.set_up_nodes(1)
    268         self.storage_server = next(
    269             iter(self.clients[0].storage_broker.get_known_servers())
    270         ).get_storage_server()
     796        self.storage_server = self._get_native_server().get_storage_server()
    271797        self.assertTrue(IStorageServer.providedBy(self.storage_server))
    272798
     
    275801        AsyncTestCase.tearDown(self)
    276802        yield SystemTestMixin.tearDown(self)
     803
     804    @inlineCallbacks
     805    def disconnect(self):
     806        """
     807        Disconnect and then reconnect with a new ``IStorageServer``.
     808        """
     809        current = self.storage_server
     810        yield self.bounce_client(0)
     811        self.storage_server = self._get_native_server().get_storage_server()
     812        assert self.storage_server is not current
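A hypothetical example of how this helper could be used from a class that also mixes in the mutable-API tests above (the test name and body are illustrative, not part of the changeset):

    @inlineCallbacks
    def test_written_data_survives_reconnect(self):
        secrets = self.new_secrets()
        storage_index = new_storage_index()
        yield self.staraw(
            storage_index,
            secrets,
            tw_vectors={0: ([], [(0, b"hello")], 5)},
            r_vector=[],
        )
        # Drop the Foolscap connection and grab a fresh IStorageServer.
        yield self.disconnect()
        reads = yield self.storage_server.slot_readv(
            storage_index, shares=[0], readv=[(0, 5)]
        )
        self.assertEqual(reads, {0: [b"hello"]})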
    277813
    278814
     
    287823):
    288824    """Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
     825
     826
     827class FoolscapMutableAPIsTests(
     828    _FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
     829):
     830    """Foolscap-specific tests for mutable ``IStorageServer`` APIs."""
  • TabularUnified src/allmydata/test/test_storage.py

    r49b6080 rbf670c0  
    99from __future__ import unicode_literals
    1010
    11 from future.utils import native_str, PY2, bytes_to_native_str
     11from future.utils import native_str, PY2, bytes_to_native_str, bchr
    1212if PY2:
    1313    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
     
    2020import struct
    2121import shutil
    22 import gc
     22from uuid import uuid4
    2323
    2424from twisted.trial import unittest
     
    2626from twisted.internet import defer
    2727from twisted.internet.task import Clock
     28
     29from hypothesis import given, strategies
    2830
    2931import itertools
     
    3436from allmydata.storage.mutable import MutableShareFile
    3537from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
    36 from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
     38from allmydata.storage.common import storage_index_to_dir, \
    3739     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError, \
    3840     si_b2a, si_a2b
     
    4850                                     VERIFICATION_KEY_SIZE, \
    4951                                     SHARE_HASH_CHAIN_SIZE
    50 from allmydata.interfaces import BadWriteEnablerError
     52from allmydata.interfaces import (
     53    BadWriteEnablerError, DataTooLargeError, ConflictingWriteError,
     54)
    5155from allmydata.test.no_network import NoNetworkServer
    5256from allmydata.storage_client import (
     
    125129    def test_create(self):
    126130        incoming, final = self.make_workdir("test_create")
    127         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
    128                           FakeCanary())
     131        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
    129132        bw.remote_write(0, b"a"*25)
    130133        bw.remote_write(25, b"b"*25)
     
    135138    def test_readwrite(self):
    136139        incoming, final = self.make_workdir("test_readwrite")
    137         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
    138                           FakeCanary())
     140        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
    139141        bw.remote_write(0, b"a"*25)
    140142        bw.remote_write(25, b"b"*25)
     
    147149        self.failUnlessEqual(br.remote_read(25, 25), b"b"*25)
    148150        self.failUnlessEqual(br.remote_read(50, 7), b"c"*7)
     151
     152    def test_write_past_size_errors(self):
     153        """Writing beyond the size of the bucket throws an exception."""
     154        for (i, (offset, length)) in enumerate([(0, 201), (10, 191), (202, 34)]):
     155            incoming, final = self.make_workdir(
     156                "test_write_past_size_errors-{}".format(i)
     157            )
     158            bw = BucketWriter(self, incoming, final, 200, self.make_lease())
     159            with self.assertRaises(DataTooLargeError):
     160                bw.remote_write(offset, b"a" * length)
     161
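All three (offset, length) pairs above run past the 200-byte allocation (0+201, 10+191 and 202+34 all exceed 200), so every write should be rejected. A minimal sketch of the invariant being exercised (illustrative; it assumes the size handed to BucketWriter is a hard upper bound):

    def would_overrun(container_size, offset, data):
        # A write is out of bounds when it would extend past the allocated size.
        return offset + len(data) > container_size

    assert would_overrun(200, 0, b"a" * 201)
    assert would_overrun(200, 10, b"a" * 191)
    assert would_overrun(200, 202, b"a" * 34)
    assert not would_overrun(200, 0, b"a" * 200)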
     162    @given(
     163        maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
     164        maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
     165    )
     166    def test_overlapping_writes_ok_if_matching(
     167            self, maybe_overlapping_offset, maybe_overlapping_length
     168    ):
     169        """
     170        Writes that overlap with previous writes are OK when the content is the
     171        same.
     172        """
     173        length = 100
     174        expected_data = b"".join(bchr(i) for i in range(100))
     175        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
     176        bw = BucketWriter(
     177            self, incoming, final, length, self.make_lease(),
     178        )
     179        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
     180        bw.remote_write(10, expected_data[10:20])
     181        bw.remote_write(30, expected_data[30:40])
     182        bw.remote_write(50, expected_data[50:60])
     183        # Then, an overlapping write but with matching data:
     184        bw.remote_write(
     185            maybe_overlapping_offset,
     186            expected_data[
     187                maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length
     188            ]
     189        )
     190        # Now fill in the holes:
     191        bw.remote_write(0, expected_data[0:10])
     192        bw.remote_write(20, expected_data[20:30])
     193        bw.remote_write(40, expected_data[40:50])
     194        bw.remote_write(60, expected_data[60:])
     195        bw.remote_close()
     196
     197        br = BucketReader(self, bw.finalhome)
     198        self.assertEqual(br.remote_read(0, length), expected_data)
     199
     200
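The two overlap tests here are property-based: Hypothesis draws many (offset, length) pairs from the given bounds rather than testing one hand-picked case. A self-contained sketch of the same pattern, independent of BucketWriter (illustrative only):

    import unittest
    from hypothesis import given, strategies

    class SliceProperties(unittest.TestCase):
        @given(
            offset=strategies.integers(min_value=0, max_value=98),
            length=strategies.integers(min_value=1, max_value=100),
        )
        def test_slice_never_longer_than_requested(self, offset, length):
            data = bytes(range(100))
            chunk = data[offset:offset + length]
            # A slice may be shorter than requested near the end, never longer.
            self.assertLessEqual(len(chunk), length)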
     201    @given(
     202        maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
     203        maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
     204    )
     205    def test_overlapping_writes_not_ok_if_different(
     206            self, maybe_overlapping_offset, maybe_overlapping_length
     207    ):
     208        """
     209        Writes that overlap with previous writes fail with an exception if the
     210        contents don't match.
     211        """
     212        length = 100
     213        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
     214        bw = BucketWriter(
     215            self, incoming, final, length, self.make_lease(),
     216        )
     217        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
     218        bw.remote_write(10, b"1" * 10)
     219        bw.remote_write(30, b"1" * 10)
     220        bw.remote_write(50, b"1" * 10)
     221        # Then, write something that might overlap with some of them, but
     222        # conflicts. Then fill in holes left by first three writes. Conflict is
     223        # inevitable.
     224        with self.assertRaises(ConflictingWriteError):
     225            bw.remote_write(
     226                maybe_overlapping_offset,
     227                b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset),
     228            )
     229            bw.remote_write(0, b"1" * 10)
     230            bw.remote_write(20, b"1" * 10)
     231            bw.remote_write(40, b"1" * 10)
     232            bw.remote_write(60, b"1" * 40)
    149233
    150234    def test_read_past_end_of_share_data(self):
     
    229313        fileutil.make_dirs(basedir)
    230314        fileutil.make_dirs(os.path.join(basedir, "tmp"))
    231         bw = BucketWriter(self, incoming, final, size, self.make_lease(),
    232                           FakeCanary())
     315        bw = BucketWriter(self, incoming, final, size, self.make_lease())
    233316        rb = RemoteBucket(bw)
    234317        return bw, rb, final
     
    580663        OVERHEAD = 3*4
    581664        LEASE_SIZE = 4+32+32+4
    582         canary = FakeCanary(True)
     665        canary = FakeCanary()
    583666        already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary)
    584667        self.failUnlessEqual(len(writers), 3)
    585668        # now the StorageServer should have 3000 bytes provisionally
    586669        # allocated, allowing only 2000 more to be claimed
    587         self.failUnlessEqual(len(ss._active_writers), 3)
     670        self.failUnlessEqual(len(ss._bucket_writers), 3)
    588671
    589672        # allocating 1001-byte shares only leaves room for one
    590         already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary)
     673        canary2 = FakeCanary()
     674        already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2)
    591675        self.failUnlessEqual(len(writers2), 1)
    592         self.failUnlessEqual(len(ss._active_writers), 4)
     676        self.failUnlessEqual(len(ss._bucket_writers), 4)
    593677
    594678        # we abandon the first set, so their provisional allocation should be
    595679        # returned
    596 
    597         del already
    598         del writers
    599         gc.collect()
    600 
    601         self.failUnlessEqual(len(ss._active_writers), 1)
     680        canary.disconnected()
     681
     682        self.failUnlessEqual(len(ss._bucket_writers), 1)
    602683        # now we have a provisional allocation of 1001 bytes
    603684
     
    608689            bw.remote_write(0, b"a"*25)
    609690            bw.remote_close()
    610         del already2
    611         del writers2
    612         del bw
    613         self.failUnlessEqual(len(ss._active_writers), 0)
     691        self.failUnlessEqual(len(ss._bucket_writers), 0)
    614692
    615693        # this also changes the amount reported as available by call_get_disk_stats
     
    618696        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
    619697        # 5000-1085=3915 free, therefore we can fit 39 100byte shares
    620         already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary)
     698        canary3 = FakeCanary()
     699        already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3)
    621700        self.failUnlessEqual(len(writers3), 39)
    622         self.failUnlessEqual(len(ss._active_writers), 39)
    623 
    624         del already3
    625         del writers3
    626         gc.collect()
    627 
    628         self.failUnlessEqual(len(ss._active_writers), 0)
     701        self.failUnlessEqual(len(ss._bucket_writers), 39)
     702
     703        canary3.disconnected()
     704
     705        self.failUnlessEqual(len(ss._bucket_writers), 0)
    629706        ss.disownServiceParent()
    630707        del ss
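The gc-based bookkeeping is replaced by an explicit disconnect notification: firing the canary is what tells the server to abandon its provisional BucketWriters. A rough sketch of the idea behind FakeCanary.disconnected() follows (illustrative only and deliberately named differently; it assumes the fake simply records Foolscap notifyOnDisconnect callbacks and replays them, which may differ from the project's actual helper):

    class IllustrativeFakeCanary(object):
        """Record disconnect callbacks and fire them on demand."""

        def __init__(self):
            self._disconnectors = {}

        def notifyOnDisconnect(self, callback, *args, **kwargs):
            handle = object()
            self._disconnectors[handle] = (callback, args, kwargs)
            return handle

        def dontNotifyOnDisconnect(self, handle):
            self._disconnectors.pop(handle, None)

        def disconnected(self):
            # Simulate the connection going away: run every registered
            # callback, which is what releases the provisional allocations.
            for (callback, args, kwargs) in list(self._disconnectors.values()):
                callback(*args, **kwargs)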
     
    10751152        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
    10761153
    1077         # as should this one
    1078         answer = write(b"si1", secrets,
    1079                        {0: ([(10, 5, b"lt", b"11111"),
    1080                              ],
    1081                             [(0, b"x"*100)],
    1082                             None),
    1083                         },
    1084                        [(10,5)],
    1085                        )
    1086         self.failUnlessEqual(answer, (False,
    1087                                       {0: [b"11111"],
    1088                                        1: [b""],
    1089                                        2: [b""]},
    1090                                       ))
    1091         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
    1092 
    1093 
    10941154    def test_operators(self):
    10951155        # test operators, the data we're comparing is '11111' in all cases.
     
    11091169                  [])
    11101170
    1111         reset()
    1112 
    1113         #  lt
    1114         answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11110"),
    1115                                              ],
    1116                                             [(0, b"x"*100)],
    1117                                             None,
    1118                                             )}, [(10,5)])
    1119         self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
    1120         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
    1121         self.failUnlessEqual(read(b"si1", [], [(0,100)]), {0: [data]})
    1122         reset()
    1123 
    1124         answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11111"),
    1125                                              ],
    1126                                             [(0, b"x"*100)],
    1127                                             None,
    1128                                             )}, [(10,5)])
    1129         self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
    1130         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
    1131         reset()
    1132 
    1133         answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11112"),
    1134                                              ],
    1135                                             [(0, b"y"*100)],
    1136                                             None,
    1137                                             )}, [(10,5)])
    1138         self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
    1139         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
    1140         reset()
    1141 
    1142         #  le
    1143         answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11110"),
    1144                                              ],
    1145                                             [(0, b"x"*100)],
    1146                                             None,
    1147                                             )}, [(10,5)])
    1148         self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
    1149         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
    1150         reset()
    1151 
    1152         answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11111"),
    1153                                              ],
    1154                                             [(0, b"y"*100)],
    1155                                             None,
    1156                                             )}, [(10,5)])
    1157         self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
    1158         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
    1159         reset()
    1160 
    1161         answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11112"),
    1162                                              ],
    1163                                             [(0, b"y"*100)],
    1164                                             None,
    1165                                             )}, [(10,5)])
    1166         self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
    1167         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
    11681171        reset()
    11691172
     
    11851188        self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
    11861189        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
    1187         reset()
    1188 
    1189         #  ne
    1190         answer = write(b"si1", secrets, {0: ([(10, 5, b"ne", b"11111"),
    1191                                              ],
    1192                                             [(0, b"x"*100)],
    1193                                             None,
    1194                                             )}, [(10,5)])
    1195         self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
    1196         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
    1197         reset()
    1198 
    1199         answer = write(b"si1", secrets, {0: ([(10, 5, b"ne", b"11112"),
    1200                                              ],
    1201                                             [(0, b"y"*100)],
    1202                                             None,
    1203                                             )}, [(10,5)])
    1204         self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
    1205         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
    1206         reset()
    1207 
    1208         #  ge
    1209         answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11110"),
    1210                                              ],
    1211                                             [(0, b"y"*100)],
    1212                                             None,
    1213                                             )}, [(10,5)])
    1214         self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
    1215         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
    1216         reset()
    1217 
    1218         answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11111"),
    1219                                              ],
    1220                                             [(0, b"y"*100)],
    1221                                             None,
    1222                                             )}, [(10,5)])
    1223         self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
    1224         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
    1225         reset()
    1226 
    1227         answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11112"),
    1228                                              ],
    1229                                             [(0, b"y"*100)],
    1230                                             None,
    1231                                             )}, [(10,5)])
    1232         self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
    1233         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
    1234         reset()
    1235 
    1236         #  gt
    1237         answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11110"),
    1238                                              ],
    1239                                             [(0, b"y"*100)],
    1240                                             None,
    1241                                             )}, [(10,5)])
    1242         self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
    1243         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
    1244         reset()
    1245 
    1246         answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11111"),
    1247                                              ],
    1248                                             [(0, b"x"*100)],
    1249                                             None,
    1250                                             )}, [(10,5)])
    1251         self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
    1252         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
    1253         reset()
    1254 
    1255         answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11112"),
    1256                                              ],
    1257                                             [(0, b"x"*100)],
    1258                                             None,
    1259                                             )}, [(10,5)])
    1260         self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
    1261         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
    12621190        reset()
    12631191
  • TabularUnified src/allmydata/web/status.py

    r49b6080 rbf670c0  
    1515import itertools
    1616import hashlib
     17import re
    1718from twisted.internet import defer
    1819from twisted.python.filepath import FilePath
     
    15521553        return json.dumps(stats, indent=1) + "\n"
    15531554
     1555    @render_exception
     1556    def render_OPENMETRICS(self, req):
     1557        """
     1558        Render our stats in `OpenMetrics <https://openmetrics.io/>`_ format.
     1559        For example, Prometheus and VictoriaMetrics can parse this.
     1560        Point the scraper to ``/statistics?t=openmetrics`` (instead of the
     1561        default ``/metrics``).
     1562        """
     1563        req.setHeader("content-type", "application/openmetrics-text; version=1.0.0; charset=utf-8")
     1564        stats = self._provider.get_stats()
     1565        ret = []
     1566
     1567        def mangle_name(name):
     1568            return re.sub(
     1569                u"_(\d\d)_(\d)_percentile",
     1570                u'{quantile="0.\g<1>\g<2>"}',
     1571                name.replace(u".", u"_")
     1572            )
     1573
     1574        def mangle_value(val):
     1575            return str(val) if val is not None else u"NaN"
     1576
     1577        for (k, v) in sorted(stats['counters'].items()):
     1578            ret.append(u"tahoe_counters_%s %s" % (mangle_name(k), mangle_value(v)))
     1579        for (k, v) in sorted(stats['stats'].items()):
     1580            ret.append(u"tahoe_stats_%s %s" % (mangle_name(k), mangle_value(v)))
     1581
     1582        ret.append(u"# EOF\n")
     1583
     1584        return u"\n".join(ret)
     1585
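To illustrate what the mangling produces (the stat names and values below are made up for the example, not taken from the changeset): dots in a name become underscores, and a trailing _NN_N_percentile suffix becomes a Prometheus-style quantile label.

    # "cpu_monitor.total" with value 1234
    #     -> tahoe_stats_cpu_monitor_total 1234
    # "storage_server.latencies.get.90_0_percentile" with value 0.004
    #     -> tahoe_stats_storage_server_latencies_get{quantile="0.900"} 0.004
    # A counter whose value is None is emitted as NaN:
    #     -> tahoe_counters_example_counter NaN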
    15541586class StatisticsElement(Element):
    15551587
Note: See TracChangeset for help on using the changeset viewer.