source: trunk/src/allmydata/client.py

"""
Functionality related to operating a Tahoe-LAFS node (client _or_ server).
"""
from __future__ import annotations

import os
import stat
import time
import weakref
from typing import Optional, Iterable
from base64 import urlsafe_b64encode
from functools import partial
from configparser import NoSectionError

from six import ensure_text
from foolscap.furl import (
    decode_furl,
)

import attr
from zope.interface import implementer

from twisted.plugin import (
    getPlugins,
)
from twisted.internet import reactor, defer
from twisted.application import service
from twisted.application.internet import TimerService
from twisted.python.filepath import FilePath

import allmydata
from allmydata import node
from allmydata.crypto import rsa, ed25519
from allmydata.crypto.util import remove_prefix
from allmydata.storage.server import StorageServer, FoolscapStorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.mutable.filenode import MutableFileNode
from allmydata.introducer.client import IntroducerClient
from allmydata.util import (
    hashutil, base32, pollmixin, log, idlib,
    yamlutil, configutil,
    fileutil,
)
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
from allmydata.util.i2p_provider import create as create_i2p_provider
from allmydata.util.tor_provider import create as create_tor_provider, _Provider as TorProvider
from allmydata.util.cputhreadpool import defer_to_thread
from allmydata.util.deferredutil import async_to_deferred
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import (
    IStatsProducer,
    SDMF_VERSION,
    MDMF_VERSION,
    DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE,
    IFoolscapStoragePlugin,
    IAnnounceableStorageServer,
)
from allmydata.nodemaker import NodeMaker
from allmydata.blacklist import Blacklist
from allmydata.node import _Config

KiB = 1024
MiB = 1024 * KiB
GiB = 1024 * MiB
TiB = 1024 * GiB
PiB = 1024 * TiB

def _is_valid_section(section_name):
    """
    Check for valid dynamic configuration section names.

    Currently considers all possible storage server plugin sections valid.
    """
    return (
        section_name.startswith("storageserver.plugins.") or
        section_name.startswith("storageclient.plugins.") or
        section_name in ("grid_managers", "grid_manager_certificates")
    )

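# Illustrative examples (not part of the original module) of how the validator
# above treats section names; plugin-prefixed sections and the two
# grid-manager sections are dynamic, everything else is not:
#
#     _is_valid_section("storageserver.plugins.example")  # True
#     _is_valid_section("grid_managers")                   # True
#     _is_valid_section("storage")                         # False (static section, handled below)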

_client_config = configutil.ValidConfiguration(
    static_valid_sections={
        "client": (
            "helper.furl",
            "introducer.furl",
            "key_generator.furl",
            "mutable.format",
            "peers.preferred",
            "shares.happy",
            "shares.needed",
            "shares.total",
            "shares._max_immutable_segment_size_for_testing",
            "storage.plugins",
            "force_foolscap",
        ),
        "storage": (
            "debug_discard",
            "enabled",
            "anonymous",
            "expire.cutoff_date",
            "expire.enabled",
            "expire.immutable",
            "expire.mode",
            "expire.mutable",
            "expire.override_lease_duration",
            "readonly",
            "reserved_space",
            "storage_dir",
            "plugins",
            "grid_management",
            "force_foolscap",
        ),
        "sftpd": (
            "accounts.file",
            "enabled",
            "host_privkey_file",
            "host_pubkey_file",
            "port",
        ),
        "helper": (
            "enabled",
        ),
    },
    is_valid_section=_is_valid_section,
    # Anything in a valid section is a valid item, for now.
    is_valid_item=lambda section, ignored: _is_valid_section(section),
)


def _valid_config():
    cfg = node._common_valid_config()
    return cfg.update(_client_config)

# this is put into README in new node-directories
CLIENT_README = u"""
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.  See the 'configuration.rst' documentation file for details.
"""


def _make_secret():
    """
    Returns a base32-encoded random secret of hashutil.CRYPTO_VAL_SIZE
    bytes.
    """
    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + b"\n"

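# For orientation (illustrative, not part of the original module): assuming
# hashutil.CRYPTO_VAL_SIZE == 32, _make_secret() base32-encodes 32 random
# bytes, so the result looks something like
#
#     b"5hskcmrn7zziteiqd3gxbhxxfcxxdl3faxt6oix6nm2p3b4hu5kq\n"
#
# i.e. 52 lowercase base32 characters plus a trailing newline.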

class SecretHolder(object):
    def __init__(self, lease_secret, convergence_secret):
        self._lease_secret = lease_secret
        self._convergence_secret = convergence_secret

    def get_renewal_secret(self):
        return hashutil.my_renewal_secret_hash(self._lease_secret)

    def get_cancel_secret(self):
        return hashutil.my_cancel_secret_hash(self._lease_secret)

    def get_convergence_secret(self):
        return self._convergence_secret

class KeyGenerator(object):
    """I create RSA keys for mutable files. Each call to generate() returns a
    single keypair."""

    @async_to_deferred
    async def generate(self) -> tuple[rsa.PublicKey, rsa.PrivateKey]:
        """
        I return a Deferred that fires with a (verifyingkey, signingkey)
        pair. The returned keypair is 2048-bit RSA.
        """
        keysize = 2048
        private, public = await defer_to_thread(
            rsa.create_signing_keypair, keysize
        )
        return public, private

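# Usage sketch (illustrative only): generate() is wrapped by
# @async_to_deferred, so callers get a Deferred even though the body is a
# coroutine, e.g.:
#
#     d = KeyGenerator().generate()
#     d.addCallback(lambda keypair: ...)  # keypair is (public, private)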

class Terminator(service.Service):
    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()
    def register(self, c):
        self._clients[c] = None
    def stopService(self):
        for c in self._clients:
            c.stop()
        return service.Service.stopService(self)


def read_config(basedir, portnumfile, generated_files: Iterable=()):
    """
    Read and validate configuration for a client-style Node. See
    :func:`allmydata.node.read_config` for parameter meanings (the
    only difference here is that we pass different validation data)

    :returns: :class:`allmydata.node._Config` instance
    """
    return node.read_config(
        basedir, portnumfile,
        generated_files=generated_files,
        _valid_config=_valid_config(),
    )

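# Example call (illustrative; the basedir path is hypothetical):
#
#     config = read_config("/var/lib/tahoe-client", u"client.port")
#
# "client.port" is the portnumfile argument: the name of the file in which
# the node persists its allocated listening port.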

config_from_string = partial(
    node.config_from_string,
    _valid_config=_valid_config(),
)


def create_client(basedir=u".", _client_factory=None):
    """
    Creates a new client instance (a subclass of Node).

    :param unicode basedir: the node directory (which may not exist yet)

    :param _client_factory: (for testing) a callable that returns an
        instance of :class:`allmydata.node.Node` (or a subclass). By default
        this is :class:`allmydata.client._Client`

    :returns: Deferred yielding an instance of :class:`allmydata.client._Client`
    """
    try:
        node.create_node_dir(basedir, CLIENT_README)
        config = read_config(basedir, u"client.port")
        # following call is async
        return create_client_from_config(
            config,
            _client_factory=_client_factory,
        )
    except Exception:
        return defer.fail()

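# Startup sketch (illustrative only): from a running Twisted reactor one
# might create and start a client like this:
#
#     d = create_client("/var/lib/tahoe-client")  # hypothetical basedir
#     d.addCallback(lambda client: client.startService())
#
# Any error (e.g. an invalid tahoe.cfg) arrives via the Deferred's errback.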

@defer.inlineCallbacks
def create_client_from_config(config, _client_factory=None, _introducer_factory=None):
    """
    Creates a new client instance (a subclass of Node).  Most code
    should probably use `create_client` instead.

    :returns: Deferred yielding a _Client instance

    :param config: configuration instance (from read_config()) which
        encapsulates everything in the "node directory".

    :param _client_factory: for testing; the class to instantiate
        instead of _Client

    :param _introducer_factory: for testing; the class to instantiate instead
        of IntroducerClient
    """
    if _client_factory is None:
        _client_factory = _Client

    i2p_provider = create_i2p_provider(reactor, config)
    tor_provider = create_tor_provider(reactor, config)
    handlers = node.create_connection_handlers(config, i2p_provider, tor_provider)
    default_connection_handlers, foolscap_connection_handlers = handlers
    tub_options = node.create_tub_options(config)

    main_tub = node.create_main_tub(
        config, tub_options, default_connection_handlers,
        foolscap_connection_handlers, i2p_provider, tor_provider,
    )

    introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
    storage_broker = create_storage_farm_broker(
        config, default_connection_handlers, foolscap_connection_handlers,
        tub_options, introducer_clients, tor_provider
    )

    client = _client_factory(
        config,
        main_tub,
        i2p_provider,
        tor_provider,
        introducer_clients,
        storage_broker,
    )

    # Initialize storage separately after creating the client.  This is
    # necessary because we need to pass a reference to the client in to the
    # storage plugins to allow them to initialize themselves (specifically,
    # they may want the anonymous IStorageServer implementation so they don't
    # have to duplicate all of its basic storage functionality).  A better way
    # to do this, eventually, may be to create that implementation first and
    # then pass it in to both storage plugin creation and the client factory.
    # This avoids making a partially initialized client object escape the
    # client factory and removes the circular dependency between these
    # objects.
    storage_plugins = yield _StoragePlugins.from_config(
        client.get_anonymous_storage_server,
        config,
    )
    client.init_storage(storage_plugins.announceable_storage_servers)

    i2p_provider.setServiceParent(client)
    tor_provider.setServiceParent(client)
    for ic in introducer_clients:
        ic.setServiceParent(client)
    storage_broker.setServiceParent(client)
    defer.returnValue(client)


@attr.s
class _StoragePlugins(object):
    """
    Functionality related to getting storage plugins set up and ready for use.

    :ivar list[IAnnounceableStorageServer] announceable_storage_servers: The
        announceable storage servers that should be used according to node
        configuration.
    """
    announceable_storage_servers = attr.ib()

    @classmethod
    @defer.inlineCallbacks
    def from_config(cls, get_anonymous_storage_server, config):
        """
        Load and configure storage plugins.

        :param get_anonymous_storage_server: A no-argument callable which
            returns the node's anonymous ``IStorageServer`` implementation.

        :param _Config config: The node's configuration.

        :return: A ``_StoragePlugins`` initialized from the given
            configuration.
        """
        storage_plugin_names = cls._get_enabled_storage_plugin_names(config)
        plugins = list(cls._collect_storage_plugins(storage_plugin_names))
        unknown_plugin_names = storage_plugin_names - {plugin.name for plugin in plugins}
        if unknown_plugin_names:
            raise configutil.UnknownConfigError(
                "Storage plugins {} are enabled but not known on this system.".format(
                    unknown_plugin_names,
                ),
            )
        announceable_storage_servers = yield cls._create_plugin_storage_servers(
            get_anonymous_storage_server,
            config,
            plugins,
        )
        defer.returnValue(cls(
            announceable_storage_servers,
        ))

    @classmethod
    def _get_enabled_storage_plugin_names(cls, config):
        """
        Get the names of storage plugins that are enabled in the configuration.
        """
        return set(
            config.get_config(
                "storage", "plugins", ""
            ).split(u",")
        ) - {u""}

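    # Parsing sketch (illustrative; the plugin names are hypothetical): a
    # tahoe.cfg entry such as
    #
    #     [storage]
    #     plugins = example-plugin-v1,another-plugin-v2
    #
    # yields {"example-plugin-v1", "another-plugin-v2"}; the empty string is
    # discarded, so a missing or blank option means "no plugins".
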
    @classmethod
    def _collect_storage_plugins(cls, storage_plugin_names):
        """
        Get the storage plugins with names matching those given.
        """
        return list(
            plugin
            for plugin
            in getPlugins(IFoolscapStoragePlugin)
            if plugin.name in storage_plugin_names
        )

    @classmethod
    def _create_plugin_storage_servers(cls, get_anonymous_storage_server, config, plugins):
        """
        Cause each storage plugin to instantiate its storage server and return
        them all.

        :return: A ``Deferred`` that fires with storage servers instantiated
            by all of the given storage server plugins.
        """
        return defer.gatherResults(
            list(
                plugin.get_storage_server(
                    cls._get_storage_plugin_configuration(config, plugin.name),
                    get_anonymous_storage_server,
                ).addCallback(
                    partial(
                        _add_to_announcement,
                        {u"name": plugin.name},
                    ),
                )
                for plugin
                # The order is fairly arbitrary and it is not meant to convey
                # anything but providing *some* stable ordering makes the data
                # a little easier to deal with (mainly in tests and when
                # manually inspecting it).
                in sorted(plugins, key=lambda p: p.name)
            ),
        )

    @classmethod
    def _get_storage_plugin_configuration(cls, config, storage_plugin_name):
        """
        Load the configuration for a storage server plugin with the given name.

        :return dict[str, str]: The matching configuration.
        """
        try:
            config = config.items(
                "storageserver.plugins." + storage_plugin_name,
            )
        except NoSectionError:
            config = []
        return dict(config)



def _sequencer(config):
    """
    :returns: a 2-tuple consisting of a new announcement
        sequence-number and random nonce (int, bytes). Reads and
        re-writes configuration file "announcement-seqnum" (starting at 1
        if that file doesn't exist).
    """
    seqnum_s = config.get_config_from_file("announcement-seqnum")
    if not seqnum_s:
        seqnum_s = u"0"
    seqnum = int(seqnum_s.strip())
    seqnum += 1  # increment
    config.write_config_file("announcement-seqnum", "{}\n".format(seqnum))
    nonce = _make_secret().strip()
    return seqnum, nonce

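# Behavior sketch (illustrative): on a fresh node the seqnum file is absent,
# so the first call returns (1, <nonce>) and writes "1\n" to
# "announcement-seqnum"; the next call returns (2, <nonce>), and so on.
# The nonce is fresh random data on every call.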

def create_introducer_clients(config, main_tub, _introducer_factory=None):
    """
    Read, validate and parse any 'introducers.yaml' configuration.

    :param _introducer_factory: for testing; the class to instantiate instead
        of IntroducerClient

    :returns: a list of IntroducerClient instances
    """
    if _introducer_factory is None:
        _introducer_factory = IntroducerClient

    # we return this list
    introducer_clients = []

    introducers = config.get_introducer_configuration()

    for petname, (furl, cache_path) in list(introducers.items()):
        ic = _introducer_factory(
            main_tub,
            furl.encode("ascii"),
            config.nickname,
            str(allmydata.__full_version__),
            str(_Client.OLDEST_SUPPORTED_VERSION),
            partial(_sequencer, config),
            cache_path,
        )
        introducer_clients.append(ic)
    return introducer_clients


def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients, tor_provider: Optional[TorProvider]):
    """
    Create a StorageFarmBroker object, for use by Uploader/Downloader
    (and everybody else who wants to use storage servers)

    :param config: a _Config instance

    :param default_connection_handlers: default Foolscap handlers

    :param foolscap_connection_handlers: available/configured Foolscap
        handlers

    :param dict tub_options: how to configure our Tub

    :param list introducer_clients: IntroducerClient instances if
        we're connecting to any
    """
    storage_client_config = storage_client.StorageClientConfig.from_node_config(
        config,
    )
    # ensure that we can at least load all plugins that the
    # configuration mentions; doing this early (i.e. before creating
    # storage-clients themselves) allows us to exit in case of a
    # problem.
    storage_client_config.get_configured_storage_plugins()

    def tub_creator(handler_overrides=None, **kwargs):
        return node.create_tub(
            tub_options,
            default_connection_handlers,
            foolscap_connection_handlers,
            handler_overrides={} if handler_overrides is None else handler_overrides,
            **kwargs
        )

    # create the actual storage-broker
    sb = storage_client.StorageFarmBroker(
        permute_peers=True,
        tub_maker=tub_creator,
        node_config=config,
        storage_client_config=storage_client_config,
        default_connection_handlers=default_connection_handlers,
        tor_provider=tor_provider,
    )
    for ic in introducer_clients:
        sb.use_introducer(ic)
    return sb


def _register_reference(key, config, tub, referenceable):
    """
    Register a referenceable in a tub with a stable fURL.

    Stability is achieved by storing the fURL in the configuration the first
    time and then reading it back for future calls.

    :param bytes key: An identifier for this reference which can be used to
        identify its fURL in the configuration.

    :param _Config config: The configuration to use for fURL persistence.

    :param Tub tub: The tub in which to register the reference.

    :param Referenceable referenceable: The referenceable to register in the
        Tub.

    :return bytes: The fURL at which the object is registered.
    """
    persisted_furl = config.get_private_config(
        key,
        default=None,
    )
    name = None
    if persisted_furl is not None:
        _, _, name = decode_furl(persisted_furl)
    registered_furl = tub.registerReference(
        referenceable,
        name=name,
    )
    if persisted_furl is None:
        config.write_private_config(key, registered_furl)
    return registered_furl

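# Flow sketch (illustrative): the first call registers under a fresh random
# swissnum and persists the resulting fURL under private/<key>; later calls
# decode the persisted fURL and re-register under the same name, so clients
# holding the old fURL can still reach the object across restarts.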

@implementer(IAnnounceableStorageServer)
@attr.s
class AnnounceableStorageServer(object):
    announcement = attr.ib()
    storage_server = attr.ib()


def _add_to_announcement(information, announceable_storage_server):
    """
    Create a new ``AnnounceableStorageServer`` based on
    ``announceable_storage_server`` with ``information`` added to its
    ``announcement``.
    """
    updated_announcement = announceable_storage_server.announcement.copy()
    updated_announcement.update(information)
    return AnnounceableStorageServer(
        updated_announcement,
        announceable_storage_server.storage_server,
    )

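# Usage sketch (illustrative): _add_to_announcement is functional-style; it
# returns a new wrapper rather than mutating the input:
#
#     before = AnnounceableStorageServer({u"name": u"example"}, ss)
#     after = _add_to_announcement({u"storage-server-FURL": furl}, before)
#     # before.announcement is unchanged; after.announcement has both keys.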

def storage_enabled(config):
    """
    Is storage enabled according to the given configuration object?

    :param _Config config: The configuration to inspect.

    :return bool: ``True`` if storage is enabled, ``False`` otherwise.
    """
    return config.get_config("storage", "enabled", True, boolean=True)


def anonymous_storage_enabled(config):
    """
    Is anonymous access to storage enabled according to the given
    configuration object?

    :param _Config config: The configuration to inspect.

    :return bool: ``True`` if anonymous storage access is enabled, ``False`` otherwise.
    """
    return (
        storage_enabled(config) and
        config.get_config("storage", "anonymous", True, boolean=True)
    )

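# Configuration sketch (illustrative): both helpers read tahoe.cfg, e.g.
#
#     [storage]
#     enabled = true
#     anonymous = false
#
# gives storage_enabled() -> True and anonymous_storage_enabled() -> False;
# both options default to true when absent.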

@implementer(IStatsProducer)
class _Client(node.Node, pollmixin.PollMixin):
    """
    This class should be refactored; see
    https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3931
    """

    STOREDIR = 'storage'
    NODETYPE = "client"
    EXIT_TRIGGER_FILE = "exit_trigger"

    # This means that if a storage server treats me as though I were a
    # 1.0.0 storage client, it will work as they expect.
    OLDEST_SUPPORTED_VERSION = "1.0.0"

    # This is a dictionary of (needed, desired, total, max_segment_size). 'needed'
    # is the number of shares required to reconstruct a file. 'desired' means
    # that we will abort an upload unless we can allocate space for at least
    # this many. 'total' is the total number of shares created by encoding.
    # If everybody has room then this is how many we will upload.
    DEFAULT_ENCODING_PARAMETERS = {"k": 3,
                                   "happy": 7,
                                   "n": 10,
                                   "max_segment_size": DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE,
                                   }

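    # Worked example (illustrative): with the defaults k=3, n=10, a file is
    # erasure-coded into 10 shares, any 3 of which suffice to reconstruct it;
    # roughly, an upload only succeeds if the shares land on at least 7
    # distinct servers ("servers of happiness").
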
    def __init__(self, config, main_tub, i2p_provider, tor_provider, introducer_clients,
                 storage_farm_broker):
        """
        Use :func:`allmydata.client.create_client` to instantiate one of these.
        """
        node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider)

        self.started_timestamp = time.time()
        self.logSource = "Client"
        self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()

        self.introducer_clients = introducer_clients
        self.storage_broker = storage_farm_broker

        self.init_stats_provider()
        self.init_secrets()
        self.init_node_key()
        self._key_generator = KeyGenerator()
        key_gen_furl = config.get_config("client", "key_generator.furl", None)
        if key_gen_furl:
            log.msg("[client]key_generator.furl= is now ignored, see #2783")
        self.init_client()
        self.load_static_servers()
        self.helper = None
        if config.get_config("helper", "enabled", False, boolean=True):
            if not self._is_tub_listening():
                raise ValueError("config error: helper is enabled, but tub "
                                 "is not listening ('tub.port=' is empty)")
            self.init_helper()
        self.init_sftp_server()

        # If the node sees an exit_trigger file, it will poll every second to see
        # whether the file still exists, and what its mtime is. If the file does not
        # exist or has not been modified for a given timeout, the node will exit.
        exit_trigger_file = config.get_config_path(self.EXIT_TRIGGER_FILE)
        if os.path.exists(exit_trigger_file):
            age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
            self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
            exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
            exit_trigger.setServiceParent(self)

        # this needs to happen last, so it can use getServiceNamed() to
        # acquire references to StorageServer and other web-statusable things
        webport = config.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport)  # strports string

        # TODO this may be the wrong location for now? but as temporary measure
        # it allows us to get NURLs for testing in test_istorageserver.py. This
        # will eventually get fixed one way or another in
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3901. See also
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3931 for the bigger
        # picture issue.
        self.storage_nurls: Optional[set] = None

    def init_stats_provider(self):
        self.stats_provider = StatsProvider(self)
        self.stats_provider.setServiceParent(self)
        self.stats_provider.register_producer(self)

    def get_stats(self):
        return {'node.uptime': time.time() - self.started_timestamp}

    def init_secrets(self):
        # configs are always unicode
        def _unicode_make_secret():
            return str(_make_secret(), "ascii")
        lease_s = self.config.get_or_create_private_config(
            "secret", _unicode_make_secret).encode("utf-8")
        lease_secret = base32.a2b(lease_s)
        convergence_s = self.config.get_or_create_private_config(
            'convergence', _unicode_make_secret).encode("utf-8")
        self.convergence = base32.a2b(convergence_s)
        self._secret_holder = SecretHolder(lease_secret, self.convergence)

    def init_node_key(self):
        # we only create the key once. On all subsequent runs, we re-use the
        # existing key
        def _make_key():
            private_key, _ = ed25519.create_signing_keypair()
            # Config values are always unicode:
            return str(ed25519.string_from_signing_key(private_key) + b"\n", "utf-8")

        private_key_str = self.config.get_or_create_private_config(
            "node.privkey", _make_key).encode("utf-8")
        private_key, public_key = ed25519.signing_keypair_from_string(private_key_str)
        public_key_str = ed25519.string_from_verifying_key(public_key)
        self.config.write_config_file("node.pubkey", public_key_str + b"\n", "wb")
        self._node_private_key = private_key
        self._node_public_key = public_key

    def get_long_nodeid(self):
        # this matches what IServer.get_longname() says about us elsewhere
        vk_string = ed25519.string_from_verifying_key(self._node_public_key)
        return remove_prefix(vk_string, b"pub-")

    def get_long_tubid(self):
        return idlib.nodeid_b2a(self.nodeid)

    def get_web_service(self):
        """
        :return: a reference to our web server
        """
        return self.getServiceNamed("webish")

    def _init_permutation_seed(self, ss):
        seed = self.config.get_config_from_file("permutation-seed")
        if not seed:
            have_shares = ss.have_shares()
            if have_shares:
                # if the server has shares but not a recorded
                # permutation-seed, then it has been around since pre-#466
                # days, and the clients who uploaded those shares used our
                # TubID as a permutation-seed. We should keep using that same
                # seed to keep the shares in the same place in the permuted
                # ring, so those clients don't have to perform excessive
                # searches.
                seed = base32.b2a(self.nodeid)
            else:
                # otherwise, we're free to use the more natural seed of our
                # pubkey-based serverid
                vk_string = ed25519.string_from_verifying_key(self._node_public_key)
                vk_bytes = remove_prefix(vk_string, ed25519.PUBLIC_KEY_PREFIX)
                seed = base32.b2a(vk_bytes)
            self.config.write_config_file("permutation-seed", seed + b"\n", mode="wb")
        return seed.strip()

    def get_anonymous_storage_server(self):
        """
        Get the anonymous ``IStorageServer`` implementation for this node.

        Note this will return an object even if storage is disabled on this
        node (but the object will not be exposed, peers will not be able to
        access it, and storage will remain disabled).

        The one and only instance for this node is always returned.  It is
        created first if necessary.
        """
        try:
            ss = self.getServiceNamed(StorageServer.name)
        except KeyError:
            pass
        else:
            return ss

        readonly = self.config.get_config("storage", "readonly", False, boolean=True)

        config_storedir = self.get_config(
            "storage", "storage_dir", self.STOREDIR,
        )
        storedir = self.config.get_config_path(config_storedir)

        data = self.config.get_config("storage", "reserved_space", None)
        try:
            reserved = parse_abbreviated_size(data)
        except ValueError:
            log.msg("[storage]reserved_space= contains unparseable value %s"
                    % data)
            raise
        if reserved is None:
            reserved = 0
        discard = self.config.get_config("storage", "debug_discard", False,
                                         boolean=True)

        expire = self.config.get_config("storage", "expire.enabled", False, boolean=True)
        if expire:
            mode = self.config.get_config("storage", "expire.mode")  # require a mode
        else:
            mode = self.config.get_config("storage", "expire.mode", "age")

        o_l_d = self.config.get_config("storage", "expire.override_lease_duration", None)
        if o_l_d is not None:
            o_l_d = parse_duration(o_l_d)

        cutoff_date = None
        if mode == "cutoff-date":
            cutoff_date = self.config.get_config("storage", "expire.cutoff_date")
            cutoff_date = parse_date(cutoff_date)

        sharetypes = []
        if self.config.get_config("storage", "expire.immutable", True, boolean=True):
            sharetypes.append("immutable")
        if self.config.get_config("storage", "expire.mutable", True, boolean=True):
            sharetypes.append("mutable")
        expiration_sharetypes = tuple(sharetypes)

        ss = StorageServer(
            storedir, self.nodeid,
            reserved_space=reserved,
            discard_storage=discard,
            readonly_storage=readonly,
            stats_provider=self.stats_provider,
            expiration_enabled=expire,
            expiration_mode=mode,
            expiration_override_lease_duration=o_l_d,
            expiration_cutoff_date=cutoff_date,
            expiration_sharetypes=expiration_sharetypes,
        )
        ss.setServiceParent(self)
        return ss

    def init_storage(self, announceable_storage_servers):
        # should we run a storage server (and publish it for others to use)?
        if not storage_enabled(self.config):
            return
        if not self._is_tub_listening():
            raise ValueError("config error: storage is enabled, but tub "
                             "is not listening ('tub.port=' is empty)")

        ss = self.get_anonymous_storage_server()
        announcement = {
            "permutation-seed-base32": self._init_permutation_seed(ss),
        }

        if anonymous_storage_enabled(self.config):
            furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
            furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file)
            (_, _, swissnum) = decode_furl(furl)
            if hasattr(self.tub.negotiationClass, "add_storage_server"):
                nurls = self.tub.negotiationClass.add_storage_server(ss, swissnum.encode("ascii"))
                self.storage_nurls = nurls
                # There is code in e.g. storage_client.py that checks if an
                # announcement has changed. Since NURL order isn't meaningful,
                # we don't want a change in the order to count as a change, so we
                # send the NURLs as a set. CBOR supports sets, as does Foolscap.
                announcement[storage_client.ANONYMOUS_STORAGE_NURLS] = {n.to_text() for n in nurls}
            announcement["anonymous-storage-FURL"] = furl

        enabled_storage_servers = self._enable_storage_servers(
            announceable_storage_servers,
        )
        storage_options = list(
            storage_server.announcement
            for storage_server
            in enabled_storage_servers
        )
        plugins_announcement = {}
        if storage_options:
            # Only add the new key if there are any plugins enabled.
            plugins_announcement[u"storage-options"] = storage_options

        announcement.update(plugins_announcement)

        if self.config.get_config("storage", "grid_management", default=False, boolean=True):
            grid_manager_certificates = self.config.get_grid_manager_certificates()
            announcement[u"grid-manager-certificates"] = grid_manager_certificates

        # Note: certificates are not verified for validity here, but
        # that may be useful. See:
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3977

        for ic in self.introducer_clients:
            ic.publish("storage", announcement, self._node_private_key)

    def get_client_storage_plugin_web_resources(self):
        """
        Get all of the client-side ``IResource`` implementations provided by
        enabled storage plugins.

        :return dict[bytes, IResource provider]: The implementations.
        """
        return self.storage_broker.get_client_storage_plugin_web_resources(
            self.config,
        )

    def _enable_storage_servers(self, announceable_storage_servers):
        """
        Register and announce the given storage servers.
        """
        for announceable in announceable_storage_servers:
            yield self._enable_storage_server(announceable)

    def _enable_storage_server(self, announceable_storage_server):
        """
        Register a storage server.
        """
        config_key = "storage-plugin.{}.furl".format(
            # Oops, why don't I have a better handle on this value?
            announceable_storage_server.announcement[u"name"],
        )
        furl = _register_reference(
            config_key,
            self.config,
            self.tub,
            announceable_storage_server.storage_server,
        )
        announceable_storage_server = _add_to_announcement(
            {u"storage-server-FURL": furl},
            announceable_storage_server,
        )
        return announceable_storage_server

    def init_client(self):
        helper_furl = self.config.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None

        DEP = self.encoding_params
        DEP["k"] = int(self.config.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.config.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.config.get_config("client", "shares.happy", DEP["happy"]))
        # At the moment this is only used for testing, thus the janky config
        # attribute name.
        DEP["max_segment_size"] = int(self.config.get_config(
            "client",
            "shares._max_immutable_segment_size_for_testing",
            DEP["max_segment_size"])
        )

        # for the CLI to authenticate to local JSON endpoints
        self._create_auth_token()

        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        uploader = Uploader(
            helper_furl,
            self.stats_provider,
            self.history,
        )
        uploader.setServiceParent(self)
        self.init_blacklist()
        self.init_nodemaker()

    def get_auth_token(self):
        """
        This returns a local authentication token, which is just some
        random data in "api_auth_token" which must be echoed to API
        calls.
        """
        return self.config.get_private_config(
            'api_auth_token').encode("ascii")

    def _create_auth_token(self):
        """
        Creates new auth-token data written to 'private/api_auth_token'.

        This is intentionally re-created every time the node starts.
        """
        self.config.write_private_config(
            'api_auth_token',
            urlsafe_b64encode(os.urandom(32)) + b'\n',
        )
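
    # Token round-trip sketch (illustrative): _create_auth_token() writes 32
    # urlsafe-base64-encoded random bytes to private/api_auth_token at every
    # start, and get_auth_token() reads that same value back, so CLI tools
    # running on the node can prove local access by echoing it to the web API.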

    def get_storage_broker(self):
        return self.storage_broker

    def load_static_servers(self):
        """
        Load the servers.yaml file if it exists, and provide the static
        server data to the StorageFarmBroker.
        """
        fn = self.config.get_private_path("servers.yaml")
        servers_filepath = FilePath(fn)
        try:
            with servers_filepath.open() as f:
                servers_yaml = yamlutil.safe_load(f)
            static_servers = servers_yaml.get("storage", {})
            log.msg("found %d static servers in private/servers.yaml" %
                    len(static_servers))
            static_servers = {
                ensure_text(key): value for (key, value) in static_servers.items()
            }
            self.storage_broker.set_static_servers(static_servers)
        except EnvironmentError:
            pass

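    # Shape sketch (illustrative; the server ID and fURL are made up): a
    # private/servers.yaml file roughly follows
    #
    #     storage:
    #       v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia:
    #         ann:
    #           anonymous-storage-FURL: pb://[...]
    #           nickname: my-static-server
    #
    # i.e. a "storage" mapping from server IDs to announcement dictionaries.
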
    def init_blacklist(self):
        fn = self.config.get_config_path("access.blacklist")
        self.blacklist = Blacklist(fn)

    def init_nodemaker(self):
        default = self.config.get_config("client", "mutable.format", default="SDMF")
        if default.upper() == "MDMF":
            self.mutable_file_default = MDMF_VERSION
        else:
            self.mutable_file_default = SDMF_VERSION
        self.nodemaker = NodeMaker(self.storage_broker,
                                   self._secret_holder,
                                   self.get_history(),
                                   self.getServiceNamed("uploader"),
                                   self.terminator,
                                   self.get_encoding_parameters(),
                                   self.mutable_file_default,
                                   self._key_generator,
                                   self.blacklist)

    def get_history(self):
        return self.history

    def init_helper(self):
        self.helper = Helper(self.config.get_config_path("helper"),
                             self.storage_broker, self._secret_holder,
                             self.stats_provider, self.history)
        # TODO: this is confusing. BASEDIR/private/helper.furl is created by
        # the helper. BASEDIR/helper.furl is consumed by the client who wants
        # to use the helper. I like having the filename be the same, since
        # that makes 'cp' work smoothly, but the difference between config
        # inputs and generated outputs is hard to see.
        helper_furlfile = self.config.get_private_path("helper.furl").encode(get_filesystem_encoding())
        self.tub.registerReference(self.helper, furlFile=helper_furlfile)

    def _get_tempdir(self):
        """
        Determine the path to the directory where temporary files for this node
        should be written.

        :return bytes: The path which will exist and be a directory.
        """
        tempdir_config = self.config.get_config("node", "tempdir", "tmp")
        if isinstance(tempdir_config, bytes):
            tempdir_config = tempdir_config.decode('utf-8')
        tempdir = self.config.get_config_path(tempdir_config)
        if not os.path.exists(tempdir):
            fileutil.make_dirs(tempdir)
        return tempdir

    def init_web(self, webport):
        self.log("init_web(webport=%s)", args=(webport,))

        from allmydata.webish import WebishServer, anonymous_tempfile_factory
        nodeurl_path = self.config.get_config_path("node.url")
        staticdir_config = self.config.get_config("node", "web.static", "public_html")
        staticdir = self.config.get_config_path(staticdir_config)
        ws = WebishServer(
            self,
            webport,
            anonymous_tempfile_factory(self._get_tempdir()),
            nodeurl_path,
            staticdir,
        )
        ws.setServiceParent(self)

    def init_sftp_server(self):
        if self.config.get_config("sftpd", "enabled", False, boolean=True):
            accountfile = self.config.get_config("sftpd", "accounts.file", None)
            if accountfile:
                accountfile = self.config.get_config_path(accountfile)
            sftp_portstr = self.config.get_config("sftpd", "port", "tcp:8022")
            pubkey_file = self.config.get_config("sftpd", "host_pubkey_file")
            privkey_file = self.config.get_config("sftpd", "host_privkey_file")

            from allmydata.frontends import sftpd
            s = sftpd.SFTPServer(self, accountfile,
                                 sftp_portstr, pubkey_file, privkey_file)
            s.setServiceParent(self)

    def _check_exit_trigger(self, exit_trigger_file):
        if os.path.exists(exit_trigger_file):
            mtime = os.stat(exit_trigger_file)[stat.ST_MTIME]
            if mtime > time.time() - 120.0:
                return
            else:
                self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,))
        else:
            self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,))
        reactor.stop()

    def get_encoding_parameters(self):
        return self.encoding_params

    def introducer_connection_statuses(self):
        return [ic.connection_status() for ic in self.introducer_clients]

    def connected_to_introducer(self):
        return any([ic.connected_to_introducer() for ic in self.introducer_clients])

    def get_renewal_secret(self):  # this will go away
        return self._secret_holder.get_renewal_secret()

    def get_cancel_secret(self):
        return self._secret_holder.get_cancel_secret()

    def debug_wait_for_client_connections(self, num_clients):
        """Return a Deferred that fires (with None) when we have connections
        to the given number of peers. Useful for tests that set up a
        temporary test network and need to know when it is safe to proceed
        with an upload or download."""
        def _check():
            return len(self.storage_broker.get_connected_servers()) >= num_clients
        d = self.poll(_check, 0.5)
        d.addCallback(lambda res: None)
        return d


    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri, deep_immutable=deep_immutable, name=name)

    def create_dirnode(self, initial_children=None, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children, version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(
            self,
            contents: bytes | None = None,
            version: int | None = None,
            *,
            unique_keypair: tuple[rsa.PublicKey, rsa.PrivateKey] | None = None,
    ) -> MutableFileNode:
        """
        Create *and upload* a new mutable object.

        :param contents: If given, the initial contents for the new object.

        :param version: If given, the mutable file format for the new object
            (otherwise a format will be chosen automatically).

        :param unique_keypair: **Warning** This value independently determines
            the identity of the mutable object to create.  There cannot be two
            different mutable objects that share a keypair.  They will merge
            into one object (with undefined contents).

            It is common to pass a None value (or not pass a value) for this
            parameter.  In these cases, a new random keypair will be
            generated.

            If non-None, the given public/private keypair will be used for the
            new object.  The expected use-case is for implementing compliance
            tests.

        :return: A Deferred which will fire with a representation of the new
            mutable object after it has been uploaded.
        """
        return self.nodemaker.create_mutable_file(contents,
                                                  version=version,
                                                  keypair=unique_keypair)

    def upload(self, uploadable, reactor=None):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable, reactor=reactor)