source: trunk/src/allmydata/dirnode.py

Last change on this file was 4da491a, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-03-11T20:37:27Z

remove more usage of "future"

  • Property mode set to 100644
File size: 36.6 KB
Line 
1"""Directory Node implementation.
2
3Ported to Python 3.
4"""
5
6import time
7
8from zope.interface import implementer
9from twisted.internet import defer
10from foolscap.api import fireEventually
11
12from allmydata.crypto import aes
13from allmydata.deep_stats import DeepStats
14from allmydata.mutable.common import NotWriteableError
15from allmydata.mutable.filenode import MutableFileNode
16from allmydata.unknown import UnknownNode, strip_prefix_for_ro
17from allmydata.interfaces import IFilesystemNode, IDirectoryNode, IFileNode, \
18     ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable, \
19     MustBeDeepImmutableError, CapConstraintError, ChildOfWrongTypeError
20from allmydata.check_results import DeepCheckResults, \
21     DeepCheckAndRepairResults
22from allmydata.monitor import Monitor
23from allmydata.util import hashutil, base32, log, jsonbytes as json
24from allmydata.util.encodingutil import quote_output, normalize
25from allmydata.util.assertutil import precondition
26from allmydata.util.netstring import netstring, split_netstring
27from allmydata.util.consumer import download_to_data
28from allmydata.uri import wrap_dirnode_cap
29from allmydata.util.dictutil import AuxValueDict
30
31from eliot import (
32    ActionType,
33    Field,
34)
35from eliot.twisted import (
36    DeferredContext,
37)
38
# Eliot structured-logging declarations; consumed by DirectoryNode.add_file().

NAME = Field.for_types(
    "name",
    [str],
    "The name linking the parent to this node.",
)

METADATA = Field.for_types(
    "metadata",
    [dict],
    "Data about a node.",
)

OVERWRITE = Field.for_types(
    "overwrite",
    [bool],
    "True to replace an existing file of the same name, "
    "false to fail with a collision error.",
)

# Action wrapped around the upload+link sequence in add_file(); it carries
# the three fields declared above.
ADD_FILE = ActionType(
    "dirnode:add-file",
    [NAME, METADATA, OVERWRITE],
    [],
    "Add a new file as a child of a directory.",
)
64
65
class _OnlyFiles(object):
    """Sentinel type for the 'overwrite' option meaning: only files may be
    replaced, never directories."""

ONLY_FILES = _OnlyFiles()
70
71
def update_metadata(metadata, new_metadata, now):
    """Updates 'metadata' in-place with the information in 'new_metadata'.

    The 'tahoe' sub-dict is reserved: any caller-supplied value in
    'new_metadata' is discarded and the existing one is preserved.
    Timestamps are set according to the time 'now'.
    """
    if metadata is None:
        metadata = {}

    # Remember the legacy 'ctime' key before the dict may be replaced
    # wholesale below; it seeds 'linkcrtime' when that key is absent.
    old_ctime = metadata.get('ctime')

    if new_metadata is not None:
        # Overwrite everything except the reserved 'tahoe' namespace.
        replacement = new_metadata.copy()
        replacement.pop('tahoe', None)
        if 'tahoe' in metadata:
            replacement['tahoe'] = metadata['tahoe']
        metadata = replacement

    # Refresh the system timestamps.
    sysmd = metadata.get('tahoe', {})
    if 'linkcrtime' not in sysmd:
        # In Tahoe < 1.4.0 we used the word 'ctime' to mean what Tahoe >= 1.4.0
        # calls 'linkcrtime'. That old field is honoured only when
        # 'tahoe:linkcrtime' is missing.
        sysmd['linkcrtime'] = old_ctime if old_ctime is not None else now

    sysmd['linkmotime'] = now
    metadata['tahoe'] = sysmd

    return metadata
112
113
114# TODO: {Deleter,MetadataSetter,Adder}.modify all start by unpacking the
115# contents and end by repacking them. It might be better to apply them to
116# the unpacked contents.
117
class Deleter(object):
    """Mutable-file modifier that unlinks a single child from a directory.

    After modify() runs, self.old_child holds the removed node (or None if
    the child was absent and must_exist was False).
    """

    def __init__(self, node, namex, must_exist=True, must_be_directory=False, must_be_file=False):
        self.node = node
        self.name = normalize(namex)
        self.must_exist = must_exist
        self.must_be_directory = must_be_directory
        self.must_be_file = must_be_file

    def modify(self, old_contents, servermap, first_time):
        """Return repacked contents without self.name, or None for a no-op."""
        children = self.node._unpack_contents(old_contents)
        entry = children.get(self.name)
        if entry is None:
            if self.must_exist and first_time:
                raise NoSuchChildError(self.name)
            self.old_child = None
            return None
        self.old_child, metadata = entry

        # Unknown children can be removed regardless of must_be_directory or must_be_file.
        if self.must_be_directory and IFileNode.providedBy(self.old_child):
            raise ChildOfWrongTypeError("delete required a directory, not a file")
        if self.must_be_file and IDirectoryNode.providedBy(self.old_child):
            raise ChildOfWrongTypeError("delete required a file, not a directory")

        del children[self.name]
        return self.node._pack_contents(children)
144
145
class MetadataSetter(object):
    """Mutable-file modifier that replaces the metadata of one child."""

    def __init__(self, node, namex, metadata, create_readonly_node=None):
        self.node = node
        self.name = normalize(namex)
        self.metadata = metadata
        self.create_readonly_node = create_readonly_node

    def modify(self, old_contents, servermap, first_time):
        """Return repacked contents with self.name's metadata updated."""
        children = self.node._unpack_contents(old_contents)
        name = self.name
        if name not in children:
            raise NoSuchChildError(name)

        child, old_metadata = children[name]
        new_metadata = update_metadata(old_metadata.copy(), self.metadata, time.time())
        # A 'no-write' marker downgrades the stored child to a read-only cap.
        if self.create_readonly_node and new_metadata.get('no-write', False):
            child = self.create_readonly_node(child, name)

        children[name] = (child, new_metadata)
        return self.node._pack_contents(children)
169
170
class Adder(object):
    """Mutable-file modifier that links one or more (node, metadata) entries
    into a directory, respecting the configured overwrite policy.
    """

    def __init__(self, node, entries=None, overwrite=True, create_readonly_node=None):
        """
        :param overwrite: Either True (allow overwriting anything existing),
            False (don't allow overwriting), or ONLY_FILES (only files can be
            overwritten).
        """
        self.node = node
        if entries is None:
            entries = {}
        precondition(isinstance(entries, dict), entries)
        precondition(overwrite in (True, False, ONLY_FILES), overwrite)
        # keys of 'entries' may not be normalized; modify() normalizes them.
        self.entries = entries
        self.overwrite = overwrite
        self.create_readonly_node = create_readonly_node

    def set_node(self, namex, node, metadata):
        # Queue one link under the (possibly unnormalized) name.
        precondition(IFilesystemNode.providedBy(node), node)
        self.entries[namex] = (node, metadata)

    def modify(self, old_contents, servermap, first_time):
        # Unpack the directory, link every queued entry, repack. Returns the
        # new serialized contents.
        children = self.node._unpack_contents(old_contents)
        now = time.time()
        for (namex, (child, new_metadata)) in list(self.entries.items()):
            name = normalize(namex)
            precondition(IFilesystemNode.providedBy(child), child)

            # Strictly speaking this is redundant because we would raise the
            # error again in _pack_normalized_children.
            child.raise_error()

            metadata = None
            if name in children:
                if not self.overwrite:
                    raise ExistingChildError("child %s already exists" % quote_output(name, encoding='utf-8'))

                if self.overwrite == ONLY_FILES and IDirectoryNode.providedBy(children[name][0]):
                    raise ExistingChildError("child %s already exists as a directory" % quote_output(name, encoding='utf-8'))
                # Keep the existing metadata as the base for the update.
                metadata = children[name][1].copy()

            metadata = update_metadata(metadata, new_metadata, now)
            # A 'no-write' marker downgrades the stored child to read-only.
            if self.create_readonly_node and metadata.get('no-write', False):
                child = self.create_readonly_node(child, name)

            children[name] = (child, metadata)
        new_contents = self.node._pack_contents(children)
        return new_contents
219
def _encrypt_rw_uri(writekey, rw_uri):
    """Superencrypt a child's write-cap under this directory's writekey.

    Returns salt + ciphertext + MAC. DirectoryNode._decrypt_rwcapdata()
    reverses this, slicing a 16-byte salt prefix and 32-byte MAC suffix.
    """
    precondition(isinstance(rw_uri, bytes), rw_uri)
    precondition(isinstance(writekey, bytes), writekey)

    # Per-entry salt is derived from the plaintext; the AES key mixes the
    # salt with the directory's writekey.
    salt = hashutil.mutable_rwcap_salt_hash(rw_uri)
    key = hashutil.mutable_rwcap_key_hash(salt, writekey)
    encryptor = aes.create_encryptor(key)
    crypttext = aes.encrypt_data(encryptor, rw_uri)
    mac = hashutil.hmac(key, salt + crypttext)
    assert len(mac) == 32
    return salt + crypttext + mac
    # The MAC is not checked by readers in Tahoe >= 1.3.0, but we still
    # produce it for the sake of older readers.
233
def pack_children(childrenx, writekey, deep_immutable=False):
    """Serialize a {name: (node, metadata)} dict after NFC-normalizing keys.

    Every entry must carry a metadata dict (i.e. {} instead of None).
    """
    normalized = {}
    for namex, (node, metadata) in childrenx.items():
        precondition(isinstance(metadata, dict),
                     "directory creation requires metadata to be a dict, not None", metadata)
        normalized[normalize(namex)] = (node, metadata)

    return _pack_normalized_children(normalized, writekey=writekey,
                                     deep_immutable=deep_immutable)
243
244
# Pre-built empty netstring, reused for the writecap slot of read-only packs.
ZERO_LEN_NETSTR=netstring(b'')
def _pack_normalized_children(children, writekey, deep_immutable=False):
    """Take a dict that maps:
         children[unicode_nfc_name] = (IFileSystemNode, metadata_dict)
    and pack it into a single string, for use as the contents of the backing
    file. This is the same format as is returned by _unpack_contents. I also
    accept an AuxValueDict, in which case I'll use the auxilliary cached data
    as the pre-packed entry, which is faster than re-packing everything each
    time.

    If writekey is provided then I will superencrypt the child's writecap with
    writekey.

    If deep_immutable is True, I will require that all my children are deeply
    immutable, and will raise a MustBeDeepImmutableError if not.
    """
    precondition((writekey is None) or isinstance(writekey, bytes), writekey)

    has_aux = isinstance(children, AuxValueDict)
    entries = []
    # Sort by name so the serialization is deterministic.
    for name in sorted(children.keys()):
        assert isinstance(name, str)
        entry = None
        (child, metadata) = children[name]
        child.raise_error()
        if deep_immutable and not child.is_allowed_in_immutable_directory():
            raise MustBeDeepImmutableError(
                "child %r is not allowed in an immutable directory" % (name,),
                name)
        if has_aux:
            # Reuse the pre-packed bytes cached by _unpack_contents, if any.
            entry = children.get_aux(name)
        if not entry:
            assert IFilesystemNode.providedBy(child), (name,child)
            assert isinstance(metadata, dict)
            rw_uri = child.get_write_uri()
            if rw_uri is None:
                rw_uri = b""
            assert isinstance(rw_uri, bytes), rw_uri

            # should be prevented by MustBeDeepImmutableError check above
            assert not (rw_uri and deep_immutable)

            ro_uri = child.get_readonly_uri()
            if ro_uri is None:
                ro_uri = b""
            assert isinstance(ro_uri, bytes), ro_uri
            if writekey is not None:
                writecap = netstring(_encrypt_rw_uri(writekey, rw_uri))
            else:
                # No writekey: store an empty netstring in the writecap slot.
                writecap = ZERO_LEN_NETSTR
            # Each entry is four netstrings: name, ro_uri, writecap, metadata.
            entry = b"".join([netstring(name.encode("utf-8")),
                             netstring(strip_prefix_for_ro(ro_uri, deep_immutable)),
                             writecap,
                             netstring(json.dumps(metadata).encode("utf-8"))])
        entries.append(netstring(entry))
    return b"".join(entries)
301
302@implementer(IDirectoryNode, ICheckable, IDeepCheckable)
303class DirectoryNode(object):
304    filenode_class = MutableFileNode
305
    def __init__(self, filenode, nodemaker, uploader):
        # A dirnode wraps a filenode that stores the serialized child table;
        # the backing node must be a file node, never itself a directory.
        assert IFileNode.providedBy(filenode), filenode
        assert not IDirectoryNode.providedBy(filenode), filenode
        self._node = filenode
        filenode_cap = filenode.get_cap()
        # Wrap the backing file's cap into the corresponding directory cap.
        self._uri = wrap_dirnode_cap(filenode_cap)
        self._nodemaker = nodemaker
        self._uploader = uploader
314
315    def __repr__(self):
316        return "<%s %s-%s %s>" % (self.__class__.__name__,
317                                  self.is_readonly() and "RO" or "RW",
318                                  self.is_mutable() and "MUT" or "IMM",
319                                  hasattr(self, '_uri') and str(self._uri.abbrev(), "utf-8"))
320
    def get_size(self):
        """Return the size of our backing mutable file, in bytes, if we've
        fetched it. Otherwise return None. This returns synchronously."""
        return self._node.get_size()

    def get_current_size(self):
        """Calculate the size of our backing mutable file, in bytes. Returns
        a Deferred that fires with the result."""
        # Asynchronous counterpart of get_size(); delegates to the filenode.
        return self._node.get_current_size()
330
331    def _read(self):
332        if self._node.is_mutable():
333            # use the IMutableFileNode API.
334            d = self._node.download_best_version()
335        else:
336            d = download_to_data(self._node)
337        d.addCallback(self._unpack_contents)
338        return d
339
    def _decrypt_rwcapdata(self, encwrcap):
        """Decrypt one child's writecap blob (inverse of _encrypt_rw_uri).

        Layout: 16-byte salt || ciphertext || 32-byte MAC. The MAC is not
        verified here (see the note at the end of _encrypt_rw_uri).
        """
        salt = encwrcap[:16]
        crypttext = encwrcap[16:-32]
        key = hashutil.mutable_rwcap_key_hash(salt, self._node.get_writekey())
        encryptor = aes.create_decryptor(key)
        plaintext = aes.decrypt_data(encryptor, crypttext)
        return plaintext
347
    def _create_and_validate_node(self, rw_uri, ro_uri, name):
        # name is just for error reporting
        # An immutable directory may only hold deeply-immutable children,
        # hence deep_immutable = not self.is_mutable().
        node = self._nodemaker.create_from_cap(rw_uri, ro_uri,
                                               deep_immutable=not self.is_mutable(),
                                               name=name)
        node.raise_error()
        return node

    def _create_readonly_node(self, node, name):
        # name is just for error reporting
        # Known read-only nodes pass through unchanged; anything else is
        # rebuilt from its read-only URI.
        if not node.is_unknown() and node.is_readonly():
            return node
        return self._create_and_validate_node(None, node.get_readonly_uri(), name=name)
361
    def _unpack_contents(self, data):
        """Deserialize directory contents into an AuxValueDict of children.

        Inverse of _pack_normalized_children; each entry's raw bytes are
        cached as the aux value so unmodified entries can be repacked
        verbatim.
        """
        # the directory is serialized as a list of netstrings, one per child.
        # Each child is serialized as a list of four netstrings: (name, ro_uri,
        # rwcapdata, metadata), in which the name, ro_uri, metadata are in
        # cleartext. The 'name' is UTF-8 encoded, and should be normalized to NFC.
        # The rwcapdata is formatted as:
        # pack("16ss32s", iv, AES(H(writekey+iv), plaintext_rw_uri), mac)
        assert isinstance(data, bytes), (repr(data), type(data))
        # an empty directory is serialized as an empty string
        if data == b"":
            return AuxValueDict()
        writeable = not self.is_readonly()
        mutable = self.is_mutable()
        children = AuxValueDict()
        position = 0
        while position < len(data):
            entries, position = split_netstring(data, 1, position)
            entry = entries[0]
            (namex_utf8, ro_uri, rwcapdata, metadata_s), subpos = split_netstring(entry, 4)
            if not mutable and len(rwcapdata) > 0:
                raise ValueError("the rwcapdata field of a dirnode in an immutable directory was not empty")

            # A name containing characters that are unassigned in one version of Unicode might
            # not be normalized wrt a later version. See the note in section 'Normalization Stability'
            # at <http://unicode.org/policies/stability_policy.html>.
            # Therefore we normalize names going both in and out of directories.
            name = normalize(namex_utf8.decode("utf-8"))

            rw_uri = b""
            if writeable:
                rw_uri = self._decrypt_rwcapdata(rwcapdata)

            # Since the encryption uses CTR mode, it currently leaks the length of the
            # plaintext rw_uri -- and therefore whether it is present, i.e. whether the
            # dirnode is writeable (ticket #925). By stripping trailing spaces in
            # Tahoe >= 1.6.0, we may make it easier for future versions to plug this leak.
            # ro_uri is treated in the same way for consistency.
            # rw_uri and ro_uri will be either None or a non-empty string.

            rw_uri = rw_uri.rstrip(b' ') or None
            ro_uri = ro_uri.rstrip(b' ') or None

            try:
                child = self._create_and_validate_node(rw_uri, ro_uri, name)
                if mutable or child.is_allowed_in_immutable_directory():
                    metadata = json.loads(metadata_s)
                    assert isinstance(metadata, dict)
                    children[name] = (child, metadata)
                    # Cache the raw entry bytes so repacking can reuse them.
                    children.set_with_aux(name, (child, metadata), auxilliary=entry)
                else:
                    log.msg(format="mutable cap for child %(name)s unpacked from an immutable directory",
                            name=quote_output(name, encoding='utf-8'),
                            facility="tahoe.webish", level=log.UNUSUAL)
            except CapConstraintError as e:
                # Skip children whose caps violate this directory's constraints.
                log.msg(format="unmet constraint on cap for child %(name)s unpacked from a directory:\n"
                               "%(message)s", message=e.args[0], name=quote_output(name, encoding='utf-8'),
                               facility="tahoe.webish", level=log.UNUSUAL)

        return children
421
    def _pack_contents(self, children):
        # expects children in the same format as _unpack_contents returns
        return _pack_normalized_children(children, self._node.get_writekey())

    # --- Simple IFilesystemNode accessors, delegating to the backing
    # filenode or to our wrapped directory URI. ---

    def is_readonly(self):
        return self._node.is_readonly()

    def is_mutable(self):
        return self._node.is_mutable()

    def is_unknown(self):
        # A DirectoryNode always has a known cap type.
        return False

    def is_allowed_in_immutable_directory(self):
        # Only deeply-immutable nodes may live inside immutable directories.
        return not self._node.is_mutable()

    def raise_error(self):
        # UnknownNode raises its stored error here; a real dirnode never does.
        pass

    def get_uri(self):
        return self._uri.to_string()

    def get_write_uri(self):
        # None for read-only directories, per the IFilesystemNode contract.
        if self.is_readonly():
            return None
        return self._uri.to_string()

    def get_readonly_uri(self):
        return self._uri.get_readonly().to_string()

    def get_cap(self):
        return self._uri

    def get_readcap(self):
        return self._uri.get_readonly()

    def get_verify_cap(self):
        return self._uri.get_verify_cap()

    def get_repair_cap(self):
        if self._node.is_readonly():
            return None # readonly (mutable) dirnodes are not yet repairable
        return self._uri

    def get_storage_index(self):
        return self._uri.get_storage_index()
468
    def check(self, monitor, verify=False, add_lease=False):
        """Perform a file check. See IChecker.check for details."""
        return self._node.check(monitor, verify, add_lease)

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        # Delegates to the backing filenode, like check().
        return self._node.check_and_repair(monitor, verify, add_lease)
474
    def list(self):
        """I return a Deferred that fires with a dictionary mapping child
        name to a tuple of (IFilesystemNode, metadata)."""
        return self._read()

    def has_child(self, namex):
        """I return a Deferred that fires with a boolean, True if there
        exists a child of the given name, False if not."""
        name = normalize(namex)
        d = self._read()
        d.addCallback(lambda children: name in children)
        return d

    def _get(self, children, name):
        # Callback helper: extract the child node, or raise NoSuchChildError.
        child = children.get(name)
        if child is None:
            raise NoSuchChildError(name)
        return child[0]

    def _get_with_metadata(self, children, name):
        # Callback helper: extract the (node, metadata) pair, or raise.
        child = children.get(name)
        if child is None:
            raise NoSuchChildError(name)
        return child

    def get(self, namex):
        """I return a Deferred that fires with the named child node,
        which is an IFilesystemNode."""
        name = normalize(namex)
        d = self._read()
        d.addCallback(self._get, name)
        return d

    def get_child_and_metadata(self, namex):
        """I return a Deferred that fires with the (node, metadata) pair for
        the named child. The node is an IFilesystemNode, and the metadata
        is a dictionary."""
        name = normalize(namex)
        d = self._read()
        d.addCallback(self._get_with_metadata, name)
        return d

    def get_metadata_for(self, namex):
        # Fires with the metadata dict only. A missing child surfaces as a
        # KeyError failure inside the Deferred (not NoSuchChildError).
        name = normalize(namex)
        d = self._read()
        d.addCallback(lambda children: children[name][1])
        return d
522
523    def set_metadata_for(self, namex, metadata):
524        name = normalize(namex)
525        if self.is_readonly():
526            return defer.fail(NotWriteableError())
527        assert isinstance(metadata, dict)
528        s = MetadataSetter(self, name, metadata,
529                           create_readonly_node=self._create_readonly_node)
530        d = self._node.modify(s.modify)
531        d.addCallback(lambda res: self)
532        return d
533
    def get_child_at_path(self, pathx):
        """Transform a child path into an IFilesystemNode.

        I perform a recursive series of 'get' operations to find the named
        descendant node. I return a Deferred that fires with the node, or
        errbacks with IndexError if the node could not be found.

        The path can be either a single string (slash-separated) or a list of
        path-name elements.
        """
        d = self.get_child_and_metadata_at_path(pathx)
        # Discard the metadata half of the result; callers want the node only.
        d.addCallback(lambda node_and_metadata: node_and_metadata[0])
        return d
547
548    def get_child_and_metadata_at_path(self, pathx):
549        """Transform a child path into an IFilesystemNode and
550        a metadata dictionary from the last edge that was traversed.
551        """
552
553        if not pathx:
554            return defer.succeed((self, {}))
555        if isinstance(pathx, (list, tuple)):
556            pass
557        else:
558            pathx = pathx.split("/")
559        for p in pathx:
560            assert isinstance(p, str), p
561        childnamex = pathx[0]
562        remaining_pathx = pathx[1:]
563        if remaining_pathx:
564            d = self.get(childnamex)
565            d.addCallback(lambda node:
566                          node.get_child_and_metadata_at_path(remaining_pathx))
567            return d
568        d = self.get_child_and_metadata(childnamex)
569        return d
570
    def set_uri(self, namex, writecap, readcap=None, metadata=None, overwrite=True):
        """Link a child given its caps (as bytes); fires with the new node.

        Overwrite semantics match set_node().
        """
        precondition(isinstance(writecap, (bytes, type(None))), writecap)
        precondition(isinstance(readcap, (bytes, type(None))), readcap)

        # We now allow packing unknown nodes, provided they are valid
        # for this type of directory.
        child_node = self._create_and_validate_node(writecap, readcap, namex)
        d = self.set_node(namex, child_node, metadata, overwrite)
        d.addCallback(lambda res: child_node)
        return d
581
    def set_children(self, entries, overwrite=True):
        # this takes URIs
        # Each value in 'entries' is either (writecap, readcap) or
        # (writecap, readcap, metadata); fires with self when done.
        a = Adder(self, overwrite=overwrite,
                  create_readonly_node=self._create_readonly_node)
        for (namex, e) in entries.items():
            assert isinstance(namex, str), namex
            if len(e) == 2:
                writecap, readcap = e
                metadata = None
            else:
                assert len(e) == 3
                writecap, readcap, metadata = e
            precondition(isinstance(writecap, (bytes,type(None))), writecap)
            precondition(isinstance(readcap, (bytes,type(None))), readcap)

            # We now allow packing unknown nodes, provided they are valid
            # for this type of directory.
            child_node = self._create_and_validate_node(writecap, readcap, namex)
            a.set_node(namex, child_node, metadata)
        d = self._node.modify(a.modify)
        d.addCallback(lambda ign: self)
        return d
604
605    def set_node(self, namex, child, metadata=None, overwrite=True):
606        """I add a child at the specific name. I return a Deferred that fires
607        when the operation finishes. This Deferred will fire with the child
608        node that was just added. I will replace any existing child of the
609        same name.
610
611        If this directory node is read-only, the Deferred will errback with a
612        NotWriteableError."""
613
614        precondition(IFilesystemNode.providedBy(child), child)
615
616        if self.is_readonly():
617            return defer.fail(NotWriteableError())
618        assert IFilesystemNode.providedBy(child), child
619        a = Adder(self, overwrite=overwrite,
620                  create_readonly_node=self._create_readonly_node)
621        a.set_node(namex, child, metadata)
622        d = self._node.modify(a.modify)
623        d.addCallback(lambda res: child)
624        return d
625
626    def set_nodes(self, entries, overwrite=True):
627        precondition(isinstance(entries, dict), entries)
628        if self.is_readonly():
629            return defer.fail(NotWriteableError())
630        a = Adder(self, entries, overwrite=overwrite,
631                  create_readonly_node=self._create_readonly_node)
632        d = self._node.modify(a.modify)
633        d.addCallback(lambda res: self)
634        return d
635
636
    def add_file(self, namex, uploadable, metadata=None, overwrite=True):
        """I upload a file (using the given IUploadable), then attach the
        resulting FileNode to the directory at the given name. I return a
        Deferred that fires (with the IFileNode of the uploaded file) when
        the operation completes."""
        # The whole upload+link sequence runs inside an eliot ADD_FILE action
        # for structured logging.
        with ADD_FILE(name=namex, metadata=metadata, overwrite=overwrite).context():
            name = normalize(namex)
            if self.is_readonly():
                d = DeferredContext(defer.fail(NotWriteableError()))
            else:
                # XXX should pass reactor arg
                d = DeferredContext(self._uploader.upload(uploadable))
                d.addCallback(lambda results:
                              self._create_and_validate_node(results.get_uri(), None,
                                                             name))
                d.addCallback(lambda node:
                              self.set_node(name, node, metadata, overwrite))

        return d.addActionFinish()
656
657    def delete(self, namex, must_exist=True, must_be_directory=False, must_be_file=False):
658        """I remove the child at the specific name. I return a Deferred that
659        fires (with the node just removed) when the operation finishes."""
660        if self.is_readonly():
661            return defer.fail(NotWriteableError())
662        deleter = Deleter(self, namex, must_exist=must_exist,
663                          must_be_directory=must_be_directory, must_be_file=must_be_file)
664        d = self._node.modify(deleter.modify)
665        d.addCallback(lambda res: deleter.old_child)
666        return d
667
    # XXX: Too many arguments? Worthwhile to break into mutable/immutable?
    def create_subdirectory(self, namex, initial_children=None, overwrite=True,
                            mutable=True, mutable_version=None, metadata=None):
        """Create a new (mutable or immutable) directory and link it here.

        Fires with the new child dirnode. 'mutable_version' selects the
        mutable-file version and is only legal when mutable=True.
        """
        if initial_children is None:
            initial_children = {}
        name = normalize(namex)
        if self.is_readonly():
            return defer.fail(NotWriteableError())
        if mutable:
            if mutable_version:
                d = self._nodemaker.create_new_mutable_directory(initial_children,
                                                                 version=mutable_version)
            else:
                d = self._nodemaker.create_new_mutable_directory(initial_children)
        else:
            # mutable version doesn't make sense for immutable directories.
            assert mutable_version is None
            d = self._nodemaker.create_immutable_directory(initial_children)
        def _created(child):
            # Link the freshly created directory under 'name'.
            entries = {name: (child, metadata)}
            a = Adder(self, entries, overwrite=overwrite,
                      create_readonly_node=self._create_readonly_node)
            d = self._node.modify(a.modify)
            d.addCallback(lambda res: child)
            return d
        d.addCallback(_created)
        return d
695
    def move_child_to(self, current_child_namex, new_parent,
                      new_child_namex=None, overwrite=True):
        """
        I take one of my child links and move it to a new parent. The child
        link is referenced by name. In the new parent, the child link will live
        at 'new_child_namex', which defaults to 'current_child_namex'. I return
        a Deferred that fires when the operation finishes.
        'new_child_namex' and 'current_child_namex' need not be normalized.

        The overwrite parameter may be True (overwrite any existing child),
        False (error if the new child link already exists), or ONLY_FILES
        (error if the new child link exists and points to a directory).
        """
        if self.is_readonly() or new_parent.is_readonly():
            return defer.fail(NotWriteableError())

        current_child_name = normalize(current_child_namex)
        if new_child_namex is None:
            new_child_name = current_child_name
        else:
            new_child_name = normalize(new_child_namex)

        from_uri = self.get_write_uri()
        if new_parent.get_write_uri() == from_uri and new_child_name == current_child_name:
            # needed for correctness, otherwise we would delete the child
            return defer.succeed("redundant rename/relink")

        d = self.get_child_and_metadata(current_child_name)
        def _got_child(child_and_metadata):
            (child, metadata) = child_and_metadata
            # Link into the new parent first ...
            return new_parent.set_node(new_child_name, child, metadata,
                                       overwrite=overwrite)
        d.addCallback(_got_child)
        # ... and only then unlink from here, so a failed set_node cannot
        # lose the child.
        d.addCallback(lambda child: self.delete(current_child_name))
        return d
731
732
    def deep_traverse(self, walker):
        """Perform a recursive walk, using this dirnode as a root, notifying
        the 'walker' instance of everything I encounter.

        I call walker.enter_directory(parent, children) once for each dirnode
        I visit, immediately after retrieving the list of children. I pass in
        the parent dirnode and the dict of childname->(childnode,metadata).
        This function should *not* traverse the children: I will do that.
        enter_directory() is most useful for the deep-stats number that
        counts how large a directory is.

        I call walker.add_node(node, path) for each node (both files and
        directories) I can reach. Most work should be done here.

        I avoid loops by keeping track of verifier-caps and refusing to call
        walker.add_node() or traverse a node that I've seen before. This
        means that any file or directory will only be given to the walker
        once. If files or directories are referenced multiple times by a
        directory structure, this may appear to under-count or miss some of
        them.

        I return a Monitor which can be used to wait for the operation to
        finish, learn about its progress, or cancel the operation.
        """

        # this is just a tree-walker, except that following each edge
        # requires a Deferred. We used to use a ConcurrencyLimiter to limit
        # fanout to 10 simultaneous operations, but the memory load of the
        # queued operations was excessive (in one case, with 330k dirnodes,
        # it caused the process to run into the 3.0GB-ish per-process 32bit
        # linux memory limit, and crashed). So we use a single big Deferred
        # chain, and do a strict depth-first traversal, one node at a time.
        # This can be slower, because we aren't pipelining directory reads,
        # but it brought the memory footprint down by roughly 50%.

        monitor = Monitor()
        walker.set_monitor(monitor)

        # 'found' holds verify-caps of nodes already visited, seeded with ours.
        found = set([self.get_verify_cap()])
        d = self._deep_traverse_dirnode(self, [], walker, monitor, found)
        d.addCallback(lambda ignored: walker.finish())
        d.addBoth(monitor.finish)
        # monitor.finish has already seen any failure; swallow it here to
        # avoid an unhandled-error report.
        d.addErrback(lambda f: None)

        return monitor
778
779    def _deep_traverse_dirnode(self, node, path, walker, monitor, found):
780        # process this directory, then walk its children
781        monitor.raise_if_cancelled()
782        d = defer.maybeDeferred(walker.add_node, node, path)
783        d.addCallback(lambda ignored: node.list())
784        d.addCallback(self._deep_traverse_dirnode_children, node, path,
785                      walker, monitor, found)
786        return d
787
    def _deep_traverse_dirnode_children(self, children, parent, path,
                                        walker, monitor, found):
        # Process the children of one directory: announce the directory to
        # the walker, then visit every not-yet-seen child, files first,
        # recursing into subdirectories last. 'children' is the
        # childname->(childnode,metadata) dict produced by node.list();
        # 'found' is the shared set of verifier caps used for loop detection.
        monitor.raise_if_cancelled()
        d = defer.maybeDeferred(walker.enter_directory, parent, children)
        # we process file-like children first, so we can drop their FileNode
        # objects as quickly as possible. Tests suggest that a FileNode (held
        # in the client's nodecache) consumes about 2440 bytes. dirnodes (not
        # in the nodecache) seem to consume about 2000 bytes.
        dirkids = []
        filekids = []
        # sorted() gives a deterministic traversal order.
        for name, (child, metadata) in sorted(children.items()):
            childpath = path + [name]
            if isinstance(child, UnknownNode):
                # Unknown nodes have no verifier cap and cannot be
                # traversed; report them to the walker immediately.
                walker.add_node(child, childpath)
                continue
            verifier = child.get_verify_cap()
            # allow LIT files (for which verifier==None) to be processed
            if (verifier is not None) and (verifier in found):
                continue
            found.add(verifier)
            if IDirectoryNode.providedBy(child):
                dirkids.append( (child, childpath) )
            else:
                filekids.append( (child, childpath) )
        for i, (child, childpath) in enumerate(filekids):
            # bind child/childpath as defaults to avoid the late-binding
            # closure pitfall
            d.addCallback(lambda ignored, child=child, childpath=childpath:
                          walker.add_node(child, childpath))
            # to work around the Deferred tail-recursion problem
            # (specifically the defer.succeed flavor) requires us to avoid
            # doing more than 158 LIT files in a row. We insert a turn break
            # once every 100 files (LIT or CHK) to preserve some stack space
            # for other code. This is a different expression of the same
            # Twisted problem as in #237.
            if i % 100 == 99:
                d.addCallback(lambda ignored: fireEventually())
        for (child, childpath) in dirkids:
            d.addCallback(lambda ignored, child=child, childpath=childpath:
                          self._deep_traverse_dirnode(child, childpath,
                                                      walker, monitor,
                                                      found))
        return d
829
830
831    def build_manifest(self):
832        """Return a Monitor, with a ['status'] that will be a list of (path,
833        cap) tuples, for all nodes (directories and files) reachable from
834        this one."""
835        walker = ManifestWalker(self)
836        return self.deep_traverse(walker)
837
838    def start_deep_stats(self):
839        # Since deep_traverse tracks verifier caps, we avoid double-counting
840        # children for which we've got both a write-cap and a read-cap
841        return self.deep_traverse(DeepStats(self))
842
843    def start_deep_check(self, verify=False, add_lease=False):
844        return self.deep_traverse(DeepChecker(self, verify, repair=False, add_lease=add_lease))
845
846    def start_deep_check_and_repair(self, verify=False, add_lease=False):
847        return self.deep_traverse(DeepChecker(self, verify, repair=True, add_lease=add_lease))
848
849
class ManifestWalker(DeepStats):
    """Traversal walker that builds a manifest of (path, cap) tuples while
    also collecting storage-index strings, verifier caps, and the usual
    deep-stats numbers."""

    def __init__(self, origin):
        DeepStats.__init__(self, origin)
        # (path-tuple, cap) pairs, in traversal order
        self.manifest = []
        # base32-encoded storage indexes of every node that has one
        self.storage_index_strings = set()
        # serialized verifier caps of every node that has one
        self.verifycaps = set()

    def add_node(self, node, path):
        self.manifest.append((tuple(path), node.get_uri()))
        storage_index = node.get_storage_index()
        if storage_index:
            self.storage_index_strings.add(base32.b2a(storage_index))
        verify_cap = node.get_verify_cap()
        if verify_cap:
            self.verifycaps.add(verify_cap.to_string())
        # let DeepStats keep its counters up to date as well
        return DeepStats.add_node(self, node, path)

    def get_results(self):
        return {
            "manifest": self.manifest,
            "verifycaps": self.verifycaps,
            "storage-index": self.storage_index_strings,
            "stats": DeepStats.get_results(self),
        }
874
875
class DeepChecker(object):
    """Traversal walker that runs check() (or check_and_repair()) on every
    reachable node, accumulating the outcomes plus deep-stats."""

    def __init__(self, root, verify, repair, add_lease):
        root_si = root.get_storage_index()
        root_si_base32 = base32.b2a(root_si) if root_si else ""
        self._lp = log.msg(format="deep-check starting (%(si)s),"
                           " verify=%(verify)s, repair=%(repair)s",
                           si=root_si_base32, verify=verify, repair=repair)
        self._verify = verify
        self._repair = repair
        self._add_lease = add_lease
        # repair runs need the richer results container
        results_class = DeepCheckAndRepairResults if repair else DeepCheckResults
        self._results = results_class(root_si)
        self._stats = DeepStats(root)

    def set_monitor(self, monitor):
        self.monitor = monitor
        monitor.set_status(self._results)

    def add_node(self, node, childpath):
        # Check (and possibly repair) one node, record the outcome, then
        # update the stats counters once the check has finished.
        if self._repair:
            d = node.check_and_repair(self.monitor, self._verify, self._add_lease)
            record = self._results.add_check_and_repair
        else:
            d = node.check(self.monitor, self._verify, self._add_lease)
            record = self._results.add_check
        d.addCallback(record, childpath)
        d.addCallback(lambda _ign: self._stats.add_node(node, childpath))
        return d

    def enter_directory(self, parent, children):
        return self._stats.enter_directory(parent, children)

    def finish(self):
        log.msg("deep-check done", parent=self._lp)
        self._results.update_stats(self._stats.get_results())
        return self._results
916
917
918# use client.create_dirnode() to make one of these
Note: See TracBrowser for help on using the repository browser.