Changeset bf670c0 in trunk
- Timestamp:
- 2021-10-12T21:42:59Z (4 years ago)
- Branches:
- master
- Children:
- 26419c4
- Parents:
- 49b6080 (diff), be5f583 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent. - Files:
-
- 14 added
- 18 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified .circleci/config.yml ¶
r49b6080 rbf670c0 41 41 42 42 - "nixos-19-09": 43 {} 44 45 - "nixos-21-05": 43 46 {} 44 47 … … 439 442 user: "nobody" 440 443 441 442 nixos-19-09: 444 nixos-19-09: &NIXOS 443 445 docker: 444 446 # Run in a highly Nix-capable environment. … … 448 450 environment: 449 451 NIX_PATH: "nixpkgs=https://github.com/NixOS/nixpkgs-channels/archive/nixos-19.09-small.tar.gz" 452 SOURCE: "nix/" 450 453 451 454 steps: … … 464 467 # advantage of multiple cores and we get a little speedup by doing 465 468 # them in parallel. 466 nix-build --cores 3 --max-jobs 2 nix/ 469 nix-build --cores 3 --max-jobs 2 "$SOURCE" 470 471 nixos-21-05: 472 <<: *NIXOS 473 474 environment: 475 # Note this doesn't look more similar to the 19.09 NIX_PATH URL because 476 # there was some internal shuffling by the NixOS project about how they 477 # publish stable revisions. 478 NIX_PATH: "nixpkgs=https://github.com/NixOS/nixpkgs/archive/d32b07e6df276d78e3640eb43882b80c9b2b3459.tar.gz" 479 SOURCE: "nix/py3.nix" 467 480 468 481 typechecks: -
TabularUnified .github/workflows/ci.yml ¶
r49b6080 rbf670c0 29 29 include: 30 30 # On macOS don't bother with 3.6-3.8, just to get faster builds. 31 - os: macos- latest31 - os: macos-10.15 32 32 python-version: 2.7 33 33 - os: macos-latest … … 169 169 include: 170 170 # On macOS don't bother with 3.6, just to get faster builds. 171 - os: macos- latest171 - os: macos-10.15 172 172 python-version: 2.7 173 173 - os: macos-latest … … 184 184 # tests on macOS. 185 185 - name: Install Tor [macOS, ${{ matrix.python-version }} ] 186 if: ${{ matrix.os == 'macos-latest'}}186 if: ${{ contains(matrix.os, 'macos') }} 187 187 run: | 188 188 brew extract --version 0.4.5.8 tor homebrew/cask … … 248 248 matrix: 249 249 os: 250 - macos- latest250 - macos-10.15 251 251 - windows-latest 252 252 - ubuntu-latest -
TabularUnified docs/proposed/http-storage-node-protocol.rst ¶
r49b6080 rbf670c0 483 483 It could also just be that the client's preferred servers have changed. 484 484 485 ``P UT/v1/immutable/:storage_index/:share_number``486 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 485 ``PATCH /v1/immutable/:storage_index/:share_number`` 486 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 487 487 488 488 Write data for the indicated share. … … 498 498 The server must recognize when all of the data has been received and mark the share as complete 499 499 (which it can do because it was informed of the size when the storage index was initialized). 500 Clients should upload chunks in re-assembly order.501 500 502 501 * When a chunk that does not complete the share is successfully uploaded the response is ``OK``. 503 * When the chunk that completes the share is successfully uploaded the response is ``CREATED``.504 * If the *Content-Range* for a request covers part of the share that has already been uploaded the response is ``CONFLICT``.505 502 The response body indicates the range of share data that has yet to be uploaded. 506 503 That is:: … … 514 511 ] 515 512 } 513 514 * When the chunk that completes the share is successfully uploaded the response is ``CREATED``. 515 * If the *Content-Range* for a request covers part of the share that has already, 516 and the data does not match already written data, 517 the response is ``CONFLICT``. 518 At this point the only thing to do is abort the upload and start from scratch (see below). 519 520 ``PUT /v1/immutable/:storage_index/:share_number/abort`` 521 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 522 523 This cancels an *in-progress* upload. 524 525 The response code: 526 527 * When the upload is still in progress and therefore the abort has succeeded, 528 the response is ``OK``. 529 Future uploads can start from scratch with no pre-existing upload state stored on the server. 530 * If the uploaded has already finished, the response is 405 (Method Not Allowed) 531 and no change is made. 532 533 534 Discussion 535 `````````` 536 537 ``PUT`` verbs are only supposed to be used to replace the whole resource, 538 thus the use of ``PATCH``. 539 From RFC 7231:: 540 541 An origin server that allows PUT on a given target resource MUST send 542 a 400 (Bad Request) response to a PUT request that contains a 543 Content-Range header field (Section 4.2 of [RFC7233]), since the 544 payload is likely to be partial content that has been mistakenly PUT 545 as a full representation. Partial content updates are possible by 546 targeting a separately identified resource with state that overlaps a 547 portion of the larger resource, or by using a different method that 548 has been specifically defined for partial updates (for example, the 549 PATCH method defined in [RFC5789]). 516 550 517 551 … … 601 635 "offset": 3, 602 636 "size": 5, 603 "operator": "eq",604 637 "specimen": "hello" 605 638 }, ...], … … 627 660 } 628 661 662 A test vector or read vector that read beyond the boundaries of existing data will return nothing for any bytes past the end. 663 As a result, if there is no data at all, an empty bytestring is returned no matter what the offset or length. 664 629 665 Reading 630 666 ~~~~~~~ … … 667 703 #. Upload the content for immutable share ``7``:: 668 704 669 P UT/v1/immutable/AAAAAAAAAAAAAAAA/7705 PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7 670 706 Content-Range: bytes 0-15/48 671 707 <first 16 bytes of share data> … … 673 709 200 OK 674 710 675 P UT/v1/immutable/AAAAAAAAAAAAAAAA/7711 PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7 676 712 Content-Range: bytes 16-31/48 677 713 <second 16 bytes of share data> … … 679 715 200 OK 680 716 681 P UT/v1/immutable/AAAAAAAAAAAAAAAA/7717 PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7 682 718 Content-Range: bytes 32-47/48 683 719 <final 16 bytes of share data> … … 702 738 ~~~~~~~~~~~~ 703 739 704 1. Create mutable share number ``3`` with ``10`` bytes of data in slot ``BBBBBBBBBBBBBBBB``:: 740 1. Create mutable share number ``3`` with ``10`` bytes of data in slot ``BBBBBBBBBBBBBBBB``. 741 The special test vector of size 1 but empty bytes will only pass 742 if there is no existing share, 743 otherwise it will read a byte which won't match `b""`:: 705 744 706 745 POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write … … 716 755 "offset": 0, 717 756 "size": 1, 718 "operator": "eq",719 757 "specimen": "" 720 758 }], … … 748 786 "test": [{ 749 787 "offset": 0, 750 "size": <checkstring size>, 751 "operator": "eq", 788 "size": <length of checkstring>, 752 789 "specimen": "<checkstring>" 753 790 }], -
TabularUnified nix/overlays.nix ¶
r49b6080 rbf670c0 3 3 packageOverrides = python-self: python-super: { 4 4 # eliot is not part of nixpkgs at all at this time. 5 eliot = python-self. callPackage ./eliot.nix { };5 eliot = python-self.pythonPackages.callPackage ./eliot.nix { }; 6 6 7 7 # NixOS autobahn package has trollius as a dependency, although 8 8 # it is optional. Trollius is unmaintained and fails on CI. 9 autobahn = python-super. callPackage ./autobahn.nix { };9 autobahn = python-super.pythonPackages.callPackage ./autobahn.nix { }; 10 10 11 11 # Porting to Python 3 is greatly aided by the future package. A 12 12 # slightly newer version than appears in nixos 19.09 is helpful. 13 future = python-super. callPackage ./future.nix { };13 future = python-super.pythonPackages.callPackage ./future.nix { }; 14 14 15 15 # Need version of pyutil that supports Python 3. The version in 19.09 16 16 # is too old. 17 pyutil = python-super. callPackage ./pyutil.nix { };17 pyutil = python-super.pythonPackages.callPackage ./pyutil.nix { }; 18 18 19 19 # Need a newer version of Twisted, too. 20 twisted = python-super.callPackage ./twisted.nix { }; 20 twisted = python-super.pythonPackages.callPackage ./twisted.nix { }; 21 22 # collections-extended is not part of nixpkgs at this time. 23 collections-extended = python-super.pythonPackages.callPackage ./collections-extended.nix { }; 24 }; 25 }; 26 27 python39 = super.python39.override { 28 packageOverrides = python-self: python-super: { 29 # collections-extended is not part of nixpkgs at this time. 30 collections-extended = python-super.pythonPackages.callPackage ./collections-extended.nix { }; 21 31 }; 22 32 }; -
TabularUnified nix/tahoe-lafs.nix ¶
r49b6080 rbf670c0 98 98 service-identity pyyaml magic-wormhole treq 99 99 eliot autobahn cryptography netifaces setuptools 100 future pyutil distro configparser 100 future pyutil distro configparser collections-extended 101 101 ]; 102 102 … … 108 108 html5lib 109 109 tenacity 110 prometheus_client 110 111 ]; 111 112 -
TabularUnified setup.py ¶
r49b6080 rbf670c0 138 138 # Backported configparser for Python 2: 139 139 "configparser ; python_version < '3.0'", 140 141 # For the RangeMap datastructure. 142 "collections-extended", 140 143 ] 141 144 … … 405 408 "paramiko", 406 409 "pytest-timeout", 410 # Does our OpenMetrics endpoint adhere to the spec: 411 "prometheus-client == 0.11.0", 407 412 ] + tor_requires + i2p_requires, 408 413 "tor": tor_requires, -
TabularUnified src/allmydata/interfaces.py ¶
r49b6080 rbf670c0 54 54 55 55 56 class DataTooLargeError(Exception): 57 """The write went past the expected size of the bucket.""" 58 59 60 class ConflictingWriteError(Exception): 61 """Two writes happened to same immutable with different data.""" 62 63 56 64 class RIBucketWriter(RemoteInterface): 57 65 """ Objects of this kind live on the server side. """ … … 92 100 TestVector = ListOf(TupleOf(Offset, ReadSize, bytes, bytes)) 93 101 # elements are (offset, length, operator, specimen) 94 # operator is one of "lt, le, eq, ne, ge, gt"95 # nop always passes and is used to fetch data while writing.96 # you should use length==len(specimen) for everything except nop102 # operator must be b"eq", typically length==len(specimen), but one can ensure 103 # writes don't happen to empty shares by setting length to 1 and specimen to 104 # b"". The operator is still used for wire compatibility with old versions. 97 105 DataVector = ListOf(TupleOf(Offset, ShareData)) 98 106 # (offset, data). This limits us to 30 writes of 1MiB each per call … … 352 360 """ 353 361 :see: ``RIStorageServer.slot_testv_readv_and_writev`` 362 363 While the interface mostly matches, test vectors are simplified. 364 Instead of a tuple ``(offset, read_size, operator, expected_data)`` in 365 the original, for this method you need only pass in 366 ``(offset, read_size, expected_data)``, with the operator implicitly 367 being ``b"eq"``. 354 368 """ 355 369 -
TabularUnified src/allmydata/mutable/layout.py ¶
r49b6080 rbf670c0 310 310 else: 311 311 checkstring = checkstring_or_seqnum 312 self._testvs = [(0, len(checkstring), b"eq",checkstring)]312 self._testvs = [(0, len(checkstring), checkstring)] 313 313 314 314 … … 319 319 """ 320 320 if self._testvs: 321 return self._testvs[0][ 3]321 return self._testvs[0][2] 322 322 return b"" 323 323 … … 549 549 # Our caller has not provided us with another checkstring 550 550 # yet, so we assume that we are writing a new share, and set 551 # a test vector that will allow a new share to be written.551 # a test vector that will only allow a new share to be written. 552 552 self._testvs = [] 553 self._testvs.append(tuple([0, 1, b" eq", b""]))553 self._testvs.append(tuple([0, 1, b""])) 554 554 555 555 tw_vectors = {} … … 890 890 else: 891 891 self._testvs = [] 892 self._testvs.append((0, len(checkstring), b"eq",checkstring))892 self._testvs.append((0, len(checkstring), checkstring)) 893 893 894 894 … … 1162 1162 tw_vectors = {} 1163 1163 if not self._testvs: 1164 # Make sure we will only successfully write if the share didn't 1165 # previously exist. 1164 1166 self._testvs = [] 1165 self._testvs.append(tuple([0, 1, b" eq", b""]))1167 self._testvs.append(tuple([0, 1, b""])) 1166 1168 if not self._written: 1167 1169 # Write a new checkstring to the share when we write it, so … … 1171 1173 def _first_write(): 1172 1174 self._written = True 1173 self._testvs = [(0, len(new_checkstring), b"eq",new_checkstring)]1175 self._testvs = [(0, len(new_checkstring), new_checkstring)] 1174 1176 on_success = _first_write 1175 1177 tw_vectors[self.shnum] = (self._testvs, datavs, None) -
TabularUnified src/allmydata/storage/common.py ¶
r49b6080 rbf670c0 14 14 from allmydata.util import base32 15 15 16 class DataTooLargeError(Exception): 17 pass 16 # Backwards compatibility. 17 from allmydata.interfaces import DataTooLargeError # noqa: F401 18 18 19 class UnknownMutableContainerVersionError(Exception): 19 20 pass -
TabularUnified src/allmydata/storage/immutable.py ¶
r49b6080 rbf670c0 14 14 import os, stat, struct, time 15 15 16 from collections_extended import RangeMap 17 16 18 from foolscap.api import Referenceable 17 19 18 20 from zope.interface import implementer 19 from allmydata.interfaces import RIBucketWriter, RIBucketReader 21 from allmydata.interfaces import ( 22 RIBucketWriter, RIBucketReader, ConflictingWriteError, 23 DataTooLargeError, 24 ) 20 25 from allmydata.util import base32, fileutil, log 21 26 from allmydata.util.assertutil import precondition 22 27 from allmydata.util.hashutil import timing_safe_compare 23 28 from allmydata.storage.lease import LeaseInfo 24 from allmydata.storage.common import UnknownImmutableContainerVersionError, \ 25 DataTooLargeError 29 from allmydata.storage.common import UnknownImmutableContainerVersionError 26 30 27 31 # each share file (in storage/shares/$SI/$SHNUM) contains lease information … … 205 209 class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 206 210 207 def __init__(self, ss, incominghome, finalhome, max_size, lease_info , canary):211 def __init__(self, ss, incominghome, finalhome, max_size, lease_info): 208 212 self.ss = ss 209 213 self.incominghome = incominghome 210 214 self.finalhome = finalhome 211 215 self._max_size = max_size # don't allow the client to write more than this 212 self._canary = canary213 self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)214 216 self.closed = False 215 217 self.throw_out_all_data = False … … 218 220 # added by simultaneous uploaders 219 221 self._sharefile.add_lease(lease_info) 222 self._already_written = RangeMap() 220 223 221 224 def allocated_size(self): … … 227 230 if self.throw_out_all_data: 228 231 return 232 233 # Make sure we're not conflicting with existing data: 234 end = offset + len(data) 235 for (chunk_start, chunk_stop, _) in self._already_written.ranges(offset, end): 236 chunk_len = chunk_stop - chunk_start 237 actual_chunk = self._sharefile.read_share_data(chunk_start, chunk_len) 238 writing_chunk = data[chunk_start - offset:chunk_stop - offset] 239 if actual_chunk != writing_chunk: 240 raise ConflictingWriteError( 241 "Chunk {}-{} doesn't match already written data.".format(chunk_start, chunk_stop) 242 ) 229 243 self._sharefile.write_share_data(offset, data) 244 245 self._already_written.set(True, offset, end) 230 246 self.ss.add_latency("write", time.time() - start) 231 247 self.ss.count("write") … … 263 279 self._sharefile = None 264 280 self.closed = True 265 self._canary.dontNotifyOnDisconnect(self._disconnect_marker)266 281 267 282 filelen = os.stat(self.finalhome)[stat.ST_SIZE] … … 270 285 self.ss.count("close") 271 286 272 def _disconnected(self):287 def disconnected(self): 273 288 if not self.closed: 274 289 self._abort() … … 277 292 log.msg("storage: aborting sharefile %s" % self.incominghome, 278 293 facility="tahoe.storage", level=log.UNUSUAL) 279 if not self.closed:280 self._canary.dontNotifyOnDisconnect(self._disconnect_marker)281 294 self._abort() 282 295 self.ss.count("abort") -
TabularUnified src/allmydata/storage/mutable.py ¶
r49b6080 rbf670c0 435 435 436 436 def testv_compare(a, op, b): 437 assert op in (b"lt", b"le", b"eq", b"ne", b"ge", b"gt") 438 if op == b"lt": 439 return a < b 440 if op == b"le": 441 return a <= b 442 if op == b"eq": 443 return a == b 444 if op == b"ne": 445 return a != b 446 if op == b"ge": 447 return a >= b 448 if op == b"gt": 449 return a > b 450 # never reached 437 assert op == b"eq" 438 return a == b 439 451 440 452 441 class EmptyShare(object): -
TabularUnified src/allmydata/storage/server.py ¶
r49b6080 rbf670c0 12 12 # strings. Omit bytes so we don't leak future's custom bytes. 13 13 from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401 14 14 else: 15 from typing import Dict 15 16 16 17 import os, re, struct, time 17 import weakref18 18 import six 19 19 20 20 from foolscap.api import Referenceable 21 from foolscap.ipb import IRemoteReference 21 22 from twisted.application import service 22 23 … … 90 91 self._clean_incomplete() 91 92 fileutil.make_dirs(self.incomingdir) 92 self._active_writers = weakref.WeakKeyDictionary()93 93 log.msg("StorageServer created", facility="tahoe.storage") 94 94 … … 121 121 self.lease_checker.setServiceParent(self) 122 122 self._get_current_time = get_current_time 123 124 # Currently being-written Bucketwriters. For Foolscap, lifetime is tied 125 # to connection: when disconnection happens, the BucketWriters are 126 # removed. For HTTP, this makes no sense, so there will be 127 # timeout-based cleanup; see 128 # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807. 129 130 # Map in-progress filesystem path -> BucketWriter: 131 self._bucket_writers = {} # type: Dict[str,BucketWriter] 132 # Canaries and disconnect markers for BucketWriters created via Foolscap: 133 self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)] 123 134 124 135 def __repr__(self): … … 239 250 def allocated_size(self): 240 251 space = 0 241 for bw in self._ active_writers:252 for bw in self._bucket_writers.values(): 242 253 space += bw.allocated_size() 243 254 return space … … 264 275 return version 265 276 266 def remote_allocate_buckets(self, storage_index, 267 renew_secret, cancel_secret, 268 sharenums, allocated_size, 269 canary, owner_num=0): 277 def _allocate_buckets(self, storage_index, 278 renew_secret, cancel_secret, 279 sharenums, allocated_size, 280 owner_num=0): 281 """ 282 Generic bucket allocation API. 283 """ 270 284 # owner_num is not for clients to set, but rather it should be 271 285 # curried into the PersonalStorageServer instance that is dedicated … … 316 330 pass 317 331 elif os.path.exists(incominghome): 318 # Note thatwe don't create BucketWriters for shnums that332 # For Foolscap we don't create BucketWriters for shnums that 319 333 # have a partial share (in incoming/), so if a second upload 320 334 # occurs while the first is still in progress, the second … … 324 338 # ok! we need to create the new share file. 325 339 bw = BucketWriter(self, incominghome, finalhome, 326 max_space_per_bucket, lease_info , canary)340 max_space_per_bucket, lease_info) 327 341 if self.no_storage: 328 342 bw.throw_out_all_data = True 329 343 bucketwriters[shnum] = bw 330 self._ active_writers[bw] = 1344 self._bucket_writers[incominghome] = bw 331 345 if limited: 332 346 remaining_space -= max_space_per_bucket … … 339 353 340 354 self.add_latency("allocate", self._get_current_time() - start) 355 return alreadygot, bucketwriters 356 357 def remote_allocate_buckets(self, storage_index, 358 renew_secret, cancel_secret, 359 sharenums, allocated_size, 360 canary, owner_num=0): 361 """Foolscap-specific ``allocate_buckets()`` API.""" 362 alreadygot, bucketwriters = self._allocate_buckets( 363 storage_index, renew_secret, cancel_secret, sharenums, allocated_size, 364 owner_num=owner_num, 365 ) 366 # Abort BucketWriters if disconnection happens. 367 for bw in bucketwriters.values(): 368 disconnect_marker = canary.notifyOnDisconnect(bw.disconnected) 369 self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker) 341 370 return alreadygot, bucketwriters 342 371 … … 384 413 if self.stats_provider: 385 414 self.stats_provider.count('storage_server.bytes_added', consumed_size) 386 del self._active_writers[bw] 415 del self._bucket_writers[bw.incominghome] 416 if bw in self._bucket_writer_disconnect_markers: 417 canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw) 418 canary.dontNotifyOnDisconnect(disconnect_marker) 387 419 388 420 def _get_bucket_shares(self, storage_index): -
TabularUnified src/allmydata/storage_client.py ¶
r49b6080 rbf670c0 995 995 r_vector, 996 996 ): 997 # Match the wire protocol, which requires 4-tuples for test vectors. 998 wire_format_tw_vectors = { 999 key: ( 1000 [(start, length, b"eq", data) for (start, length, data) in value[0]], 1001 value[1], 1002 value[2], 1003 ) for (key, value) in tw_vectors.items() 1004 } 997 1005 return self._rref.callRemote( 998 1006 "slot_testv_and_readv_and_writev", 999 1007 storage_index, 1000 1008 secrets, 1001 tw_vectors,1009 wire_format_tw_vectors, 1002 1010 r_vector, 1003 1011 ) -
TabularUnified src/allmydata/test/common_util.py ¶
r49b6080 rbf670c0 315 315 return "<fake>" 316 316 317 def disconnected(self): 318 """Disconnect the canary, to be called by test code. 319 320 Can only happen once. 321 """ 322 if self.disconnectors is not None: 323 for (f, args, kwargs) in list(self.disconnectors.values()): 324 f(*args, **kwargs) 325 self.disconnectors = None 326 317 327 318 328 class ShouldFailMixin(object): -
TabularUnified src/allmydata/test/mutable/util.py ¶
r49b6080 rbf670c0 150 150 for shnum, (testv, writev, new_length) in list(tw_vectors.items()): 151 151 for (offset, length, op, specimen) in testv: 152 assert op in (b"le", b"eq", b"ge")152 assert op == b"eq" 153 153 # TODO: this isn't right, the read is controlled by read_vector, 154 154 # not by testv -
TabularUnified src/allmydata/test/test_istorageserver.py ¶
r49b6080 rbf670c0 21 21 from random import Random 22 22 23 from testtools import skipIf24 25 23 from twisted.internet.defer import inlineCallbacks 26 24 27 from foolscap.api import Referenceable 25 from foolscap.api import Referenceable, RemoteException 28 26 29 27 from allmydata.interfaces import IStorageServer … … 78 76 79 77 ``self.storage_server`` is expected to provide ``IStorageServer``. 78 79 ``self.disconnect()`` should disconnect and then reconnect, creating a new 80 ``self.storage_server``. Some implementations may wish to skip tests using 81 this; HTTP has no notion of disconnection. 80 82 """ 81 83 … … 99 101 100 102 @inlineCallbacks 101 @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793")102 103 def test_allocate_buckets_repeat(self): 103 104 """ 104 allocate_buckets() with the same storage index returns the same result, 105 because the shares have not been written to. 106 107 This fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793 105 ``IStorageServer.allocate_buckets()`` with the same storage index does not return 106 work-in-progress buckets, but will add any newly added buckets. 108 107 """ 109 108 storage_index, renew_secret, cancel_secret = ( … … 116 115 renew_secret, 117 116 cancel_secret, 118 sharenums=set(range( 5)),117 sharenums=set(range(4)), 119 118 allocated_size=1024, 120 119 canary=Referenceable(), … … 129 128 ) 130 129 self.assertEqual(already_got, already_got2) 131 self.assertEqual(set(allocated.keys()), set(allocated2.keys())) 132 133 @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793") 134 @inlineCallbacks 135 def test_allocate_buckets_more_sharenums(self): 136 """ 137 allocate_buckets() with the same storage index but more sharenums 138 acknowledges the extra shares don't exist. 139 140 Fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793 130 self.assertEqual(set(allocated2.keys()), {4}) 131 132 @inlineCallbacks 133 def abort_or_disconnect_half_way(self, abort_or_disconnect): 134 """ 135 If we disconnect/abort in the middle of writing to a bucket, all data 136 is wiped, and it's even possible to write different data to the bucket. 137 138 (In the real world one shouldn't do that, but writing different data is 139 a good way to test that the original data really was wiped.) 140 141 ``abort_or_disconnect`` is a callback that takes a bucket and aborts up 142 load, or perhaps disconnects the whole connection. 141 143 """ 142 144 storage_index, renew_secret, cancel_secret = ( … … 145 147 new_secret(), 146 148 ) 147 yield self.storage_server.allocate_buckets( 149 (_, allocated) = yield self.storage_server.allocate_buckets( 150 storage_index, 151 renew_secret, 152 cancel_secret, 153 sharenums={0}, 154 allocated_size=1024, 155 canary=Referenceable(), 156 ) 157 158 # Bucket 1 is fully written in one go. 159 yield allocated[0].callRemote("write", 0, b"1" * 1024) 160 161 # Disconnect or abort, depending on the test: 162 yield abort_or_disconnect(allocated[0]) 163 164 # Write different data with no complaint: 165 (_, allocated) = yield self.storage_server.allocate_buckets( 166 storage_index, 167 renew_secret, 168 cancel_secret, 169 sharenums={0}, 170 allocated_size=1024, 171 canary=Referenceable(), 172 ) 173 yield allocated[0].callRemote("write", 0, b"2" * 1024) 174 175 def test_disconnection(self): 176 """ 177 If we disconnect in the middle of writing to a bucket, all data is 178 wiped, and it's even possible to write different data to the bucket. 179 180 (In the real world one shouldn't do that, but writing different data is 181 a good way to test that the original data really was wiped.) 182 183 HTTP protocol should skip this test, since disconnection is meaningless 184 concept; this is more about testing implicit contract the Foolscap 185 implementation depends on doesn't change as we refactor things. 186 """ 187 return self.abort_or_disconnect_half_way(lambda _: self.disconnect()) 188 189 @inlineCallbacks 190 def test_written_shares_are_allocated(self): 191 """ 192 Shares that are fully written to show up as allocated in result from 193 ``IStorageServer.allocate_buckets()``. Partially-written or empty 194 shares don't. 195 """ 196 storage_index, renew_secret, cancel_secret = ( 197 new_storage_index(), 198 new_secret(), 199 new_secret(), 200 ) 201 (_, allocated) = yield self.storage_server.allocate_buckets( 148 202 storage_index, 149 203 renew_secret, … … 153 207 canary=Referenceable(), 154 208 ) 155 (already_got2, allocated2) = yield self.storage_server.allocate_buckets( 156 storage_index, 157 renew_secret, 158 cancel_secret, 159 sharenums=set(range(7)), 160 allocated_size=1024, 161 canary=Referenceable(), 162 ) 163 self.assertEqual(already_got2, set()) # none were fully written 164 self.assertEqual(set(allocated2.keys()), set(range(7))) 165 166 @inlineCallbacks 167 def test_written_shares_are_allocated(self): 168 """ 169 Shares that are fully written to show up as allocated in result from 170 ``IStorageServer.allocate_buckets()``. Partially-written or empty 171 shares don't. 172 """ 173 storage_index, renew_secret, cancel_secret = ( 174 new_storage_index(), 175 new_secret(), 176 new_secret(), 177 ) 178 (_, allocated) = yield self.storage_server.allocate_buckets( 209 210 # Bucket 1 is fully written in one go. 211 yield allocated[1].callRemote("write", 0, b"1" * 1024) 212 yield allocated[1].callRemote("close") 213 214 # Bucket 2 is fully written in two steps. 215 yield allocated[2].callRemote("write", 0, b"1" * 512) 216 yield allocated[2].callRemote("write", 512, b"2" * 512) 217 yield allocated[2].callRemote("close") 218 219 # Bucket 0 has partial write. 220 yield allocated[0].callRemote("write", 0, b"1" * 512) 221 222 (already_got, _) = yield self.storage_server.allocate_buckets( 179 223 storage_index, 180 224 renew_secret, … … 184 228 canary=Referenceable(), 185 229 ) 186 187 # Bucket 1 is fully written in one go. 188 yield allocated[1].callRemote("write", 0, b"1" * 1024) 189 yield allocated[1].callRemote("close") 190 191 # Bucket 2 is fully written in two steps. 192 yield allocated[2].callRemote("write", 0, b"1" * 512) 193 yield allocated[2].callRemote("write", 512, b"2" * 512) 194 yield allocated[2].callRemote("close") 195 196 # Bucket 0 has partial write. 197 yield allocated[0].callRemote("write", 0, b"1" * 512) 198 199 (already_got, _) = yield self.storage_server.allocate_buckets( 230 self.assertEqual(already_got, {1, 2}) 231 232 @inlineCallbacks 233 def test_written_shares_are_readable(self): 234 """ 235 Shares that are fully written to can be read. 236 237 The result is not affected by the order in which writes 238 happened, only by their offsets. 239 """ 240 storage_index, renew_secret, cancel_secret = ( 241 new_storage_index(), 242 new_secret(), 243 new_secret(), 244 ) 245 (_, allocated) = yield self.storage_server.allocate_buckets( 200 246 storage_index, 201 247 renew_secret, … … 205 251 canary=Referenceable(), 206 252 ) 207 self.assertEqual(already_got, {1, 2})208 209 @inlineCallbacks210 def test_written_shares_are_readable(self):211 """212 Shares that are fully written to can be read.213 214 The result is not affected by the order in which writes215 happened, only by their offsets.216 """217 storage_index, renew_secret, cancel_secret = (218 new_storage_index(),219 new_secret(),220 new_secret(),221 )222 (_, allocated) = yield self.storage_server.allocate_buckets(223 storage_index,224 renew_secret,225 cancel_secret,226 sharenums=set(range(5)),227 allocated_size=1024,228 canary=Referenceable(),229 )230 253 231 254 # Bucket 1 is fully written in order … … 249 272 ) 250 273 251 @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3801") 252 def test_overlapping_writes(self): 253 """ 254 The policy for overlapping writes is TBD: 255 https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3801 256 """ 274 @inlineCallbacks 275 def test_non_matching_overlapping_writes(self): 276 """ 277 When doing overlapping writes in immutable uploads, non-matching writes 278 fail. 279 """ 280 storage_index, renew_secret, cancel_secret = ( 281 new_storage_index(), 282 new_secret(), 283 new_secret(), 284 ) 285 (_, allocated) = yield self.storage_server.allocate_buckets( 286 storage_index, 287 renew_secret, 288 cancel_secret, 289 sharenums={0}, 290 allocated_size=30, 291 canary=Referenceable(), 292 ) 293 294 yield allocated[0].callRemote("write", 0, b"1" * 25) 295 # Overlapping write that doesn't match: 296 with self.assertRaises(RemoteException): 297 yield allocated[0].callRemote("write", 20, b"2" * 10) 298 299 @inlineCallbacks 300 def test_matching_overlapping_writes(self): 301 """ 302 When doing overlapping writes in immutable uploads, matching writes 303 succeed. 304 """ 305 storage_index, renew_secret, cancel_secret = ( 306 new_storage_index(), 307 new_secret(), 308 new_secret(), 309 ) 310 (_, allocated) = yield self.storage_server.allocate_buckets( 311 storage_index, 312 renew_secret, 313 cancel_secret, 314 sharenums={0}, 315 allocated_size=25, 316 canary=Referenceable(), 317 ) 318 319 yield allocated[0].callRemote("write", 0, b"1" * 10) 320 # Overlapping write that matches: 321 yield allocated[0].callRemote("write", 5, b"1" * 20) 322 yield allocated[0].callRemote("close") 323 324 buckets = yield self.storage_server.get_buckets(storage_index) 325 self.assertEqual(set(buckets.keys()), {0}) 326 327 self.assertEqual((yield buckets[0].callRemote("read", 0, 25)), b"1" * 25) 328 329 def test_abort(self): 330 """ 331 If we call ``abort`` on the ``RIBucketWriter`` to disconnect in the 332 middle of writing to a bucket, all data is wiped, and it's even 333 possible to write different data to the bucket. 334 335 (In the real world one probably wouldn't do that, but writing different 336 data is a good way to test that the original data really was wiped.) 337 """ 338 return self.abort_or_disconnect_half_way( 339 lambda bucket: bucket.callRemote("abort") 340 ) 341 342 @inlineCallbacks 343 def test_get_buckets_skips_unfinished_buckets(self): 344 """ 345 Buckets that are not fully written are not returned by 346 ``IStorageServer.get_buckets()`` implementations. 347 """ 348 storage_index = new_storage_index() 349 (_, allocated) = yield self.storage_server.allocate_buckets( 350 storage_index, 351 renew_secret=new_secret(), 352 cancel_secret=new_secret(), 353 sharenums=set(range(5)), 354 allocated_size=10, 355 canary=Referenceable(), 356 ) 357 358 # Bucket 1 is fully written 359 yield allocated[1].callRemote("write", 0, b"1" * 10) 360 yield allocated[1].callRemote("close") 361 362 # Bucket 2 is partially written 363 yield allocated[2].callRemote("write", 0, b"1" * 5) 364 365 buckets = yield self.storage_server.get_buckets(storage_index) 366 self.assertEqual(set(buckets.keys()), {1}) 367 368 @inlineCallbacks 369 def test_read_bucket_at_offset(self): 370 """ 371 Given a read bucket returned from ``IStorageServer.get_buckets()``, it 372 is possible to read at different offsets and lengths, with reads past 373 the end resulting in empty bytes. 374 """ 375 length = 256 * 17 376 377 storage_index = new_storage_index() 378 (_, allocated) = yield self.storage_server.allocate_buckets( 379 storage_index, 380 renew_secret=new_secret(), 381 cancel_secret=new_secret(), 382 sharenums=set(range(1)), 383 allocated_size=length, 384 canary=Referenceable(), 385 ) 386 387 total_data = _randbytes(256 * 17) 388 yield allocated[0].callRemote("write", 0, total_data) 389 yield allocated[0].callRemote("close") 390 391 buckets = yield self.storage_server.get_buckets(storage_index) 392 bucket = buckets[0] 393 for start, to_read in [ 394 (0, 250), # fraction 395 (0, length), # whole thing 396 (100, 1024), # offset fraction 397 (length + 1, 100), # completely out of bounds 398 (length - 100, 200), # partially out of bounds 399 ]: 400 data = yield bucket.callRemote("read", start, to_read) 401 self.assertEqual( 402 data, 403 total_data[start : start + to_read], 404 "Didn't match for start {}, length {}".format(start, to_read), 405 ) 406 407 @inlineCallbacks 408 def test_bucket_advise_corrupt_share(self): 409 """ 410 Calling ``advise_corrupt_share()`` on a bucket returned by 411 ``IStorageServer.get_buckets()`` does not result in error (other 412 behavior is opaque at this level of abstraction). 413 """ 414 storage_index = new_storage_index() 415 (_, allocated) = yield self.storage_server.allocate_buckets( 416 storage_index, 417 renew_secret=new_secret(), 418 cancel_secret=new_secret(), 419 sharenums=set(range(1)), 420 allocated_size=10, 421 canary=Referenceable(), 422 ) 423 424 yield allocated[0].callRemote("write", 0, b"0123456789") 425 yield allocated[0].callRemote("close") 426 427 buckets = yield self.storage_server.get_buckets(storage_index) 428 yield buckets[0].callRemote("advise_corrupt_share", b"OH NO") 429 430 431 class IStorageServerMutableAPIsTestsMixin(object): 432 """ 433 Tests for ``IStorageServer``'s mutable APIs. 434 435 ``self.storage_server`` is expected to provide ``IStorageServer``. 436 437 ``STARAW`` is short for ``slot_testv_and_readv_and_writev``. 438 """ 439 440 def new_secrets(self): 441 """Return a 3-tuple of secrets for STARAW calls.""" 442 return (new_secret(), new_secret(), new_secret()) 443 444 def staraw(self, *args, **kwargs): 445 """Like ``slot_testv_and_readv_and_writev``, but less typing.""" 446 return self.storage_server.slot_testv_and_readv_and_writev(*args, **kwargs) 447 448 @inlineCallbacks 449 def test_STARAW_reads_after_write(self): 450 """ 451 When data is written with 452 ``IStorageServer.slot_testv_and_readv_and_writev``, it can then be read 453 by a separate call using that API. 454 """ 455 secrets = self.new_secrets() 456 storage_index = new_storage_index() 457 (written, _) = yield self.staraw( 458 storage_index, 459 secrets, 460 tw_vectors={ 461 0: ([], [(0, b"abcdefg")], 7), 462 1: ([], [(0, b"0123"), (4, b"456")], 7), 463 }, 464 r_vector=[], 465 ) 466 self.assertEqual(written, True) 467 468 (_, reads) = yield self.staraw( 469 storage_index, 470 secrets, 471 tw_vectors={}, 472 # Whole thing, partial, going beyond the edge, completely outside 473 # range: 474 r_vector=[(0, 7), (2, 3), (6, 8), (100, 10)], 475 ) 476 self.assertEqual( 477 reads, 478 {0: [b"abcdefg", b"cde", b"g", b""], 1: [b"0123456", b"234", b"6", b""]}, 479 ) 480 481 @inlineCallbacks 482 def test_SATRAW_reads_happen_before_writes_in_single_query(self): 483 """ 484 If a ``IStorageServer.slot_testv_and_readv_and_writev`` command 485 contains both reads and writes, the read returns results that precede 486 the write. 487 """ 488 secrets = self.new_secrets() 489 storage_index = new_storage_index() 490 (written, _) = yield self.staraw( 491 storage_index, 492 secrets, 493 tw_vectors={ 494 0: ([], [(0, b"abcdefg")], 7), 495 }, 496 r_vector=[], 497 ) 498 self.assertEqual(written, True) 499 500 # Read and write in same command; read happens before write: 501 (written, reads) = yield self.staraw( 502 storage_index, 503 secrets, 504 tw_vectors={ 505 0: ([], [(0, b"X" * 7)], 7), 506 }, 507 r_vector=[(0, 7)], 508 ) 509 self.assertEqual(written, True) 510 self.assertEqual(reads, {0: [b"abcdefg"]}) 511 512 # The write is available in next read: 513 (_, reads) = yield self.staraw( 514 storage_index, 515 secrets, 516 tw_vectors={}, 517 r_vector=[(0, 7)], 518 ) 519 self.assertEqual(reads, {0: [b"X" * 7]}) 520 521 @inlineCallbacks 522 def test_SATRAW_writes_happens_only_if_test_matches(self): 523 """ 524 If a ``IStorageServer.slot_testv_and_readv_and_writev`` includes both a 525 test and a write, the write succeeds if the test matches, and fails if 526 the test does not match. 527 """ 528 secrets = self.new_secrets() 529 storage_index = new_storage_index() 530 (written, _) = yield self.staraw( 531 storage_index, 532 secrets, 533 tw_vectors={ 534 0: ([], [(0, b"1" * 7)], 7), 535 }, 536 r_vector=[], 537 ) 538 self.assertEqual(written, True) 539 540 # Test matches, so write happens: 541 (written, _) = yield self.staraw( 542 storage_index, 543 secrets, 544 tw_vectors={ 545 0: ( 546 [(0, 3, b"1" * 3), (3, 4, b"1" * 4)], 547 [(0, b"2" * 7)], 548 7, 549 ), 550 }, 551 r_vector=[], 552 ) 553 self.assertEqual(written, True) 554 (_, reads) = yield self.staraw( 555 storage_index, 556 secrets, 557 tw_vectors={}, 558 r_vector=[(0, 7)], 559 ) 560 self.assertEqual(reads, {0: [b"2" * 7]}) 561 562 # Test does not match, so write does not happen: 563 (written, _) = yield self.staraw( 564 storage_index, 565 secrets, 566 tw_vectors={ 567 0: ([(0, 7, b"1" * 7)], [(0, b"3" * 7)], 7), 568 }, 569 r_vector=[], 570 ) 571 self.assertEqual(written, False) 572 (_, reads) = yield self.staraw( 573 storage_index, 574 secrets, 575 tw_vectors={}, 576 r_vector=[(0, 7)], 577 ) 578 self.assertEqual(reads, {0: [b"2" * 7]}) 579 580 @inlineCallbacks 581 def test_SATRAW_tests_past_end_of_data(self): 582 """ 583 If a ``IStorageServer.slot_testv_and_readv_and_writev`` includes a test 584 vector that reads past the end of the data, the result is limited to 585 actual available data. 586 """ 587 secrets = self.new_secrets() 588 storage_index = new_storage_index() 589 590 # Since there is no data on server, the test vector will return empty 591 # string, which matches expected result, so write will succeed. 592 (written, _) = yield self.staraw( 593 storage_index, 594 secrets, 595 tw_vectors={ 596 0: ([(0, 10, b"")], [(0, b"1" * 7)], 7), 597 }, 598 r_vector=[], 599 ) 600 self.assertEqual(written, True) 601 602 # Now the test vector is a 10-read off of a 7-byte value, but expected 603 # value is still 7 bytes, so the write will again succeed. 604 (written, _) = yield self.staraw( 605 storage_index, 606 secrets, 607 tw_vectors={ 608 0: ([(0, 10, b"1" * 7)], [(0, b"2" * 7)], 7), 609 }, 610 r_vector=[], 611 ) 612 self.assertEqual(written, True) 613 614 @inlineCallbacks 615 def test_SATRAW_reads_past_end_of_data(self): 616 """ 617 If a ``IStorageServer.slot_testv_and_readv_and_writev`` reads past the 618 end of the data, the result is limited to actual available data. 619 """ 620 secrets = self.new_secrets() 621 storage_index = new_storage_index() 622 623 # Write some data 624 (written, _) = yield self.staraw( 625 storage_index, 626 secrets, 627 tw_vectors={ 628 0: ([], [(0, b"12345")], 5), 629 }, 630 r_vector=[], 631 ) 632 self.assertEqual(written, True) 633 634 # Reads past end. 635 (_, reads) = yield self.staraw( 636 storage_index, 637 secrets, 638 tw_vectors={}, 639 r_vector=[(0, 100), (2, 50)], 640 ) 641 self.assertEqual(reads, {0: [b"12345", b"345"]}) 642 643 @inlineCallbacks 644 def test_STARAW_write_enabler_must_match(self): 645 """ 646 If the write enabler secret passed to 647 ``IStorageServer.slot_testv_and_readv_and_writev`` doesn't match 648 previous writes, the write fails. 649 """ 650 secrets = self.new_secrets() 651 storage_index = new_storage_index() 652 (written, _) = yield self.staraw( 653 storage_index, 654 secrets, 655 tw_vectors={ 656 0: ([], [(0, b"1" * 7)], 7), 657 }, 658 r_vector=[], 659 ) 660 self.assertEqual(written, True) 661 662 # Write enabler secret does not match, so write does not happen: 663 bad_secrets = (new_secret(),) + secrets[1:] 664 with self.assertRaises(RemoteException): 665 yield self.staraw( 666 storage_index, 667 bad_secrets, 668 tw_vectors={ 669 0: ([], [(0, b"2" * 7)], 7), 670 }, 671 r_vector=[], 672 ) 673 (_, reads) = yield self.staraw( 674 storage_index, 675 secrets, 676 tw_vectors={}, 677 r_vector=[(0, 7)], 678 ) 679 self.assertEqual(reads, {0: [b"1" * 7]}) 680 681 @inlineCallbacks 682 def test_STARAW_zero_new_length_deletes(self): 683 """ 684 A zero new length passed to 685 ``IStorageServer.slot_testv_and_readv_and_writev`` deletes the share. 686 """ 687 secrets = self.new_secrets() 688 storage_index = new_storage_index() 689 (written, _) = yield self.staraw( 690 storage_index, 691 secrets, 692 tw_vectors={ 693 0: ([], [(0, b"1" * 7)], 7), 694 }, 695 r_vector=[], 696 ) 697 self.assertEqual(written, True) 698 699 # Write with new length of 0: 700 (written, _) = yield self.staraw( 701 storage_index, 702 secrets, 703 tw_vectors={ 704 0: ([], [(0, b"1" * 7)], 0), 705 }, 706 r_vector=[], 707 ) 708 self.assertEqual(written, True) 709 710 # It's gone! 711 (_, reads) = yield self.staraw( 712 storage_index, 713 secrets, 714 tw_vectors={}, 715 r_vector=[(0, 7)], 716 ) 717 self.assertEqual(reads, {}) 718 719 @inlineCallbacks 720 def test_slot_readv(self): 721 """ 722 Data written with ``IStorageServer.slot_testv_and_readv_and_writev()`` 723 can be read using ``IStorageServer.slot_readv()``. Reads can't go past 724 the end of the data. 725 """ 726 secrets = self.new_secrets() 727 storage_index = new_storage_index() 728 (written, _) = yield self.staraw( 729 storage_index, 730 secrets, 731 tw_vectors={ 732 0: ([], [(0, b"abcdefg")], 7), 733 1: ([], [(0, b"0123"), (4, b"456")], 7), 734 # This will never get read from, just here to show we only read 735 # from shares explicitly requested by slot_readv: 736 2: ([], [(0, b"XYZW")], 4), 737 }, 738 r_vector=[], 739 ) 740 self.assertEqual(written, True) 741 742 reads = yield self.storage_server.slot_readv( 743 storage_index, 744 shares=[0, 1], 745 # Whole thing, partial, going beyond the edge, completely outside 746 # range: 747 readv=[(0, 7), (2, 3), (6, 8), (100, 10)], 748 ) 749 self.assertEqual( 750 reads, 751 {0: [b"abcdefg", b"cde", b"g", b""], 1: [b"0123456", b"234", b"6", b""]}, 752 ) 753 754 @inlineCallbacks 755 def test_slot_readv_no_shares(self): 756 """ 757 With no shares given, ``IStorageServer.slot_readv()`` reads from all shares. 758 """ 759 secrets = self.new_secrets() 760 storage_index = new_storage_index() 761 (written, _) = yield self.staraw( 762 storage_index, 763 secrets, 764 tw_vectors={ 765 0: ([], [(0, b"abcdefg")], 7), 766 1: ([], [(0, b"0123456")], 7), 767 2: ([], [(0, b"9876543")], 7), 768 }, 769 r_vector=[], 770 ) 771 self.assertEqual(written, True) 772 773 reads = yield self.storage_server.slot_readv( 774 storage_index, 775 shares=[], 776 readv=[(0, 7)], 777 ) 778 self.assertEqual( 779 reads, 780 {0: [b"abcdefg"], 1: [b"0123456"], 2: [b"9876543"]}, 781 ) 257 782 258 783 259 784 class _FoolscapMixin(SystemTestMixin): 260 785 """Run tests on Foolscap version of ``IStorageServer.""" 786 787 def _get_native_server(self): 788 return next(iter(self.clients[0].storage_broker.get_known_servers())) 261 789 262 790 @inlineCallbacks … … 266 794 yield SystemTestMixin.setUp(self) 267 795 yield self.set_up_nodes(1) 268 self.storage_server = next( 269 iter(self.clients[0].storage_broker.get_known_servers()) 270 ).get_storage_server() 796 self.storage_server = self._get_native_server().get_storage_server() 271 797 self.assertTrue(IStorageServer.providedBy(self.storage_server)) 272 798 … … 275 801 AsyncTestCase.tearDown(self) 276 802 yield SystemTestMixin.tearDown(self) 803 804 @inlineCallbacks 805 def disconnect(self): 806 """ 807 Disconnect and then reconnect with a new ``IStorageServer``. 808 """ 809 current = self.storage_server 810 yield self.bounce_client(0) 811 self.storage_server = self._get_native_server().get_storage_server() 812 assert self.storage_server is not current 277 813 278 814 … … 287 823 ): 288 824 """Foolscap-specific tests for immutable ``IStorageServer`` APIs.""" 825 826 827 class FoolscapMutableAPIsTests( 828 _FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase 829 ): 830 """Foolscap-specific tests for immutable ``IStorageServer`` APIs.""" -
TabularUnified src/allmydata/test/test_storage.py ¶
r49b6080 rbf670c0 9 9 from __future__ import unicode_literals 10 10 11 from future.utils import native_str, PY2, bytes_to_native_str 11 from future.utils import native_str, PY2, bytes_to_native_str, bchr 12 12 if PY2: 13 13 from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 … … 20 20 import struct 21 21 import shutil 22 import gc 22 from uuid import uuid4 23 23 24 24 from twisted.trial import unittest … … 26 26 from twisted.internet import defer 27 27 from twisted.internet.task import Clock 28 29 from hypothesis import given, strategies 28 30 29 31 import itertools … … 34 36 from allmydata.storage.mutable import MutableShareFile 35 37 from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile 36 from allmydata.storage.common import DataTooLargeError,storage_index_to_dir, \38 from allmydata.storage.common import storage_index_to_dir, \ 37 39 UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError, \ 38 40 si_b2a, si_a2b … … 48 50 VERIFICATION_KEY_SIZE, \ 49 51 SHARE_HASH_CHAIN_SIZE 50 from allmydata.interfaces import BadWriteEnablerError 52 from allmydata.interfaces import ( 53 BadWriteEnablerError, DataTooLargeError, ConflictingWriteError, 54 ) 51 55 from allmydata.test.no_network import NoNetworkServer 52 56 from allmydata.storage_client import ( … … 125 129 def test_create(self): 126 130 incoming, final = self.make_workdir("test_create") 127 bw = BucketWriter(self, incoming, final, 200, self.make_lease(), 128 FakeCanary()) 131 bw = BucketWriter(self, incoming, final, 200, self.make_lease()) 129 132 bw.remote_write(0, b"a"*25) 130 133 bw.remote_write(25, b"b"*25) … … 135 138 def test_readwrite(self): 136 139 incoming, final = self.make_workdir("test_readwrite") 137 bw = BucketWriter(self, incoming, final, 200, self.make_lease(), 138 FakeCanary()) 140 bw = BucketWriter(self, incoming, final, 200, self.make_lease()) 139 141 bw.remote_write(0, b"a"*25) 140 142 bw.remote_write(25, b"b"*25) … … 147 149 self.failUnlessEqual(br.remote_read(25, 25), b"b"*25) 148 150 self.failUnlessEqual(br.remote_read(50, 7), b"c"*7) 151 152 def test_write_past_size_errors(self): 153 """Writing beyond the size of the bucket throws an exception.""" 154 for (i, (offset, length)) in enumerate([(0, 201), (10, 191), (202, 34)]): 155 incoming, final = self.make_workdir( 156 "test_write_past_size_errors-{}".format(i) 157 ) 158 bw = BucketWriter(self, incoming, final, 200, self.make_lease()) 159 with self.assertRaises(DataTooLargeError): 160 bw.remote_write(offset, b"a" * length) 161 162 @given( 163 maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98), 164 maybe_overlapping_length=strategies.integers(min_value=1, max_value=100), 165 ) 166 def test_overlapping_writes_ok_if_matching( 167 self, maybe_overlapping_offset, maybe_overlapping_length 168 ): 169 """ 170 Writes that overlap with previous writes are OK when the content is the 171 same. 172 """ 173 length = 100 174 expected_data = b"".join(bchr(i) for i in range(100)) 175 incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4())) 176 bw = BucketWriter( 177 self, incoming, final, length, self.make_lease(), 178 ) 179 # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. 180 bw.remote_write(10, expected_data[10:20]) 181 bw.remote_write(30, expected_data[30:40]) 182 bw.remote_write(50, expected_data[50:60]) 183 # Then, an overlapping write but with matching data: 184 bw.remote_write( 185 maybe_overlapping_offset, 186 expected_data[ 187 maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length 188 ] 189 ) 190 # Now fill in the holes: 191 bw.remote_write(0, expected_data[0:10]) 192 bw.remote_write(20, expected_data[20:30]) 193 bw.remote_write(40, expected_data[40:50]) 194 bw.remote_write(60, expected_data[60:]) 195 bw.remote_close() 196 197 br = BucketReader(self, bw.finalhome) 198 self.assertEqual(br.remote_read(0, length), expected_data) 199 200 201 @given( 202 maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98), 203 maybe_overlapping_length=strategies.integers(min_value=1, max_value=100), 204 ) 205 def test_overlapping_writes_not_ok_if_different( 206 self, maybe_overlapping_offset, maybe_overlapping_length 207 ): 208 """ 209 Writes that overlap with previous writes fail with an exception if the 210 contents don't match. 211 """ 212 length = 100 213 incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4())) 214 bw = BucketWriter( 215 self, incoming, final, length, self.make_lease(), 216 ) 217 # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. 218 bw.remote_write(10, b"1" * 10) 219 bw.remote_write(30, b"1" * 10) 220 bw.remote_write(50, b"1" * 10) 221 # Then, write something that might overlap with some of them, but 222 # conflicts. Then fill in holes left by first three writes. Conflict is 223 # inevitable. 224 with self.assertRaises(ConflictingWriteError): 225 bw.remote_write( 226 maybe_overlapping_offset, 227 b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset), 228 ) 229 bw.remote_write(0, b"1" * 10) 230 bw.remote_write(20, b"1" * 10) 231 bw.remote_write(40, b"1" * 10) 232 bw.remote_write(60, b"1" * 40) 149 233 150 234 def test_read_past_end_of_share_data(self): … … 229 313 fileutil.make_dirs(basedir) 230 314 fileutil.make_dirs(os.path.join(basedir, "tmp")) 231 bw = BucketWriter(self, incoming, final, size, self.make_lease(), 232 FakeCanary()) 315 bw = BucketWriter(self, incoming, final, size, self.make_lease()) 233 316 rb = RemoteBucket(bw) 234 317 return bw, rb, final … … 580 663 OVERHEAD = 3*4 581 664 LEASE_SIZE = 4+32+32+4 582 canary = FakeCanary( True)665 canary = FakeCanary() 583 666 already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary) 584 667 self.failUnlessEqual(len(writers), 3) 585 668 # now the StorageServer should have 3000 bytes provisionally 586 669 # allocated, allowing only 2000 more to be claimed 587 self.failUnlessEqual(len(ss._ active_writers), 3)670 self.failUnlessEqual(len(ss._bucket_writers), 3) 588 671 589 672 # allocating 1001-byte shares only leaves room for one 590 already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary) 673 canary2 = FakeCanary() 674 already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2) 591 675 self.failUnlessEqual(len(writers2), 1) 592 self.failUnlessEqual(len(ss._ active_writers), 4)676 self.failUnlessEqual(len(ss._bucket_writers), 4) 593 677 594 678 # we abandon the first set, so their provisional allocation should be 595 679 # returned 596 597 del already 598 del writers 599 gc.collect() 600 601 self.failUnlessEqual(len(ss._active_writers), 1) 680 canary.disconnected() 681 682 self.failUnlessEqual(len(ss._bucket_writers), 1) 602 683 # now we have a provisional allocation of 1001 bytes 603 684 … … 608 689 bw.remote_write(0, b"a"*25) 609 690 bw.remote_close() 610 del already2 611 del writers2 612 del bw 613 self.failUnlessEqual(len(ss._active_writers), 0) 691 self.failUnlessEqual(len(ss._bucket_writers), 0) 614 692 615 693 # this also changes the amount reported as available by call_get_disk_stats … … 618 696 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and 619 697 # 5000-1085=3915 free, therefore we can fit 39 100byte shares 620 already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary) 698 canary3 = FakeCanary() 699 already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3) 621 700 self.failUnlessEqual(len(writers3), 39) 622 self.failUnlessEqual(len(ss._active_writers), 39) 623 624 del already3 625 del writers3 626 gc.collect() 627 628 self.failUnlessEqual(len(ss._active_writers), 0) 701 self.failUnlessEqual(len(ss._bucket_writers), 39) 702 703 canary3.disconnected() 704 705 self.failUnlessEqual(len(ss._bucket_writers), 0) 629 706 ss.disownServiceParent() 630 707 del ss … … 1075 1152 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]}) 1076 1153 1077 # as should this one1078 answer = write(b"si1", secrets,1079 {0: ([(10, 5, b"lt", b"11111"),1080 ],1081 [(0, b"x"*100)],1082 None),1083 },1084 [(10,5)],1085 )1086 self.failUnlessEqual(answer, (False,1087 {0: [b"11111"],1088 1: [b""],1089 2: [b""]},1090 ))1091 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})1092 1093 1094 1154 def test_operators(self): 1095 1155 # test operators, the data we're comparing is '11111' in all cases. … … 1109 1169 []) 1110 1170 1111 reset()1112 1113 # lt1114 answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11110"),1115 ],1116 [(0, b"x"*100)],1117 None,1118 )}, [(10,5)])1119 self.failUnlessEqual(answer, (False, {0: [b"11111"]}))1120 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})1121 self.failUnlessEqual(read(b"si1", [], [(0,100)]), {0: [data]})1122 reset()1123 1124 answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11111"),1125 ],1126 [(0, b"x"*100)],1127 None,1128 )}, [(10,5)])1129 self.failUnlessEqual(answer, (False, {0: [b"11111"]}))1130 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})1131 reset()1132 1133 answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11112"),1134 ],1135 [(0, b"y"*100)],1136 None,1137 )}, [(10,5)])1138 self.failUnlessEqual(answer, (True, {0: [b"11111"]}))1139 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})1140 reset()1141 1142 # le1143 answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11110"),1144 ],1145 [(0, b"x"*100)],1146 None,1147 )}, [(10,5)])1148 self.failUnlessEqual(answer, (False, {0: [b"11111"]}))1149 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})1150 reset()1151 1152 answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11111"),1153 ],1154 [(0, b"y"*100)],1155 None,1156 )}, [(10,5)])1157 self.failUnlessEqual(answer, (True, {0: [b"11111"]}))1158 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})1159 reset()1160 1161 answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11112"),1162 ],1163 [(0, b"y"*100)],1164 None,1165 )}, [(10,5)])1166 self.failUnlessEqual(answer, (True, {0: [b"11111"]}))1167 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})1168 1171 reset() 1169 1172 … … 1185 1188 self.failUnlessEqual(answer, (True, {0: [b"11111"]})) 1186 1189 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]}) 1187 reset()1188 1189 # ne1190 answer = write(b"si1", secrets, {0: ([(10, 5, b"ne", b"11111"),1191 ],1192 [(0, b"x"*100)],1193 None,1194 )}, [(10,5)])1195 self.failUnlessEqual(answer, (False, {0: [b"11111"]}))1196 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})1197 reset()1198 1199 answer = write(b"si1", secrets, {0: ([(10, 5, b"ne", b"11112"),1200 ],1201 [(0, b"y"*100)],1202 None,1203 )}, [(10,5)])1204 self.failUnlessEqual(answer, (True, {0: [b"11111"]}))1205 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})1206 reset()1207 1208 # ge1209 answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11110"),1210 ],1211 [(0, b"y"*100)],1212 None,1213 )}, [(10,5)])1214 self.failUnlessEqual(answer, (True, {0: [b"11111"]}))1215 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})1216 reset()1217 1218 answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11111"),1219 ],1220 [(0, b"y"*100)],1221 None,1222 )}, [(10,5)])1223 self.failUnlessEqual(answer, (True, {0: [b"11111"]}))1224 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})1225 reset()1226 1227 answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11112"),1228 ],1229 [(0, b"y"*100)],1230 None,1231 )}, [(10,5)])1232 self.failUnlessEqual(answer, (False, {0: [b"11111"]}))1233 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})1234 reset()1235 1236 # gt1237 answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11110"),1238 ],1239 [(0, b"y"*100)],1240 None,1241 )}, [(10,5)])1242 self.failUnlessEqual(answer, (True, {0: [b"11111"]}))1243 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})1244 reset()1245 1246 answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11111"),1247 ],1248 [(0, b"x"*100)],1249 None,1250 )}, [(10,5)])1251 self.failUnlessEqual(answer, (False, {0: [b"11111"]}))1252 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})1253 reset()1254 1255 answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11112"),1256 ],1257 [(0, b"x"*100)],1258 None,1259 )}, [(10,5)])1260 self.failUnlessEqual(answer, (False, {0: [b"11111"]}))1261 self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})1262 1190 reset() 1263 1191 -
TabularUnified src/allmydata/web/status.py ¶
r49b6080 rbf670c0 15 15 import itertools 16 16 import hashlib 17 import re 17 18 from twisted.internet import defer 18 19 from twisted.python.filepath import FilePath … … 1552 1553 return json.dumps(stats, indent=1) + "\n" 1553 1554 1555 @render_exception 1556 def render_OPENMETRICS(self, req): 1557 """ 1558 Render our stats in `OpenMetrics <https://openmetrics.io/>` format. 1559 For example Prometheus and Victoriametrics can parse this. 1560 Point the scraper to ``/statistics?t=openmetrics`` (instead of the 1561 default ``/metrics``). 1562 """ 1563 req.setHeader("content-type", "application/openmetrics-text; version=1.0.0; charset=utf-8") 1564 stats = self._provider.get_stats() 1565 ret = [] 1566 1567 def mangle_name(name): 1568 return re.sub( 1569 u"_(\d\d)_(\d)_percentile", 1570 u'{quantile="0.\g<1>\g<2>"}', 1571 name.replace(u".", u"_") 1572 ) 1573 1574 def mangle_value(val): 1575 return str(val) if val is not None else u"NaN" 1576 1577 for (k, v) in sorted(stats['counters'].items()): 1578 ret.append(u"tahoe_counters_%s %s" % (mangle_name(k), mangle_value(v))) 1579 for (k, v) in sorted(stats['stats'].items()): 1580 ret.append(u"tahoe_stats_%s %s" % (mangle_name(k), mangle_value(v))) 1581 1582 ret.append(u"# EOF\n") 1583 1584 return u"\n".join(ret) 1585 1554 1586 class StatisticsElement(Element): 1555 1587
Note: See TracChangeset
for help on using the changeset viewer.