source: trunk/src/allmydata/immutable/layout.py

Last change on this file was c93ff23, checked in by Itamar Turner-Trauring <itamar@…>, at 2022-12-01T19:54:28Z

Don't send empty string writes.

  • Property mode set to 100644
File size: 20.8 KB
"""
Ported to Python 3.
"""

from __future__ import annotations

import struct
from io import BytesIO

from attrs import define, field
from zope.interface import implementer
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
     FileTooLargeError, HASH_SIZE
from allmydata.util import mathutil, observer, log
from allmydata.util.assertutil import precondition
from allmydata.storage.server import si_b2a


class LayoutInvalid(Exception):
    """ There is something wrong with these bytes so they can't be
    interpreted as the kind of immutable file that I know how to download."""
    pass

class RidiculouslyLargeURIExtensionBlock(LayoutInvalid):
    """ When downloading a file, the length of the URI Extension Block was
    given as >= 2**32. This means the share data must have been corrupted, or
    else the original uploader of the file wrote a ridiculous value into the
    URI Extension Block length."""
    pass

class ShareVersionIncompatible(LayoutInvalid):
    """ When downloading a share, its format was not one of the formats we
    know how to parse."""
    pass

"""
Share data is written in a file. At the start of the file, there is a series
of four-byte big-endian offset values, which indicate where each section
starts. Each offset is measured from the beginning of the share data.

0x00: version number (=00 00 00 01)
0x04: block size # See Footnote 1 below.
0x08: share data size # See Footnote 1 below.
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree UNUSED
0x14: offset of crypttext_hash_tree
0x18: offset of block_hashes
0x1c: offset of share_hashes
0x20: offset of uri_extension_length + uri_extension
0x24: start of data
?   : start of plaintext_hash_tree UNUSED
?   : start of crypttext_hash_tree
?   : start of block_hashes
?   : start of share_hashes
       each share_hash is written as a two-byte (big-endian) hashnum
       followed by the 32-byte SHA-256 hash. We store only the hashes
       necessary to validate the share hash root
?   : start of uri_extension_length (four-byte big-endian value)
?   : start of uri_extension
"""
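# Illustrative sketch (not used by this module): the v1 header is nine 4-byte
# big-endian unsigned integers, so -- assuming ``header`` holds at least the
# first 0x24 bytes of a share -- it could be unpacked like this:
#
#     (version, block_size, data_size, data_off, plaintext_hash_tree_off,
#      crypttext_hash_tree_off, block_hashes_off, share_hashes_off,
#      uri_extension_off) = struct.unpack(">LLLLLLLLL", header[:0x24])
#
# The real parsing for both layout versions lives in
# ReadBucketProxy._parse_offsets below; ``header`` here is hypothetical.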

"""
v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
limitations described in #346.

0x00: version number (=00 00 00 02)
0x04: block size # See Footnote 1 below.
0x0c: share data size # See Footnote 1 below.
0x14: offset of data (=00 00 00 00 00 00 00 44)
0x1c: offset of plaintext_hash_tree UNUSED
0x24: offset of crypttext_hash_tree
0x2c: offset of block_hashes
0x34: offset of share_hashes
0x3c: offset of uri_extension_length + uri_extension
0x44: start of data
    : rest of share is the same as v1, above
...   ...
?   : start of uri_extension_length (eight-byte big-endian value)
?   : start of uri_extension
"""
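# Similarly, a v2 header is a 4-byte version field followed by eight 8-byte
# big-endian fields (4 + 8*8 = 0x44 bytes). Under the same assumption as the
# v1 sketch above, it could be unpacked with:
#
#     fields = struct.unpack(">LQQQQQQQQ", header[:0x44])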

# Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but
# they are still provided when writing so that older versions of Tahoe can
# read them.

FORCE_V2 = False # set briefly by unit tests to make small-sized V2 shares

def make_write_bucket_proxy(rref, server,
                            data_size, block_size, num_segments,
                            num_share_hashes, uri_extension_size):
    # Use layout v1 for small files, so they'll be readable by older versions
    # (<tahoe-1.3.0). Use layout v2 for large files; they'll only be readable
    # by tahoe-1.3.0 or later.
    try:
        if FORCE_V2:
            raise FileTooLargeError
        wbp = WriteBucketProxy(rref, server,
                               data_size, block_size, num_segments,
                               num_share_hashes, uri_extension_size)
    except FileTooLargeError:
        wbp = WriteBucketProxy_v2(rref, server,
                                  data_size, block_size, num_segments,
                                  num_share_hashes, uri_extension_size)
    return wbp


@define
class _WriteBuffer:
    """
    Queue up small writes to be written in a single batched larger write.
    """
    _batch_size: int
    _to_write : BytesIO = field(factory=BytesIO)
    _written_bytes : int = field(default=0)

    def queue_write(self, data: bytes) -> bool:
        """
        Queue a write.  If the result is ``False``, no further action is needed
        for now.  If the result is ``True``, it's time to call ``flush()``
        and do a real write.
        """
        self._to_write.write(data)
        return self.get_queued_bytes() >= self._batch_size

    def flush(self) -> tuple[int, bytes]:
        """Return offset and data to be written."""
        offset = self._written_bytes
        data = self._to_write.getvalue()
        self._written_bytes += len(data)
        self._to_write = BytesIO()
        return (offset, data)

    def get_queued_bytes(self) -> int:
        """Return number of queued, unwritten bytes."""
        return self._to_write.tell()

    def get_total_bytes(self) -> int:
        """Return how many bytes were written or queued in total."""
        return self._written_bytes + self.get_queued_bytes()
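# Illustrative sketch of the _WriteBuffer contract (the 10-byte batch size is
# made up; WriteBucketProxy uses 1_000_000 bytes by default):
#
#     wb = _WriteBuffer(10)
#     wb.queue_write(b"abc")      # False: only 3 bytes queued so far
#     wb.queue_write(b"d" * 8)    # True: 11 >= 10, time to flush
#     wb.flush()                  # (0, b"abcdddddddd"); buffer resets
#     wb.get_total_bytes()        # 11 (written plus queued)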


@implementer(IStorageBucketWriter)
class WriteBucketProxy(object):
    """
    Note: The various ``put_`` methods need to be called in the order in which the
    bytes will get written.
    """
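    # A sketch of the expected call order (illustrative; derived from the share
    # layout described above, which this class writes front to back):
    #
    #     put_header(); put_block(0) ... put_block(num_segments - 1);
    #     put_crypttext_hashes(...); put_block_hashes(...);
    #     put_share_hashes(...); put_uri_extension(...); close()
    #
    # Each call queues bytes contiguously after the previous one, which is
    # what the offset assertion in _queue_write() checks.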
    fieldsize = 4
    fieldstruct = ">L"

    def __init__(self, rref, server, data_size, block_size, num_segments,
                 num_share_hashes, uri_extension_size, batch_size=1_000_000):
        self._rref = rref
        self._server = server
        self._data_size = data_size
        self._block_size = block_size
        self._num_segments = num_segments

        effective_segments = mathutil.next_power_of_k(num_segments,2)
        self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
        # how many share hashes are included in each share? This will be
        # about ln2(num_shares).
        self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
        self._uri_extension_size = uri_extension_size
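        # Worked example (numbers are illustrative): with num_segments=3 the
        # hash trees are padded to the next power of two, so
        # effective_segments=4 and each segment hash area is
        # (2*4 - 1) * 32 = 224 bytes; with num_share_hashes=5 the share hash
        # chain takes 5 * (2 + 32) = 170 bytes.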

        self._create_offsets(block_size, data_size)

        # With a ~1MB batch size, max upload speed is 1MB/(round-trip latency)
        # assuming the writing code waits for writes to finish, so 20MB/sec if
        # latency is 50ms. In the US many people only have 1MB/sec upload speed
        # as of 2022 (standard Comcast). For further discussion of how one
        # might set batch sizes see
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3787#comment:1.
        self._write_buffer = _WriteBuffer(batch_size)

    def get_allocated_size(self):
        return (self._offsets['uri_extension'] + self.fieldsize +
                self._uri_extension_size)

    def _create_offsets(self, block_size, data_size):
        if block_size >= 2**32 or data_size >= 2**32:
            raise FileTooLargeError("This file is too large to be uploaded (data_size).")

        offsets = self._offsets = {}
        x = 0x24
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x # UNUSED
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hashtree_size
        offsets['uri_extension'] = x

        if x >= 2**32:
            raise FileTooLargeError("This file is too large to be uploaded (offsets).")

        offset_data = struct.pack(">LLLLLLLLL",
                                  1, # version number
                                  block_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'], # UNUSED
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 0x24
        self._offset_data = offset_data

    def __repr__(self):
        return "<WriteBucketProxy for node %r>" % self._server.get_name()

    def put_header(self):
        return self._queue_write(0, self._offset_data)

    def put_block(self, segmentnum, data):
        offset = self._offsets['data'] + segmentnum * self._block_size
        assert offset + len(data) <= self._offsets['uri_extension']
        assert isinstance(data, bytes)
        if segmentnum < self._num_segments-1:
            precondition(len(data) == self._block_size,
                         len(data), self._block_size)
        else:
            precondition(len(data) == (self._data_size -
                                       (self._block_size *
                                        (self._num_segments - 1))),
                         len(data), self._block_size)
        return self._queue_write(offset, data)

    def put_crypttext_hashes(self, hashes):
        # plaintext_hash_tree precedes crypttext_hash_tree. It is not used, and
        # so is not explicitly written, but we need to write everything, so
        # fill it in with nulls.
        d = self._queue_write(self._offsets['plaintext_hash_tree'], b"\x00" * self._segment_hash_size)
        d.addCallback(lambda _: self._really_put_crypttext_hashes(hashes))
        return d

    def _really_put_crypttext_hashes(self, hashes):
        offset = self._offsets['crypttext_hash_tree']
        assert isinstance(hashes, list)
        data = b"".join(hashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['block_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['block_hashes'])
        return self._queue_write(offset, data)

    def put_block_hashes(self, blockhashes):
        offset = self._offsets['block_hashes']
        assert isinstance(blockhashes, list)
        data = b"".join(blockhashes)
        precondition(len(data) == self._segment_hash_size,
                     len(data), self._segment_hash_size)
        precondition(offset + len(data) <= self._offsets['share_hashes'],
                     offset, len(data), offset+len(data),
                     self._offsets['share_hashes'])
        return self._queue_write(offset, data)

    def put_share_hashes(self, sharehashes):
        # sharehashes is a list of (index, hash) tuples, so they get stored
        # as 2+32=34 bytes each
        offset = self._offsets['share_hashes']
        assert isinstance(sharehashes, list)
        data = b"".join([struct.pack(">H", hashnum) + hashvalue
                        for hashnum,hashvalue in sharehashes])
        precondition(len(data) == self._share_hashtree_size,
                     len(data), self._share_hashtree_size)
        precondition(offset + len(data) <= self._offsets['uri_extension'],
                     offset, len(data), offset+len(data),
                     self._offsets['uri_extension'])
        return self._queue_write(offset, data)

    def put_uri_extension(self, data):
        offset = self._offsets['uri_extension']
        assert isinstance(data, bytes)
        precondition(len(data) == self._uri_extension_size)
        length = struct.pack(self.fieldstruct, len(data))
        return self._queue_write(offset, length+data)

    def _queue_write(self, offset, data):
        """
        This queues up small writes to be written in a single batched larger
        write.

        Callers of this function are expected to queue the data in order, with
        no holes.  As such, the offset is technically unnecessary, but is used
        to check the inputs.  Possibly we should get rid of it.
        """
        assert offset == self._write_buffer.get_total_bytes()
        if self._write_buffer.queue_write(data):
            return self._actually_write()
        else:
            return defer.succeed(False)

    def _actually_write(self):
        """Write data to the server."""
        offset, data = self._write_buffer.flush()
        return self._rref.callRemote("write", offset, data)

    def close(self):
        assert self._write_buffer.get_total_bytes() == self.get_allocated_size(), (
            f"{self._write_buffer.get_total_bytes()} != {self.get_allocated_size()}"
        )
        if self._write_buffer.get_queued_bytes() > 0:
            d = self._actually_write()
        else:
            # No data queued, don't send empty string write.
            d = defer.succeed(True)
        d.addCallback(lambda _: self._rref.callRemote("close"))
        return d

    def abort(self):
        return self._rref.callRemote("abort").addErrback(log.err, "Error from remote call to abort an immutable write bucket")

    def get_servername(self):
        return self._server.get_name()
    def get_peerid(self):
        return self._server.get_serverid()

class WriteBucketProxy_v2(WriteBucketProxy):
    fieldsize = 8
    fieldstruct = ">Q"

    def _create_offsets(self, block_size, data_size):
        if block_size >= 2**64 or data_size >= 2**64:
            raise FileTooLargeError("This file is too large to be uploaded (data_size).")

        offsets = self._offsets = {}
        x = 0x44
        offsets['data'] = x
        x += data_size
        offsets['plaintext_hash_tree'] = x # UNUSED
        x += self._segment_hash_size
        offsets['crypttext_hash_tree'] = x
        x += self._segment_hash_size
        offsets['block_hashes'] = x
        x += self._segment_hash_size
        offsets['share_hashes'] = x
        x += self._share_hashtree_size
        offsets['uri_extension'] = x

        if x >= 2**64:
            raise FileTooLargeError("This file is too large to be uploaded (offsets).")

        offset_data = struct.pack(">LQQQQQQQQ",
                                  2, # version number
                                  block_size,
                                  data_size,
                                  offsets['data'],
                                  offsets['plaintext_hash_tree'], # UNUSED
                                  offsets['crypttext_hash_tree'],
                                  offsets['block_hashes'],
                                  offsets['share_hashes'],
                                  offsets['uri_extension'],
                                  )
        assert len(offset_data) == 0x44, len(offset_data)
        self._offset_data = offset_data

@implementer(IStorageBucketReader)
class ReadBucketProxy(object):

    def __init__(self, rref, server, storage_index):
        self._rref = rref
        self._server = server
        self._storage_index = storage_index
        self._started = False # sent request to server
        self._ready = observer.OneShotObserverList() # got response from server

    def get_peerid(self):
        return self._server.get_serverid()

    def __repr__(self):
        return "<ReadBucketProxy %r to peer [%r] SI %r>" % \
               (id(self), self._server.get_name(), si_b2a(self._storage_index))

    def _start_if_needed(self):
        """ Returns a deferred that will be fired when I'm ready to return
        data, or errbacks if the starting (header reading and parsing)
        process fails."""
        if not self._started:
            self._start()
        return self._ready.when_fired()

    def _start(self):
        self._started = True
        # TODO: for small shares, read the whole bucket in _start()
        d = self._fetch_header()
        d.addCallback(self._parse_offsets)
        def _fail_waiters(f):
            self._ready.fire(f)
        def _notify_waiters(result):
            self._ready.fire(result)
        d.addCallbacks(_notify_waiters, _fail_waiters)
        return d

    def _fetch_header(self):
        return self._read(0, 0x44)

    def _parse_offsets(self, data):
        precondition(len(data) >= 0x4)
        self._offsets = {}
        (version,) = struct.unpack(">L", data[0:4])
        if version != 1 and version != 2:
            raise ShareVersionIncompatible(version)

        if version == 1:
            precondition(len(data) >= 0x24)
            x = 0x0c
            fieldsize = 0x4
            fieldstruct = ">L"
        else:
            precondition(len(data) >= 0x44)
            x = 0x14
            fieldsize = 0x8
            fieldstruct = ">Q"

        self._version = version
        self._fieldsize = fieldsize
        self._fieldstruct = fieldstruct

        for field_name in ( 'data',
                            'plaintext_hash_tree', # UNUSED
                            'crypttext_hash_tree',
                            'block_hashes',
                            'share_hashes',
                            'uri_extension',
                           ):
            offset = struct.unpack(fieldstruct, data[x:x+fieldsize])[0]
            x += fieldsize
            self._offsets[field_name] = offset
        return self._offsets

    def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
        offset = self._offsets['data'] + blocknum * blocksize
        return self._read(offset, thisblocksize)

    def get_block_data(self, blocknum, blocksize, thisblocksize):
        d = self._start_if_needed()
        d.addCallback(self._get_block_data, blocknum, blocksize, thisblocksize)
        return d

    def _str2l(self, s):
        """ split string (pulled from storage) into a list of blockids """
        return [ s[i:i+HASH_SIZE]
                 for i in range(0, len(s), HASH_SIZE) ]

    def _get_crypttext_hashes(self, unused=None):
        offset = self._offsets['crypttext_hash_tree']
        size = self._offsets['block_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_crypttext_hashes(self):
        d = self._start_if_needed()
        d.addCallback(self._get_crypttext_hashes)
        return d

    def _get_block_hashes(self, unused=None, at_least_these=()):
        # TODO: fetch only at_least_these instead of all of them.
        offset = self._offsets['block_hashes']
        size = self._offsets['share_hashes'] - offset
        d = self._read(offset, size)
        d.addCallback(self._str2l)
        return d

    def get_block_hashes(self, at_least_these=()):
        if at_least_these:
            d = self._start_if_needed()
            d.addCallback(self._get_block_hashes, at_least_these)
            return d
        else:
            return defer.succeed([])

    def get_share_hashes(self):
        d = self._start_if_needed()
        d.addCallback(self._get_share_hashes)
        return d

    def _get_share_hashes(self, _ignore):
        """ Tahoe storage servers < v1.3.0 would return an error if you tried
        to read past the end of the share, so we use the offsets to compute
        the exact size and read just that much.

        HTTP-based storage protocol also doesn't like reading past the end.
        """
        offset = self._offsets['share_hashes']
        size = self._offsets['uri_extension'] - offset
        if size % (2+HASH_SIZE) != 0:
            raise LayoutInvalid("share hash tree corrupted -- should occupy a multiple of %d bytes, not %d bytes" % ((2+HASH_SIZE), size))
        d = self._read(offset, size)
        def _unpack_share_hashes(data):
            if len(data) != size:
                raise LayoutInvalid("share hash tree corrupted -- got a short read of the share data -- should have gotten %d, not %d bytes" % (size, len(data)))
            hashes = []
            for i in range(0, size, 2+HASH_SIZE):
                hashnum = struct.unpack(">H", data[i:i+2])[0]
                hashvalue = data[i+2:i+2+HASH_SIZE]
                hashes.append( (hashnum, hashvalue) )
            return hashes
        d.addCallback(_unpack_share_hashes)
        return d

    def _get_uri_extension(self, unused=None):
        """ Tahoe storage servers < v1.3.0 would return an error if you tried
        to read past the end of the share, so we need to fetch the UEB size
        and then read just that much.

        HTTP-based storage protocol also doesn't like reading past the end.
        """
        offset = self._offsets['uri_extension']
        d = self._read(offset, self._fieldsize)
        def _got_length(data):
            if len(data) != self._fieldsize:
                raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
            length = struct.unpack(self._fieldstruct, data)[0]
            if length >= 2000:
                # URI extension blocks are around 419 bytes long; in previous
                # versions of the code 1000 was used as a default catchall. So
                # 2000 or more must be corrupted.
                raise RidiculouslyLargeURIExtensionBlock(length)

            return self._read(offset+self._fieldsize, length)
        d.addCallback(_got_length)
        return d

    def get_uri_extension(self):
        d = self._start_if_needed()
        d.addCallback(self._get_uri_extension)
        return d

    def _read(self, offset, length):
        return self._rref.callRemote("read", offset, length)