source: trunk/src/allmydata/frontends/magic_folder.py @ 0be2cbc

Last change on this file was 0be2cbc, checked in by Jean-Paul Calderone <exarkun@…> at 2019-03-04T16:00:08Z

We don't need quoting in Eliot structured logs.

Also quote_filepath seems to be doing the wrong thing but it's not entirely
clear how.

import sys, os
import os.path
from errno import EEXIST
from collections import deque
from datetime import datetime
import time
import ConfigParser

from twisted.python.filepath import FilePath
from twisted.python.monkey import MonkeyPatcher
from twisted.internet import defer, reactor, task
from twisted.internet.error import AlreadyCancelled
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from zope.interface import Interface, Attribute, implementer

from eliot import (
    Field,
    ActionType,
    MessageType,
    write_failure,
    write_traceback,
    log_call,
)
from eliot.twisted import (
    DeferredContext,
)

from allmydata.util import (
    fileutil,
    configutil,
    yamlutil,
    eliotutil,
)
from allmydata.util.fake_inotify import (
    humanReadableMask,
)
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import (
    precondition_abspath,
    get_pathinfo,
    ConflictError,
    abspath_expanduser_unicode,
)
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.progress import PercentProgress
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, FilenameEncodingError
from allmydata.util.time_format import format_time
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath


# Mask off all non-owner permissions for magic-folders files by default.
_DEFAULT_DOWNLOAD_UMASK = 0o077

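# The Linux inotify(7) IN_EXCL_UNLINK flag value, hard-coded here,
# presumably because the inotify modules selected by get_inotify_module()
# below do not all expose it.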
IN_EXCL_UNLINK = 0x04000000L


class ConfigurationError(Exception):
    """
    There was something wrong with some magic-folder configuration.
    """


def _get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


def get_inotify_module():
    # Until Twisted #9579 is fixed, the Docker check just screws things up.
    # Disable it.
    monkey = MonkeyPatcher()
    monkey.addPatch(runtime.platform, "isDocker", lambda: False)
    return monkey.runWithPatches(_get_inotify_module)

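# A minimal usage sketch (hypothetical), mirroring what Uploader does
# further down in this file:
#
#   inotify = get_inotify_module()
#   notifier = inotify.INotify()
#   notifier.watch(to_filepath(u"/some/dir"), mask=inotify.IN_CREATE,
#                  callbacks=[on_change], recursive=True)
#   notifier.startReading()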

def is_new_file(pathinfo, db_entry):
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime_ns, pathinfo.mtime_ns) !=
            (db_entry.size, db_entry.ctime_ns, db_entry.mtime_ns))

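# Note that is_new_file() reports a change whenever any of (size,
# ctime_ns, mtime_ns) differs from the recorded database entry; e.g. a
# touch(1) that only bumps the mtime is enough to make a path look new.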

def _upgrade_magic_folder_config(basedir):
    """
    Helper that upgrades from the single-magic-folder-only config style
    to the multiple-magic-folder configuration style (in YAML).
    """
    config_fname = os.path.join(basedir, "tahoe.cfg")
    config = configutil.get_config(config_fname)

    collective_fname = os.path.join(basedir, "private", "collective_dircap")
    upload_fname = os.path.join(basedir, "private", "magic_folder_dircap")
    magic_folders = {
        u"default": {
            u"directory": config.get("magic_folder", "local.directory").decode("utf-8"),
            u"collective_dircap": fileutil.read(collective_fname),
            u"upload_dircap": fileutil.read(upload_fname),
            u"poll_interval": int(config.get("magic_folder", "poll_interval")),
        },
    }
    fileutil.move_into_place(
        source=os.path.join(basedir, "private", "magicfolderdb.sqlite"),
        dest=os.path.join(basedir, "private", "magicfolder_default.sqlite"),
    )
    save_magic_folders(basedir, magic_folders)
    config.remove_option("magic_folder", "local.directory")
    config.remove_option("magic_folder", "poll_interval")
    configutil.write_config(os.path.join(basedir, 'tahoe.cfg'), config)
    fileutil.remove_if_possible(collective_fname)
    fileutil.remove_if_possible(upload_fname)

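# For reference, a rough sketch of the old-style configuration this
# helper migrates away from (option names are taken from the reads
# above; the values are hypothetical):
#
#   [magic_folder]
#   enabled = True
#   local.directory = /home/alice/magic
#   poll_interval = 60
#
# together with the cap files private/collective_dircap and
# private/magic_folder_dircap.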

def maybe_upgrade_magic_folders(node_directory):
    """
    If the given node directory is not already using the new-style
    magic-folder config it will be upgraded to do so. (This should
    only be done if the user is running a command that needs to modify
    the config)
    """
    yaml_fname = os.path.join(node_directory, u"private", u"magic_folders.yaml")
    if os.path.exists(yaml_fname):
        # we already have new-style magic folders
        return

    config_fname = os.path.join(node_directory, "tahoe.cfg")
    config = configutil.get_config(config_fname)

    # we have no YAML config; if we have config in tahoe.cfg then we
    # can upgrade it to the YAML-based configuration
    if config.has_option("magic_folder", "local.directory"):
        _upgrade_magic_folder_config(node_directory)


def load_magic_folders(node_directory):
    """
    Loads existing magic-folder configuration and returns it as a dict
    mapping name -> dict of config. This will NOT upgrade from
    old-style to new-style config (but WILL read old-style config and
    return it in the same way as if it were new-style).

    :param node_directory: path where node data is stored
    :returns: dict mapping magic-folder-name to its config (also a dict)
    """
    yaml_fname = os.path.join(node_directory, u"private", u"magic_folders.yaml")
    folders = dict()

    config_fname = os.path.join(node_directory, "tahoe.cfg")
    config = configutil.get_config(config_fname)

    if not os.path.exists(yaml_fname):
        # there will still be a magic_folder section in a "new"
        # config, but it won't have local.directory nor poll_interval
        # in it.
        if config.has_option("magic_folder", "local.directory"):
            up_fname = os.path.join(node_directory, "private", "magic_folder_dircap")
            coll_fname = os.path.join(node_directory, "private", "collective_dircap")
            directory = config.get("magic_folder", "local.directory").decode('utf8')
            try:
                interval = int(config.get("magic_folder", "poll_interval"))
            except ConfigParser.NoOptionError:
                interval = 60

            if config.has_option("magic_folder", "download.umask"):
                umask = int(config.get("magic_folder", "download.umask"), 8)
            else:
                umask = _DEFAULT_DOWNLOAD_UMASK

            folders[u"default"] = {
                u"directory": directory,
                u"upload_dircap": fileutil.read(up_fname),
                u"collective_dircap": fileutil.read(coll_fname),
                u"poll_interval": interval,
                u"umask": umask,
            }
        else:
            # without any YAML file AND no local.directory option it's
            # an error if magic-folder is "enabled" because we don't
            # actually have enough config for any magic-folders at all
            if config.has_section("magic_folder") \
               and config.getboolean("magic_folder", "enabled") \
               and not folders:
                raise Exception(
                    "[magic_folder] is enabled but has no YAML file and no "
                    "'local.directory' option."
                )

    elif os.path.exists(yaml_fname):  # yaml config-file exists
        if config.has_option("magic_folder", "local.directory"):
            raise Exception(
                "magic-folder config has both old-style configuration"
                " and new-style configuration; please remove the "
                "'local.directory' key from tahoe.cfg or remove "
                "'magic_folders.yaml' from {}".format(node_directory)
            )
        with open(yaml_fname, "r") as f:
            magic_folders = yamlutil.safe_load(f.read())
            if not isinstance(magic_folders, dict):
                raise Exception(
                    "'{}' should contain a dict".format(yaml_fname)
                )

            folders = magic_folders['magic-folders']
            if not isinstance(folders, dict):
                raise Exception(
                    "'magic-folders' in '{}' should be a dict".format(yaml_fname)
                )

    # check configuration
    folders = dict(
        (name, fix_magic_folder_config(yaml_fname, name, config))
        for (name, config)
        in folders.items()
    )
    return folders

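# For reference, a minimal sketch of the new-style YAML layout that
# load_magic_folders() expects (key names are taken from the code above;
# the cap values are hypothetical placeholders):
#
#   magic-folders:
#     default:
#       directory: /home/alice/magic
#       collective_dircap: URI:DIR2-RO:...
#       upload_dircap: URI:DIR2:...
#       poll_interval: 60
#       umask: 63   # 0o077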

def fix_magic_folder_config(yaml_fname, name, config):
    """
    Check the given folder configuration for validity.

    If it refers to a local directory which does not exist, create that
    directory with the configured permissions.

    :param unicode yaml_fname: The configuration file from which the
        configuration was read.

    :param unicode name: The name of the magic-folder this particular
        configuration blob is associated with.

    :param config: The configuration for a single magic-folder.  This is
        expected to be a ``dict`` with certain keys and values of certain
        types but these properties will be checked.

    :raise ConfigurationError: If the given configuration object does not
        conform to some magic-folder configuration requirement.
    """
    if not isinstance(config, dict):
        raise ConfigurationError(
            "Each item in '{}' must itself be a dict".format(yaml_fname)
        )

    for k in ['collective_dircap', 'upload_dircap', 'directory', 'poll_interval']:
        if k not in config:
            raise ConfigurationError(
                "Config for magic folder '{}' is missing '{}'".format(
                    name, k
                )
            )

    if not isinstance(
        config.setdefault(u"umask", _DEFAULT_DOWNLOAD_UMASK),
        int,
    ):
        raise Exception("magic-folder download umask must be an integer")

    # make sure directory for magic folder exists
    dir_fp = to_filepath(config['directory'])
    umask = config.setdefault('umask', 0077)

    try:
        os.mkdir(dir_fp.path, 0777 & (~ umask))
    except OSError as e:
        if EEXIST != e.errno:
            # Report some unknown problem.
            raise ConfigurationError(
                "magic-folder {} configured path {} could not be created: "
                "{}".format(
                    name,
                    dir_fp.path,
                    str(e),
                ),
            )
        elif not dir_fp.isdir():
            # Tell the user there's a collision.
            raise ConfigurationError(
                "magic-folder {} configured path {} exists and is not a "
                "directory".format(
                    name, dir_fp.path,
                ),
            )
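
    # Note: with the default umask of 0o077, the mkdir above creates the
    # directory with mode 0o700 (owner-only), since 0777 & ~0o077 == 0o700.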

    result_config = config.copy()
    for k in ['collective_dircap', 'upload_dircap']:
        if isinstance(config[k], unicode):
            result_config[k] = config[k].encode('ascii')
    return result_config



def save_magic_folders(node_directory, folders):
    fileutil.write_atomically(
        os.path.join(node_directory, u"private", u"magic_folders.yaml"),
        yamlutil.safe_dump({u"magic-folders": folders}),
    )

    config = configutil.get_config(os.path.join(node_directory, u"tahoe.cfg"))
    configutil.set_config(config, "magic_folder", "enabled", "True")
    configutil.write_config(os.path.join(node_directory, u"tahoe.cfg"), config)

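# Round-trip sketch (hypothetical): adding a folder amounts to
#
#   folders = load_magic_folders(node_dir)
#   folders[u"docs"] = {u"directory": u"/home/alice/docs",
#                       u"collective_dircap": collective_cap,
#                       u"upload_dircap": upload_cap,
#                       u"poll_interval": 60}
#   save_magic_folders(node_dir, folders)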

class MagicFolder(service.MultiService):

    @classmethod
    def from_config(cls, client_node, name, config):
        """
        Create a ``MagicFolder`` from a client node and magic-folder
        configuration.

        :param _Client client_node: The client node the magic-folder is
            attached to.

        :param dict config: Magic-folder configuration like that in the list
            returned by ``load_magic_folders``.
        """
        db_filename = client_node.config.get_private_path("magicfolder_{}.sqlite".format(name))
        local_dir_config = config['directory']
        try:
            poll_interval = int(config["poll_interval"])
        except ValueError:
            raise ValueError("'poll_interval' option must be an int")

        return cls(
            client=client_node,
            upload_dircap=config["upload_dircap"],
            collective_dircap=config["collective_dircap"],
            # XXX surely a better way for this local_path_u business
            local_path_u=abspath_expanduser_unicode(
                local_dir_config,
                base=client_node.config.get_config_path(),
            ),
            dbfile=abspath_expanduser_unicode(db_filename),
            umask=config["umask"],
            name=name,
            downloader_delay=poll_interval,
        )

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 name, uploader_delay=1.0, clock=None, downloader_delay=60):
        precondition_abspath(local_path_u)
        if not os.path.exists(local_path_u):
            raise ValueError("'{}' does not exist".format(local_path_u))
        if not os.path.isdir(local_path_u):
            raise ValueError("'{}' is not a directory".format(local_path_u))
        # this is used by 'service' things and must be unique in this Service hierarchy
        self.name = 'magic-folder-{}'.format(name)

        service.MultiService.__init__(self)

        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, uploader_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask,
                                     self.set_public_status, poll_interval=downloader_delay)
        self._public_status = (False, ['Magic folder has not yet started'])

    def get_public_status(self):
        """
        For the web UI, basically.
        """
        return self._public_status

    def set_public_status(self, status, *messages):
        self._public_status = (status, messages)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def stopService(self):
        with MAGIC_FOLDER_STOP(nickname=self.name).context():
            d = DeferredContext(self._finish())
        d.addBoth(
            lambda ign: service.MultiService.stopService(self)
        )
        return d.addActionFinish()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items...
        """
        self.uploader.start_uploading()  # synchronous, returns None
        return self.downloader.start_downloading()

    def _finish(self):
        d0 = self.downloader.stop()
        d1 = self.uploader.stop()
        return defer.DeferredList(list(
            DeferredContext(d).addErrback(write_failure).result
            for d in [d0, d1]
        ))


_NICKNAME = Field.for_types(
    u"nickname",
    [unicode, bytes],
    u"A Magic-Folder participant nickname.",
)

_DIRECTION = Field.for_types(
    u"direction",
    [unicode],
    u"A synchronization direction: uploader or downloader.",
    eliotutil.validateSetMembership({u"uploader", u"downloader"}),
)

PROCESSING_LOOP = ActionType(
    u"magic-folder:processing-loop",
    [_NICKNAME, _DIRECTION],
    [],
    u"A Magic-Folder is processing uploads or downloads.",
)

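# Eliot usage sketch: these ActionType instances serve as context
# managers, e.g. (QueueMixin._begin_processing below is a real call site)
#
#   with PROCESSING_LOOP(nickname=u"alice", direction=u"uploader"):
#       ...
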
ITERATION = ActionType(
    u"magic-folder:iteration",
    [_NICKNAME, _DIRECTION],
    [],
    u"A step towards synchronization in one direction.",
)

_COUNT = Field.for_types(
    u"count",
    [int, long],
    u"The number of items in the processing queue.",
)

PROCESS_QUEUE = ActionType(
    u"magic-folder:process-queue",
    [_COUNT],
    [],
    u"A Magic-Folder is working through an item queue.",
)

SCAN_REMOTE_COLLECTIVE = ActionType(
    u"magic-folder:scan-remote-collective",
    [],
    [],
    u"The remote collective is being scanned for peer DMDs.",
)

SCAN_REMOTE_DMD = ActionType(
    u"magic-folder:scan-remote-dmd",
    [_NICKNAME],
    [],
    u"A peer DMD is being scanned for changes.",
)

REMOTE_VERSION = Field.for_types(
    u"remote_version",
    [int, long],
    u"The version of a path found in a peer DMD.",
)

REMOTE_URI = Field.for_types(
    u"remote_uri",
    [bytes],
    u"The filecap of a path found in a peer DMD.",
)

REMOTE_DMD_ENTRY = MessageType(
    u"magic-folder:remote-dmd-entry",
    [eliotutil.RELPATH, magicfolderdb.PATHENTRY, REMOTE_VERSION, REMOTE_URI],
    u"A single entry found by scanning a peer DMD.",
)

ADD_TO_DOWNLOAD_QUEUE = MessageType(
    u"magic-folder:add-to-download-queue",
    [eliotutil.RELPATH],
    u"An entry was found to be changed and is being queued for download.",
)

MAGIC_FOLDER_STOP = ActionType(
    u"magic-folder:stop",
    [_NICKNAME],
    [],
    u"A Magic-Folder is being stopped.",
)

MAYBE_UPLOAD = MessageType(
    u"magic-folder:maybe-upload",
    [eliotutil.RELPATH],
    u"A decision is being made about whether to upload a file.",
)

PENDING = Field.for_types(
    u"pending",
    [list],
    u"The paths which are pending processing.",
)

REMOVE_FROM_PENDING = ActionType(
    u"magic-folder:remove-from-pending",
    [eliotutil.RELPATH, PENDING],
    [],
    u"An item being processed is being removed from the pending set.",
)

PATH = Field(
    u"path",
    lambda fp: fp.asTextMode().path,
    u"A local filesystem path.",
    eliotutil.validateInstanceOf(FilePath),
)

NOTIFIED_OBJECT_DISAPPEARED = MessageType(
    u"magic-folder:notified-object-disappeared",
    [PATH],
    u"A path which generated a notification was not found on the filesystem.  This is normal.",
)

NOT_UPLOADING = MessageType(
    u"magic-folder:not-uploading",
    [],
    u"An item being processed is not going to be uploaded.",
)

SYMLINK = MessageType(
    u"magic-folder:symlink",
    [PATH],
    u"An item being processed was a symlink and is being skipped",
)

CREATED_DIRECTORY = Field.for_types(
    u"created_directory",
    [unicode],
    u"The relative path of a newly created directory in a magic-folder.",
)

PROCESS_DIRECTORY = ActionType(
    u"magic-folder:process-directory",
    [],
    [CREATED_DIRECTORY],
    u"An item being processed was a directory.",
)

DIRECTORY_PATHENTRY = MessageType(
    u"magic-folder:directory-dbentry",
    [magicfolderdb.PATHENTRY],
    u"Local database state relating to an item possibly being uploaded.",
)

NOT_NEW_DIRECTORY = MessageType(
    u"magic-folder:not-new-directory",
    [],
    u"A directory item being processed was found to not be new.",
)

NOT_NEW_FILE = MessageType(
    u"magic-folder:not-new-file",
    [],
    u"A file item being processed was found to not be new (or changed).",
)

SPECIAL_FILE = MessageType(
    u"magic-folder:special-file",
    [],
    u"An item being processed was found to be of a special type which is not supported.",
)

_COUNTER_NAME = Field.for_types(
    u"counter_name",
    # Should really only be unicode
    [unicode, bytes],
    u"The name of a counter.",
)

_DELTA = Field.for_types(
    u"delta",
    [int, long],
    u"An amount of a specific change in a counter.",
)

_VALUE = Field.for_types(
    u"value",
    [int, long],
    u"The new value of a counter after a change.",
)

COUNT_CHANGED = MessageType(
    u"magic-folder:count",
    [_COUNTER_NAME, _DELTA, _VALUE],
    u"The value of a counter has changed.",
)

START_MONITORING = ActionType(
    u"magic-folder:start-monitoring",
    [_NICKNAME, _DIRECTION],
    [],
    u"Uploader is beginning to monitor the filesystem for uploadable changes.",
)

STOP_MONITORING = ActionType(
    u"magic-folder:stop-monitoring",
    [_NICKNAME, _DIRECTION],
    [],
    u"Uploader is terminating filesystem monitoring operation.",
)

START_UPLOADING = ActionType(
    u"magic-folder:start-uploading",
    [_NICKNAME, _DIRECTION],
    [],
    u"Uploader is performing startup-time inspection of known files.",
)

_IGNORED = Field.for_types(
    u"ignored",
    [bool],
    u"A file proposed for queueing for processing is instead being ignored by policy.",
)

_ALREADY_PENDING = Field.for_types(
    u"already_pending",
    [bool],
    u"A file proposed for queueing for processing is already in the queue.",
)

_SIZE = Field.for_types(
    u"size",
    [int, long, type(None)],
    u"The size of a file accepted into the processing queue.",
)

ADD_PENDING = ActionType(
    u"magic-folder:add-pending",
    [eliotutil.RELPATH],
    [_IGNORED, _ALREADY_PENDING, _SIZE],
    u"Uploader is adding a path to the processing queue.",
)

FULL_SCAN = ActionType(
    u"magic-folder:full-scan",
    [_NICKNAME, _DIRECTION],
    [],
    u"A complete brute-force scan of the local directory is being performed.",
)

SCAN = ActionType(
    u"magic-folder:scan",
    [eliotutil.RELPATH],
    [],
    u"A brute-force scan of a subset of the local directory is being performed.",
)

NOTIFIED = ActionType(
    u"magic-folder:notified",
    [PATH, _NICKNAME, _DIRECTION],
    [],
    u"Magic-Folder received a notification of a local filesystem change for a certain path.",
)

_EVENTS = Field(
    u"events",
    humanReadableMask,
    u"Details about a filesystem event generating a notification event.",
    eliotutil.validateInstanceOf((int, long)),
)

_NON_DIR_CREATED = Field.for_types(
    u"non_dir_created",
    [bool],
    u"A creation event was for a non-directory and requires no further inspection.",
)


REACT_TO_INOTIFY = ActionType(
    u"magic-folder:react-to-inotify",
    [_EVENTS],
    [_IGNORED, _NON_DIR_CREATED, _ALREADY_PENDING],
    u"Magic-Folder is processing a notification from inotify(7) (or a clone) about a filesystem event.",
)

_ABSPATH = Field.for_types(
    u"abspath",
    [unicode],
    u"The absolute path of a file being written in a local directory.",
)

_IS_CONFLICT = Field.for_types(
    u"is_conflict",
    [bool],
    u"An indication of whether a file being written in a local directory is in a conflicted state.",
)

_NOW = Field.for_types(
    u"now",
    [int, long, float],
    u"The time at which a file is being written in a local directory.",
)

_MTIME = Field.for_types(
    u"mtime",
    [int, long, float, type(None)],
    u"A modification time to put into the metadata of a file being written in a local directory.",
)

WRITE_DOWNLOADED_FILE = ActionType(
    u"magic-folder:write-downloaded-file",
    [_ABSPATH, _SIZE, _IS_CONFLICT, _NOW, _MTIME],
    [],
    u"A downloaded file is being written to the filesystem.",
)

ALREADY_GONE = MessageType(
    u"magic-folder:rename:already-gone",
    [],
    u"A deleted file could not be rewritten to a backup path because it no longer exists.",
)

_REASON = Field(
    u"reason",
    lambda e: str(e),
    u"An exception which may describe the form of the conflict.",
    eliotutil.validateInstanceOf(Exception),
)

OVERWRITE_BECOMES_CONFLICT = MessageType(
    u"magic-folder:overwrite-becomes-conflict",
    [_REASON],
    u"An attempt to overwrite an existing file failed because that file is now conflicted.",
)

_FILES = Field(
    u"files",
    lambda file_set: list(file_set),
    u"All of the relative paths belonging to a Magic-Folder that are locally known.",
)

ALL_FILES = MessageType(
    u"magic-folder:all-files",
    [_FILES],
    u"A record of the rough state of the local database at the time of downloader start up.",
)

_ITEMS = Field(
    u"items",
    lambda deque: list(dict(relpath=item.relpath_u, kind=item.kind) for item in deque),
    u"Items in a processing queue.",
)

ITEM_QUEUE = MessageType(
    u"magic-folder:item-queue",
    [_ITEMS],
    u"A report of the items in the processing queue at this point.",
)

_BATCH = Field(
    u"batch",
    # Just report the paths for now.  Perhaps something from the values would
    # also be useful, though?  Consider it.
    lambda batch: batch.keys(),
    u"A batch of scanned items.",
    eliotutil.validateInstanceOf(dict),
)

SCAN_BATCH = MessageType(
    u"magic-folder:scan-batch",
    [_BATCH],
    u"Items in a batch of files which were scanned from the DMD.",
)

START_DOWNLOADING = ActionType(
    u"magic-folder:start-downloading",
    [_NICKNAME, _DIRECTION],
    [],
    u"A Magic-Folder downloader is initializing and beginning to manage downloads.",
)

PERFORM_SCAN = ActionType(
    u"magic-folder:perform-scan",
    [],
    [],
    u"Remote storage is being scanned for changes which need to be synchronized.",
)

_STATUS = Field.for_types(
    u"status",
    # Should just be unicode...
    [unicode, bytes],
    u"The status of an item in a processing queue.",
)

QUEUED_ITEM_STATUS_CHANGE = MessageType(
    u"magic-folder:item:status-change",
    [eliotutil.RELPATH, _STATUS],
    u"A queued item changed status.",
)

_CONFLICT_REASON = Field.for_types(
    u"conflict_reason",
    [unicode, type(None)],
    u"A human-readable explanation of why a file was in conflict.",
    eliotutil.validateSetMembership({
        u"dbentry mismatch metadata",
        u"dbentry newer version",
        u"last_downloaded_uri mismatch",
        u"file appeared",
        None,
    }),
)

CHECKING_CONFLICTS = ActionType(
    u"magic-folder:item:checking-conflicts",
    [],
    [_IS_CONFLICT, _CONFLICT_REASON],
    u"A potential download item is being checked to determine if it is in a conflicted state.",
)

REMOTE_DIRECTORY_CREATED = MessageType(
    u"magic-folder:remote-directory-created",
    [],
    u"The downloader found a new directory in the DMD.",
)

REMOTE_DIRECTORY_DELETED = MessageType(
    u"magic-folder:remote-directory-deleted",
    [],
    u"The downloader found a directory has been deleted from the DMD.",
)

class QueueMixin(HookMixin):
    """
    A parent class for Uploader and Downloader that handles putting
    IQueuedItem instances into a work queue and processing
    them. Tracks some history of recent items processed (for the
    "status" API).

    Subclasses implement _scan_delay, _perform_scan and _process

    :ivar unicode _name: Either "uploader" or "downloader".

    :ivar _deque: IQueuedItem instances to process

    :ivar _process_history: the last 20 items we processed

    :ivar _in_progress: current batch of items which are currently
        being processed; chunks of work are removed from _deque and
        worked on. As each finishes, it is added to _process_history
        (with oldest items falling off the end).
    """

    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._log_fields = dict(
            nickname=self._client.nickname,
            direction=self._name,
        )
        self._hooks = {
            'processed': None,
            'started': None,
            'iteration': None,
            'inotify': None,
            'item_processed': None,
        }
        self.started_d = self.set_hook('started')

        # we should have gotten nice errors already while loading the
        # config, but just to be safe:
        assert self._local_filepath.exists()
        assert self._local_filepath.isdir()

        self._deque = deque()
        # do we also want to bound on "maximum age"?
        self._process_history = deque(maxlen=20)
        self._in_progress = []

    def get_status(self):
        """
        Returns an iterable of instances that implement IQueuedItem
        """
        for item in self._deque:
            yield item
        for item in self._in_progress:
            yield item
        for item in self._process_history:
            yield item

    def _get_filepath(self, relpath_u):
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def stop(self):
        """
        Don't process queued items anymore.

        :return Deferred: A ``Deferred`` that fires when processing has
            completely stopped.
        """
        d = self._processing
        self._processing_loop.stop()
        self._processing = None
        self._processing_loop = None
        return d

    def _begin_processing(self):
        """
        Start a loop that looks for work to do and then does it.
        """
        action = PROCESSING_LOOP(**self._log_fields)

        # Note that we don't put the processing iterations into the logging
        # action because we expect this loop to run for the whole lifetime of
        # the process.  The tooling for dealing with incomplete action trees
        # is still somewhat lacking.  Putting the iterations into the overall
        # loop action would hamper reading those logs for now.
        self._processing_loop = task.LoopingCall(self._processing_iteration)
        self._processing_loop.clock = self._clock
        self._processing = self._processing_loop.start(self._scan_delay(), now=True)

        with action.context():
            # We do make sure errors appear in the loop action though.
            d = DeferredContext(self._processing)
            d.addActionFinish()

    def _processing_iteration(self):
        """
        One iteration runs self._process_deque which calls _perform_scan() and
        then completely drains the _deque (processing each item).
        """
        action = ITERATION(**self._log_fields)
        with action.context():
            d = DeferredContext(defer.Deferred())

            # adds items to our deque
            d.addCallback(lambda ignored: self._perform_scan())

            # process anything in our queue
            d.addCallback(lambda ignored: self._process_deque())

            # Let the tests know we've made it this far.
            d.addCallback(lambda ignored: self._call_hook(None, 'iteration'))

            # Get it out of the Eliot context
            result = d.addActionFinish()

            # Kick it off
            result.callback(None)

            # Give it back to LoopingCall so it can wait on us.
            return result

    def _scan_delay(self):
        raise NotImplementedError

    def _perform_scan(self):
        return

    @eliotutil.inline_callbacks
    def _process_deque(self):
        # process everything currently in the queue. we're turning it
        # into a list so that if any new items get added while we're
        # processing, they'll not run until next time.
        to_process = list(self._deque)
        self._deque.clear()
        self._count('objects_queued', -len(to_process))

        # we want to include all these in the next status request, so
        # we must put them 'somewhere' before the next yield (and it's
        # not in _process_history because that gets trimmed and we
        # don't want anything to disappear until after it is
        # completed)
        self._in_progress.extend(to_process)

        with PROCESS_QUEUE(count=len(to_process)):
            for item in to_process:
                self._process_history.appendleft(item)
                self._in_progress.remove(item)
                try:
                    proc = yield self._process(item)
                    if not proc:
                        self._process_history.remove(item)
                    self._call_hook(item, 'item_processed')
                except:
                    write_traceback()
                    item.set_status('failed', self._clock.seconds())
                    proc = Failure()

                self._call_hook(proc, 'processed')

    def _get_relpath(self, filepath):
        segments = unicode_segments_from(filepath, self._local_filepath)
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._client.stats_provider.count(ctr, delta)
        COUNT_CHANGED.log(
            counter_name=counter_name,
            delta=delta,
            value=self._client.stats_provider.counters[ctr],
        )
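
# For example, _count('objects_queued') on the uploader bumps the
# stats-provider counter named "magic_folder.uploader.objects_queued".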

# this isn't in interfaces.py because it's very specific to QueueMixin
class IQueuedItem(Interface):
    relpath_u = Attribute("The path this item represents")
    progress = Attribute("A PercentProgress instance")

    def set_status(self, status, current_time=None):
        """
        Record that this item entered the given status at the given time
        (defaulting to the current time).
        """

    def status_time(self, state):
        """
        Get the time of a particular state change, or None
        """

    def status_history(self):
        """
        All status changes, sorted latest -> oldest
        """


@implementer(IQueuedItem)
class QueuedItem(object):
    kind = None

    def __init__(self, relpath_u, progress, size):
        self.relpath_u = relpath_u
        self.progress = progress
        self._status_history = dict()
        self.size = size

    def set_status(self, status, current_time=None):
        if current_time is None:
            current_time = time.time()
        self._status_history[status] = current_time
        QUEUED_ITEM_STATUS_CHANGE.log(
            relpath=self.relpath_u,
            status=status,
        )

    def status_time(self, state):
        """
        Returns None if there's no status-update for 'state', else returns
        the timestamp when that state was reached.
        """
        return self._status_history.get(state, None)

    def status_history(self):
        """
        Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
        """
        hist = self._status_history.items()
        hist.sort(lambda a, b: cmp(a[1], b[1]))
        return hist

    def __eq__(self, other):
        return (
            other.relpath_u == self.relpath_u and
            other.status_history() == self.status_history()
        )


class UploadItem(QueuedItem):
    """
    Represents a single item in the _deque of the Uploader
    """
    kind = u"upload"


_ITEM = Field(
    u"item",
    lambda i: {
        u"relpath": i.relpath_u,
        u"size": i.size,
    },
    u"An item to be uploaded or downloaded.",
    eliotutil.validateInstanceOf(QueuedItem),
)

PROCESS_ITEM = ActionType(
    u"magic-folder:process-item",
    [_ITEM],
    [],
    u"A path which was found wanting of an update is receiving an update.",
)

class Uploader(QueueMixin):

    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, u'uploader', clock)

        self.is_ready = False

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("'upload_dircap' does not refer to a directory")
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("'upload_dircap' is not a writecap to a directory")

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        self._pending = set()  # of unicode relpaths
        self._pending_delay = pending_delay
        self._periodic_full_scan_duration = 10 * 60 # perform a full scan every 10 minutes
        self._periodic_callid = None

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=False)  # True?

    def start_monitoring(self):
        action = START_MONITORING(**self._log_fields)
        with action.context():
            d = DeferredContext(defer.succeed(None))

        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d.addActionFinish()

    def stop(self):
        action = STOP_MONITORING(**self._log_fields)
        with action.context():
            self._notifier.stopReading()
            self._count('dirs_monitored', -1)
            if self._periodic_callid:
                try:
                    self._periodic_callid.cancel()
                except AlreadyCancelled:
                    pass

            if hasattr(self._notifier, 'wait_until_stopped'):
                d = DeferredContext(self._notifier.wait_until_stopped())
            else:
                d = DeferredContext(defer.succeed(None))

            d.addCallback(lambda ignored: QueueMixin.stop(self))
            return d.addActionFinish()

    def start_uploading(self):
        action = START_UPLOADING(**self._log_fields)
        with action:
            self.is_ready = True

            all_relpaths = self._db.get_all_relpaths()

            for relpath_u in all_relpaths:
                self._add_pending(relpath_u)

            self._full_scan()
            self._begin_processing()

    def _scan_delay(self):
        return self._pending_delay

    def _full_scan(self):
        with FULL_SCAN(**self._log_fields):
            self._periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
            self._scan(u"")

    def _add_pending(self, relpath_u):
        with ADD_PENDING(relpath=relpath_u) as action:
            if magicpath.should_ignore_file(relpath_u):
                action.add_success_fields(ignored=True, already_pending=False, size=None)
                return
            if self.is_pending(relpath_u):
                action.add_success_fields(ignored=False, already_pending=True, size=None)
                return

            self._pending.add(relpath_u)
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))
            progress = PercentProgress()
            action.add_success_fields(ignored=False, already_pending=False, size=pathinfo.size)
            item = UploadItem(relpath_u, progress, pathinfo.size)
            item.set_status('queued', self._clock.seconds())
            self._deque.append(item)
            self._count('objects_queued')

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that _add_pending() below also appends each new item to the deque.
        with SCAN(relpath=reldir_u):
            fp = self._get_filepath(reldir_u)
            try:
                children = listdir_filepath(fp)
            except EnvironmentError:
                raise Exception("WARNING: magic folder: permission denied on directory %s"
                                % quote_filepath(fp))
            except FilenameEncodingError:
                raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                                % quote_filepath(fp))

            for child in children:
                _assert(isinstance(child, unicode), child=child)
                self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        with NOTIFIED(path=path, **self._log_fields):
            try:
                return self._real_notify(opaque, path, events_mask)
            except Exception:
                write_traceback()

    def _real_notify(self, opaque, path, events_mask):
        action = REACT_TO_INOTIFY(
            # We could think about logging opaque here but ... it's opaque.
            # All we can do is id() or repr() it and neither of those actually
            # produces very illuminating results.  We drop opaque on the
            # floor, anyway.
            events=events_mask,
        )
        success_fields = dict(non_dir_created=False, already_pending=False, ignored=False)

        with action:
            relpath_u = self._get_relpath(path)

            # We filter out IN_CREATE events not associated with a directory.
            # Acting on IN_CREATE for files could cause us to read and upload
            # a possibly-incomplete file before the application has closed it.
            # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
            # It isn't possible to avoid watching for IN_CREATE at all, because
            # it is the only event notified for a directory creation.

            if ((events_mask & self._inotify.IN_CREATE) != 0 and
                (events_mask & self._inotify.IN_ISDIR) == 0):
                success_fields[u"non_dir_created"] = True
            elif relpath_u in self._pending:
                success_fields[u"already_pending"] = True
            elif magicpath.should_ignore_file(relpath_u):
                success_fields[u"ignored"] = True
            else:
                self._add_pending(relpath_u)
                self._call_hook(path, 'inotify')
            action.add_success_fields(**success_fields)

    def _process(self, item):
        """
        Possibly upload a single QueuedItem.  If this returns False, the item is
        removed from _process_history.
        """
        # Uploader
        with PROCESS_ITEM(item=item).context():
            d = DeferredContext(defer.succeed(False))

            relpath_u = item.relpath_u
            item.set_status('started', self._clock.seconds())

            if relpath_u is None:
                item.set_status('invalid_path', self._clock.seconds())
                return d.addActionFinish()

            precondition(isinstance(relpath_u, unicode), relpath_u)
            precondition(not relpath_u.endswith(u'/'), relpath_u)

        def _maybe_upload(ign, now=None):
            MAYBE_UPLOAD.log(relpath=relpath_u)
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            try:
                with REMOVE_FROM_PENDING(relpath=relpath_u, pending=list(self._pending)):
                    self._pending.remove(relpath_u)
            except KeyError:
                pass
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                NOTIFIED_OBJECT_DISAPPEARED.log(path=fp)
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    return False

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    NOT_UPLOADING.log()
                    self._count('objects_not_uploaded')
                    return False

                # look out! there's another place we set a "metadata"
                # object like this (for new, not deleted files)
                metadata = {
                    'version': new_version,
                    'deleted': True,
                    'last_downloaded_timestamp': last_downloaded_timestamp,
                    'user_mtime': pathinfo.ctime_ns / 1000000000.0,  # why are we using ns in PathInfo??
                }

                # from the Fire Dragons part of the spec:
                # Later, in response to a local filesystem change at a given path, the
                # Magic Folder client reads the last-downloaded record associated with
                # that path (if any) from the database and then uploads the current
                # file. When it links the uploaded file into its client DMD, it
                # includes the ``last_downloaded_uri`` field in the metadata of the
                # directory entry, overwriting any existing field of that name. If
                # there was no last-downloaded record associated with the path, this
                # field is omitted.
                # Note that ``last_downloaded_uri`` field does *not* record the URI of
                # the uploaded file (which would be redundant); it records the URI of
                # the last download before the local change that caused the upload.
                # The field will be absent if the file has never been downloaded by
                # this client (i.e. if it was created on this client and no change
                # by any other client has been detected).

                # XXX currently not actually true: it will record the
                # LAST THING we wrote to (or saw on) disk (not
                # necessarily downloaded?)

                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
                if db_entry.last_uploaded_uri is not None:
                    metadata['last_uploaded_uri'] = db_entry.last_uploaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = DeferredContext(self._upload_dirnode.add_file(
                    encoded_path_u, empty_uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                ))

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    # if we're uploading a file, we want to set
                    # last_downloaded_uri to the filecap so that we don't
                    # immediately re-download it when we start up next
                    last_downloaded_uri = metadata.get('last_downloaded_uri', filecap)
                    self._db.did_upload_version(
                        relpath_u,
                        new_version,
                        filecap,
                        last_downloaded_uri,
                        last_downloaded_timestamp,
                        pathinfo,
                    )
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                d2.addCallback(lambda ign: True)
                return d2.result
            elif pathinfo.islink:
                SYMLINK.log(path=fp)
                return False
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                db_entry = self._db.get_db_entry(relpath_u)
                DIRECTORY_PATHENTRY.log(pathentry=db_entry)
                if not is_new_file(pathinfo, db_entry):
                    NOT_NEW_DIRECTORY.log()
                    return False

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                with PROCESS_DIRECTORY().context() as action:
                    upload_d = DeferredContext(self._upload_dirnode.add_file(
                        encoded_path_u, uploadable,
                        metadata={"version": 0},
                        overwrite=True,
                        progress=item.progress,
                    ))
                def _dir_succeeded(ign):
                    action.add_success_fields(created_directory=relpath_u)
                    self._count('directories_created')
                upload_d.addCallback(_dir_succeeded)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: True)
                return upload_d.addActionFinish()
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    NOT_NEW_FILE.log()
                    self._count('objects_not_uploaded')
                    return False

                metadata = {
                    'version': new_version,
                    'last_downloaded_timestamp': last_downloaded_timestamp,
                    'user_mtime': pathinfo.mtime_ns / 1000000000.0,  # why are we using ns in PathInfo??
                }
                if db_entry is not None:
                    if db_entry.last_downloaded_uri is not None:
                        metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
                    if db_entry.last_uploaded_uri is not None:
                        metadata['last_uploaded_uri'] = db_entry.last_uploaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = DeferredContext(self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                ))

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    # if we're uploading a file, we want to set
                    # last_downloaded_uri to the filecap so that we don't
                    # immediately re-download it when we start up next
                    last_downloaded_uri = filecap
                    self._db.did_upload_version(
                        relpath_u,
                        new_version,
                        filecap,
                        last_downloaded_uri,
                        last_downloaded_timestamp,
                        pathinfo
                    )
                    self._count('files_uploaded')
                    return True
                d2.addCallback(_add_db_entry)
                return d2.result
            else:
                SPECIAL_FILE.log()
                return False

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            if res:
                self._count('objects_succeeded')
            # TODO: maybe we want the status to be 'ignored' if res is False
            item.set_status('success', self._clock.seconds())
            return res
        def _failed(f):
            self._count('objects_failed')
            item.set_status('failure', self._clock.seconds())
            return f
        d.addCallbacks(_succeeded, _failed)
        return d.addActionFinish()

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

1515    def _get_conflicted_filename(self, abspath_u):
1516        return abspath_u + u".conflict"
1517
1518    def _write_downloaded_file(self, local_path_u, abspath_u, file_contents,
1519                               is_conflict=False, now=None, mtime=None):
1520        if now is None:
1521            now = time.time()
1522        action = WRITE_DOWNLOADED_FILE(
1523            abspath=abspath_u,
1524            size=len(file_contents),
1525            is_conflict=is_conflict,
1526            now=now,
1527            mtime=mtime,
1528        )
1529        with action:
1530            return self._write_downloaded_file_logged(
1531                local_path_u,
1532                abspath_u,
1533                file_contents,
1534                is_conflict,
1535                now,
1536                mtime,
1537            )
1538
1539    def _write_downloaded_file_logged(self, local_path_u, abspath_u,
1540                                      file_contents, is_conflict, now, mtime):
1541        # 1. Write a temporary file, say .foo.tmp.
1542        # 2. is_conflict determines whether this is an overwrite or a conflict.
1543        # 3. Set the mtime of the replacement file to T seconds before the
1544        #    current local time, or to mtime, whichever is older.
1545        # 4. Perform a file replacement with backup filename foo.backup,
1546        #    replaced file foo, and replacement file .foo.tmp. If any step of
1547        #    this operation fails, reclassify as a conflict and stop.
1548        #
1549        # Returns the path of the destination file.
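        #
        # Illustrative walkthrough (assumed names): downloading "foo" first
        # writes the replacement file "foo.tmp"; an overwrite then replaces
        # "foo" with "foo.tmp" (keeping the old contents as "foo.backup"),
        # while a conflict -- or a replacement that fails -- instead renames
        # "foo.tmp" to "foo.conflict" and leaves "foo" untouched.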
1550        precondition_abspath(abspath_u)
1551        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
1552
1553        initial_path_u = os.path.dirname(abspath_u)
1554        fileutil.make_dirs_with_absolute_mode(local_path_u, initial_path_u, (~ self._umask) & 0777)
1555        fileutil.write(replacement_path_u, file_contents)
1556        os.chmod(replacement_path_u, (~ self._umask) & 0666)
1557
1558        # FUDGE_SECONDS is used to determine if another process has
1559        # written to the same file concurrently. This is described in
1560        # the Earth Dragon section of our design document ("T" in the
1561        # spec is FUDGE_SECONDS here):
1562        # docs/proposed/magic-folder/remote-to-local-sync.rst
1563        fudge_time = now - self.FUDGE_SECONDS
1564        modified_time = min(fudge_time, mtime) if mtime else fudge_time
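        # For example (assumed values): with FUDGE_SECONDS = 10.0,
        # now = 1000.0 and mtime = 800.0, fudge_time is 990.0 and
        # modified_time is min(990.0, 800.0) = 800.0; with mtime = None,
        # modified_time falls back to fudge_time, i.e. 990.0.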
1565        os.utime(replacement_path_u, (now, modified_time))
1566        if is_conflict:
1567            return self._rename_conflicted_file(abspath_u, replacement_path_u)
1568        else:
1569            try:
1570                fileutil.replace_file(abspath_u, replacement_path_u)
1571                return abspath_u
1572            except fileutil.ConflictError as e:
1573                OVERWRITE_BECOMES_CONFLICT.log(reason=e)
1574                return self._rename_conflicted_file(abspath_u, replacement_path_u)
1575
1576    @log_call(
1577        action_type=u"magic-folder:rename-conflicted",
1578        include_args=["abspath_u", "replacement_path_u"],
1579    )
1580    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
1581        conflict_path_u = self._get_conflicted_filename(abspath_u)
1582        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
1583        return conflict_path_u
1584
1585    @log_call(
1586        action_type=u"magic-folder:rename-deleted",
1587        include_args=["abspath_u"],
1588    )
1589    def _rename_deleted_file(self, abspath_u):
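        # For example, a remote delete of "foo" renames the local copy to
        # "foo.backup"; if "foo" is already gone, rename_no_overwrite raises
        # OSError and we just log ALREADY_GONE.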
1590        try:
1591            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
1592        except OSError:
1593            ALREADY_GONE.log()
1594        return abspath_u
1595
1596
1597def _is_empty_filecap(client, cap):
1598    """
1599    Internal helper.
1600
1601    :param cap: a capability URI
1602
1603    :returns: True if "cap" represents an empty file
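
    For example, the zero-length literal cap u"URI:LIT:" (an empty
    immutable file) yields a node whose get_size() is 0, so this
    returns True.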
1604    """
1605    node = client.create_node_from_uri(
1606        None,
1607        cap.encode('ascii'),
1608    )
1609    return (not node.get_size())
1610
1611
1612class DownloadItem(QueuedItem):
1613    """
1614    Represents a single item in the _deque of the Downloader
1615    """
1616    kind = u"download"
1617
1618    def __init__(self, relpath_u, progress, filenode, metadata, size):
1619        super(DownloadItem, self).__init__(relpath_u, progress, size)
1620        self.file_node = filenode
1621        self.metadata = metadata
1622
1623
1624class Downloader(QueueMixin, WriteFileMixin):
1625
1626    def __init__(self, client, local_path_u, db, collective_dirnode,
1627                 upload_readonly_dircap, clock, is_upload_pending, umask,
1628                 status_reporter, poll_interval=60):
1629        QueueMixin.__init__(self, client, local_path_u, db, u'downloader', clock)
1630
1631        if not IDirectoryNode.providedBy(collective_dirnode):
1632            raise AssertionError("'collective_dircap' does not refer to a directory")
1633        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
1634            raise AssertionError("'collective_dircap' is not a readonly cap to a directory")
1635
1636        self._collective_dirnode = collective_dirnode
1637        self._upload_readonly_dircap = upload_readonly_dircap
1638        self._is_upload_pending = is_upload_pending
1639        self._umask = umask
1640        self._status_reporter = status_reporter
1641        self._poll_interval = poll_interval
1642
1643    @eliotutil.inline_callbacks
1644    def start_downloading(self):
1645        action = START_DOWNLOADING(**self._log_fields)
1646        with action:
1647            ALL_FILES.log(files=self._db.get_all_relpaths())
1648
1649            while True:
1650                try:
1651                    yield self._scan_remote_collective(scan_self=True)
1652                    # The integration tests watch for this log message to
1653                    # decide when it is safe to proceed.  Clearly, we need
1654                    # better programmatic interrogation of magic-folder state.
1655                    print("Completed initial Magic Folder scan successfully ({})".format(self))
1656                    self._begin_processing()
1657                    return
1658                except Exception:
1659                    self._status_reporter(
1660                        False, "Initial scan has failed",
1661                        "Last tried at %s" % self.nice_current_time(),
1662                    )
1663                    write_traceback()
1664                    yield task.deferLater(self._clock, self._poll_interval, lambda: None)
1665
1666    def nice_current_time(self):
1667        return format_time(datetime.fromtimestamp(self._clock.seconds()).timetuple())
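        # e.g. '2019-03-04 16:00:08' -- format_time is assumed to render
        # the struct_time as "%Y-%m-%d %H:%M:%S".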
1668
1669    def _should_download(self, relpath_u, remote_version, remote_uri):
1670        """
1671        _should_download returns a bool indicating whether a remote object should be downloaded.
1672        We check the remote metadata version against the version in our magic-folder db;
1673        the latest version wins.
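
        For example (illustrative values): a db entry at version 2 loses to
        remote_version 3 and we download; with equal versions we still
        download when remote_uri differs from our last_downloaded_uri,
        except that a missing last_downloaded_uri paired with an empty
        remote filecap is treated as already synchronized.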
1674        """
1675        if magicpath.should_ignore_file(relpath_u):
1676            return False
1677        db_entry = self._db.get_db_entry(relpath_u)
1678        if db_entry is None:
1679            return True
1680        if db_entry.version < remote_version:
1681            return True
1682        if db_entry.last_downloaded_uri is None and _is_empty_filecap(self._client, remote_uri):
1683            pass
1684        elif db_entry.last_downloaded_uri != remote_uri:
1685            return True
1686        return False
1687
1688    def _get_local_latest(self, relpath_u):
1689        """
1690        _get_local_latest takes a unicode path string and checks whether the
1691        corresponding file exists locally; if not, return None, otherwise
1692        return the entry for it (if any) from our magic-folder db.
1693        """
1694        if not self._get_filepath(relpath_u).exists():
1695            return None
1696        return self._db.get_db_entry(relpath_u)
1697
1698    def _get_collective_latest_file(self, filename):
1699        """
1700        _get_collective_latest_file takes a file path pointing to a file managed by
1701        magic-folder and returns a deferred that fires with a two-tuple containing a
1702        file node and metadata for the latest version of the file located in the
1703        magic-folder collective directory.
1704        """
1705        collective_dirmap_d = self._collective_dirnode.list()
1706        def scan_collective(result):
1707            list_of_deferreds = []
1708            for dir_name in result.keys():
1709                # XXX make sure it's a directory
1710                d = defer.succeed(None)
1711                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
1712                list_of_deferreds.append(d)
1713            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
1714            return deferList
1715        collective_dirmap_d.addCallback(scan_collective)
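        # highest_version (below) reduces the DeferredList results: e.g. if
        # three participants hold the file at versions 1, 3 and 2, the
        # (node, metadata) pair carrying version 3 wins; if every lookup
        # failed it returns (None, None).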
1716        def highest_version(deferredList):
1717            max_version = 0
1718            metadata = None
1719            node = None
1720            for success, result in deferredList:
1721                if success:
1722                    if node is None or result[1]['version'] > max_version:
1723                        node, metadata = result
1724                        max_version = result[1]['version']
1725            return node, metadata
1726        collective_dirmap_d.addCallback(highest_version)
1727        return collective_dirmap_d
1728
1729    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
1730        with SCAN_REMOTE_DMD(nickname=nickname).context():
1731            d = DeferredContext(dirnode.list())
1732        def scan_listing(listing_map):
1733            for encoded_relpath_u in listing_map.keys():
1734                relpath_u = magicpath.magic2path(encoded_relpath_u)
1735
1736                file_node, metadata = listing_map[encoded_relpath_u]
1737                local_dbentry = self._get_local_latest(relpath_u)
1738
1739                # XXX FIXME this is *awfully* similar to the
1740                # _should_download logic in that function -- can we
1741                # share it?
1742                remote_version = metadata.get('version', None)
1743                remote_uri = file_node.get_readonly_uri()
1744                REMOTE_DMD_ENTRY.log(
1745                    relpath=relpath_u,
1746                    pathentry=local_dbentry,
1747                    remote_version=remote_version,
1748                    remote_uri=remote_uri,
1749                )
1750
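                # Queue this entry when it is entirely new to us, when the
                # remote carries no version (treat it as new), when the
                # remote version is strictly newer, or when the versions tie
                # but the remote content differs from what we last
                # downloaded -- e.g. local version 2 vs. remote version 2
                # with a different filecap still enters the batch.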
1751                if (local_dbentry is None or remote_version is None or
1752                    local_dbentry.version < remote_version or
1753                    (local_dbentry.version == remote_version and local_dbentry.last_downloaded_uri != remote_uri)):
1754                    ADD_TO_DOWNLOAD_QUEUE.log(relpath=relpath_u)
1755                    if relpath_u in scan_batch:
1756                        scan_batch[relpath_u] += [(file_node, metadata)]
1757                    else:
1758                        scan_batch[relpath_u] = [(file_node, metadata)]
1759            self._status_reporter(
1760                True, 'Magic folder is working',
1761                'Last scan: %s' % self.nice_current_time(),
1762            )
1763
1764        d.addCallback(scan_listing)
1765        return d.addActionFinish()
1766
1767    def _scan_remote_collective(self, scan_self=False):
1768        scan_batch = {}  # path -> [(filenode, metadata)]
1769        with SCAN_REMOTE_COLLECTIVE().context():
1770            d = DeferredContext(self._collective_dirnode.list())
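        # scan_collective (below) walks each participant's DMD in turn;
        # unless scan_self is set we skip our own DMD, recognized by
        # comparing each dirnode's read-only URI against
        # self._upload_readonly_dircap.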
1771        def scan_collective(dirmap):
1772            d2 = DeferredContext(defer.succeed(None))
1773            for dir_name in dirmap:
1774                (dirnode, metadata) = dirmap[dir_name]
1775                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
1776                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
1777                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
1778                    # XXX what should we do to make this failure more visible to users?
1779                    d2.addErrback(write_traceback)
1780            return d2.result
1781        d.addCallback(scan_collective)
1782
1783        @log_call(
1784            action_type=u"magic-folder:filter-batch-to-deque",
1785            include_args=[],
1786            include_result=False,
1787        )
1788        def _filter_batch_to_deque(ign):
1789            ITEM_QUEUE.log(items=self._deque)
1790            SCAN_BATCH.log(batch=scan_batch)
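            # scan_batch maps relpath_u -> [(file_node, metadata), ...]; if,
            # say, two participants publish the same path at versions 2 and
            # 3, max() below keeps only the version-3 pair as the download
            # candidate.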
1791            for relpath_u in scan_batch.keys():
1792                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
1793
1794                if self._should_download(relpath_u, metadata['version'], file_node.get_readonly_uri()):
1795                    to_dl = DownloadItem(
1796                        relpath_u,
1797                        PercentProgress(file_node.get_size()),
1798                        file_node,
1799                        metadata,
1800                        file_node.get_size(),
1801                    )
1802                    to_dl.set_status('queued', self._clock.seconds())
1803                    self._deque.append(to_dl)
1804                    self._count("objects_queued")
1805                else:
1806                    self._call_hook(None, 'processed', async=True)  # await this maybe-Deferred??
1807
1808        d.addCallback(_filter_batch_to_deque)
1809        return d.addActionFinish()
1810
1811    def _scan_delay(self):
1812        return self._poll_interval
1813
1814    @eliotutil.inline_callbacks
1815    def _perform_scan(self):
1816        with PERFORM_SCAN():
1817            try:
1818                yield self._scan_remote_collective()
1819                self._status_reporter(
1820                    True, 'Magic folder is working',
1821                    'Last scan: %s' % self.nice_current_time(),
1822                )
1823            except Exception as e:
1824                write_traceback()
1825                self._status_reporter(
1826                    False, 'Remote scan has failed: %s' % str(e),
1827                    'Last attempted at %s' % self.nice_current_time(),
1828                )
1829
1830    def _process(self, item):
1831        """
1832        Possibly download a single QueuedItem.  If this returns False, the item is
1833        removed from _process_history.
1834        """
1835        # Downloader
1836        now = self._clock.seconds()
1837
1838        item.set_status('started', now)
1839        fp = self._get_filepath(item.relpath_u)
1840        abspath_u = unicode_from_filepath(fp)
1841        conflict_path_u = self._get_conflicted_filename(abspath_u)
1842        last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
1843
1844        with PROCESS_ITEM(item=item):
1845            d = DeferredContext(defer.succeed(False))
1846
1847        def do_update_db(written_abspath_u):
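            # Record the just-downloaded filecap as last_downloaded_uri
            # (None for an empty file) alongside the uploader's
            # last_uploaded_uri from the item metadata, so the next remote
            # scan can tell this download apart from a fresh local edit.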
1848            filecap = item.file_node.get_uri()
1849            if not item.file_node.get_size():
1850                filecap = None  # the downloaded file is empty; record no filecap
1851            last_downloaded_uri = filecap
1852            last_downloaded_timestamp = now
1853            written_pathinfo = get_pathinfo(written_abspath_u)
1854
1855            if not written_pathinfo.exists and not item.metadata.get('deleted', False):
1856                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
1857
1858            self._db.did_upload_version(
1859                item.relpath_u,
1860                item.metadata['version'],
1861                last_uploaded_uri,
1862                last_downloaded_uri,
1863                last_downloaded_timestamp,
1864                written_pathinfo,
1865            )
1866            self._count('objects_downloaded')
1867            item.set_status('success', self._clock.seconds())
1868            return True
1869
1870        def failed(f):
1871            item.set_status('failure', self._clock.seconds())
1872            self._count('objects_failed')
1873            return f
1874
1875        if os.path.isfile(conflict_path_u):
1876            def fail(res):
1877                raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
1878            d.addCallback(fail)
1879        else:
1880
1881            # Let ``last_downloaded_uri`` be the field of that name obtained from
1882            # the directory entry metadata for ``foo`` in Bob's DMD (this field
1883            # may be absent). Then the algorithm is:
1884
1885            # * 2a. Attempt to "stat" ``foo`` to get its *current statinfo* (size
1886            #   in bytes, ``mtime``, and ``ctime``). If Alice has no local copy
1887            #   of ``foo``, classify as an overwrite.
1888
1889            current_statinfo = get_pathinfo(abspath_u)
1890
1891            is_conflict = False
1892            db_entry = self._db.get_db_entry(item.relpath_u)
1893            dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
1894
1895            # * 2b. Read the following information for the path ``foo`` from the
1896            #   local magic folder db:
1897            #   * the *last-seen statinfo*, if any (this is the size in
1898            #     bytes, ``mtime``, and ``ctime`` stored in the ``local_files``
1899            #     table when the file was last uploaded);
1900            #   * the ``last_uploaded_uri`` field of the ``local_files`` table
1901            #     for this file, which is the URI under which the file was last
1902            #     uploaded.
1903
1904            with CHECKING_CONFLICTS() as action:
1905                conflict_reason = None
1906                if db_entry:
1907                    # * 2c. If any of the following are true, then classify as a conflict:
1908                    #   * i. there are pending notifications of changes to ``foo``;
1909                    #   * ii. the last-seen statinfo is either absent (i.e. there is
1910                    #     no entry in the database for this path), or different from the
1911                    #     current statinfo;
1912
1913                    if current_statinfo.exists:
1914                        if (db_entry.mtime_ns != current_statinfo.mtime_ns or
1915                            db_entry.ctime_ns != current_statinfo.ctime_ns or
1916                            db_entry.size != current_statinfo.size):
1917                            is_conflict = True
1918                            conflict_reason = u"dbentry mismatch metadata"
1919
1920                        if db_entry.last_downloaded_uri is None \
1921                           or db_entry.last_uploaded_uri is None \
1922                           or dmd_last_downloaded_uri is None:
1923                            # we've never downloaded anything before for this
1924                            # file, but the other side might have created a new
1925                            # file "at the same time"
1926                            if db_entry.version >= item.metadata['version']:
1927                                is_conflict = True
1928                                conflict_reason = u"dbentry newer version"
1929                        elif dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
1930                            is_conflict = True
1931                            conflict_reason = u"last_downloaded_uri mismatch"
1932
1933                else:  # no local db_entry ... but has the file appeared locally in the meantime?
1934                    if current_statinfo.exists:
1935                        is_conflict = True
1936                        conflict_reason = u"file appeared"
1937
1938                action.add_success_fields(
1939                    is_conflict=is_conflict,
1940                    conflict_reason=conflict_reason,
1941                )
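            # To summarize with an example: if our db says we hold "foo" at
            # version 3 but the arriving item is version 3 or older and no
            # download was ever recorded for it, both sides created the file
            # independently and we classify a conflict; a clean overwrite
            # requires the current statinfo and the DMD's
            # last_downloaded_uri to match what our db recorded.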
1942
1943            if is_conflict:
1944                self._count('objects_conflicted')
1945
1946            if item.relpath_u.endswith(u"/"):
1947                if item.metadata.get('deleted', False):
1948                    REMOTE_DIRECTORY_DELETED.log()
1949                else:
1950                    REMOTE_DIRECTORY_CREATED.log()
1951                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
1952                    d.addCallback(lambda ign: abspath_u)
1953            else:
1954                if item.metadata.get('deleted', False):
1955                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
1956                else:
1957                    d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
1958                    d.addCallback(
1959                        lambda contents: self._write_downloaded_file(
1960                            self._local_path_u, abspath_u, contents,
1961                            is_conflict=is_conflict,
1962                            mtime=item.metadata.get('user_mtime', item.metadata.get('tahoe', {}).get('linkmotime')),
1963                        )
1964                    )
1965
1966        d.addCallback(do_update_db)
1967        d.addErrback(failed)
1968
1969        def trap_conflicts(f):
1970            f.trap(ConflictError)
1971            return False
1972        d.addErrback(trap_conflicts)
1973        return d.addActionFinish()