1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | from zope.interface import implementer |
---|
6 | from twisted.internet import defer |
---|
7 | from allmydata.storage.server import si_b2a |
---|
8 | from allmydata.util import log, consumer |
---|
9 | from allmydata.util.assertutil import precondition |
---|
10 | from allmydata.interfaces import IEncryptedUploadable |
---|
11 | |
---|
12 | from allmydata.immutable import upload |
---|
13 | |
---|
14 | @implementer(IEncryptedUploadable) |
---|
15 | class Repairer(log.PrefixingLogMixin): |
---|
16 | """I generate any shares which were not available and upload them to |
---|
17 | servers. |
---|
18 | |
---|
19 | Which servers? Well, I just use the normal upload process, so any servers |
---|
20 | that will take shares. In fact, I even believe servers if they say that |
---|
21 | they already have shares even if attempts to download those shares would |
---|
22 | fail because the shares are corrupted. |
---|
23 | |
---|
24 | My process of uploading replacement shares proceeds in a segment-wise |
---|
25 | fashion -- first I ask servers if they can hold the new shares, and wait |
---|
26 | until enough have agreed then I download the first segment of the file |
---|
27 | and upload the first block of each replacement share, and only after all |
---|
28 | those blocks have been uploaded do I download the second segment of the |
---|
29 | file and upload the second block of each replacement share to its |
---|
30 | respective server. (I do it this way in order to minimize the amount of |
---|
31 | downloading I have to do and the amount of memory I have to use at any |
---|
32 | one time.) |
---|
33 | |
---|
34 | If any of the servers to which I am uploading replacement shares fails to |
---|
35 | accept the blocks during this process, then I just stop using that |
---|
36 | server, abandon any share-uploads that were going to that server, and |
---|
37 | proceed to finish uploading the remaining shares to their respective |
---|
38 | servers. At the end of my work, I produce an object which satisfies the |
---|
39 | ICheckAndRepairResults interface (by firing the deferred that I returned |
---|
40 | from start() and passing that check-and-repair-results object). |
---|
41 | |
---|
42 | Before I send any new request to a server, I always ask the 'monitor' |
---|
43 | object that was passed into my constructor whether this task has been |
---|
44 | cancelled (by invoking its raise_if_cancelled() method). |
---|
45 | """ |
---|
46 | |
---|
47 | def __init__(self, filenode, storage_broker, secret_holder, monitor): |
---|
48 | logprefix = si_b2a(filenode.get_storage_index())[:5] |
---|
49 | log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer", |
---|
50 | prefix=logprefix) |
---|
51 | self._filenode = filenode |
---|
52 | self._storage_broker = storage_broker |
---|
53 | self._secret_holder = secret_holder |
---|
54 | self._monitor = monitor |
---|
55 | self._offset = 0 |
---|
56 | |
---|
57 | def start(self): |
---|
58 | self.log("starting repair") |
---|
59 | d = self._filenode.get_segment_size() |
---|
60 | def _got_segsize(segsize): |
---|
61 | vcap = self._filenode.get_verify_cap() |
---|
62 | k = vcap.needed_shares |
---|
63 | N = vcap.total_shares |
---|
64 | # Per ticket #1212 |
---|
65 | # (http://tahoe-lafs.org/trac/tahoe-lafs/ticket/1212) |
---|
66 | happy = 0 |
---|
67 | self._encodingparams = (k, happy, N, segsize) |
---|
68 | # XXX should pass a reactor to this |
---|
69 | ul = upload.CHKUploader(self._storage_broker, self._secret_holder) |
---|
70 | return ul.start(self) # I am the IEncryptedUploadable |
---|
71 | d.addCallback(_got_segsize) |
---|
72 | return d |
---|
73 | |
---|
74 | |
---|
75 | # methods to satisfy the IEncryptedUploader interface |
---|
76 | # (From the perspective of an uploader I am an IEncryptedUploadable.) |
---|
77 | def set_upload_status(self, upload_status): |
---|
78 | self.upload_status = upload_status |
---|
79 | def get_size(self): |
---|
80 | size = self._filenode.get_size() |
---|
81 | assert size is not None |
---|
82 | return defer.succeed(size) |
---|
83 | def get_all_encoding_parameters(self): |
---|
84 | return defer.succeed(self._encodingparams) |
---|
85 | def read_encrypted(self, length, hash_only): |
---|
86 | """Returns a deferred which eventually fires with the requested |
---|
87 | ciphertext, as a list of strings.""" |
---|
88 | precondition(length) # please don't ask to read 0 bytes |
---|
89 | mc = consumer.MemoryConsumer() |
---|
90 | d = self._filenode.read(mc, self._offset, length) |
---|
91 | self._offset += length |
---|
92 | d.addCallback(lambda ign: mc.chunks) |
---|
93 | return d |
---|
94 | def get_storage_index(self): |
---|
95 | return self._filenode.get_storage_index() |
---|
96 | def close(self): |
---|
97 | pass |
---|