Changeset debefdc in trunk
- Timestamp:
- 2019-08-23T12:45:48Z (6 years ago)
- Branches:
- master
- Children:
- 5f142975
- Parents:
- 85980038 (diff), 7eb17f1 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent. - git-author:
- Jean-Paul Calderone <exarkun@…> (2019-08-23 12:45:48)
- git-committer:
- GitHub <noreply@…> (2019-08-23 12:45:48)
- Files:
-
- 1 added
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified src/allmydata/storage/server.py ¶
r85980038 rdebefdc 1 1 import os, re, weakref, struct, time 2 import six 2 3 3 4 from foolscap.api import Referenceable … … 392 393 393 394 This method is not for client use. 394 """ 395 395 396 :note: Only for immutable shares. 397 """ 396 398 # since all shares get the same lease data, we just grab the leases 397 399 # from the first share … … 403 405 return iter([]) 404 406 405 def remote_slot_testv_and_readv_and_writev(self, storage_index, 406 secrets, 407 test_and_write_vectors, 408 read_vector): 409 start = time.time() 410 self.count("writev") 411 si_s = si_b2a(storage_index) 412 log.msg("storage: slot_writev %s" % si_s) 413 si_dir = storage_index_to_dir(storage_index) 414 (write_enabler, renew_secret, cancel_secret) = secrets 415 # shares exist if there is a file for them 416 bucketdir = os.path.join(self.sharedir, si_dir) 407 def get_slot_leases(self, storage_index): 408 """ 409 This method is not for client use. 410 411 :note: Only for mutable shares. 412 413 :return: An iterable of the leases attached to this slot. 414 """ 415 for _, share_filename in self._get_bucket_shares(storage_index): 416 share = MutableShareFile(share_filename) 417 return share.get_leases() 418 return [] 419 420 def _collect_mutable_shares_for_storage_index(self, bucketdir, write_enabler, si_s): 421 """ 422 Gather up existing mutable shares for the given storage index. 423 424 :param bytes bucketdir: The filesystem path containing shares for the 425 given storage index. 426 427 :param bytes write_enabler: The write enabler secret for the shares. 428 429 :param bytes si_s: The storage index in encoded (base32) form. 430 431 :raise BadWriteEnablerError: If the write enabler is not correct for 432 any of the collected shares. 433 434 :return dict[int, MutableShareFile]: The collected shares in a mapping 435 from integer share numbers to ``MutableShareFile`` instances. 436 """ 417 437 shares = {} 418 438 if os.path.isdir(bucketdir): 439 # shares exist if there is a file for them 419 440 for sharenum_s in os.listdir(bucketdir): 420 441 try: … … 426 447 msf.check_write_enabler(write_enabler, si_s) 427 448 shares[sharenum] = msf 428 # write_enabler is good for all existing shares. 429 430 # Now evaluate test vectors. 431 testv_is_good = True 449 return shares 450 451 def _evaluate_test_vectors(self, test_and_write_vectors, shares): 452 """ 453 Execute test vectors against share data. 454 455 :param test_and_write_vectors: See 456 ``allmydata.interfaces.TestAndWriteVectorsForShares``. 457 458 :param dict[int, MutableShareFile] shares: The shares against which to 459 execute the vectors. 460 461 :return bool: ``True`` if and only if all of the test vectors succeed 462 against the given shares. 463 """ 432 464 for sharenum in test_and_write_vectors: 433 465 (testv, datav, new_length) = test_and_write_vectors[sharenum] … … 435 467 if not shares[sharenum].check_testv(testv): 436 468 self.log("testv failed: [%d]: %r" % (sharenum, testv)) 437 testv_is_good = False 438 break 469 return False 439 470 else: 440 471 # compare the vectors against an empty share, in which all … … 443 474 self.log("testv failed (empty): [%d] %r" % (sharenum, 444 475 testv)) 445 testv_is_good = False 446 break 447 448 # now gather the read vectors, before we do any writes 476 return False 477 return True 478 479 def _evaluate_read_vectors(self, read_vector, shares): 480 """ 481 Execute read vectors against share data. 482 483 :param read_vector: See ``allmydata.interfaces.ReadVector``. 484 485 :param dict[int, MutableShareFile] shares: The shares against which to 486 execute the vector. 487 488 :return dict[int, bytes]: The data read from the shares. 489 """ 449 490 read_data = {} 450 491 for sharenum, share in shares.items(): 451 492 read_data[sharenum] = share.readv(read_vector) 452 493 return read_data 494 495 def _evaluate_write_vectors(self, bucketdir, secrets, test_and_write_vectors, shares): 496 """ 497 Execute write vectors against share data. 498 499 :param bytes bucketdir: The parent directory holding the shares. This 500 is removed if the last share is removed from it. If shares are 501 created, they are created in it. 502 503 :param secrets: A tuple of ``WriteEnablerSecret``, 504 ``LeaseRenewSecret``, and ``LeaseCancelSecret``. These secrets 505 are used to initialize new shares. 506 507 :param test_and_write_vectors: See 508 ``allmydata.interfaces.TestAndWriteVectorsForShares``. 509 510 :param dict[int, MutableShareFile]: The shares against which to 511 execute the vectors. 512 513 :return dict[int, MutableShareFile]: The shares which still exist 514 after applying the vectors. 515 """ 516 remaining_shares = {} 517 518 for sharenum in test_and_write_vectors: 519 (testv, datav, new_length) = test_and_write_vectors[sharenum] 520 if new_length == 0: 521 if sharenum in shares: 522 shares[sharenum].unlink() 523 else: 524 if sharenum not in shares: 525 # allocate a new share 526 allocated_size = 2000 # arbitrary, really 527 share = self._allocate_slot_share(bucketdir, secrets, 528 sharenum, 529 allocated_size, 530 owner_num=0) 531 shares[sharenum] = share 532 shares[sharenum].writev(datav, new_length) 533 remaining_shares[sharenum] = shares[sharenum] 534 535 if new_length == 0: 536 # delete bucket directories that exist but are empty. They 537 # might not exist if a client showed up and asked us to 538 # truncate a share we weren't even holding. 539 if os.path.exists(bucketdir) and [] == os.listdir(bucketdir): 540 os.rmdir(bucketdir) 541 return remaining_shares 542 543 def _make_lease_info(self, renew_secret, cancel_secret): 544 """ 545 :return LeaseInfo: Information for a new lease for a share. 546 """ 453 547 ownerid = 1 # TODO 454 548 expire_time = time.time() + 31*24*60*60 # one month … … 456 550 renew_secret, cancel_secret, 457 551 expire_time, self.my_nodeid) 552 return lease_info 553 554 def _add_or_renew_leases(self, shares, lease_info): 555 """ 556 Put the given lease onto the given shares. 557 558 :param dict[int, MutableShareFile] shares: The shares to put the lease 559 onto. 560 561 :param LeaseInfo lease_info: The lease to put on the shares. 562 """ 563 for share in six.viewvalues(shares): 564 share.add_or_renew_lease(lease_info) 565 566 def slot_testv_and_readv_and_writev( 567 self, 568 storage_index, 569 secrets, 570 test_and_write_vectors, 571 read_vector, 572 renew_leases, 573 ): 574 """ 575 Read data from shares and conditionally write some data to them. 576 577 :param bool renew_leases: If and only if this is ``True`` and the test 578 vectors pass then shares in this slot will also have an updated 579 lease applied to them. 580 581 See ``allmydata.interfaces.RIStorageServer`` for details about other 582 parameters and return value. 583 """ 584 start = time.time() 585 self.count("writev") 586 si_s = si_b2a(storage_index) 587 log.msg("storage: slot_writev %s" % si_s) 588 si_dir = storage_index_to_dir(storage_index) 589 (write_enabler, renew_secret, cancel_secret) = secrets 590 bucketdir = os.path.join(self.sharedir, si_dir) 591 592 # If collection succeeds we know the write_enabler is good for all 593 # existing shares. 594 shares = self._collect_mutable_shares_for_storage_index( 595 bucketdir, 596 write_enabler, 597 si_s, 598 ) 599 600 # Now evaluate test vectors. 601 testv_is_good = self._evaluate_test_vectors( 602 test_and_write_vectors, 603 shares, 604 ) 605 606 # now gather the read vectors, before we do any writes 607 read_data = self._evaluate_read_vectors( 608 read_vector, 609 shares, 610 ) 458 611 459 612 if testv_is_good: 460 613 # now apply the write vectors 461 for sharenum in test_and_write_vectors: 462 (testv, datav, new_length) = test_and_write_vectors[sharenum] 463 if new_length == 0: 464 if sharenum in shares: 465 shares[sharenum].unlink() 466 else: 467 if sharenum not in shares: 468 # allocate a new share 469 allocated_size = 2000 # arbitrary, really 470 share = self._allocate_slot_share(bucketdir, secrets, 471 sharenum, 472 allocated_size, 473 owner_num=0) 474 shares[sharenum] = share 475 shares[sharenum].writev(datav, new_length) 476 # and update the lease 477 shares[sharenum].add_or_renew_lease(lease_info) 478 479 if new_length == 0: 480 # delete empty bucket directories 481 if not os.listdir(bucketdir): 482 os.rmdir(bucketdir) 483 614 remaining_shares = self._evaluate_write_vectors( 615 bucketdir, 616 secrets, 617 test_and_write_vectors, 618 shares, 619 ) 620 if renew_leases: 621 lease_info = self._make_lease_info(renew_secret, cancel_secret) 622 self._add_or_renew_leases(remaining_shares, lease_info) 484 623 485 624 # all done 486 625 self.add_latency("writev", time.time() - start) 487 626 return (testv_is_good, read_data) 627 628 def remote_slot_testv_and_readv_and_writev(self, storage_index, 629 secrets, 630 test_and_write_vectors, 631 read_vector): 632 return self.slot_testv_and_readv_and_writev( 633 storage_index, 634 secrets, 635 test_and_write_vectors, 636 read_vector, 637 renew_leases=True, 638 ) 488 639 489 640 def _allocate_slot_share(self, bucketdir, secrets, sharenum, -
TabularUnified src/allmydata/test/test_storage.py ¶
r85980038 rdebefdc 1369 1369 self.failUnless(os.path.exists(prefixdir), prefixdir) 1370 1370 self.failIf(os.path.exists(bucketdir), bucketdir) 1371 1372 def test_writev_without_renew_lease(self): 1373 """ 1374 The helper method ``slot_testv_and_readv_and_writev`` does not renew 1375 leases if ``False`` is passed for the ``renew_leases`` parameter. 1376 """ 1377 ss = self.create("test_writev_without_renew_lease") 1378 1379 storage_index = "si2" 1380 secrets = ( 1381 self.write_enabler(storage_index), 1382 self.renew_secret(storage_index), 1383 self.cancel_secret(storage_index), 1384 ) 1385 1386 sharenum = 3 1387 datav = [(0, b"Hello, world")] 1388 1389 ss.slot_testv_and_readv_and_writev( 1390 storage_index=storage_index, 1391 secrets=secrets, 1392 test_and_write_vectors={ 1393 sharenum: ([], datav, None), 1394 }, 1395 read_vector=[], 1396 renew_leases=False, 1397 ) 1398 leases = list(ss.get_slot_leases(storage_index)) 1399 self.assertEqual([], leases) 1400 1401 def test_get_slot_leases_empty_slot(self): 1402 """ 1403 When ``get_slot_leases`` is called for a slot for which the server has no 1404 shares, it returns an empty iterable. 1405 """ 1406 ss = self.create(b"test_get_slot_leases_empty_slot") 1407 self.assertEqual( 1408 list(ss.get_slot_leases(b"si1")), 1409 [], 1410 ) 1411 1412 def test_remove_non_present(self): 1413 """ 1414 A write vector which would remove a share completely is applied as a no-op 1415 by a server which does not have the share. 1416 """ 1417 ss = self.create("test_remove_non_present") 1418 1419 storage_index = "si1" 1420 secrets = ( 1421 self.write_enabler(storage_index), 1422 self.renew_secret(storage_index), 1423 self.cancel_secret(storage_index), 1424 ) 1425 1426 sharenum = 3 1427 testv = [] 1428 datav = [] 1429 new_length = 0 1430 read_vector = [] 1431 1432 # We don't even need to create any shares to exercise this 1433 # functionality. Just go straight to sending a truncate-to-zero 1434 # write. 1435 testv_is_good, read_data = ss.remote_slot_testv_and_readv_and_writev( 1436 storage_index=storage_index, 1437 secrets=secrets, 1438 test_and_write_vectors={ 1439 sharenum: (testv, datav, new_length), 1440 }, 1441 read_vector=read_vector, 1442 ) 1443 1444 self.assertTrue(testv_is_good) 1445 self.assertEqual({}, read_data) 1371 1446 1372 1447
Note: See TracChangeset
for help on using the changeset viewer.