source: trunk/src/allmydata/test/test_storage_http.py

Last change on this file was fec97256, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2025-01-06T21:51:37Z

trim Python2 syntax

  • Property mode set to 100644
File size: 62.5 KB
Line 
1"""
2Tests for HTTP storage client + server.
3
4The tests here are synchronous and don't involve running a real reactor.  This
5works, but has some caveats when it comes to testing HTTP endpoints:
6
7* Some HTTP endpoints are synchronous, some are not.
8* For synchronous endpoints, the result is immediately available on the
9  ``Deferred`` coming out of ``StubTreq``.
10* For asynchronous endpoints, you need to use ``StubTreq.flush()`` and
11  iterate the fake in-memory clock/reactor to advance time .
12
13So for HTTP endpoints, you should use ``HttpTestFixture.result_of_with_flush()``
14which handles both, and patches and moves forward the global Twisted
15``Cooperator`` since that is used to drive pull producers. This is,
16sadly, an internal implementation detail of Twisted being leaked to tests...
17
18For definitely synchronous calls, you can just use ``result_of()``.
19"""
20
21import time
22from base64 import b64encode
23from contextlib import contextmanager
24from os import urandom
25from typing import Union, Callable, Tuple, Iterable
26from queue import Queue
27from pycddl import ValidationError as CDDLValidationError
28from hypothesis import assume, given, strategies as st, settings as hypothesis_settings
29from fixtures import Fixture, TempDir, MonkeyPatch
30from treq.client import HTTPClient
31from treq.testing import StubTreq, RequestTraversalAgent
32from klein import Klein
33from hyperlink import DecodedURL
34from collections_extended import RangeMap
35from twisted.internet.task import Clock, Cooperator
36from twisted.internet.interfaces import IReactorTime, IReactorFromThreads
37from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
38from twisted.web import http
39from twisted.web.http_headers import Headers
40from werkzeug import routing
41from werkzeug.exceptions import NotFound as WNotFound
42from testtools.matchers import Equals
43from zope.interface import implementer
44
45from ..util.cbor import dumps
46from ..util.deferredutil import async_to_deferred
47from ..util.cputhreadpool import disable_thread_pool_for_test
48from .common import SyncTestCase
49from ..storage.http_common import (
50    get_content_type,
51    CBOR_MIME_TYPE,
52    response_is_not_html,
53)
54from ..storage.common import si_b2a
55from ..storage.lease import LeaseInfo
56from ..storage.server import StorageServer
57from ..storage.http_server import (
58    HTTPServer,
59    _extract_secrets,
60    Secrets,
61    ClientSecretsException,
62    _authorized_route,
63    StorageIndexConverter,
64    _add_error_handling,
65    read_encoded,
66    _SCHEMAS as SERVER_SCHEMAS,
67    BaseApp,
68)
69from ..storage.http_client import (
70    StorageClient,
71    StorageClientFactory,
72    ClientException,
73    StorageClientImmutables,
74    ImmutableCreateResult,
75    UploadProgress,
76    StorageClientGeneral,
77    _encode_si,
78    StorageClientMutables,
79    TestWriteVectors,
80    WriteVector,
81    ReadVector,
82    ReadTestWriteResult,
83    TestVector,
84    limited_content,
85)
86
87
88class HTTPUtilities(SyncTestCase):
89    """Tests for HTTP common utilities."""
90
91    def test_get_content_type(self):
92        """``get_content_type()`` extracts the content-type from the header."""
93
94        def assert_header_values_result(values, expected_content_type):
95            headers = Headers()
96            if values:
97                headers.setRawHeaders("Content-Type", values)
98            content_type = get_content_type(headers)
99            self.assertEqual(content_type, expected_content_type)
100
101        assert_header_values_result(["text/html"], "text/html")
102        assert_header_values_result([], None)
103        assert_header_values_result(["text/plain", "application/json"], "text/plain")
104        assert_header_values_result(["text/html;encoding=utf-8"], "text/html")
105
106
107def _post_process(params):
108    secret_types, secrets = params
109    secrets = {t: s for (t, s) in zip(secret_types, secrets)}
110    headers = [
111        "{} {}".format(
112            secret_type.value, str(b64encode(secrets[secret_type]), "ascii").strip()
113        )
114        for secret_type in secret_types
115    ]
116    return secrets, headers
117
118
119# Creates a tuple of ({Secret enum value: secret_bytes}, [http headers with secrets]).
120SECRETS_STRATEGY = (
121    st.sets(st.sampled_from(Secrets))
122    .flatmap(
123        lambda secret_types: st.tuples(
124            st.just(secret_types),
125            st.lists(
126                st.binary(min_size=32, max_size=32),
127                min_size=len(secret_types),
128                max_size=len(secret_types),
129            ),
130        )
131    )
132    .map(_post_process)
133)
134
135
136class ExtractSecretsTests(SyncTestCase):
137    """
138    Tests for ``_extract_secrets``.
139    """
140
141    @given(secrets_to_send=SECRETS_STRATEGY)
142    def test_extract_secrets(self, secrets_to_send):
143        """
144        ``_extract_secrets()`` returns a dictionary with the extracted secrets
145        if the input secrets match the required secrets.
146        """
147        secrets, headers = secrets_to_send
148
149        # No secrets needed, none given:
150        self.assertEqual(_extract_secrets(headers, secrets.keys()), secrets)
151
152    @given(
153        secrets_to_send=SECRETS_STRATEGY,
154        secrets_to_require=st.sets(st.sampled_from(Secrets)),
155    )
156    def test_wrong_number_of_secrets(self, secrets_to_send, secrets_to_require):
157        """
158        If the wrong number of secrets are passed to ``_extract_secrets``, a
159        ``ClientSecretsException`` is raised.
160        """
161        secrets_to_send, headers = secrets_to_send
162        assume(secrets_to_send.keys() != secrets_to_require)
163
164        with self.assertRaises(ClientSecretsException):
165            _extract_secrets(headers, secrets_to_require)
166
167    def test_bad_secret_missing_value(self):
168        """
169        Missing value in ``_extract_secrets`` result in
170        ``ClientSecretsException``.
171        """
172        with self.assertRaises(ClientSecretsException):
173            _extract_secrets(["lease-renew-secret"], {Secrets.LEASE_RENEW})
174
175    def test_bad_secret_unknown_prefix(self):
176        """
177        Missing value in ``_extract_secrets`` result in
178        ``ClientSecretsException``.
179        """
180        with self.assertRaises(ClientSecretsException):
181            _extract_secrets(["FOO eA=="], set())
182
183    def test_bad_secret_not_base64(self):
184        """
185        A non-base64 value in ``_extract_secrets`` result in
186        ``ClientSecretsException``.
187        """
188        with self.assertRaises(ClientSecretsException):
189            _extract_secrets(["lease-renew-secret x"], {Secrets.LEASE_RENEW})
190
191    def test_bad_secret_wrong_length_lease_renew(self):
192        """
193        Lease renewal secrets must be 32-bytes long.
194        """
195        with self.assertRaises(ClientSecretsException):
196            _extract_secrets(["lease-renew-secret eA=="], {Secrets.LEASE_RENEW})
197
198    def test_bad_secret_wrong_length_lease_cancel(self):
199        """
200        Lease cancel secrets must be 32-bytes long.
201        """
202        with self.assertRaises(ClientSecretsException):
203            _extract_secrets(["lease-cancel-secret eA=="], {Secrets.LEASE_RENEW})
204
205
206class RouteConverterTests(SyncTestCase):
207    """Tests for custom werkzeug path segment converters."""
208
209    adapter = routing.Map(
210        [
211            routing.Rule(
212                "/<storage_index:storage_index>/", endpoint="si", methods=["GET"]
213            )
214        ],
215        converters={"storage_index": StorageIndexConverter},
216    ).bind("example.com", "/")
217
218    @given(storage_index=st.binary(min_size=16, max_size=16))
219    def test_good_storage_index_is_parsed(self, storage_index):
220        """
221        A valid storage index is accepted and parsed back out by
222        StorageIndexConverter.
223        """
224        self.assertEqual(
225            self.adapter.match(
226                "/{}/".format(str(si_b2a(storage_index), "ascii")), method="GET"
227            ),
228            ("si", {"storage_index": storage_index}),
229        )
230
231    def test_long_storage_index_is_not_parsed(self):
232        """An overly long storage_index string is not parsed."""
233        with self.assertRaises(WNotFound):
234            self.adapter.match("/{}/".format("a" * 27), method="GET")
235
236    def test_short_storage_index_is_not_parsed(self):
237        """An overly short storage_index string is not parsed."""
238        with self.assertRaises(WNotFound):
239            self.adapter.match("/{}/".format("a" * 25), method="GET")
240
241    def test_bad_characters_storage_index_is_not_parsed(self):
242        """A storage_index string with bad characters is not parsed."""
243        with self.assertRaises(WNotFound):
244            self.adapter.match("/{}_/".format("a" * 25), method="GET")
245
246    def test_invalid_storage_index_is_not_parsed(self):
247        """An invalid storage_index string is not parsed."""
248        with self.assertRaises(WNotFound):
249            self.adapter.match("/nomd2a65ylxjbqzsw7gcfh4ivr/", method="GET")
250
251
252# TODO should be actual swissnum
253SWISSNUM_FOR_TEST = b"abcd"
254
255
256def gen_bytes(length: int) -> bytes:
257    """Generate bytes to the given length."""
258    result = (b"0123456789abcdef" * ((length // 16) + 1))[:length]
259    assert len(result) == length
260    return result
261
262
263class TestApp(BaseApp):
264    """HTTP API for testing purposes."""
265
266    clock: IReactorTime
267    _app = Klein()
268    _add_error_handling(_app)
269    _swissnum = SWISSNUM_FOR_TEST  # Match what the test client is using
270
271    @_authorized_route(_app, set(), "/noop", methods=["GET"])
272    def noop(self, request, authorization):
273        return "noop"
274
275    @_authorized_route(_app, {Secrets.UPLOAD}, "/upload_secret", methods=["GET"])
276    def validate_upload_secret(self, request, authorization):
277        if authorization == {Secrets.UPLOAD: b"MAGIC"}:
278            return "GOOD SECRET"
279        else:
280            return "BAD: {}".format(authorization)
281
282    @_authorized_route(_app, set(), "/storage/v1/version", methods=["GET"])
283    def bad_version(self, request, authorization):
284        """Return version result that violates the expected schema."""
285        request.setHeader("content-type", CBOR_MIME_TYPE)
286        return dumps({"garbage": 123})
287
288    @_authorized_route(_app, set(), "/bytes/<int:length>", methods=["GET"])
289    def generate_bytes(self, request, authorization, length):
290        """Return bytes to the given length using ``gen_bytes()``."""
291        return gen_bytes(length)
292
293    @_authorized_route(_app, set(), "/slowly_never_finish_result", methods=["GET"])
294    def slowly_never_finish_result(self, request, authorization):
295        """
296        Send data immediately, after 59 seconds, after another 59 seconds, and then
297        never again, without finishing the response.
298        """
299        request.write(b"a")
300        self.clock.callLater(59, request.write, b"b")
301        self.clock.callLater(59 + 59, request.write, b"c")
302        return Deferred()
303
304    @_authorized_route(_app, set(), "/die_unfinished", methods=["GET"])
305    def die(self, request, authorization):
306        """
307        Dies half-way.
308        """
309        request.transport.loseConnection()
310        return Deferred()
311
312    @_authorized_route(_app, set(), "/read_body", methods=["POST"])
313    @async_to_deferred
314    async def read_body(self, request, authorization):
315        """
316        Accept an advise_corrupt_share message, return the reason.
317
318        I.e. exercise codepaths used for reading CBOR from the body.
319        """
320        data = await read_encoded(
321            self.clock, request, SERVER_SCHEMAS["advise_corrupt_share"]
322        )
323        return data["reason"]
324
325
326def result_of(d):
327    """
328    Synchronously extract the result of a Deferred.
329    """
330    result = []
331    error = []
332    d.addCallbacks(result.append, error.append)
333    if result:
334        return result[0]
335    if error:
336        error[0].raiseException()
337    raise RuntimeError(
338        "We expected given Deferred to have result already, but it wasn't. "
339        + "This is probably a test design issue."
340    )
341
342
343class CustomHTTPServerTests(SyncTestCase):
344    """
345    Tests that use a custom HTTP server.
346    """
347
348    def setUp(self):
349        super(CustomHTTPServerTests, self).setUp()
350        disable_thread_pool_for_test(self)
351        StorageClientFactory.start_test_mode(
352            lambda pool: self.addCleanup(pool.closeCachedConnections)
353        )
354        self.addCleanup(StorageClientFactory.stop_test_mode)
355        # Could be a fixture, but will only be used in this test class so not
356        # going to bother:
357        self._http_server = TestApp()
358        treq = StubTreq(self._http_server._app.resource())
359        self.client = StorageClient(
360            DecodedURL.from_text("http://127.0.0.1"),
361            SWISSNUM_FOR_TEST,
362            treq=treq,
363            pool=None,
364            # We're using a Treq private API to get the reactor, alas, but only
365            # in a test, so not going to worry about it too much. This would be
366            # fixed if https://github.com/twisted/treq/issues/226 were ever
367            # fixed.
368            clock=treq._agent._memoryReactor,
369            analyze_response=response_is_not_html,
370        )
371        self._http_server.clock = self.client._clock
372
373    def test_bad_swissnum_from_client(self) -> None:
374        """
375        If the swissnum is invalid, a BAD REQUEST response code is returned.
376        """
377        headers = Headers()
378        # The value is not UTF-8.
379        headers.addRawHeader("Authorization", b"\x00\xFF\x00\xFF")
380        response = result_of(
381            self.client._treq.request(
382                "GET",
383                DecodedURL.from_text("http://127.0.0.1/noop"),
384                headers=headers,
385            )
386        )
387        self.assertEqual(response.code, 400)
388
389    def test_bad_secret(self) -> None:
390        """
391        If the secret is invalid (not base64), a BAD REQUEST
392        response code is returned.
393        """
394        bad_secret = b"upload-secret []<>"
395        headers = Headers()
396        headers.addRawHeader(
397            "X-Tahoe-Authorization",
398            bad_secret,
399        )
400        response = result_of(
401            self.client.request(
402                "GET",
403                DecodedURL.from_text("http://127.0.0.1/upload_secret"),
404                headers=headers,
405            )
406        )
407        self.assertEqual(response.code, 400)
408
409    def test_authorization_enforcement(self):
410        """
411        The requirement for secrets is enforced by the ``_authorized_route``
412        decorator; if they are not given, a 400 response code is returned.
413
414        Note that this refers to ``X-Tahoe-Authorization``, not the
415        ``Authorization`` header used for the swissnum.
416        """
417        # Without secret, get a 400 error.
418        response = result_of(
419            self.client.request(
420                "GET",
421                DecodedURL.from_text("http://127.0.0.1/upload_secret"),
422            )
423        )
424        self.assertEqual(response.code, 400)
425
426        # With secret, we're good.
427        response = result_of(
428            self.client.request(
429                "GET",
430                DecodedURL.from_text("http://127.0.0.1/upload_secret"),
431                upload_secret=b"MAGIC",
432            )
433        )
434        self.assertEqual(response.code, 200)
435        self.assertEqual(result_of(response.content()), b"GOOD SECRET")
436
437    def test_client_side_schema_validation(self):
438        """
439        The client validates returned CBOR message against a schema.
440        """
441        client = StorageClientGeneral(self.client)
442        with self.assertRaises(CDDLValidationError):
443            result_of(client.get_version())
444
445    @given(length=st.integers(min_value=1, max_value=1_000_000))
446    # On Python 3.12 we're getting weird deadline issues in CI, so disabling
447    # for now.
448    @hypothesis_settings(deadline=None)
449    def test_limited_content_fits(self, length):
450        """
451        ``http_client.limited_content()`` returns the body if it is less than
452        the max length.
453        """
454        for at_least_length in (length, length + 1, length + 1000, length + 100_000):
455            response = result_of(
456                self.client.request(
457                    "GET",
458                    DecodedURL.from_text(f"http://127.0.0.1/bytes/{length}"),
459                )
460            )
461
462            self.assertEqual(
463                result_of(
464                    limited_content(response, self._http_server.clock, at_least_length)
465                ).read(),
466                gen_bytes(length),
467            )
468
469    @given(length=st.integers(min_value=10, max_value=1_000_000))
470    def test_limited_content_does_not_fit(self, length):
471        """
472        If the body is longer than than max length,
473        ``http_client.limited_content()`` fails with a ``ValueError``.
474        """
475        for too_short in (length - 1, 5):
476            response = result_of(
477                self.client.request(
478                    "GET",
479                    DecodedURL.from_text(f"http://127.0.0.1/bytes/{length}"),
480                )
481            )
482
483            with self.assertRaises(ValueError):
484                result_of(limited_content(response, self._http_server.clock, too_short))
485
486    def test_limited_content_silence_causes_timeout(self):
487        """
488        ``http_client.limited_content() times out if it receives no data for 60
489        seconds.
490        """
491        response = result_of(
492            self.client.request(
493                "GET",
494                DecodedURL.from_text("http://127.0.0.1/slowly_never_finish_result"),
495            )
496        )
497
498        body_deferred = limited_content(response, self._http_server.clock, 4)
499        result = []
500        error = []
501        body_deferred.addCallbacks(result.append, error.append)
502
503        for i in range(59 + 59 + 60):
504            self.assertEqual((result, error), ([], []))
505            self._http_server.clock.advance(1)
506            # Push data between in-memory client and in-memory server:
507            self.client._treq._agent.flush()
508
509        # After 59 (second write) + 59 (third write) + 60 seconds (quiescent
510        # timeout) the limited_content() response times out.
511        self.assertTrue(error)
512        with self.assertRaises(CancelledError):
513            error[0].raiseException()
514
515    def test_limited_content_cancels_timeout_on_failed_response(self):
516        """
517        If the response fails somehow, the timeout is still cancelled.
518        """
519        response = result_of(
520            self.client.request(
521                "GET",
522                DecodedURL.from_text("http://127.0.0.1/die"),
523            )
524        )
525
526        d = limited_content(response, self._http_server.clock, 4)
527        with self.assertRaises(ValueError):
528            result_of(d)
529        self.assertEqual(len(self._http_server.clock.getDelayedCalls()), 0)
530
531    def test_request_with_no_content_type_same_as_cbor(self):
532        """
533        If no ``Content-Type`` header is set when sending a body, it is assumed
534        to be CBOR.
535        """
536        response = result_of(
537            self.client.request(
538                "POST",
539                DecodedURL.from_text("http://127.0.0.1/read_body"),
540                data=dumps({"reason": "test"}),
541            )
542        )
543        self.assertEqual(
544            result_of(limited_content(response, self._http_server.clock, 100)).read(),
545            b"test",
546        )
547
548    def test_request_with_wrong_content(self):
549        """
550        If a non-CBOR ``Content-Type`` header is set when sending a body, the
551        server complains appropriatly.
552        """
553        headers = Headers()
554        headers.setRawHeaders("content-type", ["some/value"])
555        response = result_of(
556            self.client.request(
557                "POST",
558                DecodedURL.from_text("http://127.0.0.1/read_body"),
559                data=dumps({"reason": "test"}),
560                headers=headers,
561            )
562        )
563        self.assertEqual(response.code, http.UNSUPPORTED_MEDIA_TYPE)
564
565
566@implementer(IReactorFromThreads)
567class Reactor(Clock):
568    """
569    Fake reactor that supports time APIs and callFromThread.
570
571    Advancing the clock also runs any callbacks scheduled via callFromThread.
572    """
573
574    def __init__(self):
575        Clock.__init__(self)
576        self._queue = Queue()
577
578    def callFromThread(self, callable, *args, **kwargs):
579        self._queue.put((callable, args, kwargs))
580
581    def advance(self, *args, **kwargs):
582        Clock.advance(self, *args, **kwargs)
583        while not self._queue.empty():
584            f, args, kwargs = self._queue.get()
585            f(*args, **kwargs)
586
587
588class HttpTestFixture(Fixture):
589    """
590    Setup HTTP tests' infrastructure, the storage server and corresponding
591    client.
592    """
593
594    def _setUp(self):
595        StorageClientFactory.start_test_mode(
596            lambda pool: self.addCleanup(pool.closeCachedConnections)
597        )
598        self.addCleanup(StorageClientFactory.stop_test_mode)
599        self.clock = Reactor()
600        self.tempdir = self.useFixture(TempDir())
601        # The global Cooperator used by Twisted (a) used by pull producers in
602        # twisted.web, (b) is driven by a real reactor. We want to push time
603        # forward ourselves since we rely on pull producers in the HTTP storage
604        # server.
605        self.mock = self.useFixture(
606            MonkeyPatch(
607                "twisted.internet.task._theCooperator",
608                Cooperator(scheduler=lambda c: self.clock.callLater(0.000001, c)),
609            )
610        )
611        self.storage_server = StorageServer(
612            self.tempdir.path, b"\x00" * 20, clock=self.clock
613        )
614        self.http_server = HTTPServer(
615            self.clock, self.storage_server, SWISSNUM_FOR_TEST
616        )
617        self.treq = StubTreq(self.http_server.get_resource())
618        self.client = StorageClient(
619            DecodedURL.from_text("http://127.0.0.1"),
620            SWISSNUM_FOR_TEST,
621            treq=self.treq,
622            pool=None,
623            clock=self.clock,
624            analyze_response=response_is_not_html,
625        )
626
627    def result_of_with_flush(self, d):
628        """
629        Like ``result_of``, but supports fake reactor and ``treq`` testing
630        infrastructure necessary to support asynchronous HTTP server endpoints.
631        """
632        d = ensureDeferred(d)
633        result = []
634        error = []
635        d.addCallbacks(result.append, error.append)
636
637        # Check for synchronous HTTP endpoint handler:
638        if result:
639            return result[0]
640        if error:
641            error[0].raiseException()
642
643        # OK, no result yet, probably async HTTP endpoint handler, so advance
644        # time, flush treq, and try again:
645        for i in range(10_000):
646            self.clock.advance(0.001)
647            self.treq.flush()
648            if result:
649                break
650            # By putting the sleep at the end, tests that are completely
651            # synchronous and don't use threads will have already broken out of
652            # the loop, and so will finish without any sleeps. This allows them
653            # to run as quickly as possible.
654            #
655            # However, some tests do talk to APIs that use a thread pool on the
656            # backend, so we need to allow actual time to pass for those.
657            time.sleep(0.001)
658
659        if result:
660            return result[0]
661        if error:
662            error[0].raiseException()
663
664        raise RuntimeError(
665            "We expected given Deferred to have result already, but it wasn't. "
666            + "This is probably a test design issue."
667        )
668
669
670class StorageClientWithHeadersOverride:
671    """Wrap ``StorageClient`` and override sent headers."""
672
673    def __init__(self, storage_client, add_headers):
674        self.storage_client = storage_client
675        self.add_headers = add_headers
676
677    def __getattr__(self, attr):
678        return getattr(self.storage_client, attr)
679
680    def request(self, *args, headers=None, **kwargs):
681        if headers is None:
682            headers = Headers()
683        for key, value in self.add_headers.items():
684            headers.setRawHeaders(key, [value])
685        return self.storage_client.request(*args, headers=headers, **kwargs)
686
687
688@contextmanager
689def assert_fails_with_http_code(test_case: SyncTestCase, code: int):
690    """
691    Context manager that asserts the code fails with the given HTTP response
692    code.
693    """
694    with test_case.assertRaises(ClientException) as e:
695        try:
696            yield
697        finally:
698            pass
699    test_case.assertEqual(e.exception.code, code)
700
701
702class GenericHTTPAPITests(SyncTestCase):
703    """
704    Tests of HTTP client talking to the HTTP server, for generic HTTP API
705    endpoints and concerns.
706    """
707
708    def setUp(self):
709        super(GenericHTTPAPITests, self).setUp()
710        disable_thread_pool_for_test(self)
711        self.http = self.useFixture(HttpTestFixture())
712
713    def test_missing_authentication(self) -> None:
714        """
715        If nothing is given in the ``Authorization`` header at all an
716        ``Unauthorized`` response is returned.
717        """
718        client = HTTPClient(
719            RequestTraversalAgent(self.http.http_server.get_resource())
720        )
721        response = self.http.result_of_with_flush(
722            client.request(
723                "GET",
724                "http://127.0.0.1/storage/v1/version",
725            ),
726        )
727        self.assertThat(response.code, Equals(http.UNAUTHORIZED))
728
729    def test_bad_authentication(self):
730        """
731        If the wrong swissnum is used, an ``Unauthorized`` response code is
732        returned.
733        """
734        client = StorageClientGeneral(
735            StorageClient(
736                DecodedURL.from_text("http://127.0.0.1"),
737                b"something wrong",
738                treq=StubTreq(self.http.http_server.get_resource()),
739                pool=None,
740                clock=self.http.clock,
741                analyze_response=response_is_not_html,
742            )
743        )
744        with assert_fails_with_http_code(self, http.UNAUTHORIZED):
745            self.http.result_of_with_flush(client.get_version())
746
747    def test_unsupported_mime_type(self):
748        """
749        The client can request mime types other than CBOR, and if they are
750        unsupported a NOT ACCEPTABLE (406) error will be returned.
751        """
752        client = StorageClientGeneral(
753            StorageClientWithHeadersOverride(self.http.client, {"accept": "image/gif"})
754        )
755        with assert_fails_with_http_code(self, http.NOT_ACCEPTABLE):
756            self.http.result_of_with_flush(client.get_version())
757
758    def test_version(self):
759        """
760        The client can return the version.
761
762        We ignore available disk space and max immutable share size, since that
763        might change across calls.
764        """
765        client = StorageClientGeneral(self.http.client)
766        version = self.http.result_of_with_flush(client.get_version())
767        version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
768            b"available-space"
769        )
770        version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
771            b"maximum-immutable-share-size"
772        )
773        expected_version = self.http.storage_server.get_version()
774        expected_version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
775            b"available-space"
776        )
777        expected_version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
778            b"maximum-immutable-share-size"
779        )
780        self.assertEqual(version, expected_version)
781
782    def test_server_side_schema_validation(self):
783        """
784        Ensure that schema validation is happening: invalid CBOR should result
785        in bad request response code (error 400).
786
787        We don't bother checking every single request, the API on the
788        server-side is designed to require a schema, so it validates
789        everywhere.  But we check at least one to ensure we get correct
790        response code on bad input, so we know validation happened.
791        """
792        upload_secret = urandom(32)
793        lease_secret = urandom(32)
794        storage_index = urandom(16)
795        url = self.http.client.relative_url(
796            "/storage/v1/immutable/" + _encode_si(storage_index)
797        )
798        message = {"bad-message": "missing expected keys"}
799
800        response = self.http.result_of_with_flush(
801            self.http.client.request(
802                "POST",
803                url,
804                lease_renew_secret=lease_secret,
805                lease_cancel_secret=lease_secret,
806                upload_secret=upload_secret,
807                message_to_serialize=message,
808            )
809        )
810        self.assertEqual(response.code, http.BAD_REQUEST)
811
812
813class ImmutableHTTPAPITests(SyncTestCase):
814    """
815    Tests for immutable upload/download APIs.
816    """
817
818    def setUp(self):
819        super(ImmutableHTTPAPITests, self).setUp()
820        disable_thread_pool_for_test(self)
821        self.http = self.useFixture(HttpTestFixture())
822        self.imm_client = StorageClientImmutables(self.http.client)
823        self.general_client = StorageClientGeneral(self.http.client)
824
825    def create_upload(self, share_numbers, length):
826        """
827        Create a write bucket on server, return:
828
829            (upload_secret, lease_secret, storage_index, result)
830        """
831        upload_secret = urandom(32)
832        lease_secret = urandom(32)
833        storage_index = urandom(16)
834        created = self.http.result_of_with_flush(
835            self.imm_client.create(
836                storage_index,
837                share_numbers,
838                length,
839                upload_secret,
840                lease_secret,
841                lease_secret,
842            )
843        )
844        return (upload_secret, lease_secret, storage_index, created)
845
846    def test_upload_can_be_downloaded(self):
847        """
848        A single share can be uploaded in (possibly overlapping) chunks, and
849        then a random chunk can be downloaded, and it will match the original
850        file.
851
852        We don't exercise the full variation of overlapping chunks because
853        that's already done in test_storage.py.
854        """
855        length = 100
856        expected_data = bytes(range(100))
857
858        # Create a upload:
859        (upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
860        self.assertEqual(
861            created, ImmutableCreateResult(already_have=set(), allocated={1})
862        )
863
864        remaining = RangeMap()
865        remaining.set(True, 0, 100)
866
867        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
868        def write(offset, length):
869            remaining.empty(offset, offset + length)
870            return self.imm_client.write_share_chunk(
871                storage_index,
872                1,
873                upload_secret,
874                offset,
875                expected_data[offset : offset + length],
876            )
877
878        upload_progress = self.http.result_of_with_flush(write(10, 10))
879        self.assertEqual(
880            upload_progress, UploadProgress(finished=False, required=remaining)
881        )
882        upload_progress = self.http.result_of_with_flush(write(30, 10))
883        self.assertEqual(
884            upload_progress, UploadProgress(finished=False, required=remaining)
885        )
886        upload_progress = self.http.result_of_with_flush(write(50, 10))
887        self.assertEqual(
888            upload_progress, UploadProgress(finished=False, required=remaining)
889        )
890
891        # Then, an overlapping write with matching data (15-35):
892        upload_progress = self.http.result_of_with_flush(write(15, 20))
893        self.assertEqual(
894            upload_progress, UploadProgress(finished=False, required=remaining)
895        )
896
897        # Now fill in the holes:
898        upload_progress = self.http.result_of_with_flush(write(0, 10))
899        self.assertEqual(
900            upload_progress, UploadProgress(finished=False, required=remaining)
901        )
902        upload_progress = self.http.result_of_with_flush(write(40, 10))
903        self.assertEqual(
904            upload_progress, UploadProgress(finished=False, required=remaining)
905        )
906        upload_progress = self.http.result_of_with_flush(write(60, 40))
907        self.assertEqual(
908            upload_progress, UploadProgress(finished=True, required=RangeMap())
909        )
910
911        # We can now read:
912        for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]:
913            downloaded = self.http.result_of_with_flush(
914                self.imm_client.read_share_chunk(storage_index, 1, offset, length)
915            )
916            self.assertEqual(downloaded, expected_data[offset : offset + length])
917
918    def test_write_with_wrong_upload_key(self):
919        """
920        A write with an upload key that is different than the original upload
921        key will fail.
922        """
923        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
924        with assert_fails_with_http_code(self, http.UNAUTHORIZED):
925            self.http.result_of_with_flush(
926                self.imm_client.write_share_chunk(
927                    storage_index,
928                    1,
929                    upload_secret + b"X",
930                    0,
931                    b"123",
932                )
933            )
934
935    def test_allocate_buckets_second_time_different_shares(self):
936        """
937        If allocate buckets endpoint is called second time with different
938        upload key on potentially different shares, that creates the buckets on
939        those shares that are different.
940        """
941        # Create a upload:
942        (upload_secret, lease_secret, storage_index, created) = self.create_upload(
943            {1, 2, 3}, 100
944        )
945
946        # Write half of share 1
947        self.http.result_of_with_flush(
948            self.imm_client.write_share_chunk(
949                storage_index,
950                1,
951                upload_secret,
952                0,
953                b"a" * 50,
954            )
955        )
956
957        # Add same shares with a different upload key share 1 overlaps with
958        # existing shares, this call shouldn't overwrite the existing
959        # work-in-progress.
960        upload_secret2 = b"x" * 2
961        created2 = self.http.result_of_with_flush(
962            self.imm_client.create(
963                storage_index,
964                {1, 4, 6},
965                100,
966                upload_secret2,
967                lease_secret,
968                lease_secret,
969            )
970        )
971        self.assertEqual(created2.allocated, {4, 6})
972
973        # Write second half of share 1
974        self.assertTrue(
975            self.http.result_of_with_flush(
976                self.imm_client.write_share_chunk(
977                    storage_index,
978                    1,
979                    upload_secret,
980                    50,
981                    b"b" * 50,
982                )
983            ).finished
984        )
985
986        # The upload of share 1 succeeded, demonstrating that second create()
987        # call didn't overwrite work-in-progress.
988        downloaded = self.http.result_of_with_flush(
989            self.imm_client.read_share_chunk(storage_index, 1, 0, 100)
990        )
991        self.assertEqual(downloaded, b"a" * 50 + b"b" * 50)
992
993        # We can successfully upload the shares created with the second upload secret.
994        self.assertTrue(
995            self.http.result_of_with_flush(
996                self.imm_client.write_share_chunk(
997                    storage_index,
998                    4,
999                    upload_secret2,
1000                    0,
1001                    b"x" * 100,
1002                )
1003            ).finished
1004        )
1005
1006    def test_list_shares(self):
1007        """
1008        Once a share is finished uploading, it's possible to list it.
1009        """
1010        (upload_secret, _, storage_index, created) = self.create_upload({1, 2, 3}, 10)
1011
1012        # Initially there are no shares:
1013        self.assertEqual(
1014            self.http.result_of_with_flush(self.imm_client.list_shares(storage_index)),
1015            set(),
1016        )
1017
1018        # Upload shares 1 and 3:
1019        for share_number in [1, 3]:
1020            progress = self.http.result_of_with_flush(
1021                self.imm_client.write_share_chunk(
1022                    storage_index,
1023                    share_number,
1024                    upload_secret,
1025                    0,
1026                    b"0123456789",
1027                )
1028            )
1029            self.assertTrue(progress.finished)
1030
1031        # Now shares 1 and 3 exist:
1032        self.assertEqual(
1033            self.http.result_of_with_flush(self.imm_client.list_shares(storage_index)),
1034            {1, 3},
1035        )
1036
1037    def test_upload_bad_content_range(self):
1038        """
1039        Malformed or invalid Content-Range headers to the immutable upload
1040        endpoint result in a 416 error.
1041        """
1042        (upload_secret, _, storage_index, created) = self.create_upload({1}, 10)
1043
1044        def check_invalid(bad_content_range_value):
1045            client = StorageClientImmutables(
1046                StorageClientWithHeadersOverride(
1047                    self.http.client, {"content-range": bad_content_range_value}
1048                )
1049            )
1050            with assert_fails_with_http_code(
1051                self, http.REQUESTED_RANGE_NOT_SATISFIABLE
1052            ):
1053                self.http.result_of_with_flush(
1054                    client.write_share_chunk(
1055                        storage_index,
1056                        1,
1057                        upload_secret,
1058                        0,
1059                        b"0123456789",
1060                    )
1061                )
1062
1063        check_invalid("not a valid content-range header at all")
1064        check_invalid("bytes -1-9/10")
1065        check_invalid("bytes 0--9/10")
1066        check_invalid("teapots 0-9/10")
1067
1068    def test_list_shares_unknown_storage_index(self):
1069        """
1070        Listing unknown storage index's shares results in empty list of shares.
1071        """
1072        storage_index = bytes(range(16))
1073        self.assertEqual(
1074            self.http.result_of_with_flush(self.imm_client.list_shares(storage_index)),
1075            set(),
1076        )
1077
1078    def test_upload_non_existent_storage_index(self):
1079        """
1080        Uploading to a non-existent storage index or share number results in
1081        404.
1082        """
1083        (upload_secret, _, storage_index, _) = self.create_upload({1}, 10)
1084
1085        def unknown_check(storage_index, share_number):
1086            with assert_fails_with_http_code(self, http.NOT_FOUND):
1087                self.http.result_of_with_flush(
1088                    self.imm_client.write_share_chunk(
1089                        storage_index,
1090                        share_number,
1091                        upload_secret,
1092                        0,
1093                        b"0123456789",
1094                    )
1095                )
1096
1097        # Wrong share number:
1098        unknown_check(storage_index, 7)
1099        # Wrong storage index:
1100        unknown_check(b"X" * 16, 7)
1101
1102    def test_multiple_shares_uploaded_to_different_place(self):
1103        """
1104        If a storage index has multiple shares, uploads to different shares are
1105        stored separately and can be downloaded separately.
1106        """
1107        (upload_secret, _, storage_index, _) = self.create_upload({1, 2}, 10)
1108        self.http.result_of_with_flush(
1109            self.imm_client.write_share_chunk(
1110                storage_index,
1111                1,
1112                upload_secret,
1113                0,
1114                b"1" * 10,
1115            )
1116        )
1117        self.http.result_of_with_flush(
1118            self.imm_client.write_share_chunk(
1119                storage_index,
1120                2,
1121                upload_secret,
1122                0,
1123                b"2" * 10,
1124            )
1125        )
1126        self.assertEqual(
1127            self.http.result_of_with_flush(
1128                self.imm_client.read_share_chunk(storage_index, 1, 0, 10)
1129            ),
1130            b"1" * 10,
1131        )
1132        self.assertEqual(
1133            self.http.result_of_with_flush(
1134                self.imm_client.read_share_chunk(storage_index, 2, 0, 10)
1135            ),
1136            b"2" * 10,
1137        )
1138
1139    def test_mismatching_upload_fails(self):
1140        """
1141        If an uploaded chunk conflicts with an already uploaded chunk, a
1142        CONFLICT error is returned.
1143        """
1144        (upload_secret, _, storage_index, created) = self.create_upload({1}, 100)
1145
1146        # Write:
1147        self.http.result_of_with_flush(
1148            self.imm_client.write_share_chunk(
1149                storage_index,
1150                1,
1151                upload_secret,
1152                0,
1153                b"0" * 10,
1154            )
1155        )
1156
1157        # Conflicting write:
1158        with assert_fails_with_http_code(self, http.CONFLICT):
1159            self.http.result_of_with_flush(
1160                self.imm_client.write_share_chunk(
1161                    storage_index,
1162                    1,
1163                    upload_secret,
1164                    0,
1165                    b"0123456789",
1166                )
1167            )
1168
1169    def test_timed_out_upload_allows_reupload(self):
1170        """
1171        If an in-progress upload times out, it is cancelled altogether,
1172        allowing a new upload to occur.
1173        """
1174        self._test_abort_or_timed_out_upload_to_existing_storage_index(
1175            lambda **kwargs: self.http.clock.advance(30 * 60 + 1)
1176        )
1177
1178    def test_abort_upload_allows_reupload(self):
1179        """
1180        If an in-progress upload is aborted, it is cancelled altogether,
1181        allowing a new upload to occur.
1182        """
1183
1184        def abort(storage_index, share_number, upload_secret):
1185            return self.http.result_of_with_flush(
1186                self.imm_client.abort_upload(storage_index, share_number, upload_secret)
1187            )
1188
1189        self._test_abort_or_timed_out_upload_to_existing_storage_index(abort)
1190
1191    def _test_abort_or_timed_out_upload_to_existing_storage_index(self, cancel_upload):
1192        """Start uploading to an existing storage index that then times out or aborts.
1193
1194        Re-uploading should work.
1195        """
1196        # Start an upload:
1197        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
1198        self.http.result_of_with_flush(
1199            self.imm_client.write_share_chunk(
1200                storage_index,
1201                1,
1202                upload_secret,
1203                0,
1204                b"123",
1205            )
1206        )
1207
1208        # Now, the upload is cancelled somehow:
1209        cancel_upload(
1210            storage_index=storage_index, upload_secret=upload_secret, share_number=1
1211        )
1212
1213        # Now we can create a new share with the same storage index without
1214        # complaint:
1215        upload_secret = urandom(32)
1216        lease_secret = urandom(32)
1217        created = self.http.result_of_with_flush(
1218            self.imm_client.create(
1219                storage_index,
1220                {1},
1221                100,
1222                upload_secret,
1223                lease_secret,
1224                lease_secret,
1225            )
1226        )
1227        self.assertEqual(created.allocated, {1})
1228
1229        # And write to it, too:
1230        self.http.result_of_with_flush(
1231            self.imm_client.write_share_chunk(
1232                storage_index,
1233                1,
1234                upload_secret,
1235                0,
1236                b"ABC",
1237            )
1238        )
1239
1240    def test_unknown_aborts(self):
1241        """
1242        Aborting uploads with an unknown storage index or share number will
1243        result 404 HTTP response code.
1244        """
1245        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
1246
1247        for si, num in [(storage_index, 3), (b"x" * 16, 1)]:
1248            with assert_fails_with_http_code(self, http.NOT_FOUND):
1249                self.http.result_of_with_flush(
1250                    self.imm_client.abort_upload(si, num, upload_secret)
1251                )
1252
1253    def test_unauthorized_abort(self):
1254        """
1255        An abort with the wrong key will return an unauthorized error, and will
1256        not abort the upload.
1257        """
1258        (upload_secret, _, storage_index, _) = self.create_upload({1}, 100)
1259
1260        # Failed to abort becaues wrong upload secret:
1261        with assert_fails_with_http_code(self, http.UNAUTHORIZED):
1262            self.http.result_of_with_flush(
1263                self.imm_client.abort_upload(storage_index, 1, upload_secret + b"X")
1264            )
1265
1266        # We can still write to it:
1267        self.http.result_of_with_flush(
1268            self.imm_client.write_share_chunk(
1269                storage_index,
1270                1,
1271                upload_secret,
1272                0,
1273                b"ABC",
1274            )
1275        )
1276
1277    def test_too_late_abort(self):
1278        """
1279        An abort of an already-fully-uploaded immutable will result in 405
1280        error and will not affect the immutable.
1281        """
1282        uploaded_data = b"123"
1283        (upload_secret, _, storage_index, _) = self.create_upload({0}, 3)
1284        self.http.result_of_with_flush(
1285            self.imm_client.write_share_chunk(
1286                storage_index,
1287                0,
1288                upload_secret,
1289                0,
1290                uploaded_data,
1291            )
1292        )
1293
1294        # Can't abort, we finished upload:
1295        with assert_fails_with_http_code(self, http.NOT_ALLOWED):
1296            self.http.result_of_with_flush(
1297                self.imm_client.abort_upload(storage_index, 0, upload_secret)
1298            )
1299
1300        # Abort didn't prevent reading:
1301        self.assertEqual(
1302            uploaded_data,
1303            self.http.result_of_with_flush(
1304                self.imm_client.read_share_chunk(
1305                    storage_index,
1306                    0,
1307                    0,
1308                    3,
1309                )
1310            ),
1311        )
1312
1313    def test_lease_on_unknown_storage_index(self):
1314        """
1315        An attempt to renew an unknown storage index will result in a HTTP 404.
1316        """
1317        storage_index = urandom(16)
1318        secret = b"A" * 32
1319        with assert_fails_with_http_code(self, http.NOT_FOUND):
1320            self.http.result_of_with_flush(
1321                self.general_client.add_or_renew_lease(storage_index, secret, secret)
1322            )
1323
1324
1325class MutableHTTPAPIsTests(SyncTestCase):
1326    """Tests for mutable APIs."""
1327
1328    def setUp(self):
1329        super(MutableHTTPAPIsTests, self).setUp()
1330        disable_thread_pool_for_test(self)
1331        self.http = self.useFixture(HttpTestFixture())
1332        self.mut_client = StorageClientMutables(self.http.client)
1333
1334    def create_upload(self, data=b"abcdef"):
1335        """
1336        Utility that creates shares 0 and 1 with bodies
1337        ``{data}-{share_number}``.
1338        """
1339        write_secret = urandom(32)
1340        lease_secret = urandom(32)
1341        storage_index = urandom(16)
1342        self.http.result_of_with_flush(
1343            self.mut_client.read_test_write_chunks(
1344                storage_index,
1345                write_secret,
1346                lease_secret,
1347                lease_secret,
1348                {
1349                    0: TestWriteVectors(
1350                        write_vectors=[WriteVector(offset=0, data=data + b"-0")]
1351                    ),
1352                    1: TestWriteVectors(
1353                        write_vectors=[
1354                            WriteVector(offset=0, data=data),
1355                            WriteVector(offset=len(data), data=b"-1"),
1356                        ]
1357                    ),
1358                },
1359                [],
1360            )
1361        )
1362        return storage_index, write_secret, lease_secret
1363
1364    def test_write_can_be_read_small_data(self):
1365        """
1366        Small written data can be read using ``read_share_chunk``.
1367        """
1368        self.write_can_be_read(b"abcdef")
1369
1370    def test_write_can_be_read_large_data(self):
1371        """
1372        Large written data (50MB) can be read using ``read_share_chunk``.
1373        """
1374        self.write_can_be_read(b"abcdefghij" * 5 * 1024 * 1024)
1375
1376    def write_can_be_read(self, data):
1377        """
1378        Written data can be read using ``read_share_chunk``.
1379        """
1380        lease_secret = urandom(32)
1381        storage_index = urandom(16)
1382        self.http.result_of_with_flush(
1383            self.mut_client.read_test_write_chunks(
1384                storage_index,
1385                urandom(32),
1386                lease_secret,
1387                lease_secret,
1388                {
1389                    0: TestWriteVectors(
1390                        write_vectors=[WriteVector(offset=0, data=data)]
1391                    ),
1392                },
1393                [],
1394            )
1395        )
1396        read_data = self.http.result_of_with_flush(
1397            self.mut_client.read_share_chunk(storage_index, 0, 0, len(data))
1398        )
1399        self.assertEqual(read_data, data)
1400
1401    def test_read_before_write(self):
1402        """In combo read/test/write operation, reads happen before writes."""
1403        storage_index, write_secret, lease_secret = self.create_upload()
1404        result = self.http.result_of_with_flush(
1405            self.mut_client.read_test_write_chunks(
1406                storage_index,
1407                write_secret,
1408                lease_secret,
1409                lease_secret,
1410                {
1411                    0: TestWriteVectors(
1412                        write_vectors=[WriteVector(offset=1, data=b"XYZ")]
1413                    ),
1414                },
1415                [ReadVector(0, 8)],
1416            )
1417        )
1418        # Reads are from before the write:
1419        self.assertEqual(
1420            result,
1421            ReadTestWriteResult(
1422                success=True, reads={0: [b"abcdef-0"], 1: [b"abcdef-1"]}
1423            ),
1424        )
1425        # But the write did happen:
1426        data0 = self.http.result_of_with_flush(
1427            self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
1428        )
1429        data1 = self.http.result_of_with_flush(
1430            self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
1431        )
1432        self.assertEqual((data0, data1), (b"aXYZef-0", b"abcdef-1"))
1433
1434    def test_conditional_write(self):
1435        """Uploads only happen if the test passes."""
1436        storage_index, write_secret, lease_secret = self.create_upload()
1437        result_failed = self.http.result_of_with_flush(
1438            self.mut_client.read_test_write_chunks(
1439                storage_index,
1440                write_secret,
1441                lease_secret,
1442                lease_secret,
1443                {
1444                    0: TestWriteVectors(
1445                        test_vectors=[TestVector(1, 4, b"FAIL")],
1446                        write_vectors=[WriteVector(offset=1, data=b"XYZ")],
1447                    ),
1448                },
1449                [],
1450            )
1451        )
1452        self.assertFalse(result_failed.success)
1453
1454        # This time the test matches:
1455        result = self.http.result_of_with_flush(
1456            self.mut_client.read_test_write_chunks(
1457                storage_index,
1458                write_secret,
1459                lease_secret,
1460                lease_secret,
1461                {
1462                    0: TestWriteVectors(
1463                        test_vectors=[TestVector(1, 4, b"bcde")],
1464                        write_vectors=[WriteVector(offset=1, data=b"XYZ")],
1465                    ),
1466                },
1467                [ReadVector(0, 8)],
1468            )
1469        )
1470        self.assertTrue(result.success)
1471        self.assertEqual(
1472            self.http.result_of_with_flush(
1473                self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
1474            ),
1475            b"aXYZef-0",
1476        )
1477
1478    def test_list_shares(self):
1479        """``list_shares()`` returns the shares for a given storage index."""
1480        storage_index, _, _ = self.create_upload()
1481        self.assertEqual(
1482            self.http.result_of_with_flush(self.mut_client.list_shares(storage_index)),
1483            {0, 1},
1484        )
1485
1486    def test_non_existent_list_shares(self):
1487        """A non-existent storage index errors when shares are listed."""
1488        with self.assertRaises(ClientException) as exc:
1489            self.http.result_of_with_flush(self.mut_client.list_shares(urandom(32)))
1490        self.assertEqual(exc.exception.code, http.NOT_FOUND)
1491
1492    def test_wrong_write_enabler(self):
1493        """Writes with the wrong write enabler fail, and are not processed."""
1494        storage_index, write_secret, lease_secret = self.create_upload()
1495        with self.assertRaises(ClientException) as exc:
1496            self.http.result_of_with_flush(
1497                self.mut_client.read_test_write_chunks(
1498                    storage_index,
1499                    urandom(32),
1500                    lease_secret,
1501                    lease_secret,
1502                    {
1503                        0: TestWriteVectors(
1504                            write_vectors=[WriteVector(offset=1, data=b"XYZ")]
1505                        ),
1506                    },
1507                    [ReadVector(0, 8)],
1508                )
1509            )
1510        self.assertEqual(exc.exception.code, http.UNAUTHORIZED)
1511
1512        # The write did not happen:
1513        self.assertEqual(
1514            self.http.result_of_with_flush(
1515                self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
1516            ),
1517            b"abcdef-0",
1518        )
1519
1520
1521class SharedImmutableMutableTestsMixin:
1522    """
1523    Shared tests for mutables and immutables where the API is the same.
1524    """
1525
1526    KIND: str  # either "mutable" or "immutable"
1527    general_client: StorageClientGeneral
1528    client: Union[StorageClientImmutables, StorageClientMutables]
1529    clientFactory: Callable[
1530        [StorageClient], Union[StorageClientImmutables, StorageClientMutables]
1531    ]
1532
1533    def upload(self, share_number: int, data_length=26) -> Tuple[bytes, bytes, bytes]:
1534        """
1535        Create a share, return (storage_index, uploaded_data, lease secret).
1536        """
1537        raise NotImplementedError
1538
1539    def get_leases(self, storage_index: bytes) -> Iterable[LeaseInfo]:
1540        """Get leases for the storage index."""
1541        raise NotImplementedError()
1542
1543    def test_advise_corrupt_share(self):
1544        """
1545        Advising share was corrupted succeeds from HTTP client's perspective,
1546        and calls appropriate method on server.
1547        """
1548        corrupted = []
1549        self.http.storage_server.advise_corrupt_share = lambda *args: corrupted.append(
1550            args
1551        )
1552
1553        storage_index, _, _ = self.upload(13)
1554        reason = "OHNO \u1235"
1555        self.http.result_of_with_flush(
1556            self.client.advise_corrupt_share(storage_index, 13, reason)
1557        )
1558
1559        self.assertEqual(
1560            corrupted,
1561            [(self.KIND.encode("ascii"), storage_index, 13, reason.encode("utf-8"))],
1562        )
1563
1564    def test_advise_corrupt_share_unknown(self):
1565        """
1566        Advising an unknown share was corrupted results in 404.
1567        """
1568        storage_index, _, _ = self.upload(13)
1569        reason = "OHNO \u1235"
1570        self.http.result_of_with_flush(
1571            self.client.advise_corrupt_share(storage_index, 13, reason)
1572        )
1573
1574        for si, share_number in [(storage_index, 11), (urandom(16), 13)]:
1575            with assert_fails_with_http_code(self, http.NOT_FOUND):
1576                self.http.result_of_with_flush(
1577                    self.client.advise_corrupt_share(si, share_number, reason)
1578                )
1579
1580    def test_lease_renew_and_add(self):
1581        """
1582        It's possible the renew the lease on an uploaded mutable/immutable, by
1583        using the same renewal secret, or add a new lease by choosing a
1584        different renewal secret.
1585        """
1586        # Create a storage index:
1587        storage_index, _, lease_secret = self.upload(0)
1588
1589        [lease] = self.get_leases(storage_index)
1590        initial_expiration_time = lease.get_expiration_time()
1591
1592        # Time passes:
1593        self.http.clock.advance(167)
1594
1595        # We renew the lease:
1596        self.http.result_of_with_flush(
1597            self.general_client.add_or_renew_lease(
1598                storage_index, lease_secret, lease_secret
1599            )
1600        )
1601
1602        # More time passes:
1603        self.http.clock.advance(10)
1604
1605        # We create a new lease:
1606        lease_secret2 = urandom(32)
1607        self.http.result_of_with_flush(
1608            self.general_client.add_or_renew_lease(
1609                storage_index, lease_secret2, lease_secret2
1610            )
1611        )
1612
1613        [lease1, lease2] = self.get_leases(storage_index)
1614        self.assertEqual(lease1.get_expiration_time(), initial_expiration_time + 167)
1615        self.assertEqual(lease2.get_expiration_time(), initial_expiration_time + 177)
1616
1617    def test_read_of_wrong_storage_index_fails(self):
1618        """
1619        Reading from unknown storage index results in 404.
1620        """
1621        with assert_fails_with_http_code(self, http.NOT_FOUND):
1622            self.http.result_of_with_flush(
1623                self.client.read_share_chunk(
1624                    b"1" * 16,
1625                    1,
1626                    0,
1627                    10,
1628                )
1629            )
1630
1631    def test_read_of_wrong_share_number_fails(self):
1632        """
1633        Reading from unknown storage index results in 404.
1634        """
1635        storage_index, _, _ = self.upload(1)
1636        with assert_fails_with_http_code(self, http.NOT_FOUND):
1637            self.http.result_of_with_flush(
1638                self.client.read_share_chunk(
1639                    storage_index,
1640                    7,  # different share number
1641                    0,
1642                    10,
1643                )
1644            )
1645
1646    def test_read_with_negative_offset_fails(self):
1647        """
1648        Malformed or unsupported Range headers result in 416 (requested range
1649        not satisfiable) error.
1650        """
1651        storage_index, _, _ = self.upload(1)
1652
1653        def check_bad_range(bad_range_value):
1654            client = self.clientFactory(
1655                StorageClientWithHeadersOverride(
1656                    self.http.client, {"range": bad_range_value}
1657                )
1658            )
1659
1660            with assert_fails_with_http_code(
1661                self, http.REQUESTED_RANGE_NOT_SATISFIABLE
1662            ):
1663                self.http.result_of_with_flush(
1664                    client.read_share_chunk(
1665                        storage_index,
1666                        1,
1667                        0,
1668                        10,
1669                    )
1670                )
1671
1672        # Bad unit
1673        check_bad_range("molluscs=0-9")
1674        # Negative offsets
1675        check_bad_range("bytes=-2-9")
1676        check_bad_range("bytes=0--10")
1677        # Negative offset no endpoint
1678        check_bad_range("bytes=-300-")
1679        check_bad_range("bytes=")
1680        # Multiple ranges are currently unsupported, even if they're
1681        # semantically valid under HTTP:
1682        check_bad_range("bytes=0-5, 6-7")
1683        # Ranges without an end are currently unsupported, even if they're
1684        # semantically valid under HTTP.
1685        check_bad_range("bytes=0-")
1686
1687    def _read_with_no_range_test(self, data_length):
1688        """
1689        A read with no range returns the whole mutable/immutable.
1690
1691        Actual test is defined in subclasses, to fix complaints from Hypothesis
1692        about the method having different executors.
1693        """
1694        storage_index, uploaded_data, _ = self.upload(1, data_length)
1695        response = self.http.result_of_with_flush(
1696            self.http.client.request(
1697                "GET",
1698                self.http.client.relative_url(
1699                    "/storage/v1/{}/{}/1".format(self.KIND, _encode_si(storage_index))
1700                ),
1701            )
1702        )
1703        self.assertEqual(response.code, http.OK)
1704        self.assertEqual(
1705            self.http.result_of_with_flush(response.content()), uploaded_data
1706        )
1707
1708    def test_validate_content_range_response_to_read(self):
1709        """
1710        The server responds to ranged reads with an appropriate Content-Range
1711        header.
1712        """
1713        storage_index, _, _ = self.upload(1, 26)
1714
1715        def check_range(requested_range, expected_response):
1716            headers = Headers()
1717            headers.setRawHeaders("range", [requested_range])
1718            response = self.http.result_of_with_flush(
1719                self.http.client.request(
1720                    "GET",
1721                    self.http.client.relative_url(
1722                        "/storage/v1/{}/{}/1".format(
1723                            self.KIND, _encode_si(storage_index)
1724                        )
1725                    ),
1726                    headers=headers,
1727                )
1728            )
1729            self.assertEqual(
1730                response.headers.getRawHeaders("content-range"), [expected_response]
1731            )
1732
1733        check_range("bytes=0-10", "bytes 0-10/*")
1734        check_range("bytes=3-17", "bytes 3-17/*")
1735        # TODO re-enable in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3907
1736        # Can't go beyond the end of the mutable/immutable!
1737        # check_range("bytes=10-100", "bytes 10-25/*")
1738
1739
1740class ImmutableSharedTests(SharedImmutableMutableTestsMixin, SyncTestCase):
1741    """Shared tests, running on immutables."""
1742
1743    KIND = "immutable"
1744    clientFactory = StorageClientImmutables
1745
1746    def setUp(self):
1747        super(ImmutableSharedTests, self).setUp()
1748        disable_thread_pool_for_test(self)
1749        self.http = self.useFixture(HttpTestFixture())
1750        self.client = self.clientFactory(self.http.client)
1751        self.general_client = StorageClientGeneral(self.http.client)
1752
1753    def upload(self, share_number, data_length=26):
1754        """
1755        Create a share, return (storage_index, uploaded_data, lease_secret).
1756        """
1757        uploaded_data = (b"abcdefghijklmnopqrstuvwxyz" * ((data_length // 26) + 1))[
1758            :data_length
1759        ]
1760        upload_secret = urandom(32)
1761        lease_secret = urandom(32)
1762        storage_index = urandom(16)
1763        self.http.result_of_with_flush(
1764            self.client.create(
1765                storage_index,
1766                {share_number},
1767                data_length,
1768                upload_secret,
1769                lease_secret,
1770                lease_secret,
1771            )
1772        )
1773        self.http.result_of_with_flush(
1774            self.client.write_share_chunk(
1775                storage_index,
1776                share_number,
1777                upload_secret,
1778                0,
1779                uploaded_data,
1780            )
1781        )
1782        return storage_index, uploaded_data, lease_secret
1783
1784    def get_leases(self, storage_index):
1785        return self.http.storage_server.get_leases(storage_index)
1786
1787    @given(data_length=st.integers(min_value=1, max_value=300000))
1788    def test_read_with_no_range(self, data_length):
1789        """
1790        A read with no range returns the whole immutable.
1791        """
1792        return self._read_with_no_range_test(data_length)
1793
1794
1795class MutableSharedTests(SharedImmutableMutableTestsMixin, SyncTestCase):
1796    """Shared tests, running on mutables."""
1797
1798    KIND = "mutable"
1799    clientFactory = StorageClientMutables
1800
1801    def setUp(self):
1802        super(MutableSharedTests, self).setUp()
1803        disable_thread_pool_for_test(self)
1804        self.http = self.useFixture(HttpTestFixture())
1805        self.client = self.clientFactory(self.http.client)
1806        self.general_client = StorageClientGeneral(self.http.client)
1807
1808    def upload(self, share_number, data_length=26):
1809        """
1810        Create a share, return (storage_index, uploaded_data, lease_secret).
1811        """
1812        data = (b"abcdefghijklmnopqrstuvwxyz" * ((data_length // 26) + 1))[:data_length]
1813        write_secret = urandom(32)
1814        lease_secret = urandom(32)
1815        storage_index = urandom(16)
1816        self.http.result_of_with_flush(
1817            self.client.read_test_write_chunks(
1818                storage_index,
1819                write_secret,
1820                lease_secret,
1821                lease_secret,
1822                {
1823                    share_number: TestWriteVectors(
1824                        write_vectors=[WriteVector(offset=0, data=data)]
1825                    ),
1826                },
1827                [],
1828            )
1829        )
1830        return storage_index, data, lease_secret
1831
1832    def get_leases(self, storage_index):
1833        return self.http.storage_server.get_slot_leases(storage_index)
1834
1835    @given(data_length=st.integers(min_value=1, max_value=300000))
1836    def test_read_with_no_range(self, data_length):
1837        """
1838        A read with no range returns the whole mutable.
1839        """
1840        return self._read_with_no_range_test(data_length)
Note: See TracBrowser for help on using the repository browser.