import sys, os
import os.path
from errno import EEXIST
from collections import deque
from datetime import datetime
import time
import ConfigParser

from twisted.python.filepath import FilePath
from twisted.python.monkey import MonkeyPatcher
from twisted.internet import defer, reactor, task
from twisted.internet.error import AlreadyCancelled
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from zope.interface import Interface, Attribute, implementer

from eliot import (
    Field,
    ActionType,
    MessageType,
    write_failure,
    write_traceback,
    log_call,
)
from eliot.twisted import (
    DeferredContext,
)

from allmydata.util import (
    fileutil,
    configutil,
    yamlutil,
    eliotutil,
)
from allmydata.util.fake_inotify import (
    humanReadableMask,
)
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import (
    precondition_abspath,
    get_pathinfo,
    ConflictError,
    abspath_expanduser_unicode,
)
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.progress import PercentProgress
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, FilenameEncodingError
from allmydata.util.time_format import format_time
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath


# Mask off all non-owner permissions for magic-folders files by default.
_DEFAULT_DOWNLOAD_UMASK = 0o077
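
# Value of the Linux inotify(7) IN_EXCL_UNLINK flag (0x04000000 in
# <sys/inotify.h>); hard-coded here because the Twisted inotify wrapper
# does not appear to export it.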
IN_EXCL_UNLINK = 0x04000000L


class ConfigurationError(Exception):
    """
    There was something wrong with some magic-folder configuration.
    """


def _get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


def get_inotify_module():
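    """
    Return a platform-appropriate filesystem-notification module: the
    allmydata Windows implementation on win32, otherwise Twisted's
    inotify support where available (raising NotImplementedError on
    platforms with neither).
    """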
    # Until Twisted #9579 is fixed, the Docker check just screws things up.
    # Disable it.
    monkey = MonkeyPatcher()
    monkey.addPatch(runtime.platform, "isDocker", lambda: False)
    return monkey.runWithPatches(_get_inotify_module)


def is_new_file(pathinfo, db_entry):
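    """
    Decide whether pathinfo represents a new or changed version of the
    file recorded as db_entry. Roughly: no database entry means new; a
    non-existent path already recorded as deleted (size is None) means
    not new; otherwise compare (size, ctime_ns, mtime_ns) against the
    recorded values.
    """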
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    return ((pathinfo.size, pathinfo.ctime_ns, pathinfo.mtime_ns) !=
            (db_entry.size, db_entry.ctime_ns, db_entry.mtime_ns))


def _upgrade_magic_folder_config(basedir):
    """
    Helper that upgrades a single-magic-folder-only config to the
    multiple-magic-folder configuration style (in YAML).
    """
    config_fname = os.path.join(basedir, "tahoe.cfg")
    config = configutil.get_config(config_fname)

    collective_fname = os.path.join(basedir, "private", "collective_dircap")
    upload_fname = os.path.join(basedir, "private", "magic_folder_dircap")
    magic_folders = {
        u"default": {
            u"directory": config.get("magic_folder", "local.directory").decode("utf-8"),
            u"collective_dircap": fileutil.read(collective_fname),
            u"upload_dircap": fileutil.read(upload_fname),
            u"poll_interval": int(config.get("magic_folder", "poll_interval")),
        },
    }
    fileutil.move_into_place(
        source=os.path.join(basedir, "private", "magicfolderdb.sqlite"),
        dest=os.path.join(basedir, "private", "magicfolder_default.sqlite"),
    )
    save_magic_folders(basedir, magic_folders)
    config.remove_option("magic_folder", "local.directory")
    config.remove_option("magic_folder", "poll_interval")
    configutil.write_config(config_fname, config)
    fileutil.remove_if_possible(collective_fname)
    fileutil.remove_if_possible(upload_fname)


def maybe_upgrade_magic_folders(node_directory):
    """
    If the given node directory is not already using the new-style
    magic-folder config, upgrade it to do so. (This should only be
    done if the user is running a command that needs to modify the
    config.)
    """
    yaml_fname = os.path.join(node_directory, u"private", u"magic_folders.yaml")
    if os.path.exists(yaml_fname):
        # we already have new-style magic folders
        return

    config_fname = os.path.join(node_directory, "tahoe.cfg")
    config = configutil.get_config(config_fname)

    # we have no YAML config; if we have config in tahoe.cfg then we
    # can upgrade it to the YAML-based configuration
    if config.has_option("magic_folder", "local.directory"):
        _upgrade_magic_folder_config(node_directory)


def load_magic_folders(node_directory):
    """
    Load the existing magic-folder configuration and return it as a
    dict mapping name -> dict of config. This will NOT upgrade from
    old-style to new-style config (but WILL read old-style config and
    return it in the same form as new-style config).

    :param node_directory: path where node data is stored

    :returns: dict mapping magic-folder-name to its config (also a dict)
    """
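    # For illustration only (hypothetical values; caps elided), the
    # returned dict looks roughly like:
    #
    #     {u"default": {
    #         u"directory": u"/home/alice/magic",
    #         u"collective_dircap": "URI:DIR2-RO:...",
    #         u"upload_dircap": "URI:DIR2:...",
    #         u"poll_interval": 60,
    #         u"umask": 0o077,
    #     }}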
    yaml_fname = os.path.join(node_directory, u"private", u"magic_folders.yaml")
    folders = dict()

    config_fname = os.path.join(node_directory, "tahoe.cfg")
    config = configutil.get_config(config_fname)

    if not os.path.exists(yaml_fname):
        # there will still be a magic_folder section in a "new"
        # config, but it won't have local.directory nor poll_interval
        # in it.
        if config.has_option("magic_folder", "local.directory"):
            up_fname = os.path.join(node_directory, "private", "magic_folder_dircap")
            coll_fname = os.path.join(node_directory, "private", "collective_dircap")
            directory = config.get("magic_folder", "local.directory").decode('utf8')
            try:
                interval = int(config.get("magic_folder", "poll_interval"))
            except ConfigParser.NoOptionError:
                interval = 60

            if config.has_option("magic_folder", "download.umask"):
                umask = int(config.get("magic_folder", "download.umask"), 8)
            else:
                umask = _DEFAULT_DOWNLOAD_UMASK

            folders[u"default"] = {
                u"directory": directory,
                u"upload_dircap": fileutil.read(up_fname),
                u"collective_dircap": fileutil.read(coll_fname),
                u"poll_interval": interval,
                u"umask": umask,
            }
        else:
            # without any YAML file AND no local.directory option it's
            # an error if magic-folder is "enabled" because we don't
            # actually have enough config for any magic-folders at all
            if config.has_section("magic_folder") \
               and config.getboolean("magic_folder", "enabled") \
               and not folders:
                raise Exception(
                    "[magic_folder] is enabled but has no YAML file and no "
                    "'local.directory' option."
                )

    elif os.path.exists(yaml_fname):  # yaml config-file exists
        if config.has_option("magic_folder", "local.directory"):
            raise Exception(
                "magic-folder config has both old-style configuration"
                " and new-style configuration; please remove the "
                "'local.directory' key from tahoe.cfg or remove "
                "'magic_folders.yaml' from {}".format(node_directory)
            )
        with open(yaml_fname, "r") as f:
            magic_folders = yamlutil.safe_load(f.read())
            if not isinstance(magic_folders, dict):
                raise Exception(
                    "'{}' should contain a dict".format(yaml_fname)
                )

            folders = magic_folders['magic-folders']
            if not isinstance(folders, dict):
                raise Exception(
                    "'magic-folders' in '{}' should be a dict".format(yaml_fname)
                )

    # check configuration
    folders = dict(
        (name, fix_magic_folder_config(yaml_fname, name, folder_config))
        for (name, folder_config)
        in folders.items()
    )
    return folders


def fix_magic_folder_config(yaml_fname, name, config):
    """
    Check the given folder configuration for validity.

    If it refers to a local directory which does not exist, create that
    directory with the configured permissions.

    :param unicode yaml_fname: The configuration file from which the
        configuration was read.

    :param unicode name: The name of the magic-folder this particular
        configuration blob is associated with.

    :param config: The configuration for a single magic-folder. This is
        expected to be a ``dict`` with certain keys and values of certain
        types but these properties will be checked.

    :raise ConfigurationError: If the given configuration object does not
        conform to some magic-folder configuration requirement.
    """
    if not isinstance(config, dict):
        raise ConfigurationError(
            "Each item in '{}' must itself be a dict".format(yaml_fname)
        )

    for k in ['collective_dircap', 'upload_dircap', 'directory', 'poll_interval']:
        if k not in config:
            raise ConfigurationError(
                "Config for magic folder '{}' is missing '{}'".format(
                    name, k
                )
            )

    if not isinstance(
        config.setdefault(u"umask", _DEFAULT_DOWNLOAD_UMASK),
        int,
    ):
        raise ConfigurationError("magic-folder download umask must be an integer")

    # make sure directory for magic folder exists
    dir_fp = to_filepath(config['directory'])
    umask = config[u"umask"]

    try:
        os.mkdir(dir_fp.path, 0o777 & (~umask))
    except OSError as e:
        if EEXIST != e.errno:
            # Report some unknown problem.
            raise ConfigurationError(
                "magic-folder {} configured path {} could not be created: "
                "{}".format(
                    name,
                    dir_fp.path,
                    str(e),
                ),
            )
        elif not dir_fp.isdir():
            # Tell the user there's a collision.
            raise ConfigurationError(
                "magic-folder {} configured path {} exists and is not a "
                "directory".format(
                    name, dir_fp.path,
                ),
            )

    result_config = config.copy()
    for k in ['collective_dircap', 'upload_dircap']:
        if isinstance(config[k], unicode):
            result_config[k] = config[k].encode('ascii')
    return result_config


def save_magic_folders(node_directory, folders):
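    """
    Write the given magic-folder configuration atomically to
    private/magic_folders.yaml under the node directory, and mark
    magic-folder support as enabled in tahoe.cfg.

    :param node_directory: path where node data is stored

    :param dict folders: dict mapping magic-folder name to its config
        (the same shape as the dicts handled by ``load_magic_folders``)
    """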
    fileutil.write_atomically(
        os.path.join(node_directory, u"private", u"magic_folders.yaml"),
        yamlutil.safe_dump({u"magic-folders": folders}),
    )

    config = configutil.get_config(os.path.join(node_directory, u"tahoe.cfg"))
    configutil.set_config(config, "magic_folder", "enabled", "True")
    configutil.write_config(os.path.join(node_directory, u"tahoe.cfg"), config)


class MagicFolder(service.MultiService):

    @classmethod
    def from_config(cls, client_node, name, config):
        """
        Create a ``MagicFolder`` from a client node and magic-folder
        configuration.

        :param _Client client_node: The client node the magic-folder is
            attached to.

        :param dict config: Magic-folder configuration like one of the
            values of the dict returned by ``load_magic_folders``.
        """
        db_filename = client_node.config.get_private_path("magicfolder_{}.sqlite".format(name))
        local_dir_config = config['directory']
        try:
            poll_interval = int(config["poll_interval"])
        except ValueError:
            raise ValueError("'poll_interval' option must be an int")

        return cls(
            client=client_node,
            upload_dircap=config["upload_dircap"],
            collective_dircap=config["collective_dircap"],
            # XXX surely a better way for this local_path_u business
            local_path_u=abspath_expanduser_unicode(
                local_dir_config,
                base=client_node.config.get_config_path(),
            ),
            dbfile=abspath_expanduser_unicode(db_filename),
            umask=config["umask"],
            name=name,
            downloader_delay=poll_interval,
        )

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
                 name, uploader_delay=1.0, clock=None, downloader_delay=60):
        precondition_abspath(local_path_u)
        if not os.path.exists(local_path_u):
            raise ValueError("'{}' does not exist".format(local_path_u))
        if not os.path.isdir(local_path_u):
            raise ValueError("'{}' is not a directory".format(local_path_u))
        # this is used by 'service' things and must be unique in this Service hierarchy
        self.name = 'magic-folder-{}'.format(name)

        service.MultiService.__init__(self)

        clock = clock or reactor
        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        self.uploader = Uploader(client, local_path_u, db, upload_dirnode, uploader_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
                                     upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask,
                                     self.set_public_status, poll_interval=downloader_delay)
        self._public_status = (False, ['Magic folder has not yet started'])

    def get_public_status(self):
        """
        For the web UI, basically.
        """
        return self._public_status

    def set_public_status(self, status, *messages):
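        """
        Update the (status, messages) tuple reported by
        ``get_public_status``.
        """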
        self._public_status = (status, messages)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def stopService(self):
        with MAGIC_FOLDER_STOP(nickname=self.name).context():
            d = DeferredContext(self._finish())
            d.addBoth(
                lambda ign: service.MultiService.stopService(self)
            )
            return d.addActionFinish()

    def ready(self):
        """
        ready is used to signal us to start processing the upload and
        download items.
        """
        self.uploader.start_uploading()  # synchronous, returns None
        return self.downloader.start_downloading()

    def _finish(self):
        d0 = self.downloader.stop()
        d1 = self.uploader.stop()
        return defer.DeferredList(list(
            DeferredContext(d).addErrback(write_failure).result
            for d in [d0, d1]
        ))


_NICKNAME = Field.for_types(
    u"nickname",
    [unicode, bytes],
    u"A Magic-Folder participant nickname.",
)

_DIRECTION = Field.for_types(
    u"direction",
    [unicode],
    u"A synchronization direction: uploader or downloader.",
    eliotutil.validateSetMembership({u"uploader", u"downloader"}),
)

PROCESSING_LOOP = ActionType(
    u"magic-folder:processing-loop",
    [_NICKNAME, _DIRECTION],
    [],
    u"A Magic-Folder is processing uploads or downloads.",
)

ITERATION = ActionType(
    u"magic-folder:iteration",
    [_NICKNAME, _DIRECTION],
    [],
    u"A step towards synchronization in one direction.",
)

_COUNT = Field.for_types(
    u"count",
    [int, long],
    u"The number of items in the processing queue.",
)

PROCESS_QUEUE = ActionType(
    u"magic-folder:process-queue",
    [_COUNT],
    [],
    u"A Magic-Folder is working through an item queue.",
)

SCAN_REMOTE_COLLECTIVE = ActionType(
    u"magic-folder:scan-remote-collective",
    [],
    [],
    u"The remote collective is being scanned for peer DMDs.",
)

SCAN_REMOTE_DMD = ActionType(
    u"magic-folder:scan-remote-dmd",
    [_NICKNAME],
    [],
    u"A peer DMD is being scanned for changes.",
)

REMOTE_VERSION = Field.for_types(
    u"remote_version",
    [int, long],
    u"The version of a path found in a peer DMD.",
)

REMOTE_URI = Field.for_types(
    u"remote_uri",
    [bytes],
    u"The filecap of a path found in a peer DMD.",
)

REMOTE_DMD_ENTRY = MessageType(
    u"magic-folder:remote-dmd-entry",
    [eliotutil.RELPATH, magicfolderdb.PATHENTRY, REMOTE_VERSION, REMOTE_URI],
    u"A single entry found by scanning a peer DMD.",
)

ADD_TO_DOWNLOAD_QUEUE = MessageType(
    u"magic-folder:add-to-download-queue",
    [eliotutil.RELPATH],
    u"An entry was found to be changed and is being queued for download.",
)

MAGIC_FOLDER_STOP = ActionType(
    u"magic-folder:stop",
    [_NICKNAME],
    [],
    u"A Magic-Folder is being stopped.",
)

MAYBE_UPLOAD = MessageType(
    u"magic-folder:maybe-upload",
    [eliotutil.RELPATH],
    u"A decision is being made about whether to upload a file.",
)

PENDING = Field.for_types(
    u"pending",
    [list],
    u"The paths which are pending processing.",
)

REMOVE_FROM_PENDING = ActionType(
    u"magic-folder:remove-from-pending",
    [eliotutil.RELPATH, PENDING],
    [],
    u"An item being processed is being removed from the pending set.",
)

PATH = Field(
    u"path",
    lambda fp: fp.asTextMode().path,
    u"A local filesystem path.",
    eliotutil.validateInstanceOf(FilePath),
)

NOTIFIED_OBJECT_DISAPPEARED = MessageType(
    u"magic-folder:notified-object-disappeared",
    [PATH],
    u"A path which generated a notification was not found on the filesystem. This is normal.",
)

NOT_UPLOADING = MessageType(
    u"magic-folder:not-uploading",
    [],
    u"An item being processed is not going to be uploaded.",
)

SYMLINK = MessageType(
    u"magic-folder:symlink",
    [PATH],
    u"An item being processed was a symlink and is being skipped.",
)

CREATED_DIRECTORY = Field.for_types(
    u"created_directory",
    [unicode],
    u"The relative path of a newly created directory in a magic-folder.",
)

PROCESS_DIRECTORY = ActionType(
    u"magic-folder:process-directory",
    [],
    [CREATED_DIRECTORY],
    u"An item being processed was a directory.",
)

DIRECTORY_PATHENTRY = MessageType(
    u"magic-folder:directory-dbentry",
    [magicfolderdb.PATHENTRY],
    u"Local database state relating to an item possibly being uploaded.",
)

NOT_NEW_DIRECTORY = MessageType(
    u"magic-folder:not-new-directory",
    [],
    u"A directory item being processed was found to not be new.",
)

NOT_NEW_FILE = MessageType(
    u"magic-folder:not-new-file",
    [],
    u"A file item being processed was found to not be new (or changed).",
)

SPECIAL_FILE = MessageType(
    u"magic-folder:special-file",
    [],
    u"An item being processed was found to be of a special type which is not supported.",
)

_COUNTER_NAME = Field.for_types(
    u"counter_name",
    # Should really only be unicode
    [unicode, bytes],
    u"The name of a counter.",
)

_DELTA = Field.for_types(
    u"delta",
    [int, long],
    u"An amount of a specific change in a counter.",
)

_VALUE = Field.for_types(
    u"value",
    [int, long],
    u"The new value of a counter after a change.",
)

COUNT_CHANGED = MessageType(
    u"magic-folder:count",
    [_COUNTER_NAME, _DELTA, _VALUE],
    u"The value of a counter has changed.",
)

START_MONITORING = ActionType(
    u"magic-folder:start-monitoring",
    [_NICKNAME, _DIRECTION],
    [],
    u"Uploader is beginning to monitor the filesystem for uploadable changes.",
)

STOP_MONITORING = ActionType(
    u"magic-folder:stop-monitoring",
    [_NICKNAME, _DIRECTION],
    [],
    u"Uploader is terminating filesystem monitoring operation.",
)

START_UPLOADING = ActionType(
    u"magic-folder:start-uploading",
    [_NICKNAME, _DIRECTION],
    [],
    u"Uploader is performing startup-time inspection of known files.",
)

_IGNORED = Field.for_types(
    u"ignored",
    [bool],
    u"A file proposed for queueing for processing is instead being ignored by policy.",
)

_ALREADY_PENDING = Field.for_types(
    u"already_pending",
    [bool],
    u"A file proposed for queueing for processing is already in the queue.",
)

_SIZE = Field.for_types(
    u"size",
    [int, long, type(None)],
    u"The size of a file accepted into the processing queue.",
)

ADD_PENDING = ActionType(
    u"magic-folder:add-pending",
    [eliotutil.RELPATH],
    [_IGNORED, _ALREADY_PENDING, _SIZE],
    u"Uploader is adding a path to the processing queue.",
)

FULL_SCAN = ActionType(
    u"magic-folder:full-scan",
    [_NICKNAME, _DIRECTION],
    [],
    u"A complete brute-force scan of the local directory is being performed.",
)

SCAN = ActionType(
    u"magic-folder:scan",
    [eliotutil.RELPATH],
    [],
    u"A brute-force scan of a subset of the local directory is being performed.",
)

NOTIFIED = ActionType(
    u"magic-folder:notified",
    [PATH, _NICKNAME, _DIRECTION],
    [],
    u"Magic-Folder received a notification of a local filesystem change for a certain path.",
)

_EVENTS = Field(
    u"events",
    humanReadableMask,
    u"Details about a filesystem event generating a notification event.",
    eliotutil.validateInstanceOf((int, long)),
)

_NON_DIR_CREATED = Field.for_types(
    u"non_dir_created",
    [bool],
    u"A creation event was for a non-directory and requires no further inspection.",
)


REACT_TO_INOTIFY = ActionType(
    u"magic-folder:react-to-inotify",
    [_EVENTS],
    [_IGNORED, _NON_DIR_CREATED, _ALREADY_PENDING],
    u"Magic-Folder is processing a notification from inotify(7) (or a clone) about a filesystem event.",
)

_ABSPATH = Field.for_types(
    u"abspath",
    [unicode],
    u"The absolute path of a file being written in a local directory.",
)

_IS_CONFLICT = Field.for_types(
    u"is_conflict",
    [bool],
    u"An indication of whether a file being written in a local directory is in a conflicted state.",
)

_NOW = Field.for_types(
    u"now",
    [int, long, float],
    u"The time at which a file is being written in a local directory.",
)

_MTIME = Field.for_types(
    u"mtime",
    [int, long, float, type(None)],
    u"A modification time to put into the metadata of a file being written in a local directory.",
)

WRITE_DOWNLOADED_FILE = ActionType(
    u"magic-folder:write-downloaded-file",
    [_ABSPATH, _SIZE, _IS_CONFLICT, _NOW, _MTIME],
    [],
    u"A downloaded file is being written to the filesystem.",
)

ALREADY_GONE = MessageType(
    u"magic-folder:rename:already-gone",
    [],
    u"A deleted file could not be rewritten to a backup path because it no longer exists.",
)

_REASON = Field(
    u"reason",
    lambda e: str(e),
    u"An exception which may describe the form of the conflict.",
    eliotutil.validateInstanceOf(Exception),
)

OVERWRITE_BECOMES_CONFLICT = MessageType(
    u"magic-folder:overwrite-becomes-conflict",
    [_REASON],
    u"An attempt to overwrite an existing file failed because that file is now conflicted.",
)

_FILES = Field(
    u"files",
    lambda file_set: list(file_set),
    u"All of the relative paths belonging to a Magic-Folder that are locally known.",
)

ALL_FILES = MessageType(
    u"magic-folder:all-files",
    [_FILES],
    u"A record of the rough state of the local database at the time of downloader start up.",
)

_ITEMS = Field(
    u"items",
    lambda deque: list(dict(relpath=item.relpath_u, kind=item.kind) for item in deque),
    u"Items in a processing queue.",
)

ITEM_QUEUE = MessageType(
    u"magic-folder:item-queue",
    [_ITEMS],
    u"A report of the items in the processing queue at this point.",
)

_BATCH = Field(
    u"batch",
    # Just report the paths for now. Perhaps something from the values would
    # also be useful, though? Consider it.
    lambda batch: batch.keys(),
    u"A batch of scanned items.",
    eliotutil.validateInstanceOf(dict),
)

SCAN_BATCH = MessageType(
    u"magic-folder:scan-batch",
    [_BATCH],
    u"Items in a batch of files which were scanned from the DMD.",
)

START_DOWNLOADING = ActionType(
    u"magic-folder:start-downloading",
    [_NICKNAME, _DIRECTION],
    [],
    u"A Magic-Folder downloader is initializing and beginning to manage downloads.",
)

PERFORM_SCAN = ActionType(
    u"magic-folder:perform-scan",
    [],
    [],
    u"Remote storage is being scanned for changes which need to be synchronized.",
)

_STATUS = Field.for_types(
    u"status",
    # Should just be unicode...
    [unicode, bytes],
    u"The status of an item in a processing queue.",
)

QUEUED_ITEM_STATUS_CHANGE = MessageType(
    u"magic-folder:item:status-change",
    [eliotutil.RELPATH, _STATUS],
    u"A queued item changed status.",
)

_CONFLICT_REASON = Field.for_types(
    u"conflict_reason",
    [unicode, type(None)],
    u"A human-readable explanation of why a file was in conflict.",
    eliotutil.validateSetMembership({
        u"dbentry mismatch metadata",
        u"dbentry newer version",
        u"last_downloaded_uri mismatch",
        u"file appeared",
        None,
    }),
)

CHECKING_CONFLICTS = ActionType(
    u"magic-folder:item:checking-conflicts",
    [],
    [_IS_CONFLICT, _CONFLICT_REASON],
    u"A potential download item is being checked to determine if it is in a conflicted state.",
)

REMOTE_DIRECTORY_CREATED = MessageType(
    u"magic-folder:remote-directory-created",
    [],
    u"The downloader found a new directory in the DMD.",
)

REMOTE_DIRECTORY_DELETED = MessageType(
    u"magic-folder:remote-directory-deleted",
    [],
    u"The downloader found that a directory has been deleted from the DMD.",
)


class QueueMixin(HookMixin):
    """
    A parent class for Uploader and Downloader that handles putting
    IQueuedItem instances into a work queue and processing them.
    Tracks some history of recent items processed (for the
    "status" API).

    Subclasses implement _scan_delay, _perform_scan and _process.

    :ivar unicode _name: Either "uploader" or "downloader".

    :ivar _deque: IQueuedItem instances to process

    :ivar _process_history: the last 20 items we processed

    :ivar _in_progress: current batch of items which are currently
        being processed; chunks of work are removed from _deque and
        worked on. As each finishes, it is added to _process_history
        (with oldest items falling off the end).
    """
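
    # A rough sketch of how the pieces fit together (see
    # _begin_processing and _processing_iteration below):
    #
    #   every _scan_delay() seconds:
    #       _perform_scan()      # subclass hook; may add items to _deque
    #       _process_deque()     # drains _deque, calling _process() on each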

    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._log_fields = dict(
            nickname=self._client.nickname,
            direction=self._name,
        )
        self._hooks = {
            'processed': None,
            'started': None,
            'iteration': None,
            'inotify': None,
            'item_processed': None,
        }
        self.started_d = self.set_hook('started')

        # we should have gotten nice errors already while loading the
        # config, but just to be safe:
        assert self._local_filepath.exists()
        assert self._local_filepath.isdir()

        self._deque = deque()
        # do we also want to bound on "maximum age"?
        self._process_history = deque(maxlen=20)
        self._in_progress = []
    def get_status(self):
        """
        Returns an iterable of instances that implement IQueuedItem
        """
        for item in self._deque:
            yield item
        for item in self._in_progress:
            yield item
        for item in self._process_history:
            yield item

    def _get_filepath(self, relpath_u):
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def stop(self):
        """
        Don't process queued items anymore.

        :return Deferred: A ``Deferred`` that fires when processing has
            completely stopped.
        """
        d = self._processing
        self._processing_loop.stop()
        self._processing = None
        self._processing_loop = None
        return d

    def _begin_processing(self):
        """
        Start a loop that looks for work to do and then does it.
        """
        action = PROCESSING_LOOP(**self._log_fields)

        # Note that we don't put the processing iterations into the logging
        # action because we expect this loop to run for the whole lifetime of
        # the process. The tooling for dealing with incomplete action trees
        # is still somewhat lacking. Putting the iterations into the overall
        # loop action would hamper reading those logs for now.
        self._processing_loop = task.LoopingCall(self._processing_iteration)
        self._processing_loop.clock = self._clock
        self._processing = self._processing_loop.start(self._scan_delay(), now=True)

        with action.context():
            # We do make sure errors appear in the loop action though.
            d = DeferredContext(self._processing)
            d.addActionFinish()

    def _processing_iteration(self):
        """
        One iteration runs self._process_deque which calls _perform_scan() and
        then completely drains the _deque (processing each item).
        """
        action = ITERATION(**self._log_fields)
        with action.context():
            d = DeferredContext(defer.Deferred())

            # adds items to our deque
            d.addCallback(lambda ignored: self._perform_scan())

            # process anything in our queue
            d.addCallback(lambda ignored: self._process_deque())

            # Let the tests know we've made it this far.
            d.addCallback(lambda ignored: self._call_hook(None, 'iteration'))

            # Get it out of the Eliot context
            result = d.addActionFinish()

            # Kick it off
            result.callback(None)

            # Give it back to LoopingCall so it can wait on us.
            return result

    def _scan_delay(self):
        raise NotImplementedError

    def _perform_scan(self):
        return

    @eliotutil.inline_callbacks
    def _process_deque(self):
        # process everything currently in the queue. we're turning it
        # into a list so that if any new items get added while we're
        # processing, they'll not run until next time.
        to_process = list(self._deque)
        self._deque.clear()
        self._count('objects_queued', -len(to_process))

        # we want to include all these in the next status request, so
        # we must put them 'somewhere' before the next yield (and it's
        # not in _process_history because that gets trimmed and we
        # don't want anything to disappear until after it is
        # completed)
        self._in_progress.extend(to_process)

        with PROCESS_QUEUE(count=len(to_process)):
            for item in to_process:
                self._process_history.appendleft(item)
                self._in_progress.remove(item)
                try:
                    proc = yield self._process(item)
                    if not proc:
                        self._process_history.remove(item)
                    self._call_hook(item, 'item_processed')
                except:
                    write_traceback()
                    item.set_status('failed', self._clock.seconds())
                    proc = Failure()

                self._call_hook(proc, 'processed')

    def _get_relpath(self, filepath):
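        """
        Return the path of ``filepath`` relative to the local magic-folder
        directory, as a u"/"-joined unicode string (e.g. a file at
        <folder>/foo/bar.txt yields u"foo/bar.txt").
        """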
        segments = unicode_segments_from(filepath, self._local_filepath)
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
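        """
        Adjust (by ``delta``) and log one of this service's statistics
        counters; counters are namespaced as
        'magic_folder.<direction>.<counter_name>', e.g.
        'magic_folder.uploader.objects_queued'.
        """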
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._client.stats_provider.count(ctr, delta)
        COUNT_CHANGED.log(
            counter_name=counter_name,
            delta=delta,
            value=self._client.stats_provider.counters[ctr],
        )


# this isn't in interfaces.py because it's very specific to QueueMixin
class IQueuedItem(Interface):
    relpath_u = Attribute("The path this item represents")
    progress = Attribute("A PercentProgress instance")

    def set_status(self, status, current_time=None):
        """
        Record this item as having entered the given status, as of
        ``current_time`` (defaulting to now).
        """

    def status_time(self, state):
        """
        Get the time of a particular state change, or None
        """

    def status_history(self):
        """
        All status changes, sorted latest -> oldest
        """


@implementer(IQueuedItem)
class QueuedItem(object):
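    """
    Concrete IQueuedItem implementation: one relative path moving through
    a processing queue, along with its progress and a history of status
    changes.
    """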
    kind = None

    def __init__(self, relpath_u, progress, size):
        self.relpath_u = relpath_u
        self.progress = progress
        self._status_history = dict()
        self.size = size

    def set_status(self, status, current_time=None):
        if current_time is None:
            current_time = time.time()
        self._status_history[status] = current_time
        QUEUED_ITEM_STATUS_CHANGE.log(
            relpath=self.relpath_u,
            status=status,
        )

    def status_time(self, state):
        """
        Returns None if there's no status-update for 'state', else returns
        the timestamp when that state was reached.
        """
        return self._status_history.get(state, None)

    def status_history(self):
        """
        Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
        """
        hist = self._status_history.items()
        hist.sort(lambda a, b: cmp(a[1], b[1]))
        return hist

    def __eq__(self, other):
        # note: 'and', not a tuple -- returning a 2-tuple here would make
        # every comparison truthy
        return (
            other.relpath_u == self.relpath_u and
            other.status_history() == self.status_history()
        )


class UploadItem(QueuedItem):
    """
    Represents a single item in the _deque of the Uploader
    """
    kind = u"upload"


_ITEM = Field(
    u"item",
    lambda i: {
        u"relpath": i.relpath_u,
        u"size": i.size,
    },
    u"An item to be uploaded or downloaded.",
    eliotutil.validateInstanceOf(QueuedItem),
)

PROCESS_ITEM = ActionType(
    u"magic-folder:process-item",
    [_ITEM],
    [],
    u"A path which was found wanting of an update is receiving an update.",
)


class Uploader(QueueMixin):

    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, u'uploader', clock)

        self.is_ready = False

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("'upload_dircap' does not refer to a directory")
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("'upload_dircap' is not a writecap to a directory")

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        self._pending = set()  # of unicode relpaths
        self._pending_delay = pending_delay
        self._periodic_full_scan_duration = 10 * 60  # perform a full scan every 10 minutes
        self._periodic_callid = None

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=False)  # XXX: True?

    def start_monitoring(self):
        action = START_MONITORING(**self._log_fields)
        with action.context():
            d = DeferredContext(defer.succeed(None))

        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d.addActionFinish()

    def stop(self):
        action = STOP_MONITORING(**self._log_fields)
        with action.context():
            self._notifier.stopReading()
            self._count('dirs_monitored', -1)
            if self._periodic_callid:
                try:
                    self._periodic_callid.cancel()
                except AlreadyCancelled:
                    pass

            if hasattr(self._notifier, 'wait_until_stopped'):
                d = DeferredContext(self._notifier.wait_until_stopped())
            else:
                d = DeferredContext(defer.succeed(None))

            d.addCallback(lambda ignored: QueueMixin.stop(self))
            return d.addActionFinish()

    def start_uploading(self):
        action = START_UPLOADING(**self._log_fields)
        with action:
            self.is_ready = True

            all_relpaths = self._db.get_all_relpaths()

            for relpath_u in all_relpaths:
                self._add_pending(relpath_u)

            self._full_scan()
            self._begin_processing()

    def _scan_delay(self):
        return self._pending_delay

    def _full_scan(self):
        with FULL_SCAN(**self._log_fields):
            self._periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
            self._scan(u"")

    def _add_pending(self, relpath_u):
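        """
        Queue a single relative path for upload, unless policy says to
        ignore it or it is already pending.
        """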
        with ADD_PENDING(relpath=relpath_u) as action:
            if magicpath.should_ignore_file(relpath_u):
                action.add_success_fields(ignored=True, already_pending=False, size=None)
                return
            if self.is_pending(relpath_u):
                action.add_success_fields(ignored=False, already_pending=True, size=None)
                return

            self._pending.add(relpath_u)
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))
            progress = PercentProgress()
            action.add_success_fields(ignored=False, already_pending=False, size=pathinfo.size)
            item = UploadItem(relpath_u, progress, pathinfo.size)
            item.set_status('queued', self._clock.seconds())
            self._deque.append(item)
            self._count('objects_queued')

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its
        # children to self._pending (and, via _add_pending, queueing an
        # UploadItem for each).
        with SCAN(relpath=reldir_u):
            fp = self._get_filepath(reldir_u)
            try:
                children = listdir_filepath(fp)
            except EnvironmentError:
                raise Exception("WARNING: magic folder: permission denied on directory %s"
                                % quote_filepath(fp))
            except FilenameEncodingError:
                raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                                % quote_filepath(fp))

            for child in children:
                _assert(isinstance(child, unicode), child=child)
                self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        with NOTIFIED(path=path, **self._log_fields):
            try:
                return self._real_notify(opaque, path, events_mask)
            except Exception:
                write_traceback()

    def _real_notify(self, opaque, path, events_mask):
        action = REACT_TO_INOTIFY(
            # We could think about logging opaque here but ... it's opaque.
            # All we can do is id() or repr() it and neither of those actually
            # produces very illuminating results. We drop opaque on the
            # floor, anyway.
            events=events_mask,
        )
        success_fields = dict(non_dir_created=False, already_pending=False, ignored=False)

        with action:
            relpath_u = self._get_relpath(path)

            # We filter out IN_CREATE events not associated with a directory.
            # Acting on IN_CREATE for files could cause us to read and upload
            # a possibly-incomplete file before the application has closed it.
            # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
            # It isn't possible to avoid watching for IN_CREATE at all, because
            # it is the only event notified for a directory creation.

            if ((events_mask & self._inotify.IN_CREATE) != 0 and
                (events_mask & self._inotify.IN_ISDIR) == 0):
                success_fields[u"non_dir_created"] = True
            elif relpath_u in self._pending:
                success_fields[u"already_pending"] = True
            elif magicpath.should_ignore_file(relpath_u):
                success_fields[u"ignored"] = True
            else:
                self._add_pending(relpath_u)
            self._call_hook(path, 'inotify')
            action.add_success_fields(**success_fields)

    def _process(self, item):
        """
        Possibly upload a single QueuedItem. If this returns False, the item is
        removed from _process_history.
        """
        # Uploader
        with PROCESS_ITEM(item=item).context():
            d = DeferredContext(defer.succeed(False))

        relpath_u = item.relpath_u
        item.set_status('started', self._clock.seconds())

        if relpath_u is None:
            item.set_status('invalid_path', self._clock.seconds())
            return d.addActionFinish()

        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)
1305 | |
---|
1306 | def _maybe_upload(ign, now=None): |
---|
1307 | MAYBE_UPLOAD.log(relpath=relpath_u) |
---|
1308 | if now is None: |
---|
1309 | now = time.time() |
---|
1310 | fp = self._get_filepath(relpath_u) |
---|
1311 | pathinfo = get_pathinfo(unicode_from_filepath(fp)) |
---|
1312 | |
---|
1313 | try: |
---|
1314 | with REMOVE_FROM_PENDING(relpath=relpath_u, pending=list(self._pending)): |
---|
1315 | self._pending.remove(relpath_u) |
---|
1316 | except KeyError: |
---|
1317 | pass |
---|
1318 | encoded_path_u = magicpath.path2magic(relpath_u) |
---|
1319 | |
---|
1320 | if not pathinfo.exists: |
---|
1321 | # FIXME merge this with the 'isfile' case. |
---|
1322 | NOTIFIED_OBJECT_DISAPPEARED.log(path=fp) |
---|
1323 | self._count('objects_disappeared') |
---|
1324 | |
---|
1325 | db_entry = self._db.get_db_entry(relpath_u) |
---|
1326 | if db_entry is None: |
---|
1327 | return False |
---|
1328 | |
---|
1329 | last_downloaded_timestamp = now # is this correct? |
---|
1330 | |
---|
1331 | if is_new_file(pathinfo, db_entry): |
---|
1332 | new_version = db_entry.version + 1 |
---|
1333 | else: |
---|
1334 | NOT_UPLOADING.log() |
---|
1335 | self._count('objects_not_uploaded') |
---|
1336 | return False |
---|
1337 | |
---|
1338 | # look out! there's another place we set a "metadata" |
---|
1339 | # object like this (for new, not deleted files) |
---|
1340 | metadata = { |
---|
1341 | 'version': new_version, |
---|
1342 | 'deleted': True, |
---|
1343 | 'last_downloaded_timestamp': last_downloaded_timestamp, |
---|
1344 | 'user_mtime': pathinfo.ctime_ns / 1000000000.0, # why are we using ns in PathInfo?? |
---|
1345 | } |
---|
1346 | |
---|
1347 | # from the Fire Dragons part of the spec: |
---|
1348 | # Later, in response to a local filesystem change at a given path, the |
---|
1349 | # Magic Folder client reads the last-downloaded record associated with |
---|
1350 | # that path (if any) from the database and then uploads the current |
---|
1351 | # file. When it links the uploaded file into its client DMD, it |
---|
1352 | # includes the ``last_downloaded_uri`` field in the metadata of the |
---|
1353 | # directory entry, overwriting any existing field of that name. If |
---|
1354 | # there was no last-downloaded record associated with the path, this |
---|
1355 | # field is omitted. |
---|
1356 | # Note that ``last_downloaded_uri`` field does *not* record the URI of |
---|
1357 | # the uploaded file (which would be redundant); it records the URI of |
---|
1358 | # the last download before the local change that caused the upload. |
---|
1359 | # The field will be absent if the file has never been downloaded by |
---|
1360 | # this client (i.e. if it was created on this client and no change |
---|
1361 | # by any other client has been detected). |
---|
1362 | |
---|
1363 | # XXX currently not actually true: it will record the |
---|
1364 | # LAST THING we wrote to (or saw on) disk (not |
---|
1365 | # necessarily downloaded?) |
---|
1366 | |
---|
1367 | if db_entry.last_downloaded_uri is not None: |
---|
1368 | metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri |
---|
1369 | if db_entry.last_uploaded_uri is not None: |
---|
1370 | metadata['last_uploaded_uri'] = db_entry.last_uploaded_uri |
---|
1371 | |

                empty_uploadable = Data("", self._client.convergence)
                d2 = DeferredContext(self._upload_dirnode.add_file(
                    encoded_path_u, empty_uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                ))

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    # if we're uploading a file, we want to set
                    # last_downloaded_uri to the filecap so that we don't
                    # immediately re-download it when we start up next
                    last_downloaded_uri = metadata.get('last_downloaded_uri', filecap)
                    self._db.did_upload_version(
                        relpath_u,
                        new_version,
                        filecap,
                        last_downloaded_uri,
                        last_downloaded_timestamp,
                        pathinfo,
                    )
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                d2.addCallback(lambda ign: True)
                return d2.result
            elif pathinfo.islink:
                SYMLINK.log(path=fp)
                return False
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
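                    # Some platform notifiers recursively pick up newly
                    # created subdirectories on their own; for those that
                    # do not advertise that, add an explicit recursive
                    # watch on the directory we just noticed.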

                db_entry = self._db.get_db_entry(relpath_u)
                DIRECTORY_PATHENTRY.log(pathentry=db_entry)
                if not is_new_file(pathinfo, db_entry):
                    NOT_NEW_DIRECTORY.log()
                    return False

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
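                # A directory is thus represented in the DMD as a zero-length
                # file whose encoded path ends with an encoded u"/".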
                with PROCESS_DIRECTORY().context() as action:
                    upload_d = DeferredContext(self._upload_dirnode.add_file(
                        encoded_path_u, uploadable,
                        metadata={"version": 0},
                        overwrite=True,
                        progress=item.progress,
                    ))
                def _dir_succeeded(ign):
                    action.add_success_fields(created_directory=relpath_u)
                    self._count('directories_created')
                upload_d.addCallback(_dir_succeeded)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: True)
                return upload_d.addActionFinish()
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    NOT_NEW_FILE.log()
                    self._count('objects_not_uploaded')
                    return False

                metadata = {
                    'version': new_version,
                    'last_downloaded_timestamp': last_downloaded_timestamp,
                    'user_mtime': pathinfo.mtime_ns / 1000000000.0,  # why are we using ns in PathInfo??
                }
                if db_entry is not None:
                    if db_entry.last_downloaded_uri is not None:
                        metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
                    if db_entry.last_uploaded_uri is not None:
                        metadata['last_uploaded_uri'] = db_entry.last_uploaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = DeferredContext(self._upload_dirnode.add_file(
                    encoded_path_u, uploadable,
                    metadata=metadata,
                    overwrite=True,
                    progress=item.progress,
                ))

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    # if we're uploading a file, we want to set
                    # last_downloaded_uri to the filecap so that we don't
                    # immediately re-download it when we start up next
                    last_downloaded_uri = filecap
                    self._db.did_upload_version(
                        relpath_u,
                        new_version,
                        filecap,
                        last_downloaded_uri,
                        last_downloaded_timestamp,
                        pathinfo,
                    )
                    self._count('files_uploaded')
                    return True
                d2.addCallback(_add_db_entry)
                return d2.result
            else:
                SPECIAL_FILE.log()
                return False

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            if res:
                self._count('objects_succeeded')
            # TODO: maybe we want the status to be 'ignored' if res is False
            item.set_status('success', self._clock.seconds())
            return res
        def _failed(f):
            self._count('objects_failed')
            item.set_status('failure', self._clock.seconds())
            return f
        d.addCallbacks(_succeeded, _failed)
        return d.addActionFinish()

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
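    # NOTE: on a synchronous KeyError these two helpers return a Failure
    # instance directly (capturing the current exception) rather than a
    # Deferred, so callers must be prepared for either shape.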


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"
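        # For example: u"/home/alice/magic/foo.txt" (an invented path)
        # becomes u"/home/alice/magic/foo.txt.conflict".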

    def _write_downloaded_file(self, local_path_u, abspath_u, file_contents,
                               is_conflict=False, now=None, mtime=None):
        if now is None:
            now = time.time()
        action = WRITE_DOWNLOADED_FILE(
            abspath=abspath_u,
            size=len(file_contents),
            is_conflict=is_conflict,
            now=now,
            mtime=mtime,
        )
        with action:
            return self._write_downloaded_file_logged(
                local_path_u,
                abspath_u,
                file_contents,
                is_conflict,
                now,
                mtime,
            )

    def _write_downloaded_file_logged(self, local_path_u, abspath_u,
                                      file_contents, is_conflict, now, mtime):
        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time, or ``mtime``, whichever is older.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.
        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique

        initial_path_u = os.path.dirname(abspath_u)
        fileutil.make_dirs_with_absolute_mode(local_path_u, initial_path_u, (~ self._umask) & 0o777)
        fileutil.write(replacement_path_u, file_contents)
        os.chmod(replacement_path_u, (~ self._umask) & 0o666)

        # FUDGE_SECONDS is used to determine if another process has
        # written to the same file concurrently. This is described in
        # the Earth Dragon section of our design document ("T" in the
        # spec is FUDGE_SECONDS here):
        # docs/proposed/magic-folder/remote-to-local-sync.rst
        fudge_time = now - self.FUDGE_SECONDS
        modified_time = min(fudge_time, mtime) if mtime else fudge_time
        os.utime(replacement_path_u, (now, modified_time))
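        # For example (illustrative numbers): with now=1000.0 and
        # FUDGE_SECONDS=10.0, fudge_time is 990.0; an mtime of 800.0 gives
        # modified_time=800.0, while mtime=None gives 990.0.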
        if is_conflict:
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u)
                return abspath_u
            except fileutil.ConflictError as e:
                OVERWRITE_BECOMES_CONFLICT.log(reason=e)
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    @log_call(
        action_type=u"magic-folder:rename-conflicted",
        include_args=["abspath_u", "replacement_path_u"],
    )
    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        conflict_path_u = self._get_conflicted_filename(abspath_u)
        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    @log_call(
        action_type=u"magic-folder:rename-deleted",
        include_args=["abspath_u"],
    )
    def _rename_deleted_file(self, abspath_u):
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            ALREADY_GONE.log()
        return abspath_u


def _is_empty_filecap(client, cap):
    """
    Internal helper.

    :param client: a client node, used to create a node from the URI

    :param cap: a capability URI

    :returns: True if "cap" represents an empty file
    """
    node = client.create_node_from_uri(
        None,
        cap.encode('ascii'),
    )
    return (not node.get_size())
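    # Note: this relies on get_size() returning a falsy value (0 or None)
    # for the node of an empty file's cap.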


class DownloadItem(QueuedItem):
    """
    Represents a single item in the _deque of the Downloader.
    """
    kind = u"download"

    def __init__(self, relpath_u, progress, filenode, metadata, size):
        super(DownloadItem, self).__init__(relpath_u, progress, size)
        self.file_node = filenode
        self.metadata = metadata


class Downloader(QueueMixin, WriteFileMixin):

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask,
                 status_reporter, poll_interval=60):
        QueueMixin.__init__(self, client, local_path_u, db, u'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("'collective_dircap' does not refer to a directory")
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("'collective_dircap' is not a readonly cap to a directory")

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask
        self._status_reporter = status_reporter
        self._poll_interval = poll_interval

    @eliotutil.inline_callbacks
    def start_downloading(self):
        action = START_DOWNLOADING(**self._log_fields)
        with action:
            ALL_FILES.log(files=self._db.get_all_relpaths())

            while True:
                try:
                    yield self._scan_remote_collective(scan_self=True)
                    # The integration tests watch for this log message to
                    # decide when it is safe to proceed. Clearly, we need
                    # better programmatic interrogation of magic-folder state.
                    print("Completed initial Magic Folder scan successfully ({})".format(self))
                    self._begin_processing()
                    return
                except Exception:
                    self._status_reporter(
                        False, "Initial scan has failed",
                        "Last tried at %s" % self.nice_current_time(),
                    )
                    write_traceback()
                    yield task.deferLater(self._clock, self._poll_interval, lambda: None)
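                    # Keep retrying: wait one poll interval, then take the
                    # initial scan from the top.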

    def nice_current_time(self):
        return format_time(datetime.fromtimestamp(self._clock.seconds()).timetuple())

    def _should_download(self, relpath_u, remote_version, remote_uri):
        """
        Return a bool indicating whether or not the given remote object should
        be downloaded. We check the remote metadata version against the version
        in our magic-folder db; the latest version wins.
        """
        if magicpath.should_ignore_file(relpath_u):
            return False
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        if db_entry.version < remote_version:
            return True
        if db_entry.last_downloaded_uri is None and _is_empty_filecap(self._client, remote_uri):
            pass
        elif db_entry.last_downloaded_uri != remote_uri:
            return True
        return False
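        # Summary of the decision above:
        #   ignored path                        -> skip
        #   no local db entry                   -> download
        #   local version < remote version      -> download
        #   last_downloaded_uri != remote URI   -> download, except when we have
        #       never downloaded it and the remote cap is an empty file
        #   otherwise                           -> skip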

    def _get_local_latest(self, relpath_u):
        """
        Take a unicode path string and check whether the corresponding local
        file exists; if it does not, return None, otherwise return the entry
        for it from our magic-folder db (which may itself be None).
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_db_entry(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        Take the name of a file managed by magic-folder and return a Deferred
        that fires with a two-tuple of the file node and metadata for the
        latest version of that file located in the magic-folder collective
        directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if node is None or result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d
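        # (If no participant's DMD has the file, the Deferred fires with
        # (None, None).)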

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        with SCAN_REMOTE_DMD(nickname=nickname).context():
            d = DeferredContext(dirnode.list())
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)

                file_node, metadata = listing_map[encoded_relpath_u]
                local_dbentry = self._get_local_latest(relpath_u)

                # XXX FIXME this is *awfully* similar to the
                # _should_download code -- can we share it?
                remote_version = metadata.get('version', None)
                remote_uri = file_node.get_readonly_uri()
                REMOTE_DMD_ENTRY.log(
                    relpath=relpath_u,
                    pathentry=local_dbentry,
                    remote_version=remote_version,
                    remote_uri=remote_uri,
                )

                if (local_dbentry is None or remote_version is None or
                        local_dbentry.version < remote_version or
                        (local_dbentry.version == remote_version and
                         local_dbentry.last_downloaded_uri != remote_uri)):
                    ADD_TO_DOWNLOAD_QUEUE.log(relpath=relpath_u)
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]
            self._status_reporter(
                True, 'Magic folder is working',
                'Last scan: %s' % self.nice_current_time(),
            )

        d.addCallback(scan_listing)
        return d.addActionFinish()

    def _scan_remote_collective(self, scan_self=False):
        scan_batch = {}  # path -> [(filenode, metadata)]
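        # Each participant's DMD may contribute its own (filenode, metadata)
        # pair for a given path; _filter_batch_to_deque below keeps only the
        # highest-version candidate per path.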
        with SCAN_REMOTE_COLLECTIVE().context():
            d = DeferredContext(self._collective_dirnode.list())
        def scan_collective(dirmap):
            d2 = DeferredContext(defer.succeed(None))
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(write_traceback)
            return d2.result
        d.addCallback(scan_collective)

        @log_call(
            action_type=u"magic-folder:filter-batch-to-deque",
            include_args=[],
            include_result=False,
        )
        def _filter_batch_to_deque(ign):
            ITEM_QUEUE.log(items=self._deque)
            SCAN_BATCH.log(batch=scan_batch)
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version'], file_node.get_readonly_uri()):
                    to_dl = DownloadItem(
                        relpath_u,
                        PercentProgress(file_node.get_size()),
                        file_node,
                        metadata,
                        file_node.get_size(),
                    )
                    to_dl.set_status('queued', self._clock.seconds())
                    self._deque.append(to_dl)
                    self._count("objects_queued")
                else:
                    self._call_hook(None, 'processed', async=True)  # await this maybe-Deferred??

        d.addCallback(_filter_batch_to_deque)
        return d.addActionFinish()

    def _scan_delay(self):
        return self._poll_interval

    @eliotutil.inline_callbacks
    def _perform_scan(self):
        with PERFORM_SCAN():
            try:
                yield self._scan_remote_collective()
                self._status_reporter(
                    True, 'Magic folder is working',
                    'Last scan: %s' % self.nice_current_time(),
                )
            except Exception as e:
                write_traceback()
                self._status_reporter(
                    False, 'Remote scan has failed: %s' % str(e),
                    'Last attempted at %s' % self.nice_current_time(),
                )

    def _process(self, item):
        """
        Possibly download a single QueuedItem. If this returns False, the item
        is removed from _process_history.
        """
        # Downloader
        now = self._clock.seconds()

        item.set_status('started', now)
        fp = self._get_filepath(item.relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)
        last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)

        with PROCESS_ITEM(item=item):
            d = DeferredContext(defer.succeed(False))

        def do_update_db(written_abspath_u):
            filecap = item.file_node.get_uri()
            if not item.file_node.get_size():
                filecap = None  # the remote file is empty, so store no cap
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not item.metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(
                item.relpath_u,
                item.metadata['version'],
                last_uploaded_uri,
                last_downloaded_uri,
                last_downloaded_timestamp,
                written_pathinfo,
            )
            self._count('objects_downloaded')
            item.set_status('success', self._clock.seconds())
            return True

        def failed(f):
            item.set_status('failure', self._clock.seconds())
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
            d.addCallback(fail)
        else:

            # Let ``last_downloaded_uri`` be the field of that name obtained from
            # the directory entry metadata for ``foo`` in Bob's DMD (this field
            # may be absent). Then the algorithm is:

            # * 2a. Attempt to "stat" ``foo`` to get its *current statinfo* (size
            #   in bytes, ``mtime``, and ``ctime``). If Alice has no local copy
            #   of ``foo``, classify as an overwrite.

            current_statinfo = get_pathinfo(abspath_u)

            is_conflict = False
            db_entry = self._db.get_db_entry(item.relpath_u)
            dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)

            # * 2b. Read the following information for the path ``foo`` from the
            #   local magic folder db:
            #   * the *last-seen statinfo*, if any (this is the size in
            #     bytes, ``mtime``, and ``ctime`` stored in the ``local_files``
            #     table when the file was last uploaded);
            #   * the ``last_uploaded_uri`` field of the ``local_files`` table
            #     for this file, which is the URI under which the file was last
            #     uploaded.

            with CHECKING_CONFLICTS() as action:
                conflict_reason = None
                if db_entry:
                    # * 2c. If any of the following are true, then classify as a conflict:
                    #   * i. there are pending notifications of changes to ``foo``;
                    #   * ii. the last-seen statinfo is either absent (i.e. there is
                    #     no entry in the database for this path), or different from
                    #     the current statinfo;

                    if current_statinfo.exists:
                        if (db_entry.mtime_ns != current_statinfo.mtime_ns or
                                db_entry.ctime_ns != current_statinfo.ctime_ns or
                                db_entry.size != current_statinfo.size):
                            is_conflict = True
                            conflict_reason = u"dbentry mismatch metadata"

                        if (db_entry.last_downloaded_uri is None
                                or db_entry.last_uploaded_uri is None
                                or dmd_last_downloaded_uri is None):
                            # we've never downloaded anything before for this
                            # file, but the other side might have created a new
                            # file "at the same time"
                            if db_entry.version >= item.metadata['version']:
                                is_conflict = True
                                conflict_reason = u"dbentry newer version"
                        elif dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                            is_conflict = True
                            conflict_reason = u"last_downloaded_uri mismatch"

                else:  # no local db_entry .. but has the file appeared locally meantime?
                    if current_statinfo.exists:
                        is_conflict = True
                        conflict_reason = u"file appeared"

                action.add_success_fields(
                    is_conflict=is_conflict,
                    conflict_reason=conflict_reason,
                )
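
            # In short: we classify as a conflict when the local file changed
            # behind our back (statinfo mismatch), when both sides created the
            # same path independently, or when the remote entry's
            # last_downloaded_uri does not match what we last downloaded.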

            if is_conflict:
                self._count('objects_conflicted')

            if item.relpath_u.endswith(u"/"):
                if item.metadata.get('deleted', False):
                    REMOTE_DIRECTORY_DELETED.log()
                else:
                    REMOTE_DIRECTORY_CREATED.log()
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                d.addCallback(lambda ign: abspath_u)
            else:
                if item.metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
                    d.addCallback(
                        lambda contents: self._write_downloaded_file(
                            self._local_path_u, abspath_u, contents,
                            is_conflict=is_conflict,
                            mtime=item.metadata.get('user_mtime', item.metadata.get('tahoe', {}).get('linkmotime')),
                        )
                    )

        d.addCallback(do_update_db)
        d.addErrback(failed)

        def trap_conflicts(f):
            f.trap(ConflictError)
            return False
        d.addErrback(trap_conflicts)
        return d.addActionFinish()
---|