Ticket #466: 2011-02-p3.diff
File 2011-02-p3.diff, 109.2 KB (added by warner, at 2011-02-07T18:36:43Z) |
---|
-
src/allmydata/client.py
diff --git a/src/allmydata/client.py b/src/allmydata/client.py index fa515d4..46b02bc 100644
a b from zope.interface import implements 6 6 from twisted.internet import reactor, defer 7 7 from twisted.application import service 8 8 from twisted.application.internet import TimerService 9 from foolscap.api import Referenceable10 9 from pycryptopp.publickey import rsa 11 10 12 11 import allmydata … … from allmydata.util.abbreviate import parse_abbreviated_size 22 21 from allmydata.util.time_format import parse_duration, parse_date 23 22 from allmydata.stats import StatsProvider 24 23 from allmydata.history import History 25 from allmydata.interfaces import IStatsProducer , RIStubClient24 from allmydata.interfaces import IStatsProducer 26 25 from allmydata.nodemaker import NodeMaker 27 26 28 27 … … GiB=1024*MiB 32 31 TiB=1024*GiB 33 32 PiB=1024*TiB 34 33 35 class StubClient(Referenceable):36 implements(RIStubClient)37 38 34 def _make_secret(): 39 35 return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n" 40 36 … … class Client(node.Node, pollmixin.PollMixin): 186 182 ic = IntroducerClient(self.tub, self.introducer_furl, 187 183 self.nickname, 188 184 str(allmydata.__full_version__), 189 str(self.OLDEST_SUPPORTED_VERSION)) 185 str(self.OLDEST_SUPPORTED_VERSION), 186 self.get_app_versions()) 190 187 self.introducer_client = ic 191 188 # hold off on starting the IntroducerClient until our tub has been 192 189 # started, so we'll have a useful address on our RemoteReference, so … … class Client(node.Node, pollmixin.PollMixin): 292 289 self.terminator = Terminator() 293 290 self.terminator.setServiceParent(self) 294 291 self.add_service(Uploader(helper_furl, self.stats_provider)) 295 self.init_stub_client()296 292 self.init_nodemaker() 297 293 298 294 def init_client_storage_broker(self): … … class Client(node.Node, pollmixin.PollMixin): 331 327 def get_storage_broker(self): 332 328 return self.storage_broker 333 329 334 def init_stub_client(self):335 def _publish(res):336 # we publish an empty object so that the introducer can count how337 # many clients are connected and see what versions they're338 # running.339 sc = StubClient()340 furl = self.tub.registerReference(sc)341 ri_name = RIStubClient.__remote_name__342 self.introducer_client.publish(furl, "stub_client", ri_name)343 d = self.when_tub_ready()344 d.addCallback(_publish)345 d.addErrback(log.err, facility="tahoe.init",346 level=log.BAD, umid="OEHq3g")347 348 330 def init_nodemaker(self): 349 331 self.nodemaker = NodeMaker(self.storage_broker, 350 332 self._secret_holder, -
src/allmydata/interfaces.py
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 48094a9..9c2f1c8 100644
a b WriteEnablerSecret = Hash # used to protect mutable bucket modifications 26 26 LeaseRenewSecret = Hash # used to protect bucket lease renewal requests 27 27 LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests 28 28 29 class RIStubClient(RemoteInterface):30 """Each client publishes a service announcement for a dummy object called31 the StubClient. This object doesn't actually offer any services, but the32 announcement helps the Introducer keep track of which clients are33 subscribed (so the grid admin can keep track of things like the size of34 the grid and the client versions in use. This is the (empty)35 RemoteInterface for the StubClient."""36 37 29 class RIBucketWriter(RemoteInterface): 38 30 """ Objects of this kind live on the server side. """ 39 31 def write(offset=Offset, data=ShareData): -
src/allmydata/introducer/client.py
diff --git a/src/allmydata/introducer/client.py b/src/allmydata/introducer/client.py index 31fbb5c..1e48fd8 100644
a b 1 1 2 from base64 import b32decode 2 import time, simplejson 3 3 from zope.interface import implements 4 4 from twisted.application import service 5 from foolscap.api import Referenceable, SturdyRef, eventually5 from foolscap.api import Referenceable, eventually, RemoteInterface, Violation 6 6 from allmydata.interfaces import InsufficientVersionError 7 from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \ 8 IIntroducerClient 7 from allmydata.introducer.interfaces import IIntroducerClient, \ 8 RIIntroducerSubscriberClient_v1, RIIntroducerSubscriberClient_v2 9 from allmydata.introducer.common import sign, unsign, make_index, \ 10 convert_announcement_v1_to_v2, convert_announcement_v2_to_v1 9 11 from allmydata.util import log, idlib 10 from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref 12 from allmydata.util.rrefutil import add_version_to_remote_reference 13 from allmydata.util.ecdsa import BadSignatureError 14 15 class ClientAdapter_v1(Referenceable): # for_v1 16 """I wrap a v2 IntroducerClient to make it look like a v1 client, so it 17 can be attached to an old server.""" 18 implements(RIIntroducerSubscriberClient_v1) 19 20 def __init__(self, original): 21 self.original = original 22 23 def remote_announce(self, announcements): 24 lp = self.original.log("received %d announcements (v1)" % 25 len(announcements)) 26 anns_v1 = set([convert_announcement_v1_to_v2(ann_v1) 27 for ann_v1 in announcements]) 28 return self.original.got_announcements(anns_v1, lp) 29 30 def remote_set_encoding_parameters(self, parameters): 31 self.original.remote_set_encoding_parameters(parameters) 32 33 class RIStubClient(RemoteInterface): # for_v1 34 """Each client publishes a service announcement for a dummy object called 35 the StubClient. This object doesn't actually offer any services, but the 36 announcement helps the Introducer keep track of which clients are 37 subscribed (so the grid admin can keep track of things like the size of 38 the grid and the client versions in use. This is the (empty) 39 RemoteInterface for the StubClient.""" 40 41 class StubClient(Referenceable): # for_v1 42 implements(RIStubClient) 11 43 12 44 13 45 class IntroducerClient(service.Service, Referenceable): 14 implements(RIIntroducerSubscriberClient , IIntroducerClient)46 implements(RIIntroducerSubscriberClient_v2, IIntroducerClient) 15 47 16 48 def __init__(self, tub, introducer_furl, 17 nickname, my_version, oldest_supported): 49 nickname, my_version, oldest_supported, 50 app_versions): 18 51 self._tub = tub 19 52 self.introducer_furl = introducer_furl 20 53 21 54 assert type(nickname) is unicode 22 self._nickname _utf8 = nickname.encode("utf-8") # we always send UTF-855 self._nickname = nickname 23 56 self._my_version = my_version 24 57 self._oldest_supported = oldest_supported 58 self._app_versions = app_versions 59 60 self._my_subscriber_info = { "version": 0, 61 "nickname": self._nickname, 62 "app-versions": self._app_versions, 63 "my-version": self._my_version, 64 "oldest-supported": self._oldest_supported, 65 } 66 self._stub_client = None # for_v1 67 self._stub_client_furl = None 25 68 26 self._published_announcements = set() 69 self._published_announcements = {} 70 self._canary = Referenceable() 27 71 28 72 self._publisher = None 29 73 … … class IntroducerClient(service.Service, Referenceable): 33 77 34 78 # _current_announcements remembers one announcement per 35 79 # (servicename,serverid) pair. Anything that arrives with the same 36 # pair will displace the previous one. This stores unpacked 37 # announcement dictionaries, which can be compared for equality to 38 # distinguish re-announcement from updates. It also provides memory 39 # for clients who subscribe after startup. 80 # pair will displace the previous one. This stores tuples of 81 # (unpacked announcement dictionary, verifyingkey, rxtime). The ann_d 82 # dicts can be compared for equality to distinguish re-announcement 83 # from updates. It also provides memory for clients who subscribe 84 # after startup. 40 85 self._current_announcements = {} 41 86 42 87 self.encoding_parameters = None … … class IntroducerClient(service.Service, Referenceable): 51 96 "new_announcement": 0, 52 97 "outbound_message": 0, 53 98 } 99 self._debug_outstanding = 0 100 101 def _debug_retired(self, res): 102 self._debug_outstanding -= 1 103 return res 54 104 55 105 def startService(self): 56 106 service.Service.startService(self) … … class IntroducerClient(service.Service, Referenceable): 95 145 96 146 def log(self, *args, **kwargs): 97 147 if "facility" not in kwargs: 98 kwargs["facility"] = "tahoe.introducer "148 kwargs["facility"] = "tahoe.introducer.client" 99 149 return log.msg(*args, **kwargs) 100 150 101 102 def publish(self, furl, service_name, remoteinterface_name):103 assert type(self._nickname_utf8) is str # we always send UTF-8104 ann = (furl, service_name, remoteinterface_name,105 self._nickname_utf8, self._my_version, self._oldest_supported)106 self._published_announcements.add(ann)107 self._maybe_publish()108 109 151 def subscribe_to(self, service_name, cb, *args, **kwargs): 110 152 self._local_subscribers.append( (service_name,cb,args,kwargs) ) 111 153 self._subscribed_service_names.add(service_name) 112 154 self._maybe_subscribe() 113 for (servicename,nodeid), ann_din self._current_announcements.items():155 for (servicename,nodeid),(ann_d,key,when) in self._current_announcements.items(): 114 156 if servicename == service_name: 115 157 eventually(cb, nodeid, ann_d) 116 158 … … class IntroducerClient(service.Service, Referenceable): 124 166 # there is a race here, but the subscription desk ignores 125 167 # duplicate requests. 126 168 self._subscriptions.add(service_name) 127 d = self._publisher.callRemote("subscribe", self, service_name) 128 d.addErrback(trap_deadref) 129 d.addErrback(log.err, format="server errored during subscribe", 130 facility="tahoe.introducer", 169 self._debug_outstanding += 1 170 d = self._publisher.callRemote("subscribe_v2", 171 self, service_name, 172 self._my_subscriber_info) 173 d.addBoth(self._debug_retired) 174 d.addErrback(self._subscribe_handle_v1, service_name) # for_v1 175 d.addErrback(log.err, facility="tahoe.introducer.client", 131 176 level=log.WEIRD, umid="2uMScQ") 132 177 178 def _subscribe_handle_v1(self, f, service_name): # for_v1 179 f.trap(Violation, NameError) 180 # they don't have a 'subscribe_v2' method: must be a v1 introducer. 181 # Fall back to the v1 'subscribe' method, using a client adapter. 182 ca = ClientAdapter_v1(self) 183 self._debug_outstanding += 1 184 d = self._publisher.callRemote("subscribe", ca, service_name) 185 d.addBoth(self._debug_retired) 186 # We must also publish an empty 'stub_client' object, so the 187 # introducer can count how many clients are connected and see what 188 # versions they're running. 189 if not self._stub_client_furl: 190 self._stub_client = sc = StubClient() 191 self._stub_client_furl = self._tub.registerReference(sc) 192 def _publish_stub_client(ignored): 193 ri_name = RIStubClient.__remote_name__ 194 self.publish(self._stub_client_furl, "stub_client", ri_name) 195 d.addCallback(_publish_stub_client) 196 return d 197 198 def create_announcement(self, furl, service_name, remoteinterface_name, 199 signing_key=None): 200 ann_d = {"version": 0, 201 "service-name": service_name, 202 "FURL": furl, 203 "remoteinterface-name": remoteinterface_name, 204 205 "nickname": self._nickname, 206 "app-versions": self._app_versions, 207 "my-version": self._my_version, 208 "oldest-supported": self._oldest_supported, 209 } 210 return simplejson.dumps(sign(ann_d, signing_key)) 211 212 213 def publish(self, furl, service_name, remoteinterface_name, 214 signing_key=None): 215 ann = self.create_announcement(furl, service_name, remoteinterface_name, 216 signing_key) 217 self._published_announcements[service_name] = ann 218 self._maybe_publish() 219 133 220 def _maybe_publish(self): 134 221 if not self._publisher: 135 222 self.log("want to publish, but no introducer yet", level=log.NOISY) 136 223 return 137 224 # this re-publishes everything. The Introducer ignores duplicates 138 for ann in self._published_announcements :225 for ann in self._published_announcements.values(): 139 226 self._debug_counts["outbound_message"] += 1 140 d = self._publisher.callRemote("publish", ann)141 d .addErrback(trap_deadref)142 d.add Errback(log.err,143 format="server errored during publish %(ann)s",144 ann=ann, facility="tahoe.introducer",227 self._debug_outstanding += 1 228 d = self._publisher.callRemote("publish_v2", ann, self._canary) 229 d.addBoth(self._debug_retired) 230 d.addErrback(self._handle_v1_publisher, ann) # for_v1 231 d.addErrback(log.err, ann=ann, facility="tahoe.introducer.client", 145 232 level=log.WEIRD, umid="xs9pVQ") 146 233 147 148 149 def remote_announce(self, announcements): 150 self.log("received %d announcements" % len(announcements)) 234 def _handle_v1_publisher(self, f, ann): # for_v1 235 f.trap(Violation, NameError) 236 # they don't have the 'publish_v2' method, so fall back to the old 237 # 'publish' method (which takes an unsigned tuple of bytestrings) 238 self.log("falling back to publish_v1", 239 level=log.UNUSUAL, umid="9RCT1A", failure=f) 240 ann_v1 = convert_announcement_v2_to_v1(ann) 241 self._debug_outstanding += 1 242 d = self._publisher.callRemote("publish", ann_v1) 243 d.addBoth(self._debug_retired) 244 return d 245 246 247 def remote_announce_v2(self, announcements): 248 lp = self.log("received %d announcements (v2)" % len(announcements)) 249 return self.got_announcements(announcements, lp) 250 251 def got_announcements(self, announcements, lp=None): 252 # this is the common entry point for both v1 and v2 announcements 151 253 self._debug_counts["inbound_message"] += 1 152 for ann in announcements:254 for ann_s in announcements: 153 255 try: 154 self._process_announcement(ann)155 except :156 log.err(format="unable to process announcement %(ann)s",157 ann=ann)158 # Don't let a corrupt announcement prevent us from processing159 # the remaining ones. Don't return an error to the server,160 # since they'd just ignore it anyways. 161 pass162 163 def _process_announcement(self, ann ):256 ann_d, key = unsign(ann_s) # might raise bad-sig error 257 except BadSignatureError: 258 self.log("bad signature on inbound announcement: %s" % (ann_s,), 259 parent=lp, level=log.WEIRD, umid="ZAU15Q") 260 # process other announcements that arrived with the bad one 261 continue 262 263 self._process_announcement(ann_d, key) 264 265 def _process_announcement(self, ann_d, key): 164 266 self._debug_counts["inbound_announcement"] += 1 165 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann267 service_name = str(ann_d["service-name"]) 166 268 if service_name not in self._subscribed_service_names: 167 269 self.log("announcement for a service we don't care about [%s]" 168 270 % (service_name,), level=log.UNUSUAL, umid="dIpGNA") 169 271 self._debug_counts["wrong_service"] += 1 170 272 return 171 self.log("announcement for [%s]: %s" % (service_name, ann), 172 umid="BoKEag") 173 assert type(furl) is str 174 assert type(service_name) is str 175 assert type(ri_name) is str 176 assert type(nickname_utf8) is str 177 nickname = nickname_utf8.decode("utf-8") 178 assert type(nickname) is unicode 179 assert type(ver) is str 180 assert type(oldest) is str 181 182 nodeid = b32decode(SturdyRef(furl).tubID.upper()) 183 nodeid_s = idlib.shortnodeid_b2a(nodeid) 184 185 ann_d = { "version": 0, 186 "service-name": service_name, 187 188 "FURL": furl, 189 "nickname": nickname, 190 "app-versions": {}, # need #466 and v2 introducer 191 "my-version": ver, 192 "oldest-supported": oldest, 193 } 194 195 index = (service_name, nodeid) 196 if self._current_announcements.get(index, None) == ann_d: 197 self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring", 273 # for ASCII values, simplejson might give us unicode *or* bytes 274 if "nickname" in ann_d and isinstance(ann_d["nickname"], str): 275 ann_d["nickname"] = unicode(ann_d["nickname"]) 276 nick_s = ann_d.get("nickname",u"").encode("utf-8") 277 lp2 = self.log(format="announcement for nickname '%(nick)s', service=%(svc)s: %(ann)s", 278 nick=nick_s, svc=service_name, ann=ann_d, umid="BoKEag") 279 280 index = make_index(ann_d, key) 281 nodeid = index[1] 282 nodeid_s = idlib.nodeid_b2a(nodeid) 283 284 # is this announcement a duplicate? 285 if self._current_announcements.get(index, [None]*3)[0] == ann_d: 286 self.log(format="reannouncement for [%(service)s]:%(nodeid)s, ignoring", 198 287 service=service_name, nodeid=nodeid_s, 199 level=log.UNUSUAL, umid="B1MIdA")288 parent=lp2, level=log.UNUSUAL, umid="B1MIdA") 200 289 self._debug_counts["duplicate_announcement"] += 1 201 290 return 291 # does it update an existing one? 202 292 if index in self._current_announcements: 203 293 self._debug_counts["update"] += 1 294 self.log("replacing old announcement: %s" % (ann_d,), 295 parent=lp2, level=log.NOISY, umid="wxwgIQ") 204 296 else: 205 297 self._debug_counts["new_announcement"] += 1 298 self.log("new announcement[%s]" % service_name, 299 parent=lp2, level=log.NOISY) 206 300 207 self._current_announcements[index] = ann_d301 self._current_announcements[index] = (ann_d, key, time.time()) 208 302 # note: we never forget an index, but we might update its value 209 303 210 304 for (service_name2,cb,args,kwargs) in self._local_subscribers: -
new file src/allmydata/introducer/common.py
diff --git a/src/allmydata/introducer/common.py b/src/allmydata/introducer/common.py new file mode 100644 index 0000000..a057ecc
- + 1 2 import re, simplejson 3 from base64 import b32decode 4 from allmydata.util.ecdsa import VerifyingKey 5 6 def make_index(ann_d, key): 7 """Return something that can be used as an index (e.g. a tuple of 8 strings), such that two messages that refer to the same 'thing' will have 9 the same index. For introducer announcements, this is a tuple of 10 (service-name, signing-key), or (service-name, tubid) if the announcement 11 is not signed.""" 12 13 service_name = str(ann_d["service-name"]) 14 if key: 15 index = (service_name, key.to_string()) 16 else: 17 # otherwise, use the FURL to get a tubid 18 furl = str(ann_d["FURL"]) 19 m = re.match(r'pb://(\w+)@', furl) 20 assert m 21 tubid = b32decode(m.group(1).upper()) 22 index = (service_name, tubid) 23 return index 24 25 def convert_announcement_v1_to_v2(ann_t): 26 (furl, service_name, ri_name, nickname, ver, oldest) = ann_t 27 assert type(furl) is str 28 assert type(service_name) is str 29 assert type(ri_name) is str 30 assert type(nickname) is str 31 assert type(ver) is str 32 assert type(oldest) is str 33 ann_d = {"version": 0, 34 "service-name": service_name, 35 "FURL": furl, 36 "remoteinterface-name": ri_name, 37 38 "nickname": nickname.decode("utf-8"), 39 "app-versions": {}, 40 "my-version": ver, 41 "oldest-supported": oldest, 42 } 43 return simplejson.dumps( (simplejson.dumps(ann_d), None, None) ) 44 45 def convert_announcement_v2_to_v1(ann_v2): 46 (msg, sig, pubkey) = simplejson.loads(ann_v2) 47 ann_d = simplejson.loads(msg) 48 assert ann_d["version"] == 0 49 ann_t = (str(ann_d["FURL"]), str(ann_d["service-name"]), 50 str(ann_d["remoteinterface-name"]), 51 ann_d["nickname"].encode("utf-8"), 52 str(ann_d["my-version"]), 53 str(ann_d["oldest-supported"]), 54 ) 55 return ann_t 56 57 58 def sign(ann_d, sk): 59 msg = simplejson.dumps(ann_d) 60 if not sk: 61 return (msg, None, None) 62 vk = sk.get_verifying_key() 63 return (msg, sk.sign(msg).encode("hex"), vk.to_string().encode("hex")) 64 65 def unsign(ann_s): 66 (msg_s, sig_s, key_s) = simplejson.loads(ann_s) 67 key = None 68 if sig_s and key_s: 69 key = VerifyingKey.from_string(key_s.decode("hex")) 70 key.verify(sig_s.decode("hex"), msg_s) 71 msg = simplejson.loads(msg_s) 72 return (msg, key) -
src/allmydata/introducer/interfaces.py
diff --git a/src/allmydata/introducer/interfaces.py b/src/allmydata/introducer/interfaces.py index 54f1701..03b0d9d 100644
a b 1 1 2 2 from zope.interface import Interface 3 3 from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \ 4 RemoteInterface 4 RemoteInterface, Referenceable 5 from old import RIIntroducerSubscriberClient_v1 5 6 FURL = StringConstraint(1000) 6 7 8 # old introducer protocol (v1): 9 # 7 10 # Announcements are (FURL, service_name, remoteinterface_name, 8 11 # nickname, my_version, oldest_supported) 9 12 # the (FURL, service_name, remoteinterface_name) refer to the service being … … FURL = StringConstraint(1000) 14 17 # incompatible peer. The second goal is to enable the development of 15 18 # backwards-compatibility code. 16 19 17 Announcement = TupleOf(FURL, str, str,18 str, str, str)20 Announcement_v1 = TupleOf(FURL, str, str, 21 str, str, str) 19 22 20 class RIIntroducerSubscriberClient(RemoteInterface): 21 __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com" 23 # new protocol: Announcements are strings, a JSON serialized 3-tuple of (msg, 24 # sig, pubkey). More details to come. 25 Announcement_v2 = str 22 26 23 def announce(announcements=SetOf(Announcement)): 27 class RIIntroducerSubscriberClient_v2(RemoteInterface): 28 __remote_name__ = "RIIntroducerSubscriberClient_v2.tahoe.allmydata.com" 29 30 def announce_v2(announcements=SetOf(Announcement_v2)): 24 31 """I accept announcements from the publisher.""" 25 32 return None 26 33 … … class RIIntroducerSubscriberClient(RemoteInterface): 41 48 """ 42 49 return None 43 50 44 # When Foolscap can handle multiple interfaces (Foolscap#17), the 45 # full-powered introducer will implement both RIIntroducerPublisher and 46 # RIIntroducerSubscriberService. Until then, we define 47 # RIIntroducerPublisherAndSubscriberService as a combination of the two, and 48 # make everybody use that. 51 SubscriberInfo = DictOf(str, Any()) 49 52 50 class RIIntroducerPublisher (RemoteInterface):53 class RIIntroducerPublisherAndSubscriberService_v2(RemoteInterface): 51 54 """To publish a service to the world, connect to me and give me your 52 announcement message. I will deliver a copy to all connected subscribers.""" 53 __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com" 54 55 def publish(announcement=Announcement): 56 # canary? 57 return None 58 59 class RIIntroducerSubscriberService(RemoteInterface): 60 __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com" 61 62 def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str): 63 """Give me a subscriber reference, and I will call its new_peers() 64 method will any announcements that match the desired service name. I 65 will ignore duplicate subscriptions. 66 """ 67 return None 68 69 class RIIntroducerPublisherAndSubscriberService(RemoteInterface): 70 __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com" 55 announcement message. I will deliver a copy to all connected subscribers. 56 To hear about services, connect to me and subscribe to a specific 57 service_name.""" 58 __remote_name__ = "RIIntroducerPublisherAndSubscriberService_v2.tahoe.allmydata.com" 71 59 def get_version(): 72 60 return DictOf(str, Any()) 73 def publish(announcement=Announcement): 61 def publish(announcement=Announcement_v1): 62 return None 63 def publish_v2(announcement=Announcement_v2, canary=Referenceable): 74 64 return None 75 def subscribe(subscriber=RIIntroducerSubscriberClient, service_name=str): 65 def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): 66 return None 67 def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2, 68 service_name=str, subscriber_info=SubscriberInfo): 69 """Give me a subscriber reference, and I will call its announce_v2() 70 method with any announcements that match the desired service name. I 71 will ignore duplicate subscriptions. The subscriber_info dictionary 72 tells me about the subscriber, and is used for diagnostic/status 73 displays.""" 76 74 return None 77 75 78 76 class IIntroducerClient(Interface): … … class IIntroducerClient(Interface): 80 78 publish their services to the rest of the world, and I help them learn 81 79 about services available on other nodes.""" 82 80 83 def publish(furl, service_name, remoteinterface_name): 81 def publish(furl, service_name, remoteinterface_name, 82 signing_key=None): 84 83 """Once you call this, I will tell the world that the Referenceable 85 84 available at FURL is available to provide a service named 86 85 SERVICE_NAME. The precise definition of the service being provided is 87 86 identified by the Foolscap 'remote interface name' in the last 88 87 parameter: this is supposed to be a globally-unique string that 89 identifies the RemoteInterface that is implemented.""" 88 identifies the RemoteInterface that is implemented. 89 90 If signing_key= is set to an instance of ecdsa.SigningKey, it will be 91 used to sign the announcement.""" 90 92 91 93 def subscribe_to(service_name, callback, *args, **kwargs): 92 94 """Call this if you will eventually want to use services with the -
new file src/allmydata/introducer/old.py
diff --git a/src/allmydata/introducer/old.py b/src/allmydata/introducer/old.py new file mode 100644 index 0000000..e0bdacf
- + 1 2 import time 3 from base64 import b32decode 4 from zope.interface import implements, Interface 5 from twisted.application import service 6 import allmydata 7 from allmydata.interfaces import InsufficientVersionError 8 from allmydata.util import log, idlib, rrefutil 9 from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \ 10 RemoteInterface, Referenceable, eventually, SturdyRef 11 FURL = StringConstraint(1000) 12 13 # We keep a copy of the old introducer (both client and server) here to 14 # support compatibility tests. The old client is supposed to handle the new 15 # server, and new client is supposed to handle the old server. 16 17 18 # Announcements are (FURL, service_name, remoteinterface_name, 19 # nickname, my_version, oldest_supported) 20 # the (FURL, service_name, remoteinterface_name) refer to the service being 21 # announced. The (nickname, my_version, oldest_supported) refer to the 22 # client as a whole. The my_version/oldest_supported strings can be parsed 23 # by an allmydata.util.version.Version instance, and then compared. The 24 # first goal is to make sure that nodes are not confused by speaking to an 25 # incompatible peer. The second goal is to enable the development of 26 # backwards-compatibility code. 27 28 Announcement = TupleOf(FURL, str, str, 29 str, str, str) 30 31 class RIIntroducerSubscriberClient_v1(RemoteInterface): 32 __remote_name__ = "RIIntroducerSubscriberClient.tahoe.allmydata.com" 33 34 def announce(announcements=SetOf(Announcement)): 35 """I accept announcements from the publisher.""" 36 return None 37 38 def set_encoding_parameters(parameters=(int, int, int)): 39 """Advise the client of the recommended k-of-n encoding parameters 40 for this grid. 'parameters' is a tuple of (k, desired, n), where 'n' 41 is the total number of shares that will be created for any given 42 file, while 'k' is the number of shares that must be retrieved to 43 recover that file, and 'desired' is the minimum number of shares that 44 must be placed before the uploader will consider its job a success. 45 n/k is the expansion ratio, while k determines the robustness. 46 47 Introducers should specify 'n' according to the expected size of the 48 grid (there is no point to producing more shares than there are 49 peers), and k according to the desired reliability-vs-overhead goals. 50 51 Note that setting k=1 is equivalent to simple replication. 52 """ 53 return None 54 55 # When Foolscap can handle multiple interfaces (Foolscap#17), the 56 # full-powered introducer will implement both RIIntroducerPublisher and 57 # RIIntroducerSubscriberService. Until then, we define 58 # RIIntroducerPublisherAndSubscriberService as a combination of the two, and 59 # make everybody use that. 60 61 class RIIntroducerPublisher_v1(RemoteInterface): 62 """To publish a service to the world, connect to me and give me your 63 announcement message. I will deliver a copy to all connected subscribers.""" 64 __remote_name__ = "RIIntroducerPublisher.tahoe.allmydata.com" 65 66 def publish(announcement=Announcement): 67 # canary? 68 return None 69 70 class RIIntroducerSubscriberService_v1(RemoteInterface): 71 __remote_name__ = "RIIntroducerSubscriberService.tahoe.allmydata.com" 72 73 def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): 74 """Give me a subscriber reference, and I will call its new_peers() 75 method will any announcements that match the desired service name. I 76 will ignore duplicate subscriptions. 77 """ 78 return None 79 80 class RIIntroducerPublisherAndSubscriberService_v1(RemoteInterface): 81 __remote_name__ = "RIIntroducerPublisherAndSubscriberService.tahoe.allmydata.com" 82 def get_version(): 83 return DictOf(str, Any()) 84 def publish(announcement=Announcement): 85 return None 86 def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str): 87 return None 88 89 class IIntroducerClient(Interface): 90 """I provide service introduction facilities for a node. I help nodes 91 publish their services to the rest of the world, and I help them learn 92 about services available on other nodes.""" 93 94 def publish(furl, service_name, remoteinterface_name): 95 """Once you call this, I will tell the world that the Referenceable 96 available at FURL is available to provide a service named 97 SERVICE_NAME. The precise definition of the service being provided is 98 identified by the Foolscap 'remote interface name' in the last 99 parameter: this is supposed to be a globally-unique string that 100 identifies the RemoteInterface that is implemented.""" 101 102 def subscribe_to(service_name, callback, *args, **kwargs): 103 """Call this if you will eventually want to use services with the 104 given SERVICE_NAME. This will prompt me to subscribe to announcements 105 of those services. Your callback will be invoked with at least two 106 arguments: a serverid (binary string), and an announcement 107 dictionary, followed by any additional callback args/kwargs you give 108 me. I will run your callback for both new announcements and for 109 announcements that have changed, but you must be prepared to tolerate 110 duplicates. 111 112 The announcement dictionary that I give you will have the following 113 keys: 114 115 version: 0 116 service-name: str('storage') 117 118 FURL: str(furl) 119 remoteinterface-name: str(ri_name) 120 nickname: unicode 121 app-versions: {} 122 my-version: str 123 oldest-supported: str 124 125 Note that app-version will be an empty dictionary until #466 is done 126 and both the introducer and the remote client have been upgraded. For 127 current (native) server types, the serverid will always be equal to 128 the binary form of the FURL's tubid. 129 """ 130 131 def connected_to_introducer(): 132 """Returns a boolean, True if we are currently connected to the 133 introducer, False if not.""" 134 135 136 class IntroducerClient_v1(service.Service, Referenceable): 137 implements(RIIntroducerSubscriberClient_v1, IIntroducerClient) 138 139 def __init__(self, tub, introducer_furl, 140 nickname, my_version, oldest_supported): 141 self._tub = tub 142 self.introducer_furl = introducer_furl 143 144 assert type(nickname) is unicode 145 self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8 146 self._my_version = my_version 147 self._oldest_supported = oldest_supported 148 149 self._published_announcements = set() 150 151 self._publisher = None 152 153 self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples 154 self._subscribed_service_names = set() 155 self._subscriptions = set() # requests we've actually sent 156 157 # _current_announcements remembers one announcement per 158 # (servicename,serverid) pair. Anything that arrives with the same 159 # pair will displace the previous one. This stores unpacked 160 # announcement dictionaries, which can be compared for equality to 161 # distinguish re-announcement from updates. It also provides memory 162 # for clients who subscribe after startup. 163 self._current_announcements = {} 164 165 self.encoding_parameters = None 166 167 # hooks for unit tests 168 self._debug_counts = { 169 "inbound_message": 0, 170 "inbound_announcement": 0, 171 "wrong_service": 0, 172 "duplicate_announcement": 0, 173 "update": 0, 174 "new_announcement": 0, 175 "outbound_message": 0, 176 } 177 self._debug_outstanding = 0 178 179 def _debug_retired(self, res): 180 self._debug_outstanding -= 1 181 return res 182 183 def startService(self): 184 service.Service.startService(self) 185 self._introducer_error = None 186 rc = self._tub.connectTo(self.introducer_furl, self._got_introducer) 187 self._introducer_reconnector = rc 188 def connect_failed(failure): 189 self.log("Initial Introducer connection failed: perhaps it's down", 190 level=log.WEIRD, failure=failure, umid="c5MqUQ") 191 d = self._tub.getReference(self.introducer_furl) 192 d.addErrback(connect_failed) 193 194 def _got_introducer(self, publisher): 195 self.log("connected to introducer, getting versions") 196 default = { "http://allmydata.org/tahoe/protocols/introducer/v1": 197 { }, 198 "application-version": "unknown: no get_version()", 199 } 200 d = rrefutil.add_version_to_remote_reference(publisher, default) 201 d.addCallback(self._got_versioned_introducer) 202 d.addErrback(self._got_error) 203 204 def _got_error(self, f): 205 # TODO: for the introducer, perhaps this should halt the application 206 self._introducer_error = f # polled by tests 207 208 def _got_versioned_introducer(self, publisher): 209 self.log("got introducer version: %s" % (publisher.version,)) 210 # we require a V1 introducer 211 needed = "http://allmydata.org/tahoe/protocols/introducer/v1" 212 if needed not in publisher.version: 213 raise InsufficientVersionError(needed, publisher.version) 214 self._publisher = publisher 215 publisher.notifyOnDisconnect(self._disconnected) 216 self._maybe_publish() 217 self._maybe_subscribe() 218 219 def _disconnected(self): 220 self.log("bummer, we've lost our connection to the introducer") 221 self._publisher = None 222 self._subscriptions.clear() 223 224 def log(self, *args, **kwargs): 225 if "facility" not in kwargs: 226 kwargs["facility"] = "tahoe.introducer" 227 return log.msg(*args, **kwargs) 228 229 230 def publish(self, furl, service_name, remoteinterface_name): 231 assert type(self._nickname_utf8) is str # we always send UTF-8 232 ann = (furl, service_name, remoteinterface_name, 233 self._nickname_utf8, self._my_version, self._oldest_supported) 234 self._published_announcements.add(ann) 235 self._maybe_publish() 236 237 def subscribe_to(self, service_name, cb, *args, **kwargs): 238 self._local_subscribers.append( (service_name,cb,args,kwargs) ) 239 self._subscribed_service_names.add(service_name) 240 self._maybe_subscribe() 241 for (servicename,nodeid),ann_d in self._current_announcements.items(): 242 if servicename == service_name: 243 eventually(cb, nodeid, ann_d) 244 245 def _maybe_subscribe(self): 246 if not self._publisher: 247 self.log("want to subscribe, but no introducer yet", 248 level=log.NOISY) 249 return 250 for service_name in self._subscribed_service_names: 251 if service_name not in self._subscriptions: 252 # there is a race here, but the subscription desk ignores 253 # duplicate requests. 254 self._subscriptions.add(service_name) 255 self._debug_outstanding += 1 256 d = self._publisher.callRemote("subscribe", self, service_name) 257 d.addBoth(self._debug_retired) 258 d.addErrback(rrefutil.trap_deadref) 259 d.addErrback(log.err, format="server errored during subscribe", 260 facility="tahoe.introducer", 261 level=log.WEIRD, umid="2uMScQ") 262 263 def _maybe_publish(self): 264 if not self._publisher: 265 self.log("want to publish, but no introducer yet", level=log.NOISY) 266 return 267 # this re-publishes everything. The Introducer ignores duplicates 268 for ann in self._published_announcements: 269 self._debug_counts["outbound_message"] += 1 270 self._debug_outstanding += 1 271 d = self._publisher.callRemote("publish", ann) 272 d.addBoth(self._debug_retired) 273 d.addErrback(rrefutil.trap_deadref) 274 d.addErrback(log.err, 275 format="server errored during publish %(ann)s", 276 ann=ann, facility="tahoe.introducer", 277 level=log.WEIRD, umid="xs9pVQ") 278 279 280 281 def remote_announce(self, announcements): 282 self.log("received %d announcements" % len(announcements)) 283 self._debug_counts["inbound_message"] += 1 284 for ann in announcements: 285 try: 286 self._process_announcement(ann) 287 except: 288 log.err(format="unable to process announcement %(ann)s", 289 ann=ann) 290 # Don't let a corrupt announcement prevent us from processing 291 # the remaining ones. Don't return an error to the server, 292 # since they'd just ignore it anyways. 293 pass 294 295 def _process_announcement(self, ann): 296 self._debug_counts["inbound_announcement"] += 1 297 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann 298 if service_name not in self._subscribed_service_names: 299 self.log("announcement for a service we don't care about [%s]" 300 % (service_name,), level=log.UNUSUAL, umid="dIpGNA") 301 self._debug_counts["wrong_service"] += 1 302 return 303 self.log("announcement for [%s]: %s" % (service_name, ann), 304 umid="BoKEag") 305 assert type(furl) is str 306 assert type(service_name) is str 307 assert type(ri_name) is str 308 assert type(nickname_utf8) is str 309 nickname = nickname_utf8.decode("utf-8") 310 assert type(nickname) is unicode 311 assert type(ver) is str 312 assert type(oldest) is str 313 314 nodeid = b32decode(SturdyRef(furl).tubID.upper()) 315 nodeid_s = idlib.shortnodeid_b2a(nodeid) 316 317 ann_d = { "version": 0, 318 "service-name": service_name, 319 320 "FURL": furl, 321 "nickname": nickname, 322 "app-versions": {}, # need #466 and v2 introducer 323 "my-version": ver, 324 "oldest-supported": oldest, 325 } 326 327 index = (service_name, nodeid) 328 if self._current_announcements.get(index, None) == ann_d: 329 self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring", 330 service=service_name, nodeid=nodeid_s, 331 level=log.UNUSUAL, umid="B1MIdA") 332 self._debug_counts["duplicate_announcement"] += 1 333 return 334 if index in self._current_announcements: 335 self._debug_counts["update"] += 1 336 else: 337 self._debug_counts["new_announcement"] += 1 338 339 self._current_announcements[index] = ann_d 340 # note: we never forget an index, but we might update its value 341 342 for (service_name2,cb,args,kwargs) in self._local_subscribers: 343 if service_name2 == service_name: 344 eventually(cb, nodeid, ann_d, *args, **kwargs) 345 346 def remote_set_encoding_parameters(self, parameters): 347 self.encoding_parameters = parameters 348 349 def connected_to_introducer(self): 350 return bool(self._publisher) 351 352 class IntroducerService_v1(service.MultiService, Referenceable): 353 implements(RIIntroducerPublisherAndSubscriberService_v1) 354 name = "introducer" 355 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": 356 { }, 357 "application-version": str(allmydata.__full_version__), 358 } 359 360 def __init__(self, basedir="."): 361 service.MultiService.__init__(self) 362 self.introducer_url = None 363 # 'index' is (service_name, tubid) 364 self._announcements = {} # dict of index -> (announcement, timestamp) 365 self._subscribers = {} # dict of (rref->timestamp) dicts 366 self._debug_counts = {"inbound_message": 0, 367 "inbound_duplicate": 0, 368 "inbound_update": 0, 369 "outbound_message": 0, 370 "outbound_announcements": 0, 371 "inbound_subscribe": 0} 372 self._debug_outstanding = 0 373 374 def _debug_retired(self, res): 375 self._debug_outstanding -= 1 376 return res 377 378 def log(self, *args, **kwargs): 379 if "facility" not in kwargs: 380 kwargs["facility"] = "tahoe.introducer" 381 return log.msg(*args, **kwargs) 382 383 def get_announcements(self): 384 return self._announcements 385 def get_subscribers(self): 386 return self._subscribers 387 388 def remote_get_version(self): 389 return self.VERSION 390 391 def remote_publish(self, announcement): 392 try: 393 self._publish(announcement) 394 except: 395 log.err(format="Introducer.remote_publish failed on %(ann)s", 396 ann=announcement, level=log.UNUSUAL, umid="620rWA") 397 raise 398 399 def _publish(self, announcement): 400 self._debug_counts["inbound_message"] += 1 401 self.log("introducer: announcement published: %s" % (announcement,) ) 402 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement 403 #print "PUB", service_name, nickname_utf8 404 405 nodeid = b32decode(SturdyRef(furl).tubID.upper()) 406 index = (service_name, nodeid) 407 408 if index in self._announcements: 409 (old_announcement, timestamp) = self._announcements[index] 410 if old_announcement == announcement: 411 self.log("but we already knew it, ignoring", level=log.NOISY) 412 self._debug_counts["inbound_duplicate"] += 1 413 return 414 else: 415 self.log("old announcement being updated", level=log.NOISY) 416 self._debug_counts["inbound_update"] += 1 417 self._announcements[index] = (announcement, time.time()) 418 419 for s in self._subscribers.get(service_name, []): 420 self._debug_counts["outbound_message"] += 1 421 self._debug_counts["outbound_announcements"] += 1 422 self._debug_outstanding += 1 423 d = s.callRemote("announce", set([announcement])) 424 d.addBoth(self._debug_retired) 425 d.addErrback(rrefutil.trap_deadref) 426 d.addErrback(log.err, 427 format="subscriber errored on announcement %(ann)s", 428 ann=announcement, facility="tahoe.introducer", 429 level=log.UNUSUAL, umid="jfGMXQ") 430 431 def remote_subscribe(self, subscriber, service_name): 432 self.log("introducer: subscription[%s] request at %s" % (service_name, 433 subscriber)) 434 self._debug_counts["inbound_subscribe"] += 1 435 if service_name not in self._subscribers: 436 self._subscribers[service_name] = {} 437 subscribers = self._subscribers[service_name] 438 if subscriber in subscribers: 439 self.log("but they're already subscribed, ignoring", 440 level=log.UNUSUAL) 441 return 442 subscribers[subscriber] = time.time() 443 def _remove(): 444 self.log("introducer: unsubscribing[%s] %s" % (service_name, 445 subscriber)) 446 subscribers.pop(subscriber, None) 447 subscriber.notifyOnDisconnect(_remove) 448 449 announcements = set( 450 [ ann 451 for (sn2,nodeid),(ann,when) in self._announcements.items() 452 if sn2 == service_name] ) 453 454 self._debug_counts["outbound_message"] += 1 455 self._debug_counts["outbound_announcements"] += len(announcements) 456 self._debug_outstanding += 1 457 d = subscriber.callRemote("announce", announcements) 458 d.addBoth(self._debug_retired) 459 d.addErrback(rrefutil.trap_deadref) 460 d.addErrback(log.err, 461 format="subscriber errored during subscribe %(anns)s", 462 anns=announcements, facility="tahoe.introducer", 463 level=log.UNUSUAL, umid="1XChxA") -
src/allmydata/introducer/server.py
diff --git a/src/allmydata/introducer/server.py b/src/allmydata/introducer/server.py index 117fcb5..7868e8f 100644
a b 1 1 2 2 import time, os.path 3 from base64 import b32decode4 3 from zope.interface import implements 5 4 from twisted.application import service 6 from foolscap.api import Referenceable , SturdyRef5 from foolscap.api import Referenceable 7 6 import allmydata 8 7 from allmydata import node 9 from allmydata.util import log, rrefutil8 from allmydata.util import log, base32, idlib 10 9 from allmydata.introducer.interfaces import \ 11 RIIntroducerPublisherAndSubscriberService 10 RIIntroducerPublisherAndSubscriberService_v2 11 from allmydata.introducer.common import convert_announcement_v1_to_v2, \ 12 convert_announcement_v2_to_v1, unsign, make_index 12 13 13 14 class IntroducerNode(node.Node): 14 15 PORTNUMFILE = "introducer.port" … … class IntroducerNode(node.Node): 30 31 def _publish(res): 31 32 self.introducer_url = self.tub.registerReference(introducerservice, 32 33 "introducer") 33 self.log(" introducer is at %s" % self.introducer_url) 34 self.log(" introducer is at %s" % self.introducer_url, 35 umid="qF2L9A") 34 36 self.write_config("introducer.furl", self.introducer_url + "\n") 35 37 d.addCallback(_publish) 36 38 d.addErrback(log.err, facility="tahoe.init", 37 39 level=log.BAD, umid="UaNs9A") 38 40 39 41 def init_web(self, webport): 40 self.log("init_web(webport=%s)", args=(webport,) )42 self.log("init_web(webport=%s)", args=(webport,), umid="2bUygA") 41 43 42 44 from allmydata.webish import IntroducerWebishServer 43 45 nodeurl_path = os.path.join(self.basedir, "node.url") 44 46 ws = IntroducerWebishServer(self, webport, nodeurl_path) 45 47 self.add_service(ws) 46 48 49 class SubscriberAdapter_v1: # for_v1 50 """I wrap a RemoteReference that points at an old v1 subscriber, enabling 51 it to be treated like a v2 subscriber. 52 """ 53 54 def __init__(self, original): 55 self.original = original 56 def __eq__(self, them): 57 return self.original == them 58 def __ne__(self, them): 59 return self.original != them 60 def __hash__(self): 61 return hash(self.original) 62 def getRemoteTubID(self): 63 return self.original.getRemoteTubID() 64 def getSturdyRef(self): 65 return self.original.getSturdyRef() 66 def getPeer(self): 67 return self.original.getPeer() 68 def callRemote(self, methname, *args, **kwargs): 69 m = getattr(self, "wrap_" + methname) 70 return m(*args, **kwargs) 71 def wrap_announce_v2(self, announcements): 72 anns_v1 = [convert_announcement_v2_to_v1(ann) for ann in announcements] 73 return self.original.callRemote("announce", set(anns_v1)) 74 def wrap_set_encoding_parameters(self, parameters): 75 return self.original.callRemote("set_encoding_parameters", parameters) 76 def notifyOnDisconnect(self, *args, **kwargs): 77 return self.original.notifyOnDisconnect(*args, **kwargs) 78 47 79 class IntroducerService(service.MultiService, Referenceable): 48 implements(RIIntroducerPublisherAndSubscriberService )80 implements(RIIntroducerPublisherAndSubscriberService_v2) 49 81 name = "introducer" 50 82 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v1": 51 83 { }, … … class IntroducerService(service.MultiService, Referenceable): 56 88 service.MultiService.__init__(self) 57 89 self.introducer_url = None 58 90 # 'index' is (service_name, tubid) 59 self._announcements = {} # dict of index -> (announcement, timestamp) 60 self._subscribers = {} # dict of (rref->timestamp) dicts 91 self._announcements = {} # dict of index -> 92 # (ann_s, canary, ann_d, timestamp) 93 94 # ann_d is cleaned up (nickname is always unicode, servicename is 95 # always ascii, etc, even though simplejson.loads sometimes returns 96 # either) 97 98 # self._subscribers is a dict mapping servicename to subscriptions 99 # 'subscriptions' is a dict mapping rref to a subscription 100 # 'subscription' is a tuple of (subscriber_info, timestamp) 101 # 'subscriber_info' is a dict, provided directly for v2 clients, or 102 # synthesized for v1 clients. The expected keys are: 103 # version, nickname, app-versions, my-version, oldest-supported 104 self._subscribers = {} 105 106 # self._stub_client_announcements contains the information provided 107 # by v1 clients. We stash this so we can match it up with their 108 # subscriptions. 109 self._stub_client_announcements = {} # maps tubid to sinfo # for_v1 110 61 111 self._debug_counts = {"inbound_message": 0, 62 112 "inbound_duplicate": 0, 63 113 "inbound_update": 0, 64 114 "outbound_message": 0, 65 115 "outbound_announcements": 0, 66 116 "inbound_subscribe": 0} 117 self._debug_outstanding = 0 # also covers SubscriberAdapter_v1 118 119 def _debug_retired(self, res): 120 self._debug_outstanding -= 1 121 return res 67 122 68 123 def log(self, *args, **kwargs): 69 124 if "facility" not in kwargs: 70 kwargs["facility"] = "tahoe.introducer "125 kwargs["facility"] = "tahoe.introducer.server" 71 126 return log.msg(*args, **kwargs) 72 127 73 128 def get_announcements(self): 74 129 return self._announcements 75 130 def get_subscribers(self): 76 return self._subscribers 131 """Return a list of (service_name, when, subscriber_info, rref) for 132 all subscribers. subscriber_info is a dict with the following keys: 133 version, nickname, app-versions, my-version, oldest-supported""" 134 s = [] 135 for service_name, subscriptions in self._subscribers.items(): 136 for rref,(subscriber_info,when) in subscriptions.items(): 137 s.append( (service_name, when, subscriber_info, rref) ) 138 return s 77 139 78 140 def remote_get_version(self): 79 141 return self.VERSION 80 142 81 def remote_publish(self, announcement): 143 def remote_publish(self, ann_s): # for_v1 144 lp = self.log("introducer: old (v1) announcement published: %s" 145 % (ann_s,), umid="6zGOIw") 146 ann_v2 = convert_announcement_v1_to_v2(ann_s) 147 return self.publish(ann_v2, None, lp) 148 149 def remote_publish_v2(self, ann_s, canary): 150 lp = self.log("introducer: announcement (v2) published", umid="L2QXkQ") 151 return self.publish(ann_s, canary, lp) 152 153 def publish(self, ann_s, canary, lp): 82 154 try: 83 self._publish(ann ouncement)155 self._publish(ann_s, canary, lp) 84 156 except: 85 157 log.err(format="Introducer.remote_publish failed on %(ann)s", 86 ann=announcement, level=log.UNUSUAL, umid="620rWA") 158 ann=ann_s, 159 level=log.UNUSUAL, parent=lp, umid="620rWA") 87 160 raise 88 161 89 def _publish(self, ann ouncement):162 def _publish(self, ann_s, canary, lp): 90 163 self._debug_counts["inbound_message"] += 1 91 self.log("introducer: announcement published: %s" % (announcement,) ) 92 (furl, service_name, ri_name, nickname_utf8, ver, oldest) = announcement 164 self.log("introducer: announcement published: %s" % (ann_s,), 165 umid="wKHgCw") 166 ann_d, key = unsign(ann_s) # might raise BadSignatureError 167 index = make_index(ann_d, key) 93 168 94 nodeid = b32decode(SturdyRef(furl).tubID.upper()) 95 index = (service_name, nodeid) 169 service_name = str(ann_d["service-name"]) 170 if service_name == "stub_client": # for_v1 171 self._attach_stub_client(ann_d, index, lp) 172 return 96 173 97 174 if index in self._announcements: 98 (old_announcement, timestamp) = self._announcements[index] 99 if old_announcement == announcement: 100 self.log("but we already knew it, ignoring", level=log.NOISY) 175 (old_ann_s, canary, ann_d, timestamp) = self._announcements[index] 176 if old_ann_s == ann_s: 177 self.log("but we already knew it, ignoring", level=log.NOISY, 178 umid="myxzLw") 101 179 self._debug_counts["inbound_duplicate"] += 1 102 180 return 103 181 else: 104 self.log("old announcement being updated", level=log.NOISY) 182 self.log("old announcement being updated", level=log.NOISY, 183 umid="304r9g") 105 184 self._debug_counts["inbound_update"] += 1 106 self._announcements[index] = (announcement, time.time()) 185 self._announcements[index] = (ann_s, canary, ann_d, time.time()) 186 #if canary: 187 # canary.notifyOnDisconnect ... 188 # use a CanaryWatcher? with cw.is_connected()? 189 # actually we just want foolscap to give rref.is_connected(), since 190 # this is only for the status display 107 191 108 192 for s in self._subscribers.get(service_name, []): 109 193 self._debug_counts["outbound_message"] += 1 110 194 self._debug_counts["outbound_announcements"] += 1 111 d = s.callRemote("announce", set([announcement])) 112 d.addErrback(rrefutil.trap_deadref) 195 self._debug_outstanding += 1 196 d = s.callRemote("announce_v2", set([ann_s])) 197 d.addBoth(self._debug_retired) 113 198 d.addErrback(log.err, 114 199 format="subscriber errored on announcement %(ann)s", 115 ann=ann ouncement, facility="tahoe.introducer",200 ann=ann_s, facility="tahoe.introducer", 116 201 level=log.UNUSUAL, umid="jfGMXQ") 117 202 118 def remote_subscribe(self, subscriber, service_name): 119 self.log("introducer: subscription[%s] request at %s" % (service_name, 120 subscriber)) 203 def _attach_stub_client(self, ann_d, index, lp): 204 # There might be a v1 subscriber for whom this is a stub_client. 205 # We might have received the subscription before the stub_client 206 # announcement, in which case we now need to fix up the record in 207 # self._subscriptions . 208 209 # record it for later, in case the stub_client arrived before the 210 # subscription 211 subscriber_info = self._get_subscriber_info_from_ann_d(ann_d) 212 ann_tubid = index[1] 213 self._stub_client_announcements[ann_tubid] = subscriber_info 214 215 lp2 = self.log("stub_client announcement, " 216 "looking for matching subscriber", 217 parent=lp, level=log.NOISY, umid="BTywDg") 218 219 for sn in self._subscribers: 220 s = self._subscribers[sn] 221 for (subscriber, info) in s.items(): 222 # we correlate these by looking for a subscriber whose tubid 223 # matches this announcement 224 sub_tubid = base32.a2b(subscriber.getRemoteTubID()) # binary 225 if sub_tubid == ann_tubid: 226 self.log(format="found a match, nodeid=%(nodeid)s", 227 nodeid=idlib.nodeid_b2a(sub_tubid), 228 level=log.NOISY, parent=lp2, umid="xsWs1A") 229 # found a match. Does it need info? 230 if not info[0]: 231 self.log(format="replacing info", 232 level=log.NOISY, parent=lp2, umid="m5kxwA") 233 # yup 234 s[subscriber] = (subscriber_info, info[1]) 235 # and we don't remember or announce stub_clients beyond what we 236 # need to get the subscriber_info set up 237 238 def _get_subscriber_info_from_ann_d(self, ann_d): # for_v1 239 sinfo = { "version": ann_d["version"], 240 "nickname": ann_d["nickname"], 241 "app-versions": ann_d["app-versions"], 242 "my-version": ann_d["my-version"], 243 "oldest-supported": ann_d["oldest-supported"], 244 } 245 return sinfo 246 247 def remote_subscribe(self, subscriber, service_name): # for_v1 248 self.log("introducer: old (v1) subscription[%s] request at %s" 249 % (service_name, subscriber), umid="hJlGUg") 250 return self.add_subscriber(SubscriberAdapter_v1(subscriber), 251 service_name, None) 252 253 def remote_subscribe_v2(self, subscriber, service_name, subscriber_info): 254 self.log("introducer: subscription[%s] request at %s" 255 % (service_name, subscriber), umid="U3uzLg") 256 return self.add_subscriber(subscriber, service_name, subscriber_info) 257 258 def add_subscriber(self, subscriber, service_name, subscriber_info): 121 259 self._debug_counts["inbound_subscribe"] += 1 122 260 if service_name not in self._subscribers: 123 261 self._subscribers[service_name] = {} 124 262 subscribers = self._subscribers[service_name] 125 263 if subscriber in subscribers: 126 264 self.log("but they're already subscribed, ignoring", 127 level=log.UNUSUAL )265 level=log.UNUSUAL, umid="Sy9EfA") 128 266 return 129 subscribers[subscriber] = time.time() 267 268 if not subscriber_info: # for_v1 269 # v1 clients don't provide subscriber_info, but they should 270 # publish a 'stub client' record which contains the same 271 # information. If we've already received this, it will be in 272 # self._stub_client_announcements 273 tubid_b32 = subscriber.getRemoteTubID() 274 tubid = base32.a2b(tubid_b32) 275 if tubid in self._stub_client_announcements: 276 subscriber_info = self._stub_client_announcements[tubid] 277 278 subscribers[subscriber] = (subscriber_info, time.time()) 130 279 def _remove(): 131 280 self.log("introducer: unsubscribing[%s] %s" % (service_name, 132 subscriber)) 281 subscriber), 282 umid="vYGcJg") 133 283 subscribers.pop(subscriber, None) 134 284 subscriber.notifyOnDisconnect(_remove) 135 285 136 announcements = set( 137 [ ann 138 for (sn2,nodeid),(ann,when) in self._announcements.items() 139 if sn2 == service_name] ) 140 141 self._debug_counts["outbound_message"] += 1 142 self._debug_counts["outbound_announcements"] += len(announcements) 143 d = subscriber.callRemote("announce", announcements) 144 d.addErrback(rrefutil.trap_deadref) 145 d.addErrback(log.err, 146 format="subscriber errored during subscribe %(anns)s", 147 anns=announcements, facility="tahoe.introducer", 148 level=log.UNUSUAL, umid="mtZepQ") 286 # now tell them about any announcements they're interested in 287 announcements = set( [ ann_s 288 for idx,(ann_s,canary,ann_d,when) 289 in self._announcements.items() 290 if idx[0] == service_name] ) 291 if announcements: 292 self._debug_counts["outbound_message"] += 1 293 self._debug_counts["outbound_announcements"] += len(announcements) 294 self._debug_outstanding += 1 295 d = subscriber.callRemote("announce_v2", announcements) 296 d.addBoth(self._debug_retired) 297 d.addErrback(log.err, 298 format="subscriber errored during subscribe %(anns)s", 299 anns=announcements, facility="tahoe.introducer", 300 level=log.UNUSUAL, umid="mtZepQ") 301 return d -
src/allmydata/test/test_introducer.py
diff --git a/src/allmydata/test/test_introducer.py b/src/allmydata/test/test_introducer.py index 9d0f50e..69d7f1d 100644
a b from twisted.python import log 9 9 from foolscap.api import Tub, Referenceable, fireEventually, flushEventualQueue 10 10 from twisted.application import service 11 11 from allmydata.interfaces import InsufficientVersionError 12 from allmydata.introducer.client import IntroducerClient 12 from allmydata.introducer.client import IntroducerClient, ClientAdapter_v1 13 13 from allmydata.introducer.server import IntroducerService 14 from allmydata.introducer import old 14 15 # test compatibility with old introducer .tac files 15 16 from allmydata.introducer import IntroducerNode 16 from allmydata.util import pollmixin 17 from allmydata.util import pollmixin, ecdsa 17 18 import allmydata.test.common_util as testutil 18 19 19 20 class LoggingMultiService(service.MultiService): … … class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): 47 48 48 49 def test_create(self): 49 50 ic = IntroducerClient(None, "introducer.furl", u"my_nickname", 50 "my_version", "oldest_version" )51 "my_version", "oldest_version", {}) 51 52 self.failUnless(isinstance(ic, IntroducerClient)) 52 53 53 54 def test_listen(self): 54 55 i = IntroducerService() 55 56 i.setServiceParent(self.parent) 56 57 57 def test_duplicate (self):58 def test_duplicate_publish(self): 58 59 i = IntroducerService() 59 60 self.failUnlessEqual(len(i.get_announcements()), 0) 60 61 self.failUnlessEqual(len(i.get_subscribers()), 0) … … class Introducer(ServiceMixin, unittest.TestCase, pollmixin.PollMixin): 73 74 self.failUnlessEqual(len(i.get_announcements()), 2) 74 75 self.failUnlessEqual(len(i.get_subscribers()), 0) 75 76 77 78 79 class Client(unittest.TestCase): 80 def test_duplicate_receive_v1(self): 81 ic = IntroducerClient(None, 82 "introducer.furl", u"my_nickname", 83 "my_version", "oldest_version", {}) 84 announcements = [] 85 ic.subscribe_to("storage", 86 lambda nodeid,ann_d: announcements.append(ann_d)) 87 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra" 88 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0") 89 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0") 90 ca = ClientAdapter_v1(ic) 91 92 ca.remote_announce([ann1]) 93 d = fireEventually() 94 def _then(ign): 95 self.failUnlessEqual(len(announcements), 1) 96 self.failUnlessEqual(announcements[0]["nickname"], u"nick1") 97 self.failUnlessEqual(announcements[0]["my-version"], "ver23") 98 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1) 99 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) 100 self.failUnlessEqual(ic._debug_counts["update"], 0) 101 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0) 102 # now send a duplicate announcement: this should not notify clients 103 ca.remote_announce([ann1]) 104 return fireEventually() 105 d.addCallback(_then) 106 def _then2(ign): 107 self.failUnlessEqual(len(announcements), 1) 108 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2) 109 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) 110 self.failUnlessEqual(ic._debug_counts["update"], 0) 111 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1) 112 # and a replacement announcement: same FURL, new other stuff. 113 # Clients should be notified. 114 ca.remote_announce([ann1b]) 115 return fireEventually() 116 d.addCallback(_then2) 117 def _then3(ign): 118 self.failUnlessEqual(len(announcements), 2) 119 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3) 120 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1) 121 self.failUnlessEqual(ic._debug_counts["update"], 1) 122 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1) 123 # test that the other stuff changed 124 self.failUnlessEqual(announcements[-1]["nickname"], u"nick1") 125 self.failUnlessEqual(announcements[-1]["my-version"], "ver24") 126 d.addCallback(_then3) 127 return d 128 129 def test_duplicate_receive_v2(self): 130 ic1 = IntroducerClient(None, 131 "introducer.furl", u"my_nickname", 132 "ver23", "oldest_version", {}) 133 # we use a second client just to create a different-looking 134 # announcement 135 ic2 = IntroducerClient(None, 136 "introducer.furl", u"my_nickname", 137 "ver24","oldest_version",{}) 138 announcements = [] 139 def _received(nodeid, ann_d): 140 announcements.append( (nodeid, ann_d) ) 141 ic1.subscribe_to("storage", _received) 142 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnp" 143 furl1a = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:7777/gydnp" 144 furl2 = "pb://ttwwooyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/ttwwoo" 145 146 privkey = ecdsa.SigningKey.generate() 147 pubkey = privkey.get_verifying_key() 148 pubkey_hex = pubkey.to_string().encode("hex") 149 150 # ann1: ic1, furl1 151 # ann1a: ic1, furl1a (same SturdyRef, different connection hints) 152 # ann1b: ic2, furl1 153 # ann2: ic2, furl2 154 155 self.ann1 = ic1.create_announcement(furl1, "storage", "RIStorage", 156 privkey) 157 self.ann1a = ic1.create_announcement(furl1a, "storage", "RIStorage", 158 privkey) 159 self.ann1b = ic2.create_announcement(furl1, "storage", "RIStorage", 160 privkey) 161 self.ann2 = ic2.create_announcement(furl2, "storage", "RIStorage", 162 privkey) 163 164 ic1.remote_announce_v2([self.ann1]) # queues eventual-send 165 d = fireEventually() 166 def _then1(ign): 167 self.failUnlessEqual(len(announcements), 1) 168 nodeid,ann_d = announcements[0] 169 self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex) 170 self.failUnlessEqual(ann_d["FURL"], furl1) 171 self.failUnlessEqual(ann_d["my-version"], "ver23") 172 d.addCallback(_then1) 173 174 # now send a duplicate announcement. This should not fire the 175 # subscriber 176 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1])) 177 d.addCallback(fireEventually) 178 def _then2(ign): 179 self.failUnlessEqual(len(announcements), 1) 180 d.addCallback(_then2) 181 182 # and a replacement announcement: same FURL, new other stuff. The 183 # subscriber *should* be fired. 184 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1b])) 185 d.addCallback(fireEventually) 186 def _then3(ign): 187 self.failUnlessEqual(len(announcements), 2) 188 nodeid,ann_d = announcements[-1] 189 self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex) 190 self.failUnlessEqual(ann_d["FURL"], furl1) 191 self.failUnlessEqual(ann_d["my-version"], "ver24") 192 d.addCallback(_then3) 193 194 # and a replacement announcement with a different FURL (it uses 195 # different connection hints) 196 d.addCallback(lambda ign: ic1.remote_announce_v2([self.ann1a])) 197 d.addCallback(fireEventually) 198 def _then4(ign): 199 self.failUnlessEqual(len(announcements), 3) 200 nodeid,ann_d = announcements[-1] 201 self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex) 202 self.failUnlessEqual(ann_d["FURL"], furl1a) 203 self.failUnlessEqual(ann_d["my-version"], "ver23") 204 d.addCallback(_then4) 205 206 # now add a new subscription, which should be called with the 207 # backlog. The introducer only records one announcement per index, so 208 # the backlog will only have the latest message. 209 announcements2 = [] 210 def _received2(nodeid, ann_d): 211 announcements2.append( (nodeid, ann_d) ) 212 d.addCallback(lambda ign: ic1.subscribe_to("storage", _received2)) 213 d.addCallback(fireEventually) 214 def _then5(ign): 215 self.failUnlessEqual(len(announcements2), 1) 216 nodeid,ann_d = announcements2[-1] 217 self.failUnlessEqual(nodeid.encode("hex"), pubkey_hex) 218 self.failUnlessEqual(ann_d["FURL"], furl1a) 219 self.failUnlessEqual(ann_d["my-version"], "ver23") 220 d.addCallback(_then5) 221 return d 222 76 223 class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): 77 224 78 225 def create_tub(self, portnum=0): … … class SystemTestMixin(ServiceMixin, pollmixin.PollMixin): 88 235 assert self.central_portnum == portnum 89 236 tub.setLocation("localhost:%d" % self.central_portnum) 90 237 238 V1 = "v1"; V2 = "v2" 91 239 class SystemTest(SystemTestMixin, unittest.TestCase): 92 240 93 def test_system(self): 94 self.basedir = "introducer/SystemTest/system" 95 os.makedirs(self.basedir) 96 return self.do_system_test(IntroducerService) 97 test_system.timeout = 480 # occasionally takes longer than 350s on "draco" 98 99 def do_system_test(self, create_introducer): 241 def do_system_test(self, server_version): 100 242 self.create_tub() 101 introducer = create_introducer() 243 if server_version == V1: 244 introducer = old.IntroducerService_v1() 245 else: 246 introducer = IntroducerService() 102 247 introducer.setServiceParent(self.parent) 103 248 iff = os.path.join(self.basedir, "introducer.furl") 104 249 tub = self.central_tub 105 250 ifurl = self.central_tub.registerReference(introducer, furlFile=iff) 106 251 self.introducer_furl = ifurl 107 252 108 NUMCLIENTS = 5109 # we have 5 clients who publish themselves, and an extra one does110 # which not. When the connections are fully established, all six nodes253 # we have 5 clients who publish themselves as storage servers, and a 254 # sixth which does which not. All 6 clients subscriber to hear about 255 # storage. When the connections are fully established, all six nodes 111 256 # should have 5 connections each. 257 NUM_STORAGE = 5 258 NUM_CLIENTS = 6 112 259 113 260 clients = [] 114 261 tubs = {} 115 262 received_announcements = {} 116 NUM_SERVERS = NUMCLIENTS117 263 subscribing_clients = [] 118 264 publishing_clients = [] 265 privkeys = {} 266 expected_announcements = [0 for c in range(NUM_CLIENTS)] 119 267 120 for i in range(NUM CLIENTS+1):268 for i in range(NUM_CLIENTS): 121 269 tub = Tub() 122 270 #tub.setOption("logLocalFailures", True) 123 271 #tub.setOption("logRemoteFailures", True) … … class SystemTest(SystemTestMixin, unittest.TestCase): 128 276 tub.setLocation("localhost:%d" % portnum) 129 277 130 278 log.msg("creating client %d: %s" % (i, tub.getShortTubID())) 131 c = IntroducerClient(tub, self.introducer_furl, u"nickname-%d" % i, 132 "version", "oldest") 279 if i == 0: 280 c = old.IntroducerClient_v1(tub, self.introducer_furl, 281 u"nickname-%d" % i, 282 "version", "oldest") 283 else: 284 c = IntroducerClient(tub, self.introducer_furl, 285 u"nickname-%d" % i, 286 "version", "oldest", 287 {"component": "component-v1"}) 133 288 received_announcements[c] = {} 134 289 def got(serverid, ann_d, announcements): 135 290 announcements[serverid] = ann_d 136 291 c.subscribe_to("storage", got, received_announcements[c]) 137 292 subscribing_clients.append(c) 138 139 if i < NUMCLIENTS: 140 node_furl = tub.registerReference(Referenceable()) 141 c.publish(node_furl, "storage", "ri_name") 293 expected_announcements[i] += 1 # all expect a 'storage' announcement 294 295 node_furl = tub.registerReference(Referenceable()) 296 if i < NUM_STORAGE: 297 if i == 1: 298 # sign the announcement 299 privkey = privkeys[c] = ecdsa.SigningKey.generate() 300 c.publish(node_furl, "storage", "ri_name", privkey) 301 else: 302 c.publish(node_furl, "storage", "ri_name") 142 303 publishing_clients.append(c) 143 # the last one does not publish anything 304 else: 305 # the last one does not publish anything 306 pass 307 308 if i == 0: 309 # users of the V1 client were required to publish a 310 # 'stub_client' record (somewhat after they published the 311 # 'storage' record), so the introducer could see their 312 # version. Match that behavior. 313 c.publish(node_furl, "stub_client", "stub_ri_name") 314 315 if i == 2: 316 # also publish something that nobody cares about 317 boring_furl = tub.registerReference(Referenceable()) 318 c.publish(boring_furl, "boring", "ri_name") 144 319 145 320 c.setServiceParent(self.parent) 146 321 clients.append(c) 147 322 tubs[c] = tub 148 323 149 def _wait_for_all_connections(): 150 for c in subscribing_clients: 151 if len(received_announcements[c]) < NUM_SERVERS: 324 325 def _wait_for_connected(ign): 326 def _connected(): 327 for c in clients: 328 if not c.connected_to_introducer(): 329 return False 330 return True 331 return self.poll(_connected) 332 333 # we watch the clients to determine when the system has settled down. 334 # Then we can look inside the server to assert things about its 335 # state. 336 337 def _wait_for_expected_announcements(ign): 338 def _got_expected_announcements(): 339 for i,c in enumerate(subscribing_clients): 340 if len(received_announcements[c]) < expected_announcements[i]: 341 return False 342 return True 343 return self.poll(_got_expected_announcements) 344 345 # before shutting down any Tub, we'd like to know that there are no 346 # messages outstanding 347 348 def _wait_until_idle(ign): 349 def _idle(): 350 for c in subscribing_clients + publishing_clients: 351 if c._debug_outstanding: 352 return False 353 if introducer._debug_outstanding: 152 354 return False 153 return True 154 d = self.poll(_wait_for_all_connections) 355 return True 356 return self.poll(_idle) 357 358 d = defer.succeed(None) 359 d.addCallback(_wait_for_connected) 360 d.addCallback(_wait_for_expected_announcements) 361 d.addCallback(_wait_until_idle) 155 362 156 363 def _check1(res): 157 364 log.msg("doing _check1") 158 365 dc = introducer._debug_counts 159 self.failUnlessEqual(dc["inbound_message"], NUM_SERVERS) 160 self.failUnlessEqual(dc["inbound_duplicate"], 0) 366 if server_version == V1: 367 # each storage server publishes a record, and (after its 368 # 'subscribe' has been ACKed) also publishes a "stub_client". 369 # The non-storage client (which subscribes) also publishes a 370 # stub_client. There is also one "boring" service. The number 371 # of messages is higher, because the stub_clients aren't 372 # published until after we get the 'subscribe' ack (since we 373 # don't realize that we're dealing with a v1 server [which 374 # needs stub_clients] until then), and the act of publishing 375 # the stub_client causes us to re-send all previous 376 # announcements. 377 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"], 378 NUM_STORAGE + NUM_CLIENTS + 1) 379 else: 380 # each storage server publishes a record. There is also one 381 # "stub_client" and one "boring" 382 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2) 383 self.failUnlessEqual(dc["inbound_duplicate"], 0) 161 384 self.failUnlessEqual(dc["inbound_update"], 0) 162 self.failUnless(dc["outbound_message"]) 385 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) 386 # the number of outbound messages is tricky.. I think it depends 387 # upon a race between the publish and the subscribe messages. 388 self.failUnless(dc["outbound_message"] > 0) 389 # each client subscribes to "storage", and each server publishes 390 self.failUnlessEqual(dc["outbound_announcements"], 391 NUM_STORAGE*NUM_CLIENTS) 163 392 164 for c in clients:165 self.failUnless(c.connected_to_introducer())166 393 for c in subscribing_clients: 167 394 cdc = c._debug_counts 168 395 self.failUnless(cdc["inbound_message"]) 169 396 self.failUnlessEqual(cdc["inbound_announcement"], 170 NUM_S ERVERS)397 NUM_STORAGE) 171 398 self.failUnlessEqual(cdc["wrong_service"], 0) 172 399 self.failUnlessEqual(cdc["duplicate_announcement"], 0) 173 400 self.failUnlessEqual(cdc["update"], 0) 174 401 self.failUnlessEqual(cdc["new_announcement"], 175 NUM_S ERVERS)402 NUM_STORAGE) 176 403 anns = received_announcements[c] 177 self.failUnlessEqual(len(anns), NUM_S ERVERS)404 self.failUnlessEqual(len(anns), NUM_STORAGE) 178 405 179 406 nodeid0 = b32decode(tubs[clients[0]].tubID.upper()) 180 407 ann_d = anns[nodeid0] 181 408 nick = ann_d["nickname"] 182 409 self.failUnlessEqual(type(nick), unicode) 183 410 self.failUnlessEqual(nick, u"nickname-0") 184 for c in publishing_clients: 185 cdc = c._debug_counts 186 self.failUnlessEqual(cdc["outbound_message"], 1) 411 if server_version == V1: 412 for c in publishing_clients: 413 cdc = c._debug_counts 414 expected = 1 # storage 415 if c is clients[2]: 416 expected += 1 # boring 417 if c is not clients[0]: 418 # the v2 client tries to call publish_v2, which fails 419 # because the server is v1. It then re-sends 420 # everything it has so far, plus a stub_client record 421 expected = 2*expected + 1 422 if c is clients[0]: 423 # we always tell v1 client to send stub_client 424 expected += 1 425 self.failUnlessEqual(cdc["outbound_message"], expected) 426 else: 427 for c in publishing_clients: 428 cdc = c._debug_counts 429 expected = 1 430 if c in [clients[0], # stub_client 431 clients[2], # boring 432 ]: 433 expected = 2 434 self.failUnlessEqual(cdc["outbound_message"], expected) 435 log.msg("_check1 done") 187 436 d.addCallback(_check1) 188 437 189 438 # force an introducer reconnect, by shutting down the Tub it's using … … class SystemTest(SystemTestMixin, unittest.TestCase): 196 445 d.addCallback(lambda _ign: log.msg("shutting down introducer's Tub")) 197 446 d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) 198 447 199 def _wait_for_introducer_loss(): 200 for c in clients: 201 if c.connected_to_introducer(): 202 return False 203 return True 204 d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) 448 def _wait_for_introducer_loss(ign): 449 def _introducer_lost(): 450 for c in clients: 451 if c.connected_to_introducer(): 452 return False 453 return True 454 return self.poll(_introducer_lost) 455 d.addCallback(_wait_for_introducer_loss) 205 456 206 457 def _restart_introducer_tub(_ign): 207 458 log.msg("restarting introducer's Tub") 208 209 dc = introducer._debug_counts 210 self.expected_count = dc["inbound_message"] + NUM_SERVERS 211 self.expected_subscribe_count = dc["inbound_subscribe"] + NUMCLIENTS+1 212 introducer._debug0 = dc["outbound_message"] 213 for c in subscribing_clients: 214 cdc = c._debug_counts 215 c._debug0 = cdc["inbound_message"] 216 459 # reset counters 460 for i in range(NUM_CLIENTS): 461 c = subscribing_clients[i] 462 for k in c._debug_counts: 463 c._debug_counts[k] = 0 464 for k in introducer._debug_counts: 465 introducer._debug_counts[k] = 0 466 expected_announcements[i] += 1 # new 'storage' for everyone 217 467 self.create_tub(self.central_portnum) 218 468 newfurl = self.central_tub.registerReference(introducer, 219 469 furlFile=iff) 220 470 assert newfurl == self.introducer_furl 221 471 d.addCallback(_restart_introducer_tub) 222 472 223 def _wait_for_introducer_reconnect(): 224 # wait until: 225 # all clients are connected 226 # the introducer has received publish messages from all of them 227 # the introducer has received subscribe messages from all of them 228 # the introducer has sent (duplicate) announcements to all of them 229 # all clients have received (duplicate) announcements 230 dc = introducer._debug_counts 231 for c in clients: 232 if not c.connected_to_introducer(): 233 return False 234 if dc["inbound_message"] < self.expected_count: 235 return False 236 if dc["inbound_subscribe"] < self.expected_subscribe_count: 237 return False 238 for c in subscribing_clients: 239 cdc = c._debug_counts 240 if cdc["inbound_message"] < c._debug0+1: 241 return False 242 return True 243 d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect)) 473 d.addCallback(_wait_for_connected) 474 d.addCallback(_wait_for_expected_announcements) 475 d.addCallback(_wait_until_idle) 476 d.addCallback(lambda _ign: log.msg(" reconnected")) 244 477 245 478 def _check2(res): 246 479 log.msg("doing _check2") 247 480 # assert that the introducer sent out new messages, one per 248 481 # subscriber 249 482 dc = introducer._debug_counts 250 self.failUnlessEqual(dc["inbound_message"], 2*NUM_SERVERS) 251 self.failUnlessEqual(dc["inbound_duplicate"], NUM_SERVERS) 252 self.failUnlessEqual(dc["inbound_update"], 0) 253 self.failUnlessEqual(dc["outbound_message"], 254 introducer._debug0 + len(subscribing_clients)) 255 for c in clients: 256 self.failUnless(c.connected_to_introducer()) 483 self.failUnlessEqual(dc["outbound_announcements"], 484 NUM_STORAGE*NUM_CLIENTS) 485 self.failUnless(dc["outbound_message"] > 0) 486 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) 257 487 for c in subscribing_clients: 258 488 cdc = c._debug_counts 259 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_SERVERS) 489 self.failUnlessEqual(cdc["inbound_message"], 1) 490 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE) 491 self.failUnlessEqual(cdc["new_announcement"], 0) 492 self.failUnlessEqual(cdc["wrong_service"], 0) 493 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE) 260 494 d.addCallback(_check2) 261 495 262 496 # Then force an introducer restart, by shutting down the Tub, … … class SystemTest(SystemTestMixin, unittest.TestCase): 267 501 268 502 d.addCallback(lambda _ign: log.msg("shutting down introducer")) 269 503 d.addCallback(lambda _ign: self.central_tub.disownServiceParent()) 270 d.addCallback(lambda res: self.poll(_wait_for_introducer_loss)) 504 d.addCallback(_wait_for_introducer_loss) 505 d.addCallback(lambda _ign: log.msg("introducer lost")) 271 506 272 507 def _restart_introducer(_ign): 273 508 log.msg("restarting introducer") 274 509 self.create_tub(self.central_portnum) 275 276 for c in subscribing_clients: 277 # record some counters for later comparison. Stash the values 278 # on the client itself, because I'm lazy. 279 cdc = c._debug_counts 280 c._debug1 = cdc["inbound_announcement"] 281 c._debug2 = cdc["inbound_message"] 282 c._debug3 = cdc["new_announcement"] 283 newintroducer = create_introducer() 284 self.expected_message_count = NUM_SERVERS 285 self.expected_announcement_count = NUM_SERVERS*len(subscribing_clients) 286 self.expected_subscribe_count = len(subscribing_clients) 287 newfurl = self.central_tub.registerReference(newintroducer, 510 # reset counters 511 for i in range(NUM_CLIENTS): 512 c = subscribing_clients[i] 513 for k in c._debug_counts: 514 c._debug_counts[k] = 0 515 expected_announcements[i] += 1 # new 'storage' for everyone 516 if server_version == V1: 517 introducer = old.IntroducerService_v1() 518 else: 519 introducer = IntroducerService() 520 newfurl = self.central_tub.registerReference(introducer, 288 521 furlFile=iff) 289 522 assert newfurl == self.introducer_furl 290 523 d.addCallback(_restart_introducer) 291 def _wait_for_introducer_reconnect2(): 292 # wait until: 293 # all clients are connected 294 # the introducer has received publish messages from all of them 295 # the introducer has received subscribe messages from all of them 296 # the introducer has sent announcements for everybody to everybody 297 # all clients have received all the (duplicate) announcements 298 # at that point, the system should be quiescent 299 dc = introducer._debug_counts 300 for c in clients: 301 if not c.connected_to_introducer(): 302 return False 303 if dc["inbound_message"] < self.expected_message_count: 304 return False 305 if dc["outbound_announcements"] < self.expected_announcement_count: 306 return False 307 if dc["inbound_subscribe"] < self.expected_subscribe_count: 308 return False 309 for c in subscribing_clients: 310 cdc = c._debug_counts 311 if cdc["inbound_announcement"] < c._debug1+NUM_SERVERS: 312 return False 313 return True 314 d.addCallback(lambda res: self.poll(_wait_for_introducer_reconnect2)) 524 525 d.addCallback(_wait_for_connected) 526 d.addCallback(_wait_for_expected_announcements) 527 d.addCallback(_wait_until_idle) 315 528 316 529 def _check3(res): 317 530 log.msg("doing _check3") 318 for c in clients: 319 self.failUnless(c.connected_to_introducer()) 531 dc = introducer._debug_counts 532 self.failUnlessEqual(dc["outbound_announcements"], 533 NUM_STORAGE*NUM_CLIENTS) 534 self.failUnless(dc["outbound_message"] > 0) 535 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) 320 536 for c in subscribing_clients: 321 537 cdc = c._debug_counts 322 self.failUnless(cdc["inbound_announcement"] > c._debug1) 323 self.failUnless(cdc["inbound_message"] > c._debug2) 324 # there should have been no new announcements 325 self.failUnlessEqual(cdc["new_announcement"], c._debug3) 326 # and the right number of duplicate ones. There were 327 # NUM_SERVERS from the servertub restart, and there should be 328 # another NUM_SERVERS now 329 self.failUnlessEqual(cdc["duplicate_announcement"], 330 2*NUM_SERVERS) 538 self.failUnless(cdc["inbound_message"] > 0) 539 self.failUnlessEqual(cdc["inbound_announcement"], NUM_STORAGE) 540 self.failUnlessEqual(cdc["new_announcement"], 0) 541 self.failUnlessEqual(cdc["wrong_service"], 0) 542 self.failUnlessEqual(cdc["duplicate_announcement"], NUM_STORAGE) 331 543 332 544 d.addCallback(_check3) 333 545 return d 334 546 547 548 def test_system_v2_server(self): 549 self.basedir = "introducer/SystemTest/system_v2_server" 550 os.makedirs(self.basedir) 551 return self.do_system_test(V2) 552 test_system_v2_server.timeout = 480 553 # occasionally takes longer than 350s on "draco" 554 555 def test_system_v1_server(self): 556 self.basedir = "introducer/SystemTest/system_v1_server" 557 os.makedirs(self.basedir) 558 return self.do_system_test(V1) 559 test_system_v1_server.timeout = 480 560 # occasionally takes longer than 350s on "draco" 561 562 from allmydata.util import base32 563 class FakeRemoteReference: 564 def notifyOnDisconnect(self, *args, **kwargs): pass 565 def getRemoteTubID(self): return "62ubehyunnyhzs7r6vdonnm2hpi52w6y" 566 567 class ClientInfo(unittest.TestCase): 568 def test_client_v2(self): 569 introducer = IntroducerService() 570 tub = introducer_furl = None 571 app_versions = {"whizzy": "fizzy"} 572 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", 573 "my_version", "oldest", app_versions) 574 #furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" 575 #ann_s = client_v2.create_announcement(furl1, "storage", "RIStorage") 576 #introducer.remote_publish_v2(ann_s, Referenceable()) 577 subscriber = FakeRemoteReference() 578 introducer.remote_subscribe_v2(subscriber, "storage", 579 client_v2._my_subscriber_info) 580 s = introducer.get_subscribers() 581 self.failUnlessEqual(len(s), 1) 582 sn, when, si, rref = s[0] 583 self.failUnlessIdentical(rref, subscriber) 584 self.failUnlessEqual(sn, "storage") 585 self.failUnlessEqual(si["version"], 0) 586 self.failUnlessEqual(si["oldest-supported"], "oldest") 587 self.failUnlessEqual(si["app-versions"], app_versions) 588 self.failUnlessEqual(si["nickname"], u"nick-v2") 589 self.failUnlessEqual(si["my-version"], "my_version") 590 591 def test_client_v1(self): 592 introducer = IntroducerService() 593 tub = introducer_furl = None 594 client_v1 = old.IntroducerClient_v1(tub, introducer_furl, u"nick-v1", 595 "my_version", "oldest") 596 subscriber = FakeRemoteReference() 597 introducer.remote_subscribe(subscriber, "storage") 598 # the v1 subscribe interface had no subscriber_info: that was usually 599 # sent in a separate stub_client pseudo-announcement 600 s = introducer.get_subscribers() 601 self.failUnlessEqual(len(s), 1) 602 sn, when, si, rref = s[0] 603 # rref will be a SubscriberAdapter_v1 around the real subscriber 604 self.failUnlessIdentical(rref.original, subscriber) 605 self.failUnlessEqual(si, None) # not known yet 606 self.failUnlessEqual(sn, "storage") 607 608 # now submit the stub_client announcement 609 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" 610 ann = (furl1, "stub_client", "RIStubClient", 611 u"nick-v1".encode("utf-8"), "my_version", "oldest") 612 introducer.remote_publish(ann) 613 # the server should correlate the two 614 s = introducer.get_subscribers() 615 self.failUnlessEqual(len(s), 1) 616 sn, when, si, rref = s[0] 617 self.failUnlessIdentical(rref.original, subscriber) 618 self.failUnlessEqual(sn, "storage") 619 620 self.failUnlessEqual(si["version"], 0) 621 self.failUnlessEqual(si["oldest-supported"], "oldest") 622 # v1 announcements do not contain app-versions 623 self.failUnlessEqual(si["app-versions"], {}) 624 self.failUnlessEqual(si["nickname"], u"nick-v1") 625 self.failUnlessEqual(si["my-version"], "my_version") 626 627 # a subscription that arrives after the stub_client announcement 628 # should be correlated too 629 subscriber2 = FakeRemoteReference() 630 introducer.remote_subscribe(subscriber2, "thing2") 631 632 s = introducer.get_subscribers() 633 subs = dict([(sn, (si,rref)) for sn, when, si, rref in s]) 634 self.failUnlessEqual(len(subs), 2) 635 (si,rref) = subs["thing2"] 636 self.failUnlessIdentical(rref.original, subscriber2) 637 self.failUnlessEqual(si["version"], 0) 638 self.failUnlessEqual(si["oldest-supported"], "oldest") 639 # v1 announcements do not contain app-versions 640 self.failUnlessEqual(si["app-versions"], {}) 641 self.failUnlessEqual(si["nickname"], u"nick-v1") 642 self.failUnlessEqual(si["my-version"], "my_version") 643 644 class Announcements(unittest.TestCase): 645 def test_client_v2_unsigned(self): 646 introducer = IntroducerService() 647 tub = introducer_furl = None 648 app_versions = {"whizzy": "fizzy"} 649 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", 650 "my_version", "oldest", app_versions) 651 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" 652 serverid = base32.a2b("62ubehyunnyhzs7r6vdonnm2hpi52w6y") 653 ann_s0 = client_v2.create_announcement(furl1, "storage", "RIStorage") 654 canary0 = Referenceable() 655 introducer.remote_publish_v2(ann_s0, canary0) 656 a = introducer.get_announcements() 657 self.failUnlessEqual(len(a), 1) 658 (index, (ann_s, canary, ann_d, when)) = a.items()[0] 659 self.failUnlessIdentical(canary, canary0) 660 self.failUnlessEqual(index, ("storage", serverid)) 661 self.failUnlessEqual(ann_d["app-versions"], app_versions) 662 self.failUnlessEqual(ann_d["nickname"], u"nick-v2") 663 self.failUnlessEqual(ann_d["service-name"], "storage") 664 self.failUnlessEqual(ann_d["my-version"], "my_version") 665 self.failUnlessEqual(ann_d["FURL"], furl1) 666 667 def test_client_v2_signed(self): 668 introducer = IntroducerService() 669 tub = introducer_furl = None 670 app_versions = {"whizzy": "fizzy"} 671 client_v2 = IntroducerClient(tub, introducer_furl, u"nick-v2", 672 "my_version", "oldest", app_versions) 673 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" 674 serverid = base32.a2b("62ubehyunnyhzs7r6vdonnm2hpi52w6y") 675 sk = ecdsa.SigningKey.generate() 676 pk = sk.get_verifying_key() 677 pks = pk.to_string() 678 ann_s0 = client_v2.create_announcement(furl1, "storage", "RIStorage", sk) 679 canary0 = Referenceable() 680 introducer.remote_publish_v2(ann_s0, canary0) 681 a = introducer.get_announcements() 682 self.failUnlessEqual(len(a), 1) 683 (index, (ann_s, canary, ann_d, when)) = a.items()[0] 684 self.failUnlessIdentical(canary, canary0) 685 self.failUnlessEqual(index, ("storage", pks)) # index is pubkey string 686 self.failUnlessEqual(ann_d["app-versions"], app_versions) 687 self.failUnlessEqual(ann_d["nickname"], u"nick-v2") 688 self.failUnlessEqual(ann_d["service-name"], "storage") 689 self.failUnlessEqual(ann_d["my-version"], "my_version") 690 self.failUnlessEqual(ann_d["FURL"], furl1) 691 692 def test_client_v1(self): 693 introducer = IntroducerService() 694 tub = introducer_furl = None 695 696 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:0/swissnum" 697 serverid = base32.a2b("62ubehyunnyhzs7r6vdonnm2hpi52w6y") 698 ann = (furl1, "storage", "RIStorage", 699 u"nick-v1".encode("utf-8"), "my_version", "oldest") 700 introducer.remote_publish(ann) 701 702 a = introducer.get_announcements() 703 self.failUnlessEqual(len(a), 1) 704 (index, (ann_s, canary, ann_d, when)) = a.items()[0] 705 self.failUnlessEqual(canary, None) 706 self.failUnlessEqual(index, ("storage", serverid)) 707 self.failUnlessEqual(ann_d["app-versions"], {}) 708 self.failUnlessEqual(ann_d["nickname"], u"nick-v1".encode("utf-8")) 709 self.failUnlessEqual(ann_d["service-name"], "storage") 710 self.failUnlessEqual(ann_d["my-version"], "my_version") 711 self.failUnlessEqual(ann_d["FURL"], furl1) 712 713 335 714 class TooNewServer(IntroducerService): 336 715 VERSION = { "http://allmydata.org/tahoe/protocols/introducer/v999": 337 716 { }, … … class NonV1Server(SystemTestMixin, unittest.TestCase): 359 738 tub.setLocation("localhost:%d" % portnum) 360 739 361 740 c = IntroducerClient(tub, self.introducer_furl, 362 u"nickname-client", "version", "oldest" )741 u"nickname-client", "version", "oldest", {}) 363 742 announcements = {} 364 743 def got(serverid, ann_d): 365 744 announcements[serverid] = ann_d … … class DecodeFurl(unittest.TestCase): 388 767 nodeid = b32decode(m.group(1).upper()) 389 768 self.failUnlessEqual(nodeid, "\x9fM\xf2\x19\xcckU0\xbf\x03\r\x10\x99\xfb&\x9b-\xc7A\x1d") 390 769 770 771 # add tests of StorageFarmBroker: if it receives duplicate announcements, it 772 # should leave the Reconnector in place, also if it receives 773 # same-FURL-different-misc, but if it receives same-nodeid-different-FURL, it 774 # should tear down the Reconnector and make a new one. This behavior used to 775 # live in the IntroducerClient, and thus used to be tested by test_introducer 776 777 # copying more tests from old branch: 778 779 # then also add Upgrade test -
src/allmydata/test/test_system.py
diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index bf6af09..b7b3c22 100644
a b class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): 778 778 newappverstr = "%s: %s" % (allmydata.__appname__, altverstr) 779 779 780 780 self.failUnless((appverstr in res) or (newappverstr in res), (appverstr, newappverstr, res)) 781 self.failUnless("Announcement Summary: storage: 5 , stub_client: 5" in res)781 self.failUnless("Announcement Summary: storage: 5" in res) 782 782 self.failUnless("Subscription Summary: storage: 5" in res) 783 783 except unittest.FailTest: 784 784 print … … class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): 795 795 self.failUnlessEqual(data["subscription_summary"], 796 796 {"storage": 5}) 797 797 self.failUnlessEqual(data["announcement_summary"], 798 {"storage": 5 , "stub_client": 5})798 {"storage": 5}) 799 799 self.failUnlessEqual(data["announcement_distinct_hosts"], 800 {"storage": 1 , "stub_client": 1})800 {"storage": 1}) 801 801 except unittest.FailTest: 802 802 print 803 803 print "GET %s?t=json output was:" % self.introweb_url -
src/allmydata/web/introweb.py
diff --git a/src/allmydata/web/introweb.py b/src/allmydata/web/introweb.py index 28273bd..be950c4 100644
a b class IntroducerRoot(rend.Page): 29 29 30 30 def render_JSON(self, ctx): 31 31 res = {} 32 clients = self.introducer_service.get_subscribers() 33 subscription_summary = dict([ (name, len(clients[name])) 34 for name in clients ]) 35 res["subscription_summary"] = subscription_summary 32 33 counts = {} 34 subscribers = self.introducer_service.get_subscribers() 35 for (service_name, ign, ign, ign) in subscribers: 36 if service_name not in counts: 37 counts[service_name] = 0 38 counts[service_name] += 1 39 res["subscription_summary"] = counts 36 40 37 41 announcement_summary = {} 38 42 service_hosts = {} 39 for (ann,when) in self.introducer_service.get_announcements().values(): 40 (furl, service_name, ri_name, nickname, ver, oldest) = ann 43 for a in self.introducer_service.get_announcements().values(): 44 (_, _, ann_d, when) = a 45 service_name = ann_d["service-name"] 41 46 if service_name not in announcement_summary: 42 47 announcement_summary[service_name] = 0 43 48 announcement_summary[service_name] += 1 … … class IntroducerRoot(rend.Page): 50 55 # enough: when multiple services are run on a single host, 51 56 # they're usually either configured with the same addresses, 52 57 # or setLocationAutomatically picks up the same interfaces. 58 furl = ann_d["FURL"] 53 59 locations = SturdyRef(furl).getTubRef().getLocations() 54 60 # list of tuples, ("ipv4", host, port) 55 61 host = frozenset([hint[1] … … class IntroducerRoot(rend.Page): 74 80 75 81 def render_announcement_summary(self, ctx, data): 76 82 services = {} 77 for (ann,when) in self.introducer_service.get_announcements().values(): 78 (furl, service_name, ri_name, nickname, ver, oldest) = ann 83 for a in self.introducer_service.get_announcements().values(): 84 (_, _, ann_d, when) = a 85 service_name = ann_d["service-name"] 79 86 if service_name not in services: 80 87 services[service_name] = 0 81 88 services[service_name] += 1 … … class IntroducerRoot(rend.Page): 85 92 for service_name in service_names]) 86 93 87 94 def render_client_summary(self, ctx, data): 95 counts = {} 88 96 clients = self.introducer_service.get_subscribers() 89 service_names = clients.keys() 90 service_names.sort() 91 return ", ".join(["%s: %d" % (service_name, len(clients[service_name])) 92 for service_name in service_names]) 97 for (service_name, ign, ign, ign) in clients: 98 if service_name not in counts: 99 counts[service_name] = 0 100 counts[service_name] += 1 101 return ", ".join([ "%s: %d" % (name, counts[name]) 102 for name in sorted(counts.keys()) ] ) 93 103 94 104 def data_services(self, ctx, data): 95 105 introsvc = self.introducer_service 96 ann = [(since,a) 97 for (a,since) in introsvc.get_announcements().values() 98 if a[1] != "stub_client"] 99 ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) ) 106 ann = [] 107 for a in introsvc.get_announcements().values(): 108 (_, _, ann_d, when) = a 109 if ann_d["service-name"] == "stub_client": 110 continue 111 ann.append( (when, ann_d) ) 112 ann.sort(key=lambda x: (x[1]["service-name"], x[1]["nickname"])) 113 # this used to be: 114 #ann.sort(lambda a,b: cmp( (a[1][1], a), (b[1][1], b) ) ) 115 # service_name was the primary key, then the whole tuple (starting 116 # with the furl) was the secondary key 100 117 return ann 101 118 102 def render_service_row(self, ctx, (since,announcement)): 103 (furl, service_name, ri_name, nickname, ver, oldest) = announcement 104 sr = SturdyRef(furl) 119 def render_service_row(self, ctx, (since,ann_d)): 120 sr = SturdyRef(ann_d["FURL"]) 105 121 nodeid = sr.tubID 106 122 advertised = self.show_location_hints(sr) 107 ctx.fillSlots("peerid", "%s %s" % (nodeid, nickname))123 ctx.fillSlots("peerid", "%s %s" % (nodeid, ann_d["nickname"])) 108 124 ctx.fillSlots("advertised", " ".join(advertised)) 109 125 ctx.fillSlots("connected", "?") 110 126 TIME_FORMAT = "%H:%M:%S %d-%b-%Y" 111 127 ctx.fillSlots("announced", 112 128 time.strftime(TIME_FORMAT, time.localtime(since))) 113 ctx.fillSlots("version", ver)114 ctx.fillSlots("service_name", service_name)129 ctx.fillSlots("version", ann_d["my-version"]) 130 ctx.fillSlots("service_name", ann_d["service-name"]) 115 131 return ctx.tag 116 132 117 133 def data_subscribers(self, ctx, data): 118 # use the "stub_client" announcements to get information per nodeid 119 clients = {} 120 for (ann,when) in self.introducer_service.get_announcements().values(): 121 if ann[1] != "stub_client": 122 continue 123 (furl, service_name, ri_name, nickname, ver, oldest) = ann 124 sr = SturdyRef(furl) 125 nodeid = sr.tubID 126 clients[nodeid] = ann 127 128 # then we actually provide information per subscriber 129 s = [] 130 introsvc = self.introducer_service 131 for service_name, subscribers in introsvc.get_subscribers().items(): 132 for (rref, timestamp) in subscribers.items(): 133 sr = rref.getSturdyRef() 134 nodeid = sr.tubID 135 ann = clients.get(nodeid) 136 s.append( (service_name, rref, timestamp, ann) ) 137 s.sort() 138 return s 134 return self.introducer_service.get_subscribers() 139 135 140 136 def render_subscriber_row(self, ctx, s): 141 (service_name, rref, since, ann) = s 142 nickname = "?" 143 version = "?" 144 if ann: 145 (furl, service_name_2, ri_name, nickname, version, oldest) = ann 137 (service_name, since, info, rref) = s 138 nickname = info.get("nickname", "?") 139 version = info.get("my-version", "?") 146 140 147 141 sr = rref.getSturdyRef() 148 142 # if the subscriber didn't do Tub.setLocation, nodeid will be None