1 | """ |
---|
2 | Common HTTP infrastructure for the storage server. |
---|
3 | """ |
---|
4 | |
---|
5 | from enum import Enum |
---|
6 | from base64 import urlsafe_b64encode, b64encode |
---|
7 | from hashlib import sha256 |
---|
8 | from typing import Optional |
---|
9 | |
---|
10 | from cryptography.x509 import Certificate |
---|
11 | from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat |
---|
12 | |
---|
13 | from werkzeug.http import parse_options_header |
---|
14 | from twisted.web.http_headers import Headers |
---|
15 | from twisted.web.iweb import IResponse |
---|
16 | |
---|
17 | CBOR_MIME_TYPE = "application/cbor" |
---|
18 | |
---|
19 | |
---|
def get_content_type(headers: Headers) -> Optional[str]:
    """
    Extract the content type from the HTTP ``Content-Type`` header.

    Only the first ``Content-Type`` value is consulted, and only the main
    value of the parsed header (element 0 of ``parse_options_header``'s
    result) is kept.

    Returns ``None`` if no content-type was set, or if the parsed value is
    empty.
    """
    raw_values = headers.getRawHeaders("content-type", [None])
    if not raw_values:
        # An empty result is handled like a missing header.
        raw_values = [None]

    # When the header is missing, ``None`` is handed to the parser as-is.
    parsed = parse_options_header(raw_values[0])
    main_value = parsed[0]
    if not main_value:
        return None
    return main_value
---|
29 | |
---|
30 | |
---|
def response_is_not_html(response: IResponse) -> None:
    """
    Assert that ``response`` was not served as ``text/html``.

    This is registered during tests so we can make sure the web server
    never hands back HTML.  HTML is only ever correct for a 404, but it is
    the default for Twisted's web server, so any other HTML response means
    something unexpected happened.
    """
    if response.code == 404:
        # 404 responses are the one case where HTML is acceptable.
        return

    content_type = get_content_type(response.headers)
    assert content_type != "text/html"
---|
41 | |
---|
42 | |
---|
def swissnum_auth_header(swissnum: bytes) -> bytes:
    """
    Return value for ``Authorization`` header.

    The value is the literal scheme ``Tahoe-LAFS``, a single space, and the
    standard-alphabet base64 encoding of ``swissnum`` (padding included).
    """
    # b64encode() never emits whitespace or a trailing newline (unlike
    # base64.encodebytes()), so its output needs no strip().
    return b"Tahoe-LAFS " + b64encode(swissnum)
---|
46 | |
---|
47 | |
---|
class Secrets(Enum):
    """
    The kinds of secrets the client may send.

    Each member's value is the lower-case, hyphenated string that
    identifies that kind of secret.
    """

    # Lease-related secrets.
    LEASE_RENEW = "lease-renew-secret"
    LEASE_CANCEL = "lease-cancel-secret"

    # Upload secret.
    UPLOAD = "upload-secret"

    # Write enabler.
    WRITE_ENABLER = "write-enabler"
---|
55 | |
---|
56 | |
---|
def get_spki(certificate: Certificate) -> bytes:
    """
    Return the DER-encoded ``SubjectPublicKeyInfo`` (RFC 7469) bytes for
    the public key of the given certificate.
    """
    public_key = certificate.public_key()
    spki_der = public_key.public_bytes(
        Encoding.DER,
        PublicFormat.SubjectPublicKeyInfo,
    )
    return spki_der
---|
65 | |
---|
def get_spki_hash(certificate: Certificate) -> bytes:
    """
    Get the public key hash, as per RFC 7469: base64 of sha256 of the public
    key encoded in DER + Subject Public Key Info format.

    We use the URL-safe base64 variant, since this is typically found in
    NURLs, and the trailing ``=`` padding is removed from the result.
    """
    digest = sha256(get_spki(certificate)).digest()
    # urlsafe_b64encode() never emits whitespace or a trailing newline, so
    # the only characters that need removing are the "=" padding bytes.
    return urlsafe_b64encode(digest).rstrip(b"=")
---|