source: trunk/src/allmydata/web/private.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 3.2 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import attr
6
7from zope.interface import (
8    implementer,
9)
10
11from twisted.python.failure import (
12    Failure,
13)
14from twisted.internet.defer import (
15    succeed,
16    fail,
17)
18from twisted.cred.credentials import (
19    ICredentials,
20)
21from twisted.cred.portal import (
22    IRealm,
23    Portal,
24)
25from twisted.cred.checkers import (
26    ANONYMOUS,
27)
28from twisted.cred.error import (
29    UnauthorizedLogin,
30)
31from twisted.web.iweb import (
32    ICredentialFactory,
33)
34from twisted.web.resource import (
35    IResource,
36    Resource,
37)
38from twisted.web.guard import (
39    HTTPAuthSessionWrapper,
40)
41
42from ..util.hashutil import (
43    timing_safe_compare,
44)
45from ..util.assertutil import (
46    precondition,
47)
48
49from .logs import (
50    create_log_resources,
51)
52
# HTTP authorization scheme name, as bytes; reused as
# ``TokenCredentialFactory.scheme``.
SCHEME = b"tahoe-lafs"
54
class IToken(ICredentials):
    """
    Credentials that carry a single token proposed by a client.
    """

    def equals(valid_token):
        """
        Compare the proposed token against ``valid_token``.

        This is the method ``TokenChecker.requestAvatarId`` calls on the
        credentials it is given, and the one ``Token`` implements.

        :param bytes valid_token: The token that is actually required.

        :return: A true value if the proposed token matches ``valid_token``,
            a false value otherwise.
        """
58
59
# The ``implementer(IToken)`` decorator is built here, as a separate
# statement, rather than being written inline as ``@implementer(IToken)``.
# This works around Shoobx/mypy-zope#26: without suitable stubs for the
# twisted classes (ICredentials), IToken does not appear to be an
# Interface.  The proper fix appears to be to create stubs for twisted
# (https://twistedmatrix.com/trac/ticket/9717).
_itoken_impl = implementer(IToken)
67
68
@_itoken_impl
@attr.s
class Token(object):
    """
    ``IToken`` credentials wrapping the token a client proposed.
    """
    # The token as supplied by the client (declared as ``bytes``).
    proposed_token = attr.ib(type=bytes)

    def equals(self, valid_token):
        """
        Report whether the proposed token matches ``valid_token``.

        The comparison is delegated to ``timing_safe_compare`` rather than
        done with ``==``.

        :param valid_token: The token to compare against.

        :return: Whatever ``timing_safe_compare`` returns for the two tokens.
        """
        candidate = self.proposed_token
        return timing_safe_compare(valid_token, candidate)
79
80
@attr.s
class TokenChecker(object):
    """
    Credentials checker that accepts ``IToken`` credentials matching the
    token returned by ``get_auth_token``.
    """
    # Zero-argument callable returning the currently required token (bytes).
    get_auth_token = attr.ib()

    credentialInterfaces = [IToken]

    def requestAvatarId(self, credentials):
        """
        Check ``credentials`` against the required token.

        The required token is fetched anew on every call.

        :param credentials: An object with an ``equals(valid_token)`` method.

        :return: ``succeed(ANONYMOUS)`` when the credentials match,
            otherwise ``fail(...)`` wrapping ``UnauthorizedLogin``.
        """
        expected = self.get_auth_token()
        precondition(isinstance(expected, bytes))
        if not credentials.equals(expected):
            # Wrong token: report the rejection via a failed Deferred.
            return fail(Failure(UnauthorizedLogin()))
        return succeed(ANONYMOUS)
93
94
@implementer(ICredentialFactory)
@attr.s
class TokenCredentialFactory(object):
    """
    Credential factory for the ``tahoe-lafs`` authorization scheme.
    """
    scheme = SCHEME
    authentication_realm = b"tahoe-lafs"

    def getChallenge(self, request):
        """
        Build the challenge parameters for ``request``.

        :param request: Unused.

        :return dict: A single ``b"realm"`` entry holding
            ``authentication_realm``.
        """
        challenge = {}
        challenge[b"realm"] = self.authentication_realm
        return challenge

    def decode(self, response, request):
        """
        Wrap the client's ``response`` in a ``Token``.

        :param response: The value to use as the proposed token.
        :param request: Unused.

        :return Token: Credentials carrying ``response`` unchanged.
        """
        token = Token(response)
        return token
106
107
@implementer(IRealm)
@attr.s
class PrivateRealm(object):
    """
    Realm that hands out one fixed root resource as the ``IResource`` avatar.
    """
    # The resource returned as the avatar.
    _root = attr.ib()

    def _logout(self):
        """
        Logout callback returned with the avatar; does nothing.
        """

    def requestAvatar(self, avatarId, mind, *interfaces):
        """
        Return the root resource when ``IResource`` is requested.

        :param avatarId: Unused.
        :param mind: Unused.
        :param interfaces: The interfaces the caller can accept.

        :return: The tuple ``(IResource, root, logout)``.

        :raise NotImplementedError: If ``IResource`` is not among
            ``interfaces``.
        """
        if IResource not in interfaces:
            raise NotImplementedError(
                "PrivateRealm supports IResource not {}".format(interfaces),
            )
        return (IResource, self._root, self._logout)
122
123
def _create_vulnerable_tree():
    """
    Build the resource tree with no authorization wrapper around it.

    :return Resource: A fresh ``Resource`` whose ``logs`` child is the result
        of ``create_log_resources()``.
    """
    root = Resource()
    logs = create_log_resources()
    root.putChild(b"logs", logs)
    return root
128
129
def _create_private_tree(get_auth_token, vulnerable):
    """
    Wrap ``vulnerable`` so it sits behind token-based authorization.

    :param get_auth_token: Zero-argument callable handed to ``TokenChecker``
        to fetch the required token.
    :param vulnerable: The resource to serve once a request is authorized.

    :return HTTPAuthSessionWrapper: The wrapper built from a ``Portal`` over
        ``PrivateRealm(vulnerable)`` and a ``TokenCredentialFactory``.
    """
    private_realm = PrivateRealm(vulnerable)
    checkers = [TokenChecker(get_auth_token)]
    auth_portal = Portal(private_realm, checkers)
    credential_factories = [TokenCredentialFactory()]
    return HTTPAuthSessionWrapper(auth_portal, credential_factories)
134
135
def create_private_tree(get_auth_token):
    """
    Build a resource tree that only serves requests carrying a correct
    `Authorization: tahoe-lafs <api_auth_token>` header, where
    `api_auth_token` matches the private configuration value.

    :param get_auth_token: Zero-argument callable returning the required
        token.

    :return: The unprotected tree from ``_create_vulnerable_tree()`` wrapped
        by ``_create_private_tree``.
    """
    unprotected = _create_vulnerable_tree()
    return _create_private_tree(get_auth_token, unprotected)
Note: See TracBrowser for help on using the repository browser.