Changeset 2bd9dfa in trunk


Timestamp: 2010-01-27T23:34:17Z (15 years ago)
Author: Zooko O'Whielacronx <zooko@…>
Branches: master
Children: baa11a0a
Parents: 14280b00
Message:

immutable: download from the first servers which provide at least K buckets instead of waiting for all servers to reply
This should put an end to the phenomenon I've been seeing in which a single hung server can cause all downloads on a grid to hang. It should also speed up all downloads by (a) not waiting for responses to queries it doesn't need, and (b) downloading shares from the servers that answered the initial query the fastest.
Also, when deciding whether the download has enough shares, do not count how many buckets you've gotten -- count how many *unique* share numbers you have buckets for. This fixes a slightly weird behavior in the current download code in which receiving >= K buckets that all held the same share number would make it think it had enough to download the file when in fact it didn't.
This patch needs tests before it is actually ready for trunk.
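
To illustrate the mechanism, here is a minimal sketch assuming Twisted; ShareFinder and its method names are hypothetical, not the real classes in allmydata.immutable.download. The idea is to fire a Deferred as soon as K unique share numbers are known, or once every query has resolved:

    # Minimal sketch of "first K responders win", not the actual patch.
    from twisted.internet import defer

    class ShareFinder:
        def __init__(self, needed_shares):
            self._needed_shares = needed_shares   # K
            self._share_buckets = {}              # k: sharenum, v: list of buckets
            self._queries_sent = 0
            self._queries_resolved = 0            # responses + failures
            self._done_d = defer.Deferred()

        def send_query(self, server, storage_index):
            self._queries_sent += 1
            d = server.callRemote("get_buckets", storage_index)
            d.addCallbacks(self._got_buckets, self._query_failed)

        def _got_buckets(self, buckets):          # buckets: sharenum -> bucket
            self._queries_resolved += 1
            for sharenum, bucket in buckets.items():
                self._share_buckets.setdefault(sharenum, []).append(bucket)
            self._check_done()

        def _query_failed(self, f):
            self._queries_resolved += 1
            self._check_done()

        def _check_done(self):
            if self._done_d is None:
                return                            # already fired
            # Count unique share numbers, not buckets: two buckets holding
            # the same share are no better than one.
            if len(self._share_buckets) >= self._needed_shares:
                d, self._done_d = self._done_d, None
                d.callback(True)                  # enough shares; start now
            elif self._queries_resolved == self._queries_sent:
                d, self._done_d = self._done_d, None
                d.callback(False)                 # all resolved; still short

With this shape, a single hung server can delay the download only when its shares are actually needed to reach K; otherwise the Deferred has already fired and the hung server's eventual answer is ignored.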

File: 1 edited

  • src/allmydata/immutable/download.py

--- src/allmydata/immutable/download.py (r14280b00)
+++ src/allmydata/immutable/download.py (r2bd9dfa)
@@ -789,5 +789,5 @@
 
         self.active_buckets = {} # k: shnum, v: bucket
-        self._share_buckets = [] # list of (sharenum, bucket) tuples
+        self._share_buckets = {} # k: sharenum, v: list of buckets
         self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
 
@@ -870,5 +870,16 @@
 
     def _get_all_shareholders(self):
-        dl = []
+        """ Once the number of buckets that I know about is >= K then I
+        callback the Deferred that I return.
+
+        If all of the get_buckets deferreds have fired (whether callback or
+        errback) and I still don't have enough buckets then I'll callback the
+        Deferred that I return.
+        """
+        self._wait_for_enough_buckets_d = defer.Deferred()
+
+        self._queries_sent = 0
+        self._responses_received = 0
+        self._queries_failed = 0
         sb = self._storage_broker
         servers = sb.get_servers_for_index(self._storage_index)
@@ -879,15 +890,13 @@
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, umid="rT03hg")
+            self._queries_sent += 1
             d = ss.callRemote("get_buckets", self._storage_index)
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(peerid,))
-            dl.append(d)
-        self._responses_received = 0
-        self._queries_sent = len(dl)
         if self._status:
             self._status.set_status("Locating Shares (%d/%d)" %
                                     (self._responses_received,
                                      self._queries_sent))
-        return defer.DeferredList(dl)
+        return self._wait_for_enough_buckets_d
 
     def _got_response(self, buckets, peerid):
@@ -907,4 +916,17 @@
             b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
             self.add_share_bucket(sharenum, b)
+            # If we just got enough buckets for the first time, then fire the
+            # deferred. Then remove it from self so that we don't fire it
+            # again.
+            if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares:
+                self._wait_for_enough_buckets_d.callback(True)
+                self._wait_for_enough_buckets_d = None
+
+            # Else, if we ran out of outstanding requests then fire it and
+            # remove it from self.
+            assert (self._responses_received+self._queries_failed) <= self._queries_sent
+            if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent:
+                self._wait_for_enough_buckets_d.callback(False)
+                self._wait_for_enough_buckets_d = None
 
             if self._results:
@@ -915,5 +937,5 @@
     def add_share_bucket(self, sharenum, bucket):
         # this is split out for the benefit of test_encode.py
-        self._share_buckets.append( (sharenum, bucket) )
+        self._share_buckets.setdefault(sharenum, []).append(bucket)
 
     def _got_error(self, f):
@@ -923,4 +945,11 @@
         self.log("Error during get_buckets", failure=f, level=level,
                  umid="3uuBUQ")
+        # If we ran out of outstanding requests then errback it and remove it
+        # from self. (Failed queries count toward the total, matching the
+        # assert and the equivalent check in _got_response.)
+        self._queries_failed += 1
+        assert (self._responses_received+self._queries_failed) <= self._queries_sent
+        if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent:
+            self._wait_for_enough_buckets_d.errback(f)
+            self._wait_for_enough_buckets_d = None
 
     def bucket_failed(self, vbucket):
@@ -965,6 +994,7 @@
 
         vups = []
-        for sharenum, bucket in self._share_buckets:
-            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
+        for sharenum, buckets in self._share_buckets.iteritems():
+            for bucket in buckets:
+                vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
         vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
         d = vto.start()
@@ -1002,7 +1032,8 @@
     def _get_crypttext_hash_tree(self, res):
         vchtps = []
-        for sharenum, bucket in self._share_buckets:
-            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
-            vchtps.append(vchtp)
+        for sharenum, buckets in self._share_buckets.iteritems():
+            for bucket in buckets:
+                vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
+                vchtps.append(vchtp)
 
         _get_crypttext_hash_tree_started = time.time()
@@ -1055,7 +1086,8 @@
 
     def _download_all_segments(self, res):
-        for sharenum, bucket in self._share_buckets:
-            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
-            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
+        for sharenum, buckets in self._share_buckets.iteritems():
+            for bucket in buckets:
+                vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
+                self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
 
         # after the above code, self._share_vbuckets contains enough
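
As a usage sketch of the contract documented in the _get_all_shareholders docstring: the returned Deferred fires True as soon as K unique shares are located and False once every query has resolved without reaching K. A hypothetical consumer might look like the following; _start_download and NotEnoughSharesError are illustrative names, not code from this changeset:

    # Hypothetical consumer: True means K unique shares were located and
    # the download can proceed without waiting for the remaining servers.
    d = self._get_all_shareholders()
    def _located_shares(enough):
        if not enough:
            # every query resolved, but fewer than K unique shares found
            raise NotEnoughSharesError("found %d unique shares, need %d" %
                                       (len(self._share_buckets),
                                        self._verifycap.needed_shares))
        return self._start_download()
    d.addCallback(_located_shares)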