diff --git a/src/allmydata/mutable/checker.py b/src/allmydata/mutable/checker.py
index ea288a0..6b629ec 100644
a
|
b
|
from allmydata.uri import from_string |
3 | 3 | from allmydata.util import base32, log |
4 | 4 | from allmydata.check_results import CheckAndRepairResults, CheckResults |
5 | 5 | |
6 | | from allmydata.mutable.common import MODE_CHECK, CorruptShareError |
| 6 | from allmydata.mutable.common import MODE_CHECK, MODE_WRITE, CorruptShareError |
7 | 7 | from allmydata.mutable.servermap import ServerMap, ServermapUpdater |
8 | 8 | from allmydata.mutable.retrieve import Retrieve # for verifying |
9 | 9 | |
10 | 10 | class MutableChecker: |
| 11 | SERVERMAP_MODE = MODE_CHECK |
11 | 12 | |
12 | 13 | def __init__(self, node, storage_broker, history, monitor): |
13 | 14 | self._node = node |
… |
… |
class MutableChecker: |
26 | 27 | # of finding all of the shares, and getting a good idea of |
27 | 28 | # recoverability, etc, without verifying. |
28 | 29 | u = ServermapUpdater(self._node, self._storage_broker, self._monitor, |
29 | | servermap, MODE_CHECK, add_lease=add_lease) |
| 30 | servermap, self.SERVERMAP_MODE, |
| 31 | add_lease=add_lease) |
30 | 32 | if self._history: |
31 | 33 | self._history.notify_mapupdate(u.get_status()) |
32 | 34 | d = u.update() |
… |
… |
class MutableChecker: |
241 | 243 | |
242 | 244 | |
243 | 245 | class MutableCheckAndRepairer(MutableChecker): |
| 246 | SERVERMAP_MODE = MODE_WRITE # needed to get the privkey |
| 247 | |
244 | 248 | def __init__(self, node, storage_broker, history, monitor): |
245 | 249 | MutableChecker.__init__(self, node, storage_broker, history, monitor) |
246 | 250 | self.cr_results = CheckAndRepairResults(self._storage_index) |
… |
… |
class MutableCheckAndRepairer(MutableChecker): |
264 | 268 | self.cr_results.repair_attempted = False |
265 | 269 | return |
266 | 270 | self.cr_results.repair_attempted = True |
267 | | d = self._node.repair(self.results) |
| 271 | d = self._node.repair(self.results, monitor=self._monitor) |
268 | 272 | def _repair_finished(repair_results): |
269 | 273 | self.cr_results.repair_successful = repair_results.get_successful() |
270 | 274 | r = CheckResults(from_string(self._node.get_uri()), self._storage_index) |
diff --git a/src/allmydata/mutable/common.py b/src/allmydata/mutable/common.py
index 9ce11c5..9ce8e37 100644
a
|
b
|
MODE_ANYTHING = "MODE_ANYTHING" # one recoverable version |
6 | 6 | MODE_WRITE = "MODE_WRITE" # replace all shares, probably.. not for initial |
7 | 7 | # creation |
8 | 8 | MODE_READ = "MODE_READ" |
| 9 | MODE_REPAIR = "MODE_REPAIR" # query all peers, get the privkey |
9 | 10 | |
10 | 11 | class NotWriteableError(Exception): |
11 | 12 | pass |
diff --git a/src/allmydata/mutable/filenode.py b/src/allmydata/mutable/filenode.py
index 74ed7f0..61ccd39 100644
a
|
b
|
class MutableFileNode: |
308 | 308 | ################################# |
309 | 309 | # IRepairable |
310 | 310 | |
311 | | def repair(self, check_results, force=False): |
| 311 | def repair(self, check_results, force=False, monitor=None): |
312 | 312 | assert ICheckResults(check_results) |
313 | | r = Repairer(self, check_results) |
| 313 | r = Repairer(self, check_results, self._storage_broker, |
| 314 | self._history, monitor) |
314 | 315 | d = r.start(force) |
315 | 316 | return d |
316 | 317 | |
diff --git a/src/allmydata/mutable/publish.py b/src/allmydata/mutable/publish.py
index 4f61ad1..c611a6f 100644
a
|
b
|
from allmydata.storage.server import si_b2a |
15 | 15 | from pycryptopp.cipher.aes import AES |
16 | 16 | from foolscap.api import eventually, fireEventually |
17 | 17 | |
18 | | from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, \ |
| 18 | from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \ |
19 | 19 | UncoordinatedWriteError, NotEnoughServersError |
20 | 20 | from allmydata.mutable.servermap import ServerMap |
21 | 21 | from allmydata.mutable.layout import get_version_from_checkstring,\ |
… |
… |
class Publish: |
187 | 187 | # servermap was updated in MODE_WRITE, so we can depend upon the |
188 | 188 | # serverlist computed by that process instead of computing our own. |
189 | 189 | assert self._servermap |
190 | | assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK) |
| 190 | assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR) |
191 | 191 | # we will push a version that is one larger than anything present |
192 | 192 | # in the grid, according to the servermap. |
193 | 193 | self._new_seqnum = self._servermap.highest_seqnum() + 1 |
… |
… |
class Publish: |
373 | 373 | # servermap was updated in MODE_WRITE, so we can depend upon the |
374 | 374 | # serverlist computed by that process instead of computing our own. |
375 | 375 | if self._servermap: |
376 | | assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK) |
| 376 | assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR) |
377 | 377 | # we will push a version that is one larger than anything present |
378 | 378 | # in the grid, according to the servermap. |
379 | 379 | self._new_seqnum = self._servermap.highest_seqnum() + 1 |
diff --git a/src/allmydata/mutable/repairer.py b/src/allmydata/mutable/repairer.py
index d0bfeff..94641e4 100644
a
|
b
|
from zope.interface import implements |
3 | 3 | from twisted.internet import defer |
4 | 4 | from allmydata.interfaces import IRepairResults, ICheckResults |
5 | 5 | from allmydata.mutable.publish import MutableData |
| 6 | from allmydata.mutable.common import MODE_REPAIR |
| 7 | from allmydata.mutable.servermap import ServerMap, ServermapUpdater |
6 | 8 | |
7 | 9 | class RepairResults: |
8 | 10 | implements(IRepairResults) |
… |
… |
class MustForceRepairError(Exception): |
23 | 25 | pass |
24 | 26 | |
25 | 27 | class Repairer: |
26 | | def __init__(self, node, check_results): |
| 28 | def __init__(self, node, check_results, storage_broker, history, monitor): |
27 | 29 | self.node = node |
28 | 30 | self.check_results = ICheckResults(check_results) |
29 | 31 | assert check_results.storage_index == self.node.get_storage_index() |
| 32 | self._storage_broker = storage_broker |
| 33 | self._history = history |
| 34 | self._monitor = monitor |
30 | 35 | |
31 | 36 | def start(self, force=False): |
32 | 37 | # download, then re-publish. If a server had a bad share, try to |
… |
… |
class Repairer: |
55 | 60 | # old shares: replace old shares with the latest version |
56 | 61 | # bogus shares (bad sigs): replace the bad one with a good one |
57 | 62 | |
58 | | smap = self.check_results.get_servermap() |
| 63 | # first, update the servermap in MODE_REPAIR, which files all shares |
| 64 | # and makes sure we get the privkey. |
| 65 | u = ServermapUpdater(self.node, self._storage_broker, self._monitor, |
| 66 | ServerMap(), MODE_REPAIR) |
| 67 | if self._history: |
| 68 | self._history.notify_mapupdate(u.get_status()) |
| 69 | d = u.update() |
| 70 | d.addCallback(self._got_full_servermap, force) |
| 71 | return d |
59 | 72 | |
| 73 | def _got_full_servermap(self, smap, force): |
60 | 74 | best_version = smap.best_recoverable_version() |
61 | 75 | if not best_version: |
62 | 76 | # the file is damaged beyond repair |
diff --git a/src/allmydata/mutable/servermap.py b/src/allmydata/mutable/servermap.py
index df75518..e908e42 100644
a
|
b
|
from allmydata.storage.server import si_b2a |
12 | 12 | from allmydata.interfaces import IServermapUpdaterStatus |
13 | 13 | from pycryptopp.publickey import rsa |
14 | 14 | |
15 | | from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \ |
16 | | CorruptShareError |
| 15 | from allmydata.mutable.common import MODE_CHECK, MODE_ANYTHING, MODE_WRITE, \ |
| 16 | MODE_READ, MODE_REPAIR, CorruptShareError |
17 | 17 | from allmydata.mutable.layout import SIGNED_PREFIX_LENGTH, MDMFSlotReadProxy |
18 | 18 | |
19 | 19 | class UpdateStatus: |
… |
… |
class ServermapUpdater: |
426 | 426 | self._read_size = 1000 |
427 | 427 | self._need_privkey = False |
428 | 428 | |
429 | | if mode == MODE_WRITE and not self._node.get_privkey(): |
| 429 | if mode in (MODE_WRITE, MODE_REPAIR) and not self._node.get_privkey(): |
430 | 430 | self._need_privkey = True |
431 | 431 | # check+repair: repair requires the privkey, so if we didn't happen |
432 | 432 | # to ask for it during the check, we'll have problems doing the |
… |
… |
class ServermapUpdater: |
497 | 497 | # might not wait for all of their answers to come back) |
498 | 498 | self.num_servers_to_query = k + self.EPSILON |
499 | 499 | |
500 | | if self.mode == MODE_CHECK: |
| 500 | if self.mode in (MODE_CHECK, MODE_REPAIR): |
501 | 501 | # We want to query all of the servers. |
502 | 502 | initial_servers_to_query = list(full_serverlist) |
503 | 503 | must_query = set(initial_servers_to_query) |
… |
… |
class ServermapUpdater: |
1063 | 1063 | parent=lp) |
1064 | 1064 | return self._done() |
1065 | 1065 | |
1066 | | if self.mode == MODE_CHECK: |
| 1066 | if self.mode == (MODE_CHECK, MODE_REPAIR): |
1067 | 1067 | # we used self._must_query, and we know there aren't any |
1068 | 1068 | # responses still waiting, so that means we must be done |
1069 | 1069 | self.log("done", parent=lp) |
diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py
index c878117..a13a00e 100644
a
|
b
|
from allmydata.test.test_download import PausingConsumer, \ |
39 | 39 | PausingAndStoppingConsumer, StoppingConsumer, \ |
40 | 40 | ImmediatelyStoppingConsumer |
41 | 41 | |
| 42 | def eventuaaaaaly(res=None): |
| 43 | d = fireEventually(res) |
| 44 | d.addCallback(fireEventually) |
| 45 | d.addCallback(fireEventually) |
| 46 | return d |
| 47 | |
42 | 48 | |
43 | 49 | # this "FakeStorage" exists to put the share data in RAM and avoid using real |
44 | 50 | # network connections, both to speed up the tests and to reduce the amount of |
… |
… |
class FakeStorage: |
69 | 75 | def read(self, peerid, storage_index): |
70 | 76 | shares = self._peers.get(peerid, {}) |
71 | 77 | if self._sequence is None: |
72 | | return defer.succeed(shares) |
73 | | d = defer.Deferred() |
| 78 | return eventuaaaaaly(shares) |
| 79 | d = eventuaaaaaly() |
74 | 80 | if not self._pending: |
75 | 81 | self._pending_timer = reactor.callLater(1.0, self._fire_readers) |
76 | 82 | if peerid not in self._pending: |
… |
… |
def make_storagebroker(s=None, num_peers=10): |
233 | 239 | storage_broker.test_add_rref(peerid, fss, ann) |
234 | 240 | return storage_broker |
235 | 241 | |
236 | | def make_nodemaker(s=None, num_peers=10): |
| 242 | def make_nodemaker(s=None, num_peers=10, keysize=TEST_RSA_KEY_SIZE): |
237 | 243 | storage_broker = make_storagebroker(s, num_peers) |
238 | 244 | sh = client.SecretHolder("lease secret", "convergence secret") |
239 | 245 | keygen = client.KeyGenerator() |
240 | | keygen.set_default_keysize(TEST_RSA_KEY_SIZE) |
| 246 | if keysize: |
| 247 | keygen.set_default_keysize(keysize) |
241 | 248 | nodemaker = NodeMaker(storage_broker, sh, None, |
242 | 249 | None, None, |
243 | 250 | {"k": 3, "n": 10}, SDMF_VERSION, keygen) |
… |
… |
class PublishMixin: |
957 | 964 | d.addCallback(_created) |
958 | 965 | return d |
959 | 966 | |
| 967 | def publish_empty_sdmf(self): |
| 968 | self.CONTENTS = "" |
| 969 | self.uploadable = MutableData(self.CONTENTS) |
| 970 | self._storage = FakeStorage() |
| 971 | self._nodemaker = make_nodemaker(self._storage, keysize=None) |
| 972 | self._storage_broker = self._nodemaker.storage_broker |
| 973 | d = self._nodemaker.create_mutable_file(self.uploadable, |
| 974 | version=SDMF_VERSION) |
| 975 | def _created(node): |
| 976 | self._fn = node |
| 977 | self._fn2 = self._nodemaker.create_from_cap(node.get_uri()) |
| 978 | d.addCallback(_created) |
| 979 | return d |
| 980 | |
960 | 981 | |
961 | 982 | def publish_multiple(self, version=0): |
962 | 983 | self.CONTENTS = ["Contents 0", |
… |
… |
class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin): |
2157 | 2178 | d.addCallback(_check_results) |
2158 | 2179 | return d |
2159 | 2180 | |
| 2181 | def test_repair_empty(self): |
| 2182 | # bug 1689: delete one share of an empty mutable file, then repair. |
| 2183 | # In the buggy version, the check that preceeds the retrieve+publish |
| 2184 | # cycle uses MODE_READ, instead of MODE_REPAIR, and fails to get the |
| 2185 | # privkey that repair needs. |
| 2186 | d = self.publish_empty_sdmf() |
| 2187 | def _delete_one_share(ign): |
| 2188 | shares = self._storage._peers |
| 2189 | for peerid in shares: |
| 2190 | for shnum in list(shares[peerid]): |
| 2191 | if shnum == 0: |
| 2192 | del shares[peerid][shnum] |
| 2193 | d.addCallback(_delete_one_share) |
| 2194 | d.addCallback(lambda ign: self._fn2.check(Monitor())) |
| 2195 | d.addCallback(lambda check_results: self._fn2.repair(check_results)) |
| 2196 | def _check(crr): |
| 2197 | self.failUnlessEqual(crr.get_successful(), True) |
| 2198 | d.addCallback(_check) |
| 2199 | return d |
| 2200 | |
2160 | 2201 | class DevNullDictionary(dict): |
2161 | 2202 | def __setitem__(self, key, value): |
2162 | 2203 | return |