source: trunk/src/allmydata/storage/http_common.py

Last change on this file was 50ce8abf, checked in by Jean-Paul Calderone <exarkun@…>, at 2023-08-22T12:50:27Z

adapt the existing case to a multi-case structure

  • Property mode set to 100644
File size: 2.3 KB
Line 
1"""
2Common HTTP infrastructure for the storage server.
3"""
4
5from enum import Enum
6from base64 import urlsafe_b64encode, b64encode
7from hashlib import sha256
8from typing import Optional
9
10from cryptography.x509 import Certificate
11from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
12
13from werkzeug.http import parse_options_header
14from twisted.web.http_headers import Headers
15from twisted.web.iweb import IResponse
16
# Media type string identifying CBOR-encoded content.
CBOR_MIME_TYPE = "application/cbor"
18
19
def get_content_type(headers: Headers) -> Optional[str]:
    """
    Extract the content type from the HTTP ``Content-Type`` header.

    Only the first raw ``Content-Type`` value is looked at.  It is handed to
    werkzeug's ``parse_options_header`` and the first element of the result
    is used as the content type.

    Returns ``None`` if no content-type was set, or if the parsed value is
    empty.
    """
    raw_values = headers.getRawHeaders("content-type", [None])
    if not raw_values:
        # An empty (or otherwise falsy) result is handled exactly like a
        # missing header.
        raw_values = [None]

    parsed_value = parse_options_header(raw_values[0])[0]
    if parsed_value:
        return parsed_value
    return None
29
30
def response_is_not_html(response: IResponse) -> None:
    """
    Assert that a response other than a 404 is not ``text/html``.

    During tests, this is registered so we can ensure the web server
    doesn't give us HTML.  HTML is never correct except in 404, but it's the
    default for Twisted's web server, so we assert nothing unexpected
    happened.
    """
    if response.code == 404:
        # HTML is tolerated for 404 responses; nothing to check.
        return

    content_type = get_content_type(response.headers)
    assert content_type != "text/html"
41
42
def swissnum_auth_header(swissnum: bytes) -> bytes:
    """
    Build the value for the ``Authorization`` header.

    The result is the ``Tahoe-LAFS`` scheme name, a single space, and the
    standard base64 encoding of ``swissnum``.
    """
    scheme_prefix = b"Tahoe-LAFS "
    encoded_swissnum = b64encode(swissnum).strip()
    return scheme_prefix + encoded_swissnum
46
47
class Secrets(Enum):
    """
    The kinds of secrets the client may send.

    Each member's value is the string name associated with that secret.
    """

    # Secret for renewing a lease.
    LEASE_RENEW = "lease-renew-secret"
    # Secret for cancelling a lease.
    LEASE_CANCEL = "lease-cancel-secret"
    # Secret associated with an upload.
    UPLOAD = "upload-secret"
    # The write enabler.
    WRITE_ENABLER = "write-enabler"
55
56
def get_spki(certificate: Certificate) -> bytes:
    """
    Return the DER encoded `SubjectPublicKeyInfo` (RFC 7469) bytes of the
    given certificate's public key.
    """
    public_key = certificate.public_key()
    spki_der = public_key.public_bytes(
        Encoding.DER,
        PublicFormat.SubjectPublicKeyInfo,
    )
    return spki_der
65
def get_spki_hash(certificate: Certificate) -> bytes:
    """
    Compute the public key hash of a certificate, as per RFC 7469: base64 of
    sha256 of the public key encoded in DER + Subject Public Key Info format.

    The URL-safe base64 variant is used, since this is typically found in
    NURLs, and trailing ``=`` padding is removed from the result.
    """
    digest = sha256(get_spki(certificate)).digest()
    encoded_digest = urlsafe_b64encode(digest).strip()
    # Drop the base64 padding characters from the end.
    return encoded_digest.rstrip(b"=")
Note: See TracBrowser for help on using the repository browser.