Changeset 3e4342ec in trunk


Ignore:
Timestamp:
2010-02-01T06:16:10Z (15 years ago)
Author:
Zooko O'Whielacronx <zooko@…>
Branches:
master
Children:
57e3af1
Parents:
e4e2599
Message:

immutable: downloader accepts notifications of buckets even if those notifications arrive after he has begun downloading shares.
This can be useful if one of the shares that he has already begun downloading fails. See #287 for discussion. This fixes the part of #287 that was a regression caused by #928: namely, fail-over when a share is corrupted (or the server returns an error or disconnects). It does not fix the related issue mentioned in #287, in which a server hangs and doesn't reply to requests for blocks.

Location:
src/allmydata
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • TabularUnified src/allmydata/immutable/download.py

    re4e2599 r3e4342ec  
    790790        self.active_buckets = {} # k: shnum, v: bucket
    791791        self._share_buckets = {} # k: sharenum, v: list of buckets
    792         self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
     792
     793        # _download_all_segments() will set this to:
     794        # self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
     795        self._share_vbuckets = None
    793796
    794797        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
     
    809812        # self._responses_received = 0
    810813        # self._queries_failed = 0
     814
     815        # This is solely for the use of unit tests. It will be triggered when
     816        # we start downloading shares.
     817        self._stage_4_d = defer.Deferred()
    811818
    812819    def pauseProducing(self):
     
    939946                self._wait_for_enough_buckets_d = None
    940947
     948            if self._share_vbuckets is not None:
     949                vbucket = ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
     950                self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
     951
    941952            if self._results:
    942953                if peerid not in self._results.servermap:
     
    10891100
    10901101    def _download_all_segments(self, res):
     1102        # From now on if new buckets are received then I will notice that
     1103        # self._share_vbuckets is not None and generate a vbucket for that new
     1104        # bucket and add it in to _share_vbuckets. (We had to wait because we
     1105        # didn't have self._vup and self._share_hash_tree earlier. We didn't
     1106        # need validated buckets until now -- now that we are ready to download
     1107        # shares.)
     1108        self._share_vbuckets = {}
    10911109        for sharenum, buckets in self._share_buckets.iteritems():
    10921110            for bucket in buckets:
     
    11101128            # happening until the consumer is ready for more data.
    11111129            d.addCallback(self._check_for_pause)
     1130
     1131        self._stage_4_d.callback(None)
    11121132        return d
    11131133
  • TabularUnified src/allmydata/test/no_network.py

    re4e2599 r3e4342ec  
    253253    def break_server(self, serverid):
    254254        # mark the given server as broken, so it will throw exceptions when
    255         # asked to hold a share
     255        # asked to hold a share or serve a share
    256256        self.servers_by_id[serverid].broken = True
    257257
    258     def hang_server(self, serverid, until=defer.Deferred()):
    259         # hang the given server until 'until' fires
    260         self.servers_by_id[serverid].hung_until = until
     258    def hang_server(self, serverid):
     259        # hang the given server
     260        ss = self.servers_by_id[serverid]
     261        assert ss.hung_until is None
     262        ss.hung_until = defer.Deferred()
     263
     264    def unhang_server(self, serverid):
     265        # unhang the given server
     266        ss = self.servers_by_id[serverid]
     267        assert ss.hung_until is not None
     268        ss.hung_until.callback(None)
     269        ss.hung_until = None
    261270
    262271
  • TabularUnified src/allmydata/test/test_hung_server.py

    re4e2599 r3e4342ec  
    22import os, shutil
    33from twisted.trial import unittest
    4 from twisted.internet import defer, reactor
     4from twisted.internet import defer
    55from allmydata import uri
    66from allmydata.util.consumer import download_to_data
     
    99from allmydata.storage.common import storage_index_to_dir
    1010from allmydata.test.no_network import GridTestMixin
    11 from allmydata.test.common import ShouldFailMixin
     11from allmydata.test.common import ShouldFailMixin, _corrupt_share_data
    1212from allmydata.interfaces import NotEnoughSharesError
    1313
     
    2525        for (id, ss) in servers:
    2626            self.g.hang_server(id, **kwargs)
     27
     28    def _unhang(self, servers, **kwargs):
     29        for (id, ss) in servers:
     30            self.g.unhang_server(id, **kwargs)
    2731
    2832    def _delete_all_shares_from(self, servers):
     
    3135            if i_serverid in serverids:
    3236                os.unlink(i_sharefile)
     37
     38    def _corrupt_all_shares_in(self, servers, corruptor_func):
     39        serverids = [id for (id, ss) in servers]
     40        for (i_shnum, i_serverid, i_sharefile) in self.shares:
     41            if i_serverid in serverids:
     42                self._corrupt_share((i_shnum, i_sharefile), corruptor_func)
    3343
    3444    def _copy_all_shares_from(self, from_servers, to_server):
     
    5363                         in self.shares)
    5464
     65    def _corrupt_share(self, share, corruptor_func):
     66        (sharenum, sharefile) = share
     67        data = open(sharefile, "rb").read()
     68        newdata = corruptor_func(data)
     69        os.unlink(sharefile)
     70        wf = open(sharefile, "wb")
     71        wf.write(newdata)
     72        wf.close()
     73
    5574    def _set_up(self, mutable, testdir, num_clients=1, num_servers=10):
    5675        self.mutable = mutable
     
    81100        return d
    82101
    83     def _check_download(self):
     102    def _start_download(self):
    84103        n = self.c0.create_node_from_uri(self.uri)
    85104        if self.mutable:
    86105            d = n.download_best_version()
    87             expected_plaintext = mutable_plaintext
     106            stage_4_d = None # currently we aren't doing any tests which require this for mutable files
    88107        else:
    89108            d = download_to_data(n)
    90             expected_plaintext = immutable_plaintext
    91         def _got_data(data):
    92             self.failUnlessEqual(data, expected_plaintext)
    93         d.addCallback(_got_data)
     109            stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME
     110        return (d, stage_4_d,)
     111
     112    def _wait_for_data(self, n):
     113        if self.mutable:
     114            d = n.download_best_version()
     115        else:
     116            d = download_to_data(n)
     117        return d
     118
     119    def _check(self, resultingdata):
     120        if self.mutable:
     121            self.failUnlessEqual(resultingdata, mutable_plaintext)
     122        else:
     123            self.failUnlessEqual(resultingdata, immutable_plaintext)
     124
     125    def _download_and_check(self):
     126        d, stage4d = self._start_download()
     127        d.addCallback(self._check)
    94128        return d
    95129
     
    98132            return self.shouldFail(UnrecoverableFileError, self.basedir,
    99133                                   "no recoverable versions",
    100                                    self._check_download)
     134                                   self._download_and_check)
    101135        else:
    102136            return self.shouldFail(NotEnoughSharesError, self.basedir,
    103137                                   "Failed to get enough shareholders",
    104                                    self._check_download)
     138                                   self._download_and_check)
    105139
    106140
     
    109143        for mutable in [False, True]:
    110144            d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_sanity_check"))
    111             d.addCallback(lambda ign: self._check_download())
     145            d.addCallback(lambda ign: self._download_and_check())
    112146        return d
    113147
     
    117151            d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_copied_share"))
    118152            d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
    119             d.addCallback(lambda ign: self._check_download())
     153            d.addCallback(lambda ign: self._download_and_check())
    120154            return d
    121155
     
    125159            d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_noshares"))
    126160            d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[3:]))
    127             d.addCallback(lambda ign: self._check_download())
     161            d.addCallback(lambda ign: self._download_and_check())
    128162        return d
    129163
     
    150184            d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
    151185            d.addCallback(lambda ign: self._break(self.servers[2:]))
    152             d.addCallback(lambda ign: self._check_download())
     186            d.addCallback(lambda ign: self._download_and_check())
    153187        return d
    154188
     
    169203            d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_hung"))
    170204            d.addCallback(lambda ign: self._hang(self.servers[3:]))
    171             d.addCallback(lambda ign: self._check_download())
     205            d.addCallback(lambda ign: self._download_and_check())
    172206        return d
    173207
     
    175209        d = defer.succeed(None)
    176210        for mutable in [False]:
    177             recovered = defer.Deferred()
    178211            d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers"))
    179             d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
    180             d.addCallback(lambda ign: self._hang(self.servers[3:]))
    181             d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None))
    182             d.addCallback(lambda ign: self._check_download())
     212            d.addCallback(lambda ign: self._hang(self.servers[2:3]))
     213            d.addCallback(lambda ign: self._hang(self.servers[3:]))
     214            d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
     215            d.addCallback(lambda ign: self._download_and_check())
    183216        return d
    184217
     
    186219        d = defer.succeed(None)
    187220        for mutable in [False]:
    188             recovered = defer.Deferred()
    189221            d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers_with_2_shares"))
    190222            d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
    191             d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
    192             d.addCallback(lambda ign: self._hang(self.servers[3:]))
    193             d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None))
    194             d.addCallback(lambda ign: self._check_download())
    195         return d
     223            d.addCallback(lambda ign: self._hang(self.servers[2:3]))
     224            d.addCallback(lambda ign: self._hang(self.servers[3:]))
     225            d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
     226            d.addCallback(lambda ign: self._download_and_check())
     227        return d
     228
     229    def test_failover_during_stage_4(self):
     230        # See #287
     231        d = defer.succeed(None)
     232        for mutable in [False]:
     233            d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4"))
     234            d.addCallback(lambda ign: self._corrupt_all_shares_in(self.servers[2:3], _corrupt_share_data))
     235            d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4"))
     236            d.addCallback(lambda ign: self._hang(self.servers[3:]))
     237            d.addCallback(lambda ign: self._start_download())
     238            def _after_starting_download((doned, started4d)):
     239                started4d.addCallback(lambda ign: self._unhang(self.servers[3:4]))
     240                doned.addCallback(self._check)
     241                return doned
     242            d.addCallback(_after_starting_download)
     243
     244        return d
Note: See TracChangeset for help on using the changeset viewer.