Changeset 63b61ce in trunk
- Timestamp:
- 2010-08-04T07:27:10Z (15 years ago)
- Branches:
- master
- Children:
- 20847dd
- Parents:
- 7b7b0c9
- Location:
- src/allmydata/test
- Files:
-
- 13 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified src/allmydata/test/no_network.py ¶
r7b7b0c9 r63b61ce 224 224 ss = StorageServer(serverdir, serverid, stats_provider=SimpleStats(), 225 225 readonly_storage=readonly) 226 ss._no_network_server_number = i 226 227 return ss 227 228 … … 320 321 return sorted(shares) 321 322 323 def copy_shares(self, uri): 324 shares = {} 325 for (shnum, serverid, sharefile) in self.find_uri_shares(uri): 326 shares[sharefile] = open(sharefile, "rb").read() 327 return shares 328 329 def restore_all_shares(self, shares): 330 for sharefile, data in shares.items(): 331 open(sharefile, "wb").write(data) 332 322 333 def delete_share(self, (shnum, serverid, sharefile)): 323 334 os.unlink(sharefile) … … 339 350 corruptdata = corruptor(sharedata, debug=debug) 340 351 open(i_sharefile, "wb").write(corruptdata) 352 353 def corrupt_all_shares(self, uri, corruptor, debug=False): 354 for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri): 355 sharedata = open(i_sharefile, "rb").read() 356 corruptdata = corruptor(sharedata, debug=debug) 357 open(i_sharefile, "wb").write(corruptdata) 341 358 342 359 def GET(self, urlpath, followRedirect=False, return_response=False, -
TabularUnified src/allmydata/test/test_cli.py ¶
r7b7b0c9 r63b61ce 2301 2301 d.addCallback(_stash_bad) 2302 2302 2303 # the download is abandoned as soon as it's clear that we won't get 2304 # enough shares. The one remaining share might be in either the 2305 # COMPLETE or the PENDING state. 2306 in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3" 2307 in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3" 2308 2303 2309 d.addCallback(lambda ign: self.do_cli("get", self.uri_1share)) 2304 2310 def _check1((rc, out, err)): … … 2306 2312 self.failUnless("410 Gone" in err, err) 2307 2313 self.failUnlessIn("NotEnoughSharesError: ", err) 2308 self.failUnlessIn("Failed to get enough shareholders: have 1, need 3", err) 2314 self.failUnless(in_complete_msg in err or in_pending_msg in err, 2315 err) 2309 2316 d.addCallback(_check1) 2310 2317 … … 2315 2322 self.failUnless("410 Gone" in err, err) 2316 2323 self.failUnlessIn("NotEnoughSharesError: ", err) 2317 self.failUnlessIn("Failed to get enough shareholders: have 1, need 3", err) 2324 self.failUnless(in_complete_msg in err or in_pending_msg in err, 2325 err) 2318 2326 self.failIf(os.path.exists(targetf)) 2319 2327 d.addCallback(_check2) -
TabularUnified src/allmydata/test/test_dirnode.py ¶
r7b7b0c9 r63b61ce 1203 1203 known_tree = b32decode(self.known_tree) 1204 1204 nodemaker = NodeMaker(None, None, None, 1205 None, None, None,1205 None, None, 1206 1206 {"k": 3, "n": 10}, None) 1207 1207 write_uri = "URI:SSK-RO:e3mdrzfwhoq42hy5ubcz6rp3o4:ybyibhnp3vvwuq2vaw2ckjmesgkklfs6ghxleztqidihjyofgw7q" … … 1265 1265 1266 1266 def test_deep_immutable(self): 1267 nm = NodeMaker(None, None, None, None, None, None, {"k": 3, "n": 10}, 1268 None) 1267 nm = NodeMaker(None, None, None, None, None, {"k": 3, "n": 10}, None) 1269 1268 fn = MinimalFakeMutableFile() 1270 1269 … … 1360 1359 def __init__(self): 1361 1360 self.nodemaker = FakeNodeMaker(None, None, None, 1362 None, None, None,1361 None, None, 1363 1362 {"k":3,"n":10}, None) 1364 1363 def create_node_from_uri(self, rwcap, rocap): … … 1644 1643 nm = UCWEingNodeMaker(c0.storage_broker, c0._secret_holder, 1645 1644 c0.get_history(), c0.getServiceNamed("uploader"), 1646 c0.downloader, 1647 c0.download_cache_dirman, 1645 c0.terminator, 1648 1646 c0.get_encoding_parameters(), 1649 1647 c0._key_generator) -
TabularUnified src/allmydata/test/test_download.py ¶
r7b7b0c9 r63b61ce 6 6 import os 7 7 from twisted.trial import unittest 8 from twisted.internet import defer, reactor 8 9 from allmydata import uri 9 10 from allmydata.storage.server import storage_index_to_dir 10 from allmydata.util import base32, fileutil 11 from allmydata.util.consumer import download_to_data 12 from allmydata.immutable import upload 11 from allmydata.util import base32, fileutil, spans, log 12 from allmydata.util.consumer import download_to_data, MemoryConsumer 13 from allmydata.immutable import upload, layout 13 14 from allmydata.test.no_network import GridTestMixin 15 from allmydata.test.common import ShouldFailMixin 16 from allmydata.interfaces import NotEnoughSharesError, NoSharesError 17 from allmydata.immutable.downloader.common import BadSegmentNumberError, \ 18 BadCiphertextHashError, DownloadStopped 19 from allmydata.codec import CRSDecoder 20 from foolscap.eventual import fireEventually, flushEventualQueue 14 21 15 22 plaintext = "This is a moderate-sized file.\n" * 10 … … 69 76 #--------- END stored_shares.py ---------------- 70 77 71 class DownloadTest(GridTestMixin, unittest.TestCase): 72 timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box. 73 def test_download(self): 74 self.basedir = self.mktemp() 75 self.set_up_grid() 76 self.c0 = self.g.clients[0] 77 78 # do this to create the shares 79 #return self.create_shares() 80 81 self.load_shares() 82 d = self.download_immutable() 83 d.addCallback(self.download_mutable) 84 return d 78 class _Base(GridTestMixin, ShouldFailMixin): 85 79 86 80 def create_shares(self, ignored=None): … … 179 173 self.failUnlessEqual(data, plaintext) 180 174 d.addCallback(_got_data) 175 # make sure we can use the same node twice 176 d.addCallback(lambda ign: download_to_data(n)) 177 d.addCallback(_got_data) 181 178 return d 182 179 … … 189 186 return d 190 187 188 class DownloadTest(_Base, unittest.TestCase): 189 timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box. 190 def test_download(self): 191 self.basedir = self.mktemp() 192 self.set_up_grid() 193 self.c0 = self.g.clients[0] 194 195 # do this to create the shares 196 #return self.create_shares() 197 198 self.load_shares() 199 d = self.download_immutable() 200 d.addCallback(self.download_mutable) 201 return d 202 203 def test_download_failover(self): 204 self.basedir = self.mktemp() 205 self.set_up_grid() 206 self.c0 = self.g.clients[0] 207 208 self.load_shares() 209 si = uri.from_string(immutable_uri).get_storage_index() 210 si_dir = storage_index_to_dir(si) 211 212 n = self.c0.create_node_from_uri(immutable_uri) 213 d = download_to_data(n) 214 def _got_data(data): 215 self.failUnlessEqual(data, plaintext) 216 d.addCallback(_got_data) 217 218 def _clobber_some_shares(ign): 219 # find the three shares that were used, and delete them. Then 220 # download again, forcing the downloader to fail over to other 221 # shares 222 for s in n._cnode._node._shares: 223 for clientnum in immutable_shares: 224 for shnum in immutable_shares[clientnum]: 225 if s._shnum == shnum: 226 fn = os.path.join(self.get_serverdir(clientnum), 227 "shares", si_dir, str(shnum)) 228 os.unlink(fn) 229 d.addCallback(_clobber_some_shares) 230 d.addCallback(lambda ign: download_to_data(n)) 231 d.addCallback(_got_data) 232 233 def _clobber_most_shares(ign): 234 # delete all but one of the shares that are still alive 235 live_shares = [s for s in n._cnode._node._shares if s.is_alive()] 236 save_me = live_shares[0]._shnum 237 for clientnum in immutable_shares: 238 for shnum in immutable_shares[clientnum]: 239 if shnum == save_me: 240 continue 241 fn = os.path.join(self.get_serverdir(clientnum), 242 "shares", si_dir, str(shnum)) 243 if os.path.exists(fn): 244 os.unlink(fn) 245 # now the download should fail with NotEnoughSharesError 246 return self.shouldFail(NotEnoughSharesError, "1shares", None, 247 download_to_data, n) 248 d.addCallback(_clobber_most_shares) 249 250 def _clobber_all_shares(ign): 251 # delete the last remaining share 252 for clientnum in immutable_shares: 253 for shnum in immutable_shares[clientnum]: 254 fn = os.path.join(self.get_serverdir(clientnum), 255 "shares", si_dir, str(shnum)) 256 if os.path.exists(fn): 257 os.unlink(fn) 258 # now a new download should fail with NoSharesError. We want a 259 # new ImmutableFileNode so it will forget about the old shares. 260 # If we merely called create_node_from_uri() without first 261 # dereferencing the original node, the NodeMaker's _node_cache 262 # would give us back the old one. 263 n = None 264 n = self.c0.create_node_from_uri(immutable_uri) 265 return self.shouldFail(NoSharesError, "0shares", None, 266 download_to_data, n) 267 d.addCallback(_clobber_all_shares) 268 return d 269 270 def test_lost_servers(self): 271 # while downloading a file (after seg[0], before seg[1]), lose the 272 # three servers that we were using. The download should switch over 273 # to other servers. 274 self.basedir = self.mktemp() 275 self.set_up_grid() 276 self.c0 = self.g.clients[0] 277 278 # upload a file with multiple segments, so we can catch the download 279 # in the middle. 280 u = upload.Data(plaintext, None) 281 u.max_segment_size = 70 # 5 segs 282 d = self.c0.upload(u) 283 def _uploaded(ur): 284 self.uri = ur.uri 285 self.n = self.c0.create_node_from_uri(self.uri) 286 return download_to_data(self.n) 287 d.addCallback(_uploaded) 288 def _got_data(data): 289 self.failUnlessEqual(data, plaintext) 290 d.addCallback(_got_data) 291 def _kill_some_servers(): 292 # find the three shares that were used, and delete them. Then 293 # download again, forcing the downloader to fail over to other 294 # shares 295 servers = [] 296 shares = sorted([s._shnum for s in self.n._cnode._node._shares]) 297 self.failUnlessEqual(shares, [0,1,2]) 298 # break the RIBucketReader references 299 for s in self.n._cnode._node._shares: 300 s._rref.broken = True 301 for servernum in immutable_shares: 302 for shnum in immutable_shares[servernum]: 303 if s._shnum == shnum: 304 ss = self.g.servers_by_number[servernum] 305 servers.append(ss) 306 # and, for good measure, break the RIStorageServer references 307 # too, just in case the downloader gets more aggressive in the 308 # future and tries to re-fetch the same share. 309 for ss in servers: 310 wrapper = self.g.servers_by_id[ss.my_nodeid] 311 wrapper.broken = True 312 def _download_again(ign): 313 c = StallingConsumer(_kill_some_servers) 314 return self.n.read(c) 315 d.addCallback(_download_again) 316 def _check_failover(c): 317 self.failUnlessEqual("".join(c.chunks), plaintext) 318 shares = sorted([s._shnum for s in self.n._cnode._node._shares]) 319 # we should now be using more shares than we were before 320 self.failIfEqual(shares, [0,1,2]) 321 d.addCallback(_check_failover) 322 return d 323 324 def test_badguess(self): 325 self.basedir = self.mktemp() 326 self.set_up_grid() 327 self.c0 = self.g.clients[0] 328 self.load_shares() 329 n = self.c0.create_node_from_uri(immutable_uri) 330 331 # Cause the downloader to guess a segsize that's too low, so it will 332 # ask for a segment number that's too high (beyond the end of the 333 # real list, causing BadSegmentNumberError), to exercise 334 # Segmentation._retry_bad_segment 335 336 con1 = MemoryConsumer() 337 n._cnode._node._build_guessed_tables(90) 338 # plaintext size of 310 bytes, wrong-segsize of 90 bytes, will make 339 # us think that file[180:200] is in the third segment (segnum=2), but 340 # really there's only one segment 341 d = n.read(con1, 180, 20) 342 def _done(res): 343 self.failUnlessEqual("".join(con1.chunks), plaintext[180:200]) 344 d.addCallback(_done) 345 return d 346 347 def test_simultaneous_badguess(self): 348 self.basedir = self.mktemp() 349 self.set_up_grid() 350 self.c0 = self.g.clients[0] 351 352 # upload a file with multiple segments, and a non-default segsize, to 353 # exercise the offset-guessing code. Because we don't tell the 354 # downloader about the unusual segsize, it will guess wrong, and have 355 # to do extra roundtrips to get the correct data. 356 u = upload.Data(plaintext, None) 357 u.max_segment_size = 70 # 5 segs, 8-wide hashtree 358 con1 = MemoryConsumer() 359 con2 = MemoryConsumer() 360 d = self.c0.upload(u) 361 def _uploaded(ur): 362 n = self.c0.create_node_from_uri(ur.uri) 363 d1 = n.read(con1, 70, 20) 364 d2 = n.read(con2, 140, 20) 365 return defer.gatherResults([d1,d2]) 366 d.addCallback(_uploaded) 367 def _done(res): 368 self.failUnlessEqual("".join(con1.chunks), plaintext[70:90]) 369 self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) 370 d.addCallback(_done) 371 return d 372 373 def test_simultaneous_goodguess(self): 374 self.basedir = self.mktemp() 375 self.set_up_grid() 376 self.c0 = self.g.clients[0] 377 378 # upload a file with multiple segments, and a non-default segsize, to 379 # exercise the offset-guessing code. This time we *do* tell the 380 # downloader about the unusual segsize, so it can guess right. 381 u = upload.Data(plaintext, None) 382 u.max_segment_size = 70 # 5 segs, 8-wide hashtree 383 con1 = MemoryConsumer() 384 con2 = MemoryConsumer() 385 d = self.c0.upload(u) 386 def _uploaded(ur): 387 n = self.c0.create_node_from_uri(ur.uri) 388 n._cnode._node._build_guessed_tables(u.max_segment_size) 389 d1 = n.read(con1, 70, 20) 390 #d2 = n.read(con2, 140, 20) # XXX 391 d2 = defer.succeed(None) 392 return defer.gatherResults([d1,d2]) 393 d.addCallback(_uploaded) 394 def _done(res): 395 self.failUnlessEqual("".join(con1.chunks), plaintext[70:90]) 396 self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) 397 #d.addCallback(_done) 398 return d 399 400 def test_sequential_goodguess(self): 401 self.basedir = self.mktemp() 402 self.set_up_grid() 403 self.c0 = self.g.clients[0] 404 data = (plaintext*100)[:30000] # multiple of k 405 406 # upload a file with multiple segments, and a non-default segsize, to 407 # exercise the offset-guessing code. This time we *do* tell the 408 # downloader about the unusual segsize, so it can guess right. 409 u = upload.Data(data, None) 410 u.max_segment_size = 6000 # 5 segs, 8-wide hashtree 411 con1 = MemoryConsumer() 412 con2 = MemoryConsumer() 413 d = self.c0.upload(u) 414 def _uploaded(ur): 415 n = self.c0.create_node_from_uri(ur.uri) 416 n._cnode._node._build_guessed_tables(u.max_segment_size) 417 d = n.read(con1, 12000, 20) 418 def _read1(ign): 419 self.failUnlessEqual("".join(con1.chunks), data[12000:12020]) 420 return n.read(con2, 24000, 20) 421 d.addCallback(_read1) 422 def _read2(ign): 423 self.failUnlessEqual("".join(con2.chunks), data[24000:24020]) 424 d.addCallback(_read2) 425 return d 426 d.addCallback(_uploaded) 427 return d 428 429 430 def test_simultaneous_get_blocks(self): 431 self.basedir = self.mktemp() 432 self.set_up_grid() 433 self.c0 = self.g.clients[0] 434 435 self.load_shares() 436 stay_empty = [] 437 438 n = self.c0.create_node_from_uri(immutable_uri) 439 d = download_to_data(n) 440 def _use_shares(ign): 441 shares = list(n._cnode._node._shares) 442 s0 = shares[0] 443 # make sure .cancel works too 444 o0 = s0.get_block(0) 445 o0.subscribe(lambda **kwargs: stay_empty.append(kwargs)) 446 o1 = s0.get_block(0) 447 o2 = s0.get_block(0) 448 o0.cancel() 449 o3 = s0.get_block(1) # state=BADSEGNUM 450 d1 = defer.Deferred() 451 d2 = defer.Deferred() 452 d3 = defer.Deferred() 453 o1.subscribe(lambda **kwargs: d1.callback(kwargs)) 454 o2.subscribe(lambda **kwargs: d2.callback(kwargs)) 455 o3.subscribe(lambda **kwargs: d3.callback(kwargs)) 456 return defer.gatherResults([d1,d2,d3]) 457 d.addCallback(_use_shares) 458 def _done(res): 459 r1,r2,r3 = res 460 self.failUnlessEqual(r1["state"], "COMPLETE") 461 self.failUnlessEqual(r2["state"], "COMPLETE") 462 self.failUnlessEqual(r3["state"], "BADSEGNUM") 463 self.failUnless("block" in r1) 464 self.failUnless("block" in r2) 465 self.failIf(stay_empty) 466 d.addCallback(_done) 467 return d 468 469 def test_download_no_overrun(self): 470 self.basedir = self.mktemp() 471 self.set_up_grid() 472 self.c0 = self.g.clients[0] 473 474 self.load_shares() 475 476 # tweak the client's copies of server-version data, so it believes 477 # that they're old and can't handle reads that overrun the length of 478 # the share. This exercises a different code path. 479 for (peerid, rref) in self.c0.storage_broker.get_all_servers(): 480 v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] 481 v1["tolerates-immutable-read-overrun"] = False 482 483 n = self.c0.create_node_from_uri(immutable_uri) 484 d = download_to_data(n) 485 def _got_data(data): 486 self.failUnlessEqual(data, plaintext) 487 d.addCallback(_got_data) 488 return d 489 490 def test_download_segment(self): 491 self.basedir = self.mktemp() 492 self.set_up_grid() 493 self.c0 = self.g.clients[0] 494 self.load_shares() 495 n = self.c0.create_node_from_uri(immutable_uri) 496 cn = n._cnode 497 (d,c) = cn.get_segment(0) 498 def _got_segment((offset,data,decodetime)): 499 self.failUnlessEqual(offset, 0) 500 self.failUnlessEqual(len(data), len(plaintext)) 501 d.addCallback(_got_segment) 502 return d 503 504 def test_download_segment_cancel(self): 505 self.basedir = self.mktemp() 506 self.set_up_grid() 507 self.c0 = self.g.clients[0] 508 self.load_shares() 509 n = self.c0.create_node_from_uri(immutable_uri) 510 cn = n._cnode 511 (d,c) = cn.get_segment(0) 512 fired = [] 513 d.addCallback(fired.append) 514 c.cancel() 515 d = fireEventually() 516 d.addCallback(flushEventualQueue) 517 def _check(ign): 518 self.failUnlessEqual(fired, []) 519 d.addCallback(_check) 520 return d 521 522 def test_download_bad_segment(self): 523 self.basedir = self.mktemp() 524 self.set_up_grid() 525 self.c0 = self.g.clients[0] 526 self.load_shares() 527 n = self.c0.create_node_from_uri(immutable_uri) 528 cn = n._cnode 529 def _try_download(): 530 (d,c) = cn.get_segment(1) 531 return d 532 d = self.shouldFail(BadSegmentNumberError, "badseg", 533 "segnum=1, numsegs=1", 534 _try_download) 535 return d 536 537 def test_download_segment_terminate(self): 538 self.basedir = self.mktemp() 539 self.set_up_grid() 540 self.c0 = self.g.clients[0] 541 self.load_shares() 542 n = self.c0.create_node_from_uri(immutable_uri) 543 cn = n._cnode 544 (d,c) = cn.get_segment(0) 545 fired = [] 546 d.addCallback(fired.append) 547 self.c0.terminator.disownServiceParent() 548 d = fireEventually() 549 d.addCallback(flushEventualQueue) 550 def _check(ign): 551 self.failUnlessEqual(fired, []) 552 d.addCallback(_check) 553 return d 554 555 def test_pause(self): 556 self.basedir = self.mktemp() 557 self.set_up_grid() 558 self.c0 = self.g.clients[0] 559 self.load_shares() 560 n = self.c0.create_node_from_uri(immutable_uri) 561 c = PausingConsumer() 562 d = n.read(c) 563 def _downloaded(mc): 564 newdata = "".join(mc.chunks) 565 self.failUnlessEqual(newdata, plaintext) 566 d.addCallback(_downloaded) 567 return d 568 569 def test_pause_then_stop(self): 570 self.basedir = self.mktemp() 571 self.set_up_grid() 572 self.c0 = self.g.clients[0] 573 self.load_shares() 574 n = self.c0.create_node_from_uri(immutable_uri) 575 c = PausingAndStoppingConsumer() 576 d = self.shouldFail(DownloadStopped, "test_pause_then_stop", 577 "our Consumer called stopProducing()", 578 n.read, c) 579 return d 580 581 def test_stop(self): 582 # use a download targetthat does an immediate stop (ticket #473) 583 self.basedir = self.mktemp() 584 self.set_up_grid() 585 self.c0 = self.g.clients[0] 586 self.load_shares() 587 n = self.c0.create_node_from_uri(immutable_uri) 588 c = StoppingConsumer() 589 d = self.shouldFail(DownloadStopped, "test_stop", 590 "our Consumer called stopProducing()", 591 n.read, c) 592 return d 593 594 def test_download_segment_bad_ciphertext_hash(self): 595 # The crypttext_hash_tree asserts the integrity of the decoded 596 # ciphertext, and exists to detect two sorts of problems. The first 597 # is a bug in zfec decode. The second is the "two-sided t-shirt" 598 # attack (found by Christian Grothoff), in which a malicious uploader 599 # creates two sets of shares (one for file A, second for file B), 600 # uploads a combination of them (shares 0-4 of A, 5-9 of B), and then 601 # builds an otherwise normal UEB around those shares: their goal is 602 # to give their victim a filecap which sometimes downloads the good A 603 # contents, and sometimes the bad B contents, depending upon which 604 # servers/shares they can get to. Having a hash of the ciphertext 605 # forces them to commit to exactly one version. (Christian's prize 606 # for finding this problem was a t-shirt with two sides: the shares 607 # of file A on the front, B on the back). 608 609 # creating a set of shares with this property is too hard, although 610 # it'd be nice to do so and confirm our fix. (it requires a lot of 611 # tampering with the uploader). So instead, we just damage the 612 # decoder. The tail decoder is rebuilt each time, so we need to use a 613 # file with multiple segments. 614 self.basedir = self.mktemp() 615 self.set_up_grid() 616 self.c0 = self.g.clients[0] 617 618 u = upload.Data(plaintext, None) 619 u.max_segment_size = 60 # 6 segs 620 d = self.c0.upload(u) 621 def _uploaded(ur): 622 n = self.c0.create_node_from_uri(ur.uri) 623 n._cnode._node._build_guessed_tables(u.max_segment_size) 624 625 d = download_to_data(n) 626 def _break_codec(data): 627 # the codec isn't created until the UEB is retrieved 628 node = n._cnode._node 629 vcap = node._verifycap 630 k, N = vcap.needed_shares, vcap.total_shares 631 bad_codec = BrokenDecoder() 632 bad_codec.set_params(node.segment_size, k, N) 633 node._codec = bad_codec 634 d.addCallback(_break_codec) 635 # now try to download it again. The broken codec will provide 636 # ciphertext that fails the hash test. 637 d.addCallback(lambda ign: 638 self.shouldFail(BadCiphertextHashError, "badhash", 639 "hash failure in " 640 "ciphertext_hash_tree: segnum=0", 641 download_to_data, n)) 642 return d 643 d.addCallback(_uploaded) 644 return d 645 646 def OFFtest_download_segment_XXX(self): 647 self.basedir = self.mktemp() 648 self.set_up_grid() 649 self.c0 = self.g.clients[0] 650 651 # upload a file with multiple segments, and a non-default segsize, to 652 # exercise the offset-guessing code. This time we *do* tell the 653 # downloader about the unusual segsize, so it can guess right. 654 u = upload.Data(plaintext, None) 655 u.max_segment_size = 70 # 5 segs, 8-wide hashtree 656 con1 = MemoryConsumer() 657 con2 = MemoryConsumer() 658 d = self.c0.upload(u) 659 def _uploaded(ur): 660 n = self.c0.create_node_from_uri(ur.uri) 661 n._cnode._node._build_guessed_tables(u.max_segment_size) 662 d1 = n.read(con1, 70, 20) 663 #d2 = n.read(con2, 140, 20) 664 d2 = defer.succeed(None) 665 return defer.gatherResults([d1,d2]) 666 d.addCallback(_uploaded) 667 def _done(res): 668 self.failUnlessEqual("".join(con1.chunks), plaintext[70:90]) 669 self.failUnlessEqual("".join(con2.chunks), plaintext[140:160]) 670 #d.addCallback(_done) 671 return d 672 673 def test_duplicate_shares(self): 674 self.basedir = self.mktemp() 675 self.set_up_grid() 676 self.c0 = self.g.clients[0] 677 678 self.load_shares() 679 # make sure everybody has a copy of sh0. The second server contacted 680 # will report two shares, and the ShareFinder will handle the 681 # duplicate by attaching both to the same CommonShare instance. 682 si = uri.from_string(immutable_uri).get_storage_index() 683 si_dir = storage_index_to_dir(si) 684 sh0_file = [sharefile 685 for (shnum, serverid, sharefile) 686 in self.find_uri_shares(immutable_uri) 687 if shnum == 0][0] 688 sh0_data = open(sh0_file, "rb").read() 689 for clientnum in immutable_shares: 690 if 0 in immutable_shares[clientnum]: 691 continue 692 cdir = self.get_serverdir(clientnum) 693 target = os.path.join(cdir, "shares", si_dir, "0") 694 outf = open(target, "wb") 695 outf.write(sh0_data) 696 outf.close() 697 698 d = self.download_immutable() 699 return d 700 701 def test_verifycap(self): 702 self.basedir = self.mktemp() 703 self.set_up_grid() 704 self.c0 = self.g.clients[0] 705 self.load_shares() 706 707 n = self.c0.create_node_from_uri(immutable_uri) 708 vcap = n.get_verify_cap().to_string() 709 vn = self.c0.create_node_from_uri(vcap) 710 d = download_to_data(vn) 711 def _got_ciphertext(ciphertext): 712 self.failUnlessEqual(len(ciphertext), len(plaintext)) 713 self.failIfEqual(ciphertext, plaintext) 714 d.addCallback(_got_ciphertext) 715 return d 716 717 class BrokenDecoder(CRSDecoder): 718 def decode(self, shares, shareids): 719 d = CRSDecoder.decode(self, shares, shareids) 720 def _decoded(buffers): 721 def _corruptor(s, which): 722 return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] 723 buffers[0] = _corruptor(buffers[0], 0) # flip lsb of first byte 724 return buffers 725 d.addCallback(_decoded) 726 return d 727 728 729 class PausingConsumer(MemoryConsumer): 730 def __init__(self): 731 MemoryConsumer.__init__(self) 732 self.size = 0 733 self.writes = 0 734 def write(self, data): 735 self.size += len(data) 736 self.writes += 1 737 if self.writes <= 2: 738 # we happen to use 4 segments, and want to avoid pausing on the 739 # last one (since then the _unpause timer will still be running) 740 self.producer.pauseProducing() 741 reactor.callLater(0.1, self._unpause) 742 return MemoryConsumer.write(self, data) 743 def _unpause(self): 744 self.producer.resumeProducing() 745 746 class PausingAndStoppingConsumer(PausingConsumer): 747 def write(self, data): 748 self.producer.pauseProducing() 749 reactor.callLater(0.5, self._stop) 750 def _stop(self): 751 self.producer.stopProducing() 752 753 class StoppingConsumer(PausingConsumer): 754 def write(self, data): 755 self.producer.stopProducing() 756 757 class StallingConsumer(MemoryConsumer): 758 def __init__(self, halfway_cb): 759 MemoryConsumer.__init__(self) 760 self.halfway_cb = halfway_cb 761 self.writes = 0 762 def write(self, data): 763 self.writes += 1 764 if self.writes == 1: 765 self.halfway_cb() 766 return MemoryConsumer.write(self, data) 767 768 class Corruption(_Base, unittest.TestCase): 769 770 def _corrupt_flip(self, ign, imm_uri, which): 771 log.msg("corrupt %d" % which) 772 def _corruptor(s, debug=False): 773 return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] 774 self.corrupt_shares_numbered(imm_uri, [0], _corruptor) 775 776 def _corrupt_set(self, ign, imm_uri, which, newvalue): 777 log.msg("corrupt %d" % which) 778 def _corruptor(s, debug=False): 779 return s[:which] + chr(newvalue) + s[which+1:] 780 self.corrupt_shares_numbered(imm_uri, [0], _corruptor) 781 782 def test_each_byte(self): 783 # Setting catalog_detection=True performs an exhaustive test of the 784 # Downloader's response to corruption in the lsb of each byte of the 785 # 2070-byte share, with two goals: make sure we tolerate all forms of 786 # corruption (i.e. don't hang or return bad data), and make a list of 787 # which bytes can be corrupted without influencing the download 788 # (since we don't need every byte of the share). That takes 50s to 789 # run on my laptop and doesn't have any actual asserts, so we don't 790 # normally do that. 791 self.catalog_detection = False 792 793 self.basedir = "download/Corruption/each_byte" 794 self.set_up_grid() 795 self.c0 = self.g.clients[0] 796 797 # to exercise the block-hash-tree code properly, we need to have 798 # multiple segments. We don't tell the downloader about the different 799 # segsize, so it guesses wrong and must do extra roundtrips. 800 u = upload.Data(plaintext, None) 801 u.max_segment_size = 120 # 3 segs, 4-wide hashtree 802 803 if self.catalog_detection: 804 undetected = spans.Spans() 805 806 def _download(ign, imm_uri, which, expected): 807 n = self.c0.create_node_from_uri(imm_uri) 808 # for this test to work, we need to have a new Node each time. 809 # Make sure the NodeMaker's weakcache hasn't interfered. 810 assert not n._cnode._node._shares 811 d = download_to_data(n) 812 def _got_data(data): 813 self.failUnlessEqual(data, plaintext) 814 shnums = sorted([s._shnum for s in n._cnode._node._shares]) 815 no_sh0 = bool(0 not in shnums) 816 sh0 = [s for s in n._cnode._node._shares if s._shnum == 0] 817 sh0_had_corruption = False 818 if sh0 and sh0[0].had_corruption: 819 sh0_had_corruption = True 820 num_needed = len(n._cnode._node._shares) 821 if self.catalog_detection: 822 detected = no_sh0 or sh0_had_corruption or (num_needed!=3) 823 if not detected: 824 undetected.add(which, 1) 825 if expected == "no-sh0": 826 self.failIfIn(0, shnums) 827 elif expected == "0bad-need-3": 828 self.failIf(no_sh0) 829 self.failUnless(sh0[0].had_corruption) 830 self.failUnlessEqual(num_needed, 3) 831 elif expected == "need-4th": 832 self.failIf(no_sh0) 833 self.failUnless(sh0[0].had_corruption) 834 self.failIfEqual(num_needed, 3) 835 d.addCallback(_got_data) 836 return d 837 838 839 d = self.c0.upload(u) 840 def _uploaded(ur): 841 imm_uri = ur.uri 842 self.shares = self.copy_shares(imm_uri) 843 d = defer.succeed(None) 844 # 'victims' is a list of corruption tests to run. Each one flips 845 # the low-order bit of the specified offset in the share file (so 846 # offset=0 is the MSB of the container version, offset=15 is the 847 # LSB of the share version, offset=24 is the MSB of the 848 # data-block-offset, and offset=48 is the first byte of the first 849 # data-block). Each one also specifies what sort of corruption 850 # we're expecting to see. 851 no_sh0_victims = [0,1,2,3] # container version 852 need3_victims = [ ] # none currently in this category 853 # when the offsets are corrupted, the Share will be unable to 854 # retrieve the data it wants (because it thinks that data lives 855 # off in the weeds somewhere), and Share treats DataUnavailable 856 # as abandon-this-share, so in general we'll be forced to look 857 # for a 4th share. 858 need_4th_victims = [12,13,14,15, # share version 859 24,25,26,27, # offset[data] 860 32,33,34,35, # offset[crypttext_hash_tree] 861 36,37,38,39, # offset[block_hashes] 862 44,45,46,47, # offset[UEB] 863 ] 864 need_4th_victims.append(48) # block data 865 # when corrupting hash trees, we must corrupt a value that isn't 866 # directly set from somewhere else. Since we download data from 867 # seg0, corrupt something on its hash chain, like [2] (the 868 # right-hand child of the root) 869 need_4th_victims.append(600+2*32) # block_hashes[2] 870 # Share.loop is pretty conservative: it abandons the share at the 871 # first sign of corruption. It doesn't strictly need to be this 872 # way: if the UEB were corrupt, we could still get good block 873 # data from that share, as long as there was a good copy of the 874 # UEB elsewhere. If this behavior is relaxed, then corruption in 875 # the following fields (which are present in multiple shares) 876 # should fall into the "need3_victims" case instead of the 877 # "need_4th_victims" case. 878 need_4th_victims.append(376+2*32) # crypttext_hash_tree[2] 879 need_4th_victims.append(824) # share_hashes 880 need_4th_victims.append(994) # UEB length 881 need_4th_victims.append(998) # UEB 882 corrupt_me = ([(i,"no-sh0") for i in no_sh0_victims] + 883 [(i, "0bad-need-3") for i in need3_victims] + 884 [(i, "need-4th") for i in need_4th_victims]) 885 if self.catalog_detection: 886 corrupt_me = [(i, "") for i in range(len(self.sh0_orig))] 887 for i,expected in corrupt_me: 888 # All these tests result in a successful download. What we're 889 # measuring is how many shares the downloader had to use. 890 d.addCallback(self._corrupt_flip, imm_uri, i) 891 d.addCallback(_download, imm_uri, i, expected) 892 d.addCallback(lambda ign: self.restore_all_shares(self.shares)) 893 d.addCallback(fireEventually) 894 corrupt_values = [(3, 2, "no-sh0"), 895 (15, 2, "need-4th"), # share looks v2 896 ] 897 for i,newvalue,expected in corrupt_values: 898 d.addCallback(self._corrupt_set, imm_uri, i, newvalue) 899 d.addCallback(_download, imm_uri, i, expected) 900 d.addCallback(lambda ign: self.restore_all_shares(self.shares)) 901 d.addCallback(fireEventually) 902 return d 903 d.addCallback(_uploaded) 904 def _show_results(ign): 905 print 906 print ("of [0:%d], corruption ignored in %s" % 907 (len(self.sh0_orig), undetected.dump())) 908 if self.catalog_detection: 909 d.addCallback(_show_results) 910 # of [0:2070], corruption ignored in len=1133: 911 # [4-11],[16-23],[28-31],[152-439],[600-663],[1309-2069] 912 # [4-11]: container sizes 913 # [16-23]: share block/data sizes 914 # [152-375]: plaintext hash tree 915 # [376-408]: crypttext_hash_tree[0] (root) 916 # [408-439]: crypttext_hash_tree[1] (computed) 917 # [600-631]: block hash tree[0] (root) 918 # [632-663]: block hash tree[1] (computed) 919 # [1309-]: reserved+unused UEB space 920 return d 921 922 def test_failure(self): 923 # this test corrupts all shares in the same way, and asserts that the 924 # download fails. 925 926 self.basedir = "download/Corruption/failure" 927 self.set_up_grid() 928 self.c0 = self.g.clients[0] 929 930 # to exercise the block-hash-tree code properly, we need to have 931 # multiple segments. We don't tell the downloader about the different 932 # segsize, so it guesses wrong and must do extra roundtrips. 933 u = upload.Data(plaintext, None) 934 u.max_segment_size = 120 # 3 segs, 4-wide hashtree 935 936 d = self.c0.upload(u) 937 def _uploaded(ur): 938 imm_uri = ur.uri 939 self.shares = self.copy_shares(imm_uri) 940 941 corrupt_me = [(48, "block data", "Last failure: None"), 942 (600+2*32, "block_hashes[2]", "BadHashError"), 943 (376+2*32, "crypttext_hash_tree[2]", "BadHashError"), 944 (824, "share_hashes", "BadHashError"), 945 ] 946 def _download(imm_uri): 947 n = self.c0.create_node_from_uri(imm_uri) 948 # for this test to work, we need to have a new Node each time. 949 # Make sure the NodeMaker's weakcache hasn't interfered. 950 assert not n._cnode._node._shares 951 return download_to_data(n) 952 953 d = defer.succeed(None) 954 for i,which,substring in corrupt_me: 955 # All these tests result in a failed download. 956 d.addCallback(self._corrupt_flip_all, imm_uri, i) 957 d.addCallback(lambda ign: 958 self.shouldFail(NotEnoughSharesError, which, 959 substring, 960 _download, imm_uri)) 961 d.addCallback(lambda ign: self.restore_all_shares(self.shares)) 962 d.addCallback(fireEventually) 963 return d 964 d.addCallback(_uploaded) 965 966 return d 967 968 def _corrupt_flip_all(self, ign, imm_uri, which): 969 def _corruptor(s, debug=False): 970 return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] 971 self.corrupt_all_shares(imm_uri, _corruptor) 972 973 class DownloadV2(_Base, unittest.TestCase): 974 # tests which exercise v2-share code. They first upload a file with 975 # FORCE_V2 set. 976 977 def setUp(self): 978 d = defer.maybeDeferred(_Base.setUp, self) 979 def _set_force_v2(ign): 980 self.old_force_v2 = layout.FORCE_V2 981 layout.FORCE_V2 = True 982 d.addCallback(_set_force_v2) 983 return d 984 def tearDown(self): 985 layout.FORCE_V2 = self.old_force_v2 986 return _Base.tearDown(self) 987 988 def test_download(self): 989 self.basedir = self.mktemp() 990 self.set_up_grid() 991 self.c0 = self.g.clients[0] 992 993 # upload a file 994 u = upload.Data(plaintext, None) 995 d = self.c0.upload(u) 996 def _uploaded(ur): 997 imm_uri = ur.uri 998 n = self.c0.create_node_from_uri(imm_uri) 999 return download_to_data(n) 1000 d.addCallback(_uploaded) 1001 return d 1002 1003 def test_download_no_overrun(self): 1004 self.basedir = self.mktemp() 1005 self.set_up_grid() 1006 self.c0 = self.g.clients[0] 1007 1008 # tweak the client's copies of server-version data, so it believes 1009 # that they're old and can't handle reads that overrun the length of 1010 # the share. This exercises a different code path. 1011 for (peerid, rref) in self.c0.storage_broker.get_all_servers(): 1012 v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] 1013 v1["tolerates-immutable-read-overrun"] = False 1014 1015 # upload a file 1016 u = upload.Data(plaintext, None) 1017 d = self.c0.upload(u) 1018 def _uploaded(ur): 1019 imm_uri = ur.uri 1020 n = self.c0.create_node_from_uri(imm_uri) 1021 return download_to_data(n) 1022 d.addCallback(_uploaded) 1023 return d 1024 1025 def OFF_test_no_overrun_corrupt_shver(self): # unnecessary 1026 self.basedir = self.mktemp() 1027 self.set_up_grid() 1028 self.c0 = self.g.clients[0] 1029 1030 for (peerid, rref) in self.c0.storage_broker.get_all_servers(): 1031 v1 = rref.version["http://allmydata.org/tahoe/protocols/storage/v1"] 1032 v1["tolerates-immutable-read-overrun"] = False 1033 1034 # upload a file 1035 u = upload.Data(plaintext, None) 1036 d = self.c0.upload(u) 1037 def _uploaded(ur): 1038 imm_uri = ur.uri 1039 def _do_corrupt(which, newvalue): 1040 def _corruptor(s, debug=False): 1041 return s[:which] + chr(newvalue) + s[which+1:] 1042 self.corrupt_shares_numbered(imm_uri, [0], _corruptor) 1043 _do_corrupt(12+3, 0x00) 1044 n = self.c0.create_node_from_uri(imm_uri) 1045 d = download_to_data(n) 1046 def _got_data(data): 1047 self.failUnlessEqual(data, plaintext) 1048 d.addCallback(_got_data) 1049 return d 1050 d.addCallback(_uploaded) 1051 return d -
TabularUnified src/allmydata/test/test_encode.py ¶
r7b7b0c9 r63b61ce 1 1 from zope.interface import implements 2 2 from twisted.trial import unittest 3 from twisted.internet import defer , reactor3 from twisted.internet import defer 4 4 from twisted.python.failure import Failure 5 5 from foolscap.api import fireEventually 6 from allmydata import hashtree,uri7 from allmydata.immutable import encode, upload, download6 from allmydata import uri 7 from allmydata.immutable import encode, upload, checker 8 8 from allmydata.util import hashutil 9 9 from allmydata.util.assertutil import _assert 10 from allmydata.util.consumer import MemoryConsumer 11 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \ 12 NotEnoughSharesError, IStorageBroker, UploadUnhappinessError 13 from allmydata.monitor import Monitor 14 import allmydata.test.common_util as testutil 10 from allmydata.util.consumer import download_to_data 11 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader 12 from allmydata.test.no_network import GridTestMixin 15 13 16 14 class LostPeerError(Exception): … … 19 17 def flip_bit(good): # flips the last bit 20 18 return good[:-1] + chr(ord(good[-1]) ^ 0x01) 21 22 class FakeStorageBroker:23 implements(IStorageBroker)24 19 25 20 class FakeBucketReaderWriterProxy: … … 58 53 raise LostPeerError("I'm going away now") 59 54 self.blocks[segmentnum] = data 60 return defer.maybeDeferred(_try)61 62 def put_plaintext_hashes(self, hashes):63 def _try():64 assert not self.closed65 assert not self.plaintext_hashes66 self.plaintext_hashes = hashes67 55 return defer.maybeDeferred(_try) 68 56 … … 224 212 fb.put_uri_extension(uebstring) 225 213 verifycap = uri.CHKFileVerifierURI(storage_index='x'*16, uri_extension_hash=uebhash, needed_shares=self.K, total_shares=self.M, size=self.SIZE) 226 vup = download.ValidatedExtendedURIProxy(fb, verifycap)214 vup = checker.ValidatedExtendedURIProxy(fb, verifycap) 227 215 return vup.start() 228 216 … … 238 226 def _test_reject(self, uebdict): 239 227 d = self._test(uebdict) 240 d.addBoth(self._should_fail, (KeyError, download.BadURIExtension))228 d.addBoth(self._should_fail, (KeyError, checker.BadURIExtension)) 241 229 return d 242 230 … … 334 322 return d 335 323 336 # a series of 3*3 tests to check out edge conditions. One axis is how the337 # plaintext is divided into segments: kn+(-1,0,1). Another way to express338 # that is that n%k == -1 or 0 or 1. For example, for 25-byte segments, we339 # might test 74 bytes, 75 bytes, and 76 bytes.340 341 # on the other axis is how many leaves in the block hash tree we wind up342 # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns343 # into a single leaf. So we'd like to check out, e.g., 3 segments, 4344 # segments, and 5 segments.345 346 # that results in the following series of data lengths:347 # 3 segs: 74, 75, 51348 # 4 segs: 99, 100, 76349 # 5 segs: 124, 125, 101350 351 # all tests encode to 100 shares, which means the share hash tree will352 # have 128 leaves, which means that buckets will be given an 8-long share353 # hash chain354 355 # all 3-segment files will have a 4-leaf blockhashtree, and thus expect356 # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash357 # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash358 # trees, which get 15 blockhashes.359 360 324 def test_send_74(self): 361 325 # 3 segments (25, 25, 24) … … 388 352 return self.do_encode(25, 101, 100, 5, 15, 8) 389 353 390 class PausingConsumer(MemoryConsumer): 391 def __init__(self): 392 MemoryConsumer.__init__(self) 393 self.size = 0 394 self.writes = 0 395 def write(self, data): 396 self.size += len(data) 397 self.writes += 1 398 if self.writes <= 2: 399 # we happen to use 4 segments, and want to avoid pausing on the 400 # last one (since then the _unpause timer will still be running) 401 self.producer.pauseProducing() 402 reactor.callLater(0.1, self._unpause) 403 return MemoryConsumer.write(self, data) 404 def _unpause(self): 405 self.producer.resumeProducing() 406 407 class PausingAndStoppingConsumer(PausingConsumer): 408 def write(self, data): 409 self.producer.pauseProducing() 410 reactor.callLater(0.5, self._stop) 411 def _stop(self): 412 self.producer.stopProducing() 413 414 class StoppingConsumer(PausingConsumer): 415 def write(self, data): 416 self.producer.stopProducing() 417 418 class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin): 419 timeout = 2400 # It takes longer than 240 seconds on Zandr's ARM box. 420 def send_and_recover(self, k_and_happy_and_n=(25,75,100), 421 AVAILABLE_SHARES=None, 422 datalen=76, 423 max_segment_size=25, 424 bucket_modes={}, 425 recover_mode="recover", 426 consumer=None, 427 ): 428 if AVAILABLE_SHARES is None: 429 AVAILABLE_SHARES = k_and_happy_and_n[2] 430 data = make_data(datalen) 431 d = self.send(k_and_happy_and_n, AVAILABLE_SHARES, 432 max_segment_size, bucket_modes, data) 433 # that fires with (uri_extension_hash, e, shareholders) 434 d.addCallback(self.recover, AVAILABLE_SHARES, recover_mode, 435 consumer=consumer) 436 # that fires with newdata 437 def _downloaded((newdata, fd)): 438 self.failUnless(newdata == data, str((len(newdata), len(data)))) 439 return fd 354 355 class Roundtrip(GridTestMixin, unittest.TestCase): 356 357 # a series of 3*3 tests to check out edge conditions. One axis is how the 358 # plaintext is divided into segments: kn+(-1,0,1). Another way to express 359 # this is n%k == -1 or 0 or 1. For example, for 25-byte segments, we 360 # might test 74 bytes, 75 bytes, and 76 bytes. 361 362 # on the other axis is how many leaves in the block hash tree we wind up 363 # with, relative to a power of 2, so 2^a+(-1,0,1). Each segment turns 364 # into a single leaf. So we'd like to check out, e.g., 3 segments, 4 365 # segments, and 5 segments. 366 367 # that results in the following series of data lengths: 368 # 3 segs: 74, 75, 51 369 # 4 segs: 99, 100, 76 370 # 5 segs: 124, 125, 101 371 372 # all tests encode to 100 shares, which means the share hash tree will 373 # have 128 leaves, which means that buckets will be given an 8-long share 374 # hash chain 375 376 # all 3-segment files will have a 4-leaf blockhashtree, and thus expect 377 # to get 7 blockhashes. 4-segment files will also get 4-leaf block hash 378 # trees and 7 blockhashes. 5-segment files will get 8-leaf block hash 379 # trees, which gets 15 blockhashes. 380 381 def test_74(self): return self.do_test_size(74) 382 def test_75(self): return self.do_test_size(75) 383 def test_51(self): return self.do_test_size(51) 384 def test_99(self): return self.do_test_size(99) 385 def test_100(self): return self.do_test_size(100) 386 def test_76(self): return self.do_test_size(76) 387 def test_124(self): return self.do_test_size(124) 388 def test_125(self): return self.do_test_size(125) 389 def test_101(self): return self.do_test_size(101) 390 391 def upload(self, data): 392 u = upload.Data(data, None) 393 u.max_segment_size = 25 394 u.encoding_param_k = 25 395 u.encoding_param_happy = 1 396 u.encoding_param_n = 100 397 d = self.c0.upload(u) 398 d.addCallback(lambda ur: self.c0.create_node_from_uri(ur.uri)) 399 # returns a FileNode 400 return d 401 402 def do_test_size(self, size): 403 self.basedir = self.mktemp() 404 self.set_up_grid() 405 self.c0 = self.g.clients[0] 406 DATA = "p"*size 407 d = self.upload(DATA) 408 d.addCallback(lambda n: download_to_data(n)) 409 def _downloaded(newdata): 410 self.failUnlessEqual(newdata, DATA) 440 411 d.addCallback(_downloaded) 441 412 return d 442 443 def send(self, k_and_happy_and_n, AVAILABLE_SHARES, max_segment_size,444 bucket_modes, data):445 k, happy, n = k_and_happy_and_n446 NUM_SHARES = k_and_happy_and_n[2]447 if AVAILABLE_SHARES is None:448 AVAILABLE_SHARES = NUM_SHARES449 e = encode.Encoder()450 u = upload.Data(data, convergence="some convergence string")451 # force use of multiple segments by using a low max_segment_size452 u.max_segment_size = max_segment_size453 u.encoding_param_k = k454 u.encoding_param_happy = happy455 u.encoding_param_n = n456 eu = upload.EncryptAnUploadable(u)457 d = e.set_encrypted_uploadable(eu)458 459 shareholders = {}460 def _ready(res):461 k,happy,n = e.get_param("share_counts")462 assert n == NUM_SHARES # else we'll be completely confused463 servermap = {}464 for shnum in range(NUM_SHARES):465 mode = bucket_modes.get(shnum, "good")466 peer = FakeBucketReaderWriterProxy(mode, "peer%d" % shnum)467 shareholders[shnum] = peer468 servermap.setdefault(shnum, set()).add(peer.get_peerid())469 e.set_shareholders(shareholders, servermap)470 return e.start()471 d.addCallback(_ready)472 def _sent(res):473 d1 = u.get_encryption_key()474 d1.addCallback(lambda key: (res, key, shareholders))475 return d1476 d.addCallback(_sent)477 return d478 479 def recover(self, (res, key, shareholders), AVAILABLE_SHARES,480 recover_mode, consumer=None):481 verifycap = res482 483 if "corrupt_key" in recover_mode:484 # we corrupt the key, so that the decrypted data is corrupted and485 # will fail the plaintext hash check. Since we're manually486 # attaching shareholders, the fact that the storage index is also487 # corrupted doesn't matter.488 key = flip_bit(key)489 490 u = uri.CHKFileURI(key=key,491 uri_extension_hash=verifycap.uri_extension_hash,492 needed_shares=verifycap.needed_shares,493 total_shares=verifycap.total_shares,494 size=verifycap.size)495 496 sb = FakeStorageBroker()497 if not consumer:498 consumer = MemoryConsumer()499 innertarget = download.ConsumerAdapter(consumer)500 target = download.DecryptingTarget(innertarget, u.key)501 fd = download.CiphertextDownloader(sb, u.get_verify_cap(), target, monitor=Monitor())502 503 # we manually cycle the CiphertextDownloader through a number of steps that504 # would normally be sequenced by a Deferred chain in505 # CiphertextDownloader.start(), to give us more control over the process.506 # In particular, by bypassing _get_all_shareholders, we skip507 # permuted-peerlist selection.508 for shnum, bucket in shareholders.items():509 if shnum < AVAILABLE_SHARES and bucket.closed:510 fd.add_share_bucket(shnum, bucket)511 fd._got_all_shareholders(None)512 513 # Make it possible to obtain uri_extension from the shareholders.514 # Arrange for shareholders[0] to be the first, so we can selectively515 # corrupt the data it returns.516 uri_extension_sources = shareholders.values()517 uri_extension_sources.remove(shareholders[0])518 uri_extension_sources.insert(0, shareholders[0])519 520 d = defer.succeed(None)521 522 # have the CiphertextDownloader retrieve a copy of uri_extension itself523 d.addCallback(fd._obtain_uri_extension)524 525 if "corrupt_crypttext_hashes" in recover_mode:526 # replace everybody's crypttext hash trees with a different one527 # (computed over a different file), then modify our uri_extension528 # to reflect the new crypttext hash tree root529 def _corrupt_crypttext_hashes(unused):530 assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup531 assert fd._vup.crypttext_root_hash, fd._vup532 badhash = hashutil.tagged_hash("bogus", "data")533 bad_crypttext_hashes = [badhash] * fd._vup.num_segments534 badtree = hashtree.HashTree(bad_crypttext_hashes)535 for bucket in shareholders.values():536 bucket.crypttext_hashes = list(badtree)537 fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments)538 fd._crypttext_hash_tree.set_hashes({0: badtree[0]})539 return fd._vup540 d.addCallback(_corrupt_crypttext_hashes)541 542 # also have the CiphertextDownloader ask for hash trees543 d.addCallback(fd._get_crypttext_hash_tree)544 545 d.addCallback(fd._download_all_segments)546 d.addCallback(fd._done)547 def _done(t):548 newdata = "".join(consumer.chunks)549 return (newdata, fd)550 d.addCallback(_done)551 return d552 553 def test_not_enough_shares(self):554 d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)555 def _done(res):556 self.failUnless(isinstance(res, Failure))557 self.failUnless(res.check(NotEnoughSharesError))558 d.addBoth(_done)559 return d560 561 def test_one_share_per_peer(self):562 return self.send_and_recover()563 564 def test_74(self):565 return self.send_and_recover(datalen=74)566 def test_75(self):567 return self.send_and_recover(datalen=75)568 def test_51(self):569 return self.send_and_recover(datalen=51)570 571 def test_99(self):572 return self.send_and_recover(datalen=99)573 def test_100(self):574 return self.send_and_recover(datalen=100)575 def test_76(self):576 return self.send_and_recover(datalen=76)577 578 def test_124(self):579 return self.send_and_recover(datalen=124)580 def test_125(self):581 return self.send_and_recover(datalen=125)582 def test_101(self):583 return self.send_and_recover(datalen=101)584 585 def test_pause(self):586 # use a download target that does pauseProducing/resumeProducing a587 # few times, then finishes588 c = PausingConsumer()589 d = self.send_and_recover(consumer=c)590 return d591 592 def test_pause_then_stop(self):593 # use a download target that pauses, then stops.594 c = PausingAndStoppingConsumer()595 d = self.shouldFail(download.DownloadStopped, "test_pause_then_stop",596 "our Consumer called stopProducing()",597 self.send_and_recover, consumer=c)598 return d599 600 def test_stop(self):601 # use a download targetthat does an immediate stop (ticket #473)602 c = StoppingConsumer()603 d = self.shouldFail(download.DownloadStopped, "test_stop",604 "our Consumer called stopProducing()",605 self.send_and_recover, consumer=c)606 return d607 608 # the following tests all use 4-out-of-10 encoding609 610 def test_bad_blocks(self):611 # the first 6 servers have bad blocks, which will be caught by the612 # blockhashes613 modemap = dict([(i, "bad block")614 for i in range(6)]615 + [(i, "good")616 for i in range(6, 10)])617 return self.send_and_recover((4,8,10), bucket_modes=modemap)618 619 def test_bad_blocks_failure(self):620 # the first 7 servers have bad blocks, which will be caught by the621 # blockhashes, and the download will fail622 modemap = dict([(i, "bad block")623 for i in range(7)]624 + [(i, "good")625 for i in range(7, 10)])626 d = self.send_and_recover((4,8,10), bucket_modes=modemap)627 def _done(res):628 self.failUnless(isinstance(res, Failure), res)629 self.failUnless(res.check(NotEnoughSharesError), res)630 d.addBoth(_done)631 return d632 633 def test_bad_blockhashes(self):634 # the first 6 servers have bad block hashes, so the blockhash tree635 # will not validate636 modemap = dict([(i, "bad blockhash")637 for i in range(6)]638 + [(i, "good")639 for i in range(6, 10)])640 return self.send_and_recover((4,8,10), bucket_modes=modemap)641 642 def test_bad_blockhashes_failure(self):643 # the first 7 servers have bad block hashes, so the blockhash tree644 # will not validate, and the download will fail645 modemap = dict([(i, "bad blockhash")646 for i in range(7)]647 + [(i, "good")648 for i in range(7, 10)])649 d = self.send_and_recover((4,8,10), bucket_modes=modemap)650 def _done(res):651 self.failUnless(isinstance(res, Failure))652 self.failUnless(res.check(NotEnoughSharesError), res)653 d.addBoth(_done)654 return d655 656 def test_bad_sharehashes(self):657 # the first 6 servers have bad block hashes, so the sharehash tree658 # will not validate659 modemap = dict([(i, "bad sharehash")660 for i in range(6)]661 + [(i, "good")662 for i in range(6, 10)])663 return self.send_and_recover((4,8,10), bucket_modes=modemap)664 665 def assertFetchFailureIn(self, fd, where):666 expected = {"uri_extension": 0,667 "crypttext_hash_tree": 0,668 }669 if where is not None:670 expected[where] += 1671 self.failUnlessEqual(fd._fetch_failures, expected)672 673 def test_good(self):674 # just to make sure the test harness works when we aren't675 # intentionally causing failures676 modemap = dict([(i, "good") for i in range(0, 10)])677 d = self.send_and_recover((4,8,10), bucket_modes=modemap)678 d.addCallback(self.assertFetchFailureIn, None)679 return d680 681 def test_bad_uri_extension(self):682 # the first server has a bad uri_extension block, so we will fail683 # over to a different server.684 modemap = dict([(i, "bad uri_extension") for i in range(1)] +685 [(i, "good") for i in range(1, 10)])686 d = self.send_and_recover((4,8,10), bucket_modes=modemap)687 d.addCallback(self.assertFetchFailureIn, "uri_extension")688 return d689 690 def test_bad_crypttext_hashroot(self):691 # the first server has a bad crypttext hashroot, so we will fail692 # over to a different server.693 modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +694 [(i, "good") for i in range(1, 10)])695 d = self.send_and_recover((4,8,10), bucket_modes=modemap)696 d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")697 return d698 699 def test_bad_crypttext_hashes(self):700 # the first server has a bad crypttext hash block, so we will fail701 # over to a different server.702 modemap = dict([(i, "bad crypttext hash") for i in range(1)] +703 [(i, "good") for i in range(1, 10)])704 d = self.send_and_recover((4,8,10), bucket_modes=modemap)705 d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")706 return d707 708 def test_bad_crypttext_hashes_failure(self):709 # to test that the crypttext merkle tree is really being applied, we710 # sneak into the download process and corrupt two things: we replace711 # everybody's crypttext hashtree with a bad version (computed over712 # bogus data), and we modify the supposedly-validated uri_extension713 # block to match the new crypttext hashtree root. The download714 # process should notice that the crypttext coming out of FEC doesn't715 # match the tree, and fail.716 717 modemap = dict([(i, "good") for i in range(0, 10)])718 d = self.send_and_recover((4,8,10), bucket_modes=modemap,719 recover_mode=("corrupt_crypttext_hashes"))720 def _done(res):721 self.failUnless(isinstance(res, Failure))722 self.failUnless(res.check(hashtree.BadHashError), res)723 d.addBoth(_done)724 return d725 726 def OFF_test_bad_plaintext(self):727 # faking a decryption failure is easier: just corrupt the key728 modemap = dict([(i, "good") for i in range(0, 10)])729 d = self.send_and_recover((4,8,10), bucket_modes=modemap,730 recover_mode=("corrupt_key"))731 def _done(res):732 self.failUnless(isinstance(res, Failure))733 self.failUnless(res.check(hashtree.BadHashError), res)734 d.addBoth(_done)735 return d736 737 def test_bad_sharehashes_failure(self):738 # all ten servers have bad share hashes, so the sharehash tree739 # will not validate, and the download will fail740 modemap = dict([(i, "bad sharehash")741 for i in range(10)])742 d = self.send_and_recover((4,8,10), bucket_modes=modemap)743 def _done(res):744 self.failUnless(isinstance(res, Failure))745 self.failUnless(res.check(NotEnoughSharesError))746 d.addBoth(_done)747 return d748 749 def test_missing_sharehashes(self):750 # the first 6 servers are missing their sharehashes, so the751 # sharehash tree will not validate752 modemap = dict([(i, "missing sharehash")753 for i in range(6)]754 + [(i, "good")755 for i in range(6, 10)])756 return self.send_and_recover((4,8,10), bucket_modes=modemap)757 758 def test_missing_sharehashes_failure(self):759 # all servers are missing their sharehashes, so the sharehash tree will not validate,760 # and the download will fail761 modemap = dict([(i, "missing sharehash")762 for i in range(10)])763 d = self.send_and_recover((4,8,10), bucket_modes=modemap)764 def _done(res):765 self.failUnless(isinstance(res, Failure), res)766 self.failUnless(res.check(NotEnoughSharesError), res)767 d.addBoth(_done)768 return d769 770 def test_lost_one_shareholder(self):771 # we have enough shareholders when we start, but one segment in we772 # lose one of them. The upload should still succeed, as long as we773 # still have 'servers_of_happiness' peers left.774 modemap = dict([(i, "good") for i in range(9)] +775 [(i, "lost") for i in range(9, 10)])776 return self.send_and_recover((4,8,10), bucket_modes=modemap)777 778 def test_lost_one_shareholder_early(self):779 # we have enough shareholders when we choose peers, but just before780 # we send the 'start' message, we lose one of them. The upload should781 # still succeed, as long as we still have 'servers_of_happiness' peers782 # left.783 modemap = dict([(i, "good") for i in range(9)] +784 [(i, "lost-early") for i in range(9, 10)])785 return self.send_and_recover((4,8,10), bucket_modes=modemap)786 787 def test_lost_many_shareholders(self):788 # we have enough shareholders when we start, but one segment in we789 # lose all but one of them. The upload should fail.790 modemap = dict([(i, "good") for i in range(1)] +791 [(i, "lost") for i in range(1, 10)])792 d = self.send_and_recover((4,8,10), bucket_modes=modemap)793 def _done(res):794 self.failUnless(isinstance(res, Failure))795 self.failUnless(res.check(UploadUnhappinessError), res)796 d.addBoth(_done)797 return d798 799 def test_lost_all_shareholders(self):800 # we have enough shareholders when we start, but one segment in we801 # lose all of them. The upload should fail.802 modemap = dict([(i, "lost") for i in range(10)])803 d = self.send_and_recover((4,8,10), bucket_modes=modemap)804 def _done(res):805 self.failUnless(isinstance(res, Failure))806 self.failUnless(res.check(UploadUnhappinessError))807 d.addBoth(_done)808 return d -
TabularUnified src/allmydata/test/test_filenode.py ¶
r7b7b0c9 r63b61ce 3 3 from allmydata import uri, client 4 4 from allmydata.monitor import Monitor 5 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode 5 from allmydata.immutable.literal import LiteralFileNode 6 from allmydata.immutable.filenode import ImmutableFileNode 6 7 from allmydata.mutable.filenode import MutableFileNode 7 from allmydata.util import hashutil , cachedir8 from allmydata.util import hashutil 8 9 from allmydata.util.consumer import download_to_data 9 10 … … 31 32 total_shares=10, 32 33 size=1000) 33 cf = cachedir.CacheFile("none") 34 fn1 = ImmutableFileNode(u, None, None, None, None, cf) 35 fn2 = ImmutableFileNode(u, None, None, None, None, cf) 34 fn1 = ImmutableFileNode(u, None, None, None, None) 35 fn2 = ImmutableFileNode(u, None, None, None, None) 36 36 self.failUnlessEqual(fn1, fn2) 37 37 self.failIfEqual(fn1, "I am not a filenode") -
TabularUnified src/allmydata/test/test_hung_server.py ¶
r7b7b0c9 r63b61ce 24 24 25 25 timeout = 240 26 skip="not ready" 26 27 27 28 def _break(self, servers): … … 114 115 else: 115 116 d = download_to_data(n) 116 stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME 117 #stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME 118 stage_4_d = None 117 119 return (d, stage_4_d,) 118 120 … … 142 144 else: 143 145 return self.shouldFail(NotEnoughSharesError, self.basedir, 144 " Failed to get enough shareholders",146 "ran out of shares", 145 147 self._download_and_check) 146 148 … … 235 237 236 238 def test_failover_during_stage_4(self): 239 raise unittest.SkipTest("needs rewrite") 237 240 # See #287 238 241 d = defer.succeed(None) -
TabularUnified src/allmydata/test/test_immutable.py ¶
r7b7b0c9 r63b61ce 6 6 import random 7 7 8 class Test(common.ShareManglingMixin, unittest.TestCase):8 class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase): 9 9 def test_test_code(self): 10 10 # The following process of stashing the shares, running … … 19 19 d.addCallback(_stash_it) 20 20 21 # The following process of deleting 8 of the shares and asserting that you can't 22 # download it is more to test this test code than to test the Tahoe code... 21 # The following process of deleting 8 of the shares and asserting 22 # that you can't download it is more to test this test code than to 23 # test the Tahoe code... 23 24 def _then_delete_8(unused=None): 24 25 self.replace_shares(stash[0], storage_index=self.uri.get_storage_index()) … … 43 44 44 45 def test_download(self): 45 """ Basic download. (This functionality is more or less already tested by test code in46 other modules, but this module is also going to test some more specific things about47 immutable download.)46 """ Basic download. (This functionality is more or less already 47 tested by test code in other modules, but this module is also going 48 to test some more specific things about immutable download.) 48 49 """ 49 50 d = defer.succeed(None) … … 51 52 def _after_download(unused=None): 52 53 after_download_reads = self._count_reads() 53 self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads)) 54 #print before_download_reads, after_download_reads 55 self.failIf(after_download_reads-before_download_reads > 27, 56 (after_download_reads, before_download_reads)) 54 57 d.addCallback(self._download_and_check_plaintext) 55 58 d.addCallback(_after_download) … … 57 60 58 61 def test_download_from_only_3_remaining_shares(self): 59 """ Test download after 7 random shares (of the 10) have been removed. """ 62 """ Test download after 7 random shares (of the 10) have been 63 removed.""" 60 64 d = defer.succeed(None) 61 65 def _then_delete_7(unused=None): … … 66 70 def _after_download(unused=None): 67 71 after_download_reads = self._count_reads() 72 #print before_download_reads, after_download_reads 68 73 self.failIf(after_download_reads-before_download_reads > 27, (after_download_reads, before_download_reads)) 69 74 d.addCallback(self._download_and_check_plaintext) … … 72 77 73 78 def test_download_from_only_3_shares_with_good_crypttext_hash(self): 74 """ Test download after 7 random shares (of the 10) have had their crypttext hash tree corrupted. """ 79 """ Test download after 7 random shares (of the 10) have had their 80 crypttext hash tree corrupted.""" 75 81 d = defer.succeed(None) 76 82 def _then_corrupt_7(unused=None): … … 85 91 86 92 def test_download_abort_if_too_many_missing_shares(self): 87 """ Test that download gives up quickly when it realizes there aren't enough shares out 88 there.""" 89 d = defer.succeed(None) 90 def _then_delete_8(unused=None): 91 for i in range(8): 92 self._delete_a_share() 93 d.addCallback(_then_delete_8) 94 95 before_download_reads = self._count_reads() 96 def _attempt_to_download(unused=None): 97 d2 = download_to_data(self.n) 98 99 def _callb(res): 100 self.fail("Should have gotten an error from attempt to download, not %r" % (res,)) 101 def _errb(f): 102 self.failUnless(f.check(NotEnoughSharesError)) 103 d2.addCallbacks(_callb, _errb) 104 return d2 105 106 d.addCallback(_attempt_to_download) 107 108 def _after_attempt(unused=None): 109 after_download_reads = self._count_reads() 110 # To pass this test, you are required to give up before actually trying to read any 111 # share data. 112 self.failIf(after_download_reads-before_download_reads > 0, (after_download_reads, before_download_reads)) 113 d.addCallback(_after_attempt) 93 """ Test that download gives up quickly when it realizes there aren't 94 enough shares out there.""" 95 for i in range(8): 96 self._delete_a_share() 97 d = self.shouldFail(NotEnoughSharesError, "delete 8", None, 98 download_to_data, self.n) 99 # the new downloader pipelines a bunch of read requests in parallel, 100 # so don't bother asserting anything about the number of reads 114 101 return d 115 102 116 103 def test_download_abort_if_too_many_corrupted_shares(self): 117 """ Test that download gives up quickly when it realizes there aren't enough uncorrupted 118 shares out there. It should be able to tell because the corruption occurs in the 119 sharedata version number, which it checks first.""" 104 """Test that download gives up quickly when it realizes there aren't 105 enough uncorrupted shares out there. It should be able to tell 106 because the corruption occurs in the sharedata version number, which 107 it checks first.""" 120 108 d = defer.succeed(None) 121 109 def _then_corrupt_8(unused=None): … … 141 129 def _after_attempt(unused=None): 142 130 after_download_reads = self._count_reads() 143 # To pass this test, you are required to give up before reading all of the share 144 # data. Actually, we could give up sooner than 45 reads, but currently our download 145 # code does 45 reads. This test then serves as a "performance regression detector" 146 # -- if you change download code so that it takes *more* reads, then this test will 147 # fail. 148 self.failIf(after_download_reads-before_download_reads > 45, (after_download_reads, before_download_reads)) 131 #print before_download_reads, after_download_reads 132 # To pass this test, you are required to give up before reading 133 # all of the share data. Actually, we could give up sooner than 134 # 45 reads, but currently our download code does 45 reads. This 135 # test then serves as a "performance regression detector" -- if 136 # you change download code so that it takes *more* reads, then 137 # this test will fail. 138 self.failIf(after_download_reads-before_download_reads > 45, 139 (after_download_reads, before_download_reads)) 149 140 d.addCallback(_after_attempt) 150 141 return d 151 142 152 143 153 # XXX extend these tests to show bad behavior of various kinds from servers: raising exception from each remove_foo() method, for example 144 # XXX extend these tests to show bad behavior of various kinds from servers: 145 # raising exception from each remove_foo() method, for example 154 146 155 147 # XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit 156 148 149 # TODO: delete this whole file -
TabularUnified src/allmydata/test/test_mutable.py ¶
r7b7b0c9 r63b61ce 198 198 keygen.set_default_keysize(522) 199 199 nodemaker = NodeMaker(storage_broker, sh, None, 200 None, None, None,200 None, None, 201 201 {"k": 3, "n": 10}, keygen) 202 202 return nodemaker -
TabularUnified src/allmydata/test/test_repairer.py ¶
r7b7b0c9 r63b61ce 4 4 from allmydata import check_results 5 5 from allmydata.interfaces import NotEnoughSharesError 6 from allmydata.immutable import repairer,upload6 from allmydata.immutable import upload 7 7 from allmydata.util.consumer import download_to_data 8 8 from twisted.internet import defer … … 364 364 DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY 365 365 366 class DownUpConnector(unittest.TestCase):367 def test_deferred_satisfaction(self):368 duc = repairer.DownUpConnector()369 duc.registerProducer(None, True) # just because you have to call registerProducer first370 # case 1: total data in buf is < requested data at time of request371 duc.write('\x01')372 d = duc.read_encrypted(2, False)373 def _then(data):374 self.failUnlessEqual(len(data), 2)375 self.failUnlessEqual(data[0], '\x01')376 self.failUnlessEqual(data[1], '\x02')377 d.addCallback(_then)378 duc.write('\x02')379 return d380 381 def test_extra(self):382 duc = repairer.DownUpConnector()383 duc.registerProducer(None, True) # just because you have to call registerProducer first384 # case 1: total data in buf is < requested data at time of request385 duc.write('\x01')386 d = duc.read_encrypted(2, False)387 def _then(data):388 self.failUnlessEqual(len(data), 2)389 self.failUnlessEqual(data[0], '\x01')390 self.failUnlessEqual(data[1], '\x02')391 d.addCallback(_then)392 duc.write('\x02\0x03')393 return d394 395 def test_short_reads_1(self):396 # You don't get fewer bytes than you requested -- instead you get no callback at all.397 duc = repairer.DownUpConnector()398 duc.registerProducer(None, True) # just because you have to call registerProducer first399 400 d = duc.read_encrypted(2, False)401 duc.write('\x04')402 403 def _callb(res):404 self.fail("Shouldn't have gotten this callback res: %s" % (res,))405 d.addCallback(_callb)406 407 # Also in the other order of read-vs-write:408 duc2 = repairer.DownUpConnector()409 duc2.registerProducer(None, True) # just because you have to call registerProducer first410 duc2.write('\x04')411 d = duc2.read_encrypted(2, False)412 413 def _callb2(res):414 self.fail("Shouldn't have gotten this callback res: %s" % (res,))415 d.addCallback(_callb2)416 417 # But once the DUC is closed then you *do* get short reads.418 duc3 = repairer.DownUpConnector()419 duc3.registerProducer(None, True) # just because you have to call registerProducer first420 421 d = duc3.read_encrypted(2, False)422 duc3.write('\x04')423 duc3.close()424 def _callb3(res):425 self.failUnlessEqual(len(res), 1)426 self.failUnlessEqual(res[0], '\x04')427 d.addCallback(_callb3)428 return d429 430 def test_short_reads_2(self):431 # Also in the other order of read-vs-write.432 duc = repairer.DownUpConnector()433 duc.registerProducer(None, True) # just because you have to call registerProducer first434 435 duc.write('\x04')436 d = duc.read_encrypted(2, False)437 duc.close()438 439 def _callb(res):440 self.failUnlessEqual(len(res), 1)441 self.failUnlessEqual(res[0], '\x04')442 d.addCallback(_callb)443 return d444 445 def test_short_reads_3(self):446 # Also if it is closed before the read.447 duc = repairer.DownUpConnector()448 duc.registerProducer(None, True) # just because you have to call registerProducer first449 450 duc.write('\x04')451 duc.close()452 d = duc.read_encrypted(2, False)453 def _callb(res):454 self.failUnlessEqual(len(res), 1)455 self.failUnlessEqual(res[0], '\x04')456 d.addCallback(_callb)457 return d458 459 366 class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin, 460 367 common.ShouldFailMixin): -
TabularUnified src/allmydata/test/test_system.py ¶
r7b7b0c9 r63b61ce 10 10 from allmydata.storage.server import si_a2b 11 11 from allmydata.immutable import offloaded, upload 12 from allmydata.immutable.filenode import ImmutableFileNode, LiteralFileNode 12 from allmydata.immutable.literal import LiteralFileNode 13 from allmydata.immutable.filenode import ImmutableFileNode 13 14 from allmydata.util import idlib, mathutil 14 15 from allmydata.util import log, base32 -
TabularUnified src/allmydata/test/test_upload.py ¶
r7b7b0c9 r63b61ce 2087 2087 # have a download fail 2088 2088 # cancel a download (need to implement more cancel stuff) 2089 2090 # from test_encode: 2091 # NoNetworkGrid, upload part of ciphertext, kill server, continue upload 2092 # check with Kevan, they want to live in test_upload, existing tests might cover 2093 # def test_lost_one_shareholder(self): # these are upload-side tests 2094 # def test_lost_one_shareholder_early(self): 2095 # def test_lost_many_shareholders(self): 2096 # def test_lost_all_shareholders(self): -
TabularUnified src/allmydata/test/test_web.py ¶
r7b7b0c9 r63b61ce 13 13 from allmydata.storage.shares import get_share_file 14 14 from allmydata.storage_client import StorageFarmBroker 15 from allmydata.immutable import upload, download 15 from allmydata.immutable import upload 16 from allmydata.immutable.downloader.status import DownloadStatus 16 17 from allmydata.dirnode import DirectoryNode 17 18 from allmydata.nodemaker import NodeMaker … … 76 77 class FakeHistory: 77 78 _all_upload_status = [upload.UploadStatus()] 78 _all_download_status = [ download.DownloadStatus()]79 _all_download_status = [DownloadStatus("storage_index", 1234)] 79 80 _all_mapupdate_statuses = [servermap.UpdateStatus()] 80 81 _all_publish_statuses = [publish.PublishStatus()] … … 112 113 self.uploader.setServiceParent(self) 113 114 self.nodemaker = FakeNodeMaker(None, self._secret_holder, None, 114 self.uploader, None, None,115 self.uploader, None, 115 116 None, None) 116 117 … … 4188 4189 "severe corruption. You should perform a filecheck on " 4189 4190 "this object to learn more. The full error message is: " 4190 " Failed to get enough shareholders: have 0, need 3")4191 "no shares (need 3). Last failure: None") 4191 4192 self.failUnlessReallyEqual(exp, body) 4192 4193 d.addCallback(_check_zero_shares) … … 4200 4201 self.failIf("<html>" in body, body) 4201 4202 body = " ".join(body.strip().split()) 4202 exp= ("NotEnoughSharesError: This indicates that some "4203 msg = ("NotEnoughSharesError: This indicates that some " 4203 4204 "servers were unavailable, or that shares have been " 4204 4205 "lost to server departure, hard drive failure, or disk " 4205 4206 "corruption. You should perform a filecheck on " 4206 4207 "this object to learn more. The full error message is:" 4207 " Failed to get enough shareholders: have 1, need 3") 4208 self.failUnlessReallyEqual(exp, body) 4208 " ran out of shares: %d complete, %d pending, 0 overdue," 4209 " 0 unused, need 3. Last failure: None") 4210 msg1 = msg % (1, 0) 4211 msg2 = msg % (0, 1) 4212 self.failUnless(body == msg1 or body == msg2, body) 4209 4213 d.addCallback(_check_one_share) 4210 4214
Note: See TracChangeset
for help on using the changeset viewer.