""" Ported to Python 3. """ import os, stat, time, weakref from zope.interface import implementer from twisted.internet import defer from foolscap.api import Referenceable, DeadReferenceError, eventually import allmydata # for __full_version__ from allmydata import interfaces, uri from allmydata.storage.server import si_b2a from allmydata.immutable import upload from allmydata.immutable.layout import ReadBucketProxy from allmydata.util.assertutil import precondition from allmydata.util import log, observer, fileutil, hashutil, dictutil class NotEnoughWritersError(Exception): pass class CHKCheckerAndUEBFetcher(object): """I check to see if a file is already present in the grid. I also fetch the URI Extension Block, which is useful for an uploading client who wants to avoid the work of encryption and encoding. I return False if the file is not completely healthy: i.e. if there are less than 'N' shares present. If the file is completely healthy, I return a tuple of (sharemap, UEB_data, UEB_hash). A sharemap is a dict with share numbers as keys and sets of server ids (which hold that share) as values. """ def __init__(self, peer_getter, storage_index, logparent): self._peer_getter = peer_getter self._found_shares = set() self._storage_index = storage_index self._sharemap = dictutil.DictOfSets() self._readers = set() self._ueb_hash = None self._ueb_data = None self._logparent = logparent def log(self, *args, **kwargs): if 'facility' not in kwargs: kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch" if 'parent' not in kwargs: kwargs['parent'] = self._logparent return log.msg(*args, **kwargs) def check(self): """ :return Deferred[bool|(DictOfSets, dict, bytes)]: If no share can be found with a usable UEB block or fewer than N shares can be found then the Deferred fires with ``False``. Otherwise, it fires with a tuple of the sharemap, the UEB data, and the UEB hash. """ d = self._get_all_shareholders(self._storage_index) d.addCallback(self._get_uri_extension) d.addCallback(self._done) return d def _get_all_shareholders(self, storage_index): dl = [] for s in self._peer_getter(storage_index): d = s.get_storage_server().get_buckets(storage_index) d.addCallbacks(self._got_response, self._got_error, callbackArgs=(s,)) dl.append(d) return defer.DeferredList(dl) def _got_response(self, buckets, server): # buckets is a dict: maps shum to an rref of the server who holds it shnums_s = ",".join([str(shnum) for shnum in buckets]) self.log("got_response: [%r] has %d shares (%s)" % (server.get_name(), len(buckets), shnums_s), level=log.NOISY) self._found_shares.update(buckets.keys()) for k in buckets: self._sharemap.add(k, server.get_serverid()) self._readers.update( [ (bucket, server) for bucket in buckets.values() ] ) def _got_error(self, f): if f.check(DeadReferenceError): return log.err(f, parent=self._logparent) def _get_uri_extension(self, res): # assume that we can pull the UEB from any share. If we get an error, # declare the whole file unavailable. if not self._readers: self.log("no readers, so no UEB", level=log.NOISY) return b,server = self._readers.pop() rbp = ReadBucketProxy(b, server, si_b2a(self._storage_index)) d = rbp.get_uri_extension() d.addCallback(self._got_uri_extension) d.addErrback(self._ueb_error) return d def _got_uri_extension(self, ueb): self.log("_got_uri_extension", level=log.NOISY) self._ueb_hash = hashutil.uri_extension_hash(ueb) self._ueb_data = uri.unpack_extension(ueb) def _ueb_error(self, f): # an error means the file is unavailable, but the overall check # shouldn't fail. self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg") return None def _done(self, res): if self._ueb_data: found = len(self._found_shares) total = self._ueb_data['total_shares'] self.log(format="got %(found)d shares of %(total)d", found=found, total=total, level=log.NOISY) if found < total: # not all shares are present in the grid self.log("not enough to qualify, file not found in grid", level=log.NOISY) return False # all shares are present self.log("all shares present, file is found in grid", level=log.NOISY) return (self._sharemap, self._ueb_data, self._ueb_hash) # no shares are present self.log("unable to find UEB data, file not found in grid", level=log.NOISY) return False @implementer(interfaces.RICHKUploadHelper) class CHKUploadHelper(Referenceable, upload.CHKUploader): # type: ignore # warner/foolscap#78 """I am the helper-server -side counterpart to AssistedUploader. I handle peer selection, encoding, and share pushing. I read ciphertext from the remote AssistedUploader. """ VERSION = { b"http://allmydata.org/tahoe/protocols/helper/chk-upload/v1" : { }, b"application-version": allmydata.__full_version__.encode("utf-8"), } def __init__(self, storage_index, helper, storage_broker, secret_holder, incoming_file, encoding_file, log_number): upload.CHKUploader.__init__(self, storage_broker, secret_holder) self._storage_index = storage_index self._helper = helper self._incoming_file = incoming_file self._encoding_file = encoding_file self._upload_id = si_b2a(storage_index)[:5] self._log_number = log_number self._upload_status = upload.UploadStatus() self._upload_status.set_helper(False) self._upload_status.set_storage_index(storage_index) self._upload_status.set_status("fetching ciphertext") self._upload_status.set_progress(0, 1.0) self._helper.log("CHKUploadHelper starting for SI %r" % self._upload_id, parent=log_number) self._storage_broker = storage_broker self._secret_holder = secret_holder self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file, self._log_number) self._reader = LocalCiphertextReader(self, storage_index, encoding_file) self._finished_observers = observer.OneShotObserverList() self._started = time.time() d = self._fetcher.when_done() d.addCallback(lambda res: self._reader.start()) d.addCallback(lambda res: self.start_encrypted(self._reader)) d.addCallback(self._finished) d.addErrback(self._failed) def log(self, *args, **kwargs): if 'facility' not in kwargs: kwargs['facility'] = "tahoe.helper.chk" return upload.CHKUploader.log(self, *args, **kwargs) def remote_get_version(self): return self.VERSION def remote_upload(self, reader): # reader is an RIEncryptedUploadable. I am specified to return an # UploadResults dictionary. # Log how much ciphertext we need to get. self.log("deciding whether to upload the file or not", level=log.NOISY) if os.path.exists(self._encoding_file): # we have the whole file, and we might be encoding it (or the # encode/upload might have failed, and we need to restart it). self.log("ciphertext already in place", level=log.UNUSUAL) elif os.path.exists(self._incoming_file): # we have some of the file, but not all of it (otherwise we'd be # encoding). The caller might be useful. self.log("partial ciphertext already present", level=log.UNUSUAL) else: # we don't remember uploading this file self.log("no ciphertext yet", level=log.NOISY) # let our fetcher pull ciphertext from the reader. self._fetcher.add_reader(reader) # and also hashes self._reader.add_reader(reader) # and inform the client when the upload has finished return self._finished_observers.when_fired() def _finished(self, ur): assert interfaces.IUploadResults.providedBy(ur), ur vcapstr = ur.get_verifycapstr() precondition(isinstance(vcapstr, bytes), vcapstr) v = uri.from_string(vcapstr) f_times = self._fetcher.get_times() hur = upload.HelperUploadResults() hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"], "total_fetch": f_times["total"], } for key,val in ur.get_timings().items(): hur.timings[key] = val hur.uri_extension_hash = v.uri_extension_hash hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched() hur.preexisting_shares = ur.get_preexisting_shares() # hur.sharemap needs to be {shnum: set(serverid)} hur.sharemap = {} for shnum, servers in ur.get_sharemap().items(): hur.sharemap[shnum] = set([s.get_serverid() for s in servers]) # and hur.servermap needs to be {serverid: set(shnum)} hur.servermap = {} for server, shnums in ur.get_servermap().items(): hur.servermap[server.get_serverid()] = set(shnums) hur.pushed_shares = ur.get_pushed_shares() hur.file_size = ur.get_file_size() hur.uri_extension_data = ur.get_uri_extension_data() hur.verifycapstr = vcapstr self._reader.close() os.unlink(self._encoding_file) self._finished_observers.fire(hur) self._helper.upload_finished(self._storage_index, v.size) del self._reader def _failed(self, f): self.log(format="CHKUploadHelper(%(si)s) failed", si=si_b2a(self._storage_index)[:5], failure=f, level=log.UNUSUAL) self._finished_observers.fire(f) self._helper.upload_finished(self._storage_index, 0) del self._reader class AskUntilSuccessMixin(object): # create me with a _reader array _last_failure = None def add_reader(self, reader): self._readers.append(reader) def call(self, *args, **kwargs): if not self._readers: raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure) rr = self._readers[0] d = rr.callRemote(*args, **kwargs) def _err(f): self._last_failure = f if rr in self._readers: self._readers.remove(rr) self._upload_helper.log("call to assisted uploader %s failed" % rr, failure=f, level=log.UNUSUAL) # we can try again with someone else who's left return self.call(*args, **kwargs) d.addErrback(_err) return d class CHKCiphertextFetcher(AskUntilSuccessMixin): """I use one or more remote RIEncryptedUploadable instances to gather ciphertext on disk. When I'm done, the file I create can be used by a LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload process. I begin pulling ciphertext as soon as a reader is added. I remove readers when they have any sort of error. If the last reader is removed, I fire my when_done() Deferred with a failure. I fire my when_done() Deferred (with None) immediately after I have moved the ciphertext to 'encoded_file'. """ def __init__(self, helper, incoming_file, encoded_file, logparent): self._upload_helper = helper self._incoming_file = incoming_file self._encoding_file = encoded_file self._upload_id = helper._upload_id self._log_parent = logparent self._done_observers = observer.OneShotObserverList() self._readers = [] self._started = False self._f = None self._times = { "cumulative_fetch": 0.0, "total": 0.0, } self._ciphertext_fetched = 0 def log(self, *args, **kwargs): if "facility" not in kwargs: kwargs["facility"] = "tahoe.helper.chkupload.fetch" if "parent" not in kwargs: kwargs["parent"] = self._log_parent return log.msg(*args, **kwargs) def add_reader(self, reader): AskUntilSuccessMixin.add_reader(self, reader) eventually(self._start) def _start(self): if self._started: return self._started = True started = time.time() if os.path.exists(self._encoding_file): self.log("ciphertext already present, bypassing fetch", level=log.UNUSUAL) d = defer.succeed(None) else: # first, find out how large the file is going to be d = self.call("get_size") d.addCallback(self._got_size) d.addCallback(self._start_reading) d.addCallback(self._done) d.addCallback(self._done2, started) d.addErrback(self._failed) def _got_size(self, size): self.log("total size is %d bytes" % size, level=log.NOISY) self._upload_helper._upload_status.set_size(size) self._expected_size = size def _start_reading(self, res): # then find out how much crypttext we have on disk if os.path.exists(self._incoming_file): self._have = os.stat(self._incoming_file)[stat.ST_SIZE] self._upload_helper._helper.count("chk_upload_helper.resumes") self.log("we already have %d bytes" % self._have, level=log.NOISY) else: self._have = 0 self.log("we do not have any ciphertext yet", level=log.NOISY) self.log("starting ciphertext fetch", level=log.NOISY) self._f = open(self._incoming_file, "ab") # now loop to pull the data from the readers d = defer.Deferred() self._loop(d) # this Deferred will be fired once the last byte has been written to # self._f return d # read data in 50kB chunks. We should choose a more considered number # here, possibly letting the client specify it. The goal should be to # keep the RTT*bandwidth to be less than 10% of the chunk size, to reduce # the upload bandwidth lost because this protocol is non-windowing. Too # large, however, means more memory consumption for both ends. Something # that can be transferred in, say, 10 seconds sounds about right. On my # home DSL line (50kBps upstream), that suggests 500kB. Most lines are # slower, maybe 10kBps, which suggests 100kB, and that's a bit more # memory than I want to hang on to, so I'm going to go with 50kB and see # how that works. CHUNK_SIZE = 50*1024 def _loop(self, fire_when_done): # this slightly weird structure is needed because Deferreds don't do # tail-recursion, so it is important to let each one retire promptly. # Simply chaining them will cause a stack overflow at the end of a # transfer that involves more than a few hundred chunks. # 'fire_when_done' lives a long time, but the Deferreds returned by # the inner _fetch() call do not. start = time.time() d = defer.maybeDeferred(self._fetch) def _done(finished): elapsed = time.time() - start self._times["cumulative_fetch"] += elapsed if finished: self.log("finished reading ciphertext", level=log.NOISY) fire_when_done.callback(None) else: self._loop(fire_when_done) def _err(f): self.log(format="[%(si)s] ciphertext read failed", si=self._upload_id, failure=f, level=log.UNUSUAL) fire_when_done.errback(f) d.addCallbacks(_done, _err) return None def _fetch(self): needed = self._expected_size - self._have fetch_size = min(needed, self.CHUNK_SIZE) if fetch_size == 0: self._upload_helper._upload_status.set_progress(1, 1.0) return True # all done percent = 0.0 if self._expected_size: percent = 1.0 * (self._have+fetch_size) / self._expected_size self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)", si=self._upload_id, start=self._have, end=self._have+fetch_size, total=self._expected_size, percent=int(100.0*percent), level=log.NOISY) d = self.call("read_encrypted", self._have, fetch_size) def _got_data(ciphertext_v): for data in ciphertext_v: self._f.write(data) self._have += len(data) self._ciphertext_fetched += len(data) self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data)) self._upload_helper._upload_status.set_progress(1, percent) return False # not done d.addCallback(_got_data) return d def _done(self, res): self._f.close() self._f = None self.log(format="done fetching ciphertext, size=%(size)d", size=os.stat(self._incoming_file)[stat.ST_SIZE], level=log.NOISY) os.rename(self._incoming_file, self._encoding_file) def _done2(self, _ignored, started): self.log("done2", level=log.NOISY) elapsed = time.time() - started self._times["total"] = elapsed self._readers = [] self._done_observers.fire(None) def _failed(self, f): if self._f: self._f.close() self._readers = [] self._done_observers.fire(f) def when_done(self): return self._done_observers.when_fired() def get_times(self): return self._times def get_ciphertext_fetched(self): return self._ciphertext_fetched @implementer(interfaces.IEncryptedUploadable) class LocalCiphertextReader(AskUntilSuccessMixin): def __init__(self, upload_helper, storage_index, encoding_file): self._readers = [] self._upload_helper = upload_helper self._storage_index = storage_index self._encoding_file = encoding_file self._status = None def start(self): self._upload_helper._upload_status.set_status("pushing") self._size = os.stat(self._encoding_file)[stat.ST_SIZE] self.f = open(self._encoding_file, "rb") def get_size(self): return defer.succeed(self._size) def get_all_encoding_parameters(self): return self.call("get_all_encoding_parameters") def get_storage_index(self): return defer.succeed(self._storage_index) def read_encrypted(self, length, hash_only): assert hash_only is False d = defer.maybeDeferred(self.f.read, length) d.addCallback(lambda data: [data]) return d def close(self): self.f.close() # ??. I'm not sure if it makes sense to forward the close message. return self.call("close") # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3561 def set_upload_status(self, upload_status): raise NotImplementedError @implementer(interfaces.RIHelper, interfaces.IStatsProducer) class Helper(Referenceable): # type: ignore # warner/foolscap#78 """ :ivar dict[bytes, CHKUploadHelper] _active_uploads: For any uploads which have been started but not finished, a mapping from storage index to the upload helper. :ivar chk_checker: A callable which returns an object like a CHKCheckerAndUEBFetcher instance which can check CHK shares. Primarily for the convenience of tests to override. :ivar chk_upload: A callable which returns an object like a CHKUploadHelper instance which can upload CHK shares. Primarily for the convenience of tests to override. """ # this is the non-distributed version. When we need to have multiple # helpers, this object will become the HelperCoordinator, and will query # the farm of Helpers to see if anyone has the storage_index of interest, # and send the request off to them. If nobody has it, we'll choose a # helper at random. name = "helper" VERSION = { b"http://allmydata.org/tahoe/protocols/helper/v1" : { }, b"application-version": allmydata.__full_version__.encode("utf-8"), } MAX_UPLOAD_STATUSES = 10 chk_checker = CHKCheckerAndUEBFetcher chk_upload = CHKUploadHelper def __init__(self, basedir, storage_broker, secret_holder, stats_provider, history): self._basedir = basedir self._storage_broker = storage_broker self._secret_holder = secret_holder self._chk_incoming = os.path.join(basedir, "CHK_incoming") self._chk_encoding = os.path.join(basedir, "CHK_encoding") fileutil.make_dirs(self._chk_incoming) fileutil.make_dirs(self._chk_encoding) self._active_uploads = {} self._all_uploads = weakref.WeakKeyDictionary() # for debugging self.stats_provider = stats_provider if stats_provider: stats_provider.register_producer(self) self._counters = {"chk_upload_helper.upload_requests": 0, "chk_upload_helper.upload_already_present": 0, "chk_upload_helper.upload_need_upload": 0, "chk_upload_helper.resumes": 0, "chk_upload_helper.fetched_bytes": 0, "chk_upload_helper.encoded_bytes": 0, } self._history = history def log(self, *args, **kwargs): if 'facility' not in kwargs: kwargs['facility'] = "tahoe.helper" return log.msg(*args, **kwargs) def count(self, key, value=1): if self.stats_provider: self.stats_provider.count(key, value) self._counters[key] += value def get_stats(self): OLD = 86400*2 # 48hours now = time.time() inc_count = inc_size = inc_size_old = 0 enc_count = enc_size = enc_size_old = 0 inc = os.listdir(self._chk_incoming) enc = os.listdir(self._chk_encoding) for f in inc: s = os.stat(os.path.join(self._chk_incoming, f)) size = s[stat.ST_SIZE] mtime = s[stat.ST_MTIME] inc_count += 1 inc_size += size if now - mtime > OLD: inc_size_old += size for f in enc: s = os.stat(os.path.join(self._chk_encoding, f)) size = s[stat.ST_SIZE] mtime = s[stat.ST_MTIME] enc_count += 1 enc_size += size if now - mtime > OLD: enc_size_old += size stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads), 'chk_upload_helper.incoming_count': inc_count, 'chk_upload_helper.incoming_size': inc_size, 'chk_upload_helper.incoming_size_old': inc_size_old, 'chk_upload_helper.encoding_count': enc_count, 'chk_upload_helper.encoding_size': enc_size, 'chk_upload_helper.encoding_size_old': enc_size_old, } stats.update(self._counters) return stats def remote_get_version(self): return self.VERSION def remote_upload_chk(self, storage_index): """ See ``RIHelper.upload_chk`` """ self.count("chk_upload_helper.upload_requests") lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_b2a(storage_index)) if storage_index in self._active_uploads: self.log("upload is currently active", parent=lp) uh = self._active_uploads[storage_index] return (None, uh) d = self._check_chk(storage_index, lp) d.addCallback(self._did_chk_check, storage_index, lp) def _err(f): self.log("error while checking for chk-already-in-grid", failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg") return f d.addErrback(_err) return d def _check_chk(self, storage_index, lp): # see if this file is already in the grid lp2 = self.log("doing a quick check+UEBfetch", parent=lp, level=log.NOISY) sb = self._storage_broker c = self.chk_checker(sb.get_servers_for_psi, storage_index, lp2) d = c.check() def _checked(res): if res: (sharemap, ueb_data, ueb_hash) = res self.log("found file in grid", level=log.NOISY, parent=lp) hur = upload.HelperUploadResults() hur.uri_extension_hash = ueb_hash hur.sharemap = sharemap hur.uri_extension_data = ueb_data hur.preexisting_shares = len(sharemap) hur.pushed_shares = 0 return hur return None d.addCallback(_checked) return d def _did_chk_check(self, already_present, storage_index, lp): if already_present: # the necessary results are placed in the UploadResults self.count("chk_upload_helper.upload_already_present") self.log("file already found in grid", parent=lp) return (already_present, None) self.count("chk_upload_helper.upload_need_upload") # the file is not present in the grid, by which we mean there are # less than 'N' shares available. self.log("unable to find file in the grid", parent=lp, level=log.NOISY) # We need an upload helper. Check our active uploads again in # case there was a race. if storage_index in self._active_uploads: self.log("upload is currently active", parent=lp) uh = self._active_uploads[storage_index] else: self.log("creating new upload helper", parent=lp) uh = self._make_chk_upload_helper(storage_index, lp) self._active_uploads[storage_index] = uh self._add_upload(uh) return (None, uh) def _make_chk_upload_helper(self, storage_index, lp): si_s = si_b2a(storage_index).decode('ascii') incoming_file = os.path.join(self._chk_incoming, si_s) encoding_file = os.path.join(self._chk_encoding, si_s) uh = self.chk_upload( storage_index, self, self._storage_broker, self._secret_holder, incoming_file, encoding_file, lp, ) return uh def _add_upload(self, uh): self._all_uploads[uh] = None if self._history: s = uh.get_upload_status() self._history.notify_helper_upload(s) def upload_finished(self, storage_index, size): # this is called with size=0 if the upload failed self.count("chk_upload_helper.encoded_bytes", size) uh = self._active_uploads[storage_index] del self._active_uploads[storage_index] s = uh.get_upload_status() s.set_active(False)