Changeset 2bd9dfa in trunk
- Timestamp:
- 2010-01-27T23:34:17Z (15 years ago)
- Branches:
- master
- Children:
- baa11a0a
- Parents:
- 14280b00
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified src/allmydata/immutable/download.py ¶
r14280b00 r2bd9dfa 789 789 790 790 self.active_buckets = {} # k: shnum, v: bucket 791 self._share_buckets = [] # list of (sharenum, bucket) tuples791 self._share_buckets = {} # k: sharenum, v: list of buckets 792 792 self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets 793 793 … … 870 870 871 871 def _get_all_shareholders(self): 872 dl = [] 872 """ Once the number of buckets that I know about is >= K then I 873 callback the Deferred that I return. 874 875 If all of the get_buckets deferreds have fired (whether callback or 876 errback) and I still don't have enough buckets then I'll callback the 877 Deferred that I return. 878 """ 879 self._wait_for_enough_buckets_d = defer.Deferred() 880 881 self._queries_sent = 0 882 self._responses_received = 0 883 self._queries_failed = 0 873 884 sb = self._storage_broker 874 885 servers = sb.get_servers_for_index(self._storage_index) … … 879 890 peerid=idlib.shortnodeid_b2a(peerid), 880 891 level=log.NOISY, umid="rT03hg") 892 self._queries_sent += 1 881 893 d = ss.callRemote("get_buckets", self._storage_index) 882 894 d.addCallbacks(self._got_response, self._got_error, 883 895 callbackArgs=(peerid,)) 884 dl.append(d)885 self._responses_received = 0886 self._queries_sent = len(dl)887 896 if self._status: 888 897 self._status.set_status("Locating Shares (%d/%d)" % 889 898 (self._responses_received, 890 899 self._queries_sent)) 891 return defer.DeferredList(dl)900 return self._wait_for_enough_buckets_d 892 901 893 902 def _got_response(self, buckets, peerid): … … 907 916 b = layout.ReadBucketProxy(bucket, peerid, self._storage_index) 908 917 self.add_share_bucket(sharenum, b) 918 # If we just got enough buckets for the first time, then fire the 919 # deferred. Then remove it from self so that we don't fire it 920 # again. 921 if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares: 922 self._wait_for_enough_buckets_d.callback(True) 923 self._wait_for_enough_buckets_d = None 924 925 # Else, if we ran out of outstanding requests then fire it and 926 # remove it from self. 927 assert (self._responses_received+self._queries_failed) <= self._queries_sent 928 if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent: 929 self._wait_for_enough_buckets_d.callback(False) 930 self._wait_for_enough_buckets_d = None 909 931 910 932 if self._results: … … 915 937 def add_share_bucket(self, sharenum, bucket): 916 938 # this is split out for the benefit of test_encode.py 917 self._share_buckets. append( (sharenum, bucket))939 self._share_buckets.setdefault(sharenum, []).append(bucket) 918 940 919 941 def _got_error(self, f): … … 923 945 self.log("Error during get_buckets", failure=f, level=level, 924 946 umid="3uuBUQ") 947 # If we ran out of outstanding requests then errback it and remove it 948 # from self. 949 self._queries_failed += 1 950 assert (self._responses_received+self._queries_failed) <= self._queries_sent 951 if self._wait_for_enough_buckets_d and self._responses_received == self._queries_sent: 952 self._wait_for_enough_buckets_d.errback() 953 self._wait_for_enough_buckets_d = None 925 954 926 955 def bucket_failed(self, vbucket): … … 965 994 966 995 vups = [] 967 for sharenum, bucket in self._share_buckets: 968 vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures)) 996 for sharenum, buckets in self._share_buckets.iteritems(): 997 for bucket in buckets: 998 vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures)) 969 999 vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid) 970 1000 d = vto.start() … … 1002 1032 def _get_crypttext_hash_tree(self, res): 1003 1033 vchtps = [] 1004 for sharenum, bucket in self._share_buckets: 1005 vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures) 1006 vchtps.append(vchtp) 1034 for sharenum, buckets in self._share_buckets.iteritems(): 1035 for bucket in buckets: 1036 vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures) 1037 vchtps.append(vchtp) 1007 1038 1008 1039 _get_crypttext_hash_tree_started = time.time() … … 1055 1086 1056 1087 def _download_all_segments(self, res): 1057 for sharenum, bucket in self._share_buckets: 1058 vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) 1059 self._share_vbuckets.setdefault(sharenum, set()).add(vbucket) 1088 for sharenum, buckets in self._share_buckets.iteritems(): 1089 for bucket in buckets: 1090 vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size) 1091 self._share_vbuckets.setdefault(sharenum, set()).add(vbucket) 1060 1092 1061 1093 # after the above code, self._share_vbuckets contains enough
Note: See TracChangeset
for help on using the changeset viewer.