Changeset 6dd2b2d in trunk


Timestamp: 2022-06-27T21:00:24Z (3 years ago)
Author: Itamar Turner-Trauring <itamar@…>
Branches: master
Children: 75f3302
Parents: 06eca79
git-author: Itamar Turner-Trauring <itamar@…> (2022-06-27 18:44:51)
git-committer: Itamar Turner-Trauring <itamar@…> (2022-06-27 21:00:24)
Message: More streaming, with tests passing again.
Location: src/allmydata
Files: 2 edited

  • src/allmydata/storage/http_server.py (r06eca79 → r6dd2b2d)

 @implementer(IPullProducer)
 @define
-class _ReadProducer:
-    """
-    Producer that calls a read function, and writes to a request.
+class _ReadAllProducer:
+    """
+    Producer that calls a read function repeatedly to read all the data, and
+    writes to a request.
     """
 
…
 
     def resumeProducing(self):
-        data = self.read_data(self.start, self.start + 65536)
+        data = self.read_data(self.start, 65536)
         if not data:
             self.request.unregisterProducer()
…
 
 
+@implementer(IPullProducer)
+@define
+class _ReadRangeProducer:
+    """
+    Producer that calls a read function to read a range of data, and writes to
+    a request.
+    """
+
+    request: Request
+    read_data: Callable[[int, int], bytes]
+    result: Deferred
+    start: int
+    remaining: int
+    first_read: bool = field(default=True)
+
+    def resumeProducing(self):
+        to_read = min(self.remaining, 65536)
+        data = self.read_data(self.start, to_read)
+        assert len(data) <= to_read
+        if self.first_read and data:
+            # For empty bodies the content-range header makes no sense since
+            # the end of the range is inclusive.
+            self.request.setHeader(
+                "content-range",
+                ContentRange("bytes", self.start, self.start + len(data)).to_header(),
+            )
+        self.request.write(data)
+
+        if not data or len(data) < to_read:
+            self.request.unregisterProducer()
+            d = self.result
+            del self.result
+            d.callback(b"")
+            return
+
+        self.start += len(data)
+        self.remaining -= len(data)
+        assert self.remaining >= 0
+
+    def pauseProducing(self):
+        pass
+
+    def stopProducing(self):
+        pass
+
+
 def read_range(request: Request, read_data: Callable[[int, int], bytes]) -> None:
     """
…
     The resulting data is written to the request.
     """
+
+    def read_data_with_error_handling(offset: int, length: int) -> bytes:
+        try:
+            return read_data(offset, length)
+        except _HTTPError as e:
+            request.setResponseCode(e.code)
+            # Empty read means we're done.
+            return b""
+
     if request.getHeader("range") is None:
         d = Deferred()
-        request.registerProducer(_ReadProducer(request, read_data, d), False)
+        request.registerProducer(
+            _ReadAllProducer(request, read_data_with_error_handling, d), False
+        )
         return d
 
…
 
     offset, end = range_header.ranges[0]
-
-    # TODO limit memory usage
-    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
-    data = read_data(offset, end - offset)
-
     request.setResponseCode(http.PARTIAL_CONTENT)
-    if len(data):
-        # For empty bodies the content-range header makes no sense since
-        # the end of the range is inclusive.
-        request.setHeader(
-            "content-range",
-            ContentRange("bytes", offset, offset + len(data)).to_header(),
-        )
-    request.write(data)
-    request.finish()
+    d = Deferred()
+    request.registerProducer(
+        _ReadRangeProducer(
+            request, read_data_with_error_handling, d, offset, end - offset
+        ),
+        False,
+    )
+    return d
 
 
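The server-side change above replaces the old read path, which buffered the whole requested range in memory with a single read_data(offset, end - offset) call (the removed TODO pointed at ticket 3872), with pull producers that stream the response in 65536-byte chunks. Below is a minimal sketch of that IPullProducer pattern, not the changeset's code; the names ChunkedReadProducer, stream_everything, read_data, and done are illustrative only:

    from zope.interface import implementer
    from twisted.internet.defer import Deferred
    from twisted.internet.interfaces import IPullProducer


    @implementer(IPullProducer)
    class ChunkedReadProducer:
        """Sketch: write read_data(offset, length) to a request, 65536 bytes at a time."""

        def __init__(self, request, read_data, done):
            self.request = request      # twisted.web.http.Request
            self.read_data = read_data  # Callable[[int, int], bytes]
            self.done = done            # Deferred fired once the body is fully written
            self.offset = 0

        def resumeProducing(self):
            # Twisted calls this whenever the transport can accept more data,
            # so at most one chunk is held in memory at a time.
            data = self.read_data(self.offset, 65536)
            if not data:
                # An empty read means we are finished.
                self.request.unregisterProducer()
                self.done.callback(b"")
                return
            self.request.write(data)
            self.offset += len(data)

        def stopProducing(self):
            pass  # Nothing to clean up in this sketch.


    def stream_everything(request, read_data):
        # streaming=False registers a pull producer: Twisted drives
        # resumeProducing() itself until unregisterProducer() is called.
        done = Deferred()
        request.registerProducer(ChunkedReadProducer(request, read_data, done), False)
        return done

The range-reading variant in the diff additionally tracks how many bytes remain and sets a Content-Range header on the first non-empty write.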
  • src/allmydata/test/test_storage_http.py (r06eca79 → r6dd2b2d)

 from pycddl import ValidationError as CDDLValidationError
 from hypothesis import assume, given, strategies as st
-from fixtures import Fixture, TempDir
+from fixtures import Fixture, TempDir, MockPatch
 from treq.testing import StubTreq
 from klein import Klein
…
         self.clock = Clock()
         self.tempdir = self.useFixture(TempDir())
+        self.mock = self.useFixture(
+            MockPatch(
+                "twisted.internet.task._theCooperator",
+                Cooperator(scheduler=lambda c: self.clock.callLater(0.000001, c)),
+            )
+        )
         self.storage_server = StorageServer(
             self.tempdir.path, b"\x00" * 20, clock=self.clock
…
             treq=self.treq,
         )
+
+    def result_of_with_flush(self, d):
+        for i in range(100):
+            self.clock.advance(0.001)
+        self.treq.flush()
+        return result_of(d)
 
 
…
         # We can now read:
         for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]:
-            downloaded = result_of(
+            downloaded = self.http.result_of_with_flush(
                 self.imm_client.read_share_chunk(storage_index, 1, offset, length)
             )
…
         # The upload of share 1 succeeded, demonstrating that second create()
         # call didn't overwrite work-in-progress.
-        downloaded = result_of(
+        downloaded = self.http.result_of_with_flush(
             self.imm_client.read_share_chunk(storage_index, 1, 0, 100)
         )
…
         )
         self.assertEqual(
-            result_of(self.imm_client.read_share_chunk(storage_index, 1, 0, 10)),
+            self.http.result_of_with_flush(
+                self.imm_client.read_share_chunk(storage_index, 1, 0, 10)
+            ),
             b"1" * 10,
         )
         self.assertEqual(
-            result_of(self.imm_client.read_share_chunk(storage_index, 2, 0, 10)),
+            self.http.result_of_with_flush(
+                self.imm_client.read_share_chunk(storage_index, 2, 0, 10)
+            ),
             b"2" * 10,
         )
…
         self.assertEqual(
             uploaded_data,
-            result_of(
+            self.http.result_of_with_flush(
                 self.imm_client.read_share_chunk(
                     storage_index,
…
         """
         storage_index, _, _ = self.create_upload()
-        data0 = result_of(self.mut_client.read_share_chunk(storage_index, 0, 1, 7))
-        data1 = result_of(self.mut_client.read_share_chunk(storage_index, 1, 0, 8))
+        data0 = self.http.result_of_with_flush(
+            self.mut_client.read_share_chunk(storage_index, 0, 1, 7)
+        )
+        data1 = self.http.result_of_with_flush(
+            self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
+        )
         self.assertEqual((data0, data1), (b"bcdef-0", b"abcdef-1"))
 
…
         )
         # But the write did happen:
-        data0 = result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8))
-        data1 = result_of(self.mut_client.read_share_chunk(storage_index, 1, 0, 8))
+        data0 = self.http.result_of_with_flush(
+            self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
+        )
+        data1 = self.http.result_of_with_flush(
+            self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
+        )
         self.assertEqual((data0, data1), (b"aXYZef-0", b"abcdef-1"))
 
…
         self.assertTrue(result.success)
         self.assertEqual(
-            result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8)),
+            self.http.result_of_with_flush(
+                self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
+            ),
             b"aXYZef-0",
         )
…
         # The write did not happen:
         self.assertEqual(
-            result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8)),
+            self.http.result_of_with_flush(
+                self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
+            ),
             b"abcdef-0",
         )
…
         """
         with assert_fails_with_http_code(self, http.NOT_FOUND):
-            result_of(
+            self.http.result_of_with_flush(
                 self.client.read_share_chunk(
                     b"1" * 16,
…
         storage_index, _, _ = self.upload(1)
         with assert_fails_with_http_code(self, http.NOT_FOUND):
-            result_of(
+            self.http.result_of_with_flush(
                 self.client.read_share_chunk(
                     storage_index,
…
                 self, http.REQUESTED_RANGE_NOT_SATISFIABLE
             ):
-                result_of(
+                self.http.result_of_with_flush(
                     client.read_share_chunk(
                         storage_index,
…
         A read with no range returns the whole mutable/immutable.
         """
-        self.patch(
-            task,
-            "_theCooperator",
-            Cooperator(scheduler=lambda c: self.http.clock.callLater(0.000001, c)),
-        )
-
-        def result_of_with_flush(d):
-            for i in range(100):
-                self.http.clock.advance(0.001)
-            self.http.treq.flush()
-            return result_of(d)
-
         storage_index, uploaded_data, _ = self.upload(1, data_length)
-        response = result_of_with_flush(
+        response = self.http.result_of_with_flush(
             self.http.client.request(
                 "GET",
…
             headers = Headers()
             headers.setRawHeaders("range", [requested_range])
-            response = result_of(
+            response = self.http.result_of_with_flush(
                 self.http.client.request(
                     "GET",
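The test changes follow from the streaming: since responses are now written by producers rather than returned in one synchronous call, the fixture patches twisted.internet.task._theCooperator with a Cooperator scheduled on the test Clock, and result_of_with_flush() advances that clock (and flushes treq) so pending work can run before result_of() is consulted. Below is a minimal sketch of why a Clock-driven Cooperator behaves that way, independent of the changeset's fixtures; the work generator and progress list are illustrative only:

    from twisted.internet.task import Clock, Cooperator

    clock = Clock()
    # Same trick as the fixture: schedule the cooperator's ticks on a fake
    # clock, so cooperative work only runs when the test advances time.
    cooperator = Cooperator(scheduler=lambda c: clock.callLater(0.000001, c))

    progress = []

    def work():
        # A small cooperative task; its steps only run inside scheduled ticks.
        for step in range(3):
            progress.append(step)
            yield

    cooperator.cooperate(work())
    assert progress == []         # no tick has fired yet, so nothing ran

    for _ in range(100):          # mirrors result_of_with_flush()'s loop
        clock.advance(0.001)

    assert progress == [0, 1, 2]  # the task completed once the clock advanced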