1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | import os, stat, struct |
---|
6 | |
---|
7 | from allmydata.interfaces import ( |
---|
8 | BadWriteEnablerError, |
---|
9 | NoSpace, |
---|
10 | ) |
---|
11 | from allmydata.util import idlib, log |
---|
12 | from allmydata.util.assertutil import precondition |
---|
13 | from allmydata.util.hashutil import timing_safe_compare |
---|
14 | from allmydata.storage.lease import LeaseInfo |
---|
15 | from allmydata.storage.common import UnknownMutableContainerVersionError, \ |
---|
16 | DataTooLargeError |
---|
17 | from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE |
---|
18 | from .mutable_schema import ( |
---|
19 | NEWEST_SCHEMA_VERSION, |
---|
20 | schema_from_header, |
---|
21 | ) |
---|
22 | |
---|
23 | # the MutableShareFile is like the ShareFile, but used for mutable data. It |
---|
24 | # has a different layout. See docs/mutable.txt for more details. |
---|
25 | |
---|
26 | # # offset size name |
---|
27 | # 1 0 32 magic verstr "tahoe mutable container v1" plus binary |
---|
28 | # 2 32 20 write enabler's nodeid |
---|
29 | # 3 52 32 write enabler |
---|
30 | # 4 84 8 data size (actual share data present) (a) |
---|
31 | # 5 92 8 offset of (8) count of extra leases (after data) |
---|
32 | # 6 100 368 four leases, 92 bytes each |
---|
33 | # 0 4 ownerid (0 means "no lease here") |
---|
34 | # 4 4 expiration timestamp |
---|
35 | # 8 32 renewal token |
---|
36 | # 40 32 cancel token |
---|
37 | # 72 20 nodeid which accepted the tokens |
---|
38 | # 7 468 (a) data |
---|
39 | # 8 ?? 4 count of extra leases |
---|
40 | # 9 ?? n*92 extra leases |
---|
41 | |
---|
42 | |
---|
43 | # The struct module doc says that L's are 4 bytes in size., and that Q's are |
---|
44 | # 8 bytes in size. Since compatibility depends upon this, double-check it. |
---|
45 | assert struct.calcsize(">L") == 4, struct.calcsize(">L") |
---|
46 | assert struct.calcsize(">Q") == 8, struct.calcsize(">Q") |
---|
47 | |
---|
48 | class MutableShareFile(object): |
---|
49 | |
---|
50 | sharetype = "mutable" |
---|
51 | DATA_LENGTH_OFFSET = struct.calcsize(">32s20s32s") |
---|
52 | EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8 |
---|
53 | HEADER_SIZE = struct.calcsize(">32s20s32sQQ") # doesn't include leases |
---|
54 | LEASE_SIZE = struct.calcsize(">LL32s32s20s") |
---|
55 | assert LEASE_SIZE == 92 |
---|
56 | DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE |
---|
57 | assert DATA_OFFSET == 468, DATA_OFFSET |
---|
58 | # our sharefiles share with a recognizable string, plus some random |
---|
59 | # binary data to reduce the chance that a regular text file will look |
---|
60 | # like a sharefile. |
    MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
    # TODO: decide upon a policy for max share size

    @classmethod
    def is_valid_header(cls, header):
        # type: (bytes) -> bool
        """
        Determine if the given bytes constitute a valid header for this type of
        container.

        :param header: Some bytes from the beginning of a container.

        :return: ``True`` if the bytes could belong to this container,
            ``False`` otherwise.
        """
        return schema_from_header(header) is not None

    def __init__(self, filename, parent=None, schema=NEWEST_SCHEMA_VERSION):
        self.home = filename
        if os.path.exists(self.home):
            # we don't cache anything, just check the magic
            with open(self.home, 'rb') as f:
                header = f.read(self.HEADER_SIZE)
            self._schema = schema_from_header(header)
            if self._schema is None:
                raise UnknownMutableContainerVersionError(filename, header)
        else:
            self._schema = schema
        self.parent = parent # for logging

    def log(self, *args, **kwargs):
        return self.parent.log(*args, **kwargs)

    def create(self, my_nodeid, write_enabler):
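        """
        Create the share container on disk, writing only the schema header
        (which records the write enabler and this server's nodeid). The file
        must not already exist.
        """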
        assert not os.path.exists(self.home)
        with open(self.home, 'wb') as f:
            f.write(self._schema.header(my_nodeid, write_enabler))

    def unlink(self):
        os.unlink(self.home)

    def _read_data_length(self, f):
        f.seek(self.DATA_LENGTH_OFFSET)
        (data_length,) = struct.unpack(">Q", f.read(8))
        return data_length

    def _write_data_length(self, f, data_length):
        f.seek(self.DATA_LENGTH_OFFSET)
        f.write(struct.pack(">Q", data_length))

    def _read_share_data(self, f, offset, length):
        precondition(offset >= 0)
        precondition(length >= 0)
        data_length = self._read_data_length(f)
        if offset+length > data_length:
            # reads beyond the end of the data are truncated. Reads that
            # start beyond the end of the data return an empty string.
            length = max(0, data_length-offset)
        if length == 0:
            return b""
        precondition(offset+length <= data_length)
        f.seek(self.DATA_OFFSET+offset)
        data = f.read(length)
        return data

    def _read_extra_lease_offset(self, f):
        f.seek(self.EXTRA_LEASE_OFFSET)
        (extra_lease_offset,) = struct.unpack(">Q", f.read(8))
        return extra_lease_offset

    def _write_extra_lease_offset(self, f, offset):
        f.seek(self.EXTRA_LEASE_OFFSET)
        f.write(struct.pack(">Q", offset))

    def _read_num_extra_leases(self, f):
        offset = self._read_extra_lease_offset(f)
        f.seek(offset)
        (num_extra_leases,) = struct.unpack(">L", f.read(4))
        return num_extra_leases

    def _write_num_extra_leases(self, f, num_leases):
        extra_lease_offset = self._read_extra_lease_offset(f)
        f.seek(extra_lease_offset)
        f.write(struct.pack(">L", num_leases))

    def _change_container_size(self, f, new_container_size):
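        """
        Enlarge the container so it can hold ``new_container_size`` bytes of
        share data, relocating the extra-lease block that follows the data.
        Raises DataTooLargeError if the new size exceeds MAX_SIZE; shrinking
        is not yet supported, so a smaller size is a no-op.
        """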
        if new_container_size > self.MAX_SIZE:
            raise DataTooLargeError()
        old_extra_lease_offset = self._read_extra_lease_offset(f)
        new_extra_lease_offset = self.DATA_OFFSET + new_container_size
        if new_extra_lease_offset < old_extra_lease_offset:
            # TODO: allow containers to shrink. For now they remain large.
            return
        num_extra_leases = self._read_num_extra_leases(f)
        f.seek(old_extra_lease_offset)
        leases_size = 4 + num_extra_leases * self.LEASE_SIZE
        extra_lease_data = f.read(leases_size)

        # Zero out the old lease info (in order to minimize the chance that
        # it could accidentally be exposed to a reader later, re #1528).
        f.seek(old_extra_lease_offset)
        f.write(b'\x00' * leases_size)
        f.flush()

        # An interrupt here will corrupt the leases.

        f.seek(new_extra_lease_offset)
        f.write(extra_lease_data)
        self._write_extra_lease_offset(f, new_extra_lease_offset)

    def _write_share_data(self, f, offset, data):
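        """
        Write ``data`` into the share at ``offset`` (relative to the start of
        the share data). If the write extends past the current end of the
        data, the container is enlarged as needed, any gap below ``offset``
        is zero-filled, and the recorded data length is updated.
        """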
        length = len(data)
        precondition(offset >= 0)
        data_length = self._read_data_length(f)
        extra_lease_offset = self._read_extra_lease_offset(f)

        if offset+length >= data_length:
            # They are expanding their data size.

            if self.DATA_OFFSET+offset+length > extra_lease_offset:
                # TODO: allow containers to shrink. For now, they remain
                # large.

                # Their new data won't fit in the current container, so we
                # have to move the leases. With luck, they're expanding it
                # more than the size of the extra lease block, which will
                # minimize the corrupt-the-share window
                self._change_container_size(f, offset+length)
                extra_lease_offset = self._read_extra_lease_offset(f)

                # an interrupt here is ok.. the container has been enlarged
                # but the data remains untouched

            assert self.DATA_OFFSET+offset+length <= extra_lease_offset
            # Their data now fits in the current container. We must write
            # their new data and modify the recorded data size.

            # Fill any newly exposed empty space with 0's.
            if offset > data_length:
                f.seek(self.DATA_OFFSET+data_length)
                f.write(b'\x00'*(offset - data_length))
                f.flush()

            new_data_length = offset+length
            self._write_data_length(f, new_data_length)
            # an interrupt here will result in a corrupted share

        # now all that's left to do is write out their data
        f.seek(self.DATA_OFFSET+offset)
        f.write(data)
        return

    def _write_lease_record(self, f, lease_number, lease_info):
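        # Write one lease record into its slot. Slots 0..3 live in the fixed
        # header area; higher-numbered slots live in the extra-lease block
        # after the share data, which grows by one record if necessary.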
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            # must add an extra lease record
            self._write_num_extra_leases(f, num_extra_leases+1)
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        f.seek(offset)
        assert f.tell() == offset
        f.write(self._schema.lease_serializer.serialize(lease_info))

    def _read_lease_record(self, f, lease_number):
        # returns a LeaseInfo instance, or None
        extra_lease_offset = self._read_extra_lease_offset(f)
        num_extra_leases = self._read_num_extra_leases(f)
        if lease_number < 4:
            offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
        elif (lease_number-4) < num_extra_leases:
            offset = (extra_lease_offset
                      + 4
                      + (lease_number-4)*self.LEASE_SIZE)
        else:
            raise IndexError("No such lease number %d" % lease_number)
        f.seek(offset)
        assert f.tell() == offset
        data = f.read(self.LEASE_SIZE)
        lease_info = self._schema.lease_serializer.unserialize(data)
        if lease_info.owner_num == 0:
            return None
        return lease_info

    def _get_num_lease_slots(self, f):
        # how many places do we have allocated for leases? Not all of them
        # are filled.
        num_extra_leases = self._read_num_extra_leases(f)
        return 4+num_extra_leases

    def _get_first_empty_lease_slot(self, f):
        # return an int with the index of an empty slot, or None if we do not
        # currently have an empty slot

        for i in range(self._get_num_lease_slots(f)):
            if self._read_lease_record(f, i) is None:
                return i
        return None

    def get_leases(self):
        """Yields a LeaseInfo instance for all leases."""
        with open(self.home, 'rb') as f:
            for i, lease in self._enumerate_leases(f):
                yield lease

    def _enumerate_leases(self, f):
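        """Yields (slot number, LeaseInfo) for each occupied lease slot."""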
        for i in range(self._get_num_lease_slots(f)):
            try:
                data = self._read_lease_record(f, i)
                if data is not None:
                    yield i,data
            except IndexError:
                return

    def add_lease(self, available_space, lease_info):
        """
        Add a new lease to this share.

        :param int available_space: The maximum number of bytes of storage to
            commit in this operation. If more than this number of bytes is
            required, raise ``NoSpace`` instead.

        :raise NoSpace: If more than ``available_space`` bytes is required to
            complete the operation. In this case, no lease is added.

        :return: ``None``
        """
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        with open(self.home, 'rb+') as f:
            num_lease_slots = self._get_num_lease_slots(f)
            empty_slot = self._get_first_empty_lease_slot(f)
            if empty_slot is not None:
                self._write_lease_record(f, empty_slot, lease_info)
            else:
                if lease_info.mutable_size() > available_space:
                    raise NoSpace()
                self._write_lease_record(f, num_lease_slots, lease_info)

    def renew_lease(self, renew_secret, new_expire_time, allow_backdate=False):
        # type: (bytes, int, bool) -> None
        """
        Update the expiration time on an existing lease.

        :param allow_backdate: If ``True`` then allow the new expiration time
            to be before the current expiration time. Otherwise, make no
            change when this is the case.

        :raise IndexError: If there is no lease matching the given renew
            secret.
        """
        accepting_nodeids = set()
        with open(self.home, 'rb+') as f:
            for (leasenum,lease) in self._enumerate_leases(f):
                if lease.is_renew_secret(renew_secret):
                    # yup. See if we need to update the owner time.
                    if allow_backdate or new_expire_time > lease.get_expiration_time():
                        # yes
                        lease = lease.renew(new_expire_time)
                        self._write_lease_record(f, leasenum, lease)
                    return
                accepting_nodeids.add(lease.nodeid)
        # Return the accepting_nodeids set, to give the client a chance to
        # update the leases on a share which has been migrated from its
        # original server to a new one.
        msg = ("Unable to renew non-existent lease. I have leases accepted by"
               " nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def add_or_renew_lease(self, available_space, lease_info):
        precondition(lease_info.owner_num != 0) # 0 means "no lease here"
        try:
            self.renew_lease(lease_info.renew_secret,
                             lease_info.get_expiration_time())
        except IndexError:
            self.add_lease(available_space, lease_info)

    def cancel_lease(self, cancel_secret):
        """Remove any leases with the given cancel_secret. If the last lease
        is cancelled, the file will be removed. Return the number of bytes
        that were freed (by truncating the list of leases, and possibly by
        deleting the file). Raise IndexError if there was no lease with the
        given cancel_secret."""

        accepting_nodeids = set()
        modified = 0
        remaining = 0
        blank_lease = LeaseInfo(owner_num=0,
                                renew_secret=b"\x00"*32,
                                cancel_secret=b"\x00"*32,
                                expiration_time=0,
                                nodeid=b"\x00"*20)
        with open(self.home, 'rb+') as f:
            for (leasenum,lease) in self._enumerate_leases(f):
                accepting_nodeids.add(lease.nodeid)
                if lease.is_cancel_secret(cancel_secret):
                    self._write_lease_record(f, leasenum, blank_lease)
                    modified += 1
                else:
                    remaining += 1
            if modified:
                freed_space = self._pack_leases(f)
                f.close()
                if not remaining:
                    freed_space += os.stat(self.home)[stat.ST_SIZE]
                    self.unlink()
                return freed_space

        msg = ("Unable to cancel non-existent lease. I have leases "
               "accepted by nodeids: ")
        msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
                         for anid in accepting_nodeids])
        msg += " ."
        raise IndexError(msg)

    def _pack_leases(self, f):
        # TODO: reclaim space from cancelled leases
        return 0

    def _read_write_enabler_and_nodeid(self, f):
        f.seek(0)
        data = f.read(self.HEADER_SIZE)
        (magic,
         write_enabler_nodeid, write_enabler,
         data_length, extra_lease_offset) = \
         struct.unpack(">32s20s32sQQ", data)
        assert self.is_valid_header(data)
        return (write_enabler, write_enabler_nodeid)

    def readv(self, readv):
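        """
        Read each (offset, length) range in ``readv`` from the share data and
        return the resulting list of byte strings.
        """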
        datav = []
        with open(self.home, 'rb') as f:
            for (offset, length) in readv:
                datav.append(self._read_share_data(f, offset, length))
        return datav

    def get_length(self):
        """
        Return the length of the data in the share.
        """
        with open(self.home, 'rb') as f:
            return self._read_data_length(f)

    def check_write_enabler(self, write_enabler, si_s):
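        """
        Verify ``write_enabler`` against the one recorded in this share using
        a timing-safe comparison. On mismatch, log the event and raise
        BadWriteEnablerError naming the nodeid that recorded the original
        enabler.
        """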
        with open(self.home, 'rb+') as f:
            (real_write_enabler, write_enabler_nodeid) = \
                self._read_write_enabler_and_nodeid(f)
        # avoid a timing attack
        #if write_enabler != real_write_enabler:
        if not timing_safe_compare(write_enabler, real_write_enabler):
            # accommodate share migration by reporting the nodeid used for the
            # old write enabler.
            self.log(format="bad write enabler on SI %(si)s,"
                     " recorded by nodeid %(nodeid)s",
                     facility="tahoe.storage",
                     level=log.WEIRD, umid="cE1eBQ",
                     si=si_s, nodeid=idlib.nodeid_b2a(write_enabler_nodeid))
            msg = "The write enabler was recorded by nodeid '%s'." % \
                  (idlib.nodeid_b2a(write_enabler_nodeid),)
            raise BadWriteEnablerError(msg)

    def check_testv(self, testv):
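        """
        Return True if every (offset, length, operator, specimen) test vector
        matches the current share data, False otherwise.
        """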
        test_good = True
        with open(self.home, 'rb+') as f:
            for (offset, length, operator, specimen) in testv:
                data = self._read_share_data(f, offset, length)
                if not testv_compare(data, operator, specimen):
                    test_good = False
                    break
        return test_good

    def writev(self, datav, new_length):
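        """
        Apply each (offset, data) write in ``datav`` to the share, then, if
        ``new_length`` is given and smaller than the current data length,
        reduce the recorded data length to ``new_length``.
        """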
        with open(self.home, 'rb+') as f:
            for (offset, data) in datav:
                self._write_share_data(f, offset, data)
            if new_length is not None:
                cur_length = self._read_data_length(f)
                if new_length < cur_length:
                    self._write_data_length(f, new_length)
                    # TODO: if we're going to shrink the share file when the
                    # share data has shrunk, then call
                    # self._change_container_size() here.

def testv_compare(a, op, b):
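    """Compare ``a`` against specimen ``b``; only the b"eq" operator is supported."""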
    assert op == b"eq"
    return a == b


class EmptyShare(object):
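    """
    Stands in for a share that has no data; test vectors are evaluated
    against an empty byte string.
    """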

    def check_testv(self, testv):
        test_good = True
        for (offset, length, operator, specimen) in testv:
            data = b""
            if not testv_compare(data, operator, specimen):
                test_good = False
                break
        return test_good

def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
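    """
    Create a new mutable share file on disk and return a fresh
    MutableShareFile wrapper for it.
    """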
    ms = MutableShareFile(filename, parent)
    ms.create(my_nodeid, write_enabler)
    del ms
    return MutableShareFile(filename, parent)