Ticket #466: 2011-02-p3.diff

File 2011-02-p3.diff, 109.2 KB (added by warner, at 2011-02-07T18:36:43Z)

new Introducer. see https://github.com/warner/tahoe-lafs/tree/466-introducer-take2 for updates

  • src/allmydata/client.py

    diff --git a/src/allmydata/client.py b/src/allmydata/client.py
    index fa515d4..46b02bc 100644
    a b from zope.interface import implements 
    66from twisted.internet import reactor, defer
    77from twisted.application import service
    88from twisted.application.internet import TimerService
    9 from foolscap.api import Referenceable
    109from pycryptopp.publickey import rsa
    1110
    1211import allmydata
    from allmydata.util.abbreviate import parse_abbreviated_size 
    2221from allmydata.util.time_format import parse_duration, parse_date
    2322from allmydata.stats import StatsProvider
    2423from allmydata.history import History
    25 from allmydata.interfaces import IStatsProducer, RIStubClient
     24from allmydata.interfaces import IStatsProducer
    2625from allmydata.nodemaker import NodeMaker
    2726
    2827
    GiB=1024*MiB 
    3231TiB=1024*GiB
    3332PiB=1024*TiB
    3433
    35 class StubClient(Referenceable):
    36     implements(RIStubClient)
    37 
    3834def _make_secret():
    3935    return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
    4036
    class Client(node.Node, pollmixin.PollMixin): 
    186182        ic = IntroducerClient(self.tub, self.introducer_furl,
    187183                              self.nickname,
    188184                              str(allmydata.__full_version__),
    189                               str(self.OLDEST_SUPPORTED_VERSION))
     185                              str(self.OLDEST_SUPPORTED_VERSION),
     186                              self.get_app_versions())
    190187        self.introducer_client = ic
    191188        # hold off on starting the IntroducerClient until our tub has been
    192189        # started, so we'll have a useful address on our RemoteReference, so
    class Client(node.Node, pollmixin.PollMixin): 
    292289        self.terminator = Terminator()
    293290        self.terminator.setServiceParent(self)
    294291        self.add_service(Uploader(helper_furl, self.stats_provider))
    295         self.init_stub_client()
    296292        self.init_nodemaker()
    297293
    298294    def init_client_storage_broker(self):
    class Client(node.Node, pollmixin.PollMixin): 
    331327    def get_storage_broker(self):
    332328        return self.storage_broker
    333329
    334     def init_stub_client(self):
    335         def _publish(res):
    336             # we publish an empty object so that the introducer can count how
    337             # many clients are connected and see what versions they're
    338             # running.
    339             sc = StubClient()
    340             furl = self.tub.registerReference(sc)
    341             ri_name = RIStubClient.__remote_name__
    342             self.introducer_client.publish(furl, "stub_client", ri_name)
    343         d = self.when_tub_ready()
    344         d.addCallback(_publish)
    345         d.addErrback(log.err, facility="tahoe.init",
    346                      level=log.BAD, umid="OEHq3g")
    347 
    348330    def init_nodemaker(self):
    349331        self.nodemaker = NodeMaker(self.storage_broker,
    350332                                   self._secret_holder,
  • src/allmydata/interfaces.py

    diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
    index 48094a9..9c2f1c8 100644
    a b WriteEnablerSecret = Hash # used to protect mutable bucket modifications 
    2626LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
    2727LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
    2828
    29 class RIStubClient(RemoteInterface):
    30     """Each client publishes a service announcement for a dummy object called
    31     the StubClient. This object doesn't actually offer any services, but the
    32     announcement helps the Introducer keep track of which clients are
    33     subscribed (so the grid admin can keep track of things like the size of
    34     the grid and the client versions in use. This is the (empty)
    35     RemoteInterface for the StubClient."""
    36 
    3729class RIBucketWriter(RemoteInterface):
    3830    """ Objects of this kind live on the server side. """
    3931    def write(offset=Offset, data=ShareData):
  • src/allmydata/introducer/client.py

    diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py
    index 31fbb5c..1e48fd8 100644
    a b  
    11
    2 from base64 import b32decode
     2import time, simplejson
    33from zope.interface import implements
    44from twisted.application import service
    5 from foolscap.api import Referenceable, SturdyRef, eventually
     5from foolscap.api import Referenceable, eventually, RemoteInterface, Violation
    66from allmydata.interfaces import InsufficientVersionError
    7 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
    8      IIntroducerClient
     7from allmydata.introducer.interfaces import IIntroducerClient, \
     8     RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2
     9from allmydata.introducer.common import sign, unsign, make_index, \
     10     convert_announcement_v1_to_v2, convert_announcement_v2_to_v1
    911from allmydata.util import log, idlib
    10 from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
     12from allmydata.util.rrefutil import add_version_to_remote_reference
     13from allmydata.util.ecdsa import BadSignatureError
     14
     15class ClientAdapter_v1(Referenceable): # for_v1
     16    """I wrap a v2 IntroducerClient to make it look like a v1 client, so it
     17    can be attached to an old server."""
     18    implements(RIIntroducerSubscriberClient_v1)
     19
     20    def __init__(self, original):
     21        self.original = original
     22
     23    def remote_announce(self, announcements):
     24        lp = self.original.log("received %d announcements (v1)" %
     25                               len(announcements))
     26        anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)
     27                       for ann_v1 in announcements])
     28        return self.original.got_announcements(anns_v1, lp)
     29
     30    def remote_set_encoding_parameters(self, parameters):
     31        self.original.remote_set_encoding_parameters(parameters)
     32
     33class RIStubClient(RemoteInterface): # for_v1
     34    """Each client publishes a service announcement for a dummy object called
     35    the StubClient. This object doesn't actually offer any services, but the
     36    announcement helps the Introducer keep track of which clients are
     37    subscribed (so the grid admin can keep track of things like the size of
     38    the grid and the client versions in use. This is the (empty)
     39    RemoteInterface for the StubClient."""
     40
     41class StubClient(Referenceable): # for_v1
     42    implements(RIStubClient)
    1143
    1244
    1345class IntroducerClient(service.Service, Referenceable):
    14     implements(RIIntroducerSubscriberClient, IIntroducerClient)
     46    implements(RIIntroducerSubscriberClient_v2, IIntroducerClient)
    1547
    1648    def __init__(self, tub, introducer_furl,
    17                  nickname, my_version, oldest_supported):
     49                 nickname, my_version, oldest_supported,
     50                 app_versions):
    1851        self._tub = tub
    1952        self.introducer_furl = introducer_furl
    2053
    2154        assert type(nickname) is unicode
    22         self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
     55        self._nickname = nickname
    2356        self._my_version = my_version
    2457        self._oldest_supported = oldest_supported
     58        self._app_versions = app_versions
     59
     60        self._my_subscriber_info = { "version": 0,
     61                                     "nickname": self._nickname,
     62                                     "app-versions": self._app_versions,
     63                                     "my-version": self._my_version,
     64                                     "oldest-supported": self._oldest_supported,
     65                                     }
     66        self._stub_client = None # for_v1
     67        self._stub_client_furl = None
    2568
    26         self._published_announcements = set()
     69        self._published_announcements = {}
     70        self._canary = Referenceable()
    2771
    2872        self._publisher = None
    2973
    class IntroducerClient(service.Service, Referenceable): 
    3377
    3478        # _current_announcements remembers one announcement per
    3579        # (servicename,serverid) pair. Anything that arrives with the same
    36         # pair will displace the previous one. This stores unpacked
    37         # announcement dictionaries, which can be compared for equality to
    38         # distinguish re-announcement from updates. It also provides memory
    39         # for clients who subscribe after startup.
     80        # pair will displace the previous one. This stores tuples of
     81        # (unpacked announcement dictionary, verifyingkey, rxtime). The ann_d
     82        # dicts can be compared for equality to distinguish re-announcement
     83        # from updates. It also provides memory for clients who subscribe
     84        # after startup.
    4085        self._current_announcements = {}
    4186
    4287        self.encoding_parameters = None
    class IntroducerClient(service.Service, Referenceable): 
    5196            "new_announcement": 0,
    5297            "outbound_message": 0,
    5398            }
     99        self._debug_outstanding = 0
     100
     101    def _debug_retired(self, res):
     102        self._debug_outstanding -= 1
     103        return res
    54104
    55105    def startService(self):
    56106        service.Service.startService(self)
    class IntroducerClient(service.Service, Referenceable): 
    95145
    96146    def log(self, *args, **kwargs):
    97147        if "facility" not in kwargs:
    98             kwargs["facility"] = "tahoe.introducer"
     148            kwargs["facility"] = "tahoe.introducer.client"
    99149        return log.msg(*args, **kwargs)
    100150
    101 
    102     def publish(self, furl, service_name, remoteinterface_name):
    103         assert type(self._nickname_utf8) is str # we always send UTF-8
    104         ann = (furl, service_name, remoteinterface_name,
    105                self._nickname_utf8, self._my_version, self._oldest_supported)
    106         self._published_announcements.add(ann)
    107         self._maybe_publish()
    108 
    109151    def subscribe_to(self, service_name, cb, *args, **kwargs):
    110152        self._local_subscribers.append( (service_name,cb,args,kwargs) )
    111153        self._subscribed_service_names.add(service_name)
    112154        self._maybe_subscribe()
    113         for (servicename,nodeid),ann_d in self._current_announcements.items():
     155        for (servicename,nodeid),(ann_d,key,when) in self._current_announcements.items():
    114156            if servicename == service_name:
    115157                eventually(cb, nodeid, ann_d)
    116158
    class IntroducerClient(service.Service, Referenceable): 
    124166                # there is a race here, but the subscription desk ignores
    125167                # duplicate requests.
    126168                self._subscriptions.add(service_name)
    127                 d = self._publisher.callRemote("subscribe", self, service_name)
    128                 d.addErrback(trap_deadref)
    129                 d.addErrback(log.err, format="server errored during subscribe",
    130                              facility="tahoe.introducer",
     169                self._debug_outstanding += 1
     170                d = self._publisher.callRemote("subscribe_v2",
     171                                               self, service_name,
     172                                               self._my_subscriber_info)
     173                d.addBoth(self._debug_retired)
     174                d.addErrback(self._subscribe_handle_v1, service_name) # for_v1
     175                d.addErrback(log.err, facility="tahoe.introducer.client",
    131176                             level=log.WEIRD, umid="2uMScQ")
    132177
     178    def _subscribe_handle_v1(self, f, service_name): # for_v1
     179        f.trap(Violation, NameError)
     180        # they don't have a 'subscribe_v2' method: must be a v1 introducer.
     181        # Fall back to the v1 'subscribe' method, using a client adapter.
     182        ca = ClientAdapter_v1(self)
     183        self._debug_outstanding += 1
     184        d = self._publisher.callRemote("subscribe", ca, service_name)
     185        d.addBoth(self._debug_retired)
     186        # We must also publish an empty 'stub_client' object, so the
     187        # introducer can count how many clients are connected and see what
     188        # versions they're running.
     189        if not self._stub_client_furl:
     190            self._stub_client = sc = StubClient()
     191            self._stub_client_furl = self._tub.registerReference(sc)
     192        def _publish_stub_client(ignored):
     193            ri_name = RIStubClient.__remote_name__
     194            self.publish(self._stub_client_furl, "stub_client", ri_name)
     195        d.addCallback(_publish_stub_client)
     196        return d
     197
     198    def create_announcement(self, furl, service_name, remoteinterface_name,
     199                            signing_key=None):
     200        ann_d = {"version": 0,
     201                 "service-name": service_name,
     202                 "FURL": furl,
     203                 "remoteinterface-name": remoteinterface_name,
     204
     205                 "nickname": self._nickname,
     206                 "app-versions": self._app_versions,
     207                 "my-version": self._my_version,
     208                 "oldest-supported": self._oldest_supported,
     209                 }
     210        return simplejson.dumps(sign(ann_d, signing_key))
     211
     212
     213    def publish(self, furl, service_name, remoteinterface_name,
     214                signing_key=None):
     215        ann = self.create_announcement(furl, service_name, remoteinterface_name,
     216                                       signing_key)
     217        self._published_announcements[service_name] = ann
     218        self._maybe_publish()
     219
    133220    def _maybe_publish(self):
    134221        if not self._publisher:
    135222            self.log("want to publish, but no introducer yet", level=log.NOISY)
    136223            return
    137224        # this re-publishes everything. The Introducer ignores duplicates
    138         for ann in self._published_announcements:
     225        for ann in self._published_announcements.values():
    139226            self._debug_counts["outbound_message"] += 1
    140             d = self._publisher.callRemote("publish", ann)
    141             d.addErrback(trap_deadref)
    142             d.addErrback(log.err,
    143                          format="server errored during publish %(ann)s",
    144                          ann=ann, facility="tahoe.introducer",
     227            self._debug_outstanding += 1
     228            d = self._publisher.callRemote("publish_v2", ann, self._canary)
     229            d.addBoth(self._debug_retired)
     230            d.addErrback(self._handle_v1_publisher, ann) # for_v1
     231            d.addErrback(log.err, ann=ann, facility="tahoe.introducer.client",
    145232                         level=log.WEIRD, umid="xs9pVQ")
    146233
    147 
    148 
    149     def remote_announce(self, announcements):
    150         self.log("received %d announcements" % len(announcements))
     234    def _handle_v1_publisher(self, f, ann): # for_v1
     235        f.trap(Violation, NameError)
     236        # they don't have the 'publish_v2' method, so fall back to the old
     237        # 'publish' method (which takes an unsigned tuple of bytestrings)
     238        self.log("falling back to publish_v1",
     239                 level=log.UNUSUAL, umid="9RCT1A", failure=f)
     240        ann_v1 = convert_announcement_v2_to_v1(ann)
     241        self._debug_outstanding += 1
     242        d = self._publisher.callRemote("publish", ann_v1)
     243        d.addBoth(self._debug_retired)
     244        return d
     245
     246
     247    def remote_announce_v2(self, announcements):
     248        lp = self.log("received %d announcements (v2)" % len(announcements))
     249        return self.got_announcements(announcements, lp)
     250
     251    def got_announcements(self, announcements, lp=None):
     252        # this is the common entry point for both v1 and v2 announcements
    151253        self._debug_counts["inbound_message"] += 1
    152         for ann in announcements:
     254        for ann_s in announcements:
    153255            try:
    154                 self._process_announcement(ann)
    155             except:
    156                 log.err(format="unable to process announcement %(ann)s",
    157                         ann=ann)
    158                 # Don't let a corrupt announcement prevent us from processing
    159                 # the remaining ones. Don't return an error to the server,
    160                 # since they'd just ignore it anyways.
    161                 pass
    162 
    163     def _process_announcement(self, ann):
     256                ann_d, key = unsign(ann_s) # might raise bad-sig error
     257            except BadSignatureError:
     258                self.log("bad signature on inbound announcement: %s" % (ann_s,),
     259                         parent=lp, level=log.WEIRD, umid="ZAU15Q")
     260                # process other announcements that arrived with the bad one
     261                continue
     262
     263            self._process_announcement(ann_d, key)
     264
     265    def _process_announcement(self, ann_d, key):
    164266        self._debug_counts["inbound_announcement"] += 1
    165         (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
     267        service_name = str(ann_d["service-name"])
    166268        if service_name not in self._subscribed_service_names:
    167269            self.log("announcement for a service we don't care about [%s]"
    168270                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
    169271            self._debug_counts["wrong_service"] += 1
    170272            return
    171         self.log("announcement for [%s]: %s" % (service_name, ann),
    172                  umid="BoKEag")
    173         assert type(furl) is str
    174         assert type(service_name) is str
    175         assert type(ri_name) is str
    176         assert type(nickname_utf8) is str
    177         nickname = nickname_utf8.decode("utf-8")
    178         assert type(nickname) is unicode
    179         assert type(ver) is str
    180         assert type(oldest) is str
    181 
    182         nodeid = b32decode(SturdyRef(furl).tubID.upper())
    183         nodeid_s = idlib.shortnodeid_b2a(nodeid)
    184 
    185         ann_d = { "version": 0,
    186                   "service-name": service_name,
    187 
    188                   "FURL": furl,
    189                   "nickname": nickname,
    190                   "app-versions": {}, # need #466 and v2 introducer
    191                   "my-version": ver,
    192                   "oldest-supported": oldest,
    193                   }
    194 
    195         index = (service_name, nodeid)
    196         if self._current_announcements.get(index, None) == ann_d:
    197             self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
     273        # for ASCII values, simplejson might give us unicode *or* bytes
     274        if "nickname" in ann_d and isinstance(ann_d["nickname"], str):
     275            ann_d["nickname"] = unicode(ann_d["nickname"])
     276        nick_s = ann_d.get("nickname",u"").encode("utf-8")
     277        lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s",
     278                       nick=nick_s, svc=service_name, ann=ann_d, umid="BoKEag")
     279
     280        index = make_index(ann_d, key)
     281        nodeid = index[1]
     282        nodeid_s = idlib.nodeid_b2a(nodeid)
     283
     284        # is this announcement a duplicate?
     285        if self._current_announcements.get(index, [None]*3)[0] == ann_d:
     286            self.log(format="reannouncement for [%(service)s]:%(nodeid)s, ignoring",
    198287                     service=service_name, nodeid=nodeid_s,
    199                      level=log.UNUSUAL, umid="B1MIdA")
     288                     parent=lp2, level=log.UNUSUAL, umid="B1MIdA")
    200289            self._debug_counts["duplicate_announcement"] += 1
    201290            return
     291        # does it update an existing one?
    202292        if index in self._current_announcements:
    203293            self._debug_counts["update"] += 1
     294            self.log("replacing old announcement: %s" % (ann_d,),
     295                     parent=lp2, level=log.NOISY, umid="wxwgIQ")
    204296        else:
    205297            self._debug_counts["new_announcement"] += 1
     298            self.log("new announcement[%s]" % service_name,
     299                     parent=lp2, level=log.NOISY)
    206300
    207         self._current_announcements[index] = ann_d
     301        self._current_announcements[index] = (ann_d, key, time.time())
    208302        # note: we never forget an index, but we might update its value
    209303
    210304        for (service_name2,cb,args,kwargs) in self._local_subscribers:
  • new file src/allmydata/introducer/common.py

    diff --git a/src/allmydata/introducer/common.py b/src/allmydata/introducer/common.py
    new file mode 100644
    index 0000000..a057ecc
    - +  
     1
     2import re, simplejson
     3from base64 import b32decode
     4from allmydata.util.ecdsa import VerifyingKey
     5
     6def make_index(ann_d, key):
     7    """Return something that can be used as an index (e.g. a tuple of
     8    strings), such that two messages that refer to the same 'thing' will have
     9    the same index. For introducer announcements, this is a tuple of
     10    (service-name, signing-key), or (service-name, tubid) if the announcement
     11    is not signed."""
     12
     13    service_name = str(ann_d["service-name"])
     14    if key:
     15        index = (service_name, key.to_string())
     16    else:
     17        # otherwise, use the FURL to get a tubid
     18        furl = str(ann_d["FURL"])
     19        m = re.match(r'pb://(\w+)@', furl)
     20        assert m
     21        tubid = b32decode(m.group(1).upper())
     22        index = (service_name, tubid)
     23    return index
     24
     25def convert_announcement_v1_to_v2(ann_t):
     26    (furl, service_name, ri_name, nickname, ver, oldest) = ann_t
     27    assert type(furl) is str
     28    assert type(service_name) is str
     29    assert type(ri_name) is str
     30    assert type(nickname) is str
     31    assert type(ver) is str
     32    assert type(oldest) is str
     33    ann_d = {"version": 0,
     34             "service-name": service_name,
     35             "FURL": furl,
     36             "remoteinterface-name": ri_name,
     37
     38             "nickname": nickname.decode("utf-8"),
     39             "app-versions": {},
     40             "my-version": ver,
     41             "oldest-supported": oldest,
     42             }
     43    return simplejson.dumps( (simplejson.dumps(ann_d), None, None) )
     44
     45def convert_announcement_v2_to_v1(ann_v2):
     46    (msg, sig, pubkey) = simplejson.loads(ann_v2)
     47    ann_d = simplejson.loads(msg)
     48    assert ann_d["version"] == 0
     49    ann_t = (str(ann_d["FURL"]), str(ann_d["service-name"]),
     50             str(ann_d["remoteinterface-name"]),
     51             ann_d["nickname"].encode("utf-8"),
     52             str(ann_d["my-version"]),
     53             str(ann_d["oldest-supported"]),
     54             )
     55    return ann_t
     56
     57
     58def sign(ann_d, sk):
     59    msg = simplejson.dumps(ann_d)
     60    if not sk:
     61        return (msg, None, None)
     62    vk = sk.get_verifying_key()
     63    return (msg, sk.sign(msg).encode("hex"), vk.to_string().encode("hex"))
     64
     65def unsign(ann_s):
     66    (msg_s, sig_s, key_s) = simplejson.loads(ann_s)
     67    key = None
     68    if sig_s and key_s:
     69        key = VerifyingKey.from_string(key_s.decode("hex"))
     70        key.verify(sig_s.decode("hex"), msg_s)
     71    msg = simplejson.loads(msg_s)
     72    return (msg, key)
  • src/allmydata/introducer/interfaces.py

    diff --git a/src/allmydata/introducer/interfaces.py b/src/allmydata/introducer/interfaces.py
    index 54f1701..03b0d9d 100644
    a b  
    11
    22from zope.interface import Interface
    33from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
    4     RemoteInterface
     4    RemoteInterface, Referenceable
     5from old import RIIntroducerSubscriberClient_v1
    56FURL = StringConstraint(1000)
    67
     8# old introducer protocol (v1):
     9#
    710# Announcements are (FURL, service_name, remoteinterface_name,
    811#                    nickname, my_version, oldest_supported)
    912#  the (FURL, service_name, remoteinterface_name) refer to the service being
    FURL = StringConstraint(1000) 
    1417#  incompatible peer. The second goal is to enable the development of
    1518#  backwards-compatibility code.
    1619
    17 Announcement = TupleOf(FURL, str, str,
    18                        str, str, str)
     20Announcement_v1 = TupleOf(FURL, str, str,
     21                          str, str, str)
    1922
    20 class RIIntroducerSubscriberClient(RemoteInterface):
    21     __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
     23# new protocol: Announcements are strings, a JSON serialized 3-tuple of (msg,
     24# sig, pubkey). More details to come.
     25Announcement_v2 = str
    2226
    23     def announce(announcements=SetOf(Announcement)):
     27class RIIntroducerSubscriberClient_v2(RemoteInterface):
     28    __remote_name__ = "RIIntroducerSubscriberClient_v2.tahoe.allmydata.com"
     29
     30    def announce_v2(announcements=SetOf(Announcement_v2)):
    2431        """I accept announcements from the publisher."""
    2532        return None
    2633
    class RIIntroducerSubscriberClient(RemoteInterface): 
    4148        """
    4249        return None
    4350
    44 # When Foolscap can handle multiple interfaces (Foolscap#17), the
    45 # full-powered introducer will implement both RIIntroducerPublisher and
    46 # RIIntroducerSubscriberService. Until then, we define
    47 # RIIntroducerPublisherAndSubscriberService as a combination of the two, and
    48 # make everybody use that.
     51SubscriberInfo = DictOf(str, Any())
    4952
    50 class RIIntroducerPublisher(RemoteInterface):
     53class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface):
    5154    """To publish a service to the world, connect to me and give me your
    52     announcement message. I will deliver a copy to all connected subscribers."""
    53     __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
    54 
    55     def publish(announcement=Announcement):
    56         # canary?
    57         return None
    58 
    59 class RIIntroducerSubscriberService(RemoteInterface):
    60     __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
    61 
    62     def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
    63         """Give me a subscriber reference, and I will call its new_peers()
    64         method will any announcements that match the desired service name. I
    65         will ignore duplicate subscriptions.
    66         """
    67         return None
    68 
    69 class RIIntroducerPublisherAndSubscriberService(RemoteInterface):
    70     __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
     55    announcement message. I will deliver a copy to all connected subscribers.
     56    To hear about services, connect to me and subscribe to a specific
     57    service_name."""
     58    __remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com"
    7159    def get_version():
    7260        return DictOf(str, Any())
    73     def publish(announcement=Announcement):
     61    def publish(announcement=Announcement_v1):
     62        return None
     63    def publish_v2(announcement=Announcement_v2, canary=Referenceable):
    7464        return None
    75     def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str):
     65    def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
     66        return None
     67    def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2,
     68                     service_name=str, subscriber_info=SubscriberInfo):
     69        """Give me a subscriber reference, and I will call its announce_v2()
     70        method with any announcements that match the desired service name. I
     71        will ignore duplicate subscriptions. The subscriber_info dictionary
     72        tells me about the subscriber, and is used for diagnostic/status
     73        displays."""
    7674        return None
    7775
    7876class IIntroducerClient(Interface):
    class IIntroducerClient(Interface): 
    8078    publish their services to the rest of the world, and I help them learn
    8179    about services available on other nodes."""
    8280
    83     def publish(furl, service_name, remoteinterface_name):
     81    def publish(furl, service_name, remoteinterface_name,
     82                signing_key=None):
    8483        """Once you call this, I will tell the world that the Referenceable
    8584        available at FURL is available to provide a service named
    8685        SERVICE_NAME. The precise definition of the service being provided is
    8786        identified by the Foolscap 'remote interface name' in the last
    8887        parameter: this is supposed to be a globally-unique string that
    89         identifies the RemoteInterface that is implemented."""
     88        identifies the RemoteInterface that is implemented.
     89
     90        If signing_key= is set to an instance of ecdsa.SigningKey, it will be
     91        used to sign the announcement."""
    9092
    9193    def subscribe_to(service_name, callback, *args, **kwargs):
    9294        """Call this if you will eventually want to use services with the
  • new file src/allmydata/introducer/old.py

    diff --git a/src/allmydata/introducer/old.py b/src/allmydata/introducer/old.py
    new file mode 100644
    index 0000000..e0bdacf
    - +  
     1
     2import time
     3from base64 import b32decode
     4from zope.interface import implements, Interface
     5from twisted.application import service
     6import allmydata
     7from allmydata.interfaces import InsufficientVersionError
     8from allmydata.util import log, idlib, rrefutil
     9from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \
     10    RemoteInterface, Referenceable, eventually, SturdyRef
     11FURL = StringConstraint(1000)
     12
     13# We keep a copy of the old introducer (both client and server) here to
     14# support compatibility tests. The old client is supposed to handle the new
     15# server, and new client is supposed to handle the old server.
     16
     17
     18# Announcements are (FURL, service_name, remoteinterface_name,
     19#                    nickname, my_version, oldest_supported)
     20#  the (FURL, service_name, remoteinterface_name) refer to the service being
     21#  announced. The (nickname, my_version, oldest_supported) refer to the
     22#  client as a whole. The my_version/oldest_supported strings can be parsed
     23#  by an allmydata.util.version.Version instance, and then compared. The
     24#  first goal is to make sure that nodes are not confused by speaking to an
     25#  incompatible peer. The second goal is to enable the development of
     26#  backwards-compatibility code.
     27
     28Announcement = TupleOf(FURL, str, str,
     29                       str, str, str)
     30
     31class RIIntroducerSubscriberClient_v1(RemoteInterface):
     32    __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com"
     33
     34    def announce(announcements=SetOf(Announcement)):
     35        """I accept announcements from the publisher."""
     36        return None
     37
     38    def set_encoding_parameters(parameters=(int, int, int)):
     39        """Advise the client of the recommended k-of-n encoding parameters
     40        for this grid. 'parameters' is a tuple of (k, desired, n), where 'n'
     41        is the total number of shares that will be created for any given
     42        file, while 'k' is the number of shares that must be retrieved to
     43        recover that file, and 'desired' is the minimum number of shares that
     44        must be placed before the uploader will consider its job a success.
     45        n/k is the expansion ratio, while k determines the robustness.
     46
     47        Introducers should specify 'n' according to the expected size of the
     48        grid (there is no point to producing more shares than there are
     49        peers), and k according to the desired reliability-vs-overhead goals.
     50
     51        Note that setting k=1 is equivalent to simple replication.
     52        """
     53        return None
     54
     55# When Foolscap can handle multiple interfaces (Foolscap#17), the
     56# full-powered introducer will implement both RIIntroducerPublisher and
     57# RIIntroducerSubscriberService. Until then, we define
     58# RIIntroducerPublisherAndSubscriberService as a combination of the two, and
     59# make everybody use that.
     60
     61class RIIntroducerPublisher_v1(RemoteInterface):
     62    """To publish a service to the world, connect to me and give me your
     63    announcement message. I will deliver a copy to all connected subscribers."""
     64    __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com"
     65
     66    def publish(announcement=Announcement):
     67        # canary?
     68        return None
     69
     70class RIIntroducerSubscriberService_v1(RemoteInterface):
     71    __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com"
     72
     73    def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
     74        """Give me a subscriber reference, and I will call its new_peers()
     75        method will any announcements that match the desired service name. I
     76        will ignore duplicate subscriptions.
     77        """
     78        return None
     79
     80class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface):
     81    __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com"
     82    def get_version():
     83        return DictOf(str, Any())
     84    def publish(announcement=Announcement):
     85        return None
     86    def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):
     87        return None
     88
     89class IIntroducerClient(Interface):
     90    """I provide service introduction facilities for a node. I help nodes
     91    publish their services to the rest of the world, and I help them learn
     92    about services available on other nodes."""
     93
     94    def publish(furl, service_name, remoteinterface_name):
     95        """Once you call this, I will tell the world that the Referenceable
     96        available at FURL is available to provide a service named
     97        SERVICE_NAME. The precise definition of the service being provided is
     98        identified by the Foolscap 'remote interface name' in the last
     99        parameter: this is supposed to be a globally-unique string that
     100        identifies the RemoteInterface that is implemented."""
     101
     102    def subscribe_to(service_name, callback, *args, **kwargs):
     103        """Call this if you will eventually want to use services with the
     104        given SERVICE_NAME. This will prompt me to subscribe to announcements
     105        of those services. Your callback will be invoked with at least two
     106        arguments: a serverid (binary string), and an announcement
     107        dictionary, followed by any additional callback args/kwargs you give
     108        me. I will run your callback for both new announcements and for
     109        announcements that have changed, but you must be prepared to tolerate
     110        duplicates.
     111
     112        The announcement dictionary that I give you will have the following
     113        keys:
     114
     115         version: 0
     116         service-name: str('storage')
     117
     118         FURL: str(furl)
     119         remoteinterface-name: str(ri_name)
     120         nickname: unicode
     121         app-versions: {}
     122         my-version: str
     123         oldest-supported: str
     124
     125        Note that app-version will be an empty dictionary until #466 is done
     126        and both the introducer and the remote client have been upgraded. For
     127        current (native) server types, the serverid will always be equal to
     128        the binary form of the FURL's tubid.
     129        """
     130
     131    def connected_to_introducer():
     132        """Returns a boolean, True if we are currently connected to the
     133        introducer, False if not."""
     134
     135
     136class IntroducerClient_v1(service.Service, Referenceable):
     137    implements(RIIntroducerSubscriberClient_v1, IIntroducerClient)
     138
     139    def __init__(self, tub, introducer_furl,
     140                 nickname, my_version, oldest_supported):
     141        self._tub = tub
     142        self.introducer_furl = introducer_furl
     143
     144        assert type(nickname) is unicode
     145        self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
     146        self._my_version = my_version
     147        self._oldest_supported = oldest_supported
     148
     149        self._published_announcements = set()
     150
     151        self._publisher = None
     152
     153        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
     154        self._subscribed_service_names = set()
     155        self._subscriptions = set() # requests we've actually sent
     156
     157        # _current_announcements remembers one announcement per
     158        # (servicename,serverid) pair. Anything that arrives with the same
     159        # pair will displace the previous one. This stores unpacked
     160        # announcement dictionaries, which can be compared for equality to
     161        # distinguish re-announcement from updates. It also provides memory
     162        # for clients who subscribe after startup.
     163        self._current_announcements = {}
     164
     165        self.encoding_parameters = None
     166
     167        # hooks for unit tests
     168        self._debug_counts = {
     169            "inbound_message": 0,
     170            "inbound_announcement": 0,
     171            "wrong_service": 0,
     172            "duplicate_announcement": 0,
     173            "update": 0,
     174            "new_announcement": 0,
     175            "outbound_message": 0,
     176            }
     177        self._debug_outstanding = 0
     178
     179    def _debug_retired(self, res):
     180        self._debug_outstanding -= 1
     181        return res
     182
     183    def startService(self):
     184        service.Service.startService(self)
     185        self._introducer_error = None
     186        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
     187        self._introducer_reconnector = rc
     188        def connect_failed(failure):
     189            self.log("Initial Introducer connection failed: perhaps it's down",
     190                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
     191        d = self._tub.getReference(self.introducer_furl)
     192        d.addErrback(connect_failed)
     193
     194    def _got_introducer(self, publisher):
     195        self.log("connected to introducer, getting versions")
     196        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
     197                    { },
     198                    "application-version": "unknown: no get_version()",
     199                    }
     200        d = rrefutil.add_version_to_remote_reference(publisher, default)
     201        d.addCallback(self._got_versioned_introducer)
     202        d.addErrback(self._got_error)
     203
     204    def _got_error(self, f):
     205        # TODO: for the introducer, perhaps this should halt the application
     206        self._introducer_error = f # polled by tests
     207
     208    def _got_versioned_introducer(self, publisher):
     209        self.log("got introducer version: %s" % (publisher.version,))
     210        # we require a V1 introducer
     211        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
     212        if needed not in publisher.version:
     213            raise InsufficientVersionError(needed, publisher.version)
     214        self._publisher = publisher
     215        publisher.notifyOnDisconnect(self._disconnected)
     216        self._maybe_publish()
     217        self._maybe_subscribe()
     218
     219    def _disconnected(self):
     220        self.log("bummer, we've lost our connection to the introducer")
     221        self._publisher = None
     222        self._subscriptions.clear()
     223
     224    def log(self, *args, **kwargs):
     225        if "facility" not in kwargs:
     226            kwargs["facility"] = "tahoe.introducer"
     227        return log.msg(*args, **kwargs)
     228
     229
     230    def publish(self, furl, service_name, remoteinterface_name):
     231        assert type(self._nickname_utf8) is str # we always send UTF-8
     232        ann = (furl, service_name, remoteinterface_name,
     233               self._nickname_utf8, self._my_version, self._oldest_supported)
     234        self._published_announcements.add(ann)
     235        self._maybe_publish()
     236
     237    def subscribe_to(self, service_name, cb, *args, **kwargs):
     238        self._local_subscribers.append( (service_name,cb,args,kwargs) )
     239        self._subscribed_service_names.add(service_name)
     240        self._maybe_subscribe()
     241        for (servicename,nodeid),ann_d in self._current_announcements.items():
     242            if servicename == service_name:
     243                eventually(cb, nodeid, ann_d)
     244
     245    def _maybe_subscribe(self):
     246        if not self._publisher:
     247            self.log("want to subscribe, but no introducer yet",
     248                     level=log.NOISY)
     249            return
     250        for service_name in self._subscribed_service_names:
     251            if service_name not in self._subscriptions:
     252                # there is a race here, but the subscription desk ignores
     253                # duplicate requests.
     254                self._subscriptions.add(service_name)
     255                self._debug_outstanding += 1
     256                d = self._publisher.callRemote("subscribe", self, service_name)
     257                d.addBoth(self._debug_retired)
     258                d.addErrback(rrefutil.trap_deadref)
     259                d.addErrback(log.err, format="server errored during subscribe",
     260                             facility="tahoe.introducer",
     261                             level=log.WEIRD, umid="2uMScQ")
     262
     263    def _maybe_publish(self):
     264        if not self._publisher:
     265            self.log("want to publish, but no introducer yet", level=log.NOISY)
     266            return
     267        # this re-publishes everything. The Introducer ignores duplicates
     268        for ann in self._published_announcements:
     269            self._debug_counts["outbound_message"] += 1
     270            self._debug_outstanding += 1
     271            d = self._publisher.callRemote("publish", ann)
     272            d.addBoth(self._debug_retired)
     273            d.addErrback(rrefutil.trap_deadref)
     274            d.addErrback(log.err,
     275                         format="server errored during publish %(ann)s",
     276                         ann=ann, facility="tahoe.introducer",
     277                         level=log.WEIRD, umid="xs9pVQ")
     278
     279
     280
     281    def remote_announce(self, announcements):
     282        self.log("received %d announcements" % len(announcements))
     283        self._debug_counts["inbound_message"] += 1
     284        for ann in announcements:
     285            try:
     286                self._process_announcement(ann)
     287            except:
     288                log.err(format="unable to process announcement %(ann)s",
     289                        ann=ann)
     290                # Don't let a corrupt announcement prevent us from processing
     291                # the remaining ones. Don't return an error to the server,
     292                # since they'd just ignore it anyways.
     293                pass
     294
     295    def _process_announcement(self, ann):
     296        self._debug_counts["inbound_announcement"] += 1
     297        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
     298        if service_name not in self._subscribed_service_names:
     299            self.log("announcement for a service we don't care about [%s]"
     300                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
     301            self._debug_counts["wrong_service"] += 1
     302            return
     303        self.log("announcement for [%s]: %s" % (service_name, ann),
     304                 umid="BoKEag")
     305        assert type(furl) is str
     306        assert type(service_name) is str
     307        assert type(ri_name) is str
     308        assert type(nickname_utf8) is str
     309        nickname = nickname_utf8.decode("utf-8")
     310        assert type(nickname) is unicode
     311        assert type(ver) is str
     312        assert type(oldest) is str
     313
     314        nodeid = b32decode(SturdyRef(furl).tubID.upper())
     315        nodeid_s = idlib.shortnodeid_b2a(nodeid)
     316
     317        ann_d = { "version": 0,
     318                  "service-name": service_name,
     319
     320                  "FURL": furl,
     321                  "nickname": nickname,
     322                  "app-versions": {}, # need #466 and v2 introducer
     323                  "my-version": ver,
     324                  "oldest-supported": oldest,
     325                  }
     326
     327        index = (service_name, nodeid)
     328        if self._current_announcements.get(index, None) == ann_d:
     329            self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
     330                     service=service_name, nodeid=nodeid_s,
     331                     level=log.UNUSUAL, umid="B1MIdA")
     332            self._debug_counts["duplicate_announcement"] += 1
     333            return
     334        if index in self._current_announcements:
     335            self._debug_counts["update"] += 1
     336        else:
     337            self._debug_counts["new_announcement"] += 1
     338
     339        self._current_announcements[index] = ann_d
     340        # note: we never forget an index, but we might update its value
     341
     342        for (service_name2,cb,args,kwargs) in self._local_subscribers:
     343            if service_name2 == service_name:
     344                eventually(cb, nodeid, ann_d, *args, **kwargs)
     345
     346    def remote_set_encoding_parameters(self, parameters):
     347        self.encoding_parameters = parameters
     348
     349    def connected_to_introducer(self):
     350        return bool(self._publisher)
     351
     352class IntroducerService_v1(service.MultiService, Referenceable):
     353    implements(RIIntroducerPublisherAndSubscriberService_v1)
     354    name = "introducer"
     355    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
     356                 { },
     357                "application-version": str(allmydata.__full_version__),
     358                }
     359
     360    def __init__(self, basedir="."):
     361        service.MultiService.__init__(self)
     362        self.introducer_url = None
     363        # 'index' is (service_name, tubid)
     364        self._announcements = {} # dict of index -> (announcement, timestamp)
     365        self._subscribers = {} # dict of (rref->timestamp) dicts
     366        self._debug_counts = {"inbound_message": 0,
     367                              "inbound_duplicate": 0,
     368                              "inbound_update": 0,
     369                              "outbound_message": 0,
     370                              "outbound_announcements": 0,
     371                              "inbound_subscribe": 0}
     372        self._debug_outstanding = 0
     373
     374    def _debug_retired(self, res):
     375        self._debug_outstanding -= 1
     376        return res
     377
     378    def log(self, *args, **kwargs):
     379        if "facility" not in kwargs:
     380            kwargs["facility"] = "tahoe.introducer"
     381        return log.msg(*args, **kwargs)
     382
     383    def get_announcements(self):
     384        return self._announcements
     385    def get_subscribers(self):
     386        return self._subscribers
     387
     388    def remote_get_version(self):
     389        return self.VERSION
     390
     391    def remote_publish(self, announcement):
     392        try:
     393            self._publish(announcement)
     394        except:
     395            log.err(format="Introducer.remote_publish failed on %(ann)s",
     396                    ann=announcement, level=log.UNUSUAL, umid="620rWA")
     397            raise
     398
     399    def _publish(self, announcement):
     400        self._debug_counts["inbound_message"] += 1
     401        self.log("introducer: announcement published: %s" % (announcement,) )
     402        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
     403        #print "PUB", service_name, nickname_utf8
     404
     405        nodeid = b32decode(SturdyRef(furl).tubID.upper())
     406        index = (service_name, nodeid)
     407
     408        if index in self._announcements:
     409            (old_announcement, timestamp) = self._announcements[index]
     410            if old_announcement == announcement:
     411                self.log("but we already knew it, ignoring", level=log.NOISY)
     412                self._debug_counts["inbound_duplicate"] += 1
     413                return
     414            else:
     415                self.log("old announcement being updated", level=log.NOISY)
     416                self._debug_counts["inbound_update"] += 1
     417        self._announcements[index] = (announcement, time.time())
     418
     419        for s in self._subscribers.get(service_name, []):
     420            self._debug_counts["outbound_message"] += 1
     421            self._debug_counts["outbound_announcements"] += 1
     422            self._debug_outstanding += 1
     423            d = s.callRemote("announce", set([announcement]))
     424            d.addBoth(self._debug_retired)
     425            d.addErrback(rrefutil.trap_deadref)
     426            d.addErrback(log.err,
     427                         format="subscriber errored on announcement %(ann)s",
     428                         ann=announcement, facility="tahoe.introducer",
     429                         level=log.UNUSUAL, umid="jfGMXQ")
     430
     431    def remote_subscribe(self, subscriber, service_name):
     432        self.log("introducer: subscription[%s] request at %s" % (service_name,
     433                                                                 subscriber))
     434        self._debug_counts["inbound_subscribe"] += 1
     435        if service_name not in self._subscribers:
     436            self._subscribers[service_name] = {}
     437        subscribers = self._subscribers[service_name]
     438        if subscriber in subscribers:
     439            self.log("but they're already subscribed, ignoring",
     440                     level=log.UNUSUAL)
     441            return
     442        subscribers[subscriber] = time.time()
     443        def _remove():
     444            self.log("introducer: unsubscribing[%s] %s" % (service_name,
     445                                                           subscriber))
     446            subscribers.pop(subscriber, None)
     447        subscriber.notifyOnDisconnect(_remove)
     448
     449        announcements = set(
     450            [ ann
     451              for (sn2,nodeid),(ann,when) in self._announcements.items()
     452              if sn2 == service_name] )
     453
     454        self._debug_counts["outbound_message"] += 1
     455        self._debug_counts["outbound_announcements"] += len(announcements)
     456        self._debug_outstanding += 1
     457        d = subscriber.callRemote("announce", announcements)
     458        d.addBoth(self._debug_retired)
     459        d.addErrback(rrefutil.trap_deadref)
     460        d.addErrback(log.err,
     461                     format="subscriber errored during subscribe %(anns)s",
     462                     anns=announcements, facility="tahoe.introducer",
     463                     level=log.UNUSUAL, umid="1XChxA")
  • src/allmydata/introducer/server.py

    diff --git a/src/allmydata/introducer/server.py b/src/allmydata/introducer/server.py
    index 117fcb5..7868e8f 100644
    a b  
    11
    22import time, os.path
    3 from base64 import b32decode
    43from zope.interface import implements
    54from twisted.application import service
    6 from foolscap.api import Referenceable, SturdyRef
     5from foolscap.api import Referenceable
    76import allmydata
    87from allmydata import node
    9 from allmydata.util import log, rrefutil
     8from allmydata.util import log, base32, idlib
    109from allmydata.introducer.interfaces import \
    11      RIIntroducerPublisherAndSubscriberService
     10     RIIntroducerPublisherAndSubscriberService_v2
     11from allmydata.introducer.common import convert_announcement_v1_to_v2, \
     12     convert_announcement_v2_to_v1, unsign, make_index
    1213
    1314class IntroducerNode(node.Node):
    1415    PORTNUMFILE = "introducer.port"
    class IntroducerNode(node.Node): 
    3031        def _publish(res):
    3132            self.introducer_url = self.tub.registerReference(introducerservice,
    3233                                                             "introducer")
    33             self.log(" introducer is at %s" % self.introducer_url)
     34            self.log(" introducer is at %s" % self.introducer_url,
     35                     umid="qF2L9A")
    3436            self.write_config("introducer.furl", self.introducer_url + "\n")
    3537        d.addCallback(_publish)
    3638        d.addErrback(log.err, facility="tahoe.init",
    3739                     level=log.BAD, umid="UaNs9A")
    3840
    3941    def init_web(self, webport):
    40         self.log("init_web(webport=%s)", args=(webport,))
     42        self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA")
    4143
    4244        from allmydata.webish import IntroducerWebishServer
    4345        nodeurl_path = os.path.join(self.basedir, "node.url")
    4446        ws = IntroducerWebishServer(self, webport, nodeurl_path)
    4547        self.add_service(ws)
    4648
     49class SubscriberAdapter_v1: # for_v1
     50    """I wrap a RemoteReference that points at an old v1 subscriber, enabling
     51    it to be treated like a v2 subscriber.
     52    """
     53
     54    def __init__(self, original):
     55        self.original = original
     56    def __eq__(self, them):
     57        return self.original == them
     58    def __ne__(self, them):
     59        return self.original != them
     60    def __hash__(self):
     61        return hash(self.original)
     62    def getRemoteTubID(self):
     63        return self.original.getRemoteTubID()
     64    def getSturdyRef(self):
     65        return self.original.getSturdyRef()
     66    def getPeer(self):
     67        return self.original.getPeer()
     68    def callRemote(self, methname, *args, **kwargs):
     69        m = getattr(self, "wrap_" + methname)
     70        return m(*args, **kwargs)
     71    def wrap_announce_v2(self, announcements):
     72        anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements]
     73        return self.original.callRemote("announce", set(anns_v1))
     74    def wrap_set_encoding_parameters(self, parameters):
     75        return self.original.callRemote("set_encoding_parameters", parameters)
     76    def notifyOnDisconnect(self, *args, **kwargs):
     77        return self.original.notifyOnDisconnect(*args, **kwargs)
     78
    4779class IntroducerService(service.MultiService, Referenceable):
    48     implements(RIIntroducerPublisherAndSubscriberService)
     80    implements(RIIntroducerPublisherAndSubscriberService_v2)
    4981    name = "introducer"
    5082    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1":
    5183                 { },
    class IntroducerService(service.MultiService, Referenceable): 
    5688        service.MultiService.__init__(self)
    5789        self.introducer_url = None
    5890        # 'index' is (service_name, tubid)
    59         self._announcements = {} # dict of index -> (announcement, timestamp)
    60         self._subscribers = {} # dict of (rref->timestamp) dicts
     91        self._announcements = {} # dict of index ->
     92                                 # (ann_s, canary, ann_d, timestamp)
     93
     94        # ann_d is cleaned up (nickname is always unicode, servicename is
     95        # always ascii, etc, even though simplejson.loads sometimes returns
     96        # either)
     97
     98        # self._subscribers is a dict mapping servicename to subscriptions
     99        # 'subscriptions' is a dict mapping rref to a subscription
     100        # 'subscription' is a tuple of (subscriber_info, timestamp)
     101        # 'subscriber_info' is a dict, provided directly for v2 clients, or
     102        # synthesized for v1 clients. The expected keys are:
     103        #  version, nickname, app-versions, my-version, oldest-supported
     104        self._subscribers = {}
     105
     106        # self._stub_client_announcements contains the information provided
     107        # by v1 clients. We stash this so we can match it up with their
     108        # subscriptions.
     109        self._stub_client_announcements = {} # maps tubid to sinfo # for_v1
     110
    61111        self._debug_counts = {"inbound_message": 0,
    62112                              "inbound_duplicate": 0,
    63113                              "inbound_update": 0,
    64114                              "outbound_message": 0,
    65115                              "outbound_announcements": 0,
    66116                              "inbound_subscribe": 0}
     117        self._debug_outstanding = 0 # also covers SubscriberAdapter_v1
     118
     119    def _debug_retired(self, res):
     120        self._debug_outstanding -= 1
     121        return res
    67122
    68123    def log(self, *args, **kwargs):
    69124        if "facility" not in kwargs:
    70             kwargs["facility"] = "tahoe.introducer"
     125            kwargs["facility"] = "tahoe.introducer.server"
    71126        return log.msg(*args, **kwargs)
    72127
    73128    def get_announcements(self):
    74129        return self._announcements
    75130    def get_subscribers(self):
    76         return self._subscribers
     131        """Return a list of (service_name, when, subscriber_info, rref) for
     132        all subscribers. subscriber_info is a dict with the following keys:
     133        version, nickname, app-versions, my-version, oldest-supported"""
     134        s = []
     135        for service_name, subscriptions in self._subscribers.items():
     136            for rref,(subscriber_info,when) in subscriptions.items():
     137                s.append( (service_name, when, subscriber_info, rref) )
     138        return s
    77139
    78140    def remote_get_version(self):
    79141        return self.VERSION
    80142
    81     def remote_publish(self, announcement):
     143    def remote_publish(self, ann_s): # for_v1
     144        lp = self.log("introducer: old (v1) announcement published: %s"
     145                      % (ann_s,), umid="6zGOIw")
     146        ann_v2 = convert_announcement_v1_to_v2(ann_s)
     147        return self.publish(ann_v2, None, lp)
     148
     149    def remote_publish_v2(self, ann_s, canary):
     150        lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ")
     151        return self.publish(ann_s, canary, lp)
     152
     153    def publish(self, ann_s, canary, lp):
    82154        try:
    83             self._publish(announcement)
     155            self._publish(ann_s, canary, lp)
    84156        except:
    85157            log.err(format="Introducer.remote_publish failed on %(ann)s",
    86                     ann=announcement, level=log.UNUSUAL, umid="620rWA")
     158                    ann=ann_s,
     159                    level=log.UNUSUAL, parent=lp, umid="620rWA")
    87160            raise
    88161
    89     def _publish(self, announcement):
     162    def _publish(self, ann_s, canary, lp):
    90163        self._debug_counts["inbound_message"] += 1
    91         self.log("introducer: announcement published: %s" % (announcement,) )
    92         (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement
     164        self.log("introducer: announcement published: %s" % (ann_s,),
     165                 umid="wKHgCw")
     166        ann_d, key = unsign(ann_s) # might raise BadSignatureError
     167        index = make_index(ann_d, key)
    93168
    94         nodeid = b32decode(SturdyRef(furl).tubID.upper())
    95         index = (service_name, nodeid)
     169        service_name = str(ann_d["service-name"])
     170        if service_name == "stub_client": # for_v1
     171            self._attach_stub_client(ann_d, index, lp)
     172            return
    96173
    97174        if index in self._announcements:
    98             (old_announcement, timestamp) = self._announcements[index]
    99             if old_announcement == announcement:
    100                 self.log("but we already knew it, ignoring", level=log.NOISY)
     175            (old_ann_s, canary, ann_d, timestamp) = self._announcements[index]
     176            if old_ann_s == ann_s:
     177                self.log("but we already knew it, ignoring", level=log.NOISY,
     178                         umid="myxzLw")
    101179                self._debug_counts["inbound_duplicate"] += 1
    102180                return
    103181            else:
    104                 self.log("old announcement being updated", level=log.NOISY)
     182                self.log("old announcement being updated", level=log.NOISY,
     183                         umid="304r9g")
    105184                self._debug_counts["inbound_update"] += 1
    106         self._announcements[index] = (announcement, time.time())
     185        self._announcements[index] = (ann_s, canary, ann_d, time.time())
     186        #if canary:
     187        #    canary.notifyOnDisconnect ...
     188        # use a CanaryWatcher? with cw.is_connected()?
     189        # actually we just want foolscap to give rref.is_connected(), since
     190        # this is only for the status display
    107191
    108192        for s in self._subscribers.get(service_name, []):
    109193            self._debug_counts["outbound_message"] += 1
    110194            self._debug_counts["outbound_announcements"] += 1
    111             d = s.callRemote("announce", set([announcement]))
    112             d.addErrback(rrefutil.trap_deadref)
     195            self._debug_outstanding += 1
     196            d = s.callRemote("announce_v2", set([ann_s]))
     197            d.addBoth(self._debug_retired)
    113198            d.addErrback(log.err,
    114199                         format="subscriber errored on announcement %(ann)s",
    115                          ann=announcement, facility="tahoe.introducer",
     200                         ann=ann_s, facility="tahoe.introducer",
    116201                         level=log.UNUSUAL, umid="jfGMXQ")
    117202
    118     def remote_subscribe(self, subscriber, service_name):
    119         self.log("introducer: subscription[%s] request at %s" % (service_name,
    120                                                                  subscriber))
     203    def _attach_stub_client(self, ann_d, index, lp):
     204        # There might be a v1 subscriber for whom this is a stub_client.
     205        # We might have received the subscription before the stub_client
     206        # announcement, in which case we now need to fix up the record in
     207        # self._subscriptions .
     208
     209        # record it for later, in case the stub_client arrived before the
     210        # subscription
     211        subscriber_info = self._get_subscriber_info_from_ann_d(ann_d)
     212        ann_tubid = index[1]
     213        self._stub_client_announcements[ann_tubid] = subscriber_info
     214
     215        lp2 = self.log("stub_client announcement, "
     216                       "looking for matching subscriber",
     217                       parent=lp, level=log.NOISY, umid="BTywDg")
     218
     219        for sn in self._subscribers:
     220            s = self._subscribers[sn]
     221            for (subscriber, info) in s.items():
     222                # we correlate these by looking for a subscriber whose tubid
     223                # matches this announcement
     224                sub_tubid = base32.a2b(subscriber.getRemoteTubID()) # binary
     225                if sub_tubid == ann_tubid:
     226                    self.log(format="found a match, nodeid=%(nodeid)s",
     227                             nodeid=idlib.nodeid_b2a(sub_tubid),
     228                             level=log.NOISY, parent=lp2, umid="xsWs1A")
     229                    # found a match. Does it need info?
     230                    if not info[0]:
     231                        self.log(format="replacing info",
     232                                 level=log.NOISY, parent=lp2, umid="m5kxwA")
     233                        # yup
     234                        s[subscriber] = (subscriber_info, info[1])
     235            # and we don't remember or announce stub_clients beyond what we
     236            # need to get the subscriber_info set up
     237
     238    def _get_subscriber_info_from_ann_d(self, ann_d): # for_v1
     239        sinfo = { "version": ann_d["version"],
     240                  "nickname": ann_d["nickname"],
     241                  "app-versions": ann_d["app-versions"],
     242                  "my-version": ann_d["my-version"],
     243                  "oldest-supported": ann_d["oldest-supported"],
     244                  }
     245        return sinfo
     246
     247    def remote_subscribe(self, subscriber, service_name): # for_v1
     248        self.log("introducer: old (v1) subscription[%s] request at %s"
     249                 % (service_name, subscriber), umid="hJlGUg")
     250        return self.add_subscriber(SubscriberAdapter_v1(subscriber),
     251                                   service_name, None)
     252
     253    def remote_subscribe_v2(self, subscriber, service_name, subscriber_info):
     254        self.log("introducer: subscription[%s] request at %s"
     255                 % (service_name, subscriber), umid="U3uzLg")
     256        return self.add_subscriber(subscriber, service_name, subscriber_info)
     257
     258    def add_subscriber(self, subscriber, service_name, subscriber_info):
    121259        self._debug_counts["inbound_subscribe"] += 1
    122260        if service_name not in self._subscribers:
    123261            self._subscribers[service_name] = {}
    124262        subscribers = self._subscribers[service_name]
    125263        if subscriber in subscribers:
    126264            self.log("but they're already subscribed, ignoring",
    127                      level=log.UNUSUAL)
     265                     level=log.UNUSUAL, umid="Sy9EfA")
    128266            return
    129         subscribers[subscriber] = time.time()
     267
     268        if not subscriber_info: # for_v1
     269            # v1 clients don't provide subscriber_info, but they should
     270            # publish a 'stub client' record which contains the same
     271            # information. If we've already received this, it will be in
     272            # self._stub_client_announcements
     273            tubid_b32 = subscriber.getRemoteTubID()
     274            tubid = base32.a2b(tubid_b32)
     275            if tubid in self._stub_client_announcements:
     276                subscriber_info = self._stub_client_announcements[tubid]
     277
     278        subscribers[subscriber] = (subscriber_info, time.time())
    130279        def _remove():
    131280            self.log("introducer: unsubscribing[%s] %s" % (service_name,
    132                                                            subscriber))
     281                                                           subscriber),
     282                     umid="vYGcJg")
    133283            subscribers.pop(subscriber, None)
    134284        subscriber.notifyOnDisconnect(_remove)
    135285
    136         announcements = set(
    137             [ ann
    138               for (sn2,nodeid),(ann,when) in self._announcements.items()
    139               if sn2 == service_name] )
    140 
    141         self._debug_counts["outbound_message"] += 1
    142         self._debug_counts["outbound_announcements"] += len(announcements)
    143         d = subscriber.callRemote("announce", announcements)
    144         d.addErrback(rrefutil.trap_deadref)
    145         d.addErrback(log.err,
    146                      format="subscriber errored during subscribe %(anns)s",
    147                      anns=announcements, facility="tahoe.introducer",
    148                      level=log.UNUSUAL, umid="mtZepQ")
     286        # now tell them about any announcements they're interested in
     287        announcements = set( [ ann_s
     288                               for idx,(ann_s,canary,ann_d,when)
     289                               in self._announcements.items()
     290                               if idx[0] == service_name] )
     291        if announcements:
     292            self._debug_counts["outbound_message"] += 1
     293            self._debug_counts["outbound_announcements"] += len(announcements)
     294            self._debug_outstanding += 1
     295            d = subscriber.callRemote("announce_v2", announcements)
     296            d.addBoth(self._debug_retired)
     297            d.addErrback(log.err,
     298                         format="subscriber errored during subscribe %(anns)s",
     299                         anns=announcements, facility="tahoe.introducer",
     300                         level=log.UNUSUAL, umid="mtZepQ")
     301            return d
  • src/allmydata/test/test_introducer.py

    diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py
    index 9d0f50e..69d7f1d 100644
    a b from twisted.python import log 
    99from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue
    1010from twisted.application import service
    1111from allmydata.interfaces import InsufficientVersionError
    12 from allmydata.introducer.client import IntroducerClient
     12from allmydata.introducer.client import IntroducerClient, ClientAdapter_v1
    1313from allmydata.introducer.server import IntroducerService
     14from allmydata.introducer import old
    1415# test compatibility with old introducer .tac files
    1516from allmydata.introducer import IntroducerNode
    16 from allmydata.util import pollmixin
     17from allmydata.util import pollmixin, ecdsa
    1718import allmydata.test.common_util as testutil
    1819
    1920class LoggingMultiService(service.MultiService):
    class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): 
    4748
    4849    def test_create(self):
    4950        ic = IntroducerClient(None, "introducer.furl", u"my_nickname",
    50                               "my_version", "oldest_version")
     51                              "my_version", "oldest_version", {})
    5152        self.failUnless(isinstance(ic, IntroducerClient))
    5253
    5354    def test_listen(self):
    5455        i = IntroducerService()
    5556        i.setServiceParent(self.parent)
    5657
    57     def test_duplicate(self):
     58    def test_duplicate_publish(self):
    5859        i = IntroducerService()
    5960        self.failUnlessEqual(len(i.get_announcements()), 0)
    6061        self.failUnlessEqual(len(i.get_subscribers()), 0)
    class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): 
    7374        self.failUnlessEqual(len(i.get_announcements()), 2)
    7475        self.failUnlessEqual(len(i.get_subscribers()), 0)
    7576
     77
     78
     79class Client(unittest.TestCase):
     80    def test_duplicate_receive_v1(self):
     81        ic = IntroducerClient(None,
     82                              "introducer.furl", u"my_nickname",
     83                              "my_version", "oldest_version", {})
     84        announcements = []
     85        ic.subscribe_to("storage",
     86                        lambda nodeid,ann_d: announcements.append(ann_d))
     87        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"
     88        ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")
     89        ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")
     90        ca = ClientAdapter_v1(ic)
     91
     92        ca.remote_announce([ann1])
     93        d = fireEventually()
     94        def _then(ign):
     95            self.failUnlessEqual(len(announcements), 1)
     96            self.failUnlessEqual(announcements[0]["nickname"], u"nick1")
     97            self.failUnlessEqual(announcements[0]["my-version"], "ver23")
     98            self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)
     99            self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
     100            self.failUnlessEqual(ic._debug_counts["update"], 0)
     101            self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)
     102            # now send a duplicate announcement: this should not notify clients
     103            ca.remote_announce([ann1])
     104            return fireEventually()
     105        d.addCallback(_then)
     106        def _then2(ign):
     107            self.failUnlessEqual(len(announcements), 1)
     108            self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)
     109            self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
     110            self.failUnlessEqual(ic._debug_counts["update"], 0)
     111            self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
     112            # and a replacement announcement: same FURL, new other stuff.
     113            # Clients should be notified.
     114            ca.remote_announce([ann1b])
     115            return fireEventually()
     116        d.addCallback(_then2)
     117        def _then3(ign):
     118            self.failUnlessEqual(len(announcements), 2)
     119            self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)
     120            self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)
     121            self.failUnlessEqual(ic._debug_counts["update"], 1)
     122            self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)
     123            # test that the other stuff changed
     124            self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")
     125            self.failUnlessEqual(announcements[-1]["my-version"], "ver24")
     126        d.addCallback(_then3)
     127        return d
     128
     129    def test_duplicate_receive_v2(self):
     130        ic1 = IntroducerClient(None,
     131                               "introducer.furl", u"my_nickname",
     132                               "ver23", "oldest_version", {})
     133        # we use a second client just to create a different-looking
     134        # announcement
     135        ic2 = IntroducerClient(None,
     136                               "introducer.furl", u"my_nickname",
     137                               "ver24","oldest_version",{})
     138        announcements = []
     139        def _received(nodeid, ann_d):
     140            announcements.append( (nodeid, ann_d) )
     141        ic1.subscribe_to("storage", _received)
     142        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp"
     143        furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp"
     144        furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo"
     145
     146        privkey = ecdsa.SigningKey.generate()
     147        pubkey = privkey.get_verifying_key()
     148        pubkey_hex = pubkey.to_string().encode("hex")
     149
     150        # ann1: ic1, furl1
     151        # ann1a: ic1, furl1a (same SturdyRef, different connection hints)
     152        # ann1b: ic2, furl1
     153        # ann2: ic2, furl2
     154
     155        self.ann1 = ic1.create_announcement(furl1, "storage", "RIStorage",
     156                                            privkey)
     157        self.ann1a =  ic1.create_announcement(furl1a, "storage", "RIStorage",
     158                                              privkey)
     159        self.ann1b = ic2.create_announcement(furl1, "storage", "RIStorage",
     160                                             privkey)
     161        self.ann2 = ic2.create_announcement(furl2, "storage", "RIStorage",
     162                                            privkey)
     163
     164        ic1.remote_announce_v2([self.ann1]) # queues eventual-send
     165        d = fireEventually()
     166        def _then1(ign):
     167            self.failUnlessEqual(len(announcements), 1)
     168            nodeid,ann_d = announcements[0]
     169            self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex)
     170            self.failUnlessEqual(ann_d["FURL"], furl1)
     171            self.failUnlessEqual(ann_d["my-version"], "ver23")
     172        d.addCallback(_then1)
     173
     174        # now send a duplicate announcement. This should not fire the
     175        # subscriber
     176        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1]))
     177        d.addCallback(fireEventually)
     178        def _then2(ign):
     179            self.failUnlessEqual(len(announcements), 1)
     180        d.addCallback(_then2)
     181
     182        # and a replacement announcement: same FURL, new other stuff. The
     183        # subscriber *should* be fired.
     184        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b]))
     185        d.addCallback(fireEventually)
     186        def _then3(ign):
     187            self.failUnlessEqual(len(announcements), 2)
     188            nodeid,ann_d = announcements[-1]
     189            self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex)
     190            self.failUnlessEqual(ann_d["FURL"], furl1)
     191            self.failUnlessEqual(ann_d["my-version"], "ver24")
     192        d.addCallback(_then3)
     193
     194        # and a replacement announcement with a different FURL (it uses
     195        # different connection hints)
     196        d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a]))
     197        d.addCallback(fireEventually)
     198        def _then4(ign):
     199            self.failUnlessEqual(len(announcements), 3)
     200            nodeid,ann_d = announcements[-1]
     201            self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex)
     202            self.failUnlessEqual(ann_d["FURL"], furl1a)
     203            self.failUnlessEqual(ann_d["my-version"], "ver23")
     204        d.addCallback(_then4)
     205
     206        # now add a new subscription, which should be called with the
     207        # backlog. The introducer only records one announcement per index, so
     208        # the backlog will only have the latest message.
     209        announcements2 = []
     210        def _received2(nodeid, ann_d):
     211            announcements2.append( (nodeid, ann_d) )
     212        d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2))
     213        d.addCallback(fireEventually)
     214        def _then5(ign):
     215            self.failUnlessEqual(len(announcements2), 1)
     216            nodeid,ann_d = announcements2[-1]
     217            self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex)
     218            self.failUnlessEqual(ann_d["FURL"], furl1a)
     219            self.failUnlessEqual(ann_d["my-version"], "ver23")
     220        d.addCallback(_then5)
     221        return d
     222
    76223class SystemTestMixin(ServiceMixin, pollmixin.PollMixin):
    77224
    78225    def create_tub(self, portnum=0):
    class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): 
    88235            assert self.central_portnum == portnum
    89236        tub.setLocation("localhost:%d" % self.central_portnum)
    90237
     238V1 = "v1"; V2 = "v2"
    91239class SystemTest(SystemTestMixin, unittest.TestCase):
    92240
    93     def test_system(self):
    94         self.basedir = "introducer/SystemTest/system"
    95         os.makedirs(self.basedir)
    96         return self.do_system_test(IntroducerService)
    97     test_system.timeout = 480 # occasionally takes longer than 350s on "draco"
    98 
    99     def do_system_test(self, create_introducer):
     241    def do_system_test(self, server_version):
    100242        self.create_tub()
    101         introducer = create_introducer()
     243        if server_version == V1:
     244            introducer = old.IntroducerService_v1()
     245        else:
     246            introducer = IntroducerService()
    102247        introducer.setServiceParent(self.parent)
    103248        iff = os.path.join(self.basedir, "introducer.furl")
    104249        tub = self.central_tub
    105250        ifurl = self.central_tub.registerReference(introducer, furlFile=iff)
    106251        self.introducer_furl = ifurl
    107252
    108         NUMCLIENTS = 5
    109         # we have 5 clients who publish themselves, and an extra one does
    110         # which not. When the connections are fully established, all six nodes
     253        # we have 5 clients who publish themselves as storage servers, and a
     254        # sixth which does which not. All 6 clients subscriber to hear about
     255        # storage. When the connections are fully established, all six nodes
    111256        # should have 5 connections each.
     257        NUM_STORAGE = 5
     258        NUM_CLIENTS = 6
    112259
    113260        clients = []
    114261        tubs = {}
    115262        received_announcements = {}
    116         NUM_SERVERS = NUMCLIENTS
    117263        subscribing_clients = []
    118264        publishing_clients = []
     265        privkeys = {}
     266        expected_announcements = [0 for c in range(NUM_CLIENTS)]
    119267
    120         for i in range(NUMCLIENTS+1):
     268        for i in range(NUM_CLIENTS):
    121269            tub = Tub()
    122270            #tub.setOption("logLocalFailures", True)
    123271            #tub.setOption("logRemoteFailures", True)
    class SystemTest(SystemTestMixin, unittest.TestCase): 
    128276            tub.setLocation("localhost:%d" % portnum)
    129277
    130278            log.msg("creating client %d: %s" % (i, tub.getShortTubID()))
    131             c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i,
    132                                  "version", "oldest")
     279            if i == 0:
     280                c = old.IntroducerClient_v1(tub, self.introducer_furl,
     281                                            u"nickname-%d" % i,
     282                                            "version", "oldest")
     283            else:
     284                c = IntroducerClient(tub, self.introducer_furl,
     285                                     u"nickname-%d" % i,
     286                                     "version", "oldest",
     287                                     {"component": "component-v1"})
    133288            received_announcements[c] = {}
    134289            def got(serverid, ann_d, announcements):
    135290                announcements[serverid] = ann_d
    136291            c.subscribe_to("storage", got, received_announcements[c])
    137292            subscribing_clients.append(c)
    138 
    139             if i < NUMCLIENTS:
    140                 node_furl = tub.registerReference(Referenceable())
    141                 c.publish(node_furl, "storage", "ri_name")
     293            expected_announcements[i] += 1 # all expect a 'storage' announcement
     294
     295            node_furl = tub.registerReference(Referenceable())
     296            if i < NUM_STORAGE:
     297                if i == 1:
     298                    # sign the announcement
     299                    privkey = privkeys[c] = ecdsa.SigningKey.generate()
     300                    c.publish(node_furl, "storage", "ri_name", privkey)
     301                else:
     302                    c.publish(node_furl, "storage", "ri_name")
    142303                publishing_clients.append(c)
    143             # the last one does not publish anything
     304            else:
     305                # the last one does not publish anything
     306                pass
     307
     308            if i == 0:
     309                # users of the V1 client were required to publish a
     310                # 'stub_client' record (somewhat after they published the
     311                # 'storage' record), so the introducer could see their
     312                # version. Match that behavior.
     313                c.publish(node_furl, "stub_client", "stub_ri_name")
     314
     315            if i == 2:
     316                # also publish something that nobody cares about
     317                boring_furl = tub.registerReference(Referenceable())
     318                c.publish(boring_furl, "boring", "ri_name")
    144319
    145320            c.setServiceParent(self.parent)
    146321            clients.append(c)
    147322            tubs[c] = tub
    148323
    149         def _wait_for_all_connections():
    150             for c in subscribing_clients:
    151                 if len(received_announcements[c]) < NUM_SERVERS:
     324
     325        def _wait_for_connected(ign):
     326            def _connected():
     327                for c in clients:
     328                    if not c.connected_to_introducer():
     329                        return False
     330                return True
     331            return self.poll(_connected)
     332
     333        # we watch the clients to determine when the system has settled down.
     334        # Then we can look inside the server to assert things about its
     335        # state.
     336
     337        def _wait_for_expected_announcements(ign):
     338            def _got_expected_announcements():
     339                for i,c in enumerate(subscribing_clients):
     340                    if len(received_announcements[c]) < expected_announcements[i]:
     341                        return False
     342                return True
     343            return self.poll(_got_expected_announcements)
     344
     345        # before shutting down any Tub, we'd like to know that there are no
     346        # messages outstanding
     347
     348        def _wait_until_idle(ign):
     349            def _idle():
     350                for c in subscribing_clients + publishing_clients:
     351                    if c._debug_outstanding:
     352                        return False
     353                if introducer._debug_outstanding:
    152354                    return False
    153             return True
    154         d = self.poll(_wait_for_all_connections)
     355                return True
     356            return self.poll(_idle)
     357
     358        d = defer.succeed(None)
     359        d.addCallback(_wait_for_connected)
     360        d.addCallback(_wait_for_expected_announcements)
     361        d.addCallback(_wait_until_idle)
    155362
    156363        def _check1(res):
    157364            log.msg("doing _check1")
    158365            dc = introducer._debug_counts
    159             self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS)
    160             self.failUnlessEqual(dc["inbound_duplicate"], 0)
     366            if server_version == V1:
     367                # each storage server publishes a record, and (after its
     368                # 'subscribe' has been ACKed) also publishes a "stub_client".
     369                # The non-storage client (which subscribes) also publishes a
     370                # stub_client. There is also one "boring" service. The number
     371                # of messages is higher, because the stub_clients aren't
     372                # published until after we get the 'subscribe' ack (since we
     373                # don't realize that we're dealing with a v1 server [which
     374                # needs stub_clients] until then), and the act of publishing
     375                # the stub_client causes us to re-send all previous
     376                # announcements.
     377                self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"],
     378                                     NUM_STORAGE + NUM_CLIENTS + 1)
     379            else:
     380                # each storage server publishes a record. There is also one
     381                # "stub_client" and one "boring"
     382                self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2)
     383                self.failUnlessEqual(dc["inbound_duplicate"], 0)
    161384            self.failUnlessEqual(dc["inbound_update"], 0)
    162             self.failUnless(dc["outbound_message"])
     385            self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
     386            # the number of outbound messages is tricky.. I think it depends
     387            # upon a race between the publish and the subscribe messages.
     388            self.failUnless(dc["outbound_message"] > 0)
     389            # each client subscribes to "storage", and each server publishes
     390            self.failUnlessEqual(dc["outbound_announcements"],
     391                                 NUM_STORAGE*NUM_CLIENTS)
    163392
    164             for c in clients:
    165                 self.failUnless(c.connected_to_introducer())
    166393            for c in subscribing_clients:
    167394                cdc = c._debug_counts
    168395                self.failUnless(cdc["inbound_message"])
    169396                self.failUnlessEqual(cdc["inbound_announcement"],
    170                                      NUM_SERVERS)
     397                                     NUM_STORAGE)
    171398                self.failUnlessEqual(cdc["wrong_service"], 0)
    172399                self.failUnlessEqual(cdc["duplicate_announcement"], 0)
    173400                self.failUnlessEqual(cdc["update"], 0)
    174401                self.failUnlessEqual(cdc["new_announcement"],
    175                                      NUM_SERVERS)
     402                                     NUM_STORAGE)
    176403                anns = received_announcements[c]
    177                 self.failUnlessEqual(len(anns), NUM_SERVERS)
     404                self.failUnlessEqual(len(anns), NUM_STORAGE)
    178405
    179406                nodeid0 = b32decode(tubs[clients[0]].tubID.upper())
    180407                ann_d = anns[nodeid0]
    181408                nick = ann_d["nickname"]
    182409                self.failUnlessEqual(type(nick), unicode)
    183410                self.failUnlessEqual(nick, u"nickname-0")
    184             for c in publishing_clients:
    185                 cdc = c._debug_counts
    186                 self.failUnlessEqual(cdc["outbound_message"], 1)
     411            if server_version == V1:
     412                for c in publishing_clients:
     413                    cdc = c._debug_counts
     414                    expected = 1 # storage
     415                    if c is clients[2]:
     416                        expected += 1 # boring
     417                    if c is not clients[0]:
     418                        # the v2 client tries to call publish_v2, which fails
     419                        # because the server is v1. It then re-sends
     420                        # everything it has so far, plus a stub_client record
     421                        expected = 2*expected + 1
     422                    if c is clients[0]:
     423                        # we always tell v1 client to send stub_client
     424                        expected += 1
     425                    self.failUnlessEqual(cdc["outbound_message"], expected)
     426            else:
     427                for c in publishing_clients:
     428                    cdc = c._debug_counts
     429                    expected = 1
     430                    if c in [clients[0], # stub_client
     431                             clients[2], # boring
     432                             ]:
     433                        expected = 2
     434                    self.failUnlessEqual(cdc["outbound_message"], expected)
     435            log.msg("_check1 done")
    187436        d.addCallback(_check1)
    188437
    189438        # force an introducer reconnect, by shutting down the Tub it's using
    class SystemTest(SystemTestMixin, unittest.TestCase): 
    196445        d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub"))
    197446        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
    198447
    199         def _wait_for_introducer_loss():
    200             for c in clients:
    201                 if c.connected_to_introducer():
    202                     return False
    203             return True
    204         d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
     448        def _wait_for_introducer_loss(ign):
     449            def _introducer_lost():
     450                for c in clients:
     451                    if c.connected_to_introducer():
     452                        return False
     453                return True
     454            return self.poll(_introducer_lost)
     455        d.addCallback(_wait_for_introducer_loss)
    205456
    206457        def _restart_introducer_tub(_ign):
    207458            log.msg("restarting introducer's Tub")
    208 
    209             dc = introducer._debug_counts
    210             self.expected_count = dc["inbound_message"] + NUM_SERVERS
    211             self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1
    212             introducer._debug0 = dc["outbound_message"]
    213             for c in subscribing_clients:
    214                 cdc = c._debug_counts
    215                 c._debug0 = cdc["inbound_message"]
    216 
     459            # reset counters
     460            for i in range(NUM_CLIENTS):
     461                c = subscribing_clients[i]
     462                for k in c._debug_counts:
     463                    c._debug_counts[k] = 0
     464            for k in introducer._debug_counts:
     465                introducer._debug_counts[k] = 0
     466            expected_announcements[i] += 1 # new 'storage' for everyone
    217467            self.create_tub(self.central_portnum)
    218468            newfurl = self.central_tub.registerReference(introducer,
    219469                                                         furlFile=iff)
    220470            assert newfurl == self.introducer_furl
    221471        d.addCallback(_restart_introducer_tub)
    222472
    223         def _wait_for_introducer_reconnect():
    224             # wait until:
    225             #  all clients are connected
    226             #  the introducer has received publish messages from all of them
    227             #  the introducer has received subscribe messages from all of them
    228             #  the introducer has sent (duplicate) announcements to all of them
    229             #  all clients have received (duplicate) announcements
    230             dc = introducer._debug_counts
    231             for c in clients:
    232                 if not c.connected_to_introducer():
    233                     return False
    234             if dc["inbound_message"] < self.expected_count:
    235                 return False
    236             if dc["inbound_subscribe"] < self.expected_subscribe_count:
    237                 return False
    238             for c in subscribing_clients:
    239                 cdc = c._debug_counts
    240                 if cdc["inbound_message"] < c._debug0+1:
    241                     return False
    242             return True
    243         d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect))
     473        d.addCallback(_wait_for_connected)
     474        d.addCallback(_wait_for_expected_announcements)
     475        d.addCallback(_wait_until_idle)
     476        d.addCallback(lambda _ign: log.msg(" reconnected"))
    244477
    245478        def _check2(res):
    246479            log.msg("doing _check2")
    247480            # assert that the introducer sent out new messages, one per
    248481            # subscriber
    249482            dc = introducer._debug_counts
    250             self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS)
    251             self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS)
    252             self.failUnlessEqual(dc["inbound_update"], 0)
    253             self.failUnlessEqual(dc["outbound_message"],
    254                                  introducer._debug0 + len(subscribing_clients))
    255             for c in clients:
    256                 self.failUnless(c.connected_to_introducer())
     483            self.failUnlessEqual(dc["outbound_announcements"],
     484                                 NUM_STORAGE*NUM_CLIENTS)
     485            self.failUnless(dc["outbound_message"] > 0)
     486            self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
    257487            for c in subscribing_clients:
    258488                cdc = c._debug_counts
    259                 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS)
     489                self.failUnlessEqual(cdc["inbound_message"], 1)
     490                self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
     491                self.failUnlessEqual(cdc["new_announcement"], 0)
     492                self.failUnlessEqual(cdc["wrong_service"], 0)
     493                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
    260494        d.addCallback(_check2)
    261495
    262496        # Then force an introducer restart, by shutting down the Tub,
    class SystemTest(SystemTestMixin, unittest.TestCase): 
    267501
    268502        d.addCallback(lambda _ign: log.msg("shutting down introducer"))
    269503        d.addCallback(lambda _ign: self.central_tub.disownServiceParent())
    270         d.addCallback(lambda res: self.poll(_wait_for_introducer_loss))
     504        d.addCallback(_wait_for_introducer_loss)
     505        d.addCallback(lambda _ign: log.msg("introducer lost"))
    271506
    272507        def _restart_introducer(_ign):
    273508            log.msg("restarting introducer")
    274509            self.create_tub(self.central_portnum)
    275 
    276             for c in subscribing_clients:
    277                 # record some counters for later comparison. Stash the values
    278                 # on the client itself, because I'm lazy.
    279                 cdc = c._debug_counts
    280                 c._debug1 = cdc["inbound_announcement"]
    281                 c._debug2 = cdc["inbound_message"]
    282                 c._debug3 = cdc["new_announcement"]
    283             newintroducer = create_introducer()
    284             self.expected_message_count = NUM_SERVERS
    285             self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients)
    286             self.expected_subscribe_count = len(subscribing_clients)
    287             newfurl = self.central_tub.registerReference(newintroducer,
     510            # reset counters
     511            for i in range(NUM_CLIENTS):
     512                c = subscribing_clients[i]
     513                for k in c._debug_counts:
     514                    c._debug_counts[k] = 0
     515            expected_announcements[i] += 1 # new 'storage' for everyone
     516            if server_version == V1:
     517                introducer = old.IntroducerService_v1()
     518            else:
     519                introducer = IntroducerService()
     520            newfurl = self.central_tub.registerReference(introducer,
    288521                                                         furlFile=iff)
    289522            assert newfurl == self.introducer_furl
    290523        d.addCallback(_restart_introducer)
    291         def _wait_for_introducer_reconnect2():
    292             # wait until:
    293             #  all clients are connected
    294             #  the introducer has received publish messages from all of them
    295             #  the introducer has received subscribe messages from all of them
    296             #  the introducer has sent announcements for everybody to everybody
    297             #  all clients have received all the (duplicate) announcements
    298             # at that point, the system should be quiescent
    299             dc = introducer._debug_counts
    300             for c in clients:
    301                 if not c.connected_to_introducer():
    302                     return False
    303             if dc["inbound_message"] < self.expected_message_count:
    304                 return False
    305             if dc["outbound_announcements"] < self.expected_announcement_count:
    306                 return False
    307             if dc["inbound_subscribe"] < self.expected_subscribe_count:
    308                 return False
    309             for c in subscribing_clients:
    310                 cdc = c._debug_counts
    311                 if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS:
    312                     return False
    313             return True
    314         d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2))
     524
     525        d.addCallback(_wait_for_connected)
     526        d.addCallback(_wait_for_expected_announcements)
     527        d.addCallback(_wait_until_idle)
    315528
    316529        def _check3(res):
    317530            log.msg("doing _check3")
    318             for c in clients:
    319                 self.failUnless(c.connected_to_introducer())
     531            dc = introducer._debug_counts
     532            self.failUnlessEqual(dc["outbound_announcements"],
     533                                 NUM_STORAGE*NUM_CLIENTS)
     534            self.failUnless(dc["outbound_message"] > 0)
     535            self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS)
    320536            for c in subscribing_clients:
    321537                cdc = c._debug_counts
    322                 self.failUnless(cdc["inbound_announcement"] > c._debug1)
    323                 self.failUnless(cdc["inbound_message"] > c._debug2)
    324                 # there should have been no new announcements
    325                 self.failUnlessEqual(cdc["new_announcement"], c._debug3)
    326                 # and the right number of duplicate ones. There were
    327                 # NUM_SERVERS from the servertub restart, and there should be
    328                 # another NUM_SERVERS now
    329                 self.failUnlessEqual(cdc["duplicate_announcement"],
    330                                      2*NUM_SERVERS)
     538                self.failUnless(cdc["inbound_message"] > 0)
     539                self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE)
     540                self.failUnlessEqual(cdc["new_announcement"], 0)
     541                self.failUnlessEqual(cdc["wrong_service"], 0)
     542                self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE)
    331543
    332544        d.addCallback(_check3)
    333545        return d
    334546
     547
     548    def test_system_v2_server(self):
     549        self.basedir = "introducer/SystemTest/system_v2_server"
     550        os.makedirs(self.basedir)
     551        return self.do_system_test(V2)
     552    test_system_v2_server.timeout = 480
     553    # occasionally takes longer than 350s on "draco"
     554
     555    def test_system_v1_server(self):
     556        self.basedir = "introducer/SystemTest/system_v1_server"
     557        os.makedirs(self.basedir)
     558        return self.do_system_test(V1)
     559    test_system_v1_server.timeout = 480
     560    # occasionally takes longer than 350s on "draco"
     561
     562from allmydata.util import base32
     563class FakeRemoteReference:
     564    def notifyOnDisconnect(self, *args, **kwargs): pass
     565    def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y"
     566
     567class ClientInfo(unittest.TestCase):
     568    def test_client_v2(self):
     569        introducer = IntroducerService()
     570        tub = introducer_furl = None
     571        app_versions = {"whizzy": "fizzy"}
     572        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
     573                                     "my_version", "oldest", app_versions)
     574        #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
     575        #ann_s = client_v2.create_announcement(furl1, "storage", "RIStorage")
     576        #introducer.remote_publish_v2(ann_s, Referenceable())
     577        subscriber = FakeRemoteReference()
     578        introducer.remote_subscribe_v2(subscriber, "storage",
     579                                       client_v2._my_subscriber_info)
     580        s = introducer.get_subscribers()
     581        self.failUnlessEqual(len(s), 1)
     582        sn, when, si, rref = s[0]
     583        self.failUnlessIdentical(rref, subscriber)
     584        self.failUnlessEqual(sn, "storage")
     585        self.failUnlessEqual(si["version"], 0)
     586        self.failUnlessEqual(si["oldest-supported"], "oldest")
     587        self.failUnlessEqual(si["app-versions"], app_versions)
     588        self.failUnlessEqual(si["nickname"], u"nick-v2")
     589        self.failUnlessEqual(si["my-version"], "my_version")
     590
     591    def test_client_v1(self):
     592        introducer = IntroducerService()
     593        tub = introducer_furl = None
     594        client_v1 = old.IntroducerClient_v1(tub, introducer_furl, u"nick-v1",
     595                                            "my_version", "oldest")
     596        subscriber = FakeRemoteReference()
     597        introducer.remote_subscribe(subscriber, "storage")
     598        # the v1 subscribe interface had no subscriber_info: that was usually
     599        # sent in a separate stub_client pseudo-announcement
     600        s = introducer.get_subscribers()
     601        self.failUnlessEqual(len(s), 1)
     602        sn, when, si, rref = s[0]
     603        # rref will be a SubscriberAdapter_v1 around the real subscriber
     604        self.failUnlessIdentical(rref.original, subscriber)
     605        self.failUnlessEqual(si, None) # not known yet
     606        self.failUnlessEqual(sn, "storage")
     607
     608        # now submit the stub_client announcement
     609        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
     610        ann = (furl1, "stub_client", "RIStubClient",
     611               u"nick-v1".encode("utf-8"), "my_version", "oldest")
     612        introducer.remote_publish(ann)
     613        # the server should correlate the two
     614        s = introducer.get_subscribers()
     615        self.failUnlessEqual(len(s), 1)
     616        sn, when, si, rref = s[0]
     617        self.failUnlessIdentical(rref.original, subscriber)
     618        self.failUnlessEqual(sn, "storage")
     619
     620        self.failUnlessEqual(si["version"], 0)
     621        self.failUnlessEqual(si["oldest-supported"], "oldest")
     622        # v1 announcements do not contain app-versions
     623        self.failUnlessEqual(si["app-versions"], {})
     624        self.failUnlessEqual(si["nickname"], u"nick-v1")
     625        self.failUnlessEqual(si["my-version"], "my_version")
     626
     627        # a subscription that arrives after the stub_client announcement
     628        # should be correlated too
     629        subscriber2 = FakeRemoteReference()
     630        introducer.remote_subscribe(subscriber2, "thing2")
     631
     632        s = introducer.get_subscribers()
     633        subs = dict([(sn, (si,rref)) for sn, when, si, rref in s])
     634        self.failUnlessEqual(len(subs), 2)
     635        (si,rref) = subs["thing2"]
     636        self.failUnlessIdentical(rref.original, subscriber2)
     637        self.failUnlessEqual(si["version"], 0)
     638        self.failUnlessEqual(si["oldest-supported"], "oldest")
     639        # v1 announcements do not contain app-versions
     640        self.failUnlessEqual(si["app-versions"], {})
     641        self.failUnlessEqual(si["nickname"], u"nick-v1")
     642        self.failUnlessEqual(si["my-version"], "my_version")
     643
     644class Announcements(unittest.TestCase):
     645    def test_client_v2_unsigned(self):
     646        introducer = IntroducerService()
     647        tub = introducer_furl = None
     648        app_versions = {"whizzy": "fizzy"}
     649        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
     650                                     "my_version", "oldest", app_versions)
     651        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
     652        serverid = base32.a2b("62ubehyunnyhzs7r6vdonnm2hpi52w6y")
     653        ann_s0 = client_v2.create_announcement(furl1, "storage", "RIStorage")
     654        canary0 = Referenceable()
     655        introducer.remote_publish_v2(ann_s0, canary0)
     656        a = introducer.get_announcements()
     657        self.failUnlessEqual(len(a), 1)
     658        (index, (ann_s, canary, ann_d, when)) = a.items()[0]
     659        self.failUnlessIdentical(canary, canary0)
     660        self.failUnlessEqual(index, ("storage", serverid))
     661        self.failUnlessEqual(ann_d["app-versions"], app_versions)
     662        self.failUnlessEqual(ann_d["nickname"], u"nick-v2")
     663        self.failUnlessEqual(ann_d["service-name"], "storage")
     664        self.failUnlessEqual(ann_d["my-version"], "my_version")
     665        self.failUnlessEqual(ann_d["FURL"], furl1)
     666
     667    def test_client_v2_signed(self):
     668        introducer = IntroducerService()
     669        tub = introducer_furl = None
     670        app_versions = {"whizzy": "fizzy"}
     671        client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2",
     672                                     "my_version", "oldest", app_versions)
     673        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
     674        serverid = base32.a2b("62ubehyunnyhzs7r6vdonnm2hpi52w6y")
     675        sk = ecdsa.SigningKey.generate()
     676        pk = sk.get_verifying_key()
     677        pks = pk.to_string()
     678        ann_s0 = client_v2.create_announcement(furl1, "storage", "RIStorage", sk)
     679        canary0 = Referenceable()
     680        introducer.remote_publish_v2(ann_s0, canary0)
     681        a = introducer.get_announcements()
     682        self.failUnlessEqual(len(a), 1)
     683        (index, (ann_s, canary, ann_d, when)) = a.items()[0]
     684        self.failUnlessIdentical(canary, canary0)
     685        self.failUnlessEqual(index, ("storage", pks)) # index is pubkey string
     686        self.failUnlessEqual(ann_d["app-versions"], app_versions)
     687        self.failUnlessEqual(ann_d["nickname"], u"nick-v2")
     688        self.failUnlessEqual(ann_d["service-name"], "storage")
     689        self.failUnlessEqual(ann_d["my-version"], "my_version")
     690        self.failUnlessEqual(ann_d["FURL"], furl1)
     691
     692    def test_client_v1(self):
     693        introducer = IntroducerService()
     694        tub = introducer_furl = None
     695
     696        furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum"
     697        serverid = base32.a2b("62ubehyunnyhzs7r6vdonnm2hpi52w6y")
     698        ann = (furl1, "storage", "RIStorage",
     699               u"nick-v1".encode("utf-8"), "my_version", "oldest")
     700        introducer.remote_publish(ann)
     701
     702        a = introducer.get_announcements()
     703        self.failUnlessEqual(len(a), 1)
     704        (index, (ann_s, canary, ann_d, when)) = a.items()[0]
     705        self.failUnlessEqual(canary, None)
     706        self.failUnlessEqual(index, ("storage", serverid))
     707        self.failUnlessEqual(ann_d["app-versions"], {})
     708        self.failUnlessEqual(ann_d["nickname"], u"nick-v1".encode("utf-8"))
     709        self.failUnlessEqual(ann_d["service-name"], "storage")
     710        self.failUnlessEqual(ann_d["my-version"], "my_version")
     711        self.failUnlessEqual(ann_d["FURL"], furl1)
     712
     713
    335714class TooNewServer(IntroducerService):
    336715    VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999":
    337716                 { },
    class NonV1Server(SystemTestMixin, unittest.TestCase): 
    359738        tub.setLocation("localhost:%d" % portnum)
    360739
    361740        c = IntroducerClient(tub, self.introducer_furl,
    362                              u"nickname-client", "version", "oldest")
     741                             u"nickname-client", "version", "oldest", {})
    363742        announcements = {}
    364743        def got(serverid, ann_d):
    365744            announcements[serverid] = ann_d
    class DecodeFurl(unittest.TestCase): 
    388767        nodeid = b32decode(m.group(1).upper())
    389768        self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d")
    390769
     770
     771# add tests of StorageFarmBroker: if it receives duplicate announcements, it
     772# should leave the Reconnector in place, also if it receives
     773# same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it
     774# should tear down the Reconnector and make a new one. This behavior used to
     775# live in the IntroducerClient, and thus used to be tested by test_introducer
     776
     777# copying more tests from old branch:
     778
     779#  then also add Upgrade test
  • src/allmydata/test/test_system.py

    diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py
    index bf6af09..b7b3c22 100644
    a b class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): 
    778778                newappverstr = "%s: %s" % (allmydata.__appname__, altverstr)
    779779
    780780                self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res))
    781                 self.failUnless("Announcement Summary: storage: 5, stub_client: 5" in res)
     781                self.failUnless("Announcement Summary: storage: 5" in res)
    782782                self.failUnless("Subscription Summary: storage: 5" in res)
    783783            except unittest.FailTest:
    784784                print
    class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): 
    795795                self.failUnlessEqual(data["subscription_summary"],
    796796                                     {"storage": 5})
    797797                self.failUnlessEqual(data["announcement_summary"],
    798                                      {"storage": 5, "stub_client": 5})
     798                                     {"storage": 5})
    799799                self.failUnlessEqual(data["announcement_distinct_hosts"],
    800                                      {"storage": 1, "stub_client": 1})
     800                                     {"storage": 1})
    801801            except unittest.FailTest:
    802802                print
    803803                print "GET %s?t=json output was:" % self.introweb_url
  • src/allmydata/web/introweb.py

    diff --git a/src/allmydata/web/introweb.py b/src/allmydata/web/introweb.py
    index 28273bd..be950c4 100644
    a b class IntroducerRoot(rend.Page): 
    2929
    3030    def render_JSON(self, ctx):
    3131        res = {}
    32         clients = self.introducer_service.get_subscribers()
    33         subscription_summary = dict([ (name, len(clients[name]))
    34                                       for name in clients ])
    35         res["subscription_summary"] = subscription_summary
     32
     33        counts = {}
     34        subscribers = self.introducer_service.get_subscribers()
     35        for (service_name, ign, ign, ign) in subscribers:
     36            if service_name not in counts:
     37                counts[service_name] = 0
     38            counts[service_name] += 1
     39        res["subscription_summary"] = counts
    3640
    3741        announcement_summary = {}
    3842        service_hosts = {}
    39         for (ann,when) in self.introducer_service.get_announcements().values():
    40             (furl, service_name, ri_name, nickname, ver, oldest) = ann
     43        for a in self.introducer_service.get_announcements().values():
     44            (_, _, ann_d, when) = a
     45            service_name = ann_d["service-name"]
    4146            if service_name not in announcement_summary:
    4247                announcement_summary[service_name] = 0
    4348            announcement_summary[service_name] += 1
    class IntroducerRoot(rend.Page): 
    5055            # enough: when multiple services are run on a single host,
    5156            # they're usually either configured with the same addresses,
    5257            # or setLocationAutomatically picks up the same interfaces.
     58            furl = ann_d["FURL"]
    5359            locations = SturdyRef(furl).getTubRef().getLocations()
    5460            # list of tuples, ("ipv4", host, port)
    5561            host = frozenset([hint[1]
    class IntroducerRoot(rend.Page): 
    7480
    7581    def render_announcement_summary(self, ctx, data):
    7682        services = {}
    77         for (ann,when) in self.introducer_service.get_announcements().values():
    78             (furl, service_name, ri_name, nickname, ver, oldest) = ann
     83        for a in self.introducer_service.get_announcements().values():
     84            (_, _, ann_d, when) = a
     85            service_name = ann_d["service-name"]
    7986            if service_name not in services:
    8087                services[service_name] = 0
    8188            services[service_name] += 1
    class IntroducerRoot(rend.Page): 
    8592                          for service_name in service_names])
    8693
    8794    def render_client_summary(self, ctx, data):
     95        counts = {}
    8896        clients = self.introducer_service.get_subscribers()
    89         service_names = clients.keys()
    90         service_names.sort()
    91         return ", ".join(["%s: %d" % (service_name, len(clients[service_name]))
    92                           for service_name in service_names])
     97        for (service_name, ign, ign, ign) in clients:
     98            if service_name not in counts:
     99                counts[service_name] = 0
     100            counts[service_name] += 1
     101        return ", ".join([ "%s: %d" % (name, counts[name])
     102                           for name in sorted(counts.keys()) ] )
    93103
    94104    def data_services(self, ctx, data):
    95105        introsvc = self.introducer_service
    96         ann = [(since,a)
    97                for (a,since) in introsvc.get_announcements().values()
    98                if a[1] != "stub_client"]
    99         ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
     106        ann = []
     107        for a in introsvc.get_announcements().values():
     108            (_, _, ann_d, when) = a
     109            if ann_d["service-name"] == "stub_client":
     110                continue
     111            ann.append( (when, ann_d) )
     112        ann.sort(key=lambda x: (x[1]["service-name"], x[1]["nickname"]))
     113        # this used to be:
     114        #ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) )
     115        # service_name was the primary key, then the whole tuple (starting
     116        # with the furl) was the secondary key
    100117        return ann
    101118
    102     def render_service_row(self, ctx, (since,announcement)):
    103         (furl, service_name, ri_name, nickname, ver, oldest) = announcement
    104         sr = SturdyRef(furl)
     119    def render_service_row(self, ctx, (since,ann_d)):
     120        sr = SturdyRef(ann_d["FURL"])
    105121        nodeid = sr.tubID
    106122        advertised = self.show_location_hints(sr)
    107         ctx.fillSlots("peerid", "%s %s" % (nodeid, nickname))
     123        ctx.fillSlots("peerid", "%s %s" % (nodeid, ann_d["nickname"]))
    108124        ctx.fillSlots("advertised", " ".join(advertised))
    109125        ctx.fillSlots("connected", "?")
    110126        TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
    111127        ctx.fillSlots("announced",
    112128                      time.strftime(TIME_FORMAT, time.localtime(since)))
    113         ctx.fillSlots("version", ver)
    114         ctx.fillSlots("service_name", service_name)
     129        ctx.fillSlots("version", ann_d["my-version"])
     130        ctx.fillSlots("service_name", ann_d["service-name"])
    115131        return ctx.tag
    116132
    117133    def data_subscribers(self, ctx, data):
    118         # use the "stub_client" announcements to get information per nodeid
    119         clients = {}
    120         for (ann,when) in self.introducer_service.get_announcements().values():
    121             if ann[1] != "stub_client":
    122                 continue
    123             (furl, service_name, ri_name, nickname, ver, oldest) = ann
    124             sr = SturdyRef(furl)
    125             nodeid = sr.tubID
    126             clients[nodeid] = ann
    127 
    128         # then we actually provide information per subscriber
    129         s = []
    130         introsvc = self.introducer_service
    131         for service_name, subscribers in introsvc.get_subscribers().items():
    132             for (rref, timestamp) in subscribers.items():
    133                 sr = rref.getSturdyRef()
    134                 nodeid = sr.tubID
    135                 ann = clients.get(nodeid)
    136                 s.append( (service_name, rref, timestamp, ann) )
    137         s.sort()
    138         return s
     134        return self.introducer_service.get_subscribers()
    139135
    140136    def render_subscriber_row(self, ctx, s):
    141         (service_name, rref, since, ann) = s
    142         nickname = "?"
    143         version = "?"
    144         if ann:
    145             (furl, service_name_2, ri_name, nickname, version, oldest) = ann
     137        (service_name, since, info, rref) = s
     138        nickname = info.get("nickname", "?")
     139        version = info.get("my-version", "?")
    146140
    147141        sr = rref.getSturdyRef()
    148142        # if the subscriber didn't do Tub.setLocation, nodeid will be None