1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | import os, stat, struct, time |
---|
6 | |
---|
7 | from collections_extended import RangeMap |
---|
8 | |
---|
9 | from foolscap.api import Referenceable |
---|
10 | |
---|
11 | from zope.interface import implementer |
---|
12 | from allmydata.interfaces import ( |
---|
13 | RIBucketWriter, RIBucketReader, ConflictingWriteError, |
---|
14 | DataTooLargeError, |
---|
15 | NoSpace, |
---|
16 | ) |
---|
17 | from allmydata.util import base32, fileutil, log |
---|
18 | from allmydata.util.assertutil import precondition |
---|
19 | from allmydata.storage.common import UnknownImmutableContainerVersionError |
---|
20 | |
---|
21 | from .immutable_schema import ( |
---|
22 | NEWEST_SCHEMA_VERSION, |
---|
23 | schema_from_version, |
---|
24 | ) |
---|
25 | |
---|
26 | |
---|
27 | # each share file (in storage/shares/$SI/$SHNUM) contains lease information |
---|
28 | # and share data. The share data is accessed by RIBucketWriter.write and |
---|
29 | # RIBucketReader.read . The lease information is not accessible through these |
---|
30 | # interfaces. |
---|
31 | |
---|
32 | # The share file has the following layout: |
---|
33 | # 0x00: share file version number, four bytes, current version is 2 |
---|
34 | # 0x04: share data length, four bytes big-endian = A # See Footnote 1 below. |
---|
35 | # 0x08: number of leases, four bytes big-endian |
---|
36 | # 0x0c: beginning of share data (see immutable.layout.WriteBucketProxy) |
---|
37 | # A+0x0c = B: first lease. Lease format is: |
---|
38 | # B+0x00: owner number, 4 bytes big-endian, 0 is reserved for no-owner |
---|
39 | # B+0x04: renew secret, 32 bytes (SHA256 + blake2b) # See Footnote 2 below. |
---|
40 | # B+0x24: cancel secret, 32 bytes (SHA256 + blake2b) |
---|
41 | # B+0x44: expiration time, 4 bytes big-endian seconds-since-epoch |
---|
42 | # B+0x48: next lease, or end of record |
---|
43 | |
---|
44 | # Footnote 1: as of Tahoe v1.3.0 this field is not used by storage servers, |
---|
45 | # but it is still filled in by storage servers in case the storage server |
---|
46 | # software gets downgraded from >= Tahoe v1.3.0 to < Tahoe v1.3.0, or the |
---|
47 | # share file is moved from one storage server to another. The value stored in |
---|
48 | # this field is truncated, so if the actual share data length is >= 2**32, |
---|
49 | # then the value stored in this field will be the actual share data length |
---|
50 | # modulo 2**32. |
---|
51 | |
---|
52 | # Footnote 2: The change between share file version number 1 and 2 is that |
---|
53 | # storage of lease secrets is changed from plaintext to hashed. This change |
---|
54 | # protects the secrets from compromises of local storage on the server: if a |
---|
55 | # plaintext cancel secret is somehow exfiltrated from the storage server, an |
---|
56 | # attacker could use it to cancel that lease and potentially cause user data |
---|
57 | # to be discarded before intended by the real owner. As of this comment, |
---|
58 | # lease cancellation is disabled because there have been at least two bugs |
---|
59 | # which leak the persisted value of the cancellation secret. If lease secrets |
---|
60 | # were stored hashed instead of plaintext then neither of these bugs would |
---|
61 | # have allowed an attacker to learn a usable cancel secret. |
---|
62 | # |
---|
63 | # Clients are free to construct these secrets however they like. The |
---|
64 | # Tahoe-LAFS client uses a SHA256-based construction. The server then uses |
---|
65 | # blake2b to hash these values for storage so that it retains no persistent |
---|
66 | # copy of the original secret. |
---|
67 | # |
---|
68 | |
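# As an illustrative sketch (not part of this module's API), the fixed-size
# header described above can be built and parsed with ``struct``:
#
#     header = struct.pack(">LLL", version, data_length & 0xFFFFFFFF, num_leases)
#     (version, stored_length, num_leases) = struct.unpack(">LLL", header)
#
# where the stored length is the real share data length modulo 2**32, per
# Footnote 1.
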
---|
def _fix_lease_count_format(lease_count_format):
    """
    Turn a single character struct format string into a format string suitable
    for use in encoding and decoding the lease count value inside a share
    file, if possible.

    :param str lease_count_format: A single character format string like
        ``"B"`` or ``"L"``.

    :raise ValueError: If the given format string is not suitable for use in
        encoding and decoding a lease count.

    :return str: A complete format string which can safely be used to encode
        and decode lease counts in a share file.
    """
    if len(lease_count_format) != 1:
        raise ValueError(
            "Cannot construct ShareFile with lease_count_format={!r}; "
            "format must accept a single value".format(
                lease_count_format,
            ),
        )
    # Make it big-endian with standard size so all platforms agree on the
    # result.
    fixed = ">" + lease_count_format
    if struct.calcsize(fixed) > 4:
        # There is only room for at most 4 bytes in the share file format so
        # we can't allow any larger formats.
        raise ValueError(
            "Cannot construct ShareFile with lease_count_format={!r}; "
            "size must not exceed the size of '>L'".format(
                lease_count_format,
            ),
        )
    return fixed
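
# For example (illustrative only):
#
#     _fix_lease_count_format("B")   # ">B": leases counted in a single byte
#     _fix_lease_count_format("L")   # ">L": the standard four-byte count
#     _fix_lease_count_format("Q")   # raises ValueError ('>Q' needs 8 bytes)
#     _fix_lease_count_format("LL")  # raises ValueError (not a single character)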
---|


class ShareFile(object):
    """
    Support interaction with persistent storage of a share.

    :ivar str _lease_count_format: The format string which is used to encode
        and decode the lease count inside the share file. As stated in the
        comment in this module, there is room for at most 4 bytes in this
        part of the file. A format string that works on fewer bytes is
        allowed to restrict the number of leases allowed in the share file
        to a smaller number than could be supported by using the full 4
        bytes. This is mostly of interest for testing.
    """
---|
    LEASE_SIZE = struct.calcsize(">L32s32sL")
    sharetype = "immutable"

    @classmethod
    def is_valid_header(cls, header):
        # type: (bytes) -> bool
        """
        Determine if the given bytes constitute a valid header for this type of
        container.

        :param header: Some bytes from the beginning of a container.

        :return: ``True`` if the bytes could belong to this container,
            ``False`` otherwise.
        """
        (version,) = struct.unpack(">L", header[:4])
        return schema_from_version(version) is not None
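
    # Illustrative (assuming ``schema_from_version`` recognizes only the
    # version 1 and version 2 schemas described above):
    #
    #     ShareFile.is_valid_header(b"\x00\x00\x00\x02" + b"\x00" * 8)  # True
    #     ShareFile.is_valid_header(b"\x00\x00\x00\x00" + b"\x00" * 8)  # False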
---|

    def __init__(
        self,
        filename,
        max_size=None,
        create=False,
        lease_count_format="L",
        schema=NEWEST_SCHEMA_VERSION,
    ):
        """
        Initialize a ``ShareFile``.

        :param Optional[int] max_size: If given, the maximum number of bytes
            that this ``ShareFile`` will accept for storage.

        :param bool create: If ``True``, create the file (and fail if it
            exists already). ``max_size`` must not be ``None`` in this case.
            If ``False``, open an existing file for reading.

        :param str lease_count_format: A format character to use to encode and
            decode the number of leases in the share file. There are only 4
            bytes available in the file so the format must be 4 bytes or
            smaller. If different formats are used at different times with
            the same share file, the result will likely be nonsense.

            This parameter is intended for the test suite to use to be able to
            exercise values near the maximum encodable value without having
            to create billions of leases.

        :raise ValueError: If the encoding of ``lease_count_format`` is too
            large or if it is not a single format character.
        """

        precondition((max_size is not None) or (not create), max_size, create)

        self._lease_count_format = _fix_lease_count_format(lease_count_format)
        self._lease_count_size = struct.calcsize(self._lease_count_format)
        self.home = filename
        self._max_size = max_size
        if create:
            # touch the file, so later callers will see that we're working on
            # it. Also construct the metadata.
            assert not os.path.exists(self.home)
            fileutil.make_dirs(os.path.dirname(self.home))
            self._schema = schema
            with open(self.home, 'wb') as f:
                f.write(self._schema.header(max_size))
            self._lease_offset = max_size + 0x0c
            self._num_leases = 0
        else:
            with open(self.home, 'rb') as f:
                filesize = os.path.getsize(self.home)
                (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
            self._schema = schema_from_version(version)
            if self._schema is None:
                raise UnknownImmutableContainerVersionError(filename, version)
            self._num_leases = num_leases
            self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
            self._length = filesize - 0xc - (num_leases * self.LEASE_SIZE)

        self._data_offset = 0xc

    def get_length(self):
        """
        Return the length of the data in the share, if we're reading.
        """
        return self._length

    def unlink(self):
        os.unlink(self.home)

    def read_share_data(self, offset, length):
        precondition(offset >= 0)
        # Reads beyond the end of the data are truncated. Reads that start
        # beyond the end of the data return an empty string.
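        # Illustrative: with 10 bytes of share data, read_share_data(8, 5)
        # returns just bytes 8-9, and read_share_data(12, 5) returns b"".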
---|
        seekpos = self._data_offset+offset
        actuallength = max(0, min(length, self._lease_offset-seekpos))
        if actuallength == 0:
            return b""
        with open(self.home, 'rb') as f:
            f.seek(seekpos)
            return f.read(actuallength)

---|
    def write_share_data(self, offset, data):
        length = len(data)
        precondition(offset >= 0, offset)
        if self._max_size is not None and offset+length > self._max_size:
            raise DataTooLargeError(self._max_size, offset, length)
        with open(self.home, 'rb+') as f:
            real_offset = self._data_offset+offset
            f.seek(real_offset)
            assert f.tell() == real_offset
            f.write(data)

    def _write_lease_record(self, f, lease_number, lease_info):
        offset = self._lease_offset + lease_number * self.LEASE_SIZE
        f.seek(offset)
        assert f.tell() == offset
        f.write(self._schema.lease_serializer.serialize(lease_info))

    def _read_num_leases(self, f):
        f.seek(0x08)
        (num_leases,) = struct.unpack(
            self._lease_count_format,
            f.read(self._lease_count_size),
        )
        return num_leases

    def _write_num_leases(self, f, num_leases):
        self._write_encoded_num_leases(
            f,
            struct.pack(self._lease_count_format, num_leases),
        )

    def _write_encoded_num_leases(self, f, encoded_num_leases):
        f.seek(0x08)
        f.write(encoded_num_leases)

    def _truncate_leases(self, f, num_leases):
        f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)

    def get_leases(self):
        """Yield a LeaseInfo instance for each lease."""
        with open(self.home, 'rb') as f:
            (version, unused, num_leases) = struct.unpack(">LLL", f.read(0xc))
            f.seek(self._lease_offset)
            for i in range(num_leases):
                data = f.read(self.LEASE_SIZE)
                if data:
                    yield self._schema.lease_serializer.unserialize(data)

    def add_lease(self, lease_info):
        with open(self.home, 'rb+') as f:
            num_leases = self._read_num_leases(f)
            # Before we write the new lease record, make sure we can encode
            # the new lease count.
            new_lease_count = struct.pack(self._lease_count_format, num_leases + 1)
            self._write_lease_record(f, num_leases, lease_info)
            self._write_encoded_num_leases(f, new_lease_count)

    def renew_lease(self, renew_secret, new_expire_time, allow_backdate=False):
        # type: (bytes, int, bool) -> None
        """
        Update the expiration time on an existing lease.

        :param allow_backdate: If ``True`` then allow the new expiration time
            to be before the current expiration time. Otherwise, make no
            change when this is the case.

        :raise IndexError: If there is no lease matching the given renew
            secret.
        """
        for i, lease in enumerate(self.get_leases()):
            if lease.is_renew_secret(renew_secret):
                # yup. See if we need to update the owner time.
                if allow_backdate or new_expire_time > lease.get_expiration_time():
                    # yes
                    lease = lease.renew(new_expire_time)
                    with open(self.home, 'rb+') as f:
                        self._write_lease_record(f, i, lease)
                return
        raise IndexError("unable to renew non-existent lease")

    def add_or_renew_lease(self, available_space, lease_info):
        """
        Renew an existing lease if possible, otherwise allocate a new one.

        :param int available_space: The maximum number of bytes of storage to
            commit in this operation. If more than this number of bytes is
            required, raise ``NoSpace`` instead.

        :param LeaseInfo lease_info: The details of the lease to renew or add.

        :raise NoSpace: If more than ``available_space`` bytes is required to
            complete the operation. In this case, no lease is added.

        :return: ``None``
        """
        try:
            self.renew_lease(lease_info.renew_secret,
                             lease_info.get_expiration_time())
        except IndexError:
            if lease_info.immutable_size() > available_space:
                raise NoSpace()
            self.add_lease(lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove a lease with the given cancel_secret. If the last lease is
        cancelled, the file will be removed. Return the number of bytes that
        were freed (by truncating the list of leases, and possibly by
        deleting the file). Raise IndexError if there was no lease with the
        given cancel_secret.
        """

        leases = list(self.get_leases())
        num_leases_removed = 0
        for i, lease in enumerate(leases):
            if lease.is_cancel_secret(cancel_secret):
                leases[i] = None
                num_leases_removed += 1
        if not num_leases_removed:
            raise IndexError("unable to find matching lease to cancel")
        # Pack and write out the remaining leases. We write these out in
        # the same order as they were added, so that if we crash while
        # doing this, we won't lose any non-cancelled leases.
        leases = [l for l in leases if l]  # remove the cancelled leases
        with open(self.home, 'rb+') as f:
            for i, lease in enumerate(leases):
                self._write_lease_record(f, i, lease)
            self._write_num_leases(f, len(leases))
            self._truncate_leases(f, len(leases))
        space_freed = self.LEASE_SIZE * num_leases_removed
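        # E.g. cancelling two of three leases frees 2 * LEASE_SIZE = 144
        # bytes; if the last lease is cancelled, the whole file's size is
        # freed as well (see below).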
---|
        if not len(leases):
            space_freed += os.stat(self.home)[stat.ST_SIZE]
            self.unlink()
        return space_freed


class BucketWriter(object):
    """
    Keep track of the process of writing to a ShareFile.
    """

    def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock):
        self.ss = ss
        self.incominghome = incominghome
        self.finalhome = finalhome
        self._max_size = max_size  # don't allow the client to write more than this
        self.closed = False
        self.throw_out_all_data = False
        self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
        # also, add our lease to the file now, so that other ones can be
        # added by simultaneous uploaders
        self._sharefile.add_lease(lease_info)
        self._already_written = RangeMap()
        self._clock = clock
        self._timeout = clock.callLater(30 * 60, self._abort_due_to_timeout)

    def required_ranges(self):  # type: () -> RangeMap
        """
        Return which ranges still need to be written.
        """
        result = RangeMap()
        result.set(True, 0, self._max_size)
        for start, end, _ in self._already_written.ranges():
            result.delete(start, end)
        return result
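
    # Illustrative sketch of the RangeMap bookkeeping above: for a 100-byte
    # share where bytes 0-49 have already been written,
    #
    #     result = RangeMap()
    #     result.set(True, 0, 100)
    #     result.delete(0, 50)
    #
    # leaves a single required range covering bytes 50-99.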
---|

    def allocated_size(self):
        return self._max_size

    def write(self, offset, data):  # type: (int, bytes) -> bool
        """
        Write data at given offset, return whether the upload is complete.
        """
        # Delay the timeout, since we received data; if we get an
        # AlreadyCancelled error, that means there's a bug in the client and
        # write() was called after close().
        self._timeout.reset(30 * 60)
        start = self._clock.seconds()
        precondition(not self.closed)
        if self.throw_out_all_data:
            return False

        # Make sure we're not conflicting with existing data:
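        # (Illustrative: rewriting identical bytes over an already-written
        # range is accepted; writing different bytes over it raises
        # ConflictingWriteError.)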
---|
        end = offset + len(data)
        for (chunk_start, chunk_stop, _) in self._already_written.ranges(offset, end):
            chunk_len = chunk_stop - chunk_start
            actual_chunk = self._sharefile.read_share_data(chunk_start, chunk_len)
            writing_chunk = data[chunk_start - offset:chunk_stop - offset]
            if actual_chunk != writing_chunk:
                raise ConflictingWriteError(
                    "Chunk {}-{} doesn't match already written data.".format(chunk_start, chunk_stop)
                )
        self._sharefile.write_share_data(offset, data)

        self._already_written.set(True, offset, end)
        self.ss.add_latency("write", self._clock.seconds() - start)
        self.ss.count("write")
        return self._is_finished()

---|
    def _is_finished(self):
        """
        Return whether the whole thing has been written.
        """
        return sum([mr.stop - mr.start for mr in self._already_written.ranges()]) == self._max_size

    def close(self):
        # This can't actually be enabled, because it's not backwards
        # compatible with old Foolscap clients.
        # assert self._is_finished()
        precondition(not self.closed)
        self._timeout.cancel()
        start = self._clock.seconds()

        fileutil.make_dirs(os.path.dirname(self.finalhome))
        fileutil.rename(self.incominghome, self.finalhome)
        try:
            # self.incominghome is like storage/shares/incoming/ab/abcde/4 .
            # We try to delete the parent (.../ab/abcde) to avoid leaving
            # these directories lying around forever, but the delete might
            # fail if we're working on another share for the same storage
            # index (like ab/abcde/5). The alternative approach would be to
            # use a hierarchy of objects (PrefixHolder, BucketHolder,
            # ShareWriter), each of which is responsible for a single
            # directory on disk, and have them use reference counting of
            # their children to know when they should do the rmdir. This
            # approach is simpler, but relies on os.rmdir refusing to delete
            # a non-empty directory. Do *not* use fileutil.rm_dir() here!
            os.rmdir(os.path.dirname(self.incominghome))
            # we also delete the grandparent (prefix) directory, .../ab ,
            # again to avoid leaving directories lying around. This might
            # fail if there is another bucket open that shares a prefix (like
            # ab/abfff).
            os.rmdir(os.path.dirname(os.path.dirname(self.incominghome)))
            # we leave the great-grandparent (incoming/) directory in place.
        except EnvironmentError:
            # ignore the "can't rmdir because the directory is not empty"
            # exceptions, those are normal consequences of the
            # above-mentioned conditions.
            pass
        self._sharefile = None
        self.closed = True

        filelen = os.stat(self.finalhome)[stat.ST_SIZE]
        self.ss.bucket_writer_closed(self, filelen)
        self.ss.add_latency("close", self._clock.seconds() - start)
        self.ss.count("close")

    def disconnected(self):
        if not self.closed:
            self.abort()

    def _abort_due_to_timeout(self):
        """
        Called if we run out of time.
        """
        log.msg("storage: aborting sharefile %s due to timeout" % self.incominghome,
                facility="tahoe.storage", level=log.UNUSUAL)
        self.abort()

    def abort(self):
        log.msg("storage: aborting sharefile %s" % self.incominghome,
                facility="tahoe.storage", level=log.UNUSUAL)
        self.ss.count("abort")
        if self.closed:
            return

        os.remove(self.incominghome)
        # if we were the last share to be moved, remove the incoming/
        # directory that was our parent
        parentdir = os.path.split(self.incominghome)[0]
        if not os.listdir(parentdir):
            os.rmdir(parentdir)
        self._sharefile = None

        # We are now considered closed for further writing. We must tell
        # the storage server about this so that it stops expecting us to
        # use the space it allocated for us earlier.
        self.closed = True
        self.ss.bucket_writer_closed(self, 0)

        # Cancel timeout if it wasn't already cancelled.
        if self._timeout.active():
            self._timeout.cancel()


@implementer(RIBucketWriter)
class FoolscapBucketWriter(Referenceable):  # type: ignore # warner/foolscap#78
    """
    Foolscap-specific BucketWriter.
    """
    def __init__(self, bucket_writer):
        self._bucket_writer = bucket_writer

    def remote_write(self, offset, data):
        self._bucket_writer.write(offset, data)

    def remote_close(self):
        return self._bucket_writer.close()

    def remote_abort(self):
        return self._bucket_writer.abort()


class BucketReader(object):
    """
    Manage the process for reading from a ``ShareFile``.
    """

    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
        self.ss = ss
        self._share_file = ShareFile(sharefname)
        self.storage_index = storage_index
        self.shnum = shnum

    def __repr__(self):
        return "<%s %s %s>" % (self.__class__.__name__,
                               base32.b2a(self.storage_index[:8])[:12].decode(),
                               self.shnum)

    def read(self, offset, length):
        start = time.time()
        data = self._share_file.read_share_data(offset, length)
        self.ss.add_latency("read", time.time() - start)
        self.ss.count("read")
        return data

    def advise_corrupt_share(self, reason):
        return self.ss.advise_corrupt_share(b"immutable",
                                            self.storage_index,
                                            self.shnum,
                                            reason)

    def get_length(self):
        """
        Return the length of the data in the share.
        """
        return self._share_file.get_length()


@implementer(RIBucketReader)
class FoolscapBucketReader(Referenceable):  # type: ignore # warner/foolscap#78
    """
    Foolscap wrapper for ``BucketReader``.
    """

    def __init__(self, bucket_reader):
        self._bucket_reader = bucket_reader

    def remote_read(self, offset, length):
        return self._bucket_reader.read(offset, length)

    def remote_advise_corrupt_share(self, reason):
        return self._bucket_reader.advise_corrupt_share(reason)
---|