Thu Jan 8 17:24:38 MST 2009 zooko@zooko.com * immutable: first pass at a Repairer! Except it doesn't work, and is larded with debug print statements. In fact, the only reason I am recording this patch is to share it with Brian while I head out to the Boulder Linux User's Group meeting. Perhaps this patch will be unrecorded/amended into a complete implementation with an entirely different patch description before it lands in trunk. diff -rN -u old-trunk/src/allmydata/immutable/filenode.py new-trunk/src/allmydata/immutable/filenode.py --- old-trunk/src/allmydata/immutable/filenode.py 2009-01-08 17:28:11.000000000 -0700 +++ new-trunk/src/allmydata/immutable/filenode.py 2009-01-08 17:28:11.000000000 -0700 @@ -206,7 +206,7 @@ def _gather_repair_results(rr): crr.post_repair_results = rr return crr - r = Repairer(client=self._client, verifycap=verifycap, servers=servers, monitor=monitor) + r = Repairer(client=self._client, verifycap=verifycap, monitor=monitor) d = r.start() d.addCallback(_gather_repair_results) return d diff -rN -u old-trunk/src/allmydata/immutable/repairer.py new-trunk/src/allmydata/immutable/repairer.py --- old-trunk/src/allmydata/immutable/repairer.py 2009-01-08 17:28:11.000000000 -0700 +++ new-trunk/src/allmydata/immutable/repairer.py 2009-01-08 17:28:11.000000000 -0700 @@ -1,33 +1,18 @@ +from zope.interface import implements from twisted.internet import defer from allmydata import storage from allmydata.check_results import CheckerResults, CheckAndRepairResults -from allmydata.immutable import download -from allmydata.util import nummedobj +from allmydata.util import log, observer from allmydata.util.assertutil import precondition from allmydata.uri import CHKFileVerifierURI +from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget +from twisted.internet.interfaces import IConsumer -from allmydata.immutable import layout +from allmydata.immutable import download, layout, upload -import sha, time +import collections -def
_permute_servers(servers, key): - return sorted(servers, key=lambda x: sha.new(key+x[0]).digest()) - -class LogMixin(nummedobj.NummedObj): - def __init__(self, client, verifycap): - nummedobj.NummedObj.__init__(self) - self._client = client - self._verifycap = verifycap - self._storageindex = self._verifycap.storage_index - self._log_prefix = prefix = storage.si_b2a(self._storageindex)[:5] - self._parentmsgid = self._client.log("%s(%s): starting" % (self.__repr__(), self._log_prefix)) - - def log(self, msg, parent=None, *args, **kwargs): - if parent is None: - parent = self._parentmsgid - return self._client.log("%s(%s): %s" % (self.__repr__(), self._log_prefix, msg), parent=parent, *args, **kwargs) - -class Repairer(LogMixin): +class Repairer(log.PrefixingLogMixin): """ I generate any shares which were not available and upload them to servers. Which servers? Well, I take the list of servers and if I used the Checker in verify mode @@ -58,109 +43,153 @@ into my constructor whether this task has been cancelled (by invoking its raise_if_cancelled() method). 
""" - def __init__(self, client, verifycap, servers, monitor): + def __init__(self, client, verifycap, monitor): assert precondition(isinstance(verifycap, CHKFileVerifierURI)) - assert precondition(isinstance(servers, (set, frozenset))) - for (serverid, serverrref) in servers: - assert precondition(isinstance(serverid, str)) - LogMixin.__init__(self, client, verifycap) + logprefix = storage.si_b2a(verifycap.storage_index)[:5] + log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer", prefix=logprefix) + self._client = client + self._verifycap = verifycap self._monitor = monitor - self._servers = servers def start(self): - self.log("starting download") - d = defer.succeed(_permute_servers(self._servers, self._storageindex)) - d.addCallback(self._check_phase) - d.addCallback(self._repair_phase) + self.log("starting repair") + duc = DownUpConnector() + dl = download.CiphertextDownloader(self._client, self._verifycap, target=duc, monitor=self._monitor) + dl.start() + ul = upload.CHKUploader(self._client) + return ul.start(duc) + +class DownUpConnector(log.PrefixingLogMixin): + implements(IEncryptedUploadable, IDownloadTarget, IConsumer) + """ I act like an "encrypted uploadable" -- something that a local uploader can read + ciphertext from in order to upload the ciphertext. However, unbeknownst to the uploader, + I actually download the ciphertext from a CiphertextDownloader instance as it is needed. + + On the other hand, I act like a "download target" -- something that a local downloader can + write ciphertext to as it downloads the ciphertext. That downloader doesn't realize, of + course, that I'm just turning around and giving the ciphertext to the uploader. """ + + # The theory behind this class is nice: just satisfy two separate interfaces. 
The + # implementation is slightly horrible, because of "impedance mismatch" -- the downloader + # expects to be able to synchronously push data in, and the uploader expects to be able to + # read data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred. The two + # interfaces have different APIs for pausing/unpausing. The uploader requests metadata like + # size and segsize which the downloader provides either eventually or not at all (okay I + # just now extended the downloader to provide segsize). Most of this slightly horrible code + # would disappear if CiphertextDownloader just used this object as an IConsumer (plus maybe + # a couple of other methods) and if the Uploader simply expected to be treated as an + # IConsumer (plus maybe a couple of other things). + + def __init__(self, buflim=2**19): + """ If we're already holding at least buflim bytes, then tell the downloader to pause + until we have less than buflim bytes.""" + log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer") + self.buflim = buflim + self.bufs = collections.deque() # list of strings + self.bufsiz = 0 # how many bytes total in bufs + + self.next_read_ds = collections.deque() # list of deferreds which will fire with the requested ciphertext + self.next_read_lens = collections.deque() # how many bytes of ciphertext were requested by each deferred + + self._size_osol = observer.OneShotObserverList() + self._segsize_osol = observer.OneShotObserverList() + + # once seg size is available, the following attribute will be created to hold it: + + # self.segsize # size of segment (provided by the object which is pushing data into me, + # required by the object which is pulling data out of me) + + # open() will create the following attribute: + # self.size # size of the whole file (provided by the object which is pushing data into + # me, required by the object which is pulling data out of me) + + # set_upload_status() will create the following attribute: + + # 
self.upload_status # XXX do we need to actually update this? Is anybody watching the + # results during a repair? + + + # methods to satisfy the IConsumer and IDownloadTarget interfaces + # (From the perspective of a downloader I am an IDownloadTarget and an IConsumer.) + def registerProducer(self, producer, streaming): + assert streaming # We know how to handle only streaming producers. + self.producer = producer # the downloader + def unregisterProducer(self): + self.producer = None + def open(self, size): + self.size = size + self._size_osol.fire(self.size) + def set_segsize(self, segsize): + self.segsize = segsize + self._segsize_osol.fire(self.segsize) + def write(self, data): + self.bufs.append(data) + self.bufsiz += len(data) + print "xxx %s.write([%d]); self.bufsiz: %d, len(self.bufs): %d" % (self, len(data), self.bufsiz, len(self.bufs)) + assert bool(self.next_read_ds) == bool(self.next_read_lens) + while self.next_read_ds and self.bufsiz >= self.next_read_lens[0]: + nrd = self.next_read_ds.popleft() + nrl = self.next_read_lens.popleft() + # Pick out the requested number of bytes from self.bufs, turn it into a string, and + # callback the deferred with that. 
+ res = [] + ressize = 0 + while ressize < nrl: + nextbuf = self.bufs.popleft() + res.append(nextbuf) + ressize += len(nextbuf) + if ressize > self.next_read_len: + leftover = ressize - nrl + self.bufs.appendleft(nextbuf[leftover:]) + res[-1] = nextbuf[:leftover] + self.bufsiz -= nrl + print "xxx after rm %d %s.write([%d]); self.bufsiz: %d, len(self.bufs): %d" % (nrl, self, len(data), self.bufsiz, len(self.bufs)) + if self.bufsiz < self.buflim: + self.producer.resumeProducing() + nrd.fire(''.join(res)) + + if self.bufsiz >= self.buflim: + self.producer.pauseProducing() + def finish(self): + return + def close(self): + raise "THIS METHOD NOT HERE YET" + def fail(self, why): + raise "THIS METHOD NOT HERE YET" + def register_canceller(self, cb): + raise "THIS METHOD NOT HERE YET" + + # methods to satisfy the IEncryptedUploader interface + # (From the perspective of an uploader I am an IEncryptedUploadable.) + def set_upload_status(self, upload_status): + self.upload_status = upload_status + def get_size(self): + if hasattr(self, 'size'): # attribute created by self.open() + return defer.succeed(self.size) + else: + return self._size_osol.when_fired() + def get_segsize(self): + if hasattr(self, 'segsize'): # attribute created by self.set_segsize() + return defer.succeed(self.segsize) + else: + return self._segsize_osol.when_fired() + def get_all_encoding_parameters(self): + # We have to learn the segment size from the ValidatedExtendedURIProxy object. + d = self.get_segsize() + def make_params(segsize): + # I don't think anyone cares about "shares of happiness" in these params. + return (self._verifycap.needed_shares, self._verifycap.total_shares, self._verifycap.total_shares, segsize) + d.addCallback(make_params) return d + def read_encrypted(self, length, hash_only): + """ Returns a deferred which eventually fired with the requested ciphertext. 
""" + print "xxx %s.read_encrypted(%d, %d)" % (self, length, hash_only) + self.next_read_ds.append(defer.Deferred()) + self.next_read_lens.append(length) + def close(self): + print "xxx %s.close()" % (self,) - def _check_phase(self, unused=None): - return unused - - def _repair_phase(self, unused=None): - bogusresults = CheckAndRepairResults(self._storageindex) # XXX THIS REPAIRER NOT HERE YET - bogusresults.pre_repair_results = CheckerResults(self._verifycap, self._storageindex) - bogusresults.pre_repair_results.set_healthy(True) - bogusresults.pre_repair_results.set_needs_rebalancing(False) - bogusresults.post_repair_results = CheckerResults(self._verifycap, self._storageindex) - bogusresults.post_repair_results.set_healthy(True) - bogusresults.post_repair_results.set_needs_rebalancing(False) - bogusdata = {} - bogusdata['count-shares-good'] = "this repairer not here yet" - bogusdata['count-shares-needed'] = "this repairer not here yet" - bogusdata['count-shares-expected'] = "this repairer not here yet" - bogusdata['count-good-share-hosts'] = "this repairer not here yet" - bogusdata['count-corrupt-shares'] = "this repairer not here yet" - bogusdata['count-list-corrupt-shares'] = [] # XXX THIS REPAIRER NOT HERE YET - bogusdata['servers-responding'] = [] # XXX THIS REPAIRER NOT HERE YET - bogusdata['sharemap'] = {} # XXX THIS REPAIRER NOT HERE YET - bogusdata['count-wrong-shares'] = "this repairer not here yet" - bogusdata['count-recoverable-versions'] = "this repairer not here yet" - bogusdata['count-unrecoverable-versions'] = "this repairer not here yet" - bogusresults.pre_repair_results.data.update(bogusdata) - bogusresults.post_repair_results.data.update(bogusdata) - return bogusresults - - def _get_all_shareholders(self, ignored=None): - dl = [] - for (peerid,ss) in self._client.get_permuted_peers("storage", - self._storageindex): - d = ss.callRemote("get_buckets", self._storageindex) - d.addCallbacks(self._got_response, self._got_error, - 
callbackArgs=(peerid,)) - dl.append(d) - self._responses_received = 0 - self._queries_sent = len(dl) - if self._status: - self._status.set_status("Locating Shares (%d/%d)" % - (self._responses_received, - self._queries_sent)) - return defer.DeferredList(dl) - - def _got_response(self, buckets, peerid): - self._responses_received += 1 - if self._results: - elapsed = time.time() - self._started - self._results.timings["servers_peer_selection"][peerid] = elapsed - if self._status: - self._status.set_status("Locating Shares (%d/%d)" % - (self._responses_received, - self._queries_sent)) - for sharenum, bucket in buckets.iteritems(): - b = layout.ReadBucketProxy(bucket, peerid, self._si_s) - self.add_share_bucket(sharenum, b) - self._uri_extension_sources.append(b) - if self._results: - if peerid not in self._results.servermap: - self._results.servermap[peerid] = set() - self._results.servermap[peerid].add(sharenum) - - def _got_all_shareholders(self, res): - if self._results: - now = time.time() - self._results.timings["peer_selection"] = now - self._started - - if len(self._share_buckets) < self._num_needed_shares: - raise download.NotEnoughSharesError - - def _verify_done(self, ignored): - # TODO: The following results are just stubs, and need to be replaced - # with actual values. These exist to make things like deep-check not - # fail. XXX - self._check_results.set_needs_rebalancing(False) - N = self._total_shares - data = { - "count-shares-good": N, - "count-good-share-hosts": N, - "count-corrupt-shares": 0, - "list-corrupt-shares": [], - "servers-responding": [], - "sharemap": {}, - "count-wrong-shares": 0, - "count-recoverable-versions": 1, - "count-unrecoverable-versions": 0, - } - self._check_results.set_data(data) - return self._check_results + def get_storage_index(self): + raise "THIS METHOD NOT HERE YET"