source: trunk/src/allmydata/client.py

Last change on this file was d918135a, checked in by Itamar Turner-Trauring <itamar@…>, at 2022-10-03T15:10:36Z

Use parser instead of ad-hoc parser.

  • Property mode set to 100644
File size: 39.4 KB
Line 
1"""
2Ported to Python 3.
3"""
4from __future__ import annotations
5
6from typing import Optional
7import os, stat, time, weakref
8from base64 import urlsafe_b64encode
9from functools import partial
10# On Python 2 this will be the backported package:
11from configparser import NoSectionError
12
13from foolscap.furl import (
14    decode_furl,
15)
16
17import attr
18from zope.interface import implementer
19
20from twisted.plugin import (
21    getPlugins,
22)
23from twisted.internet import reactor, defer
24from twisted.application import service
25from twisted.application.internet import TimerService
26from twisted.python.filepath import FilePath
27
28import allmydata
29from allmydata.crypto import rsa, ed25519
30from allmydata.crypto.util import remove_prefix
31from allmydata.storage.server import StorageServer, FoolscapStorageServer
32from allmydata import storage_client
33from allmydata.immutable.upload import Uploader
34from allmydata.immutable.offloaded import Helper
35from allmydata.introducer.client import IntroducerClient
36from allmydata.util import (
37    hashutil, base32, pollmixin, log, idlib,
38    yamlutil, configutil,
39    fileutil,
40)
41from allmydata.util.encodingutil import get_filesystem_encoding
42from allmydata.util.abbreviate import parse_abbreviated_size
43from allmydata.util.time_format import parse_duration, parse_date
44from allmydata.util.i2p_provider import create as create_i2p_provider
45from allmydata.util.tor_provider import create as create_tor_provider
46from allmydata.stats import StatsProvider
47from allmydata.history import History
48from allmydata.interfaces import (
49    IStatsProducer,
50    SDMF_VERSION,
51    MDMF_VERSION,
52    DEFAULT_MAX_SEGMENT_SIZE,
53    IFoolscapStoragePlugin,
54    IAnnounceableStorageServer,
55)
56from allmydata.nodemaker import NodeMaker
57from allmydata.blacklist import Blacklist
58from allmydata import node
59
60
# Byte-size unit constants using binary prefixes (each a power of 1024).
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
66
67def _is_valid_section(section_name):
68    """
69    Check for valid dynamic configuration section names.
70
71    Currently considers all possible storage server plugin sections valid.
72    """
73    return (
74        section_name.startswith("storageserver.plugins.") or
75        section_name.startswith("storageclient.plugins.")
76    )
77
78
# Static declaration of every configuration section and item that a client
# node recognizes; dynamic (storage plugin) sections are accepted by
# _is_valid_section instead.  (The original tuple listed "expire.mode"
# twice in the "storage" section; the duplicate has been removed.)
_client_config = configutil.ValidConfiguration(
    static_valid_sections={
        "client": (
            "helper.furl",
            "introducer.furl",
            "key_generator.furl",
            "mutable.format",
            "peers.preferred",
            "shares.happy",
            "shares.needed",
            "shares.total",
            "storage.plugins",
        ),
        "storage": (
            "debug_discard",
            "enabled",
            "anonymous",
            "expire.cutoff_date",
            "expire.enabled",
            "expire.immutable",
            "expire.mode",
            "expire.mutable",
            "expire.override_lease_duration",
            "readonly",
            "reserved_space",
            "storage_dir",
            "plugins",
        ),
        "sftpd": (
            "accounts.file",
            "enabled",
            "host_privkey_file",
            "host_pubkey_file",
            "port",
        ),
        "helper": (
            "enabled",
        ),
    },
    is_valid_section=_is_valid_section,
    # Anything in a valid section is a valid item, for now.
    is_valid_item=lambda section, ignored: _is_valid_section(section),
)
123
124
def _valid_config():
    """
    Build the full configuration validator for a client node by combining
    the sections common to all nodes with the client-specific ones.
    """
    return node._common_valid_config().update(_client_config)
128
# This text is written into the README of newly created node directories
# (passed to node.create_node_dir() by create_client()).
CLIENT_README = u"""
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.rst' documentation file for details.
"""
136
137
138
def _make_secret():
    """
    Create a new random secret.

    :returns: ``hashutil.CRYPTO_VAL_SIZE`` random bytes, base32-encoded,
        terminated with a newline.
    """
    random_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    return base32.b2a(random_bytes) + b"\n"
145
146
class SecretHolder(object):
    """
    Holds a node's lease secret and convergence secret, and derives the
    per-operation lease secrets from the former.
    """
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret, self._convergence_secret = (
            lease_secret,
            convergence_secret,
        )

    def get_renewal_secret(self):
        """Derive the lease-renewal secret from the lease secret."""
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        """Derive the lease-cancel secret from the lease secret."""
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        """Return the convergence secret exactly as given."""
        return self._convergence_secret
160
class KeyGenerator(object):
    """
    Creates RSA keypairs for mutable files.  Each call to generate()
    produces a single fresh keypair.
    """

    def generate(self):
        """
        Create a new 2048-bit RSA keypair.

        :returns: a Deferred that fires with a (verifyingkey, signingkey)
            tuple.
        """
        # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
        # secs
        signing_key, verifying_key = rsa.create_signing_keypair(2048)
        return defer.succeed((verifying_key, signing_key))
173
class Terminator(service.Service):
    """
    A service which, when stopped, calls ``stop()`` on every client
    registered with it.  Registration holds clients weakly, so it does not
    keep them alive.
    """
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()

    def register(self, c):
        """Arrange for ``c.stop()`` to be called when this service stops."""
        self._clients[c] = None

    def stopService(self):
        # Snapshot the live keys so stopping a client cannot disturb the
        # iteration.
        for client in list(self._clients):
            client.stop()
        return service.Service.stopService(self)
183
184
def read_config(basedir, portnumfile, generated_files=None):
    """
    Read and validate configuration for a client-style Node. See
    :method:`allmydata.node.read_config` for parameter meanings (the
    only difference here is we pass different validation data)

    :returns: :class:`allmydata.node._Config` instance
    """
    # Use None instead of a mutable [] default argument (a shared-state
    # hazard); a fresh empty list is substituted per call.
    if generated_files is None:
        generated_files = []
    return node.read_config(
        basedir, portnumfile,
        generated_files=generated_files,
        _valid_config=_valid_config(),
    )
198
199
# Client-flavored variant of node.config_from_string: identical behavior,
# but with the client-specific validation data baked in.
config_from_string = partial(
    node.config_from_string,
    _valid_config=_valid_config(),
)
204
205
def create_client(basedir=u".", _client_factory=None):
    """
    Create a new client node (a subclass of Node).

    :param unicode basedir: the node directory (which may not exist yet)

    :param _client_factory: (for testing) a callable that returns an
        instance of :class:`allmydata.node.Node` (or a subclass). By default
        this is :class:`allmydata.client._Client`

    :returns: Deferred yielding an instance of :class:`allmydata.client._Client`
    """
    try:
        node.create_node_dir(basedir, CLIENT_README)
        # create_client_from_config is asynchronous; it returns a Deferred.
        return create_client_from_config(
            read_config(basedir, u"client.port"),
            _client_factory=_client_factory,
        )
    except Exception:
        # Report synchronous failures through the Deferred interface too.
        return defer.fail()
228
229
@defer.inlineCallbacks
def create_client_from_config(config, _client_factory=None, _introducer_factory=None):
    """
    Creates a new client instance (a subclass of Node).  Most code
    should probably use `create_client` instead.

    :returns: Deferred yielding a _Client instance

    :param config: configuration instance (from read_config()) which
        encapsulates everything in the "node directory".

    :param _client_factory: for testing; the class to instantiate
        instead of _Client

    :param _introducer_factory: for testing; the class to instantiate instead
        of IntroducerClient
    """
    if _client_factory is None:
        _client_factory = _Client

    # Build the networking pieces the client depends on, in dependency
    # order: providers, connection handlers, tub options, then the main tub.
    i2p_provider = create_i2p_provider(reactor, config)
    tor_provider = create_tor_provider(reactor, config)
    handlers = node.create_connection_handlers(config, i2p_provider, tor_provider)
    default_connection_handlers, foolscap_connection_handlers = handlers
    tub_options = node.create_tub_options(config)

    main_tub = node.create_main_tub(
        config, tub_options, default_connection_handlers,
        foolscap_connection_handlers, i2p_provider, tor_provider,
    )

    introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
    storage_broker = create_storage_farm_broker(
        config, default_connection_handlers, foolscap_connection_handlers,
        tub_options, introducer_clients
    )

    client = _client_factory(
        config,
        main_tub,
        i2p_provider,
        tor_provider,
        introducer_clients,
        storage_broker,
    )

    # Initialize storage separately after creating the client.  This is
    # necessary because we need to pass a reference to the client in to the
    # storage plugins to allow them to initialize themselves (specifically,
    # they may want the anonymous IStorageServer implementation so they don't
    # have to duplicate all of its basic storage functionality).  A better way
    # to do this, eventually, may be to create that implementation first and
    # then pass it in to both storage plugin creation and the client factory.
    # This avoids making a partially initialized client object escape the
    # client factory and removes the circular dependency between these
    # objects.
    storage_plugins = yield _StoragePlugins.from_config(
        client.get_anonymous_storage_server,
        config,
    )
    client.init_storage(storage_plugins.announceable_storage_servers)

    # Hook the helper services into the client's service hierarchy so they
    # start and stop with it.
    i2p_provider.setServiceParent(client)
    tor_provider.setServiceParent(client)
    for ic in introducer_clients:
        ic.setServiceParent(client)
    storage_broker.setServiceParent(client)
    defer.returnValue(client)
298
299
@attr.s
class _StoragePlugins(object):
    """
    Functionality related to getting storage plugins set up and ready for use.

    :ivar list[IAnnounceableStorageServer] announceable_storage_servers: The
        announceable storage servers that should be used according to node
        configuration.
    """
    announceable_storage_servers = attr.ib()

    @classmethod
    @defer.inlineCallbacks
    def from_config(cls, get_anonymous_storage_server, config):
        """
        Load and configure storage plugins.

        :param get_anonymous_storage_server: A no-argument callable which
            returns the node's anonymous ``IStorageServer`` implementation.

        :param _Config config: The node's configuration.

        :raise configutil.UnknownConfigError: If a plugin is enabled in the
            configuration but no matching plugin is installed.

        :return: A ``_StoragePlugins`` initialized from the given
            configuration.
        """
        storage_plugin_names = cls._get_enabled_storage_plugin_names(config)
        plugins = list(cls._collect_storage_plugins(storage_plugin_names))
        # Every enabled plugin name must correspond to an installed plugin.
        unknown_plugin_names = storage_plugin_names - {plugin.name for plugin in plugins}
        if unknown_plugin_names:
            raise configutil.UnknownConfigError(
                "Storage plugins {} are enabled but not known on this system.".format(
                    unknown_plugin_names,
                ),
            )
        announceable_storage_servers = yield cls._create_plugin_storage_servers(
            get_anonymous_storage_server,
            config,
            plugins,
        )
        defer.returnValue(cls(
            announceable_storage_servers,
        ))

    @classmethod
    def _get_enabled_storage_plugin_names(cls, config):
        """
        Get the names of storage plugins that are enabled in the configuration.

        :return set[unicode]: The plugin names from the comma-separated
            ``[storage] plugins`` item, with the empty string removed.
        """
        return set(
            config.get_config(
                "storage", "plugins", ""
            ).split(u",")
        ) - {u""}

    @classmethod
    def _collect_storage_plugins(cls, storage_plugin_names):
        """
        Get the storage plugins with names matching those given.

        :return list: The ``IFoolscapStoragePlugin`` providers whose names
            appear in ``storage_plugin_names``.
        """
        return list(
            plugin
            for plugin
            in getPlugins(IFoolscapStoragePlugin)
            if plugin.name in storage_plugin_names
        )

    @classmethod
    def _create_plugin_storage_servers(cls, get_anonymous_storage_server, config, plugins):
        """
        Cause each storage plugin to instantiate its storage server and return
        them all.

        :return: A ``Deferred`` that fires with storage servers instantiated
            by all of the given storage server plugins.
        """
        return defer.gatherResults(
            list(
                plugin.get_storage_server(
                    cls._get_storage_plugin_configuration(config, plugin.name),
                    get_anonymous_storage_server,
                ).addCallback(
                    # Tag each server's announcement with the plugin name
                    # that produced it.
                    partial(
                        _add_to_announcement,
                        {u"name": plugin.name},
                    ),
                )
                for plugin
                # The order is fairly arbitrary and it is not meant to convey
                # anything but providing *some* stable ordering makes the data
                # a little easier to deal with (mainly in tests and when
                # manually inspecting it).
                in sorted(plugins, key=lambda p: p.name)
            ),
        )

    @classmethod
    def _get_storage_plugin_configuration(cls, config, storage_plugin_name):
        """
        Load the configuration for a storage server plugin with the given name.

        :return dict[bytes, bytes]: The matching configuration, or an empty
            dict if the plugin's section is absent.
        """
        try:
            config = config.items(
                "storageserver.plugins." + storage_plugin_name,
            )
        except NoSectionError:
            config = []
        return dict(config)
409
410
411
def _sequencer(config):
    """
    :returns: a 2-tuple consisting of a new announcement
        sequence-number and random nonce (int, unicode). Reads and
        re-writes configuration file "announcement-seqnum" (starting at 1
        if that file doesn't exist).
    """
    stored = config.get_config_from_file("announcement-seqnum") or u"0"
    next_seqnum = int(stored.strip()) + 1
    config.write_config_file("announcement-seqnum", "{}\n".format(next_seqnum))
    return next_seqnum, _make_secret().strip()
427
428
def create_introducer_clients(config, main_tub, _introducer_factory=None):
    """
    Read, validate and parse any 'introducers.yaml' configuration.

    :param _introducer_factory: for testing; the class to instantiate instead
        of IntroducerClient

    :returns: a list of IntroducerClient instances
    """
    factory = (
        IntroducerClient if _introducer_factory is None else _introducer_factory
    )

    introducers = config.get_introducer_configuration()

    # One client per configured introducer.
    return [
        factory(
            main_tub,
            furl.encode("ascii"),
            config.nickname,
            str(allmydata.__full_version__),
            str(_Client.OLDEST_SUPPORTED_VERSION),
            partial(_sequencer, config),
            cache_path,
        )
        for petname, (furl, cache_path) in list(introducers.items())
    ]
458
459
def create_storage_farm_broker(config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients):
    """
    Create a StorageFarmBroker object, for use by Uploader/Downloader
    (and everybody else who wants to use storage servers)

    :param config: a _Config instance

    :param default_connection_handlers: default Foolscap handlers

    :param foolscap_connection_handlers: available/configured Foolscap
        handlers

    :param dict tub_options: how to configure our Tub

    :param list introducer_clients: IntroducerClient instances if
        we're connecting to any
    """
    def make_tub(handler_overrides=None, **kwargs):
        # Build a Tub configured the same way as the node's main tub.
        if handler_overrides is None:
            handler_overrides = {}
        return node.create_tub(
            tub_options,
            default_connection_handlers,
            foolscap_connection_handlers,
            handler_overrides=handler_overrides,
            **kwargs
        )

    broker = storage_client.StorageFarmBroker(
        permute_peers=True,
        tub_maker=make_tub,
        node_config=config,
        storage_client_config=storage_client.StorageClientConfig.from_node_config(
            config,
        ),
    )
    for introducer_client in introducer_clients:
        broker.use_introducer(introducer_client)
    return broker
499
500
501def _register_reference(key, config, tub, referenceable):
502    """
503    Register a referenceable in a tub with a stable fURL.
504
505    Stability is achieved by storing the fURL in the configuration the first
506    time and then reading it back on for future calls.
507
508    :param bytes key: An identifier for this reference which can be used to
509        identify its fURL in the configuration.
510
511    :param _Config config: The configuration to use for fURL persistence.
512
513    :param Tub tub: The tub in which to register the reference.
514
515    :param Referenceable referenceable: The referenceable to register in the
516        Tub.
517
518    :return bytes: The fURL at which the object is registered.
519    """
520    persisted_furl = config.get_private_config(
521        key,
522        default=None,
523    )
524    name = None
525    if persisted_furl is not None:
526        _, _, name = decode_furl(persisted_furl)
527    registered_furl = tub.registerReference(
528        referenceable,
529        name=name,
530    )
531    if persisted_furl is None:
532        config.write_private_config(key, registered_furl)
533    return registered_furl
534
535
@implementer(IAnnounceableStorageServer)
@attr.s
class AnnounceableStorageServer(object):
    """
    A pairing of a storage server with the announcement to publish for it.
    """
    # The announcement (a dict) for this storage server.
    announcement = attr.ib()
    # The storage server object being announced.
    storage_server = attr.ib()
541
542
543
def _add_to_announcement(information, announceable_storage_server):
    """
    Create a new ``AnnounceableStorageServer`` based on
    ``announceable_storage_server`` with ``information`` added to its
    ``announcement``.
    """
    # Never mutate the original announcement; build a merged copy instead.
    merged = dict(announceable_storage_server.announcement)
    merged.update(information)
    return AnnounceableStorageServer(
        merged,
        announceable_storage_server.storage_server,
    )
556
557
def storage_enabled(config):
    """
    Is storage enabled according to the given configuration object?

    :param _Config config: The configuration to inspect.

    :return bool: ``True`` if storage is enabled, ``False`` otherwise.
    """
    # Storage defaults to enabled when the option is absent.
    enabled = config.get_config("storage", "enabled", True, boolean=True)
    return enabled
567
568
def anonymous_storage_enabled(config):
    """
    Is anonymous access to storage enabled according to the given
    configuration object?

    :param _Config config: The configuration to inspect.

    :return bool: ``True`` if anonymous storage access is enabled,
        ``False`` otherwise.
    """
    # Anonymous access requires storage itself to be enabled.
    if not storage_enabled(config):
        return False
    return config.get_config("storage", "anonymous", True, boolean=True)
582
583
@implementer(IStatsProducer)
class _Client(node.Node, pollmixin.PollMixin):
    """
    The client node: owns the storage broker, introducer clients, and the
    other client-side services created during ``__init__``.

    This class should be refactored; see
    https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3931
    """

    # Default directory (relative to the node directory) for stored shares.
    STOREDIR = 'storage'
    NODETYPE = "client"
    # Name of the optional file that triggers the exit-timer (see __init__).
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": DEFAULT_MAX_SEGMENT_SIZE,
                                   }
609
    def __init__(self, config, main_tub, i2p_provider, tor_provider, introducer_clients,
                 storage_farm_broker):
        """
        Use :func:`allmydata.client.create_client` to instantiate one of these.
        """
        node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider)

        self.started_timestamp = time.time()
        self.logSource = "Client"
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()

        self.introducer_clients = introducer_clients
        self.storage_broker = storage_farm_broker

        # Bring up client sub-services; the order matters (e.g. secrets and
        # the node key must exist before the client services that use them).
        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self._key_generator = KeyGenerator()
        key_gen_furl = config.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            log.msg("[client]key_generator.furl= is now ignored, see #2783")
        self.init_client()
        self.load_static_servers()
        self.helper = None
        if config.get_config("helper", "enabled", False, boolean=True):
            if not self._is_tub_listening():
                raise ValueError("config error: helper is enabled, but tub "
                                 "is not listening ('tub.port=' is empty)")
            self.init_helper()
        self.init_sftp_server()

        # If the node sees an exit_trigger file, it will poll every second to see
        # whether the file still exists, and what its mtime is. If the file does not
        # exist or has not been modified for a given timeout, the node will exit.
        exit_trigger_file = config.get_config_path(self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = config.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

        # TODO this may be the wrong location for now? but as temporary measure
        # it allows us to get NURLs for testing in test_istorageserver.py. This
        # will eventually get fixed one way or another in
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3901. See also
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3931 for the bigger
        # picture issue.
        self.storage_nurls : Optional[set] = None
664
665    def init_stats_provider(self):
666        self.stats_provider = StatsProvider(self)
667        self.stats_provider.setServiceParent(self)
668        self.stats_provider.register_producer(self)
669
670    def get_stats(self):
671        return { 'node.uptime': time.time() - self.started_timestamp }
672
673    def init_secrets(self):
674        # configs are always unicode
675        def _unicode_make_secret():
676            return str(_make_secret(), "ascii")
677        lease_s = self.config.get_or_create_private_config(
678            "secret", _unicode_make_secret).encode("utf-8")
679        lease_secret = base32.a2b(lease_s)
680        convergence_s = self.config.get_or_create_private_config(
681            'convergence', _unicode_make_secret).encode("utf-8")
682        self.convergence = base32.a2b(convergence_s)
683        self._secret_holder = SecretHolder(lease_secret, self.convergence)
684
685    def init_node_key(self):
686        # we only create the key once. On all subsequent runs, we re-use the
687        # existing key
688        def _make_key():
689            private_key, _ = ed25519.create_signing_keypair()
690            # Config values are always unicode:
691            return str(ed25519.string_from_signing_key(private_key) + b"\n", "utf-8")
692
693        private_key_str = self.config.get_or_create_private_config(
694            "node.privkey", _make_key).encode("utf-8")
695        private_key, public_key = ed25519.signing_keypair_from_string(private_key_str)
696        public_key_str = ed25519.string_from_verifying_key(public_key)
697        self.config.write_config_file("node.pubkey", public_key_str + b"\n", "wb")
698        self._node_private_key = private_key
699        self._node_public_key = public_key
700
701    def get_long_nodeid(self):
702        # this matches what IServer.get_longname() says about us elsewhere
703        vk_string = ed25519.string_from_verifying_key(self._node_public_key)
704        return remove_prefix(vk_string, b"pub-")
705
    def get_long_tubid(self):
        """Return the printable form of this node's TubID (via idlib.nodeid_b2a)."""
        return idlib.nodeid_b2a(self.nodeid)
708
    def get_web_service(self):
        """
        :return: a reference to our web server, looked up by its registered
            service name ("webish").
        """
        return self.getServiceNamed("webish")
714
    def _init_permutation_seed(self, ss):
        """
        Return this server's permutation seed, creating and persisting it on
        first use.

        :param ss: the StorageServer; used to check whether shares already
            exist, which determines which seed we must use for compatibility.

        :return bytes: the base32-encoded seed with whitespace stripped.
        """
        seed = self.config.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_string = ed25519.string_from_verifying_key(self._node_public_key)
                vk_bytes = remove_prefix(vk_string, ed25519.PUBLIC_KEY_PREFIX)
                seed = base32.b2a(vk_bytes)
            # Persist the chosen seed so future runs make the same choice.
            self.config.write_config_file("permutation-seed", seed+b"\n", mode="wb")
        return seed.strip()
736
    def get_anonymous_storage_server(self):
        """
        Get the anonymous ``IStorageServer`` implementation for this node.

        Note this will return an object even if storage is disabled on this
        node (but the object will not be exposed, peers will not be able to
        access it, and storage will remain disabled).

        The one and only instance for this node is always returned.  It is
        created first if necessary.
        """
        # If the StorageServer service was already created, reuse it.
        try:
            ss = self.getServiceNamed(StorageServer.name)
        except KeyError:
            pass
        else:
            return ss

        readonly = self.config.get_config("storage", "readonly", False, boolean=True)

        # NOTE(review): this line uses self.get_config() while every other
        # read in this method uses self.config.get_config() -- confirm both
        # resolve to the same configuration.
        config_storedir = self.get_config(
            "storage", "storage_dir", self.STOREDIR,
        )
        storedir = self.config.get_config_path(config_storedir)

        data = self.config.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            # Log the bad value, then let the error propagate to the caller.
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.config.get_config("storage", "debug_discard", False,
                                         boolean=True)

        # Lease-expiration settings: expire.mode is mandatory only when
        # expiration is enabled.
        expire = self.config.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.config.get_config("storage", "expire.mode") # require a mode
        else:
            mode = self.config.get_config("storage", "expire.mode", "age")

        o_l_d = self.config.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.config.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        # Which share types are subject to expiration (both by default).
        sharetypes = []
        if self.config.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.config.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(storedir, self.nodeid,
                           reserved_space=reserved,
                           discard_storage=discard,
                           readonly_storage=readonly,
                           stats_provider=self.stats_provider,
                           expiration_enabled=expire,
                           expiration_mode=mode,
                           expiration_override_lease_duration=o_l_d,
                           expiration_cutoff_date=cutoff_date,
                           expiration_sharetypes=expiration_sharetypes)
        # Registering as a named child service is what makes the early
        # getServiceNamed() lookup above succeed on later calls.
        ss.setServiceParent(self)
        return ss
808
    def init_storage(self, announceable_storage_servers):
        """
        Start this node's storage service (if enabled) and publish its
        announcement, plus any plugin storage announcements, to all of our
        introducer clients.
        """
        # should we run a storage server (and publish it for others to use)?
        if not storage_enabled(self.config):
            return
        if not self._is_tub_listening():
            raise ValueError("config error: storage is enabled, but tub "
                             "is not listening ('tub.port=' is empty)")

        ss = self.get_anonymous_storage_server()
        announcement = {
            "permutation-seed-base32": self._init_permutation_seed(ss),
        }

        # Only expose the anonymous storage fURL when anonymous access is on.
        if anonymous_storage_enabled(self.config):
            furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file)
            (_, _, swissnum) = decode_furl(furl)
            self.storage_nurls = self.tub.negotiationClass.add_storage_server(
                ss, swissnum.encode("ascii")
            )
            announcement["anonymous-storage-FURL"] = furl

        enabled_storage_servers = self._enable_storage_servers(
            announceable_storage_servers,
        )
        storage_options = list(
            storage_server.announcement
            for storage_server
            in enabled_storage_servers
        )
        plugins_announcement = {}
        if storage_options:
            # Only add the new key if there are any plugins enabled.
            plugins_announcement[u"storage-options"] = storage_options

        announcement.update(plugins_announcement)

        for ic in self.introducer_clients:
            ic.publish("storage", announcement, self._node_private_key)
848
849    def get_client_storage_plugin_web_resources(self):
850        """
851        Get all of the client-side ``IResource`` implementations provided by
852        enabled storage plugins.
853
854        :return dict[bytes, IResource provider]: The implementations.
855        """
856        return self.storage_broker.get_client_storage_plugin_web_resources(
857            self.config,
858        )
859
860    def _enable_storage_servers(self, announceable_storage_servers):
861        """
862        Register and announce the given storage servers.
863        """
864        for announceable in announceable_storage_servers:
865            yield self._enable_storage_server(announceable)
866
867    def _enable_storage_server(self, announceable_storage_server):
868        """
869        Register a storage server.
870        """
871        config_key = "storage-plugin.{}.furl".format(
872            # Oops, why don't I have a better handle on this value?
873            announceable_storage_server.announcement[u"name"],
874        )
875        furl = _register_reference(
876            config_key,
877            self.config,
878            self.tub,
879            announceable_storage_server.storage_server,
880        )
881        announceable_storage_server = _add_to_announcement(
882            {u"storage-server-FURL": furl},
883            announceable_storage_server,
884        )
885        return announceable_storage_server
886
    def init_client(self):
        """
        Set up the client-side services: encoding parameters, the API auth
        token, history, terminator, uploader, blacklist, and nodemaker.
        """
        helper_furl = self.config.get_config("client", "helper.furl", None)
        # Treat the literal strings "None" and "" in the config file as
        # "no helper configured".
        if helper_furl in ("None", ""):
            helper_furl = None

        # Override the default erasure-coding parameters from config.
        DEP = self.encoding_params
        DEP["k"] = int(self.config.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.config.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.config.get_config("client", "shares.happy", DEP["happy"]))

        # for the CLI to authenticate to local JSON endpoints
        self._create_auth_token()

        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        uploader = Uploader(
            helper_furl,
            self.stats_provider,
            self.history,
        )
        uploader.setServiceParent(self)
        # Blacklist must exist before the nodemaker, which consumes it.
        self.init_blacklist()
        self.init_nodemaker()
911
912    def get_auth_token(self):
913        """
914        This returns a local authentication token, which is just some
915        random data in "api_auth_token" which must be echoed to API
916        calls.
917        """
918        return self.config.get_private_config(
919            'api_auth_token').encode("ascii")
920
921    def _create_auth_token(self):
922        """
923        Creates new auth-token data written to 'private/api_auth_token'.
924
925        This is intentionally re-created every time the node starts.
926        """
927        self.config.write_private_config(
928            'api_auth_token',
929            urlsafe_b64encode(os.urandom(32)) + b'\n',
930        )
931
932    def get_storage_broker(self):
933        return self.storage_broker
934
935    def load_static_servers(self):
936        """
937        Load the servers.yaml file if it exists, and provide the static
938        server data to the StorageFarmBroker.
939        """
940        fn = self.config.get_private_path("servers.yaml")
941        servers_filepath = FilePath(fn)
942        try:
943            with servers_filepath.open() as f:
944                servers_yaml = yamlutil.safe_load(f)
945            static_servers = servers_yaml.get("storage", {})
946            log.msg("found %d static servers in private/servers.yaml" %
947                    len(static_servers))
948            self.storage_broker.set_static_servers(static_servers)
949        except EnvironmentError:
950            pass
951
952    def init_blacklist(self):
953        fn = self.config.get_config_path("access.blacklist")
954        self.blacklist = Blacklist(fn)
955
956    def init_nodemaker(self):
957        default = self.config.get_config("client", "mutable.format", default="SDMF")
958        if default.upper() == "MDMF":
959            self.mutable_file_default = MDMF_VERSION
960        else:
961            self.mutable_file_default = SDMF_VERSION
962        self.nodemaker = NodeMaker(self.storage_broker,
963                                   self._secret_holder,
964                                   self.get_history(),
965                                   self.getServiceNamed("uploader"),
966                                   self.terminator,
967                                   self.get_encoding_parameters(),
968                                   self.mutable_file_default,
969                                   self._key_generator,
970                                   self.blacklist)
971
972    def get_history(self):
973        return self.history
974
975    def init_helper(self):
976        self.helper = Helper(self.config.get_config_path("helper"),
977                             self.storage_broker, self._secret_holder,
978                             self.stats_provider, self.history)
979        # TODO: this is confusing. BASEDIR/private/helper.furl is created by
980        # the helper. BASEDIR/helper.furl is consumed by the client who wants
981        # to use the helper. I like having the filename be the same, since
982        # that makes 'cp' work smoothly, but the difference between config
983        # inputs and generated outputs is hard to see.
984        helper_furlfile = self.config.get_private_path("helper.furl").encode(get_filesystem_encoding())
985        self.tub.registerReference(self.helper, furlFile=helper_furlfile)
986
    def _get_tempdir(self):
        """
        Determine the path to the directory where temporary files for this node
        should be written.

        The [node]tempdir config value (default "tmp") is resolved relative
        to the node's base directory, and the directory is created if it
        does not already exist.

        :return: The path which will exist and be a directory.  (Note: this
            is a text path, not bytes, despite what earlier documentation
            claimed.)
        """
        tempdir_config = self.config.get_config("node", "tempdir", "tmp")
        if isinstance(tempdir_config, bytes):
            # Tolerate a bytes-valued config -- presumably from legacy
            # callers/tests; normalize to text before path handling.
            tempdir_config = tempdir_config.decode('utf-8')
        tempdir = self.config.get_config_path(tempdir_config)
        if not os.path.exists(tempdir):
            fileutil.make_dirs(tempdir)
        return tempdir
1001
1002    def init_web(self, webport):
1003        self.log("init_web(webport=%s)", args=(webport,))
1004
1005        from allmydata.webish import WebishServer
1006        nodeurl_path = self.config.get_config_path("node.url")
1007        staticdir_config = self.config.get_config("node", "web.static", "public_html")
1008        staticdir = self.config.get_config_path(staticdir_config)
1009        ws = WebishServer(
1010            self,
1011            webport,
1012            self._get_tempdir(),
1013            nodeurl_path,
1014            staticdir,
1015        )
1016        ws.setServiceParent(self)
1017
1018    def init_sftp_server(self):
1019        if self.config.get_config("sftpd", "enabled", False, boolean=True):
1020            accountfile = self.config.get_config("sftpd", "accounts.file", None)
1021            if accountfile:
1022                accountfile = self.config.get_config_path(accountfile)
1023            sftp_portstr = self.config.get_config("sftpd", "port", "tcp:8022")
1024            pubkey_file = self.config.get_config("sftpd", "host_pubkey_file")
1025            privkey_file = self.config.get_config("sftpd", "host_privkey_file")
1026
1027            from allmydata.frontends import sftpd
1028            s = sftpd.SFTPServer(self, accountfile,
1029                                 sftp_portstr, pubkey_file, privkey_file)
1030            s.setServiceParent(self)
1031
1032    def _check_exit_trigger(self, exit_trigger_file):
1033        if os.path.exists(exit_trigger_file):
1034            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
1035            if mtime > time.time() - 120.0:
1036                return
1037            else:
1038                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
1039        else:
1040            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
1041        reactor.stop()
1042
1043    def get_encoding_parameters(self):
1044        return self.encoding_params
1045
1046    def introducer_connection_statuses(self):
1047        return [ic.connection_status() for ic in self.introducer_clients]
1048
1049    def connected_to_introducer(self):
1050        return any([ic.connected_to_introducer() for ic in self.introducer_clients])
1051
1052    def get_renewal_secret(self): # this will go away
1053        return self._secret_holder.get_renewal_secret()
1054
1055    def get_cancel_secret(self):
1056        return self._secret_holder.get_cancel_secret()
1057
1058    def debug_wait_for_client_connections(self, num_clients):
1059        """Return a Deferred that fires (with None) when we have connections
1060        to the given number of peers. Useful for tests that set up a
1061        temporary test network and need to know when it is safe to proceed
1062        with an upload or download."""
1063        def _check():
1064            return len(self.storage_broker.get_connected_servers()) >= num_clients
1065        d = self.poll(_check, 0.5)
1066        d.addCallback(lambda res: None)
1067        return d
1068
1069
1070    # these four methods are the primitives for creating filenodes and
1071    # dirnodes. The first takes a URI and produces a filenode or (new-style)
1072    # dirnode. The other three create brand-new filenodes/dirnodes.
1073
1074    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
1075        # This returns synchronously.
1076        # Note that it does *not* validate the write_uri and read_uri; instead we
1077        # may get an opaque node if there were any problems.
1078        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)
1079
1080    def create_dirnode(self, initial_children={}, version=None):
1081        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
1082        return d
1083
1084    def create_immutable_dirnode(self, children, convergence=None):
1085        return self.nodemaker.create_immutable_directory(children, convergence)
1086
1087    def create_mutable_file(self, contents=None, version=None):
1088        return self.nodemaker.create_mutable_file(contents,
1089                                                  version=version)
1090
1091    def upload(self, uploadable, reactor=None):
1092        uploader = self.getServiceNamed("uploader")
1093        return uploader.upload(uploadable, reactor=reactor)
Note: See TracBrowser for help on using the repository browser.