source file: /home/buildslave/tahoe/edgy/build/src/allmydata/immutable/filenode.py
file stats: 274 lines, 262 executed: 95.6% covered
coverage versus previous test: 0 lines added, 0 lines removed
1. import copy, os.path, stat
2. from cStringIO import StringIO
3. from zope.interface import implements
4. from twisted.internet import defer
5. from twisted.internet.interfaces import IPushProducer, IConsumer
6. from twisted.protocols import basic
7. from foolscap.api import eventually
8. from allmydata.interfaces import IFileNode, ICheckable, \
9. IDownloadTarget, IUploadResults
10. from allmydata.util import dictutil, log, base32
11. from allmydata import uri as urimodule
12. from allmydata.immutable.checker import Checker
13. from allmydata.check_results import CheckResults, CheckAndRepairResults
14. from allmydata.immutable.repairer import Repairer
15. from allmydata.immutable import download
16.
class _ImmutableFileNodeBase(object):
    """Behaviour shared by the immutable (read-only) file node classes.

    Subclasses set ``self.u`` to their parsed filecap and provide get_uri();
    hashing and equality are delegated to ``self.u``.
    """
    implements(IFileNode, ICheckable)

    def get_readonly_uri(self):
        # an immutable file's URI is already read-only
        return self.get_uri()

    def is_mutable(self):
        return False

    def is_readonly(self):
        return True

    def __hash__(self):
        return self.u.__hash__()

    def __eq__(self, other):
        """Nodes are equal when other is an IFileNode whose ``u`` equals ours."""
        if not IFileNode.providedBy(other):
            return False
        # NOTE(review): not every IFileNode provider is guaranteed to carry a
        # ``u`` attribute; treat such nodes as unequal instead of raising.
        other_u = getattr(other, "u", None)
        if other_u is None:
            return False
        return self.u == other_u

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__, so it must be spelled
        # out as the exact negation of __eq__.
        return not self.__eq__(other)
41.
class PortionOfFile:
    """Read-only file-like view of a byte range of a file on disk: like a
    list slice (things[2:14]), but for a file.

    :param fn: path of the file to read (opened in binary mode)
    :param offset: byte position at which the portion starts
    :param size: number of bytes in the portion, or None to run to EOF. A
        negative size is treated as an empty portion.

    The underlying file handle is closed as soon as the portion is exhausted
    (or when close() is called); later read() calls return an empty string.
    """
    def __init__(self, fn, offset=0, size=None):
        self.f = open(fn, "rb")
        self.f.seek(offset)
        # A zero-length read yields a correctly-typed empty string (str on
        # Python 2, bytes on Python 3) to hand back after we are closed.
        self._empty = self.f.read(0)
        if size is not None and size < 0:
            size = 0
        self.bytes_left = size

    def read(self, size=None):
        """Return up to `size` bytes of the portion (all remaining if size
        is None or negative, following the file.read() convention)."""
        if self.f is None:
            return self._empty
        if size is not None and size < 0:
            size = None
        # bytes_to_read = min(size, self.bytes_left), but None>anything
        if size is None:
            bytes_to_read = self.bytes_left
        elif self.bytes_left is None:
            bytes_to_read = size
        else:
            bytes_to_read = min(size, self.bytes_left)
        if bytes_to_read is None:
            # file.read(None) raises TypeError on Python 2; use no argument
            data = self.f.read()
        else:
            data = self.f.read(bytes_to_read)
        if self.bytes_left is not None:
            self.bytes_left -= len(data)
        # Close once the portion is used up, or at EOF. An explicit read(0)
        # also returns no data but must not be mistaken for EOF.
        if self.bytes_left == 0 or (not data and bytes_to_read != 0):
            self.close()
        return data

    def close(self):
        """Release the underlying file handle. Safe to call repeatedly."""
        if self.f is not None:
            self.f.close()
            self.f = None
61.
class DownloadCache:
    """IDownloadTarget that spools a file's plaintext into a local cache
    file and lets callers wait for byte ranges of it to be written.

    A download is started by when_range_available() whenever some requested
    range is not yet covered by the cache file on disk and no download is
    already running. Ranges that are available can be streamed out with
    read().
    """
    implements(IDownloadTarget)

    def __init__(self, filecap, storage_index, downloader,
                 cachedirectorymanager):
        self._downloader = downloader
        self._uri = filecap
        self._storage_index = storage_index
        self.milestones = set() # of (offset,size,Deferred)
        self.cachedirectorymanager = cachedirectorymanager
        self.cachefile = None
        self.f = None # write handle for the cache file, set by open()
        self.download_in_progress = False
        # five states:
        #  new FileNode, no downloads ever performed
        #  new FileNode, leftover file (partial)
        #  new FileNode, leftover file (whole)
        #  download in progress, not yet complete
        #  download complete

    def _get_cachefile(self):
        """Return the cache-file object for our storage index, obtaining it
        from the cache directory manager on first use."""
        if not self.cachefile:
            self.cachefile = self.cachedirectorymanager.get_file(
                base32.b2a(self._storage_index))
        return self.cachefile

    def when_range_available(self, offset, size):
        """Return a Deferred that fires with None once bytes
        [offset, offset+size) are in the cache file, or errbacks if the
        download fails first. Starts a download if one is needed."""
        assert isinstance(offset, (int,long))
        assert isinstance(size, (int,long))

        d = defer.Deferred()
        self.milestones.add( (offset,size,d) )
        self._check_milestones()
        if self.milestones and not self.download_in_progress:
            self.download_in_progress = True
            log.msg(format=("immutable filenode read [%(si)s]: " +
                            "starting download"),
                    si=base32.b2a(self._storage_index),
                    umid="h26Heg", level=log.OPERATIONAL)
            d2 = self._downloader.download(self._uri, self)
            d2.addBoth(self._download_done)
            d2.addErrback(self._download_failed)
            d2.addErrback(log.err, umid="cQaM9g")
        return d

    def read(self, consumer, offset, size):
        """Stream bytes [offset, offset+size) of the cache file to consumer.

        The range must already be on disk (see when_range_available).
        Returns a Deferred that fires with consumer when the transfer ends.
        """
        assert offset+size <= self.get_filesize()
        f = PortionOfFile(self._get_cachefile().get_filename(), offset, size)
        d = basic.FileSender().beginFileTransfer(f, consumer)
        d.addCallback(lambda lastSent: consumer)
        return d

    def _download_done(self, res):
        # clear download_in_progress, so failed downloads can be re-tried
        self.download_in_progress = False
        return res

    def _download_failed(self, f):
        # tell anyone who's waiting that we failed
        for m in self.milestones:
            (offset,size,d) = m
            eventually(d.errback, f)
        self.milestones.clear()

    def _check_milestones(self):
        """Fire (via eventually) every waiting Deferred whose range is now
        covered by the cache file, and drop it from self.milestones."""
        current_size = self.get_filesize()
        for m in list(self.milestones):
            (offset,size,d) = m
            if offset+size <= current_size:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "done"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="nuedUg", level=log.NOISY)
                self.milestones.discard(m)
                eventually(d.callback, None)
            else:
                log.msg(format=("immutable filenode read [%(si)s] " +
                                "%(offset)d+%(size)d vs %(filesize)d: " +
                                "still waiting"),
                        si=base32.b2a(self._storage_index),
                        offset=offset, size=size, filesize=current_size,
                        umid="8PKOhg", level=log.NOISY)

    def get_filesize(self):
        """Return the current on-disk size of the cache file (0 if it
        cannot be stat'ed, e.g. because it does not exist yet)."""
        filename = self._get_cachefile().get_filename()
        try:
            return os.stat(filename)[stat.ST_SIZE]
        except OSError:
            return 0


    def open(self, size):
        # Inside this method `open` is still the builtin: the method name is
        # a class attribute and does not shadow the global.
        if self.f is not None:
            # a previous (failed) download may have left its handle open
            self.f.close()
        self.f = open(self._get_cachefile().get_filename(), "wb")

    def write(self, data):
        self.f.write(data)
        # get_filesize() uses os.stat(), which only sees bytes that have
        # left Python's userspace buffer. Flush so waiting readers are
        # released as soon as their range has been written, not whenever
        # the buffer happens to fill or at close().
        self.f.flush()
        self._check_milestones()

    def close(self):
        self.f.close()
        self._check_milestones()

    def fail(self, why):
        # Release the write handle; waiting readers are errbacked through
        # the download Deferred's errback chain (_download_failed).
        # file.close() is idempotent, so a later close() is harmless.
        if self.f is not None:
            self.f.close()
    def register_canceller(self, cb):
        pass
    def finish(self):
        return None
    # The following methods are just because the target might be a
    # repairer.DownUpConnector, and just because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass
179.
180.
class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
    """Filesystem node for an immutable CHK file.

    Whole-file reads stream straight from the downloader; partial (ranged)
    reads go through a DownloadCache that spools the file to local disk.
    """
    def __init__(self, filecap, storage_broker, secret_holder,
                 downloader, history, cachedirectorymanager):
        assert isinstance(filecap, str)
        self.u = urimodule.CHKFileURI.init_from_string(filecap)
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._downloader = downloader
        self._history = history
        storage_index = self.get_storage_index()
        self.download_cache = DownloadCache(filecap, storage_index, downloader,
                                            cachedirectorymanager)
        prefix = self.u.get_verify_cap().to_string()
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
        self.log("starting", level=log.OPERATIONAL)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return self.u.get_size()

    def get_verify_cap(self):
        return self.u.get_verify_cap()

    def get_repair_cap(self):
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        return self.u.storage_index

    def _make_checker(self, monitor, verify, add_lease):
        """Return a Checker for this file covering every server known to
        the storage broker."""
        return Checker(verifycap=self.get_verify_cap(),
                       servers=self._storage_broker.get_all_servers(),
                       verify=verify, add_lease=add_lease,
                       secret_holder=self._secret_holder,
                       monitor=monitor)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check this file and repair it if the check says it is unhealthy.

        Returns a Deferred that fires with a CheckAndRepairResults. If the
        repair itself fails, the Deferred errbacks with that failure.
        """
        d = self._make_checker(monitor, verify, add_lease).start()
        d.addCallback(self._maybe_repair, monitor)
        return d

    def _maybe_repair(self, cr, monitor):
        """Given pre-repair CheckResults cr, return a Deferred firing with a
        CheckAndRepairResults, running a Repairer only if cr is unhealthy."""
        crr = CheckAndRepairResults(self.u.storage_index)
        crr.pre_repair_results = cr
        if cr.is_healthy():
            crr.post_repair_results = cr
            return defer.succeed(crr)

        crr.repair_attempted = True
        crr.repair_successful = False # until proven successful
        def _repair_error(f):
            # as with mutable repair, I'm not sure if I want to pass
            # through a failure or not. TODO
            crr.repair_successful = False
            crr.repair_failure = f
            return f
        r = Repairer(storage_broker=self._storage_broker,
                     secret_holder=self._secret_holder,
                     verifycap=self.get_verify_cap(), monitor=monitor)
        d = r.start()
        d.addCallbacks(self._gather_repair_results, _repair_error,
                       callbackArgs=(cr, crr))
        return d

    def _gather_repair_results(self, ur, cr, crr):
        """Combine the repair's UploadResults ur with the pre-repair
        CheckResults cr into post-repair results stored on crr; return crr."""
        assert IUploadResults.providedBy(ur), ur
        # clone the cr (check results) to form the basis of the prr
        # (post-repair results)
        prr = CheckResults(cr.uri, cr.storage_index)
        prr.data = copy.deepcopy(cr.data)

        sm = prr.data['sharemap']
        assert isinstance(sm, dictutil.DictOfSets), sm
        sm.update(ur.sharemap)
        # NOTE(review): assumes both sharemaps map shnum -> set(serverid)
        # (sm is asserted to be a DictOfSets). Server ids are the values,
        # not the keys.
        servers_responding = set(prr.data['servers-responding'])
        for serverids in ur.sharemap.values():
            servers_responding.update(serverids)
        prr.data['servers-responding'] = list(servers_responding)

        good_hosts = set()
        for serverids in sm.values():
            good_hosts.update(serverids)
        prr.data['count-shares-good'] = len(sm)
        prr.data['count-good-share-hosts'] = len(good_hosts)
        is_healthy = bool(len(sm) >= self.u.total_shares)
        is_recoverable = bool(len(sm) >= self.u.needed_shares)
        prr.set_healthy(is_healthy)
        prr.set_recoverable(is_recoverable)
        crr.repair_successful = is_healthy
        # NOTE(review): this mirrors is_healthy; confirm that is the
        # intended meaning of needs_rebalancing.
        prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)

        crr.post_repair_results = prr
        return crr

    def check(self, monitor, verify=False, add_lease=False):
        """Check this file's shares; returns the Deferred from Checker.start()."""
        return self._make_checker(monitor, verify, add_lease).start()

    def read(self, consumer, offset=0, size=None):
        """Deliver bytes [offset, offset+size) of the file to consumer
        (size None means "to EOF"; the range is clipped at EOF).

        Returns a Deferred that fires when the transfer is complete.
        """
        filesize = self.get_size()
        if size is None:
            size = filesize - offset
        size = min(size, filesize - offset)

        if size <= 0:
            # Nothing to send (offset at/after EOF, or a non-positive size).
            # Do an empty transfer with the same producer/consumer protocol
            # as a normal read instead of starting a download and handing a
            # negative length to the cache reader.
            d = basic.FileSender().beginFileTransfer(StringIO(""), consumer)
            d.addCallback(lambda lastSent: consumer)
            return d

        if offset == 0 and size == filesize:
            # don't use the cache, just do a normal streaming download
            self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
            return self.download(download.ConsumerAdapter(consumer))

        d = self.download_cache.when_range_available(offset, size)
        d.addCallback(lambda res:
                      self.download_cache.read(consumer, offset, size))
        return d

    def download(self, target):
        return self._downloader.download(self.get_uri(), target,
                                         self._parentmsgid,
                                         history=self._history)

    def download_to_data(self):
        return self._downloader.download_to_data(self.get_uri(),
                                                 history=self._history)
304.
class LiteralProducer:
    """IPushProducer registered while a LIT file's data is written to an
    IConsumer. The data goes out in a single write, so there is nothing to
    pause, resume or stop: every method is a no-op."""
    implements(IPushProducer)
    def pauseProducing(self):
        # Required by IPushProducer. A streaming consumer (e.g. a Twisted
        # transport whose buffer is full) may call this from inside write().
        pass
    def resumeProducing(self):
        pass
    def stopProducing(self):
        pass
311.
312.
class LiteralFileNode(_ImmutableFileNodeBase):
    """File node for a LIT file, whose whole contents live inside the
    filecap itself; reads and downloads are served from that embedded data."""

    def __init__(self, filecap):
        assert isinstance(filecap, str)
        self.u = urimodule.LiteralFileURI.init_from_string(filecap)

    def get_uri(self):
        return self.u.to_string()

    def get_size(self):
        return len(self.u.data)

    # LIT files have no verify cap, repair cap or storage index, and
    # checking (or check-and-repair) yields None.
    def get_verify_cap(self):
        return None

    def get_repair_cap(self):
        return None

    def get_storage_index(self):
        return None

    def check(self, monitor, verify=False, add_lease=False):
        return defer.succeed(None)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        return defer.succeed(None)

    def read(self, consumer, offset=0, size=None):
        """Send bytes [offset, offset+size) of the embedded data (everything
        from offset on when size is None) to consumer; the Deferred fires
        with consumer when done."""
        if size is None:
            stop = None
        else:
            stop = offset + size
        portion = self.u.data[offset:stop]

        # FileSender is a non-streaming (pull) producer: the consumer asks
        # for each chunk explicitly. The only Twisted consumers that cannot
        # cope with streaming=False are in the FTP/SFTP upload path
        # (protocols.ftp.FileConsumer and
        # vfs.adapters.ftp._FileToConsumerAdapter), which are unlikely to be
        # the target of a Tahoe download.
        sender = basic.FileSender()
        transfer = sender.beginFileTransfer(StringIO(portion), consumer)
        transfer.addCallback(lambda lastSent: consumer)
        return transfer

    def download(self, target):
        """Write the embedded data to target in one go, bracketing the write
        with a LiteralProducer registration when target is an IConsumer."""
        # note that this does not update the stats_provider
        contents = self.u.data
        wants_producer = IConsumer.providedBy(target)
        if wants_producer:
            target.registerProducer(LiteralProducer(), True)
        target.open(len(contents))
        target.write(contents)
        if wants_producer:
            target.unregisterProducer()
        target.close()
        return defer.maybeDeferred(target.finish)

    def download_to_data(self):
        return defer.succeed(self.u.data)