1 | |
---|
2 | """ |
---|
3 | I contain the client-side code which speaks to storage servers, in particular |
---|
4 | the foolscap-based server implemented in src/allmydata/storage/*.py . |
---|
5 | |
---|
6 | Ported to Python 3. |
---|
7 | """ |
---|
8 | |
---|
9 | # roadmap: |
---|
10 | # |
---|
11 | # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to |
---|
12 | # create it, change uploader/servermap to get rrefs from it. ServerFarm calls |
---|
13 | # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs |
---|
14 | # to clients. webapi status pages call broker.get_info_about_serverid. |
---|
15 | # |
---|
16 | # 2: move get_info methods to the descriptor, webapi status pages call |
---|
17 | # broker.get_descriptor_for_serverid().get_info |
---|
18 | # |
---|
19 | # 3?later?: store descriptors in UploadResults/etc instead of serverids, |
---|
20 | # webapi status pages call descriptor.get_info and don't use storage_broker |
---|
21 | # or Client |
---|
22 | # |
---|
23 | # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer |
---|
24 | # optional. This closes #467 |
---|
25 | # |
---|
26 | # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other |
---|
27 | # clients. Clients stop doing callRemote(), use NativeStorageClient methods |
---|
28 | # instead (which might do something else, i.e. http or whatever). The |
---|
29 | # introducer and tahoe.cfg only create NativeStorageClients for now. |
---|
30 | # |
---|
31 | # 6: implement other sorts of IStorageClient classes: S3, etc |
---|
32 | |
---|
33 | from __future__ import annotations |
---|
34 | |
---|
35 | from typing import Union, Callable, Any, Optional, cast, Dict, Iterable |
---|
36 | from os import urandom |
---|
37 | import re |
---|
38 | import time |
---|
39 | import hashlib |
---|
40 | from io import StringIO |
---|
41 | from configparser import NoSectionError |
---|
42 | import json |
---|
43 | |
---|
44 | import attr |
---|
45 | from attr import define |
---|
46 | from hyperlink import DecodedURL |
---|
47 | from twisted.web.client import HTTPConnectionPool |
---|
48 | from zope.interface import ( |
---|
49 | Attribute, |
---|
50 | Interface, |
---|
51 | implementer, |
---|
52 | ) |
---|
53 | from twisted.python.failure import Failure |
---|
54 | from twisted.web import http |
---|
55 | from twisted.internet.task import LoopingCall |
---|
56 | from twisted.internet import defer, reactor |
---|
57 | from twisted.internet.interfaces import IReactorTime |
---|
58 | from twisted.application import service |
---|
59 | from twisted.logger import Logger |
---|
60 | from twisted.plugin import ( |
---|
61 | getPlugins, |
---|
62 | ) |
---|
63 | from eliot import ( |
---|
64 | log_call, |
---|
65 | ) |
---|
66 | from foolscap.ipb import IRemoteReference |
---|
67 | from foolscap.api import eventually, RemoteException |
---|
68 | from foolscap.reconnector import ( |
---|
69 | ReconnectionInfo, |
---|
70 | ) |
---|
71 | from allmydata.interfaces import ( |
---|
72 | IStorageBroker, |
---|
73 | IDisplayableServer, |
---|
74 | IServer, |
---|
75 | IStorageServer, |
---|
76 | IFoolscapStoragePlugin, |
---|
77 | VersionMessage |
---|
78 | ) |
---|
79 | from allmydata.grid_manager import ( |
---|
80 | create_grid_manager_verifier, SignedCertificate |
---|
81 | ) |
---|
82 | from allmydata.crypto import ( |
---|
83 | ed25519, |
---|
84 | ) |
---|
85 | from allmydata.util.tor_provider import _Provider as TorProvider |
---|
86 | from allmydata.util import log, base32, connection_status |
---|
87 | from allmydata.util.assertutil import precondition |
---|
88 | from allmydata.util.observer import ObserverList |
---|
89 | from allmydata.util.rrefutil import add_version_to_remote_reference |
---|
90 | from allmydata.util.hashutil import permute_server_hash |
---|
91 | from allmydata.util.dictutil import BytesKeyDict, UnicodeKeyDict |
---|
92 | from allmydata.util.deferredutil import async_to_deferred, race |
---|
93 | from allmydata.util.attrs_provides import provides |
---|
94 | from allmydata.storage.http_client import ( |
---|
95 | StorageClient, StorageClientImmutables, StorageClientGeneral, |
---|
96 | ClientException as HTTPClientException, StorageClientMutables, |
---|
97 | ReadVector, TestWriteVectors, WriteVector, TestVector, ClientException, |
---|
98 | StorageClientFactory |
---|
99 | ) |
---|
100 | from .node import _Config |
---|
101 | |
---|
102 | _log = Logger() |
---|
103 | |
---|
104 | ANONYMOUS_STORAGE_NURLS = "anonymous-storage-NURLs" |
---|
105 | |
---|
106 | |
---|
107 | # who is responsible for de-duplication? |
---|
108 | # both? |
---|
109 | # IC remembers the unpacked announcements it receives, to provide for late |
---|
110 | # subscribers and to remove duplicates |
---|
111 | |
---|
112 | # if a client subscribes after startup, will they receive old announcements? |
---|
113 | # yes |
---|
114 | |
---|
115 | # who will be responsible for signature checking? |
---|
116 | # make it be IntroducerClient, so they can push the filter outwards and |
---|
117 | # reduce inbound network traffic |
---|
118 | |
---|
119 | # what should the interface between StorageFarmBroker and IntroducerClient |
---|
120 | # look like? |
---|
121 | # don't pass signatures: only pass validated blessed-objects |
---|
122 | |
---|
@attr.s
class StorageClientConfig(object):
    """
    Configuration for a node acting as a storage client.

    :ivar preferred_peers: An iterable of the server-ids (``bytes``) of the
        storage servers where share placement is preferred, in order of
        decreasing preference.  See the *[client]peers.preferred* documentation
        for details.

    :ivar dict[unicode, dict[unicode, unicode]] storage_plugins: A mapping from
        names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the
        respective configuration.

    :ivar list[ed25519.Ed25519PublicKey] grid_manager_keys: with no keys in
        this list, we'll upload to any storage server. Otherwise, we will
        only upload to a storage-server that has a valid certificate
        signed by at least one of these keys.
    """
    preferred_peers : Iterable[bytes] = attr.ib(default=())
    storage_plugins : dict[str, dict[str, str]] = attr.ib(default=attr.Factory(dict))
    grid_manager_keys : list[ed25519.Ed25519PublicKey] = attr.ib(default=attr.Factory(list))

    @classmethod
    def from_node_config(cls, config):
        """
        Create a ``StorageClientConfig`` from a complete Tahoe-LAFS node
        configuration.

        Reads ``[client]peers.preferred`` and ``[client]storage.plugins``
        (both comma-separated), the ``[storageclient.plugins.<name>]``
        section of every enabled plugin, and the ``[grid_managers]`` section.

        :param _Config config: The loaded Tahoe-LAFS node configuration.

        :return StorageClientConfig: The configuration extracted from
            ``config``.
        """
        # Strip each entry and drop the ones that are empty *after*
        # stripping, so that stray whitespace between commas ("a, ,b") does
        # not turn into an empty-string peer id.  This is the same rule the
        # plugin list below uses.
        #
        # NOTE(review): these values are ``str`` as read from the config
        # while the attribute is annotated ``Iterable[bytes]`` -- confirm
        # which type the peer-selection comparison expects.
        raw_peers = config.get_config("client", "peers.preferred", "").split(",")
        preferred_peers = tuple(
            peer.strip()
            for peer in raw_peers
            if peer.strip()
        )

        enabled_storage_plugins = (
            name.strip()
            for name
            in config.get_config(
                "client",
                "storage.plugins",
                "",
            ).split(u",")
            if name.strip()
        )

        storage_plugins = {}
        for plugin_name in enabled_storage_plugins:
            try:
                plugin_config = config.items("storageclient.plugins." + plugin_name)
            except NoSectionError:
                # An enabled plugin without its own section just gets an
                # empty configuration.
                plugin_config = []
            storage_plugins[plugin_name] = dict(plugin_config)

        # Only the values (the encoded keys) of the section are used; the
        # option names are ignored.
        grid_manager_keys = [
            ed25519.verifying_key_from_string(gm_key.encode("ascii"))
            for _name, gm_key
            in config.enumerate_section('grid_managers').items()
        ]

        return cls(
            preferred_peers,
            storage_plugins,
            grid_manager_keys,
        )

    def get_configured_storage_plugins(self) -> dict[str, IFoolscapStoragePlugin]:
        """
        :returns: a mapping from names to instances for all available
            plugins

        :raises MissingPlugin: if the configuration asks for a plugin
            for which there is no corresponding instance (e.g. it is
            not installed).
        """
        # Everything that is discoverable, keyed by plugin name.
        plugins = {
            plugin.name: plugin
            for plugin
            in getPlugins(IFoolscapStoragePlugin)
        }

        # mypy doesn't like "str" in place of Any ...
        configured: Dict[Any, IFoolscapStoragePlugin] = dict()
        for plugin_name in self.storage_plugins:
            try:
                plugin = plugins[plugin_name]
            except KeyError:
                raise MissingPlugin(plugin_name)
            configured[plugin_name] = plugin
        return configured
---|
213 | |
---|
214 | |
---|
@implementer(IStorageBroker)
class StorageFarmBroker(service.MultiService):
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.

    :ivar _tub_maker: A one-argument callable which accepts a dictionary of
        "handler overrides" and returns a ``foolscap.api.Tub``.

    :ivar StorageClientConfig storage_client_config: Values from the node
        configuration file relating to storage behavior.
    """

    @property
    def preferred_peers(self):
        """
        The ``preferred_peers`` value of our ``StorageClientConfig``.
        """
        return self.storage_client_config.preferred_peers

    def __init__(
            self,
            permute_peers,
            tub_maker,
            node_config: _Config,
            storage_client_config=None,
            default_connection_handlers=None,
            tor_provider: Optional[TorProvider]=None,
    ):
        """
        :param permute_peers: Must be true; the non-permuting mode is not
            implemented.

        :param tub_maker: See the ``_tub_maker`` attribute.

        :param _Config node_config: The node configuration; consulted when
            deciding between HTTP and Foolscap and handed to Foolscap-based
            servers.

        :param storage_client_config: A ``StorageClientConfig``, or ``None``
            to use a default-constructed one.

        :param default_connection_handlers: Passed to HTTP-based servers;
            defaults to ``{"tcp": "tcp"}``.

        :param tor_provider: Passed to HTTP-based servers.
        """
        service.MultiService.__init__(self)
        if default_connection_handlers is None:
            default_connection_handlers = {"tcp": "tcp"}

        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        self._tub_maker = tub_maker

        self.node_config = node_config

        if storage_client_config is None:
            storage_client_config = StorageClientConfig()
        self.storage_client_config = storage_client_config

        # self.servers maps serverid -> IServer, and keeps track of all the
        # storage servers that we've heard about. Each descriptor manages its
        # own Reconnector, and will give us a RemoteReference when we ask
        # them for it.
        self.servers = BytesKeyDict()
        self._static_server_ids : set[bytes] = set() # ignore announcements for these
        self.introducer_client = None
        self._threshold_listeners : list[tuple[float,defer.Deferred[Any]]]= [] # tuples of (threshold, Deferred)
        self._connected_high_water_mark = 0
        self._tor_provider = tor_provider
        self._default_connection_handlers = default_connection_handlers

    @log_call(action_type=u"storage-client:broker:set-static-servers")
    def set_static_servers(self, servers):
        """
        Register statically configured servers and start connecting to them.

        :param dict servers: A mapping from server id (text or ``bytes``) to
            a server description as accepted by ``_make_storage_server``.
            An entry for which no storage server can be created is skipped.
        """
        # Sorting the items gives us a deterministic processing order.  This
        # doesn't really matter but it makes the logging behavior more
        # predictable and easier to test (and at least one test does depend on
        # this sorted order).
        for (server_id, server) in sorted(servers.items()):
            # Normalise the id to bytes *before* trying to build the server.
            # Encoding unconditionally inside the ``try`` would raise
            # AttributeError for an id that is already bytes, and the
            # handler below would then silently drop that server.
            if isinstance(server_id, str):
                server_id = server_id.encode("utf-8")
            try:
                storage_server = self._make_storage_server(
                    server_id,
                    server,
                )
            except Exception:
                # TODO: The _make_storage_server failure is logged but maybe
                # we should write a traceback here.  Notably, tests don't
                # automatically fail just because we hit this case.  Well
                # written tests will still fail if a surprising exception
                # arrives here but they might be harder to debug without this
                # information.
                pass
            else:
                self._static_server_ids.add(server_id)
                self.servers[server_id] = storage_server
                storage_server.setServiceParent(self)
                storage_server.start_connecting(self._trigger_connections)

    def get_client_storage_plugin_web_resources(self, node_config):
        """
        Get all of the client-side ``IResource`` implementations provided by
        enabled storage plugins.

        :param allmydata.node._Config node_config: The complete node
            configuration for the node from which these web resources will be
            served.

        :return dict[unicode, IResource]: Resources for all of the plugins.
        """
        plugins = {
            plugin.name: plugin
            for plugin
            in getPlugins(IFoolscapStoragePlugin)
        }
        # Only the configured plugin names matter here; the per-plugin
        # configuration values are not used.
        return UnicodeKeyDict({
            name: plugins[name].get_client_resource(node_config)
            for name
            in self.storage_client_config.storage_plugins
        })

    @staticmethod
    def _should_we_use_http(node_config: _Config, announcement: dict) -> bool:
        """
        Given an announcement dictionary and config, return whether we should
        connect to storage server over HTTP.

        HTTP is used when ``[client]force_foolscap`` is not set and the
        announcement carries at least one anonymous storage NURL.
        """
        return not node_config.get_config(
            "client", "force_foolscap", default=False, boolean=True,
        ) and len(announcement.get(ANONYMOUS_STORAGE_NURLS, [])) > 0

    @log_call(
        action_type=u"storage-client:broker:make-storage-server",
        include_args=["server_id"],
        include_result=False,
    )
    def _make_storage_server(self, server_id, server):
        """
        Create a new ``IServer`` for the given storage server announcement.

        :param bytes server_id: The unique identifier for the server.

        :param dict server: The server announcement.  See ``Static Server
            Definitions`` in the configuration documentation for details about
            the structure and contents.

        :return IServer: The object-y representation of the server described
            by the given announcement.
        """
        assert isinstance(server_id, bytes)
        announcement = server["ann"]
        gm_verifier = create_grid_manager_verifier(
            self.storage_client_config.grid_manager_keys,
            [SignedCertificate.load(StringIO(json.dumps(data))) for data in announcement.get("grid-manager-certificates", [])],
            "pub-{}".format(str(server_id, "ascii")).encode("ascii"),  # server_id is v0-<key> not pub-v0-key .. for reasons?
        )

        if self._should_we_use_http(self.node_config, announcement):
            s = HTTPNativeStorageServer(
                server_id,
                announcement,
                grid_manager_verifier=gm_verifier,
                default_connection_handlers=self._default_connection_handlers,
                tor_provider=self._tor_provider
            )
        else:
            handler_overrides = server.get("connections", {})
            s = NativeStorageServer(
                server_id,
                announcement,
                self._tub_maker,
                handler_overrides,
                self.node_config,
                self.storage_client_config,
                gm_verifier,
            )
        # Whichever protocol is used, re-check the connection high water
        # mark every time the server reports a status change.
        s.on_status_changed(lambda _: self._got_connection())
        return s

    def when_connected_enough(self, threshold):
        """
        :returns: a Deferred that fires if/when our high water mark for
            number of connected servers becomes (or ever was) at or above
            "threshold".
        """
        d = defer.Deferred()
        self._threshold_listeners.append( (threshold, d) )
        self._check_connected_high_water_mark()
        return d

    # these two are used in unit tests
    def test_add_rref(self, serverid, rref, ann):
        """
        Add a server built from a copy of ``ann`` under ``serverid``, with
        its ``_rref`` set to ``rref`` and marked as connected.
        """
        s = self._make_storage_server(
            serverid,
            {"ann": ann.copy()},
        )
        s._rref = rref
        s._is_connected = True
        self.servers[serverid] = s

    def test_add_server(self, server_id, s):
        """
        Add the already-constructed server ``s`` under ``server_id`` and
        subscribe to its status changes.
        """
        s.on_status_changed(lambda _: self._got_connection())
        self.servers[server_id] = s

    def use_introducer(self, introducer_client):
        """
        Remember ``introducer_client`` and subscribe to its "storage"
        announcements.
        """
        self.introducer_client = ic = introducer_client
        ic.subscribe_to("storage", self._got_announcement)

    def _got_connection(self):
        # this is called by NativeStorageServer when it is connected
        self._check_connected_high_water_mark()

    def _check_connected_high_water_mark(self):
        """
        Update the high water mark of connected servers and fire every
        threshold listener whose threshold has been reached.
        """
        current = len(self.get_connected_servers())
        if current > self._connected_high_water_mark:
            self._connected_high_water_mark = current

        remaining = []
        for threshold, d in self._threshold_listeners:
            if self._connected_high_water_mark >= threshold:
                # Fired through ``eventually`` rather than called directly.
                eventually(d.callback, None)
            else:
                remaining.append( (threshold, d) )
        self._threshold_listeners = remaining

    def _should_ignore_announcement(self, server_id, ann):
        """
        Determine whether a new storage announcement should be discarded or used
        to update our collection of storage servers.

        :param bytes server_id: The unique identifier for the storage server
            which made the announcement.

        :param dict ann: The announcement.

        :return bool: ``True`` if the announcement should be ignored,
            ``False`` if it should be used to update our local storage server
            state.
        """
        # Let local static configuration always override any announcement for
        # a particular server.
        if server_id in self._static_server_ids:
            log.msg(format="ignoring announcement for static server '%(id)s'",
                    id=server_id,
                    facility="tahoe.storage_broker", umid="AlxzqA",
                    level=log.UNUSUAL)
            return True

        try:
            old = self.servers[server_id]
        except KeyError:
            # We don't know anything about this server.  Let's use the
            # announcement to change that.
            return False
        else:
            # Determine if this announcement is at all different from the
            # announcement we already have for the server.  If it is the same,
            # we don't need to change anything.
            return old.get_announcement() == ann

    def _got_announcement(self, key_s, ann):
        """
        This callback is given to the introducer and called any time an
        announcement is received which has a valid signature and does not have
        a sequence number less than or equal to a previous sequence number
        seen for that server by that introducer.

        Note sequence numbers are not considered between different introducers
        so if we use more than one introducer it is possible for them to
        deliver us stale announcements in some cases.
        """
        precondition(isinstance(key_s, bytes), key_s)
        precondition(key_s.startswith(b"v0-"), key_s)
        precondition(ann["service-name"] == "storage", ann["service-name"])
        server_id = key_s

        if self._should_ignore_announcement(server_id, ann):
            return

        s = self._make_storage_server(
            server_id,
            {u"ann": ann},
        )

        try:
            old = self.servers.pop(server_id)
        except KeyError:
            pass
        else:
            # It's a replacement, get rid of the old one.
            old.stop_connecting()
            old.disownServiceParent()
            # NOTE: this disownServiceParent() returns a Deferred that
            # doesn't fire until Tub.stopService fires, which will wait for
            # any existing connections to be shut down. This doesn't
            # generally matter for normal runtime, but unit tests can run
            # into DirtyReactorErrors if they don't block on these. If a test
            # replaces one server with a newer version, then terminates
            # before the old one has been shut down, it might get
            # DirtyReactorErrors. The fix would be to gather these Deferreds
            # into a structure that will block StorageFarmBroker.stopService
            # until they have fired (but hopefully don't keep reference
            # cycles around when they fire earlier than that, which will
            # almost always be the case for normal runtime).

        # now we forget about them and start using the new one
        s.setServiceParent(self)
        self.servers[server_id] = s
        s.start_connecting(self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
        # need servers, we'll ask them if they're connected or not.

    def _trigger_connections(self):
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in list(self.servers.values()):
            dsc.try_to_connect()

    def get_servers_for_psi(self, peer_selection_index, for_upload=False):
        """
        Return the connected servers, ordered for the given peer selection
        index: preferred servers sort before all others, and within each
        group servers are ordered by ``permute_server_hash`` of the index
        and the server's permutation seed.

        :param for_upload: used to determine if we should include any
           servers that are invalid according to Grid Manager
           processing. When for_upload is True and we have any Grid
           Manager keys configured, any storage servers with invalid or
           missing certificates will be excluded.

        :return list: The ordered ``IServer`` objects.
        """
        # return a list of server objects (IServers)
        assert self.permute_peers == True
        connected_servers = self.get_connected_servers()
        preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
        if for_upload:
            connected_servers = [
                srv
                for srv in connected_servers
                if srv.upload_permitted()
            ]

        def _permuted(server):
            seed = server.get_permutation_seed()
            # ``False`` sorts before ``True``, so preferred servers come
            # first.
            is_unpreferred = server not in preferred_servers
            return (is_unpreferred,
                    permute_server_hash(peer_selection_index, seed))
        return sorted(connected_servers, key=_permuted)

    def get_all_serverids(self):
        """Return a frozenset of the ids of every known server."""
        return frozenset(self.servers.keys())

    def get_connected_servers(self):
        """Return a frozenset of the known servers that report being connected."""
        return frozenset(s for s in self.servers.values() if s.is_connected())

    def get_known_servers(self):
        """Return a frozenset of every known server, connected or not."""
        return frozenset(self.servers.values())

    def get_nickname_for_serverid(self, serverid):
        """Return the nickname of the server ``serverid``, or ``None`` if it is unknown."""
        if serverid in self.servers:
            return self.servers[serverid].get_nickname()
        return None

    def get_stub_server(self, serverid):
        """
        Return the known server for ``serverid`` (also matching the tubid of
        ``NativeStorageServer`` instances), or a new ``StubServer`` if
        nothing matches.
        """
        if serverid in self.servers:
            return self.servers[serverid]
        # some time before 1.12, we changed "serverid" to be "key_s" (the
        # printable verifying key, used in V2 announcements), instead of the
        # tubid. When the immutable uploader delegates work to a Helper,
        # get_stub_server() is used to map the returning server identifiers
        # to IDisplayableServer instances (to get a name, for display on the
        # Upload Results web page). If the Helper is running 1.12 or newer,
        # it will send pubkeys, but if it's still running 1.11, it will send
        # tubids. This clause maps the old tubids to our existing servers.
        for s in self.servers.values():
            if isinstance(s, NativeStorageServer):
                if serverid == s.get_tubid():
                    return s
        return StubServer(serverid)
---|
579 | |
---|
@implementer(IDisplayableServer)
class StubServer(object):
    """
    A display-only placeholder for a server that is known only by its id.
    """

    def __init__(self, serverid):
        assert isinstance(serverid, bytes)
        self.serverid = serverid # binary tubid

    def get_serverid(self):
        """Return the id this stub was created with."""
        return self.serverid

    def get_name(self):
        """Return the first eight characters of the base32-encoded id."""
        encoded = base32.b2a(self.serverid)
        return encoded[:8]

    def get_longname(self):
        """Return the complete base32-encoded id."""
        return base32.b2a(self.serverid)

    def get_nickname(self):
        """Return the fixed placeholder nickname."""
        return "?"
---|
593 | |
---|
594 | |
---|
class IFoolscapStorageServer(Interface):
    """
    An internal interface that mediates between ``NativeStorageServer`` and
    Foolscap-based ``IStorageServer`` implementations.
    """
    nickname = Attribute("""
    A name for this server for presentation to users.
    """)
    permutation_seed = Attribute("""
    A stable value associated with this server which a client can use as an
    input to the server selection permutation ordering.
    """)
    tubid = Attribute("""
    The identifier for the Tub in which the server is run.
    """)
    storage_server = Attribute("""
    An IStorageServer provider which implements a concrete Foolscap-based
    protocol for communicating with the server.
    """)
    name = Attribute("""
    Another name for this server for presentation to users.
    """)
    longname = Attribute("""
    *Another* name for this server for presentation to users.
    """)
    lease_seed = Attribute("""
    A stable value associated with this server which a client can use as an
    input to a lease secret generation function.
    """)

    def connect_to(tub, got_connection):
        """
        Attempt to establish and maintain a connection to the server.

        :param Tub tub: A Foolscap Tub from which the connection is to
            originate.

        :param got_connection: A one-argument callable which is called with a
            Foolscap ``RemoteReference`` when a connection is established.
            This may be called multiple times if the connection is lost and
            then re-established.

        :return foolscap.reconnector.Reconnector: An object which manages the
            connection and reconnection attempts.
        """
---|
640 | |
---|
641 | |
---|
def _parse_announcement(server_id: bytes, furl: bytes, ann: dict) -> tuple[str, bytes, bytes, bytes, bytes]:
    """
    Parse the furl and announcement, return:

      (nickname, permutation_seed, tubid, short_description, long_description)

    The tubid is decoded from the base32 text between ``pb://`` and ``@`` in
    the furl.  The permutation seed comes from the announcement's
    ``permutation-seed-base32`` entry if there is one, otherwise from the
    key part of a ``v0-`` server id, otherwise from a SHA-256 hash of the
    server id.  The long description is the server id itself; the short
    description is its first eight bytes, not counting a ``v0-`` prefix.
    """
    furl_match = re.match(br'pb://(\w+)@', furl)
    assert furl_match, furl
    tubid = base32.a2b(furl_match.group(1).lower())

    if "permutation-seed-base32" in ann:
        encoded_seed = ann["permutation-seed-base32"]
        # The announcement may carry the seed as text; a2b gets bytes.
        if isinstance(encoded_seed, str):
            encoded_seed = encoded_seed.encode("utf-8")
        permutation_seed = base32.a2b(encoded_seed)
    elif re.search(br'^v0-[0-9a-zA-Z]{52}$', server_id):
        permutation_seed = base32.a2b(server_id[3:])
    else:
        log.msg("unable to parse serverid '%(server_id)s as pubkey, "
                "hashing it to get permutation-seed, "
                "may not converge with other clients",
                server_id=server_id,
                facility="tahoe.storage_broker",
                level=log.UNUSUAL, umid="qu86tw")
        permutation_seed = hashlib.sha256(server_id).digest()

    assert server_id
    # remove v0- prefix from abbreviated name
    prefix_length = 3 if server_id.startswith(b"v0-") else 0
    short_description = server_id[prefix_length:prefix_length + 8]
    nickname = ann.get("nickname", "")

    return (nickname, permutation_seed, tubid, short_description, server_id)
---|
679 | |
---|
680 | |
---|
@implementer(IFoolscapStorageServer)
@attr.s(frozen=True)
class _FoolscapStorage(object):
    """
    Abstraction for connecting to a storage server exposed via Foolscap.
    """
    nickname = attr.ib()
    permutation_seed = attr.ib()
    tubid = attr.ib()

    storage_server = attr.ib(validator=provides(IStorageServer))

    _furl = attr.ib()
    _short_description = attr.ib()
    _long_description = attr.ib()

    @property
    def name(self):
        """The short description of the server."""
        return self._short_description

    @property
    def longname(self):
        """The long description of the server."""
        return self._long_description

    @property
    def lease_seed(self):
        """The tubid, which doubles as the lease seed."""
        return self.tubid

    @classmethod
    def from_announcement(cls, server_id, furl, ann, storage_server):
        """
        Create an instance from a fURL and an announcement like::

            {"permutation-seed-base32": "...",
             "nickname": "...",
             "grid-manager-certificates": [..],
            }

        *nickname* and *grid-manager-certificates* are optional.

        The furl is given as text and is stored UTF-8 encoded.
        """
        encoded_furl = furl.encode("utf-8")
        parsed = _parse_announcement(server_id, encoded_furl, ann)
        (nickname, permutation_seed, tubid, short_description, long_description) = parsed
        return cls(
            nickname=nickname,
            permutation_seed=permutation_seed,
            tubid=tubid,
            storage_server=storage_server,
            furl=encoded_furl,
            short_description=short_description,
            long_description=long_description,
        )

    def connect_to(self, tub, got_connection):
        """
        Ask ``tub`` to connect to our fURL, reporting connections to
        ``got_connection``; return whatever ``tub.connectTo`` returns.
        """
        return tub.connectTo(self._furl, got_connection)
---|
738 | |
---|
739 | |
---|
@implementer(IFoolscapStorageServer)
@define
class _NullStorage(object):
    """
    Stand-in used when a storage server speaks a protocol we cannot use:
    it carries placeholder identity values and never really connects.
    """
    # Only ``longname`` is annotated, so it is the only attrs field (and the
    # only constructor argument).  Everything else is a plain class attribute.
    longname: str = "<storage with unsupported protocol>"
    name = "<unsupported>"
    nickname = ""

    storage_server = None

    # All seeds are the SHA-256 digest of the empty byte string.
    permutation_seed = hashlib.sha256().digest()
    tubid = hashlib.sha256().digest()
    lease_seed = hashlib.sha256().digest()

    def connect_to(self, tub, got_connection):
        """Ignore both arguments and return a do-nothing reconnector."""
        return NonReconnector()
---|
759 | |
---|
760 | |
---|
class NonReconnector:
    """
    An inert object with the method names of
    ``foolscap.reconnector.Reconnector``.
    """

    def stopConnecting(self):
        """Do nothing."""

    def reset(self):
        """Do nothing."""

    def getReconnectionInfo(self):
        """Return a freshly constructed ``ReconnectionInfo``."""
        return ReconnectionInfo()
---|
773 | |
---|
774 | |
---|
class AnnouncementNotMatched(Exception):
    """
    Raised when none of the locally enabled plugins matches a storage server
    announcement.
    """
---|
780 | |
---|
781 | |
---|
@attr.s(auto_exc=True)
class MissingPlugin(Exception):
    """
    Raised when a specifically requested plugin is not available.
    """

    # Name of the plugin that was asked for.
    plugin_name = attr.ib()

    def __str__(self):
        return f"Missing plugin '{self.plugin_name}'"
---|
792 | |
---|
793 | |
---|
794 | def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref): |
---|
795 | """ |
---|
796 | Construct an ``IStorageServer`` from the most locally-preferred plugin |
---|
797 | that is offered in the given announcement. |
---|
798 | |
---|
799 | :param allmydata.node._Config node_config: The node configuration to |
---|
800 | pass to the plugin. |
---|
801 | |
---|
802 | :param dict announcement: The storage announcement for the storage |
---|
803 | server we should build |
---|
804 | """ |
---|
805 | storage_options = announcement.get(u"storage-options", []) |
---|
806 | plugins = config.get_configured_storage_plugins() |
---|
807 | |
---|
808 | # for every storage-option that we have enabled locally (in order |
---|
809 | # of preference), see if the announcement asks for such a thing. |
---|
810 | # if it does, great: we return that storage-client |
---|
811 | # otherwise we've run out of options... |
---|
812 | |
---|
813 | for options in storage_options: |
---|
814 | try: |
---|
815 | plugin = plugins[options[u"name"]] |
---|
816 | except KeyError: |
---|
817 | # we didn't configure this kind of plugin locally, so |
---|
818 | # consider the next announced option |
---|
819 | continue |
---|
820 | |
---|
821 | furl = options[u"storage-server-FURL"] |
---|
822 | return furl, plugin.get_storage_client( |
---|
823 | node_config, |
---|
824 | options, |
---|
825 | get_rref, |
---|
826 | ) |
---|
827 | |
---|
828 | # none of the storage options in the announcement are configured |
---|
829 | # locally; we can't make a storage-client. |
---|
830 | plugin_names = ", ".join(sorted(option["name"] for option in storage_options)) |
---|
831 | raise AnnouncementNotMatched(plugin_names) |
---|
832 | |
---|
833 | |
---|
834 | def _available_space_from_version(version): |
---|
835 | if version is None: |
---|
836 | return None |
---|
837 | protocol_v1_version = version.get(b'http://allmydata.org/tahoe/protocols/storage/v1', BytesKeyDict()) |
---|
838 | available_space = protocol_v1_version.get(b'available-space') |
---|
839 | if available_space is None: |
---|
840 | available_space = protocol_v1_version.get(b'maximum-immutable-share-size', None) |
---|
841 | return available_space |
---|
842 | |
---|
843 | |
---|
def _make_storage_system(
        node_config: _Config,
        config: StorageClientConfig,
        ann: dict,
        server_id: bytes,
        get_rref: Callable[[], Optional[IRemoteReference]],
) -> IFoolscapStorageServer:
    """
    Build the object used to interact with the storage server described by
    an announcement.

    A plugin match is attempted first, then the anonymous access scheme.
    When neither applies, a ``_NullStorage`` is returned.

    :param node_config: The node configuration to pass to any configured
        storage plugins.

    :param config: Configuration specifying desired storage client behavior.

    :param ann: The storage announcement from the storage server we are meant
        to communicate with.

    :param server_id: The unique identifier for the server.

    :param get_rref: A function which returns a remote reference to the
        server-side object which implements this storage system, if one is
        available (otherwise None).

    :return: An object enabling communication via Foolscap with the server
        which generated the announcement.
    """
    unmatched = None

    # First choice: a storage plugin.  ``get_rref`` is an accessor rather
    # than a value because the underlying reference can change as
    # connections are lost and re-established; the storage server object
    # should always see the current one.
    try:
        furl, storage_server = _storage_from_foolscap_plugin(
            node_config, config, ann, get_rref,
        )
    except AnnouncementNotMatched as e:
        # Keep a more specific description around for the user.  It is only
        # used if the server does not offer anonymous service either.
        server_name = server_id.decode("utf8")
        unmatched = _NullStorage(f'{server_name}: missing plugin "{e}"')
    else:
        return _FoolscapStorage.from_announcement(
            server_id, furl, ann, storage_server,
        )

    # Second choice: the anonymous access scheme.
    try:
        anonymous_furl = ann[u"anonymous-storage-FURL"]
    except KeyError:
        # Nothing matched, so we cannot talk to this server.  (There should
        # be no way to get here without the plugin failure recorded above.)
        assert unmatched is not None, "Expected unmatched plugin error"
        return unmatched

    # Same reasoning as above for passing the ``get_rref`` accessor.
    anonymous_server = _StorageServer(get_rref=get_rref)
    return _FoolscapStorage.from_announcement(
        server_id, anonymous_furl, ann, anonymous_server,
    )
---|
919 | |
---|
920 | |
---|
@implementer(IServer)
class NativeStorageServer(service.MultiService):
    """I hold information about a storage server that we want to connect to.
    If we are connected, I hold the RemoteReference and its version
    information. I remember when we were last connected too, even if we
    aren't currently connected.

    @ivar last_connect_time: when we last established a connection
    @ivar last_loss_time: when we last lost a connection
    @ivar announcement: the announcement this server was created from

    The remote reference (once a connection has been made) is available from
    ``get_rref()``; the server's version dictionary from ``get_version()``.
    """

    # Passed to ``add_version_to_remote_reference`` as the default version
    # information.
    VERSION_DEFAULTS = UnicodeKeyDict({
        "http://allmydata.org/tahoe/protocols/storage/v1" :
        UnicodeKeyDict({ "maximum-immutable-share-size": 2**32 - 1,
                         "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2
                         "tolerates-immutable-read-overrun": False,
                         "delete-mutable-shares-with-zero-length-writev": False,
                         "available-space": None,
                         }),
        "application-version": "unknown: no get_version()",
        })

    def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=None,
                 grid_manager_verifier=None):
        """
        :param bytes server_id: the unique identifier of the server.
        :param ann: the server's announcement.
        :param tub_maker: called with ``handler_overrides`` to create the tub
            used for connecting (see ``start_connecting``).
        :param handler_overrides: passed to ``tub_maker``.
        :param node_config: passed on to ``_make_storage_system``.
        :param config: storage client configuration; a default
            ``StorageClientConfig()`` is used when ``None``.
        :param grid_manager_verifier: optional zero-argument callable
            consulted by ``upload_permitted``.
        """
        service.MultiService.__init__(self)
        assert isinstance(server_id, bytes)
        self._server_id = server_id
        self.announcement = ann
        self._tub_maker = tub_maker
        self._handler_overrides = handler_overrides
        self._grid_manager_verifier = grid_manager_verifier

        # Connection state is set up *before* the storage system is built:
        # ``self.get_rref`` is handed out below, and it must be safe to call
        # straight away.
        self.last_connect_time = None
        self.last_loss_time = None
        self._rref = None
        self._is_connected = False
        self._reconnector = None
        self._trigger_cb = None
        self._on_status_changed = ObserverList()

        if config is None:
            config = StorageClientConfig()

        self._storage = _make_storage_system(node_config, config, ann, self._server_id, self.get_rref)

    def upload_permitted(self):
        """
        If our client is configured with Grid Manager public-keys, we will
        only upload to storage servers that have a currently-valid
        certificate signed by at least one of the Grid Managers we
        accept.

        :return: True if we should use this server for uploads, False
            otherwise.
        """
        # if we have no Grid Manager keys configured, choice is easy
        if self._grid_manager_verifier is None:
            return True
        return self._grid_manager_verifier()

    def get_permutation_seed(self):
        return self._storage.permutation_seed

    def get_name(self): # keep methodname short
        # TODO: decide who adds [] in the short description. It should
        # probably be the output side, not here.
        return self._storage.name

    def get_longname(self):
        return self._storage.longname

    def get_tubid(self):
        return self._storage.tubid

    def get_lease_seed(self):
        return self._storage.lease_seed

    def get_foolscap_write_enabler_seed(self):
        return self._storage.tubid

    def get_nickname(self):
        return self._storage.nickname

    def on_status_changed(self, status_changed):
        """
        :param status_changed: a callable taking a single arg (the
            NativeStorageServer) that is notified when we become connected
        """
        return self._on_status_changed.subscribe(status_changed)

    # Special methods used by copy.copy() and copy.deepcopy(). When those are
    # used in allmydata.immutable.filenode to copy CheckResults during
    # repair, we want it to treat the IServer instances as singletons, and
    # not attempt to duplicate them..
    def __copy__(self):
        return self

    def __deepcopy__(self, memodict):
        return self

    def __repr__(self):
        return "<NativeStorageServer for %r>" % self.get_name()

    def get_serverid(self):
        return self._server_id

    def get_version(self):
        """Return the remote reference's version info, or None without one."""
        if self._rref:
            return self._rref.version
        return None

    def get_announcement(self):
        return self.announcement

    def get_connection_status(self):
        last_received = None
        if self._rref:
            last_received = self._rref.getDataLastReceivedAt()
        return connection_status.from_foolscap_reconnector(self._reconnector,
                                                           last_received)

    def is_connected(self):
        return self._is_connected

    def get_available_space(self):
        version = self.get_version()
        return _available_space_from_version(version)

    def start_connecting(self, trigger_cb):
        """
        Create the tub, attach it to this service and start connecting.

        :param trigger_cb: if true-ish, scheduled (via ``eventually``) each
            time a connection is made.
        """
        self._tub = self._tub_maker(self._handler_overrides)
        self._tub.setServiceParent(self)
        self._trigger_cb = trigger_cb
        self._reconnector = self._storage.connect_to(self._tub, self._got_connection)

    def _got_connection(self, rref):
        lp = log.msg(format="got connection to %(name)s, getting versions",
                     name=self.get_name(),
                     facility="tahoe.storage_broker", umid="coUECQ")
        if self._trigger_cb:
            eventually(self._trigger_cb)
        default = self.VERSION_DEFAULTS
        d = add_version_to_remote_reference(rref, default)
        d.addCallback(self._got_versioned_service, lp)
        d.addCallback(lambda ign: self._on_status_changed.notify(self))
        d.addErrback(log.err, format="storageclient._got_connection",
                     name=self.get_name(), umid="Sdq3pg")

    def _got_versioned_service(self, rref, lp):
        log.msg(format="%(name)s provided version info %(version)s",
                name=self.get_name(), version=rref.version,
                facility="tahoe.storage_broker", umid="SWmJYg",
                level=log.NOISY, parent=lp)

        self.last_connect_time = time.time()
        self._rref = rref
        self._is_connected = True
        rref.notifyOnDisconnect(self._lost)

    def get_rref(self):
        return self._rref

    def get_storage_server(self):
        """
        See ``IServer.get_storage_server``.
        """
        if self._rref is None:
            return None
        return self._storage.storage_server

    def _lost(self):
        log.msg(format="lost connection to %(name)s", name=self.get_name(),
                facility="tahoe.storage_broker", umid="zbRllw")
        self.last_loss_time = time.time()
        # self._rref is now stale: all callRemote()s will get a
        # DeadReferenceError. We leave the stale reference in place so that
        # uploader/downloader code (which received this IServer through
        # get_connected_servers() or get_servers_for_psi()) can continue to
        # use s.get_rref().callRemote() and not worry about it being None.
        self._is_connected = False

    def stop_connecting(self):
        # used when this descriptor has been superseded by another
        #
        # There is no reconnector until start_connecting() has been called,
        # in which case there is nothing to stop.
        if self._reconnector is not None:
            self._reconnector.stopConnecting()

    def try_to_connect(self):
        # used when the broker wants us to hurry up
        #
        # As with stop_connecting(), this is a no-op before
        # start_connecting() has created the reconnector.
        if self._reconnector is not None:
            self._reconnector.reset()
---|
1104 | |
---|
1105 | |
---|
@async_to_deferred
async def _pick_a_http_server(
        reactor,
        nurls: list[DecodedURL],
        request: Callable[[object, DecodedURL], defer.Deferred[object]]
) -> DecodedURL:
    """Pick the first server we successfully send a request to.

    ``request(reactor, nurl)`` is started for every URL in ``nurls`` and the
    resulting Deferreds are handed to ``race``.  The result is the
    ``DecodedURL`` belonging to the request that ``race`` reports as the
    winner.

    NOTE(review): an earlier description said this fires with ``None`` when
    no server is found, which does not agree with the declared return type.
    What happens when every request fails is determined by ``race`` --
    confirm.
    """
    def start_request(candidate: DecodedURL) -> defer.Deferred[DecodedURL]:
        # Replace whatever the request produced with the URL it was sent to,
        # so the winner of the race identifies the server.
        return request(reactor, candidate).addCallback(lambda _ignored: candidate)

    attempts = [start_request(candidate) for candidate in nurls]
    winner: defer.Deferred[tuple[int, DecodedURL]] = race(attempts)
    _index, chosen = await winner
    return chosen
---|
1127 | |
---|
1128 | |
---|
@implementer(IServer)
class HTTPNativeStorageServer(service.MultiService):
    """
    Like ``NativeStorageServer``, but for HTTP clients.

    The notion of being "connected" is less meaningful for HTTP; we just poll
    occasionally, and if we've succeeded at last poll, we assume we're
    "connected".
    """

    def __init__(self, server_id: bytes, announcement, default_connection_handlers: dict[str,str], reactor=reactor, grid_manager_verifier=None, tor_provider: Optional[TorProvider]=None):
        """
        :param server_id: the unique identifier of the server.
        :param announcement: the server's announcement; must contain the
            ``"anonymous-storage-FURL"`` and ``ANONYMOUS_STORAGE_NURLS``
            entries.
        :param default_connection_handlers: passed to ``StorageClientFactory``.
        :param reactor: reactor used for the connection-attempt timeout.
        :param grid_manager_verifier: optional zero-argument callable
            consulted by ``upload_permitted``.
        :param tor_provider: passed to ``StorageClientFactory``.
        """
        service.MultiService.__init__(self)
        assert isinstance(server_id, bytes)
        self._server_id = server_id
        self.announcement = announcement
        self._on_status_changed = ObserverList()
        self._reactor = reactor
        self._grid_manager_verifier = grid_manager_verifier
        self._storage_client_factory = StorageClientFactory(
            default_connection_handlers, tor_provider
        )

        furl = announcement["anonymous-storage-FURL"].encode("utf-8")
        (
            self._nickname,
            self._permutation_seed,
            self._tubid,
            self._short_description,
            self._long_description
        ) = _parse_announcement(server_id, furl, announcement)
        self._nurls = [
            DecodedURL.from_text(u)
            for u in announcement[ANONYMOUS_STORAGE_NURLS]
        ]
        self._istorage_server : Optional[_HTTPStorageServer] = None

        self._connection_status = connection_status.ConnectionStatus.unstarted()
        self._version = None
        self._last_connect_time = None
        self._connecting_deferred : Optional[defer.Deferred[object]]= None
        # The polling loop only exists once start_connecting() has run.
        # Initialising it here lets the stop paths cope with never having
        # been started.
        self._lc : Optional[LoopingCall] = None

    def get_permutation_seed(self):
        return self._permutation_seed

    def get_name(self):
        return self._short_description

    def get_longname(self):
        return self._long_description

    def get_tubid(self):
        return self._tubid

    def get_lease_seed(self):
        # Apparently this is what Foolscap version above does?!
        return self._tubid

    def get_foolscap_write_enabler_seed(self):
        return self._tubid

    def get_nickname(self):
        return self._nickname

    def on_status_changed(self, status_changed):
        """
        :param status_changed: a callable taking a single arg (this
            HTTPNativeStorageServer) that is notified whenever a connection
            attempt succeeds or fails
        """
        return self._on_status_changed.subscribe(status_changed)

    def upload_permitted(self):
        """
        If our client is configured with Grid Manager public-keys, we will
        only upload to storage servers that have a currently-valid
        certificate signed by at least one of the Grid Managers we
        accept.

        :return: True if we should use this server for uploads, False
            otherwise.
        """
        # if we have no Grid Manager keys configured, choice is easy
        if self._grid_manager_verifier is None:
            return True
        return self._grid_manager_verifier()

    # Special methods used by copy.copy() and copy.deepcopy(). When those are
    # used in allmydata.immutable.filenode to copy CheckResults during
    # repair, we want it to treat the IServer instances as singletons, and
    # not attempt to duplicate them..
    def __copy__(self):
        return self

    def __deepcopy__(self, memodict):
        return self

    def __repr__(self):
        return "<HTTPNativeStorageServer for %r>" % self.get_name()

    def get_serverid(self):
        return self._server_id

    def get_version(self):
        return self._version

    def get_announcement(self):
        return self.announcement

    def get_connection_status(self):
        return self._connection_status

    def is_connected(self):
        return self._connection_status.connected

    def get_available_space(self):
        version = self.get_version()
        return _available_space_from_version(version)

    def start_connecting(self, trigger_cb):
        """
        Start polling the server by running ``_connect`` in a ``LoopingCall``.

        :param trigger_cb: accepted for parity with
            ``NativeStorageServer.start_connecting``; it is not used here.
        """
        self._lc = LoopingCall(self._connect)
        self._lc.start(1, True)

    def _got_version(self, version):
        """Record a successful poll and notify status observers."""
        self._last_connect_time = time.time()
        self._version = version
        self._connection_status = connection_status.ConnectionStatus(
            True, "connected", [], self._last_connect_time, self._last_connect_time
        )
        self._on_status_changed.notify(self)

    def _failed_to_connect(self, reason):
        """Record a failed poll (or shutdown) and notify status observers."""
        self._connection_status = connection_status.ConnectionStatus(
            False, f"failure: {reason}", [], self._last_connect_time, self._last_connect_time
        )
        self._on_status_changed.notify(self)

    def get_storage_server(self):
        """
        See ``IServer.get_storage_server``.
        """
        if self._connection_status.summary == "unstarted":
            return None
        return self._istorage_server

    def _stop_polling(self):
        """
        Stop the polling loop if there is one and it is running.

        ``LoopingCall.stop()`` asserts that the loop is running, so it must
        not be called on a loop that was never started or is already stopped.
        """
        if self._lc is not None and self._lc.running:
            self._lc.stop()

    def stop_connecting(self):
        """Stop polling and cancel any connection attempt in progress."""
        self._stop_polling()
        if self._connecting_deferred is not None:
            self._connecting_deferred.cancel()

    def try_to_connect(self):
        self._connect()

    def _connect(self) -> defer.Deferred[object]:
        """
        Try to connect to a working storage server.

        If called while a previous ``_connect()`` is already running, it will
        just return the same ``Deferred``.

        ``LoopingCall.stop()`` doesn't cancel ``Deferred``s, unfortunately:
        https://github.com/twisted/twisted/issues/11814. Thus we want to store
        the ``Deferred`` so we can cancel it when necessary.

        We also want to return it so that loop iterations take it into account,
        and a new iteration doesn't start while we're in the middle of the
        previous one.
        """
        # Conceivably try_to_connect() was called on this before, in which case
        # we already are in the middle of connecting. So in that case just
        # return whatever is in progress:
        if self._connecting_deferred is not None:
            return self._connecting_deferred

        def done(_):
            self._connecting_deferred = None

        connecting = self._pick_server_and_get_version()
        # Remember the attempt *before* attaching callbacks.  If it has
        # already fired, the callbacks (including ``done``) run immediately;
        # storing it afterwards would overwrite the ``None`` written by
        # ``done`` with a finished Deferred, and every later call would
        # return that stale Deferred instead of trying again.
        self._connecting_deferred = connecting
        # Set a short timeout since we're relying on this for server liveness.
        # addTimeout/addCallbacks/addBoth all return the same Deferred.
        connecting.addTimeout(5, self._reactor).addCallbacks(
            self._got_version, self._failed_to_connect
        ).addBoth(done)
        return connecting

    @async_to_deferred
    async def _pick_server_and_get_version(self):
        """
        Minimal implementation of connection logic: pick a server, get its
        version.  This doesn't deal with errors much, so as to minimize
        statefulness.  It does change ``self._istorage_server``, so possibly
        more refactoring would be useful to remove even that much statefulness.
        """
        async def get_istorage_server() -> _HTTPStorageServer:
            if self._istorage_server is not None:
                return self._istorage_server

            # We haven't selected a server yet, so let's do so.

            # TODO This is somewhat inefficient on startup: it takes two successful
            # version() calls before we are live talking to a server, it could only
            # be one. See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3992

            @async_to_deferred
            async def request(reactor, nurl: DecodedURL):
                # Since we're just using this one off to check if the NURL
                # works, no need for persistent pool or other fanciness.
                pool = HTTPConnectionPool(reactor, persistent=False)
                pool.retryAutomatically = False
                storage_client = await self._storage_client_factory.create_storage_client(
                    nurl, reactor, pool
                )
                return await StorageClientGeneral(storage_client).get_version()

            # NOTE(review): ``reactor`` here and below is the module-level
            # name, not ``self._reactor`` -- confirm that is intended.
            nurl = await _pick_a_http_server(reactor, self._nurls, request)

            # If we've gotten this far, we've found a working NURL.
            storage_client = await self._storage_client_factory.create_storage_client(
                nurl, cast(IReactorTime, reactor), None
            )
            self._istorage_server = _HTTPStorageServer.from_http_client(storage_client)
            return self._istorage_server

        try:
            storage_server = await get_istorage_server()

            # Get the version from the remote server.
            version = await storage_server.get_version()
            return version
        except Exception as e:
            log.msg(f"Failed to connect to a HTTP storage server: {e}", level=log.CURIOUS)
            raise

    def stopService(self):
        """
        Cancel any attempt in progress, stop polling, mark the server as
        disconnected and shut down the HTTP client if one was created.
        """
        if self._connecting_deferred is not None:
            self._connecting_deferred.cancel()

        result = service.MultiService.stopService(self)
        self._stop_polling()
        self._failed_to_connect("shut down")

        if self._istorage_server is not None:
            client_shutting_down = self._istorage_server._http_client.shutdown()
            result.addCallback(lambda _: client_shutting_down)

        return result
---|
1374 | |
---|
1375 | |
---|
class UnknownServerTypeError(Exception):
    """Exception signalling a server type that is not known."""
---|
1378 | |
---|
1379 | |
---|
@implementer(IStorageServer)
@attr.s
class _StorageServer(object):
    """
    Thin ``IStorageServer`` that forwards every operation to a
    ``RemoteReference`` with ``callRemote``.  The reference is looked up
    afresh, through the supplied accessor, for each call.
    """
    # Zero-argument callable returning the current remote reference.
    _get_rref = attr.ib()

    @property
    def _rref(self):
        return self._get_rref()

    def get_version(self):
        return self._rref.callRemote("get_version")

    def allocate_buckets(
            self,
            storage_index,
            renew_secret,
            cancel_secret,
            sharenums,
            allocated_size,
            canary,
    ):
        remote_args = (
            storage_index,
            renew_secret,
            cancel_secret,
            sharenums,
            allocated_size,
            canary,
        )
        return self._rref.callRemote("allocate_buckets", *remote_args)

    def add_lease(
            self,
            storage_index,
            renew_secret,
            cancel_secret,
    ):
        return self._rref.callRemote(
            "add_lease", storage_index, renew_secret, cancel_secret,
        )

    def get_buckets(
            self,
            storage_index,
    ):
        return self._rref.callRemote("get_buckets", storage_index)

    def slot_readv(
            self,
            storage_index,
            shares,
            readv,
    ):
        return self._rref.callRemote("slot_readv", storage_index, shares, readv)

    def slot_testv_and_readv_and_writev(
            self,
            storage_index,
            secrets,
            tw_vectors,
            r_vector,
    ):
        # The wire protocol wants 4-tuples for test vectors, so the b"eq"
        # operator is inserted into each 3-tuple given by the caller.
        wire_format_tw_vectors = {}
        for key, value in tw_vectors.items():
            test_vector = [
                (start, length, b"eq", data)
                for (start, length, data) in value[0]
            ]
            wire_format_tw_vectors[key] = (test_vector, value[1], value[2])

        return self._rref.callRemote(
            "slot_testv_and_readv_and_writev",
            storage_index,
            secrets,
            wire_format_tw_vectors,
            r_vector,
        )

    def advise_corrupt_share(
            self,
            share_type,
            storage_index,
            shnum,
            reason,
    ):
        d = self._rref.callRemote(
            "advise_corrupt_share", share_type, storage_index, shnum, reason,
        )
        # Failures of this call are logged rather than passed on.
        return d.addErrback(log.err, "Error from remote call to advise_corrupt_share")
---|
1489 | |
---|
1490 | |
---|
1491 | |
---|
@attr.s(hash=True)
class _FakeRemoteReference(object):
    """
    Stand-in for a Foolscap RemoteReference whose ``callRemote`` invokes
    methods on a local object.
    """
    # The object whose methods are invoked by ``callRemote``.
    local_object = attr.ib(type=object)

    @defer.inlineCallbacks
    def callRemote(self, action, *args, **kwargs):
        """
        Call the method named ``action`` on the local object, waiting for
        its result.  An ``HTTPClientException`` is re-raised as a
        ``RemoteException`` carrying ``(code, message, body)``.
        """
        method = getattr(self.local_object, action)
        try:
            result = yield method(*args, **kwargs)
        except HTTPClientException as e:
            raise RemoteException((e.code, e.message, e.body))
        else:
            defer.returnValue(result)
---|
1506 | |
---|
1507 | |
---|
@attr.s
class _HTTPBucketWriter(object):
    """
    Emulate a ``RIBucketWriter``, but use HTTP protocol underneath.
    """
    client = attr.ib(type=StorageClientImmutables)
    storage_index = attr.ib(type=bytes)
    share_number = attr.ib(type=int)
    upload_secret = attr.ib(type=bytes)
    # Fired with ``True`` by ``write()`` once the server reports the upload
    # as finished; ``close()`` hands this Deferred to the caller.
    finished = attr.ib(type=defer.Deferred[bool], factory=defer.Deferred)

    def abort(self):
        """
        Ask the client to abort the upload of this share.

        :return: whatever ``client.abort_upload`` returns.
        """
        return self.client.abort_upload(self.storage_index, self.share_number,
                                        self.upload_secret)

    @defer.inlineCallbacks
    def write(self, offset, data):
        """
        Write ``data`` at ``offset`` via ``client.write_share_chunk``.

        If the result of the write has a true ``finished`` attribute, the
        ``finished`` Deferred of this writer is fired with ``True``.

        :return: a Deferred that fires with ``None`` once the write is done.
        """
        result = yield self.client.write_share_chunk(
            self.storage_index, self.share_number, self.upload_secret, offset, data
        )
        if result.finished:
            self.finished.callback(True)
        # Falling off the end makes the Deferred fire with None; no
        # (deprecated) ``defer.returnValue(None)`` is needed.

    def close(self):
        """
        Return the Deferred that fires when the upload is finished.
        """
        # We're not _really_ closed until all writes have succeeded and we
        # finished writing all the data.
        return self.finished
---|
1536 | |
---|
1537 | |
---|
def _ignore_404(failure: Failure) -> Optional[Failure]:
    """
    Errback that swallows HTTP "not found" errors and passes everything
    else through.

    Useful for advise_corrupt_share(), since it swallows unknown share
    numbers in Foolscap.

    :return: ``None`` when ``failure`` wraps an ``HTTPClientException``
        whose ``code`` equals ``http.NOT_FOUND``; otherwise ``failure``
        itself.
    """
    if not failure.check(HTTPClientException):
        return failure
    if failure.value.code != http.NOT_FOUND:
        return failure
    return None
---|
1547 | |
---|
1548 | |
---|
@attr.s(hash=True)
class _HTTPBucketReader(object):
    """
    Emulate a ``RIBucketReader``, but use HTTP protocol underneath.
    """
    client = attr.ib(type=StorageClientImmutables)
    storage_index = attr.ib(type=bytes)
    share_number = attr.ib(type=int)

    def read(self, offset, length):
        """
        Read ``length`` bytes starting at ``offset`` from this share.

        :return: whatever ``client.read_share_chunk`` returns.
        """
        fetch = self.client.read_share_chunk
        return fetch(self.storage_index, self.share_number, offset, length)

    def advise_corrupt_share(self, reason):
        """
        Report this share as corrupt, ignoring a "not found" response.

        :param reason: the explanation, decoded as UTF-8 with undecodable
            bytes rendered as backslash escapes before being sent.

        :return: the result of attaching ``_ignore_404`` as an errback to
            what ``client.advise_corrupt_share`` returned.
        """
        report = self.client.advise_corrupt_share
        message = str(reason, "utf-8", errors="backslashreplace")
        outcome = report(self.storage_index, self.share_number, message)
        return outcome.addErrback(_ignore_404)
---|
1568 | |
---|
1569 | |
---|
# WORK IN PROGRESS: for now it doesn't actually implement the whole thing.
@implementer(IStorageServer)  # type: ignore
@attr.s
class _HTTPStorageServer(object):
    """
    Talk to remote storage server over HTTP.
    """
    _http_client = attr.ib(type=StorageClient)

    @staticmethod
    def from_http_client(http_client: StorageClient) -> _HTTPStorageServer:
        """
        Create an ``IStorageServer`` from a HTTP ``StorageClient``.
        """
        return _HTTPStorageServer(http_client=http_client)

    def get_version(self) -> defer.Deferred[VersionMessage]:
        """
        Ask the server for its version via ``StorageClientGeneral``.
        """
        return StorageClientGeneral(self._http_client).get_version()

    @defer.inlineCallbacks
    def allocate_buckets(
        self,
        storage_index,
        renew_secret,
        cancel_secret,
        sharenums,
        allocated_size,
        canary
    ):
        """
        Create immutable shares on the server and return writers for them.

        A fresh random 20-byte upload secret is generated for the request
        and shared by all returned writers.  ``canary`` is accepted but not
        used by this implementation.

        :return: a Deferred firing with a 2-tuple: the ``already_have``
            value reported by the server, and a dict mapping each allocated
            share number to a ``_FakeRemoteReference`` wrapping an
            ``_HTTPBucketWriter`` for that share.
        """
        upload_secret = urandom(20)
        immutable_client = StorageClientImmutables(self._http_client)
        result = yield immutable_client.create(
            storage_index, sharenums, allocated_size, upload_secret, renew_secret,
            cancel_secret
        )
        # Plain ``return`` (rather than the deprecated ``defer.returnValue``)
        # matches the other inlineCallbacks methods of this class.
        return (
            result.already_have,
            {
                share_num: _FakeRemoteReference(_HTTPBucketWriter(
                    client=immutable_client,
                    storage_index=storage_index,
                    share_number=share_num,
                    upload_secret=upload_secret
                ))
                for share_num in result.allocated
            },
        )

    @defer.inlineCallbacks
    def get_buckets(
        self,
        storage_index
    ):
        """
        List the immutable shares stored for ``storage_index``.

        :return: a Deferred firing with a dict mapping each listed share
            number to a ``_FakeRemoteReference`` wrapping an
            ``_HTTPBucketReader`` for that share.
        """
        immutable_client = StorageClientImmutables(self._http_client)
        share_numbers = yield immutable_client.list_shares(
            storage_index
        )
        return {
            share_num: _FakeRemoteReference(_HTTPBucketReader(
                immutable_client, storage_index, share_num
            ))
            for share_num in share_numbers
        }

    @async_to_deferred
    async def add_lease(
        self,
        storage_index,
        renew_secret,
        cancel_secret
    ):
        """
        Add or renew a lease on ``storage_index``.

        A ``ClientException`` whose code is ``http.NOT_FOUND`` is swallowed;
        any other ``ClientException`` propagates.
        """
        client = StorageClientGeneral(self._http_client)
        try:
            await client.add_or_renew_lease(
                storage_index, renew_secret, cancel_secret
            )
        except ClientException as e:
            if e.code == http.NOT_FOUND:
                # Silently do nothing, as is the case for the Foolscap client
                return
            raise

    def advise_corrupt_share(
        self,
        share_type,
        storage_index,
        shnum,
        reason: bytes
    ):
        """
        Report a share as corrupt, ignoring a "not found" response.

        :param share_type: ``b"immutable"`` or ``b"mutable"``; selects
            which HTTP client class sends the report.
        :param reason: the explanation, decoded as UTF-8 with undecodable
            bytes rendered as backslash escapes before being sent.

        :raise ValueError: if ``share_type`` is neither of the two values.
        """
        if share_type == b"immutable":
            client: Union[StorageClientImmutables, StorageClientMutables] = StorageClientImmutables(self._http_client)
        elif share_type == b"mutable":
            client = StorageClientMutables(self._http_client)
        else:
            raise ValueError("Unknown share type")
        return client.advise_corrupt_share(
            storage_index, shnum, str(reason, "utf-8", errors="backslashreplace")
        ).addErrback(_ignore_404)

    @defer.inlineCallbacks
    def slot_readv(self, storage_index, shares, readv):
        """
        Read the ``(offset, length)`` ranges in ``readv`` from mutable shares.

        :param shares: share numbers to read; if empty, the server is first
            asked for the list of shares and all of them are read.

        :return: a Deferred firing with a dict mapping each share number to
            the list of results of its reads, in ``readv`` order.
        """
        mutable_client = StorageClientMutables(self._http_client)
        pending_reads = {}
        reads = {}
        # If shares list is empty, that means list all shares, so we need
        # to do a query to get that.
        if not shares:
            shares = yield mutable_client.list_shares(storage_index)

        # Start all the queries in parallel:
        for share_number in shares:
            share_reads = defer.gatherResults(
                [
                    mutable_client.read_share_chunk(
                        storage_index, share_number, offset, length
                    )
                    for (offset, length) in readv
                ]
            )
            pending_reads[share_number] = share_reads

        # Wait for all the queries to finish:
        for share_number, pending_result in pending_reads.items():
            reads[share_number] = yield pending_result

        return reads

    @defer.inlineCallbacks
    def slot_testv_and_readv_and_writev(
        self,
        storage_index,
        secrets,
        tw_vectors,
        r_vector,
    ):
        """
        Perform a combined test/read/write operation on mutable shares.

        :param secrets: a 3-tuple, unpacked and passed on to the HTTP client
            as three separate arguments.
        :param tw_vectors: dict mapping share number to a 3-tuple of
            ``(test_vector, data_vector, new_length)``, where the test
            vector holds ``(offset, size, specimen)`` triples and the data
            vector holds ``(offset, data)`` pairs.
        :param r_vector: iterable of ``(offset, size)`` pairs to read.

        :return: a Deferred firing with ``(success, reads)`` taken from the
            client's result.

        :raise RemoteException: if the server answers with
            ``http.UNAUTHORIZED``.
        """
        mutable_client = StorageClientMutables(self._http_client)
        we_secret, lr_secret, lc_secret = secrets
        client_tw_vectors = {}
        for share_num, (test_vector, data_vector, new_length) in tw_vectors.items():
            client_test_vectors = [
                TestVector(offset=offset, size=size, specimen=specimen)
                for (offset, size, specimen) in test_vector
            ]
            client_write_vectors = [
                WriteVector(offset=offset, data=data) for (offset, data) in data_vector
            ]
            client_tw_vectors[share_num] = TestWriteVectors(
                test_vectors=client_test_vectors,
                write_vectors=client_write_vectors,
                new_length=new_length
            )
        client_read_vectors = [
            ReadVector(offset=offset, size=size)
            for (offset, size) in r_vector
        ]
        try:
            client_result = yield mutable_client.read_test_write_chunks(
                storage_index, we_secret, lr_secret, lc_secret, client_tw_vectors,
                client_read_vectors,
            )
        except ClientException as e:
            if e.code == http.UNAUTHORIZED:
                raise RemoteException("Unauthorized write, possibly you passed the wrong write enabler?")
            raise
        return (client_result.success, client_result.reads)
---|