source: trunk/src/allmydata/immutable/downloader/segmentation.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 6.8 KB
Line 
"""
Ported to Python 3.
"""

import time

from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer
from foolscap.api import eventually
from allmydata.util import log
from allmydata.util.spans import overlap
from allmydata.interfaces import DownloadStopped

from .common import BadSegmentNumberError, WrongSegmentError

# Module-level alias for the wall-clock function; the producer's pause
# accounting calls now() rather than time.time() directly.
now = time.time

@implementer(IPushProducer)
class Segmentation(object):
    """I am responsible for a single offset+size read of the file. I handle
    segmentation: I figure out which segments are necessary, request them
    (from my CiphertextDownloader) in order, and trim the segments down to
    match the offset+size span. I use the Producer/Consumer interface to only
    request one segment at a time.

    Usage: construct me, then call start(). start() registers me as a
    streaming (push) producer on the consumer and returns a Deferred that
    fires with the consumer once all requested bytes have been written, or
    errbacks if a segment fetch fails or the consumer calls stopProducing().
    """

    def __init__(self, node, offset, size, consumer, read_ev, logparent=None):
        """
        :param node: the download node; I read ``segment_size``,
            ``guessed_segment_size``, ``_verifycap.size`` and ``_si_prefix``
            from it and call its ``get_segment(segnum, logparent)``.
        :param offset: first byte of the file that is wanted.
        :param size: number of bytes wanted, starting at ``offset``.
        :param consumer: object that receives the data via ``write()`` and
            on which I register/unregister myself as producer.
        :param read_ev: read-event recorder; I call its
            ``update(nbytes, 0, paused_seconds)``.
        :param logparent: parent log entry passed to every log.msg call.
        """
        self._node = node
        self._hungry = True
        # Not alive until start() is called; initialised here so that
        # Producer callbacks arriving before start() see a defined state
        # instead of raising AttributeError.
        self._alive = False
        self._deferred = None
        self._active_segnum = None
        self._cancel_segment_request = None
        # these are updated as we deliver data. At any given time, we still
        # want to download file[offset:offset+size]
        self._offset = offset
        self._size = size
        assert offset+size <= node._verifycap.size
        self._consumer = consumer
        self._read_ev = read_ev
        # timestamp of the most recent pauseProducing(), or None when we
        # are not paused (or the pause has already been accounted for)
        self._start_pause = None
        self._lp = logparent

    def start(self):
        """Register with the consumer and begin fetching segments.

        :return: a Deferred that fires with the consumer when the whole
            span has been delivered, or errbacks on failure/stop.
        """
        self._alive = True
        self._deferred = defer.Deferred()
        self._deferred.addBoth(self._done)
        self._consumer.registerProducer(self, True)
        self._maybe_fetch_next()
        return self._deferred

    def _done(self, res):
        """Unregister from the consumer on success or failure, passing the
        result through unchanged."""
        self._consumer.unregisterProducer()
        return res

    def _fail(self, reason):
        """Errback the result Deferred with ``reason``, at most once.

        If the Deferred does not exist yet or has already fired, this is a
        no-op: a second errback() would raise AlreadyCalledError and mask
        the outcome that was already delivered.
        """
        d = self._deferred
        if d is None or d.called:
            return
        d.errback(reason)

    def _maybe_fetch_next(self):
        """Start the next segment request, unless we are finished/stopped,
        paused by the consumer, or a request is already outstanding."""
        if not self._alive or not self._hungry:
            return
        if self._active_segnum is not None:
            return
        self._fetch_next()

    def _fetch_next(self):
        """Request the segment containing the first byte still wanted, or
        fire the result Deferred if nothing remains to be delivered."""
        if self._size == 0:
            # done!
            self._alive = False
            self._hungry = False
            self._deferred.callback(self._consumer)
            return
        n = self._node
        have_actual_segment_size = n.segment_size is not None
        guess_s = ""
        if not have_actual_segment_size:
            guess_s = "probably "
        segment_size = n.segment_size or n.guessed_segment_size
        if self._offset == 0:
            # great! we want segment0 for sure
            wanted_segnum = 0
        else:
            # this might be a guess
            wanted_segnum = self._offset // segment_size
        log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d",
                offset=self._offset, guess=guess_s, segnum=wanted_segnum,
                level=log.NOISY, parent=self._lp, umid="5WfN0w")
        self._active_segnum = wanted_segnum
        # get_segment hands back the Deferred for the data plus an object
        # whose .cancel() withdraws the request (used by stopProducing)
        d,c = n.get_segment(wanted_segnum, self._lp)
        self._cancel_segment_request = c
        # order matters: mark the request retired first, so that
        # _got_segment (and a possible retry) can start the next one
        d.addBoth(self._request_retired)
        d.addCallback(self._got_segment, wanted_segnum)
        if not have_actual_segment_size:
            # we can retry once
            d.addErrback(self._retry_bad_segment)
        d.addErrback(self._error)

    def _request_retired(self, res):
        """Clear the outstanding-request bookkeeping and pass the result
        (data or Failure) through unchanged."""
        self._active_segnum = None
        self._cancel_segment_request = None
        return res

    def _got_segment(self, segment_args, wanted_segnum):
        """Deliver the useful part of a fetched segment to the consumer.

        :param segment_args: tuple ``(segment_start, segment, decodetime)``;
            ``decodetime`` is unpacked but not used here.
        :param wanted_segnum: the segment number that was requested (used
            for logging only).
        :raises WrongSegmentError: if the segment does not contain the
            first byte we still want.
        """
        (segment_start, segment, decodetime) = segment_args
        self._cancel_segment_request = None
        # we got file[segment_start:segment_start+len(segment)]
        # we want file[self._offset:self._offset+self._size]
        log.msg(format="Segmentation got data:"
                " want [%(wantstart)d-%(wantend)d),"
                " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
                wantstart=self._offset, wantend=self._offset+self._size,
                segstart=segment_start, segend=segment_start+len(segment),
                segnum=wanted_segnum,
                level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")

        o = overlap(segment_start, len(segment),  self._offset, self._size)
        # the overlap is file[o[0]:o[0]+o[1]]
        if not o or o[0] != self._offset:
            # we didn't get the first byte, so we can't use this segment
            log.msg("Segmentation handed wrong data:"
                    " want [%d-%d), given [%d-%d), for segnum=%d,"
                    " for si=%r"
                    % (self._offset, self._offset+self._size,
                       segment_start, segment_start+len(segment),
                       wanted_segnum, self._node._si_prefix),
                    level=log.UNUSUAL, parent=self._lp, umid="STlIiA")
            # we may retry if the segnum we asked was based on a guess
            raise WrongSegmentError("I was given the wrong data.")
        offset_in_segment = self._offset - segment_start
        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]

        # advance our window before writing, so that any re-entrant call
        # made by the consumer during write() sees up-to-date state
        self._offset += len(desired_data)
        self._size -= len(desired_data)
        self._consumer.write(desired_data)
        # the consumer might call our .pauseProducing() inside that write()
        # call, setting self._hungry=False
        self._read_ev.update(len(desired_data), 0, 0)
        # note: filenode.DecryptingConsumer is responsible for calling
        # _read_ev.update with how much decrypt_time was consumed
        self._maybe_fetch_next()

    def _retry_bad_segment(self, f):
        """Errback used only when the segment number was a guess: on a
        wrong/out-of-range segment, try again. Other failures pass on."""
        f.trap(WrongSegmentError, BadSegmentNumberError)
        # we guessed the segnum wrong: either one that doesn't overlap with
        # the start of our desired region, or one that's beyond the end of
        # the world. Now that we have the right information, we're allowed to
        # retry once.
        assert self._node.segment_size is not None
        return self._maybe_fetch_next()

    def _error(self, f):
        """Final errback: log the failure, shut down, and errback the
        result Deferred (unless it has already fired)."""
        log.msg("Error in Segmentation", failure=f,
                level=log.WEIRD, parent=self._lp, umid="EYlXBg")
        self._alive = False
        self._hungry = False
        self._fail(f)

    def stopProducing(self):
        """Producer interface: the consumer wants no more data. Cancel any
        outstanding segment request and errback the result Deferred with
        DownloadStopped (unless it has already fired)."""
        log.msg("asked to stopProducing",
                level=log.NOISY, parent=self._lp, umid="XIyL9w")
        self._hungry = False
        self._alive = False
        # cancel any outstanding segment request
        if self._cancel_segment_request:
            self._cancel_segment_request.cancel()
            self._cancel_segment_request = None
        e = DownloadStopped("our Consumer called stopProducing()")
        self._fail(e)

    def pauseProducing(self):
        """Producer interface: stop requesting new segments and remember
        when the pause began."""
        self._hungry = False
        self._start_pause = now()

    def resumeProducing(self):
        """Producer interface: resume fetching (on a later reactor turn, via
        eventually()) and report the time spent paused to the read event."""
        self._hungry = True
        eventually(self._maybe_fetch_next)
        if self._start_pause is not None:
            paused = now() - self._start_pause
            self._read_ev.update(0, 0, paused)
            self._start_pause = None
Note: See TracBrowser for help on using the repository browser.