Changeset debefdc in trunk


Ignore:
Timestamp:
2019-08-23T12:45:48Z (6 years ago)
Author:
GitHub <noreply@…>
Branches:
master
Children:
5f142975
Parents:
85980038 (diff), 7eb17f1 (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
git-author:
Jean-Paul Calderone <exarkun@…> (2019-08-23 12:45:48)
git-committer:
GitHub <noreply@…> (2019-08-23 12:45:48)
Message:

Merge pull request #648 from tahoe-lafs/3241.refactor-mutable-share-write-implementation

Refactor mutable share write implementation

Fixes: ticket:3241

Files:
1 added
2 edited

Legend:

Unmodified
Added
Removed
  • TabularUnified src/allmydata/storage/server.py

    r85980038 rdebefdc  
    11import os, re, weakref, struct, time
     2import six
    23
    34from foolscap.api import Referenceable
     
    392393
    393394        This method is not for client use.
    394         """
    395 
     395
     396        :note: Only for immutable shares.
     397        """
    396398        # since all shares get the same lease data, we just grab the leases
    397399        # from the first share
     
    403405            return iter([])
    404406
    405     def remote_slot_testv_and_readv_and_writev(self, storage_index,
    406                                                secrets,
    407                                                test_and_write_vectors,
    408                                                read_vector):
    409         start = time.time()
    410         self.count("writev")
    411         si_s = si_b2a(storage_index)
    412         log.msg("storage: slot_writev %s" % si_s)
    413         si_dir = storage_index_to_dir(storage_index)
    414         (write_enabler, renew_secret, cancel_secret) = secrets
    415         # shares exist if there is a file for them
    416         bucketdir = os.path.join(self.sharedir, si_dir)
     407    def get_slot_leases(self, storage_index):
     408        """
     409        This method is not for client use.
     410
     411        :note: Only for mutable shares.
     412
     413        :return: An iterable of the leases attached to this slot.
     414        """
     415        for _, share_filename in self._get_bucket_shares(storage_index):
     416            share = MutableShareFile(share_filename)
     417            return share.get_leases()
     418        return []
     419
     420    def _collect_mutable_shares_for_storage_index(self, bucketdir, write_enabler, si_s):
     421        """
     422        Gather up existing mutable shares for the given storage index.
     423
     424        :param bytes bucketdir: The filesystem path containing shares for the
     425            given storage index.
     426
     427        :param bytes write_enabler: The write enabler secret for the shares.
     428
     429        :param bytes si_s: The storage index in encoded (base32) form.
     430
     431        :raise BadWriteEnablerError: If the write enabler is not correct for
     432            any of the collected shares.
     433
     434        :return dict[int, MutableShareFile]: The collected shares in a mapping
     435            from integer share numbers to ``MutableShareFile`` instances.
     436        """
    417437        shares = {}
    418438        if os.path.isdir(bucketdir):
     439            # shares exist if there is a file for them
    419440            for sharenum_s in os.listdir(bucketdir):
    420441                try:
     
    426447                msf.check_write_enabler(write_enabler, si_s)
    427448                shares[sharenum] = msf
    428         # write_enabler is good for all existing shares.
    429 
    430         # Now evaluate test vectors.
    431         testv_is_good = True
     449        return shares
     450
     451    def _evaluate_test_vectors(self, test_and_write_vectors, shares):
     452        """
     453        Execute test vectors against share data.
     454
     455        :param test_and_write_vectors: See
     456            ``allmydata.interfaces.TestAndWriteVectorsForShares``.
     457
     458        :param dict[int, MutableShareFile] shares: The shares against which to
     459            execute the vectors.
     460
     461        :return bool: ``True`` if and only if all of the test vectors succeed
     462            against the given shares.
     463        """
    432464        for sharenum in test_and_write_vectors:
    433465            (testv, datav, new_length) = test_and_write_vectors[sharenum]
     
    435467                if not shares[sharenum].check_testv(testv):
    436468                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
    437                     testv_is_good = False
    438                     break
     469                    return False
    439470            else:
    440471                # compare the vectors against an empty share, in which all
     
    443474                    self.log("testv failed (empty): [%d] %r" % (sharenum,
    444475                                                                testv))
    445                     testv_is_good = False
    446                     break
    447 
    448         # now gather the read vectors, before we do any writes
     476                    return False
     477        return True
     478
     479    def _evaluate_read_vectors(self, read_vector, shares):
     480        """
     481        Execute read vectors against share data.
     482
     483        :param read_vector: See ``allmydata.interfaces.ReadVector``.
     484
     485        :param dict[int, MutableShareFile] shares: The shares against which to
     486            execute the vector.
     487
     488        :return dict[int, bytes]: The data read from the shares.
     489        """
    449490        read_data = {}
    450491        for sharenum, share in shares.items():
    451492            read_data[sharenum] = share.readv(read_vector)
    452 
     493        return read_data
     494
     495    def _evaluate_write_vectors(self, bucketdir, secrets, test_and_write_vectors, shares):
     496        """
     497        Execute write vectors against share data.
     498
     499        :param bytes bucketdir: The parent directory holding the shares.  This
     500            is removed if the last share is removed from it.  If shares are
     501            created, they are created in it.
     502
     503        :param secrets: A tuple of ``WriteEnablerSecret``,
     504            ``LeaseRenewSecret``, and ``LeaseCancelSecret``.  These secrets
     505            are used to initialize new shares.
     506
     507        :param test_and_write_vectors: See
     508            ``allmydata.interfaces.TestAndWriteVectorsForShares``.
     509
     510        :param dict[int, MutableShareFile]: The shares against which to
     511            execute the vectors.
     512
     513        :return dict[int, MutableShareFile]: The shares which still exist
     514            after applying the vectors.
     515        """
     516        remaining_shares = {}
     517
     518        for sharenum in test_and_write_vectors:
     519            (testv, datav, new_length) = test_and_write_vectors[sharenum]
     520            if new_length == 0:
     521                if sharenum in shares:
     522                    shares[sharenum].unlink()
     523            else:
     524                if sharenum not in shares:
     525                    # allocate a new share
     526                    allocated_size = 2000 # arbitrary, really
     527                    share = self._allocate_slot_share(bucketdir, secrets,
     528                                                      sharenum,
     529                                                      allocated_size,
     530                                                      owner_num=0)
     531                    shares[sharenum] = share
     532                shares[sharenum].writev(datav, new_length)
     533                remaining_shares[sharenum] = shares[sharenum]
     534
     535            if new_length == 0:
     536                # delete bucket directories that exist but are empty.  They
     537                # might not exist if a client showed up and asked us to
     538                # truncate a share we weren't even holding.
     539                if os.path.exists(bucketdir) and [] == os.listdir(bucketdir):
     540                    os.rmdir(bucketdir)
     541        return remaining_shares
     542
     543    def _make_lease_info(self, renew_secret, cancel_secret):
     544        """
     545        :return LeaseInfo: Information for a new lease for a share.
     546        """
    453547        ownerid = 1 # TODO
    454548        expire_time = time.time() + 31*24*60*60   # one month
     
    456550                               renew_secret, cancel_secret,
    457551                               expire_time, self.my_nodeid)
     552        return lease_info
     553
     554    def _add_or_renew_leases(self, shares, lease_info):
     555        """
     556        Put the given lease onto the given shares.
     557
     558        :param dict[int, MutableShareFile] shares: The shares to put the lease
     559            onto.
     560
     561        :param LeaseInfo lease_info: The lease to put on the shares.
     562        """
     563        for share in six.viewvalues(shares):
     564            share.add_or_renew_lease(lease_info)
     565
     566    def slot_testv_and_readv_and_writev(
     567            self,
     568            storage_index,
     569            secrets,
     570            test_and_write_vectors,
     571            read_vector,
     572            renew_leases,
     573    ):
     574        """
     575        Read data from shares and conditionally write some data to them.
     576
     577        :param bool renew_leases: If and only if this is ``True`` and the test
     578            vectors pass then shares in this slot will also have an updated
     579            lease applied to them.
     580
     581        See ``allmydata.interfaces.RIStorageServer`` for details about other
     582        parameters and return value.
     583        """
     584        start = time.time()
     585        self.count("writev")
     586        si_s = si_b2a(storage_index)
     587        log.msg("storage: slot_writev %s" % si_s)
     588        si_dir = storage_index_to_dir(storage_index)
     589        (write_enabler, renew_secret, cancel_secret) = secrets
     590        bucketdir = os.path.join(self.sharedir, si_dir)
     591
     592        # If collection succeeds we know the write_enabler is good for all
     593        # existing shares.
     594        shares = self._collect_mutable_shares_for_storage_index(
     595            bucketdir,
     596            write_enabler,
     597            si_s,
     598        )
     599
     600        # Now evaluate test vectors.
     601        testv_is_good = self._evaluate_test_vectors(
     602            test_and_write_vectors,
     603            shares,
     604        )
     605
     606        # now gather the read vectors, before we do any writes
     607        read_data = self._evaluate_read_vectors(
     608            read_vector,
     609            shares,
     610        )
    458611
    459612        if testv_is_good:
    460613            # now apply the write vectors
    461             for sharenum in test_and_write_vectors:
    462                 (testv, datav, new_length) = test_and_write_vectors[sharenum]
    463                 if new_length == 0:
    464                     if sharenum in shares:
    465                         shares[sharenum].unlink()
    466                 else:
    467                     if sharenum not in shares:
    468                         # allocate a new share
    469                         allocated_size = 2000 # arbitrary, really
    470                         share = self._allocate_slot_share(bucketdir, secrets,
    471                                                           sharenum,
    472                                                           allocated_size,
    473                                                           owner_num=0)
    474                         shares[sharenum] = share
    475                     shares[sharenum].writev(datav, new_length)
    476                     # and update the lease
    477                     shares[sharenum].add_or_renew_lease(lease_info)
    478 
    479             if new_length == 0:
    480                 # delete empty bucket directories
    481                 if not os.listdir(bucketdir):
    482                     os.rmdir(bucketdir)
    483 
     614            remaining_shares = self._evaluate_write_vectors(
     615                bucketdir,
     616                secrets,
     617                test_and_write_vectors,
     618                shares,
     619            )
     620            if renew_leases:
     621                lease_info = self._make_lease_info(renew_secret, cancel_secret)
     622                self._add_or_renew_leases(remaining_shares, lease_info)
    484623
    485624        # all done
    486625        self.add_latency("writev", time.time() - start)
    487626        return (testv_is_good, read_data)
     627
     628    def remote_slot_testv_and_readv_and_writev(self, storage_index,
     629                                               secrets,
     630                                               test_and_write_vectors,
     631                                               read_vector):
     632        return self.slot_testv_and_readv_and_writev(
     633            storage_index,
     634            secrets,
     635            test_and_write_vectors,
     636            read_vector,
     637            renew_leases=True,
     638        )
    488639
    489640    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
  • TabularUnified src/allmydata/test/test_storage.py

    r85980038 rdebefdc  
    13691369        self.failUnless(os.path.exists(prefixdir), prefixdir)
    13701370        self.failIf(os.path.exists(bucketdir), bucketdir)
     1371
     1372    def test_writev_without_renew_lease(self):
     1373        """
     1374        The helper method ``slot_testv_and_readv_and_writev`` does not renew
     1375        leases if ``False`` is passed for the ``renew_leases`` parameter.
     1376        """
     1377        ss = self.create("test_writev_without_renew_lease")
     1378
     1379        storage_index = "si2"
     1380        secrets = (
     1381            self.write_enabler(storage_index),
     1382            self.renew_secret(storage_index),
     1383            self.cancel_secret(storage_index),
     1384        )
     1385
     1386        sharenum = 3
     1387        datav = [(0, b"Hello, world")]
     1388
     1389        ss.slot_testv_and_readv_and_writev(
     1390            storage_index=storage_index,
     1391            secrets=secrets,
     1392            test_and_write_vectors={
     1393                sharenum: ([], datav, None),
     1394            },
     1395            read_vector=[],
     1396            renew_leases=False,
     1397        )
     1398        leases = list(ss.get_slot_leases(storage_index))
     1399        self.assertEqual([], leases)
     1400
     1401    def test_get_slot_leases_empty_slot(self):
     1402        """
     1403        When ``get_slot_leases`` is called for a slot for which the server has no
     1404        shares, it returns an empty iterable.
     1405        """
     1406        ss = self.create(b"test_get_slot_leases_empty_slot")
     1407        self.assertEqual(
     1408            list(ss.get_slot_leases(b"si1")),
     1409            [],
     1410        )
     1411
     1412    def test_remove_non_present(self):
     1413        """
     1414        A write vector which would remove a share completely is applied as a no-op
     1415        by a server which does not have the share.
     1416        """
     1417        ss = self.create("test_remove_non_present")
     1418
     1419        storage_index = "si1"
     1420        secrets = (
     1421            self.write_enabler(storage_index),
     1422            self.renew_secret(storage_index),
     1423            self.cancel_secret(storage_index),
     1424        )
     1425
     1426        sharenum = 3
     1427        testv = []
     1428        datav = []
     1429        new_length = 0
     1430        read_vector = []
     1431
     1432        # We don't even need to create any shares to exercise this
     1433        # functionality.  Just go straight to sending a truncate-to-zero
     1434        # write.
     1435        testv_is_good, read_data = ss.remote_slot_testv_and_readv_and_writev(
     1436            storage_index=storage_index,
     1437            secrets=secrets,
     1438            test_and_write_vectors={
     1439                sharenum: (testv, datav, new_length),
     1440            },
     1441            read_vector=read_vector,
     1442        )
     1443
     1444        self.assertTrue(testv_is_good)
     1445        self.assertEqual({}, read_data)
    13711446
    13721447
Note: See TracChangeset for help on using the changeset viewer.