source: trunk/src/allmydata/storage_client.py @ e113cba

Last change on this file since e113cba was d4b5de2e, checked in by GitHub <noreply@…>, at 2019-08-23T13:04:23Z

Merge pull request #653 from tahoe-lafs/3248.pass-config-to-get_storage_client

Pass the full _Config to IFoolscapStoragePlugin.get_storage_client

Fixes: ticket:3248

  • Property mode set to 100644
File size: 32.6 KB
Line 
1
2"""
3I contain the client-side code which speaks to storage servers, in particular
4the foolscap-based server implemented in src/allmydata/storage/*.py .
5"""
6
7# roadmap:
8#
9# 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to
10# create it, change uploader/servermap to get rrefs from it. ServerFarm calls
11# IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs
12# to clients. webapi status pages call broker.get_info_about_serverid.
13#
14# 2: move get_info methods to the descriptor, webapi status pages call
15# broker.get_descriptor_for_serverid().get_info
16#
17# 3?later?: store descriptors in UploadResults/etc instead of serverids,
18# webapi status pages call descriptor.get_info and don't use storage_broker
19# or Client
20#
21# 4: enable static config: tahoe.cfg can add descriptors. Make the introducer
22# optional. This closes #467
23#
24# 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other
25# clients. Clients stop doing callRemote(), use NativeStorageClient methods
26# instead (which might do something else, i.e. http or whatever). The
27# introducer and tahoe.cfg only create NativeStorageClients for now.
28#
29# 6: implement other sorts of IStorageClient classes: S3, etc
30
31
32import re, time, hashlib
33from ConfigParser import (
34    NoSectionError,
35)
36import attr
37from zope.interface import (
38    Attribute,
39    Interface,
40    implementer,
41)
42from twisted.internet import defer
43from twisted.application import service
44from twisted.plugin import (
45    getPlugins,
46)
47from eliot import (
48    log_call,
49)
50from foolscap.api import eventually
51from foolscap.reconnector import (
52    ReconnectionInfo,
53)
54from allmydata.interfaces import (
55    IStorageBroker,
56    IDisplayableServer,
57    IServer,
58    IStorageServer,
59    IFoolscapStoragePlugin,
60)
61from allmydata.util import log, base32, connection_status
62from allmydata.util.assertutil import precondition
63from allmydata.util.observer import ObserverList
64from allmydata.util.rrefutil import add_version_to_remote_reference
65from allmydata.util.hashutil import permute_server_hash
66
67# who is responsible for de-duplication?
68#  both?
69#  IC remembers the unpacked announcements it receives, to provide for late
70#  subscribers and to remove duplicates
71
72# if a client subscribes after startup, will they receive old announcements?
73#  yes
74
75# who will be responsible for signature checking?
76#  make it be IntroducerClient, so they can push the filter outwards and
77#  reduce inbound network traffic
78
79# what should the interface between StorageFarmBroker and IntroducerClient
80# look like?
81#  don't pass signatures: only pass validated blessed-objects
82
83@attr.s
84class StorageClientConfig(object):
85    """
86    Configuration for a node acting as a storage client.
87
88    :ivar preferred_peers: An iterable of the server-ids (``bytes``) of the
89        storage servers where share placement is preferred, in order of
90        decreasing preference.  See the *[client]peers.preferred*
91        documentation for details.
92
93    :ivar dict[unicode, dict[bytes, bytes]] storage_plugins: A mapping from
94        names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the
95        respective configuration.
96    """
97    preferred_peers = attr.ib(default=())
98    storage_plugins = attr.ib(default=attr.Factory(dict))
99
100    @classmethod
101    def from_node_config(cls, config):
102        """
103        Create a ``StorageClientConfig`` from a complete Tahoe-LAFS node
104        configuration.
105
106        :param _Config config: The loaded Tahoe-LAFS node configuration.
107        """
108        ps = config.get_config("client", "peers.preferred", b"").split(b",")
109        preferred_peers = tuple([p.strip() for p in ps if p != b""])
110
111        enabled_storage_plugins = (
112            name.strip()
113            for name
114            in config.get_config(
115                b"client",
116                b"storage.plugins",
117                b"",
118            ).decode("utf-8").split(u",")
119            if name.strip()
120        )
121
122        storage_plugins = {}
123        for plugin_name in enabled_storage_plugins:
124            try:
125                plugin_config = config.items(b"storageclient.plugins." + plugin_name)
126            except NoSectionError:
127                plugin_config = []
128            storage_plugins[plugin_name] = dict(plugin_config)
129
130        return cls(
131            preferred_peers,
132            storage_plugins,
133        )
134
135
136@implementer(IStorageBroker)
137class StorageFarmBroker(service.MultiService):
138    """I live on the client, and know about storage servers. For each server
139    that is participating in a grid, I either maintain a connection to it or
140    remember enough information to establish a connection to it on demand.
141    I'm also responsible for subscribing to the IntroducerClient to find out
142    about new servers as they are announced by the Introducer.
143
144    :ivar StorageClientConfig storage_client_config: Values from the node
145        configuration file relating to storage behavior.
146    """
147
148    @property
149    def preferred_peers(self):
150        return self.storage_client_config.preferred_peers
151
152    def __init__(
153            self,
154            permute_peers,
155            tub_maker,
156            node_config,
157            storage_client_config=None,
158    ):
159        service.MultiService.__init__(self)
160        assert permute_peers # False not implemented yet
161        self.permute_peers = permute_peers
162        self._tub_maker = tub_maker
163
164        self.node_config = node_config
165
166        if storage_client_config is None:
167            storage_client_config = StorageClientConfig()
168        self.storage_client_config = storage_client_config
169
170        # self.servers maps serverid -> IServer, and keeps track of all the
171        # storage servers that we've heard about. Each descriptor manages its
172        # own Reconnector, and will give us a RemoteReference when we ask
173        # them for it.
174        self.servers = {}
175        self._static_server_ids = set() # ignore announcements for these
176        self.introducer_client = None
177        self._threshold_listeners = [] # tuples of (threshold, Deferred)
178        self._connected_high_water_mark = 0
179
180    @log_call(action_type=u"storage-client:broker:set-static-servers")
181    def set_static_servers(self, servers):
182        # Sorting the items gives us a deterministic processing order.  This
183        # doesn't really matter but it makes the logging behavior more
184        # predictable and easier to test (and at least one test does depend on
185        # this sorted order).
186        for (server_id, server) in sorted(servers.items()):
187            try:
188                storage_server = self._make_storage_server(server_id, server)
189            except Exception:
190                # TODO: The _make_storage_server failure is logged but maybe
191                # we should write a traceback here.  Notably, tests don't
192                # automatically fail just because we hit this case.  Well
193                # written tests will still fail if a surprising exception
194                # arrives here but they might be harder to debug without this
195                # information.
196                pass
197            else:
198                self._static_server_ids.add(server_id)
199                self.servers[server_id] = storage_server
200                storage_server.setServiceParent(self)
201                storage_server.start_connecting(self._trigger_connections)
202
203    def get_client_storage_plugin_web_resources(self, node_config):
204        """
205        Get all of the client-side ``IResource`` implementations provided by
206        enabled storage plugins.
207
208        :param allmydata.node._Config node_config: The complete node
209            configuration for the node from which these web resources will be
210            served.
211
212        :return dict[unicode, IResource]: Resources for all of the plugins.
213        """
214        plugins = {
215            plugin.name: plugin
216            for plugin
217            in getPlugins(IFoolscapStoragePlugin)
218        }
219        return {
220            name: plugins[name].get_client_resource(node_config)
221            for (name, config)
222            in self.storage_client_config.storage_plugins.items()
223        }
224
225    @log_call(
226        action_type=u"storage-client:broker:make-storage-server",
227        include_args=["server_id"],
228        include_result=False,
229    )
230    def _make_storage_server(self, server_id, server):
231        assert isinstance(server_id, unicode) # from YAML
232        server_id = server_id.encode("ascii")
233        handler_overrides = server.get("connections", {})
234        s = NativeStorageServer(
235            server_id,
236            server["ann"],
237            self._tub_maker,
238            handler_overrides,
239            self.node_config,
240            self.storage_client_config,
241        )
242        s.on_status_changed(lambda _: self._got_connection())
243        return s
244
245    def when_connected_enough(self, threshold):
246        """
247        :returns: a Deferred that fires if/when our high water mark for
248        number of connected servers becomes (or ever was) above
249        "threshold".
250        """
251        d = defer.Deferred()
252        self._threshold_listeners.append( (threshold, d) )
253        self._check_connected_high_water_mark()
254        return d
255
256    # these two are used in unit tests
257    def test_add_rref(self, serverid, rref, ann):
258        s = self._make_storage_server(
259            serverid.decode("ascii"),
260            {"ann": ann.copy()},
261        )
262        s._rref = rref
263        s._is_connected = True
264        self.servers[serverid] = s
265
266    def test_add_server(self, server_id, s):
267        s.on_status_changed(lambda _: self._got_connection())
268        self.servers[server_id] = s
269
270    def use_introducer(self, introducer_client):
271        self.introducer_client = ic = introducer_client
272        ic.subscribe_to("storage", self._got_announcement)
273
274    def _got_connection(self):
275        # this is called by NativeStorageServer when it is connected
276        self._check_connected_high_water_mark()
277
278    def _check_connected_high_water_mark(self):
279        current = len(self.get_connected_servers())
280        if current > self._connected_high_water_mark:
281            self._connected_high_water_mark = current
282
283        remaining = []
284        for threshold, d in self._threshold_listeners:
285            if self._connected_high_water_mark >= threshold:
286                eventually(d.callback, None)
287            else:
288                remaining.append( (threshold, d) )
289        self._threshold_listeners = remaining
290
291    def _got_announcement(self, key_s, ann):
292        precondition(isinstance(key_s, str), key_s)
293        precondition(key_s.startswith("v0-"), key_s)
294        precondition(ann["service-name"] == "storage", ann["service-name"])
295        server_id = key_s
296        if server_id in self._static_server_ids:
297            log.msg(format="ignoring announcement for static server '%(id)s'",
298                    id=server_id,
299                    facility="tahoe.storage_broker", umid="AlxzqA",
300                    level=log.UNUSUAL)
301            return
302        s = self._make_storage_server(
303            server_id.decode("utf-8"),
304            {u"ann": ann},
305        )
306        server_id = s.get_serverid()
307        old = self.servers.get(server_id)
308        if old:
309            if old.get_announcement() == ann:
310                return # duplicate
311            # replacement
312            del self.servers[server_id]
313            old.stop_connecting()
314            old.disownServiceParent()
315            # NOTE: this disownServiceParent() returns a Deferred that
316            # doesn't fire until Tub.stopService fires, which will wait for
317            # any existing connections to be shut down. This doesn't
318            # generally matter for normal runtime, but unit tests can run
319            # into DirtyReactorErrors if they don't block on these. If a test
320            # replaces one server with a newer version, then terminates
321            # before the old one has been shut down, it might get
322            # DirtyReactorErrors. The fix would be to gather these Deferreds
323            # into a structure that will block StorageFarmBroker.stopService
324            # until they have fired (but hopefully don't keep reference
325            # cycles around when they fire earlier than that, which will
326            # almost always be the case for normal runtime).
327        # now we forget about them and start using the new one
328        s.setServiceParent(self)
329        self.servers[server_id] = s
330        s.start_connecting(self._trigger_connections)
331        # the descriptor will manage their own Reconnector, and each time we
332        # need servers, we'll ask them if they're connected or not.
333
334    def _trigger_connections(self):
335        # when one connection is established, reset the timers on all others,
336        # to trigger a reconnection attempt in one second. This is intended
337        # to accelerate server connections when we've been offline for a
338        # while. The goal is to avoid hanging out for a long time with
339        # connections to only a subset of the servers, which would increase
340        # the chances that we'll put shares in weird places (and not update
341        # existing shares of mutable files). See #374 for more details.
342        for dsc in self.servers.values():
343            dsc.try_to_connect()
344
345    def get_servers_for_psi(self, peer_selection_index):
346        # return a list of server objects (IServers)
347        assert self.permute_peers == True
348        connected_servers = self.get_connected_servers()
349        preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
350        def _permuted(server):
351            seed = server.get_permutation_seed()
352            is_unpreferred = server not in preferred_servers
353            return (is_unpreferred,
354                    permute_server_hash(peer_selection_index, seed))
355        return sorted(connected_servers, key=_permuted)
356
357    def get_all_serverids(self):
358        return frozenset(self.servers.keys())
359
360    def get_connected_servers(self):
361        return frozenset([s for s in self.servers.values() if s.is_connected()])
362
363    def get_known_servers(self):
364        return frozenset(self.servers.values())
365
366    def get_nickname_for_serverid(self, serverid):
367        if serverid in self.servers:
368            return self.servers[serverid].get_nickname()
369        return None
370
371    def get_stub_server(self, serverid):
372        if serverid in self.servers:
373            return self.servers[serverid]
374        # some time before 1.12, we changed "serverid" to be "key_s" (the
375        # printable verifying key, used in V2 announcements), instead of the
376        # tubid. When the immutable uploader delegates work to a Helper,
377        # get_stub_server() is used to map the returning server identifiers
378        # to IDisplayableServer instances (to get a name, for display on the
379        # Upload Results web page). If the Helper is running 1.12 or newer,
380        # it will send pubkeys, but if it's still running 1.11, it will send
381        # tubids. This clause maps the old tubids to our existing servers.
382        for s in self.servers.values():
383            if isinstance(s, NativeStorageServer):
384                if serverid == s.get_tubid():
385                    return s
386        return StubServer(serverid)
387
388@implementer(IDisplayableServer)
389class StubServer(object):
390    def __init__(self, serverid):
391        self.serverid = serverid # binary tubid
392    def get_serverid(self):
393        return self.serverid
394    def get_name(self):
395        return base32.b2a(self.serverid)[:8]
396    def get_longname(self):
397        return base32.b2a(self.serverid)
398    def get_nickname(self):
399        return "?"
400
401
402class IFoolscapStorageServer(Interface):
403    """
404    An internal interface that mediates between ``NativeStorageServer`` and
405    Foolscap-based ``IStorageServer`` implementations.
406    """
407    nickname = Attribute("""
408    A name for this server for presentation to users.
409    """)
410    permutation_seed = Attribute("""
411    A stable value associated with this server which a client can use as an
412    input to the server selection permutation ordering.
413    """)
414    tubid = Attribute("""
415    The identifier for the Tub in which the server is run.
416    """)
417    storage_server = Attribute("""
418    An IStorageServer provide which implements a concrete Foolscap-based
419    protocol for communicating with the server.
420    """)
421    name = Attribute("""
422    Another name for this server for presentation to users.
423    """)
424    longname = Attribute("""
425    *Another* name for this server for presentation to users.
426    """)
427    lease_seed = Attribute("""
428    A stable value associated with this server which a client can use as an
429    input to a lease secret generation function.
430    """)
431
432    def connect_to(tub, got_connection):
433        """
434        Attempt to establish and maintain a connection to the server.
435
436        :param Tub tub: A Foolscap Tub from which the connection is to
437            originate.
438
439        :param got_connection: A one-argument callable which is called with a
440            Foolscap ``RemoteReference`` when a connection is established.
441            This may be called multiple times if the connection is lost and
442            then re-established.
443
444        :return foolscap.reconnector.Reconnector: An object which manages the
445            connection and reconnection attempts.
446        """
447
448
449@implementer(IFoolscapStorageServer)
450@attr.s(frozen=True)
451class _FoolscapStorage(object):
452    """
453    Abstraction for connecting to a storage server exposed via Foolscap.
454    """
455    nickname = attr.ib()
456    permutation_seed = attr.ib()
457    tubid = attr.ib()
458
459    storage_server = attr.ib(validator=attr.validators.provides(IStorageServer))
460
461    _furl = attr.ib()
462    _short_description = attr.ib()
463    _long_description = attr.ib()
464
465
466    @property
467    def name(self):
468        return self._short_description
469
470    @property
471    def longname(self):
472        return self._long_description
473
474    @property
475    def lease_seed(self):
476        return self.tubid
477
478    @classmethod
479    def from_announcement(cls, server_id, furl, ann, storage_server):
480        """
481        Create an instance from a fURL and an announcement like::
482
483            {"permutation-seed-base32": "...",
484             "nickname": "...",
485            }
486
487        *nickname* is optional.
488        """
489        m = re.match(r'pb://(\w+)@', furl)
490        assert m, furl
491        tubid_s = m.group(1).lower()
492        tubid = base32.a2b(tubid_s)
493        if "permutation-seed-base32" in ann:
494            ps = base32.a2b(str(ann["permutation-seed-base32"]))
495        elif re.search(r'^v0-[0-9a-zA-Z]{52}$', server_id):
496            ps = base32.a2b(server_id[3:])
497        else:
498            log.msg("unable to parse serverid '%(server_id)s as pubkey, "
499                    "hashing it to get permutation-seed, "
500                    "may not converge with other clients",
501                    server_id=server_id,
502                    facility="tahoe.storage_broker",
503                    level=log.UNUSUAL, umid="qu86tw")
504            ps = hashlib.sha256(server_id).digest()
505        permutation_seed = ps
506
507        assert server_id
508        long_description = server_id
509        if server_id.startswith("v0-"):
510            # remove v0- prefix from abbreviated name
511            short_description = server_id[3:3+8]
512        else:
513            short_description = server_id[:8]
514        nickname = ann.get("nickname", "")
515
516        return cls(
517            nickname=nickname,
518            permutation_seed=permutation_seed,
519            tubid=tubid,
520            storage_server=storage_server,
521            furl=furl,
522            short_description=short_description,
523            long_description=long_description,
524        )
525
526    def connect_to(self, tub, got_connection):
527        return tub.connectTo(self._furl, got_connection)
528
529
530@implementer(IFoolscapStorageServer)
531class _NullStorage(object):
532    """
533    Abstraction for *not* communicating with a storage server of a type with
534    which we can't communicate.
535    """
536    nickname = ""
537    permutation_seed = hashlib.sha256("").digest()
538    tubid = hashlib.sha256("").digest()
539    storage_server = None
540
541    lease_seed = hashlib.sha256("").digest()
542
543    name = "<unsupported>"
544    longname = "<storage with unsupported protocol>"
545
546    def connect_to(self, tub, got_connection):
547        return NonReconnector()
548
549
550class NonReconnector(object):
551    """
552    A ``foolscap.reconnector.Reconnector``-alike that doesn't do anything.
553    """
554    def stopConnecting(self):
555        pass
556
557    def reset(self):
558        pass
559
560    def getReconnectionInfo(self):
561        return ReconnectionInfo()
562
563_null_storage = _NullStorage()
564
565
566class AnnouncementNotMatched(Exception):
567    """
568    A storage server announcement wasn't matched by any of the locally enabled
569    plugins.
570    """
571
572
573def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref):
574    """
575    Construct an ``IStorageServer`` from the most locally-preferred plugin
576    that is offered in the given announcement.
577
578    :param allmydata.node._Config node_config: The node configuration to
579        pass to the plugin.
580    """
581    plugins = {
582        plugin.name: plugin
583        for plugin
584        in getPlugins(IFoolscapStoragePlugin)
585    }
586    storage_options = announcement.get(u"storage-options", [])
587    for plugin_name, plugin_config in config.storage_plugins.items():
588        try:
589            plugin = plugins[plugin_name]
590        except KeyError:
591            raise ValueError("{} not installed".format(plugin_name))
592        for option in storage_options:
593            if plugin_name == option[u"name"]:
594                furl = option[u"storage-server-FURL"]
595                return furl, plugin.get_storage_client(
596                    node_config,
597                    option,
598                    get_rref,
599                )
600    raise AnnouncementNotMatched()
601
602
603@implementer(IServer)
604class NativeStorageServer(service.MultiService):
605    """I hold information about a storage server that we want to connect to.
606    If we are connected, I hold the RemoteReference, their host address, and
607    the their version information. I remember information about when we were
608    last connected too, even if we aren't currently connected.
609
610    @ivar last_connect_time: when we last established a connection
611    @ivar last_loss_time: when we last lost a connection
612
613    @ivar version: the server's versiondict, from the most recent announcement
614    @ivar nickname: the server's self-reported nickname (unicode), same
615
616    @ivar rref: the RemoteReference, if connected, otherwise None
617    @ivar remote_host: the IAddress, if connected, otherwise None
618    """
619
620    VERSION_DEFAULTS = {
621        "http://allmydata.org/tahoe/protocols/storage/v1" :
622        { "maximum-immutable-share-size": 2**32 - 1,
623          "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
624          "tolerates-immutable-read-overrun": False,
625          "delete-mutable-shares-with-zero-length-writev": False,
626          "available-space": None,
627          },
628        "application-version": "unknown: no get_version()",
629        }
630
631    def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=StorageClientConfig()):
632        service.MultiService.__init__(self)
633        assert isinstance(server_id, str)
634        self._server_id = server_id
635        self.announcement = ann
636        self._tub_maker = tub_maker
637        self._handler_overrides = handler_overrides
638
639        self._storage = self._make_storage_system(node_config, config, ann)
640
641        self.last_connect_time = None
642        self.last_loss_time = None
643        self.remote_host = None
644        self._rref = None
645        self._is_connected = False
646        self._reconnector = None
647        self._trigger_cb = None
648        self._on_status_changed = ObserverList()
649
650    def _make_storage_system(self, node_config, config, ann):
651        """
652        :param allmydata.node._Config node_config: The node configuration to pass
653            to any configured storage plugins.
654
655        :param StorageClientConfig config: Configuration specifying desired
656            storage client behavior.
657
658        :param dict ann: The storage announcement from the storage server we
659            are meant to communicate with.
660
661        :return IFoolscapStorageServer: An object enabling communication via
662            Foolscap with the server which generated the announcement.
663        """
664        # Try to match the announcement against a plugin.
665        try:
666            furl, storage_server = _storage_from_foolscap_plugin(
667                node_config,
668                config,
669                ann,
670                # Pass in an accessor for our _rref attribute.  The value of
671                # the attribute may change over time as connections are lost
672                # and re-established.  The _StorageServer should always be
673                # able to get the most up-to-date value.
674                self.get_rref,
675            )
676        except AnnouncementNotMatched:
677            # Nope.
678            pass
679        else:
680            return _FoolscapStorage.from_announcement(
681                self._server_id,
682                furl.encode("utf-8"),
683                ann,
684                storage_server,
685            )
686
687        # Try to match the announcement against the anonymous access scheme.
688        try:
689            furl = ann[u"anonymous-storage-FURL"]
690        except KeyError:
691            # Nope
692            pass
693        else:
694            # See comment above for the _storage_from_foolscap_plugin case
695            # about passing in get_rref.
696            storage_server = _StorageServer(get_rref=self.get_rref)
697            return _FoolscapStorage.from_announcement(
698                self._server_id,
699                furl.encode("utf-8"),
700                ann,
701                storage_server,
702            )
703
704        # Nothing matched so we can't talk to this server.
705        return _null_storage
706
707    def get_permutation_seed(self):
708        return self._storage.permutation_seed
709    def get_name(self): # keep methodname short
710        # TODO: decide who adds [] in the short description. It should
711        # probably be the output side, not here.
712        return self._storage.name
713    def get_longname(self):
714        return self._storage.longname
715    def get_tubid(self):
716        return self._storage.tubid
717    def get_lease_seed(self):
718        return self._storage.lease_seed
719    def get_foolscap_write_enabler_seed(self):
720        return self._storage.tubid
721    def get_nickname(self):
722        return self._storage.nickname
723
724    def on_status_changed(self, status_changed):
725        """
726        :param status_changed: a callable taking a single arg (the
727            NativeStorageServer) that is notified when we become connected
728        """
729        return self._on_status_changed.subscribe(status_changed)
730
731    # Special methods used by copy.copy() and copy.deepcopy(). When those are
732    # used in allmydata.immutable.filenode to copy CheckResults during
733    # repair, we want it to treat the IServer instances as singletons, and
734    # not attempt to duplicate them..
735    def __copy__(self):
736        return self
737    def __deepcopy__(self, memodict):
738        return self
739
740    def __repr__(self):
741        return "<NativeStorageServer for %s>" % self.get_name()
742    def get_serverid(self):
743        return self._server_id
744    def get_version(self):
745        if self._rref:
746            return self._rref.version
747        return None
748    def get_announcement(self):
749        return self.announcement
750    def get_remote_host(self):
751        return self.remote_host
752
753    def get_connection_status(self):
754        last_received = None
755        if self._rref:
756            last_received = self._rref.getDataLastReceivedAt()
757        return connection_status.from_foolscap_reconnector(self._reconnector,
758                                                           last_received)
759
760    def is_connected(self):
761        return self._is_connected
762
763    def get_available_space(self):
764        version = self.get_version()
765        if version is None:
766            return None
767        protocol_v1_version = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
768        available_space = protocol_v1_version.get('available-space')
769        if available_space is None:
770            available_space = protocol_v1_version.get('maximum-immutable-share-size', None)
771        return available_space
772
773    def start_connecting(self, trigger_cb):
774        self._tub = self._tub_maker(self._handler_overrides)
775        self._tub.setServiceParent(self)
776        self._trigger_cb = trigger_cb
777        self._reconnector = self._storage.connect_to(self._tub, self._got_connection)
778
779    def _got_connection(self, rref):
780        lp = log.msg(format="got connection to %(name)s, getting versions",
781                     name=self.get_name(),
782                     facility="tahoe.storage_broker", umid="coUECQ")
783        if self._trigger_cb:
784            eventually(self._trigger_cb)
785        default = self.VERSION_DEFAULTS
786        d = add_version_to_remote_reference(rref, default)
787        d.addCallback(self._got_versioned_service, lp)
788        d.addCallback(lambda ign: self._on_status_changed.notify(self))
789        d.addErrback(log.err, format="storageclient._got_connection",
790                     name=self.get_name(), umid="Sdq3pg")
791
792    def _got_versioned_service(self, rref, lp):
793        log.msg(format="%(name)s provided version info %(version)s",
794                name=self.get_name(), version=rref.version,
795                facility="tahoe.storage_broker", umid="SWmJYg",
796                level=log.NOISY, parent=lp)
797
798        self.last_connect_time = time.time()
799        self.remote_host = rref.getLocationHints()
800        self._rref = rref
801        self._is_connected = True
802        rref.notifyOnDisconnect(self._lost)
803
804    def get_rref(self):
805        return self._rref
806
807    def get_storage_server(self):
808        """
809        See ``IServer.get_storage_server``.
810        """
811        if self._rref is None:
812            return None
813        return self._storage.storage_server
814
815    def _lost(self):
816        log.msg(format="lost connection to %(name)s", name=self.get_name(),
817                facility="tahoe.storage_broker", umid="zbRllw")
818        self.last_loss_time = time.time()
819        # self._rref is now stale: all callRemote()s will get a
820        # DeadReferenceError. We leave the stale reference in place so that
821        # uploader/downloader code (which received this IServer through
822        # get_connected_servers() or get_servers_for_psi()) can continue to
823        # use s.get_rref().callRemote() and not worry about it being None.
824        self._is_connected = False
825        self.remote_host = None
826
827    def stop_connecting(self):
828        # used when this descriptor has been superceded by another
829        self._reconnector.stopConnecting()
830
831    def try_to_connect(self):
832        # used when the broker wants us to hurry up
833        self._reconnector.reset()
834
835class UnknownServerTypeError(Exception):
836    pass
837
838
839@implementer(IStorageServer)
840@attr.s
841class _StorageServer(object):
842    """
843    ``_StorageServer`` is a direct pass-through to an ``RIStorageServer`` via
844    a ``RemoteReference``.
845    """
846    _get_rref = attr.ib()
847
848    @property
849    def _rref(self):
850        return self._get_rref()
851
852    def get_version(self):
853        return self._rref.callRemote(
854            "get_version",
855        )
856
857    def allocate_buckets(
858            self,
859            storage_index,
860            renew_secret,
861            cancel_secret,
862            sharenums,
863            allocated_size,
864            canary,
865    ):
866        return self._rref.callRemote(
867            "allocate_buckets",
868            storage_index,
869            renew_secret,
870            cancel_secret,
871            sharenums,
872            allocated_size,
873            canary,
874        )
875
876    def add_lease(
877            self,
878            storage_index,
879            renew_secret,
880            cancel_secret,
881    ):
882        return self._rref.callRemote(
883            "add_lease",
884            storage_index,
885            renew_secret,
886            cancel_secret,
887        )
888
889    def renew_lease(
890            self,
891            storage_index,
892            renew_secret,
893    ):
894        return self._rref.callRemote(
895            "renew_lease",
896            storage_index,
897            renew_secret,
898        )
899
900    def get_buckets(
901            self,
902            storage_index,
903    ):
904        return self._rref.callRemote(
905            "get_buckets",
906            storage_index,
907        )
908
909    def slot_readv(
910            self,
911            storage_index,
912            shares,
913            readv,
914    ):
915        return self._rref.callRemote(
916            "slot_readv",
917            storage_index,
918            shares,
919            readv,
920        )
921
922    def slot_testv_and_readv_and_writev(
923            self,
924            storage_index,
925            secrets,
926            tw_vectors,
927            r_vector,
928    ):
929        return self._rref.callRemote(
930            "slot_testv_and_readv_and_writev",
931            storage_index,
932            secrets,
933            tw_vectors,
934            r_vector,
935        )
936
937    def advise_corrupt_share(
938            self,
939            share_type,
940            storage_index,
941            shnum,
942            reason,
943    ):
944        return self._rref.callRemoteOnly(
945            "advise_corrupt_share",
946            share_type,
947            storage_index,
948            shnum,
949            reason,
950        )
Note: See TracBrowser for help on using the repository browser.