1 | """ |
---|
2 | Tests for allmydata.storage_client. |
---|
3 | """ |
---|
4 | |
---|
5 | from __future__ import annotations |
---|
6 | |
---|
7 | from json import ( |
---|
8 | loads, |
---|
9 | ) |
---|
10 | import hashlib |
---|
11 | from typing import Union, Any, Optional |
---|
12 | |
---|
13 | from hyperlink import DecodedURL |
---|
14 | from fixtures import ( |
---|
15 | TempDir, |
---|
16 | ) |
---|
17 | from testtools.content import ( |
---|
18 | text_content, |
---|
19 | ) |
---|
20 | from testtools.matchers import ( |
---|
21 | MatchesAll, |
---|
22 | IsInstance, |
---|
23 | MatchesStructure, |
---|
24 | Equals, |
---|
25 | Is, |
---|
26 | AfterPreprocessing, |
---|
27 | ) |
---|
28 | |
---|
29 | from zope.interface import ( |
---|
30 | implementer, |
---|
31 | ) |
---|
32 | from zope.interface.verify import ( |
---|
33 | verifyObject, |
---|
34 | ) |
---|
35 | |
---|
36 | from hyperlink import ( |
---|
37 | URL, |
---|
38 | ) |
---|
39 | |
---|
40 | import attr |
---|
41 | |
---|
42 | from twisted.internet.interfaces import ( |
---|
43 | IStreamClientEndpoint, |
---|
44 | IProtocolFactory, |
---|
45 | ) |
---|
46 | from twisted.application.service import ( |
---|
47 | Service, |
---|
48 | ) |
---|
49 | |
---|
50 | from twisted.trial import unittest |
---|
51 | from twisted.internet.defer import ( |
---|
52 | Deferred, |
---|
53 | inlineCallbacks, |
---|
54 | ) |
---|
55 | from twisted.python.filepath import ( |
---|
56 | FilePath, |
---|
57 | ) |
---|
58 | from twisted.internet.task import Clock |
---|
59 | |
---|
60 | from foolscap.api import ( |
---|
61 | Tub, |
---|
62 | ) |
---|
63 | from foolscap.ipb import ( |
---|
64 | IConnectionHintHandler, |
---|
65 | ) |
---|
66 | |
---|
67 | from allmydata.util.deferredutil import MultiFailure |
---|
68 | |
---|
69 | from .no_network import LocalWrapper |
---|
70 | from .common import ( |
---|
71 | EMPTY_CLIENT_CONFIG, |
---|
72 | SyncTestCase, |
---|
73 | AsyncTestCase, |
---|
74 | UseTestPlugins, |
---|
75 | UseNode, |
---|
76 | SameProcessStreamEndpointAssigner, |
---|
77 | MemoryIntroducerClient, |
---|
78 | ) |
---|
79 | from .common_web import ( |
---|
80 | do_http, |
---|
81 | ) |
---|
82 | from .storage_plugin import ( |
---|
83 | DummyStorageClient, |
---|
84 | ) |
---|
85 | from allmydata.webish import ( |
---|
86 | WebishServer, |
---|
87 | ) |
---|
88 | from allmydata.util import base32, yamlutil |
---|
89 | from allmydata.storage_client import ( |
---|
90 | IFoolscapStorageServer, |
---|
91 | NativeStorageServer, |
---|
92 | HTTPNativeStorageServer, |
---|
93 | StorageFarmBroker, |
---|
94 | StorageClientConfig, |
---|
95 | MissingPlugin, |
---|
96 | _FoolscapStorage, |
---|
97 | _NullStorage, |
---|
98 | _pick_a_http_server, |
---|
99 | ANONYMOUS_STORAGE_NURLS, |
---|
100 | ) |
---|
101 | from ..storage.server import ( |
---|
102 | StorageServer, |
---|
103 | ) |
---|
104 | from ..client import config_from_string |
---|
105 | |
---|
106 | from allmydata.interfaces import ( |
---|
107 | IConnectionStatus, |
---|
108 | IStorageServer, |
---|
109 | ) |
---|
110 | |
---|
111 | SOME_FURL = "pb://abcde@nowhere/fake" |
---|
112 | |
---|
113 | |
---|
114 | class NativeStorageServerWithVersion(NativeStorageServer): # type: ignore # tahoe-lafs/ticket/3573 |
---|
115 | def __init__(self, version): |
---|
116 | # note: these instances won't work for anything other than |
---|
117 | # get_available_space() because we don't upcall |
---|
118 | self.version = version |
---|
119 | def get_version(self): |
---|
120 | return self.version |
---|
121 | |
---|
122 | |
---|
123 | class TestNativeStorageServer(unittest.TestCase): |
---|
124 | def test_get_available_space_new(self): |
---|
125 | nss = NativeStorageServerWithVersion( |
---|
126 | { b"http://allmydata.org/tahoe/protocols/storage/v1": |
---|
127 | { b"maximum-immutable-share-size": 111, |
---|
128 | b"available-space": 222, |
---|
129 | } |
---|
130 | }) |
---|
131 | self.failUnlessEqual(nss.get_available_space(), 222) |
---|
132 | |
---|
133 | def test_get_available_space_old(self): |
---|
134 | nss = NativeStorageServerWithVersion( |
---|
135 | { b"http://allmydata.org/tahoe/protocols/storage/v1": |
---|
136 | { b"maximum-immutable-share-size": 111, |
---|
137 | } |
---|
138 | }) |
---|
139 | self.failUnlessEqual(nss.get_available_space(), 111) |
---|
140 | |
---|
141 | def test_missing_nickname(self): |
---|
142 | ann = {"anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x", |
---|
143 | "permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3", |
---|
144 | } |
---|
145 | nss = NativeStorageServer(b"server_id", ann, None, {}, EMPTY_CLIENT_CONFIG) |
---|
146 | self.assertEqual(nss.get_nickname(), "") |
---|
147 | |
---|
148 | |
---|
149 | class GetConnectionStatus(unittest.TestCase): |
---|
150 | """ |
---|
151 | Tests for ``NativeStorageServer.get_connection_status``. |
---|
152 | """ |
---|
153 | def test_unrecognized_announcement(self): |
---|
154 | """ |
---|
155 | When ``NativeStorageServer`` is constructed with a storage announcement it |
---|
156 | doesn't recognize, its ``get_connection_status`` nevertheless returns |
---|
157 | an object which provides ``IConnectionStatus``. |
---|
158 | """ |
---|
159 | # Pretty hard to recognize anything from an empty announcement. |
---|
160 | ann = {} |
---|
161 | nss = NativeStorageServer(b"server_id", ann, Tub, {}, EMPTY_CLIENT_CONFIG) |
---|
162 | nss.start_connecting(lambda: None) |
---|
163 | connection_status = nss.get_connection_status() |
---|
164 | self.assertTrue(IConnectionStatus.providedBy(connection_status)) |
---|
165 | |
---|
166 | |
---|
167 | class UnrecognizedAnnouncement(unittest.TestCase): |
---|
168 | """ |
---|
169 | Tests for handling of announcements that aren't recognized and don't use |
---|
170 | *anonymous-storage-FURL*. |
---|
171 | |
---|
172 | Recognition failure is created by making up something completely novel for |
---|
173 | these tests. In real use, recognition failure would most likely come from |
---|
174 | an announcement generated by a storage server plugin which is not loaded |
---|
175 | in the client. |
---|
176 | """ |
---|
177 | plugin_name = u"tahoe-lafs-testing-v1" |
---|
178 | ann = { |
---|
179 | u"storage-options": [ |
---|
180 | { |
---|
181 | u"name": plugin_name, |
---|
182 | u"any-parameter": 12345, |
---|
183 | }, |
---|
184 | ], |
---|
185 | } |
---|
186 | server_id = b"abc" |
---|
187 | |
---|
188 | def _tub_maker(self, overrides): |
---|
189 | return Service() |
---|
190 | |
---|
191 | def native_storage_server(self, config: Optional[StorageClientConfig] = None) -> NativeStorageServer: |
---|
192 | """ |
---|
193 | Make a ``NativeStorageServer`` out of an unrecognizable announcement. |
---|
194 | """ |
---|
195 | return NativeStorageServer( |
---|
196 | self.server_id, |
---|
197 | self.ann, |
---|
198 | self._tub_maker, |
---|
199 | {}, |
---|
200 | node_config=EMPTY_CLIENT_CONFIG, |
---|
201 | config=config if config is not None else StorageClientConfig(), |
---|
202 | ) |
---|
203 | |
---|
204 | def test_no_exceptions(self): |
---|
205 | """ |
---|
206 | ``NativeStorageServer`` can be instantiated with an unrecognized |
---|
207 | announcement. |
---|
208 | """ |
---|
209 | self.native_storage_server() |
---|
210 | |
---|
211 | def test_start_connecting(self): |
---|
212 | """ |
---|
213 | ``NativeStorageServer.start_connecting`` does not raise an exception. |
---|
214 | """ |
---|
215 | server = self.native_storage_server() |
---|
216 | server.start_connecting(None) |
---|
217 | |
---|
218 | def test_stop_connecting(self): |
---|
219 | """ |
---|
220 | ``NativeStorageServer.stop_connecting`` does not raise an exception. |
---|
221 | """ |
---|
222 | server = self.native_storage_server() |
---|
223 | server.start_connecting(None) |
---|
224 | server.stop_connecting() |
---|
225 | |
---|
226 | def test_try_to_connect(self): |
---|
227 | """ |
---|
228 | ``NativeStorageServer.try_to_connect`` does not raise an exception. |
---|
229 | """ |
---|
230 | server = self.native_storage_server() |
---|
231 | server.start_connecting(None) |
---|
232 | server.try_to_connect() |
---|
233 | |
---|
234 | def test_various_data_methods(self): |
---|
235 | """ |
---|
236 | The data accessors of ``NativeStorageServer`` that depend on the |
---|
237 | announcement do not raise an exception. |
---|
238 | """ |
---|
239 | server = self.native_storage_server() |
---|
240 | server.get_permutation_seed() |
---|
241 | server.get_name() |
---|
242 | server.get_longname() |
---|
243 | server.get_tubid() |
---|
244 | server.get_lease_seed() |
---|
245 | server.get_foolscap_write_enabler_seed() |
---|
246 | server.get_nickname() |
---|
247 | |
---|
248 | def test_missing_plugin(self) -> None: |
---|
249 | """ |
---|
250 | An exception is produced if the plugin is missing |
---|
251 | """ |
---|
252 | with self.assertRaises(MissingPlugin): |
---|
253 | self.native_storage_server( |
---|
254 | StorageClientConfig( |
---|
255 | storage_plugins={ |
---|
256 | "missing-plugin-name": {} |
---|
257 | } |
---|
258 | ) |
---|
259 | ) |
---|
260 | |
---|
261 | |
---|
262 | class PluginMatchedAnnouncement(SyncTestCase): |
---|
263 | """ |
---|
264 | Tests for handling by ``NativeStorageServer`` of storage server |
---|
265 | announcements that are handled by an ``IFoolscapStoragePlugin``. |
---|
266 | """ |
---|
267 | @inlineCallbacks |
---|
268 | def make_node(self, introducer_furl, storage_plugin, plugin_config): |
---|
269 | """ |
---|
270 | Create a client node with the given configuration. |
---|
271 | |
---|
272 | :param bytes introducer_furl: The introducer furl with which to |
---|
273 | configure the client. |
---|
274 | |
---|
275 | :param bytes storage_plugin: The name of a storage plugin to enable. |
---|
276 | |
---|
277 | :param dict[bytes, bytes] plugin_config: Configuration to supply to |
---|
278 | the enabled plugin. May also be ``None`` for no configuration |
---|
279 | section (distinct from ``{}`` which creates an empty configuration |
---|
280 | section). |
---|
281 | """ |
---|
282 | tempdir = TempDir() |
---|
283 | self.useFixture(tempdir) |
---|
284 | self.basedir = FilePath(tempdir.path) |
---|
285 | self.basedir.child(u"private").makedirs() |
---|
286 | self.useFixture(UseTestPlugins()) |
---|
287 | |
---|
288 | self.node_fixture = self.useFixture(UseNode( |
---|
289 | plugin_config, |
---|
290 | storage_plugin, |
---|
291 | self.basedir, |
---|
292 | introducer_furl, |
---|
293 | )) |
---|
294 | self.config = self.node_fixture.config |
---|
295 | self.node = yield self.node_fixture.create_node() |
---|
296 | [self.introducer_client] = self.node.introducer_clients |
---|
297 | |
---|
298 | |
---|
299 | def publish(self, server_id, announcement, introducer_client): |
---|
300 | for subscription in introducer_client.subscribed_to: |
---|
301 | if subscription.service_name == u"storage": |
---|
302 | subscription.cb( |
---|
303 | server_id, |
---|
304 | announcement, |
---|
305 | *subscription.args, |
---|
306 | **subscription.kwargs |
---|
307 | ) |
---|
308 | |
---|
309 | def get_storage(self, server_id, node): |
---|
310 | storage_broker = node.get_storage_broker() |
---|
311 | native_storage_server = storage_broker.servers[server_id] |
---|
312 | return native_storage_server._storage |
---|
313 | |
---|
314 | def set_rref(self, server_id, node, rref): |
---|
315 | storage_broker = node.get_storage_broker() |
---|
316 | native_storage_server = storage_broker.servers[server_id] |
---|
317 | native_storage_server._rref = rref |
---|
318 | |
---|
319 | @inlineCallbacks |
---|
320 | def test_ignored_non_enabled_plugin(self): |
---|
321 | """ |
---|
322 | An announcement that could be matched by a plugin that is not enabled is |
---|
323 | not matched. |
---|
324 | """ |
---|
325 | yield self.make_node( |
---|
326 | introducer_furl=SOME_FURL, |
---|
327 | storage_plugin="tahoe-lafs-dummy-v1", |
---|
328 | plugin_config=None, |
---|
329 | ) |
---|
330 | server_id = b"v0-abcdef" |
---|
331 | ann = { |
---|
332 | u"service-name": u"storage", |
---|
333 | u"storage-options": [{ |
---|
334 | # notice how the announcement is for a different storage plugin |
---|
335 | # than the one that is enabled. |
---|
336 | u"name": u"tahoe-lafs-dummy-v2", |
---|
337 | u"storage-server-FURL": SOME_FURL, |
---|
338 | }], |
---|
339 | } |
---|
340 | self.publish(server_id, ann, self.introducer_client) |
---|
341 | storage = self.get_storage(server_id, self.node) |
---|
342 | self.assertIsInstance(storage, _NullStorage) |
---|
343 | |
---|
344 | @inlineCallbacks |
---|
345 | def test_enabled_plugin(self): |
---|
346 | """ |
---|
347 | An announcement that could be matched by a plugin that is enabled with |
---|
348 | configuration is matched and the plugin's storage client is used. |
---|
349 | """ |
---|
350 | plugin_config = { |
---|
351 | "abc": "xyz", |
---|
352 | } |
---|
353 | plugin_name = "tahoe-lafs-dummy-v1" |
---|
354 | yield self.make_node( |
---|
355 | introducer_furl=SOME_FURL, |
---|
356 | storage_plugin=plugin_name, |
---|
357 | plugin_config=plugin_config, |
---|
358 | ) |
---|
359 | server_id = b"v0-abcdef" |
---|
360 | ann = { |
---|
361 | u"service-name": u"storage", |
---|
362 | u"storage-options": [{ |
---|
363 | # and this announcement is for a plugin with a matching name |
---|
364 | u"name": plugin_name, |
---|
365 | u"storage-server-FURL": SOME_FURL, |
---|
366 | }], |
---|
367 | } |
---|
368 | self.publish(server_id, ann, self.introducer_client) |
---|
369 | storage = self.get_storage(server_id, self.node) |
---|
370 | self.assertTrue( |
---|
371 | verifyObject( |
---|
372 | IFoolscapStorageServer, |
---|
373 | storage, |
---|
374 | ), |
---|
375 | ) |
---|
376 | expected_rref = object() |
---|
377 | # Can't easily establish a real Foolscap connection so fake the result |
---|
378 | # of doing so... |
---|
379 | self.set_rref(server_id, self.node, expected_rref) |
---|
380 | self.expectThat( |
---|
381 | storage.storage_server, |
---|
382 | MatchesAll( |
---|
383 | IsInstance(DummyStorageClient), |
---|
384 | MatchesStructure( |
---|
385 | get_rref=AfterPreprocessing( |
---|
386 | lambda get_rref: get_rref(), |
---|
387 | Is(expected_rref), |
---|
388 | ), |
---|
389 | configuration=Equals(plugin_config), |
---|
390 | announcement=Equals({ |
---|
391 | u'name': plugin_name, |
---|
392 | u'storage-server-FURL': u'pb://abcde@nowhere/fake', |
---|
393 | }), |
---|
394 | ), |
---|
395 | ), |
---|
396 | ) |
---|
397 | |
---|
398 | @inlineCallbacks |
---|
399 | def test_enabled_no_configuration_plugin(self): |
---|
400 | """ |
---|
401 | An announcement that could be matched by a plugin that is enabled with no |
---|
402 | configuration is matched and the plugin's storage client is used. |
---|
403 | """ |
---|
404 | plugin_name = "tahoe-lafs-dummy-v1" |
---|
405 | yield self.make_node( |
---|
406 | introducer_furl=SOME_FURL, |
---|
407 | storage_plugin=plugin_name, |
---|
408 | plugin_config=None, |
---|
409 | ) |
---|
410 | server_id = b"v0-abcdef" |
---|
411 | ann = { |
---|
412 | u"service-name": u"storage", |
---|
413 | u"storage-options": [{ |
---|
414 | # and this announcement is for a plugin with a matching name |
---|
415 | u"name": plugin_name, |
---|
416 | u"storage-server-FURL": SOME_FURL, |
---|
417 | }], |
---|
418 | } |
---|
419 | self.publish(server_id, ann, self.introducer_client) |
---|
420 | storage = self.get_storage(server_id, self.node) |
---|
421 | self.addDetail("storage", text_content(str(storage))) |
---|
422 | self.expectThat( |
---|
423 | storage.storage_server, |
---|
424 | MatchesAll( |
---|
425 | IsInstance(DummyStorageClient), |
---|
426 | MatchesStructure( |
---|
427 | configuration=Equals({}), |
---|
428 | ), |
---|
429 | ), |
---|
430 | ) |
---|
431 | |
---|
432 | |
---|
433 | class FoolscapStorageServers(unittest.TestCase): |
---|
434 | """ |
---|
435 | Tests for implementations of ``IFoolscapStorageServer``. |
---|
436 | """ |
---|
437 | def test_null_provider(self): |
---|
438 | """ |
---|
439 | Instances of ``_NullStorage`` provide ``IFoolscapStorageServer``. |
---|
440 | """ |
---|
441 | self.assertTrue( |
---|
442 | verifyObject( |
---|
443 | IFoolscapStorageServer, |
---|
444 | _NullStorage(), |
---|
445 | ), |
---|
446 | ) |
---|
447 | |
---|
448 | def test_foolscap_provider(self): |
---|
449 | """ |
---|
450 | Instances of ``_FoolscapStorage`` provide ``IFoolscapStorageServer``. |
---|
451 | """ |
---|
452 | @implementer(IStorageServer) |
---|
453 | class NotStorageServer(object): |
---|
454 | pass |
---|
455 | self.assertTrue( |
---|
456 | verifyObject( |
---|
457 | IFoolscapStorageServer, |
---|
458 | _FoolscapStorage.from_announcement( |
---|
459 | b"server-id", |
---|
460 | SOME_FURL, |
---|
461 | {u"permutation-seed-base32": base32.b2a(b"permutationseed")}, |
---|
462 | NotStorageServer(), |
---|
463 | ), |
---|
464 | ), |
---|
465 | ) |
---|
466 | |
---|
467 | |
---|
468 | class StoragePluginWebPresence(AsyncTestCase): |
---|
469 | """ |
---|
470 | Tests for the web resources ``IFoolscapStorageServer`` plugins may expose. |
---|
471 | """ |
---|
472 | @inlineCallbacks |
---|
473 | def setUp(self): |
---|
474 | super(StoragePluginWebPresence, self).setUp() |
---|
475 | |
---|
476 | self.useFixture(UseTestPlugins()) |
---|
477 | |
---|
478 | self.port_assigner = SameProcessStreamEndpointAssigner() |
---|
479 | self.port_assigner.setUp() |
---|
480 | self.addCleanup(self.port_assigner.tearDown) |
---|
481 | self.storage_plugin = u"tahoe-lafs-dummy-v1" |
---|
482 | |
---|
483 | from twisted.internet import reactor |
---|
484 | _, webport_endpoint = self.port_assigner.assign(reactor) |
---|
485 | tubport_location, tubport_endpoint = self.port_assigner.assign(reactor) |
---|
486 | |
---|
487 | tempdir = TempDir() |
---|
488 | self.useFixture(tempdir) |
---|
489 | self.basedir = FilePath(tempdir.path) |
---|
490 | self.basedir.child(u"private").makedirs() |
---|
491 | self.node_fixture = self.useFixture(UseNode( |
---|
492 | plugin_config={ |
---|
493 | "web": "1", |
---|
494 | }, |
---|
495 | node_config={ |
---|
496 | # We don't really need the main Tub listening but if we |
---|
497 | # disable it then we also have to disable storage (because |
---|
498 | # config validation policy). |
---|
499 | "tub.port": tubport_endpoint, |
---|
500 | "tub.location": tubport_location, |
---|
501 | "web.port": str(webport_endpoint), |
---|
502 | }, |
---|
503 | storage_plugin=self.storage_plugin, |
---|
504 | basedir=self.basedir, |
---|
505 | introducer_furl=SOME_FURL, |
---|
506 | )) |
---|
507 | self.node = yield self.node_fixture.create_node() |
---|
508 | self.webish = self.node.getServiceNamed(WebishServer.name) |
---|
509 | self.node.startService() |
---|
510 | self.addCleanup(self.node.stopService) |
---|
511 | self.port = self.webish.getPortnum() |
---|
512 | |
---|
513 | @inlineCallbacks |
---|
514 | def test_plugin_resource_path(self): |
---|
515 | """ |
---|
516 | The plugin's resource is published at */storage-plugins/<plugin name>*. |
---|
517 | """ |
---|
518 | url = u"http://127.0.0.1:{port}/storage-plugins/{plugin_name}".format( |
---|
519 | port=self.port, |
---|
520 | plugin_name=self.storage_plugin, |
---|
521 | ).encode("utf-8") |
---|
522 | result = yield do_http("get", url) |
---|
523 | self.assertThat(loads(result), Equals({"web": "1"})) |
---|
524 | |
---|
525 | @inlineCallbacks |
---|
526 | def test_plugin_resource_persistent_across_requests(self): |
---|
527 | """ |
---|
528 | The plugin's resource is loaded and then saved and re-used for future |
---|
529 | requests. |
---|
530 | """ |
---|
531 | url = URL( |
---|
532 | scheme=u"http", |
---|
533 | host=u"127.0.0.1", |
---|
534 | port=self.port, |
---|
535 | path=( |
---|
536 | u"storage-plugins", |
---|
537 | self.storage_plugin, |
---|
538 | u"counter", |
---|
539 | ), |
---|
540 | ).to_text().encode("utf-8") |
---|
541 | values = { |
---|
542 | loads((yield do_http("get", url)))[u"value"], |
---|
543 | loads((yield do_http("get", url)))[u"value"], |
---|
544 | } |
---|
545 | self.assertThat( |
---|
546 | values, |
---|
547 | # If the counter manages to go up then the state stuck around. |
---|
548 | Equals({1, 2}), |
---|
549 | ) |
---|
550 | |
---|
551 | |
---|
552 | _aCertPEM = Tub().myCertificate.dumpPEM() |
---|
553 | def new_tub(): |
---|
554 | """ |
---|
555 | Make a new ``Tub`` with a hard-coded private key. |
---|
556 | """ |
---|
557 | # Use a private key / certificate generated by Tub how it wants. But just |
---|
558 | # re-use the same one every time so we don't waste a lot of time |
---|
559 | # generating them over and over in the tests. |
---|
560 | return Tub(certData=_aCertPEM) |
---|
561 | |
---|
562 | |
---|
563 | def make_broker(tub_maker=None): |
---|
564 | """ |
---|
565 | Create a ``StorageFarmBroker`` with the given tub maker and an empty |
---|
566 | client configuration. |
---|
567 | """ |
---|
568 | if tub_maker is None: |
---|
569 | tub_maker = lambda handler_overrides: new_tub() |
---|
570 | return StorageFarmBroker(True, tub_maker, EMPTY_CLIENT_CONFIG) |
---|
571 | |
---|
572 | |
---|
573 | @implementer(IStreamClientEndpoint) |
---|
574 | @attr.s |
---|
575 | class SpyEndpoint(object): |
---|
576 | """ |
---|
577 | Observe and record connection attempts. |
---|
578 | |
---|
579 | :ivar list _append: A callable that accepts two-tuples. For each |
---|
580 | attempted connection, it will be called with ``Deferred`` that was |
---|
581 | returned and the ``Factory`` that was passed in. |
---|
582 | """ |
---|
583 | _append = attr.ib() |
---|
584 | |
---|
585 | def connect(self, factory): |
---|
586 | """ |
---|
587 | Record the connection attempt. |
---|
588 | |
---|
589 | :return: A ``Deferred`` that ``SpyEndpoint`` will not fire. |
---|
590 | """ |
---|
591 | d = Deferred() |
---|
592 | self._append((d, factory)) |
---|
593 | return d |
---|
594 | |
---|
595 | |
---|
596 | @implementer(IConnectionHintHandler) # type: ignore # warner/foolscap#78 |
---|
597 | @attr.s |
---|
598 | class SpyHandler(object): |
---|
599 | """ |
---|
600 | A Foolscap connection hint handler for the "spy" hint type. Connections |
---|
601 | are handled by just observing and recording them. |
---|
602 | |
---|
603 | :ivar list _connects: A list containing one element for each connection |
---|
604 | attempted with this handler. Each element is a two-tuple of the |
---|
605 | ``Deferred`` that was returned from ``connect`` and the factory that |
---|
606 | was passed to ``connect``. |
---|
607 | """ |
---|
608 | _connects : list[tuple[Deferred[object], IProtocolFactory]]= attr.ib(default=attr.Factory(list)) |
---|
609 | |
---|
610 | def hint_to_endpoint(self, hint, reactor, update_status): |
---|
611 | return (SpyEndpoint(self._connects.append), hint) |
---|
612 | |
---|
613 | |
---|
614 | class TestStorageFarmBroker(unittest.TestCase): |
---|
615 | |
---|
616 | def test_static_servers(self): |
---|
617 | broker = make_broker() |
---|
618 | |
---|
619 | key_s = b'v0-1234-1' |
---|
620 | servers_yaml = """\ |
---|
621 | storage: |
---|
622 | v0-1234-1: |
---|
623 | ann: |
---|
624 | anonymous-storage-FURL: {furl} |
---|
625 | permutation-seed-base32: aaaaaaaaaaaaaaaaaaaaaaaa |
---|
626 | """.format(furl=SOME_FURL) |
---|
627 | servers = yamlutil.safe_load(servers_yaml) |
---|
628 | permseed = base32.a2b(b"aaaaaaaaaaaaaaaaaaaaaaaa") |
---|
629 | broker.set_static_servers(servers["storage"]) |
---|
630 | self.failUnlessEqual(len(broker._static_server_ids), 1) |
---|
631 | s = broker.servers[key_s] |
---|
632 | self.failUnlessEqual(s.announcement, |
---|
633 | servers["storage"]["v0-1234-1"]["ann"]) |
---|
634 | self.failUnlessEqual(s.get_serverid(), key_s) |
---|
635 | self.assertEqual(s.get_permutation_seed(), permseed) |
---|
636 | |
---|
637 | # if the Introducer announces the same thing, we're supposed to |
---|
638 | # ignore it |
---|
639 | |
---|
640 | ann2 = { |
---|
641 | "service-name": "storage", |
---|
642 | "anonymous-storage-FURL": "pb://{}@nowhere/fake2".format(str(base32.b2a(b"1"), "utf-8")), |
---|
643 | "permutation-seed-base32": "bbbbbbbbbbbbbbbbbbbbbbbb", |
---|
644 | } |
---|
645 | broker._got_announcement(key_s, ann2) |
---|
646 | s2 = broker.servers[key_s] |
---|
647 | self.assertIdentical(s2, s) |
---|
648 | self.assertEqual(s2.get_permutation_seed(), permseed) |
---|
649 | |
---|
650 | def test_upgrade_from_foolscap_to_http(self): |
---|
651 | """ |
---|
652 | When an announcement is initially Foolscap but then switches to HTTP, |
---|
653 | HTTP is used, assuming HTTP is enabled. |
---|
654 | """ |
---|
655 | tub_maker = lambda _: new_tub() |
---|
656 | config = config_from_string( |
---|
657 | "/dev/null", "", "[client]\nforce_foolscap = False\n" |
---|
658 | ) |
---|
659 | broker = StorageFarmBroker(True, tub_maker, config) |
---|
660 | broker.startService() |
---|
661 | self.addCleanup(broker.stopService) |
---|
662 | key_s = b'v0-1234-1' |
---|
663 | |
---|
664 | ones = str(base32.b2a(b"1"), "utf-8") |
---|
665 | initial_announcement = { |
---|
666 | "service-name": "storage", |
---|
667 | "anonymous-storage-FURL": f"pb://{ones}@nowhere/fake2", |
---|
668 | "permutation-seed-base32": "bbbbbbbbbbbbbbbbbbbbbbbb", |
---|
669 | } |
---|
670 | broker._got_announcement(key_s, initial_announcement) |
---|
671 | initial_service = broker.servers[key_s] |
---|
672 | self.assertIsInstance(initial_service, NativeStorageServer) |
---|
673 | self.assertTrue(initial_service.running) |
---|
674 | self.assertIdentical(initial_service.parent, broker) |
---|
675 | |
---|
676 | http_announcement = initial_announcement.copy() |
---|
677 | http_announcement[ANONYMOUS_STORAGE_NURLS] = {f"pb://{ones}@nowhere/fake2#v=1"} |
---|
678 | broker._got_announcement(key_s, http_announcement) |
---|
679 | self.assertFalse(initial_service.running) |
---|
680 | self.assertEqual(initial_service.parent, None) |
---|
681 | new_service = broker.servers[key_s] |
---|
682 | self.assertIsInstance(new_service, HTTPNativeStorageServer) |
---|
683 | self.assertTrue(new_service.running) |
---|
684 | self.assertIdentical(new_service.parent, broker) |
---|
685 | |
---|
686 | |
---|
687 | def test_static_permutation_seed_pubkey(self): |
---|
688 | broker = make_broker() |
---|
689 | server_id = b"v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia" |
---|
690 | k = b"4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia" |
---|
691 | ann = { |
---|
692 | "anonymous-storage-FURL": SOME_FURL, |
---|
693 | } |
---|
694 | broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}}) |
---|
695 | s = broker.servers[server_id] |
---|
696 | self.assertEqual(s.get_permutation_seed(), base32.a2b(k)) |
---|
697 | |
---|
698 | def test_static_permutation_seed_explicit(self): |
---|
699 | broker = make_broker() |
---|
700 | server_id = b"v0-4uazse3xb6uu5qpkb7tel2bm6bpea4jhuigdhqcuvvse7hugtsia" |
---|
701 | k = b"w5gl5igiexhwmftwzhai5jy2jixn7yx7" |
---|
702 | ann = { |
---|
703 | "anonymous-storage-FURL": SOME_FURL, |
---|
704 | "permutation-seed-base32": k, |
---|
705 | } |
---|
706 | broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}}) |
---|
707 | s = broker.servers[server_id] |
---|
708 | self.assertEqual(s.get_permutation_seed(), base32.a2b(k)) |
---|
709 | |
---|
710 | def test_static_permutation_seed_hashed(self): |
---|
711 | broker = make_broker() |
---|
712 | server_id = b"unparseable" |
---|
713 | ann = { |
---|
714 | "anonymous-storage-FURL": SOME_FURL, |
---|
715 | } |
---|
716 | broker.set_static_servers({server_id.decode("ascii"): {"ann": ann}}) |
---|
717 | s = broker.servers[server_id] |
---|
718 | self.assertEqual(s.get_permutation_seed(), |
---|
719 | hashlib.sha256(server_id).digest()) |
---|
720 | |
---|
721 | @inlineCallbacks |
---|
722 | def test_threshold_reached(self): |
---|
723 | """ |
---|
724 | ``StorageFarmBroker.when_connected_enough`` returns a ``Deferred`` which |
---|
725 | only fires after the ``StorageFarmBroker`` has established at least as |
---|
726 | many connections as requested. |
---|
727 | """ |
---|
728 | introducer = MemoryIntroducerClient( |
---|
729 | new_tub(), |
---|
730 | SOME_FURL, |
---|
731 | b"", |
---|
732 | None, |
---|
733 | None, |
---|
734 | None, |
---|
735 | None, |
---|
736 | ) |
---|
737 | new_tubs = [] |
---|
738 | def make_tub(*args, **kwargs): |
---|
739 | return new_tubs.pop() |
---|
740 | broker = make_broker(make_tub) |
---|
741 | # Start the broker so that it will start Tubs attached to it so they |
---|
742 | # will attempt to make connections as necessary so that we can observe |
---|
743 | # those connections. |
---|
744 | broker.startService() |
---|
745 | self.addCleanup(broker.stopService) |
---|
746 | done = broker.when_connected_enough(5) |
---|
747 | broker.use_introducer(introducer) |
---|
748 | # subscribes to "storage" to learn of new storage nodes |
---|
749 | [subscribe] = introducer.subscribed_to |
---|
750 | self.assertEqual( |
---|
751 | subscribe.service_name, |
---|
752 | "storage", |
---|
753 | ) |
---|
754 | got_announcement = subscribe.cb |
---|
755 | |
---|
756 | data = { |
---|
757 | "service-name": "storage", |
---|
758 | "anonymous-storage-FURL": None, |
---|
759 | "permutation-seed-base32": "aaaaaaaaaaaaaaaaaaaaaaaa", |
---|
760 | } |
---|
761 | |
---|
762 | def add_one_server(x): |
---|
763 | data["anonymous-storage-FURL"] = "pb://%s@spy:nowhere/fake" % (str(base32.b2a(b"%d" % x), "ascii"),) |
---|
764 | tub = new_tub() |
---|
765 | connects = [] |
---|
766 | spy = SpyHandler(connects) |
---|
767 | tub.addConnectionHintHandler("spy", spy) |
---|
768 | new_tubs.append(tub) |
---|
769 | got_announcement(b'v0-1234-%d' % x, data) |
---|
770 | |
---|
771 | self.assertEqual( |
---|
772 | 1, len(connects), |
---|
773 | "Expected one connection attempt, got {!r} instead".format(connects), |
---|
774 | ) |
---|
775 | |
---|
776 | # Skip over all the Foolscap negotiation. It's complex with lots |
---|
777 | # of pieces and I don't want to figure out how to fake |
---|
778 | # it. -exarkun |
---|
779 | native = broker.servers[b"v0-1234-%d" % (x,)] |
---|
780 | rref = LocalWrapper(StorageServer(self.mktemp(), b"x" * 20)) |
---|
781 | native._got_connection(rref) |
---|
782 | |
---|
783 | # first 4 shouldn't trigger connected_threashold |
---|
784 | for x in range(4): |
---|
785 | add_one_server(x) |
---|
786 | self.assertFalse(done.called) |
---|
787 | |
---|
788 | # ...but the 5th *should* trigger the threshold |
---|
789 | add_one_server(42) |
---|
790 | |
---|
791 | # so: the OneShotObserverList only notifies via |
---|
792 | # foolscap.eventually() -- which forces the Deferred call |
---|
793 | # through the reactor -- so it's no longer synchronous, |
---|
794 | # meaning that we have to do "real reactor stuff" for the |
---|
795 | # Deferred from when_connected_enough() to actually fire. (or |
---|
796 | # @patch() out the reactor in foolscap.eventually to be a |
---|
797 | # Clock() so we can advance time ourselves, but ... luckily |
---|
798 | # eventually() uses 0 as the timeout currently) |
---|
799 | |
---|
800 | yield done |
---|
801 | self.assertTrue(done.called) |
---|
802 | |
---|
803 | def test_should_we_use_http_default(self): |
---|
804 | """Default is to use HTTP.""" |
---|
805 | basedir = self.mktemp() |
---|
806 | node_config = config_from_string(basedir, "", "") |
---|
807 | announcement = {ANONYMOUS_STORAGE_NURLS: ["pb://..."]} |
---|
808 | self.assertTrue( |
---|
809 | StorageFarmBroker._should_we_use_http(node_config, announcement) |
---|
810 | ) |
---|
811 | # Lacking NURLs, we can't use HTTP: |
---|
812 | self.assertFalse( |
---|
813 | StorageFarmBroker._should_we_use_http(node_config, {}) |
---|
814 | ) |
---|
815 | |
---|
816 | def test_should_we_use_http(self): |
---|
817 | """ |
---|
818 | If HTTP is allowed, it will only be used if the announcement includes |
---|
819 | some NURLs. |
---|
820 | """ |
---|
821 | basedir = self.mktemp() |
---|
822 | |
---|
823 | no_nurls = {} |
---|
824 | empty_nurls = {ANONYMOUS_STORAGE_NURLS: []} |
---|
825 | has_nurls = {ANONYMOUS_STORAGE_NURLS: ["pb://.."]} |
---|
826 | |
---|
827 | for force_foolscap, announcement, expected_http_usage in [ |
---|
828 | ("false", no_nurls, False), |
---|
829 | ("false", empty_nurls, False), |
---|
830 | ("false", has_nurls, True), |
---|
831 | ("true", empty_nurls, False), |
---|
832 | ("true", no_nurls, False), |
---|
833 | ("true", has_nurls, False), |
---|
834 | ]: |
---|
835 | node_config = config_from_string( |
---|
836 | basedir, "", f"[client]\nforce_foolscap = {force_foolscap}" |
---|
837 | ) |
---|
838 | self.assertEqual( |
---|
839 | StorageFarmBroker._should_we_use_http(node_config, announcement), |
---|
840 | expected_http_usage |
---|
841 | ) |
---|
842 | |
---|
843 | |
---|
844 | class PickHTTPServerTests(unittest.SynchronousTestCase): |
---|
845 | """Tests for ``_pick_a_http_server``.""" |
---|
846 | |
---|
847 | def pick_result(self, url_to_results: dict[DecodedURL, tuple[float, Union[Exception, Any]]]) -> Deferred[DecodedURL]: |
---|
848 | """ |
---|
849 | Given mapping of URLs to (delay, result), return the URL of the |
---|
850 | first selected server, or None. |
---|
851 | """ |
---|
852 | clock = Clock() |
---|
853 | |
---|
854 | def request(reactor, url): |
---|
855 | delay, value = url_to_results[url] |
---|
856 | result = Deferred() |
---|
857 | def add_result_value(): |
---|
858 | if isinstance(value, Exception): |
---|
859 | result.errback(value) |
---|
860 | else: |
---|
861 | result.callback(value) |
---|
862 | reactor.callLater(delay, add_result_value) |
---|
863 | return result |
---|
864 | |
---|
865 | d = _pick_a_http_server(clock, list(url_to_results.keys()), request) |
---|
866 | for i in range(100): |
---|
867 | clock.advance(0.1) |
---|
868 | return d |
---|
869 | |
---|
870 | def test_first_successful_connect_is_picked(self): |
---|
871 | """ |
---|
872 | Given multiple good URLs, the first one that connects is chosen. |
---|
873 | """ |
---|
874 | earliest_url = DecodedURL.from_text("http://a") |
---|
875 | latest_url = DecodedURL.from_text("http://b") |
---|
876 | bad_url = DecodedURL.from_text("http://bad") |
---|
877 | result = self.pick_result({ |
---|
878 | latest_url: (2, None), |
---|
879 | earliest_url: (1, None), |
---|
880 | bad_url: (0.5, RuntimeError()), |
---|
881 | }) |
---|
882 | self.assertEqual(self.successResultOf(result), earliest_url) |
---|
883 | |
---|
884 | def test_failures_include_all_reasons(self): |
---|
885 | """ |
---|
886 | If all the requests fail, ``_pick_a_http_server`` raises a |
---|
887 | ``allmydata.util.deferredutil.MultiFailure``. |
---|
888 | """ |
---|
889 | eventually_good_url = DecodedURL.from_text("http://good") |
---|
890 | bad_url = DecodedURL.from_text("http://bad") |
---|
891 | exception1 = RuntimeError() |
---|
892 | exception2 = ZeroDivisionError() |
---|
893 | result = self.pick_result({ |
---|
894 | eventually_good_url: (1, exception1), |
---|
895 | bad_url: (0.1, exception2), |
---|
896 | }) |
---|
897 | exc = self.failureResultOf(result).value |
---|
898 | self.assertIsInstance(exc, MultiFailure) |
---|
899 | self.assertEqual({f.value for f in exc.failures}, {exception2, exception1}) |
---|