source: trunk/src/allmydata/storage_client.py

Last change on this file was 7ab0483, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-12-11T15:09:50Z

Pacify newer mypy

  • Property mode set to 100644
File size: 60.5 KB
Line 
1
2"""
3I contain the client-side code which speaks to storage servers, in particular
4the foolscap-based server implemented in src/allmydata/storage/*.py .
5
6Ported to Python 3.
7"""
8
9# roadmap:
10#
11# 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
12# create it, change uploader/servermap to get rrefs from it. ServerFarm calls
13# IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
14# to clients. webapi status pages call broker.get_info_about_serverid.
15#
16# 2: move get_info methods to the descriptor, webapi status pages call
17# broker.get_descriptor_for_serverid().get_info
18#
19# 3?later?: store descriptors in UploadResults/etc instead of serverids,
20# webapi status pages call descriptor.get_info and don't use storage_broker
21# or Client
22#
23# 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
24# optional. This closes #467
25#
26# 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
27# clients. Clients stop doing callRemote(), use NativeStorageClient methods
28# instead (which might do something else, i.e. http or whatever). The
29# introducer and tahoe.cfg only create NativeStorageClients for now.
30#
31# 6: implement other sorts of IStorageClient classes: S3, etc
32
33from __future__ import annotations
34
35from typing import Union, Callable, Any, Optional, cast, Dict, Iterable
36from os import urandom
37import re
38import time
39import hashlib
40from io import StringIO
41from configparser import NoSectionError
42import json
43
44import attr
45from attr import define
46from hyperlink import DecodedURL
47from twisted.web.client import HTTPConnectionPool
48from zope.interface import (
49    Attribute,
50    Interface,
51    implementer,
52)
53from twisted.python.failure import Failure
54from twisted.web import http
55from twisted.internet.task import LoopingCall
56from twisted.internet import defer, reactor
57from twisted.internet.interfaces import IReactorTime
58from twisted.application import service
59from twisted.logger import Logger
60from twisted.plugin import (
61    getPlugins,
62)
63from eliot import (
64    log_call,
65)
66from foolscap.ipb import IRemoteReference
67from foolscap.api import eventually, RemoteException
68from foolscap.reconnector import (
69    ReconnectionInfo,
70)
71from allmydata.interfaces import (
72    IStorageBroker,
73    IDisplayableServer,
74    IServer,
75    IStorageServer,
76    IFoolscapStoragePlugin,
77    VersionMessage
78)
79from allmydata.grid_manager import (
80    create_grid_manager_verifier, SignedCertificate
81)
82from allmydata.crypto import (
83    ed25519,
84)
85from allmydata.util.tor_provider import _Provider as TorProvider
86from allmydata.util import log, base32, connection_status
87from allmydata.util.assertutil import precondition
88from allmydata.util.observer import ObserverList
89from allmydata.util.rrefutil import add_version_to_remote_reference
90from allmydata.util.hashutil import permute_server_hash
91from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict
92from allmydata.util.deferredutil import async_to_deferred, race
93from allmydata.util.attrs_provides import provides
94from allmydata.storage.http_client import (
95    StorageClient, StorageClientImmutables, StorageClientGeneral,
96    ClientException as HTTPClientException, StorageClientMutables,
97    ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException,
98    StorageClientFactory
99)
100from .node import _Config
101
102_log = Logger()
103
104ANONYMOUS_STORAGE_NURLS = "anonymous-storage-NURLs"
105
106
107# who is responsible for de-duplication?
108#  both?
109#  IC remembers the unpacked announcements it receives, to provide for late
110#  subscribers and to remove duplicates
111
112# if a client subscribes after startup, will they receive old announcements?
113#  yes
114
115# who will be responsible for signature checking?
116#  make it be IntroducerClient, so they can push the filter outwards and
117#  reduce inbound network traffic
118
119# what should the interface between StorageFarmBroker and IntroducerClient
120# look like?
121#  don't pass signatures: only pass validated blessed-objects
122
@attr.s
class StorageClientConfig(object):
    """
    Configuration for a node acting as a storage client.

    :ivar preferred_peers: An iterable of the server-ids (``bytes``) of the
        storage servers where share placement is preferred, in order of
        decreasing preference.  See the *[client]peers.preferred* documentation
        for details.

    :ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from
        names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the
        respective configuration.

    :ivar list[ed25519.VerifyKey] grid_manager_keys: with no keys in
        this list, we'll upload to any storage server. Otherwise, we will
        only upload to a storage-server that has a valid certificate
        signed by at least one of these keys.
    """
    preferred_peers : Iterable[bytes] = attr.ib(default=())
    storage_plugins : dict[str, dict[str, str]] = attr.ib(default=attr.Factory(dict))
    grid_manager_keys : list[ed25519.Ed25519PublicKey] = attr.ib(default=attr.Factory(list))

    @classmethod
    def from_node_config(cls, config):
        """
        Create a ``StorageClientConfig`` from a complete Tahoe-LAFS node
        configuration.

        :param _Config config: The loaded Tahoe-LAFS node configuration.
        """
        # Comma-separated list; empty entries (extra commas, whitespace) are
        # dropped.
        ps = config.get_config("client", "peers.preferred", "").split(",")
        preferred_peers = tuple(p.strip() for p in ps if p != "")

        enabled_storage_plugins = (
            name.strip()
            for name
            in config.get_config(
                "client",
                "storage.plugins",
                "",
            ).split(u",")
            if name.strip()
        )

        storage_plugins = {}
        for plugin_name in enabled_storage_plugins:
            try:
                plugin_config = config.items("storageclient.plugins." + plugin_name)
            except NoSectionError:
                # A plugin enabled without its own config section gets an
                # empty configuration rather than an error.
                plugin_config = []
            storage_plugins[plugin_name] = dict(plugin_config)

        # The [grid_managers] section maps friendly names to encoded keys;
        # only the key values matter here.
        grid_manager_keys = [
            ed25519.verifying_key_from_string(gm_key.encode("ascii"))
            for gm_key in config.enumerate_section('grid_managers').values()
        ]

        return cls(
            preferred_peers,
            storage_plugins,
            grid_manager_keys,
        )

    def get_configured_storage_plugins(self) -> dict[str, IFoolscapStoragePlugin]:
        """
        :returns: a mapping from names to instances for all available
            plugins

        :raises MissingPlugin: if the configuration asks for a plugin
            for which there is no corresponding instance (e.g. it is
            not installed).
        """
        plugins = {
            plugin.name: plugin
            for plugin
            in getPlugins(IFoolscapStoragePlugin)
        }

        # mypy doesn't like "str" in place of Any ...
        configured: Dict[Any, IFoolscapStoragePlugin] = dict()
        for plugin_name in self.storage_plugins:
            try:
                plugin = plugins[plugin_name]
            except KeyError:
                raise MissingPlugin(plugin_name)
            configured[plugin_name] = plugin
        return configured
213
214
@implementer(IStorageBroker)
class StorageFarmBroker(service.MultiService):
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.

    :ivar _tub_maker: A one-argument callable which accepts a dictionary of
        "handler overrides" and returns a ``foolscap.api.Tub``.

    :ivar StorageClientConfig storage_client_config: Values from the node
        configuration file relating to storage behavior.
    """

    @property
    def preferred_peers(self):
        # server-ids preferred for share placement, straight from the node
        # configuration
        return self.storage_client_config.preferred_peers

    def __init__(
            self,
            permute_peers,
            tub_maker,
            node_config: _Config,
            storage_client_config=None,
            default_connection_handlers=None,
            tor_provider: Optional[TorProvider]=None,
    ):
        service.MultiService.__init__(self)
        # default to plain TCP if the caller supplied no handler mapping
        if default_connection_handlers is None:
            default_connection_handlers = {"tcp": "tcp"}

        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        self._tub_maker = tub_maker

        self.node_config = node_config

        if storage_client_config is None:
            storage_client_config = StorageClientConfig()
        self.storage_client_config = storage_client_config

        # self.servers maps serverid -> IServer, and keeps track of all the
        # storage servers that we've heard about. Each descriptor manages its
        # own Reconnector, and will give us a RemoteReference when we ask
        # them for it.
        self.servers = BytesKeyDict()
        self._static_server_ids : set[bytes] = set() # ignore announcements for these
        self.introducer_client = None
        self._threshold_listeners : list[tuple[float,defer.Deferred[Any]]]= [] # tuples of (threshold, Deferred)
        self._connected_high_water_mark = 0
        self._tor_provider = tor_provider
        self._default_connection_handlers = default_connection_handlers

    @log_call(action_type=u"storage-client:broker:set-static-servers")
    def set_static_servers(self, servers):
        """
        Register the statically configured storage servers and begin
        connecting to them.  Any later announcement for one of these
        server-ids is ignored (see ``_should_ignore_announcement``).

        :param servers: a mapping from server-id to the static server
            definition dictionary (see *Static Server Definitions* in the
            configuration documentation).
        """
        # Sorting the items gives us a deterministic processing order.  This
        # doesn't really matter but it makes the logging behavior more
        # predictable and easier to test (and at least one test does depend on
        # this sorted order).
        for (server_id, server) in sorted(servers.items()):
            try:
                storage_server = self._make_storage_server(
                    server_id.encode("utf-8"),
                    server,
                )
            except Exception:
                # TODO: The _make_storage_server failure is logged but maybe
                # we should write a traceback here.  Notably, tests don't
                # automatically fail just because we hit this case.  Well
                # written tests will still fail if a surprising exception
                # arrives here but they might be harder to debug without this
                # information.
                pass
            else:
                if isinstance(server_id, str):
                    server_id = server_id.encode("utf-8")
                self._static_server_ids.add(server_id)
                self.servers[server_id] = storage_server
                storage_server.setServiceParent(self)
                storage_server.start_connecting(self._trigger_connections)

    def get_client_storage_plugin_web_resources(self, node_config):
        """
        Get all of the client-side ``IResource`` implementations provided by
        enabled storage plugins.

        :param allmydata.node._Config node_config: The complete node
            configuration for the node from which these web resources will be
            served.

        :return dict[unicode, IResource]: Resources for all of the plugins.
        """
        plugins = {
            plugin.name: plugin
            for plugin
            in getPlugins(IFoolscapStoragePlugin)
        }
        return UnicodeKeyDict({
            name: plugins[name].get_client_resource(node_config)
            for (name, config)
            in self.storage_client_config.storage_plugins.items()
        })

    @staticmethod
    def _should_we_use_http(node_config: _Config, announcement: dict) -> bool:
        """
        Given an announcement dictionary and config, return whether we should
        connect to storage server over HTTP.
        """
        # HTTP requires both that foolscap isn't forced by local config and
        # that the server actually announced at least one NURL.
        return not node_config.get_config(
            "client", "force_foolscap", default=False, boolean=True,
        ) and len(announcement.get(ANONYMOUS_STORAGE_NURLS, [])) > 0

    @log_call(
        action_type=u"storage-client:broker:make-storage-server",
        include_args=["server_id"],
        include_result=False,
    )
    def _make_storage_server(self, server_id, server):
        """
        Create a new ``IServer`` for the given storage server announcement.

        :param bytes server_id: The unique identifier for the server.

        :param dict server: The server announcement.  See ``Static Server
            Definitions`` in the configuration documentation for details about
            the structure and contents.

        :return IServer: The object-y representation of the server described
            by the given announcement.
        """
        assert isinstance(server_id, bytes)
        gm_verifier = create_grid_manager_verifier(
            self.storage_client_config.grid_manager_keys,
            [SignedCertificate.load(StringIO(json.dumps(data))) for data in server["ann"].get("grid-manager-certificates", [])],
            "pub-{}".format(str(server_id, "ascii")).encode("ascii"),  # server_id is v0-<key> not pub-v0-key .. for reasons?
        )

        if self._should_we_use_http(self.node_config, server["ann"]):
            s = HTTPNativeStorageServer(
                server_id,
                server["ann"],
                grid_manager_verifier=gm_verifier,
                default_connection_handlers=self._default_connection_handlers,
                tor_provider=self._tor_provider
            )
            s.on_status_changed(lambda _: self._got_connection())
            return s

        handler_overrides = server.get("connections", {})
        s = NativeStorageServer(
            server_id,
            server["ann"],
            self._tub_maker,
            handler_overrides,
            self.node_config,
            self.storage_client_config,
            gm_verifier,
        )
        s.on_status_changed(lambda _: self._got_connection())
        return s

    def when_connected_enough(self, threshold):
        """
        :returns: a Deferred that fires if/when our high water mark for
        number of connected servers becomes (or ever was) above
        "threshold".
        """
        d = defer.Deferred()
        self._threshold_listeners.append( (threshold, d) )
        self._check_connected_high_water_mark()
        return d

    # these two are used in unit tests
    def test_add_rref(self, serverid, rref, ann):
        """
        (test helper) Add a server backed by an existing remote reference,
        marked as already connected, bypassing the normal connection process.
        """
        s = self._make_storage_server(
            serverid,
            {"ann": ann.copy()},
        )
        s._rref = rref
        s._is_connected = True
        self.servers[serverid] = s

    def test_add_server(self, server_id, s):
        """
        (test helper) Register an already-constructed server object.
        """
        s.on_status_changed(lambda _: self._got_connection())
        self.servers[server_id] = s

    def use_introducer(self, introducer_client):
        """
        Subscribe to "storage" announcements from the given introducer
        client; each one is handled by ``_got_announcement``.
        """
        self.introducer_client = ic = introducer_client
        ic.subscribe_to("storage", self._got_announcement)

    def _got_connection(self):
        # this is called by NativeStorageServer when it is connected
        self._check_connected_high_water_mark()

    def _check_connected_high_water_mark(self):
        """
        Raise the connected-servers high-water mark if warranted, then fire
        (and discard) any threshold listeners it now satisfies.
        """
        current = len(self.get_connected_servers())
        if current > self._connected_high_water_mark:
            self._connected_high_water_mark = current

        remaining = []
        for threshold, d in self._threshold_listeners:
            if self._connected_high_water_mark >= threshold:
                eventually(d.callback, None)
            else:
                remaining.append( (threshold, d) )
        self._threshold_listeners = remaining

    def _should_ignore_announcement(self, server_id, ann):
        """
        Determine whether a new storage announcement should be discarded or used
        to update our collection of storage servers.

        :param bytes server_id: The unique identifier for the storage server
            which made the announcement.

        :param dict ann: The announcement.

        :return bool: ``True`` if the announcement should be ignored,
            ``False`` if it should be used to update our local storage server
            state.
        """
        # Let local static configuration always override any announcement for
        # a particular server.
        if server_id in self._static_server_ids:
            log.msg(format="ignoring announcement for static server '%(id)s'",
                    id=server_id,
                    facility="tahoe.storage_broker", umid="AlxzqA",
                    level=log.UNUSUAL)
            return True

        try:
            old = self.servers[server_id]
        except KeyError:
            # We don't know anything about this server.  Let's use the
            # announcement to change that.
            return False
        else:
            # Determine if this announcement is at all different from the
            # announcement we already have for the server.  If it is the same,
            # we don't need to change anything.
            return old.get_announcement() == ann

    def _got_announcement(self, key_s, ann):
        """
        This callback is given to the introducer and called any time an
        announcement is received which has a valid signature and does not have
        a sequence number less than or equal to a previous sequence number
        seen for that server by that introducer.

        Note sequence numbers are not considered between different introducers
        so if we use more than one introducer it is possible for them to
        deliver us stale announcements in some cases.
        """
        precondition(isinstance(key_s, bytes), key_s)
        precondition(key_s.startswith(b"v0-"), key_s)
        precondition(ann["service-name"] == "storage", ann["service-name"])
        server_id = key_s

        if self._should_ignore_announcement(server_id, ann):
            return

        s = self._make_storage_server(
            server_id,
            {u"ann": ann},
        )

        try:
            old = self.servers.pop(server_id)
        except KeyError:
            pass
        else:
            # It's a replacement, get rid of the old one.
            old.stop_connecting()
            old.disownServiceParent()
            # NOTE: this disownServiceParent() returns a Deferred that
            # doesn't fire until Tub.stopService fires, which will wait for
            # any existing connections to be shut down. This doesn't
            # generally matter for normal runtime, but unit tests can run
            # into DirtyReactorErrors if they don't block on these. If a test
            # replaces one server with a newer version, then terminates
            # before the old one has been shut down, it might get
            # DirtyReactorErrors. The fix would be to gather these Deferreds
            # into a structure that will block StorageFarmBroker.stopService
            # until they have fired (but hopefully don't keep reference
            # cycles around when they fire earlier than that, which will
            # almost always be the case for normal runtime).

        # now we forget about them and start using the new one
        s.setServiceParent(self)
        self.servers[server_id] = s
        s.start_connecting(self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
        # need servers, we'll ask them if they're connected or not.

    def _trigger_connections(self):
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in list(self.servers.values()):
            dsc.try_to_connect()

    def get_servers_for_psi(self, peer_selection_index, for_upload=False):
        """
        :param for_upload: used to determine if we should include any
        servers that are invalid according to Grid Manager
        processing. When for_upload is True and we have any Grid
        Manager keys configured, any storage servers with invalid or
        missing certificates will be excluded.
        """
        # return a list of server objects (IServers)
        assert self.permute_peers == True
        connected_servers = self.get_connected_servers()
        preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
        if for_upload:
            # print("upload processing: {}".format([srv.upload_permitted() for srv in connected_servers]))
            connected_servers = [
                srv
                for srv in connected_servers
                if srv.upload_permitted()
            ]

        def _permuted(server):
            # sort key: preferred servers first, then by permuted hash
            seed = server.get_permutation_seed()
            is_unpreferred = server not in preferred_servers
            return (is_unpreferred,
                    permute_server_hash(peer_selection_index, seed))
        return sorted(connected_servers, key=_permuted)

    def get_all_serverids(self):
        """Return a frozenset of every known server-id (``bytes``)."""
        return frozenset(self.servers.keys())

    def get_connected_servers(self):
        """Return a frozenset of the servers that are currently connected."""
        return frozenset([s for s in self.servers.values() if s.is_connected()])

    def get_known_servers(self):
        """Return a frozenset of all known servers, connected or not."""
        return frozenset(self.servers.values())

    def get_nickname_for_serverid(self, serverid):
        """Return the nickname of the given server, or None if unknown."""
        if serverid in self.servers:
            return self.servers[serverid].get_nickname()
        return None

    def get_stub_server(self, serverid):
        """
        Return an ``IDisplayableServer`` for the given server id, falling
        back to a ``StubServer`` when no matching server is known.
        """
        if serverid in self.servers:
            return self.servers[serverid]
        # some time before 1.12, we changed "serverid" to be "key_s" (the
        # printable verifying key, used in V2 announcements), instead of the
        # tubid. When the immutable uploader delegates work to a Helper,
        # get_stub_server() is used to map the returning server identifiers
        # to IDisplayableServer instances (to get a name, for display on the
        # Upload Results web page). If the Helper is running 1.12 or newer,
        # it will send pubkeys, but if it's still running 1.11, it will send
        # tubids. This clause maps the old tubids to our existing servers.
        for s in list(self.servers.values()):
            if isinstance(s, NativeStorageServer):
                if serverid == s.get_tubid():
                    return s
        return StubServer(serverid)
579
@implementer(IDisplayableServer)
class StubServer(object):
    """
    A minimal ``IDisplayableServer`` that knows nothing about a server
    except its binary id.  Used by ``StorageFarmBroker.get_stub_server``
    when no real descriptor is available.
    """

    def __init__(self, serverid):
        assert isinstance(serverid, bytes)
        self.serverid = serverid # binary tubid

    def get_serverid(self):
        """Return the binary server id."""
        return self.serverid

    def get_name(self):
        """Return an abbreviated base32 rendering of the server id."""
        return base32.b2a(self.serverid)[:8]

    def get_longname(self):
        """Return the full base32 rendering of the server id."""
        return base32.b2a(self.serverid)

    def get_nickname(self):
        """No nickname is known for a stub server."""
        return "?"
593
594
class IFoolscapStorageServer(Interface):
    """
    An internal interface that mediates between ``NativeStorageServer`` and
    Foolscap-based ``IStorageServer`` implementations.
    """
    # Implemented by _FoolscapStorage (for matched announcements) and
    # _NullStorage (for announcements we cannot speak to).
    nickname = Attribute("""
    A name for this server for presentation to users.
    """)
    permutation_seed = Attribute("""
    A stable value associated with this server which a client can use as an
    input to the server selection permutation ordering.
    """)
    tubid = Attribute("""
    The identifier for the Tub in which the server is run.
    """)
    storage_server = Attribute("""
    An IStorageServer provide which implements a concrete Foolscap-based
    protocol for communicating with the server.
    """)
    name = Attribute("""
    Another name for this server for presentation to users.
    """)
    longname = Attribute("""
    *Another* name for this server for presentation to users.
    """)
    lease_seed = Attribute("""
    A stable value associated with this server which a client can use as an
    input to a lease secret generation function.
    """)

    def connect_to(tub, got_connection):
        """
        Attempt to establish and maintain a connection to the server.

        :param Tub tub: A Foolscap Tub from which the connection is to
            originate.

        :param got_connection: A one-argument callable which is called with a
            Foolscap ``RemoteReference`` when a connection is established.
            This may be called multiple times if the connection is lost and
            then re-established.

        :return foolscap.reconnector.Reconnector: An object which manages the
            connection and reconnection attempts.
        """
640
641
def _parse_announcement(server_id: bytes, furl: bytes, ann: dict) -> tuple[str, bytes, bytes, bytes, bytes]:
    """
    Parse the furl and announcement, return:

        (nickname, permutation_seed, tubid, short_description, long_description)
    """
    match = re.match(br'pb://(\w+)@', furl)
    assert match, furl
    tubid = base32.a2b(match.group(1).lower())

    # Prefer an explicitly announced permutation seed.  Otherwise derive one
    # from the server id: decode it directly when it is a v0- pubkey, or hash
    # it as a last resort.
    if "permutation-seed-base32" in ann:
        seed = ann["permutation-seed-base32"]
        if isinstance(seed, str):
            seed = seed.encode("utf-8")
        permutation_seed = base32.a2b(seed)
    elif re.search(br'^v0-[0-9a-zA-Z]{52}$', server_id):
        permutation_seed = base32.a2b(server_id[3:])
    else:
        log.msg("unable to parse serverid '%(server_id)s as pubkey, "
                "hashing it to get permutation-seed, "
                "may not converge with other clients",
                server_id=server_id,
                facility="tahoe.storage_broker",
                level=log.UNUSUAL, umid="qu86tw")
        permutation_seed = hashlib.sha256(server_id).digest()

    assert server_id
    long_description = server_id
    if server_id.startswith(b"v0-"):
        # remove v0- prefix from abbreviated name
        short_description = server_id[3:3+8]
    else:
        short_description = server_id[:8]

    return (ann.get("nickname", ""), permutation_seed, tubid,
            short_description, long_description)
679
680
@implementer(IFoolscapStorageServer)
@attr.s(frozen=True)
class _FoolscapStorage(object):
    """
    Abstraction for connecting to a storage server exposed via Foolscap.
    """
    # name for presentation to users (taken from the announcement)
    nickname = attr.ib()
    # stable bytes used as input to the server-selection permutation
    permutation_seed = attr.ib()
    # binary identifier of the server's Tub (derived from the fURL)
    tubid = attr.ib()

    # the concrete IStorageServer that speaks the Foolscap protocol
    storage_server = attr.ib(validator=provides(IStorageServer))

    _furl = attr.ib()
    _short_description = attr.ib()
    _long_description = attr.ib()


    @property
    def name(self):
        # abbreviated name for presentation to users
        return self._short_description

    @property
    def longname(self):
        # full-length name for presentation to users
        return self._long_description

    @property
    def lease_seed(self):
        # stable input for lease secret generation; same as the tubid
        return self.tubid

    @classmethod
    def from_announcement(cls, server_id, furl, ann, storage_server):
        """
        Create an instance from a fURL and an announcement like::

            {"permutation-seed-base32": "...",
             "nickname": "...",
             "grid-manager-certificates": [..],
            }

        *nickname* and *grid-manager-certificates* are optional.

        The furl will be a Unicode string on Python 3; on Python 2 it will be
        either a native (bytes) string or a Unicode string.
        """
        (nickname, permutation_seed, tubid, short_description, long_description) = _parse_announcement(server_id, furl.encode("utf-8"), ann)
        return cls(
            nickname=nickname,
            permutation_seed=permutation_seed,
            tubid=tubid,
            storage_server=storage_server,
            furl=furl.encode("utf-8"),
            short_description=short_description,
            long_description=long_description,
        )

    def connect_to(self, tub, got_connection):
        # see IFoolscapStorageServer.connect_to for the contract
        return tub.connectTo(self._furl, got_connection)
738
739
@implementer(IFoolscapStorageServer)
@define
class _NullStorage(object):
    """
    Abstraction for *not* communicating with a storage server of a type with
    which we can't communicate.
    """
    # placeholder values satisfying the IFoolscapStorageServer attributes
    nickname = ""
    permutation_seed = hashlib.sha256(b"").digest()
    tubid = hashlib.sha256(b"").digest()
    storage_server = None

    lease_seed = hashlib.sha256(b"").digest()

    name = "<unsupported>"
    longname: str = "<storage with unsupported protocol>"

    def connect_to(self, tub, got_connection):
        # never connects; hand back a Reconnector-alike that does nothing
        return NonReconnector()
759
760
class NonReconnector(object):
    """
    A ``foolscap.reconnector.Reconnector``-alike that doesn't do anything.
    """

    def stopConnecting(self):
        """There is no connection attempt to stop; do nothing."""

    def reset(self):
        """There is no state to reset; do nothing."""

    def getReconnectionInfo(self):
        """Return a fresh, empty ``ReconnectionInfo``."""
        return ReconnectionInfo()
773
774
class AnnouncementNotMatched(Exception):
    """
    Raised when a storage server announcement offers no storage-option that
    any locally enabled plugin can handle.
    """
780
781
@attr.s(auto_exc=True)
class MissingPlugin(Exception):
    """
    The configuration requested a plugin by name but no implementation with
    that name could be found.
    """

    # the name of the plugin that was requested
    plugin_name = attr.ib()

    def __str__(self):
        return "Missing plugin '{}'".format(self.plugin_name)
792
793
794def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref):
795    """
796    Construct an ``IStorageServer`` from the most locally-preferred plugin
797    that is offered in the given announcement.
798
799    :param allmydata.node._Config node_config: The node configuration to
800        pass to the plugin.
801
802    :param dict announcement: The storage announcement for the storage
803        server we should build
804    """
805    storage_options = announcement.get(u"storage-options", [])
806    plugins = config.get_configured_storage_plugins()
807
808    # for every storage-option that we have enabled locally (in order
809    # of preference), see if the announcement asks for such a thing.
810    # if it does, great: we return that storage-client
811    # otherwise we've run out of options...
812
813    for options in storage_options:
814        try:
815            plugin = plugins[options[u"name"]]
816        except KeyError:
817            # we didn't configure this kind of plugin locally, so
818            # consider the next announced option
819            continue
820
821        furl = options[u"storage-server-FURL"]
822        return furl, plugin.get_storage_client(
823            node_config,
824            options,
825            get_rref,
826        )
827
828    # none of the storage options in the announcement are configured
829    # locally; we can't make a storage-client.
830    plugin_names = ", ".join(sorted(option["name"] for option in storage_options))
831    raise AnnouncementNotMatched(plugin_names)
832
833
834def _available_space_from_version(version):
835    if version is None:
836        return None
837    protocol_v1_version = version.get(b'http://allmydata.org/tahoe/protocols/storage/v1', BytesKeyDict())
838    available_space = protocol_v1_version.get(b'available-space')
839    if available_space is None:
840        available_space = protocol_v1_version.get(b'maximum-immutable-share-size', None)
841    return available_space
842
843
def _make_storage_system(
        node_config: _Config,
        config: StorageClientConfig,
        ann: dict,
        server_id: bytes,
        get_rref: Callable[[], Optional[IRemoteReference]],
) -> IFoolscapStorageServer:
    """
    Create an object for interacting with the storage server described by
    the given announcement.

    :param node_config: The node configuration to pass to any configured
        storage plugins.

    :param config: Configuration specifying desired storage client behavior.

    :param ann: The storage announcement from the storage server we are meant
        to communicate with.

    :param server_id: The unique identifier for the server.

    :param get_rref: A function which returns a remote reference to the
        server-side object which implements this storage system, if one is
        available (otherwise None).

    :return: An object enabling communication via Foolscap with the server
        which generated the announcement.
    """
    # First preference: a locally configured storage plugin matching one of
    # the announced storage options.
    try:
        furl, storage_server = _storage_from_foolscap_plugin(
            node_config,
            config,
            ann,
            # Pass an accessor for the remote reference rather than the
            # reference itself: it may be replaced over time as connections
            # are lost and re-established, and the _StorageServer should
            # always see the most up-to-date value.
            get_rref,
        )
    except AnnouncementNotMatched as e:
        # Remember a placeholder that carries a user-facing explanation.
        # It is only returned if the anonymous fallback below also fails
        # to match.
        unmatched = _NullStorage('{}: missing plugin "{}"'.format(server_id.decode("utf8"), str(e)))
    else:
        return _FoolscapStorage.from_announcement(
            server_id,
            furl,
            ann,
            storage_server,
        )

    # Second preference: the anonymous access scheme.
    anonymous_furl = ann.get(u"anonymous-storage-FURL")
    if anonymous_furl is not None:
        # See the comment above about why get_rref is passed as an accessor.
        return _FoolscapStorage.from_announcement(
            server_id,
            anonymous_furl,
            ann,
            _StorageServer(get_rref=get_rref),
        )

    # Nothing matched; hand back the placeholder describing the failure.
    # (This point is only reachable after the except branch above ran.)
    return unmatched
919
920
@implementer(IServer)
class NativeStorageServer(service.MultiService):
    """I hold information about a storage server that we want to connect to.
    If we are connected, I hold the RemoteReference, their host address, and
    their version information. I remember information about when we were
    last connected too, even if we aren't currently connected.

    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection

    @ivar version: the server's versiondict, from the most recent announcement
    @ivar nickname: the server's self-reported nickname (unicode), same

    @ivar rref: the RemoteReference, if connected, otherwise None
    """

    # Conservative assumptions used until the server reports its real
    # version info (merged in by add_version_to_remote_reference, see
    # _got_connection below).
    VERSION_DEFAULTS = UnicodeKeyDict({
        "http://allmydata.org/tahoe/protocols/storage/v1" :
        UnicodeKeyDict({ "maximum-immutable-share-size": 2**32 - 1,
          "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
          "tolerates-immutable-read-overrun": False,
          "delete-mutable-shares-with-zero-length-writev": False,
          "available-space": None,
          }),
        "application-version": "unknown: no get_version()",
        })

    def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=None,
                 grid_manager_verifier=None):
        """
        :param bytes server_id: unique identifier for this server.
        :param dict ann: the server's announcement.
        :param tub_maker: callable taking handler overrides and returning a
            Foolscap Tub (called lazily in start_connecting).
        :param handler_overrides: connection-handler overrides for tub_maker.
        :param node_config: node configuration, handed to storage plugins.
        :param config: optional StorageClientConfig; a default instance is
            created when omitted.
        :param grid_manager_verifier: optional zero-argument callable saying
            whether this server currently holds a valid Grid Manager
            certificate (see upload_permitted).
        """
        service.MultiService.__init__(self)
        assert isinstance(server_id, bytes)
        self._server_id = server_id
        self.announcement = ann
        self._tub_maker = tub_maker
        self._handler_overrides = handler_overrides

        if config is None:
            config = StorageClientConfig()

        self._grid_manager_verifier = grid_manager_verifier

        # Choose the Foolscap backend (plugin-based, anonymous, or a null
        # placeholder) that matches the announcement.
        self._storage = _make_storage_system(node_config, config, ann, self._server_id, self.get_rref)

        self.last_connect_time = None
        self.last_loss_time = None
        self._rref = None
        self._is_connected = False
        self._reconnector = None
        self._trigger_cb = None
        self._on_status_changed = ObserverList()

    def upload_permitted(self):
        """
        If our client is configured with Grid Manager public-keys, we will
        only upload to storage servers that have a currently-valid
        certificate signed by at least one of the Grid Managers we
        accept.

        :return: True if we should use this server for uploads, False
            otherwise.
        """
        # if we have no Grid Manager keys configured, choice is easy
        if self._grid_manager_verifier is None:
            return True
        return self._grid_manager_verifier()

    # The following accessors simply delegate to the chosen storage backend.
    def get_permutation_seed(self):
        return self._storage.permutation_seed
    def get_name(self): # keep methodname short
        # TODO: decide who adds [] in the short description. It should
        # probably be the output side, not here.
        return self._storage.name
    def get_longname(self):
        return self._storage.longname
    def get_tubid(self):
        return self._storage.tubid
    def get_lease_seed(self):
        return self._storage.lease_seed
    def get_foolscap_write_enabler_seed(self):
        # The write-enabler seed is derived from the tubid.
        return self._storage.tubid
    def get_nickname(self):
        return self._storage.nickname

    def on_status_changed(self, status_changed):
        """
        :param status_changed: a callable taking a single arg (the
            NativeStorageServer) that is notified when we become connected
        """
        return self._on_status_changed.subscribe(status_changed)

    # Special methods used by copy.copy() and copy.deepcopy(). When those are
    # used in allmydata.immutable.filenode to copy CheckResults during
    # repair, we want it to treat the IServer instances as singletons, and
    # not attempt to duplicate them..
    def __copy__(self):
        return self
    def __deepcopy__(self, memodict):
        return self

    def __repr__(self):
        return "<NativeStorageServer for %r>" % self.get_name()
    def get_serverid(self):
        return self._server_id
    def get_version(self):
        # The version dict lives on the remote reference; None if we have
        # never connected.
        if self._rref:
            return self._rref.version
        return None
    def get_announcement(self):
        return self.announcement

    def get_connection_status(self):
        last_received = None
        if self._rref:
            last_received = self._rref.getDataLastReceivedAt()
        return connection_status.from_foolscap_reconnector(self._reconnector,
                                                           last_received)

    def is_connected(self):
        return self._is_connected

    def get_available_space(self):
        version = self.get_version()
        return _available_space_from_version(version)

    def start_connecting(self, trigger_cb):
        """
        Create our Tub and begin (re-)connection attempts; trigger_cb is
        fired (via eventually) each time a connection is established.
        """
        self._tub = self._tub_maker(self._handler_overrides)
        self._tub.setServiceParent(self)
        self._trigger_cb = trigger_cb
        self._reconnector = self._storage.connect_to(self._tub, self._got_connection)

    def _got_connection(self, rref):
        # Fetch (or default) the server's version info before declaring
        # ourselves connected.
        lp = log.msg(format="got connection to %(name)s, getting versions",
                     name=self.get_name(),
                     facility="tahoe.storage_broker", umid="coUECQ")
        if self._trigger_cb:
            eventually(self._trigger_cb)
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addCallback(lambda ign: self._on_status_changed.notify(self))
        d.addErrback(log.err, format="storageclient._got_connection",
                     name=self.get_name(), umid="Sdq3pg")

    def _got_versioned_service(self, rref, lp):
        """
        Record the now-versioned remote reference and mark us connected.
        """
        log.msg(format="%(name)s provided version info %(version)s",
                name=self.get_name(), version=rref.version,
                facility="tahoe.storage_broker", umid="SWmJYg",
                level=log.NOISY, parent=lp)

        self.last_connect_time = time.time()
        self._rref = rref
        self._is_connected = True
        rref.notifyOnDisconnect(self._lost)

    def get_rref(self):
        return self._rref

    def get_storage_server(self):
        """
        See ``IServer.get_storage_server``.
        """
        if self._rref is None:
            return None
        return self._storage.storage_server

    def _lost(self):
        log.msg(format="lost connection to %(name)s", name=self.get_name(),
                facility="tahoe.storage_broker", umid="zbRllw")
        self.last_loss_time = time.time()
        # self._rref is now stale: all callRemote()s will get a
        # DeadReferenceError. We leave the stale reference in place so that
        # uploader/downloader code (which received this IServer through
        # get_connected_servers() or get_servers_for_psi()) can continue to
        # use s.get_rref().callRemote() and not worry about it being None.
        self._is_connected = False

    def stop_connecting(self):
        # used when this descriptor has been superseded by another
        self._reconnector.stopConnecting()

    def try_to_connect(self):
        # used when the broker wants us to hurry up
        self._reconnector.reset()
1104
1105
@async_to_deferred
async def _pick_a_http_server(
        reactor,
        nurls: list[DecodedURL],
        request: Callable[[object, DecodedURL], defer.Deferred[object]]
) -> DecodedURL:
    """Pick the first server we successfully send a request to.

    All NURLs are tried in parallel and the ``DecodedURL`` of the first
    successful request is returned.  Note the non-optional return type:
    ``None`` is never returned; if no request succeeds the awaited race
    fails instead, propagating an error to our caller.

    :param reactor: passed through as the first argument of ``request``.
    :param nurls: candidate NURLs, presumably all for the same server.
    :param request: issues a request to a given NURL; only success/failure
        of its Deferred matters here, not its result value.
    """
    requests = []
    for nurl in nurls:
        # Bind the current nurl as a default argument so each callback
        # captures its own URL (avoids the late-binding closure pitfall).
        def to_nurl(_: object, nurl: DecodedURL=nurl) -> DecodedURL:
            return nurl

        requests.append(request(reactor, nurl).addCallback(to_nurl))

    queries: defer.Deferred[tuple[int, DecodedURL]] = race(requests)
    _, nurl = await queries
    return nurl
1127
1128
@implementer(IServer)
class HTTPNativeStorageServer(service.MultiService):
    """
    Like ``NativeStorageServer``, but for HTTP clients.

    The notion of being "connected" is less meaningful for HTTP; we just poll
    occasionally, and if we've succeeded at last poll, we assume we're
    "connected".
    """

    def __init__(self, server_id: bytes, announcement, default_connection_handlers: dict[str,str], reactor=reactor, grid_manager_verifier=None, tor_provider: Optional[TorProvider]=None):
        """
        :param server_id: unique identifier for this server.
        :param announcement: the server's announcement; must contain both
            an anonymous-storage-FURL and NURLs.
        :param default_connection_handlers: connection-handler configuration
            for the StorageClientFactory.
        :param reactor: the reactor to schedule polling with.
        :param grid_manager_verifier: optional zero-argument callable saying
            whether this server currently holds a valid Grid Manager
            certificate (see upload_permitted).
        :param tor_provider: optional Tor provider for the client factory.
        """
        service.MultiService.__init__(self)
        assert isinstance(server_id, bytes)
        self._server_id = server_id
        self.announcement = announcement
        self._on_status_changed = ObserverList()
        self._reactor = reactor
        self._grid_manager_verifier = grid_manager_verifier
        self._storage_client_factory = StorageClientFactory(
            default_connection_handlers, tor_provider
        )

        # The Foolscap FURL from the announcement is still used to derive
        # this server's identifiers (nickname, permutation seed, tubid...).
        furl = announcement["anonymous-storage-FURL"].encode("utf-8")
        (
            self._nickname,
            self._permutation_seed,
            self._tubid,
            self._short_description,
            self._long_description
        ) = _parse_announcement(server_id, furl, announcement)
        # Candidate HTTP endpoints for this server; raced against each
        # other in _pick_server_and_get_version().
        self._nurls = [
            DecodedURL.from_text(u)
            for u in announcement[ANONYMOUS_STORAGE_NURLS]
        ]
        self._istorage_server : Optional[_HTTPStorageServer] = None

        self._connection_status = connection_status.ConnectionStatus.unstarted()
        self._version = None
        self._last_connect_time = None
        # In-flight _connect() Deferred, if any; kept so it can be cancelled.
        self._connecting_deferred : Optional[defer.Deferred[object]]= None

    def get_permutation_seed(self):
        return self._permutation_seed

    def get_name(self):
        return self._short_description

    def get_longname(self):
        return self._long_description

    def get_tubid(self):
        return self._tubid

    def get_lease_seed(self):
        # Apparently this is what Foolscap version above does?!
        return self._tubid

    def get_foolscap_write_enabler_seed(self):
        return self._tubid

    def get_nickname(self):
        return self._nickname

    def on_status_changed(self, status_changed):
        """
        :param status_changed: a callable taking a single arg (the
            NativeStorageServer) that is notified when we become connected
        """
        return self._on_status_changed.subscribe(status_changed)

    def upload_permitted(self):
        """
        If our client is configured with Grid Manager public-keys, we will
        only upload to storage servers that have a currently-valid
        certificate signed by at least one of the Grid Managers we
        accept.

        :return: True if we should use this server for uploads, False
            otherwise.
        """
        # if we have no Grid Manager keys configured, choice is easy
        if self._grid_manager_verifier is None:
            return True
        return self._grid_manager_verifier()

    # Special methods used by copy.copy() and copy.deepcopy(). When those are
    # used in allmydata.immutable.filenode to copy CheckResults during
    # repair, we want it to treat the IServer instances as singletons, and
    # not attempt to duplicate them..
    def __copy__(self):
        return self

    def __deepcopy__(self, memodict):
        return self

    def __repr__(self):
        return "<HTTPNativeStorageServer for %r>" % self.get_name()

    def get_serverid(self):
        return self._server_id

    def get_version(self):
        return self._version

    def get_announcement(self):
        return self.announcement

    def get_connection_status(self):
        return self._connection_status

    def is_connected(self):
        return self._connection_status.connected

    def get_available_space(self):
        version = self.get_version()
        return _available_space_from_version(version)

    def start_connecting(self, trigger_cb):
        # Poll the server every second; trigger_cb is accepted for
        # interface compatibility with NativeStorageServer but not used
        # here (status changes are reported via on_status_changed).
        self._lc = LoopingCall(self._connect)
        self._lc.start(1, True)

    def _got_version(self, version):
        # A poll succeeded: record the version and report ourselves
        # connected.
        self._last_connect_time = time.time()
        self._version = version
        self._connection_status = connection_status.ConnectionStatus(
            True, "connected", [], self._last_connect_time, self._last_connect_time
        )
        self._on_status_changed.notify(self)

    def _failed_to_connect(self, reason):
        # A poll failed: report ourselves disconnected with the reason.
        self._connection_status = connection_status.ConnectionStatus(
            False, f"failure: {reason}", [], self._last_connect_time, self._last_connect_time
        )
        self._on_status_changed.notify(self)

    def get_storage_server(self):
        """
        See ``IServer.get_storage_server``.
        """
        if self._connection_status.summary == "unstarted":
            return None
        return self._istorage_server

    def stop_connecting(self):
        # NOTE(review): assumes start_connecting() was called first, so
        # self._lc exists -- confirm against callers.
        self._lc.stop()
        if self._connecting_deferred is not None:
            self._connecting_deferred.cancel()

    def try_to_connect(self):
        self._connect()

    def _connect(self) -> defer.Deferred[object]:
        """
        Try to connect to a working storage server.

        If called while a previous ``_connect()`` is already running, it will
        just return the same ``Deferred``.

        ``LoopingCall.stop()`` doesn't cancel ``Deferred``s, unfortunately:
        https://github.com/twisted/twisted/issues/11814. Thus we want to store
        the ``Deferred`` so we can cancel it when necessary.

        We also want to return it so that loop iterations take it into account,
        and a new iteration doesn't start while we're in the middle of the
        previous one.
        """
        # Conceivably try_to_connect() was called on this before, in which case
        # we already are in the middle of connecting. So in that case just
        # return whatever is in progress:
        if self._connecting_deferred is not None:
            return self._connecting_deferred

        def done(_):
            self._connecting_deferred = None

        connecting = self._pick_server_and_get_version()
        # Set a short timeout since we're relying on this for server liveness.
        connecting = connecting.addTimeout(5, self._reactor).addCallbacks(
            self._got_version, self._failed_to_connect
        ).addBoth(done)
        self._connecting_deferred = connecting
        return connecting

    @async_to_deferred
    async def _pick_server_and_get_version(self):
        """
        Minimal implementation of connection logic: pick a server, get its
        version.  This doesn't deal with errors much, so as to minimize
        statefulness.  It does change ``self._istorage_server``, so possibly
        more refactoring would be useful to remove even that much statefulness.
        """
        async def get_istorage_server() -> _HTTPStorageServer:
            if self._istorage_server is not None:
                return self._istorage_server

            # We haven't selected a server yet, so let's do so.

            # TODO This is somewhat inefficient on startup: it takes two successful
            # version() calls before we are live talking to a server, it could only
            # be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992

            @async_to_deferred
            async def request(reactor, nurl: DecodedURL):
                # Since we're just using this one off to check if the NURL
                # works, no need for persistent pool or other fanciness.
                pool = HTTPConnectionPool(reactor, persistent=False)
                pool.retryAutomatically = False
                storage_client = await self._storage_client_factory.create_storage_client(
                    nurl, reactor, pool
                )
                return await StorageClientGeneral(storage_client).get_version()

            nurl = await _pick_a_http_server(reactor, self._nurls, request)

            # If we've gotten this far, we've found a working NURL.
            storage_client = await self._storage_client_factory.create_storage_client(
                    nurl, cast(IReactorTime, reactor), None
            )
            self._istorage_server = _HTTPStorageServer.from_http_client(storage_client)
            return self._istorage_server

        try:
            storage_server = await get_istorage_server()

            # Get the version from the remote server.
            version = await storage_server.get_version()
            return version
        except Exception as e:
            log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS)
            raise

    def stopService(self):
        # NOTE(review): assumes start_connecting() was called first, so
        # self._lc exists -- confirm against callers.
        if self._connecting_deferred is not None:
            self._connecting_deferred.cancel()

        result = service.MultiService.stopService(self)
        if self._lc.running:
            self._lc.stop()
        self._failed_to_connect("shut down")

        # Delay service shutdown until the HTTP client has finished its own
        # shutdown, if one was ever created.
        if self._istorage_server is not None:
            client_shutting_down = self._istorage_server._http_client.shutdown()
            result.addCallback(lambda _: client_shutting_down)

        return result
1374
1375
class UnknownServerTypeError(Exception):
    """
    A storage server of a type this client does not know how to talk to.
    """
1378
1379
@implementer(IStorageServer)
@attr.s
class _StorageServer(object):
    """
    ``_StorageServer`` is a direct pass-through to an ``RIStorageServer`` via
    a ``RemoteReference``.

    Each method issues the remote call of the same name; the returned
    Deferreds come straight from ``callRemote``.
    """
    # Zero-argument callable returning the current remote reference; the
    # reference may be replaced over time as connections come and go.
    _get_rref = attr.ib()

    @property
    def _rref(self):
        # Always re-fetch so we use the freshest remote reference.
        return self._get_rref()

    def get_version(self):
        """See ``IStorageServer.get_version``."""
        return self._rref.callRemote(
            "get_version",
        )

    def allocate_buckets(
            self,
            storage_index,
            renew_secret,
            cancel_secret,
            sharenums,
            allocated_size,
            canary,
    ):
        """See ``IStorageServer.allocate_buckets``."""
        return self._rref.callRemote(
            "allocate_buckets",
            storage_index,
            renew_secret,
            cancel_secret,
            sharenums,
            allocated_size,
            canary,
        )

    def add_lease(
            self,
            storage_index,
            renew_secret,
            cancel_secret,
    ):
        """See ``IStorageServer.add_lease``."""
        return self._rref.callRemote(
            "add_lease",
            storage_index,
            renew_secret,
            cancel_secret,
        )

    def get_buckets(
            self,
            storage_index,
    ):
        """See ``IStorageServer.get_buckets``."""
        return self._rref.callRemote(
            "get_buckets",
            storage_index,
        )

    def slot_readv(
            self,
            storage_index,
            shares,
            readv,
    ):
        """See ``IStorageServer.slot_readv``."""
        return self._rref.callRemote(
            "slot_readv",
            storage_index,
            shares,
            readv,
        )

    def slot_testv_and_readv_and_writev(
            self,
            storage_index,
            secrets,
            tw_vectors,
            r_vector,
    ):
        """See ``IStorageServer.slot_testv_and_readv_and_writev``."""
        # Match the wire protocol, which requires 4-tuples for test vectors.
        wire_format_tw_vectors = {
            key: (
                [(start, length, b"eq", data) for (start, length, data) in value[0]],
                value[1],
                value[2],
            ) for (key, value) in tw_vectors.items()
        }
        return self._rref.callRemote(
            "slot_testv_and_readv_and_writev",
            storage_index,
            secrets,
            wire_format_tw_vectors,
            r_vector,
        )

    def advise_corrupt_share(
            self,
            share_type,
            storage_index,
            shnum,
            reason,
    ):
        """
        See ``IStorageServer.advise_corrupt_share``.

        Failures of the remote call are logged and swallowed (this is a
        best-effort advisory message).
        """
        return self._rref.callRemote(
            "advise_corrupt_share",
            share_type,
            storage_index,
            shnum,
            reason,
        ).addErrback(log.err, "Error from remote call to advise_corrupt_share")
1489
1490
1491
@attr.s(hash=True)
class _FakeRemoteReference(object):
    """
    Emulate a Foolscap RemoteReference, calling a local object instead.
    """
    # The local object whose methods stand in for remote methods.
    local_object = attr.ib(type=object)

    @defer.inlineCallbacks
    def callRemote(self, action, *args, **kwargs):
        """
        Invoke ``action`` on the wrapped local object, translating
        HTTP-layer errors into ``RemoteException`` just as Foolscap would
        surface remote failures.

        :param action: name of the method to invoke on the local object.

        :return: a ``Deferred`` firing with the method's result, or failing
            with ``RemoteException`` if the HTTP client reported an error.
        """
        try:
            result = yield getattr(self.local_object, action)(*args, **kwargs)
            defer.returnValue(result)
        except HTTPClientException as e:
            # Chain the original exception (PEP 3134) so the HTTP-level
            # traceback is preserved for debugging instead of being lost.
            raise RemoteException((e.code, e.message, e.body)) from e
1506
1507
@attr.s
class _HTTPBucketWriter(object):
    """
    Emulate a ``RIBucketWriter``, but use HTTP protocol underneath.
    """
    client = attr.ib(type=StorageClientImmutables)
    storage_index = attr.ib(type=bytes)
    share_number = attr.ib(type=int)
    upload_secret = attr.ib(type=bytes)
    # Fires with True once the server reports the upload as complete.
    finished = attr.ib(type=defer.Deferred[bool], factory=defer.Deferred)

    def abort(self):
        """Abandon the in-progress upload on the server."""
        return self.client.abort_upload(self.storage_index, self.share_number,
                                        self.upload_secret)

    @defer.inlineCallbacks
    def write(self, offset, data):
        """
        Upload one chunk of share data; fire ``self.finished`` if the
        server reports that this chunk completed the share.
        """
        result = yield self.client.write_share_chunk(
            self.storage_index, self.share_number, self.upload_secret, offset, data
        )
        if result.finished:
            self.finished.callback(True)
        defer.returnValue(None)

    def close(self):
        # We're not _really_ closed until all writes have succeeded and we
        # finished writing all the data.
        return self.finished
1536
1537
def _ignore_404(failure: Failure) -> Optional[Failure]:
    """
    Turn an HTTP NOT FOUND failure into a success (``None``); pass any
    other failure through.

    Useful for advise_corrupt_share(), since it swallows unknown share numbers
    in Foolscap.
    """
    is_not_found = (
        failure.check(HTTPClientException) is not None
        and failure.value.code == http.NOT_FOUND
    )
    if is_not_found:
        return None
    return failure
1547
1548
@attr.s(hash=True)
class _HTTPBucketReader(object):
    """
    Emulate a ``RIBucketReader``, but use HTTP protocol underneath.
    """
    client = attr.ib(type=StorageClientImmutables)
    storage_index = attr.ib(type=bytes)
    share_number = attr.ib(type=int)

    def read(self, offset, length):
        """Read ``length`` bytes of share data starting at ``offset``."""
        return self.client.read_share_chunk(
            self.storage_index, self.share_number, offset, length
        )

    def advise_corrupt_share(self, reason):
        """
        Report share corruption to the server.  A NOT FOUND response
        (unknown share number) is swallowed, matching Foolscap behavior.
        """
        decoded_reason = str(reason, "utf-8", errors="backslashreplace")
        d = self.client.advise_corrupt_share(
            self.storage_index, self.share_number, decoded_reason
        )
        return d.addErrback(_ignore_404)
1568
1569
1570# WORK IN PROGRESS, for now it doesn't actually implement whole thing.
1571@implementer(IStorageServer)  # type: ignore
1572@attr.s
1573class _HTTPStorageServer(object):
1574    """
1575    Talk to remote storage server over HTTP.
1576    """
1577    _http_client = attr.ib(type=StorageClient)
1578
1579    @staticmethod
1580    def from_http_client(http_client: StorageClient) -> _HTTPStorageServer:
1581        """
1582        Create an ``IStorageServer`` from a HTTP ``StorageClient``.
1583        """
1584        return _HTTPStorageServer(http_client=http_client)
1585
    def get_version(self) -> defer.Deferred[VersionMessage]:
        """
        See ``IStorageServer.get_version``.

        Fetch the remote server's version information over HTTP.
        """
        return StorageClientGeneral(self._http_client).get_version()
1588
1589    @defer.inlineCallbacks
1590    def allocate_buckets(
1591            self,
1592            storage_index,
1593            renew_secret,
1594            cancel_secret,
1595            sharenums,
1596            allocated_size,
1597            canary
1598    ):
1599        upload_secret = urandom(20)
1600        immutable_client = StorageClientImmutables(self._http_client)
1601        result = immutable_client.create(
1602            storage_index, sharenums, allocated_size, upload_secret, renew_secret,
1603            cancel_secret
1604        )
1605        result = yield result
1606        defer.returnValue(
1607            (result.already_have, {
1608                 share_num: _FakeRemoteReference(_HTTPBucketWriter(
1609                     client=immutable_client,
1610                     storage_index=storage_index,
1611                     share_number=share_num,
1612                     upload_secret=upload_secret
1613                 ))
1614                 for share_num in result.allocated
1615            })
1616        )
1617
1618    @defer.inlineCallbacks
1619    def get_buckets(
1620            self,
1621            storage_index
1622    ):
1623        immutable_client = StorageClientImmutables(self._http_client)
1624        share_numbers = yield immutable_client.list_shares(
1625            storage_index
1626        )
1627        defer.returnValue({
1628            share_num: _FakeRemoteReference(_HTTPBucketReader(
1629                immutable_client, storage_index, share_num
1630            ))
1631            for share_num in share_numbers
1632        })
1633
    @async_to_deferred
    async def add_lease(
        self,
        storage_index,
        renew_secret,
        cancel_secret
    ):
        """
        See ``IStorageServer.add_lease``.

        Add or renew a lease over HTTP.  A NOT FOUND response (no shares
        for this storage index) is silently ignored, matching the Foolscap
        client's behavior.
        """
        client = StorageClientGeneral(self._http_client)
        try:
            await client.add_or_renew_lease(
                storage_index, renew_secret, cancel_secret
            )
        except ClientException as e:
            if e.code == http.NOT_FOUND:
                # Silently do nothing, as is the case for the Foolscap client
                return
            raise
1651
1652    def advise_corrupt_share(
1653        self,
1654        share_type,
1655        storage_index,
1656        shnum,
1657        reason: bytes
1658    ):
1659        if share_type == b"immutable":
1660            client : Union[StorageClientImmutables, StorageClientMutables] = StorageClientImmutables(self._http_client)
1661        elif share_type == b"mutable":
1662            client = StorageClientMutables(self._http_client)
1663        else:
1664            raise ValueError("Unknown share type")
1665        return client.advise_corrupt_share(
1666            storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
1667        ).addErrback(_ignore_404)
1668
1669    @defer.inlineCallbacks
1670    def slot_readv(self, storage_index, shares, readv):
1671        mutable_client = StorageClientMutables(self._http_client)
1672        pending_reads = {}
1673        reads = {}
1674        # If shares list is empty, that means list all shares, so we need
1675        # to do a query to get that.
1676        if not shares:
1677            shares = yield mutable_client.list_shares(storage_index)
1678
1679        # Start all the queries in parallel:
1680        for share_number in shares:
1681            share_reads = defer.gatherResults(
1682                [
1683                    mutable_client.read_share_chunk(
1684                        storage_index, share_number, offset, length
1685                    )
1686                    for (offset, length) in readv
1687                ]
1688            )
1689            pending_reads[share_number] = share_reads
1690
1691        # Wait for all the queries to finish:
1692        for share_number, pending_result in pending_reads.items():
1693            reads[share_number] = yield pending_result
1694
1695        return reads
1696
1697    @defer.inlineCallbacks
1698    def slot_testv_and_readv_and_writev(
1699            self,
1700            storage_index,
1701            secrets,
1702            tw_vectors,
1703            r_vector,
1704    ):
1705        mutable_client = StorageClientMutables(self._http_client)
1706        we_secret, lr_secret, lc_secret = secrets
1707        client_tw_vectors = {}
1708        for share_num, (test_vector, data_vector, new_length) in tw_vectors.items():
1709            client_test_vectors = [
1710                TestVector(offset=offset, size=size, specimen=specimen)
1711                for (offset, size, specimen) in test_vector
1712            ]
1713            client_write_vectors = [
1714                WriteVector(offset=offset, data=data) for (offset, data) in data_vector
1715            ]
1716            client_tw_vectors[share_num] = TestWriteVectors(
1717                test_vectors=client_test_vectors,
1718                write_vectors=client_write_vectors,
1719                new_length=new_length
1720            )
1721        client_read_vectors = [
1722            ReadVector(offset=offset, size=size)
1723            for (offset, size) in r_vector
1724        ]
1725        try:
1726            client_result = yield mutable_client.read_test_write_chunks(
1727                storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors,
1728                client_read_vectors,
1729            )
1730        except ClientException as e:
1731            if e.code == http.UNAUTHORIZED:
1732                raise RemoteException("Unauthorized write, possibly you passed the wrong write enabler?")
1733            raise
1734        return (client_result.success, client_result.reads)
Note: See TracBrowser for help on using the repository browser.