source file: /home/buildslave/tahoe/edgy/build/src/allmydata/introducer/client.py
file stats: 144 lines, 134 executed: 93.1% covered
coverage versus previous test: 0 lines added, 0 lines removed
1.
2. from base64 import b32decode
3. from zope.interface import implements
4. from twisted.application import service
5. from foolscap.api import Referenceable, SturdyRef, eventually
6. from allmydata.interfaces import InsufficientVersionError
7. from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
8. IIntroducerClient
9. from allmydata.util import log, idlib
10. from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
11.
12.
class IntroducerClient(service.Service, Referenceable):
    """Client side of the connection to an introducer.

    The client remembers the announcements it has been asked to publish and
    the service names it has been asked to subscribe to. Once a versioned
    introducer reference has been obtained it sends both to the introducer.
    Inbound announcements are unpacked into dictionaries, remembered per
    (service_name, nodeid) pair, and delivered to the matching local
    subscribers through eventually().
    """
    implements(RIIntroducerSubscriberClient, IIntroducerClient)

    def __init__(self, tub, introducer_furl,
                 nickname, my_version, oldest_supported):
        """Record connection parameters and set up empty bookkeeping.

        tub: object used to reach the introducer (its connectTo() and
            getReference() methods are called from startService()).
        introducer_furl: FURL of the introducer to connect to.
        nickname: this node's nickname; must be a unicode object. It is
            encoded to UTF-8 before being placed in announcements.
        my_version, oldest_supported: version values included verbatim in
            every announcement built by publish().
        """
        self._tub = tub
        self.introducer_furl = introducer_furl

        assert type(nickname) is unicode
        self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
        self._my_version = my_version
        self._oldest_supported = oldest_supported

        self._published_announcements = set()

        self._publisher = None

        self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
        self._subscribed_service_names = set()
        self._subscriptions = set() # requests we've actually sent

        # _current_announcements remembers one announcement per
        # (servicename,serverid) pair. Anything that arrives with the same
        # pair will displace the previous one. This stores unpacked
        # announcement dictionaries, which can be compared for equality to
        # distinguish re-announcement from updates. It also provides memory
        # for clients who subscribe after startup.
        self._current_announcements = {}

        self.encoding_parameters = None

        # hooks for unit tests
        self._debug_counts = {
            "inbound_message": 0,
            "inbound_announcement": 0,
            "wrong_service": 0,
            "duplicate_announcement": 0,
            "update": 0,
            "new_announcement": 0,
            "outbound_message": 0,
            }

    def startService(self):
        """Start the service and begin connecting to the introducer.

        Registers _got_introducer with the tub's connectTo() and keeps the
        returned object in self._introducer_reconnector. A separate
        getReference() call is made only so that a failure of the initial
        connection attempt gets logged.
        """
        service.Service.startService(self)
        self._introducer_error = None
        rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
        self._introducer_reconnector = rc
        def connect_failed(failure):
            self.log("Initial Introducer connection failed: perhaps it's down",
                     level=log.WEIRD, failure=failure, umid="c5MqUQ")
        d = self._tub.getReference(self.introducer_furl)
        d.addErrback(connect_failed)

    def _got_introducer(self, publisher):
        """Fetch the introducer's version info, then finish connecting.

        The 'default' dictionary is handed to
        add_version_to_remote_reference() together with the publisher; its
        "application-version" text suggests it is the fallback used when the
        remote side has no get_version() (NOTE(review): confirm against
        rrefutil). Errors are routed to _got_error().
        """
        self.log("connected to introducer, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(publisher, default)
        d.addCallback(self._got_versioned_introducer)
        d.addErrback(self._got_error)

    def _got_error(self, f):
        """Remember the failure 'f' so that tests can poll for it."""
        # TODO: for the introducer, perhaps this should halt the application
        self._introducer_error = f # polled by tests

    def _got_versioned_introducer(self, publisher):
        """Accept a versioned introducer reference and start using it.

        Raises InsufficientVersionError if the introducer does not advertise
        the V1 introducer protocol. Otherwise stores the reference, asks to
        be told about disconnection, and sends any pending publish and
        subscribe requests.
        """
        self.log("got introducer version: %s" % (publisher.version,))
        # we require a V1 introducer
        needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
        if needed not in publisher.version:
            raise InsufficientVersionError(needed, publisher.version)
        self._publisher = publisher
        publisher.notifyOnDisconnect(self._disconnected)
        self._maybe_publish()
        self._maybe_subscribe()

    def _disconnected(self):
        """Forget the introducer reference and the subscriptions sent to it."""
        self.log("bummer, we've lost our connection to the introducer")
        self._publisher = None
        # clearing this lets _maybe_subscribe() re-send every request once a
        # new publisher reference is available
        self._subscriptions.clear()

    def log(self, *args, **kwargs):
        """Call log.msg(), defaulting 'facility' to "tahoe.introducer"."""
        kwargs.setdefault("facility", "tahoe.introducer")
        return log.msg(*args, **kwargs)


    def publish(self, furl, service_name, remoteinterface_name):
        """Remember an announcement and send it if an introducer is known.

        The announcement is the 6-tuple (furl, service_name,
        remoteinterface_name, nickname_utf8, my_version, oldest_supported).
        """
        assert type(self._nickname_utf8) is str # we always send UTF-8
        ann = (furl, service_name, remoteinterface_name,
               self._nickname_utf8, self._my_version, self._oldest_supported)
        self._published_announcements.add(ann)
        self._maybe_publish()

    def subscribe_to(self, service_name, cb, *args, **kwargs):
        """Register 'cb' to be notified of announcements for service_name.

        The callback is invoked (via eventually()) as
        cb(nodeid, ann_d, *args, **kwargs), both for announcements that
        arrive later and for those already remembered in
        _current_announcements at the time of the call.
        """
        self._local_subscribers.append( (service_name,cb,args,kwargs) )
        self._subscribed_service_names.add(service_name)
        self._maybe_subscribe()
        for (servicename,nodeid),ann_d in self._current_announcements.items():
            if servicename == service_name:
                # replay with the same extra arguments that
                # _process_announcement() passes, so a late subscriber sees
                # the same call signature for old and new announcements
                eventually(cb, nodeid, ann_d, *args, **kwargs)

    def _maybe_subscribe(self):
        """Send any not-yet-sent subscription requests to the introducer.

        Does nothing (other than logging) while no introducer reference is
        held.
        """
        if not self._publisher:
            self.log("want to subscribe, but no introducer yet",
                     level=log.NOISY)
            return
        for service_name in self._subscribed_service_names:
            if service_name not in self._subscriptions:
                # there is a race here, but the subscription desk ignores
                # duplicate requests.
                self._subscriptions.add(service_name)
                d = self._publisher.callRemote("subscribe", self, service_name)
                d.addErrback(trap_deadref)
                d.addErrback(log.err, format="server errored during subscribe",
                             facility="tahoe.introducer",
                             level=log.WEIRD, umid="2uMScQ")

    def _maybe_publish(self):
        """Send every remembered announcement to the introducer.

        Does nothing (other than logging) while no introducer reference is
        held. Each message sent bumps the "outbound_message" debug counter.
        """
        if not self._publisher:
            self.log("want to publish, but no introducer yet", level=log.NOISY)
            return
        # this re-publishes everything. The Introducer ignores duplicates
        for ann in self._published_announcements:
            self._debug_counts["outbound_message"] += 1
            d = self._publisher.callRemote("publish", ann)
            d.addErrback(trap_deadref)
            d.addErrback(log.err,
                         format="server errored during publish %(ann)s",
                         ann=ann, facility="tahoe.introducer",
                         level=log.WEIRD, umid="xs9pVQ")



    def remote_announce(self, announcements):
        """Process a batch of announcements.

        Each announcement is handled independently: a failure while
        processing one is logged and the remaining ones are still processed.
        """
        self.log("received %d announcements" % len(announcements))
        self._debug_counts["inbound_message"] += 1
        for ann in announcements:
            try:
                self._process_announcement(ann)
            except Exception:
                # Don't let a corrupt announcement prevent us from processing
                # the remaining ones. Don't return an error to the server,
                # since they'd just ignore it anyways. Catching Exception
                # (rather than everything) lets KeyboardInterrupt and
                # SystemExit propagate.
                log.err(format="unable to process announcement %(ann)s",
                        ann=ann)

    def _process_announcement(self, ann):
        """Unpack one announcement tuple and deliver it to subscribers.

        'ann' is a 6-tuple (furl, service_name, ri_name, nickname_utf8, ver,
        oldest). Announcements for services nobody subscribed to, and exact
        repeats of the remembered announcement for the same
        (service_name, nodeid), are counted and dropped. Anything else
        replaces the remembered announcement and is passed to each matching
        local subscriber.
        """
        self._debug_counts["inbound_announcement"] += 1
        (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
        if service_name not in self._subscribed_service_names:
            self.log("announcement for a service we don't care about [%s]"
                     % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
            self._debug_counts["wrong_service"] += 1
            return
        self.log("announcement for [%s]: %s" % (service_name, ann),
                 umid="BoKEag")
        # NOTE(review): these checks are asserts and so disappear when
        # Python runs with -O
        assert type(furl) is str
        assert type(service_name) is str
        assert type(ri_name) is str
        assert type(nickname_utf8) is str
        nickname = nickname_utf8.decode("utf-8")
        assert type(nickname) is unicode
        assert type(ver) is str
        assert type(oldest) is str

        # the node id is the base32-decoded tubID taken from the FURL
        nodeid = b32decode(SturdyRef(furl).tubID.upper())
        nodeid_s = idlib.shortnodeid_b2a(nodeid)

        ann_d = { "version": 0,
                  "service-name": service_name,

                  "FURL": furl,
                  "nickname": nickname,
                  "app-versions": {}, # need #466 and v2 introducer
                  "my-version": ver,
                  "oldest-supported": oldest,
                  }

        index = (service_name, nodeid)
        if self._current_announcements.get(index, None) == ann_d:
            self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
                     service=service_name, nodeid=nodeid_s,
                     level=log.UNUSUAL, umid="B1MIdA")
            self._debug_counts["duplicate_announcement"] += 1
            return
        if index in self._current_announcements:
            self._debug_counts["update"] += 1
        else:
            self._debug_counts["new_announcement"] += 1

        self._current_announcements[index] = ann_d
        # note: we never forget an index, but we might update its value

        for (service_name2,cb,args,kwargs) in self._local_subscribers:
            if service_name2 == service_name:
                eventually(cb, nodeid, ann_d, *args, **kwargs)

    def remote_set_encoding_parameters(self, parameters):
        """Store the given parameters in self.encoding_parameters."""
        self.encoding_parameters = parameters

    def connected_to_introducer(self):
        """Return True while a versioned introducer reference is held."""
        return bool(self._publisher)