1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | from __future__ import annotations |
---|
6 | |
---|
7 | from six import ensure_str |
---|
8 | |
---|
9 | import os, time, weakref, itertools |
---|
10 | |
---|
11 | import attr |
---|
12 | |
---|
13 | from zope.interface import implementer |
---|
14 | from twisted.python import failure |
---|
15 | from twisted.internet import defer |
---|
16 | from twisted.application import service |
---|
17 | from foolscap.api import Referenceable, Copyable, RemoteCopy |
---|
18 | |
---|
19 | from allmydata.crypto import aes |
---|
20 | from allmydata.util.hashutil import file_renewal_secret_hash, \ |
---|
21 | file_cancel_secret_hash, bucket_renewal_secret_hash, \ |
---|
22 | bucket_cancel_secret_hash, plaintext_hasher, \ |
---|
23 | storage_index_hash, plaintext_segment_hasher, convergence_hasher |
---|
24 | from allmydata.util.deferredutil import ( |
---|
25 | timeout_call, |
---|
26 | until, |
---|
27 | ) |
---|
28 | from allmydata import hashtree, uri |
---|
29 | from allmydata.storage.server import si_b2a |
---|
30 | from allmydata.immutable import encode |
---|
31 | from allmydata.util import base32, dictutil, idlib, log, mathutil |
---|
32 | from allmydata.util.happinessutil import servers_of_happiness, \ |
---|
33 | merge_servers, failure_message |
---|
34 | from allmydata.util.assertutil import precondition, _assert |
---|
35 | from allmydata.util.rrefutil import add_version_to_remote_reference |
---|
36 | from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ |
---|
37 | IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \ |
---|
38 | NoServersError, InsufficientVersionError, UploadUnhappinessError, \ |
---|
39 | DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE, IPeerSelector |
---|
40 | from allmydata.immutable import layout |
---|
41 | |
---|
42 | from io import BytesIO |
---|
43 | from .happiness_upload import share_placement, calculate_happiness |
---|
44 | |
---|
45 | from ..util.eliotutil import ( |
---|
46 | log_call_deferred, |
---|
47 | inline_callbacks, |
---|
48 | ) |
---|
49 | |
---|
50 | from eliot import ( |
---|
51 | ActionType, |
---|
52 | MessageType, |
---|
53 | Field, |
---|
54 | ) |
---|
55 | |
---|
56 | _TOTAL_SHARES = Field.for_types( |
---|
57 | u"total_shares", |
---|
58 | [int], |
---|
59 | u"The total number of shares desired.", |
---|
60 | ) |
---|
61 | |
---|
62 | def _serialize_peers(peers): |
---|
63 | return sorted(base32.b2a(p) for p in peers) |
---|
64 | |
---|
65 | _PEERS = Field( |
---|
66 | u"peers", |
---|
67 | _serialize_peers, |
---|
68 | u"The read/write peers being considered.", |
---|
69 | ) |
---|
70 | |
---|
71 | _READONLY_PEERS = Field( |
---|
72 | u"readonly_peers", |
---|
73 | _serialize_peers, |
---|
74 | u"The read-only peers being considered.", |
---|
75 | ) |
---|
76 | |
---|
77 | def _serialize_existing_shares(existing_shares): |
---|
78 | return { |
---|
79 | ensure_str(server): list(shares) |
---|
80 | for (server, shares) |
---|
81 | in existing_shares.items() |
---|
82 | } |
---|
83 | |
---|
84 | _EXISTING_SHARES = Field( |
---|
85 | u"existing_shares", |
---|
86 | _serialize_existing_shares, |
---|
87 | u"The shares that are believed to already have been placed.", |
---|
88 | ) |
---|
89 | |
---|
90 | def _serialize_happiness_mappings(happiness_mappings): |
---|
91 | return { |
---|
92 | str(sharenum): ensure_str(base32.b2a(serverid)) |
---|
93 | for (sharenum, serverid) |
---|
94 | in happiness_mappings.items() |
---|
95 | } |
---|
96 | |
---|
97 | _HAPPINESS_MAPPINGS = Field( |
---|
98 | u"happiness_mappings", |
---|
99 | _serialize_happiness_mappings, |
---|
100 | u"The computed happiness mapping for a particular upload.", |
---|
101 | ) |
---|
102 | |
---|
103 | _HAPPINESS = Field.for_types( |
---|
104 | u"happiness", |
---|
105 | [int], |
---|
106 | u"The computed happiness of a certain placement.", |
---|
107 | ) |
---|
108 | |
---|
109 | _UPLOAD_TRACKERS = Field( |
---|
110 | u"upload_trackers", |
---|
111 | lambda trackers: list( |
---|
112 | dict( |
---|
113 | server=ensure_str(tracker.get_name()), |
---|
114 | shareids=sorted(tracker.buckets.keys()), |
---|
115 | ) |
---|
116 | for tracker |
---|
117 | in trackers |
---|
118 | ), |
---|
119 | u"Some servers which have agreed to hold some shares for us.", |
---|
120 | ) |
---|
121 | |
---|
122 | _ALREADY_SERVERIDS = Field( |
---|
123 | u"already_serverids", |
---|
124 | lambda d: {str(k): v for k, v in d.items()}, |
---|
125 | u"Some servers which are already holding some shares that we were interested in uploading.", |
---|
126 | ) |
---|
127 | |
---|
128 | LOCATE_ALL_SHAREHOLDERS = ActionType( |
---|
129 | u"immutable:upload:locate-all-shareholders", |
---|
130 | [], |
---|
131 | [_UPLOAD_TRACKERS, _ALREADY_SERVERIDS], |
---|
132 | u"Existing shareholders are being identified to plan upload actions.", |
---|
133 | ) |
---|
134 | |
---|
135 | GET_SHARE_PLACEMENTS = MessageType( |
---|
136 | u"immutable:upload:get-share-placements", |
---|
137 | [_TOTAL_SHARES, _PEERS, _READONLY_PEERS, _EXISTING_SHARES, _HAPPINESS_MAPPINGS, _HAPPINESS], |
---|
138 | u"Share placement is being computed for an upload.", |
---|
139 | ) |
---|
140 | |
---|
141 | _EFFECTIVE_HAPPINESS = Field.for_types( |
---|
142 | u"effective_happiness", |
---|
143 | [int], |
---|
144 | u"The computed happiness value of a share placement map.", |
---|
145 | ) |
---|
146 | |
---|
147 | CONVERGED_HAPPINESS = MessageType( |
---|
148 | u"immutable:upload:get-shareholders:converged-happiness", |
---|
149 | [_EFFECTIVE_HAPPINESS], |
---|
150 | u"The share placement algorithm has converged and placements efforts are complete.", |
---|
151 | ) |
---|
152 | |
---|
153 | |
---|
154 | # this wants to live in storage, not here |
---|
155 | class TooFullError(Exception): |
---|
156 | pass |
---|
157 | |
---|
158 | # HelperUploadResults are what we get from the Helper, and to retain |
---|
159 | # backwards compatibility with old Helpers we can't change the format. We |
---|
160 | # convert them into a local UploadResults upon receipt. |
---|
161 | class HelperUploadResults(Copyable, RemoteCopy): |
---|
162 | # note: don't change this string, it needs to match the value used on the |
---|
163 | # helper, and it does *not* need to match the fully-qualified |
---|
164 | # package/module/class name |
---|
165 | # |
---|
166 | # Needs to be native string to make Foolscap happy. |
---|
167 | typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com" |
---|
168 | copytype = typeToCopy |
---|
169 | |
---|
170 | # also, think twice about changing the shape of any existing attribute, |
---|
171 | # because instances of this class are sent from the helper to its client, |
---|
172 | # so changing this may break compatibility. Consider adding new fields |
---|
173 | # instead of modifying existing ones. |
---|
174 | |
---|
175 | def __init__(self): |
---|
176 | self.timings = {} # dict of name to number of seconds |
---|
177 | self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)} |
---|
178 | self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)} |
---|
179 | self.file_size = None |
---|
180 | self.ciphertext_fetched = None # how much the helper fetched |
---|
181 | self.uri = None |
---|
182 | self.preexisting_shares = None # count of shares already present |
---|
183 | self.pushed_shares = None # count of shares we pushed |
---|
184 | |
---|
185 | @implementer(IUploadResults) |
---|
186 | class UploadResults(object): |
---|
187 | |
---|
188 | def __init__(self, file_size, |
---|
189 | ciphertext_fetched, # how much the helper fetched |
---|
190 | preexisting_shares, # count of shares already present |
---|
191 | pushed_shares, # count of shares we pushed |
---|
192 | sharemap, # {shnum: set(server)} |
---|
193 | servermap, # {server: set(shnum)} |
---|
194 | timings, # dict of name to number of seconds |
---|
195 | uri_extension_data, |
---|
196 | uri_extension_hash, |
---|
197 | verifycapstr): |
---|
198 | self._file_size = file_size |
---|
199 | self._ciphertext_fetched = ciphertext_fetched |
---|
200 | self._preexisting_shares = preexisting_shares |
---|
201 | self._pushed_shares = pushed_shares |
---|
202 | self._sharemap = sharemap |
---|
203 | self._servermap = servermap |
---|
204 | self._timings = timings |
---|
205 | self._uri_extension_data = uri_extension_data |
---|
206 | self._uri_extension_hash = uri_extension_hash |
---|
207 | self._verifycapstr = verifycapstr |
---|
208 | |
---|
209 | def set_uri(self, uri): |
---|
210 | self._uri = uri |
---|
211 | |
---|
212 | def get_file_size(self): |
---|
213 | return self._file_size |
---|
214 | def get_uri(self): |
---|
215 | return self._uri |
---|
216 | def get_ciphertext_fetched(self): |
---|
217 | return self._ciphertext_fetched |
---|
218 | def get_preexisting_shares(self): |
---|
219 | return self._preexisting_shares |
---|
220 | def get_pushed_shares(self): |
---|
221 | return self._pushed_shares |
---|
222 | def get_sharemap(self): |
---|
223 | return self._sharemap |
---|
224 | def get_servermap(self): |
---|
225 | return self._servermap |
---|
226 | def get_timings(self): |
---|
227 | return self._timings |
---|
228 | def get_uri_extension_data(self): |
---|
229 | return self._uri_extension_data |
---|
230 | def get_verifycapstr(self): |
---|
231 | return self._verifycapstr |
---|
232 | |
---|
233 | |
---|
234 | def pretty_print_shnum_to_servers(s): |
---|
235 | return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.items() ]) |
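# For example (with two hypothetical node IDs whose abbreviated forms, per
# idlib.shortnodeid_b2a, are "aaaaa" and "bbbbb"):
#   pretty_print_shnum_to_servers({0: [id_a], 1: [id_a, id_b]})
#   => "sh0: aaaaa, sh1: aaaaa+bbbbb"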
---|


class ServerTracker(object):
    def __init__(self, server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret,
                 uri_extension_size):
        self._server = server
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        self.uri_extension_size = uri_extension_size

        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             uri_extension_size)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<ServerTracker for server %r and SI %r>"
                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))

    def get_server(self):
        return self._server
    def get_serverid(self):
        return self._server.get_serverid()
    def get_name(self):
        return self._server.get_name()

    def query(self, sharenums):
        storage_server = self._server.get_storage_server()
        d = storage_server.allocate_buckets(
            self.storage_index,
            self.renew_secret,
            self.cancel_secret,
            sharenums,
            self.allocated_size,
            canary=Referenceable(),
        )
        d.addCallback(self._buckets_allocated)
        return d

    def ask_about_existing_shares(self):
        storage_server = self._server.get_storage_server()
        return storage_server.get_buckets(self.storage_index)

    def _buckets_allocated(self, alreadygot_and_buckets):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        (alreadygot, buckets) = alreadygot_and_buckets
        b = {}
        for sharenum, rref in list(buckets.items()):
            bp = self.wbp_class(rref, self._server, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                self.uri_extension_size)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))


    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        self.abort_some_buckets(list(self.buckets.keys()))

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]


def str_shareloc(shnum, bucketwriter):
    return "%s: %s" % (shnum, ensure_str(bucketwriter.get_servername()),)
---|


@implementer(IPeerSelector)
class PeerSelector(object):

    def __init__(self, num_segments, total_shares, needed_shares, min_happiness):
        self.num_segments = num_segments
        self.total_shares = total_shares
        self.needed_shares = needed_shares
        self.min_happiness = min_happiness

        self.existing_shares = {}
        self.peers = set()
        self.readonly_peers = set()
        self.bad_peers = set()

    def add_peer_with_share(self, peerid, shnum):
        try:
            self.existing_shares[peerid].add(shnum)
        except KeyError:
            self.existing_shares[peerid] = set([shnum])

    def add_peer(self, peerid):
        self.peers.add(peerid)

    def mark_readonly_peer(self, peerid):
        self.readonly_peers.add(peerid)
        self.peers.remove(peerid)

    def mark_bad_peer(self, peerid):
        if peerid in self.peers:
            self.peers.remove(peerid)
            self.bad_peers.add(peerid)
        elif peerid in self.readonly_peers:
            self.readonly_peers.remove(peerid)
            self.bad_peers.add(peerid)

    def get_sharemap_of_preexisting_shares(self):
        preexisting = dictutil.DictOfSets()
        for server, shares in self.existing_shares.items():
            for share in shares:
                preexisting.add(share, server)
        return preexisting

    def get_share_placements(self):
        shares = set(range(self.total_shares))
        self.happiness_mappings = share_placement(self.peers, self.readonly_peers, shares, self.existing_shares)
        self.happiness = calculate_happiness(self.happiness_mappings)
        GET_SHARE_PLACEMENTS.log(
            total_shares=self.total_shares,
            peers=self.peers,
            readonly_peers=self.readonly_peers,
            existing_shares=self.existing_shares,
            happiness_mappings=self.happiness_mappings,
            happiness=self.happiness,
        )
        return self.happiness_mappings

    def add_peers(self, peerids=None):
        raise NotImplementedError

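# A minimal sketch of how a PeerSelector is driven (hypothetical serverids;
# the real driver is Tahoe2ServerSelector below):
#
#     selector = PeerSelector(num_segments=1, total_shares=3,
#                             needed_shares=2, min_happiness=2)
#     selector.add_peer(serverid_a)
#     selector.add_peer(serverid_b)
#     selector.add_peer_with_share(serverid_b, 0)   # sh0 already lives on b
#     placements = selector.get_share_placements()  # {shnum: serverid}
#     happy = selector.happiness    # computed as a side-effect above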
---|

class _QueryStatistics(object):

    def __init__(self):
        self.total = 0
        self.good = 0
        self.bad = 0
        self.full = 0
        self.error = 0
        self.contacted = 0

    def __str__(self):
        return "QueryStatistics(total={} good={} bad={} full={} " \
               "error={} contacted={})".format(
                   self.total,
                   self.good,
                   self.bad,
                   self.full,
                   self.error,
                   self.contacted,
               )


class Tahoe2ServerSelector(log.PrefixingLogMixin):

    def __init__(self, upload_id, logparent=None, upload_status=None, reactor=None):
        self.upload_id = upload_id
        self._query_stats = _QueryStatistics()
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)
        if reactor is None:
            from twisted.internet import reactor
        self._reactor = reactor

    def __repr__(self):
        return "<Tahoe2ServerSelector for upload %r>" % self.upload_id

    def _create_trackers(self, candidate_servers, allocated_size,
                         file_renewal_secret, file_cancel_secret, create_server_tracker):

        # filter the list of servers according to which ones can accommodate
        # this request. This excludes older servers (which used a 4-byte size
        # field, capping each share at 2**32 bytes) from getting large shares
        # (with the default k=3 encoding, that means files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(server):
            v0 = server.get_version()
            v1 = v0[b"http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1[b"maximum-immutable-share-size"]

        for server in candidate_servers:
            self.peer_selector.add_peer(server.get_serverid())
        writeable_servers = [
            server for server in candidate_servers
            if _get_maxsize(server) >= allocated_size
        ]
        readonly_servers = set(candidate_servers) - set(writeable_servers)

        for server in readonly_servers:
            self.peer_selector.mark_readonly_peer(server.get_serverid())

        def _make_trackers(servers):
            trackers = []
            for s in servers:
                seed = s.get_lease_seed()
                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
                st = create_server_tracker(s, renew, cancel)
                trackers.append(st)
            return trackers

        write_trackers = _make_trackers(writeable_servers)

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        return readonly_trackers, write_trackers

    @inline_callbacks
    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         min_happiness, uri_extension_size):
        """
        @return: (upload_trackers, already_serverids), where upload_trackers
                 is a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_serverids is a dict mapping
                 shnum to a set of serverids for servers which claim to
                 already have the share.
        """

        # re-initialize statistics
        self._query_stats = _QueryStatistics()

        if self._status:
            self._status.set_status("Contacting Servers..")

        self.peer_selector = PeerSelector(num_segments, total_shares,
                                          needed_shares, min_happiness)

        self.total_shares = total_shares
        self.min_happiness = min_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = set(range(total_shares))
        self.use_trackers = set() # ServerTrackers that have shares assigned
                                  # to them
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.serverids_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
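        # (For example, total_shares=10 pads the tree out to 16 leaves, so
        # validating one leaf takes that leaf plus one sibling/uncle hash per
        # level: 5 hashes in all, assuming the usual Merkle layout used by
        # hashtree.IncompleteHashTree.)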
---|

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, None,
                                             share_size, 0, num_segments,
                                             num_share_hashes,
                                             uri_extension_size)
        allocated_size = wbp.get_allocated_size()

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        file_renewal_secret = file_renewal_secret_hash(
            secret_holder.get_renewal_secret(),
            storage_index,
        )
        file_cancel_secret = file_cancel_secret_hash(
            secret_holder.get_cancel_secret(),
            storage_index,
        )
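        # (These file-level secrets are specialized per server inside
        # _create_trackers: each server's lease seed is folded in via
        # bucket_renewal_secret_hash(file_renewal_secret, seed), and
        # likewise for the cancel secret.)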
---|

        # see docs/specifications/servers-of-happiness.rst
        # 0. Start with an ordered list of servers. Maybe *2N* of them.
        #

        all_servers = storage_broker.get_servers_for_psi(storage_index, for_upload=True)
        if not all_servers:
            raise NoServersError("client gave us zero servers")

        def _create_server_tracker(server, renew, cancel):
            return ServerTracker(
                server, share_size, block_size, num_segments, num_share_hashes,
                storage_index, renew, cancel, uri_extension_size
            )

        readonly_trackers, write_trackers = self._create_trackers(
            all_servers[:(2 * total_shares)],
            allocated_size,
            file_renewal_secret,
            file_cancel_secret,
            _create_server_tracker,
        )

        # see docs/specifications/servers-of-happiness.rst
        # 1. Query all servers for existing shares.
        #
        # The spec doesn't say what to do for timeouts/errors. This
        # adds a timeout to each request, and rejects any that reply
        # with an error (they are simply removed from the list).

        ds = []
        if self._status and readonly_trackers:
            self._status.set_status(
                "Contacting readonly servers to find any existing shares"
            )

        # in the "pre servers-of-happiness" code, it was a little
        # ambiguous whether "merely asking" counted as a "query" or
        # not, because "allocate_buckets" with nothing to allocate was
        # used to "ask" a write-able server what it held. Now we count
        # "actual allocation queries" only, because those are the only
        # things that actually affect what the server does.

        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = timeout_call(self._reactor, tracker.ask_about_existing_shares(), 15)
            d.addBoth(self._handle_existing_response, tracker)
            ds.append(d)
            self.log("asking server %r for any existing shares" %
                     (tracker.get_name(),), level=log.NOISY)

        for tracker in write_trackers:
            assert isinstance(tracker, ServerTracker)
            d = timeout_call(self._reactor, tracker.ask_about_existing_shares(), 15)

            def timed_out(f, tracker):
                # print("TIMEOUT {}: {}".format(tracker, f))
                write_trackers.remove(tracker)
                readonly_trackers.append(tracker)
                return f
            d.addErrback(timed_out, tracker)
            d.addBoth(self._handle_existing_write_response, tracker, set())
            ds.append(d)
            self.log("asking server %r for any existing shares" %
                     (tracker.get_name(),), level=log.NOISY)

        trackers = set(write_trackers) | set(readonly_trackers)

        # these will always be (True, None) because errors are handled
        # in the _handle_existing_write_response etc callbacks
        yield defer.DeferredList(ds)

        # okay, we've queried the 2N servers, time to get the share
        # placements and attempt to actually place the shares (or
        # renew them on read-only servers). We want to run the loop
        # below *at least once* because even read-only servers won't
        # renew their shares until "allocate_buckets" is called (via
        # tracker.query())

        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/778#comment:48
        # min_happiness will be 0 for the repairer, so we set current
        # effective_happiness to less than zero so this loop runs at
        # least once for the repairer...

        def _bad_server(fail, tracker):
            self.last_failure_msg = fail
            return False  # will mark it readonly

        def _make_readonly(tracker):
            # print("making {} read-only".format(tracker.get_serverid()))
            try:
                write_trackers.remove(tracker)
            except ValueError:
                pass
            # XXX can we just use a set() or does order matter?
            if tracker not in readonly_trackers:
                readonly_trackers.append(tracker)
            return None

        # so we *always* want to run this loop at least once, even if
        # we only have read-only servers -- because asking them to
        # allocate buckets renews those shares they already have. For
        # subsequent loops, we give up if we've achieved happiness OR
        # if we have zero writable servers left

        last_happiness = None
        effective_happiness = -1
        while effective_happiness < min_happiness and \
                (last_happiness is None or len(write_trackers)):
            errors_before = self._query_stats.bad
            self._share_placements = self.peer_selector.get_share_placements()

            placements = []
            for tracker in trackers:
                shares_to_ask = self._allocation_for(tracker)

                # if we already tried to upload share X to this very
                # same server in a previous iteration, we should *not*
                # ask again. If we *do* ask, there's no real harm, but
                # the server will respond with an empty dict and that
                # confuses our statistics. However, if the server is a
                # readonly server, we *do* want to ask so it refreshes
                # the share.
                if shares_to_ask != set(tracker.buckets.keys()) or tracker in readonly_trackers:
                    self._query_stats.total += 1
                    self._query_stats.contacted += 1
                    d = timeout_call(self._reactor, tracker.query(shares_to_ask), 15)
                    d.addBoth(self._buckets_allocated, tracker, shares_to_ask)
                    d.addErrback(lambda f, tr: _bad_server(f, tr), tracker)
                    d.addCallback(lambda x, tr: _make_readonly(tr) if not x else x, tracker)
                    placements.append(d)

            yield defer.DeferredList(placements)
            merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness == last_happiness:
                # print("effective happiness still {}".format(last_happiness))
                # we haven't improved over the last iteration; give up
                break
            if errors_before == self._query_stats.bad:
                break
            last_happiness = effective_happiness
            # print("write trackers left: {}".format(len(write_trackers)))

        # note: peer_selector.get_allocations() only maps "things we
        # uploaded in the above loop" and specifically does *not*
        # include any pre-existing shares on read-only servers .. but
        # we *do* want to count those shares towards total happiness.

        # no more servers. If we haven't placed enough shares, we fail.
        # XXX note sometimes we're not running the loop at least once,
        # and so 'merged' must be (re-)computed here.
        merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
        effective_happiness = servers_of_happiness(merged)

        # print("placements completed {} vs {}".format(effective_happiness, min_happiness))
        # for k, v in merged.items():
        #     print("  {} -> {}".format(k, v))

        CONVERGED_HAPPINESS.log(
            effective_happiness=effective_happiness,
        )

        if effective_happiness < min_happiness:
            msg = failure_message(
                peer_count=len(self.serverids_with_shares),
                k=self.needed_shares,
                happy=min_happiness,
                effective_happy=effective_happiness,
            )
            msg = ("server selection failed for %s: %s (%s), merged=%s" %
                   (self, msg, self._get_progress_message(),
                    pretty_print_shnum_to_servers(merged)))
            if self.last_failure_msg:
                msg += " (%s)" % (self.last_failure_msg,)
            self.log(msg, level=log.UNUSUAL)
            self._failed(msg)  # raises UploadUnhappinessError
            return

        # we placed (or already had) enough to be happy, so we're done
        if self._status:
            self._status.set_status("Placed all shares")
        msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
               "self.use_trackers: %s, self.preexisting_shares: %s") \
               % (self, self._get_progress_message(),
                  pretty_print_shnum_to_servers(merged),
                  [', '.join([str_shareloc(k, v)
                              for k, v in st.buckets.items()])
                   for st in self.use_trackers],
                  pretty_print_shnum_to_servers(self.preexisting_shares))
        self.log(msg, level=log.OPERATIONAL)
        defer.returnValue((self.use_trackers, self.peer_selector.get_sharemap_of_preexisting_shares()))

---|
    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the queries sent by
        Tahoe2ServerSelector.get_shareholders.
        """
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                     % (tracker.get_name(), res), level=log.UNUSUAL)
            self.peer_selector.mark_bad_peer(serverid)
        else:
            buckets = res
            if buckets:
                self.serverids_with_shares.add(serverid)
            self.log("response to get_buckets() from server %r: alreadygot=%s"
                     % (tracker.get_name(), tuple(sorted(buckets))),
                     level=log.NOISY)
            for bucket in buckets:
                self.peer_selector.add_peer_with_share(serverid, bucket)
                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                self.homeless_shares.discard(bucket)

    def _handle_existing_write_response(self, res, tracker, shares_to_ask):
        """
        I handle the response from a writeable server to our inquiry about
        which shares it already holds.
        """
        if isinstance(res, failure.Failure):
            self.peer_selector.mark_bad_peer(tracker.get_serverid())
            self.log("%s got error during server selection: %s" % (tracker, res),
                     level=log.UNUSUAL)
            self.homeless_shares |= shares_to_ask
            msg = ("last failure (from %s) was: %s" % (tracker, res))
            self.last_failure_msg = msg
        else:
            for share in res.keys():
                self.peer_selector.add_peer_with_share(tracker.get_serverid(), share)

    def _get_progress_message(self):
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        assert self._query_stats.bad == (self._query_stats.full + self._query_stats.error)
        return (
            msg + "want to place shares on at least {happy} servers such that "
            "any {needed} of them have enough shares to recover the file, "
            "sent {queries} queries to {servers} servers, "
            "{good} queries placed some shares, {bad} placed none "
            "(of which {full} placed none due to the server being"
            " full and {error} placed none due to an error)".format(
                happy=self.min_happiness,
                needed=self.needed_shares,
                queries=self._query_stats.total,
                servers=self._query_stats.contacted,
                good=self._query_stats.good,
                bad=self._query_stats.bad,
                full=self._query_stats.full,
                error=self._query_stats.error
            )
        )

    def _allocation_for(self, tracker):
        """
        Given a ServerTracker, return the set of shares that we should
        store on that server.
        """
        assert isinstance(tracker, ServerTracker)

        shares_to_ask = set()
        servermap = self._share_placements
        for shnum, tracker_id in list(servermap.items()):
            if tracker_id is None:
                continue
            if tracker.get_serverid() == tracker_id:
                shares_to_ask.add(shnum)
                if shnum in self.homeless_shares:
                    self.homeless_shares.remove(shnum)

        if self._status:
            self._status.set_status("Contacting Servers [%r] (first query),"
                                    " %d shares left.."
                                    % (tracker.get_name(),
                                       len(self.homeless_shares)))
        return shares_to_ask

    def _buckets_allocated(self, res, tracker, shares_to_ask):
        """
        Internal helper. If this returns an error or False, the server
        will be considered read-only for any future iterations.
        """
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during server selection: %s" % (tracker, res),
                     level=log.UNUSUAL)
            self._query_stats.error += 1
            self._query_stats.bad += 1
            self.homeless_shares |= shares_to_ask
            try:
                self.peer_selector.mark_readonly_peer(tracker.get_serverid())
            except KeyError:
                pass
            return res

        else:
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %r: alreadygot=%s, allocated=%s"
                     % (tracker.get_name(),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                     level=log.NOISY)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_trackers.add(tracker)
                progress = True

            if allocated or alreadygot:
                self.serverids_with_shares.add(tracker.get_serverid())

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other servers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since the server was unable to accept all of our requests,
                # it is safe to assume that asking it again won't help.

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self._query_stats.good += 1
            else:
                # if we asked for some allocations, but the server
                # didn't return any at all (i.e. empty dict) it must
                # be full
                self._query_stats.full += 1
                self._query_stats.bad += 1
            return progress

    def _failed(self, msg):
        """
        I am called when server selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for tracker in self.use_trackers:
            assert isinstance(tracker, ServerTracker)
            tracker.abort()
        raise UploadUnhappinessError(msg)
---|


@attr.s
class _Accum(object):
    """
    Accumulate up to some known amount of ciphertext.

    :ivar remaining: The number of bytes still expected.
    :ivar ciphertext: The bytes accumulated so far.
    """
    remaining : int = attr.ib(validator=attr.validators.instance_of(int))
    ciphertext : list[bytes] = attr.ib(default=attr.Factory(list))

    def extend(self,
               size,        # type: int
               ciphertext,  # type: list[bytes]
    ):
        """
        Accumulate some more ciphertext.

        :param size: The amount of data the new ciphertext represents towards
            the goal. This may be more than the actual size of the given
            ciphertext if the source has run out of data.

        :param ciphertext: The new ciphertext to accumulate.
        """
        self.remaining -= size
        self.ciphertext.extend(ciphertext)

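# For example (arbitrary values):
#
#     acc = _Accum(remaining=10)
#     acc.extend(4, [b"abcd"])
#     # => acc.remaining == 6, acc.ciphertext == [b"abcd"]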
---|

@implementer(IEncryptedUploadable)
class EncryptAnUploadable(object):
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None, chunk_size=None):
        """
        :param chunk_size: The number of bytes to read from the uploadable at a
            time, or None for some default.
        """
        precondition(original.default_params_set,
                     "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None
        if chunk_size is not None:
            self.CHUNKSIZE = chunk_size

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            self._encryptor = aes.create_encryptor(key)

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, bytes)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return self._encryptor
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())

        accum = _Accum(length)

        def action():
            """
            Read some bytes into the accumulator.
            """
            return self._read_encrypted(accum, hash_only)

        def condition():
            """
            Check to see if the accumulator has all the data.
            """
            return accum.remaining == 0

        d.addCallback(lambda ignored: until(action, condition))
        d.addCallback(lambda ignored: accum.ciphertext)
        return d
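    # until(action, condition) -- from allmydata.util.deferredutil, imported
    # above -- repeatedly invokes action() (waiting on the Deferred it
    # returns) until condition() is true, so read_encrypted() refills the
    # accumulator one CHUNKSIZE-bounded read at a time until `remaining`
    # reaches zero.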
---|

    def _read_encrypted(self,
                        ciphertext_accum,  # type: _Accum
                        hash_only,         # type: bool
    ):
        # type: (...) -> defer.Deferred
        """
        Read the next chunk of plaintext, encrypt it, and extend the accumulator
        with the resulting ciphertext.
        """
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(ciphertext_accum.remaining, self.CHUNKSIZE)

        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            # Intentionally tell the accumulator about the expected size, not
            # the actual size. If we run out of data we still want remaining
            # to drop otherwise it will never reach 0 and the loop will never
            # end.
            ciphertext_accum.extend(size, ct)
        d.addCallback(_good)
        return d

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because the AES-CTR implementation doesn't offer a
            # way to change the counter value. Once it acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) encrypt_data
            ciphertext = aes.encrypt_data(self._encryptor, chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata
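    # Note that when hash_only is True, cryptdata stays empty: no ciphertext
    # is returned for the skipped span, but _Accum.remaining still drops by
    # the full expected size (see _good() above), which is what lets a
    # resumed upload skip over plaintext it has already handled.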
---|
1134 | |
---|
1135 | |
---|
1136 | def get_plaintext_hashtree_leaves(self, first, last, num_segments): |
---|
1137 | # this is currently unused, but will live again when we fix #453 |
---|
1138 | if len(self._plaintext_segment_hashes) < num_segments: |
---|
1139 | # close out the last one |
---|
1140 | assert len(self._plaintext_segment_hashes) == num_segments-1 |
---|
1141 | p, segment_left = self._get_segment_hasher() |
---|
1142 | self._plaintext_segment_hashes.append(p.digest()) |
---|
1143 | del self._plaintext_segment_hasher |
---|
1144 | self.log("closing plaintext leaf hasher, hashed %d bytes" % |
---|
1145 | self._plaintext_segment_hashed_bytes, |
---|
1146 | level=log.NOISY) |
---|
1147 | self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s", |
---|
1148 | segnum=len(self._plaintext_segment_hashes)-1, |
---|
1149 | hash=base32.b2a(p.digest()), |
---|
1150 | level=log.NOISY) |
---|
1151 | assert len(self._plaintext_segment_hashes) == num_segments |
---|
1152 | return defer.succeed(tuple(self._plaintext_segment_hashes[first:last])) |
---|
1153 | |
---|
1154 | def get_plaintext_hash(self): |
---|
1155 | h = self._plaintext_hasher.digest() |
---|
1156 | return defer.succeed(h) |
---|
1157 | |
---|
1158 | def close(self): |
---|
1159 | return self.original.close() |
---|
1160 | |
---|
1161 | @implementer(IUploadStatus) |
---|
1162 | class UploadStatus(object): |
---|
1163 | statusid_counter = itertools.count(0) |
---|
1164 | |
---|
1165 | def __init__(self): |
---|
1166 | self.storage_index = None |
---|
1167 | self.size = None |
---|
1168 | self.helper = False |
---|
1169 | self.status = "Not started" |
---|
1170 | self.progress = [0.0, 0.0, 0.0] |
---|
1171 | self.active = True |
---|
1172 | self.results = None |
---|
1173 | self.counter = next(self.statusid_counter) |
---|
1174 | self.started = time.time() |
---|
1175 | |
---|
1176 | def get_started(self): |
---|
1177 | return self.started |
---|
1178 | def get_storage_index(self): |
---|
1179 | return self.storage_index |
---|
1180 | def get_size(self): |
---|
1181 | return self.size |
---|
1182 | def using_helper(self): |
---|
1183 | return self.helper |
---|
1184 | def get_status(self): |
---|
1185 | return self.status |
---|
1186 | def get_progress(self): |
---|
1187 | return tuple(self.progress) |
---|
1188 | def get_active(self): |
---|
1189 | return self.active |
---|
1190 | def get_results(self): |
---|
1191 | return self.results |
---|
1192 | def get_counter(self): |
---|
1193 | return self.counter |
---|
1194 | |
---|
1195 | def set_storage_index(self, si): |
---|
1196 | self.storage_index = si |
---|
1197 | def set_size(self, size): |
---|
1198 | self.size = size |
---|
1199 | def set_helper(self, helper): |
---|
1200 | self.helper = helper |
---|
1201 | def set_status(self, status): |
---|
1202 | self.status = status |
---|
1203 | def set_progress(self, which, value): |
---|
1204 | # [0]: chk, [1]: ciphertext, [2]: encode+push |
---|
1205 | self.progress[which] = value |
---|
1206 | def set_active(self, value): |
---|
1207 | self.active = value |
---|
1208 | def set_results(self, value): |
---|
1209 | self.results = value |
---|
1210 | |
---|
1211 | class CHKUploader(object): |
---|
1212 | |
---|
    def __init__(self, storage_broker, secret_holder, reactor=None):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._reactor = reactor

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    @log_call_deferred(action_type=u"immutable:upload:chk:start")
    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        I will tell the shareholders to delete their partial shares, and I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # abort() was called before start(); there is nothing to clean up
            return defer.succeed(None)
        return self._encoder.abort()

    @log_call_deferred(action_type=u"immutable:upload:chk:start-encrypted")
    @inline_callbacks
    def start_encrypted(self, encrypted):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        # it would be nice to make the Encoder just a local variable; only
        # abort() actually needs self._encoder ...
        self._encoder = encode.Encoder(
            self._log_number,
            self._upload_status,
        )
        # set_encrypted_uploadable() just returns the encoder itself
        yield self._encoder.set_encrypted_uploadable(eu)
        with LOCATE_ALL_SHAREHOLDERS() as action:
            (upload_trackers, already_serverids) = yield self.locate_all_shareholders(self._encoder, started)
            action.add_success_fields(upload_trackers=upload_trackers,
                                      already_serverids=already_serverids)
        self.set_shareholders(upload_trackers, already_serverids, self._encoder)
        verifycap = yield self._encoder.start()
        results = self._encrypted_done(verifycap)
        defer.returnValue(results)

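    # A sketch of the flow above (illustrative only; the driver lines are
    # hypothetical and not executed here):
    #
    #     eu = EncryptAnUploadable(FileName("example.dat", convergence=None))
    #     uploader = CHKUploader(storage_broker, secret_holder)
    #     d = uploader.start(eu)   # fires with UploadResults
    #
    # start_encrypted() builds an Encoder, picks servers with
    # locate_all_shareholders(), wires the agreed-upon buckets into the
    # encoder via set_shareholders(), and then lets the encoder push blocks.
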
    def locate_all_shareholders(self, encoder, started):
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %r" % upload_id)
        server_selector = Tahoe2ServerSelector(
            upload_id,
            self._log_number,
            self._upload_status,
            reactor=self._reactor,
        )

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k, desired, n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             storage_index,
                                             share_size, block_size,
                                             num_segments, n, k, desired,
                                             encoder.get_uri_extension_size())
        def _done(res):
            self._server_selection_elapsed = time.time() - server_selection_started
            return res
        d.addCallback(_done)
        return d

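    # Note on share_counts: with the stock 3-of-10 encoding the tuple is
    # (k=3, desired=7, n=10): any k shares reconstruct the file, n shares
    # are produced, and get_shareholders() only succeeds if the placement
    # reaches a servers-of-happiness value of at least `desired`.
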
    def set_shareholders(self, upload_trackers, already_serverids, encoder):
        """
        :param upload_trackers: a sequence of ServerTracker objects that
                                have agreed to hold some shares for us (the
                                shareids are stashed inside the ServerTracker)

        :param already_serverids: a dict mapping sharenum to a set of
                                  serverids for servers that claim to already
                                  have this share
        """
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k, v)
                              for k, v in st.buckets.items()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._count_preexisting_shares = len(already_serverids)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        buckets = {}
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
            "%s (%s) != %s (%s)" % (
                len(buckets),
                buckets,
                sum([len(tracker.buckets) for tracker in upload_trackers]),
                [(t.buckets, t.get_serverid()) for t in upload_trackers]
            )
        encoder.set_shareholders(buckets, servermap)

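    # Shape sketch (values illustrative): after the loop above,
    #     buckets   = {shnum: RIBucketWriter, ...}
    #     servermap = {shnum: set(serverids), ...}
    # where one shnum can map to several serverids if a pre-existing copy
    # was found on one server while a fresh copy goes to another.
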
    def _encrypted_done(self, verifycap):
        """
        :return UploadResults: A description of the outcome of the upload.
        """
        e = self._encoder
        sharemap = dictutil.DictOfSets()
        servermap = dictutil.DictOfSets()
        for shnum in e.get_shares_placed():
            server = self._server_trackers[shnum].get_server()
            sharemap.add(shnum, server)
            servermap.add(server, shnum)
        now = time.time()
        timings = {}
        timings["total"] = now - self._started
        timings["storage_index"] = self._storage_index_elapsed
        timings["peer_selection"] = self._server_selection_elapsed
        timings.update(e.get_times())
        ur = UploadResults(file_size=e.file_size,
                           ciphertext_fetched=0,
                           preexisting_shares=self._count_preexisting_shares,
                           pushed_shares=len(e.get_shares_placed()),
                           sharemap=sharemap,
                           servermap=servermap,
                           timings=timings,
                           uri_extension_data=e.get_uri_extension_data(),
                           uri_extension_hash=e.get_uri_extension_hash(),
                           verifycapstr=verifycap.to_string())
        self._upload_status.set_results(ur)
        return ur

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=None):
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        # nothing more to read; hand back whatever has accumulated so far
        return defer.succeed(prepend_data)
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes_read = sum([len(piece) for piece in data])
        assert bytes_read > 0
        assert bytes_read <= size
        remaining = size - bytes_read
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

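# Usage sketch for read_this_many_bytes() (illustrative; `some_uploadable`
# stands in for any IUploadable): read() may return fewer bytes than
# requested, so the helper recurses until `size` bytes have accumulated:
#
#     d = read_this_many_bytes(some_uploadable, 1024)
#     d.addCallback(lambda pieces: b"".join(pieces))  # exactly 1024 bytes
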
class LiteralUploader(object):

    def __init__(self):
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI(b"".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, lit_uri):
        # a literal file never touches storage servers, so most of the
        # result fields stay empty
        ur = UploadResults(file_size=self._size,
                           ciphertext_fetched=0,
                           preexisting_shares=0,
                           pushed_shares=0,
                           sharemap={},
                           servermap={},
                           timings={},
                           uri_extension_data=None,
                           uri_extension_hash=None,
                           verifycapstr=None)
        ur.set_uri(lit_uri)
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        self._status.set_results(ur)
        return ur

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

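# A LIT cap embeds the file body directly in the URI as base32, which is
# why LiteralUploader pushes no shares and contacts no servers. A sketch,
# easy to verify interactively:
#
#     uri.LiteralFileURI(b"hello").to_string()  # -> b'URI:LIT:nbswy3dp'
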
@implementer(RIEncryptedUploadable)
class RemoteEncryptedUploadable(Referenceable): # type: ignore # warner/foolscap#78

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()

    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seeking backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the skipped data from disk anyway, to keep building up
            # the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()


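# Skip-forward sketch for remote_read_encrypted() (offsets illustrative):
# if the helper already holds ciphertext bytes 0..999, it asks for
# (offset=1000, length=100); the reader first consumes bytes 0..999 with
# hash_only=True, so the hash trees stay complete, and only the 100
# requested bytes actually cross the wire.
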
class AssistedUploader(object):

    def __init__(self, helper, storage_broker):
        self._helper = helper
        self._storage_broker = storage_broker
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, bytes), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, helper_upload_results_and_upload_helper):
        (helper_upload_results, upload_helper) = helper_upload_results_and_upload_helper
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload", level=log.NOISY)
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        return helper_upload_results

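    # Helper protocol sketch: the "upload_chk" call above returns a tuple
    # (HelperUploadResults, upload_helper_or_None). A None upload_helper
    # means the grid already holds every share and no ciphertext moves;
    # otherwise we hand the helper a RemoteEncryptedUploadable and it pulls
    # our ciphertext via remote_read_encrypted() until the upload is done.
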
    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already
        # in the grid), "Found on [x]" (for files that needed upload but
        # which discovered pre-existing shares), and "Placed on [x]" (for
        # newly uploaded shares). The 1.3.0 helper returns a mapping from
        # shnum to a set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if any(isinstance(v, (bytes, str)) for v in sharemap.values()):
            upload_results.sharemap = None

    def _build_verifycap(self, helper_upload_results):
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(helper_upload_results)
        self._upload_status.set_status("Building Readcap")
        hur = helper_upload_results
        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
        assert hur.uri_extension_data["total_shares"] == self._total_shares
        assert hur.uri_extension_data["segment_size"] == self._segment_size
        assert hur.uri_extension_data["size"] == self._size

        # hur.verifycap doesn't exist if the file was already in the grid,
        # so rebuild the verifier cap from its components
        v = uri.CHKFileVerifierURI(self._storage_index,
                                   uri_extension_hash=hur.uri_extension_hash,
                                   needed_shares=self._needed_shares,
                                   total_shares=self._total_shares,
                                   size=self._size)
        timings = {}
        timings["storage_index"] = self._storage_index_elapsed
        timings["contacting_helper"] = self._elapsed_time_contacting_helper
        for key, val in hur.timings.items():
            if key == "total":
                key = "helper_total"
            timings[key] = val
        now = time.time()
        timings["total"] = now - self._started

        # Note: older Helpers (<=1.11) sent tubids as serverids. Newer ones
        # send pubkeys. get_stub_server() knows how to map both into
        # IDisplayableServer instances.
        gss = self._storage_broker.get_stub_server
        sharemap = {}
        servermap = {}
        # hur.sharemap may have been clobbered to None by
        # _convert_old_upload_results (results from a pre-1.3.0 helper)
        if hur.sharemap:
            for shnum, serverids in hur.sharemap.items():
                sharemap[shnum] = set([gss(serverid) for serverid in serverids])
        # if the file was already in the grid, hur.servermap is an empty dict
        for serverid, shnums in hur.servermap.items():
            servermap[gss(serverid)] = set(shnums)

        ur = UploadResults(file_size=self._size,
                           # 0 if the file was already in the grid
                           ciphertext_fetched=hur.ciphertext_fetched,
                           preexisting_shares=hur.preexisting_shares,
                           pushed_shares=hur.pushed_shares,
                           sharemap=sharemap,
                           servermap=servermap,
                           timings=timings,
                           uri_extension_data=hur.uri_extension_data,
                           uri_extension_hash=hur.uri_extension_hash,
                           verifycapstr=v.to_string())

        self._upload_status.set_status("Finished")
        self._upload_status.set_results(ur)
        return ur

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable(object):
    # used unless the uploadable overrides max_segment_size below
    default_max_segment_size = DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
    default_params_set = False

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k, v in default_params.items():
            precondition(isinstance(k, (bytes, str)), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]
        self.default_params_set = True

    def get_all_encoding_parameters(self):
        _assert(self.default_params_set, "set_default_encoding_parameters not called on %r" % (self,))
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

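# Worked example of the segment-size computation above (a sketch; this
# helper is illustrative and not part of the upload API): a 10000-byte
# file with k=3 shrinks the segment from the configured maximum down to
# the file size, then rounds up to a multiple of k.
def _example_segment_size(file_size=10000, k=3,
                          max_segsize=DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE):
    segsize = min(max_segsize, file_size)       # 10000
    return mathutil.next_multiple(segsize, k)   # 10002
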
@implementer(IUploadable)
class FileHandle(BaseUploadable):

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle. If convergence is None then a
        random encryption key will be used, else the plaintext will be
        hashed, then the hash will be hashed together with the string in
        the "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, bytes), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService
                # to make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0, os.SEEK_END)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

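# Convergence note: two uploads of identical plaintext with the same
# convergence secret derive the same 16-byte AES key, hence the same
# ciphertext, storage index, and shares, so the second upload can reuse
# shares already in the grid. Different secrets (or convergence=None)
# yield unrelated keys and therefore unrelated ciphertexts.
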
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename. If convergence is None then a
        random encryption key will be used, else the plaintext will be
        hashed, then the hash will be hashed together with the string in
        the "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, bytes), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    def close(self):
        FileHandle.close(self)
        self._filehandle.close()


class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then
        a random encryption key will be used, else the plaintext will be
        hashed, then the hash will be hashed together with the string in
        the "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, bytes), (convergence, type(convergence))
        FileHandle.__init__(self, BytesIO(data), convergence=convergence)

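# Usage sketch (illustrative; the path is hypothetical): all three wrappers
# provide IUploadable and differ only in where the plaintext comes from:
#
#     up1 = Data(b"some bytes", convergence=None)             # random key
#     up2 = FileName("example.dat", convergence=b"secret")
#     up3 = FileHandle(open("example.dat", "rb"), convergence=b"secret")
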
@implementer(IUploader)
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of
    the Client.
    """
    # The type in Twisted for services is wrong in 22.10...
    # https://github.com/twisted/twisted/issues/10135
    name = "uploader" # type: ignore[assignment]
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None, history=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._history = history
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(ensure_str(self._helper_furl),
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = {
            b"http://allmydata.org/tahoe/protocols/helper/v1": {},
            b"application-version": b"unknown: no get_version()",
        }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = b"http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))

    def upload(self, uploadable, reactor=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                storage_broker = self.parent.get_storage_broker()
                if self._helper:
                    uploader = AssistedUploader(self._helper, storage_broker)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder,
                                           reactor=reactor)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if self._history:
                    self._history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.get_verifycapstr())
                        r = uri.CHKFileURI(key, v.uri_extension_hash,
                                           v.needed_shares, v.total_shares,
                                           v.size)
                        uploadresults.set_uri(r.to_string())
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d

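# End-to-end sketch (illustrative; `client` stands for a running Client
# with this Uploader attached as a service-child):
#
#     uploader = client.getServiceNamed("uploader")
#     d = uploader.upload(Data(b"x" * 1000, convergence=None))
#     d.addCallback(lambda results: results.get_uri())  # the CHK readcap
#
# Bodies of URI_LIT_SIZE_THRESHOLD (55) bytes or fewer become LIT caps with
# no network traffic; larger files go through CHKUploader, or through
# AssistedUploader when a helper connection is available.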