1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | import attr |
---|
6 | |
---|
7 | from zope.interface import ( |
---|
8 | implementer, |
---|
9 | ) |
---|
10 | |
---|
11 | from twisted.python.failure import ( |
---|
12 | Failure, |
---|
13 | ) |
---|
14 | from twisted.internet.defer import ( |
---|
15 | succeed, |
---|
16 | fail, |
---|
17 | ) |
---|
18 | from twisted.cred.credentials import ( |
---|
19 | ICredentials, |
---|
20 | ) |
---|
21 | from twisted.cred.portal import ( |
---|
22 | IRealm, |
---|
23 | Portal, |
---|
24 | ) |
---|
25 | from twisted.cred.checkers import ( |
---|
26 | ANONYMOUS, |
---|
27 | ) |
---|
28 | from twisted.cred.error import ( |
---|
29 | UnauthorizedLogin, |
---|
30 | ) |
---|
31 | from twisted.web.iweb import ( |
---|
32 | ICredentialFactory, |
---|
33 | ) |
---|
34 | from twisted.web.resource import ( |
---|
35 | IResource, |
---|
36 | Resource, |
---|
37 | ) |
---|
38 | from twisted.web.guard import ( |
---|
39 | HTTPAuthSessionWrapper, |
---|
40 | ) |
---|
41 | |
---|
42 | from ..util.hashutil import ( |
---|
43 | timing_safe_compare, |
---|
44 | ) |
---|
45 | from ..util.assertutil import ( |
---|
46 | precondition, |
---|
47 | ) |
---|
48 | |
---|
49 | from .logs import ( |
---|
50 | create_log_resources, |
---|
51 | ) |
---|
52 | |
---|
53 | SCHEME = b"tahoe-lafs" |
---|
54 | |
---|
55 | class IToken(ICredentials): |
---|
56 | def check(auth_token): |
---|
57 | pass |
---|
58 | |
---|
59 | |
---|
60 | # Workaround for Shoobx/mypy-zope#26, where without suitable |
---|
61 | # stubs for twisted classes (ICredentials), IToken does not |
---|
62 | # appear to be an Interface. The proper fix appears to be to |
---|
63 | # create stubs for twisted |
---|
64 | # (https://twistedmatrix.com/trac/ticket/9717). For now, |
---|
65 | # bypassing the inline decorator syntax works around the issue. |
---|
66 | _itoken_impl = implementer(IToken) |
---|
67 | |
---|
68 | |
---|
69 | @_itoken_impl |
---|
70 | @attr.s |
---|
71 | class Token(object): |
---|
72 | proposed_token = attr.ib(type=bytes) |
---|
73 | |
---|
74 | def equals(self, valid_token): |
---|
75 | return timing_safe_compare( |
---|
76 | valid_token, |
---|
77 | self.proposed_token, |
---|
78 | ) |
---|
79 | |
---|
80 | |
---|
81 | @attr.s |
---|
82 | class TokenChecker(object): |
---|
83 | get_auth_token = attr.ib() |
---|
84 | |
---|
85 | credentialInterfaces = [IToken] |
---|
86 | |
---|
87 | def requestAvatarId(self, credentials): |
---|
88 | required_token = self.get_auth_token() |
---|
89 | precondition(isinstance(required_token, bytes)) |
---|
90 | if credentials.equals(required_token): |
---|
91 | return succeed(ANONYMOUS) |
---|
92 | return fail(Failure(UnauthorizedLogin())) |
---|
93 | |
---|
94 | |
---|
95 | @implementer(ICredentialFactory) |
---|
96 | @attr.s |
---|
97 | class TokenCredentialFactory(object): |
---|
98 | scheme = SCHEME |
---|
99 | authentication_realm = b"tahoe-lafs" |
---|
100 | |
---|
101 | def getChallenge(self, request): |
---|
102 | return {b"realm": self.authentication_realm} |
---|
103 | |
---|
104 | def decode(self, response, request): |
---|
105 | return Token(response) |
---|
106 | |
---|
107 | |
---|
108 | @implementer(IRealm) |
---|
109 | @attr.s |
---|
110 | class PrivateRealm(object): |
---|
111 | _root = attr.ib() |
---|
112 | |
---|
113 | def _logout(self): |
---|
114 | pass |
---|
115 | |
---|
116 | def requestAvatar(self, avatarId, mind, *interfaces): |
---|
117 | if IResource in interfaces: |
---|
118 | return (IResource, self._root, self._logout) |
---|
119 | raise NotImplementedError( |
---|
120 | "PrivateRealm supports IResource not {}".format(interfaces), |
---|
121 | ) |
---|
122 | |
---|
123 | |
---|
124 | def _create_vulnerable_tree(): |
---|
125 | private = Resource() |
---|
126 | private.putChild(b"logs", create_log_resources()) |
---|
127 | return private |
---|
128 | |
---|
129 | |
---|
130 | def _create_private_tree(get_auth_token, vulnerable): |
---|
131 | realm = PrivateRealm(vulnerable) |
---|
132 | portal = Portal(realm, [TokenChecker(get_auth_token)]) |
---|
133 | return HTTPAuthSessionWrapper(portal, [TokenCredentialFactory()]) |
---|
134 | |
---|
135 | |
---|
136 | def create_private_tree(get_auth_token): |
---|
137 | """ |
---|
138 | Create a new resource tree that only allows requests if they include a |
---|
139 | correct `Authorization: tahoe-lafs <api_auth_token>` header (where |
---|
140 | `api_auth_token` matches the private configuration value). |
---|
141 | """ |
---|
142 | return _create_private_tree( |
---|
143 | get_auth_token, |
---|
144 | _create_vulnerable_tree(), |
---|
145 | ) |
---|