source: trunk/src/allmydata/storage/http_server.py

Last change on this file was fced1ab0, checked in by Itamar Turner-Trauring <itamar@…>, at 2024-01-24T18:50:55Z

Switch to using pycddl for CBOR decoding.

  • Property mode set to 100644
File size: 38.6 KB
Line 
1"""
2HTTP server for storage.
3"""
4
5from __future__ import annotations
6
7from typing import (
8    Any,
9    Callable,
10    Union,
11    cast,
12    Optional,
13    TypeVar,
14    Sequence,
15    Protocol,
16    Dict,
17)
18from typing_extensions import ParamSpec, Concatenate
19from functools import wraps
20from base64 import b64decode
21import binascii
22from tempfile import TemporaryFile
23from os import SEEK_END, SEEK_SET
24import mmap
25
26from eliot import start_action
27from cryptography.x509 import Certificate as CryptoCertificate
28from zope.interface import implementer
29from klein import Klein, KleinRenderable
30from klein.resource import KleinResource
31from twisted.web import http
32from twisted.internet.interfaces import (
33    IListeningPort,
34    IStreamServerEndpoint,
35    IPullProducer,
36    IProtocolFactory,
37)
38from twisted.internet.address import IPv4Address, IPv6Address
39from twisted.internet.defer import Deferred
40from twisted.internet.ssl import CertificateOptions, Certificate, PrivateCertificate
41from twisted.internet.interfaces import IReactorFromThreads
42from twisted.web.server import Site, Request
43from twisted.web.iweb import IRequest
44from twisted.protocols.tls import TLSMemoryBIOFactory
45from twisted.python.filepath import FilePath
46from twisted.python.failure import Failure
47
48from attrs import define, field, Factory
49from werkzeug.http import (
50    parse_range_header,
51    parse_content_range_header,
52    parse_accept_header,
53)
54from werkzeug.routing import BaseConverter, ValidationError
55from werkzeug.datastructures import ContentRange
56from hyperlink import DecodedURL
57from cryptography.x509 import load_pem_x509_certificate
58
59
60from pycddl import Schema, ValidationError as CDDLValidationError
61from .server import StorageServer
62from .http_common import (
63    swissnum_auth_header,
64    Secrets,
65    get_content_type,
66    CBOR_MIME_TYPE,
67    get_spki_hash,
68)
69
70from .common import si_a2b
71from .immutable import BucketWriter, ConflictingWriteError
72from ..util.hashutil import timing_safe_compare
73from ..util.base32 import rfc3548_alphabet
74from ..util.deferredutil import async_to_deferred
75from ..util.cputhreadpool import defer_to_thread
76from ..util import cbor
77from ..interfaces import BadWriteEnablerError
78
79
class ClientSecretsException(Exception):
    """Raised when the client's ``X-Tahoe-Authorization`` secrets are missing or malformed."""
82
83
def _extract_secrets(
    header_values: Sequence[str], required_secrets: set[Secrets]
) -> dict[Secrets, bytes]:
    """
    Decode the values of ``X-Tahoe-Authorization`` headers into a mapping
    from ``Secrets`` members to raw secret bytes.

    Raises ``ClientSecretsException`` (whose text is sent in the HTTP
    response) if a header is malformed, or if the set of provided secrets
    does not exactly match ``required_secrets``.
    """
    secret_for_name = {member.value: member for member in Secrets}
    parsed: dict[Secrets, bytes] = {}
    try:
        for raw_header in header_values:
            # Each header value is "<secret-name> <base64-value>".
            name, encoded = raw_header.strip().split(" ", 1)
            secret_type = secret_for_name[name]
            secret_value = b64decode(encoded)
            # An empty decode means the client sent no actual secret.
            if secret_value == b"":
                raise ClientSecretsException(
                    "Failed to decode secret {}".format(name)
                )
            if secret_type in (Secrets.LEASE_CANCEL, Secrets.LEASE_RENEW):
                if len(secret_value) != 32:
                    raise ClientSecretsException("Lease secrets must be 32 bytes long")
            parsed[secret_type] = secret_value
    except (ValueError, KeyError):
        # split() without a space raises ValueError; an unknown secret name
        # raises KeyError; bad base64 raises binascii.Error (a ValueError).
        raise ClientSecretsException("Bad header value(s): {}".format(header_values))
    if parsed.keys() != required_secrets:
        raise ClientSecretsException(
            "Expected {} in X-Tahoe-Authorization headers, got {}".format(
                [r.value for r in required_secrets], list(parsed.keys())
            )
        )
    return parsed
117
118
class BaseApp(Protocol):
    """Protocol for ``HTTPServer`` and testing equivalent."""

    # The secret capability string that ``_authorization_decorator`` compares
    # (timing-safely) against the client's ``Authorization`` header.
    _swissnum: bytes
123
124
# Typing helpers for the decorators below:
P = ParamSpec("P")  # extra parameters of a wrapped route handler
T = TypeVar("T")  # return type of a wrapped route handler
SecretsDict = Dict[Secrets, bytes]  # decoded X-Tahoe-Authorization secrets
App = TypeVar("App", bound=BaseApp)  # the server object handlers are bound to
129
130
def _authorization_decorator(
    required_secrets: set[Secrets],
) -> Callable[
    [Callable[Concatenate[App, Request, SecretsDict, P], T]],
    Callable[Concatenate[App, Request, P], T],
]:
    """
    Decorate a route handler to enforce authorization and log the request:

    1. Check the ``Authorization`` header matches server swissnum.
    2. Extract ``X-Tahoe-Authorization`` headers and pass them in.
    3. Log the request and response.

    :param required_secrets: The secret types the client must supply; a
        mismatch results in a 400 response.
    """

    def decorator(
        f: Callable[Concatenate[App, Request, SecretsDict, P], T]
    ) -> Callable[Concatenate[App, Request, P], T]:
        @wraps(f)
        def route(
            self: App,
            request: Request,
            *args: P.args,
            **kwargs: P.kwargs,
        ) -> T:
            # Don't set text/html content type by default.
            # None is actually supported, see https://github.com/twisted/twisted/issues/11902
            request.defaultContentType = None  # type: ignore[assignment]

            with start_action(
                action_type="allmydata:storage:http-server:handle-request",
                method=request.method,
                path=request.path,
            ) as ctx:
                try:
                    # Check Authorization header:
                    try:
                        auth_header = request.requestHeaders.getRawHeaders(
                            "Authorization", [""]
                        )[0].encode("utf-8")
                    except UnicodeError:
                        raise _HTTPError(http.BAD_REQUEST, "Bad Authorization header")
                    # timing_safe_compare avoids leaking the swissnum via
                    # comparison timing.
                    if not timing_safe_compare(
                        auth_header,
                        swissnum_auth_header(self._swissnum),
                    ):
                        raise _HTTPError(
                            http.UNAUTHORIZED, "Wrong Authorization header"
                        )

                    # Check secrets:
                    authorization = request.requestHeaders.getRawHeaders(
                        "X-Tahoe-Authorization", []
                    )
                    try:
                        secrets = _extract_secrets(authorization, required_secrets)
                    except ClientSecretsException as e:
                        raise _HTTPError(http.BAD_REQUEST, str(e))

                    # Run the business logic:
                    result = f(self, request, secrets, *args, **kwargs)
                except _HTTPError as e:
                    # This isn't an error necessarily for logging purposes,
                    # it's an implementation detail, an easier way to set
                    # response codes.
                    ctx.add_success_fields(response_code=e.code)
                    # Finish the logging action before re-raising so it still
                    # records the response code for this request.
                    ctx.finish()
                    raise
                else:
                    ctx.add_success_fields(response_code=request.code)
                    return result

        return route

    return decorator
203
204
def _authorized_route(
    klein_app: Klein,
    required_secrets: set[Secrets],
    url: str,
    *route_args: Any,
    branch: bool = False,
    **route_kwargs: Any,
) -> Callable[
    [
        Callable[
            Concatenate[App, Request, SecretsDict, P],
            KleinRenderable,
        ]
    ],
    Callable[..., KleinRenderable],
]:
    """
    Like Klein's @route, but with additional support for checking the
    ``Authorization`` header as well as ``X-Tahoe-Authorization`` headers.  The
    latter will get passed in as second argument to wrapped functions, a
    dictionary mapping a ``Secret`` value to the uploaded secret.

    :param required_secrets: Set of required ``Secret`` types.
    """

    def decorator(
        f: Callable[
            Concatenate[App, Request, SecretsDict, P],
            KleinRenderable,
        ]
    ) -> Callable[..., KleinRenderable]:
        # Decorator order matters: Klein registers the outermost wrapper, so
        # the authorization checks always run before the handler itself.
        @klein_app.route(url, *route_args, branch=branch, **route_kwargs)  # type: ignore[arg-type]
        @_authorization_decorator(required_secrets)
        @wraps(f)
        def handle_route(
            app: App,
            request: Request,
            secrets: SecretsDict,
            *args: P.args,
            **kwargs: P.kwargs,
        ) -> KleinRenderable:
            return f(app, request, secrets, *args, **kwargs)

        return handle_route

    return decorator
251
252
@define
class StorageIndexUploads(object):
    """
    In-progress upload to storage index.

    Tracks the shares currently being uploaded for a single storage index.
    """

    # Map share number to BucketWriter
    shares: dict[int, BucketWriter] = Factory(dict)

    # Map share number to the upload secret (different shares might have
    # different upload secrets).
    upload_secrets: dict[int, bytes] = Factory(dict)
265
266
@define
class UploadsInProgress(object):
    """
    Registry of in-progress immutable uploads, indexed both by storage index
    and by the ``BucketWriter`` doing the writing.
    """

    # Map storage index to corresponding uploads-in-progress
    _uploads: dict[bytes, StorageIndexUploads] = Factory(dict)

    # Map BucketWriter to (storage index, share number)
    _bucketwriters: dict[BucketWriter, tuple[bytes, int]] = Factory(dict)

    def add_write_bucket(
        self,
        storage_index: bytes,
        share_number: int,
        upload_secret: bytes,
        bucket: BucketWriter,
    ):
        """Start tracking a new ``BucketWriter``."""
        per_index = self._uploads.setdefault(storage_index, StorageIndexUploads())
        per_index.shares[share_number] = bucket
        per_index.upload_secrets[share_number] = upload_secret
        self._bucketwriters[bucket] = (storage_index, share_number)

    def get_write_bucket(
        self, storage_index: bytes, share_number: int, upload_secret: bytes
    ) -> BucketWriter:
        """Look up the given in-progress immutable share upload."""
        self.validate_upload_secret(storage_index, share_number, upload_secret)
        try:
            per_index = self._uploads[storage_index]
            return per_index.shares[share_number]
        except (KeyError, IndexError):
            raise _HTTPError(http.NOT_FOUND)

    def remove_write_bucket(self, bucket: BucketWriter) -> None:
        """Stop tracking the given ``BucketWriter``."""
        entry = self._bucketwriters.pop(bucket, None)
        if entry is None:
            # This is probably a BucketWriter created by Foolscap, so just
            # ignore it.
            return
        storage_index, share_number = entry
        per_index = self._uploads[storage_index]
        per_index.shares.pop(share_number)
        per_index.upload_secrets.pop(share_number)
        if not per_index.shares:
            # Last share for this storage index is gone; drop the entry.
            self._uploads.pop(storage_index)

    def validate_upload_secret(
        self, storage_index: bytes, share_number: int, upload_secret: bytes
    ) -> None:
        """
        Raise an unauthorized-HTTP-response exception if the given
        storage_index+share_number have a different upload secret than the
        given one.

        If the given upload doesn't exist at all, nothing happens.
        """
        if storage_index not in self._uploads:
            return
        in_progress = self._uploads[storage_index]
        if share_number not in in_progress.upload_secrets:
            return
        # For a pre-existing upload, the secret must match (timing-safely).
        if not timing_safe_compare(
            in_progress.upload_secrets[share_number], upload_secret
        ):
            raise _HTTPError(http.UNAUTHORIZED)
333
334
class StorageIndexConverter(BaseConverter):
    """Parser/validator for storage index URL path segments."""

    # 26 characters drawn from the RFC 3548 base32 alphabet.
    regex = "[" + str(rfc3548_alphabet, "ascii") + "]{26}"

    def to_python(self, value: str) -> bytes:
        """Decode a path segment into storage-index bytes, or reject the route."""
        try:
            encoded = value.encode("ascii")
            return si_a2b(encoded)
        except (AssertionError, binascii.Error, ValueError):
            raise ValidationError("Invalid storage index")
345
346
347class _HTTPError(Exception):
348    """
349    Raise from ``HTTPServer`` endpoint to return the given HTTP response code.
350    """
351
352    def __init__(self, code: int, body: Optional[str] = None):
353        Exception.__init__(self, (code, body))
354        self.code = code
355        self.body = body
356
357
# CDDL schemas.
#
# Tags are of the form #6.nnn, where the number is documented at
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258
# indicates a set.
#
# Somewhat arbitrary limits are set to reduce e.g. number of shares, number of
# vectors, etc.. These may need to be iterated on in future revisions of the
# code.
_SCHEMAS = {
    # Request body for allocate_buckets (POST /storage/v1/immutable/...).
    "allocate_buckets": Schema(
        """
    request = {
      share-numbers: #6.258([0*256 uint])
      allocated-size: uint
    }
    """
    ),
    # Request body for corruption advisories (.../corrupt routes).
    "advise_corrupt_share": Schema(
        """
    request = {
      reason: tstr .size (1..32765)
    }
    """
    ),
    # Request body for mutable read-test-write operations.
    "mutable_read_test_write": Schema(
        """
        request = {
            "test-write-vectors": {
                0*256 share_number : {
                    "test": [0*30 {"offset": uint, "size": uint, "specimen": bstr}]
                    "write": [* {"offset": uint, "data": bstr}]
                    "new-length": uint / null
                }
            }
            "read-vector": [0*30 {"offset": uint, "size": uint}]
        }
        share_number = uint
        """
    ),
}
399
400
# Callable that takes offset and length, and returns the bytes stored at that
# range; used by the producers below to pull data on demand.
ReadData = Callable[[int, int], bytes]
403
404
@implementer(IPullProducer)
@define
class _ReadAllProducer:
    """
    Pull producer that repeatedly invokes a read callable and streams
    everything it returns into a request, firing a ``Deferred`` once the
    reads are exhausted.
    """

    request: Request
    read_data: ReadData
    result: Deferred = Factory(Deferred)
    start: int = field(default=0)

    @classmethod
    def produce_to(cls, request: Request, read_data: ReadData) -> Deferred[bytes]:
        """
        Create and register the producer, returning a ``Deferred`` suitable
        for returning from an HTTP server endpoint.
        """
        producer = cls(request, read_data)
        request.registerProducer(producer, False)
        return producer.result

    def resumeProducing(self) -> None:
        chunk = self.read_data(self.start, 65536)
        if chunk:
            self.request.write(chunk)
            self.start += len(chunk)
            return
        # An empty read signals the end of the data: stop producing and
        # fire the Deferred.
        self.request.unregisterProducer()
        finished = self.result
        del self.result
        finished.callback(b"")

    def pauseProducing(self) -> None:
        pass

    def stopProducing(self) -> None:
        pass
444
445
@implementer(IPullProducer)
@define
class _ReadRangeProducer:
    """
    Producer that calls a read function to read a range of data, and writes to
    a request.

    ``request`` and ``result`` are set to ``None`` once production has
    finished, which also makes further ``resumeProducing`` calls no-ops.
    """

    # Request being written to; None once producing has stopped.
    request: Optional[Request]
    # Callable reading (offset, length) from the underlying data.
    read_data: ReadData
    # Fires with b"" on success or a ValueError on a bad read; None once fired.
    result: Optional[Deferred[bytes]]
    # Offset of the next read.
    start: int
    # Bytes still expected; production stops when this reaches zero.
    remaining: int

    def resumeProducing(self) -> None:
        if self.result is None or self.request is None:
            return

        to_read = min(self.remaining, 65536)
        data = self.read_data(self.start, to_read)
        assert len(data) <= to_read

        if not data and self.remaining > 0:
            # Premature end of data: report the failure and shut down.
            d, self.result = self.result, None
            d.errback(
                ValueError(
                    f"Should be {self.remaining} bytes left, but we got an empty read"
                )
            )
            self.stopProducing()
            return

        if len(data) > self.remaining:
            # Defensive: the assert above should prevent this, but if the
            # read callable over-delivers, fail rather than write past the
            # advertised range.
            d, self.result = self.result, None
            d.errback(
                ValueError(
                    f"Should be {self.remaining} bytes left, but we got more than that ({len(data)})!"
                )
            )
            self.stopProducing()
            return

        self.start += len(data)
        self.remaining -= len(data)
        assert self.remaining >= 0

        self.request.write(data)

        if self.remaining == 0:
            self.stopProducing()

    def pauseProducing(self) -> None:
        pass

    def stopProducing(self) -> None:
        # Idempotent: unregister from the request once, fire the Deferred
        # once, then become inert.
        if self.request is not None:
            self.request.unregisterProducer()
            self.request = None
        if self.result is not None:
            d = self.result
            self.result = None
            d.callback(b"")
508
509
def read_range(
    request: Request, read_data: ReadData, share_length: int
) -> Union[Deferred[bytes], bytes]:
    """
    Read an optional ``Range`` header, reads data appropriately via the given
    callable, writes the data to the request.

    Only parses a subset of ``Range`` headers that we support: must be set,
    bytes only, only a single range, the end must be explicitly specified.
    Raises a ``_HTTPError(http.REQUESTED_RANGE_NOT_SATISFIABLE)`` if parsing is
    not possible or the header isn't set.

    Takes a function that will do the actual reading given the start offset and
    a length to read.

    The resulting data is written to the request.

    :param share_length: Total length of the share; an end offset past it is
        clamped rather than rejected.
    """

    def read_data_with_error_handling(offset: int, length: int) -> bytes:
        # Translate an _HTTPError raised mid-read into a response code
        # instead of letting it propagate through the producer machinery.
        try:
            return read_data(offset, length)
        except _HTTPError as e:
            request.setResponseCode(e.code)
            # Empty read means we're done.
            return b""

    if request.getHeader("range") is None:
        # No Range header: stream the whole share.
        return _ReadAllProducer.produce_to(request, read_data_with_error_handling)

    range_header = parse_range_header(request.getHeader("range"))
    if (
        range_header is None  # failed to parse
        or range_header.units != "bytes"
        or len(range_header.ranges) > 1  # more than one range
        or range_header.ranges[0][1] is None  # range without end
    ):
        raise _HTTPError(http.REQUESTED_RANGE_NOT_SATISFIABLE)

    offset, end = range_header.ranges[0]
    assert end is not None  # should've exited in block above this if so

    # If we're being ask to read beyond the length of the share, just read
    # less:
    end = min(end, share_length)
    if offset >= end:
        # Basically we'd need to return an empty body. However, the
        # Content-Range header can't actually represent empty lengths... so
        # (mis)use 204 response code to indicate that.
        raise _HTTPError(http.NO_CONTENT)

    request.setResponseCode(http.PARTIAL_CONTENT)

    # Actual conversion from Python's exclusive ranges to inclusive ranges is
    # handled by werkzeug.
    request.setHeader(
        "content-range",
        ContentRange("bytes", offset, end).to_header(),
    )

    # Stream just the requested window; the Deferred fires when it is done.
    d: Deferred[bytes] = Deferred()
    request.registerProducer(
        _ReadRangeProducer(
            request, read_data_with_error_handling, d, offset, end - offset
        ),
        False,
    )
    return d
577
578
def _add_error_handling(app: Klein) -> None:
    """Install the standard exception handlers on a Klein app."""

    @app.handle_errors(_HTTPError)
    def _http_error(self: Any, request: IRequest, failure: Failure) -> KleinRenderable:
        """Turn an ``_HTTPError`` into the response code and body it carries."""
        assert isinstance(failure.value, _HTTPError)
        request.setResponseCode(failure.value.code)
        body = failure.value.body
        if body is None:
            return b""
        return body

    @app.handle_errors(CDDLValidationError)
    def _cddl_validation_error(
        self: Any, request: IRequest, failure: Failure
    ) -> KleinRenderable:
        """Report CDDL validation failures as 400 Bad Request."""
        request.setResponseCode(http.BAD_REQUEST)
        return str(failure.value).encode("utf-8")
599
600
async def read_encoded(
    reactor, request, schema: Schema, max_size: int = 1024 * 1024
) -> Any:
    """
    Read encoded request body data, decoding it with CBOR by default, after
    validating it against the given CDDL schema.

    Somewhat arbitrarily, limit body size to 1MiB by default.

    NOTE: ``reactor`` is currently unused here; it is kept for interface
    stability with callers.
    """
    # A missing Content-Type is treated as CBOR; anything else is rejected.
    content_type = get_content_type(request.requestHeaders)
    if content_type not in (None, CBOR_MIME_TYPE):
        raise _HTTPError(http.UNSUPPORTED_MEDIA_TYPE)

    # Refuse overly large bodies before doing any work on them:
    request.content.seek(0, SEEK_END)
    if request.content.tell() > max_size:
        raise _HTTPError(http.REQUEST_ENTITY_TOO_LARGE)
    request.content.seek(0, SEEK_SET)

    # Avoid copying a potentially large body into memory: twisted.web spools
    # big request bodies to a file, which we can mmap() to get a read-only
    # view. The CDDL validator accepts any bytes-like object and will not
    # make a copy, so memory usage stays bounded.
    try:
        fd = request.content.fileno()
    except (ValueError, OSError):
        fd = -1
    if fd >= 0:
        # It's a real file, so mmap() it to save memory.
        message = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
    else:
        message = request.content.read()

    # Pycddl releases the GIL when validating larger documents, so run the
    # validation/decoding on a thread pool to use multiple CPUs:
    return await defer_to_thread(schema.validate_cbor, message, True)
642
class HTTPServer(BaseApp):
    """
    A HTTP interface to the storage server.
    """

    # Klein application shared by all instances; routes are registered
    # against it by the ``@_authorized_route`` decorators on the methods
    # below.
    _app = Klein()
    _app.url_map.converters["storage_index"] = StorageIndexConverter
    _add_error_handling(_app)
651
    def __init__(
        self,
        reactor: IReactorFromThreads,
        storage_server: StorageServer,
        swissnum: bytes,
    ):
        """
        :param reactor: Reactor with thread support; handed to helpers that
            may defer work to threads.
        :param storage_server: The underlying storage server being exposed.
        :param swissnum: Secret the client must present (via the
            ``Authorization`` header) to use this server.
        """
        self._reactor = reactor
        self._storage_server = storage_server
        self._swissnum = swissnum
        # Maps storage index to StorageIndexUploads:
        self._uploads = UploadsInProgress()

        # When an upload finishes successfully, gets aborted, or times out,
        # make sure it gets removed from our tracking datastructure:
        self._storage_server.register_bucket_writer_close_handler(
            self._uploads.remove_write_bucket
        )
669
    def get_resource(self) -> KleinResource:
        """Return the twisted.web ``Resource`` exposing this server's routes."""
        return self._app.resource()
673
674    def _send_encoded(self, request: Request, data: object) -> Deferred[bytes]:
675        """
676        Return encoded data suitable for writing as the HTTP body response, by
677        default using CBOR.
678
679        Also sets the appropriate ``Content-Type`` header on the response.
680        """
681        accept_headers = request.requestHeaders.getRawHeaders("accept") or [
682            CBOR_MIME_TYPE
683        ]
684        accept = parse_accept_header(accept_headers[0])
685        if accept.best == CBOR_MIME_TYPE:
686            request.setHeader("Content-Type", CBOR_MIME_TYPE)
687            f = TemporaryFile()
688            cbor.dump(data, f)  # type: ignore
689
690            def read_data(offset: int, length: int) -> bytes:
691                f.seek(offset)
692                return f.read(length)
693
694            return _ReadAllProducer.produce_to(request, read_data)
695        else:
696            # TODO Might want to optionally send JSON someday:
697            # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3861
698            raise _HTTPError(http.NOT_ACCEPTABLE)
699
700    ##### Generic APIs #####
701
702    @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"])
703    def version(self, request: Request, authorization: SecretsDict) -> KleinRenderable:
704        """Return version information."""
705        return self._send_encoded(request, self._get_version())
706
707    def _get_version(self) -> dict[bytes, Any]:
708        """
709        Get the HTTP version of the storage server's version response.
710
711        This differs from the Foolscap version by omitting certain obsolete
712        fields.
713        """
714        v = self._storage_server.get_version()
715        v1_identifier = b"http://allmydata.org/tahoe/protocols/storage/v1"
716        v1 = v[v1_identifier]
717        return {
718            v1_identifier: {
719                b"maximum-immutable-share-size": v1[b"maximum-immutable-share-size"],
720                b"maximum-mutable-share-size": v1[b"maximum-mutable-share-size"],
721                b"available-space": v1[b"available-space"],
722            },
723            b"application-version": v[b"application-version"],
724        }
725
726    ##### Immutable APIs #####
727
    @_authorized_route(
        _app,
        {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.UPLOAD},
        "/storage/v1/immutable/<storage_index:storage_index>",
        methods=["POST"],
    )
    @async_to_deferred
    async def allocate_buckets(
        self, request: Request, authorization: SecretsDict, storage_index: bytes
    ) -> KleinRenderable:
        """
        Allocate buckets for an immutable upload.

        The CBOR request body gives the share numbers and allocated size; the
        encoded response reports which shares the server already has
        (``already-have``) and which were newly allocated for writing
        (``allocated``).
        """
        upload_secret = authorization[Secrets.UPLOAD]
        # It's just a list of up to ~256 shares, shouldn't use many bytes.
        info = await read_encoded(
            self._reactor, request, _SCHEMAS["allocate_buckets"], max_size=8192
        )

        # We do NOT validate the upload secret for existing bucket uploads.
        # Another upload may be happening in parallel, with a different upload
        # key. That's fine! If a client tries to _write_ to that upload, they
        # need to have an upload key. That does mean we leak the existence of
        # these parallel uploads, but if you know storage index you can
        # download them once upload finishes, so it's not a big deal to leak
        # that information.

        already_got, sharenum_to_bucket = self._storage_server.allocate_buckets(
            storage_index,
            renew_secret=authorization[Secrets.LEASE_RENEW],
            cancel_secret=authorization[Secrets.LEASE_CANCEL],
            sharenums=info["share-numbers"],
            allocated_size=info["allocated-size"],
        )
        # Track the new BucketWriters so later write requests can find them:
        for share_number, bucket in sharenum_to_bucket.items():
            self._uploads.add_write_bucket(
                storage_index, share_number, upload_secret, bucket
            )

        return await self._send_encoded(
            request,
            {"already-have": set(already_got), "allocated": set(sharenum_to_bucket)},
        )
769
    @_authorized_route(
        _app,
        {Secrets.UPLOAD},
        "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/abort",
        methods=["PUT"],
    )
    def abort_share_upload(
        self,
        request: Request,
        authorization: SecretsDict,
        storage_index: bytes,
        share_number: int,
    ) -> KleinRenderable:
        """
        Abort an in-progress immutable share upload.

        Responds 404 if no such upload is in progress, and 405 if the share
        was already fully uploaded (so there is nothing left to abort).
        """
        try:
            bucket = self._uploads.get_write_bucket(
                storage_index, share_number, authorization[Secrets.UPLOAD]
            )
        except _HTTPError as e:
            if e.code == http.NOT_FOUND:
                # It may be we've already uploaded this, in which case error
                # should be method not allowed (405).
                try:
                    self._storage_server.get_buckets(storage_index)[share_number]
                except KeyError:
                    pass
                else:
                    # Already uploaded, so we can't abort.
                    raise _HTTPError(http.NOT_ALLOWED)
            # Re-raise the original 404 (or other) error otherwise.
            raise

        # Abort the upload; this should close it which will eventually result
        # in self._uploads.remove_write_bucket() being called.
        bucket.abort()

        return b""
806
    @_authorized_route(
        _app,
        {Secrets.UPLOAD},
        "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
        methods=["PATCH"],
    )
    def write_share_data(
        self,
        request: Request,
        authorization: SecretsDict,
        storage_index: bytes,
        share_number: int,
    ) -> KleinRenderable:
        """
        Write data to an in-progress immutable upload.

        The ``Content-Range`` header determines where in the share the body
        is written.  Responds 201 once the share is fully written, 200 if
        more data is still needed, 409 on a conflicting write, and 416 if
        the range header is missing or unusable.  The encoded response body
        lists the ranges still required to finish the share.
        """
        content_range = parse_content_range_header(request.getHeader("content-range"))
        if content_range is None or content_range.units != "bytes":
            request.setResponseCode(http.REQUESTED_RANGE_NOT_SATISFIABLE)
            return b""

        bucket = self._uploads.get_write_bucket(
            storage_index, share_number, authorization[Secrets.UPLOAD]
        )
        offset = content_range.start or 0
        # We don't support an unspecified stop for the range:
        assert content_range.stop is not None
        # Missing body makes no sense:
        assert request.content is not None
        remaining = content_range.stop - offset
        finished = False

        # Feed the body to the bucket in <=64KiB chunks so large uploads are
        # never held in memory all at once.
        while remaining > 0:
            data = request.content.read(min(remaining, 65536))
            # NOTE(review): a body shorter than the advertised range trips
            # this assert (a 500 response); an explicit 400 might be kinder.
            assert data, "uploaded data length doesn't match range"
            try:
                finished = bucket.write(offset, data)
            except ConflictingWriteError:
                request.setResponseCode(http.CONFLICT)
                return b""
            remaining -= len(data)
            offset += len(data)

        if finished:
            bucket.close()
            request.setResponseCode(http.CREATED)
        else:
            request.setResponseCode(http.OK)

        # Tell the client which ranges are still needed to finish the share.
        required = []
        for start, end, _ in bucket.required_ranges().ranges():
            required.append({"begin": start, "end": end})
        return self._send_encoded(request, {"required": required})
858
859    @_authorized_route(
860        _app,
861        set(),
862        "/storage/v1/immutable/<storage_index:storage_index>/shares",
863        methods=["GET"],
864    )
865    def list_shares(
866        self, request: Request, authorization: SecretsDict, storage_index: bytes
867    ) -> KleinRenderable:
868        """
869        List shares for the given storage index.
870        """
871        share_numbers = set(self._storage_server.get_buckets(storage_index).keys())
872        return self._send_encoded(request, share_numbers)
873
874    @_authorized_route(
875        _app,
876        set(),
877        "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>",
878        methods=["GET"],
879    )
880    def read_share_chunk(
881        self,
882        request: Request,
883        authorization: SecretsDict,
884        storage_index: bytes,
885        share_number: int,
886    ) -> KleinRenderable:
887        """Read a chunk for an already uploaded immutable."""
888        request.setHeader("content-type", "application/octet-stream")
889        try:
890            bucket = self._storage_server.get_buckets(storage_index)[share_number]
891        except KeyError:
892            request.setResponseCode(http.NOT_FOUND)
893            return b""
894
895        return read_range(request, bucket.read, bucket.get_length())
896
897    @_authorized_route(
898        _app,
899        {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL},
900        "/storage/v1/lease/<storage_index:storage_index>",
901        methods=["PUT"],
902    )
903    def add_or_renew_lease(
904        self, request: Request, authorization: SecretsDict, storage_index: bytes
905    ) -> KleinRenderable:
906        """Update the lease for an immutable or mutable share."""
907        if not list(self._storage_server.get_shares(storage_index)):
908            raise _HTTPError(http.NOT_FOUND)
909
910        # Checking of the renewal secret is done by the backend.
911        self._storage_server.add_lease(
912            storage_index,
913            authorization[Secrets.LEASE_RENEW],
914            authorization[Secrets.LEASE_CANCEL],
915        )
916
917        request.setResponseCode(http.NO_CONTENT)
918        return b""
919
920    @_authorized_route(
921        _app,
922        set(),
923        "/storage/v1/immutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
924        methods=["POST"],
925    )
926    @async_to_deferred
927    async def advise_corrupt_share_immutable(
928        self,
929        request: Request,
930        authorization: SecretsDict,
931        storage_index: bytes,
932        share_number: int,
933    ) -> KleinRenderable:
934        """Indicate that given share is corrupt, with a text reason."""
935        try:
936            bucket = self._storage_server.get_buckets(storage_index)[share_number]
937        except KeyError:
938            raise _HTTPError(http.NOT_FOUND)
939
940        # The reason can be a string with explanation, so in theory it could be
941        # longish?
942        info = await read_encoded(
943            self._reactor,
944            request,
945            _SCHEMAS["advise_corrupt_share"],
946            max_size=32768,
947        )
948        bucket.advise_corrupt_share(info["reason"].encode("utf-8"))
949        return b""
950
951    ##### Mutable APIs #####
952
953    @_authorized_route(
954        _app,
955        {Secrets.LEASE_RENEW, Secrets.LEASE_CANCEL, Secrets.WRITE_ENABLER},
956        "/storage/v1/mutable/<storage_index:storage_index>/read-test-write",
957        methods=["POST"],
958    )
959    @async_to_deferred
960    async def mutable_read_test_write(
961        self, request: Request, authorization: SecretsDict, storage_index: bytes
962    ) -> KleinRenderable:
963        """Read/test/write combined operation for mutables."""
964        rtw_request = await read_encoded(
965            self._reactor,
966            request,
967            _SCHEMAS["mutable_read_test_write"],
968            max_size=2**48,
969        )
970        secrets = (
971            authorization[Secrets.WRITE_ENABLER],
972            authorization[Secrets.LEASE_RENEW],
973            authorization[Secrets.LEASE_CANCEL],
974        )
975        try:
976            success, read_data = self._storage_server.slot_testv_and_readv_and_writev(
977                storage_index,
978                secrets,
979                {
980                    k: (
981                        [
982                            (d["offset"], d["size"], b"eq", d["specimen"])
983                            for d in v["test"]
984                        ],
985                        [(d["offset"], d["data"]) for d in v["write"]],
986                        v["new-length"],
987                    )
988                    for (k, v) in rtw_request["test-write-vectors"].items()
989                },
990                [(d["offset"], d["size"]) for d in rtw_request["read-vector"]],
991            )
992        except BadWriteEnablerError:
993            raise _HTTPError(http.UNAUTHORIZED)
994        return await self._send_encoded(
995            request, {"success": success, "data": read_data}
996        )
997
998    @_authorized_route(
999        _app,
1000        set(),
1001        "/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>",
1002        methods=["GET"],
1003    )
1004    def read_mutable_chunk(
1005        self,
1006        request: Request,
1007        authorization: SecretsDict,
1008        storage_index: bytes,
1009        share_number: int,
1010    ) -> KleinRenderable:
1011        """Read a chunk from a mutable."""
1012        request.setHeader("content-type", "application/octet-stream")
1013
1014        try:
1015            share_length = self._storage_server.get_mutable_share_length(
1016                storage_index, share_number
1017            )
1018        except KeyError:
1019            raise _HTTPError(http.NOT_FOUND)
1020
1021        def read_data(offset, length):
1022            try:
1023                return self._storage_server.slot_readv(
1024                    storage_index, [share_number], [(offset, length)]
1025                )[share_number][0]
1026            except KeyError:
1027                raise _HTTPError(http.NOT_FOUND)
1028
1029        return read_range(request, read_data, share_length)
1030
    @_authorized_route(
        _app,
        set(),
        "/storage/v1/mutable/<storage_index:storage_index>/shares",
        methods=["GET"],
    )
    def enumerate_mutable_shares(
        self, request: Request, authorization: SecretsDict, storage_index: bytes
    ) -> KleinRenderable:
        """List mutable shares for a storage index."""
        # No secrets are required (empty set in the route above); the backend
        # returns the share numbers, which are sent back CBOR-encoded.
        shares = self._storage_server.enumerate_mutable_shares(storage_index)
        return self._send_encoded(request, shares)
1041
1042    @_authorized_route(
1043        _app,
1044        set(),
1045        "/storage/v1/mutable/<storage_index:storage_index>/<int(signed=False):share_number>/corrupt",
1046        methods=["POST"],
1047    )
1048    @async_to_deferred
1049    async def advise_corrupt_share_mutable(
1050        self,
1051        request: Request,
1052        authorization: SecretsDict,
1053        storage_index: bytes,
1054        share_number: int,
1055    ) -> KleinRenderable:
1056        """Indicate that given share is corrupt, with a text reason."""
1057        if share_number not in {
1058            shnum for (shnum, _) in self._storage_server.get_shares(storage_index)
1059        }:
1060            raise _HTTPError(http.NOT_FOUND)
1061
1062        # The reason can be a string with explanation, so in theory it could be
1063        # longish?
1064        info = await read_encoded(
1065            self._reactor, request, _SCHEMAS["advise_corrupt_share"], max_size=32768
1066        )
1067        self._storage_server.advise_corrupt_share(
1068            b"mutable", storage_index, share_number, info["reason"].encode("utf-8")
1069        )
1070        return b""
1071
1072
@implementer(IStreamServerEndpoint)
@define
class _TLSEndpointWrapper(object):
    """
    Wrap an existing endpoint with the server-side storage TLS policy.  This is
    useful because not all Tahoe-LAFS endpoints might be plain TCP+TLS, for
    example there's Tor and i2p.
    """

    endpoint: IStreamServerEndpoint
    context_factory: CertificateOptions

    @classmethod
    def from_paths(
        cls: type[_TLSEndpointWrapper],
        endpoint: IStreamServerEndpoint,
        private_key_path: FilePath,
        cert_path: FilePath,
    ) -> "_TLSEndpointWrapper":
        """
        Create an endpoint with the given private key and certificate paths on
        the filesystem.
        """
        cert_pem = cert_path.getContent()
        certificate = Certificate.loadPEM(cert_pem).original
        # PrivateCertificate.loadPEM() wants the certificate and key in a
        # single PEM blob, so concatenate the two files.
        combined_pem = cert_pem + b"\n" + private_key_path.getContent()
        private_key = PrivateCertificate.loadPEM(combined_pem).privateKey.original
        options = CertificateOptions(privateKey=private_key, certificate=certificate)
        return cls(endpoint=endpoint, context_factory=options)

    def listen(self, factory: IProtocolFactory) -> Deferred[IListeningPort]:
        # Wrap the caller's factory in TLS before handing it to the real
        # endpoint; isClient=False because we are the server side.
        tls_factory = TLSMemoryBIOFactory(self.context_factory, False, factory)
        return self.endpoint.listen(tls_factory)
1109
1110
def build_nurl(
    hostname: str,
    port: int,
    swissnum: str,
    certificate: CryptoCertificate,
    subscheme: Optional[str] = None,
) -> DecodedURL:
    """
    Construct a HTTPS NURL, given the hostname, port, server swissnum, and x509
    certificate for the server.  Clients can then connect to the server using
    this NURL.
    """
    # The userinfo portion carries the hash of the server certificate's SPKI,
    # which is how clients authenticate the server.
    spki_hash = str(get_spki_hash(certificate), "ascii")
    scheme = "pb" if subscheme is None else f"pb+{subscheme}"
    return DecodedURL().replace(
        scheme=scheme,
        host=hostname,
        port=port,
        path=(swissnum,),
        userinfo=(spki_hash,),
        fragment="v=1",  # how we know this NURL is HTTP-based (i.e. not Foolscap)
    )
1139
1140
def listen_tls(
    server: HTTPServer,
    hostname: str,
    endpoint: IStreamServerEndpoint,
    private_key_path: FilePath,
    cert_path: FilePath,
) -> Deferred[tuple[DecodedURL, IListeningPort]]:
    """
    Start a HTTPS storage server on the given port, return the NURL and the
    listening port.

    The hostname is the external IP or hostname clients will connect to, used
    to construct the NURL; it does not modify what interfaces the server
    listens on.

    This will likely need to be updated eventually to handle Tor/i2p.
    """
    tls_endpoint = _TLSEndpointWrapper.from_paths(
        endpoint, private_key_path, cert_path
    )

    def to_nurl_and_port(
        listening_port: IListeningPort,
    ) -> tuple[DecodedURL, IListeningPort]:
        # The endpoint may have been port 0 ("pick any free port"), so read
        # the actual port back from the listening port.
        address = cast(Union[IPv4Address, IPv6Address], listening_port.getHost())
        nurl = build_nurl(
            hostname,
            address.port,
            str(server._swissnum, "ascii"),
            load_pem_x509_certificate(cert_path.getContent()),
        )
        return (nurl, listening_port)

    listening = tls_endpoint.listen(Site(server.get_resource()))
    return listening.addCallback(to_nurl_and_port)
Note: See TracBrowser for help on using the repository browser.