Changeset ba019bf in trunk
- Timestamp:
- 2008-10-30T20:39:09Z (16 years ago)
- Branches:
- master
- Children:
- 845b77ae
- Parents:
- c205a54
- Location:
- src/allmydata
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified src/allmydata/client.py ¶
rc205a54 rba019bf 19 19 from allmydata.control import ControlServer 20 20 from allmydata.introducer.client import IntroducerClient 21 from allmydata.util import hashutil, base32, pollmixin, fileutil21 from allmydata.util import hashutil, base32, pollmixin, cachedir 22 22 from allmydata.uri import LiteralFileURI 23 23 from allmydata.dirnode import NewDirectoryNode … … 189 189 self._node_cache = weakref.WeakValueDictionary() # uri -> node 190 190 self.add_service(Uploader(helper_furl, self.stats_provider)) 191 self.download_cachedir = os.path.join(self.basedir, 192 "private", "cache", "download") 193 fileutil.make_dirs(self.download_cachedir) 191 download_cachedir = os.path.join(self.basedir, 192 "private", "cache", "download") 193 self.download_cache = cachedir.CacheDirectoryManager(download_cachedir) 194 self.download_cache.setServiceParent(self) 194 195 self.add_service(Downloader(self.stats_provider)) 195 196 self.add_service(MutableWatcher(self.stats_provider)) … … 340 341 node = LiteralFileNode(u, self) # LIT 341 342 else: 342 cachefile = os.path.join(self.download_cachedir, 343 base32.b2a(u.storage_index)) 344 # TODO: cachefile manager, weakref, expire policy 343 key = base32.b2a(u.storage_index) 344 cachefile = self.download_cache.get_file(key) 345 345 node = FileNode(u, self, cachefile) # CHK 346 346 else: -
TabularUnified src/allmydata/immutable/filenode.py ¶
rc205a54 rba019bf 63 63 return data 64 64 65 class CacheFile:65 class DownloadCache: 66 66 implements(IDownloadTarget) 67 67 68 def __init__(self, node, filename): 69 self.node = node 68 def __init__(self, node, cachefile): 69 self._downloader = node._client.getServiceNamed("downloader") 70 self._uri = node.get_uri() 71 self._storage_index = node.get_storage_index() 70 72 self.milestones = set() # of (offset,size,Deferred) 71 self.cachefile name = filename73 self.cachefile = cachefile 72 74 self.download_in_progress = False 73 75 # five states: … … 89 91 log.msg(format=("immutable filenode read [%(si)s]: " + 90 92 "starting download"), 91 si=base32.b2a(self. node.u.storage_index),93 si=base32.b2a(self._storage_index), 92 94 umid="h26Heg", level=log.OPERATIONAL) 93 downloader = self.node._client.getServiceNamed("downloader") 94 d2 = downloader.download(self.node.get_uri(), self) 95 d2 = self._downloader.download(self._uri, self) 95 96 d2.addBoth(self._download_done) 96 97 d2.addErrback(self._download_failed) … … 100 101 def read(self, consumer, offset, size): 101 102 assert offset+size <= self.get_filesize() 102 f = PortionOfFile(self.cachefile name, offset, size)103 f = PortionOfFile(self.cachefile.get_filename(), offset, size) 103 104 d = basic.FileSender().beginFileTransfer(f, consumer) 104 105 d.addCallback(lambda lastSent: consumer) … … 125 126 "%(offset)d+%(size)d vs %(filesize)d: " + 126 127 "done"), 127 si=base32.b2a(self. node.u.storage_index),128 si=base32.b2a(self._storage_index), 128 129 offset=offset, size=size, filesize=current_size, 129 130 umid="nuedUg", level=log.NOISY) … … 134 135 "%(offset)d+%(size)d vs %(filesize)d: " + 135 136 "still waiting"), 136 si=base32.b2a(self. node.u.storage_index),137 si=base32.b2a(self._storage_index), 137 138 offset=offset, size=size, filesize=current_size, 138 139 umid="8PKOhg", level=log.NOISY) … … 140 141 def get_filesize(self): 141 142 try: 142 filesize = os.stat(self.cachefile name)[stat.ST_SIZE]143 filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE] 143 144 except OSError: 144 145 filesize = 0 … … 147 148 148 149 def open(self, size): 149 self.f = open(self.cachefile name, "wb")150 self.f = open(self.cachefile.get_filename(), "wb") 150 151 151 152 def write(self, data): … … 172 173 def __init__(self, uri, client, cachefile): 173 174 _ImmutableFileNodeBase.__init__(self, uri, client) 174 self. cachefile = CacheFile(self, cachefile)175 self.download_cache = DownloadCache(self, cachefile) 175 176 176 177 def get_uri(self): … … 231 232 return self.download(download.ConsumerAdapter(consumer)) 232 233 233 d = self.cachefile.when_range_available(offset, size) 234 d.addCallback(lambda res: self.cachefile.read(consumer, offset, size)) 234 d = self.download_cache.when_range_available(offset, size) 235 d.addCallback(lambda res: 236 self.download_cache.read(consumer, offset, size)) 235 237 return d 236 238 -
TabularUnified src/allmydata/test/test_filenode.py ¶
rc205a54 rba019bf 5 5 from allmydata.immutable import filenode, download 6 6 from allmydata.mutable.node import MutableFileNode 7 from allmydata.util import hashutil 7 from allmydata.util import hashutil, cachedir 8 8 from allmydata.test.common import download_to_data 9 9 10 10 class NotANode: 11 11 pass 12 13 class FakeClient: 14 # just enough to let the node acquire a downloader (which it won't use) 15 def getServiceNamed(self, name): 16 return None 12 17 13 18 class Node(unittest.TestCase): … … 18 23 total_shares=10, 19 24 size=1000) 20 c = None 21 fn1 = filenode.FileNode(u, c, "cachefile") 22 fn2 = filenode.FileNode(u.to_string(), c, "cachefile") 25 c = FakeClient() 26 cf = cachedir.CacheFile("none") 27 fn1 = filenode.FileNode(u, c, cf) 28 fn2 = filenode.FileNode(u.to_string(), c, cf) 23 29 self.failUnlessEqual(fn1, fn2) 24 30 self.failIfEqual(fn1, "I am not a filenode")
Note: See TracChangeset
for help on using the changeset viewer.