Changeset 4563ba4 in trunk


Timestamp:
2012-12-27T00:00:17Z
Author:
David-Sarah Hopwood <david-sarah@…>
Branches:
master
Children:
0d4a826
Parents:
86189298
Message:

Remove ResponseCache in favor of MDMFSlotReadProxy's cache. Closes #1240.

This contains several merged patches. Individual messages follow, latest first:

  • Fix a warning from check-miscaptures.
  • In retrieve.py, explicitly test whether a key is in self.servermap.proxies rather than catching KeyError.
  • Added a comment to the MDMF counterpart of the removed test, explaining why the SDMF version was removed.
  • Removed test_corrupt_all_block_hash_tree_late, since the entire block_hash_tree is cached in the servermap for an SDMF file.
  • Fixed several tests that require files larger than the servermap cache.
  • Remove unused test_response_cache_memory_leak().
  • Exercise the cache.
  • Test infrastructure for counting cache misses on MDMF files.
  • Removed the ResponseCache. Instead, the MDMFSlotReadProxy initialized by ServerMap is kept around so that Retrieve can access it. The read proxy has a cache of the first 1000 bytes initially read from each share by the ServerMap. A number of requests can be satisfied out of this cache, reducing roundtrips from 84 to 60 in test_deepcheck_mdmf. There is still some mystery about the conditions under which the cache holds fewer than 1000 bytes. This also breaks some existing unit tests that depended on the inner behavior of ResponseCache. (A sketch of the new flow follows this list.)
  • The servermap.proxies cache (of MDMFSlotReadProxy instances) is now keyed by (verinfo, serverid, storage_index, shnum) rather than just (serverid, shnum).
  • Minor cosmetic changes.
  • Added a test failure if the number of cache misses is too high.
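
For orientation, here is a minimal sketch of the flow this changeset produces, using simplified stand-in classes rather than the real Tahoe-LAFS ones (SlotReadProxy, make_proxy, and the slot_readv call shape here are illustrative only):

    # Simplified stand-in for the new flow; see the diffs to
    # servermap.py and retrieve.py below for the real code.
    class SlotReadProxy(object):
        """Stands in for MDMFSlotReadProxy: holds the bytes initially
        fetched from one share, plus a flag saying whether that is the
        whole share."""
        def __init__(self, server, shnum, data="", data_is_everything=False):
            self.server = server
            self.shnum = shnum
            self._data = data
            self._data_is_everything = data_is_everything

        def read(self, offset, length):
            if offset + length <= len(self._data) or self._data_is_everything:
                return self._data[offset:offset+length]       # cache hit
            # cache miss: one more round trip to the storage server
            return self.server.slot_readv(self.shnum, offset, length)

    def get_reader(servermap, verinfo, serverid, storage_index, shnum,
                   make_proxy):
        # What Retrieve now does: reuse the proxy (and its cache) built
        # during the servermap update, else construct a fresh one.
        key = (verinfo, serverid, storage_index, shnum)
        if key in servermap.proxies:
            return servermap.proxies[key]
        return make_proxy()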

Author: Andrew Miller <amiller@…>
Signed-off-by: David-Sarah Hopwood <davidsarah@…>

Location:
src/allmydata
Files:
8 edited

  • src/allmydata/mutable/common.py (r86189298 → r4563ba4)

    @@ -1,4 +1,2 @@
    -
    -from allmydata.util.spans import DataSpans
     
     MODE_CHECK = "MODE_CHECK" # query all peers
    @@ -60,55 +58,2 @@
     class UnknownVersionError(BadShareError):
         """The share we received was of a version we don't recognize."""
    -
    -class ResponseCache:
    -    """I cache share data, to reduce the number of round trips used during
    -    mutable file operations. All of the data in my cache is for a single
    -    storage index, but I will keep information on multiple shares for
    -    that storage index.
    -
    -    I maintain a highest-seen sequence number, and will flush all entries
    -    each time this number increases (this doesn't necessarily imply that
    -    all entries have the same sequence number).
    -
    -    My cache is indexed by a (verinfo, shnum) tuple.
    -
    -    My cache entries are DataSpans instances, each representing a set of
    -    non-overlapping byteranges.
    -    """
    -
    -    def __init__(self):
    -        self.cache = {}
    -        self.seqnum = None
    -
    -    def _clear(self):
    -        # also used by unit tests
    -        self.cache = {}
    -
    -    def add(self, verinfo, shnum, offset, data):
    -        seqnum = verinfo[0]
    -        if seqnum > self.seqnum:
    -            self._clear()
    -            self.seqnum = seqnum
    -
    -        index = (verinfo, shnum)
    -        if index in self.cache:
    -            self.cache[index].add(offset, data)
    -        else:
    -            spans = DataSpans()
    -            spans.add(offset, data)
    -            self.cache[index] = spans
    -
    -    def read(self, verinfo, shnum, offset, length):
    -        """Try to satisfy a read request from cache.
    -        Returns data, or None if the cache did not hold the entire requested span.
    -        """
    -
    -        # TODO: perhaps return a DataSpans object representing the fragments
    -        # that we have, instead of only returning a hit if we can satisfy the
    -        # whole request from cache.
    -
    -        index = (verinfo, shnum)
    -        if index in self.cache:
    -            return self.cache[index].get(offset, length)
    -        else:
    -            return None
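
One contract of the removed class is worth keeping in mind, since the read proxy's cache now has to stand in for it: reads were all-or-nothing, returning data only when the entire requested span was cached, and never a partial fragment. A toy model of that behavior (plain dicts standing in for DataSpans; the seqnum-based flush is omitted):

    # Toy model of ResponseCache's all-or-nothing read contract; the
    # real class used allmydata.util.spans.DataSpans for the spans.
    cache = {}                         # (verinfo, shnum) -> {offset: byte}

    def add(verinfo, shnum, offset, data):
        entry = cache.setdefault((verinfo, shnum), {})
        for i, ch in enumerate(data):
            entry[offset + i] = ch

    def read(verinfo, shnum, offset, length):
        entry = cache.get((verinfo, shnum), {})
        try:
            return "".join(entry[offset + i] for i in range(length))
        except KeyError:
            return None                # any gap in the span is a miss

    add("v1", 1, 0, "abcdef")
    assert read("v1", 1, 2, 3) == "cde"
    assert read("v1", 1, 4, 5) is None    # span runs past cached data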
  • src/allmydata/mutable/filenode.py (r86189298 → r4563ba4)

    @@ -18,5 +18,5 @@
                                           TransformingUploadable
     from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
    -     ResponseCache, UncoordinatedWriteError
    +     UncoordinatedWriteError
     from allmydata.mutable.servermap import ServerMap, ServermapUpdater
     from allmydata.mutable.retrieve import Retrieve
    @@ -66,5 +66,4 @@
             self._total_shares = default_encoding_parameters["n"]
             self._sharemap = {} # known shares, shnum-to-[nodeids]
    -        self._cache = ResponseCache()
             self._most_recent_size = None
             # filled in after __init__ if we're being created for the first time;
    @@ -181,8 +180,4 @@
         def _populate_encprivkey(self, encprivkey):
             self._encprivkey = encprivkey
    -    def _add_to_cache(self, verinfo, shnum, offset, data):
    -        self._cache.add(verinfo, shnum, offset, data)
    -    def _read_from_cache(self, verinfo, shnum, offset, length):
    -        return self._cache.read(verinfo, shnum, offset, length)
     
         def get_write_enabler(self, server):
  • src/allmydata/mutable/layout.py (r86189298 → r4563ba4)

    @@ -1193,5 +1193,6 @@
                      storage_index,
                      shnum,
    -                 data=""):
    +                 data="",
    +                 data_is_everything=False):
             # Start the initialization process.
             self._rref = rref
    @@ -1224,6 +1225,12 @@
             # If the user has chosen to initialize us with some data, we'll
             # try to satisfy subsequent data requests with that data before
    -        # asking the storage server for it. If
    +        # asking the storage server for it.
             self._data = data
    +
    +        # If the provided data is known to be complete, then we know there's
    +        # nothing to be gained by querying the server, so we should just
    +        # partially satisfy requests with what we have.
    +        self._data_is_everything = data_is_everything
    +
             # The way callers interact with cache in the filenode returns
             # None if there isn't any cached data, but the way we index the
    @@ -1739,5 +1746,6 @@
             # fulfills the requests that it can, and not demand that all
             # requests are satisfiable before running it.
    -        if not unsatisfiable and not force_remote:
    +
    +        if not unsatisfiable or self._data_is_everything:
                 results = [self._data[offset:offset+length]
                            for (offset, length) in readvs]
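
The behavioral heart of this file's change is the last hunk. Previously the proxy answered locally only when every requested span fit inside its cached data; with data_is_everything=True it answers locally even when a span runs past the cached bytes, because the server holds nothing more and a short slice is the correct answer. A condensed sketch of that decision (the method name _satisfy_from_cache is illustrative; the real logic lives inside the proxy's read machinery and returns Deferreds):

    # Condensed sketch of the read decision after this change; the real
    # method also coalesces the remote readvs into a single request.
    def _satisfy_from_cache(self, readvs):
        unsatisfiable = any(offset + length > len(self._data)
                            for (offset, length) in readvs)
        if not unsatisfiable or self._data_is_everything:
            # A short slice of a complete share is the correct answer.
            return [self._data[offset:offset+length]
                    for (offset, length) in readvs]
        return None   # caller falls back to the storage server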
  • src/allmydata/mutable/retrieve.py (r86189298 → r4563ba4)

    @@ -287,14 +287,12 @@
             for (shnum, server, timestamp) in shares:
                 self.remaining_sharemap.add(shnum, server)
    -            # If the servermap update fetched anything, it fetched at least 1
    -            # KiB, so we ask for that much.
    -            # TODO: Change the cache methods to allow us to fetch all of the
    -            # data that they have, then change this method to do that.
    -            any_cache = self._node._read_from_cache(self.verinfo, shnum,
    -                                                    0, 1000)
    -            reader = MDMFSlotReadProxy(server.get_rref(),
    -                                       self._storage_index,
    -                                       shnum,
    -                                       any_cache)
    +            # Reuse the SlotReader from the servermap.
    +            key = (self.verinfo, server.get_serverid(),
    +                   self._storage_index, shnum)
    +            if key in self.servermap.proxies:
    +                reader = self.servermap.proxies[key]
    +            else:
    +                reader = MDMFSlotReadProxy(server.get_rref(),
    +                                           self._storage_index, shnum, None)
                 reader.server = server
                 self.readers[shnum] = reader
    @@ -767,4 +765,5 @@
             block_and_salt, blockhashes, sharehashes = results
             block, salt = block_and_salt
    +        assert type(block) is str, (block, salt)
     
             blockhashes = dict(enumerate(blockhashes))
    @@ -839,10 +838,11 @@
             self.log("getting blockhashes for segment %d, share %d: %s" % \
                      (segnum, reader.shnum, str(needed)))
    -        d1 = reader.get_blockhashes(needed, force_remote=True)
    +        # TODO is force_remote necessary here?
    +        d1 = reader.get_blockhashes(needed, force_remote=False)
             if self.share_hash_tree.needed_hashes(reader.shnum):
                 need = self.share_hash_tree.needed_hashes(reader.shnum)
                 self.log("also need sharehashes for share %d: %s" % (reader.shnum,
                                                                      str(need)))
    -            d2 = reader.get_sharehashes(need, force_remote=True)
    +            d2 = reader.get_sharehashes(need, force_remote=False)
             else:
                 d2 = defer.succeed({}) # the logic in the next method
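
The force_remote flips make sense in light of the proxy cache: now that each reader carries the bytes fetched during the servermap update, hash fetches no longer need to force a round trip, and force_remote=False lets get_blockhashes and get_sharehashes be served locally when possible. Schematically (the helpers _from_cache and _from_server are illustrative, not real methods):

    # Schematic of what the force_remote flag toggles; _from_cache and
    # _from_server are illustrative helpers, not the real method bodies.
    def get_hashes(self, needed, force_remote=False):
        if not force_remote:
            cached = self._from_cache(needed)
            if cached is not None:
                return cached              # no round trip needed
        return self._from_server(needed)   # costs one slot_readv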
  • src/allmydata/mutable/servermap.py (r86189298 → r4563ba4)

    @@ -120,4 +120,5 @@
             self._last_update_mode = None
             self._last_update_time = 0
    +        self.proxies = {}
             self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
             # where blockhashes is a list of bytestrings (the result of
    @@ -632,17 +633,4 @@
     
     
    -    def _cache_good_sharedata(self, verinfo, shnum, now, data):
    -        """
    -        If one of my queries returns successfully (which means that we
    -        were able to and successfully did validate the signature), I
    -        cache the data that we initially fetched from the storage
    -        server. This will help reduce the number of roundtrips that need
    -        to occur when the file is downloaded, or when the file is
    -        updated.
    -        """
    -        if verinfo:
    -            self._node._add_to_cache(verinfo, shnum, 0, data)
    -
    -
         def _got_results(self, datavs, server, readsize, storage_index, started):
             lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
    @@ -676,5 +664,7 @@
                                            storage_index,
                                            shnum,
    -                                       data)
    +                                       data,
    +                                       data_is_everything=(len(data) < readsize))
    +
                 # our goal, with each response, is to validate the version
                 # information and share data as best we can at this point --
    @@ -748,4 +738,14 @@
     
                 dl = defer.DeferredList([d, d2, d3, d4, d5])
    +            def _append_proxy(passthrough, shnum=shnum, reader=reader):
    +                # Store the proxy (with its cache) keyed by serverid and
    +                # version.
    +                _, (_,verinfo), _, _, _ = passthrough
    +                verinfo = self._make_verinfo_hashable(verinfo)
    +                self._servermap.proxies[(verinfo,
    +                                         server.get_serverid(),
    +                                         storage_index, shnum)] = reader
    +                return passthrough
    +            dl.addCallback(_append_proxy)
                 dl.addBoth(self._turn_barrier)
                 dl.addCallback(lambda results, shnum=shnum:
    @@ -753,6 +753,4 @@
                 dl.addErrback(lambda error, shnum=shnum, data=data:
                               self._got_corrupt_share(error, shnum, server, data, lp))
    -            dl.addCallback(lambda verinfo, shnum=shnum, data=data:
    -                           self._cache_good_sharedata(verinfo, shnum, now, data))
                 ds.append(dl)
             # dl is a deferred list that will fire when all of the shares
    @@ -818,4 +816,8 @@
     
             _, verinfo, signature, __, ___ = results
    +        verinfo = self._make_verinfo_hashable(verinfo[1])
    +
    +        # This tuple uniquely identifies a share on the grid; we use it
    +        # to keep track of the ones that we've already seen.
             (seqnum,
              root_hash,
    @@ -826,20 +828,6 @@
              n,
              prefix,
    -         offsets) = verinfo[1]
    -        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
    -
    -        # XXX: This should be done for us in the method, so
    -        # presumably you can go in there and fix it.
    -        verinfo = (seqnum,
    -                   root_hash,
    -                   saltish,
    -                   segsize,
    -                   datalen,
    -                   k,
    -                   n,
    -                   prefix,
    -                   offsets_tuple)
    -        # This tuple uniquely identifies a share on the grid; we use it
    -        # to keep track of the ones that we've already seen.
    +         offsets_tuple) = verinfo
    +
     
             if verinfo not in self._valid_versions:
    @@ -880,11 +868,5 @@
             return verinfo
     
    -
    -    def _got_update_results_one_share(self, results, share):
    -        """
    -        I record the update results in results.
    -        """
    -        assert len(results) == 4
    -        verinfo, blockhashes, start, end = results
    +    def _make_verinfo_hashable(self, verinfo):
             (seqnum,
              root_hash,
    @@ -896,4 +878,5 @@
              prefix,
              offsets) = verinfo
    +
             offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
     
    @@ -900,4 +883,2 @@
    -        # XXX: This should be done for us in the method, so
    -        # presumably you can go in there and fix it.
             verinfo = (seqnum,
                        root_hash,
    @@ -909,5 +890,13 @@
                        prefix,
                        offsets_tuple)
    -
    +        return verinfo
    +
    +    def _got_update_results_one_share(self, results, share):
    +        """
    +        I record the update results in results.
    +        """
    +        assert len(results) == 4
    +        verinfo, blockhashes, start, end = results
    +        verinfo = self._make_verinfo_hashable(verinfo)
             update_data = (blockhashes, start, end)
             self._servermap.set_update_data_for_share_and_verinfo(share,
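
The extraction of _make_verinfo_hashable matters because the wire-format verinfo ends in an offsets dict, and a tuple containing a dict cannot be used as a dictionary key; converting the dict to a tuple of (key, value) pairs lets verinfo key servermap.proxies and _valid_versions. A standalone illustration (the field values here are made up):

    # Why verinfo must be made hashable before it can key a dict.
    def make_verinfo_hashable(verinfo):
        (seqnum, root_hash, saltish, segsize, datalen,
         k, n, prefix, offsets) = verinfo
        offsets_tuple = tuple([(key, value) for key, value in offsets.items()])
        return (seqnum, root_hash, saltish, segsize, datalen,
                k, n, prefix, offsets_tuple)

    raw = (1, "roothash", "salt", 131073, 1000000,
           3, 10, "prefix", {"share_data": 42})
    proxies = {}
    verinfo = make_verinfo_hashable(raw)
    proxies[(verinfo, "serverid", "si", 0)] = "reader"   # works
    # proxies[(raw, "serverid", "si", 0)] would raise TypeError:
    # unhashable type, because raw still contains a dict.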
  • src/allmydata/test/no_network.py (r86189298 → r4563ba4)

    @@ -44,4 +44,8 @@
             self.post_call_notifier = None
             self.disconnectors = {}
    +        self.counter_by_methname = {}
    +
    +    def _clear_counters(self):
    +        self.counter_by_methname = {}
     
         def callRemoteOnly(self, methname, *args, **kwargs):
    @@ -63,4 +67,6 @@
     
             def _really_call():
    +            def incr(d, k): d[k] = d.setdefault(k, 0) + 1
    +            incr(self.counter_by_methname, methname)
                 meth = getattr(self.original, "remote_" + methname)
                 return meth(*args, **kwargs)
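
These counters are what make cache effectiveness testable: every callRemote through the no-network wrapper bumps a per-method count, so a test can clear the counters, run an operation, and sum the slot_readv calls, each of which is a cache miss. In outline (mirroring the new test_dirnode test below; assumes a GridTestMixin-style grid in self.g):

    # Outline of the counter usage; assumes a no-network grid in self.g
    # as set up by GridTestMixin, with rootnode already created.
    for wrapper in self.g.wrappers_by_id.values():
        wrapper._clear_counters()                # reset before the operation

    d = rootnode.start_deep_check().when_done()  # operation under test

    def _count_misses(ign):
        return sum(w.counter_by_methname.get('slot_readv', 0)
                   for w in self.g.wrappers_by_id.values())
    d.addCallback(_count_misses)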
  • src/allmydata/test/test_dirnode.py (r86189298 → r4563ba4)

    @@ -1098,4 +1098,21 @@
             return d
     
    +    def test_deepcheck_cachemisses(self):
    +        self.basedir = "dirnode/Dirnode/test_mdmf_cachemisses"
    +        self.set_up_grid()
    +        d = self._test_deepcheck_create()
    +        # Clear the counters and set the rootnode
    +        d.addCallback(lambda rootnode:
    +                      not [ss._clear_counters() for ss
    +                           in self.g.wrappers_by_id.values()] or rootnode)
    +        d.addCallback(lambda rootnode: rootnode.start_deep_check().when_done())
    +        def _check(ign):
    +            count = sum([ss.counter_by_methname['slot_readv']
    +                         for ss in self.g.wrappers_by_id.values()])
    +            self.failIf(count > 60, 'Expected only 60 cache misses,'
    +                                    'unfortunately there were %d' % (count,))
    +        d.addCallback(_check)
    +        return d
    +
         def test_deepcheck_mdmf(self):
             self.basedir = "dirnode/Dirnode/test_deepcheck_mdmf"
  • src/allmydata/test/test_mutable.py (r86189298 → r4563ba4)

    @@ -22,5 +22,5 @@
     
     from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
    -from allmydata.mutable.common import ResponseCache, \
    +from allmydata.mutable.common import \
          MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
          NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
    @@ -640,23 +640,4 @@
             return d
     
    -
    -    def test_response_cache_memory_leak(self):
    -        d = self.nodemaker.create_mutable_file("contents")
    -        def _created(n):
    -            d = n.download_best_version()
    -            d.addCallback(lambda res: self.failUnlessEqual(res, "contents"))
    -            d.addCallback(lambda ign: self.failUnless(isinstance(n._cache, ResponseCache)))
    -
    -            def _check_cache(expected):
    -                # The total size of cache entries should not increase on the second download;
    -                # in fact the cache contents should be identical.
    -                d2 = n.download_best_version()
    -                d2.addCallback(lambda rep: self.failUnlessEqual(repr(n._cache.cache), expected))
    -                return d2
    -            d.addCallback(lambda ign: _check_cache(repr(n._cache.cache)))
    -            return d
    -        d.addCallback(_created)
    -        return d
    -
         def test_create_with_initial_contents_function(self):
             data = "initial contents"
    @@ -1529,13 +1510,4 @@
                                           failure_checker=_check)
     
    -    def test_corrupt_all_block_hash_tree_late(self):
    -        def _check(res):
    -            f = res[0]
    -            self.failUnless(f.check(NotEnoughSharesError))
    -        return self._test_corrupt_all("block_hash_tree",
    -                                      "block hash tree failure",
    -                                      corrupt_early=False,
    -                                      failure_checker=_check)
    -
     
         def test_corrupt_all_block_late(self):
    @@ -1619,5 +1591,5 @@
                 self._test_corrupt_all(("block_hash_tree", 12 * 32),
                                        "block hash tree failure",
    -                                   corrupt_early=False,
    +                                   corrupt_early=True,
                                        should_succeed=False))
             return d
    @@ -1626,8 +1598,11 @@
         def test_corrupt_mdmf_block_hash_tree_late(self):
    +        # Note - there is no SDMF counterpart to this test, as the SDMF
    +        # files are guaranteed to have exactly one block, and therefore
    +        # the block hash tree fits within the initial read (#1240).
             d = self.publish_mdmf()
             d.addCallback(lambda ignored:
                 self._test_corrupt_all(("block_hash_tree", 12 * 32),
                                        "block hash tree failure",
    -                                   corrupt_early=True,
    +                                   corrupt_early=False,
                                        should_succeed=False))
             return d
    @@ -2234,7 +2209,7 @@
             # a variety of encodings. This is actually kind of tricky to set up.
     
    -        contents1 = "Contents for encoding 1 (3-of-10) go here"
    -        contents2 = "Contents for encoding 2 (4-of-9) go here"
    -        contents3 = "Contents for encoding 3 (4-of-7) go here"
    +        contents1 = "Contents for encoding 1 (3-of-10) go here"*1000
    +        contents2 = "Contents for encoding 2 (4-of-9) go here"*1000
    +        contents3 = "Contents for encoding 3 (4-of-7) go here"*1000
     
             # we make a retrieval object that doesn't know what encoding
    @@ -2404,37 +2379,4 @@
     
     
    -class Utils(unittest.TestCase):
    -    def test_cache(self):
    -        c = ResponseCache()
    -        # xdata = base62.b2a(os.urandom(100))[:100]
    -        xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
    -        ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
    -        c.add("v1", 1, 0, xdata)
    -        c.add("v1", 1, 2000, ydata)
    -        self.failUnlessEqual(c.read("v2", 1, 10, 11), None)
    -        self.failUnlessEqual(c.read("v1", 2, 10, 11), None)
    -        self.failUnlessEqual(c.read("v1", 1, 0, 10), xdata[:10])
    -        self.failUnlessEqual(c.read("v1", 1, 90, 10), xdata[90:])
    -        self.failUnlessEqual(c.read("v1", 1, 300, 10), None)
    -        self.failUnlessEqual(c.read("v1", 1, 2050, 5), ydata[50:55])
    -        self.failUnlessEqual(c.read("v1", 1, 0, 101), None)
    -        self.failUnlessEqual(c.read("v1", 1, 99, 1), xdata[99:100])
    -        self.failUnlessEqual(c.read("v1", 1, 100, 1), None)
    -        self.failUnlessEqual(c.read("v1", 1, 1990, 9), None)
    -        self.failUnlessEqual(c.read("v1", 1, 1990, 10), None)
    -        self.failUnlessEqual(c.read("v1", 1, 1990, 11), None)
    -        self.failUnlessEqual(c.read("v1", 1, 1990, 15), None)
    -        self.failUnlessEqual(c.read("v1", 1, 1990, 19), None)
    -        self.failUnlessEqual(c.read("v1", 1, 1990, 20), None)
    -        self.failUnlessEqual(c.read("v1", 1, 1990, 21), None)
    -        self.failUnlessEqual(c.read("v1", 1, 1990, 25), None)
    -        self.failUnlessEqual(c.read("v1", 1, 1999, 25), None)
    -
    -        # test joining fragments
    -        c = ResponseCache()
    -        c.add("v1", 1, 0, xdata[:10])
    -        c.add("v1", 1, 10, xdata[10:20])
    -        self.failUnlessEqual(c.read("v1", 1, 0, 20), xdata[:20])
    -
     class Exceptions(unittest.TestCase):
         def test_repr(self):
    @@ -2443,4 +2385,5 @@
             ucwe = UncoordinatedWriteError()
             self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
    +
     
     class SameKeyGenerator:
    @@ -2515,5 +2458,5 @@
             self.set_up_grid()
             nm = self.g.clients[0].nodemaker
    -        d = nm.create_mutable_file(MutableData("contents 1"))
    +        d = nm.create_mutable_file(MutableData("contents 1"*4000))
             def _created(n):
                 d = defer.succeed(None)
    @@ -2529,5 +2472,4 @@
                 # This will look like someone has changed the file since we
                 # updated the servermap.
    -            d.addCallback(lambda res: n._cache._clear())
                 d.addCallback(lambda res: log.msg("starting doomed read"))
                 d.addCallback(lambda res: