source: trunk/src/allmydata/mutable/servermap.py

Last change on this file was c7bb190, checked in by Jean-Paul Calderone <exarkun@…>, at 2023-01-03T15:38:18Z

Factor some SSK "signature" key handling code into a more reusable shape

This gives the test suite access to the derivation function so it can
re-derive certain values to use as expected results to compare against actual
results.

  • Property mode set to 100644
File size: 52.1 KB
1"""
2Ported to Python 3.
3"""
4from __future__ import annotations
5
6from six import ensure_str
7
8import sys, time, copy
9from zope.interface import implementer
10from itertools import count
11from collections import defaultdict
12from twisted.internet import defer
13from twisted.python import failure
14from foolscap.api import DeadReferenceError, RemoteException, eventually, \
15                         fireEventually
16from allmydata.crypto.error import BadSignature
17from allmydata.crypto import rsa
18from allmydata.util import base32, hashutil, log, deferredutil
19from allmydata.util.dictutil import DictOfSets
20from allmydata.storage.server import si_b2a
21from allmydata.interfaces import IServermapUpdaterStatus
22
23from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \
24     MODE_READ, MODE_REPAIR, CorruptShareError, decrypt_privkey
25from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy
26
27@implementer(IServermapUpdaterStatus)
28class UpdateStatus(object):
29    statusid_counter = count(0)
30    def __init__(self):
31        self.timings = {}
32        self.timings["per_server"] = defaultdict(list)
33        self.timings["cumulative_verify"] = 0.0
34        self.privkey_from = None
35        self.problems = {}
36        self.active = True
37        self.storage_index = None
38        self.mode = "?"
39        self.status = "Not started"
40        self.progress = 0.0
41        self.counter = next(self.statusid_counter)
42        self.started = time.time()
43        self.finished = None
44
45    def add_per_server_time(self, server, op, sent, elapsed):
46        assert op in ("query", "late", "privkey")
47        self.timings["per_server"][server].append((op,sent,elapsed))
48
49    def get_started(self):
50        return self.started
51    def get_finished(self):
52        return self.finished
53    def get_storage_index(self):
54        return self.storage_index
55    def get_mode(self):
56        return self.mode
57    def get_servermap(self):
58        return self.servermap
59    def get_privkey_from(self):
60        return self.privkey_from
61    def using_helper(self):
62        return False
63    def get_size(self):
64        return "-NA-"
65    def get_status(self):
66        return self.status
67    def get_progress(self):
68        return self.progress
69    def get_active(self):
70        return self.active
71    def get_counter(self):
72        return self.counter
73
74    def set_storage_index(self, si):
75        self.storage_index = si
76    def set_mode(self, mode):
77        self.mode = mode
78    def set_privkey_from(self, server):
79        self.privkey_from = server
80    def set_status(self, status):
81        self.status = status
82    def set_progress(self, value):
83        self.progress = value
84    def set_active(self, value):
85        self.active = value
86    def set_finished(self, when):
87        self.finished = when
88
89class ServerMap(object):
90    """I record the placement of mutable shares.
91
92    This object records which shares (of various versions) are located on
93    which servers.
94
95    One purpose I serve is to inform callers about which versions of the
96    mutable file are recoverable and 'current'.
97
98    A second purpose is to serve as a state marker for test-and-set
99    operations. I am passed out of retrieval operations and back into publish
100    operations, which means 'publish this new version, but only if nothing
101    has changed since I last retrieved this data'. This reduces the chances
102    of clobbering a simultaneous (uncoordinated) write.
103
    @ivar _known_shares: a dictionary, mapping a (server, shnum) tuple to a
                        (versionid, timestamp) tuple. Each 'versionid' is a
                        tuple of (seqnum, root_hash, IV, segsize, datalength,
                        k, N, signed_prefix, offsets)

    @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
                       shares that I should ignore (because a previous user
                       of the servermap determined that they were invalid).
                       The updater only locates a certain number of shares:
                       if some of these turn out to have integrity problems
                       and are unusable, the caller will need to mark those
                       shares as bad, then re-update the servermap, then try
                       again. The dict maps (server, shnum) tuple to old
                       checkstring.
    """

    def __init__(self):
        self._known_shares = {}
        self.unreachable_servers = set() # servers that didn't respond to queries
        self.reachable_servers = set() # servers that did respond to queries
        self._problems = [] # mostly for debugging
        self._bad_shares = {} # maps (server,shnum) to old checkstring
        self._last_update_mode = None
        self._last_update_time = 0
        self.proxies = {}
        self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
        # where blockhashes is a list of bytestrings (the result of
        # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
        # (block,salt) tuple-of-bytestrings from get_block_and_salt()

    def copy(self):
        s = ServerMap()
        s._known_shares = self._known_shares.copy() # tuple->tuple
        s.unreachable_servers = set(self.unreachable_servers)
        s.reachable_servers = set(self.reachable_servers)
        s._problems = self._problems[:]
        s._bad_shares = self._bad_shares.copy() # tuple->str
        s._last_update_mode = self._last_update_mode
        s._last_update_time = self._last_update_time
        s.update_data = copy.deepcopy(self.update_data)
        return s

    def get_reachable_servers(self):
        return self.reachable_servers

    def mark_server_reachable(self, server):
        self.reachable_servers.add(server)

    def mark_server_unreachable(self, server):
        self.unreachable_servers.add(server)

    def mark_bad_share(self, server, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        assert isinstance(checkstring, bytes)
        key = (server, shnum) # record checkstring
        self._bad_shares[key] = checkstring
        self._known_shares.pop(key, None)

    def get_bad_shares(self):
        # key=(server,shnum) -> checkstring
        return self._bad_shares

    def add_new_share(self, server, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (server, shnum)
        self._bad_shares.pop(key, None)
        self._known_shares[key] = (verinfo, timestamp)

    def add_problem(self, f):
        self._problems.append(f)
    def get_problems(self):
        return self._problems

    def set_last_update(self, mode, when):
        self._last_update_mode = mode
        self._last_update_time = when
    def get_last_update(self):
        return (self._last_update_mode, self._last_update_time)

    def dump(self, out=sys.stdout):
        print("servermap:", file=out)

        for ( (server, shnum), (verinfo, timestamp) ) in list(self._known_shares.items()):
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                         (str(server.get_name(), "utf-8"), shnum,
                          seqnum, str(base32.b2a(root_hash)[:4], "utf-8"), k, N,
                          datalength), file=out)
        if self._problems:
            print("%d PROBLEMS" % len(self._problems), file=out)
            for f in self._problems:
                print(str(f), file=out)
        return out

    def all_servers(self):
        return set([server for (server, shnum) in self._known_shares])

    def all_servers_for_version(self, verinfo):
        """Return a set of servers that hold shares for the given version."""
        return set([server
                    for ( (server, shnum), (verinfo2, timestamp) )
                    in self._known_shares.items()
                    if verinfo == verinfo2])

    def get_known_shares(self):
        # maps (server,shnum) to (versionid,timestamp)
        return self._known_shares

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of servers that hold it."""
        sharemap = DictOfSets()
        for (server, shnum) in self._known_shares:
            sharemap.add(shnum, server)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, server,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (server, shnum), (verinfo, timestamp) ) in list(self._known_shares.items()):
            versionmap.add(verinfo, (shnum, server, timestamp))
        return versionmap
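    # Hedged usage sketch for the two builders above (`smap` is a populated
    # ServerMap; the names are illustrative):
    #
    #   sharemap = smap.make_sharemap()      # {shnum: set(servers)}
    #   versionmap = smap.make_versionmap()  # {verinfo: set((shnum, server, ts))}
    #
    # sharemap answers "who holds share N?"; versionmap answers "which
    # shares make up version V?".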

    def debug_shares_on_server(self, server): # used by tests
        return set([shnum for (s, shnum) in self._known_shares if s == server])

    def version_on_server(self, server, shnum):
        key = (server, shnum)
        if key in self._known_shares:
            (verinfo, timestamp) = self._known_shares[key]
            return verinfo
        return None

    def shares_available(self):
247        """Return a dict that maps verinfo to tuples of
248        (num_distinct_shares, k, N) tuples."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in list(versionmap.items()):
            s = set()
            for (shnum, server, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, str(base32.b2a(root_hash)[:4], "utf-8"))

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in list(versionmap.items()):
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()
        recoverable_versions = set()
        for (verinfo, shares) in list(versionmap.items()):
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions
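    # Example for recoverable_versions() above (illustrative numbers): with
    # k=3, a version whose shares span shnums {0, 1, 2} is recoverable, while
    # one where only shnums {4, 7} have been seen is not. Distinct shnums are
    # what count: two copies of share 4 on different servers still contribute
    # only one share toward the k threshold.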

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in list(versionmap.items()):
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None
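    # Note on best_recoverable_version() above: it leans on Python's tuple
    # ordering. verinfo tuples begin (seqnum, root_hash, ...), so sorting and
    # taking the last element picks the highest seqnum, with root_hash as an
    # arbitrary but deterministic tie-breaker -- e.g. (4, b'00..') sorts
    # after (3, b'ff..') regardless of the hashes.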

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found,k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in list(versionmap.items()):
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            healths[verinfo] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions
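    # Example for unrecoverable_newer_versions() above (hypothetical values):
    # if a seq4 version is recoverable and we have seen only 2 shares of a
    # seq5 version whose k is 5, this returns {seq5_verinfo: (2, 5)} -- a
    # warning that publishing over seq4 would clobber a newer version we
    # cannot yet read.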


    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False
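    # Example for needs_merge() above (illustrative): two uncoordinated
    # writers can both publish seq5 with different root hashes. If both
    # versions end up recoverable, recoverable_seqnums is [5, 5] and
    # needs_merge() returns True: reading one "best" seq5 and republishing
    # would silently drop the other writer's changes.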


    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
        """
        I return the update data for the given shnum
        """
        update_data = self.update_data[shnum]
        update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
        return update_datum


    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
        """
        I record the block hash tree for the given shnum.
        """
        self.update_data.setdefault(shnum, []).append((verinfo, data))


class ServermapUpdater(object):
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False, update_range=None):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.

        """

        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # SDMF:
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # MDMF:
        #  * Checkstring? [0:72]
        #  * If we want to validate the checkstring, then [0:72], [143:?] --
        #    the offset table will tell us for sure.
        #  * If we need the verification key, we have to consult the offset
        #    table as well.
        # At this point, we don't know which we are. Our filenode can
        # tell us, but it might be lying -- in some cases, we're
        # responsible for telling it which kind of file it is.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False

        if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        self.fetch_update_data = False
        if mode == MODE_WRITE and update_range:
            # We're updating the servermap in preparation for an
            # in-place file update, so we need to fetch some additional
            # data from each share that we find.
            assert len(update_range) == 2

            self.start_segment = update_range[0]
            self.end_segment = update_range[1]
            self.fetch_update_data = True

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        self._done_deferred = defer.Deferred()

        # first, which servers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        sb = self._storage_broker
        # All of the servers, permuted by the storage index, as usual.
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.extra_servers = full_serverlist[:] # servers are removed as we use them
        self._good_servers = set() # servers who had some shares
        self._servers_with_shares = set() # servers that we know have shares now
        self._empty_servers = set() # servers who don't have any shares
        self._bad_servers = set() # servers to whom our queries failed

        k = self._node.get_required_shares()
        # For what cases can these conditions work?
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many servers (although we
        # might not wait for all of their answers to come back)
        self.num_servers_to_query = k + self.EPSILON

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # We want to query all of the servers.
            initial_servers_to_query = list(full_serverlist)
            must_query = set(initial_servers_to_query)
            self.extra_servers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            # We don't query all of the servers because that could take a while.
            self.num_servers_to_query = N + self.EPSILON
            initial_servers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_servers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else: # MODE_READ, MODE_ANYTHING
            # 2*k servers is good enough.
            initial_servers_to_query, must_query = self._build_initial_querylist()

        # this is a set of servers that we are required to get responses
        # from: they are servers who used to have a share, so we need to know
        # where they currently stand, even if that means we have to wait for
        # a silently-lost TCP connection to time out. We remove servers from
        # this set as we get responses.
        self._must_query = set(must_query)

        # now initial_servers_to_query contains the servers that we should
        # ask, self.must_query contains the servers that we must have heard
        # from before we can consider ourselves finished, and
        # self.extra_servers contains the overflow (servers that we should
        # tap if we don't get enough responses)
        # I guess that self._must_query is a subset of
        # initial_servers_to_query?
        assert must_query.issubset(initial_servers_to_query)

        self._send_initial_requests(initial_servers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred
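    # Worked example for the query budget above (illustrative numbers): with
    # k=3 and N=10, EPSILON is 3, so MODE_READ asks at least k+EPSILON = 6
    # servers, while MODE_WRITE raises the budget to N+EPSILON = 13 and keeps
    # going until EPSILON consecutive share-less servers have been seen.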

    def _build_initial_querylist(self):
        # we send queries to everyone who was already in the sharemap
        initial_servers_to_query = set(self._servermap.all_servers())
        # and we must wait for responses from them
        must_query = set(initial_servers_to_query)

        while ((self.num_servers_to_query > len(initial_servers_to_query))
               and self.extra_servers):
            initial_servers_to_query.add(self.extra_servers.pop(0))

        return initial_servers_to_query, must_query

    def _send_initial_requests(self, serverlist):
        self._status.set_status("Sending %d initial queries" % len(serverlist))
        self._queries_outstanding = set()
        for server in serverlist:
            self._queries_outstanding.add(server)
            self._do_query(server, self._storage_index, self._read_size)

        if not serverlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, server, storage_index, readsize):
        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
                 name=server.get_name(),
                 readsize=readsize,
                 level=log.NOISY)
        started = time.time()
        self._queries_outstanding.add(server)
        d = self._do_read(server, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, server, readsize, storage_index,
                      started)
        d.addErrback(self._query_failed, server)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addErrback(self._fatal_error)
        d.addCallback(self._check_for_done)
        return d

    def _do_read(self, server, storage_index, shnums, readv):
        """
        If self._add_lease is true, a lease is added, and the result only fires
        once the lease has also been added.
605        """
606        ss = server.get_storage_server()
607        if self._add_lease:
608            # send an add-lease message in parallel. The results are handled
609            # separately.
610            renew_secret = self._node.get_renewal_secret(server)
611            cancel_secret = self._node.get_cancel_secret(server)
612            d2 = ss.add_lease(
613                storage_index,
614                renew_secret,
615                cancel_secret,
616            )
617            # we ignore success
618            d2.addErrback(self._add_lease_failed, server, storage_index)
619        else:
620            d2 = defer.succeed(None)
621        d = ss.slot_readv(storage_index, shnums, readv)
622
623        def passthrough(result):
624            # Wait for d2, but fire with result of slot_readv() regardless of
625            # result of d2.
626            return d2.addBoth(lambda _: result)
627
628        d.addCallback(passthrough)
629        return d
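    # Note on the passthrough above (editorial): d2.addBoth(lambda _: result)
    # is a common Twisted idiom for "wait for both, keep one result". The
    # Deferred returned to our caller fires only after the add-lease round
    # trip settles, but always with the slot_readv() result, so add-lease
    # failures (already routed to _add_lease_failed) cannot disturb the read.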


    def _got_corrupt_share(self, e, shnum, server, data, lp):
        """
        I am called when a remote server returns a corrupt share in
        response to one of our queries. By corrupt, I mean a share
        without a valid signature. I then record the failure, notify the
        server of the corruption, and record the share as bad.
        """
        f = failure.Failure(e)
        self.log(format="bad share: %(f_value)s", f_value=str(f),
                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
        # Notify the server that its share is corrupt.
        self.notify_server_corruption(server, shnum, str(e))
        # By flagging this as a bad server, we won't count any of
        # the other shares on that server as valid, though if we
        # happen to find a valid version string amongst those
        # shares, we'll keep track of it so that we don't need
        # to validate the signature on those again.
        self._bad_servers.add(server)
        self._last_failure = f
        # XXX: Use the reader for this?
        checkstring = data[:SIGNED_PREFIX_LENGTH]
        self._servermap.mark_bad_share(server, shnum, checkstring)
        self._servermap.add_problem(f)


    def _got_results(self, datavs, server, readsize, storage_index, started):
        lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
                      name=server.get_name(),
                      numshares=len(datavs))
        ss = server.get_storage_server()
        now = time.time()
        elapsed = now - started
        def _done_processing(ignored=None):
            self._queries_outstanding.discard(server)
            self._servermap.mark_server_reachable(server)
            self._must_query.discard(server)
            self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp)
            _done_processing()
            self._status.add_per_server_time(server, "late", started, elapsed)
            return
        self._status.add_per_server_time(server, "query", started, elapsed)

        if datavs:
            self._good_servers.add(server)
        else:
            self._empty_servers.add(server)

        ds = []

        for shnum, datav in list(datavs.items()):
            data = datav[0]
            reader = MDMFSlotReadProxy(ss,
                                       storage_index,
                                       shnum,
                                       data,
                                       data_is_everything=(len(data) < readsize))

            # our goal, with each response, is to validate the version
            # information and share data as best we can at this point --
            # we do this by validating the signature. To do this, we
            # need to do the following:
            #   - If we don't already have the public key, fetch the
            #     public key. We use this to validate the signature.
            if not self._node.get_pubkey():
                # fetch and set the public key.
                d = reader.get_verification_key()
                d.addCallback(lambda results, shnum=shnum:
                              self._try_to_set_pubkey(results, server, shnum, lp))
                # XXX: Make self._pubkey_query_failed?
                d.addErrback(lambda error, shnum=shnum, data=data:
                             self._got_corrupt_share(error, shnum, server, data, lp))
            else:
                # we already have the public key.
                d = defer.succeed(None)

            # Neither of these two branches return anything of
            # consequence, so the first entry in our deferredlist will
            # be None.

            # - Next, we need the version information. We almost
            #   certainly got this by reading the first thousand or so
            #   bytes of the share on the storage server, so we
            #   shouldn't need to fetch anything at this step.
            d2 = reader.get_verinfo()
            d2.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # - Next, we need the signature. For an SDMF share, it is
            #   likely that we fetched this when doing our initial fetch
            #   to get the version information. In MDMF, this lives at
            #   the end of the share, so unless the file is quite small,
            #   we'll need to do a remote fetch to get it.
            d3 = reader.get_signature()
            d3.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            #  Once we have all three of these responses, we can move on
            #  to validating the signature

            # Does the node already have a privkey? If not, we'll try to
            # fetch it here.
            if self._need_privkey:
                d4 = reader.get_encprivkey()
                d4.addCallback(lambda results, shnum=shnum:
                               self._try_to_validate_privkey(results, server, shnum, lp))
                d4.addErrback(lambda error, shnum=shnum:
                              self._privkey_query_failed(error, server, shnum, lp))
            else:
                d4 = defer.succeed(None)


            if self.fetch_update_data:
                # fetch the block hash tree and first + last segment, as
                # configured earlier.
                # Then set them in wherever we happen to want to set
                # them.
                # use a separate list here: reusing the outer `ds` would
                # clobber the per-share DeferredLists accumulated above
                update_ds = []
                # XXX: We do this above, too. Is there a good way to
                # make the two routines share the value without
                # introducing more roundtrips?
                update_ds.append(reader.get_verinfo())
                update_ds.append(reader.get_blockhashes())
                update_ds.append(reader.get_block_and_salt(self.start_segment))
                update_ds.append(reader.get_block_and_salt(self.end_segment))
                d5 = deferredutil.gatherResults(update_ds)
                d5.addCallback(self._got_update_results_one_share, shnum)
            else:
                d5 = defer.succeed(None)

            dl = defer.DeferredList([d, d2, d3, d4, d5])
            def _append_proxy(passthrough, shnum=shnum, reader=reader):
                # Store the proxy (with its cache) keyed by serverid and
                # version.
                _, (_, verinfo), _, _, _ = passthrough
                verinfo = self._make_verinfo_hashable(verinfo)
                self._servermap.proxies[(verinfo,
                                         server.get_serverid(),
                                         storage_index, shnum)] = reader
                return passthrough
            dl.addCallback(_append_proxy)
            dl.addBoth(self._turn_barrier)
            dl.addCallback(lambda results, shnum=shnum:
                           self._got_signature_one_share(results, shnum, server, lp))
            dl.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            ds.append(dl)
        # dl is a deferred list that will fire when all of the shares
        # that we found on this server are done processing. When dl fires,
        # we know that processing is done, so we can decrement the
        # semaphore-like thing that we incremented earlier.
        dl = defer.DeferredList(ds, fireOnOneErrback=True)
        # Are we done? Done means that there are no more queries to
        # send, that there are no outstanding queries, and that we
        # haven't received any queries that are still processing. If we
        # are done, self._check_for_done will cause the done deferred
        # that we returned to our caller to fire, which tells them that
        # they have a complete servermap, and that we won't be touching
        # the servermap anymore.
        dl.addCallback(_done_processing)
        dl.addCallback(self._check_for_done)
        dl.addErrback(self._fatal_error)
        # all done!
        self.log("_got_results done", parent=lp, level=log.NOISY)
        return dl


    def _turn_barrier(self, result):
        """
        I help the servermap updater avoid the recursion limit issues
        discussed in #237.
        """
        return fireEventually(result)
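    # Editorial note: fireEventually(result) hands `result` back on a later
    # reactor turn, so placing _turn_barrier in each share's callback chain
    # (via the addBoth above) breaks what would otherwise be one long
    # synchronous recursion through many callbacks -- processing resumes
    # from the reactor instead, sidestepping the #237 stack blowup.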


    def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
        if self._node.get_pubkey():
            return # don't go through this again if we don't have to
        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        assert len(fingerprint) == 32
        if fingerprint != self._node.get_fingerprint():
            raise CorruptShareError(server, shnum,
                                    "pubkey doesn't match fingerprint")
        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
        assert self._node.get_pubkey()


    def notify_server_corruption(self, server, shnum, reason):
        if isinstance(reason, str):
            reason = reason.encode("utf-8")
        ss = server.get_storage_server()
        ss.advise_corrupt_share(
            b"mutable",
            self._storage_index,
            shnum,
            reason,
        )


    def _got_signature_one_share(self, results, shnum, server, lp):
        # It is our job to give versioninfo to our caller. We need to
        # raise CorruptShareError if the share is corrupt for any
        # reason, something that our caller will handle.
        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
                 shnum=shnum,
                 name=server.get_name(),
                 level=log.NOISY,
                 parent=lp)
        if not self._running:
            # We can't process the results, since we can't touch the
            # servermap anymore.
            self.log("but we're not running anymore.")
            return None

        _, verinfo, signature, __, ___ = results
        verinfo = self._make_verinfo_hashable(verinfo[1])

        # This tuple uniquely identifies a share on the grid; we use it
        # to keep track of the ones that we've already seen.
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets_tuple) = verinfo


        if verinfo not in self._valid_versions:
            # This is a new version tuple, and we need to validate it
            # against the public key before keeping track of it.
            assert self._node.get_pubkey()
            try:
                rsa.verify_signature(self._node.get_pubkey(), signature[1], prefix)
            except BadSignature:
                raise CorruptShareError(server, shnum,
                                        "signature is invalid")

        # ok, it's a valid verinfo. Add it to the list of validated
        # versions.
        self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                 % (seqnum, str(base32.b2a(root_hash)[:4], "utf-8"),
                    ensure_str(server.get_name()), shnum,
                    k, n, segsize, datalen),
                    parent=lp)
        self._valid_versions.add(verinfo)
        # We now know that this is a valid candidate verinfo. Whether or
        # not this instance of it is valid is a matter for the next
        # statement; at this point, we just know that if we see this
        # version info again, that its signature checks out and that
        # we're okay to skip the signature-checking step.

        # (server, shnum) are bound in the method invocation.
        if (server, shnum) in self._servermap.get_bad_shares():
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(server, shnum, verinfo, timestamp)
        self._servers_with_shares.add(server)

        return verinfo

    def _make_verinfo_hashable(self, verinfo):
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo

        offsets_tuple = tuple(offsets.items())

        verinfo = (seqnum,
                   root_hash,
                   saltish,
                   segsize,
                   datalen,
                   k,
                   n,
                   prefix,
                   offsets_tuple)
        return verinfo
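    # Example for _make_verinfo_hashable() above (offset values made up):
    # the offsets dict is the only unhashable piece of a raw verinfo, so
    #
    #   {'signature': 399, 'share_data': 1073}
    #
    # becomes (('signature', 399), ('share_data', 1073)), after which the
    # whole verinfo can serve as a set member or dict key, as
    # _valid_versions and the servermap proxies require.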

    def _got_update_results_one_share(self, results, share):
        """
        I record the fetched update data for the given share.
927        """
928        assert len(results) == 4
929        verinfo, blockhashes, start, end = results
930        verinfo = self._make_verinfo_hashable(verinfo)
931        update_data = (blockhashes, start, end)
932        self._servermap.set_update_data_for_share_and_verinfo(share,
933                                                              verinfo,
934                                                              update_data)
935
936    def _deserialize_pubkey(self, pubkey_s):
937        verifier = rsa.create_verifying_key_from_string(pubkey_s)
938        return verifier
939
940    def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
941        """
        Given an encrypted private key from a remote server, I decrypt it
        and derive its writekey, which I validate against the writekey
        stored in my node. If it is valid, then I set the privkey and
        encprivkey properties of the node.
945        """
946        node_writekey = self._node.get_writekey()
947        alleged_privkey_s = decrypt_privkey(node_writekey, enc_privkey)
948        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
949        if alleged_writekey != node_writekey:
950            self.log("invalid privkey from %r shnum %d" %
951                     (server.get_name(), shnum),
952                     parent=lp, level=log.WEIRD, umid="aJVccw")
953            return
954
955        # it's good
956        self.log("got valid privkey from shnum %d on serverid %r" %
957                 (shnum, server.get_name()),
958                 parent=lp)
959        privkey, _ = rsa.create_signing_keypair_from_string(alleged_privkey_s)
960        self._node._populate_encprivkey(enc_privkey)
961        self._node._populate_privkey(privkey)
962        self._need_privkey = False
963        self._status.set_privkey_from(server)
964
965
966    def _add_lease_failed(self, f, server, storage_index):
967        # Older versions of Tahoe didn't handle the add-lease message very
968        # well: <=1.1.0 throws a NameError because it doesn't implement
969        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
970        # (which is most of them, since we send add-lease to everybody,
971        # before we know whether or not they have any shares for us), and
972        # 1.2.0 throws KeyError even on known buckets due to an internal bug
973        # in the latency-measuring code.
974
975        # we want to ignore the known-harmless errors and log the others. In
976        # particular we want to log any local errors caused by coding
977        # problems.
978
979        if f.check(DeadReferenceError):
980            return
981        if f.check(RemoteException):
982            if f.value.failure.check(KeyError, IndexError, NameError):
983                # this may ignore a bit too much, but that only hurts us
984                # during debugging
985                return
986            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
987                     name=server.get_name(),
988                     f_value=str(f.value),
989                     failure=f,
990                     level=log.WEIRD, umid="iqg3mw")
991            return
992        # local errors are cause for alarm
993        log.err(f,
994                format="local error in add_lease to [%(name)s]: %(f_value)s",
995                name=server.get_name(),
996                f_value=str(f.value),
997                level=log.WEIRD, umid="ZWh6HA")
998
999    def _query_failed(self, f, server):
1000        if not self._running:
1001            return
1002        level = log.WEIRD
1003        if f.check(DeadReferenceError):
1004            level = log.UNUSUAL
1005        self.log(format="error during query: %(f_value)s",
1006                 f_value=str(f.value), failure=f,
1007                 level=level, umid="IHXuQg")
1008        self._must_query.discard(server)
1009        self._queries_outstanding.discard(server)
1010        self._bad_servers.add(server)
1011        self._servermap.add_problem(f)
        # a server could be in both ServerMap.reachable_servers and
        # .unreachable_servers if it responded to our query but an
        # exception was then raised in _got_results.
        self._servermap.mark_server_unreachable(server)
        self._queries_completed += 1
        self._last_failure = f


    def _privkey_query_failed(self, f, server, shnum, lp):
        self._queries_outstanding.discard(server)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.add_problem(f)
        self._last_failure = f


    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries
        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra servers available, "
                              "%(must)d 'must query' servers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_servers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY,
                      )

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from servers that used to have
            # a share, so we must continue to wait. No additional queries are
            # required at this time.
            self.log("%d 'must query' servers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_servers):
            # all queries have retired, and we have no servers left to ask. No
            # more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra servers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        MAX_IN_FLIGHT = 5
        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_servers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_servers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't have
            # any shares, to be sufficiently confident that we've seen all
            # the shares. This is still less work than MODE_CHECK, which asks
            # every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

            for i, server in enumerate(self.full_serverlist):
                if server in self._bad_servers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % server.get_name()
                elif server in self._empty_servers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % server.get_name()
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif server in self._servers_with_shares:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % server.get_name()
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % server.get_name()
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for servers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)
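    # Worked example of the MODE_WRITE boundary search above (hypothetical
    # permuted server list, EPSILON=3): states "1101000" means shares were
    # found at indices 0, 1 and 3, and the three consecutive empty servers
    # at indices 4-6 form the boundary, so the search can stop. A "?" left
    # of the boundary means some earlier server has not answered yet, so we
    # keep waiting; states like "1100" (no boundary found yet) trigger more
    # queries instead.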

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_servers:
                break
            more_queries.append(self.extra_servers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%r]" % s.get_name() for s in more_queries]),
                 level=log.NOISY)

        for server in more_queries:
            self._do_query(server, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            self.log("not running; we're already done")
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Finished")
        self._status.set_active(False)

        self._servermap.set_last_update(self.mode, self._started)
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())

        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)