source: trunk/src/allmydata/introducer/server.py @ d214fe3f

Last change on this file since d214fe3f was d214fe3f, checked in by Itamar Turner-Trauring <itamar@…>, at 2021-04-02T17:00:05Z

Delete unused imports.

  • Property mode set to 100644
File size: 15.1 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from __future__ import absolute_import
6from __future__ import division
7from __future__ import print_function
8from __future__ import unicode_literals
9
10
11from future.utils import PY2
12if PY2:
13    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
14from past.builtins import long
15from six import ensure_text
16
17import time, os.path, textwrap
18
19try:
20    from typing import Any, Dict, Union
21except ImportError:
22    pass
23
24from zope.interface import implementer
25from twisted.application import service
26from twisted.internet import defer
27from twisted.internet.address import IPv4Address
28from twisted.python.failure import Failure
29from foolscap.api import Referenceable
30import allmydata
31from allmydata import node
32from allmydata.util import log, dictutil
33from allmydata.util.i2p_provider import create as create_i2p_provider
34from allmydata.util.tor_provider import create as create_tor_provider
35from allmydata.introducer.interfaces import \
36     RIIntroducerPublisherAndSubscriberService_v2
37from allmydata.introducer.common import unsign_from_foolscap, \
38     SubscriberDescriptor, AnnouncementDescriptor
39from allmydata.node import read_config
40from allmydata.node import create_node_dir
41from allmydata.node import create_connection_handlers
42from allmydata.node import create_control_tub
43from allmydata.node import create_tub_options
44from allmydata.node import create_main_tub
45
46
# This text is handed to create_node_dir() by create_introducer() when a new
# node directory has to be created; it is put into the README there.
INTRODUCER_README = """
This directory contains files which contain private data for the Tahoe node,
such as private keys.  On Unix-like systems, the permissions on this directory
are set to disallow users other than its owner from reading the contents of
the files.   See the 'configuration.rst' documentation file for details.
"""

# Alias of node._common_valid_config. create_introducer() calls it with no
# arguments and passes the result to read_config() as ``_valid_config``.
_valid_config = node._common_valid_config
56
class FurlFileConflictError(Exception):
    """
    Raised when a node directory holds both an old public 'introducer.furl'
    and a new-style 'private/introducer.furl'.
    """
59
def create_introducer(basedir=u"."):
    """
    Build an introducer node for the node directory ``basedir``.

    The directory is created (with ``INTRODUCER_README``) when it does not
    exist yet, the configuration is read, and the tubs are set up before the
    node object itself is constructed.

    :param basedir: path of the node directory

    :returns: a Deferred that yields a new _IntroducerNode instance. If any
        step raises an ``Exception``, a ``Failure`` capturing it is returned
        directly (it is not wrapped in a Deferred).
    """
    try:
        # see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2946
        from twisted.internet import reactor

        if not os.path.exists(basedir):
            create_node_dir(basedir, INTRODUCER_README)

        config = read_config(
            basedir,
            u"client.port",
            generated_files=["introducer.furl"],
            _valid_config=_valid_config(),
        )

        # The providers are only needed to compute the connection handlers
        # (i2p first, then tor, as before).
        handlers = create_connection_handlers(
            config,
            create_i2p_provider(reactor, config),
            create_tor_provider(reactor, config),
        )
        default_handlers, foolscap_handlers = handlers
        tub_options = create_tub_options(config)

        # we don't remember the providers because the Introducer doesn't
        # make outbound connections.
        no_provider = None
        main_tub = create_main_tub(
            config,
            tub_options,
            default_handlers,
            foolscap_handlers,
            no_provider,
            no_provider,
        )
        control_tub = create_control_tub()

        introducer_node = _IntroducerNode(
            config, main_tub, control_tub, no_provider, no_provider,
        )
        return defer.succeed(introducer_node)
    except Exception:
        # Failure() with no arguments captures the exception being handled.
        return Failure()
103
104
class _IntroducerNode(node.Node):
    """
    A node that hosts an ``IntroducerService`` and, when ``web.port`` is
    configured, an ``IntroducerWebishServer``.
    """
    NODETYPE = "introducer"

    def __init__(self, config, main_tub, control_tub, i2p_provider, tor_provider):
        """
        Initialise the base node, register the introducer service and start
        the web server if the ``[node] web.port`` option is set.

        All arguments are forwarded unchanged to ``node.Node.__init__``.
        """
        node.Node.__init__(self, config, main_tub, control_tub, i2p_provider, tor_provider)
        self.init_introducer()
        webport = self.get_config("node", "web.port", None)
        if webport:
            self.init_web(webport) # strports string

    def init_introducer(self):
        """
        Create the ``IntroducerService``, attach it to this node and register
        it with the tub, persisting its FURL in the private
        'introducer.furl' file.

        An old public 'introducer.furl' is moved to the private location
        when no private file exists yet.

        :raises ValueError: if the tub is not listening.
        :raises FurlFileConflictError: if both the old public and the
            private 'introducer.furl' files exist.
        """
        if not self._is_tub_listening():
            raise ValueError("config error: we are Introducer, but tub "
                             "is not listening ('tub.port=' is empty)")
        introducerservice = IntroducerService()
        introducerservice.setServiceParent(self)

        old_public_fn = self.config.get_config_path(u"introducer.furl")
        private_fn = self.config.get_private_path(u"introducer.furl")

        if os.path.exists(old_public_fn):
            if os.path.exists(private_fn):
                # Dedent the template first (the leading backslash keeps the
                # first line indented like the rest, so dedent has a common
                # prefix to strip), then fill in the directory that holds
                # the old public file.
                template = textwrap.dedent("""\
                    This directory (%s) contains both an old public
                    'introducer.furl' file, and a new-style
                    'private/introducer.furl', so I cannot safely remove the old
                    one. Please make sure your desired FURL is in
                    private/introducer.furl, and remove the public file. If this
                    causes your Introducer's FURL to change, you need to inform
                    all grid members so they can update their tahoe.cfg.
                    """)
                raise FurlFileConflictError(
                    template % (os.path.dirname(old_public_fn),)
                )
            os.rename(old_public_fn, private_fn)
        furl = self.tub.registerReference(introducerservice,
                                          furlFile=private_fn)
        self.log(" introducer is at %s" % furl, umid="qF2L9A")
        self.introducer_url = furl # for tests

    def init_web(self, webport):
        """
        Create an ``IntroducerWebishServer`` and attach it to this node.

        :param webport: the configured ``web.port`` value (a strports
            string)
        """
        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")

        # imported here rather than at module level, as in the original
        from allmydata.webish import IntroducerWebishServer
        nodeurl_path = self.config.get_config_path(u"node.url")
        config_staticdir = self.get_config("node", "web.static", "public_html")
        staticdir = self.config.get_config_path(config_staticdir)
        ws = IntroducerWebishServer(self, webport, nodeurl_path, staticdir)
        ws.setServiceParent(self)
151
152
def stringify_remote_address(rref):
    """
    Return a printable form of the peer address of ``rref``.

    An ``IPv4Address`` peer is rendered as "host:port"; anything else is
    passed through ``str()``.
    """
    peer = rref.getPeer()
    if not isinstance(peer, IPv4Address):
        # loopback is a non-IPv4Address
        return str(peer)
    return "%s:%d" % (peer.host, peer.port)
159
160
@implementer(RIIntroducerPublisherAndSubscriberService_v2)
class IntroducerService(service.MultiService, Referenceable):
    """
    The remotely-callable introducer: it remembers published announcements
    and relays them to the subscribers of the matching service name.
    """
    name = "introducer"
    # v1 is the original protocol, added in 1.0 (but only advertised starting
    # in 1.3), removed in 1.12. v2 is the new signed protocol, added in 1.10
    # TODO: reconcile bytes/str for keys
    VERSION = {
                #"http://allmydata.org/tahoe/protocols/introducer/v1": { },
                b"http://allmydata.org/tahoe/protocols/introducer/v2": { },
                b"application-version": allmydata.__full_version__.encode("utf-8"),
                }  # type: Dict[Union[bytes, str], Any]

    def __init__(self):
        """Start with no announcements, no subscribers and zeroed counters."""
        service.MultiService.__init__(self)
        self.introducer_url = None
        # 'index' is (service_name, key), where key is the second value
        # returned by unsign_from_foolscap() for the announcement
        self._announcements = {} # dict of index ->
                                 # (ann_t, canary, ann, timestamp)

        # ann (the announcement dictionary) is cleaned up: nickname is always
        # unicode, servicename is always ascii, etc, even though
        # simplejson.loads sometimes returns either

        # self._subscribers is a dict mapping servicename to subscriptions
        # 'subscriptions' is a dict mapping rref to a subscription
        # 'subscription' is a tuple of (subscriber_info, timestamp)
        # 'subscriber_info' is a dict, provided directly by v2 clients. The
        # expected keys are: version, nickname, app-versions, my-version,
        # oldest-supported
        self._subscribers = dictutil.UnicodeKeyDict({})

        self._debug_counts = {"inbound_message": 0,
                              "inbound_duplicate": 0,
                              "inbound_no_seqnum": 0,
                              "inbound_old_replay": 0,
                              "inbound_update": 0,
                              "outbound_message": 0,
                              "outbound_announcements": 0,
                              "inbound_subscribe": 0}
        # number of outbound "announce_v2" calls that have not completed yet
        self._debug_outstanding = 0

    def _debug_retired(self, res):
        """
        Deferred callback: note that one outbound call has finished and pass
        ``res`` through unchanged.
        """
        self._debug_outstanding -= 1
        return res

    def log(self, *args, **kwargs):
        """
        Forward to ``log.msg``, defaulting ``facility`` to
        "tahoe.introducer.server" when the caller did not supply one.
        """
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.introducer.server"
        return log.msg(*args, **kwargs)

    def get_announcements(self):
        """Return a list of AnnouncementDescriptor for all announcements"""
        announcements = []
        for (index, (_, canary, ann, when)) in list(self._announcements.items()):
            ad = AnnouncementDescriptor(when, index, canary, ann)
            announcements.append(ad)
        return announcements

    def get_subscribers(self):
        """Return a list of SubscriberDescriptor objects for all subscribers"""
        s = []
        for service_name, subscriptions in list(self._subscribers.items()):
            for rref,(subscriber_info,when) in list(subscriptions.items()):
                # note that if the subscriber didn't do Tub.setLocation,
                # tubid will be None. Also, subscribers do not tell us which
                # pubkey they use; only publishers do that.
                tubid = rref.getRemoteTubID() or "?"
                remote_address = stringify_remote_address(rref)
                # these three assume subscriber_info["version"]==0, but
                # should tolerate other versions
                nickname = subscriber_info.get("nickname", u"?")
                version = subscriber_info.get("my-version", u"?")
                app_versions = subscriber_info.get("app-versions", {})
                # 'when' is the time they subscribed
                sd = SubscriberDescriptor(service_name, when,
                                          nickname, version, app_versions,
                                          remote_address, tubid)
                s.append(sd)
        return s

    def remote_get_version(self):
        """Return the ``VERSION`` dictionary."""
        return self.VERSION

    def remote_publish_v2(self, ann_t, canary):
        """
        Remote entry point for publishing: log the event and delegate to
        ``publish``.
        """
        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
        return self.publish(ann_t, canary, lp)

    def publish(self, ann_t, canary, lp):
        """
        Run ``_publish``; on any error, log it (with ``lp`` as the parent
        log entry) and re-raise. Returns None.
        """
        try:
            self._publish(ann_t, canary, lp)
        except:
            # deliberately broad: the error is logged and then re-raised
            log.err(format="Introducer.remote_publish failed on %(ann)s",
                    ann=ann_t,
                    level=log.UNUSUAL, parent=lp, umid="620rWA")
            raise

    def _publish(self, ann_t, canary, lp):
        """
        Record the announcement ``ann_t`` and send it to every subscriber of
        its service name.

        The announcement is dropped (and the matching debug counter bumped)
        when it is identical to the stored one, or when the stored one has a
        "seqnum" and the new one lacks a valid or newer "seqnum".

        :param ann_t: the signed announcement, as accepted by
            ``unsign_from_foolscap`` (which might raise BadSignature)
        :param canary: stored next to the announcement and handed to
            ``AnnouncementDescriptor`` by ``get_announcements``
        :param lp: parent log entry (not used here)
        """
        self._debug_counts["inbound_message"] += 1
        self.log("introducer: announcement published: %s" % (ann_t,),
                 umid="wKHgCw")
        ann, key = unsign_from_foolscap(ann_t) # might raise BadSignature
        service_name = str(ann["service-name"])

        index = (service_name, key)
        old = self._announcements.get(index)
        if old:
            # Unpack the stored canary into a throwaway name: rebinding
            # `canary` here would clobber the publisher's canary and the
            # stale one would be stored with the new announcement below.
            (_old_ann_t, _old_canary, old_ann, _old_timestamp) = old
            if old_ann == ann:
                self.log("but we already knew it, ignoring", level=log.NOISY,
                         umid="myxzLw")
                self._debug_counts["inbound_duplicate"] += 1
                return
            else:
                if "seqnum" in old_ann:
                    # must beat previous sequence number to replace
                    if ("seqnum" not in ann
                        or not isinstance(ann["seqnum"], (int,long))):
                        self.log("not replacing old ann, no valid seqnum",
                                 level=log.NOISY, umid="ySbaVw")
                        self._debug_counts["inbound_no_seqnum"] += 1
                        return
                    if ann["seqnum"] <= old_ann["seqnum"]:
                        self.log("not replacing old ann, new seqnum is too old"
                                 " (%s <= %s) (replay attack?)"
                                 % (ann["seqnum"], old_ann["seqnum"]),
                                 level=log.UNUSUAL, umid="sX7yqQ")
                        self._debug_counts["inbound_old_replay"] += 1
                        return
                    # ok, seqnum is newer, allow replacement
                self.log("old announcement being updated", level=log.NOISY,
                         umid="304r9g")
                self._debug_counts["inbound_update"] += 1
        self._announcements[index] = (ann_t, canary, ann, time.time())
        #if canary:
        #    canary.notifyOnDisconnect ...
        # use a CanaryWatcher? with cw.is_connected()?
        # actually we just want foolscap to give rref.is_connected(), since
        # this is only for the status display

        for s in self._subscribers.get(service_name, []):
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += 1
            self._debug_outstanding += 1
            d = s.callRemote("announce_v2", set([ann_t]))
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored on announcement %(ann)s",
                         ann=ann_t, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="jfGMXQ")

    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
        """
        Remote entry point for subscribing: normalise ``service_name`` and
        the keys of ``subscriber_info`` to text, then delegate to
        ``add_subscriber``.
        """
        self.log("introducer: subscription[%r] request at %r"
                 % (service_name, subscriber), umid="U3uzLg")
        service_name = ensure_text(service_name)
        subscriber_info = dictutil.UnicodeKeyDict({
            ensure_text(k): v for (k, v) in subscriber_info.items()
        })
        return self.add_subscriber(subscriber, service_name, subscriber_info)

    def add_subscriber(self, subscriber, service_name, subscriber_info):
        """
        Register ``subscriber`` for ``service_name`` and send it the
        announcements already known for that service.

        A subscriber that is already registered for the service is ignored.
        The subscription is removed again when the subscriber disconnects.

        :returns: the Deferred of the "announce_v2" call when there were
            announcements to send, otherwise None.
        """
        self._debug_counts["inbound_subscribe"] += 1
        if service_name not in self._subscribers:
            self._subscribers[service_name] = {}
        subscribers = self._subscribers[service_name]
        if subscriber in subscribers:
            self.log("but they're already subscribed, ignoring",
                     level=log.UNUSUAL, umid="Sy9EfA")
            return

        assert subscriber_info

        subscribers[subscriber] = (subscriber_info, time.time())
        def _remove():
            self.log("introducer: unsubscribing[%s] %s" % (service_name,
                                                           subscriber),
                     umid="vYGcJg")
            subscribers.pop(subscriber, None)
        subscriber.notifyOnDisconnect(_remove)

        # Make sure types are correct:
        for k in self._announcements:
            assert isinstance(k[0], type(service_name))

        # now tell them about any announcements they're interested in
        announcements = set( [ ann_t
                               for idx,(ann_t,_canary,_ann,_when)
                               in self._announcements.items()
                               if idx[0] == service_name] )
        if announcements:
            # one outbound message carries the whole set of announcements
            self._debug_counts["outbound_message"] += 1
            self._debug_counts["outbound_announcements"] += len(announcements)
            self._debug_outstanding += 1
            d = subscriber.callRemote("announce_v2", announcements)
            d.addBoth(self._debug_retired)
            d.addErrback(log.err,
                         format="subscriber errored during subscribe %(anns)s",
                         anns=announcements, facility="tahoe.introducer",
                         level=log.UNUSUAL, umid="mtZepQ")
            return d
Note: See TracBrowser for help on using the repository browser.