source: trunk/src/allmydata/immutable/downloader/segmentation.py @ 22a07e9b

Last change on this file since 22a07e9b was 22a07e9b, checked in by Brian Warner <warner@…>, at 2010-08-04T07:26:29Z

Rewrite immutable downloader (#798). This patch adds the new downloader itself.

  • Property mode set to 100644
File size: 6.4 KB
Line 
1
2import time
3now = time.time
4from zope.interface import implements
5from twisted.internet import defer
6from twisted.internet.interfaces import IPushProducer
7from foolscap.api import eventually
8from allmydata.util import log
9from allmydata.util.spans import overlap
10
11from common import BadSegmentNumberError, WrongSegmentError, DownloadStopped
12
class Segmentation:
    """I am responsible for a single offset+size read of the file. I handle
    segmentation: I figure out which segments are necessary, request them
    (from my CiphertextDownloader) in order, and trim the segments down to
    match the offset+size span. I use the Producer/Consumer interface to only
    request one segment at a time.

    Call start() to begin. It returns a Deferred that fires with the
    consumer once the whole span has been written to it, and errbacks if a
    segment request fails or if the consumer calls stopProducing().
    """
    implements(IPushProducer)

    def __init__(self, node, offset, size, consumer, read_ev, logparent=None):
        """Prepare (but do not start) a read of file[offset:offset+size].

        node: supplies get_segment(), segment_size, guessed_segment_size,
              _verifycap.size and _si_prefix
        consumer: receives the data via write(); I register myself with it
                  as a streaming (push) producer in start()
        read_ev: its update() method is told how many bytes were delivered
                 and how long we spent paused
        logparent: passed as parent= to every log.msg call
        """
        self._node = node
        self._hungry = True
        self._active_segnum = None
        self._cancel_segment_request = None
        # these are updated as we deliver data. At any given time, we still
        # want to download file[offset:offset+size]
        self._offset = offset
        self._size = size
        assert offset+size <= node._verifycap.size
        self._consumer = consumer
        self._read_ev = read_ev
        self._start_pause = None
        self._lp = logparent

    def start(self):
        """Register with the consumer and begin fetching segments.

        Returns the Deferred that reports the outcome of the whole read.
        """
        self._alive = True
        self._deferred = defer.Deferred()
        self._consumer.registerProducer(self, True)
        self._maybe_fetch_next()
        return self._deferred

    def _maybe_fetch_next(self):
        """Fetch the next segment, unless we are stopped, paused, or already
        have a segment request outstanding."""
        if not self._alive or not self._hungry:
            return
        if self._active_segnum is not None:
            return
        self._fetch_next()

    def _fetch_next(self):
        """Finish the read if nothing is left to deliver; otherwise ask the
        node for the segment that should contain self._offset."""
        if self._size == 0:
            # done!
            self._alive = False
            self._hungry = False
            self._consumer.unregisterProducer()
            self._deferred.callback(self._consumer)
            return
        n = self._node
        have_actual_segment_size = n.segment_size is not None
        guess_s = ""
        if not have_actual_segment_size:
            guess_s = "probably "
        segment_size = n.segment_size or n.guessed_segment_size
        if self._offset == 0:
            # great! we want segment0 for sure
            wanted_segnum = 0
        else:
            # this might be a guess
            wanted_segnum = self._offset // segment_size
        log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d",
                offset=self._offset, guess=guess_s, segnum=wanted_segnum,
                level=log.NOISY, parent=self._lp, umid="5WfN0w")
        self._active_segnum = wanted_segnum
        d,c = n.get_segment(wanted_segnum, self._lp)
        self._cancel_segment_request = c
        # _request_retired runs first, on success or failure, so that the
        # handlers below (which may start another fetch) see no active request
        d.addBoth(self._request_retired)
        d.addCallback(self._got_segment, wanted_segnum)
        if not have_actual_segment_size:
            # we can retry once
            d.addErrback(self._retry_bad_segment)
        d.addErrback(self._error)

    def _request_retired(self, res):
        """Forget the outstanding segment request; pass the result through."""
        self._active_segnum = None
        self._cancel_segment_request = None
        return res

    def _got_segment(self, segment_info, wanted_segnum):
        """Deliver the useful part of a received segment to the consumer.

        segment_info: a (segment_start, segment, decodetime) tuple
        wanted_segnum: the segment number we asked for (used for logging)

        Raises WrongSegmentError if the segment does not contain the first
        byte we still want.
        """
        # Unpack explicitly: a tuple parameter in the signature is
        # Python-2-only syntax (removed by PEP 3113). decodetime is unused
        # here, but unpacking all three still checks the tuple's shape.
        (segment_start, segment, decodetime) = segment_info
        self._cancel_segment_request = None
        # we got file[segment_start:segment_start+len(segment)]
        # we want file[self._offset:self._offset+self._size]
        log.msg(format="Segmentation got data:"
                " want [%(wantstart)d-%(wantend)d),"
                " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
                wantstart=self._offset, wantend=self._offset+self._size,
                segstart=segment_start, segend=segment_start+len(segment),
                segnum=wanted_segnum,
                level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")

        o = overlap(segment_start, len(segment),  self._offset, self._size)
        # the overlap is file[o[0]:o[0]+o[1]]
        if not o or o[0] != self._offset:
            # we didn't get the first byte, so we can't use this segment
            log.msg("Segmentation handed wrong data:"
                    " want [%d-%d), given [%d-%d), for segnum=%d,"
                    " for si=%s"
                    % (self._offset, self._offset+self._size,
                       segment_start, segment_start+len(segment),
                       wanted_segnum, self._node._si_prefix),
                    level=log.UNUSUAL, parent=self._lp, umid="STlIiA")
            # we may retry if the segnum we asked was based on a guess
            raise WrongSegmentError("I was given the wrong data.")
        offset_in_segment = self._offset - segment_start
        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]

        # advance our position before write(), since the consumer may call
        # back into us from inside it
        self._offset += len(desired_data)
        self._size -= len(desired_data)
        self._consumer.write(desired_data)
        # the consumer might call our .pauseProducing() inside that write()
        # call, setting self._hungry=False
        self._read_ev.update(len(desired_data), 0, 0)
        self._maybe_fetch_next()

    def _retry_bad_segment(self, f):
        """Errback used only when the segment number was a guess: on
        WrongSegmentError or BadSegmentNumberError, try again. Any other
        failure is passed along by f.trap()."""
        f.trap(WrongSegmentError, BadSegmentNumberError)
        # we guessed the segnum wrong: either one that doesn't overlap with
        # the start of our desired region, or one that's beyond the end of
        # the world. Now that we have the right information, we're allowed to
        # retry once.
        assert self._node.segment_size is not None
        return self._maybe_fetch_next()

    def _error(self, f):
        """Final errback: log the failure, shut down, and report it through
        the Deferred returned by start()."""
        log.msg("Error in Segmentation", failure=f,
                level=log.WEIRD, parent=self._lp, umid="EYlXBg")
        self._alive = False
        self._hungry = False
        self._consumer.unregisterProducer()
        # If stopProducing() already fired the Deferred (e.g. the consumer
        # called it from inside write() and then write() failed), firing it
        # again would raise AlreadyCalledError. The failure was logged above.
        if not self._deferred.called:
            self._deferred.errback(f)

    def stopProducing(self):
        """IPushProducer: the consumer wants no more data. Cancel any
        outstanding segment request and errback the Deferred returned by
        start() with DownloadStopped."""
        self._hungry = False
        self._alive = False
        # cancel any outstanding segment request
        if self._cancel_segment_request:
            self._cancel_segment_request.cancel()
            self._cancel_segment_request = None
        # Only fire the Deferred once: a second stopProducing(), or one that
        # arrives after the read finished or failed, must not raise
        # AlreadyCalledError back into the consumer.
        if not self._deferred.called:
            e = DownloadStopped("our Consumer called stopProducing()")
            self._deferred.errback(e)

    def pauseProducing(self):
        """IPushProducer: stop requesting new segments, and note when the
        pause began."""
        self._hungry = False
        self._start_pause = now()

    def resumeProducing(self):
        """IPushProducer: resume fetching on a later reactor turn, and report
        how long we were paused to the read event."""
        self._hungry = True
        eventually(self._maybe_fetch_next)
        if self._start_pause is not None:
            paused = now() - self._start_pause
            self._read_ev.update(0, 0, paused)
            self._start_pause = None
Note: See TracBrowser for help on using the repository browser.