source: trunk/src/allmydata/storage/immutable.py

Last change on this file was 4da491a, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-03-11T20:37:27Z

remove more usage of "future"

1"""
2Ported to Python 3.
3"""
4
5import os, stat, struct, time
6
7from collections_extended import RangeMap
8
9from foolscap.api import Referenceable
10
11from zope.interface import implementer
12from allmydata.interfaces import (
13    RIBucketWriter, RIBucketReader, ConflictingWriteError,
14    DataTooLargeError,
15    NoSpace,
16)
17from allmydata.util import base32, fileutil, log
18from allmydata.util.assertutil import precondition
19from allmydata.storage.common import UnknownImmutableContainerVersionError
20
21from .immutable_schema import (
22    NEWEST_SCHEMA_VERSION,
23    schema_from_version,
24)
25
26
27# each share file (in storage/shares/$SI/$SHNUM) contains lease information
28# and share data. The share data is accessed by RIBucketWriter.write and
29# RIBucketReader.read . The lease information is not accessible through these
30# interfaces.
31
32# The share file has the following layout:
33#  0x00: share file version number, four bytes, current version is 2
34#  0x04: share data length, four bytes big-endian = A # See Footnote 1 below.
35#  0x08: number of leases, four bytes big-endian
36#  0x0c: beginning of share data (see immutable.layout.WriteBucketProxy)
37#  A+0x0c = B: first lease. Lease format is:
38#   B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner
39#   B+0x04: renew secret, 32 bytes (SHA256 + blake2b) # See Footnote 2 below.
40#   B+0x24: cancel secret, 32 bytes (SHA256 + blake2b)
41#   B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch
42#   B+0x48: next lease, or end of record
43
44# Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers,
45# but it is still filled in by storage servers in case the storage server
46# software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the
47# share file is moved from one storage server to another. The value stored in
48# this field is truncated, so if the actual share data length is >= 2**32,
49# then the value stored in this field will be the actual share data length
50# modulo 2**32.
51
52# Footnote 2: The change between share file version number 1 and 2 is that
53# storage of lease secrets is changed from plaintext to hashed.  This change
54# protects the secrets from compromises of local storage on the server: if a
55# plaintext cancel secret is somehow exfiltrated from the storage server, an
56# attacker could use it to cancel that lease and potentially cause user data
57# to be discarded before intended by the real owner.  As of this comment,
58# lease cancellation is disabled because there have been at least two bugs
59# which leak the persisted value of the cancellation secret.  If lease secrets
60# were stored hashed instead of plaintext then neither of these bugs would
61# have allowed an attacker to learn a usable cancel secret.
62#
63# Clients are free to construct these secrets however they like.  The
64# Tahoe-LAFS client uses a SHA256-based construction.  The server then uses
65# blake2b to hash these values for storage so that it retains no persistent
66# copy of the original secret.
67#
68
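# A rough parsing sketch (not part of the original file; ``share_path`` is a
# hypothetical filename): the fixed-size header described above can be read
# back with the same ">LLL" format that ``ShareFile.__init__`` uses below.
#
#     with open(share_path, 'rb') as f:
#         version, data_length, num_leases = struct.unpack(">LLL", f.read(0xc))
#     # version is 1 or 2; data_length is the share size modulo 2**32 (see
#     # Footnote 1); num_leases counts the lease records after the share data.
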
def _fix_lease_count_format(lease_count_format):
    """
    Turn a single character struct format string into a format string suitable
    for use in encoding and decoding the lease count value inside a share
    file, if possible.

    :param str lease_count_format: A single character format string like
        ``"B"`` or ``"L"``.

    :raise ValueError: If the given format string is not suitable for use in
        encoding and decoding a lease count.

    :return str: A complete format string which can safely be used to encode
        and decode lease counts in a share file.
    """
    if len(lease_count_format) != 1:
        raise ValueError(
            "Cannot construct ShareFile with lease_count_format={!r}; "
            "format must accept a single value".format(
                lease_count_format,
            ),
        )
    # Make it big-endian with standard size so all platforms agree on the
    # result.
    fixed = ">" + lease_count_format
    if struct.calcsize(fixed) > 4:
        # There is only room for at most 4 bytes in the share file format so
        # we can't allow any larger formats.
        raise ValueError(
            "Cannot construct ShareFile with lease_count_format={!r}; "
            "size must be smaller than size of '>L'".format(
                lease_count_format,
            ),
        )
    return fixed

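# Illustrative results (a sketch, not in the original file):
#
#     _fix_lease_count_format("B")   # -> ">B": a 1-byte count, handy in tests
#     _fix_lease_count_format("L")   # -> ">L": the default 4-byte count
#     _fix_lease_count_format("Q")   # raises ValueError, calcsize(">Q") is 8
#     _fix_lease_count_format(">L")  # raises ValueError, not a single character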

class ShareFile(object):
    """
    Support interaction with persistent storage of a share.

    :ivar str _lease_count_format: The format string which is used to encode
        and decode the lease count inside the share file.  As stated in the
        comment in this module there is room for at most 4 bytes in this part
        of the file.  A format string that works on fewer bytes is allowed to
        restrict the number of leases allowed in the share file to a smaller
        number than could be supported by using the full 4 bytes.  This is
        mostly of interest for testing.
    """
    LEASE_SIZE = struct.calcsize(">L32s32sL")
    sharetype = "immutable"

    @classmethod
    def is_valid_header(cls, header):
        # type: (bytes) -> bool
        """
        Determine if the given bytes constitute a valid header for this type of
        container.

        :param header: Some bytes from the beginning of a container.

        :return: ``True`` if the bytes could belong to this container,
            ``False`` otherwise.
        """
        (version,) = struct.unpack(">L", header[:4])
        return schema_from_version(version) is not None

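    # For example (a sketch; ``path`` is a hypothetical filename), a quick
    # sanity check of an on-disk container only needs its first four bytes:
    #
    #     with open(path, 'rb') as f:
    #         looks_like_share = ShareFile.is_valid_header(f.read(4))
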
    def __init__(
            self,
            filename,
            max_size=None,
            create=False,
            lease_count_format="L",
            schema=NEWEST_SCHEMA_VERSION,
    ):
        """
        Initialize a ``ShareFile``.

        :param Optional[int] max_size: If given, the maximum number of bytes
           that this ``ShareFile`` will accept to be stored.

        :param bool create: If ``True``, create the file (and fail if it
            exists already).  ``max_size`` must not be ``None`` in this case.
            If ``False``, open an existing file for reading.

        :param str lease_count_format: A format character to use to encode and
            decode the number of leases in the share file.  There are only 4
            bytes available in the file so the format must be 4 bytes or
            smaller.  If different formats are used at different times with
            the same share file, the result will likely be nonsense.

            This parameter is intended for the test suite to use to be able to
            exercise values near the maximum encodeable value without having
            to create billions of leases.

        :raise ValueError: If the encoding of ``lease_count_format`` is too
            large or if it is not a single format character.
        """

        precondition((max_size is not None) or (not create), max_size, create)

        self._lease_count_format = _fix_lease_count_format(lease_count_format)
        self._lease_count_size = struct.calcsize(self._lease_count_format)
        self.home = filename
        self._max_size = max_size
        if create:
            # touch the file, so later callers will see that we're working on
            # it. Also construct the metadata.
            assert not os.path.exists(self.home)
            fileutil.make_dirs(os.path.dirname(self.home))
            self._schema = schema
            with open(self.home, 'wb') as f:
                f.write(self._schema.header(max_size))
            self._lease_offset = max_size + 0x0c
            self._num_leases = 0
        else:
            with open(self.home, 'rb') as f:
                filesize = os.path.getsize(self.home)
                (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
            self._schema = schema_from_version(version)
            if self._schema is None:
                raise UnknownImmutableContainerVersionError(filename, version)
            self._num_leases = num_leases
            self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
            self._length = filesize - 0xc - (num_leases * self.LEASE_SIZE)

        self._data_offset = 0xc

    def get_length(self):
        """
        Return the length of the data in the share, if we're reading.
        """
        return self._length

    def unlink(self):
        os.unlink(self.home)

    def read_share_data(self, offset, length):
        precondition(offset >= 0)
        # reads beyond the end of the data are truncated. Reads that start
        # beyond the end of the data return an empty string.
        seekpos = self._data_offset+offset
        actuallength = max(0, min(length, self._lease_offset-seekpos))
        if actuallength == 0:
            return b""
        with open(self.home, 'rb') as f:
            f.seek(seekpos)
            return f.read(actuallength)

    def write_share_data(self, offset, data):
        length = len(data)
        precondition(offset >= 0, offset)
        if self._max_size is not None and offset+length > self._max_size:
            raise DataTooLargeError(self._max_size, offset, length)
        with open(self.home, 'rb+') as f:
            real_offset = self._data_offset+offset
            f.seek(real_offset)
            assert f.tell() == real_offset
            f.write(data)

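    # A usage sketch (not part of the original file; ``path`` is a
    # hypothetical filename under the server's storage directory):
    #
    #     sf = ShareFile(path, max_size=1000, create=True)  # writes the header
    #     sf.write_share_data(0, b"some share bytes")
    #     assert sf.read_share_data(0, 16) == b"some share bytes"
    #     sf = ShareFile(path)   # reopening parses the header back from disk,
    #                            # so get_length() reflects what is stored
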
    def _write_lease_record(self, f, lease_number, lease_info):
        offset = self._lease_offset + lease_number * self.LEASE_SIZE
        f.seek(offset)
        assert f.tell() == offset
        f.write(self._schema.lease_serializer.serialize(lease_info))

    def _read_num_leases(self, f):
        f.seek(0x08)
        (num_leases,) = struct.unpack(
            self._lease_count_format,
            f.read(self._lease_count_size),
        )
        return num_leases

    def _write_num_leases(self, f, num_leases):
        self._write_encoded_num_leases(
            f,
            struct.pack(self._lease_count_format, num_leases),
        )

    def _write_encoded_num_leases(self, f, encoded_num_leases):
        f.seek(0x08)
        f.write(encoded_num_leases)

    def _truncate_leases(self, f, num_leases):
        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)

    def get_leases(self):
        """Yields a LeaseInfo instance for all leases."""
        with open(self.home, 'rb') as f:
            (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
            f.seek(self._lease_offset)
            for i in range(num_leases):
                data = f.read(self.LEASE_SIZE)
                if data:
                    yield self._schema.lease_serializer.unserialize(data)

    def add_lease(self, lease_info):
        with open(self.home, 'rb+') as f:
            num_leases = self._read_num_leases(f)
            # Before we write the new lease record, make sure we can encode
            # the new lease count.
            new_lease_count = struct.pack(self._lease_count_format, num_leases + 1)
            self._write_lease_record(f, num_leases, lease_info)
            self._write_encoded_num_leases(f, new_lease_count)

    def renew_lease(self, renew_secret, new_expire_time, allow_backdate=False):
        # type: (bytes, int, bool) -> None
        """
        Update the expiration time on an existing lease.

        :param allow_backdate: If ``True`` then allow the new expiration time
            to be before the current expiration time.  Otherwise, make no
            change when this is the case.

        :raise IndexError: If there is no lease matching the given renew
            secret.
        """
        for i,lease in enumerate(self.get_leases()):
            if lease.is_renew_secret(renew_secret):
                # yup. See if we need to update the owner time.
                if allow_backdate or new_expire_time > lease.get_expiration_time():
                    # yes
                    lease = lease.renew(new_expire_time)
                    with open(self.home, 'rb+') as f:
                        self._write_lease_record(f, i, lease)
                return
        raise IndexError("unable to renew non-existent lease")

    def add_or_renew_lease(self, available_space, lease_info):
        """
        Renew an existing lease if possible, otherwise allocate a new one.

        :param int available_space: The maximum number of bytes of storage to
            commit in this operation.  If more than this number of bytes is
            required, raise ``NoSpace`` instead.

        :param LeaseInfo lease_info: The details of the lease to renew or add.

        :raise NoSpace: If more than ``available_space`` bytes is required to
            complete the operation.  In this case, no lease is added.

        :return: ``None``
        """
        try:
            self.renew_lease(lease_info.renew_secret,
                             lease_info.get_expiration_time())
        except IndexError:
            if lease_info.immutable_size() > available_space:
                raise NoSpace()
            self.add_lease(lease_info)

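    # Sketch of the renew-or-add flow above (``sf`` is a ShareFile and
    # ``lease`` is a LeaseInfo built elsewhere; both names are hypothetical):
    #
    #     try:
    #         sf.add_or_renew_lease(available_space=2**20, lease_info=lease)
    #     except NoSpace:
    #         ...  # appending a brand-new lease would exceed the space budget
    #
    # Renewal of an existing lease never raises NoSpace, since no new lease
    # record has to be appended to the file in that case.
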
    def cancel_lease(self, cancel_secret):
        """Remove a lease with the given cancel_secret. If the last lease is
        cancelled, the file will be removed. Return the number of bytes that
        were freed (by truncating the list of leases, and possibly by
        deleting the file). Raise IndexError if there was no lease with the
        given cancel_secret.
        """

        leases = list(self.get_leases())
        num_leases_removed = 0
        for i,lease in enumerate(leases):
            if lease.is_cancel_secret(cancel_secret):
                leases[i] = None
                num_leases_removed += 1
        if not num_leases_removed:
            raise IndexError("unable to find matching lease to cancel")
        if num_leases_removed:
            # pack and write out the remaining leases. We write these out in
            # the same order as they were added, so that if we crash while
            # doing this, we won't lose any non-cancelled leases.
            leases = [l for l in leases if l] # remove the cancelled leases
            with open(self.home, 'rb+') as f:
                for i, lease in enumerate(leases):
                    self._write_lease_record(f, i, lease)
                self._write_num_leases(f, len(leases))
                self._truncate_leases(f, len(leases))
        space_freed = self.LEASE_SIZE * num_leases_removed
        if not len(leases):
            space_freed += os.stat(self.home)[stat.ST_SIZE]
            self.unlink()
        return space_freed

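    # Worked example for cancel_lease (not in the original file): LEASE_SIZE
    # is struct.calcsize(">L32s32sL") == 4 + 32 + 32 + 4 == 72 bytes, so
    # cancelling 2 of 3 leases reports 2 * 72 == 144 bytes freed.  Cancelling
    # the last remaining lease also removes the share file itself, adding its
    # full os.stat() size to the returned total.
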

class BucketWriter(object):
    """
    Keep track of the process of writing to a ShareFile.
    """

    def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock):
        self.ss = ss
        self.incominghome = incominghome
        self.finalhome = finalhome
        self._max_size = max_size # don't allow the client to write more than this
        self.closed = False
        self.throw_out_all_data = False
        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
        # also, add our lease to the file now, so that other ones can be
        # added by simultaneous uploaders
        self._sharefile.add_lease(lease_info)
        self._already_written = RangeMap()
        self._clock = clock
        self._timeout = clock.callLater(30 * 60, self._abort_due_to_timeout)

    def required_ranges(self):  # type: () -> RangeMap
        """
        Return which ranges still need to be written.
        """
        result = RangeMap()
        result.set(True, 0, self._max_size)
        for start, end, _ in self._already_written.ranges():
            result.delete(start, end)
        return result

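    # A sketch of the RangeMap bookkeeping used above (illustrative values,
    # not part of the original file):
    #
    #     written = RangeMap()
    #     written.set(True, 0, 10)      # bytes 0-9 have been written
    #     written.set(True, 50, 100)    # bytes 50-99 have been written
    #     # iterating written.ranges() yields (start, stop, value) entries
    #     # such as (0, 10, True) and (50, 100, True); required_ranges()
    #     # starts from a map that is True everywhere and delete()s those.
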
    def allocated_size(self):
        return self._max_size

    def write(self, offset, data):  # type: (int, bytes) -> bool
        """
        Write data at given offset, return whether the upload is complete.
        """
        # Delay the timeout, since we received data; if we get an
        # AlreadyCancelled error, that means there's a bug in the client and
        # write() was called after close().
        self._timeout.reset(30 * 60)
        start = self._clock.seconds()
        precondition(not self.closed)
        if self.throw_out_all_data:
            return False

        # Make sure we're not conflicting with existing data:
        end = offset + len(data)
        for (chunk_start, chunk_stop, _) in self._already_written.ranges(offset, end):
            chunk_len = chunk_stop - chunk_start
            actual_chunk = self._sharefile.read_share_data(chunk_start, chunk_len)
            writing_chunk = data[chunk_start - offset:chunk_stop - offset]
            if actual_chunk != writing_chunk:
                raise ConflictingWriteError(
                    "Chunk {}-{} doesn't match already written data.".format(chunk_start, chunk_stop)
                )
        self._sharefile.write_share_data(offset, data)

        self._already_written.set(True, offset, end)
        self.ss.add_latency("write", self._clock.seconds() - start)
        self.ss.count("write")
        return self._is_finished()

    def _is_finished(self):
        """
        Return whether the whole thing has been written.
        """
        return sum([mr.stop - mr.start for mr in self._already_written.ranges()]) == self._max_size

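    # Example with illustrative numbers: if max_size is 100 and the client has
    # written [0, 50) and [50, 100), the summed length of the recorded ranges
    # is 100 and _is_finished() returns True.  Re-writing identical bytes does
    # not inflate the sum, because the RangeMap keeps its mapped ranges
    # non-overlapping.
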
    def close(self):
        # This can't actually be enabled, because it's not backwards compatible
        # with old Foolscap clients.
        # assert self._is_finished()
        precondition(not self.closed)
        self._timeout.cancel()
        start = self._clock.seconds()

        fileutil.make_dirs(os.path.dirname(self.finalhome))
        fileutil.rename(self.incominghome, self.finalhome)
        try:
            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
            # We try to delete the parent (.../ab/abcde) to avoid leaving
            # these directories lying around forever, but the delete might
            # fail if we're working on another share for the same storage
            # index (like ab/abcde/5). The alternative approach would be to
            # use a hierarchy of objects (PrefixHolder, BucketHolder,
            # ShareWriter), each of which is responsible for a single
            # directory on disk, and have them use reference counting of
            # their children to know when they should do the rmdir. This
            # approach is simpler, but relies on os.rmdir refusing to delete
            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
            os.rmdir(os.path.dirname(self.incominghome))
            # we also delete the grandparent (prefix) directory, .../ab ,
            # again to avoid leaving directories lying around. This might
            # fail if there is another bucket open that shares a prefix (like
            # ab/abfff).
            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
            # we leave the great-grandparent (incoming/) directory in place.
        except EnvironmentError:
            # ignore the "can't rmdir because the directory is not empty"
            # exceptions, those are normal consequences of the
            # above-mentioned conditions.
            pass
        self._sharefile = None
        self.closed = True

        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
        self.ss.bucket_writer_closed(self, filelen)
        self.ss.add_latency("close", self._clock.seconds() - start)
        self.ss.count("close")

    def disconnected(self):
        if not self.closed:
            self.abort()

    def _abort_due_to_timeout(self):
        """
        Called if we run out of time.
        """
        log.msg("storage: aborting sharefile %s due to timeout" % self.incominghome,
                facility="tahoe.storage", level=log.UNUSUAL)
        self.abort()

    def abort(self):
        log.msg("storage: aborting sharefile %s" % self.incominghome,
                facility="tahoe.storage", level=log.UNUSUAL)
        self.ss.count("abort")
        if self.closed:
            return

        os.remove(self.incominghome)
        # if we were the last share to be moved, remove the incoming/
        # directory that was our parent
        parentdir = os.path.split(self.incominghome)[0]
        if not os.listdir(parentdir):
            os.rmdir(parentdir)
        self._sharefile = None

        # We are now considered closed for further writing. We must tell
        # the storage server about this so that it stops expecting us to
        # use the space it allocated for us earlier.
        self.closed = True
        self.ss.bucket_writer_closed(self, 0)

        # Cancel timeout if it wasn't already cancelled.
        if self._timeout.active():
            self._timeout.cancel()


@implementer(RIBucketWriter)
class FoolscapBucketWriter(Referenceable):  # type: ignore # warner/foolscap#78
    """
    Foolscap-specific BucketWriter.
    """
    def __init__(self, bucket_writer):
        self._bucket_writer = bucket_writer

    def remote_write(self, offset, data):
        self._bucket_writer.write(offset, data)

    def remote_close(self):
        return self._bucket_writer.close()

    def remote_abort(self):
        return self._bucket_writer.abort()


class BucketReader(object):
    """
    Manage the process for reading from a ``ShareFile``.
    """

    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
        self.ss = ss
        self._share_file = ShareFile(sharefname)
        self.storage_index = storage_index
        self.shnum = shnum

    def __repr__(self):
        return "<%s %s %s>" % (self.__class__.__name__,
                               base32.b2a(self.storage_index[:8])[:12].decode(),
                               self.shnum)

    def read(self, offset, length):
        start = time.time()
        data = self._share_file.read_share_data(offset, length)
        self.ss.add_latency("read", time.time() - start)
        self.ss.count("read")
        return data

    def advise_corrupt_share(self, reason):
        return self.ss.advise_corrupt_share(b"immutable",
                                            self.storage_index,
                                            self.shnum,
                                            reason)

    def get_length(self):
        """
        Return the length of the data in the share.
        """
        return self._share_file.get_length()


@implementer(RIBucketReader)
class FoolscapBucketReader(Referenceable):  # type: ignore # warner/foolscap#78
    """
    Foolscap wrapper for ``BucketReader``
    """

    def __init__(self, bucket_reader):
        self._bucket_reader = bucket_reader

    def remote_read(self, offset, length):
        return self._bucket_reader.read(offset, length)

    def remote_advise_corrupt_share(self, reason):
        return self._bucket_reader.advise_corrupt_share(reason)