Changeset f6f9a97 in trunk


Timestamp:
2010-08-05T18:55:07Z
Author:
Brian Warner <warner@…>
Branches:
master
Children:
a0124e9
Parents:
43c50321
git-author:
Brian Warner <warner@…> (2010-08-05 18:45:49)
git-committer:
Brian Warner <warner@…> (2010-08-05 18:55:07)
Message:

DownloadNode: fix lost-progress in fetch_failed, tolerate cancel when no segment-fetch is active. Fixes #1154.

The lost-progress bug occurred when two simultaneous read() calls fetched
different segments, and the first one failed (due to corruption, or the other
bugs in #1154): the second read() would never complete. While in this state,
cancelling the second read by having its consumer call stopProducing() would
trigger the cancel-intolerance bug. Finally, in downloader.node.Cancel,
prevent late cancels by adding an 'active' flag.
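
In outline, the fetch_failed() change below makes the failure path errback only
the requests waiting on the failed segment, then retire that fetch and
immediately start the next queued one. The following toy sketch of that
queueing pattern (ToyNode and start_fetch are hypothetical stand-ins, not the
real DownloadNode API) shows why skipping the restart left the second read()
hanging:

    # Toy model only: illustrates the queue/restart pattern, not allmydata's
    # real classes.
    class ToyNode:
        def __init__(self, start_fetch):
            self._start_fetch = start_fetch    # callable(segnum) -> fetch handle
            self._segment_requests = []        # queued (segnum, callback, errback)
            self._active_segment = None

        def get_segment(self, segnum, callback, errback):
            self._segment_requests.append((segnum, callback, errback))
            self._start_new_segment()

        def _start_new_segment(self):
            if self._active_segment is None and self._segment_requests:
                segnum = self._segment_requests[0][0]
                self._active_segment = self._start_fetch(segnum)

        def fetch_failed(self, segnum, failure):
            # deliver the error to the requests waiting on this segment...
            failed = [t for t in self._segment_requests if t[0] == segnum]
            self._segment_requests = [t for t in self._segment_requests
                                      if t[0] != segnum]
            for (_, _, errback) in failed:
                errback(failure)
            # ...then retire the fetch and start the next queued segment. The
            # pre-patch code never restarted here, so a read() waiting on a
            # different segment stalled forever: the lost-progress bug.
            self._active_segment = None
            self._start_new_segment()

    # two simultaneous reads for different segments; the first fetch fails
    node = ToyNode(start_fetch=lambda segnum: "fetch-%d" % segnum)
    node.get_segment(0, callback=None, errback=lambda f: None)
    node.get_segment(2, callback=None, errback=lambda f: None)
    node.fetch_failed(0, failure="corrupt share")
    assert node._active_segment == "fetch-2"   # the second read keeps going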

Location:
src/allmydata
Files:
2 edited

Legend:

  Lines prefixed with '+' were added, lines prefixed with '-' were removed; other lines are unchanged context.
  • src/allmydata/immutable/downloader/node.py

--- src/allmydata/immutable/downloader/node.py (r43c50321)
+++ src/allmydata/immutable/downloader/node.py (rf6f9a97)
@@ -21,8 +21,8 @@
     def __init__(self, f):
         self._f = f
-        self.cancelled = False
+        self.active = True
     def cancel(self):
-        if not self.cancelled:
-            self.cancelled = True
+        if self.active:
+            self.active = False
             self._f(self)
 
@@ -361,8 +361,9 @@
     def fetch_failed(self, sf, f):
         assert sf is self._active_segment
-        self._active_segment = None
         # deliver error upwards
         for (d,c) in self._extract_requests(sf.segnum):
             eventually(self._deliver, d, c, f)
+        self._active_segment = None
+        self._start_new_segment()
 
     def process_blocks(self, segnum, blocks):
@@ -450,5 +451,6 @@
         # this method exists to handle cancel() that occurs between
         # _got_segment and _deliver
-        if not c.cancelled:
+        if c.active:
+            c.active = False # it is now too late to cancel
             d.callback(result) # might actually be an errback
 
@@ -466,5 +468,7 @@
                                   if t[2] != c]
         segnums = [segnum for (segnum,d,c) in self._segment_requests]
-        if self._active_segment.segnum not in segnums:
+        # self._active_segment might be None in rare circumstances, so make
+        # sure we tolerate it
+        if self._active_segment and self._active_segment.segnum not in segnums:
             self._active_segment.stop()
             self._active_segment = None
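
For reference, the Cancel helper as it reads after this patch is a one-shot
latch: cancel() invokes its callback at most once, and code such as _deliver()
can clear the 'active' flag first so that a late cancel() becomes a no-op. A
standalone copy with a small demo (the fired-list callback is hypothetical,
used only for illustration):

    class Cancel:
        def __init__(self, f):
            self._f = f
            self.active = True
        def cancel(self):
            if self.active:
                self.active = False
                self._f(self)

    # demo: once something else has claimed the request (active=False), a
    # late cancel() is silently ignored instead of firing the callback
    fired = []
    c = Cancel(fired.append)
    c.active = False     # what _deliver() now does before firing the Deferred
    c.cancel()           # too late: self._f is not invoked
    assert fired == []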
  • src/allmydata/test/test_download.py

--- src/allmydata/test/test_download.py (r43c50321)
+++ src/allmydata/test/test_download.py (rf6f9a97)
@@ -487,4 +487,111 @@
         return d
 
+    def test_simultaneous_onefails_onecancelled(self):
+        # This exercises an mplayer behavior in ticket #1154. I believe that
+        # mplayer made two simultaneous webapi GET requests: first one for an
+        # index region at the end of the (mp3/video) file, then one for the
+        # first block of the file (the order doesn't really matter). All GETs
+        # failed (NoSharesError) because of the type(__len__)==long bug. Each
+        # GET submitted a DownloadNode.get_segment() request, which was
+        # queued by the DN (DN._segment_requests), so the second one was
+        # blocked waiting on the first one. When the first one failed,
+        # DN.fetch_failed() was invoked, which errbacks the first GET, but
+        # left the other one hanging (the lost-progress bug mentioned in
+        # #1154 comment 10)
+        #
+        # Then mplayer sees that the index region GET failed, so it cancels
+        # the first-block GET (by closing the HTTP request), triggering
+        # stopProducer. The second GET was waiting in the Deferred (between
+        # n.get_segment() and self._request_retired), so its
+        # _cancel_segment_request was active, so was invoked. However,
+        # DN._active_segment was None since it was not working on any segment
+        # at that time, hence the error in #1154.
+
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, so we can catch the download
+        # in the middle. Tell the downloader, so it can guess correctly.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            # corrupt all the shares so the download will fail
+            def _corruptor(s, debug=False):
+                which = 48 # first byte of block0
+                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+            self.corrupt_all_shares(ur.uri, _corruptor)
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            con1 = MemoryConsumer()
+            con2 = MemoryConsumer()
+            d = n.read(con1, 0L, 20)
+            d2 = n.read(con2, 140L, 20)
+            # con2 will be cancelled, so d2 should fail with DownloadStopped
+            def _con2_should_not_succeed(res):
+                self.fail("the second read should not have succeeded")
+            def _con2_failed(f):
+                self.failUnless(f.check(DownloadStopped))
+            d2.addCallbacks(_con2_should_not_succeed, _con2_failed)
+
+            def _con1_should_not_succeed(res):
+                self.fail("the first read should not have succeeded")
+            def _con1_failed(f):
+                self.failUnless(f.check(NotEnoughSharesError))
+                con2.producer.stopProducing()
+                return d2
+            d.addCallbacks(_con1_should_not_succeed, _con1_failed)
+            return d
+        d.addCallback(_uploaded)
+        return d
+
+    def test_simultaneous_onefails(self):
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+
+        # upload a file with multiple segments, so we can catch the download
+        # in the middle. Tell the downloader, so it can guess correctly.
+        u = upload.Data(plaintext, None)
+        u.max_segment_size = 70 # 5 segs
+        d = self.c0.upload(u)
+        def _uploaded(ur):
+            # corrupt all the shares so the download will fail
+            def _corruptor(s, debug=False):
+                which = 48 # first byte of block0
+                return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
+            self.corrupt_all_shares(ur.uri, _corruptor)
+            n = self.c0.create_node_from_uri(ur.uri)
+            n._cnode._maybe_create_download_node()
+            n._cnode._node._build_guessed_tables(u.max_segment_size)
+            con1 = MemoryConsumer()
+            con2 = MemoryConsumer()
+            d = n.read(con1, 0L, 20)
+            d2 = n.read(con2, 140L, 20)
+            # con2 should wait for con1 to fail and then con2 should succeed.
+            # In particular, we should not lose progress. If this test fails,
+            # it will fail with a timeout error.
+            def _con2_should_succeed(res):
+                # this should succeed because we only corrupted the first
+                # segment of each share. The segment that holds [140:160] is
+                # fine, as are the hash chains and UEB.
+                self.failUnlessEqual("".join(con2.chunks), plaintext[140:160])
+            d2.addCallback(_con2_should_succeed)
+
+            def _con1_should_not_succeed(res):
+                self.fail("the first read should not have succeeded")
+            def _con1_failed(f):
+                self.failUnless(f.check(NotEnoughSharesError))
+                # we *don't* cancel the second one here: this exercises a
+                # lost-progress bug from #1154. We just wait for it to
+                # succeed.
+                return d2
+            d.addCallbacks(_con1_should_not_succeed, _con1_failed)
+            return d
+        d.addCallback(_uploaded)
+        return d
+
     def test_download_no_overrun(self):
         self.basedir = self.mktemp()
@@ -600,5 +707,5 @@
 
     def test_stop(self):
-        # use a download targetthat does an immediate stop (ticket #473)
+        # use a download target that stops after the first segment (#473)
         self.basedir = self.mktemp()
         self.set_up_grid()
@@ -610,4 +717,34 @@
                             "our Consumer called stopProducing()",
                             n.read, c)
+        return d
+
+    def test_stop_immediately(self):
+        # and a target that stops right after registerProducer (maybe #1154)
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+
+        c = ImmediatelyStoppingConsumer() # stops after registerProducer
+        d = self.shouldFail(DownloadStopped, "test_stop_immediately",
+                            "our Consumer called stopProducing()",
+                            n.read, c)
+        return d
+
+    def test_stop_immediately2(self):
+        # and a target that stops right after registerProducer (maybe #1154)
+        self.basedir = self.mktemp()
+        self.set_up_grid()
+        self.c0 = self.g.clients[0]
+        self.load_shares()
+        n = self.c0.create_node_from_uri(immutable_uri)
+
+        c = MemoryConsumer()
+        d0 = n.read(c)
+        c.producer.stopProducing()
+        d = self.shouldFail(DownloadStopped, "test_stop_immediately",
+                            "our Consumer called stopProducing()",
+                            lambda: d0)
         return d
 
@@ -775,4 +912,9 @@
 class StoppingConsumer(PausingConsumer):
     def write(self, data):
+        self.producer.stopProducing()
+
+class ImmediatelyStoppingConsumer(MemoryConsumer):
+    def registerProducer(self, p, streaming):
+        MemoryConsumer.registerProducer(self, p, streaming)
         self.producer.stopProducing()
 