1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | from __future__ import annotations |
---|
6 | |
---|
7 | from six import ensure_text |
---|
8 | |
---|
9 | import time, os.path, textwrap |
---|
10 | from typing import Any, Union |
---|
11 | |
---|
12 | from zope.interface import implementer |
---|
13 | from twisted.application import service |
---|
14 | from twisted.internet import defer |
---|
15 | from twisted.internet.address import IPv4Address |
---|
16 | from twisted.python.failure import Failure |
---|
17 | from foolscap.api import Referenceable |
---|
18 | import allmydata |
---|
19 | from allmydata import node |
---|
20 | from allmydata.util import log, dictutil |
---|
21 | from allmydata.util.i2p_provider import create as create_i2p_provider |
---|
22 | from allmydata.util.tor_provider import create as create_tor_provider |
---|
23 | from allmydata.introducer.interfaces import \ |
---|
24 | RIIntroducerPublisherAndSubscriberService_v2 |
---|
25 | from allmydata.introducer.common import unsign_from_foolscap, \ |
---|
26 | SubscriberDescriptor, AnnouncementDescriptor |
---|
27 | from allmydata.node import read_config |
---|
28 | from allmydata.node import create_node_dir |
---|
29 | from allmydata.node import create_connection_handlers |
---|
30 | from allmydata.node import create_tub_options |
---|
31 | from allmydata.node import create_main_tub |
---|
32 | |
---|
33 | |
---|
34 | # this is put into README in new node-directories |
---|
35 | INTRODUCER_README = """ |
---|
36 | This directory contains files which contain private data for the Tahoe node, |
---|
37 | such as private keys. On Unix-like systems, the permissions on this directory |
---|
38 | are set to disallow users other than its owner from reading the contents of |
---|
39 | the files. See the 'configuration.rst' documentation file for details. |
---|
40 | """ |
---|
41 | |
---|
42 | _valid_config = node._common_valid_config |
---|
43 | |
---|
44 | class FurlFileConflictError(Exception): |
---|
45 | pass |
---|
46 | |
---|
47 | def create_introducer(basedir=u"."): |
---|
48 | """ |
---|
49 | :returns: a Deferred that yields a new _IntroducerNode instance |
---|
50 | """ |
---|
51 | try: |
---|
52 | # see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2946 |
---|
53 | from twisted.internet import reactor |
---|
54 | |
---|
55 | if not os.path.exists(basedir): |
---|
56 | create_node_dir(basedir, INTRODUCER_README) |
---|
57 | |
---|
58 | config = read_config( |
---|
59 | basedir, u"client.port", |
---|
60 | generated_files=["introducer.furl"], |
---|
61 | _valid_config=_valid_config(), |
---|
62 | ) |
---|
63 | |
---|
64 | i2p_provider = create_i2p_provider(reactor, config) |
---|
65 | tor_provider = create_tor_provider(reactor, config) |
---|
66 | |
---|
67 | default_connection_handlers, foolscap_connection_handlers = create_connection_handlers(config, i2p_provider, tor_provider) |
---|
68 | tub_options = create_tub_options(config) |
---|
69 | |
---|
70 | main_tub = create_main_tub( |
---|
71 | config, tub_options, default_connection_handlers, |
---|
72 | foolscap_connection_handlers, i2p_provider, tor_provider, |
---|
73 | ) |
---|
74 | |
---|
75 | node = _IntroducerNode( |
---|
76 | config, |
---|
77 | main_tub, |
---|
78 | i2p_provider, |
---|
79 | tor_provider, |
---|
80 | ) |
---|
81 | i2p_provider.setServiceParent(node) |
---|
82 | tor_provider.setServiceParent(node) |
---|
83 | return defer.succeed(node) |
---|
84 | except Exception: |
---|
85 | return Failure() |
---|
86 | |
---|
87 | |
---|
88 | class _IntroducerNode(node.Node): |
---|
89 | NODETYPE = "introducer" |
---|
90 | |
---|
91 | def __init__(self, config, main_tub, i2p_provider, tor_provider): |
---|
92 | node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider) |
---|
93 | self.init_introducer() |
---|
94 | webport = self.get_config("node", "web.port", None) |
---|
95 | if webport: |
---|
96 | self.init_web(webport) # strports string |
---|
97 | |
---|
98 | def init_introducer(self): |
---|
99 | if not self._is_tub_listening(): |
---|
100 | raise ValueError("config error: we are Introducer, but tub " |
---|
101 | "is not listening ('tub.port=' is empty)") |
---|
102 | introducerservice = IntroducerService() |
---|
103 | introducerservice.setServiceParent(self) |
---|
104 | |
---|
105 | old_public_fn = self.config.get_config_path(u"introducer.furl") |
---|
106 | private_fn = self.config.get_private_path(u"introducer.furl") |
---|
107 | |
---|
108 | if os.path.exists(old_public_fn): |
---|
109 | if os.path.exists(private_fn): |
---|
110 | msg = """This directory (%s) contains both an old public |
---|
111 | 'introducer.furl' file, and a new-style |
---|
112 | 'private/introducer.furl', so I cannot safely remove the old |
---|
113 | one. Please make sure your desired FURL is in |
---|
114 | private/introducer.furl, and remove the public file. If this |
---|
115 | causes your Introducer's FURL to change, you need to inform |
---|
116 | all grid members so they can update their tahoe.cfg. |
---|
117 | """ |
---|
118 | raise FurlFileConflictError(textwrap.dedent(msg)) |
---|
119 | os.rename(old_public_fn, private_fn) |
---|
120 | furl = self.tub.registerReference(introducerservice, |
---|
121 | furlFile=private_fn) |
---|
122 | self.log(" introducer can be found in {!r}".format(private_fn), umid="qF2L9A") |
---|
123 | self.introducer_url = furl # for tests |
---|
124 | |
---|
125 | def init_web(self, webport): |
---|
126 | self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA") |
---|
127 | |
---|
128 | from allmydata.webish import IntroducerWebishServer |
---|
129 | nodeurl_path = self.config.get_config_path(u"node.url") |
---|
130 | config_staticdir = self.get_config("node", "web.static", "public_html") |
---|
131 | staticdir = self.config.get_config_path(config_staticdir) |
---|
132 | ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir) |
---|
133 | ws.setServiceParent(self) |
---|
134 | |
---|
135 | |
---|
136 | def stringify_remote_address(rref): |
---|
137 | remote = rref.getPeer() |
---|
138 | if isinstance(remote, IPv4Address): |
---|
139 | return "%s:%d" % (remote.host, remote.port) |
---|
140 | # loopback is a non-IPv4Address |
---|
141 | return str(remote) |
---|
142 | |
---|
143 | |
---|
144 | # MyPy doesn't work well with remote interfaces... |
---|
145 | @implementer(RIIntroducerPublisherAndSubscriberService_v2) |
---|
146 | class IntroducerService(service.MultiService, Referenceable): # type: ignore[misc] |
---|
147 | # The type in Twisted for services is wrong in 22.10... |
---|
148 | # https://github.com/twisted/twisted/issues/10135 |
---|
149 | name = "introducer" # type: ignore[assignment] |
---|
150 | # v1 is the original protocol, added in 1.0 (but only advertised starting |
---|
151 | # in 1.3), removed in 1.12. v2 is the new signed protocol, added in 1.10 |
---|
152 | # TODO: reconcile bytes/str for keys |
---|
153 | VERSION : dict[Union[bytes, str], Any]= { |
---|
154 | #"http://allmydata.org/tahoe/protocols/introducer/v1": { }, |
---|
155 | b"http://allmydata.org/tahoe/protocols/introducer/v2": { }, |
---|
156 | b"application-version": allmydata.__full_version__.encode("utf-8"), |
---|
157 | } |
---|
158 | |
---|
159 | def __init__(self): |
---|
160 | service.MultiService.__init__(self) |
---|
161 | self.introducer_url = None |
---|
162 | # 'index' is (service_name, key_s, tubid), where key_s or tubid is |
---|
163 | # None |
---|
164 | self._announcements = {} # dict of index -> |
---|
165 | # (ann_t, canary, ann, timestamp) |
---|
166 | |
---|
167 | # ann (the announcement dictionary) is cleaned up: nickname is always |
---|
168 | # unicode, servicename is always ascii, etc, even though |
---|
169 | # simplejson.loads sometimes returns either |
---|
170 | |
---|
171 | # self._subscribers is a dict mapping servicename to subscriptions |
---|
172 | # 'subscriptions' is a dict mapping rref to a subscription |
---|
173 | # 'subscription' is a tuple of (subscriber_info, timestamp) |
---|
174 | # 'subscriber_info' is a dict, provided directly by v2 clients. The |
---|
175 | # expected keys are: version, nickname, app-versions, my-version, |
---|
176 | # oldest-supported |
---|
177 | self._subscribers = dictutil.UnicodeKeyDict({}) |
---|
178 | |
---|
179 | self._debug_counts = {"inbound_message": 0, |
---|
180 | "inbound_duplicate": 0, |
---|
181 | "inbound_no_seqnum": 0, |
---|
182 | "inbound_old_replay": 0, |
---|
183 | "inbound_update": 0, |
---|
184 | "outbound_message": 0, |
---|
185 | "outbound_announcements": 0, |
---|
186 | "inbound_subscribe": 0} |
---|
187 | self._debug_outstanding = 0 |
---|
188 | |
---|
189 | def _debug_retired(self, res): |
---|
190 | self._debug_outstanding -= 1 |
---|
191 | return res |
---|
192 | |
---|
193 | def log(self, *args, **kwargs): |
---|
194 | if "facility" not in kwargs: |
---|
195 | kwargs["facility"] = "tahoe.introducer.server" |
---|
196 | return log.msg(*args, **kwargs) |
---|
197 | |
---|
198 | def get_announcements(self): |
---|
199 | """Return a list of AnnouncementDescriptor for all announcements""" |
---|
200 | announcements = [] |
---|
201 | for (index, (_, canary, ann, when)) in list(self._announcements.items()): |
---|
202 | ad = AnnouncementDescriptor(when, index, canary, ann) |
---|
203 | announcements.append(ad) |
---|
204 | return announcements |
---|
205 | |
---|
206 | def get_subscribers(self): |
---|
207 | """Return a list of SubscriberDescriptor objects for all subscribers""" |
---|
208 | s = [] |
---|
209 | for service_name, subscriptions in list(self._subscribers.items()): |
---|
210 | for rref,(subscriber_info,when) in list(subscriptions.items()): |
---|
211 | # note that if the subscriber didn't do Tub.setLocation, |
---|
212 | # tubid will be None. Also, subscribers do not tell us which |
---|
213 | # pubkey they use; only publishers do that. |
---|
214 | tubid = rref.getRemoteTubID() or "?" |
---|
215 | remote_address = stringify_remote_address(rref) |
---|
216 | # these three assume subscriber_info["version"]==0, but |
---|
217 | # should tolerate other versions |
---|
218 | nickname = subscriber_info.get("nickname", u"?") |
---|
219 | version = subscriber_info.get("my-version", u"?") |
---|
220 | app_versions = subscriber_info.get("app-versions", {}) |
---|
221 | # 'when' is the time they subscribed |
---|
222 | sd = SubscriberDescriptor(service_name, when, |
---|
223 | nickname, version, app_versions, |
---|
224 | remote_address, tubid) |
---|
225 | s.append(sd) |
---|
226 | return s |
---|
227 | |
---|
228 | def remote_get_version(self): |
---|
229 | return self.VERSION |
---|
230 | |
---|
231 | def remote_publish_v2(self, ann_t, canary): |
---|
232 | lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ") |
---|
233 | return self.publish(ann_t, canary, lp) |
---|
234 | |
---|
235 | def publish(self, ann_t, canary, lp): |
---|
236 | try: |
---|
237 | self._publish(ann_t, canary, lp) |
---|
238 | except: |
---|
239 | log.err(format="Introducer.remote_publish failed on %(ann)s", |
---|
240 | ann=ann_t, |
---|
241 | level=log.UNUSUAL, parent=lp, umid="620rWA") |
---|
242 | raise |
---|
243 | |
---|
244 | def _publish(self, ann_t, canary, lp): |
---|
245 | self._debug_counts["inbound_message"] += 1 |
---|
246 | self.log("introducer: announcement published: %s" % (ann_t,), |
---|
247 | umid="wKHgCw") |
---|
248 | ann, key = unsign_from_foolscap(ann_t) # might raise BadSignature |
---|
249 | service_name = str(ann["service-name"]) |
---|
250 | |
---|
251 | index = (service_name, key) |
---|
252 | old = self._announcements.get(index) |
---|
253 | if old: |
---|
254 | (old_ann_t, canary, old_ann, timestamp) = old |
---|
255 | if old_ann == ann: |
---|
256 | self.log("but we already knew it, ignoring", level=log.NOISY, |
---|
257 | umid="myxzLw") |
---|
258 | self._debug_counts["inbound_duplicate"] += 1 |
---|
259 | return |
---|
260 | else: |
---|
261 | if "seqnum" in old_ann: |
---|
262 | # must beat previous sequence number to replace |
---|
263 | if ("seqnum" not in ann |
---|
264 | or not isinstance(ann["seqnum"], int)): |
---|
265 | self.log("not replacing old ann, no valid seqnum", |
---|
266 | level=log.NOISY, umid="ySbaVw") |
---|
267 | self._debug_counts["inbound_no_seqnum"] += 1 |
---|
268 | return |
---|
269 | if ann["seqnum"] <= old_ann["seqnum"]: |
---|
270 | self.log("not replacing old ann, new seqnum is too old" |
---|
271 | " (%s <= %s) (replay attack?)" |
---|
272 | % (ann["seqnum"], old_ann["seqnum"]), |
---|
273 | level=log.UNUSUAL, umid="sX7yqQ") |
---|
274 | self._debug_counts["inbound_old_replay"] += 1 |
---|
275 | return |
---|
276 | # ok, seqnum is newer, allow replacement |
---|
277 | self.log("old announcement being updated", level=log.NOISY, |
---|
278 | umid="304r9g") |
---|
279 | self._debug_counts["inbound_update"] += 1 |
---|
280 | self._announcements[index] = (ann_t, canary, ann, time.time()) |
---|
281 | #if canary: |
---|
282 | # canary.notifyOnDisconnect ... |
---|
283 | # use a CanaryWatcher? with cw.is_connected()? |
---|
284 | # actually we just want foolscap to give rref.is_connected(), since |
---|
285 | # this is only for the status display |
---|
286 | |
---|
287 | for s in self._subscribers.get(service_name, []): |
---|
288 | self._debug_counts["outbound_message"] += 1 |
---|
289 | self._debug_counts["outbound_announcements"] += 1 |
---|
290 | self._debug_outstanding += 1 |
---|
291 | d = s.callRemote("announce_v2", set([ann_t])) |
---|
292 | d.addBoth(self._debug_retired) |
---|
293 | d.addErrback(log.err, |
---|
294 | format="subscriber errored on announcement %(ann)s", |
---|
295 | ann=ann_t, facility="tahoe.introducer", |
---|
296 | level=log.UNUSUAL, umid="jfGMXQ") |
---|
297 | |
---|
298 | def remote_subscribe_v2(self, subscriber, service_name, subscriber_info): |
---|
299 | self.log("introducer: subscription[%r] request at %r" |
---|
300 | % (service_name, subscriber), umid="U3uzLg") |
---|
301 | service_name = ensure_text(service_name) |
---|
302 | subscriber_info = dictutil.UnicodeKeyDict({ |
---|
303 | ensure_text(k): v for (k, v) in subscriber_info.items() |
---|
304 | }) |
---|
305 | return self.add_subscriber(subscriber, service_name, subscriber_info) |
---|
306 | |
---|
307 | def add_subscriber(self, subscriber, service_name, subscriber_info): |
---|
308 | self._debug_counts["inbound_subscribe"] += 1 |
---|
309 | if service_name not in self._subscribers: |
---|
310 | self._subscribers[service_name] = {} |
---|
311 | subscribers = self._subscribers[service_name] |
---|
312 | if subscriber in subscribers: |
---|
313 | self.log("but they're already subscribed, ignoring", |
---|
314 | level=log.UNUSUAL, umid="Sy9EfA") |
---|
315 | return |
---|
316 | |
---|
317 | assert subscriber_info |
---|
318 | |
---|
319 | subscribers[subscriber] = (subscriber_info, time.time()) |
---|
320 | def _remove(): |
---|
321 | self.log("introducer: unsubscribing[%s] %s" % (service_name, |
---|
322 | subscriber), |
---|
323 | umid="vYGcJg") |
---|
324 | subscribers.pop(subscriber, None) |
---|
325 | subscriber.notifyOnDisconnect(_remove) |
---|
326 | |
---|
327 | # Make sure types are correct: |
---|
328 | for k in self._announcements: |
---|
329 | assert isinstance(k[0], type(service_name)) |
---|
330 | |
---|
331 | # now tell them about any announcements they're interested in |
---|
332 | announcements = set( [ ann_t |
---|
333 | for idx,(ann_t,canary,ann,when) |
---|
334 | in self._announcements.items() |
---|
335 | if idx[0] == service_name] ) |
---|
336 | if announcements: |
---|
337 | self._debug_counts["outbound_message"] += 1 |
---|
338 | self._debug_counts["outbound_announcements"] += len(announcements) |
---|
339 | self._debug_outstanding += 1 |
---|
340 | d = subscriber.callRemote("announce_v2", announcements) |
---|
341 | d.addBoth(self._debug_retired) |
---|
342 | d.addErrback(log.err, |
---|
343 | format="subscriber errored during subscribe %(anns)s", |
---|
344 | anns=announcements, facility="tahoe.introducer", |
---|
345 | level=log.UNUSUAL, umid="mtZepQ") |
---|
346 | return d |
---|