Changeset ea35563 in trunk
- Timestamp:
- 2016-06-02T16:47:58Z (9 years ago)
- Branches:
- master
- Children:
- 7feee8a2
- Parents:
- 73b08d2
- Location:
- src/allmydata
- Files:
-
- 1 deleted
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified src/allmydata/introducer/client.py ¶
r73b08d2 rea35563 3 3 from zope.interface import implements 4 4 from twisted.application import service 5 from twisted.internet import defer 5 6 from foolscap.api import Referenceable, eventually, RemoteInterface 6 7 from allmydata.interfaces import InsufficientVersionError 7 8 from allmydata.introducer.interfaces import IIntroducerClient, \ 8 RIIntroducerSubscriberClient_v 1, RIIntroducerSubscriberClient_v29 RIIntroducerSubscriberClient_v2 9 10 from allmydata.introducer.common import sign_to_foolscap, unsign_from_foolscap,\ 10 convert_announcement_v1_to_v2, convert_announcement_v2_to_v1, \11 11 make_index, get_tubid_string_from_ann, get_tubid_string 12 12 from allmydata.util import log … … 16 16 class InvalidCacheError(Exception): 17 17 pass 18 19 class WrapV2ClientInV1Interface(Referenceable): # for_v120 """I wrap a v2 IntroducerClient to make it look like a v1 client, so it21 can be attached to an old server."""22 implements(RIIntroducerSubscriberClient_v1)23 24 def __init__(self, original):25 self.original = original26 27 def remote_announce(self, announcements):28 lp = self.original.log("received %d announcements (v1)" %29 len(announcements))30 anns_v1 = set([convert_announcement_v1_to_v2(ann_v1)31 for ann_v1 in announcements])32 return self.original.got_announcements(anns_v1, lp)33 34 class RIStubClient(RemoteInterface): # for_v135 """Each client publishes a service announcement for a dummy object called36 the StubClient. This object doesn't actually offer any services, but the37 announcement helps the Introducer keep track of which clients are38 subscribed (so the grid admin can keep track of things like the size of39 the grid and the client versions in use. This is the (empty)40 RemoteInterface for the StubClient."""41 42 class StubClient(Referenceable): # for_v143 implements(RIStubClient)44 18 45 19 V1 = "http://allmydata.org/tahoe/protocols/introducer/v1" … … 69 43 "oldest-supported": self._oldest_supported, 70 44 } 71 self._stub_client = None # for_v172 self._stub_client_furl = None73 45 74 46 self._outbound_announcements = {} # not signed … … 157 129 def _got_introducer(self, publisher): 158 130 self.log("connected to introducer, getting versions") 159 default = { "http://allmydata.org/tahoe/protocols/introducer/v 1":131 default = { "http://allmydata.org/tahoe/protocols/introducer/v2": 160 132 { }, 161 133 "application-version": "unknown: no get_version()", … … 171 143 def _got_versioned_introducer(self, publisher): 172 144 self.log("got introducer version: %s" % (publisher.version,)) 173 # we require an introducer that speaks at least one of (V1, V2)174 if not (V1 in publisher.version or V2 in publisher.version):175 raise InsufficientVersionError("V 1 or V2", publisher.version)145 # we require an introducer that speaks at least V2 146 if V2 not in publisher.version: 147 raise InsufficientVersionError("V2", publisher.version) 176 148 self._publisher = publisher 177 149 publisher.notifyOnDisconnect(self._disconnected) … … 214 186 d.addBoth(self._debug_retired) 215 187 else: 216 d = self._subscribe_handle_v1(service_name) # for_v1188 d = defer.fail(InsufficientVersionError("V2", self._publisher.version)) 217 189 d.addErrback(log.err, facility="tahoe.introducer.client", 218 190 level=log.WEIRD, umid="2uMScQ") 219 220 def _subscribe_handle_v1(self, service_name): # for_v1221 # they don't speak V2: must be a v1 introducer. Fall back to the v1222 # 'subscribe' method, using a client adapter.223 ca = WrapV2ClientInV1Interface(self)224 self._debug_outstanding += 1225 d = self._publisher.callRemote("subscribe", ca, service_name)226 d.addBoth(self._debug_retired)227 # We must also publish an empty 'stub_client' object, so the228 # introducer can count how many clients are connected and see what229 # versions they're running.230 if not self._stub_client_furl:231 self._stub_client = sc = StubClient()232 self._stub_client_furl = self._tub.registerReference(sc)233 def _publish_stub_client(ignored):234 furl = self._stub_client_furl235 self.publish("stub_client",236 { "anonymous-storage-FURL": furl,237 "permutation-seed-base32": get_tubid_string(furl),238 })239 d.addCallback(_publish_stub_client)240 return d241 191 242 192 def create_announcement_dict(self, service_name, ann): … … 282 232 d.addBoth(self._debug_retired) 283 233 else: 284 d = self._handle_v1_publisher(ann_t) # for_v1234 d = defer.fail(InsufficientVersionError("V2", self._publisher.version)) 285 235 d.addErrback(log.err, ann_t=ann_t, 286 236 facility="tahoe.introducer.client", 287 237 level=log.WEIRD, umid="xs9pVQ") 288 238 289 def _handle_v1_publisher(self, ann_t): # for_v1290 # they don't speak V2, so fall back to the old 'publish' method291 # (which takes an unsigned tuple of bytestrings)292 self.log("falling back to publish_v1",293 level=log.UNUSUAL, umid="9RCT1A")294 ann_v1 = convert_announcement_v2_to_v1(ann_t)295 self._debug_outstanding += 1296 d = self._publisher.callRemote("publish", ann_v1)297 d.addBoth(self._debug_retired)298 return d299 300 301 239 def remote_announce_v2(self, announcements): 302 240 lp = self.log("received %d announcements (v2)" % len(announcements)) … … 304 242 305 243 def got_announcements(self, announcements, lp=None): 306 # this is the common entry point for both v1 and v2announcements244 # this is the common entry point for announcements 307 245 self._debug_counts["inbound_message"] += 1 308 246 for ann_t in announcements: -
TabularUnified src/allmydata/introducer/interfaces.py ¶
r73b08d2 rea35563 3 3 from foolscap.api import StringConstraint, TupleOf, SetOf, DictOf, Any, \ 4 4 RemoteInterface, Referenceable 5 from old import RIIntroducerSubscriberClient_v16 5 FURL = StringConstraint(1000) 7 8 # old introducer protocol (v1):9 #10 # Announcements are (FURL, service_name, remoteinterface_name,11 # nickname, my_version, oldest_supported)12 # the (FURL, service_name, remoteinterface_name) refer to the service being13 # announced. The (nickname, my_version, oldest_supported) refer to the14 # client as a whole. The my_version/oldest_supported strings can be parsed15 # by an allmydata.util.version.Version instance, and then compared. The16 # first goal is to make sure that nodes are not confused by speaking to an17 # incompatible peer. The second goal is to enable the development of18 # backwards-compatibility code.19 20 Announcement_v1 = TupleOf(FURL, str, str,21 str, str, str)22 6 23 7 # v2 protocol over foolscap: Announcements are 3-tuples of (bytes, str, str) … … 42 26 def get_version(): 43 27 return DictOf(str, Any()) 44 def publish(announcement=Announcement_v1):45 return None46 28 def publish_v2(announcement=Announcement_v2, canary=Referenceable): 47 return None48 def subscribe(subscriber=RIIntroducerSubscriberClient_v1, service_name=str):49 29 return None 50 30 def subscribe_v2(subscriber=RIIntroducerSubscriberClient_v2, -
TabularUnified src/allmydata/test/test_introducer.py ¶
r73b08d2 rea35563 12 12 from twisted.application import service 13 13 from allmydata.interfaces import InsufficientVersionError 14 from allmydata.introducer.client import IntroducerClient, \ 15 WrapV2ClientInV1Interface 14 from allmydata.introducer.client import IntroducerClient 16 15 from allmydata.introducer.server import IntroducerService, FurlFileConflictError 17 16 from allmydata.introducer.common import get_tubid_string_from_ann, \ 18 17 get_tubid_string, sign_to_foolscap, unsign_from_foolscap, \ 19 18 UnknownKeyError 20 from allmydata.introducer import old21 19 # test compatibility with old introducer .tac files 22 20 from allmydata.introducer import IntroducerNode … … 173 171 174 172 class Client(unittest.TestCase): 175 def test_duplicate_receive_v1(self):176 ic = IntroducerClient(None,177 "introducer.furl", u"my_nickname",178 "my_version", "oldest_version", {}, fakeseq,179 FilePath(self.mktemp()))180 announcements = []181 ic.subscribe_to("storage",182 lambda key_s,ann: announcements.append(ann))183 furl1 = "pb://62ubehyunnyhzs7r6vdonnm2hpi52w6y@127.0.0.1:36106/gydnpigj2ja2qr2srq4ikjwnl7xfgbra"184 ann1 = (furl1, "storage", "RIStorage", "nick1", "ver23", "ver0")185 ann1b = (furl1, "storage", "RIStorage", "nick1", "ver24", "ver0")186 ca = WrapV2ClientInV1Interface(ic)187 188 ca.remote_announce([ann1])189 d = fireEventually()190 def _then(ign):191 self.failUnlessEqual(len(announcements), 1)192 self.failUnlessEqual(announcements[0]["nickname"], u"nick1")193 self.failUnlessEqual(announcements[0]["my-version"], "ver23")194 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 1)195 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)196 self.failUnlessEqual(ic._debug_counts["update"], 0)197 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 0)198 # now send a duplicate announcement: this should not notify clients199 ca.remote_announce([ann1])200 return fireEventually()201 d.addCallback(_then)202 def _then2(ign):203 self.failUnlessEqual(len(announcements), 1)204 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 2)205 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)206 self.failUnlessEqual(ic._debug_counts["update"], 0)207 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)208 # and a replacement announcement: same FURL, new other stuff.209 # Clients should be notified.210 ca.remote_announce([ann1b])211 return fireEventually()212 d.addCallback(_then2)213 def _then3(ign):214 self.failUnlessEqual(len(announcements), 2)215 self.failUnlessEqual(ic._debug_counts["inbound_announcement"], 3)216 self.failUnlessEqual(ic._debug_counts["new_announcement"], 1)217 self.failUnlessEqual(ic._debug_counts["update"], 1)218 self.failUnlessEqual(ic._debug_counts["duplicate_announcement"], 1)219 # test that the other stuff changed220 self.failUnlessEqual(announcements[-1]["nickname"], u"nick1")221 self.failUnlessEqual(announcements[-1]["my-version"], "ver24")222 d.addCallback(_then3)223 return d224 225 173 def test_duplicate_receive_v2(self): 226 174 ic1 = IntroducerClient(None, … … 331 279 return d 332 280 333 def test_id_collision(self):334 # test replacement case where tubid equals a keyid (one should335 # not replace the other)336 ic = IntroducerClient(None,337 "introducer.furl", u"my_nickname",338 "my_version", "oldest_version", {}, fakeseq,339 FilePath(self.mktemp()))340 announcements = []341 ic.subscribe_to("storage",342 lambda key_s,ann: announcements.append(ann))343 sk_s, vk_s = keyutil.make_keypair()344 sk, _ignored = keyutil.parse_privkey(sk_s)345 keyid = keyutil.remove_prefix(vk_s, "pub-v0-")346 furl1 = "pb://onug64tu@127.0.0.1:123/short" # base32("short")347 furl2 = "pb://%s@127.0.0.1:36106/swissnum" % keyid348 ann_t = make_ann_t(ic, furl1, sk, 1)349 ic.remote_announce_v2([ann_t])350 d = fireEventually()351 def _then(ign):352 # first announcement has been processed353 self.failUnlessEqual(len(announcements), 1)354 self.failUnlessEqual(announcements[0]["anonymous-storage-FURL"],355 furl1)356 # now submit a second one, with a tubid that happens to look just357 # like the pubkey-based serverid we just processed. They should358 # not overlap.359 ann2 = (furl2, "storage", "RIStorage", "nick1", "ver23", "ver0")360 ca = WrapV2ClientInV1Interface(ic)361 ca.remote_announce([ann2])362 return fireEventually()363 d.addCallback(_then)364 def _then2(ign):365 # if they overlapped, the second announcement would be ignored366 self.failUnlessEqual(len(announcements), 2)367 self.failUnlessEqual(announcements[1]["anonymous-storage-FURL"],368 furl2)369 d.addCallback(_then2)370 return d371 372 281 class Server(unittest.TestCase): 373 282 def test_duplicate(self): … … 519 428 class SystemTest(SystemTestMixin, unittest.TestCase): 520 429 521 def do_system_test(self , server_version):430 def do_system_test(self): 522 431 self.create_tub() 523 if server_version == V1: 524 introducer = old.IntroducerService_v1() 525 else: 526 introducer = IntroducerService() 432 introducer = IntroducerService() 527 433 introducer.setServiceParent(self.parent) 528 434 iff = os.path.join(self.basedir, "introducer.furl") … … 559 465 560 466 log.msg("creating client %d: %s" % (i, tub.getShortTubID())) 561 if i == 0: 562 c = old.IntroducerClient_v1(tub, self.introducer_furl, 563 NICKNAME % str(i), 564 "version", "oldest") 565 else: 566 c = IntroducerClient(tub, self.introducer_furl, 567 NICKNAME % str(i), 568 "version", "oldest", 569 {"component": "component-v1"}, fakeseq, 570 FilePath(self.mktemp())) 467 c = IntroducerClient(tub, self.introducer_furl, 468 NICKNAME % str(i), 469 "version", "oldest", 470 {"component": "component-v1"}, fakeseq, 471 FilePath(self.mktemp())) 571 472 received_announcements[c] = {} 572 473 def got(key_s_or_tubid, ann, announcements, i): … … 583 484 if i < NUM_STORAGE: 584 485 if i == 0: 585 c.publish(node_furl, "storage", "ri_name") 586 printable_serverids[i] = get_tubid_string(node_furl) 486 # XXX wtf this makes no sense 487 #c.publish(node_furl, "storage", "ri_name") 488 #printable_serverids[i] = get_tubid_string(node_furl) 489 pass 587 490 elif i == 1: 588 491 # sign the announcement … … 591 494 privkeys[c] = privkey 592 495 c.publish("storage", make_ann(node_furl), privkey) 593 if server_version == V1: 594 printable_serverids[i] = get_tubid_string(node_furl) 595 else: 596 assert pubkey_s.startswith("pub-") 597 printable_serverids[i] = pubkey_s[len("pub-"):] 496 assert pubkey_s.startswith("pub-") 497 printable_serverids[i] = pubkey_s[len("pub-"):] 598 498 else: 599 499 c.publish("storage", make_ann(node_furl)) … … 609 509 # 'storage' record), so the introducer could see their 610 510 # version. Match that behavior. 611 c.publish(node_furl, "stub_client", "stub_ri_name") 511 #c.publish(node_furl, "stub_client", "stub_ri_name") 512 pass 612 513 613 514 if i == 2: … … 662 563 log.msg("doing _check1") 663 564 dc = self.the_introducer._debug_counts 664 if server_version == V1: 665 # each storage server publishes a record, and (after its 666 # 'subscribe' has been ACKed) also publishes a "stub_client". 667 # The non-storage client (which subscribes) also publishes a 668 # stub_client. There is also one "boring" service. The number 669 # of messages is higher, because the stub_clients aren't 670 # published until after we get the 'subscribe' ack (since we 671 # don't realize that we're dealing with a v1 server [which 672 # needs stub_clients] until then), and the act of publishing 673 # the stub_client causes us to re-send all previous 674 # announcements. 675 self.failUnlessEqual(dc["inbound_message"] - dc["inbound_duplicate"], 676 NUM_STORAGE + NUM_CLIENTS + 1) 677 else: 678 # each storage server publishes a record. There is also one 679 # "stub_client" and one "boring" 680 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2) 681 self.failUnlessEqual(dc["inbound_duplicate"], 0) 565 # each storage server publishes a record. There is also one 566 # "stub_client" and one "boring" 567 self.failUnlessEqual(dc["inbound_message"], NUM_STORAGE+2) 568 self.failUnlessEqual(dc["inbound_duplicate"], 0) 682 569 self.failUnlessEqual(dc["inbound_update"], 0) 683 570 self.failUnlessEqual(dc["inbound_subscribe"], NUM_CLIENTS) … … 707 594 self.failUnlessEqual(type(nick), unicode) 708 595 self.failUnlessEqual(nick, NICKNAME % "0") 709 if server_version == V1: 710 for c in publishing_clients: 711 cdc = c._debug_counts 712 expected = 1 # storage 713 if c is clients[2]: 714 expected += 1 # boring 715 if c is not clients[0]: 716 # the v2 client tries to call publish_v2, which fails 717 # because the server is v1. It then re-sends 718 # everything it has so far, plus a stub_client record 719 expected = 2*expected + 1 720 if c is clients[0]: 721 # we always tell v1 client to send stub_client 722 expected += 1 723 self.failUnlessEqual(cdc["outbound_message"], expected) 724 else: 725 for c in publishing_clients: 726 cdc = c._debug_counts 727 expected = 1 728 if c in [clients[0], # stub_client 729 clients[2], # boring 730 ]: 731 expected = 2 732 self.failUnlessEqual(cdc["outbound_message"], expected) 596 for c in publishing_clients: 597 cdc = c._debug_counts 598 expected = 1 599 if c in [clients[0], # stub_client 600 clients[2], # boring 601 ]: 602 expected = 2 603 self.failUnlessEqual(cdc["outbound_message"], expected) 733 604 # now check the web status, make sure it renders without error 734 605 ir = introweb.IntroducerRoot(self.parent) … … 826 697 c._debug_counts[k] = 0 827 698 expected_announcements[i] += 1 # new 'storage' for everyone 828 if server_version == V1: 829 introducer = old.IntroducerService_v1() 830 else: 831 introducer = IntroducerService() 699 introducer = IntroducerService() 832 700 self.the_introducer = introducer 833 701 newfurl = self.central_tub.registerReference(self.the_introducer, … … 862 730 self.basedir = "introducer/SystemTest/system_v2_server" 863 731 os.makedirs(self.basedir) 864 return self.do_system_test( V2)732 return self.do_system_test() 865 733 test_system_v2_server.timeout = 480 866 # occasionally takes longer than 350s on "draco"867 868 def test_system_v1_server(self):869 self.basedir = "introducer/SystemTest/system_v1_server"870 os.makedirs(self.basedir)871 return self.do_system_test(V1)872 test_system_v1_server.timeout = 480873 734 # occasionally takes longer than 350s on "draco" 874 735
Note: See TracChangeset
for help on using the changeset viewer.