source: trunk/src/allmydata/immutable/filenode.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 13.2 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from functools import reduce
6import binascii
7from time import time as now
8
9from zope.interface import implementer
10from twisted.internet import defer
11
12from allmydata import uri
13from twisted.internet.interfaces import IConsumer
14from allmydata.crypto import aes
15from allmydata.interfaces import IImmutableFileNode, IUploadResults
16from allmydata.util import consumer
17from allmydata.check_results import CheckResults, CheckAndRepairResults
18from allmydata.util.dictutil import DictOfSets
19from allmydata.util.happinessutil import servers_of_happiness
20
21# local imports
22from allmydata.immutable.checker import Checker
23from allmydata.immutable.repairer import Repairer
24from allmydata.immutable.downloader.node import DownloadNode, \
25     IDownloadStatusHandlingConsumer
26from allmydata.immutable.downloader.status import DownloadStatus
27
class CiphertextFileNode:
    """Node giving access to the ciphertext of one immutable (CHK) file.

    I am constructed from a verify-cap and deal only in ciphertext: I can
    read it, and I can check and repair the shares that hold it. The
    DownloadNode that performs the actual reads is created on first use.
    """

    def __init__(self, verifycap, storage_broker, secret_holder,
                 terminator, history):
        assert isinstance(verifycap, uri.CHKFileVerifierURI)
        self._verifycap = verifycap
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._terminator = terminator
        self._history = history
        # Both of these stay None until _maybe_create_download_node() runs.
        self._node = None
        self._download_status = None

    def _maybe_create_download_node(self):
        """Create the DownloadStatus and the DownloadNode if either is
        missing. A freshly created status is handed to the history object
        (when there is one) before it is stored on this node."""
        if not self._download_status:
            status = DownloadStatus(self._verifycap.storage_index,
                                    self._verifycap.size)
            if self._history:
                self._history.add_download(status)
            self._download_status = status
        if self._node is not None:
            return
        self._node = DownloadNode(self._verifycap, self._storage_broker,
                                  self._secret_holder, self._terminator,
                                  self._history, self._download_status)

    def read(self, consumer, offset=0, size=None):
        """Main entry point used by FileNode.read() to obtain data: deliver
        the requested range of ciphertext to 'consumer'.

        Returns a Deferred that fires (with the consumer) once the read has
        finished.
        """
        self._maybe_create_download_node()
        node = self._node
        return node.read(consumer, offset, size)

    def get_segment(self, segnum):
        """Start downloading segment number 'segnum'.

        Returns a tuple (d, c). 'd' is a Deferred that fires with
        (offset, data) when the segment is available; 'c' is an object
        whose c.cancel() disavows interest in the segment, after which 'd'
        will never fire.

        Callers usually need the segment size beforehand, unless they only
        want the first few bytes of the file. Asking for a segment number
        that turns out to be too large makes the Deferred errback with
        BadSegmentNumberError.

        Because the Deferred fires with the offset of the segment's first
        byte, get_segment() may be called before the segment size is known
        and the caller can still tell which data it received.
        """
        self._maybe_create_download_node()
        node = self._node
        return node.get_segment(segnum)

    def get_segment_size(self):
        """Return a Deferred that fires with the file's real segment
        size."""
        self._maybe_create_download_node()
        node = self._node
        return node.get_segsize()

    def get_storage_index(self):
        """Return the storage index recorded in the verify-cap."""
        cap = self._verifycap
        return cap.storage_index

    def get_verify_cap(self):
        """Return the verify-cap this node was built from."""
        return self._verifycap

    def get_size(self):
        """Return the file size recorded in the verify-cap."""
        cap = self._verifycap
        return cap.size

    def raise_error(self):
        """Do nothing; always returns None."""
        return None

    def is_mutable(self):
        """Always False: this node represents an immutable file."""
        return False

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Run a Checker against the currently connected servers and feed
        its results to _maybe_repair().

        Returns the Deferred produced by the Checker, with _maybe_repair
        added as a callback.
        """
        servers = self._storage_broker.get_connected_servers()
        checker = Checker(verifycap=self._verifycap,
                          servers=servers,
                          verify=verify,
                          add_lease=add_lease,
                          secret_holder=self._secret_holder,
                          monitor=monitor)
        d = checker.start()
        d.addCallback(self._maybe_repair, monitor)
        return d

    def _maybe_repair(self, cr, monitor):
        """Wrap the check results 'cr' in a CheckAndRepairResults and start
        a Repairer unless 'cr' reports the file as healthy.

        Returns a Deferred. In the healthy case it has already fired with
        the CheckAndRepairResults; otherwise it is the Repairer's Deferred
        with result-gathering and failure-recording callbacks attached.
        """
        results = CheckAndRepairResults(self._verifycap.storage_index)
        results.pre_repair_results = cr
        if cr.is_healthy():
            # Nothing to repair: the post-repair view is the check itself.
            results.post_repair_results = cr
            return defer.succeed(results)

        results.repair_attempted = True
        results.repair_successful = False  # until proven successful

        def _record_failure(failure):
            # as with mutable repair, I'm not sure if I want to pass
            # through a failure or not. TODO
            results.repair_successful = False
            results.repair_failure = failure
            return failure

        repairer = Repairer(self,
                            storage_broker=self._storage_broker,
                            secret_holder=self._secret_holder,
                            monitor=monitor)
        d = repairer.start()
        d.addCallbacks(self._gather_repair_results, _record_failure,
                       callbackArgs=(cr, results))
        return d

    def _gather_repair_results(self, ur, cr, crr):
        """Build the post-repair CheckResults and store them on 'crr'.

        The pre-repair check results 'cr' are the starting point; the
        sharemap from the upload results 'ur' is merged into them. Servers
        named in the upload sharemap are also added to the set of
        responding servers. Returns 'crr'.
        """
        assert IUploadResults.providedBy(ur), ur

        cap = self._verifycap
        responders = set(cr.get_servers_responding())
        merged = DictOfSets()
        assert isinstance(cr.get_sharemap(), DictOfSets)
        # Fold in the check's sharemap first, then the upload's. Only
        # servers from the upload sharemap are added to the responders.
        for source, from_upload in ((cr, False), (ur, True)):
            for shnum, holders in source.get_sharemap().items():
                for holder in holders:
                    merged.add(shnum, holder)
                    if from_upload:
                        responders.add(holder)

        distinct_holders = set().union(*merged.values())
        host_count = len(distinct_holders)
        healthy = bool(len(merged) >= cap.total_shares)
        recoverable = bool(len(merged) >= cap.needed_shares)

        happiness = servers_of_happiness(merged)

        post_repair = CheckResults(
            cr.get_uri(), cr.get_storage_index(),
            healthy=healthy, recoverable=recoverable,
            count_happiness=happiness,
            count_shares_needed=cap.needed_shares,
            count_shares_expected=cap.total_shares,
            count_shares_good=len(merged),
            count_good_share_hosts=host_count,
            count_recoverable_versions=int(recoverable),
            count_unrecoverable_versions=int(not recoverable),
            servers_responding=list(responders),
            sharemap=merged,
            count_wrong_shares=0,  # no such thing as wrong, for immutable
            list_corrupt_shares=cr.get_corrupt_shares(),
            count_corrupt_shares=len(cr.get_corrupt_shares()),
            list_incompatible_shares=cr.get_incompatible_shares(),
            count_incompatible_shares=len(cr.get_incompatible_shares()),
            summary="",
            report=[],
            share_problems=[],
            servermap=None,
        )
        crr.repair_successful = healthy
        crr.post_repair_results = post_repair
        return crr

    def check(self, monitor, verify=False, add_lease=False):
        """Run a Checker against the currently connected servers and
        return whatever its start() returns."""
        return Checker(
            verifycap=self._verifycap,
            servers=self._storage_broker.get_connected_servers(),
            verify=verify,
            add_lease=add_lease,
            secret_holder=self._secret_holder,
            monitor=monitor,
        ).start()
187
@implementer(IConsumer, IDownloadStatusHandlingConsumer)
class DecryptingConsumer:
    """A decrypting shim placed between a CiphertextDownloader (acting as a
    Producer) and the real Consumer. Everything written to me is decrypted
    and forwarded. The real Consumer sees the real Producer, while the
    Producer sees me in place of the real Consumer."""

    def __init__(self, consumer, readkey, offset):
        """Wrap 'consumer' and prepare a decryptor for data that begins
        'offset' bytes into the file, keyed with 'readkey'."""
        self._consumer = consumer
        self._read_ev = None
        self._download_status = None
        # TODO: pycryptopp CTR-mode needs random-access operations: I want
        # either a=AES(readkey, offset) or better yet both of:
        #  a=AES(readkey, offset=0)
        #  a.process(ciphertext, offset=xyz)
        # For now, we fake it with the existing iv= argument.
        #
        # The offset is split into a count of whole 16-byte units, which
        # becomes the IV, and the leftover bytes within the last unit.
        whole_blocks, leftover = divmod(offset, 16)
        iv = binascii.unhexlify("%032x" % whole_blocks)
        self._decryptor = aes.create_decryptor(readkey, iv)
        # Decrypt (and discard) 'leftover' dummy bytes, just to advance the
        # counter to the requested offset.
        aes.decrypt_data(self._decryptor, b"\x00" * leftover)

    def set_download_status_read_event(self, read_ev):
        """Remember the read event that write() will update."""
        self._read_ev = read_ev

    def set_download_status(self, ds):
        """Remember the download status that write() will add events to."""
        self._download_status = ds

    # The IConsumer methods below are pass-throughs, so the real consumer
    # can flow-control the real producer. Therefore we don't need to provide
    # any IPushProducer methods; only write() is intercepted, to perform
    # decryption.

    def registerProducer(self, producer, streaming):
        """Forward producer registration to the wrapped consumer."""
        self._consumer.registerProducer(producer, streaming)

    def unregisterProducer(self):
        """Forward producer unregistration to the wrapped consumer."""
        self._consumer.unregisterProducer()

    def write(self, ciphertext):
        """Decrypt 'ciphertext' and write the plaintext to the wrapped
        consumer. If a read event or download status has been set, the
        time spent decrypting is reported to it first."""
        began = now()
        cleartext = aes.decrypt_data(self._decryptor, ciphertext)
        if self._read_ev:
            # The elapsed decryption time goes in update()'s second slot.
            self._read_ev.update(0, now() - began, 0)
        if self._download_status:
            self._download_status.add_misc_event("AES", began, now())
        self._consumer.write(cleartext)
233
@implementer(IImmutableFileNode)
class ImmutableFileNode(object):
    """An immutable (CHK) file node: a CiphertextFileNode wrapped together
    with the decryption key taken from the file's read-cap."""

    def __init__(self, filecap, storage_broker, secret_holder, terminator,
                 history):
        """Build the node from 'filecap', which must be a uri.CHKFileURI.

        The remaining arguments are passed straight through to the
        underlying CiphertextFileNode.
        """
        assert isinstance(filecap, uri.CHKFileURI)
        verifycap = filecap.get_verify_cap()
        self._cnode = CiphertextFileNode(verifycap, storage_broker,
                                         secret_holder, terminator, history)
        self.u = filecap
        self._readkey = filecap.key

    # TODO: I'm not sure about this.. what's the use case for node==node? If
    # we keep it here, we should also put this on CiphertextFileNode
    def __hash__(self):
        """Hash by the file's cap, consistently with __eq__."""
        return hash(self.u)

    def __eq__(self, other):
        """Two ImmutableFileNodes are equal when their caps are equal; any
        other kind of object is never equal to me."""
        if isinstance(other, ImmutableFileNode):
            return self.u == other.u
        return False

    def __ne__(self, other):
        """Exact negation of __eq__.

        This used to return the *equality* result for two
        ImmutableFileNodes, so that 'a != b' was True precisely when the
        two nodes had equal caps.
        """
        return not self.__eq__(other)

    def read(self, consumer, offset=0, size=None):
        """Feed 'consumer' the requested range of plaintext.

        The ciphertext node writes into a DecryptingConsumer positioned at
        'offset', which decrypts and forwards to 'consumer'. Returns a
        Deferred that fires with 'consumer' (not the decrypting wrapper).
        """
        decryptor = DecryptingConsumer(consumer, self._readkey, offset)
        d = self._cnode.read(decryptor, offset, size)
        # Hide the wrapper: callers get back the consumer they passed in.
        d.addCallback(lambda dc: consumer)
        return d

    def raise_error(self):
        """Do nothing; always returns None."""
        pass

    def get_write_uri(self):
        """Always None: this node never provides a write URI."""
        return None

    def get_readonly_uri(self):
        """Same as get_uri()."""
        return self.get_uri()

    def get_uri(self):
        """Return the cap's to_string() form."""
        return self.u.to_string()

    def get_cap(self):
        """Return the cap object this node was built from."""
        return self.u

    def get_readcap(self):
        """Return the cap's get_readonly() result."""
        return self.u.get_readonly()

    def get_verify_cap(self):
        """Return the verify-cap derived from the cap."""
        return self.u.get_verify_cap()

    def get_repair_cap(self):
        """Return the cap needed for repair, which is the verify-cap."""
        # CHK files can be repaired with just the verifycap
        return self.u.get_verify_cap()

    def get_storage_index(self):
        """Return the storage index reported by the cap."""
        return self.u.get_storage_index()

    def get_size(self):
        """Return the file size reported by the cap."""
        return self.u.get_size()

    def get_current_size(self):
        """Return an already-fired Deferred carrying get_size()."""
        return defer.succeed(self.get_size())

    def is_mutable(self):
        """Always False."""
        return False

    def is_readonly(self):
        """Always True."""
        return True

    def is_unknown(self):
        """Always False."""
        return False

    def is_allowed_in_immutable_directory(self):
        """Always True."""
        return True

    def check_and_repair(self, monitor, verify=False, add_lease=False):
        """Delegate to the underlying ciphertext node's
        check_and_repair()."""
        return self._cnode.check_and_repair(monitor, verify, add_lease)

    def check(self, monitor, verify=False, add_lease=False):
        """Delegate to the underlying ciphertext node's check()."""
        return self._cnode.check(monitor, verify, add_lease)

    def get_best_readable_version(self):
        """
        Return an IReadable of the best version of this file. Since
        immutable files can have only one version, we just return the
        current filenode.
        """
        return defer.succeed(self)

    def download_best_version(self):
        """
        Download the best version of this file, returning its contents
        as a bytestring. Since there is only one version of an immutable
        file, we download and return the contents of this file.
        """
        d = consumer.download_to_data(self)
        return d

    # for an immutable file, download_to_data (specified in IReadable)
    # is the same as download_best_version (specified in IFileNode). For
    # mutable files, the difference is more meaningful, since they can
    # have multiple versions.
    download_to_data = download_best_version


    # get_size() (IReadable), get_current_size() (IFilesystemNode), and
    # get_size_of_best_version(IFileNode) are all the same for immutable
    # files.
    get_size_of_best_version = get_current_size
Note: See TracBrowser for help on using the repository browser.