Ticket #798: new-downloader-v2.diff

File new-downloader-v2.diff, 127.7 KB (added by warner at 2010-04-23T23:35:11Z)

latest WIP patch

  • new file src/allmydata/immutable/download2.py

    diff --git a/src/allmydata/immutable/download2.py b/src/allmydata/immutable/download2.py
    new file mode 100644
    index 0000000..a665533
      1import struct
      2import binascii
      3from allmydata import hashtree
      4from allmydata.hashtree import IncompleteHashTree
      5from allmydata.interfaces import HASH_SIZE
     6(AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM) = \
     7 ("AVAILABLE", "PENDING", "OVERDUE", "COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM")
     8
     9class BadSegmentNumberError(Exception):
     10    pass
     11
     12class Share:
     13    """I represent a single instance of a single share (e.g. I reference the
     14    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
     15    I am associated with a CommonShare that remembers data that is held in
     16    common among e.g. SI=abcde/shnum2 across all servers. I am also
     17    associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
     18    servers).
     19    """
     20    # this is a specific implementation of IShare for tahoe's native storage
     21    # servers. A different backend would use a different class.
     22
     23    def __init__(self, rref, verifycap, commonshare, node, peerid, si_s, shnum):
     24        self._rref = rref
     25        self._node = node # holds share_hash_tree and UEB
     26        self._guess_offsets(verifycap, node.guessed_segment_size)
     27        self.actual_offsets = None
     28        self.actual_segment_size = None
     29        self._UEB_length = None
     30        self._commonshare = commonshare # holds block_hash_tree
     31        self._peerid = peerid
      32        self._peerid_s = base32.b2a(peerid)[:5]
     33        self._si_prefix = si_s[:8]
     34        self._shnum = shnum
     35
     36        self._lp = log.msg(format="Share(%(si)s) on server=%(server)s starting",
     37                           si=self._si_prefix, server=self._peerid_s,
     38                           level=log.NOISY, umid="P7hv2w")
     39
     40        self._wanted = Spans() # desired metadata
     41        self._wanted_blocks = Spans() # desired block data
     42        self._requested = Spans() # we've sent a request for this
     43        self._received = Spans() # we've received a response for this
     44        self._received_data = DataSpans() # the response contents, still unused
     45        self._requested_blocks = [] # (segnum, set(observer2..))
     46        ver = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
     47        self._overrun_ok = ver["tolerates-immutable-read-overrun"]
     48        # If _overrun_ok and we guess the offsets correctly, we can get
     49        # everything in one RTT. If _overrun_ok and we guess wrong, we might
     50        # need two RTT (but we could get lucky and do it in one). If overrun
     51        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
     52        # 2=offset table, 3=UEB_length and everything else (hashes, block),
     53        # 4=UEB.
     54
     55    def _guess_offsets(self, verifycap, guessed_segment_size):
     56        self.guessed_segment_size = guessed_segment_size
     57        size = verifycap.size
     58        k = verifycap.needed_shares
     59        N = verifycap.total_shares
     60        r = self._node._calculate_sizes(guessed_segment_size, size, k)
     61        offsets = {}
      62        for i,field in enumerate(('data',
      63                                  'plaintext_hash_tree', # UNUSED
      64                                  'crypttext_hash_tree',
      65                                  'block_hashes',
      66                                  'share_hashes',
      67                                  'uri_extension',
      68                                  )):
     69            offsets[field] = i # bad guesses are easy :) # XXX stub
     70        self.guessed_offsets = offsets
     71        self._fieldsize = 4
     72        self._fieldstruct = ">L"
     73
     74    # called by our client, the SegmentFetcher
     75    def get_block(self, segnum):
     76        """Add a block number to the list of requests. This will eventually
     77        result in a fetch of the data necessary to validate the block, then
     78        the block itself. The fetch order is generally
     79        first-come-first-served, but requests may be answered out-of-order if
     80        data becomes available sooner.
     81
     82        I return an Observer2, which has two uses. The first is to call
     83        o.subscribe(), which gives me a place to send state changes and
     84        eventually the data block. The second is o.cancel(), which removes
     85        the request (if it is still active).
     86
     87        I will distribute the following events through my Observer2:
     88         - state=OVERDUE: ?? I believe I should have had an answer by now.
     89                          You may want to ask another share instead.
     90         - state=BADSEGNUM: the segnum you asked for is too large. I must
     91                            fetch a valid UEB before I can determine this,
     92                            so the notification is asynchronous
     93         - state=COMPLETE, block=data: here is a valid block
     94         - state=CORRUPT: this share contains corrupted data
     95         - state=DEAD, f=Failure: the server reported an error, this share
     96                                  is unusable
     97        """
     98        assert segnum >= 0
     99        o = Observer2()
     100        o.set_canceler(self._cancel_block_request)
     101        for i,(segnum0,observers) in enumerate(self._requested_blocks):
     102            if segnum0 == segnum:
     103                observers.add(o)
     104                break
     105        else:
      106            self._requested_blocks.append( (segnum, set([o])) )
     107        eventually(self.loop)
     108        return o
     109
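          # Observer2 is new in this patch series, so its exact API is an
          # assumption; a minimal sketch of the semantics that get_block()
          # relies on (one watcher, keyword-argument events, cancellation;
          # the real class would presumably deliver via eventually()):
          #
          #   class Observer2:
          #       def __init__(self):
          #           self._watcher = None   # (callback, merge_kwargs)
          #           self._canceler = None
          #           self._pending = []
          #       def set_canceler(self, f):
          #           self._canceler = f     # f(self) runs on cancel()
          #       def subscribe(self, cb, **merge_kwargs):
          #           self._watcher = (cb, merge_kwargs)
          #           for ev in self._pending:
          #               self.notify(**ev)
          #           self._pending = []
          #       def notify(self, **event):
          #           if not self._watcher:
          #               self._pending.append(event)
          #               return
          #           cb, merge = self._watcher
          #           kwargs = dict(event)
          #           kwargs.update(merge)
          #           cb(**kwargs)
          #       def cancel(self):
          #           if self._canceler:
          #               self._canceler(self)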
     110    def _cancel_block_request(self, o):
     111        new_requests = []
     112        for e in self._requested_blocks:
     113            (segnum0, observers) = e
     114            observers.discard(o)
     115            if observers:
     116                new_requests.append(e)
     117        self._requested_blocks = new_requests
     118
     119    # internal methods
     120    def _active_segnum(self):
     121        if self._requested_blocks:
      122            return self._requested_blocks[0][0]
     123        return None
     124
     125    def _active_segnum_and_observers(self):
     126        if self._requested_blocks:
     127            # we only retrieve information for one segment at a time, to
     128            # minimize alacrity (first come, first served)
     129            return self._requested_blocks[0]
     130        return None, []
     131
     132    def loop(self):
     133        # TODO: if any exceptions occur here, kill the download
     134
     135        # we are (eventually) called after all state transitions:
     136        #  new segments added to self._requested_blocks
     137        #  new data received from servers (responses to our read() calls)
     138        #  impatience timer fires (server appears slow)
     139
     140        # First, consume all of the information that we currently have, for
     141        # all the segments people currently want.
     142        while self._get_satisfaction():
     143            pass
     144
     145        # When we get no satisfaction (from the data we've received so far),
     146        # we determine what data we desire (to satisfy more requests). The
     147        # number of segments is finite, so I can't get no satisfaction
     148        # forever.
     149        self._desire()
     150
     151        # finally send out requests for whatever we need (desire minus have).
     152        # You can't always get what you want, but, sometimes, you get what
     153        # you need.
     154        self._request_needed() # express desire
     155
     156    def _get_satisfaction(self):
     157        # return True if we retired a data block, and should therefore be
     158        # called again. Return False if we don't retire a data block (even if
     159        # we do retire some other data, like hash chains).
     160
     161        if self.actual_offsets is None:
     162            if not self._satisfy_offsets():
     163                # can't even look at anything without the offset table
     164                return False
     165
     166        if not self._node.have_UEB:
     167            if not self._satisfy_UEB():
     168                # can't check any hashes without the UEB
     169                return False
     170
     171        segnum, observers = self._active_segnum_and_observers()
      172        if segnum is not None and segnum >= self._node.num_segments:
     173            for o in observers:
     174                o.notify(state=BADSEGNUM)
     175            self._requested_blocks.pop(0)
     176            return True
     177
      178        if self._node.share_hash_tree.needed_hashes(self._shnum):
     179            if not self._satisfy_share_hash_tree():
     180                # can't check block_hash_tree without a root
     181                return False
     182
     183        if segnum is None:
     184            return False # we don't want any particular segment right now
     185
     186        # block_hash_tree
      187        needed_hashes = self._commonshare._block_hash_tree.needed_hashes(segnum)
     188        if needed_hashes:
     189            if not self._satisfy_block_hash_tree(needed_hashes):
     190                # can't check block without block_hash_tree
     191                return False
     192
     193        # data blocks
     194        return self._satisfy_data_block(segnum, observers)
     195
     196    def _satisfy_offsets(self):
     197        version_s = self._received_data.get(0, 4)
     198        if version_s is None:
     199            return False
     200        (version,) = struct.unpack(">L", version_s)
     201        if version == 1:
     202            table_start = 0x0c
     203            self._fieldsize = 0x4
     204            self._fieldstruct = ">L"
     205        else:
     206            table_start = 0x14
     207            self._fieldsize = 0x8
     208            self._fieldstruct = ">Q"
     209        offset_table_size = 6 * self._fieldsize
     210        table_s = self._received_data.pop(table_start, offset_table_size)
     211        if table_s is None:
     212            return False
      213        fields = struct.unpack(">" + 6*self._fieldstruct[1:], table_s)
     214        offsets = {}
      215        for i,field in enumerate(('data',
      216                                  'plaintext_hash_tree', # UNUSED
      217                                  'crypttext_hash_tree',
      218                                  'block_hashes',
      219                                  'share_hashes',
      220                                  'uri_extension',
      221                                  )):
     222            offsets[field] = fields[i]
     223        self.actual_offsets = offsets
     224        self._received_data.remove(0, 4) # don't need this anymore
     225        return True
     226
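          # layout sketch, inferred from the constants above: a v1 share
          # starts with a 4-byte big-endian version, two more 4-byte header
          # fields, then the six-entry offset table in [0x0c:0x24); v2
          # widens the fields to 8 bytes, putting its table at [0x14:0x44).
          # E.g. struct.unpack(">LLLLLL", share[0x0c:0x24]) recovers the
          # six v1 offsets (data, plaintext_hash_tree, crypttext_hash_tree,
          # block_hashes, share_hashes, uri_extension).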
     227    def _satisfy_UEB(self):
     228        o = self.actual_offsets
     229        fsize = self._fieldsize
     230        rdata = self._received_data
     231        UEB_length_s = rdata.get(o["uri_extension"], fsize)
     232        if not UEB_length_s:
     233            return False
      234        (UEB_length,) = struct.unpack(self._fieldstruct, UEB_length_s)
     235        UEB_s = rdata.pop(o["uri_extension"]+fsize, UEB_length)
     236        if not UEB_s:
     237            return False
     238        rdata.remove(o["uri_extension"], fsize)
     239        try:
     240            self._node.validate_and_store_UEB(UEB_s)
     241            self.actual_segment_size = self._node.segment_size
     242            assert self.actual_segment_size is not None
     243            return True
     244        except hashtree.BadHashError:
     245            # TODO: if this UEB was bad, we'll keep trying to validate it
     246            # over and over again. Only log.err on the first one, or better
     247            # yet skip all but the first
     248            f = Failure()
     249            self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
     250            return False
     251
     252    def _satisfy_share_hash_tree(self):
     253        # the share hash chain is stored as (hashnum,hash) tuples, so you
     254        # can't fetch just the pieces you need, because you don't know
     255        # exactly where they are. So fetch everything, and parse the results
     256        # later.
     257        o = self.actual_offsets
     258        rdata = self._received_data
     259        hashlen = o["uri_extension"] - o["share_hashes"]
     260        assert hashlen % (2+HASH_SIZE) == 0
     261        hashdata = rdata.get(o["share_hashes"], hashlen)
     262        if not hashdata:
     263            return False
     264        share_hashes = {}
     265        for i in range(0, hashlen, 2+HASH_SIZE):
     266            hashnum = struct.unpack(">H", hashdata[i:i+2])[0]
     267            hashvalue = hashdata[i+2:i+2+HASH_SIZE]
     268            share_hashes[hashnum] = hashvalue
     269        try:
     270            self._node.process_share_hashes(share_hashes)
     271            # adds to self._node.share_hash_tree
     272            rdata.remove(o["share_hashes"], hashlen)
     273            return True
     274        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
     275                IndexError):
     276            f = Failure()
     277            self._signal_corruption(f, o["share_hashes"], hashlen)
     278            return False
     279
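          # for reference, the uploader's encoding of this chain is roughly
          # the inverse (a sketch, not the actual layout.py code):
          #   "".join([struct.pack(">H", num) + h
          #            for (num, h) in sorted(share_hashes.items())])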
      280    def _signal_corruption(self, f, start, length):
      281        # there was corruption somewhere in the given range
      282        log.err(failure=f, format="corruption in [%(start)d:+%(length)d)",
      283                start=start, length=length, parent=self._lp, level=log.WEIRD)
     284
     285    def _satisfy_block_hash_tree(self, needed_hashes):
     286        o = self.actual_offsets
     287        rdata = self._received_data
     288        block_hashes = {}
     289        for hashnum in needed_hashes:
     290            hashdata = rdata.get(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
     291            if hashdata:
     292                block_hashes[hashnum] = hashdata
     293            else:
     294                return False # missing some hashes
     295        # note that we don't submit any hashes to the block_hash_tree until
     296        # we've gotten them all, because the hash tree will throw an
     297        # exception if we only give it a partial set (which it therefore
     298        # cannot validate)
      299        ok = self._commonshare.process_block_hashes(block_hashes, self._peerid_s)
     300        if not ok:
     301            return False
     302        for hashnum in needed_hashes:
     303            rdata.remove(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
     304        return True
     305
     306    def _satisfy_data_block(self, segnum, observers):
     307        tail = (segnum == self._node.num_segments-1)
     308        datastart = self.actual_offsets["data"]
     309        blockstart = datastart + segnum * self._node.block_size
     310        blocklen = self._node.block_size
     311        if tail:
     312            blocklen = self._node.tail_block_size
     313
      314        block = self._received_data.pop(blockstart, blocklen)
     315        if not block:
     316            return False
     317        # this block is being retired, either as COMPLETE or CORRUPT, since
     318        # no further data reads will help
     319        assert self._requested_blocks[0][0] == segnum
      320        ok = self._commonshare.check_block(segnum, block)
     321        if ok:
     322            for o in observers:
     323                # goes to SegmentFetcher._block_request_activity
     324                o.notify(state=COMPLETE, block=block)
     325        else:
     326            for o in observers:
     327                o.notify(state=CORRUPT)
     328        self._requested_blocks.pop(0) # retired
     329        return True # got satisfaction
     330
     331    def _desire(self):
     332        segnum, observers = self._active_segnum_and_observers()
     333        fsize = self._fieldsize
     334        rdata = self._received_data
     335        commonshare = self._commonshare
     336
     337        if not self.actual_offsets:
     338            self._desire_offsets()
     339
     340        # we can use guessed offsets as long as this server tolerates overrun
     341        if not self.actual_offsets and not self._overrun_ok:
     342            return # must wait for the offsets to arrive
     343
     344        o = self.actual_offsets or self.guessed_offsets
     345        segsize = self.actual_segment_size or self.guessed_segment_size
     346        if not self._node.have_UEB:
     347            self._desire_UEB(o)
     348
      349        if self._node.share_hash_tree.needed_hashes(self._shnum):
     350            hashlen = o["uri_extension"] - o["share_hashes"]
     351            self._wanted.add(o["share_hashes"], hashlen)
     352
     353        if segnum is None:
     354            return # only need block hashes or blocks for active segments
     355
     356        # block hash chain
      357        for hashnum in commonshare._block_hash_tree.needed_hashes(segnum):
     358            self._wanted.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
     359
     360        # data
      361        r = self._node._calculate_sizes(segsize, self._node._verifycap.size, self._node._verifycap.needed_shares)
      362        tail = (segnum == r["num_segments"]-1)
     363        datastart = o["data"]
     364        blockstart = datastart + segnum * r["block_size"]
     365        blocklen = r["block_size"]
     366        if tail:
     367            blocklen = r["tail_block_size"]
     368        self._wanted_blocks.add(blockstart, blocklen)
     369
     370
     371    def _desire_offsets(self):
     372        if self._overrun_ok:
     373            # easy! this includes version number, sizes, and offsets
     374            self._wanted.add(0,1024)
     375            return
     376
     377        # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
     378        # To be conservative, only request the data that we know lives there,
     379        # even if that means more roundtrips.
     380
     381        self._wanted.add(0,4)  # version number, always safe
     382        version_s = self._received_data.get(0, 4)
     383        if not version_s:
     384            return
     385        (version,) = struct.unpack(">L", version_s)
     386        if version == 1:
     387            table_start = 0x0c
     388            fieldsize = 0x4
     389        else:
     390            table_start = 0x14
     391            fieldsize = 0x8
     392        offset_table_size = 6 * fieldsize
     393        self._wanted.add(table_start, offset_table_size)
     394
     395    def _desire_UEB(self, o):
     396        # UEB data is stored as (length,data).
     397        if self._overrun_ok:
     398            # We can pre-fetch 2kb, which should probably cover it. If it
     399            # turns out to be larger, we'll come back here later with a known
     400            # length and fetch the rest.
     401            self._wanted.add(o["uri_extension"], 2048)
     402            # now, while that is probably enough to fetch the whole UEB, it
     403            # might not be, so we need to do the next few steps as well. In
     404            # most cases, the following steps will not actually add anything
     405            # to self._wanted
     406
     407        self._wanted.add(o["uri_extension"], self._fieldsize)
     408        # only use a length if we're sure it's correct, otherwise we'll
     409        # probably fetch a huge number
     410        if not self.actual_offsets:
     411            return
      412        UEB_length_s = self._received_data.get(o["uri_extension"], self._fieldsize)
      413        if UEB_length_s:
      414            (UEB_length,) = struct.unpack(self._fieldstruct, UEB_length_s)
     415            # we know the length, so make sure we grab everything
     416            self._wanted.add(o["uri_extension"]+self._fieldsize, UEB_length)
     417
     418    def _request_needed(self):
     419        # send requests for metadata first, to avoid hanging on to large data
     420        # blocks any longer than necessary.
     421        self._send_requests(self._wanted - self._received - self._requested)
     422        # then send requests for data blocks. All the hashes should arrive
     423        # before the blocks, so the blocks can be consumed and released in a
     424        # single turn.
     425        ask = self._wanted_blocks - self._received - self._requested
     426        self._send_requests(ask)
     427
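          # sketch of the Spans set-arithmetic assumed here: a Spans is a
          # set of byte ranges supporting add(), remove(), subtraction, and
          # iteration as (start, length) pairs. For example:
          #   w = Spans(); w.add(0, 10); w.add(20, 5)
          #   r = Spans(); r.add(0, 4)
          #   list(w - r)  =>  [(4, 6), (20, 5)]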
     428    def _send_requests(self, needed):
     429        for (start, length) in needed:
     430            # TODO: quantize to reasonably-large blocks
     431            self._requested.add(start, length)
     432            d = self._send_request(start, length)
     433            d.addCallback(self._got_data, start, length)
      434            d.addErrback(self._got_error, start, length)
     435            d.addErrback(lambda f:
     436                         log.err(format="unhandled error during send_request",
     437                                 failure=f, parent=self._lp,
     438                                 level=log.WEIRD, umid="qZu0wg"))
     439
     440    def _send_request(self, start, length):
     441        return self._rref.callRemote("read", start, length)
     442
     443    def _got_data(self, data, start, length):
     444        span = (start, length)
     445        assert span in self._requested
     446        self._requested.remove(start, length)
     447        self._received.add(start, length)
     448        self._received_data.add(start, data)
     449        eventually(self.loop)
     450
      451    def _got_error(self, f, start, length):
     452        log.msg(format="error requesting %(start)d+%(length)d"
     453                " from %(server)s for si %(si)s",
     454                start=start, length=length,
     455                server=self._peerid_s, si=self._si_prefix,
     456                failure=f, parent=self._lp,
     457                level=log.UNUSUAL, umid="qZu0wg")
     458        # retire our observers, assuming we won't be able to make any
     459        # further progress
     460        self._fail(f)
     461
     462    def _fail(self, f):
      463        for (segnum, observers) in self._requested_blocks:
      464            for o in observers: o.notify(state=DEAD, f=f)
     465
     466
     467class CommonShare:
     468    """I hold data that is common across all instances of a single share,
     469    like sh2 on both servers A and B. This is just the block hash tree.
     470    """
     471    def __init__(self, numsegs, si_s, shnum):
     472        self.si_s = si_s
     473        self.shnum = shnum
     474        if numsegs is not None:
     475            self._block_hash_tree = IncompleteHashTree(numsegs)
     476
     477    def got_numsegs(self, numsegs):
     478        self._block_hash_tree = IncompleteHashTree(numsegs)
     479
     480    def process_block_hashes(self, block_hashes, serverid_s):
     481        try:
      482            self._block_hash_tree.set_hashes(block_hashes)
     483            return True
     484        except (hashtree.BadHashError, hashtree.NotEnoughHashesError):
      485            hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
      486            log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
      487                    " shnum=%(shnum)d SI=%(si)s server=%(server)s",
      488                    hashnums=hashnums, shnum=self.shnum,
      489                    si=self.si_s, server=serverid_s,
      490                    level=log.WEIRD, umid="yNyFdA")
     491        return False
     492
     493    def check_block(self, segnum, block):
     494        h = hashutil.block_hash(block)
     495        try:
     496            self._block_hash_tree.set_hashes(leaves={segnum: h})
     497        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
      498            log.msg("hash failure in check_block: %s" % (le,), level=log.WEIRD)
     499            return False
     500        return True
     501
     502# all classes are also Services, and the rule is that you don't initiate more
     503# work unless self.running
     504
     505# GC: decide whether each service is restartable or not. For non-restartable
     506# services, stopService() should delete a lot of attributes to kill reference
     507# cycles. The primary goal is to decref remote storage BucketReaders when a
     508# download is complete.
     509
     510class SegmentFetcher:
     511    """I am responsible for acquiring blocks for a single segment. I will use
     512    the Share instances passed to my add_shares() method to locate, retrieve,
     513    and validate those blocks. I expect my parent node to call my
     514    no_more_shares() method when there are no more shares available. I will
     515    call my parent's want_more_shares() method when I want more: I expect to
     516    see at least one call to add_shares or no_more_shares afterwards.
     517
     518    When I have enough validated blocks, I will call my parent's
     519    process_blocks() method with a dictionary that maps shnum to blockdata.
     520    If I am unable to provide enough blocks, I will call my parent's
     521    fetch_failed() method with (self, f). After either of these events, I
     522    will shut down and do no further work. My parent can also call my stop()
     523    method to have me shut down early."""
     524
     525    def __init__(self, node, segnum, k):
     526        self._node = node # _Node
     527        self.segnum = segnum
     528        self._k = k
     529        self._shares = {} # maps non-dead Share instance to a state, one of
     530                          # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
     531                          # State transition map is:
     532                          #  AVAILABLE -(send-read)-> PENDING
     533                          #  PENDING -(timer)-> OVERDUE
     534                          #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
     535                          #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
     536                          # If a share becomes DEAD, it is removed from the
     537                          # dict. If it becomes BADSEGNUM, the whole fetch is
     538                          # terminated.
     539        self._share_observers = {} # maps Share to Observer2 for active ones
     540        self._shnums = DictOfSets() # maps shnum to the shares that provide it
     541        self._blocks = {} # maps shnum to validated block data
     542        self._no_more_shares = False
     543        self._bad_segnum = False
     544        self._last_failure = None
     545        self._running = True
     546
     547    def stop(self):
     548        self._cancel_all_requests()
     549        self._running = False
     550        del self._shares # let GC work # ???
     551
     552
     553    # called by our parent CiphertextFileNode
     554
     555    def add_shares(self, shares):
     556        # called when ShareFinder locates a new share, and when a non-initial
     557        # segment fetch is started and we already know about shares from the
     558        # previous segment
     559        for s in shares:
     560            self._shares[s] = AVAILABLE
      561            self._shnums.add(s.shnum, s)
     562        eventually(self._loop)
     563
     564    def no_more_shares(self):
      565        self._no_more_shares = True # ShareFinder reached the end of its list
      566        eventually(self._loop)
     567
     568    # internal methods
     569
     570    def _count_shnums(self, *states):
     571        """shnums for which at least one state is in the following list"""
     572        shnums = []
     573        for shnum,shares in self._shnums.iteritems():
      574            matches = [s for s in shares if self._shares[s] in states]
     575            if matches:
     576                shnums.append(shnum)
     577        return len(shnums)
     578
     579    def _loop(self):
     580        k = self._k
     581        if not self._running:
     582            return
     583        if self._bad_segnum:
     584            # oops, we were asking for a segment number beyond the end of the
     585            # file. This is an error.
     586            self.stop()
      587            e = BadSegmentNumberError("%d >= %d" % (self.segnum,
     588                                                   self._node.num_segments))
     589            f = Failure(e)
     590            self._node.fetch_failed(self, f)
     591            return
     592
     593        # are we done?
     594        if self._count_shnums(COMPLETE) >= k:
     595            # yay!
     596            self.stop()
     597            self._node.process_blocks(self.segnum, self._blocks)
     598            return
     599
     600        # we may have exhausted everything
     601        if (self._no_more_shares and
     602            self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
     603            # no more new shares are coming, and the remaining hopeful shares
     604            # aren't going to be enough. boo!
     605            self.stop()
     606            format = ("ran out of shares: %(complete)d complete,"
     607                      " %(pending)d pending, %(overdue)d overdue,"
      608                      " %(unused)d unused, need %(k)d."
     609                      " Last failure: %(last_failure)s")
     610            args = {"complete": self._count_shnums(COMPLETE),
     611                    "pending": self._count_shnums(PENDING),
     612                    "overdue": self._count_shnums(OVERDUE),
     613                    "unused": self._count_shnums(AVAILABLE), # should be zero
     614                    "k": k,
     615                    "last_failure": self._last_failure,
     616                    }
     617            self.log(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
     618            e = NotEnoughShares(format % args)
     619            f = Failure(e)
     620            self._node.fetch_failed(self, f)
     621            return
     622
     623        # nope, not done. Are we "block-hungry" (i.e. do we want to send out
     624        # more read requests, or do we think we have enough in flight
     625        # already?)
     626        while self._count_shnums(PENDING, COMPLETE) < k:
     627            # we're hungry.. are there any unused shares?
     628            sent = self._send_new_request()
     629            if not sent:
     630                break
     631
     632        # ok, now are we "share-hungry" (i.e. do we have enough known shares
     633        # to make us happy, or should we ask the ShareFinder to get us more?)
     634        if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
     635            # we're hungry for more shares
     636            self._node.want_more_shares()
     637            # that will trigger the ShareFinder to keep looking
     638
     639    def _find_one(self, shares, state):
     640        # TODO could choose fastest
     641        for s in shares:
     642            if self._shares[s] == state:
     643                return s
     644        raise IndexError("shouldn't get here")
     645
     646    def _send_new_request(self):
     647        for shnum,shares in self._shnums.iteritems():
     648            states = [self._shares[s] for s in shares]
     649            if COMPLETE in states or PENDING in states:
     650                # don't send redundant requests
     651                continue
     652            if AVAILABLE not in states:
     653                # no candidates for this shnum, move on
     654                continue
     655            # here's a candidate. Send a request.
     656            s = self._find_one(shares, AVAILABLE)
     657            self._shares[s] = PENDING
      658            self._share_observers[s] = o = s.get_block(self.segnum)
     659            o.subscribe(self._block_request_activity, share=s, shnum=shnum)
     660            # TODO: build up a list of candidates, then walk through the
      661            # list, sending requests to the most desirable servers,
     662            # re-checking our block-hunger each time. For non-initial segment
     663            # fetches, this would let us stick with faster servers.
     664            return True
     665        # nothing was sent: don't call us again until you have more shares to
     666        # work with, or one of the existing shares has been declared OVERDUE
     667        return False
     668
     669    def _cancel_all_requests(self):
     670        for o in self._share_observers.values():
     671            o.cancel()
     672        self._share_observers = {}
     673
     674    def _block_request_activity(self, share, shnum, state, block=None, f=None):
     675        # called by Shares, in response to our s.send_request() calls.
     676        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
     677        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
     678            del self._share_observers[share]
     679        if state is COMPLETE:
     680            # 'block' is fully validated
     681            self._shares[share] = COMPLETE
     682            self._blocks[shnum] = block
     683        elif state is OVERDUE:
     684            self._shares[share] = OVERDUE
     685            # OVERDUE is not terminal: it will eventually transition to
     686            # COMPLETE, CORRUPT, or DEAD.
     687        elif state is CORRUPT:
     688            self._shares[share] = CORRUPT
     689        elif state is DEAD:
     690            del self._shares[share]
     691            self._shnums[shnum].remove(share)
     692            self._last_failure = f
     693        elif state is BADSEGNUM:
     694            self._shares[share] = BADSEGNUM # ???
     695            self._bad_segnum = True
     696        eventually(self._loop)
     697
     698
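      # DictOfSets (used by SegmentFetcher above) is assumed to be the
      # allmydata.util.dictutil helper: a dict whose add(key, value) method
      # creates the value-set on first use. A minimal sketch:
      #
      #   class DictOfSets(dict):
      #       def add(self, key, value):
      #           if key in self:
      #               self[key].add(value)
      #           else:
      #               self[key] = set([value])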
     699class RequestToken:
     700    def __init__(self, peerid):
     701        self.peerid = peerid
     702
     703class ShareFinder:
     704    def __init__(self, storage_broker, storage_index,
     705                 share_consumer, max_outstanding_requests=10):
     706        # XXX need self.node -> CiphertextFileNode
     707        self.running = True
      708        self._storage_index = storage_index
      709        self._servers = iter(storage_broker.get_servers_for_index(storage_index))
     710        self.share_consumer = share_consumer
     711        self.max_outstanding = max_outstanding_requests
     712
     713        self._hungry = False
     714
     715        self._commonshares = {} # shnum to CommonShare instance
     716        self.undelivered_shares = []
     717        self.pending_requests = set()
     718
     719        self._si_s = base32.b2a(storage_index)
     720        self._si_prefix = base32.b2a_l(storage_index[:8], 60)
     721        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
     722                           si=self._si_prefix, level=log.NOISY, umid="2xjj2A")
     723
     724        self._num_segments = None
     725        d = share_consumer.get_num_segments()
     726        d.addCallback(self._got_numsegs)
     727        def _err_numsegs(f):
     728            log.err(format="Unable to get number of segments", failure=f,
     729                    parent=self._lp, level=log.UNUSUAL, umid="dh38Xw")
     730        d.addErrback(_err_numsegs)
     731
     732    def log(self, *args, **kwargs):
     733        if "parent" not in kwargs:
     734            kwargs["parent"] = self._lp
     735        return log.msg(*args, **kwargs)
     736
     737    def stop(self):
     738        self.running = False
     739
     740    def _got_numsegs(self, numsegs):
     741        for cs in self._commonshares.values():
     742            cs.got_numsegs(numsegs)
     743        self._num_segments = numsegs
     744
     745    # called by our parent CiphertextDownloader
     746    def hungry(self):
     747        log.msg(format="ShareFinder[si=%(si)s] hungry",
     748                si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
     749        self._hungry = True
     750        eventually(self.loop)
     751
     752    # internal methods
     753    def loop(self):
     754        log.msg(format="ShareFinder[si=%(si)s] loop: running=%(running)s"
     755                " hungry=%(hungry)s, undelivered=%(undelivered)s,"
     756                " pending=%(pending)s",
      757                si=self._si_prefix, running=self.running, hungry=self._hungry,
     758                undelivered=",".join(["sh%d@%s" % (s._shnum,
     759                                                   idlib.shortnodeid_b2a(s._peerid))
     760                                      for s in self.undelivered_shares]),
     761                pending=",".join([idlib.shortnodeid_b2a(rt.peerid)
     762                                  for rt in self.pending_requests]), # sort?
     763                level=log.NOISY, umid="kRtS4Q")
     764        if not self.running:
     765            return
     766        if not self._hungry:
     767            return
     768        if self.undelivered_shares:
     769            sh = self.undelivered_shares.pop(0)
     770            # they will call hungry() again if they want more
     771            self._hungry = False
     772            eventually(self.share_consumer.got_shares, [sh])
     773            return
      774        if len(self.pending_requests) >= self.max_outstanding:
     775            # cannot send more requests, must wait for some to retire
     776            return
     777
     778        server = None
     779        try:
     780            if self._servers:
     781                server = self._servers.next()
     782        except StopIteration:
     783            self._servers = None
     784
     785        if server:
     786            self.send_request(server)
     787            return
     788
     789        if self.pending_requests:
     790            # no server, but there are still requests in flight: maybe one of
     791            # them will make progress
     792            return
     793
     794        # we've run out of servers (so we can't send any more requests), and
     795        # we have nothing in flight. No further progress can be made. They
     796        # are destined to remain hungry.
     797        self.share_consumer.no_more_shares()
     798        self.stop()
     799
     800
     801    def send_request(self, server):
     802        peerid, rref = server
     803        req = RequestToken(peerid)
     804        self.pending_requests.add(req)
     805        lp = self.log(format="sending DYHB to [%(peerid)s]",
     806                      peerid=idlib.shortnodeid_b2a(peerid),
     807                      level=log.NOISY, umid="Io7pyg")
     808        d = rref.callRemote("get_buckets", self._storage_index)
     809        d.addBoth(incidentally, self.pending_requests.discard, req)
      810        d.addCallbacks(self._got_response, self._got_error,
      811                       callbackArgs=(peerid, req, lp), errbackArgs=(peerid, req, lp))
     812        d.addErrback(log.err, format="error in send_request",
     813                     level=log.WEIRD, parent=lp, umid="rpdV0w")
     814        d.addCallback(incidentally, eventually, self.loop)
     815
     816    def _got_response(self, buckets, peerid, req, lp):
     817        if buckets:
     818            shnums_s = ",".join([str(shnum) for shnum in buckets])
      819            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
      820                     shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
      821                     level=log.NOISY, parent=lp, umid="0fcEZw")
     822        else:
     823            self.log(format="no shares from [%(peerid)s]",
     824                     peerid=idlib.shortnodeid_b2a(peerid),
     825                     level=log.NOISY, parent=lp, umid="U7d4JA")
     826        for shnum, bucket in buckets.iteritems():
     827            if shnum not in self._commonshares:
     828                self._commonshares[shnum] = CommonShare(self._num_segments,
     829                                                        self._si_s, shnum)
     830            cs = self._commonshares[shnum]
      831            s = Share(bucket, self.share_consumer._verifycap, cs,
      832                      self.share_consumer, peerid, self._si_s, shnum)
     833            self.undelivered_shares.append(s)
     834
      835    def _got_error(self, f, peerid, req, lp):
     836        self.log(format="got error from [%(peerid)s]",
     837                 peerid=idlib.shortnodeid_b2a(peerid), failure=f,
     838                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")
     839
     840
     841
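      # the incidentally() helper used in send_request() above is assumed to
      # run a side-effect and pass the Deferred result through unchanged; a
      # sketch:
      #
      #   def incidentally(res, f, *args, **kwargs):
      #       f(*args, **kwargs)
      #       return res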
     842class Segmentation:
     843    """I am responsible for a single offset+size read of the file. I handle
     844    segmentation: I figure out which segments are necessary, request them
     845    (from my CiphertextDownloader) in order, and trim the segments down to
     846    match the offset+size span. I use the Producer/Consumer interface to only
     847    request one segment at a time.
     848    """
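          # sketch of the math: the segments covering
          # file[offset:offset+size] run from offset//segsize through
          # (offset+size-1)//segsize. With segsize=131073, offset=300000,
          # size=50000, that is just segment 2, since 2*131073 = 262146
          # <= 300000 and 349999 < 3*131073 = 393219.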
     849    implements(IPushProducer)
     850    def __init__(self, node, offset, size, consumer):
     851        self._node = node
     852        self._hungry = True
     853        self._active_segnum = None
     854        self._cancel_segment_request = None
     855        # these are updated as we deliver data. At any given time, we still
     856        # want to download file[offset:offset+size]
     857        self._offset = offset
     858        self._size = size
     859        self._consumer = consumer
     860
     861    def start(self):
     862        self._alive = True
     863        self._deferred = defer.Deferred()
      864        self._consumer.registerProducer(self, True) # we are a streaming IPushProducer
     865        self._maybe_fetch_next()
     866        return self._deferred
     867
     868    def _maybe_fetch_next(self):
     869        if not self._alive or not self._hungry:
     870            return
     871        if self._active_segnum is not None:
     872            return
     873        self._fetch_next()
     874
     875    def _fetch_next(self):
     876        if self._size == 0:
     877            # done!
     878            self._alive = False
     879            self._hungry = False
     880            self._consumer.unregisterProducer()
     881            self._deferred.callback(self._consumer)
     882            return
     883        n = self._node
     884        have_actual_segment_size = n.segment_size is not None
     885        segment_size = n.segment_size or n.guessed_segment_size
     886        if self._offset == 0:
     887            # great! we want segment0 for sure
     888            wanted_segnum = 0
     889        else:
     890            # this might be a guess
     891            wanted_segnum = self._offset // segment_size
     892        self._active_segnum = wanted_segnum
     893        d,c = n.get_segment(wanted_segnum)
     894        self._cancel_segment_request = c
     895        d.addBoth(self._request_retired)
     896        d.addCallback(self._got_segment, have_actual_segment_size)
     897        d.addErrback(self._retry_bad_segment, have_actual_segment_size)
     898        d.addErrback(self._error)
     899
     900    def _request_retired(self, res):
     901        self._active_segnum = None
     902        self._cancel_segment_request = None
     903        return res
     904
     905    def _got_segment(self, (segment_start,segment), had_actual_segment_size):
     906        self._active_segnum = None
     907        self._cancel_segment_request = None
     908        # we got file[segment_start:segment_start+len(segment)]
     909        # we want file[self._offset:self._offset+self._size]
     910        o = overlap(segment_start, len(segment),  self._offset, self._size)
     911        # the overlap is file[o[0]:o[0]+o[1]]
     912        if not o or o[0] != self._offset:
     913            # we didn't get the first byte, so we can't use this segment
      914            if had_actual_segment_size:
      915                # and we should have gotten it right. This is a big problem.
      916                raise AssertionError("wrong segment fetched despite known segsize")
     917            # we've wasted some bandwidth, but now we can grab the right one,
     918            # because we should know the segsize by now.
     919            assert self._node.segment_size is not None
     920            self._maybe_fetch_next()
     921            return
     922        offset_in_segment = self._offset - segment_start
     923        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]
     924
     925        self._offset += len(desired_data)
     926        self._size -= len(desired_data)
     927        self._consumer.write(desired_data)
     928        self._maybe_fetch_next()
     929
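          # sketch of the overlap() helper assumed above: intersect
          # [start1,start1+len1) with [start2,start2+len2), returning
          # (start, length) or None if the ranges are disjoint:
          #   def overlap(start1, len1, start2, len2):
          #       start = max(start1, start2)
          #       end = min(start1+len1, start2+len2)
          #       if start >= end:
          #           return None
          #       return (start, end-start)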
     930    def _retry_bad_segment(self, f, had_actual_segment_size):
      931        f.trap(BadSegmentNumberError) # guessed way wrong, off the end
     932        if had_actual_segment_size:
     933            # but we should have known better, so this is a real error
     934            return f
     935        # we didn't know better: try again with more information
     936        return self._maybe_fetch_next()
     937
     938    def _error(self, f):
     939        self._alive = False
     940        self._hungry = False
     941        self._consumer.unregisterProducer()
     942        self._deferred.errback(f)
     943
     944    def stopProducing(self):
     945        self._hungry = False
     946        self._alive = False
     947        # cancel any outstanding segment request
     948        if self._cancel_segment_request:
     949            self._cancel_segment_request()
     950            self._cancel_segment_request = None
     951    def pauseProducing(self):
     952        self._hungry = False
     953    def resumeProducing(self):
     954        self._hungry = True
     955        eventually(self._maybe_fetch_next)
     956
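      # a consumer handed to Segmentation (via _Node.read, below) needs only
      # the three IConsumer methods used above; a usage sketch, assuming
      # 'node' is an already-built _Node:
      #
      #   class MemoryConsumer:
      #       def __init__(self):
      #           self.chunks = []
      #       def registerProducer(self, producer, streaming):
      #           self.producer = producer
      #       def unregisterProducer(self):
      #           self.producer = None
      #       def write(self, data):
      #           self.chunks.append(data)
      #
      #   d = node.read(MemoryConsumer(), offset=0, size=1024)
      #   d.addCallback(lambda c: "".join(c.chunks))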
     957class Cancel:
     958    def __init__(self, f):
     959        self._f = f
     960        self.cancelled = False
     961    def cancel(self):
     962        if not self.cancelled:
     963            self.cancelled = True
     964            self._f(self)
     965
     966class _Node:
     967    """Internal class which manages downloads and holds state. External
     968    callers use CiphertextFileNode instead."""
     969
     970    # Share._node points to me
     971    def __init__(self, verifycap, storage_broker, secret_holder,
     972                 terminator, history):
     973        assert isinstance(verifycap, CHKFileVerifierURI)
     974        self._verifycap = verifycap
     975        self.running = True
     976        terminator.register(self) # calls self.stop() at stopService()
     977        # the rules are:
     978        # 1: Only send network requests if you're active (self.running is True)
     979        # 2: Use TimerService, not reactor.callLater
     980        # 3: You can do eventual-sends any time.
     981        # These rules should mean that once
     982        # stopService()+flushEventualQueue() fires, everything will be done.
     983        self._secret_holder = secret_holder
     984        self._history = history
     985
     986        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
     987        self.share_hash_tree = IncompleteHashTree(N)
     988
     989        # we guess the segment size, so Segmentation can pull non-initial
     990        # segments in a single roundtrip
     991        max_segment_size = 128*KiB # TODO: pull from elsewhere, maybe the
     992                                   # same place as upload.BaseUploadable
     993        s = mathutil.next_multiple(min(verifycap.size, max_segment_size), k)
     994        self.guessed_segment_size = s
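          # e.g. a 1 MB file with k=3: min(1000000, 131072) = 131072, which
          # is not a multiple of 3, so we guess next_multiple(131072, 3) =
          # 131073 (the real segment size, learned later from the UEB, may
          # differ)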
     995
     996        # filled in when we parse a valid UEB
     997        self.have_UEB = False
     998        self.segment_size = None
     999        self.tail_segment_size = None
     1000        self.tail_segment_padded = None
     1001        self.num_segments = None
     1002        self.block_size = None
     1003        self.tail_block_size = None
     1004        self.ciphertext_hash_tree = None # size depends on num_segments
     1005        self.ciphertext_hash = None # flat hash, optional
     1006
     1007        # things to track callers that want data
     1008        self._segsize_observers = OneShotObserverList()
     1009        self._numsegs_observers = OneShotObserverList()
     1010        # _segment_requests can have duplicates
     1011        self._segment_requests = [] # (segnum, d, cancel_handle)
     1012        self._active_segment = None # a SegmentFetcher, with .segnum
     1013
     1014        storage_index = verifycap.storage_index
     1015        self._sharefinder = ShareFinder(storage_broker, storage_index, self)
     1016        self._shares = set()
     1017
     1018    def stop(self):
     1019        # called by the Terminator at shutdown, mostly for tests
     1020        if self._active_segment:
     1021            self._active_segment.stop()
     1022            self._active_segment = None
     1023        self._sharefinder.stop()
     1024
     1025    # things called by outside callers, via CiphertextFileNode. get_segment()
     1026    # may also be called by Segmentation.
     1027
     1028    def read(self, consumer, offset=0, size=None):
     1029        """I am the main entry point, from which FileNode.read() can get
     1030        data. I feed the consumer with the desired range of ciphertext. I
     1031        return a Deferred that fires (with the consumer) when the read is
     1032        finished."""
     1033        # for concurrent operations: each gets its own Segmentation manager
     1034        if size is None:
     1035            size = self._verifycap.size - offset
     1036        s = Segmentation(self, offset, size, consumer)
     1037        # this raises an interesting question: what segments to fetch? if
     1038        # offset=0, always fetch the first segment, and then allow
     1039        # Segmentation to be responsible for pulling the subsequent ones if
     1040        # the first wasn't large enough. If offset>0, we're going to need an
     1041        # extra roundtrip to get the UEB (and therefore the segment size)
     1042        # before we can figure out which segment to get. TODO: allow the
     1043        # offset-table-guessing code (which starts by guessing the segsize)
     1044        # to assist the offset>0 process.
     1045        d = s.start()
     1046        return d
     1047
     1048    def get_segment(self, segnum):
     1049        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
     1050        Deferred that fires with (offset,data) when the desired segment is
     1051        available, and c is an object on which c.cancel() can be called to
     1052        disavow interest in the segment (after which 'd' will never fire).
     1053
     1054        You probably need to know the segment size before calling this,
     1055        unless you want the first few bytes of the file. If you ask for a
     1056        segment number which turns out to be too large, the Deferred will
     1057        errback with BadSegmentNumberError.
     1058
     1059        The Deferred fires with the offset of the first byte of the data
     1060        segment, so that you can call get_segment() before knowing the
     1061        segment size, and still know which data you received.
     1062        """
     1063        d = defer.Deferred()
     1064        c = Cancel(self._cancel_request)
     1065        self._segment_requests.append( (segnum, d, c) )
     1066        self._start_new_segment()
      1067        # (_start_new_segment is a no-op while another fetch is active)
     1068        return (d, c)
     1069
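          # usage sketch:
          #   d, c = node.get_segment(0)
          #   d.addCallback(lambda (offset, data): ...)
          #   # call c.cancel() to lose interest before 'd' fires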
     1070    # things called by the Segmentation object used to transform
     1071    # arbitrary-sized read() calls into quantized segment fetches
     1072
     1073    def get_segment_size(self):
     1074        """I return a Deferred that fires with the segment_size used by this
     1075        file."""
     1076        return self._segsize_observers.when_fired()
     1077    def get_num_segments(self):
     1078        """I return a Deferred that fires with the number of segments used by
     1079        this file."""
     1080        return self._numsegs_observers.when_fired()
     1081
     1082    def _start_new_segment(self):
     1083        if self._active_segment is None and self._segment_requests:
     1084            segnum = self._segment_requests[0][0]
     1085            k = self._verifycap.needed_shares
     1086            self._active_segment = fetcher = SegmentFetcher(self, segnum, k)
      1087        active_shares = list(self._shares) # TODO: filter out DEAD shares
     1088            fetcher.add_shares(active_shares) # this triggers the loop
     1089
     1090
     1091    # called by our child ShareFinder
     1092    def got_shares(self, shares):
     1093        self._shares.update(shares)
      1094        if self._active_segment:
     1095            self._active_segment.add_shares(shares)
     1096    def no_more_shares(self):
     1097        self._no_more_shares = True
     1098        if self._active_segment:
     1099            self._active_segment.no_more_shares()
     1100
     1101    # things called by our Share instances
     1102
     1103    def validate_and_store_UEB(self, UEB_s):
     1104        h = hashutil.uri_extension_hash(UEB_s)
     1105        if h != self._verifycap.uri_extension_hash:
     1106            raise hashutil.BadHashError
      1107        UEB_dict = uri.unpack_extension(UEB_s)
      1108        self._parse_and_store_UEB(UEB_dict) # sets segment_size, num_segments, etc
     1109        # TODO: a malformed (but authentic) UEB could throw an assertion in
     1110        # _parse_and_store_UEB, and we should abandon the download.
     1111        self.have_UEB = True
     1112        self._segsize_observers.fire(self.segment_size)
     1113        self._numsegs_observers.fire(self.num_segments)
     1114
     1115
     1116    def _parse_and_store_UEB(self, d):
     1117        # Note: the UEB contains needed_shares and total_shares. These are
     1118        # redundant and inferior (the filecap contains the authoritative
     1119        # values). However, because it is possible to encode the same file in
     1120        # multiple ways, and the encoders might choose (poorly) to use the
     1121        # same key for both (therefore getting the same SI), we might
     1122        # encounter shares for both types. The UEB hashes will be different,
     1123        # however, and we'll disregard the "other" encoding's shares as
     1124        # corrupted.
     1125
     1126        # therefore, we ignore d['total_shares'] and d['needed_shares'].
     1127
     1128        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
     1129        size = self._verifycap.size
     1130
     1131        self.segment_size = d['segment_size']
     1132
     1133        r = self._calculate_sizes(self.segment_size, size, k)
     1134        self.tail_segment_size = r["tail_segment_size"]
     1135        self.tail_segment_padded = r["tail_segment_padded"]
     1136        self.num_segments = r["num_segments"]
     1137        self.block_size = r["block_size"]
     1138        self.tail_block_size = r["tail_block_size"]
     1139
     1140        # zfec.Decode() instantiation is fast, but still, let's use the same
     1141        # codec instance for all but the last segment. 3-of-10 takes 15us on
     1142        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
     1143        # 2.5ms, worst-case 254-of-255 is 9.3ms
     1144        self._codec = codec.CRSDecoder()
     1145        self._codec.set_params(self.segment_size, k, N)
     1146
     1147
     1148        # Ciphertext hash tree root is mandatory, so that there is at most
     1149        # one ciphertext that matches this read-cap or verify-cap. The
     1150        # integrity check on the shares is not sufficient to prevent the
     1151        # original encoder from creating some shares of file A and other
     1152        # shares of file B.
     1153        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
     1154        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
     1155
     1156        self.share_hash_tree.set_hashes({0: d['share_root_hash']})
     1157
     1158        # crypttext_hash is optional. We only pull this from the first UEB
     1159        # that we see.
     1160        if 'crypttext_hash' in d:
     1161            if len(d["crypttext_hash"]) == hashutil.CRYPTO_VAL_SIZE:
     1162                self.ciphertext_hash = d['crypttext_hash']
     1163            else:
     1164                log.msg("ignoring bad-length UEB[crypttext_hash], "
     1165                        "got %d bytes, want %d" % (len(d['crypttext_hash']),
     1166                                                   hashutil.CRYPTO_VAL_SIZE),
     1167                        umid="oZkGLA", level=log.WEIRD)
     1168
     1169        # Our job is a fast download, not verification, so we ignore any
     1170        # redundant fields. The Verifier uses a different code path which
     1171        # does not ignore them.
     1172
     1173    def _calculate_sizes(self, segment_size, size, k):
     1174        # segments of ciphertext
     1175
     1176        # this assert matches the one in encode.py:127 inside
     1177        # Encoded._got_all_encoding_parameters, where the UEB is constructed
     1178        assert segment_size % k == 0
     1179
     1180        # the last segment is usually short. We don't store a whole segsize,
     1181        # but we do pad the segment up to a multiple of k, because the
     1182        # encoder requires that.
     1183        tail_segment_size = size % segment_size
     1184        if tail_segment_size == 0:
     1185            tail_segment_size = segment_size
     1186        padded = mathutil.next_multiple(tail_segment_size, k)
     1187        tail_segment_padded = padded
     1188
     1189        num_segments = mathutil.div_ceil(size, segment_size)
     1190
     1191        # each segment is turned into N blocks. All but the last are of size
     1192        # block_size, and the last is of size tail_block_size
     1193        block_size = segment_size / k
     1194        tail_block_size = tail_segment_padded / k
     1195
     1196        return { "tail_segment_size": tail_segment_size,
     1197                 "tail_segment_padded": tail_segment_padded,
     1198                 "num_segments": num_segments,
     1199                 "block_size": block_size,
     1200                 "tail_block_size": tail_block_size,
     1201                 }
     1202
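
    # A worked example of the arithmetic above (numbers assumed, not from the
    # patch): size=1000000, k=3, and segment_size=131073 (128KiB rounded up
    # to a multiple of k, as the encoder does) give:
    #   tail_segment_size   = 1000000 % 131073          = 82489
    #   tail_segment_padded = next_multiple(82489, 3)   = 82491
    #   num_segments        = div_ceil(1000000, 131073) = 8
    #   block_size          = 131073 / 3                = 43691
    #   tail_block_size     = 82491 / 3                 = 27497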
     1203
     1204    def process_share_hashes(self, share_hashes):
     1205        self.share_hash_tree.set_hashes(share_hashes)
     1206
     1207    # called by our child SegmentFetcher
     1208
     1209    def want_more_shares(self):
     1210        self._sharefinder.hungry()
     1211
     1212    def fetch_failed(self, sf, f):
     1213        assert sf is self._active_segment
     1214        sf.disownServiceParent()
     1215        self._active_segment = None
     1216        # deliver error upwards
     1217        for (d,c) in self._extract_requests(sf.segnum):
     1218            eventually(self._deliver_error, d, c, f)
     1219
     1220    def _deliver_error(self, d, c, f):
     1221        # this method exists to handle cancel() that occurs between
     1222        # _got_segment and _deliver_error
     1223        if not c.cancelled:
     1224            d.errback(f)
     1225
     1226    def process_blocks(self, segnum, blocks):
     1227        tail = (segnum == self.num_segments-1)
     1228        decoder = self._codec
     1229        if tail:
     1230            # account for the padding in the last segment; use a local
     1231            # name so the codec module itself stays reachable
     1232            decoder = codec.CRSDecoder()
     1233            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
     1234            decoder.set_params(self.tail_segment_padded, k, N)
     1235
     1236        shares = []
     1237        shareids = []
     1238        for (shareid, share) in blocks.iteritems():
     1239            shareids.append(shareid)
     1240            shares.append(share)
     1241        del blocks
     1242        segment = decoder.decode(shares, shareids)
     1243        del shares
     1244        if tail:
     1245            segment = segment[:self.tail_segment_size] # trim zfec padding
     1245        self._process_segment(segnum, segment)
     1246
     1247    def _process_segment(self, segnum, segment):
     1248        h = hashutil.crypttext_segment_hash(segment)
     1249        try:
     1250            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
     1251        except (BadHashError, NotEnoughHashesError):
     1252            raise # TODO: abandon the segment, deliver the failure to read()s
     1253        assert self._active_segment.segnum == segnum
     1254        assert self.segment_size is not None
     1255        offset = segnum * self.segment_size
     1256        for (d,c) in self._extract_requests(segnum):
     1257            eventually(self._deliver, d, c, offset, segment)
     1258        self._active_segment = None
     1259        self._start_new_segment()
     1260
     1261    def _deliver(self, d, c, offset, segment):
     1262        # this method exists to handle cancel() that occurs between
     1263        # _got_segment and _deliver
     1264        if not c.cancelled:
     1265            d.callback((offset,segment))
     1266
     1267    def _extract_requests(self, segnum):
     1268        """Remove matching requests and return their (d,c) tuples so that the
     1269        caller can retire them."""
     1270        retire = [(d,c) for (segnum0, d, c) in self._segment_requests
     1271                  if segnum0 == segnum]
     1272        self._segment_requests = [t for t in self._segment_requests
     1273                                  if t[0] != segnum]
     1274        return retire
     1275
     1276    def _cancel_request(self, c):
     1277        self._segment_requests = [t for t in self._segment_requests
     1278                                  if t[2] != c]
     1279        segnums = [segnum for (segnum,d,c) in self._segment_requests]
     1280        if self._active_segment and self._active_segment.segnum not in segnums:
     1281            self._active_segment.stop()
     1282            self._active_segment = None
     1283            self._start_new_segment()
     1284
     1285class CiphertextFileNode:
     1286    def __init__(self, verifycap, storage_broker, secret_holder,
     1287                 terminator, history):
     1288        assert isinstance(verifycap, CHKFileVerifierURI)
     1289        self._node = _Node(verifycap, storage_broker, secret_holder,
     1290                           terminator, history)
     1291
     1292    def read(self, consumer, offset=0, size=None):
     1293        """I am the main entry point, from which FileNode.read() can get
     1294        data. I feed the consumer with the desired range of ciphertext. I
     1295        return a Deferred that fires (with the consumer) when the read is
     1296        finished."""
     1297        return self._node.read(consumer, offset, size)
     1298
     1299    def get_segment(self, segnum):
     1300        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
     1301        Deferred that fires with (offset,data) when the desired segment is
     1302        available, and c is an object on which c.cancel() can be called to
     1303        disavow interest in the segment (after which 'd' will never fire).
     1304
     1305        You probably need to know the segment size before calling this,
     1306        unless you want the first few bytes of the file. If you ask for a
     1307        segment number which turns out to be too large, the Deferred will
     1308        errback with BadSegmentNumberError.
     1309
     1310        The Deferred fires with the offset of the first byte of the data
     1311        segment, so that you can call get_segment() before knowing the
     1312        segment size, and still know which data you received.
     1313        """
     1314        return self._node.get_segment(segnum)
     1315
     1316
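# A minimal usage sketch for get_segment() (not part of the patch; 'node' is
# assumed to be a CiphertextFileNode built elsewhere):
#
#    (d, c) = node.get_segment(3)
#    def _got((offset, data)):
#        print "segment 3 covers ciphertext [%d:%d)" % (offset, offset+len(data))
#    def _failed(f):
#        f.trap(BadSegmentNumberError) # the file had fewer than 4 segments
#    d.addCallbacks(_got, _failed)
#    #c.cancel() # call instead if we lose interest; 'd' will then never fire
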
     1317class DecryptingConsumer:
     1318    """I sit between a CiphertextDownloader (which acts as a Producer) and
     1319    the real Consumer, decrypting everything that passes by. The real
     1320    Consumer sees the real Producer, but the Producer sees us instead of the
     1321    real consumer."""
     1322    implements(IConsumer)
     1323
     1324    def __init__(self, consumer, readkey, offset):
     1325        self._consumer = consumer
     1326        # TODO: pycryptopp CTR-mode needs random-access operations: I want
     1327        # either a=AES(readkey, offset) or better yet both of:
     1328        #  a=AES(readkey, offset=0)
     1329        #  a.process(ciphertext, offset=xyz)
     1330        # For now, we fake it with the existing iv= argument.
     1331        offset_big = offset // 16
     1332        offset_small = offset % 16
     1333        iv = binascii.unhexlify("%032x" % offset_big)
     1334        self._decryptor = AES(readkey, iv=iv)
     1335        self._decryptor.process("\x00"*offset_small)
     1336
     1337    def registerProducer(self, producer):
     1338        # this passes through, so the real consumer can flow-control the real
     1339        # producer. Therefore we don't need to provide any IPushProducer
     1340        # methods. We implement all the IConsumer methods as pass-throughs,
     1341        # and only intercept write() to perform decryption.
     1342        self._consumer.registerProducer(producer)
     1343    def unregisterProducer(self):
     1344        self._consumer.unregisterProducer()
     1345    def write(self, ciphertext):
     1346        plaintext = self._decryptor.process(ciphertext)
     1347        self._consumer.write(plaintext)
     1348
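# A sketch of the CTR-offset trick above (assumes pycryptopp's AES accepts
# the iv= argument exactly as used in __init__): decrypting from byte
# 'offset' this way yields the same bytes as decrypting from 0 and slicing.
from pycryptopp.cipher.aes import AES as _AES
_key = "k"*16
_ct = _AES(_key, iv="\x00"*16).process("x"*100) # encrypt 100 bytes of "x"
_offset = 37
_iv = binascii.unhexlify("%032x" % (_offset // 16))
_a = _AES(_key, iv=_iv)
_a.process("\x00" * (_offset % 16)) # discard the partial block of keystream
assert _a.process(_ct[_offset:]) == ("x"*100)[_offset:]
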
     1349class ImmutableFileNode:
     1350    # I wrap a CiphertextFileNode with a decryption key
     1351    def __init__(self, filecap, storage_broker, secret_holder, downloader,
     1352                 history):
     1353        assert isinstance(filecap, CHKFileURI)
     1354        verifycap = filecap.get_verify_cap()
     1355        self._cnode = CiphertextFileNode(verifycap, storage_broker,
     1356                                         secret_holder, downloader, history)
     1357        assert isinstance(filecap, CHKFileURI)
     1358        self.u = filecap
     1359        # XXX self._readkey
     1360
     1361    def read(self, consumer, offset=0, size=None):
     1362        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
     1363        return self._cnode.read(decryptor, offset, size)
     1364
     1365
     1366# TODO: if server1 has all shares, and server2-10 have one each, make the
     1367# loop stall slightly before requesting all shares from the first server, to
     1368# give it a chance to learn about the other shares and get some diversity.
     1369# Or, don't bother, let the first block all come from one server, and take
     1370# comfort in the fact that we'll learn about the other servers by the time we
     1371# fetch the second block.
     1372#
     1373# davidsarah points out that we could use sequential (instead of parallel)
     1374# fetching of multiple blocks from a single server: by the time the first
     1375# block arrives, we'll hopefully have heard about other shares. This would
     1376# induce some RTT delays (i.e. lose pipelining) in the case that this server
     1377# has the only shares, but that seems tolerable. We could rig it to only use
     1378# sequential requests on the first segment.
     1379
     1380# as a query gets later, we're more willing to duplicate work.
     1381
     1382# should change server read protocol to allow small shares to be fetched in a
     1383# single RTT. Instead of get_buckets-then-read, just use read(shnums, readv),
     1384# where shnums=[] means all shares, and the return value is a dict of
     1385# shnum->data (like with mutable files). The DYHB query should also fetch the
     1386# offset table, since everything else can be located once we have that.
     1387
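# A hedged sketch of that proposed server-side method (names invented here,
# not part of any existing RIStorageServer):
#
#  def remote_read(self, storage_index, shnums, readv):
#      shares = self._get_shares(storage_index) # assumed helper: shnum->share
#      if not shnums:
#          shnums = shares.keys()
#      return dict((shnum, [shares[shnum].read(o, l) for (o, l) in readv])
#                  for shnum in shnums)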
     1388
     1389# ImmutableFileNode
     1390#    DecryptingConsumer
     1391#  CiphertextFileNode
     1392#    Segmentation
     1393#   ShareFinder
     1394#   SegmentFetcher[segnum] (one at a time)
     1395#   CommonShare[shnum]
     1396#   Share[shnum,server]
     1397
     1398# TODO: when we learn numsegs, any get_segment() calls for bad segment numbers
     1399# should be failed with BadSegmentNumberError. But should this be the
     1400# responsibility of CiphertextFileNode, or SegmentFetcher? The knowledge will
     1401# first appear when a Share receives a valid UEB and calls
     1402# CiphertextFileNode.validate_and_store_UEB then _parse_and_store_UEB. The SegmentFetcher is
     1403# expecting to hear from the Share, via the _block_request_activity observer.
     1404
     1405# make it the responsibility of the SegmentFetcher. Each Share that gets a
     1406# valid UEB will tell the SegmentFetcher BADSEGNUM (instead of COMPLETE or
     1407# CORRUPT). The SegmentFetcher is then responsible for shutting down, and
     1408# informing its parent (the CiphertextFileNode) of the BadSegmentNumberError,
     1409# which is then passed to the client of get_segment().
     1410
     1411
     1412# TODO: if offset table is corrupt, attacker could cause us to fetch whole
     1413# (large) share
  • new file src/allmydata/immutable/download2_off.py

    diff --git a/src/allmydata/immutable/download2_off.py b/src/allmydata/immutable/download2_off.py
    new file mode 100755
    index 0000000..d2b8b99
    - +  
     1#! /usr/bin/python
     2
     3# known (shnum,Server) pairs are sorted into a list according to
     4# desirability. This sort is picking a winding path through a matrix of
     5# [shnum][server]. The goal is to get diversity of both shnum and server.
     6
     7# The initial order is:
     8#  find the lowest shnum on the first server, add it
     9#  look at the next server, find the lowest shnum that we don't already have
     10#   if any
     11#  next server, etc, until all known servers are checked
     12#  now look at servers that we skipped (because ...
     13
     14# Keep track of which block requests are outstanding by (shnum,Server). Don't
     15# bother prioritizing "validated" shares: the overhead to pull the share hash
     16# chain is tiny (4 hashes = 128 bytes), and the overhead to pull a new block
     17# hash chain is also tiny (1GB file, 8192 segments of 128KiB each, 13 hashes,
     18# 832 bytes). Each time a block request is sent, also request any necessary
     19# hashes. Don't bother with a "ValidatedShare" class (as distinct from some
     20# other sort of Share). Don't bother avoiding duplicate hash-chain requests.
     21
     22# For each outstanding segread, walk the list and send requests (skipping
     23# outstanding shnums) until requests for k distinct shnums are in flight. If
     24# we can't do that, ask for more. If we get impatient on a request, find the
     25# first non-outstanding
     26
     27# start with the first Share in the list, and send a request. Then look at
     28# the next one. If we already have a pending request for the same shnum or
     29# server, push that Share down onto the fallback list and try the next one,
     30# etc. If we run out of non-fallback shares, use the fallback ones,
     31# preferring shnums that we don't have outstanding requests for (i.e. assume
     32# that all requests will complete). Do this by having a second fallback list.
     33
     34# hell, I'm reviving the Herder. But remember, we're still talking 3 objects
     35# per file, not thousands.
     36
     37# actually, don't bother sorting the initial list. Append Shares as the
     38# responses come back, that will put the fastest servers at the front of the
     39# list, and give a tiny preference to servers that are earlier in the
     40# permuted order.
     41
     42# more ideas:
     43#  sort shares by:
     44#   1: number of roundtrips needed to get some data
     45#   2: share number
     46#   3: ms of RTT delay
     47# maybe measure average time-to-completion of requests, compare completion
     48# time against that, much larger indicates congestion on the server side
     49# or the server's upstream speed is less than our downstream. Minimum
     50# time-to-completion indicates min(our-downstream,their-upstream). Could
     51# fetch shares one-at-a-time to measure that better.
     52
     53# when should we risk duplicate work and send a new request?
     54
     55def walk(self):
     56    shares = sorted(list)
     57    oldshares = copy(shares)
     58    outstanding = list()
     59    fallbacks = list()
     60    second_fallbacks = list()
     61    while len(outstanding.nonlate.shnums) < k: # need more requests
     62        while shares:
     63            s = shares.pop(0)
     64            if s.server in outstanding.servers or s.shnum in outstanding.shnums:
     65                fallbacks.append(s)
     66                continue
     67            outstanding.append(s)
     68            send_request(s)
     69            break #'while need_more_requests'
     70        # must use fallback list. Ask for more servers while we're at it.
     71        ask_for_more_servers()
     72        while fallbacks:
     73            s = fallbacks.pop(0)
     74            if s.shnum in outstanding.shnums:
     75                # assume that the outstanding requests will complete, but
     76                # send new requests for other shnums to existing servers
     77                second_fallbacks.append(s)
     78                continue
     79            outstanding.append(s)
     80            send_request(s)
     81            break #'while need_more_requests'
     82        # if we get here, we're being forced to send out multiple queries per
     83        # share. We've already asked for more servers, which might help. If
     84        # there are no late outstanding queries, then duplicate shares won't
     85        # help. Don't send queries for duplicate shares until some of the
     86        # queries are late.
     87        if outstanding.late:
     88            # we're allowed to try any non-outstanding share
     89            while second_fallbacks:
     90                pass # TODO: pop one, send_request(s), break
     91    newshares = outstanding + fallbacks + second_fallbacks + oldshares
     92       
     93
     94class Server:
     95    """I represent an abstract Storage Server. One day, the StorageBroker
     96    will return instances of me. For now, the StorageBroker returns (peerid,
     97    RemoteReference) tuples, and this code wraps a Server instance around
     98    them.
     99    """
     100    def __init__(self, peerid, ss):
     101        self.peerid = peerid
     102        self.remote = ss
     103        self._remote_buckets = {} # maps shnum to RIBucketReader
     104        # TODO: release the bucket references on shares that we no longer
     105        # want. OTOH, why would we not want them? Corruption?
     106
     107    def send_query(self, storage_index):
     108        """I return a Deferred that fires with a set of shnums. If the server
     109        had shares available, I will retain the RemoteReferences to its
     110        buckets, so that get_data(shnum, range) can be called later."""
     111        d = self.remote.callRemote("get_buckets", storage_index)
     112        d.addCallback(self._got_response)
     113        return d
     114
     115    def _got_response(self, r):
     116        self._remote_buckets = r
     117        return set(r.keys())
     118
     119class ShareOnAServer:
     120    """I represent one instance of a share, known to live on a specific
     121    server. I am created every time a server responds affirmatively to a
     122    do-you-have-block query."""
     123
     124    def __init__(self, shnum, server):
     125        self._shnum = shnum
     126        self._server = server
     127        self._block_hash_tree = None
     128
     129    def cost(self, segnum):
     130        """I return a tuple of (roundtrips, bytes, rtt), indicating how
     131        expensive I think it would be to fetch the given segment. Roundtrips
     132        indicates how many roundtrips it is likely to take (one to get the
     133        data and hashes, plus one to get the offset table and UEB if this is
     134        the first segment we've ever fetched). 'bytes' is how many bytes we
     135        must fetch (estimated). 'rtt' is estimated round-trip time (float) in
     136        seconds for a trivial request. The downloading algorithm will compare
     137        costs to decide which shares should be used."""
     138        # the most significant factor here is roundtrips: a Share for which
     139        # we already have the offset table is better than a brand new one (sketch below)
     140
     141    def max_bandwidth(self):
     142        """Return a float, indicating the highest plausible bytes-per-second
     143        that I've observed coming from this share. This will be based upon
     144        the minimum (bytes-per-fetch / time-per-fetch) ever observed. This
     145        # can be used to estimate the server's upstream bandwidth. Clearly this
     146        is only accurate if a share is retrieved with no contention for
     147        either the upstream, downstream, or middle of the connection, but it
     148        may still serve as a useful metric for deciding which servers to pull
     149        from."""
     150
     151    def get_segment(self, segnum):
     152        """I return a Deferred that will fire with the segment data, or
     153        errback."""
     154
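# A hedged sketch of one possible cost(), as promised above (the attribute
# names are invented for illustration, not part of this patch):
#
#    def cost(self, segnum):
#        roundtrips = 1
#        if not self._have_offset_table:
#            roundtrips += 1 # offsets/UEB cost an extra RTT the first time
#        bytes = self._block_size + self._hash_bytes_needed(segnum)
#        return (roundtrips, bytes, self._server_rtt)
#
# Tuples compare lexicographically, so choosing shares is just a sort:
#    shares.sort(key=lambda s: s.cost(segnum))
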
     155class NativeShareOnAServer(ShareOnAServer):
     156    """For tahoe native (foolscap) servers, I contain a RemoteReference to
     157    the RIBucketReader instance."""
     158    def __init__(self, shnum, server, rref):
     159        ShareOnAServer.__init__(self, shnum, server)
     160        self._rref = rref # RIBucketReader
     161
     162class Share:
     163    def __init__(self, shnum):
     164        self._shnum = shnum
     165        # _servers are the Server instances which appear to hold a copy of
     166        # this share. It is populated when the ValidShare is first created,
     167        # or when we receive a get_buckets() response for a shnum that
     168        # already has a ValidShare instance. When we lose the connection to a
     169        # server, we remove it.
     170        self._servers = set()
     171        # offsets, UEB, and share_hash_tree all live in the parent.
     172        # block_hash_tree lives here.
     173        self._block_hash_tree = None
     174
     175        # TODO: self._want
     176
     177    def get_servers(self):
     178        return self._servers
     179
     180
     181    def get_block(self, segnum):
     182        # read enough data to obtain a single validated block
     183        if not self.have_offsets:
     184            # we get the offsets in their own read, since they tell us where
     185            # everything else lives. We must fetch offsets for each share
     186            # separately, since they aren't directly covered by the UEB.
     187            pass
     188        if not self.parent.have_ueb:
     189            # use _guessed_segsize to make a guess about the layout, so we
     190            # can fetch both the offset table and the UEB in the same read.
     191            # This also requires making a guess about the presence or absence
     192            # of the plaintext_hash_tree. Oh, and also the version number. Oh
     193            # well.
     194            pass
     195
     196class CiphertextDownloader:
     197    """I manage all downloads for a single file. I operate a state machine
     198    with input events that are local read() requests, responses to my remote
     199    'get_bucket' and 'read_bucket' messages, and connection establishment and
     200    loss. My outbound events are connection establishment requests and bucket
     201    read requests messages.
     202    """
     203    # eventually this will merge into the FileNode
     204    ServerClass = Server # for tests to override
     205
     206    def __init__(self, storage_index, ueb_hash, size, k, N, storage_broker,
     207                 shutdowner):
     208        # values we get from the filecap
     209        self._storage_index = si = storage_index
     210        self._ueb_hash = ueb_hash
     211        self._size = size
     212        self._needed_shares = k
     213        self._total_shares = N
     214        self._share_hash_tree = IncompleteHashTree(self._total_shares)
     215        # values we discover when we first fetch the UEB
     216        self._ueb = None # is dict after UEB fetch+validate
     217        self._segsize = None
     218        self._numsegs = None
     219        self._blocksize = None
     220        self._tail_segsize = None
     221        self._ciphertext_hash = None # optional
     222        # structures we create when we fetch the UEB, then continue to fill
     223        # as we download the file
     224        self._ciphertext_hash_tree = None
     225        self._storage_broker = storage_broker # used by _get_available_servers
     226
     227        # values we learn as we download the file
     228        self._offsets = {} # (shnum,Server) to offset table (dict)
     229        self._block_hash_tree = {} # shnum to IncompleteHashTree
     230        # other things which help us
     231        self._guessed_segsize = min(128*1024, size)
     232        self._active_share_readers = {} # maps shnum to Reader instance
     233        self._share_readers = [] # sorted by preference, best first
     234        self._readers = set() # set of Reader instances
     235        self._recent_horizon = 10 # seconds
     236        self._scheduled = False
     237        # 'shutdowner' is a MultiService parent used to cancel all downloads
     238        # when the node is shutting down, to let tests have a clean reactor.
     239
     240        self._init_available_servers()
     241        self._init_find_enough_shares()
     242
     243    # _available_servers is an iterator that provides us with Server
     244    # instances. Each time we pull out a Server, we immediately send it a
     245    # query, so we don't need to keep track of who we've sent queries to.
     246
     247    def _init_available_servers(self):
     248        self._available_servers = self._get_available_servers()
     249        self._no_more_available_servers = False
     250
     251    def _get_available_servers(self):
     252        """I am a generator of servers to use, sorted by the order in which
     253        we should query them. I make sure there are no duplicates in this
     254        list."""
     255        # TODO: make StorageBroker responsible for this non-duplication, and
     256        # replace this method with a simple iter(get_servers_for_index()),
     257        # plus a self._no_more_available_servers=True
     258        seen = set()
     259        sb = self._storage_broker
     260        for (peerid, ss) in sb.get_servers_for_index(self._storage_index):
     261            if peerid not in seen:
     262                yield self.ServerClass(peerid, ss) # Server(peerid, ss)
     263                seen.add(peerid)
     264        self._no_more_available_servers = True
     265
     266    # this block of code is responsible for having enough non-problematic
     267    # distinct shares/servers available and ready for download, and for
     268    # limiting the number of queries that are outstanding. The idea is that
     269    # we'll use the k fastest/best shares, and have the other ones in reserve
     270    # in case those servers stop responding or respond too slowly. We keep
     271    # track of all known shares, but we also keep track of problematic shares
     272    # (ones with hash failures or lost connections), so we can put them at
     273    # the bottom of the list.
     274
     275    def _init_find_enough_shares(self):
     276        # _unvalidated_sharemap maps shnum to set of Servers, and remembers
     277        # where viable (but not yet validated) shares are located. Each
     278        # get_bucket() response adds to this map, each act of validation
     279        # removes from it.
     280        self._unvalidated_sharemap = DictOfSets()
     281
     282        # _sharemap maps shnum to set of Servers, and remembers where viable
     283        # shares are located. Each get_bucket() response adds to this map,
     284        # each hash failure or disconnect removes from it. (TODO: if we
     285        # disconnect but reconnect later, we should be allowed to re-query).
     286        self._sharemap = DictOfSets()
     287
     288        # _problem_shares is a set of (shnum, Server) tuples, and
     289
     290        # _queries_in_flight maps a Server to a timestamp, which remembers
     291        # which servers we've sent queries to (and when) but have not yet
     292        # heard a response. This lets us put a limit on the number of
     293        # outstanding queries, to limit the size of the work window (how much
     294        # extra work we ask servers to do in the hopes of keeping our own
     295        # pipeline filled). We remove a Server from _queries_in_flight when
     296        # we get an answer/error or we finally give up. If we ever switch to
     297        # a non-connection-oriented protocol (like UDP, or forwarded Chord
     298        # queries), we can use this information to retransmit any query that
     299        # has gone unanswered for too long.
     300        self._queries_in_flight = dict()
     301
     302    def _count_recent_queries_in_flight(self):
     303        now = time.time()
     304        recent = now - self._recent_horizon
     305        return len([s for (s,when) in self._queries_in_flight.items()
     306                    if when > recent])
     307
     308    def _find_enough_shares(self):
     309        # goal: have 2*k distinct not-invalid shares available for reading,
     310        # from 2*k distinct servers. Do not have more than 4*k "recent"
     311        # queries in flight at a time.
     312        if (len(self._sharemap) >= 2*self._needed_shares
     313            and len(self._sharemap.values()) >= 2*self._needed_shares):
     314            return
     315        num = self._count_recent_queries_in_flight()
     316        while num < 4*self._needed_shares:
     317            try:
     318                s = self._available_servers.next()
     319            except StopIteration:
     320                return # no more progress can be made
     321            self._queries_in_flight[s] = time.time()
     322            d = s.send_query(self._storage_index)
     323            d.addBoth(incidentally, self._queries_in_flight.pop, s, None)
     324            d.addCallbacks(lambda shnums, s=s: [self._sharemap.add(shnum, s)
     325                                                for shnum in shnums],
     326                           lambda f, s=s: self._query_error(f, s))
     327            d.addErrback(self._error)
     328            d.addCallback(self._reschedule)
     329            num += 1
     330
     331    def _query_error(self, f, s):
     332        # a server returned an error, log it gently and ignore
     333        level = log.WEIRD
     334        if f.check(DeadReferenceError):
     335            level = log.UNUSUAL
     336        log.msg(format="Error during get_buckets to server=%(server)s",
     337                server=str(s), failure=f, level=level, umid="3uuBUQ")
     338
     339    # this block is responsible for turning known shares into usable shares,
     340    # by fetching enough data to validate their contents.
     341
     342    # UEB (from any share)
     343    # share hash chain, validated (from any share, for given shnum)
     344    # block hash (any share, given shnum)
     345
     346    def _got_ueb(self, ueb_data, share):
     347        if self._ueb is not None:
     348            return
     349        if hashutil.uri_extension_hash(ueb_data) != self._ueb_hash:
     350            share.error("UEB hash does not match")
     351            return
     352        d = uri.unpack_extension(ueb_data)
     353        self.share_size = mathutil.div_ceil(self._size, self._needed_shares)
     354
     355
     356        # There are several kinds of things that can be found in a UEB.
     357        # First, things that we really need to learn from the UEB in order to
     358        # do this download. Next: things which are optional but not redundant
     359        # -- if they are present in the UEB they will get used. Next, things
     360        # that are optional and redundant. These things are required to be
     361        # consistent: they don't have to be in the UEB, but if they are in
     362        # the UEB then they will be checked for consistency with the
     363        # already-known facts, and if they are inconsistent then an exception
     364        # will be raised. These things aren't actually used -- they are just
     365        # tested for consistency and ignored. Finally: things which are
     366        # deprecated -- they ought not be in the UEB at all, and if they are
     367        # present then a warning will be logged but they are otherwise
     368        # ignored.
     369
     370        # First, things that we really need to learn from the UEB:
     371        # segment_size, crypttext_root_hash, and share_root_hash.
     372        self._segsize = d['segment_size']
     373
     374        self._blocksize = mathutil.div_ceil(self._segsize, self._needed_shares)
     375        self._numsegs = mathutil.div_ceil(self._size, self._segsize)
     376
     377        self._tail_segsize = self._size % self._segsize
     378        if self._tail_segsize == 0:
     379            self._tail_segsize = self._segsize
     380        # padding for erasure code
     381        self._tail_segsize = mathutil.next_multiple(self._tail_segsize,
     382                                                    self._needed_shares)
     383
     384        # Ciphertext hash tree root is mandatory, so that there is at most
     385        # one ciphertext that matches this read-cap or verify-cap. The
     386        # integrity check on the shares is not sufficient to prevent the
     387        # original encoder from creating some shares of file A and other
     388        # shares of file B.
     389        self._ciphertext_hash_tree = IncompleteHashTree(self._numsegs)
     390        self._ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
     391
     392        self._share_hash_tree.set_hashes({0: d['share_root_hash']})
     393
     394
     395        # Next: things that are optional and not redundant: crypttext_hash
     396        if 'crypttext_hash' in d:
     397            if len(d['crypttext_hash']) == hashutil.CRYPTO_VAL_SIZE:
     398                self._ciphertext_hash = d['crypttext_hash']
     399            else:
     400                log.msg("ignoring bad-length UEB[crypttext_hash], "
     401                        "got %d bytes, want %d" % (len(d['crypttext_hash']),
     402                                                   hashutil.CRYPTO_VAL_SIZE),
     403                        umid="oZkGLA", level=log.WEIRD)
     404
     405        # we ignore all of the redundant fields when downloading. The
     406        # Verifier uses a different code path which does not ignore them.
     407
     408        # finally, set self._ueb as a marker that we don't need to request it
     409        # anymore
     410        self._ueb = d
     411
     412    def _got_share_hashes(self, hashes, share):
     413        assert isinstance(hashes, dict)
     414        try:
     415            self._share_hash_tree.set_hashes(hashes)
     416        except (IndexError, BadHashError, NotEnoughHashesError), le:
     417            share.error("Bad or missing hashes")
     418            return
     419
     420    #def _got_block_hashes(
     421
     422    def _init_validate_enough_shares(self):
     423        # _valid_shares maps shnum to ValidatedShare instances, and is
     424        # populated once the block hash root has been fetched and validated
     425        # (which requires any valid copy of the UEB, and a valid copy of the
     426        # share hash chain for each shnum)
     427        self._valid_shares = {}
     428
     429        # _target_shares is an ordered list of ReadyShare instances, each of
     430        # which is a (shnum, server) tuple. It is sorted in order of
     431        # preference: we expect to get the fastest response from the
     432        # ReadyShares at the front of the list. It is also sorted to
     433        # distribute the shnums, so that fetching shares from
     434        # _target_shares[:k] is likely (but not guaranteed) to give us k
     435        # distinct shares. The rule is that we skip over entries for blocks
     436        # that we've already received, limit the number of recent queries for
     437        # the same block,
     438        self._target_shares = []
     439
     440    def _validate_enough_shares(self):
     441        # my goal is to have at least 2*k distinct validated shares from at
     442        # least 2*k distinct servers
     443        valid_share_servers = set()
     444        for vs in self._valid_shares.values():
     445            valid_share_servers.update(vs.get_servers())
     446        if (len(self._valid_shares) >= 2*self._needed_shares
     447            and len(valid_share_servers) >= 2*self._needed_shares):
     448            return
     449        #for
     450
     451    def _reschedule(self, _ign):
     452        # fire the loop again
     453        if not self._scheduled:
     454            self._scheduled = True
     455            eventually(self._loop)
     456
     457    def _loop(self):
     458        self._scheduled = False
     459        # what do we need?
     460
     461        self._find_enough_shares()
     462        self._validate_enough_shares()
     463
     464        if not self._ueb:
     465            # we always need a copy of the UEB
     466            pass
     467
     468    def _error(self, f):
     469        # this is an unexpected error: a coding bug
     470        log.err(f, level=log.UNUSUAL)
     471           
     472
     473
     474# using a single packed string (and an offset table) may be an artifact of
     475# our native storage server: other backends might allow cheap multi-part
     476# files (think S3, several buckets per share, one for each section).
     477
     478# find new names for:
     479#  data_holder
     480#  Share / Share2  (ShareInstance / Share? but the first is more useful)
     481
     482class IShare(Interface):
     483    """I represent a single instance of a single share (e.g. I reference the
     484    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
     485    This interface is used by SegmentFetcher to retrieve validated blocks.
     486    """
     487    def get_block(segnum):
     488        """Return an Observer2, which will be notified with the following
     489        events:
     490         state=COMPLETE, block=data (terminal): validated block data
     491         state=OVERDUE (non-terminal): we have reason to believe that the
     492                                       request might have stalled, or we
     493                                       might just be impatient
     494         state=CORRUPT (terminal): the data we received was corrupt
     495         state=DEAD (terminal): the connection has failed
     496        """
     497
     498
     499# it'd be nice if we receive the hashes before the block, or just
     500# afterwards, so we aren't stuck holding on to unvalidated blocks
     501# that we can't process. If we guess the offsets right, we can
     502# accomplish this by sending the block request after the metadata
     503# requests (by keeping two separate requestlists), and have a one RTT
     504# pipeline like:
     505#  1a=metadata, 1b=block
     506#  1b->process+deliver : one RTT
     507
     508# But if we guess wrong, and fetch the wrong part of the block, we'll
     509# have a pipeline that looks like:
     510#  1a=wrong metadata, 1b=wrong block
     511#  1a->2a=right metadata,2b=right block
     512#  2b->process+deliver
     513# which means two RTT and buffering one block (which, since we'll
     514# guess the segsize wrong for everything, means buffering one
     515# segment)
     516
     517# if we start asking for multiple segments, we could get something
     518# worse:
     519#  1a=wrong metadata, 1b=wrong block0, 1c=wrong block1, ..
     520#  1a->2a=right metadata,2b=right block0,2c=right block1, .
     521#  2b->process+deliver
     522
     523# which means two RTT but fetching and buffering the whole file
     524# before delivering anything. However, since we don't know when the
     525# other shares are going to arrive, we need to avoid having more than
     526# one block in the pipeline anyways. So we shouldn't be able to get
     527# into this state.
     528
     529# it also means that, instead of handling all of
     530# self._requested_blocks at once, we should only be handling one
     531# block at a time: one of the requested block should be special
     532# (probably FIFO). But retire all we can.
     533
     534    # this might be better with a Deferred, using COMPLETE as the success
     535    # case and CORRUPT/DEAD in an errback, because that would let us hold the
     536    # 'share' and 'shnum' arguments locally (instead of roundtripping them
     537    # through Share.send_request). But OVERDUE is not terminal. So I
     538    # want a new sort of callback mechanism, with the extra-argument-passing
     539    # aspects of Deferred, but without being so one-shot. Is this a job for
     540    # Observer? No, it doesn't take extra arguments. So this uses Observer2.
     541
     542
     543class Reader:
     544    """I am responsible for a single offset+size read of the file. I handle
     545    segmentation: I figure out which segments are necessary, request them
     546    (from my CiphertextDownloader) in order, and trim the segments down to
     547    match the offset+size span. I use the Producer/Consumer interface to only
     548    request one segment at a time.
     549    """
     550    implements(IPushProducer)
     551    def __init__(self, consumer, offset, size):
     552        self._needed = []
     553        self._consumer = consumer
     554        self._hungry = False
     555        self._offset = offset
     556        self._size = size
     557        self._segsize = None
     558    def start(self):
     559        self._alive = True
     560        self._deferred = defer.Deferred()
     561        # the process doesn't actually start until set_segment_size()
     562        return self._deferred
     563
     564    def set_segment_size(self, segsize):
     565        if self._segsize is not None:
     566            return
     567        self._segsize = segsize
     568        self._needed = list(self._compute_segnums())
     569
     570    def _compute_segnums(self):
     571        # now that we know the file's segsize, what segments (and which
     572        # ranges of each) will we need?
     573        size = self._size
     574        offset = self._offset
     575        while size:
     576            assert size >= 0
     577            this_seg_num = int(offset / self._segsize)
     578            this_seg_offset = offset - (this_seg_num*self._segsize)
     579            this_seg_size = min(size, self._segsize-this_seg_offset)
     580            size -= this_seg_size
     581            if size:
     582                offset += this_seg_size
     583            yield (this_seg_num, this_seg_offset, this_seg_size)
     584
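    # A worked example (numbers assumed, not from the patch): with
    # segsize=1000, a read at offset=2500 of size=1200 needs the tail of
    # segment 2 and the head of segment 3:
    #   list(self._compute_segnums()) == [(2, 500, 500), (3, 0, 700)]
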
     585    def get_needed_segments(self):
     586        return set([segnum for (segnum, off, size) in self._needed])
     587
     588
     589    def stopProducing(self):
     590        self._hungry = False
     591        self._alive = False
     592        # TODO: cancel the segment requests
     593    def pauseProducing(self):
     594        self._hungry = False
     595    def resumeProducing(self):
     596        self._hungry = True
     597    def add_segment(self, segnum, offset, size):
     598        self._needed.append( (segnum, offset, size) )
     599    def got_segment(self, segnum, segdata):
     600        """Deliver the newly-arrived segment data to my consumer, and
     601        finish up if nothing more is needed."""
     602        assert self._needed[0][0] == segnum
     603        (_ign, offset, size) = self._needed.pop(0)
     604        data = segdata[offset:offset+size]
     605        self._consumer.write(data)
     606        if not self._needed:
     607            # we're done
     608            self._alive = False
     609            self._hungry = False
     610            self._consumer.unregisterProducer()
     611            self._deferred.callback(self._consumer)
     612    def error(self, f):
     613        self._alive = False
     614        self._hungry = False
     615        self._consumer.unregisterProducer()
     616        self._deferred.errback(f)
     617
     618
     619
     620class x:
     621    def OFFread(self, consumer, offset=0, size=None):
     622        """I am the main entry point, from which FileNode.read() can get
     623        data."""
     624        # tolerate concurrent operations: each gets its own Reader
     625        if size is None:
     626            size = self._size - offset
     627        r = Reader(consumer, offset, size)
     628        self._readers.add(r)
     629        d = r.start()
     630        if self.segment_size is not None:
     631            r.set_segment_size(self.segment_size)
     632            # TODO: if we can't find any segments, and thus never get a
     633            # segsize, tell the Readers to give up
     634        return d
  • new file src/allmydata/immutable/download2_util.py

    diff --git a/src/allmydata/immutable/download2_util.py b/src/allmydata/immutable/download2_util.py
    new file mode 100755
    index 0000000..32a0ad4
    - +  
     1from foolscap.api import eventually
     2import weakref
     3
     4class Observer2:
     5    """A simple class to distribute multiple events to a single subscriber.
     6    It accepts arbitrary kwargs, but no posargs."""
     7    def __init__(self):
     8        self._watcher = None
     9        self._undelivered_results = []
     10        self._canceler = None
     11
     12    def set_canceler(self, f):
     13        # we use a weakref to avoid creating a cycle between us and the thing
     14        # we're observing: they'll be holding a reference to us to compare
     15        # against the value we pass to their canceler function.
     16        self._canceler = weakref.ref(f)
     17
     18    def subscribe(self, observer, **watcher_kwargs):
     19        self._watcher = (observer, watcher_kwargs)
     20        while self._undelivered_results:
     21            self._notify(self._undelivered_results.pop(0))
     22
     23    def notify(self, **result_kwargs):
     24        if self._watcher:
     25            self._notify(result_kwargs)
     26        else:
     27            self._undelivered_results.append(result_kwargs)
     28
     29    def _notify(self, result_kwargs):
     30        o, watcher_kwargs = self._watcher
     31        kwargs = dict(result_kwargs)
     32        kwargs.update(watcher_kwargs)
     33        eventually(o, **kwargs)
     34
     35    def cancel(self):
     36        f = self._canceler()
     37        if f:
     38            f(self)
     39
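# A minimal usage sketch of Observer2 (not part of the patch): the
# subscriber's extra kwargs ride along with every notification, which is
# delivered via eventually(), i.e. on a later reactor turn.

def _activity(state, block=None, share=None):
    print "share %s reported %s" % (share, state)
_o = Observer2()
_o.subscribe(_activity, share="sh3")
_o.notify(state="OVERDUE")                # non-terminal
_o.notify(state="COMPLETE", block="data") # terminal
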
     40class DictOfSets(dict):
     41    def add(self, key, value): self.setdefault(key, set()).add(value)
     42    def values(self): # return set that merges all value sets
     43        r = set()
     44        for key in self:
     45            r.update(self[key])
     46        return r
     47
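# DictOfSets in brief (a sketch): one key maps to many values, and values()
# merges every value-set into one.

_m = DictOfSets()
_m.add(3, "serverA"); _m.add(3, "serverB"); _m.add(7, "serverA")
assert _m[3] == set(["serverA", "serverB"])
assert _m.values() == set(["serverA", "serverB"])
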
     48
     49def incidentally(res, f, *args, **kwargs):
     50    """Add me to a Deferred chain like this:
     51     d.addBoth(incidentally, func, arg)
     52    and I'll behave as if you'd added the following function:
     53     def _(res):
     54         func(arg)
     55         return res
     56    This is useful if you want to execute an expression when the Deferred
     57    fires, but don't care about its value.
     58    """
     59    f(*args, **kwargs)
     60    return res
     61
     62
     63from twisted.application import service
     64class Terminator(service.Service):
     65    def __init__(self):
     66        service.Service.__init__(self)
     67        self._clients = weakref.WeakKeyDictionary()
     68    def register(self, c):
     69        self._clients[c] = None
     70    def stopService(self):
     71        for c in self._clients:
     72            c.stop()
     73        return service.Service.stopService(self)
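
# A minimal usage sketch (not part of the patch; 'client' and 'download' are
# assumed bindings):
#
#    t = Terminator()
#    t.setServiceParent(client) # client is a MultiService
#    t.register(download)       # download must provide .stop()
#    d = t.stopService()        # calls download.stop() on every live client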
  • src/allmydata/test/test_util.py

    diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py
    index 0a326b3..5f6ce67 100644
    a b from twisted.trial import unittest 
    77from twisted.internet import defer, reactor
    88from twisted.python.failure import Failure
    99from twisted.python import log
     10from hashlib import md5
    1011
    1112from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
    1213from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
    1314from allmydata.util import limiter, time_format, pollmixin, cachedir
    1415from allmydata.util import statistics, dictutil, pipeline
    1516from allmydata.util import log as tahoe_log
     17from allmydata.util.spans import Spans, overlap, DataSpans
    1618
    1719class Base32(unittest.TestCase):
    1820    def test_b2a_matches_Pythons(self):
    class Log(unittest.TestCase): 
    15371539        tahoe_log.err(format="intentional sample error",
    15381540                      failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ")
    15391541        self.flushLoggedErrors(SampleError)
     1542
     1543
     1544class SimpleSpans:
     1545    # this is a simple+inefficient form of util.spans.Spans . We compare the
     1546    # behavior of this reference model against the real (efficient) form.
     1547
     1548    def __init__(self, _span_or_start=None, length=None):
     1549        self._have = set()
     1550        if length is not None:
     1551            for i in range(_span_or_start, _span_or_start+length):
     1552                self._have.add(i)
     1553        elif _span_or_start:
     1554            for (start,length) in _span_or_start:
     1555                self.add(start, length)
     1556
     1557    def add(self, start, length):
     1558        for i in range(start, start+length):
     1559            self._have.add(i)
     1560        return self
     1561
     1562    def remove(self, start, length):
     1563        for i in range(start, start+length):
     1564            self._have.discard(i)
     1565        return self
     1566
     1567    def each(self):
     1568        return sorted(self._have)
     1569
     1570    def __iter__(self):
     1571        items = sorted(self._have)
     1572        prevstart = None
     1573        prevend = None
     1574        for i in items:
     1575            if prevstart is None:
     1576                prevstart = prevend = i
     1577                continue
     1578            if i == prevend+1:
     1579                prevend = i
     1580                continue
     1581            yield (prevstart, prevend-prevstart+1)
     1582            prevstart = prevend = i
     1583        if prevstart is not None:
     1584            yield (prevstart, prevend-prevstart+1)
     1585
     1586    def __len__(self):
     1587        # this also gets us bool(s)
     1588        return len(self._have)
     1589
     1590    def __add__(self, other):
     1591        s = self.__class__(self)
     1592        for (start, length) in other:
     1593            s.add(start, length)
     1594        return s
     1595
     1596    def __sub__(self, other):
     1597        s = self.__class__(self)
     1598        for (start, length) in other:
     1599            s.remove(start, length)
     1600        return s
     1601
     1602    def __iadd__(self, other):
     1603        for (start, length) in other:
     1604            self.add(start, length)
     1605        return self
     1606
     1607    def __isub__(self, other):
     1608        for (start, length) in other:
     1609            self.remove(start, length)
     1610        return self
     1611
     1612    def __contains__(self, (start,length)):
     1613        for i in range(start, start+length):
     1614            if i not in self._have:
     1615                return False
     1616        return True
     1617
     1618class ByteSpans(unittest.TestCase):
     1619    def test_basic(self):
     1620        s = Spans()
     1621        self.failUnlessEqual(list(s), [])
     1622        self.failIf(s)
     1623        self.failIf((0,1) in s)
     1624        self.failUnlessEqual(len(s), 0)
     1625
     1626        s1 = Spans(3, 4) # 3,4,5,6
     1627        self._check1(s1)
     1628
     1629        s2 = Spans(s1)
     1630        self._check1(s2)
     1631
     1632        s2.add(10,2) # 10,11
     1633        self._check1(s1)
     1634        self.failUnless((10,1) in s2)
     1635        self.failIf((10,1) in s1)
     1636        self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
     1637        self.failUnlessEqual(len(s2), 6)
     1638
     1639        s2.add(15,2).add(20,2)
     1640        self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
     1641        self.failUnlessEqual(len(s2), 10)
     1642
     1643        s2.remove(4,3).remove(15,1)
     1644        self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
     1645        self.failUnlessEqual(len(s2), 6)
     1646
     1647    def _check1(self, s):
     1648        self.failUnlessEqual(list(s), [(3,4)])
     1649        self.failUnless(s)
     1650        self.failUnlessEqual(len(s), 4)
     1651        self.failIf((0,1) in s)
     1652        self.failUnless((3,4) in s)
     1653        self.failUnless((3,1) in s)
     1654        self.failUnless((5,2) in s)
     1655        self.failUnless((6,1) in s)
     1656        self.failIf((6,2) in s)
     1657        self.failIf((7,1) in s)
     1658        self.failUnlessEqual(list(s.each()), [3,4,5,6])
     1659
     1660    def test_math(self):
     1661        s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
     1662        s2 = Spans(5, 3) # 5,6,7
     1663        s3 = Spans(8, 4) # 8,9,10,11
     1664
     1665        s = s1 - s2
     1666        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
     1667        s = s1 - s3
     1668        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
     1669        s = s2 - s3
     1670        self.failUnlessEqual(list(s.each()), [5,6,7])
     1671
     1672        s = s1 + s2
     1673        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
     1674        s = s1 + s3
     1675        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
     1676        s = s2 + s3
     1677        self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
     1678
     1679        s = Spans(s1)
     1680        s -= s2
     1681        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9])
     1682        s = Spans(s1)
     1683        s -= s3
     1684        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7])
     1685        s = Spans(s2)
     1686        s -= s3
     1687        self.failUnlessEqual(list(s.each()), [5,6,7])
     1688
     1689        s = Spans(s1)
     1690        s += s2
     1691        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9])
     1692        s = Spans(s1)
     1693        s += s3
     1694        self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11])
     1695        s = Spans(s2)
     1696        s += s3
     1697        self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11])
     1698
     1699    def test_random(self):
     1700        # attempt to increase coverage of corner cases by comparing behavior
     1701        # of a simple-but-slow model implementation against the
     1702        # complex-but-fast actual implementation, in a large number of random
     1703        # operations
     1704        S1 = SimpleSpans
     1705        S2 = Spans
     1706        s1 = S1(); s2 = S2()
     1707        seed = ""
     1708        def _create(subseed):
     1709            ns1 = S1(); ns2 = S2()
     1710            for i in range(10):
     1711                what = md5(subseed+str(i)).hexdigest()
     1712                start = int(what[2:4], 16)
     1713                length = max(1,int(what[5:6], 16))
     1714                ns1.add(start, length); ns2.add(start, length)
     1715            return ns1, ns2
     1716
     1717        #print
     1718        for i in range(1000):
     1719            what = md5(seed+str(i)).hexdigest()
     1720            op = what[0]
     1721            subop = what[1]
     1722            start = int(what[2:4], 16)
     1723            length = max(1,int(what[5:6], 16))
     1724            #print what
     1725            if op in "0":
     1726                if subop in "01234":
     1727                    s1 = S1(); s2 = S2()
     1728                elif subop in "5678":
     1729                    s1 = S1(start, length); s2 = S2(start, length)
     1730                else:
     1731                    s1 = S1(s1); s2 = S2(s2)
     1732                #print "s2 = %s" % s2.dump()
     1733            elif op in "123":
     1734                #print "s2.add(%d,%d)" % (start, length)
     1735                s1.add(start, length); s2.add(start, length)
     1736            elif op in "456":
     1737                #print "s2.remove(%d,%d)" % (start, length)
     1738                s1.remove(start, length); s2.remove(start, length)
     1739            elif op in "78":
     1740                ns1, ns2 = _create(what[7:11])
     1741                #print "s2 + %s" % ns2.dump()
     1742                s1 = s1 + ns1; s2 = s2 + ns2
     1743            elif op in "9a":
     1744                ns1, ns2 = _create(what[7:11])
     1745                #print "%s - %s" % (s2.dump(), ns2.dump())
     1746                s1 = s1 - ns1; s2 = s2 - ns2
     1747            elif op in "bc":
     1748                ns1, ns2 = _create(what[7:11])
     1749                #print "s2 += %s" % ns2.dump()
     1750                s1 += ns1; s2 += ns2
     1751            else:
     1752                ns1, ns2 = _create(what[7:11])
     1753                #print "%s -= %s" % (s2.dump(), ns2.dump())
     1754                s1 -= ns1; s2 -= ns2
     1755            #print "s2 now %s" % s2.dump()
     1756            self.failUnlessEqual(list(s1.each()), list(s2.each()))
     1757            self.failUnlessEqual(len(s1), len(s2))
     1758            self.failUnlessEqual(bool(s1), bool(s2))
     1759            self.failUnlessEqual(list(s1), list(s2))
     1760            for j in range(10):
     1761                what = md5(what[12:14]+str(j)).hexdigest()
     1762                start = int(what[2:4], 16)
     1763                length = max(1, int(what[5:6], 16))
     1764                span = (start, length)
     1765                self.failUnlessEqual(bool(span in s1), bool(span in s2))
     1766
     1767
     1768    # s()
     1769    # s(start,length)
     1770    # s(s0)
     1771    # s.add(start,length) : returns s
     1772    # s.remove(start,length)
     1773    # s.each() -> list of byte offsets, mostly for testing
     1774    # list(s) -> list of (start,length) tuples, one per span
     1775    # (start,length) in s -> True if (start..start+length-1) are all members
     1776    #  NOT equivalent to x in list(s)
     1777    # len(s) -> number of bytes, for testing, bool(), and accounting/limiting
     1778    # bool(s)  (__len__)
     1779    # s = s1+s2, s1-s2, +=s1, -=s1
     1780
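              # A hedged usage sketch of the API summarized above (not part of
              # the test suite; values hand-checked against the Spans class
              # under test):
              #   s = Spans(10, 4)   # covers 10,11,12,13
              #   s.add(14, 2)       # adjacent, so it merges into [10-15]
              #   list(s)            # [(10, 6)]
              #   (12, 3) in s       # True: 12,13,14 are all present
              #   (15, 2) in s       # False: 16 is missing
              #   len(s)             # 6
              #   s.remove(12, 2)    # leaves [10-11] and [14-15]
              #   list(s.each())     # [10, 11, 14, 15]
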
     1781    def test_overlap(self):
     1782        for a in range(20):
     1783            for b in range(10):
     1784                for c in range(20):
     1785                    for d in range(10):
     1786                        self._test_overlap(a,b,c,d)
     1787
     1788    def _test_overlap(self, a, b, c, d):
     1789        s1 = set(range(a,a+b))
     1790        s2 = set(range(c,c+d))
     1791        #print "---"
     1792        #self._show_overlap(s1, "1")
     1793        #self._show_overlap(s2, "2")
     1794        o = overlap(a,b,c,d)
     1795        expected = s1.intersection(s2)
     1796        if not expected:
     1797            self.failUnlessEqual(o, None)
     1798        else:
     1799            start,length = o
     1800            so = set(range(start,start+length))
      1801                #self._show_overlap(so, "o")
     1802            self.failUnlessEqual(so, expected)
     1803
     1804    def _show_overlap(self, s, c):
     1805        import sys
     1806        out = sys.stdout
     1807        if s:
      1808            for i in range(max(s)+1): # include max(s) itself
     1809                if i in s:
     1810                    out.write(c)
     1811                else:
     1812                    out.write(" ")
     1813        out.write("\n")
     1814
     1815def extend(s, start, length, fill):
     1816    if len(s) >= start+length:
     1817        return s
     1818    assert len(fill) == 1
     1819    return s + fill*(start+length-len(s))
     1820
     1821def replace(s, start, data):
     1822    assert len(s) >= start+len(data)
     1823    return s[:start] + data + s[start+len(data):]
     1824
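          # Quick hand-checked examples of the two string helpers above
          # (purely illustrative, not used by the tests):
          #   extend("abc", 2, 4, "_")    == "abc___"  (pad out to start+length)
          #   extend("abcdef", 2, 3, "_") == "abcdef"  (already long enough)
          #   replace("abcdef", 2, "XY")  == "abXYef"  (overwrite in place)
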
     1825class SimpleDataSpans:
     1826    def __init__(self, other=None):
     1827        self.missing = "" # "1" where missing, "0" where found
     1828        self.data = ""
     1829        if other:
     1830            for (start, data) in other.get_spans():
     1831                self.add(start, data)
     1832
     1833    def __len__(self):
     1834        return len(self.missing.translate(None, "1"))
     1835    def _dump(self):
     1836        return [i for (i,c) in enumerate(self.missing) if c == "0"]
     1837    def _have(self, start, length):
     1838        m = self.missing[start:start+length]
     1839        if not m or len(m)<length or int(m):
     1840            return False
     1841        return True
     1842    def get_spans(self):
     1843        for i in self._dump():
     1844            yield (i, self.data[i])
     1845    def get(self, start, length):
     1846        if self._have(start, length):
     1847            return self.data[start:start+length]
     1848        return None
     1849    def pop(self, start, length):
     1850        data = self.get(start, length)
     1851        if data:
     1852            self.remove(start, length)
     1853        return data
     1854    def remove(self, start, length):
     1855        self.missing = replace(extend(self.missing, start, length, "1"),
     1856                               start, "1"*length)
     1857    def add(self, start, data):
     1858        self.missing = replace(extend(self.missing, start, len(data), "1"),
     1859                               start, "0"*len(data))
     1860        self.data = replace(extend(self.data, start, len(data), " "),
     1861                            start, data)
     1862
     1863
     1864class StringSpans(unittest.TestCase):
     1865    def do_basic(self, klass):
     1866        ds = klass()
     1867        self.failUnlessEqual(len(ds), 0)
     1868        self.failUnlessEqual(list(ds._dump()), [])
     1869        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 0)
     1870        self.failUnlessEqual(ds.get(0, 4), None)
     1871        self.failUnlessEqual(ds.pop(0, 4), None)
     1872        ds.remove(0, 4)
     1873
     1874        ds.add(2, "four")
     1875        self.failUnlessEqual(len(ds), 4)
     1876        self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
     1877        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 4)
     1878        self.failUnlessEqual(ds.get(0, 4), None)
     1879        self.failUnlessEqual(ds.pop(0, 4), None)
     1880        self.failUnlessEqual(ds.get(4, 4), None)
     1881
     1882        ds2 = klass(ds)
     1883        self.failUnlessEqual(len(ds2), 4)
     1884        self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
     1885        self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_spans()]), 4)
     1886        self.failUnlessEqual(ds2.get(0, 4), None)
     1887        self.failUnlessEqual(ds2.pop(0, 4), None)
     1888        self.failUnlessEqual(ds2.pop(2, 3), "fou")
     1889        self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_spans()]), 1)
     1890        self.failUnlessEqual(ds2.get(2, 3), None)
     1891        self.failUnlessEqual(ds2.get(5, 1), "r")
     1892        self.failUnlessEqual(ds.get(2, 3), "fou")
     1893        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 4)
     1894
     1895        ds.add(0, "23")
     1896        self.failUnlessEqual(len(ds), 6)
     1897        self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
     1898        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 6)
     1899        self.failUnlessEqual(ds.get(0, 4), "23fo")
     1900        self.failUnlessEqual(ds.pop(0, 4), "23fo")
     1901        self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 2)
     1902        self.failUnlessEqual(ds.get(0, 4), None)
     1903        self.failUnlessEqual(ds.pop(0, 4), None)
     1904
     1905        ds = klass()
     1906        ds.add(2, "four")
     1907        ds.add(3, "ea")
     1908        self.failUnlessEqual(ds.get(2, 4), "fear")
     1909
     1910    def do_scan(self, klass):
     1911        # do a test with gaps and spans of size 1 and 2
     1912        #  left=(1,11) * right=(1,11) * gapsize=(1,2)
     1913        # 111, 112, 121, 122, 211, 212, 221, 222
     1914        #    211
     1915        #      121
     1916        #         112
     1917        #            212
     1918        #               222
     1919        #                   221
     1920        #                      111
     1921        #                        122
     1922        #  11 1  1 11 11  11  1 1  111
     1923        # 0123456789012345678901234567
     1924        # abcdefghijklmnopqrstuvwxyz-=
     1925        pieces = [(1, "bc"),
     1926                  (4, "e"),
     1927                  (7, "h"),
     1928                  (9, "jk"),
     1929                  (12, "mn"),
     1930                  (16, "qr"),
     1931                  (20, "u"),
     1932                  (22, "w"),
     1933                  (25, "z-="),
     1934                  ]
     1935        p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27])
     1936        S = "abcdefghijklmnopqrstuvwxyz-="
     1937        # TODO: when adding data, add capital letters, to make sure we aren't
     1938        # just leaving the old data in place
     1939        l = len(S)
     1940        def base():
     1941            ds = klass()
     1942            for start, data in pieces:
     1943                ds.add(start, data)
     1944            return ds
     1945        def dump(s):
     1946            p = set(s._dump())
     1947            # wow, this is the first time I've ever wanted ?: in python
     1948            # note: this requires python2.5
     1949            d = "".join([(S[i] if i in p else " ") for i in range(l)])
     1950            assert len(d) == l
     1951            return d
     1952        DEBUG = False
     1953        for start in range(0, l):
     1954            for end in range(start+1, l):
      1955                # add the half-open range [start,end) to the baseline
     1956                which = "%d-%d" % (start, end-1)
     1957                p_added = set(range(start, end))
     1958                b = base()
     1959                if DEBUG:
     1960                    print
     1961                    print dump(b), which
     1962                    add = klass(); add.add(start, S[start:end])
     1963                    print dump(add)
     1964                b.add(start, S[start:end])
     1965                if DEBUG:
     1966                    print dump(b)
     1967                # check that the new span is there
     1968                d = b.get(start, end-start)
     1969                self.failUnlessEqual(d, S[start:end], which)
     1970                # check that all the original pieces are still there
     1971                for t_start, t_data in pieces:
     1972                    t_len = len(t_data)
     1973                    self.failUnlessEqual(b.get(t_start, t_len),
     1974                                         S[t_start:t_start+t_len],
     1975                                         "%s %d+%d" % (which, t_start, t_len))
     1976                # check that a lot of subspans are mostly correct
     1977                for t_start in range(l):
     1978                    for t_len in range(1,4):
     1979                        d = b.get(t_start, t_len)
     1980                        if d is not None:
     1981                            which2 = "%s+(%d-%d)" % (which, t_start,
     1982                                                     t_start+t_len-1)
     1983                            self.failUnlessEqual(d, S[t_start:t_start+t_len],
     1984                                                 which2)
     1985                        # check that removing a subspan gives the right value
     1986                        b2 = klass(b)
     1987                        b2.remove(t_start, t_len)
     1988                        removed = set(range(t_start, t_start+t_len))
     1989                        for i in range(l):
     1990                            exp = (((i in p_elements) or (i in p_added))
     1991                                   and (i not in removed))
     1992                            which2 = "%s-(%d-%d)" % (which, t_start,
     1993                                                     t_start+t_len-1)
     1994                            self.failUnlessEqual(bool(b2.get(i, 1)), exp,
     1995                                                 which2+" %d" % i)
     1996
     1997    def test_test(self):
     1998        self.do_basic(SimpleDataSpans)
     1999        self.do_scan(SimpleDataSpans)
     2000
     2001    def test_basic(self):
     2002        self.do_basic(DataSpans)
     2003        self.do_scan(DataSpans)
     2004
     2005    def test_random(self):
     2006        # attempt to increase coverage of corner cases by comparing behavior
     2007        # of a simple-but-slow model implementation against the
     2008        # complex-but-fast actual implementation, in a large number of random
     2009        # operations
     2010        S1 = SimpleDataSpans
     2011        S2 = DataSpans
     2012        s1 = S1(); s2 = S2()
     2013        seed = ""
     2014        def _randstr(length, seed):
     2015            created = 0
     2016            pieces = []
     2017            while created < length:
     2018                piece = md5(seed + str(created)).hexdigest()
     2019                pieces.append(piece)
     2020                created += len(piece)
     2021            return "".join(pieces)[:length]
     2022        def _create(subseed):
     2023            ns1 = S1(); ns2 = S2()
     2024            for i in range(10):
     2025                what = md5(subseed+str(i)).hexdigest()
     2026                start = int(what[2:4], 16)
     2027                length = max(1,int(what[5:6], 16))
     2028                ns1.add(start, _randstr(length, what[7:9]));
     2029                ns2.add(start, _randstr(length, what[7:9]))
     2030            return ns1, ns2
     2031
     2032        #print
     2033        for i in range(1000):
     2034            what = md5(seed+str(i)).hexdigest()
     2035            op = what[0]
     2036            subop = what[1]
     2037            start = int(what[2:4], 16)
     2038            length = max(1,int(what[5:6], 16))
     2039            #print what
     2040            if op in "0":
     2041                if subop in "0123456":
     2042                    s1 = S1(); s2 = S2()
     2043                else:
     2044                    s1, s2 = _create(what[7:11])
     2045                #print "s2 = %s" % list(s2._dump())
     2046            elif op in "123456":
     2047                #print "s2.add(%d,%d)" % (start, length)
     2048                s1.add(start, _randstr(length, what[7:9]));
     2049                s2.add(start, _randstr(length, what[7:9]))
     2050            elif op in "789abc":
     2051                #print "s2.remove(%d,%d)" % (start, length)
     2052                s1.remove(start, length); s2.remove(start, length)
     2053            else:
     2054                #print "s2.pop(%d,%d)" % (start, length)
     2055                d1 = s1.pop(start, length); d2 = s2.pop(start, length)
     2056                self.failUnlessEqual(d1, d2)
     2057            #print "s1 now %s" % list(s1._dump())
     2058            #print "s2 now %s" % list(s2._dump())
     2059            self.failUnlessEqual(len(s1), len(s2))
     2060            self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
     2061            for j in range(100):
     2062                what = md5(what[12:14]+str(j)).hexdigest()
     2063                start = int(what[2:4], 16)
     2064                length = max(1, int(what[5:6], 16))
     2065                d1 = s1.get(start, length); d2 = s2.get(start, length)
     2066                self.failUnlessEqual(d1, d2, "%d+%d" % (start, length))
  • new file src/allmydata/util/spans.py

    diff --git a/src/allmydata/util/spans.py b/src/allmydata/util/spans.py
    new file mode 100755
    index 0000000..336fddf
    - +  
     1
     2class Spans:
     3    """I represent a compressed list of booleans, one per index (an integer).
     4    Typically, each index represents an offset into a large string, pointing
     5    to a specific byte of a share. In this context, True means that byte has
     6    been received, or has been requested.
     7
     8    Another way to look at this is maintaining a set of integers, optimized
     9    for operations on spans like 'add range to set' and 'is range in set?'.
     10
     11    This is a python equivalent of perl's Set::IntSpan module, frequently
     12    used to represent .newsrc contents.
     13
     14    Rather than storing an actual (large) list or dictionary, I represent my
     15    internal state as a sorted list of spans, each with a start and a length.
     16    My API is presented in terms of start+length pairs. I provide set
     17    arithmetic operators, to efficiently answer questions like 'I want bytes
     18    XYZ, I already requested bytes ABC, and I've already received bytes DEF:
     19    what bytes should I request now?'.
     20
     21    The new downloader will use it to keep track of which bytes we've requested
     22    or received already.
     23    """
     24
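             #    An illustrative sketch of that set-arithmetic question
             #    (hypothetical byte ranges, not taken from real download
             #    state):
             #      wanted = Spans(0, 100)     # bytes we want: [0-99]
             #      requested = Spans(0, 30)   # already asked for: [0-29]
             #      received = Spans(50, 20)   # already have: [50-69]
             #      ask = wanted - requested - received
             #      list(ask)  # [(30, 20), (70, 30)]: ask for [30-49], [70-99]
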
     25    def __init__(self, _span_or_start=None, length=None):
     26        self._spans = list()
     27        if length is not None:
     28            self._spans.append( (_span_or_start, length) )
     29        elif _span_or_start:
     30            for (start,length) in _span_or_start:
     31                self.add(start, length)
     32        self._check()
     33
     34    def _check(self):
     35        assert sorted(self._spans) == self._spans
     36        prev_end = None
     37        try:
     38            for (start,length) in self._spans:
     39                if prev_end is not None:
     40                    assert start > prev_end
     41                prev_end = start+length
     42        except AssertionError:
     43            print "BAD:", self.dump()
     44            raise
     45
     46    def add(self, start, length):
     47        assert start >= 0
     48        assert length > 0
     49        #print " ADD [%d+%d -%d) to %s" % (start, length, start+length, self.dump())
     50        first_overlap = last_overlap = None
     51        for i,(s_start,s_length) in enumerate(self._spans):
     52            #print "  (%d+%d)-> overlap=%s adjacent=%s" % (s_start,s_length, overlap(s_start, s_length, start, length), adjacent(s_start, s_length, start, length))
     53            if (overlap(s_start, s_length, start, length)
     54                or adjacent(s_start, s_length, start, length)):
     55                last_overlap = i
     56                if first_overlap is None:
     57                    first_overlap = i
     58                continue
     59            # no overlap
     60            if first_overlap is not None:
     61                break
     62        #print "  first_overlap", first_overlap, last_overlap
     63        if first_overlap is None:
     64            # no overlap, so just insert the span and sort by starting
     65            # position.
     66            self._spans.insert(0, (start,length))
     67            self._spans.sort()
     68        else:
     69            # everything from [first_overlap] to [last_overlap] overlapped
     70            first_start,first_length = self._spans[first_overlap]
     71            last_start,last_length = self._spans[last_overlap]
     72            newspan_start = min(start, first_start)
     73            newspan_end = max(start+length, last_start+last_length)
     74            newspan_length = newspan_end - newspan_start
     75            newspan = (newspan_start, newspan_length)
     76            self._spans[first_overlap:last_overlap+1] = [newspan]
     77        #print "  ADD done: %s" % self.dump()
     78        self._check()
     79
     80        return self
     81
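             #    A brief worked example of the merge behavior in add() above
             #    (a hand-checked sketch):
             #      s = Spans(0, 3)    # [0-2]
             #      s.add(10, 3)       # [0-2], [10-12]
             #      s.add(2, 9)        # [2-10] overlaps both existing spans,
             #                         #  so all three coalesce into one
             #      s.dump()           # "len=13: [0-12]"
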
     82    def remove(self, start, length):
     83        assert start >= 0
     84        assert length > 0
     85        #print " REMOVE [%d+%d -%d) from %s" % (start, length, start+length, self.dump())
     86        first_complete_overlap = last_complete_overlap = None
     87        for i,(s_start,s_length) in enumerate(self._spans):
     88            s_end = s_start + s_length
     89            o = overlap(s_start, s_length, start, length)
     90            if o:
     91                o_start, o_length = o
     92                o_end = o_start+o_length
     93                if o_start == s_start and o_end == s_end:
     94                    # delete this span altogether
     95                    if first_complete_overlap is None:
     96                        first_complete_overlap = i
     97                    last_complete_overlap = i
     98                elif o_start == s_start:
     99                    # we only overlap the left side, so trim the start
     100                    #    1111
     101                    #  rrrr
     102                    #    oo
     103                    # ->   11
     104                    new_start = o_end
     105                    new_end = s_end
     106                    assert new_start > s_start
     107                    new_length = new_end - new_start
     108                    self._spans[i] = (new_start, new_length)
     109                elif o_end == s_end:
     110                    # we only overlap the right side
     111                    #    1111
     112                    #      rrrr
     113                    #      oo
     114                    # -> 11
     115                    new_start = s_start
     116                    new_end = o_start
     117                    assert new_end < s_end
     118                    new_length = new_end - new_start
     119                    self._spans[i] = (new_start, new_length)
     120                else:
     121                    # we overlap the middle, so create a new span. No need to
     122                    # examine any other spans.
     123                    #    111111
     124                    #      rr
     125                    #    LL  RR
     126                    left_start = s_start
     127                    left_end = o_start
     128                    left_length = left_end - left_start
     129                    right_start = o_end
     130                    right_end = s_end
     131                    right_length = right_end - right_start
     132                    self._spans[i] = (left_start, left_length)
     133                    self._spans.append( (right_start, right_length) )
     134                    self._spans.sort()
     135                    break
     136        if first_complete_overlap is not None:
     137            del self._spans[first_complete_overlap:last_complete_overlap+1]
     138        #print "  REMOVE done: %s" % self.dump()
     139        self._check()
     140        return self
     141
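             #    A worked example of the middle-removal case above (hedged
             #    sketch):
             #      s = Spans(0, 10)   # [0-9]
             #      s.remove(3, 4)     # carve out [3-6]
             #      s.dump()           # "len=6: [0-2],[7-9]"
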
     142    def dump(self):
     143        return "len=%d: %s" % (len(self),
     144                               ",".join(["[%d-%d]" % (start,start+l-1)
     145                                         for (start,l) in self._spans]) )
     146
     147    def each(self):
     148        for start, length in self._spans:
     149            for i in range(start, start+length):
     150                yield i
     151
     152    def __iter__(self):
     153        for s in self._spans:
     154            yield s
     155
     156    def __len__(self):
     157        # this also gets us bool(s)
     158        return sum([length for start,length in self._spans])
     159
     160    def __add__(self, other):
     161        s = self.__class__(self)
     162        for (start, length) in other:
     163            s.add(start, length)
     164        return s
     165
     166    def __sub__(self, other):
     167        s = self.__class__(self)
     168        for (start, length) in other:
     169            s.remove(start, length)
     170        return s
     171
     172    def __iadd__(self, other):
     173        for (start, length) in other:
     174            self.add(start, length)
     175        return self
     176
     177    def __isub__(self, other):
     178        for (start, length) in other:
     179            self.remove(start, length)
     180        return self
     181
     182    def __contains__(self, (start,length)):
     183        for span_start,span_length in self._spans:
     184            o = overlap(start, length, span_start, span_length)
     185            if o:
     186                o_start,o_length = o
     187                if o_start == start and o_length == length:
     188                    return True
     189        return False
     190
     191def overlap(start0, length0, start1, length1):
     192    # return start2,length2 of the overlapping region, or None
     193    #  00      00   000   0000  00  00 000  00   00  00      00
     194    #     11    11   11    11   111 11 11  1111 111 11    11
     195    left = max(start0, start1)
     196    right = min(start0+length0, start1+length1)
     197    # if there is overlap, 'left' will be its start, and right-1 will
      198    # be the end
     199    if left < right:
     200        return (left, right-left)
     201    return None
     202
     203def adjacent(start0, length0, start1, length1):
     204    if (start0 < start1) and start0+length0 == start1:
     205        return True
     206    elif (start1 < start0) and start1+length1 == start0:
     207        return True
     208    return False
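
         # Quick hand-checked examples of the two helpers above
         # (illustrative only):
         #   overlap(0, 10, 5, 10) == (5, 5)  # [0-9] and [5-14] share [5-9]
         #   overlap(0, 5, 5, 5)   == None    # touching is not overlapping...
         #   adjacent(0, 5, 5, 5)  == True    # ...which is what adjacent() detects
         #   adjacent(0, 5, 6, 5)  == False   # a one-byte gap between them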
     209
     210class DataSpans:
     211    """I represent portions of a large string. Equivalently, I can be said to
     212    maintain a large array of characters (with gaps of empty elements). I can
     213    be used to manage access to a remote share, where some pieces have been
     214    retrieved, some have been requested, and others have not been read.
     215    """
     216
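             #    A hedged usage sketch (hypothetical data, hand-checked
             #    against the methods below):
             #      ds = DataSpans()
             #      ds.add(0, "hell")      # hold bytes 0-3
             #      ds.add(4, "o world")   # adjacent, so the spans merge
             #      ds.get_spans()         # [(0, "hello world")]
             #      ds.get(2, 3)           # "llo"
             #      ds.get(9, 5)           # None: we only hold offsets 0-10
             #      ds.pop(0, 5)           # "hello" (and forgets those bytes)
             #      ds.get_spans()         # [(5, " world")]
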
     217    def __init__(self, other=None):
     218        self.spans = [] # (start, data) tuples, non-overlapping, merged
     219        if other:
     220            for (start, data) in other.get_spans():
     221                self.add(start, data)
     222
     223    def __len__(self):
     224        # return number of bytes we're holding
     225        return sum([len(data) for (start,data) in self.spans])
     226
     227    def _dump(self):
     228        # return iterator of sorted list of offsets, one per byte
     229        for (start,data) in self.spans:
     230            for i in range(start, start+len(data)):
     231                yield i
     232
     233    def get_spans(self):
     234        return list(self.spans)
     235
     236    def assert_invariants(self):
     237        if not self.spans:
     238            return
     239        prev_start = self.spans[0][0]
     240        prev_end = prev_start + len(self.spans[0][1])
      241        for start, data in self.spans[1:]:
      242            if not start > prev_end:
      243                # adjacent or overlapping: bad
      244                print "ASSERTION FAILED", self.spans
      245                raise AssertionError
                     prev_end = start + len(data) # advance, so each span is checked against its predecessor
     246
     247    def get(self, start, length):
     248        # returns a string of LENGTH, or None
     249        #print "get", start, length, self.spans
     250        end = start+length
     251        for (s_start,s_data) in self.spans:
     252            s_end = s_start+len(s_data)
     253            #print " ",s_start,s_end
     254            if s_start <= start < s_end:
     255                # we want some data from this span. Because we maintain
     256                # strictly merged and non-overlapping spans, everything we
     257                # want must be in this span.
     258                offset = start - s_start
     259                if offset + length > len(s_data):
     260                    #print " None, span falls short"
     261                    return None # span falls short
     262                #print " some", s_data[offset:offset+length]
     263                return s_data[offset:offset+length]
     264            if s_start >= end:
     265                # we've gone too far: no further spans will overlap
     266                #print " None, gone too far"
     267                return None
     268        #print " None, ran out of spans"
     269        return None
     270
     271    def add(self, start, data):
     272        # first: walk through existing spans, find overlap, modify-in-place
     273        #  create list of new spans
     274        #  add new spans
     275        #  sort
     276        #  merge adjacent spans
     277        #print "add", start, data, self.spans
     278        end = start + len(data)
     279        i = 0
     280        while len(data):
     281            #print " loop", start, data, i, len(self.spans), self.spans
     282            if i >= len(self.spans):
     283                #print " append and done"
     284                # append a last span
     285                self.spans.append( (start, data) )
     286                break
     287            (s_start,s_data) = self.spans[i]
     288            # five basic cases:
     289            #  a: OLD  b:OLDD  c1:OLD  c2:OLD   d1:OLDD  d2:OLD  e: OLLDD
     290            #    NEW     NEW      NEW     NEWW      NEW      NEW     NEW
     291            #
     292            # we handle A by inserting a new segment (with "N") and looping,
     293            # turning it into B or C. We handle B by replacing a prefix and
     294            # terminating. We handle C (both c1 and c2) by replacing the
     295            # segment (and, for c2, looping, turning it into A). We handle D
     296            # by replacing a suffix (and, for d2, looping, turning it into
     297            # A). We handle E by replacing the middle and terminating.
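             #            Concrete instances of each case, as a hand-worked
             #            aid (with the existing span OLD=(10,"abcd")):
             #             A:  add(8,"xxyy")   -> insert (8,"xx"), loop with (10,"yy")
             #             B:  add(10,"XY")    -> (10,"XYcd"), done
             #             C1: add(10,"WXYZ")  -> (10,"WXYZ"), done
             #             C2: add(10,"WXYZ+") -> (10,"WXYZ"), loop with (14,"+")
             #             D1: add(12,"YZ")    -> (10,"abYZ"), done
             #             E:  add(11,"X")     -> (10,"aXcd"), done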
     298            if start < s_start:
     299                # case A: insert a new span, then loop with the remainder
      300                #print " insert new span"
     301                s_len = s_start-start
     302                self.spans.insert(i, (start, data[:s_len]))
     303                i += 1
     304                start = s_start
     305                data = data[s_len:]
     306                continue
     307            s_len = len(s_data)
     308            s_end = s_start+s_len
     309            if s_start <= start < s_end:
     310                #print " modify this span", s_start, start, s_end
     311                # we want to modify some data in this span: a prefix, a
     312                # suffix, or the whole thing
     313                if s_start == start:
     314                    if s_end <= end:
     315                        #print " replace whole segment"
     316                        # case C: replace this segment
     317                        self.spans[i] = (s_start, data[:s_len])
     318                        i += 1
     319                        start += s_len
     320                        data = data[s_len:]
     321                        # C2 is where len(data)>0
     322                        continue
     323                    # case B: modify the prefix, retain the suffix
     324                    #print " modify prefix"
     325                    self.spans[i] = (s_start, data + s_data[len(data):])
     326                    break
     327                if start > s_start and end < s_end:
     328                    # case E: modify the middle
     329                    #print " modify middle"
     330                    prefix_len = start - s_start # we retain this much
     331                    suffix_len = s_end - end # and retain this much
     332                    newdata = s_data[:prefix_len] + data + s_data[-suffix_len:]
     333                    self.spans[i] = (s_start, newdata)
     334                    break
     335                # case D: retain the prefix, modify the suffix
     336                #print " modify suffix"
     337                prefix_len = start - s_start # we retain this much
     338                suffix_len = s_len - prefix_len # we replace this much
     339                #print "  ", s_data, prefix_len, suffix_len, s_len, data
     340                self.spans[i] = (s_start,
     341                                 s_data[:prefix_len] + data[:suffix_len])
     342                i += 1
     343                start += suffix_len
     344                data = data[suffix_len:]
     345                #print "  now", start, data
     346                # D2 is where len(data)>0
     347                continue
     348            # else we're not there yet
     349            #print " still looking"
     350            i += 1
     351            continue
     352        # now merge adjacent spans
     353        #print " merging", self.spans
     354        newspans = []
     355        for (s_start,s_data) in self.spans:
     356            if newspans and adjacent(newspans[-1][0], len(newspans[-1][1]),
     357                                     s_start, len(s_data)):
     358                newspans[-1] = (newspans[-1][0], newspans[-1][1] + s_data)
     359            else:
     360                newspans.append( (s_start, s_data) )
     361        self.spans = newspans
     362        self.assert_invariants()
     363        #print " done", self.spans
     364
     365    def remove(self, start, length):
     366        i = 0
     367        end = start + length
     368        #print "remove", start, length, self.spans
     369        while i < len(self.spans):
     370            (s_start,s_data) = self.spans[i]
     371            if s_start >= end:
     372                # this segment is entirely right of the removed region, and
     373                # all further segments are even further right. We're done.
     374                break
     375            s_len = len(s_data)
     376            s_end = s_start + s_len
     377            o = overlap(start, length, s_start, s_len)
     378            if not o:
     379                i += 1
     380                continue
     381            o_start, o_len = o
     382            o_end = o_start + o_len
     383            if o_len == s_len:
     384                # remove the whole segment
     385                del self.spans[i]
     386                continue
     387            if o_start == s_start:
     388                # remove a prefix, leaving the suffix from o_end to s_end
     389                prefix_len = o_end - o_start
     390                self.spans[i] = (o_end, s_data[prefix_len:])
     391                i += 1
     392                continue
     393            elif o_end == s_end:
     394                # remove a suffix, leaving the prefix from s_start to o_start
     395                prefix_len = o_start - s_start
     396                self.spans[i] = (s_start, s_data[:prefix_len])
     397                i += 1
     398                continue
     399            # remove the middle, creating a new segment
     400            # left is s_start:o_start, right is o_end:s_end
     401            left_len = o_start - s_start
     402            left = s_data[:left_len]
     403            right_len = s_end - o_end
     404            right = s_data[-right_len:]
     405            self.spans[i] = (s_start, left)
     406            self.spans.insert(i+1, (o_end, right))
     407            break
     408        #print " done", self.spans
     409
     410    def pop(self, start, length):
     411        data = self.get(start, length)
     412        if data:
     413            self.remove(start, length)
     414        return data
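
             #    pop() is get()+remove(): it returns the requested bytes and
             #    forgets them, or returns None (leaving the spans untouched)
             #    if any byte in the range is missing. Hedged sketch:
             #      ds = DataSpans(); ds.add(0, "abcd")
             #      ds.pop(0, 2)   # "ab"; ds now holds only (2, "cd")
             #      ds.pop(0, 2)   # None; nothing changed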