Changeset f6f9a97 in trunk
- Timestamp:
- 2010-08-05T18:55:07Z (15 years ago)
- Branches:
- master
- Children:
- a0124e9
- Parents:
- 43c50321
- git-author:
- Brian Warner <warner@…> (2010-08-05 18:45:49)
- git-committer:
- Brian Warner <warner@…> (2010-08-05 18:55:07)
- Location:
- src/allmydata
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified src/allmydata/immutable/downloader/node.py ¶
r43c50321 rf6f9a97 21 21 def __init__(self, f): 22 22 self._f = f 23 self. cancelled = False23 self.active = True 24 24 def cancel(self): 25 if not self.cancelled:26 self. cancelled = True25 if self.active: 26 self.active = False 27 27 self._f(self) 28 28 … … 361 361 def fetch_failed(self, sf, f): 362 362 assert sf is self._active_segment 363 self._active_segment = None364 363 # deliver error upwards 365 364 for (d,c) in self._extract_requests(sf.segnum): 366 365 eventually(self._deliver, d, c, f) 366 self._active_segment = None 367 self._start_new_segment() 367 368 368 369 def process_blocks(self, segnum, blocks): … … 450 451 # this method exists to handle cancel() that occurs between 451 452 # _got_segment and _deliver 452 if not c.cancelled: 453 if c.active: 454 c.active = False # it is now too late to cancel 453 455 d.callback(result) # might actually be an errback 454 456 … … 466 468 if t[2] != c] 467 469 segnums = [segnum for (segnum,d,c) in self._segment_requests] 468 if self._active_segment.segnum not in segnums: 470 # self._active_segment might be None in rare circumstances, so make 471 # sure we tolerate it 472 if self._active_segment and self._active_segment.segnum not in segnums: 469 473 self._active_segment.stop() 470 474 self._active_segment = None -
TabularUnified src/allmydata/test/test_download.py ¶
r43c50321 rf6f9a97 487 487 return d 488 488 489 def test_simultaneous_onefails_onecancelled(self): 490 # This exercises an mplayer behavior in ticket #1154. I believe that 491 # mplayer made two simultaneous webapi GET requests: first one for an 492 # index region at the end of the (mp3/video) file, then one for the 493 # first block of the file (the order doesn't really matter). All GETs 494 # failed (NoSharesError) because of the type(__len__)==long bug. Each 495 # GET submitted a DownloadNode.get_segment() request, which was 496 # queued by the DN (DN._segment_requests), so the second one was 497 # blocked waiting on the first one. When the first one failed, 498 # DN.fetch_failed() was invoked, which errbacks the first GET, but 499 # left the other one hanging (the lost-progress bug mentioned in 500 # #1154 comment 10) 501 # 502 # Then mplayer sees that the index region GET failed, so it cancels 503 # the first-block GET (by closing the HTTP request), triggering 504 # stopProducer. The second GET was waiting in the Deferred (between 505 # n.get_segment() and self._request_retired), so its 506 # _cancel_segment_request was active, so was invoked. However, 507 # DN._active_segment was None since it was not working on any segment 508 # at that time, hence the error in #1154. 509 510 self.basedir = self.mktemp() 511 self.set_up_grid() 512 self.c0 = self.g.clients[0] 513 514 # upload a file with multiple segments, so we can catch the download 515 # in the middle. Tell the downloader, so it can guess correctly. 516 u = upload.Data(plaintext, None) 517 u.max_segment_size = 70 # 5 segs 518 d = self.c0.upload(u) 519 def _uploaded(ur): 520 # corrupt all the shares so the download will fail 521 def _corruptor(s, debug=False): 522 which = 48 # first byte of block0 523 return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] 524 self.corrupt_all_shares(ur.uri, _corruptor) 525 n = self.c0.create_node_from_uri(ur.uri) 526 n._cnode._maybe_create_download_node() 527 n._cnode._node._build_guessed_tables(u.max_segment_size) 528 con1 = MemoryConsumer() 529 con2 = MemoryConsumer() 530 d = n.read(con1, 0L, 20) 531 d2 = n.read(con2, 140L, 20) 532 # con2 will be cancelled, so d2 should fail with DownloadStopped 533 def _con2_should_not_succeed(res): 534 self.fail("the second read should not have succeeded") 535 def _con2_failed(f): 536 self.failUnless(f.check(DownloadStopped)) 537 d2.addCallbacks(_con2_should_not_succeed, _con2_failed) 538 539 def _con1_should_not_succeed(res): 540 self.fail("the first read should not have succeeded") 541 def _con1_failed(f): 542 self.failUnless(f.check(NotEnoughSharesError)) 543 con2.producer.stopProducing() 544 return d2 545 d.addCallbacks(_con1_should_not_succeed, _con1_failed) 546 return d 547 d.addCallback(_uploaded) 548 return d 549 550 def test_simultaneous_onefails(self): 551 self.basedir = self.mktemp() 552 self.set_up_grid() 553 self.c0 = self.g.clients[0] 554 555 # upload a file with multiple segments, so we can catch the download 556 # in the middle. Tell the downloader, so it can guess correctly. 557 u = upload.Data(plaintext, None) 558 u.max_segment_size = 70 # 5 segs 559 d = self.c0.upload(u) 560 def _uploaded(ur): 561 # corrupt all the shares so the download will fail 562 def _corruptor(s, debug=False): 563 which = 48 # first byte of block0 564 return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] 565 self.corrupt_all_shares(ur.uri, _corruptor) 566 n = self.c0.create_node_from_uri(ur.uri) 567 n._cnode._maybe_create_download_node() 568 n._cnode._node._build_guessed_tables(u.max_segment_size) 569 con1 = MemoryConsumer() 570 con2 = MemoryConsumer() 571 d = n.read(con1, 0L, 20) 572 d2 = n.read(con2, 140L, 20) 573 # con2 should wait for con1 to fail and then con2 should succeed. 574 # In particular, we should not lose progress. If this test fails, 575 # it will fail with a timeout error. 576 def _con2_should_succeed(res): 577 # this should succeed because we only corrupted the first 578 # segment of each share. The segment that holds [140:160] is 579 # fine, as are the hash chains and UEB. 580 self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) 581 d2.addCallback(_con2_should_succeed) 582 583 def _con1_should_not_succeed(res): 584 self.fail("the first read should not have succeeded") 585 def _con1_failed(f): 586 self.failUnless(f.check(NotEnoughSharesError)) 587 # we *don't* cancel the second one here: this exercises a 588 # lost-progress bug from #1154. We just wait for it to 589 # succeed. 590 return d2 591 d.addCallbacks(_con1_should_not_succeed, _con1_failed) 592 return d 593 d.addCallback(_uploaded) 594 return d 595 489 596 def test_download_no_overrun(self): 490 597 self.basedir = self.mktemp() … … 600 707 601 708 def test_stop(self): 602 # use a download target that does an immediate stop (ticket#473)709 # use a download target that stops after the first segment (#473) 603 710 self.basedir = self.mktemp() 604 711 self.set_up_grid() … … 610 717 "our Consumer called stopProducing()", 611 718 n.read, c) 719 return d 720 721 def test_stop_immediately(self): 722 # and a target that stops right after registerProducer (maybe #1154) 723 self.basedir = self.mktemp() 724 self.set_up_grid() 725 self.c0 = self.g.clients[0] 726 self.load_shares() 727 n = self.c0.create_node_from_uri(immutable_uri) 728 729 c = ImmediatelyStoppingConsumer() # stops after registerProducer 730 d = self.shouldFail(DownloadStopped, "test_stop_immediately", 731 "our Consumer called stopProducing()", 732 n.read, c) 733 return d 734 735 def test_stop_immediately2(self): 736 # and a target that stops right after registerProducer (maybe #1154) 737 self.basedir = self.mktemp() 738 self.set_up_grid() 739 self.c0 = self.g.clients[0] 740 self.load_shares() 741 n = self.c0.create_node_from_uri(immutable_uri) 742 743 c = MemoryConsumer() 744 d0 = n.read(c) 745 c.producer.stopProducing() 746 d = self.shouldFail(DownloadStopped, "test_stop_immediately", 747 "our Consumer called stopProducing()", 748 lambda: d0) 612 749 return d 613 750 … … 775 912 class StoppingConsumer(PausingConsumer): 776 913 def write(self, data): 914 self.producer.stopProducing() 915 916 class ImmediatelyStoppingConsumer(MemoryConsumer): 917 def registerProducer(self, p, streaming): 918 MemoryConsumer.registerProducer(self, p, streaming) 777 919 self.producer.stopProducing() 778 920
Note: See TracChangeset
for help on using the changeset viewer.