1 | |
---|
2 | """ |
---|
3 | I contain the client-side code which speaks to storage servers, in particular |
---|
4 | the foolscap-based server implemented in src/allmydata/storage/*.py . |
---|
5 | """ |
---|
6 | |
---|
7 | # roadmap: |
---|
8 | # |
---|
9 | # 1: implement StorageFarmBroker (i.e. "storage broker"), change Client to |
---|
10 | # create it, change uploader/servermap to get rrefs from it. ServerFarm calls |
---|
11 | # IntroducerClient.subscribe_to . ServerFarm hides descriptors, passes rrefs |
---|
12 | # to clients. webapi status pages call broker.get_info_about_serverid. |
---|
13 | # |
---|
14 | # 2: move get_info methods to the descriptor, webapi status pages call |
---|
15 | # broker.get_descriptor_for_serverid().get_info |
---|
16 | # |
---|
17 | # 3?later?: store descriptors in UploadResults/etc instead of serverids, |
---|
18 | # webapi status pages call descriptor.get_info and don't use storage_broker |
---|
19 | # or Client |
---|
20 | # |
---|
21 | # 4: enable static config: tahoe.cfg can add descriptors. Make the introducer |
---|
22 | # optional. This closes #467 |
---|
23 | # |
---|
24 | # 5: implement NativeStorageClient, pass it to Tahoe2PeerSelector and other |
---|
25 | # clients. Clients stop doing callRemote(), use NativeStorageClient methods |
---|
26 | # instead (which might do something else, i.e. http or whatever). The |
---|
27 | # introducer and tahoe.cfg only create NativeStorageClients for now. |
---|
28 | # |
---|
29 | # 6: implement other sorts of IStorageClient classes: S3, etc |
---|
30 | |
---|
31 | |
---|
32 | import re, time, hashlib |
---|
33 | from ConfigParser import ( |
---|
34 | NoSectionError, |
---|
35 | ) |
---|
36 | import attr |
---|
37 | from zope.interface import ( |
---|
38 | Attribute, |
---|
39 | Interface, |
---|
40 | implementer, |
---|
41 | ) |
---|
42 | from twisted.internet import defer |
---|
43 | from twisted.application import service |
---|
44 | from twisted.plugin import ( |
---|
45 | getPlugins, |
---|
46 | ) |
---|
47 | from eliot import ( |
---|
48 | log_call, |
---|
49 | ) |
---|
50 | from foolscap.api import eventually |
---|
51 | from foolscap.reconnector import ( |
---|
52 | ReconnectionInfo, |
---|
53 | ) |
---|
54 | from allmydata.interfaces import ( |
---|
55 | IStorageBroker, |
---|
56 | IDisplayableServer, |
---|
57 | IServer, |
---|
58 | IStorageServer, |
---|
59 | IFoolscapStoragePlugin, |
---|
60 | ) |
---|
61 | from allmydata.util import log, base32, connection_status |
---|
62 | from allmydata.util.assertutil import precondition |
---|
63 | from allmydata.util.observer import ObserverList |
---|
64 | from allmydata.util.rrefutil import add_version_to_remote_reference |
---|
65 | from allmydata.util.hashutil import permute_server_hash |
---|
66 | |
---|
67 | # who is responsible for de-duplication? |
---|
68 | # both? |
---|
69 | # IC remembers the unpacked announcements it receives, to provide for late |
---|
70 | # subscribers and to remove duplicates |
---|
71 | |
---|
72 | # if a client subscribes after startup, will they receive old announcements? |
---|
73 | # yes |
---|
74 | |
---|
75 | # who will be responsible for signature checking? |
---|
76 | # make it be IntroducerClient, so they can push the filter outwards and |
---|
77 | # reduce inbound network traffic |
---|
78 | |
---|
79 | # what should the interface between StorageFarmBroker and IntroducerClient |
---|
80 | # look like? |
---|
81 | # don't pass signatures: only pass validated blessed-objects |
---|
82 | |
---|
@attr.s
class StorageClientConfig(object):
    """
    Configuration for a node acting as a storage client.

    :ivar preferred_peers: An iterable of the server-ids (``bytes``) of the
        storage servers where share placement is preferred, in order of
        decreasing preference.  See the *[client]peers.preferred*
        documentation for details.

    :ivar dict[unicode, dict[bytes, bytes]] storage_plugins: A mapping from
        names of ``IFoolscapStoragePlugin`` configured in *tahoe.cfg* to the
        respective configuration.
    """
    preferred_peers = attr.ib(default=())
    storage_plugins = attr.ib(default=attr.Factory(dict))

    @classmethod
    def from_node_config(cls, config):
        """
        Create a ``StorageClientConfig`` from a complete Tahoe-LAFS node
        configuration.

        :param _Config config: The loaded Tahoe-LAFS node configuration.

        :return StorageClientConfig: A configuration carrying the preferred
            peers from ``[client]peers.preferred`` and, for every plugin
            named in ``[client]storage.plugins``, the items of its
            ``storageclient.plugins.<name>`` section (an empty dict when the
            section is missing).
        """
        ps = config.get_config("client", "peers.preferred", b"").split(b",")
        # Strip first and filter afterwards, exactly as is done for the
        # plugin names below.  Filtering before stripping would let a
        # whitespace-only entry ("a, ,b") through as an empty server id.
        stripped_peers = (p.strip() for p in ps)
        preferred_peers = tuple(p for p in stripped_peers if p)

        enabled_storage_plugins = (
            name.strip()
            for name
            in config.get_config(
                b"client",
                b"storage.plugins",
                b"",
            ).decode("utf-8").split(u",")
            if name.strip()
        )

        storage_plugins = {}
        for plugin_name in enabled_storage_plugins:
            try:
                plugin_config = config.items(b"storageclient.plugins." + plugin_name)
            except NoSectionError:
                # An enabled plugin without its own section simply gets an
                # empty configuration.
                plugin_config = []
            storage_plugins[plugin_name] = dict(plugin_config)

        return cls(
            preferred_peers,
            storage_plugins,
        )
---|
134 | |
---|
135 | |
---|
@implementer(IStorageBroker)
class StorageFarmBroker(service.MultiService):
    """I live on the client, and know about storage servers. For each server
    that is participating in a grid, I either maintain a connection to it or
    remember enough information to establish a connection to it on demand.
    I'm also responsible for subscribing to the IntroducerClient to find out
    about new servers as they are announced by the Introducer.

    :ivar StorageClientConfig storage_client_config: Values from the node
        configuration file relating to storage behavior.
    """

    @property
    def preferred_peers(self):
        """The ``preferred_peers`` value of ``storage_client_config``."""
        return self.storage_client_config.preferred_peers

    def __init__(
            self,
            permute_peers,
            tub_maker,
            node_config,
            storage_client_config=None,
    ):
        """
        :param permute_peers: Must be true; the non-permuted mode is not
            implemented.

        :param tub_maker: A callable handed on to every
            ``NativeStorageServer`` this broker creates.

        :param node_config: The node configuration handed on to every
            ``NativeStorageServer`` this broker creates.

        :param storage_client_config: A ``StorageClientConfig``, or ``None``
            to use a default-constructed one.
        """
        service.MultiService.__init__(self)
        assert permute_peers # False not implemented yet
        self.permute_peers = permute_peers
        self._tub_maker = tub_maker

        self.node_config = node_config

        if storage_client_config is None:
            storage_client_config = StorageClientConfig()
        self.storage_client_config = storage_client_config

        # self.servers maps serverid -> IServer, and keeps track of all the
        # storage servers that we've heard about. Each descriptor manages its
        # own Reconnector, and will give us a RemoteReference when we ask
        # them for it.
        self.servers = {}
        self._static_server_ids = set() # ignore announcements for these
        self.introducer_client = None
        self._threshold_listeners = [] # tuples of (threshold, Deferred)
        self._connected_high_water_mark = 0

    @log_call(action_type=u"storage-client:broker:set-static-servers")
    def set_static_servers(self, servers):
        """
        Create, register and start connecting to each statically configured
        server.

        :param dict servers: Maps server id to a dict holding at least an
            ``"ann"`` announcement and optionally ``"connections"``.
            Entries whose storage server cannot be constructed are skipped.
        """
        # Sorting the items gives us a deterministic processing order.  This
        # doesn't really matter but it makes the logging behavior more
        # predictable and easier to test (and at least one test does depend on
        # this sorted order).
        for (server_id, server) in sorted(servers.items()):
            try:
                storage_server = self._make_storage_server(server_id, server)
            except Exception:
                # TODO: The _make_storage_server failure is logged but maybe
                # we should write a traceback here.  Notably, tests don't
                # automatically fail just because we hit this case.  Well
                # written tests will still fail if a surprising exception
                # arrives here but they might be harder to debug without this
                # information.
                pass
            else:
                self._static_server_ids.add(server_id)
                self.servers[server_id] = storage_server
                storage_server.setServiceParent(self)
                storage_server.start_connecting(self._trigger_connections)

    def get_client_storage_plugin_web_resources(self, node_config):
        """
        Get all of the client-side ``IResource`` implementations provided by
        enabled storage plugins.

        :param allmydata.node._Config node_config: The complete node
            configuration for the node from which these web resources will be
            served.

        :return dict[unicode, IResource]: Resources for all of the plugins.
        """
        plugins = {
            plugin.name: plugin
            for plugin
            in getPlugins(IFoolscapStoragePlugin)
        }
        # Only the configured plugin names are needed here; the per-plugin
        # configuration values are not used.
        return {
            name: plugins[name].get_client_resource(node_config)
            for name
            in self.storage_client_config.storage_plugins
        }

    @log_call(
        action_type=u"storage-client:broker:make-storage-server",
        include_args=["server_id"],
        include_result=False,
    )
    def _make_storage_server(self, server_id, server):
        """
        Build a ``NativeStorageServer`` for one server description and hook
        its status-change notification up to this broker.

        :param unicode server_id: The server id; it is encoded to ASCII
            before being handed to ``NativeStorageServer``.

        :param dict server: Holds the announcement under ``"ann"`` and,
            optionally, handler overrides under ``"connections"``.
        """
        assert isinstance(server_id, unicode) # from YAML
        server_id = server_id.encode("ascii")
        handler_overrides = server.get("connections", {})
        s = NativeStorageServer(
            server_id,
            server["ann"],
            self._tub_maker,
            handler_overrides,
            self.node_config,
            self.storage_client_config,
        )
        s.on_status_changed(lambda _: self._got_connection())
        return s

    def when_connected_enough(self, threshold):
        """
        :returns: a Deferred that fires if/when our high water mark for
            number of connected servers becomes (or ever was) at or above
            "threshold".
        """
        d = defer.Deferred()
        self._threshold_listeners.append( (threshold, d) )
        self._check_connected_high_water_mark()
        return d

    # these two are used in unit tests
    def test_add_rref(self, serverid, rref, ann):
        s = self._make_storage_server(
            serverid.decode("ascii"),
            {"ann": ann.copy()},
        )
        s._rref = rref
        s._is_connected = True
        self.servers[serverid] = s

    def test_add_server(self, server_id, s):
        s.on_status_changed(lambda _: self._got_connection())
        self.servers[server_id] = s

    def use_introducer(self, introducer_client):
        """Remember the introducer client and subscribe to "storage"
        announcements from it."""
        self.introducer_client = ic = introducer_client
        ic.subscribe_to("storage", self._got_announcement)

    def _got_connection(self):
        # this is called by NativeStorageServer when it is connected
        self._check_connected_high_water_mark()

    def _check_connected_high_water_mark(self):
        """Raise the high water mark if needed and fire every threshold
        listener whose threshold it has reached."""
        current = len(self.get_connected_servers())
        if current > self._connected_high_water_mark:
            self._connected_high_water_mark = current

        remaining = []
        for threshold, d in self._threshold_listeners:
            if self._connected_high_water_mark >= threshold:
                eventually(d.callback, None)
            else:
                remaining.append( (threshold, d) )
        self._threshold_listeners = remaining

    def _got_announcement(self, key_s, ann):
        """
        Handle one "storage" announcement delivered by the introducer client.

        Announcements for statically configured servers and exact duplicates
        of the announcement we already hold are ignored.  A changed
        announcement replaces the server we previously knew about.
        """
        precondition(isinstance(key_s, str), key_s)
        precondition(key_s.startswith("v0-"), key_s)
        precondition(ann["service-name"] == "storage", ann["service-name"])
        server_id = key_s
        if server_id in self._static_server_ids:
            log.msg(format="ignoring announcement for static server '%(id)s'",
                    id=server_id,
                    facility="tahoe.storage_broker", umid="AlxzqA",
                    level=log.UNUSUAL)
            return
        s = self._make_storage_server(
            server_id.decode("utf-8"),
            {u"ann": ann},
        )
        server_id = s.get_serverid()
        old = self.servers.get(server_id)
        # Compare against None explicitly: whether we already know this
        # server must not depend on the truthiness of the server object.
        if old is not None:
            if old.get_announcement() == ann:
                return # duplicate
            # replacement
            del self.servers[server_id]
            old.stop_connecting()
            old.disownServiceParent()
            # NOTE: this disownServiceParent() returns a Deferred that
            # doesn't fire until Tub.stopService fires, which will wait for
            # any existing connections to be shut down. This doesn't
            # generally matter for normal runtime, but unit tests can run
            # into DirtyReactorErrors if they don't block on these. If a test
            # replaces one server with a newer version, then terminates
            # before the old one has been shut down, it might get
            # DirtyReactorErrors. The fix would be to gather these Deferreds
            # into a structure that will block StorageFarmBroker.stopService
            # until they have fired (but hopefully don't keep reference
            # cycles around when they fire earlier than that, which will
            # almost always be the case for normal runtime).
        # now we forget about them and start using the new one
        s.setServiceParent(self)
        self.servers[server_id] = s
        s.start_connecting(self._trigger_connections)
        # the descriptor will manage their own Reconnector, and each time we
        # need servers, we'll ask them if they're connected or not.

    def _trigger_connections(self):
        # when one connection is established, reset the timers on all others,
        # to trigger a reconnection attempt in one second. This is intended
        # to accelerate server connections when we've been offline for a
        # while. The goal is to avoid hanging out for a long time with
        # connections to only a subset of the servers, which would increase
        # the chances that we'll put shares in weird places (and not update
        # existing shares of mutable files). See #374 for more details.
        for dsc in self.servers.values():
            dsc.try_to_connect()

    def get_servers_for_psi(self, peer_selection_index):
        """Return the connected servers as a list sorted for the given peer
        selection index, with preferred servers ahead of all others."""
        # return a list of server objects (IServers)
        assert self.permute_peers == True
        connected_servers = self.get_connected_servers()
        preferred_servers = frozenset(s for s in connected_servers if s.get_longname() in self.preferred_peers)
        def _permuted(server):
            seed = server.get_permutation_seed()
            # False sorts before True, so preferred servers come first.
            is_unpreferred = server not in preferred_servers
            return (is_unpreferred,
                    permute_server_hash(peer_selection_index, seed))
        return sorted(connected_servers, key=_permuted)

    def get_all_serverids(self):
        return frozenset(self.servers.keys())

    def get_connected_servers(self):
        return frozenset([s for s in self.servers.values() if s.is_connected()])

    def get_known_servers(self):
        return frozenset(self.servers.values())

    def get_nickname_for_serverid(self, serverid):
        """Return the nickname of a known server, or ``None`` if the server
        id is unknown."""
        if serverid in self.servers:
            return self.servers[serverid].get_nickname()
        return None

    def get_stub_server(self, serverid):
        """Return the known server for ``serverid`` (also matching by tubid),
        or a ``StubServer`` placeholder if there is none."""
        if serverid in self.servers:
            return self.servers[serverid]
        # some time before 1.12, we changed "serverid" to be "key_s" (the
        # printable verifying key, used in V2 announcements), instead of the
        # tubid. When the immutable uploader delegates work to a Helper,
        # get_stub_server() is used to map the returning server identifiers
        # to IDisplayableServer instances (to get a name, for display on the
        # Upload Results web page). If the Helper is running 1.12 or newer,
        # it will send pubkeys, but if it's still running 1.11, it will send
        # tubids. This clause maps the old tubids to our existing servers.
        for s in self.servers.values():
            if isinstance(s, NativeStorageServer):
                if serverid == s.get_tubid():
                    return s
        return StubServer(serverid)
---|
387 | |
---|
@implementer(IDisplayableServer)
class StubServer(object):
    """
    A displayable placeholder for a server identifier that has no
    corresponding known server.
    """

    def __init__(self, serverid):
        # binary tubid
        self.serverid = serverid

    def get_serverid(self):
        """Return the identifier this stub was created with."""
        return self.serverid

    def get_name(self):
        """Return the first eight characters of the base32 identifier."""
        encoded = base32.b2a(self.serverid)
        return encoded[:8]

    def get_longname(self):
        """Return the complete base32 encoding of the identifier."""
        encoded = base32.b2a(self.serverid)
        return encoded

    def get_nickname(self):
        """A stub has no nickname to offer, so a placeholder is returned."""
        return "?"
---|
400 | |
---|
401 | |
---|
class IFoolscapStorageServer(Interface):
    """
    An internal interface that mediates between ``NativeStorageServer`` and
    Foolscap-based ``IStorageServer`` implementations.
    """
    nickname = Attribute("""
    A name for this server for presentation to users.
    """)
    permutation_seed = Attribute("""
    A stable value associated with this server which a client can use as an
    input to the server selection permutation ordering.
    """)
    tubid = Attribute("""
    The identifier for the Tub in which the server is run.
    """)
    storage_server = Attribute("""
    An IStorageServer provider which implements a concrete Foolscap-based
    protocol for communicating with the server.
    """)
    name = Attribute("""
    Another name for this server for presentation to users.
    """)
    longname = Attribute("""
    *Another* name for this server for presentation to users.
    """)
    lease_seed = Attribute("""
    A stable value associated with this server which a client can use as an
    input to a lease secret generation function.
    """)

    def connect_to(tub, got_connection):
        """
        Attempt to establish and maintain a connection to the server.

        :param Tub tub: A Foolscap Tub from which the connection is to
            originate.

        :param got_connection: A one-argument callable which is called with a
            Foolscap ``RemoteReference`` when a connection is established.
            This may be called multiple times if the connection is lost and
            then re-established.

        :return foolscap.reconnector.Reconnector: An object which manages the
            connection and reconnection attempts.
        """
---|
447 | |
---|
448 | |
---|
@implementer(IFoolscapStorageServer)
@attr.s(frozen=True)
class _FoolscapStorage(object):
    """
    Abstraction for connecting to a storage server exposed via Foolscap.
    """
    nickname = attr.ib()
    permutation_seed = attr.ib()
    tubid = attr.ib()

    storage_server = attr.ib(validator=attr.validators.provides(IStorageServer))

    _furl = attr.ib()
    _short_description = attr.ib()
    _long_description = attr.ib()

    @property
    def name(self):
        """The abbreviated description derived from the server id."""
        return self._short_description

    @property
    def longname(self):
        """The full description, i.e. the server id itself."""
        return self._long_description

    @property
    def lease_seed(self):
        """The tubid doubles as the lease seed."""
        return self.tubid

    @classmethod
    def from_announcement(cls, server_id, furl, ann, storage_server):
        """
        Create an instance from a fURL and an announcement like::

            {"permutation-seed-base32": "...",
             "nickname": "...",
            }

        *nickname* is optional.

        :param server_id: The server id; must not be empty.  An id of the
            form ``v0-`` followed by 52 alphanumerics supplies the
            permutation seed when the announcement does not carry one.

        :param furl: The fURL of the server; must start with
            ``pb://<tubid>@``.

        :param ann: The announcement (see above).

        :param storage_server: The ``IStorageServer`` provider to expose as
            ``storage_server``.
        """
        m = re.match(r'pb://(\w+)@', furl)
        assert m, furl
        tubid_s = m.group(1).lower()
        tubid = base32.a2b(tubid_s)
        if "permutation-seed-base32" in ann:
            ps = base32.a2b(str(ann["permutation-seed-base32"]))
        elif re.search(r'^v0-[0-9a-zA-Z]{52}$', server_id):
            ps = base32.a2b(server_id[3:])
        else:
            # Pass the template as format= (like the other log.msg calls in
            # this file) so that %(server_id)s is interpolated from the
            # keyword arguments.
            log.msg(format="unable to parse serverid '%(server_id)s' as pubkey, "
                    "hashing it to get permutation-seed, "
                    "may not converge with other clients",
                    server_id=server_id,
                    facility="tahoe.storage_broker",
                    level=log.UNUSUAL, umid="qu86tw")
            ps = hashlib.sha256(server_id).digest()
        permutation_seed = ps

        assert server_id
        long_description = server_id
        if server_id.startswith("v0-"):
            # remove v0- prefix from abbreviated name
            short_description = server_id[3:3+8]
        else:
            short_description = server_id[:8]
        nickname = ann.get("nickname", "")

        return cls(
            nickname=nickname,
            permutation_seed=permutation_seed,
            tubid=tubid,
            storage_server=storage_server,
            furl=furl,
            short_description=short_description,
            long_description=long_description,
        )

    def connect_to(self, tub, got_connection):
        """Ask ``tub`` to connect to our fURL, reporting each connection to
        ``got_connection``; returns whatever ``tub.connectTo`` returns."""
        return tub.connectTo(self._furl, got_connection)
---|
528 | |
---|
529 | |
---|
@implementer(IFoolscapStorageServer)
class _NullStorage(object):
    """
    Abstraction for *not* communicating with a storage server of a type with
    which we can't communicate.
    """
    nickname = ""

    # All three seed-like values are the same constant: the SHA-256 digest of
    # the empty string.
    permutation_seed = tubid = lease_seed = hashlib.sha256("").digest()

    # There is no server to talk to.
    storage_server = None

    name = "<unsupported>"
    longname = "<storage with unsupported protocol>"

    def connect_to(self, tub, got_connection):
        """Never connects; hands back a reconnector that does nothing."""
        return NonReconnector()
---|
548 | |
---|
549 | |
---|
class NonReconnector(object):
    """
    A do-nothing stand-in with the same methods as
    ``foolscap.reconnector.Reconnector``.
    """

    def stopConnecting(self):
        """Nothing is connecting, so there is nothing to stop."""

    def reset(self):
        """There are no timers to reset."""

    def getReconnectionInfo(self):
        """Return a freshly constructed, empty ``ReconnectionInfo``."""
        info = ReconnectionInfo()
        return info
---|
562 | |
---|
# Module-level _NullStorage instance.
_null_storage = _NullStorage()
---|
564 | |
---|
565 | |
---|
class AnnouncementNotMatched(Exception):
    """
    Raised when none of the locally enabled plugins matches a storage
    server announcement.
    """
---|
571 | |
---|
572 | |
---|
def _storage_from_foolscap_plugin(node_config, config, announcement, get_rref):
    """
    Build a storage client from the first locally configured plugin that the
    announcement also offers.

    :param allmydata.node._Config node_config: The node configuration to
        pass to the plugin.

    :param config: An object whose ``storage_plugins`` mapping names the
        locally configured plugins.

    :param announcement: The announcement; its ``storage-options`` entry (if
        any) lists the options the server offers.

    :param get_rref: Passed through to the plugin's ``get_storage_client``.

    :return: A two-tuple of the option's ``storage-server-FURL`` and the
        result of the plugin's ``get_storage_client``.

    :raise ValueError: If a configured plugin is not installed.

    :raise AnnouncementNotMatched: If no configured plugin matches any
        offered option.
    """
    installed = dict(
        (candidate.name, candidate)
        for candidate in getPlugins(IFoolscapStoragePlugin)
    )
    offered_options = announcement.get(u"storage-options", [])

    for wanted_name, _unused_config in config.storage_plugins.items():
        if wanted_name not in installed:
            raise ValueError("{} not installed".format(wanted_name))
        plugin = installed[wanted_name]

        for option in offered_options:
            if wanted_name == option[u"name"]:
                furl = option[u"storage-server-FURL"]
                client = plugin.get_storage_client(
                    node_config,
                    option,
                    get_rref,
                )
                return furl, client

    raise AnnouncementNotMatched()
---|
601 | |
---|
602 | |
---|
603 | @implementer(IServer) |
---|
604 | class NativeStorageServer(service.MultiService): |
---|
605 | """I hold information about a storage server that we want to connect to. |
---|
606 | If we are connected, I hold the RemoteReference, their host address, and |
---|
607 | their version information. I remember information about when we were |
---|
608 | last connected too, even if we aren't currently connected. |
---|
609 | |
---|
610 | @ivar last_connect_time: when we last established a connection |
---|
611 | @ivar last_loss_time: when we last lost a connection |
---|
612 | |
---|
613 | @ivar version: the server's versiondict, from the most recent announcement |
---|
614 | @ivar nickname: the server's self-reported nickname (unicode), same |
---|
615 | |
---|
616 | @ivar rref: the RemoteReference, if connected, otherwise None |
---|
617 | @ivar remote_host: the IAddress, if connected, otherwise None |
---|
618 | """ |
---|
619 | |
---|
620 | VERSION_DEFAULTS = { |
---|
621 | "http://allmydata.org/tahoe/protocols/storage/v1" : |
---|
622 | { "maximum-immutable-share-size": 2**32 - 1, |
---|
623 | "maximum-mutable-share-size": 2*1000*1000*1000, # maximum prior to v1.9.2 |
---|
624 | "tolerates-immutable-read-overrun": False, |
---|
625 | "delete-mutable-shares-with-zero-length-writev": False, |
---|
626 | "available-space": None, |
---|
627 | }, |
---|
628 | "application-version": "unknown: no get_version()", |
---|
629 | } |
---|
630 | |
---|
631 | def __init__(self, server_id, ann, tub_maker, handler_overrides, node_config, config=StorageClientConfig()): |
---|
632 | service.MultiService.__init__(self) |
---|
633 | assert isinstance(server_id, str) |
---|
634 | self._server_id = server_id |
---|
635 | self.announcement = ann |
---|
636 | self._tub_maker = tub_maker |
---|
637 | self._handler_overrides = handler_overrides |
---|
638 | |
---|
639 | self._storage = self._make_storage_system(node_config, config, ann) |
---|
640 | |
---|
641 | self.last_connect_time = None |
---|
642 | self.last_loss_time = None |
---|
643 | self.remote_host = None |
---|
644 | self._rref = None |
---|
645 | self._is_connected = False |
---|
646 | self._reconnector = None |
---|
647 | self._trigger_cb = None |
---|
648 | self._on_status_changed = ObserverList() |
---|
649 | |
---|
650 | def _make_storage_system(self, node_config, config, ann): |
---|
651 | """ |
---|
652 | :param allmydata.node._Config node_config: The node configuration to pass |
---|
653 | to any configured storage plugins. |
---|
654 | |
---|
655 | :param StorageClientConfig config: Configuration specifying desired |
---|
656 | storage client behavior. |
---|
657 | |
---|
658 | :param dict ann: The storage announcement from the storage server we |
---|
659 | are meant to communicate with. |
---|
660 | |
---|
661 | :return IFoolscapStorageServer: An object enabling communication via |
---|
662 | Foolscap with the server which generated the announcement. |
---|
663 | """ |
---|
664 | # Try to match the announcement against a plugin. |
---|
665 | try: |
---|
666 | furl, storage_server = _storage_from_foolscap_plugin( |
---|
667 | node_config, |
---|
668 | config, |
---|
669 | ann, |
---|
670 | # Pass in an accessor for our _rref attribute. The value of |
---|
671 | # the attribute may change over time as connections are lost |
---|
672 | # and re-established. The _StorageServer should always be |
---|
673 | # able to get the most up-to-date value. |
---|
674 | self.get_rref, |
---|
675 | ) |
---|
676 | except AnnouncementNotMatched: |
---|
677 | # Nope. |
---|
678 | pass |
---|
679 | else: |
---|
680 | return _FoolscapStorage.from_announcement( |
---|
681 | self._server_id, |
---|
682 | furl.encode("utf-8"), |
---|
683 | ann, |
---|
684 | storage_server, |
---|
685 | ) |
---|
686 | |
---|
687 | # Try to match the announcement against the anonymous access scheme. |
---|
688 | try: |
---|
689 | furl = ann[u"anonymous-storage-FURL"] |
---|
690 | except KeyError: |
---|
691 | # Nope |
---|
692 | pass |
---|
693 | else: |
---|
694 | # See comment above for the _storage_from_foolscap_plugin case |
---|
695 | # about passing in get_rref. |
---|
696 | storage_server = _StorageServer(get_rref=self.get_rref) |
---|
697 | return _FoolscapStorage.from_announcement( |
---|
698 | self._server_id, |
---|
699 | furl.encode("utf-8"), |
---|
700 | ann, |
---|
701 | storage_server, |
---|
702 | ) |
---|
703 | |
---|
704 | # Nothing matched so we can't talk to this server. |
---|
705 | return _null_storage |
---|
706 | |
---|
707 | def get_permutation_seed(self): |
---|
708 | return self._storage.permutation_seed |
---|
709 | def get_name(self): # keep methodname short |
---|
710 | # TODO: decide who adds [] in the short description. It should |
---|
711 | # probably be the output side, not here. |
---|
712 | return self._storage.name |
---|
713 | def get_longname(self): |
---|
714 | return self._storage.longname |
---|
715 | def get_tubid(self): |
---|
716 | return self._storage.tubid |
---|
717 | def get_lease_seed(self): |
---|
718 | return self._storage.lease_seed |
---|
719 | def get_foolscap_write_enabler_seed(self): |
---|
720 | return self._storage.tubid |
---|
721 | def get_nickname(self): |
---|
722 | return self._storage.nickname |
---|
723 | |
---|
724 | def on_status_changed(self, status_changed): |
---|
725 | """ |
---|
726 | :param status_changed: a callable taking a single arg (the |
---|
727 | NativeStorageServer) that is notified when we become connected |
---|
728 | """ |
---|
729 | return self._on_status_changed.subscribe(status_changed) |
---|
730 | |
---|
731 | # Special methods used by copy.copy() and copy.deepcopy(). When those are |
---|
732 | # used in allmydata.immutable.filenode to copy CheckResults during |
---|
733 | # repair, we want it to treat the IServer instances as singletons, and |
---|
734 | # not attempt to duplicate them.. |
---|
735 | def __copy__(self): |
---|
736 | return self |
---|
737 | def __deepcopy__(self, memodict): |
---|
738 | return self |
---|
739 | |
---|
740 | def __repr__(self): |
---|
741 | return "<NativeStorageServer for %s>" % self.get_name() |
---|
742 | def get_serverid(self): |
---|
743 | return self._server_id |
---|
def get_version(self):
    """
    Return the ``version`` attribute of the current remote reference.

    :return: ``self._rref.version``, or ``None`` while ``self._rref`` is
        falsy.
    """
    rref = self._rref
    if not rref:
        return None
    return rref.version
---|
def get_announcement(self):
    """Return the value stored in ``self.announcement``."""
    announcement = self.announcement
    return announcement
---|
def get_remote_host(self):
    """Return the value stored in ``self.remote_host``."""
    host = self.remote_host
    return host
---|
752 | |
---|
def get_connection_status(self):
    """
    Build a connection-status object from the reconnector.

    The time data was last received is taken from the remote reference
    when one is present (truthy); otherwise ``None`` is passed along.

    :return: the result of
        ``connection_status.from_foolscap_reconnector``.
    """
    rref = self._rref
    last_received = rref.getDataLastReceivedAt() if rref else None
    return connection_status.from_foolscap_reconnector(
        self._reconnector,
        last_received,
    )
---|
759 | |
---|
def is_connected(self):
    """Return the current value of the ``self._is_connected`` flag."""
    connected = self._is_connected
    return connected
---|
762 | |
---|
def get_available_space(self):
    """
    Report the available space advertised in the server's version info.

    Looks in the storage-v1 protocol section of ``get_version()`` for
    ``available-space``; when that entry is absent (or ``None``), falls
    back to ``maximum-immutable-share-size``.

    :return: the advertised value, or ``None`` when there is no version
        info or neither entry is present.
    """
    version = self.get_version()
    if version is None:
        return None
    v1_section = version.get('http://allmydata.org/tahoe/protocols/storage/v1', {})
    advertised = v1_section.get('available-space')
    if advertised is not None:
        return advertised
    return v1_section.get('maximum-immutable-share-size', None)
---|
772 | |
---|
def start_connecting(self, trigger_cb):
    """
    Create a Tub, attach it to this service, and begin connecting.

    :param trigger_cb: stored as ``self._trigger_cb``; ``_got_connection``
        schedules it (when truthy) each time a connection is made.
    """
    tub = self._tub_maker(self._handler_overrides)
    self._tub = tub
    tub.setServiceParent(self)
    self._trigger_cb = trigger_cb
    # The storage description decides how to connect; keep the reconnector
    # it returns so other methods can use it later.
    self._reconnector = self._storage.connect_to(tub, self._got_connection)
---|
778 | |
---|
def _got_connection(self, rref):
    """
    Handle a freshly established connection.

    Logs the event, schedules the trigger callback (if any), then fetches
    the remote version info. Once that arrives the versioned reference is
    recorded by ``_got_versioned_service`` and status observers are
    notified. Any failure along the way is logged.

    :param rref: the remote reference for the new connection.
    """
    lp = log.msg(
        format="got connection to %(name)s, getting versions",
        name=self.get_name(),
        facility="tahoe.storage_broker",
        umid="coUECQ",
    )
    trigger = self._trigger_cb
    if trigger:
        eventually(trigger)

    def _notify_observers(_ignored):
        return self._on_status_changed.notify(self)

    d = add_version_to_remote_reference(rref, self.VERSION_DEFAULTS)
    d.addCallback(self._got_versioned_service, lp)
    d.addCallback(_notify_observers)
    d.addErrback(
        log.err,
        format="storageclient._got_connection",
        name=self.get_name(),
        umid="Sdq3pg",
    )
---|
791 | |
---|
def _got_versioned_service(self, rref, lp):
    """
    Record a remote reference whose version info has been retrieved.

    Added as a callback by ``_got_connection``.

    :param rref: the remote reference; its ``version`` attribute is logged.
    :param lp: the log entry to use as the parent of this method's log
        message.
    """
    log.msg(
        format="%(name)s provided version info %(version)s",
        name=self.get_name(),
        version=rref.version,
        facility="tahoe.storage_broker",
        umid="SWmJYg",
        level=log.NOISY,
        parent=lp,
    )

    self.last_connect_time = time.time()
    self.remote_host = rref.getLocationHints()
    self._rref = rref
    self._is_connected = True
    # Arrange for ``_lost`` to run when this connection goes away.
    rref.notifyOnDisconnect(self._lost)
---|
803 | |
---|
def get_rref(self):
    """Return the value stored in ``self._rref``."""
    rref = self._rref
    return rref
---|
806 | |
---|
def get_storage_server(self):
    """
    See ``IServer.get_storage_server``.

    :return: ``None`` while ``self._rref`` is ``None``; otherwise the
        ``storage_server`` attribute of ``self._storage``.
    """
    return None if self._rref is None else self._storage.storage_server
---|
814 | |
---|
def _lost(self):
    """
    Handle loss of the connection.

    Registered with ``notifyOnDisconnect`` by ``_got_versioned_service``.
    Logs the loss, records the time, and marks this server disconnected.
    """
    log.msg(
        format="lost connection to %(name)s",
        name=self.get_name(),
        facility="tahoe.storage_broker",
        umid="zbRllw",
    )
    self.last_loss_time = time.time()
    # self._rref is now stale: all callRemote()s will get a
    # DeadReferenceError. We leave the stale reference in place so that
    # uploader/downloader code (which received this IServer through
    # get_connected_servers() or get_servers_for_psi()) can continue to
    # use s.get_rref().callRemote() and not worry about it being None.
    self._is_connected = False
    self.remote_host = None
---|
826 | |
---|
def stop_connecting(self):
    """
    Tell the reconnector to stop connecting.

    Used when this descriptor has been superseded by another.
    """
    reconnector = self._reconnector
    reconnector.stopConnecting()
---|
830 | |
---|
def try_to_connect(self):
    """
    Reset the reconnector.

    Used when the broker wants us to hurry up.
    """
    reconnector = self._reconnector
    reconnector.reset()
---|
834 | |
---|
class UnknownServerTypeError(Exception):
    """
    Exception type for a server of an unrecognized kind.

    NOTE(review): the places that raise this are outside this section of
    the file; confirm the exact conditions there.
    """
---|
837 | |
---|
838 | |
---|
@implementer(IStorageServer)
@attr.s
class _StorageServer(object):
    """
    ``_StorageServer`` is a direct pass-through to an ``RIStorageServer`` via
    a ``RemoteReference``.

    Each public method forwards its arguments, unchanged and in order, to
    the remote method of the same name and returns whatever the remote-call
    primitive returns.
    """
    # Zero-argument callable producing the remote reference; it is invoked
    # anew on every access of ``_rref``.
    _get_rref = attr.ib()

    @property
    def _rref(self):
        """The remote reference currently supplied by ``_get_rref``."""
        fetch = self._get_rref
        return fetch()

    def get_version(self):
        """Forward ``get_version`` to the remote server."""
        return self._rref.callRemote("get_version")

    def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
                         sharenums, allocated_size, canary):
        """Forward ``allocate_buckets`` to the remote server."""
        remote = self._rref
        return remote.callRemote(
            "allocate_buckets",
            storage_index, renew_secret, cancel_secret,
            sharenums, allocated_size, canary,
        )

    def add_lease(self, storage_index, renew_secret, cancel_secret):
        """Forward ``add_lease`` to the remote server."""
        remote = self._rref
        return remote.callRemote(
            "add_lease", storage_index, renew_secret, cancel_secret,
        )

    def renew_lease(self, storage_index, renew_secret):
        """Forward ``renew_lease`` to the remote server."""
        remote = self._rref
        return remote.callRemote("renew_lease", storage_index, renew_secret)

    def get_buckets(self, storage_index):
        """Forward ``get_buckets`` to the remote server."""
        remote = self._rref
        return remote.callRemote("get_buckets", storage_index)

    def slot_readv(self, storage_index, shares, readv):
        """Forward ``slot_readv`` to the remote server."""
        remote = self._rref
        return remote.callRemote("slot_readv", storage_index, shares, readv)

    def slot_testv_and_readv_and_writev(self, storage_index, secrets,
                                        tw_vectors, r_vector):
        """Forward ``slot_testv_and_readv_and_writev`` to the remote server."""
        remote = self._rref
        return remote.callRemote(
            "slot_testv_and_readv_and_writev",
            storage_index, secrets, tw_vectors, r_vector,
        )

    def advise_corrupt_share(self, share_type, storage_index, shnum, reason):
        """
        Forward ``advise_corrupt_share`` to the remote server.

        Unlike the other methods of this class, this one goes through
        ``callRemoteOnly`` rather than ``callRemote``.
        """
        remote = self._rref
        return remote.callRemoteOnly(
            "advise_corrupt_share",
            share_type, storage_index, shnum, reason,
        )
---|