source file: /home/buildslave/tahoe/edgy/build/src/allmydata/immutable/offloaded.py
file stats: 456 lines, 439 executed: 96.3% covered
coverage versus previous test: 0 lines added, 0 lines removed
    1. 
    2. import os, stat, time, weakref
    3. from zope.interface import implements
    4. from twisted.internet import defer
    5. from foolscap.api import Referenceable, DeadReferenceError, eventually
    6. import allmydata # for __full_version__
    7. from allmydata import interfaces, uri
    8. from allmydata.storage.server import si_b2a
    9. from allmydata.immutable import upload
   10. from allmydata.immutable.layout import ReadBucketProxy
   11. from allmydata.util.assertutil import precondition
   12. from allmydata.util import idlib, log, observer, fileutil, hashutil, dictutil
   13. 
   14. 
   15. class NotEnoughWritersError(Exception):
   16.     pass
   17. 
   18. 
   19. class CHKCheckerAndUEBFetcher:
   20.     """I check to see if a file is already present in the grid. I also fetch
   21.     the URI Extension Block, which is useful for an uploading client who
   22.     wants to avoid the work of encryption and encoding.
   23. 
   24.     I return False if the file is not completely healthy: i.e. if there are
   25.     less than 'N' shares present.
   26. 
   27.     If the file is completely healthy, I return a tuple of (sharemap,
   28.     UEB_data, UEB_hash).
   29.     """
   30. 
   31.     def __init__(self, peer_getter, storage_index, logparent=None):
   32.         self._peer_getter = peer_getter
   33.         self._found_shares = set()
   34.         self._storage_index = storage_index
   35.         self._sharemap = dictutil.DictOfSets()
   36.         self._readers = set()
   37.         self._ueb_hash = None
   38.         self._ueb_data = None
   39.         self._logparent = logparent
   40. 
   41.     def log(self, *args, **kwargs):
   42.         if 'facility' not in kwargs:
   43.             kwargs['facility'] = "tahoe.helper.chk.checkandUEBfetch"
   44.         if 'parent' not in kwargs:
   45.             kwargs['parent'] = self._logparent
   46.         return log.msg(*args, **kwargs)
   47. 
   48.     def check(self):
   49.         d = self._get_all_shareholders(self._storage_index)
   50.         d.addCallback(self._get_uri_extension)
   51.         d.addCallback(self._done)
   52.         return d
   53. 
   54.     def _get_all_shareholders(self, storage_index):
   55.         dl = []
   56.         for (peerid, ss) in self._peer_getter(storage_index):
   57.             d = ss.callRemote("get_buckets", storage_index)
   58.             d.addCallbacks(self._got_response, self._got_error,
   59.                            callbackArgs=(peerid,))
   60.             dl.append(d)
   61.         return defer.DeferredList(dl)
   62. 
   63.     def _got_response(self, buckets, peerid):
   64.         # buckets is a dict: maps shum to an rref of the server who holds it
   65.         shnums_s = ",".join([str(shnum) for shnum in buckets])
   66.         self.log("got_response: [%s] has %d shares (%s)" %
   67.                  (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
   68.                  level=log.NOISY)
   69.         self._found_shares.update(buckets.keys())
   70.         for k in buckets:
   71.             self._sharemap.add(k, peerid)
   72.         self._readers.update( [ (bucket, peerid)
   73.                                 for bucket in buckets.values() ] )
   74. 
   75.     def _got_error(self, f):
   76.         if f.check(DeadReferenceError):
   77.             return
   78.         log.err(f, parent=self._logparent)
   79.         pass
   80. 
   81.     def _get_uri_extension(self, res):
   82.         # assume that we can pull the UEB from any share. If we get an error,
   83.         # declare the whole file unavailable.
   84.         if not self._readers:
   85.             self.log("no readers, so no UEB", level=log.NOISY)
   86.             return
   87.         b,peerid = self._readers.pop()
   88.         rbp = ReadBucketProxy(b, peerid, si_b2a(self._storage_index))
   89.         d = rbp.get_uri_extension()
   90.         d.addCallback(self._got_uri_extension)
   91.         d.addErrback(self._ueb_error)
   92.         return d
   93. 
   94.     def _got_uri_extension(self, ueb):
   95.         self.log("_got_uri_extension", level=log.NOISY)
   96.         self._ueb_hash = hashutil.uri_extension_hash(ueb)
   97.         self._ueb_data = uri.unpack_extension(ueb)
   98. 
   99.     def _ueb_error(self, f):
  100.         # an error means the file is unavailable, but the overall check
  101.         # shouldn't fail.
  102.         self.log("UEB fetch failed", failure=f, level=log.WEIRD, umid="sJLKVg")
  103.         return None
  104. 
  105.     def _done(self, res):
  106.         if self._ueb_data:
  107.             found = len(self._found_shares)
  108.             total = self._ueb_data['total_shares']
  109.             self.log(format="got %(found)d shares of %(total)d",
  110.                      found=found, total=total, level=log.NOISY)
  111.             if found < total:
  112.                 # not all shares are present in the grid
  113.                 self.log("not enough to qualify, file not found in grid",
  114.                          level=log.NOISY)
  115.                 return False
  116.             # all shares are present
  117.             self.log("all shares present, file is found in grid",
  118.                      level=log.NOISY)
  119.             return (self._sharemap, self._ueb_data, self._ueb_hash)
  120.         # no shares are present
  121.         self.log("unable to find UEB data, file not found in grid",
  122.                  level=log.NOISY)
  123.         return False
  124. 
  125. 
  126. class CHKUploadHelper(Referenceable, upload.CHKUploader):
  127.     """I am the helper-server -side counterpart to AssistedUploader. I handle
  128.     peer selection, encoding, and share pushing. I read ciphertext from the
  129.     remote AssistedUploader.
  130.     """
  131.     implements(interfaces.RICHKUploadHelper)
  132.     VERSION = { "http://allmydata.org/tahoe/protocols/helper/chk-upload/v1" :
  133.                  { },
  134.                 "application-version": str(allmydata.__full_version__),
  135.                 }
  136. 
  137.     def __init__(self, storage_index,
  138.                  helper, storage_broker, secret_holder,
  139.                  incoming_file, encoding_file,
  140.                  results, log_number):
  141.         self._storage_index = storage_index
  142.         self._helper = helper
  143.         self._incoming_file = incoming_file
  144.         self._encoding_file = encoding_file
  145.         self._upload_id = si_b2a(storage_index)[:5]
  146.         self._log_number = log_number
  147.         self._results = results
  148.         self._upload_status = upload.UploadStatus()
  149.         self._upload_status.set_helper(False)
  150.         self._upload_status.set_storage_index(storage_index)
  151.         self._upload_status.set_status("fetching ciphertext")
  152.         self._upload_status.set_progress(0, 1.0)
  153.         self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
  154.                          parent=log_number)
  155. 
  156.         self._storage_broker = storage_broker
  157.         self._secret_holder = secret_holder
  158.         self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
  159.                                              self._log_number)
  160.         self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
  161.         self._finished_observers = observer.OneShotObserverList()
  162. 
  163.         d = self._fetcher.when_done()
  164.         d.addCallback(lambda res: self._reader.start())
  165.         d.addCallback(lambda res: self.start_encrypted(self._reader))
  166.         d.addCallback(self._finished)
  167.         d.addErrback(self._failed)
  168. 
  169.     def log(self, *args, **kwargs):
  170.         if 'facility' not in kwargs:
  171.             kwargs['facility'] = "tahoe.helper.chk"
  172.         return upload.CHKUploader.log(self, *args, **kwargs)
  173. 
  174.     def start(self):
  175.         self._started = time.time()
  176.         # determine if we need to upload the file. If so, return ({},self) .
  177.         # If not, return (UploadResults,None) .
  178.         self.log("deciding whether to upload the file or not", level=log.NOISY)
  179.         if os.path.exists(self._encoding_file):
  180.             # we have the whole file, and we might be encoding it (or the
  181.             # encode/upload might have failed, and we need to restart it).
  182.             self.log("ciphertext already in place", level=log.UNUSUAL)
  183.             return (self._results, self)
  184.         if os.path.exists(self._incoming_file):
  185.             # we have some of the file, but not all of it (otherwise we'd be
  186.             # encoding). The caller might be useful.
  187.             self.log("partial ciphertext already present", level=log.UNUSUAL)
  188.             return (self._results, self)
  189.         # we don't remember uploading this file
  190.         self.log("no ciphertext yet", level=log.NOISY)
  191.         return (self._results, self)
  192. 
  193.     def remote_get_version(self):
  194.         return self.VERSION
  195. 
  196.     def remote_upload(self, reader):
  197.         # reader is an RIEncryptedUploadable. I am specified to return an
  198.         # UploadResults dictionary.
  199. 
  200.         # let our fetcher pull ciphertext from the reader.
  201.         self._fetcher.add_reader(reader)
  202.         # and also hashes
  203.         self._reader.add_reader(reader)
  204. 
  205.         # and inform the client when the upload has finished
  206.         return self._finished_observers.when_fired()
  207. 
  208.     def _finished(self, uploadresults):
  209.         precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
  210.         assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
  211.         r = uploadresults
  212.         v = uri.from_string(r.verifycapstr)
  213.         r.uri_extension_hash = v.uri_extension_hash
  214.         f_times = self._fetcher.get_times()
  215.         r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
  216.         r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
  217.         r.timings["total_fetch"] = f_times["total"]
  218.         self._reader.close()
  219.         os.unlink(self._encoding_file)
  220.         self._finished_observers.fire(r)
  221.         self._helper.upload_finished(self._storage_index, v.size)
  222.         del self._reader
  223. 
  224.     def _failed(self, f):
  225.         self.log(format="CHKUploadHelper(%(si)s) failed",
  226.                  si=si_b2a(self._storage_index)[:5],
  227.                  failure=f,
  228.                  level=log.UNUSUAL)
  229.         self._finished_observers.fire(f)
  230.         self._helper.upload_finished(self._storage_index, 0)
  231.         del self._reader
  232. 
  233. class AskUntilSuccessMixin:
  234.     # create me with a _reader array
  235.     _last_failure = None
  236. 
  237.     def add_reader(self, reader):
  238.         self._readers.append(reader)
  239. 
  240.     def call(self, *args, **kwargs):
  241.         if not self._readers:
  242.             raise NotEnoughWritersError("ran out of assisted uploaders, last failure was %s" % self._last_failure)
  243.         rr = self._readers[0]
  244.         d = rr.callRemote(*args, **kwargs)
  245.         def _err(f):
  246.             self._last_failure = f
  247.             if rr in self._readers:
  248.                 self._readers.remove(rr)
  249.             self._upload_helper.log("call to assisted uploader %s failed" % rr,
  250.                                     failure=f, level=log.UNUSUAL)
  251.             # we can try again with someone else who's left
  252.             return self.call(*args, **kwargs)
  253.         d.addErrback(_err)
  254.         return d
  255. 
  256. class CHKCiphertextFetcher(AskUntilSuccessMixin):
  257.     """I use one or more remote RIEncryptedUploadable instances to gather
  258.     ciphertext on disk. When I'm done, the file I create can be used by a
  259.     LocalCiphertextReader to satisfy the ciphertext needs of a CHK upload
  260.     process.
  261. 
  262.     I begin pulling ciphertext as soon as a reader is added. I remove readers
  263.     when they have any sort of error. If the last reader is removed, I fire
  264.     my when_done() Deferred with a failure.
  265. 
  266.     I fire my when_done() Deferred (with None) immediately after I have moved
  267.     the ciphertext to 'encoded_file'.
  268.     """
  269. 
  270.     def __init__(self, helper, incoming_file, encoded_file, logparent):
  271.         self._upload_helper = helper
  272.         self._incoming_file = incoming_file
  273.         self._encoding_file = encoded_file
  274.         self._upload_id = helper._upload_id
  275.         self._log_parent = logparent
  276.         self._done_observers = observer.OneShotObserverList()
  277.         self._readers = []
  278.         self._started = False
  279.         self._f = None
  280.         self._times = {
  281.             "cumulative_fetch": 0.0,
  282.             "total": 0.0,
  283.             }
  284.         self._ciphertext_fetched = 0
  285. 
  286.     def log(self, *args, **kwargs):
  287.         if "facility" not in kwargs:
  288.             kwargs["facility"] = "tahoe.helper.chkupload.fetch"
  289.         if "parent" not in kwargs:
  290.             kwargs["parent"] = self._log_parent
  291.         return log.msg(*args, **kwargs)
  292. 
  293.     def add_reader(self, reader):
  294.         AskUntilSuccessMixin.add_reader(self, reader)
  295.         eventually(self._start)
  296. 
  297.     def _start(self):
  298.         if self._started:
  299.             return
  300.         self._started = True
  301.         started = time.time()
  302. 
  303.         if os.path.exists(self._encoding_file):
  304.             self.log("ciphertext already present, bypassing fetch",
  305.                      level=log.UNUSUAL)
  306.             # we'll still need the plaintext hashes (when
  307.             # LocalCiphertextReader.get_plaintext_hashtree_leaves() is
  308.             # called), and currently the easiest way to get them is to ask
  309.             # the sender for the last byte of ciphertext. That will provoke
  310.             # them into reading and hashing (but not sending) everything
  311.             # else.
  312.             have = os.stat(self._encoding_file)[stat.ST_SIZE]
  313.             d = self.call("read_encrypted", have-1, 1)
  314.             d.addCallback(self._done2, started)
  315.             return
  316. 
  317.         # first, find out how large the file is going to be
  318.         d = self.call("get_size")
  319.         d.addCallback(self._got_size)
  320.         d.addCallback(self._start_reading)
  321.         d.addCallback(self._done)
  322.         d.addCallback(self._done2, started)
  323.         d.addErrback(self._failed)
  324. 
  325.     def _got_size(self, size):
  326.         self.log("total size is %d bytes" % size, level=log.NOISY)
  327.         self._upload_helper._upload_status.set_size(size)
  328.         self._expected_size = size
  329. 
  330.     def _start_reading(self, res):
  331.         # then find out how much crypttext we have on disk
  332.         if os.path.exists(self._incoming_file):
  333.             self._have = os.stat(self._incoming_file)[stat.ST_SIZE]
  334.             self._upload_helper._helper.count("chk_upload_helper.resumes")
  335.             self.log("we already have %d bytes" % self._have, level=log.NOISY)
  336.         else:
  337.             self._have = 0
  338.             self.log("we do not have any ciphertext yet", level=log.NOISY)
  339.         self.log("starting ciphertext fetch", level=log.NOISY)
  340.         self._f = open(self._incoming_file, "ab")
  341. 
  342.         # now loop to pull the data from the readers
  343.         d = defer.Deferred()
  344.         self._loop(d)
  345.         # this Deferred will be fired once the last byte has been written to
  346.         # self._f
  347.         return d
  348. 
  349.     # read data in 50kB chunks. We should choose a more considered number
  350.     # here, possibly letting the client specify it. The goal should be to
  351.     # keep the RTT*bandwidth to be less than 10% of the chunk size, to reduce
  352.     # the upload bandwidth lost because this protocol is non-windowing. Too
  353.     # large, however, means more memory consumption for both ends. Something
  354.     # that can be transferred in, say, 10 seconds sounds about right. On my
  355.     # home DSL line (50kBps upstream), that suggests 500kB. Most lines are
  356.     # slower, maybe 10kBps, which suggests 100kB, and that's a bit more
  357.     # memory than I want to hang on to, so I'm going to go with 50kB and see
  358.     # how that works.
  359.     CHUNK_SIZE = 50*1024
  360. 
  361.     def _loop(self, fire_when_done):
  362.         # this slightly weird structure is needed because Deferreds don't do
  363.         # tail-recursion, so it is important to let each one retire promptly.
  364.         # Simply chaining them will cause a stack overflow at the end of a
  365.         # transfer that involves more than a few hundred chunks.
  366.         # 'fire_when_done' lives a long time, but the Deferreds returned by
  367.         # the inner _fetch() call do not.
  368.         start = time.time()
  369.         d = defer.maybeDeferred(self._fetch)
  370.         def _done(finished):
  371.             elapsed = time.time() - start
  372.             self._times["cumulative_fetch"] += elapsed
  373.             if finished:
  374.                 self.log("finished reading ciphertext", level=log.NOISY)
  375.                 fire_when_done.callback(None)
  376.             else:
  377.                 self._loop(fire_when_done)
  378.         def _err(f):
  379.             self.log(format="[%(si)s] ciphertext read failed",
  380.                      si=self._upload_id, failure=f, level=log.UNUSUAL)
  381.             fire_when_done.errback(f)
  382.         d.addCallbacks(_done, _err)
  383.         return None
  384. 
  385.     def _fetch(self):
  386.         needed = self._expected_size - self._have
  387.         fetch_size = min(needed, self.CHUNK_SIZE)
  388.         if fetch_size == 0:
  389.             self._upload_helper._upload_status.set_progress(1, 1.0)
  390.             return True # all done
  391.         percent = 0.0
  392.         if self._expected_size:
  393.             percent = 1.0 * (self._have+fetch_size) / self._expected_size
  394.         self.log(format="fetching [%(si)s] %(start)d-%(end)d of %(total)d (%(percent)d%%)",
  395.                  si=self._upload_id,
  396.                  start=self._have,
  397.                  end=self._have+fetch_size,
  398.                  total=self._expected_size,
  399.                  percent=int(100.0*percent),
  400.                  level=log.NOISY)
  401.         d = self.call("read_encrypted", self._have, fetch_size)
  402.         def _got_data(ciphertext_v):
  403.             for data in ciphertext_v:
  404.                 self._f.write(data)
  405.                 self._have += len(data)
  406.                 self._ciphertext_fetched += len(data)
  407.                 self._upload_helper._helper.count("chk_upload_helper.fetched_bytes", len(data))
  408.                 self._upload_helper._upload_status.set_progress(1, percent)
  409.             return False # not done
  410.         d.addCallback(_got_data)
  411.         return d
  412. 
  413.     def _done(self, res):
  414.         self._f.close()
  415.         self._f = None
  416.         self.log(format="done fetching ciphertext, size=%(size)d",
  417.                  size=os.stat(self._incoming_file)[stat.ST_SIZE],
  418.                  level=log.NOISY)
  419.         os.rename(self._incoming_file, self._encoding_file)
  420. 
  421.     def _done2(self, _ignored, started):
  422.         self.log("done2", level=log.NOISY)
  423.         elapsed = time.time() - started
  424.         self._times["total"] = elapsed
  425.         self._readers = []
  426.         self._done_observers.fire(None)
  427. 
  428.     def _failed(self, f):
  429.         if self._f:
  430.             self._f.close()
  431.         self._readers = []
  432.         self._done_observers.fire(f)
  433. 
  434.     def when_done(self):
  435.         return self._done_observers.when_fired()
  436. 
  437.     def get_times(self):
  438.         return self._times
  439. 
  440.     def get_ciphertext_fetched(self):
  441.         return self._ciphertext_fetched
  442. 
  443. 
  444. class LocalCiphertextReader(AskUntilSuccessMixin):
  445.     implements(interfaces.IEncryptedUploadable)
  446. 
  447.     def __init__(self, upload_helper, storage_index, encoding_file):
  448.         self._readers = []
  449.         self._upload_helper = upload_helper
  450.         self._storage_index = storage_index
  451.         self._encoding_file = encoding_file
  452.         self._status = None
  453. 
  454.     def start(self):
  455.         self._upload_helper._upload_status.set_status("pushing")
  456.         self._size = os.stat(self._encoding_file)[stat.ST_SIZE]
  457.         self.f = open(self._encoding_file, "rb")
  458. 
  459.     def get_size(self):
  460.         return defer.succeed(self._size)
  461. 
  462.     def get_all_encoding_parameters(self):
  463.         return self.call("get_all_encoding_parameters")
  464. 
  465.     def get_storage_index(self):
  466.         return defer.succeed(self._storage_index)
  467. 
  468.     def read_encrypted(self, length, hash_only):
  469.         assert hash_only is False
  470.         d = defer.maybeDeferred(self.f.read, length)
  471.         d.addCallback(lambda data: [data])
  472.         return d
  473. 
  474.     def close(self):
  475.         self.f.close()
  476.         # ??. I'm not sure if it makes sense to forward the close message.
  477.         return self.call("close")
  478. 
  479. 
  480. 
  481. class Helper(Referenceable):
  482.     implements(interfaces.RIHelper, interfaces.IStatsProducer)
  483.     # this is the non-distributed version. When we need to have multiple
  484.     # helpers, this object will become the HelperCoordinator, and will query
  485.     # the farm of Helpers to see if anyone has the storage_index of interest,
  486.     # and send the request off to them. If nobody has it, we'll choose a
  487.     # helper at random.
  488. 
  489.     name = "helper"
  490.     VERSION = { "http://allmydata.org/tahoe/protocols/helper/v1" :
  491.                  { },
  492.                 "application-version": str(allmydata.__full_version__),
  493.                 }
  494.     chk_upload_helper_class = CHKUploadHelper
  495.     MAX_UPLOAD_STATUSES = 10
  496. 
  497.     def __init__(self, basedir, storage_broker, secret_holder,
  498.                  stats_provider, history):
  499.         self._basedir = basedir
  500.         self._storage_broker = storage_broker
  501.         self._secret_holder = secret_holder
  502.         self._chk_incoming = os.path.join(basedir, "CHK_incoming")
  503.         self._chk_encoding = os.path.join(basedir, "CHK_encoding")
  504.         fileutil.make_dirs(self._chk_incoming)
  505.         fileutil.make_dirs(self._chk_encoding)
  506.         self._active_uploads = {}
  507.         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
  508.         self.stats_provider = stats_provider
  509.         if stats_provider:
  510.             stats_provider.register_producer(self)
  511.         self._counters = {"chk_upload_helper.upload_requests": 0,
  512.                           "chk_upload_helper.upload_already_present": 0,
  513.                           "chk_upload_helper.upload_need_upload": 0,
  514.                           "chk_upload_helper.resumes": 0,
  515.                           "chk_upload_helper.fetched_bytes": 0,
  516.                           "chk_upload_helper.encoded_bytes": 0,
  517.                           }
  518.         self._history = history
  519. 
  520.     def log(self, *args, **kwargs):
  521.         if 'facility' not in kwargs:
  522.             kwargs['facility'] = "tahoe.helper"
  523.         return log.msg(*args, **kwargs)
  524. 
  525.     def count(self, key, value=1):
  526.         if self.stats_provider:
  527.             self.stats_provider.count(key, value)
  528.         self._counters[key] += value
  529. 
  530.     def get_stats(self):
  531.         OLD = 86400*2 # 48hours
  532.         now = time.time()
  533.         inc_count = inc_size = inc_size_old = 0
  534.         enc_count = enc_size = enc_size_old = 0
  535.         inc = os.listdir(self._chk_incoming)
  536.         enc = os.listdir(self._chk_encoding)
  537.         for f in inc:
  538.             s = os.stat(os.path.join(self._chk_incoming, f))
  539.             size = s[stat.ST_SIZE]
  540.             mtime = s[stat.ST_MTIME]
  541.             inc_count += 1
  542.             inc_size += size
  543.             if now - mtime > OLD:
  544.                 inc_size_old += size
  545.         for f in enc:
  546.             s = os.stat(os.path.join(self._chk_encoding, f))
  547.             size = s[stat.ST_SIZE]
  548.             mtime = s[stat.ST_MTIME]
  549.             enc_count += 1
  550.             enc_size += size
  551.             if now - mtime > OLD:
  552.                 enc_size_old += size
  553.         stats = { 'chk_upload_helper.active_uploads': len(self._active_uploads),
  554.                   'chk_upload_helper.incoming_count': inc_count,
  555.                   'chk_upload_helper.incoming_size': inc_size,
  556.                   'chk_upload_helper.incoming_size_old': inc_size_old,
  557.                   'chk_upload_helper.encoding_count': enc_count,
  558.                   'chk_upload_helper.encoding_size': enc_size,
  559.                   'chk_upload_helper.encoding_size_old': enc_size_old,
  560.                   }
  561.         stats.update(self._counters)
  562.         return stats
  563. 
  564.     def remote_get_version(self):
  565.         return self.VERSION
  566. 
  567.     def remote_upload_chk(self, storage_index):
  568.         self.count("chk_upload_helper.upload_requests")
  569.         r = upload.UploadResults()
  570.         started = time.time()
  571.         si_s = si_b2a(storage_index)
  572.         lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
  573.         incoming_file = os.path.join(self._chk_incoming, si_s)
  574.         encoding_file = os.path.join(self._chk_encoding, si_s)
  575.         if storage_index in self._active_uploads:
  576.             self.log("upload is currently active", parent=lp)
  577.             uh = self._active_uploads[storage_index]
  578.             return uh.start()
  579. 
  580.         d = self._check_for_chk_already_in_grid(storage_index, r, lp)
  581.         def _checked(already_present):
  582.             elapsed = time.time() - started
  583.             r.timings['existence_check'] = elapsed
  584.             if already_present:
  585.                 # the necessary results are placed in the UploadResults
  586.                 self.count("chk_upload_helper.upload_already_present")
  587.                 self.log("file already found in grid", parent=lp)
  588.                 return (r, None)
  589. 
  590.             self.count("chk_upload_helper.upload_need_upload")
  591.             # the file is not present in the grid, by which we mean there are
  592.             # less than 'N' shares available.
  593.             self.log("unable to find file in the grid", parent=lp,
  594.                      level=log.NOISY)
  595.             # We need an upload helper. Check our active uploads again in
  596.             # case there was a race.
  597.             if storage_index in self._active_uploads:
  598.                 self.log("upload is currently active", parent=lp)
  599.                 uh = self._active_uploads[storage_index]
  600.             else:
  601.                 self.log("creating new upload helper", parent=lp)
  602.                 uh = self.chk_upload_helper_class(storage_index, self,
  603.                                                   self._storage_broker,
  604.                                                   self._secret_holder,
  605.                                                   incoming_file, encoding_file,
  606.                                                   r, lp)
  607.                 self._active_uploads[storage_index] = uh
  608.                 self._add_upload(uh)
  609.             return uh.start()
  610.         d.addCallback(_checked)
  611.         def _err(f):
  612.             self.log("error while checking for chk-already-in-grid",
  613.                      failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
  614.             return f
  615.         d.addErrback(_err)
  616.         return d
  617. 
  618.     def _check_for_chk_already_in_grid(self, storage_index, results, lp):
  619.         # see if this file is already in the grid
  620.         lp2 = self.log("doing a quick check+UEBfetch",
  621.                        parent=lp, level=log.NOISY)
  622.         sb = self._storage_broker
  623.         c = CHKCheckerAndUEBFetcher(sb.get_servers_for_index, storage_index, lp2)
  624.         d = c.check()
  625.         def _checked(res):
  626.             if res:
  627.                 (sharemap, ueb_data, ueb_hash) = res
  628.                 self.log("found file in grid", level=log.NOISY, parent=lp)
  629.                 results.uri_extension_hash = ueb_hash
  630.                 results.sharemap = sharemap
  631.                 results.uri_extension_data = ueb_data
  632.                 results.preexisting_shares = len(sharemap)
  633.                 results.pushed_shares = 0
  634.                 return True
  635.             return False
  636.         d.addCallback(_checked)
  637.         return d
  638. 
  639.     def _add_upload(self, uh):
  640.         self._all_uploads[uh] = None
  641.         if self._history:
  642.             s = uh.get_upload_status()
  643.             self._history.notify_helper_upload(s)
  644. 
  645.     def upload_finished(self, storage_index, size):
  646.         # this is called with size=0 if the upload failed
  647.         self.count("chk_upload_helper.encoded_bytes", size)
  648.         uh = self._active_uploads[storage_index]
  649.         del self._active_uploads[storage_index]
  650.         s = uh.get_upload_status()
  651.         s.set_active(False)