source file: /home/buildslave/tahoe/edgy/build/src/allmydata/storage/immutable.py
file stats: 200 lines, 199 executed: 99.5% covered
coverage versus previous test: 0 lines added, 0 lines removed
    1. import os, stat, struct, time
    2. 
    3. from foolscap.api import Referenceable
    4. 
    5. from zope.interface import implements
    6. from allmydata.interfaces import RIBucketWriter, RIBucketReader
    7. from allmydata.util import base32, fileutil, log
    8. from allmydata.util.assertutil import precondition
    9. from allmydata.util.hashutil import constant_time_compare
   10. from allmydata.storage.lease import LeaseInfo
   11. from allmydata.storage.common import UnknownImmutableContainerVersionError, \
   12.      DataTooLargeError
   13. 
   14. # each share file (in storage/shares/$SI/$SHNUM) contains lease information
   15. # and share data. The share data is accessed by RIBucketWriter.write and
   16. # RIBucketReader.read . The lease information is not accessible through these
   17. # interfaces.
   18. 
   19. # The share file has the following layout:
   20. #  0x00: share file version number, four bytes, current version is 1
   21. #  0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
   22. #  0x08: number of leases, four bytes big-endian
   23. #  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
   24. #  A+0x0c = B: first lease. Lease format is:
   25. #   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
   26. #   B+0x04: renew secret, 32 bytes (SHA256)
   27. #   B+0x24: cancel secret, 32 bytes (SHA256)
   28. #   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
   29. #   B+0x48: next lease, or end of record
   30. 
   31. # Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers,
   32. # but it is still filled in by storage servers in case the storage server
   33. # software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the
   34. # share file is moved from one storage server to another. The value stored in
   35. # this field saturates: if the maximum share data length is >= 2**32, then
   36. # the value stored in this field will be 2**32-1, the largest value that
   37. # fits into the four-byte field.
   38. 
   39. class ShareFile:
   40.     LEASE_SIZE = struct.calcsize(">L32s32sL")
   41.     sharetype = "immutable"
   42. 
   43.     def __init__(self, filename, max_size=None, create=False):
   44.         """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
   45.         precondition((max_size is not None) or (not create), max_size, create)
   46.         self.home = filename
   47.         self._max_size = max_size
   48.         if create:
   49.             # touch the file, so later callers will see that we're working on
   50.             # it. Also construct the metadata.
   51.             assert not os.path.exists(self.home)
   52.             fileutil.make_dirs(os.path.dirname(self.home))
   53.             f = open(self.home, 'wb')
   54.             # The second field -- the four-byte share data length -- is no
   55.             # longer used as of Tahoe v1.3.0, but we continue to write it in
   56.             # there in case someone downgrades a storage server from >=
   57.             # Tahoe-1.3.0 to < Tahoe-1.3.0, or moves a share file from one
   58.             # server to another, etc. We do saturation -- a share data length
   59.             # larger than 2**32-1 (what can fit into the field) is marked as
   60.             # the largest length that can fit into the field. That way, even
   61.             # if this does happen, the old < v1.3.0 server will still allow
   62.             # clients to read the first part of the share.
   63.             f.write(struct.pack(">LLL", 1, min(2**32-1, max_size), 0))
   64.             f.close()
   65.             self._lease_offset = max_size + 0x0c
   66.             self._num_leases = 0
   67.         else:
   68.             f = open(self.home, 'rb')
   69.             filesize = os.path.getsize(self.home)
   70.             (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
   71.             f.close()
   72.             if version != 1:
   73.                 msg = "sharefile %s had version %d but we wanted 1" % \
   74.                       (filename, version)
   75.                 raise UnknownImmutableContainerVersionError(msg)
   76.             self._num_leases = num_leases
   77.             self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
   78.         self._data_offset = 0xc
   79. 
   80.     def unlink(self):
   81.         os.unlink(self.home)
   82. 
   83.     def read_share_data(self, offset, length):
   84.         precondition(offset >= 0)
   85.         # reads beyond the end of the data are truncated. Reads that start
   86.         # beyond the end of the data return an empty string. I wonder why
   87.         # Python doesn't do the following computation for me?
   88.         seekpos = self._data_offset+offset
   89.         fsize = os.path.getsize(self.home)
   90.         actuallength = max(0, min(length, fsize-seekpos))
   91.         if actuallength == 0:
   92.             return ""
   93.         f = open(self.home, 'rb')
   94.         f.seek(seekpos)
   95.         return f.read(actuallength)
   96. 
   97.     def write_share_data(self, offset, data):
   98.         length = len(data)
   99.         precondition(offset >= 0, offset)
  100.         if self._max_size is not None and offset+length > self._max_size:
  101.             raise DataTooLargeError(self._max_size, offset, length)
  102.         f = open(self.home, 'rb+')
  103.         real_offset = self._data_offset+offset
  104.         f.seek(real_offset)
  105.         assert f.tell() == real_offset
  106.         f.write(data)
  107.         f.close()
  108. 
  109.     def _write_lease_record(self, f, lease_number, lease_info):
  110.         offset = self._lease_offset + lease_number * self.LEASE_SIZE
  111.         f.seek(offset)
  112.         assert f.tell() == offset
  113.         f.write(lease_info.to_immutable_data())
  114. 
  115.     def _read_num_leases(self, f):
  116.         f.seek(0x08)
  117.         (num_leases,) = struct.unpack(">L", f.read(4))
  118.         return num_leases
  119. 
  120.     def _write_num_leases(self, f, num_leases):
  121.         f.seek(0x08)
  122.         f.write(struct.pack(">L", num_leases))
  123. 
  124.     def _truncate_leases(self, f, num_leases):
  125.         f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
  126. 
  127.     def get_leases(self):
  128.         """Yields a LeaseInfo instance for all leases."""
  129.         f = open(self.home, 'rb')
  130.         (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
  131.         f.seek(self._lease_offset)
  132.         for i in range(num_leases):
  133.             data = f.read(self.LEASE_SIZE)
  134.             if data:
  135.                 yield LeaseInfo().from_immutable_data(data)
  136. 
  137.     def add_lease(self, lease_info):
  138.         f = open(self.home, 'rb+')
  139.         num_leases = self._read_num_leases(f)
  140.         self._write_lease_record(f, num_leases, lease_info)
  141.         self._write_num_leases(f, num_leases+1)
  142.         f.close()
  143. 
  144.     def renew_lease(self, renew_secret, new_expire_time):
  145.         for i,lease in enumerate(self.get_leases()):
  146.             if constant_time_compare(lease.renew_secret, renew_secret):
  147.                 # yup. See if we need to update the owner time.
  148.                 if new_expire_time > lease.expiration_time:
  149.                     # yes
  150.                     lease.expiration_time = new_expire_time
  151.                     f = open(self.home, 'rb+')
  152.                     self._write_lease_record(f, i, lease)
  153.                     f.close()
  154.                 return
  155.         raise IndexError("unable to renew non-existent lease")
  156. 
  157.     def add_or_renew_lease(self, lease_info):
  158.         try:
  159.             self.renew_lease(lease_info.renew_secret,
  160.                              lease_info.expiration_time)
  161.         except IndexError:
  162.             self.add_lease(lease_info)
  163. 
  164. 
  165.     def cancel_lease(self, cancel_secret):
  166.         """Remove a lease with the given cancel_secret. If the last lease is
  167.         cancelled, the file will be removed. Return the number of bytes that
  168.         were freed (by truncating the list of leases, and possibly by
  169.         deleting the file. Raise IndexError if there was no lease with the
  170.         given cancel_secret.
  171.         """
  172. 
  173.         leases = list(self.get_leases())
  174.         num_leases_removed = 0
  175.         for i,lease in enumerate(leases):
  176.             if constant_time_compare(lease.cancel_secret, cancel_secret):
  177.                 leases[i] = None
  178.                 num_leases_removed += 1
  179.         if not num_leases_removed:
  180.             raise IndexError("unable to find matching lease to cancel")
  181.         if num_leases_removed:
  182.             # pack and write out the remaining leases. We write these out in
  183.             # the same order as they were added, so that if we crash while
  184.             # doing this, we won't lose any non-cancelled leases.
  185.             leases = [l for l in leases if l] # remove the cancelled leases
  186.             f = open(self.home, 'rb+')
  187.             for i,lease in enumerate(leases):
  188.                 self._write_lease_record(f, i, lease)
  189.             self._write_num_leases(f, len(leases))
  190.             self._truncate_leases(f, len(leases))
  191.             f.close()
  192.         space_freed = self.LEASE_SIZE * num_leases_removed
  193.         if not len(leases):
  194.             space_freed += os.stat(self.home)[stat.ST_SIZE]
  195.             self.unlink()
  196.         return space_freed
  197. 
  198. 
  199. class BucketWriter(Referenceable):
  200.     implements(RIBucketWriter)
  201. 
  202.     def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
  203.         self.ss = ss
  204.         self.incominghome = incominghome
  205.         self.finalhome = finalhome
  206.         self._max_size = max_size # don't allow the client to write more than this
  207.         self._canary = canary
  208.         self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
  209.         self.closed = False
  210.         self.throw_out_all_data = False
  211.         self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
  212.         # also, add our lease to the file now, so that other ones can be
  213.         # added by simultaneous uploaders
  214.         self._sharefile.add_lease(lease_info)
  215. 
  216.     def allocated_size(self):
  217.         return self._max_size
  218. 
  219.     def remote_write(self, offset, data):
  220.         start = time.time()
  221.         precondition(not self.closed)
  222.         if self.throw_out_all_data:
  223.             return
  224.         self._sharefile.write_share_data(offset, data)
  225.         self.ss.add_latency("write", time.time() - start)
  226.         self.ss.count("write")
  227. 
  228.     def remote_close(self):
  229.         precondition(not self.closed)
  230.         start = time.time()
  231. 
  232.         fileutil.make_dirs(os.path.dirname(self.finalhome))
  233.         fileutil.rename(self.incominghome, self.finalhome)
  234.         try:
  235.             # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
  236.             # We try to delete the parent (.../ab/abcde) to avoid leaving
  237.             # these directories lying around forever, but the delete might
  238.             # fail if we're working on another share for the same storage
  239.             # index (like ab/abcde/5). The alternative approach would be to
  240.             # use a hierarchy of objects (PrefixHolder, BucketHolder,
  241.             # ShareWriter), each of which is responsible for a single
  242.             # directory on disk, and have them use reference counting of
  243.             # their children to know when they should do the rmdir. This
  244.             # approach is simpler, but relies on os.rmdir refusing to delete
  245.             # a non-empty directory. Do *not* use fileutil.rm_dir() here!
  246.             os.rmdir(os.path.dirname(self.incominghome))
  247.             # we also delete the grandparent (prefix) directory, .../ab ,
  248.             # again to avoid leaving directories lying around. This might
  249.             # fail if there is another bucket open that shares a prefix (like
  250.             # ab/abfff).
  251.             os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
  252.             # we leave the great-grandparent (incoming/) directory in place.
  253.         except EnvironmentError:
  254.             # ignore the "can't rmdir because the directory is not empty"
  255.             # exceptions, those are normal consequences of the
  256.             # above-mentioned conditions.
  257.             pass
  258.         self._sharefile = None
  259.         self.closed = True
  260.         self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
  261. 
  262.         filelen = os.stat(self.finalhome)[stat.ST_SIZE]
  263.         self.ss.bucket_writer_closed(self, filelen)
  264.         self.ss.add_latency("close", time.time() - start)
  265.         self.ss.count("close")
  266. 
  267.     def _disconnected(self):
  268.         if not self.closed:
  269.             self._abort()
  270. 
  271.     def remote_abort(self):
  272.         log.msg("storage: aborting sharefile %s" % self.incominghome,
  273.                 facility="tahoe.storage", level=log.UNUSUAL)
  274.         if not self.closed:
  275.             self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
  276.         self._abort()
  277.         self.ss.count("abort")
  278. 
  279.     def _abort(self):
  280.         if self.closed:
  281.             return
  282.         os.remove(self.incominghome)
  283.         # if we were the last share to be moved, remove the incoming/
  284.         # directory that was our parent
  285.         parentdir = os.path.split(self.incominghome)[0]
  286.         if not os.listdir(parentdir):
  287.             os.rmdir(parentdir)
  288. 
  289. 
  290. 
  291. class BucketReader(Referenceable):
  292.     implements(RIBucketReader)
  293. 
  294.     def __init__(self, ss, sharefname, storage_index=None, shnum=None):
  295.         self.ss = ss
  296.         self._share_file = ShareFile(sharefname)
  297.         self.storage_index = storage_index
  298.         self.shnum = shnum
  299. 
  300.     def __repr__(self):
  301.         return "<%s %s %s>" % (self.__class__.__name__,
  302.                                base32.b2a_l(self.storage_index[:8], 60),
  303.                                self.shnum)
  304. 
  305.     def remote_read(self, offset, length):
  306.         start = time.time()
  307.         data = self._share_file.read_share_data(offset, length)
  308.         self.ss.add_latency("read", time.time() - start)
  309.         self.ss.count("read")
  310.         return data
  311. 
  312.     def remote_advise_corrupt_share(self, reason):
  313.         return self.ss.remote_advise_corrupt_share("immutable",
  314.                                                    self.storage_index,
  315.                                                    self.shnum,
  316.                                                    reason)