Ticket #1170: 1170-p2.diff

File 1170-p2.diff, 66.0 KB (added by warner at 2010-08-31T19:56:34Z)

for review: use diversity-seeking share-selection algorithm, improve logging

  • src/allmydata/immutable/downloader/fetcher.py

    diff --git a/src/allmydata/immutable/downloader/fetcher.py b/src/allmydata/immutable/downloader/fetcher.py
    index e30ced8..e78d37e 100644
    a b from foolscap.api import eventually 
    44from allmydata.interfaces import NotEnoughSharesError, NoSharesError
    55from allmydata.util import log
    66from allmydata.util.dictutil import DictOfSets
    7 from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \
    8      BADSEGNUM, BadSegmentNumberError
     7from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \
     8     BadSegmentNumberError
    99
    1010class SegmentFetcher:
    1111    """I am responsible for acquiring blocks for a single segment. I will use
    class SegmentFetcher: 
    2222    will shut down and do no further work. My parent can also call my stop()
    2323    method to have me shut down early."""
    2424
    25     def __init__(self, node, segnum, k):
     25    def __init__(self, node, segnum, k, logparent):
    2626        self._node = node # _Node
    2727        self.segnum = segnum
    2828        self._k = k
    29         self._shares = {} # maps non-dead Share instance to a state, one of
    30                           # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
    31                           # State transition map is:
    32                           #  AVAILABLE -(send-read)-> PENDING
    33                           #  PENDING -(timer)-> OVERDUE
    34                           #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
    35                           #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
    36                           # If a share becomes DEAD, it is removed from the
    37                           # dict. If it becomes BADSEGNUM, the whole fetch is
    38                           # terminated.
     29        self._shares = [] # unused Share instances, sorted by "goodness"
     30                          # (RTT), then shnum. This is populated when DYHB
     31                          # responses arrive, or (for later segments) at
     32                          # startup. We remove shares from it when we call
     33                          # sh.get_block() on them.
     34        self._shares_from_server = DictOfSets() # maps serverid to set of
     35                                                # Shares on that server for
     36                                                # which we have outstanding
     37                                                # get_block() calls.
     38        self._max_shares_per_server = 1 # how many Shares we're allowed to
     39                                        # pull from each server. This starts
     40                                        # at 1 and grows if we don't have
     41                                        # sufficient diversity.
     42        self._active_share_map = {} # maps shnum to outstanding (and not
     43                                    # OVERDUE) Share that provides it.
     44        self._overdue_share_map = DictOfSets() # shares in the OVERDUE state
     45        self._lp = logparent
    3946        self._share_observers = {} # maps Share to EventStreamObserver for
    4047                                   # active ones
    41         self._shnums = DictOfSets() # maps shnum to the shares that provide it
    4248        self._blocks = {} # maps shnum to validated block data
    4349        self._no_more_shares = False
    44         self._bad_segnum = False
    4550        self._last_failure = None
    4651        self._running = True
    4752
    4853    def stop(self):
    4954        log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix,
    50                 level=log.NOISY, umid="LWyqpg")
     55                level=log.NOISY, parent=self._lp, umid="LWyqpg")
    5156        self._cancel_all_requests()
    5257        self._running = False
    53         self._shares.clear() # let GC work # ??? XXX
     58        # help GC ??? XXX
     59        del self._shares, self._shares_from_server, self._active_share_map
     60        del self._share_observers
    5461
    5562
    5663    # called by our parent _Node
    class SegmentFetcher: 
    5966        # called when ShareFinder locates a new share, and when a non-initial
    6067        # segment fetch is started and we already know about shares from the
    6168        # previous segment
    62         for s in shares:
    63             self._shares[s] = AVAILABLE
    64             self._shnums.add(s._shnum, s)
     69        self._shares.extend(shares)
     70        self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) )
    6571        eventually(self.loop)
    6672
    6773    def no_more_shares(self):
    class SegmentFetcher: 
    7177
    7278    # internal methods
    7379
    74     def _count_shnums(self, *states):
    75         """shnums for which at least one state is in the following list"""
    76         shnums = []
    77         for shnum,shares in self._shnums.iteritems():
    78             matches = [s for s in shares if self._shares.get(s) in states]
    79             if matches:
    80                 shnums.append(shnum)
    81         return len(shnums)
    82 
    8380    def loop(self):
    8481        try:
    8582            # if any exception occurs here, kill the download
    class SegmentFetcher: 
    9289        k = self._k
    9390        if not self._running:
    9491            return
    95         if self._bad_segnum:
     92        numsegs, authoritative = self._node.get_num_segments()
     93        if authoritative and self.segnum >= numsegs:
    9694            # oops, we were asking for a segment number beyond the end of the
    9795            # file. This is an error.
    9896            self.stop()
    class SegmentFetcher: 
    102100            self._node.fetch_failed(self, f)
    103101            return
    104102
     103        #print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares
      104        # Should we send out more requests?
     105        while len(set(self._blocks.keys())
     106                  | set(self._active_share_map.keys())
     107                  ) < k:
     108            # we don't have data or active requests for enough shares. Are
     109            # there any unused shares we can start using?
     110            (sent_something, want_more_diversity) = self._find_and_use_share()
     111            if sent_something:
     112                # great. loop back around in case we need to send more.
     113                continue
     114            if want_more_diversity:
     115                # we could have sent something if we'd been allowed to pull
     116                # more shares per server. Increase the limit and try again.
     117                self._max_shares_per_server += 1
     118                log.msg("SegmentFetcher(%s) increasing diversity limit to %d"
     119                        % (self._node._si_prefix, self._max_shares_per_server),
     120                        level=log.NOISY, umid="xY2pBA")
     121                # Also ask for more shares, in the hopes of achieving better
     122                # diversity for the next segment.
     123                self._ask_for_more_shares()
     124                continue
     125            # we need more shares than the ones in self._shares to make
     126            # progress
     127            self._ask_for_more_shares()
     128            if self._no_more_shares:
     129                # But there are no more shares to be had. If we're going to
     130                # succeed, it will be with the shares we've already seen.
     131                # Will they be enough?
     132                if len(set(self._blocks.keys())
     133                       | set(self._active_share_map.keys())
     134                       | set(self._overdue_share_map.keys())
     135                       ) < k:
     136                    # nope. bail.
     137                    self._no_shares_error() # this calls self.stop()
     138                    return
     139                # our outstanding or overdue requests may yet work.
     140            # more shares may be coming. Wait until then.
     141            return
     142
    105143        # are we done?
    106         if self._count_shnums(COMPLETE) >= k:
     144        if len(set(self._blocks.keys())) >= k:
    107145            # yay!
    108146            self.stop()
    109147            self._node.process_blocks(self.segnum, self._blocks)
    110148            return
    111149
    112         # we may have exhausted everything
    113         if (self._no_more_shares and
    114             self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
    115             # no more new shares are coming, and the remaining hopeful shares
    116             # aren't going to be enough. boo!
    117 
    118             log.msg("share states: %r" % (self._shares,),
    119                     level=log.NOISY, umid="0ThykQ")
    120             if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0:
    121                 format = ("no shares (need %(k)d)."
    122                           " Last failure: %(last_failure)s")
    123                 args = { "k": k,
    124                          "last_failure": self._last_failure }
    125                 error = NoSharesError
    126             else:
    127                 format = ("ran out of shares: %(complete)d complete,"
    128                           " %(pending)d pending, %(overdue)d overdue,"
    129                           " %(unused)d unused, need %(k)d."
    130                           " Last failure: %(last_failure)s")
    131                 args = {"complete": self._count_shnums(COMPLETE),
    132                         "pending": self._count_shnums(PENDING),
    133                         "overdue": self._count_shnums(OVERDUE),
    134                         # 'unused' should be zero
    135                         "unused": self._count_shnums(AVAILABLE),
    136                         "k": k,
    137                         "last_failure": self._last_failure,
    138                         }
    139                 error = NotEnoughSharesError
    140             log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
    141             e = error(format % args)
    142             f = Failure(e)
    143             self.stop()
    144             self._node.fetch_failed(self, f)
    145             return
     150    def _no_shares_error(self):
     151        if not (self._shares or self._active_share_map or
     152                self._overdue_share_map or self._blocks):
     153            format = ("no shares (need %(k)d)."
     154                      " Last failure: %(last_failure)s")
     155            args = { "k": self._k,
     156                     "last_failure": self._last_failure }
     157            error = NoSharesError
     158        else:
     159            format = ("ran out of shares: complete=%(complete)s"
     160                      " pending=%(pending)s overdue=%(overdue)s"
     161                      " unused=%(unused)s need %(k)d."
     162                      " Last failure: %(last_failure)s")
     163            def join(shnums): return ",".join(["sh%d" % shnum
     164                                               for shnum in sorted(shnums)])
     165            pending_s = ",".join([str(sh)
     166                                  for sh in self._active_share_map.values()])
     167            overdue = set()
     168            for shares in self._overdue_share_map.values():
     169                overdue |= shares
     170            overdue_s = ",".join([str(sh) for sh in overdue])
     171            args = {"complete": join(self._blocks.keys()),
     172                    "pending": pending_s,
     173                    "overdue": overdue_s,
     174                    # 'unused' should be zero
     175                    "unused": ",".join([str(sh) for sh in self._shares]),
     176                    "k": self._k,
     177                    "last_failure": self._last_failure,
     178                    }
     179            error = NotEnoughSharesError
     180        log.msg(format=format,
     181                level=log.UNUSUAL, parent=self._lp, umid="1DsnTg",
     182                **args)
     183        e = error(format % args)
     184        f = Failure(e)
     185        self.stop()
     186        self._node.fetch_failed(self, f)
    146187
    147         # nope, not done. Are we "block-hungry" (i.e. do we want to send out
    148         # more read requests, or do we think we have enough in flight
    149         # already?)
    150         while self._count_shnums(PENDING, COMPLETE) < k:
    151             # we're hungry.. are there any unused shares?
    152             sent = self._send_new_request()
    153             if not sent:
    154                 break
    155 
    156         # ok, now are we "share-hungry" (i.e. do we have enough known shares
    157         # to make us happy, or should we ask the ShareFinder to get us more?)
    158         if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
    159             # we're hungry for more shares
    160             self._node.want_more_shares()
    161             # that will trigger the ShareFinder to keep looking
    162 
    163     def _find_one(self, shares, state):
    164         # TODO could choose fastest, or avoid servers already in use
    165         for s in shares:
    166             if self._shares[s] == state:
    167                 return s
    168         # can never get here, caller has assert in case of code bug
    169 
    170     def _send_new_request(self):
    171         # TODO: this is probably O(k^2), and we're called from a range(k)
    172         # loop, so O(k^3)
    173 
    174         # this first loop prefers sh0, then sh1, sh2, etc
    175         for shnum,shares in sorted(self._shnums.iteritems()):
    176             states = [self._shares[s] for s in shares]
    177             if COMPLETE in states or PENDING in states:
    178                 # don't send redundant requests
     188    def _find_and_use_share(self):
     189        sent_something = False
     190        want_more_diversity = False
     191        for sh in self._shares: # find one good share to fetch
     192            shnum = sh._shnum ; serverid = sh._peerid
     193            if shnum in self._blocks:
     194                continue # don't request data we already have
     195            if shnum in self._active_share_map:
     196                # note: OVERDUE shares are removed from _active_share_map
     197                # and added to _overdue_share_map instead.
     198                continue # don't send redundant requests
     199            sfs = self._shares_from_server
     200            if len(sfs.get(serverid,set())) >= self._max_shares_per_server:
     201                # don't pull too much from a single server
     202                want_more_diversity = True
    179203                continue
    180             if AVAILABLE not in states:
    181                 # no candidates for this shnum, move on
    182                 continue
    183             # here's a candidate. Send a request.
    184             s = self._find_one(shares, AVAILABLE)
    185             assert s
    186             self._shares[s] = PENDING
    187             self._share_observers[s] = o = s.get_block(self.segnum)
    188             o.subscribe(self._block_request_activity, share=s, shnum=shnum)
    189             # TODO: build up a list of candidates, then walk through the
    190             # list, sending requests to the most desireable servers,
    191             # re-checking our block-hunger each time. For non-initial segment
    192             # fetches, this would let us stick with faster servers.
    193             return True
    194         # nothing was sent: don't call us again until you have more shares to
    195         # work with, or one of the existing shares has been declared OVERDUE
    196         return False
     204            # ok, we can use this share
     205            self._shares.remove(sh)
     206            self._active_share_map[shnum] = sh
     207            self._shares_from_server.add(serverid, sh)
     208            self._start_share(sh, shnum)
     209            sent_something = True
     210            break
     211        return (sent_something, want_more_diversity)
     212
     213    def _start_share(self, share, shnum):
     214        self._share_observers[share] = o = share.get_block(self.segnum)
     215        o.subscribe(self._block_request_activity, share=share, shnum=shnum)
     216
     217    def _ask_for_more_shares(self):
     218        if not self._no_more_shares:
     219            self._node.want_more_shares()
     220            # that will trigger the ShareFinder to keep looking, and call our
     221            # add_shares() or no_more_shares() later.
    197222
    198223    def _cancel_all_requests(self):
    199224        for o in self._share_observers.values():
    class SegmentFetcher: 
    207232        log.msg("SegmentFetcher(%s)._block_request_activity:"
    208233                " Share(sh%d-on-%s) -> %s" %
    209234                (self._node._si_prefix, shnum, share._peerid_s, state),
    210                 level=log.NOISY, umid="vilNWA")
    211         # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
     235                level=log.NOISY, parent=self._lp, umid="vilNWA")
     236        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share
     237        # from all our tracking lists.
    212238        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
    213239            self._share_observers.pop(share, None)
     240            self._shares_from_server.discard(shnum, share)
     241            if self._active_share_map.get(shnum) is share:
     242                del self._active_share_map[shnum]
     243            self._overdue_share_map.discard(shnum, share)
     244
    214245        if state is COMPLETE:
    215             # 'block' is fully validated
    216             self._shares[share] = COMPLETE
     246            # 'block' is fully validated and complete
    217247            self._blocks[shnum] = block
    218         elif state is OVERDUE:
    219             self._shares[share] = OVERDUE
     248
     249        if state is OVERDUE:
     250            # no longer active, but still might complete
     251            del self._active_share_map[shnum]
     252            self._overdue_share_map.add(shnum, share)
    220253            # OVERDUE is not terminal: it will eventually transition to
    221254            # COMPLETE, CORRUPT, or DEAD.
    222         elif state is CORRUPT:
    223             self._shares[share] = CORRUPT
    224         elif state is DEAD:
    225             del self._shares[share]
    226             self._shnums[shnum].remove(share)
    227             self._last_failure = f
    228         elif state is BADSEGNUM:
    229             self._shares[share] = BADSEGNUM # ???
    230             self._bad_segnum = True
    231         eventually(self.loop)
    232255
     256        if state is DEAD:
     257            self._last_failure = f
     258        if state is BADSEGNUM:
     259            # our main loop will ask the DownloadNode each time for the
     260            # number of segments, so we'll deal with this in the top of
     261            # _do_loop
     262            pass
    233263
     264        eventually(self.loop)
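
The heart of the fetcher.py change is the loop in _find_and_use_share(): shares are kept sorted by (DYHB RTT, shnum), and the fetcher never pulls more than _max_shares_per_server blocks from any one server unless it has no other way to make progress. Below is a minimal standalone sketch of that policy; Share and select_shares are stand-in names, and the real SegmentFetcher also tracks OVERDUE requests and asks the ShareFinder for more shares before relaxing the limit.

    # illustrative sketch, not part of the patch
    from collections import namedtuple

    Share = namedtuple("Share", ["shnum", "serverid", "rtt"])

    def select_shares(shares, k):
        """Pick up to k shares, preferring low RTT and server diversity."""
        unused = sorted(shares, key=lambda s: (s.rtt, s.shnum))
        per_server = {}      # serverid -> number of shares already chosen
        max_per_server = 1   # grows only when diversity runs out
        chosen = {}          # shnum -> Share
        while len(chosen) < k:
            blocked_by_cap = False
            for s in unused:
                if s.shnum in chosen:
                    continue  # already have a request out for this shnum
                if per_server.get(s.serverid, 0) >= max_per_server:
                    blocked_by_cap = True
                    continue  # would over-concentrate on one server
                chosen[s.shnum] = s
                per_server[s.serverid] = per_server.get(s.serverid, 0) + 1
                break
            else:
                if blocked_by_cap:
                    max_per_server += 1  # relax the limit, then try again
                    continue
                break  # nothing usable left
        return list(chosen.values())

    shares = [Share(0, "A", 0.1), Share(1, "A", 0.1), Share(2, "B", 0.3)]
    print([(s.shnum, s.serverid) for s in select_shares(shares, 3)])
    # sh0 and sh2 come from distinct servers; sh1 is only taken from
    # server A after the per-server limit has been raised to 2

Raising the per-server cap only on demand is what makes the selection diversity-seeking: a fast server holding many shares supplies one block first, and is reused only once no other server can provide a missing shnum.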
  • src/allmydata/immutable/downloader/finder.py

    diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py
    index 9adee99..fa6204c 100644
    a b class ShareFinder: 
    3535        self._storage_broker = storage_broker
    3636        self.share_consumer = self.node = node
    3737        self.max_outstanding_requests = max_outstanding_requests
    38 
    3938        self._hungry = False
    4039
    4140        self._commonshares = {} # shnum to CommonShare instance
    42         self.undelivered_shares = []
    4341        self.pending_requests = set()
    4442        self.overdue_requests = set() # subset of pending_requests
    4543        self.overdue_timers = {}
    class ShareFinder: 
    5250                           si=self._si_prefix,
    5351                           level=log.NOISY, parent=logparent, umid="2xjj2A")
    5452
     53    def update_num_segments(self):
     54        (numsegs, authoritative) = self.node.get_num_segments()
     55        assert authoritative
     56        for cs in self._commonshares.values():
     57            cs.set_authoritative_num_segments(numsegs)
     58
    5559    def start_finding_servers(self):
    5660        # don't get servers until somebody uses us: creating the
    5761        # ImmutableFileNode should not cause work to happen yet. Test case is
    class ShareFinder: 
    8387
    8488    # internal methods
    8589    def loop(self):
    86         undelivered_s = ",".join(["sh%d@%s" %
    87                                   (s._shnum, idlib.shortnodeid_b2a(s._peerid))
    88                                   for s in self.undelivered_shares])
    8990        pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
    9091                              for rt in self.pending_requests]) # sort?
    9192        self.log(format="ShareFinder loop: running=%(running)s"
    92                  " hungry=%(hungry)s, undelivered=%(undelivered)s,"
    93                  " pending=%(pending)s",
    94                  running=self.running, hungry=self._hungry,
    95                  undelivered=undelivered_s, pending=pending_s,
     93                 " hungry=%(hungry)s, pending=%(pending)s",
     94                 running=self.running, hungry=self._hungry, pending=pending_s,
    9695                 level=log.NOISY, umid="kRtS4Q")
    9796        if not self.running:
    9897            return
    9998        if not self._hungry:
    10099            return
    101         if self.undelivered_shares:
    102             sh = self.undelivered_shares.pop(0)
    103             # they will call hungry() again if they want more
    104             self._hungry = False
    105             self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)",
    106                      shnum=sh._shnum, peerid=sh._peerid_s,
    107                      level=log.NOISY, umid="2n1qQw")
    108             eventually(self.share_consumer.got_shares, [sh])
    109             return
    110100
    111101        non_overdue = self.pending_requests - self.overdue_requests
    112102        if len(non_overdue) >= self.max_outstanding_requests:
    class ShareFinder: 
    146136        lp = self.log(format="sending DYHB to [%(peerid)s]",
    147137                      peerid=idlib.shortnodeid_b2a(peerid),
    148138                      level=log.NOISY, umid="Io7pyg")
    149         d_ev = self._download_status.add_dyhb_sent(peerid, now())
     139        time_sent = now()
     140        d_ev = self._download_status.add_dyhb_sent(peerid, time_sent)
    150141        # TODO: get the timer from a Server object, it knows best
    151142        self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
    152143                                                     self.overdue, req)
    153144        d = rref.callRemote("get_buckets", self._storage_index)
    154145        d.addBoth(incidentally, self._request_retired, req)
    155146        d.addCallbacks(self._got_response, self._got_error,
    156                        callbackArgs=(rref.version, peerid, req, d_ev, lp),
     147                       callbackArgs=(rref.version, peerid, req, d_ev,
     148                                     time_sent, lp),
    157149                       errbackArgs=(peerid, req, d_ev, lp))
    158150        d.addErrback(log.err, format="error in send_request",
    159151                     level=log.WEIRD, parent=lp, umid="rpdV0w")
    class ShareFinder: 
    172164        self.overdue_requests.add(req)
    173165        eventually(self.loop)
    174166
    175     def _got_response(self, buckets, server_version, peerid, req, d_ev, lp):
     167    def _got_response(self, buckets, server_version, peerid, req, d_ev,
     168                      time_sent, lp):
    176169        shnums = sorted([shnum for shnum in buckets])
    177         d_ev.finished(shnums, now())
    178         if buckets:
    179             shnums_s = ",".join([str(shnum) for shnum in shnums])
    180             self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
    181                      shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
    182                      level=log.NOISY, parent=lp, umid="0fcEZw")
    183         else:
     170        time_received = now()
     171        d_ev.finished(shnums, time_received)
     172        dyhb_rtt = time_received - time_sent
     173        if not buckets:
    184174            self.log(format="no shares from [%(peerid)s]",
    185175                     peerid=idlib.shortnodeid_b2a(peerid),
    186176                     level=log.NOISY, parent=lp, umid="U7d4JA")
    187         if self.node.num_segments is None:
    188             best_numsegs = self.node.guessed_num_segments
    189         else:
    190             best_numsegs = self.node.num_segments
     177            return
     178        shnums_s = ",".join([str(shnum) for shnum in shnums])
     179        self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
     180                 shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
     181                 level=log.NOISY, parent=lp, umid="0fcEZw")
     182        shares = []
    191183        for shnum, bucket in buckets.iteritems():
    192             self._create_share(best_numsegs, shnum, bucket, server_version,
    193                                peerid)
     184            s = self._create_share(shnum, bucket, server_version, peerid,
     185                                   dyhb_rtt)
     186            shares.append(s)
     187        self._deliver_shares(shares)
    194188
    195     def _create_share(self, best_numsegs, shnum, bucket, server_version,
    196                       peerid):
     189    def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
    197190        if shnum in self._commonshares:
    198191            cs = self._commonshares[shnum]
    199192        else:
    200             cs = CommonShare(best_numsegs, self._si_prefix, shnum,
     193            numsegs, authoritative = self.node.get_num_segments()
     194            cs = CommonShare(numsegs, self._si_prefix, shnum,
    201195                             self._node_logparent)
     196            if authoritative:
     197                cs.set_authoritative_num_segments(numsegs)
    202198            # Share._get_satisfaction is responsible for updating
    203199            # CommonShare.set_numsegs after we know the UEB. Alternatives:
    204200            #  1: d = self.node.get_num_segments()
    class ShareFinder: 
    214210            #     Yuck.
    215211            self._commonshares[shnum] = cs
    216212        s = Share(bucket, server_version, self.verifycap, cs, self.node,
    217                   self._download_status, peerid, shnum,
     213                  self._download_status, peerid, shnum, dyhb_rtt,
    218214                  self._node_logparent)
    219         self.undelivered_shares.append(s)
     215        return s
     216
     217    def _deliver_shares(self, shares):
     218        # they will call hungry() again if they want more
     219        self._hungry = False
     220        shares_s = ",".join([str(sh) for sh in shares])
     221        self.log(format="delivering shares: %s" % shares_s,
     222                 level=log.NOISY, umid="2n1qQw")
     223        eventually(self.share_consumer.got_shares, shares)
    220224
    221225    def _got_error(self, f, peerid, req, d_ev, lp):
    222226        d_ev.finished("error", now())
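
In finder.py the DYHB round-trip time is now measured per request and attached to every Share created from that response, which is what the fetcher's (RTT, shnum) sort key relies on. A rough sketch of the bookkeeping follows; FakeShare and on_dyhb_response are hypothetical, while the real code threads time_sent through the callbackArgs of the get_buckets Deferred and builds real Share instances via _create_share.

    import time

    class FakeShare(object):
        """Stand-in for the downloader Share: carries shnum/server/RTT."""
        def __init__(self, shnum, serverid, dyhb_rtt):
            self._shnum = shnum
            self._peerid = serverid
            self._dyhb_rtt = dyhb_rtt

    def on_dyhb_response(buckets, serverid, time_sent):
        # one RTT per DYHB response, shared by every share that server holds
        dyhb_rtt = time.time() - time_sent
        return [FakeShare(shnum, serverid, dyhb_rtt)
                for shnum in sorted(buckets)]

    # the SegmentFetcher then prefers shares from low-RTT servers:
    shares = on_dyhb_response({3: None, 1: None}, "server-A",
                              time.time() - 0.25)
    shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum))
    print([(s._shnum, round(s._dyhb_rtt, 2)) for s in shares])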
  • src/allmydata/immutable/downloader/node.py

    diff --git a/src/allmydata/immutable/downloader/node.py b/src/allmydata/immutable/downloader/node.py
    index 4c92dd8..33c16cf 100644
    a b class DownloadNode: 
    7272        # things to track callers that want data
    7373
    7474        # _segment_requests can have duplicates
    75         self._segment_requests = [] # (segnum, d, cancel_handle)
     75        self._segment_requests = [] # (segnum, d, cancel_handle, logparent)
    7676        self._active_segment = None # a SegmentFetcher, with .segnum
    7777
    7878        self._segsize_observers = observer.OneShotObserverList()
    class DownloadNode: 
    8181        # for each read() call. Segmentation and get_segment() messages are
    8282        # associated with the read() call, everything else is tied to the
    8383        # _Node's log entry.
    84         lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d,"
     84        lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
     85                     " size=%(size)d,"
    8586                     " guessed_segsize=%(guessed_segsize)d,"
    8687                     " guessed_numsegs=%(guessed_numsegs)d",
    8788                     si=self._si_prefix, size=verifycap.size,
    class DownloadNode: 
    103104        # as with CommonShare, our ciphertext_hash_tree is a stub until we
    104105        # get the real num_segments
    105106        self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
     107        self.ciphertext_hash_tree_leaves = self.guessed_num_segments
    106108
    107109    def __repr__(self):
    108         return "Imm_Node(%s)" % (self._si_prefix,)
     110        return "ImmutableDownloadNode(%s)" % (self._si_prefix,)
    109111
    110112    def stop(self):
    111113        # called by the Terminator at shutdown, mostly for tests
    class DownloadNode: 
    175177        The Deferred can also errback with other fatal problems, such as
    176178        NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
    177179        """
    178         log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
    179                 si=base32.b2a(self._verifycap.storage_index)[:8],
    180                 segnum=segnum,
    181                 level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
     180        lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
     181                     si=base32.b2a(self._verifycap.storage_index)[:8],
     182                     segnum=segnum,
     183                     level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
    182184        self._download_status.add_segment_request(segnum, now())
    183185        d = defer.Deferred()
    184186        c = Cancel(self._cancel_request)
    185         self._segment_requests.append( (segnum, d, c) )
     187        self._segment_requests.append( (segnum, d, c, lp) )
    186188        self._start_new_segment()
    187189        return (d, c)
    188190
    class DownloadNode: 
    208210        if self._active_segment is None and self._segment_requests:
    209211            segnum = self._segment_requests[0][0]
    210212            k = self._verifycap.needed_shares
     213            lp = self._segment_requests[0][3]
    211214            log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
    212215                    node=repr(self), segnum=segnum,
    213                     level=log.NOISY, umid="wAlnHQ")
    214             self._active_segment = fetcher = SegmentFetcher(self, segnum, k)
     216                    level=log.NOISY, parent=lp, umid="wAlnHQ")
     217            self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
    215218            active_shares = [s for s in self._shares if s.is_alive()]
    216219            fetcher.add_shares(active_shares) # this triggers the loop
    217220
    class DownloadNode: 
    234237        h = hashutil.uri_extension_hash(UEB_s)
    235238        if h != self._verifycap.uri_extension_hash:
    236239            raise BadHashError
    237         UEB_dict = uri.unpack_extension(UEB_s)
    238         self._parse_and_store_UEB(UEB_dict) # sets self._stuff
     240        self._parse_and_store_UEB(UEB_s) # sets self._stuff
    239241        # TODO: a malformed (but authentic) UEB could throw an assertion in
    240242        # _parse_and_store_UEB, and we should abandon the download.
    241243        self.have_UEB = True
    242244
    243     def _parse_and_store_UEB(self, d):
     245        # inform the ShareFinder about our correct number of segments. This
     246        # will update the block-hash-trees in all existing CommonShare
     247        # instances, and will populate new ones with the correct value.
     248        self._sharefinder.update_num_segments()
     249
     250    def _parse_and_store_UEB(self, UEB_s):
    244251        # Note: the UEB contains needed_shares and total_shares. These are
    245252        # redundant and inferior (the filecap contains the authoritative
    246253        # values). However, because it is possible to encode the same file in
    class DownloadNode: 
    252259
    253260        # therefore, we ignore d['total_shares'] and d['needed_shares'].
    254261
     262        d = uri.unpack_extension(UEB_s)
     263
    255264        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
    256                 ueb=repr(d), vcap=self._verifycap.to_string(),
     265                ueb=repr(uri.unpack_extension_readable(UEB_s)),
     266                vcap=self._verifycap.to_string(),
    257267                level=log.NOISY, parent=self._lp, umid="cVqZnA")
    258268
    259269        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
    class DownloadNode: 
    292302        # shares of file B. self.ciphertext_hash_tree was a guess before:
    293303        # this is where we create it for real.
    294304        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
     305        self.ciphertext_hash_tree_leaves = self.num_segments
    295306        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
    296307
    297308        self.share_hash_tree.set_hashes({0: d['share_root_hash']})
    class DownloadNode: 
    344355                                   % (hashnum, len(self.share_hash_tree)))
    345356        self.share_hash_tree.set_hashes(share_hashes)
    346357
     358    def get_desired_ciphertext_hashes(self, segnum):
     359        if segnum < self.ciphertext_hash_tree_leaves:
     360            return self.ciphertext_hash_tree.needed_hashes(segnum,
     361                                                           include_leaf=True)
     362        return []
    347363    def get_needed_ciphertext_hashes(self, segnum):
    348364        cht = self.ciphertext_hash_tree
    349365        return cht.needed_hashes(segnum, include_leaf=True)
     366
    350367    def process_ciphertext_hashes(self, hashes):
    351368        assert self.num_segments is not None
    352369        # this may raise BadHashError or NotEnoughHashesError
    class DownloadNode: 
    457474    def _extract_requests(self, segnum):
    458475        """Remove matching requests and return their (d,c) tuples so that the
    459476        caller can retire them."""
    460         retire = [(d,c) for (segnum0, d, c) in self._segment_requests
     477        retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests
    461478                  if segnum0 == segnum]
    462479        self._segment_requests = [t for t in self._segment_requests
    463480                                  if t[0] != segnum]
    class DownloadNode: 
    466483    def _cancel_request(self, c):
    467484        self._segment_requests = [t for t in self._segment_requests
    468485                                  if t[2] != c]
    469         segnums = [segnum for (segnum,d,c) in self._segment_requests]
     486        segnums = [segnum for (segnum,d,c,lp) in self._segment_requests]
    470487        # self._active_segment might be None in rare circumstances, so make
    471488        # sure we tolerate it
    472489        if self._active_segment and self._active_segment.segnum not in segnums:
    473490            self._active_segment.stop()
    474491            self._active_segment = None
    475492            self._start_new_segment()
     493
     494    # called by ShareFinder to choose hashtree sizes in CommonShares, and by
     495    # SegmentFetcher to tell if it is still fetching a valid segnum.
     496    def get_num_segments(self):
     497        # returns (best_num_segments, authoritative)
     498        if self.num_segments is None:
     499            return (self.guessed_num_segments, False)
     500        return (self.num_segments, True)
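
node.py now exposes a single get_num_segments() accessor returning (best_num_segments, authoritative), replacing the old num_segments/guessed_num_segments poking by ShareFinder and the _bad_segnum flag in SegmentFetcher. A small sketch of how a caller is expected to use the pair; SketchNode and segnum_is_bad are illustrative stand-ins.

    class SketchNode(object):
        """Stand-in for DownloadNode: num_segments is None until the UEB."""
        def __init__(self, guessed):
            self.guessed_num_segments = guessed
            self.num_segments = None
        def get_num_segments(self):
            if self.num_segments is None:
                return (self.guessed_num_segments, False)
            return (self.num_segments, True)

    def segnum_is_bad(node, segnum):
        # SegmentFetcher.loop only treats an out-of-range segnum as an
        # error once the segment count is authoritative
        numsegs, authoritative = node.get_num_segments()
        return authoritative and segnum >= numsegs

    node = SketchNode(guessed=4)
    print(segnum_is_bad(node, 10))  # False: still guessing, 10 might exist
    node.num_segments = 4           # the UEB has been validated
    print(segnum_is_bad(node, 10))  # True: definitely past the end of file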
  • src/allmydata/immutable/downloader/share.py

    diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py
    index 413f907..78cce8e 100644
    a b class Share: 
    3333    # servers. A different backend would use a different class.
    3434
    3535    def __init__(self, rref, server_version, verifycap, commonshare, node,
    36                  download_status, peerid, shnum, logparent):
     36                 download_status, peerid, shnum, dyhb_rtt, logparent):
    3737        self._rref = rref
    3838        self._server_version = server_version
    3939        self._node = node # holds share_hash_tree and UEB
    class Share: 
    5151        self._storage_index = verifycap.storage_index
    5252        self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
    5353        self._shnum = shnum
     54        self._dyhb_rtt = dyhb_rtt
    5455        # self._alive becomes False upon fatal corruption or server error
    5556        self._alive = True
    5657        self._lp = log.msg(format="%(share)s created", share=repr(self),
    class Share: 
    278279            if not self._satisfy_UEB():
    279280                # can't check any hashes without the UEB
    280281                return False
     282            # the call to _satisfy_UEB() will immediately set the
     283            # authoritative num_segments in all our CommonShares. If we
      284            # guessed wrong, we might still be working on a bogus segnum
     285            # (beyond the real range). We catch this and signal BADSEGNUM
     286            # before invoking any further code that touches hashtrees.
    281287        self.actual_segment_size = self._node.segment_size # might be updated
    282288        assert self.actual_segment_size is not None
    283289
    284         # knowing the UEB means knowing num_segments. Despite the redundancy,
    285         # this is the best place to set this. CommonShare.set_numsegs will
    286         # ignore duplicate calls.
     290        # knowing the UEB means knowing num_segments
    287291        assert self._node.num_segments is not None
    288         cs = self._commonshare
    289         cs.set_numsegs(self._node.num_segments)
    290292
    291293        segnum, observers = self._active_segnum_and_observers()
    292294        # if segnum is None, we don't really need to do anything (we have no
    class Share: 
    304306                # can't check block_hash_tree without a root
    305307                return False
    306308
    307         if cs.need_block_hash_root():
     309        if self._commonshare.need_block_hash_root():
    308310            block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
    309             cs.set_block_hash_root(block_hash_root)
     311            self._commonshare.set_block_hash_root(block_hash_root)
    310312
    311313        if segnum is None:
    312314            return False # we don't want any particular segment right now
    class Share: 
    360362                                  ] ):
    361363            offsets[field] = fields[i]
    362364        self.actual_offsets = offsets
    363         log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields))
     365        log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields),
     366                level=log.NOISY, parent=self._lp, umid="jedQcw")
    364367        self._received.remove(0, 4) # don't need this anymore
    365368
    366369        # validate the offsets a bit
    class Share: 
    517520        block = self._received.pop(blockstart, blocklen)
    518521        if not block:
    519522            log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
    520                                                               blockstart, blocklen))
     523                                                              blockstart, blocklen),
     524                    level=log.NOISY, parent=self._lp, umid="aK0RFw")
    521525            return False
    522526        log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
    523527                share=repr(self), start=blockstart, length=blocklen,
    class Share: 
    589593        if self.actual_offsets or self._overrun_ok:
    590594            if not self._node.have_UEB:
    591595                self._desire_UEB(desire, o)
    592             # They might ask for a segment that doesn't look right.
    593             # _satisfy() will catch+reject bad segnums once we know the UEB
    594             # (and therefore segsize and numsegs), so we'll only fail this
    595             # test if we're still guessing. We want to avoid asking the
    596             # hashtrees for needed_hashes() for bad segnums. So don't enter
    597             # _desire_hashes or _desire_data unless the segnum looks
    598             # reasonable.
    599             if segnum < r["num_segments"]:
    600                 # XXX somehow we're getting here for sh5. we don't yet know
    601                 # the actual_segment_size, we're still working off the guess.
    602                 # the ciphertext_hash_tree has been corrected, but the
    603                 # commonshare._block_hash_tree is still in the guessed state.
    604                 self._desire_share_hashes(desire, o)
    605                 if segnum is not None:
    606                     self._desire_block_hashes(desire, o, segnum)
    607                     self._desire_data(desire, o, r, segnum, segsize)
    608             else:
    609                 log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)"
    610                         % (segnum, r["num_segments"]),
    611                         level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
     596            self._desire_share_hashes(desire, o)
     597            if segnum is not None:
     598                # They might be asking for a segment number that is beyond
     599                # what we guess the file contains, but _desire_block_hashes
     600                # and _desire_data will tolerate that.
     601                self._desire_block_hashes(desire, o, segnum)
     602                self._desire_data(desire, o, r, segnum, segsize)
    612603
    613604        log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
    614                 % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()))
     605                % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()),
     606                level=log.NOISY, parent=self._lp, umid="IG7CgA")
    615607        if self.actual_offsets:
    616608            return (want_it, need_it+gotta_gotta_have_it)
    617609        else:
    class Share: 
    681673        (want_it, need_it, gotta_gotta_have_it) = desire
    682674
    683675        # block hash chain
    684         for hashnum in self._commonshare.get_needed_block_hashes(segnum):
     676        for hashnum in self._commonshare.get_desired_block_hashes(segnum):
    685677            need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
    686678
    687679        # ciphertext hash chain
    688         for hashnum in self._node.get_needed_ciphertext_hashes(segnum):
     680        for hashnum in self._node.get_desired_ciphertext_hashes(segnum):
    689681            need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
    690682
    691683    def _desire_data(self, desire, o, r, segnum, segsize):
     684        if segnum > r["num_segments"]:
     685            # they're asking for a segment that's beyond what we think is the
     686            # end of the file. We won't get here if we've already learned the
     687            # real UEB: _get_satisfaction() will notice the out-of-bounds and
     688            # terminate the loop. So we must still be guessing, which means
     689            # that they might be correct in asking for such a large segnum.
     690            # But if they're right, then our segsize/segnum guess is
     691            # certainly wrong, which means we don't know what data blocks to
     692            # ask for yet. So don't bother adding anything. When the UEB
     693            # comes back and we learn the correct segsize/segnums, we'll
     694            # either reject the request or have enough information to proceed
     695            # normally. This costs one roundtrip.
     696            log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)"
     697                    % (segnum, r["num_segments"]),
     698                    level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
     699            return
    692700        (want_it, need_it, gotta_gotta_have_it) = desire
    693701        tail = (segnum == r["num_segments"]-1)
    694702        datastart = o["data"]
    class Share: 
    803811
    804812
    805813class CommonShare:
     814    # TODO: defer creation of the hashtree until somebody uses us. There will
     815    # be a lot of unused shares, and we shouldn't spend the memory on a large
     816    # hashtree unless necessary.
    806817    """I hold data that is common across all instances of a single share,
    807818    like sh2 on both servers A and B. This is just the block hash tree.
    808819    """
    809     def __init__(self, guessed_numsegs, si_prefix, shnum, logparent):
     820    def __init__(self, best_numsegs, si_prefix, shnum, logparent):
    810821        self.si_prefix = si_prefix
    811822        self.shnum = shnum
     823
    812824        # in the beginning, before we have the real UEB, we can only guess at
    813825        # the number of segments. But we want to ask for block hashes early.
    814826        # So if we're asked for which block hashes are needed before we know
    815827        # numsegs for sure, we return a guess.
    816         self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
    817         self._know_numsegs = False
     828        self._block_hash_tree = IncompleteHashTree(best_numsegs)
     829        self._block_hash_tree_is_authoritative = False
     830        self._block_hash_tree_leaves = best_numsegs
    818831        self._logparent = logparent
    819832
    820     def set_numsegs(self, numsegs):
    821         if self._know_numsegs:
    822             return
    823         self._block_hash_tree = IncompleteHashTree(numsegs)
    824         self._know_numsegs = True
     833    def __repr__(self):
     834        return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum)
     835
     836    def set_authoritative_num_segments(self, numsegs):
     837        if self._block_hash_tree_leaves != numsegs:
     838            self._block_hash_tree = IncompleteHashTree(numsegs)
     839            self._block_hash_tree_leaves = numsegs
     840        self._block_hash_tree_is_authoritative = True
    825841
    826842    def need_block_hash_root(self):
    827843        return bool(not self._block_hash_tree[0])
    828844
    829845    def set_block_hash_root(self, roothash):
    830         assert self._know_numsegs
     846        assert self._block_hash_tree_is_authoritative
    831847        self._block_hash_tree.set_hashes({0: roothash})
    832848
     849    def get_desired_block_hashes(self, segnum):
     850        if segnum < self._block_hash_tree_leaves:
     851            return self._block_hash_tree.needed_hashes(segnum,
     852                                                       include_leaf=True)
     853
     854        # the segnum might be out-of-bounds. Originally it was due to a race
     855        # between the receipt of the UEB on one share (from which we learn
     856        # the correct number of segments, update all hash trees to the right
     857        # size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery
     858        # of a new Share to the SegmentFetcher while that BADSEGNUM was
     859        # queued (which sends out requests to the stale segnum, now larger
     860        # than the hash tree). I fixed that (by making SegmentFetcher.loop
     861        # check for a bad segnum at the start of each pass, instead of using
     862        # the queued BADSEGNUM or a flag it sets), but just in case this
     863        # still happens, I'm leaving the < in place. If it gets hit, there's
     864        # a potential lost-progress problem, but I'm pretty sure that it will
     865        # get cleared up on the following turn.
     866        return []
     867
    833868    def get_needed_block_hashes(self, segnum):
     869        assert self._block_hash_tree_is_authoritative
    834870        # XXX: include_leaf=True needs thought: how did the old downloader do
    835871        # it? I think it grabbed *all* block hashes and set them all at once.
    836872        # Since we want to fetch less data, we either need to fetch the leaf
    class CommonShare: 
    840876        return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
    841877
    842878    def process_block_hashes(self, block_hashes):
    843         assert self._know_numsegs
     879        assert self._block_hash_tree_is_authoritative
    844880        # this may raise BadHashError or NotEnoughHashesError
    845881        self._block_hash_tree.set_hashes(block_hashes)
    846882
    847883    def check_block(self, segnum, block):
    848         assert self._know_numsegs
     884        assert self._block_hash_tree_is_authoritative
    849885        h = hashutil.block_hash(block)
    850886        # this may raise BadHashError or NotEnoughHashesError
    851887        self._block_hash_tree.set_hashes(leaves={segnum: h})
     888
     889# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an
      890# auxiliary OVERDUE callback. Just make sure to get all the messages in the
     891# right order and on the right turns.
     892
     893# TODO: we're asking for too much data. We probably don't need
     894# include_leaf=True in the block hash tree or ciphertext hash tree.
     895
     896# TODO: we ask for ciphertext hash tree nodes from all shares (whenever
     897# _desire is called while we're missing those nodes), but we only consume it
     898# from the first response, leaving the rest of the data sitting in _received.
     899# This was ameliorated by clearing self._received after each block is
     900# complete.
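
The CommonShare changes replace the old set_numsegs() with set_authoritative_num_segments(), which rebuilds the guessed block hash tree only when the leaf count turns out to be wrong, and add get_desired_block_hashes(), which tolerates out-of-range segnums instead of hitting the hash tree. A compact sketch of just those two behaviours; SketchCommonShare is a stand-in, and the import assumes the project's allmydata.hashtree module.

    from allmydata.hashtree import IncompleteHashTree

    class SketchCommonShare(object):
        def __init__(self, best_numsegs):
            self._tree = IncompleteHashTree(best_numsegs)  # guessed size
            self._leaves = best_numsegs
            self._authoritative = False
        def set_authoritative_num_segments(self, numsegs):
            if self._leaves != numsegs:
                # the guess was wrong: throw away the (still empty) tree
                self._tree = IncompleteHashTree(numsegs)
                self._leaves = numsegs
            self._authoritative = True
        def get_desired_block_hashes(self, segnum):
            if segnum < self._leaves:
                return self._tree.needed_hashes(segnum, include_leaf=True)
            return []  # stale/out-of-range segnum: ask for nothing

    cs = SketchCommonShare(best_numsegs=2)  # only a guess so far
    print(cs.get_desired_block_hashes(5))   # [] -- beyond the guessed tree
    cs.set_authoritative_num_segments(8)    # the UEB says 8 segments
    print(sorted(cs.get_desired_block_hashes(5)))  # real hash-node indices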
  • src/allmydata/test/test_cli.py

    diff --git a/src/allmydata/test/test_cli.py b/src/allmydata/test/test_cli.py
    index db5bf5f..2453126 100644
    a b class Errors(GridTestMixin, CLITestMixin, unittest.TestCase): 
    23032303        # the download is abandoned as soon as it's clear that we won't get
    23042304        # enough shares. The one remaining share might be in either the
    23052305        # COMPLETE or the PENDING state.
    2306         in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3"
    2307         in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3"
     2306        in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3"
     2307        in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3"
    23082308
    23092309        d.addCallback(lambda ign: self.do_cli("get", self.uri_1share))
    23102310        def _check1((rc, out, err)):
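
The updated test_cli.py expectations track the new _no_shares_error() message, which names shares instead of counting them. A sketch of how the expected strings are assembled; ran_out_of_shares_msg is a hypothetical helper that mirrors the format/args built in fetcher.py.

    def ran_out_of_shares_msg(complete_shnums, pending_shares, overdue_shares,
                              unused_shares, k, last_failure):
        fmt = ("ran out of shares: complete=%(complete)s pending=%(pending)s"
               " overdue=%(overdue)s unused=%(unused)s need %(k)d."
               " Last failure: %(last_failure)s")
        args = {"complete": ",".join("sh%d" % s
                                     for s in sorted(complete_shnums)),
                "pending": ",".join(str(sh) for sh in pending_shares),
                "overdue": ",".join(str(sh) for sh in overdue_shares),
                "unused": ",".join(str(sh) for sh in unused_shares),
                "k": k, "last_failure": last_failure}
        return fmt % args

    print(ran_out_of_shares_msg([0], [], [], [], 3, None))
    # -> "ran out of shares: complete=sh0 pending= overdue= unused=
    #     need 3. Last failure: None"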
  • src/allmydata/test/test_download.py

    diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py
    index 71a556b..40f0d62 100644
    a b from allmydata.test.no_network import GridTestMixin 
    1515from allmydata.test.common import ShouldFailMixin
    1616from allmydata.interfaces import NotEnoughSharesError, NoSharesError
    1717from allmydata.immutable.downloader.common import BadSegmentNumberError, \
    18      BadCiphertextHashError, DownloadStopped
     18     BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD
    1919from allmydata.immutable.downloader.status import DownloadStatus
     20from allmydata.immutable.downloader.fetcher import SegmentFetcher
    2021from allmydata.codec import CRSDecoder
    2122from foolscap.eventual import fireEventually, flushEventualQueue
    2223
    class DownloadTest(_Base, unittest.TestCase): 
    295296            # shares
    296297            servers = []
    297298            shares = sorted([s._shnum for s in self.n._cnode._node._shares])
    298             self.failUnlessEqual(shares, [0,1,2])
     299            self.failUnlessEqual(shares, [0,1,2,3])
    299300            # break the RIBucketReader references
    300301            for s in self.n._cnode._node._shares:
    301302                s._rref.broken = True
    class DownloadTest(_Base, unittest.TestCase): 
    318319            self.failUnlessEqual("".join(c.chunks), plaintext)
    319320            shares = sorted([s._shnum for s in self.n._cnode._node._shares])
    320321            # we should now be using more shares than we were before
    321             self.failIfEqual(shares, [0,1,2])
     322            self.failIfEqual(shares, [0,1,2,3])
    322323        d.addCallback(_check_failover)
    323324        return d
    324325
    class DownloadTest(_Base, unittest.TestCase): 
    539540            def _con1_should_not_succeed(res):
    540541                self.fail("the first read should not have succeeded")
    541542            def _con1_failed(f):
    542                 self.failUnless(f.check(NotEnoughSharesError))
     543                self.failUnless(f.check(NoSharesError))
    543544                con2.producer.stopProducing()
    544545                return d2
    545546            d.addCallbacks(_con1_should_not_succeed, _con1_failed)
    class DownloadTest(_Base, unittest.TestCase): 
    583584            def _con1_should_not_succeed(res):
    584585                self.fail("the first read should not have succeeded")
    585586            def _con1_failed(f):
    586                 self.failUnless(f.check(NotEnoughSharesError))
     587                self.failUnless(f.check(NoSharesError))
    587588                # we *don't* cancel the second one here: this exercises a
    588589                # lost-progress bug from #1154. We just wait for it to
    589590                # succeed.
    class Corruption(_Base, unittest.TestCase): 
    11211122                # All these tests result in a failed download.
    11221123                d.addCallback(self._corrupt_flip_all, imm_uri, i)
    11231124                d.addCallback(lambda ign:
    1124                               self.shouldFail(NotEnoughSharesError, which,
     1125                              self.shouldFail(NoSharesError, which,
    11251126                                              substring,
    11261127                                              _download, imm_uri))
    11271128                d.addCallback(lambda ign: self.restore_all_shares(self.shares))
    class Status(unittest.TestCase): 
    12571258        e2.update(1000, 2.0, 2.0)
    12581259        e2.finished(now+5)
    12591260        self.failUnlessEqual(ds.get_progress(), 1.0)
     1261
     1262class MyShare:
     1263    def __init__(self, shnum, peerid, rtt):
     1264        self._shnum = shnum
     1265        self._peerid = peerid
     1266        self._peerid_s = peerid
     1267        self._dyhb_rtt = rtt
     1268    def __repr__(self):
     1269        return "sh%d-on-%s" % (self._shnum, self._peerid)
     1270
     1271class MySegmentFetcher(SegmentFetcher):
     1272    def __init__(self, *args, **kwargs):
     1273        SegmentFetcher.__init__(self, *args, **kwargs)
     1274        self._test_start_shares = []
     1275    def _start_share(self, share, shnum):
     1276        self._test_start_shares.append(share)
     1277
     1278class FakeNode:
     1279    def __init__(self):
     1280        self.want_more = 0
     1281        self.failed = None
     1282        self.processed = None
     1283        self._si_prefix = "si_prefix"
     1284    def want_more_shares(self):
     1285        self.want_more += 1
     1286    def fetch_failed(self, fetcher, f):
     1287        self.failed = f
     1288    def process_blocks(self, segnum, blocks):
     1289        self.processed = (segnum, blocks)
     1290    def get_num_segments(self):
     1291        return 1, True
     1292
     1293class Selection(unittest.TestCase):
     1294    def test_no_shares(self):
     1295        node = FakeNode()
     1296        sf = SegmentFetcher(node, 0, 3, None)
     1297        sf.add_shares([])
     1298        d = flushEventualQueue()
     1299        def _check1(ign):
     1300            self.failUnlessEqual(node.want_more, 1)
     1301            self.failUnlessEqual(node.failed, None)
     1302            sf.no_more_shares()
     1303            return flushEventualQueue()
     1304        d.addCallback(_check1)
     1305        def _check2(ign):
     1306            self.failUnless(node.failed)
     1307            self.failUnless(node.failed.check(NoSharesError))
     1308        d.addCallback(_check2)
     1309        return d
     1310
     1311    def test_only_one_share(self):
     1312        node = FakeNode()
     1313        sf = MySegmentFetcher(node, 0, 3, None)
     1314        shares = [MyShare(0, "peer-A", 0.0)]
     1315        sf.add_shares(shares)
     1316        d = flushEventualQueue()
     1317        def _check1(ign):
     1318            self.failUnlessEqual(node.want_more, 1)
     1319            self.failUnlessEqual(node.failed, None)
     1320            sf.no_more_shares()
     1321            return flushEventualQueue()
     1322        d.addCallback(_check1)
     1323        def _check2(ign):
     1324            self.failUnless(node.failed)
     1325            self.failUnless(node.failed.check(NotEnoughSharesError))
     1326            self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
     1327                              str(node.failed))
     1328        d.addCallback(_check2)
     1329        return d
     1330
     1331    def test_good_diversity_early(self):
     1332        node = FakeNode()
     1333        sf = MySegmentFetcher(node, 0, 3, None)
     1334        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
     1335        sf.add_shares(shares)
     1336        d = flushEventualQueue()
     1337        def _check1(ign):
     1338            self.failUnlessEqual(node.want_more, 0)
     1339            self.failUnlessEqual(sf._test_start_shares, shares[:3])
     1340            for sh in sf._test_start_shares:
     1341                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1342                                           "block-%d" % sh._shnum)
     1343            return flushEventualQueue()
     1344        d.addCallback(_check1)
     1345        def _check2(ign):
     1346            self.failIfEqual(node.processed, None)
     1347            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1348                                                      1: "block-1",
     1349                                                      2: "block-2"}) )
     1350        d.addCallback(_check2)
     1351        return d
     1352
     1353    def test_good_diversity_late(self):
     1354        node = FakeNode()
     1355        sf = MySegmentFetcher(node, 0, 3, None)
     1356        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
     1357        sf.add_shares([])
     1358        d = flushEventualQueue()
     1359        def _check1(ign):
     1360            self.failUnlessEqual(node.want_more, 1)
     1361            sf.add_shares(shares)
     1362            return flushEventualQueue()
     1363        d.addCallback(_check1)
     1364        def _check2(ign):
     1365            self.failUnlessEqual(sf._test_start_shares, shares[:3])
     1366            for sh in sf._test_start_shares:
     1367                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1368                                           "block-%d" % sh._shnum)
     1369            return flushEventualQueue()
     1370        d.addCallback(_check2)
     1371        def _check3(ign):
     1372            self.failIfEqual(node.processed, None)
     1373            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1374                                                      1: "block-1",
     1375                                                      2: "block-2"}) )
     1376        d.addCallback(_check3)
     1377        return d
     1378
     1379    def test_avoid_bad_diversity_late(self):
     1380        node = FakeNode()
     1381        sf = MySegmentFetcher(node, 0, 3, None)
     1382        # we could satisfy the read entirely from the first server, but we'd
     1383        # prefer not to. Instead, we expect to only pull one share from the
     1384        # first server
     1385        shares = [MyShare(0, "peer-A", 0.0),
     1386                  MyShare(1, "peer-A", 0.0),
     1387                  MyShare(2, "peer-A", 0.0),
     1388                  MyShare(3, "peer-B", 1.0),
     1389                  MyShare(4, "peer-C", 2.0),
     1390                  ]
     1391        sf.add_shares([])
     1392        d = flushEventualQueue()
     1393        def _check1(ign):
     1394            self.failUnlessEqual(node.want_more, 1)
     1395            sf.add_shares(shares)
     1396            return flushEventualQueue()
     1397        d.addCallback(_check1)
     1398        def _check2(ign):
     1399            self.failUnlessEqual(sf._test_start_shares,
     1400                                 [shares[0], shares[3], shares[4]])
     1401            for sh in sf._test_start_shares:
     1402                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1403                                           "block-%d" % sh._shnum)
     1404            return flushEventualQueue()
     1405        d.addCallback(_check2)
     1406        def _check3(ign):
     1407            self.failIfEqual(node.processed, None)
     1408            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1409                                                      3: "block-3",
     1410                                                      4: "block-4"}) )
     1411        d.addCallback(_check3)
     1412        return d
     1413
     1414    def test_suffer_bad_diversity_late(self):
     1415        node = FakeNode()
     1416        sf = MySegmentFetcher(node, 0, 3, None)
     1417        # we satisfy the read entirely from the first server because we don't
     1418        # have any other choice.
     1419        shares = [MyShare(0, "peer-A", 0.0),
     1420                  MyShare(1, "peer-A", 0.0),
     1421                  MyShare(2, "peer-A", 0.0),
     1422                  MyShare(3, "peer-A", 0.0),
     1423                  MyShare(4, "peer-A", 0.0),
     1424                  ]
     1425        sf.add_shares([])
     1426        d = flushEventualQueue()
     1427        def _check1(ign):
     1428            self.failUnlessEqual(node.want_more, 1)
     1429            sf.add_shares(shares)
     1430            return flushEventualQueue()
     1431        d.addCallback(_check1)
     1432        def _check2(ign):
     1433            self.failUnlessEqual(node.want_more, 3)
     1434            self.failUnlessEqual(sf._test_start_shares,
     1435                                 [shares[0], shares[1], shares[2]])
     1436            for sh in sf._test_start_shares:
     1437                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1438                                           "block-%d" % sh._shnum)
     1439            return flushEventualQueue()
     1440        d.addCallback(_check2)
     1441        def _check3(ign):
     1442            self.failIfEqual(node.processed, None)
     1443            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1444                                                      1: "block-1",
     1445                                                      2: "block-2"}) )
     1446        d.addCallback(_check3)
     1447        return d
     1448
     1449    def test_suffer_bad_diversity_early(self):
     1450        node = FakeNode()
     1451        sf = MySegmentFetcher(node, 0, 3, None)
     1452        # we satisfy the read entirely from the first server because we don't
     1453        # have any other choice.
     1454        shares = [MyShare(0, "peer-A", 0.0),
     1455                  MyShare(1, "peer-A", 0.0),
     1456                  MyShare(2, "peer-A", 0.0),
     1457                  MyShare(3, "peer-A", 0.0),
     1458                  MyShare(4, "peer-A", 0.0),
     1459                  ]
     1460        sf.add_shares(shares)
     1461        d = flushEventualQueue()
     1462        def _check1(ign):
     1463            self.failUnlessEqual(node.want_more, 2)
     1464            self.failUnlessEqual(sf._test_start_shares,
     1465                                 [shares[0], shares[1], shares[2]])
     1466            for sh in sf._test_start_shares:
     1467                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1468                                           "block-%d" % sh._shnum)
     1469            return flushEventualQueue()
     1470        d.addCallback(_check1)
     1471        def _check2(ign):
     1472            self.failIfEqual(node.processed, None)
     1473            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1474                                                      1: "block-1",
     1475                                                      2: "block-2"}) )
     1476        d.addCallback(_check2)
     1477        return d
     1478
     1479    def test_overdue(self):
     1480        node = FakeNode()
     1481        sf = MySegmentFetcher(node, 0, 3, None)
     1482        shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
     1483        sf.add_shares(shares)
     1484        d = flushEventualQueue()
     1485        def _check1(ign):
     1486            self.failUnlessEqual(node.want_more, 0)
     1487            self.failUnlessEqual(sf._test_start_shares, shares[:3])
     1488            for sh in sf._test_start_shares:
     1489                sf._block_request_activity(sh, sh._shnum, OVERDUE)
     1490            return flushEventualQueue()
     1491        d.addCallback(_check1)
     1492        def _check2(ign):
     1493            self.failUnlessEqual(sf._test_start_shares, shares[:6])
     1494            for sh in sf._test_start_shares[3:]:
     1495                sf._block_request_activity(sh, sh._shnum, COMPLETE,
     1496                                           "block-%d" % sh._shnum)
     1497            return flushEventualQueue()
     1498        d.addCallback(_check2)
     1499        def _check3(ign):
     1500            self.failIfEqual(node.processed, None)
     1501            self.failUnlessEqual(node.processed, (0, {3: "block-3",
     1502                                                      4: "block-4",
     1503                                                      5: "block-5"}) )
     1504        d.addCallback(_check3)
     1505        return d
     1506
     1507    def test_overdue_fails(self):
     1508        node = FakeNode()
     1509        sf = MySegmentFetcher(node, 0, 3, None)
     1510        shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
     1511        sf.add_shares(shares)
     1512        sf.no_more_shares()
     1513        d = flushEventualQueue()
     1514        def _check1(ign):
     1515            self.failUnlessEqual(node.want_more, 0)
     1516            self.failUnlessEqual(sf._test_start_shares, shares[:3])
     1517            for sh in sf._test_start_shares:
     1518                sf._block_request_activity(sh, sh._shnum, OVERDUE)
     1519            return flushEventualQueue()
     1520        d.addCallback(_check1)
     1521        def _check2(ign):
     1522            self.failUnlessEqual(sf._test_start_shares, shares[:6])
     1523            for sh in sf._test_start_shares[3:]:
     1524                sf._block_request_activity(sh, sh._shnum, DEAD)
     1525            return flushEventualQueue()
     1526        d.addCallback(_check2)
     1527        def _check3(ign):
     1528            # we're still waiting
     1529            self.failUnlessEqual(node.processed, None)
     1530            self.failUnlessEqual(node.failed, None)
     1531            # now complete one of the overdue ones, and kill one of the other
     1532            # ones, leaving one hanging. This should trigger a failure, since
     1533            # we cannot succeed.
     1534            live = sf._test_start_shares[0]
     1535            die = sf._test_start_shares[1]
     1536            sf._block_request_activity(live, live._shnum, COMPLETE, "block")
     1537            sf._block_request_activity(die, die._shnum, DEAD)
     1538            return flushEventualQueue()
     1539        d.addCallback(_check3)
     1540        def _check4(ign):
     1541            self.failUnless(node.failed)
     1542            self.failUnless(node.failed.check(NotEnoughSharesError))
     1543            self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
     1544                              str(node.failed))
     1545        d.addCallback(_check4)
     1546        return d
     1547
     1548    def test_avoid_redundancy(self):
     1549        node = FakeNode()
     1550        sf = MySegmentFetcher(node, 0, 3, None)
     1551        # sh0 is offered by two servers (peer-A and peer-C). Once we
     1552        # have asked peer-A for sh0, the copy on peer-C is redundant
     1553        # and should be skipped, even though peer-C is a new server.
     1554        shares = [MyShare(0, "peer-A", 0.0),
     1555                  MyShare(1, "peer-B", 1.0),
     1556                  MyShare(0, "peer-C", 2.0), # this will be skipped
     1557                  MyShare(1, "peer-D", 3.0),
     1558                  MyShare(2, "peer-E", 4.0),
     1559                  ]
     1560        sf.add_shares(shares[:3])
     1561        d = flushEventualQueue()
     1562        def _check1(ign):
     1563            self.failUnlessEqual(node.want_more, 1)
     1564            self.failUnlessEqual(sf._test_start_shares,
     1565                                 [shares[0], shares[1]])
     1566            # allow sh1 to retire
     1567            sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
     1568            return flushEventualQueue()
     1569        d.addCallback(_check1)
     1570        def _check2(ign):
     1571            # and then feed in the remaining shares
     1572            sf.add_shares(shares[3:])
     1573            sf.no_more_shares()
     1574            return flushEventualQueue()
     1575        d.addCallback(_check2)
     1576        def _check3(ign):
     1577            self.failUnlessEqual(sf._test_start_shares,
     1578                                 [shares[0], shares[1], shares[4]])
     1579            sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
     1580            sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
     1581            return flushEventualQueue()
     1582        d.addCallback(_check3)
     1583        def _check4(ign):
     1584            self.failIfEqual(node.processed, None)
     1585            self.failUnlessEqual(node.processed, (0, {0: "block-0",
     1586                                                      1: "block-1",
     1587                                                      2: "block-2"}) )
     1588        d.addCallback(_check4)
     1589        return d
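
    [Reviewer note] The Selection tests above pin down the policy the new
    share-selection code is expected to follow: candidates are tried in
    increasing DYHB-RTT order, at most one share is pulled from any single
    server at first, and a shnum that is already being fetched is never
    requested a second time. Below is a minimal, self-contained sketch of that
    policy only; it is not the patch's SegmentFetcher code, and pick_shares(),
    its signature, and the shnum tie-break are assumptions made for this
    illustration (it reuses the MyShare attribute names from the tests).

        from collections import defaultdict

        def pick_shares(candidates, k, max_per_server=1):
            # Try the "best" candidates first: lowest DYHB round-trip time,
            # with shnum as an (assumed) tie-break.
            chosen = []
            per_server = defaultdict(int)   # peerid -> shares chosen from it
            seen_shnums = set()             # shnums already being fetched
            for sh in sorted(candidates,
                             key=lambda s: (s._dyhb_rtt, s._shnum)):
                if len(chosen) >= k:
                    break
                if sh._shnum in seen_shnums:
                    continue  # duplicate shnum (e.g. sh0-on-peer-C) is skipped
                if per_server[sh._peerid] >= max_per_server:
                    continue  # preserve server diversity
                chosen.append(sh)
                per_server[sh._peerid] += 1
                seen_shnums.add(sh._shnum)
            return chosen

    With the fixtures from test_avoid_bad_diversity_late, pick_shares(shares, 3)
    yields [shares[0], shares[3], shares[4]]. When every share lives on the
    same server (the test_suffer_bad_diversity_* cases), the tests show the
    fetcher asking for more shares and then accepting several shares from that
    one server anyway, which is why want_more climbs even though shares[0..2]
    do get started.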
  • src/allmydata/test/test_immutable.py

    diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py
    index 288332d..511a865 100644
    a b class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase) 
    5252        def _after_download(unused=None):
    5353            after_download_reads = self._count_reads()
    5454            #print before_download_reads, after_download_reads
    55             self.failIf(after_download_reads-before_download_reads > 36,
     55            self.failIf(after_download_reads-before_download_reads > 41,
    5656                        (after_download_reads, before_download_reads))
    5757        d.addCallback(self._download_and_check_plaintext)
    5858        d.addCallback(_after_download)
  • src/allmydata/test/test_web.py

    diff --git a/src/allmydata/test/test_web.py b/src/allmydata/test/test_web.py
    index 3008046..f68e98d 100644
    a b class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi 
    42594259        def _check_one_share(body):
    42604260            self.failIf("<html>" in body, body)
    42614261            body = " ".join(body.strip().split())
    4262             msg = ("NotEnoughSharesError: This indicates that some "
    4263                    "servers were unavailable, or that shares have been "
    4264                    "lost to server departure, hard drive failure, or disk "
    4265                    "corruption. You should perform a filecheck on "
    4266                    "this object to learn more. The full error message is:"
    4267                    " ran out of shares: %d complete, %d pending, 0 overdue,"
    4268                    " 0 unused, need 3. Last failure: None")
    4269             msg1 = msg % (1, 0)
    4270             msg2 = msg % (0, 1)
     4262            msgbase = ("NotEnoughSharesError: This indicates that some "
     4263                       "servers were unavailable, or that shares have been "
     4264                       "lost to server departure, hard drive failure, or disk "
     4265                       "corruption. You should perform a filecheck on "
     4266                       "this object to learn more. The full error message is:"
     4267                       )
     4268            msg1 = msgbase + (" ran out of shares:"
     4269                              " complete=sh0"
     4270                              " pending="
     4271                              " overdue= unused= need 3. Last failure: None")
     4272            msg2 = msgbase + (" ran out of shares:"
     4273                              " complete="
     4274                              " pending=Share(sh0-on-xgru5)"
     4275                              " overdue= unused= need 3. Last failure: None")
    42714276            self.failUnless(body == msg1 or body == msg2, body)
    42724277        d.addCallback(_check_one_share)
    42734278
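
    [Reviewer note] The updated assertions above expect the new per-state
    failure summary ("complete=sh0 pending= overdue= unused= need 3 ...")
    rather than the old counts-only message. The hypothetical helper below
    shows how such a summary string can be assembled from the shares in each
    state; summarize_shares() and its arguments are inventions of this sketch,
    not the formatting code in the patch.

        def summarize_shares(complete, pending, overdue, unused, k,
                             last_failure=None):
            # Join the names of the shares in each state, in the layout the
            # updated test_web and test_download assertions expect.
            def names(shares):
                return ",".join(str(s) for s in shares)
            return ("ran out of shares: complete=%s pending=%s overdue=%s"
                    " unused=%s need %d. Last failure: %s"
                    % (names(complete), names(pending), names(overdue),
                       names(unused), k, last_failure))

        # summarize_shares(["sh0"], [], [], [], 3) reproduces the body of msg1:
        #   "ran out of shares: complete=sh0 pending= overdue= unused= need 3.
        #    Last failure: None"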