source file: /home/buildslave/tahoe/edgy/build/src/allmydata/immutable/filenode.py
file stats: 274 lines, 262 executed: 95.6% covered
coverage versus previous test: 0 lines added, 0 lines removed
    1. import copy, os.path, stat
    2. from cStringIO import StringIO
    3. from zope.interface import implements
    4. from twisted.internet import defer
    5. from twisted.internet.interfaces import IPushProducer, IConsumer
    6. from twisted.protocols import basic
    7. from foolscap.api import eventually
    8. from allmydata.interfaces import IFileNode, ICheckable, \
    9.      IDownloadTarget, IUploadResults
   10. from allmydata.util import dictutil, log, base32
   11. from allmydata import uri as urimodule
   12. from allmydata.immutable.checker import Checker
   13. from allmydata.check_results import CheckResults, CheckAndRepairResults
   14. from allmydata.immutable.repairer import Repairer
   15. from allmydata.immutable import download
   16. 
   17. class _ImmutableFileNodeBase(object):
   18.     implements(IFileNode, ICheckable)
   19. 
   20.     def get_readonly_uri(self):
   21.         return self.get_uri()
   22. 
   23.     def is_mutable(self):
   24.         return False
   25. 
   26.     def is_readonly(self):
   27.         return True
   28. 
   29.     def __hash__(self):
   30.         return self.u.__hash__()
   31.     def __eq__(self, other):
   32.         if IFileNode.providedBy(other):
   33.             return self.u.__eq__(other.u)
   34.         else:
   35.             return False
   36.     def __ne__(self, other):
   37.         if IFileNode.providedBy(other):
   38.             return self.u.__eq__(other.u)
   39.         else:
   40.             return True
   41. 
   42. class PortionOfFile:
   43.     # like a list slice (things[2:14]), but for a file on disk
   44.     def __init__(self, fn, offset=0, size=None):
   45.         self.f = open(fn, "rb")
   46.         self.f.seek(offset)
   47.         self.bytes_left = size
   48. 
   49.     def read(self, size=None):
   50.         # bytes_to_read = min(size, self.bytes_left), but None>anything
   51.         if size is None:
   52.             bytes_to_read = self.bytes_left
   53.         elif self.bytes_left is None:
   54.             bytes_to_read = size
   55.         else:
   56.             bytes_to_read = min(size, self.bytes_left)
   57.         data = self.f.read(bytes_to_read)
   58.         if self.bytes_left is not None:
   59.             self.bytes_left -= len(data)
   60.         return data
   61. 
   62. class DownloadCache:
   63.     implements(IDownloadTarget)
   64. 
   65.     def __init__(self, filecap, storage_index, downloader,
   66.                  cachedirectorymanager):
   67.         self._downloader = downloader
   68.         self._uri = filecap
   69.         self._storage_index = storage_index
   70.         self.milestones = set() # of (offset,size,Deferred)
   71.         self.cachedirectorymanager = cachedirectorymanager
   72.         self.cachefile = None
   73.         self.download_in_progress = False
   74.         # five states:
   75.         #  new FileNode, no downloads ever performed
   76.         #  new FileNode, leftover file (partial)
   77.         #  new FileNode, leftover file (whole)
   78.         #  download in progress, not yet complete
   79.         #  download complete
   80. 
   81.     def when_range_available(self, offset, size):
   82.         assert isinstance(offset, (int,long))
   83.         assert isinstance(size, (int,long))
   84. 
   85.         d = defer.Deferred()
   86.         self.milestones.add( (offset,size,d) )
   87.         self._check_milestones()
   88.         if self.milestones and not self.download_in_progress:
   89.             self.download_in_progress = True
   90.             log.msg(format=("immutable filenode read [%(si)s]: " +
   91.                             "starting download"),
   92.                     si=base32.b2a(self._storage_index),
   93.                     umid="h26Heg", level=log.OPERATIONAL)
   94.             d2 = self._downloader.download(self._uri, self)
   95.             d2.addBoth(self._download_done)
   96.             d2.addErrback(self._download_failed)
   97.             d2.addErrback(log.err, umid="cQaM9g")
   98.         return d
   99. 
  100.     def read(self, consumer, offset, size):
  101.         assert offset+size <= self.get_filesize()
  102.         if not self.cachefile:
  103.             self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
  104.         f = PortionOfFile(self.cachefile.get_filename(), offset, size)
  105.         d = basic.FileSender().beginFileTransfer(f, consumer)
  106.         d.addCallback(lambda lastSent: consumer)
  107.         return d
  108. 
  109.     def _download_done(self, res):
  110.         # clear download_in_progress, so failed downloads can be re-tried
  111.         self.download_in_progress = False
  112.         return res
  113. 
  114.     def _download_failed(self, f):
  115.         # tell anyone who's waiting that we failed
  116.         for m in self.milestones:
  117.             (offset,size,d) = m
  118.             eventually(d.errback, f)
  119.         self.milestones.clear()
  120. 
  121.     def _check_milestones(self):
  122.         current_size = self.get_filesize()
  123.         for m in list(self.milestones):
  124.             (offset,size,d) = m
  125.             if offset+size <= current_size:
  126.                 log.msg(format=("immutable filenode read [%(si)s] " +
  127.                                 "%(offset)d+%(size)d vs %(filesize)d: " +
  128.                                 "done"),
  129.                         si=base32.b2a(self._storage_index),
  130.                         offset=offset, size=size, filesize=current_size,
  131.                         umid="nuedUg", level=log.NOISY)
  132.                 self.milestones.discard(m)
  133.                 eventually(d.callback, None)
  134.             else:
  135.                 log.msg(format=("immutable filenode read [%(si)s] " +
  136.                                 "%(offset)d+%(size)d vs %(filesize)d: " +
  137.                                 "still waiting"),
  138.                         si=base32.b2a(self._storage_index),
  139.                         offset=offset, size=size, filesize=current_size,
  140.                         umid="8PKOhg", level=log.NOISY)
  141. 
  142.     def get_filesize(self):
  143.         if not self.cachefile:
  144.             self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
  145.         try:
  146.             filesize = os.stat(self.cachefile.get_filename())[stat.ST_SIZE]
  147.         except OSError:
  148.             filesize = 0
  149.         return filesize
  150. 
  151. 
  152.     def open(self, size):
  153.         if not self.cachefile:
  154.             self.cachefile = self.cachedirectorymanager.get_file(base32.b2a(self._storage_index))
  155.         self.f = open(self.cachefile.get_filename(), "wb")
  156. 
  157.     def write(self, data):
  158.         self.f.write(data)
  159.         self._check_milestones()
  160. 
  161.     def close(self):
  162.         self.f.close()
  163.         self._check_milestones()
  164. 
  165.     def fail(self, why):
  166.         pass
  167.     def register_canceller(self, cb):
  168.         pass
  169.     def finish(self):
  170.         return None
  171.     # The following methods are just because the target might be a
  172.     # repairer.DownUpConnector, and just because the current CHKUpload object
  173.     # expects to find the storage index and encoding parameters in its
  174.     # Uploadable.
  175.     def set_storageindex(self, storageindex):
  176.         pass
  177.     def set_encodingparams(self, encodingparams):
  178.         pass
  179. 
  180. 
  181. class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
  182.     def __init__(self, filecap, storage_broker, secret_holder,
  183.                  downloader, history, cachedirectorymanager):
  184.         assert isinstance(filecap, str)
  185.         self.u = urimodule.CHKFileURI.init_from_string(filecap)
  186.         self._storage_broker = storage_broker
  187.         self._secret_holder = secret_holder
  188.         self._downloader = downloader
  189.         self._history = history
  190.         storage_index = self.get_storage_index()
  191.         self.download_cache = DownloadCache(filecap, storage_index, downloader,
  192.                                             cachedirectorymanager)
  193.         prefix = self.u.get_verify_cap().to_string()
  194.         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
  195.         self.log("starting", level=log.OPERATIONAL)
  196. 
  197.     def get_uri(self):
  198.         return self.u.to_string()
  199. 
  200.     def get_size(self):
  201.         return self.u.get_size()
  202. 
  203.     def get_verify_cap(self):
  204.         return self.u.get_verify_cap()
  205. 
  206.     def get_repair_cap(self):
  207.         # CHK files can be repaired with just the verifycap
  208.         return self.u.get_verify_cap()
  209. 
  210.     def get_storage_index(self):
  211.         return self.u.storage_index
  212. 
  213.     def check_and_repair(self, monitor, verify=False, add_lease=False):
  214.         verifycap = self.get_verify_cap()
  215.         sb = self._storage_broker
  216.         servers = sb.get_all_servers()
  217.         sh = self._secret_holder
  218. 
  219.         c = Checker(verifycap=verifycap, servers=servers,
  220.                     verify=verify, add_lease=add_lease, secret_holder=sh,
  221.                     monitor=monitor)
  222.         d = c.start()
  223.         def _maybe_repair(cr):
  224.             crr = CheckAndRepairResults(self.u.storage_index)
  225.             crr.pre_repair_results = cr
  226.             if cr.is_healthy():
  227.                 crr.post_repair_results = cr
  228.                 return defer.succeed(crr)
  229.             else:
  230.                 crr.repair_attempted = True
  231.                 crr.repair_successful = False # until proven successful
  232.                 def _gather_repair_results(ur):
  233.                     assert IUploadResults.providedBy(ur), ur
  234.                     # clone the cr -- check results to form the basic of the prr -- post-repair results
  235.                     prr = CheckResults(cr.uri, cr.storage_index)
  236.                     prr.data = copy.deepcopy(cr.data)
  237. 
  238.                     sm = prr.data['sharemap']
  239.                     assert isinstance(sm, dictutil.DictOfSets), sm
  240.                     sm.update(ur.sharemap)
  241.                     servers_responding = set(prr.data['servers-responding'])
  242.                     servers_responding.union(ur.sharemap.iterkeys())
  243.                     prr.data['servers-responding'] = list(servers_responding)
  244.                     prr.data['count-shares-good'] = len(sm)
  245.                     prr.data['count-good-share-hosts'] = len(sm)
  246.                     is_healthy = bool(len(sm) >= self.u.total_shares)
  247.                     is_recoverable = bool(len(sm) >= self.u.needed_shares)
  248.                     prr.set_healthy(is_healthy)
  249.                     prr.set_recoverable(is_recoverable)
  250.                     crr.repair_successful = is_healthy
  251.                     prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)
  252. 
  253.                     crr.post_repair_results = prr
  254.                     return crr
  255.                 def _repair_error(f):
  256.                     # as with mutable repair, I'm not sure if I want to pass
  257.                     # through a failure or not. TODO
  258.                     crr.repair_successful = False
  259.                     crr.repair_failure = f
  260.                     return f
  261.                 r = Repairer(storage_broker=sb, secret_holder=sh,
  262.                              verifycap=verifycap, monitor=monitor)
  263.                 d = r.start()
  264.                 d.addCallbacks(_gather_repair_results, _repair_error)
  265.                 return d
  266. 
  267.         d.addCallback(_maybe_repair)
  268.         return d
  269. 
  270.     def check(self, monitor, verify=False, add_lease=False):
  271.         verifycap = self.get_verify_cap()
  272.         sb = self._storage_broker
  273.         servers = sb.get_all_servers()
  274.         sh = self._secret_holder
  275. 
  276.         v = Checker(verifycap=verifycap, servers=servers,
  277.                     verify=verify, add_lease=add_lease, secret_holder=sh,
  278.                     monitor=monitor)
  279.         return v.start()
  280. 
  281.     def read(self, consumer, offset=0, size=None):
  282.         if size is None:
  283.             size = self.get_size() - offset
  284.         size = min(size, self.get_size() - offset)
  285. 
  286.         if offset == 0 and size == self.get_size():
  287.             # don't use the cache, just do a normal streaming download
  288.             self.log("doing normal full download", umid="VRSBwg", level=log.OPERATIONAL)
  289.             return self.download(download.ConsumerAdapter(consumer))
  290. 
  291.         d = self.download_cache.when_range_available(offset, size)
  292.         d.addCallback(lambda res:
  293.                       self.download_cache.read(consumer, offset, size))
  294.         return d
  295. 
  296.     def download(self, target):
  297.         return self._downloader.download(self.get_uri(), target,
  298.                                          self._parentmsgid,
  299.                                          history=self._history)
  300. 
  301.     def download_to_data(self):
  302.         return self._downloader.download_to_data(self.get_uri(),
  303.                                                  history=self._history)
  304. 
  305. class LiteralProducer:
  306.     implements(IPushProducer)
  307.     def resumeProducing(self):
  308.         pass
  309.     def stopProducing(self):
  310.         pass
  311. 
  312. 
  313. class LiteralFileNode(_ImmutableFileNodeBase):
  314. 
  315.     def __init__(self, filecap):
  316.         assert isinstance(filecap, str)
  317.         self.u = urimodule.LiteralFileURI.init_from_string(filecap)
  318. 
  319.     def get_uri(self):
  320.         return self.u.to_string()
  321. 
  322.     def get_size(self):
  323.         return len(self.u.data)
  324. 
  325.     def get_verify_cap(self):
  326.         return None
  327. 
  328.     def get_repair_cap(self):
  329.         return None
  330. 
  331.     def get_storage_index(self):
  332.         return None
  333. 
  334.     def check(self, monitor, verify=False, add_lease=False):
  335.         return defer.succeed(None)
  336. 
  337.     def check_and_repair(self, monitor, verify=False, add_lease=False):
  338.         return defer.succeed(None)
  339. 
  340.     def read(self, consumer, offset=0, size=None):
  341.         if size is None:
  342.             data = self.u.data[offset:]
  343.         else:
  344.             data = self.u.data[offset:offset+size]
  345. 
  346.         # We use twisted.protocols.basic.FileSender, which only does
  347.         # non-streaming, i.e. PullProducer, where the receiver/consumer must
  348.         # ask explicitly for each chunk of data. There are only two places in
  349.         # the Twisted codebase that can't handle streaming=False, both of
  350.         # which are in the upload path for an FTP/SFTP server
  351.         # (protocols.ftp.FileConsumer and
  352.         # vfs.adapters.ftp._FileToConsumerAdapter), neither of which is
  353.         # likely to be used as the target for a Tahoe download.
  354. 
  355.         d = basic.FileSender().beginFileTransfer(StringIO(data), consumer)
  356.         d.addCallback(lambda lastSent: consumer)
  357.         return d
  358. 
  359.     def download(self, target):
  360.         # note that this does not update the stats_provider
  361.         data = self.u.data
  362.         if IConsumer.providedBy(target):
  363.             target.registerProducer(LiteralProducer(), True)
  364.         target.open(len(data))
  365.         target.write(data)
  366.         if IConsumer.providedBy(target):
  367.             target.unregisterProducer()
  368.         target.close()
  369.         return defer.maybeDeferred(target.finish)
  370. 
  371.     def download_to_data(self):
  372.         data = self.u.data
  373.         return defer.succeed(data)