source file: /home/buildslave/tahoe/edgy/build/src/allmydata/introducer/client.py
file stats: 144 lines, 134 executed: 93.1% covered
coverage versus previous test: 0 lines added, 0 lines removed
    1. 
    2. from base64 import b32decode
    3. from zope.interface import implements
    4. from twisted.application import service
    5. from foolscap.api import Referenceable, SturdyRef, eventually
    6. from allmydata.interfaces import InsufficientVersionError
    7. from allmydata.introducer.interfaces import RIIntroducerSubscriberClient, \
    8.      IIntroducerClient
    9. from allmydata.util import log, idlib
   10. from allmydata.util.rrefutil import add_version_to_remote_reference, trap_deadref
   11. 
   12. 
   13. class IntroducerClient(service.Service, Referenceable):
   14.     implements(RIIntroducerSubscriberClient, IIntroducerClient)
   15. 
   16.     def __init__(self, tub, introducer_furl,
   17.                  nickname, my_version, oldest_supported):
   18.         self._tub = tub
   19.         self.introducer_furl = introducer_furl
   20. 
   21.         assert type(nickname) is unicode
   22.         self._nickname_utf8 = nickname.encode("utf-8") # we always send UTF-8
   23.         self._my_version = my_version
   24.         self._oldest_supported = oldest_supported
   25. 
   26.         self._published_announcements = set()
   27. 
   28.         self._publisher = None
   29. 
   30.         self._local_subscribers = [] # (servicename,cb,args,kwargs) tuples
   31.         self._subscribed_service_names = set()
   32.         self._subscriptions = set() # requests we've actually sent
   33. 
   34.         # _current_announcements remembers one announcement per
   35.         # (servicename,serverid) pair. Anything that arrives with the same
   36.         # pair will displace the previous one. This stores unpacked
   37.         # announcement dictionaries, which can be compared for equality to
   38.         # distinguish re-announcement from updates. It also provides memory
   39.         # for clients who subscribe after startup.
   40.         self._current_announcements = {}
   41. 
   42.         self.encoding_parameters = None
   43. 
   44.         # hooks for unit tests
   45.         self._debug_counts = {
   46.             "inbound_message": 0,
   47.             "inbound_announcement": 0,
   48.             "wrong_service": 0,
   49.             "duplicate_announcement": 0,
   50.             "update": 0,
   51.             "new_announcement": 0,
   52.             "outbound_message": 0,
   53.             }
   54. 
   55.     def startService(self):
   56.         service.Service.startService(self)
   57.         self._introducer_error = None
   58.         rc = self._tub.connectTo(self.introducer_furl, self._got_introducer)
   59.         self._introducer_reconnector = rc
   60.         def connect_failed(failure):
   61.             self.log("Initial Introducer connection failed: perhaps it's down",
   62.                      level=log.WEIRD, failure=failure, umid="c5MqUQ")
   63.         d = self._tub.getReference(self.introducer_furl)
   64.         d.addErrback(connect_failed)
   65. 
   66.     def _got_introducer(self, publisher):
   67.         self.log("connected to introducer, getting versions")
   68.         default = { "http://allmydata.org/tahoe/protocols/introducer/v1":
   69.                     { },
   70.                     "application-version": "unknown: no get_version()",
   71.                     }
   72.         d = add_version_to_remote_reference(publisher, default)
   73.         d.addCallback(self._got_versioned_introducer)
   74.         d.addErrback(self._got_error)
   75. 
   76.     def _got_error(self, f):
   77.         # TODO: for the introducer, perhaps this should halt the application
   78.         self._introducer_error = f # polled by tests
   79. 
   80.     def _got_versioned_introducer(self, publisher):
   81.         self.log("got introducer version: %s" % (publisher.version,))
   82.         # we require a V1 introducer
   83.         needed = "http://allmydata.org/tahoe/protocols/introducer/v1"
   84.         if needed not in publisher.version:
   85.             raise InsufficientVersionError(needed, publisher.version)
   86.         self._publisher = publisher
   87.         publisher.notifyOnDisconnect(self._disconnected)
   88.         self._maybe_publish()
   89.         self._maybe_subscribe()
   90. 
   91.     def _disconnected(self):
   92.         self.log("bummer, we've lost our connection to the introducer")
   93.         self._publisher = None
   94.         self._subscriptions.clear()
   95. 
   96.     def log(self, *args, **kwargs):
   97.         if "facility" not in kwargs:
   98.             kwargs["facility"] = "tahoe.introducer"
   99.         return log.msg(*args, **kwargs)
  100. 
  101. 
  102.     def publish(self, furl, service_name, remoteinterface_name):
  103.         assert type(self._nickname_utf8) is str # we always send UTF-8
  104.         ann = (furl, service_name, remoteinterface_name,
  105.                self._nickname_utf8, self._my_version, self._oldest_supported)
  106.         self._published_announcements.add(ann)
  107.         self._maybe_publish()
  108. 
  109.     def subscribe_to(self, service_name, cb, *args, **kwargs):
  110.         self._local_subscribers.append( (service_name,cb,args,kwargs) )
  111.         self._subscribed_service_names.add(service_name)
  112.         self._maybe_subscribe()
  113.         for (servicename,nodeid),ann_d in self._current_announcements.items():
  114.             if servicename == service_name:
  115.                 eventually(cb, nodeid, ann_d)
  116. 
  117.     def _maybe_subscribe(self):
  118.         if not self._publisher:
  119.             self.log("want to subscribe, but no introducer yet",
  120.                      level=log.NOISY)
  121.             return
  122.         for service_name in self._subscribed_service_names:
  123.             if service_name not in self._subscriptions:
  124.                 # there is a race here, but the subscription desk ignores
  125.                 # duplicate requests.
  126.                 self._subscriptions.add(service_name)
  127.                 d = self._publisher.callRemote("subscribe", self, service_name)
  128.                 d.addErrback(trap_deadref)
  129.                 d.addErrback(log.err, format="server errored during subscribe",
  130.                              facility="tahoe.introducer",
  131.                              level=log.WEIRD, umid="2uMScQ")
  132. 
  133.     def _maybe_publish(self):
  134.         if not self._publisher:
  135.             self.log("want to publish, but no introducer yet", level=log.NOISY)
  136.             return
  137.         # this re-publishes everything. The Introducer ignores duplicates
  138.         for ann in self._published_announcements:
  139.             self._debug_counts["outbound_message"] += 1
  140.             d = self._publisher.callRemote("publish", ann)
  141.             d.addErrback(trap_deadref)
  142.             d.addErrback(log.err,
  143.                          format="server errored during publish %(ann)s",
  144.                          ann=ann, facility="tahoe.introducer",
  145.                          level=log.WEIRD, umid="xs9pVQ")
  146. 
  147. 
  148. 
  149.     def remote_announce(self, announcements):
  150.         self.log("received %d announcements" % len(announcements))
  151.         self._debug_counts["inbound_message"] += 1
  152.         for ann in announcements:
  153.             try:
  154.                 self._process_announcement(ann)
  155.             except:
  156.                 log.err(format="unable to process announcement %(ann)s",
  157.                         ann=ann)
  158.                 # Don't let a corrupt announcement prevent us from processing
  159.                 # the remaining ones. Don't return an error to the server,
  160.                 # since they'd just ignore it anyways.
  161.                 pass
  162. 
  163.     def _process_announcement(self, ann):
  164.         self._debug_counts["inbound_announcement"] += 1
  165.         (furl, service_name, ri_name, nickname_utf8, ver, oldest) = ann
  166.         if service_name not in self._subscribed_service_names:
  167.             self.log("announcement for a service we don't care about [%s]"
  168.                      % (service_name,), level=log.UNUSUAL, umid="dIpGNA")
  169.             self._debug_counts["wrong_service"] += 1
  170.             return
  171.         self.log("announcement for [%s]: %s" % (service_name, ann),
  172.                  umid="BoKEag")
  173.         assert type(furl) is str
  174.         assert type(service_name) is str
  175.         assert type(ri_name) is str
  176.         assert type(nickname_utf8) is str
  177.         nickname = nickname_utf8.decode("utf-8")
  178.         assert type(nickname) is unicode
  179.         assert type(ver) is str
  180.         assert type(oldest) is str
  181. 
  182.         nodeid = b32decode(SturdyRef(furl).tubID.upper())
  183.         nodeid_s = idlib.shortnodeid_b2a(nodeid)
  184. 
  185.         ann_d = { "version": 0,
  186.                   "service-name": service_name,
  187. 
  188.                   "FURL": furl,
  189.                   "nickname": nickname,
  190.                   "app-versions": {}, # need #466 and v2 introducer
  191.                   "my-version": ver,
  192.                   "oldest-supported": oldest,
  193.                   }
  194. 
  195.         index = (service_name, nodeid)
  196.         if self._current_announcements.get(index, None) == ann_d:
  197.             self.log("reannouncement for [%(service)s]:%(nodeid)s, ignoring",
  198.                      service=service_name, nodeid=nodeid_s,
  199.                      level=log.UNUSUAL, umid="B1MIdA")
  200.             self._debug_counts["duplicate_announcement"] += 1
  201.             return
  202.         if index in self._current_announcements:
  203.             self._debug_counts["update"] += 1
  204.         else:
  205.             self._debug_counts["new_announcement"] += 1
  206. 
  207.         self._current_announcements[index] = ann_d
  208.         # note: we never forget an index, but we might update its value
  209. 
  210.         for (service_name2,cb,args,kwargs) in self._local_subscribers:
  211.             if service_name2 == service_name:
  212.                 eventually(cb, nodeid, ann_d, *args, **kwargs)
  213. 
  214.     def remote_set_encoding_parameters(self, parameters):
  215.         self.encoding_parameters = parameters
  216. 
  217.     def connected_to_introducer(self):
  218.         return bool(self._publisher)