source: trunk/src/allmydata/immutable/upload.py

Last change on this file was 6c48698, checked in by Itamar Turner-Trauring <itamar@…>, at 2024-03-01T18:00:24Z

No need for repetition.

  • Property mode set to 100644
File size: 76.5 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from __future__ import annotations
6
7from six import ensure_str
8
9import os, time, weakref, itertools
10
11import attr
12
13from zope.interface import implementer
14from twisted.python import failure
15from twisted.internet import defer
16from twisted.application import service
17from foolscap.api import Referenceable, Copyable, RemoteCopy
18
19from allmydata.crypto import aes
20from allmydata.util.hashutil import file_renewal_secret_hash, \
21     file_cancel_secret_hash, bucket_renewal_secret_hash, \
22     bucket_cancel_secret_hash, plaintext_hasher, \
23     storage_index_hash, plaintext_segment_hasher, convergence_hasher
24from allmydata.util.deferredutil import (
25    timeout_call,
26    until,
27)
28from allmydata import hashtree, uri
29from allmydata.storage.server import si_b2a
30from allmydata.immutable import encode
31from allmydata.util import base32, dictutil, idlib, log, mathutil
32from allmydata.util.happinessutil import servers_of_happiness, \
33    merge_servers, failure_message
34from allmydata.util.assertutil import precondition, _assert
35from allmydata.util.rrefutil import add_version_to_remote_reference
36from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
37     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
38     NoServersError, InsufficientVersionError, UploadUnhappinessError, \
39     DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE, IPeerSelector
40from allmydata.immutable import layout
41
42from io import BytesIO
43from .happiness_upload import share_placement, calculate_happiness
44
45from ..util.eliotutil import (
46    log_call_deferred,
47    inline_callbacks,
48)
49
50from eliot import (
51    ActionType,
52    MessageType,
53    Field,
54)
55
56_TOTAL_SHARES = Field.for_types(
57    u"total_shares",
58    [int],
59    u"The total number of shares desired.",
60)
61
62def _serialize_peers(peers):
63    return sorted(base32.b2a(p) for p in peers)
64
65_PEERS = Field(
66    u"peers",
67    _serialize_peers,
68    u"The read/write peers being considered.",
69)
70
71_READONLY_PEERS = Field(
72    u"readonly_peers",
73    _serialize_peers,
74    u"The read-only peers being considered.",
75)
76
77def _serialize_existing_shares(existing_shares):
78    return {
79        ensure_str(server): list(shares)
80        for (server, shares)
81        in existing_shares.items()
82    }
83
84_EXISTING_SHARES = Field(
85    u"existing_shares",
86    _serialize_existing_shares,
87    u"The shares that are believed to already have been placed.",
88)
89
90def _serialize_happiness_mappings(happiness_mappings):
91    return {
92        str(sharenum): ensure_str(base32.b2a(serverid))
93        for (sharenum, serverid)
94        in happiness_mappings.items()
95    }
96
97_HAPPINESS_MAPPINGS = Field(
98    u"happiness_mappings",
99    _serialize_happiness_mappings,
100    u"The computed happiness mapping for a particular upload.",
101)
102
103_HAPPINESS = Field.for_types(
104    u"happiness",
105    [int],
106    u"The computed happiness of a certain placement.",
107)
108
109_UPLOAD_TRACKERS = Field(
110    u"upload_trackers",
111    lambda trackers: list(
112        dict(
113            server=ensure_str(tracker.get_name()),
114            shareids=sorted(tracker.buckets.keys()),
115        )
116        for tracker
117        in trackers
118    ),
119    u"Some servers which have agreed to hold some shares for us.",
120)
121
122_ALREADY_SERVERIDS = Field(
123    u"already_serverids",
124    lambda d: {str(k): v for k, v in d.items()},
125    u"Some servers which are already holding some shares that we were interested in uploading.",
126)
127
128LOCATE_ALL_SHAREHOLDERS = ActionType(
129    u"immutable:upload:locate-all-shareholders",
130    [],
131    [_UPLOAD_TRACKERS, _ALREADY_SERVERIDS],
132    u"Existing shareholders are being identified to plan upload actions.",
133)
134
135GET_SHARE_PLACEMENTS = MessageType(
136    u"immutable:upload:get-share-placements",
137    [_TOTAL_SHARES, _PEERS, _READONLY_PEERS, _EXISTING_SHARES, _HAPPINESS_MAPPINGS, _HAPPINESS],
138    u"Share placement is being computed for an upload.",
139)
140
141_EFFECTIVE_HAPPINESS = Field.for_types(
142    u"effective_happiness",
143    [int],
144    u"The computed happiness value of a share placement map.",
145)
146
147CONVERGED_HAPPINESS = MessageType(
148    u"immutable:upload:get-shareholders:converged-happiness",
149    [_EFFECTIVE_HAPPINESS],
150    u"The share placement algorithm has converged and placements efforts are complete.",
151)
152
153
154# this wants to live in storage, not here
155class TooFullError(Exception):
156    pass
157
158# HelperUploadResults are what we get from the Helper, and to retain
159# backwards compatibility with old Helpers we can't change the format. We
160# convert them into a local UploadResults upon receipt.
161class HelperUploadResults(Copyable, RemoteCopy):
162    # note: don't change this string, it needs to match the value used on the
163    # helper, and it does *not* need to match the fully-qualified
164    # package/module/class name
165    #
166    # Needs to be native string to make Foolscap happy.
167    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
168    copytype = typeToCopy
169
170    # also, think twice about changing the shape of any existing attribute,
171    # because instances of this class are sent from the helper to its client,
172    # so changing this may break compatibility. Consider adding new fields
173    # instead of modifying existing ones.
174
175    def __init__(self):
176        self.timings = {} # dict of name to number of seconds
177        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
178        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
179        self.file_size = None
180        self.ciphertext_fetched = None # how much the helper fetched
181        self.uri = None
182        self.preexisting_shares = None # count of shares already present
183        self.pushed_shares = None # count of shares we pushed
184
185@implementer(IUploadResults)
186class UploadResults(object):
187
188    def __init__(self, file_size,
189                 ciphertext_fetched, # how much the helper fetched
190                 preexisting_shares, # count of shares already present
191                 pushed_shares, # count of shares we pushed
192                 sharemap, # {shnum: set(server)}
193                 servermap, # {server: set(shnum)}
194                 timings, # dict of name to number of seconds
195                 uri_extension_data,
196                 uri_extension_hash,
197                 verifycapstr):
198        self._file_size = file_size
199        self._ciphertext_fetched = ciphertext_fetched
200        self._preexisting_shares = preexisting_shares
201        self._pushed_shares = pushed_shares
202        self._sharemap = sharemap
203        self._servermap = servermap
204        self._timings = timings
205        self._uri_extension_data = uri_extension_data
206        self._uri_extension_hash = uri_extension_hash
207        self._verifycapstr = verifycapstr
208
209    def set_uri(self, uri):
210        self._uri = uri
211
212    def get_file_size(self):
213        return self._file_size
214    def get_uri(self):
215        return self._uri
216    def get_ciphertext_fetched(self):
217        return self._ciphertext_fetched
218    def get_preexisting_shares(self):
219        return self._preexisting_shares
220    def get_pushed_shares(self):
221        return self._pushed_shares
222    def get_sharemap(self):
223        return self._sharemap
224    def get_servermap(self):
225        return self._servermap
226    def get_timings(self):
227        return self._timings
228    def get_uri_extension_data(self):
229        return self._uri_extension_data
230    def get_verifycapstr(self):
231        return self._verifycapstr
232
233
234def pretty_print_shnum_to_servers(s):
235    return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.items() ])
236
237
238class ServerTracker(object):
239    def __init__(self, server,
240                 sharesize, blocksize, num_segments, num_share_hashes,
241                 storage_index,
242                 bucket_renewal_secret, bucket_cancel_secret,
243                 uri_extension_size):
244        self._server = server
245        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
246        self.sharesize = sharesize
247        self.uri_extension_size = uri_extension_size
248
249        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
250                                             blocksize, num_segments,
251                                             num_share_hashes,
252                                             uri_extension_size)
253        self.wbp_class = wbp.__class__ # to create more of them
254        self.allocated_size = wbp.get_allocated_size()
255        self.blocksize = blocksize
256        self.num_segments = num_segments
257        self.num_share_hashes = num_share_hashes
258        self.storage_index = storage_index
259
260        self.renew_secret = bucket_renewal_secret
261        self.cancel_secret = bucket_cancel_secret
262
263    def __repr__(self):
264        return ("<ServerTracker for server %r and SI %r>"
265                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
266
267    def get_server(self):
268        return self._server
269    def get_serverid(self):
270        return self._server.get_serverid()
271    def get_name(self):
272        return self._server.get_name()
273
274    def query(self, sharenums):
275        storage_server = self._server.get_storage_server()
276        d = storage_server.allocate_buckets(
277            self.storage_index,
278            self.renew_secret,
279            self.cancel_secret,
280            sharenums,
281            self.allocated_size,
282            canary=Referenceable(),
283        )
284        d.addCallback(self._buckets_allocated)
285        return d
286
287    def ask_about_existing_shares(self):
288        storage_server = self._server.get_storage_server()
289        return storage_server.get_buckets(self.storage_index)
290
291    def _buckets_allocated(self, alreadygot_and_buckets):
292        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
293        (alreadygot, buckets) = alreadygot_and_buckets
294        b = {}
295        for sharenum, rref in list(buckets.items()):
296            bp = self.wbp_class(rref, self._server, self.sharesize,
297                                self.blocksize,
298                                self.num_segments,
299                                self.num_share_hashes,
300                                self.uri_extension_size)
301            b[sharenum] = bp
302        self.buckets.update(b)
303        return (alreadygot, set(b.keys()))
304
305
306    def abort(self):
307        """
308        I abort the remote bucket writers for all shares. This is a good idea
309        to conserve space on the storage server.
310        """
311        self.abort_some_buckets(list(self.buckets.keys()))
312
313    def abort_some_buckets(self, sharenums):
314        """
315        I abort the remote bucket writers for the share numbers in sharenums.
316        """
317        for sharenum in sharenums:
318            if sharenum in self.buckets:
319                self.buckets[sharenum].abort()
320                del self.buckets[sharenum]
321
322
323def str_shareloc(shnum, bucketwriter):
324    return "%s: %s" % (shnum, ensure_str(bucketwriter.get_servername()),)
325
326
327@implementer(IPeerSelector)
328class PeerSelector(object):
329
330    def __init__(self, num_segments, total_shares, needed_shares, min_happiness):
331        self.num_segments = num_segments
332        self.total_shares = total_shares
333        self.needed_shares = needed_shares
334        self.min_happiness = min_happiness
335
336        self.existing_shares = {}
337        self.peers = set()
338        self.readonly_peers = set()
339        self.bad_peers = set()
340
341    def add_peer_with_share(self, peerid, shnum):
342        try:
343            self.existing_shares[peerid].add(shnum)
344        except KeyError:
345            self.existing_shares[peerid] = set([shnum])
346
347    def add_peer(self, peerid):
348        self.peers.add(peerid)
349
350    def mark_readonly_peer(self, peerid):
351        self.readonly_peers.add(peerid)
352        self.peers.remove(peerid)
353
354    def mark_bad_peer(self, peerid):
355        if peerid in self.peers:
356            self.peers.remove(peerid)
357            self.bad_peers.add(peerid)
358        elif peerid in self.readonly_peers:
359            self.readonly_peers.remove(peerid)
360            self.bad_peers.add(peerid)
361
362    def get_sharemap_of_preexisting_shares(self):
363        preexisting = dictutil.DictOfSets()
364        for server, shares in self.existing_shares.items():
365            for share in shares:
366                preexisting.add(share, server)
367        return preexisting
368
369    def get_share_placements(self):
370        shares = set(range(self.total_shares))
371        self.happiness_mappings = share_placement(self.peers, self.readonly_peers, shares, self.existing_shares)
372        self.happiness = calculate_happiness(self.happiness_mappings)
373        GET_SHARE_PLACEMENTS.log(
374            total_shares=self.total_shares,
375            peers=self.peers,
376            readonly_peers=self.readonly_peers,
377            existing_shares=self.existing_shares,
378            happiness_mappings=self.happiness_mappings,
379            happiness=self.happiness,
380        )
381        return self.happiness_mappings
382
383    def add_peers(self, peerids=None):
384        raise NotImplementedError
385
386
387class _QueryStatistics(object):
388
389    def __init__(self):
390        self.total = 0
391        self.good = 0
392        self.bad = 0
393        self.full = 0
394        self.error = 0
395        self.contacted = 0
396
397    def __str__(self):
398        return "QueryStatistics(total={} good={} bad={} full={} " \
399            "error={} contacted={})".format(
400                self.total,
401                self.good,
402                self.bad,
403                self.full,
404                self.error,
405                self.contacted,
406            )
407
408
409class Tahoe2ServerSelector(log.PrefixingLogMixin):
410
411    def __init__(self, upload_id, logparent=None, upload_status=None, reactor=None):
412        self.upload_id = upload_id
413        self._query_stats = _QueryStatistics()
414        self.last_failure_msg = None
415        self._status = IUploadStatus(upload_status)
416        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
417        self.log("starting", level=log.OPERATIONAL)
418        if reactor is None:
419            from twisted.internet import reactor
420        self._reactor = reactor
421
422    def __repr__(self):
423        return "<Tahoe2ServerSelector for upload %r>" % self.upload_id
424
425    def _create_trackers(self, candidate_servers, allocated_size,
426                         file_renewal_secret, file_cancel_secret, create_server_tracker):
427
428        # filter the list of servers according to which ones can accomodate
429        # this request. This excludes older servers (which used a 4-byte size
430        # field) from getting large shares (for files larger than about
431        # 12GiB). See #439 for details.
432        def _get_maxsize(server):
433            v0 = server.get_version()
434            v1 = v0[b"http://allmydata.org/tahoe/protocols/storage/v1"]
435            return v1[b"maximum-immutable-share-size"]
436
437        for server in candidate_servers:
438            self.peer_selector.add_peer(server.get_serverid())
439        writeable_servers = [
440            server for server in candidate_servers
441            if _get_maxsize(server) >= allocated_size
442        ]
443        readonly_servers = set(candidate_servers) - set(writeable_servers)
444
445        for server in readonly_servers:
446            self.peer_selector.mark_readonly_peer(server.get_serverid())
447
448        def _make_trackers(servers):
449            trackers = []
450            for s in servers:
451                seed = s.get_lease_seed()
452                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
453                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
454                st = create_server_tracker(s, renew, cancel)
455                trackers.append(st)
456            return trackers
457
458        write_trackers = _make_trackers(writeable_servers)
459
460        # We don't try to allocate shares to these servers, since they've
461        # said that they're incapable of storing shares of the size that we'd
462        # want to store. We ask them about existing shares for this storage
463        # index, which we want to know about for accurate
464        # servers_of_happiness accounting, then we forget about them.
465        readonly_trackers = _make_trackers(readonly_servers)
466
467        return readonly_trackers, write_trackers
468
469    @inline_callbacks
470    def get_shareholders(self, storage_broker, secret_holder,
471                         storage_index, share_size, block_size,
472                         num_segments, total_shares, needed_shares,
473                         min_happiness, uri_extension_size):
474        """
475        @return: (upload_trackers, already_serverids), where upload_trackers
476                 is a set of ServerTracker instances that have agreed to hold
477                 some shares for us (the shareids are stashed inside the
478                 ServerTracker), and already_serverids is a dict mapping
479                 shnum to a set of serverids for servers which claim to
480                 already have the share.
481        """
482
483        # re-initialize statistics
484        self._query_status = _QueryStatistics()
485
486        if self._status:
487            self._status.set_status("Contacting Servers..")
488
489        self.peer_selector = PeerSelector(num_segments, total_shares,
490                                          needed_shares, min_happiness)
491
492        self.total_shares = total_shares
493        self.min_happiness = min_happiness
494        self.needed_shares = needed_shares
495
496        self.homeless_shares = set(range(total_shares))
497        self.use_trackers = set() # ServerTrackers that have shares assigned
498                                  # to them
499        self.preexisting_shares = {} # shareid => set(serverids) holding shareid
500
501        # These servers have shares -- any shares -- for our SI. We keep
502        # track of these to write an error message with them later.
503        self.serverids_with_shares = set()
504
505        # this needed_hashes computation should mirror
506        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
507        # (instead of a HashTree) because we don't require actual hashing
508        # just to count the levels.
509        ht = hashtree.IncompleteHashTree(total_shares)
510        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
511
512        # figure out how much space to ask for
513        wbp = layout.make_write_bucket_proxy(None, None,
514                                             share_size, 0, num_segments,
515                                             num_share_hashes,
516                                             uri_extension_size)
517        allocated_size = wbp.get_allocated_size()
518
519        # decide upon the renewal/cancel secrets, to include them in the
520        # allocate_buckets query.
521        file_renewal_secret = file_renewal_secret_hash(
522            secret_holder.get_renewal_secret(),
523            storage_index,
524        )
525        file_cancel_secret = file_cancel_secret_hash(
526            secret_holder.get_cancel_secret(),
527            storage_index,
528        )
529
530        # see docs/specifications/servers-of-happiness.rst
531        # 0. Start with an ordered list of servers. Maybe *2N* of them.
532        #
533
534        all_servers = storage_broker.get_servers_for_psi(storage_index, for_upload=True)
535        if not all_servers:
536            raise NoServersError("client gave us zero servers")
537
538        def _create_server_tracker(server, renew, cancel):
539            return ServerTracker(
540                server, share_size, block_size, num_segments, num_share_hashes,
541                storage_index, renew, cancel, uri_extension_size
542            )
543
544        readonly_trackers, write_trackers = self._create_trackers(
545            all_servers[:(2 * total_shares)],
546            allocated_size,
547            file_renewal_secret,
548            file_cancel_secret,
549            _create_server_tracker,
550        )
551
552        # see docs/specifications/servers-of-happiness.rst
553        # 1. Query all servers for existing shares.
554        #
555        # The spec doesn't say what to do for timeouts/errors. This
556        # adds a timeout to each request, and rejects any that reply
557        # with error (i.e. just removed from the list)
558
559        ds = []
560        if self._status and readonly_trackers:
561            self._status.set_status(
562                "Contacting readonly servers to find any existing shares"
563            )
564
565        # in the "pre servers-of-happiness" code, it was a little
566        # ambigious whether "merely asking" counted as a "query" or
567        # not, because "allocate_buckets" with nothing to allocate was
568        # used to "ask" a write-able server what it held. Now we count
569        # "actual allocation queries" only, because those are the only
570        # things that actually affect what the server does.
571
572        for tracker in readonly_trackers:
573            assert isinstance(tracker, ServerTracker)
574            d = timeout_call(self._reactor, tracker.ask_about_existing_shares(), 15)
575            d.addBoth(self._handle_existing_response, tracker)
576            ds.append(d)
577            self.log("asking server %r for any existing shares" %
578                     (tracker.get_name(),), level=log.NOISY)
579
580        for tracker in write_trackers:
581            assert isinstance(tracker, ServerTracker)
582            d = timeout_call(self._reactor, tracker.ask_about_existing_shares(), 15)
583
584            def timed_out(f, tracker):
585                # print("TIMEOUT {}: {}".format(tracker, f))
586                write_trackers.remove(tracker)
587                readonly_trackers.append(tracker)
588                return f
589            d.addErrback(timed_out, tracker)
590            d.addBoth(self._handle_existing_write_response, tracker, set())
591            ds.append(d)
592            self.log("asking server %r for any existing shares" %
593                     (tracker.get_name(),), level=log.NOISY)
594
595        trackers = set(write_trackers) | set(readonly_trackers)
596
597        # these will always be (True, None) because errors are handled
598        # in the _handle_existing_write_response etc callbacks
599        yield defer.DeferredList(ds)
600
601        # okay, we've queried the 2N servers, time to get the share
602        # placements and attempt to actually place the shares (or
603        # renew them on read-only servers). We want to run the loop
604        # below *at least once* because even read-only servers won't
605        # renew their shares until "allocate_buckets" is called (via
606        # tracker.query())
607
608        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/778#comment:48
609        # min_happiness will be 0 for the repairer, so we set current
610        # effective_happiness to less than zero so this loop runs at
611        # least once for the repairer...
612
613        def _bad_server(fail, tracker):
614            self.last_failure_msg = fail
615            return False  # will mark it readonly
616
617        def _make_readonly(tracker):
618            # print("making {} read-only".format(tracker.get_serverid()))
619            try:
620                write_trackers.remove(tracker)
621            except ValueError:
622                pass
623            # XXX can we just use a set() or does order matter?
624            if tracker not in readonly_trackers:
625                readonly_trackers.append(tracker)
626            return None
627
628        # so we *always* want to run this loop at least once, even if
629        # we only have read-only servers -- because asking them to
630        # allocate buckets renews those shares they already have. For
631        # subsequent loops, we give up if we've achieved happiness OR
632        # if we have zero writable servers left
633
634        last_happiness = None
635        effective_happiness = -1
636        while effective_happiness < min_happiness and \
637              (last_happiness is None or len(write_trackers)):
638            errors_before = self._query_stats.bad
639            self._share_placements = self.peer_selector.get_share_placements()
640
641            placements = []
642            for tracker in trackers:
643                shares_to_ask = self._allocation_for(tracker)
644
645                # if we already tried to upload share X to this very
646                # same server in a previous iteration, we should *not*
647                # ask again. If we *do* ask, there's no real harm, but
648                # the server will respond with an empty dict and that
649                # confuses our statistics. However, if the server is a
650                # readonly sever, we *do* want to ask so it refreshes
651                # the share.
652                if shares_to_ask != set(tracker.buckets.keys()) or tracker in readonly_trackers:
653                    self._query_stats.total += 1
654                    self._query_stats.contacted += 1
655                    d = timeout_call(self._reactor, tracker.query(shares_to_ask), 15)
656                    d.addBoth(self._buckets_allocated, tracker, shares_to_ask)
657                    d.addErrback(lambda f, tr: _bad_server(f, tr), tracker)
658                    d.addCallback(lambda x, tr: _make_readonly(tr) if not x else x, tracker)
659                    placements.append(d)
660
661            yield defer.DeferredList(placements)
662            merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
663            effective_happiness = servers_of_happiness(merged)
664            if effective_happiness == last_happiness:
665                # print("effective happiness still {}".format(last_happiness))
666                # we haven't improved over the last iteration; give up
667                break;
668            if errors_before == self._query_stats.bad:
669                break;
670            last_happiness = effective_happiness
671            # print("write trackers left: {}".format(len(write_trackers)))
672
673        # note: peer_selector.get_allocations() only maps "things we
674        # uploaded in the above loop" and specificaly does *not*
675        # include any pre-existing shares on read-only servers .. but
676        # we *do* want to count those shares towards total happiness.
677
678        # no more servers. If we haven't placed enough shares, we fail.
679        # XXX note sometimes we're not running the loop at least once,
680        # and so 'merged' must be (re-)computed here.
681        merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
682        effective_happiness = servers_of_happiness(merged)
683
684        # print("placements completed {} vs {}".format(effective_happiness, min_happiness))
685        # for k, v in merged.items():
686        #     print("  {} -> {}".format(k, v))
687
688        CONVERGED_HAPPINESS.log(
689            effective_happiness=effective_happiness,
690        )
691
692        if effective_happiness < min_happiness:
693            msg = failure_message(
694                peer_count=len(self.serverids_with_shares),
695                k=self.needed_shares,
696                happy=min_happiness,
697                effective_happy=effective_happiness,
698            )
699            msg = ("server selection failed for %s: %s (%s), merged=%s" %
700                   (self, msg, self._get_progress_message(),
701                    pretty_print_shnum_to_servers(merged)))
702            if self.last_failure_msg:
703                msg += " (%s)" % (self.last_failure_msg,)
704            self.log(msg, level=log.UNUSUAL)
705            self._failed(msg)  # raises UploadUnhappinessError
706            return
707
708        # we placed (or already had) enough to be happy, so we're done
709        if self._status:
710            self._status.set_status("Placed all shares")
711        msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
712               "self.use_trackers: %s, self.preexisting_shares: %s") \
713               % (self, self._get_progress_message(),
714                  pretty_print_shnum_to_servers(merged),
715                  [', '.join([str_shareloc(k,v)
716                              for k,v in st.buckets.items()])
717                   for st in self.use_trackers],
718                  pretty_print_shnum_to_servers(self.preexisting_shares))
719        self.log(msg, level=log.OPERATIONAL)
720        defer.returnValue((self.use_trackers, self.peer_selector.get_sharemap_of_preexisting_shares()))
721
722    def _handle_existing_response(self, res, tracker):
723        """
724        I handle responses to the queries sent by
725        Tahoe2ServerSelector.get_shareholders.
726        """
727        serverid = tracker.get_serverid()
728        if isinstance(res, failure.Failure):
729            self.log("%s got error during existing shares check: %s"
730                    % (tracker.get_name(), res), level=log.UNUSUAL)
731            self.peer_selector.mark_bad_peer(serverid)
732        else:
733            buckets = res
734            if buckets:
735                self.serverids_with_shares.add(serverid)
736            self.log("response to get_buckets() from server %r: alreadygot=%s"
737                    % (tracker.get_name(), tuple(sorted(buckets))),
738                    level=log.NOISY)
739            for bucket in buckets:
740                self.peer_selector.add_peer_with_share(serverid, bucket)
741                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
742                self.homeless_shares.discard(bucket)
743
744    def _handle_existing_write_response(self, res, tracker, shares_to_ask):
745        """
746        Function handles the response from the write servers
747        when inquiring about what shares each server already has.
748        """
749        if isinstance(res, failure.Failure):
750            self.peer_selector.mark_bad_peer(tracker.get_serverid())
751            self.log("%s got error during server selection: %s" % (tracker, res),
752                    level=log.UNUSUAL)
753            self.homeless_shares |= shares_to_ask
754            msg = ("last failure (from %s) was: %s" % (tracker, res))
755            self.last_failure_msg = msg
756        else:
757            for share in res.keys():
758                self.peer_selector.add_peer_with_share(tracker.get_serverid(), share)
759
760    def _get_progress_message(self):
761        if not self.homeless_shares:
762            msg = "placed all %d shares, " % (self.total_shares)
763        else:
764            msg = ("placed %d shares out of %d total (%d homeless), " %
765                   (self.total_shares - len(self.homeless_shares),
766                    self.total_shares,
767                    len(self.homeless_shares)))
768        assert self._query_stats.bad == (self._query_stats.full + self._query_stats.error)
769        return (
770            msg + "want to place shares on at least {happy} servers such that "
771            "any {needed} of them have enough shares to recover the file, "
772            "sent {queries} queries to {servers} servers, "
773            "{good} queries placed some shares, {bad} placed none "
774            "(of which {full} placed none due to the server being"
775            " full and {error} placed none due to an error)".format(
776                happy=self.min_happiness,
777                needed=self.needed_shares,
778                queries=self._query_stats.total,
779                servers=self._query_stats.contacted,
780                good=self._query_stats.good,
781                bad=self._query_stats.bad,
782                full=self._query_stats.full,
783                error=self._query_stats.error
784            )
785        )
786
787    def _allocation_for(self, tracker):
788        """
789        Given a ServerTracker, return a list of shares that we should
790        store on that server.
791        """
792        assert isinstance(tracker, ServerTracker)
793
794        shares_to_ask = set()
795        servermap = self._share_placements
796        for shnum, tracker_id in list(servermap.items()):
797            if tracker_id == None:
798                continue
799            if tracker.get_serverid() == tracker_id:
800                shares_to_ask.add(shnum)
801                if shnum in self.homeless_shares:
802                    self.homeless_shares.remove(shnum)
803
804        if self._status:
805            self._status.set_status("Contacting Servers [%r] (first query),"
806                                    " %d shares left.."
807                                    % (tracker.get_name(),
808                                       len(self.homeless_shares)))
809        return shares_to_ask
810
811    def _buckets_allocated(self, res, tracker, shares_to_ask):
812        """
813        Internal helper. If this returns an error or False, the server
814        will be considered read-only for any future iterations.
815        """
816        if isinstance(res, failure.Failure):
817            # This is unusual, and probably indicates a bug or a network
818            # problem.
819            self.log("%s got error during server selection: %s" % (tracker, res),
820                    level=log.UNUSUAL)
821            self._query_stats.error += 1
822            self._query_stats.bad += 1
823            self.homeless_shares |= shares_to_ask
824            try:
825                self.peer_selector.mark_readonly_peer(tracker.get_serverid())
826            except KeyError:
827                pass
828            return res
829
830        else:
831            (alreadygot, allocated) = res
832            self.log("response to allocate_buckets() from server %r: alreadygot=%s, allocated=%s"
833                    % (tracker.get_name(),
834                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
835                    level=log.NOISY)
836            progress = False
837            for s in alreadygot:
838                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
839                if s in self.homeless_shares:
840                    self.homeless_shares.remove(s)
841                    progress = True
842                elif s in shares_to_ask:
843                    progress = True
844
845            # the ServerTracker will remember which shares were allocated on
846            # that peer. We just have to remember to use them.
847            if allocated:
848                self.use_trackers.add(tracker)
849                progress = True
850
851            if allocated or alreadygot:
852                self.serverids_with_shares.add(tracker.get_serverid())
853
854            not_yet_present = set(shares_to_ask) - set(alreadygot)
855            still_homeless = not_yet_present - set(allocated)
856
857            if still_homeless:
858                # In networks with lots of space, this is very unusual and
859                # probably indicates an error. In networks with servers that
860                # are full, it is merely unusual. In networks that are very
861                # full, it is common, and many uploads will fail. In most
862                # cases, this is obviously not fatal, and we'll just use some
863                # other servers.
864
865                # some shares are still homeless, keep trying to find them a
866                # home. The ones that were rejected get first priority.
867                self.homeless_shares |= still_homeless
868                # Since they were unable to accept all of our requests, so it
869                # is safe to assume that asking them again won't help.
870
871            if progress:
872                # They accepted at least one of the shares that we asked
873                # them to accept, or they had a share that we didn't ask
874                # them to accept but that we hadn't placed yet, so this
875                # was a productive query
876                self._query_stats.good += 1
877            else:
878                # if we asked for some allocations, but the server
879                # didn't return any at all (i.e. empty dict) it must
880                # be full
881                self._query_stats.full += 1
882                self._query_stats.bad += 1
883            return progress
884
885    def _failed(self, msg):
886        """
887        I am called when server selection fails. I first abort all of the
888        remote buckets that I allocated during my unsuccessful attempt to
889        place shares for this file. I then raise an
890        UploadUnhappinessError with my msg argument.
891        """
892        for tracker in self.use_trackers:
893            assert isinstance(tracker, ServerTracker)
894            tracker.abort()
895        raise UploadUnhappinessError(msg)
896
897
898@attr.s
899class _Accum(object):
900    """
901    Accumulate up to some known amount of ciphertext.
902
903    :ivar remaining: The number of bytes still expected.
904    :ivar ciphertext: The bytes accumulated so far.
905    """
906    remaining : int = attr.ib(validator=attr.validators.instance_of(int))
907    ciphertext : list[bytes] = attr.ib(default=attr.Factory(list))
908
909    def extend(self,
910               size,           # type: int
911               ciphertext,     # type: list[bytes]
912    ):
913        """
914        Accumulate some more ciphertext.
915
916        :param size: The amount of data the new ciphertext represents towards
917            the goal.  This may be more than the actual size of the given
918            ciphertext if the source has run out of data.
919
920        :param ciphertext: The new ciphertext to accumulate.
921        """
922        self.remaining -= size
923        self.ciphertext.extend(ciphertext)
924
925
926@implementer(IEncryptedUploadable)
927class EncryptAnUploadable(object):
928    """This is a wrapper that takes an IUploadable and provides
929    IEncryptedUploadable."""
930    CHUNKSIZE = 50*1024
931
932    def __init__(self, original, log_parent=None, chunk_size=None):
933        """
934        :param chunk_size: The number of bytes to read from the uploadable at a
935            time, or None for some default.
936        """
937        precondition(original.default_params_set,
938                     "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
939        self.original = IUploadable(original)
940        self._log_number = log_parent
941        self._encryptor = None
942        self._plaintext_hasher = plaintext_hasher()
943        self._plaintext_segment_hasher = None
944        self._plaintext_segment_hashes = []
945        self._encoding_parameters = None
946        self._file_size = None
947        self._ciphertext_bytes_read = 0
948        self._status = None
949        if chunk_size is not None:
950            self.CHUNKSIZE = chunk_size
951
952    def set_upload_status(self, upload_status):
953        self._status = IUploadStatus(upload_status)
954        self.original.set_upload_status(upload_status)
955
956    def log(self, *args, **kwargs):
957        if "facility" not in kwargs:
958            kwargs["facility"] = "upload.encryption"
959        if "parent" not in kwargs:
960            kwargs["parent"] = self._log_number
961        return log.msg(*args, **kwargs)
962
963    def get_size(self):
964        if self._file_size is not None:
965            return defer.succeed(self._file_size)
966        d = self.original.get_size()
967        def _got_size(size):
968            self._file_size = size
969            if self._status:
970                self._status.set_size(size)
971            return size
972        d.addCallback(_got_size)
973        return d
974
975    def get_all_encoding_parameters(self):
976        if self._encoding_parameters is not None:
977            return defer.succeed(self._encoding_parameters)
978        d = self.original.get_all_encoding_parameters()
979        def _got(encoding_parameters):
980            (k, happy, n, segsize) = encoding_parameters
981            self._segment_size = segsize # used by segment hashers
982            self._encoding_parameters = encoding_parameters
983            self.log("my encoding parameters: %s" % (encoding_parameters,),
984                     level=log.NOISY)
985            return encoding_parameters
986        d.addCallback(_got)
987        return d
988
989    def _get_encryptor(self):
990        if self._encryptor:
991            return defer.succeed(self._encryptor)
992
993        d = self.original.get_encryption_key()
994        def _got(key):
995            self._encryptor = aes.create_encryptor(key)
996
997            storage_index = storage_index_hash(key)
998            assert isinstance(storage_index, bytes)
999            # There's no point to having the SI be longer than the key, so we
1000            # specify that it is truncated to the same 128 bits as the AES key.
1001            assert len(storage_index) == 16  # SHA-256 truncated to 128b
1002            self._storage_index = storage_index
1003            if self._status:
1004                self._status.set_storage_index(storage_index)
1005            return self._encryptor
1006        d.addCallback(_got)
1007        return d
1008
1009    def get_storage_index(self):
1010        d = self._get_encryptor()
1011        d.addCallback(lambda res: self._storage_index)
1012        return d
1013
1014    def _get_segment_hasher(self):
1015        p = self._plaintext_segment_hasher
1016        if p:
1017            left = self._segment_size - self._plaintext_segment_hashed_bytes
1018            return p, left
1019        p = plaintext_segment_hasher()
1020        self._plaintext_segment_hasher = p
1021        self._plaintext_segment_hashed_bytes = 0
1022        return p, self._segment_size
1023
1024    def _update_segment_hash(self, chunk):
1025        offset = 0
1026        while offset < len(chunk):
1027            p, segment_left = self._get_segment_hasher()
1028            chunk_left = len(chunk) - offset
1029            this_segment = min(chunk_left, segment_left)
1030            p.update(chunk[offset:offset+this_segment])
1031            self._plaintext_segment_hashed_bytes += this_segment
1032
1033            if self._plaintext_segment_hashed_bytes == self._segment_size:
1034                # we've filled this segment
1035                self._plaintext_segment_hashes.append(p.digest())
1036                self._plaintext_segment_hasher = None
1037                self.log("closed hash [%d]: %dB" %
1038                         (len(self._plaintext_segment_hashes)-1,
1039                          self._plaintext_segment_hashed_bytes),
1040                         level=log.NOISY)
1041                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
1042                         segnum=len(self._plaintext_segment_hashes)-1,
1043                         hash=base32.b2a(p.digest()),
1044                         level=log.NOISY)
1045
1046            offset += this_segment
1047
1048
1049    def read_encrypted(self, length, hash_only):
1050        # make sure our parameters have been set up first
1051        d = self.get_all_encoding_parameters()
1052        # and size
1053        d.addCallback(lambda ignored: self.get_size())
1054        d.addCallback(lambda ignored: self._get_encryptor())
1055
1056        accum = _Accum(length)
1057
1058        def action():
1059            """
1060            Read some bytes into the accumulator.
1061            """
1062            return self._read_encrypted(accum, hash_only)
1063
1064        def condition():
1065            """
1066            Check to see if the accumulator has all the data.
1067            """
1068            return accum.remaining == 0
1069
1070        d.addCallback(lambda ignored: until(action, condition))
1071        d.addCallback(lambda ignored: accum.ciphertext)
1072        return d
1073
1074    def _read_encrypted(self,
1075                        ciphertext_accum,  # type: _Accum
1076                        hash_only,         # type: bool
1077    ):
1078        # type: (...) -> defer.Deferred
1079        """
1080        Read the next chunk of plaintext, encrypt it, and extend the accumulator
1081        with the resulting ciphertext.
1082        """
1083        # tolerate large length= values without consuming a lot of RAM by
1084        # reading just a chunk (say 50kB) at a time. This only really matters
1085        # when hash_only==True (i.e. resuming an interrupted upload), since
1086        # that's the case where we will be skipping over a lot of data.
1087        size = min(ciphertext_accum.remaining, self.CHUNKSIZE)
1088
1089        # read a chunk of plaintext..
1090        d = defer.maybeDeferred(self.original.read, size)
1091        def _good(plaintext):
1092            # and encrypt it..
1093            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
1094            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
1095            # Intentionally tell the accumulator about the expected size, not
1096            # the actual size.  If we run out of data we still want remaining
1097            # to drop otherwise it will never reach 0 and the loop will never
1098            # end.
1099            ciphertext_accum.extend(size, ct)
1100        d.addCallback(_good)
1101        return d
1102
1103    def _hash_and_encrypt_plaintext(self, data, hash_only):
1104        assert isinstance(data, (tuple, list)), type(data)
1105        data = list(data)
1106        cryptdata = []
1107        # we use data.pop(0) instead of 'for chunk in data' to save
1108        # memory: each chunk is destroyed as soon as we're done with it.
1109        bytes_processed = 0
1110        while data:
1111            chunk = data.pop(0)
1112            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
1113                     level=log.NOISY)
1114            bytes_processed += len(chunk)
1115            self._plaintext_hasher.update(chunk)
1116            self._update_segment_hash(chunk)
1117            # TODO: we have to encrypt the data (even if hash_only==True)
1118            # because the AES-CTR implementation doesn't offer a
1119            # way to change the counter value. Once it acquires
1120            # this ability, change this to simply update the counter
1121            # before each call to (hash_only==False) encrypt_data
1122            ciphertext = aes.encrypt_data(self._encryptor, chunk)
1123            if hash_only:
1124                self.log("  skipping encryption", level=log.NOISY)
1125            else:
1126                cryptdata.append(ciphertext)
1127            del ciphertext
1128            del chunk
1129        self._ciphertext_bytes_read += bytes_processed
1130        if self._status:
1131            progress = float(self._ciphertext_bytes_read) / self._file_size
1132            self._status.set_progress(1, progress)
1133        return cryptdata
1134
1135
1136    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
1137        # this is currently unused, but will live again when we fix #453
1138        if len(self._plaintext_segment_hashes) < num_segments:
1139            # close out the last one
1140            assert len(self._plaintext_segment_hashes) == num_segments-1
1141            p, segment_left = self._get_segment_hasher()
1142            self._plaintext_segment_hashes.append(p.digest())
1143            del self._plaintext_segment_hasher
1144            self.log("closing plaintext leaf hasher, hashed %d bytes" %
1145                     self._plaintext_segment_hashed_bytes,
1146                     level=log.NOISY)
1147            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
1148                     segnum=len(self._plaintext_segment_hashes)-1,
1149                     hash=base32.b2a(p.digest()),
1150                     level=log.NOISY)
1151        assert len(self._plaintext_segment_hashes) == num_segments
1152        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
1153
1154    def get_plaintext_hash(self):
1155        h = self._plaintext_hasher.digest()
1156        return defer.succeed(h)
1157
1158    def close(self):
1159        return self.original.close()
1160
1161@implementer(IUploadStatus)
1162class UploadStatus(object):
1163    statusid_counter = itertools.count(0)
1164
1165    def __init__(self):
1166        self.storage_index = None
1167        self.size = None
1168        self.helper = False
1169        self.status = "Not started"
1170        self.progress = [0.0, 0.0, 0.0]
1171        self.active = True
1172        self.results = None
1173        self.counter = next(self.statusid_counter)
1174        self.started = time.time()
1175
1176    def get_started(self):
1177        return self.started
1178    def get_storage_index(self):
1179        return self.storage_index
1180    def get_size(self):
1181        return self.size
1182    def using_helper(self):
1183        return self.helper
1184    def get_status(self):
1185        return self.status
1186    def get_progress(self):
1187        return tuple(self.progress)
1188    def get_active(self):
1189        return self.active
1190    def get_results(self):
1191        return self.results
1192    def get_counter(self):
1193        return self.counter
1194
1195    def set_storage_index(self, si):
1196        self.storage_index = si
1197    def set_size(self, size):
1198        self.size = size
1199    def set_helper(self, helper):
1200        self.helper = helper
1201    def set_status(self, status):
1202        self.status = status
1203    def set_progress(self, which, value):
1204        # [0]: chk, [1]: ciphertext, [2]: encode+push
1205        self.progress[which] = value
1206    def set_active(self, value):
1207        self.active = value
1208    def set_results(self, value):
1209        self.results = value
1210
1211class CHKUploader(object):
1212
1213    def __init__(self, storage_broker, secret_holder, reactor=None):
1214        # server_selector needs storage_broker and secret_holder
1215        self._storage_broker = storage_broker
1216        self._secret_holder = secret_holder
1217        self._log_number = self.log("CHKUploader starting", parent=None)
1218        self._encoder = None
1219        self._storage_index = None
1220        self._upload_status = UploadStatus()
1221        self._upload_status.set_helper(False)
1222        self._upload_status.set_active(True)
1223        self._reactor = reactor
1224
1225        # locate_all_shareholders() will create the following attribute:
1226        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
1227
1228    def log(self, *args, **kwargs):
1229        if "parent" not in kwargs:
1230            kwargs["parent"] = self._log_number
1231        if "facility" not in kwargs:
1232            kwargs["facility"] = "tahoe.upload"
1233        return log.msg(*args, **kwargs)
1234
1235    @log_call_deferred(action_type=u"immutable:upload:chk:start")
1236    def start(self, encrypted_uploadable):
1237        """Start uploading the file.
1238
1239        Returns a Deferred that will fire with the UploadResults instance.
1240        """
1241
1242        self._started = time.time()
1243        eu = IEncryptedUploadable(encrypted_uploadable)
1244        self.log("starting upload of %s" % eu)
1245
1246        eu.set_upload_status(self._upload_status)
1247        d = self.start_encrypted(eu)
1248        def _done(uploadresults):
1249            self._upload_status.set_active(False)
1250            return uploadresults
1251        d.addBoth(_done)
1252        return d
1253
1254    def abort(self):
1255        """Call this if the upload must be abandoned before it completes.
1256        This will tell the shareholders to delete their partial shares. I
1257        return a Deferred that fires when these messages have been acked."""
1258        if not self._encoder:
1259            # how did you call abort() before calling start() ?
1260            return defer.succeed(None)
1261        return self._encoder.abort()
1262
1263    @log_call_deferred(action_type=u"immutable:upload:chk:start-encrypted")
1264    @inline_callbacks
1265    def start_encrypted(self, encrypted):
1266        """
1267        Returns a Deferred that will fire with the UploadResults instance.
1268        """
1269        eu = IEncryptedUploadable(encrypted)
1270
1271        started = time.time()
1272        # would be Really Nice to make Encoder just a local; only
1273        # abort() really needs self._encoder ...
1274        self._encoder = encode.Encoder(
1275            self._log_number,
1276            self._upload_status,
1277        )
1278        # this just returns itself
1279        yield self._encoder.set_encrypted_uploadable(eu)
1280        with LOCATE_ALL_SHAREHOLDERS() as action:
1281            (upload_trackers, already_serverids) = yield self.locate_all_shareholders(self._encoder, started)
1282            action.add_success_fields(upload_trackers=upload_trackers, already_serverids=already_serverids)
1283        self.set_shareholders(upload_trackers, already_serverids, self._encoder)
1284        verifycap = yield self._encoder.start()
1285        results = self._encrypted_done(verifycap)
1286        defer.returnValue(results)
1287
1288    def locate_all_shareholders(self, encoder, started):
1289        server_selection_started = now = time.time()
1290        self._storage_index_elapsed = now - started
1291        storage_broker = self._storage_broker
1292        secret_holder = self._secret_holder
1293        storage_index = encoder.get_param("storage_index")
1294        self._storage_index = storage_index
1295        upload_id = si_b2a(storage_index)[:5]
1296        self.log("using storage index %r" % upload_id)
1297        server_selector = Tahoe2ServerSelector(
1298            upload_id,
1299            self._log_number,
1300            self._upload_status,
1301            reactor=self._reactor,
1302        )
1303
1304        share_size = encoder.get_param("share_size")
1305        block_size = encoder.get_param("block_size")
1306        num_segments = encoder.get_param("num_segments")
1307        k, desired, n = encoder.get_param("share_counts")
1308
1309        self._server_selection_started = time.time()
1310        d = server_selector.get_shareholders(storage_broker, secret_holder,
1311                                             storage_index,
1312                                             share_size, block_size,
1313                                             num_segments, n, k, desired,
1314                                             encoder.get_uri_extension_size())
1315        def _done(res):
1316            self._server_selection_elapsed = time.time() - server_selection_started
1317            return res
1318        d.addCallback(_done)
1319        return d
1320
1321    def set_shareholders(self, upload_trackers, already_serverids, encoder):
1322        """
1323        :param upload_trackers: a sequence of ServerTracker objects that
1324                                have agreed to hold some shares for us (the
1325                                shareids are stashed inside the ServerTracker)
1326
1327        :param already_serverids: a dict mapping sharenum to a set of
1328                                  serverids for servers that claim to already
1329                                  have this share
1330        """
1331        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
1332        values = ([', '.join([str_shareloc(k,v)
1333                              for k,v in st.buckets.items()])
1334                   for st in upload_trackers], already_serverids)
1335        self.log(msgtempl % values, level=log.OPERATIONAL)
1336        # record already-present shares in self._results
1337        self._count_preexisting_shares = len(already_serverids)
1338
1339        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
1340        for tracker in upload_trackers:
1341            assert isinstance(tracker, ServerTracker)
1342        buckets = {}
1343        servermap = already_serverids.copy()
1344        for tracker in upload_trackers:
1345            buckets.update(tracker.buckets)
1346            for shnum in tracker.buckets:
1347                self._server_trackers[shnum] = tracker
1348                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
1349        assert len(buckets) == sum([len(tracker.buckets)
1350                                    for tracker in upload_trackers]), \
1351            "%s (%s) != %s (%s)" % (
1352                len(buckets),
1353                buckets,
1354                sum([len(tracker.buckets) for tracker in upload_trackers]),
1355                [(t.buckets, t.get_serverid()) for t in upload_trackers]
1356                )
1357        encoder.set_shareholders(buckets, servermap)
1358
1359    def _encrypted_done(self, verifycap):
1360        """
1361        :return UploadResults: A description of the outcome of the upload.
1362        """
1363        e = self._encoder
1364        sharemap = dictutil.DictOfSets()
1365        servermap = dictutil.DictOfSets()
1366        for shnum in e.get_shares_placed():
1367            server = self._server_trackers[shnum].get_server()
1368            sharemap.add(shnum, server)
1369            servermap.add(server, shnum)
1370        now = time.time()
1371        timings = {}
1372        timings["total"] = now - self._started
1373        timings["storage_index"] = self._storage_index_elapsed
1374        timings["peer_selection"] = self._server_selection_elapsed
1375        timings.update(e.get_times())
1376        ur = UploadResults(file_size=e.file_size,
1377                           ciphertext_fetched=0,
1378                           preexisting_shares=self._count_preexisting_shares,
1379                           pushed_shares=len(e.get_shares_placed()),
1380                           sharemap=sharemap,
1381                           servermap=servermap,
1382                           timings=timings,
1383                           uri_extension_data=e.get_uri_extension_data(),
1384                           uri_extension_hash=e.get_uri_extension_hash(),
1385                           verifycapstr=verifycap.to_string())
1386        self._upload_status.set_results(ur)
1387        return ur
1388
1389    def get_upload_status(self):
1390        return self._upload_status
1391
1392def read_this_many_bytes(uploadable, size, prepend_data=None):
1393    if prepend_data is None:
1394        prepend_data = []
1395    if size == 0:
1396        return defer.succeed([])
1397    d = uploadable.read(size)
1398    def _got(data):
1399        assert isinstance(data, list)
1400        bytes = sum([len(piece) for piece in data])
1401        assert bytes > 0
1402        assert bytes <= size
1403        remaining = size - bytes
1404        if remaining:
1405            return read_this_many_bytes(uploadable, remaining,
1406                                        prepend_data + data)
1407        return prepend_data + data
1408    d.addCallback(_got)
1409    return d
1410
1411class LiteralUploader(object):
1412
1413    def __init__(self):
1414        self._status = s = UploadStatus()
1415        s.set_storage_index(None)
1416        s.set_helper(False)
1417        s.set_progress(0, 1.0)
1418        s.set_active(False)
1419
1420    def start(self, uploadable):
1421        uploadable = IUploadable(uploadable)
1422        d = uploadable.get_size()
1423        def _got_size(size):
1424            self._size = size
1425            self._status.set_size(size)
1426            return read_this_many_bytes(uploadable, size)
1427        d.addCallback(_got_size)
1428        d.addCallback(lambda data: uri.LiteralFileURI(b"".join(data)))
1429        d.addCallback(lambda u: u.to_string())
1430        d.addCallback(self._build_results)
1431        return d
1432
1433    def _build_results(self, uri):
1434        ur = UploadResults(file_size=self._size,
1435                           ciphertext_fetched=0,
1436                           preexisting_shares=0,
1437                           pushed_shares=0,
1438                           sharemap={},
1439                           servermap={},
1440                           timings={},
1441                           uri_extension_data=None,
1442                           uri_extension_hash=None,
1443                           verifycapstr=None)
1444        ur.set_uri(uri)
1445        self._status.set_status("Finished")
1446        self._status.set_progress(1, 1.0)
1447        self._status.set_progress(2, 1.0)
1448        self._status.set_results(ur)
1449        return ur
1450
1451    def close(self):
1452        pass
1453
1454    def get_upload_status(self):
1455        return self._status
1456
1457@implementer(RIEncryptedUploadable)
1458class RemoteEncryptedUploadable(Referenceable):  # type: ignore # warner/foolscap#78
1459
1460    def __init__(self, encrypted_uploadable, upload_status):
1461        self._eu = IEncryptedUploadable(encrypted_uploadable)
1462        self._offset = 0
1463        self._bytes_sent = 0
1464        self._status = IUploadStatus(upload_status)
1465        # we are responsible for updating the status string while we run, and
1466        # for setting the ciphertext-fetch progress.
1467        self._size = None
1468
1469    def get_size(self):
1470        if self._size is not None:
1471            return defer.succeed(self._size)
1472        d = self._eu.get_size()
1473        def _got_size(size):
1474            self._size = size
1475            return size
1476        d.addCallback(_got_size)
1477        return d
1478
1479    def remote_get_size(self):
1480        return self.get_size()
1481    def remote_get_all_encoding_parameters(self):
1482        return self._eu.get_all_encoding_parameters()
1483
1484    def _read_encrypted(self, length, hash_only):
1485        d = self._eu.read_encrypted(length, hash_only)
1486        def _read(strings):
1487            if hash_only:
1488                self._offset += length
1489            else:
1490                size = sum([len(data) for data in strings])
1491                self._offset += size
1492            return strings
1493        d.addCallback(_read)
1494        return d
1495
1496    def remote_read_encrypted(self, offset, length):
1497        # we don't support seek backwards, but we allow skipping forwards
1498        precondition(offset >= 0, offset)
1499        precondition(length >= 0, length)
1500        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
1501                     level=log.NOISY)
1502        precondition(offset >= self._offset, offset, self._offset)
1503        if offset > self._offset:
1504            # read the data from disk anyways, to build up the hash tree
1505            skip = offset - self._offset
1506            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
1507                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
1508            d = self._read_encrypted(skip, hash_only=True)
1509        else:
1510            d = defer.succeed(None)
1511
1512        def _at_correct_offset(res):
1513            assert offset == self._offset, "%d != %d" % (offset, self._offset)
1514            return self._read_encrypted(length, hash_only=False)
1515        d.addCallback(_at_correct_offset)
1516
1517        def _read(strings):
1518            size = sum([len(data) for data in strings])
1519            self._bytes_sent += size
1520            return strings
1521        d.addCallback(_read)
1522        return d
1523
1524    def remote_close(self):
1525        return self._eu.close()
1526
1527
1528class AssistedUploader(object):
1529
1530    def __init__(self, helper, storage_broker):
1531        self._helper = helper
1532        self._storage_broker = storage_broker
1533        self._log_number = log.msg("AssistedUploader starting")
1534        self._storage_index = None
1535        self._upload_status = s = UploadStatus()
1536        s.set_helper(True)
1537        s.set_active(True)
1538
1539    def log(self, *args, **kwargs):
1540        if "parent" not in kwargs:
1541            kwargs["parent"] = self._log_number
1542        return log.msg(*args, **kwargs)
1543
1544    def start(self, encrypted_uploadable, storage_index):
1545        """Start uploading the file.
1546
1547        Returns a Deferred that will fire with the UploadResults instance.
1548        """
1549        precondition(isinstance(storage_index, bytes), storage_index)
1550        self._started = time.time()
1551        eu = IEncryptedUploadable(encrypted_uploadable)
1552        eu.set_upload_status(self._upload_status)
1553        self._encuploadable = eu
1554        self._storage_index = storage_index
1555        d = eu.get_size()
1556        d.addCallback(self._got_size)
1557        d.addCallback(lambda res: eu.get_all_encoding_parameters())
1558        d.addCallback(self._got_all_encoding_parameters)
1559        d.addCallback(self._contact_helper)
1560        d.addCallback(self._build_verifycap)
1561        def _done(res):
1562            self._upload_status.set_active(False)
1563            return res
1564        d.addBoth(_done)
1565        return d
1566
1567    def _got_size(self, size):
1568        self._size = size
1569        self._upload_status.set_size(size)
1570
1571    def _got_all_encoding_parameters(self, params):
1572        k, happy, n, segment_size = params
1573        # stash these for URI generation later
1574        self._needed_shares = k
1575        self._total_shares = n
1576        self._segment_size = segment_size
1577
1578    def _contact_helper(self, res):
1579        now = self._time_contacting_helper_start = time.time()
1580        self._storage_index_elapsed = now - self._started
1581        self.log(format="contacting helper for SI %(si)s..",
1582                 si=si_b2a(self._storage_index), level=log.NOISY)
1583        self._upload_status.set_status("Contacting Helper")
1584        d = self._helper.callRemote("upload_chk", self._storage_index)
1585        d.addCallback(self._contacted_helper)
1586        return d
1587
1588    def _contacted_helper(self, helper_upload_results_and_upload_helper):
1589        (helper_upload_results, upload_helper) = helper_upload_results_and_upload_helper
1590        now = time.time()
1591        elapsed = now - self._time_contacting_helper_start
1592        self._elapsed_time_contacting_helper = elapsed
1593        if upload_helper:
1594            self.log("helper says we need to upload", level=log.NOISY)
1595            self._upload_status.set_status("Uploading Ciphertext")
1596            # we need to upload the file
1597            reu = RemoteEncryptedUploadable(self._encuploadable,
1598                                            self._upload_status)
1599            # let it pre-compute the size for progress purposes
1600            d = reu.get_size()
1601            d.addCallback(lambda ignored:
1602                          upload_helper.callRemote("upload", reu))
1603            # this Deferred will fire with the upload results
1604            return d
1605        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
1606        self._upload_status.set_progress(1, 1.0)
1607        return helper_upload_results
1608
1609    def _convert_old_upload_results(self, upload_results):
1610        # pre-1.3.0 helpers return upload results which contain a mapping
1611        # from shnum to a single human-readable string, containing things
1612        # like "Found on [x],[y],[z]" (for healthy files that were already in
1613        # the grid), "Found on [x]" (for files that needed upload but which
1614        # discovered pre-existing shares), and "Placed on [x]" (for newly
1615        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1616        # set of binary serverid strings.
1617
1618        # the old results are too hard to deal with (they don't even contain
1619        # as much information as the new results, since the nodeids are
1620        # abbreviated), so if we detect old results, just clobber them.
1621
1622        sharemap = upload_results.sharemap
1623        if any(isinstance(v, (bytes, str)) for v in sharemap.values()):
1624            upload_results.sharemap = None
1625
1626    def _build_verifycap(self, helper_upload_results):
1627        self.log("upload finished, building readcap", level=log.OPERATIONAL)
1628        self._convert_old_upload_results(helper_upload_results)
1629        self._upload_status.set_status("Building Readcap")
1630        hur = helper_upload_results
1631        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
1632        assert hur.uri_extension_data["total_shares"] == self._total_shares
1633        assert hur.uri_extension_data["segment_size"] == self._segment_size
1634        assert hur.uri_extension_data["size"] == self._size
1635
1636        # hur.verifycap doesn't exist if already found
1637        v = uri.CHKFileVerifierURI(self._storage_index,
1638                                   uri_extension_hash=hur.uri_extension_hash,
1639                                   needed_shares=self._needed_shares,
1640                                   total_shares=self._total_shares,
1641                                   size=self._size)
1642        timings = {}
1643        timings["storage_index"] = self._storage_index_elapsed
1644        timings["contacting_helper"] = self._elapsed_time_contacting_helper
1645        for key,val in hur.timings.items():
1646            if key == "total":
1647                key = "helper_total"
1648            timings[key] = val
1649        now = time.time()
1650        timings["total"] = now - self._started
1651
1652        # Note: older Helpers (<=1.11) sent tubids as serverids. Newer ones
1653        # send pubkeys. get_stub_server() knows how to map both into
1654        # IDisplayableServer instances.
1655        gss = self._storage_broker.get_stub_server
1656        sharemap = {}
1657        servermap = {}
1658        for shnum, serverids in hur.sharemap.items():
1659            sharemap[shnum] = set([gss(serverid) for serverid in serverids])
1660        # if the file was already in the grid, hur.servermap is an empty dict
1661        for serverid, shnums in hur.servermap.items():
1662            servermap[gss(serverid)] = set(shnums)
1663
1664        ur = UploadResults(file_size=self._size,
1665                           # not if already found
1666                           ciphertext_fetched=hur.ciphertext_fetched,
1667                           preexisting_shares=hur.preexisting_shares,
1668                           pushed_shares=hur.pushed_shares,
1669                           sharemap=sharemap,
1670                           servermap=servermap,
1671                           timings=timings,
1672                           uri_extension_data=hur.uri_extension_data,
1673                           uri_extension_hash=hur.uri_extension_hash,
1674                           verifycapstr=v.to_string())
1675
1676        self._upload_status.set_status("Finished")
1677        self._upload_status.set_results(ur)
1678        return ur
1679
1680    def get_upload_status(self):
1681        return self._upload_status
1682
1683class BaseUploadable(object):
1684    # this is overridden by max_segment_size
1685    default_max_segment_size = DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
1686    default_params_set = False
1687
1688    max_segment_size = None
1689    encoding_param_k = None
1690    encoding_param_happy = None
1691    encoding_param_n = None
1692
1693    _all_encoding_parameters = None
1694    _status = None
1695
1696    def set_upload_status(self, upload_status):
1697        self._status = IUploadStatus(upload_status)
1698
1699    def set_default_encoding_parameters(self, default_params):
1700        assert isinstance(default_params, dict)
1701        for k,v in default_params.items():
1702            precondition(isinstance(k, (bytes, str)), k, v)
1703            precondition(isinstance(v, int), k, v)
1704        if "k" in default_params:
1705            self.default_encoding_param_k = default_params["k"]
1706        if "happy" in default_params:
1707            self.default_encoding_param_happy = default_params["happy"]
1708        if "n" in default_params:
1709            self.default_encoding_param_n = default_params["n"]
1710        if "max_segment_size" in default_params:
1711            self.default_max_segment_size = default_params["max_segment_size"]
1712        self.default_params_set = True
1713
1714    def get_all_encoding_parameters(self):
1715        _assert(self.default_params_set, "set_default_encoding_parameters not called on %r" % (self,))
1716        if self._all_encoding_parameters:
1717            return defer.succeed(self._all_encoding_parameters)
1718
1719        max_segsize = self.max_segment_size or self.default_max_segment_size
1720        k = self.encoding_param_k or self.default_encoding_param_k
1721        happy = self.encoding_param_happy or self.default_encoding_param_happy
1722        n = self.encoding_param_n or self.default_encoding_param_n
1723
1724        d = self.get_size()
1725        def _got_size(file_size):
1726            # for small files, shrink the segment size to avoid wasting space
1727            segsize = min(max_segsize, file_size)
1728            # this must be a multiple of 'required_shares'==k
1729            segsize = mathutil.next_multiple(segsize, k)
1730            encoding_parameters = (k, happy, n, segsize)
1731            self._all_encoding_parameters = encoding_parameters
1732            return encoding_parameters
1733        d.addCallback(_got_size)
1734        return d
1735
1736@implementer(IUploadable)
1737class FileHandle(BaseUploadable):
1738
1739    def __init__(self, filehandle, convergence):
1740        """
1741        Upload the data from the filehandle.  If convergence is None then a
1742        random encryption key will be used, else the plaintext will be hashed,
1743        then the hash will be hashed together with the string in the
1744        "convergence" argument to form the encryption key.
1745        """
1746        assert convergence is None or isinstance(convergence, bytes), (convergence, type(convergence))
1747        self._filehandle = filehandle
1748        self._key = None
1749        self.convergence = convergence
1750        self._size = None
1751
1752    def _get_encryption_key_convergent(self):
1753        if self._key is not None:
1754            return defer.succeed(self._key)
1755
1756        d = self.get_size()
1757        # that sets self._size as a side-effect
1758        d.addCallback(lambda size: self.get_all_encoding_parameters())
1759        def _got(params):
1760            k, happy, n, segsize = params
1761            f = self._filehandle
1762            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1763            f.seek(0)
1764            BLOCKSIZE = 64*1024
1765            bytes_read = 0
1766            while True:
1767                data = f.read(BLOCKSIZE)
1768                if not data:
1769                    break
1770                enckey_hasher.update(data)
1771                # TODO: setting progress in a non-yielding loop is kind of
1772                # pointless, but I'm anticipating (perhaps prematurely) the
1773                # day when we use a slowjob or twisted's CooperatorService to
1774                # make this yield time to other jobs.
1775                bytes_read += len(data)
1776                if self._status:
1777                    self._status.set_progress(0, float(bytes_read)/self._size)
1778            f.seek(0)
1779            self._key = enckey_hasher.digest()
1780            if self._status:
1781                self._status.set_progress(0, 1.0)
1782            assert len(self._key) == 16
1783            return self._key
1784        d.addCallback(_got)
1785        return d
1786
1787    def _get_encryption_key_random(self):
1788        if self._key is None:
1789            self._key = os.urandom(16)
1790        return defer.succeed(self._key)
1791
1792    def get_encryption_key(self):
1793        if self.convergence is not None:
1794            return self._get_encryption_key_convergent()
1795        else:
1796            return self._get_encryption_key_random()
1797
1798    def get_size(self):
1799        if self._size is not None:
1800            return defer.succeed(self._size)
1801        self._filehandle.seek(0, os.SEEK_END)
1802        size = self._filehandle.tell()
1803        self._size = size
1804        self._filehandle.seek(0)
1805        return defer.succeed(size)
1806
1807    def read(self, length):
1808        return defer.succeed([self._filehandle.read(length)])
1809
1810    def close(self):
1811        # the originator of the filehandle reserves the right to close it
1812        pass
1813
1814class FileName(FileHandle):
1815    def __init__(self, filename, convergence):
1816        """
1817        Upload the data from the filename.  If convergence is None then a
1818        random encryption key will be used, else the plaintext will be hashed,
1819        then the hash will be hashed together with the string in the
1820        "convergence" argument to form the encryption key.
1821        """
1822        assert convergence is None or isinstance(convergence, bytes), (convergence, type(convergence))
1823        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
1824    def close(self):
1825        FileHandle.close(self)
1826        self._filehandle.close()
1827
1828class Data(FileHandle):
1829    def __init__(self, data, convergence):
1830        """
1831        Upload the data from the data argument.  If convergence is None then a
1832        random encryption key will be used, else the plaintext will be hashed,
1833        then the hash will be hashed together with the string in the
1834        "convergence" argument to form the encryption key.
1835        """
1836        assert convergence is None or isinstance(convergence, bytes), (convergence, type(convergence))
1837        FileHandle.__init__(self, BytesIO(data), convergence=convergence)
1838
1839@implementer(IUploader)
1840class Uploader(service.MultiService, log.PrefixingLogMixin):
1841    """I am a service that allows file uploading. I am a service-child of the
1842    Client.
1843    """
1844    # The type in Twisted for services is wrong in 22.10...
1845    # https://github.com/twisted/twisted/issues/10135
1846    name = "uploader"  # type: ignore[assignment]
1847    URI_LIT_SIZE_THRESHOLD = 55
1848
1849    def __init__(self, helper_furl=None, stats_provider=None, history=None):
1850        self._helper_furl = helper_furl
1851        self.stats_provider = stats_provider
1852        self._history = history
1853        self._helper = None
1854        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1855        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1856        service.MultiService.__init__(self)
1857
1858    def startService(self):
1859        service.MultiService.startService(self)
1860        if self._helper_furl:
1861            self.parent.tub.connectTo(ensure_str(self._helper_furl),
1862                                      self._got_helper)
1863
1864    def _got_helper(self, helper):
1865        self.log("got helper connection, getting versions")
1866        default = { b"http://allmydata.org/tahoe/protocols/helper/v1" :
1867                    { },
1868                    b"application-version": b"unknown: no get_version()",
1869                    }
1870        d = add_version_to_remote_reference(helper, default)
1871        d.addCallback(self._got_versioned_helper)
1872
1873    def _got_versioned_helper(self, helper):
1874        needed = b"http://allmydata.org/tahoe/protocols/helper/v1"
1875        if needed not in helper.version:
1876            raise InsufficientVersionError(needed, helper.version)
1877        self._helper = helper
1878        helper.notifyOnDisconnect(self._lost_helper)
1879
1880    def _lost_helper(self):
1881        self._helper = None
1882
1883    def get_helper_info(self):
1884        # return a tuple of (helper_furl_or_None, connected_bool)
1885        return (self._helper_furl, bool(self._helper))
1886
1887
1888    def upload(self, uploadable, reactor=None):
1889        """
1890        Returns a Deferred that will fire with the UploadResults instance.
1891        """
1892        assert self.parent
1893        assert self.running
1894
1895        uploadable = IUploadable(uploadable)
1896        d = uploadable.get_size()
1897        def _got_size(size):
1898            default_params = self.parent.get_encoding_parameters()
1899            precondition(isinstance(default_params, dict), default_params)
1900            precondition("max_segment_size" in default_params, default_params)
1901            uploadable.set_default_encoding_parameters(default_params)
1902
1903            if self.stats_provider:
1904                self.stats_provider.count('uploader.files_uploaded', 1)
1905                self.stats_provider.count('uploader.bytes_uploaded', size)
1906
1907            if size <= self.URI_LIT_SIZE_THRESHOLD:
1908                uploader = LiteralUploader()
1909                return uploader.start(uploadable)
1910            else:
1911                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1912                d2 = defer.succeed(None)
1913                storage_broker = self.parent.get_storage_broker()
1914                if self._helper:
1915                    uploader = AssistedUploader(self._helper, storage_broker)
1916                    d2.addCallback(lambda x: eu.get_storage_index())
1917                    d2.addCallback(lambda si: uploader.start(eu, si))
1918                else:
1919                    storage_broker = self.parent.get_storage_broker()
1920                    secret_holder = self.parent._secret_holder
1921                    uploader = CHKUploader(storage_broker, secret_holder, reactor=reactor)
1922                    d2.addCallback(lambda x: uploader.start(eu))
1923
1924                self._all_uploads[uploader] = None
1925                if self._history:
1926                    self._history.add_upload(uploader.get_upload_status())
1927                def turn_verifycap_into_read_cap(uploadresults):
1928                    # Generate the uri from the verifycap plus the key.
1929                    d3 = uploadable.get_encryption_key()
1930                    def put_readcap_into_results(key):
1931                        v = uri.from_string(uploadresults.get_verifycapstr())
1932                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1933                        uploadresults.set_uri(r.to_string())
1934                        return uploadresults
1935                    d3.addCallback(put_readcap_into_results)
1936                    return d3
1937                d2.addCallback(turn_verifycap_into_read_cap)
1938                return d2
1939        d.addCallback(_got_size)
1940        def _done(res):
1941            uploadable.close()
1942            return res
1943        d.addBoth(_done)
1944        return d
Note: See TracBrowser for help on using the repository browser.