Changeset 4563ba4 in trunk
- Timestamp: 2012-12-27T00:00:17Z (12 years ago)
- Branches: master
- Children: 0d4a826
- Parents: 86189298
- Location: src/allmydata
- Files: 8 edited
src/allmydata/mutable/common.py

--- src/allmydata/mutable/common.py (r86189298)
+++ src/allmydata/mutable/common.py (r4563ba4)
@@ -1,4 +1,2 @@
-
-from allmydata.util.spans import DataSpans
 
 MODE_CHECK = "MODE_CHECK" # query all peers
@@ -60,55 +58,2 @@
 class UnknownVersionError(BadShareError):
     """The share we received was of a version we don't recognize."""
-
-class ResponseCache:
-    """I cache share data, to reduce the number of round trips used during
-    mutable file operations. All of the data in my cache is for a single
-    storage index, but I will keep information on multiple shares for
-    that storage index.
-
-    I maintain a highest-seen sequence number, and will flush all entries
-    each time this number increases (this doesn't necessarily imply that
-    all entries have the same sequence number).
-
-    My cache is indexed by a (verinfo, shnum) tuple.
-
-    My cache entries are DataSpans instances, each representing a set of
-    non-overlapping byteranges.
-    """
-
-    def __init__(self):
-        self.cache = {}
-        self.seqnum = None
-
-    def _clear(self):
-        # also used by unit tests
-        self.cache = {}
-
-    def add(self, verinfo, shnum, offset, data):
-        seqnum = verinfo[0]
-        if seqnum > self.seqnum:
-            self._clear()
-            self.seqnum = seqnum
-
-        index = (verinfo, shnum)
-        if index in self.cache:
-            self.cache[index].add(offset, data)
-        else:
-            spans = DataSpans()
-            spans.add(offset, data)
-            self.cache[index] = spans
-
-    def read(self, verinfo, shnum, offset, length):
-        """Try to satisfy a read request from cache.
-        Returns data, or None if the cache did not hold the entire requested span.
-        """
-
-        # TODO: perhaps return a DataSpans object representing the fragments
-        # that we have, instead of only returning a hit if we can satisfy the
-        # whole request from cache.
-
-        index = (verinfo, shnum)
-        if index in self.cache:
-            return self.cache[index].get(offset, length)
-        else:
-            return None
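For reference, the hit/miss semantics of the cache being removed here (also exercised by the Utils.test_cache test deleted from test_mutable.py below) can be summarized in a short illustrative snippet. The verinfo and data values are made up; only the first element of verinfo (the sequence number) matters to the cache:

# Illustrative only -- exercises the ResponseCache removed above.
cache = ResponseCache()
verinfo = (1, "roothash")
cache.add(verinfo, shnum=0, offset=0, data="abcdefghij")
assert cache.read(verinfo, 0, 0, 4) == "abcd"     # span fully cached: hit
assert cache.read(verinfo, 0, 5, 100) is None     # span only partially cached: miss
cache.add((2, "roothash"), shnum=0, offset=0, data="k")  # higher seqnum flushes everything
assert cache.read(verinfo, 0, 0, 4) is None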
src/allmydata/mutable/filenode.py

--- src/allmydata/mutable/filenode.py (r86189298)
+++ src/allmydata/mutable/filenode.py (r4563ba4)
@@ -18,5 +18,5 @@
                                       TransformingUploadable
 from allmydata.mutable.common import MODE_READ, MODE_WRITE, MODE_CHECK, UnrecoverableFileError, \
-     ResponseCache, UncoordinatedWriteError
+     UncoordinatedWriteError
 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
 from allmydata.mutable.retrieve import Retrieve
@@ -66,5 +66,4 @@
         self._total_shares = default_encoding_parameters["n"]
         self._sharemap = {} # known shares, shnum-to-[nodeids]
-        self._cache = ResponseCache()
         self._most_recent_size = None
         # filled in after __init__ if we're being created for the first time;
@@ -181,8 +180,4 @@
     def _populate_encprivkey(self, encprivkey):
         self._encprivkey = encprivkey
-    def _add_to_cache(self, verinfo, shnum, offset, data):
-        self._cache.add(verinfo, shnum, offset, data)
-    def _read_from_cache(self, verinfo, shnum, offset, length):
-        return self._cache.read(verinfo, shnum, offset, length)
 
     def get_write_enabler(self, server):
src/allmydata/mutable/layout.py

--- src/allmydata/mutable/layout.py (r86189298)
+++ src/allmydata/mutable/layout.py (r4563ba4)
@@ -1193,5 +1193,6 @@
                  storage_index,
                  shnum,
-                 data=""):
+                 data="",
+                 data_is_everything=False):
         # Start the initialization process.
         self._rref = rref
@@ -1224,6 +1225,12 @@
         # If the user has chosen to initialize us with some data, we'll
         # try to satisfy subsequent data requests with that data before
-        # asking the storage server for it. If
+        # asking the storage server for it.
         self._data = data
+
+        # If the provided data is known to be complete, then we know there's
+        # nothing to be gained by querying the server, so we should just
+        # partially satisfy requests with what we have.
+        self._data_is_everything = data_is_everything
+
         # The way callers interact with cache in the filenode returns
         # None if there isn't any cached data, but the way we index the
@@ -1739,5 +1746,6 @@
         # fulfills the requests that it can, and not demand that all
         # requests are satisfiable before running it.
-        if not unsatisfiable and not force_remote:
+
+        if not unsatisfiable or self._data_is_everything:
             results = [self._data[offset:offset+length]
                        for (offset, length) in readvs]
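The comment introduced above explains the intent of data_is_everything; the following standalone sketch (a hypothetical satisfy_locally helper, not the actual MDMFSlotReadProxy code, with "unsatisfiable" approximated as "extends past the locally held data") illustrates how the flag changes the local-versus-remote decision:

# Simplified, hypothetical illustration of the decision in the last hunk.
def satisfy_locally(data, readvs, data_is_everything):
    # Treat a readv as unsatisfiable if it reaches past the local copy
    # (an approximation of the proxy's bookkeeping).
    unsatisfiable = any(offset + length > len(data)
                        for (offset, length) in readvs)
    if not unsatisfiable or data_is_everything:
        # When the data is known to be complete, short results are correct:
        # the storage server has nothing more to offer.
        return [data[offset:offset+length] for (offset, length) in readvs]
    return None  # caller falls back to a remote slot_readv

# A 20-byte readv at offset 90 of a 100-byte share:
assert satisfy_locally("x" * 100, [(90, 20)], data_is_everything=False) is None
assert satisfy_locally("x" * 100, [(90, 20)], data_is_everything=True) == ["x" * 10]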
src/allmydata/mutable/retrieve.py

--- src/allmydata/mutable/retrieve.py (r86189298)
+++ src/allmydata/mutable/retrieve.py (r4563ba4)
@@ -287,14 +287,12 @@
         for (shnum, server, timestamp) in shares:
             self.remaining_sharemap.add(shnum, server)
-            # If the servermap update fetched anything, it fetched at least 1
-            # KiB, so we ask for that much.
-            # TODO: Change the cache methods to allow us to fetch all of the
-            # data that they have, then change this method to do that.
-            any_cache = self._node._read_from_cache(self.verinfo, shnum,
-                                                    0, 1000)
-            reader = MDMFSlotReadProxy(server.get_rref(),
-                                       self._storage_index,
-                                       shnum,
-                                       any_cache)
+            # Reuse the SlotReader from the servermap.
+            key = (self.verinfo, server.get_serverid(),
+                   self._storage_index, shnum)
+            if key in self.servermap.proxies:
+                reader = self.servermap.proxies[key]
+            else:
+                reader = MDMFSlotReadProxy(server.get_rref(),
+                                           self._storage_index, shnum, None)
             reader.server = server
             self.readers[shnum] = reader
@@ -767,4 +765,5 @@
         block_and_salt, blockhashes, sharehashes = results
         block, salt = block_and_salt
+        assert type(block) is str, (block, salt)
 
         blockhashes = dict(enumerate(blockhashes))
@@ -839,10 +838,11 @@
         self.log("getting blockhashes for segment %d, share %d: %s" % \
                  (segnum, reader.shnum, str(needed)))
-        d1 = reader.get_blockhashes(needed, force_remote=True)
+        # TODO is force_remote necessary here?
+        d1 = reader.get_blockhashes(needed, force_remote=False)
         if self.share_hash_tree.needed_hashes(reader.shnum):
            need = self.share_hash_tree.needed_hashes(reader.shnum)
            self.log("also need sharehashes for share %d: %s" % (reader.shnum,
                                                                 str(need)))
-           d2 = reader.get_sharehashes(need, force_remote=True)
+           d2 = reader.get_sharehashes(need, force_remote=False)
         else:
            d2 = defer.succeed({}) # the logic in the next method
src/allmydata/mutable/servermap.py

--- src/allmydata/mutable/servermap.py (r86189298)
+++ src/allmydata/mutable/servermap.py (r4563ba4)
@@ -120,4 +120,5 @@
         self._last_update_mode = None
         self._last_update_time = 0
+        self.proxies = {}
         self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
            # where blockhashes is a list of bytestrings (the result of
@@ -632,17 +633,4 @@
 
 
-    def _cache_good_sharedata(self, verinfo, shnum, now, data):
-        """
-        If one of my queries returns successfully (which means that we
-        were able to and successfully did validate the signature), I
-        cache the data that we initially fetched from the storage
-        server. This will help reduce the number of roundtrips that need
-        to occur when the file is downloaded, or when the file is
-        updated.
-        """
-        if verinfo:
-            self._node._add_to_cache(verinfo, shnum, 0, data)
-
-
     def _got_results(self, datavs, server, readsize, storage_index, started):
         lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
@@ -676,5 +664,7 @@
                                        storage_index,
                                        shnum,
-                                       data)
+                                       data,
+                                       data_is_everything=(len(data) < readsize))
+
             # our goal, with each response, is to validate the version
             # information and share data as best we can at this point --
@@ -748,4 +738,14 @@
 
             dl = defer.DeferredList([d, d2, d3, d4, d5])
+            def _append_proxy(passthrough, shnum=shnum, reader=reader):
+                # Store the proxy (with its cache) keyed by serverid and
+                # version.
+                _, (_,verinfo), _, _, _ = passthrough
+                verinfo = self._make_verinfo_hashable(verinfo)
+                self._servermap.proxies[(verinfo,
+                                         server.get_serverid(),
+                                         storage_index, shnum)] = reader
+                return passthrough
+            dl.addCallback(_append_proxy)
             dl.addBoth(self._turn_barrier)
             dl.addCallback(lambda results, shnum=shnum:
@@ -753,6 +753,4 @@
             dl.addErrback(lambda error, shnum=shnum, data=data:
                           self._got_corrupt_share(error, shnum, server, data, lp))
-            dl.addCallback(lambda verinfo, shnum=shnum, data=data:
-                           self._cache_good_sharedata(verinfo, shnum, now, data))
             ds.append(dl)
         # dl is a deferred list that will fire when all of the shares
@@ -818,28 +816,18 @@
 
         _, verinfo, signature, __, ___ = results
+        verinfo = self._make_verinfo_hashable(verinfo[1])
+
+        # This tuple uniquely identifies a share on the grid; we use it
+        # to keep track of the ones that we've already seen.
         (seqnum,
          root_hash,
          saltish,
          segsize,
          datalen,
          k,
         n,
         prefix,
-         offsets) = verinfo[1]
-        offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
-
-        # XXX: This should be done for us in the method, so
-        # presumably you can go in there and fix it.
-        verinfo = (seqnum,
-                   root_hash,
-                   saltish,
-                   segsize,
-                   datalen,
-                   k,
-                   n,
-                   prefix,
-                   offsets_tuple)
-        # This tuple uniquely identifies a share on the grid; we use it
-        # to keep track of the ones that we've already seen.
+         offsets_tuple) = verinfo
+
 
         if verinfo not in self._valid_versions:
@@ -880,11 +868,5 @@
         return verinfo
 
-
-    def _got_update_results_one_share(self, results, share):
-        """
-        I record the update results in results.
-        """
-        assert len(results) == 4
-        verinfo, blockhashes, start, end = results
+    def _make_verinfo_hashable(self, verinfo):
         (seqnum,
          root_hash,
@@ -896,8 +878,7 @@
          prefix,
          offsets) = verinfo
+
         offsets_tuple = tuple( [(key,value) for key,value in offsets.items()] )
 
-        # XXX: This should be done for us in the method, so
-        # presumably you can go in there and fix it.
         verinfo = (seqnum,
                    root_hash,
@@ -909,6 +890,14 @@
                    prefix,
                    offsets_tuple)
 
-
+        return verinfo
+
+    def _got_update_results_one_share(self, results, share):
+        """
+        I record the update results in results.
+        """
+        assert len(results) == 4
+        verinfo, blockhashes, start, end = results
+        verinfo = self._make_verinfo_hashable(verinfo)
         update_data = (blockhashes, start, end)
         self._servermap.set_update_data_for_share_and_verinfo(share,
src/allmydata/test/no_network.py

--- src/allmydata/test/no_network.py (r86189298)
+++ src/allmydata/test/no_network.py (r4563ba4)
@@ -44,4 +44,8 @@
         self.post_call_notifier = None
         self.disconnectors = {}
+        self.counter_by_methname = {}
+
+    def _clear_counters(self):
+        self.counter_by_methname = {}
 
     def callRemoteOnly(self, methname, *args, **kwargs):
@@ -63,4 +67,6 @@
 
         def _really_call():
+            def incr(d, k): d[k] = d.setdefault(k, 0) + 1
+            incr(self.counter_by_methname, methname)
            meth = getattr(self.original, "remote_" + methname)
            return meth(*args, **kwargs)
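The counter added here is what the new test in test_dirnode.py (next file) asserts against. As a standalone sketch of the pattern, with a hypothetical FakeWrapper standing in for the real LocalWrapper:

# Hypothetical stand-in for LocalWrapper, illustrating the per-method-name
# call counter added above.
class FakeWrapper:
    def __init__(self):
        self.counter_by_methname = {}

    def _clear_counters(self):
        self.counter_by_methname = {}

    def callRemote(self, methname, *args, **kwargs):
        def incr(d, k): d[k] = d.setdefault(k, 0) + 1
        incr(self.counter_by_methname, methname)
        # ... the real wrapper dispatches to remote_<methname> here ...

w = FakeWrapper()
w.callRemote("slot_readv")
w.callRemote("slot_readv")
assert w.counter_by_methname["slot_readv"] == 2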
src/allmydata/test/test_dirnode.py

--- src/allmydata/test/test_dirnode.py (r86189298)
+++ src/allmydata/test/test_dirnode.py (r4563ba4)
@@ -1098,4 +1098,21 @@
         return d
 
+    def test_deepcheck_cachemisses(self):
+        self.basedir = "dirnode/Dirnode/test_mdmf_cachemisses"
+        self.set_up_grid()
+        d = self._test_deepcheck_create()
+        # Clear the counters and set the rootnode
+        d.addCallback(lambda rootnode:
+                      not [ss._clear_counters() for ss
+                           in self.g.wrappers_by_id.values()] or rootnode)
+        d.addCallback(lambda rootnode: rootnode.start_deep_check().when_done())
+        def _check(ign):
+            count = sum([ss.counter_by_methname['slot_readv']
+                         for ss in self.g.wrappers_by_id.values()])
+            self.failIf(count > 60, 'Expected only 60 cache misses,'
+                                    'unfortunately there were %d' % (count,))
+        d.addCallback(_check)
+        return d
+
     def test_deepcheck_mdmf(self):
         self.basedir = "dirnode/Dirnode/test_deepcheck_mdmf"
src/allmydata/test/test_mutable.py

--- src/allmydata/test/test_mutable.py (r86189298)
+++ src/allmydata/test/test_mutable.py (r4563ba4)
@@ -22,5 +22,5 @@
 
 from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
-from allmydata.mutable.common import ResponseCache, \
+from allmydata.mutable.common import \
      MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
      NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
@@ -640,23 +640,4 @@
         return d
 
-
-    def test_response_cache_memory_leak(self):
-        d = self.nodemaker.create_mutable_file("contents")
-        def _created(n):
-            d = n.download_best_version()
-            d.addCallback(lambda res: self.failUnlessEqual(res, "contents"))
-            d.addCallback(lambda ign: self.failUnless(isinstance(n._cache, ResponseCache)))
-
-            def _check_cache(expected):
-                # The total size of cache entries should not increase on the second download;
-                # in fact the cache contents should be identical.
-                d2 = n.download_best_version()
-                d2.addCallback(lambda rep: self.failUnlessEqual(repr(n._cache.cache), expected))
-                return d2
-            d.addCallback(lambda ign: _check_cache(repr(n._cache.cache)))
-            return d
-        d.addCallback(_created)
-        return d
-
     def test_create_with_initial_contents_function(self):
         data = "initial contents"
@@ -1529,13 +1510,4 @@
                                       failure_checker=_check)
 
-    def test_corrupt_all_block_hash_tree_late(self):
-        def _check(res):
-            f = res[0]
-            self.failUnless(f.check(NotEnoughSharesError))
-        return self._test_corrupt_all("block_hash_tree",
-                                      "block hash tree failure",
-                                      corrupt_early=False,
-                                      failure_checker=_check)
-
 
     def test_corrupt_all_block_late(self):
@@ -1619,5 +1591,5 @@
             self._test_corrupt_all(("block_hash_tree", 12 * 32),
                                    "block hash tree failure",
-                                   corrupt_early=False,
+                                   corrupt_early=True,
                                    should_succeed=False))
         return d
@@ -1625,9 +1597,12 @@
 
     def test_corrupt_mdmf_block_hash_tree_late(self):
+        # Note - there is no SDMF counterpart to this test, as the SDMF
+        # files are guaranteed to have exactly one block, and therefore
+        # the block hash tree fits within the initial read (#1240).
         d = self.publish_mdmf()
         d.addCallback(lambda ignored:
             self._test_corrupt_all(("block_hash_tree", 12 * 32),
                                    "block hash tree failure",
-                                   corrupt_early=True,
+                                   corrupt_early=False,
                                    should_succeed=False))
         return d
@@ -2234,7 +2209,7 @@
         # a variety of encodings. This is actually kind of tricky to set up.
 
-        contents1 = "Contents for encoding 1 (3-of-10) go here"
-        contents2 = "Contents for encoding 2 (4-of-9) go here"
-        contents3 = "Contents for encoding 3 (4-of-7) go here"
+        contents1 = "Contents for encoding 1 (3-of-10) go here"*1000
+        contents2 = "Contents for encoding 2 (4-of-9) go here"*1000
+        contents3 = "Contents for encoding 3 (4-of-7) go here"*1000
 
         # we make a retrieval object that doesn't know what encoding
@@ -2404,37 +2379,4 @@
 
 
-class Utils(unittest.TestCase):
-    def test_cache(self):
-        c = ResponseCache()
-        # xdata = base62.b2a(os.urandom(100))[:100]
-        xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
-        ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
-        c.add("v1", 1, 0, xdata)
-        c.add("v1", 1, 2000, ydata)
-        self.failUnlessEqual(c.read("v2", 1, 10, 11), None)
-        self.failUnlessEqual(c.read("v1", 2, 10, 11), None)
-        self.failUnlessEqual(c.read("v1", 1, 0, 10), xdata[:10])
-        self.failUnlessEqual(c.read("v1", 1, 90, 10), xdata[90:])
-        self.failUnlessEqual(c.read("v1", 1, 300, 10), None)
-        self.failUnlessEqual(c.read("v1", 1, 2050, 5), ydata[50:55])
-        self.failUnlessEqual(c.read("v1", 1, 0, 101), None)
-        self.failUnlessEqual(c.read("v1", 1, 99, 1), xdata[99:100])
-        self.failUnlessEqual(c.read("v1", 1, 100, 1), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 9), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 10), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 11), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 15), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 19), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 20), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 21), None)
-        self.failUnlessEqual(c.read("v1", 1, 1990, 25), None)
-        self.failUnlessEqual(c.read("v1", 1, 1999, 25), None)
-
-        # test joining fragments
-        c = ResponseCache()
-        c.add("v1", 1, 0, xdata[:10])
-        c.add("v1", 1, 10, xdata[10:20])
-        self.failUnlessEqual(c.read("v1", 1, 0, 20), xdata[:20])
-
 class Exceptions(unittest.TestCase):
     def test_repr(self):
@@ -2443,4 +2385,5 @@
         ucwe = UncoordinatedWriteError()
         self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
+
 
 class SameKeyGenerator:
@@ -2515,5 +2458,5 @@
         self.set_up_grid()
         nm = self.g.clients[0].nodemaker
-        d = nm.create_mutable_file(MutableData("contents 1"))
+        d = nm.create_mutable_file(MutableData("contents 1"*4000))
         def _created(n):
             d = defer.succeed(None)
@@ -2529,5 +2472,4 @@
         # This will look like someone has changed the file since we
         # updated the servermap.
-        d.addCallback(lambda res: n._cache._clear())
         d.addCallback(lambda res: log.msg("starting doomed read"))
         d.addCallback(lambda res: