1 | """ |
---|
2 | HTTP client that talks to the HTTP storage server. |
---|
3 | """ |
---|
4 | |
---|
5 | from __future__ import annotations |
---|
6 | |
---|
7 | |
---|
8 | from typing import ( |
---|
9 | Optional, |
---|
10 | Sequence, |
---|
11 | Mapping, |
---|
12 | BinaryIO, |
---|
13 | cast, |
---|
14 | TypedDict, |
---|
15 | Set, |
---|
16 | Dict, |
---|
17 | Callable, |
---|
18 | ClassVar, |
---|
19 | ) |
---|
20 | from base64 import b64encode |
---|
21 | from io import BytesIO |
---|
22 | from os import SEEK_END |
---|
23 | |
---|
24 | from attrs import define, asdict, frozen, field |
---|
25 | from eliot import start_action, register_exception_extractor |
---|
26 | from eliot.twisted import DeferredContext |
---|
27 | |
---|
28 | from pycddl import Schema |
---|
29 | from collections_extended import RangeMap |
---|
30 | from werkzeug.datastructures import Range, ContentRange |
---|
31 | from twisted.web.http_headers import Headers |
---|
32 | from twisted.web import http |
---|
33 | from twisted.web.iweb import IPolicyForHTTPS, IResponse, IAgent |
---|
34 | from twisted.internet.defer import Deferred, succeed |
---|
35 | from twisted.internet.interfaces import ( |
---|
36 | IOpenSSLClientConnectionCreator, |
---|
37 | IReactorTime, |
---|
38 | IDelayedCall, |
---|
39 | ) |
---|
40 | from twisted.internet.ssl import CertificateOptions |
---|
41 | from twisted.protocols.tls import TLSMemoryBIOProtocol |
---|
42 | from twisted.web.client import Agent, HTTPConnectionPool |
---|
43 | from zope.interface import implementer |
---|
44 | from hyperlink import DecodedURL |
---|
45 | import treq |
---|
46 | from treq.client import HTTPClient |
---|
47 | from OpenSSL import SSL |
---|
48 | from werkzeug.http import parse_content_range_header |
---|
49 | |
---|
50 | from .http_common import ( |
---|
51 | swissnum_auth_header, |
---|
52 | Secrets, |
---|
53 | get_content_type, |
---|
54 | CBOR_MIME_TYPE, |
---|
55 | get_spki_hash, |
---|
56 | response_is_not_html, |
---|
57 | ) |
---|
58 | from ..interfaces import VersionMessage |
---|
59 | from .common import si_b2a, si_to_human_readable |
---|
60 | from ..util.hashutil import timing_safe_compare |
---|
61 | from ..util.deferredutil import async_to_deferred |
---|
62 | from ..util.tor_provider import _Provider as TorProvider |
---|
63 | from ..util.cputhreadpool import defer_to_thread |
---|
64 | from ..util.cbor import dumps |
---|
65 | |
---|
# txtorcon is an optional dependency, needed only for Tor support.  When it
# is not installed, define a placeholder ``Tor`` class so annotations such as
# ``Optional[Tor]`` below still resolve; isinstance checks against the
# placeholder simply never match a real Tor instance.
try:
    from txtorcon import Tor  # type: ignore
except ImportError:

    class Tor:  # type: ignore[no-redef]
        pass
---|
72 | |
---|
73 | |
---|
def _encode_si(si: bytes) -> str:
    """Encode the storage index as a text (ASCII) string."""
    return si_b2a(si).decode("ascii")
---|
77 | |
---|
78 | |
---|
class ClientException(Exception):
    """An unexpected response code from the server."""

    def __init__(
        self, code: int, message: Optional[str] = None, body: Optional[bytes] = None
    ):
        # Store all three pieces on the exception args (for repr/logging)
        # as well as named attributes for convenient access by callers.
        super().__init__(code, message, body)
        self.code = code
        self.message = message
        self.body = body
---|
89 | |
---|
90 | |
---|
91 | register_exception_extractor(ClientException, lambda e: {"response_code": e.code}) |
---|
92 | |
---|
93 | |
---|
94 | # Schemas for server responses. |
---|
95 | # |
---|
96 | # Tags are of the form #6.nnn, where the number is documented at |
---|
97 | # https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258 |
---|
98 | # indicates a set. |
---|
# Schemas for server responses.
#
# Tags are of the form #6.nnn, where the number is documented at
# https://www.iana.org/assignments/cbor-tags/cbor-tags.xhtml. Notably, #6.258
# indicates a set.
#
# Each entry is used by the corresponding client method to validate the
# CBOR-decoded response body before it is trusted.
_SCHEMAS: Mapping[str, Schema] = {
    "get_version": Schema(
        # Note that the single-quoted (`'`) string keys in this schema
        # represent *byte* strings - per the CDDL specification. Text strings
        # are represented using strings with *double* quotes (`"`).
        """
        response = {'http://allmydata.org/tahoe/protocols/storage/v1' => {
            'maximum-immutable-share-size' => uint
            'maximum-mutable-share-size' => uint
            'available-space' => uint
            }
            'application-version' => bstr
        }
        """
    ),
    "allocate_buckets": Schema(
        """
        response = {
            already-have: #6.258([0*256 uint])
            allocated: #6.258([0*256 uint])
        }
        """
    ),
    "immutable_write_share_chunk": Schema(
        """
        response = {
            required: [0* {begin: uint, end: uint}]
        }
        """
    ),
    "list_shares": Schema(
        """
        response = #6.258([0*256 uint])
        """
    ),
    "mutable_read_test_write": Schema(
        """
        response = {
            "success": bool,
            "data": {0*256 share_number: [0* bstr]}
        }
        share_number = uint
        """
    ),
    "mutable_list_shares": Schema(
        """
        response = #6.258([0*256 uint])
        """
    ),
}
---|
149 | |
---|
150 | |
---|
@define
class _LengthLimitedCollector:
    """
    A callable collector for ``treq.collect()`` that enforces a maximum
    total body length and keeps an inactivity timer alive.
    """

    # Bytes still allowed before the response is considered oversized.
    remaining_length: int
    # Inactivity timer; receiving any chunk pushes it back 60 seconds.
    timeout_on_silence: IDelayedCall
    # Accumulates the body received so far.
    f: BytesIO = field(factory=BytesIO)

    def __call__(self, data: bytes) -> None:
        # Data arrived, so postpone the no-traffic timeout.
        self.timeout_on_silence.reset(60)
        self.remaining_length -= len(data)
        if self.remaining_length >= 0:
            self.f.write(data)
        else:
            raise ValueError("Response length was too long")
---|
167 | |
---|
168 | |
---|
def limited_content(
    response: IResponse,
    clock: IReactorTime,
    max_length: int = 30 * 1024 * 1024,
) -> Deferred[BinaryIO]:
    """
    Like ``treq.content()``, but limit data read from the response to a set
    length. If the response is longer than the max allowed length, the result
    fails with a ``ValueError``.

    A potentially useful future improvement would be using a temporary file to
    store the content; since filesystem buffering means that would use memory
    for small responses and disk for large responses.

    This will time out if no data is received for 60 seconds; so long as a
    trickle of data continues to arrive, it will continue to run.

    :param response: the response whose body should be collected.
    :param clock: reactor used to schedule the inactivity timeout.
    :param max_length: maximum number of body bytes accepted before failing.
    :return: a ``Deferred`` firing with a file-like object positioned at the
        start of the collected body.
    """
    result_deferred = succeed(None)

    # Sadly, addTimeout() won't work because we need access to the IDelayedCall
    # in order to reset it on each data chunk received.
    timeout = clock.callLater(60, result_deferred.cancel)
    collector = _LengthLimitedCollector(max_length, timeout)

    with start_action(
        action_type="allmydata:storage:http-client:limited-content",
        max_length=max_length,
    ).context():
        d = DeferredContext(result_deferred)

    # Make really sure everything gets called in Deferred context, treq might
    # call collector directly...
    d.addCallback(lambda _: treq.collect(response, collector))

    def done(_: object) -> BytesIO:
        # Success: stop the inactivity timer and rewind the buffer so the
        # caller can read from the beginning.
        timeout.cancel()
        collector.f.seek(0)
        return collector.f

    def failed(f):
        # Failure (including timeout-driven cancellation): make sure the
        # timer isn't left pending, then propagate the failure unchanged.
        if timeout.active():
            timeout.cancel()
        return f

    result = d.addCallbacks(done, failed)
    return result.addActionFinish()
---|
215 | |
---|
216 | |
---|
@define
class ImmutableCreateResult(object):
    """Result of creating a storage index for an immutable."""

    # Share numbers the server reports it already has.
    already_have: set[int]
    # Share numbers the server allocated for us to upload.
    allocated: set[int]
---|
223 | |
---|
224 | |
---|
class _TLSContextFactory(CertificateOptions):
    """
    Create a context that validates the way Tahoe-LAFS wants to: based on a
    pinned certificate hash, rather than a certificate authority.

    Originally implemented as part of Foolscap. To comply with the license,
    here's the original licensing terms:

    Copyright (c) 2006-2008 Brian Warner

    Permission is hereby granted, free of charge, to any person obtaining a
    copy of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom the
    Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in
    all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    DEALINGS IN THE SOFTWARE.
    """

    def __init__(self, expected_spki_hash: bytes):
        # Hash of the SubjectPublicKeyInfo of the one certificate we accept.
        self.expected_spki_hash = expected_spki_hash
        CertificateOptions.__init__(self)

    def getContext(self) -> SSL.Context:
        def always_validate(conn, cert, errno, depth, preverify_ok):
            # This function is called to validate the certificate received by
            # the other end. OpenSSL calls it multiple times, for each errno
            # for each certificate.

            # We do not care about certificate authorities or revocation
            # lists, we just want to know that the certificate has a valid
            # signature and follow the chain back to one which is
            # self-signed. We need to protect against forged signatures, but
            # not the usual TLS concerns about invalid CAs or revoked
            # certificates.
            things_are_ok = (
                SSL.X509VerificationCodes.OK,
                SSL.X509VerificationCodes.ERR_CERT_NOT_YET_VALID,
                SSL.X509VerificationCodes.ERR_CERT_HAS_EXPIRED,
                SSL.X509VerificationCodes.ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
                SSL.X509VerificationCodes.ERR_SELF_SIGNED_CERT_IN_CHAIN,
            )
            # TODO can we do this once instead of multiple times?
            if errno in things_are_ok and timing_safe_compare(
                get_spki_hash(cert.to_cryptography()), self.expected_spki_hash
            ):
                return 1
            # TODO: log the details of the error, because otherwise they get
            # lost in the PyOpenSSL exception that will eventually be raised
            # (possibly OpenSSL.SSL.Error: certificate verify failed)
            return 0

        ctx = CertificateOptions.getContext(self)

        # VERIFY_PEER means we ask the other end for their certificate.
        ctx.set_verify(SSL.VERIFY_PEER, always_validate)
        return ctx
---|
292 | |
---|
293 | |
---|
@implementer(IPolicyForHTTPS)
@implementer(IOpenSSLClientConnectionCreator)
@define
class _StorageClientHTTPSPolicy:
    """
    TLS policy that validates the server via certificate pinning: the
    connection is accepted iff the SPKI hash of the server's public key
    matches ``expected_spki_hash``.
    """

    expected_spki_hash: bytes

    # IPolicyForHTTPS
    def creatorForNetloc(self, hostname: str, port: int) -> _StorageClientHTTPSPolicy:
        # Pinning is host-independent, so this object doubles as the
        # connection creator for every netloc.
        return self

    # IOpenSSLClientConnectionCreator
    def clientConnectionForTLS(
        self, tlsProtocol: TLSMemoryBIOProtocol
    ) -> SSL.Connection:
        context = _TLSContextFactory(self.expected_spki_hash).getContext()
        return SSL.Connection(context, None)
---|
316 | |
---|
317 | |
---|
@define
class StorageClientFactory:
    """
    Create ``StorageClient`` instances, using appropriate
    ``twisted.web.iweb.IAgent`` for different connection methods: normal TCP,
    Tor, and eventually I2P.

    There is some caching involved since there might be shared setup work, e.g.
    connecting to the local Tor service only needs to happen once.
    """

    # Connection-handler configuration; only the "tcp" entry is consulted
    # here (see the TODO in _create_agent()).
    _default_connection_handlers: dict[str, str]
    _tor_provider: Optional[TorProvider]
    # Cache the Tor instance created by the provider, if relevant.
    _tor_instance: Optional[Tor] = None

    # If set, we're doing unit testing and we should call this with any
    # HTTPConnectionPool that gets passed/created to ``create_agent()``.
    TEST_MODE_REGISTER_HTTP_POOL: ClassVar[
        Optional[Callable[[HTTPConnectionPool], None]]
    ] = None

    @classmethod
    def start_test_mode(cls, callback: Callable[[HTTPConnectionPool], None]) -> None:
        """Switch to testing mode.

        In testing mode we register the pool with test system using the given
        callback so it can Do Things, most notably killing off idle HTTP
        connections at test shutdown and, in some tests, in the middle of the
        test.
        """
        cls.TEST_MODE_REGISTER_HTTP_POOL = callback

    @classmethod
    def stop_test_mode(cls) -> None:
        """Stop testing mode."""
        cls.TEST_MODE_REGISTER_HTTP_POOL = None

    async def _create_agent(
        self,
        nurl: DecodedURL,
        reactor: object,
        tls_context_factory: IPolicyForHTTPS,
        pool: HTTPConnectionPool,
    ) -> IAgent:
        """Create a new ``IAgent``, possibly using Tor."""
        if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
            self.TEST_MODE_REGISTER_HTTP_POOL(pool)

        # TODO default_connection_handlers should really be an object, not a
        # dict, so we can ask "is this using Tor" without poking at a
        # dictionary with arbitrary strings... See
        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4032
        handler = self._default_connection_handlers["tcp"]

        if handler == "tcp":
            return Agent(reactor, tls_context_factory, pool=pool)
        if handler == "tor" or nurl.scheme == "pb+tor":
            assert self._tor_provider is not None
            # Lazily create (and then cache) the Tor instance: connecting to
            # the local Tor service only needs to happen once.
            if self._tor_instance is None:
                self._tor_instance = await self._tor_provider.get_tor_instance(reactor)
            return self._tor_instance.web_agent(
                pool=pool, tls_context_factory=tls_context_factory
            )
        else:
            # I2P support will be added here. See
            # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4037
            raise RuntimeError(f"Unsupported tcp connection handler: {handler}")

    async def create_storage_client(
        self,
        nurl: DecodedURL,
        reactor: IReactorTime,
        pool: Optional[HTTPConnectionPool] = None,
    ) -> StorageClient:
        """Create a new ``StorageClient`` for the given NURL.

        The NURL's user part carries the expected SPKI certificate hash and
        the first path segment carries the swissnum.
        """
        assert nurl.fragment == "v=1"
        assert nurl.scheme in ("pb", "pb+tor")
        if pool is None:
            pool = HTTPConnectionPool(reactor)
            pool.maxPersistentPerHost = 10

        certificate_hash = nurl.user.encode("ascii")
        agent = await self._create_agent(
            nurl,
            reactor,
            _StorageClientHTTPSPolicy(expected_spki_hash=certificate_hash),
            pool,
        )
        treq_client = HTTPClient(agent)
        https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
        swissnum = nurl.path[0].encode("ascii")
        response_check = lambda _: None
        if self.TEST_MODE_REGISTER_HTTP_POOL is not None:
            # In tests, sanity-check every response (e.g. that it isn't HTML
            # from some misconfigured server).
            response_check = response_is_not_html

        return StorageClient(
            https_url,
            swissnum,
            treq_client,
            pool,
            reactor,
            response_check,
        )
---|
422 | |
---|
423 | |
---|
@define(hash=True)
class StorageClient(object):
    """
    Low-level HTTP client that talks to the HTTP storage server.

    Create using a ``StorageClientFactory`` instance.
    """

    # The URL should be a HTTPS URL ("https://...")
    _base_url: DecodedURL
    # Capability secret; sent as the Authorization header on every request.
    _swissnum: bytes
    _treq: HTTPClient
    _pool: HTTPConnectionPool
    _clock: IReactorTime
    # Are we running unit tests?  In tests this is a callable that inspects
    # every response; by default it does nothing.
    _analyze_response: Callable[[IResponse], None] = lambda _: None

    def relative_url(self, path: str) -> DecodedURL:
        """Get a URL relative to the base URL."""
        return self._base_url.click(path)

    def _get_headers(self, headers: Optional[Headers]) -> Headers:
        """Return the basic headers to be used by default.

        Adds the swissnum-based Authorization header to ``headers`` (or to a
        fresh ``Headers`` when none was given).
        """
        if headers is None:
            headers = Headers()
        headers.addRawHeader(
            "Authorization",
            swissnum_auth_header(self._swissnum),
        )
        return headers

    @async_to_deferred
    async def request(
        self,
        method: str,
        url: DecodedURL,
        lease_renew_secret: Optional[bytes] = None,
        lease_cancel_secret: Optional[bytes] = None,
        upload_secret: Optional[bytes] = None,
        write_enabler_secret: Optional[bytes] = None,
        headers: Optional[Headers] = None,
        message_to_serialize: object = None,
        timeout: float = 60,
        **kwargs,
    ) -> IResponse:
        """
        Like ``treq.request()``, but with optional secrets that get translated
        into corresponding HTTP headers.

        If ``message_to_serialize`` is set, it will be serialized (by default
        with CBOR) and set as the request body. It should not be mutated
        during execution of this function!

        Default timeout is 60 seconds.
        """
        # Wrap the actual request in an Eliot action so the method, URL and
        # eventual response code get logged together.
        with start_action(
            action_type="allmydata:storage:http-client:request",
            method=method,
            url=url.to_text(),
            timeout=timeout,
        ) as ctx:
            response = await self._request(
                method,
                url,
                lease_renew_secret,
                lease_cancel_secret,
                upload_secret,
                write_enabler_secret,
                headers,
                message_to_serialize,
                timeout,
                **kwargs,
            )
            ctx.add_success_fields(response_code=response.code)
            return response

    async def _request(
        self,
        method: str,
        url: DecodedURL,
        lease_renew_secret: Optional[bytes] = None,
        lease_cancel_secret: Optional[bytes] = None,
        upload_secret: Optional[bytes] = None,
        write_enabler_secret: Optional[bytes] = None,
        headers: Optional[Headers] = None,
        message_to_serialize: object = None,
        timeout: float = 60,
        **kwargs,
    ) -> IResponse:
        """The implementation of request()."""
        headers = self._get_headers(headers)

        # Add secrets: each non-None secret becomes an X-Tahoe-Authorization
        # header of the form "<secret-name> <base64-of-value>".
        for secret, value in [
            (Secrets.LEASE_RENEW, lease_renew_secret),
            (Secrets.LEASE_CANCEL, lease_cancel_secret),
            (Secrets.UPLOAD, upload_secret),
            (Secrets.WRITE_ENABLER, write_enabler_secret),
        ]:
            if value is None:
                continue
            headers.addRawHeader(
                "X-Tahoe-Authorization",
                b"%s %s" % (secret.value.encode("ascii"), b64encode(value).strip()),
            )

        # Note we can accept CBOR:
        headers.addRawHeader("Accept", CBOR_MIME_TYPE)

        # If there's a request message, serialize it and set the Content-Type
        # header:
        if message_to_serialize is not None:
            if "data" in kwargs:
                raise TypeError(
                    "Can't use both `message_to_serialize` and `data` "
                    "as keyword arguments at the same time"
                )
            # Serialization may be CPU-heavy, so run it off the reactor thread.
            kwargs["data"] = await defer_to_thread(dumps, message_to_serialize)
            headers.addRawHeader("Content-Type", CBOR_MIME_TYPE)

        response = await self._treq.request(
            method, url, headers=headers, timeout=timeout, **kwargs
        )
        self._analyze_response(response)

        return response

    async def decode_cbor(self, response: IResponse, schema: Schema) -> object:
        """Given HTTP response, return decoded CBOR body.

        Raises ``ClientException`` when the response code is not 2xx or the
        body is not CBOR; otherwise the body is schema-validated and decoded.
        """
        with start_action(action_type="allmydata:storage:http-client:decode-cbor"):
            if response.code > 199 and response.code < 300:
                content_type = get_content_type(response.headers)
                if content_type == CBOR_MIME_TYPE:
                    f = await limited_content(response, self._clock)
                    data = f.read()

                    def validate_and_decode():
                        return schema.validate_cbor(data, True)

                    # Schema validation/decoding may be CPU-heavy; run it off
                    # the reactor thread.
                    return await defer_to_thread(validate_and_decode)
                else:
                    raise ClientException(
                        -1,
                        "Server didn't send CBOR, content type is {}".format(
                            content_type
                        ),
                    )
            else:
                # Read the (length-limited) error body so it can be included
                # in the raised exception.
                data = (
                    await limited_content(response, self._clock, max_length=10_000)
                ).read()
                raise ClientException(response.code, response.phrase, data)

    def shutdown(self) -> Deferred[object]:
        """Shutdown any connections."""
        return self._pool.closeCachedConnections()
---|
580 | |
---|
581 | |
---|
@define(hash=True)
class StorageClientGeneral(object):
    """
    High-level HTTP APIs that aren't immutable- or mutable-specific.
    """

    _client: StorageClient

    @async_to_deferred
    async def get_version(self) -> VersionMessage:
        """
        Return the version metadata for the server.
        """
        with start_action(
            action_type="allmydata:storage:http-client:get-version",
        ):
            return await self._get_version()

    async def _get_version(self) -> VersionMessage:
        """Implementation of get_version()."""
        url = self._client.relative_url("/storage/v1/version")
        response = await self._client.request("GET", url)
        decoded_response = cast(
            Dict[bytes, object],
            await self._client.decode_cbor(response, _SCHEMAS["get_version"]),
        )
        # Add some features we know are true because the HTTP API
        # specification requires them and because other parts of the storage
        # client implementation assumes they will be present.
        cast(
            Dict[bytes, object],
            decoded_response[b"http://allmydata.org/tahoe/protocols/storage/v1"],
        ).update(
            {
                b"tolerates-immutable-read-overrun": True,
                b"delete-mutable-shares-with-zero-length-writev": True,
                b"fills-holes-with-zero-bytes": True,
                b"prevents-read-past-end-of-share-data": True,
            }
        )
        return decoded_response

    @async_to_deferred
    async def add_or_renew_lease(
        self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
    ) -> None:
        """
        Add or renew a lease.

        If the renewal secret matches an existing lease, it is renewed.
        Otherwise a new lease is added.
        """
        with start_action(
            action_type="allmydata:storage:http-client:add-or-renew-lease",
            storage_index=si_to_human_readable(storage_index),
        ):
            return await self._add_or_renew_lease(
                storage_index, renew_secret, cancel_secret
            )

    async def _add_or_renew_lease(
        self, storage_index: bytes, renew_secret: bytes, cancel_secret: bytes
    ) -> None:
        """Implementation of add_or_renew_lease()."""
        url = self._client.relative_url(
            "/storage/v1/lease/{}".format(_encode_si(storage_index))
        )
        response = await self._client.request(
            "PUT",
            url,
            lease_renew_secret=renew_secret,
            lease_cancel_secret=cancel_secret,
        )

        # The server signals success with 204 No Content; anything else is
        # treated as an error.
        if response.code == http.NO_CONTENT:
            return
        else:
            raise ClientException(response.code)
---|
659 | |
---|
660 | |
---|
@define
class UploadProgress(object):
    """
    Progress of immutable upload, per the server.
    """

    # True when upload has finished.
    finished: bool
    # Remaining ranges to upload.
    required: RangeMap
---|
671 | |
---|
672 | |
---|
@async_to_deferred
async def read_share_chunk(
    client: StorageClient,
    share_type: str,
    storage_index: bytes,
    share_number: int,
    offset: int,
    length: int,
) -> bytes:
    """
    Download a chunk of data from a share.

    :param client: low-level client used to issue the request.
    :param share_type: URL path segment selecting the share kind (presumably
        "immutable" or "mutable" — confirm against callers).
    :param storage_index: binary storage index identifying the share set.
    :param share_number: which share to read from.
    :param offset: first byte to read.
    :param length: maximum number of bytes to read.
    :return: the bytes read; may be shorter than ``length`` (e.g. reading past
        the end of the share), or empty on a 204 response.
    :raises ValueError: wrong content type, missing/invalid Content-Range, or
        a body whose length disagrees with the Content-Range header.
    :raises ClientException: unexpected HTTP response code.

    TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 Failed downloads
    should be transparently retried and redownloaded by the implementation a
    few times so that if a failure percolates up, the caller can assume the
    failure isn't a short-term blip.

    NOTE: the underlying HTTP protocol is somewhat more flexible than this API,
    insofar as it doesn't always require a range. In practice a range is
    always provided by the current callers.
    """
    url = client.relative_url(
        "/storage/v1/{}/{}/{}".format(
            share_type, _encode_si(storage_index), share_number
        )
    )
    # The default 60 second timeout is for getting the response, so it doesn't
    # include the time it takes to download the body... so we will deal with
    # that later, via limited_content().
    response = await client.request(
        "GET",
        url,
        headers=Headers(
            # Ranges in HTTP are _inclusive_, Python's convention is exclusive,
            # but Range constructor does the conversion for us.
            {"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
        ),
        unbuffered=True,  # Don't buffer the response in memory.
    )

    if response.code == http.NO_CONTENT:
        return b""

    content_type = get_content_type(response.headers)
    if content_type != "application/octet-stream":
        raise ValueError(
            f"Content-type was wrong: {content_type}, should be application/octet-stream"
        )

    if response.code == http.PARTIAL_CONTENT:
        # Bug fix: getRawHeaders() returns None when the header is absent, so
        # the previous ``getRawHeaders("content-range")[0]`` crashed with a
        # TypeError on a missing Content-Range header.  Supplying a default
        # routes a missing header into the ValueError below instead.
        content_range = parse_content_range_header(
            response.headers.getRawHeaders("content-range", [""])[0]
        )
        if (
            content_range is None
            or content_range.stop is None
            or content_range.start is None
        ):
            raise ValueError(
                "Content-Range was missing, invalid, or in format we don't support"
            )
        supposed_length = content_range.stop - content_range.start
        if supposed_length > length:
            raise ValueError("Server sent more than we asked for?!")
        # It might also send less than we asked for. That's (probably) OK, e.g.
        # if we went past the end of the file.
        body = await limited_content(response, client._clock, supposed_length)
        body.seek(0, SEEK_END)
        actual_length = body.tell()
        if actual_length != supposed_length:
            # Most likely a mutable that got changed out from under us, but
            # conceivably could be a bug...
            raise ValueError(
                f"Length of response sent from server ({actual_length}) "
                + f"didn't match Content-Range header ({supposed_length})"
            )
        body.seek(0)
        return body.read()
    else:
        # Technically HTTP allows sending an OK with full body under these
        # circumstances, but the server is not designed to do that so we ignore
        # that possibility for now...
        raise ClientException(response.code)
---|
756 | |
---|
757 | |
---|
@async_to_deferred
async def advise_corrupt_share(
    client: StorageClient,
    share_type: str,
    storage_index: bytes,
    share_number: int,
    reason: str,
) -> None:
    """
    Tell the server that a share is corrupt, with a human-readable reason.

    Raises ``ClientException`` unless the server responds with 200 OK.
    """
    assert isinstance(reason, str)
    url = client.relative_url(
        "/storage/v1/{}/{}/{}/corrupt".format(
            share_type, _encode_si(storage_index), share_number
        )
    )
    response = await client.request(
        "POST", url, message_to_serialize={"reason": reason}
    )
    if response.code != http.OK:
        raise ClientException(
            response.code,
        )
---|
780 | |
---|
781 | |
---|
782 | @define(hash=True) |
---|
783 | class StorageClientImmutables(object): |
---|
784 | """ |
---|
785 | APIs for interacting with immutables. |
---|
786 | """ |
---|
787 | |
---|
788 | _client: StorageClient |
---|
789 | |
---|
    @async_to_deferred
    async def create(
        self,
        storage_index: bytes,
        share_numbers: set[int],
        allocated_size: int,
        upload_secret: bytes,
        lease_renew_secret: bytes,
        lease_cancel_secret: bytes,
    ) -> ImmutableCreateResult:
        """
        Create a new storage index for an immutable.

        TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 retry
        internally on failure, to ensure the operation fully succeeded. If
        sufficient number of failures occurred, the result may fire with an
        error, but there's no expectation that user code needs to have a
        recovery codepath; it will most likely just report an error to the
        user.

        Result fires when creating the storage index succeeded, if creating the
        storage index failed the result will fire with an exception.
        """
        with start_action(
            action_type="allmydata:storage:http-client:immutable:create",
            storage_index=si_to_human_readable(storage_index),
            share_numbers=share_numbers,
            allocated_size=allocated_size,
        ) as ctx:
            result = await self._create(
                storage_index,
                share_numbers,
                allocated_size,
                upload_secret,
                lease_renew_secret,
                lease_cancel_secret,
            )
            # Record which shares the server already had vs. newly allocated
            # in the Eliot log for this action.
            ctx.add_success_fields(
                already_have=result.already_have, allocated=result.allocated
            )
            return result
---|
831 | |
---|
832 | async def _create( |
---|
833 | self, |
---|
834 | storage_index: bytes, |
---|
835 | share_numbers: set[int], |
---|
836 | allocated_size: int, |
---|
837 | upload_secret: bytes, |
---|
838 | lease_renew_secret: bytes, |
---|
839 | lease_cancel_secret: bytes, |
---|
840 | ) -> ImmutableCreateResult: |
---|
841 | """Implementation of create().""" |
---|
842 | url = self._client.relative_url( |
---|
843 | "/storage/v1/immutable/" + _encode_si(storage_index) |
---|
844 | ) |
---|
845 | message = {"share-numbers": share_numbers, "allocated-size": allocated_size} |
---|
846 | |
---|
847 | response = await self._client.request( |
---|
848 | "POST", |
---|
849 | url, |
---|
850 | lease_renew_secret=lease_renew_secret, |
---|
851 | lease_cancel_secret=lease_cancel_secret, |
---|
852 | upload_secret=upload_secret, |
---|
853 | message_to_serialize=message, |
---|
854 | ) |
---|
855 | decoded_response = cast( |
---|
856 | Mapping[str, Set[int]], |
---|
857 | await self._client.decode_cbor(response, _SCHEMAS["allocate_buckets"]), |
---|
858 | ) |
---|
859 | return ImmutableCreateResult( |
---|
860 | already_have=decoded_response["already-have"], |
---|
861 | allocated=decoded_response["allocated"], |
---|
862 | ) |
---|
863 | |
---|
864 | @async_to_deferred |
---|
865 | async def abort_upload( |
---|
866 | self, storage_index: bytes, share_number: int, upload_secret: bytes |
---|
867 | ) -> None: |
---|
868 | """Abort the upload.""" |
---|
869 | with start_action( |
---|
870 | action_type="allmydata:storage:http-client:immutable:abort-upload", |
---|
871 | storage_index=si_to_human_readable(storage_index), |
---|
872 | share_number=share_number, |
---|
873 | ): |
---|
874 | return await self._abort_upload(storage_index, share_number, upload_secret) |
---|
875 | |
---|
876 | async def _abort_upload( |
---|
877 | self, storage_index: bytes, share_number: int, upload_secret: bytes |
---|
878 | ) -> None: |
---|
879 | """Implementation of ``abort_upload()``.""" |
---|
880 | url = self._client.relative_url( |
---|
881 | "/storage/v1/immutable/{}/{}/abort".format( |
---|
882 | _encode_si(storage_index), share_number |
---|
883 | ) |
---|
884 | ) |
---|
885 | response = await self._client.request( |
---|
886 | "PUT", |
---|
887 | url, |
---|
888 | upload_secret=upload_secret, |
---|
889 | ) |
---|
890 | |
---|
891 | if response.code == http.OK: |
---|
892 | return |
---|
893 | else: |
---|
894 | raise ClientException( |
---|
895 | response.code, |
---|
896 | ) |
---|
897 | |
---|
898 | @async_to_deferred |
---|
899 | async def write_share_chunk( |
---|
900 | self, |
---|
901 | storage_index: bytes, |
---|
902 | share_number: int, |
---|
903 | upload_secret: bytes, |
---|
904 | offset: int, |
---|
905 | data: bytes, |
---|
906 | ) -> UploadProgress: |
---|
907 | """ |
---|
908 | Upload a chunk of data for a specific share. |
---|
909 | |
---|
910 | TODO https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3857 The |
---|
911 | implementation should retry failed uploads transparently a number of |
---|
912 | times, so that if a failure percolates up, the caller can assume the |
---|
913 | failure isn't a short-term blip. |
---|
914 | |
---|
915 | Result fires when the upload succeeded, with a boolean indicating |
---|
916 | whether the _complete_ share (i.e. all chunks, not just this one) has |
---|
917 | been uploaded. |
---|
918 | """ |
---|
919 | with start_action( |
---|
920 | action_type="allmydata:storage:http-client:immutable:write-share-chunk", |
---|
921 | storage_index=si_to_human_readable(storage_index), |
---|
922 | share_number=share_number, |
---|
923 | offset=offset, |
---|
924 | data_len=len(data), |
---|
925 | ) as ctx: |
---|
926 | result = await self._write_share_chunk( |
---|
927 | storage_index, share_number, upload_secret, offset, data |
---|
928 | ) |
---|
929 | ctx.add_success_fields(finished=result.finished) |
---|
930 | return result |
---|
931 | |
---|
932 | async def _write_share_chunk( |
---|
933 | self, |
---|
934 | storage_index: bytes, |
---|
935 | share_number: int, |
---|
936 | upload_secret: bytes, |
---|
937 | offset: int, |
---|
938 | data: bytes, |
---|
939 | ) -> UploadProgress: |
---|
940 | """Implementation of ``write_share_chunk()``.""" |
---|
941 | url = self._client.relative_url( |
---|
942 | "/storage/v1/immutable/{}/{}".format( |
---|
943 | _encode_si(storage_index), share_number |
---|
944 | ) |
---|
945 | ) |
---|
946 | response = await self._client.request( |
---|
947 | "PATCH", |
---|
948 | url, |
---|
949 | upload_secret=upload_secret, |
---|
950 | data=data, |
---|
951 | headers=Headers( |
---|
952 | { |
---|
953 | "content-range": [ |
---|
954 | ContentRange("bytes", offset, offset + len(data)).to_header() |
---|
955 | ] |
---|
956 | } |
---|
957 | ), |
---|
958 | ) |
---|
959 | |
---|
960 | if response.code == http.OK: |
---|
961 | # Upload is still unfinished. |
---|
962 | finished = False |
---|
963 | elif response.code == http.CREATED: |
---|
964 | # Upload is done! |
---|
965 | finished = True |
---|
966 | else: |
---|
967 | raise ClientException( |
---|
968 | response.code, |
---|
969 | ) |
---|
970 | body = cast( |
---|
971 | Mapping[str, Sequence[Mapping[str, int]]], |
---|
972 | await self._client.decode_cbor( |
---|
973 | response, _SCHEMAS["immutable_write_share_chunk"] |
---|
974 | ), |
---|
975 | ) |
---|
976 | remaining = RangeMap() |
---|
977 | for chunk in body["required"]: |
---|
978 | remaining.set(True, chunk["begin"], chunk["end"]) |
---|
979 | return UploadProgress(finished=finished, required=remaining) |
---|
980 | |
---|
981 | @async_to_deferred |
---|
982 | async def read_share_chunk( |
---|
983 | self, storage_index: bytes, share_number: int, offset: int, length: int |
---|
984 | ) -> bytes: |
---|
985 | """ |
---|
986 | Download a chunk of data from a share. |
---|
987 | """ |
---|
988 | with start_action( |
---|
989 | action_type="allmydata:storage:http-client:immutable:read-share-chunk", |
---|
990 | storage_index=si_to_human_readable(storage_index), |
---|
991 | share_number=share_number, |
---|
992 | offset=offset, |
---|
993 | length=length, |
---|
994 | ) as ctx: |
---|
995 | result = await read_share_chunk( |
---|
996 | self._client, "immutable", storage_index, share_number, offset, length |
---|
997 | ) |
---|
998 | ctx.add_success_fields(data_len=len(result)) |
---|
999 | return result |
---|
1000 | |
---|
1001 | @async_to_deferred |
---|
1002 | async def list_shares(self, storage_index: bytes) -> Set[int]: |
---|
1003 | """ |
---|
1004 | Return the set of shares for a given storage index. |
---|
1005 | """ |
---|
1006 | with start_action( |
---|
1007 | action_type="allmydata:storage:http-client:immutable:list-shares", |
---|
1008 | storage_index=si_to_human_readable(storage_index), |
---|
1009 | ) as ctx: |
---|
1010 | result = await self._list_shares(storage_index) |
---|
1011 | ctx.add_success_fields(shares=result) |
---|
1012 | return result |
---|
1013 | |
---|
1014 | async def _list_shares(self, storage_index: bytes) -> Set[int]: |
---|
1015 | """Implementation of ``list_shares()``.""" |
---|
1016 | url = self._client.relative_url( |
---|
1017 | "/storage/v1/immutable/{}/shares".format(_encode_si(storage_index)) |
---|
1018 | ) |
---|
1019 | response = await self._client.request( |
---|
1020 | "GET", |
---|
1021 | url, |
---|
1022 | ) |
---|
1023 | if response.code == http.OK: |
---|
1024 | return cast( |
---|
1025 | Set[int], |
---|
1026 | await self._client.decode_cbor(response, _SCHEMAS["list_shares"]), |
---|
1027 | ) |
---|
1028 | else: |
---|
1029 | raise ClientException(response.code) |
---|
1030 | |
---|
1031 | @async_to_deferred |
---|
1032 | async def advise_corrupt_share( |
---|
1033 | self, |
---|
1034 | storage_index: bytes, |
---|
1035 | share_number: int, |
---|
1036 | reason: str, |
---|
1037 | ) -> None: |
---|
1038 | """Indicate a share has been corrupted, with a human-readable message.""" |
---|
1039 | with start_action( |
---|
1040 | action_type="allmydata:storage:http-client:immutable:advise-corrupt-share", |
---|
1041 | storage_index=si_to_human_readable(storage_index), |
---|
1042 | share_number=share_number, |
---|
1043 | reason=reason, |
---|
1044 | ): |
---|
1045 | await advise_corrupt_share( |
---|
1046 | self._client, "immutable", storage_index, share_number, reason |
---|
1047 | ) |
---|
1048 | |
---|
1049 | |
---|
@frozen
class WriteVector:
    """Data to write to a chunk."""

    # Byte offset within the share at which ``data`` is written.
    offset: int
    # The bytes to write at ``offset``.
    data: bytes
---|
1056 | |
---|
1057 | |
---|
@frozen
class TestVector:
    """Checks to make on a chunk before writing to it."""

    # Byte offset within the share where the check is performed.
    offset: int
    # Number of bytes to read for comparison.
    size: int
    # Expected bytes at ``offset``; the test passes if the stored bytes match.
    specimen: bytes
---|
1065 | |
---|
1066 | |
---|
@frozen
class ReadVector:
    """
    Reads to do on chunks, as part of a read/test/write operation.
    """

    # Byte offset within the share where the read starts.
    offset: int
    # Number of bytes to read.
    size: int
---|
1075 | |
---|
1076 | |
---|
@frozen
class TestWriteVectors:
    """Test and write vectors for a specific share."""

    test_vectors: Sequence[TestVector] = field(factory=list)
    write_vectors: Sequence[WriteVector] = field(factory=list)
    new_length: Optional[int] = None

    def asdict(self) -> dict:
        """Return dictionary suitable for sending over CBOR."""
        # The wire format uses short/hyphenated keys instead of the Python
        # attribute names, so rename while converting.
        raw = asdict(self)
        return {
            "test": raw["test_vectors"],
            "write": raw["write_vectors"],
            "new-length": raw["new_length"],
        }
---|
1092 | |
---|
1093 | |
---|
@frozen
class ReadTestWriteResult:
    """Result of sending read-test-write vectors."""

    # True if the test vectors were satisfied (and thus any writes applied).
    success: bool
    # Map share numbers to reads corresponding to the request's list of
    # ReadVectors:
    reads: Mapping[int, Sequence[bytes]]
---|
1102 | |
---|
1103 | |
---|
# Result type for mutable read/test/write HTTP response.  Can't just use
# dict[int,list[bytes]] because on Python 3.8 that will error out.
class MUTABLE_RTW(TypedDict):
    success: bool
    data: Mapping[int, Sequence[bytes]]
---|
1109 | |
---|
1110 | |
---|
@frozen
class StorageClientMutables:
    """
    APIs for interacting with mutables.

    Mirrors ``StorageClientImmutables``: each public method logs an Eliot
    action and delegates to a private implementation (or a module-level
    helper) that does the actual HTTP request.
    """

    # Low-level HTTP client used for all requests to the storage server.
    _client: StorageClient

    @async_to_deferred
    async def read_test_write_chunks(
        self,
        storage_index: bytes,
        write_enabler_secret: bytes,
        lease_renew_secret: bytes,
        lease_cancel_secret: bytes,
        testwrite_vectors: dict[int, TestWriteVectors],
        read_vector: list[ReadVector],
    ) -> ReadTestWriteResult:
        """
        Read, test, and possibly write chunks to a particular mutable storage
        index.

        Reads are done before writes.

        Given a mapping between share numbers and test/write vectors, the tests
        are done and if they are valid the writes are done.
        """
        with start_action(
            action_type="allmydata:storage:http-client:mutable:read-test-write",
            storage_index=si_to_human_readable(storage_index),
        ):
            return await self._read_test_write_chunks(
                storage_index,
                write_enabler_secret,
                lease_renew_secret,
                lease_cancel_secret,
                testwrite_vectors,
                read_vector,
            )

    async def _read_test_write_chunks(
        self,
        storage_index: bytes,
        write_enabler_secret: bytes,
        lease_renew_secret: bytes,
        lease_cancel_secret: bytes,
        testwrite_vectors: dict[int, TestWriteVectors],
        read_vector: list[ReadVector],
    ) -> ReadTestWriteResult:
        """
        Implementation of ``read_test_write_chunks()``.

        Raises ``ClientException`` (including the response body) unless the
        server responds with HTTP 200.
        """
        url = self._client.relative_url(
            "/storage/v1/mutable/{}/read-test-write".format(_encode_si(storage_index))
        )
        # Serialize the vectors into the wire-format message: per-share
        # test/write vectors (via TestWriteVectors.asdict) plus the shared
        # read vector.
        message = {
            "test-write-vectors": {
                share_number: twv.asdict()
                for (share_number, twv) in testwrite_vectors.items()
            },
            "read-vector": [asdict(r) for r in read_vector],
        }
        response = await self._client.request(
            "POST",
            url,
            write_enabler_secret=write_enabler_secret,
            lease_renew_secret=lease_renew_secret,
            lease_cancel_secret=lease_cancel_secret,
            message_to_serialize=message,
        )
        if response.code == http.OK:
            result = cast(
                MUTABLE_RTW,
                await self._client.decode_cbor(
                    response, _SCHEMAS["mutable_read_test_write"]
                ),
            )
            return ReadTestWriteResult(success=result["success"], reads=result["data"])
        else:
            raise ClientException(response.code, (await response.content()))

    @async_to_deferred
    async def read_share_chunk(
        self,
        storage_index: bytes,
        share_number: int,
        offset: int,
        length: int,
    ) -> bytes:
        """
        Download a chunk of data from a share.
        """
        with start_action(
            action_type="allmydata:storage:http-client:mutable:read-share-chunk",
            storage_index=si_to_human_readable(storage_index),
            share_number=share_number,
            offset=offset,
            length=length,
        ) as ctx:
            # Delegates to the module-level read_share_chunk() helper shared
            # with the immutable client.
            result = await read_share_chunk(
                self._client, "mutable", storage_index, share_number, offset, length
            )
            ctx.add_success_fields(data_len=len(result))
            return result

    @async_to_deferred
    async def list_shares(self, storage_index: bytes) -> Set[int]:
        """
        List the share numbers for a given storage index.
        """
        with start_action(
            action_type="allmydata:storage:http-client:mutable:list-shares",
            storage_index=si_to_human_readable(storage_index),
        ) as ctx:
            result = await self._list_shares(storage_index)
            ctx.add_success_fields(shares=result)
            return result

    async def _list_shares(self, storage_index: bytes) -> Set[int]:
        """
        Implementation of ``list_shares()``.

        Raises ``ClientException`` unless the server responds with HTTP 200.
        """
        url = self._client.relative_url(
            "/storage/v1/mutable/{}/shares".format(_encode_si(storage_index))
        )
        response = await self._client.request("GET", url)
        if response.code == http.OK:
            return cast(
                Set[int],
                await self._client.decode_cbor(
                    response,
                    _SCHEMAS["mutable_list_shares"],
                ),
            )
        else:
            raise ClientException(response.code)

    @async_to_deferred
    async def advise_corrupt_share(
        self,
        storage_index: bytes,
        share_number: int,
        reason: str,
    ) -> None:
        """Indicate a share has been corrupted, with a human-readable message."""
        with start_action(
            action_type="allmydata:storage:http-client:mutable:advise-corrupt-share",
            storage_index=si_to_human_readable(storage_index),
            share_number=share_number,
            reason=reason,
        ):
            # Delegates to the module-level advise_corrupt_share() helper.
            await advise_corrupt_share(
                self._client, "mutable", storage_index, share_number, reason
            )
---|