Ticket #1170: 1170-p2.diff
File 1170-p2.diff, 66.0 KB (added by warner at 2010-08-31T19:56:34Z)
src/allmydata/immutable/downloader/fetcher.py
diff --git a/src/allmydata/immutable/downloader/fetcher.py b/src/allmydata/immutable/downloader/fetcher.py
index e30ced8..e78d37e 100644
@@ ... @@
 from foolscap.api import eventually
 from allmydata.interfaces import NotEnoughSharesError, NoSharesError
 from allmydata.util import log
 from allmydata.util.dictutil import DictOfSets
-from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \
-     BADSEGNUM, BadSegmentNumberError
+from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \
+     BadSegmentNumberError
 
 class SegmentFetcher:
     """I am responsible for acquiring blocks for a single segment. I will use
@@ ... @@ class SegmentFetcher:
     will shut down and do no further work. My parent can also call my stop()
     method to have me shut down early."""
 
-    def __init__(self, node, segnum, k):
+    def __init__(self, node, segnum, k, logparent):
         self._node = node # _Node
         self.segnum = segnum
         self._k = k
-        self._shares = {} # maps non-dead Share instance to a state, one of
-                          # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
-                          # State transition map is:
-                          #  AVAILABLE -(send-read)-> PENDING
-                          #  PENDING -(timer)-> OVERDUE
-                          #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
-                          #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
-                          # If a share becomes DEAD, it is removed from the
-                          # dict. If it becomes BADSEGNUM, the whole fetch is
-                          # terminated.
+        self._shares = [] # unused Share instances, sorted by "goodness"
+                          # (RTT), then shnum. This is populated when DYHB
+                          # responses arrive, or (for later segments) at
+                          # startup. We remove shares from it when we call
+                          # sh.get_block() on them.
+        self._shares_from_server = DictOfSets() # maps serverid to set of
+                                                # Shares on that server for
+                                                # which we have outstanding
+                                                # get_block() calls.
+        self._max_shares_per_server = 1 # how many Shares we're allowed to
+                                        # pull from each server. This starts
+                                        # at 1 and grows if we don't have
+                                        # sufficient diversity.
+        self._active_share_map = {} # maps shnum to outstanding (and not
+                                    # OVERDUE) Share that provides it.
+        self._overdue_share_map = DictOfSets() # shares in the OVERDUE state
+        self._lp = logparent
         self._share_observers = {} # maps Share to EventStreamObserver for
                                    # active ones
-        self._shnums = DictOfSets() # maps shnum to the shares that provide it
         self._blocks = {} # maps shnum to validated block data
         self._no_more_shares = False
-        self._bad_segnum = False
         self._last_failure = None
         self._running = True
 
     def stop(self):
         log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix,
-                level=log.NOISY, umid="LWyqpg")
+                level=log.NOISY, parent=self._lp, umid="LWyqpg")
         self._cancel_all_requests()
         self._running = False
-        self._shares.clear() # let GC work # ??? XXX
+        # help GC ??? XXX
+        del self._shares, self._shares_from_server, self._active_share_map
+        del self._share_observers
 
 
     # called by our parent _Node
@@ ... @@ class SegmentFetcher:
         # called when ShareFinder locates a new share, and when a non-initial
         # segment fetch is started and we already know about shares from the
         # previous segment
-        for s in shares:
-            self._shares[s] = AVAILABLE
-            self._shnums.add(s._shnum, s)
+        self._shares.extend(shares)
+        self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) )
         eventually(self.loop)
 
     def no_more_shares(self):
@@ ... @@ class SegmentFetcher:
 
     # internal methods
 
-    def _count_shnums(self, *states):
-        """shnums for which at least one state is in the following list"""
-        shnums = []
-        for shnum,shares in self._shnums.iteritems():
-            matches = [s for s in shares if self._shares.get(s) in states]
-            if matches:
-                shnums.append(shnum)
-        return len(shnums)
-
     def loop(self):
         try:
             # if any exception occurs here, kill the download
@@ ... @@ class SegmentFetcher:
         k = self._k
         if not self._running:
             return
-        if self._bad_segnum:
+        numsegs, authoritative = self._node.get_num_segments()
+        if authoritative and self.segnum >= numsegs:
             # oops, we were asking for a segment number beyond the end of the
             # file. This is an error.
             self.stop()
@@ ... @@ class SegmentFetcher:
             self._node.fetch_failed(self, f)
             return
 
+        #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares
+        # Should we send out more requests?
+        while len(set(self._blocks.keys())
+                  | set(self._active_share_map.keys())
+                  ) < k:
+            # we don't have data or active requests for enough shares. Are
+            # there any unused shares we can start using?
+            (sent_something, want_more_diversity) = self._find_and_use_share()
+            if sent_something:
+                # great. loop back around in case we need to send more.
+                continue
+            if want_more_diversity:
+                # we could have sent something if we'd been allowed to pull
+                # more shares per server. Increase the limit and try again.
+                self._max_shares_per_server += 1
+                log.msg("SegmentFetcher(%s) increasing diversity limit to %d"
+                        % (self._node._si_prefix, self._max_shares_per_server),
+                        level=log.NOISY, umid="xY2pBA")
+                # Also ask for more shares, in the hopes of achieving better
+                # diversity for the next segment.
+                self._ask_for_more_shares()
+                continue
+            # we need more shares than the ones in self._shares to make
+            # progress
+            self._ask_for_more_shares()
+            if self._no_more_shares:
+                # But there are no more shares to be had. If we're going to
+                # succeed, it will be with the shares we've already seen.
+                # Will they be enough?
+                if len(set(self._blocks.keys())
+                       | set(self._active_share_map.keys())
+                       | set(self._overdue_share_map.keys())
+                       ) < k:
+                    # nope. bail.
+                    self._no_shares_error() # this calls self.stop()
+                    return
+            # our outstanding or overdue requests may yet work.
+            # more shares may be coming. Wait until then.
+            return
+
         # are we done?
-        if self._count_shnums(COMPLETE) >= k:
+        if len(set(self._blocks.keys())) >= k:
             # yay!
             self.stop()
             self._node.process_blocks(self.segnum, self._blocks)
             return
 
-        # we may have exhausted everything
-        if (self._no_more_shares and
-            self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
-            # no more new shares are coming, and the remaining hopeful shares
-            # aren't going to be enough. boo!
-
-            log.msg("share states: %r" % (self._shares,),
-                    level=log.NOISY, umid="0ThykQ")
-            if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0:
-                format = ("no shares (need %(k)d)."
-                          " Last failure: %(last_failure)s")
-                args = { "k": k,
-                         "last_failure": self._last_failure }
-                error = NoSharesError
-            else:
-                format = ("ran out of shares: %(complete)d complete,"
-                          " %(pending)d pending, %(overdue)d overdue,"
-                          " %(unused)d unused, need %(k)d."
-                          " Last failure: %(last_failure)s")
-                args = {"complete": self._count_shnums(COMPLETE),
-                        "pending": self._count_shnums(PENDING),
-                        "overdue": self._count_shnums(OVERDUE),
-                        # 'unused' should be zero
-                        "unused": self._count_shnums(AVAILABLE),
-                        "k": k,
-                        "last_failure": self._last_failure,
-                        }
-                error = NotEnoughSharesError
-            log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
-            e = error(format % args)
-            f = Failure(e)
-            self.stop()
-            self._node.fetch_failed(self, f)
-            return
+    def _no_shares_error(self):
+        if not (self._shares or self._active_share_map or
+                self._overdue_share_map or self._blocks):
+            format = ("no shares (need %(k)d)."
+                      " Last failure: %(last_failure)s")
+            args = { "k": self._k,
+                     "last_failure": self._last_failure }
+            error = NoSharesError
+        else:
+            format = ("ran out of shares: complete=%(complete)s"
+                      " pending=%(pending)s overdue=%(overdue)s"
+                      " unused=%(unused)s need %(k)d."
+                      " Last failure: %(last_failure)s")
+            def join(shnums): return ",".join(["sh%d" % shnum
+                                               for shnum in sorted(shnums)])
+            pending_s = ",".join([str(sh)
+                                  for sh in self._active_share_map.values()])
+            overdue = set()
+            for shares in self._overdue_share_map.values():
+                overdue |= shares
+            overdue_s = ",".join([str(sh) for sh in overdue])
+            args = {"complete": join(self._blocks.keys()),
+                    "pending": pending_s,
+                    "overdue": overdue_s,
+                    # 'unused' should be zero
+                    "unused": ",".join([str(sh) for sh in self._shares]),
+                    "k": self._k,
+                    "last_failure": self._last_failure,
+                    }
+            error = NotEnoughSharesError
+        log.msg(format=format,
+                level=log.UNUSUAL, parent=self._lp, umid="1DsnTg",
+                **args)
+        e = error(format % args)
+        f = Failure(e)
+        self.stop()
+        self._node.fetch_failed(self, f)
 
-        # nope, not done. Are we "block-hungry" (i.e. do we want to send out
-        # more read requests, or do we think we have enough in flight
-        # already?)
-        while self._count_shnums(PENDING, COMPLETE) < k:
-            # we're hungry.. are there any unused shares?
-            sent = self._send_new_request()
-            if not sent:
-                break
-
-        # ok, now are we "share-hungry" (i.e. do we have enough known shares
-        # to make us happy, or should we ask the ShareFinder to get us more?)
-        if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
-            # we're hungry for more shares
-            self._node.want_more_shares()
-            # that will trigger the ShareFinder to keep looking
-
-    def _find_one(self, shares, state):
-        # TODO could choose fastest, or avoid servers already in use
-        for s in shares:
-            if self._shares[s] == state:
-                return s
-        # can never get here, caller has assert in case of code bug
-
-    def _send_new_request(self):
-        # TODO: this is probably O(k^2), and we're called from a range(k)
-        # loop, so O(k^3)
-
-        # this first loop prefers sh0, then sh1, sh2, etc
-        for shnum,shares in sorted(self._shnums.iteritems()):
-            states = [self._shares[s] for s in shares]
-            if COMPLETE in states or PENDING in states:
-                # don't send redundant requests
-                continue
-            if AVAILABLE not in states:
-                # no candidates for this shnum, move on
-                continue
-            # here's a candidate. Send a request.
-            s = self._find_one(shares, AVAILABLE)
-            assert s
-            self._shares[s] = PENDING
-            self._share_observers[s] = o = s.get_block(self.segnum)
-            o.subscribe(self._block_request_activity, share=s, shnum=shnum)
-            # TODO: build up a list of candidates, then walk through the
-            # list, sending requests to the most desireable servers,
-            # re-checking our block-hunger each time. For non-initial segment
-            # fetches, this would let us stick with faster servers.
-            return True
-        # nothing was sent: don't call us again until you have more shares to
-        # work with, or one of the existing shares has been declared OVERDUE
-        return False
+    def _find_and_use_share(self):
+        sent_something = False
+        want_more_diversity = False
+        for sh in self._shares: # find one good share to fetch
+            shnum = sh._shnum ; serverid = sh._peerid
+            if shnum in self._blocks:
+                continue # don't request data we already have
+            if shnum in self._active_share_map:
+                # note: OVERDUE shares are removed from _active_share_map
+                # and added to _overdue_share_map instead.
+                continue # don't send redundant requests
+            sfs = self._shares_from_server
+            if len(sfs.get(serverid,set())) >= self._max_shares_per_server:
+                # don't pull too much from a single server
+                want_more_diversity = True
+                continue
+            # ok, we can use this share
+            self._shares.remove(sh)
+            self._active_share_map[shnum] = sh
+            self._shares_from_server.add(serverid, sh)
+            self._start_share(sh, shnum)
+            sent_something = True
+            break
+        return (sent_something, want_more_diversity)
+
+    def _start_share(self, share, shnum):
+        self._share_observers[share] = o = share.get_block(self.segnum)
+        o.subscribe(self._block_request_activity, share=share, shnum=shnum)
+
+    def _ask_for_more_shares(self):
+        if not self._no_more_shares:
+            self._node.want_more_shares()
+            # that will trigger the ShareFinder to keep looking, and call our
+            # add_shares() or no_more_shares() later.
 
     def _cancel_all_requests(self):
         for o in self._share_observers.values():
@@ ... @@ class SegmentFetcher:
         log.msg("SegmentFetcher(%s)._block_request_activity:"
                 " Share(sh%d-on-%s) -> %s" %
                 (self._node._si_prefix, shnum, share._peerid_s, state),
-                level=log.NOISY, umid="vilNWA")
-        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
+                level=log.NOISY, parent=self._lp, umid="vilNWA")
+        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share
+        # from all our tracking lists.
         if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
             self._share_observers.pop(share, None)
+            self._shares_from_server.discard(shnum, share)
+            if self._active_share_map.get(shnum) is share:
+                del self._active_share_map[shnum]
+            self._overdue_share_map.discard(shnum, share)
+
         if state is COMPLETE:
-            # 'block' is fully validated
-            self._shares[share] = COMPLETE
+            # 'block' is fully validated and complete
             self._blocks[shnum] = block
-        elif state is OVERDUE:
-            self._shares[share] = OVERDUE
+
+        if state is OVERDUE:
+            # no longer active, but still might complete
+            del self._active_share_map[shnum]
+            self._overdue_share_map.add(shnum, share)
             # OVERDUE is not terminal: it will eventually transition to
             # COMPLETE, CORRUPT, or DEAD.
-        elif state is CORRUPT:
-            self._shares[share] = CORRUPT
-        elif state is DEAD:
-            del self._shares[share]
-            self._shnums[shnum].remove(share)
-            self._last_failure = f
-        elif state is BADSEGNUM:
-            self._shares[share] = BADSEGNUM # ???
-            self._bad_segnum = True
-        eventually(self.loop)
+
+        if state is DEAD:
+            self._last_failure = f
+        if state is BADSEGNUM:
+            # our main loop will ask the DownloadNode each time for the
+            # number of segments, so we'll deal with this in the top of
+            # _do_loop
+            pass
+
+        eventually(self.loop)
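Editorial note: taken together, add_shares() (which sorts candidate shares by DYHB round-trip time, then share number) and _find_and_use_share() (which caps how many outstanding requests any one server may carry) implement the patch's server-diversity policy. The following is a minimal standalone sketch of that policy, not the real class: it uses a hypothetical stub share type and a plain dict in place of DictOfSets, and it assumes the candidate list is already RTT-sorted.

# Sketch of the diversity-limited selection loop (hypothetical helper).
def select_shares(shares, k):
    """Pick shares covering k distinct shnums, preferring distinct servers;
    raise the per-server cap only when no progress is otherwise possible."""
    max_per_server = 1       # mirrors _max_shares_per_server
    chosen = []              # shares we would call get_block() on
    per_server = {}          # serverid -> number of chosen shares
    have_shnums = set()      # shnums already covered
    while len(chosen) < k:
        progress = False
        want_more_diversity = False
        for sh in shares:            # list is sorted by (rtt, shnum)
            if sh.shnum in have_shnums:
                continue             # redundant: shnum already covered
            if per_server.get(sh.serverid, 0) >= max_per_server:
                want_more_diversity = True
                continue             # would overload this server at this cap
            chosen.append(sh)
            have_shnums.add(sh.shnum)
            per_server[sh.serverid] = per_server.get(sh.serverid, 0) + 1
            progress = True
            break
        if progress:
            continue
        if want_more_diversity:
            max_per_server += 1      # same growth rule as the patch
            continue
        break                        # out of shares: caller must find more
    return chosen

class Sh:  # stand-in for Share, carrying just what the sketch needs
    def __init__(self, shnum, serverid):
        self.shnum, self.serverid = shnum, serverid

# All shares on one server: the cap grows until k shnums are covered,
# matching the behavior checked by test_suffer_bad_diversity_early below.
assert [s.shnum for s in select_shares([Sh(i, "A") for i in range(5)], 3)] \
       == [0, 1, 2]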
src/allmydata/immutable/downloader/finder.py
diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py
index 9adee99..fa6204c 100644
@@ ... @@ class ShareFinder:
         self._storage_broker = storage_broker
         self.share_consumer = self.node = node
         self.max_outstanding_requests = max_outstanding_requests
-
         self._hungry = False
 
         self._commonshares = {} # shnum to CommonShare instance
-        self.undelivered_shares = []
         self.pending_requests = set()
         self.overdue_requests = set() # subset of pending_requests
         self.overdue_timers = {}
@@ ... @@ class ShareFinder:
                  si=self._si_prefix,
                  level=log.NOISY, parent=logparent, umid="2xjj2A")
 
+    def update_num_segments(self):
+        (numsegs, authoritative) = self.node.get_num_segments()
+        assert authoritative
+        for cs in self._commonshares.values():
+            cs.set_authoritative_num_segments(numsegs)
+
     def start_finding_servers(self):
         # don't get servers until somebody uses us: creating the
         # ImmutableFileNode should not cause work to happen yet. Test case is
@@ ... @@ class ShareFinder:
 
     # internal methods
     def loop(self):
-        undelivered_s = ",".join(["sh%d@%s" %
-                                  (s._shnum, idlib.shortnodeid_b2a(s._peerid))
-                                  for s in self.undelivered_shares])
         pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
                               for rt in self.pending_requests]) # sort?
         self.log(format="ShareFinder loop: running=%(running)s"
-                 " hungry=%(hungry)s, undelivered=%(undelivered)s,"
-                 " pending=%(pending)s",
-                 running=self.running, hungry=self._hungry,
-                 undelivered=undelivered_s, pending=pending_s,
+                 " hungry=%(hungry)s, pending=%(pending)s",
+                 running=self.running, hungry=self._hungry, pending=pending_s,
                  level=log.NOISY, umid="kRtS4Q")
         if not self.running:
             return
         if not self._hungry:
             return
-        if self.undelivered_shares:
-            sh = self.undelivered_shares.pop(0)
-            # they will call hungry() again if they want more
-            self._hungry = False
-            self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)",
-                     shnum=sh._shnum, peerid=sh._peerid_s,
-                     level=log.NOISY, umid="2n1qQw")
-            eventually(self.share_consumer.got_shares, [sh])
-            return
 
         non_overdue = self.pending_requests - self.overdue_requests
         if len(non_overdue) >= self.max_outstanding_requests:
@@ ... @@ class ShareFinder:
         lp = self.log(format="sending DYHB to [%(peerid)s]",
                       peerid=idlib.shortnodeid_b2a(peerid),
                       level=log.NOISY, umid="Io7pyg")
-        d_ev = self._download_status.add_dyhb_sent(peerid, now())
+        time_sent = now()
+        d_ev = self._download_status.add_dyhb_sent(peerid, time_sent)
         # TODO: get the timer from a Server object, it knows best
         self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
                                                      self.overdue, req)
         d = rref.callRemote("get_buckets", self._storage_index)
         d.addBoth(incidentally, self._request_retired, req)
         d.addCallbacks(self._got_response, self._got_error,
-                       callbackArgs=(rref.version, peerid, req, d_ev, lp),
+                       callbackArgs=(rref.version, peerid, req, d_ev,
+                                     time_sent, lp),
                        errbackArgs=(peerid, req, d_ev, lp))
         d.addErrback(log.err, format="error in send_request",
                      level=log.WEIRD, parent=lp, umid="rpdV0w")
@@ ... @@ class ShareFinder:
         self.overdue_requests.add(req)
         eventually(self.loop)
 
-    def _got_response(self, buckets, server_version, peerid, req, d_ev, lp):
+    def _got_response(self, buckets, server_version, peerid, req, d_ev,
+                      time_sent, lp):
         shnums = sorted([shnum for shnum in buckets])
-        d_ev.finished(shnums, now())
-        if buckets:
-            shnums_s = ",".join([str(shnum) for shnum in shnums])
-            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
-                     shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
-                     level=log.NOISY, parent=lp, umid="0fcEZw")
-        else:
+        time_received = now()
+        d_ev.finished(shnums, time_received)
+        dyhb_rtt = time_received - time_sent
+        if not buckets:
             self.log(format="no shares from [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, parent=lp, umid="U7d4JA")
-        if self.node.num_segments is None:
-            best_numsegs = self.node.guessed_num_segments
-        else:
-            best_numsegs = self.node.num_segments
+            return
+        shnums_s = ",".join([str(shnum) for shnum in shnums])
+        self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
+                 shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
+                 level=log.NOISY, parent=lp, umid="0fcEZw")
+        shares = []
         for shnum, bucket in buckets.iteritems():
-            self._create_share(best_numsegs, shnum, bucket, server_version,
-                               peerid)
+            s = self._create_share(shnum, bucket, server_version, peerid,
+                                   dyhb_rtt)
+            shares.append(s)
+        self._deliver_shares(shares)
 
-    def _create_share(self, best_numsegs, shnum, bucket, server_version,
-                      peerid):
+    def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
         if shnum in self._commonshares:
             cs = self._commonshares[shnum]
         else:
-            cs = CommonShare(best_numsegs, self._si_prefix, shnum,
+            numsegs, authoritative = self.node.get_num_segments()
+            cs = CommonShare(numsegs, self._si_prefix, shnum,
                              self._node_logparent)
+            if authoritative:
+                cs.set_authoritative_num_segments(numsegs)
             # Share._get_satisfaction is responsible for updating
             # CommonShare.set_numsegs after we know the UEB. Alternatives:
             #  1: d = self.node.get_num_segments()
@@ ... @@ class ShareFinder:
             # Yuck.
             self._commonshares[shnum] = cs
         s = Share(bucket, server_version, self.verifycap, cs, self.node,
-                  self._download_status, peerid, shnum,
+                  self._download_status, peerid, shnum, dyhb_rtt,
                   self._node_logparent)
-        self.undelivered_shares.append(s)
+        return s
+
+    def _deliver_shares(self, shares):
+        # they will call hungry() again if they want more
+        self._hungry = False
+        shares_s = ",".join([str(sh) for sh in shares])
+        self.log(format="delivering shares: %s" % shares_s,
+                 level=log.NOISY, umid="2n1qQw")
+        eventually(self.share_consumer.got_shares, shares)
 
     def _got_error(self, f, peerid, req, d_ev, lp):
         d_ev.finished("error", now())
src/allmydata/immutable/downloader/node.py
diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py
index 4c92dd8..33c16cf 100644
@@ ... @@ class DownloadNode:
         # things to track callers that want data
 
         # _segment_requests can have duplicates
-        self._segment_requests = [] # (segnum, d, cancel_handle)
+        self._segment_requests = [] # (segnum, d, cancel_handle, logparent)
         self._active_segment = None # a SegmentFetcher, with .segnum
 
         self._segsize_observers = observer.OneShotObserverList()
@@ ... @@ class DownloadNode:
         # for each read() call. Segmentation and get_segment() messages are
         # associated with the read() call, everything else is tied to the
         # _Node's log entry.
-        lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d,"
+        lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
+                     " size=%(size)d,"
                      " guessed_segsize=%(guessed_segsize)d,"
                      " guessed_numsegs=%(guessed_numsegs)d",
                      si=self._si_prefix, size=verifycap.size,
@@ ... @@ class DownloadNode:
         # as with CommonShare, our ciphertext_hash_tree is a stub until we
         # get the real num_segments
         self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
+        self.ciphertext_hash_tree_leaves = self.guessed_num_segments
 
     def __repr__(self):
-        return "Imm_Node(%s)" % (self._si_prefix,)
+        return "ImmutableDownloadNode(%s)" % (self._si_prefix,)
 
     def stop(self):
         # called by the Terminator at shutdown, mostly for tests
@@ ... @@ class DownloadNode:
         The Deferred can also errback with other fatal problems, such as
         NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
         """
-        log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
-                si=base32.b2a(self._verifycap.storage_index)[:8],
-                segnum=segnum,
-                level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
+        lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
+                     si=base32.b2a(self._verifycap.storage_index)[:8],
+                     segnum=segnum,
+                     level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
         self._download_status.add_segment_request(segnum, now())
         d = defer.Deferred()
         c = Cancel(self._cancel_request)
-        self._segment_requests.append( (segnum, d, c) )
+        self._segment_requests.append( (segnum, d, c, lp) )
         self._start_new_segment()
         return (d, c)
@@ ... @@ class DownloadNode:
         if self._active_segment is None and self._segment_requests:
             segnum = self._segment_requests[0][0]
             k = self._verifycap.needed_shares
+            lp = self._segment_requests[0][3]
             log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
                     node=repr(self), segnum=segnum,
-                    level=log.NOISY, umid="wAlnHQ")
-            self._active_segment = fetcher = SegmentFetcher(self, segnum, k)
+                    level=log.NOISY, parent=lp, umid="wAlnHQ")
+            self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
             active_shares = [s for s in self._shares if s.is_alive()]
             fetcher.add_shares(active_shares) # this triggers the loop
@@ ... @@ class DownloadNode:
         h = hashutil.uri_extension_hash(UEB_s)
         if h != self._verifycap.uri_extension_hash:
             raise BadHashError
-        UEB_dict = uri.unpack_extension(UEB_s)
-        self._parse_and_store_UEB(UEB_dict) # sets self._stuff
+        self._parse_and_store_UEB(UEB_s) # sets self._stuff
         # TODO: a malformed (but authentic) UEB could throw an assertion in
         # _parse_and_store_UEB, and we should abandon the download.
         self.have_UEB = True
 
-    def _parse_and_store_UEB(self, d):
+        # inform the ShareFinder about our correct number of segments. This
+        # will update the block-hash-trees in all existing CommonShare
+        # instances, and will populate new ones with the correct value.
+        self._sharefinder.update_num_segments()
+
+    def _parse_and_store_UEB(self, UEB_s):
         # Note: the UEB contains needed_shares and total_shares. These are
         # redundant and inferior (the filecap contains the authoritative
         # values). However, because it is possible to encode the same file in
@@ ... @@ class DownloadNode:
 
         # therefore, we ignore d['total_shares'] and d['needed_shares'].
 
+        d = uri.unpack_extension(UEB_s)
+
         log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
-                ueb=repr(d), vcap=self._verifycap.to_string(),
+                ueb=repr(uri.unpack_extension_readable(UEB_s)),
+                vcap=self._verifycap.to_string(),
                 level=log.NOISY, parent=self._lp, umid="cVqZnA")
 
         k, N = self._verifycap.needed_shares, self._verifycap.total_shares
@@ ... @@ class DownloadNode:
         # shares of file B. self.ciphertext_hash_tree was a guess before:
         # this is where we create it for real.
         self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
+        self.ciphertext_hash_tree_leaves = self.num_segments
         self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
 
         self.share_hash_tree.set_hashes({0: d['share_root_hash']})
@@ ... @@ class DownloadNode:
                                  % (hashnum, len(self.share_hash_tree)))
         self.share_hash_tree.set_hashes(share_hashes)
 
+    def get_desired_ciphertext_hashes(self, segnum):
+        if segnum < self.ciphertext_hash_tree_leaves:
+            return self.ciphertext_hash_tree.needed_hashes(segnum,
+                                                           include_leaf=True)
+        return []
     def get_needed_ciphertext_hashes(self, segnum):
         cht = self.ciphertext_hash_tree
         return cht.needed_hashes(segnum, include_leaf=True)
+
     def process_ciphertext_hashes(self, hashes):
         assert self.num_segments is not None
         # this may raise BadHashError or NotEnoughHashesError
@@ ... @@ class DownloadNode:
     def _extract_requests(self, segnum):
         """Remove matching requests and return their (d,c) tuples so that the
         caller can retire them."""
-        retire = [(d,c) for (segnum0, d, c) in self._segment_requests
+        retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests
                   if segnum0 == segnum]
         self._segment_requests = [t for t in self._segment_requests
                                   if t[0] != segnum]
@@ ... @@ class DownloadNode:
     def _cancel_request(self, c):
         self._segment_requests = [t for t in self._segment_requests
                                   if t[2] != c]
-        segnums = [segnum for (segnum,d,c) in self._segment_requests]
+        segnums = [segnum for (segnum,d,c,lp) in self._segment_requests]
         # self._active_segment might be None in rare circumstances, so make
         # sure we tolerate it
         if self._active_segment and self._active_segment.segnum not in segnums:
             self._active_segment.stop()
             self._active_segment = None
             self._start_new_segment()
+
+    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
+    # SegmentFetcher to tell if it is still fetching a valid segnum.
+    def get_num_segments(self):
+        # returns (best_num_segments, authoritative)
+        if self.num_segments is None:
+            return (self.guessed_num_segments, False)
+        return (self.num_segments, True)
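Editorial note: the new get_num_segments() accessor is the pivot of this patch. Instead of caching a possibly-wrong segment count, callers re-ask the DownloadNode on every pass and learn whether the answer is still a guess. A minimal sketch of the consuming pattern (a hypothetical helper mirroring the check at the top of SegmentFetcher.loop):

def segnum_status(node, segnum):
    """Classify a segment request while the true file size may be unknown."""
    numsegs, authoritative = node.get_num_segments()
    if authoritative and segnum >= numsegs:
        return "bad"            # the BadSegmentNumberError path in loop()
    if not authoritative and segnum >= numsegs:
        return "beyond-guess"   # tolerated: the UEB may prove the guess wrong
    return "ok"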
src/allmydata/immutable/downloader/share.py
diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py
index 413f907..78cce8e 100644
@@ ... @@ class Share:
     # servers. A different backend would use a different class.
 
     def __init__(self, rref, server_version, verifycap, commonshare, node,
-                 download_status, peerid, shnum, logparent):
+                 download_status, peerid, shnum, dyhb_rtt, logparent):
         self._rref = rref
         self._server_version = server_version
         self._node = node # holds share_hash_tree and UEB
@@ ... @@ class Share:
         self._storage_index = verifycap.storage_index
         self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
         self._shnum = shnum
+        self._dyhb_rtt = dyhb_rtt
         # self._alive becomes False upon fatal corruption or server error
         self._alive = True
         self._lp = log.msg(format="%(share)s created", share=repr(self),
@@ ... @@ class Share:
             if not self._satisfy_UEB():
                 # can't check any hashes without the UEB
                 return False
+            # the call to _satisfy_UEB() will immediately set the
+            # authoritative num_segments in all our CommonShares. If we
+            # guessed wrong, we might still be working on a bogus segnum
+            # (beyond the real range). We catch this and signal BADSEGNUM
+            # before invoking any further code that touches hashtrees.
         self.actual_segment_size = self._node.segment_size # might be updated
         assert self.actual_segment_size is not None
 
-        # knowing the UEB means knowing num_segments. Despite the redundancy,
-        # this is the best place to set this. CommonShare.set_numsegs will
-        # ignore duplicate calls.
+        # knowing the UEB means knowing num_segments
         assert self._node.num_segments is not None
-        cs = self._commonshare
-        cs.set_numsegs(self._node.num_segments)
 
         segnum, observers = self._active_segnum_and_observers()
         # if segnum is None, we don't really need to do anything (we have no
@@ ... @@ class Share:
             # can't check block_hash_tree without a root
             return False
 
-        if cs.need_block_hash_root():
+        if self._commonshare.need_block_hash_root():
             block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
-            cs.set_block_hash_root(block_hash_root)
+            self._commonshare.set_block_hash_root(block_hash_root)
 
         if segnum is None:
             return False # we don't want any particular segment right now
@@ ... @@ class Share:
                        ] ):
             offsets[field] = fields[i]
         self.actual_offsets = offsets
-        log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields))
+        log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields),
+                level=log.NOISY, parent=self._lp, umid="jedQcw")
         self._received.remove(0, 4) # don't need this anymore
 
         # validate the offsets a bit
@@ ... @@ class Share:
         block = self._received.pop(blockstart, blocklen)
         if not block:
             log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
-                    blockstart, blocklen))
+                    blockstart, blocklen),
+                    level=log.NOISY, parent=self._lp, umid="aK0RFw")
             return False
         log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
                 share=repr(self), start=blockstart, length=blocklen,
@@ ... @@ class Share:
         if self.actual_offsets or self._overrun_ok:
             if not self._node.have_UEB:
                 self._desire_UEB(desire, o)
-            # They might ask for a segment that doesn't look right.
-            # _satisfy() will catch+reject bad segnums once we know the UEB
-            # (and therefore segsize and numsegs), so we'll only fail this
-            # test if we're still guessing. We want to avoid asking the
-            # hashtrees for needed_hashes() for bad segnums. So don't enter
-            # _desire_hashes or _desire_data unless the segnum looks
-            # reasonable.
-            if segnum < r["num_segments"]:
-                # XXX somehow we're getting here for sh5. we don't yet know
-                # the actual_segment_size, we're still working off the guess.
-                # the ciphertext_hash_tree has been corrected, but the
-                # commonshare._block_hash_tree is still in the guessed state.
-                self._desire_share_hashes(desire, o)
-                if segnum is not None:
-                    self._desire_block_hashes(desire, o, segnum)
-                    self._desire_data(desire, o, r, segnum, segsize)
-            else:
-                log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)"
-                        % (segnum, r["num_segments"]),
-                        level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
+            self._desire_share_hashes(desire, o)
+            if segnum is not None:
+                # They might be asking for a segment number that is beyond
+                # what we guess the file contains, but _desire_block_hashes
+                # and _desire_data will tolerate that.
+                self._desire_block_hashes(desire, o, segnum)
+                self._desire_data(desire, o, r, segnum, segsize)
 
         log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
-                % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()))
+                % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()),
+                level=log.NOISY, parent=self._lp, umid="IG7CgA")
         if self.actual_offsets:
             return (want_it, need_it+gotta_gotta_have_it)
         else:
@@ ... @@ class Share:
         (want_it, need_it, gotta_gotta_have_it) = desire
 
         # block hash chain
-        for hashnum in self._commonshare.get_needed_block_hashes(segnum):
+        for hashnum in self._commonshare.get_desired_block_hashes(segnum):
             need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
 
         # ciphertext hash chain
-        for hashnum in self._node.get_needed_ciphertext_hashes(segnum):
+        for hashnum in self._node.get_desired_ciphertext_hashes(segnum):
             need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
 
     def _desire_data(self, desire, o, r, segnum, segsize):
+        if segnum > r["num_segments"]:
+            # they're asking for a segment that's beyond what we think is the
+            # end of the file. We won't get here if we've already learned the
+            # real UEB: _get_satisfaction() will notice the out-of-bounds and
+            # terminate the loop. So we must still be guessing, which means
+            # that they might be correct in asking for such a large segnum.
+            # But if they're right, then our segsize/segnum guess is
+            # certainly wrong, which means we don't know what data blocks to
+            # ask for yet. So don't bother adding anything. When the UEB
+            # comes back and we learn the correct segsize/segnums, we'll
+            # either reject the request or have enough information to proceed
+            # normally. This costs one roundtrip.
+            log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)"
+                    % (segnum, r["num_segments"]),
+                    level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
+            return
         (want_it, need_it, gotta_gotta_have_it) = desire
         tail = (segnum == r["num_segments"]-1)
         datastart = o["data"]
@@ ... @@ class Share:
 
 
 class CommonShare:
+    # TODO: defer creation of the hashtree until somebody uses us. There will
+    # be a lot of unused shares, and we shouldn't spend the memory on a large
+    # hashtree unless necessary.
     """I hold data that is common across all instances of a single share,
     like sh2 on both servers A and B. This is just the block hash tree.
     """
-    def __init__(self, guessed_numsegs, si_prefix, shnum, logparent):
+    def __init__(self, best_numsegs, si_prefix, shnum, logparent):
         self.si_prefix = si_prefix
         self.shnum = shnum
+
         # in the beginning, before we have the real UEB, we can only guess at
         # the number of segments. But we want to ask for block hashes early.
         # So if we're asked for which block hashes are needed before we know
         # numsegs for sure, we return a guess.
-        self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
-        self._know_numsegs = False
+        self._block_hash_tree = IncompleteHashTree(best_numsegs)
+        self._block_hash_tree_is_authoritative = False
+        self._block_hash_tree_leaves = best_numsegs
         self._logparent = logparent
 
-    def set_numsegs(self, numsegs):
-        if self._know_numsegs:
-            return
-        self._block_hash_tree = IncompleteHashTree(numsegs)
-        self._know_numsegs = True
+    def __repr__(self):
+        return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum)
+
+    def set_authoritative_num_segments(self, numsegs):
+        if self._block_hash_tree_leaves != numsegs:
+            self._block_hash_tree = IncompleteHashTree(numsegs)
+            self._block_hash_tree_leaves = numsegs
+        self._block_hash_tree_is_authoritative = True
 
     def need_block_hash_root(self):
         return bool(not self._block_hash_tree[0])
 
     def set_block_hash_root(self, roothash):
-        assert self._know_numsegs
+        assert self._block_hash_tree_is_authoritative
         self._block_hash_tree.set_hashes({0: roothash})
 
+    def get_desired_block_hashes(self, segnum):
+        if segnum < self._block_hash_tree_leaves:
+            return self._block_hash_tree.needed_hashes(segnum,
+                                                       include_leaf=True)
+
+        # the segnum might be out-of-bounds. Originally it was due to a race
+        # between the receipt of the UEB on one share (from which we learn
+        # the correct number of segments, update all hash trees to the right
+        # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery
+        # of a new Share to the SegmentFetcher while that BADSEGNUM was
+        # queued (which sends out requests to the stale segnum, now larger
+        # than the hash tree). I fixed that (by making SegmentFetcher.loop
+        # check for a bad segnum at the start of each pass, instead of using
+        # the queued BADSEGNUM or a flag it sets), but just in case this
+        # still happens, I'm leaving the < in place. If it gets hit, there's
+        # a potential lost-progress problem, but I'm pretty sure that it will
+        # get cleared up on the following turn.
+        return []
+
     def get_needed_block_hashes(self, segnum):
+        assert self._block_hash_tree_is_authoritative
         # XXX: include_leaf=True needs thought: how did the old downloader do
         # it? I think it grabbed *all* block hashes and set them all at once.
         # Since we want to fetch less data, we either need to fetch the leaf
@@ ... @@ class CommonShare:
         return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
 
     def process_block_hashes(self, block_hashes):
-        assert self._know_numsegs
+        assert self._block_hash_tree_is_authoritative
         # this may raise BadHashError or NotEnoughHashesError
         self._block_hash_tree.set_hashes(block_hashes)
 
     def check_block(self, segnum, block):
-        assert self._know_numsegs
+        assert self._block_hash_tree_is_authoritative
         h = hashutil.block_hash(block)
         # this may raise BadHashError or NotEnoughHashesError
         self._block_hash_tree.set_hashes(leaves={segnum: h})
+
+# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an
+# auxiliary OVERDUE callback. Just make sure to get all the messages in the
+# right order and on the right turns.
+
+# TODO: we're asking for too much data. We probably don't need
+# include_leaf=True in the block hash tree or ciphertext hash tree.
+
+# TODO: we ask for ciphertext hash tree nodes from all shares (whenever
+# _desire is called while we're missing those nodes), but we only consume it
+# from the first response, leaving the rest of the data sitting in _received.
+# This was ameliorated by clearing self._received after each block is
+# complete.
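Editorial note: the CommonShare changes replace the one-shot set_numsegs/_know_numsegs pair with an explicit guessed-then-authoritative lifecycle: the block hash tree is rebuilt if the guess was wrong, and get_desired_block_hashes answers an out-of-range (stale) segnum with an empty list instead of tripping an assertion. A condensed sketch of that lifecycle, using a hypothetical stub in place of allmydata.hashtree.IncompleteHashTree:

class StubHashTree:                      # stand-in for IncompleteHashTree
    def __init__(self, leaves):
        self.leaves = leaves
    def needed_hashes(self, segnum, include_leaf=True):
        return [segnum]                  # toy answer, enough for the sketch

class MiniCommonShare:
    def __init__(self, best_numsegs):    # built from a guess at first
        self._tree = StubHashTree(best_numsegs)
        self._leaves = best_numsegs
        self._authoritative = False
    def set_authoritative_num_segments(self, numsegs):
        if self._leaves != numsegs:      # guess was wrong: rebuild the tree
            self._tree = StubHashTree(numsegs)
            self._leaves = numsegs
        self._authoritative = True
    def get_desired_block_hashes(self, segnum):
        if segnum < self._leaves:
            return self._tree.needed_hashes(segnum, include_leaf=True)
        return []   # stale out-of-range segnum: ask for nothing, don't crash

cs = MiniCommonShare(best_numsegs=4)     # guessed four segments
cs.set_authoritative_num_segments(2)     # UEB arrives: tree shrinks to two
assert cs.get_desired_block_hashes(3) == []   # stale segnum is tolerated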
src/allmydata/test/test_cli.py
diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py
index db5bf5f..2453126 100644
@@ ... @@ class Errors(GridTestMixin, CLITestMixin, unittest.TestCase):
         # the download is abandoned as soon as it's clear that we won't get
         # enough shares. The one remaining share might be in either the
         # COMPLETE or the PENDING state.
-        in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3"
-        in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3"
+        in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3"
+        in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3"
 
         d.addCallback(lambda ign: self.do_cli("get", self.uri_1share))
         def _check1((rc, out, err)):
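Editorial note: these expected strings follow the new _no_shares_error format from fetcher.py, which names the shares in each bucket instead of counting them. The format string below is copied from the patch; the argument values are a toy reproduction of the COMPLETE case:

format = ("ran out of shares: complete=%(complete)s"
          " pending=%(pending)s overdue=%(overdue)s"
          " unused=%(unused)s need %(k)d."
          " Last failure: %(last_failure)s")
args = {"complete": "sh0", "pending": "", "overdue": "",
        "unused": "", "k": 3, "last_failure": None}
assert (format % args) == ("ran out of shares: complete=sh0 pending="
                           " overdue= unused= need 3. Last failure: None")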
src/allmydata/test/test_download.py
diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py
index 71a556b..40f0d62 100644
@@ ... @@
 from allmydata.test.common import ShouldFailMixin
 from allmydata.interfaces import NotEnoughSharesError, NoSharesError
 from allmydata.immutable.downloader.common import BadSegmentNumberError, \
-     BadCiphertextHashError, DownloadStopped
+     BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD
 from allmydata.immutable.downloader.status import DownloadStatus
+from allmydata.immutable.downloader.fetcher import SegmentFetcher
 from allmydata.codec import CRSDecoder
 from foolscap.eventual import fireEventually, flushEventualQueue
@@ ... @@ class DownloadTest(_Base, unittest.TestCase):
         # shares
         servers = []
         shares = sorted([s._shnum for s in self.n._cnode._node._shares])
-        self.failUnlessEqual(shares, [0,1,2])
+        self.failUnlessEqual(shares, [0,1,2,3])
         # break the RIBucketReader references
         for s in self.n._cnode._node._shares:
             s._rref.broken = True
@@ ... @@ class DownloadTest(_Base, unittest.TestCase):
             self.failUnlessEqual("".join(c.chunks), plaintext)
             shares = sorted([s._shnum for s in self.n._cnode._node._shares])
             # we should now be using more shares than we were before
-            self.failIfEqual(shares, [0,1,2])
+            self.failIfEqual(shares, [0,1,2,3])
         d.addCallback(_check_failover)
         return d
@@ ... @@ class DownloadTest(_Base, unittest.TestCase):
         def _con1_should_not_succeed(res):
             self.fail("the first read should not have succeeded")
         def _con1_failed(f):
-            self.failUnless(f.check(NotEnoughSharesError))
+            self.failUnless(f.check(NoSharesError))
             con2.producer.stopProducing()
             return d2
         d.addCallbacks(_con1_should_not_succeed, _con1_failed)
@@ ... @@ class DownloadTest(_Base, unittest.TestCase):
         def _con1_should_not_succeed(res):
             self.fail("the first read should not have succeeded")
         def _con1_failed(f):
-            self.failUnless(f.check(NotEnoughSharesError))
+            self.failUnless(f.check(NoSharesError))
             # we *don't* cancel the second one here: this exercises a
             # lost-progress bug from #1154. We just wait for it to
             # succeed.
@@ ... @@ class Corruption(_Base, unittest.TestCase):
             # All these tests result in a failed download.
             d.addCallback(self._corrupt_flip_all, imm_uri, i)
             d.addCallback(lambda ign:
-                          self.shouldFail(NotEnoughSharesError, which,
+                          self.shouldFail(NoSharesError, which,
                                           substring,
                                           _download, imm_uri))
             d.addCallback(lambda ign: self.restore_all_shares(self.shares))
@@ ... @@ class Status(unittest.TestCase):
         e2.update(1000, 2.0, 2.0)
         e2.finished(now+5)
         self.failUnlessEqual(ds.get_progress(), 1.0)
+
+class MyShare:
+    def __init__(self, shnum, peerid, rtt):
+        self._shnum = shnum
+        self._peerid = peerid
+        self._peerid_s = peerid
+        self._dyhb_rtt = rtt
+    def __repr__(self):
+        return "sh%d-on-%s" % (self._shnum, self._peerid)
+
+class MySegmentFetcher(SegmentFetcher):
+    def __init__(self, *args, **kwargs):
+        SegmentFetcher.__init__(self, *args, **kwargs)
+        self._test_start_shares = []
+    def _start_share(self, share, shnum):
+        self._test_start_shares.append(share)
+
+class FakeNode:
+    def __init__(self):
+        self.want_more = 0
+        self.failed = None
+        self.processed = None
+        self._si_prefix = "si_prefix"
+    def want_more_shares(self):
+        self.want_more += 1
+    def fetch_failed(self, fetcher, f):
+        self.failed = f
+    def process_blocks(self, segnum, blocks):
+        self.processed = (segnum, blocks)
+    def get_num_segments(self):
+        return 1, True
+
+class Selection(unittest.TestCase):
+    def test_no_shares(self):
+        node = FakeNode()
+        sf = SegmentFetcher(node, 0, 3, None)
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NoSharesError))
+        d.addCallback(_check2)
+        return d
+
+    def test_only_one_share(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(0, "peer-A", 0.0)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(node.failed, None)
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
+                              str(node.failed))
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_good_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_avoid_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we could satisfy the read entirely from the first server, but we'd
+        # prefer not to. Instead, we expect to only pull one share from the
+        # first server
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-A", 0.0),
+                  MyShare(2, "peer-A", 0.0),
+                  MyShare(3, "peer-B", 1.0),
+                  MyShare(4, "peer-C", 2.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[3], shares[4]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      3: "block-3",
+                                                      4: "block-4"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_late(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-A", 0.0),
+                  MyShare(2, "peer-A", 0.0),
+                  MyShare(3, "peer-A", 0.0),
+                  MyShare(4, "peer-A", 0.0),
+                  ]
+        sf.add_shares([])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            sf.add_shares(shares)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(node.want_more, 3)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_suffer_bad_diversity_early(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we satisfy the read entirely from the first server because we don't
+        # have any other choice.
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-A", 0.0),
+                  MyShare(2, "peer-A", 0.0),
+                  MyShare(3, "peer-A", 0.0),
+                  MyShare(4, "peer-A", 0.0),
+                  ]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 2)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[2]])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check2)
+        return d
+
+    def test_overdue(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
+        sf.add_shares(shares)
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, COMPLETE,
+                                           "block-%d" % sh._shnum)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {3: "block-3",
+                                                      4: "block-4",
+                                                      5: "block-5"}) )
+        d.addCallback(_check3)
+        return d
+
+    def test_overdue_fails(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
+        sf.add_shares(shares)
+        sf.no_more_shares()
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 0)
+            self.failUnlessEqual(sf._test_start_shares, shares[:3])
+            for sh in sf._test_start_shares:
+                sf._block_request_activity(sh, sh._shnum, OVERDUE)
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            self.failUnlessEqual(sf._test_start_shares, shares[:6])
+            for sh in sf._test_start_shares[3:]:
+                sf._block_request_activity(sh, sh._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            # we're still waiting
+            self.failUnlessEqual(node.processed, None)
+            self.failUnlessEqual(node.failed, None)
+            # now complete one of the overdue ones, and kill one of the other
+            # ones, leaving one hanging. This should trigger a failure, since
+            # we cannot succeed.
+            live = sf._test_start_shares[0]
+            die = sf._test_start_shares[1]
+            sf._block_request_activity(live, live._shnum, COMPLETE, "block")
+            sf._block_request_activity(die, die._shnum, DEAD)
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failUnless(node.failed)
+            self.failUnless(node.failed.check(NotEnoughSharesError))
+            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
+                              str(node.failed))
+        d.addCallback(_check4)
+        return d
+
+    def test_avoid_redundancy(self):
+        node = FakeNode()
+        sf = MySegmentFetcher(node, 0, 3, None)
+        # we could satisfy the read entirely from the first server, but we'd
+        # prefer not to. Instead, we expect to only pull one share from the
+        # first server
+        shares = [MyShare(0, "peer-A", 0.0),
+                  MyShare(1, "peer-B", 1.0),
+                  MyShare(0, "peer-C", 2.0), # this will be skipped
+                  MyShare(1, "peer-D", 3.0),
+                  MyShare(2, "peer-E", 4.0),
+                  ]
+        sf.add_shares(shares[:3])
+        d = flushEventualQueue()
+        def _check1(ign):
+            self.failUnlessEqual(node.want_more, 1)
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1]])
+            # allow sh1 to retire
+            sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
+            return flushEventualQueue()
+        d.addCallback(_check1)
+        def _check2(ign):
+            # and then feed in the remaining shares
+            sf.add_shares(shares[3:])
+            sf.no_more_shares()
+            return flushEventualQueue()
+        d.addCallback(_check2)
+        def _check3(ign):
+            self.failUnlessEqual(sf._test_start_shares,
+                                 [shares[0], shares[1], shares[4]])
+            sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
+            sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
+            return flushEventualQueue()
+        d.addCallback(_check3)
+        def _check4(ign):
+            self.failIfEqual(node.processed, None)
+            self.failUnlessEqual(node.processed, (0, {0: "block-0",
+                                                      1: "block-1",
+                                                      2: "block-2"}) )
+        d.addCallback(_check4)
+        return d
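Editorial note: the test_overdue and test_overdue_fails cases exercise the fetcher's request state machine, in which OVERDUE is the only intermediate state that may still resolve to COMPLETE (per the patch's comment, "OVERDUE is not terminal"). A compact check of those transitions, for illustration only; "PENDING" here names the implicit in-flight state rather than a constant the new fetcher still tracks:

TRANSITIONS = {
    "PENDING": {"OVERDUE", "COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM"},
    "OVERDUE": {"COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM"},
    # COMPLETE/CORRUPT/DEAD/BADSEGNUM are terminal: no outgoing edges
}

def is_valid_history(states):
    """True if each consecutive pair of states is a legal transition."""
    return all(nxt in TRANSITIONS.get(prev, set())
               for prev, nxt in zip(states, states[1:]))

assert is_valid_history(["PENDING", "OVERDUE", "COMPLETE"])     # test_overdue
assert not is_valid_history(["PENDING", "COMPLETE", "OVERDUE"]) # terminal ends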
src/allmydata/test/test_immutable.py
diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py
index 288332d..511a865 100644
@@ ... @@ class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase)
         def _after_download(unused=None):
             after_download_reads = self._count_reads()
             #print before_download_reads, after_download_reads
-            self.failIf(after_download_reads-before_download_reads > 36,
+            self.failIf(after_download_reads-before_download_reads > 41,
                         (after_download_reads, before_download_reads))
         d.addCallback(self._download_and_check_plaintext)
         d.addCallback(_after_download)
src/allmydata/test/test_web.py
diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py
index 3008046..f68e98d 100644
@@ ... @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi
         def _check_one_share(body):
             self.failIf("<html>" in body, body)
             body = " ".join(body.strip().split())
-            msg = ("NotEnoughSharesError: This indicates that some "
-                   "servers were unavailable, or that shares have been "
-                   "lost to server departure, hard drive failure, or disk "
-                   "corruption. You should perform a filecheck on "
-                   "this object to learn more. The full error message is:"
-                   " ran out of shares: %d complete, %d pending, 0 overdue,"
-                   " 0 unused, need 3. Last failure: None")
-            msg1 = msg % (1, 0)
-            msg2 = msg % (0, 1)
+            msgbase = ("NotEnoughSharesError: This indicates that some "
+                       "servers were unavailable, or that shares have been "
+                       "lost to server departure, hard drive failure, or disk "
+                       "corruption. You should perform a filecheck on "
+                       "this object to learn more. The full error message is:"
+                       )
+            msg1 = msgbase + (" ran out of shares:"
+                              " complete=sh0"
+                              " pending="
+                              " overdue= unused= need 3. Last failure: None")
+            msg2 = msgbase + (" ran out of shares:"
+                              " complete="
+                              " pending=Share(sh0-on-xgru5)"
+                              " overdue= unused= need 3. Last failure: None")
             self.failUnless(body == msg1 or body == msg2, body)
         d.addCallback(_check_one_share)