1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | from __future__ import annotations |
---|
5 | |
---|
6 | from six import ensure_str |
---|
7 | |
---|
8 | import sys, time, copy |
---|
9 | from zope.interface import implementer |
---|
10 | from itertools import count |
---|
11 | from collections import defaultdict |
---|
12 | from twisted.internet import defer |
---|
13 | from twisted.python import failure |
---|
14 | from foolscap.api import DeadReferenceError, RemoteException, eventually, \ |
---|
15 | fireEventually |
---|
16 | from allmydata.crypto.error import BadSignature |
---|
17 | from allmydata.crypto import rsa |
---|
18 | from allmydata.util import base32, hashutil, log, deferredutil |
---|
19 | from allmydata.util.dictutil import DictOfSets |
---|
20 | from allmydata.storage.server import si_b2a |
---|
21 | from allmydata.interfaces import IServermapUpdaterStatus |
---|
22 | |
---|
23 | from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \ |
---|
24 | MODE_READ, MODE_REPAIR, CorruptShareError, decrypt_privkey |
---|
25 | from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy |
---|
26 | |
---|
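# Overview (a summary drawn from the docstrings below): a mutable filenode
# constructs a ServermapUpdater and calls its update() method, which returns
# a Deferred that fires with a ServerMap recording which servers hold which
# share versions. The map is passed out of retrieval operations and back
# into publish operations, which use it as the reference state for their
# test-and-set check.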
@implementer(IServermapUpdaterStatus)
class UpdateStatus(object):
    statusid_counter = count(0)
    def __init__(self):
        self.timings = {}
        self.timings["per_server"] = defaultdict(list)
        self.timings["cumulative_verify"] = 0.0
        self.privkey_from = None
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.mode = "?"
        self.status = "Not started"
        self.progress = 0.0
        self.counter = next(self.statusid_counter)
        self.started = time.time()
        self.finished = None

    def add_per_server_time(self, server, op, sent, elapsed):
        assert op in ("query", "late", "privkey")
        self.timings["per_server"][server].append((op, sent, elapsed))

    def get_started(self):
        return self.started
    def get_finished(self):
        return self.finished
    def get_storage_index(self):
        return self.storage_index
    def get_mode(self):
        return self.mode
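    # Note: self.servermap is never initialized in __init__; it is
    # presumably attached by whatever drives the update, so calling
    # get_servermap() before that happens would raise AttributeError.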
    def get_servermap(self):
        return self.servermap
    def get_privkey_from(self):
        return self.privkey_from
    def using_helper(self):
        return False
    def get_size(self):
        return "-NA-"
    def get_status(self):
        return self.status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_mode(self, mode):
        self.mode = mode
    def set_privkey_from(self, server):
        self.privkey_from = server
    def set_status(self, status):
        self.status = status
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_finished(self, when):
        self.finished = when

class ServerMap(object):
    """I record the placement of mutable shares.

    This object records which shares (of various versions) are located on
    which servers.

    One purpose I serve is to inform callers about which versions of the
    mutable file are recoverable and 'current'.

    A second purpose is to serve as a state marker for test-and-set
    operations. I am passed out of retrieval operations and back into publish
    operations, which means 'publish this new version, but only if nothing
    has changed since I last retrieved this data'. This reduces the chances
    of clobbering a simultaneous (uncoordinated) write.

    @ivar _known_shares: a dictionary, mapping a (server, shnum) tuple to a
                         (versionid, timestamp) tuple. Each 'versionid' is a
                         tuple of (seqnum, root_hash, IV, segsize, datalength,
                         k, N, signed_prefix, offsets)

    @ivar _bad_shares: dict with keys of (server, shnum) tuples, describing
                       shares that I should ignore (because a previous user
                       of the servermap determined that they were invalid).
                       The updater only locates a certain number of shares:
                       if some of these turn out to have integrity problems
                       and are unusable, the caller will need to mark those
                       shares as bad, then re-update the servermap, then try
                       again. The dict maps each (server, shnum) tuple to the
                       share's old checkstring.
    """

    def __init__(self):
        self._known_shares = {}
        self.unreachable_servers = set() # servers that didn't respond to queries
        self.reachable_servers = set() # servers that did respond to queries
        self._problems = [] # mostly for debugging
        self._bad_shares = {} # maps (server,shnum) to old checkstring
        self._last_update_mode = None
        self._last_update_time = 0
        self.proxies = {}
        self.update_data = {} # shnum -> [(verinfo,(blockhashes,start,end)),..]
        # where blockhashes is a list of bytestrings (the result of
        # layout.MDMFSlotReadProxy.get_blockhashes), and start/end are both
        # (block,salt) tuple-of-bytestrings from get_block_and_salt()

    def copy(self):
        s = ServerMap()
        s._known_shares = self._known_shares.copy() # tuple->tuple
        s.unreachable_servers = set(self.unreachable_servers)
        s.reachable_servers = set(self.reachable_servers)
        s._problems = self._problems[:]
        s._bad_shares = self._bad_shares.copy() # tuple->str
        s._last_update_mode = self._last_update_mode
        s._last_update_time = self._last_update_time
        s.update_data = copy.deepcopy(self.update_data)
        return s

    def get_reachable_servers(self):
        return self.reachable_servers

    def mark_server_reachable(self, server):
        self.reachable_servers.add(server)

    def mark_server_unreachable(self, server):
        self.unreachable_servers.add(server)

    def mark_bad_share(self, server, shnum, checkstring):
        """This share was found to be bad, either in the checkstring or
        signature (detected during mapupdate), or deeper in the share
        (detected at retrieve time). Remove it from our list of useful
        shares, and remember that it is bad so we don't add it back again
        later. We record the share's old checkstring (which might be
        corrupted or badly signed) so that a repair operation can do the
        test-and-set using it as a reference.
        """
        assert isinstance(checkstring, bytes)
        key = (server, shnum)
        self._bad_shares[key] = checkstring # record checkstring
        self._known_shares.pop(key, None)

    def get_bad_shares(self):
        # key=(server,shnum) -> checkstring
        return self._bad_shares

    def add_new_share(self, server, shnum, verinfo, timestamp):
        """We've written a new share out, replacing any that was there
        before."""
        key = (server, shnum)
        self._bad_shares.pop(key, None)
        self._known_shares[key] = (verinfo, timestamp)

    def add_problem(self, f):
        self._problems.append(f)
    def get_problems(self):
        return self._problems

    def set_last_update(self, mode, when):
        self._last_update_mode = mode
        self._last_update_time = when
    def get_last_update(self):
        return (self._last_update_mode, self._last_update_time)

    def dump(self, out=sys.stdout):
        print("servermap:", file=out)

        for ( (server, shnum), (verinfo, timestamp) ) in list(self._known_shares.items()):
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            print("[%s]: sh#%d seq%d-%s %d-of-%d len%d" %
                  (str(server.get_name(), "utf-8"), shnum,
                   seqnum, str(base32.b2a(root_hash)[:4], "utf-8"), k, N,
                   datalength), file=out)
        if self._problems:
            print("%d PROBLEMS" % len(self._problems), file=out)
            for f in self._problems:
                print(str(f), file=out)
        return out

    def all_servers(self):
        return set([server for (server, shnum) in self._known_shares])

    def all_servers_for_version(self, verinfo):
        """Return a set of servers that hold shares for the given version."""
        return set([server
                    for ( (server, shnum), (verinfo2, timestamp) )
                    in self._known_shares.items()
                    if verinfo == verinfo2])

    def get_known_shares(self):
        # maps (server,shnum) to (versionid,timestamp)
        return self._known_shares

    def make_sharemap(self):
        """Return a dict that maps shnum to a set of servers that hold it."""
        sharemap = DictOfSets()
        for (server, shnum) in self._known_shares:
            sharemap.add(shnum, server)
        return sharemap

    def make_versionmap(self):
        """Return a dict that maps versionid to sets of (shnum, server,
        timestamp) tuples."""
        versionmap = DictOfSets()
        for ( (server, shnum), (verinfo, timestamp) ) in list(self._known_shares.items()):
            versionmap.add(verinfo, (shnum, server, timestamp))
        return versionmap

    def debug_shares_on_server(self, server): # used by tests
        return set([shnum for (s, shnum) in self._known_shares if s == server])

    def version_on_server(self, server, shnum):
        key = (server, shnum)
        if key in self._known_shares:
            (verinfo, timestamp) = self._known_shares[key]
            return verinfo
        return None

    def shares_available(self):
        """Return a dict that maps verinfo to a (num_distinct_shares, k, N)
        tuple."""
        versionmap = self.make_versionmap()
        all_shares = {}
        for verinfo, shares in list(versionmap.items()):
            s = set()
            for (shnum, server, timestamp) in shares:
                s.add(shnum)
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            all_shares[verinfo] = (len(s), k, N)
        return all_shares

    def highest_seqnum(self):
        available = self.shares_available()
        seqnums = [verinfo[0]
                   for verinfo in available.keys()]
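        # include 0 so that max() is well-defined even when we know of no
        # versions at all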
        seqnums.append(0)
        return max(seqnums)

    def summarize_version(self, verinfo):
        """Take a versionid, return a string that describes it."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return "seq%d-%s" % (seqnum, str(base32.b2a(root_hash)[:4], "utf-8"))

    def summarize_versions(self):
        """Return a string describing which versions we know about."""
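        # e.g. "9*seq3-e5f1/1*seq2-331c": nine shares of one version plus
        # a single straggler from an older one (illustrative values)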
        versionmap = self.make_versionmap()
        bits = []
        for (verinfo, shares) in list(versionmap.items()):
            vstr = self.summarize_version(verinfo)
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            bits.append("%d*%s" % (len(shnums), vstr))
        return "/".join(bits)

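    # A version is recoverable when at least k distinct share numbers are
    # visible for it: with 3-of-10 encoding, any three distinct shnums are
    # enough to reconstruct the file.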
    def recoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        recoverable."""
        versionmap = self.make_versionmap()
        recoverable_versions = set()
        for (verinfo, shares) in list(versionmap.items()):
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) >= k:
                # this one is recoverable
                recoverable_versions.add(verinfo)

        return recoverable_versions

    def unrecoverable_versions(self):
        """Return a set of versionids, one for each version that is currently
        unrecoverable."""
        versionmap = self.make_versionmap()

        unrecoverable_versions = set()
        for (verinfo, shares) in list(versionmap.items()):
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            if len(shnums) < k:
                unrecoverable_versions.add(verinfo)

        return unrecoverable_versions

    def best_recoverable_version(self):
        """Return a single versionid, for the so-called 'best' recoverable
        version. Sequence number is the primary sort criterion, followed by
        root hash. Returns None if there are no recoverable versions."""
        recoverable = list(self.recoverable_versions())
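        # verinfo tuples begin with (seqnum, root_hash, ...), so sorting
        # them directly uses seqnum as the primary key and root hash as
        # the tie-breaker, which is exactly the ordering described above.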
        recoverable.sort()
        if recoverable:
            return recoverable[-1]
        return None

    def size_of_version(self, verinfo):
        """Given a versionid (perhaps returned by best_recoverable_version),
        return the size of the file in bytes."""
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = verinfo
        return datalength

    def unrecoverable_newer_versions(self):
        # Return a dict of versionid -> health, for versions that are
        # unrecoverable and have later seqnums than any recoverable versions.
        # These indicate that a write will lose data.
        versionmap = self.make_versionmap()
        healths = {} # maps verinfo to (found, k)
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (verinfo, shares) in list(versionmap.items()):
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            shnums = set([shnum for (shnum, server, timestamp) in shares])
            healths[verinfo] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add(verinfo)
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)

        newversions = {}
        for verinfo in unrecoverable:
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = verinfo
            if seqnum > highest_recoverable_seqnum:
                newversions[verinfo] = healths[verinfo]

        return newversions


    def needs_merge(self):
        # return True if there are multiple recoverable versions with the
        # same seqnum, meaning that MutableFileNode.read_best_version is not
        # giving you the whole story, and that using its data to do a
        # subsequent publish will lose information.
        recoverable_seqnums = [verinfo[0]
                               for verinfo in self.recoverable_versions()]
        for seqnum in recoverable_seqnums:
            if recoverable_seqnums.count(seqnum) > 1:
                return True
        return False


    def get_update_data_for_share_and_verinfo(self, shnum, verinfo):
        """
        I return the update data for the given shnum and verinfo.
        """
        update_data = self.update_data[shnum]
        update_datum = [i[1] for i in update_data if i[0] == verinfo][0]
        return update_datum


    def set_update_data_for_share_and_verinfo(self, shnum, verinfo, data):
        """
        I record the block hash tree and boundary-segment data for the
        given shnum and verinfo.
        """
        self.update_data.setdefault(shnum, []).append((verinfo, data))


class ServermapUpdater(object):
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False, update_range=None):
        """I update a servermap, locating a sufficient number of useful
        shares and remembering where they are located.
        """

        self._node = filenode
        self._storage_broker = storage_broker
        self._monitor = monitor
        self._servermap = servermap
        self.mode = mode
        self._add_lease = add_lease
        self._running = True

        self._storage_index = filenode.get_storage_index()
        self._last_failure = None

        self._status = UpdateStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_progress(0.0)
        self._status.set_mode(mode)

        self._servers_responded = set()

        # how much data should we read?
        # SDMF:
        #  * if we only need the checkstring, then [0:75]
        #  * if we need to validate the checkstring sig, then [543ish:799ish]
        #  * if we need the verification key, then [107:436ish]
        #   * the offset table at [75:107] tells us about the 'ish'
        #  * if we need the encrypted private key, we want [-1216ish:]
        #   * but we can't read from negative offsets
        #   * the offset table tells us the 'ish', also the positive offset
        # MDMF:
        #  * Checkstring? [0:72]
        #  * If we want to validate the checkstring, then [0:72], [143:?] --
        #    the offset table will tell us for sure.
        #  * If we need the verification key, we have to consult the offset
        #    table as well.
        # At this point, we don't know which we are. Our filenode can
        # tell us, but it might be lying -- in some cases, we're
        # responsible for telling it which kind of file it is.
        self._read_size = 4000
        if mode == MODE_CHECK:
            # we use unpack_prefix_and_signature, so we need 1k
            self._read_size = 1000
        self._need_privkey = False

        if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey():
            self._need_privkey = True
        # check+repair: repair requires the privkey, so if we didn't happen
        # to ask for it during the check, we'll have problems doing the
        # publish.

        self.fetch_update_data = False
        if mode == MODE_WRITE and update_range:
            # We're updating the servermap in preparation for an
            # in-place file update, so we need to fetch some additional
            # data from each share that we find.
            assert len(update_range) == 2

            self.start_segment = update_range[0]
            self.end_segment = update_range[1]
            self.fetch_update_data = True

        prefix = si_b2a(self._storage_index)[:5]
        self._log_number = log.msg(format="SharemapUpdater(%(si)s): starting (%(mode)s)",
                                   si=prefix, mode=mode)

    def get_status(self):
        return self._status

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.mutable.mapupdate"
        return log.msg(*args, **kwargs)

    def update(self):
        """Update the servermap to reflect current conditions. Returns a
        Deferred that fires with the servermap once the update has finished."""
        self._started = time.time()
        self._status.set_active(True)

        # self._valid_versions is a set of validated verinfo tuples. We just
        # use it to remember which versions had valid signatures, so we can
        # avoid re-checking the signatures for each share.
        self._valid_versions = set()

        self._done_deferred = defer.Deferred()

        # first, which servers should we talk to? Any that were in our old
        # servermap, plus "enough" others.

        self._queries_completed = 0

        sb = self._storage_broker
        # All of the servers, permuted by the storage index, as usual.
        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
        self.full_serverlist = full_serverlist # for use later, immutable
        self.extra_servers = full_serverlist[:] # servers are removed as we use them
        self._good_servers = set() # servers who had some shares
        self._servers_with_shares = set() # servers that we know have shares now
        self._empty_servers = set() # servers who don't have any shares
        self._bad_servers = set() # servers to whom our queries failed

        k = self._node.get_required_shares()
        # For what cases can these conditions work?
        if k is None:
            # make a guess
            k = 3
        N = self._node.get_total_shares()
        if N is None:
            N = 10
        self.EPSILON = k
        # we want to send queries to at least this many servers (although we
        # might not wait for all of their answers to come back)
        self.num_servers_to_query = k + self.EPSILON
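        # e.g. with the guessed 3-of-10 parameters above, this comes to
        # 3+3 = 6 servers queried before we are willing to stop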

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # We want to query all of the servers.
            initial_servers_to_query = list(full_serverlist)
            must_query = set(initial_servers_to_query)
            self.extra_servers = []
        elif self.mode == MODE_WRITE:
            # we're planning to replace all the shares, so we want a good
            # chance of finding them all. We will keep searching until we've
            # seen epsilon that don't have a share.
            # We don't query all of the servers because that could take a while.
            self.num_servers_to_query = N + self.EPSILON
            initial_servers_to_query, must_query = self._build_initial_querylist()
            self.required_num_empty_servers = self.EPSILON

            # TODO: arrange to read lots of data from k-ish servers, to avoid
            # the extra round trip required to read large directories. This
            # might also avoid the round trip required to read the encrypted
            # private key.

        else: # MODE_READ, MODE_ANYTHING
            # 2*k servers is good enough.
            initial_servers_to_query, must_query = self._build_initial_querylist()

        # this is a set of servers that we are required to get responses
        # from: they are servers who used to have a share, so we need to know
        # where they currently stand, even if that means we have to wait for
        # a silently-lost TCP connection to time out. We remove servers from
        # this set as we get responses.
        self._must_query = set(must_query)

        # now initial_servers_to_query contains the servers that we should
        # ask, self._must_query contains the servers that we must have heard
        # from before we can consider ourselves finished, and
        # self.extra_servers contains the overflow (servers that we should
        # tap if we don't get enough responses)
        # I guess that self._must_query is a subset of
        # initial_servers_to_query?
        assert must_query.issubset(initial_servers_to_query)

        self._send_initial_requests(initial_servers_to_query)
        self._status.timings["initial_queries"] = time.time() - self._started
        return self._done_deferred

    def _build_initial_querylist(self):
        # we send queries to everyone who was already in the sharemap
        initial_servers_to_query = set(self._servermap.all_servers())
        # and we must wait for responses from them
        must_query = set(initial_servers_to_query)

        while ((self.num_servers_to_query > len(initial_servers_to_query))
               and self.extra_servers):
            initial_servers_to_query.add(self.extra_servers.pop(0))

        return initial_servers_to_query, must_query
    def _send_initial_requests(self, serverlist):
        self._status.set_status("Sending %d initial queries" % len(serverlist))
        self._queries_outstanding = set()
        for server in serverlist:
            self._queries_outstanding.add(server)
            self._do_query(server, self._storage_index, self._read_size)

        if not serverlist:
            # there is nobody to ask, so we need to short-circuit the state
            # machine.
            d = defer.maybeDeferred(self._check_for_done, None)
            d.addErrback(self._fatal_error)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.
        return None

    def _do_query(self, server, storage_index, readsize):
        self.log(format="sending query to [%(name)s], readsize=%(readsize)d",
                 name=server.get_name(),
                 readsize=readsize,
                 level=log.NOISY)
        started = time.time()
        self._queries_outstanding.add(server)
        d = self._do_read(server, storage_index, [], [(0, readsize)])
        d.addCallback(self._got_results, server, readsize, storage_index,
                      started)
        d.addErrback(self._query_failed, server)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        d.addErrback(log.err)
        d.addErrback(self._fatal_error)
        d.addCallback(self._check_for_done)
        return d

    def _do_read(self, server, storage_index, shnums, readv):
        """
        If self._add_lease is true, a lease is added, and the result only
        fires once the lease has also been added.
        """
        ss = server.get_storage_server()
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately.
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            d2 = ss.add_lease(
                storage_index,
                renew_secret,
                cancel_secret,
            )
            # we ignore success
            d2.addErrback(self._add_lease_failed, server, storage_index)
        else:
            d2 = defer.succeed(None)
        d = ss.slot_readv(storage_index, shnums, readv)

        def passthrough(result):
            # Wait for d2, but fire with the result of slot_readv()
            # regardless of the result of d2.
            return d2.addBoth(lambda _: result)

        d.addCallback(passthrough)
        return d


    def _got_corrupt_share(self, e, shnum, server, data, lp):
        """
        I am called when a remote server returns a corrupt share in
        response to one of our queries. By corrupt, I mean a share
        without a valid signature. I then record the failure, notify the
        server of the corruption, and record the share as bad.
        """
        f = failure.Failure(e)
        self.log(format="bad share: %(f_value)s", f_value=str(f),
                 failure=f, parent=lp, level=log.WEIRD, umid="h5llHg")
        # Notify the server that its share is corrupt.
        self.notify_server_corruption(server, shnum, str(e))
        # By flagging this as a bad server, we won't count any of
        # the other shares on that server as valid, though if we
        # happen to find a valid version string amongst those
        # shares, we'll keep track of it so that we don't need
        # to validate the signature on those again.
        self._bad_servers.add(server)
        self._last_failure = f
        # XXX: Use the reader for this?
        checkstring = data[:SIGNED_PREFIX_LENGTH]
        self._servermap.mark_bad_share(server, shnum, checkstring)
        self._servermap.add_problem(f)

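    # _got_results drives a small pipeline for every share the server
    # reported: fetch the pubkey if we lack it (d), the version info (d2)
    # and signature (d3), optionally the encrypted privkey (d4) and
    # in-place-update data (d5), then validate the whole bundle in
    # _got_signature_one_share.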
    def _got_results(self, datavs, server, readsize, storage_index, started):
        lp = self.log(format="got result from [%(name)s], %(numshares)d shares",
                      name=server.get_name(),
                      numshares=len(datavs))
        ss = server.get_storage_server()
        now = time.time()
        elapsed = now - started
        def _done_processing(ignored=None):
            self._queries_outstanding.discard(server)
            self._servermap.mark_server_reachable(server)
            self._must_query.discard(server)
            self._queries_completed += 1
        if not self._running:
            self.log("but we're not running, so we'll ignore it", parent=lp)
            _done_processing()
            self._status.add_per_server_time(server, "late", started, elapsed)
            return
        self._status.add_per_server_time(server, "query", started, elapsed)

        if datavs:
            self._good_servers.add(server)
        else:
            self._empty_servers.add(server)

        ds = []

        for shnum, datav in list(datavs.items()):
            data = datav[0]
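            # If the server returned fewer bytes than we asked for, the
            # share must be smaller than readsize, so 'data' holds the
            # entire share and the proxy can satisfy reads from it without
            # going back to the network.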
            reader = MDMFSlotReadProxy(ss,
                                       storage_index,
                                       shnum,
                                       data,
                                       data_is_everything=(len(data) < readsize))

            # our goal, with each response, is to validate the version
            # information and share data as best we can at this point --
            # we do this by validating the signature. To do this, we
            # need to do the following:
            #   - If we don't already have the public key, fetch the
            #     public key. We use this to validate the signature.
            if not self._node.get_pubkey():
                # fetch and set the public key.
                d = reader.get_verification_key()
                d.addCallback(lambda results, shnum=shnum:
                              self._try_to_set_pubkey(results, server, shnum, lp))
                # XXX: Make self._pubkey_query_failed?
                d.addErrback(lambda error, shnum=shnum, data=data:
                             self._got_corrupt_share(error, shnum, server, data, lp))
            else:
                # we already have the public key.
                d = defer.succeed(None)

            # Neither of these two branches return anything of
            # consequence, so the first entry in our deferredlist will
            # be None.

            # - Next, we need the version information. We almost
            #   certainly got this by reading the first thousand or so
            #   bytes of the share on the storage server, so we
            #   shouldn't need to fetch anything at this step.
            d2 = reader.get_verinfo()
            d2.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # - Next, we need the signature. For an SDMF share, it is
            #   likely that we fetched this when doing our initial fetch
            #   to get the version information. In MDMF, this lives at
            #   the end of the share, so unless the file is quite small,
            #   we'll need to do a remote fetch to get it.
            d3 = reader.get_signature()
            d3.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            # Once we have all three of these responses, we can move on
            # to validating the signature

            # Does the node already have a privkey? If not, we'll try to
            # fetch it here.
            if self._need_privkey:
                d4 = reader.get_encprivkey()
                d4.addCallback(lambda results, shnum=shnum:
                               self._try_to_validate_privkey(results, server, shnum, lp))
                d4.addErrback(lambda error, shnum=shnum:
                              self._privkey_query_failed(error, server, shnum, lp))
            else:
                d4 = defer.succeed(None)


            if self.fetch_update_data:
                # fetch the block hash tree and first + last segment, as
                # configured earlier.
                # Then store them wherever we happen to want them.
                # (use a separate list here: rebinding 'ds' would clobber
                # the per-server accumulator above)
                update_ds = []
                # XXX: We do this above, too. Is there a good way to
                #      make the two routines share the value without
                #      introducing more roundtrips?
                update_ds.append(reader.get_verinfo())
                update_ds.append(reader.get_blockhashes())
                update_ds.append(reader.get_block_and_salt(self.start_segment))
                update_ds.append(reader.get_block_and_salt(self.end_segment))
                d5 = deferredutil.gatherResults(update_ds)
                d5.addCallback(self._got_update_results_one_share, shnum)
            else:
                d5 = defer.succeed(None)

            dl = defer.DeferredList([d, d2, d3, d4, d5])
            def _append_proxy(passthrough, shnum=shnum, reader=reader):
                # Store the proxy (with its cache) keyed by serverid and
                # version.
                _, (_, verinfo), _, _, _ = passthrough
                verinfo = self._make_verinfo_hashable(verinfo)
                self._servermap.proxies[(verinfo,
                                         server.get_serverid(),
                                         storage_index, shnum)] = reader
                return passthrough
            dl.addCallback(_append_proxy)
            dl.addBoth(self._turn_barrier)
            dl.addCallback(lambda results, shnum=shnum:
                           self._got_signature_one_share(results, shnum, server, lp))
            dl.addErrback(lambda error, shnum=shnum, data=data:
                          self._got_corrupt_share(error, shnum, server, data, lp))
            ds.append(dl)
        # dl is a deferred list that will fire when all of the shares
        # that we found on this server are done processing. When dl fires,
        # we know that processing is done, so we can decrement the
        # semaphore-like thing that we incremented earlier.
        dl = defer.DeferredList(ds, fireOnOneErrback=True)
        # Are we done? Done means that there are no more queries to
        # send, that there are no outstanding queries, and that we
        # haven't received any queries that are still processing. If we
        # are done, self._check_for_done will cause the done deferred
        # that we returned to our caller to fire, which tells them that
        # they have a complete servermap, and that we won't be touching
        # the servermap anymore.
        dl.addCallback(_done_processing)
        dl.addCallback(self._check_for_done)
        dl.addErrback(self._fatal_error)
        # all done!
        self.log("_got_results done", parent=lp, level=log.NOISY)
        return dl


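    # fireEventually() re-queues the result through the reactor, so a long
    # chain of synchronous callbacks unwinds via the event loop instead of
    # growing the Python stack.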
    def _turn_barrier(self, result):
        """
        I help the servermap updater avoid the recursion limit issues
        discussed in #237.
        """
        return fireEventually(result)


    def _try_to_set_pubkey(self, pubkey_s, server, shnum, lp):
        if self._node.get_pubkey():
            return # don't go through this again if we don't have to
        fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
        assert len(fingerprint) == 32
        if fingerprint != self._node.get_fingerprint():
            raise CorruptShareError(server, shnum,
                                    "pubkey doesn't match fingerprint")
        self._node._populate_pubkey(self._deserialize_pubkey(pubkey_s))
        assert self._node.get_pubkey()


    def notify_server_corruption(self, server, shnum, reason):
        if isinstance(reason, str):
            reason = reason.encode("utf-8")
        ss = server.get_storage_server()
        ss.advise_corrupt_share(
            b"mutable",
            self._storage_index,
            shnum,
            reason,
        )


    def _got_signature_one_share(self, results, shnum, server, lp):
        # It is our job to give versioninfo to our caller. We need to
        # raise CorruptShareError if the share is corrupt for any
        # reason, something that our caller will handle.
        self.log(format="_got_results: got shnum #%(shnum)d from serverid %(name)s",
                 shnum=shnum,
                 name=server.get_name(),
                 level=log.NOISY,
                 parent=lp)
        if not self._running:
            # We can't process the results, since we can't touch the
            # servermap anymore.
            self.log("but we're not running anymore.")
            return None

        _, verinfo, signature, __, ___ = results
        verinfo = self._make_verinfo_hashable(verinfo[1])

        # This tuple uniquely identifies a share on the grid; we use it
        # to keep track of the ones that we've already seen.
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets_tuple) = verinfo


        if verinfo not in self._valid_versions:
            # This is a new version tuple, and we need to validate it
            # against the public key before keeping track of it.
            assert self._node.get_pubkey()
            try:
                rsa.verify_signature(self._node.get_pubkey(), signature[1], prefix)
            except BadSignature:
                raise CorruptShareError(server, shnum,
                                        "signature is invalid")

            # ok, it's a valid verinfo. Add it to the list of validated
            # versions.
            self.log(" found valid version %d-%s from %s-sh%d: %d-%d/%d/%d"
                     % (seqnum, str(base32.b2a(root_hash)[:4], "utf-8"),
                        ensure_str(server.get_name()), shnum,
                        k, n, segsize, datalen),
                     parent=lp)
            self._valid_versions.add(verinfo)
            # We now know that this is a valid candidate verinfo. Whether or
            # not this instance of it is valid is a matter for the next
            # statement; at this point, we just know that if we see this
            # version info again, that its signature checks out and that
            # we're okay to skip the signature-checking step.

        # (server, shnum) are bound in the method invocation.
        if (server, shnum) in self._servermap.get_bad_shares():
            # we've been told that the rest of the data in this share is
            # unusable, so don't add it to the servermap.
            self.log("but we've been told this is a bad share",
                     parent=lp, level=log.UNUSUAL)
            return verinfo

        # Add the info to our servermap.
        timestamp = time.time()
        self._servermap.add_new_share(server, shnum, verinfo, timestamp)
        self._servers_with_shares.add(server)

        return verinfo

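    # The raw verinfo carries its offsets as a dict, which is unhashable;
    # converting them to a tuple of (key, value) pairs lets verinfo be used
    # as a set member and dictionary key elsewhere in this file.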
    def _make_verinfo_hashable(self, verinfo):
        (seqnum,
         root_hash,
         saltish,
         segsize,
         datalen,
         k,
         n,
         prefix,
         offsets) = verinfo

        offsets_tuple = tuple([(key, value) for key, value in offsets.items()])

        verinfo = (seqnum,
                   root_hash,
                   saltish,
                   segsize,
                   datalen,
                   k,
                   n,
                   prefix,
                   offsets_tuple)
        return verinfo

    def _got_update_results_one_share(self, results, share):
        """
        I record the update data fetched for the given share in the
        servermap.
        """
        assert len(results) == 4
        verinfo, blockhashes, start, end = results
        verinfo = self._make_verinfo_hashable(verinfo)
        update_data = (blockhashes, start, end)
        self._servermap.set_update_data_for_share_and_verinfo(share,
                                                              verinfo,
                                                              update_data)

    def _deserialize_pubkey(self, pubkey_s):
        verifier = rsa.create_verifying_key_from_string(pubkey_s)
        return verifier

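    # The writekey is derived by hashing the signing key, so decrypting the
    # server's blob with our writekey and re-deriving it must round-trip;
    # a mismatch means the blob is not our real private key.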
    def _try_to_validate_privkey(self, enc_privkey, server, shnum, lp):
        """
        Given an encrypted privkey from a remote server, I decrypt it with
        my node's writekey and check that the writekey derived from the
        result matches. If it does, I set the privkey and encprivkey
        properties of the node.
        """
        node_writekey = self._node.get_writekey()
        alleged_privkey_s = decrypt_privkey(node_writekey, enc_privkey)
        alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
        if alleged_writekey != node_writekey:
            self.log("invalid privkey from %r shnum %d" %
                     (server.get_name(), shnum),
                     parent=lp, level=log.WEIRD, umid="aJVccw")
            return

        # it's good
        self.log("got valid privkey from shnum %d on serverid %r" %
                 (shnum, server.get_name()),
                 parent=lp)
        privkey, _ = rsa.create_signing_keypair_from_string(alleged_privkey_s)
        self._node._populate_encprivkey(enc_privkey)
        self._node._populate_privkey(privkey)
        self._need_privkey = False
        self._status.set_privkey_from(server)


    def _add_lease_failed(self, f, server, storage_index):
        # Older versions of Tahoe didn't handle the add-lease message very
        # well: <=1.1.0 throws a NameError because it doesn't implement
        # remote_add_lease(), 1.2.0/1.3.0 throw IndexError on unknown buckets
        # (which is most of them, since we send add-lease to everybody,
        # before we know whether or not they have any shares for us), and
        # 1.2.0 throws KeyError even on known buckets due to an internal bug
        # in the latency-measuring code.

        # we want to ignore the known-harmless errors and log the others. In
        # particular we want to log any local errors caused by coding
        # problems.

        if f.check(DeadReferenceError):
            return
        if f.check(RemoteException):
            if f.value.failure.check(KeyError, IndexError, NameError):
                # this may ignore a bit too much, but that only hurts us
                # during debugging
                return
            self.log(format="error in add_lease from [%(name)s]: %(f_value)s",
                     name=server.get_name(),
                     f_value=str(f.value),
                     failure=f,
                     level=log.WEIRD, umid="iqg3mw")
            return
        # local errors are cause for alarm
        log.err(f,
                format="local error in add_lease to [%(name)s]: %(f_value)s",
                name=server.get_name(),
                f_value=str(f.value),
                level=log.WEIRD, umid="ZWh6HA")

    def _query_failed(self, f, server):
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 level=level, umid="IHXuQg")
        self._must_query.discard(server)
        self._queries_outstanding.discard(server)
        self._bad_servers.add(server)
        self._servermap.add_problem(f)
        # a server could be in both ServerMap.reachable_servers and
        # .unreachable_servers if they responded to our query, but then an
        # exception was raised in _got_results.
        self._servermap.mark_server_unreachable(server)
        self._queries_completed += 1
        self._last_failure = f


    def _privkey_query_failed(self, f, server, shnum, lp):
        self._queries_outstanding.discard(server)
        if not self._running:
            return
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log(format="error during privkey query: %(f_value)s",
                 f_value=str(f.value), failure=f,
                 parent=lp, level=level, umid="McoJ5w")
        self._servermap.add_problem(f)
        self._last_failure = f


    def _check_for_done(self, res):
        # exit paths:
        #  return self._send_more_queries(outstanding) : send some more queries
        #  return self._done() : all done
        #  return : keep waiting, no new queries
        lp = self.log(format=("_check_for_done, mode is '%(mode)s', "
                              "%(outstanding)d queries outstanding, "
                              "%(extra)d extra servers available, "
                              "%(must)d 'must query' servers left, "
                              "need_privkey=%(need_privkey)s"
                              ),
                      mode=self.mode,
                      outstanding=len(self._queries_outstanding),
                      extra=len(self.extra_servers),
                      must=len(self._must_query),
                      need_privkey=self._need_privkey,
                      level=log.NOISY,
                      )

        if not self._running:
            self.log("but we're not running", parent=lp, level=log.NOISY)
            return

        if self._must_query:
            # we are still waiting for responses from servers that used to
            # have a share, so we must continue to wait. No additional
            # queries are required at this time.
            self.log("%d 'must query' servers left" % len(self._must_query),
                     level=log.NOISY, parent=lp)
            return

        if (not self._queries_outstanding and not self.extra_servers):
            # all queries have retired, and we have no servers left to ask.
            # No more progress can be made, therefore we are done.
            self.log("all queries are retired, no extra servers: done",
                     parent=lp)
            return self._done()

        recoverable_versions = self._servermap.recoverable_versions()
        unrecoverable_versions = self._servermap.unrecoverable_versions()

        # what is our completion policy? how hard should we work?

        if self.mode == MODE_ANYTHING:
            if recoverable_versions:
                self.log("%d recoverable versions: done"
                         % len(recoverable_versions),
                         parent=lp)
                return self._done()

        if self.mode in (MODE_CHECK, MODE_REPAIR):
            # we used self._must_query, and we know there aren't any
            # responses still waiting, so that means we must be done
            self.log("done", parent=lp)
            return self._done()

        MAX_IN_FLIGHT = 5
        if self.mode == MODE_READ:
            # if we've queried k+epsilon servers, and we see a recoverable
            # version, and we haven't seen any unrecoverable higher-seqnum'ed
            # versions, then we're done.

            if self._queries_completed < self.num_servers_to_query:
                self.log(format="%(completed)d completed, %(query)d to query: need more",
                         completed=self._queries_completed,
                         query=self.num_servers_to_query,
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            if not recoverable_versions:
                self.log("no recoverable versions: need more",
                         level=log.NOISY, parent=lp)
                return self._send_more_queries(MAX_IN_FLIGHT)
            highest_recoverable = max(recoverable_versions)
            highest_recoverable_seqnum = highest_recoverable[0]
            for unrec_verinfo in unrecoverable_versions:
                if unrec_verinfo[0] > highest_recoverable_seqnum:
                    # there is evidence of a higher-seqnum version, but we
                    # don't yet see enough shares to recover it. Try harder.
                    # TODO: consider sending more queries.
                    # TODO: consider limiting the search distance
                    self.log("evidence of higher seqnum: need more",
                             level=log.UNUSUAL, parent=lp)
                    return self._send_more_queries(MAX_IN_FLIGHT)
            # all the unrecoverable versions were old or concurrent with a
            # recoverable version. Good enough.
            self.log("no higher-seqnum: done", parent=lp)
            return self._done()

        if self.mode == MODE_WRITE:
            # we want to keep querying until we've seen a few that don't
            # have any shares, to be sufficiently confident that we've seen
            # all the shares. This is still less work than MODE_CHECK, which
            # asks every server in the world.

            if not recoverable_versions:
                self.log("no recoverable versions: need more", parent=lp,
                         level=log.NOISY)
                return self._send_more_queries(MAX_IN_FLIGHT)

            last_found = -1
            last_not_responded = -1
            num_not_responded = 0
            num_not_found = 0
            states = []
            found_boundary = False

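            # Walk the permuted server list, recording one character per
            # server: "1" has shares, "0" empty, "x" query failed, "?" no
            # answer yet. Once we see EPSILON empty servers in a row past
            # the last share, we have found the boundary.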
            for i, server in enumerate(self.full_serverlist):
                if server in self._bad_servers:
                    # query failed
                    states.append("x")
                    #self.log("loop [%s]: x" % server.get_name()
                elif server in self._empty_servers:
                    # no shares
                    states.append("0")
                    #self.log("loop [%s]: 0" % server.get_name()
                    if last_found != -1:
                        num_not_found += 1
                        if num_not_found >= self.EPSILON:
                            self.log("found our boundary, %s" %
                                     "".join(states),
                                     parent=lp, level=log.NOISY)
                            found_boundary = True
                            break

                elif server in self._servers_with_shares:
                    # yes shares
                    states.append("1")
                    #self.log("loop [%s]: 1" % server.get_name()
                    last_found = i
                    num_not_found = 0
                else:
                    # not responded yet
                    states.append("?")
                    #self.log("loop [%s]: ?" % server.get_name()
                    last_not_responded = i
                    num_not_responded += 1

            if found_boundary:
                # we need to know that we've gotten answers from
                # everybody to the left of here
                if last_not_responded == -1:
                    # we're done
                    self.log("have all our answers",
                             parent=lp, level=log.NOISY)
                    # .. unless we're still waiting on the privkey
                    if self._need_privkey:
                        self.log("but we're still waiting for the privkey",
                                 parent=lp, level=log.NOISY)
                        # if we found the boundary but we haven't yet found
                        # the privkey, we may need to look further. If
                        # somehow all the privkeys were corrupted (but the
                        # shares were readable), then this is likely to do an
                        # exhaustive search.
                        return self._send_more_queries(MAX_IN_FLIGHT)
                    return self._done()
                # still waiting for somebody
                return self._send_more_queries(num_not_responded)

            # if we hit here, we didn't find our boundary, so we're still
            # waiting for servers
            self.log("no boundary yet, %s" % "".join(states), parent=lp,
                     level=log.NOISY)
            return self._send_more_queries(MAX_IN_FLIGHT)

        # otherwise, keep up to 5 queries in flight. TODO: this is pretty
        # arbitrary, really I want this to be something like k -
        # max(known_version_sharecounts) + some extra
        self.log("catchall: need more", parent=lp, level=log.NOISY)
        return self._send_more_queries(MAX_IN_FLIGHT)

    def _send_more_queries(self, num_outstanding):
        more_queries = []

        while True:
            self.log(format=" there are %(outstanding)d queries outstanding",
                     outstanding=len(self._queries_outstanding),
                     level=log.NOISY)
            active_queries = len(self._queries_outstanding) + len(more_queries)
            if active_queries >= num_outstanding:
                break
            if not self.extra_servers:
                break
            more_queries.append(self.extra_servers.pop(0))

        self.log(format="sending %(more)d more queries: %(who)s",
                 more=len(more_queries),
                 who=" ".join(["[%r]" % s.get_name() for s in more_queries]),
                 level=log.NOISY)

        for server in more_queries:
            self._do_query(server, self._storage_index, self._read_size)
            # we'll retrigger when those queries come back

    def _done(self):
        if not self._running:
            self.log("not running; we're already done")
            return
        self._running = False
        now = time.time()
        elapsed = now - self._started
        self._status.set_finished(now)
        self._status.timings["total"] = elapsed
        self._status.set_progress(1.0)
        self._status.set_status("Finished")
        self._status.set_active(False)

        self._servermap.set_last_update(self.mode, self._started)
        # the servermap will not be touched after this
        self.log("servermap: %s" % self._servermap.summarize_versions())

        eventually(self._done_deferred.callback, self._servermap)

    def _fatal_error(self, f):
        self.log("fatal error", failure=f, level=log.WEIRD, umid="1cNvlw")
        self._done_deferred.errback(f)