Ticket #798: new-downloader-v3.diff
File new-downloader-v3.diff, 141.6 KB (added by warner at 2010-04-26T09:53:14Z)
src/allmydata/client.py
diff --git a/src/allmydata/client.py b/src/allmydata/client.py
index 12e7473..a1ed272 100644
 import allmydata
 from allmydata.storage.server import StorageServer
 from allmydata import storage_client
 from allmydata.immutable.upload import Uploader
-from allmydata.immutable.download import Downloader
+from allmydata.immutable.download2_util import Terminator
 from allmydata.immutable.offloaded import Helper
 from allmydata.control import ControlServer
 from allmydata.introducer.client import IntroducerClient
…
class Client(node.Node, pollmixin.PollMixin):

         self.init_client_storage_broker()
         self.history = History(self.stats_provider)
+        self.terminator = Terminator()
+        self.terminator.setServiceParent(self)
         self.add_service(Uploader(helper_furl, self.stats_provider))
-        download_cachedir = os.path.join(self.basedir,
-                                         "private", "cache", "download")
-        self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir)
-        self.download_cache_dirman.setServiceParent(self)
-        self.downloader = Downloader(self.storage_broker, self.stats_provider)
         self.init_stub_client()
         self.init_nodemaker()
…
class Client(node.Node, pollmixin.PollMixin):
                                    self._secret_holder,
                                    self.get_history(),
                                    self.getServiceNamed("uploader"),
-                                   self.downloader,
-                                   self.download_cache_dirman,
+                                   self.terminator,
                                    self.get_encoding_parameters(),
                                    self._key_generator)
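Terminator is imported here from allmydata.immutable.download2_util, which is not shown in this excerpt. Based on how it is used (client.py attaches it with setServiceParent(), and _Node in download2.py below calls terminator.register(self), whose comment says "calls self.stop() at stopService()"), a minimal sketch might look like the following; the attribute names are illustrative, not taken from the patch:

    from twisted.application import service

    class Terminator(service.Service):
        """Tell every registered download to shut down when the client stops."""
        def __init__(self):
            self._clients = set()      # objects exposing a .stop() method

        def register(self, c):
            # _Node calls terminator.register(self) from its constructor
            self._clients.add(c)

        def stopService(self):
            # stopping the service stops all in-progress downloads, so that
            # stopService()+flushEventualQueue() leaves no work outstanding
            for c in self._clients:
                c.stop()
            self._clients.clear()
            return service.Service.stopService(self)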
new file src/allmydata/immutable/download2.py
diff --git a/src/allmydata/immutable/download2.py b/src/allmydata/immutable/download2.py
new file mode 100644
index 0000000..83f8a47
import binascii
import struct
from zope.interface import implements
from twisted.python.failure import Failure
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer

from foolscap.api import eventually
from allmydata.interfaces import HASH_SIZE, NotEnoughSharesError, \
     IImmutableFileNode
from allmydata.hashtree import IncompleteHashTree, BadHashError, \
     NotEnoughHashesError
from allmydata.util import base32, log, hashutil, mathutil, idlib
from allmydata.util.spans import Spans, DataSpans, overlap
from allmydata.util.dictutil import DictOfSets
from allmydata.codec import CRSDecoder
from allmydata import uri
from pycryptopp.cipher.aes import AES
from download2_util import Observer2, incidentally
from layout import make_write_bucket_proxy

(AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM) = \
 ("AVAILABLE", "PENDING", "OVERDUE", "COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM")

KiB = 1024
class BadSegmentNumberError(Exception):
    pass
class BadSegmentError(Exception):
    pass
class BadCiphertextHashError(Exception):
    pass

class Share:
    """I represent a single instance of a single share (e.g. I reference the
    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
    I am associated with a CommonShare that remembers data that is held in
    common among e.g. SI=abcde/shnum2 across all servers. I am also
    associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
    servers).
    """
    # this is a specific implementation of IShare for tahoe's native storage
    # servers. A different backend would use a different class.

    def __init__(self, rref, server_version, verifycap, commonshare, node,
                 peerid, shnum):
        self._rref = rref
        self._server_version = server_version
        self._node = node # holds share_hash_tree and UEB
        self._guess_offsets(verifycap, node.guessed_segment_size)
        self.actual_offsets = None
        self.actual_segment_size = None
        self._UEB_length = None
        self._commonshare = commonshare # holds block_hash_tree
        self._peerid = peerid
        self._peerid_s = base32.b2a(peerid)[:5]
        self._storage_index = verifycap.storage_index
        self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
        self._shnum = shnum

        self._lp = log.msg(format="Share(%(si)s) on server=%(server)s starting",
                           si=self._si_prefix, server=self._peerid_s,
                           level=log.NOISY, umid="P7hv2w")

        self._wanted = Spans() # desired metadata
        self._wanted_blocks = Spans() # desired block data
        self._requested = Spans() # we've sent a request for this
        self._received = Spans() # we've received a response for this
        self._received_data = DataSpans() # the response contents, still unused
        self._requested_blocks = [] # (segnum, set(observer2..))
        ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
        self._overrun_ok = ver["tolerates-immutable-read-overrun"]
        # If _overrun_ok and we guess the offsets correctly, we can get
        # everything in one RTT. If _overrun_ok and we guess wrong, we might
        # need two RTT (but we could get lucky and do it in one). If overrun
        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
        # 2=offset table, 3=UEB_length and everything else (hashes, block),
        # 4=UEB.
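The four-RTT worst case described above is driven by the share's offset table, which _satisfy_offsets() below extracts from the first few bytes of the share. For orientation, here is a standalone sketch of that layout; parse_offset_table is a hypothetical helper mirroring the parsing logic below, not a function from the patch:

    import struct

    def parse_offset_table(prefix):
        """Parse the offset table from the first 0x44 bytes of a share.

        v1 shares store six 4-byte big-endian offsets starting at 0x0c;
        later versions store six 8-byte offsets starting at 0x14.
        """
        (version,) = struct.unpack(">L", prefix[:4])
        if version == 1:
            table_start, fieldstruct = 0x0c, "L"   # 4-byte fields
        else:
            table_start, fieldstruct = 0x14, "Q"   # 8-byte fields
        fieldsize = struct.calcsize(">" + fieldstruct)
        table = prefix[table_start:table_start + 6*fieldsize]
        fields = struct.unpack(">" + 6*fieldstruct, table)
        names = ["data",
                 "plaintext_hash_tree",   # unused
                 "crypttext_hash_tree",
                 "block_hashes",
                 "share_hashes",
                 "uri_extension"]
        return dict(zip(names, fields))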
79 80 def _guess_offsets(self, verifycap, guessed_segment_size): 81 self.guessed_segment_size = guessed_segment_size 82 size = verifycap.size 83 k = verifycap.needed_shares 84 N = verifycap.total_shares 85 r = self._node._calculate_sizes(guessed_segment_size) 86 # num_segments, block_size/tail_block_size 87 # guessed_segment_size/tail_segment_size/tail_segment_padded 88 share_size = mathutil.div_ceil(size, k) 89 # share_size is the amount of block data that will be put into each 90 # share, summed over all segments. It does not include hashes, the 91 # UEB, or other overhead. 92 93 # use the upload-side code to get this as accurate as possible 94 ht = IncompleteHashTree(N) 95 num_share_hashes = len(ht.needed_hashes(0, include_leaf=True)) 96 wbp = make_write_bucket_proxy(None, share_size, r["block_size"], 97 r["num_segments"], num_share_hashes, 0, 98 None) 99 self._fieldsize = wbp.fieldsize 100 self._fieldstruct = wbp.fieldstruct 101 self.guessed_offsets = wbp._offsets 102 103 # called by our client, the SegmentFetcher 104 def get_block(self, segnum): 105 """Add a block number to the list of requests. This will eventually 106 result in a fetch of the data necessary to validate the block, then 107 the block itself. The fetch order is generally 108 first-come-first-served, but requests may be answered out-of-order if 109 data becomes available sooner. 110 111 I return an Observer2, which has two uses. The first is to call 112 o.subscribe(), which gives me a place to send state changes and 113 eventually the data block. The second is o.cancel(), which removes 114 the request (if it is still active). 115 116 I will distribute the following events through my Observer2: 117 - state=OVERDUE: ?? I believe I should have had an answer by now. 118 You may want to ask another share instead. 119 - state=BADSEGNUM: the segnum you asked for is too large. 
I must 120 fetch a valid UEB before I can determine this, 121 so the notification is asynchronous 122 - state=COMPLETE, block=data: here is a valid block 123 - state=CORRUPT: this share contains corrupted data 124 - state=DEAD, f=Failure: the server reported an error, this share 125 is unusable 126 """ 127 assert segnum >= 0 128 o = Observer2() 129 o.set_canceler(self._cancel_block_request) 130 for i,(segnum0,observers) in enumerate(self._requested_blocks): 131 if segnum0 == segnum: 132 observers.add(o) 133 break 134 else: 135 self._requested_blocks.append( (segnum, set([o])) ) 136 eventually(self.loop) 137 return o 138 139 def _cancel_block_request(self, o): 140 new_requests = [] 141 for e in self._requested_blocks: 142 (segnum0, observers) = e 143 observers.discard(o) 144 if observers: 145 new_requests.append(e) 146 self._requested_blocks = new_requests 147 148 # internal methods 149 def _active_segnum(self): 150 if self._requested_blocks: 151 return self._requested_blocks[0] 152 return None 153 154 def _active_segnum_and_observers(self): 155 if self._requested_blocks: 156 # we only retrieve information for one segment at a time, to 157 # minimize alacrity (first come, first served) 158 return self._requested_blocks[0] 159 return None, [] 160 161 def loop(self): 162 try: 163 # if any exceptions occur here, kill the download 164 self._do_loop() 165 except BaseException: 166 self._fail(Failure()) 167 raise 168 169 def _do_loop(self): 170 # we are (eventually) called after all state transitions: 171 # new segments added to self._requested_blocks 172 # new data received from servers (responses to our read() calls) 173 # impatience timer fires (server appears slow) 174 175 # First, consume all of the information that we currently have, for 176 # all the segments people currently want. 177 while self._get_satisfaction(): 178 pass 179 180 # When we get no satisfaction (from the data we've received so far), 181 # we determine what data we desire (to satisfy more requests). The 182 # number of segments is finite, so I can't get no satisfaction 183 # forever. 184 self._desire() 185 186 # finally send out requests for whatever we need (desire minus have). 187 # You can't always get what you want, but, sometimes, you get what 188 # you need. 189 self._request_needed() # express desire 190 191 def _get_satisfaction(self): 192 # return True if we retired a data block, and should therefore be 193 # called again. Return False if we don't retire a data block (even if 194 # we do retire some other data, like hash chains). 195 196 if self.actual_offsets is None: 197 if not self._satisfy_offsets(): 198 # can't even look at anything without the offset table 199 return False 200 201 if not self._node.have_UEB: 202 if not self._satisfy_UEB(): 203 # can't check any hashes without the UEB 204 return False 205 206 # knowing the UEB means knowing num_segments. Despite the redundancy, 207 # this is the best place to set this. CommonShare.set_numsegs will 208 # ignore duplicate calls. 
209 cs = self._commonshare 210 cs.set_numsegs(self._node.num_segments) 211 212 segnum, observers = self._active_segnum_and_observers() 213 if segnum >= self._node.num_segments: 214 for o in observers: 215 o.notify(state=BADSEGNUM) 216 self._requested_blocks.pop(0) 217 return True 218 219 if self._node.share_hash_tree.needed_hashes(self._shnum): 220 if not self._satisfy_share_hash_tree(): 221 # can't check block_hash_tree without a root 222 return False 223 224 if cs.need_block_hash_root(): 225 block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum) 226 cs.set_block_hash_root(block_hash_root) 227 228 if segnum is None: 229 return False # we don't want any particular segment right now 230 231 # block_hash_tree 232 needed_hashes = self._commonshare.get_needed_block_hashes(segnum) 233 if needed_hashes: 234 if not self._satisfy_block_hash_tree(needed_hashes): 235 # can't check block without block_hash_tree 236 return False 237 238 # data blocks 239 return self._satisfy_data_block(segnum, observers) 240 241 def _satisfy_offsets(self): 242 version_s = self._received_data.get(0, 4) 243 if version_s is None: 244 return False 245 (version,) = struct.unpack(">L", version_s) 246 if version == 1: 247 table_start = 0x0c 248 self._fieldsize = 0x4 249 self._fieldstruct = "L" 250 else: 251 table_start = 0x14 252 self._fieldsize = 0x8 253 self._fieldstruct = "Q" 254 offset_table_size = 6 * self._fieldsize 255 table_s = self._received_data.pop(table_start, offset_table_size) 256 if table_s is None: 257 return False 258 fields = struct.unpack(">"+6*self._fieldstruct, table_s) 259 offsets = {} 260 for i,field in enumerate(['data', 261 'plaintext_hash_tree', # UNUSED 262 'crypttext_hash_tree', 263 'block_hashes', 264 'share_hashes', 265 'uri_extension', 266 ] ): 267 offsets[field] = fields[i] 268 self.actual_offsets = offsets 269 log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields)) 270 self._received_data.remove(0, 4) # don't need this anymore 271 return True 272 273 def _satisfy_UEB(self): 274 o = self.actual_offsets 275 fsize = self._fieldsize 276 rdata = self._received_data 277 UEB_length_s = rdata.get(o["uri_extension"], fsize) 278 if not UEB_length_s: 279 return False 280 (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s) 281 UEB_s = rdata.pop(o["uri_extension"]+fsize, UEB_length) 282 if not UEB_s: 283 return False 284 rdata.remove(o["uri_extension"], fsize) 285 try: 286 self._node.validate_and_store_UEB(UEB_s) 287 self.actual_segment_size = self._node.segment_size 288 assert self.actual_segment_size is not None 289 return True 290 except BadHashError: 291 # TODO: if this UEB was bad, we'll keep trying to validate it 292 # over and over again. Only log.err on the first one, or better 293 # yet skip all but the first 294 f = Failure() 295 self._signal_corruption(f, o["uri_extension"], fsize+UEB_length) 296 return False 297 298 def _satisfy_share_hash_tree(self): 299 # the share hash chain is stored as (hashnum,hash) tuples, so you 300 # can't fetch just the pieces you need, because you don't know 301 # exactly where they are. So fetch everything, and parse the results 302 # later. 
303 o = self.actual_offsets 304 rdata = self._received_data 305 hashlen = o["uri_extension"] - o["share_hashes"] 306 assert hashlen % (2+HASH_SIZE) == 0 307 hashdata = rdata.get(o["share_hashes"], hashlen) 308 if not hashdata: 309 return False 310 share_hashes = {} 311 for i in range(0, hashlen, 2+HASH_SIZE): 312 (hashnum,) = struct.unpack(">H", hashdata[i:i+2]) 313 hashvalue = hashdata[i+2:i+2+HASH_SIZE] 314 share_hashes[hashnum] = hashvalue 315 try: 316 self._node.process_share_hashes(share_hashes) 317 # adds to self._node.share_hash_tree 318 rdata.remove(o["share_hashes"], hashlen) 319 return True 320 except (BadHashError, NotEnoughHashesError, IndexError): 321 f = Failure() 322 self._signal_corruption(f, o["share_hashes"], hashlen) 323 return False 324 325 def _signal_corruption(self, f, start, offset): 326 # there was corruption somewhere in the given range 327 reason = "corruption in share[%d-%d): %s" % (start, start+offset, 328 str(f.value)) 329 self._rref.callRemoteOnly("advise_corrupt_share", "immutable", 330 self._storage_index, self._shnum, reason) 331 332 def _satisfy_block_hash_tree(self, needed_hashes): 333 o = self.actual_offsets 334 rdata = self._received_data 335 block_hashes = {} 336 for hashnum in needed_hashes: 337 hashdata = rdata.get(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE) 338 if hashdata: 339 block_hashes[hashnum] = hashdata 340 else: 341 return False # missing some hashes 342 # note that we don't submit any hashes to the block_hash_tree until 343 # we've gotten them all, because the hash tree will throw an 344 # exception if we only give it a partial set (which it therefore 345 # cannot validate) 346 commonshare = self._commonshare 347 ok = commonshare.process_block_hashes(block_hashes, self._peerid_s) 348 if not ok: 349 return False 350 for hashnum in needed_hashes: 351 rdata.remove(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE) 352 return True 353 354 def _satisfy_data_block(self, segnum, observers): 355 tail = (segnum == self._node.num_segments-1) 356 datastart = self.actual_offsets["data"] 357 blockstart = datastart + segnum * self._node.block_size 358 blocklen = self._node.block_size 359 if tail: 360 blocklen = self._node.tail_block_size 361 362 rdata = self._received_data 363 block = rdata.pop(blockstart, blocklen) 364 if not block: 365 return False 366 # this block is being retired, either as COMPLETE or CORRUPT, since 367 # no further data reads will help 368 assert self._requested_blocks[0][0] == segnum 369 commonshare = self._commonshare 370 ok = commonshare.check_block(segnum, block, self._peerid_s) 371 if ok: 372 for o in observers: 373 # goes to SegmentFetcher._block_request_activity 374 o.notify(state=COMPLETE, block=block) 375 else: 376 for o in observers: 377 o.notify(state=CORRUPT) 378 self._requested_blocks.pop(0) # retired 379 return True # got satisfaction 380 381 def _desire(self): 382 segnum, observers = self._active_segnum_and_observers() 383 commonshare = self._commonshare 384 385 if not self.actual_offsets: 386 self._desire_offsets() 387 388 # we can use guessed offsets as long as this server tolerates overrun 389 if not self.actual_offsets and not self._overrun_ok: 390 return # must wait for the offsets to arrive 391 392 o = self.actual_offsets or self.guessed_offsets 393 segsize = self.actual_segment_size or self.guessed_segment_size 394 if not self._node.have_UEB: 395 self._desire_UEB(o) 396 397 if self._node.share_hash_tree.needed_hashes(self._shnum): 398 hashlen = o["uri_extension"] - o["share_hashes"] 399 
self._wanted.add(o["share_hashes"], hashlen) 400 401 if segnum is None: 402 return # only need block hashes or blocks for active segments 403 404 # block hash chain 405 for hashnum in commonshare.get_needed_block_hashes(segnum): 406 self._wanted.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE) 407 408 # data 409 r = self._node._calculate_sizes(segsize) 410 tail = (segnum == r["num_segments"]) 411 datastart = o["data"] 412 blockstart = datastart + segnum * r["block_size"] 413 blocklen = r["block_size"] 414 if tail: 415 blocklen = r["tail_block_size"] 416 self._wanted_blocks.add(blockstart, blocklen) 417 418 419 def _desire_offsets(self): 420 if self._overrun_ok: 421 # easy! this includes version number, sizes, and offsets 422 self._wanted.add(0,1024) 423 return 424 425 # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44). 426 # To be conservative, only request the data that we know lives there, 427 # even if that means more roundtrips. 428 429 self._wanted.add(0,4) # version number, always safe 430 version_s = self._received_data.get(0, 4) 431 if not version_s: 432 return 433 (version,) = struct.unpack(">L", version_s) 434 if version == 1: 435 table_start = 0x0c 436 fieldsize = 0x4 437 else: 438 table_start = 0x14 439 fieldsize = 0x8 440 offset_table_size = 6 * fieldsize 441 self._wanted.add(table_start, offset_table_size) 442 443 def _desire_UEB(self, o): 444 # UEB data is stored as (length,data). 445 rdata = self._received_data 446 if self._overrun_ok: 447 # We can pre-fetch 2kb, which should probably cover it. If it 448 # turns out to be larger, we'll come back here later with a known 449 # length and fetch the rest. 450 self._wanted.add(o["uri_extension"], 2048) 451 # now, while that is probably enough to fetch the whole UEB, it 452 # might not be, so we need to do the next few steps as well. In 453 # most cases, the following steps will not actually add anything 454 # to self._wanted 455 456 self._wanted.add(o["uri_extension"], self._fieldsize) 457 # only use a length if we're sure it's correct, otherwise we'll 458 # probably fetch a huge number 459 if not self.actual_offsets: 460 return 461 UEB_length_s = rdata.get(o["uri_extension"], self._fieldsize) 462 if UEB_length_s: 463 (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s) 464 # we know the length, so make sure we grab everything 465 self._wanted.add(o["uri_extension"]+self._fieldsize, UEB_length) 466 467 def _request_needed(self): 468 # send requests for metadata first, to avoid hanging on to large data 469 # blocks any longer than necessary. 470 self._send_requests(self._wanted - self._received - self._requested) 471 # then send requests for data blocks. All the hashes should arrive 472 # before the blocks, so the blocks can be consumed and released in a 473 # single turn. 
474 ask = self._wanted_blocks - self._received - self._requested 475 self._send_requests(ask) 476 477 def _send_requests(self, needed): 478 for (start, length) in needed: 479 # TODO: quantize to reasonably-large blocks 480 self._requested.add(start, length) 481 lp = log.msg(format="_send_request(%(peerid)s)" 482 " [%(start)d:+%(length)d]", 483 peerid=self._peerid_s, start=start, length=length, 484 level=log.NOISY, umid="sgVAyA") 485 d = self._send_request(start, length) 486 d.addCallback(self._got_data, start, length, lp) 487 d.addErrback(self._got_error, start, length, lp) 488 d.addCallback(incidentally, eventually, self.loop) 489 d.addErrback(lambda f: 490 log.err(format="unhandled error during send_request", 491 failure=f, parent=self._lp, 492 level=log.WEIRD, umid="qZu0wg")) 493 494 def _send_request(self, start, length): 495 return self._rref.callRemote("read", start, length) 496 497 def _got_data(self, data, start, length, lp): 498 log.msg(format="_got_data [%(start)d:+%(length)d] -> %(datalen)d", 499 start=start, length=length, datalen=len(data), 500 level=log.NOISY, parent=lp, umid="sgVAyA") 501 span = (start, length) 502 assert span in self._requested 503 self._requested.remove(start, length) 504 self._received.add(start, length) 505 self._received_data.add(start, data) 506 507 def _got_error(self, f, start, length, lp): 508 log.msg(format="error requesting %(start)d+%(length)d" 509 " from %(server)s for si %(si)s", 510 start=start, length=length, 511 server=self._peerid_s, si=self._si_prefix, 512 failure=f, parent=lp, level=log.UNUSUAL, umid="qZu0wg") 513 # retire our observers, assuming we won't be able to make any 514 # further progress 515 self._fail(f) 516 517 def _fail(self, f): 518 for (segnum, observers) in self._requested_blocks: 519 for o in observers: 520 o.notify(state=DEAD, f=f) 521 522 523 class CommonShare: 524 """I hold data that is common across all instances of a single share, 525 like sh2 on both servers A and B. This is just the block hash tree. 526 """ 527 def __init__(self, guessed_numsegs, si_prefix, shnum): 528 self.si_prefix = si_prefix 529 self.shnum = shnum 530 # in the beginning, before we have the real UEB, we can only guess at 531 # the number of segments. But we want to ask for block hashes early. 532 # So if we're asked for which block hashes are needed before we know 533 # numsegs for sure, we return a guess. 534 self._block_hash_tree = IncompleteHashTree(guessed_numsegs) 535 self._know_numsegs = False 536 537 def set_numsegs(self, numsegs): 538 if self._know_numsegs: 539 return 540 self._block_hash_tree = IncompleteHashTree(numsegs) 541 self._know_numsegs = True 542 543 def need_block_hash_root(self): 544 log.msg("need_block_hash_root: %s" % bool(not self._block_hash_tree[0])) 545 return bool(not self._block_hash_tree[0]) 546 547 def set_block_hash_root(self, roothash): 548 log.msg("set_block_hash_root: %s" % repr(roothash)) 549 self._block_hash_tree.set_hashes({0: roothash}) 550 log.msg("done with set_block_hash_root") 551 552 def get_needed_block_hashes(self, segnum): 553 needed = ",".join([str(n) for n in sorted(self._block_hash_tree.needed_hashes(segnum))]) 554 log.msg("segnum=%d needs %s" % (segnum, needed)) 555 # XXX: include_leaf=True needs thought: how did the old downloader do 556 # it? I think it grabbed *all* block hashes and set them all at once. 
557 # Since we want to fetch less data, we either need to fetch the leaf 558 # too, or wait to set the block hashes until we've also received the 559 # block itself, so we can hash it too, and set the chain+leaf all at 560 # the same time. 561 return self._block_hash_tree.needed_hashes(segnum, include_leaf=True) 562 563 def process_block_hashes(self, block_hashes, serverid_s): 564 assert self._know_numsegs 565 try: 566 self._block_hash_tree.set_hashes(block_hashes) 567 return True 568 except (BadHashError, NotEnoughHashesError): 569 hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())]) 570 log.msg(format="hash failure in block_hashes=(%(hashnums)s)," 571 " shnum=%(shnum)d SI=%(si)s server=%(server)s", 572 hashnums=hashnums, shnum=self.shnum, 573 si=self.si_prefix, server=serverid_s, failure=Failure(), 574 level=log.WEIRD, umid="yNyFdA") 575 return False 576 577 def check_block(self, segnum, block, serverid_s): 578 assert self._know_numsegs 579 h = hashutil.block_hash(block) 580 try: 581 self._block_hash_tree.set_hashes(leaves={segnum: h}) 582 return True 583 except (BadHashError, NotEnoughHashesError): 584 self.log(format="hash failure in block %(segnum)d," 585 " shnum=%(shnum)d SI=%(si)s server=%(server)s", 586 segnum=segnum, shnum=self.shnum, si=self.si_prefix, 587 server=serverid_s, failure=Failure(), 588 level=log.WEIRD, umid="mZjkqA") 589 return False 590 591 # all classes are also Services, and the rule is that you don't initiate more 592 # work unless self.running 593 594 # GC: decide whether each service is restartable or not. For non-restartable 595 # services, stopService() should delete a lot of attributes to kill reference 596 # cycles. The primary goal is to decref remote storage BucketReaders when a 597 # download is complete. 598 599 class SegmentFetcher: 600 """I am responsible for acquiring blocks for a single segment. I will use 601 the Share instances passed to my add_shares() method to locate, retrieve, 602 and validate those blocks. I expect my parent node to call my 603 no_more_shares() method when there are no more shares available. I will 604 call my parent's want_more_shares() method when I want more: I expect to 605 see at least one call to add_shares or no_more_shares afterwards. 606 607 When I have enough validated blocks, I will call my parent's 608 process_blocks() method with a dictionary that maps shnum to blockdata. 609 If I am unable to provide enough blocks, I will call my parent's 610 fetch_failed() method with (self, f). After either of these events, I 611 will shut down and do no further work. My parent can also call my stop() 612 method to have me shut down early.""" 613 614 def __init__(self, node, segnum, k): 615 self._node = node # _Node 616 self.segnum = segnum 617 self._k = k 618 self._shares = {} # maps non-dead Share instance to a state, one of 619 # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT). 620 # State transition map is: 621 # AVAILABLE -(send-read)-> PENDING 622 # PENDING -(timer)-> OVERDUE 623 # PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM 624 # OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM 625 # If a share becomes DEAD, it is removed from the 626 # dict. If it becomes BADSEGNUM, the whole fetch is 627 # terminated. 
628 self._share_observers = {} # maps Share to Observer2 for active ones 629 self._shnums = DictOfSets() # maps shnum to the shares that provide it 630 self._blocks = {} # maps shnum to validated block data 631 self._no_more_shares = False 632 self._bad_segnum = False 633 self._last_failure = None 634 self._running = True 635 636 def stop(self): 637 self._cancel_all_requests() 638 self._running = False 639 del self._shares # let GC work # ??? XXX 640 641 642 # called by our parent _Node 643 644 def add_shares(self, shares): 645 # called when ShareFinder locates a new share, and when a non-initial 646 # segment fetch is started and we already know about shares from the 647 # previous segment 648 for s in shares: 649 self._shares[s] = AVAILABLE 650 self._shnums.add(s._shnum, s) 651 eventually(self.loop) 652 653 def no_more_shares(self): 654 # ShareFinder tells us it's reached the end of its list 655 self._no_more_shares = True 656 eventually(self.loop) 657 658 # internal methods 659 660 def _count_shnums(self, *states): 661 """shnums for which at least one state is in the following list""" 662 shnums = [] 663 for shnum,shares in self._shnums.iteritems(): 664 matches = [s for s in shares if self._shares[s] in states] 665 if matches: 666 shnums.append(shnum) 667 return len(shnums) 668 669 def loop(self): 670 try: 671 # if any exception occurs here, kill the download 672 self._do_loop() 673 except BaseException: 674 self._node.fetch_failed(self, Failure()) 675 raise 676 677 def _do_loop(self): 678 k = self._k 679 if not self._running: 680 return 681 if self._bad_segnum: 682 # oops, we were asking for a segment number beyond the end of the 683 # file. This is an error. 684 self.stop() 685 e = BadSegmentNumberError("%d > %d" % (self.segnum, 686 self._node.num_segments)) 687 f = Failure(e) 688 self._node.fetch_failed(self, f) 689 return 690 691 # are we done? 692 if self._count_shnums(COMPLETE) >= k: 693 # yay! 694 self.stop() 695 self._node.process_blocks(self.segnum, self._blocks) 696 return 697 698 # we may have exhausted everything 699 if (self._no_more_shares and 700 self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k): 701 # no more new shares are coming, and the remaining hopeful shares 702 # aren't going to be enough. boo! 703 self.stop() 704 format = ("ran out of shares: %(complete)d complete," 705 " %(pending)d pending, %(overdue)d overdue," 706 " %(unused)d unused, need %(k)d." 707 " Last failure: %(last_failure)s") 708 args = {"complete": self._count_shnums(COMPLETE), 709 "pending": self._count_shnums(PENDING), 710 "overdue": self._count_shnums(OVERDUE), 711 "unused": self._count_shnums(AVAILABLE), # should be zero 712 "k": k, 713 "last_failure": self._last_failure, 714 } 715 log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args) 716 e = NotEnoughSharesError(format % args) 717 f = Failure(e) 718 self._node.fetch_failed(self, f) 719 return 720 721 # nope, not done. Are we "block-hungry" (i.e. do we want to send out 722 # more read requests, or do we think we have enough in flight 723 # already?) 724 while self._count_shnums(PENDING, COMPLETE) < k: 725 # we're hungry.. are there any unused shares? 726 sent = self._send_new_request() 727 if not sent: 728 break 729 730 # ok, now are we "share-hungry" (i.e. do we have enough known shares 731 # to make us happy, or should we ask the ShareFinder to get us more?) 
732 if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k: 733 # we're hungry for more shares 734 self._node.want_more_shares() 735 # that will trigger the ShareFinder to keep looking 736 737 def _find_one(self, shares, state): 738 # TODO could choose fastest 739 for s in shares: 740 if self._shares[s] == state: 741 return s 742 raise IndexError("shouldn't get here") 743 744 def _send_new_request(self): 745 for shnum,shares in self._shnums.iteritems(): 746 states = [self._shares[s] for s in shares] 747 if COMPLETE in states or PENDING in states: 748 # don't send redundant requests 749 continue 750 if AVAILABLE not in states: 751 # no candidates for this shnum, move on 752 continue 753 # here's a candidate. Send a request. 754 s = self._find_one(shares, AVAILABLE) 755 self._shares[s] = PENDING 756 self._share_observers[s] = o = s.get_block(self.segnum) 757 o.subscribe(self._block_request_activity, share=s, shnum=shnum) 758 # TODO: build up a list of candidates, then walk through the 759 # list, sending requests to the most desireable servers, 760 # re-checking our block-hunger each time. For non-initial segment 761 # fetches, this would let us stick with faster servers. 762 return True 763 # nothing was sent: don't call us again until you have more shares to 764 # work with, or one of the existing shares has been declared OVERDUE 765 return False 766 767 def _cancel_all_requests(self): 768 for o in self._share_observers.values(): 769 o.cancel() 770 self._share_observers = {} 771 772 def _block_request_activity(self, share, shnum, state, block=None, f=None): 773 # called by Shares, in response to our s.send_request() calls. 774 # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. 775 if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM): 776 del self._share_observers[share] 777 if state is COMPLETE: 778 # 'block' is fully validated 779 self._shares[share] = COMPLETE 780 self._blocks[shnum] = block 781 elif state is OVERDUE: 782 self._shares[share] = OVERDUE 783 # OVERDUE is not terminal: it will eventually transition to 784 # COMPLETE, CORRUPT, or DEAD. 785 elif state is CORRUPT: 786 self._shares[share] = CORRUPT 787 elif state is DEAD: 788 del self._shares[share] 789 self._shnums[shnum].remove(share) 790 self._last_failure = f 791 elif state is BADSEGNUM: 792 self._shares[share] = BADSEGNUM # ??? 
793 self._bad_segnum = True 794 eventually(self.loop) 795 796 797 class RequestToken: 798 def __init__(self, peerid): 799 self.peerid = peerid 800 801 class ShareFinder: 802 def __init__(self, storage_broker, verifycap, node, 803 max_outstanding_requests=10): 804 self.running = True 805 self.verifycap = verifycap 806 s = storage_broker.get_servers_for_index(verifycap.storage_index) 807 self._servers = iter(s) 808 self.share_consumer = self.node = node 809 self.max_outstanding_requests = max_outstanding_requests 810 811 self._hungry = False 812 813 self._commonshares = {} # shnum to CommonShare instance 814 self.undelivered_shares = [] 815 self.pending_requests = set() 816 817 self._storage_index = verifycap.storage_index 818 self._si_prefix = base32.b2a_l(self._storage_index[:8], 60) 819 self._lp = log.msg(format="ShareFinder[si=%(si)s] starting", 820 si=self._si_prefix, level=log.NOISY, umid="2xjj2A") 821 822 def log(self, *args, **kwargs): 823 if "parent" not in kwargs: 824 kwargs["parent"] = self._lp 825 return log.msg(*args, **kwargs) 826 827 def stop(self): 828 self.running = False 829 830 # called by our parent CiphertextDownloader 831 def hungry(self): 832 log.msg(format="ShareFinder[si=%(si)s] hungry", 833 si=self._si_prefix, level=log.NOISY, umid="NywYaQ") 834 self._hungry = True 835 eventually(self.loop) 836 837 # internal methods 838 def loop(self): 839 undelivered_s = ",".join(["sh%d@%s" % 840 (s._shnum, idlib.shortnodeid_b2a(s._peerid)) 841 for s in self.undelivered_shares]) 842 pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid) 843 for rt in self.pending_requests]) # sort? 844 log.msg(format="ShareFinder[si=%(si)s] loop: running=%(running)s" 845 " hungry=%(hungry)s, undelivered=%(undelivered)s," 846 " pending=%(pending)s", 847 si=self._si_prefix, running=self.running, hungry=self._hungry, 848 undelivered=undelivered_s, pending=pending_s, 849 level=log.NOISY, umid="kRtS4Q") 850 if not self.running: 851 return 852 if not self._hungry: 853 return 854 if self.undelivered_shares: 855 sh = self.undelivered_shares.pop(0) 856 # they will call hungry() again if they want more 857 self._hungry = False 858 eventually(self.share_consumer.got_shares, [sh]) 859 return 860 if len(self.pending_requests) >= self.max_outstanding_requests: 861 # cannot send more requests, must wait for some to retire 862 return 863 864 server = None 865 try: 866 if self._servers: 867 server = self._servers.next() 868 except StopIteration: 869 self._servers = None 870 871 if server: 872 self.send_request(server) 873 return 874 875 if self.pending_requests: 876 # no server, but there are still requests in flight: maybe one of 877 # them will make progress 878 return 879 880 log.msg(format="ShareFinder.loop: no_more_shares", 881 level=log.UNUSUAL, umid="XjQlzg") 882 # we've run out of servers (so we can't send any more requests), and 883 # we have nothing in flight. No further progress can be made. They 884 # are destined to remain hungry. 
885 self.share_consumer.no_more_shares() 886 self.stop() 887 888 def send_request(self, server): 889 peerid, rref = server 890 req = RequestToken(peerid) 891 self.pending_requests.add(req) 892 lp = self.log(format="sending DYHB to [%(peerid)s]", 893 peerid=idlib.shortnodeid_b2a(peerid), 894 level=log.NOISY, umid="Io7pyg") 895 d = rref.callRemote("get_buckets", self._storage_index) 896 d.addBoth(incidentally, self.pending_requests.discard, req) 897 d.addCallbacks(self._got_response, self._got_error, 898 callbackArgs=(rref.version, peerid, req, lp), 899 errbackArgs=(peerid, req, lp)) 900 d.addErrback(log.err, format="error in send_request", 901 level=log.WEIRD, parent=lp, umid="rpdV0w") 902 d.addCallback(incidentally, eventually, self.loop) 903 904 def _got_response(self, buckets, server_version, peerid, req, lp): 905 if buckets: 906 shnums_s = ",".join([str(shnum) for shnum in buckets]) 907 self.log(format="got shnums [%(shnums)s] from [%(peerid)s]", 908 shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid), 909 level=log.NOISY, parent=lp, umid="0fcEZw") 910 else: 911 self.log(format="no shares from [%(peerid)s]", 912 peerid=idlib.shortnodeid_b2a(peerid), 913 level=log.NOISY, parent=lp, umid="U7d4JA") 914 if self.node.num_segments is None: 915 best_numsegs = self.node.guessed_num_segments 916 else: 917 best_numsegs = self.node.num_segments 918 for shnum, bucket in buckets.iteritems(): 919 if shnum in self._commonshares: 920 cs = self._commonshares[shnum] 921 else: 922 cs = CommonShare(best_numsegs, self._si_prefix, shnum) 923 # Share._get_satisfaction is responsible for updating 924 # CommonShare.set_numsegs after we know the UEB. Alternatives: 925 # 1: d = self.node.get_num_segments() 926 # d.addCallback(cs.got_numsegs) 927 # the problem is that the OneShotObserverList I was using 928 # inserts an eventual-send between _get_satisfaction's 929 # _satisfy_UEB and _satisfy_block_hash_tree, and the 930 # CommonShare didn't get the num_segs message before 931 # being asked to set block hash values. To resolve this 932 # would require an immediate ObserverList instead of 933 # an eventual-send -based one 934 # 2: break _get_satisfaction into Deferred-attached pieces. 935 # Yuck. 936 self._commonshares[shnum] = cs 937 s = Share(bucket, server_version, self.verifycap, cs, self.node, 938 peerid, shnum) 939 self.undelivered_shares.append(s) 940 941 def _got_error(self, f, peerid, req, lp): 942 self.log(format="got error from [%(peerid)s]", 943 peerid=idlib.shortnodeid_b2a(peerid), failure=f, 944 level=log.UNUSUAL, parent=lp, umid="zUKdCw") 945 946 947 948 class Segmentation: 949 """I am responsible for a single offset+size read of the file. I handle 950 segmentation: I figure out which segments are necessary, request them 951 (from my CiphertextDownloader) in order, and trim the segments down to 952 match the offset+size span. I use the Producer/Consumer interface to only 953 request one segment at a time. 954 """ 955 implements(IPushProducer) 956 def __init__(self, node, offset, size, consumer): 957 self._node = node 958 self._hungry = True 959 self._active_segnum = None 960 self._cancel_segment_request = None 961 # these are updated as we deliver data. 
At any given time, we still 962 # want to download file[offset:offset+size] 963 self._offset = offset 964 self._size = size 965 self._consumer = consumer 966 967 def start(self): 968 self._alive = True 969 self._deferred = defer.Deferred() 970 self._consumer.registerProducer(self, True) 971 self._maybe_fetch_next() 972 return self._deferred 973 974 def _maybe_fetch_next(self): 975 if not self._alive or not self._hungry: 976 return 977 if self._active_segnum is not None: 978 return 979 self._fetch_next() 980 981 def _fetch_next(self): 982 if self._size == 0: 983 # done! 984 self._alive = False 985 self._hungry = False 986 self._consumer.unregisterProducer() 987 self._deferred.callback(self._consumer) 988 return 989 n = self._node 990 have_actual_segment_size = n.segment_size is not None 991 segment_size = n.segment_size or n.guessed_segment_size 992 if self._offset == 0: 993 # great! we want segment0 for sure 994 wanted_segnum = 0 995 else: 996 # this might be a guess 997 wanted_segnum = self._offset // segment_size 998 self._active_segnum = wanted_segnum 999 d,c = n.get_segment(wanted_segnum) 1000 self._cancel_segment_request = c 1001 d.addBoth(self._request_retired) 1002 d.addCallback(self._got_segment, have_actual_segment_size) 1003 d.addErrback(self._retry_bad_segment, have_actual_segment_size) 1004 d.addErrback(self._error) 1005 1006 def _request_retired(self, res): 1007 self._active_segnum = None 1008 self._cancel_segment_request = None 1009 return res 1010 1011 def _got_segment(self, (segment_start,segment), had_actual_segment_size): 1012 segnum = self._active_segnum 1013 self._active_segnum = None 1014 self._cancel_segment_request = None 1015 # we got file[segment_start:segment_start+len(segment)] 1016 # we want file[self._offset:self._offset+self._size] 1017 o = overlap(segment_start, len(segment), self._offset, self._size) 1018 # the overlap is file[o[0]:o[0]+o[1]] 1019 if not o or o[0] != self._offset: 1020 # we didn't get the first byte, so we can't use this segment 1021 if self._node.segment_size is not None: 1022 # and we should have gotten it right. This is big problem. 1023 log.msg("Segmentation handed wrong data (but we knew better):" 1024 " want [%d-%d), given [%d-%d), for segnum=%d," 1025 " for si=%s" 1026 % (self._offset, self._offset+self._size, 1027 segment_start, segment_start+len(segment), 1028 segnum, self._node._si_prefix), 1029 level=log.WEIRD, umid="STlIiA") 1030 raise BadSegmentError("Despite knowing the segment size," 1031 " we were given the wrong data." 1032 " I cannot cope.") 1033 # we've wasted some bandwidth, but now we can grab the right one, 1034 # because we should know the segsize by now. 
1035 assert self._node.segment_size is not None 1036 self._maybe_fetch_next() 1037 return 1038 offset_in_segment = self._offset - segment_start 1039 desired_data = segment[offset_in_segment:offset_in_segment+o[1]] 1040 1041 self._offset += len(desired_data) 1042 self._size -= len(desired_data) 1043 self._consumer.write(desired_data) 1044 # the consumer might call our .pauseProducing() inside that write() 1045 # call, setting self._hungry=False 1046 self._maybe_fetch_next() 1047 1048 def _retry_bad_segment(self, f, had_actual_segment_size): 1049 f.trap(BadSegmentNumberError) # guessed way wrong, off the end 1050 if had_actual_segment_size: 1051 # but we should have known better, so this is a real error 1052 return f 1053 # we didn't know better: try again with more information 1054 return self._maybe_fetch_next() 1055 1056 def _error(self, f): 1057 self._alive = False 1058 self._hungry = False 1059 self._consumer.unregisterProducer() 1060 self._deferred.errback(f) 1061 1062 def stopProducing(self): 1063 self._hungry = False 1064 self._alive = False 1065 # cancel any outstanding segment request 1066 if self._cancel_segment_request: 1067 self._cancel_segment_request() 1068 self._cancel_segment_request = None 1069 def pauseProducing(self): 1070 self._hungry = False 1071 def resumeProducing(self): 1072 self._hungry = True 1073 eventually(self._maybe_fetch_next) 1074 1075 class Cancel: 1076 def __init__(self, f): 1077 self._f = f 1078 self.cancelled = False 1079 def cancel(self): 1080 if not self.cancelled: 1081 self.cancelled = True 1082 self._f(self) 1083 1084 class _Node: 1085 """Internal class which manages downloads and holds state. External 1086 callers use CiphertextFileNode instead.""" 1087 1088 # Share._node points to me 1089 def __init__(self, verifycap, storage_broker, secret_holder, 1090 terminator, history): 1091 assert isinstance(verifycap, uri.CHKFileVerifierURI) 1092 self._verifycap = verifycap 1093 self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60) 1094 self.running = True 1095 terminator.register(self) # calls self.stop() at stopService() 1096 # the rules are: 1097 # 1: Only send network requests if you're active (self.running is True) 1098 # 2: Use TimerService, not reactor.callLater 1099 # 3: You can do eventual-sends any time. 1100 # These rules should mean that once 1101 # stopService()+flushEventualQueue() fires, everything will be done. 
1102 self._secret_holder = secret_holder 1103 self._history = history 1104 1105 k, N = self._verifycap.needed_shares, self._verifycap.total_shares 1106 self.share_hash_tree = IncompleteHashTree(N) 1107 1108 # we guess the segment size, so Segmentation can pull non-initial 1109 # segments in a single roundtrip 1110 max_segment_size = 128*KiB # TODO: pull from elsewhere, maybe the 1111 # same place as upload.BaseUploadable 1112 s = mathutil.next_multiple(min(verifycap.size, max_segment_size), k) 1113 self.guessed_segment_size = s 1114 r = self._calculate_sizes(self.guessed_segment_size) 1115 self.guessed_num_segments = r["num_segments"] 1116 1117 # filled in when we parse a valid UEB 1118 self.have_UEB = False 1119 self.segment_size = None 1120 self.tail_segment_size = None 1121 self.tail_segment_padded = None 1122 self.num_segments = None 1123 self.block_size = None 1124 self.tail_block_size = None 1125 self.ciphertext_hash_tree = None # size depends on num_segments 1126 self.ciphertext_hash = None # flat hash, optional 1127 1128 # things to track callers that want data 1129 1130 # _segment_requests can have duplicates 1131 self._segment_requests = [] # (segnum, d, cancel_handle) 1132 self._active_segment = None # a SegmentFetcher, with .segnum 1133 1134 self._sharefinder = ShareFinder(storage_broker, verifycap, self) 1135 self._shares = set() 1136 1137 def stop(self): 1138 # called by the Terminator at shutdown, mostly for tests 1139 if self._active_segment: 1140 self._active_segment.stop() 1141 self._active_segment = None 1142 self._sharefinder.stop() 1143 1144 # things called by outside callers, via CiphertextFileNode. get_segment() 1145 # may also be called by Segmentation. 1146 1147 def read(self, consumer, offset=0, size=None): 1148 """I am the main entry point, from which FileNode.read() can get 1149 data. I feed the consumer with the desired range of ciphertext. I 1150 return a Deferred that fires (with the consumer) when the read is 1151 finished.""" 1152 # for concurrent operations: each gets its own Segmentation manager 1153 if size is None: 1154 size = self._verifycap.size - offset 1155 log.msg(format="imm Node(%(si)s.read(%(offset)d, %(size)d)", 1156 si=base32.b2a(self._verifycap.storage_index)[:8], 1157 offset=offset, size=size, 1158 level=log.OPERATIONAL, umid="l3j3Ww") 1159 s = Segmentation(self, offset, size, consumer) 1160 # this raises an interesting question: what segments to fetch? if 1161 # offset=0, always fetch the first segment, and then allow 1162 # Segmentation to be responsible for pulling the subsequent ones if 1163 # the first wasn't large enough. If offset>0, we're going to need an 1164 # extra roundtrip to get the UEB (and therefore the segment size) 1165 # before we can figure out which segment to get. TODO: allow the 1166 # offset-table-guessing code (which starts by guessing the segsize) 1167 # to assist the offset>0 process. 1168 d = s.start() 1169 return d 1170 1171 def get_segment(self, segnum): 1172 """Begin downloading a segment. I return a tuple (d, c): 'd' is a 1173 Deferred that fires with (offset,data) when the desired segment is 1174 available, and c is an object on which c.cancel() can be called to 1175 disavow interest in the segment (after which 'd' will never fire). 1176 1177 You probably need to know the segment size before calling this, 1178 unless you want the first few bytes of the file. If you ask for a 1179 segment number which turns out to be too large, the Deferred will 1180 errback with BadSegmentNumberError. 
1181 1182 The Deferred fires with the offset of the first byte of the data 1183 segment, so that you can call get_segment() before knowing the 1184 segment size, and still know which data you received. 1185 1186 The Deferred can also errback with other fatal problems, such as 1187 NotEnoughSharesError, NoSharesError, or BadCiphertextHashError. 1188 """ 1189 log.msg(format="imm Node(%(si)s.get_segment(%(segnum)d)", 1190 si=base32.b2a(self._verifycap.storage_index)[:8], 1191 segnum=segnum, level=log.OPERATIONAL, umid="UKFjDQ") 1192 d = defer.Deferred() 1193 c = Cancel(self._cancel_request) 1194 self._segment_requests.append( (segnum, d, c) ) 1195 self._start_new_segment() 1196 return (d, c) 1197 1198 # things called by the Segmentation object used to transform 1199 # arbitrary-sized read() calls into quantized segment fetches 1200 1201 def _start_new_segment(self): 1202 if self._active_segment is None and self._segment_requests: 1203 segnum = self._segment_requests[0][0] 1204 k = self._verifycap.needed_shares 1205 self._active_segment = fetcher = SegmentFetcher(self, segnum, k) 1206 active_shares = [s for s in self._shares if s.not_dead()] 1207 fetcher.add_shares(active_shares) # this triggers the loop 1208 1209 1210 # called by our child ShareFinder 1211 def got_shares(self, shares): 1212 self._shares.update(shares) 1213 if self._active_segment: 1214 self._active_segment.add_shares(shares) 1215 def no_more_shares(self): 1216 self._no_more_shares = True 1217 if self._active_segment: 1218 self._active_segment.no_more_shares() 1219 1220 # things called by our Share instances 1221 1222 def validate_and_store_UEB(self, UEB_s): 1223 log.msg("validate_and_store_UEB", 1224 level=log.OPERATIONAL, umid="7sTrPw") 1225 h = hashutil.uri_extension_hash(UEB_s) 1226 if h != self._verifycap.uri_extension_hash: 1227 raise hashutil.BadHashError 1228 UEB_dict = uri.unpack_extension(UEB_s) 1229 self._parse_and_store_UEB(UEB_dict) # sets self._stuff 1230 # TODO: a malformed (but authentic) UEB could throw an assertion in 1231 # _parse_and_store_UEB, and we should abandon the download. 1232 self.have_UEB = True 1233 1234 def _parse_and_store_UEB(self, d): 1235 # Note: the UEB contains needed_shares and total_shares. These are 1236 # redundant and inferior (the filecap contains the authoritative 1237 # values). However, because it is possible to encode the same file in 1238 # multiple ways, and the encoders might choose (poorly) to use the 1239 # same key for both (therefore getting the same SI), we might 1240 # encounter shares for both types. The UEB hashes will be different, 1241 # however, and we'll disregard the "other" encoding's shares as 1242 # corrupted. 1243 1244 # therefore, we ignore d['total_shares'] and d['needed_shares']. 1245 1246 log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s", 1247 ueb=repr(d), vcap=self._verifycap.to_string(), 1248 level=log.NOISY, umid="cVqZnA") 1249 1250 k, N = self._verifycap.needed_shares, self._verifycap.total_shares 1251 1252 self.segment_size = d['segment_size'] 1253 1254 r = self._calculate_sizes(self.segment_size) 1255 self.tail_segment_size = r["tail_segment_size"] 1256 self.tail_segment_padded = r["tail_segment_padded"] 1257 self.num_segments = r["num_segments"] 1258 self.block_size = r["block_size"] 1259 self.tail_block_size = r["tail_block_size"] 1260 log.msg("actual sizes: %s" % (r,), 1261 level=log.NOISY, umid="PY6P5Q") 1262 1263 # zfec.Decode() instantiation is fast, but still, let's use the same 1264 # codec instance for all but the last segment. 
3-of-10 takes 15us on 1265 # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is 1266 # 2.5ms, worst-case 254-of-255 is 9.3ms 1267 self._codec = CRSDecoder() 1268 self._codec.set_params(self.segment_size, k, N) 1269 1270 1271 # Ciphertext hash tree root is mandatory, so that there is at most 1272 # one ciphertext that matches this read-cap or verify-cap. The 1273 # integrity check on the shares is not sufficient to prevent the 1274 # original encoder from creating some shares of file A and other 1275 # shares of file B. 1276 self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments) 1277 self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']}) 1278 1279 self.share_hash_tree.set_hashes({0: d['share_root_hash']}) 1280 1281 # crypttext_hash is optional. We only pull this from the first UEB 1282 # that we see. 1283 if 'crypttext_hash' in d: 1284 if len(d["crypttext_hash"]) == hashutil.CRYPTO_VAL_SIZE: 1285 self.ciphertext_hash = d['crypttext_hash'] 1286 else: 1287 log.msg("ignoring bad-length UEB[crypttext_hash], " 1288 "got %d bytes, want %d" % (len(d['crypttext_hash']), 1289 hashutil.CRYPTO_VAL_SIZE), 1290 umid="oZkGLA", level=log.WEIRD) 1291 1292 # Our job is a fast download, not verification, so we ignore any 1293 # redundant fields. The Verifier uses a different code path which 1294 # does not ignore them. 1295 1296 def _calculate_sizes(self, segment_size): 1297 # segments of ciphertext 1298 size = self._verifycap.size 1299 k = self._verifycap.needed_shares 1300 1301 # this assert matches the one in encode.py:127 inside 1302 # Encoded._got_all_encoding_parameters, where the UEB is constructed 1303 assert segment_size % k == 0 1304 1305 # the last segment is usually short. We don't store a whole segsize, 1306 # but we do pad the segment up to a multiple of k, because the 1307 # encoder requires that. 1308 tail_segment_size = size % segment_size 1309 if tail_segment_size == 0: 1310 tail_segment_size = segment_size 1311 padded = mathutil.next_multiple(tail_segment_size, k) 1312 tail_segment_padded = padded 1313 1314 num_segments = mathutil.div_ceil(size, segment_size) 1315 1316 # each segment is turned into N blocks. 
All but the last are of size 1317 # block_size, and the last is of size tail_block_size 1318 block_size = segment_size / k 1319 tail_block_size = tail_segment_padded / k 1320 1321 return { "tail_segment_size": tail_segment_size, 1322 "tail_segment_padded": tail_segment_padded, 1323 "num_segments": num_segments, 1324 "block_size": block_size, 1325 "tail_block_size": tail_block_size, 1326 } 1327 1328 1329 def process_share_hashes(self, share_hashes): 1330 self.share_hash_tree.set_hashes(share_hashes) 1331 1332 # called by our child SegmentFetcher 1333 1334 def want_more_shares(self): 1335 self._sharefinder.hungry() 1336 1337 def fetch_failed(self, sf, f): 1338 assert sf is self._active_segment 1339 self._active_segment = None 1340 # deliver error upwards 1341 for (d,c) in self._extract_requests(sf.segnum): 1342 eventually(self._deliver, d, c, f) 1343 1344 def process_blocks(self, segnum, blocks): 1345 d = defer.maybeDeferred(self._decode_blocks, segnum, blocks) 1346 d.addCallback(self._check_ciphertext_hash, segnum) 1347 def _deliver(result): 1348 for (d,c) in self._extract_requests(segnum): 1349 eventually(self._deliver, d, c, result) 1350 self._active_segment = None 1351 self._start_new_segment() 1352 d.addBoth(_deliver) 1353 d.addErrback(lambda f: 1354 log.err(format="unhandled error during process_blocks", 1355 failure=f, level=log.WEIRD, umid="MkEsCg")) 1356 1357 def _decode_blocks(self, segnum, blocks): 1358 tail = (segnum == self.num_segments-1) 1359 codec = self._codec 1360 block_size = self.block_size 1361 decoded_size = self.segment_size 1362 if tail: 1363 # account for the padding in the last segment 1364 codec = CRSDecoder() 1365 k, N = self._verifycap.needed_shares, self._verifycap.total_shares 1366 codec.set_params(self.tail_segment_padded, k, N) 1367 block_size = self.tail_block_size 1368 decoded_size = self.tail_segment_padded 1369 1370 shares = [] 1371 shareids = [] 1372 for (shareid, share) in blocks.iteritems(): 1373 assert len(share) == block_size 1374 shareids.append(shareid) 1375 shares.append(share) 1376 del blocks 1377 1378 d = codec.decode(shares, shareids) # segment 1379 del shares 1380 def _process(buffers): 1381 segment = "".join(buffers) 1382 assert len(segment) == decoded_size 1383 del buffers 1384 if tail: 1385 segment = segment[:self.tail_segment_size] 1386 return segment 1387 d.addCallback(_process) 1388 return d 1389 1390 def _check_ciphertext_hash(self, segment, segnum): 1391 assert self._active_segment.segnum == segnum 1392 assert self.segment_size is not None 1393 offset = segnum * self.segment_size 1394 1395 h = hashutil.crypttext_segment_hash(segment) 1396 try: 1397 self.ciphertext_hash_tree.set_hashes(leaves={segnum: h}) 1398 return (offset, segment) 1399 except (BadHashError, NotEnoughHashesError): 1400 format = ("hash failure in ciphertext_hash_tree:" 1401 " segnum=%(segnum)d, SI=%(si)s") 1402 log.msg(format=format, segnum=segnum, si=self._si_prefix, 1403 failure=Failure(), level=log.WEIRD, umid="MTwNnw") 1404 # this is especially weird, because we made it past the share 1405 # hash tree. It implies that we're using the wrong encoding, or 1406 # that the uploader deliberately constructed a bad UEB. 
1407 msg = format % {"segnum": segnum, "si": self._si_prefix} 1408 raise BadCiphertextHashError(msg) 1409 1410 def _deliver(self, d, c, result): 1411 # this method exists to handle cancel() that occurs between 1412 # _got_segment and _deliver 1413 if not c.cancelled: 1414 d.callback(result) # might actually be an errback 1415 1416 def _extract_requests(self, segnum): 1417 """Remove matching requests and return their (d,c) tuples so that the 1418 caller can retire them.""" 1419 retire = [(d,c) for (segnum0, d, c) in self._segment_requests 1420 if segnum0 == segnum] 1421 self._segment_requests = [t for t in self._segment_requests 1422 if t[0] != segnum] 1423 return retire 1424 1425 def _cancel_request(self, c): 1426 self._segment_requests = [t for t in self._segment_requests 1427 if t[2] != c] 1428 segnums = [segnum for (segnum,d,c) in self._segment_requests] 1429 if self._active_segment.segnum not in segnums: 1430 self._active_segment.stop() 1431 self._active_segment = None 1432 self._start_new_segment() 1433 1434 class CiphertextFileNode: 1435 def __init__(self, verifycap, storage_broker, secret_holder, 1436 terminator, history): 1437 assert isinstance(verifycap, uri.CHKFileVerifierURI) 1438 self._node = _Node(verifycap, storage_broker, secret_holder, 1439 terminator, history) 1440 1441 def read(self, consumer, offset=0, size=None): 1442 """I am the main entry point, from which FileNode.read() can get 1443 data. I feed the consumer with the desired range of ciphertext. I 1444 return a Deferred that fires (with the consumer) when the read is 1445 finished.""" 1446 return self._node.read(consumer, offset, size) 1447 1448 def get_segment(self, segnum): 1449 """Begin downloading a segment. I return a tuple (d, c): 'd' is a 1450 Deferred that fires with (offset,data) when the desired segment is 1451 available, and c is an object on which c.cancel() can be called to 1452 disavow interest in the segment (after which 'd' will never fire). 1453 1454 You probably need to know the segment size before calling this, 1455 unless you want the first few bytes of the file. If you ask for a 1456 segment number which turns out to be too large, the Deferred will 1457 errback with BadSegmentNumberError. 1458 1459 The Deferred fires with the offset of the first byte of the data 1460 segment, so that you can call get_segment() before knowing the 1461 segment size, and still know which data you received. 1462 """ 1463 return self._node.get_segment(segnum) 1464 1465 def raise_error(self): 1466 pass 1467 1468 1469 class DecryptingConsumer: 1470 """I sit between a CiphertextDownloader (which acts as a Producer) and 1471 the real Consumer, decrypting everything that passes by. The real 1472 Consumer sees the real Producer, but the Producer sees us instead of the 1473 real consumer.""" 1474 implements(IConsumer) 1475 1476 def __init__(self, consumer, readkey, offset): 1477 self._consumer = consumer 1478 # TODO: pycryptopp CTR-mode needs random-access operations: I want 1479 # either a=AES(readkey, offset) or better yet both of: 1480 # a=AES(readkey, offset=0) 1481 # a.process(ciphertext, offset=xyz) 1482 # For now, we fake it with the existing iv= argument. 1483 offset_big = offset // 16 1484 offset_small = offset % 16 1485 iv = binascii.unhexlify("%032x" % offset_big) 1486 self._decryptor = AES(readkey, iv=iv) 1487 self._decryptor.process("\x00"*offset_small) 1488 1489 def registerProducer(self, producer, streaming): 1490 # this passes through, so the real consumer can flow-control the real 1491 # producer. 
Therefore we don't need to provide any IPushProducer 1492 # methods. We implement all the IConsumer methods as pass-throughs, 1493 # and only intercept write() to perform decryption. 1494 self._consumer.registerProducer(producer, streaming) 1495 def unregisterProducer(self): 1496 self._consumer.unregisterProducer() 1497 def write(self, ciphertext): 1498 plaintext = self._decryptor.process(ciphertext) 1499 self._consumer.write(plaintext) 1500 1501 class ImmutableFileNode: 1502 implements(IImmutableFileNode) 1503 1504 # I wrap a CiphertextFileNode with a decryption key 1505 def __init__(self, filecap, storage_broker, secret_holder, terminator, 1506 history): 1507 assert isinstance(filecap, uri.CHKFileURI) 1508 verifycap = filecap.get_verify_cap() 1509 self._cnode = CiphertextFileNode(verifycap, storage_broker, 1510 secret_holder, terminator, history) 1511 assert isinstance(filecap, uri.CHKFileURI) 1512 self.u = filecap 1513 self._readkey = filecap.key 1514 1515 def read(self, consumer, offset=0, size=None): 1516 decryptor = DecryptingConsumer(consumer, self._readkey, offset) 1517 d = self._cnode.read(decryptor, offset, size) 1518 d.addCallback(lambda dc: consumer) 1519 return d 1520 1521 def raise_error(self): 1522 pass 1523 1524 def get_write_uri(self): 1525 return None 1526 1527 def get_readonly_uri(self): 1528 return self.get_uri() 1529 1530 def get_uri(self): 1531 return self.u.to_string() 1532 def get_cap(self): 1533 return self.u 1534 def get_readcap(self): 1535 return self.u.get_readonly() 1536 def get_verify_cap(self): 1537 return self.u.get_verify_cap() 1538 def get_repair_cap(self): 1539 # CHK files can be repaired with just the verifycap 1540 return self.u.get_verify_cap() 1541 1542 def get_storage_index(self): 1543 return self.u.get_storage_index() 1544 1545 def get_size(self): 1546 return self.u.get_size() 1547 def get_current_size(self): 1548 return defer.succeed(self.get_size()) 1549 1550 def is_mutable(self): 1551 return False 1552 1553 def is_readonly(self): 1554 return True 1555 1556 def is_unknown(self): 1557 return False 1558 1559 def is_allowed_in_immutable_directory(self): 1560 return True 1561 1562 1563 # TODO: if server1 has all shares, and server2-10 have one each, make the 1564 # loop stall slightly before requesting all shares from the first server, to 1565 # give it a chance to learn about the other shares and get some diversity. 1566 # Or, don't bother, let the first block all come from one server, and take 1567 # comfort in the fact that we'll learn about the other servers by the time we 1568 # fetch the second block. 1569 # 1570 # davidsarah points out that we could use sequential (instead of parallel) 1571 # fetching of multiple block from a single server: by the time the first 1572 # block arrives, we'll hopefully have heard about other shares. This would 1573 # induce some RTT delays (i.e. lose pipelining) in the case that this server 1574 # has the only shares, but that seems tolerable. We could rig it to only use 1575 # sequential requests on the first segment. 1576 1577 # as a query gets later, we're more willing to duplicate work. 1578 1579 # should change server read protocol to allow small shares to be fetched in a 1580 # single RTT. Instead of get_buckets-then-read, just use read(shnums, readv), 1581 # where shnums=[] means all shares, and the return value is a dict of 1582 # # shnum->ta (like with mutable files). The DYHB query should also fetch the 1583 # offset table, since everything else can be located once we have that. 
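A minimal sketch (not part of the patch) of the CTR-mode seek that DecryptingConsumer performs with the iv= argument: the counter block is set to offset//16 and offset%16 keystream bytes are discarded, so decryption can start in the middle of the file. decryptor_at and the sample key/plaintext are invented names for illustration.

import binascii
from pycryptopp.cipher.aes import AES

def decryptor_at(readkey, offset):
    # hypothetical helper, mirroring DecryptingConsumer.__init__ above:
    # start the CTR counter at offset//16, then burn offset%16 keystream
    # bytes to land in the middle of that block
    iv = binascii.unhexlify("%032x" % (offset // 16))
    dec = AES(readkey, iv=iv)
    dec.process("\x00" * (offset % 16))
    return dec

# sanity check: decrypting a suffix this way matches the tail of a full
# decryption (in CTR mode, encryption and decryption are the same operation)
key = "k" * 16
plaintext = "x" * 100
ciphertext = AES(key, iv="\x00"*16).process(plaintext)
offset = 37
assert decryptor_at(key, offset).process(ciphertext[offset:]) == plaintext[offset:]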
1584 1585 1586 # ImmutableFileNode 1587 # DecryptingConsumer 1588 # CiphertextFileNode 1589 # Segmentation 1590 # ShareFinder 1591 # SegmentFetcher[segnum] (one at a time) 1592 # CommonShare[shnum] 1593 # Share[shnum,server] 1594 1595 # TODO: when we learn numsegs, any get_segment() calls for bad blocknumbers 1596 # should be failed with BadSegmentNumberError. But should this be the 1597 # responsibility of CiphertextFileNode, or SegmentFetcher? The knowledge will 1598 # first appear when a Share receives a valid UEB and calls 1599 # CiphertextFileNode.validate_UEB, then _parse_UEB. The SegmentFetcher is 1600 # expecting to hear from the Share, via the _block_request_activity observer. 1601 1602 # make it the responsibility of the SegmentFetcher. Each Share that gets a 1603 # valid UEB will tell the SegmentFetcher BADSEGNUM (instead of COMPLETE or 1604 # CORRUPT). The SegmentFetcher it then responsible for shutting down, and 1605 # informing its parent (the CiphertextFileNode) of the BadSegmentNumberError, 1606 # which is then passed to the client of get_segment(). 1607 1608 1609 # TODO: if offset table is corrupt, attacker could cause us to fetch whole 1610 # (large) share 1611 1612 # log budget: when downloading at 1MBps (i.e. 8 segments-per-second), 10 1613 # log.OPERATIONAL per second, 100 log.NOISY per second. With k=3, that's 3 1614 # log.NOISY per block fetch. -
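For reference, a small worked example (illustration only, numbers invented) of the segment and block sizing derived above; it assumes segment_size is already a multiple of k, which the block_size = segment_size / k division requires.

def sizes(file_size, segment_size, k):
    # div_ceil / next_multiple written out inline to stay self-contained
    num_segments = (file_size + segment_size - 1) // segment_size
    tail_segment_size = file_size % segment_size or segment_size
    pad = (k - tail_segment_size % k) % k
    tail_segment_padded = tail_segment_size + pad
    return {"num_segments": num_segments,
            "tail_segment_size": tail_segment_size,
            "tail_segment_padded": tail_segment_padded,
            "block_size": segment_size // k,
            "tail_block_size": tail_segment_padded // k}

# a 50-byte file with segment_size=12 and k=3: five segments, the last one
# holding 2 real bytes padded to 3 for the erasure code, so regular blocks
# are 4 bytes and tail blocks are 1 byte
assert sizes(50, 12, 3) == {"num_segments": 5,
                            "tail_segment_size": 2,
                            "tail_segment_padded": 3,
                            "block_size": 4,
                            "tail_block_size": 1}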
new file src/allmydata/immutable/download2_off.py
diff --git a/src/allmydata/immutable/download2_off.py b/src/allmydata/immutable/download2_off.py new file mode 100755 index 0000000..d2b8b99
- + 1 #! /usr/bin/python 2 3 # known (shnum,Server) pairs are sorted into a list according to 4 # desireability. This sort is picking a winding path through a matrix of 5 # [shnum][server]. The goal is to get diversity of both shnum and server. 6 7 # The initial order is: 8 # find the lowest shnum on the first server, add it 9 # look at the next server, find the lowest shnum that we don't already have 10 # if any 11 # next server, etc, until all known servers are checked 12 # now look at servers that we skipped (because ... 13 14 # Keep track of which block requests are outstanding by (shnum,Server). Don't 15 # bother prioritizing "validated" shares: the overhead to pull the share hash 16 # chain is tiny (4 hashes = 128 bytes), and the overhead to pull a new block 17 # hash chain is also tiny (1GB file, 8192 segments of 128KiB each, 13 hashes, 18 # 832 bytes). Each time a block request is sent, also request any necessary 19 # hashes. Don't bother with a "ValidatedShare" class (as distinct from some 20 # other sort of Share). Don't bother avoiding duplicate hash-chain requests. 21 22 # For each outstanding segread, walk the list and send requests (skipping 23 # outstanding shnums) until requests for k distinct shnums are in flight. If 24 # we can't do that, ask for more. If we get impatient on a request, find the 25 # first non-outstanding 26 27 # start with the first Share in the list, and send a request. Then look at 28 # the next one. If we already have a pending request for the same shnum or 29 # server, push that Share down onto the fallback list and try the next one, 30 # etc. If we run out of non-fallback shares, use the fallback ones, 31 # preferring shnums that we don't have outstanding requests for (i.e. assume 32 # that all requests will complete). Do this by having a second fallback list. 33 34 # hell, I'm reviving the Herder. But remember, we're still talking 3 objects 35 # per file, not thousands. 36 37 # actually, don't bother sorting the initial list. Append Shares as the 38 # responses come back, that will put the fastest servers at the front of the 39 # list, and give a tiny preference to servers that are earlier in the 40 # permuted order. 41 42 # more ideas: 43 # sort shares by: 44 # 1: number of roundtrips needed to get some data 45 # 2: share number 46 # 3: ms of RTT delay 47 # maybe measure average time-to-completion of requests, compare completion 48 # time against that, much larger indicates congestion on the server side 49 # or the server's upstream speed is less than our downstream. Minimum 50 # time-to-completion indicates min(our-downstream,their-upstream). Could 51 # fetch shares one-at-a-time to measure that better. 52 53 # when should we risk duplicate work and send a new request? 54 55 def walk(self): 56 shares = sorted(list) 57 oldshares = copy(shares) 58 outstanding = list() 59 fallbacks = list() 60 second_fallbacks = list() 61 while len(outstanding.nonlate.shnums) < k: # need more requests 62 while oldshares: 63 s = shares.pop(0) 64 if s.server in outstanding.servers or s.shnum in outstanding.shnums: 65 fallbacks.append(s) 66 continue 67 outstanding.append(s) 68 send_request(s) 69 break #'while need_more_requests' 70 # must use fallback list. Ask for more servers while we're at it. 
71 ask_for_more_servers() 72 while fallbacks: 73 s = fallbacks.pop(0) 74 if s.shnum in outstanding.shnums: 75 # assume that the outstanding requests will complete, but 76 # send new requests for other shnums to existing servers 77 second_fallbacks.append(s) 78 continue 79 outstanding.append(s) 80 send_request(s) 81 break #'while need_more_requests' 82 # if we get here, we're being forced to send out multiple queries per 83 # share. We've already asked for more servers, which might help. If 84 # there are no late outstanding queries, then duplicate shares won't 85 # help. Don't send queries for duplicate shares until some of the 86 # queries are late. 87 if outstanding.late: 88 # we're allowed to try any non-outstanding share 89 while second_fallbacks: 90 pass 91 newshares = outstanding + fallbacks + second_fallbacks + oldshares 92 93 94 class Server: 95 """I represent an abstract Storage Server. One day, the StorageBroker 96 will return instances of me. For now, the StorageBroker returns (peerid, 97 RemoteReference) tuples, and this code wraps a Server instance around 98 them. 99 """ 100 def __init__(self, peerid, ss): 101 self.peerid = peerid 102 self.remote = ss 103 self._remote_buckets = {} # maps shnum to RIBucketReader 104 # TODO: release the bucket references on shares that we no longer 105 # want. OTOH, why would we not want them? Corruption? 106 107 def send_query(self, storage_index): 108 """I return a Deferred that fires with a set of shnums. If the server 109 had shares available, I will retain the RemoteReferences to its 110 buckets, so that get_data(shnum, range) can be called later.""" 111 d = self.remote.callRemote("get_buckets", self.storage_index) 112 d.addCallback(self._got_response) 113 return d 114 115 def _got_response(self, r): 116 self._remote_buckets = r 117 return set(r.keys()) 118 119 class ShareOnAServer: 120 """I represent one instance of a share, known to live on a specific 121 server. I am created every time a server responds affirmatively to a 122 do-you-have-block query.""" 123 124 def __init__(self, shnum, server): 125 self._shnum = shnum 126 self._server = server 127 self._block_hash_tree = None 128 129 def cost(self, segnum): 130 """I return a tuple of (roundtrips, bytes, rtt), indicating how 131 expensive I think it would be to fetch the given segment. Roundtrips 132 indicates how many roundtrips it is likely to take (one to get the 133 data and hashes, plus one to get the offset table and UEB if this is 134 the first segment we've ever fetched). 'bytes' is how many bytes we 135 must fetch (estimated). 'rtt' is estimated round-trip time (float) in 136 seconds for a trivial request. The downloading algorithm will compare 137 costs to decide which shares should be used.""" 138 # the most significant factor here is roundtrips: a Share for which 139 # we already have the offset table is better to than a brand new one 140 141 def max_bandwidth(self): 142 """Return a float, indicating the highest plausible bytes-per-second 143 that I've observed coming from this share. This will be based upon 144 the minimum (bytes-per-fetch / time-per-fetch) ever observed. This 145 can we used to estimate the server's upstream bandwidth. 
Clearly this 146 is only accurate if a share is retrieved with no contention for 147 either the upstream, downstream, or middle of the connection, but it 148 may still serve as a useful metric for deciding which servers to pull 149 from.""" 150 151 def get_segment(self, segnum): 152 """I return a Deferred that will fire with the segment data, or 153 errback.""" 154 155 class NativeShareOnAServer(ShareOnAServer): 156 """For tahoe native (foolscap) servers, I contain a RemoteReference to 157 the RIBucketReader instance.""" 158 def __init__(self, shnum, server, rref): 159 ShareOnAServer.__init__(self, shnum, server) 160 self._rref = rref # RIBucketReader 161 162 class Share: 163 def __init__(self, shnum): 164 self._shnum = shnum 165 # _servers are the Server instances which appear to hold a copy of 166 # this share. It is populated when the ValidShare is first created, 167 # or when we receive a get_buckets() response for a shnum that 168 # already has a ValidShare instance. When we lose the connection to a 169 # server, we remove it. 170 self._servers = set() 171 # offsets, UEB, and share_hash_tree all live in the parent. 172 # block_hash_tree lives here. 173 self._block_hash_tree = None 174 175 self._want 176 177 def get_servers(self): 178 return self._servers 179 180 181 def get_block(self, segnum): 182 # read enough data to obtain a single validated block 183 if not self.have_offsets: 184 # we get the offsets in their own read, since they tell us where 185 # everything else lives. We must fetch offsets for each share 186 # separately, since they aren't directly covered by the UEB. 187 pass 188 if not self.parent.have_ueb: 189 # use _guessed_segsize to make a guess about the layout, so we 190 # can fetch both the offset table and the UEB in the same read. 191 # This also requires making a guess about the presence or absence 192 # of the plaintext_hash_tree. Oh, and also the version number. Oh 193 # well. 194 pass 195 196 class CiphertextDownloader: 197 """I manage all downloads for a single file. I operate a state machine 198 with input events that are local read() requests, responses to my remote 199 'get_bucket' and 'read_bucket' messages, and connection establishment and 200 loss. My outbound events are connection establishment requests and bucket 201 read requests messages. 
202 """ 203 # eventually this will merge into the FileNode 204 ServerClass = Server # for tests to override 205 206 def __init__(self, storage_index, ueb_hash, size, k, N, storage_broker, 207 shutdowner): 208 # values we get from the filecap 209 self._storage_index = si = storage_index 210 self._ueb_hash = ueb_hash 211 self._size = size 212 self._needed_shares = k 213 self._total_shares = N 214 self._share_hash_tree = IncompleteHashTree(self._total_shares) 215 # values we discover when we first fetch the UEB 216 self._ueb = None # is dict after UEB fetch+validate 217 self._segsize = None 218 self._numsegs = None 219 self._blocksize = None 220 self._tail_segsize = None 221 self._ciphertext_hash = None # optional 222 # structures we create when we fetch the UEB, then continue to fill 223 # as we download the file 224 self._share_hash_tree = None # is IncompleteHashTree after UEB fetch 225 self._ciphertext_hash_tree = None 226 227 # values we learn as we download the file 228 self._offsets = {} # (shnum,Server) to offset table (dict) 229 self._block_hash_tree = {} # shnum to IncompleteHashTree 230 # other things which help us 231 self._guessed_segsize = min(128*1024, size) 232 self._active_share_readers = {} # maps shnum to Reader instance 233 self._share_readers = [] # sorted by preference, best first 234 self._readers = set() # set of Reader instances 235 self._recent_horizon = 10 # seconds 236 237 # 'shutdowner' is a MultiService parent used to cancel all downloads 238 # when the node is shutting down, to let tests have a clean reactor. 239 240 self._init_available_servers() 241 self._init_find_enough_shares() 242 243 # _available_servers is an iterator that provides us with Server 244 # instances. Each time we pull out a Server, we immediately send it a 245 # query, so we don't need to keep track of who we've sent queries to. 246 247 def _init_available_servers(self): 248 self._available_servers = self._get_available_servers() 249 self._no_more_available_servers = False 250 251 def _get_available_servers(self): 252 """I am a generator of servers to use, sorted by the order in which 253 we should query them. I make sure there are no duplicates in this 254 list.""" 255 # TODO: make StorageBroker responsible for this non-duplication, and 256 # replace this method with a simple iter(get_servers_for_index()), 257 # plus a self._no_more_available_servers=True 258 seen = set() 259 sb = self._storage_broker 260 for (peerid, ss) in sb.get_servers_for_index(self._storage_index): 261 if peerid not in seen: 262 yield self.ServerClass(peerid, ss) # Server(peerid, ss) 263 seen.add(peerid) 264 self._no_more_available_servers = True 265 266 # this block of code is responsible for having enough non-problematic 267 # distinct shares/servers available and ready for download, and for 268 # limiting the number of queries that are outstanding. The idea is that 269 # we'll use the k fastest/best shares, and have the other ones in reserve 270 # in case those servers stop responding or respond too slowly. We keep 271 # track of all known shares, but we also keep track of problematic shares 272 # (ones with hash failures or lost connections), so we can put them at 273 # the bottom of the list. 274 275 def _init_find_enough_shares(self): 276 # _unvalidated_sharemap maps shnum to set of Servers, and remembers 277 # where viable (but not yet validated) shares are located. Each 278 # get_bucket() response adds to this map, each act of validation 279 # removes from it. 
280 self._sharemap = DictOfSets() 281 282 # _sharemap maps shnum to set of Servers, and remembers where viable 283 # shares are located. Each get_bucket() response adds to this map, 284 # each hash failure or disconnect removes from it. (TODO: if we 285 # disconnect but reconnect later, we should be allowed to re-query). 286 self._sharemap = DictOfSets() 287 288 # _problem_shares is a set of (shnum, Server) tuples, and 289 290 # _queries_in_flight maps a Server to a timestamp, which remembers 291 # which servers we've sent queries to (and when) but have not yet 292 # heard a response. This lets us put a limit on the number of 293 # outstanding queries, to limit the size of the work window (how much 294 # extra work we ask servers to do in the hopes of keeping our own 295 # pipeline filled). We remove a Server from _queries_in_flight when 296 # we get an answer/error or we finally give up. If we ever switch to 297 # a non-connection-oriented protocol (like UDP, or forwarded Chord 298 # queries), we can use this information to retransmit any query that 299 # has gone unanswered for too long. 300 self._queries_in_flight = dict() 301 302 def _count_recent_queries_in_flight(self): 303 now = time.time() 304 recent = now - self._recent_horizon 305 return len([s for (s,when) in self._queries_in_flight.items() 306 if when > recent]) 307 308 def _find_enough_shares(self): 309 # goal: have 2*k distinct not-invalid shares available for reading, 310 # from 2*k distinct servers. Do not have more than 4*k "recent" 311 # queries in flight at a time. 312 if (len(self._sharemap) >= 2*self._needed_shares 313 and len(self._sharemap.values) >= 2*self._needed_shares): 314 return 315 num = self._count_recent_queries_in_flight() 316 while num < 4*self._needed_shares: 317 try: 318 s = self._available_servers.next() 319 except StopIteration: 320 return # no more progress can be made 321 self._queries_in_flight[s] = time.time() 322 d = s.send_query(self._storage_index) 323 d.addBoth(incidentally, self._queries_in_flight.discard, s) 324 d.addCallbacks(lambda shnums: [self._sharemap.add(shnum, s) 325 for shnum in shnums], 326 lambda f: self._query_error(f, s)) 327 d.addErrback(self._error) 328 d.addCallback(self._reschedule) 329 num += 1 330 331 def _query_error(self, f, s): 332 # a server returned an error, log it gently and ignore 333 level = log.WEIRD 334 if f.check(DeadReferenceError): 335 level = log.UNUSUAL 336 log.msg("Error during get_buckets to server=%(server)s", server=str(s), 337 failure=f, level=level, umid="3uuBUQ") 338 339 # this block is responsible for turning known shares into usable shares, 340 # by fetching enough data to validate their contents. 341 342 # UEB (from any share) 343 # share hash chain, validated (from any share, for given shnum) 344 # block hash (any share, given shnum) 345 346 def _got_ueb(self, ueb_data, share): 347 if self._ueb is not None: 348 return 349 if hashutil.uri_extension_hash(ueb_data) != self._ueb_hash: 350 share.error("UEB hash does not match") 351 return 352 d = uri.unpack_extension(ueb_data) 353 self.share_size = mathutil.div_ceil(self._size, self._needed_shares) 354 355 356 # There are several kinds of things that can be found in a UEB. 357 # First, things that we really need to learn from the UEB in order to 358 # do this download. Next: things which are optional but not redundant 359 # -- if they are present in the UEB they will get used. Next, things 360 # that are optional and redundant. 
These things are required to be 361 # consistent: they don't have to be in the UEB, but if they are in 362 # the UEB then they will be checked for consistency with the 363 # already-known facts, and if they are inconsistent then an exception 364 # will be raised. These things aren't actually used -- they are just 365 # tested for consistency and ignored. Finally: things which are 366 # deprecated -- they ought not be in the UEB at all, and if they are 367 # present then a warning will be logged but they are otherwise 368 # ignored. 369 370 # First, things that we really need to learn from the UEB: 371 # segment_size, crypttext_root_hash, and share_root_hash. 372 self._segsize = d['segment_size'] 373 374 self._blocksize = mathutil.div_ceil(self._segsize, self._needed_shares) 375 self._numsegs = mathutil.div_ceil(self._size, self._segsize) 376 377 self._tail_segsize = self._size % self._segsize 378 if self._tail_segsize == 0: 379 self._tail_segsize = self._segsize 380 # padding for erasure code 381 self._tail_segsize = mathutil.next_multiple(self._tail_segsize, 382 self._needed_shares) 383 384 # Ciphertext hash tree root is mandatory, so that there is at most 385 # one ciphertext that matches this read-cap or verify-cap. The 386 # integrity check on the shares is not sufficient to prevent the 387 # original encoder from creating some shares of file A and other 388 # shares of file B. 389 self._ciphertext_hash_tree = IncompleteHashTree(self._numsegs) 390 self._ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']}) 391 392 self._share_hash_tree.set_hashes({0: d['share_root_hash']}) 393 394 395 # Next: things that are optional and not redundant: crypttext_hash 396 if 'crypttext_hash' in d: 397 if len(self._ciphertext_hash) == hashutil.CRYPTO_VAL_SIZE: 398 self._ciphertext_hash = d['crypttext_hash'] 399 else: 400 log.msg("ignoring bad-length UEB[crypttext_hash], " 401 "got %d bytes, want %d" % (len(d['crypttext_hash']), 402 hashutil.CRYPTO_VAL_SIZE), 403 umid="oZkGLA", level=log.WEIRD) 404 405 # we ignore all of the redundant fields when downloading. The 406 # Verifier uses a different code path which does not ignore them. 407 408 # finally, set self._ueb as a marker that we don't need to request it 409 # anymore 410 self._ueb = d 411 412 def _got_share_hashes(self, hashes, share): 413 assert isinstance(hashes, dict) 414 try: 415 self._share_hash_tree.set_hashes(hashes) 416 except (IndexError, BadHashError, NotEnoughHashesError), le: 417 share.error("Bad or missing hashes") 418 return 419 420 #def _got_block_hashes( 421 422 def _init_validate_enough_shares(self): 423 # _valid_shares maps shnum to ValidatedShare instances, and is 424 # populated once the block hash root has been fetched and validated 425 # (which requires any valid copy of the UEB, and a valid copy of the 426 # share hash chain for each shnum) 427 self._valid_shares = {} 428 429 # _target_shares is an ordered list of ReadyShare instances, each of 430 # which is a (shnum, server) tuple. It is sorted in order of 431 # preference: we expect to get the fastest response from the 432 # ReadyShares at the front of the list. It is also sorted to 433 # distribute the shnums, so that fetching shares from 434 # _target_shares[:k] is likely (but not guaranteed) to give us k 435 # distinct shares. 
The rule is that we skip over entries for blocks 436 # that we've already received, limit the number of recent queries for 437 # the same block, 438 self._target_shares = [] 439 440 def _validate_enough_shares(self): 441 # my goal is to have at least 2*k distinct validated shares from at 442 # least 2*k distinct servers 443 valid_share_servers = set() 444 for vs in self._valid_shares.values(): 445 valid_share_servers.update(vs.get_servers()) 446 if (len(self._valid_shares) >= 2*self._needed_shares 447 and len(self._valid_share_servers) >= 2*self._needed_shares): 448 return 449 #for 450 451 def _reschedule(self, _ign): 452 # fire the loop again 453 if not self._scheduled: 454 self._scheduled = True 455 eventually(self._loop) 456 457 def _loop(self): 458 self._scheduled = False 459 # what do we need? 460 461 self._find_enough_shares() 462 self._validate_enough_shares() 463 464 if not self._ueb: 465 # we always need a copy of the UEB 466 pass 467 468 def _error(self, f): 469 # this is an unexpected error: a coding bug 470 log.err(f, level=log.UNUSUAL) 471 472 473 474 # using a single packed string (and an offset table) may be an artifact of 475 # our native storage server: other backends might allow cheap multi-part 476 # files (think S3, several buckets per share, one for each section). 477 478 # find new names for: 479 # data_holder 480 # Share / Share2 (ShareInstance / Share? but the first is more useful) 481 482 class IShare(Interface): 483 """I represent a single instance of a single share (e.g. I reference the 484 shnum2 for share SI=abcde on server xy12t, not the one on server ab45q). 485 This interface is used by SegmentFetcher to retrieve validated blocks. 486 """ 487 def get_block(segnum): 488 """Return an Observer2, which will be notified with the following 489 events: 490 state=COMPLETE, block=data (terminal): validated block data 491 state=OVERDUE (non-terminal): we have reason to believe that the 492 request might have stalled, or we 493 might just be impatient 494 state=CORRUPT (terminal): the data we received was corrupt 495 state=DEAD (terminal): the connection has failed 496 """ 497 498 499 # it'd be nice if we receive the hashes before the block, or just 500 # afterwards, so we aren't stuck holding on to unvalidated blocks 501 # that we can't process. If we guess the offsets right, we can 502 # accomplish this by sending the block request after the metadata 503 # requests (by keeping two separate requestlists), and have a one RTT 504 # pipeline like: 505 # 1a=metadata, 1b=block 506 # 1b->process+deliver : one RTT 507 508 # But if we guess wrong, and fetch the wrong part of the block, we'll 509 # have a pipeline that looks like: 510 # 1a=wrong metadata, 1b=wrong block 511 # 1a->2a=right metadata,2b=right block 512 # 2b->process+deliver 513 # which means two RTT and buffering one block (which, since we'll 514 # guess the segsize wrong for everything, means buffering one 515 # segment) 516 517 # if we start asking for multiple segments, we could get something 518 # worse: 519 # 1a=wrong metadata, 1b=wrong block0, 1c=wrong block1, .. 520 # 1a->2a=right metadata,2b=right block0,2c=right block1, . 521 # 2b->process+deliver 522 523 # which means two RTT but fetching and buffering the whole file 524 # before delivering anything. However, since we don't know when the 525 # other shares are going to arrive, we need to avoid having more than 526 # one block in the pipeline anyways. So we shouldn't be able to get 527 # into this state. 
528 529 # it also means that, instead of handling all of 530 # self._requested_blocks at once, we should only be handling one 531 # block at a time: one of the requested block should be special 532 # (probably FIFO). But retire all we can. 533 534 # this might be better with a Deferred, using COMPLETE as the success 535 # case and CORRUPT/DEAD in an errback, because that would let us hold the 536 # 'share' and 'shnum' arguments locally (instead of roundtripping them 537 # through Share.send_request). But that OVERDUE is not terminal. So I 538 # want a new sort of callback mechanism, with the extra-argument-passing 539 # aspects of Deferred, but without being so one-shot. Is this a job for 540 # Observer? No, it doesn't take extra arguments. So this uses Observer2. 541 542 543 class Reader: 544 """I am responsible for a single offset+size read of the file. I handle 545 segmentation: I figure out which segments are necessary, request them 546 (from my CiphertextDownloader) in order, and trim the segments down to 547 match the offset+size span. I use the Producer/Consumer interface to only 548 request one segment at a time. 549 """ 550 implements(IPushProducer) 551 def __init__(self, consumer, offset, size): 552 self._needed = [] 553 self._consumer = consumer 554 self._hungry = False 555 self._offset = offset 556 self._size = size 557 self._segsize = None 558 def start(self): 559 self._alive = True 560 self._deferred = defer.Deferred() 561 # the process doesn't actually start until set_segment_size() 562 return self._deferred 563 564 def set_segment_size(self, segsize): 565 if self._segsize is not None: 566 return 567 self._segsize = segsize 568 self._compute_segnums() 569 570 def _compute_segnums(self, segsize): 571 # now that we know the file's segsize, what segments (and which 572 # ranges of each) will we need? 
573 size = self._size 574 offset = self._offset 575 while size: 576 assert size >= 0 577 this_seg_num = int(offset / self._segsize) 578 this_seg_offset = offset - (seg_num*self._segsize) 579 this_seg_size = min(size, self._segsize-seg_offset) 580 size -= this_seg_size 581 if size: 582 offset += this_seg_size 583 yield (this_seg_num, this_seg_offset, this_seg_size) 584 585 def get_needed_segments(self): 586 return set([segnum for (segnum, off, size) in self._needed]) 587 588 589 def stopProducing(self): 590 self._hungry = False 591 self._alive = False 592 # TODO: cancel the segment requests 593 def pauseProducing(self): 594 self._hungry = False 595 def resumeProducing(self): 596 self._hungry = True 597 def add_segment(self, segnum, offset, size): 598 self._needed.append( (segnum, offset, size) ) 599 def got_segment(self, segnum, segdata): 600 """Return True if this schedule has more to go, or False if it is 601 done.""" 602 assert self._needed[0][segnum] == segnum 603 (_ign, offset, size) = self._needed.pop(0) 604 data = segdata[offset:offset+size] 605 self._consumer.write(data) 606 if not self._needed: 607 # we're done 608 self._alive = False 609 self._hungry = False 610 self._consumer.unregisterProducer() 611 self._deferred.callback(self._consumer) 612 def error(self, f): 613 self._alive = False 614 self._hungry = False 615 self._consumer.unregisterProducer() 616 self._deferred.errback(f) 617 618 619 620 class x: 621 def OFFread(self, consumer, offset=0, size=None): 622 """I am the main entry point, from which FileNode.read() can get 623 data.""" 624 # tolerate concurrent operations: each gets its own Reader 625 if size is None: 626 size = self._size - offset 627 r = Reader(consumer, offset, size) 628 self._readers.add(r) 629 d = r.start() 630 if self.segment_size is not None: 631 r.set_segment_size(self.segment_size) 632 # TODO: if we can't find any segments, and thus never get a 633 # segsize, tell the Readers to give up 634 return d -
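A standalone sketch (not the patch's code) of the range-to-segments mapping that Reader._compute_segnums above is reaching for: given a read of size bytes at offset, yield one (segnum, offset-within-segment, length) triple per touched segment.

def compute_segnums(offset, size, segsize):
    # walk the requested byte range segment by segment
    while size > 0:
        segnum = offset // segsize
        seg_offset = offset - segnum * segsize
        this_size = min(size, segsize - seg_offset)
        yield (segnum, seg_offset, this_size)
        offset += this_size
        size -= this_size

# reading 30 bytes at offset 50 from a file with 32-byte segments touches
# the tail of segment 1 and the first half of segment 2
assert list(compute_segnums(50, 30, 32)) == [(1, 18, 14), (2, 0, 16)]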
new file src/allmydata/immutable/download2_util.py
diff --git a/src/allmydata/immutable/download2_util.py b/src/allmydata/immutable/download2_util.py new file mode 100755 index 0000000..9e20ff4
- + 1 import weakref 2 3 from twisted.application import service 4 from foolscap.api import eventually 5 6 class Observer2: 7 """A simple class to distribute multiple events to a single subscriber. 8 It accepts arbitrary kwargs, but no posargs.""" 9 def __init__(self): 10 self._watcher = None 11 self._undelivered_results = [] 12 self._canceler = None 13 14 def set_canceler(self, f): 15 # we use a weakref to avoid creating a cycle between us and the thing 16 # we're observing: they'll be holding a reference to us to compare 17 # against the value we pass to their canceler function. 18 self._canceler = weakref.ref(f) 19 20 def subscribe(self, observer, **watcher_kwargs): 21 self._watcher = (observer, watcher_kwargs) 22 while self._undelivered_results: 23 self._notify(self._undelivered_results.pop(0)) 24 25 def notify(self, **result_kwargs): 26 if self._watcher: 27 self._notify(result_kwargs) 28 else: 29 self._undelivered_results.append(result_kwargs) 30 31 def _notify(self, result_kwargs): 32 o, watcher_kwargs = self._watcher 33 kwargs = dict(result_kwargs) 34 kwargs.update(watcher_kwargs) 35 eventually(o, **kwargs) 36 37 def cancel(self): 38 f = self._canceler() 39 if f: 40 f(self) 41 42 43 def incidentally(res, f, *args, **kwargs): 44 """Add me to a Deferred chain like this: 45 d.addBoth(incidentally, func, arg) 46 and I'll behave as if you'd added the following function: 47 def _(res): 48 func(arg) 49 return res 50 This is useful if you want to execute an expression when the Deferred 51 fires, but don't care about its value. 52 """ 53 f(*args, **kwargs) 54 return res 55 56 57 class Terminator(service.Service): 58 def __init__(self): 59 self._clients = weakref.WeakKeyDictionary() 60 def register(self, c): 61 self._clients[c] = None 62 def stopService(self): 63 for c in self._clients: 64 c.stop() 65 return service.Service.stopService(self) -
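A usage sketch (illustration only) of Observer2 as a block-request observer like the _block_request_activity mentioned in the notes above: events posted before subscribe() are buffered, watcher kwargs are merged into every delivery, and delivery goes through eventually(), so nothing fires until the reactor gets a turn.

from allmydata.immutable.download2_util import Observer2

def _block_request_activity(share=None, state=None, block=None):
    # hypothetical subscriber; a real SegmentFetcher would switch on 'state'
    print "share=%s state=%s block=%r" % (share, state, block)

o = Observer2()
o.notify(state="COMPLETE", block="a validated block")  # buffered: no watcher yet
o.subscribe(_block_request_activity, share="sh3")      # flushes the buffered event
o.notify(state="OVERDUE")                              # delivered to the watcher
# both deliveries are scheduled with eventually() and run on the next reactor turn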
src/allmydata/nodemaker.py
diff --git a/src/allmydata/nodemaker.py b/src/allmydata/nodemaker.py index a30efbf..36ddfc7 100644
a b import weakref 2 2 from zope.interface import implements 3 3 from allmydata.util.assertutil import precondition 4 4 from allmydata.interfaces import INodeMaker, MustBeDeepImmutableError 5 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode 5 from allmydata.immutable.filenode import LiteralFileNode 6 from allmydata.immutable.download2 import ImmutableFileNode 6 7 from allmydata.immutable.upload import Data 7 8 from allmydata.mutable.filenode import MutableFileNode 8 9 from allmydata.dirnode import DirectoryNode, pack_children … … class NodeMaker: 17 18 implements(INodeMaker) 18 19 19 20 def __init__(self, storage_broker, secret_holder, history, 20 uploader, downloader, download_cache_dirman,21 uploader, terminator, 21 22 default_encoding_parameters, key_generator): 22 23 self.storage_broker = storage_broker 23 24 self.secret_holder = secret_holder 24 25 self.history = history 25 26 self.uploader = uploader 26 self.downloader = downloader 27 self.download_cache_dirman = download_cache_dirman 27 self.terminator = terminator 28 28 self.default_encoding_parameters = default_encoding_parameters 29 29 self.key_generator = key_generator 30 30 … … class NodeMaker: 34 34 return LiteralFileNode(cap) 35 35 def _create_immutable(self, cap): 36 36 return ImmutableFileNode(cap, self.storage_broker, self.secret_holder, 37 self.downloader, self.history, 38 self.download_cache_dirman) 37 self.terminator, self.history) 39 38 def _create_mutable(self, cap): 40 39 n = MutableFileNode(self.storage_broker, self.secret_holder, 41 40 self.default_encoding_parameters, … … class NodeMaker: 48 47 # this returns synchronously. It starts with a "cap string". 49 48 assert isinstance(writecap, (str, type(None))), type(writecap) 50 49 assert isinstance(readcap, (str, type(None))), type(readcap) 51 50 52 51 bigcap = writecap or readcap 53 52 if not bigcap: 54 53 # maybe the writecap was hidden because we're in a readonly -
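A small sketch (not from the patch) of the Terminator lifecycle this plumbing supports: each download registers itself with the Terminator that now travels through NodeMaker, and shutdown calls stop() on everything still registered, leaving tests with a clean reactor. FakeDownload is an invented stand-in.

from allmydata.immutable.download2_util import Terminator

class FakeDownload:
    def __init__(self):
        self.stopped = False
    def stop(self):
        self.stopped = True

t = Terminator()
dl = FakeDownload()
t.register(dl)     # what a download would do when it starts
t.stopService()    # normally invoked by the client's service hierarchy at shutdown
assert dl.stopped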
src/allmydata/test/test_util.py
diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index 0a326b3..5f6ce67 100644
a b from twisted.trial import unittest 7 7 from twisted.internet import defer, reactor 8 8 from twisted.python.failure import Failure 9 9 from twisted.python import log 10 from hashlib import md5 10 11 11 12 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil 12 13 from allmydata.util import assertutil, fileutil, deferredutil, abbreviate 13 14 from allmydata.util import limiter, time_format, pollmixin, cachedir 14 15 from allmydata.util import statistics, dictutil, pipeline 15 16 from allmydata.util import log as tahoe_log 17 from allmydata.util.spans import Spans, overlap, DataSpans 16 18 17 19 class Base32(unittest.TestCase): 18 20 def test_b2a_matches_Pythons(self): … … class Log(unittest.TestCase): 1537 1539 tahoe_log.err(format="intentional sample error", 1538 1540 failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ") 1539 1541 self.flushLoggedErrors(SampleError) 1542 1543 1544 class SimpleSpans: 1545 # this is a simple+inefficient form of util.spans.Spans . We compare the 1546 # behavior of this reference model against the real (efficient) form. 1547 1548 def __init__(self, _span_or_start=None, length=None): 1549 self._have = set() 1550 if length is not None: 1551 for i in range(_span_or_start, _span_or_start+length): 1552 self._have.add(i) 1553 elif _span_or_start: 1554 for (start,length) in _span_or_start: 1555 self.add(start, length) 1556 1557 def add(self, start, length): 1558 for i in range(start, start+length): 1559 self._have.add(i) 1560 return self 1561 1562 def remove(self, start, length): 1563 for i in range(start, start+length): 1564 self._have.discard(i) 1565 return self 1566 1567 def each(self): 1568 return sorted(self._have) 1569 1570 def __iter__(self): 1571 items = sorted(self._have) 1572 prevstart = None 1573 prevend = None 1574 for i in items: 1575 if prevstart is None: 1576 prevstart = prevend = i 1577 continue 1578 if i == prevend+1: 1579 prevend = i 1580 continue 1581 yield (prevstart, prevend-prevstart+1) 1582 prevstart = prevend = i 1583 if prevstart is not None: 1584 yield (prevstart, prevend-prevstart+1) 1585 1586 def __len__(self): 1587 # this also gets us bool(s) 1588 return len(self._have) 1589 1590 def __add__(self, other): 1591 s = self.__class__(self) 1592 for (start, length) in other: 1593 s.add(start, length) 1594 return s 1595 1596 def __sub__(self, other): 1597 s = self.__class__(self) 1598 for (start, length) in other: 1599 s.remove(start, length) 1600 return s 1601 1602 def __iadd__(self, other): 1603 for (start, length) in other: 1604 self.add(start, length) 1605 return self 1606 1607 def __isub__(self, other): 1608 for (start, length) in other: 1609 self.remove(start, length) 1610 return self 1611 1612 def __contains__(self, (start,length)): 1613 for i in range(start, start+length): 1614 if i not in self._have: 1615 return False 1616 return True 1617 1618 class ByteSpans(unittest.TestCase): 1619 def test_basic(self): 1620 s = Spans() 1621 self.failUnlessEqual(list(s), []) 1622 self.failIf(s) 1623 self.failIf((0,1) in s) 1624 self.failUnlessEqual(len(s), 0) 1625 1626 s1 = Spans(3, 4) # 3,4,5,6 1627 self._check1(s1) 1628 1629 s2 = Spans(s1) 1630 self._check1(s2) 1631 1632 s2.add(10,2) # 10,11 1633 self._check1(s1) 1634 self.failUnless((10,1) in s2) 1635 self.failIf((10,1) in s1) 1636 self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11]) 1637 self.failUnlessEqual(len(s2), 6) 1638 1639 s2.add(15,2).add(20,2) 1640 self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21]) 1641 self.failUnlessEqual(len(s2), 10) 1642 
1643 s2.remove(4,3).remove(15,1) 1644 self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21]) 1645 self.failUnlessEqual(len(s2), 6) 1646 1647 def _check1(self, s): 1648 self.failUnlessEqual(list(s), [(3,4)]) 1649 self.failUnless(s) 1650 self.failUnlessEqual(len(s), 4) 1651 self.failIf((0,1) in s) 1652 self.failUnless((3,4) in s) 1653 self.failUnless((3,1) in s) 1654 self.failUnless((5,2) in s) 1655 self.failUnless((6,1) in s) 1656 self.failIf((6,2) in s) 1657 self.failIf((7,1) in s) 1658 self.failUnlessEqual(list(s.each()), [3,4,5,6]) 1659 1660 def test_math(self): 1661 s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9 1662 s2 = Spans(5, 3) # 5,6,7 1663 s3 = Spans(8, 4) # 8,9,10,11 1664 1665 s = s1 - s2 1666 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) 1667 s = s1 - s3 1668 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) 1669 s = s2 - s3 1670 self.failUnlessEqual(list(s.each()), [5,6,7]) 1671 1672 s = s1 + s2 1673 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) 1674 s = s1 + s3 1675 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) 1676 s = s2 + s3 1677 self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) 1678 1679 s = Spans(s1) 1680 s -= s2 1681 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) 1682 s = Spans(s1) 1683 s -= s3 1684 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) 1685 s = Spans(s2) 1686 s -= s3 1687 self.failUnlessEqual(list(s.each()), [5,6,7]) 1688 1689 s = Spans(s1) 1690 s += s2 1691 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) 1692 s = Spans(s1) 1693 s += s3 1694 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) 1695 s = Spans(s2) 1696 s += s3 1697 self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) 1698 1699 def test_random(self): 1700 # attempt to increase coverage of corner cases by comparing behavior 1701 # of a simple-but-slow model implementation against the 1702 # complex-but-fast actual implementation, in a large number of random 1703 # operations 1704 S1 = SimpleSpans 1705 S2 = Spans 1706 s1 = S1(); s2 = S2() 1707 seed = "" 1708 def _create(subseed): 1709 ns1 = S1(); ns2 = S2() 1710 for i in range(10): 1711 what = md5(subseed+str(i)).hexdigest() 1712 start = int(what[2:4], 16) 1713 length = max(1,int(what[5:6], 16)) 1714 ns1.add(start, length); ns2.add(start, length) 1715 return ns1, ns2 1716 1717 #print 1718 for i in range(1000): 1719 what = md5(seed+str(i)).hexdigest() 1720 op = what[0] 1721 subop = what[1] 1722 start = int(what[2:4], 16) 1723 length = max(1,int(what[5:6], 16)) 1724 #print what 1725 if op in "0": 1726 if subop in "01234": 1727 s1 = S1(); s2 = S2() 1728 elif subop in "5678": 1729 s1 = S1(start, length); s2 = S2(start, length) 1730 else: 1731 s1 = S1(s1); s2 = S2(s2) 1732 #print "s2 = %s" % s2.dump() 1733 elif op in "123": 1734 #print "s2.add(%d,%d)" % (start, length) 1735 s1.add(start, length); s2.add(start, length) 1736 elif op in "456": 1737 #print "s2.remove(%d,%d)" % (start, length) 1738 s1.remove(start, length); s2.remove(start, length) 1739 elif op in "78": 1740 ns1, ns2 = _create(what[7:11]) 1741 #print "s2 + %s" % ns2.dump() 1742 s1 = s1 + ns1; s2 = s2 + ns2 1743 elif op in "9a": 1744 ns1, ns2 = _create(what[7:11]) 1745 #print "%s - %s" % (s2.dump(), ns2.dump()) 1746 s1 = s1 - ns1; s2 = s2 - ns2 1747 elif op in "bc": 1748 ns1, ns2 = _create(what[7:11]) 1749 #print "s2 += %s" % ns2.dump() 1750 s1 += ns1; s2 += ns2 1751 else: 1752 ns1, ns2 = _create(what[7:11]) 1753 #print "%s -= %s" % (s2.dump(), ns2.dump()) 1754 s1 -= ns1; s2 -= ns2 1755 #print "s2 
now %s" % s2.dump() 1756 self.failUnlessEqual(list(s1.each()), list(s2.each())) 1757 self.failUnlessEqual(len(s1), len(s2)) 1758 self.failUnlessEqual(bool(s1), bool(s2)) 1759 self.failUnlessEqual(list(s1), list(s2)) 1760 for j in range(10): 1761 what = md5(what[12:14]+str(j)).hexdigest() 1762 start = int(what[2:4], 16) 1763 length = max(1, int(what[5:6], 16)) 1764 span = (start, length) 1765 self.failUnlessEqual(bool(span in s1), bool(span in s2)) 1766 1767 1768 # s() 1769 # s(start,length) 1770 # s(s0) 1771 # s.add(start,length) : returns s 1772 # s.remove(start,length) 1773 # s.each() -> list of byte offsets, mostly for testing 1774 # list(s) -> list of (start,length) tuples, one per span 1775 # (start,length) in s -> True if (start..start+length-1) are all members 1776 # NOT equivalent to x in list(s) 1777 # len(s) -> number of bytes, for testing, bool(), and accounting/limiting 1778 # bool(s) (__len__) 1779 # s = s1+s2, s1-s2, +=s1, -=s1 1780 1781 def test_overlap(self): 1782 for a in range(20): 1783 for b in range(10): 1784 for c in range(20): 1785 for d in range(10): 1786 self._test_overlap(a,b,c,d) 1787 1788 def _test_overlap(self, a, b, c, d): 1789 s1 = set(range(a,a+b)) 1790 s2 = set(range(c,c+d)) 1791 #print "---" 1792 #self._show_overlap(s1, "1") 1793 #self._show_overlap(s2, "2") 1794 o = overlap(a,b,c,d) 1795 expected = s1.intersection(s2) 1796 if not expected: 1797 self.failUnlessEqual(o, None) 1798 else: 1799 start,length = o 1800 so = set(range(start,start+length)) 1801 #self._show(so, "o") 1802 self.failUnlessEqual(so, expected) 1803 1804 def _show_overlap(self, s, c): 1805 import sys 1806 out = sys.stdout 1807 if s: 1808 for i in range(max(s)): 1809 if i in s: 1810 out.write(c) 1811 else: 1812 out.write(" ") 1813 out.write("\n") 1814 1815 def extend(s, start, length, fill): 1816 if len(s) >= start+length: 1817 return s 1818 assert len(fill) == 1 1819 return s + fill*(start+length-len(s)) 1820 1821 def replace(s, start, data): 1822 assert len(s) >= start+len(data) 1823 return s[:start] + data + s[start+len(data):] 1824 1825 class SimpleDataSpans: 1826 def __init__(self, other=None): 1827 self.missing = "" # "1" where missing, "0" where found 1828 self.data = "" 1829 if other: 1830 for (start, data) in other.get_spans(): 1831 self.add(start, data) 1832 1833 def __len__(self): 1834 return len(self.missing.translate(None, "1")) 1835 def _dump(self): 1836 return [i for (i,c) in enumerate(self.missing) if c == "0"] 1837 def _have(self, start, length): 1838 m = self.missing[start:start+length] 1839 if not m or len(m)<length or int(m): 1840 return False 1841 return True 1842 def get_spans(self): 1843 for i in self._dump(): 1844 yield (i, self.data[i]) 1845 def get(self, start, length): 1846 if self._have(start, length): 1847 return self.data[start:start+length] 1848 return None 1849 def pop(self, start, length): 1850 data = self.get(start, length) 1851 if data: 1852 self.remove(start, length) 1853 return data 1854 def remove(self, start, length): 1855 self.missing = replace(extend(self.missing, start, length, "1"), 1856 start, "1"*length) 1857 def add(self, start, data): 1858 self.missing = replace(extend(self.missing, start, len(data), "1"), 1859 start, "0"*len(data)) 1860 self.data = replace(extend(self.data, start, len(data), " "), 1861 start, data) 1862 1863 1864 class StringSpans(unittest.TestCase): 1865 def do_basic(self, klass): 1866 ds = klass() 1867 self.failUnlessEqual(len(ds), 0) 1868 self.failUnlessEqual(list(ds._dump()), []) 1869 self.failUnlessEqual(sum([len(d) for 
(s,d) in ds.get_spans()]), 0) 1870 self.failUnlessEqual(ds.get(0, 4), None) 1871 self.failUnlessEqual(ds.pop(0, 4), None) 1872 ds.remove(0, 4) 1873 1874 ds.add(2, "four") 1875 self.failUnlessEqual(len(ds), 4) 1876 self.failUnlessEqual(list(ds._dump()), [2,3,4,5]) 1877 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 4) 1878 self.failUnlessEqual(ds.get(0, 4), None) 1879 self.failUnlessEqual(ds.pop(0, 4), None) 1880 self.failUnlessEqual(ds.get(4, 4), None) 1881 1882 ds2 = klass(ds) 1883 self.failUnlessEqual(len(ds2), 4) 1884 self.failUnlessEqual(list(ds2._dump()), [2,3,4,5]) 1885 self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_spans()]), 4) 1886 self.failUnlessEqual(ds2.get(0, 4), None) 1887 self.failUnlessEqual(ds2.pop(0, 4), None) 1888 self.failUnlessEqual(ds2.pop(2, 3), "fou") 1889 self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_spans()]), 1) 1890 self.failUnlessEqual(ds2.get(2, 3), None) 1891 self.failUnlessEqual(ds2.get(5, 1), "r") 1892 self.failUnlessEqual(ds.get(2, 3), "fou") 1893 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 4) 1894 1895 ds.add(0, "23") 1896 self.failUnlessEqual(len(ds), 6) 1897 self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5]) 1898 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 6) 1899 self.failUnlessEqual(ds.get(0, 4), "23fo") 1900 self.failUnlessEqual(ds.pop(0, 4), "23fo") 1901 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 2) 1902 self.failUnlessEqual(ds.get(0, 4), None) 1903 self.failUnlessEqual(ds.pop(0, 4), None) 1904 1905 ds = klass() 1906 ds.add(2, "four") 1907 ds.add(3, "ea") 1908 self.failUnlessEqual(ds.get(2, 4), "fear") 1909 1910 def do_scan(self, klass): 1911 # do a test with gaps and spans of size 1 and 2 1912 # left=(1,11) * right=(1,11) * gapsize=(1,2) 1913 # 111, 112, 121, 122, 211, 212, 221, 222 1914 # 211 1915 # 121 1916 # 112 1917 # 212 1918 # 222 1919 # 221 1920 # 111 1921 # 122 1922 # 11 1 1 11 11 11 1 1 111 1923 # 0123456789012345678901234567 1924 # abcdefghijklmnopqrstuvwxyz-= 1925 pieces = [(1, "bc"), 1926 (4, "e"), 1927 (7, "h"), 1928 (9, "jk"), 1929 (12, "mn"), 1930 (16, "qr"), 1931 (20, "u"), 1932 (22, "w"), 1933 (25, "z-="), 1934 ] 1935 p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27]) 1936 S = "abcdefghijklmnopqrstuvwxyz-=" 1937 # TODO: when adding data, add capital letters, to make sure we aren't 1938 # just leaving the old data in place 1939 l = len(S) 1940 def base(): 1941 ds = klass() 1942 for start, data in pieces: 1943 ds.add(start, data) 1944 return ds 1945 def dump(s): 1946 p = set(s._dump()) 1947 # wow, this is the first time I've ever wanted ?: in python 1948 # note: this requires python2.5 1949 d = "".join([(S[i] if i in p else " ") for i in range(l)]) 1950 assert len(d) == l 1951 return d 1952 DEBUG = False 1953 for start in range(0, l): 1954 for end in range(start+1, l): 1955 # add [start-end) to the baseline 1956 which = "%d-%d" % (start, end-1) 1957 p_added = set(range(start, end)) 1958 b = base() 1959 if DEBUG: 1960 print 1961 print dump(b), which 1962 add = klass(); add.add(start, S[start:end]) 1963 print dump(add) 1964 b.add(start, S[start:end]) 1965 if DEBUG: 1966 print dump(b) 1967 # check that the new span is there 1968 d = b.get(start, end-start) 1969 self.failUnlessEqual(d, S[start:end], which) 1970 # check that all the original pieces are still there 1971 for t_start, t_data in pieces: 1972 t_len = len(t_data) 1973 self.failUnlessEqual(b.get(t_start, t_len), 1974 S[t_start:t_start+t_len], 1975 "%s %d+%d" % (which, 
t_start, t_len)) 1976 # check that a lot of subspans are mostly correct 1977 for t_start in range(l): 1978 for t_len in range(1,4): 1979 d = b.get(t_start, t_len) 1980 if d is not None: 1981 which2 = "%s+(%d-%d)" % (which, t_start, 1982 t_start+t_len-1) 1983 self.failUnlessEqual(d, S[t_start:t_start+t_len], 1984 which2) 1985 # check that removing a subspan gives the right value 1986 b2 = klass(b) 1987 b2.remove(t_start, t_len) 1988 removed = set(range(t_start, t_start+t_len)) 1989 for i in range(l): 1990 exp = (((i in p_elements) or (i in p_added)) 1991 and (i not in removed)) 1992 which2 = "%s-(%d-%d)" % (which, t_start, 1993 t_start+t_len-1) 1994 self.failUnlessEqual(bool(b2.get(i, 1)), exp, 1995 which2+" %d" % i) 1996 1997 def test_test(self): 1998 self.do_basic(SimpleDataSpans) 1999 self.do_scan(SimpleDataSpans) 2000 2001 def test_basic(self): 2002 self.do_basic(DataSpans) 2003 self.do_scan(DataSpans) 2004 2005 def test_random(self): 2006 # attempt to increase coverage of corner cases by comparing behavior 2007 # of a simple-but-slow model implementation against the 2008 # complex-but-fast actual implementation, in a large number of random 2009 # operations 2010 S1 = SimpleDataSpans 2011 S2 = DataSpans 2012 s1 = S1(); s2 = S2() 2013 seed = "" 2014 def _randstr(length, seed): 2015 created = 0 2016 pieces = [] 2017 while created < length: 2018 piece = md5(seed + str(created)).hexdigest() 2019 pieces.append(piece) 2020 created += len(piece) 2021 return "".join(pieces)[:length] 2022 def _create(subseed): 2023 ns1 = S1(); ns2 = S2() 2024 for i in range(10): 2025 what = md5(subseed+str(i)).hexdigest() 2026 start = int(what[2:4], 16) 2027 length = max(1,int(what[5:6], 16)) 2028 ns1.add(start, _randstr(length, what[7:9])); 2029 ns2.add(start, _randstr(length, what[7:9])) 2030 return ns1, ns2 2031 2032 #print 2033 for i in range(1000): 2034 what = md5(seed+str(i)).hexdigest() 2035 op = what[0] 2036 subop = what[1] 2037 start = int(what[2:4], 16) 2038 length = max(1,int(what[5:6], 16)) 2039 #print what 2040 if op in "0": 2041 if subop in "0123456": 2042 s1 = S1(); s2 = S2() 2043 else: 2044 s1, s2 = _create(what[7:11]) 2045 #print "s2 = %s" % list(s2._dump()) 2046 elif op in "123456": 2047 #print "s2.add(%d,%d)" % (start, length) 2048 s1.add(start, _randstr(length, what[7:9])); 2049 s2.add(start, _randstr(length, what[7:9])) 2050 elif op in "789abc": 2051 #print "s2.remove(%d,%d)" % (start, length) 2052 s1.remove(start, length); s2.remove(start, length) 2053 else: 2054 #print "s2.pop(%d,%d)" % (start, length) 2055 d1 = s1.pop(start, length); d2 = s2.pop(start, length) 2056 self.failUnlessEqual(d1, d2) 2057 #print "s1 now %s" % list(s1._dump()) 2058 #print "s2 now %s" % list(s2._dump()) 2059 self.failUnlessEqual(len(s1), len(s2)) 2060 self.failUnlessEqual(list(s1._dump()), list(s2._dump())) 2061 for j in range(100): 2062 what = md5(what[12:14]+str(j)).hexdigest() 2063 start = int(what[2:4], 16) 2064 length = max(1, int(what[5:6], 16)) 2065 d1 = s1.get(start, length); d2 = s2.get(start, length) 2066 self.failUnlessEqual(d1, d2, "%d+%d" % (start, length)) -
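A usage sketch (illustration only) of the bookkeeping these tests exercise, in the form the new downloader's Share class uses it: three Spans (wanted, requested, received) and set arithmetic to answer "what should I ask for next?".

from allmydata.util.spans import Spans

wanted = Spans(0, 100)      # we want bytes [0,100)
requested = Spans(0, 40)    # we have already asked for [0,40)
received = Spans(0, 20)     # and [0,20) has come back so far

ask_next = wanted - requested - received
assert list(ask_next) == [(40, 60)]   # one span: 60 bytes starting at offset 40

received.add(20, 20)        # the rest of the outstanding request arrives
assert (0, 40) in received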
src/allmydata/util/dictutil.py
diff --git a/src/allmydata/util/dictutil.py b/src/allmydata/util/dictutil.py index 3dc815b..91785ac 100644
a b class DictOfSets(dict): 57 57 if not self[key]: 58 58 del self[key] 59 59 60 def allvalues(self): 61 # return a set that merges all value sets 62 r = set() 63 for key in self: 64 r.update(self[key]) 65 return r 66 60 67 class UtilDict: 61 68 def __init__(self, initialdata={}): 62 69 self.d = {} -
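A short sketch (illustration only, server names invented) of what the new allvalues() is for in the downloader's sharemap, which maps shnum to the set of servers holding that share: len(sharemap) counts distinct shares, while len(sharemap.allvalues()) counts distinct servers.

from allmydata.util.dictutil import DictOfSets

sharemap = DictOfSets()
sharemap.add(0, "server-A")
sharemap.add(1, "server-A")
sharemap.add(2, "server-B")

assert len(sharemap) == 3                                      # three distinct shnums
assert sharemap.allvalues() == set(["server-A", "server-B"])   # two distinct servers
# the "2*k shares from 2*k servers" goal can check both counts at once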
new file src/allmydata/util/spans.py
diff --git a/src/allmydata/util/spans.py b/src/allmydata/util/spans.py new file mode 100755 index 0000000..336fddf
- + 1 2 class Spans: 3 """I represent a compressed list of booleans, one per index (an integer). 4 Typically, each index represents an offset into a large string, pointing 5 to a specific byte of a share. In this context, True means that byte has 6 been received, or has been requested. 7 8 Another way to look at this is maintaining a set of integers, optimized 9 for operations on spans like 'add range to set' and 'is range in set?'. 10 11 This is a python equivalent of perl's Set::IntSpan module, frequently 12 used to represent .newsrc contents. 13 14 Rather than storing an actual (large) list or dictionary, I represent my 15 internal state as a sorted list of spans, each with a start and a length. 16 My API is presented in terms of start+length pairs. I provide set 17 arithmetic operators, to efficiently answer questions like 'I want bytes 18 XYZ, I already requested bytes ABC, and I've already received bytes DEF: 19 what bytes should I request now?'. 20 21 The new downloader will use it to keep track of which bytes we've requested 22 or received already. 23 """ 24 25 def __init__(self, _span_or_start=None, length=None): 26 self._spans = list() 27 if length is not None: 28 self._spans.append( (_span_or_start, length) ) 29 elif _span_or_start: 30 for (start,length) in _span_or_start: 31 self.add(start, length) 32 self._check() 33 34 def _check(self): 35 assert sorted(self._spans) == self._spans 36 prev_end = None 37 try: 38 for (start,length) in self._spans: 39 if prev_end is not None: 40 assert start > prev_end 41 prev_end = start+length 42 except AssertionError: 43 print "BAD:", self.dump() 44 raise 45 46 def add(self, start, length): 47 assert start >= 0 48 assert length > 0 49 #print " ADD [%d+%d -%d) to %s" % (start, length, start+length, self.dump()) 50 first_overlap = last_overlap = None 51 for i,(s_start,s_length) in enumerate(self._spans): 52 #print " (%d+%d)-> overlap=%s adjacent=%s" % (s_start,s_length, overlap(s_start, s_length, start, length), adjacent(s_start, s_length, start, length)) 53 if (overlap(s_start, s_length, start, length) 54 or adjacent(s_start, s_length, start, length)): 55 last_overlap = i 56 if first_overlap is None: 57 first_overlap = i 58 continue 59 # no overlap 60 if first_overlap is not None: 61 break 62 #print " first_overlap", first_overlap, last_overlap 63 if first_overlap is None: 64 # no overlap, so just insert the span and sort by starting 65 # position. 
66 self._spans.insert(0, (start,length)) 67 self._spans.sort() 68 else: 69 # everything from [first_overlap] to [last_overlap] overlapped 70 first_start,first_length = self._spans[first_overlap] 71 last_start,last_length = self._spans[last_overlap] 72 newspan_start = min(start, first_start) 73 newspan_end = max(start+length, last_start+last_length) 74 newspan_length = newspan_end - newspan_start 75 newspan = (newspan_start, newspan_length) 76 self._spans[first_overlap:last_overlap+1] = [newspan] 77 #print " ADD done: %s" % self.dump() 78 self._check() 79 80 return self 81 82 def remove(self, start, length): 83 assert start >= 0 84 assert length > 0 85 #print " REMOVE [%d+%d -%d) from %s" % (start, length, start+length, self.dump()) 86 first_complete_overlap = last_complete_overlap = None 87 for i,(s_start,s_length) in enumerate(self._spans): 88 s_end = s_start + s_length 89 o = overlap(s_start, s_length, start, length) 90 if o: 91 o_start, o_length = o 92 o_end = o_start+o_length 93 if o_start == s_start and o_end == s_end: 94 # delete this span altogether 95 if first_complete_overlap is None: 96 first_complete_overlap = i 97 last_complete_overlap = i 98 elif o_start == s_start: 99 # we only overlap the left side, so trim the start 100 # 1111 101 # rrrr 102 # oo 103 # -> 11 104 new_start = o_end 105 new_end = s_end 106 assert new_start > s_start 107 new_length = new_end - new_start 108 self._spans[i] = (new_start, new_length) 109 elif o_end == s_end: 110 # we only overlap the right side 111 # 1111 112 # rrrr 113 # oo 114 # -> 11 115 new_start = s_start 116 new_end = o_start 117 assert new_end < s_end 118 new_length = new_end - new_start 119 self._spans[i] = (new_start, new_length) 120 else: 121 # we overlap the middle, so create a new span. No need to 122 # examine any other spans. 
123 # 111111
124 # rr
125 # LL RR
126 left_start = s_start
127 left_end = o_start
128 left_length = left_end - left_start
129 right_start = o_end
130 right_end = s_end
131 right_length = right_end - right_start
132 self._spans[i] = (left_start, left_length)
133 self._spans.append( (right_start, right_length) )
134 self._spans.sort()
135 break
136 if first_complete_overlap is not None:
137 del self._spans[first_complete_overlap:last_complete_overlap+1]
138 #print " REMOVE done: %s" % self.dump()
139 self._check()
140 return self
141 
142 def dump(self):
143 return "len=%d: %s" % (len(self),
144 ",".join(["[%d-%d]" % (start,start+l-1)
145 for (start,l) in self._spans]) )
146 
147 def each(self):
148 for start, length in self._spans:
149 for i in range(start, start+length):
150 yield i
151 
152 def __iter__(self):
153 for s in self._spans:
154 yield s
155 
156 def __len__(self):
157 # this also gets us bool(s)
158 return sum([length for start,length in self._spans])
159 
160 def __add__(self, other):
161 s = self.__class__(self)
162 for (start, length) in other:
163 s.add(start, length)
164 return s
165 
166 def __sub__(self, other):
167 s = self.__class__(self)
168 for (start, length) in other:
169 s.remove(start, length)
170 return s
171 
172 def __iadd__(self, other):
173 for (start, length) in other:
174 self.add(start, length)
175 return self
176 
177 def __isub__(self, other):
178 for (start, length) in other:
179 self.remove(start, length)
180 return self
181 
182 def __contains__(self, (start,length)):
183 for span_start,span_length in self._spans:
184 o = overlap(start, length, span_start, span_length)
185 if o:
186 o_start,o_length = o
187 if o_start == start and o_length == length:
188 return True
189 return False
190 
191 def overlap(start0, length0, start1, length1):
192 # return start2,length2 of the overlapping region, or None
193 # 00 00 000 0000 00 00 000 00 00 00 00
194 # 11 11 11 11 111 11 11 1111 111 11 11
195 left = max(start0, start1)
196 right = min(start0+length0, start1+length1)
197 # if there is overlap, 'left' will be its start, and right-1 will
198 # be the end
199 if left < right:
200 return (left, right-left)
201 return None
202 
203 def adjacent(start0, length0, start1, length1):
204 if (start0 < start1) and start0+length0 == start1:
205 return True
206 elif (start1 < start0) and start1+length1 == start0:
207 return True
208 return False
209 
210 class DataSpans:
211 """I represent portions of a large string. Equivalently, I can be said to
212 maintain a large array of characters (with gaps of empty elements). I can
213 be used to manage access to a remote share, where some pieces have been
214 retrieved, some have been requested, and others have not been read.
215 """
216 
217 def __init__(self, other=None):
218 self.spans = [] # (start, data) tuples, non-overlapping, merged
219 if other:
220 for (start, data) in other.get_spans():
221 self.add(start, data)
222 
223 def __len__(self):
224 # return number of bytes we're holding
225 return sum([len(data) for (start,data) in self.spans])
226 
227 def _dump(self):
228 # return iterator of sorted list of offsets, one per byte
229 for (start,data) in self.spans:
230 for i in range(start, start+len(data)):
231 yield i
232 
233 def get_spans(self):
234 return list(self.spans)
235 
236 def assert_invariants(self):
237 if not self.spans:
238 return
239 prev_end = self.spans[0][0] + len(self.spans[0][1])
240 for start, data in self.spans[1:]:
241 if not start > prev_end:
242 # adjacent or overlapping: bad
243 print "ASSERTION FAILED", self.spans
244 raise AssertionError
245 prev_end = start + len(data)
246 
247 def get(self, start, length):
248 # returns a string of LENGTH, or None
249 #print "get", start, length, self.spans
250 end = start+length
251 for (s_start,s_data) in self.spans:
252 s_end = s_start+len(s_data)
253 #print " ",s_start,s_end
254 if s_start <= start < s_end:
255 # we want some data from this span. Because we maintain
256 # strictly merged and non-overlapping spans, everything we
257 # want must be in this span.
258 offset = start - s_start
259 if offset + length > len(s_data):
260 #print " None, span falls short"
261 return None # span falls short
262 #print " some", s_data[offset:offset+length]
263 return s_data[offset:offset+length]
264 if s_start >= end:
265 # we've gone too far: no further spans will overlap
266 #print " None, gone too far"
267 return None
268 #print " None, ran out of spans"
269 return None
270 
271 def add(self, start, data):
272 # first: walk through existing spans, find overlap, modify-in-place
273 # create list of new spans
274 # add new spans
275 # sort
276 # merge adjacent spans
277 #print "add", start, data, self.spans
278 end = start + len(data)
279 i = 0
280 while len(data):
281 #print " loop", start, data, i, len(self.spans), self.spans
282 if i >= len(self.spans):
283 #print " append and done"
284 # append a last span
285 self.spans.append( (start, data) )
286 break
287 (s_start,s_data) = self.spans[i]
288 # five basic cases:
289 # a: OLD b:OLDD c1:OLD c2:OLD d1:OLDD d2:OLD e: OLLDD
290 # NEW NEW NEW NEWW NEW NEW NEW
291 #
292 # we handle A by inserting a new segment (with "N") and looping,
293 # turning it into B or C. We handle B by replacing a prefix and
294 # terminating. We handle C (both c1 and c2) by replacing the
295 # segment (and, for c2, looping, turning it into A). We handle D
296 # by replacing a suffix (and, for d2, looping, turning it into
297 # A). We handle E by replacing the middle and terminating.
298 if start < s_start:
299 # case A: insert a new span, then loop with the remainder
300 #print " insert new span"
301 s_len = s_start-start
302 self.spans.insert(i, (start, data[:s_len]))
303 i += 1
304 start = s_start
305 data = data[s_len:]
306 continue
307 s_len = len(s_data)
308 s_end = s_start+s_len
309 if s_start <= start < s_end:
310 #print " modify this span", s_start, start, s_end
311 # we want to modify some data in this span: a prefix, a
312 # suffix, or the whole thing
313 if s_start == start:
314 if s_end <= end:
315 #print " replace whole segment"
316 # case C: replace this segment
317 self.spans[i] = (s_start, data[:s_len])
318 i += 1
319 start += s_len
320 data = data[s_len:]
321 # C2 is where len(data)>0
322 continue
323 # case B: modify the prefix, retain the suffix
324 #print " modify prefix"
325 self.spans[i] = (s_start, data + s_data[len(data):])
326 break
327 if start > s_start and end < s_end:
328 # case E: modify the middle
329 #print " modify middle"
330 prefix_len = start - s_start # we retain this much
331 suffix_len = s_end - end # and retain this much
332 newdata = s_data[:prefix_len] + data + s_data[-suffix_len:]
333 self.spans[i] = (s_start, newdata)
334 break
335 # case D: retain the prefix, modify the suffix
336 #print " modify suffix"
337 prefix_len = start - s_start # we retain this much
338 suffix_len = s_len - prefix_len # we replace this much
339 #print " ", s_data, prefix_len, suffix_len, s_len, data
340 self.spans[i] = (s_start,
341 s_data[:prefix_len] + data[:suffix_len])
342 i += 1
343 start += suffix_len
344 data = data[suffix_len:]
345 #print " now", start, data
346 # D2 is where len(data)>0
347 continue
348 # else we're not there yet
349 #print " still looking"
350 i += 1
351 continue
352 # now merge adjacent spans
353 #print " merging", self.spans
354 newspans = []
355 for (s_start,s_data) in self.spans:
356 if newspans and adjacent(newspans[-1][0], len(newspans[-1][1]),
357 s_start, len(s_data)):
358 newspans[-1] = (newspans[-1][0], newspans[-1][1] + s_data)
359 else:
360 newspans.append( (s_start, s_data) )
361 self.spans = newspans
362 self.assert_invariants()
363 #print " done", self.spans
364 
365 def remove(self, start, length):
366 i = 0
367 end = start + length
368 #print "remove", start, length, self.spans
369 while i < len(self.spans):
370 (s_start,s_data) = self.spans[i]
371 if s_start >= end:
372 # this segment is entirely right of the removed region, and
373 # all further segments are even further right. We're done.
374 break 375 s_len = len(s_data) 376 s_end = s_start + s_len 377 o = overlap(start, length, s_start, s_len) 378 if not o: 379 i += 1 380 continue 381 o_start, o_len = o 382 o_end = o_start + o_len 383 if o_len == s_len: 384 # remove the whole segment 385 del self.spans[i] 386 continue 387 if o_start == s_start: 388 # remove a prefix, leaving the suffix from o_end to s_end 389 prefix_len = o_end - o_start 390 self.spans[i] = (o_end, s_data[prefix_len:]) 391 i += 1 392 continue 393 elif o_end == s_end: 394 # remove a suffix, leaving the prefix from s_start to o_start 395 prefix_len = o_start - s_start 396 self.spans[i] = (s_start, s_data[:prefix_len]) 397 i += 1 398 continue 399 # remove the middle, creating a new segment 400 # left is s_start:o_start, right is o_end:s_end 401 left_len = o_start - s_start 402 left = s_data[:left_len] 403 right_len = s_end - o_end 404 right = s_data[-right_len:] 405 self.spans[i] = (s_start, left) 406 self.spans.insert(i+1, (o_end, right)) 407 break 408 #print " done", self.spans 409 410 def pop(self, start, length): 411 data = self.get(start, length) 412 if data: 413 self.remove(start, length) 414 return data
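Reviewer note (not part of the patch): a minimal sketch of the set arithmetic the Spans docstring promises ("I want bytes XYZ, I already requested ABC, I've already received DEF: what should I request now?"), for anyone who wants to try the class interactively. The variable names are illustrative only, and it assumes the new file is importable as allmydata.util.spans:

from allmydata.util.spans import Spans

wanted = Spans(0, 1000)         # we want bytes [0,1000)
requested = Spans(0, 200)       # we already asked for [0,200)
received = Spans([(500, 100)])  # we already hold [500,600)
ask_for = wanted - requested - received
print ask_for.dump()            # len=700: [200-499],[600-999]
assert (200, 300) in ask_for    # all of [200,500) still needs requesting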
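Similarly, an illustrative sketch of DataSpans holding the bytes themselves, merging adjacent pieces as they arrive and supporting get/pop; again the names and the import path are assumptions, not part of the patch:

from allmydata.util.spans import DataSpans

pieces = DataSpans()
pieces.add(10, "abcde")             # bytes 10..14 arrive
pieces.add(15, "fgh")               # adjacent, so merged into a single span
assert pieces.get(10, 8) == "abcdefgh"
assert pieces.pop(10, 3) == "abc"   # get-and-remove
assert pieces.get(10, 3) is None    # those bytes are no longer held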