Ticket #1170: 1170-combo.diff

File 1170-combo.diff, 55.1 KB (added by warner at 2010-08-19T17:27:57Z)

patch to prefer share diversity, forget leftover data after each segment, and fix handling of numsegs
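
The headline change is in fetcher.py: the SegmentFetcher now caps how many outstanding block requests may target a single server, and only raises that cap when no other server can supply a share it still needs. As a rough orientation aid (not part of the patch), the sketch below shows that selection idea in isolation; the names and data structures are simplified placeholders for the patch's _shares / _active_share_map / _shares_from_server bookkeeping.

    # Sketch of the diversity-limited selection loop, assuming candidates are
    # (shnum, serverid) pairs already sorted by DYHB round-trip time.
    def select_requests(candidates, k, have_blocks):
        active = {}              # shnum -> serverid with a request in flight
        per_server = {}          # serverid -> number of requests in flight
        max_per_server = 1       # prefer one share per server; grow if stuck
        while len(set(have_blocks) | set(active)) < k:
            sent = want_diversity = False
            for shnum, serverid in candidates:
                if shnum in have_blocks or shnum in active:
                    continue     # already have (or asked for) this shnum
                if per_server.get(serverid, 0) >= max_per_server:
                    want_diversity = True
                    continue     # would lean too hard on one server
                active[shnum] = serverid
                per_server[serverid] = per_server.get(serverid, 0) + 1
                sent = True
                break
            if sent:
                continue
            if want_diversity:
                max_per_server += 1   # no diverse option left: relax the cap
                continue
            break                # nothing usable; caller must locate more shares
        return active

Starting the cap at 1 matches the patch's intent: when enough distinct servers answer the DYHB queries, each block of a segment is pulled from a different server, and the cap is loosened only when the alternative is stalling the download.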

  • src/allmydata/immutable/downloader/fetcher.py

    diff --git a/src/allmydata/immutable/downloader/fetcher.py b/src/allmydata/immutable/downloader/fetcher.py
    index e30ced8..83c9beb 100644
    a b from foolscap.api import eventually 
    44from allmydata.interfaces import NotEnoughSharesError, NoSharesError
    55from allmydata.util import log
    66from allmydata.util.dictutil import DictOfSets
    7 from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \
    8      BADSEGNUM, BadSegmentNumberError
     7from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \
     8     BadSegmentNumberError
    99
    1010class SegmentFetcher:
    1111    """I am responsible for acquiring blocks for a single segment. I will use
    class SegmentFetcher: 
    2626        self._node = node # _Node
    2727        self.segnum = segnum
    2828        self._k = k
    29         self._shares = {} # maps non-dead Share instance to a state, one of
    30                           # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
    31                           # State transition map is:
    32                           #  AVAILABLE -(send-read)-> PENDING
    33                           #  PENDING -(timer)-> OVERDUE
    34                           #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
    35                           #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
    36                           # If a share becomes DEAD, it is removed from the
    37                           # dict. If it becomes BADSEGNUM, the whole fetch is
    38                           # terminated.
     29        self._shares = [] # unused Share instances, sorted by "goodness"
     30                          # (RTT), then shnum. This is populated when DYHB
     31                          # responses arrive, or (for later segments) at
     32                          # startup. We remove shares from it when we call
     33                          # sh.get_block() on them.
     34        self._shares_from_server = DictOfSets() # maps serverid to set of
     35                                                # Shares on that server for
     36                                                # which we have outstanding
     37                                                # get_block() calls.
     38        self._max_shares_per_server = 1 # how many Shares we're allowed to
     39                                        # pull from each server. This starts
     40                                        # at 1 and grows if we don't have
     41                                        # sufficient diversity.
     42        self._active_share_map = {} # maps shnum to outstanding (and not
     43                                    # OVERDUE) Share that provides it.
     44        self._overdue_share_map = DictOfSets() # shares in the OVERDUE state
    3945        self._share_observers = {} # maps Share to EventStreamObserver for
    4046                                   # active ones
    41         self._shnums = DictOfSets() # maps shnum to the shares that provide it
    4247        self._blocks = {} # maps shnum to validated block data
    4348        self._no_more_shares = False
    44         self._bad_segnum = False
    4549        self._last_failure = None
    4650        self._running = True
    4751
    class SegmentFetcher: 
    5054                level=log.NOISY, umid="LWyqpg")
    5155        self._cancel_all_requests()
    5256        self._running = False
    53         self._shares.clear() # let GC work # ??? XXX
     57        # help GC ??? XXX
     58        del self._shares, self._shares_from_server, self._active_share_map
     59        del self._share_observers
    5460
    5561
    5662    # called by our parent _Node
    class SegmentFetcher: 
    5965        # called when ShareFinder locates a new share, and when a non-initial
    6066        # segment fetch is started and we already know about shares from the
    6167        # previous segment
    62         for s in shares:
    63             self._shares[s] = AVAILABLE
    64             self._shnums.add(s._shnum, s)
     68        self._shares.extend(shares)
     69        self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) )
    6570        eventually(self.loop)
    6671
    6772    def no_more_shares(self):
    class SegmentFetcher: 
    7176
    7277    # internal methods
    7378
    74     def _count_shnums(self, *states):
    75         """shnums for which at least one state is in the following list"""
    76         shnums = []
    77         for shnum,shares in self._shnums.iteritems():
    78             matches = [s for s in shares if self._shares.get(s) in states]
    79             if matches:
    80                 shnums.append(shnum)
    81         return len(shnums)
    82 
    8379    def loop(self):
    8480        try:
    8581            # if any exception occurs here, kill the download
    class SegmentFetcher: 
    9288        k = self._k
    9389        if not self._running:
    9490            return
    95         if self._bad_segnum:
     91        numsegs, authoritative = self._node.get_num_segments()
     92        if authoritative and self.segnum >= numsegs:
    9693            # oops, we were asking for a segment number beyond the end of the
    9794            # file. This is an error.
    9895            self.stop()
    class SegmentFetcher: 
    10299            self._node.fetch_failed(self, f)
    103100            return
    104101
     102        #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares
     103        # Should we send out more requests?
     104        while len(set(self._blocks.keys())
     105                  | set(self._active_share_map.keys())
     106                  ) < k:
     107            # we don't have data or active requests for enough shares. Are
     108            # there any unused shares we can start using?
     109            (sent_something, want_diversity) = self._find_and_use_share()
     110            if sent_something:
     111                # great. loop back around in case we need to send more.
     112                continue
     113            if want_diversity:
     114                # we could have sent something if we'd been allowed to pull
     115                # more shares per server. Increase the limit and try again.
     116                self._max_shares_per_server += 1
     117                log.msg("SegmentFetcher(%s) increasing diversity limit to %d"
     118                        % (self._node._si_prefix, self._max_shares_per_server),
     119                        level=log.NOISY, umid="xY2pBA")
     120                # Also ask for more shares, in the hopes of achieving better
     121                # diversity for the next segment.
     122                self._ask_for_more_shares()
     123                continue
     124            # we need more shares than the ones in self._shares to make
     125            # progress
     126            self._ask_for_more_shares()
     127            if self._no_more_shares:
     128                # But there are no more shares to be had. If we're going to
     129                # succeed, it will be with the shares we've already seen.
     130                # Will they be enough?
     131                if len(set(self._blocks.keys())
     132                       | set(self._active_share_map.keys())
     133                       | set(self._overdue_share_map.keys())
     134                       ) < k:
     135                    # nope. bail.
     136                    self._no_shares_error() # this calls self.stop()
     137                    return
     138                # our outstanding or overdue requests may yet work.
     139            # more shares may be coming. Wait until then.
     140            return
     141
    105142        # are we done?
    106         if self._count_shnums(COMPLETE) >= k:
     143        if len(set(self._blocks.keys())) >= k:
    107144            # yay!
    108145            self.stop()
    109146            self._node.process_blocks(self.segnum, self._blocks)
    110147            return
    111148
    112         # we may have exhausted everything
    113         if (self._no_more_shares and
    114             self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
    115             # no more new shares are coming, and the remaining hopeful shares
    116             # aren't going to be enough. boo!
    117 
    118             log.msg("share states: %r" % (self._shares,),
    119                     level=log.NOISY, umid="0ThykQ")
    120             if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0:
    121                 format = ("no shares (need %(k)d)."
    122                           " Last failure: %(last_failure)s")
    123                 args = { "k": k,
    124                          "last_failure": self._last_failure }
    125                 error = NoSharesError
    126             else:
    127                 format = ("ran out of shares: %(complete)d complete,"
    128                           " %(pending)d pending, %(overdue)d overdue,"
    129                           " %(unused)d unused, need %(k)d."
    130                           " Last failure: %(last_failure)s")
    131                 args = {"complete": self._count_shnums(COMPLETE),
    132                         "pending": self._count_shnums(PENDING),
    133                         "overdue": self._count_shnums(OVERDUE),
    134                         # 'unused' should be zero
    135                         "unused": self._count_shnums(AVAILABLE),
    136                         "k": k,
    137                         "last_failure": self._last_failure,
    138                         }
    139                 error = NotEnoughSharesError
    140             log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
    141             e = error(format % args)
    142             f = Failure(e)
    143             self.stop()
    144             self._node.fetch_failed(self, f)
    145             return
     149    def _no_shares_error(self):
     150        if not (self._shares or self._active_share_map or
     151                self._overdue_share_map or self._blocks):
     152            format = ("no shares (need %(k)d)."
     153                      " Last failure: %(last_failure)s")
     154            args = { "k": self._k,
     155                     "last_failure": self._last_failure }
     156            error = NoSharesError
     157        else:
     158            format = ("ran out of shares: complete=%(complete)s"
     159                      " pending=%(pending)s overdue=%(overdue)s"
     160                      " unused=%(unused)s need %(k)d."
     161                      " Last failure: %(last_failure)s")
     162            def join(shnums): return ",".join(["sh%d" % shnum
     163                                               for shnum in sorted(shnums)])
     164            pending_s = ",".join([str(sh)
     165                                  for sh in self._active_share_map.values()])
     166            overdue = set()
     167            for shares in self._overdue_share_map.values():
     168                overdue |= shares
     169            overdue_s = ",".join([str(sh) for sh in overdue])
     170            args = {"complete": join(self._blocks.keys()),
     171                    "pending": pending_s,
     172                    "overdue": overdue_s,
     173                    # 'unused' should be zero
     174                    "unused": ",".join([str(sh) for sh in self._shares]),
     175                    "k": self._k,
     176                    "last_failure": self._last_failure,
     177                    }
     178            error = NotEnoughSharesError
     179        log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
     180        e = error(format % args)
     181        f = Failure(e)
     182        self.stop()
     183        self._node.fetch_failed(self, f)
    146184
    147         # nope, not done. Are we "block-hungry" (i.e. do we want to send out
    148         # more read requests, or do we think we have enough in flight
    149         # already?)
    150         while self._count_shnums(PENDING, COMPLETE) < k:
    151             # we're hungry.. are there any unused shares?
    152             sent = self._send_new_request()
    153             if not sent:
    154                 break
    155 
    156         # ok, now are we "share-hungry" (i.e. do we have enough known shares
    157         # to make us happy, or should we ask the ShareFinder to get us more?)
    158         if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
    159             # we're hungry for more shares
    160             self._node.want_more_shares()
    161             # that will trigger the ShareFinder to keep looking
    162 
    163     def _find_one(self, shares, state):
    164         # TODO could choose fastest, or avoid servers already in use
    165         for s in shares:
    166             if self._shares[s] == state:
    167                 return s
    168         # can never get here, caller has assert in case of code bug
    169 
    170     def _send_new_request(self):
    171         # TODO: this is probably O(k^2), and we're called from a range(k)
    172         # loop, so O(k^3)
    173 
    174         # this first loop prefers sh0, then sh1, sh2, etc
    175         for shnum,shares in sorted(self._shnums.iteritems()):
    176             states = [self._shares[s] for s in shares]
    177             if COMPLETE in states or PENDING in states:
    178                 # don't send redundant requests
     185    def _find_and_use_share(self):
     186        sent_something = False
     187        want_diversity = False
     188        for sh in self._shares: # find one good share to fetch
     189            shnum = sh._shnum ; serverid = sh._peerid
     190            if shnum in self._blocks:
     191                continue # don't request data we already have
     192            if shnum in self._active_share_map:
     193                continue # don't send redundant requests
     194            sfs = self._shares_from_server
     195            if len(sfs.get(serverid,set())) >= self._max_shares_per_server:
     196                # don't pull too much from a single server
     197                want_diversity = True
    179198                continue
    180             if AVAILABLE not in states:
    181                 # no candidates for this shnum, move on
    182                 continue
    183             # here's a candidate. Send a request.
    184             s = self._find_one(shares, AVAILABLE)
    185             assert s
    186             self._shares[s] = PENDING
    187             self._share_observers[s] = o = s.get_block(self.segnum)
    188             o.subscribe(self._block_request_activity, share=s, shnum=shnum)
    189             # TODO: build up a list of candidates, then walk through the
    190             # list, sending requests to the most desireable servers,
    191             # re-checking our block-hunger each time. For non-initial segment
    192             # fetches, this would let us stick with faster servers.
    193             return True
    194         # nothing was sent: don't call us again until you have more shares to
    195         # work with, or one of the existing shares has been declared OVERDUE
    196         return False
     199            # ok, we can use this share
     200            self._shares.remove(sh)
     201            self._active_share_map[shnum] = sh
     202            self._shares_from_server.add(serverid, sh)
     203            self._start_share(sh, shnum)
     204            sent_something = True
     205            break
     206        return (sent_something, want_diversity)
     207
     208    def _start_share(self, share, shnum):
     209        self._share_observers[share] = o = share.get_block(self.segnum)
     210        o.subscribe(self._block_request_activity, share=share, shnum=shnum)
     211
     212    def _ask_for_more_shares(self):
     213        if not self._no_more_shares:
     214            self._node.want_more_shares()
     215            # that will trigger the ShareFinder to keep looking, and call our
     216            # add_shares() or no_more_shares() later.
    197217
    198218    def _cancel_all_requests(self):
    199219        for o in self._share_observers.values():
    class SegmentFetcher: 
    208228                " Share(sh%d-on-%s) -> %s" %
    209229                (self._node._si_prefix, shnum, share._peerid_s, state),
    210230                level=log.NOISY, umid="vilNWA")
    211         # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
     231        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share
     232        # from all our tracking lists.
    212233        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
    213234            self._share_observers.pop(share, None)
     235            self._shares_from_server.discard(shnum, share)
     236            if self._active_share_map.get(shnum) is share:
     237                del self._active_share_map[shnum]
     238            self._overdue_share_map.discard(shnum, share)
     239
    214240        if state is COMPLETE:
    215             # 'block' is fully validated
    216             self._shares[share] = COMPLETE
     241            # 'block' is fully validated and complete
    217242            self._blocks[shnum] = block
    218         elif state is OVERDUE:
    219             self._shares[share] = OVERDUE
     243
     244        if state is OVERDUE:
     245            # no longer active, but still might complete
     246            del self._active_share_map[shnum]
     247            self._overdue_share_map.add(shnum, share)
    220248            # OVERDUE is not terminal: it will eventually transition to
    221249            # COMPLETE, CORRUPT, or DEAD.
    222         elif state is CORRUPT:
    223             self._shares[share] = CORRUPT
    224         elif state is DEAD:
    225             del self._shares[share]
    226             self._shnums[shnum].remove(share)
    227             self._last_failure = f
    228         elif state is BADSEGNUM:
    229             self._shares[share] = BADSEGNUM # ???
    230             self._bad_segnum = True
    231         eventually(self.loop)
    232250
     251        if state is DEAD:
     252            self._last_failure = f
     253        if state is BADSEGNUM:
     254            # our main loop will ask the DownloadNode each time for the
     255            # number of segments, so we'll deal with this in the top of
     256            # _do_loop
     257            pass
    233258
     259        eventually(self.loop)
  • src/allmydata/immutable/downloader/finder.py

    diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py
    index 9adee99..f2843b1 100644
    a b class ShareFinder: 
    3535        self._storage_broker = storage_broker
    3636        self.share_consumer = self.node = node
    3737        self.max_outstanding_requests = max_outstanding_requests
    38 
    3938        self._hungry = False
    4039
    4140        self._commonshares = {} # shnum to CommonShare instance
    4241        self.undelivered_shares = []
     42        # TODO: change this to deliver all shares as soon as possible. And
     43        # figure out how self.hungry() should interact with the
     44        # SegmentFetcher caller.
    4345        self.pending_requests = set()
    4446        self.overdue_requests = set() # subset of pending_requests
    4547        self.overdue_timers = {}
    class ShareFinder: 
    5254                           si=self._si_prefix,
    5355                           level=log.NOISY, parent=logparent, umid="2xjj2A")
    5456
     57    def update_num_segments(self):
     58        (numsegs, authoritative) = self.node.get_num_segments()
     59        assert authoritative
     60        for cs in self._commonshares.values():
     61            cs.set_authoritative_num_segments(numsegs)
     62
    5563    def start_finding_servers(self):
    5664        # don't get servers until somebody uses us: creating the
    5765        # ImmutableFileNode should not cause work to happen yet. Test case is
    class ShareFinder: 
    146154        lp = self.log(format="sending DYHB to [%(peerid)s]",
    147155                      peerid=idlib.shortnodeid_b2a(peerid),
    148156                      level=log.NOISY, umid="Io7pyg")
    149         d_ev = self._download_status.add_dyhb_sent(peerid, now())
     157        time_sent = now()
     158        d_ev = self._download_status.add_dyhb_sent(peerid, time_sent)
    150159        # TODO: get the timer from a Server object, it knows best
    151160        self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
    152161                                                     self.overdue, req)
    153162        d = rref.callRemote("get_buckets", self._storage_index)
    154163        d.addBoth(incidentally, self._request_retired, req)
    155164        d.addCallbacks(self._got_response, self._got_error,
    156                        callbackArgs=(rref.version, peerid, req, d_ev, lp),
     165                       callbackArgs=(rref.version, peerid, req, d_ev,
     166                                     time_sent, lp),
    157167                       errbackArgs=(peerid, req, d_ev, lp))
    158168        d.addErrback(log.err, format="error in send_request",
    159169                     level=log.WEIRD, parent=lp, umid="rpdV0w")
    class ShareFinder: 
    172182        self.overdue_requests.add(req)
    173183        eventually(self.loop)
    174184
    175     def _got_response(self, buckets, server_version, peerid, req, d_ev, lp):
     185    def _got_response(self, buckets, server_version, peerid, req, d_ev,
     186                      time_sent, lp):
    176187        shnums = sorted([shnum for shnum in buckets])
    177         d_ev.finished(shnums, now())
     188        time_received = now()
     189        d_ev.finished(shnums, time_received)
     190        dyhb_rtt = time_received - time_sent
    178191        if buckets:
    179192            shnums_s = ",".join([str(shnum) for shnum in shnums])
    180193            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
    class ShareFinder: 
    184197            self.log(format="no shares from [%(peerid)s]",
    185198                     peerid=idlib.shortnodeid_b2a(peerid),
    186199                     level=log.NOISY, parent=lp, umid="U7d4JA")
    187         if self.node.num_segments is None:
    188             best_numsegs = self.node.guessed_num_segments
    189         else:
    190             best_numsegs = self.node.num_segments
    191200        for shnum, bucket in buckets.iteritems():
    192             self._create_share(best_numsegs, shnum, bucket, server_version,
    193                                peerid)
     201            self._create_share(shnum, bucket, server_version, peerid, dyhb_rtt)
    194202
    195     def _create_share(self, best_numsegs, shnum, bucket, server_version,
    196                       peerid):
     203    def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
    197204        if shnum in self._commonshares:
    198205            cs = self._commonshares[shnum]
    199206        else:
    200             cs = CommonShare(best_numsegs, self._si_prefix, shnum,
     207            numsegs, authoritative = self.node.get_num_segments()
     208            cs = CommonShare(numsegs, self._si_prefix, shnum,
    201209                             self._node_logparent)
     210            if authoritative:
     211                cs.set_authoritative_num_segments(numsegs)
    202212            # Share._get_satisfaction is responsible for updating
    203213            # CommonShare.set_numsegs after we know the UEB. Alternatives:
    204214            #  1: d = self.node.get_num_segments()
    class ShareFinder: 
    214224            #     Yuck.
    215225            self._commonshares[shnum] = cs
    216226        s = Share(bucket, server_version, self.verifycap, cs, self.node,
    217                   self._download_status, peerid, shnum,
     227                  self._download_status, peerid, shnum, dyhb_rtt,
    218228                  self._node_logparent)
    219229        self.undelivered_shares.append(s)
    220230
  • src/allmydata/immutable/downloader/node.py

    diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py
    index 4c92dd8..292e767 100644
    a b class DownloadNode: 
    103103        # as with CommonShare, our ciphertext_hash_tree is a stub until we
    104104        # get the real num_segments
    105105        self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
     106        self.ciphertext_hash_tree_leaves = self.guessed_num_segments
    106107
    107108    def __repr__(self):
    108109        return "Imm_Node(%s)" % (self._si_prefix,)
    class DownloadNode: 
    240241        # _parse_and_store_UEB, and we should abandon the download.
    241242        self.have_UEB = True
    242243
     244        # inform the ShareFinder about our correct number of segments. This
     245        # will update the block-hash-trees in all existing CommonShare
     246        # instances, and will populate new ones with the correct value.
     247        self._sharefinder.update_num_segments()
     248
    243249    def _parse_and_store_UEB(self, d):
    244250        # Note: the UEB contains needed_shares and total_shares. These are
    245251        # redundant and inferior (the filecap contains the authoritative
    class DownloadNode: 
    292298        # shares of file B. self.ciphertext_hash_tree was a guess before:
    293299        # this is where we create it for real.
    294300        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
     301        self.ciphertext_hash_tree_leaves = self.num_segments
    295302        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
    296303
    297304        self.share_hash_tree.set_hashes({0: d['share_root_hash']})
    class DownloadNode: 
    344351                                   % (hashnum, len(self.share_hash_tree)))
    345352        self.share_hash_tree.set_hashes(share_hashes)
    346353
     354    def get_desired_ciphertext_hashes(self, segnum):
     355        if segnum < self.ciphertext_hash_tree_leaves:
     356            return self.ciphertext_hash_tree.needed_hashes(segnum,
     357                                                           include_leaf=True)
     358        return []
    347359    def get_needed_ciphertext_hashes(self, segnum):
    348360        cht = self.ciphertext_hash_tree
    349361        return cht.needed_hashes(segnum, include_leaf=True)
     362
    350363    def process_ciphertext_hashes(self, hashes):
    351364        assert self.num_segments is not None
    352365        # this may raise BadHashError or NotEnoughHashesError
    class DownloadNode: 
    473486            self._active_segment.stop()
    474487            self._active_segment = None
    475488            self._start_new_segment()
     489
     490    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
     491    # SegmentFetcher to tell if it is still fetching a valid segnum.
     492    def get_num_segments(self):
     493        # returns (best_num_segments, authoritative)
     494        if self.num_segments is None:
     495            return (self.guessed_num_segments, False)
     496        return (self.num_segments, True)
  • src/allmydata/immutable/downloader/share.py

    diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py
    index f7ed4e8..0034845 100644
    a b class Share: 
    3333    # servers. A different backend would use a different class.
    3434
    3535    def __init__(self, rref, server_version, verifycap, commonshare, node,
    36                  download_status, peerid, shnum, logparent):
     36                 download_status, peerid, shnum, dyhb_rtt, logparent):
    3737        self._rref = rref
    3838        self._server_version = server_version
    3939        self._node = node # holds share_hash_tree and UEB
    class Share: 
    5151        self._storage_index = verifycap.storage_index
    5252        self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
    5353        self._shnum = shnum
     54        self._dyhb_rtt = dyhb_rtt
    5455        # self._alive becomes False upon fatal corruption or server error
    5556        self._alive = True
    5657        self._lp = log.msg(format="%(share)s created", share=repr(self),
    class Share: 
    278279            if not self._satisfy_UEB():
    279280                # can't check any hashes without the UEB
    280281                return False
     282            # the call to _satisfy_UEB() will immediately set the
     283            # authoritative num_segments in all our CommonShares. If we
     285            # guessed wrong, we might still be working on a bogus segnum
     285            # (beyond the real range). We catch this and signal BADSEGNUM
     286            # before invoking any further code that touches hashtrees.
    281287        self.actual_segment_size = self._node.segment_size # might be updated
    282288        assert self.actual_segment_size is not None
    283289
    284         # knowing the UEB means knowing num_segments. Despite the redundancy,
    285         # this is the best place to set this. CommonShare.set_numsegs will
    286         # ignore duplicate calls.
     290        # knowing the UEB means knowing num_segments
    287291        assert self._node.num_segments is not None
    288         cs = self._commonshare
    289         cs.set_numsegs(self._node.num_segments)
    290292
    291293        segnum, observers = self._active_segnum_and_observers()
    292294        # if segnum is None, we don't really need to do anything (we have no
    class Share: 
    304306                # can't check block_hash_tree without a root
    305307                return False
    306308
    307         if cs.need_block_hash_root():
     309        if self._commonshare.need_block_hash_root():
    308310            block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
    309             cs.set_block_hash_root(block_hash_root)
     311            self._commonshare.set_block_hash_root(block_hash_root)
    310312
    311313        if segnum is None:
    312314            return False # we don't want any particular segment right now
    class Share: 
    531533            for o in observers:
    532534                # goes to SegmentFetcher._block_request_activity
    533535                o.notify(state=COMPLETE, block=block)
     536            # now clear our received data, to dodge the #1170 spans.py
     537            # complexity bug
     538            self._received = DataSpans()
    534539        except (BadHashError, NotEnoughHashesError), e:
    535540            # rats, we have a corrupt block. Notify our clients that they
    536541            # need to look elsewhere, and advise the server. Unlike
    class Share: 
    586591        if self.actual_offsets or self._overrun_ok:
    587592            if not self._node.have_UEB:
    588593                self._desire_UEB(desire, o)
    589             # They might ask for a segment that doesn't look right.
    590             # _satisfy() will catch+reject bad segnums once we know the UEB
    591             # (and therefore segsize and numsegs), so we'll only fail this
    592             # test if we're still guessing. We want to avoid asking the
    593             # hashtrees for needed_hashes() for bad segnums. So don't enter
    594             # _desire_hashes or _desire_data unless the segnum looks
    595             # reasonable.
    596             if segnum < r["num_segments"]:
    597                 # XXX somehow we're getting here for sh5. we don't yet know
    598                 # the actual_segment_size, we're still working off the guess.
    599                 # the ciphertext_hash_tree has been corrected, but the
    600                 # commonshare._block_hash_tree is still in the guessed state.
    601                 self._desire_share_hashes(desire, o)
    602                 if segnum is not None:
    603                     self._desire_block_hashes(desire, o, segnum)
    604                     self._desire_data(desire, o, r, segnum, segsize)
    605             else:
    606                 log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)"
    607                         % (segnum, r["num_segments"]),
    608                         level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
     594            self._desire_share_hashes(desire, o)
     595            if segnum is not None:
     596                # They might be asking for a segment number that is beyond
     597                # what we guess the file contains, but _desire_block_hashes
     598                # and _desire_data will tolerate that.
     599                self._desire_block_hashes(desire, o, segnum)
     600                self._desire_data(desire, o, r, segnum, segsize)
    609601
    610602        log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
    611603                % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()))
    class Share: 
    678670        (want_it, need_it, gotta_gotta_have_it) = desire
    679671
    680672        # block hash chain
    681         for hashnum in self._commonshare.get_needed_block_hashes(segnum):
     673        for hashnum in self._commonshare.get_desired_block_hashes(segnum):
    682674            need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
    683675
    684676        # ciphertext hash chain
    685         for hashnum in self._node.get_needed_ciphertext_hashes(segnum):
     677        for hashnum in self._node.get_desired_ciphertext_hashes(segnum):
    686678            need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
    687679
    688680    def _desire_data(self, desire, o, r, segnum, segsize):
     681        if segnum > r["num_segments"]:
     682            # they're asking for a segment that's beyond what we think is the
     683            # end of the file. We won't get here if we've already learned the
     684            # real UEB: _get_satisfaction() will notice the out-of-bounds and
     685            # terminate the loop. So we must still be guessing, which means
     686            # that they might be correct in asking for such a large segnum.
     687            # But if they're right, then our segsize/segnum guess is
     688            # certainly wrong, which means we don't know what data blocks to
     689            # ask for yet. So don't bother adding anything. When the UEB
     690            # comes back and we learn the correct segsize/segnums, we'll
     691            # either reject the request or have enough information to proceed
     692            # normally. This costs one roundtrip.
     693            log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)"
     694                    % (segnum, r["num_segments"]),
     695                    level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
     696            return
    689697        (want_it, need_it, gotta_gotta_have_it) = desire
    690698        tail = (segnum == r["num_segments"]-1)
    691699        datastart = o["data"]
    class Share: 
    800808
    801809
    802810class CommonShare:
     811    # TODO: defer creation of the hashtree until somebody uses us. There will
     812    # be a lot of unused shares, and we shouldn't spend the memory on a large
     813    # hashtree unless necessary.
    803814    """I hold data that is common across all instances of a single share,
    804815    like sh2 on both servers A and B. This is just the block hash tree.
    805816    """
    806     def __init__(self, guessed_numsegs, si_prefix, shnum, logparent):
     817    def __init__(self, best_numsegs, si_prefix, shnum, logparent):
    807818        self.si_prefix = si_prefix
    808819        self.shnum = shnum
     820
    809821        # in the beginning, before we have the real UEB, we can only guess at
    810822        # the number of segments. But we want to ask for block hashes early.
    811823        # So if we're asked for which block hashes are needed before we know
    812824        # numsegs for sure, we return a guess.
    813         self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
    814         self._know_numsegs = False
     825        self._block_hash_tree = IncompleteHashTree(best_numsegs)
     826        self._block_hash_tree_is_authoritative = False
     827        self._block_hash_tree_leaves = best_numsegs
    815828        self._logparent = logparent
    816829
    817     def set_numsegs(self, numsegs):
    818         if self._know_numsegs:
    819             return
    820         self._block_hash_tree = IncompleteHashTree(numsegs)
    821         self._know_numsegs = True
     830    def __repr__(self):
     831        return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum)
     832
     833    def set_authoritative_num_segments(self, numsegs):
     834        if self._block_hash_tree_leaves != numsegs:
     835            self._block_hash_tree = IncompleteHashTree(numsegs)
     836            self._block_hash_tree_leaves = numsegs
     837        self._block_hash_tree_is_authoritative = True
    822838
    823839    def need_block_hash_root(self):
    824840        return bool(not self._block_hash_tree[0])
    825841
    826842    def set_block_hash_root(self, roothash):
    827         assert self._know_numsegs
     843        assert self._block_hash_tree_is_authoritative
    828844        self._block_hash_tree.set_hashes({0: roothash})
    829845
     846    def get_desired_block_hashes(self, segnum):
     847        if segnum < self._block_hash_tree_leaves:
     848            return self._block_hash_tree.needed_hashes(segnum,
     849                                                       include_leaf=True)
     850
     851        # the segnum might be out-of-bounds. Originally it was due to a race
     852        # between the receipt of the UEB on one share (from which we learn
     853        # the correct number of segments, update all hash trees to the right
     854        # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery
     855        # of a new Share to the SegmentFetcher while that BADSEGNUM was
     856        # queued (which sends out requests to the stale segnum, now larger
     857        # than the hash tree). I fixed that (by making SegmentFetcher.loop
     858        # check for a bad segnum at the start of each pass, instead of using
     859        # the queued BADSEGNUM or a flag it sets), but just in case this
     860        # still happens, I'm leaving the < in place. If it gets hit, there's
     861        # a potential lost-progress problem, but I'm pretty sure that it will
     862        # get cleared up on the following turn.
     863        return []
     864
    830865    def get_needed_block_hashes(self, segnum):
     866        assert self._block_hash_tree_is_authoritative
    831867        # XXX: include_leaf=True needs thought: how did the old downloader do
    832868        # it? I think it grabbed *all* block hashes and set them all at once.
    833869        # Since we want to fetch less data, we either need to fetch the leaf
    class CommonShare: 
    837873        return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
    838874
    839875    def process_block_hashes(self, block_hashes):
    840         assert self._know_numsegs
     876        assert self._block_hash_tree_is_authoritative
    841877        # this may raise BadHashError or NotEnoughHashesError
    842878        self._block_hash_tree.set_hashes(block_hashes)
    843879
    844880    def check_block(self, segnum, block):
    845         assert self._know_numsegs
     881        assert self._block_hash_tree_is_authoritative
    846882        h = hashutil.block_hash(block)
    847883        # this may raise BadHashError or NotEnoughHashesError
    848884        self._block_hash_tree.set_hashes(leaves={segnum: h})
     885
     886# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an
     887# auxiliary OVERDUE callback. Just make sure to get all the messages in the
     888# right order and on the right turns.
     889
     890# TODO: we're asking for too much data. We probably don't need
     891# include_leaf=True in the block hash tree or ciphertext hash tree.
     892
     893# TODO: we ask for ciphertext hash tree nodes from all shares (whenever
     894# _desire is called while we're missing those nodes), but we only consume it
     895# from the first response, leaving the rest of the data sitting in _received.
  • src/allmydata/test/test_cli.py

    diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py
    index db5bf5f..2453126 100644
    a b class Errors(GridTestMixin, CLITestMixin, unittest.TestCase): 
    23032303        # the download is abandoned as soon as it's clear that we won't get
    23042304        # enough shares. The one remaining share might be in either the
    23052305        # COMPLETE or the PENDING state.
    2306         in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3"
    2307         in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3"
     2306        in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3"
     2307        in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3"
    23082308
    23092309        d.addCallback(lambda ign: self.do_cli("get", self.uri_1share))
    23102310        def _check1((rc, out, err)):
  • src/allmydata/test/test_download.py

    diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py
    index 71a556b..80ce713 100644
    a b from allmydata.test.no_network import GridTestMixin 
    1515from allmydata.test.common import ShouldFailMixin
    1616from allmydata.interfaces import NotEnoughSharesError, NoSharesError
    1717from allmydata.immutable.downloader.common import BadSegmentNumberError, \
    18      BadCiphertextHashError, DownloadStopped
     18     BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD
    1919from allmydata.immutable.downloader.status import DownloadStatus
     20from allmydata.immutable.downloader.fetcher import SegmentFetcher
    2021from allmydata.codec import CRSDecoder
    2122from foolscap.eventual import fireEventually, flushEventualQueue
    2223
    class Status(unittest.TestCase): 
    12571258        e2.update(1000, 2.0, 2.0)
    12581259        e2.finished(now+5)
    12591260        self.failUnlessEqual(ds.get_progress(), 1.0)
     1261
     1262class MyShare:
     1263    def __init__(self, shnum, peerid, rtt):
     1264        self._shnum = shnum
     1265        self._peerid = peerid
     1266        self._peerid_s = peerid
     1267        self._dyhb_rtt = rtt
     1268    def __repr__(self):
     1269        return "sh%d-on-%s" % (self._shnum, self._peerid)
     1270
     1271class MySegmentFetcher(SegmentFetcher):
     1272    def __init__(self, *args, **kwargs):
     1273        SegmentFetcher.__init__(self, *args, **kwargs)
     1274        self._test_start_shares = []
     1275    def _start_share(self, share, shnum):
     1276        self._test_start_shares.append(share)
     1277
     1278class FakeNode:
     1279    def __init__(self):
     1280        self.want_more = 0
     1281        self.failed = None
     1282        self.processed = None
     1283        self._si_prefix = "si_prefix"
     1284    def want_more_shares(self):
     1285        self.want_more += 1
     1286    def fetch_failed(self, fetcher, f):
     1287        self.failed = f
     1288    def process_blocks(self, segnum, blocks):
     1289        self.processed = (segnum, blocks)
     1290    def get_num_segments(self):
     1291        return 1, True
     1292
     1293class Selection(unittest.TestCase):
     1294    def test_no_shares(self):
     1295        node = FakeNode()
     1296        sf = SegmentFetcher(node, 0, 3)
     1297        sf.add_shares([])
     1298        d = flushEventualQueue()
     1299        def _check1(ign):
     1300            self.failUnlessEqual(node.want_more, 1)
     1301            self.failUnlessEqual(node.failed, None)
     1302            sf.no_more_shares()
     1303            return flushEventualQueue()
     1304        d.addCallback(_check1)
     1305        def _check2(ign):
     1306            self.failUnless(node.failed)
     1307            self.failUnless(node.failed.check(NoSharesError))
     1308        d.addCallback(_check2)
     1309        return d
     1310
     1311    def test_only_one_share(self):
     1312        node = FakeNode()
     1313        sf = MySegmentFetcher(node, 0, 3)
     1314        shares = [MyShare(0, "peer-A", 0.0)]
     1315        sf.add_shares(shares)
     1316        d = flushEventualQueue()
     1317        def _check1(ign):
     1318            self.failUnlessEqual(node.want_more, 1)
     1319            self.failUnlessEqual(node.failed, None)
     1320            sf.no_more_shares()
     1321            return flushEventualQueue()
     1322        d.addCallback(_check1)
     1323        def _check2(ign):
     1324            self.failUnless(node.failed)
     1325            self.failUnless(node.failed.check(NotEnoughSharesError))
     1326            self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
     1327                              str(node.failed))
     1328        d.addCallback(_check2)
     1329        return d
     1330
     1331    def test_good_diversity_early(self):
     1332        node = FakeNode()
     1333        sf = MySegmentFetcher(node, 0, 3)
     1334        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
     1335        sf.add_shares(shares)
     1336        d = flushEventualQueue()
     1337        def _check1(ign):
     1338            self.failUnlessEqual(node.want_more, 0)
     1339            self.failUnlessEqual(sf._test_start_shares, shares[:3])
     1340            for sh in sf._test_start_shares:
     1341                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1342                                           "block-%d" % sh._shnum)
     1343            return flushEventualQueue()
     1344        d.addCallback(_check1)
     1345        def _check2(ign):
     1346            self.failIfEqual(node.processed, None)
     1347            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1348                                                      1: "block-1",
     1349                                                      2: "block-2"}) )
     1350        d.addCallback(_check2)
     1351        return d
     1352
     1353    def test_good_diversity_late(self):
     1354        node = FakeNode()
     1355        sf = MySegmentFetcher(node, 0, 3)
     1356        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
     1357        sf.add_shares([])
     1358        d = flushEventualQueue()
     1359        def _check1(ign):
     1360            self.failUnlessEqual(node.want_more, 1)
     1361            sf.add_shares(shares)
     1362            return flushEventualQueue()
     1363        d.addCallback(_check1)
     1364        def _check2(ign):
     1365            self.failUnlessEqual(sf._test_start_shares, shares[:3])
     1366            for sh in sf._test_start_shares:
     1367                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1368                                           "block-%d" % sh._shnum)
     1369            return flushEventualQueue()
     1370        d.addCallback(_check2)
     1371        def _check3(ign):
     1372            self.failIfEqual(node.processed, None)
     1373            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1374                                                      1: "block-1",
     1375                                                      2: "block-2"}) )
     1376        d.addCallback(_check3)
     1377        return d
     1378
     1379    def test_avoid_bad_diversity_late(self):
     1380        node = FakeNode()
     1381        sf = MySegmentFetcher(node, 0, 3)
     1382        # we could satisfy the read entirely from the first server, but we'd
     1383        # prefer not to. Instead, we expect to only pull one share from the
     1384        # first server
     1385        shares = [MyShare(0, "peer-A", 0.0),
     1386                  MyShare(1, "peer-A", 0.0),
     1387                  MyShare(2, "peer-A", 0.0),
     1388                  MyShare(3, "peer-B", 1.0),
     1389                  MyShare(4, "peer-C", 2.0),
     1390                  ]
     1391        sf.add_shares([])
     1392        d = flushEventualQueue()
     1393        def _check1(ign):
     1394            self.failUnlessEqual(node.want_more, 1)
     1395            sf.add_shares(shares)
     1396            return flushEventualQueue()
     1397        d.addCallback(_check1)
     1398        def _check2(ign):
     1399            self.failUnlessEqual(sf._test_start_shares,
     1400                                 [shares[0], shares[3], shares[4]])
     1401            for sh in sf._test_start_shares:
     1402                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1403                                           "block-%d" % sh._shnum)
     1404            return flushEventualQueue()
     1405        d.addCallback(_check2)
     1406        def _check3(ign):
     1407            self.failIfEqual(node.processed, None)
     1408            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1409                                                      3: "block-3",
     1410                                                      4: "block-4"}) )
     1411        d.addCallback(_check3)
     1412        return d
     1413
     1414    def test_suffer_bad_diversity_late(self):
     1415        node = FakeNode()
     1416        sf = MySegmentFetcher(node, 0, 3)
     1417        # we satisfy the read entirely from the first server because we don't
     1418        # have any other choice.
     1419        shares = [MyShare(0, "peer-A", 0.0),
     1420                  MyShare(1, "peer-A", 0.0),
     1421                  MyShare(2, "peer-A", 0.0),
     1422                  MyShare(3, "peer-A", 0.0),
     1423                  MyShare(4, "peer-A", 0.0),
     1424                  ]
     1425        sf.add_shares([])
     1426        d = flushEventualQueue()
     1427        def _check1(ign):
     1428            self.failUnlessEqual(node.want_more, 1)
     1429            sf.add_shares(shares)
     1430            return flushEventualQueue()
     1431        d.addCallback(_check1)
     1432        def _check2(ign):
     1433            self.failUnlessEqual(node.want_more, 3)
     1434            self.failUnlessEqual(sf._test_start_shares,
     1435                                 [shares[0], shares[1], shares[2]])
     1436            for sh in sf._test_start_shares:
     1437                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1438                                           "block-%d" % sh._shnum)
     1439            return flushEventualQueue()
     1440        d.addCallback(_check2)
     1441        def _check3(ign):
     1442            self.failIfEqual(node.processed, None)
     1443            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1444                                                      1: "block-1",
     1445                                                      2: "block-2"}) )
     1446        d.addCallback(_check3)
     1447        return d
     1448
     1449    def test_suffer_bad_diversity_early(self):
     1450        node = FakeNode()
     1451        sf = MySegmentFetcher(node, 0, 3)
     1452        # we satisfy the read entirely from the first server because we don't
     1453        # have any other choice.
     1454        shares = [MyShare(0, "peer-A", 0.0),
     1455                  MyShare(1, "peer-A", 0.0),
     1456                  MyShare(2, "peer-A", 0.0),
     1457                  MyShare(3, "peer-A", 0.0),
     1458                  MyShare(4, "peer-A", 0.0),
     1459                  ]
     1460        sf.add_shares(shares)
     1461        d = flushEventualQueue()
     1462        def _check1(ign):
     1463            self.failUnlessEqual(node.want_more, 2)
     1464            self.failUnlessEqual(sf._test_start_shares,
     1465                                 [shares[0], shares[1], shares[2]])
     1466            for sh in sf._test_start_shares:
     1467                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1468                                           "block-%d" % sh._shnum)
     1469            return flushEventualQueue()
     1470        d.addCallback(_check1)
     1471        def _check2(ign):
     1472            self.failIfEqual(node.processed, None)
     1473            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1474                                                      1: "block-1",
     1475                                                      2: "block-2"}) )
     1476        d.addCallback(_check2)
     1477        return d
     1478
     1479    def test_overdue(self):
     1480        node = FakeNode()
     1481        sf = MySegmentFetcher(node, 0, 3)
     1482        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
     1483        sf.add_shares(shares)
     1484        d = flushEventualQueue()
     1485        def _check1(ign):
     1486            self.failUnlessEqual(node.want_more, 0)
     1487            self.failUnlessEqual(sf._test_start_shares, shares[:3])
     1488            for sh in sf._test_start_shares:
     1489                sf._block_request_activity(sh, sh._shnum, OVERDUE)
     1490            return flushEventualQueue()
     1491        d.addCallback(_check1)
     1492        def _check2(ign):
     1493            self.failUnlessEqual(sf._test_start_shares, shares[:6])
     1494            for sh in sf._test_start_shares[3:]:
     1495                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1496                                           "block-%d" % sh._shnum)
     1497            return flushEventualQueue()
     1498        d.addCallback(_check2)
     1499        def _check3(ign):
     1500            self.failIfEqual(node.processed, None)
     1501            self.failUnlessEqual(node.processed, (0, {3: "block-3",
     1502                                                      4: "block-4",
     1503                                                      5: "block-5"}) )
     1504        d.addCallback(_check3)
     1505        return d
     1506
     1507    def test_overdue_fails(self):
     1508        node = FakeNode()
     1509        sf = MySegmentFetcher(node, 0, 3)
     1510        shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
     1511        sf.add_shares(shares)
     1512        sf.no_more_shares()
     1513        d = flushEventualQueue()
     1514        def _check1(ign):
     1515            self.failUnlessEqual(node.want_more, 0)
     1516            self.failUnlessEqual(sf._test_start_shares, shares[:3])
     1517            for sh in sf._test_start_shares:
     1518                sf._block_request_activity(sh, sh._shnum, OVERDUE)
     1519            return flushEventualQueue()
     1520        d.addCallback(_check1)
     1521        def _check2(ign):
     1522            self.failUnlessEqual(sf._test_start_shares, shares[:6])
     1523            for sh in sf._test_start_shares[3:]:
     1524                sf._block_request_activity(sh, sh._shnum, DEAD)
     1525            return flushEventualQueue()
     1526        d.addCallback(_check2)
     1527        def _check3(ign):
     1528            # we're still waiting
     1529            self.failUnlessEqual(node.processed, None)
     1530            self.failUnlessEqual(node.failed, None)
     1531            # now complete one of the overdue ones, and kill one of the other
     1532            # ones, leaving one hanging. This should trigger a failure, since
     1533            # we cannot succeed.
     1534            live = sf._test_start_shares[0]
     1535            die = sf._test_start_shares[1]
     1536            sf._block_request_activity(live, live._shnum, COMPLETE, "block")
     1537            sf._block_request_activity(die, die._shnum, DEAD)
     1538            return flushEventualQueue()
     1539        d.addCallback(_check3)
     1540        def _check4(ign):
     1541            self.failUnless(node.failed)
     1542            self.failUnless(node.failed.check(NotEnoughSharesError))
     1543            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
     1544                              str(node.failed))
     1545        d.addCallback(_check4)
     1546        return d
     1547
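    # (Editorial sketch, not part of the patch.) The failure asserted in
    # _check3/_check4 above depends on the fetcher noticing that success is
    # no longer possible: once no_more_shares() has been seen, the fetch must
    # fail as soon as the validated blocks plus every share still outstanding
    # (pending or overdue) or still unused cannot cover k distinct shnums. A
    # minimal illustration using the attributes documented on SegmentFetcher
    # (_blocks, _active_share_map, _overdue_share_map, _shares, _k,
    # _no_more_shares); the method name _cannot_succeed is hypothetical, and
    # _overdue_share_map is assumed to be keyed by shnum like
    # _active_share_map:
    #
    #   def _cannot_succeed(self):
    #       reachable = set(self._blocks)                     # shnums already validated
    #       reachable.update(self._active_share_map)          # pending requests
    #       reachable.update(self._overdue_share_map)         # overdue, may yet arrive
    #       reachable.update(s._shnum for s in self._shares)  # unused shares we could still try
    #       return self._no_more_shares and len(reachable) < self._k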
     1548    def test_avoid_redundancy(self):
     1549        node = FakeNode()
     1550        sf = MySegmentFetcher(node, 0, 3)
      1551        # sh0 is offered by two servers (peer-A and peer-C); we expect to
      1552        # request it from only one of them. The redundant copy on peer-C
      1553        # should be skipped, since we never fetch the same shnum twice.
     1554        shares = [MyShare(0, "peer-A", 0.0),
     1555                  MyShare(1, "peer-B", 1.0),
     1556                  MyShare(0, "peer-C", 2.0), # this will be skipped
     1557                  MyShare(1, "peer-D", 3.0),
     1558                  MyShare(2, "peer-E", 4.0),
     1559                  ]
     1560        sf.add_shares(shares[:3])
     1561        d = flushEventualQueue()
     1562        def _check1(ign):
     1563            self.failUnlessEqual(node.want_more, 1)
     1564            self.failUnlessEqual(sf._test_start_shares,
     1565                                 [shares[0], shares[1]])
     1566            # allow sh1 to retire
     1567            sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
     1568            return flushEventualQueue()
     1569        d.addCallback(_check1)
     1570        def _check2(ign):
     1571            # and then feed in the remaining shares
     1572            sf.add_shares(shares[3:])
     1573            sf.no_more_shares()
     1574            return flushEventualQueue()
     1575        d.addCallback(_check2)
     1576        def _check3(ign):
     1577            self.failUnlessEqual(sf._test_start_shares,
     1578                                 [shares[0], shares[1], shares[4]])
     1579            sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
     1580            sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
     1581            return flushEventualQueue()
     1582        d.addCallback(_check3)
     1583        def _check4(ign):
     1584            self.failIfEqual(node.processed, None)
     1585            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1586                                                      1: "block-1",
     1587                                                      2: "block-2"}) )
     1588        d.addCallback(_check4)
     1589        return d
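    The test_avoid_redundancy case above is the unit-level check for the
    share-selection policy this patch adds to SegmentFetcher: work through the
    unused shares in goodness (RTT) order, never start a shnum that is already
    active, take at most _max_shares_per_server shares from any one server,
    and stop once enough shnums are in flight to reach k. A rough sketch of
    such a selection pass, assuming the attributes documented on
    SegmentFetcher (_shares, _blocks, _active_share_map, _shares_from_server,
    _max_shares_per_server, _k); the method name _select_shares_to_start and
    the _peerid/_shnum attribute names (taken from the MyShare test double)
    are assumptions, and the cap-raising retry used when diversity alone
    cannot reach k is omitted:

    def _select_shares_to_start(self):
        started = []
        for sh in list(self._shares):  # already sorted by goodness (RTT), then shnum
            if sh._shnum in self._active_share_map:
                continue  # some other server is already fetching this shnum
            if len(self._shares_from_server.get(sh._peerid, set())) >= self._max_shares_per_server:
                continue  # respect the per-server diversity cap
            self._shares.remove(sh)
            self._active_share_map[sh._shnum] = sh
            self._shares_from_server.add(sh._peerid, sh)
            started.append(sh)  # caller would invoke sh.get_block(segnum) on these
            if len(self._blocks) + len(self._active_share_map) >= self._k:
                break  # enough distinct shnums are done or in flight
        return started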
  • src/allmydata/test/test_immutable.py

    diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py
    index 813c5be..17d8472 100644
    a b class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase) 
    5252        def _after_download(unused=None):
    5353            after_download_reads = self._count_reads()
    5454            #print before_download_reads, after_download_reads
    55             self.failIf(after_download_reads-before_download_reads > 27,
     55            self.failIf(after_download_reads-before_download_reads > 40,
    5656                        (after_download_reads, before_download_reads))
    5757        d.addCallback(self._download_and_check_plaintext)
    5858        d.addCallback(_after_download)
    class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase) 
    7070        def _after_download(unused=None):
    7171            after_download_reads = self._count_reads()
    7272            #print before_download_reads, after_download_reads
    73             self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads))
     73            self.failIf(after_download_reads-before_download_reads > 37, (after_download_reads, before_download_reads))
    7474        d.addCallback(self._download_and_check_plaintext)
    7575        d.addCallback(_after_download)
    7676        return d
  • src/allmydata/test/test_web.py

    diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py
    index 78b9902..c05db3b 100644
    a b class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi 
    42394239        def _check_one_share(body):
    42404240            self.failIf("<html>" in body, body)
    42414241            body = " ".join(body.strip().split())
    4242             msg = ("NotEnoughSharesError: This indicates that some "
    4243                    "servers were unavailable, or that shares have been "
    4244                    "lost to server departure, hard drive failure, or disk "
    4245                    "corruption. You should perform a filecheck on "
    4246                    "this object to learn more. The full error message is:"
    4247                    " ran out of shares: %d complete, %d pending, 0 overdue,"
    4248                    " 0 unused, need 3. Last failure: None")
    4249             msg1 = msg % (1, 0)
    4250             msg2 = msg % (0, 1)
     4242            msgbase = ("NotEnoughSharesError: This indicates that some "
     4243                       "servers were unavailable, or that shares have been "
     4244                       "lost to server departure, hard drive failure, or disk "
     4245                       "corruption. You should perform a filecheck on "
     4246                       "this object to learn more. The full error message is:"
     4247                       )
     4248            msg1 = msgbase + (" ran out of shares:"
     4249                              " complete=sh0"
     4250                              " pending="
     4251                              " overdue= unused= need 3. Last failure: None")
     4252            msg2 = msgbase + (" ran out of shares:"
     4253                              " complete="
     4254                              " pending=Share(sh0-on-xgru5)"
     4255                              " overdue= unused= need 3. Last failure: None")
    42514256            self.failUnless(body == msg1 or body == msg2, body)
    42524257        d.addCallback(_check_one_share)
    42534258
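# (Editorial sketch, not part of the patch.) The expected bodies above pin
# down the new failure-message layout: "ran out of shares:" followed by
# complete=/pending=/overdue=/unused= lists and "need K. Last failure: ...".
# A rough illustration of assembling such a string; the function name and the
# comma-separated formatting of multiple entries are assumptions, only the
# field layout comes from the expected strings in these tests.
def format_ran_out_of_shares(complete_shnums, pending_shares, overdue_shares,
                             unused_shares, k, last_failure):
    return ("ran out of shares:"
            " complete=%s pending=%s overdue=%s unused=%s"
            " need %d. Last failure: %s"
            % (",".join("sh%d" % shnum for shnum in sorted(complete_shnums)),
               ",".join(str(sh) for sh in pending_shares),
               ",".join(str(sh) for sh in overdue_shares),
               ",".join(str(sh) for sh in unused_shares),
               k, last_failure))

# e.g. format_ran_out_of_shares({0}, [], [], [], 3, None) yields
# "ran out of shares: complete=sh0 pending= overdue= unused= need 3. Last failure: None",
# which matches msg1 above.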