Changeset 6dd2b2d in trunk
- Timestamp: 2022-06-27T21:00:24Z (3 years ago)
- Branches: master
- Children: 75f3302
- Parents: 06eca79
- git-author: Itamar Turner-Trauring <itamar@…> (2022-06-27 18:44:51)
- git-committer: Itamar Turner-Trauring <itamar@…> (2022-06-27 21:00:24)
- Location: src/allmydata
- Files: 2 edited
src/allmydata/storage/http_server.py (r06eca79 → r6dd2b2d)

@@ -282,7 +282,8 @@
 @implementer(IPullProducer)
 @define
-class _ReadProducer:
-    """
-    Producer that calls a read function, and writes to a request.
+class _ReadAllProducer:
+    """
+    Producer that calls a read function repeatedly to read all the data, and
+    writes to a request.
     """
@@ -293,5 +294,5 @@

     def resumeProducing(self):
-        data = self.read_data(self.start, self.start + 65536)
+        data = self.read_data(self.start, 65536)
         if not data:
             self.request.unregisterProducer()
@@ -310,4 +311,50 @@


+@implementer(IPullProducer)
+@define
+class _ReadRangeProducer:
+    """
+    Producer that calls a read function to read a range of data, and writes to
+    a request.
+    """
+
+    request: Request
+    read_data: Callable[[int, int], bytes]
+    result: Deferred
+    start: int
+    remaining: int
+    first_read: bool = field(default=True)
+
+    def resumeProducing(self):
+        to_read = min(self.remaining, 65536)
+        data = self.read_data(self.start, to_read)
+        assert len(data) <= to_read
+        if self.first_read and data:
+            # For empty bodies the content-range header makes no sense since
+            # the end of the range is inclusive.
+            self.request.setHeader(
+                "content-range",
+                ContentRange("bytes", self.start, self.start + len(data)).to_header(),
+            )
+        self.request.write(data)
+
+        if not data or len(data) < to_read:
+            self.request.unregisterProducer()
+            d = self.result
+            del self.result
+            d.callback(b"")
+            return
+
+        self.start += len(data)
+        self.remaining -= len(data)
+        assert self.remaining >= 0
+
+    def pauseProducing(self):
+        pass
+
+    def stopProducing(self):
+        pass
+
+
 def read_range(request: Request, read_data: Callable[[int, int], bytes]) -> None:
     """
@@ -325,7 +372,18 @@
     The resulting data is written to the request.
     """
+
+    def read_data_with_error_handling(offset: int, length: int) -> bytes:
+        try:
+            return read_data(offset, length)
+        except _HTTPError as e:
+            request.setResponseCode(e.code)
+            # Empty read means we're done.
+            return b""
+
     if request.getHeader("range") is None:
         d = Deferred()
-        request.registerProducer(_ReadProducer(request, read_data, d), False)
+        request.registerProducer(
+            _ReadAllProducer(request, read_data_with_error_handling, d), False
+        )
         return d

@@ -340,19 +398,13 @@

     offset, end = range_header.ranges[0]
-
-    # TODO limit memory usage
-    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3872
-    data = read_data(offset, end - offset)
-
     request.setResponseCode(http.PARTIAL_CONTENT)
-    if len(data):
-        # For empty bodies the content-range header makes no sense since
-        # the end of the range is inclusive.
-        request.setHeader(
-            "content-range",
-            ContentRange("bytes", offset, offset + len(data)).to_header(),
-        )
-    request.write(data)
-    request.finish()
+    d = Deferred()
+    request.registerProducer(
+        _ReadRangeProducer(
+            request, read_data_with_error_handling, d, offset, end - offset
+        ),
+        False,
+    )
+    return d


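The new classes follow Twisted's pull-producer contract: once registered on the request, resumeProducing() is called whenever the transport can accept more data, each call reads at most 64 KiB, and a short or empty read ends the response. Below is a minimal, dependency-free sketch of that loop, assuming a hypothetical FakeConsumer standing in for the Twisted Request and a plain while loop standing in for the reactor; the real classes are attrs-based and implement IPullProducer, so this is an illustration, not the changeset's code.

```python
# Sketch only: FakeConsumer and ReadRangeProducerSketch are illustrative
# stand-ins, not code from this changeset.
CHUNK = 65536

class FakeConsumer:
    """Plays the role of the Twisted Request: collects written bytes."""
    def __init__(self):
        self.chunks = []
        self.done = False

    def write(self, data):
        self.chunks.append(data)

    def unregisterProducer(self):
        self.done = True

class ReadRangeProducerSketch:
    """Reads [start, start + remaining) in CHUNK-sized pieces, like _ReadRangeProducer."""
    def __init__(self, consumer, read_data, start, remaining):
        self.consumer = consumer
        self.read_data = read_data
        self.start = start
        self.remaining = remaining

    def resumeProducing(self):
        to_read = min(self.remaining, CHUNK)
        data = self.read_data(self.start, to_read)
        self.consumer.write(data)
        if not data or len(data) < to_read:
            # Short or empty read: the range is exhausted, stop producing.
            self.consumer.unregisterProducer()
            return
        self.start += len(data)
        self.remaining -= len(data)

# The reactor would call resumeProducing() as the transport drains;
# a simple loop simulates that here.
blob = bytes(range(256)) * 1000
consumer = FakeConsumer()
producer = ReadRangeProducerSketch(consumer, lambda o, n: blob[o:o + n], 10, 200_000)
while not consumer.done:
    producer.resumeProducing()
assert b"".join(consumer.chunks) == blob[10:10 + 200_000]
```

Chunking the reads this way is what replaces the old read_data(offset, end - offset) call that loaded the whole requested range into memory at once (the removed TODO referencing ticket 3872).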
src/allmydata/test/test_storage_http.py (r06eca79 → r6dd2b2d)

@@ -11,5 +11,5 @@
 from pycddl import ValidationError as CDDLValidationError
 from hypothesis import assume, given, strategies as st
-from fixtures import Fixture, TempDir
+from fixtures import Fixture, TempDir, MockPatch
 from treq.testing import StubTreq
 from klein import Klein
@@ -315,4 +315,10 @@
         self.clock = Clock()
         self.tempdir = self.useFixture(TempDir())
+        self.mock = self.useFixture(
+            MockPatch(
+                "twisted.internet.task._theCooperator",
+                Cooperator(scheduler=lambda c: self.clock.callLater(0.000001, c)),
+            )
+        )
         self.storage_server = StorageServer(
             self.tempdir.path, b"\x00" * 20, clock=self.clock
@@ -325,4 +331,10 @@
             treq=self.treq,
         )
+
+    def result_of_with_flush(self, d):
+        for i in range(100):
+            self.clock.advance(0.001)
+            self.treq.flush()
+        return result_of(d)


@@ -549,5 +561,5 @@
         # We can now read:
         for offset, length in [(0, 100), (10, 19), (99, 1), (49, 200)]:
-            downloaded = result_of(
+            downloaded = self.http.result_of_with_flush(
                self.imm_client.read_share_chunk(storage_index, 1, offset, length)
            )
@@ -624,5 +636,5 @@
         # The upload of share 1 succeeded, demonstrating that second create()
         # call didn't overwrite work-in-progress.
-        downloaded = result_of(
+        downloaded = self.http.result_of_with_flush(
             self.imm_client.read_share_chunk(storage_index, 1, 0, 100)
         )
@@ -754,9 +766,13 @@
         )
         self.assertEqual(
-            result_of(self.imm_client.read_share_chunk(storage_index, 1, 0, 10)),
+            self.http.result_of_with_flush(
+                self.imm_client.read_share_chunk(storage_index, 1, 0, 10)
+            ),
             b"1" * 10,
         )
         self.assertEqual(
-            result_of(self.imm_client.read_share_chunk(storage_index, 2, 0, 10)),
+            self.http.result_of_with_flush(
+                self.imm_client.read_share_chunk(storage_index, 2, 0, 10)
+            ),
             b"2" * 10,
         )
@@ -922,5 +938,5 @@
         self.assertEqual(
             uploaded_data,
-            result_of(
+            self.http.result_of_with_flush(
                 self.imm_client.read_share_chunk(
                     storage_index,
@@ -987,6 +1003,10 @@
         """
         storage_index, _, _ = self.create_upload()
-        data0 = result_of(self.mut_client.read_share_chunk(storage_index, 0, 1, 7))
-        data1 = result_of(self.mut_client.read_share_chunk(storage_index, 1, 0, 8))
+        data0 = self.http.result_of_with_flush(
+            self.mut_client.read_share_chunk(storage_index, 0, 1, 7)
+        )
+        data1 = self.http.result_of_with_flush(
+            self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
+        )
         self.assertEqual((data0, data1), (b"bcdef-0", b"abcdef-1"))

@@ -1016,5 +1036,9 @@
         )
         # But the write did happen:
-        data0 = result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8))
-        data1 = result_of(self.mut_client.read_share_chunk(storage_index, 1, 0, 8))
+        data0 = self.http.result_of_with_flush(
+            self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
+        )
+        data1 = self.http.result_of_with_flush(
+            self.mut_client.read_share_chunk(storage_index, 1, 0, 8)
+        )
         self.assertEqual((data0, data1), (b"aXYZef-0", b"abcdef-1"))
@@ -1058,5 +1082,7 @@
         self.assertTrue(result.success)
         self.assertEqual(
-            result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8)),
+            self.http.result_of_with_flush(
+                self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
+            ),
             b"aXYZef-0",
         )
@@ -1095,5 +1121,7 @@
         # The write did not happen:
         self.assertEqual(
-            result_of(self.mut_client.read_share_chunk(storage_index, 0, 0, 8)),
+            self.http.result_of_with_flush(
+                self.mut_client.read_share_chunk(storage_index, 0, 0, 8)
+            ),
             b"abcdef-0",
         )
@@ -1195,5 +1223,5 @@
         """
         with assert_fails_with_http_code(self, http.NOT_FOUND):
-            result_of(
+            self.http.result_of_with_flush(
                 self.client.read_share_chunk(
                     b"1" * 16,
@@ -1210,5 +1238,5 @@
         storage_index, _, _ = self.upload(1)
         with assert_fails_with_http_code(self, http.NOT_FOUND):
-            result_of(
+            self.http.result_of_with_flush(
                 self.client.read_share_chunk(
                     storage_index,
@@ -1236,5 +1264,5 @@
             self, http.REQUESTED_RANGE_NOT_SATISFIABLE
         ):
-            result_of(
+            self.http.result_of_with_flush(
                 client.read_share_chunk(
                     storage_index,
@@ -1265,18 +1293,6 @@
         A read with no range returns the whole mutable/immutable.
         """
-        self.patch(
-            task,
-            "_theCooperator",
-            Cooperator(scheduler=lambda c: self.http.clock.callLater(0.000001, c)),
-        )
-
-        def result_of_with_flush(d):
-            for i in range(100):
-                self.http.clock.advance(0.001)
-                self.http.treq.flush()
-            return result_of(d)
-
         storage_index, uploaded_data, _ = self.upload(1, data_length)
-        response = result_of_with_flush(
+        response = self.http.result_of_with_flush(
             self.http.client.request(
                 "GET",
@@ -1299,5 +1315,5 @@
         headers = Headers()
         headers.setRawHeaders("range", [requested_range])
-        response = result_of(
+        response = self.http.result_of_with_flush(
             self.http.client.request(
                 "GET",
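The test-side counterpart is the fake-clock Cooperator: MockPatch swaps twisted.internet.task._theCooperator for one scheduled off the fixture's Clock, so the producers' chunk-by-chunk writes only advance when result_of_with_flush() ticks the clock and flushes StubTreq. A rough, self-contained sketch of that interaction, using only Twisted's Clock and Cooperator (the generator and chunks list below are illustrative, not from the changeset):

```python
# Sketch only: shows why work driven by a Clock-backed Cooperator does not
# run until the test advances the clock.
from twisted.internet.task import Clock, Cooperator

clock = Clock()
cooperator = Cooperator(scheduler=lambda call: clock.callLater(0.000001, call))

chunks = []

def produce():
    # Each yield is one unit of cooperative work, e.g. one chunk written
    # by a producer.
    for i in range(3):
        chunks.append(i)
        yield

cooperator.cooperate(produce())

assert chunks == []  # nothing happens until the clock moves
for _ in range(100):
    clock.advance(0.001)  # what result_of_with_flush does with self.clock
assert chunks == [0, 1, 2]
```

Moving this patching into the shared HTTP test fixture is what lets the per-test self.patch(task, "_theCooperator", ...) block and the local result_of_with_flush helper in test_read_with_no_range be deleted, and lets every read_share_chunk call switch from result_of to the fixture's result_of_with_flush.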