source: trunk/src/allmydata/immutable/downloader/finder.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

1"""
2Ported to Python 3.
3"""
4
5from six import ensure_str
6
7import time
8now = time.time
9from foolscap.api import eventually
10from allmydata.util import base32, log
11from twisted.internet import reactor
12
13from .share import Share, CommonShare
14
def incidentally(res, f, *args, **kwargs):
    """Add me to a Deferred chain like this:
     d.addBoth(incidentally, func, arg)
    and I'll behave as if you'd added the following function:
     def _(res):
         func(arg)
         return res
    This is useful if you want to execute an expression when the Deferred
    fires, but don't care about its value.
    """
    f(*args, **kwargs)
    return res

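# Usage sketch, with a hypothetical Deferred: `incidentally` suits side
# effects such as bookkeeping or logging that must not disturb the value
# (or Failure) travelling down the chain.
#
#     from twisted.internet.defer import succeed
#     d = succeed("some-result")
#     d.addBoth(incidentally, log.msg, "deferred fired")
#     d.addCallback(lambda res: res)  # downstream still sees "some-result"
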
class RequestToken(object):
    """An opaque token identifying one outstanding DYHB (do-you-have-block)
    query to a single server."""
    def __init__(self, server):
        self.server = server

class ShareFinder(object):
    OVERDUE_TIMEOUT = 10.0

    def __init__(self, storage_broker, verifycap, node, download_status,
                 logparent=None, max_outstanding_requests=10):
        self.running = True # stopped by Share.stop, from Terminator
        self.verifycap = verifycap
        self._started = False
        self._storage_broker = storage_broker
        self.share_consumer = self.node = node
        self.max_outstanding_requests = max_outstanding_requests
        self._hungry = False

        self._commonshares = {} # shnum to CommonShare instance
        self.pending_requests = set()
        self.overdue_requests = set() # subset of pending_requests
        self.overdue_timers = {}

        self._storage_index = verifycap.storage_index
        self._si_prefix = base32.b2a(self._storage_index[:8])[:12]
        self._node_logparent = logparent
        self._download_status = download_status
        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
                           si=self._si_prefix,
                           level=log.NOISY, parent=logparent, umid="2xjj2A")

    def update_num_segments(self):
        (numsegs, authoritative) = self.node.get_num_segments()
        assert authoritative
        for cs in self._commonshares.values():
            cs.set_authoritative_num_segments(numsegs)

    def start_finding_servers(self):
        # don't get servers until somebody uses us: creating the
        # ImmutableFileNode should not cause work to happen yet. Test case is
        # test_dirnode, which creates us with storage_broker=None
        if not self._started:
            si = self.verifycap.storage_index
            servers = self._storage_broker.get_servers_for_psi(si)
            self._servers = iter(servers)
            self._started = True

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._lp
        return log.msg(*args, **kwargs)

    def stop(self):
        self.running = False
        while self.overdue_timers:
            req, t = self.overdue_timers.popitem()
            t.cancel()

    # called by our parent CiphertextDownloader
    def hungry(self):
        self.log(format="ShareFinder[si=%(si)s] hungry",
                 si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
        self.start_finding_servers()
        self._hungry = True
        eventually(self.loop)

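    # Sketch of the consumer protocol, with a hypothetical consumer class:
    # the parent calls hungry() whenever it wants shares; we answer
    # eventually with got_shares() (possibly several times), or with
    # no_more_shares() once every known server has been asked.
    #
    #     class MyConsumer(object):
    #         def got_shares(self, shares):
    #             ...  # consume shares; call finder.hungry() for more
    #         def no_more_shares(self):
    #             ...  # the finder is exhausted; stop waiting
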
    # internal methods
    def loop(self):
        pending_s = ",".join([ensure_str(rt.server.get_name())
                              for rt in self.pending_requests]) # sort?
        self.log(format="ShareFinder loop: running=%(running)s"
                 " hungry=%(hungry)s, pending=%(pending)s",
                 running=self.running, hungry=self._hungry, pending=pending_s,
                 level=log.NOISY, umid="kRtS4Q")
        if not self.running:
            return
        if not self._hungry:
            return

        non_overdue = self.pending_requests - self.overdue_requests
        if len(non_overdue) >= self.max_outstanding_requests:
            # cannot send more requests, must wait for some to retire
            return

        server = None
        try:
            if self._servers:
                server = next(self._servers)
        except StopIteration:
            self._servers = None

        if server:
            self.send_request(server)
            # we loop again to get parallel queries. The check above will
            # prevent us from looping forever.
            eventually(self.loop)
            return

        if self.pending_requests:
            # no server, but there are still requests in flight: maybe one of
            # them will make progress
            return

        self.log(format="ShareFinder.loop: no_more_shares, ever",
                 level=log.UNUSUAL, umid="XjQlzg")
        # we've run out of servers (so we can't send any more requests), and
        # we have nothing in flight. No further progress can be made. The
        # consumer is destined to remain hungry.
        eventually(self.share_consumer.no_more_shares)

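    # Throttle sketch (hypothetical numbers): with the default
    # max_outstanding_requests of 10, eight pending requests of which three
    # are overdue leave len(non_overdue) == 5, so loop() may start up to
    # five more queries; overdue requests stop counting against the budget.
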
    def send_request(self, server):
        req = RequestToken(server)
        self.pending_requests.add(req)
        lp = self.log(format="sending DYHB to [%(name)s]", name=server.get_name(),
                      level=log.NOISY, umid="Io7pyg")
        time_sent = now()
        d_ev = self._download_status.add_dyhb_request(server, time_sent)
        # TODO: get the timer from a Server object, it knows best
        self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
                                                     self.overdue, req)
        d = server.get_storage_server().get_buckets(self._storage_index)
        d.addBoth(incidentally, self._request_retired, req)
        d.addCallbacks(self._got_response, self._got_error,
                       callbackArgs=(server, req, d_ev, time_sent, lp),
                       errbackArgs=(server, req, d_ev, lp))
        d.addErrback(log.err, format="error in send_request",
                     level=log.WEIRD, parent=lp, umid="rpdV0w")
        d.addCallback(incidentally, eventually, self.loop)

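    # Illustrative note on the chain above: _request_retired always fires
    # first (via addBoth), then _got_response or _got_error, then any stray
    # exception is logged, and loop() is scheduled last so the freed request
    # slot can be reused. get_buckets() resolves to a dict mapping share
    # number to a remote bucket reader, e.g. (hypothetical) {0: <bucket>,
    # 3: <bucket>}; an empty dict means the server holds no shares for this
    # storage index.
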
    def _request_retired(self, req):
        self.pending_requests.discard(req)
        self.overdue_requests.discard(req)
        if req in self.overdue_timers:
            self.overdue_timers[req].cancel()
            del self.overdue_timers[req]

    def overdue(self, req):
        del self.overdue_timers[req]
        assert req in self.pending_requests # paranoia, should never be false
        self.overdue_requests.add(req)
        eventually(self.loop)

    def _got_response(self, buckets, server, req, d_ev, time_sent, lp):
        shnums = sorted([shnum for shnum in buckets])
        time_received = now()
        d_ev.finished(shnums, time_received)
        dyhb_rtt = time_received - time_sent
        if not buckets:
            self.log(format="no shares from [%(name)s]", name=server.get_name(),
                     level=log.NOISY, parent=lp, umid="U7d4JA")
            return
        shnums_s = ",".join([str(shnum) for shnum in shnums])
        self.log(format="got shnums [%(shnums)s] from [%(name)s]",
                 shnums=shnums_s, name=server.get_name(),
                 level=log.NOISY, parent=lp, umid="0fcEZw")
        shares = []
        for shnum, bucket in buckets.items():
            s = self._create_share(shnum, bucket, server, dyhb_rtt)
            shares.append(s)
        self._deliver_shares(shares)

    def _create_share(self, shnum, bucket, server, dyhb_rtt):
        if shnum in self._commonshares:
            cs = self._commonshares[shnum]
        else:
            numsegs, authoritative = self.node.get_num_segments()
            cs = CommonShare(numsegs, self._si_prefix, shnum,
                             self._node_logparent)
            if authoritative:
                cs.set_authoritative_num_segments(numsegs)
            # Share._get_satisfaction is responsible for updating
            # CommonShare.set_numsegs after we know the UEB. Alternatives:
            #  1: d = self.node.get_num_segments()
            #     d.addCallback(cs.got_numsegs)
            #   the problem is that the OneShotObserverList I was using
            #   inserts an eventual-send between _get_satisfaction's
            #   _satisfy_UEB and _satisfy_block_hash_tree, and the
            #   CommonShare didn't get the num_segs message before
            #   being asked to set block hash values. To resolve this
            #   would require an immediate ObserverList instead of
            #   an eventual-send -based one
            #  2: break _get_satisfaction into Deferred-attached pieces.
            #     Yuck.
            self._commonshares[shnum] = cs
        s = Share(bucket, server, self.verifycap, cs, self.node,
                  self._download_status, shnum, dyhb_rtt,
                  self._node_logparent)
        return s

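    # Illustrative relationship (summarizing the code above, no new
    # behavior): every Share instance for a given shnum, even when found on
    # different servers, is wired to the same CommonShare, which carries
    # per-shnum state such as the authoritative segment count.
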
    def _deliver_shares(self, shares):
        # they will call hungry() again if they want more
        self._hungry = False
        shares_s = ",".join([str(sh) for sh in shares])
        self.log(format="delivering shares: %s" % shares_s,
                 level=log.NOISY, umid="2n1qQw")
        eventually(self.share_consumer.got_shares, shares)

    def _got_error(self, f, server, req, d_ev, lp):
        d_ev.error(now())
        self.log(format="got error from [%(name)s]",
                 name=server.get_name(), failure=f,
                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")
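
# Driver sketch (hypothetical wiring; the real caller is the parent
# CiphertextDownloader): construct a finder, ask for shares, shut it down
# when the download ends.
#
#     finder = ShareFinder(storage_broker, verifycap, node, download_status)
#     finder.hungry()   # shares arrive later via node.got_shares(...)
#     finder.stop()     # cancels any outstanding overdue timers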