source: trunk/src/allmydata/storage/http_client.py

Last change on this file was 0b13875, checked in by Florian Sesser <florian@…>, at 2024-11-14T12:04:01Z

Remove now-unused imports to fix codechecks

  • Property mode set to 100644
File size: 41.4 KB
Line 
1"""
2HTTP client that talks to the HTTP storage server.
3"""
4
5from __future__ import annotations
6
7
8from typing import (
9    Optional,
10    Sequence,
11    Mapping,
12    BinaryIO,
13    cast,
14    TypedDict,
15    Set,
16    Dict,
17    Callable,
18    ClassVar,
19)
20from base64 import b64encode
21from io import BytesIO
22from os import SEEK_END
23
24from attrs import define, asdict, frozen, field
25from eliot import start_action, register_exception_extractor
26from eliot.twisted import DeferredContext
27
28from pycddl import Schema
29from collections_extended import RangeMap
30from werkzeug.datastructures import Range, ContentRange
31from twisted.web.http_headers import Headers
32from twisted.web import http
33from twisted.web.iweb import IPolicyForHTTPS, IResponse, IAgent
34from twisted.internet.defer import Deferred, succeed
35from twisted.internet.interfaces import (
36    IOpenSSLClientConnectionCreator,
37    IReactorTime,
38    IDelayedCall,
39)
40from twisted.internet.ssl import CertificateOptions
41from twisted.protocols.tls import TLSMemoryBIOProtocol
42from twisted.web.client import Agent, HTTPConnectionPool
43from zope.interface import implementer
44from hyperlink import DecodedURL
45import treq
46from treq.client import HTTPClient
47from OpenSSL import SSL
48from werkzeug.http import parse_content_range_header
49
50from .http_common import (
51    swissnum_auth_header,
52    Secrets,
53    get_content_type,
54    CBOR_MIME_TYPE,
55    get_spki_hash,
56    response_is_not_html,
57)
58from ..interfaces import VersionMessage
59from .common import si_b2a, si_to_human_readable
60from ..util.hashutil import timing_safe_compare
61from ..util.deferredutil import async_to_deferred
62from ..util.tor_provider import _Provider as TorProvider
63from ..util.cputhreadpool import defer_to_thread
64from ..util.cbor import dumps
65
66try:
67    from txtorcon import Tor  # type: ignore
68except ImportError:
69
70    class Tor:  # type: ignore[no-redef]
71        pass
72
73
def _encode_si(si: bytes) -> str:
    """Return the base32 text form of a binary storage index."""
    return si_b2a(si).decode("ascii")
77
78
class ClientException(Exception):
    """An unexpected response code from the server."""

    def __init__(
        self, code: int, message: Optional[str] = None, body: Optional[bytes] = None
    ):
        # Forward everything to Exception so ``args`` and ``str()`` carry
        # the full context, then keep named attributes for direct access.
        super().__init__(code, message, body)
        self.code = code
        self.message = message
        self.body = body
89
90
# Teach Eliot to include the HTTP response code when logging a
# ClientException.
register_exception_extractor(ClientException, lambda e: {"response_code": e.code})


# Schemas for server responses.
#
# Tags are of the form #6.nnn, where the number is documented at
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258
# indicates a set.
#
# Responses are validated against these CDDL schemas (via pycddl) before
# being decoded, in ``StorageClient.decode_cbor()``.
_SCHEMAS: Mapping[str, Schema] = {
    "get_version": Schema(
        # Note that the single-quoted (`'`) string keys in this schema
        # represent *byte* strings - per the CDDL specification.  Text strings
        # are represented using strings with *double* quotes (`"`).
        """
        response = {'http://allmydata.org/tahoe/protocols/storage/v1' => {
                 'maximum-immutable-share-size' => uint
                 'maximum-mutable-share-size' => uint
                 'available-space' => uint
                 }
                 'application-version' => bstr
              }
    """
    ),
    "allocate_buckets": Schema(
        """
    response = {
      already-have: #6.258([0*256 uint])
      allocated: #6.258([0*256 uint])
    }
    """
    ),
    "immutable_write_share_chunk": Schema(
        """
    response = {
      required: [0* {begin: uint, end: uint}]
    }
    """
    ),
    "list_shares": Schema(
        """
    response = #6.258([0*256 uint])
    """
    ),
    "mutable_read_test_write": Schema(
        """
        response = {
          "success": bool,
          "data": {0*256 share_number: [0* bstr]}
        }
        share_number = uint
        """
    ),
    "mutable_list_shares": Schema(
        """
        response = #6.258([0*256 uint])
        """
    ),
}
149
150
@define
class _LengthLimitedCollector:
    """
    A ``treq.collect()`` callable that enforces a maximum response length.

    Raises ``ValueError`` once the accumulated data exceeds the allowed
    length.  Every chunk received also resets the caller's silence timeout,
    so a transfer is only cancelled when no data arrives for a while.
    """

    remaining_length: int
    timeout_on_silence: IDelayedCall
    f: BytesIO = field(factory=BytesIO)

    def __call__(self, chunk: bytes) -> None:
        # Data arrived: push the inactivity timeout another 60 seconds out.
        self.timeout_on_silence.reset(60)
        self.remaining_length -= len(chunk)
        if self.remaining_length < 0:
            raise ValueError("Response length was too long")
        self.f.write(chunk)
167
168
def limited_content(
    response: IResponse,
    clock: IReactorTime,
    max_length: int = 30 * 1024 * 1024,
) -> Deferred[BinaryIO]:
    """
    Like ``treq.content()``, but limit data read from the response to a set
    length.  If the response is longer than the max allowed length, the result
    fails with a ``ValueError``.

    A potentially useful future improvement would be using a temporary file to
    store the content; since filesystem buffering means that would use memory
    for small responses and disk for large responses.

    This will time out if no data is received for 60 seconds; so long as a
    trickle of data continues to arrive, it will continue to run.
    """
    # A Deferred we can cancel from the timeout; cancellation propagates
    # through the chain built below and aborts the collection.
    result_deferred = succeed(None)

    # Sadly, addTimeout() won't work because we need access to the IDelayedCall
    # in order to reset it on each data chunk received.
    timeout = clock.callLater(60, result_deferred.cancel)
    collector = _LengthLimitedCollector(max_length, timeout)

    # Run the rest inside an Eliot action so log messages are correlated.
    with start_action(
        action_type="allmydata:storage:http-client:limited-content",
        max_length=max_length,
    ).context():
        d = DeferredContext(result_deferred)

    # Make really sure everything gets called in Deferred context, treq might
    # call collector directly...
    d.addCallback(lambda _: treq.collect(response, collector))

    def done(_: object) -> BytesIO:
        # Body fully received within the limit: stop the timeout and rewind
        # the buffer so the caller can read from the start.
        timeout.cancel()
        collector.f.seek(0)
        return collector.f

    def failed(f):
        # On failure (including cancellation) only cancel the timeout if it
        # hasn't fired already -- cancelling a fired IDelayedCall raises.
        if timeout.active():
            timeout.cancel()
        return f

    result = d.addCallbacks(done, failed)
    return result.addActionFinish()
215
216
@define
class ImmutableCreateResult(object):
    """Result of creating a storage index for an immutable."""

    # Share numbers the server reports it already has.
    already_have: set[int]
    # Share numbers the server allocated for upload in response to this call.
    allocated: set[int]
223
224
class _TLSContextFactory(CertificateOptions):
    """
    Create a context that validates the way Tahoe-LAFS wants to: based on a
    pinned certificate hash, rather than a certificate authority.

    Originally implemented as part of Foolscap.  To comply with the license,
    here's the original licensing terms:

    Copyright (c) 2006-2008 Brian Warner

    Permission is hereby granted, free of charge, to any person obtaining a
    copy of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    DEALINGS IN THE SOFTWARE.
    """

    def __init__(self, expected_spki_hash: bytes):
        # The pinned hash of the server certificate's SubjectPublicKeyInfo.
        self.expected_spki_hash = expected_spki_hash
        CertificateOptions.__init__(self)

    def getContext(self) -> SSL.Context:
        def always_validate(conn, cert, errno, depth, preverify_ok):
            # This function is called to validate the certificate received by
            # the other end. OpenSSL calls it multiple times, for each errno
            # for each certificate.

            # We do not care about certificate authorities or revocation
            # lists, we just want to know that the certificate has a valid
            # signature and follow the chain back to one which is
            # self-signed. We need to protect against forged signatures, but
            # not the usual TLS concerns about invalid CAs or revoked
            # certificates.
            things_are_ok = (
                SSL.X509VerificationCodes.OK,
                SSL.X509VerificationCodes.ERR_CERT_NOT_YET_VALID,
                SSL.X509VerificationCodes.ERR_CERT_HAS_EXPIRED,
                SSL.X509VerificationCodes.ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
                SSL.X509VerificationCodes.ERR_SELF_SIGNED_CERT_IN_CHAIN,
            )
            # TODO can we do this once instead of multiple times?
            # The SPKI hash comparison uses a timing-safe compare so an
            # attacker cannot learn the expected hash byte-by-byte.
            if errno in things_are_ok and timing_safe_compare(
                get_spki_hash(cert.to_cryptography()), self.expected_spki_hash
            ):
                return 1
            # TODO: log the details of the error, because otherwise they get
            # lost in the PyOpenSSL exception that will eventually be raised
            # (possibly OpenSSL.SSL.Error: certificate verify failed)
            return 0

        ctx = CertificateOptions.getContext(self)

        # VERIFY_PEER means we ask the other end for their certificate.
        ctx.set_verify(SSL.VERIFY_PEER, always_validate)
        return ctx
292
293
@implementer(IPolicyForHTTPS)
@implementer(IOpenSSLClientConnectionCreator)
@define
class _StorageClientHTTPSPolicy:
    """
    A HTTPS policy that ensures the SPKI hash of the public key matches a known
    hash, i.e. pinning-based validation.
    """

    # The pinned SubjectPublicKeyInfo hash we expect the server to present.
    expected_spki_hash: bytes

    # IPolicyForHTTPS
    def creatorForNetloc(self, hostname: str, port: int) -> _StorageClientHTTPSPolicy:
        # The same pinning policy applies to every host/port, so just return
        # ourselves as the connection creator.
        return self

    # IOpenSSLClientConnectionCreator
    def clientConnectionForTLS(
        self, tlsProtocol: TLSMemoryBIOProtocol
    ) -> SSL.Connection:
        return SSL.Connection(
            _TLSContextFactory(self.expected_spki_hash).getContext(), None
        )
316
317
@define
class StorageClientFactory:
    """
    Create ``StorageClient`` instances, using appropriate
    ``twisted.web.iweb.IAgent`` for different connection methods: normal TCP,
    Tor, and eventually I2P.

    There is some caching involved since there might be shared setup work, e.g.
    connecting to the local Tor service only needs to happen once.
    """

    # Connection handler configuration; only the "tcp" entry is consulted
    # here, its value selecting plain TCP ("tcp") vs Tor ("tor").
    _default_connection_handlers: dict[str, str]
    # Provider used to start/talk to a local Tor, if Tor is configured.
    _tor_provider: Optional[TorProvider]
    # Cache the Tor instance created by the provider, if relevant.
    _tor_instance: Optional[Tor] = None

    # If set, we're doing unit testing and we should call this with any
    # HTTPConnectionPool that gets passed/created to ``create_agent()``.
    TEST_MODE_REGISTER_HTTP_POOL: ClassVar[
        Optional[Callable[[HTTPConnectionPool], None]]
    ] = None

    @classmethod
    def start_test_mode(cls, callback: Callable[[HTTPConnectionPool], None]) -> None:
        """Switch to testing mode.

        In testing mode we register the pool with test system using the given
        callback so it can Do Things, most notably killing off idle HTTP
        connections at test shutdown and, in some tests, in the middle of the
        test.
        """
        cls.TEST_MODE_REGISTER_HTTP_POOL = callback

    @classmethod
    def stop_test_mode(cls) -> None:
        """Stop testing mode."""
        cls.TEST_MODE_REGISTER_HTTP_POOL = None

    async def _create_agent(
        self,
        nurl: DecodedURL,
        reactor: object,
        tls_context_factory: IPolicyForHTTPS,
        pool: HTTPConnectionPool,
    ) -> IAgent:
        """Create a new ``IAgent``, possibly using Tor."""
        if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
            self.TEST_MODE_REGISTER_HTTP_POOL(pool)

        # TODO default_connection_handlers should really be an object, not a
        # dict, so we can ask "is this using Tor" without poking at a
        # dictionary with arbitrary strings... See
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4032
        handler = self._default_connection_handlers["tcp"]

        if handler == "tcp":
            return Agent(reactor, tls_context_factory, pool=pool)
        if handler == "tor" or nurl.scheme == "pb+tor":
            assert self._tor_provider is not None
            # Create the Tor instance lazily and cache it for subsequent
            # agents.
            if self._tor_instance is None:
                self._tor_instance = await self._tor_provider.get_tor_instance(reactor)
            return self._tor_instance.web_agent(
                pool=pool, tls_context_factory=tls_context_factory
            )
        else:
            # I2P support will be added here. See
            # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4037
            raise RuntimeError(f"Unsupported tcp connection handler: {handler}")

    async def create_storage_client(
        self,
        nurl: DecodedURL,
        reactor: IReactorTime,
        pool: Optional[HTTPConnectionPool] = None,
    ) -> StorageClient:
        """Create a new ``StorageClient`` for the given NURL."""
        assert nurl.fragment == "v=1"
        assert nurl.scheme in ("pb", "pb+tor")
        if pool is None:
            pool = HTTPConnectionPool(reactor)
            pool.maxPersistentPerHost = 10

        # The NURL's user part carries the expected hash of the server
        # certificate's public key, used for pinning-based TLS validation.
        certificate_hash = nurl.user.encode("ascii")
        agent = await self._create_agent(
            nurl,
            reactor,
            _StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
            pool,
        )
        treq_client = HTTPClient(agent)
        https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
        # The first path segment of the NURL is the swissnum, the shared
        # secret authorizing use of this server.
        swissnum = nurl.path[0].encode("ascii")
        response_check = lambda _: None
        if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
            # In test mode, reject responses that look like HTML error pages.
            response_check = response_is_not_html

        return StorageClient(
            https_url,
            swissnum,
            treq_client,
            pool,
            reactor,
            response_check,
        )
422
423
@define(hash=True)
class StorageClient(object):
    """
    Low-level HTTP client that talks to the HTTP storage server.

    Create using a ``StorageClientFactory`` instance.
    """

    # The URL should be a HTTPS URL ("https://...")
    _base_url: DecodedURL
    # Shared secret authorizing use of this server; sent as the
    # Authorization header on every request.
    _swissnum: bytes
    _treq: HTTPClient
    _pool: HTTPConnectionPool
    _clock: IReactorTime
    # Called with every response; unit tests install a check here (the
    # factory passes ``response_is_not_html`` in test mode), otherwise a
    # no-op.
    _analyze_response: Callable[[IResponse], None] = lambda _: None

    def relative_url(self, path: str) -> DecodedURL:
        """Get a URL relative to the base URL."""
        return self._base_url.click(path)

    def _get_headers(self, headers: Optional[Headers]) -> Headers:
        """Return the basic headers to be used by default."""
        if headers is None:
            headers = Headers()
        # Every request authenticates with the swissnum.
        headers.addRawHeader(
            "Authorization",
            swissnum_auth_header(self._swissnum),
        )
        return headers

    @async_to_deferred
    async def request(
        self,
        method: str,
        url: DecodedURL,
        lease_renew_secret: Optional[bytes] = None,
        lease_cancel_secret: Optional[bytes] = None,
        upload_secret: Optional[bytes] = None,
        write_enabler_secret: Optional[bytes] = None,
        headers: Optional[Headers] = None,
        message_to_serialize: object = None,
        timeout: float = 60,
        **kwargs,
    ) -> IResponse:
        """
        Like ``treq.request()``, but with optional secrets that get translated
        into corresponding HTTP headers.

        If ``message_to_serialize`` is set, it will be serialized (by default
        with CBOR) and set as the request body.  It should not be mutated
        during execution of this function!

        Default timeout is 60 seconds.
        """
        # Wrap the real work in an Eliot action so the method, URL, and
        # eventual response code all show up in the logs.
        with start_action(
            action_type="allmydata:storage:http-client:request",
            method=method,
            url=url.to_text(),
            timeout=timeout,
        ) as ctx:
            response = await self._request(
                method,
                url,
                lease_renew_secret,
                lease_cancel_secret,
                upload_secret,
                write_enabler_secret,
                headers,
                message_to_serialize,
                timeout,
                **kwargs,
            )
            ctx.add_success_fields(response_code=response.code)
            return response

    async def _request(
        self,
        method: str,
        url: DecodedURL,
        lease_renew_secret: Optional[bytes] = None,
        lease_cancel_secret: Optional[bytes] = None,
        upload_secret: Optional[bytes] = None,
        write_enabler_secret: Optional[bytes] = None,
        headers: Optional[Headers] = None,
        message_to_serialize: object = None,
        timeout: float = 60,
        **kwargs,
    ) -> IResponse:
        """The implementation of request()."""
        headers = self._get_headers(headers)

        # Add secrets: each one becomes an X-Tahoe-Authorization header of
        # the form "<secret-name> <base64-value>".
        for secret, value in [
            (Secrets.LEASE_RENEW, lease_renew_secret),
            (Secrets.LEASE_CANCEL, lease_cancel_secret),
            (Secrets.UPLOAD, upload_secret),
            (Secrets.WRITE_ENABLER, write_enabler_secret),
        ]:
            if value is None:
                continue
            headers.addRawHeader(
                "X-Tahoe-Authorization",
                b"%s %s" % (secret.value.encode("ascii"), b64encode(value).strip()),
            )

        # Note we can accept CBOR:
        headers.addRawHeader("Accept", CBOR_MIME_TYPE)

        # If there's a request message, serialize it and set the Content-Type
        # header:
        if message_to_serialize is not None:
            if "data" in kwargs:
                raise TypeError(
                    "Can't use both `message_to_serialize` and `data` "
                    "as keyword arguments at the same time"
                )
            # CBOR serialization can be slow for large messages, so do it in
            # a thread rather than blocking the reactor.
            kwargs["data"] = await defer_to_thread(dumps, message_to_serialize)
            headers.addRawHeader("Content-Type", CBOR_MIME_TYPE)

        response = await self._treq.request(
            method, url, headers=headers, timeout=timeout, **kwargs
        )
        # Hook for extra response checking (used by unit tests).
        self._analyze_response(response)

        return response

    async def decode_cbor(self, response: IResponse, schema: Schema) -> object:
        """Given HTTP response, return decoded CBOR body."""
        with start_action(action_type="allmydata:storage:http-client:decode-cbor"):
            # Only 2xx responses carry a decodable body.
            if response.code > 199 and response.code < 300:
                content_type = get_content_type(response.headers)
                if content_type == CBOR_MIME_TYPE:
                    f = await limited_content(response, self._clock)
                    data = f.read()

                    def validate_and_decode():
                        return schema.validate_cbor(data, True)

                    # Validation/decoding can be CPU-heavy; run off-reactor.
                    return await defer_to_thread(validate_and_decode)
                else:
                    raise ClientException(
                        -1,
                        "Server didn't send CBOR, content type is {}".format(
                            content_type
                        ),
                    )
            else:
                # Error response: read a little of the body (capped at 10KB)
                # to include in the exception for debugging.
                data = (
                    await limited_content(response, self._clock, max_length=10_000)
                ).read()
                raise ClientException(response.code, response.phrase, data)

    def shutdown(self) -> Deferred[object]:
        """Shutdown any connections."""
        return self._pool.closeCachedConnections()
580
581
@define(hash=True)
class StorageClientGeneral(object):
    """
    High-level HTTP APIs that aren't immutable- or mutable-specific.
    """

    _client: StorageClient

    @async_to_deferred
    async def get_version(self) -> VersionMessage:
        """
        Return the version metadata for the server.
        """
        with start_action(
            action_type="allmydata:storage:http-client:get-version",
        ):
            return await self._get_version()

    async def _get_version(self) -> VersionMessage:
        """Implementation of get_version()."""
        url = self._client.relative_url("/storage/v1/version")
        response = await self._client.request("GET", url)
        decoded_response = cast(
            Dict[bytes, object],
            await self._client.decode_cbor(response, _SCHEMAS["get_version"]),
        )
        # Add some features we know are true because the HTTP API
        # specification requires them and because other parts of the storage
        # client implementation assumes they will be present.
        cast(
            Dict[bytes, object],
            decoded_response[b"http://allmydata.org/tahoe/protocols/storage/v1"],
        ).update(
            {
                b"tolerates-immutable-read-overrun": True,
                b"delete-mutable-shares-with-zero-length-writev": True,
                b"fills-holes-with-zero-bytes": True,
                b"prevents-read-past-end-of-share-data": True,
            }
        )
        return decoded_response

    @async_to_deferred
    async def add_or_renew_lease(
        self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
    ) -> None:
        """
        Add or renew a lease.

        If the renewal secret matches an existing lease, it is renewed.
        Otherwise a new lease is added.
        """
        with start_action(
            action_type="allmydata:storage:http-client:add-or-renew-lease",
            storage_index=si_to_human_readable(storage_index),
        ):
            return await self._add_or_renew_lease(
                storage_index, renew_secret, cancel_secret
            )

    async def _add_or_renew_lease(
        self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
    ) -> None:
        """Implementation of ``add_or_renew_lease()``."""
        url = self._client.relative_url(
            "/storage/v1/lease/{}".format(_encode_si(storage_index))
        )
        response = await self._client.request(
            "PUT",
            url,
            lease_renew_secret=renew_secret,
            lease_cancel_secret=cancel_secret,
        )

        # The server responds 204 No Content on success.
        if response.code == http.NO_CONTENT:
            return
        else:
            raise ClientException(response.code)
659
660
@define
class UploadProgress(object):
    """
    Progress of immutable upload, per the server.
    """

    # True when upload has finished.
    finished: bool
    # Remaining ranges to upload.
    required: RangeMap
671
672
@async_to_deferred
async def read_share_chunk(
    client: StorageClient,
    share_type: str,
    storage_index: bytes,
    share_number: int,
    offset: int,
    length: int,
) -> bytes:
    """
    Download a chunk of data from a share.

    TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed downloads
    should be transparently retried and redownloaded by the implementation a
    few times so that if a failure percolates up, the caller can assume the
    failure isn't a short-term blip.

    NOTE: the underlying HTTP protocol is somewhat more flexible than this API,
    insofar as it doesn't always require a range.  In practice a range is
    always provided by the current callers.

    :raises ValueError: if the response has the wrong content type, a missing
        or unusable Content-Range header, or a body whose length doesn't
        match the Content-Range header.
    :raises ClientException: on unexpected response codes.
    """
    url = client.relative_url(
        "/storage/v1/{}/{}/{}".format(
            share_type, _encode_si(storage_index), share_number
        )
    )
    # The default 60 second timeout is for getting the response, so it doesn't
    # include the time it takes to download the body... so we will deal with
    # that later, via limited_content().
    response = await client.request(
        "GET",
        url,
        headers=Headers(
            # Ranges in HTTP are _inclusive_, Python's convention is exclusive,
            # but the Range constructor does the conversion for us.
            {"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
        ),
        unbuffered=True,  # Don't buffer the response in memory.
    )

    if response.code == http.NO_CONTENT:
        return b""

    content_type = get_content_type(response.headers)
    if content_type != "application/octet-stream":
        raise ValueError(
            f"Content-type was wrong: {content_type}, should be application/octet-stream"
        )

    if response.code == http.PARTIAL_CONTENT:
        # getRawHeaders() returns None when the header is entirely absent;
        # indexing that directly would raise TypeError rather than the
        # intended ValueError below, so fall back to the empty string (which
        # parse_content_range_header() maps to None).
        raw_ranges = response.headers.getRawHeaders("content-range")
        content_range = parse_content_range_header(
            raw_ranges[0] if raw_ranges else ""
        )
        if (
            content_range is None
            or content_range.stop is None
            or content_range.start is None
        ):
            raise ValueError(
                "Content-Range was missing, invalid, or in format we don't support"
            )
        supposed_length = content_range.stop - content_range.start
        if supposed_length > length:
            raise ValueError("Server sent more than we asked for?!")
        # It might also send less than we asked for. That's (probably) OK, e.g.
        # if we went past the end of the file.
        body = await limited_content(response, client._clock, supposed_length)
        body.seek(0, SEEK_END)
        actual_length = body.tell()
        if actual_length != supposed_length:
            # Most likely a mutable that got changed out from under us, but
            # conceivably could be a bug...
            raise ValueError(
                f"Length of response sent from server ({actual_length}) "
                + f"didn't match Content-Range header ({supposed_length})"
            )
        body.seek(0)
        return body.read()
    else:
        # Technically HTTP allows sending an OK with full body under these
        # circumstances, but the server is not designed to do that so we ignore
        # that possibility for now...
        raise ClientException(response.code)
756
757
@async_to_deferred
async def advise_corrupt_share(
    client: StorageClient,
    share_type: str,
    storage_index: bytes,
    share_number: int,
    reason: str,
) -> None:
    """
    Tell the server that a share is corrupt, with a human-readable reason.

    Raises ``ClientException`` if the server doesn't acknowledge with 200 OK.
    """
    assert isinstance(reason, str)
    url = client.relative_url(
        "/storage/v1/{}/{}/{}/corrupt".format(
            share_type, _encode_si(storage_index), share_number
        )
    )
    response = await client.request(
        "POST", url, message_to_serialize={"reason": reason}
    )
    if response.code != http.OK:
        raise ClientException(response.code)
780
781
782@define(hash=True)
783class StorageClientImmutables(object):
784    """
785    APIs for interacting with immutables.
786    """
787
788    _client: StorageClient
789
    @async_to_deferred
    async def create(
        self,
        storage_index: bytes,
        share_numbers: set[int],
        allocated_size: int,
        upload_secret: bytes,
        lease_renew_secret: bytes,
        lease_cancel_secret: bytes,
    ) -> ImmutableCreateResult:
        """
        Create a new storage index for an immutable.

        TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 retry
        internally on failure, to ensure the operation fully succeeded.  If
        sufficient number of failures occurred, the result may fire with an
        error, but there's no expectation that user code needs to have a
        recovery codepath; it will most likely just report an error to the
        user.

        Result fires when creating the storage index succeeded, if creating the
        storage index failed the result will fire with an exception.
        """
        # Log the request and, on success, which shares already existed and
        # which were newly allocated.
        with start_action(
            action_type="allmydata:storage:http-client:immutable:create",
            storage_index=si_to_human_readable(storage_index),
            share_numbers=share_numbers,
            allocated_size=allocated_size,
        ) as ctx:
            result = await self._create(
                storage_index,
                share_numbers,
                allocated_size,
                upload_secret,
                lease_renew_secret,
                lease_cancel_secret,
            )
            ctx.add_success_fields(
                already_have=result.already_have, allocated=result.allocated
            )
            return result
831
    async def _create(
        self,
        storage_index: bytes,
        share_numbers: set[int],
        allocated_size: int,
        upload_secret: bytes,
        lease_renew_secret: bytes,
        lease_cancel_secret: bytes,
    ) -> ImmutableCreateResult:
        """Implementation of create()."""
        url = self._client.relative_url(
            "/storage/v1/immutable/" + _encode_si(storage_index)
        )
        message = {"share-numbers": share_numbers, "allocated-size": allocated_size}

        response = await self._client.request(
            "POST",
            url,
            lease_renew_secret=lease_renew_secret,
            lease_cancel_secret=lease_cancel_secret,
            upload_secret=upload_secret,
            message_to_serialize=message,
        )
        # Response body is validated against the "allocate_buckets" schema:
        # sets of already-present and newly-allocated share numbers.
        decoded_response = cast(
            Mapping[str, Set[int]],
            await self._client.decode_cbor(response, _SCHEMAS["allocate_buckets"]),
        )
        return ImmutableCreateResult(
            already_have=decoded_response["already-have"],
            allocated=decoded_response["allocated"],
        )
863
864    @async_to_deferred
865    async def abort_upload(
866        self, storage_index: bytes, share_number: int, upload_secret: bytes
867    ) -> None:
868        """Abort the upload."""
869        with start_action(
870            action_type="allmydata:storage:http-client:immutable:abort-upload",
871            storage_index=si_to_human_readable(storage_index),
872            share_number=share_number,
873        ):
874            return await self._abort_upload(storage_index, share_number, upload_secret)
875
876    async def _abort_upload(
877        self, storage_index: bytes, share_number: int, upload_secret: bytes
878    ) -> None:
879        """Implementation of ``abort_upload()``."""
880        url = self._client.relative_url(
881            "/storage/v1/immutable/{}/{}/abort".format(
882                _encode_si(storage_index), share_number
883            )
884        )
885        response = await self._client.request(
886            "PUT",
887            url,
888            upload_secret=upload_secret,
889        )
890
891        if response.code == http.OK:
892            return
893        else:
894            raise ClientException(
895                response.code,
896            )
897
898    @async_to_deferred
899    async def write_share_chunk(
900        self,
901        storage_index: bytes,
902        share_number: int,
903        upload_secret: bytes,
904        offset: int,
905        data: bytes,
906    ) -> UploadProgress:
907        """
908        Upload a chunk of data for a specific share.
909
910        TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 The
911        implementation should retry failed uploads transparently a number of
912        times, so that if a failure percolates up, the caller can assume the
913        failure isn't a short-term blip.
914
915        Result fires when the upload succeeded, with a boolean indicating
916        whether the _complete_ share (i.e. all chunks, not just this one) has
917        been uploaded.
918        """
919        with start_action(
920            action_type="allmydata:storage:http-client:immutable:write-share-chunk",
921            storage_index=si_to_human_readable(storage_index),
922            share_number=share_number,
923            offset=offset,
924            data_len=len(data),
925        ) as ctx:
926            result = await self._write_share_chunk(
927                storage_index, share_number, upload_secret, offset, data
928            )
929            ctx.add_success_fields(finished=result.finished)
930            return result
931
932    async def _write_share_chunk(
933        self,
934        storage_index: bytes,
935        share_number: int,
936        upload_secret: bytes,
937        offset: int,
938        data: bytes,
939    ) -> UploadProgress:
940        """Implementation of ``write_share_chunk()``."""
941        url = self._client.relative_url(
942            "/storage/v1/immutable/{}/{}".format(
943                _encode_si(storage_index), share_number
944            )
945        )
946        response = await self._client.request(
947            "PATCH",
948            url,
949            upload_secret=upload_secret,
950            data=data,
951            headers=Headers(
952                {
953                    "content-range": [
954                        ContentRange("bytes", offset, offset + len(data)).to_header()
955                    ]
956                }
957            ),
958        )
959
960        if response.code == http.OK:
961            # Upload is still unfinished.
962            finished = False
963        elif response.code == http.CREATED:
964            # Upload is done!
965            finished = True
966        else:
967            raise ClientException(
968                response.code,
969            )
970        body = cast(
971            Mapping[str, Sequence[Mapping[str, int]]],
972            await self._client.decode_cbor(
973                response, _SCHEMAS["immutable_write_share_chunk"]
974            ),
975        )
976        remaining = RangeMap()
977        for chunk in body["required"]:
978            remaining.set(True, chunk["begin"], chunk["end"])
979        return UploadProgress(finished=finished, required=remaining)
980
981    @async_to_deferred
982    async def read_share_chunk(
983        self, storage_index: bytes, share_number: int, offset: int, length: int
984    ) -> bytes:
985        """
986        Download a chunk of data from a share.
987        """
988        with start_action(
989            action_type="allmydata:storage:http-client:immutable:read-share-chunk",
990            storage_index=si_to_human_readable(storage_index),
991            share_number=share_number,
992            offset=offset,
993            length=length,
994        ) as ctx:
995            result = await read_share_chunk(
996                self._client, "immutable", storage_index, share_number, offset, length
997            )
998            ctx.add_success_fields(data_len=len(result))
999            return result
1000
1001    @async_to_deferred
1002    async def list_shares(self, storage_index: bytes) -> Set[int]:
1003        """
1004        Return the set of shares for a given storage index.
1005        """
1006        with start_action(
1007            action_type="allmydata:storage:http-client:immutable:list-shares",
1008            storage_index=si_to_human_readable(storage_index),
1009        ) as ctx:
1010            result = await self._list_shares(storage_index)
1011            ctx.add_success_fields(shares=result)
1012            return result
1013
1014    async def _list_shares(self, storage_index: bytes) -> Set[int]:
1015        """Implementation of ``list_shares()``."""
1016        url = self._client.relative_url(
1017            "/storage/v1/immutable/{}/shares".format(_encode_si(storage_index))
1018        )
1019        response = await self._client.request(
1020            "GET",
1021            url,
1022        )
1023        if response.code == http.OK:
1024            return cast(
1025                Set[int],
1026                await self._client.decode_cbor(response, _SCHEMAS["list_shares"]),
1027            )
1028        else:
1029            raise ClientException(response.code)
1030
1031    @async_to_deferred
1032    async def advise_corrupt_share(
1033        self,
1034        storage_index: bytes,
1035        share_number: int,
1036        reason: str,
1037    ) -> None:
1038        """Indicate a share has been corrupted, with a human-readable message."""
1039        with start_action(
1040            action_type="allmydata:storage:http-client:immutable:advise-corrupt-share",
1041            storage_index=si_to_human_readable(storage_index),
1042            share_number=share_number,
1043            reason=reason,
1044        ):
1045            await advise_corrupt_share(
1046                self._client, "immutable", storage_index, share_number, reason
1047            )
1048
1049
@frozen
class WriteVector:
    """Data to write to a chunk."""

    # Byte offset within the share where ``data`` should be written.
    offset: int
    # The bytes to write at ``offset``.
    data: bytes
1056
1057
@frozen
class TestVector:
    """Checks to make on a chunk before writing to it."""

    # Byte offset within the share where the comparison starts.
    offset: int
    # Number of bytes to compare.
    size: int
    # Expected bytes; writes only happen if the tests are valid (see
    # ``StorageClientMutables.read_test_write_chunks``).
    specimen: bytes
1065
1066
@frozen
class ReadVector:
    """
    Reads to do on chunks, as part of a read/test/write operation.
    """

    # Byte offset within the share where the read starts.
    offset: int
    # Number of bytes to read.
    size: int
1075
1076
@frozen
class TestWriteVectors:
    """Test and write vectors for a specific share."""

    test_vectors: Sequence[TestVector] = field(factory=list)
    write_vectors: Sequence[WriteVector] = field(factory=list)
    new_length: Optional[int] = None

    def asdict(self) -> dict:
        """Return dictionary suitable for sending over CBOR."""
        source = asdict(self)
        # The wire protocol uses different key names than our attributes.
        return {
            "test": source["test_vectors"],
            "write": source["write_vectors"],
            "new-length": source["new_length"],
        }
1092
1093
@frozen
class ReadTestWriteResult:
    """Result of sending read-test-write vectors."""

    # Whether the tests passed (and therefore any writes were applied).
    success: bool
    # Map share numbers to reads corresponding to the request's list of
    # ReadVectors:
    reads: Mapping[int, Sequence[bytes]]
1102
1103
# Result type for mutable read/test/write HTTP response. Can't just use
# dict[int,list[bytes]] because on Python 3.8 that will error out.
class MUTABLE_RTW(TypedDict):
    success: bool
    data: Mapping[int, Sequence[bytes]]
1109
1110
@frozen
class StorageClientMutables:
    """
    APIs for interacting with mutables.
    """

    # Low-level HTTP client shared with the rest of the storage client; it
    # builds URLs, attaches secret-bearing headers, and decodes CBOR bodies.
    _client: StorageClient

    @async_to_deferred
    async def read_test_write_chunks(
        self,
        storage_index: bytes,
        write_enabler_secret: bytes,
        lease_renew_secret: bytes,
        lease_cancel_secret: bytes,
        testwrite_vectors: dict[int, TestWriteVectors],
        read_vector: list[ReadVector],
    ) -> ReadTestWriteResult:
        """
        Read, test, and possibly write chunks to a particular mutable storage
        index.

        Reads are done before writes.

        Given a mapping between share numbers and test/write vectors, the tests
        are done and if they are valid the writes are done.
        """
        # The Eliot action ties any logging from the request to this
        # particular storage index.
        with start_action(
            action_type="allmydata:storage:http-client:mutable:read-test-write",
            storage_index=si_to_human_readable(storage_index),
        ):
            return await self._read_test_write_chunks(
                storage_index,
                write_enabler_secret,
                lease_renew_secret,
                lease_cancel_secret,
                testwrite_vectors,
                read_vector,
            )

    async def _read_test_write_chunks(
        self,
        storage_index: bytes,
        write_enabler_secret: bytes,
        lease_renew_secret: bytes,
        lease_cancel_secret: bytes,
        testwrite_vectors: dict[int, TestWriteVectors],
        read_vector: list[ReadVector],
    ) -> ReadTestWriteResult:
        """Implementation of ``read_test_write_chunks()``."""
        url = self._client.relative_url(
            "/storage/v1/mutable/{}/read-test-write".format(_encode_si(storage_index))
        )
        # Serialize the vectors with the wire protocol's key names
        # (``TestWriteVectors.asdict()`` renames test_vectors/write_vectors).
        message = {
            "test-write-vectors": {
                share_number: twv.asdict()
                for (share_number, twv) in testwrite_vectors.items()
            },
            "read-vector": [asdict(r) for r in read_vector],
        }
        response = await self._client.request(
            "POST",
            url,
            write_enabler_secret=write_enabler_secret,
            lease_renew_secret=lease_renew_secret,
            lease_cancel_secret=lease_cancel_secret,
            message_to_serialize=message,
        )
        if response.code == http.OK:
            result = cast(
                MUTABLE_RTW,
                await self._client.decode_cbor(
                    response, _SCHEMAS["mutable_read_test_write"]
                ),
            )
            return ReadTestWriteResult(success=result["success"], reads=result["data"])
        else:
            # Include the response body to aid debugging server-side failures.
            raise ClientException(response.code, (await response.content()))

    @async_to_deferred
    async def read_share_chunk(
        self,
        storage_index: bytes,
        share_number: int,
        offset: int,
        length: int,
    ) -> bytes:
        """
        Download a chunk of data from a share.
        """
        with start_action(
            action_type="allmydata:storage:http-client:mutable:read-share-chunk",
            storage_index=si_to_human_readable(storage_index),
            share_number=share_number,
            offset=offset,
            length=length,
        ) as ctx:
            # Delegates to the module-level helper shared with the immutable
            # client; only the "mutable" URL prefix differs.
            result = await read_share_chunk(
                self._client, "mutable", storage_index, share_number, offset, length
            )
            ctx.add_success_fields(data_len=len(result))
            return result

    @async_to_deferred
    async def list_shares(self, storage_index: bytes) -> Set[int]:
        """
        List the share numbers for a given storage index.
        """
        with start_action(
            action_type="allmydata:storage:http-client:mutable:list-shares",
            storage_index=si_to_human_readable(storage_index),
        ) as ctx:
            result = await self._list_shares(storage_index)
            # Record the discovered share numbers in the log.
            ctx.add_success_fields(shares=result)
            return result

    async def _list_shares(self, storage_index: bytes) -> Set[int]:
        """Implementation of ``list_shares()``."""
        url = self._client.relative_url(
            "/storage/v1/mutable/{}/shares".format(_encode_si(storage_index))
        )
        response = await self._client.request("GET", url)
        if response.code == http.OK:
            return cast(
                Set[int],
                await self._client.decode_cbor(
                    response,
                    _SCHEMAS["mutable_list_shares"],
                ),
            )
        else:
            raise ClientException(response.code)

    @async_to_deferred
    async def advise_corrupt_share(
        self,
        storage_index: bytes,
        share_number: int,
        reason: str,
    ) -> None:
        """Indicate a share has been corrupted, with a human-readable message."""
        with start_action(
            action_type="allmydata:storage:http-client:mutable:advise-corrupt-share",
            storage_index=si_to_human_readable(storage_index),
            share_number=share_number,
            reason=reason,
        ):
            # Delegates to the module-level helper shared with the immutable
            # client; only the "mutable" URL prefix differs.
            await advise_corrupt_share(
                self._client, "mutable", storage_index, share_number, reason
            )
Note: See TracBrowser for help on using the repository browser.