Ticket #798: new-downloader-v2.diff
File new-downloader-v2.diff, 127.7 KB (added by warner, at 2010-04-23T23:35:11Z)
new file src/allmydata/immutable/download2.py
diff --git a/src/allmydata/immutable/download2.py b/src/allmydata/immutable/download2.py
new file mode 100644
index 0000000..a665533
import struct
import binascii

from zope.interface import implements
from twisted.python.failure import Failure
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import eventually

from allmydata import uri
from allmydata.codec import CRSDecoder
from allmydata.interfaces import NotEnoughSharesError
from allmydata.uri import CHKFileVerifierURI, CHKFileURI
from allmydata.util import base32, hashutil, idlib, log, mathutil
from allmydata.util.dictutil import DictOfSets
from allmydata.util.hashtree import IncompleteHashTree, BadHashError, \
     NotEnoughHashesError
from allmydata.util.observer import OneShotObserverList
from allmydata.util.spans import Spans, DataSpans # new in this patch series
from pycryptopp.cipher.aes import AES

KiB = 1024
HASH_SIZE = hashutil.CRYPTO_VAL_SIZE # 32 bytes (SHA-256d)

(AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM) = \
 ("AVAILABLE", "PENDING", "OVERDUE", "COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM")

class BadSegmentNumberError(Exception):
    pass

def incidentally(res, f, *args, **kwargs):
    """Add me to a Deferred chain like this:
     d.addBoth(incidentally, func, arg)
    I call func(arg) and pass the Deferred's result through unchanged."""
    f(*args, **kwargs)
    return res

def overlap(start0, length0, start1, length1):
    # return the (start, length) of the overlap between the two given
    # spans, or None if they are disjoint
    start = max(start0, start1)
    end = min(start0+length0, start1+length1)
    if start >= end:
        return None
    return (start, end-start)

class Share:
    """I represent a single instance of a single share (e.g. I reference the
    shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
    I am associated with a CommonShare that remembers data that is held in
    common among e.g. SI=abcde/shnum2 across all servers. I am also
    associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
    servers).
    """
    # this is a specific implementation of IShare for tahoe's native storage
    # servers. A different backend would use a different class.

    def __init__(self, rref, verifycap, commonshare, node, peerid, si_s, shnum):
        self._rref = rref
        self._node = node # holds share_hash_tree and UEB
        self._guess_offsets(verifycap, node.guessed_segment_size)
        self.actual_offsets = None
        self.actual_segment_size = None
        self._UEB_length = None
        self._commonshare = commonshare # holds block_hash_tree
        self._peerid = peerid
        self._peerid_s = base32.b2a(peerid)[:5]
        self._si_prefix = si_s[:8]
        self._shnum = shnum

        self._lp = log.msg(format="Share(%(si)s) on server=%(server)s starting",
                           si=self._si_prefix, server=self._peerid_s,
                           level=log.NOISY, umid="P7hv2w")

        self._wanted = Spans() # desired metadata
        self._wanted_blocks = Spans() # desired block data
        self._requested = Spans() # we've sent a request for this
        self._received = Spans() # we've received a response for this
        self._received_data = DataSpans() # the response contents, still unused
        self._requested_blocks = [] # (segnum, set(observer2..))
        ver = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"]
        self._overrun_ok = ver["tolerates-immutable-read-overrun"]
        # If _overrun_ok and we guess the offsets correctly, we can get
        # everything in one RTT. If _overrun_ok and we guess wrong, we might
        # need two RTT (but we could get lucky and do it in one). If overrun
        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
        # 2=offset table, 3=UEB_length and everything else (hashes, block),
        # 4=UEB.

    def _guess_offsets(self, verifycap, guessed_segment_size):
        self.guessed_segment_size = guessed_segment_size
        size = verifycap.size
        k = verifycap.needed_shares
        N = verifycap.total_shares
        r = self._node._calculate_sizes(guessed_segment_size, size, k)
        offsets = {}
        for i,field in enumerate(['data',
                                  'plaintext_hash_tree', # UNUSED
                                  'crypttext_hash_tree',
                                  'block_hashes',
                                  'share_hashes',
                                  'uri_extension',
                                  ]):
            offsets[field] = i # bad guesses are easy :) # XXX stub
        self.guessed_offsets = offsets
        self._fieldsize = 4
        self._fieldstruct = "L"
    # called by our client, the SegmentFetcher
    def get_block(self, segnum):
        """Add a block number to the list of requests. This will eventually
        result in a fetch of the data necessary to validate the block, then
        the block itself. The fetch order is generally
        first-come-first-served, but requests may be answered out-of-order
        if data becomes available sooner.

        I return an Observer2, which has two uses. The first is to call
        o.subscribe(), which gives me a place to send state changes and
        eventually the data block. The second is o.cancel(), which removes
        the request (if it is still active).

        I will distribute the following events through my Observer2:
         - state=OVERDUE: ?? I believe I should have had an answer by now.
                          You may want to ask another share instead.
         - state=BADSEGNUM: the segnum you asked for is too large. I must
                            fetch a valid UEB before I can determine this,
                            so the notification is asynchronous
         - state=COMPLETE, block=data: here is a valid block
         - state=CORRUPT: this share contains corrupted data
         - state=DEAD, f=Failure: the server reported an error, this share
                                  is unusable
        """
        assert segnum >= 0
        o = Observer2()
        o.set_canceler(self._cancel_block_request)
        for i,(segnum0,observers) in enumerate(self._requested_blocks):
            if segnum0 == segnum:
                observers.add(o)
                break
        else:
            self._requested_blocks.append( (segnum, set([o])) )
        eventually(self.loop)
        return o

    def _cancel_block_request(self, o):
        new_requests = []
        for e in self._requested_blocks:
            (segnum0, observers) = e
            observers.discard(o)
            if observers:
                new_requests.append(e)
        self._requested_blocks = new_requests

    # internal methods
    def _active_segnum(self):
        if self._requested_blocks:
            return self._requested_blocks[0][0]
        return None

    def _active_segnum_and_observers(self):
        if self._requested_blocks:
            # we only retrieve information for one segment at a time, to
            # minimize alacrity (first come, first served)
            return self._requested_blocks[0]
        return None, []

    def loop(self):
        # TODO: if any exceptions occur here, kill the download

        # we are (eventually) called after all state transitions:
        #  new segments added to self._requested_blocks
        #  new data received from servers (responses to our read() calls)
        #  impatience timer fires (server appears slow)

        # First, consume all of the information that we currently have, for
        # all the segments people currently want.
        while self._get_satisfaction():
            pass

        # When we get no satisfaction (from the data we've received so
        # far), we determine what data we desire (to satisfy more
        # requests). The number of segments is finite, so I can't get no
        # satisfaction forever.
        self._desire()

        # finally send out requests for whatever we need (desire minus
        # have). You can't always get what you want, but, sometimes, you
        # get what you need.
        self._request_needed() # express desire
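    # Note on the span containers used below (assumed API, from the new
    # util.spans classes): a Spans is a set of (start,length) byte ranges
    # supporting add/remove/subtraction/membership, and a DataSpans also
    # holds the bytes themselves. DataSpans.get(start,length) returns the
    # bytes only if the entire range is present (else None), and .pop() is
    # get+remove. For example:
    #   ds = DataSpans()
    #   ds.add(0, "abcde")
    #   ds.get(0, 3)  -> "abc"
    #   ds.get(3, 4)  -> None  (range [3,7) is not fully present)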
    def _get_satisfaction(self):
        # return True if we retired a data block, and should therefore be
        # called again. Return False if we don't retire a data block (even
        # if we do retire some other data, like hash chains).

        if self.actual_offsets is None:
            if not self._satisfy_offsets():
                # can't even look at anything without the offset table
                return False

        if not self._node.have_UEB:
            if not self._satisfy_UEB():
                # can't check any hashes without the UEB
                return False

        segnum, observers = self._active_segnum_and_observers()
        if segnum >= self._node.num_segments:
            for o in observers:
                o.notify(state=BADSEGNUM)
            self._requested_blocks.pop(0)
            return True

        if self._node.share_hash_tree.needed_hashes(self._shnum):
            if not self._satisfy_share_hash_tree():
                # can't check block_hash_tree without a root
                return False

        if segnum is None:
            return False # we don't want any particular segment right now

        # block_hash_tree
        needed_hashes = self._commonshare.block_hash_tree.needed_hashes(segnum)
        if needed_hashes:
            if not self._satisfy_block_hash_tree(needed_hashes):
                # can't check block without block_hash_tree
                return False

        # data blocks
        return self._satisfy_data_block(segnum, observers)

    def _satisfy_offsets(self):
        version_s = self._received_data.get(0, 4)
        if version_s is None:
            return False
        (version,) = struct.unpack(">L", version_s)
        if version == 1:
            table_start = 0x0c
            self._fieldsize = 0x4
            self._fieldstruct = "L"
        else:
            table_start = 0x14
            self._fieldsize = 0x8
            self._fieldstruct = "Q"
        offset_table_size = 6 * self._fieldsize
        table_s = self._received_data.pop(table_start, offset_table_size)
        if table_s is None:
            return False
        fields = struct.unpack(">"+6*self._fieldstruct, table_s)
        offsets = {}
        for i,field in enumerate(['data',
                                  'plaintext_hash_tree', # UNUSED
                                  'crypttext_hash_tree',
                                  'block_hashes',
                                  'share_hashes',
                                  'uri_extension',
                                  ]):
            offsets[field] = fields[i]
        self.actual_offsets = offsets
        self._received_data.remove(0, 4) # don't need this anymore
        return True

    def _satisfy_UEB(self):
        o = self.actual_offsets
        fsize = self._fieldsize
        rdata = self._received_data
        UEB_length_s = rdata.get(o["uri_extension"], fsize)
        if not UEB_length_s:
            return False
        (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
        UEB_s = rdata.pop(o["uri_extension"]+fsize, UEB_length)
        if not UEB_s:
            return False
        rdata.remove(o["uri_extension"], fsize)
        try:
            self._node.validate_and_store_UEB(UEB_s)
            self.actual_segment_size = self._node.segment_size
            assert self.actual_segment_size is not None
            return True
        except BadHashError:
            # TODO: if this UEB was bad, we'll keep trying to validate it
            # over and over again. Only log.err on the first one, or better
            # yet skip all but the first
            f = Failure()
            self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
            return False
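    # Share layout, for reference (the fields parsed by _satisfy_offsets()
    # above; v2 differs only in field width):
    #   0x00: version number (4 bytes, big-endian)
    #   0x04: segment size, 0x08: data size (advisory)
    #   0x0c: offset table: six 4-byte offsets, in the same order as the
    #         field list above (v2: six 8-byte offsets starting at 0x14)
    # so a v1 offset table can be parsed in one call:
    #   fields = struct.unpack(">6L", share[0x0c:0x0c+24])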
    def _satisfy_share_hash_tree(self):
        # the share hash chain is stored as (hashnum,hash) tuples, so you
        # can't fetch just the pieces you need, because you don't know
        # exactly where they are. So fetch everything, and parse the
        # results later.
        o = self.actual_offsets
        rdata = self._received_data
        hashlen = o["uri_extension"] - o["share_hashes"]
        assert hashlen % (2+HASH_SIZE) == 0
        hashdata = rdata.get(o["share_hashes"], hashlen)
        if not hashdata:
            return False
        share_hashes = {}
        for i in range(0, hashlen, 2+HASH_SIZE):
            hashnum = struct.unpack(">H", hashdata[i:i+2])[0]
            hashvalue = hashdata[i+2:i+2+HASH_SIZE]
            share_hashes[hashnum] = hashvalue
        try:
            self._node.process_share_hashes(share_hashes)
            # adds to self._node.share_hash_tree
            rdata.remove(o["share_hashes"], hashlen)
            return True
        except (BadHashError, NotEnoughHashesError, IndexError):
            f = Failure()
            self._signal_corruption(f, o["share_hashes"], hashlen)
            return False

    def _signal_corruption(self, f, start, offset):
        # there was corruption somewhere in the given range
        print f # XXX
        pass

    def _satisfy_block_hash_tree(self, needed_hashes):
        o = self.actual_offsets
        rdata = self._received_data
        block_hashes = {}
        for hashnum in needed_hashes:
            hashdata = rdata.get(o["block_hashes"]+hashnum*HASH_SIZE,
                                 HASH_SIZE)
            if hashdata:
                block_hashes[hashnum] = hashdata
            else:
                return False # missing some hashes
        # note that we don't submit any hashes to the block_hash_tree until
        # we've gotten them all, because the hash tree will throw an
        # exception if we only give it a partial set (which it therefore
        # cannot validate)
        ok = self._commonshare.process_block_hashes(block_hashes,
                                                    self._peerid_s)
        if not ok:
            return False
        for hashnum in needed_hashes:
            rdata.remove(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
        return True

    def _satisfy_data_block(self, segnum, observers):
        tail = (segnum == self._node.num_segments-1)
        datastart = self.actual_offsets["data"]
        blockstart = datastart + segnum * self._node.block_size
        blocklen = self._node.block_size
        if tail:
            blocklen = self._node.tail_block_size

        block = self._received_data.pop(blockstart, blocklen)
        if not block:
            return False
        # this block is being retired, either as COMPLETE or CORRUPT, since
        # no further data reads will help
        assert self._requested_blocks[0][0] == segnum
        ok = self._commonshare.check_block(segnum, block)
        if ok:
            for o in observers:
                # goes to SegmentFetcher._block_request_activity
                o.notify(state=COMPLETE, block=block)
        else:
            for o in observers:
                o.notify(state=CORRUPT)
        self._requested_blocks.pop(0) # retired
        return True # got satisfaction
    def _desire(self):
        segnum, observers = self._active_segnum_and_observers()
        fsize = self._fieldsize
        rdata = self._received_data
        commonshare = self._commonshare

        if not self.actual_offsets:
            self._desire_offsets()

        # we can use guessed offsets as long as this server tolerates
        # overrun
        if not self.actual_offsets and not self._overrun_ok:
            return # must wait for the offsets to arrive

        o = self.actual_offsets or self.guessed_offsets
        segsize = self.actual_segment_size or self.guessed_segment_size
        if not self._node.have_UEB:
            self._desire_UEB(o)

        if self._node.share_hash_tree.needed_hashes(self._shnum):
            hashlen = o["uri_extension"] - o["share_hashes"]
            self._wanted.add(o["share_hashes"], hashlen)

        if segnum is None:
            return # only need block hashes or blocks for active segments

        # block hash chain
        for hashnum in commonshare.block_hash_tree.needed_hashes(segnum):
            self._wanted.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)

        # data
        r = self._node._calculate_sizes(segsize, size, k) # XXX size,k undefined here
        tail = (segnum == r["num_segments"]-1)
        datastart = o["data"]
        blockstart = datastart + segnum * r["block_size"]
        blocklen = r["block_size"]
        if tail:
            blocklen = r["tail_block_size"]
        self._wanted_blocks.add(blockstart, blocklen)

    def _desire_offsets(self):
        if self._overrun_ok:
            # easy! this includes version number, sizes, and offsets
            self._wanted.add(0, 1024)
            return

        # v1 has an offset table that lives [0x0,0x24). v2 lives
        # [0x0,0x44). To be conservative, only request the data that we
        # know lives there, even if that means more roundtrips.

        self._wanted.add(0, 4) # version number, always safe
        version_s = self._received_data.get(0, 4)
        if not version_s:
            return
        (version,) = struct.unpack(">L", version_s)
        if version == 1:
            table_start = 0x0c
            fieldsize = 0x4
        else:
            table_start = 0x14
            fieldsize = 0x8
        offset_table_size = 6 * fieldsize
        self._wanted.add(table_start, offset_table_size)

    def _desire_UEB(self, o):
        # UEB data is stored as (length,data).
        if self._overrun_ok:
            # We can pre-fetch 2kb, which should probably cover it. If it
            # turns out to be larger, we'll come back here later with a
            # known length and fetch the rest.
            self._wanted.add(o["uri_extension"], 2048)
            # now, while that is probably enough to fetch the whole UEB, it
            # might not be, so we need to do the next few steps as well. In
            # most cases, the following steps will not actually add
            # anything to self._wanted

        self._wanted.add(o["uri_extension"], self._fieldsize)
        # only use a length if we're sure it's correct, otherwise we'll
        # probably fetch a huge number
        if not self.actual_offsets:
            return
        UEB_length_s = self._received_data.get(o["uri_extension"],
                                               self._fieldsize)
        if UEB_length_s:
            (UEB_length,) = struct.unpack(">"+self._fieldstruct,
                                          UEB_length_s)
            # we know the length, so make sure we grab everything
            self._wanted.add(o["uri_extension"]+self._fieldsize, UEB_length)
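    # Example of the span arithmetic performed by _request_needed() below:
    # if we want [0,1024), have already received [0,32), and have [32,64)
    # in flight, then
    #   self._wanted - self._received - self._requested  ==  [64,1024)
    # so only that remainder gets requested.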
    def _request_needed(self):
        # send requests for metadata first, to avoid hanging on to large
        # data blocks any longer than necessary.
        self._send_requests(self._wanted - self._received - self._requested)
        # then send requests for data blocks. All the hashes should arrive
        # before the blocks, so the blocks can be consumed and released in
        # a single turn.
        ask = self._wanted_blocks - self._received - self._requested
        self._send_requests(ask)

    def _send_requests(self, needed):
        for (start, length) in needed:
            # TODO: quantize to reasonably-large blocks
            self._requested.add(start, length)
            d = self._send_request(start, length)
            d.addCallback(self._got_data, start, length)
            d.addErrback(self._got_error, start, length)
            d.addErrback(lambda f:
                         log.err(format="unhandled error during send_request",
                                 failure=f, parent=self._lp,
                                 level=log.WEIRD, umid="qZu0wg"))

    def _send_request(self, start, length):
        return self._rref.callRemote("read", start, length)

    def _got_data(self, data, start, length):
        span = (start, length)
        assert span in self._requested
        self._requested.remove(start, length)
        self._received.add(start, length)
        self._received_data.add(start, data)
        eventually(self.loop)

    def _got_error(self, f, start, length): # XXX
        log.msg(format="error requesting %(start)d+%(length)d"
                       " from %(server)s for si %(si)s",
                start=start, length=length,
                server=self._peerid_s, si=self._si_prefix,
                failure=f, parent=self._lp,
                level=log.UNUSUAL, umid="qZu0wg")
        # retire our observers, assuming we won't be able to make any
        # further progress
        self._fail(f)

    def _fail(self, f):
        for (segnum, observers) in self._requested_blocks:
            for o in observers:
                o.notify(state=DEAD, f=f)


class CommonShare:
    """I hold data that is common across all instances of a single share,
    like sh2 on both servers A and B. This is just the block hash tree.
    """
    def __init__(self, numsegs, si_s, shnum):
        self.si_s = si_s
        self.shnum = shnum
        self.block_hash_tree = None
        if numsegs is not None:
            self.block_hash_tree = IncompleteHashTree(numsegs)

    def got_numsegs(self, numsegs):
        self.block_hash_tree = IncompleteHashTree(numsegs)

    def process_block_hashes(self, block_hashes, serverid_s):
        try:
            self.block_hash_tree.set_hashes(block_hashes)
            return True
        except (BadHashError, NotEnoughHashesError):
            hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
            log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
                           " shnum=%(shnum)d SI=%(si)s server=%(server)s",
                    hashnums=hashnums, shnum=self.shnum,
                    si=self.si_s, server=serverid_s,
                    level=log.WEIRD, umid="yNyFdA")
            return False

    def check_block(self, segnum, block):
        h = hashutil.block_hash(block)
        try:
            self.block_hash_tree.set_hashes(leaves={segnum: h})
        except (BadHashError, NotEnoughHashesError), le:
            log.msg("hash failure in block %d, shnum=%d SI=%s: %s" %
                    (segnum, self.shnum, self.si_s, repr(le)),
                    level=log.WEIRD)
            return False
        return True

# all classes are also Services, and the rule is that you don't initiate
# more work unless self.running

# GC: decide whether each service is restartable or not. For
# non-restartable services, stopService() should delete a lot of
# attributes to kill reference cycles. The primary goal is to decref
# remote storage BucketReaders when a download is complete.
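# The Observer2 returned by Share.get_block() does not exist in
# util.observer yet. Here is a minimal stand-in, consistent with how it is
# used above (one subscriber, many events, cancellation); treat it as a
# sketch of the intended interface, not the final one:
class Observer2:
    """A simple cancellable one-subscriber multi-event observer."""
    def __init__(self):
        self._watcher = None
        self._undelivered = []
        self._canceler = None
    def set_canceler(self, c):
        # 'c' will be called with this observer when cancel() fires
        self._canceler = c
    def subscribe(self, observer, **watcher_kwargs):
        self._watcher = (observer, watcher_kwargs)
        while self._undelivered:
            self._deliver(self._undelivered.pop(0))
    def notify(self, **result_kwargs):
        if self._watcher:
            self._deliver(result_kwargs)
        else:
            self._undelivered.append(result_kwargs)
    def _deliver(self, result_kwargs):
        (observer, watcher_kwargs) = self._watcher
        kwargs = dict(result_kwargs)
        kwargs.update(watcher_kwargs)
        eventually(observer, **kwargs)
    def cancel(self):
        if self._canceler:
            self._canceler(self)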
class SegmentFetcher:
    """I am responsible for acquiring blocks for a single segment. I will
    use the Share instances passed to my add_shares() method to locate,
    retrieve, and validate those blocks. I expect my parent node to call my
    no_more_shares() method when there are no more shares available. I will
    call my parent's want_more_shares() method when I want more: I expect
    to see at least one call to add_shares or no_more_shares afterwards.

    When I have enough validated blocks, I will call my parent's
    process_blocks() method with a dictionary that maps shnum to blockdata.
    If I am unable to provide enough blocks, I will call my parent's
    fetch_failed() method with (self, f). After either of these events, I
    will shut down and do no further work. My parent can also call my
    stop() method to have me shut down early."""

    def __init__(self, node, segnum, k):
        self._node = node # _Node
        self.segnum = segnum
        self._k = k
        self._shares = {} # maps non-dead Share instance to a state, one of
                          # (AVAILABLE, PENDING, OVERDUE, COMPLETE,
                          # CORRUPT). State transition map is:
                          #  AVAILABLE -(send-read)-> PENDING
                          #  PENDING -(timer)-> OVERDUE
                          #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD,
                          #                  BADSEGNUM
                          #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD,
                          #                  BADSEGNUM
                          # If a share becomes DEAD, it is removed from the
                          # dict. If it becomes BADSEGNUM, the whole fetch
                          # is terminated.
        self._share_observers = {} # maps Share to Observer2 for active ones
        self._shnums = DictOfSets() # maps shnum to the shares that provide it
        self._blocks = {} # maps shnum to validated block data
        self._no_more_shares = False
        self._bad_segnum = False
        self._last_failure = None
        self._running = True

    def stop(self):
        self._cancel_all_requests()
        self._running = False
        del self._shares # let GC work # ???

    # called by our parent CiphertextFileNode

    def add_shares(self, shares):
        # called when ShareFinder locates a new share, and when a
        # non-initial segment fetch is started and we already know about
        # shares from the previous segment
        for s in shares:
            self._shares[s] = AVAILABLE
            self._shnums.add(s.shnum, s)
        eventually(self._loop)

    def no_more_shares(self):
        # ShareFinder tells us it's reached the end of its list
        self._no_more_shares = True
        eventually(self._loop)

    # internal methods

    def _count_shnums(self, *states):
        """shnums for which at least one share is in the given states"""
        shnums = []
        for shnum,shares in self._shnums.iteritems():
            matches = [s for s in shares if self._shares[s] in states]
            if matches:
                shnums.append(shnum)
        return len(shnums)

    def _loop(self):
        k = self._k
        if not self._running:
            return
        if self._bad_segnum:
            # oops, we were asking for a segment number beyond the end of
            # the file. This is an error.
            self.stop()
            e = BadSegmentNumberError("%d > %d" % (self.segnum,
                                                   self._node.num_segments))
            f = Failure(e)
            self._node.fetch_failed(self, f)
            return

        # are we done?
        if self._count_shnums(COMPLETE) >= k:
            # yay!
            self.stop()
            self._node.process_blocks(self.segnum, self._blocks)
            return

        # we may have exhausted everything
        if (self._no_more_shares and
            self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
            # no more new shares are coming, and the remaining hopeful
            # shares aren't going to be enough. boo!
            self.stop()
            format = ("ran out of shares: %(complete)d complete,"
                      " %(pending)d pending, %(overdue)d overdue,"
                      " %(unused)d unused, need %(k)d."
                      " Last failure: %(last_failure)s")
            args = {"complete": self._count_shnums(COMPLETE),
                    "pending": self._count_shnums(PENDING),
                    "overdue": self._count_shnums(OVERDUE),
                    "unused": self._count_shnums(AVAILABLE), # should be zero
                    "k": k,
                    "last_failure": self._last_failure,
                    }
            log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
            e = NotEnoughSharesError(format % args)
            f = Failure(e)
            self._node.fetch_failed(self, f)
            return

        # nope, not done. Are we "block-hungry" (i.e. do we want to send
        # out more read requests, or do we think we have enough in flight
        # already?)
        while self._count_shnums(PENDING, COMPLETE) < k:
            # we're hungry.. are there any unused shares?
            sent = self._send_new_request()
            if not sent:
                break

        # ok, now are we "share-hungry" (i.e. do we have enough known
        # shares to make us happy, or should we ask the ShareFinder to get
        # us more?)
        if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
            # we're hungry for more shares
            self._node.want_more_shares()
            # that will trigger the ShareFinder to keep looking

    def _find_one(self, shares, state):
        # TODO could choose fastest
        for s in shares:
            if self._shares[s] == state:
                return s
        raise IndexError("shouldn't get here")

    def _send_new_request(self):
        for shnum,shares in self._shnums.iteritems():
            states = [self._shares[s] for s in shares]
            if COMPLETE in states or PENDING in states:
                # don't send redundant requests
                continue
            if AVAILABLE not in states:
                # no candidates for this shnum, move on
                continue
            # here's a candidate. Send a request.
            s = self._find_one(shares, AVAILABLE)
            self._shares[s] = PENDING
            self._share_observers[s] = o = s.get_block(self.segnum)
            o.subscribe(self._block_request_activity, share=s, shnum=shnum)
            # TODO: build up a list of candidates, then walk through the
            # list, sending requests to the most desireable servers,
            # re-checking our block-hunger each time. For non-initial
            # segment fetches, this would let us stick with faster servers.
            return True
        # nothing was sent: don't call us again until you have more shares
        # to work with, or one of the existing shares has been declared
        # OVERDUE
        return False

    def _cancel_all_requests(self):
        for o in self._share_observers.values():
            o.cancel()
        self._share_observers = {}
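    # Worked example of the two hunger tests in _loop() above, with k=3:
    # suppose sh0 is COMPLETE, sh1 is PENDING, and sh2 is AVAILABLE.
    #  - block-hungry: _count_shnums(PENDING, COMPLETE) == 2 < 3, so we
    #    send a read for sh2 (AVAILABLE -> PENDING).
    #  - share-hungry: _count_shnums(AVAILABLE, PENDING, COMPLETE) == 3,
    #    which satisfies k, so we do not ask the ShareFinder for more.
    # If sh1's server then dies, both counts drop to 2: nothing is left
    # AVAILABLE to read, so we ask the ShareFinder for more shares.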
    def _block_request_activity(self, share, shnum, state, block=None, f=None):
        # called by Shares, in response to our s.get_block() calls.
        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
            del self._share_observers[share]
        if state is COMPLETE:
            # 'block' is fully validated
            self._shares[share] = COMPLETE
            self._blocks[shnum] = block
        elif state is OVERDUE:
            self._shares[share] = OVERDUE
            # OVERDUE is not terminal: it will eventually transition to
            # COMPLETE, CORRUPT, or DEAD.
        elif state is CORRUPT:
            self._shares[share] = CORRUPT
        elif state is DEAD:
            del self._shares[share]
            self._shnums.discard(shnum, share)
            self._last_failure = f
        elif state is BADSEGNUM:
            self._shares[share] = BADSEGNUM # ???
            self._bad_segnum = True
        eventually(self._loop)


class RequestToken:
    def __init__(self, peerid):
        self.peerid = peerid

class ShareFinder:
    def __init__(self, storage_broker, storage_index, share_consumer,
                 max_outstanding_requests=10):
        # XXX need self.node -> CiphertextFileNode
        self.running = True
        self._storage_index = storage_index
        s = storage_broker.get_servers_for_index(storage_index)
        self._servers = iter(s)
        self.share_consumer = share_consumer
        self.max_outstanding = max_outstanding_requests

        self._hungry = False

        self._commonshares = {} # shnum to CommonShare instance
        self.undelivered_shares = []
        self.pending_requests = set()

        self._si_s = base32.b2a(storage_index)
        self._si_prefix = base32.b2a_l(storage_index[:8], 60)
        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
                           si=self._si_prefix, level=log.NOISY,
                           umid="2xjj2A")

        self._num_segments = None
        d = share_consumer.get_num_segments()
        d.addCallback(self._got_numsegs)
        def _err_numsegs(f):
            log.err(format="Unable to get number of segments", failure=f,
                    parent=self._lp, level=log.UNUSUAL, umid="dh38Xw")
        d.addErrback(_err_numsegs)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._lp
        return log.msg(*args, **kwargs)

    def stop(self):
        self.running = False

    def _got_numsegs(self, numsegs):
        for cs in self._commonshares.values():
            cs.got_numsegs(numsegs)
        self._num_segments = numsegs

    # called by our parent CiphertextDownloader
    def hungry(self):
        log.msg(format="ShareFinder[si=%(si)s] hungry",
                si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
        self._hungry = True
        eventually(self.loop)
    # internal methods
    def loop(self):
        log.msg(format="ShareFinder[si=%(si)s] loop: running=%(running)s"
                       " hungry=%(hungry)s, undelivered=%(undelivered)s,"
                       " pending=%(pending)s",
                si=self._si_prefix, running=self.running,
                hungry=self._hungry,
                undelivered=",".join(["sh%d@%s" %
                                      (s._shnum,
                                       idlib.shortnodeid_b2a(s._peerid))
                                      for s in self.undelivered_shares]),
                pending=",".join([idlib.shortnodeid_b2a(rt.peerid)
                                  for rt in self.pending_requests]), # sort?
                level=log.NOISY, umid="kRtS4Q")
        if not self.running:
            return
        if not self._hungry:
            return
        if self.undelivered_shares:
            sh = self.undelivered_shares.pop(0)
            # they will call hungry() again if they want more
            self._hungry = False
            eventually(self.share_consumer.got_shares, [sh])
            return
        if len(self.pending_requests) >= self.max_outstanding:
            # cannot send more requests, must wait for some to retire
            return

        server = None
        try:
            if self._servers:
                server = self._servers.next()
        except StopIteration:
            self._servers = None

        if server:
            self.send_request(server)
            return

        if self.pending_requests:
            # no server, but there are still requests in flight: maybe one
            # of them will make progress
            return

        # we've run out of servers (so we can't send any more requests),
        # and we have nothing in flight. No further progress can be made.
        # They are destined to remain hungry.
        self.share_consumer.no_more_shares()
        self.stop()

    def send_request(self, server):
        peerid, rref = server
        req = RequestToken(peerid)
        self.pending_requests.add(req)
        lp = self.log(format="sending DYHB to [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, umid="Io7pyg")
        d = rref.callRemote("get_buckets", self._storage_index)
        d.addBoth(incidentally, self.pending_requests.discard, req)
        d.addCallbacks(self._got_response, self._got_error,
                       callbackArgs=(peerid, req, lp),
                       errbackArgs=(peerid, req, lp))
        d.addErrback(log.err, format="error in send_request",
                     level=log.WEIRD, parent=lp, umid="rpdV0w")
        d.addCallback(incidentally, eventually, self.loop)

    def _got_response(self, buckets, peerid, req, lp):
        if buckets:
            shnums_s = ",".join([str(shnum) for shnum in buckets])
            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
                     shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, parent=lp, umid="0fcEZw")
        else:
            self.log(format="no shares from [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, parent=lp, umid="U7d4JA")
        for shnum, bucket in buckets.iteritems():
            if shnum not in self._commonshares:
                self._commonshares[shnum] = CommonShare(self._num_segments,
                                                        self._si_s, shnum)
            cs = self._commonshares[shnum]
            s = Share(bucket, self.verifycap, cs, self.node,
                      peerid, self._si_s, shnum) # XXX verifycap/node unset
            self.undelivered_shares.append(s)

    def _got_error(self, f, peerid, req, lp):
        self.log(format="got error from [%(peerid)s]",
                 peerid=idlib.shortnodeid_b2a(peerid), failure=f,
                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")


class Segmentation:
    """I am responsible for a single offset+size read of the file. I handle
    segmentation: I figure out which segments are necessary, request them
    (from my CiphertextDownloader) in order, and trim the segments down to
    match the offset+size span. I use the Producer/Consumer interface to
    only request one segment at a time.
    """
    implements(IPushProducer)

    def __init__(self, node, offset, size, consumer):
        self._node = node
        self._hungry = True
        self._active_segnum = None
        self._cancel_segment_request = None
        # these are updated as we deliver data. At any given time, we still
        # want to download file[offset:offset+size]
        self._offset = offset
        self._size = size
        self._consumer = consumer

    def start(self):
        self._alive = True
        self._deferred = defer.Deferred()
        self._consumer.registerProducer(self, True) # XXX??? streaming=True
        self._maybe_fetch_next()
        return self._deferred

    def _maybe_fetch_next(self):
        if not self._alive or not self._hungry:
            return
        if self._active_segnum is not None:
            return
        self._fetch_next()
    def _fetch_next(self):
        if self._size == 0:
            # done!
            self._alive = False
            self._hungry = False
            self._consumer.unregisterProducer()
            self._deferred.callback(self._consumer)
            return
        n = self._node
        have_actual_segment_size = n.segment_size is not None
        segment_size = n.segment_size or n.guessed_segment_size
        if self._offset == 0:
            # great! we want segment0 for sure
            wanted_segnum = 0
        else:
            # this might be a guess
            wanted_segnum = self._offset // segment_size
        self._active_segnum = wanted_segnum
        d,c = n.get_segment(wanted_segnum)
        self._cancel_segment_request = c
        d.addBoth(self._request_retired)
        d.addCallback(self._got_segment, have_actual_segment_size)
        d.addErrback(self._retry_bad_segment, have_actual_segment_size)
        d.addErrback(self._error)

    def _request_retired(self, res):
        self._active_segnum = None
        self._cancel_segment_request = None
        return res

    def _got_segment(self, (segment_start,segment), had_actual_segment_size):
        self._active_segnum = None
        self._cancel_segment_request = None
        # we got file[segment_start:segment_start+len(segment)]
        # we want file[self._offset:self._offset+self._size]
        o = overlap(segment_start, len(segment), self._offset, self._size)
        # the overlap is file[o[0]:o[0]+o[1]]
        if not o or o[0] != self._offset:
            # we didn't get the first byte, so we can't use this segment
            if had_actual_segment_size:
                # and we should have gotten it right. This is a big
                # problem.
                raise SOMETHING # XXX
            # we've wasted some bandwidth, but now we can grab the right
            # one, because we should know the segsize by now.
            assert self._node.segment_size is not None
            self._maybe_fetch_next()
            return
        offset_in_segment = self._offset - segment_start
        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]

        self._offset += len(desired_data)
        self._size -= len(desired_data)
        self._consumer.write(desired_data)
        self._maybe_fetch_next()

    def _retry_bad_segment(self, f, had_actual_segment_size):
        f.trap(BadSegmentNumberError) # guessed way wrong, off the end
        if had_actual_segment_size:
            # but we should have known better, so this is a real error
            return f
        # we didn't know better: try again with more information
        return self._maybe_fetch_next()

    def _error(self, f):
        self._alive = False
        self._hungry = False
        self._consumer.unregisterProducer()
        self._deferred.errback(f)

    def stopProducing(self):
        self._hungry = False
        self._alive = False
        # cancel any outstanding segment request
        if self._cancel_segment_request:
            self._cancel_segment_request()
            self._cancel_segment_request = None
    def pauseProducing(self):
        self._hungry = False
    def resumeProducing(self):
        self._hungry = True
        eventually(self._maybe_fetch_next)

class Cancel:
    def __init__(self, f):
        self._f = f
        self.cancelled = False
    def cancel(self):
        if not self.cancelled:
            self.cancelled = True
            self._f(self)
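# Illustration (hypothetical caller, not part of the patch): how the
# (Deferred, Cancel) pair returned by get_segment() is meant to be used.
def _example_get_segment_usage(node, segnum):
    d, c = node.get_segment(segnum)
    def _got((offset, segment)):
        # 'offset' locates the segment within the file, so this works even
        # if segnum was guessed before the segment size was known
        return segment
    d.addCallback(_got)
    # a caller that loses interest calls c.cancel(), after which 'd' will
    # never fire
    return d, c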
class _Node:
    """Internal class which manages downloads and holds state. External
    callers use CiphertextFileNode instead."""

    # Share._node points to me
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, CHKFileVerifierURI)
        self._verifycap = verifycap
        self.running = True
        terminator.register(self) # calls self.stop() at stopService()
        # the rules are:
        # 1: Only send network requests if you're active (self.running is
        #    True)
        # 2: Use TimerService, not reactor.callLater
        # 3: You can do eventual-sends any time.
        # These rules should mean that once
        # stopService()+flushEventualQueue() fires, everything will be
        # done.
        self._secret_holder = secret_holder
        self._history = history

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
        self.share_hash_tree = IncompleteHashTree(N)

        # we guess the segment size, so Segmentation can pull non-initial
        # segments in a single roundtrip
        max_segment_size = 128*KiB # TODO: pull from elsewhere, maybe the
                                   # same place as upload.BaseUploadable
        s = mathutil.next_multiple(min(verifycap.size, max_segment_size), k)
        self.guessed_segment_size = s

        # filled in when we parse a valid UEB
        self.have_UEB = False
        self.segment_size = None
        self.tail_segment_size = None
        self.tail_segment_padded = None
        self.num_segments = None
        self.block_size = None
        self.tail_block_size = None
        self.ciphertext_hash_tree = None # size depends on num_segments
        self.ciphertext_hash = None # flat hash, optional

        # things to track callers that want data
        self._segsize_observers = OneShotObserverList()
        self._numsegs_observers = OneShotObserverList()
        # _segment_requests can have duplicates
        self._segment_requests = [] # (segnum, d, cancel_handle)
        self._active_segment = None # a SegmentFetcher, with .segnum

        storage_index = verifycap.storage_index
        self._sharefinder = ShareFinder(storage_broker, storage_index, self)
        self._shares = set()

    def stop(self):
        # called by the Terminator at shutdown, mostly for tests
        if self._active_segment:
            self._active_segment.stop()
            self._active_segment = None
        self._sharefinder.stop()

    # things called by outside callers, via CiphertextFileNode.
    # get_segment() may also be called by Segmentation.

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        # for concurrent operations: each gets its own Segmentation manager
        if size is None:
            size = self._verifycap.size - offset
        s = Segmentation(self, offset, size, consumer)
        # this raises an interesting question: what segments to fetch? if
        # offset=0, always fetch the first segment, and then allow
        # Segmentation to be responsible for pulling the subsequent ones
        # if the first wasn't large enough. If offset>0, we're going to
        # need an extra roundtrip to get the UEB (and therefore the
        # segment size) before we can figure out which segment to get.
        # TODO: allow the offset-table-guessing code (which starts by
        # guessing the segsize) to assist the offset>0 process.
        d = s.start()
        return d
    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        d = defer.Deferred()
        c = Cancel(self._cancel_request)
        self._segment_requests.append( (segnum, d, c) )
        self._start_new_segment()
        return (d, c)

    # things called by the Segmentation object used to transform
    # arbitrary-sized read() calls into quantized segment fetches

    def get_segment_size(self):
        """I return a Deferred that fires with the segment_size used by
        this file."""
        return self._segsize_observers.when_fired()

    def get_num_segments(self):
        """I return a Deferred that fires with the number of segments used
        by this file."""
        return self._numsegs_observers.when_fired()

    def _start_new_segment(self):
        if self._active_segment is None and self._segment_requests:
            segnum = self._segment_requests[0][0]
            k = self._verifycap.needed_shares
            self._active_segment = fetcher = SegmentFetcher(self, segnum, k)
            active_shares = [s for s in self._shares if s.not_dead()]
            # XXX Share.not_dead() is not implemented yet
            fetcher.add_shares(active_shares) # this triggers the loop

    # called by our child ShareFinder
    def got_shares(self, shares):
        self._shares.update(shares)
        if self._active_segment:
            self._active_segment.add_shares(shares)

    def no_more_shares(self):
        self._no_more_shares = True
        if self._active_segment:
            self._active_segment.no_more_shares()

    # things called by our Share instances

    def validate_and_store_UEB(self, UEB_s):
        h = hashutil.uri_extension_hash(UEB_s)
        if h != self._verifycap.uri_extension_hash:
            raise BadHashError
        UEB_dict = uri.unpack_extension(UEB_s)
        self._parse_and_store_UEB(UEB_dict) # sets self._stuff
        # TODO: a malformed (but authentic) UEB could throw an assertion
        # in _parse_and_store_UEB, and we should abandon the download.
        self.have_UEB = True
        self._segsize_observers.fire(self.segment_size)
        self._numsegs_observers.fire(self.num_segments)
    def _parse_and_store_UEB(self, d):
        # Note: the UEB contains needed_shares and total_shares. These are
        # redundant and inferior (the filecap contains the authoritative
        # values). However, because it is possible to encode the same file
        # in multiple ways, and the encoders might choose (poorly) to use
        # the same key for both (therefore getting the same SI), we might
        # encounter shares for both types. The UEB hashes will be
        # different, however, and we'll disregard the "other" encoding's
        # shares as corrupted.

        # therefore, we ignore d['total_shares'] and d['needed_shares'].

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares
        size = self._verifycap.size

        self.segment_size = d['segment_size']

        r = self._calculate_sizes(self.segment_size, size, k)
        self.tail_segment_size = r["tail_segment_size"]
        self.tail_segment_padded = r["tail_segment_padded"]
        self.num_segments = r["num_segments"]
        self.block_size = r["block_size"]
        self.tail_block_size = r["tail_block_size"]

        # zfec.Decode() instantiation is fast, but still, let's use the
        # same codec instance for all but the last segment. 3-of-10 takes
        # 15us on my laptop, 25-of-100 is 900us, 3-of-255 is 97us,
        # 25-of-255 is 2.5ms, worst-case 254-of-255 is 9.3ms
        self._codec = CRSDecoder()
        self._codec.set_params(self.segment_size, k, N)

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})

        self.share_hash_tree.set_hashes({0: d['share_root_hash']})

        # crypttext_hash is optional. We only pull this from the first UEB
        # that we see.
        if 'crypttext_hash' in d:
            if len(d['crypttext_hash']) == hashutil.CRYPTO_VAL_SIZE:
                self.ciphertext_hash = d['crypttext_hash']
            else:
                log.msg("ignoring bad-length UEB[crypttext_hash], "
                        "got %d bytes, want %d" % (len(d['crypttext_hash']),
                                                   hashutil.CRYPTO_VAL_SIZE),
                        umid="oZkGLA", level=log.WEIRD)

        # Our job is a fast download, not verification, so we ignore any
        # redundant fields. The Verifier uses a different code path which
        # does not ignore them.
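    # Worked example for _calculate_sizes() below, with made-up numbers:
    # size=1000, segment_size=300, k=3 gives
    #   num_segments        = div_ceil(1000, 300)   = 4
    #   tail_segment_size   = 1000 % 300            = 100
    #   tail_segment_padded = next_multiple(100, 3) = 102
    #   block_size          = 300 / 3               = 100
    #   tail_block_size     = 102 / 3               = 34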
    def _calculate_sizes(self, segment_size, size, k):
        # segments of ciphertext

        # this assert matches the one in encode.py:127 inside
        # Encoded._got_all_encoding_parameters, where the UEB is
        # constructed
        assert segment_size % k == 0

        # the last segment is usually short. We don't store a whole
        # segsize, but we do pad the segment up to a multiple of k,
        # because the encoder requires that.
        tail_segment_size = size % segment_size
        if tail_segment_size == 0:
            tail_segment_size = segment_size
        padded = mathutil.next_multiple(tail_segment_size, k)
        tail_segment_padded = padded

        num_segments = mathutil.div_ceil(size, segment_size)

        # each segment is turned into N blocks. All but the last are of
        # size block_size, and the last is of size tail_block_size
        block_size = segment_size / k
        tail_block_size = tail_segment_padded / k

        return { "tail_segment_size": tail_segment_size,
                 "tail_segment_padded": tail_segment_padded,
                 "num_segments": num_segments,
                 "block_size": block_size,
                 "tail_block_size": tail_block_size,
                 }

    def process_share_hashes(self, share_hashes):
        self.share_hash_tree.set_hashes(share_hashes)

    # called by our child SegmentFetcher

    def want_more_shares(self):
        self._sharefinder.hungry()

    def fetch_failed(self, sf, f):
        assert sf is self._active_segment
        sf.disownServiceParent()
        self._active_segment = None
        # deliver error upwards
        for (d,c) in self._extract_requests(sf.segnum):
            eventually(self._deliver_error, d, c, f)

    def _deliver_error(self, d, c, f):
        # this method exists to handle cancel() that occurs between
        # _got_segment and _deliver_error
        if not c.cancelled:
            d.errback(f)

    def process_blocks(self, segnum, blocks):
        tail = (segnum == self.num_segments-1)
        codec = self._codec
        if tail:
            # account for the padding in the last segment
            codec = CRSDecoder()
            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
            codec.set_params(self.tail_segment_padded, k, N)

        shares = []
        shareids = []
        for (shareid, share) in blocks.iteritems():
            shareids.append(shareid)
            shares.append(share)
        del blocks
        # NB: CRSDecoder.decode() actually returns a Deferred that fires
        # with the list of decoded blocks; this draft treats it as
        # synchronous for clarity
        segment = codec.decode(shares, shareids)
        del shares
        if tail:
            segment = segment[:self.tail_segment_size] # strip padding
        self._process_segment(segnum, segment)

    def _process_segment(self, segnum, segment):
        h = hashutil.crypttext_hash(segment)
        try:
            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
        except SOMETHING: # XXX
            SOMETHING
        assert self._active_segment.segnum == segnum
        assert self.segment_size is not None
        offset = segnum * self.segment_size
        for (d,c) in self._extract_requests(segnum):
            eventually(self._deliver, d, c, offset, segment)
        self._active_segment = None
        self._start_new_segment()

    def _deliver(self, d, c, offset, segment):
        # this method exists to handle cancel() that occurs between
        # _got_segment and _deliver
        if not c.cancelled:
            d.callback((offset,segment))

    def _extract_requests(self, segnum):
        """Remove matching requests and return their (d,c) tuples so that
        the caller can retire them."""
        retire = [(d,c) for (segnum0, d, c) in self._segment_requests
                  if segnum0 == segnum]
        self._segment_requests = [t for t in self._segment_requests
                                  if t[0] != segnum]
        return retire

    def _cancel_request(self, c):
        self._segment_requests = [t for t in self._segment_requests
                                  if t[2] != c]
        segnums = [segnum for (segnum,d,c) in self._segment_requests]
        if (self._active_segment and
            self._active_segment.segnum not in segnums):
            self._active_segment.stop()
            self._active_segment = None
            self._start_new_segment()
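# End-to-end usage sketch (hypothetical caller; where the broker,
# terminator, and history providers come from is illustration only):
#
#   node = CiphertextFileNode(verifycap, storage_broker, secret_holder,
#                             terminator, history)
#   d = node.read(consumer, offset=0, size=1000)
#   # consumer.write() is called with ciphertext as segments arrive;
#   # 'd' fires with the consumer when the whole range has been delivered.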
class CiphertextFileNode:
    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, CHKFileVerifierURI)
        self._node = _Node(verifycap, storage_broker, secret_holder,
                           terminator, history)

    def read(self, consumer, offset=0, size=None):
        """I am the main entry point, from which FileNode.read() can get
        data. I feed the consumer with the desired range of ciphertext. I
        return a Deferred that fires (with the consumer) when the read is
        finished."""
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading a segment. I return a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) when the desired segment is
        available, and c is an object on which c.cancel() can be called to
        disavow interest in the segment (after which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If you ask for a
        segment number which turns out to be too large, the Deferred will
        errback with BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so that you can call get_segment() before knowing the
        segment size, and still know which data you received.
        """
        return self._node.get_segment(segnum)


class DecryptingConsumer:
    """I sit between a CiphertextDownloader (which acts as a Producer) and
    the real Consumer, decrypting everything that passes by. The real
    Consumer sees the real Producer, but the Producer sees us instead of
    the real consumer."""
    implements(IConsumer)

    def __init__(self, consumer, readkey, offset):
        self._consumer = consumer
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument.
        offset_big = offset // 16
        offset_small = offset % 16
        iv = binascii.unhexlify("%032x" % offset_big)
        self._decryptor = AES(readkey, iv=iv)
        self._decryptor.process("\x00"*offset_small)

    def registerProducer(self, producer, streaming):
        # this passes through, so the real consumer can flow-control the
        # real producer. Therefore we don't need to provide any
        # IPushProducer methods. We implement all the IConsumer methods as
        # pass-throughs, and only intercept write() to perform decryption.
        self._consumer.registerProducer(producer, streaming)
    def unregisterProducer(self):
        self._consumer.unregisterProducer()
    def write(self, ciphertext):
        plaintext = self._decryptor.process(ciphertext)
        self._consumer.write(plaintext)


class ImmutableFileNode:
    # I wrap a CiphertextFileNode with a decryption key
    def __init__(self, filecap, storage_broker, secret_holder, downloader,
                 history):
        assert isinstance(filecap, CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, downloader, history)
        self.u = filecap
        # XXX self._readkey

    def read(self, consumer, offset=0, size=None):
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        return self._cnode.read(decryptor, offset, size)
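# Worked illustration of the CTR-mode seek performed in
# DecryptingConsumer.__init__ above (hypothetical helper, not used by the
# downloader): an AES-CTR keystream position is (counter block, byte
# within block). Setting iv=offset//16 jumps the counter to the right
# 16-byte block, and processing offset%16 zero bytes discards the partial
# block's keystream.
def _example_ctr_seek(readkey, offset):
    offset_big = offset // 16  # whole 16-byte counter blocks
    offset_small = offset % 16 # bytes into the current block
    iv = binascii.unhexlify("%032x" % offset_big)
    a = AES(readkey, iv=iv)
    a.process("\x00"*offset_small) # discard partial-block keystream
    # a.process(ciphertext[offset:]) now yields plaintext[offset:]
    return a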
# TODO: if server1 has all shares, and server2-10 have one each, make the
# loop stall slightly before requesting all shares from the first server,
# to give it a chance to learn about the other shares and get some
# diversity. Or, don't bother, let the first block all come from one
# server, and take comfort in the fact that we'll learn about the other
# servers by the time we fetch the second block.
#
# davidsarah points out that we could use sequential (instead of parallel)
# fetching of multiple blocks from a single server: by the time the first
# block arrives, we'll hopefully have heard about other shares. This would
# induce some RTT delays (i.e. lose pipelining) in the case that this
# server has the only shares, but that seems tolerable. We could rig it to
# only use sequential requests on the first segment.

# as a query gets later, we're more willing to duplicate work.

# should change the server read protocol to allow small shares to be
# fetched in a single RTT. Instead of get_buckets-then-read, just use
# read(shnums, readv), where shnums=[] means all shares, and the return
# value is a dict of shnum->data (like with mutable files). The DYHB query
# should also fetch the offset table, since everything else can be located
# once we have that.


# object hierarchy:
#  ImmutableFileNode
#   DecryptingConsumer
#  CiphertextFileNode
#   Segmentation
#   ShareFinder
#   SegmentFetcher[segnum] (one at a time)
#    CommonShare[shnum]
#     Share[shnum,server]

# TODO: when we learn numsegs, any get_segment() calls for bad segment
# numbers should be failed with BadSegmentNumberError. But should this be
# the responsibility of CiphertextFileNode, or SegmentFetcher? The
# knowledge will first appear when a Share receives a valid UEB and calls
# CiphertextFileNode.validate_UEB, then _parse_UEB. The SegmentFetcher is
# expecting to hear from the Share, via the _block_request_activity
# observer.
#
# make it the responsibility of the SegmentFetcher. Each Share that gets a
# valid UEB will tell the SegmentFetcher BADSEGNUM (instead of COMPLETE or
# CORRUPT). The SegmentFetcher is then responsible for shutting down, and
# informing its parent (the CiphertextFileNode) of the
# BadSegmentNumberError, which is then passed to the client of
# get_segment().


# TODO: if the offset table is corrupt, an attacker could cause us to
# fetch the whole (large) share
new file src/allmydata/immutable/download2_off.py
diff --git a/src/allmydata/immutable/download2_off.py b/src/allmydata/immutable/download2_off.py
new file mode 100755
index 0000000..d2b8b99
#! /usr/bin/python

# known (shnum,Server) pairs are sorted into a list according to
# desireability. This sort is picking a winding path through a matrix of
# [shnum][server]. The goal is to get diversity of both shnum and server.

# The initial order is:
#  find the lowest shnum on the first server, add it
#  look at the next server, find the lowest shnum that we don't already
#   have, if any
#  next server, etc, until all known servers are checked
#  now look at servers that we skipped (because ...

# Keep track of which block requests are outstanding by (shnum,Server).
# Don't bother prioritizing "validated" shares: the overhead to pull the
# share hash chain is tiny (4 hashes = 128 bytes), and the overhead to
# pull a new block hash chain is also tiny (1GB file, 8192 segments of
# 128KiB each, 13 hashes, 832 bytes). Each time a block request is sent,
# also request any necessary hashes. Don't bother with a "ValidatedShare"
# class (as distinct from some other sort of Share). Don't bother avoiding
# duplicate hash-chain requests.

# For each outstanding segread, walk the list and send requests (skipping
# outstanding shnums) until requests for k distinct shnums are in flight.
# If we can't do that, ask for more. If we get impatient on a request,
# find the first non-outstanding

# start with the first Share in the list, and send a request. Then look at
# the next one. If we already have a pending request for the same shnum or
# server, push that Share down onto the fallback list and try the next
# one, etc. If we run out of non-fallback shares, use the fallback ones,
# preferring shnums that we don't have outstanding requests for (i.e.
# assume that all requests will complete). Do this by having a second
# fallback list.

# hell, I'm reviving the Herder. But remember, we're still talking 3
# objects per file, not thousands.

# actually, don't bother sorting the initial list. Append Shares as the
# responses come back, that will put the fastest servers at the front of
# the list, and give a tiny preference to servers that are earlier in the
# permuted order.

# more ideas:
#  sort shares by:
#   1: number of roundtrips needed to get some data
#   2: share number
#   3: ms of RTT delay
# maybe measure average time-to-completion of requests, compare completion
# time against that, much larger indicates congestion on the server side
# or the server's upstream speed is less than our downstream. Minimum
# time-to-completion indicates min(our-downstream,their-upstream). Could
# fetch shares one-at-a-time to measure that better.

# when should we risk duplicate work and send a new request?

# (pseudocode sketch: 'outstanding' and friends are fantasy containers
# with .shnums/.servers/.nonlate/.late attributes)
def walk(self):
    shares = sorted(list)
    oldshares = copy(shares)
    outstanding = list()
    fallbacks = list()
    second_fallbacks = list()
    while len(outstanding.nonlate.shnums) < k: # need more requests
        while oldshares:
            s = shares.pop(0)
            if s.server in outstanding.servers or s.shnum in outstanding.shnums:
                fallbacks.append(s)
                continue
            outstanding.append(s)
            send_request(s)
            break #'while need_more_requests'
        # must use fallback list. Ask for more servers while we're at it.
71 ask_for_more_servers() 72 while fallbacks: 73 s = fallbacks.pop(0) 74 if s.shnum in outstanding.shnums: 75 # assume that the outstanding requests will complete, but 76 # send new requests for other shnums to existing servers 77 second_fallbacks.append(s) 78 continue 79 outstanding.append(s) 80 send_request(s) 81 break #'while need_more_requests' 82 # if we get here, we're being forced to send out multiple queries per 83 # share. We've already asked for more servers, which might help. If 84 # there are no late outstanding queries, then duplicate shares won't 85 # help. Don't send queries for duplicate shares until some of the 86 # queries are late. 87 if outstanding.late: 88 # we're allowed to try any non-outstanding share 89 while second_fallbacks: 90 pass 91 newshares = outstanding + fallbacks + second_fallbacks + oldshares 92 93 94 class Server: 95 """I represent an abstract Storage Server. One day, the StorageBroker 96 will return instances of me. For now, the StorageBroker returns (peerid, 97 RemoteReference) tuples, and this code wraps a Server instance around 98 them. 99 """ 100 def __init__(self, peerid, ss): 101 self.peerid = peerid 102 self.remote = ss 103 self._remote_buckets = {} # maps shnum to RIBucketReader 104 # TODO: release the bucket references on shares that we no longer 105 # want. OTOH, why would we not want them? Corruption? 106 107 def send_query(self, storage_index): 108 """I return a Deferred that fires with a set of shnums. If the server 109 had shares available, I will retain the RemoteReferences to its 110 buckets, so that get_data(shnum, range) can be called later.""" 111 d = self.remote.callRemote("get_buckets", storage_index) 112 d.addCallback(self._got_response) 113 return d 114 115 def _got_response(self, r): 116 self._remote_buckets = r 117 return set(r.keys()) 118 119 class ShareOnAServer: 120 """I represent one instance of a share, known to live on a specific 121 server. I am created every time a server responds affirmatively to a 122 do-you-have-block query.""" 123 124 def __init__(self, shnum, server): 125 self._shnum = shnum 126 self._server = server 127 self._block_hash_tree = None 128 129 def cost(self, segnum): 130 """I return a tuple of (roundtrips, bytes, rtt), indicating how 131 expensive I think it would be to fetch the given segment. Roundtrips 132 indicates how many roundtrips it is likely to take (one to get the 133 data and hashes, plus one to get the offset table and UEB if this is 134 the first segment we've ever fetched). 'bytes' is how many bytes we 135 must fetch (estimated). 'rtt' is estimated round-trip time (float) in 136 seconds for a trivial request. The downloading algorithm will compare 137 costs to decide which shares should be used.""" 138 # the most significant factor here is roundtrips: a Share for which 139 # we already have the offset table is better than a brand new one 140 141 def max_bandwidth(self): 142 """Return a float, indicating the highest plausible bytes-per-second 143 that I've observed coming from this share. This will be based upon 144 the maximum (bytes-per-fetch / time-per-fetch) ever observed. This 145 can be used to estimate the server's upstream bandwidth.
Clearly this 146 is only accurate if a share is retrieved with no contention for 147 either the upstream, downstream, or middle of the connection, but it 148 may still serve as a useful metric for deciding which servers to pull 149 from.""" 150 151 def get_segment(self, segnum): 152 """I return a Deferred that will fire with the segment data, or 153 errback.""" 154 155 class NativeShareOnAServer(ShareOnAServer): 156 """For tahoe native (foolscap) servers, I contain a RemoteReference to 157 the RIBucketReader instance.""" 158 def __init__(self, shnum, server, rref): 159 ShareOnAServer.__init__(self, shnum, server) 160 self._rref = rref # RIBucketReader 161 162 class Share: 163 def __init__(self, shnum): 164 self._shnum = shnum 165 # _servers is the set of Server instances which appear to hold a copy of 166 # this share. It is populated when the ValidShare is first created, 167 # or when we receive a get_buckets() response for a shnum that 168 # already has a ValidShare instance. When we lose the connection to a 169 # server, we remove it. 170 self._servers = set() 171 # offsets, UEB, and share_hash_tree all live in the parent. 172 # block_hash_tree lives here. 173 self._block_hash_tree = None 174 175 176 177 def get_servers(self): 178 return self._servers 179 180 181 def get_block(self, segnum): 182 # read enough data to obtain a single validated block 183 if not self.have_offsets: 184 # we get the offsets in their own read, since they tell us where 185 # everything else lives. We must fetch offsets for each share 186 # separately, since they aren't directly covered by the UEB. 187 pass 188 if not self.parent.have_ueb: 189 # use _guessed_segsize to make a guess about the layout, so we 190 # can fetch both the offset table and the UEB in the same read. 191 # This also requires making a guess about the presence or absence 192 # of the plaintext_hash_tree. Oh, and also the version number. Oh 193 # well. 194 pass 195 196 class CiphertextDownloader: 197 """I manage all downloads for a single file. I operate a state machine 198 with input events that are local read() requests, responses to my remote 199 'get_buckets' and 'read_bucket' messages, and connection establishment and 200 loss. My outbound events are connection establishment requests and bucket 201 read-request messages.
202 """ 203 # eventually this will merge into the FileNode 204 ServerClass = Server # for tests to override 205 206 def __init__(self, storage_index, ueb_hash, size, k, N, storage_broker, 207 shutdowner): 208 # values we get from the filecap 209 self._storage_index = si = storage_index 210 self._ueb_hash = ueb_hash 211 self._size = size 212 self._needed_shares = k 213 self._total_shares = N 214 self._share_hash_tree = IncompleteHashTree(self._total_shares) 215 # values we discover when we first fetch the UEB 216 self._ueb = None # is dict after UEB fetch+validate 217 self._segsize = None 218 self._numsegs = None 219 self._blocksize = None 220 self._tail_segsize = None 221 self._ciphertext_hash = None # optional 222 # structures we create when we fetch the UEB, then continue to fill 223 # as we download the file 224 self._share_hash_tree = None # is IncompleteHashTree after UEB fetch 225 self._ciphertext_hash_tree = None 226 227 # values we learn as we download the file 228 self._offsets = {} # (shnum,Server) to offset table (dict) 229 self._block_hash_tree = {} # shnum to IncompleteHashTree 230 # other things which help us 231 self._guessed_segsize = min(128*1024, size) 232 self._active_share_readers = {} # maps shnum to Reader instance 233 self._share_readers = [] # sorted by preference, best first 234 self._readers = set() # set of Reader instances 235 self._recent_horizon = 10 # seconds 236 237 # 'shutdowner' is a MultiService parent used to cancel all downloads 238 # when the node is shutting down, to let tests have a clean reactor. 239 240 self._init_available_servers() 241 self._init_find_enough_shares() 242 243 # _available_servers is an iterator that provides us with Server 244 # instances. Each time we pull out a Server, we immediately send it a 245 # query, so we don't need to keep track of who we've sent queries to. 246 247 def _init_available_servers(self): 248 self._available_servers = self._get_available_servers() 249 self._no_more_available_servers = False 250 251 def _get_available_servers(self): 252 """I am a generator of servers to use, sorted by the order in which 253 we should query them. I make sure there are no duplicates in this 254 list.""" 255 # TODO: make StorageBroker responsible for this non-duplication, and 256 # replace this method with a simple iter(get_servers_for_index()), 257 # plus a self._no_more_available_servers=True 258 seen = set() 259 sb = self._storage_broker 260 for (peerid, ss) in sb.get_servers_for_index(self._storage_index): 261 if peerid not in seen: 262 yield self.ServerClass(peerid, ss) # Server(peerid, ss) 263 seen.add(peerid) 264 self._no_more_available_servers = True 265 266 # this block of code is responsible for having enough non-problematic 267 # distinct shares/servers available and ready for download, and for 268 # limiting the number of queries that are outstanding. The idea is that 269 # we'll use the k fastest/best shares, and have the other ones in reserve 270 # in case those servers stop responding or respond too slowly. We keep 271 # track of all known shares, but we also keep track of problematic shares 272 # (ones with hash failures or lost connections), so we can put them at 273 # the bottom of the list. 274 275 def _init_find_enough_shares(self): 276 # _unvalidated_sharemap maps shnum to set of Servers, and remembers 277 # where viable (but not yet validated) shares are located. Each 278 # get_bucket() response adds to this map, each act of validation 279 # removes from it. 
280 self._unvalidated_sharemap = DictOfSets() 281 282 # _sharemap maps shnum to set of Servers, and remembers where viable 283 # shares are located. Each get_bucket() response adds to this map, 284 # each hash failure or disconnect removes from it. (TODO: if we 285 # disconnect but reconnect later, we should be allowed to re-query). 286 self._sharemap = DictOfSets() 287 288 # _problem_shares is a set of (shnum, Server) tuples, and 289 290 # _queries_in_flight maps a Server to a timestamp, which remembers 291 # which servers we've sent queries to (and when) but have not yet 292 # heard a response. This lets us put a limit on the number of 293 # outstanding queries, to limit the size of the work window (how much 294 # extra work we ask servers to do in the hopes of keeping our own 295 # pipeline filled). We remove a Server from _queries_in_flight when 296 # we get an answer/error or we finally give up. If we ever switch to 297 # a non-connection-oriented protocol (like UDP, or forwarded Chord 298 # queries), we can use this information to retransmit any query that 299 # has gone unanswered for too long. 300 self._queries_in_flight = dict() 301 302 def _count_recent_queries_in_flight(self): 303 now = time.time() 304 recent = now - self._recent_horizon 305 return len([s for (s,when) in self._queries_in_flight.items() 306 if when > recent]) 307 308 def _find_enough_shares(self): 309 # goal: have 2*k distinct not-invalid shares available for reading, 310 # from 2*k distinct servers. Do not have more than 4*k "recent" 311 # queries in flight at a time. 312 if (len(self._sharemap) >= 2*self._needed_shares 313 and len(self._sharemap.values()) >= 2*self._needed_shares): 314 return 315 num = self._count_recent_queries_in_flight() 316 while num < 4*self._needed_shares: 317 try: 318 s = self._available_servers.next() 319 except StopIteration: 320 return # no more progress can be made 321 self._queries_in_flight[s] = time.time() 322 d = s.send_query(self._storage_index) 323 d.addBoth(incidentally, self._queries_in_flight.pop, s, None) 324 d.addCallbacks(lambda shnums, s=s: [self._sharemap.add(shnum, s) 325 for shnum in shnums], 326 lambda f, s=s: self._query_error(f, s)) 327 d.addErrback(self._error) 328 d.addCallback(self._reschedule) 329 num += 1 330 331 def _query_error(self, f, s): 332 # a server returned an error, log it gently and ignore 333 level = log.WEIRD 334 if f.check(DeadReferenceError): 335 level = log.UNUSUAL 336 log.msg(format="Error during get_buckets to server=%(server)s", server=str(s), 337 failure=f, level=level, umid="3uuBUQ") 338 339 # this block is responsible for turning known shares into usable shares, 340 # by fetching enough data to validate their contents. 341 342 # UEB (from any share) 343 # share hash chain, validated (from any share, for given shnum) 344 # block hash (any share, given shnum) 345 346 def _got_ueb(self, ueb_data, share): 347 if self._ueb is not None: 348 return 349 if hashutil.uri_extension_hash(ueb_data) != self._ueb_hash: 350 share.error("UEB hash does not match") 351 return 352 d = uri.unpack_extension(ueb_data) 353 self.share_size = mathutil.div_ceil(self._size, self._needed_shares) 354 355 356 # There are several kinds of things that can be found in a UEB. 357 # First, things that we really need to learn from the UEB in order to 358 # do this download. Next: things which are optional but not redundant 359 # -- if they are present in the UEB they will get used. Next, things 360 # that are optional and redundant.
These things are required to be 361 # consistent: they don't have to be in the UEB, but if they are in 362 # the UEB then they will be checked for consistency with the 363 # already-known facts, and if they are inconsistent then an exception 364 # will be raised. These things aren't actually used -- they are just 365 # tested for consistency and ignored. Finally: things which are 366 # deprecated -- they ought not be in the UEB at all, and if they are 367 # present then a warning will be logged but they are otherwise 368 # ignored. 369 370 # First, things that we really need to learn from the UEB: 371 # segment_size, crypttext_root_hash, and share_root_hash. 372 self._segsize = d['segment_size'] 373 374 self._blocksize = mathutil.div_ceil(self._segsize, self._needed_shares) 375 self._numsegs = mathutil.div_ceil(self._size, self._segsize) 376 377 self._tail_segsize = self._size % self._segsize 378 if self._tail_segsize == 0: 379 self._tail_segsize = self._segsize 380 # padding for erasure code 381 self._tail_segsize = mathutil.next_multiple(self._tail_segsize, 382 self._needed_shares) 383 384 # Ciphertext hash tree root is mandatory, so that there is at most 385 # one ciphertext that matches this read-cap or verify-cap. The 386 # integrity check on the shares is not sufficient to prevent the 387 # original encoder from creating some shares of file A and other 388 # shares of file B. 389 self._ciphertext_hash_tree = IncompleteHashTree(self._numsegs) 390 self._ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']}) 391 392 self._share_hash_tree.set_hashes({0: d['share_root_hash']}) 393 394 395 # Next: things that are optional and not redundant: crypttext_hash 396 if 'crypttext_hash' in d: 397 if len(d['crypttext_hash']) == hashutil.CRYPTO_VAL_SIZE: 398 self._ciphertext_hash = d['crypttext_hash'] 399 else: 400 log.msg("ignoring bad-length UEB[crypttext_hash], " 401 "got %d bytes, want %d" % (len(d['crypttext_hash']), 402 hashutil.CRYPTO_VAL_SIZE), 403 umid="oZkGLA", level=log.WEIRD) 404 405 # we ignore all of the redundant fields when downloading. The 406 # Verifier uses a different code path which does not ignore them. 407 408 # finally, set self._ueb as a marker that we don't need to request it 409 # anymore 410 self._ueb = d 411 412 def _got_share_hashes(self, hashes, share): 413 assert isinstance(hashes, dict) 414 try: 415 self._share_hash_tree.set_hashes(hashes) 416 except (IndexError, BadHashError, NotEnoughHashesError), le: 417 share.error("Bad or missing hashes") 418 return 419 420 #def _got_block_hashes( 421 422 def _init_validate_enough_shares(self): 423 # _valid_shares maps shnum to ValidatedShare instances, and is 424 # populated once the block hash root has been fetched and validated 425 # (which requires any valid copy of the UEB, and a valid copy of the 426 # share hash chain for each shnum) 427 self._valid_shares = {} 428 429 # _target_shares is an ordered list of ReadyShare instances, each of 430 # which is a (shnum, server) tuple. It is sorted in order of 431 # preference: we expect to get the fastest response from the 432 # ReadyShares at the front of the list. It is also sorted to 433 # distribute the shnums, so that fetching shares from 434 # _target_shares[:k] is likely (but not guaranteed) to give us k 435 # distinct shares.
The rule is that we skip over entries for blocks 436 # that we've already received, limit the number of recent queries for 437 # the same block, 438 self._target_shares = [] 439 440 def _validate_enough_shares(self): 441 # my goal is to have at least 2*k distinct validated shares from at 442 # least 2*k distinct servers 443 valid_share_servers = set() 444 for vs in self._valid_shares.values(): 445 valid_share_servers.update(vs.get_servers()) 446 if (len(self._valid_shares) >= 2*self._needed_shares 447 and len(valid_share_servers) >= 2*self._needed_shares): 448 return 449 #for 450 451 def _reschedule(self, _ign): 452 # fire the loop again 453 if not self._scheduled: 454 self._scheduled = True 455 eventually(self._loop) 456 457 def _loop(self): 458 self._scheduled = False 459 # what do we need? 460 461 self._find_enough_shares() 462 self._validate_enough_shares() 463 464 if not self._ueb: 465 # we always need a copy of the UEB 466 pass 467 468 def _error(self, f): 469 # this is an unexpected error: a coding bug 470 log.err(f, level=log.UNUSUAL) 471 472 473 474 # using a single packed string (and an offset table) may be an artifact of 475 # our native storage server: other backends might allow cheap multi-part 476 # files (think S3, several buckets per share, one for each section). 477 478 # find new names for: 479 # data_holder 480 # Share / Share2 (ShareInstance / Share? but the first is more useful) 481 482 class IShare(Interface): 483 """I represent a single instance of a single share (e.g. I reference the 484 shnum2 for share SI=abcde on server xy12t, not the one on server ab45q). 485 This interface is used by SegmentFetcher to retrieve validated blocks. 486 """ 487 def get_block(segnum): 488 """Return an Observer2, which will be notified with the following 489 events: 490 state=COMPLETE, block=data (terminal): validated block data 491 state=OVERDUE (non-terminal): we have reason to believe that the 492 request might have stalled, or we 493 might just be impatient 494 state=CORRUPT (terminal): the data we received was corrupt 495 state=DEAD (terminal): the connection has failed 496 """ 497 498 499 # it'd be nice if we received the hashes before the block, or just 500 # afterwards, so we aren't stuck holding on to unvalidated blocks 501 # that we can't process. If we guess the offsets right, we can 502 # accomplish this by sending the block request after the metadata 503 # requests (by keeping two separate requestlists), and have a one-RTT 504 # pipeline like: 505 # 1a=metadata, 1b=block 506 # 1b->process+deliver : one RTT 507 508 # But if we guess wrong, and fetch the wrong part of the block, we'll 509 # have a pipeline that looks like: 510 # 1a=wrong metadata, 1b=wrong block 511 # 1a->2a=right metadata,2b=right block 512 # 2b->process+deliver 513 # which means two RTT and buffering one block (which, since we'll 514 # guess the segsize wrong for everything, means buffering one 515 # segment) 516 517 # if we start asking for multiple segments, we could get something 518 # worse: 519 # 1a=wrong metadata, 1b=wrong block0, 1c=wrong block1, .. 520 # 1a->2a=right metadata,2b=right block0,2c=right block1, .. 521 # 2b->process+deliver 522 523 # which means two RTT but fetching and buffering the whole file 524 # before delivering anything. However, since we don't know when the 525 # other shares are going to arrive, we need to avoid having more than 526 # one block in the pipeline anyways. So we shouldn't be able to get 527 # into this state.
528 529 # it also means that, instead of handling all of 530 # self._requested_blocks at once, we should only be handling one 531 # block at a time: one of the requested blocks should be special 532 # (probably FIFO). But retire all we can. 533 534 # this might be better with a Deferred, using COMPLETE as the success 535 # case and CORRUPT/DEAD in an errback, because that would let us hold the 536 # 'share' and 'shnum' arguments locally (instead of roundtripping them 537 # through Share.send_request). But OVERDUE is not terminal. So I 538 # want a new sort of callback mechanism, with the extra-argument-passing 539 # aspects of Deferred, but without being so one-shot. Is this a job for 540 # Observer? No, it doesn't take extra arguments. So this uses Observer2. 541 542 543 class Reader: 544 """I am responsible for a single offset+size read of the file. I handle 545 segmentation: I figure out which segments are necessary, request them 546 (from my CiphertextDownloader) in order, and trim the segments down to 547 match the offset+size span. I use the Producer/Consumer interface to only 548 request one segment at a time. 549 """ 550 implements(IPushProducer) 551 def __init__(self, consumer, offset, size): 552 self._needed = [] 553 self._consumer = consumer 554 self._hungry = False 555 self._offset = offset 556 self._size = size 557 self._segsize = None 558 def start(self): 559 self._alive = True 560 self._deferred = defer.Deferred() 561 # the process doesn't actually start until set_segment_size() 562 return self._deferred 563 564 def set_segment_size(self, segsize): 565 if self._segsize is not None: 566 return 567 self._segsize = segsize 568 self._needed = list(self._compute_segnums()) 569 570 def _compute_segnums(self): 571 # now that we know the file's segsize, what segments (and which 572 # ranges of each) will we need?
573 size = self._size 574 offset = self._offset 575 while size: 576 assert size >= 0 577 this_seg_num = int(offset / self._segsize) 578 this_seg_offset = offset - (this_seg_num*self._segsize) 579 this_seg_size = min(size, self._segsize-this_seg_offset) 580 size -= this_seg_size 581 if size: 582 offset += this_seg_size 583 yield (this_seg_num, this_seg_offset, this_seg_size) 584 585 def get_needed_segments(self): 586 return set([segnum for (segnum, off, size) in self._needed]) 587 588 589 def stopProducing(self): 590 self._hungry = False 591 self._alive = False 592 # TODO: cancel the segment requests 593 def pauseProducing(self): 594 self._hungry = False 595 def resumeProducing(self): 596 self._hungry = True 597 def add_segment(self, segnum, offset, size): 598 self._needed.append( (segnum, offset, size) ) 599 def got_segment(self, segnum, segdata): 600 """Return True if this schedule has more to go, or False if it is 601 done.""" 602 assert self._needed[0][0] == segnum 603 (_ign, offset, size) = self._needed.pop(0) 604 data = segdata[offset:offset+size] 605 self._consumer.write(data) 606 if not self._needed: 607 # we're done 608 self._alive = False 609 self._hungry = False 610 self._consumer.unregisterProducer() 611 self._deferred.callback(self._consumer) 612 def error(self, f): 613 self._alive = False 614 self._hungry = False 615 self._consumer.unregisterProducer() 616 self._deferred.errback(f) 617 618 619 620 class x: 621 def OFFread(self, consumer, offset=0, size=None): 622 """I am the main entry point, from which FileNode.read() can get 623 data.""" 624 # tolerate concurrent operations: each gets its own Reader 625 if size is None: 626 size = self._size - offset 627 r = Reader(consumer, offset, size) 628 self._readers.add(r) 629 d = r.start() 630 if self.segment_size is not None: 631 r.set_segment_size(self.segment_size) 632 # TODO: if we can't find any segments, and thus never get a 633 # segsize, tell the Readers to give up 634 return d -
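Reader._compute_segnums maps a byte range onto per-segment reads. Here is the same arithmetic as a self-contained generator with a worked example; the name segment_ranges is mine, not the patch's:

    def segment_ranges(offset, size, segsize):
        # yield (segnum, offset-within-segment, length) tuples that cover
        # the byte range [offset, offset+size), segment by segment
        while size > 0:
            segnum = offset // segsize
            seg_offset = offset - segnum * segsize
            this_size = min(size, segsize - seg_offset)
            yield (segnum, seg_offset, this_size)
            offset += this_size
            size -= this_size

    # a 300-byte read starting at byte 200, with 128-byte segments, touches
    # the tail of segment 1, all of segment 2, and the head of segment 3:
    assert list(segment_ranges(200, 300, 128)) == [(1, 72, 56),
                                                   (2, 0, 128),
                                                   (3, 0, 116)]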
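Likewise, the size arithmetic in _got_ueb (further up in this file) is easier to check with concrete numbers. A small worked example, with div_ceil and next_multiple re-derived locally so the snippet stands alone; the file size and segment size are arbitrary, not from the patch:

    def div_ceil(n, d):
        # mirrors allmydata.util.mathutil.div_ceil
        return (n + d - 1) // d

    def next_multiple(n, k):
        # mirrors allmydata.util.mathutil.next_multiple
        return div_ceil(n, k) * k

    size = 1000000    # hypothetical one-million-byte file
    k = 3             # needed_shares
    segsize = 131072  # from UEB['segment_size']

    blocksize = div_ceil(segsize, k)    # bytes per block of each segment
    numsegs = div_ceil(size, segsize)   # number of segments
    tail = size % segsize or segsize    # real data in the last segment
    padded = next_multiple(tail, k)     # padded so the erasure code divides evenly

    assert (blocksize, numsegs, tail, padded) == (43691, 8, 82496, 82497)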
new file src/allmydata/immutable/download2_util.py
diff --git a/src/allmydata/immutable/download2_util.py b/src/allmydata/immutable/download2_util.py new file mode 100755 index 0000000..32a0ad4
- + 1 from foolscap.api import eventually 2 import weakref 3 4 class Observer2: 5 """A simple class to distribute multiple events to a single subscriber. 6 It accepts arbitrary kwargs, but no posargs.""" 7 def __init__(self): 8 self._watcher = None 9 self._undelivered_results = [] 10 self._canceler = None 11 12 def set_canceler(self, f): 13 # we use a weakref to avoid creating a cycle between us and the thing 14 # we're observing: they'll be holding a reference to us to compare 15 # against the value we pass to their canceler function. 16 self._canceler = weakref.ref(f) 17 18 def subscribe(self, observer, **watcher_kwargs): 19 self._watcher = (observer, watcher_kwargs) 20 while self._undelivered_results: 21 self._notify(self._undelivered_results.pop(0)) 22 23 def notify(self, **result_kwargs): 24 if self._watcher: 25 self._notify(result_kwargs) 26 else: 27 self._undelivered_results.append(result_kwargs) 28 29 def _notify(self, result_kwargs): 30 o, watcher_kwargs = self._watcher 31 kwargs = dict(result_kwargs) 32 kwargs.update(watcher_kwargs) 33 eventually(o, **kwargs) 34 35 def cancel(self): 36 f = self._canceler() 37 if f: 38 f(self) 39 40 class DictOfSets(dict): 41 def add(self, key, value): self.setdefault(key, set()).add(value) 42 def values(self): # return set that merges all value sets 43 r = set() 44 for key in self: 45 r.update(self[key]) 46 return r 47 48 49 def incidentally(res, f, *args, **kwargs): 50 """Add me to a Deferred chain like this: 51 d.addBoth(incidentally, func, arg) 52 and I'll behave as if you'd added the following function: 53 def _(res): 54 func(arg) 55 return res 56 This is useful if you want to execute an expression when the Deferred 57 fires, but don't care about its value. 58 """ 59 f(*args, **kwargs) 60 return res 61 62 63 from twisted.application import service 64 class Terminator(service.Service): 65 def __init__(self): 66 service.Service.__init__(self) 67 self._clients = weakref.WeakKeyDictionary() 68 def register(self, c): 69 self._clients[c] = None 70 def stopService(self): 71 for c in self._clients: 72 c.stop() 73 return service.Service.stopService(self) -
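A quick illustration of Observer2's contract: events posted before subscription are buffered, and per-subscription kwargs ride along on every delivery. The watcher function and the shnum=3 kwarg are arbitrary examples, and since _notify routes through eventually(), the callback fires on a later reactor turn, not synchronously:

    o = Observer2()
    o.notify(state="COMPLETE", block="block0 data")   # no subscriber yet: buffered

    def watcher(state, block=None, shnum=None):
        print("got %s for shnum %s" % (state, shnum))

    # subscribe() flushes the buffered event; shnum=3 is attached to every
    # delivery, so one callback can serve several shares
    o.subscribe(watcher, shnum=3)
    o.notify(state="OVERDUE")   # delivered as watcher(state="OVERDUE", shnum=3)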
src/allmydata/test/test_util.py
diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index 0a326b3..5f6ce67 100644
a b from twisted.trial import unittest 7 7 from twisted.internet import defer, reactor 8 8 from twisted.python.failure import Failure 9 9 from twisted.python import log 10 from hashlib import md5 10 11 11 12 from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil 12 13 from allmydata.util import assertutil, fileutil, deferredutil, abbreviate 13 14 from allmydata.util import limiter, time_format, pollmixin, cachedir 14 15 from allmydata.util import statistics, dictutil, pipeline 15 16 from allmydata.util import log as tahoe_log 17 from allmydata.util.spans import Spans, overlap, DataSpans 16 18 17 19 class Base32(unittest.TestCase): 18 20 def test_b2a_matches_Pythons(self): … … class Log(unittest.TestCase): 1537 1539 tahoe_log.err(format="intentional sample error", 1538 1540 failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ") 1539 1541 self.flushLoggedErrors(SampleError) 1542 1543 1544 class SimpleSpans: 1545 # this is a simple+inefficient form of util.spans.Spans . We compare the 1546 # behavior of this reference model against the real (efficient) form. 1547 1548 def __init__(self, _span_or_start=None, length=None): 1549 self._have = set() 1550 if length is not None: 1551 for i in range(_span_or_start, _span_or_start+length): 1552 self._have.add(i) 1553 elif _span_or_start: 1554 for (start,length) in _span_or_start: 1555 self.add(start, length) 1556 1557 def add(self, start, length): 1558 for i in range(start, start+length): 1559 self._have.add(i) 1560 return self 1561 1562 def remove(self, start, length): 1563 for i in range(start, start+length): 1564 self._have.discard(i) 1565 return self 1566 1567 def each(self): 1568 return sorted(self._have) 1569 1570 def __iter__(self): 1571 items = sorted(self._have) 1572 prevstart = None 1573 prevend = None 1574 for i in items: 1575 if prevstart is None: 1576 prevstart = prevend = i 1577 continue 1578 if i == prevend+1: 1579 prevend = i 1580 continue 1581 yield (prevstart, prevend-prevstart+1) 1582 prevstart = prevend = i 1583 if prevstart is not None: 1584 yield (prevstart, prevend-prevstart+1) 1585 1586 def __len__(self): 1587 # this also gets us bool(s) 1588 return len(self._have) 1589 1590 def __add__(self, other): 1591 s = self.__class__(self) 1592 for (start, length) in other: 1593 s.add(start, length) 1594 return s 1595 1596 def __sub__(self, other): 1597 s = self.__class__(self) 1598 for (start, length) in other: 1599 s.remove(start, length) 1600 return s 1601 1602 def __iadd__(self, other): 1603 for (start, length) in other: 1604 self.add(start, length) 1605 return self 1606 1607 def __isub__(self, other): 1608 for (start, length) in other: 1609 self.remove(start, length) 1610 return self 1611 1612 def __contains__(self, (start,length)): 1613 for i in range(start, start+length): 1614 if i not in self._have: 1615 return False 1616 return True 1617 1618 class ByteSpans(unittest.TestCase): 1619 def test_basic(self): 1620 s = Spans() 1621 self.failUnlessEqual(list(s), []) 1622 self.failIf(s) 1623 self.failIf((0,1) in s) 1624 self.failUnlessEqual(len(s), 0) 1625 1626 s1 = Spans(3, 4) # 3,4,5,6 1627 self._check1(s1) 1628 1629 s2 = Spans(s1) 1630 self._check1(s2) 1631 1632 s2.add(10,2) # 10,11 1633 self._check1(s1) 1634 self.failUnless((10,1) in s2) 1635 self.failIf((10,1) in s1) 1636 self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11]) 1637 self.failUnlessEqual(len(s2), 6) 1638 1639 s2.add(15,2).add(20,2) 1640 self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21]) 1641 self.failUnlessEqual(len(s2), 10) 1642 
1643 s2.remove(4,3).remove(15,1) 1644 self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21]) 1645 self.failUnlessEqual(len(s2), 6) 1646 1647 def _check1(self, s): 1648 self.failUnlessEqual(list(s), [(3,4)]) 1649 self.failUnless(s) 1650 self.failUnlessEqual(len(s), 4) 1651 self.failIf((0,1) in s) 1652 self.failUnless((3,4) in s) 1653 self.failUnless((3,1) in s) 1654 self.failUnless((5,2) in s) 1655 self.failUnless((6,1) in s) 1656 self.failIf((6,2) in s) 1657 self.failIf((7,1) in s) 1658 self.failUnlessEqual(list(s.each()), [3,4,5,6]) 1659 1660 def test_math(self): 1661 s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9 1662 s2 = Spans(5, 3) # 5,6,7 1663 s3 = Spans(8, 4) # 8,9,10,11 1664 1665 s = s1 - s2 1666 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) 1667 s = s1 - s3 1668 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) 1669 s = s2 - s3 1670 self.failUnlessEqual(list(s.each()), [5,6,7]) 1671 1672 s = s1 + s2 1673 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) 1674 s = s1 + s3 1675 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) 1676 s = s2 + s3 1677 self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) 1678 1679 s = Spans(s1) 1680 s -= s2 1681 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,8,9]) 1682 s = Spans(s1) 1683 s -= s3 1684 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7]) 1685 s = Spans(s2) 1686 s -= s3 1687 self.failUnlessEqual(list(s.each()), [5,6,7]) 1688 1689 s = Spans(s1) 1690 s += s2 1691 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9]) 1692 s = Spans(s1) 1693 s += s3 1694 self.failUnlessEqual(list(s.each()), [0,1,2,3,4,5,6,7,8,9,10,11]) 1695 s = Spans(s2) 1696 s += s3 1697 self.failUnlessEqual(list(s.each()), [5,6,7,8,9,10,11]) 1698 1699 def test_random(self): 1700 # attempt to increase coverage of corner cases by comparing behavior 1701 # of a simple-but-slow model implementation against the 1702 # complex-but-fast actual implementation, in a large number of random 1703 # operations 1704 S1 = SimpleSpans 1705 S2 = Spans 1706 s1 = S1(); s2 = S2() 1707 seed = "" 1708 def _create(subseed): 1709 ns1 = S1(); ns2 = S2() 1710 for i in range(10): 1711 what = md5(subseed+str(i)).hexdigest() 1712 start = int(what[2:4], 16) 1713 length = max(1,int(what[5:6], 16)) 1714 ns1.add(start, length); ns2.add(start, length) 1715 return ns1, ns2 1716 1717 #print 1718 for i in range(1000): 1719 what = md5(seed+str(i)).hexdigest() 1720 op = what[0] 1721 subop = what[1] 1722 start = int(what[2:4], 16) 1723 length = max(1,int(what[5:6], 16)) 1724 #print what 1725 if op in "0": 1726 if subop in "01234": 1727 s1 = S1(); s2 = S2() 1728 elif subop in "5678": 1729 s1 = S1(start, length); s2 = S2(start, length) 1730 else: 1731 s1 = S1(s1); s2 = S2(s2) 1732 #print "s2 = %s" % s2.dump() 1733 elif op in "123": 1734 #print "s2.add(%d,%d)" % (start, length) 1735 s1.add(start, length); s2.add(start, length) 1736 elif op in "456": 1737 #print "s2.remove(%d,%d)" % (start, length) 1738 s1.remove(start, length); s2.remove(start, length) 1739 elif op in "78": 1740 ns1, ns2 = _create(what[7:11]) 1741 #print "s2 + %s" % ns2.dump() 1742 s1 = s1 + ns1; s2 = s2 + ns2 1743 elif op in "9a": 1744 ns1, ns2 = _create(what[7:11]) 1745 #print "%s - %s" % (s2.dump(), ns2.dump()) 1746 s1 = s1 - ns1; s2 = s2 - ns2 1747 elif op in "bc": 1748 ns1, ns2 = _create(what[7:11]) 1749 #print "s2 += %s" % ns2.dump() 1750 s1 += ns1; s2 += ns2 1751 else: 1752 ns1, ns2 = _create(what[7:11]) 1753 #print "%s -= %s" % (s2.dump(), ns2.dump()) 1754 s1 -= ns1; s2 -= ns2 1755 #print "s2 
now %s" % s2.dump() 1756 self.failUnlessEqual(list(s1.each()), list(s2.each())) 1757 self.failUnlessEqual(len(s1), len(s2)) 1758 self.failUnlessEqual(bool(s1), bool(s2)) 1759 self.failUnlessEqual(list(s1), list(s2)) 1760 for j in range(10): 1761 what = md5(what[12:14]+str(j)).hexdigest() 1762 start = int(what[2:4], 16) 1763 length = max(1, int(what[5:6], 16)) 1764 span = (start, length) 1765 self.failUnlessEqual(bool(span in s1), bool(span in s2)) 1766 1767 1768 # s() 1769 # s(start,length) 1770 # s(s0) 1771 # s.add(start,length) : returns s 1772 # s.remove(start,length) 1773 # s.each() -> list of byte offsets, mostly for testing 1774 # list(s) -> list of (start,length) tuples, one per span 1775 # (start,length) in s -> True if (start..start+length-1) are all members 1776 # NOT equivalent to x in list(s) 1777 # len(s) -> number of bytes, for testing, bool(), and accounting/limiting 1778 # bool(s) (__len__) 1779 # s = s1+s2, s1-s2, +=s1, -=s1 1780 1781 def test_overlap(self): 1782 for a in range(20): 1783 for b in range(10): 1784 for c in range(20): 1785 for d in range(10): 1786 self._test_overlap(a,b,c,d) 1787 1788 def _test_overlap(self, a, b, c, d): 1789 s1 = set(range(a,a+b)) 1790 s2 = set(range(c,c+d)) 1791 #print "---" 1792 #self._show_overlap(s1, "1") 1793 #self._show_overlap(s2, "2") 1794 o = overlap(a,b,c,d) 1795 expected = s1.intersection(s2) 1796 if not expected: 1797 self.failUnlessEqual(o, None) 1798 else: 1799 start,length = o 1800 so = set(range(start,start+length)) 1801 #self._show(so, "o") 1802 self.failUnlessEqual(so, expected) 1803 1804 def _show_overlap(self, s, c): 1805 import sys 1806 out = sys.stdout 1807 if s: 1808 for i in range(max(s)): 1809 if i in s: 1810 out.write(c) 1811 else: 1812 out.write(" ") 1813 out.write("\n") 1814 1815 def extend(s, start, length, fill): 1816 if len(s) >= start+length: 1817 return s 1818 assert len(fill) == 1 1819 return s + fill*(start+length-len(s)) 1820 1821 def replace(s, start, data): 1822 assert len(s) >= start+len(data) 1823 return s[:start] + data + s[start+len(data):] 1824 1825 class SimpleDataSpans: 1826 def __init__(self, other=None): 1827 self.missing = "" # "1" where missing, "0" where found 1828 self.data = "" 1829 if other: 1830 for (start, data) in other.get_spans(): 1831 self.add(start, data) 1832 1833 def __len__(self): 1834 return len(self.missing.translate(None, "1")) 1835 def _dump(self): 1836 return [i for (i,c) in enumerate(self.missing) if c == "0"] 1837 def _have(self, start, length): 1838 m = self.missing[start:start+length] 1839 if not m or len(m)<length or int(m): 1840 return False 1841 return True 1842 def get_spans(self): 1843 for i in self._dump(): 1844 yield (i, self.data[i]) 1845 def get(self, start, length): 1846 if self._have(start, length): 1847 return self.data[start:start+length] 1848 return None 1849 def pop(self, start, length): 1850 data = self.get(start, length) 1851 if data: 1852 self.remove(start, length) 1853 return data 1854 def remove(self, start, length): 1855 self.missing = replace(extend(self.missing, start, length, "1"), 1856 start, "1"*length) 1857 def add(self, start, data): 1858 self.missing = replace(extend(self.missing, start, len(data), "1"), 1859 start, "0"*len(data)) 1860 self.data = replace(extend(self.data, start, len(data), " "), 1861 start, data) 1862 1863 1864 class StringSpans(unittest.TestCase): 1865 def do_basic(self, klass): 1866 ds = klass() 1867 self.failUnlessEqual(len(ds), 0) 1868 self.failUnlessEqual(list(ds._dump()), []) 1869 self.failUnlessEqual(sum([len(d) for 
(s,d) in ds.get_spans()]), 0) 1870 self.failUnlessEqual(ds.get(0, 4), None) 1871 self.failUnlessEqual(ds.pop(0, 4), None) 1872 ds.remove(0, 4) 1873 1874 ds.add(2, "four") 1875 self.failUnlessEqual(len(ds), 4) 1876 self.failUnlessEqual(list(ds._dump()), [2,3,4,5]) 1877 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 4) 1878 self.failUnlessEqual(ds.get(0, 4), None) 1879 self.failUnlessEqual(ds.pop(0, 4), None) 1880 self.failUnlessEqual(ds.get(4, 4), None) 1881 1882 ds2 = klass(ds) 1883 self.failUnlessEqual(len(ds2), 4) 1884 self.failUnlessEqual(list(ds2._dump()), [2,3,4,5]) 1885 self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_spans()]), 4) 1886 self.failUnlessEqual(ds2.get(0, 4), None) 1887 self.failUnlessEqual(ds2.pop(0, 4), None) 1888 self.failUnlessEqual(ds2.pop(2, 3), "fou") 1889 self.failUnlessEqual(sum([len(d) for (s,d) in ds2.get_spans()]), 1) 1890 self.failUnlessEqual(ds2.get(2, 3), None) 1891 self.failUnlessEqual(ds2.get(5, 1), "r") 1892 self.failUnlessEqual(ds.get(2, 3), "fou") 1893 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 4) 1894 1895 ds.add(0, "23") 1896 self.failUnlessEqual(len(ds), 6) 1897 self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5]) 1898 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 6) 1899 self.failUnlessEqual(ds.get(0, 4), "23fo") 1900 self.failUnlessEqual(ds.pop(0, 4), "23fo") 1901 self.failUnlessEqual(sum([len(d) for (s,d) in ds.get_spans()]), 2) 1902 self.failUnlessEqual(ds.get(0, 4), None) 1903 self.failUnlessEqual(ds.pop(0, 4), None) 1904 1905 ds = klass() 1906 ds.add(2, "four") 1907 ds.add(3, "ea") 1908 self.failUnlessEqual(ds.get(2, 4), "fear") 1909 1910 def do_scan(self, klass): 1911 # do a test with gaps and spans of size 1 and 2 1912 # left=(1,11) * right=(1,11) * gapsize=(1,2) 1913 # 111, 112, 121, 122, 211, 212, 221, 222 1914 # 211 1915 # 121 1916 # 112 1917 # 212 1918 # 222 1919 # 221 1920 # 111 1921 # 122 1922 # 11 1 1 11 11 11 1 1 111 1923 # 0123456789012345678901234567 1924 # abcdefghijklmnopqrstuvwxyz-= 1925 pieces = [(1, "bc"), 1926 (4, "e"), 1927 (7, "h"), 1928 (9, "jk"), 1929 (12, "mn"), 1930 (16, "qr"), 1931 (20, "u"), 1932 (22, "w"), 1933 (25, "z-="), 1934 ] 1935 p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27]) 1936 S = "abcdefghijklmnopqrstuvwxyz-=" 1937 # TODO: when adding data, add capital letters, to make sure we aren't 1938 # just leaving the old data in place 1939 l = len(S) 1940 def base(): 1941 ds = klass() 1942 for start, data in pieces: 1943 ds.add(start, data) 1944 return ds 1945 def dump(s): 1946 p = set(s._dump()) 1947 # wow, this is the first time I've ever wanted ?: in python 1948 # note: this requires python2.5 1949 d = "".join([(S[i] if i in p else " ") for i in range(l)]) 1950 assert len(d) == l 1951 return d 1952 DEBUG = False 1953 for start in range(0, l): 1954 for end in range(start+1, l): 1955 # add [start-end) to the baseline 1956 which = "%d-%d" % (start, end-1) 1957 p_added = set(range(start, end)) 1958 b = base() 1959 if DEBUG: 1960 print 1961 print dump(b), which 1962 add = klass(); add.add(start, S[start:end]) 1963 print dump(add) 1964 b.add(start, S[start:end]) 1965 if DEBUG: 1966 print dump(b) 1967 # check that the new span is there 1968 d = b.get(start, end-start) 1969 self.failUnlessEqual(d, S[start:end], which) 1970 # check that all the original pieces are still there 1971 for t_start, t_data in pieces: 1972 t_len = len(t_data) 1973 self.failUnlessEqual(b.get(t_start, t_len), 1974 S[t_start:t_start+t_len], 1975 "%s %d+%d" % (which, 
t_start, t_len)) 1976 # check that a lot of subspans are mostly correct 1977 for t_start in range(l): 1978 for t_len in range(1,4): 1979 d = b.get(t_start, t_len) 1980 if d is not None: 1981 which2 = "%s+(%d-%d)" % (which, t_start, 1982 t_start+t_len-1) 1983 self.failUnlessEqual(d, S[t_start:t_start+t_len], 1984 which2) 1985 # check that removing a subspan gives the right value 1986 b2 = klass(b) 1987 b2.remove(t_start, t_len) 1988 removed = set(range(t_start, t_start+t_len)) 1989 for i in range(l): 1990 exp = (((i in p_elements) or (i in p_added)) 1991 and (i not in removed)) 1992 which2 = "%s-(%d-%d)" % (which, t_start, 1993 t_start+t_len-1) 1994 self.failUnlessEqual(bool(b2.get(i, 1)), exp, 1995 which2+" %d" % i) 1996 1997 def test_test(self): 1998 self.do_basic(SimpleDataSpans) 1999 self.do_scan(SimpleDataSpans) 2000 2001 def test_basic(self): 2002 self.do_basic(DataSpans) 2003 self.do_scan(DataSpans) 2004 2005 def test_random(self): 2006 # attempt to increase coverage of corner cases by comparing behavior 2007 # of a simple-but-slow model implementation against the 2008 # complex-but-fast actual implementation, in a large number of random 2009 # operations 2010 S1 = SimpleDataSpans 2011 S2 = DataSpans 2012 s1 = S1(); s2 = S2() 2013 seed = "" 2014 def _randstr(length, seed): 2015 created = 0 2016 pieces = [] 2017 while created < length: 2018 piece = md5(seed + str(created)).hexdigest() 2019 pieces.append(piece) 2020 created += len(piece) 2021 return "".join(pieces)[:length] 2022 def _create(subseed): 2023 ns1 = S1(); ns2 = S2() 2024 for i in range(10): 2025 what = md5(subseed+str(i)).hexdigest() 2026 start = int(what[2:4], 16) 2027 length = max(1,int(what[5:6], 16)) 2028 ns1.add(start, _randstr(length, what[7:9])); 2029 ns2.add(start, _randstr(length, what[7:9])) 2030 return ns1, ns2 2031 2032 #print 2033 for i in range(1000): 2034 what = md5(seed+str(i)).hexdigest() 2035 op = what[0] 2036 subop = what[1] 2037 start = int(what[2:4], 16) 2038 length = max(1,int(what[5:6], 16)) 2039 #print what 2040 if op in "0": 2041 if subop in "0123456": 2042 s1 = S1(); s2 = S2() 2043 else: 2044 s1, s2 = _create(what[7:11]) 2045 #print "s2 = %s" % list(s2._dump()) 2046 elif op in "123456": 2047 #print "s2.add(%d,%d)" % (start, length) 2048 s1.add(start, _randstr(length, what[7:9])); 2049 s2.add(start, _randstr(length, what[7:9])) 2050 elif op in "789abc": 2051 #print "s2.remove(%d,%d)" % (start, length) 2052 s1.remove(start, length); s2.remove(start, length) 2053 else: 2054 #print "s2.pop(%d,%d)" % (start, length) 2055 d1 = s1.pop(start, length); d2 = s2.pop(start, length) 2056 self.failUnlessEqual(d1, d2) 2057 #print "s1 now %s" % list(s1._dump()) 2058 #print "s2 now %s" % list(s2._dump()) 2059 self.failUnlessEqual(len(s1), len(s2)) 2060 self.failUnlessEqual(list(s1._dump()), list(s2._dump())) 2061 for j in range(100): 2062 what = md5(what[12:14]+str(j)).hexdigest() 2063 start = int(what[2:4], 16) 2064 length = max(1, int(what[5:6], 16)) 2065 d1 = s1.get(start, length); d2 = s2.get(start, length) 2066 self.failUnlessEqual(d1, d2, "%d+%d" % (start, length)) -
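The Spans API exercised by these tests reduces the downloader's central bookkeeping question (per the Spans docstring in spans.py, below: "I want bytes XYZ, I already requested ABC, I've received DEF") to one line of set arithmetic. A small sketch, assuming this patch is applied; wanted/requested/received are hypothetical variables:

    from allmydata.util.spans import Spans

    wanted = Spans(0, 100).add(200, 50)  # bytes we need: [0-99] and [200-249]
    requested = Spans(0, 60)             # already asked a server for [0-59]
    received = Spans(0, 30)              # [0-29] has already arrived

    # what should we request now? everything wanted that is neither already
    # requested nor already in hand:
    ask = wanted - requested - received
    assert list(ask) == [(60, 40), (200, 50)]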
new file src/allmydata/util/spans.py
diff --git a/src/allmydata/util/spans.py b/src/allmydata/util/spans.py new file mode 100755 index 0000000..336fddf
- + 1 2 class Spans: 3 """I represent a compressed list of booleans, one per index (an integer). 4 Typically, each index represents an offset into a large string, pointing 5 to a specific byte of a share. In this context, True means that byte has 6 been received, or has been requested. 7 8 Another way to look at this is maintaining a set of integers, optimized 9 for operations on spans like 'add range to set' and 'is range in set?'. 10 11 This is a python equivalent of perl's Set::IntSpan module, frequently 12 used to represent .newsrc contents. 13 14 Rather than storing an actual (large) list or dictionary, I represent my 15 internal state as a sorted list of spans, each with a start and a length. 16 My API is presented in terms of start+length pairs. I provide set 17 arithmetic operators, to efficiently answer questions like 'I want bytes 18 XYZ, I already requested bytes ABC, and I've already received bytes DEF: 19 what bytes should I request now?'. 20 21 The new downloader will use it to keep track of which bytes we've requested 22 or received already. 23 """ 24 25 def __init__(self, _span_or_start=None, length=None): 26 self._spans = list() 27 if length is not None: 28 self._spans.append( (_span_or_start, length) ) 29 elif _span_or_start: 30 for (start,length) in _span_or_start: 31 self.add(start, length) 32 self._check() 33 34 def _check(self): 35 assert sorted(self._spans) == self._spans 36 prev_end = None 37 try: 38 for (start,length) in self._spans: 39 if prev_end is not None: 40 assert start > prev_end 41 prev_end = start+length 42 except AssertionError: 43 print "BAD:", self.dump() 44 raise 45 46 def add(self, start, length): 47 assert start >= 0 48 assert length > 0 49 #print " ADD [%d+%d -%d) to %s" % (start, length, start+length, self.dump()) 50 first_overlap = last_overlap = None 51 for i,(s_start,s_length) in enumerate(self._spans): 52 #print " (%d+%d)-> overlap=%s adjacent=%s" % (s_start,s_length, overlap(s_start, s_length, start, length), adjacent(s_start, s_length, start, length)) 53 if (overlap(s_start, s_length, start, length) 54 or adjacent(s_start, s_length, start, length)): 55 last_overlap = i 56 if first_overlap is None: 57 first_overlap = i 58 continue 59 # no overlap 60 if first_overlap is not None: 61 break 62 #print " first_overlap", first_overlap, last_overlap 63 if first_overlap is None: 64 # no overlap, so just insert the span and sort by starting 65 # position. 
66 self._spans.insert(0, (start,length)) 67 self._spans.sort() 68 else: 69 # everything from [first_overlap] to [last_overlap] overlapped 70 first_start,first_length = self._spans[first_overlap] 71 last_start,last_length = self._spans[last_overlap] 72 newspan_start = min(start, first_start) 73 newspan_end = max(start+length, last_start+last_length) 74 newspan_length = newspan_end - newspan_start 75 newspan = (newspan_start, newspan_length) 76 self._spans[first_overlap:last_overlap+1] = [newspan] 77 #print " ADD done: %s" % self.dump() 78 self._check() 79 80 return self 81 82 def remove(self, start, length): 83 assert start >= 0 84 assert length > 0 85 #print " REMOVE [%d+%d -%d) from %s" % (start, length, start+length, self.dump()) 86 first_complete_overlap = last_complete_overlap = None 87 for i,(s_start,s_length) in enumerate(self._spans): 88 s_end = s_start + s_length 89 o = overlap(s_start, s_length, start, length) 90 if o: 91 o_start, o_length = o 92 o_end = o_start+o_length 93 if o_start == s_start and o_end == s_end: 94 # delete this span altogether 95 if first_complete_overlap is None: 96 first_complete_overlap = i 97 last_complete_overlap = i 98 elif o_start == s_start: 99 # we only overlap the left side, so trim the start 100 # 1111 101 # rrrr 102 # oo 103 # -> 11 104 new_start = o_end 105 new_end = s_end 106 assert new_start > s_start 107 new_length = new_end - new_start 108 self._spans[i] = (new_start, new_length) 109 elif o_end == s_end: 110 # we only overlap the right side 111 # 1111 112 # rrrr 113 # oo 114 # -> 11 115 new_start = s_start 116 new_end = o_start 117 assert new_end < s_end 118 new_length = new_end - new_start 119 self._spans[i] = (new_start, new_length) 120 else: 121 # we overlap the middle, so create a new span. No need to 122 # examine any other spans. 
123 # 111111 124 # rr 125 # LL RR 126 left_start = s_start 127 left_end = o_start 128 left_length = left_end - left_start 129 right_start = o_end 130 right_end = s_end 131 right_length = right_end - right_start 132 self._spans[i] = (left_start, left_length) 133 self._spans.append( (right_start, right_length) ) 134 self._spans.sort() 135 break 136 if first_complete_overlap is not None: 137 del self._spans[first_complete_overlap:last_complete_overlap+1] 138 #print " REMOVE done: %s" % self.dump() 139 self._check() 140 return self 141 142 def dump(self): 143 return "len=%d: %s" % (len(self), 144 ",".join(["[%d-%d]" % (start,start+l-1) 145 for (start,l) in self._spans]) ) 146 147 def each(self): 148 for start, length in self._spans: 149 for i in range(start, start+length): 150 yield i 151 152 def __iter__(self): 153 for s in self._spans: 154 yield s 155 156 def __len__(self): 157 # this also gets us bool(s) 158 return sum([length for start,length in self._spans]) 159 160 def __add__(self, other): 161 s = self.__class__(self) 162 for (start, length) in other: 163 s.add(start, length) 164 return s 165 166 def __sub__(self, other): 167 s = self.__class__(self) 168 for (start, length) in other: 169 s.remove(start, length) 170 return s 171 172 def __iadd__(self, other): 173 for (start, length) in other: 174 self.add(start, length) 175 return self 176 177 def __isub__(self, other): 178 for (start, length) in other: 179 self.remove(start, length) 180 return self 181 182 def __contains__(self, (start,length)): 183 for span_start,span_length in self._spans: 184 o = overlap(start, length, span_start, span_length) 185 if o: 186 o_start,o_length = o 187 if o_start == start and o_length == length: 188 return True 189 return False 190 191 def overlap(start0, length0, start1, length1): 192 # return start2,length2 of the overlapping region, or None 193 # 00 00 000 0000 00 00 000 00 00 00 00 194 # 11 11 11 11 111 11 11 1111 111 11 11 195 left = max(start0, start1) 196 right = min(start0+length0, start1+length1) 197 # if there is overlap, 'left' will be its start, and right-1 will 198 # be the end' 199 if left < right: 200 return (left, right-left) 201 return None 202 203 def adjacent(start0, length0, start1, length1): 204 if (start0 < start1) and start0+length0 == start1: 205 return True 206 elif (start1 < start0) and start1+length1 == start0: 207 return True 208 return False 209 210 class DataSpans: 211 """I represent portions of a large string. Equivalently, I can be said to 212 maintain a large array of characters (with gaps of empty elements). I can 213 be used to manage access to a remote share, where some pieces have been 214 retrieved, some have been requested, and others have not been read. 
215 """ 216 217 def __init__(self, other=None): 218 self.spans = [] # (start, data) tuples, non-overlapping, merged 219 if other: 220 for (start, data) in other.get_spans(): 221 self.add(start, data) 222 223 def __len__(self): 224 # return number of bytes we're holding 225 return sum([len(data) for (start,data) in self.spans]) 226 227 def _dump(self): 228 # return iterator of sorted list of offsets, one per byte 229 for (start,data) in self.spans: 230 for i in range(start, start+len(data)): 231 yield i 232 233 def get_spans(self): 234 return list(self.spans) 235 236 def assert_invariants(self): 237 if not self.spans: 238 return 239 prev_start = self.spans[0][0] 240 prev_end = prev_start + len(self.spans[0][1]) 241 for start, data in self.spans[1:]: 242 if not start > prev_end: 243 # adjacent or overlapping: bad 244 print "ASSERTION FAILED", self.spans 245 raise AssertionError 246 247 def get(self, start, length): 248 # returns a string of LENGTH, or None 249 #print "get", start, length, self.spans 250 end = start+length 251 for (s_start,s_data) in self.spans: 252 s_end = s_start+len(s_data) 253 #print " ",s_start,s_end 254 if s_start <= start < s_end: 255 # we want some data from this span. Because we maintain 256 # strictly merged and non-overlapping spans, everything we 257 # want must be in this span. 258 offset = start - s_start 259 if offset + length > len(s_data): 260 #print " None, span falls short" 261 return None # span falls short 262 #print " some", s_data[offset:offset+length] 263 return s_data[offset:offset+length] 264 if s_start >= end: 265 # we've gone too far: no further spans will overlap 266 #print " None, gone too far" 267 return None 268 #print " None, ran out of spans" 269 return None 270 271 def add(self, start, data): 272 # first: walk through existing spans, find overlap, modify-in-place 273 # create list of new spans 274 # add new spans 275 # sort 276 # merge adjacent spans 277 #print "add", start, data, self.spans 278 end = start + len(data) 279 i = 0 280 while len(data): 281 #print " loop", start, data, i, len(self.spans), self.spans 282 if i >= len(self.spans): 283 #print " append and done" 284 # append a last span 285 self.spans.append( (start, data) ) 286 break 287 (s_start,s_data) = self.spans[i] 288 # five basic cases: 289 # a: OLD b:OLDD c1:OLD c2:OLD d1:OLDD d2:OLD e: OLLDD 290 # NEW NEW NEW NEWW NEW NEW NEW 291 # 292 # we handle A by inserting a new segment (with "N") and looping, 293 # turning it into B or C. We handle B by replacing a prefix and 294 # terminating. We handle C (both c1 and c2) by replacing the 295 # segment (and, for c2, looping, turning it into A). We handle D 296 # by replacing a suffix (and, for d2, looping, turning it into 297 # A). We handle E by replacing the middle and terminating. 
298 if start < s_start: 299 # case A: insert a new span, then loop with the remainder 300 #print " insert new span" 301 s_len = s_start-start 302 self.spans.insert(i, (start, data[:s_len])) 303 i += 1 304 start = s_start 305 data = data[s_len:] 306 continue 307 s_len = len(s_data) 308 s_end = s_start+s_len 309 if s_start <= start < s_end: 310 #print " modify this span", s_start, start, s_end 311 # we want to modify some data in this span: a prefix, a 312 # suffix, or the whole thing 313 if s_start == start: 314 if s_end <= end: 315 #print " replace whole segment" 316 # case C: replace this segment 317 self.spans[i] = (s_start, data[:s_len]) 318 i += 1 319 start += s_len 320 data = data[s_len:] 321 # C2 is where len(data)>0 322 continue 323 # case B: modify the prefix, retain the suffix 324 #print " modify prefix" 325 self.spans[i] = (s_start, data + s_data[len(data):]) 326 break 327 if start > s_start and end < s_end: 328 # case E: modify the middle 329 #print " modify middle" 330 prefix_len = start - s_start # we retain this much 331 suffix_len = s_end - end # and retain this much 332 newdata = s_data[:prefix_len] + data + s_data[-suffix_len:] 333 self.spans[i] = (s_start, newdata) 334 break 335 # case D: retain the prefix, modify the suffix 336 #print " modify suffix" 337 prefix_len = start - s_start # we retain this much 338 suffix_len = s_len - prefix_len # we replace this much 339 #print " ", s_data, prefix_len, suffix_len, s_len, data 340 self.spans[i] = (s_start, 341 s_data[:prefix_len] + data[:suffix_len]) 342 i += 1 343 start += suffix_len 344 data = data[suffix_len:] 345 #print " now", start, data 346 # D2 is where len(data)>0 347 continue 348 # else we're not there yet 349 #print " still looking" 350 i += 1 351 continue 352 # now merge adjacent spans 353 #print " merging", self.spans 354 newspans = [] 355 for (s_start,s_data) in self.spans: 356 if newspans and adjacent(newspans[-1][0], len(newspans[-1][1]), 357 s_start, len(s_data)): 358 newspans[-1] = (newspans[-1][0], newspans[-1][1] + s_data) 359 else: 360 newspans.append( (s_start, s_data) ) 361 self.spans = newspans 362 self.assert_invariants() 363 #print " done", self.spans 364 365 def remove(self, start, length): 366 i = 0 367 end = start + length 368 #print "remove", start, length, self.spans 369 while i < len(self.spans): 370 (s_start,s_data) = self.spans[i] 371 if s_start >= end: 372 # this segment is entirely right of the removed region, and 373 # all further segments are even further right. We're done.
374 break 375 s_len = len(s_data) 376 s_end = s_start + s_len 377 o = overlap(start, length, s_start, s_len) 378 if not o: 379 i += 1 380 continue 381 o_start, o_len = o 382 o_end = o_start + o_len 383 if o_len == s_len: 384 # remove the whole segment 385 del self.spans[i] 386 continue 387 if o_start == s_start: 388 # remove a prefix, leaving the suffix from o_end to s_end 389 prefix_len = o_end - o_start 390 self.spans[i] = (o_end, s_data[prefix_len:]) 391 i += 1 392 continue 393 elif o_end == s_end: 394 # remove a suffix, leaving the prefix from s_start to o_start 395 prefix_len = o_start - s_start 396 self.spans[i] = (s_start, s_data[:prefix_len]) 397 i += 1 398 continue 399 # remove the middle, creating a new segment 400 # left is s_start:o_start, right is o_end:s_end 401 left_len = o_start - s_start 402 left = s_data[:left_len] 403 right_len = s_end - o_end 404 right = s_data[-right_len:] 405 self.spans[i] = (s_start, left) 406 self.spans.insert(i+1, (o_end, right)) 407 break 408 #print " done", self.spans 409 410 def pop(self, start, length): 411 data = self.get(start, length) 412 if data: 413 self.remove(start, length) 414 return data
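DataSpans in miniature, assuming this patch is applied (the byte values are arbitrary examples, mirroring the do_basic test above): overlapping adds are merged with the new data winning, get() is all-or-nothing over the requested range, and pop() is get-plus-remove:

    from allmydata.util.spans import DataSpans

    ds = DataSpans()
    ds.add(10, "hello")             # bytes 10-14
    ds.add(13, "LOWOR")             # overlaps 13-14: new data wins, span grows
    assert ds.get(10, 8) == "helLOWOR"
    assert ds.get(9, 4) is None     # byte 9 was never added: all-or-nothing
    assert ds.pop(10, 3) == "hel"   # returned and removed
    assert ds.get(10, 3) is None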