| 1 | """ |
|---|
| 2 | Functionality related to operating a Tahoe-LAFS node (client _or_ server). |
|---|
| 3 | """ |
|---|
| 4 | from __future__ import annotations |
|---|
| 5 | |
|---|
| 6 | import os |
|---|
| 7 | import stat |
|---|
| 8 | import time |
|---|
| 9 | import weakref |
|---|
| 10 | from typing import Optional, Iterable |
|---|
| 11 | from base64 import urlsafe_b64encode |
|---|
| 12 | from functools import partial |
|---|
| 13 | from configparser import NoSectionError |
|---|
| 14 | |
|---|
| 15 | from six import ensure_text |
|---|
| 16 | from foolscap.furl import ( |
|---|
| 17 | decode_furl, |
|---|
| 18 | ) |
|---|
| 19 | |
|---|
| 20 | import attr |
|---|
| 21 | from zope.interface import implementer |
|---|
| 22 | |
|---|
| 23 | from twisted.plugin import ( |
|---|
| 24 | getPlugins, |
|---|
| 25 | ) |
|---|
| 26 | from twisted.internet import reactor, defer |
|---|
| 27 | from twisted.application import service |
|---|
| 28 | from twisted.application.internet import TimerService |
|---|
| 29 | from twisted.python.filepath import FilePath |
|---|
| 30 | |
|---|
| 31 | import allmydata |
|---|
| 32 | from allmydata import node |
|---|
| 33 | from allmydata.crypto import rsa, ed25519 |
|---|
| 34 | from allmydata.crypto.util import remove_prefix |
|---|
| 35 | from allmydata.dirnode import DirectoryNode |
|---|
| 36 | from allmydata.storage.server import StorageServer, FoolscapStorageServer |
|---|
| 37 | from allmydata import storage_client |
|---|
| 38 | from allmydata.immutable.upload import Uploader |
|---|
| 39 | from allmydata.immutable.offloaded import Helper |
|---|
| 40 | from allmydata.mutable.filenode import MutableFileNode |
|---|
| 41 | from allmydata.introducer.client import IntroducerClient |
|---|
| 42 | from allmydata.util import ( |
|---|
| 43 | hashutil, base32, pollmixin, log, idlib, |
|---|
| 44 | yamlutil, configutil, |
|---|
| 45 | fileutil, |
|---|
| 46 | ) |
|---|
| 47 | from allmydata.util.encodingutil import get_filesystem_encoding |
|---|
| 48 | from allmydata.util.abbreviate import parse_abbreviated_size |
|---|
| 49 | from allmydata.util.time_format import parse_duration, parse_date |
|---|
| 50 | from allmydata.util.i2p_provider import create as create_i2p_provider |
|---|
| 51 | from allmydata.util.tor_provider import create as create_tor_provider, _Provider as TorProvider |
|---|
| 52 | from allmydata.util.cputhreadpool import defer_to_thread |
|---|
| 53 | from allmydata.util.deferredutil import async_to_deferred |
|---|
| 54 | from allmydata.stats import StatsProvider |
|---|
| 55 | from allmydata.history import History |
|---|
| 56 | from allmydata.interfaces import ( |
|---|
| 57 | IStatsProducer, |
|---|
| 58 | SDMF_VERSION, |
|---|
| 59 | MDMF_VERSION, |
|---|
| 60 | DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE, |
|---|
| 61 | IFoolscapStoragePlugin, |
|---|
| 62 | IAnnounceableStorageServer, |
|---|
| 63 | ) |
|---|
| 64 | from allmydata.nodemaker import NodeMaker |
|---|
| 65 | from allmydata.blacklist import Blacklist |
|---|
| 66 | from allmydata.node import _Config |
|---|
| 67 | |
|---|
# Binary (power-of-two) size units; each one is 1024 times the previous.
KiB = 2 ** 10
MiB = KiB * 1024
GiB = MiB * 1024
TiB = GiB * 1024
PiB = TiB * 1024
|---|
| 73 | |
|---|
| 74 | def _is_valid_section(section_name): |
|---|
| 75 | """ |
|---|
| 76 | Check for valid dynamic configuration section names. |
|---|
| 77 | |
|---|
| 78 | Currently considers all possible storage server plugin sections valid. |
|---|
| 79 | """ |
|---|
| 80 | return ( |
|---|
| 81 | section_name.startswith("storageserver.plugins.") or |
|---|
| 82 | section_name.startswith("storageclient.plugins.") or |
|---|
| 83 | section_name in ("grid_managers", "grid_manager_certificates") |
|---|
| 84 | ) |
|---|
| 85 | |
|---|
| 86 | |
|---|
# Validation rules for the client-specific parts of the node configuration.
# ``static_valid_sections`` maps each fixed section name to the option names
# accepted inside it; sections with dynamic names are accepted through
# ``_is_valid_section``.
_client_config = configutil.ValidConfiguration(
    static_valid_sections={
        "client": (
            "helper.furl",
            "introducer.furl",
            "key_generator.furl",
            "mutable.format",
            "peers.preferred",
            "shares.happy",
            "shares.needed",
            "shares.total",
            "shares._max_immutable_segment_size_for_testing",
            "storage.plugins",
            "force_foolscap",
        ),
        "storage": (
            "debug_discard",
            "enabled",
            "anonymous",
            "expire.cutoff_date",
            "expire.enabled",
            "expire.immutable",
            # "expire.mode" used to be listed twice; one entry is enough.
            "expire.mode",
            "expire.mutable",
            "expire.override_lease_duration",
            "readonly",
            "reserved_space",
            "storage_dir",
            "plugins",
            "grid_management",
            "force_foolscap",
        ),
        "sftpd": (
            "accounts.file",
            "enabled",
            "host_privkey_file",
            "host_pubkey_file",
            "port",
        ),
        "helper": (
            "enabled",
        ),
    },
    is_valid_section=_is_valid_section,
    # Anything in a valid section is a valid item, for now.
    is_valid_item=lambda section, ignored: _is_valid_section(section),
)
|---|
| 135 | |
|---|
| 136 | |
|---|
def _valid_config():
    """
    Build the validation rules used for a client node's configuration.

    :returns: the result of ``update``-ing the rules from
        ``node._common_valid_config()`` with ``_client_config``.
    """
    common_rules = node._common_valid_config()
    combined_rules = common_rules.update(_client_config)
    return combined_rules
|---|
| 140 | |
|---|
# README text for new node directories; ``create_client`` passes it to
# ``node.create_node_dir`` together with the node's base directory.
CLIENT_README = u"""
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.rst' documentation file for details.
"""
|---|
| 148 | |
|---|
| 149 | |
|---|
| 150 | |
|---|
def _make_secret():
    """
    Generate a fresh random secret.

    :returns: the base32 encoding of ``hashutil.CRYPTO_VAL_SIZE`` bytes read
        from ``os.urandom``, with a trailing newline appended.
    """
    random_bytes = os.urandom(hashutil.CRYPTO_VAL_SIZE)
    encoded = base32.b2a(random_bytes)
    return encoded + b"\n"
|---|
| 157 | |
|---|
| 158 | |
|---|
class SecretHolder:
    """
    Keep a lease secret and a convergence secret together.

    The renewal and cancel secrets are not stored; they are derived from the
    lease secret each time they are requested.
    """

    def __init__(self, lease_secret, convergence_secret):
        self._convergence_secret = convergence_secret
        self._lease_secret = lease_secret

    def get_renewal_secret(self):
        """Return ``hashutil.my_renewal_secret_hash`` of the lease secret."""
        renewal_secret = hashutil.my_renewal_secret_hash(self._lease_secret)
        return renewal_secret

    def get_cancel_secret(self):
        """Return ``hashutil.my_cancel_secret_hash`` of the lease secret."""
        cancel_secret = hashutil.my_cancel_secret_hash(self._lease_secret)
        return cancel_secret

    def get_convergence_secret(self):
        """Return the convergence secret exactly as it was passed in."""
        return self._convergence_secret
|---|
| 172 | |
|---|
class KeyGenerator:
    """
    I produce RSA keypairs for mutable files, one keypair per call to
    generate().
    """

    @async_to_deferred
    async def generate(self) -> tuple[rsa.PublicKey, rsa.PrivateKey]:
        """
        Create a new 2048-bit RSA keypair.

        The key creation itself is handed to ``defer_to_thread``.

        :returns: a Deferred that fires with a (verifyingkey, signingkey)
            pair.
        """
        signing_key, verifying_key = await defer_to_thread(
            rsa.create_signing_keypair,
            2048,
        )
        return verifying_key, signing_key
|---|
| 188 | |
|---|
| 189 | |
|---|
class Terminator(service.Service):
    """
    A service which calls ``stop()`` on every registered client when it is
    itself stopped.

    Clients are held as keys of a ``weakref.WeakKeyDictionary``, so being
    registered here does not keep a client alive.
    """

    def __init__(self):
        self._clients = weakref.WeakKeyDictionary()

    def register(self, c):
        """Remember ``c`` so that it is stopped along with this service."""
        self._clients[c] = None

    def stopService(self):
        """
        Stop each client that is still registered, then return the result of
        ``service.Service.stopService``.
        """
        for registered in self._clients:
            registered.stop()
        return service.Service.stopService(self)
|---|
| 199 | |
|---|
| 200 | |
|---|
def read_config(basedir, portnumfile, generated_files: Iterable=()):
    """
    Load and validate the configuration of a client-style Node.

    This is :method:`allmydata.node.read_config` with the client validation
    rules supplied; see that function for the meaning of the parameters.

    :returns: :class:`allmydata.node._Config` instance
    """
    client_rules = _valid_config()
    return node.read_config(
        basedir,
        portnumfile,
        generated_files=generated_files,
        _valid_config=client_rules,
    )
|---|
| 214 | |
|---|
| 215 | |
|---|
# Client flavour of ``node.config_from_string``: the client validation rules
# are computed once, when this module is imported, and pre-bound as the
# ``_valid_config`` keyword argument.
config_from_string = partial(node.config_from_string, _valid_config=_valid_config())
|---|
| 220 | |
|---|
| 221 | |
|---|
def create_client(basedir=u".", _client_factory=None):
    """
    Create a new client instance (a subclass of Node) for a node directory.

    The node directory is created if necessary, its configuration is read
    and the client is then built from that configuration. Any exception
    raised synchronously along the way is reported through the returned
    Deferred instead of being raised to the caller.

    :param unicode basedir: the node directory (which may not exist yet)

    :param _client_factory: (for testing) a callable that returns an
        instance of :class:`allmydata.node.Node` (or a subclass). By default
        this is :class:`allmydata.client._Client`

    :returns: Deferred yielding an instance of :class:`allmydata.client._Client`
    """
    try:
        node.create_node_dir(basedir, CLIENT_README)
        client_config = read_config(basedir, u"client.port")
        # The following call is async: it hands back the Deferred we return.
        pending_client = create_client_from_config(
            client_config,
            _client_factory=_client_factory,
        )
    except Exception:
        # With no argument, defer.fail() wraps the exception being handled.
        return defer.fail()
    return pending_client
|---|
| 244 | |
|---|
| 245 | |
|---|
@defer.inlineCallbacks
def create_client_from_config(config, _client_factory=None, _introducer_factory=None):
    """
    Build a client instance (a subclass of Node) from an already-loaded
    configuration. Most code should probably use `create_client` instead.

    :param config: configuration instance (from read_config()) which
        encapsulates everything in the "node directory".

    :param _client_factory: for testing; the class to instantiate
        instead of _Client

    :param _introducer_factory: for testing; the class to instantiate instead
        of IntroducerClient

    :returns: Deferred yielding a _Client instance
    """
    client_class = _Client if _client_factory is None else _client_factory

    i2p_provider = create_i2p_provider(reactor, config)
    tor_provider = create_tor_provider(reactor, config)
    default_connection_handlers, foolscap_connection_handlers = (
        node.create_connection_handlers(config, i2p_provider, tor_provider)
    )
    tub_options = node.create_tub_options(config)

    main_tub = node.create_main_tub(
        config,
        tub_options,
        default_connection_handlers,
        foolscap_connection_handlers,
        i2p_provider,
        tor_provider,
    )

    introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
    storage_broker = create_storage_farm_broker(
        config,
        default_connection_handlers,
        foolscap_connection_handlers,
        tub_options,
        introducer_clients,
        tor_provider,
    )

    client = client_class(
        config,
        main_tub,
        i2p_provider,
        tor_provider,
        introducer_clients,
        storage_broker,
    )

    # Storage is initialized separately, after the client exists. This is
    # necessary because the storage plugins are handed a reference into the
    # client so that they can initialize themselves (specifically, they may
    # want the anonymous IStorageServer implementation so they don't have to
    # duplicate all of its basic storage functionality). A better way to do
    # this, eventually, may be to create that implementation first and then
    # pass it in to both storage plugin creation and the client factory;
    # that would avoid letting a partially initialized client object escape
    # the client factory and would remove the circular dependency between
    # these objects.
    storage_plugins = yield _StoragePlugins.from_config(
        client.get_anonymous_storage_server,
        config,
    )
    client.init_storage(storage_plugins.announceable_storage_servers)

    # Attach the remaining services to the client: providers first, then
    # the introducer clients, then the storage broker.
    for provider in (i2p_provider, tor_provider):
        provider.setServiceParent(client)
    for introducer_client in introducer_clients:
        introducer_client.setServiceParent(client)
    storage_broker.setServiceParent(client)
    defer.returnValue(client)
|---|
| 314 | |
|---|
| 315 | |
|---|
@attr.s
class _StoragePlugins:
    """
    Functionality related to getting storage plugins set up and ready for use.

    :ivar list[IAnnounceableStorageServer] announceable_storage_servers: The
        announceable storage servers that should be used according to node
        configuration.
    """
    announceable_storage_servers = attr.ib()

    @classmethod
    @defer.inlineCallbacks
    def from_config(cls, get_anonymous_storage_server, config):
        """
        Load and configure the storage plugins enabled in ``config``.

        :param get_anonymous_storage_server: A no-argument callable which
            returns the node's anonymous ``IStorageServer`` implementation.

        :param _Config config: The node's configuration.

        :raise configutil.UnknownConfigError: If a plugin name is enabled in
            the configuration but no plugin with that name is found.

        :return: A ``Deferred`` that fires with a ``_StoragePlugins``
            initialized from the given configuration.
        """
        enabled_names = cls._get_enabled_storage_plugin_names(config)
        plugins = list(cls._collect_storage_plugins(enabled_names))
        found_names = {plugin.name for plugin in plugins}
        unknown_plugin_names = enabled_names - found_names
        if unknown_plugin_names:
            message = "Storage plugins {} are enabled but not known on this system.".format(
                unknown_plugin_names,
            )
            raise configutil.UnknownConfigError(message)
        servers = yield cls._create_plugin_storage_servers(
            get_anonymous_storage_server,
            config,
            plugins,
        )
        defer.returnValue(cls(servers))

    @classmethod
    def _get_enabled_storage_plugin_names(cls, config):
        """
        Get the names of storage plugins that are enabled in the configuration.

        The names come from the comma-separated ``plugins`` item of the
        ``storage`` section; empty names are dropped.

        :return set: The enabled plugin names.
        """
        configured = config.get_config("storage", "plugins", "")
        names = set(configured.split(u","))
        names.discard(u"")
        return names

    @classmethod
    def _collect_storage_plugins(cls, storage_plugin_names):
        """
        Get the storage plugins with names matching those given.

        :return list: Every ``IFoolscapStoragePlugin`` plugin whose ``name``
            is in ``storage_plugin_names``.
        """
        return [
            candidate
            for candidate in getPlugins(IFoolscapStoragePlugin)
            if candidate.name in storage_plugin_names
        ]

    @classmethod
    def _create_plugin_storage_servers(cls, get_anonymous_storage_server, config, plugins):
        """
        Cause each storage plugin to instantiate its storage server and return
        them all.

        Each result is passed through ``_add_to_announcement`` so that the
        plugin's name is added to its announcement.

        :return: A ``Deferred`` that fires with storage servers instantiated
            by all of the given storage server plugins.
        """
        pending = []
        # The order is fairly arbitrary and it is not meant to convey
        # anything but providing *some* stable ordering makes the data a
        # little easier to deal with (mainly in tests and when manually
        # inspecting it).
        for plugin in sorted(plugins, key=lambda p: p.name):
            plugin_config = cls._get_storage_plugin_configuration(config, plugin.name)
            server_d = plugin.get_storage_server(
                plugin_config,
                get_anonymous_storage_server,
            ).addCallback(
                partial(_add_to_announcement, {u"name": plugin.name}),
            )
            pending.append(server_d)
        return defer.gatherResults(pending)

    @classmethod
    def _get_storage_plugin_configuration(cls, config, storage_plugin_name):
        """
        Load the configuration for a storage server plugin with the given name.

        The items are read from the section named ``storageserver.plugins.``
        followed by the plugin name.

        :return dict: The matching configuration; empty if the section does
            not exist.
        """
        section_name = "storageserver.plugins." + storage_plugin_name
        try:
            section_items = config.items(section_name)
        except NoSectionError:
            section_items = []
        return dict(section_items)
|---|
| 425 | |
|---|
| 426 | |
|---|
| 427 | |
|---|
def _sequencer(config):
    """
    Advance the persisted announcement sequence number and pick a new nonce.

    The previous number is read from the configuration file
    "announcement-seqnum" (treated as 0 when nothing is read back, so the
    first number handed out is 1) and the incremented number is written back
    to the same file.

    :returns: a 2-tuple ``(seqnum, nonce)``: the new sequence number as an
        int, and the stripped result of ``_make_secret()``.
    """
    stored = config.get_config_from_file("announcement-seqnum") or u"0"
    seqnum = int(stored.strip()) + 1
    config.write_config_file("announcement-seqnum", "{}\n".format(seqnum))
    return seqnum, _make_secret().strip()
|---|
| 443 | |
|---|
| 444 | |
|---|
def create_introducer_clients(config, main_tub, _introducer_factory=None):
    """
    Create one introducer client for each introducer that
    ``config.get_introducer_configuration()`` reports.

    :param config: the node configuration; it supplies the introducer
        configuration and the node's nickname.

    :param main_tub: passed to every introducer client as its first argument.

    :param _introducer_factory: for testing; the class to instantiate instead
        of IntroducerClient

    :returns: a list of IntroducerClient instances
    """
    factory = IntroducerClient if _introducer_factory is None else _introducer_factory
    configured = config.get_introducer_configuration()

    return [
        factory(
            main_tub,
            furl.encode("ascii"),
            config.nickname,
            str(allmydata.__full_version__),
            str(_Client.OLDEST_SUPPORTED_VERSION),
            partial(_sequencer, config),
            cache_path,
        )
        # Iterate over a snapshot of the items; the petname is not used.
        for _petname, (furl, cache_path) in list(configured.items())
    ]
|---|
| 474 | |
|---|
| 475 | |
|---|
def create_storage_farm_broker(config: _Config, default_connection_handlers, foolscap_connection_handlers, tub_options, introducer_clients, tor_provider: Optional[TorProvider]):
    """
    Create a StorageFarmBroker object, for use by Uploader/Downloader
    (and everybody else who wants to use storage servers)

    :param config: a _Config instance

    :param default_connection_handlers: default Foolscap handlers

    :param foolscap_connection_handlers: available/configured Foolscap
        handlers

    :param dict tub_options: how to configure our Tub

    :param list introducer_clients: IntroducerClient instances if
        we're connecting to any

    :param tor_provider: passed through to the StorageFarmBroker; may be
        ``None``.

    :returns: the new StorageFarmBroker, already told to use each of the
        given introducer clients.
    """
    storage_client_config = storage_client.StorageClientConfig.from_node_config(config)
    # Make sure every plugin mentioned by the configuration can at least be
    # loaded. Doing this early (i.e. before any storage-clients are created)
    # lets us exit in case of a problem.
    storage_client_config.get_configured_storage_plugins()

    def tub_creator(handler_overrides=None, **kwargs):
        # A missing override mapping is replaced by a fresh empty dict.
        if handler_overrides is None:
            handler_overrides = {}
        return node.create_tub(
            tub_options,
            default_connection_handlers,
            foolscap_connection_handlers,
            handler_overrides=handler_overrides,
            **kwargs
        )

    # create the actual storage-broker
    broker = storage_client.StorageFarmBroker(
        permute_peers=True,
        tub_maker=tub_creator,
        node_config=config,
        storage_client_config=storage_client_config,
        default_connection_handlers=default_connection_handlers,
        tor_provider=tor_provider,
    )
    for introducer_client in introducer_clients:
        broker.use_introducer(introducer_client)
    return broker
|---|
| 523 | |
|---|
| 524 | |
|---|
| 525 | def _register_reference(key, config, tub, referenceable): |
|---|
| 526 | """ |
|---|
| 527 | Register a referenceable in a tub with a stable fURL. |
|---|
| 528 | |
|---|
| 529 | Stability is achieved by storing the fURL in the configuration the first |
|---|
| 530 | time and then reading it back on for future calls. |
|---|
| 531 | |
|---|
| 532 | :param bytes key: An identifier for this reference which can be used to |
|---|
| 533 | identify its fURL in the configuration. |
|---|
| 534 | |
|---|
| 535 | :param _Config config: The configuration to use for fURL persistence. |
|---|
| 536 | |
|---|
| 537 | :param Tub tub: The tub in which to register the reference. |
|---|
| 538 | |
|---|
| 539 | :param Referenceable referenceable: The referenceable to register in the |
|---|
| 540 | Tub. |
|---|
| 541 | |
|---|
| 542 | :return bytes: The fURL at which the object is registered. |
|---|
| 543 | """ |
|---|
| 544 | persisted_furl = config.get_private_config( |
|---|
| 545 | key, |
|---|
| 546 | default=None, |
|---|
| 547 | ) |
|---|
| 548 | name = None |
|---|
| 549 | if persisted_furl is not None: |
|---|
| 550 | _, _, name = decode_furl(persisted_furl) |
|---|
| 551 | registered_furl = tub.registerReference( |
|---|
| 552 | referenceable, |
|---|
| 553 | name=name, |
|---|
| 554 | ) |
|---|
| 555 | if persisted_furl is None: |
|---|
| 556 | config.write_private_config(key, registered_furl) |
|---|
| 557 | return registered_furl |
|---|
| 558 | |
|---|
| 559 | |
|---|
@implementer(IAnnounceableStorageServer)
@attr.s
class AnnounceableStorageServer:
    """
    A storage server paired with the announcement that goes with it.
    """
    # The announcement; ``_add_to_announcement`` calls ``copy()`` and
    # ``update()`` on it, so a dict-like object is expected.
    announcement = attr.ib()
    # The storage server object the announcement belongs to.
    storage_server = attr.ib()
|---|
| 565 | |
|---|
| 566 | |
|---|
| 567 | |
|---|
def _add_to_announcement(information, announceable_storage_server):
    """
    Build a new ``AnnounceableStorageServer`` whose ``announcement`` is a
    copy of the one on ``announceable_storage_server`` updated with
    ``information``.

    The ``storage_server`` is carried over unchanged and the given
    ``announceable_storage_server`` itself is not modified.
    """
    merged = announceable_storage_server.announcement.copy()
    merged.update(information)
    server = announceable_storage_server.storage_server
    return AnnounceableStorageServer(merged, server)
|---|
| 580 | |
|---|
| 581 | |
|---|
def storage_enabled(config):
    """
    Report whether the given configuration object has storage enabled.

    The ``enabled`` item of the ``storage`` section is read as a boolean and
    defaults to ``True``.

    :param _Config config: The configuration to inspect.

    :return bool: ``True`` if storage is enabled, ``False`` otherwise.
    """
    enabled = config.get_config("storage", "enabled", True, boolean=True)
    return enabled
|---|
| 591 | |
|---|
| 592 | |
|---|
def anonymous_storage_enabled(config):
    """
    Report whether the given configuration object allows anonymous access
    to storage.

    Anonymous access requires storage itself to be enabled. When it is, the
    ``anonymous`` item of the ``storage`` section is read as a boolean; it
    defaults to ``True``.

    :param _Config config: The configuration to inspect.

    :return bool: ``True`` if storage is enabled and anonymous access to it
        is enabled, ``False`` otherwise.
    """
    enabled = storage_enabled(config)
    if not enabled:
        # Storage is off altogether, so the "anonymous" item is not read.
        return enabled
    return config.get_config("storage", "anonymous", True, boolean=True)
|---|
| 606 | |
|---|
| 607 | |
|---|
| 608 | @implementer(IStatsProducer) |
|---|
| 609 | class _Client(node.Node, pollmixin.PollMixin): |
|---|
| 610 | """ |
|---|
| 611 | This class should be refactored; see |
|---|
| 612 | https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3931 |
|---|
| 613 | """ |
|---|
| 614 | |
|---|
| 615 | STOREDIR = 'storage' |
|---|
| 616 | NODETYPE = "client" |
|---|
| 617 | EXIT_TRIGGER_FILE = "exit_trigger" |
|---|
| 618 | |
|---|
| 619 | # This means that if a storage server treats me as though I were a |
|---|
| 620 | # 1.0.0 storage client, it will work as they expect. |
|---|
| 621 | OLDEST_SUPPORTED_VERSION = "1.0.0" |
|---|
| 622 | |
|---|
| 623 | # This is a dictionary of (needed, desired, total, max_segment_size). 'needed' |
|---|
| 624 | # is the number of shares required to reconstruct a file. 'desired' means |
|---|
| 625 | # that we will abort an upload unless we can allocate space for at least |
|---|
| 626 | # this many. 'total' is the total number of shares created by encoding. |
|---|
| 627 | # If everybody has room then this is how many we will upload. |
|---|
| 628 | DEFAULT_ENCODING_PARAMETERS = {"k": 3, |
|---|
| 629 | "happy": 7, |
|---|
| 630 | "n": 10, |
|---|
| 631 | "max_segment_size": DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE, |
|---|
| 632 | } |
|---|
| 633 | |
|---|
def __init__(self, config, main_tub, i2p_provider, tor_provider, introducer_clients,
             storage_farm_broker):
    """
    Use :func:`allmydata.client.create_client` to instantiate one of these.

    Initializes the ``node.Node`` base class and then runs the client's
    own initialization steps in a fixed order.

    :param config: the node configuration; read here through
        ``get_config`` and ``get_config_path``.
    :param main_tub: passed through to ``node.Node.__init__``.
    :param i2p_provider: passed through to ``node.Node.__init__``.
    :param tor_provider: passed through to ``node.Node.__init__``.
    :param introducer_clients: stored as ``self.introducer_clients``.
    :param storage_farm_broker: stored as ``self.storage_broker``.

    :raises ValueError: if the helper is enabled in the configuration but
        ``self._is_tub_listening()`` is false.
    """
    node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider)

    # Reference point for the uptime reported by get_stats().
    self.started_timestamp = time.time()
    self.logSource = "Client"
    # Per-instance copy, so the class-level defaults are never mutated.
    self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy()

    self.introducer_clients = introducer_clients
    self.storage_broker = storage_farm_broker

    self.init_stats_provider()
    self.init_secrets()
    self.init_node_key()
    self._key_generator = KeyGenerator()
    # The option is read only so that its presence can be logged; the value
    # is not otherwise used here.
    key_gen_furl = config.get_config("client", "key_generator.furl", None)
    if key_gen_furl:
        log.msg("[client]key_generator.furl= is now ignored, see #2783")
    self.init_client()
    self.load_static_servers()
    self.helper = None
    if config.get_config("helper", "enabled", False, boolean=True):
        if not self._is_tub_listening():
            raise ValueError("config error: helper is enabled, but tub "
                             "is not listening ('tub.port=' is empty)")
        self.init_helper()
    self.init_sftp_server()

    # If the node sees an exit_trigger file, it will poll every second to see
    # whether the file still exists, and what its mtime is. If the file does not
    # exist or has not been modified for a given timeout, the node will exit.
    exit_trigger_file = config.get_config_path(self.EXIT_TRIGGER_FILE)
    if os.path.exists(exit_trigger_file):
        age = time.time() - os.stat(exit_trigger_file)[stat.ST_MTIME]
        self.log("%s file noticed (%ds old), starting timer" % (self.EXIT_TRIGGER_FILE, age))
        # Run self._check_exit_trigger(exit_trigger_file) once per second.
        exit_trigger = TimerService(1.0, self._check_exit_trigger, exit_trigger_file)
        exit_trigger.setServiceParent(self)

    # this needs to happen last, so it can use getServiceNamed() to
    # acquire references to StorageServer and other web-statusable things
    webport = config.get_config("node", "web.port", None)
    if webport:
        self.init_web(webport) # strports string

    # TODO this may be the wrong location for now? but as temporary measure
    # it allows us to get NURLs for testing in test_istorageserver.py. This
    # will eventually get fixed one way or another in
    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3901. See also
    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3931 for the bigger
    # picture issue.
    self.storage_nurls : Optional[set] = None
|---|
| 688 | |
|---|
def init_stats_provider(self):
    """
    Create this node's ``StatsProvider``, attach it as a child service and
    register the node itself as one of its producers.
    """
    provider = StatsProvider(self)
    self.stats_provider = provider
    provider.setServiceParent(self)
    provider.register_producer(self)
|---|
| 693 | |
|---|
def get_stats(self):
    """
    Report this node's statistics.

    :return dict: a single entry, ``node.uptime``, holding the number of
        seconds elapsed since ``self.started_timestamp``.
    """
    uptime = time.time() - self.started_timestamp
    return {'node.uptime': uptime}
|---|
| 696 | |
|---|
def init_secrets(self):
    """
    Load the lease and convergence secrets from the private configuration,
    creating them with ``_make_secret`` where they do not exist yet.

    The decoded convergence secret is stored as ``self.convergence`` and
    both secrets are wrapped in ``self._secret_holder``.
    """
    def _unicode_make_secret():
        # configs are always unicode
        return str(_make_secret(), "ascii")

    def _load_secret(name):
        # Stored as base32 text; hand back the decoded value.
        stored = self.config.get_or_create_private_config(name, _unicode_make_secret)
        return base32.a2b(stored.encode("utf-8"))

    lease_secret = _load_secret("secret")
    self.convergence = _load_secret('convergence')
    self._secret_holder = SecretHolder(lease_secret, self.convergence)
|---|
| 708 | |
|---|
def init_node_key(self):
    """
    Load this node's ed25519 signing key, creating it on the first run.

    The private key lives in the private config item "node.privkey"; the
    matching public key is written to the config file "node.pubkey" on
    every call. Both keys are kept on the instance.
    """
    # The key is only created once. On all subsequent runs the existing
    # key is re-used.
    def _new_private_key_text():
        signing_key, _ = ed25519.create_signing_keypair()
        serialized = ed25519.string_from_signing_key(signing_key) + b"\n"
        # Config values are always unicode:
        return str(serialized, "utf-8")

    stored_key = self.config.get_or_create_private_config(
        "node.privkey",
        _new_private_key_text,
    )
    private_key, public_key = ed25519.signing_keypair_from_string(
        stored_key.encode("utf-8"),
    )
    self.config.write_config_file(
        "node.pubkey",
        ed25519.string_from_verifying_key(public_key) + b"\n",
        "wb",
    )
    self._node_private_key = private_key
    self._node_public_key = public_key
|---|
| 724 | |
|---|
def get_long_nodeid(self):
    """
    :return: the serialized form of this node's verifying key with the
        ``pub-`` prefix removed.
    """
    # this matches what IServer.get_longname() says about us elsewhere
    serialized = ed25519.string_from_verifying_key(self._node_public_key)
    return remove_prefix(serialized, b"pub-")
|---|
| 729 | |
|---|
def get_long_tubid(self):
    """
    :return: ``self.nodeid`` as rendered by ``idlib.nodeid_b2a``.
    """
    tubid = idlib.nodeid_b2a(self.nodeid)
    return tubid
|---|
| 732 | |
|---|
def get_web_service(self):
    """
    Look up the child service named ``"webish"``.

    :return: a reference to our web server
    """
    webish = self.getServiceNamed("webish")
    return webish
|---|
| 738 | |
|---|
| 739 | def _init_permutation_seed(self, ss): |
|---|
| 740 | seed = self.config.get_config_from_file("permutation-seed") |
|---|
| 741 | if not seed: |
|---|
| 742 | have_shares = ss.have_shares() |
|---|
| 743 | if have_shares: |
|---|
| 744 | # if the server has shares but not a recorded |
|---|
| 745 | # permutation-seed, then it has been around since pre-#466 |
|---|
| 746 | # days, and the clients who uploaded those shares used our |
|---|
| 747 | # TubID as a permutation-seed. We should keep using that same |
|---|
| 748 | # seed to keep the shares in the same place in the permuted |
|---|
| 749 | # ring, so those clients don't have to perform excessive |
|---|
| 750 | # searches. |
|---|
| 751 | seed = base32.b2a(self.nodeid) |
|---|
| 752 | else: |
|---|
| 753 | # otherwise, we're free to use the more natural seed of our |
|---|
| 754 | # pubkey-based serverid |
|---|
| 755 | vk_string = ed25519.string_from_verifying_key(self._node_public_key) |
|---|
| 756 | vk_bytes = remove_prefix(vk_string, ed25519.PUBLIC_KEY_PREFIX) |
|---|
| 757 | seed = base32.b2a(vk_bytes) |
|---|
| 758 | self.config.write_config_file("permutation-seed", seed+b"\n", mode="wb") |
|---|
| 759 | return seed.strip() |
|---|
| 760 | |
|---|
def get_anonymous_storage_server(self):
    """
    Get the anonymous ``IStorageServer`` implementation for this node.

    Note this will return an object even if storage is disabled on this
    node (but the object will not be exposed, peers will not be able to
    access it, and storage will remain disabled).

    The one and only instance for this node is always returned. It is
    created first if necessary, from the ``[storage]`` configuration
    section, and attached to this node as a child service.

    :raise ValueError: if ``reserved_space`` cannot be parsed (the bad
        value is logged before the error is re-raised).
    """
    try:
        return self.getServiceNamed(StorageServer.name)
    except KeyError:
        # Not created yet: build it from the configuration below.
        pass

    cfg = self.config.get_config

    readonly = cfg("storage", "readonly", False, boolean=True)

    configured_storedir = self.get_config(
        "storage", "storage_dir", self.STOREDIR,
    )
    storedir = self.config.get_config_path(configured_storedir)

    reserved_text = cfg("storage", "reserved_space", None)
    try:
        reserved = parse_abbreviated_size(reserved_text)
    except ValueError:
        log.msg("[storage]reserved_space= contains unparseable value %s"
                % reserved_text)
        raise
    reserved = 0 if reserved is None else reserved

    discard = cfg("storage", "debug_discard", False, boolean=True)

    expire = cfg("storage", "expire.enabled", False, boolean=True)
    if expire:
        mode = cfg("storage", "expire.mode")  # require a mode
    else:
        mode = cfg("storage", "expire.mode", "age")

    override_lease_duration = cfg("storage", "expire.override_lease_duration", None)
    if override_lease_duration is not None:
        override_lease_duration = parse_duration(override_lease_duration)

    if mode == "cutoff-date":
        cutoff_date = parse_date(cfg("storage", "expire.cutoff_date"))
    else:
        cutoff_date = None

    # Each share type is subject to expiration unless switched off.
    sharetype_options = (
        ("expire.immutable", "immutable"),
        ("expire.mutable", "mutable"),
    )
    expiration_sharetypes = tuple(
        sharetype
        for (option, sharetype) in sharetype_options
        if cfg("storage", option, True, boolean=True)
    )

    storage_server = StorageServer(
        storedir, self.nodeid,
        reserved_space=reserved,
        discard_storage=discard,
        readonly_storage=readonly,
        stats_provider=self.stats_provider,
        expiration_enabled=expire,
        expiration_mode=mode,
        expiration_override_lease_duration=override_lease_duration,
        expiration_cutoff_date=cutoff_date,
        expiration_sharetypes=expiration_sharetypes,
    )
    storage_server.setServiceParent(self)
    return storage_server
|---|
| 834 | |
|---|
def init_storage(self, announceable_storage_servers):
    """
    Set up storage service for this node, if it is enabled, and publish a
    "storage" announcement through every introducer client.

    The announcement always carries the permutation seed. Depending on
    configuration it also carries the anonymous storage FURL (and NURLs,
    when the tub's negotiation class can provide them), the announcements
    of the enabled storage plugins and the grid manager certificates.

    :param announceable_storage_servers: the plugin storage servers to
        register and announce.

    :raise ValueError: if storage is enabled but the tub is not listening.
    """
    # should we run a storage server (and publish it for others to use)?
    if not storage_enabled(self.config):
        return
    if not self._is_tub_listening():
        raise ValueError("config error: storage is enabled, but tub "
                         "is not listening ('tub.port=' is empty)")

    ss = self.get_anonymous_storage_server()
    announcement = {
        "permutation-seed-base32": self._init_permutation_seed(ss),
    }

    if anonymous_storage_enabled(self.config):
        furl_file = self.config.get_private_path("storage.furl").encode(
            get_filesystem_encoding()
        )
        furl = self.tub.registerReference(
            FoolscapStorageServer(ss),
            furlFile=furl_file,
        )
        _, _, swissnum = decode_furl(furl)
        negotiation_class = self.tub.negotiationClass
        if hasattr(negotiation_class, "add_storage_server"):
            nurls = negotiation_class.add_storage_server(ss, swissnum.encode("ascii"))
            self.storage_nurls = nurls
            # There is code in e.g. storage_client.py that checks if an
            # announcement has changed. Since NURL order isn't meaningful,
            # we don't want a change in the order to count as a change, so we
            # send the NURLs as a set. CBOR supports sets, as does Foolscap.
            announcement[storage_client.ANONYMOUS_STORAGE_NURLS] = {
                nurl.to_text() for nurl in nurls
            }
        announcement["anonymous-storage-FURL"] = furl

    storage_options = [
        enabled.announcement
        for enabled
        in self._enable_storage_servers(announceable_storage_servers)
    ]
    if storage_options:
        # Only add the new key if there are any plugins enabled.
        announcement[u"storage-options"] = storage_options

    if self.config.get_config("storage", "grid_management", default=False, boolean=True):
        announcement[u"grid-manager-certificates"] = (
            self.config.get_grid_manager_certificates()
        )

    # Note: certificates are not verified for validity here, but
    # that may be useful. See:
    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3977

    for introducer_client in self.introducer_clients:
        introducer_client.publish("storage", announcement, self._node_private_key)
|---|
| 887 | |
|---|
def get_client_storage_plugin_web_resources(self):
    """
    Get all of the client-side ``IResource`` implementations provided by
    enabled storage plugins, as reported by the storage broker for this
    node's configuration.

    :return dict[bytes, IResource provider]: The implementations.
    """
    broker = self.storage_broker
    return broker.get_client_storage_plugin_web_resources(self.config)
|---|
| 898 | |
|---|
| 899 | def _enable_storage_servers(self, announceable_storage_servers): |
|---|
| 900 | """ |
|---|
| 901 | Register and announce the given storage servers. |
|---|
| 902 | """ |
|---|
| 903 | for announceable in announceable_storage_servers: |
|---|
| 904 | yield self._enable_storage_server(announceable) |
|---|
| 905 | |
|---|
def _enable_storage_server(self, announceable_storage_server):
    """
    Register a storage server.

    The server object is handed to ``_register_reference`` under a config
    key derived from the ``name`` entry of its announcement, and the
    resulting FURL is added to the announcement as ``storage-server-FURL``.

    :return: the result of ``_add_to_announcement`` for the given server.
    """
    # Oops, why don't I have a better handle on this value?
    plugin_name = announceable_storage_server.announcement[u"name"]
    furl = _register_reference(
        "storage-plugin.{}.furl".format(plugin_name),
        self.config,
        self.tub,
        announceable_storage_server.storage_server,
    )
    return _add_to_announcement(
        {u"storage-server-FURL": furl},
        announceable_storage_server,
    )
|---|
| 925 | |
|---|
def init_client(self):
    """
    Set up the client-side machinery of this node: read the helper FURL
    and encoding parameters from the ``[client]`` section, create the API
    auth token, the history, terminator and uploader services, and then
    initialize the blacklist and the nodemaker.

    ``self.encoding_params`` is updated in place with the configured
    values.
    """
    helper_furl = self.config.get_config("client", "helper.furl", None)
    if helper_furl in ("None", ""):
        helper_furl = None

    DEP = self.encoding_params
    encoding_options = (
        ("k", "shares.needed"),
        ("n", "shares.total"),
        ("happy", "shares.happy"),
        # At the moment this is only used for testing, thus the janky config
        # attribute name.
        ("max_segment_size", "shares._max_immutable_segment_size_for_testing"),
    )
    for param, option in encoding_options:
        DEP[param] = int(self.config.get_config("client", option, DEP[param]))

    # for the CLI to authenticate to local JSON endpoints
    self._create_auth_token()

    self.history = History(self.stats_provider)

    self.terminator = Terminator()
    self.terminator.setServiceParent(self)

    uploader = Uploader(helper_furl, self.stats_provider, self.history)
    uploader.setServiceParent(self)

    self.init_blacklist()
    self.init_nodemaker()
|---|
| 957 | |
|---|
def get_auth_token(self):
    """
    This returns a local authentication token, which is just some
    random data in "api_auth_token" which must be echoed to API
    calls.

    :return bytes: the stored token, encoded as ASCII.
    """
    token_text = self.config.get_private_config('api_auth_token')
    return token_text.encode("ascii")
|---|
| 966 | |
|---|
| 967 | def _create_auth_token(self): |
|---|
| 968 | """ |
|---|
| 969 | Creates new auth-token data written to 'private/api_auth_token'. |
|---|
| 970 | |
|---|
| 971 | This is intentionally re-created every time the node starts. |
|---|
| 972 | """ |
|---|
| 973 | self.config.write_private_config( |
|---|
| 974 | 'api_auth_token', |
|---|
| 975 | urlsafe_b64encode(os.urandom(32)) + b'\n', |
|---|
| 976 | ) |
|---|
| 977 | |
|---|
def get_storage_broker(self):
    """
    :return: the storage broker held on ``self.storage_broker``.
    """
    broker = self.storage_broker
    return broker
|---|
| 980 | |
|---|
def load_static_servers(self):
    """
    Load the servers.yaml file if it exists, and provide the static
    server data to the StorageFarmBroker.

    A servers.yaml that cannot be opened or read is ignored. An empty
    file, or one whose ``storage`` entry is empty, is treated as declaring
    no static servers.
    """
    fn = self.config.get_private_path("servers.yaml")
    servers_filepath = FilePath(fn)
    try:
        with servers_filepath.open() as f:
            servers_yaml = yamlutil.safe_load(f)
    except EnvironmentError:
        # No readable servers.yaml: there is nothing to configure. Only
        # the file access is guarded, so that errors raised while handing
        # the servers to the broker are not silently discarded.
        return

    # An empty YAML document (and a bare "storage:" key) loads as None
    # rather than as a mapping; fall back to "no servers" instead of
    # failing on None.
    static_servers = (servers_yaml or {}).get("storage") or {}
    log.msg("found %d static servers in private/servers.yaml" %
            len(static_servers))
    static_servers = {
        ensure_text(key): value for (key, value) in static_servers.items()
    }
    self.storage_broker.set_static_servers(static_servers)
|---|
| 1000 | |
|---|
def init_blacklist(self):
    """
    Create ``self.blacklist`` from the ``access.blacklist`` path resolved
    by the node's configuration.
    """
    blacklist_path = self.config.get_config_path("access.blacklist")
    self.blacklist = Blacklist(blacklist_path)
|---|
| 1004 | |
|---|
def init_nodemaker(self):
    """
    Pick the default mutable file format from ``[client]mutable.format``
    (MDMF when the value is "MDMF" in any letter case, SDMF otherwise) and
    create ``self.nodemaker``.
    """
    configured_format = self.config.get_config("client", "mutable.format", default="SDMF")
    wants_mdmf = configured_format.upper() == "MDMF"
    self.mutable_file_default = MDMF_VERSION if wants_mdmf else SDMF_VERSION

    self.nodemaker = NodeMaker(
        self.storage_broker,
        self._secret_holder,
        self.get_history(),
        self.getServiceNamed("uploader"),
        self.terminator,
        self.get_encoding_parameters(),
        self.mutable_file_default,
        self._key_generator,
        self.blacklist,
    )
|---|
| 1020 | |
|---|
def get_history(self):
    """
    :return: the history object held on ``self.history``.
    """
    history = self.history
    return history
|---|
| 1023 | |
|---|
def init_helper(self):
    """
    Create the ``Helper`` for this node, keep it on ``self.helper`` and
    register it with the tub, using the private ``helper.furl`` file as
    its FURL file.
    """
    helper_basedir = self.config.get_config_path("helper")
    self.helper = Helper(
        helper_basedir,
        self.storage_broker,
        self._secret_holder,
        self.stats_provider,
        self.history,
    )
    # TODO: this is confusing. BASEDIR/private/helper.furl is created by
    # the helper. BASEDIR/helper.furl is consumed by the client who wants
    # to use the helper. I like having the filename be the same, since
    # that makes 'cp' work smoothly, but the difference between config
    # inputs and generated outputs is hard to see.
    furl_path = self.config.get_private_path("helper.furl")
    helper_furlfile = furl_path.encode(get_filesystem_encoding())
    self.tub.registerReference(self.helper, furlFile=helper_furlfile)
|---|
| 1035 | |
|---|
| 1036 | def _get_tempdir(self): |
|---|
| 1037 | """ |
|---|
| 1038 | Determine the path to the directory where temporary files for this node |
|---|
| 1039 | should be written. |
|---|
| 1040 | |
|---|
| 1041 | :return bytes: The path which will exist and be a directory. |
|---|
| 1042 | """ |
|---|
| 1043 | tempdir_config = self.config.get_config("node", "tempdir", "tmp") |
|---|
| 1044 | if isinstance(tempdir_config, bytes): |
|---|
| 1045 | tempdir_config = tempdir_config.decode('utf-8') |
|---|
| 1046 | tempdir = self.config.get_config_path(tempdir_config) |
|---|
| 1047 | if not os.path.exists(tempdir): |
|---|
| 1048 | fileutil.make_dirs(tempdir) |
|---|
| 1049 | return tempdir |
|---|
| 1050 | |
|---|
def init_web(self, webport):
    """
    Create the ``WebishServer`` for this node and attach it as a child
    service.

    :param webport: where the web server should listen (the caller
        passes a strports string).
    """
    self.log("init_web(webport=%s)", args=(webport,))

    from allmydata.webish import WebishServer, anonymous_tempfile_factory

    nodeurl_path = self.config.get_config_path("node.url")
    staticdir = self.config.get_config_path(
        self.config.get_config("node", "web.static", "public_html"),
    )
    tempfile_factory = anonymous_tempfile_factory(self._get_tempdir())

    webish = WebishServer(
        self,
        webport,
        tempfile_factory,
        nodeurl_path,
        staticdir,
    )
    webish.setServiceParent(self)
|---|
| 1066 | |
|---|
def init_sftp_server(self):
    """
    Start the SFTP frontend as a child service when ``[sftpd]enabled`` is
    true; otherwise do nothing.

    The accounts file (optional), port (default ``tcp:8022``) and host key
    files are read from the ``[sftpd]`` section.
    """
    if not self.config.get_config("sftpd", "enabled", False, boolean=True):
        return

    accountfile = self.config.get_config("sftpd", "accounts.file", None)
    if accountfile:
        accountfile = self.config.get_config_path(accountfile)
    sftp_portstr = self.config.get_config("sftpd", "port", "tcp:8022")
    pubkey_file = self.config.get_config("sftpd", "host_pubkey_file")
    privkey_file = self.config.get_config("sftpd", "host_privkey_file")

    from allmydata.frontends import sftpd

    server = sftpd.SFTPServer(
        self, accountfile, sftp_portstr, pubkey_file, privkey_file,
    )
    server.setServiceParent(self)
|---|
| 1080 | |
|---|
| 1081 | def _check_exit_trigger(self, exit_trigger_file): |
|---|
| 1082 | if os.path.exists(exit_trigger_file): |
|---|
| 1083 | mtime = os.stat(exit_trigger_file)[stat.ST_MTIME] |
|---|
| 1084 | if mtime > time.time() - 120.0: |
|---|
| 1085 | return |
|---|
| 1086 | else: |
|---|
| 1087 | self.log("%s file too old, shutting down" % (self.EXIT_TRIGGER_FILE,)) |
|---|
| 1088 | else: |
|---|
| 1089 | self.log("%s file missing, shutting down" % (self.EXIT_TRIGGER_FILE,)) |
|---|
| 1090 | reactor.stop() |
|---|
| 1091 | |
|---|
def get_encoding_parameters(self):
    """
    :return: the encoding parameters held on ``self.encoding_params``
        (the same object, not a copy).
    """
    params = self.encoding_params
    return params
|---|
| 1094 | |
|---|
def introducer_connection_statuses(self):
    """
    :return list: the ``connection_status()`` of every introducer client,
        in the order of ``self.introducer_clients``.
    """
    return [
        client.connection_status()
        for client in self.introducer_clients
    ]
|---|
| 1097 | |
|---|
def connected_to_introducer(self):
    """
    :return bool: True if at least one of ``self.introducer_clients``
        reports being connected to its introducer, False otherwise
        (including when there are no introducer clients).
    """
    # A generator lets any() stop at the first connected client instead of
    # querying every client and building a throwaway list.
    return any(ic.connected_to_introducer() for ic in self.introducer_clients)
|---|
| 1100 | |
|---|
def get_renewal_secret(self):  # this will go away
    """
    :return: the renewal secret provided by the node's secret holder.
    """
    holder = self._secret_holder
    return holder.get_renewal_secret()
|---|
| 1103 | |
|---|
def get_cancel_secret(self):
    """
    :return: the cancel secret provided by the node's secret holder.
    """
    holder = self._secret_holder
    return holder.get_cancel_secret()
|---|
| 1106 | |
|---|
def debug_wait_for_client_connections(self, num_clients):
    """
    Return a Deferred that fires (with None) when we have connections
    to the given number of peers. Useful for tests that set up a
    temporary test network and need to know when it is safe to proceed
    with an upload or download.

    :param num_clients: the minimum number of connected servers to wait
        for; the condition is polled with an interval of 0.5.
    """
    def _enough_servers():
        connected = self.storage_broker.get_connected_servers()
        return len(connected) >= num_clients

    def _discard_result(_result):
        # Hide whatever the poll produced: callers only get None.
        return None

    d = self.poll(_enough_servers, 0.5)
    d.addCallback(_discard_result)
    return d
|---|
| 1117 | |
|---|
| 1118 | |
|---|
| 1119 | # these four methods are the primitives for creating filenodes and |
|---|
| 1120 | # dirnodes. The first takes a URI and produces a filenode or (new-style) |
|---|
| 1121 | # dirnode. The other three create brand-new filenodes/dirnodes. |
|---|
| 1122 | |
|---|
def create_node_from_uri(self, write_uri, read_uri=None, deep_immutable=False, name="<unknown name>"):
    """
    Produce a node for the given cap(s) by delegating to the nodemaker's
    ``create_from_cap``.

    This returns synchronously. Note that it does *not* validate the
    write_uri and read_uri; instead we may get an opaque node if there
    were any problems.
    """
    return self.nodemaker.create_from_cap(
        write_uri,
        read_uri,
        deep_immutable=deep_immutable,
        name=name,
    )
|---|
| 1128 | |
|---|
def create_dirnode(
    self,
    initial_children: dict | None = None,
    version: int | None = None,
    *,
    unique_keypair: tuple[rsa.PublicKey, rsa.PrivateKey] | None = None
) -> DirectoryNode:
    """
    Create a new mutable directory through the nodemaker.

    :param initial_children: If given, a structured dict representing the
        initial content of the created directory. See
        `docs/frontends/webapi.rst` for examples.

    :param version: If given, an int representing the mutable file format
        of the new object. Acceptable values are currently `SDMF_VERSION`
        or `MDMF_VERSION` (corresponding to 0 or 1, respectively, as
        defined in `allmydata.interfaces`). If no such value is provided,
        the default mutable format will be used (currently SDMF).

    :param unique_keypair: an optional tuple containing the RSA public
        and private key to be used for the new directory. Typically, this
        value is omitted (in which case a new random keypair will be
        generated at creation time).

        **Warning** This value independently determines the identity of
        the mutable object to create. There cannot be two different
        mutable objects that share a keypair. They will merge into one
        object (with undefined contents).

    :return: A Deferred which will fire with a representation of the new
        directory after it has been created.
    """
    return self.nodemaker.create_new_mutable_directory(
        initial_children,
        version=version,
        keypair=unique_keypair,
    )
|---|
| 1168 | |
|---|
def create_immutable_dirnode(self, children, convergence=None):
    """
    Create an immutable directory holding ``children`` by delegating to
    the nodemaker's ``create_immutable_directory``.

    :param children: the content of the new directory.
    :param convergence: passed through to the nodemaker unchanged.
    """
    maker = self.nodemaker
    return maker.create_immutable_directory(children, convergence)
|---|
| 1171 | |
|---|
def create_mutable_file(
    self,
    contents: bytes | None = None,
    version: int | None = None,
    *,
    unique_keypair: tuple[rsa.PublicKey, rsa.PrivateKey] | None = None,
) -> MutableFileNode:
    """
    Create *and upload* a new mutable object.

    :param contents: If given, the initial contents for the new object.

    :param version: If given, the mutable file format for the new object
        (otherwise a format will be chosen automatically).

    :param unique_keypair: **Warning** This value independently determines
        the identity of the mutable object to create. There cannot be two
        different mutable objects that share a keypair. They will merge
        into one object (with undefined contents).

        It is common to pass a None value (or not pass a value) for this
        parameter. In these cases, a new random keypair will be
        generated.

        If non-None, the given public/private keypair will be used for the
        new object. The expected use-case is for implementing compliance
        tests.

    :return: A Deferred which will fire with a representation of the new
        mutable object after it has been uploaded.
    """
    maker = self.nodemaker
    return maker.create_mutable_file(
        contents,
        version=version,
        keypair=unique_keypair,
    )
|---|
| 1206 | |
|---|
def upload(self, uploadable, reactor=None):
    """
    Upload ``uploadable`` using the child service named ``"uploader"``.

    :param uploadable: the object to upload.
    :param reactor: passed through to the uploader unchanged.

    :return: whatever the uploader's ``upload`` returns.
    """
    uploader_service = self.getServiceNamed("uploader")
    return uploader_service.upload(uploadable, reactor=reactor)
|---|