| 1 | |
| 2 | import binascii |
| 3 | import struct |
| 4 | import copy |
| 5 | from zope.interface import implements |
| 6 | from twisted.python.failure import Failure |
| 7 | from twisted.internet import defer |
| 8 | from twisted.internet.interfaces import IPushProducer, IConsumer |
| 9 | |
| 10 | from foolscap.api import eventually |
| 11 | from allmydata.interfaces import IImmutableFileNode, IUploadResults, \ |
| 12 | NotEnoughSharesError, NoSharesError, HASH_SIZE |
| 13 | from allmydata.hashtree import IncompleteHashTree, BadHashError, \ |
| 14 | NotEnoughHashesError |
| 15 | from allmydata.util import base32, log, hashutil, mathutil, idlib |
| 16 | from allmydata.util.spans import Spans, DataSpans, overlap |
| 17 | from allmydata.util.dictutil import DictOfSets |
| 18 | from allmydata.check_results import CheckResults, CheckAndRepairResults |
| 19 | from allmydata.codec import CRSDecoder |
| 20 | from allmydata import uri |
| 21 | from pycryptopp.cipher.aes import AES |
| 22 | from download2_util import Observer2, incidentally |
| 23 | from layout import make_write_bucket_proxy |
| 24 | from checker import Checker |
| 25 | from repairer import Repairer |
| 26 | |
# Labels for the per-share / per-request state machine. They are strings
# (rather than small ints) so that log output is self-describing.
(AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM) = \
 ("AVAILABLE", "PENDING", "OVERDUE", "COMPLETE", "CORRUPT", "DEAD", "BADSEGNUM")

KiB = 1024 # binary kilobyte, for size constants
class BadSegmentNumberError(Exception):
    """The requested segment number is beyond the end of the file
    (raised by SegmentFetcher when segnum >= num_segments)."""
    pass
class BadSegmentError(Exception):
    """A problem with a segment. (Not raised in the visible portion of
    this file -- see call sites elsewhere.)"""
    pass
class BadCiphertextHashError(Exception):
    """Decoded ciphertext failed its hash check. (Not raised in the
    visible portion of this file -- see call sites elsewhere.)"""
    pass
| 37 | |
| 38 | class Share: |
| 39 | """I represent a single instance of a single share (e.g. I reference the |
| 40 | shnum2 for share SI=abcde on server xy12t, not the one on server ab45q). |
| 41 | I am associated with a CommonShare that remembers data that is held in |
| 42 | common among e.g. SI=abcde/shnum2 across all servers. I am also |
| 43 | associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all |
| 44 | servers). |
| 45 | """ |
| 46 | # this is a specific implementation of IShare for tahoe's native storage |
| 47 | # servers. A different backend would use a different class. |
| 48 | |
    def __init__(self, rref, server_version, verifycap, commonshare, node,
                 peerid, shnum, logparent):
        """Hold per-(share,server) download state.

        rref: RemoteReference for the storage bucket; all remote read()
              calls go through it.
        server_version: the server's version dict; consulted for the
              'tolerates-immutable-read-overrun' flag.
        verifycap: verify-cap for the file (storage index, size, k, N).
        commonshare: CommonShare holding the shared block hash tree.
        node: the file node; holds share_hash_tree and the UEB.
        peerid: binary id of the server holding this share.
        shnum: this share's number.
        logparent: parent log entry for all of our log messages.
        """
        self._rref = rref
        self._server_version = server_version
        self._node = node # holds share_hash_tree and UEB
        # until the real offset table arrives, we operate on guesses
        # derived from the node's guessed segment size
        self._guess_offsets(verifycap, node.guessed_segment_size)
        self.actual_offsets = None
        self.actual_segment_size = None
        self._UEB_length = None
        self._commonshare = commonshare # holds block_hash_tree
        self._peerid = peerid
        self._peerid_s = base32.b2a(peerid)[:5] # abbreviated, for logging
        self._storage_index = verifycap.storage_index
        self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
        self._shnum = shnum

        self._lp = log.msg(format="Share(%(si)s) on server=%(server)s starting",
                           si=self._si_prefix, server=self._peerid_s,
                           level=log.NOISY, parent=logparent, umid="P7hv2w")

        self._wanted = Spans() # desired metadata
        self._wanted_blocks = Spans() # desired block data
        self._requested = Spans() # we've sent a request for this
        # self._received contains data that we haven't yet used
        self._received = DataSpans() # we've received a response for this
        self._requested_blocks = [] # (segnum, set(observer2..))
        ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
        self._overrun_ok = ver["tolerates-immutable-read-overrun"]
        # If _overrun_ok and we guess the offsets correctly, we can get
        # everything in one RTT. If _overrun_ok and we guess wrong, we might
        # need two RTT (but we could get lucky and do it in one). If overrun
        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
        # 2=offset table, 3=UEB_length and everything else (hashes, block),
        # 4=UEB.

        self._dead = False
| 85 | |
| 86 | def not_dead(self): |
| 87 | # XXX: reconsider. If the share sees a single error, should it remain |
| 88 | # dead for all time? Or should the next segment try again? Also, |
| 89 | # 'not_dead' is a dorky method name. This DEAD state is stored |
| 90 | # elsewhere too (SegmentFetcher per-share states?) and needs to be |
| 91 | # consistent. |
| 92 | return not self._dead |
| 93 | |
    def _guess_offsets(self, verifycap, guessed_segment_size):
        """Compute a guessed share layout before the real offset table has
        been fetched, by running the upload-side write-bucket-proxy code so
        the guess matches what an uploader would actually have written."""
        self.guessed_segment_size = guessed_segment_size
        size = verifycap.size
        k = verifycap.needed_shares
        N = verifycap.total_shares
        r = self._node._calculate_sizes(guessed_segment_size)
        # num_segments, block_size/tail_block_size
        # guessed_segment_size/tail_segment_size/tail_segment_padded
        share_size = mathutil.div_ceil(size, k)
        # share_size is the amount of block data that will be put into each
        # share, summed over all segments. It does not include hashes, the
        # UEB, or other overhead.

        # use the upload-side code to get this as accurate as possible
        ht = IncompleteHashTree(N)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
        wbp = make_write_bucket_proxy(None, share_size, r["block_size"],
                                      r["num_segments"], num_share_hashes, 0,
                                      None)
        # remember the offset-field encoding so we can later request and
        # parse the UEB length field
        self._fieldsize = wbp.fieldsize
        self._fieldstruct = wbp.fieldstruct
        self.guessed_offsets = wbp._offsets
| 116 | |
| 117 | # called by our client, the SegmentFetcher |
| 118 | def get_block(self, segnum): |
| 119 | """Add a block number to the list of requests. This will eventually |
| 120 | result in a fetch of the data necessary to validate the block, then |
| 121 | the block itself. The fetch order is generally |
| 122 | first-come-first-served, but requests may be answered out-of-order if |
| 123 | data becomes available sooner. |
| 124 | |
| 125 | I return an Observer2, which has two uses. The first is to call |
| 126 | o.subscribe(), which gives me a place to send state changes and |
| 127 | eventually the data block. The second is o.cancel(), which removes |
| 128 | the request (if it is still active). |
| 129 | |
| 130 | I will distribute the following events through my Observer2: |
| 131 | - state=OVERDUE: ?? I believe I should have had an answer by now. |
| 132 | You may want to ask another share instead. |
| 133 | - state=BADSEGNUM: the segnum you asked for is too large. I must |
| 134 | fetch a valid UEB before I can determine this, |
| 135 | so the notification is asynchronous |
| 136 | - state=COMPLETE, block=data: here is a valid block |
| 137 | - state=CORRUPT: this share contains corrupted data |
| 138 | - state=DEAD, f=Failure: the server reported an error, this share |
| 139 | is unusable |
| 140 | """ |
| 141 | log.msg("Share(sh%d-on-%s).get_block(%d)" % |
| 142 | (self._shnum, self._peerid_s, segnum), |
| 143 | level=log.NOISY, parent=self._lp, umid="RTo9MQ") |
| 144 | assert segnum >= 0 |
| 145 | o = Observer2() |
| 146 | o.set_canceler(self._cancel_block_request) |
| 147 | for i,(segnum0,observers) in enumerate(self._requested_blocks): |
| 148 | if segnum0 == segnum: |
| 149 | observers.add(o) |
| 150 | break |
| 151 | else: |
| 152 | self._requested_blocks.append( (segnum, set([o])) ) |
| 153 | eventually(self.loop) |
| 154 | return o |
| 155 | |
| 156 | def _cancel_block_request(self, o): |
| 157 | new_requests = [] |
| 158 | for e in self._requested_blocks: |
| 159 | (segnum0, observers) = e |
| 160 | observers.discard(o) |
| 161 | if observers: |
| 162 | new_requests.append(e) |
| 163 | self._requested_blocks = new_requests |
| 164 | |
| 165 | # internal methods |
| 166 | def _active_segnum(self): |
| 167 | if self._requested_blocks: |
| 168 | return self._requested_blocks[0] |
| 169 | return None |
| 170 | |
| 171 | def _active_segnum_and_observers(self): |
| 172 | if self._requested_blocks: |
| 173 | # we only retrieve information for one segment at a time, to |
| 174 | # minimize alacrity (first come, first served) |
| 175 | return self._requested_blocks[0] |
| 176 | return None, [] |
| 177 | |
    def loop(self):
        """Run one pass of this share's state machine. Any exception marks
        the share dead (notifying observers via _fail) and is re-raised so
        the caller's error handling also sees it."""
        try:
            # if any exceptions occur here, kill the download
            log.msg("Share(sh%d on %s).loop, reqs=[%s], wanted=%s, requested=%s, received=%s, wanted_blocks=%s" %
                    (self._shnum, self._peerid_s,
                     ",".join([str(req[0]) for req in self._requested_blocks]),
                     self._wanted.dump(), self._requested.dump(),
                     self._received.dump(), self._wanted_blocks.dump() ),
                    level=log.NOISY, parent=self._lp, umid="BaL1zw")
            self._do_loop()
        except BaseException:
            self._fail(Failure())
            raise
| 191 | |
    def _do_loop(self):
        """One pass: consume what we have, decide what more we desire, and
        send read requests for the difference."""
        # we are (eventually) called after all state transitions:
        #  new segments added to self._requested_blocks
        #  new data received from servers (responses to our read() calls)
        #  impatience timer fires (server appears slow)

        # First, consume all of the information that we currently have, for
        # all the segments people currently want.
        while self._get_satisfaction():
            pass

        # When we get no satisfaction (from the data we've received so far),
        # we determine what data we desire (to satisfy more requests). The
        # number of segments is finite, so I can't get no satisfaction
        # forever.
        self._desire()

        # finally send out requests for whatever we need (desire minus have).
        # You can't always get what you want, but, sometimes, you get what
        # you need.
        self._request_needed() # express desire
| 213 | |
    def _get_satisfaction(self):
        """Consume received data, in dependency order: offset table, UEB,
        share hash chain, block hash chain, ciphertext hash chain, then the
        data block itself.

        Return True if we retired a data block, and should therefore be
        called again. Return False if we don't retire a data block (even if
        we do retire some other data, like hash chains).
        """
        if self.actual_offsets is None:
            if not self._satisfy_offsets():
                # can't even look at anything without the offset table
                return False

        if not self._node.have_UEB:
            if not self._satisfy_UEB():
                # can't check any hashes without the UEB
                return False

        # knowing the UEB means knowing num_segments. Despite the redundancy,
        # this is the best place to set this. CommonShare.set_numsegs will
        # ignore duplicate calls.
        cs = self._commonshare
        cs.set_numsegs(self._node.num_segments)

        segnum, observers = self._active_segnum_and_observers()
        # NOTE(review): segnum may be None here; this comparison relies on
        # py2 ordering (None < any int), so the BADSEGNUM branch is simply
        # not taken in that case.
        if segnum >= self._node.num_segments:
            for o in observers:
                o.notify(state=BADSEGNUM)
            self._requested_blocks.pop(0)
            return True

        if self._node.share_hash_tree.needed_hashes(self._shnum):
            if not self._satisfy_share_hash_tree():
                # can't check block_hash_tree without a root
                return False

        if cs.need_block_hash_root():
            block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
            cs.set_block_hash_root(block_hash_root)

        if segnum is None:
            return False # we don't want any particular segment right now

        # block_hash_tree
        needed_hashes = self._commonshare.get_needed_block_hashes(segnum)
        if needed_hashes:
            if not self._satisfy_block_hash_tree(needed_hashes):
                # can't check block without block_hash_tree
                return False

        # ciphertext_hash_tree
        needed_hashes = self._node.get_needed_ciphertext_hashes(segnum)
        if needed_hashes:
            if not self._satisfy_ciphertext_hash_tree(needed_hashes):
                # can't check decoded blocks without ciphertext_hash_tree
                return False

        # data blocks
        return self._satisfy_data_block(segnum, observers)
| 270 | |
| 271 | def _satisfy_offsets(self): |
| 272 | version_s = self._received.get(0, 4) |
| 273 | if version_s is None: |
| 274 | return False |
| 275 | (version,) = struct.unpack(">L", version_s) |
| 276 | if version == 1: |
| 277 | table_start = 0x0c |
| 278 | self._fieldsize = 0x4 |
| 279 | self._fieldstruct = "L" |
| 280 | else: |
| 281 | table_start = 0x14 |
| 282 | self._fieldsize = 0x8 |
| 283 | self._fieldstruct = "Q" |
| 284 | offset_table_size = 6 * self._fieldsize |
| 285 | table_s = self._received.pop(table_start, offset_table_size) |
| 286 | if table_s is None: |
| 287 | return False |
| 288 | fields = struct.unpack(">"+6*self._fieldstruct, table_s) |
| 289 | offsets = {} |
| 290 | for i,field in enumerate(['data', |
| 291 | 'plaintext_hash_tree', # UNUSED |
| 292 | 'crypttext_hash_tree', |
| 293 | 'block_hashes', |
| 294 | 'share_hashes', |
| 295 | 'uri_extension', |
| 296 | ] ): |
| 297 | offsets[field] = fields[i] |
| 298 | self.actual_offsets = offsets |
| 299 | log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields)) |
| 300 | self._received.remove(0, 4) # don't need this anymore |
| 301 | return True |
| 302 | |
    def _satisfy_UEB(self):
        """Extract the URI extension block (stored as length-prefixed
        bytes) from received data, validate it, and hand it to the node.
        Return True on success, False if more data is needed or the UEB
        failed its hash check."""
        o = self.actual_offsets
        fsize = self._fieldsize
        rdata = self._received
        UEB_length_s = rdata.get(o["uri_extension"], fsize)
        if not UEB_length_s:
            return False
        (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
        UEB_s = rdata.pop(o["uri_extension"]+fsize, UEB_length)
        if not UEB_s:
            return False
        # the length prefix is no longer needed once the body is in hand
        rdata.remove(o["uri_extension"], fsize)
        try:
            self._node.validate_and_store_UEB(UEB_s)
            self.actual_segment_size = self._node.segment_size
            assert self.actual_segment_size is not None
            return True
        except BadHashError:
            # TODO: if this UEB was bad, we'll keep trying to validate it
            # over and over again. Only log.err on the first one, or better
            # yet skip all but the first
            f = Failure()
            self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
            return False
| 327 | |
    def _satisfy_share_hash_tree(self):
        """Parse the share-hash-chain region and feed it to the node's
        share hash tree. Return True if all hashes arrived and validated;
        False if more data is needed or validation failed."""
        # the share hash chain is stored as (hashnum,hash) tuples, so you
        # can't fetch just the pieces you need, because you don't know
        # exactly where they are. So fetch everything, and parse the results
        # later.
        o = self.actual_offsets
        rdata = self._received
        # the chain occupies everything between share_hashes and the UEB
        hashlen = o["uri_extension"] - o["share_hashes"]
        assert hashlen % (2+HASH_SIZE) == 0
        hashdata = rdata.get(o["share_hashes"], hashlen)
        if not hashdata:
            return False
        share_hashes = {}
        for i in range(0, hashlen, 2+HASH_SIZE):
            (hashnum,) = struct.unpack(">H", hashdata[i:i+2])
            hashvalue = hashdata[i+2:i+2+HASH_SIZE]
            share_hashes[hashnum] = hashvalue
        try:
            self._node.process_share_hashes(share_hashes)
            # adds to self._node.share_hash_tree
            rdata.remove(o["share_hashes"], hashlen)
            return True
        except (BadHashError, NotEnoughHashesError, IndexError):
            f = Failure()
            self._signal_corruption(f, o["share_hashes"], hashlen)
            return False
| 354 | |
| 355 | def _signal_corruption(self, f, start, offset): |
| 356 | # there was corruption somewhere in the given range |
| 357 | reason = "corruption in share[%d-%d): %s" % (start, start+offset, |
| 358 | str(f.value)) |
| 359 | self._rref.callRemoteOnly("advise_corrupt_share", "immutable", |
| 360 | self._storage_index, self._shnum, reason) |
| 361 | |
    def _satisfy_block_hash_tree(self, needed_hashes):
        """Collect the given block-hash-tree nodes from received data and
        feed them (all at once) to the CommonShare's tree. Return True if
        they were all present and validated."""
        o = self.actual_offsets
        rdata = self._received
        block_hashes = {}
        for hashnum in needed_hashes:
            hashdata = rdata.get(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
            if hashdata:
                block_hashes[hashnum] = hashdata
            else:
                return False # missing some hashes
        # note that we don't submit any hashes to the block_hash_tree until
        # we've gotten them all, because the hash tree will throw an
        # exception if we only give it a partial set (which it therefore
        # cannot validate)
        commonshare = self._commonshare
        ok = commonshare.process_block_hashes(block_hashes, self._peerid_s)
        if not ok:
            return False
        # only release the raw bytes after the tree accepted them
        for hashnum in needed_hashes:
            rdata.remove(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
        return True
| 383 | |
    def _satisfy_ciphertext_hash_tree(self, needed_hashes):
        """Collect the given ciphertext-hash-tree nodes from received data
        and feed them (all at once) to the node. Return True if they were
        all present and validated. Mirrors _satisfy_block_hash_tree."""
        start = self.actual_offsets["crypttext_hash_tree"]
        rdata = self._received
        ciphertext_hashes = {}
        for hashnum in needed_hashes:
            hashdata = rdata.get(start+hashnum*HASH_SIZE, HASH_SIZE)
            if hashdata:
                ciphertext_hashes[hashnum] = hashdata
            else:
                return False # missing some hashes
        # we don't submit any hashes to the ciphertext_hash_tree until we've
        # gotten them all
        ok = self._node.process_ciphertext_hashes(ciphertext_hashes,
                                                  self._shnum, self._peerid_s)
        if not ok:
            return False
        # only release the raw bytes after the tree accepted them
        for hashnum in needed_hashes:
            rdata.remove(start+hashnum*HASH_SIZE, HASH_SIZE)
        return True
| 403 | |
    def _satisfy_data_block(self, segnum, observers):
        """If the block for 'segnum' has fully arrived, check it against
        the block hash tree and notify the observers with COMPLETE (and
        the data) or CORRUPT. Return True iff the request was retired."""
        # the last segment's block may be shorter than the rest
        tail = (segnum == self._node.num_segments-1)
        datastart = self.actual_offsets["data"]
        blockstart = datastart + segnum * self._node.block_size
        blocklen = self._node.block_size
        if tail:
            blocklen = self._node.tail_block_size

        rdata = self._received
        block = rdata.pop(blockstart, blocklen)
        if not block:
            return False
        # this block is being retired, either as COMPLETE or CORRUPT, since
        # no further data reads will help
        assert self._requested_blocks[0][0] == segnum
        commonshare = self._commonshare
        ok = commonshare.check_block(segnum, block, self._peerid_s)
        if ok:
            for o in observers:
                # goes to SegmentFetcher._block_request_activity
                o.notify(state=COMPLETE, block=block)
        else:
            for o in observers:
                o.notify(state=CORRUPT)
        self._requested_blocks.pop(0) # retired
        return True # got satisfaction
| 430 | |
| 431 | def _desire(self): |
| 432 | segnum, observers = self._active_segnum_and_observers() |
| 433 | commonshare = self._commonshare |
| 434 | |
| 435 | if not self.actual_offsets: |
| 436 | self._desire_offsets() |
| 437 | |
| 438 | # we can use guessed offsets as long as this server tolerates overrun |
| 439 | if not self.actual_offsets and not self._overrun_ok: |
| 440 | return # must wait for the offsets to arrive |
| 441 | |
| 442 | o = self.actual_offsets or self.guessed_offsets |
| 443 | segsize = self.actual_segment_size or self.guessed_segment_size |
| 444 | if not self._node.have_UEB: |
| 445 | self._desire_UEB(o) |
| 446 | |
| 447 | if self._node.share_hash_tree.needed_hashes(self._shnum): |
| 448 | hashlen = o["uri_extension"] - o["share_hashes"] |
| 449 | self._wanted.add(o["share_hashes"], hashlen) |
| 450 | |
| 451 | if segnum is None: |
| 452 | return # only need block hashes or blocks for active segments |
| 453 | |
| 454 | # block hash chain |
| 455 | for hashnum in commonshare.get_needed_block_hashes(segnum): |
| 456 | self._wanted.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE) |
| 457 | |
| 458 | # ciphertext hash chain |
| 459 | for hashnum in self._node.get_needed_ciphertext_hashes(segnum): |
| 460 | self._wanted.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE) |
| 461 | |
| 462 | # data |
| 463 | r = self._node._calculate_sizes(segsize) |
| 464 | tail = (segnum == r["num_segments"]) |
| 465 | datastart = o["data"] |
| 466 | blockstart = datastart + segnum * r["block_size"] |
| 467 | blocklen = r["block_size"] |
| 468 | if tail: |
| 469 | blocklen = r["tail_block_size"] |
| 470 | self._wanted_blocks.add(blockstart, blocklen) |
| 471 | |
| 472 | |
| 473 | def _desire_offsets(self): |
| 474 | if self._overrun_ok: |
| 475 | # easy! this includes version number, sizes, and offsets |
| 476 | self._wanted.add(0,1024) |
| 477 | return |
| 478 | |
| 479 | # v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44). |
| 480 | # To be conservative, only request the data that we know lives there, |
| 481 | # even if that means more roundtrips. |
| 482 | |
| 483 | self._wanted.add(0,4) # version number, always safe |
| 484 | version_s = self._received.get(0, 4) |
| 485 | if not version_s: |
| 486 | return |
| 487 | (version,) = struct.unpack(">L", version_s) |
| 488 | if version == 1: |
| 489 | table_start = 0x0c |
| 490 | fieldsize = 0x4 |
| 491 | else: |
| 492 | table_start = 0x14 |
| 493 | fieldsize = 0x8 |
| 494 | offset_table_size = 6 * fieldsize |
| 495 | self._wanted.add(table_start, offset_table_size) |
| 496 | |
    def _desire_UEB(self, o):
        """Add the UEB (stored as length-prefixed data at o["uri_extension"])
        to self._wanted."""
        rdata = self._received
        if self._overrun_ok:
            # We can pre-fetch 2kb, which should probably cover it. If it
            # turns out to be larger, we'll come back here later with a known
            # length and fetch the rest.
            self._wanted.add(o["uri_extension"], 2048)
            # now, while that is probably enough to fetch the whole UEB, it
            # might not be, so we need to do the next few steps as well. In
            # most cases, the following steps will not actually add anything
            # to self._wanted

        # always want the length prefix itself
        self._wanted.add(o["uri_extension"], self._fieldsize)
        # only use a length if we're sure it's correct, otherwise we'll
        # probably fetch a huge number
        if not self.actual_offsets:
            return
        UEB_length_s = rdata.get(o["uri_extension"], self._fieldsize)
        if UEB_length_s:
            (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
            # we know the length, so make sure we grab everything
            self._wanted.add(o["uri_extension"]+self._fieldsize, UEB_length)
| 520 | |
| 521 | def _request_needed(self): |
| 522 | # send requests for metadata first, to avoid hanging on to large data |
| 523 | # blocks any longer than necessary. |
| 524 | received = self._received.get_spans() |
| 525 | self._send_requests(self._wanted - received - self._requested) |
| 526 | # then send requests for data blocks. All the hashes should arrive |
| 527 | # before the blocks, so the blocks can be consumed and released in a |
| 528 | # single turn. |
| 529 | self._send_requests(self._wanted_blocks - received - self._requested) |
| 530 | |
    def _send_requests(self, needed):
        """Issue one remote read() per span in 'needed', recording each
        span in self._requested so we don't ask for it twice."""
        for (start, length) in needed:
            # TODO: quantize to reasonably-large blocks
            self._requested.add(start, length)
            lp = log.msg(format="_send_request(sh%(shnum)d-on-%(peerid)s)"
                         " [%(start)d:+%(length)d]",
                         shnum=self._shnum, peerid=self._peerid_s,
                         start=start, length=length,
                         level=log.NOISY, parent=self._lp, umid="sgVAyA")
            d = self._send_request(start, length)
            d.addCallback(self._got_data, start, length, lp)
            d.addErrback(self._got_error, start, length, lp)
            # re-run the state machine after each response, success or not
            d.addCallback(incidentally, eventually, self.loop)
            # last-resort logging so no failure vanishes silently
            d.addErrback(lambda f:
                         log.err(format="unhandled error during send_request",
                                 failure=f, parent=self._lp,
                                 level=log.WEIRD, umid="qZu0wg"))
| 548 | |
    def _send_request(self, start, length):
        """Issue a single remote read for [start, start+length); returns
        a Deferred that fires with the data."""
        return self._rref.callRemote("read", start, length)
| 551 | |
    def _got_data(self, data, start, length, lp):
        """Callback for a remote read: move the span from 'requested' to
        'received' (storing the bytes for later consumption)."""
        log.msg(format="_got_data [%(start)d:+%(length)d] -> %(datalen)d",
                start=start, length=length, datalen=len(data),
                level=log.NOISY, parent=lp, umid="sgVAyA")
        span = (start, length)
        assert span in self._requested
        self._requested.remove(start, length)
        self._received.add(start, data)
| 560 | |
    def _got_error(self, f, start, length, lp):
        """Errback for a remote read: log the failure and mark this whole
        share as dead."""
        log.msg(format="error requesting %(start)d+%(length)d"
                " from %(server)s for si %(si)s",
                start=start, length=length,
                server=self._peerid_s, si=self._si_prefix,
                failure=f, parent=lp, level=log.UNUSUAL, umid="qZu0wg")
        # retire our observers, assuming we won't be able to make any
        # further progress
        self._fail(f)
| 570 | |
| 571 | def _fail(self, f): |
| 572 | log.msg(format="abandoning Share(sh%(shnum)d-on-%(peerid)s", |
| 573 | failure=f, shnum=self._shnum, peerid=self._peerid_s, |
| 574 | level=log.UNUSUAL, parent=self._lp, umid="JKM2Og") |
| 575 | self._dead = True |
| 576 | for (segnum, observers) in self._requested_blocks: |
| 577 | for o in observers: |
| 578 | o.notify(state=DEAD, f=f) |
| 579 | |
| 580 | |
class CommonShare:
    """I hold data that is common across all instances of a single share,
    like sh2 on both servers A and B. This is just the block hash tree.
    """
    def __init__(self, guessed_numsegs, si_prefix, shnum, logparent):
        """guessed_numsegs: a segment-count guess used to size the block
        hash tree until the real UEB tells us the true count."""
        self.si_prefix = si_prefix
        self.shnum = shnum
        # in the beginning, before we have the real UEB, we can only guess at
        # the number of segments. But we want to ask for block hashes early.
        # So if we're asked for which block hashes are needed before we know
        # numsegs for sure, we return a guess.
        self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
        self._know_numsegs = False
        self._logparent = logparent

    def set_numsegs(self, numsegs):
        """Learn the authoritative segment count. The first call replaces
        the guessed hash tree; duplicate calls are ignored."""
        if self._know_numsegs:
            return
        self._block_hash_tree = IncompleteHashTree(numsegs)
        self._know_numsegs = True

    def need_block_hash_root(self):
        """Return True while the root (node 0) of the block hash tree is
        still unknown."""
        return bool(not self._block_hash_tree[0])

    def set_block_hash_root(self, roothash):
        """Install 'roothash' as node 0 of the block hash tree."""
        lp = log.msg("CommonShare.set_block_hash_root: %s" % repr(roothash),
                     level=log.NOISY, parent=self._logparent, umid="wwG5Gw")
        self._block_hash_tree.set_hashes({0: roothash})
        log.msg("done with set_block_hash_root",
                level=log.NOISY, parent=lp, umid="xjWKcw")

    def get_needed_block_hashes(self, segnum):
        """Return the tree-node numbers still needed to validate the block
        for 'segnum', including the leaf itself."""
        needed = ",".join([str(n) for n in sorted(self._block_hash_tree.needed_hashes(segnum))])
        log.msg("CommonShare.get_needed_block_hashes: segnum=%d needs %s" %
                (segnum, needed),
                level=log.NOISY, parent=self._logparent, umid="6qTMnw")
        # XXX: include_leaf=True needs thought: how did the old downloader do
        # it? I think it grabbed *all* block hashes and set them all at once.
        # Since we want to fetch less data, we either need to fetch the leaf
        # too, or wait to set the block hashes until we've also received the
        # block itself, so we can hash it too, and set the chain+leaf all at
        # the same time.
        return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)

    def process_block_hashes(self, block_hashes, serverid_s):
        """Feed a complete {hashnum: hash} chain into the tree. Return True
        if they validate, False (after logging) if they do not."""
        assert self._know_numsegs
        try:
            self._block_hash_tree.set_hashes(block_hashes)
            return True
        except (BadHashError, NotEnoughHashesError):
            hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
            log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
                    " shnum=%(shnum)d SI=%(si)s server=%(server)s",
                    hashnums=hashnums, shnum=self.shnum,
                    si=self.si_prefix, server=serverid_s, failure=Failure(),
                    level=log.WEIRD, parent=self._logparent, umid="yNyFdA")
            return False

    def check_block(self, segnum, block, serverid_s):
        """Hash 'block' and validate it as leaf 'segnum' of the tree.
        Return True if it checks out, False (after logging) if not."""
        assert self._know_numsegs
        h = hashutil.block_hash(block)
        try:
            self._block_hash_tree.set_hashes(leaves={segnum: h})
            return True
        except (BadHashError, NotEnoughHashesError):
            # fix: this previously called self.log(...), but CommonShare
            # has no 'log' method, so a corrupt block raised AttributeError
            # instead of being logged. Use the module-level log.msg, as
            # process_block_hashes does.
            log.msg(format="hash failure in block %(segnum)d,"
                    " shnum=%(shnum)d SI=%(si)s server=%(server)s",
                    segnum=segnum, shnum=self.shnum, si=self.si_prefix,
                    server=serverid_s, failure=Failure(),
                    level=log.WEIRD, parent=self._logparent, umid="mZjkqA")
            return False
| 652 | |
| 653 | # all classes are also Services, and the rule is that you don't initiate more |
| 654 | # work unless self.running |
| 655 | |
| 656 | # GC: decide whether each service is restartable or not. For non-restartable |
| 657 | # services, stopService() should delete a lot of attributes to kill reference |
| 658 | # cycles. The primary goal is to decref remote storage BucketReaders when a |
| 659 | # download is complete. |
| 660 | |
| 661 | class SegmentFetcher: |
| 662 | """I am responsible for acquiring blocks for a single segment. I will use |
| 663 | the Share instances passed to my add_shares() method to locate, retrieve, |
| 664 | and validate those blocks. I expect my parent node to call my |
| 665 | no_more_shares() method when there are no more shares available. I will |
| 666 | call my parent's want_more_shares() method when I want more: I expect to |
| 667 | see at least one call to add_shares or no_more_shares afterwards. |
| 668 | |
| 669 | When I have enough validated blocks, I will call my parent's |
| 670 | process_blocks() method with a dictionary that maps shnum to blockdata. |
| 671 | If I am unable to provide enough blocks, I will call my parent's |
| 672 | fetch_failed() method with (self, f). After either of these events, I |
| 673 | will shut down and do no further work. My parent can also call my stop() |
| 674 | method to have me shut down early.""" |
| 675 | |
    def __init__(self, node, segnum, k):
        """Fetch blocks for one segment.

        node: the parent _Node; we call its process_blocks / fetch_failed /
              want_more_shares methods.
        segnum: index of the segment to fetch.
        k: how many distinct validated blocks are needed before decode.
        """
        self._node = node # _Node
        self.segnum = segnum
        self._k = k
        self._shares = {} # maps non-dead Share instance to a state, one of
                          # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
                          # State transition map is:
                          #  AVAILABLE -(send-read)-> PENDING
                          #  PENDING -(timer)-> OVERDUE
                          #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
                          #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
                          # If a share becomes DEAD, it is removed from the
                          # dict. If it becomes BADSEGNUM, the whole fetch is
                          # terminated.
        self._share_observers = {} # maps Share to Observer2 for active ones
        self._shnums = DictOfSets() # maps shnum to the shares that provide it
        self._blocks = {} # maps shnum to validated block data
        self._no_more_shares = False
        self._bad_segnum = False
        self._last_failure = None
        self._running = True
| 697 | |
    def stop(self):
        """Shut down: cancel all outstanding block requests and stop
        initiating new work."""
        self._cancel_all_requests()
        self._running = False
        del self._shares # let GC work # ??? XXX
| 702 | |
| 703 | |
| 704 | # called by our parent _Node |
| 705 | |
| 706 | def add_shares(self, shares): |
| 707 | # called when ShareFinder locates a new share, and when a non-initial |
| 708 | # segment fetch is started and we already know about shares from the |
| 709 | # previous segment |
| 710 | for s in shares: |
| 711 | self._shares[s] = AVAILABLE |
| 712 | self._shnums.add(s._shnum, s) |
| 713 | eventually(self.loop) |
| 714 | |
| 715 | def no_more_shares(self): |
| 716 | # ShareFinder tells us it's reached the end of its list |
| 717 | self._no_more_shares = True |
| 718 | eventually(self.loop) |
| 719 | |
| 720 | # internal methods |
| 721 | |
| 722 | def _count_shnums(self, *states): |
| 723 | """shnums for which at least one state is in the following list""" |
| 724 | shnums = [] |
| 725 | for shnum,shares in self._shnums.iteritems(): |
| 726 | matches = [s for s in shares if self._shares[s] in states] |
| 727 | if matches: |
| 728 | shnums.append(shnum) |
| 729 | return len(shnums) |
| 730 | |
    def loop(self):
        # Run one pass of the fetch state machine. Any exception in
        # _do_loop is fatal to the whole download, so report it to our
        # parent before re-raising it.
        try:
            # if any exception occurs here, kill the download
            self._do_loop()
        except BaseException:
            self._node.fetch_failed(self, Failure())
            raise
| 738 | |
| 739 | def _do_loop(self): |
| 740 | k = self._k |
| 741 | if not self._running: |
| 742 | return |
| 743 | if self._bad_segnum: |
| 744 | # oops, we were asking for a segment number beyond the end of the |
| 745 | # file. This is an error. |
| 746 | self.stop() |
| 747 | e = BadSegmentNumberError("%d > %d" % (self.segnum, |
| 748 | self._node.num_segments)) |
| 749 | f = Failure(e) |
| 750 | self._node.fetch_failed(self, f) |
| 751 | return |
| 752 | |
| 753 | # are we done? |
| 754 | if self._count_shnums(COMPLETE) >= k: |
| 755 | # yay! |
| 756 | self.stop() |
| 757 | self._node.process_blocks(self.segnum, self._blocks) |
| 758 | return |
| 759 | |
| 760 | # we may have exhausted everything |
| 761 | if (self._no_more_shares and |
| 762 | self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k): |
| 763 | # no more new shares are coming, and the remaining hopeful shares |
| 764 | # aren't going to be enough. boo! |
| 765 | self.stop() |
| 766 | |
| 767 | if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0: |
| 768 | format = ("no shares (need %(k)d)." |
| 769 | " Last failure: %(last_failure)s") |
| 770 | args = { "k": k, |
| 771 | "last_failure": self._last_failure } |
| 772 | error = NoSharesError |
| 773 | else: |
| 774 | format = ("ran out of shares: %(complete)d complete," |
| 775 | " %(pending)d pending, %(overdue)d overdue," |
| 776 | " %(unused)d unused, need %(k)d." |
| 777 | " Last failure: %(last_failure)s") |
| 778 | args = {"complete": self._count_shnums(COMPLETE), |
| 779 | "pending": self._count_shnums(PENDING), |
| 780 | "overdue": self._count_shnums(OVERDUE), |
| 781 | # 'unused' should be zero |
| 782 | "unused": self._count_shnums(AVAILABLE), |
| 783 | "k": k, |
| 784 | "last_failure": self._last_failure, |
| 785 | } |
| 786 | error = NotEnoughSharesError |
| 787 | log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args) |
| 788 | e = error(format % args) |
| 789 | f = Failure(e) |
| 790 | self._node.fetch_failed(self, f) |
| 791 | return |
| 792 | |
| 793 | # nope, not done. Are we "block-hungry" (i.e. do we want to send out |
| 794 | # more read requests, or do we think we have enough in flight |
| 795 | # already?) |
| 796 | while self._count_shnums(PENDING, COMPLETE) < k: |
| 797 | # we're hungry.. are there any unused shares? |
| 798 | sent = self._send_new_request() |
| 799 | if not sent: |
| 800 | break |
| 801 | |
| 802 | # ok, now are we "share-hungry" (i.e. do we have enough known shares |
| 803 | # to make us happy, or should we ask the ShareFinder to get us more?) |
| 804 | if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k: |
| 805 | # we're hungry for more shares |
| 806 | self._node.want_more_shares() |
| 807 | # that will trigger the ShareFinder to keep looking |
| 808 | |
| 809 | def _find_one(self, shares, state): |
| 810 | # TODO could choose fastest |
| 811 | for s in shares: |
| 812 | if self._shares[s] == state: |
| 813 | return s |
| 814 | raise IndexError("shouldn't get here") |
| 815 | |
    def _send_new_request(self):
        """Pick one shnum that has no request outstanding and an AVAILABLE
        share, and send a block read for it. Return True if a request was
        sent, False if nothing could be sent."""
        for shnum,shares in self._shnums.iteritems():
            states = [self._shares[s] for s in shares]
            if COMPLETE in states or PENDING in states:
                # don't send redundant requests
                continue
            if AVAILABLE not in states:
                # no candidates for this shnum, move on
                continue
            # here's a candidate. Send a request.
            s = self._find_one(shares, AVAILABLE)
            self._shares[s] = PENDING
            self._share_observers[s] = o = s.get_block(self.segnum)
            # the observer fires _block_request_activity when the share
            # delivers (or fails to deliver) the block
            o.subscribe(self._block_request_activity, share=s, shnum=shnum)
            # TODO: build up a list of candidates, then walk through the
            # list, sending requests to the most desireable servers,
            # re-checking our block-hunger each time. For non-initial segment
            # fetches, this would let us stick with faster servers.
            return True
        # nothing was sent: don't call us again until you have more shares to
        # work with, or one of the existing shares has been declared OVERDUE
        return False
| 838 | |
| 839 | def _cancel_all_requests(self): |
| 840 | for o in self._share_observers.values(): |
| 841 | o.cancel() |
| 842 | self._share_observers = {} |
| 843 | |
    def _block_request_activity(self, share, shnum, state, block=None, f=None):
        # called by Shares, in response to our s.send_request() calls.
        # 'block' accompanies COMPLETE; 'f' (a Failure) accompanies DEAD.
        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
            # terminal: no more notifications will arrive for this share
            del self._share_observers[share]
        if state is COMPLETE:
            # 'block' is fully validated
            self._shares[share] = COMPLETE
            self._blocks[shnum] = block
        elif state is OVERDUE:
            self._shares[share] = OVERDUE
            # OVERDUE is not terminal: it will eventually transition to
            # COMPLETE, CORRUPT, or DEAD.
        elif state is CORRUPT:
            self._shares[share] = CORRUPT
        elif state is DEAD:
            # forget the share entirely, but remember why it failed, for
            # the final error report
            del self._shares[share]
            self._shnums[shnum].remove(share)
            self._last_failure = f
        elif state is BADSEGNUM:
            self._shares[share] = BADSEGNUM # ???
            self._bad_segnum = True
        eventually(self.loop)
| 867 | |
| 868 | |
class RequestToken:
    """Opaque token identifying one outstanding DYHB query.

    ShareFinder keeps these in self.pending_requests so that in-flight
    queries can be counted, logged, and discarded on completion; only the
    queried server's peerid is remembered."""
    def __init__(self, peerid):
        self.peerid = peerid
    def __repr__(self):
        # aid debugging: show which server this token belongs to
        return "<RequestToken peerid=%r>" % (self.peerid,)
| 872 | |
class ShareFinder:
    """I locate shares for one file by sending do-you-have-block (DYHB)
    queries, one server at a time, to the servers announced for this
    storage index. Each share I discover becomes a Share instance, which
    I deliver to my node (the 'share_consumer') whenever it has declared
    itself hungry()."""
    def __init__(self, storage_broker, verifycap, node, logparent=None,
                 max_outstanding_requests=10):
        self.running = True
        self.verifycap = verifycap
        s = storage_broker.get_servers_for_index(verifycap.storage_index)
        # iterator over (peerid, rref) servers; set to None once exhausted
        self._servers = iter(s)
        self.share_consumer = self.node = node
        self.max_outstanding_requests = max_outstanding_requests

        self._hungry = False

        self._commonshares = {} # shnum to CommonShare instance
        self.undelivered_shares = [] # Shares found but not yet handed over
        self.pending_requests = set() # RequestTokens for in-flight DYHBs

        self._storage_index = verifycap.storage_index
        self._si_prefix = base32.b2a_l(self._storage_index[:8], 60)
        self._node_logparent = logparent
        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
                           si=self._si_prefix,
                           level=log.NOISY, parent=logparent, umid="2xjj2A")

    def log(self, *args, **kwargs):
        # log under our own logparent unless the caller supplies one
        if "parent" not in kwargs:
            kwargs["parent"] = self._lp
        return log.msg(*args, **kwargs)

    def stop(self):
        self.running = False

    # called by our parent CiphertextDownloader
    def hungry(self):
        # deliver one undelivered share, or send another DYHB query, on the
        # next turn
        self.log(format="ShareFinder[si=%(si)s] hungry",
                 si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
        self._hungry = True
        eventually(self.loop)

    # internal methods
    def loop(self):
        # one pass: deliver a share if we have one, else query another
        # server, else declare no_more_shares when nothing is in flight
        undelivered_s = ",".join(["sh%d@%s" %
                                  (s._shnum, idlib.shortnodeid_b2a(s._peerid))
                                  for s in self.undelivered_shares])
        pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
                              for rt in self.pending_requests]) # sort?
        self.log(format="ShareFinder loop: running=%(running)s"
                 " hungry=%(hungry)s, undelivered=%(undelivered)s,"
                 " pending=%(pending)s",
                 running=self.running, hungry=self._hungry,
                 undelivered=undelivered_s, pending=pending_s,
                 level=log.NOISY, umid="kRtS4Q")
        if not self.running:
            return
        if not self._hungry:
            return
        if self.undelivered_shares:
            sh = self.undelivered_shares.pop(0)
            # they will call hungry() again if they want more
            self._hungry = False
            self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)",
                     shnum=sh._shnum, peerid=sh._peerid_s,
                     level=log.NOISY, umid="2n1qQw")
            eventually(self.share_consumer.got_shares, [sh])
            return
        if len(self.pending_requests) >= self.max_outstanding_requests:
            # cannot send more requests, must wait for some to retire
            return

        server = None
        try:
            if self._servers:
                server = self._servers.next()
        except StopIteration:
            self._servers = None # exhausted: remember, and stop iterating

        if server:
            self.send_request(server)
            return

        if self.pending_requests:
            # no server, but there are still requests in flight: maybe one of
            # them will make progress
            return

        self.log(format="ShareFinder.loop: no_more_shares, ever",
                 level=log.UNUSUAL, umid="XjQlzg")
        # we've run out of servers (so we can't send any more requests), and
        # we have nothing in flight. No further progress can be made. They
        # are destined to remain hungry.
        self.share_consumer.no_more_shares()
        self.stop()

    def send_request(self, server):
        # send one DYHB ('get_buckets') query to the given (peerid, rref)
        peerid, rref = server
        req = RequestToken(peerid)
        self.pending_requests.add(req)
        lp = self.log(format="sending DYHB to [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, umid="Io7pyg")
        d = rref.callRemote("get_buckets", self._storage_index)
        # retire the token whether the query succeeds or fails
        d.addBoth(incidentally, self.pending_requests.discard, req)
        d.addCallbacks(self._got_response, self._got_error,
                       callbackArgs=(rref.version, peerid, req, lp),
                       errbackArgs=(peerid, req, lp))
        d.addErrback(log.err, format="error in send_request",
                     level=log.WEIRD, parent=lp, umid="rpdV0w")
        # and take another pass, which may deliver shares or query again
        d.addCallback(incidentally, eventually, self.loop)

    def _got_response(self, buckets, server_version, peerid, req, lp):
        # 'buckets' maps shnum to a bucket rref; may be empty
        if buckets:
            shnums_s = ",".join([str(shnum) for shnum in buckets])
            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
                     shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, parent=lp, umid="0fcEZw")
        else:
            self.log(format="no shares from [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, parent=lp, umid="U7d4JA")
        if self.node.num_segments is None:
            best_numsegs = self.node.guessed_num_segments
        else:
            best_numsegs = self.node.num_segments
        for shnum, bucket in buckets.iteritems():
            # reuse one CommonShare per shnum across all servers
            if shnum in self._commonshares:
                cs = self._commonshares[shnum]
            else:
                cs = CommonShare(best_numsegs, self._si_prefix, shnum,
                                 self._node_logparent)
                # Share._get_satisfaction is responsible for updating
                # CommonShare.set_numsegs after we know the UEB. Alternatives:
                #  1: d = self.node.get_num_segments()
                #     d.addCallback(cs.got_numsegs)
                #   the problem is that the OneShotObserverList I was using
                #   inserts an eventual-send between _get_satisfaction's
                #   _satisfy_UEB and _satisfy_block_hash_tree, and the
                #   CommonShare didn't get the num_segs message before
                #   being asked to set block hash values. To resolve this
                #   would require an immediate ObserverList instead of
                #   an eventual-send -based one
                #  2: break _get_satisfaction into Deferred-attached pieces.
                #     Yuck.
                self._commonshares[shnum] = cs
            s = Share(bucket, server_version, self.verifycap, cs, self.node,
                      peerid, shnum, self._node_logparent)
            self.undelivered_shares.append(s)

    def _got_error(self, f, peerid, req, lp):
        # best-effort: a failed DYHB just means no shares from that server
        self.log(format="got error from [%(peerid)s]",
                 peerid=idlib.shortnodeid_b2a(peerid), failure=f,
                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")
| 1023 | |
| 1024 | |
| 1025 | |
class Segmentation:
    """I am responsible for a single offset+size read of the file. I handle
    segmentation: I figure out which segments are necessary, request them
    (from my CiphertextDownloader) in order, and trim the segments down to
    match the offset+size span. I use the Producer/Consumer interface to only
    request one segment at a time.
    """
    implements(IPushProducer)
    def __init__(self, node, offset, size, consumer, logparent=None):
        self._node = node
        self._hungry = True
        self._active_segnum = None
        self._cancel_segment_request = None
        # these are updated as we deliver data. At any given time, we still
        # want to download file[offset:offset+size]
        self._offset = offset
        self._size = size
        self._consumer = consumer
        self._lp = logparent

    def start(self):
        """Begin the read. Returns a Deferred that fires (with the consumer)
        when the full span has been delivered, or errbacks on failure."""
        self._alive = True
        self._deferred = defer.Deferred()
        self._consumer.registerProducer(self, True)
        self._maybe_fetch_next()
        return self._deferred

    def _maybe_fetch_next(self):
        # fetch the next segment unless we are dead, paused, or already
        # waiting for one
        if not self._alive or not self._hungry:
            return
        if self._active_segnum is not None:
            return
        self._fetch_next()

    def _fetch_next(self):
        if self._size == 0:
            # done!
            self._alive = False
            self._hungry = False
            self._consumer.unregisterProducer()
            self._deferred.callback(self._consumer)
            return
        n = self._node
        have_actual_segment_size = n.segment_size is not None
        segment_size = n.segment_size or n.guessed_segment_size
        if self._offset == 0:
            # great! we want segment0 for sure
            wanted_segnum = 0
        else:
            # this might be a guess
            wanted_segnum = self._offset // segment_size
        log.msg(format="_fetch_next(offset=%(offset)d) wants segnum=%(segnum)d",
                offset=self._offset, segnum=wanted_segnum,
                level=log.NOISY, parent=self._lp, umid="5WfN0w")
        self._active_segnum = wanted_segnum
        d,c = n.get_segment(wanted_segnum, self._lp)
        self._cancel_segment_request = c
        d.addBoth(self._request_retired)
        d.addCallback(self._got_segment, have_actual_segment_size,
                      wanted_segnum)
        d.addErrback(self._retry_bad_segment, have_actual_segment_size)
        d.addErrback(self._error)

    def _request_retired(self, res):
        # addBoth passthrough: clear the in-flight markers, keep the result
        self._active_segnum = None
        self._cancel_segment_request = None
        return res

    def _got_segment(self, start_and_segment, had_actual_segment_size,
                     wanted_segnum):
        # Tuple parameters were removed from the language by PEP 3113, so
        # unpack the (offset, data) result explicitly instead of in the
        # signature.
        (segment_start, segment) = start_and_segment
        self._cancel_segment_request = None
        # we got file[segment_start:segment_start+len(segment)]
        # we want file[self._offset:self._offset+self._size]
        log.msg(format="Segmentation got data:"
                " want [%(wantstart)d-%(wantend)d),"
                " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
                wantstart=self._offset, wantend=self._offset+self._size,
                segstart=segment_start, segend=segment_start+len(segment),
                segnum=wanted_segnum,
                level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")

        o = overlap(segment_start, len(segment), self._offset, self._size)
        # the overlap is file[o[0]:o[0]+o[1]]
        if not o or o[0] != self._offset:
            # we didn't get the first byte, so we can't use this segment
            if self._node.segment_size is not None:
                # and we should have gotten it right. This is big problem.
                log.msg("Segmentation handed wrong data (but we knew better):"
                        " want [%d-%d), given [%d-%d), for segnum=%d,"
                        " for si=%s"
                        % (self._offset, self._offset+self._size,
                           segment_start, segment_start+len(segment),
                           wanted_segnum, self._node._si_prefix),
                        level=log.WEIRD, parent=self._lp, umid="STlIiA")
                raise BadSegmentError("Despite knowing the segment size,"
                                      " I was given the wrong data."
                                      " I cannot cope.")
            # we've wasted some bandwidth, but now we can grab the right one,
            # because we should know the segsize by now.
            assert self._node.segment_size is not None
            self._maybe_fetch_next()
            return
        offset_in_segment = self._offset - segment_start
        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]

        self._offset += len(desired_data)
        self._size -= len(desired_data)
        self._consumer.write(desired_data)
        # the consumer might call our .pauseProducing() inside that write()
        # call, setting self._hungry=False
        self._maybe_fetch_next()

    def _retry_bad_segment(self, f, had_actual_segment_size):
        f.trap(BadSegmentNumberError) # guessed way wrong, off the end
        if had_actual_segment_size:
            # but we should have known better, so this is a real error
            return f
        # we didn't know better: try again with more information
        return self._maybe_fetch_next()

    def _error(self, f):
        # include the failure in the log entry, so it is recorded even if
        # the read() caller ignores the errback
        log.msg("Error in Segmentation", failure=f,
                level=log.WEIRD, parent=self._lp, umid="EYlXBg")
        self._alive = False
        self._hungry = False
        self._consumer.unregisterProducer()
        self._deferred.errback(f)

    def stopProducing(self):
        self._hungry = False
        self._alive = False
        # cancel any outstanding segment request
        if self._cancel_segment_request:
            self._cancel_segment_request()
            self._cancel_segment_request = None
    def pauseProducing(self):
        self._hungry = False
    def resumeProducing(self):
        self._hungry = True
        eventually(self._maybe_fetch_next)
| 1166 | |
class Cancel:
    """One-shot cancellation handle: the first call to cancel() invokes the
    stored callback with this instance; subsequent calls are ignored."""
    def __init__(self, f):
        self._f = f
        self.cancelled = False

    def cancel(self):
        if self.cancelled:
            return
        self.cancelled = True
        self._f(self)
| 1175 | |
| 1176 | class _Node: |
| 1177 | """Internal class which manages downloads and holds state. External |
| 1178 | callers use CiphertextFileNode instead.""" |
| 1179 | |
| 1180 | # Share._node points to me |
| 1181 | def __init__(self, verifycap, storage_broker, secret_holder, |
| 1182 | terminator, history): |
| 1183 | assert isinstance(verifycap, uri.CHKFileVerifierURI) |
| 1184 | self._verifycap = verifycap |
| 1185 | self._storage_broker = storage_broker |
| 1186 | self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60) |
| 1187 | self.running = True |
| 1188 | terminator.register(self) # calls self.stop() at stopService() |
| 1189 | # the rules are: |
| 1190 | # 1: Only send network requests if you're active (self.running is True) |
| 1191 | # 2: Use TimerService, not reactor.callLater |
| 1192 | # 3: You can do eventual-sends any time. |
| 1193 | # These rules should mean that once |
| 1194 | # stopService()+flushEventualQueue() fires, everything will be done. |
| 1195 | self._secret_holder = secret_holder |
| 1196 | self._history = history |
| 1197 | |
| 1198 | k, N = self._verifycap.needed_shares, self._verifycap.total_shares |
| 1199 | self.share_hash_tree = IncompleteHashTree(N) |
| 1200 | |
| 1201 | # we guess the segment size, so Segmentation can pull non-initial |
| 1202 | # segments in a single roundtrip |
| 1203 | max_segment_size = 128*KiB # TODO: pull from elsewhere, maybe the |
| 1204 | # same place as upload.BaseUploadable |
| 1205 | s = mathutil.next_multiple(min(verifycap.size, max_segment_size), k) |
| 1206 | self.guessed_segment_size = s |
| 1207 | r = self._calculate_sizes(self.guessed_segment_size) |
| 1208 | self.guessed_num_segments = r["num_segments"] |
| 1209 | # as with CommonShare, our ciphertext_hash_tree is a stub until we |
| 1210 | # get the real num_segments |
| 1211 | self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments) |
| 1212 | |
| 1213 | # filled in when we parse a valid UEB |
| 1214 | self.have_UEB = False |
| 1215 | self.segment_size = None |
| 1216 | self.tail_segment_size = None |
| 1217 | self.tail_segment_padded = None |
| 1218 | self.num_segments = None |
| 1219 | self.block_size = None |
| 1220 | self.tail_block_size = None |
| 1221 | #self.ciphertext_hash_tree = None # size depends on num_segments |
| 1222 | self.ciphertext_hash = None # flat hash, optional |
| 1223 | |
| 1224 | # things to track callers that want data |
| 1225 | |
| 1226 | # _segment_requests can have duplicates |
| 1227 | self._segment_requests = [] # (segnum, d, cancel_handle) |
| 1228 | self._active_segment = None # a SegmentFetcher, with .segnum |
| 1229 | |
| 1230 | # we create one top-level logparent for this _Node, and another one |
| 1231 | # for each read() call. Segmentation and get_segment() messages are |
| 1232 | # associated with the read() call, everything else is tied to the |
| 1233 | # _Node's log entry. |
| 1234 | lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d," |
| 1235 | " guessed_segsize=%(guessed_segsize)d," |
| 1236 | " guessed_numsegs=%(guessed_numsegs)d", |
| 1237 | si=self._si_prefix, size=verifycap.size, |
| 1238 | guessed_segsize=self.guessed_segment_size, |
| 1239 | guessed_numsegs=self.guessed_num_segments, |
| 1240 | level=log.OPERATIONAL, umid="uJ0zAQ") |
| 1241 | self._lp = lp |
| 1242 | |
| 1243 | self._sharefinder = ShareFinder(storage_broker, verifycap, self, lp) |
| 1244 | self._shares = set() |
| 1245 | |
| 1246 | def stop(self): |
| 1247 | # called by the Terminator at shutdown, mostly for tests |
| 1248 | if self._active_segment: |
| 1249 | self._active_segment.stop() |
| 1250 | self._active_segment = None |
| 1251 | self._sharefinder.stop() |
| 1252 | |
| 1253 | # things called by outside callers, via CiphertextFileNode. get_segment() |
| 1254 | # may also be called by Segmentation. |
| 1255 | |
| 1256 | def read(self, consumer, offset=0, size=None): |
| 1257 | """I am the main entry point, from which FileNode.read() can get |
| 1258 | data. I feed the consumer with the desired range of ciphertext. I |
| 1259 | return a Deferred that fires (with the consumer) when the read is |
| 1260 | finished. |
| 1261 | |
| 1262 | Note that there is no notion of a 'file pointer': each call to read() |
| 1263 | uses an independent offset= value.""" |
| 1264 | # for concurrent operations: each gets its own Segmentation manager |
| 1265 | if size is None: |
| 1266 | size = self._verifycap.size |
| 1267 | # clip size so offset+size does not go past EOF |
| 1268 | size = min(size, self._verifycap.size-offset) |
| 1269 | lp = log.msg(format="imm Node(%(si)s.read(%(offset)d, %(size)d)", |
| 1270 | si=base32.b2a(self._verifycap.storage_index)[:8], |
| 1271 | offset=offset, size=size, |
| 1272 | level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww") |
| 1273 | sp = self._history.stats_provider |
| 1274 | sp.count("downloader.files_downloaded", 1) # really read() calls |
| 1275 | sp.count("downloader.bytes_downloaded", size) |
| 1276 | s = Segmentation(self, offset, size, consumer, lp) |
| 1277 | # this raises an interesting question: what segments to fetch? if |
| 1278 | # offset=0, always fetch the first segment, and then allow |
| 1279 | # Segmentation to be responsible for pulling the subsequent ones if |
| 1280 | # the first wasn't large enough. If offset>0, we're going to need an |
| 1281 | # extra roundtrip to get the UEB (and therefore the segment size) |
| 1282 | # before we can figure out which segment to get. TODO: allow the |
| 1283 | # offset-table-guessing code (which starts by guessing the segsize) |
| 1284 | # to assist the offset>0 process. |
| 1285 | d = s.start() |
| 1286 | return d |
| 1287 | |
| 1288 | def get_segment(self, segnum, logparent=None): |
| 1289 | """Begin downloading a segment. I return a tuple (d, c): 'd' is a |
| 1290 | Deferred that fires with (offset,data) when the desired segment is |
| 1291 | available, and c is an object on which c.cancel() can be called to |
| 1292 | disavow interest in the segment (after which 'd' will never fire). |
| 1293 | |
| 1294 | You probably need to know the segment size before calling this, |
| 1295 | unless you want the first few bytes of the file. If you ask for a |
| 1296 | segment number which turns out to be too large, the Deferred will |
| 1297 | errback with BadSegmentNumberError. |
| 1298 | |
| 1299 | The Deferred fires with the offset of the first byte of the data |
| 1300 | segment, so that you can call get_segment() before knowing the |
| 1301 | segment size, and still know which data you received. |
| 1302 | |
| 1303 | The Deferred can also errback with other fatal problems, such as |
| 1304 | NotEnoughSharesError, NoSharesError, or BadCiphertextHashError. |
| 1305 | """ |
| 1306 | log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)", |
| 1307 | si=base32.b2a(self._verifycap.storage_index)[:8], |
| 1308 | segnum=segnum, |
| 1309 | level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ") |
| 1310 | d = defer.Deferred() |
| 1311 | c = Cancel(self._cancel_request) |
| 1312 | self._segment_requests.append( (segnum, d, c) ) |
| 1313 | self._start_new_segment() |
| 1314 | return (d, c) |
| 1315 | |
| 1316 | # things called by the Segmentation object used to transform |
| 1317 | # arbitrary-sized read() calls into quantized segment fetches |
| 1318 | |
| 1319 | def _start_new_segment(self): |
| 1320 | if self._active_segment is None and self._segment_requests: |
| 1321 | segnum = self._segment_requests[0][0] |
| 1322 | k = self._verifycap.needed_shares |
| 1323 | self._active_segment = fetcher = SegmentFetcher(self, segnum, k) |
| 1324 | active_shares = [s for s in self._shares if s.not_dead()] |
| 1325 | fetcher.add_shares(active_shares) # this triggers the loop |
| 1326 | |
| 1327 | |
| 1328 | # called by our child ShareFinder |
| 1329 | def got_shares(self, shares): |
| 1330 | self._shares.update(shares) |
| 1331 | if self._active_segment: |
| 1332 | self._active_segment.add_shares(shares) |
| 1333 | def no_more_shares(self): |
| 1334 | self._no_more_shares = True |
| 1335 | if self._active_segment: |
| 1336 | self._active_segment.no_more_shares() |
| 1337 | |
| 1338 | # things called by our Share instances |
| 1339 | |
| 1340 | def validate_and_store_UEB(self, UEB_s): |
| 1341 | log.msg("validate_and_store_UEB", |
| 1342 | level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw") |
| 1343 | h = hashutil.uri_extension_hash(UEB_s) |
| 1344 | if h != self._verifycap.uri_extension_hash: |
| 1345 | raise hashutil.BadHashError |
| 1346 | UEB_dict = uri.unpack_extension(UEB_s) |
| 1347 | self._parse_and_store_UEB(UEB_dict) # sets self._stuff |
| 1348 | # TODO: a malformed (but authentic) UEB could throw an assertion in |
| 1349 | # _parse_and_store_UEB, and we should abandon the download. |
| 1350 | self.have_UEB = True |
| 1351 | |
    def _parse_and_store_UEB(self, d):
        """Record the authenticated encoding parameters from UEB dict 'd':
        segment/block sizes, the FEC codec, and the root hashes of the
        ciphertext and share hash trees."""
        # Note: the UEB contains needed_shares and total_shares. These are
        # redundant and inferior (the filecap contains the authoritative
        # values). However, because it is possible to encode the same file in
        # multiple ways, and the encoders might choose (poorly) to use the
        # same key for both (therefore getting the same SI), we might
        # encounter shares for both types. The UEB hashes will be different,
        # however, and we'll disregard the "other" encoding's shares as
        # corrupted.

        # therefore, we ignore d['total_shares'] and d['needed_shares'].

        log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
                ueb=repr(d), vcap=self._verifycap.to_string(),
                level=log.NOISY, parent=self._lp, umid="cVqZnA")

        k, N = self._verifycap.needed_shares, self._verifycap.total_shares

        self.segment_size = d['segment_size']

        # replace the sizes guessed at __init__ time with the real ones
        r = self._calculate_sizes(self.segment_size)
        self.tail_segment_size = r["tail_segment_size"]
        self.tail_segment_padded = r["tail_segment_padded"]
        self.num_segments = r["num_segments"]
        self.block_size = r["block_size"]
        self.tail_block_size = r["tail_block_size"]
        log.msg("actual sizes: %s" % (r,),
                level=log.NOISY, parent=self._lp, umid="PY6P5Q")

        # zfec.Decode() instantiation is fast, but still, let's use the same
        # codec instance for all but the last segment. 3-of-10 takes 15us on
        # my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
        # 2.5ms, worst-case 254-of-255 is 9.3ms
        self._codec = CRSDecoder()
        self._codec.set_params(self.segment_size, k, N)


        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        # (this replaces the guessed-size stub built in __init__)
        self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
        self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})

        self.share_hash_tree.set_hashes({0: d['share_root_hash']})

        # crypttext_hash is optional. We only pull this from the first UEB
        # that we see.
        if 'crypttext_hash' in d:
            if len(d["crypttext_hash"]) == hashutil.CRYPTO_VAL_SIZE:
                self.ciphertext_hash = d['crypttext_hash']
            else:
                log.msg("ignoring bad-length UEB[crypttext_hash], "
                        "got %d bytes, want %d" % (len(d['crypttext_hash']),
                                                   hashutil.CRYPTO_VAL_SIZE),
                        level=log.WEIRD, parent=self._lp, umid="oZkGLA")

        # Our job is a fast download, not verification, so we ignore any
        # redundant fields. The Verifier uses a different code path which
        # does not ignore them.
| 1413 | |
| 1414 | def _calculate_sizes(self, segment_size): |
| 1415 | # segments of ciphertext |
| 1416 | size = self._verifycap.size |
| 1417 | k = self._verifycap.needed_shares |
| 1418 | |
| 1419 | # this assert matches the one in encode.py:127 inside |
| 1420 | # Encoded._got_all_encoding_parameters, where the UEB is constructed |
| 1421 | assert segment_size % k == 0 |
| 1422 | |
| 1423 | # the last segment is usually short. We don't store a whole segsize, |
| 1424 | # but we do pad the segment up to a multiple of k, because the |
| 1425 | # encoder requires that. |
| 1426 | tail_segment_size = size % segment_size |
| 1427 | if tail_segment_size == 0: |
| 1428 | tail_segment_size = segment_size |
| 1429 | padded = mathutil.next_multiple(tail_segment_size, k) |
| 1430 | tail_segment_padded = padded |
| 1431 | |
| 1432 | num_segments = mathutil.div_ceil(size, segment_size) |
| 1433 | |
| 1434 | # each segment is turned into N blocks. All but the last are of size |
| 1435 | # block_size, and the last is of size tail_block_size |
| 1436 | block_size = segment_size / k |
| 1437 | tail_block_size = tail_segment_padded / k |
| 1438 | |
| 1439 | return { "tail_segment_size": tail_segment_size, |
| 1440 | "tail_segment_padded": tail_segment_padded, |
| 1441 | "num_segments": num_segments, |
| 1442 | "block_size": block_size, |
| 1443 | "tail_block_size": tail_block_size, |
| 1444 | } |
| 1445 | |
| 1446 | |
| 1447 | def process_share_hashes(self, share_hashes): |
| 1448 | self.share_hash_tree.set_hashes(share_hashes) |
| 1449 | |
| 1450 | def get_needed_ciphertext_hashes(self, segnum): |
| 1451 | cht = self.ciphertext_hash_tree |
| 1452 | return cht.needed_hashes(segnum, include_leaf=True) |
| 1453 | def process_ciphertext_hashes(self, hashes, shnum, serverid_s): |
| 1454 | assert self.num_segments is not None |
| 1455 | try: |
| 1456 | self.ciphertext_hash_tree.set_hashes(hashes) |
| 1457 | return True |
| 1458 | except (BadHashError, NotEnoughHashesError): |
| 1459 | hashnums = ",".join([str(n) for n in sorted(hashes.keys())]) |
| 1460 | log.msg(format="hash failure in ciphertext_hashes=(%(hashnums)s)," |
| 1461 | " shnum=%(shnum)d SI=%(si)s server=%(server)s", |
| 1462 | hashnums=hashnums, shnum=shnum, |
| 1463 | si=self._si_prefix, server=serverid_s, failure=Failure(), |
| 1464 | level=log.WEIRD, parent=self._lp, umid="iZI0TA") |
| 1465 | return False |
| 1466 | |
| 1467 | # called by our child SegmentFetcher |
| 1468 | |
| 1469 | def want_more_shares(self): |
| 1470 | self._sharefinder.hungry() |
| 1471 | |
| 1472 | def fetch_failed(self, sf, f): |
| 1473 | assert sf is self._active_segment |
| 1474 | self._active_segment = None |
| 1475 | # deliver error upwards |
| 1476 | for (d,c) in self._extract_requests(sf.segnum): |
| 1477 | eventually(self._deliver, d, c, f) |
| 1478 | |
    def process_blocks(self, segnum, blocks):
        # Called when enough blocks have arrived to decode segment 'segnum'.
        # Decode them, verify the result against the ciphertext hash tree,
        # then hand the outcome to every pending get_segment() request for
        # this segnum.
        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
        d.addCallback(self._check_ciphertext_hash, segnum)
        def _deliver(result):
            # NOTE: this local _deliver shadows the method self._deliver
            # (which it schedules via eventually() below). Because it is
            # attached with addBoth, 'result' is either (offset, segment)
            # or a Failure; both are passed through to the waiting
            # Deferreds unchanged.
            log.msg(format="delivering segment(%(segnum)d)",
                    segnum=segnum,
                    level=log.OPERATIONAL, parent=self._lp,
                    umid="j60Ojg")
            for (d,c) in self._extract_requests(segnum):
                eventually(self._deliver, d, c, result)
            # this fetch is done; kick off the next needed segment (if any)
            self._active_segment = None
            self._start_new_segment()
        d.addBoth(_deliver)
        # anything that escapes _deliver itself is a bug: log it rather
        # than letting it vanish inside the Deferred
        d.addErrback(lambda f:
                     log.err("unhandled error during process_blocks",
                             failure=f, level=log.WEIRD,
                             parent=self._lp, umid="MkEsCg"))
| 1496 | |
| 1497 | def _decode_blocks(self, segnum, blocks): |
| 1498 | tail = (segnum == self.num_segments-1) |
| 1499 | codec = self._codec |
| 1500 | block_size = self.block_size |
| 1501 | decoded_size = self.segment_size |
| 1502 | if tail: |
| 1503 | # account for the padding in the last segment |
| 1504 | codec = CRSDecoder() |
| 1505 | k, N = self._verifycap.needed_shares, self._verifycap.total_shares |
| 1506 | codec.set_params(self.tail_segment_padded, k, N) |
| 1507 | block_size = self.tail_block_size |
| 1508 | decoded_size = self.tail_segment_padded |
| 1509 | |
| 1510 | shares = [] |
| 1511 | shareids = [] |
| 1512 | for (shareid, share) in blocks.iteritems(): |
| 1513 | assert len(share) == block_size |
| 1514 | shareids.append(shareid) |
| 1515 | shares.append(share) |
| 1516 | del blocks |
| 1517 | |
| 1518 | d = codec.decode(shares, shareids) # segment |
| 1519 | del shares |
| 1520 | def _process(buffers): |
| 1521 | segment = "".join(buffers) |
| 1522 | assert len(segment) == decoded_size |
| 1523 | del buffers |
| 1524 | if tail: |
| 1525 | segment = segment[:self.tail_segment_size] |
| 1526 | return segment |
| 1527 | d.addCallback(_process) |
| 1528 | return d |
| 1529 | |
| 1530 | def _check_ciphertext_hash(self, segment, segnum): |
| 1531 | assert self._active_segment.segnum == segnum |
| 1532 | assert self.segment_size is not None |
| 1533 | offset = segnum * self.segment_size |
| 1534 | |
| 1535 | h = hashutil.crypttext_segment_hash(segment) |
| 1536 | try: |
| 1537 | self.ciphertext_hash_tree.set_hashes(leaves={segnum: h}) |
| 1538 | return (offset, segment) |
| 1539 | except (BadHashError, NotEnoughHashesError): |
| 1540 | format = ("hash failure in ciphertext_hash_tree:" |
| 1541 | " segnum=%(segnum)d, SI=%(si)s") |
| 1542 | log.msg(format=format, segnum=segnum, si=self._si_prefix, |
| 1543 | failure=Failure(), |
| 1544 | level=log.WEIRD, parent=self._lp, umid="MTwNnw") |
| 1545 | # this is especially weird, because we made it past the share |
| 1546 | # hash tree. It implies that we're using the wrong encoding, or |
| 1547 | # that the uploader deliberately constructed a bad UEB. |
| 1548 | msg = format % {"segnum": segnum, "si": self._si_prefix} |
| 1549 | raise BadCiphertextHashError(msg) |
| 1550 | |
| 1551 | def _deliver(self, d, c, result): |
| 1552 | # this method exists to handle cancel() that occurs between |
| 1553 | # _got_segment and _deliver |
| 1554 | if not c.cancelled: |
| 1555 | d.callback(result) # might actually be an errback |
| 1556 | |
| 1557 | def _extract_requests(self, segnum): |
| 1558 | """Remove matching requests and return their (d,c) tuples so that the |
| 1559 | caller can retire them.""" |
| 1560 | retire = [(d,c) for (segnum0, d, c) in self._segment_requests |
| 1561 | if segnum0 == segnum] |
| 1562 | self._segment_requests = [t for t in self._segment_requests |
| 1563 | if t[0] != segnum] |
| 1564 | return retire |
| 1565 | |
| 1566 | def _cancel_request(self, c): |
| 1567 | self._segment_requests = [t for t in self._segment_requests |
| 1568 | if t[2] != c] |
| 1569 | segnums = [segnum for (segnum,d,c) in self._segment_requests] |
| 1570 | if self._active_segment.segnum not in segnums: |
| 1571 | self._active_segment.stop() |
| 1572 | self._active_segment = None |
| 1573 | self._start_new_segment() |
| 1574 | |
| 1575 | def check_and_repair(self, monitor, verify=False, add_lease=False): |
| 1576 | verifycap = self._verifycap |
| 1577 | storage_index = verifycap.storage_index |
| 1578 | sb = self._storage_broker |
| 1579 | servers = sb.get_all_servers() |
| 1580 | sh = self._secret_holder |
| 1581 | |
| 1582 | c = Checker(verifycap=verifycap, servers=servers, |
| 1583 | verify=verify, add_lease=add_lease, secret_holder=sh, |
| 1584 | monitor=monitor) |
| 1585 | d = c.start() |
| 1586 | def _maybe_repair(cr): |
| 1587 | crr = CheckAndRepairResults(storage_index) |
| 1588 | crr.pre_repair_results = cr |
| 1589 | if cr.is_healthy(): |
| 1590 | crr.post_repair_results = cr |
| 1591 | return defer.succeed(crr) |
| 1592 | else: |
| 1593 | crr.repair_attempted = True |
| 1594 | crr.repair_successful = False # until proven successful |
| 1595 | def _gather_repair_results(ur): |
| 1596 | assert IUploadResults.providedBy(ur), ur |
| 1597 | # clone the cr (check results) to form the basis of the |
| 1598 | # prr (post-repair results) |
| 1599 | prr = CheckResults(cr.uri, cr.storage_index) |
| 1600 | prr.data = copy.deepcopy(cr.data) |
| 1601 | |
| 1602 | sm = prr.data['sharemap'] |
| 1603 | assert isinstance(sm, DictOfSets), sm |
| 1604 | sm.update(ur.sharemap) |
| 1605 | servers_responding = set(prr.data['servers-responding']) |
| 1606 | servers_responding.union(ur.sharemap.iterkeys()) |
| 1607 | prr.data['servers-responding'] = list(servers_responding) |
| 1608 | prr.data['count-shares-good'] = len(sm) |
| 1609 | prr.data['count-good-share-hosts'] = len(sm) |
| 1610 | is_healthy = bool(len(sm) >= verifycap.total_shares) |
| 1611 | is_recoverable = bool(len(sm) >= verifycap.needed_shares) |
| 1612 | prr.set_healthy(is_healthy) |
| 1613 | prr.set_recoverable(is_recoverable) |
| 1614 | crr.repair_successful = is_healthy |
| 1615 | prr.set_needs_rebalancing(len(sm) >= verifycap.total_shares) |
| 1616 | |
| 1617 | crr.post_repair_results = prr |
| 1618 | return crr |
| 1619 | def _repair_error(f): |
| 1620 | # as with mutable repair, I'm not sure if I want to pass |
| 1621 | # through a failure or not. TODO |
| 1622 | crr.repair_successful = False |
| 1623 | crr.repair_failure = f |
| 1624 | return f |
| 1625 | r = Repairer(storage_broker=sb, secret_holder=sh, |
| 1626 | verifycap=verifycap, monitor=monitor) |
| 1627 | d = r.start() |
| 1628 | d.addCallbacks(_gather_repair_results, _repair_error) |
| 1629 | return d |
| 1630 | |
| 1631 | d.addCallback(_maybe_repair) |
| 1632 | return d |
| 1633 | |
| 1634 | def check(self, monitor, verify=False, add_lease=False): |
| 1635 | verifycap = self._verifycap |
| 1636 | sb = self._storage_broker |
| 1637 | servers = sb.get_all_servers() |
| 1638 | sh = self._secret_holder |
| 1639 | |
| 1640 | v = Checker(verifycap=verifycap, servers=servers, |
| 1641 | verify=verify, add_lease=add_lease, secret_holder=sh, |
| 1642 | monitor=monitor) |
| 1643 | return v.start() |
| 1644 | |
class CiphertextFileNode:
    """I provide ciphertext-level access (read, get_segment, check, repair)
    to a single CHK file, addressed by its verify-cap. All real work is
    delegated to an internal _Node."""

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._node = _Node(verifycap, storage_broker, secret_holder,
                           terminator, history)

    def read(self, consumer, offset=0, size=None):
        """Main entry point, from which FileNode.read() can get data: feed
        'consumer' the desired range of ciphertext. Returns a Deferred that
        fires (with the consumer) when the read is finished."""
        return self._node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Begin downloading one segment. Returns a tuple (d, c): 'd' is a
        Deferred that fires with (offset, data) when the desired segment is
        available, and c.cancel() disavows interest in the segment (after
        which 'd' will never fire).

        You probably need to know the segment size before calling this,
        unless you want the first few bytes of the file. If the segment
        number turns out to be too large, the Deferred errbacks with
        BadSegmentNumberError.

        The Deferred fires with the offset of the first byte of the data
        segment, so you can call get_segment() before knowing the segment
        size and still know which data you received."""
        return self._node.get_segment(segnum)

    def raise_error(self):
        pass

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return self._node.check_and_repair(monitor, verify, add_lease)

    def check(self, monitor, verify=False, add_lease=False):
        return self._node.check(monitor, verify, add_lease)
| 1684 | |
| 1685 | |
class DecryptingConsumer:
    """I sit between a CiphertextDownloader (which acts as a Producer) and
    the real Consumer, decrypting everything that passes by. The real
    Consumer sees the real Producer, but the Producer sees us instead of the
    real consumer."""
    implements(IConsumer)

    def __init__(self, consumer, readkey, offset):
        self._consumer = consumer
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument: encode the
        # 16-byte-block counter into the IV, then burn off the within-block
        # remainder by decrypting that many zero bytes.
        offset_big, offset_small = divmod(offset, 16)
        iv = binascii.unhexlify("%032x" % offset_big)
        self._decryptor = AES(readkey, iv=iv)
        self._decryptor.process("\x00"*offset_small)

    def registerProducer(self, producer, streaming):
        # pure pass-through, so the real consumer can flow-control the real
        # producer directly; we therefore need no IPushProducer methods of
        # our own. Only write() does any work (decryption).
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        self._consumer.write(self._decryptor.process(ciphertext))
| 1717 | |
class ImmutableFileNode:
    """I wrap a CiphertextFileNode with a decryption key, providing
    plaintext-level access to an immutable CHK file."""
    implements(IImmutableFileNode)

    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        # (the duplicate isinstance assert that used to follow the
        # CiphertextFileNode construction has been removed)
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    def read(self, consumer, offset=0, size=None):
        """Feed 'consumer' the plaintext bytes [offset:offset+size] by
        decrypting the ciphertext stream on the fly. Returns a Deferred
        that fires with 'consumer' when the read is finished."""
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        # fire with the original consumer, not the decrypting wrapper
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        pass

    def get_write_uri(self):
        # immutable files have no write-cap
        return None

    def get_readonly_uri(self):
        return self.get_uri()

    def get_uri(self):
        return self.u.to_string()
    def get_cap(self):
        return self.u
    def get_readcap(self):
        return self.u.get_readonly()
    def get_verify_cap(self):
        return self.u.get_verify_cap()
    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.get_storage_index()

    def get_size(self):
        return self.u.get_size()
    def get_current_size(self):
        # immutable: the current size is always the full size
        return defer.succeed(self.get_size())

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def is_unknown(self):
        return False

    def is_allowed_in_immutable_directory(self):
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return self._cnode.check_and_repair(monitor, verify, add_lease)
    def check(self, monitor, verify=False, add_lease=False):
        return self._cnode.check(monitor, verify, add_lease)
| 1783 | |
| 1784 | # TODO: if server1 has all shares, and server2-10 have one each, make the |
| 1785 | # loop stall slightly before requesting all shares from the first server, to |
| 1786 | # give it a chance to learn about the other shares and get some diversity. |
| 1787 | # Or, don't bother, let the first block all come from one server, and take |
| 1788 | # comfort in the fact that we'll learn about the other servers by the time we |
| 1789 | # fetch the second block. |
| 1790 | # |
| 1791 | # davidsarah points out that we could use sequential (instead of parallel) |
# fetching of multiple blocks from a single server: by the time the first
| 1793 | # block arrives, we'll hopefully have heard about other shares. This would |
| 1794 | # induce some RTT delays (i.e. lose pipelining) in the case that this server |
| 1795 | # has the only shares, but that seems tolerable. We could rig it to only use |
| 1796 | # sequential requests on the first segment. |
| 1797 | |
| 1798 | # as a query gets later, we're more willing to duplicate work. |
| 1799 | |
| 1800 | # should change server read protocol to allow small shares to be fetched in a |
| 1801 | # single RTT. Instead of get_buckets-then-read, just use read(shnums, readv), |
# where shnums=[] means all shares, and the return value is a dict of
# shnum->data (like with mutable files). The DYHB query should also fetch the
# offset table, since everything else can be located once we have that.
| 1805 | |
| 1806 | |
| 1807 | # ImmutableFileNode |
| 1808 | # DecryptingConsumer |
| 1809 | # CiphertextFileNode |
| 1810 | # Segmentation |
| 1811 | # ShareFinder |
| 1812 | # SegmentFetcher[segnum] (one at a time) |
| 1813 | # CommonShare[shnum] |
| 1814 | # Share[shnum,server] |
| 1815 | |
| 1816 | # TODO: when we learn numsegs, any get_segment() calls for bad blocknumbers |
| 1817 | # should be failed with BadSegmentNumberError. But should this be the |
| 1818 | # responsibility of CiphertextFileNode, or SegmentFetcher? The knowledge will |
| 1819 | # first appear when a Share receives a valid UEB and calls |
| 1820 | # CiphertextFileNode.validate_UEB, then _parse_UEB. The SegmentFetcher is |
| 1821 | # expecting to hear from the Share, via the _block_request_activity observer. |
| 1822 | |
| 1823 | # make it the responsibility of the SegmentFetcher. Each Share that gets a |
| 1824 | # valid UEB will tell the SegmentFetcher BADSEGNUM (instead of COMPLETE or |
# CORRUPT). The SegmentFetcher is then responsible for shutting down, and
| 1826 | # informing its parent (the CiphertextFileNode) of the BadSegmentNumberError, |
| 1827 | # which is then passed to the client of get_segment(). |
| 1828 | |
| 1829 | |
| 1830 | # TODO: if offset table is corrupt, attacker could cause us to fetch whole |
| 1831 | # (large) share |
| 1832 | |
| 1833 | # log budget: when downloading at 1MBps (i.e. 8 segments-per-second), 10 |
| 1834 | # log.OPERATIONAL per second, 100 log.NOISY per second. With k=3, that's 3 |
| 1835 | # log.NOISY per block fetch. |