1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | from functools import reduce |
---|
6 | import binascii |
---|
7 | from time import time as now |
---|
8 | |
---|
9 | from zope.interface import implementer |
---|
10 | from twisted.internet import defer |
---|
11 | |
---|
12 | from allmydata import uri |
---|
13 | from twisted.internet.interfaces import IConsumer |
---|
14 | from allmydata.crypto import aes |
---|
15 | from allmydata.interfaces import IImmutableFileNode, IUploadResults |
---|
16 | from allmydata.util import consumer |
---|
17 | from allmydata.check_results import CheckResults, CheckAndRepairResults |
---|
18 | from allmydata.util.dictutil import DictOfSets |
---|
19 | from allmydata.util.happinessutil import servers_of_happiness |
---|
20 | |
---|
21 | # local imports |
---|
22 | from allmydata.immutable.checker import Checker |
---|
23 | from allmydata.immutable.repairer import Repairer |
---|
24 | from allmydata.immutable.downloader.node import DownloadNode, \ |
---|
25 | IDownloadStatusHandlingConsumer |
---|
26 | from allmydata.immutable.downloader.status import DownloadStatus |
---|
27 | |
---|
class CiphertextFileNode(object):
    """File node that handles only the ciphertext of an immutable file.

    It is identified by a CHK verify-cap. Downloads are delegated to a
    DownloadNode that is built on first use; checking and repairing are
    delegated to Checker and Repairer.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        self._download_status = None
        self._node = None  # built lazily, the first time it is needed

    def _maybe_create_download_node(self):
        """Create the DownloadStatus and the DownloadNode if they do not
        exist yet. A freshly created status is also registered with the
        history object, when there is one."""
        if not self._download_status:
            status = DownloadStatus(self._verifycap.storage_index,
                                    self._verifycap.size)
            if self._history:
                self._history.add_download(status)
            self._download_status = status
        if self._node is not None:
            return
        self._node = DownloadNode(self._verifycap, self._storage_broker,
                                  self._secret_holder,
                                  self._terminator,
                                  self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """Main entry point through which FileNode.read() obtains data.
        The requested range of ciphertext is fed to the consumer. Returns
        a Deferred that fires (with the consumer) once the read is
        finished."""
        self._maybe_create_download_node()
        node = self._node
        return node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Start downloading one segment. Returns a tuple (d, c): 'd' is a
        Deferred that fires with (offset,data) once the segment is
        available, and 'c' is an object whose c.cancel() disavows interest
        in the segment (after which 'd' will never fire).

        The segment size normally has to be known before calling this,
        unless only the first few bytes of the file are wanted. Asking for
        a segment number that turns out to be too large makes the Deferred
        errback with BadSegmentNumberError.

        Because the Deferred fires with the offset of the first byte of
        the segment, get_segment() can be called before the segment size
        is known and the caller can still tell which data it received.
        """
        self._maybe_create_download_node()
        node = self._node
        return node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment
        size."""
        self._maybe_create_download_node()
        node = self._node
        return node.get_segsize()

    def get_storage_index(self):
        """Return the storage index recorded in the verify-cap."""
        return self._verifycap.storage_index

    def get_verify_cap(self):
        """Return the verify-cap this node was built from."""
        return self._verifycap

    def get_size(self):
        """Return the size recorded in the verify-cap."""
        return self._verifycap.size

    def raise_error(self):
        """Do nothing."""
        pass

    def is_mutable(self):
        """Always False."""
        return False

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Run a Checker over the connected servers, then hand its results
        to _maybe_repair. Returns the resulting Deferred."""
        checker = Checker(verifycap=self._verifycap,
                          servers=self._storage_broker.get_connected_servers(),
                          verify=verify, add_lease=add_lease,
                          secret_holder=self._secret_holder,
                          monitor=monitor)
        d = checker.start()
        d.addCallback(self._maybe_repair, monitor)
        return d

    def _maybe_repair(self, cr, monitor):
        """Wrap the check results 'cr' in a CheckAndRepairResults. A
        healthy file is returned right away (already-fired Deferred);
        otherwise a Repairer is started and the returned Deferred reflects
        its outcome."""
        results = CheckAndRepairResults(self._verifycap.storage_index)
        results.pre_repair_results = cr
        if cr.is_healthy():
            results.post_repair_results = cr
            return defer.succeed(results)

        results.repair_attempted = True
        results.repair_successful = False  # until proven successful

        def _record_failure(failure):
            # as with mutable repair, I'm not sure if I want to pass
            # through a failure or not. TODO
            results.repair_successful = False
            results.repair_failure = failure
            return failure

        repairer = Repairer(self, storage_broker=self._storage_broker,
                            secret_holder=self._secret_holder,
                            monitor=monitor)
        d = repairer.start()
        d.addCallbacks(self._gather_repair_results, _record_failure,
                       callbackArgs=(cr, results,))
        return d

    def _gather_repair_results(self, ur, cr, crr):
        """Build the post-repair CheckResults by merging the pre-repair
        sharemap of 'cr' with the sharemap of the upload results 'ur',
        store them on 'crr', and return 'crr'."""
        assert IUploadResults.providedBy(ur), ur
        # the post-repair results start out as a copy of what the
        # pre-repair check saw

        cap = self._verifycap
        responders = set(cr.get_servers_responding())
        merged = DictOfSets()
        assert isinstance(cr.get_sharemap(), DictOfSets)
        for shnum, holders in cr.get_sharemap().items():
            for holder in holders:
                merged.add(shnum, holder)
        # servers that accepted a share during repair count as responding
        for shnum, holders in ur.get_sharemap().items():
            for holder in holders:
                merged.add(shnum, holder)
                responders.add(holder)

        distinct_hosts = set().union(*merged.values())
        share_count = len(merged)
        healthy = bool(share_count >= cap.total_shares)
        recoverable = bool(share_count >= cap.needed_shares)

        happiness = servers_of_happiness(merged)

        post = CheckResults(
            cr.get_uri(), cr.get_storage_index(),
            healthy=healthy, recoverable=recoverable,
            count_happiness=happiness,
            count_shares_needed=cap.needed_shares,
            count_shares_expected=cap.total_shares,
            count_shares_good=share_count,
            count_good_share_hosts=len(distinct_hosts),
            count_recoverable_versions=int(recoverable),
            count_unrecoverable_versions=int(not recoverable),
            servers_responding=list(responders),
            sharemap=merged,
            count_wrong_shares=0,  # no such thing as wrong, for immutable
            list_corrupt_shares=cr.get_corrupt_shares(),
            count_corrupt_shares=len(cr.get_corrupt_shares()),
            list_incompatible_shares=cr.get_incompatible_shares(),
            count_incompatible_shares=len(cr.get_incompatible_shares()),
            summary="",
            report=[],
            share_problems=[],
            servermap=None)
        crr.repair_successful = healthy
        crr.post_repair_results = post
        return crr

    def check(self, monitor, verify=False, add_lease=False):
        """Start a Checker over the connected servers and return whatever
        its start() returns."""
        checker = Checker(verifycap=self._verifycap,
                          servers=self._storage_broker.get_connected_servers(),
                          verify=verify, add_lease=add_lease,
                          secret_holder=self._secret_holder,
                          monitor=monitor)
        return checker.start()
187 | |
---|
@implementer(IConsumer, IDownloadStatusHandlingConsumer)
class DecryptingConsumer(object):
    """Decrypting shim placed between a CiphertextDownloader (acting as
    the Producer) and the real Consumer. Everything written to me is
    decrypted and forwarded. The real Consumer sees the real Producer,
    while the Producer sees me in place of the real Consumer."""

    def __init__(self, consumer, readkey, offset):
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument.
        whole_blocks, leftover = divmod(offset, 16)
        iv = binascii.unhexlify("%032x" % whole_blocks)
        self._decryptor = aes.create_decryptor(readkey, iv)
        # feed throwaway bytes purely to advance the counter to the
        # position inside the block
        aes.decrypt_data(self._decryptor, b"\x00" * leftover)

    def set_download_status_read_event(self, read_ev):
        """Remember the read event that write() reports timing to."""
        self._read_ev = read_ev

    def set_download_status(self, ds):
        """Remember the download status that write() logs "AES" events
        to."""
        self._download_status = ds

    def registerProducer(self, producer, streaming):
        """Pass the producer straight through to the real consumer."""
        # Because this is a pass-through, the real consumer flow-controls
        # the real producer, and no IPushProducer methods are needed here.
        # All IConsumer methods are pass-throughs; only write() is
        # intercepted, to perform decryption.
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        """Pass the unregistration through to the real consumer."""
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt 'ciphertext', report timing to the read event and the
        download status when they are set, then write the plaintext to
        the real consumer."""
        began = now()
        plaintext = aes.decrypt_data(self._decryptor, ciphertext)
        read_ev = self._read_ev
        if read_ev:
            read_ev.update(0, now() - began, 0)
        status = self._download_status
        if status:
            status.add_misc_event("AES", began, now())
        self._consumer.write(plaintext)
233 | |
---|
@implementer(IImmutableFileNode)
class ImmutableFileNode(object):
    """A CiphertextFileNode wrapped together with a decryption key.

    Ciphertext download, check and repair are delegated to the wrapped
    node; reads are decrypted with the read key taken from the CHK
    file-cap.
    """

    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        """Hash by the file-cap, consistent with __eq__."""
        return hash(self.u)

    def __eq__(self, other):
        """Two ImmutableFileNodes are equal when their file-caps are
        equal; anything else is never equal to this node."""
        if isinstance(other, ImmutableFileNode):
            return self.u == other.u
        return False

    def __ne__(self, other):
        """Exact inverse of __eq__."""
        # This previously returned self.u.__eq__(other.u), which made
        # `a != b` True precisely when the two nodes were equal.
        if isinstance(other, ImmutableFileNode):
            return self.u != other.u
        return True

    def read(self, consumer, offset=0, size=None):
        """Feed 'consumer' with the plaintext of the requested range.

        Returns the Deferred from the ciphertext node's read(), with a
        callback added so that it fires with the original 'consumer'
        rather than with the intermediate decrypting consumer.
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        """Do nothing."""
        pass

    def get_write_uri(self):
        """Always None: this node exposes no write URI."""
        return None

    def get_readonly_uri(self):
        """Return the same string as get_uri()."""
        return self.get_uri()

    def get_uri(self):
        """Return the file-cap in string form."""
        return self.u.to_string()

    def get_cap(self):
        """Return the file-cap object."""
        return self.u

    def get_readcap(self):
        """Return the read-only form of the file-cap."""
        return self.u.get_readonly()

    def get_verify_cap(self):
        """Return the verify-cap derived from the file-cap."""
        return self.u.get_verify_cap()

    def get_repair_cap(self):
        """Return the cap needed for repair."""
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        """Return the storage index of the file-cap."""
        return self.u.get_storage_index()

    def get_size(self):
        """Return the file size recorded in the file-cap."""
        return self.u.get_size()

    def get_current_size(self):
        """Return an already-fired Deferred carrying get_size()."""
        return defer.succeed(self.get_size())

    def is_mutable(self):
        """Always False."""
        return False

    def is_readonly(self):
        """Always True."""
        return True

    def is_unknown(self):
        """Always False."""
        return False

    def is_allowed_in_immutable_directory(self):
        """Always True."""
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Delegate to the wrapped ciphertext node."""
        return self._cnode.check_and_repair(monitor, verify, add_lease)

    def check(self, monitor, verify=False, add_lease=False):
        """Delegate to the wrapped ciphertext node."""
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)

    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        # `consumer` here is the allmydata.util.consumer module.
        return consumer.download_to_data(self)

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version

    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size