1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | from __future__ import absolute_import |
---|
6 | from __future__ import division |
---|
7 | from __future__ import print_function |
---|
8 | from __future__ import unicode_literals |
---|
9 | |
---|
10 | |
---|
11 | from future.utils import PY2 |
---|
12 | if PY2: |
---|
13 | from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 |
---|
14 | from past.builtins import long |
---|
15 | from six import ensure_text |
---|
16 | |
---|
17 | import time, os.path, textwrap |
---|
18 | |
---|
19 | try: |
---|
20 | from typing import Any, Dict, Union |
---|
21 | except ImportError: |
---|
22 | pass |
---|
23 | |
---|
24 | from zope.interface import implementer |
---|
25 | from twisted.application import service |
---|
26 | from twisted.internet import defer |
---|
27 | from twisted.internet.address import IPv4Address |
---|
28 | from twisted.python.failure import Failure |
---|
29 | from foolscap.api import Referenceable |
---|
30 | import allmydata |
---|
31 | from allmydata import node |
---|
32 | from allmydata.util import log, dictutil |
---|
33 | from allmydata.util.i2p_provider import create as create_i2p_provider |
---|
34 | from allmydata.util.tor_provider import create as create_tor_provider |
---|
35 | from allmydata.introducer.interfaces import \ |
---|
36 | RIIntroducerPublisherAndSubscriberService_v2 |
---|
37 | from allmydata.introducer.common import unsign_from_foolscap, \ |
---|
38 | SubscriberDescriptor, AnnouncementDescriptor |
---|
39 | from allmydata.node import read_config |
---|
40 | from allmydata.node import create_node_dir |
---|
41 | from allmydata.node import create_connection_handlers |
---|
42 | from allmydata.node import create_control_tub |
---|
43 | from allmydata.node import create_tub_options |
---|
44 | from allmydata.node import create_main_tub |
---|
45 | |
---|
46 | |
---|
# Text handed to create_node_dir() by create_introducer() when a new
# introducer node directory is created; it is put into the README there.
INTRODUCER_README = """
This directory contains files which contain private data for the Tahoe node,
such as private keys. On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files. See the 'configuration.rst' documentation file for details.
"""

# Alias for the node module's common valid-config factory. It is *called* in
# create_introducer() and the result is passed to read_config().
_valid_config = node._common_valid_config
---|
56 | |
---|
class FurlFileConflictError(Exception):
    """
    Raised when a node directory holds both the old public
    'introducer.furl' file and the new-style 'private/introducer.furl',
    so the old one cannot be safely migrated.
    """
---|
59 | |
---|
def create_introducer(basedir=u"."):
    """
    Build an introducer node rooted at ``basedir``.

    The node directory is created (with a README) when it does not exist
    yet, the configuration is read, and the main and control tubs are
    created before the node itself is instantiated.

    :param basedir: path of the node directory (defaults to the current
        directory).

    :returns: a Deferred that yields a new _IntroducerNode instance. When
        any step raises, the Deferred fails with that exception instead, so
        callers always receive a Deferred.
    """
    try:
        # see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2946
        from twisted.internet import reactor

        if not os.path.exists(basedir):
            create_node_dir(basedir, INTRODUCER_README)

        config = read_config(
            basedir, u"client.port",
            generated_files=["introducer.furl"],
            _valid_config=_valid_config(),
        )

        i2p_provider = create_i2p_provider(reactor, config)
        tor_provider = create_tor_provider(reactor, config)

        default_connection_handlers, foolscap_connection_handlers = (
            create_connection_handlers(config, i2p_provider, tor_provider)
        )
        tub_options = create_tub_options(config)

        # we don't remember these because the Introducer doesn't make
        # outbound connections.
        i2p_provider = None
        tor_provider = None
        main_tub = create_main_tub(
            config, tub_options, default_connection_handlers,
            foolscap_connection_handlers, i2p_provider, tor_provider,
        )
        control_tub = create_control_tub()

        # Named 'introducer_node' (not 'node') so the imported 'node' module
        # is not shadowed inside this function.
        introducer_node = _IntroducerNode(
            config,
            main_tub,
            control_tub,
            i2p_provider,
            tor_provider,
        )
        return defer.succeed(introducer_node)
    except Exception:
        # Failure() captures the exception currently being handled; wrapping
        # it in an already-failed Deferred keeps the documented return type
        # on the error path too.
        return defer.fail(Failure())
---|
103 | |
---|
104 | |
---|
class _IntroducerNode(node.Node):
    """
    Node that hosts an IntroducerService and, when 'web.port' is set in the
    [node] section of the configuration, an introducer web server.
    """
    NODETYPE = "introducer"

    def __init__(self, config, main_tub, control_tub, i2p_provider, tor_provider):
        """
        Initialise the base node, register the introducer service and start
        the web server if a web port is configured.
        """
        node.Node.__init__(self, config, main_tub, control_tub, i2p_provider, tor_provider)
        self.init_introducer()
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer(self):
        """
        Create the IntroducerService, attach it to this node and register it
        with the tub, storing the resulting FURL in 'private/introducer.furl'.

        An old-style public 'introducer.furl' is moved to the private
        location when no private file exists yet.

        :raises ValueError: if the tub is not listening.
        :raises FurlFileConflictError: if both the old public and the new
            private furl files exist.
        """
        if not self._is_tub_listening():
            raise ValueError("config error: we are Introducer, but tub "
                             "is not listening ('tub.port=' is empty)")
        introducerservice = IntroducerService()
        introducerservice.setServiceParent(self)

        old_public_fn = self.config.get_config_path(u"introducer.furl")
        private_fn = self.config.get_private_path(u"introducer.furl")

        if os.path.exists(old_public_fn):
            if os.path.exists(private_fn):
                msg = """This directory (%s) contains both an old public
                'introducer.furl' file, and a new-style
                'private/introducer.furl', so I cannot safely remove the old
                one. Please make sure your desired FURL is in
                private/introducer.furl, and remove the public file. If this
                causes your Introducer's FURL to change, you need to inform
                all grid members so they can update their tahoe.cfg.
                """
                # Fill in the '%s' placeholder with the directory that holds
                # the old public furl file, so the message names the
                # offending directory instead of showing a literal '%s'.
                conflict_dir = os.path.dirname(old_public_fn)
                raise FurlFileConflictError(
                    textwrap.dedent(msg) % (conflict_dir,)
                )
            os.rename(old_public_fn, private_fn)
        furl = self.tub.registerReference(introducerservice,
                                          furlFile=private_fn)
        self.log(" introducer is at %s" % furl, umid="qF2L9A")
        self.introducer_url = furl # for tests

    def init_web(self, webport):
        """
        Start the introducer web server as a child service of this node.

        :param webport: the configured 'web.port' value (a strports string).
        """
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        # imported here rather than at module level, as in the original
        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = self.config.get_config_path(u"node.url")
        config_staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = self.config.get_config_path(config_staticdir)
        ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
        ws.setServiceParent(self)
---|
151 | |
---|
152 | |
---|
def stringify_remote_address(rref):
    """
    Return a printable form of the peer address of ``rref``.

    An IPv4Address peer is rendered as "host:port"; any other address
    object is rendered with ``str()``.
    """
    peer = rref.getPeer()
    if not isinstance(peer, IPv4Address):
        # loopback is a non-IPv4Address
        return str(peer)
    return "%s:%d" % (peer.host, peer.port)
---|
159 | |
---|
160 | |
---|
@implementer(RIIntroducerPublisherAndSubscriberService_v2)
class IntroducerService(service.MultiService, Referenceable):
    """
    Publish/subscribe service of the introducer (protocol v2).

    Publishers deliver announcements through ``remote_publish_v2``;
    subscribers register for a service name through ``remote_subscribe_v2``
    and are sent matching announcements via ``announce_v2`` calls on their
    remote reference.
    """
    name = "introducer"
    # v1 is the original protocol, added in 1.0 (but only advertised starting
    # in 1.3), removed in 1.12. v2 is the new signed protocol, added in 1.10
    # TODO: reconcile bytes/str for keys
    VERSION = {
        #"http://allmydata.org/tahoe/protocols/introducer/v1": { },
        b"http://allmydata.org/tahoe/protocols/introducer/v2": { },
        b"application-version": allmydata.__full_version__.encode("utf-8"),
    }  # type: Dict[Union[bytes, str], Any]

    def __init__(self):
        """Set up empty announcement/subscriber tables and debug counters."""
        service.MultiService.__init__(self)
        self.introducer_url = None
        # 'index' is (service_name, key), where 'key' is the second value
        # returned by unsign_from_foolscap() for the announcement
        self._announcements = {} # dict of index ->
                                 # (ann_t, canary, ann, timestamp)

        # ann (the announcement dictionary) is cleaned up: nickname is always
        # unicode, servicename is always ascii, etc, even though
        # simplejson.loads sometimes returns either

        # self._subscribers is a dict mapping servicename to subscriptions
        # 'subscriptions' is a dict mapping rref to a subscription
        # 'subscription' is a tuple of (subscriber_info, timestamp)
        # 'subscriber_info' is a dict, provided directly by v2 clients. The
        # expected keys are: version, nickname, app-versions, my-version,
        # oldest-supported
        self._subscribers = dictutil.UnicodeKeyDict({})

        self._debug_counts = {"inbound_message": 0,
                              "inbound_duplicate": 0,
                              "inbound_no_seqnum": 0,
                              "inbound_old_replay": 0,
                              "inbound_update": 0,
                              "outbound_message": 0,
                              "outbound_announcements": 0,
                              "inbound_subscribe": 0}
        # number of outbound announce_v2 calls that have not completed yet
        self._debug_outstanding = 0

    def _debug_retired(self, res):
        """
        Deferred pass-through: note that one outstanding outbound call has
        finished and hand ``res`` on unchanged.
        """
        self._debug_outstanding -= 1
        return res

    def log(self, *args, **kwargs):
        """
        Forward to ``log.msg``, defaulting the 'facility' keyword to
        "tahoe.introducer.server" when the caller did not supply one.
        """
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer.server"
        return log.msg(*args, **kwargs)

    def get_announcements(self):
        """Return a list of AnnouncementDescriptor for all announcements"""
        return [
            AnnouncementDescriptor(when, index, canary, ann)
            for (index, (_, canary, ann, when))
            in list(self._announcements.items())
        ]

    def get_subscribers(self):
        """Return a list of SubscriberDescriptor objects for all subscribers"""
        s = []
        for service_name, subscriptions in list(self._subscribers.items()):
            for rref,(subscriber_info,when) in list(subscriptions.items()):
                # note that if the subscriber didn't do Tub.setLocation,
                # tubid will be None. Also, subscribers do not tell us which
                # pubkey they use; only publishers do that.
                tubid = rref.getRemoteTubID() or "?"
                remote_address = stringify_remote_address(rref)
                # these three assume subscriber_info["version"]==0, but
                # should tolerate other versions
                nickname = subscriber_info.get("nickname", u"?")
                version = subscriber_info.get("my-version", u"?")
                app_versions = subscriber_info.get("app-versions", {})
                # 'when' is the time they subscribed
                sd = SubscriberDescriptor(service_name, when,
                                          nickname, version, app_versions,
                                          remote_address, tubid)
                s.append(sd)
        return s

    def remote_get_version(self):
        """Return the VERSION dictionary advertised by this service."""
        return self.VERSION

    def remote_publish_v2(self, ann_t, canary):
        """
        Remote entry point for publishing an announcement.

        :param ann_t: the announcement as received from the publisher.
        :param canary: object supplied by the publisher; it is stored
            alongside the announcement.
        """
        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
        return self.publish(ann_t, canary, lp)

    def publish(self, ann_t, canary, lp):
        """
        Process an announcement via ``_publish``; any exception is logged
        (with ``lp`` as the parent log entry) and then re-raised.
        """
        try:
            self._publish(ann_t, canary, lp)
        except:
            # deliberately broad: the error is only logged here and is
            # always re-raised to the caller
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=ann_t,
                    level=log.UNUSUAL, parent=lp, umid="620rWA")
            raise

    def _publish(self, ann_t, canary, lp):
        """
        Record the announcement ``ann_t`` and forward it to the current
        subscribers of its service.

        An announcement identical to the stored one is ignored. When the
        stored announcement carries a "seqnum", the new one must carry a
        strictly greater integer "seqnum" to replace it.

        May raise whatever ``unsign_from_foolscap`` raises (e.g. on a bad
        signature).
        """
        self._debug_counts["inbound_message"] += 1
        self.log("introducer: announcement published: %s" % (ann_t,),
                 umid="wKHgCw")
        ann, key = unsign_from_foolscap(ann_t) # might raise BadSignature
        service_name = str(ann["service-name"])

        index = (service_name, key)
        old = self._announcements.get(index)
        if old:
            # Unpack into separate names: the incoming 'canary' argument
            # must survive, because it is stored with the new announcement
            # below. (Reusing the name 'canary' here would silently keep the
            # previous publisher's canary on every replacement.)
            (_old_ann_t, _old_canary, old_ann, _old_timestamp) = old
            if old_ann == ann:
                self.log("but we already knew it, ignoring", level=log.NOISY,
                         umid="myxzLw")
                self._debug_counts["inbound_duplicate"] += 1
                return
            if "seqnum" in old_ann:
                # must beat previous sequence number to replace
                if ("seqnum" not in ann
                    or not isinstance(ann["seqnum"], (int,long))):
                    self.log("not replacing old ann, no valid seqnum",
                             level=log.NOISY, umid="ySbaVw")
                    self._debug_counts["inbound_no_seqnum"] += 1
                    return
                if ann["seqnum"] <= old_ann["seqnum"]:
                    self.log("not replacing old ann, new seqnum is too old"
                             " (%s <= %s) (replay attack?)"
                             % (ann["seqnum"], old_ann["seqnum"]),
                             level=log.UNUSUAL, umid="sX7yqQ")
                    self._debug_counts["inbound_old_replay"] += 1
                    return
                # ok, seqnum is newer, allow replacement
            self.log("old announcement being updated", level=log.NOISY,
                     umid="304r9g")
            self._debug_counts["inbound_update"] += 1
        self._announcements[index] = (ann_t, canary, ann, time.time())
        #if canary:
        #    canary.notifyOnDisconnect ...
        # use a CanaryWatcher? with cw.is_connected()?
        # actually we just want foolscap to give rref.is_connected(), since
        # this is only for the status display

        # iterating the per-service subscriptions dict yields the
        # subscribers' remote references
        for s in self._subscribers.get(service_name, []):
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = s.callRemote("announce_v2", {ann_t})
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=ann_t, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
        """
        Remote entry point for subscribing to ``service_name``.

        The service name and the keys of ``subscriber_info`` are converted
        to text before the subscription is recorded via ``add_subscriber``.
        """
        self.log("introducer: subscription[%r] request at %r"
                 % (service_name, subscriber), umid="U3uzLg")
        service_name = ensure_text(service_name)
        subscriber_info = dictutil.UnicodeKeyDict({
            ensure_text(k): v for (k, v) in subscriber_info.items()
        })
        return self.add_subscriber(subscriber, service_name, subscriber_info)

    def add_subscriber(self, subscriber, service_name, subscriber_info):
        """
        Register ``subscriber`` for ``service_name`` and send it every
        announcement already known for that service.

        A subscriber that is already registered for the service is ignored.
        The subscription is dropped again when the subscriber disconnects.

        :returns: the Deferred of the initial ``announce_v2`` call when
            there were announcements to send, otherwise None.
        """
        self._debug_counts["inbound_subscribe"] += 1
        if service_name not in self._subscribers:
            self._subscribers[service_name] = {}
        subscribers = self._subscribers[service_name]
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL, umid="Sy9EfA")
            return

        assert subscriber_info

        subscribers[subscriber] = (subscriber_info, time.time())
        def _remove():
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                          subscriber),
                     umid="vYGcJg")
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # Make sure types are correct:
        for k in self._announcements:
            assert isinstance(k[0], type(service_name))

        # now tell them about any announcements they're interested in
        announcements = {
            ann_t
            for idx, (ann_t, _canary, _ann, _when)
            in self._announcements.items()
            if idx[0] == service_name
        }
        if announcements:
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += len(announcements)
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce_v2", announcements)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored during subscribe %(anns)s",
                         anns=announcements, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="mtZepQ")
            return d
---|