source: trunk/src/allmydata/immutable/downloader/fetcher.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

1"""
2Ported to Python 3.
3"""
4
5from twisted.python.failure import Failure
6from foolscap.api import eventually
7from allmydata.interfaces import NotEnoughSharesError, NoSharesError
8from allmydata.util import log
9from allmydata.util.dictutil import DictOfSets
10from .common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \
11     BadSegmentNumberError
12
13class SegmentFetcher(object):
14    """I am responsible for acquiring blocks for a single segment. I will use
15    the Share instances passed to my add_shares() method to locate, retrieve,
16    and validate those blocks. I expect my parent node to call my
17    no_more_shares() method when there are no more shares available. I will
18    call my parent's want_more_shares() method when I want more: I expect to
19    see at least one call to add_shares or no_more_shares afterwards.
20
21    When I have enough validated blocks, I will call my parent's
22    process_blocks() method with a dictionary that maps shnum to blockdata.
23    If I am unable to provide enough blocks, I will call my parent's
24    fetch_failed() method with (self, f). After either of these events, I
25    will shut down and do no further work. My parent can also call my stop()
26    method to have me shut down early."""
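
    # A sketch of that conversation (hypothetical driver code; only the
    # method names described above are real, 'node' and the share lists
    # are placeholders):
    #
    #   fetcher = SegmentFetcher(node, segnum=0, k=3, logparent=None)
    #   fetcher.add_shares(shares)        # parent feeds in located shares
    #   fetcher.add_shares(more_shares)   # ...after fetcher calls
    #                                     # node.want_more_shares()
    #   fetcher.no_more_shares()          # parent signals end of stream
    #   # success ends with node.process_blocks(segnum, {shnum: blockdata});
    #   # failure ends with node.fetch_failed(fetcher, f).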

    def __init__(self, node, segnum, k, logparent):
        self._node = node # _Node
        self.segnum = segnum
        self._k = k
        self._shares = [] # unused Share instances, sorted by "goodness"
                          # (RTT), then shnum. This is populated when DYHB
                          # responses arrive, or (for later segments) at
                          # startup. We remove shares from it when we call
                          # sh.get_block() on them.
        self._shares_from_server = DictOfSets() # maps server to set of
                                                # Shares on that server for
                                                # which we have outstanding
                                                # get_block() calls.
        self._max_shares_per_server = 1 # how many Shares we're allowed to
                                        # pull from each server. This starts
                                        # at 1 and grows if we don't have
                                        # sufficient diversity.
        self._active_share_map = {} # maps shnum to outstanding (and not
                                    # OVERDUE) Share that provides it.
        self._overdue_share_map = DictOfSets() # shares in the OVERDUE state
        self._lp = logparent
        self._share_observers = {} # maps Share to EventStreamObserver for
                                   # active ones
        self._blocks = {} # maps shnum to validated block data
        self._no_more_shares = False
        self._last_failure = None
        self._running = True

    def stop(self):
        if self._running:
            log.msg("SegmentFetcher(%r).stop" % self._node._si_prefix,
                    level=log.NOISY, parent=self._lp, umid="LWyqpg")
            self._cancel_all_requests()
            self._running = False
            # help GC ???
            del self._shares, self._shares_from_server, self._active_share_map
            del self._share_observers


    # called by our parent _Node

    def add_shares(self, shares):
        # called when ShareFinder locates a new share, and when a non-initial
        # segment fetch is started and we already know about shares from the
        # previous segment
        self._shares.extend(shares)
        self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) )
        eventually(self.loop)
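        # Illustration with hypothetical values: shares whose
        # (_dyhb_rtt, _shnum) are (0.05, 7), (0.02, 3), (0.05, 1) end up
        # ordered sh3, sh1, sh7 -- fastest server first, lower shnum
        # breaking ties.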

    def no_more_shares(self):
        # ShareFinder tells us it's reached the end of its list
        self._no_more_shares = True
        eventually(self.loop)

    # internal methods

    def loop(self):
        try:
            # if any exception occurs here, kill the download
            self._do_loop()
        except BaseException:
            self._node.fetch_failed(self, Failure())
            raise

    def _do_loop(self):
        k = self._k
        if not self._running:
            return
        numsegs, authoritative = self._node.get_num_segments()
        if authoritative and self.segnum >= numsegs:
            # oops, we were asking for a segment number beyond the end of the
            # file. This is an error.
            self.stop()
            e = BadSegmentNumberError("segnum=%d, numsegs=%d" %
                                      (self.segnum, self._node.num_segments))
            f = Failure(e)
            self._node.fetch_failed(self, f)
            return

        #print("LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares)
        # Should we send out more requests?
        while len(set(self._blocks.keys())
                  | set(self._active_share_map.keys())
                  ) < k:
            # we don't have data or active requests for enough shares. Are
            # there any unused shares we can start using?
            (sent_something, want_more_diversity) = self._find_and_use_share()
            if sent_something:
                # great. loop back around in case we need to send more.
                continue
            if want_more_diversity:
                # we could have sent something if we'd been allowed to pull
                # more shares per server. Increase the limit and try again.
                self._max_shares_per_server += 1
                log.msg("SegmentFetcher(%r) increasing diversity limit to %d"
                        % (self._node._si_prefix, self._max_shares_per_server),
                        level=log.NOISY, umid="xY2pBA")
                # Also ask for more shares, in the hopes of achieving better
                # diversity for the next segment.
                self._ask_for_more_shares()
                continue
            # we need more shares than the ones in self._shares to make
            # progress
            self._ask_for_more_shares()
            if self._no_more_shares:
                # But there are no more shares to be had. If we're going to
                # succeed, it will be with the shares we've already seen.
                # Will they be enough?
                if len(set(self._blocks.keys())
                       | set(self._active_share_map.keys())
                       | set(self._overdue_share_map.keys())
                       ) < k:
                    # nope. bail.
                    self._no_shares_error() # this calls self.stop()
                    return
                # our outstanding or overdue requests may yet work.
            # more shares may be coming. Wait until then.
            return

        # are we done?
        if len(set(self._blocks.keys())) >= k:
            # yay!
            self.stop()
            self._node.process_blocks(self.segnum, self._blocks)
            return
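
    # Worked example of _do_loop's bookkeeping (hypothetical numbers): with
    # k=3, a validated block for shnum 2, and active requests for shnums 4
    # and 7, len({2} | {4, 7}) == 3 is not < k, so no new requests go out;
    # the loop simply returns and runs again (via eventually) when a block
    # arrives or a request goes OVERDUE.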

    def _no_shares_error(self):
        if not (self._shares or self._active_share_map or
                self._overdue_share_map or self._blocks):
            format = ("no shares (need %(k)d)."
                      " Last failure: %(last_failure)s")
            args = { "k": self._k,
                     "last_failure": self._last_failure }
            error = NoSharesError
        else:
            format = ("ran out of shares: complete=%(complete)s"
                      " pending=%(pending)s overdue=%(overdue)s"
                      " unused=%(unused)s need %(k)d."
                      " Last failure: %(last_failure)s")
            def join(shnums): return ",".join(["sh%d" % shnum
                                               for shnum in sorted(shnums)])
            pending_s = ",".join([str(sh)
                                  for sh in self._active_share_map.values()])
            overdue = set()
            for shares in self._overdue_share_map.values():
                overdue |= shares
            overdue_s = ",".join([str(sh) for sh in overdue])
            args = {"complete": join(self._blocks.keys()),
                    "pending": pending_s,
                    "overdue": overdue_s,
                    # 'unused' should be zero
                    "unused": ",".join([str(sh) for sh in self._shares]),
                    "k": self._k,
                    "last_failure": self._last_failure,
                    }
            error = NotEnoughSharesError
        log.msg(format=format,
                level=log.UNUSUAL, parent=self._lp, umid="1DsnTg",
                **args)
        e = error(format % args)
        f = Failure(e)
        self.stop()
        self._node.fetch_failed(self, f)

    def _find_and_use_share(self):
        sent_something = False
        want_more_diversity = False
        for sh in self._shares: # find one good share to fetch
            shnum = sh._shnum ; server = sh._server # XXX
            if shnum in self._blocks:
                continue # don't request data we already have
            if shnum in self._active_share_map:
                # note: OVERDUE shares are removed from _active_share_map
                # and added to _overdue_share_map instead.
                continue # don't send redundant requests
            sfs = self._shares_from_server
            if len(sfs.get(server,set())) >= self._max_shares_per_server:
                # don't pull too much from a single server
                want_more_diversity = True
                continue
            # ok, we can use this share
            self._shares.remove(sh)
            self._active_share_map[shnum] = sh
            self._shares_from_server.add(server, sh)
            self._start_share(sh, shnum)
            sent_something = True
            break
        return (sent_something, want_more_diversity)
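
    # Example of the per-server cap (hypothetical scenario): with
    # _max_shares_per_server=1 and three unused shares all on server A, the
    # first call starts one share and later calls return
    # want_more_diversity=True; _do_loop then raises the cap to 2, asks the
    # ShareFinder for more servers, and the next pass may start a second
    # share from A.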

    def _start_share(self, share, shnum):
        self._share_observers[share] = o = share.get_block(self.segnum)
        o.subscribe(self._block_request_activity, share=share, shnum=shnum)

    def _ask_for_more_shares(self):
        if not self._no_more_shares:
            self._node.want_more_shares()
            # that will trigger the ShareFinder to keep looking, and call our
            # add_shares() or no_more_shares() later.

    def _cancel_all_requests(self):
        for o in list(self._share_observers.values()):
            o.cancel()
        self._share_observers = {}

    def _block_request_activity(self, share, shnum, state, block=None, f=None):
        # called by Shares, in response to our s.send_request() calls.
        if not self._running:
            return
        log.msg("SegmentFetcher(%r)._block_request_activity: %s -> %r" %
                (self._node._si_prefix, repr(share), state),
                level=log.NOISY, parent=self._lp, umid="vilNWA")
        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share
        # from all our tracking lists.
        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
            self._share_observers.pop(share, None)
            server = share._server # XXX
            self._shares_from_server.discard(server, share)
            if self._active_share_map.get(shnum) is share:
                del self._active_share_map[shnum]
            self._overdue_share_map.discard(shnum, share)

        if state is COMPLETE:
            # 'block' is fully validated and complete
            self._blocks[shnum] = block

        if state is OVERDUE:
            # no longer active, but still might complete
            del self._active_share_map[shnum]
            self._overdue_share_map.add(shnum, share)
            # OVERDUE is not terminal: it will eventually transition to
            # COMPLETE, CORRUPT, or DEAD.

        if state is DEAD:
            self._last_failure = f
        if state is BADSEGNUM:
            # our main loop will ask the DownloadNode each time for the
            # number of segments, so we'll deal with this in the top of
            # _do_loop
            pass

        eventually(self.loop)
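

# What follows is not part of the original module: it is a minimal,
# self-contained sketch of how a parent node and its shares might drive a
# SegmentFetcher. The _Fake* classes and their attributes (_si_prefix,
# _dyhb_rtt, _shnum, _server, get_block, subscribe, cancel) are assumptions
# modeled on what this file reads and calls, not a real allmydata API.
if __name__ == "__main__":
    from twisted.internet import reactor

    class _FakeObserver(object):
        # stands in for the EventStreamObserver that Share.get_block() returns
        def __init__(self, shnum):
            self._shnum = shnum
        def subscribe(self, cb, **kwargs):
            # deliver one COMPLETE event with fake block data, eventually
            eventually(cb, state=COMPLETE,
                       block=b"block%d" % self._shnum, **kwargs)
        def cancel(self):
            pass

    class _FakeShare(object):
        def __init__(self, shnum, server, rtt):
            self._shnum = shnum
            self._server = server
            self._dyhb_rtt = rtt
        def __repr__(self):
            return "sh%d-on-%s" % (self._shnum, self._server)
        def get_block(self, segnum):
            return _FakeObserver(self._shnum)

    class _FakeNode(object):
        _si_prefix = "demo"
        def get_num_segments(self):
            return (1, True) # one segment, known authoritatively
        def want_more_shares(self):
            print("node asked for more shares")
        def process_blocks(self, segnum, blocks):
            print("segment %d done, blocks: %s" % (segnum, sorted(blocks)))
            reactor.stop()
        def fetch_failed(self, fetcher, f):
            print("fetch failed: %s" % f)
            reactor.stop()

    node = _FakeNode()
    fetcher = SegmentFetcher(node, segnum=0, k=2, logparent=None)
    shares = [_FakeShare(0, "serverA", 0.02),
              _FakeShare(1, "serverB", 0.05),
              _FakeShare(2, "serverC", 0.07)]
    reactor.callWhenRunning(fetcher.add_shares, shares)
    reactor.run()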