1 | """Directory Node implementation. |
---|
2 | |
---|
3 | Ported to Python 3. |
---|
4 | """ |
---|
5 | |
---|
6 | import time |
---|
7 | |
---|
8 | from zope.interface import implementer |
---|
9 | from twisted.internet import defer |
---|
10 | from foolscap.api import fireEventually |
---|
11 | |
---|
12 | from allmydata.crypto import aes |
---|
13 | from allmydata.deep_stats import DeepStats |
---|
14 | from allmydata.mutable.common import NotWriteableError |
---|
15 | from allmydata.mutable.filenode import MutableFileNode |
---|
16 | from allmydata.unknown import UnknownNode, strip_prefix_for_ro |
---|
17 | from allmydata.interfaces import IFilesystemNode, IDirectoryNode, IFileNode, \ |
---|
18 | ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable, \ |
---|
19 | MustBeDeepImmutableError, CapConstraintError, ChildOfWrongTypeError |
---|
20 | from allmydata.check_results import DeepCheckResults, \ |
---|
21 | DeepCheckAndRepairResults |
---|
22 | from allmydata.monitor import Monitor |
---|
23 | from allmydata.util import hashutil, base32, log, jsonbytes as json |
---|
24 | from allmydata.util.encodingutil import quote_output, normalize |
---|
25 | from allmydata.util.assertutil import precondition |
---|
26 | from allmydata.util.netstring import netstring, split_netstring |
---|
27 | from allmydata.util.consumer import download_to_data |
---|
28 | from allmydata.uri import wrap_dirnode_cap |
---|
29 | from allmydata.util.dictutil import AuxValueDict |
---|
30 | |
---|
31 | from eliot import ( |
---|
32 | ActionType, |
---|
33 | Field, |
---|
34 | ) |
---|
35 | from eliot.twisted import ( |
---|
36 | DeferredContext, |
---|
37 | ) |
---|
38 | |
---|
# eliot structured-logging declarations used by DirectoryNode.add_file below.
# Each Field declares a typed key that may appear in the action's start
# message; ADD_FILE is the action type wrapped around the upload+link step.
NAME = Field.for_types(
    "name",
    [str],
    "The name linking the parent to this node.",
)

METADATA = Field.for_types(
    "metadata",
    [dict],
    "Data about a node.",
)

OVERWRITE = Field.for_types(
    "overwrite",
    [bool],
    "True to replace an existing file of the same name, "
    "false to fail with a collision error.",
)

ADD_FILE = ActionType(
    "dirnode:add-file",
    [NAME, METADATA, OVERWRITE],
    [],
    "Add a new file as a child of a directory.",
)
---|
64 | |
---|
65 | |
---|
class _OnlyFiles(object):
    """Marker for replacement option of only replacing files."""

# Sentinel accepted by Adder/set_node/move_child_to as an 'overwrite' value:
# an existing file child may be replaced, but an existing directory child
# raises ExistingChildError.
ONLY_FILES = _OnlyFiles()
---|
70 | |
---|
71 | |
---|
def update_metadata(metadata, new_metadata, now):
    """Merge 'new_metadata' into 'metadata' in-place and refresh timestamps.

    If 'new_metadata' is given it replaces all user metadata, except the
    'tahoe' sub-dict, which is always owned by this layer.  The
    'tahoe:linkmotime' timestamp is set to 'now'; 'tahoe:linkcrtime' is
    preserved, falling back to a legacy 'ctime' value or 'now'.
    Returns the resulting metadata dict.
    """
    if metadata is None:
        metadata = {}

    # Remember the legacy creation time, if any, before replacement.
    old_ctime = metadata.get('ctime')

    if new_metadata is not None:
        # Replace all metadata, but never let callers supply 'tahoe'.
        replacement = new_metadata.copy()
        replacement.pop('tahoe', None)
        if 'tahoe' in metadata:
            replacement['tahoe'] = metadata['tahoe']
        metadata = replacement

    # Refresh the system-owned timestamps.
    sysmd = metadata.get('tahoe', {})
    if 'linkcrtime' not in sysmd:
        # In Tahoe < 1.4.0 we used the word 'ctime' to mean what Tahoe >= 1.4.0
        # calls 'linkcrtime'. This field is only used if it was in the old
        # metadata, and 'tahoe:linkcrtime' was not.
        sysmd['linkcrtime'] = old_ctime if old_ctime is not None else now

    sysmd['linkmotime'] = now
    metadata['tahoe'] = sysmd

    return metadata
---|
112 | |
---|
113 | |
---|
114 | # TODO: {Deleter,MetadataSetter,Adder}.modify all start by unpacking the |
---|
115 | # contents and end by repacking them. It might be better to apply them to |
---|
116 | # the unpacked contents. |
---|
117 | |
---|
class Deleter(object):
    """Modifier that removes a single named child from a directory node.

    After modify() runs, the removed child node is available as
    'self.old_child' (None if the child was absent and must_exist was
    False).
    """
    def __init__(self, node, namex, must_exist=True, must_be_directory=False, must_be_file=False):
        self.node = node
        self.name = normalize(namex)
        self.must_exist = must_exist
        self.must_be_directory = must_be_directory
        self.must_be_file = must_be_file

    def modify(self, old_contents, servermap, first_time):
        """Return repacked contents without self.name, or None for a no-op."""
        children = self.node._unpack_contents(old_contents)
        name = self.name
        if name not in children:
            if first_time and self.must_exist:
                raise NoSuchChildError(name)
            self.old_child = None
            return None
        child, _metadata = children[name]
        self.old_child = child

        # Unknown children can be removed regardless of must_be_directory
        # or must_be_file.
        if self.must_be_directory and IFileNode.providedBy(child):
            raise ChildOfWrongTypeError("delete required a directory, not a file")
        if self.must_be_file and IDirectoryNode.providedBy(child):
            raise ChildOfWrongTypeError("delete required a file, not a directory")

        del children[name]
        return self.node._pack_contents(children)
---|
144 | |
---|
145 | |
---|
class MetadataSetter(object):
    """Modifier that replaces the metadata of one existing child."""
    def __init__(self, node, namex, metadata, create_readonly_node=None):
        self.node = node
        self.name = normalize(namex)
        self.metadata = metadata
        self.create_readonly_node = create_readonly_node

    def modify(self, old_contents, servermap, first_time):
        """Return repacked contents with self.name's metadata updated."""
        children = self.node._unpack_contents(old_contents)
        name = self.name
        if name not in children:
            raise NoSuchChildError(name)

        child, old_metadata = children[name]
        merged = update_metadata(old_metadata.copy(), self.metadata, time.time())
        # Honor a 'no-write' marker by downgrading the child to read-only.
        if self.create_readonly_node and merged.get('no-write', False):
            child = self.create_readonly_node(child, name)

        children[name] = (child, merged)
        return self.node._pack_contents(children)
---|
169 | |
---|
170 | |
---|
class Adder(object):
    """Modifier that inserts (and possibly overwrites) child entries."""
    def __init__(self, node, entries=None, overwrite=True, create_readonly_node=None):
        """
        :param overwrite: Either True (allow overwriting anything existing),
            False (don't allow overwriting), or ONLY_FILES (only files can be
            overwritten).
        """
        self.node = node
        if entries is None:
            entries = {}
        precondition(isinstance(entries, dict), entries)
        precondition(overwrite in (True, False, ONLY_FILES), overwrite)
        # keys of 'entries' may not be normalized.
        self.entries = entries
        self.overwrite = overwrite
        self.create_readonly_node = create_readonly_node

    def set_node(self, namex, node, metadata):
        """Queue (node, metadata) for insertion under the (unnormalized) name."""
        precondition(IFilesystemNode.providedBy(node), node)
        self.entries[namex] = (node, metadata)

    def modify(self, old_contents, servermap, first_time):
        """Return repacked contents with every queued entry applied."""
        children = self.node._unpack_contents(old_contents)
        now = time.time()
        for namex, (child, new_metadata) in list(self.entries.items()):
            name = normalize(namex)
            precondition(IFilesystemNode.providedBy(child), child)

            # Strictly speaking this is redundant because we would raise the
            # error again in _pack_normalized_children.
            child.raise_error()

            metadata = None
            if name in children:
                # Collision handling depends on the overwrite mode.
                if not self.overwrite:
                    raise ExistingChildError("child %s already exists" % quote_output(name, encoding='utf-8'))
                if self.overwrite == ONLY_FILES and IDirectoryNode.providedBy(children[name][0]):
                    raise ExistingChildError("child %s already exists as a directory" % quote_output(name, encoding='utf-8'))
                metadata = children[name][1].copy()

            merged = update_metadata(metadata, new_metadata, now)
            # Honor a 'no-write' marker by downgrading the child to read-only.
            if self.create_readonly_node and merged.get('no-write', False):
                child = self.create_readonly_node(child, name)

            children[name] = (child, merged)
        return self.node._pack_contents(children)
---|
219 | |
---|
def _encrypt_rw_uri(writekey, rw_uri):
    """Superencrypt a child's write-cap with a key derived from 'writekey'.

    Returns salt + AES ciphertext + 32-byte HMAC.  The MAC is not checked
    by readers in Tahoe >= 1.3.0, but we still produce it for the sake of
    older readers.
    """
    precondition(isinstance(rw_uri, bytes), rw_uri)
    precondition(isinstance(writekey, bytes), writekey)

    salt = hashutil.mutable_rwcap_salt_hash(rw_uri)
    key = hashutil.mutable_rwcap_key_hash(salt, writekey)
    crypttext = aes.encrypt_data(aes.create_encryptor(key), rw_uri)
    mac = hashutil.hmac(key, salt + crypttext)
    assert len(mac) == 32
    return salt + crypttext + mac
---|
233 | |
---|
def pack_children(childrenx, writekey, deep_immutable=False):
    """Normalize child names, then serialize the directory contents.

    Every child must carry a metadata dict ({} rather than None).
    """
    normalized = {}
    for namex, (node, metadata) in childrenx.items():
        precondition(isinstance(metadata, dict),
                     "directory creation requires metadata to be a dict, not None", metadata)
        normalized[normalize(namex)] = (node, metadata)

    return _pack_normalized_children(normalized, writekey=writekey,
                                     deep_immutable=deep_immutable)
---|
243 | |
---|
244 | |
---|
# Pre-computed netstring of the empty byte string, used as the writecap
# slot when no writekey is available.
ZERO_LEN_NETSTR=netstring(b'')
def _pack_normalized_children(children, writekey, deep_immutable=False):
    """Take a dict that maps:
         children[unicode_nfc_name] = (IFileSystemNode, metadata_dict)
    and pack it into a single string, for use as the contents of the backing
    file. This is the same format as is returned by _unpack_contents. I also
    accept an AuxValueDict, in which case I'll use the auxilliary cached data
    as the pre-packed entry, which is faster than re-packing everything each
    time.

    If writekey is provided then I will superencrypt the child's writecap with
    writekey.

    If deep_immutable is True, I will require that all my children are deeply
    immutable, and will raise a MustBeDeepImmutableError if not.
    """
    precondition((writekey is None) or isinstance(writekey, bytes), writekey)

    # An AuxValueDict carries the original packed entry for each child, so
    # unchanged children can be re-emitted without re-serializing.
    has_aux = isinstance(children, AuxValueDict)
    entries = []
    # Sort by name so the packed form is deterministic for a given dict.
    for name in sorted(children.keys()):
        assert isinstance(name, str)
        entry = None
        (child, metadata) = children[name]
        child.raise_error()
        if deep_immutable and not child.is_allowed_in_immutable_directory():
            raise MustBeDeepImmutableError(
                "child %r is not allowed in an immutable directory" % (name,),
                name)
        if has_aux:
            # Reuse the cached pre-packed entry if one exists.
            entry = children.get_aux(name)
        if not entry:
            assert IFilesystemNode.providedBy(child), (name,child)
            assert isinstance(metadata, dict)
            rw_uri = child.get_write_uri()
            if rw_uri is None:
                rw_uri = b""
            assert isinstance(rw_uri, bytes), rw_uri

            # should be prevented by MustBeDeepImmutableError check above
            assert not (rw_uri and deep_immutable)

            ro_uri = child.get_readonly_uri()
            if ro_uri is None:
                ro_uri = b""
            assert isinstance(ro_uri, bytes), ro_uri
            if writekey is not None:
                writecap = netstring(_encrypt_rw_uri(writekey, rw_uri))
            else:
                writecap = ZERO_LEN_NETSTR
            # Entry layout: four netstrings — name, ro_uri, encrypted
            # writecap, JSON metadata — matching _unpack_contents.
            entry = b"".join([netstring(name.encode("utf-8")),
                              netstring(strip_prefix_for_ro(ro_uri, deep_immutable)),
                              writecap,
                              netstring(json.dumps(metadata).encode("utf-8"))])
        entries.append(netstring(entry))
    return b"".join(entries)
---|
301 | |
---|
302 | @implementer(IDirectoryNode, ICheckable, IDeepCheckable) |
---|
303 | class DirectoryNode(object): |
---|
304 | filenode_class = MutableFileNode |
---|
305 | |
---|
    def __init__(self, filenode, nodemaker, uploader):
        """Wrap 'filenode' (the backing file node) as a directory node.

        'nodemaker' is used to create child nodes from caps; 'uploader'
        is used by add_file.
        """
        assert IFileNode.providedBy(filenode), filenode
        assert not IDirectoryNode.providedBy(filenode), filenode
        self._node = filenode
        filenode_cap = filenode.get_cap()
        # Wrap the backing file's cap in the corresponding directory cap.
        self._uri = wrap_dirnode_cap(filenode_cap)
        self._nodemaker = nodemaker
        self._uploader = uploader
---|
314 | |
---|
    def __repr__(self):
        # NOTE(review): if __repr__ runs before __init__ has set _uri (e.g.
        # during a failed construction), the hasattr guard makes the last
        # field render as 'False' rather than raising.
        return "<%s %s-%s %s>" % (self.__class__.__name__,
                                  self.is_readonly() and "RO" or "RW",
                                  self.is_mutable() and "MUT" or "IMM",
                                  hasattr(self, '_uri') and str(self._uri.abbrev(), "utf-8"))
---|
320 | |
---|
    def get_size(self):
        """Return the size of our backing mutable file, in bytes, if we've
        fetched it. Otherwise return None. This returns synchronously."""
        return self._node.get_size()
---|
325 | |
---|
    def get_current_size(self):
        """Calculate the size of our backing mutable file, in bytes. Returns
        a Deferred that fires with the result."""
        return self._node.get_current_size()
---|
330 | |
---|
331 | def _read(self): |
---|
332 | if self._node.is_mutable(): |
---|
333 | # use the IMutableFileNode API. |
---|
334 | d = self._node.download_best_version() |
---|
335 | else: |
---|
336 | d = download_to_data(self._node) |
---|
337 | d.addCallback(self._unpack_contents) |
---|
338 | return d |
---|
339 | |
---|
340 | def _decrypt_rwcapdata(self, encwrcap): |
---|
341 | salt = encwrcap[:16] |
---|
342 | crypttext = encwrcap[16:-32] |
---|
343 | key = hashutil.mutable_rwcap_key_hash(salt, self._node.get_writekey()) |
---|
344 | encryptor = aes.create_decryptor(key) |
---|
345 | plaintext = aes.decrypt_data(encryptor, crypttext) |
---|
346 | return plaintext |
---|
347 | |
---|
348 | def _create_and_validate_node(self, rw_uri, ro_uri, name): |
---|
349 | # name is just for error reporting |
---|
350 | node = self._nodemaker.create_from_cap(rw_uri, ro_uri, |
---|
351 | deep_immutable=not self.is_mutable(), |
---|
352 | name=name) |
---|
353 | node.raise_error() |
---|
354 | return node |
---|
355 | |
---|
356 | def _create_readonly_node(self, node, name): |
---|
357 | # name is just for error reporting |
---|
358 | if not node.is_unknown() and node.is_readonly(): |
---|
359 | return node |
---|
360 | return self._create_and_validate_node(None, node.get_readonly_uri(), name=name) |
---|
361 | |
---|
362 | def _unpack_contents(self, data): |
---|
363 | # the directory is serialized as a list of netstrings, one per child. |
---|
364 | # Each child is serialized as a list of four netstrings: (name, ro_uri, |
---|
365 | # rwcapdata, metadata), in which the name, ro_uri, metadata are in |
---|
366 | # cleartext. The 'name' is UTF-8 encoded, and should be normalized to NFC. |
---|
367 | # The rwcapdata is formatted as: |
---|
368 | # pack("16ss32s", iv, AES(H(writekey+iv), plaintext_rw_uri), mac) |
---|
369 | assert isinstance(data, bytes), (repr(data), type(data)) |
---|
370 | # an empty directory is serialized as an empty string |
---|
371 | if data == b"": |
---|
372 | return AuxValueDict() |
---|
373 | writeable = not self.is_readonly() |
---|
374 | mutable = self.is_mutable() |
---|
375 | children = AuxValueDict() |
---|
376 | position = 0 |
---|
377 | while position < len(data): |
---|
378 | entries, position = split_netstring(data, 1, position) |
---|
379 | entry = entries[0] |
---|
380 | (namex_utf8, ro_uri, rwcapdata, metadata_s), subpos = split_netstring(entry, 4) |
---|
381 | if not mutable and len(rwcapdata) > 0: |
---|
382 | raise ValueError("the rwcapdata field of a dirnode in an immutable directory was not empty") |
---|
383 | |
---|
384 | # A name containing characters that are unassigned in one version of Unicode might |
---|
385 | # not be normalized wrt a later version. See the note in section 'Normalization Stability' |
---|
386 | # at <http://unicode.org/policies/stability_policy.html>. |
---|
387 | # Therefore we normalize names going both in and out of directories. |
---|
388 | name = normalize(namex_utf8.decode("utf-8")) |
---|
389 | |
---|
390 | rw_uri = b"" |
---|
391 | if writeable: |
---|
392 | rw_uri = self._decrypt_rwcapdata(rwcapdata) |
---|
393 | |
---|
394 | # Since the encryption uses CTR mode, it currently leaks the length of the |
---|
395 | # plaintext rw_uri -- and therefore whether it is present, i.e. whether the |
---|
396 | # dirnode is writeable (ticket #925). By stripping trailing spaces in |
---|
397 | # Tahoe >= 1.6.0, we may make it easier for future versions to plug this leak. |
---|
398 | # ro_uri is treated in the same way for consistency. |
---|
399 | # rw_uri and ro_uri will be either None or a non-empty string. |
---|
400 | |
---|
401 | rw_uri = rw_uri.rstrip(b' ') or None |
---|
402 | ro_uri = ro_uri.rstrip(b' ') or None |
---|
403 | |
---|
404 | try: |
---|
405 | child = self._create_and_validate_node(rw_uri, ro_uri, name) |
---|
406 | if mutable or child.is_allowed_in_immutable_directory(): |
---|
407 | metadata = json.loads(metadata_s) |
---|
408 | assert isinstance(metadata, dict) |
---|
409 | children[name] = (child, metadata) |
---|
410 | children.set_with_aux(name, (child, metadata), auxilliary=entry) |
---|
411 | else: |
---|
412 | log.msg(format="mutable cap for child %(name)s unpacked from an immutable directory", |
---|
413 | name=quote_output(name, encoding='utf-8'), |
---|
414 | facility="tahoe.webish", level=log.UNUSUAL) |
---|
415 | except CapConstraintError as e: |
---|
416 | log.msg(format="unmet constraint on cap for child %(name)s unpacked from a directory:\n" |
---|
417 | "%(message)s", message=e.args[0], name=quote_output(name, encoding='utf-8'), |
---|
418 | facility="tahoe.webish", level=log.UNUSUAL) |
---|
419 | |
---|
420 | return children |
---|
421 | |
---|
    def _pack_contents(self, children):
        """Serialize 'children' (the format returned by _unpack_contents),
        encrypting writecaps with our writekey."""
        # expects children in the same format as _unpack_contents returns
        return _pack_normalized_children(children, self._node.get_writekey())
---|
425 | |
---|
    def is_readonly(self):
        """True if we hold no write capability for the backing file."""
        return self._node.is_readonly()

    def is_mutable(self):
        """True if the backing file is mutable."""
        return self._node.is_mutable()

    def is_unknown(self):
        """A DirectoryNode is never an unknown node."""
        return False

    def is_allowed_in_immutable_directory(self):
        """Only immutable-backed directories may live inside immutable dirs."""
        return not self._node.is_mutable()

    def raise_error(self):
        """No-op: a constructed DirectoryNode carries no deferred error."""
        pass

    def get_uri(self):
        """Return our directory cap as a string (write cap if we have one)."""
        return self._uri.to_string()

    def get_write_uri(self):
        """Return our write cap string, or None if we are read-only."""
        if self.is_readonly():
            return None
        return self._uri.to_string()

    def get_readonly_uri(self):
        """Return the read-only form of our directory cap, as a string."""
        return self._uri.get_readonly().to_string()

    def get_cap(self):
        """Return our directory cap object."""
        return self._uri

    def get_readcap(self):
        """Return the read-only form of our directory cap object."""
        return self._uri.get_readonly()

    def get_verify_cap(self):
        """Return the verify cap for our directory."""
        return self._uri.get_verify_cap()

    def get_repair_cap(self):
        """Return a cap sufficient to repair this directory, or None."""
        if self._node.is_readonly():
            return None # readonly (mutable) dirnodes are not yet repairable
        return self._uri

    def get_storage_index(self):
        """Return the storage index of the backing file."""
        return self._uri.get_storage_index()
---|
468 | |
---|
    def check(self, monitor, verify=False, add_lease=False):
        """Perform a file check. See IChecker.check for details."""
        return self._node.check(monitor, verify, add_lease)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Check the backing file and repair it if damaged; delegates to
        the backing node."""
        return self._node.check_and_repair(monitor, verify, add_lease)
---|
474 | |
---|
    def list(self):
        """I return a Deferred that fires with a dictionary mapping child
        name to a tuple of (IFilesystemNode, metadata)."""
        return self._read()
---|
479 | |
---|
480 | def has_child(self, namex): |
---|
481 | """I return a Deferred that fires with a boolean, True if there |
---|
482 | exists a child of the given name, False if not.""" |
---|
483 | name = normalize(namex) |
---|
484 | d = self._read() |
---|
485 | d.addCallback(lambda children: name in children) |
---|
486 | return d |
---|
487 | |
---|
488 | def _get(self, children, name): |
---|
489 | child = children.get(name) |
---|
490 | if child is None: |
---|
491 | raise NoSuchChildError(name) |
---|
492 | return child[0] |
---|
493 | |
---|
494 | def _get_with_metadata(self, children, name): |
---|
495 | child = children.get(name) |
---|
496 | if child is None: |
---|
497 | raise NoSuchChildError(name) |
---|
498 | return child |
---|
499 | |
---|
500 | def get(self, namex): |
---|
501 | """I return a Deferred that fires with the named child node, |
---|
502 | which is an IFilesystemNode.""" |
---|
503 | name = normalize(namex) |
---|
504 | d = self._read() |
---|
505 | d.addCallback(self._get, name) |
---|
506 | return d |
---|
507 | |
---|
508 | def get_child_and_metadata(self, namex): |
---|
509 | """I return a Deferred that fires with the (node, metadata) pair for |
---|
510 | the named child. The node is an IFilesystemNode, and the metadata |
---|
511 | is a dictionary.""" |
---|
512 | name = normalize(namex) |
---|
513 | d = self._read() |
---|
514 | d.addCallback(self._get_with_metadata, name) |
---|
515 | return d |
---|
516 | |
---|
517 | def get_metadata_for(self, namex): |
---|
518 | name = normalize(namex) |
---|
519 | d = self._read() |
---|
520 | d.addCallback(lambda children: children[name][1]) |
---|
521 | return d |
---|
522 | |
---|
523 | def set_metadata_for(self, namex, metadata): |
---|
524 | name = normalize(namex) |
---|
525 | if self.is_readonly(): |
---|
526 | return defer.fail(NotWriteableError()) |
---|
527 | assert isinstance(metadata, dict) |
---|
528 | s = MetadataSetter(self, name, metadata, |
---|
529 | create_readonly_node=self._create_readonly_node) |
---|
530 | d = self._node.modify(s.modify) |
---|
531 | d.addCallback(lambda res: self) |
---|
532 | return d |
---|
533 | |
---|
534 | def get_child_at_path(self, pathx): |
---|
535 | """Transform a child path into an IFilesystemNode. |
---|
536 | |
---|
537 | I perform a recursive series of 'get' operations to find the named |
---|
538 | descendant node. I return a Deferred that fires with the node, or |
---|
539 | errbacks with IndexError if the node could not be found. |
---|
540 | |
---|
541 | The path can be either a single string (slash-separated) or a list of |
---|
542 | path-name elements. |
---|
543 | """ |
---|
544 | d = self.get_child_and_metadata_at_path(pathx) |
---|
545 | d.addCallback(lambda node_and_metadata: node_and_metadata[0]) |
---|
546 | return d |
---|
547 | |
---|
548 | def get_child_and_metadata_at_path(self, pathx): |
---|
549 | """Transform a child path into an IFilesystemNode and |
---|
550 | a metadata dictionary from the last edge that was traversed. |
---|
551 | """ |
---|
552 | |
---|
553 | if not pathx: |
---|
554 | return defer.succeed((self, {})) |
---|
555 | if isinstance(pathx, (list, tuple)): |
---|
556 | pass |
---|
557 | else: |
---|
558 | pathx = pathx.split("/") |
---|
559 | for p in pathx: |
---|
560 | assert isinstance(p, str), p |
---|
561 | childnamex = pathx[0] |
---|
562 | remaining_pathx = pathx[1:] |
---|
563 | if remaining_pathx: |
---|
564 | d = self.get(childnamex) |
---|
565 | d.addCallback(lambda node: |
---|
566 | node.get_child_and_metadata_at_path(remaining_pathx)) |
---|
567 | return d |
---|
568 | d = self.get_child_and_metadata(childnamex) |
---|
569 | return d |
---|
570 | |
---|
571 | def set_uri(self, namex, writecap, readcap=None, metadata=None, overwrite=True): |
---|
572 | precondition(isinstance(writecap, (bytes, type(None))), writecap) |
---|
573 | precondition(isinstance(readcap, (bytes, type(None))), readcap) |
---|
574 | |
---|
575 | # We now allow packing unknown nodes, provided they are valid |
---|
576 | # for this type of directory. |
---|
577 | child_node = self._create_and_validate_node(writecap, readcap, namex) |
---|
578 | d = self.set_node(namex, child_node, metadata, overwrite) |
---|
579 | d.addCallback(lambda res: child_node) |
---|
580 | return d |
---|
581 | |
---|
582 | def set_children(self, entries, overwrite=True): |
---|
583 | # this takes URIs |
---|
584 | a = Adder(self, overwrite=overwrite, |
---|
585 | create_readonly_node=self._create_readonly_node) |
---|
586 | for (namex, e) in entries.items(): |
---|
587 | assert isinstance(namex, str), namex |
---|
588 | if len(e) == 2: |
---|
589 | writecap, readcap = e |
---|
590 | metadata = None |
---|
591 | else: |
---|
592 | assert len(e) == 3 |
---|
593 | writecap, readcap, metadata = e |
---|
594 | precondition(isinstance(writecap, (bytes,type(None))), writecap) |
---|
595 | precondition(isinstance(readcap, (bytes,type(None))), readcap) |
---|
596 | |
---|
597 | # We now allow packing unknown nodes, provided they are valid |
---|
598 | # for this type of directory. |
---|
599 | child_node = self._create_and_validate_node(writecap, readcap, namex) |
---|
600 | a.set_node(namex, child_node, metadata) |
---|
601 | d = self._node.modify(a.modify) |
---|
602 | d.addCallback(lambda ign: self) |
---|
603 | return d |
---|
604 | |
---|
605 | def set_node(self, namex, child, metadata=None, overwrite=True): |
---|
606 | """I add a child at the specific name. I return a Deferred that fires |
---|
607 | when the operation finishes. This Deferred will fire with the child |
---|
608 | node that was just added. I will replace any existing child of the |
---|
609 | same name. |
---|
610 | |
---|
611 | If this directory node is read-only, the Deferred will errback with a |
---|
612 | NotWriteableError.""" |
---|
613 | |
---|
614 | precondition(IFilesystemNode.providedBy(child), child) |
---|
615 | |
---|
616 | if self.is_readonly(): |
---|
617 | return defer.fail(NotWriteableError()) |
---|
618 | assert IFilesystemNode.providedBy(child), child |
---|
619 | a = Adder(self, overwrite=overwrite, |
---|
620 | create_readonly_node=self._create_readonly_node) |
---|
621 | a.set_node(namex, child, metadata) |
---|
622 | d = self._node.modify(a.modify) |
---|
623 | d.addCallback(lambda res: child) |
---|
624 | return d |
---|
625 | |
---|
626 | def set_nodes(self, entries, overwrite=True): |
---|
627 | precondition(isinstance(entries, dict), entries) |
---|
628 | if self.is_readonly(): |
---|
629 | return defer.fail(NotWriteableError()) |
---|
630 | a = Adder(self, entries, overwrite=overwrite, |
---|
631 | create_readonly_node=self._create_readonly_node) |
---|
632 | d = self._node.modify(a.modify) |
---|
633 | d.addCallback(lambda res: self) |
---|
634 | return d |
---|
635 | |
---|
636 | |
---|
    def add_file(self, namex, uploadable, metadata=None, overwrite=True):
        """I upload a file (using the given IUploadable), then attach the
        resulting FileNode to the directory at the given name. I return a
        Deferred that fires (with the IFileNode of the uploaded file) when
        the operation completes."""
        # The whole upload-then-link sequence is wrapped in an eliot action
        # so the structured log groups its messages together.
        with ADD_FILE(name=namex, metadata=metadata, overwrite=overwrite).context():
            name = normalize(namex)
            if self.is_readonly():
                d = DeferredContext(defer.fail(NotWriteableError()))
            else:
                # XXX should pass reactor arg
                d = DeferredContext(self._uploader.upload(uploadable))
                d.addCallback(lambda results:
                              self._create_and_validate_node(results.get_uri(), None,
                                                             name))
                d.addCallback(lambda node:
                              self.set_node(name, node, metadata, overwrite))

            return d.addActionFinish()
---|
656 | |
---|
657 | def delete(self, namex, must_exist=True, must_be_directory=False, must_be_file=False): |
---|
658 | """I remove the child at the specific name. I return a Deferred that |
---|
659 | fires (with the node just removed) when the operation finishes.""" |
---|
660 | if self.is_readonly(): |
---|
661 | return defer.fail(NotWriteableError()) |
---|
662 | deleter = Deleter(self, namex, must_exist=must_exist, |
---|
663 | must_be_directory=must_be_directory, must_be_file=must_be_file) |
---|
664 | d = self._node.modify(deleter.modify) |
---|
665 | d.addCallback(lambda res: deleter.old_child) |
---|
666 | return d |
---|
667 | |
---|
668 | # XXX: Too many arguments? Worthwhile to break into mutable/immutable? |
---|
669 | def create_subdirectory(self, namex, initial_children=None, overwrite=True, |
---|
670 | mutable=True, mutable_version=None, metadata=None): |
---|
671 | if initial_children is None: |
---|
672 | initial_children = {} |
---|
673 | name = normalize(namex) |
---|
674 | if self.is_readonly(): |
---|
675 | return defer.fail(NotWriteableError()) |
---|
676 | if mutable: |
---|
677 | if mutable_version: |
---|
678 | d = self._nodemaker.create_new_mutable_directory(initial_children, |
---|
679 | version=mutable_version) |
---|
680 | else: |
---|
681 | d = self._nodemaker.create_new_mutable_directory(initial_children) |
---|
682 | else: |
---|
683 | # mutable version doesn't make sense for immmutable directories. |
---|
684 | assert mutable_version is None |
---|
685 | d = self._nodemaker.create_immutable_directory(initial_children) |
---|
686 | def _created(child): |
---|
687 | entries = {name: (child, metadata)} |
---|
688 | a = Adder(self, entries, overwrite=overwrite, |
---|
689 | create_readonly_node=self._create_readonly_node) |
---|
690 | d = self._node.modify(a.modify) |
---|
691 | d.addCallback(lambda res: child) |
---|
692 | return d |
---|
693 | d.addCallback(_created) |
---|
694 | return d |
---|
695 | |
---|
def move_child_to(self, current_child_namex, new_parent,
                  new_child_namex=None, overwrite=True):
    """
    Relink one of my children under a (possibly different) parent.

    The child is looked up by name; in 'new_parent' it will live under
    'new_child_namex', which defaults to 'current_child_namex'. Neither
    name needs to be normalized in advance. Returns a Deferred that
    fires when the relink (add to new parent, then delete from me) has
    completed.

    'overwrite' may be True (replace any existing entry), False (fail
    if the destination name exists), or ONLY_FILES (fail if the
    destination name exists and refers to a directory).
    """
    if self.is_readonly() or new_parent.is_readonly():
        return defer.fail(NotWriteableError())

    old_name = normalize(current_child_namex)
    new_name = old_name if new_child_namex is None else normalize(new_child_namex)

    # Detect the degenerate same-parent/same-name case up front: the
    # generic add-then-delete sequence below would otherwise remove the
    # child we just relinked.
    if new_parent.get_write_uri() == self.get_write_uri() and new_name == old_name:
        return defer.succeed("redundant rename/relink")

    d = self.get_child_and_metadata(old_name)
    def _relink(child_and_metadata):
        child, metadata = child_and_metadata
        return new_parent.set_node(new_name, child, metadata,
                                   overwrite=overwrite)
    d.addCallback(_relink)
    # only after the new link is durably in place do we drop the old one
    d.addCallback(lambda ign: self.delete(old_name))
    return d
---|
731 | |
---|
732 | |
---|
def deep_traverse(self, walker):
    """
    Recursively walk the tree rooted at this dirnode, feeding every
    reachable node to 'walker'.

    For each directory visited, walker.enter_directory(parent, children)
    is invoked right after its child list is fetched, with the parent
    dirnode and the childname->(childnode, metadata) dict; the walker
    must not descend into the children itself — the traversal does that.
    This hook mainly serves deep-stats directory-size counters.

    walker.add_node(node, path) is invoked for every reachable node,
    files and directories alike; that is where most per-node work
    belongs.

    Cycles are broken by remembering verifier-caps: a node whose
    verifier-cap was already seen is neither handed to add_node() nor
    traversed, so each file or directory reaches the walker at most
    once. Structures that link the same object multiple times will
    therefore appear under-counted.

    Returns a Monitor that can be used to wait for completion, observe
    progress, or cancel the traversal.
    """

    # Plain tree-walk, except every edge costs a Deferred. An earlier
    # ConcurrencyLimiter-based version (fanout of 10) queued so much
    # state that a 330k-dirnode run hit the ~3.0GB 32-bit linux
    # per-process ceiling and crashed. The current strict depth-first,
    # one-node-at-a-time Deferred chain gives up directory-read
    # pipelining but cut the memory footprint roughly in half.

    monitor = Monitor()
    walker.set_monitor(monitor)

    seen = {self.get_verify_cap()}
    walk = self._deep_traverse_dirnode(self, [], walker, monitor, seen)
    walk.addCallback(lambda ign: walker.finish())
    walk.addBoth(monitor.finish)
    walk.addErrback(lambda f: None)

    return monitor
---|
778 | |
---|
def _deep_traverse_dirnode(self, node, path, walker, monitor, found):
    # Visit this directory itself, then fetch and walk its children.
    monitor.raise_if_cancelled()
    step = defer.maybeDeferred(walker.add_node, node, path)
    step.addCallback(lambda ign: node.list())
    step.addCallback(self._deep_traverse_dirnode_children, node, path,
                     walker, monitor, found)
    return step
---|
787 | |
---|
def _deep_traverse_dirnode_children(self, children, parent, path,
                                    walker, monitor, found):
    """Report 'parent' to the walker, then schedule visits of its children.

    'children' maps childname -> (childnode, metadata), as returned by
    node.list(). 'found' is the shared set of verify-caps already seen,
    used to skip duplicates and break cycles. Returns a Deferred that
    fires once this directory and everything reachable below it has
    been processed.
    """
    monitor.raise_if_cancelled()
    d = defer.maybeDeferred(walker.enter_directory, parent, children)
    # we process file-like children first, so we can drop their FileNode
    # objects as quickly as possible. Tests suggest that a FileNode (held
    # in the client's nodecache) consumes about 2440 bytes. dirnodes (not
    # in the nodecache) seem to consume about 2000 bytes.
    dirkids = []
    filekids = []
    for name, (child, metadata) in sorted(children.items()):
        childpath = path + [name]
        if isinstance(child, UnknownNode):
            # unknown nodes are reported to the walker but never traversed
            walker.add_node(child, childpath)
            continue
        verifier = child.get_verify_cap()
        # allow LIT files (for which verifier==None) to be processed
        if (verifier is not None) and (verifier in found):
            continue
        found.add(verifier)
        if IDirectoryNode.providedBy(child):
            dirkids.append( (child, childpath) )
        else:
            filekids.append( (child, childpath) )
    for i, (child, childpath) in enumerate(filekids):
        # default-argument binding freezes child/childpath per callback;
        # a bare closure would only ever see the loop's final values
        d.addCallback(lambda ignored, child=child, childpath=childpath:
                      walker.add_node(child, childpath))
        # to work around the Deferred tail-recursion problem
        # (specifically the defer.succeed flavor) requires us to avoid
        # doing more than 158 LIT files in a row. We insert a turn break
        # once every 100 files (LIT or CHK) to preserve some stack space
        # for other code. This is a different expression of the same
        # Twisted problem as in #237.
        if i % 100 == 99:
            d.addCallback(lambda ignored: fireEventually())
    # directories are recursed into only after all file-like children
    # have been queued (strict depth-first, one node at a time)
    for (child, childpath) in dirkids:
        d.addCallback(lambda ignored, child=child, childpath=childpath:
                      self._deep_traverse_dirnode(child, childpath,
                                                  walker, monitor,
                                                  found))
    return d
---|
829 | |
---|
830 | |
---|
def build_manifest(self):
    """Collect a manifest of everything reachable from this dirnode.

    Returns a Monitor whose ['status'] will be a list of (path, cap)
    tuples, one per reachable node (directories and files alike)."""
    return self.deep_traverse(ManifestWalker(self))
---|
837 | |
---|
def start_deep_stats(self):
    # deep_traverse de-duplicates by verifier cap, so a child reachable
    # through both a write-cap and a read-cap is only counted once
    walker = DeepStats(self)
    return self.deep_traverse(walker)
---|
842 | |
---|
def start_deep_check(self, verify=False, add_lease=False):
    # check-only traversal: repair is disabled
    checker = DeepChecker(self, verify, repair=False, add_lease=add_lease)
    return self.deep_traverse(checker)
---|
845 | |
---|
def start_deep_check_and_repair(self, verify=False, add_lease=False):
    # same traversal as start_deep_check, but with repair enabled
    checker = DeepChecker(self, verify, repair=True, add_lease=add_lease)
    return self.deep_traverse(checker)
---|
848 | |
---|
849 | |
---|
class ManifestWalker(DeepStats):
    """Deep-traversal walker that records a manifest of (path, cap)
    entries plus the verify-caps and storage-index strings it sees,
    while also accumulating the usual deep-stats counters."""

    def __init__(self, origin):
        super().__init__(origin)
        self.manifest = []
        self.storage_index_strings = set()
        self.verifycaps = set()

    def add_node(self, node, path):
        # record this node's cap under its path, and remember its
        # storage index / verify cap when it has them (LIT files do not)
        self.manifest.append((tuple(path), node.get_uri()))
        si = node.get_storage_index()
        if si:
            self.storage_index_strings.add(base32.b2a(si))
        v = node.get_verify_cap()
        if v:
            self.verifycaps.add(v.to_string())
        return super().add_node(node, path)

    def get_results(self):
        return {
            "manifest": self.manifest,
            "verifycaps": self.verifycaps,
            "storage-index": self.storage_index_strings,
            "stats": super().get_results(),
        }
---|
874 | |
---|
875 | |
---|
class DeepChecker(object):
    """Deep-traversal walker that checks (and optionally repairs) every
    reachable node, collecting per-node results and deep-stats."""

    def __init__(self, root, verify, repair, add_lease):
        root_si = root.get_storage_index()
        root_si_base32 = base32.b2a(root_si) if root_si else ""
        self._lp = log.msg(format="deep-check starting (%(si)s),"
                           " verify=%(verify)s, repair=%(repair)s",
                           si=root_si_base32, verify=verify, repair=repair)
        self._verify = verify
        self._repair = repair
        self._add_lease = add_lease
        # repair mode aggregates into a richer results object
        results_class = DeepCheckAndRepairResults if repair else DeepCheckResults
        self._results = results_class(root_si)
        self._stats = DeepStats(root)

    def set_monitor(self, monitor):
        self.monitor = monitor
        monitor.set_status(self._results)

    def add_node(self, node, childpath):
        # check (or check-and-repair) the node, fold the outcome into the
        # aggregated results, then update the deep-stats counters
        if self._repair:
            d = node.check_and_repair(self.monitor, self._verify, self._add_lease)
            d.addCallback(self._results.add_check_and_repair, childpath)
        else:
            d = node.check(self.monitor, self._verify, self._add_lease)
            d.addCallback(self._results.add_check, childpath)
        d.addCallback(lambda ign: self._stats.add_node(node, childpath))
        return d

    def enter_directory(self, parent, children):
        return self._stats.enter_directory(parent, children)

    def finish(self):
        log.msg("deep-check done", parent=self._lp)
        self._results.update_stats(self._stats.get_results())
        return self._results
---|
916 | |
---|
917 | |
---|
918 | # use client.create_dirnode() to make one of these |
---|