source: trunk/src/allmydata/storage/mutable.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

1"""
2Ported to Python 3.
3"""
4
5import os, stat, struct
6
7from allmydata.interfaces import (
8    BadWriteEnablerError,
9    NoSpace,
10)
11from allmydata.util import idlib, log
12from allmydata.util.assertutil import precondition
13from allmydata.util.hashutil import timing_safe_compare
14from allmydata.storage.lease import LeaseInfo
15from allmydata.storage.common import UnknownMutableContainerVersionError, \
16     DataTooLargeError
17from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
18from .mutable_schema import (
19    NEWEST_SCHEMA_VERSION,
20    schema_from_header,
21)
22
23# the MutableShareFile is like the ShareFile, but used for mutable data. It
24# has a different layout. See docs/mutable.txt for more details.
25
26# #   offset    size    name
27# 1   0         32      magic verstr "tahoe mutable container v1" plus binary
28# 2   32        20      write enabler's nodeid
29# 3   52        32      write enabler
30# 4   84        8       data size (actual share data present) (a)
31# 5   92        8       offset of (8) count of extra leases (after data)
32# 6   100       368     four leases, 92 bytes each
33#                        0    4   ownerid (0 means "no lease here")
34#                        4    4   expiration timestamp
35#                        8   32   renewal token
36#                        40  32   cancel token
37#                        72  20   nodeid which accepted the tokens
38# 7   468       (a)     data
39# 8   ??        4       count of extra leases
40# 9   ??        n*92    extra leases
41
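# As a sketch of the layout above: the fixed-size header is one big-endian
# struct, so (assuming a hypothetical `header` variable holding the first
# 100 bytes of a share file) all five fixed fields unpack in a single call,
# exactly as _read_write_enabler_and_nodeid() does below:
#
#   (magic, write_enabler_nodeid, write_enabler,
#    data_length, extra_lease_offset) = struct.unpack(">32s20s32sQQ", header)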

# The struct module doc says that L's are 4 bytes in size, and that Q's are
# 8 bytes in size. Since compatibility depends upon this, double-check it.
assert struct.calcsize(">L") == 4, struct.calcsize(">L")
assert struct.calcsize(">Q") == 8, struct.calcsize(">Q")

class MutableShareFile(object):

    sharetype = "mutable"
    DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s")
    EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
    HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases
    LEASE_SIZE = struct.calcsize(">LL32s32s20s")
    assert LEASE_SIZE == 92
    DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
    assert DATA_OFFSET == 468, DATA_OFFSET
    # our sharefiles start with a recognizable string, plus some random
    # binary data to reduce the chance that a regular text file will look
    # like a sharefile.
    MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
    # TODO: decide upon a policy for max share size

    @classmethod
    def is_valid_header(cls, header):
        # type: (bytes) -> bool
        """
        Determine if the given bytes constitute a valid header for this type of
        container.

        :param header: Some bytes from the beginning of a container.

        :return: ``True`` if the bytes could belong to this container,
            ``False`` otherwise.
        """
        return schema_from_header(header) is not None

    def __init__(self, filename, parent=None, schema=NEWEST_SCHEMA_VERSION):
        self.home = filename
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            with open(self.home, 'rb') as f:
                header = f.read(self.HEADER_SIZE)
            self._schema = schema_from_header(header)
            if self._schema is None:
                raise UnknownMutableContainerVersionError(filename, header)
        else:
            self._schema = schema
        self.parent = parent # for logging

    def log(self, *args, **kwargs):
        return self.parent.log(*args, **kwargs)

    def create(self, my_nodeid, write_enabler):
        assert not os.path.exists(self.home)
        with open(self.home, 'wb') as f:
            f.write(self._schema.header(my_nodeid, write_enabler))

    def unlink(self):
        os.unlink(self.home)

    def _read_data_length(self, f):
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))
        return data_length

    def _write_data_length(self, f, data_length):
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        precondition(offset >= 0)
        precondition(length >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        if length == 0:
            return b""
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)
        return data

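    # A worked example of the truncation rule above: with data_length=10, a
    # read of (offset=8, length=5) is clipped to the 2 bytes that exist,
    # while a read of (offset=12, length=5) starts past the end of the data
    # and returns b"".
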
    def _read_extra_lease_offset(self, f):
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        offset = self._read_extra_lease_offset(f)
        f.seek(offset)
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset < old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
            return
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        leases_size = 4 + num_extra_leases * self.LEASE_SIZE
        extra_lease_data = f.read(leases_size)

        # Zero out the old lease info (in order to minimize the chance that
        # it could accidentally be exposed to a reader later, re #1528).
        f.seek(old_extra_lease_offset)
        f.write(b'\x00' * leases_size)
        f.flush()

        # An interrupt here will corrupt the leases.

        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        self._write_extra_lease_offset(f, new_extra_lease_offset)

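    # Sketch of the relocation above when the container grows: the 4-byte
    # extra-lease count plus num_extra_leases*92 bytes of lease records are
    # copied from the old extra-lease offset, zeroed in place (re #1528),
    # rewritten at DATA_OFFSET+new_container_size, and only then is the
    # extra-lease-offset field in the header updated. A crash mid-move can
    # lose the extra leases (they read back as zeroed), but never exposes
    # stale lease bytes to a later reader.
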
    def _write_share_data(self, f, offset, data):
        length = len(data)
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)

        if offset+length >= data_length:
            # They are expanding their data size.

            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # TODO: allow containers to shrink. For now, they remain
                # large.

                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)

                # an interrupt here is ok.. the container has been enlarged
                # but the data remains untouched

            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.

            # Fill any newly exposed empty space with 0's.
            if offset > data_length:
                f.seek(self.DATA_OFFSET+data_length)
                f.write(b'\x00'*(offset - data_length))
                f.flush()

            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # an interrupt here will result in a corrupted share

        # now all that's left to do is write out their data
        f.seek(self.DATA_OFFSET+offset)
        f.write(data)
        return

    def _write_lease_record(self, f, lease_number, lease_info):
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        f.seek(offset)
        assert f.tell() == offset
        f.write(self._schema.lease_serializer.serialize(lease_info))

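    # Worked example of the slot arithmetic above: the four embedded slots
    # start at HEADER_SIZE (100), so lease 2 lives at 100 + 2*92 = 284.
    # Extra leases follow the 4-byte count at the extra-lease offset, so
    # lease 5 (the second extra lease) lives at
    # extra_lease_offset + 4 + 1*92.
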
    def _read_lease_record(self, f, lease_number):
        # returns a LeaseInfo instance, or None
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            raise IndexError("No such lease number %d" % lease_number)
        f.seek(offset)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = self._schema.lease_serializer.unserialize(data)
        if lease_info.owner_num == 0:
            return None
        return lease_info

    def _get_num_lease_slots(self, f):
        # how many places do we have allocated for leases? Not all of them
        # are filled.
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        # return an int with the index of an empty slot, or None if we do not
        # currently have an empty slot

        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:
                return i
        return None

    def get_leases(self):
        """Yields a LeaseInfo instance for all leases."""
        with open(self.home, 'rb') as f:
            for i, lease in self._enumerate_leases(f):
                yield lease

    def _enumerate_leases(self, f):
        for i in range(self._get_num_lease_slots(f)):
            try:
                data = self._read_lease_record(f, i)
                if data is not None:
                    yield i, data
            except IndexError:
                return

    def add_lease(self, available_space, lease_info):
        """
        Add a new lease to this share.

        :param int available_space: The maximum number of bytes of storage to
            commit in this operation.  If more than this number of bytes is
            required, raise ``NoSpace`` instead.

        :raise NoSpace: If more than ``available_space`` bytes is required to
            complete the operation.  In this case, no lease is added.

        :return: ``None``
        """
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        with open(self.home, 'rb+') as f:
            num_lease_slots = self._get_num_lease_slots(f)
            empty_slot = self._get_first_empty_lease_slot(f)
            if empty_slot is not None:
                self._write_lease_record(f, empty_slot, lease_info)
            else:
                if lease_info.mutable_size() > available_space:
                    raise NoSpace()
                self._write_lease_record(f, num_lease_slots, lease_info)

    def renew_lease(self, renew_secret, new_expire_time, allow_backdate=False):
        # type: (bytes, int, bool) -> None
        """
        Update the expiration time on an existing lease.

        :param allow_backdate: If ``True`` then allow the new expiration time
            to be before the current expiration time.  Otherwise, make no
            change when this is the case.

        :raise IndexError: If there is no lease matching the given renew
            secret.
        """
        accepting_nodeids = set()
        with open(self.home, 'rb+') as f:
            for (leasenum, lease) in self._enumerate_leases(f):
                if lease.is_renew_secret(renew_secret):
                    # yup. See if we need to update the owner time.
                    if allow_backdate or new_expire_time > lease.get_expiration_time():
                        # yes
                        lease = lease.renew(new_expire_time)
                        self._write_lease_record(f, leasenum, lease)
                    return
                accepting_nodeids.add(lease.nodeid)
        # Report the accepting nodeids in the exception message, to give the
        # client a chance to update the leases on a share which has been
        # migrated from its original server to a new one.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
               " nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def add_or_renew_lease(self, available_space, lease_info):
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        try:
            self.renew_lease(lease_info.renew_secret,
                             lease_info.get_expiration_time())
        except IndexError:
            self.add_lease(available_space, lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. If the last lease
        is cancelled, the file will be removed. Return the number of bytes
        that were freed (by truncating the list of leases, and possibly by
        deleting the file). Raise IndexError if there was no lease with the
        given cancel_secret."""

        accepting_nodeids = set()
        modified = 0
        remaining = 0
        blank_lease = LeaseInfo(owner_num=0,
                                renew_secret=b"\x00"*32,
                                cancel_secret=b"\x00"*32,
                                expiration_time=0,
                                nodeid=b"\x00"*20)
        with open(self.home, 'rb+') as f:
            for (leasenum, lease) in self._enumerate_leases(f):
                accepting_nodeids.add(lease.nodeid)
                if lease.is_cancel_secret(cancel_secret):
                    self._write_lease_record(f, leasenum, blank_lease)
                    modified += 1
                else:
                    remaining += 1
            if modified:
                freed_space = self._pack_leases(f)
                # Close the file before unlinking it; some platforms cannot
                # remove a file that is still open.
                f.close()
                if not remaining:
                    freed_space += os.stat(self.home)[stat.ST_SIZE]
                    self.unlink()
                return freed_space

        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def _pack_leases(self, f):
        # TODO: reclaim space from cancelled leases
        return 0

    def _read_write_enabler_and_nodeid(self, f):
        f.seek(0)
        data = f.read(self.HEADER_SIZE)
        (magic,
         write_enabler_nodeid, write_enabler,
         data_length, extra_lease_offset) = \
         struct.unpack(">32s20s32sQQ", data)
        assert self.is_valid_header(data)
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
        datav = []
        with open(self.home, 'rb') as f:
            for (offset, length) in readv:
                datav.append(self._read_share_data(f, offset, length))
        return datav

    def get_length(self):
        """
        Return the length of the data in the share.
        """
        with open(self.home, 'rb') as f:
            return self._read_data_length(f)

    def check_write_enabler(self, write_enabler, si_s):
        with open(self.home, 'rb+') as f:
            (real_write_enabler, write_enabler_nodeid) = \
                                 self._read_write_enabler_and_nodeid(f)
        # Use a constant-time comparison (rather than a plain ``!=``) to
        # avoid a timing attack.
        if not timing_safe_compare(write_enabler, real_write_enabler):
            # accommodate share migration by reporting the nodeid used for
            # the old write enabler.
            self.log(format="bad write enabler on SI %(si)s,"
                     " recorded by nodeid %(nodeid)s",
                     facility="tahoe.storage",
                     level=log.WEIRD, umid="cE1eBQ",
                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.nodeid_b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
        test_good = True
        with open(self.home, 'rb+') as f:
            for (offset, length, operator, specimen) in testv:
                data = self._read_share_data(f, offset, length)
                if not testv_compare(data, operator, specimen):
                    test_good = False
                    break
        return test_good

    def writev(self, datav, new_length):
        with open(self.home, 'rb+') as f:
            for (offset, data) in datav:
                self._write_share_data(f, offset, data)
            if new_length is not None:
                cur_length = self._read_data_length(f)
                if new_length < cur_length:
                    self._write_data_length(f, new_length)
                    # TODO: if we're going to shrink the share file when the
                    # share data has shrunk, then call
                    # self._change_container_size() here.

def testv_compare(a, op, b):
    assert op == b"eq"
    return a == b


class EmptyShare(object):

    def check_testv(self, testv):
        test_good = True
        for (offset, length, operator, specimen) in testv:
            data = b""
            if not testv_compare(data, operator, specimen):
                test_good = False
                break
        return test_good

def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
    ms = MutableShareFile(filename, parent)
    ms.create(my_nodeid, write_enabler)
    del ms
    return MutableShareFile(filename, parent)